Commit f6640f2

Add Kafka-based orderer
Related: http://jira.hyperledger.org/browse/FAB-32

This changeset introduces a simple, solo-like Kafka-based orderer, complete with a config file that ties into the orderer config mechanism introduced in an earlier changeset, plus unit and BDD tests. It also provides a sample client that broadcasts and delivers counter values. For a demo of this work, watch this video: https://ibm.box.com/s/kqkk12vn18w3s3in2vkioucl9z32jk2h

This changeset introduces the following abstractions:

- Broker: Provides info on the atomic broadcast seek requests (earliest batch available, etc.)
- Producer: Sends batches to Kafka
- Consumer: Reads a stream of batches from Kafka
- Client Deliver: A consumer dedicated to a connected client
- Deliverer: Handles the deliver part of the Kafka-based orderer; spawns a new Client Deliver instance per connected client
- Broadcaster: Handles the broadcast part of the Kafka-based orderer; cuts batches and sends them to Kafka
- Orderer: Consists of a Deliverer and a Broadcaster and, as the name suggests, handles all ordering requests (broadcast and deliver RPCs) issued by the connected clients

Change-Id: I09a313e9bf681051ee73b35d8d14401fee234f02
Signed-off-by: Kostas Christidis <[email protected]>
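The division of labor listed above can be sketched with simplified Go interfaces. These are illustrative only: the real interfaces in orderer/kafka take gRPC stream arguments and talk to Kafka, and the type and method names below (Deliver's signature, inMemOrderer) are assumptions, not the commit's API.

```go
package main

import "fmt"

// Closeable is the teardown contract shared by both halves.
type Closeable interface {
	Close() error
}

// Broadcaster models the broadcast half: accept messages, cut batches.
type Broadcaster interface {
	Broadcast(msg []byte) error
	Closeable
}

// Deliverer models the deliver half: serve blocks to a connected client.
type Deliverer interface {
	Deliver(fromBlock int) [][]byte
	Closeable
}

// Orderer composes the two halves, mirroring the commit's Orderer abstraction.
type Orderer interface {
	Broadcaster
	Deliverer
}

// inMemOrderer is a toy stand-in: one message per "block", no Kafka.
type inMemOrderer struct{ blocks [][]byte }

func (o *inMemOrderer) Broadcast(msg []byte) error {
	o.blocks = append(o.blocks, msg)
	return nil
}

func (o *inMemOrderer) Deliver(fromBlock int) [][]byte { return o.blocks[fromBlock:] }

func (o *inMemOrderer) Close() error { return nil }

func main() {
	var o Orderer = &inMemOrderer{}
	o.Broadcast([]byte("counter=1"))
	o.Broadcast([]byte("counter=2"))
	fmt.Println(len(o.Deliver(1))) // blocks delivered from index 1 onward
	o.Close()
}
```

The point of the composition is that broadcast and deliver can evolve independently (each Client Deliver gets its own consumer) while clients see a single ordering service.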
1 parent 3cedee1 commit f6640f2

34 files changed: +2519 −76 lines
docker-compose-orderer-base.yml (new file, +12)

@@ -0,0 +1,12 @@
+ordererBase:
+  image: hyperledger/fabric-orderer
+  environment:
+    - ORDERER_GENERAL_LEDGERTYPE=ram
+    - ORDERER_GENERAL_BATCHTIMEOUT=10s
+    - ORDERER_GENERAL_BATCHSIZE=10
+    - ORDERER_GENERAL_MAXWINDOWSIZE=1000
+    - ORDERER_GENERAL_LISTENADDRESS=0.0.0.0
+    - ORDERER_GENERAL_LISTENPORT=5005
+    - ORDERER_RAMLEDGER_HISTORY_SIZE=100
+  working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer
+  command: orderer
docker-compose-orderer-kafka.yml (new file, +15)

@@ -0,0 +1,15 @@
+kafka0:
+  image: kchristidis/kafka
+  environment:
+    - ADVERTISED_PORT=9092
+
+orderer0:
+  extends:
+    file: docker-compose-orderer-base.yml
+    service: ordererBase
+  environment:
+    - ORDERER_GENERAL_ORDERERTYPE=kafka
+    - ORDERER_KAFKA_BROKERS=[kafka0:9092]
+  links:
+    - kafka0
+  command: orderer -loglevel debug -verbose true
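The bracketed ORDERER_KAFKA_BROKERS value above maps onto the []string Kafka.Brokers field introduced in orderer/config/config.go. As a hedged illustration of that mapping (the orderer itself relies on its Viper-based config machinery, not on a helper like this), a "[host:port,host:port]" string can be split like so:

```go
package main

import (
	"fmt"
	"strings"
)

// parseBrokers splits a "[host1:port1,host2:port2]" style value into
// individual broker addresses. Illustrative only; this helper is not
// part of the changeset.
func parseBrokers(v string) []string {
	v = strings.Trim(v, "[]")
	if v == "" {
		return nil
	}
	parts := strings.Split(v, ",")
	for i := range parts {
		parts[i] = strings.TrimSpace(parts[i])
	}
	return parts
}

func main() {
	fmt.Println(parseBrokers("[kafka0:9092]"))   // one broker address
	fmt.Println(len(parseBrokers("[a:1, b:2]"))) // two broker addresses
}
```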
docker-compose-orderer-solo.yml (+5 −13)

@@ -1,14 +1,6 @@
 orderer0:
-  image: hyperledger/fabric-orderer
-  environment:
-    - ORDERER_GENERAL_ORDERERTYPE=solo
-    - ORDERER_GENERAL_LEDGERTYPE=ram
-    - ORDERER_GENERAL_BATCHTIMEOUT=10s
-    - ORDERER_GENERAL_BATCHSIZE=10
-    - ORDERER_GENERAL_MAXWINDOWSIZE=1000
-    - ORDERER_GENERAL_LISTENADDRESS=0.0.0.0
-    - ORDERER_GENERAL_LISTENPORT=5005
-    - ORDERER_RAMLEDGER_HISTORY_SIZE=100
-
-  working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer
-  command: orderer
+  extends:
+    file: docker-compose-orderer-base.yml
+    service: ordererBase
+  environment:
+    - ORDERER_GENERAL_ORDERERTYPE=solo

bddtests/features/orderer.feature (+45 −47)
@@ -1,75 +1,73 @@
-#
-# Test Orderer
-#
 # Tags that can be used and will affect test internals:
-# @doNotDecompose will NOT decompose the named compose_yaml after scenario ends. Useful for setting up environment and reviewing after scenario.
-# @chaincodeImagesUpToDate use this if all scenarios chaincode images are up to date, and do NOT require building. BE SURE!!!
+# @doNotDecompose will NOT decompose the named compose_yaml after scenario ends.
+# Useful for setting up environment and reviewing after scenario.

-#@chaincodeImagesUpToDate
 @orderer
 Feature: Orderer
   As a Fabric developer
   I want to run and validate a orderer service


-
-
   # @doNotDecompose
-    Scenario Outline: Basic orderer function
+  Scenario Outline: Basic orderer function

-      Given we compose "<ComposeFile>"
-      And I wait ".5" seconds
-      And user "binhn" is an authorized user of the ordering service
-      When user "binhn" broadcasts "<NumMsgsToBroadcast>" unique messages on "orderer0"
-      And user "binhn" connects to deliver function on "orderer0" with Ack of "<SendAck>" and properties:
+    Given we compose "<ComposeFile>"
+    And I wait "<BootTime>" seconds
+    And user "binhn" is an authorized user of the ordering service
+    When user "binhn" broadcasts "<NumMsgsToBroadcast>" unique messages on "orderer0"
+    And user "binhn" connects to deliver function on "orderer0" with Ack of "<SendAck>" and properties:
       | Start     | SpecifiedNumber | WindowSize |
       | SPECIFIED | 1               | 10         |
-      Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "<BatchTimeout>" seconds
-
-    Examples: Orderer Options
-      | ComposeFile                     | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout |
-      | docker-compose-orderer-solo.yml | true    | 20                 | 2              | 10           |
-      | docker-compose-orderer-solo.yml | true    | 40                 | 4              | 10           |
-      | docker-compose-orderer-solo.yml | true    | 60                 | 6              | 10           |
-
+    Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "<BatchTimeout>" seconds

+  Examples: Orderer Options
+    | ComposeFile                      | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime |
+    | docker-compose-orderer-solo.yml  | true    | 20                 | 2              | 10           | .5       |
+    | docker-compose-orderer-solo.yml  | true    | 40                 | 4              | 10           | .5       |
+    | docker-compose-orderer-solo.yml  | true    | 60                 | 6              | 10           | .5       |
+    | docker-compose-orderer-kafka.yml | true    | 20                 | 2              | 10           | 5        |
+    | docker-compose-orderer-kafka.yml | true    | 40                 | 4              | 10           | 5        |
+    | docker-compose-orderer-kafka.yml | true    | 60                 | 6              | 10           | 5        |


   # @doNotDecompose
-    Scenario Outline: Basic seek orderer function (Utilizing properties for atomic broadcast)
+  Scenario Outline: Basic seek orderer function (Utilizing properties for atomic broadcast)

-      Given we compose "<ComposeFile>"
-      And I wait ".5" seconds
-      And user "binhn" is an authorized user of the ordering service
-      When user "binhn" broadcasts "<NumMsgsToBroadcast>" unique messages on "orderer0"
-      And user "binhn" connects to deliver function on "orderer0" with Ack of "<SendAck>" and properties:
+    Given we compose "<ComposeFile>"
+    And I wait "<BootTime>" seconds
+    And user "binhn" is an authorized user of the ordering service
+    When user "binhn" broadcasts "<NumMsgsToBroadcast>" unique messages on "orderer0"
+    And user "binhn" connects to deliver function on "orderer0" with Ack of "<SendAck>" and properties:
       | Start     | SpecifiedNumber | WindowSize |
       | SPECIFIED | 1               | 10         |
-      Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "<BatchTimeout>" seconds
-      When user "binhn" seeks to block "1" on deliver function on "orderer0"
-      Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "1" seconds
-
+    Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "<BatchTimeout>" seconds
+    When user "binhn" seeks to block "1" on deliver function on "orderer0"
+    Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "1" seconds

   Examples: Orderer Options
-    | ComposeFile                       | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout |
-    | docker-compose-orderer-solo.yml   | true    | 20                 | 2              | 10           |
-    # | docker-compose-orderer-solo.yml | true    | 40                 | 4              | 10           |
-    # | docker-compose-orderer-solo.yml | true    | 60                 | 6              | 10           |
+    | ComposeFile                      | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime |
+    | docker-compose-orderer-solo.yml  | true    | 20                 | 2              | 10           | .5       |
+    | docker-compose-orderer-solo.yml  | true    | 40                 | 4              | 10           | .5       |
+    | docker-compose-orderer-solo.yml  | true    | 60                 | 6              | 10           | .5       |
+    | docker-compose-orderer-kafka.yml | true    | 20                 | 2              | 10           | 5        |
+    | docker-compose-orderer-kafka.yml | true    | 40                 | 4              | 10           | 5        |
+    | docker-compose-orderer-kafka.yml | true    | 60                 | 6              | 10           | 5        |


   # @doNotDecompose
-    Scenario Outline: Basic orderer function varying ACK
+  Scenario Outline: Basic orderer function varying ACK

-      Given we compose "<ComposeFile>"
-      And I wait ".5" seconds
-      And user "binhn" is an authorized user of the ordering service
-      When user "binhn" broadcasts "<NumMsgsToBroadcast>" unique messages on "orderer0"
-      And user "binhn" connects to deliver function on "orderer0" with Ack of "<SendAck>" and properties:
+    Given we compose "<ComposeFile>"
+    And I wait "<BootTime>" seconds
+    And user "binhn" is an authorized user of the ordering service
+    When user "binhn" broadcasts "<NumMsgsToBroadcast>" unique messages on "orderer0"
+    And user "binhn" connects to deliver function on "orderer0" with Ack of "<SendAck>" and properties:
       | Start     | SpecifiedNumber | WindowSize |
       | SPECIFIED | 1               | 1          |
-      Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "<BatchTimeout>" seconds
-
+    Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "<BatchTimeout>" seconds

   Examples: Orderer Options
-    | ComposeFile                     | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout |
-    | docker-compose-orderer-solo.yml | false   | 20                 | 1              | 10           |
-    | docker-compose-orderer-solo.yml | true    | 20                 | 2              | 10           |
+    | ComposeFile                      | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime |
+    | docker-compose-orderer-solo.yml  | false   | 20                 | 1              | 10           | .5       |
+    | docker-compose-orderer-solo.yml  | true    | 20                 | 2              | 10           | .5       |
+    | docker-compose-orderer-kafka.yml | false   | 20                 | 1              | 10           | 5        |
+    | docker-compose-orderer-kafka.yml | true    | 20                 | 2              | 10           | 5        |

orderer/config/config.go (+43)
@@ -23,6 +23,7 @@ import (
 	"strings"
 	"time"

+	"github.com/Shopify/sarama"
 	"github.com/op/go-logging"
 	"github.com/spf13/viper"
 )

@@ -59,6 +60,21 @@ type FileLedger struct {
 	Prefix string
 }

+// Kafka contains config for the Kafka orderer
+type Kafka struct {
+	Brokers     []string
+	Topic       string
+	PartitionID int32
+	Retry       Retry
+	Version     sarama.KafkaVersion // TODO For now set this in code
+}
+
+// Retry contains config for the reconnection attempts to the Kafka brokers
+type Retry struct {
+	Period time.Duration
+	Stop   time.Duration
+}
+
 // TopLevel directly corresponds to the orderer config yaml
 // Note, for non 1-1 mappings, you may append
 // something like `mapstructure:"weirdFoRMat"` to

@@ -68,6 +84,7 @@ type TopLevel struct {
 	General    General
 	RAMLedger  RAMLedger
 	FileLedger FileLedger
+	Kafka      Kafka
 }

 var defaults = TopLevel{

@@ -88,6 +105,16 @@ var defaults = TopLevel{
 		Location: "",
 		Prefix:   "hyperledger-fabric-rawledger",
 	},
+	Kafka: Kafka{
+		Brokers:     []string{"127.0.0.1:9092"},
+		Topic:       "test",
+		PartitionID: 0,
+		Version:     sarama.V0_9_0_1,
+		Retry: Retry{
+			Period: 3 * time.Second,
+			Stop:   60 * time.Second,
+		},
+	},
 }

 func (c *TopLevel) completeInitialization() {

@@ -122,7 +149,22 @@
 	case c.FileLedger.Prefix == "":
 		logger.Infof("FileLedger.Prefix unset, setting to %s", defaults.FileLedger.Prefix)
 		c.FileLedger.Prefix = defaults.FileLedger.Prefix
+	case c.Kafka.Brokers == nil:
+		logger.Infof("Kafka.Brokers unset, setting to %v", defaults.Kafka.Brokers)
+		c.Kafka.Brokers = defaults.Kafka.Brokers
+	case c.Kafka.Topic == "":
+		logger.Infof("Kafka.Topic unset, setting to %v", defaults.Kafka.Topic)
+		c.Kafka.Topic = defaults.Kafka.Topic
+	case c.Kafka.Retry.Period == 0*time.Second:
+		logger.Infof("Kafka.Retry.Period unset, setting to %v", defaults.Kafka.Retry.Period)
+		c.Kafka.Retry.Period = defaults.Kafka.Retry.Period
+	case c.Kafka.Retry.Stop == 0*time.Second:
+		logger.Infof("Kafka.Retry.Stop unset, setting to %v", defaults.Kafka.Retry.Stop)
+		c.Kafka.Retry.Stop = defaults.Kafka.Retry.Stop
 	default:
+		// A bit hacky, but its type makes it impossible to test for a nil value.
+		// This may be overwritten by the Kafka orderer upon instantiation.
+		c.Kafka.Version = defaults.Kafka.Version
 		return
 	}
 }

@@ -140,6 +182,7 @@ func Load() *TopLevel {

 	config.SetConfigName("orderer")
 	config.AddConfigPath("./")
+	config.AddConfigPath("../../.")
 	config.AddConfigPath("../orderer/")
 	config.AddConfigPath("../../orderer/")
 	// Path to look for the config file in based on GOPATH

orderer/kafka/broadcast.go (new file, +137)
@@ -0,0 +1,137 @@
+/*
+Copyright IBM Corp. 2016 All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+                 http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kafka
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
+	"github.com/hyperledger/fabric/orderer/config"
+)
+
+// Broadcaster allows the caller to submit messages to the orderer
+type Broadcaster interface {
+	Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error
+	Closeable
+}
+
+type broadcasterImpl struct {
+	producer Producer
+	config   *config.TopLevel
+	once     sync.Once
+
+	batchChan  chan *ab.BroadcastMessage
+	messages   []*ab.BroadcastMessage
+	nextNumber uint64
+	prevHash   []byte
+}
+
+func newBroadcaster(conf *config.TopLevel) Broadcaster {
+	return &broadcasterImpl{
+		producer:   newProducer(conf),
+		config:     conf,
+		batchChan:  make(chan *ab.BroadcastMessage, conf.General.BatchSize),
+		messages:   []*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("genesis")}},
+		nextNumber: 0,
+	}
+}
+
+// Broadcast receives ordering requests by clients and sends back an
+// acknowledgement for each received message in order, indicating
+// success or type of failure
+func (b *broadcasterImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error {
+	b.once.Do(func() {
+		// Send the genesis block to create the topic
+		// otherwise consumers will throw an exception.
+		b.sendBlock()
+		// Spawn the goroutine that cuts blocks
+		go b.cutBlock(b.config.General.BatchTimeout, b.config.General.BatchSize)
+	})
+	return b.recvRequests(stream)
+}
+
+// Close shuts down the broadcast side of the orderer
+func (b *broadcasterImpl) Close() error {
+	if b.producer != nil {
+		return b.producer.Close()
+	}
+	return nil
+}
+
+func (b *broadcasterImpl) sendBlock() error {
+	block := &ab.Block{
+		Messages: b.messages,
+		Number:   b.nextNumber,
+		PrevHash: b.prevHash,
+	}
+	logger.Debugf("Prepared block %d with %d messages (%+v)", block.Number, len(block.Messages), block)
+
+	b.messages = []*ab.BroadcastMessage{}
+	b.nextNumber++
+	hash, data := hashBlock(block)
+	b.prevHash = hash
+
+	return b.producer.Send(data)
+}
+
+func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
+	timer := time.NewTimer(period)
+
+	for {
+		select {
+		case msg := <-b.batchChan:
+			b.messages = append(b.messages, msg)
+			if len(b.messages) >= int(maxSize) {
+				if err := b.sendBlock(); err != nil {
+					panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err))
+				}
+				if !timer.Stop() {
+					<-timer.C
+				}
+				timer.Reset(period)
+			}
+		case <-timer.C:
+			if len(b.messages) > 0 {
+				if err := b.sendBlock(); err != nil {
+					panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err))
+				}
+			}
+		}
+	}
+}
+
+func (b *broadcasterImpl) recvRequests(stream ab.AtomicBroadcast_BroadcastServer) error {
+	reply := new(ab.BroadcastResponse)
+	for {
+		msg, err := stream.Recv()
+		if err != nil {
+			logger.Debug("Can no longer receive requests from client (exited?)")
+			return err
+		}
+
+		b.batchChan <- msg
+		reply.Status = ab.Status_SUCCESS // TODO This shouldn't always be a success
+
+		if err := stream.Send(reply); err != nil {
+			logger.Info("Cannot send broadcast reply to client")
+			return err
+		}
+		logger.Debugf("Sent broadcast reply %v to client", reply.Status.String())
+	}
+}
