Skip to content

Commit 6f84396

Browse files
author
Luis Sanchez
committed
[FAB-4537] Sporadic CI failures in orderer/kafka
When halting fabric channel it is possible to select on the closure of the kafka consumer channel before the haltChan. Also reverts 26d71e0, re-enabling the TestChain/EnqueueProper test. Change-Id: I01d63801bf29ce1e73b4bdc84a3945570c2c515a Signed-off-by: Luis Sanchez <[email protected]>
1 parent f20846c commit 6f84396

7 files changed

+68
-37
lines changed

orderer/kafka/chain.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
205205
counts[indexExitChanPass]++
206206
return counts, nil
207207
case kafkaErr := <-chain.channelConsumer.Errors():
208-
logger.Error(kafkaErr)
208+
logger.Errorf("[channel: %s] Error during consumption: %s", chain.support.ChainID(), kafkaErr)
209209
counts[indexRecvError]++
210210
select {
211211
case <-chain.errorChan: // If already closed, don't do anything
@@ -221,14 +221,17 @@ func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
221221
// mark the chain as available, so we have to force that trigger via
222222
// the emission of a CONNECT message. TODO Consider rate limiting
223223
go sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel)
224-
case in := <-chain.channelConsumer.Messages():
224+
case in, ok := <-chain.channelConsumer.Messages():
225+
if !ok {
226+
logger.Criticalf("[channel: %s] Kafka consumer closed.", chain.support.ChainID())
227+
return counts, nil
228+
}
225229
select {
226230
case <-chain.errorChan: // If this channel was closed...
227231
chain.errorChan = make(chan struct{}) // ...make a new one.
228232
logger.Infof("[channel: %s] Marked consenter as available again", chain.support.ChainID())
229233
default:
230234
}
231-
232235
if err := proto.Unmarshal(in.Value, msg); err != nil {
233236
// This shouldn't happen, it should be filtered at ingress
234237
logger.Criticalf("[channel: %s] Unable to unmarshal consumed message = %s", chain.support.ChainID(), err)

orderer/kafka/chain_test.go

+48-27
Original file line numberDiff line numberDiff line change
@@ -31,36 +31,38 @@ var (
3131
)
3232

3333
func TestChain(t *testing.T) {
34-
mockChannel := newChannel("foo.channel", defaultPartition)
3534

3635
oldestOffset := int64(0)
3736
newestOffset := int64(5)
3837

3938
message := sarama.StringEncoder("messageFoo")
4039

41-
mockBroker := sarama.NewMockBroker(t, 0)
42-
defer func() { mockBroker.Close() }()
43-
44-
mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{
45-
"MetadataRequest": sarama.NewMockMetadataResponse(t).
46-
SetBroker(mockBroker.Addr(), mockBroker.BrokerID()).
47-
SetLeader(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID()),
48-
"ProduceRequest": sarama.NewMockProduceResponse(t).
49-
SetError(mockChannel.topic(), mockChannel.partition(), sarama.ErrNoError),
50-
"OffsetRequest": sarama.NewMockOffsetResponse(t).
51-
SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetOldest, oldestOffset).
52-
SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetNewest, newestOffset),
53-
"FetchRequest": sarama.NewMockFetchResponse(t, 1).
54-
SetMessage(mockChannel.topic(), mockChannel.partition(), newestOffset, message),
55-
})
56-
57-
mockSupport := &mockmultichain.ConsenterSupport{
58-
ChainIDVal: mockChannel.topic(),
59-
HeightVal: uint64(3),
60-
SharedConfigVal: &mockconfig.Orderer{KafkaBrokersVal: []string{mockBroker.Addr()}},
40+
newMocks := func(t *testing.T) (mockChannel channel, mockBroker *sarama.MockBroker, mockSupport *mockmultichain.ConsenterSupport) {
41+
mockChannel = newChannel(channelNameForTest(t), defaultPartition)
42+
mockBroker = sarama.NewMockBroker(t, 0)
43+
mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{
44+
"MetadataRequest": sarama.NewMockMetadataResponse(t).
45+
SetBroker(mockBroker.Addr(), mockBroker.BrokerID()).
46+
SetLeader(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID()),
47+
"ProduceRequest": sarama.NewMockProduceResponse(t).
48+
SetError(mockChannel.topic(), mockChannel.partition(), sarama.ErrNoError),
49+
"OffsetRequest": sarama.NewMockOffsetResponse(t).
50+
SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetOldest, oldestOffset).
51+
SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetNewest, newestOffset),
52+
"FetchRequest": sarama.NewMockFetchResponse(t, 1).
53+
SetMessage(mockChannel.topic(), mockChannel.partition(), newestOffset, message),
54+
})
55+
mockSupport = &mockmultichain.ConsenterSupport{
56+
ChainIDVal: mockChannel.topic(),
57+
HeightVal: uint64(3),
58+
SharedConfigVal: &mockconfig.Orderer{KafkaBrokersVal: []string{mockBroker.Addr()}},
59+
}
60+
return
6161
}
6262

