Skip to content

Commit 9ff8fc4

Browse files
committed
[FAB-3744] Gossip: only pull from peers in the same org
Currently, the pull mechanism works with peers from any orgs that are eligible of being in the channel. This has the following problem: If in a certain channel there are 2 orgs: {A, B} and a peer from orgB initiates a pull with a peer from orgA and as a result, the peer from orgA sends the peer from orgB blocks with sequences [n... n+k] it has received from either the ordering service or from peers in its own org, it is not safe because a block i in [n.. n+k] can be a configuration block that evicts orgB from the channel, and as a result, orgB would receive blocks it isn't eligible of receiving. I fixed this by checking that the org is the same org as the peer and also added a test Change-Id: I348a22334a0751bb09a5f962ddfd08d516c12f30 Signed-off-by: Yacov Manevich <[email protected]>
1 parent e76aa71 commit 9ff8fc4

File tree

2 files changed

+101
-3
lines changed

2 files changed

+101
-3
lines changed

gossip/gossip/channel/channel.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ type membershipFilter struct {
149149
func (mf *membershipFilter) GetMembership() []discovery.NetworkMember {
150150
var members []discovery.NetworkMember
151151
for _, mem := range mf.adapter.GetMembership() {
152-
if mf.EligibleForChannel(mem) {
152+
if mf.eligibleForChannelAndSameOrg(mem) {
153153
members = append(members, mem)
154154
}
155155
}
@@ -252,6 +252,13 @@ func (gc *gossipChannel) requestStateInfo() {
252252
gc.Send(req, endpoints...)
253253
}
254254

255+
func (gc *gossipChannel) eligibleForChannelAndSameOrg(member discovery.NetworkMember) bool {
256+
sameOrg := func(networkMember discovery.NetworkMember) bool {
257+
return bytes.Equal(gc.GetOrgOfPeer(networkMember.PKIid), gc.selfOrg)
258+
}
259+
return filter.CombineRoutingFilters(gc.EligibleForChannel, sameOrg)(member)
260+
}
261+
255262
func (gc *gossipChannel) publishStateInfo() {
256263
if atomic.LoadInt32(&gc.shouldGossipStateInfo) == int32(0) {
257264
return
@@ -430,8 +437,8 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
430437
return
431438
}
432439
if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BLOCK_MSG {
433-
if !gc.EligibleForChannel(discovery.NetworkMember{PKIid: msg.GetConnectionInfo().ID}) {
434-
gc.logger.Warning(msg.GetConnectionInfo().ID, "isn't eligible for channel", string(gc.chainID))
440+
if !gc.eligibleForChannelAndSameOrg(discovery.NetworkMember{PKIid: msg.GetConnectionInfo().ID}) {
441+
gc.logger.Warning(msg.GetConnectionInfo().ID, "isn't eligible for pulling blocks of", string(gc.chainID))
435442
return
436443
}
437444
if m.IsDataUpdate() {

gossip/gossip/channel/channel_test.go

+91
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,97 @@ func TestChannelPull(t *testing.T) {
445445
}
446446
}
447447

448+
func TestChannelPullAccessControl(t *testing.T) {
449+
t.Parallel()
450+
// Scenario: We have 2 organizations in the channel: ORG1, ORG2
451+
// The "acting peer" is from ORG1 and peers "1", "2", "3" are from
452+
// the following organizations:
453+
// ORG1: "1"
454+
// ORG2: "2", "3"
455+
// We test 2 cases:
456+
// 1) We don't respond for Hello messages from peers in foreign organizations
457+
// 2) We don't select peers from foreign organizations when doing pull
458+
459+
cs := &cryptoService{}
460+
adapter := new(gossipAdapterMock)
461+
cs.Mock = mock.Mock{}
462+
cs.On("VerifyBlock", mock.Anything).Return(nil)
463+
464+
pkiID1 := common.PKIidType("1")
465+
pkiID2 := common.PKIidType("2")
466+
pkiID3 := common.PKIidType("3")
467+
468+
peer1 := discovery.NetworkMember{PKIid: pkiID1, InternalEndpoint: "1", Endpoint: "1"}
469+
peer2 := discovery.NetworkMember{PKIid: pkiID2, InternalEndpoint: "2", Endpoint: "2"}
470+
peer3 := discovery.NetworkMember{PKIid: pkiID3, InternalEndpoint: "3", Endpoint: "3"}
471+
472+
adapter.On("GetOrgOfPeer", pkiIDInOrg1).Return(api.OrgIdentityType("ORG1"))
473+
adapter.On("GetOrgOfPeer", pkiID1).Return(api.OrgIdentityType("ORG1"))
474+
adapter.On("GetOrgOfPeer", pkiID2).Return(api.OrgIdentityType("ORG2"))
475+
adapter.On("GetOrgOfPeer", pkiID3).Return(api.OrgIdentityType("ORG2"))
476+
477+
adapter.On("GetMembership").Return([]discovery.NetworkMember{peer1, peer2, peer3})
478+
adapter.On("DeMultiplex", mock.Anything)
479+
adapter.On("Gossip", mock.Anything)
480+
adapter.On("GetConf").Return(conf)
481+
482+
sentHello := int32(0)
483+
adapter.On("Send", mock.Anything, mock.Anything).Run(func(arg mock.Arguments) {
484+
msg := arg.Get(0).(*proto.SignedGossipMessage)
485+
if !msg.IsHelloMsg() {
486+
return
487+
}
488+
atomic.StoreInt32(&sentHello, int32(1))
489+
peerID := string(arg.Get(1).([]*comm.RemotePeer)[0].PKIID)
490+
assert.Equal(t, "1", peerID)
491+
assert.NotEqual(t, "2", peerID, "Sent hello to peer 2 but it's in a different org")
492+
assert.NotEqual(t, "3", peerID, "Sent hello to peer 3 but it's in a different org")
493+
})
494+
495+
jcm := &joinChanMsg{
496+
members2AnchorPeers: map[string][]api.AnchorPeer{
497+
"ORG1": {},
498+
"ORG2": {},
499+
},
500+
}
501+
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, jcm)
502+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(100, pkiIDInOrg1, channelA)})
503+
gc.HandleMessage(&receivedMsg{PKIID: pkiID1, msg: createStateInfoMsg(100, pkiID1, channelA)})
504+
gc.HandleMessage(&receivedMsg{PKIID: pkiID2, msg: createStateInfoMsg(100, pkiID2, channelA)})
505+
gc.HandleMessage(&receivedMsg{PKIID: pkiID3, msg: createStateInfoMsg(100, pkiID3, channelA)})
506+
507+
respondedChan := make(chan *proto.GossipMessage, 1)
508+
messageRelayer := func(arg mock.Arguments) {
509+
msg := arg.Get(0).(*proto.GossipMessage)
510+
respondedChan <- msg
511+
}
512+
513+
gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(5, channelA), PKIID: pkiIDInOrg1})
514+
515+
helloMsg := createHelloMsg(pkiID1)
516+
helloMsg.On("Respond", mock.Anything).Run(messageRelayer)
517+
go gc.HandleMessage(helloMsg)
518+
select {
519+
case <-respondedChan:
520+
case <-time.After(time.Second):
521+
assert.Fail(t, "Didn't reply to a hello within a timely manner")
522+
}
523+
524+
helloMsg = createHelloMsg(pkiID2)
525+
helloMsg.On("Respond", mock.Anything).Run(messageRelayer)
526+
go gc.HandleMessage(helloMsg)
527+
select {
528+
case <-respondedChan:
529+
assert.Fail(t, "Shouldn't have replied to a hello, because the peer is from a foreign org")
530+
case <-time.After(time.Second):
531+
}
532+
533+
// Sleep a bit to let the gossip channel send out its hello messages
534+
time.Sleep(time.Second * 3)
535+
// Make sure we sent at least 1 hello message, otherwise the test passed vacuously
536+
assert.Equal(t, int32(1), atomic.LoadInt32(&sentHello))
537+
}
538+
448539
func TestChannelPeerNotInChannel(t *testing.T) {
449540
t.Parallel()
450541

0 commit comments

Comments
 (0)