Skip to content

Commit b36a664

Browse files
committed
[FAB-2424] Enforce MSP channel validation in gossip
Before this commit, peers of an organization could publish assertions for having joined a channel, and it was enough for them to obtain blocks from peers in the same organization. This commit integrates gossip with the MSP method (VerifyByChannel) that consults the MSP policies whether a certain peer is indeed eligible for receiving blocks for the channel. I changed the gossip tests and the channel tests to mock the method, and also added a test in the channel test that checks that even when we query the membership of a channel, it also calls VerifyByChannel. I also had to make the MembershipStore thread safe for the channel Signed-off-by: Yacov Manevich <[email protected]> Change-Id: I0fccf98080d7e72d05e7e762244fe366e9f6e32a
1 parent 7134f9f commit b36a664

File tree

8 files changed

+241
-86
lines changed

8 files changed

+241
-86
lines changed

gossip/discovery/discovery_impl.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ type gossipDiscoveryImpl struct {
7575
deadLastTS map[string]*timestamp // H
7676
aliveLastTS map[string]*timestamp // V
7777
id2Member map[string]*NetworkMember // all known members
78-
aliveMembership util.MembershipStore
79-
deadMembership util.MembershipStore
78+
aliveMembership *util.MembershipStore
79+
deadMembership *util.MembershipStore
8080

8181
bootstrapPeers []string
8282

@@ -98,8 +98,8 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
9898
deadLastTS: make(map[string]*timestamp),
9999
aliveLastTS: make(map[string]*timestamp),
100100
id2Member: make(map[string]*NetworkMember),
101-
aliveMembership: make(util.MembershipStore, 0),
102-
deadMembership: make(util.MembershipStore, 0),
101+
aliveMembership: util.NewMembershipStore(),
102+
deadMembership: util.NewMembershipStore(),
103103
crypt: crypt,
104104
comm: comm,
105105
lock: &sync.RWMutex{},
@@ -208,7 +208,7 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {
208208

209209
d.lock.RLock()
210210

211-
n := len(d.aliveMembership)
211+
n := d.aliveMembership.Size()
212212
k := peerNum
213213
if k > n {
214214
k = n

gossip/gossip/channel/channel.go

+60-31
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ type GossipChannel interface {
6262
// IsOrgInChannel returns whether the given organization is in the channel
6363
IsOrgInChannel(membersOrg api.OrgIdentityType) bool
6464

65-
// IsSubscribed returns whether the given member published
66-
// its participation in the channel
67-
IsSubscribed(member discovery.NetworkMember) bool
65+
// EligibleForChannel returns whether the given member should get blocks
66+
// for this channel
67+
EligibleForChannel(member discovery.NetworkMember) bool
6868

6969
// HandleMessage processes a message sent by a remote peer
7070
HandleMessage(proto.ReceivedMessage)
@@ -106,7 +106,11 @@ type Adapter interface {
106106
OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType
107107

108108
// GetOrgOfPeer returns the organization ID of a given peer PKI-ID
109-
GetOrgOfPeer(common.PKIidType) api.OrgIdentityType
109+
GetOrgOfPeer(pkiID common.PKIidType) api.OrgIdentityType
110+
111+
// GetIdentityByPKIID returns an identity of a peer with a certain
112+
// pkiID, or nil if not found
113+
GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType
110114
}
111115

112116
type gossipChannel struct {
@@ -119,7 +123,7 @@ type gossipChannel struct {
119123
orgs []api.OrgIdentityType
120124
joinMsg api.JoinChannelMessage
121125
blockMsgStore msgstore.MessageStore
122-
stateInfoMsgStore msgstore.MessageStore
126+
stateInfoMsgStore *stateInfoCache
123127
leaderMsgStore msgstore.MessageStore
124128
chainID common.ChainID
125129
blocksPuller pull.Mediator
@@ -138,7 +142,7 @@ type membershipFilter struct {
138142
func (mf *membershipFilter) GetMembership() []discovery.NetworkMember {
139143
var members []discovery.NetworkMember
140144
for _, mem := range mf.adapter.GetMembership() {
141-
if mf.IsSubscribed(mem) {
145+
if mf.EligibleForChannel(mem) {
142146
members = append(members, mem)
143147
}
144148
}
@@ -166,7 +170,7 @@ func NewGossipChannel(mcs api.MessageCryptoService, chainID common.ChainID, adap
166170
gc.blocksPuller.Remove(m.(*proto.SignedGossipMessage))
167171
})
168172

169-
gc.stateInfoMsgStore = NewStateInfoMessageStore()
173+
gc.stateInfoMsgStore = newStateInfoCache()
170174
gc.blocksPuller = gc.createBlockPuller()
171175
gc.leaderMsgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {})
172176

@@ -203,30 +207,23 @@ func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) {
203207
func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
204208
members := []discovery.NetworkMember{}
205209

206-
pkiID2NetMember := make(map[string]discovery.NetworkMember)
207210
for _, member := range gc.GetMembership() {
208-
pkiID2NetMember[string(member.PKIid)] = member
209-
}
210-
211-
for _, o := range gc.stateInfoMsgStore.Get() {
212-
stateInf := o.(*proto.SignedGossipMessage).GetStateInfo()
213-
pkiID := stateInf.PkiID
214-
if member, exists := pkiID2NetMember[string(pkiID)]; !exists {
211+
if !gc.EligibleForChannel(member) {
215212
continue
216-
} else {
217-
member.Metadata = stateInf.Metadata
218-
members = append(members, member)
219213
}
214+
stateInf := gc.stateInfoMsgStore.MsgByID(member.PKIid)
215+
if stateInf == nil {
216+
continue
217+
}
218+
member.Metadata = stateInf.GetStateInfo().Metadata
219+
members = append(members, member)
220220
}
221221
return members
222222
}
223223

224224
func (gc *gossipChannel) requestStateInfo() {
225225
req := gc.createStateInfoRequest().NoopSign()
226-
endpoints := filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsSubscribed)
227-
if len(endpoints) == 0 {
228-
endpoints = filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan)
229-
}
226+
endpoints := filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan)
230227
gc.Send(req, endpoints...)
231228
}
232229

@@ -296,19 +293,20 @@ func (gc *gossipChannel) IsOrgInChannel(membersOrg api.OrgIdentityType) bool {
296293
return false
297294
}
298295

299-
// IsSubscribed returns whether the given member published
300-
// its participation in the channel
301-
func (gc *gossipChannel) IsSubscribed(member discovery.NetworkMember) bool {
296+
// EligibleForChannel returns whether the given member should get blocks
297+
// for this channel
298+
func (gc *gossipChannel) EligibleForChannel(member discovery.NetworkMember) bool {
302299
if !gc.IsMemberInChan(member) {
303300
return false
304301
}
305-
for _, o := range gc.stateInfoMsgStore.Get() {
306-
m, isMsg := o.(*proto.SignedGossipMessage)
307-
if isMsg && m.IsStateInfoMsg() && bytes.Equal(m.GetStateInfo().PkiID, member.PKIid) {
308-
return true
309-
}
302+
303+
identity := gc.GetIdentityByPKIID(member.PKIid)
304+
msg := gc.stateInfoMsgStore.MsgByID(member.PKIid)
305+
if msg == nil || identity == nil {
306+
return false
310307
}
311-
return false
308+
309+
return gc.mcs.VerifyByChannel(gc.chainID, identity, msg.Envelope.Signature, msg.Envelope.Payload) == nil
312310
}
313311

314312
// AddToMsgStore adds a given GossipMessage to the message store
@@ -412,6 +410,10 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
412410
return
413411
}
414412
if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BlockMessage {
413+
if !gc.EligibleForChannel(discovery.NetworkMember{PKIid: msg.GetPKIID()}) {
414+
gc.logger.Warning(msg.GetPKIID(), "isn't eligible for channel", gc.chainID)
415+
return
416+
}
415417
if m.IsDataUpdate() {
416418
for _, item := range m.GetDataUpdate().Data {
417419
gMsg, err := item.ToGossipMessage()
@@ -563,3 +565,30 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.SignedGossipMessage) {
563565
func NewStateInfoMessageStore() msgstore.MessageStore {
564566
return msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {})
565567
}
568+
569+
func newStateInfoCache() *stateInfoCache {
570+
return &stateInfoCache{
571+
MembershipStore: util.NewMembershipStore(),
572+
MessageStore: NewStateInfoMessageStore(),
573+
}
574+
}
575+
576+
// stateInfoCache is actually a messageStore
577+
// that also indexes messages that are added
578+
// so that they could be extracted later
579+
type stateInfoCache struct {
580+
*util.MembershipStore
581+
msgstore.MessageStore
582+
}
583+
584+
// Add attempts to add the given message to the stateInfoCache,
585+
// and if the message was added, also indexes it.
586+
// Message must be a StateInfo message.
587+
func (cache stateInfoCache) Add(msg *proto.SignedGossipMessage) bool {
588+
added := cache.MessageStore.Add(msg)
589+
pkiID := msg.GetStateInfo().PkiID
590+
if added {
591+
cache.MembershipStore.Put(pkiID, msg)
592+
}
593+
return added
594+
}

gossip/gossip/channel/channel_test.go

+71-11
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@ limitations under the License.
1717
package channel
1818

1919
import (
20+
"errors"
2021
"fmt"
2122
"strconv"
2223
"sync"
2324
"sync/atomic"
2425
"testing"
2526
"time"
2627

27-
"errors"
28-
2928
"github.com/hyperledger/fabric/gossip/api"
3029
"github.com/hyperledger/fabric/gossip/comm"
3130
"github.com/hyperledger/fabric/gossip/common"
@@ -57,11 +56,12 @@ func init() {
5756
var (
5857
// Organizations: {ORG1, ORG2}
5958
// Channel A: {ORG1}
60-
channelA = common.ChainID("A")
61-
orgInChannelA = api.OrgIdentityType("ORG1")
62-
orgNotInChannelA = api.OrgIdentityType("ORG2")
63-
pkiIDInOrg1 = common.PKIidType("pkiIDInOrg1")
64-
pkiIDinOrg2 = common.PKIidType("pkiIDinOrg2")
59+
channelA = common.ChainID("A")
60+
orgInChannelA = api.OrgIdentityType("ORG1")
61+
orgNotInChannelA = api.OrgIdentityType("ORG2")
62+
pkiIDInOrg1 = common.PKIidType("pkiIDInOrg1")
63+
pkiIDInOrg1ButNotEligible = common.PKIidType("pkiIDInOrg1ButNotEligible")
64+
pkiIDinOrg2 = common.PKIidType("pkiIDinOrg2")
6565
)
6666

6767
type joinChanMsg struct {
@@ -88,15 +88,20 @@ func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer {
8888
}
8989

9090
type cryptoService struct {
91+
mocked bool
9192
mock.Mock
9293
}
9394

9495
func (cs *cryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType {
9596
panic("Should not be called in this test")
9697
}
9798

98-
func (cs *cryptoService) VerifyByChannel(_ common.ChainID, _ api.PeerIdentityType, _, _ []byte) error {
99-
panic("Should not be called in this test")
99+
func (cs *cryptoService) VerifyByChannel(channel common.ChainID, identity api.PeerIdentityType, _, _ []byte) error {
100+
if !cs.mocked {
101+
return nil
102+
}
103+
args := cs.Called(identity)
104+
return args.Get(0).(error)
100105
}
101106

102107
func (cs *cryptoService) VerifyBlock(chainID common.ChainID, signedBlock api.SignedBlock) error {
@@ -199,10 +204,15 @@ func (ga *gossipAdapterMock) GetOrgOfPeer(PKIIID common.PKIidType) api.OrgIdenti
199204
return args.Get(0).(api.OrgIdentityType)
200205
}
201206

207+
func (ga *gossipAdapterMock) GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType {
208+
return api.PeerIdentityType(pkiID)
209+
}
210+
202211
func configureAdapter(adapter *gossipAdapterMock, members ...discovery.NetworkMember) {
203212
adapter.On("GetConf").Return(conf)
204213
adapter.On("GetMembership").Return(members)
205214
adapter.On("GetOrgOfPeer", pkiIDInOrg1).Return(orgInChannelA)
215+
adapter.On("GetOrgOfPeer", pkiIDInOrg1ButNotEligible).Return(orgInChannelA)
206216
adapter.On("GetOrgOfPeer", pkiIDinOrg2).Return(orgNotInChannelA)
207217
adapter.On("GetOrgOfPeer", mock.Anything).Return(api.OrgIdentityType(nil))
208218
}
@@ -312,6 +322,8 @@ func TestChannelPeerNotInChannel(t *testing.T) {
312322
gossipMessagesSentFromChannel <- msg
313323
}
314324
// First, ensure it does that for pull messages from peers that are in the channel
325+
// Let the peer first publish it is in the channel
326+
gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1})
315327
helloMsg := createHelloMsg(pkiIDInOrg1)
316328
helloMsg.On("Respond", mock.Anything).Run(messageRelayer)
317329
gc.HandleMessage(helloMsg)
@@ -330,6 +342,23 @@ func TestChannelPeerNotInChannel(t *testing.T) {
330342
case <-time.After(time.Second * 1):
331343
}
332344

345+
// Now for a more advanced scenario- the peer claims to be in the right org, and also claims to be in the channel
346+
// but the MSP declares it is not eligible for the channel
347+
// pkiIDInOrg1ButNotEligible
348+
gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1ButNotEligible, channelA), PKIID: pkiIDInOrg1ButNotEligible})
349+
cs.On("VerifyByChannel", mock.Anything).Return(errors.New("Not eligible"))
350+
cs.mocked = true
351+
helloMsg = createHelloMsg(pkiIDInOrg1ButNotEligible)
352+
helloMsg.On("Respond", mock.Anything).Run(messageRelayer)
353+
gc.HandleMessage(helloMsg)
354+
select {
355+
case <-gossipMessagesSentFromChannel:
356+
t.Fatal("Responded with digest, but shouldn't have since peer is not eligible for the channel")
357+
case <-time.After(time.Second * 1):
358+
}
359+
360+
cs.Mock = mock.Mock{}
361+
333362
// Ensure we respond to a valid StateInfoRequest
334363
req := gc.(*gossipChannel).createStateInfoRequest()
335364
validReceivedMsg := &receivedMsg{
@@ -404,7 +433,7 @@ func TestChannelIsSubscribed(t *testing.T) {
404433
adapter.On("Send", mock.Anything, mock.Anything)
405434
adapter.On("DeMultiplex", mock.Anything)
406435
gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1})
407-
assert.True(t, gc.IsSubscribed(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
436+
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
408437
}
409438

410439
func TestChannelAddToMessageStore(t *testing.T) {
@@ -456,7 +485,7 @@ func TestChannelAddToMessageStore(t *testing.T) {
456485
}
457486

458487
gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1})
459-
assert.True(t, gc.IsSubscribed(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
488+
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
460489
}
461490

462491
func TestChannelBadBlocks(t *testing.T) {
@@ -782,7 +811,38 @@ func TestChannelReconfigureChannel(t *testing.T) {
782811
t.Fatal("Responded with digest, but shouldn't have since peer is in ORG2 and its not in the channel")
783812
case <-time.After(time.Second * 1):
784813
}
814+
}
815+
816+
func TestChannelGetPeers(t *testing.T) {
817+
t.Parallel()
785818

819+
// Scenario: We have a peer in an org, and the peer is notified that several peers
820+
// exist, and some of them:
821+
// (1) Join its channel, and are eligible for receiving blocks.
822+
// (2) Join its channel, but are not eligible for receiving blocks (MSP doesn't allow this).
823+
// (3) Say they join its channel, but are actually from an org that is not in the channel.
824+
// The GetPeers query should only return peers that belong to the first group.
825+
cs := &cryptoService{}
826+
adapter := new(gossipAdapterMock)
827+
adapter.On("Gossip", mock.Anything)
828+
adapter.On("Send", mock.Anything, mock.Anything)
829+
adapter.On("DeMultiplex", mock.Anything)
830+
members := []discovery.NetworkMember{
831+
{PKIid: pkiIDInOrg1},
832+
{PKIid: pkiIDInOrg1ButNotEligible},
833+
{PKIid: pkiIDinOrg2},
834+
}
835+
configureAdapter(adapter, members...)
836+
gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{})
837+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)})
838+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDinOrg2, channelA)})
839+
assert.Len(t, gc.GetPeers(), 1)
840+
assert.Equal(t, pkiIDInOrg1, gc.GetPeers()[0].PKIid)
841+
842+
gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1ButNotEligible, channelA), PKIID: pkiIDInOrg1ButNotEligible})
843+
cs.On("VerifyByChannel", mock.Anything).Return(errors.New("Not eligible"))
844+
cs.mocked = true
845+
assert.Len(t, gc.GetPeers(), 0)
786846
}
787847

