Commit 95094cd
[FAB-1359] Drop custom flags for Kafka orderer
https://jira.hyperledger.org/browse/FAB-1359

As we are slowly moving to a setup where the behavior of the orderer is controlled by the config options captured in the genesis block and the orderer YAML file (and their overrides via ENV vars), it's time to drop the flag support that the Kafka orderer provided. This changeset then:

1. Removes all flags from the Kafka orderer.
2. Adds a "verbose" option to the YAML file to control logging for the package that we use to interact with the Kafka cluster (sarama).

Additionally it:

3. Prefixes all test-related variables with "test" so as to make tests more legible and remove ambiguity.
4. Updates the test config object to match the keys of the actual config object.
5. Adds a helper test envelope constructor function, and moves some test-related functions around in an effort to consolidate files.

Change-Id: Id749d0b88f62a4212854e18b8c469d90fe2f6877
Signed-off-by: Kostas Christidis <[email protected]>
1 parent addfd4d commit 95094cd

19 files changed: +114 −132 lines
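For orientation, a rough sketch of the Kafka stanza that the orderer YAML file gains with this change is shown below. The key names (Brokers, Retry.Period, Retry.Stop, Verbose) mirror the test config in config_test.go further down; the exact spelling and nesting in the shipped orderer.yaml should be treated as an assumption here, not a quotation.

# Hypothetical orderer.yaml excerpt -- key names inferred from the test
# config in this changeset, not copied from the shipped orderer.yaml.
Kafka:
  Brokers:
    - 127.0.0.1:9092
  Retry:
    Period: 3s
    Stop: 60s
  Verbose: false  # new key: enables logging for the sarama client library

The -loglevel and -verbose flags that the BDD compose files used to pass to the orderer binary (see the diffs below) are dropped in favor of these keys and their ENV-var overrides.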

bddtests/docker-compose-orderer-kafka.yml (+1 −1)

@@ -12,4 +12,4 @@ orderer0:
     - ORDERER_KAFKA_BROKERS=[kafka0:9092]
   links:
     - kafka0
-  command: orderer -loglevel debug -verbose true
+  command: orderer
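Since the flags are gone, a deployment that still wants sarama debug output would flip the new config key instead, typically via the ENV-override mechanism mentioned in the commit message. The sketch below shows how that could look in a compose file; the variable name ORDERER_KAFKA_VERBOSE is an assumption following the ORDERER_<SECTION>_<KEY> pattern of the existing variables, not something this changeset introduces.

# Hypothetical compose override -- ORDERER_KAFKA_VERBOSE is an assumed name,
# following the ORDERER_<SECTION>_<KEY> pattern of the variables above.
orderer0:
  environment:
    - ORDERER_GENERAL_ORDERERTYPE=kafka
    - ORDERER_KAFKA_BROKERS=[kafka0:9092]
    - ORDERER_KAFKA_VERBOSE=true
  command: orderer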

bddtests/environments/orderer-1-kafka-1/docker-compose.yml (+1 −1)

@@ -16,7 +16,7 @@ services:
       - ORDERER_GENERAL_ORDERERTYPE=kafka
       - ORDERER_KAFKA_BROKERS=[kafka0:9092]
     working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer
-    command: orderer -loglevel debug -verbose true
+    command: orderer
     depends_on:
       - kafka0

bddtests/environments/orderer-1-kafka-3/docker-compose.yml (+1 −1)

@@ -16,7 +16,7 @@ services:
       - ORDERER_GENERAL_ORDERERTYPE=kafka
       - ORDERER_KAFKA_BROKERS=[kafka0:9092,kafka1:9092,kafka2:9092]
     working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer
-    command: orderer -loglevel debug -verbose true
+    command: orderer
     depends_on:
       - kafka0
       - kafka1

bddtests/environments/orderer-n-kafka-n/docker-compose.yml (−1)

@@ -11,7 +11,6 @@ services:
     depends_on:
       - zookeeper
       - kafka
-    command: -loglevel debug -verbose true
 
   kafka:
     build: ../kafka

orderer/kafka/broadcast_test.go (+10 −10)

@@ -29,7 +29,7 @@ import (
 func TestBroadcastResponse(t *testing.T) {
 	disk := make(chan []byte)
 
-	mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+	mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk)
 	defer testClose(t, mb)
 
 	mbs := newMockBroadcastStream(t)
@@ -60,7 +60,7 @@ func TestBroadcastResponse(t *testing.T) {
 func TestBroadcastBatch(t *testing.T) {
 	disk := make(chan []byte)
 
-	mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+	mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk)
 	defer testClose(t, mb)
 
 	mbs := newMockBroadcastStream(t)
@@ -113,7 +113,7 @@ func TestBroadcastBatch(t *testing.T) {
 
 	disk := make(chan []byte)
 
-	mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+	mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk)
 	defer testClose(t, mb)
 
 	mbs := newMockBroadcastStream(t)
@@ -139,7 +139,7 @@ loop:
 		select {
 		case <-mbs.outgoing:
 			t.Fatal("Client shouldn't have received anything from the orderer")
-		case <-time.After(testConf.General.BatchTimeout + timePadding):
+		case <-time.After(testConf.General.BatchTimeout + testTimePadding):
 			break loop // This is the success path
 		}
 	}
@@ -154,7 +154,7 @@ func TestBroadcastIncompleteBatch(t *testing.T) {
 
 	disk := make(chan []byte)
 
-	mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+	mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk)
 	defer testClose(t, mb)
 
 	mbs := newMockBroadcastStream(t)
@@ -189,7 +189,7 @@ func TestBroadcastIncompleteBatch(t *testing.T) {
 				t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data))
 			}
 			return
-		case <-time.After(testConf.General.BatchTimeout + timePadding):
+		case <-time.After(testConf.General.BatchTimeout + testTimePadding):
 			t.Fatal("Should have received a block by now")
 		}
 	}
@@ -206,7 +206,7 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
 
 	disk := make(chan []byte)
 
-	mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+	mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk)
 	defer testClose(t, mb)
 
 	mbs := newMockBroadcastStream(t)
@@ -247,7 +247,7 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
 				t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data))
 			}
 			return
-		case <-time.After(testConf.General.BatchTimeout + timePadding):
+		case <-time.After(testConf.General.BatchTimeout + testTimePadding):
 			t.Fatal("Should have received a block by now")
 		}
 	}
@@ -256,7 +256,7 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
 func TestBroadcastBatchAndQuitEarly(t *testing.T) {
 	disk := make(chan []byte)
 
-	mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+	mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk)
 	defer testClose(t, mb)
 
 	mbs := newMockBroadcastStream(t)
@@ -301,7 +301,7 @@ func TestBroadcastBatchAndQuitEarly(t *testing.T) {
 func TestBroadcastClose(t *testing.T) {
 	errChan := make(chan error)
 
-	mb := mockNewBroadcaster(t, testConf, oldestOffset, make(chan []byte))
+	mb := mockNewBroadcaster(t, testConf, testOldestOffset, make(chan []byte))
 	mbs := newMockBroadcastStream(t)
 	go func() {
 		if err := mb.Broadcast(mbs); err != nil {

orderer/kafka/broker_mock_test.go (+5 −5)

@@ -31,18 +31,18 @@ type mockBrockerImpl struct {
 }
 
 func mockNewBroker(t *testing.T, conf *config.TopLevel) Broker {
-	mockBroker := sarama.NewMockBroker(t, brokerID)
+	mockBroker := sarama.NewMockBroker(t, testBrokerID)
 	handlerMap := make(map[string]sarama.MockResponse)
 	// The sarama mock package doesn't allow us to return an error
 	// for invalid offset requests, so we return an offset of -1.
 	// Note that the mock offset responses below imply a broker with
-	// newestOffset-1 blocks available. Therefore, if you are using this
+	// testNewestOffset-1 blocks available. Therefore, if you are using this
 	// broker as part of a bigger test where you intend to consume blocks,
 	// make sure that the mockConsumer has been initialized accordingly
-	// (Set the 'seek' parameter to newestOffset-1.)
+	// (Set the 'seek' parameter to testNewestOffset-1.)
 	handlerMap["OffsetRequest"] = sarama.NewMockOffsetResponse(t).
-		SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetOldest, oldestOffset).
-		SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetNewest, newestOffset)
+		SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetOldest, testOldestOffset).
+		SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetNewest, testNewestOffset)
 	mockBroker.SetHandlerByMap(handlerMap)
 
 	broker := sarama.NewBroker(mockBroker.Addr())

orderer/kafka/broker_test.go (+2 −2)

@@ -23,8 +23,8 @@ import (
 )
 
 func TestBrokerGetOffset(t *testing.T) {
-	t.Run("oldest", testBrokerGetOffsetFunc(sarama.OffsetOldest, oldestOffset))
-	t.Run("newest", testBrokerGetOffsetFunc(sarama.OffsetNewest, newestOffset))
+	t.Run("oldest", testBrokerGetOffsetFunc(sarama.OffsetOldest, testOldestOffset))
+	t.Run("newest", testBrokerGetOffsetFunc(sarama.OffsetNewest, testNewestOffset))
 }
 
 func testBrokerGetOffsetFunc(given, expected int64) func(t *testing.T) {

orderer/kafka/client_deliver_test.go (+9 −9)

@@ -24,10 +24,10 @@ import (
 )
 
 func TestClientDeliverSeekWrong(t *testing.T) {
-	t.Run("out-of-range-1", testClientDeliverSeekWrongFunc(uint64(oldestOffset)-1, 10))
-	t.Run("out-of-range-2", testClientDeliverSeekWrongFunc(uint64(newestOffset), 10))
-	t.Run("bad-window-1", testClientDeliverSeekWrongFunc(uint64(oldestOffset), 0))
-	t.Run("bad-window-2", testClientDeliverSeekWrongFunc(uint64(oldestOffset), uint64(testConf.General.MaxWindowSize+1)))
+	t.Run("out-of-range-1", testClientDeliverSeekWrongFunc(uint64(testOldestOffset)-1, 10))
+	t.Run("out-of-range-2", testClientDeliverSeekWrongFunc(uint64(testNewestOffset), 10))
+	t.Run("bad-window-1", testClientDeliverSeekWrongFunc(uint64(testOldestOffset), 0))
+	t.Run("bad-window-2", testClientDeliverSeekWrongFunc(uint64(testOldestOffset), uint64(testConf.General.MaxWindowSize+1)))
 }
 
 func testClientDeliverSeekWrongFunc(seek, window uint64) func(t *testing.T) {
@@ -65,7 +65,7 @@ func testClientDeliverSeekWrongFunc(seek, window uint64) func(t *testing.T) {
 
 func TestClientDeliverSeek(t *testing.T) {
 	t.Run("oldest", testClientDeliverSeekFunc("oldest", 0, 10, 10))
-	t.Run("in-between", testClientDeliverSeekFunc("specific", uint64(middleOffset), 10, 10))
+	t.Run("in-between", testClientDeliverSeekFunc("specific", uint64(testMiddleOffset), 10, 10))
 	t.Run("newest", testClientDeliverSeekFunc("newest", 0, 10, 1))
 }
 
@@ -104,8 +104,8 @@ func testClientDeliverSeekFunc(label string, seek, window uint64, expected int)
 }
 
 func TestClientDeliverAckWrong(t *testing.T) {
-	t.Run("out-of-range-ack-1", testClientDeliverAckWrongFunc(uint64(middleOffset)-2))
-	t.Run("out-of-range-ack-2", testClientDeliverAckWrongFunc(uint64(newestOffset)))
+	t.Run("out-of-range-ack-1", testClientDeliverAckWrongFunc(uint64(testMiddleOffset)-2))
+	t.Run("out-of-range-ack-2", testClientDeliverAckWrongFunc(uint64(testNewestOffset)))
 }
 
 func testClientDeliverAckWrongFunc(ack uint64) func(t *testing.T) {
@@ -123,7 +123,7 @@ func testClientDeliverAckWrongFunc(ack uint64) func(t *testing.T) {
 		}
 	}()
 
-	mds.incoming <- testNewSeekMessage("specific", uint64(middleOffset), 10)
+	mds.incoming <- testNewSeekMessage("specific", uint64(testMiddleOffset), 10)
 	mds.incoming <- testNewAckMessage(ack)
 	for {
 		select {
@@ -141,7 +141,7 @@ func testClientDeliverAckWrongFunc(ack uint64) func(t *testing.T) {
 }
 
 func TestClientDeliverAck(t *testing.T) {
-	t.Run("in-between", testClientDeliverAckFunc("specific", uint64(middleOffset), 10, 10, 2*10))
+	t.Run("in-between", testClientDeliverAckFunc("specific", uint64(testMiddleOffset), 10, 10, 2*10))
 	t.Run("newest", testClientDeliverAckFunc("newest", 0, 10, 1, 1))
 }
 

orderer/kafka/common_test.go (−61)

This file was deleted.

orderer/kafka/config_test.go (+59 −9)

@@ -17,38 +17,88 @@ limitations under the License.
 package kafka
 
 import (
+	"testing"
 	"time"
 
 	"github.com/Shopify/sarama"
 	"github.com/hyperledger/fabric/orderer/localconfig"
+	cb "github.com/hyperledger/fabric/protos/common"
+	ab "github.com/hyperledger/fabric/protos/orderer"
 )
 
 var (
-	brokerID     = int32(0)
-	oldestOffset = int64(100)  // The oldest block available on the broker
-	newestOffset = int64(1100) // The offset that will be assigned to the next block
-	middleOffset = (oldestOffset + newestOffset - 1) / 2 // Just an offset in the middle
+	testBrokerID     = int32(0)
+	testOldestOffset = int64(100)  // The oldest block available on the broker
+	testNewestOffset = int64(1100) // The offset that will be assigned to the next block
+	testMiddleOffset = (testOldestOffset + testNewestOffset - 1) / 2 // Just an offset in the middle
 
 	// Amount of time to wait for block processing when doing time-based tests
 	// We generally want this value to be as small as possible so as to make tests execute faster
 	// But this may have to be bumped up in slower machines
-	timePadding = 200 * time.Millisecond
+	testTimePadding = 200 * time.Millisecond
 )
 
 var testConf = &config.TopLevel{
 	General: config.General{
 		OrdererType: "kafka",
+		LedgerType: "ram",
 		BatchTimeout: 500 * time.Millisecond,
 		BatchSize: 100,
 		QueueSize: 100,
 		MaxWindowSize: 100,
 		ListenAddress: "127.0.0.1",
 		ListenPort: 7050,
+		GenesisMethod: "static",
 	},
 	Kafka: config.Kafka{
-		Brokers: []string{"127.0.0.1:9092"},
-		Topic: "test",
-		PartitionID: 0,
-		Version: sarama.V0_9_0_1,
+		Brokers: []string{"127.0.0.1:9092"},
+		Retry: config.Retry{
+			Period: 3 * time.Second,
+			Stop: 60 * time.Second,
+		},
+		Verbose: false,
+		Version: sarama.V0_9_0_1,
 	},
 }
+
+func testClose(t *testing.T, x Closeable) {
+	if err := x.Close(); err != nil {
+		t.Fatal("Cannot close mock resource:", err)
+	}
+}
+
+func newTestEnvelope(content string) *cb.Envelope {
+	return &cb.Envelope{Payload: []byte(content)}
+}
+
+func testNewSeekMessage(startLabel string, seekNo, windowNo uint64) *ab.DeliverUpdate {
+	var startVal ab.SeekInfo_StartType
+	switch startLabel {
+	case "oldest":
+		startVal = ab.SeekInfo_OLDEST
+	case "newest":
+		startVal = ab.SeekInfo_NEWEST
+	default:
+		startVal = ab.SeekInfo_SPECIFIED
+
+	}
+	return &ab.DeliverUpdate{
+		Type: &ab.DeliverUpdate_Seek{
+			Seek: &ab.SeekInfo{
+				Start: startVal,
+				SpecifiedNumber: seekNo,
+				WindowSize: windowNo,
+			},
+		},
+	}
+}
+
+func testNewAckMessage(ackNo uint64) *ab.DeliverUpdate {
+	return &ab.DeliverUpdate{
+		Type: &ab.DeliverUpdate_Acknowledgement{
+			Acknowledgement: &ab.Acknowledgement{
+				Number: ackNo,
+			},
+		},
+	}
+}

orderer/kafka/consumer_mock_test.go (+2 −2)

@@ -64,7 +64,7 @@ func mockNewConsumer(t *testing.T, conf *config.TopLevel, seek int64) (Consumer,
 		t: t,
 	}
 	// Stop-gap hack until #745 is resolved:
-	if seek >= oldestOffset && seek <= (newestOffset-1) {
+	if seek >= testOldestOffset && seek <= (testNewestOffset-1) {
 		mc.testFillWithBlocks(seek - 1) // Prepare the consumer so that the next Recv gives you block "seek"
 	} else {
 		err = fmt.Errorf("Out of range seek number given to consumer")
@@ -73,7 +73,7 @@ func mockNewConsumer(t *testing.T, conf *config.TopLevel, seek int64) (Consumer,
 }
 
 func (mc *mockConsumerImpl) Recv() <-chan *sarama.ConsumerMessage {
-	if mc.consumedOffset >= newestOffset-1 {
+	if mc.consumedOffset >= testNewestOffset-1 {
 		return nil
 	}
 	mc.consumedOffset++
