Skip to content

Commit 2e27110

Browse files
committed
[FAB-5849] calibrate state transfer pace
Blocks received from peers via state transfer are added to the payload buffer right away regardless the payload buffer's size. In cases when state transfer is much faster than the commit process, blocks pile up in the payload buffer and the peer might be out of memory. This change set makes the method that handles payload reception from remote peers to add the payloads through the same code path that receives blocks from the orderer, which blocks in case the payload buffer is too overpopulated. Change-Id: I2fc1a916b809311a7d3aa0308b64d2127ad1ee60 Signed-off-by: yacovm <[email protected]> Signed-off-by: Gari Singh <[email protected]> Signed-off-by: yacovm <[email protected]>
1 parent 8d07299 commit 2e27110

File tree

4 files changed

+94
-14
lines changed

4 files changed

+94
-14
lines changed

gossip/state/mocks/gossip.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,17 @@ func (*GossipMock) SuspectPeers(s api.PeerSuspector) {
3333
panic("implement me")
3434
}
3535

36-
func (*GossipMock) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) {
37-
panic("implement me")
36+
func (g *GossipMock) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) {
37+
g.Called(msg, peers)
3838
}
3939

4040
func (*GossipMock) Peers() []discovery.NetworkMember {
4141
panic("implement me")
4242
}
4343

