Skip to content

Commit 821c9d8

Browse files
committed
[FAB-2007] External and internal endpoints III
This is the last commit of the series, in which we enforce publishing of external endpoints to peers of different orgs and publishing of both external and internal endpoints only within the peer's organization. The commit addesses this only in the gossip layer, as the discovery layer has been taken care of in previous commits. It contains a test in which we: - Create 2 orgs with 5 peers each, while the first 2 peers have external endpoints, and the rest don't. - The first org has a bootstrap peer, the second doesn't. - JoinChannel is invoked with the first peer of each org as anchor peers. - It is tested that: - Peers know peers from other orgs only if they both have external endpoints. - Peers do not know the internal endpoints of peers of other orgs. Change-Id: I157c8cea29b35adb84314fcb695413b005f2b236 Signed-off-by: Yacov Manevich <[email protected]>
1 parent 24dc7d9 commit 821c9d8

File tree

2 files changed

+350
-19
lines changed

2 files changed

+350
-19
lines changed

gossip/gossip/gossip_impl.go

+112-19
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,7 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
119119

120120
g.discAdapter = g.newDiscoveryAdapter()
121121
g.disSecAdap = g.newDiscoverySecurityAdapter()
122-
123-
noopDisclosurePol := func(remotePeer *discovery.NetworkMember) (discovery.Sieve, discovery.EnvelopeFilter) {
124-
return func(msg *proto.SignedGossipMessage) bool {
125-
return true
126-
}, func(message *proto.SignedGossipMessage) *proto.Envelope {
127-
return message.Envelope
128-
}
129-
}
130-
g.disc = discovery.NewDiscoveryService(conf.BootstrapPeers, g.selfNetworkMember(), g.discAdapter, g.disSecAdap, noopDisclosurePol)
122+
g.disc = discovery.NewDiscoveryService(conf.BootstrapPeers, g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy)
131123
g.logger.Info("Creating gossip service with self membership of", g.selfNetworkMember())
132124

133125
g.certStore = newCertStore(g.createCertStorePuller(), idMapper, selfIdentity, mcs)
@@ -148,6 +140,9 @@ func (g *gossipServiceImpl) selfNetworkMember() discovery.NetworkMember {
148140
Metadata: []byte{},
149141
InternalEndpoint: g.conf.InternalEndpoint,
150142
}
143+
if g.disc != nil {
144+
self.Metadata = g.disc.Self().Metadata
145+
}
151146
return self
152147
}
153148

@@ -197,6 +192,10 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com
197192
}
198193

199194
inOurOrg := bytes.Equal(g.selfOrg, ap.OrgID)
195+
if !inOurOrg && g.selfNetworkMember().Endpoint == "" {
196+
g.logger.Infof("Anchor peer %s:%d isn't in our org(%v) and we have no external endpoint, skipping", ap.Host, ap.Port, string(ap.OrgID))
197+
continue
198+
}
200199
g.disc.Connect(discovery.NetworkMember{
201200
InternalEndpoint: endpoint, Endpoint: endpoint}, inOurOrg)
202201
}
@@ -346,6 +345,7 @@ func (g *gossipServiceImpl) forwardDiscoveryMsg(msg proto.ReceivedMessage) {
346345
defer func() { // can be closed while shutting down
347346
recover()
348347
}()
348+
349349
g.discAdapter.incChan <- msg.GetGossipMessage()
350350
}
351351

