Skip to content

Commit 6cb5b91

Browse files
committed
[FAB-1840] Use block cutter to create batches
https://jira.hyperledger.org/browse/FAB-1840 Change-Id: If53dbe94604ae3cca1083bb31d40798a16adee02 Signed-off-by: Gabor Hosszu <[email protected]>
1 parent 2d8b184 commit 6cb5b91

16 files changed

+309
-102
lines changed

orderer/multichain/chainsupport.go

-2
Original file line numberDiff line numberDiff line change
@@ -242,12 +242,10 @@ func (cs *chainSupport) WriteBlock(block *cb.Block, committers []filter.Committe
242242
for _, committer := range committers {
243243
committer.Commit()
244244
}
245-
246245
// Set the orderer-related metadata field
247246
if encodedMetadataValue != nil {
248247
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
249248
}
250-
251249
cs.addBlockSignature(block)
252250
cs.addLastConfigSignature(block)
253251

orderer/sbft/backend/backend.go

+47-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"encoding/gob"
4141

4242
"github.com/golang/protobuf/proto"
43+
"github.com/hyperledger/fabric/orderer/common/filter"
4344
commonfilter "github.com/hyperledger/fabric/orderer/common/filter"
4445
"github.com/hyperledger/fabric/orderer/multichain"
4546
"github.com/hyperledger/fabric/orderer/sbft/connection"
@@ -246,6 +247,51 @@ func (b *Backend) AddSbftPeer(chainID string, support multichain.ConsenterSuppor
246247
return s.New(b.GetMyId(), chainID, config, b)
247248
}
248249

250+
func (b *Backend) Ordered(chainID string, req *s.Request) ([][]*s.Request, [][]filter.Committer, bool) {
251+
// ([][]*cb.Envelope, [][]filter.Committer, bool) {
252+
// If the message is a valid normal message and fills a batch, the batch, committers, true is returned
253+
// If the message is a valid special message (like a config message) it terminates the current batch
254+
// and returns the current batch and committers (if it is not empty), plus a second batch containing the special transaction and commiter, and true
255+
env := &cb.Envelope{}
256+
err := proto.Unmarshal(req.Payload, env)
257+
if err != nil {
258+
logger.Panicf("Request format error: %s", err)
259+
}
260+
envbatch, committers, accepted := b.supports[chainID].BlockCutter().Ordered(env)
261+
if accepted {
262+
if len(envbatch) == 1 {
263+
rb1 := toRequestBatch(envbatch[0])
264+
return [][]*s.Request{rb1}, committers, true
265+
}
266+
if len(envbatch) == 2 {
267+
rb1 := toRequestBatch(envbatch[0])
268+
rb2 := toRequestBatch(envbatch[1])
269+
return [][]*s.Request{rb1, rb2}, committers, true
270+
}
271+
272+
return nil, nil, true
273+
}
274+
return nil, nil, false
275+
}
276+
277+
func (b *Backend) Cut(chainID string) ([]*s.Request, []filter.Committer) {
278+
envbatch, committers := b.supports[chainID].BlockCutter().Cut()
279+
return toRequestBatch(envbatch), committers
280+
}
281+
282+
func toRequestBatch(envelopes []*cb.Envelope) []*s.Request {
283+
rqs := make([]*s.Request, 0, len(envelopes))
284+
for _, e := range envelopes {
285+
requestbytes, err := proto.Marshal(e)
286+
if err != nil {
287+
logger.Panicf("Cannot marshal envelope: %s", err)
288+
}
289+
rq := &s.Request{Payload: requestbytes}
290+
rqs = append(rqs, rq)
291+
}
292+
return rqs
293+
}
294+
249295
// Consensus implements the SBFT consensus gRPC interface
250296
func (c *consensusConn) Consensus(_ *Handshake, srv Consensus_ConsensusServer) error {
251297
pi := connection.GetPeerInfo(srv)
@@ -333,7 +379,7 @@ func (b *Backend) Timer(d time.Duration, tf func()) s.Canceller {
333379
}
334380

335381
// Deliver writes a block
336-
func (b *Backend) Deliver(chainId string, batch *s.Batch) {
382+
func (b *Backend) Deliver(chainId string, batch *s.Batch, committers []commonfilter.Committer) {
337383
blockContents := make([]*cb.Envelope, 0, len(batch.Payloads))
338384
for _, p := range batch.Payloads {
339385
envelope := &cb.Envelope{}
@@ -354,7 +400,6 @@ func (b *Backend) Deliver(chainId string, batch *s.Batch) {
354400
metadata[signaturesIndex] = encodeSignatures(batch.Signatures)
355401
block.Metadata.Metadata = metadata
356402
b.lastBatches[chainId] = batch
357-
committers := []commonfilter.Committer{}
358403
b.supports[chainId].WriteBlock(block, committers, nil)
359404
}
360405

orderer/sbft/backend/backend_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"testing"
2626

2727
"github.com/golang/protobuf/proto"
28+
"github.com/hyperledger/fabric/orderer/common/filter"
2829
"github.com/hyperledger/fabric/orderer/mocks/multichain"
2930
mc "github.com/hyperledger/fabric/orderer/multichain"
3031
"github.com/hyperledger/fabric/orderer/sbft/simplebft"
@@ -88,10 +89,10 @@ func TestLedgerReadWrite(t *testing.T) {
8889
sgns[uint64(22)] = []byte("sgn22")
8990
batch := simplebft.Batch{Header: header, Payloads: data, Signatures: sgns}
9091

91-
b.Deliver(testChainID1, &batch)
92+
b.Deliver(testChainID1, &batch, []filter.Committer{})
9293
batch1 := b.LastBatch(testChainID1)
9394
batch2 := b.LastBatch(testChainID2)
94-
b.Deliver(testChainID3, &batch)
95+
b.Deliver(testChainID3, &batch, []filter.Committer{})
9596
batch3 := b.LastBatch(testChainID3)
9697

9798
if !reflect.DeepEqual(batch, *batch1) {

orderer/sbft/simplebft/batch.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,15 @@ func (s *SBFT) checkBatch(b *Batch, checkData bool, needSigs bool) (*BatchHeader
5151
if checkData {
5252
datahash := merkleHashData(b.Payloads)
5353
if !reflect.DeepEqual(datahash, batchheader.DataHash) {
54-
return nil, fmt.Errorf("malformed batch: invalid hash")
54+
return nil, fmt.Errorf("malformed batches: invalid hash")
5555
}
5656
}
5757

5858
if batchheader.PrevHash == nil {
5959
// TODO check against root hash, which should be part of constructor
6060
} else if needSigs {
6161
if len(b.Signatures) < s.oneCorrectQuorum() {
62-
return nil, fmt.Errorf("insufficient number of signatures on batch: need %d, got %d", s.oneCorrectQuorum(), len(b.Signatures))
62+
return nil, fmt.Errorf("insufficient number of signatures on batches: need %d, got %d", s.oneCorrectQuorum(), len(b.Signatures))
6363
}
6464
}
6565

orderer/sbft/simplebft/checkpoint.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) {
9797
// ignore null requests
9898
batch := *s.cur.preprep.Batch
9999
batch.Signatures = cpset
100-
s.deliverBatch(&batch)
100+
s.deliverBatch(&batch, s.cur.committers)
101101
log.Infof("replica %d: request %s %s delivered on %d (completed common case)", s.id, s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id)
102102
s.maybeSendNextBatch()
103103
s.processBacklog()

orderer/sbft/simplebft/connection.go

+13-8
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@ func (s *SBFT) Connection(replica uint64) {
3535
}
3636

3737
// A reconnecting replica can play forward its blockchain to
38-
// the batch listed in the hello message. However, the
39-
// currently in-flight batch will not be reflected in the
38+
// the batches listed in the hello message. However, the
39+
// currently in-flight batches will not be reflected in the
4040
// Hello message, nor will all messages be present to actually
41-
// commit the in-flight batch at the reconnecting replica.
41+
// commit the in-flight batches at the reconnecting replica.
4242
//
4343
// Therefore we also send the most recent (pre)prepare,
4444
// commit, checkpoint so that the reconnecting replica can
45-
// catch up on the in-flight batch.
45+
// catch up on the in-flight batches.
4646

4747
batchheader, err := s.checkBatch(&batch, false, true)
4848
if err != nil {
@@ -66,16 +66,21 @@ func (s *SBFT) Connection(replica uint64) {
6666

6767
func (s *SBFT) handleHello(h *Hello, src uint64) {
6868
bh, err := s.checkBatch(h.Batch, false, true)
69-
log.Debugf("replica %d: got hello for batch %d from replica %d", s.id, bh.Seq, src)
69+
log.Debugf("replica %d: got hello for batches %d from replica %d", s.id, bh.Seq, src)
7070

7171
if err != nil {
72-
log.Warningf("replica %d: invalid hello batch from %d: %s", s.id, src, err)
72+
log.Warningf("replica %d: invalid hello batches from %d: %s", s.id, src, err)
7373
return
7474
}
7575

7676
if s.sys.LastBatch(s.chainId).DecodeHeader().Seq < bh.Seq {
77-
log.Debugf("replica %d: delivering batch %d after hello from replica %d", s.id, bh.Seq, src)
78-
s.deliverBatch(h.Batch)
77+
log.Debugf("replica %d: delivering batches %d after hello from replica %d", s.id, bh.Seq, src)
78+
blockOK, committers := s.getCommittersFromBlockCutter(h.Batch)
79+
if blockOK {
80+
s.deliverBatch(h.Batch, committers)
81+
} else {
82+
log.Debugf("replica %d: we got a hello from %d with an erroneous block", s.id, src)
83+
}
7984
}
8085

8186
s.handleNewView(h.NewView, src)

orderer/sbft/simplebft/newview.go

+19-10
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (s *SBFT) maybeSendNewView() {
4545

4646
var batch *Batch
4747
if xset == nil {
48-
// no need for batch, it is contained in the vset
48+
// no need for batches, it is contained in the vset
4949
} else if reflect.DeepEqual(s.cur.subject.Digest, xset.Digest) {
5050
batch = s.cur.preprep.Batch
5151
} else {
@@ -122,12 +122,12 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
122122

123123
if nv.Xset == nil {
124124
if nv.Batch != nil {
125-
log.Warningf("replica %d: invalid new view from %d: null request should come with null batch", s.id, src)
125+
log.Warningf("replica %d: invalid new view from %d: null request should come with null batches", s.id, src)
126126
s.sendViewChange()
127127
return
128128
}
129129
} else if nv.Batch == nil || !bytes.Equal(nv.Batch.Hash(), nv.Xset.Digest) {
130-
log.Warningf("replica %d: invalid new view from %d: batch head hash does not match xset: %x, %x, %v",
130+
log.Warningf("replica %d: invalid new view from %d: batches head hash does not match xset: %x, %x, %v",
131131
s.id, src, hash(nv.Batch.Header), nv.Xset.Digest, nv)
132132
s.sendViewChange()
133133
return
@@ -136,7 +136,7 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
136136
if nv.Batch != nil {
137137
_, err = s.checkBatch(nv.Batch, true, false)
138138
if err != nil {
139-
log.Warningf("replica %d: invalid new view from %d: invalid batch, %s",
139+
log.Warningf("replica %d: invalid new view from %d: invalid batches, %s",
140140
s.id, src, err)
141141
s.sendViewChange()
142142
return
@@ -146,19 +146,25 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
146146
s.view = nv.View
147147
s.discardBacklog(s.primaryID())
148148

149-
// maybe deliver previous batch
149+
// maybe deliver previous batches
150150
if s.sys.LastBatch(s.chainId).DecodeHeader().Seq < prevBatch.DecodeHeader().Seq {
151151
if prevBatch.DecodeHeader().Seq == s.cur.subject.Seq.Seq {
152152
// we just received a signature set for a request which we preprepared, but never delivered.
153153
// check first if the locally preprepared request matches the signature set
154154
if !reflect.DeepEqual(prevBatch.DecodeHeader().DataHash, s.cur.preprep.Batch.DecodeHeader().DataHash) {
155-
log.Warningf("replica %d: [seq %d] request checkpointed in a previous view does not match locally preprepared one, delivering batch without payload", s.id, s.cur.subject.Seq.Seq)
155+
log.Warningf("replica %d: [seq %d] request checkpointed in a previous view does not match locally preprepared one, delivering batches without payload", s.id, s.cur.subject.Seq.Seq)
156156
} else {
157-
log.Debugf("replica %d: [seq %d] request checkpointed in a previous view with matching preprepare, completing and delivering the batch with payload", s.id, s.cur.subject.Seq.Seq)
157+
log.Debugf("replica %d: [seq %d] request checkpointed in a previous view with matching preprepare, completing and delivering the batches with payload", s.id, s.cur.subject.Seq.Seq)
158158
prevBatch.Payloads = s.cur.preprep.Batch.Payloads
159159
}
160160
}
161-
s.deliverBatch(prevBatch)
161+
// TODO we should not do this here, as prevBatch was already delivered
162+
blockOK, committers := s.getCommittersFromBlockCutter(prevBatch)
163+
if !blockOK {
164+
log.Panic("Replica %d: our last checkpointed batch is erroneous (block cutter).", s.id)
165+
}
166+
// TODO what should we do with the remaining?
167+
s.deliverBatch(prevBatch, committers)
162168
}
163169

164170
// after a new-view message, prepare to accept new requests.
@@ -174,10 +180,13 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
174180
Seq: &SeqView{Seq: nv.Batch.DecodeHeader().Seq, View: s.view},
175181
Batch: nv.Batch,
176182
}
183+
blockOK, committers := s.getCommittersFromBlockCutter(nv.Batch)
184+
if !blockOK {
185+
log.Panic("Replica %d: new view %d batch erroneous (block cutter).", s.id, nv.View)
186+
}
177187

178-
s.handleCheckedPreprepare(pp)
188+
s.handleCheckedPreprepare(pp, committers)
179189
} else {
180-
log.Debugf("replica %d: %+v", s.id, s)
181190
s.cancelViewChangeTimer()
182191
s.maybeSendNextBatch()
183192
}

orderer/sbft/simplebft/preprepare.go

+63-10
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ package simplebft
1919
import (
2020
"bytes"
2121
"time"
22+
23+
"github.com/hyperledger/fabric/orderer/common/filter"
2224
)
2325

24-
func (s *SBFT) sendPreprepare(batch []*Request) {
26+
func (s *SBFT) sendPreprepare(batch []*Request, committers []filter.Committer) {
2527
seq := s.nextSeq()
2628

2729
data := make([][]byte, len(batch))
@@ -38,7 +40,8 @@ func (s *SBFT) sendPreprepare(batch []*Request) {
3840

3941
s.sys.Persist(s.chainId, preprepared, m)
4042
s.broadcast(&Msg{&Msg_Preprepare{m}})
41-
s.handleCheckedPreprepare(m)
43+
log.Infof("replica %d: sendPreprepare", s.id)
44+
s.handleCheckedPreprepare(m, committers)
4245
}
4346

4447
func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) {
@@ -60,26 +63,27 @@ func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) {
6063
return
6164
}
6265
if pp.Batch == nil {
63-
log.Infof("replica %d: preprepare without batch", s.id)
66+
log.Infof("replica %d: preprepare without batches", s.id)
6467
return
6568
}
6669

6770
batchheader, err := s.checkBatch(pp.Batch, true, false)
6871
if err != nil || batchheader.Seq != pp.Seq.Seq {
69-
log.Infof("replica %d: preprepare %v batch head inconsistent from %d: %s", s.id, pp.Seq, src, err)
72+
log.Infof("replica %d: preprepare %v batches head inconsistent from %d: %s", s.id, pp.Seq, src, err)
7073
return
7174
}
7275

7376
prevhash := s.sys.LastBatch(s.chainId).Hash()
7477
if !bytes.Equal(batchheader.PrevHash, prevhash) {
75-
log.Infof("replica %d: preprepare batch prev hash does not match expected %s, got %s", s.id, hash2str(batchheader.PrevHash), hash2str(prevhash))
78+
log.Infof("replica %d: preprepare batches prev hash does not match expected %s, got %s", s.id, hash2str(batchheader.PrevHash), hash2str(prevhash))
7679
return
7780
}
78-
79-
s.handleCheckedPreprepare(pp)
81+
committers := s.getCommitters(pp)
82+
log.Infof("replica %d: handlePrepare", s.id)
83+
s.handleCheckedPreprepare(pp, committers)
8084
}
8185

82-
func (s *SBFT) acceptPreprepare(pp *Preprepare) {
86+
func (s *SBFT) acceptPreprepare(pp *Preprepare, committers []filter.Committer) {
8387
sub := Subject{Seq: pp.Seq, Digest: pp.Batch.Hash()}
8488

8589
log.Infof("replica %d: accepting preprepare for %v, %x", s.id, sub.Seq, sub.Digest)
@@ -92,11 +96,32 @@ func (s *SBFT) acceptPreprepare(pp *Preprepare) {
9296
prep: make(map[uint64]*Subject),
9397
commit: make(map[uint64]*Subject),
9498
checkpoint: make(map[uint64]*Checkpoint),
99+
committers: committers,
100+
}
101+
}
102+
103+
func (s *SBFT) getCommitters(pp *Preprepare) []filter.Committer {
104+
// if we are the primary, we can be sure the block is OK
105+
// and we also have the committers
106+
// TODO what to do with the remaining ones???
107+
// how to mantain the mapping between batches and committers?
108+
var committers []filter.Committer
109+
110+
if !s.isPrimary() {
111+
blockOK, allcommitters := s.getCommittersFromBlockCutter(pp.Batch)
112+
if !blockOK {
113+
log.Panicf("Replica %d found Byzantine block, Seq: %d View: %d", s.id, pp.Seq.Seq, pp.Seq.View)
114+
}
115+
committers = allcommitters
116+
} else {
117+
committers = s.primarycommitters[0]
118+
s.primarycommitters = s.primarycommitters[1:]
95119
}
120+
return committers
96121
}
97122

98-
func (s *SBFT) handleCheckedPreprepare(pp *Preprepare) {
99-
s.acceptPreprepare(pp)
123+
func (s *SBFT) handleCheckedPreprepare(pp *Preprepare, committers []filter.Committer) {
124+
s.acceptPreprepare(pp, committers)
100125
if !s.isPrimary() {
101126
s.sendPrepare()
102127
s.processBacklog()
@@ -105,6 +130,34 @@ func (s *SBFT) handleCheckedPreprepare(pp *Preprepare) {
105130
s.maybeSendCommit()
106131
}
107132

133+
func (s *SBFT) getCommittersFromBlockCutter(reqBatch *Batch) (bool, []filter.Committer) {
134+
reqs := make([]*Request, 0, len(reqBatch.Payloads))
135+
for _, pl := range reqBatch.Payloads {
136+
req := &Request{Payload: pl}
137+
reqs = append(reqs, req)
138+
}
139+
batches := make([][]*Request, 0, 1)
140+
comms := [][]filter.Committer{}
141+
for _, r := range reqs {
142+
b, c, accepted := s.sys.Ordered(s.chainId, r)
143+
if !accepted {
144+
return false, nil
145+
}
146+
batches = append(batches, b...)
147+
comms = append(comms, c...)
148+
}
149+
if len(batches) > 1 || len(batches) != len(comms) {
150+
return false, nil
151+
}
152+
153+
if len(batches) == 0 {
154+
_, committer := s.sys.Cut(s.chainId)
155+
return true, committer
156+
} else {
157+
return true, comms[0]
158+
}
159+
}
160+
108161
////////////////////////////////////////////////
109162

110163
func (s *SBFT) requestTimeout() {

0 commit comments

Comments
 (0)