Skip to content

Commit 05f811f

Browse files
committed
[FAB-3444] Gossip: pulled blocks aren't evicted
In Gossip, the component that handles pulling messages of blocks is the pullMediator. A bug was discovered: blocks may enter the pull mediator's internal map but are never evicted. This is because the eviction logic only worked for blocks that were manually added; blocks that were consumed via update messages were never removed by the dedicated callback (which only handled manually added blocks). I fixed the bug by filtering the blocks in incoming data update messages with the help of the message store, so that blocks that are old don't enter the pull mediator. This effectively makes the set of blocks in the pull mediator a subset of the blocks in the message store (which has a fixed upper bound). Blocks that are new cause eviction of old blocks from the pull mediator via the callback in channel.go line 185. Change-Id: Idfcaeff68f941cc5ac7e437968ec8fa4d4a8fe31 Signed-off-by: Yacov Manevich <[email protected]>
1 parent 3a2a717 commit 05f811f

File tree

6 files changed

+152
-46
lines changed

6 files changed

+152
-46
lines changed

gossip/gossip/certstore_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ func createObjects(updateFactory func(uint64) proto.ReceivedMessage, msgCons pro
384384
memberSvc.On("GetMembership").Return([]discovery.NetworkMember{{PKIid: []byte("bla bla"), Endpoint: "localhost:5611"}})
385385

386386
var certStore *certStore
387-
adapter := pull.PullAdapter{
387+
adapter := &pull.PullAdapter{
388388
Sndr: sender,
389389
MsgCons: func(msg *proto.SignedGossipMessage) {
390390
certStore.idMapper.Put(msg.GetPeerIdentity().PkiId, msg.GetPeerIdentity().Cert)

gossip/gossip/channel/channel.go

+17-16
Original file line numberDiff line numberDiff line change
@@ -282,25 +282,13 @@ func (gc *gossipChannel) createBlockPuller() pull.Mediator {
282282
}
283283
return fmt.Sprintf("%d", dataMsg.Payload.SeqNum)
284284
}
285-
blockConsumer := func(msg *proto.SignedGossipMessage) {
286-
dataMsg := msg.GetDataMsg()
287-
if dataMsg == nil || dataMsg.Payload == nil {
288-
gc.logger.Warning("Invalid DataMessage:", dataMsg)
289-
return
290-
}
291-
added := gc.blockMsgStore.Add(msg)
292-
// if we can't add the message to the msgStore,
293-
// no point in disseminating it to others...
294-
if !added {
295-
return
296-
}
297-
gc.DeMultiplex(msg)
298-
}
299-
adapter := pull.PullAdapter{
285+
adapter := &pull.PullAdapter{
300286
Sndr: gc,
301287
MemSvc: gc.memFilter,
302288
IdExtractor: seqNumFromMsg,
303-
MsgCons: blockConsumer,
289+
MsgCons: func(msg *proto.SignedGossipMessage) {
290+
gc.DeMultiplex(msg)
291+
},
304292
}
305293
return pull.NewPullMediator(conf, adapter)
306294
}
@@ -446,6 +434,10 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
446434
return
447435
}
448436
if m.IsDataUpdate() {
437+
// Iterate over the envelopes, and filter out blocks
438+
// that we already have in the blockMsgStore, or blocks that
439+
// are too far in the past.
440+
filteredEnvelopes := []*proto.Envelope{}
449441
for _, item := range m.GetDataUpdate().Data {
450442
gMsg, err := item.ToGossipMessage()
451443
if err != nil {
@@ -459,7 +451,16 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
459451
if !gc.verifyBlock(gMsg.GossipMessage, msg.GetConnectionInfo().ID) {
460452
return
461453
}
454+
added := gc.blockMsgStore.Add(gMsg)
455+
if !added {
456+
// If this block doesn't need to be added, it means it either already
457+
// exists in memory or that it is too far in the past
458+
continue
459+
}
460+
filteredEnvelopes = append(filteredEnvelopes, item)
462461
}
462+
// Replace the update message with just the blocks that should be processed
463+
m.GetDataUpdate().Data = filteredEnvelopes
463464
}
464465
gc.blocksPuller.HandleMessage(msg)
465466
}

gossip/gossip/channel/channel_test.go

+122-11
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,103 @@ func TestChannelPeriodicalPublishStateInfo(t *testing.T) {
313313
gc.Stop()
314314
}
315315

316+
func TestChannelMsgStoreEviction(t *testing.T) {
317+
t.Parallel()
318+
// Scenario: Create 4 phases in which the pull mediator of the channel would receive blocks
319+
// via pull.
320+
// The total amount of blocks should be restricted by the capacity of the message store.
321+
// After the pull phases end, we ensure that only the latest blocks are preserved in the pull
322+
// mediator, and the old blocks were evicted.
323+
// We test this by sending a hello message to the pull mediator and inspecting the digest message
324+
// returned as a response.
325+
326+
cs := &cryptoService{}
327+
cs.On("VerifyBlock", mock.Anything).Return(nil)
328+
adapter := new(gossipAdapterMock)
329+
configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1})
330+
adapter.On("Gossip", mock.Anything)
331+
adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) {
332+
})
333+
334+
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
335+
defer gc.Stop()
336+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(100, pkiIDInOrg1, channelA)})
337+
338+
var wg sync.WaitGroup
339+
340+
msgsPerPhase := uint64(50)
341+
lastPullPhase := make(chan uint64, msgsPerPhase)
342+
totalPhases := uint64(4)
343+
phaseNum := uint64(0)
344+
wg.Add(int(totalPhases))
345+
346+
adapter.On("Send", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
347+
msg := args.Get(0).(*proto.SignedGossipMessage)
348+
// Ignore all other messages sent like StateInfo messages
349+
if !msg.IsPullMsg() {
350+
return
351+
}
352+
// Stop the pull when we reach the final phase
353+
if atomic.LoadUint64(&phaseNum) == totalPhases && msg.IsHelloMsg() {
354+
return
355+
}
356+
357+
start := atomic.LoadUint64(&phaseNum) * msgsPerPhase
358+
end := start + msgsPerPhase
359+
if msg.IsHelloMsg() {
360+
// Advance phase
361+
atomic.AddUint64(&phaseNum, uint64(1))
362+
}
363+
364+
// Create and execute the current phase of pull
365+
currSeq := sequence(start, end)
366+
pullPhase := simulatePullPhase(gc, t, &wg, func(envelope *proto.Envelope) {}, currSeq...)
367+
pullPhase(args)
368+
369+
// If we finished the last phase, save the sequence to be used later for inspection
370+
if msg.IsDataReq() && atomic.LoadUint64(&phaseNum) == totalPhases {
371+
for _, seq := range currSeq {
372+
lastPullPhase <- seq
373+
}
374+
close(lastPullPhase)
375+
}
376+
})
377+
// Wait for all pull phases to end
378+
wg.Wait()
379+
380+
msgSentFromPullMediator := make(chan *proto.GossipMessage, 1)
381+
382+
helloMsg := createHelloMsg(pkiIDInOrg1)
383+
helloMsg.On("Respond", mock.Anything).Run(func(arg mock.Arguments) {
384+
msg := arg.Get(0).(*proto.GossipMessage)
385+
if !msg.IsDigestMsg() {
386+
return
387+
}
388+
msgSentFromPullMediator <- msg
389+
})
390+
gc.HandleMessage(helloMsg)
391+
select {
392+
case msg := <-msgSentFromPullMediator:
393+
// This is just to check that we responded with a digest on time.
394+
// Put message back into the channel for further inspection
395+
msgSentFromPullMediator <- msg
396+
case <-time.After(time.Second * 5):
397+
t.Fatal("Didn't reply with a digest on time")
398+
}
399+
// Only 1 digest sent
400+
assert.Len(t, msgSentFromPullMediator, 1)
401+
msg := <-msgSentFromPullMediator
402+
// It's a digest and not anything else, like an update
403+
assert.True(t, msg.IsDigestMsg())
404+
assert.Len(t, msg.GetDataDig().Digests, adapter.GetConf().MaxBlockCountToStore+1)
405+
// Check that the last sequences are kept.
406+
// Since we checked the length, it proves that the old blocks were discarded, since we had much more
407+
// total blocks overall than our capacity
408+
for seq := range lastPullPhase {
409+
assert.Contains(t, msg.GetDataDig().Digests, fmt.Sprintf("%d", seq))
410+
}
411+
}
412+
316413
func TestChannelPull(t *testing.T) {
317414
t.Parallel()
318415
cs := &cryptoService{}
@@ -334,7 +431,7 @@ func TestChannelPull(t *testing.T) {
334431
go gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(100, pkiIDInOrg1, channelA)})
335432

