Skip to content

Commit 00a9bd7

Browse files
committed
[FAB-2424] Gossip: Extract connection authInfo in comm
In the previous chapter of FAB-2424(https://gerrit.hyperledger.org/r/#/c/6393/) we took care of channel-based access control by consulting the MSP whether a given peer is eligible for a specific channel. This works for blocks that are broadcasted, and also for blocks that are pulled from peers in the gossip layer, but alas - the state transfer layer still stays bare and exposed to the mercy of malicious peers! This commit extends the protos/gossip/extensions.go:ReceivedMessage interface and replaces GetPKIID() with GetConnectionInfo() that returns: ID common.PKIidType Auth *AuthInfo: SignedData []byte Signature []byte Identity api.PeerIdentityType Using this, in the next commit I'll be able to modify the state transfer module by having the predicate it passes to the gossip layer when listening for messages from remote peers to also call the method provided by the MSP: VerifyByChannel() and in this way- to verify that the remote peer should indeed receive blocks or not. Change-Id: I9e2e6f4da430ed062a6fa12bebdfab4add6c4843 Signed-off-by: Yacov Manevich <[email protected]>
1 parent b36a664 commit 00a9bd7

11 files changed

+90
-44
lines changed

gossip/comm/comm_impl.go

+21-6
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
165165
var cc *grpc.ClientConn
166166
var stream proto.Gossip_GossipStreamClient
167167
var pkiID common.PKIidType
168+
var connInfo *proto.ConnectionInfo
168169

169170
c.logger.Debug("Entering", endpoint, expectedPKIID)
170171
defer c.logger.Debug("Exiting")
@@ -185,15 +186,17 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
185186
}
186187

187188
if stream, err = cl.GossipStream(context.Background()); err == nil {
188-
pkiID, err = c.authenticateRemotePeer(stream)
189+
connInfo, err = c.authenticateRemotePeer(stream)
189190
if err == nil {
191+
pkiID = connInfo.ID
190192
if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
191193
// PKIID is nil when we don't know the remote PKI id's
192194
c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
193195
return nil, errors.New("Authentication failure")
194196
}
195197
conn := newConnection(cl, cc, stream, nil)
196198
conn.pkiID = pkiID
199+
conn.info = connInfo
197200
conn.logger = c.logger
198201

199202
h := func(m *proto.SignedGossipMessage) {
@@ -202,6 +205,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
202205
conn: conn,
203206
lock: conn,
204207
SignedGossipMessage: m,
208+
connInfo: connInfo,
205209
})
206210
}
207211
conn.handler = h
@@ -384,7 +388,7 @@ func extractRemoteAddress(stream stream) string {
384388
return remoteAddress
385389
}
386390

387-
func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, error) {
391+
func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error) {
388392
ctx := stream.Context()
389393
remoteAddress := extractRemoteAddress(stream)
390394
remoteCertHash := extractCertificateHashFromContext(ctx)
@@ -437,6 +441,11 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro
437441
return nil, err
438442
}
439443

444+
connInfo := &proto.ConnectionInfo{
445+
ID: receivedMsg.PkiID,
446+
Identity: receivedMsg.Cert,
447+
}
448+
440449
// if TLS is detected, verify remote peer
441450
if remoteCertHash != nil && c.selfCertHash != nil {
442451
if !bytes.Equal(remoteCertHash, receivedMsg.Hash) {
@@ -451,24 +460,29 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro
451460
c.logger.Error("Failed verifying signature from", remoteAddress, ":", err)
452461
return nil, err
453462
}
463+
connInfo.Auth = &proto.AuthInfo{
464+
Signature: m.Signature,
465+
SignedData: m.Payload,
466+
}
454467
}
455468

456469
c.logger.Debug("Authenticated", remoteAddress)
457-
return receivedMsg.PkiID, nil
470+
471+
return connInfo, nil
458472
}
459473

460474
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
461475
if c.isStopping() {
462476
return errors.New("Shutting down")
463477
}
464-
PKIID, err := c.authenticateRemotePeer(stream)
478+
connInfo, err := c.authenticateRemotePeer(stream)
465479
if err != nil {
466480
c.logger.Error("Authentication failed")
467481
return err
468482
}
469483
c.logger.Debug("Servicing", extractRemoteAddress(stream))
470484

471-
conn := c.connStore.onConnected(stream, PKIID)
485+
conn := c.connStore.onConnected(stream, connInfo)
472486