788848
func createDataUpdateMsg(nonce uint64) *proto.SignedGossipMessage {

gossip/gossip/chanstate.go

+10
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,13 @@ func (ga *gossipAdapterImpl) OrgByPeerIdentity(identity api.PeerIdentityType) ap
113113
func (ga *gossipAdapterImpl) GetOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType {
114114
return ga.gossipServiceImpl.getOrgOfPeer(PKIID)
115115
}
116+
117+
// GetIdentityByPKIID returns an identity of a peer with a certain
118+
// pkiID, or nil if not found
119+
func (ga *gossipAdapterImpl) GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType {
120+
identity, err := ga.idMapper.Get(pkiID)
121+
if err != nil {
122+
return nil
123+
}
124+
return identity
125+
}

gossip/gossip/gossip_impl.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) {
433433
// Gossip blocks
434434
blocks, msgs = partitionMessages(isABlock, msgs)
435435
g.gossipInChan(blocks, func(gc channel.GossipChannel) filter.RoutingFilter {
436-
return filter.CombineRoutingFilters(gc.IsSubscribed, gc.IsMemberInChan, g.isInMyorg)
436+
return filter.CombineRoutingFilters(gc.EligibleForChannel, gc.IsMemberInChan, g.isInMyorg)
437437
})
438438

439439
// Gossip StateInfo messages
@@ -445,7 +445,7 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) {
445445
// Gossip Leadership messages
446446
leadershipMsgs, msgs = partitionMessages(isLeadershipMsg, msgs)
447447
g.gossipInChan(leadershipMsgs, func(gc channel.GossipChannel) filter.RoutingFilter {
448-
return filter.CombineRoutingFilters(gc.IsSubscribed, gc.IsMemberInChan, g.isInMyorg)
448+
return filter.CombineRoutingFilters(gc.EligibleForChannel, gc.IsMemberInChan, g.isInMyorg)
449449
})
450450

451451
// Gossip messages restricted to our org

0 commit comments

Comments
 (0)