Skip to content

Commit 077126e

Browse files
committed
[FAB-2061] Gossip inter-org confidentiality - P1
This change set makes every 2 orgs (that share a channel), gossip only alive messages that that originated in one of the two organizations. The motivation is to prevent a third organization from knowing that the two orgs share a common channel (unless of course, they all share it). So, if we have the following situation: Channel C0: { orgA, orgB } Channel C1: { orgB, orgC } [ A ]--C0--[ B ]--C1--[ C ] Then orgA has no idea of orgC's existence and vice-versa. But, also if we have the following situation: Channel C0: { orgA, orgB } Channel C1: { orgA, orgC } Channel C2: { orgB, orgC } [ A ]-C0-[ B ] | / | / C1 C2 | / | / [ C ] Then, alive message from orgA, are never sent to orgB via orgC (and the same for orgB, orgC) Or, in other words: an Alive message m about a peer in org X is sent to org Y, only if X is either in X or in Y. In the next change set I'll do the same for identity messages (messages that contain certificates) Change-Id: I816e6b23050d320f8b99a64b5c84d619de54aaa8 Signed-off-by: Yacov Manevich <[email protected]>
1 parent 7f336b9 commit 077126e

File tree

3 files changed

+328
-9
lines changed

3 files changed

+328
-9
lines changed

gossip/filter/filter.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,16 @@ import (
2727
// selected for be given a message
2828
type RoutingFilter func(discovery.NetworkMember) bool
2929

30+
// SelectNonePolicy selects an empty set of members
31+
var SelectNonePolicy = func(discovery.NetworkMember) bool {
32+
return false
33+
}
34+
35+
// SelectAllPolicy selects all members given
36+
var SelectAllPolicy = func(discovery.NetworkMember) bool {
37+
return true
38+
}
39+
3040
// CombineRoutingFilters returns the logical AND of given routing filters
3141
func CombineRoutingFilters(filters ...RoutingFilter) RoutingFilter {
3242
return func(member discovery.NetworkMember) bool {
@@ -39,11 +49,11 @@ func CombineRoutingFilters(filters ...RoutingFilter) RoutingFilter {
3949
}
4050
}
4151

42-
// SelectPeers returns a slice of peers that match a list of routing filters
43-
func SelectPeers(k int, peerPool []discovery.NetworkMember, filters ...RoutingFilter) []*comm.RemotePeer {
52+
// SelectPeers returns a slice of peers that match the routing filter
53+
func SelectPeers(k int, peerPool []discovery.NetworkMember, filter RoutingFilter) []*comm.RemotePeer {
4454
var filteredPeers []*comm.RemotePeer
4555
for _, peer := range peerPool {
46-
if CombineRoutingFilters(filters...)(peer) {
56+
if filter(peer) {
4757
filteredPeers = append(filteredPeers, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.PreferredEndpoint()})
4858
}
4959
}

gossip/gossip/gossip_impl.go

+47-6
Original file line numberDiff line numberDiff line change
@@ -483,8 +483,13 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) {
483483
}
484484

485485
// Finally, gossip the remaining messages
486-
peers2Send = filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership())
487486
for _, msg := range msgs {
487+
if !msg.IsAliveMsg() {
488+
g.logger.Error("Unknown message type", msg)
489+
continue
490+
}
491+
selectByOriginOrg := g.peersByOriginOrgPolicy(discovery.NetworkMember{PKIid: msg.GetAliveMsg().Membership.PkiId})
492+
peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), selectByOriginOrg)
488493
g.sendAndFilterSecrets(msg, peers2Send...)
489494
}
490495
}
@@ -1021,12 +1026,20 @@ func (g *gossipServiceImpl) disclosurePolicy(remotePeer *discovery.NetworkMember
10211026
}
10221027
org := g.getOrgOfPeer(msg.GetAliveMsg().Membership.PkiId)
10231028
if len(org) == 0 {
1024-
// Panic here, because we are somehow trying to send an AliveMessage
1025-
// without having its matching identity beforehand, and the message
1026-
// should have never reached this far- but should've been dropped
1027-
// at signature validation.
1028-
g.logger.Panic("Unable to determine org of message", msg.GossipMessage)
1029+
g.logger.Warning("Unable to determine org of message", msg.GossipMessage)
1030+
// Don't disseminate messages who's origin org is unknown
1031+
return false
10291032
}
1033+
1034+
// Target org and the message are from the same org
1035+
fromSameForeignOrg := bytes.Equal(remotePeerOrg, org)
1036+
// The message is from my org
1037+
fromMyOrg := bytes.Equal(g.selfOrg, org)
1038+
// Forward to target org only messages from our org, or from the target org itself.
1039+
if !(fromSameForeignOrg || fromMyOrg) {
1040+
return false
1041+
}
1042+
10301043
// Pass the alive message only if the alive message is in the same org as the remote peer
10311044
// or the message has an external endpoint, and the remote peer also has one
10321045
return bytes.Equal(org, remotePeerOrg) || msg.GetAliveMsg().Membership.Endpoint != "" && remotePeer.Endpoint != ""
@@ -1038,6 +1051,34 @@ func (g *gossipServiceImpl) disclosurePolicy(remotePeer *discovery.NetworkMember
10381051
}
10391052
}
10401053

