Skip to content

Commit f0acc68

Browse files
committed
[FAB-3105] Gossip needs to use comm package CA support
The current gossip implementation is initialized with a set of dial options which don't change during the lifecycle of the peer. In order to resolve this, the gossip comm implementation has been modified to accept a function which returns the appropriate dial options for secure communincations with other peers. When used in the peer, the function passed in leverages the core/comm packages CASupport. Change-Id: I99cc4204e27c67f93f8f1ba91891fdb97081c99e Signed-off-by: Gari Singh <[email protected]>
1 parent 90e36d1 commit f0acc68

16 files changed

+187
-92
lines changed

core/comm/connection.go

+26
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,32 @@ func (cas *CASupport) GetDeliverServiceCredentials() credentials.TransportCreden
111111
return creds
112112
}
113113

114+
// GetPeerCredentials returns GRPC transport credentials for use by GRPC
115+
// clients which communicate with remote peer endpoints.
116+
func (cas *CASupport) GetPeerCredentials() credentials.TransportCredentials {
117+
var creds credentials.TransportCredentials
118+
var tlsConfig = &tls.Config{}
119+
var certPool = x509.NewCertPool()
120+
// loop through the orderer CAs
121+
roots, _ := cas.GetServerRootCAs()
122+
for _, root := range roots {
123+
block, _ := pem.Decode(root)
124+
if block != nil {
125+
cert, err := x509.ParseCertificate(block.Bytes)
126+
if err == nil {
127+
certPool.AddCert(cert)
128+
} else {
129+
commLogger.Warningf("Failed to add root cert to credentials (%s)", err)
130+
}
131+
} else {
132+
commLogger.Warning("Failed to add root cert to credentials")
133+
}
134+
}
135+
tlsConfig.RootCAs = certPool
136+
creds = credentials.NewTLS(tlsConfig)
137+
return creds
138+
}
139+
114140
// GetClientRootCAs returns the PEM-encoded root certificates for all of the
115141
// application and orderer organizations defined for all chains. The root
116142
// certificates returned should be used to set the trusted client roots for

