Skip to content

Commit d2c8fed

Browse files
committed
[FAB-1913] Connect to anchor peers at join Channel
When the peer joins a channel it gets a list of anchor peers. We need the gossip layer to reach out to these anchor peers and connect to them. - Added 2 tests (one in the discovery module and one in the gossip module) I tested this using the following setup: used docs/docker-compose-channel.yml And added another peer, and didn't give that peer a bootstrap peer, so both peers don't know of each other at startup. Then I created an anchorPeer file with the ip address (172.21.0.4) and port of one of the peers. After that, I created a channel with that anchor peer file, and made the other peer to join the channel. The peer tried contacting the anchor peer (looked at the gossip communication logs) and tried authentication with it: Sending tag:EMPTY signature:"[xxx]" conn:<pkiID:"172.21.0.4:7051" cert:"[yyyyy]" > to 172.21.0.4:53637 WARN 213 Remote endpoint claims to be a different peer, expected [PEM FILE] but got [fake PKI-ID I use until MSP is integrated with gossip] This proves that when joinChannel is called upon a peer with suitable anchor peer configuration, the peers try to connect to one another. Signed-off-by: Yacov Manevich <[email protected]> Change-Id: Ia033f81eeaf38cb53cb65dc06a01dca07342386b
1 parent d9d5170 commit d2c8fed

File tree

5 files changed

+129
-3
lines changed

5 files changed

+129
-3
lines changed

gossip/discovery/discovery.go

+3
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,7 @@ type Discovery interface {
8080
// InitiateSync makes the instance ask a given number of peers
8181
// for their membership information
8282
InitiateSync(peerNum int)
83+
84+
// Connect makes this instance to connect to a remote instance
85+
Connect(NetworkMember)
8386
}

gossip/discovery/discovery_impl.go

+25
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,31 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
133133
return d
134134
}
135135

136+
func (d *gossipDiscoveryImpl) Connect(member NetworkMember) {
137+
d.logger.Debug("Entering", member)
138+
defer d.logger.Debug("Exiting")
139+
140+
if member.PKIid == nil {
141+
d.logger.Warning("Empty PkiID, aborting")
142+
return
143+
}
144+
145+
d.lock.Lock()
146+
defer d.lock.Unlock()
147+
148+
if _, exists := d.id2Member[string(member.PKIid)]; exists {
149+
d.logger.Info("Member", member, "already known")
150+
return
151+
}
152+
153+
d.deadLastTS[string(member.PKIid)] = &timestamp{
154+
incTime: time.Unix(0, 0),
155+
lastSeen: time.Now(),
156+
seqNum: 0,
157+
}
158+
d.id2Member[string(member.PKIid)] = &member
159+
}
160+
136161
func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) {
137162
d.logger.Info("Entering:", endpoints)
138163
defer d.logger.Info("Exiting")

gossip/discovery/discovery_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,28 @@ func bootPeer(port int) string {
258258
return fmt.Sprintf("localhost:%d", port)
259259
}
260260

261+
func TestConnect(t *testing.T) {
262+
t.Parallel()
263+
nodeNum := 10
264+
instances := []*gossipInstance{}
265+
for i := 0; i < nodeNum; i++ {
266+
inst := createDiscoveryInstance(7611+i, fmt.Sprintf("d%d", i), []string{})
267+
instances = append(instances, inst)
268+
j := (i + 1) % 10
269+
endpoint := fmt.Sprintf("localhost:%d", 7611+j)
270+
netMember2Connect2 := NetworkMember{Endpoint: endpoint, PKIid: []byte(endpoint)}
271+
inst.Connect(netMember2Connect2)
272+
// Check passing nil PKI-ID doesn't crash peer
273+
inst.Connect(NetworkMember{PKIid: nil, Endpoint: endpoint})
274+
}
275+
276+
fullMembership := func() bool {
277+
return nodeNum-1 == len(instances[nodeNum-1].GetMembership())
278+
}
279+
waitUntilOrFail(t, fullMembership)
280+
stopInstances(t, instances)
281+
}
282+
261283
func TestUpdate(t *testing.T) {
262284
t.Parallel()
263285
nodeNum := 5

gossip/gossip/gossip_impl.go

+18
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,24 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com
158158
return
159159
}
160160
g.chanState.joinChannel(joinMsg, chainID)
161+
162+
selfPkiID := g.mcs.GetPKIidOfCert(g.selfIdentity)
163+
for _, ap := range joinMsg.AnchorPeers() {
164+
if ap.Host == "" {
165+
g.logger.Warning("Got empty hostname, skipping connecting to anchor peer", ap)
166+
}
167+
if ap.Port == 0 {
168+
g.logger.Warning("Got invalid port (0), skipping connecting to anchor peer", ap)
169+
}
170+
pkiID := g.mcs.GetPKIidOfCert(ap.Cert)
171+
// Skip connecting to self
172+
if bytes.Equal([]byte(pkiID), []byte(selfPkiID)) {
173+
g.logger.Info("Anchor peer with same PKI-ID, skipping connecting to myself")
174+
continue
175+
}
176+
endpoint := fmt.Sprintf("%s:%d", ap.Host, ap.Port)
177+
g.disc.Connect(discovery.NetworkMember{Endpoint: endpoint, PKIid: pkiID})
178+
}
161179
}
162180

