Skip to content

Commit 1a721b1

Browse files
committed
[FAB-4438] Fix race condition in mock WriteBlock
WriteBlockVal could get overwritten before the listener on the Batches channel had a chance to retrieve it. This changeset fixes this by replacing the Batches channel with a Blocks channel. Change-Id: Idc8899a4317a1c2dc7f5b6affae1c42f24095117 Signed-off-by: Kostas Christidis <[email protected]>
1 parent 2590cce commit 1a721b1

File tree

3 files changed

+31
-42
lines changed

3 files changed

+31
-42
lines changed

orderer/kafka/chain_test.go

+14-16
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ func TestProcessLoopRegularError(t *testing.T) {
420420
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")
421421

422422
mockSupport := &mockmultichain.ConsenterSupport{
423-
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
423+
Blocks: make(chan *cb.Block), // WriteBlock will post here
424424
BlockCutterVal: mockblockcutter.NewReceiver(),
425425
ChainIDVal: mockChannel.topic(),
426426
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
@@ -477,7 +477,7 @@ func TestProcessLoopRegularQueueEnvelope(t *testing.T) {
477477
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")
478478

479479
mockSupport := &mockmultichain.ConsenterSupport{
480-
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
480+
Blocks: make(chan *cb.Block), // WriteBlock will post here
481481
BlockCutterVal: mockblockcutter.NewReceiver(),
482482
ChainIDVal: mockChannel.topic(),
483483
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
@@ -530,7 +530,7 @@ func TestProcessLoopRegularCutBlock(t *testing.T) {
530530
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")
531531

532532
mockSupport := &mockmultichain.ConsenterSupport{
533-
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
533+
Blocks: make(chan *cb.Block), // WriteBlock will post here
534534
BlockCutterVal: mockblockcutter.NewReceiver(),
535535
ChainIDVal: mockChannel.topic(),
536536
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
@@ -546,7 +546,7 @@ func TestProcessLoopRegularCutBlock(t *testing.T) {
546546
go func() { // Note: Unlike the CONNECT test case, the following does NOT introduce a race condition, so we're good
547547
mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return
548548
logger.Debugf("Mock blockcutter's Ordered call has returned")
549-
<-mockSupport.Batches // Let the `mockConsenterSupport.WriteBlock` proceed
549+
<-mockSupport.Blocks // Let the `mockConsenterSupport.WriteBlock` proceed
550550
logger.Debug("Closing exitChan to exit the infinite for loop") // We are guaranteed to hit the exitChan branch after hitting the REGULAR branch at least once
551551
close(exitChan) // Identical to chain.Halt()
552552
logger.Debug("exitChan closed")
@@ -590,7 +590,7 @@ func TestProcessLoopRegularCutTwoBlocks(t *testing.T) {
590590
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")
591591

592592
mockSupport := &mockmultichain.ConsenterSupport{
593-
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
593+
Blocks: make(chan *cb.Block), // WriteBlock will post here
594594
BlockCutterVal: mockblockcutter.NewReceiver(),
595595
ChainIDVal: mockChannel.topic(),
596596
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
@@ -614,15 +614,13 @@ func TestProcessLoopRegularCutTwoBlocks(t *testing.T) {
614614
logger.Debugf("Mock blockcutter's Ordered call has returned for the second time")
615615

616616
select {
617-
case <-mockSupport.Batches: // Let the `mockConsenterSupport.WriteBlock` proceed
618-
block1 = mockSupport.WriteBlockVal
617+
case block1 = <-mockSupport.Blocks: // Let the `mockConsenterSupport.WriteBlock` proceed
619618
case <-time.After(hitBranch):
620619
logger.Fatalf("Did not receive a block from the blockcutter as expected")
621620
}
622621

623622
select {
624-
case <-mockSupport.Batches:
625-
block2 = mockSupport.WriteBlockVal
623+
case block2 = <-mockSupport.Blocks:
626624
case <-time.After(hitBranch):
627625
logger.Fatalf("Did not receive a block from the blockcutter as expected")
628626
}
@@ -677,7 +675,7 @@ func TestProcessLoopRegularAndSendTimeToCutRegular(t *testing.T) {
677675
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")
678676

679677
mockSupport := &mockmultichain.ConsenterSupport{
680-
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
678+
Blocks: make(chan *cb.Block), // WriteBlock will post here
681679
BlockCutterVal: mockblockcutter.NewReceiver(),
682680
ChainIDVal: mockChannel.topic(),
683681
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
@@ -746,7 +744,7 @@ func TestProcessLoopRegularAndSendTimeToCutError(t *testing.T) {
746744
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")
747745

748746
mockSupport := &mockmultichain.ConsenterSupport{
749-
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
747+
Blocks: make(chan *cb.Block), // WriteBlock will post here
750748
BlockCutterVal: mockblockcutter.NewReceiver(),
751749
ChainIDVal: mockChannel.topic(),
752750
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
@@ -801,7 +799,7 @@ func TestProcessLoopTimeToCutFromReceivedMessageRegular(t *testing.T) {
801799
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")
802800

803801
mockSupport := &mockmultichain.ConsenterSupport{
804-
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
802+
Blocks: make(chan *cb.Block), // WriteBlock will post here
805803
BlockCutterVal: mockblockcutter.NewReceiver(),
806804
ChainIDVal: mockChannel.topic(),
807805
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
@@ -818,7 +816,7 @@ func TestProcessLoopTimeToCutFromReceivedMessageRegular(t *testing.T) {
818816
mockSupport.BlockCutterVal.Ordered(newMockEnvelope("fooMessage"))
819817

820818
go func() { // Note: Unlike the CONNECT test case, the following does NOT introduce a race condition, so we're good
821-
<-mockSupport.Batches // Let the `mockConsenterSupport.WriteBlock` proceed
819+
<-mockSupport.Blocks // Let the `mockConsenterSupport.WriteBlock` proceed
822820
logger.Debug("Closing exitChan to exit the infinite for loop") // We are guaranteed to hit the exitChan branch after hitting the REGULAR branch at least once
823821
close(exitChan) // Identical to chain.Halt()
824822
logger.Debug("exitChan closed")
@@ -858,7 +856,7 @@ func TestProcessLoopTimeToCutFromReceivedMessageZeroBatch(t *testing.T) {
858856
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")
859857

860858
mockSupport := &mockmultichain.ConsenterSupport{
861-
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
859+
Blocks: make(chan *cb.Block), // WriteBlock will post here
862860
BlockCutterVal: mockblockcutter.NewReceiver(),
863861
ChainIDVal: mockChannel.topic(),
864862
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
@@ -902,7 +900,7 @@ func TestProcessLoopTimeToCutFromReceivedMessageLargerThanExpected(t *testing.T)
902900
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")
903901

904902
mockSupport := &mockmultichain.ConsenterSupport{
905-
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
903+
Blocks: make(chan *cb.Block), // WriteBlock will post here
906904
BlockCutterVal: mockblockcutter.NewReceiver(),
907905
ChainIDVal: mockChannel.topic(),
908906
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
@@ -948,7 +946,7 @@ func TestProcessLoopTimeToCutFromReceivedMessageStale(t *testing.T) {
948946
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")
949947

950948
mockSupport := &mockmultichain.ConsenterSupport{
951-
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
949+
Blocks: make(chan *cb.Block), // WriteBlock will post here
952950
BlockCutterVal: mockblockcutter.NewReceiver(),
953951
ChainIDVal: mockChannel.topic(),
954952
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call

orderer/mocks/multichain/multichain.go

+5-14
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ type ConsenterSupport struct {
3939
// BlockCutterVal is the value returned by BlockCutter()
4040
BlockCutterVal *mockblockcutter.Receiver
4141

42-
// Batches is the channel which WriteBlock writes data to
43-
Batches chan []*cb.Envelope
42+
// Blocks is the channel where WriteBlock writes the most recently created block
43+
Blocks chan *cb.Block
4444

4545
// ChainIDVal is the value returned by ChainID()
4646
ChainIDVal string
@@ -50,9 +50,6 @@ type ConsenterSupport struct {
5050

5151
// NextBlockVal stores the block created by the most recent CreateNextBlock() call
5252
NextBlockVal *cb.Block
53-
54-
// WriteBlockVal stores the block created by the most recent WriteBlock() call
55-
WriteBlockVal *cb.Block
5653
}
5754

5855
// BlockCutter returns BlockCutterVal
@@ -77,20 +74,14 @@ func (mcs *ConsenterSupport) CreateNextBlock(data []*cb.Envelope) *cb.Block {
7774
return block
7875
}
7976

80-
// WriteBlock writes data to the Batches channel
77+
// WriteBlock writes data to the Blocks channel
8178
// Note that _committers is ignored by this mock implementation
8279
func (mcs *ConsenterSupport) WriteBlock(block *cb.Block, _committers []filter.Committer, encodedMetadataValue []byte) *cb.Block {
83-
logger.Debugf("mockWriter: attempting to write batch")
84-
umtxs := make([]*cb.Envelope, len(block.Data.Data))
85-
for i := range block.Data.Data {
86-
umtxs[i] = utils.UnmarshalEnvelopeOrPanic(block.Data.Data[i])
87-
}
88-
mcs.HeightVal++
8980
if encodedMetadataValue != nil {
9081
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
9182
}
92-
mcs.WriteBlockVal = block
93-
mcs.Batches <- umtxs
83+
mcs.HeightVal++
84+
mcs.Blocks <- block
9485
return block
9586
}
9687

orderer/solo/consensus_test.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func goWithWait(target func()) *waitableGo {
5757
func TestEmptyBatch(t *testing.T) {
5858
batchTimeout, _ := time.ParseDuration("1ms")
5959
support := &mockmultichain.ConsenterSupport{
60-
Batches: make(chan []*cb.Envelope),
60+
Blocks: make(chan *cb.Block),
6161
BlockCutterVal: mockblockcutter.NewReceiver(),
6262
SharedConfigVal: &mockconfig.Orderer{BatchTimeoutVal: batchTimeout},
6363
}
@@ -69,7 +69,7 @@ func TestEmptyBatch(t *testing.T) {
6969
syncQueueMessage(testMessage, bs, support.BlockCutterVal)
7070
bs.Halt()
7171
select {
72-
case <-support.Batches:
72+
case <-support.Blocks:
7373
t.Fatalf("Expected no invocations of Append")
7474
case <-wg.done:
7575
}
@@ -78,7 +78,7 @@ func TestEmptyBatch(t *testing.T) {
7878
func TestBatchTimer(t *testing.T) {
7979
batchTimeout, _ := time.ParseDuration("1ms")
8080
support := &mockmultichain.ConsenterSupport{
81-
Batches: make(chan []*cb.Envelope),
81+
Blocks: make(chan *cb.Block),
8282
BlockCutterVal: mockblockcutter.NewReceiver(),
8383
SharedConfigVal: &mockconfig.Orderer{BatchTimeoutVal: batchTimeout},
8484
}
@@ -90,21 +90,21 @@ func TestBatchTimer(t *testing.T) {
9090
syncQueueMessage(testMessage, bs, support.BlockCutterVal)
9191

9292
select {
93-
case <-support.Batches:
93+
case <-support.Blocks:
9494
case <-time.After(time.Second):
9595
t.Fatalf("Expected a block to be cut because of batch timer expiration but did not")
9696
}
9797

9898
syncQueueMessage(testMessage, bs, support.BlockCutterVal)
9999
select {
100-
case <-support.Batches:
100+
case <-support.Blocks:
101101
case <-time.After(time.Second):
102102
t.Fatalf("Did not create the second batch, indicating that the timer was not appopriately reset")
103103
}
104104

105105
bs.Halt()
106106
select {
107-
case <-support.Batches:
107+
case <-support.Blocks:
108108
t.Fatalf("Expected no invocations of Append")
109109
case <-wg.done:
110110
}
@@ -113,7 +113,7 @@ func TestBatchTimer(t *testing.T) {
113113
func TestBatchTimerHaltOnFilledBatch(t *testing.T) {
114114
batchTimeout, _ := time.ParseDuration("1h")
115115
support := &mockmultichain.ConsenterSupport{
116-
Batches: make(chan []*cb.Envelope),
116+
Blocks: make(chan *cb.Block),
117117
BlockCutterVal: mockblockcutter.NewReceiver(),
118118
SharedConfigVal: &mockconfig.Orderer{BatchTimeoutVal: batchTimeout},
119119
}
@@ -128,7 +128,7 @@ func TestBatchTimerHaltOnFilledBatch(t *testing.T) {
128128
syncQueueMessage(testMessage, bs, support.BlockCutterVal)
129129

130130
select {
131-
case <-support.Batches:
131+
case <-support.Blocks:
132132
case <-time.After(time.Second):
133133
t.Fatalf("Expected a block to be cut because the batch was filled, but did not")
134134
}
@@ -140,7 +140,7 @@ func TestBatchTimerHaltOnFilledBatch(t *testing.T) {
140140
syncQueueMessage(testMessage, bs, support.BlockCutterVal)
141141

142142
select {
143-
case <-support.Batches:
143+
case <-support.Blocks:
144144
case <-time.After(time.Second):
145145
t.Fatalf("Did not create the second batch, indicating that the old timer was still running")
146146
}
@@ -156,7 +156,7 @@ func TestBatchTimerHaltOnFilledBatch(t *testing.T) {
156156
func TestConfigStyleMultiBatch(t *testing.T) {
157157
batchTimeout, _ := time.ParseDuration("1h")
158158
support := &mockmultichain.ConsenterSupport{
159-
Batches: make(chan []*cb.Envelope),
159+
Blocks: make(chan *cb.Block),
160160
BlockCutterVal: mockblockcutter.NewReceiver(),
161161
SharedConfigVal: &mockconfig.Orderer{BatchTimeoutVal: batchTimeout},
162162
}
@@ -170,13 +170,13 @@ func TestConfigStyleMultiBatch(t *testing.T) {
170170
syncQueueMessage(testMessage, bs, support.BlockCutterVal)
171171

172172
select {
173-
case <-support.Batches:
173+
case <-support.Blocks:
174174
case <-time.After(time.Second):
175175
t.Fatalf("Expected two blocks to be cut but never got the first")
176176
}
177177

178178
select {
179-
case <-support.Batches:
179+
case <-support.Blocks:
180180
case <-time.After(time.Second):
181181
t.Fatalf("Expected the config type tx to create two blocks, but only go the first")
182182
}

0 commit comments

Comments
 (0)