Skip to content

Commit 9d12166

Browse files
committed
[FAB-3213] Gossip identity revocation support
When a peer gets a new config block and it contains CRLs, the gossip layer needs to be notified in order to close existing connections to peers that their certificates have been expired. If this is not done, then these peers are still forwarded data like peer membership and channel membership, because the connection is already open. This commit adds an ability to revoke identities by receiving a predicate function that: given an identity, it returns whether it is suspected of being revoked (i.e, the SN is found within some CRL of some MSP). Then- the gossip layer calls ValidateIdentity on the stored identity, and if it is found to be invalid- it: - deletes the identity from memory - closes an active connection to the peer, if such exists. Currently the implementation of that predicate is the naive/obvious one, that suspects all identities. In a future commit I'll (hopefully) add code that uses the CRLs themselves. Change-Id: I56d995a3720a736b1242b13a193f9a7933299345 Signed-off-by: Yacov Manevich <[email protected]>
1 parent 077126e commit 9d12166

12 files changed

+234
-126
lines changed

core/peer/peer.go

+8
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/hyperledger/fabric/core/committer/txvalidator"
3636
"github.com/hyperledger/fabric/core/ledger"
3737
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
38+
"github.com/hyperledger/fabric/gossip/api"
3839
"github.com/hyperledger/fabric/gossip/service"
3940
"github.com/hyperledger/fabric/msp"
4041
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
@@ -187,6 +188,13 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
187188
Manager: cm,
188189
Application: configtxInitializer.ApplicationConfig(),
189190
})
191+
service.GetGossipService().SuspectPeers(func(identity api.PeerIdentityType) bool {
192+
// TODO: this is a place-holder that would somehow make the MSP layer suspect
193+
// that a given certificate is revoked, or its intermediate CA is revoked.
194+
// In the meantime, before we have such an ability, we return true in order
195+
// to suspect ALL identities in order to validate all of them.
196+
return true
197+
})
190198
}
191199

192200
trustedRootsCallbackWrapper := func(cm configtxapi.Manager) {

gossip/api/crypto.go

+4
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,7 @@ type MessageCryptoService interface {
5757

5858
// PeerIdentityType is the peer's certificate
5959
type PeerIdentityType []byte
60+
61+
// PeerSuspector returns whether a peer with a given identity is suspected
62+
// as being revoked, or its CA is revoked
63+
type PeerSuspector func(identity PeerIdentityType) bool

gossip/comm/comm.go

-3
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,6 @@ type Comm interface {
5454

5555
// Stop stops the module
5656
Stop()
57-
58-
// BlackListPKIid prohibits the module communicating with the given PKIid
59-
BlackListPKIid(PKIid common.PKIidType)
6057
}
6158

6259
// RemotePeer defines a peer's endpoint and its PKIid

gossip/comm/comm_impl.go

+33-61
Original file line numberDiff line numberDiff line change
@@ -86,22 +86,21 @@ func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity
8686
}
8787

8888
commInst := &commImpl{
89-
selfCertHash: certHash,
90-
PKIID: idMapper.GetPKIidOfCert(peerIdentity),
91-
idMapper: idMapper,
92-
logger: util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
93-
peerIdentity: peerIdentity,
94-
opts: dialOpts,
95-
port: port,
96-
lsnr: ll,
97-
gSrv: s,
98-
msgPublisher: NewChannelDemultiplexer(),
99-
lock: &sync.RWMutex{},
100-
deadEndpoints: make(chan common.PKIidType, 100),
101-
stopping: int32(0),
102-
exitChan: make(chan struct{}, 1),
103-
subscriptions: make([]chan proto.ReceivedMessage, 0),
104-
blackListedPKIIDs: make([]common.PKIidType, 0),
89+
selfCertHash: certHash,
90+
PKIID: idMapper.GetPKIidOfCert(peerIdentity),
91+
idMapper: idMapper,
92+
logger: util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
93+
peerIdentity: peerIdentity,
94+
opts: dialOpts,
95+
port: port,
96+
lsnr: ll,
97+
gSrv: s,
98+
msgPublisher: NewChannelDemultiplexer(),
99+
lock: &sync.RWMutex{},
100+
deadEndpoints: make(chan common.PKIidType, 100),
101+
stopping: int32(0),
102+
exitChan: make(chan struct{}, 1),
103+
subscriptions: make([]chan proto.ReceivedMessage, 0),
105104
}
106105
commInst.connStore = newConnStore(commInst, commInst.logger)
107106
commInst.idMapper.Put(idMapper.GetPKIidOfCert(peerIdentity), peerIdentity)
@@ -145,25 +144,24 @@ func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Map
145144
}
146145

