Skip to content

Commit 48987d2

Browse files
committed
[FAB-2624] Gossip Anchor peer connect bug fix
When the PEM was removed from the anchor peers, the Connect() method of the discovery layer wasn't updated accordingly. As a result, all anchor peers were mapped to the same (empty string) key in the discovery layer This commit: 1) Changes the logic and semantics of the Connect(): It now creates a membership request that doesn't (or does, up to the caller) have the internal endpoint inside of it, according to the param passed. This is because we don't have knowledge of whether the remote anchor peer is in our organization or not. 2) Changes the test case of the gossip:ConnectToAnchorPeers to simulate a situation that would've caught the bug had it been tested in the first place: Starting 10 peers, waiting a bit and then starting a *random* anchor peer out of the 3 anchor peers given to the 10 peers. With the bug- all peers would've mapped all anchor peers to the same key, thus unable to connect with probability of 2/3. 3) Changes the test of TestConnect in the discovery package to test that the when the Connect() is invoked with false, meaning- not to send the internal endpoint, the first membershipRequest that is sent indeed doesn't contain the internal endpoint. and also adds a test for each of the options (true, false) of the method createMembershipRequest(includeInternalEndpoint) Signed-off-by: Yacov Manevich <[email protected]> Change-Id: Ibd167188dac214951adedfc2ef050d635c6db79f
1 parent bc7f9d8 commit 48987d2

File tree

5 files changed

+101
-35
lines changed

5 files changed

+101
-35
lines changed

gossip/discovery/discovery.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -117,5 +117,7 @@ type Discovery interface {
117117
InitiateSync(peerNum int)
118118

119119
// Connect makes this instance to connect to a remote instance
120-
Connect(NetworkMember)
120+
// The sendInternalEndpoint param determines whether or not
121+
// to include the internal endpoint in the membership request.
122+
Connect(member NetworkMember, sendInternalEndpoint bool)
121123
}

gossip/discovery/discovery_impl.go

+38-25
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
const defaultHelloInterval = time.Duration(5) * time.Second
3737

3838
var aliveExpirationCheckInterval time.Duration
39+
var maxConnectionAttempts = 120
3940