core/comm/connection_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ func TestCASupport(t *testing.T) {
139139
assert.Exactly(t, casClone, cas, "Expected GetCASupport to be a singleton")
140140

141141
creds := cas.GetDeliverServiceCredentials()
142+
assert.Equal(t, "1.2", creds.Info().SecurityVersion,
143+
"Expected Security version to be 1.2")
144+
creds = cas.GetPeerCredentials()
142145
assert.Equal(t, "1.2", creds.Info().SecurityVersion,
143146
"Expected Security version to be 1.2")
144147

@@ -148,5 +151,8 @@ func TestCASupport(t *testing.T) {
148151
creds = cas.GetDeliverServiceCredentials()
149152
assert.Equal(t, "1.2", creds.Info().SecurityVersion,
150153
"Expected Security version to be 1.2")
154+
creds = cas.GetPeerCredentials()
155+
assert.Equal(t, "1.2", creds.Info().SecurityVersion,
156+
"Expected Security version to be 1.2")
151157

152158
}

core/peer/peer_test.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,15 @@ func TestCreateChainFromBlock(t *testing.T) {
9797
identity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
9898
messageCryptoService := peergossip.NewMCS(&mocks.ChannelPolicyManagerGetter{}, localmsp.NewSigner(), mgmt.NewDeserializersManager())
9999
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
100+
var defaultSecureDialOpts = func() []grpc.DialOption {
101+
var dialOpts []grpc.DialOption
102+
dialOpts = append(dialOpts, grpc.WithInsecure())
103+
return dialOpts
104+
}
100105
service.InitGossipServiceCustomDeliveryFactory(
101106
identity, "localhost:13611", grpcServer,
102107
&mockDeliveryClientFactory{},
103-
messageCryptoService, secAdv)
108+
messageCryptoService, secAdv, defaultSecureDialOpts)
104109

105110
err = CreateChainFromBlock(block)
106111
if err != nil {

core/scc/cscc/configure_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func TestConfigerInvokeJoinChainCorrectParams(t *testing.T) {
167167
identity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
168168
messageCryptoService := peergossip.NewMCS(&mocks.ChannelPolicyManagerGetter{}, localmsp.NewSigner(), mgmt.NewDeserializersManager())
169169
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
170-
service.InitGossipServiceCustomDeliveryFactory(identity, peerEndpoint, nil, &mockDeliveryClientFactory{}, messageCryptoService, secAdv)
170+
service.InitGossipServiceCustomDeliveryFactory(identity, peerEndpoint, nil, &mockDeliveryClientFactory{}, messageCryptoService, secAdv, nil)
171171

172172
// Successful path for JoinChain
173173
blockBytes := mockConfigBlock()

gossip/api/crypto.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ limitations under the License.
1616

1717
package api
1818

19-
import "github.com/hyperledger/fabric/gossip/common"
19+
import (
20+
"github.com/hyperledger/fabric/gossip/common"
21+
"google.golang.org/grpc"
22+
)
2023

2124
// MessageCryptoService is the contract between the gossip component and the
2225
// peer's cryptographic layer and is used by the gossip component to verify,
@@ -61,3 +64,7 @@ type PeerIdentityType []byte
6164
// PeerSuspector returns whether a peer with a given identity is suspected
6265
// as being revoked, or its CA is revoked
6366
type PeerSuspector func(identity PeerIdentityType) bool
67+
68+
// PeerSecureDialOpts returns the gRPC DialOptions to use for connection level
69+
// security when communicating with remote peer endpoints
70+
type PeerSecureDialOpts func() []grpc.DialOption

gossip/comm/comm_impl.go

+69-48
Original file line numberDiff line numberDiff line change
@@ -65,37 +65,38 @@ func (c *commImpl) SetDialOpts(opts ...grpc.DialOption) {
6565
}
6666

6767
// NewCommInstanceWithServer creates a comm instance that creates an underlying gRPC server
68-
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType, dialOpts ...grpc.DialOption) (Comm, error) {
68+
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,
69+
secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error) {
70+
6971
var ll net.Listener
7072
var s *grpc.Server
71-
var secOpt grpc.DialOption
7273
var certHash []byte
7374

7475
if len(dialOpts) == 0 {
7576
dialOpts = []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))}
7677
}
7778

7879
if port > 0 {
79-
s, ll, secOpt, certHash = createGRPCLayer(port)
80-
dialOpts = append(dialOpts, secOpt)
80+
s, ll, secureDialOpts, certHash = createGRPCLayer(port)
8181
}
8282

8383
commInst := &commImpl{
84-
selfCertHash: certHash,
85-
PKIID: idMapper.GetPKIidOfCert(peerIdentity),
86-
idMapper: idMapper,
87-
logger: util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
88-
peerIdentity: peerIdentity,
89-
opts: dialOpts,
90-
port: port,
91-
lsnr: ll,
92-
gSrv: s,
93-
msgPublisher: NewChannelDemultiplexer(),
94-
lock: &sync.RWMutex{},
95-
deadEndpoints: make(chan common.PKIidType, 100),
96-
stopping: int32(0),
97-
exitChan: make(chan struct{}, 1),
98-
subscriptions: make([]chan proto.ReceivedMessage, 0),
84+
selfCertHash: certHash,
85+
PKIID: idMapper.GetPKIidOfCert(peerIdentity),
86+
idMapper: idMapper,
87+
logger: util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
88+
peerIdentity: peerIdentity,
89+
opts: dialOpts,
90+
secureDialOpts: secureDialOpts,
91+
port: port,
92+
lsnr: ll,
93+
gSrv: s,
94+
msgPublisher: NewChannelDemultiplexer(),
95+
lock: &sync.RWMutex{},
96+
deadEndpoints: make(chan common.PKIidType, 100),
97+
stopping: int32(0),
98+
exitChan: make(chan struct{}, 1),
99+
subscriptions: make([]chan proto.ReceivedMessage, 0),
99100
}
100101
commInst.connStore = newConnStore(commInst, commInst.logger)
101102
commInst.idMapper.Put(idMapper.GetPKIidOfCert(peerIdentity), peerIdentity)
@@ -117,9 +118,12 @@ func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity
117118
}
118119