147146
type commImpl struct {
148-
skipHandshake bool
149-
selfCertHash []byte
150-
peerIdentity api.PeerIdentityType
151-
idMapper identity.Mapper
152-
logger *logging.Logger
153-
opts []grpc.DialOption
154-
connStore *connectionStore
155-
PKIID []byte
156-
port int
157-
deadEndpoints chan common.PKIidType
158-
msgPublisher *ChannelDeMultiplexer
159-
lock *sync.RWMutex
160-
lsnr net.Listener
161-
gSrv *grpc.Server
162-
exitChan chan struct{}
163-
stopping int32
164-
stopWG sync.WaitGroup
165-
subscriptions []chan proto.ReceivedMessage
166-
blackListedPKIIDs []common.PKIidType
147+
skipHandshake bool
148+
selfCertHash []byte
149+
peerIdentity api.PeerIdentityType
150+
idMapper identity.Mapper
151+
logger *logging.Logger
152+
opts []grpc.DialOption
153+
connStore *connectionStore
154+
PKIID []byte
155+
port int
156+
deadEndpoints chan common.PKIidType
157+
msgPublisher *ChannelDeMultiplexer
158+
lock *sync.RWMutex
159+
lsnr net.Listener
160+
gSrv *grpc.Server
161+
exitChan chan struct{}
162+
stopping int32
163+
stopWG sync.WaitGroup
164+
subscriptions []chan proto.ReceivedMessage
167165
}
168166

169167
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
@@ -238,28 +236,6 @@ func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer) {
238236
}
239237
}
240238

