
Commit 6962ee3

[FAB-5313] Leader election yield if deliver unavailable
This commit makes the peer relinquish its leadership if it can't connect to the ordering service.

Change-Id: I5fe679d5f23e539828fea4a9398c7dd4d9fd0f93
Signed-off-by: yacovm <[email protected]>
1 parent 3a4b1f2 commit 6962ee3

9 files changed (+295 -15 lines)
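
Taken together, the change wires three pieces into a feedback loop: StartDeliverForChannel now accepts a finalizer callback that runs when block delivery terminates, the leader election service gains a Yield() method that temporarily relinquishes leadership, and onStatusChangeFactory passes a finalizer that yields. The effect is that a leader that cannot reach the ordering service steps down so another peer can try. A minimal, self-contained sketch of that control flow; the LeaderElector and deliverBlocks names below are hypothetical simplifications, not the Fabric types:

package main

import (
    "errors"
    "fmt"
    "sync/atomic"
    "time"
)

// LeaderElector is a hypothetical stand-in for the real
// LeaderElectionService; it models only the yield flag.
type LeaderElector struct {
    yielding int32
}

// Yield suspends leadership for a fixed window, mirroring the
// timer-based flag reset in the real implementation.
func (le *LeaderElector) Yield() {
    if !atomic.CompareAndSwapInt32(&le.yielding, 0, 1) {
        return // already yielding
    }
    fmt.Println("delivery died: relinquishing leadership")
    time.AfterFunc(time.Second, func() {
        atomic.StoreInt32(&le.yielding, 0)
        fmt.Println("yield window over: eligible for election again")
    })
}

// deliverBlocks stands in for BlocksProvider.DeliverBlocks: it blocks
// until delivery fails (here, immediately).
func deliverBlocks() error {
    return errors.New("ordering service unavailable")
}

// startDeliver mirrors StartDeliverForChannel: delivery runs in a
// goroutine, and the finalizer runs once delivery returns.
func startDeliver(finalizer func()) {
    go func() {
        _ = deliverBlocks()
        finalizer()
    }()
}

func main() {
    le := &LeaderElector{}
    // As in onStatusChangeFactory: the finalizer yields leadership.
    startDeliver(le.Yield)
    time.Sleep(2 * time.Second)
}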

core/deliverservice/deliveryclient.go

+14 -3

@@ -34,12 +34,20 @@ var (
 	reConnectBackoffThreshold = float64(time.Hour)
 )
 
+// SetReconnectTotalTimeThreshold sets the total time the delivery service
+// may spend in reconnection attempts until its retry logic gives up
+// and returns an error
+func SetReconnectTotalTimeThreshold(duration time.Duration) {
+	reConnectTotalTimeThreshold = duration
+}
+
 // DeliverService used to communicate with orderers to obtain
 // new blocks and send them to the committer service
 type DeliverService interface {
 	// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
 	// to channel peers.
-	StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error
+	// When the delivery finishes, the finalizer func is called
+	StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error
 
 	// StopDeliverForChannel dynamically stops delivery of new blocks from ordering service
 	// to channel peers.

@@ -117,7 +125,7 @@ func (d *deliverServiceImpl) validateConfiguration() error {
 // initializes the grpc stream for given chainID, creates blocks provider instance
 // that spawns in go routine to read new blocks starting from the position provided by ledger
 // info instance.
-func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
+func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
 	d.lock.Lock()
 	defer d.lock.Unlock()
 	if d.stopping {

@@ -133,7 +141,10 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b
 		client := d.newClient(chainID, ledgerInfo)
 		logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
 		d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
-		go d.blockProviders[chainID].DeliverBlocks()
+		go func() {
+			d.blockProviders[chainID].DeliverBlocks()
+			finalizer()
+		}()
 	}
 	return nil
 }
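
The newly exported SetReconnectTotalTimeThreshold bounds how long the retry loop runs before DeliverBlocks gives up, which in turn determines how quickly the finalizer (and hence the leadership yield) fires. A hedged usage sketch; the import path is inferred from the file's location in the tree, and the one-minute value is purely illustrative:

package main

import (
    "time"

    "github.com/hyperledger/fabric/core/deliverservice"
)

func main() {
    // Give up on reconnecting to orderers after one minute; once the
    // retry budget is exhausted, DeliverBlocks returns and the
    // finalizer passed to StartDeliverForChannel gets to run.
    deliverservice.SetReconnectTotalTimeThreshold(time.Minute)
}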

core/deliverservice/deliveryclient_test.go

+7 -7

@@ -113,10 +113,10 @@ func TestNewDeliverService(t *testing.T) {
 		ConnFactory: connFactory,
 	})
 	assert.NoError(t, err)
-	assert.NoError(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}))
+	assert.NoError(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}, func() {}))
 
 	// Lets start deliver twice
-	assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}), "can't start delivery")
+	assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}, func() {}), "can't start delivery")
 	// Lets stop deliver that not started
 	assert.Error(t, service.StopDeliverForChannel("TEST_CHAINID2"), "can't stop delivery")
 

@@ -130,7 +130,7 @@ func TestNewDeliverService(t *testing.T) {
 	assert.Equal(t, 0, connNumber)
 	assertBlockDissemination(0, gossipServiceAdapter.GossipBlockDisseminations, t)
 	assert.Equal(t, atomic.LoadInt32(&blocksDeliverer.RecvCnt), atomic.LoadInt32(&gossipServiceAdapter.AddPayloadsCnt))
-	assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}), "Delivery service is stopping")
+	assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}, func() {}), "Delivery service is stopping")
 	assert.Error(t, service.StopDeliverForChannel("TEST_CHAINID"), "Delivery service is stopping")
 }
 

@@ -157,7 +157,7 @@ func TestDeliverServiceRestart(t *testing.T) {
 	li := &mocks.MockLedgerInfo{Height: uint64(100)}
 	os.SetNextExpectedSeek(uint64(100))
 
-	err = service.StartDeliverForChannel("TEST_CHAINID", li)
+	err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
 	assert.NoError(t, err, "can't start delivery")
 	// Check that delivery client requests blocks in order
 	go os.SendBlock(uint64(100))

@@ -203,7 +203,7 @@ func TestDeliverServiceFailover(t *testing.T) {
 	os1.SetNextExpectedSeek(uint64(100))
 	os2.SetNextExpectedSeek(uint64(100))
 
-	err = service.StartDeliverForChannel("TEST_CHAINID", li)
+	err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
 	assert.NoError(t, err, "can't start delivery")
 	// We need to discover to which instance the client connected to
 	go os1.SendBlock(uint64(100))

@@ -278,7 +278,7 @@ func TestDeliverServiceServiceUnavailable(t *testing.T) {
 	os1.SetNextExpectedSeek(li.Height)
 	os2.SetNextExpectedSeek(li.Height)
 
-	err = service.StartDeliverForChannel("TEST_CHAINID", li)
+	err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
 	assert.NoError(t, err, "can't start delivery")
 
 	waitForConnectionToSomeOSN := func() (*mocks.Orderer, *mocks.Orderer) {

@@ -367,7 +367,7 @@ func TestDeliverServiceShutdown(t *testing.T) {
 
 	li := &mocks.MockLedgerInfo{Height: uint64(100)}
 	os.SetNextExpectedSeek(uint64(100))
-	err = service.StartDeliverForChannel("TEST_CHAINID", li)
+	err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
 	assert.NoError(t, err, "can't start delivery")
 
 	// Check that delivery service requests blocks in order

core/peer/peer_test.go

+1 -1

@@ -48,7 +48,7 @@ type mockDeliveryClient struct {
 
 // StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
 // to channel peers.
-func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
+func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, f func()) error {
 	return nil
 }
 

core/scc/cscc/configure_test.go

+1 -1

@@ -55,7 +55,7 @@ type mockDeliveryClient struct {
 
 // StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
 // to channel peers.
-func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
+func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, f func()) error {
 	return nil
 }
 

gossip/election/election.go

+55 -0

@@ -93,6 +93,10 @@ type LeaderElectionService interface {
 
 	// Stop stops the LeaderElectionService
 	Stop()
+
+	// Yield relinquishes the leadership until a new leader is elected,
+	// or a timeout expires
+	Yield()
 }
 
 type peerID []byte

@@ -150,10 +154,12 @@ type leaderElectionSvcImpl struct {
 	isLeader     int32
 	toDie        int32
 	leaderExists int32
+	yield        int32
 	sleeping     bool
 	adapter      LeaderElectionAdapter
 	logger       *logging.Logger
 	callback     leadershipCallback
+	yieldTimer   *time.Timer
 }
 
 func (le *leaderElectionSvcImpl) start() {

@@ -239,6 +245,11 @@ func (le *leaderElectionSvcImpl) run() {
 		if !le.isLeaderExists() {
 			le.leaderElection()
 		}
+		// If we are yielding and some leader has been elected,
+		// stop yielding
+		if le.isLeaderExists() && le.isYielding() {
+			le.stopYielding()
+		}
 		if le.shouldStop() {
 			return
 		}

@@ -253,14 +264,26 @@
 func (le *leaderElectionSvcImpl) leaderElection() {
 	le.logger.Debug(le.id, ": Entering")
 	defer le.logger.Debug(le.id, ": Exiting")
+	// If we're yielding to other peers, do not participate
+	// in leader election
+	if le.isYielding() {
+		return
+	}
+	// Propose ourselves as a leader
 	le.propose()
+	// Collect other proposals
 	le.waitForInterrupt(getLeaderElectionDuration())
 	// If someone declared itself as a leader, give up
 	// on trying to become a leader too
 	if le.isLeaderExists() {
 		le.logger.Debug(le.id, ": Some peer is already a leader")
 		return
 	}
+
+	if le.isYielding() {
+		le.logger.Debug(le.id, ": Aborting leader election because yielding")
+		return
+	}
 	// Leader doesn't exist, let's see if there is a better candidate than us
 	// for being a leader
 	for _, o := range le.proposals.ToArray() {

@@ -364,6 +387,38 @@ func (le *leaderElectionSvcImpl) shouldStop() bool {
 	return atomic.LoadInt32(&le.toDie) == int32(1)
 }
 
+func (le *leaderElectionSvcImpl) isYielding() bool {
+	return atomic.LoadInt32(&le.yield) == int32(1)
+}
+
+func (le *leaderElectionSvcImpl) stopYielding() {
+	le.logger.Debug("Stopped yielding")
+	le.Lock()
+	defer le.Unlock()
+	atomic.StoreInt32(&le.yield, int32(0))
+	le.yieldTimer.Stop()
+}
+
+// Yield relinquishes the leadership until a new leader is elected,
+// or a timeout expires
+func (le *leaderElectionSvcImpl) Yield() {
+	le.Lock()
+	defer le.Unlock()
+	if !le.IsLeader() || le.isYielding() {
+		return
+	}
+	// Turn on the yield flag
+	atomic.StoreInt32(&le.yield, int32(1))
+	// Stop being a leader
+	le.stopBeingLeader()
+	// Clear the leader exists flag since it could be that we are the leader
+	atomic.StoreInt32(&le.leaderExists, int32(0))
+	// Clear the yield flag in any case afterwards
+	le.yieldTimer = time.AfterFunc(getLeaderAliveThreshold()*6, func() {
+		atomic.StoreInt32(&le.yield, int32(0))
+	})
+}
+
 // Stop stops the LeaderElectionService
 func (le *leaderElectionSvcImpl) Stop() {
 	le.logger.Debug(le.id, ": Entering")
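
The yield mechanism boils down to an atomic flag paired with a cancellable timer: Yield() raises the flag and arms a reset six leader-alive thresholds out, the election loop declines to propose while the flag is up, and stopYielding() cancels the reset early once some other peer claims leadership. A self-contained sketch of just that flag-and-timer pattern; the yielder type is hypothetical, not the Fabric implementation:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// yielder models the flag-plus-timer pattern from election.go.
type yielder struct {
    sync.Mutex
    yield int32
    timer *time.Timer
}

func (y *yielder) isYielding() bool {
    return atomic.LoadInt32(&y.yield) == 1
}

// beginYield raises the flag and arms a reset timer, as Yield() does.
func (y *yielder) beginYield(window time.Duration) {
    y.Lock()
    defer y.Unlock()
    if y.isYielding() {
        return
    }
    atomic.StoreInt32(&y.yield, 1)
    y.timer = time.AfterFunc(window, func() {
        // No leader showed up in time: rejoin elections.
        atomic.StoreInt32(&y.yield, 0)
    })
}

// stopYield clears the flag early, as stopYielding() does when
// another peer claims leadership.
func (y *yielder) stopYield() {
    y.Lock()
    defer y.Unlock()
    atomic.StoreInt32(&y.yield, 0)
    y.timer.Stop()
}

func main() {
    y := &yielder{}
    y.beginYield(100 * time.Millisecond)
    fmt.Println("yielding:", y.isYielding()) // true
    y.stopYield()                            // a leader appeared
    fmt.Println("yielding:", y.isYielding()) // false
}

If the timer fires before any other leader appears, the flag simply clears and the peer rejoins the next election round, which is why the single-peer case (TestYieldSinglePeer below) regains its leadership on its own.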

gossip/election/election_test.go

+64 -0

@@ -308,6 +308,70 @@ func TestLeadershipTakeover(t *testing.T) {
 	assert.Equal(t, "p2", leaders[0])
 }
 
+func TestYield(t *testing.T) {
+	t.Parallel()
+	// Scenario: Peers spawn and a leader is elected.
+	// After a while, the leader yields.
+	// (Call yield twice to ensure only one callback is called)
+	// Expected outcome:
+	// (1) A new leader is elected
+	// (2) The old leader doesn't take back its leadership
+	peers := createPeers(0, 0, 1, 2, 3, 4, 5)
+	leaders := waitForLeaderElection(t, peers)
+	assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
+	assert.Equal(t, "p0", leaders[0])
+	peers[0].Yield()
+	// Ensure the callback was called with 'false'
+	assert.True(t, peers[0].isCallbackInvoked())
+	assert.False(t, peers[0].isLeaderFromCallback())
+	// Clear the callback invoked flag
+	peers[0].lock.Lock()
+	peers[0].callbackInvoked = false
+	peers[0].lock.Unlock()
+	// Yield again and ensure it isn't called again
+	peers[0].Yield()
+	assert.False(t, peers[0].isCallbackInvoked())
+
+	ensureP0isNotAleader := func() bool {
+		leaders := waitForLeaderElection(t, peers)
+		return len(leaders) == 1 && leaders[0] != "p0"
+	}
+	// A new leader is elected, and it is not p0
+	waitForBoolFunc(t, ensureP0isNotAleader, true)
+	time.Sleep(getLeaderAliveThreshold() * 2)
+	// After a while, p0 doesn't restore its leadership status
+	waitForBoolFunc(t, ensureP0isNotAleader, true)
+}
+
+func TestYieldSinglePeer(t *testing.T) {
+	t.Parallel()
+	// Scenario: spawn a single peer and have it yield.
+	// Ensure it recovers its leadership after a while.
+	peers := createPeers(0, 0)
+	waitForLeaderElection(t, peers)
+	peers[0].Yield()
+	assert.False(t, peers[0].IsLeader())
+	waitForLeaderElection(t, peers)
+}
+
+func TestYieldAllPeers(t *testing.T) {
+	t.Parallel()
+	// Scenario: spawn 2 peers and have them all yield after regaining leadership.
+	// Ensure the first peer is the leader in the end after both peers yield
+	peers := createPeers(0, 0, 1)
+	leaders := waitForLeaderElection(t, peers)
+	assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
+	assert.Equal(t, "p0", leaders[0])
+	peers[0].Yield()
+	leaders = waitForLeaderElection(t, peers)
+	assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
+	assert.Equal(t, "p1", leaders[0])
+	peers[1].Yield()
+	leaders = waitForLeaderElection(t, peers)
+	assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
+	assert.Equal(t, "p0", leaders[0])
+}
+
 func TestPartition(t *testing.T) {
 	t.Parallel()
 	// Scenario: peers spawn together, and then after a while a network partition occurs

gossip/service/gossip_service.go

+6 -2

@@ -196,7 +196,7 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe
 		g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, committer))
 	} else if isStaticOrgLeader {
 		logger.Debug("This peer is configured to connect to ordering service for blocks delivery, channel", chainID)
-		g.deliveryService.StartDeliverForChannel(chainID, committer)
+		g.deliveryService.StartDeliverForChannel(chainID, committer, func() {})
 	} else {
 		logger.Debug("This peer is not configured to connect to ordering service for blocks delivery, channel", chainID)
 	}

@@ -282,8 +282,12 @@ func (g *gossipServiceImpl) amIinChannel(myOrg string, config Config) bool {
 func (g *gossipServiceImpl) onStatusChangeFactory(chainID string, committer blocksprovider.LedgerInfo) func(bool) {
 	return func(isLeader bool) {
 		if isLeader {
+			yield := func() {
+				le := g.leaderElection[chainID]
+				le.Yield()
+			}
 			logger.Info("Elected as a leader, starting delivery service for channel", chainID)
-			if err := g.deliveryService.StartDeliverForChannel(chainID, committer); err != nil {
+			if err := g.deliveryService.StartDeliverForChannel(chainID, committer, yield); err != nil {
 				logger.Error("Delivery service is not able to start blocks delivery for chain, due to", err)
 			}
 		} else {

gossip/service/gossip_service_test.go

+1 -1

@@ -276,7 +276,7 @@ type mockDeliverService struct {
 	running map[string]bool
 }
 
-func (ds *mockDeliverService) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
+func (ds *mockDeliverService) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
 	ds.running[chainID] = true
 	return nil
 }