336433
var wg sync.WaitGroup
337-
pullPhase := simulatePullPhase(gc, t, &wg, func(envelope *proto.Envelope) {})
434+
pullPhase := simulatePullPhase(gc, t, &wg, func(envelope *proto.Envelope) {}, 10, 11)
338435
adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase)
339436

340437
wg.Wait()
@@ -696,7 +793,7 @@ func TestChannelPulledBadBlocks(t *testing.T) {
696793
env.Payload = sMsg.NoopSign().Payload
697794
}
698795

699-
pullPhase1 := simulatePullPhase(gc, t, &wg, changeChan)
796+
pullPhase1 := simulatePullPhase(gc, t, &wg, changeChan, 10, 11)
700797
adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase1)
701798
adapter.On("DeMultiplex", mock.Anything)
702799
wg.Wait()
@@ -718,7 +815,7 @@ func TestChannelPulledBadBlocks(t *testing.T) {
718815
noop := func(env *proto.Envelope) {
719816

720817
}
721-
pullPhase2 := simulatePullPhase(gc, t, &wg2, noop)
818+
pullPhase2 := simulatePullPhase(gc, t, &wg2, noop, 10, 11)
722819
adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase2)
723820
wg2.Wait()
724821
assert.Equal(t, 0, gc.(*gossipChannel).blockMsgStore.Size())
@@ -740,7 +837,7 @@ func TestChannelPulledBadBlocks(t *testing.T) {
740837
sMsg.GossipMessage.GetDataMsg().Payload = nil
741838
env.Payload = sMsg.NoopSign().Payload
742839
}
743-
pullPhase3 := simulatePullPhase(gc, t, &wg3, emptyBlock)
840+
pullPhase3 := simulatePullPhase(gc, t, &wg3, emptyBlock, 10, 11)
744841
adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase3)
745842
wg3.Wait()
746843
assert.Equal(t, 0, gc.(*gossipChannel).blockMsgStore.Size())
@@ -763,7 +860,7 @@ func TestChannelPulledBadBlocks(t *testing.T) {
763860
sMsg.Content = createHelloMsg(pkiIDInOrg1).GetGossipMessage().Content
764861
env.Payload = sMsg.NoopSign().Payload
765862
}
766-
pullPhase4 := simulatePullPhase(gc, t, &wg4, nonBlockMsg)
863+
pullPhase4 := simulatePullPhase(gc, t, &wg4, nonBlockMsg, 10, 11)
767864
adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase4)
768865
wg4.Wait()
769866
assert.Equal(t, 0, gc.(*gossipChannel).blockMsgStore.Size())
@@ -1253,19 +1350,23 @@ func TestOnDemandGossip(t *testing.T) {
12531350
}
12541351
}
12551352

