Skip to content

Commit fa7488c

Browse files
committed
[FAB-2424] Gossip state transfer: channel validation
In this commit, we enforce that only peers that are eligible of receiving blocks for a specific channel are allowed to do so, by filtering out peers that do not conform to the policy set by the MSP. The policy is checked by invoking VerifyByChannel on the certificate, message and signature that are obtained from the connection information - leveraging the fact that the state transfer of gossip is point-to-point and peers have to authenticate before sending subsequent messages. Change-Id: Ibdad5cd838489abdd1e97bfb0663f6946020f7fb Signed-off-by: Yacov Manevich <[email protected]>
1 parent 00a9bd7 commit fa7488c

File tree

3 files changed

+172
-38
lines changed

3 files changed

+172
-38
lines changed

gossip/service/gossip_service.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ type gossipServiceImpl struct {
7777
deliveryService deliverclient.DeliverService
7878
deliveryFactory DeliveryServiceFactory
7979
lock sync.RWMutex
80-
msgCrypto identity.Mapper
80+
idMapper identity.Mapper
81+
mcs api.MessageCryptoService
8182
peerIdentity []byte
8283
secAdv api.SecurityAdvisor
8384
}
@@ -132,10 +133,11 @@ func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string
132133

133134
gossip := integration.NewGossipComponent(peerIdentity, endpoint, s, secAdv, mcs, idMapper, dialOpts, bootPeers...)
134135
gossipServiceInstance = &gossipServiceImpl{
136+
mcs: mcs,
135137
gossipSvc: gossip,
136138
chains: make(map[string]state.GossipStateProvider),
137139
deliveryFactory: factory,
138-
msgCrypto: idMapper,
140+
idMapper: idMapper,
139141
peerIdentity: peerIdentity,
140142
secAdv: secAdv,
141143
}
@@ -158,7 +160,7 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe
158160
defer g.lock.Unlock()
159161
// Initialize new state provider for given committer
160162
logger.Debug("Creating state provider for chainID", chainID)
161-
g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer)
163+
g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer, g.mcs)
162164
if g.deliveryService == nil {
163165
var err error
164166
g.deliveryService, err = g.deliveryFactory.Service(gossipServiceInstance)

gossip/state/state.go

+29-5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
pb "github.com/golang/protobuf/proto"
2727
"github.com/hyperledger/fabric/core/committer"
28+
"github.com/hyperledger/fabric/gossip/api"
2829
"github.com/hyperledger/fabric/gossip/comm"
2930
common2 "github.com/hyperledger/fabric/gossip/common"
3031
"github.com/hyperledger/fabric/gossip/gossip"
@@ -47,10 +48,6 @@ type GossipStateProvider interface {
4748
Stop()
4849
}
4950

50-
var remoteStateMsgFilter = func(message interface{}) bool {
51-
return message.(proto.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage()
52-
}
53-
5451
const (
5552
defPollingPeriod = 200 * time.Millisecond
5653
defAntiEntropyInterval = 10 * time.Second
@@ -60,6 +57,9 @@ const (
6057
// the struct to handle in memory sliding window of
6158
// new ledger block to be acquired by hyper ledger
6259
type GossipStateProviderImpl struct {
60+
// MessageCryptoService
61+
mcs api.MessageCryptoService
62+
6363
// Chain id
6464
chainID string
6565

@@ -87,7 +87,7 @@ type GossipStateProviderImpl struct {
8787
}
8888

8989
// NewGossipStateProvider creates initialized instance of gossip state provider
90-
func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer) GossipStateProvider {
90+
func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer, mcs api.MessageCryptoService) GossipStateProvider {
9191
logger := util.GetLogger(util.LoggingStateModule, "")
9292

9393
gossipChan, _ := g.Accept(func(message interface{}) bool {
@@ -96,6 +96,26 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer
9696
bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID))
9797
}, false)
9898

99+
remoteStateMsgFilter := func(message interface{}) bool {
100+
receivedMsg := message.(proto.ReceivedMessage)
101+
msg := receivedMsg.GetGossipMessage()
102+
if !msg.IsRemoteStateMessage() {
103+
return false
104+
}
105+
// If we're not running with authentication, no point
106+
// in enforcing access control
107+
if !receivedMsg.GetConnectionInfo().IsAuthenticated() {
108+
return true
109+
}
110+
connInfo := receivedMsg.GetConnectionInfo()
111+
authErr := mcs.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
112+
if authErr != nil {
113+
logger.Warning("Got unauthorized state transfer request from", string(connInfo.Identity))
114+
return false
115+
}
116+
return true
117+
}
118+
99119
// Filter message which are only relevant for state transfer
100120
_, commChan := g.Accept(remoteStateMsgFilter, true)
101121

@@ -109,6 +129,10 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer
109129
}
110130

111131
s := &GossipStateProviderImpl{
132+
// MessageCryptoService
133+
mcs: mcs,
134+
135+
// Chain ID
112136
chainID: chainID,
113137

114138
// Instance of the gossip

gossip/state/state_test.go

+138-30
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"testing"
2525
"time"
2626

27+
"errors"
28+
2729
pb "github.com/golang/protobuf/proto"
2830
"github.com/hyperledger/fabric/common/configtx/test"
2931
"github.com/hyperledger/fabric/common/util"
@@ -49,6 +51,12 @@ var (
4951

5052
var orgID = []byte("ORG1")
5153

54+
type peerIdentityAcceptor func(identity api.PeerIdentityType) error
55+
56+
var noopPeerIdentityAcceptor = func(identity api.PeerIdentityType) error {
57+
return nil
58+
}
59+
5260
type joinChanMsg struct {
5361
}
5462

@@ -78,30 +86,33 @@ func (*orgCryptoService) Verify(joinChanMsg api.JoinChannelMessage) error {
7886
return nil
7987
}
8088

81-
type naiveCryptoService struct {
89+
type cryptoServiceMock struct {
90+
acceptor peerIdentityAcceptor
8291
}
8392

8493
// GetPKIidOfCert returns the PKI-ID of a peer's identity
85-
func (*naiveCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType {
94+
func (*cryptoServiceMock) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType {
8695
return common.PKIidType(peerIdentity)
8796
}
8897

8998
// VerifyBlock returns nil if the block is properly signed,
9099
// else returns error
91-
func (*naiveCryptoService) VerifyBlock(chainID common.ChainID, signedBlock api.SignedBlock) error {
100+
func (*cryptoServiceMock) VerifyBlock(chainID common.ChainID, signedBlock api.SignedBlock) error {
92101
return nil
93102
}
94103

95104
// Sign signs msg with this peer's signing key and outputs
96105
// the signature if no error occurred.
97-
func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) {
98-
return msg, nil
106+
func (*cryptoServiceMock) Sign(msg []byte) ([]byte, error) {
107+
clone := make([]byte, len(msg))
108+
copy(clone, msg)
109+
return clone, nil
99110
}
100111

101112
// Verify checks that signature is a valid signature of message under a peer's verification key.
102113
// If the verification succeeded, Verify returns nil meaning no error occurred.
103114
// If peerCert is nil, then the signature is verified against this peer's verification key.
104-
func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error {
115+
func (*cryptoServiceMock) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error {
105116
equal := bytes.Equal(signature, message)
106117
if !equal {
107118
return fmt.Errorf("Wrong signature:%v, %v", signature, message)
@@ -113,11 +124,11 @@ func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature,
113124
// under a peer's verification key, but also in the context of a specific channel.
114125
// If the verification succeeded, Verify returns nil meaning no error occurred.
115126
// If peerIdentity is nil, then the signature is verified against this peer's verification key.
116-
func (*naiveCryptoService) VerifyByChannel(chainID common.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error {
117-
return nil
127+
func (cs *cryptoServiceMock) VerifyByChannel(chainID common.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error {
128+
return cs.acceptor(peerIdentity)
118129
}
119130

120-
func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error {
131+
func (*cryptoServiceMock) ValidateIdentity(peerIdentity api.PeerIdentityType) error {
121132
return nil
122133
}
123134

@@ -132,9 +143,10 @@ func bootPeers(ids ...int) []string {
132143
// Simple presentation of peer which includes only
133144
// communication module, gossip and state transfer
134145
type peerNode struct {
135-
g gossip.Gossip
136-
s GossipStateProvider
137-
146+
port int
147+
g gossip.Gossip
148+
s GossipStateProvider
149+
cs *cryptoServiceMock
138150
commit committer.Committer
139151
}
140152

@@ -145,13 +157,13 @@ func (node *peerNode) shutdown() {
145157
}
146158

147159
// Default configuration to be used for gossip and communication modules
148-
func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config {
160+
func newGossipConfig(id int, boot ...int) *gossip.Config {
149161
port := id + portPrefix
150162
return &gossip.Config{
151163
BindPort: port,
152164
BootstrapPeers: bootPeers(boot...),
153165
ID: fmt.Sprintf("p%d", id),
154-
MaxBlockCountToStore: maxMsgCount,
166+
MaxBlockCountToStore: 0,
155167
MaxPropagationBurstLatency: time.Duration(10) * time.Millisecond,
156168
MaxPropagationBurstSize: 10,
157169
PropagateIterations: 1,
@@ -166,11 +178,10 @@ func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config {
166178
}
167179

168180
// Create gossip instance
169-
func newGossipInstance(config *gossip.Config) gossip.Gossip {
170-
cryptoService := &naiveCryptoService{}
171-
idMapper := identity.NewIdentityMapper(cryptoService)
181+
func newGossipInstance(config *gossip.Config, mcs api.MessageCryptoService) gossip.Gossip {
182+
idMapper := identity.NewIdentityMapper(mcs)
172183

173-
return gossip.NewGossipServiceWithServer(config, &orgCryptoService{}, cryptoService, idMapper, []byte(config.InternalEndpoint))
184+
return gossip.NewGossipServiceWithServer(config, &orgCryptoService{}, mcs, idMapper, []byte(config.InternalEndpoint))
174185
}
175186

176187
// Create new instance of KVLedger to be used for testing
@@ -182,24 +193,117 @@ func newCommitter(id int) committer.Committer {
182193
}
183194

184195
// Constructing pseudo peer node, simulating only gossip and state transfer part
185-
func newPeerNode(config *gossip.Config, committer committer.Committer) *peerNode {
186-
196+
func newPeerNode(config *gossip.Config, committer committer.Committer, acceptor peerIdentityAcceptor) *peerNode {
197+
cs := &cryptoServiceMock{acceptor: acceptor}
187198
// Gossip component based on configuration provided and communication module
188-
gossip := newGossipInstance(config)
199+
gossip := newGossipInstance(config, &cryptoServiceMock{acceptor: noopPeerIdentityAcceptor})
189200

190201
logger.Debug("Joinning channel", util.GetTestChainID())
191202
gossip.JoinChan(&joinChanMsg{}, common.ChainID(util.GetTestChainID()))
192203

193204
// Initialize pseudo peer simulator, which has only three
194205
// basic parts
195-
return &peerNode{
196-
g: gossip,
197-
s: NewGossipStateProvider(util.GetTestChainID(), gossip, committer),
198206

207+
return &peerNode{
208+
port: config.BindPort,
209+
g: gossip,
210+
s: NewGossipStateProvider(util.GetTestChainID(), gossip, committer, cs),
199211
commit: committer,
212+
cs: cs,
200213
}
201214
}
202215

216+
func TestAccessControl(t *testing.T) {
217+
viper.Set("peer.fileSystemPath", "/tmp/tests/ledger/node")
218+
ledgermgmt.InitializeTestEnv()
219+
defer ledgermgmt.CleanupTestEnv()
220+
221+
bootstrapSetSize := 5
222+
bootstrapSet := make([]*peerNode, 0)
223+
224+
authorizedPeers := map[string]struct{}{
225+
"localhost:5610": {},
226+
"localhost:5615": {},
227+
"localhost:5618": {},
228+
"localhost:5621": {},
229+
}
230+
231+
blockPullPolicy := func(identity api.PeerIdentityType) error {
232+
if _, isAuthorized := authorizedPeers[string(identity)]; isAuthorized {
233+
return nil
234+
}
235+
return errors.New("Not authorized")
236+
}
237+
238+
for i := 0; i < bootstrapSetSize; i++ {
239+
committer := newCommitter(i)
240+
bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i), committer, blockPullPolicy))
241+
}
242+
243+
defer func() {
244+
for _, p := range bootstrapSet {
245+
p.shutdown()
246+
}
247+
}()
248+
249+
msgCount := 5
250+
251+
for i := 1; i <= msgCount; i++ {
252+
rawblock := pcomm.NewBlock(uint64(i), []byte{})
253+
if bytes, err := pb.Marshal(rawblock); err == nil {
254+
payload := &proto.Payload{uint64(i), "", bytes}
255+
bootstrapSet[0].s.AddPayload(payload)
256+
} else {
257+
t.Fail()
258+
}
259+
}
260+
261+
standardPeerSetSize := 10
262+
peersSet := make([]*peerNode, 0)
263+
264+
for i := 0; i < standardPeerSetSize; i++ {
265+
committer := newCommitter(bootstrapSetSize + i)
266+
peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 0, 1, 2, 3, 4), committer, blockPullPolicy))
267+
}
268+
269+
defer func() {
270+
for _, p := range peersSet {
271+
p.shutdown()
272+
}
273+
}()
274+
275+
waitUntilTrueOrTimeout(t, func() bool {
276+
for _, p := range peersSet {
277+
if len(p.g.PeersOfChannel(common.ChainID(util.GetTestChainID()))) != bootstrapSetSize+standardPeerSetSize-1 {
278+
logger.Debug("Peer discovery has not finished yet")
279+
return false
280+
}
281+
}
282+
logger.Debug("All peer discovered each other!!!")
283+
return true
284+
}, 30*time.Second)
285+
286+
logger.Debug("Waiting for all blocks to arrive.")
287+
waitUntilTrueOrTimeout(t, func() bool {
288+
logger.Debug("Trying to see all authorized peers get all blocks, and all non-authorized didn't")
289+
for _, p := range peersSet {
290+
height, err := p.commit.LedgerHeight()
291+
id := fmt.Sprintf("localhost:%d", p.port)
292+
if _, isAuthorized := authorizedPeers[id]; isAuthorized {
293+
if height != uint64(msgCount+1) || err != nil {
294+
return false
295+
}
296+
} else {
297+
if err == nil && height > 1 {
298+
assert.Fail(t, "Peer", id, "got message but isn't authorized! Height:", height)
299+
}
300+
}
301+
}
302+
logger.Debug("All peers have same ledger height!!!")
303+
return true
304+
}, 60*time.Second)
305+
}
306+
203307
/*// Simple scenario to start first booting node, gossip a message
204308
// then start second node and verify second node also receives it
205309
func TestNewGossipStateProvider_GossipingOneMessage(t *testing.T) {
@@ -289,7 +393,7 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) {
289393

290394
for i := 0; i < bootstrapSetSize; i++ {
291395
committer := newCommitter(i)
292-
bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i, 100), committer))
396+
bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i), committer, noopPeerIdentityAcceptor))
293397
}
294398

295399
defer func() {
@@ -315,7 +419,7 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) {
315419

316420
for i := 0; i < standartPeersSize; i++ {
317421
committer := newCommitter(bootstrapSetSize + i)
318-
peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 100, 0, 1, 2, 3, 4), committer))
422+
peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 0, 1, 2, 3, 4), committer, noopPeerIdentityAcceptor))
319423
}
320424

321425
defer func() {
@@ -354,14 +458,18 @@ func TestGossipStateProvider_TestStateMessages(t *testing.T) {
354458
ledgermgmt.InitializeTestEnv()
355459
defer ledgermgmt.CleanupTestEnv()
356460

357-
bootPeer := newPeerNode(newGossipConfig(0, 100), newCommitter(0))
461+
bootPeer := newPeerNode(newGossipConfig(0), newCommitter(0), noopPeerIdentityAcceptor)
358462
defer bootPeer.shutdown()
359463

360-
peer := newPeerNode(newGossipConfig(1, 100, 0), newCommitter(1))
464+
peer := newPeerNode(newGossipConfig(1, 0), newCommitter(1), noopPeerIdentityAcceptor)
361465
defer peer.shutdown()
362466

363-
_, bootCh := bootPeer.g.Accept(remoteStateMsgFilter, true)
364-
_, peerCh := peer.g.Accept(remoteStateMsgFilter, true)
467+
naiveStateMsgPredicate := func(message interface{}) bool {
468+
return message.(proto.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage()
469+
}
470+
471+
_, bootCh := bootPeer.g.Accept(naiveStateMsgPredicate, true)
472+
_, peerCh := peer.g.Accept(naiveStateMsgPredicate, true)
365473

366474
wg := sync.WaitGroup{}
367475
wg.Add(2)

0 commit comments

Comments
 (0)