119120
// NewCommInstance creates a new comm instance that binds itself to the given gRPC server
120-
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper, peerIdentity api.PeerIdentityType, dialOpts ...grpc.DialOption) (Comm, error) {
121+
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
122+
peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,
123+
dialOpts ...grpc.DialOption) (Comm, error) {
124+
121125
dialOpts = append(dialOpts, grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout)))
122-
commInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, dialOpts...)
126+
commInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...)
123127
if err != nil {
124128
return nil, err
125129
}
@@ -139,24 +143,25 @@ func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Map
139143
}
140144

141145
type commImpl struct {
142-
skipHandshake bool
143-
selfCertHash []byte
144-
peerIdentity api.PeerIdentityType
145-
idMapper identity.Mapper
146-
logger *logging.Logger
147-
opts []grpc.DialOption
148-
connStore *connectionStore
149-
PKIID []byte
150-
port int
151-
deadEndpoints chan common.PKIidType
152-
msgPublisher *ChannelDeMultiplexer
153-
lock *sync.RWMutex
154-
lsnr net.Listener
155-
gSrv *grpc.Server
156-
exitChan chan struct{}
157-
stopping int32
158-
stopWG sync.WaitGroup
159-
subscriptions []chan proto.ReceivedMessage
146+
skipHandshake bool
147+
selfCertHash []byte
148+
peerIdentity api.PeerIdentityType
149+
idMapper identity.Mapper
150+
logger *logging.Logger
151+
opts []grpc.DialOption
152+
secureDialOpts func() []grpc.DialOption
153+
connStore *connectionStore
154+
PKIID []byte
155+
port int
156+
deadEndpoints chan common.PKIidType
157+
msgPublisher *ChannelDeMultiplexer
158+
lock *sync.RWMutex
159+
lsnr net.Listener
160+
gSrv *grpc.Server
161+
exitChan chan struct{}
162+
stopping int32
163+
stopWG sync.WaitGroup
164+
subscriptions []chan proto.ReceivedMessage
160165
}
161166

162167
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
@@ -165,14 +170,18 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
165170
var stream proto.Gossip_GossipStreamClient
166171
var pkiID common.PKIidType
167172
var connInfo *proto.ConnectionInfo
173+
var dialOpts []grpc.DialOption
168174

169175
c.logger.Debug("Entering", endpoint, expectedPKIID)
170176
defer c.logger.Debug("Exiting")
171177

172178
if c.isStopping() {
173179
return nil, errors.New("Stopping")
174180
}
175-
cc, err = grpc.Dial(endpoint, append(c.opts, grpc.WithBlock())...)
181+
dialOpts = append(dialOpts, c.secureDialOpts()...)
182+
dialOpts = append(dialOpts, grpc.WithBlock())
183+
dialOpts = append(dialOpts, c.opts...)
184+
cc, err = grpc.Dial(endpoint, dialOpts...)
176185
if err != nil {
177186
return nil, err
178187
}
@@ -257,13 +266,18 @@ func (c *commImpl) isStopping() bool {
257266
}
258267

