Skip to content

Commit 383f34d

Browse files
committed
Add BroadcastIncompleteBatch test to Kafka orderer
Test that a batch is cut even if the batch size requirement is not met. Change-Id: Ic02cea7bd835a85a833af21cab45626677f00817 Signed-off-by: Kostas Christidis <[email protected]>
1 parent b4473da commit 383f34d

File tree

2 files changed

+58
-2
lines changed

2 files changed

+58
-2
lines changed

orderer/kafka/broadcast_test.go

+53-2
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,58 @@ func TestBroadcastBatch(t *testing.T) {
130130
}
131131
return
132132
case <-time.After(500 * time.Millisecond):
133-
t.Fatal("Should have received the initialization block by now")
133+
t.Fatal("Should have received a block by now")
134+
}
135+
}
136+
}
137+
138+
func TestBroadcastIncompleteBatch(t *testing.T) {
139+
if testConf.General.BatchSize <= 1 {
140+
t.Skip("Skipping test as it requires a batchsize > 1")
141+
}
142+
143+
messageCount := int(testConf.General.BatchSize) - 1
144+
145+
disk := make(chan []byte)
146+
147+
mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
148+
defer testClose(t, mb)
149+
150+
mbs := newMockBroadcastStream(t)
151+
go func() {
152+
if err := mb.Broadcast(mbs); err != nil {
153+
t.Fatal("Broadcast error:", err)
154+
}
155+
}()
156+
157+
<-disk // We tested the checkpoint block in a previous test, so we can ignore it now
158+
159+
// Pump less than batchSize messages into the system
160+
go func() {
161+
for i := 0; i < messageCount; i++ {
162+
mbs.incoming <- &ab.BroadcastMessage{Data: []byte("message " + strconv.Itoa(i))}
163+
}
164+
}()
165+
166+
// Ignore the broadcast replies as they have been tested elsewhere
167+
for i := 0; i < messageCount; i++ {
168+
<-mbs.outgoing
169+
}
170+
171+
for {
172+
select {
173+
case in := <-disk:
174+
block := new(ab.Block)
175+
err := proto.Unmarshal(in, block)
176+
if err != nil {
177+
t.Fatal("Expected a block on the broker's disk")
178+
}
179+
if len(block.Messages) != messageCount {
180+
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Messages))
181+
}
182+
return
183+
case <-time.After(testConf.General.BatchTimeout + timePadding):
184+
t.Fatal("Should have received a block by now")
134185
}
135186
}
136187
}
@@ -177,7 +228,7 @@ func TestBroadcastBatchAndQuitEarly(t *testing.T) {
177228
}
178229
return
179230
case <-time.After(500 * time.Millisecond):
180-
t.Fatal("Should have received the initialization block by now")
231+
t.Fatal("Should have received a block by now")
181232
}
182233
}
183234
}

orderer/kafka/config_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ var (
2828
oldestOffset = int64(100) // The oldest block available on the broker
2929
newestOffset = int64(1100) // The offset that will be assigned to the next block
3030
middleOffset = (oldestOffset + newestOffset - 1) / 2 // Just an offset in the middle
31+
32+
// Amount of time to wait for block processing when doing time-based tests
33+
// We generally want this value to be as small as possible so as to make tests execute faster
34+
// But this may have to be bumped up in slower machines
35+
timePadding = 200 * time.Millisecond
3136
)
3237

3338
var testConf = &config.TopLevel{

0 commit comments

Comments
 (0)