Skip to content

Commit 4d2a17c

Browse files
committed
[FAB-4357] Modify retry options for Kafka config
This changeset modifies the `Retry` options of the Kafka-based orderer configuration. It adds short/long retry interval switches as suggested in FAB-4121 and exposes some retry/timeout-related settings from the underlying sarama client library. Defaults are introduced for all of the new options. This is part of fixing FAB-4136. Change-Id: I75b9a726a9948e0c68e062f6500b6376c3a1bba7 Signed-off-by: Kostas Christidis <[email protected]>
1 parent 1a721b1 commit 4d2a17c

File tree

9 files changed

+263
-67
lines changed

9 files changed

+263
-67
lines changed

bddtests/dc-orderer-base.yml

+3-1
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ services:
1414
- ORDERER_GENERAL_GENESISFILE=${ORDERER_GENERAL_GENESISFILE}
1515
- ORDERER_GENERAL_LISTENADDRESS=0.0.0.0
1616
- ORDERER_GENERAL_LOGLEVEL=debug
17-
- ORDERER_KAFKA_VERBOSE=true
1817
- ORDERER_GENERAL_TLS_ENABLED=true
18+
- ORDERER_KAFKA_RETRY_SHORTINTERVAL=1s
19+
- ORDERER_KAFKA_RETRY_SHORTTOTAL=30s
20+
- ORDERER_KAFKA_VERBOSE=true
1921
volumes:
2022
- ./volumes/orderer:/var/hyperledger/bddtests/volumes/orderer
2123
- /etc/hyperledger/msp:/etc/hyperledger/msp

examples/cluster/config/orderer.yaml

+48-6
Original file line numberDiff line numberDiff line change
@@ -138,17 +138,59 @@ RAMLedger:
138138
#
139139
# SECTION: Kafka
140140
#
141-
# - This section applies to the configuration of the Kafka-based orderer.
141+
# - This section applies to the configuration of the Kafka-based orderer, and
142+
# its interaction with the Kafka cluster.
142143
#
143144
################################################################################
144145
Kafka:
145146

146-
# Retry: What to do if none of the Kafka brokers are available.
147+
# Retry: What to do if a connection to the Kafka cluster cannot be established,
148+
# or if a metadata request to the Kafka cluster needs to be repeated.
147149
Retry:
148-
# The orderer should attempt to reconnect every <Period>
149-
Period: 3s
150-
# Panic if <Stop> has elapsed and no connection has been established
151-
Stop: 60s
150+
# When a new channel is created, or when an existing channel is reloaded
151+
# (as part of the orderer's bootstrapping), the orderer interacts with
152+
# the Kafka cluster in the following ways:
153+
# 1. It creates a Kafka producer (writer) for the Kafka partition that
154+
# corresponds to the channel.
155+
# 2. It uses that producer to post a no-op CONNECT message to that
156+
# partition.
157+
# 3. It creates a Kafka consumer (reader) for that partition.
158+
# If any of these steps fails, they will be re-attempted every
159+
# <ShortInterval> for a total of <ShortTotal>, and then every
160+
# <LongInterval> for a total of <LongTotal> until they succeed:
161+
# 1. Creation of Kafka producer (writer) for the channel
162+
# 2. Creation of Kafka consumer (reader) for the channel
163+
# 3. Posting of no-op CONNECT message to the channel
164+
# Note that the orderer will be unable to write to or read from a
165+
# channel until all of the steps above have been completed successfully.
166+
ShortInterval: 5s
167+
ShortTotal: 10m
168+
LongInterval: 5m
169+
LongTotal: 12h
170+
# Affects the socket timeouts when waiting for an initial connection, a
171+
# response, or a transmission. See Config.Net for more info:
172+
# https://godoc.org/github.com/Shopify/sarama#Config
173+
NetworkTimeouts:
174+
DialTimeout: 10s
175+
ReadTimeout: 10s
176+
WriteTimeout: 10s
177+
# Affects the metadata requests when the Kafka cluster is in the middle
178+
# of a leader election. See Config.Metadata for more info:
179+
# https://godoc.org/github.com/Shopify/sarama#Config
180+
Metadata:
181+
RetryBackoff: 250ms
182+
RetryMax: 3
183+
# What to do if posting a message to the Kafka cluster fails. See
184+
# Config.Producer for more info:
185+
# https://godoc.org/github.com/Shopify/sarama#Config
186+
Producer:
187+
RetryBackoff: 100ms
188+
RetryMax: 3
189+
# What to do if reading from the Kafka cluster fails. See
190+
# Config.Consumer for more info:
191+
# https://godoc.org/github.com/Shopify/sarama#Config
192+
Consumer:
193+
RetryBackoff: 2s
152194