@@ -427,8 +427,16 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) {
427427
isAStateInfoMsg := func(o interface{}) bool {
428428
return o.(*proto.SignedGossipMessage).IsStateInfoMsg()
429429
}
430+
aliveMsgsWithNoEndpointAndInOurOrg := func(o interface{}) bool {
431+
msg := o.(*proto.SignedGossipMessage)
432+
if !msg.IsAliveMsg() {
433+
return false
434+
}
435+
member := msg.GetAliveMsg().Membership
436+
return member.Endpoint == "" && g.isInMyorg(discovery.NetworkMember{PKIid: member.PkiId})
437+
}
430438
isOrgRestricted := func(o interface{}) bool {
431-
return o.(*proto.SignedGossipMessage).IsOrgRestricted()
439+
return aliveMsgsWithNoEndpointAndInOurOrg(o) || o.(*proto.SignedGossipMessage).IsOrgRestricted()
432440
}
433441
isLeadershipMsg := func(o interface{}) bool {
434442
return o.(*proto.SignedGossipMessage).IsLeadershipMsg()
@@ -462,7 +470,29 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) {
462470
// Finally, gossip the remaining messages
463471
peers2Send = filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership())
464472
for _, msg := range msgs {
465-
g.comm.Send(msg, peers2Send...)
473+
g.sendAndFilterSecrets(msg, peers2Send...)
474+
}
475+
}
476+
477+
func (g *gossipServiceImpl) sendAndFilterSecrets(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) {
478+
for _, peer := range peers {
479+
// Prevent forwarding alive messages of external organizations
480+
// to peers that have no external endpoints
481+
remotePeerEndpoint := g.disc.Lookup(peer.PKIID)
482+
if remotePeerEndpoint == nil {
483+
g.logger.Warning("Peer", peer, "isn't in the membership anymore, will not send to it")
484+
continue
485+
}
486+
aliveMsgFromDiffOrg := msg.IsAliveMsg() && !g.isInMyorg(discovery.NetworkMember{PKIid: msg.GetAliveMsg().Membership.PkiId})
487+
if aliveMsgFromDiffOrg && remotePeerEndpoint.Endpoint == "" {
488+
continue
489+
}
490+
// Don't gossip secrets
491+
if !g.isInMyorg(discovery.NetworkMember{PKIid: peer.PKIID}) {
492+
msg.Envelope.SecretEnvelope = nil
493+
}
494+
495+
g.comm.Send(msg, peer)
466496
}
467497
}
468498

@@ -670,19 +700,21 @@ func (g *gossipServiceImpl) newDiscoveryAdapter() *discoveryAdapter {
670700
}
671701
g.emitter.Add(msg)
672702
},
673-
incChan: make(chan *proto.SignedGossipMessage),
674-
presumedDead: g.presumedDead,
703+
incChan: make(chan *proto.SignedGossipMessage),
704+
presumedDead: g.presumedDead,
705+
disclosurePolicy: g.disclosurePolicy,
675706
}
676707
}
677708

678709
// discoveryAdapter is used to supply the discovery module with needed abilities
679710
// that the comm interface in the discovery module declares
680711
type discoveryAdapter struct {
681-
stopping int32
682-
c comm.Comm
683-
presumedDead chan common.PKIidType
684-
incChan chan *proto.SignedGossipMessage
685-
gossipFunc func(message *proto.SignedGossipMessage)
712+
stopping int32
713+
c comm.Comm
714+
presumedDead chan common.PKIidType
715+
incChan chan *proto.SignedGossipMessage
716+
gossipFunc func(message *proto.SignedGossipMessage)
717+
disclosurePolicy discovery.DisclosurePolicy
686718
}
687719

