Skip to content

Commit be08bc5

Browse files
committed
[FAB-1361] Move partitioner functions to own file
https://jira.hyperledger.org/browse/FAB-1361 Change-Id: If951bb6d76d96f7f519471b76c73b1ec01b86063 Signed-off-by: Kostas Christidis <[email protected]>
1 parent b9db02d commit be08bc5

File tree

4 files changed

+90
-41
lines changed

4 files changed

+90
-41
lines changed

orderer/kafka/partitioner.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kafka
18+
19+
import "github.com/Shopify/sarama"
20+
21+
// newStaticPartitioner returns a PartitionerConstructor that
22+
// returns a Partitioner that always chooses the specified partition.
23+
func newStaticPartitioner(partition int32) sarama.PartitionerConstructor {
24+
return func(topic string) sarama.Partitioner {
25+
return &staticPartitioner{partition}
26+
}
27+
}
28+
29+
type staticPartitioner struct {
30+
partitionID int32
31+
}
32+
33+
// Partition takes a message and partition count and chooses a partition.
34+
func (p *staticPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
35+
return p.partitionID, nil
36+
}
37+
38+
// RequiresConsistency indicates to the user of the partitioner
39+
// whether the mapping of key->partition is consistent or not.
40+
func (p *staticPartitioner) RequiresConsistency() bool {
41+
return true
42+
}

orderer/kafka/partitioner_test.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kafka
18+
19+
import (
20+
"testing"
21+
22+
"github.com/Shopify/sarama"
23+
"github.com/hyperledger/fabric/orderer/common/bootstrap/static"
24+
)
25+
26+
func TestStaticPartitioner(t *testing.T) {
27+
var partition int32 = 3
28+
var numberOfPartitions int32 = 6
29+
30+
partitionerConstructor := newStaticPartitioner(partition)
31+
partitioner := partitionerConstructor(static.TestChainID)
32+
33+
for i := 0; i < 10; i++ {
34+
assignedPartition, err := partitioner.Partition(new(sarama.ProducerMessage), numberOfPartitions)
35+
if err != nil {
36+
t.Fatal("Partitioner not functioning as expected:", err)
37+
}
38+
if assignedPartition != partition {
39+
t.Fatalf("Partitioner not returning the expected partition - expected %d, got %v", partition, assignedPartition)
40+
}
41+
}
42+
}

orderer/kafka/util.go

+6-22
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,15 @@ const (
2929

3030
func newBrokerConfig(conf *config.TopLevel) *sarama.Config {
3131
brokerConfig := sarama.NewConfig()
32+
3233
brokerConfig.Version = conf.Kafka.Version
34+
// A partitioner is actually not needed the way we do things now,
35+
// but we're adding it now to allow for flexibility in the future.
3336
brokerConfig.Producer.Partitioner = newStaticPartitioner(conf.Kafka.PartitionID)
34-
// set equivalent of kafka producer config max.request.bytes to the deafult
35-
// value of a kafka server's socket.request.max.bytes property (100MiB).
37+
// Set equivalent of kafka producer config max.request.bytes to the deafult
38+
// value of a Kafka broker's socket.request.max.bytes property (100 MiB).
3639
brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)
40+
3741
return brokerConfig
3842
}
3943

@@ -54,23 +58,3 @@ func newOffsetReq(conf *config.TopLevel, seek int64) *sarama.OffsetRequest {
5458
req.AddBlock(conf.Kafka.Topic, conf.Kafka.PartitionID, seek, 1)
5559
return req
5660
}
57-
58-
// newStaticPartitioner returns a PartitionerConstructor that returns a Partitioner
59-
// that always chooses the specified partition.
60-
func newStaticPartitioner(partition int32) sarama.PartitionerConstructor {
61-
return func(topic string) sarama.Partitioner {
62-
return &staticPartitioner{partition}
63-
}
64-
}
65-
66-
type staticPartitioner struct {
67-
partitionID int32
68-
}
69-
70-
func (p *staticPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
71-
return p.partitionID, nil
72-
}
73-
74-
func (p *staticPartitioner) RequiresConsistency() bool {
75-
return true
76-
}

orderer/kafka/util_test.go

-19
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,6 @@ import (
2222
"github.com/Shopify/sarama"
2323
)
2424

25-
func TestStaticPartitioner(t *testing.T) {
26-
27-
var partition int32 = 3
28-
var numberOfPartitions int32 = 6
29-
30-
partitionerConstructor := newStaticPartitioner(partition)
31-
partitioner := partitionerConstructor(testConf.Kafka.Topic)
32-
33-
for i := 0; i < 10; i++ {
34-
assignedPartition, err := partitioner.Partition(new(sarama.ProducerMessage), numberOfPartitions)
35-
if err != nil {
36-
t.Fatal(err)
37-
}
38-
if assignedPartition != partition {
39-
t.Fatalf("Expected: %v. Actual: %v", partition, assignedPartition)
40-
}
41-
}
42-
}
43-
4425
func TestProducerConfigMessageMaxBytes(t *testing.T) {
4526

4627
topic := testConf.Kafka.Topic

0 commit comments

Comments
 (0)