153195
# Verbose: Enable logging for interactions with the Kafka cluster.
154196
Verbose: false

orderer/kafka/chain.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -428,9 +428,9 @@ func setupProducerForChannel(brokers []string, brokerConfig *sarama.Config, chan
428428
var producer sarama.SyncProducer
429429

430430
// This will be revised in: https://jira.hyperledger.org/browse/FAB-4136
431-
repeatTick := time.NewTicker(retryOptions.Period)
432-
panicTick := time.NewTicker(retryOptions.Stop)
433-
logger.Debugf("[channel: %s] Retrying every %s for a total of %s", channel.topic(), retryOptions.Period.String(), retryOptions.Stop.String())
431+
repeatTick := time.NewTicker(retryOptions.ShortInterval)
432+
panicTick := time.NewTicker(retryOptions.ShortTotal)
433+
logger.Debugf("[channel: %s] Retrying every %s for a total of %s", channel.topic(), retryOptions.ShortInterval.String(), retryOptions.ShortTotal.String())
434434
defer repeatTick.Stop()
435435
defer panicTick.Stop()
436436

orderer/kafka/chain_test.go

+11-32
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,8 @@ func TestChain(t *testing.T) {
100100
})
101101

102102
t.Run("StartWithConnectMessageError", func(t *testing.T) {
103-
mockBrokerConfigCopy := *mockBrokerConfig
104-
mockBrokerConfigCopy.Net.ReadTimeout = 5 * time.Millisecond
105-
mockBrokerConfigCopy.Consumer.Retry.Backoff = 5 * time.Millisecond
106-
mockBrokerConfigCopy.Metadata.Retry.Max = 1
107-
108-
mockConsenterCopy := newMockConsenter(&mockBrokerConfigCopy, mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version)
109-
110-
chain, _ := newChain(mockConsenterCopy, mockSupport, newestOffset-1)
103+
// Affected by Net.ReadTimeout, Consumer.Retry.Backoff, and Metadata.Retry.Max
104+
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
111105

112106
mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{
113107
"MetadataRequest": sarama.NewMockMetadataResponse(t).
@@ -131,14 +125,8 @@ func TestChain(t *testing.T) {
131125
})
132126

133127
t.Run("StartWithConsumerForChannelError", func(t *testing.T) {
134-
mockBrokerConfigCopy := *mockBrokerConfig
135-
mockBrokerConfigCopy.Net.ReadTimeout = 5 * time.Millisecond
136-
mockBrokerConfigCopy.Consumer.Retry.Backoff = 5 * time.Millisecond
137-
mockBrokerConfigCopy.Metadata.Retry.Max = 1
138-
139-
mockConsenterCopy := newMockConsenter(&mockBrokerConfigCopy, mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version)
140-
141-
chain, _ := newChain(mockConsenterCopy, mockSupport, newestOffset) // Provide an out-of-range offset
128+
// Affected by Net.ReadTimeout, Consumer.Retry.Backoff, and Metadata.Retry.Max
129+
chain, _ := newChain(mockConsenter, mockSupport, newestOffset) // Provide an out-of-range offset
142130

143131
mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{
144132
"MetadataRequest": sarama.NewMockMetadataResponse(t).
@@ -451,7 +439,7 @@ func TestProcessLoopRegularError(t *testing.T) {
451439
}
452440

453441
func TestProcessLoopRegularQueueEnvelope(t *testing.T) {
454-
batchTimeout, _ := time.ParseDuration("1s")
442+
batchTimeout, _ := time.ParseDuration("100s") // Something big
455443
newestOffset := int64(5)
456444
lastCutBlockNumber := uint64(3)
457445
haltedFlag := false
@@ -725,12 +713,9 @@ func TestProcessLoopRegularAndSendTimeToCutError(t *testing.T) {
725713
metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError)
726714
mockBroker.Returns(metadataResponse)
727715

728-
mockBrokerConfigCopy := *mockBrokerConfig
729-
mockBrokerConfigCopy.Net.ReadTimeout = 5 * time.Millisecond
730-
mockBrokerConfigCopy.Consumer.Retry.Backoff = 5 * time.Millisecond
731-
mockBrokerConfigCopy.Metadata.Retry.Max = 1
716+
// Affected by Net.ReadTimeout, Consumer.Retry.Backoff, and Metadata.Retry.Max
732717

733-
producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, &mockBrokerConfigCopy)
718+
producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig)
734719
assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer")
735720

736721
failureResponse := new(sarama.ProduceResponse)
@@ -987,12 +972,9 @@ func TestSendConnectMessage(t *testing.T) {
987972
metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError)
988973
mockBroker.Returns(metadataResponse)
989974

990-
mockBrokerConfigCopy := *mockBrokerConfig
991-
mockBrokerConfigCopy.Net.ReadTimeout = 5 * time.Millisecond
992-
mockBrokerConfigCopy.Consumer.Retry.Backoff = 5 * time.Millisecond
993-
mockBrokerConfigCopy.Metadata.Retry.Max = 1
975+
// Affected by Net.ReadTimeout, Consumer.Retry.Backoff, and Metadata.Retry.Max
994976

995-
producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, &mockBrokerConfigCopy)
977+
producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig)
996978
assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer")
997979

998980
t.Run("Proper", func(t *testing.T) {
@@ -1022,12 +1004,9 @@ func TestSendTimeToCut(t *testing.T) {
10221004
metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError)
10231005
mockBroker.Returns(metadataResponse)
10241006

1025-
mockBrokerConfigCopy := *mockBrokerConfig
1026-
mockBrokerConfigCopy.Net.ReadTimeout = 5 * time.Millisecond
1027-
mockBrokerConfigCopy.Consumer.Retry.Backoff = 5 * time.Millisecond
1028-
mockBrokerConfigCopy.Metadata.Retry.Max = 1
1007+
// Affected by Net.ReadTimeout, Consumer.Retry.Backoff, and Metadata.Retry.Max
10291008

1030-
producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, &mockBrokerConfigCopy)
1009+
producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig)
10311010
assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer")
10321011

10331012
timeToCutBlockNumber := uint64(3)

orderer/kafka/config.go

+13
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,19 @@ func newBrokerConfig(tlsConfig localconfig.TLS, retryOptions localconfig.Retry,
2020
// FIXME https://jira.hyperledger.org/browse/FAB-4136
2121
// Use retryOptions to populate `Net`
2222

23+
brokerConfig.Consumer.Retry.Backoff = retryOptions.Consumer.RetryBackoff
24+
2325
// Allows us to retrieve errors that occur when consuming a channel, via the
2426
// channel's `listenForErrors` goroutine.
2527
brokerConfig.Consumer.Return.Errors = true
2628

29+
brokerConfig.Metadata.Retry.Backoff = retryOptions.Metadata.RetryBackoff
30+
brokerConfig.Metadata.Retry.Max = retryOptions.Metadata.RetryMax
31+
32+
brokerConfig.Net.DialTimeout = retryOptions.NetworkTimeouts.DialTimeout
33+
brokerConfig.Net.ReadTimeout = retryOptions.NetworkTimeouts.ReadTimeout
34+
brokerConfig.Net.WriteTimeout = retryOptions.NetworkTimeouts.WriteTimeout
35+
2736
brokerConfig.Net.TLS.Enable = tlsConfig.Enabled
2837
if brokerConfig.Net.TLS.Enable {
2938
// create public/private key pair structure
@@ -49,6 +58,10 @@ func newBrokerConfig(tlsConfig localconfig.TLS, retryOptions localconfig.Retry,
4958
// Set equivalent of Kafka producer config max.request.bytes to the default
5059
// value of a Kafka broker's socket.request.max.bytes property (100 MiB).
5160
brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize) // FIXME https://jira.hyperledger.org/browse/FAB-4083
61+
62+
brokerConfig.Producer.Retry.Backoff = retryOptions.Producer.RetryBackoff
63+
brokerConfig.Producer.Retry.Max = retryOptions.Producer.RetryMax
64+
5265
// A partitioner is actually not needed the way we do things now,
5366
// but we're adding it now to allow for flexibility in the future.
5467
brokerConfig.Producer.Partitioner = newStaticPartitioner(chosenStaticPartition)

orderer/kafka/consenter_test.go

+26-6
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,31 @@ import (
2626
"github.com/stretchr/testify/assert"
2727
)
2828

29+
var mockRetryOptions = localconfig.Retry{
30+
ShortInterval: 50 * time.Millisecond,
31+
ShortTotal: 200 * time.Millisecond,
32+
LongInterval: 200 * time.Millisecond,
33+
LongTotal: 1 * time.Second,
34+
NetworkTimeouts: localconfig.NetworkTimeouts{
35+
DialTimeout: 5 * time.Millisecond,
36+
ReadTimeout: 5 * time.Millisecond,
37+
WriteTimeout: 5 * time.Millisecond,
38+
},
39+
Metadata: localconfig.Metadata{
40+
RetryMax: 2,
41+
RetryBackoff: 40 * time.Millisecond,
42+
},
43+
Producer: localconfig.Producer{
44+
RetryMax: 2,
45+
RetryBackoff: 30 * time.Millisecond,
46+
},
47+
Consumer: localconfig.Consumer{
48+
RetryBackoff: 20 * time.Millisecond,
49+
},
50+
}
51+
2952
func init() {
30-
mockLocalConfig = newMockLocalConfig(false, 50, 200, false)
53+
mockLocalConfig = newMockLocalConfig(false, mockRetryOptions, false)
3154
mockBrokerConfig = newMockBrokerConfig(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version, defaultPartition)
3255
mockConsenter = newMockConsenter(mockBrokerConfig, mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version)
3356
setupTestLogging("ERROR", mockLocalConfig.Kafka.Verbose)
@@ -113,18 +136,15 @@ func newMockEnvelope(content string) *cb.Envelope {
113136
return &cb.Envelope{Payload: []byte(content)}
114137
}
115138

116-
func newMockLocalConfig(enableTLS bool, retryPeriod int, retryStop int, verboseLog bool) *localconfig.TopLevel {
139+
func newMockLocalConfig(enableTLS bool, retryOptions localconfig.Retry, verboseLog bool) *localconfig.TopLevel {
117140
return &localconfig.TopLevel{
118141
General: localconfig.General{
119142
TLS: localconfig.TLS{
120143
Enabled: enableTLS,
121144
},
122145
},
123146
Kafka: localconfig.Kafka{
124-
Retry: localconfig.Retry{
125-
Period: time.Duration(retryPeriod) * time.Millisecond,
126-
Stop: time.Duration(retryStop) * time.Millisecond,
127-
},
147+
Retry: retryOptions,
128148
Verbose: verboseLog,
129149
Version: sarama.V0_9_0_1,
130150
},

0 commit comments

Comments
 (0)