-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy paththrottler.go
165 lines (148 loc) · 5.71 KB
/
throttler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Package throttler fills the gap between sync.WaitGroup and manually monitoring your goroutines
// with channels. The API is almost identical to Wait Groups, but it allows you to set
// a max number of workers that can be running simultaneously. It uses channels internally
// to block until a job completes by calling Done(err) or until all jobs have been completed.
//
// After exiting the loop where you are using Throttler, you can call the `Err` or `Errs` method to check
// for errors. `Err` will return a single error representative of all the errors Throttler caught. The
// `Errs` method will return all the errors as a slice of errors (`[]error`).
//
// Compare the Throttler example to the sync.WaitGroup example http://golang.org/pkg/sync/#example_WaitGroup
//
// See a fully functional example on the playground at http://bit.ly/throttler-v3
package throttler
import (
"fmt"
"math"
"sync"
"sync/atomic"
)
// Throttler stores all the information about the number of workers, the active workers and error information
type Throttler struct {
	maxWorkers    int32 // cap on concurrently running workers; changed atomically via SetMaxWorkers
	workerCount   int32 // workers currently considered active by Throttle
	batchingTotal int32 // total item count being batched (only set by NewBatchedThrottler)
	batchSize     int32 // items per batch; 1 when batching is not used
	totalJobs     int32 // number of jobs (or batches) Throttle/Done will coordinate
	jobsStarted   int32 // incremented each time Throttle dispatches a job
	jobsCompleted int32 // incremented each time Throttle consumes a Done signal
	doneChan      chan struct{} // buffered (totalJobs); Done sends one token per finished job
	errsMutex     *sync.Mutex   // guards errs (errorCount is also bumped while held)
	errs          []error       // errors passed to Done, in arrival order
	errorCount    int32         // len(errs) maintained atomically for lock-free reads
}
// New returns a Throttler that will govern the max number of workers and will
// work with the total number of jobs. It panics if maxWorkers < 1.
func New(maxWorkers, totalJobs int) *Throttler {
	if maxWorkers < 1 {
		panic("maxWorkers has to be at least 1")
	}
	t := new(Throttler)
	t.maxWorkers = int32(maxWorkers)
	t.batchSize = 1 // non-batched throttlers behave as batch size one
	t.totalJobs = int32(totalJobs)
	// Buffer one slot per job so Done never blocks the workers.
	t.doneChan = make(chan struct{}, totalJobs)
	t.errsMutex = &sync.Mutex{}
	return t
}
// NewBatchedThrottler returns a Throttler (just like New), but also enables batching.
func NewBatchedThrottler(maxWorkers, batchingTotal, batchSize int) *Throttler {
	// The job count is the number of batches: ceil(batchingTotal / batchSize).
	jobCount := int(math.Ceil(float64(batchingTotal) / float64(batchSize)))
	throttler := New(maxWorkers, jobCount)
	throttler.batchingTotal = int32(batchingTotal)
	throttler.batchSize = int32(batchSize)
	return throttler
}
// SetMaxWorkers lets you change the total number of workers that can run concurrently. NOTE: If
// all workers are currently running, this setting is not guaranteed to take effect until one of them
// completes and Throttle() is called again
func (t *Throttler) SetMaxWorkers(maxWorkers int) {
	if maxWorkers >= 1 {
		// Stored atomically because Throttle reads it without the mutex.
		atomic.StoreInt32(&t.maxWorkers, int32(maxWorkers))
		return
	}
	panic("maxWorkers has to be at least 1")
}
// Throttle works similarly to sync.WaitGroup, except inside your goroutine dispatch
// loop rather than after. It will not block until the number of active workers
// matches the max number of workers designated in the call to NewThrottler or
// all of the jobs have been dispatched. It stops blocking when Done has been called
// as many times as totalJobs. It returns the number of errors reported so far,
// so callers can abort the dispatch loop early.
func (t *Throttler) Throttle() int {
	// With no jobs configured there is nothing to coordinate; report errors immediately.
	if atomic.LoadInt32(&t.totalJobs) < 1 {
		return int(atomic.LoadInt32(&t.errorCount))
	}
	// Account for the job the caller is about to dispatch.
	atomic.AddInt32(&t.jobsStarted, 1)
	atomic.AddInt32(&t.workerCount, 1)
	// check to see if the current number of workers equals the max number of workers
	// if they are equal, wait for one to finish before continuing
	if atomic.LoadInt32(&t.workerCount) == atomic.LoadInt32(&t.maxWorkers) {
		// The receive below consumes exactly one Done signal, so record one
		// completed job and free one worker slot before blocking.
		atomic.AddInt32(&t.jobsCompleted, 1)
		atomic.AddInt32(&t.workerCount, -1)
		<-t.doneChan
	}
	// check to see if all of the jobs have been started, and if so, wait until all
	// jobs have been completed before continuing
	if atomic.LoadInt32(&t.jobsStarted) == atomic.LoadInt32(&t.totalJobs) {
		for atomic.LoadInt32(&t.jobsCompleted) < atomic.LoadInt32(&t.totalJobs) {
			// Drain one Done signal per outstanding job.
			atomic.AddInt32(&t.jobsCompleted, 1)
			<-t.doneChan
		}
	}
	return int(atomic.LoadInt32(&t.errorCount))
}
// Done lets Throttler know that a job has been completed so that another worker
// can be activated. If Done is called less times than totalJobs,
// Throttle will block forever
func (t *Throttler) Done(err error) {
	// Always signal completion, even when an error is being recorded.
	defer func() { t.doneChan <- struct{}{} }()
	if err == nil {
		return
	}
	t.errsMutex.Lock()
	defer t.errsMutex.Unlock()
	t.errs = append(t.errs, err)
	atomic.AddInt32(&t.errorCount, 1)
}
// Err returns an error representative of all errors caught by throttler
func (t *Throttler) Err() error {
	t.errsMutex.Lock()
	defer t.errsMutex.Unlock()
	if atomic.LoadInt32(&t.errorCount) > 0 {
		return multiError(t.errs)
	}
	return nil
}
// Errs returns a slice of any errors that were received from calling Done().
//
// The returned slice is a copy of the internal error list: the original code
// handed back the live slice, so a caller iterating it could race with workers
// still appending errors via Done after the mutex was released. A nil result
// is preserved when no errors were recorded.
func (t *Throttler) Errs() []error {
	t.errsMutex.Lock()
	defer t.errsMutex.Unlock()
	if len(t.errs) == 0 {
		return nil
	}
	errs := make([]error, len(t.errs))
	copy(errs, t.errs)
	return errs
}
type multiError []error
func (te multiError) Error() string {
errString := te[0].Error()
if len(te) > 1 {
errString += fmt.Sprintf(" (and %d more errors)", len(te)-1)
}
return errString
}
// BatchStartIndex returns the starting index for the next batch. The job count isn't modified
// until th.Throttle() is called, so if you don't call Throttle before executing this
// again, it will return the same index as before
func (t *Throttler) BatchStartIndex() int {
	started := atomic.LoadInt32(&t.jobsStarted)
	size := atomic.LoadInt32(&t.batchSize)
	return int(started * size)
}
// BatchEndIndex returns the ending index for the next batch. It either returns the full batch size
// or the remaining amount of jobs. The job count isn't modified
// until th.Throttle() is called, so if you don't call Throttle before executing this
// again, it will return the same index as before.
func (t *Throttler) BatchEndIndex() int {
	limit := atomic.LoadInt32(&t.batchingTotal)
	next := (atomic.LoadInt32(&t.jobsStarted) + 1) * atomic.LoadInt32(&t.batchSize)
	if next <= limit {
		return int(next)
	}
	// The final batch is truncated to the remaining items.
	return int(limit)
}
// TotalJobs returns the total number of jobs throttler is performing
func (t *Throttler) TotalJobs() int {
	total := atomic.LoadInt32(&t.totalJobs)
	return int(total)
}