@@ -27,7 +27,9 @@ import (
27
27
"sync/atomic"
28
28
"time"
29
29
30
+ "github.com/hyperledger/fabric/gossip/api"
30
31
"github.com/hyperledger/fabric/gossip/common"
32
+ "github.com/hyperledger/fabric/gossip/identity"
31
33
"github.com/hyperledger/fabric/gossip/proto"
32
34
"github.com/hyperledger/fabric/gossip/util"
33
35
"github.com/op/go-logging"
@@ -62,7 +64,7 @@ func (c *commImpl) SetDialOpts(opts ...grpc.DialOption) {
62
64
}
63
65
64
66
// NewCommInstanceWithServer creates a comm instance that creates an underlying gRPC server
65
- func NewCommInstanceWithServer (port int , sec SecurityProvider , pkID common. PKIidType , dialOpts ... grpc.DialOption ) (Comm , error ) {
67
+ func NewCommInstanceWithServer (port int , idMapper identity. Mapper , peerIdentity api. PeerIdentityType , dialOpts ... grpc.DialOption ) (Comm , error ) {
66
68
var ll net.Listener
67
69
var s * grpc.Server
68
70
var secOpt grpc.DialOption
@@ -77,10 +79,11 @@ func NewCommInstanceWithServer(port int, sec SecurityProvider, pkID common.PKIid
77
79
}
78
80
79
81
commInst := & commImpl {
82
+ PKIID : idMapper .GetPKIidOfCert (peerIdentity ),
83
+ idMapper : idMapper ,
80
84
logger : util .GetLogger (util .LOGGING_COMM_MODULE , fmt .Sprintf ("%d" , port )),
81
- PKIID : pkID ,
85
+ peerIdentity : peerIdentity ,
82
86
opts : dialOpts ,
83
- sec : sec ,
84
87
port : port ,
85
88
lsnr : ll ,
86
89
gSrv : s ,
@@ -92,15 +95,15 @@ func NewCommInstanceWithServer(port int, sec SecurityProvider, pkID common.PKIid
92
95
subscriptions : make ([]chan ReceivedMessage , 0 ),
93
96
blackListedPKIIDs : make ([]common.PKIidType , 0 ),
94
97
}
95
- commInst .connStore = newConnStore (commInst , pkID , commInst .logger )
98
+ commInst .connStore = newConnStore (commInst , commInst .logger )
99
+ commInst .idMapper .Put (idMapper .GetPKIidOfCert (peerIdentity ), peerIdentity )
96
100
97
101
if port > 0 {
98
102
go func () {
99
103
commInst .stopWG .Add (1 )
100
104
defer commInst .stopWG .Done ()
101
105
s .Serve (ll )
102
106
}()
103
-
104
107
proto .RegisterGossipServer (s , commInst )
105
108
}
106
109
@@ -110,8 +113,8 @@ func NewCommInstanceWithServer(port int, sec SecurityProvider, pkID common.PKIid
110
113
}
111
114
112
115
// NewCommInstance creates a new comm instance that binds itself to the given gRPC server
113
- func NewCommInstance (s * grpc.Server , sec SecurityProvider , PKIID common. PKIidType , dialOpts ... grpc.DialOption ) (Comm , error ) {
114
- commInst , err := NewCommInstanceWithServer (- 1 , sec , PKIID , dialOpts ... )
116
+ func NewCommInstance (s * grpc.Server , idStore identity. Mapper , peerIdentity api. PeerIdentityType , dialOpts ... grpc.DialOption ) (Comm , error ) {
117
+ commInst , err := NewCommInstanceWithServer (- 1 , idStore , peerIdentity , dialOpts ... )
115
118
if err != nil {
116
119
return nil , err
117
120
}
@@ -120,8 +123,9 @@ func NewCommInstance(s *grpc.Server, sec SecurityProvider, PKIID common.PKIidTyp
120
123
}
121
124
122
125
type commImpl struct {
126
+ peerIdentity api.PeerIdentityType
127
+ idMapper identity.Mapper
123
128
logger * util.Logger
124
- sec SecurityProvider
125
129
opts []grpc.DialOption
126
130
connStore * connectionStore
127
131
PKIID []byte
@@ -168,7 +172,6 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
168
172
if expectedPKIID != nil && ! bytes .Equal (pkiID , expectedPKIID ) {
169
173
// PKIID is nil when we don't know the remote PKI id's
170
174
c .logger .Warning ("Remote endpoint claims to be a different peer, expected" , expectedPKIID , "but got" , pkiID )
171
- cc .Close ()
172
175
return nil , fmt .Errorf ("Authentication failure" )
173
176
}
174
177
conn := newConnection (cl , cc , stream , nil )
@@ -252,18 +255,20 @@ func (c *commImpl) isStopping() bool {
252
255
return atomic .LoadInt32 (& c .stopping ) == int32 (1 )
253
256
}
254
257
255
- func (c * commImpl ) Probe (peer * RemotePeer ) error {
258
+ func (c * commImpl ) Probe (remotePeer * RemotePeer ) error {
259
+ endpoint := remotePeer .Endpoint
260
+ pkiID := remotePeer .PKIID
256
261
if c .isStopping () {
257
262
return fmt .Errorf ("Stopping" )
258
263
}
259
- c .logger .Debug ("Entering, endpoint:" , peer . Endpoint , "PKIID:" , peer . PKIID )
264
+ c .logger .Debug ("Entering, endpoint:" , endpoint , "PKIID:" , pkiID )
260
265
var err error
261
266
262
267
opts := c .opts
263
268
if opts == nil {
264
269
opts = []grpc.DialOption {grpc .WithInsecure (), grpc .WithTimeout (dialTimeout )}
265
270
}
266
- cc , err := grpc .Dial (peer . Endpoint , append (opts , grpc .WithBlock ())... )
271
+ cc , err := grpc .Dial (endpoint , append (opts , grpc .WithBlock ())... )
267
272
if err != nil {
268
273
c .logger .Debug ("Returning" , err )
269
274
return err
@@ -373,45 +378,54 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro
373
378
tlsUnique := ExtractTLSUnique (ctx )
374
379
var sig []byte
375
380
var err error
376
- if tlsUnique != nil && c . sec . IsEnabled () {
377
- sig , err = c .sec .Sign (tlsUnique )
381
+ if tlsUnique != nil {
382
+ sig , err = c .idMapper .Sign (tlsUnique )
378
383
if err != nil {
379
384
c .logger .Error ("Failed signing TLS-Unique:" , err )
380
385
return nil , err
381
386
}
382
387
}
383
388
384
- cMsg := createConnectionMsg (c .PKIID , sig )
389
+ cMsg := createConnectionMsg (c .PKIID , sig , c .peerIdentity )
390
+ c .logger .Debug ("Sending" , cMsg , "to" , remoteAddress )
385
391
stream .Send (cMsg )
386
392
m := readWithTimeout (stream , defConnTimeout )
387
393
if m == nil {
388
394
c .logger .Warning ("Timed out waiting for connection message from" , remoteAddress )
389
395
return nil , fmt .Errorf ("Timed out" )
390
396
}
391
- connMsg := m .GetConn ()
392
- if connMsg == nil {
393
- c .logger .Warning ("Expected connection message but got" , connMsg )
397
+ receivedMsg := m .GetConn ()
398
+ if receivedMsg == nil {
399
+ c .logger .Warning ("Expected connection message but got" , receivedMsg )
394
400
return nil , fmt .Errorf ("Wrong type" )
395
401
}
396
- if c .isPKIblackListed (connMsg .PkiID ) {
402
+
403
+ if receivedMsg .PkiID == nil {
404
+ c .logger .Warning ("%s didn't send a pkiID" )
405
+ return nil , fmt .Errorf ("%s didn't send a pkiID" , remoteAddress )
406
+ }
407
+
408
+ if c .isPKIblackListed (receivedMsg .PkiID ) {
397
409
c .logger .Warning ("Connection attempt from" , remoteAddress , "but it is black-listed" )
398
410
return nil , fmt .Errorf ("Black-listed" )
399
411
}
412
+ c .logger .Debug ("Received" , receivedMsg , "from" , remoteAddress )
413
+ err = c .idMapper .Put (receivedMsg .PkiID , receivedMsg .Cert )
414
+ if err != nil {
415
+ c .logger .Warning ("Identity store rejected" , remoteAddress , ":" , err )
416
+ return nil , err
417
+ }
400
418
401
- if tlsUnique != nil && c . sec . IsEnabled () {
402
- err = c .sec .Verify (connMsg .PkiID , connMsg .Sig , tlsUnique )
419
+ if tlsUnique != nil {
420
+ err = c .idMapper .Verify (receivedMsg .PkiID , receivedMsg .Sig , tlsUnique )
403
421
if err != nil {
404
422
c .logger .Error ("Failed verifying signature from" , remoteAddress , ":" , err )
405
423
return nil , err
406
424
}
407
425
}
408
426
409
- if connMsg .PkiID == nil {
410
- return nil , fmt .Errorf ("%s didn't send a pkiID" , "Didn't send a pkiID" )
411
- }
412
-
413
427
c .logger .Debug ("Authenticated" , remoteAddress )
414
- return connMsg .PkiID , nil
428
+ return receivedMsg .PkiID , nil
415
429
416
430
}
417
431
@@ -486,12 +500,13 @@ func readWithTimeout(stream interface{}, timeout time.Duration) *proto.GossipMes
486
500
}
487
501
}
488
502
489
- func createConnectionMsg (pkiID common.PKIidType , sig []byte ) * proto.GossipMessage {
503
+ func createConnectionMsg (pkiID common.PKIidType , sig []byte , cert api. PeerIdentityType ) * proto.GossipMessage {
490
504
return & proto.GossipMessage {
491
505
Tag : proto .GossipMessage_EMPTY ,
492
506
Nonce : 0 ,
493
507
Content : & proto.GossipMessage_Conn {
494
508
Conn : & proto.ConnEstablish {
509
+ Cert : cert ,
495
510
PkiID : pkiID ,
496
511
Sig : sig ,
497
512
},
0 commit comments