Skip to content

Commit 6d03a16

Browse files
author
Luis Sanchez
committed
[FAB-1253] Allow attempt to send >1MB blocks to kafka
This is a quick fix to unblock using the Kafka orderer. Your kafka instance must also be configured with a corresponding message.max.bytes property in server.properties, otherwise, the server will reject the message. This changeset makes the default 100MiB. We might want to add this to the orderer config.yml, but I think it's important to get this quick fix out into master for downstream users. Change-Id: Ib4bffbd8afdc229e64d43f011f09d942e0cd5b10 Signed-off-by: Luis Sanchez <[email protected]>
1 parent 5f17fde commit 6d03a16

File tree

2 files changed

+44
-0
lines changed

2 files changed

+44
-0
lines changed

orderer/kafka/util.go

+3
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ func newBrokerConfig(conf *config.TopLevel) *sarama.Config {
3131
brokerConfig := sarama.NewConfig()
3232
brokerConfig.Version = conf.Kafka.Version
3333
brokerConfig.Producer.Partitioner = newStaticPartitioner(conf.Kafka.PartitionID)
34+
// set equivalent of kafka producer config max.request.bytes to the deafult
35+
// value of a kafka server's socket.request.max.bytes property (100MiB).
36+
brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)
3437
return brokerConfig
3538
}
3639

orderer/kafka/util_test.go

+41
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,47 @@ func TestStaticPartitioner(t *testing.T) {
4141
}
4242
}
4343

44+
func TestProducerConfigMessageMaxBytes(t *testing.T) {
45+
46+
topic := testConf.Kafka.Topic
47+
48+
broker := sarama.NewMockBroker(t, 1000)
49+
broker.SetHandlerByMap(map[string]sarama.MockResponse{
50+
"MetadataRequest": sarama.NewMockMetadataResponse(t).
51+
SetBroker(broker.Addr(), broker.BrokerID()).
52+
SetLeader(topic, 0, broker.BrokerID()),
53+
"ProduceRequest": sarama.NewMockProduceResponse(t),
54+
})
55+
56+
config := newBrokerConfig(testConf)
57+
producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, config)
58+
if err != nil {
59+
t.Fatal(err)
60+
}
61+
62+
testCases := []struct {
63+
name string
64+
size int
65+
err error
66+
}{
67+
{"TypicalDeploy", 8 * 1024 * 1024, nil},
68+
{"TooBig", 100*1024*1024 + 1, sarama.ErrMessageSizeTooLarge},
69+
}
70+
71+
for _, tc := range testCases {
72+
t.Run(tc.name, func(t *testing.T) {
73+
_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(make([]byte, tc.size))})
74+
if err != tc.err {
75+
t.Fatal(err)
76+
}
77+
})
78+
79+
}
80+
81+
producer.Close()
82+
broker.Close()
83+
}
84+
4485
func TestNewBrokerConfig(t *testing.T) {
4586

4687
topic := testConf.Kafka.Topic

0 commit comments

Comments
 (0)