Skip to content

Commit ecda4c2

Browse files
committed
[FAB-5330] Prevent payload buffer overpopulation
The state transfer module receives blocks from either the orderer or other peers, and puts them into a payload buffer for reordering so they would enter the ledger in-order. In some cases: - If the peer joined late and it receives blocks from peers starting from index i where the ledger is missing indices [j, i] for some j<i and the rate of block reception from peers is very fast - If the ledger is "stuck" (i.e file system full, etc.) and cannot advance This buffer would overpopulate. This commit addresses this, and adds a maximum distance constant that if the difference between the ledger height and the sequence of the block that is received is greater than this constant, the block is dropped. Change-Id: Ia1ba8966ea6d211c5d1b7ddd84a4fad34af797d4 Signed-off-by: yacovm <[email protected]>
1 parent 97d4846 commit ecda4c2

File tree

3 files changed

+129
-5
lines changed

3 files changed

+129
-5
lines changed

core/deliverservice/blocksprovider/blocksprovider.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,9 @@ func (b *blocksProviderImpl) DeliverBlocks() {
183183

184184
logger.Debugf("[%s] Adding payload locally, buffer seqNum = [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers)
185185
// Add payload to local state payloads buffer
186-
b.gossip.AddPayload(b.chainID, payload)
186+
if err := b.gossip.AddPayload(b.chainID, payload); err != nil {
187+
logger.Warning("Failed adding payload of", seqNum, "because:", err)
188+
}
187189

188190
// Gossip messages with other nodes
189191
logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers)

gossip/state/state.go

+19-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package state
1919
import (
2020
"bytes"
2121
"errors"
22+
"fmt"
2223
"sync"
2324
"sync/atomic"
2425
"time"
@@ -55,6 +56,8 @@ const (
5556

5657
defChannelBufferSize = 100
5758
defAntiEntropyMaxRetries = 3
59+
60+
defMaxBlockDistance = 100
5861
)
5962

6063
// GossipAdapter defines gossip/communication required interface for state provider
@@ -404,10 +407,11 @@ func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) {
404407

405408
dataMsg := msg.GetDataMsg()
406409
if dataMsg != nil {
407-
// Add new payload to ordered set
408-
410+
if err := s.AddPayload(dataMsg.GetPayload()); err != nil {
411+
logger.Warning("Failed adding payload:", err)
412+
return
413+
}
409414
logger.Debugf("Received new payload with sequence number = [%d]", dataMsg.Payload.SeqNum)
410-
s.payloads.Push(dataMsg.GetPayload())
411415
} else {
412416
logger.Debug("Gossip message received is not of data message type, usually this should not happen.")
413417
}
@@ -616,8 +620,19 @@ func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block {
616620

617621
// AddPayload add new payload into state
618622
func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error {
619-
623+
if payload == nil {
624+
return errors.New("Given payload is nil")
625+
}
620626
logger.Debug("Adding new payload into the buffer, seqNum = ", payload.SeqNum)
627+
height, err := s.committer.LedgerHeight()
628+
if err != nil {
629+
return fmt.Errorf("Failed obtaining ledger height: %v", err)
630+
}
631+
632+
if payload.SeqNum-height >= defMaxBlockDistance {
633+
return fmt.Errorf("Ledger height is at %d, cannot enqueue block with sequence of %d", height, payload.SeqNum)
634+
}
635+
621636
return s.payloads.Push(payload)
622637
}
623638

gossip/state/state_test.go

+107
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ func (node *peerNode) shutdown() {
167167

168168
type mockCommitter struct {
169169
mock.Mock
170+
sync.Mutex
170171
}
171172

172173
func (mc *mockCommitter) Commit(block *pcomm.Block) error {
@@ -175,6 +176,8 @@ func (mc *mockCommitter) Commit(block *pcomm.Block) error {
175176
}
176177

177178
func (mc *mockCommitter) LedgerHeight() (uint64, error) {
179+
mc.Lock()
180+
defer mc.Unlock()
178181
if mc.Called().Get(1) == nil {
179182
return mc.Called().Get(0).(uint64), nil
180183
}
@@ -277,6 +280,110 @@ func TestNilDirectMsg(t *testing.T) {
277280
p.s.(*GossipStateProviderImpl).directMessage(req)
278281
}
279282

283+
func TestNilAddPayload(t *testing.T) {
284+
mc := &mockCommitter{}
285+
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
286+
g := &mocks.GossipMock{}
287+
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
288+
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
289+
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
290+
defer p.shutdown()
291+
err := p.s.AddPayload(nil)
292+
assert.Error(t, err)
293+
assert.Contains(t, err.Error(), "nil")
294+
}
295+
296+
func TestAddPayloadLedgerUnavailable(t *testing.T) {
297+
mc := &mockCommitter{}
298+
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
299+
g := &mocks.GossipMock{}
300+
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
301+
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
302+
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
303+
defer p.shutdown()
304+
// Simulate a problem in the ledger
305+
failedLedger := mock.Mock{}
306+
failedLedger.On("LedgerHeight", mock.Anything).Return(uint64(0), errors.New("cannot query ledger"))
307+
mc.Lock()
308+
mc.Mock = failedLedger
309+
mc.Unlock()
310+
311+
rawblock := pcomm.NewBlock(uint64(1), []byte{})
312+
b, _ := pb.Marshal(rawblock)
313+
err := p.s.AddPayload(&proto.Payload{
314+
SeqNum: uint64(1),
315+
Data: b,
316+
})
317+
assert.Error(t, err)
318+
assert.Contains(t, err.Error(), "Failed obtaining ledger height")
319+
assert.Contains(t, err.Error(), "cannot query ledger")
320+
}
321+
322+
func TestOverPopulation(t *testing.T) {
323+
// Scenario: Add to the state provider blocks
324+
// with a gap in between, and ensure that the payload buffer
325+
// rejects blocks starting if the distance between the ledger height to the latest
326+
// block it contains is bigger than defMaxBlockDistance.
327+
328+
mc := &mockCommitter{}
329+
blocksPassedToLedger := make(chan uint64, 10)
330+
mc.On("Commit", mock.Anything).Run(func(arg mock.Arguments) {
331+
blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number
332+
})
333+
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
334+
g := &mocks.GossipMock{}
335+
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
336+
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
337+
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
338+
defer p.shutdown()
339+
340+
// Add some blocks in a sequential manner and make sure it works
341+
for i := 1; i <= 4; i++ {
342+
rawblock := pcomm.NewBlock(uint64(i), []byte{})
343+
b, _ := pb.Marshal(rawblock)
344+
assert.NoError(t, p.s.AddPayload(&proto.Payload{
345+
SeqNum: uint64(i),
346+
Data: b,
347+
}))
348+
}
349+
350+
// Add payloads from 10 to defMaxBlockDistance, while we're missing blocks [5,9]
351+
// Should succeed
352+
for i := 10; i <= defMaxBlockDistance; i++ {
353+
rawblock := pcomm.NewBlock(uint64(i), []byte{})
354+
b, _ := pb.Marshal(rawblock)
355+
assert.NoError(t, p.s.AddPayload(&proto.Payload{
356+
SeqNum: uint64(i),
357+
Data: b,
358+
}))
359+
}
360+
361+
// Add payloads from defMaxBlockDistance + 2 to defMaxBlockDistance * 10
362+
// Should fail.
363+
for i := defMaxBlockDistance + 1; i <= defMaxBlockDistance*10; i++ {
364+
rawblock := pcomm.NewBlock(uint64(i), []byte{})
365+
b, _ := pb.Marshal(rawblock)
366+
assert.Error(t, p.s.AddPayload(&proto.Payload{
367+
SeqNum: uint64(i),
368+
Data: b,
369+
}))
370+
}
371+
372+
// Ensure only blocks 1-4 were passed to the ledger
373+
close(blocksPassedToLedger)
374+
i := 1
375+
for seq := range blocksPassedToLedger {
376+
assert.Equal(t, uint64(i), seq)
377+
i++
378+
}
379+
assert.Equal(t, 5, i)
380+
381+
// Ensure we don't store too many blocks in memory
382+
sp := p.s.(*GossipStateProviderImpl)
383+
assert.True(t, sp.payloads.Size() < defMaxBlockDistance)
384+
385+
}
386+
280387
func TestFailures(t *testing.T) {
281388
mc := &mockCommitter{}
282389
mc.On("LedgerHeight", mock.Anything).Return(uint64(0), nil)

0 commit comments

Comments
 (0)