1054+
func (g *gossipServiceImpl) peersByOriginOrgPolicy(peer discovery.NetworkMember) filter.RoutingFilter {
1055+
peersOrg := g.getOrgOfPeer(peer.PKIid)
1056+
if len(peersOrg) == 0 {
1057+
g.logger.Warning("Unable to determine organization of peer", peer)
1058+
// Don't disseminate messages who's origin org is undetermined
1059+
return filter.SelectNonePolicy
1060+
}
1061+
1062+
if bytes.Equal(g.selfOrg, peersOrg) {
1063+
// Disseminate messages from our org to all known organizations.
1064+
// IMPORTANT: Currently a peer cannot un-join a channel, so the only way
1065+
// of making gossip stop talking to an organization is by having the MSP
1066+
// refuse validating messages from it.
1067+
return filter.SelectAllPolicy
1068+
}
1069+
1070+
// Else, select peers from the origin's organization,
1071+
// and also peers from our own organization
1072+
return func(member discovery.NetworkMember) bool {
1073+
memberOrg := g.getOrgOfPeer(member.PKIid)
1074+
if len(memberOrg) == 0 {
1075+
return false
1076+
}
1077+
isFromMyOrg := bytes.Equal(g.selfOrg, memberOrg)
1078+
return isFromMyOrg || bytes.Equal(memberOrg, peersOrg)
1079+
}
1080+
}
1081+
10411082
// partitionMessages receives a predicate and a slice of gossip messages
10421083
// and returns a tuple of two slices: the messages that hold for the predicate
10431084
// and the rest

gossip/gossip/orgs_test.go

+268
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,15 @@ import (
2222
"testing"
2323
"time"
2424

25+
"sync"
26+
"sync/atomic"
27+
2528
"github.com/hyperledger/fabric/bccsp/factory"
2629
"github.com/hyperledger/fabric/gossip/api"
2730
"github.com/hyperledger/fabric/gossip/common"
2831
"github.com/hyperledger/fabric/gossip/discovery"
2932
"github.com/hyperledger/fabric/gossip/identity"
33+
proto "github.com/hyperledger/fabric/protos/gossip"
3034
"github.com/stretchr/testify/assert"
3135
)
3236

