Skip to content

Commit 23a7d50

Browse files
committed
[FAB-4078] StateInfo expiration membership awareness
The stateInfo message store is used in gossip to know the membership of the peers of the channel. Each entry in the message store is the latest StateInfo message a peer has sent either directly or indirectly. Peers send StateInfo messages in the following manner: - Upon reception of a block, a new StateInfo message is created and is propagated to adjacent peers. - Once in a while, a peer send StateInfo pull requests to adjacent peers and they send back the entire StateInfo message set stored in their memory that they have received from other peers (reconciliation). However - the StateInfo message store is an expirable message store - its messages are expired after a limited time. This causes the following problem: If no new blocks are generated in the channel, no new StateInfo messages are generated as a result, and eventually - he StateInfo messages of peers are cleaned up from memory. This causes many problems, namely that when a new peers joins a channel after no new blocks were created in that channel for a while - it will not get StateInfo messages from other peers and thus won't be able to pull blocks from them, unless it is configured to use leader election, and then it will elect itself as a leader and connect directly to the ordering service. Change-Id: Ifa38a714a448ae3708e9852384b5e4107380e947 Signed-off-by: Yacov Manevich <[email protected]>
1 parent 59e0b94 commit 23a7d50

File tree

8 files changed

+254
-114
lines changed

8 files changed

+254
-114
lines changed

gossip/discovery/discovery_impl.go

+3
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
137137

138138
// Lookup returns a network member, or nil if not found
139139
func (d *gossipDiscoveryImpl) Lookup(PKIID common.PKIidType) *NetworkMember {
140+
if bytes.Equal(PKIID, d.self.PKIid) {
141+
return &d.self
142+
}
140143
d.lock.RLock()
141144
defer d.lock.RUnlock()
142145
nm := d.id2Member[string(PKIID)]

gossip/gossip/channel/channel.go

+44-31
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,14 @@ import (
4040
// Config is a configuration item
4141
// of the channel store
4242
type Config struct {
43-
ID string
44-
PublishStateInfoInterval time.Duration
45-
MaxBlockCountToStore int
46-
PullPeerNum int
47-
PullInterval time.Duration
48-
RequestStateInfoInterval time.Duration
49-
43+
ID string
44+
PublishStateInfoInterval time.Duration
45+
MaxBlockCountToStore int
46+
PullPeerNum int
47+
PullInterval time.Duration
48+
RequestStateInfoInterval time.Duration
5049
BlockExpirationInterval time.Duration
51-
StateInfoExpirationInterval time.Duration
50+
StateInfoCacheSweepInterval time.Duration
5251
}
5352

5453
// GossipChannel defines an object that deals with all channel-related messages
@@ -188,13 +187,16 @@ func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.M
188187
gc.blocksPuller.Remove(seqNumFromMsg(m))
189188
})
190189

191-
gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoExpirationInterval)
190+
hashPeerExpiredInMembership := func(o interface{}) bool {
191+
pkiID := o.(*proto.SignedGossipMessage).GetStateInfo().PkiId
192+
return gc.Lookup(pkiID) == nil
193+
}
194+
gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoCacheSweepInterval, hashPeerExpiredInMembership)
192195

193196
ttl := election.GetMsgExpirationTimeout()
194-
noopFunc := func(m interface{}) {}
195197
pol := proto.NewGossipMessageComparator(0)
196198

197-
gc.leaderMsgStore = msgstore.NewMessageStoreExpirable(pol, noopFunc, ttl, nil, nil, nil)
199+
gc.leaderMsgStore = msgstore.NewMessageStoreExpirable(pol, msgstore.Noop, ttl, nil, nil, nil)
198200

199201
gc.ConfigureChannel(joinMsg)
200202

@@ -521,6 +523,13 @@ func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender c
521523
stateInf, ":", err, "sent from", sender)
522524
return
523525
}
526+
527+
if gc.Lookup(si.PkiId) == nil {
528+
// Skip StateInfo messages that belong to peers
529+
// that have been expired
530+
continue
531+
}
532+
524533
gc.stateInfoMsgStore.Add(stateInf)
525534
}
526535
}
@@ -646,30 +655,29 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.SignedGossipMessage) {
646655
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
647656
}
648657

