Commit ab38851

[FAB-3384] Document Kafka-based OS

Change-Id: Id370e2e5bf009c16d640c63c2af7aef1ab809c75
Signed-off-by: Kostas Christidis <[email protected]>

1 parent 859fd8a

2 files changed: +220 -1
docs/source/index.rst (+1 -1)

@@ -68,7 +68,7 @@ Before diving in, watch how Fabric is `Building a Blockchain for Business
    txflow
    Fabric CA's User Guide <http://hyperledger-fabric-ca.readthedocs.io/en/latest>
    fabric-sdks
-   orderingservice
+   kafka
    channels
    ledger
    readwrite

docs/source/kafka.rst (new file, +219)

Bringing up a Kafka-based Ordering Service
===========================================

Caveat emptor
-------------

This document assumes that the reader generally knows how to set up a Kafka
cluster and a ZooKeeper ensemble. The purpose of this guide is to identify the
steps you need to take so as to have a set of Hyperledger Fabric ordering
service nodes (OSNs) use your Kafka cluster and provide an ordering service to
your blockchain network.

Big picture
-----------

Each channel in Fabric maps to a separate single-partition topic in Kafka. When
an OSN receives transactions via the ``Broadcast`` RPC, it checks to make sure
that the broadcasting client has permission to write to the channel, then
relays (i.e. produces) those transactions to the appropriate partition in Kafka.
This partition is also consumed by the OSN, which groups the received
transactions into blocks locally, persists them in its local ledger, and serves
them to receiving clients via the ``Deliver`` RPC. For low-level details, refer
to `the document that describes how we came to this design
<https://docs.google.com/document/d/1vNMaM7XhOlu9tB_10dKnlrhy5d7b1u8lSY8a-kVjCO4/edit>`_
-- Figure 8 is a schematic representation of the process described above.

Steps
-----

Let ``K`` and ``Z`` be the number of nodes in the Kafka cluster and the
ZooKeeper ensemble respectively:

i. At a minimum, ``K`` should be set to 4. (As we will explain in Step 4 below,
this is the minimum number of nodes necessary in order to exhibit crash fault
tolerance, i.e. with 4 brokers you can have 1 broker go down; all channels will
continue to be writeable and readable, and new channels can be created.)

ii. ``Z`` will either be 3, 5, or 7. It has to be an odd number to avoid
split-brain scenarios, and larger than 1 in order to avoid single points of
failure. Anything beyond 7 ZooKeeper servers is considered overkill.

Proceed as follows:

1. Orderers: **Encode the Kafka-related information in the network's genesis
block.** If you are using ``configtxgen``, edit ``configtx.yaml`` -- or pick a
preset profile for the system channel's genesis block -- so that:

   a. ``Orderer.OrdererType`` is set to ``kafka``.

   b. ``Orderer.Kafka.Brokers`` contains the address of *at least two* of the
   Kafka brokers in your cluster in ``IP:port`` notation. The list does not
   need to be exhaustive. (These are your seed brokers.)
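
   For illustration, a minimal sketch of these keys, assuming the
   ``configtx.yaml`` layout that ships with the Fabric samples (the broker
   addresses below are placeholders):

   ::

      Orderer:
          OrdererType: kafka
          Kafka:
              Brokers:
                  - 10.0.0.1:9092
                  - 10.0.0.2:9092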

2. Orderers: **Set the maximum block size.** Each block will have at most
``Orderer.AbsoluteMaxBytes`` bytes (not including headers), a value that you can
set in ``configtx.yaml``. Let the value you pick here be ``A`` and make note of
it -- it will affect how you configure your Kafka brokers in Step 4.
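
   Continuing the sketch above -- in the sample ``configtx.yaml`` this cap
   lives under ``Orderer.BatchSize``; the values below are illustrative:

   ::

      Orderer:
          BatchSize:
              AbsoluteMaxBytes: 10 MB
              PreferredMaxBytes: 512 KB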

3. Orderers: **Create the genesis block.** Use ``configtxgen``. The settings you
picked in Steps 1 and 2 above are system-wide settings, i.e. they apply across
the network for all the OSNs. Make note of the genesis block's location.
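
   A sketch of the invocation, assuming a Kafka-enabled profile named
   ``SampleInsecureKafka`` (substitute your own profile name and output path):

   ::

      configtxgen -profile SampleInsecureKafka -outputBlock genesis.block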

4. Kafka cluster: **Configure your Kafka brokers appropriately.** Ensure that
every Kafka broker has these keys configured:

   a. ``unclean.leader.election.enable = false`` -- Data consistency is key in
   a blockchain environment. We cannot have a channel leader chosen outside of
   the in-sync replica set, or we run the risk of overwriting the offsets that
   the previous leader produced, and -- as a result -- rewriting the blockchain
   that the orderers produce.

   b. ``min.insync.replicas = M`` -- Where you pick a value ``M`` such that
   ``1 < M < N`` (see ``default.replication.factor`` below). Data is considered
   committed when it is written to at least ``M`` replicas (which are then
   considered in-sync and belong to the in-sync replica set, or ISR). In any
   other case, the write operation returns an error. Then:

      i. If up to ``N-M`` replicas -- out of the ``N`` that the channel data is
      written to -- become unavailable, operations proceed normally.
      ii. If more replicas become unavailable, Kafka cannot maintain an ISR
      set of ``M``, so it stops accepting writes. Reads work without issues.
      The channel becomes writeable again when ``M`` replicas get in-sync.

   c. ``default.replication.factor = N`` -- Where you pick a value ``N`` such
   that ``N < K``. A replication factor of ``N`` means that each channel will
   have its data replicated to ``N`` brokers. These are the candidates for the
   ISR set of a channel. As we noted in the ``min.insync.replicas`` section
   above, not all of these brokers have to be available all the time. ``N``
   should be set *strictly smaller* than ``K`` because channel creation cannot
   go forward if fewer than ``N`` brokers are up. So if you set N = K, a single
   broker going down means that no new channels can be created on the
   blockchain network -- the crash fault tolerance of the ordering service is
   non-existent.

   d. ``message.max.bytes`` and ``replica.fetch.max.bytes`` should be set to a
   value larger than ``A``, the value you picked in
   ``Orderer.AbsoluteMaxBytes`` in Step 2 above. Add some buffer to account for
   headers -- 1 MiB is more than enough. The following condition applies:

   ::

      Orderer.AbsoluteMaxBytes < replica.fetch.max.bytes <= message.max.bytes

   (For completeness, we note that ``message.max.bytes`` should be strictly
   smaller than ``socket.request.max.bytes``, which is set by default to
   100 MiB. If you wish to have blocks larger than 100 MiB you will need to
   edit the hard-coded value in ``brokerConfig.Producer.MaxMessageBytes`` in
   ``fabric/orderer/kafka/config.go`` and rebuild the binary from source.
   This is not advisable.)

   e. ``log.retention.ms = -1`` -- Until the ordering service in Fabric adds
   support for pruning of the Kafka logs, you should disable time-based
   retention and prevent segments from expiring. (Size-based retention -- see
   ``log.retention.bytes`` -- is disabled by default in Kafka at the time of
   this writing, so there's no need to set it explicitly.)

   Based on what we've described above, the minimum allowed values for ``M``
   and ``N`` are 2 and 3 respectively. This configuration allows for the
   creation of new channels to go forward, and for all channels to continue to
   be writeable.
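
   Pulling these settings together, a sketch of the corresponding
   ``server.properties`` entries for a four-broker cluster with ``N = 3``,
   ``M = 2``, and ``A`` = 10 MiB (the byte values are illustrative and
   include the 1 MiB header buffer):

   ::

      unclean.leader.election.enable = false
      min.insync.replicas = 2
      default.replication.factor = 3
      message.max.bytes = 11534336
      replica.fetch.max.bytes = 11534336
      log.retention.ms = -1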

5. Orderers: **Point each OSN to the genesis block.** Edit
``General.GenesisFile`` in ``orderer.yaml`` so that it points to the genesis
block created in Step 3 above. (While at it, ensure all other keys in that YAML
file are set appropriately.)
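
   A minimal sketch of the relevant ``orderer.yaml`` keys, assuming the
   sample layout that ships with Fabric (the path is a placeholder):

   ::

      General:
          GenesisMethod: file
          GenesisFile: /var/hyperledger/orderer/genesis.block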

6. Orderers: **Adjust polling intervals and timeouts.** (Optional step.)

   a. The ``Kafka.Retry`` section in the ``orderer.yaml`` file allows you to
   adjust the frequency of the metadata/producer/consumer requests, as well as
   the socket timeouts. (These are all settings you would expect to see in a
   Kafka producer or consumer.)

   b. Additionally, when a new channel is created, or when an existing channel
   is reloaded (in case of a just-restarted orderer), the orderer interacts
   with the Kafka cluster in the following ways:

      a. It creates a Kafka producer (writer) for the Kafka partition that
      corresponds to the channel.

      b. It uses that producer to post a no-op ``CONNECT`` message to that
      partition.

      c. It creates a Kafka consumer (reader) for that partition.

   If any of these steps fail, you can adjust the frequency with which they
   are repeated. Specifically, they will be re-attempted every
   ``Kafka.Retry.ShortInterval`` for a total of ``Kafka.Retry.ShortTotal``,
   and then every ``Kafka.Retry.LongInterval`` for a total of
   ``Kafka.Retry.LongTotal`` until they succeed. Note that the orderer will
   be unable to write to or read from a channel until all of the steps
   above have been completed successfully.
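
   As a sketch, the retry knobs named above look like this in
   ``orderer.yaml`` (the values shown are illustrative):

   ::

      Kafka:
          Retry:
              ShortInterval: 5s
              ShortTotal: 10m
              LongInterval: 5m
              LongTotal: 12h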

7. **Set up the OSNs and Kafka cluster so that they communicate over SSL.**
(Optional step, but highly recommended.) Refer to `the Confluent guide
<http://docs.confluent.io/2.0.0/kafka/ssl.html>`_ for the Kafka cluster side of
the equation, and set the keys under ``Kafka.TLS`` in ``orderer.yaml`` on every
OSN accordingly.
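
   On the OSN side, a sketch of the ``Kafka.TLS`` block, assuming the sample
   ``orderer.yaml`` layout (file paths are placeholders):

   ::

      Kafka:
          TLS:
              Enabled: true
              PrivateKey:
                  File: /path/to/client.key
              Certificate:
                  File: /path/to/client.crt
              RootCAs:
                  File: /path/to/ca.crt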

8. **Bring up the nodes in the following order: ZooKeeper ensemble, Kafka
cluster, ordering service nodes.**

Additional considerations
-------------------------

1. **Preferred message size.** In Step 2 above (see the `Steps`_ section) you
can also set the preferred size of blocks by setting the
``Orderer.BatchSize.PreferredMaxBytes`` key. Kafka offers higher throughput when
dealing with relatively small messages; aim for a value no bigger than 1 MiB.

2. **Using environment variables to override settings.** You can override a
Kafka broker or a ZooKeeper server's settings by using environment variables.
Replace the dots of the configuration key with underscores --
e.g. ``KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=false`` will allow you to override
the default value of ``unclean.leader.election.enable``. The same applies to the
OSNs for their *local* configuration, i.e. what can be set in ``orderer.yaml``.
For example, ``ORDERER_KAFKA_RETRY_SHORTINTERVAL=1s`` allows you to override the
default value for ``Orderer.Kafka.Retry.ShortInterval``.
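
   For instance, a sketch of a Docker Compose ``environment`` block for a
   Kafka broker (the settings mirror the values recommended in Step 4 and
   are illustrative):

   ::

      environment:
          - KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=false
          - KAFKA_MIN_INSYNC_REPLICAS=2
          - KAFKA_DEFAULT_REPLICATION_FACTOR=3
          - KAFKA_LOG_RETENTION_MS=-1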

Supported Kafka versions and upgrading
--------------------------------------

Supported Kafka versions for v1 are ``0.9`` and ``0.10``. (Fabric uses the
`sarama client library <https://github.com/Shopify/sarama>`_ and vendors a
version of it that supports Kafka 0.9 and 0.10.)

Out of the box, the Kafka version defaults to ``0.9.0.1``. If you wish to use a
different supported version, you will have to edit the source code (modify the
``Version`` field of the ``defaults`` struct in
``orderer/localconfig/config.go``) and rebuild the ``orderer`` binary. For
example, if you wish to run the ordering service against a Kafka cluster
running 0.10.0.1, you would edit the file like so:

::

   ...
   Verbose: false,
   Version: sarama.V0_10_0_1,
   TLS: TLS{
   ...

And then rebuild the binary. (This process will be improved with
`FAB-4619 <https://jira.hyperledger.org/browse/FAB-4619>`_.)

Debugging
---------

Set ``General.LogLevel`` to ``DEBUG`` and ``Kafka.Verbose`` to ``true`` in
``orderer.yaml``.
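
In ``orderer.yaml`` terms, that is:

::

   General:
       LogLevel: DEBUG
   Kafka:
       Verbose: true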

Example
-------

Sample Docker Compose configuration files in line with the recommended settings
above can be found under the ``fabric/bddtests`` directory. Look for
``dc-orderer-kafka-base.yml`` and ``dc-orderer-kafka.yml``.

.. Licensed under Creative Commons Attribution 4.0 International License
   https://creativecommons.org/licenses/by/4.0/