1256-
func createDataUpdateMsg(nonce uint64) *proto.SignedGossipMessage {
1257-
return (&proto.GossipMessage{
1353+
func createDataUpdateMsg(nonce uint64, seqs ...uint64) *proto.SignedGossipMessage {
1354+
msg := &proto.GossipMessage{
12581355
Nonce: 0,
12591356
Channel: []byte(channelA),
12601357
Tag: proto.GossipMessage_CHAN_AND_ORG,
12611358
Content: &proto.GossipMessage_DataUpdate{
12621359
DataUpdate: &proto.DataUpdate{
12631360
MsgType: proto.PullMsgType_BLOCK_MSG,
12641361
Nonce: nonce,
1265-
Data: []*proto.Envelope{createDataMsg(10, channelA).Envelope, createDataMsg(11, channelA).Envelope},
1362+
Data: []*proto.Envelope{},
12661363
},
12671364
},
1268-
}).NoopSign()
1365+
}
1366+
for _, seq := range seqs {
1367+
msg.GetDataUpdate().Data = append(msg.GetDataUpdate().Data, createDataMsg(seq, channelA).Envelope)
1368+
}
1369+
return (msg).NoopSign()
12691370
}
12701371

12711372
func createHelloMsg(PKIID common.PKIidType) *receivedMsg {
@@ -1348,7 +1449,7 @@ func createDataMsg(seqnum uint64, channel common.ChainID) *proto.SignedGossipMes
13481449
}).NoopSign()
13491450
}
13501451

