Skip to content

Commit bc6420d

Browse files
yacovm authored and mastersingh24 committed
[FAB-5793] Block deliver if payload buffer is too full
This commit tunes the state provider's AddPayload to behave as follows:
- If the block is received from gossip and the ledger height is too low compared to the block's sequence number, the block is discarded (as before - no change). The delta that triggers the discard is set to 100 blocks.
- If the block is received from the orderer and the payload buffer is too full (over 200 blocks), the operation blocks until the buffer is no longer too full.

Change-Id: Ieedbc728e0ec25031c5a09cf55071ae11061f5b8
Signed-off-by: yacovm <[email protected]>
1 parent a3b40de commit bc6420d

File tree

3 files changed

+117
-23
lines changed

3 files changed

+117
-23
lines changed

gossip/state/payloads_buffer.go

+2-2
Original file line number | Diff line number | Diff line change
@@ -133,8 +133,8 @@ func (b *PayloadsBufferImpl) Pop() *proto.Payload {
133133

134134
// Size returns current number of payloads stored within buffer
135135
func (b *PayloadsBufferImpl) Size() int {
136-
b.mutex.Lock()
137-
defer b.mutex.Unlock()
136+
b.mutex.RLock()
137+
defer b.mutex.RUnlock()
138138
return len(b.buf)
139139
}
140140

gossip/state/state.go

+25-3
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/hyperledger/fabric/protos/common"
3535
proto "github.com/hyperledger/fabric/protos/gossip"
3636
"github.com/op/go-logging"
37+
"github.com/spf13/viper"
3738
)
3839

3940
// GossipStateProvider is the interface to acquire sequences of the ledger blocks
@@ -58,6 +59,11 @@ const (
5859
defAntiEntropyMaxRetries = 3
5960

6061
defMaxBlockDistance = 100
62+
63+
blocking = true
64+
nonBlocking = false
65+
66+
enqueueRetryInterval = time.Millisecond * 100
6167
)
6268

