Skip to content

Commit 31b7572

Browse files
committed
sbft: sync state on reconnect
Change-Id: I90fc0e866de288dd592b9afb01a806e3c1df5695 Signed-off-by: Simon Schubert <[email protected]>
1 parent 18a44d0 commit 31b7572

11 files changed

+309
-149
lines changed

consensus/simplebft/backlog.go

+18-30
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,25 @@ func (s *SBFT) testBacklog(m *Msg, src uint64) bool {
2525
}
2626

2727
// testBacklog2 looks at which kind of message m carries (preprepare,
// prepare, commit or checkpoint) and returns the verdict of the local
// record helper for that message's sequence/view. Any other message kind
// yields false. src is not read in this function body.
// NOTE(review): presumably a true result means "m is ahead of our current
// state and has to be backlogged" — confirm against the callers.
func (s *SBFT) testBacklog2(m *Msg, src uint64) bool {
28-
record := func(seq uint64) bool {
29-
if seq > s.cur.subject.Seq.Seq {
28+
// record (added version) returns true when there is no active view, or
// when seq is past the current subject's Seq, or when seq names a view
// later than s.seq.View; otherwise it returns false.
record := func(seq *SeqView) bool {
29+
if !s.activeView {
30+
return true
31+
}
32+
if seq.Seq > s.cur.subject.Seq.Seq || seq.View > s.seq.View {
3033
return true
3134
}
3235
return false
3336
}
3437

35-
if pp := m.GetPreprepare(); pp != nil && !s.cur.executed {
36-
return true
38+
if pp := m.GetPreprepare(); pp != nil {
39+
// A preprepare only yields true while the current request's
// checkpointDone flag is still false.
return record(pp.Seq) && !s.cur.checkpointDone
3740
} else if p := m.GetPrepare(); p != nil {
38-
return record(p.Seq.Seq)
41+
return record(p.Seq)
3942
} else if c := m.GetCommit(); c != nil {
40-
return record(c.Seq.Seq)
43+
return record(c.Seq)
4144
} else if cs := m.GetCheckpoint(); cs != nil {
4245
// NOTE(review): c is a freshly allocated, empty Checkpoint and cs (the
// checkpoint actually carried by m) is never read after the nil check,
// so c.Seq below is always the zero value. Looks like cs was intended
// here — confirm.
c := &Checkpoint{}
43-
return record(c.Seq)
46+
return record(&SeqView{Seq: c.Seq})
4447
}
4548
return false
4649
}
@@ -53,22 +56,19 @@ func (s *SBFT) recordBacklogMsg(m *Msg, src uint64) {
5356
//
5457
// Prevent DoS by limiting the number of messages per replica.
5558
//
56-
// If the backlog limit is exceeded, discard all messages with
57-
// Seq before the replica's hello message (we can, because we
58-
// can play forward to this batch via state transfer). If
59-
// there is no hello message, we must be really slow or the
60-
// replica must be byzantine. In this case we probably should
61-
// re-establish the connection.
59+
// If the backlog limit is exceeded, re-establish the
60+
// connection.
6261
//
6362
// After the connection has been re-established, we will
64-
// receive a hello, and the following messages will trigger
65-
// the pruning of old messages. If this pruning lead us not
66-
// to make progress, the backlog processing algorithm as lined
67-
// out below will take care of starting a state transfer,
68-
// using the hello message we received on reconnect.
63+
// receive a hello, which will advance our state and discard
64+
// old messages.
6965
s.replicaState[src].backLog = append(s.replicaState[src].backLog, m)
7066
}
7167

68+
// discardBacklog drops every message backlogged for replica src by
// resetting that replica's backLog slice to nil.
func (s *SBFT) discardBacklog(src uint64) {
69+
s.replicaState[src].backLog = nil
70+
}
71+
7272
func (s *SBFT) processBacklog() {
7373
processed := true
7474
notReady := uint64(0)
@@ -111,18 +111,6 @@ func (s *SBFT) processBacklog() {
111111
// we should reconnect to get a working connection going
112112
// again.
113113
//
114-
// If a noFaultyQuorum (-1, because we're not faulty, just
115-
// were disconnected) is backlogged, we know that we need to
116-
// perform a state transfer. Of course, f of these might be
117-
// byzantine, and the remaining f that are not backlogged will
118-
// allow us to get unstuck. To check against that, we need to
119-
// only consider backlogged replicas of which we have a hello
120-
// message that talks about a future Seq.
121-
//
122-
// We need to pick the highest Seq of all the hello messages
123-
// we received, perform a state transfer to that Batch, and
124-
// discard all backlogged messages that refer to a lower Seq.
125-
//
126114
// Do we need to detect that a connection is stuck and we
127115
// should reconnect?
128116
}

consensus/simplebft/batch.go

+24-5
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,26 @@ func (s *SBFT) makeBatch(seq uint64, prevHash []byte, data [][]byte) *Batch {
4141
}
4242
}
4343

44-
func (s *SBFT) checkBatch(b *Batch) (*BatchHeader, error) {
45-
datahash := merkleHashData(b.Payloads)
46-
44+
func (s *SBFT) checkBatch(b *Batch, checkData bool) (*BatchHeader, error) {
4745
batchheader := &BatchHeader{}
4846
err := proto.Unmarshal(b.Header, batchheader)
4947
if err != nil {
5048
return nil, err
5149
}
5250

53-
if !reflect.DeepEqual(datahash, batchheader.DataHash) {
54-
return nil, fmt.Errorf("malformed batch: invalid hash")
51+
if checkData {
52+
datahash := merkleHashData(b.Payloads)
53+
if !reflect.DeepEqual(datahash, batchheader.DataHash) {
54+
return nil, fmt.Errorf("malformed batch: invalid hash")
55+
}
56+
}
57+
58+
bh := b.Hash()
59+
for r, sig := range b.Signatures {
60+
err = s.sys.CheckSig(bh, r, sig)
61+
if err != nil {
62+
return nil, err
63+
}
5564
}
5665

5766
return batchheader, nil
@@ -63,3 +72,13 @@ func (s *SBFT) checkBatch(b *Batch) (*BatchHeader, error) {
6372
// Hash returns hash(b.Header). Only the Header field is passed to hash;
// the batch's payloads and signatures are not part of the input.
func (b *Batch) Hash() []byte {
6473
return hash(b.Header)
6574
}
75+
76+
// DecodeHeader unmarshals b.Header into a new BatchHeader and returns it.
// It panics if proto.Unmarshal reports an error, so it offers no error
// return for callers to handle.
// NOTE(review): presumably meant only for batches that were already
// validated (e.g. locally stored ones) — confirm against callers.
func (b *Batch) DecodeHeader() *BatchHeader {
77+
batchheader := &BatchHeader{}
78+
err := proto.Unmarshal(b.Header, batchheader)
79+
if err != nil {
80+
panic(err)
81+
}
82+
83+
return batchheader
84+
}

consensus/simplebft/checkpoint.go

+1
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) {
9898
batch := *s.cur.preprep.Batch
9999
batch.Signatures = cpset
100100
s.sys.Deliver(&batch)
101+
s.seq = *s.cur.subject.Seq
101102

102103
s.cur.timeout.Cancel()
103104
log.Infof("request %s %s completed on %d", s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id)

consensus/simplebft/connection.go

+46-7
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,18 @@ limitations under the License.
1616

1717
package simplebft
1818

19-
import "github.com/golang/protobuf/proto"
20-
2119
// Connection is an event from system to notify a new connection with
2220
// replica.
2321
// On connection, we send our latest (weak) checkpoint, and we expect
2422
// to receive one from replica.
2523
func (s *SBFT) Connection(replica uint64) {
2624
batch := *s.sys.LastBatch()
2725
batch.Payloads = nil // don't send the big payload
28-
s.sys.Send(&Msg{&Msg_Hello{&batch}}, replica)
26+
hello := &Hello{Batch: &batch}
27+
if s.isPrimary() && s.activeView && s.lastNewViewSent != nil {
28+
hello.NewView = s.lastNewViewSent
29+
}
30+
s.sys.Send(&Msg{&Msg_Hello{hello}}, replica)
2931

3032
// A reconnecting replica can play forward its blockchain to
3133
// the batch listed in the hello message. However, the
@@ -43,13 +45,12 @@ func (s *SBFT) Connection(replica uint64) {
4345
// connecting right after a new-view message was received, and
4446
// its xset batch is in-flight.
4547

46-
batchheader := &BatchHeader{}
47-
err := proto.Unmarshal(batch.Header, batchheader)
48+
batchheader, err := s.checkBatch(&batch, false)
4849
if err != nil {
4950
panic(err)
5051
}
5152

52-
if s.cur.subject.Seq.Seq > batchheader.Seq {
53+
if s.cur.subject.Seq.Seq > batchheader.Seq && s.activeView {
5354
if s.isPrimary() {
5455
s.sys.Send(&Msg{&Msg_Preprepare{s.cur.preprep}}, replica)
5556
} else {
@@ -64,6 +65,44 @@ func (s *SBFT) Connection(replica uint64) {
6465
}
6566
}
6667

67-
func (s *SBFT) handleHello(h *Batch, src uint64) {
68+
// handleHello processes a Hello message h received from replica src.
//
// It checks h.Batch with checkBatch (data-hash check disabled), delivers
// the batch and advances s.seq.Seq when the batch's Seq is ahead of the
// local last batch, and, if h carries a NewView, validates it and marks the
// view active. On success it stores h as src's hello, discards src's
// backlog and runs processBacklog. Any failed validation logs a warning and
// returns without storing the hello or touching the backlog.
func (s *SBFT) handleHello(h *Hello, src uint64) {
69+
// NOTE(review): h.Batch is passed on without a nil check here — confirm
// that checkBatch tolerates a nil batch from a remote peer.
bh, err := s.checkBatch(h.Batch, false)
70+
if err != nil {
71+
log.Warningf("invalid hello batch from %d: %s", src, err)
72+
return
73+
}
74+
75+
// Only a batch with a higher Seq than our last batch is delivered.
if s.sys.LastBatch().DecodeHeader().Seq < bh.Seq {
76+
s.sys.Deliver(h.Batch)
77+
s.seq.Seq = bh.Seq
78+
}
79+
80+
// NOTE(review): the early returns inside this branch happen after the
// Deliver/s.seq.Seq update above may already have taken effect.
if h.NewView != nil {
81+
if s.primaryIDView(h.NewView.View) != src {
82+
log.Warningf("invalid hello with new view from non-primary %d", src)
83+
return
84+
}
85+
86+
vcs, err := s.checkNewViewSignatures(h.NewView)
87+
if err != nil {
88+
log.Warningf("invalid hello new view from %d: %s", src, err)
89+
return
90+
}
91+
92+
// Only the ok flag of makeXset is used; the xset itself is discarded.
_, ok := s.makeXset(vcs)
93+
if !ok {
94+
log.Warningf("invalid hello new view xset from %d", src)
95+
return
96+
}
97+
98+
if s.seq.View <= h.NewView.View {
99+
s.seq.View = h.NewView.View
100+
}
101+
// NOTE(review): activeView is set even when h.NewView.View is lower
// than s.seq.View (the view itself is left unchanged then) — confirm
// this is intended.
s.activeView = true
102+
}
103+
68104
s.replicaState[src].hello = h
105+
106+
s.discardBacklog(src)
107+
s.processBacklog()
69108
}

consensus/simplebft/execute.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ func (s *SBFT) maybeExecute() {
2121
return
2222
}
2323
s.cur.executed = true
24-
s.seq = *s.cur.subject.Seq
25-
log.Noticef("executing %v", s.seq)
24+
log.Noticef("executing %v", s.cur.subject)
2625

2726
s.sys.Persist("execute", &s.cur.subject)
2827

consensus/simplebft/newview.go

+26-22
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
)
2323

2424
func (s *SBFT) maybeSendNewView() {
25-
if s.lastNewViewSent == s.seq.View {
25+
if s.lastNewViewSent != nil && s.lastNewViewSent.View == s.seq.View {
2626
return
2727
}
2828

@@ -63,21 +63,11 @@ func (s *SBFT) maybeSendNewView() {
6363
}
6464

6565
log.Noticef("sending new view for %d", nv.View)
66-
s.lastNewViewSent = nv.View
66+
s.lastNewViewSent = nv
6767
s.broadcast(&Msg{&Msg_NewView{nv}})
6868
}
6969

70-
func (s *SBFT) handleNewView(nv *NewView, src uint64) {
71-
if src != s.primaryIDView(nv.View) {
72-
log.Warningf("invalid new view from %d for %d", src, nv.View)
73-
return
74-
}
75-
76-
if onv := s.replicaState[s.primaryIDView(nv.View)].newview; onv != nil && onv.View >= nv.View {
77-
log.Debugf("discarding duplicate new view for %d", nv.View)
78-
return
79-
}
80-
70+
func (s *SBFT) checkNewViewSignatures(nv *NewView) ([]*ViewChange, error) {
8171
var vcs []*ViewChange
8272
for vcsrc, svc := range nv.Vset {
8373
vc := &ViewChange{}
@@ -88,13 +78,31 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
8878
}
8979
}
9080
if err != nil {
91-
log.Warningf("invalid new view from %d: view change for %d: %s", src, vcsrc, err)
92-
s.sendViewChange()
93-
return
81+
return nil, err
9482
}
9583
vcs = append(vcs, vc)
9684
}
9785

86+
return vcs, nil
87+
}
88+
89+
func (s *SBFT) handleNewView(nv *NewView, src uint64) {
90+
if src != s.primaryIDView(nv.View) {
91+
log.Warningf("invalid new view from %d for %d", src, nv.View)
92+
return
93+
}
94+
95+
if onv := s.replicaState[s.primaryIDView(nv.View)].newview; onv != nil && onv.View >= nv.View {
96+
log.Debugf("discarding duplicate new view for %d", nv.View)
97+
return
98+
}
99+
100+
vcs, err := s.checkNewViewSignatures(nv)
101+
if err != nil {
102+
log.Warningf("invalid new view from %d: %s", src, err)
103+
s.sendViewChange()
104+
}
105+
98106
xset, ok := s.makeXset(vcs)
99107
if xset.Digest == nil {
100108
// null request special treatment
@@ -120,7 +128,7 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
120128
return
121129
}
122130

123-
_, err := s.checkBatch(nv.Batch)
131+
_, err = s.checkBatch(nv.Batch, true)
124132
if err != nil {
125133
log.Warningf("invalid new view from %d: invalid batch, %s",
126134
src, err)
@@ -155,9 +163,5 @@ func (s *SBFT) processNewView() {
155163
}
156164

157165
s.activeView = true
158-
var h []byte
159-
if nv.Batch != nil {
160-
h = hash(nv.Batch.Header)
161-
}
162-
s.acceptPreprepare(Subject{Seq: &nextSeq, Digest: h}, pp)
166+
s.handleCheckedPreprepare(pp)
163167
}

consensus/simplebft/preprepare.go

+25-19
Original file line numberDiff line numberDiff line change
@@ -54,27 +54,32 @@ func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) {
5454
log.Infof("duplicate preprepare for %v", *pp.Seq)
5555
return
5656
}
57-
var blockhash []byte
58-
if pp.Batch != nil {
59-
blockhash = hash(pp.Batch.Header)
60-
61-
batchheader, err := s.checkBatch(pp.Batch)
62-
if err != nil || batchheader.Seq != pp.Seq.Seq {
63-
log.Infof("preprepare %v batch head inconsistent from %d", pp.Seq, src)
64-
return
65-
}
66-
67-
prevhash := hash(s.sys.LastBatch().Header)
68-
if !bytes.Equal(batchheader.PrevHash, prevhash) {
69-
log.Infof("preprepare batch prev hash does not match expected %s, got %s", hash2str(batchheader.PrevHash), hash2str(prevhash))
70-
return
71-
}
57+
if pp.Batch == nil {
58+
log.Infof("preprepare without batch")
59+
return
60+
}
61+
62+
batchheader, err := s.checkBatch(pp.Batch, true)
63+
if err != nil || batchheader.Seq != pp.Seq.Seq {
64+
log.Infof("preprepare %v batch head inconsistent from %d", pp.Seq, src)
65+
return
66+
}
67+
68+
prevhash := s.sys.LastBatch().Hash()
69+
if !bytes.Equal(batchheader.PrevHash, prevhash) {
70+
log.Infof("preprepare batch prev hash does not match expected %s, got %s", hash2str(batchheader.PrevHash), hash2str(prevhash))
71+
return
7272
}
7373

74-
s.acceptPreprepare(Subject{Seq: &nextSeq, Digest: blockhash}, pp)
74+
s.handleCheckedPreprepare(pp)
7575
}
7676

77-
func (s *SBFT) acceptPreprepare(sub Subject, pp *Preprepare) {
77+
func (s *SBFT) acceptPreprepare(pp *Preprepare) {
78+
sub := Subject{Seq: pp.Seq, Digest: pp.Batch.Hash()}
79+
80+
log.Infof("accepting preprepare for %v, %x", sub.Seq, sub.Digest)
81+
s.sys.Persist("preprepare", pp)
82+
7883
s.cur = reqInfo{
7984
subject: sub,
8085
timeout: s.sys.Timer(time.Duration(s.config.RequestTimeoutNsec)*time.Nanosecond, s.requestTimeout),
@@ -83,9 +88,10 @@ func (s *SBFT) acceptPreprepare(sub Subject, pp *Preprepare) {
8388
commit: make(map[uint64]*Subject),
8489
checkpoint: make(map[uint64]*Checkpoint),
8590
}
91+
}
8692

87-
log.Infof("accepting preprepare for %v, %x", sub.Seq, sub.Digest)
88-
s.sys.Persist("preprepare", pp)
93+
func (s *SBFT) handleCheckedPreprepare(pp *Preprepare) {
94+
s.acceptPreprepare(pp)
8995
s.cancelViewChangeTimer()
9096
if !s.isPrimary() {
9197
s.sendPrepare()

0 commit comments

Comments
 (0)