163181
func (g *gossipServiceImpl) handlePresumedDead() {

gossip/gossip/gossip_test.go

+61-3
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func init() {
4747
discovery.SetExpirationTimeout(aliveTimeInterval * 10)
4848
discovery.SetReconnectInterval(aliveTimeInterval * 5)
4949

50-
testWG.Add(6)
50+
testWG.Add(7)
5151

5252
}
5353

@@ -63,6 +63,7 @@ func acceptData(m interface{}) bool {
6363
}
6464

6565
type joinChanMsg struct {
66+
anchorPeers []api.AnchorPeer
6667
}
6768

6869
// SequenceNumber returns the sequence number of the block this joinChanMsg
@@ -72,8 +73,11 @@ func (*joinChanMsg) SequenceNumber() uint64 {
7273
}
7374

7475
// AnchorPeers returns all the anchor peers that are in the channel
75-
func (*joinChanMsg) AnchorPeers() []api.AnchorPeer {
76-
return []api.AnchorPeer{{Cert: anchorPeerIdentity}}
76+
func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer {
77+
if len(jcm.anchorPeers) == 0 {
78+
return []api.AnchorPeer{{Cert: anchorPeerIdentity}}
79+
}
80+
return jcm.anchorPeers
7781
}
7882

7983
type naiveCryptoService struct {
@@ -281,6 +285,60 @@ func TestPull(t *testing.T) {
281285
testWG.Done()
282286
}
283287

288+
func TestConnectToAnchorPeers(t *testing.T) {
289+
t.Parallel()
290+
portPrefix := 8610
291+
// Scenario: Spawn 5 peers, and make each of them connect to
292+
// the other 2 using join channel.
293+
stopped := int32(0)
294+
go waitForTestCompletion(&stopped, t)
295+
n := 5
296+
297+
jcm := &joinChanMsg{anchorPeers: []api.AnchorPeer{}}
298+
for i := 0; i < n; i++ {
299+
pkiID := fmt.Sprintf("localhost:%d", portPrefix+i)
300+
ap := api.AnchorPeer{
301+
Port: portPrefix + i,
302+
Host: "localhost",
303+
Cert: []byte(pkiID),
304+
}
305+
jcm.anchorPeers = append(jcm.anchorPeers, ap)
306+
}
307+
308+
peers := make([]Gossip, n)
309+
wg := sync.WaitGroup{}
310+
for i := 0; i < n; i++ {
311+
wg.Add(1)
312+
go func(i int) {
313+
peers[i] = newGossipInstance(portPrefix, i, 100)
314+
peers[i].JoinChan(jcm, common.ChainID("A"))
315+
peers[i].UpdateChannelMetadata([]byte("bla bla"), common.ChainID("A"))
316+
wg.Done()
317+
}(i)
318+
}
319+
waitUntilOrFailBlocking(t, wg.Wait)
320+
waitUntilOrFail(t, checkPeersMembership(peers, n-1))
321+
322+
channelMembership := func() bool {
323+
for _, peer := range peers {
324+
if len(peer.PeersOfChannel(common.ChainID("A"))) != n-1 {
325+
return false
326+
}
327+
}
328+
return true
329+
}
330+
waitUntilOrFail(t, channelMembership)
331+
332+
stop := func() {
333+
stopPeers(peers)
334+
}
335+
waitUntilOrFailBlocking(t, stop)
336+
337+
fmt.Println("<<<TestConnectToAnchorPeers>>>")
338+
atomic.StoreInt32(&stopped, int32(1))
339+
testWG.Done()
340+
}
341+
284342
func TestMembership(t *testing.T) {
285343
t.Parallel()
286344
portPrefix := 4610

0 commit comments

Comments
 (0)