241-
func (c *commImpl) BlackListPKIid(PKIID common.PKIidType) {
242-
c.logger.Info("Entering", PKIID)
243-
defer c.logger.Info("Exiting")
244-
c.lock.Lock()
245-
defer c.lock.Unlock()
246-
c.connStore.closeByPKIid(PKIID)
247-
c.blackListedPKIIDs = append(c.blackListedPKIIDs, PKIID)
248-
}
249-
250-
func (c *commImpl) isPKIblackListed(p common.PKIidType) bool {
251-
c.lock.RLock()
252-
defer c.lock.RUnlock()
253-
for _, pki := range c.blackListedPKIIDs {
254-
if bytes.Equal(pki, p) {
255-
c.logger.Debug(p, ":", true)
256-
return true
257-
}
258-
}
259-
c.logger.Debug(p, ":", false)
260-
return false
261-
}
262-
263239
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage) {
264240
if c.isStopping() {
265241
return
@@ -464,10 +440,6 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo,
464440
return nil, fmt.Errorf("%s didn't send a pkiID", remoteAddress)
465441
}
466442

467-
if c.isPKIblackListed(receivedMsg.PkiId) {
468-
c.logger.Warning("Connection attempt from", remoteAddress, "but it is black-listed")
469-
return nil, errors.New("Black-listed")
470-
}
471443
c.logger.Debug("Received", receivedMsg, "from", remoteAddress)
472444
err = c.idMapper.Put(receivedMsg.PkiId, receivedMsg.Cert)
473445
if err != nil {

gossip/comm/comm_test.go

+32-53
Original file line numberDiff line numberDiff line change
@@ -295,65 +295,44 @@ func TestGetConnectionInfo(t *testing.T) {
295295
}
296296
}
297297

298-
func TestBlackListPKIid(t *testing.T) {
298+
func TestCloseConn(t *testing.T) {
299299
t.Parallel()
300300
comm1, _ := newCommInstance(1611, naiveSec)
301-
comm2, _ := newCommInstance(1612, naiveSec)
302-
comm3, _ := newCommInstance(1613, naiveSec)
303-
comm4, _ := newCommInstance(1614, naiveSec)
304301
defer comm1.Stop()
305-
defer comm2.Stop()
306-
defer comm3.Stop()
307-
defer comm4.Stop()
302+
acceptChan := comm1.Accept(acceptAll)
308303

309-
reader := func(instance string, out chan uint64, in <-chan proto.ReceivedMessage) {
310-
for {
311-
msg := <-in
312-
if msg == nil {
313-
return
314-
}
315-
out <- msg.GetGossipMessage().Nonce
316-
}
304+
err := generateCertificates("key.pem", "cert.pem")
305+
defer os.Remove("cert.pem")
306+
defer os.Remove("key.pem")
307+
cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
308+
tlsCfg := &tls.Config{
309+
InsecureSkipVerify: true,
310+
Certificates: []tls.Certificate{cert},
317311
}
312+
ta := credentials.NewTLS(tlsCfg)
318313

319-
out1 := make(chan uint64, 4)
320-
out2 := make(chan uint64, 4)
321-
out3 := make(chan uint64, 4)
322-
out4 := make(chan uint64, 4)
323-
324-
go reader("comm1", out1, comm1.Accept(acceptAll))
325-
go reader("comm2", out2, comm2.Accept(acceptAll))
326-
go reader("comm3", out3, comm3.Accept(acceptAll))
327-
go reader("comm4", out4, comm4.Accept(acceptAll))
328-
329-
// have comm1 BL comm3
330-
comm1.BlackListPKIid([]byte("localhost:1613"))
331-
332-
// make comm3 send to 1 and 2
333-
comm3.Send(createGossipMsg(), remotePeer(1612)) // out2++
334-
comm3.Send(createGossipMsg(), remotePeer(1611))
335-
336-
waitForMessages(t, out2, 1, "comm2 should have received 1 message")
337-
338-
// make comm1 and comm2 send to comm3
339-
comm1.Send(createGossipMsg(), remotePeer(1613))
340-
comm2.Send(createGossipMsg(), remotePeer(1613)) // out3++
341-
waitForMessages(t, out3, 1, "comm3 should have received 1 message")
342-
343-
// make comm1 and comm2 send to comm4 which is not blacklisted // out4 += 4
344-
comm1.Send(createGossipMsg(), remotePeer(1614))
345-
comm2.Send(createGossipMsg(), remotePeer(1614))
346-
comm1.Send(createGossipMsg(), remotePeer(1614))
347-
comm2.Send(createGossipMsg(), remotePeer(1614))
348-
349-
// blacklist comm3 by comm2
350-
comm2.BlackListPKIid([]byte("localhost:1613"))
351-
352-
// send from comm1 and comm2 to comm3 again
353-
comm1.Send(createGossipMsg(), remotePeer(1613)) // shouldn't have an effect
354-
comm2.Send(createGossipMsg(), remotePeer(1613)) // shouldn't have an effect
355-
356-
waitForMessages(t, out4, 4, "comm1 should have received 4 messages")
314+
conn, err := grpc.Dial("localhost:1611", grpc.WithTransportCredentials(&authCreds{tlsCreds: ta}), grpc.WithBlock(), grpc.WithTimeout(time.Second))
315+
assert.NoError(t, err, "%v", err)
316+
cl := proto.NewGossipClient(conn)
317+
stream, err := cl.GossipStream(context.Background())
318+
assert.NoError(t, err, "%v", err)
319+
c := &commImpl{}
320+
hash := certHashFromRawCert(tlsCfg.Certificates[0].Certificate[0])
321+
connMsg := c.createConnectionMsg(common.PKIidType("pkiID"), hash, api.PeerIdentityType("pkiID"), func(msg []byte) ([]byte, error) {
322+
mac := hmac.New(sha256.New, hmacKey)
323+
mac.Write(msg)
324+
return mac.Sum(nil), nil
325+
})
326+
assert.NoError(t, stream.Send(connMsg.Envelope))
327+
stream.Send(createGossipMsg().Envelope)
328+
select {
329+
case <-acceptChan:
330+
case <-time.After(time.Second):
331+
assert.Fail(t, "Didn't receive a message within a timely period")
332+
}
333+
comm1.CloseConn(&RemotePeer{PKIID: common.PKIidType("pkiID")})
334+
time.Sleep(time.Second * 10)
335+
assert.Error(t, stream.Send(createGossipMsg().Envelope), "Should have failed because connection is closed")
357336
}
358337

359338
func TestParallelSend(t *testing.T) {

gossip/gossip/certstore.go

+4
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@ func (cs *certStore) createIdentityMessage() *proto.SignedGossipMessage {
143143
return sMsg
144144
}
145145

146+
func (cs *certStore) listRevokedPeers(isSuspected api.PeerSuspector) []common.PKIidType {
147+
return cs.idMapper.ListRevokedPeers(isSuspected)
148+
}
149+
146150
func (cs *certStore) stop() {
147151
cs.pull.Stop()
148152
}

gossip/gossip/gossip.go

+4
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ type Gossip interface {
6161
// JoinChan makes the Gossip instance join a channel
6262
JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID)
6363

64+
// SuspectPeers makes the gossip instance validate identities of suspected peers, and close
65+
// any connections to peers with identities that are found invalid
66+
SuspectPeers(s api.PeerSuspector)
67+
6468
// Stop stops the gossip component
6569
Stop()
6670
}

gossip/gossip/gossip_impl.go

+8
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,14 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com
177177
}
178178
}
179179

180+
// SuspectPeers makes the gossip instance validate identities of suspected peers, and close
181+
// any connections to peers with identities that are found invalid
182+
func (g *gossipServiceImpl) SuspectPeers(isSuspected api.PeerSuspector) {
183+
for _, pkiID := range g.certStore.listRevokedPeers(isSuspected) {
184+
g.comm.CloseConn(&comm.RemotePeer{PKIID: pkiID})
185+
}
186+
}
187+
180188
func (g *gossipServiceImpl) learnAnchorPeers(orgOfAnchorPeers api.OrgIdentityType, anchorPeers []api.AnchorPeer) {
181189
for _, ap := range anchorPeers {
182190
if ap.Host == "" {

0 commit comments

Comments
 (0)