Skip to content

Commit f966c8c

Browse files
committed
[FAB-3109] Fix anchor peer connection logic
The current implementation of what happens when the peer connects to an anchor peer has flawed logic. It assumes that if the probe is successful, the send will succeed too. That's too optimistic. Another problem is that after the handshake is performed, the remote peer may not distinguish between the closing of the handshake connection and the closing of the new connection that sends the MembershipRequest, since we only support 1 connection between any two peers (to prevent DoS). I re-implemented the logic to do the following: 1) First, perform a handshake to learn whether the remote peer is from our org or not (peers in different orgs should not know of our internal endpoints). The handshake is retried until it succeeds or too many attempts are made. 2) Then, construct a MembershipRequest and send it until it is acked by the remote peer, or until the attempts are exhausted. Added a test that simulates the flow. Change-Id: I72370d9be816105fdc5af7577ce578faf6e5abdc Signed-off-by: Yacov Manevich <[email protected]>
1 parent 5c353eb commit f966c8c

14 files changed

+623
-33
lines changed

gossip/comm/comm_impl.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -626,7 +626,7 @@ func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOp
626626
defer os.Remove(keyFileName)
627627
defer os.Remove(certFileName)
628628

629-
err = generateCertificates(keyFileName, certFileName)
629+
err = GenerateCertificates(keyFileName, certFileName)
630630
if err == nil {
631631
cert, err := tls.LoadX509KeyPair(certFileName, keyFileName)
632632
if err != nil {

gossip/comm/comm_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func newCommInstance(port int, sec api.MessageCryptoService) (Comm, error) {
111111

112112
func handshaker(endpoint string, comm Comm, t *testing.T, sigMutator func([]byte) []byte, pkiIDmutator func([]byte) []byte, mutualTLS bool) <-chan proto.ReceivedMessage {
113113
c := &commImpl{}
114-
err := generateCertificates("key.pem", "cert.pem")
114+
err := GenerateCertificates("key.pem", "cert.pem")
115115
assert.NoError(t, err, "%v", err)
116116
defer os.Remove("cert.pem")
117117
defer os.Remove("key.pem")
@@ -289,7 +289,7 @@ func TestProdConstructor(t *testing.T) {
289289
keyFileName := fmt.Sprintf("key.%d.pem", util.RandomUInt64())
290290
certFileName := fmt.Sprintf("cert.%d.pem", util.RandomUInt64())
291291

292-
generateCertificates(keyFileName, certFileName)
292+
GenerateCertificates(keyFileName, certFileName)
293293
cert, _ := tls.LoadX509KeyPair(certFileName, keyFileName)
294294
os.Remove(keyFileName)
295295
os.Remove(certFileName)
@@ -300,7 +300,7 @@ func TestProdConstructor(t *testing.T) {
300300
comm1.(*commImpl).selfCertHash = certHash
301301
go srv.Serve(lsnr)
302302

303-
generateCertificates(keyFileName, certFileName)
303+
GenerateCertificates(keyFileName, certFileName)
304304
cert, _ = tls.LoadX509KeyPair(certFileName, keyFileName)
305305
os.Remove(keyFileName)
306306
os.Remove(certFileName)
@@ -350,7 +350,7 @@ func TestCloseConn(t *testing.T) {
350350
defer comm1.Stop()
351351
acceptChan := comm1.Accept(acceptAll)
352352

353-
err := generateCertificates("key.pem", "cert.pem")
353+
err := GenerateCertificates("key.pem", "cert.pem")
354354
assert.NoError(t, err, "%v", err)
355355
defer os.Remove("cert.pem")
356356
defer os.Remove("key.pem")

gossip/comm/crypto.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func writeFile(filename string, keyType string, data []byte) error {
4343
return pem.Encode(f, &pem.Block{Type: keyType, Bytes: data})
4444
}
4545

46-
func generateCertificates(privKeyFile string, certKeyFile string) error {
46+
func GenerateCertificates(privKeyFile string, certKeyFile string) error {
4747
privateKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
4848
if err != nil {
4949
return err

gossip/comm/crypto_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (s *gossipTestServer) Ping(context.Context, *proto.Empty) (*proto.Empty, er
8484
}
8585

8686
func TestCertificateExtraction(t *testing.T) {
87-
err := generateCertificates("key.pem", "cert.pem")
87+
err := GenerateCertificates("key.pem", "cert.pem")
8888
defer os.Remove("cert.pem")
8989
defer os.Remove("key.pem")
9090
assert.NoError(t, err, "%v", err)
@@ -94,7 +94,7 @@ func TestCertificateExtraction(t *testing.T) {
9494
srv := createTestServer(t, &serverCert)
9595
defer srv.stop()
9696

97-
generateCertificates("key2.pem", "cert2.pem")
97+
GenerateCertificates("key2.pem", "cert2.pem")
9898
defer os.Remove("cert2.pem")
9999
defer os.Remove("key2.pem")
100100
clientCert, err := tls.LoadX509KeyPair("cert2.pem", "key2.pem")

gossip/discovery/discovery.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,16 @@ func (n NetworkMember) PreferredEndpoint() string {
9797
return n.Endpoint
9898
}
9999

100+
// PeerIdentification encompasses a remote peer's
101+
// PKI-ID and whether it's in the same org as the current
102+
// peer or not
103+
type PeerIdentification struct {
104+
ID common.PKIidType
105+
SelfOrg bool
106+
}
107+
108+
type identifier func() (*PeerIdentification, error)
109+
100110
// Discovery is the interface that represents a discovery module
101111
type Discovery interface {
102112

@@ -123,7 +133,8 @@ type Discovery interface {
123133
InitiateSync(peerNum int)
124134

125135
// Connect makes this instance connect to a remote instance.
126-
// The sendInternalEndpoint param determines whether or not
127-
// to include the internal endpoint in the membership request,
128-
Connect(member NetworkMember, sendInternalEndpoint func() bool)
136+
// The identifier param is a function that can be used to identify
137+
// the peer, and to assert its PKI-ID, whether it's in the peer's org or not,
138+
// and whether the action was successful or not
139+
Connect(member NetworkMember, id identifier)
129140
}

gossip/discovery/discovery_impl.go

+38-14
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ func SetReconnectInterval(interval time.Duration) {
6060
viper.Set("peer.gossip.reconnectInterval", interval)
6161
}
6262

63+
// SetMaxConnAttempts sets the maximum number of connection
64+
// attempts the peer would perform when invoking Connect()
65+
func SetMaxConnAttempts(attempts int) {
66+
maxConnectionAttempts = attempts
67+
}
68+
6369
type timestamp struct {
6470
incTime time.Time
6571
seqNum uint64
@@ -92,6 +98,7 @@ type gossipDiscoveryImpl struct {
9298
toDieFlag int32
9399
logger *logging.Logger
94100
disclosurePolicy DisclosurePolicy
101+
pubsub *util.PubSub
95102
}
96103

97104
// NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed
@@ -112,6 +119,7 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
112119
toDieFlag: int32(0),
113120
logger: util.GetLogger(util.LoggingDiscoveryModule, self.InternalEndpoint),
114121
disclosurePolicy: disPol,
122+
pubsub: util.NewPubSub(),
115123
}
116124

117125
d.msgStore = newAliveMsgStore(d)
@@ -137,32 +145,47 @@ func (d *gossipDiscoveryImpl) Lookup(PKIID common.PKIidType) *NetworkMember {
137145
return nm
138146
}
139147

140-
func (d *gossipDiscoveryImpl) Connect(member NetworkMember, sendInternalEndpoint func() bool) {
148+
func (d *gossipDiscoveryImpl) Connect(member NetworkMember, id identifier) {
141149
d.logger.Debug("Entering", member)
142150
defer d.logger.Debug("Exiting")
143-
144151
go func() {
145152
for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ {
146-
peer := &NetworkMember{
147-
InternalEndpoint: member.InternalEndpoint,
148-
Endpoint: member.Endpoint,
149-
}
150-
151-
if !d.comm.Ping(peer) {
153+
id, err := id()
154+
if err != nil {
152155
if d.toDie() {
153156
return
154157
}
155-
d.logger.Warning("Could not connect to", member)
158+
d.logger.Warning("Could not connect to", member, ":", err)
156159
time.Sleep(getReconnectInterval())
157160
continue
158161
}
159-
req := d.createMembershipRequest(sendInternalEndpoint()).NoopSign()
160-
d.comm.SendToPeer(peer, req)
162+
peer := &NetworkMember{
163+
InternalEndpoint: member.InternalEndpoint,
164+
Endpoint: member.Endpoint,
165+
PKIid: id.ID,
166+
}
167+
req := d.createMembershipRequest(id.SelfOrg).NoopSign()
168+
req.Nonce = util.RandomUInt64()
169+
req.NoopSign()
170+
go d.sendUntilAcked(peer, req)
161171
return
162172
}
173+
163174
}()
164175
}
165176

177+
func (d *gossipDiscoveryImpl) sendUntilAcked(peer *NetworkMember, message *proto.SignedGossipMessage) {
178+
nonce := message.Nonce
179+
for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ {
180+
sub := d.pubsub.Subscribe(fmt.Sprintf("%d", nonce), time.Second*5)
181+
d.comm.SendToPeer(peer, message)
182+
if _, timeoutErr := sub.Listen(); timeoutErr == nil {
183+
return
184+
}
185+
time.Sleep(getReconnectInterval())
186+
}
187+
}
188+
166189
func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) {
167190
if len(d.self.InternalEndpoint) == 0 {
168191
d.logger.Panic("Internal endpoint is empty:", d.self.InternalEndpoint)
@@ -319,7 +342,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
319342
// Sending a membership response to a peer may block this routine
320343
// in case the sending is deliberately slow (i.e. an attack).
321344
// will keep this async until I'll write a timeout detector in the comm layer
322-
go d.sendMemResponse(selfInfoGossipMsg.GetAliveMsg().Membership, internalEndpoint)
345+
go d.sendMemResponse(selfInfoGossipMsg.GetAliveMsg().Membership, internalEndpoint, m.Nonce)
323346
return
324347
}
325348

@@ -335,6 +358,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
335358
}
336359

337360
if memResp := m.GetMemRes(); memResp != nil {
361+
d.pubsub.Publish(fmt.Sprintf("%d", m.Nonce), m.Nonce)
338362
for _, env := range memResp.Alive {
339363
am, err := env.ToGossipMessage()
340364
if err != nil {
@@ -378,7 +402,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
378402
}
379403
}
380404

381-
func (d *gossipDiscoveryImpl) sendMemResponse(targetMember *proto.Member, internalEndpoint string) {
405+
func (d *gossipDiscoveryImpl) sendMemResponse(targetMember *proto.Member, internalEndpoint string, nonce uint64) {
382406
d.logger.Debug("Entering", targetMember)
383407

384408
targetPeer := &NetworkMember{
@@ -400,7 +424,7 @@ func (d *gossipDiscoveryImpl) sendMemResponse(targetMember *proto.Member, intern
400424

401425
d.comm.SendToPeer(targetPeer, (&proto.GossipMessage{
402426
Tag: proto.GossipMessage_EMPTY,
403-
Nonce: uint64(0),
427+
Nonce: nonce,
404428
Content: &proto.GossipMessage_MemRes{
405429
MemRes: memResp,
406430
},

gossip/discovery/discovery_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,9 @@ func TestConnect(t *testing.T) {
392392
j := (i + 1) % 10
393393
endpoint := fmt.Sprintf("localhost:%d", 7611+j)
394394
netMember2Connect2 := NetworkMember{Endpoint: endpoint, PKIid: []byte(endpoint)}
395-
inst.Connect(netMember2Connect2, func() bool { return false })
395+
inst.Connect(netMember2Connect2, func() (identification *PeerIdentification, err error) {
396+
return &PeerIdentification{SelfOrg: false, ID: nil}, nil
397+
})
396398
}
397399

398400
time.Sleep(time.Second * 3)

0 commit comments

Comments
 (0)