Skip to content

Commit 55d96b2

Browse files
committed
[FAB-3114] Gossip identity expiration
In gossip we have peer identities that are stored in-memory. This commit adds 2 timely checks that deletes the identity from memory and also closes connections to corresponding peers. - That the certificate (identity) didn't expire - If the identity hasn't been used for a long time Change-Id: I901b60e2a09c11202f305106991b7eadd5f7b29c Signed-off-by: Yacov Manevich <[email protected]>
1 parent 852997a commit 55d96b2

9 files changed

+333
-79
lines changed

gossip/gossip/certstore.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func newCertStore(puller pull.Mediator, idMapper identity.Mapper, selfIdentity a
6161
}
6262

6363
puller.Add(certStore.createIdentityMessage())
64-
puller.RegisterMsgHook(pull.ResponseMsgType, func(_ []string, msgs []*proto.SignedGossipMessage, _ proto.ReceivedMessage) {
64+
puller.RegisterMsgHook(pull.RequestMsgType, func(_ []string, msgs []*proto.SignedGossipMessage, _ proto.ReceivedMessage) {
6565
for _, msg := range msgs {
6666
pkiID := common.PKIidType(msg.GetPeerIdentity().PkiId)
6767
cert := api.PeerIdentityType(msg.GetPeerIdentity().Cert)
@@ -144,7 +144,11 @@ func (cs *certStore) createIdentityMessage() *proto.SignedGossipMessage {
144144
}
145145

146146
func (cs *certStore) listRevokedPeers(isSuspected api.PeerSuspector) []common.PKIidType {
147-
return cs.idMapper.ListRevokedPeers(isSuspected)
147+
revokedPeers := cs.idMapper.ListInvalidIdentities(isSuspected)
148+
for _, pkiID := range revokedPeers {
149+
cs.pull.Remove(string(pkiID))
150+
}
151+
return revokedPeers
148152
}
149153

150154
func (cs *certStore) stop() {

gossip/gossip/certstore_test.go

+161-39
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/hyperledger/fabric/gossip/api"
2525
"github.com/hyperledger/fabric/gossip/comm"
26+
"github.com/hyperledger/fabric/gossip/common"
2627
"github.com/hyperledger/fabric/gossip/discovery"
2728
"github.com/hyperledger/fabric/gossip/gossip/algo"
2829
"github.com/hyperledger/fabric/gossip/gossip/pull"
@@ -41,6 +42,12 @@ func init() {
4142
algo.SetResponseWaitTime(shortenedWaitTime)
4243
}
4344

45+
var (
46+
cs = &naiveCryptoService{
47+
revokedPkiIDS: make(map[string]struct{}),
48+
}
49+
)
50+
4451
type pullerMock struct {
4552
mock.Mock
4653
pull.Mediator
@@ -90,78 +97,135 @@ func TestCertStoreBadSignature(t *testing.T) {
9097
badSignature := func(nonce uint64) proto.ReceivedMessage {
9198
return createUpdateMessage(nonce, createBadlySignedUpdateMessage())
9299
}
93-
94-
testCertificateUpdate(t, badSignature, false)
100+
pm, cs, _ := createObjects(badSignature, nil)
101+
defer pm.Stop()
102+
testCertificateUpdate(t, false, cs)
95103
}
96104

97105
func TestCertStoreMismatchedIdentity(t *testing.T) {
98106
mismatchedIdentity := func(nonce uint64) proto.ReceivedMessage {
99107
return createUpdateMessage(nonce, createMismatchedUpdateMessage())
100108
}
101109

102-
testCertificateUpdate(t, mismatchedIdentity, false)
110+
pm, cs, _ := createObjects(mismatchedIdentity, nil)
111+
defer pm.Stop()
112+
testCertificateUpdate(t, false, cs)
103113
}
104114

105115
func TestCertStoreShouldSucceed(t *testing.T) {
106116
totallyFineIdentity := func(nonce uint64) proto.ReceivedMessage {
107117
return createUpdateMessage(nonce, createValidUpdateMessage())
108118
}
109119

110-
testCertificateUpdate(t, totallyFineIdentity, true)
120+
pm, cs, _ := createObjects(totallyFineIdentity, nil)
121+
defer pm.Stop()
122+
testCertificateUpdate(t, true, cs)
111123
}
112124

113-
func testCertificateUpdate(t *testing.T, updateFactory func(uint64) proto.ReceivedMessage, shouldSucceed bool) {
114-
config := pull.Config{
115-
MsgType: proto.PullMsgType_IDENTITY_MSG,
116-
PeerCountToSelect: 1,
117-
PullInterval: time.Millisecond * 500,
118-
Tag: proto.GossipMessage_EMPTY,
119-
Channel: nil,
120-
ID: "id1",
121-
}
122-
sender := &senderMock{}
123-
memberSvc := &membershipSvcMock{}
124-
memberSvc.On("GetMembership").Return([]discovery.NetworkMember{{PKIid: []byte("bla bla"), Endpoint: "localhost:5611"}})
125-
adapter := pull.PullAdapter{
126-
Sndr: sender,
127-
MemSvc: memberSvc,
128-
IdExtractor: func(msg *proto.SignedGossipMessage) string {
129-
return string(msg.GetPeerIdentity().PkiId)
130-
},
131-
MsgCons: func(msg *proto.SignedGossipMessage) {
125+
func TestCertExpiration(t *testing.T) {
126+
identityExpCheckInterval := identityExpirationCheckInterval
127+
defer func() {
128+
identityExpirationCheckInterval = identityExpCheckInterval
129+
cs.revokedPkiIDS = map[string]struct{}{}
130+
}()
132131

133-
},
132+
identityExpirationCheckInterval = time.Second
133+
134+
totallyFineIdentity := func(nonce uint64) proto.ReceivedMessage {
135+
return createUpdateMessage(nonce, createValidUpdateMessage())
134136
}
135-
pullMediator := pull.NewPullMediator(config, adapter)
136-
certStore := newCertStore(&pullerMock{
137-
Mediator: pullMediator,
138-
}, identity.NewIdentityMapper(&naiveCryptoService{}), api.PeerIdentityType("SELF"), &naiveCryptoService{})
139137

140-
defer pullMediator.Stop()
138+
askedForIdentity := make(chan struct{}, 1)
139+
140+
pm, cStore, sender := createObjects(totallyFineIdentity, func(message *proto.SignedGossipMessage) {
141+
askedForIdentity <- struct{}{}
142+
})
143+
defer pm.Stop()
144+
testCertificateUpdate(t, true, cStore)
145+
// Should have asked for an identity for the first time
146+
assert.Len(t, askedForIdentity, 1)
147+
// Drain channel
148+
<-askedForIdentity
149+
// Now it's 0
150+
assert.Len(t, askedForIdentity, 0)
141151

142-
wg := sync.WaitGroup{}
143-
wg.Add(1)
144152
sentHello := false
145-
sentDataReq := false
146153
l := sync.Mutex{}
147-
sender.On("Send", mock.Anything, mock.Anything).Run(func(arg mock.Arguments) {
154+
senderMock := mock.Mock{}
155+
senderMock.On("Send", mock.Anything, mock.Anything).Run(func(arg mock.Arguments) {
148156
msg := arg.Get(0).(*proto.SignedGossipMessage)
149157
l.Lock()
150158
defer l.Unlock()
151159

152160
if hello := msg.GetHello(); hello != nil && !sentHello {
153161
sentHello = true
154-
go certStore.handleMessage(createDigest(hello.Nonce))
162+
dig := &proto.GossipMessage{
163+
Tag: proto.GossipMessage_EMPTY,
164+
Content: &proto.GossipMessage_DataDig{
165+
DataDig: &proto.DataDigest{
166+
Nonce: hello.Nonce,
167+
MsgType: proto.PullMsgType_IDENTITY_MSG,
168+
Digests: []string{"B"},
169+
},
170+
},
171+
}
172+
go cStore.handleMessage(&sentMsg{msg: dig.NoopSign()})
155173
}
156174

157-
if dataReq := msg.GetDataReq(); dataReq != nil && !sentDataReq {
158-
sentDataReq = true
159-
certStore.handleMessage(updateFactory(dataReq.Nonce))
160-
wg.Done()
175+
if dataReq := msg.GetDataReq(); dataReq != nil {
176+
askedForIdentity <- struct{}{}
177+
}
178+
})
179+
sender.Mock = senderMock
180+
testCertificateUpdate(t, true, cStore)
181+
// Shouldn't have asked, because already got identity
182+
select {
183+
case <-time.After(time.Second * 3):
184+
case <-askedForIdentity:
185+
assert.Fail(t, "Shouldn't have asked for an identity, becase we already have it")
186+
}
187+
assert.Len(t, askedForIdentity, 0)
188+
// Revoke the identity
189+
cs.revoke(common.PKIidType("B"))
190+
cStore.listRevokedPeers(func(id api.PeerIdentityType) bool {
191+
return string(id) == "B"
192+
})
193+
sentHello = false
194+
l = sync.Mutex{}
195+
senderMock = mock.Mock{}
196+
senderMock.On("Send", mock.Anything, mock.Anything).Run(func(arg mock.Arguments) {
197+
msg := arg.Get(0).(*proto.SignedGossipMessage)
198+
l.Lock()
199+
defer l.Unlock()
200+
201+
if hello := msg.GetHello(); hello != nil && !sentHello {
202+
sentHello = true
203+
dig := &proto.GossipMessage{
204+
Tag: proto.GossipMessage_EMPTY,
205+
Content: &proto.GossipMessage_DataDig{
206+
DataDig: &proto.DataDigest{
207+
Nonce: hello.Nonce,
208+
MsgType: proto.PullMsgType_IDENTITY_MSG,
209+
Digests: []string{"B"},
210+
},
211+
},
212+
}
213+
go cStore.handleMessage(&sentMsg{msg: dig.NoopSign()})
214+
}
215+
216+
if dataReq := msg.GetDataReq(); dataReq != nil {
217+
askedForIdentity <- struct{}{}
161218
}
162219
})
163-
wg.Wait()
164220

221+
select {
222+
case <-time.After(time.Second * 5):
223+
assert.Fail(t, "Didn't ask for identity, but should have. Looks like identity hasn't expired")
224+
case <-askedForIdentity:
225+
}
226+
}
227+
228+
func testCertificateUpdate(t *testing.T, shouldSucceed bool, certStore *certStore) {
165229
hello := &sentMsg{
166230
msg: (&proto.GossipMessage{
167231
Channel: []byte(""),
@@ -302,3 +366,61 @@ func createDigest(nonce uint64) proto.ReceivedMessage {
302366
}
303367
return &sentMsg{msg: digest.NoopSign()}
304368
}
369+
370+
func createObjects(updateFactory func(uint64) proto.ReceivedMessage, msgCons proto.MsgConsumer) (pull.Mediator, *certStore, *senderMock) {
371+
if msgCons == nil {
372+
msgCons = func(_ *proto.SignedGossipMessage) {}
373+
}
374+
config := pull.Config{
375+
MsgType: proto.PullMsgType_IDENTITY_MSG,
376+
PeerCountToSelect: 1,
377+
PullInterval: time.Millisecond * 500,
378+
Tag: proto.GossipMessage_EMPTY,
379+
Channel: nil,
380+
ID: "id1",
381+
}
382+
sender := &senderMock{}
383+
memberSvc := &membershipSvcMock{}
384+
memberSvc.On("GetMembership").Return([]discovery.NetworkMember{{PKIid: []byte("bla bla"), Endpoint: "localhost:5611"}})
385+
386+
var certStore *certStore
387+
adapter := pull.PullAdapter{
388+
Sndr: sender,
389+
MsgCons: func(msg *proto.SignedGossipMessage) {
390+
certStore.idMapper.Put(msg.GetPeerIdentity().PkiId, msg.GetPeerIdentity().Cert)
391+
msgCons(msg)
392+
},
393+
IdExtractor: func(msg *proto.SignedGossipMessage) string {
394+
return string(msg.GetPeerIdentity().PkiId)
395+
},
396+
MemSvc: memberSvc,
397+
}
398+
pullMediator := pull.NewPullMediator(config, adapter)
399+
certStore = newCertStore(&pullerMock{
400+
Mediator: pullMediator,
401+
}, identity.NewIdentityMapper(cs), api.PeerIdentityType("SELF"), cs)
402+
403+
wg := sync.WaitGroup{}
404+
wg.Add(1)
405+
sentHello := false
406+
sentDataReq := false
407+
l := sync.Mutex{}
408+
sender.On("Send", mock.Anything, mock.Anything).Run(func(arg mock.Arguments) {
409+
msg := arg.Get(0).(*proto.SignedGossipMessage)
410+
l.Lock()
411+
defer l.Unlock()
412+
413+
if hello := msg.GetHello(); hello != nil && !sentHello {
414+
sentHello = true
415+
go certStore.handleMessage(createDigest(hello.Nonce))
416+
}
417+
418+
if dataReq := msg.GetDataReq(); dataReq != nil && !sentDataReq {
419+
sentDataReq = true
420+
certStore.handleMessage(updateFactory(dataReq.Nonce))
421+
wg.Done()
422+
}
423+
})
424+
wg.Wait()
425+
return pullMediator, certStore, sender
426+
}

gossip/gossip/channel/channel.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -179,10 +179,13 @@ func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.M
179179

180180
gc.blocksPuller = gc.createBlockPuller()
181181

182+
seqNumFromMsg := func(m interface{}) string {
183+
return fmt.Sprintf("%d", m.(*proto.SignedGossipMessage).GetDataMsg().Payload.SeqNum)
184+
}
182185
gc.blockMsgStore = msgstore.NewMessageStoreExpirable(comparator, func(m interface{}) {
183-
gc.blocksPuller.Remove(m.(*proto.SignedGossipMessage))
186+
gc.blocksPuller.Remove(seqNumFromMsg(m))
184187
}, gc.GetConf().BlockExpirationInterval, nil, nil, func(m interface{}) {
185-
gc.blocksPuller.Remove(m.(*proto.SignedGossipMessage))
188+
gc.blocksPuller.Remove(seqNumFromMsg(m))
186189
})
187190

188191
gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoExpirationInterval)

gossip/gossip/gossip_impl.go

+33
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ const (
4646
acceptChanSize = 100
4747
)
4848

49+
var (
50+
identityExpirationCheckInterval = time.Hour * 24
51+
identityInactivityCheckInterval = time.Minute * 10
52+
)
53+
4954
type channelRoutingFilterFactory func(channel.GossipChannel) filter.RoutingFilter
5055

5156
type gossipServiceImpl struct {
@@ -128,6 +133,7 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
128133
}
129134

130135
go g.start()
136+
go g.periodicalIdentityValidationAndExpiration()
131137

132138
return g
133139
}
@@ -187,6 +193,33 @@ func (g *gossipServiceImpl) SuspectPeers(isSuspected api.PeerSuspector) {
187193
}
188194
}
189195

196+
func (g *gossipServiceImpl) periodicalIdentityValidationAndExpiration() {
197+
// We check once every identityExpirationCheckInterval for identities that have been expired
198+
go g.periodicalIdentityValidation(func(identity api.PeerIdentityType) bool {
199+
// We need to validate every identity to check if it has been expired
200+
return true
201+
}, identityExpirationCheckInterval)
202+
203+
// We check once every identityInactivityCheckInterval for identities that have not been used for a long time
204+
go g.periodicalIdentityValidation(func(identity api.PeerIdentityType) bool {
205+
// We don't validate any identity, because we just want to know whether
206+
// it has not been used for a long time
207+
return false
208+
}, identityInactivityCheckInterval)
209+
}
210+
211+
func (g *gossipServiceImpl) periodicalIdentityValidation(suspectFunc api.PeerSuspector, interval time.Duration) {
212+
for {
213+
select {
214+
case s := <-g.toDieChan:
215+
g.toDieChan <- s
216+
return
217+
case <-time.After(interval):
218+
g.SuspectPeers(suspectFunc)
219+
}
220+
}
221+
}
222+
190223
func (g *gossipServiceImpl) learnAnchorPeers(orgOfAnchorPeers api.OrgIdentityType, anchorPeers []api.AnchorPeer) {
191224
for _, ap := range anchorPeers {
192225
if ap.Host == "" {

gossip/gossip/gossip_test.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func init() {
5555
discovery.SetReconnectInterval(aliveTimeInterval)
5656
testWG.Add(7)
5757
factory.InitFactories(nil)
58+
identityExpirationCheckInterval = time.Second
5859
}
5960

6061
var orgInChannelA = api.OrgIdentityType("ORG1")
@@ -984,11 +985,11 @@ func TestDisseminateAll2All(t *testing.T) {
984985
testWG.Done()
985986
}
986987

987-
func TestRevocation(t *testing.T) {
988+
func TestIdentityExpiration(t *testing.T) {
988989
t.Parallel()
989-
// Scenario: spawn 4 peers and revoke one of them.
990-
// The rest of the peers should not be able to communicate with
991-
// the revoked peer at all.
990+
// Scenario: spawn 4 peers and make the MessageCryptoService revoke one of them.
991+
// Eventually, the rest of the peers should not be able to communicate with
992+
// the revoked peer at all because its identity would seem to them as expired
992993

993994
portPrefix := 7000
994995
g1 := newGossipInstance(portPrefix, 0, 100)
@@ -1016,9 +1017,6 @@ func TestRevocation(t *testing.T) {
10161017
continue
10171018
}
10181019
p.(*gossipServiceImpl).mcs.(*naiveCryptoService).revoke(revokedPkiID)
1019-
p.SuspectPeers(func(_ api.PeerIdentityType) bool {
1020-
return true
1021-
})
10221020
}
10231021
// Ensure that no one talks to the peer that is revoked
10241022
ensureRevokedPeerIsIgnored := func() bool {

0 commit comments

Comments
 (0)