6369
// GossipAdapter defines gossip/communication required interface for state provider
@@ -407,7 +413,7 @@ func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) {
407413

408414
dataMsg := msg.GetDataMsg()
409415
if dataMsg != nil {
410-
if err := s.AddPayload(dataMsg.GetPayload()); err != nil {
416+
if err := s.addPayload(dataMsg.GetPayload(), nonBlocking); err != nil {
411417
logger.Warning("Failed adding payload:", err)
412418
return
413419
}
@@ -620,8 +626,20 @@ func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block {
620626
return nil
621627
}
622628

623-
// AddPayload add new payload into state
629+
// AddPayload add new payload into state.
624630
func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error {
631+
blockingMode := blocking
632+
if viper.GetBool("peer.gossip.nonBlockingCommitMode") {
633+
blockingMode = false
634+
}
635+
return s.addPayload(payload, blockingMode)
636+
}
637+
638+
// addPayload add new payload into state. It may (or may not) block according to the
639+
// given parameter. If it gets a block while in blocking mode - it would wait until
640+
// the block is sent into the payloads buffer.
641+
// Else - it may drop the block, if the block's sequence is too far ahead of the ledger height.
642+
func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMode bool) error {
625643
if payload == nil {
626644
return errors.New("Given payload is nil")
627645
}
@@ -631,10 +649,14 @@ func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error {
631649
return fmt.Errorf("Failed obtaining ledger height: %v", err)
632650
}
633651

634-
if payload.SeqNum-height >= defMaxBlockDistance {
652+
if !blockingMode && payload.SeqNum-height >= defMaxBlockDistance {
635653
return fmt.Errorf("Ledger height is at %d, cannot enqueue block with sequence of %d", height, payload.SeqNum)
636654
}
637655

656+
for blockingMode && s.payloads.Size() > defMaxBlockDistance*2 {
657+
time.Sleep(enqueueRetryInterval)
658+
}
659+
638660
return s.payloads.Push(payload)
639661
}
640662

gossip/state/state_test.go

+90-18
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"errors"
2222
"fmt"
23+
"math/rand"
2324
"strconv"
2425
"sync"
2526
"testing"
@@ -154,7 +155,7 @@ func bootPeers(ids ...int) []string {
154155
type peerNode struct {
155156
port int
156157
g gossip.Gossip
157-
s GossipStateProvider
158+
s *GossipStateProviderImpl
158159
cs *cryptoServiceMock
159160
commit committer.Committer
160161
}
@@ -171,17 +172,21 @@ type mockCommitter struct {
171172
}
172173

173174
func (mc *mockCommitter) Commit(block *pcomm.Block) error {
174-
mc.Called(block)
175+
mc.Lock()
176+
m := mc.Mock
177+
mc.Unlock()
178+
m.Called(block)
175179
return nil
176180
}
177181

178182
func (mc *mockCommitter) LedgerHeight() (uint64, error) {
179183
mc.Lock()
180-
defer mc.Unlock()
181-
if mc.Called().Get(1) == nil {
182-
return mc.Called().Get(0).(uint64), nil
184+
m := mc.Mock
185+
mc.Unlock()
186+
if m.Called().Get(1) == nil {
187+
return m.Called().Get(0).(uint64), nil
183188
}
184-
return mc.Called().Get(0).(uint64), mc.Called().Get(1).(error)
189+
return m.Called().Get(0).(uint64), m.Called().Get(1).(error)
185190
}
186191

187192
func (mc *mockCommitter) GetBlocks(blockSeqs []uint64) []*pcomm.Block {
@@ -252,7 +257,7 @@ func newPeerNodeWithGossip(config *gossip.Config, committer committer.Committer,
252257
return &peerNode{
253258
port: config.BindPort,
254259
g: g,
255-
s: sp,
260+
s: sp.(*GossipStateProviderImpl),
256261
commit: committer,
257262
cs: cs,
258263
}
@@ -271,13 +276,13 @@ func TestNilDirectMsg(t *testing.T) {
271276
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
272277
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
273278
defer p.shutdown()
274-
p.s.(*GossipStateProviderImpl).handleStateRequest(nil)
275-
p.s.(*GossipStateProviderImpl).directMessage(nil)
276-
sMsg, _ := p.s.(*GossipStateProviderImpl).stateRequestMessage(uint64(10), uint64(8)).NoopSign()
279+
p.s.handleStateRequest(nil)
280+
p.s.directMessage(nil)
281+
sMsg, _ := p.s.stateRequestMessage(uint64(10), uint64(8)).NoopSign()
277282
req := &comm.ReceivedMessageImpl{
278283
SignedGossipMessage: sMsg,
279284
}
280-
p.s.(*GossipStateProviderImpl).directMessage(req)
285+
p.s.directMessage(req)
281286
}
282287

283288
func TestNilAddPayload(t *testing.T) {
@@ -341,32 +346,32 @@ func TestOverPopulation(t *testing.T) {
341346
for i := 1; i <= 4; i++ {
342347
rawblock := pcomm.NewBlock(uint64(i), []byte{})
343348
b, _ := pb.Marshal(rawblock)
344-
assert.NoError(t, p.s.AddPayload(&proto.Payload{
349+
assert.NoError(t, p.s.addPayload(&proto.Payload{
345350
SeqNum: uint64(i),
346351
Data: b,
347-
}))
352+
}, nonBlocking))
348353
}
349354

350355
// Add payloads from 10 to defMaxBlockDistance, while we're missing blocks [5,9]
351356
// Should succeed
352357
for i := 10; i <= defMaxBlockDistance; i++ {
353358
rawblock := pcomm.NewBlock(uint64(i), []byte{})
354359
b, _ := pb.Marshal(rawblock)
355-
assert.NoError(t, p.s.AddPayload(&proto.Payload{
360+
assert.NoError(t, p.s.addPayload(&proto.Payload{
356361
SeqNum: uint64(i),
357362
Data: b,
358-
}))
363+
}, nonBlocking))
359364
}
360365

361366
// Add payloads from defMaxBlockDistance + 1 to defMaxBlockDistance * 10
362367
// Should fail.
363368
for i := defMaxBlockDistance + 1; i <= defMaxBlockDistance*10; i++ {
364369
rawblock := pcomm.NewBlock(uint64(i), []byte{})
365370
b, _ := pb.Marshal(rawblock)
366-
assert.Error(t, p.s.AddPayload(&proto.Payload{
371+
assert.Error(t, p.s.addPayload(&proto.Payload{
367372
SeqNum: uint64(i),
368373
Data: b,
369-
}))
374+
}, nonBlocking))
370375
}
371376

372377
// Ensure only blocks 1-4 were passed to the ledger
@@ -379,9 +384,76 @@ func TestOverPopulation(t *testing.T) {
379384
assert.Equal(t, 5, i)
380385

381386
// Ensure we don't store too many blocks in memory
382-
sp := p.s.(*GossipStateProviderImpl)
387+
sp := p.s
383388
assert.True(t, sp.payloads.Size() < defMaxBlockDistance)
389+
}
384390

391+
func TestBlockingEnqueue(t *testing.T) {
392+
// Scenario: In parallel, get blocks from gossip and from the orderer.
393+
// The blocks from the orderer we get are X2 times the amount of blocks from gossip.
394+
// The blocks we get from gossip are random indices, to maximize disruption.
395+
mc := &mockCommitter{}
396+
blocksPassedToLedger := make(chan uint64, 10)
397+
mc.On("Commit", mock.Anything).Run(func(arg mock.Arguments) {
398+
blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number
399+
})
400+
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
401+
g := &mocks.GossipMock{}
402+
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
403+
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
404+
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
405+
defer p.shutdown()
406+
407+
numBlocksReceived := 500
408+
receivedBlockCount := 0
409+
// Get a block from the orderer every 1ms
410+
go func() {
411+
for i := 1; i <= numBlocksReceived; i++ {
412+
rawblock := pcomm.NewBlock(uint64(i), []byte{})
413+
b, _ := pb.Marshal(rawblock)
414+
block := &proto.Payload{
415+
SeqNum: uint64(i),
416+
Data: b,
417+
}
418+
p.s.AddPayload(block)
419+
time.Sleep(time.Millisecond)
420+
}
421+
}()
422+
423+
// Get a block from gossip every 1ms too
424+
go func() {
425+
rand.Seed(time.Now().UnixNano())
426+
for i := 1; i <= numBlocksReceived/2; i++ {
427+
blockSeq := rand.Intn(numBlocksReceived)
428+
rawblock := pcomm.NewBlock(uint64(blockSeq), []byte{})
429+
b, _ := pb.Marshal(rawblock)
430+
block := &proto.Payload{
431+
SeqNum: uint64(blockSeq),
432+
Data: b,
433+
}
434+
p.s.addPayload(block, nonBlocking)
435+
time.Sleep(time.Millisecond)
436+
}
437+
}()
438+
439+
for {
440+
receivedBlock := <-blocksPassedToLedger
441+
receivedBlockCount++
442+
m := mock.Mock{}
443+
m.On("LedgerHeight", mock.Anything).Return(receivedBlock, nil)
444+
m.On("Commit", mock.Anything).Run(func(arg mock.Arguments) {
445+
blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number
446+
})
447+
mc.Lock()
448+
mc.Mock = m
449+
mc.Unlock()
450+
assert.Equal(t, receivedBlock, uint64(receivedBlockCount))
451+
if int(receivedBlockCount) == numBlocksReceived {
452+
break
453+
}
454+
time.Sleep(time.Millisecond * 10)
455+
t.Log("got block", receivedBlock)
456+
}
385457
}
386458

387459
func TestFailures(t *testing.T) {

0 commit comments

Comments
 (0)