6363
t.Run("New", func(t *testing.T) {
64+
_, mockBroker, mockSupport := newMocks(t)
65+
defer func() { mockBroker.Close() }()
6466
chain, err := newChain(mockConsenter, mockSupport, newestOffset-1)
6567

6668
assert.NoError(t, err, "Expected newChain to return without errors")
@@ -87,6 +89,8 @@ func TestChain(t *testing.T) {
8789
})
8890

8991
t.Run("Start", func(t *testing.T) {
92+
_, mockBroker, mockSupport := newMocks(t)
93+
defer func() { mockBroker.Close() }()
9094
// Set to -1 because we haven't sent the CONNECT message yet
9195
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
9296

@@ -103,6 +107,8 @@ func TestChain(t *testing.T) {
103107
})
104108

105109
t.Run("Halt", func(t *testing.T) {
110+
_, mockBroker, mockSupport := newMocks(t)
111+
defer func() { mockBroker.Close() }()
106112
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
107113

108114
chain.Start()
@@ -132,6 +138,8 @@ func TestChain(t *testing.T) {
132138
})
133139

134140
t.Run("DoubleHalt", func(t *testing.T) {
141+
_, mockBroker, mockSupport := newMocks(t)
142+
defer func() { mockBroker.Close() }()
135143
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
136144

137145
chain.Start()
@@ -148,6 +156,8 @@ func TestChain(t *testing.T) {
148156
})
149157

150158
t.Run("StartWithProducerForChannelError", func(t *testing.T) {
159+
_, mockBroker, mockSupport := newMocks(t)
160+
defer func() { mockBroker.Close() }()
151161
// Point to an empty brokers list
152162
mockSupportCopy := *mockSupport
153163
mockSupportCopy.SharedConfigVal = &mockconfig.Orderer{KafkaBrokersVal: []string{}}
@@ -164,6 +174,8 @@ func TestChain(t *testing.T) {
164174
// - Net.ReadTimeout
165175
// - Consumer.Retry.Backoff
166176
// - Metadata.Retry.Max
177+
mockChannel, mockBroker, mockSupport := newMocks(t)
178+
defer func() { mockBroker.Close() }()
167179
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
168180

169181
// Have the broker return an ErrNotLeaderForPartition error
@@ -184,6 +196,8 @@ func TestChain(t *testing.T) {
184196
})
185197

186198
t.Run("EnqueueIfNotStarted", func(t *testing.T) {
199+
mockChannel, mockBroker, mockSupport := newMocks(t)
200+
defer func() { mockBroker.Close() }()
187201
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
188202

189203
// As in StartWithConnectMessageError, have the broker return an
@@ -211,6 +225,9 @@ func TestChain(t *testing.T) {
211225
// - Consumer.Retry.Backoff
212226
// - Metadata.Retry.Max
213227

228+
mockChannel, mockBroker, mockSupport := newMocks(t)
229+
defer func() { mockBroker.Close() }()
230+
214231
// Provide an out-of-range offset
215232
chain, _ := newChain(mockConsenter, mockSupport, newestOffset)
216233

@@ -231,8 +248,8 @@ func TestChain(t *testing.T) {
231248
})
232249

233250
t.Run("EnqueueProper", func(t *testing.T) {
234-
t.Skip("Skipping test until FAB-4537 is resolved")
235-
251+
mockChannel, mockBroker, mockSupport := newMocks(t)
252+
defer func() { mockBroker.Close() }()
236253
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
237254

238255
mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{
@@ -264,6 +281,8 @@ func TestChain(t *testing.T) {
264281
})
265282

266283
t.Run("EnqueueIfHalted", func(t *testing.T) {
284+
mockChannel, mockBroker, mockSupport := newMocks(t)
285+
defer func() { mockBroker.Close() }()
267286
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
268287

269288
mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{
@@ -293,6 +312,8 @@ func TestChain(t *testing.T) {
293312
})
294313

295314
t.Run("EnqueueError", func(t *testing.T) {
315+
mockChannel, mockBroker, mockSupport := newMocks(t)
316+
defer func() { mockBroker.Close() }()
296317
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
297318

298319
// Use the "good" handler map that allows the Stage to complete without
@@ -336,7 +357,7 @@ func TestSetupProducerForChannel(t *testing.T) {
336357
mockBroker := sarama.NewMockBroker(t, 0)
337358
defer mockBroker.Close()
338359

339-
mockChannel := newChannel("foo.channel", defaultPartition)
360+
mockChannel := newChannel(channelNameForTest(t), defaultPartition)
340361

341362
haltChan := make(chan struct{})
342363

@@ -361,7 +382,7 @@ func TestSetupConsumerForChannel(t *testing.T) {
361382
mockBroker := sarama.NewMockBroker(t, 0)
362383
defer func() { mockBroker.Close() }()
363384

364-
mockChannel := newChannel("foo.channel", defaultPartition)
385+
mockChannel := newChannel(channelNameForTest(t), defaultPartition)
365386

366387
oldestOffset := int64(0)
367388
newestOffset := int64(5)
@@ -412,7 +433,7 @@ func TestSetupConsumerForChannel(t *testing.T) {
412433
}
413434

414435
func TestCloseKafkaObjects(t *testing.T) {
415-
mockChannel := newChannel("foo.channel", defaultPartition)
436+
mockChannel := newChannel(channelNameForTest(t), defaultPartition)
416437

417438
mockSupport := &mockmultichain.ConsenterSupport{
418439
ChainIDVal: mockChannel.topic(),
@@ -526,7 +547,7 @@ func TestGetLastCutBlockNumber(t *testing.T) {
526547
}
527548

528549
func TestGetLastOffsetPersisted(t *testing.T) {
529-
mockChannel := newChannel("foo.channel", defaultPartition)
550+
mockChannel := newChannel(channelNameForTest(t), defaultPartition)
530551
mockMetadata := &cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: int64(5)})}
531552

532553
testCases := []struct {

orderer/kafka/channel_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ import (
2424
)
2525

2626
func TestChannel(t *testing.T) {
27-
chn := newChannel("foo.channel", defaultPartition)
27+
chn := newChannel(channelNameForTest(t), defaultPartition)
2828

29-
expectedTopic := fmt.Sprintf("%s", "foo.channel")
29+
expectedTopic := fmt.Sprintf("%s", channelNameForTest(t))
3030
actualTopic := chn.topic()
3131
assert.Equal(t, expectedTopic, actualTopic, "Got the wrong topic, expected %s, got %s instead", expectedTopic, actualTopic)
3232

orderer/kafka/config_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ import (
1717
)
1818

1919
func TestBrokerConfig(t *testing.T) {
20-
mockChannel1 := newChannel("foo.channel", defaultPartition)
20+
mockChannel1 := newChannel(channelNameForTest(t), defaultPartition)
2121
// Use a partition ID that is not the 'default' (defaultPartition)
2222
var differentPartition int32 = defaultPartition + 1
23-
mockChannel2 := newChannel("foo.channel", differentPartition)
23+
mockChannel2 := newChannel(channelNameForTest(t), differentPartition)
2424

2525
mockBroker := sarama.NewMockBroker(t, 0)
2626
defer func() { mockBroker.Close() }()

orderer/kafka/consenter_test.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ SPDX-License-Identifier: Apache-2.0
77
package kafka
88

99
import (
10+
"fmt"
1011
"log"
1112
"os"
13+
"strings"
1214
"testing"
1315
"time"
1416

@@ -67,7 +69,7 @@ func TestHandleChain(t *testing.T) {
6769
newestOffset := int64(5)
6870
message := sarama.StringEncoder("messageFoo")
6971

70-
mockChannel := newChannel("foo.channel", defaultPartition)
72+
mockChannel := newChannel(channelNameForTest(t), defaultPartition)
7173

7274
mockBroker := sarama.NewMockBroker(t, 0)
7375
mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{
@@ -171,3 +173,8 @@ func tamperBytes(original []byte) []byte {
171173
byteCount := len(original)
172174
return original[:byteCount-1]
173175
}
176+
177+
func channelNameForTest(t *testing.T) string {
178+
name := strings.Split(fmt.Sprint(t), " ")[18] // w/golang 1.8, use t.Name()
179+
return fmt.Sprintf("%s.channel", strings.Replace(strings.ToLower(name), "/", ".", -1))
180+
}

orderer/kafka/partitioner_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func TestStaticPartitioner(t *testing.T) {
1818
var numberOfPartitions int32 = 6
1919

2020
partitionerConstructor := newStaticPartitioner(partition)
21-
partitioner := partitionerConstructor("foo.channel")
21+
partitioner := partitionerConstructor(channelNameForTest(t))
2222

2323
for i := 0; i < 10; i++ {
2424
assignedPartition, err := partitioner.Partition(new(sarama.ProducerMessage), numberOfPartitions)

orderer/kafka/retry_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
func TestRetry(t *testing.T) {
1717
var rp *retryProcess
1818

19-
mockChannel := newChannel("foo.channel", defaultPartition)
19+
mockChannel := newChannel(channelNameForTest(t), defaultPartition)
2020
flag := false
2121

2222
noErrorFn := func() error {

0 commit comments

Comments
 (0)