1351-
func simulatePullPhase(gc GossipChannel, t *testing.T, wg *sync.WaitGroup, mutator msgMutator) func(args mock.Arguments) {
1452+
func simulatePullPhase(gc GossipChannel, t *testing.T, wg *sync.WaitGroup, mutator msgMutator, seqs ...uint64) func(args mock.Arguments) {
13521453
var l sync.Mutex
13531454
var sentHello bool
13541455
var sentReq bool
@@ -1386,11 +1487,21 @@ func simulatePullPhase(gc GossipChannel, t *testing.T, wg *sync.WaitGroup, mutat
13861487
// from the imaginary peer that got the request
13871488
dataUpdateMsg := new(receivedMsg)
13881489
dataUpdateMsg.PKIID = pkiIDInOrg1
1389-
dataUpdateMsg.msg = createDataUpdateMsg(dataReq.Nonce)
1490+
dataUpdateMsg.msg = createDataUpdateMsg(dataReq.Nonce, seqs...)
13901491
mutator(dataUpdateMsg.msg.GetDataUpdate().Data[0])
13911492
gc.HandleMessage(dataUpdateMsg)
13921493
wg.Done()
13931494
}
13941495
}
1496+
}
1497+
1498+
func sequence(start uint64, end uint64) []uint64 {
1499+
sequence := make([]uint64, end-start+1)
1500+
i := 0
1501+
for n := start; n <= end; n++ {
1502+
sequence[i] = n
1503+
i++
1504+
}
1505+
return sequence
13951506

13961507
}