688720
func (da *discoveryAdapter) close() {
@@ -698,13 +730,40 @@ func (da *discoveryAdapter) Gossip(msg *proto.SignedGossipMessage) {
698730
if da.toDie() {
699731
return
700732
}
733+
701734
da.gossipFunc(msg)
702735
}
703736

704737
func (da *discoveryAdapter) SendToPeer(peer *discovery.NetworkMember, msg *proto.SignedGossipMessage) {
705738
if da.toDie() {
706739
return
707740
}
741+
// Check membership requests for peers that we know of their PKI-ID.
742+
// The only peers we don't know about their PKI-IDs are bootstrap peers.
743+
if memReq := msg.GetMemReq(); memReq != nil && len(peer.PKIid) != 0 {
744+
selfMsg, err := memReq.SelfInformation.ToGossipMessage()
745+
if err != nil {
746+
// Shouldn't happen
747+
panic("Tried to send a membership request with a malformed AliveMessage")
748+
}
749+
// Apply the EnvelopeFilter of the disclosure policy
750+
// on the alive message of the selfInfo field of the membership request
751+
_, omitConcealedFields := da.disclosurePolicy(peer)
752+
selfMsg.Envelope = omitConcealedFields(selfMsg)
753+
// Backup old known field
754+
oldKnown := memReq.Known
755+
// Override new SelfInfo message with updated envelope
756+
memReq = &proto.MembershipRequest{
757+
SelfInformation: selfMsg.Envelope,
758+
Known: oldKnown,
759+
}
760+
// Update original message
761+
msg.Content = &proto.GossipMessage_MemReq{
762+
MemReq: memReq,
763+
}
764+
// Update the envelope of the outer message, no need to sign (point2point)
765+
msg = msg.NoopSign()
766+
}
708767
da.c.Send(msg, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.PreferredEndpoint()})
709768
}
710769

@@ -892,7 +951,6 @@ func (g *gossipServiceImpl) isInMyorg(member discovery.NetworkMember) bool {
892951
func (g *gossipServiceImpl) getOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType {
893952
cert, err := g.idMapper.Get(PKIID)
894953
if err != nil {
895-
g.logger.Error("Failed getting certificate by PKIid:", PKIID, ":", err)
896954
return nil
897955
}
898956

@@ -928,6 +986,41 @@ func (g *gossipServiceImpl) validateStateInfoMsg(msg *proto.SignedGossipMessage)
928986
return msg.Verify(identity, verifier)
929987
}
930988

989+
func (g *gossipServiceImpl) disclosurePolicy(remotePeer *discovery.NetworkMember) (discovery.Sieve, discovery.EnvelopeFilter) {
990+
remotePeerOrg := g.getOrgOfPeer(remotePeer.PKIid)
991+
992+
if len(remotePeerOrg) == 0 {
993+
g.logger.Warning("Cannot determine organization of", remotePeer)
994+
return func(msg *proto.SignedGossipMessage) bool {
995+
return false
996+
}, func(msg *proto.SignedGossipMessage) *proto.Envelope {
997+
return msg.Envelope
998+
}
999+
}
1000+
1001+
return func(msg *proto.SignedGossipMessage) bool {
1002+
if !msg.IsAliveMsg() {
1003+
g.logger.Panic("Programming error, this should be used only on alive messages")
1004+
}
1005+
org := g.getOrgOfPeer(msg.GetAliveMsg().Membership.PkiId)
1006+
if len(org) == 0 {
1007+
// Panic here, because we are somehow trying to send an AliveMessage
1008+
// without having its matching identity beforehand, and the message
1009+
// should have never reached this far- but should've been dropped
1010+
// at signature validation.
1011+
g.logger.Panic("Unable to determine org of message", msg.GossipMessage)
1012+
}
1013+
// Pass the alive message only if the alive message is in the same org as the remote peer
1014+
// or the message has an external endpoint, and the remote peer also has one
1015+
return bytes.Equal(org, remotePeerOrg) || msg.GetAliveMsg().Membership.Endpoint != "" && remotePeer.Endpoint != ""
1016+
}, func(msg *proto.SignedGossipMessage) *proto.Envelope {
1017+
if !bytes.Equal(g.selfOrg, remotePeerOrg) {
1018+
msg.SecretEnvelope = nil
1019+
}
1020+
return msg.Envelope
1021+
}
1022+
}
1023+
9311024
// partitionMessages receives a predicate and a slice of gossip messages
9321025
// and returns a tuple of two slices: the messages that hold for the predicate
9331026
// and the rest

0 commit comments

Comments
 (0)