@@ -17,6 +17,7 @@ limitations under the License.
17
17
package kafka
18
18
19
19
import (
20
+ "fmt"
20
21
"sync"
21
22
"testing"
22
23
"time"
@@ -39,30 +40,29 @@ func newMockSharedConfigManager() *mocksharedconfig.Manager {
39
40
return & mocksharedconfig.Manager {KafkaBrokersVal : testConf .Kafka .Brokers }
40
41
}
41
42
42
- func syncQueueMessage (msg * cb.Envelope , chain multichain.Chain , bc * mockblockcutter.Receiver ) {
43
- chain .Enqueue (msg )
44
- bc .Block <- struct {}{}
45
- }
46
-
47
43
type mockConsenterImpl struct {
48
44
consenterImpl
49
45
prodDisk , consDisk chan * ab.KafkaMessage
50
46
consumerSetUp bool
51
47
t * testing.T
52
48
}
53
49
54
- func mockNewConsenter (t * testing.T , kafkaVersion sarama.KafkaVersion , retryOptions config.Retry ) * mockConsenterImpl {
50
+ func mockNewConsenter (t * testing.T , kafkaVersion sarama.KafkaVersion , retryOptions config.Retry , nextProducedOffset int64 ) * mockConsenterImpl {
55
51
prodDisk := make (chan * ab.KafkaMessage )
56
52
consDisk := make (chan * ab.KafkaMessage )
57
53
58
54
mockBfValue := func (brokers []string , cp ChainPartition ) (Broker , error ) {
59
55
return mockNewBroker (t , cp )
60
56
}
61
57
mockPfValue := func (brokers []string , kafkaVersion sarama.KafkaVersion , retryOptions config.Retry ) Producer {
62
- return mockNewProducer (t , cp , testOldestOffset , prodDisk )
58
+ // The first Send on this producer will return a blob with offset #nextProducedOffset
59
+ return mockNewProducer (t , cp , nextProducedOffset , prodDisk )
63
60
}
64
- mockCfValue := func (brokers []string , kafkaVersion sarama.KafkaVersion , cp ChainPartition , offset int64 ) (Consumer , error ) {
65
- return mockNewConsumer (t , cp , offset , consDisk )
61
+ mockCfValue := func (brokers []string , kafkaVersion sarama.KafkaVersion , cp ChainPartition , lastPersistedOffset int64 ) (Consumer , error ) {
62
+ if lastPersistedOffset != nextProducedOffset {
63
+ panic (fmt .Errorf ("Mock objects about to be set up incorrectly (consumer to seek to %d, producer to post %d)" , lastPersistedOffset , nextProducedOffset ))
64
+ }
65
+ return mockNewConsumer (t , cp , lastPersistedOffset , consDisk )
66
66
}
67
67
68
68
return & mockConsenterImpl {
@@ -96,6 +96,11 @@ func prepareMockObjectDisks(t *testing.T, co *mockConsenterImpl, ch *chainImpl)
96
96
}
97
97
}
98
98
99
+ func syncQueueMessage (msg * cb.Envelope , chain multichain.Chain , bc * mockblockcutter.Receiver ) {
100
+ chain .Enqueue (msg )
101
+ bc .Block <- struct {}{}
102
+ }
103
+
99
104
func waitableSyncQueueMessage (env * cb.Envelope , messagesToPickUp int , wg * sync.WaitGroup ,
100
105
co * mockConsenterImpl , cs * mockmultichain.ConsenterSupport , ch * chainImpl ) {
101
106
wg .Add (1 )
@@ -128,9 +133,10 @@ func TestKafkaConsenterEmptyBatch(t *testing.T) {
128
133
}
129
134
defer close (cs .BlockCutterVal .Block )
130
135
131
- co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry )
132
- ch := newChain (co , cs )
133
- ch .lastProcessed = testOldestOffset - 1
136
+ lastPersistedOffset := testOldestOffset - 1
137
+ nextProducedOffset := lastPersistedOffset + 1
138
+ co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry , nextProducedOffset )
139
+ ch := newChain (co , cs , lastPersistedOffset )
134
140
135
141
go ch .Start ()
136
142
defer ch .Halt ()
@@ -162,9 +168,10 @@ func TestKafkaConsenterBatchTimer(t *testing.T) {
162
168
}
163
169
defer close (cs .BlockCutterVal .Block )
164
170
165
- co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry )
166
- ch := newChain (co , cs )
167
- ch .lastProcessed = testOldestOffset - 1
171
+ lastPersistedOffset := testOldestOffset - 1
172
+ nextProducedOffset := lastPersistedOffset + 1
173
+ co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry , nextProducedOffset )
174
+ ch := newChain (co , cs , lastPersistedOffset )
168
175
169
176
go ch .Start ()
170
177
defer ch .Halt ()
@@ -213,9 +220,10 @@ func TestKafkaConsenterTimerHaltOnFilledBatch(t *testing.T) {
213
220
}
214
221
defer close (cs .BlockCutterVal .Block )
215
222
216
- co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry )
217
- ch := newChain (co , cs )
218
- ch .lastProcessed = testOldestOffset - 1
223
+ lastPersistedOffset := testOldestOffset - 1
224
+ nextProducedOffset := lastPersistedOffset + 1
225
+ co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry , nextProducedOffset )
226
+ ch := newChain (co , cs , lastPersistedOffset )
219
227
220
228
go ch .Start ()
221
229
defer ch .Halt ()
@@ -272,9 +280,10 @@ func TestKafkaConsenterConfigStyleMultiBatch(t *testing.T) {
272
280
}
273
281
defer close (cs .BlockCutterVal .Block )
274
282
275
- co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry )
276
- ch := newChain (co , cs )
277
- ch .lastProcessed = testOldestOffset - 1
283
+ lastPersistedOffset := testOldestOffset - 1
284
+ nextProducedOffset := lastPersistedOffset + 1
285
+ co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry , nextProducedOffset )
286
+ ch := newChain (co , cs , lastPersistedOffset )
278
287
279
288
go ch .Start ()
280
289
defer ch .Halt ()
@@ -321,9 +330,10 @@ func TestKafkaConsenterTimeToCutForced(t *testing.T) {
321
330
}
322
331
defer close (cs .BlockCutterVal .Block )
323
332
324
- co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry )
325
- ch := newChain (co , cs )
326
- ch .lastProcessed = testOldestOffset - 1
333
+ lastPersistedOffset := testOldestOffset - 1
334
+ nextProducedOffset := lastPersistedOffset + 1
335
+ co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry , nextProducedOffset )
336
+ ch := newChain (co , cs , lastPersistedOffset )
327
337
328
338
go ch .Start ()
329
339
defer ch .Halt ()
@@ -377,9 +387,10 @@ func TestKafkaConsenterTimeToCutDuplicate(t *testing.T) {
377
387
}
378
388
defer close (cs .BlockCutterVal .Block )
379
389
380
- co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry )
381
- ch := newChain (co , cs )
382
- ch .lastProcessed = testOldestOffset - 1
390
+ lastPersistedOffset := testOldestOffset - 1
391
+ nextProducedOffset := lastPersistedOffset + 1
392
+ co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry , nextProducedOffset )
393
+ ch := newChain (co , cs , lastPersistedOffset )
383
394
384
395
go ch .Start ()
385
396
defer ch .Halt ()
@@ -465,9 +476,10 @@ func TestKafkaConsenterTimeToCutStale(t *testing.T) {
465
476
}
466
477
defer close (cs .BlockCutterVal .Block )
467
478
468
- co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry )
469
- ch := newChain (co , cs )
470
- ch .lastProcessed = testOldestOffset - 1
479
+ lastPersistedOffset := testOldestOffset - 1
480
+ nextProducedOffset := lastPersistedOffset + 1
481
+ co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry , nextProducedOffset )
482
+ ch := newChain (co , cs , lastPersistedOffset )
471
483
472
484
go ch .Start ()
473
485
defer ch .Halt ()
@@ -523,9 +535,10 @@ func TestKafkaConsenterTimeToCutLarger(t *testing.T) {
523
535
}
524
536
defer close (cs .BlockCutterVal .Block )
525
537
526
- co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry )
527
- ch := newChain (co , cs )
528
- ch .lastProcessed = testOldestOffset - 1
538
+ lastPersistedOffset := testOldestOffset - 1
539
+ nextProducedOffset := lastPersistedOffset + 1
540
+ co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry , nextProducedOffset )
541
+ ch := newChain (co , cs , lastPersistedOffset )
529
542
530
543
go ch .Start ()
531
544
defer ch .Halt ()
@@ -574,3 +587,81 @@ func TestKafkaConsenterTimeToCutLarger(t *testing.T) {
574
587
case <- ch .haltedChan : // If we're here, we definitely had a chance to invoke Append but didn't (which is great)
575
588
}
576
589
}
590
+
591
+ func TestGetLastOffsetPersistedEmpty (t * testing.T ) {
592
+ expected := sarama .OffsetOldest - 1
593
+ actual := getLastOffsetPersisted (& cb.Metadata {})
594
+ if actual != expected {
595
+ t .Fatalf ("Expected last offset %d, got %d" , expected , actual )
596
+ }
597
+ }
598
+
599
+ func TestGetLastOffsetPersistedRight (t * testing.T ) {
600
+ expected := int64 (100 )
601
+ actual := getLastOffsetPersisted (& cb.Metadata {Value : utils .MarshalOrPanic (& ab.KafkaMetadata {LastOffsetPersisted : expected })})
602
+ if actual != expected {
603
+ t .Fatalf ("Expected last offset %d, got %d" , expected , actual )
604
+ }
605
+ }
606
+
607
+ func TestKafkaConsenterRestart (t * testing.T ) {
608
+ var wg sync.WaitGroup
609
+ defer wg .Wait ()
610
+
611
+ batchTimeout , _ := time .ParseDuration ("1ms" )
612
+ cs := & mockmultichain.ConsenterSupport {
613
+ Batches : make (chan []* cb.Envelope ),
614
+ BlockCutterVal : mockblockcutter .NewReceiver (),
615
+ ChainIDVal : provisional .TestChainID ,
616
+ SharedConfigVal : & mocksharedconfig.Manager {BatchTimeoutVal : batchTimeout },
617
+ }
618
+ defer close (cs .BlockCutterVal .Block )
619
+
620
+ lastPersistedOffset := testOldestOffset - 1
621
+ nextProducedOffset := lastPersistedOffset + 1
622
+ co := mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry , nextProducedOffset )
623
+ ch := newChain (co , cs , lastPersistedOffset )
624
+
625
+ go ch .Start ()
626
+ defer ch .Halt ()
627
+
628
+ prepareMockObjectDisks (t , co , ch )
629
+
630
+ // The second message that will be picked up is the time-to-cut message
631
+ // that will be posted when the short timer expires
632
+ waitableSyncQueueMessage (newTestEnvelope ("one" ), 2 , & wg , co , cs , ch )
633
+
634
+ select {
635
+ case <- cs .Batches : // This is the success path
636
+ case <- time .After (testTimePadding ):
637
+ t .Fatal ("Expected block to be cut because batch timer expired" )
638
+ }
639
+
640
+ // Stop the loop
641
+ ch .Halt ()
642
+
643
+ select {
644
+ case <- cs .Batches :
645
+ t .Fatal ("Expected no invocations of Append" )
646
+ case <- ch .haltedChan : // If we're here, we definitely had a chance to invoke Append but didn't (which is great)
647
+ }
648
+
649
+ lastBlock := cs .WriteBlockVal
650
+ metadata , err := utils .GetMetadataFromBlock (lastBlock , cb .BlockMetadataIndex_ORDERER )
651
+ if err != nil {
652
+ logger .Fatalf ("Error extracting orderer metadata for chain %x: %s" , cs .ChainIDVal , err )
653
+ }
654
+
655
+ lastPersistedOffset = getLastOffsetPersisted (metadata )
656
+ nextProducedOffset = lastPersistedOffset + 1
657
+
658
+ co = mockNewConsenter (t , testConf .Kafka .Version , testConf .Kafka .Retry , nextProducedOffset )
659
+ ch = newChain (co , cs , lastPersistedOffset )
660
+ go ch .Start ()
661
+ prepareMockObjectDisks (t , co , ch )
662
+
663
+ actual := ch .producer .(* mockProducerImpl ).producedOffset
664
+ if actual != nextProducedOffset {
665
+ t .Fatalf ("Restarted orderer post-connect should have been at offset %d, got %d instead" , nextProducedOffset , actual )
666
+ }
667
+ }
0 commit comments