4041
// SetAliveTimeInterval sets the alive time interval
4142
func SetAliveTimeInterval(interval time.Duration) {
@@ -132,24 +133,30 @@ func (d *gossipDiscoveryImpl) Exists(PKIID common.PKIidType) bool {
132133
return exists
133134
}
134135

135-
func (d *gossipDiscoveryImpl) Connect(member NetworkMember) {
136+
func (d *gossipDiscoveryImpl) Connect(member NetworkMember, sendInternalEndpoint bool) {
136137
d.logger.Debug("Entering", member)
137138
defer d.logger.Debug("Exiting")
138139

139-
d.lock.Lock()
140-
defer d.lock.Unlock()
141-
142-
if _, exists := d.id2Member[string(member.PKIid)]; exists {
143-
d.logger.Info("Member", member, "already known")
144-
return
145-
}
140+
req := d.createMembershipRequest(sendInternalEndpoint).NoopSign()
146141

147-
d.deadLastTS[string(member.PKIid)] = &timestamp{
148-
incTime: time.Unix(0, 0),
149-
lastSeen: time.Now(),
150-
seqNum: 0,
151-
}
152-
d.id2Member[string(member.PKIid)] = &member
142+
go func() {
143+
for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ {
144+
peer := &NetworkMember{
145+
InternalEndpoint: member.InternalEndpoint,
146+
Endpoint: member.Endpoint,
147+
}
148+
if !d.comm.Ping(peer) {
149+
if d.toDie() {
150+
return
151+
}
152+
d.logger.Warning("Could not connect to", member)
153+
time.Sleep(getReconnectInterval())
154+
continue
155+
}
156+
d.comm.SendToPeer(peer, req)
157+
return
158+
}
159+
}()
153160
}
154161

155162
func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) {
@@ -175,7 +182,7 @@ func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) {
175182

176183
for !d.somePeerIsKnown() {
177184
var wg sync.WaitGroup
178-
req := d.createMembershipRequest().NoopSign()
185+
req := d.createMembershipRequest(true).NoopSign()
179186
wg.Add(len(endpoints))
180187
for _, endpoint := range endpoints {
181188
go func(endpoint string) {
@@ -206,7 +213,7 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {
206213
return
207214
}
208215
var peers2SendTo []*NetworkMember
209-
memReq := d.createMembershipRequest().NoopSign()
216+
memReq := d.createMembershipRequest(true).NoopSign()
210217

211218
d.lock.RLock()
212219

@@ -384,7 +391,7 @@ func (d *gossipDiscoveryImpl) sendMemResponse(targetMember *proto.Member, intern
384391

385392
func (d *gossipDiscoveryImpl) createMembershipResponse(targetMember *NetworkMember) *proto.MembershipResponse {
386393
shouldBeDisclosed, omitConcealedFields := d.disclosurePolicy(targetMember)
387-
aliveMsg := d.createAliveMessage()
394+
aliveMsg := d.createAliveMessage(true)
388395

389396
if !shouldBeDisclosed(aliveMsg) {
390397
return nil
@@ -525,7 +532,7 @@ func (d *gossipDiscoveryImpl) periodicalReconnectToDead() {
525532
defer wg.Done()
526533
if d.comm.Ping(&member) {
527534
d.logger.Debug(member, "is responding, sending membership request")
528-
d.sendMembershipRequest(&member)
535+
d.sendMembershipRequest(&member, true)
529536
} else {
530537
d.logger.Debug(member, "is still dead")
531538
}
@@ -538,13 +545,13 @@ func (d *gossipDiscoveryImpl) periodicalReconnectToDead() {
538545
}
539546
}
540547

541-
func (d *gossipDiscoveryImpl) sendMembershipRequest(member *NetworkMember) {
542-
d.comm.SendToPeer(member, d.createMembershipRequest())
548+
func (d *gossipDiscoveryImpl) sendMembershipRequest(member *NetworkMember, includeInternalEndpoint bool) {
549+
d.comm.SendToPeer(member, d.createMembershipRequest(includeInternalEndpoint))
543550
}
544551

545-
func (d *gossipDiscoveryImpl) createMembershipRequest() *proto.SignedGossipMessage {
552+
func (d *gossipDiscoveryImpl) createMembershipRequest(includeInternalEndpoint bool) *proto.SignedGossipMessage {
546553
req := &proto.MembershipRequest{
547-
SelfInformation: d.createAliveMessage().Envelope,
554+
SelfInformation: d.createAliveMessage(includeInternalEndpoint).Envelope,
548555
// TODO: sending the known peers is not secure because the remote peer might shouldn't know
549556
// TODO: about the known peers. I'm deprecating this until a secure mechanism will be implemented.
550557
// TODO: See FAB-2570 for tracking this issue.
@@ -649,11 +656,11 @@ func (d *gossipDiscoveryImpl) periodicalSendAlive() {
649656
for !d.toDie() {
650657
d.logger.Debug("Sleeping", getAliveTimeInterval())
651658
time.Sleep(getAliveTimeInterval())
652-
d.comm.Gossip(d.createAliveMessage())
659+
d.comm.Gossip(d.createAliveMessage(true))
653660
}
654661
}
655662

656-
func (d *gossipDiscoveryImpl) createAliveMessage() *proto.SignedGossipMessage {
663+
func (d *gossipDiscoveryImpl) createAliveMessage(includeInternalEndpoint bool) *proto.SignedGossipMessage {
657664
d.lock.Lock()
658665
d.seqNum++
659666
seqNum := d.seqNum
@@ -682,10 +689,16 @@ func (d *gossipDiscoveryImpl) createAliveMessage() *proto.SignedGossipMessage {
682689
},
683690
}
684691

685-
return &proto.SignedGossipMessage{
692+
signedMsg := &proto.SignedGossipMessage{
686693
GossipMessage: msg2Gossip,
687694
Envelope: d.crypt.SignMessage(msg2Gossip, internalEndpoint),
688695
}
696+
697+
if !includeInternalEndpoint {
698+
signedMsg.Envelope.SecretEnvelope = nil
699+
}
700+
701+
return signedMsg
689702
}
690703

691704
func (d *gossipDiscoveryImpl) learnExistingMembers(aliveArr []*proto.SignedGossipMessage) {

gossip/discovery/discovery_test.go

+35-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package discovery
1919
import (
2020
"fmt"
2121
"io"
22+
"math/rand"
2223
"net"
2324
"sort"
2425
"strconv"
@@ -45,6 +46,7 @@ func init() {
4546
SetAliveExpirationTimeout(10 * aliveTimeInterval)
4647
SetAliveExpirationCheckInterval(aliveTimeInterval)
4748
SetReconnectInterval(10 * aliveTimeInterval)
49+
maxConnectionAttempts = 10000
4850
}
4951

5052
type dummyCommModule struct {
@@ -117,7 +119,7 @@ func (comm *dummyCommModule) SendToPeer(peer *NetworkMember, msg *proto.SignedGo
117119
comm.lock.RUnlock()
118120

119121
if mock != nil {
120-
mock.Called()
122+
mock.Called(peer, msg)
121123
}
122124

123125
if !exists {
@@ -329,19 +331,50 @@ func TestConnect(t *testing.T) {
329331
t.Parallel()
330332
nodeNum := 10
331333
instances := []*gossipInstance{}
334+
firstSentMemReqMsgs := make(chan *proto.SignedGossipMessage, nodeNum)
332335
for i := 0; i < nodeNum; i++ {
333336
inst := createDiscoveryInstance(7611+i, fmt.Sprintf("d%d", i), []string{})
337+
338+
inst.comm.lock.Lock()
339+
inst.comm.mock = &mock.Mock{}
340+
inst.comm.mock.On("SendToPeer", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) {
341+
inst := inst
342+
msg := arguments.Get(1).(*proto.SignedGossipMessage)
343+
if req := msg.GetMemReq(); req != nil {
344+
selfMsg, _ := req.SelfInformation.ToGossipMessage()
345+
firstSentMemReqMsgs <- selfMsg
346+
inst.comm.lock.Lock()
347+
inst.comm.mock = nil
348+
inst.comm.lock.Unlock()
349+
}
350+
})
351+
inst.comm.mock.On("Ping", mock.Anything)
352+
inst.comm.lock.Unlock()
353+
334354
instances = append(instances, inst)
335355
j := (i + 1) % 10
336356
endpoint := fmt.Sprintf("localhost:%d", 7611+j)
337357
netMember2Connect2 := NetworkMember{Endpoint: endpoint, PKIid: []byte(endpoint)}
338-
inst.Connect(netMember2Connect2)
358+
inst.Connect(netMember2Connect2, false)
359+
}
360+
361+
time.Sleep(time.Second * 1)
362+
assert.Len(t, firstSentMemReqMsgs, 10)
363+
close(firstSentMemReqMsgs)
364+
for firstSentSelfMsg := range firstSentMemReqMsgs {
365+
assert.Nil(t, firstSentSelfMsg.Envelope.SecretEnvelope)
339366
}
340367

341368
fullMembership := func() bool {
342369
return nodeNum-1 == len(instances[nodeNum-1].GetMembership())
343370
}
344371
waitUntilOrFail(t, fullMembership)
372+
373+
discInst := instances[rand.Intn(len(instances))].Discovery.(*gossipDiscoveryImpl)
374+
am, _ := discInst.createMembershipRequest(true).GetMemReq().SelfInformation.ToGossipMessage()
375+
assert.NotNil(t, am.SecretEnvelope)
376+
am, _ = discInst.createMembershipRequest(false).GetMemReq().SelfInformation.ToGossipMessage()
377+
assert.Nil(t, am.SecretEnvelope)
345378
stopInstances(t, instances)
346379
}
347380

gossip/gossip/gossip_impl.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,9 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com
196196
continue
197197
}
198198

199+
inOurOrg := bytes.Equal(g.selfOrg, ap.OrgID)
199200
g.disc.Connect(discovery.NetworkMember{
200-
InternalEndpoint: endpoint, Endpoint: endpoint})
201+
InternalEndpoint: endpoint, Endpoint: endpoint}, inOurOrg)
201202
}
202203
}
203204

gossip/gossip/gossip_test.go

+23-6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"errors"
2222
"fmt"
23+
"math/rand"
2324
"runtime"
2425
"strconv"
2526
"strings"
@@ -49,7 +50,7 @@ func init() {
4950
discovery.SetAliveTimeInterval(aliveTimeInterval)
5051
discovery.SetAliveExpirationCheckInterval(aliveTimeInterval)
5152
discovery.SetAliveExpirationTimeout(aliveTimeInterval * 10)
52-
discovery.SetReconnectInterval(aliveTimeInterval * 5)
53+
discovery.SetReconnectInterval(aliveTimeInterval)
5354
testWG.Add(7)
5455
factory.InitFactories(nil)
5556
}
@@ -321,15 +322,21 @@ func TestPull(t *testing.T) {
321322

322323
func TestConnectToAnchorPeers(t *testing.T) {
323324
t.Parallel()
325+
// Scenario: spawn 10 peers, and have them join a channel
326+
// of 3 anchor peers that don't exist yet.
327+
// Wait 5 seconds, and then spawn a random anchor peer out of the 3.
328+
// Ensure that all peers successfully see each other in the channel
329+
324330
portPrefix := 8610
325331
// Scenario: Spawn 5 peers, and make each of them connect to
326332
// the other 2 using join channel.
327333
stopped := int32(0)
328334
go waitForTestCompletion(&stopped, t)
329-
n := 5
335+
n := 10
336+
anchorPeercount := 3
330337

331338
jcm := &joinChanMsg{anchorPeers: []api.AnchorPeer{}}
332-
for i := 0; i < n; i++ {
339+
for i := 0; i < anchorPeercount; i++ {
333340
ap := api.AnchorPeer{
334341
Port: portPrefix + i,
335342
Host: "localhost",
@@ -343,18 +350,28 @@ func TestConnectToAnchorPeers(t *testing.T) {
343350
for i := 0; i < n; i++ {
344351
wg.Add(1)
345352
go func(i int) {
346-
peers[i] = newGossipInstance(portPrefix, i, 100)
353+
peers[i] = newGossipInstance(portPrefix, i+anchorPeercount, 100)
347354
peers[i].JoinChan(jcm, common.ChainID("A"))
348355
peers[i].UpdateChannelMetadata([]byte("bla bla"), common.ChainID("A"))
349356
wg.Done()
350357
}(i)
351358
}
359+
352360
waitUntilOrFailBlocking(t, wg.Wait)
353-
waitUntilOrFail(t, checkPeersMembership(t, peers, n-1))
361+
362+
time.Sleep(time.Second * 5)
363+
364+
// Now start a random anchor peer
365+
anchorPeer := newGossipInstance(portPrefix, rand.Intn(anchorPeercount), 100)
366+
anchorPeer.JoinChan(jcm, common.ChainID("A"))
367+
anchorPeer.UpdateChannelMetadata([]byte("bla bla"), common.ChainID("A"))
368+
369+
defer anchorPeer.Stop()
370+
waitUntilOrFail(t, checkPeersMembership(t, peers, n))
354371

355372
channelMembership := func() bool {
356373
for _, peer := range peers {
357-
if len(peer.PeersOfChannel(common.ChainID("A"))) != n-1 {
374+
if len(peer.PeersOfChannel(common.ChainID("A"))) != n {
358375
return false
359376
}
360377
}

0 commit comments

Comments
 (0)