Skip to content

Commit 3177af1

Browse files
committed
[FAB-4302] Harden gossip bootstrap peer connection
Full details and introduction is in the JIRA item. Gossip should be protect against mis-configurations that have security implications, and therefore I suggest to make the bootstrap peer connection to use the same mechanism as the anchor peer connection - when a peer connects to an anchor peer, it first performs a handshake (which is authenticated), and learns about the organization of the peer behind the endpoint and only then decides whether to send its internal endpoint or not. Change-Id: Ieee1abde85f8e22c4ff6adad16dcd6fdab47b57f Signed-off-by: Yacov Manevich <[email protected]>
1 parent 670be92 commit 3177af1

File tree

5 files changed

+139
-75
lines changed

5 files changed

+139
-75
lines changed

gossip/discovery/discovery_impl.go

+35-55
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package discovery
1919
import (
2020
"bytes"
2121
"fmt"
22+
"math"
2223
"strconv"
2324
"strings"
2425
"sync"
@@ -94,13 +95,14 @@ type gossipDiscoveryImpl struct {
9495

9596
toDieChan chan struct{}
9697
toDieFlag int32
98+
port int
9799
logger *logging.Logger
98100
disclosurePolicy DisclosurePolicy
99101
pubsub *util.PubSub
100102
}
101103

102104
// NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed
103-
func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommService, crypt CryptoService, disPol DisclosurePolicy) Discovery {
105+
func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoService, disPol DisclosurePolicy) Discovery {
104106
d := &gossipDiscoveryImpl{
105107
self: self,
106108
incTime: uint64(time.Now().UnixNano()),
@@ -120,6 +122,7 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
120122
pubsub: util.NewPubSub(),
121123
}
122124

125+
d.validateSelfConfig()
123126
d.msgStore = newAliveMsgStore(d)
124127

125128
go d.periodicalSendAlive()
@@ -128,8 +131,6 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
128131
go d.periodicalReconnectToDead()
129132
go d.handlePresumedDeadPeers()
130133

131-
go d.connect2BootstrapPeers(bootstrapPeers)
132-
133134
d.logger.Info("Started", self, "incTime is", d.incTime)
134135

135136
return d
@@ -147,6 +148,13 @@ func (d *gossipDiscoveryImpl) Lookup(PKIID common.PKIidType) *NetworkMember {
147148
}
148149

149150
func (d *gossipDiscoveryImpl) Connect(member NetworkMember, id identifier) {
151+
for _, endpoint := range []string{member.InternalEndpoint, member.Endpoint} {
152+
if d.isMyOwnEndpoint(endpoint) {
153+
d.logger.Debug("Skipping connecting to myself")
154+
return
155+
}
156+
}
157+
150158
d.logger.Debug("Entering", member)
151159
defer d.logger.Debug("Exiting")
152160
go func() {
@@ -175,58 +183,41 @@ func (d *gossipDiscoveryImpl) Connect(member NetworkMember, id identifier) {
175183
}()
176184
}
177185

178-
func (d *gossipDiscoveryImpl) sendUntilAcked(peer *NetworkMember, message *proto.SignedGossipMessage) {
179-
nonce := message.Nonce
180-
for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ {
181-
sub := d.pubsub.Subscribe(fmt.Sprintf("%d", nonce), time.Second*5)
182-
d.comm.SendToPeer(peer, message)
183-
if _, timeoutErr := sub.Listen(); timeoutErr == nil {
184-
return
185-
}
186-
time.Sleep(getReconnectInterval())
187-
}
186+
func (d *gossipDiscoveryImpl) isMyOwnEndpoint(endpoint string) bool {
187+
return endpoint == fmt.Sprintf("127.0.0.1:%d", d.port) || endpoint == fmt.Sprintf("localhost:%d", d.port) ||
188+
endpoint == d.self.InternalEndpoint || endpoint == d.self.Endpoint
188189
}
189190

190-
func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) {
191-
if len(d.self.InternalEndpoint) == 0 {
192-
d.logger.Panic("Internal endpoint is empty:", d.self.InternalEndpoint)
191+
func (d *gossipDiscoveryImpl) validateSelfConfig() {
192+
endpoint := d.self.InternalEndpoint
193+
if len(endpoint) == 0 {
194+
d.logger.Panic("Internal endpoint is empty:", endpoint)
193195
}
194196

195-
if len(strings.Split(d.self.InternalEndpoint, ":")) != 2 {
196-
d.logger.Panicf("Self endpoint %s isn't formatted as 'host:port'", d.self.InternalEndpoint)
197+
internalEndpointSplit := strings.Split(endpoint, ":")
198+
if len(internalEndpointSplit) != 2 {
199+
d.logger.Panicf("Self endpoint %s isn't formatted as 'host:port'", endpoint)
197200
}
198-
199-
myPort, err := strconv.ParseInt(strings.Split(d.self.InternalEndpoint, ":")[1], 10, 64)
201+
myPort, err := strconv.ParseInt(internalEndpointSplit[1], 10, 64)
200202
if err != nil {
201-
d.logger.Panicf("Self endpoint %s has not valid port'", d.self.InternalEndpoint)
203+
d.logger.Panicf("Self endpoint %s has not valid port'", endpoint)
202204
}
203205

204-
d.logger.Info("Entering:", endpoints)
205-
defer d.logger.Info("Exiting")
206-
endpoints = filterOutLocalhost(endpoints, int(myPort))
207-
if len(endpoints) == 0 {
208-
return
206+
if myPort > int64(math.MaxUint16) {
207+
d.logger.Panicf("Self endpoint %s's port takes more than 16 bits", endpoint)
209208
}
210209

211-
for i := 0; i < maxConnectionAttempts && !d.somePeerIsKnown() && !d.toDie(); i++ {
212-
var wg sync.WaitGroup
213-
req := d.createMembershipRequest(true).NoopSign()
214-
wg.Add(len(endpoints))
215-
for _, endpoint := range endpoints {
216-
go func(endpoint string) {
217-
defer wg.Done()
218-
peer := &NetworkMember{
219-
Endpoint: endpoint,
220-
InternalEndpoint: endpoint,
221-
}
222-
if !d.comm.Ping(peer) {
223-
d.logger.Warning("Failed connecting to bootstrap peer", endpoint)
224-
return
225-
}
226-
d.comm.SendToPeer(peer, req)
227-
}(endpoint)
210+
d.port = int(myPort)
211+
}
212+
213+
func (d *gossipDiscoveryImpl) sendUntilAcked(peer *NetworkMember, message *proto.SignedGossipMessage) {
214+
nonce := message.Nonce
215+
for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ {
216+
sub := d.pubsub.Subscribe(fmt.Sprintf("%d", nonce), time.Second*5)
217+
d.comm.SendToPeer(peer, message)
218+
if _, timeoutErr := sub.Listen(); timeoutErr == nil {
219+
return
228220
}
229-
wg.Wait()
230221
time.Sleep(getReconnectInterval())
231222
}
232223
}
@@ -962,17 +953,6 @@ func getReconnectInterval() time.Duration {
962953
return util.GetDurationOrDefault("peer.gossip.reconnectInterval", getAliveExpirationTimeout())
963954
}
964955

965-
func filterOutLocalhost(endpoints []string, port int) []string {
966-
var returnedEndpoints []string
967-
for _, endpoint := range endpoints {
968-
if endpoint == fmt.Sprintf("127.0.0.1:%d", port) || endpoint == fmt.Sprintf("localhost:%d", port) {
969-
continue
970-
}
971-
returnedEndpoints = append(returnedEndpoints, endpoint)
972-
}
973-
return returnedEndpoints
974-
}
975-
976956
type aliveMsgStore struct {
977957
msgstore.MessageStore
978958
}

gossip/discovery/discovery_test.go

+8-17
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,13 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st
314314
}
315315
s := grpc.NewServer()
316316

317-
discSvc := NewDiscoveryService(bootstrapPeers, self, comm, comm, pol)
317+
discSvc := NewDiscoveryService(self, comm, comm, pol)
318+
for _, bootPeer := range bootstrapPeers {
319+
discSvc.Connect(NetworkMember{Endpoint: bootPeer, InternalEndpoint: bootPeer}, func() (*PeerIdentification, error) {
320+
return &PeerIdentification{SelfOrg: true, ID: common.PKIidType(bootPeer)}, nil
321+
})
322+
}
323+
318324
gossInst := &gossipInstance{comm: comm, gRGCserv: s, Discovery: discSvc, lsnr: ll, shouldGossip: shouldGossip, port: port}
319325

320326
proto.RegisterGossipServer(s, gossInst)
@@ -580,7 +586,7 @@ func TestGossipDiscoverySkipConnectingToLocalhostBootstrap(t *testing.T) {
580586
inst := createDiscoveryInstance(11611, "d1", []string{"localhost:11611", "127.0.0.1:11611"})
581587
inst.comm.lock.Lock()
582588
inst.comm.mock = &mock.Mock{}
583-
inst.comm.mock.On("SendToPeer", mock.Anything).Run(func(mock.Arguments) {
589+
inst.comm.mock.On("SendToPeer", mock.Anything, mock.Anything).Run(func(mock.Arguments) {
584590
t.Fatal("Should not have connected to any peer")
585591
})
586592
inst.comm.mock.On("Ping", mock.Anything).Run(func(mock.Arguments) {
@@ -806,21 +812,6 @@ func TestConfigFromFile(t *testing.T) {
806812
assert.Equal(t, time.Duration(25)*time.Second, getReconnectInterval())
807813
}
808814

809-
func TestFilterOutLocalhost(t *testing.T) {
810-
t.Parallel()
811-
endpoints := []string{"localhost:5611", "127.0.0.1:5611", "1.2.3.4:5611"}
812-
assert.Len(t, filterOutLocalhost(endpoints, 5611), 1)
813-
endpoints = []string{"1.2.3.4:5611"}
814-
assert.Len(t, filterOutLocalhost(endpoints, 5611), 1)
815-
endpoints = []string{"localhost:5611", "127.0.0.1:5611"}
816-
assert.Len(t, filterOutLocalhost(endpoints, 5611), 0)
817-
// Check slice returned is a copy
818-
endpoints = []string{"localhost:5611", "127.0.0.1:5611", "1.2.3.4:5611"}
819-
endpoints2 := filterOutLocalhost(endpoints, 5611)
820-
endpoints2[0] = "bla bla"
821-
assert.NotEqual(t, endpoints[2], endpoints[0])
822-
}
823-
824815
func TestMsgStoreExpiration(t *testing.T) {
825816
// Starts 4 instances, wait for membership to build, stop 2 instances
826817
// Check that membership in 2 running instances become 2

gossip/gossip/anchor_test.go

+66
Original file line numberDiff line numberDiff line change
@@ -287,3 +287,69 @@ func TestAnchorPeer(t *testing.T) {
287287
pm1.finishedSignal.Wait()
288288
pm2.finishedSignal.Wait()
289289
}
290+
291+
func TestBootstrapPeerMisConfiguration(t *testing.T) {
292+
t.Parallel()
293+
// Scenario:
294+
// The peer 'p' is a peer in orgA
295+
// Peers bs1 and bs2 are bootstrap peers.
296+
// bs1 is in orgB, so p shouldn't connect to it.
297+
// bs2 is in orgA, so p should connect to it.
298+
// We test by intercepting *all* messages that bs1 and bs2 get from p, that:
299+
// 1) At least 3 connection attempts were sent from p to bs1
300+
// 2) A membership request was sent from p to bs2
301+
302+
cs := &configurableCryptoService{m: make(map[string]api.OrgIdentityType)}
303+
portPrefix := 43478
304+
orgA := "orgA"
305+
orgB := "orgB"
306+
cs.putInOrg(portPrefix, orgA)
307+
cs.putInOrg(portPrefix+1, orgB)
308+
cs.putInOrg(portPrefix+2, orgA)
309+
310+
onlyHandshakes := func(t *testing.T, index int, m *receivedMsg) {
311+
// Ensure all messages sent are connection establishment messages
312+
// that are probing attempts
313+
assert.NotNil(t, m.GetConn())
314+
// If the logic we test in this test- fails,
315+
// the first message would be a membership request,
316+
// so this assertion would capture it and print a corresponding failure
317+
assert.Nil(t, m.GetMemReq())
318+
}
319+
// Initialize a peer mock that would wait for 3 messages sent to it
320+
bs1 := newPeerMock(portPrefix+1, 3, t, onlyHandshakes)
321+
defer bs1.stop()
322+
323+
membershipRequestsSent := make(chan struct{}, 100)
324+
detectMembershipRequest := func(t *testing.T, index int, m *receivedMsg) {
325+
if m.GetMemReq() != nil {
326+
membershipRequestsSent <- struct{}{}
327+
}
328+
}
329+
330+
bs2 := newPeerMock(portPrefix+2, 0, t, detectMembershipRequest)
331+
defer bs2.stop()
332+
333+
p := newGossipInstanceWithExternalEndpoint(portPrefix, 0, cs, fmt.Sprintf("localhost:%d", portPrefix), 1, 2)
334+
defer p.Stop()
335+
336+
// Wait for 3 handshake attempts from the bootstrap peer from orgB,
337+
// to prove that the peer did try to probe the bootstrap peer from orgB
338+
got3Handshakes := make(chan struct{})
339+
go func() {
340+
bs1.finishedSignal.Wait()
341+
got3Handshakes <- struct{}{}
342+
}()
343+
344+
select {
345+
case <-got3Handshakes:
346+
case <-time.After(time.Second * 15):
347+
assert.Fail(t, "Didn't detect 3 handshake attempts to the bootstrap peer from orgB")
348+
}
349+
350+
select {
351+
case <-membershipRequestsSent:
352+
case <-time.After(time.Second * 15):
353+
assert.Fail(t, "Bootstrap peer didn't receive a membership request from the peer within a timely manner")
354+
}
355+
}

gossip/gossip/gossip_impl.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
124124

125125
g.discAdapter = g.newDiscoveryAdapter()
126126
g.disSecAdap = g.newDiscoverySecurityAdapter()
127-
g.disc = discovery.NewDiscoveryService(conf.BootstrapPeers, g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy)
127+
g.disc = discovery.NewDiscoveryService(g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy)
128128
g.logger.Info("Creating gossip service with self membership of", g.selfNetworkMember())
129129

130130
g.certStore = newCertStore(g.createCertStorePuller(), idMapper, selfIdentity, mcs)
@@ -135,6 +135,7 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
135135

136136
go g.start()
137137
go g.periodicalIdentityValidationAndExpiration()
138+
go g.connect2BootstrapPeers()
138139

139140
return g
140141
}
@@ -1033,6 +1034,32 @@ func (g *gossipServiceImpl) sameOrgOrOurOrgPullFilter(msg proto.ReceivedMessage)
10331034
}
10341035
}
10351036

1037+
func (g *gossipServiceImpl) connect2BootstrapPeers() {
1038+
for _, endpoint := range g.conf.BootstrapPeers {
1039+
endpoint := endpoint
1040+
identifier := func() (*discovery.PeerIdentification, error) {
1041+
remotePeerIdentity, err := g.comm.Handshake(&comm.RemotePeer{Endpoint: endpoint})
1042+
if err != nil {
1043+
return nil, err
1044+
}
1045+
sameOrg := bytes.Equal(g.selfOrg, g.secAdvisor.OrgByPeerIdentity(remotePeerIdentity))
1046+
if !sameOrg {
1047+
return nil, fmt.Errorf("%s isn't in our organization, cannot be a bootstrap peer", endpoint)
1048+
}
1049+
pkiID := g.mcs.GetPKIidOfCert(remotePeerIdentity)
1050+
if len(pkiID) == 0 {
1051+
return nil, fmt.Errorf("Wasn't able to extract PKI-ID of remote peer with identity of %v", remotePeerIdentity)
1052+
}
1053+
return &discovery.PeerIdentification{ID: pkiID, SelfOrg: sameOrg}, nil
1054+
}
1055+
g.disc.Connect(discovery.NetworkMember{
1056+
InternalEndpoint: endpoint,
1057+
Endpoint: endpoint,
1058+
}, identifier)
1059+
}
1060+
1061+
}
1062+
10361063
func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.ChainID) (*proto.SignedGossipMessage, error) {
10371064
pkiID := g.comm.GetPKIid()
10381065
stateInfMsg := &proto.StateInfo{

sampleconfig/core.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ peer:
7373
# Bootstrap set to initialize gossip with.
7474
# This is a list of peers that the peer reaches out to at startup.
7575
# Important: The endpoints here have to be endpoints of peers in the same
76-
# organization, because the connection establishment process to these peers
77-
# exposes the internal endpoint of this peer.
76+
# organization, because the peer would refuse connecting to these endpoints
77+
# unless they are in the same organization as the peer.
7878
bootstrap: 127.0.0.1:7051
7979

8080
# NOTE: orgLeader and useLeaderElection parameters are mutual exclusive

0 commit comments

Comments
 (0)