259268
func (c *commImpl) Probe(remotePeer *RemotePeer) error {
269+
var dialOpts []grpc.DialOption
260270
endpoint := remotePeer.Endpoint
261271
pkiID := remotePeer.PKIID
262272
if c.isStopping() {
263273
return errors.New("Stopping")
264274
}
265275
c.logger.Debug("Entering, endpoint:", endpoint, "PKIID:", pkiID)
266-
cc, err := grpc.Dial(remotePeer.Endpoint, append(c.opts, grpc.WithBlock())...)
276+
dialOpts = append(dialOpts, c.secureDialOpts()...)
277+
dialOpts = append(dialOpts, grpc.WithBlock())
278+
dialOpts = append(dialOpts, c.opts...)
279+
280+
cc, err := grpc.Dial(remotePeer.Endpoint, dialOpts...)
267281
if err != nil {
268282
c.logger.Debug("Returning", err)
269283
return err
@@ -276,7 +290,12 @@ func (c *commImpl) Probe(remotePeer *RemotePeer) error {
276290
}
277291

278292
func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error) {
279-
cc, err := grpc.Dial(remotePeer.Endpoint, append(c.opts, grpc.WithBlock())...)
293+
var dialOpts []grpc.DialOption
294+
dialOpts = append(dialOpts, c.secureDialOpts()...)
295+
dialOpts = append(dialOpts, grpc.WithBlock())
296+
dialOpts = append(dialOpts, c.opts...)
297+
298+
cc, err := grpc.Dial(remotePeer.Endpoint, dialOpts...)
280299
if err != nil {
281300
return nil, err
282301
}
@@ -590,13 +609,13 @@ type stream interface {
590609
grpc.Stream
591610
}
592611

593-
func createGRPCLayer(port int) (*grpc.Server, net.Listener, grpc.DialOption, []byte) {
612+
func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte) {
594613
var returnedCertHash []byte
595614
var s *grpc.Server
596615
var ll net.Listener
597616
var err error
598617
var serverOpts []grpc.ServerOption
599-
var dialOpts grpc.DialOption
618+
var dialOpts []grpc.DialOption
600619

601620
keyFileName := fmt.Sprintf("key.%d.pem", util.RandomUInt64())
602621
certFileName := fmt.Sprintf("cert.%d.pem", util.RandomUInt64())
@@ -627,17 +646,19 @@ func createGRPCLayer(port int) (*grpc.Server, net.Listener, grpc.DialOption, []b
627646
Certificates: []tls.Certificate{cert},
628647
InsecureSkipVerify: true,
629648
})
630-
dialOpts = grpc.WithTransportCredentials(&authCreds{tlsCreds: ta})
649+
dialOpts = append(dialOpts, grpc.WithTransportCredentials(&authCreds{tlsCreds: ta}))
631650
} else {
632-
dialOpts = grpc.WithInsecure()
651+
dialOpts = append(dialOpts, grpc.WithInsecure())
633652
}
634653

635654
listenAddress := fmt.Sprintf("%s:%d", "", port)
636655
ll, err = net.Listen("tcp", listenAddress)
637656
if err != nil {
638657
panic(err)
639658
}
640-
659+
secureDialOpts := func() []grpc.DialOption {
660+
return dialOpts
661+
}
641662
s = grpc.NewServer(serverOpts...)
642-
return s, ll, dialOpts, returnedCertHash
663+
return s, ll, secureDialOpts, returnedCertHash
643664
}