649-
// NewStateInfoMessageStore returns a expirable MessageStore
650-
// ttl is time duration before msg expires and removed from store
651-
func NewStateInfoMessageStore(ttl time.Duration) msgstore.MessageStore {
652-
return NewStateInfoMessageStoreWithCallback(ttl, nil)
653-
}
654-
655-
// NewStateInfoMessageStoreWithCallback returns a exiprable MessageStore
656-
// Callback invoked once message expires and removed from store
657-
// ttl is time duration before msg expires
658-
func NewStateInfoMessageStoreWithCallback(ttl time.Duration, callback func(m interface{})) msgstore.MessageStore {
659-
pol := proto.NewGossipMessageComparator(0)
660-
noopTrigger := func(m interface{}) {}
661-
return msgstore.NewMessageStoreExpirable(pol, noopTrigger, ttl, nil, nil, callback)
662-
}
663-
664-
func newStateInfoCache(ttl time.Duration) *stateInfoCache {
658+
func newStateInfoCache(sweepInterval time.Duration, hasExpired func(interface{}) bool) *stateInfoCache {
665659
membershipStore := util.NewMembershipStore()
666-
callback := func(m interface{}) {
667-
membershipStore.Remove(m.(*proto.SignedGossipMessage).GetStateInfo().PkiId)
668-
}
660+
pol := proto.NewGossipMessageComparator(0)
669661
s := &stateInfoCache{
670662
MembershipStore: membershipStore,
671-
MessageStore: NewStateInfoMessageStoreWithCallback(ttl, callback),
663+
stopChan: make(chan struct{}),
672664
}
665+
invalidationTrigger := func(m interface{}) {
666+
pkiID := m.(*proto.SignedGossipMessage).GetStateInfo().PkiId
667+
membershipStore.Remove(pkiID)
668+
}
669+
s.MessageStore = msgstore.NewMessageStore(pol, invalidationTrigger)
670+
671+
go func() {
672+
for {
673+
select {
674+
case <-s.stopChan:
675+
return
676+
case <-time.After(sweepInterval):
677+
s.Purge(hasExpired)
678+
}
679+
}
680+
}()
673681
return s
674682
}
675683

@@ -679,6 +687,7 @@ func newStateInfoCache(ttl time.Duration) *stateInfoCache {
679687
type stateInfoCache struct {
680688
*util.MembershipStore
681689
msgstore.MessageStore
690+
stopChan chan struct{}
682691
}
683692

684693
// Add attempts to add the given message to the stateInfoCache,
@@ -693,6 +702,10 @@ func (cache *stateInfoCache) Add(msg *proto.SignedGossipMessage) bool {
693702
return added
694703
}
695704

705+
func (cache *stateInfoCache) Stop() {
706+
cache.stopChan <- struct{}{}
707+
}
708+
696709
// GenerateMAC returns a byte slice that is derived from the peer's PKI-ID
697710
// and a channel name
698711
func GenerateMAC(pkiID common.PKIidType, channelID common.ChainID) []byte {

gossip/gossip/channel/channel_test.go

+113-54
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ var conf = Config{
4747
PullInterval: time.Second,
4848
RequestStateInfoInterval: time.Millisecond * 100,
4949
BlockExpirationInterval: time.Second * 6,
50-
StateInfoExpirationInterval: time.Second * 6,
50+
StateInfoCacheSweepInterval: time.Second,
5151
}
5252

5353
func init() {
@@ -196,6 +196,10 @@ func (ga *gossipAdapterMock) GetMembership() []discovery.NetworkMember {
196196

197197
// Lookup returns a network member, or nil if not found
198198
func (ga *gossipAdapterMock) Lookup(PKIID common.PKIidType) *discovery.NetworkMember {
199+
// Ensure we have configured Lookup prior
200+
if !ga.wasMocked("Lookup") {
201+
return &discovery.NetworkMember{}
202+
}
199203
args := ga.Called(PKIID)
200204
if args.Get(0) == nil {
201205
return nil
@@ -205,14 +209,7 @@ func (ga *gossipAdapterMock) Lookup(PKIID common.PKIidType) *discovery.NetworkMe
205209

206210
func (ga *gossipAdapterMock) Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) {
207211
// Ensure we have configured Send prior
208-
foundSend := false
209-
for _, ec := range ga.ExpectedCalls {
210-
if ec.Method == "Send" {
211-
foundSend = true
212-
}
213-
214-
}
215-
if !foundSend {
212+
if !ga.wasMocked("Send") {
216213
return
217214
}
218215
ga.Called(msg, peers)
@@ -238,6 +235,18 @@ func (ga *gossipAdapterMock) GetIdentityByPKIID(pkiID common.PKIidType) api.Peer
238235
return api.PeerIdentityType(pkiID)
239236
}
240237

238+
func (ga *gossipAdapterMock) wasMocked(methodName string) bool {
239+
// The following On call is just to synchronize the ExpectedCalls
240+
// access with 'On' calls from the test goroutine
241+
ga.On("bla", mock.Anything)
242+
for _, ec := range ga.ExpectedCalls {
243+
if ec.Method == methodName {
244+
return true
245+
}
246+
}
247+
return false
248+
}
249+
241250
func configureAdapter(adapter *gossipAdapterMock, members ...discovery.NetworkMember) {
242251
adapter.On("GetConf").Return(conf)
243252
adapter.On("GetMembership").Return(members)
@@ -265,6 +274,98 @@ func TestBadInput(t *testing.T) {
265274
gc.HandleMessage(&receivedMsg{msg: createDataUpdateMsg(0), PKIID: pkiIDnilOrg})
266275
}
267276

277+
func TestMsgStoreNotExpire(t *testing.T) {
278+
t.Parallel()
279+
280+
cs := &cryptoService{}
281+
282+
pkiID1 := common.PKIidType("1")
283+
pkiID2 := common.PKIidType("2")
284+
pkiID3 := common.PKIidType("3")
285+
286+
peer1 := discovery.NetworkMember{PKIid: pkiID2, InternalEndpoint: "1", Endpoint: "1"}
287+
peer2 := discovery.NetworkMember{PKIid: pkiID2, InternalEndpoint: "2", Endpoint: "2"}
288+
peer3 := discovery.NetworkMember{PKIid: pkiID3, InternalEndpoint: "3", Endpoint: "3"}
289+
290+
jcm := &joinChanMsg{
291+
members2AnchorPeers: map[string][]api.AnchorPeer{
292+
string(orgInChannelA): {},
293+
},
294+
}
295+
296+
adapter := new(gossipAdapterMock)
297+
adapter.On("GetOrgOfPeer", pkiID1).Return(orgInChannelA)
298+
adapter.On("GetOrgOfPeer", pkiID2).Return(orgInChannelA)
299+
adapter.On("GetOrgOfPeer", pkiID3).Return(orgInChannelA)
300+
301+
adapter.On("ValidateStateInfoMessage", mock.Anything).Return(nil)
302+
adapter.On("GetMembership").Return([]discovery.NetworkMember{peer2, peer3})
303+
adapter.On("DeMultiplex", mock.Anything)
304+
adapter.On("Gossip", mock.Anything)
305+
adapter.On("GetConf").Return(conf)
306+
307+
gc := NewGossipChannel(pkiID1, orgInChannelA, cs, channelA, adapter, jcm)
308+
gc.UpdateStateInfo(createStateInfoMsg(1, pkiID1, channelA))
309+
// Receive StateInfo messages from other peers
310+
gc.HandleMessage(&receivedMsg{PKIID: pkiID2, msg: createStateInfoMsg(1, pkiID2, channelA)})
311+
gc.HandleMessage(&receivedMsg{PKIID: pkiID3, msg: createStateInfoMsg(1, pkiID3, channelA)})
312+
313+
simulateStateInfoRequest := func(pkiID []byte, outChan chan *proto.SignedGossipMessage) {
314+
sentMessages := make(chan *proto.GossipMessage, 1)
315+
// Ensure we respond to stateInfoSnapshot requests with valid MAC
316+
snapshotReq := &receivedMsg{
317+
PKIID: pkiID,
318+
msg: (&proto.GossipMessage{
319+
Tag: proto.GossipMessage_CHAN_OR_ORG,
320+
Content: &proto.GossipMessage_StateInfoPullReq{
321+
StateInfoPullReq: &proto.StateInfoPullRequest{
322+
Channel_MAC: GenerateMAC(pkiID, channelA),
323+
},
324+
},
325+
}).NoopSign(),
326+
}
327+
snapshotReq.On("Respond", mock.Anything).Run(func(args mock.Arguments) {
328+
sentMessages <- args.Get(0).(*proto.GossipMessage)
329+
})
330+
331+
go gc.HandleMessage(snapshotReq)
332+
select {
333+
case <-time.After(time.Second):
334+
t.Fatal("Haven't received a state info snapshot on time")
335+
case msg := <-sentMessages:
336+
for _, el := range msg.GetStateSnapshot().Elements {
337+
sMsg, err := el.ToGossipMessage()
338+
assert.NoError(t, err)
339+
outChan <- sMsg
340+
}
341+
}
342+
}
343+
344+
c := make(chan *proto.SignedGossipMessage, 3)
345+
simulateStateInfoRequest(pkiID2, c)
346+
assert.Len(t, c, 3)
347+
348+
c = make(chan *proto.SignedGossipMessage, 3)
349+
simulateStateInfoRequest(pkiID3, c)
350+
assert.Len(t, c, 3)
351+
352+
// Now simulate an expiration of peer 3 in the membership view
353+
adapter.On("Lookup", pkiID1).Return(&peer1)
354+
adapter.On("Lookup", pkiID2).Return(&peer2)
355+
adapter.On("Lookup", pkiID3).Return(nil)
356+
// Ensure that we got at least 1 sweep before continuing
357+
// the test
358+
time.Sleep(conf.StateInfoCacheSweepInterval * 2)
359+
360+
c = make(chan *proto.SignedGossipMessage, 3)
361+
simulateStateInfoRequest(pkiID2, c)
362+
assert.Len(t, c, 2)
363+
364+
c = make(chan *proto.SignedGossipMessage, 3)
365+
simulateStateInfoRequest(pkiID3, c)
366+
assert.Len(t, c, 2)
367+
}
368+
268369
func TestChannelPeriodicalPublishStateInfo(t *testing.T) {
269370
t.Parallel()
270371
ledgerHeight := 5
@@ -290,6 +391,7 @@ func TestChannelPeriodicalPublishStateInfo(t *testing.T) {
290391
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
291392
stateInfoMsg := createStateInfoMsg(ledgerHeight, pkiIDInOrg1, channelA)
292393
gc.UpdateStateInfo(stateInfoMsg)
394+
defer gc.Stop()
293395

294396
var msg *proto.SignedGossipMessage
295397
select {
@@ -303,14 +405,6 @@ func TestChannelPeriodicalPublishStateInfo(t *testing.T) {
303405
height, err := strconv.ParseInt(string(md), 10, 64)
304406
assert.NoError(t, err, "ReceivedMetadata is invalid")
305407
assert.Equal(t, ledgerHeight, int(height), "Received different ledger height than expected")
306-
307-
// We will not update StateInfo in store, so store will become empty
308-
time.Sleep(conf.StateInfoExpirationInterval + time.Second)
309-
//Store is empty
310-
assert.Equal(t, 0, gc.(*gossipChannel).stateInfoMsgStore.MessageStore.Size(), "StateInfo MessageStore should be empty")
311-
assert.Equal(t, 0, gc.(*gossipChannel).stateInfoMsgStore.MembershipStore.Size(), "StateInfo MembershipStore should be empty")
312-
313-
gc.Stop()
314408
}
315409

316410
func TestChannelMsgStoreEviction(t *testing.T) {
@@ -733,7 +827,7 @@ func TestChannelAddToMessageStore(t *testing.T) {
733827
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
734828
}
735829

736-
func TestChannelAddToMessageStoreExpire(t *testing.T) {
830+
func TestChannelBlockExpiration(t *testing.T) {
737831
t.Parallel()
738832

739833
cs := &cryptoService{}
@@ -747,7 +841,6 @@ func TestChannelAddToMessageStoreExpire(t *testing.T) {
747841
adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) {
748842
demuxedMsgs <- arg.Get(0).(*proto.SignedGossipMessage)
749843
})
750-
751844
respondedChan := make(chan *proto.GossipMessage, 1)
752845
messageRelayer := func(arg mock.Arguments) {
753846
msg := arg.Get(0).(*proto.GossipMessage)
@@ -1061,52 +1154,18 @@ func TestChannelStateInfoSnapshot(t *testing.T) {
10611154
// Ensure we don't crash if we got a stateInfoMessage from a peer that its org isn't known
10621155
invalidStateInfoSnapshot = stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, common.PKIidType("unknown"), channelA))
10631156
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: invalidStateInfoSnapshot})
1064-
// Lets expire msg in store
1065-
time.Sleep(gc.(*gossipChannel).GetConf().StateInfoExpirationInterval + time.Second)
1066-
1067-
// Lets check is state info store can't add expired msg but appear as empty to outside world
1068-
gc.HandleMessage(stateInfoMsg)
1069-
assert.Empty(t, gc.GetPeers())
1070-
// Lets see if snapshot now empty, after message in store expired
1071-
go gc.HandleMessage(snapshotReq)
1072-
select {
1073-
case <-time.After(time.Second):
1074-
t.Fatal("Haven't received a state info snapshot on time")
1075-
case msg := <-sentMessages:
1076-
elements := msg.GetStateSnapshot().Elements
1077-
assert.Len(t, elements, 0, "StateInfo snapshot should contain zero messages")
1078-
}
1079-
1080-
// Lets make sure msg removed from store
1081-
time.Sleep(gc.(*gossipChannel).GetConf().StateInfoExpirationInterval + time.Second)
1082-
1083-
// Lets check is state info store add just expired msg
1084-
gc.HandleMessage(stateInfoMsg)
1085-
assert.NotEmpty(t, gc.GetPeers())
1086-
// Lets see if snapshot is not empty now, after message was added back to store
1087-
go gc.HandleMessage(snapshotReq)
1088-
select {
1089-
case <-time.After(time.Second):
1090-
t.Fatal("Haven't received a state info snapshot on time")
1091-
case msg := <-sentMessages:
1092-
elements := msg.GetStateSnapshot().Elements
1093-
assert.Len(t, elements, 1)
1094-
sMsg, err := elements[0].ToGossipMessage()
1095-
assert.NoError(t, err)
1096-
assert.Equal(t, []byte("4"), sMsg.GetStateInfo().Metadata)
1097-
}
10981157
}
10991158

11001159
func TestInterOrgExternalEndpointDisclosure(t *testing.T) {
11011160
t.Parallel()
1102-
11031161
cs := &cryptoService{}
11041162
adapter := new(gossipAdapterMock)
11051163
pkiID1 := common.PKIidType("withExternalEndpoint")
11061164
pkiID2 := common.PKIidType("noExternalEndpoint")
11071165
pkiID3 := common.PKIidType("pkiIDinOrg2")
11081166
adapter.On("Lookup", pkiID1).Return(&discovery.NetworkMember{Endpoint: "localhost:5000"})
11091167
adapter.On("Lookup", pkiID2).Return(&discovery.NetworkMember{})
1168+
adapter.On("Lookup", pkiID3).Return(&discovery.NetworkMember{})
11101169
adapter.On("GetOrgOfPeer", pkiID1).Return(orgInChannelA)
11111170
adapter.On("GetOrgOfPeer", pkiID2).Return(orgInChannelA)
11121171
adapter.On("GetOrgOfPeer", pkiID3).Return(api.OrgIdentityType("ORG2"))

gossip/gossip/chanstate.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (ga *gossipAdapterImpl) GetConf() channel.Config {
133133
PullPeerNum: ga.conf.PullPeerNum,
134134
RequestStateInfoInterval: ga.conf.RequestStateInfoInterval,
135135
BlockExpirationInterval: ga.conf.PullInterval * 100,
136-
StateInfoExpirationInterval: ga.conf.PublishStateInfoInterval * 100,
136+
StateInfoCacheSweepInterval: ga.conf.PullInterval * 5,
137137
}
138138
}
139139

0 commit comments

Comments
 (0)