@@ -17,29 +17,365 @@ limitations under the License.
17
17
package election
18
18
19
19
import (
20
- "github.com/hyperledger/fabric/gossip/common"
21
- "github.com/hyperledger/fabric/gossip/proto"
20
+ "fmt"
21
+ "sync"
22
+ "sync/atomic"
23
+ "time"
24
+
25
+ "github.com/hyperledger/fabric/gossip/util"
26
+ "github.com/op/go-logging"
27
+ )
28
+
29
+ var (
30
+ startupGracePeriod = time .Second * 15
31
+ membershipSampleInterval = time .Second
32
+ leaderAliveThreshold = time .Second * 10
33
+ leadershipDeclarationInterval = leaderAliveThreshold / 2
34
+ leaderElectionDuration = time .Second * 5
22
35
)
23
36
37
+ // Gossip leader election module
38
+ // Algorithm properties:
39
+ // - Peers break symmetry by comparing IDs
40
+ // - Each peer is either a leader or a follower,
41
+ // and the aim is to have exactly 1 leader if the membership view
42
+ // is the same for all peers
43
+ // - If the network is partitioned into 2 or more sets, the number of leaders
44
+ // is the number of network partitions, but when the partition heals,
45
+ // only 1 leader should be left eventually
46
+ // - Peers communicate by gossiping leadership proposal or declaration messages
47
+
48
+
49
+ // The Algorithm, in pseudo code:
50
+ //
51
+ //
52
+ // variables:
53
+ // leaderKnown = false
54
+ //
55
+ // Invariant:
56
+ // Peer listens for messages from remote peers
57
+ // and whenever it receives a leadership declaration,
58
+ // leaderKnown is set to true
59
+ //
60
+ // Startup():
61
+ // wait for membership view to stabilize, or for a leadership declaration is received
62
+ // or the startup timeout expires.
63
+ // goto SteadyState()
64
+ //
65
+ // SteadyState():
66
+ // while true:
67
+ // If leaderKnown is false:
68
+ // LeaderElection()
69
+ // If you are the leader:
70
+ // Broadcast leadership declaration
71
+ // If a leadership declaration was received from
72
+ // a peer with a lower ID,
73
+ // become a follower
74
+ // Else, you're a follower:
75
+ // If haven't received a leadership declaration within
76
+ // a time threshold:
77
+ // set leaderKnown to false
78
+ //
79
+ // LeaderElection():
80
+ // Gossip leadership proposal message
81
+ // Collect messages from other peers sent within a time period
82
+ // If received a leadership declaration:
83
+ // return
84
+ // Iterate over all proposal messages collected.
85
+ // If a proposal message from a peer with an ID lower
86
+ // than yourself was received, return.
87
+ // Else, declare yourself a leader
88
+
89
+
90
+
24
91
// LeaderElectionAdapter is used by the leader election module
25
- // to send and receive messages, as well as notify a leader change
92
+ // to send and receive messages and to get membership information
26
93
type LeaderElectionAdapter interface {
27
94
28
95
// Gossip gossips a message to other peers
29
- Gossip (msg * proto. GossipMessage )
96
+ Gossip (Msg )
30
97
31
- // Accept returns a channel that emits messages that fit
32
- // the given predicate
33
- Accept (common.MessageAcceptor ) <- chan * proto.GossipMessage
98
+ // Accept returns a channel that emits messages
99
+ Accept () <- chan Msg
100
+
101
+ // CreateProposalMessage
102
+ CreateMessage (isDeclaration bool ) Msg
103
+
104
+ // Peers returns a list of peers considered alive
105
+ Peers () []Peer
34
106
}
35
107
36
108
// LeaderElectionService is the object that runs the leader election algorithm
37
109
type LeaderElectionService interface {
38
110
// IsLeader returns whether this peer is a leader or not
39
111
IsLeader () bool
112
+
113
+ // Stop stops the LeaderElectionService
114
+ Stop ()
115
+ }
116
+
117
+ // Peer describes a remote peer
118
+ type Peer interface {
119
+ // ID returns the ID of the peer
120
+ ID () string
121
+ }
122
+
123
+ // Msg describes a message sent from a remote peer
124
+ type Msg interface {
125
+ // SenderID returns the ID of the peer sent the message
126
+ SenderID () string
127
+ // IsProposal returns whether this message is a leadership proposal
128
+ IsProposal () bool
129
+ // IsDeclaration returns whether this message is a leadership declaration
130
+ IsDeclaration () bool
131
+ }
132
+
133
+ // NewLeaderElectionService returns a new LeaderElectionService
134
+ func NewLeaderElectionService (adapter LeaderElectionAdapter , id string ) LeaderElectionService {
135
+ if len (id ) == 0 {
136
+ panic (fmt .Errorf ("Empty id" ))
137
+ }
138
+ le := & leaderElectionSvcImpl {
139
+ id : id ,
140
+ proposals : util .NewSet (),
141
+ adapter : adapter ,
142
+ stopChan : make (chan struct {}, 1 ),
143
+ interruptChan : make (chan struct {}, 1 ),
144
+ logger : logging .MustGetLogger ("LeaderElection" ),
145
+ }
146
+ // TODO: This will be configured using the core.yaml when FAB-1217 (Integrate peer logging with gossip logging) is done
147
+ logging .SetLevel (logging .WARNING , "LeaderElection" )
148
+ go le .start ()
149
+ return le
150
+ }
151
+
152
+ // leaderElectionSvcImpl is an implementation of a LeaderElectionService
153
+ type leaderElectionSvcImpl struct {
154
+ id string
155
+ proposals * util.Set
156
+ sync.Mutex
157
+ stopChan chan struct {}
158
+ interruptChan chan struct {}
159
+ stopWG sync.WaitGroup
160
+ isLeader int32
161
+ toDie int32
162
+ leaderExists int32
163
+ sleeping bool
164
+ adapter LeaderElectionAdapter
165
+ logger * logging.Logger
166
+ }
167
+
168
+ func (le * leaderElectionSvcImpl ) start () {
169
+ le .stopWG .Add (2 )
170
+ go le .handleMessages ()
171
+ le .waitForMembershipStabilization (startupGracePeriod )
172
+ go le .run ()
173
+ }
174
+
175
+ func (le * leaderElectionSvcImpl ) handleMessages () {
176
+ le .logger .Info (le .id , ": Entering" )
177
+ defer le .logger .Info (le .id , ": Exiting" )
178
+ defer le .stopWG .Done ()
179
+ msgChan := le .adapter .Accept ()
180
+ for {
181
+ select {
182
+ case <- le .stopChan :
183
+ le .stopChan <- struct {}{}
184
+ return
185
+ case msg := <- msgChan :
186
+ if ! le .isAlive (msg .SenderID ()) {
187
+ le .logger .Debug (le .id , ": Got message from" , msg .SenderID (), "but it is not in the view" )
188
+ break
189
+ }
190
+ le .handleMessage (msg )
191
+ }
192
+ }
193
+ }
194
+
195
+ func (le * leaderElectionSvcImpl ) handleMessage (msg Msg ) {
196
+ msgType := "proposal"
197
+ if msg .IsDeclaration () {
198
+ msgType = "declaration"
199
+ }
200
+ le .logger .Debug (le .id , ":" , msg .SenderID (), "sent us" , msgType )
201
+ le .Lock ()
202
+ defer le .Unlock ()
203
+
204
+ if msg .IsProposal () {
205
+ le .proposals .Add (msg .SenderID ())
206
+ } else if msg .IsDeclaration () {
207
+ atomic .StoreInt32 (& le .leaderExists , int32 (1 ))
208
+ if le .sleeping && len (le .interruptChan ) == 0 {
209
+ le .interruptChan <- struct {}{}
210
+ }
211
+ if msg .SenderID () < le .id && le .IsLeader () {
212
+ le .stopBeingLeader ()
213
+ }
214
+ } else {
215
+ // We shouldn't get here
216
+ le .logger .Error ("Got a message that's not a proposal and not a declaration" )
217
+ }
218
+ }
219
+
220
+ // waitForInterrupt sleeps until the interrupt channel is triggered
221
+ // or given timeout expires
222
+ func (le * leaderElectionSvcImpl ) waitForInterrupt (timeout time.Duration ) {
223
+ le .logger .Debug (le .id , ": Entering" )
224
+ defer le .logger .Debug (le .id , ": Exiting" )
225
+ le .Lock ()
226
+ le .sleeping = true
227
+ le .Unlock ()
228
+
229
+ select {
230
+ case <- le .interruptChan :
231
+ case <- le .stopChan :
232
+ le .stopChan <- struct {}{}
233
+ case <- time .After (timeout ):
234
+ }
235
+
236
+ le .Lock ()
237
+ le .sleeping = false
238
+ // We drain the interrupt channel
239
+ // because we might get 2 leadership declarations messages
240
+ // while sleeping, but we would only read 1 of them in the select block above
241
+ le .drainInterruptChannel ()
242
+ le .Unlock ()
243
+ }
244
+
245
+ func (le * leaderElectionSvcImpl ) run () {
246
+ defer le .stopWG .Done ()
247
+ for ! le .shouldStop () {
248
+ if ! le .isLeaderExists () {
249
+ le .leaderElection ()
250
+ }
251
+ if le .shouldStop () {
252
+ return
253
+ }
254
+ if le .IsLeader () {
255
+ le .leader ()
256
+ } else {
257
+ le .follower ()
258
+ }
259
+ }
260
+ }
261
+
262
+ func (le * leaderElectionSvcImpl ) leaderElection () {
263
+ le .logger .Info (le .id , ": Entering" )
264
+ defer le .logger .Info (le .id , ": Exiting" )
265
+ le .propose ()
266
+ le .waitForInterrupt (leaderElectionDuration )
267
+ // If someone declared itself as a leader, give up
268
+ // on trying to become a leader too
269
+ if le .isLeaderExists () {
270
+ le .logger .Info (le .id , ": Some peer is already a leader" )
271
+ return
272
+ }
273
+ // Leader doesn't exist, let's see if there is a better candidate than us
274
+ // for being a leader
275
+ for _ , o := range le .proposals .ToArray () {
276
+ id := o .(string )
277
+ if id < le .id {
278
+ return
279
+ }
280
+ }
281
+ // If we got here, there is no one that proposed being a leader
282
+ // that's a better candidate than us.
283
+ le .beLeader ()
284
+ atomic .StoreInt32 (& le .leaderExists , int32 (1 ))
285
+ }
286
+
287
+ // propose sends a leadership proposal message to remote peers
288
+ func (le * leaderElectionSvcImpl ) propose () {
289
+ le .logger .Info (le .id , ": Entering" )
290
+ le .logger .Info (le .id , ": Exiting" )
291
+ leadershipProposal := le .adapter .CreateMessage (false )
292
+ le .adapter .Gossip (leadershipProposal )
293
+ }
294
+
295
+ func (le * leaderElectionSvcImpl ) follower () {
296
+ le .logger .Debug (le .id , ": Entering" )
297
+ defer le .logger .Debug (le .id , ": Exiting" )
298
+
299
+ le .proposals .Clear ()
300
+ atomic .StoreInt32 (& le .leaderExists , int32 (0 ))
301
+ select {
302
+ case <- time .After (leaderAliveThreshold ):
303
+ case <- le .stopChan :
304
+ le .stopChan <- struct {}{}
305
+ }
306
+ }
307
+
308
+ func (le * leaderElectionSvcImpl ) leader () {
309
+ leaderDeclaration := le .adapter .CreateMessage (true )
310
+ le .adapter .Gossip (leaderDeclaration )
311
+ le .waitForInterrupt (leadershipDeclarationInterval )
312
+ }
313
+
314
+ // waitForMembershipStabilization waits for membership view to stabilize
315
+ // or until a time limit expires, or until a peer declares itself as a leader
316
+ func (le * leaderElectionSvcImpl ) waitForMembershipStabilization (timeLimit time.Duration ) {
317
+ le .logger .Info (le .id , ": Entering" )
318
+ defer le .logger .Info (le .id , ": Exiting" )
319
+ endTime := time .Now ().Add (timeLimit )
320
+ viewSize := len (le .adapter .Peers ())
321
+ for ! le .shouldStop () {
322
+ time .Sleep (membershipSampleInterval )
323
+ newSize := len (le .adapter .Peers ())
324
+ if newSize == viewSize || time .Now ().After (endTime ) || le .isLeaderExists () {
325
+ return
326
+ }
327
+ viewSize = newSize
328
+ }
329
+ }
330
+
331
+ // drainInterruptChannel clears the interruptChannel
332
+ // if needed
333
+ func (le * leaderElectionSvcImpl ) drainInterruptChannel () {
334
+ if len (le .interruptChan ) == 1 {
335
+ <- le .interruptChan
336
+ }
337
+ }
338
+
339
+ // isAlive returns whether peer of given id is considered alive
340
+ func (le * leaderElectionSvcImpl ) isAlive (id string ) bool {
341
+ for _ , p := range le .adapter .Peers () {
342
+ if p .ID () == id {
343
+ return true
344
+ }
345
+ }
346
+ return false
347
+ }
348
+
349
+ func (le * leaderElectionSvcImpl ) isLeaderExists () bool {
350
+ return atomic .LoadInt32 (& le .leaderExists ) == int32 (1 )
351
+ }
352
+
353
+ // IsLeader returns whether this peer is a leader
354
+ func (le * leaderElectionSvcImpl ) IsLeader () bool {
355
+ isLeader := atomic .LoadInt32 (& le .isLeader ) == int32 (1 )
356
+ le .logger .Debug (le .id , ": Returning" , isLeader )
357
+ return isLeader
358
+ }
359
+
360
+ func (le * leaderElectionSvcImpl ) beLeader () {
361
+ le .logger .Info (le .id , ": Becoming a leader" )
362
+ atomic .StoreInt32 (& le .isLeader , int32 (1 ))
363
+ }
364
+
365
+ func (le * leaderElectionSvcImpl ) stopBeingLeader () {
366
+ le .logger .Info (le .id , "Stopped being a leader" )
367
+ atomic .StoreInt32 (& le .isLeader , int32 (0 ))
368
+ }
369
+
370
+ func (le * leaderElectionSvcImpl ) shouldStop () bool {
371
+ return atomic .LoadInt32 (& le .toDie ) == int32 (1 )
40
372
}
41
373
42
- // LeaderElectionService is the implementation of LeaderElectionService
43
- type leaderElectionServiceImpl struct {
44
- adapter LeaderElectionAdapter
374
+ // Stop stops the LeaderElectionService
375
+ func (le * leaderElectionSvcImpl ) Stop () {
376
+ le .logger .Info (le .id , ": Entering" )
377
+ defer le .logger .Info (le .id , ": Exiting" )
378
+ atomic .StoreInt32 (& le .toDie , int32 (1 ))
379
+ le .stopChan <- struct {}{}
380
+ le .stopWG .Wait ()
45
381
}
0 commit comments