Skip to content

Commit 2f3237e

Browse files
committed
Reset timer when cutting a block via timeout
- Fixes: https://jira.hyperledger.org/browse/FAB-899 - Introduces a BroadcastConsecutiveIncompleteBatches test that failed before this fix. - Lowers the BatchTimeout to 500 milliseconds, so as to speed up the execution of the time-based unit tests in the package (drop from 7 to 3 seconds). - Moves the timer.Reset() calls before the sendBlock() ones to make the timing more accurate. Thanks to Bishop Brock for reporting this. Change-Id: I9cd8179fa54398f03e0f3b9ef95c080508cce8a3 Signed-off-by: Kostas Christidis <[email protected]>
1 parent 383f34d commit 2f3237e

File tree

3 files changed

+58
-4
lines changed

3 files changed

+58
-4
lines changed

orderer/kafka/broadcast.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,16 @@ func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
9898
case msg := <-b.batchChan:
9999
b.messages = append(b.messages, msg)
100100
if len(b.messages) >= int(maxSize) {
101-
if err := b.sendBlock(); err != nil {
102-
panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err))
103-
}
104101
if !timer.Stop() {
105102
<-timer.C
106103
}
107104
timer.Reset(period)
105+
if err := b.sendBlock(); err != nil {
106+
panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err))
107+
}
108108
}
109109
case <-timer.C:
110+
timer.Reset(period)
110111
if len(b.messages) > 0 {
111112
if err := b.sendBlock(); err != nil {
112113
panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err))

orderer/kafka/broadcast_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,59 @@ func TestBroadcastIncompleteBatch(t *testing.T) {
186186
}
187187
}
188188

189+
func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
190+
if testConf.General.BatchSize <= 1 {
191+
t.Skip("Skipping test as it requires a batchsize > 1")
192+
}
193+
194+
messageCount := int(testConf.General.BatchSize) - 1
195+
196+
disk := make(chan []byte)
197+
198+
mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
199+
defer testClose(t, mb)
200+
201+
mbs := newMockBroadcastStream(t)
202+
go func() {
203+
if err := mb.Broadcast(mbs); err != nil {
204+
t.Fatal("Broadcast error:", err)
205+
}
206+
}()
207+
208+
for i := 0; i < 2; i++ {
209+
<-disk // Checkpoint block in first pass, first incomplete block in second pass -- both tested elsewhere
210+
211+
// Pump less than batchSize messages into the system
212+
go func() {
213+
for i := 0; i < messageCount; i++ {
214+
mbs.incoming <- &ab.BroadcastMessage{Data: []byte("message " + strconv.Itoa(i))}
215+
}
216+
}()
217+
218+
// Ignore the broadcast replies as they have been tested elsewhere
219+
for i := 0; i < messageCount; i++ {
220+
<-mbs.outgoing
221+
}
222+
}
223+
224+
for {
225+
select {
226+
case in := <-disk:
227+
block := new(ab.Block)
228+
err := proto.Unmarshal(in, block)
229+
if err != nil {
230+
t.Fatal("Expected a block on the broker's disk")
231+
}
232+
if len(block.Messages) != messageCount {
233+
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Messages))
234+
}
235+
return
236+
case <-time.After(testConf.General.BatchTimeout + timePadding):
237+
t.Fatal("Should have received a block by now")
238+
}
239+
}
240+
}
241+
189242
func TestBroadcastBatchAndQuitEarly(t *testing.T) {
190243
disk := make(chan []byte)
191244

orderer/kafka/config_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ var (
3838
var testConf = &config.TopLevel{
3939
General: config.General{
4040
OrdererType: "kafka",
41-
BatchTimeout: 2 * time.Second,
41+
BatchTimeout: 500 * time.Millisecond,
4242
BatchSize: 100,
4343
QueueSize: 100,
4444
MaxWindowSize: 100,

0 commit comments

Comments
 (0)