gossip/comm/comm_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (*naiveSecProvider) VerifyByChannel(_ common.ChainID, _ api.PeerIdentityTyp
105105

106106
func newCommInstance(port int, sec api.MessageCryptoService) (Comm, error) {
107107
endpoint := fmt.Sprintf("localhost:%d", port)
108-
inst, err := NewCommInstanceWithServer(port, identity.NewIdentityMapper(sec), []byte(endpoint))
108+
inst, err := NewCommInstanceWithServer(port, identity.NewIdentityMapper(sec), []byte(endpoint), nil)
109109
return inst, err
110110
}
111111

gossip/gossip/gossip_impl.go

+15-9
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,18 @@ type gossipServiceImpl struct {
7979
}
8080

8181
// NewGossipService creates a gossip instance attached to a gRPC server
82-
func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvisor, mcs api.MessageCryptoService, idMapper identity.Mapper, selfIdentity api.PeerIdentityType, dialOpts ...grpc.DialOption) Gossip {
82+
func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvisor,
83+
mcs api.MessageCryptoService, idMapper identity.Mapper, selfIdentity api.PeerIdentityType,
84+
secureDialOpts api.PeerSecureDialOpts) Gossip {
85+
8386
var c comm.Comm
8487
var err error
8588

8689
lgr := util.GetLogger(util.LoggingGossipModule, conf.ID)
8790
if s == nil {
88-
c, err = createCommWithServer(conf.BindPort, idMapper, selfIdentity)
91+
c, err = createCommWithServer(conf.BindPort, idMapper, selfIdentity, secureDialOpts)
8992
} else {
90-
c, err = createCommWithoutServer(s, conf.TLSServerCert, idMapper, selfIdentity, dialOpts...)
93+
c, err = createCommWithoutServer(s, conf.TLSServerCert, idMapper, selfIdentity, secureDialOpts)
9194
}
9295

9396
if err != nil {
@@ -159,17 +162,20 @@ func newChannelState(g *gossipServiceImpl) *channelState {
159162
}
160163
}
161164

162-
func createCommWithoutServer(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper, identity api.PeerIdentityType, dialOpts ...grpc.DialOption) (comm.Comm, error) {
163-
return comm.NewCommInstance(s, cert, idStore, identity, dialOpts...)
165+
func createCommWithoutServer(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
166+
identity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts) (comm.Comm, error) {
167+
return comm.NewCommInstance(s, cert, idStore, identity, secureDialOpts)
164168
}
165169

166170
// NewGossipServiceWithServer creates a new gossip instance with a gRPC server
167-
func NewGossipServiceWithServer(conf *Config, secAdvisor api.SecurityAdvisor, mcs api.MessageCryptoService, mapper identity.Mapper, identity api.PeerIdentityType) Gossip {
168-
return NewGossipService(conf, nil, secAdvisor, mcs, mapper, identity)
171+
func NewGossipServiceWithServer(conf *Config, secAdvisor api.SecurityAdvisor, mcs api.MessageCryptoService,
172+
mapper identity.Mapper, identity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts) Gossip {
173+
return NewGossipService(conf, nil, secAdvisor, mcs, mapper, identity, secureDialOpts)
169174
}
170175

171-
func createCommWithServer(port int, idStore identity.Mapper, identity api.PeerIdentityType) (comm.Comm, error) {
172-
return comm.NewCommInstanceWithServer(port, idStore, identity)
176+
func createCommWithServer(port int, idStore identity.Mapper, identity api.PeerIdentityType,
177+
secureDialOpts api.PeerSecureDialOpts) (comm.Comm, error) {
178+
return comm.NewCommInstanceWithServer(port, idStore, identity, secureDialOpts)
173179
}
174180

175181
func (g *gossipServiceImpl) toDie() bool {

gossip/gossip/gossip_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,8 @@ func newGossipInstanceWithCustomMCS(portPrefix int, id int, maxMsgCount int, mcs
219219
}
220220

221221
idMapper := identity.NewIdentityMapper(mcs)
222-
g := NewGossipServiceWithServer(conf, &orgCryptoService{}, mcs, idMapper, api.PeerIdentityType(conf.InternalEndpoint))
222+
g := NewGossipServiceWithServer(conf, &orgCryptoService{}, mcs, idMapper,
223+
api.PeerIdentityType(conf.InternalEndpoint), nil)
223224

224225
return g
225226
}
@@ -251,7 +252,8 @@ func newGossipInstanceWithOnlyPull(portPrefix int, id int, maxMsgCount int, boot
251252
cryptoService := &naiveCryptoService{}
252253
idMapper := identity.NewIdentityMapper(cryptoService)
253254

254-
g := NewGossipServiceWithServer(conf, &orgCryptoService{}, cryptoService, idMapper, api.PeerIdentityType(conf.InternalEndpoint))
255+
g := NewGossipServiceWithServer(conf, &orgCryptoService{}, cryptoService, idMapper,
256+
api.PeerIdentityType(conf.InternalEndpoint), nil)
255257
return g
256258
}
257259

gossip/gossip/orgs_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ func newGossipInstanceWithExternalEndpoint(portPrefix int, id int, mcs *configur
121121
}
122122

123123
idMapper := identity.NewIdentityMapper(mcs)
124-
g := NewGossipServiceWithServer(conf, mcs, mcs, idMapper, api.PeerIdentityType(conf.InternalEndpoint))
124+
g := NewGossipServiceWithServer(conf, mcs, mcs, idMapper, api.PeerIdentityType(conf.InternalEndpoint),
125+
nil)
125126

126127
return g
127128
}

0 commit comments

Comments
 (0)