473487
// if connStore denied the connection, it means we already have a connection to that peer
474488
// so close this stream
@@ -481,14 +495,15 @@ func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
481495
conn: conn,
482496
lock: conn,
483497
SignedGossipMessage: m,
498+
connInfo: connInfo,
484499
})
485500
}
486501

487502
conn.handler = h
488503

489504
defer func() {
490505
c.logger.Debug("Client", extractRemoteAddress(stream), " disconnected")
491-
c.connStore.closeByPKIid(PKIID)
506+
c.connStore.closeByPKIid(connInfo.ID)
492507
conn.close()
493508
}()
494509

gossip/comm/comm_test.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"testing"
2828
"time"
2929

30+
"github.com/hyperledger/fabric/bccsp/factory"
3031
"github.com/hyperledger/fabric/gossip/api"
3132
"github.com/hyperledger/fabric/gossip/common"
3233
"github.com/hyperledger/fabric/gossip/identity"
@@ -41,6 +42,7 @@ import (
4142

4243
func init() {
4344
rand.Seed(42)
45+
factory.InitFactories(nil)
4446
}
4547

4648
func acceptAll(msg interface{}) bool {
@@ -178,7 +180,13 @@ func TestHandshake(t *testing.T) {
178180
acceptChan := handshaker("localhost:9610", comm, t, nil, nil)
179181
time.Sleep(2 * time.Second)
180182
assert.Equal(t, 1, len(acceptChan))
181-
183+
msg := <-acceptChan
184+
expectedPKIID := common.PKIidType("localhost:9610")
185+
assert.Equal(t, expectedPKIID, msg.GetConnectionInfo().ID)
186+
assert.Equal(t, api.PeerIdentityType("localhost:9610"), msg.GetConnectionInfo().Identity)
187+
assert.NotNil(t, msg.GetConnectionInfo().Auth)
188+
assert.True(t, msg.GetConnectionInfo().IsAuthenticated())
189+
assert.Equal(t, msg.GetConnectionInfo().Auth.Signature, msg.GetConnectionInfo().Auth.SignedData)
182190
// negative path, nothing should be read from the channel because the signature is wrong
183191
mutateSig := func(b []byte) []byte {
184192
if b[0] == 0 {
@@ -222,7 +230,7 @@ func TestBasic(t *testing.T) {
222230
waitForMessages(t, out, 2, "Didn't receive 2 messages")
223231
}
224232

225-
func TestGetPKIID(t *testing.T) {
233+
func TestGetConnectionInfo(t *testing.T) {
226234
t.Parallel()
227235
comm1, _ := newCommInstance(6000, naiveSec)
228236
comm2, _ := newCommInstance(7000, naiveSec)
@@ -234,7 +242,7 @@ func TestGetPKIID(t *testing.T) {
234242
case <-time.After(time.Second * 10):
235243
t.Fatal("Didn't receive a message in time")
236244
case msg := <-m1:
237-
assert.Equal(t, comm2.GetPKIid(), msg.GetPKIID())
245+
assert.Equal(t, comm2.GetPKIid(), msg.GetConnectionInfo().ID)
238246
}
239247
}
240248

gossip/comm/conn.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -154,22 +154,23 @@ func (cs *connectionStore) shutdown() {
154154
wg.Wait()
155155
}
156156

157-
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, pkiID common.PKIidType) *connection {
157+
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, connInfo *proto.ConnectionInfo) *connection {
158158
cs.Lock()
159159
defer cs.Unlock()
160160

161-
if c, exists := cs.pki2Conn[string(pkiID)]; exists {
161+
if c, exists := cs.pki2Conn[string(connInfo.Identity)]; exists {
162162
c.close()
163163
}
164164

165-
return cs.registerConn(pkiID, serverStream)
165+
return cs.registerConn(connInfo, serverStream)
166166
}
167167

168-
func (cs *connectionStore) registerConn(pkiID common.PKIidType, serverStream proto.Gossip_GossipStreamServer) *connection {
168+
func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo, serverStream proto.Gossip_GossipStreamServer) *connection {
169169
conn := newConnection(nil, nil, nil, serverStream)
170-
conn.pkiID = pkiID
170+
conn.pkiID = connInfo.ID
171+
conn.info = connInfo
171172
conn.logger = cs.logger
172-
cs.pki2Conn[string(pkiID)] = conn
173+
cs.pki2Conn[string(connInfo.ID)] = conn
173174
return conn
174175
}
175176

@@ -197,6 +198,7 @@ func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_Go
197198
}
198199

199200
type connection struct {
201+
info *proto.ConnectionInfo
200202
outBuff chan *msgSending
201203
logger *logging.Logger // logger
202204
pkiID common.PKIidType // pkiID of the remote endpoint

gossip/comm/mock/mock_comm.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ func (packet *packetMock) GetGossipMessage() *proto.SignedGossipMessage {
102102
return packet.msg.(*proto.SignedGossipMessage)
103103
}
104104

105-
// GetPKIID returns the PKI-ID of the remote peer
105+
// GetConnectionInfo returns information about the remote peer
106106
// that sent the message
107-
func (packet *packetMock) GetPKIID() common.PKIidType {
107+
func (packet *packetMock) GetConnectionInfo() *proto.ConnectionInfo {
108108
return nil
109109
}
110110

gossip/comm/msg.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ package comm
1919
import (
2020
"sync"
2121

22-
"github.com/hyperledger/fabric/gossip/common"
2322
proto "github.com/hyperledger/fabric/protos/gossip"
2423
)
2524

2625
// ReceivedMessageImpl is an implementation of ReceivedMessage
2726
type ReceivedMessageImpl struct {
2827
*proto.SignedGossipMessage
29-
lock sync.Locker
30-
conn *connection
28+
lock sync.Locker
29+
conn *connection
30+
connInfo *proto.ConnectionInfo
3131
}
3232

3333
// GetSourceEnvelope Returns the Envelope the ReceivedMessage was
@@ -46,8 +46,8 @@ func (m *ReceivedMessageImpl) GetGossipMessage() *proto.SignedGossipMessage {
4646
return m.SignedGossipMessage
4747
}
4848

49-
// GetPKIID returns the PKI-ID of the remote peer
50-
// that sent the message
51-
func (m *ReceivedMessageImpl) GetPKIID() common.PKIidType {
52-
return m.conn.pkiID
49+
// GetConnectionInfo returns information about the remote peer
50+
// that send the message
51+
func (m *ReceivedMessageImpl) GetConnectionInfo() *proto.ConnectionInfo {
52+
return m.connInfo
5353
}

gossip/gossip/certstore_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323

2424
"github.com/hyperledger/fabric/gossip/api"
2525
"github.com/hyperledger/fabric/gossip/comm"
26-
"github.com/hyperledger/fabric/gossip/common"
2726
"github.com/hyperledger/fabric/gossip/discovery"
2827
"github.com/hyperledger/fabric/gossip/gossip/algo"
2928
"github.com/hyperledger/fabric/gossip/gossip/pull"
@@ -64,7 +63,7 @@ func (s *sentMsg) GetGossipMessage() *proto.SignedGossipMessage {
6463
return s.msg
6564
}
6665

67-
func (s *sentMsg) GetPKIID() common.PKIidType {
66+
func (s *sentMsg) GetConnectionInfo() *proto.ConnectionInfo {
6867
return nil
6968
}
7069

gossip/gossip/channel/channel.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -359,13 +359,13 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
359359
gc.logger.Warning("Got message", msg.GetGossipMessage(), "but it's not a per-channel message, discarding it")
360360
return
361361
}
362-
orgID := gc.GetOrgOfPeer(msg.GetPKIID())
362+
orgID := gc.GetOrgOfPeer(msg.GetConnectionInfo().ID)
363363
if orgID == nil {
364-
gc.logger.Warning("Couldn't find org identity of peer", msg.GetPKIID())
364+
gc.logger.Warning("Couldn't find org identity of peer", msg.GetConnectionInfo().ID)
365365
return
366366
}
367367
if !gc.IsOrgInChannel(orgID) {
368-
gc.logger.Warning("Point to point message came from", msg.GetPKIID(), "but it's not eligible for the channel", msg.GetGossipMessage().Channel)
368+
gc.logger.Warning("Point to point message came from", msg.GetConnectionInfo().ID, "but it's not eligible for the channel", msg.GetGossipMessage().Channel)
369369
return
370370
}
371371

@@ -375,7 +375,7 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
375375
}
376376

377377
if m.IsStateInfoSnapshot() {
378-
gc.handleStateInfSnapshot(m.GossipMessage, msg.GetPKIID())
378+
gc.handleStateInfSnapshot(m.GossipMessage, msg.GetConnectionInfo().ID)
379379
return
380380
}
381381

@@ -384,10 +384,10 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
384384

385385
if m.IsDataMsg() {
386386
if m.GetDataMsg().Payload == nil {
387-
gc.logger.Warning("Payload is empty, got it from", msg.GetPKIID())
387+
gc.logger.Warning("Payload is empty, got it from", msg.GetConnectionInfo().ID)
388388
return
389389
}
390-
if !gc.verifyBlock(m.GossipMessage, msg.GetPKIID()) {
390+
if !gc.verifyBlock(m.GossipMessage, msg.GetConnectionInfo().ID) {
391391
gc.logger.Warning("Failed verifying block", m.GetDataMsg().Payload.SeqNum)
392392
return
393393
}
@@ -410,8 +410,8 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
410410
return
411411
}
412412
if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BlockMessage {
413-
if !gc.EligibleForChannel(discovery.NetworkMember{PKIid: msg.GetPKIID()}) {
414-
gc.logger.Warning(msg.GetPKIID(), "isn't eligible for channel", gc.chainID)
413+
if !gc.EligibleForChannel(discovery.NetworkMember{PKIid: msg.GetConnectionInfo().ID}) {
414+
gc.logger.Warning(msg.GetConnectionInfo().ID, "isn't eligible for channel", gc.chainID)
415415
return
416416
}
417417
if m.IsDataUpdate() {
@@ -425,7 +425,7 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
425425
gc.logger.Warning("DataUpdate message contains item with channel", gMsg.Channel, "but should be", gc.chainID)
426426
return
427427
}
428-
if !gc.verifyBlock(gMsg.GossipMessage, msg.GetPKIID()) {
428+
if !gc.verifyBlock(gMsg.GossipMessage, msg.GetConnectionInfo().ID) {
429429
return
430430
}
431431
}
@@ -525,7 +525,7 @@ func (gc *gossipChannel) verifyMsg(msg proto.ReceivedMessage) bool {
525525
return false
526526
}
527527

528-
if msg.GetPKIID() == nil {
528+
if msg.GetConnectionInfo().ID == nil {
529529
gc.logger.Warning("Message has nil PKI-ID")
530530
return false
531531
}

gossip/gossip/channel/channel_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,10 @@ func (m *receivedMsg) Respond(msg *proto.GossipMessage) {
144144
m.Called(msg)
145145
}
146146

147-
func (m *receivedMsg) GetPKIID() common.PKIidType {
148-
return m.PKIID
147+
func (m *receivedMsg) GetConnectionInfo() *proto.ConnectionInfo {
148+
return &proto.ConnectionInfo{
149+
ID: m.PKIID,
150+
}
149151
}
150152

151153
type gossipAdapterMock struct {

gossip/gossip/gossip_impl.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage) {
270270

271271
msg := m.GetGossipMessage()
272272

273-
g.logger.Debug("Entering,", m.GetPKIID(), "sent us", msg)
273+
g.logger.Debug("Entering,", m.GetConnectionInfo().ID, "sent us", msg)
274274
defer g.logger.Debug("Exiting")
275275

276276
if !g.validateMsg(m) {
@@ -302,7 +302,7 @@ func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage) {
302302
if msg.IsChannelRestricted() {
303303
if gc := g.chanState.getGossipChannelByChainID(msg.Channel); gc == nil {
304304
// If we're not in the channel but we should forward to peers of our org
305-
if g.isInMyorg(discovery.NetworkMember{PKIid: m.GetPKIID()}) && msg.IsStateInfoMsg() {
305+
if g.isInMyorg(discovery.NetworkMember{PKIid: m.GetConnectionInfo().ID}) && msg.IsStateInfoMsg() {
306306
if g.stateInfoMsgStore.Add(msg) {
307307
g.emitter.Add(msg)
308308
}

gossip/gossip/pull/pullstore_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"time"
2424

2525
"github.com/hyperledger/fabric/gossip/comm"
26-
"github.com/hyperledger/fabric/gossip/common"
2726
"github.com/hyperledger/fabric/gossip/discovery"
2827
"github.com/hyperledger/fabric/gossip/gossip/algo"
2928
"github.com/hyperledger/fabric/gossip/util"
@@ -63,7 +62,7 @@ func (pm *pullMsg) GetGossipMessage() *proto.SignedGossipMessage {
6362
return pm.msg
6463
}
6564

66-
func (pm *pullMsg) GetPKIID() common.PKIidType {
65+
func (pm *pullMsg) GetConnectionInfo() *proto.ConnectionInfo {
6766
return nil
6867
}
6968

0 commit comments

Comments
 (0)