44-
func (*GossipMock) PeersOfChannel(common.ChainID) []discovery.NetworkMember {
45-
return nil
44+
func (g *GossipMock) PeersOfChannel(chainID common.ChainID) []discovery.NetworkMember {
45+
args := g.Called(chainID)
46+
return args.Get(0).([]discovery.NetworkMember)
4647
}
4748

4849
func (*GossipMock) UpdateMetadata(metadata []byte) {
@@ -60,7 +61,7 @@ func (*GossipMock) Gossip(msg *proto.GossipMessage) {
6061
func (g *GossipMock) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage) {
6162
args := g.Called(acceptor, passThrough)
6263
if args.Get(0) == nil {
63-
return nil, args.Get(1).(<-chan proto.ReceivedMessage)
64+
return nil, args.Get(1).(chan proto.ReceivedMessage)
6465
}
6566
return args.Get(0).(<-chan *proto.GossipMessage), nil
6667
}

gossip/state/mocks/gossip_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/hyperledger/fabric/gossip/api"
2323
"github.com/hyperledger/fabric/gossip/common"
24+
"github.com/hyperledger/fabric/gossip/discovery"
2425
proto "github.com/hyperledger/fabric/protos/gossip"
2526
"github.com/stretchr/testify/assert"
2627
"github.com/stretchr/testify/mock"
@@ -34,6 +35,7 @@ func TestGossipMock(t *testing.T) {
3435
return c
3536
}
3637
g.On("Accept", mock.Anything, false).Return(mkChan(), nil)
38+
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
3739
a, b := g.Accept(func(o interface{}) bool {
3840
return true
3941
}, false)

gossip/state/state.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -379,9 +379,10 @@ func (s *GossipStateProviderImpl) handleStateResponse(msg proto.ReceivedMessage)
379379
if max < payload.SeqNum {
380380
max = payload.SeqNum
381381
}
382-
err := s.payloads.Push(payload)
382+
383+
err := s.addPayload(payload, blocking)
383384
if err != nil {
384-
logger.Warningf("Payload with sequence number %d was received earlier", payload.SeqNum)
385+
logger.Warningf("Payload with sequence number %d wasn't added to payload buffer: %v", payload.SeqNum, err)
385386
}
386387
}
387388
return max, nil

gossip/state/state_test.go

+83-7
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@ import (
3535
"github.com/hyperledger/fabric/gossip/api"
3636
"github.com/hyperledger/fabric/gossip/comm"
3737
"github.com/hyperledger/fabric/gossip/common"
38+
"github.com/hyperledger/fabric/gossip/discovery"
3839
"github.com/hyperledger/fabric/gossip/gossip"
3940
"github.com/hyperledger/fabric/gossip/identity"
4041
"github.com/hyperledger/fabric/gossip/state/mocks"
4142
gutil "github.com/hyperledger/fabric/gossip/util"
4243
pcomm "github.com/hyperledger/fabric/protos/common"
4344
proto "github.com/hyperledger/fabric/protos/gossip"
45+
"github.com/op/go-logging"
4446
"github.com/spf13/viper"
4547
"github.com/stretchr/testify/assert"
4648
"github.com/stretchr/testify/mock"
@@ -63,6 +65,7 @@ type joinChanMsg struct {
6365

6466
func init() {
6567
gutil.SetupTestLogging()
68+
logging.SetLevel(logging.DEBUG, gutil.LoggingStateModule)
6669
}
6770

6871
// SequenceNumber returns the sequence number of the block that the message
@@ -273,7 +276,7 @@ func TestNilDirectMsg(t *testing.T) {
273276
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
274277
g := &mocks.GossipMock{}
275278
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
276-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
279+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
277280
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
278281
defer p.shutdown()
279282
p.s.handleStateRequest(nil)
@@ -290,7 +293,7 @@ func TestNilAddPayload(t *testing.T) {
290293
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
291294
g := &mocks.GossipMock{}
292295
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
293-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
296+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
294297
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
295298
defer p.shutdown()
296299
err := p.s.AddPayload(nil)
@@ -303,7 +306,7 @@ func TestAddPayloadLedgerUnavailable(t *testing.T) {
303306
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
304307
g := &mocks.GossipMock{}
305308
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
306-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
309+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
307310
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
308311
defer p.shutdown()
309312
// Simulate a problem in the ledger
@@ -324,6 +327,77 @@ func TestAddPayloadLedgerUnavailable(t *testing.T) {
324327
assert.Contains(t, err.Error(), "cannot query ledger")
325328
}
326329

330+
func TestLargeBlockGap(t *testing.T) {
331+
// Scenario: the peer knows of a peer who has a ledger height much higher
332+
// than itself (500 blocks higher).
333+
// The peer needs to ask blocks in a way such that the size of the payload buffer
334+
// never rises above a certain threshold.
335+
336+
mc := &mockCommitter{}
337+
blocksPassedToLedger := make(chan uint64, 200)
338+
mc.On("Commit", mock.Anything).Run(func(arg mock.Arguments) {
339+
blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number
340+
})
341+
msgsFromPeer := make(chan proto.ReceivedMessage)
342+
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
343+
g := &mocks.GossipMock{}
344+
metaState := NewNodeMetastate(500)
345+
md, _ := metaState.Bytes()
346+
membership := []discovery.NetworkMember{
347+
{
348+
PKIid: common.PKIidType("a"),
349+
Endpoint: "a",
350+
Metadata: md,
351+
}}
352+
g.On("PeersOfChannel", mock.Anything).Return(membership)
353+
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
354+
g.On("Accept", mock.Anything, true).Return(nil, msgsFromPeer)
355+
g.On("Send", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) {
356+
msg := arguments.Get(0).(*proto.GossipMessage)
357+
// The peer requested a state request
358+
req := msg.GetStateRequest()
359+
// Construct a skeleton for the response
360+
res := &proto.GossipMessage{
361+
Nonce: msg.Nonce,
362+
Channel: []byte(util.GetTestChainID()),
363+
Content: &proto.GossipMessage_StateResponse{
364+
StateResponse: &proto.RemoteStateResponse{},
365+
},
366+
}
367+
// Populate the response with payloads according to what the peer asked
368+
for seq := req.StartSeqNum; seq <= req.EndSeqNum; seq++ {
369+
rawblock := pcomm.NewBlock(seq, []byte{})
370+
b, _ := pb.Marshal(rawblock)
371+
payload := &proto.Payload{
372+
SeqNum: seq,
373+
Data: b,
374+
}
375+
res.GetStateResponse().Payloads = append(res.GetStateResponse().Payloads, payload)
376+
}
377+
// Finally, send the response down the channel the peer expects to receive it from
378+
sMsg, _ := res.NoopSign()
379+
msgsFromPeer <- &comm.ReceivedMessageImpl{
380+
SignedGossipMessage: sMsg,
381+
}
382+
})
383+
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
384+
defer p.shutdown()
385+
386+
// Process blocks at a speed of 20 Millisecond for each block.
387+
// The imaginative peer that responds to state
388+
// If the payload buffer expands above defMaxBlockDistance*2 + defAntiEntropyBatchSize blocks, fail the test
389+
blockProcessingTime := 20 * time.Millisecond // 10 seconds for total 500 blocks
390+
expectedSequence := 1
391+
for expectedSequence < 500 {
392+
blockSeq := <-blocksPassedToLedger
393+
assert.Equal(t, expectedSequence, int(blockSeq))
394+
// Ensure payload buffer isn't over-populated
395+
assert.True(t, p.s.payloads.Size() <= defMaxBlockDistance*2+defAntiEntropyBatchSize, "payload buffer size is %d", p.s.payloads.Size())
396+
expectedSequence++
397+
time.Sleep(blockProcessingTime)
398+
}
399+
}
400+
327401
func TestOverPopulation(t *testing.T) {
328402
// Scenario: Add to the state provider blocks
329403
// with a gap in between, and ensure that the payload buffer
@@ -338,7 +412,7 @@ func TestOverPopulation(t *testing.T) {
338412
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
339413
g := &mocks.GossipMock{}
340414
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
341-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
415+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
342416
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
343417
defer p.shutdown()
344418

@@ -400,7 +474,7 @@ func TestBlockingEnqueue(t *testing.T) {
400474
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
401475
g := &mocks.GossipMock{}
402476
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
403-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
477+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
404478
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
405479
defer p.shutdown()
406480

@@ -461,7 +535,8 @@ func TestFailures(t *testing.T) {
461535
mc.On("LedgerHeight", mock.Anything).Return(uint64(0), nil)
462536
g := &mocks.GossipMock{}
463537
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
464-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
538+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
539+
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
465540
assert.Panics(t, func() {
466541
newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
467542
})
@@ -519,7 +594,8 @@ func TestGossipReception(t *testing.T) {
519594
g.On("Accept", mock.Anything, false).Return(rmc, nil).Run(func(_ mock.Arguments) {
520595
signalChan <- struct{}{}
521596
})
522-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
597+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
598+
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
523599
mc := &mockCommitter{}
524600
receivedChan := make(chan struct{})
525601
mc.On("Commit", mock.Anything).Run(func(arguments mock.Arguments) {

0 commit comments

Comments
 (0)