Skip to content

Commit

Permalink
feat(pool): add option to bound task queue (v2) (#99)
Browse files Browse the repository at this point in the history
- Add support to create bounded pools by specifying a queue size (`WithQueueSize` option).
- Add support to choose how to deal with tasks submitted when the queue is full (`WithNonBlocking` option).
- Ensure `RunningWorkers()` method in subpools reflect the actual number of workers running tasks belonging to the subpool.
- Allow overriding pool options when creating a subpool via `NewSupool`.
- Simplify pool submission logic and remove dispatcher goroutine.
- Simplify subpool implementation.
  • Loading branch information
alitto authored Feb 17, 2025
1 parent dfb31e5 commit e2fee68
Show file tree
Hide file tree
Showing 21 changed files with 673 additions and 937 deletions.
50 changes: 25 additions & 25 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,37 @@ jobs:
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Checkout code
uses: actions/checkout@v4

- name: Install Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
cache: false
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
cache: false

- name: Test
run: make test-ci
- name: Test
run: make test-ci
codecov:
name: Coverage report
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Checkout code
uses: actions/checkout@v4

- name: Install Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: false
- name: Install Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: false

- name: Test
run: make coverage
- name: Test
run: make coverage

- uses: codecov/codecov-action@v5
with:
files: coverage.out
fail_ci_if_error: true
verbose: true
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
- uses: codecov/codecov-action@v5
with:
files: coverage.out
fail_ci_if_error: true
verbose: true
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ Some common use cases include:
- Complete pool metrics such as number of running workers, tasks waiting in the queue [and more](#metrics--monitoring)
- Configurable parent context to stop all workers when it is cancelled
- **New features in v2**:
- Unbounded task queues
- Bounded or Unbounded task queues
- Submission of tasks that return results
- Awaitable task completion
- Type safe APIs for tasks that return errors or results
- Panics recovery (panics are captured and returned as errors)
- Subpools with a fraction of the parent pool's maximum number of workers
- Blocking and non-blocking submission of tasks when the queue is full
- [API reference](https://pkg.go.dev/github.com/alitto/pond/v2)

## Installation
Expand Down Expand Up @@ -386,6 +388,24 @@ if err != nil {
}
```

### Bounded task queues (v2)

By default, task queues are unbounded, meaning that tasks are queued indefinitely until the pool is stopped (or the process runs out of memory). You can limit the number of tasks that can be queued by setting a queue size when creating a pool (`WithQueueSize` option).

``` go
// Create a pool with a maximum of 10 tasks in the queue
pool := pond.NewPool(1, pond.WithQueueSize(10))
```

**Blocking vs non-blocking task submission**

When a pool defines a queue size (bounded), you can also specify how to handle tasks submitted when the queue is full. By default, task submission blocks until there is space in the queue (blocking mode), but you can change this behavior to non-blocking by setting the `WithNonBlocking` option to `true` when creating a pool. If the queue is full and non-blocking task submission is enabled, the task is dropped and an error is returned (`ErrQueueFull`).

``` go
// Create a pool with a maximum of 10 tasks in the queue and non-blocking task submission
pool := pond.NewPool(1, pond.WithQueueSize(10), pond.WithNonBlocking(true))
```

### Metrics & monitoring

Each worker pool instance exposes useful metrics that can be queried through the following methods:
Expand Down
2 changes: 1 addition & 1 deletion default.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package pond

// defaultPool is the default pool used by the package-level functions.
var defaultPool = newPool(0)
var defaultPool = newPool(0, nil)

// Submit submits a task to the default pool and returns a future that can be used to wait for the task to complete.
func Submit(task func()) Task {
Expand Down
Empty file added go.sum
Empty file.
2 changes: 1 addition & 1 deletion group.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (g *abstractTaskGroup[T, E, O]) submit(task any) {

g.taskWaitGroup.Add(1)

err := g.pool.dispatcher.Write(func() error {
err := g.pool.submit(func() error {
defer g.taskWaitGroup.Done()

// Check if the context has been cancelled to prevent running tasks that are not needed
Expand Down
16 changes: 10 additions & 6 deletions group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,23 @@ func TestTaskGroupWithContextCanceled(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

go func() {
time.Sleep(10 * time.Millisecond)
cancel()
}()
taskStarted := make(chan struct{})

task := group.SubmitErr(func() error {
taskStarted <- struct{}{}

err := group.SubmitErr(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
return nil
}
}).Wait()
})

<-taskStarted
cancel()

err := task.Wait()

assert.Equal(t, context.Canceled, err)
}
Expand Down
141 changes: 0 additions & 141 deletions internal/dispatcher/dispatcher.go

This file was deleted.

Loading

0 comments on commit e2fee68

Please sign in to comment.