Skip to content

Commit a6edbff

Browse files
[FAB-1846] Integration between deliver and election
Creation of leader election service during channel initialization in gossip and connecting between election callback to deliver service Change-Id: Ic2b8b1b5ebc770abcf4b935d39ce06087086b0c9 Signed-off-by: Gennady Laventman <[email protected]>
1 parent 6a3f766 commit a6edbff

11 files changed

+753
-63
lines changed

core/deliverservice/deliveryclient.go

+47-19
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"sync"
2222
"time"
2323

24+
"fmt"
25+
2426
"github.com/hyperledger/fabric/core/comm"
2527
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
2628
"github.com/hyperledger/fabric/protos/orderer"
@@ -39,10 +41,13 @@ func init() {
3941
// DeliverService used to communicate with orderers to obtain
4042
// new block and send the to the committer service
4143
type DeliverService interface {
42-
// JoinChain once peer joins the chain it should need to check whenever
43-
// it has been selected as a leader and open connection to the configured
44-
// ordering service endpoint
45-
JoinChain(chainID string, ledgerInfo blocksprovider.LedgerInfo) error
44+
// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
45+
// to channel peers.
46+
StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error
47+
48+
// StopDeliverForChannel dynamically stops delivery of new blocks from ordering service
49+
// to channel peers.
50+
StopDeliverForChannel(chainID string) error
4651

4752
// Stop terminates delivery service and closes the connection
4853
Stop()
@@ -131,27 +136,29 @@ func NewFactoryDeliverService(gossip blocksprovider.GossipServiceAdapter, factor
131136
}
132137
}
133138

134-
// JoinChain initialize the grpc stream for given chainID, creates blocks provider instance
135-
// to spawn in go routine to read new blocks starting from the position provided by ledger
139+
// StartDeliverForChannel starts blocks delivery for channel
140+
// initializes the grpc stream for given chainID, creates blocks provider instance
141+
// that spawns in go routine to read new blocks starting from the position provided by ledger
136142
// info instance.
137-
func (d *deliverServiceImpl) JoinChain(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
138-
isLeader := viper.GetBool("peer.gossip.orgLeader")
139-
140-
if isLeader {
143+
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
144+
d.lock.Lock()
145+
defer d.lock.Unlock()
146+
if d.stopping {
147+
errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID)
148+
logger.Errorf(errMsg)
149+
return errors.New(errMsg)
150+
}
151+
if _, exist := d.clients[chainID]; exist {
152+
errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
153+
logger.Errorf(errMsg)
154+
return errors.New(errMsg)
155+
} else {
141156
abc, err := d.clientsFactory.Create()
142157
if err != nil {
143158
logger.Errorf("Unable to initialize atomic broadcast, due to %s", err)
144159
return err
145160
}
146-
147-
d.lock.Lock()
148-
defer d.lock.Unlock()
149-
150-
if d.stopping {
151-
logger.Errorf("Delivery service is stopping cannot join a new channel")
152-
return errors.New("Delivery service is stopping cannot join a new channel")
153-
}
154-
161+
logger.Debug("This peer will pass blocks from orderer service to other peers")
155162
d.clients[chainID] = blocksprovider.NewBlocksProvider(chainID, abc, d.gossip)
156163

157164
if err := d.clients[chainID].RequestBlocks(ledgerInfo); err == nil {
@@ -162,6 +169,27 @@ func (d *deliverServiceImpl) JoinChain(chainID string, ledgerInfo blocksprovider
162169
return nil
163170
}
164171

172+
// StopDeliverForChannel stops blocks delivery for channel by stopping channel block provider
173+
func (d *deliverServiceImpl) StopDeliverForChannel(chainID string) error {
174+
d.lock.Lock()
175+
defer d.lock.Unlock()
176+
if d.stopping {
177+
errMsg := fmt.Sprintf("Delivery service is stopping, cannot stop delivery for channel %s", chainID)
178+
logger.Errorf(errMsg)
179+
return errors.New(errMsg)
180+
}
181+
if client, exist := d.clients[chainID]; exist {
182+
client.Stop()
183+
delete(d.clients, chainID)
184+
logger.Debug("This peer will stop pass blocks from orderer service to other peers")
185+
} else {
186+
errMsg := fmt.Sprintf("Delivery service - no block provider for %s found, can't stop delivery", chainID)
187+
logger.Errorf(errMsg)
188+
return errors.New(errMsg)
189+
}
190+
return nil
191+
}
192+
165193
// Stop all service and release resources
166194
func (d *deliverServiceImpl) Stop() {
167195
d.lock.Lock()

core/deliverservice/deliveryclient_test.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,17 @@ func TestNewDeliverService(t *testing.T) {
4949
}
5050

5151
service := NewFactoryDeliverService(gossipServiceAdapter, factory, nil)
52-
service.JoinChain("TEST_CHAINID", &mocks.MockLedgerInfo{0})
52+
assert.NilError(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}))
53+
54+
// Lets start deliver twice
55+
assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}), "can't start delivery")
56+
// Lets stop deliver that not started
57+
assert.Error(t, service.StopDeliverForChannel("TEST_CHAINID2"), "can't stop delivery")
5358