gossip/gossip/gossip_impl.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -965,7 +965,7 @@ func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator {
965965
}
966966
g.logger.Info("Learned of a new certificate:", idMsg.Cert)
967967
}
968-
adapter := pull.PullAdapter{
968+
adapter := &pull.PullAdapter{
969969
Sndr: g.comm,
970970
MemSvc: g.disc,
971971
IdExtractor: pkiIDFromMsg,

gossip/gossip/pull/pullstore.go

+10-16
Original file line numberDiff line numberDiff line change
@@ -115,19 +115,16 @@ type Mediator interface {
115115
// pullMediatorImpl is an implementation of Mediator
116116
type pullMediatorImpl struct {
117117
sync.RWMutex
118-
Sender
118+
*PullAdapter
119119
msgType2Hook map[MsgType][]MessageHook
120-
idExtractor proto.IdentifierExtractor
121-
msgCons proto.MsgConsumer
122120
config Config
123121
logger *logging.Logger
124122
itemID2Msg map[string]*proto.SignedGossipMessage
125-
memBvc MembershipService
126123
engine *algo.PullEngine
127124
}
128125

129126
// NewPullMediator returns a new Mediator
130-
func NewPullMediator(config Config, adapter PullAdapter) Mediator {
127+
func NewPullMediator(config Config, adapter *PullAdapter) Mediator {
131128
digFilter := adapter.DigFilter
132129

133130
acceptAllFilter := func(_ proto.ReceivedMessage) func(string) bool {
@@ -141,14 +138,11 @@ func NewPullMediator(config Config, adapter PullAdapter) Mediator {
141138
}
142139

143140
p := &pullMediatorImpl{
144-
msgCons: adapter.MsgCons,
141+
PullAdapter: adapter,
145142
msgType2Hook: make(map[MsgType][]MessageHook),
146-
idExtractor: adapter.IdExtractor,
147143
config: config,
148144
logger: util.GetLogger(util.LoggingPullModule, config.ID),
149145
itemID2Msg: make(map[string]*proto.SignedGossipMessage),
150-
memBvc: adapter.MemSvc,
151-
Sender: adapter.Sndr,
152146
}
153147

154148
p.engine = algo.NewPullEngineWithFilter(p, config.PullInterval, digFilter.byContext())
@@ -196,8 +190,8 @@ func (p *pullMediatorImpl) HandleMessage(m proto.ReceivedMessage) {
196190
p.logger.Warning("Data update contains an invalid message:", err)
197191
return
198192
}
199-
p.msgCons(msg)
200-
itemIDs[i] = p.idExtractor(msg)
193+
p.MsgCons(msg)
194+
itemIDs[i] = p.IdExtractor(msg)
201195
items[i] = msg
202196
p.Lock()
203197
p.itemID2Msg[itemIDs[i]] = msg
@@ -228,7 +222,7 @@ func (p *pullMediatorImpl) RegisterMsgHook(pullMsgType MsgType, hook MessageHook
228222
func (p *pullMediatorImpl) Add(msg *proto.SignedGossipMessage) {
229223
p.Lock()
230224
defer p.Unlock()
231-
itemID := p.idExtractor(msg)
225+
itemID := p.IdExtractor(msg)
232226
p.itemID2Msg[itemID] = msg
233227
p.engine.Add(itemID)
234228
}
@@ -244,7 +238,7 @@ func (p *pullMediatorImpl) Remove(digest string) {
244238

245239
// SelectPeers returns a slice of peers which the engine will initiate the protocol with
246240
func (p *pullMediatorImpl) SelectPeers() []string {
247-
remotePeers := SelectEndpoints(p.config.PeerCountToSelect, p.memBvc.GetMembership())
241+
remotePeers := SelectEndpoints(p.config.PeerCountToSelect, p.MemSvc.GetMembership())
248242
endpoints := make([]string, len(remotePeers))
249243
for i, peer := range remotePeers {
250244
endpoints[i] = peer.Endpoint
@@ -269,7 +263,7 @@ func (p *pullMediatorImpl) Hello(dest string, nonce uint64) {
269263
}
270264

271265
p.logger.Debug("Sending hello to", dest)
272-
p.Send(helloMsg.NoopSign(), p.peersWithEndpoints(dest)...)
266+
p.Sndr.Send(helloMsg.NoopSign(), p.peersWithEndpoints(dest)...)
273267
}
274268

275269
// SendDigest sends a digest to a remote PullEngine.
@@ -307,7 +301,7 @@ func (p *pullMediatorImpl) SendReq(dest string, items []string, nonce uint64) {
307301
},
308302
}
309303
p.logger.Debug("Sending", req, "to", dest)
310-
p.Send(req.NoopSign(), p.peersWithEndpoints(dest)...)
304+
p.Sndr.Send(req.NoopSign(), p.peersWithEndpoints(dest)...)
311305
}
312306

313307
// SendRes sends an array of items to a remote PullEngine identified by a context.
@@ -339,7 +333,7 @@ func (p *pullMediatorImpl) SendRes(items []string, context interface{}, nonce ui
339333

340334
func (p *pullMediatorImpl) peersWithEndpoints(endpoints ...string) []*comm.RemotePeer {
341335
peers := []*comm.RemotePeer{}
342-
for _, member := range p.memBvc.GetMembership() {
336+
for _, member := range p.MemSvc.GetMembership() {
343337
for _, endpoint := range endpoints {
344338
if member.PreferredEndpoint() == endpoint {
345339
peers = append(peers, &comm.RemotePeer{Endpoint: member.PreferredEndpoint(), PKIID: member.PKIid})

gossip/gossip/pull/pullstore_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func createPullInstanceWithFilters(endpoint string, peer2PullInst map[string]*pu
143143
blockConsumer := func(msg *proto.SignedGossipMessage) {
144144
inst.items.Add(msg.GetDataMsg().Payload.SeqNum)
145145
}
146-
adapter := PullAdapter{
146+
adapter := &PullAdapter{
147147
Sndr: inst,
148148
MemSvc: inst,
149149
IdExtractor: seqNumFromMsg,

0 commit comments

Comments
 (0)