@@ -241,3 +245,267 @@ func TestMultipleOrgEndpointLeakage(t *testing.T) {
241245
}
242246
}
243247
}
248+
249+
func TestConfidentiality(t *testing.T) {
250+
t.Parallel()
251+
// Scenario: create 4 organizations: {A, B, C, D}, each with 3 peers.
252+
// Make only the first 2 peers have an external endpoint.
253+
// Also, add the peers to the following channels:
254+
// Channel C0: { orgA, orgB }
255+
// Channel C1: { orgA, orgC }
256+
// Channel C2: { orgB, orgC }
257+
// Channel C3: { orgB, orgD }
258+
// [ A ]-C0-[ B ]-C3-[ D ]
259+
// | /
260+
// | /
261+
// C1 C2
262+
// | /
263+
// | /
264+
// [ C ]
265+
// Subscribe to all membership messages for each peer,
266+
// and fail the test if a message is sent to a peer in org X,
267+
// from a peer in org Y about a peer in org Z not in {X, Y}
268+
// or if any org other than orgB knows peers in orgD (and vice versa).
269+
270+
portPrefix := 12610
271+
peersInOrg := 3
272+
externalEndpointsInOrg := 2
273+
274+
orgs := []string{"A", "B", "C", "D"}
275+
channels := []string{"C0", "C1", "C2", "C3"}
276+
isOrgInChan := func(org string, channel string) bool {
277+
switch org {
278+
case "A":
279+
return channel == "C0" || channel == "C1"
280+
case "B":
281+
return channel == "C0" || channel == "C2" || channel == "C3"
282+
case "C":
283+
return channel == "C1" || channel == "C2"
284+
case "D":
285+
return channel == "C3"
286+
}
287+
288+
return false
289+
}
290+
291+
// Create the message crypto service
292+
cs := &configurableCryptoService{m: make(map[string]api.OrgIdentityType)}
293+
for i, org := range orgs {
294+
for j := 0; j < peersInOrg; j++ {
295+
port := portPrefix + i*peersInOrg + j
296+
cs.putInOrg(port, org)
297+
}
298+
}
299+
300+
var peers []Gossip
301+
orgs2Peers := map[string][]Gossip{
302+
"A": {},
303+
"B": {},
304+
"C": {},
305+
"D": {},
306+
}
307+
308+
anchorPeersByOrg := map[string]api.AnchorPeer{}
309+
310+
for i, org := range orgs {
311+
for j := 0; j < peersInOrg; j++ {
312+
id := i*peersInOrg + j
313+
port := id + portPrefix
314+
endpoint := fmt.Sprintf("localhost:%d", port)
315+
externalEndpoint := ""
316+
if j < externalEndpointsInOrg { // The first peers of each org would have an external endpoint
317+
externalEndpoint = endpoint
318+
}
319+
peer := newGossipInstanceWithExternalEndpoint(portPrefix, id, cs, externalEndpoint)
320+
peers = append(peers, peer)
321+
orgs2Peers[org] = append(orgs2Peers[org], peer)
322+
// The first peer of the org will be used as the anchor peer
323+
if j == 0 {
324+
anchorPeersByOrg[org] = api.AnchorPeer{
325+
Host: "localhost",
326+
Port: port,
327+
}
328+
}
329+
}
330+
}
331+
332+
msgs2Inspect := make(chan *msg, 3000)
333+
defer close(msgs2Inspect)
334+
go inspectMsgs(t, msgs2Inspect, cs)
335+
finished := int32(0)
336+
var wg sync.WaitGroup
337+
338+
membershipMsgs := func(o interface{}) bool {
339+
msg := o.(proto.ReceivedMessage).GetGossipMessage()
340+
return msg.IsAliveMsg() || msg.GetMemRes() != nil
341+
}
342+
// Listen to all peers membership messages and forward them to the inspection channel
343+
// where they will be inspected, and the test would fail if a confidentiality violation is found
344+
for _, p := range peers {
345+
wg.Add(1)
346+
_, msgs := p.Accept(membershipMsgs, true)
347+
peerNetMember := p.(*gossipServiceImpl).selfNetworkMember()
348+
targetORg := string(cs.OrgByPeerIdentity(api.PeerIdentityType(peerNetMember.InternalEndpoint)))
349+
go func(targetOrg string, msgs <-chan proto.ReceivedMessage) {
350+
defer wg.Done()
351+
for receivedMsg := range msgs {
352+
m := &msg{
353+
src: string(cs.OrgByPeerIdentity(receivedMsg.GetConnectionInfo().Identity)),
354+
dst: targetORg,
355+
GossipMessage: receivedMsg.GetGossipMessage().GossipMessage,
356+
}
357+
if atomic.LoadInt32(&finished) == int32(1) {
358+
return
359+
}
360+
msgs2Inspect <- m
361+
}
362+
}(targetORg, msgs)
363+
}
364+
365+
// Now, construct the join channel messages
366+
joinChanMsgsByChan := map[string]*joinChanMsg{}
367+
for _, ch := range channels {
368+
jcm := &joinChanMsg{members2AnchorPeers: map[string][]api.AnchorPeer{}}
369+
for _, org := range orgs {
370+
if isOrgInChan(org, ch) {
371+
jcm.members2AnchorPeers[org] = append(jcm.members2AnchorPeers[org], anchorPeersByOrg[org])
372+
}
373+
}
374+
joinChanMsgsByChan[ch] = jcm
375+
}
376+
377+
// Next, make the peers join the channels
378+
for org, peers := range orgs2Peers {
379+
for _, ch := range channels {
380+
if isOrgInChan(org, ch) {
381+
for _, p := range peers {
382+
p.JoinChan(joinChanMsgsByChan[ch], common.ChainID(ch))
383+
}
384+
}
385+
}
386+
}
387+
388+
assertMembership := func() bool {
389+
for _, org := range orgs {
390+
for i, p := range orgs2Peers[org] {
391+
members := p.Peers()
392+
expMemberSize := expectedMembershipSize(peersInOrg, externalEndpointsInOrg, org, i < externalEndpointsInOrg)
393+
peerNetMember := p.(*gossipServiceImpl).selfNetworkMember()
394+
membersCount := len(members)
395+
if membersCount < expMemberSize {
396+
return false
397+
}
398+
// Make sure no one knows too much
399+
assert.True(t, membersCount <= expMemberSize, "%s knows too much (%d > %d) peers: %v",
400+
membersCount, expMemberSize, peerNetMember.PKIid, members)
401+
}
402+
}
403+
return true
404+
}
405+
406+
waitUntilOrFail(t, assertMembership)
407+
stopPeers(peers)
408+
wg.Wait()
409+
atomic.StoreInt32(&finished, int32(1))
410+
}
411+
412+
func expectedMembershipSize(peersInOrg, externalEndpointsInOrg int, org string, hasExternalEndpoint bool) int {
413+
// x <-- peersInOrg
414+
// y <-- externalEndpointsInOrg
415+
// (x+2y)[ A ]-C0-[ B ]---C3--[ D ] (x+y)
416+
// | /(x+3y)
417+
// | /
418+
// C1 C2
419+
// | /
420+
// | /
421+
// [ C ] x+2y
422+
423+
m := map[string]func(x, y int) int{
424+
"A": func(x, y int) int {
425+
return x + 2*y
426+
},
427+
"B": func(x, y int) int {
428+
return x + 3*y
429+
},
430+
"C": func(x, y int) int {
431+
return x + 2*y
432+
},
433+
"D": func(x, y int) int {
434+
return x + y
435+
},
436+
}
437+
438+
// If the peer doesn't have an external endpoint,
439+
// it doesn't know peers from foreign organizations that have one
440+
if !hasExternalEndpoint {
441+
externalEndpointsInOrg = 0
442+
}
443+
// Deduct 1 because the peer itself doesn't count
444+
return m[org](peersInOrg, externalEndpointsInOrg) - 1
445+
}
446+
447+
func extractOrgsFromMsg(msg *proto.GossipMessage, sec api.SecurityAdvisor) []string {
448+
if msg.IsAliveMsg() {
449+
return []string{string(sec.OrgByPeerIdentity(api.PeerIdentityType(msg.GetAliveMsg().Membership.PkiId)))}
450+
}
451+
orgs := map[string]struct{}{}
452+
alive := msg.GetMemRes().Alive
453+
dead := msg.GetMemRes().Dead
454+
for _, envp := range append(alive, dead...) {
455+
msg, _ := envp.ToGossipMessage()
456+
orgs[string(sec.OrgByPeerIdentity(api.PeerIdentityType(msg.GetAliveMsg().Membership.PkiId)))] = struct{}{}
457+
}
458+
res := []string{}
459+
for org := range orgs {
460+
res = append(res, org)
461+
}
462+
return res
463+
}
464+
465+
func inspectMsgs(t *testing.T, msgChan chan *msg, sec api.SecurityAdvisor) {
466+
for msg := range msgChan {
467+
// If the destination org is the same as the source org,
468+
// the message can contain any organizations
469+
if msg.src == msg.dst {
470+
continue
471+
}
472+
// Else, it's a cross-organizational message.
473+
// Denote src organization as s and dst organization as d.
474+
// The total organizations of the message must be a subset of s U d.
475+
orgs := extractOrgsFromMsg(msg.GossipMessage, sec)
476+
s := []string{msg.src, msg.dst}
477+
assert.True(t, isSubset(orgs, s), "%v isn't a subset of %v", orgs, s)
478+
479+
// Ensure no one but B knows about D and vice versa
480+
if msg.dst == "D" {
481+
assert.NotContains(t, "A", orgs)
482+
assert.NotContains(t, "C", orgs)
483+
}
484+
485+
if msg.dst == "A" || msg.dst == "C" {
486+
assert.NotContains(t, "D", orgs)
487+
}
488+
}
489+
}
490+
491+
type msg struct {
492+
src string
493+
dst string
494+
*proto.GossipMessage
495+
}
496+
497+
func isSubset(a []string, b []string) bool {
498+
for _, s1 := range a {
499+
found := false
500+
for _, s2 := range b {
501+
if s1 == s2 {
502+
found = true
503+
break
504+
}
505+
}
506+
if !found {
507+
return false
508+
}
509+
}
510+
return true
511+
}

0 commit comments

Comments
 (0)