-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathdispatcher.go
257 lines (225 loc) · 6.86 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
package gosd
import (
"container/heap"
"context"
"errors"
"sync"
)
// dispatcherState represents state for a Dispatcher.
//
// The lifecycle methods (Start, Pause, Resume, Shutdown) move a Dispatcher between these states, and the processing
// loop reads the current state to decide whether to drain the heap immediately.
type dispatcherState int

const (
	// paused means the processing loop is not running. It is the zero value, so a Dispatcher built by
	// NewDispatcher starts out paused until Start or Resume is called.
	paused dispatcherState = iota
	// processing means Start or Resume has been called and the processing loop is running.
	processing
	// shutdown is set by Shutdown when drainImmediately is false; ingress is closed and the remaining messages are
	// left to go out through the normal processing loop.
	shutdown
	// shutdownAndDrain is set by Shutdown when drainImmediately is true; the processing loop stops the delayer in
	// drain mode and dispatches everything left in the heap without waiting for each message's schedule.
	shutdownAndDrain
)
// Dispatcher processes the ingress and dispatching of scheduled messages.
//
// Messages are received on the ingress channel, held in a heap until the delayer is free to wait on them, and
// finally emitted on the dispatch channel. The lifecycle is driven by Start, Pause, Resume and Shutdown.
type Dispatcher[T any] struct {
	// state is the current lifecycle state; its zero value is paused.
	state dispatcherState
	// maxMessages is the heap size at which the processing loop stops ingesting new messages.
	maxMessages int
	// pq is the heap of ingested messages that have not yet been handed to the delayer.
	pq priorityQueue[T]
	// nextMessage is the message most recently handed to the delayer; it is nil until the first message has been
	// ingested.
	nextMessage *ScheduledMessage[T]
	// delayer waits on nextMessage; it is constructed with dispatchChannel and delayerIdleChannel.
	delayer delayer[T]
	// delayerIdleChannel (capacity 1) is shared with the delayer. The processing loop receives from it before
	// handing the delayer another message.
	// NOTE(review): presumably the delayer sends on it once it has finished or been stopped — confirm against the
	// delayer implementation.
	delayerIdleChannel chan bool
	// dispatchChannel carries outgoing messages to the consumer (see DispatchChannel); Shutdown closes it.
	dispatchChannel chan T
	// ingressChannel carries incoming scheduled messages from producers (see IngressChannel); Shutdown closes it.
	ingressChannel chan *ScheduledMessage[T]
	// shutdown is allocated by NewDispatcher.
	// NOTE(review): no method in this file sends on or receives from it — confirm whether it is still needed.
	shutdown chan error
	// stopProcess tells the processing loop to return: Pause sends a value on it and Shutdown closes it.
	stopProcess chan bool
	// mutex serializes the lifecycle methods (Start, Pause, Resume, Shutdown).
	mutex *sync.Mutex
}
// NewDispatcher creates a new instance of a Dispatcher.
//
// The returned Dispatcher starts in the paused state; call Start to begin processing. The dispatch and ingress
// channels are buffered according to config.DispatchChannelSize and config.IngressChannelSize, and the heap keeps
// insertion order for equal schedules when config.GuaranteeOrder is set.
//
// An error is returned when config is nil or when config.MaxMessages is not greater than 0.
func NewDispatcher[T any](config *DispatcherConfig) (*Dispatcher[T], error) {
	// Guard against a nil config so that misuse yields an error instead of a nil-pointer panic.
	if config == nil {
		return nil, errors.New("config should not be nil")
	}
	if config.MaxMessages <= 0 {
		return nil, errors.New("MaxMessages should be greater than 0")
	}

	// The idle channel has room for exactly one signal.
	newIdleChannel := make(chan bool, 1)
	newDispatchChannel := make(chan T, config.DispatchChannelSize)

	newPq := priorityQueue[T]{
		items:         make([]*item[T], 0),
		maintainOrder: config.GuaranteeOrder,
	}
	heap.Init(&newPq)

	return &Dispatcher[T]{
		pq:                 newPq,
		maxMessages:        config.MaxMessages,
		delayer:            newDelay[T](newDispatchChannel, newIdleChannel),
		delayerIdleChannel: newIdleChannel,
		dispatchChannel:    newDispatchChannel,
		ingressChannel:     make(chan *ScheduledMessage[T], config.IngressChannelSize),
		shutdown:           make(chan error),
		stopProcess:        make(chan bool),
		mutex:              &sync.Mutex{},
	}, nil
}
// Shutdown will attempt to shutdown the Dispatcher within the context deadline, otherwise terminating the process
// ungracefully.
//
// If drainImmediately is true, then all messages will be dispatched immediately regardless of the schedule set. Order
// can be lost if new messages are still being ingested.
//
// Shutdown closes the ingress channel, so producers must not send after calling it. It then polls until the ingress
// channel and the heap are empty and the delayer is available, at which point the processing loop is stopped, the
// dispatch channel is closed and nil is returned. If ctx is done first, the same channels are closed straight away
// and an error is returned. Calling Shutdown again returns an error.
func (d *Dispatcher[T]) Shutdown(ctx context.Context, drainImmediately bool) error {
	// Fast path: a repeated call returns immediately without waiting for the mutex.
	if d.state == shutdown || d.state == shutdownAndDrain {
		return errors.New("shutdown has already happened")
	}

	d.mutex.Lock()
	defer d.mutex.Unlock()

	// Re-check under the lock: a concurrent Shutdown may have won the race after the fast-path check, and going on
	// would close the already-closed channels and panic.
	if d.state == shutdown || d.state == shutdownAndDrain {
		return errors.New("shutdown has already happened")
	}

	// if paused, resume the process in order to drain messages
	if d.state == paused {
		// Like Resume, only re-arm the delayer when there is a message to wait on; a Dispatcher that has not
		// ingested anything yet has a nil nextMessage.
		if d.nextMessage != nil {
			d.delayer.wait(d.nextMessage)
		}
		go d.process()
	}

	if drainImmediately {
		d.state = shutdownAndDrain
	} else {
		d.state = shutdown
	}

	// block new messages and let the channel drain
	close(d.ingressChannel)

	for {
		select {
		case <-ctx.Done():
			// forcefully kill the process regardless of messages left
			close(d.stopProcess)
			close(d.dispatchChannel)
			return errors.New("failed to gracefully drain and shutdown dispatcher within deadline")
		default:
			// wait for the ingress channel and heap to drain
			if len(d.ingressChannel) == 0 && d.pq.Len() == 0 && d.delayer.available() {
				close(d.stopProcess)
				close(d.dispatchChannel)
				return nil
			}
		}
	}
}
// Start initializes the processing of scheduled messages and blocks.
//
// The call only returns nil once the processing loop has been stopped by Pause or Shutdown. It returns an error
// immediately, without starting the loop, when the Dispatcher is already processing or has been shut down.
func (d *Dispatcher[T]) Start() error {
	d.mutex.Lock()
	switch d.state {
	case shutdown, shutdownAndDrain:
		// Release the lock on the error path; otherwise every later lifecycle call would deadlock.
		d.mutex.Unlock()
		return errors.New("dispatcher is already running and shutting/shut down")
	case processing:
		d.mutex.Unlock()
		return errors.New("dispatcher is already running")
	}
	d.state = processing
	// Unlock before entering the loop: process blocks, and Pause/Shutdown need the mutex in order to stop it.
	d.mutex.Unlock()

	d.process()
	return nil
}
// Pause updates the state of the Dispatcher to stop processing messages and will close the main process loop.
//
// It signals the processing loop to return and stops the delayer without draining. The call blocks until the
// processing loop has picked up the stop signal. An error is returned when the Dispatcher is already paused or has
// been shut down.
func (d *Dispatcher[T]) Pause() error {
	d.mutex.Lock()
	// Deferred so the lock is also released on the error returns below; leaving it held would deadlock every later
	// lifecycle call.
	defer d.mutex.Unlock()

	switch d.state {
	case shutdown, shutdownAndDrain:
		return errors.New("dispatcher is shutting/shut down and cannot be paused")
	case paused:
		return errors.New("dispatcher is already paused")
	}

	d.state = paused
	d.stopProcess <- true
	d.delayer.stop(false)
	return nil
}
// Resume updates the state of the Dispatcher to start processing messages and starts the timer for the last message
// being processed and blocks.
//
// The call only returns nil once the processing loop has been stopped again by Pause or Shutdown. It returns an error
// immediately, without starting the loop, when the Dispatcher is already processing or has been shut down.
func (d *Dispatcher[T]) Resume() error {
	d.mutex.Lock()
	switch d.state {
	case shutdown, shutdownAndDrain:
		// Release the lock on the error path; otherwise every later lifecycle call would deadlock.
		d.mutex.Unlock()
		return errors.New("dispatcher is shutting/shut down")
	case processing:
		d.mutex.Unlock()
		return errors.New("dispatcher is already running")
	}
	d.state = processing
	// Re-arm the delayer for the message that was pending when the Dispatcher was paused, if there is one.
	if d.nextMessage != nil {
		d.delayer.wait(d.nextMessage)
	}
	// Unlock before entering the loop: process blocks, and Pause/Shutdown need the mutex in order to stop it.
	d.mutex.Unlock()

	d.process()
	return nil
}
// process is the main loop of the Dispatcher. It runs until a value can be received from stopProcess (sent by Pause
// or made available by Shutdown closing the channel).
//
// Each pass performs, in order: the drain-on-shutdown check, the heap check, and — only when the heap check allows
// it — a single ingress read.
func (d *Dispatcher[T]) process() {
	for {
		// Non-blocking stop check; fall through to one unit of work when no stop was requested.
		select {
		case <-d.stopProcess:
			return
		default:
		}

		// drain the heap straight away if the state has changed to `shutdownAndDrain`
		d.handleShutdownAndDrain()

		// feed the delayer from the heap; ingest a new message only while the heap has room
		if d.handlePriorityQueue() {
			d.handleIngress()
		}
	}
}
// handleShutdownAndDrain does nothing unless the Dispatcher is in the `shutdownAndDrain` state. In that state it
// stops the delayer in drain mode and, once a signal is waiting on the idle channel, consumes that signal and
// dispatches everything left in the heap.
func (d *Dispatcher[T]) handleShutdownAndDrain() {
	if d.state != shutdownAndDrain {
		return
	}

	d.delayer.stop(true)

	// nothing to consume yet; the next pass of the process loop checks again
	if len(d.delayerIdleChannel) == 0 {
		return
	}
	<-d.delayerIdleChannel
	d.drainHeap()
}
// handlePriorityQueue hands the next heap message to the delayer whenever an idle signal is waiting, and reports
// whether the caller may go on to ingest another message.
//
// cont is false when the heap already held MaxMessages (or more) entries on entry, so that ingesting cannot push the
// heap past its limit; it is true otherwise.
func (d *Dispatcher[T]) handlePriorityQueue() (cont bool) {
	// fullness is decided before anything is popped, so a heap that was full on entry still skips ingestion
	atCapacity := d.pq.Len() >= d.maxMessages
	hasQueued := atCapacity || d.pq.Len() > 0

	if hasQueued && len(d.delayerIdleChannel) > 0 {
		// consume the idle signal, then give the delayer the earliest message
		<-d.delayerIdleChannel
		d.waitNextMessage()
	}

	return !atCapacity
}
// handleIngress takes at most one message off the ingress channel, without blocking when the channel is empty, and
// routes it:
//
//   - in the `shutdownAndDrain` state the message is dispatched immediately;
//   - when the delayer has never been given a message, the new message becomes the one being waited on;
//   - when the new message is scheduled before the one being waited on, the two swap places: the current message
//     goes back onto the heap and the delayer is restarted with the new one;
//   - otherwise the message is added to the heap.
func (d *Dispatcher[T]) handleIngress() {
	if len(d.ingressChannel) == 0 {
		return
	}
	incoming, open := <-d.ingressChannel
	if !open {
		return
	}

	switch {
	case d.state == shutdownAndDrain:
		// dispatch the new message immediately
		d.dispatchChannel <- incoming.Message
	case d.nextMessage == nil:
		d.nextMessage = incoming
		d.delayer.wait(incoming)
	case incoming.At.Before(d.nextMessage.At):
		heap.Push(&d.pq, d.nextMessage)
		d.nextMessage = incoming
		// stop the current wait and block for the idle signal before restarting the delayer
		d.delayer.stop(false)
		<-d.delayerIdleChannel
		d.delayer.wait(incoming)
	default:
		heap.Push(&d.pq, incoming)
	}
}
// waitNextMessage pops the next message off the heap, records it as nextMessage and hands it to the delayer. The
// heap must not be empty when this is called.
func (d *Dispatcher[T]) waitNextMessage() {
	d.nextMessage = heap.Pop(&d.pq).(*ScheduledMessage[T])
	d.delayer.wait(d.nextMessage)
}
// drainHeap empties the heap, sending every remaining message to the dispatch channel in heap order without waiting
// for its scheduled time. Each send blocks until the dispatch channel can accept the message.
func (d *Dispatcher[T]) drainHeap() {
	for {
		if d.pq.Len() == 0 {
			return
		}
		// dispatch the message immediately
		pending := heap.Pop(&d.pq).(*ScheduledMessage[T])
		d.dispatchChannel <- pending.Message
	}
}
// IngressChannel exposes the Dispatcher's ingress channel as a send-only channel of `*ScheduledMessage`, which
// producers use to schedule messages.
func (d *Dispatcher[T]) IngressChannel() chan<- *ScheduledMessage[T] {
	var sendOnly chan<- *ScheduledMessage[T] = d.ingressChannel
	return sendOnly
}
// DispatchChannel exposes the Dispatcher's dispatch channel as a receive-only channel of `T`, from which consumers
// read messages once they have been dispatched.
func (d *Dispatcher[T]) DispatchChannel() <-chan T {
	var receiveOnly <-chan T = d.dispatchChannel
	return receiveOnly
}