Skip to content

Commit

Permalink
Merge pull request #2 from shettyh/callable_impl
Browse files Browse the repository at this point in the history
Callable implementation
  • Loading branch information
shettyh authored Sep 20, 2017
2 parents 9e531aa + 4b595d8 commit bb6533e
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 21 deletions.
32 changes: 28 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# Golang Threadpool implementation
Scalable threadpool implementation using Go to handle the huge network trafic.

## Install

`go get github.com/shettyh/threadpool`

## Usage

## Threadpool
### Threadpool
- Implement `Runnable` interface for tha task that needs to be executed. For example


Expand All @@ -24,10 +28,30 @@ Scalable threadpool implementation using Go to handle the huge network trafic.
task:=&MyTask{}
pool.Execute(task)
```
- Using `Callable` task
```
type MyTaskCallable struct { }
func (c *MyTaskCallable) Call() interface{} {
//Do task
return result
}
//Execute callable task
task := &MyTaskCallable{}
future := pool.ExecuteFuture(task)
//Check if the task is done
isDone := future.IsDone() // true/false
//Get response , blocking call
result := future.Get()
```

## Scheduled threadpool
### Scheduled threadpool

- Create instance of 'ScheduledThreadPool' with number of workers required
- Create instance of `ScheduledThreadPool` with number of workers required
```
schedulerPool:= threadpool.NewScheduledThreadPool(10)
```
Expand All @@ -37,5 +61,5 @@ Scalable threadpool implementation using Go to handle the huge network trafic.
pool.ScheduleOnce(task, time.Second*20) // Time delay is in seconds only as of now
```

## Note
### Note
This still in works
30 changes: 30 additions & 0 deletions callable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package threadpool

// Callable, the tasks which returns the output after exit should implement this interface
type Callable interface {
Call() interface{}
}

// Future, is the handle returned after submitting a callable task to the thread pool
type Future struct {
response chan interface{}
done bool
}

// callableTask is internally used to wrap the callable and future together
// So that the worker can send the response back through channel provided in Future object
type callableTask struct {
Task Callable
Handle *Future
}

// Get returns the response of the Callable task when done
// Is is the blocking call it waits for the execution to complete
func (f *Future) Get() interface{}{
return <-f.response
}

// IsDone returns true if the execution is already done
func (f *Future) IsDone() bool{
return f.done
}
10 changes: 4 additions & 6 deletions scheduled_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// ScheduledThreadPool
// Schedules the task with the given delay
type ScheduledThreadPool struct {
workers chan chan Runnable
workers chan chan interface{}
tasks *sync.Map
noOfWorkers int
counter uint64
Expand All @@ -19,7 +19,7 @@ type ScheduledThreadPool struct {
func NewScheduledThreadPool(noOfWorkers int) *ScheduledThreadPool {
pool := &ScheduledThreadPool{}
pool.noOfWorkers = noOfWorkers
pool.workers = make(chan chan Runnable, noOfWorkers)
pool.workers = make(chan chan interface{}, noOfWorkers)
pool.tasks = new(sync.Map)
pool.createPool()
return pool
Expand Down Expand Up @@ -58,14 +58,12 @@ func (stf *ScheduledThreadPool) intervalRunner() {

// For each tasks , get a worker from the pool and run the task
for _, val := range currentTasksSet.GetAll() {
job := val.(Runnable)

go func(job Runnable) {
go func(job interface{}) {
// get the worker from pool who is free
worker := <-stf.workers
// Submit the job to the worker
worker <- job
}(job)
}(val)
}
}
}
Expand Down
19 changes: 14 additions & 5 deletions threadpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ type ThreadPool struct {
QueueSize int64
NoOfWorkers int

jobQueue chan Runnable
workerPool chan chan Runnable
jobQueue chan interface{}
workerPool chan chan interface{}
}

// NewThreadPool creates thread pool
func NewThreadPool(noOfWorkers int, queueSize int64) *ThreadPool {
threadPool := &ThreadPool{QueueSize: queueSize, NoOfWorkers: noOfWorkers}
threadPool.jobQueue = make(chan Runnable, queueSize)
threadPool.workerPool = make(chan chan Runnable, noOfWorkers)
threadPool.jobQueue = make(chan interface{}, queueSize)
threadPool.workerPool = make(chan chan interface{}, noOfWorkers)

threadPool.createPool()
return threadPool
Expand All @@ -25,6 +25,15 @@ func (t *ThreadPool) Execute(task Runnable) {
t.jobQueue <- task
}

// ExecuteFuture will submit the task to the threadpool and return the response handle
func (t *ThreadPool) ExecuteFuture(task Callable) *Future {
// Create future and task
handle:= &Future{response:make(chan interface{})}
futureTask:= callableTask{Task: task,Handle: handle}
t.jobQueue <- futureTask
return futureTask.Handle
}

// createPool creates the workers and start listening on the jobQueue
func (t *ThreadPool) createPool() {
for i := 0; i < t.NoOfWorkers; i++ {
Expand All @@ -42,7 +51,7 @@ func (t *ThreadPool) dispatch() {
select {
case job := <-t.jobQueue:
// Got job
go func(job Runnable) {
go func(job interface{}) {
//Find a worker for the job
jobChannel := <-t.workerPool
//Submit job to the worker
Expand Down
16 changes: 16 additions & 0 deletions threadpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ func TestThreadPool_Execute(t *testing.T) {
}
}

func TestThreadPool_ExecuteFuture(t *testing.T) {
task:= &TestTaskFuture{}
handle:=Pool.ExecuteFuture(task)
response := handle.Get()
if !handle.IsDone() {
t.Fail()
}
fmt.Println("Thread done ",response)
}

type TestTask struct {
TestData *TestData
}
Expand All @@ -44,3 +54,9 @@ func (t *TestTask) Run() {
fmt.Println("Running the task")
t.TestData.Val = "changed"
}

type TestTaskFuture struct {}

func (t *TestTaskFuture) Call() interface{} {
return "Done"
}
22 changes: 16 additions & 6 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package threadpool

// Worker type holds the job channel and passed worker pool
type Worker struct {
jobChannel chan Runnable
workerPool chan chan Runnable
jobChannel chan interface{}
workerPool chan chan interface{}
}

// NewWorker creates the new worker
func NewWorker(workerPool chan chan Runnable) *Worker {
return &Worker{workerPool: workerPool, jobChannel: make(chan Runnable)}
func NewWorker(workerPool chan chan interface{}) *Worker {
return &Worker{workerPool: workerPool, jobChannel: make(chan interface{})}
}

// Start starts the worker by listening to the job channel
Expand All @@ -21,8 +21,18 @@ func (w Worker) Start() {
select {
// Wait for the job
case job := <-w.jobChannel:
// Execute the job
job.Run()
// Execute the job based on the task type
switch job.(type) {
case Runnable:
job.(Runnable).Run()
break
case callableTask:
task := job.(callableTask)
response := task.Task.Call()
task.Handle.done=true
task.Handle.response <- response
break
}
}
}
}()
Expand Down

0 comments on commit bb6533e

Please sign in to comment.