5459
// Let it try to simulate a few recv -> gossip rounds
60+
time.Sleep(time.Duration(10) * time.Millisecond)
61+
assert.NilError(t, service.StopDeliverForChannel("TEST_CHAINID"))
62+
5563
time.Sleep(time.Duration(10) * time.Millisecond)
5664
service.Stop()
5765

@@ -61,4 +69,7 @@ func TestNewDeliverService(t *testing.T) {
6169
assert.Equal(t, atomic.LoadInt32(&blocksDeliverer.RecvCnt), atomic.LoadInt32(&gossipServiceAdapter.AddPayloadsCnt))
6270
assert.Equal(t, atomic.LoadInt32(&blocksDeliverer.RecvCnt), atomic.LoadInt32(&gossipServiceAdapter.GossipCallsCnt))
6371

72+
assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}), "Delivery service is stopping")
73+
assert.Error(t, service.StopDeliverForChannel("TEST_CHAINID"), "Delivery service is stopping")
74+
6475
}

core/peer/peer_test.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,15 @@ import (
4040
type mockDeliveryClient struct {
4141
}
4242

43-
// JoinChain once peer joins the chain it should need to check whenever
44-
// it has been selected as a leader and open connection to the configured
45-
// ordering service endpoint
46-
func (*mockDeliveryClient) JoinChain(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
43+
// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
44+
// to channel peers.
45+
func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
46+
return nil
47+
}
48+
49+
// StopDeliverForChannel dynamically stops delivery of new blocks from ordering service
50+
// to channel peers.
51+
func (ds *mockDeliveryClient) StopDeliverForChannel(chainID string) error {
4752
return nil
4853
}
4954

core/scc/cscc/configure_test.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,15 @@ import (
4646
type mockDeliveryClient struct {
4747
}
4848

49-
// JoinChain once peer joins the chain it should need to check whenever
50-
// it has been selected as a leader and open connection to the configured
51-
// ordering service endpoint
52-
func (*mockDeliveryClient) JoinChain(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
49+
// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
50+
// to channel peers.
51+
func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
52+
return nil
53+
}
54+
55+
// StopDeliverForChannel dynamically stops delivery of new blocks from ordering service
56+
// to channel peers.
57+
func (ds *mockDeliveryClient) StopDeliverForChannel(chainID string) error {
5358
return nil
5459
}
5560

gossip/election/adapter.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/hyperledger/fabric/gossip/common"
2525
"github.com/hyperledger/fabric/gossip/discovery"
26+
"github.com/hyperledger/fabric/gossip/util"
2627
proto "github.com/hyperledger/fabric/protos/gossip"
2728
"github.com/op/go-logging"
2829
)
@@ -44,7 +45,7 @@ func (mi *msgImpl) IsDeclaration() bool {
4445
}
4546

4647
type peerImpl struct {
47-
member *discovery.NetworkMember
48+
member discovery.NetworkMember
4849
}
4950

5051
func (pi *peerImpl) ID() peerID {
@@ -66,8 +67,8 @@ type gossip interface {
6667
}
6768

6869
type adapterImpl struct {
69-
gossip gossip
70-
self *discovery.NetworkMember
70+
gossip gossip
71+
selfPKIid common.PKIidType
7172

7273
incTime uint64
7374
seqNum uint64
@@ -81,17 +82,17 @@ type adapterImpl struct {
8182
}
8283

8384
// NewAdapter creates new leader election adapter
84-
func NewAdapter(gossip gossip, self *discovery.NetworkMember, channel common.ChainID) LeaderElectionAdapter {
85+
func NewAdapter(gossip gossip, pkiid common.PKIidType, channel common.ChainID) LeaderElectionAdapter {
8586
return &adapterImpl{
86-
gossip: gossip,
87-
self: self,
87+
gossip: gossip,
88+
selfPKIid: pkiid,
8889

8990
incTime: uint64(time.Now().UnixNano()),
9091
seqNum: uint64(0),
9192

9293
channel: channel,
9394

94-
logger: logging.MustGetLogger("LeaderElectionAdapter"),
95+
logger: util.GetLogger(util.LoggingElectionModule, ""),
9596

9697
doneCh: make(chan struct{}),
9798
stopOnce: &sync.Once{},
@@ -134,7 +135,7 @@ func (ai *adapterImpl) CreateMessage(isDeclaration bool) Msg {
134135
seqNum := ai.seqNum
135136

136137
leadershipMsg := &proto.LeadershipMessage{
137-
PkiID: ai.self.PKIid,
138+
PkiID: ai.selfPKIid,
138139
IsDeclaration: isDeclaration,
139140
Timestamp: &proto.PeerTime{
140141
IncNumber: ai.incTime,
@@ -156,7 +157,7 @@ func (ai *adapterImpl) Peers() []Peer {
156157

157158
var res []Peer
158159
for _, peer := range peers {
159-
res = append(res, &peerImpl{&peer})
160+
res = append(res, &peerImpl{peer})
160161
}
161162

162163
return res

gossip/election/adapter_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestNewAdapter(t *testing.T) {
4141
peersCluster := newClusterOfPeers("0")
4242
peersCluster.addPeer("peer0", mockGossip)
4343

44-
NewAdapter(mockGossip, selfNetworkMember, []byte("channel0"))
44+
NewAdapter(mockGossip, selfNetworkMember.PKIid, []byte("channel0"))
4545
}
4646

4747
func TestAdapterImpl_CreateMessage(t *testing.T) {
@@ -52,7 +52,7 @@ func TestAdapterImpl_CreateMessage(t *testing.T) {
5252
}
5353
mockGossip := newGossip("peer0", selfNetworkMember)
5454

55-
adapter := NewAdapter(mockGossip, selfNetworkMember, []byte("channel0"))
55+
adapter := NewAdapter(mockGossip, selfNetworkMember.PKIid, []byte("channel0"))
5656
msg := adapter.CreateMessage(true)
5757

5858
if !msg.(*msgImpl).msg.IsLeadershipMsg() {
@@ -142,15 +142,15 @@ func TestAdapterImpl_Gossip(t *testing.T) {
142142
case msg := <-channels[fmt.Sprintf("Peer%d", 1)]:
143143
if !msg.IsDeclaration() {
144144
t.Error("Msg should be declaration")
145-
} else if !bytes.Equal(msg.SenderID(), sender.self.PKIid) {
145+
} else if !bytes.Equal(msg.SenderID(), sender.selfPKIid) {
146146
t.Error("Msg Sender is wrong")
147147
} else {
148148
totalMsg++
149149
}
150150
case msg := <-channels[fmt.Sprintf("Peer%d", 2)]:
151151
if !msg.IsDeclaration() {
152152
t.Error("Msg should be declaration")
153-
} else if !bytes.Equal(msg.SenderID(), sender.self.PKIid) {
153+
} else if !bytes.Equal(msg.SenderID(), sender.selfPKIid) {
154154
t.Error("Msg Sender is wrong")
155155
} else {
156156
totalMsg++
@@ -308,7 +308,7 @@ func createCluster(peers ...int) (*clusterOfPeers, map[string]*adapterImpl) {
308308
}
309309

310310
mockGossip := newGossip(peerEndpoint, peerMember)
311-
adapter := NewAdapter(mockGossip, peerMember, []byte("channel0"))
311+
adapter := NewAdapter(mockGossip, peerMember.PKIid, []byte("channel0"))
312312
adapters[peerEndpoint] = adapter.(*adapterImpl)
313313
cluster.addPeer(peerEndpoint, mockGossip)
314314
}

gossip/election/election.go

+13-13
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,8 @@ func (le *leaderElectionSvcImpl) start() {
183183
}
184184

185185
func (le *leaderElectionSvcImpl) handleMessages() {
186-
le.logger.Info(le.id, ": Entering")
187-
defer le.logger.Info(le.id, ": Exiting")
186+
le.logger.Debug(le.id, ": Entering")
187+
defer le.logger.Debug(le.id, ": Exiting")
188188
defer le.stopWG.Done()
189189
msgChan := le.adapter.Accept()
190190
for {
@@ -270,14 +270,14 @@ func (le *leaderElectionSvcImpl) run() {
270270
}
271271

272272
func (le *leaderElectionSvcImpl) leaderElection() {
273-
le.logger.Info(le.id, ": Entering")
274-
defer le.logger.Info(le.id, ": Exiting")
273+
le.logger.Debug(le.id, ": Entering")
274+
defer le.logger.Debug(le.id, ": Exiting")
275275
le.propose()
276276
le.waitForInterrupt(leaderElectionDuration)
277277
// If someone declared itself as a leader, give up
278278
// on trying to become a leader too
279279
if le.isLeaderExists() {
280-
le.logger.Info(le.id, ": Some peer is already a leader")
280+
le.logger.Debug(le.id, ": Some peer is already a leader")
281281
return
282282
}
283283
// Leader doesn't exist, let's see if there is a better candidate than us
@@ -296,8 +296,8 @@ func (le *leaderElectionSvcImpl) leaderElection() {
296296

297297
// propose sends a leadership proposal message to remote peers
298298
func (le *leaderElectionSvcImpl) propose() {
299-
le.logger.Info(le.id, ": Entering")
300-
le.logger.Info(le.id, ": Exiting")
299+
le.logger.Debug(le.id, ": Entering")
300+
le.logger.Debug(le.id, ": Exiting")
301301
leadershipProposal := le.adapter.CreateMessage(false)
302302
le.adapter.Gossip(leadershipProposal)
303303
}
@@ -324,8 +324,8 @@ func (le *leaderElectionSvcImpl) leader() {
324324
// waitForMembershipStabilization waits for membership view to stabilize
325325
// or until a time limit expires, or until a peer declares itself as a leader
326326
func (le *leaderElectionSvcImpl) waitForMembershipStabilization(timeLimit time.Duration) {
327-
le.logger.Info(le.id, ": Entering")
328-
defer le.logger.Info(le.id, ": Exiting")
327+
le.logger.Debug(le.id, ": Entering")
328+
defer le.logger.Debug(le.id, ": Exiting, peers found", len(le.adapter.Peers()))
329329
endTime := time.Now().Add(timeLimit)
330330
viewSize := len(le.adapter.Peers())
331331
for !le.shouldStop() {
@@ -368,13 +368,13 @@ func (le *leaderElectionSvcImpl) IsLeader() bool {
368368
}
369369

370370
func (le *leaderElectionSvcImpl) beLeader() {
371-
le.logger.Info(le.id, ": Becoming a leader")
371+
le.logger.Debug(le.id, ": Becoming a leader")
372372
atomic.StoreInt32(&le.isLeader, int32(1))
373373
le.callback(true)
374374
}
375375

376376
func (le *leaderElectionSvcImpl) stopBeingLeader() {
377-
le.logger.Info(le.id, "Stopped being a leader")
377+
le.logger.Debug(le.id, "Stopped being a leader")
378378
atomic.StoreInt32(&le.isLeader, int32(0))
379379
le.callback(false)
380380
}
@@ -385,8 +385,8 @@ func (le *leaderElectionSvcImpl) shouldStop() bool {
385385

386386
// Stop stops the LeaderElectionService
387387
func (le *leaderElectionSvcImpl) Stop() {
388-
le.logger.Info(le.id, ": Entering")
389-
defer le.logger.Info(le.id, ": Exiting")
388+
le.logger.Debug(le.id, ": Entering")
389+
defer le.logger.Debug(le.id, ": Exiting")
390390
atomic.StoreInt32(&le.toDie, int32(1))
391391
le.stopChan <- struct{}{}
392392
le.stopWG.Wait()

gossip/gossip/gossip_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -531,8 +531,10 @@ func TestDissemination(t *testing.T) {
531531
leadershipChan, _ := peers[i-1].Accept(acceptLeadershp, false)
532532
go func(index int, ch <-chan *proto.GossipMessage) {
533533
defer wgLeadership.Done()
534-
<-ch
535-
receivedLeadershipMessages[index]++
534+
msg := <-ch
535+
if bytes.Equal(msg.Channel, common.ChainID("A")) {
536+
receivedLeadershipMessages[index]++
537+
}
536538
}(i-1, leadershipChan)
537539
}
538540

@@ -886,9 +888,6 @@ func createDataMsg(seqnum uint64, data []byte, hash string, channel common.Chain
886888

887889
func createLeadershipMsg(isDeclaration bool, channel common.ChainID, incTime uint64, seqNum uint64, endpoint string, pkiid []byte) *proto.GossipMessage {
888890

889-
metadata := []byte{}
890-
metadata = strconv.AppendBool(metadata, isDeclaration)
891-
892891
leadershipMsg := &proto.LeadershipMessage{
893892
IsDeclaration: isDeclaration,
894893
PkiID: pkiid,

0 commit comments

Comments
 (0)