Skip to content

Commit a2b9b2e

Browse files
author
Jason Yellick
committed
[FAB-798] Factor out block cutting logic
This changeset removes the block cutting logic from solo and pushes it into a common package to be reused by other components. It is designed specifically to support applications which cannot utilize a batch timer (Kafka) as well as applications for which cutting a block does not imply committing it (SBFT). Change-Id: Iea14eb1c4a030406cfe4a19f691464c2805b23ee Signed-off-by: Jason Yellick <[email protected]>
1 parent b7908a3 commit a2b9b2e

File tree

4 files changed

+473
-99
lines changed

4 files changed

+473
-99
lines changed
+131
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package blockcutter
18+
19+
import (
20+
"github.com/hyperledger/fabric/orderer/common/broadcastfilter"
21+
"github.com/hyperledger/fabric/orderer/common/configtx"
22+
cb "github.com/hyperledger/fabric/protos/common"
23+
24+
"github.com/golang/protobuf/proto"
25+
"github.com/op/go-logging"
26+
)
27+
28+
var logger = logging.MustGetLogger("orderer/common/blockcutter")
29+
30+
func init() {
31+
logging.SetLevel(logging.DEBUG, "")
32+
}
33+
34+
// Target defines a sink for the ordered broadcast messages
35+
type Receiver interface {
36+
// Ordered should be invoked sequentially as messages are ordered
37+
// If the message is a valid normal message and does not fill the batch, nil, true is returned
38+
// If the message is a valid normal message and fills a batch, the batch, true is returned
39+
// If the message is a valid special message (like a config message) it terminates the current batch
40+
// and returns the current batch (if it is not empty), plus a second batch containing the special transaction and true
41+
// If the ordered message is determined to be invalid, then nil, false is returned
42+
Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool)
43+
44+
// Cut returns the current batch and starts a new one
45+
Cut() []*cb.Envelope
46+
}
47+
48+
type receiver struct {
49+
batchSize int
50+
filters *broadcastfilter.RuleSet
51+
configManager configtx.Manager
52+
curBatch []*cb.Envelope
53+
}
54+
55+
func NewReceiverImpl(batchSize int, filters *broadcastfilter.RuleSet, configManager configtx.Manager) Receiver {
56+
return &receiver{
57+
batchSize: batchSize,
58+
filters: filters,
59+
configManager: configManager,
60+
}
61+
}
62+
63+
// Ordered should be invoked sequentially as messages are ordered
64+
// If the message is a valid normal message and does not fill the batch, nil, true is returned
65+
// If the message is a valid normal message and fills a batch, the batch, true is returned
66+
// If the message is a valid special message (like a config message) it terminates the current batch
67+
// and returns the current batch (if it is not empty), plus a second batch containing the special transaction and true
68+
// If the ordered message is determined to be invalid, then nil, false is returned
69+
func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool) {
70+
// The messages must be filtered a second time in case configuration has changed since the message was received
71+
action, _ := r.filters.Apply(msg)
72+
switch action {
73+
case broadcastfilter.Accept:
74+
logger.Debugf("Enqueuing message into batch")
75+
r.curBatch = append(r.curBatch, msg)
76+
77+
if len(r.curBatch) < r.batchSize {
78+
return nil, true
79+
}
80+
81+
logger.Debugf("Batch size met, creating block")
82+
newBatch := r.curBatch
83+
r.curBatch = nil
84+
return [][]*cb.Envelope{newBatch}, true
85+
case broadcastfilter.Reconfigure:
86+
// TODO, this is unmarshaling for a second time, we need a cleaner interface, maybe Apply returns a second arg with thing to put in the batch
87+
payload := &cb.Payload{}
88+
if err := proto.Unmarshal(msg.Payload, payload); err != nil {
89+
logger.Errorf("A change was flagged as configuration, but could not be unmarshaled: %v", err)
90+
return nil, false
91+
}
92+
newConfig := &cb.ConfigurationEnvelope{}
93+
if err := proto.Unmarshal(payload.Data, newConfig); err != nil {
94+
logger.Errorf("A change was flagged as configuration, but could not be unmarshaled: %v", err)
95+
return nil, false
96+
}
97+
err := r.configManager.Validate(newConfig)
98+
if err != nil {
99+
logger.Warningf("A configuration change made it through the ingress filter but could not be included in a batch: %v", err)
100+
return nil, false
101+
}
102+
103+
logger.Debugf("Configuration change applied successfully, committing previous block and configuration block")
104+
firstBatch := r.curBatch
105+
r.curBatch = nil
106+
secondBatch := []*cb.Envelope{msg}
107+
if firstBatch == nil {
108+
return [][]*cb.Envelope{secondBatch}, true
109+
} else {
110+
return [][]*cb.Envelope{firstBatch, secondBatch}, true
111+
}
112+
case broadcastfilter.Reject:
113+
logger.Debugf("Rejecting message")
114+
return nil, false
115+
case broadcastfilter.Forward:
116+
logger.Debugf("Ignoring message because it was not accepted by a filter")
117+
return nil, false
118+
default:
119+
logger.Fatalf("Received an unknown rule response: %v", action)
120+
}
121+
122+
return nil, false // Unreachable
123+
124+
}
125+
126+
// Cut returns the current batch and starts a new one
127+
func (r *receiver) Cut() []*cb.Envelope {
128+
batch := r.curBatch
129+
r.curBatch = nil
130+
return batch
131+
}

0 commit comments

Comments
 (0)