Skip to content

Commit 7b2e488

Browse files
author
Jason Yellick
committed
Validate state at startup
This changeset validates state at startup, switching the assumption from state being valid to state being invalid. Further, once state transfer completes, the replica will now broadcast a checkpoint for the state it transferred to. https://jira.hyperledger.org/browse/FAB-379 Change-Id: I491451f4829f14c9167ceed5d29befe3a3b08521 Signed-off-by: Jason Yellick <[email protected]>
1 parent 9bf95d0 commit 7b2e488

File tree

9 files changed

+212
-22
lines changed

9 files changed

+212
-22
lines changed

consensus/executor/executor.go

+1
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ func (co *coordinatorImpl) ProcessEvent(event events.Event) events.Event {
131131
for {
132132
err, recoverable := co.stc.SyncToTarget(info.Height-1, info.CurrentBlockHash, et.peers)
133133
if err == nil {
134+
logger.Debug("State transfer sync completed, returning")
134135
co.skipInProgress = false
135136
co.consumer.StateUpdated(et.tag, info)
136137
return nil

consensus/helper/helper.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,16 @@ func NewHelper(mhc peer.MessageHandlerCoordinator) *Helper {
5353
coordinator: mhc,
5454
secOn: viper.GetBool("security.enabled"),
5555
secHelper: mhc.GetSecHelper(),
56-
valid: true, // Assume our state is consistent until we are told otherwise, TODO: revisit
56+
valid: true, // Assume our state is consistent until we are told otherwise, actual consensus (pbft) will invalidate this immediately, but noops will not
5757
}
5858

5959
h.executor = executor.NewImpl(h, h, mhc)
60-
h.executor.Start()
6160
return h
6261
}
6362

6463
func (h *Helper) setConsenter(c consensus.Consenter) {
6564
h.consenter = c
65+
h.executor.Start() // The consenter may be expecting a callback from the executor because of state transfer completing, it will miss this if we start the executor too early
6666
}
6767

6868
// GetNetworkInfo returns the PeerEndpoints of the current validator and the entire validating network

consensus/pbft/batch.go

+9
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,17 @@ func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatc
8383
etf := events.NewTimerFactoryImpl(op.manager)
8484
op.pbft = newPbftCore(id, config, op, etf)
8585
op.manager.Start()
86+
blockchainInfoBlob := stack.GetBlockchainInfoBlob()
8687
op.externalEventReceiver.manager = op.manager
8788
op.broadcaster = newBroadcaster(id, op.pbft.N, op.pbft.f, op.pbft.broadcastTimeout, stack)
89+
op.manager.Queue() <- workEvent(func() {
90+
op.pbft.stateTransfer(&stateUpdateTarget{
91+
checkpointMessage: checkpointMessage{
92+
seqNo: op.pbft.lastExec,
93+
id: blockchainInfoBlob,
94+
},
95+
})
96+
})
8897

8998
op.batchSize = config.GetInt("general.batchsize")
9099
op.batchStore = nil

consensus/pbft/batch_test.go

+47-14
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/hyperledger/fabric/consensus/util/events"
2525
pb "github.com/hyperledger/fabric/protos"
2626

27+
"github.com/golang/protobuf/proto"
2728
"github.com/spf13/viper"
2829
)
2930

@@ -76,8 +77,31 @@ func TestNetworkBatch(t *testing.T) {
7677
}
7778
}
7879

79-
func TestClearOustandingReqsOnStateRecovery(t *testing.T) {
80-
b := newObcBatch(0, loadConfig(), &omniProto{})
80+
var inertState = &omniProto{
81+
GetBlockchainInfoImpl: func() *pb.BlockchainInfo {
82+
return &pb.BlockchainInfo{
83+
CurrentBlockHash: []byte("GENESIS"),
84+
Height: 1,
85+
}
86+
},
87+
GetBlockchainInfoBlobImpl: func() []byte {
88+
b, _ := proto.Marshal(&pb.BlockchainInfo{
89+
CurrentBlockHash: []byte("GENESIS"),
90+
Height: 1,
91+
})
92+
return b
93+
},
94+
InvalidateStateImpl: func() {},
95+
ValidateStateImpl: func() {},
96+
UpdateStateImpl: func(id interface{}, target *pb.BlockchainInfo, peers []*pb.PeerID) {},
97+
}
98+
99+
func TestClearOutstandingReqsOnStateRecovery(t *testing.T) {
100+
omni := *inertState
101+
omni.UnicastImpl = func(msg *pb.Message, receiverHandle *pb.PeerID) error { return nil }
102+
b := newObcBatch(0, loadConfig(), &omni)
103+
b.StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
104+
81105
defer b.Close()
82106

83107
b.reqStore.storeOutstanding(&Request{})
@@ -98,10 +122,9 @@ func TestClearOustandingReqsOnStateRecovery(t *testing.T) {
98122
func TestOutstandingReqsIngestion(t *testing.T) {
99123
bs := [3]*obcBatch{}
100124
for i := range bs {
101-
omni := &omniProto{
102-
UnicastImpl: func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil },
103-
}
104-
bs[i] = newObcBatch(uint64(i), loadConfig(), omni)
125+
omni := *inertState
126+
omni.UnicastImpl = func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil }
127+
bs[i] = newObcBatch(uint64(i), loadConfig(), &omni)
105128
defer bs[i].Close()
106129

107130
// Have vp1 only deliver messages
@@ -115,6 +138,9 @@ func TestOutstandingReqsIngestion(t *testing.T) {
115138
}
116139
}
117140
}
141+
for i := range bs {
142+
bs[i].StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
143+
}
118144

119145
err := bs[1].RecvMsg(createTxMsg(1), &pb.PeerID{Name: "vp1"})
120146
if err != nil {
@@ -137,10 +163,10 @@ func TestOutstandingReqsIngestion(t *testing.T) {
137163
}
138164

139165
func TestOutstandingReqsResubmission(t *testing.T) {
140-
omni := &omniProto{}
141166
config := loadConfig()
142167
config.Set("general.batchsize", 2)
143-
b := newObcBatch(0, config, omni)
168+
omni := *inertState
169+
b := newObcBatch(0, config, &omni)
144170
defer b.Close() // The broadcasting threads only cause problems here... but this test stalls without them
145171

146172
transactionsBroadcast := 0
@@ -160,6 +186,9 @@ func TestOutstandingReqsResubmission(t *testing.T) {
160186
return nil
161187
}
162188

189+
b.StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
190+
b.manager.Queue() <- nil // Make sure the state update finishes first
191+
163192
reqs := make([]*Request, 8)
164193
for i := 0; i < len(reqs); i++ {
165194
reqs[i] = createPbftReq(int64(i), 0)
@@ -232,11 +261,12 @@ func TestOutstandingReqsResubmission(t *testing.T) {
232261
}
233262

234263
func TestViewChangeOnPrimarySilence(t *testing.T) {
235-
b := newObcBatch(1, loadConfig(), &omniProto{
236-
UnicastImpl: func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil },
237-
SignImpl: func(msg []byte) ([]byte, error) { return msg, nil },
238-
VerifyImpl: func(peerID *pb.PeerID, signature []byte, message []byte) error { return nil },
239-
})
264+
omni := *inertState
265+
omni.UnicastImpl = func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil } // For the checkpoint
266+
omni.SignImpl = func(msg []byte) ([]byte, error) { return msg, nil }
267+
omni.VerifyImpl = func(peerID *pb.PeerID, signature []byte, message []byte) error { return nil }
268+
b := newObcBatch(1, loadConfig(), &omni)
269+
b.StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
240270
b.pbft.requestTimeout = 50 * time.Millisecond
241271
defer b.Close()
242272

@@ -347,7 +377,10 @@ func TestClassicBackToBackStateTransfer(t *testing.T) {
347377
}
348378

349379
func TestClearBatchStoreOnViewChange(t *testing.T) {
350-
b := newObcBatch(1, loadConfig(), &omniProto{})
380+
omni := *inertState
381+
omni.UnicastImpl = func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil } // For the checkpoint
382+
b := newObcBatch(1, loadConfig(), &omni)
383+
b.StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
351384
defer b.Close()
352385

353386
b.batchStore = []*Request{&Request{}}

consensus/pbft/mock_ledger_test.go

+22-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package pbft
1818

1919
import (
20+
"bytes"
2021
"fmt"
2122
"reflect"
2223
"sync"
@@ -297,6 +298,27 @@ func (mock *MockLedger) GetBlockHeadMetadata() ([]byte, error) {
297298
}
298299

299300
func (mock *MockLedger) simulateStateTransfer(info *protos.BlockchainInfo, peers []*protos.PeerID) {
301+
if mock.blockHeight >= info.Height {
302+
blockCursor := info.Height - 1
303+
validHash := info.CurrentBlockHash
304+
for {
305+
block, ok := mock.blocks[blockCursor]
306+
if !ok {
307+
break
308+
}
309+
hash, _ := mock.HashBlock(block)
310+
if !bytes.Equal(hash, validHash) {
311+
break
312+
}
313+
blockCursor--
314+
validHash = block.PreviousBlockHash
315+
if blockCursor == ^uint64(0) {
316+
return
317+
}
318+
}
319+
panic(fmt.Sprintf("Asked to skip to a block (%d) which is lower than our current height of %d. (Corrupt block at %d with hash %x)", info.Height, mock.blockHeight, blockCursor, validHash))
320+
}
321+
300322
var remoteLedger consensus.ReadOnlyLedger
301323
if len(peers) > 0 {
302324
var ok bool
@@ -309,9 +331,6 @@ func (mock *MockLedger) simulateStateTransfer(info *protos.BlockchainInfo, peers
309331
}
310332
fmt.Printf("TEST LEDGER skipping to %+v", info)
311333
p := 0
312-
if mock.blockHeight >= info.Height {
313-
panic(fmt.Sprintf("Asked to skip to a block (%d) which is lower than our current height of %d", info.Height, mock.blockHeight))
314-
}
315334
for n := mock.blockHeight; n < info.Height; n++ {
316335
block, err := remoteLedger.GetBlock(n)
317336

consensus/pbft/pbft-core.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -376,11 +376,11 @@ func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
376376
return nil
377377
}
378378
logger.Infof("Replica %d application caught up via state transfer, lastExec now %d", instance.id, update.seqNo)
379-
// XXX create checkpoint
380379
instance.lastExec = update.seqNo
381380
instance.moveWatermarks(instance.lastExec) // The watermark movement handles moving this to a checkpoint boundary
382381
instance.skipInProgress = false
383382
instance.consumer.validateState()
383+
instance.Checkpoint(update.seqNo, update.id)
384384
instance.executeOutstanding()
385385
case execDoneEvent:
386386
instance.execDoneSync()

consensus/pbft/pbft-core_test.go

+25
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/op/go-logging"
3232

3333
"github.com/hyperledger/fabric/consensus/util/events"
34+
pb "github.com/hyperledger/fabric/protos"
3435
)
3536

3637
func init() {
@@ -1703,6 +1704,30 @@ func TestViewChangeDuringExecution(t *testing.T) {
17031704
}
17041705
}
17051706

1707+
func TestStateTransferCheckpoint(t *testing.T) {
1708+
broadcasts := 0
1709+
instance := newPbftCore(3, loadConfig(), &omniProto{
1710+
broadcastImpl: func(msg []byte) {
1711+
broadcasts++
1712+
},
1713+
validateStateImpl: func() {},
1714+
}, &inertTimerFactory{})
1715+
1716+
id := []byte("My ID")
1717+
events.SendEvent(instance, stateUpdatedEvent{
1718+
chkpt: &checkpointMessage{
1719+
seqNo: 10,
1720+
id: id,
1721+
},
1722+
target: &pb.BlockchainInfo{},
1723+
})
1724+
1725+
if broadcasts != 1 {
1726+
t.Fatalf("Should have broadcast a checkpoint after the state transfer finished")
1727+
}
1728+
1729+
}
1730+
17061731
func TestStateTransferredToOldPoint(t *testing.T) {
17071732
skipped := false
17081733
instance := newPbftCore(3, loadConfig(), &omniProto{

core/peer/statetransfer/statetransfer.go

+36-2
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ type coordinatorImpl struct {
9797
// If the peerIDs are nil, then all peers are assumed to have the given block.
9898
// If the call returns an error, a boolean is included which indicates if the error may be transient and the caller should retry
9999
func (sts *coordinatorImpl) SyncToTarget(blockNumber uint64, blockHash []byte, peerIDs []*pb.PeerID) (error, bool) {
100-
logger.Debugf("Syncing to target %x for block number %d with peers %v", blockHash, blockNumber, peerIDs)
100+
logger.Infof("Syncing to target %x for block number %d with peers %v", blockHash, blockNumber, peerIDs)
101101

102102
if !sts.inProgress {
103103
sts.currentStateBlockNumber = sts.stack.GetBlockchainSize() - 1 // The block height is one more than the latest block number
@@ -422,8 +422,42 @@ func (sts *coordinatorImpl) syncBlockchainToTarget(blockSyncReq *blockSyncReq) {
422422
panic("Our blockchain is already higher than a sync target, this is unlikely, but unimplemented")
423423
}
424424
} else {
425+
blockCursor := blockSyncReq.blockNumber
426+
validHash := blockSyncReq.firstBlockHash
425427

426-
_, _, err := sts.syncBlocks(blockSyncReq.blockNumber, blockSyncReq.reportOnBlock, blockSyncReq.firstBlockHash, blockSyncReq.peerIDs)
428+
// Don't bother fetching blocks which are already here and valid
429+
// This is especially useful at startup
430+
for {
431+
block, err := sts.stack.GetBlockByNumber(blockCursor)
432+
if err != nil || block == nil {
433+
// Need to fetch this block
434+
break
435+
}
436+
bh, err := sts.stack.HashBlock(block)
437+
if err != nil {
438+
// Something wrong with this block
439+
break
440+
}
441+
if !bytes.Equal(bh, validHash) {
442+
// Block is corrupt
443+
break
444+
}
445+
blockCursor--
446+
validHash = block.PreviousBlockHash
447+
if blockCursor+1 == blockSyncReq.reportOnBlock {
448+
break
449+
}
450+
}
451+
452+
if blockCursor+1 <= blockSyncReq.blockNumber {
453+
logger.Debugf("Skipped remote syncing of block %d through %d because they were already present and valid", blockSyncReq.blockNumber, blockCursor+1)
454+
}
455+
456+
var err error
457+
// Note, this must accommodate blockCursor underflowing
458+
if blockCursor+1 > blockSyncReq.reportOnBlock {
459+
_, _, err = sts.syncBlocks(blockCursor, blockSyncReq.reportOnBlock, validHash, blockSyncReq.peerIDs)
460+
}
427461

428462
if nil != blockSyncReq.replyChan {
429463
logger.Debugf("Replying to blockSyncReq on reply channel with : %s", err)

core/peer/statetransfer/statetransfer_test.go

+69
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,75 @@ func makeSimpleFilter(failureTrigger mockRequest, failureType mockResponse) (fun
164164

165165
}
166166

167+
func TestStartupValidStateGenesis(t *testing.T) {
168+
mrls := createRemoteLedgers(2, 1) // No remote targets available
169+
170+
// Test from blockheight of 1, with valid genesis block
171+
ml := NewMockLedger(mrls, nil, t)
172+
ml.PutBlock(0, SimpleGetBlock(0))
173+
174+
sts := newTestStateTransfer(ml, mrls)
175+
defer sts.Stop()
176+
if err := executeStateTransfer(sts, ml, 0, 0, mrls); nil != err {
177+
t.Fatalf("Startup failure: %s", err)
178+
}
179+
180+
}
181+
182+
func TestStartupValidStateExisting(t *testing.T) {
183+
mrls := createRemoteLedgers(2, 1) // No remote targets available
184+
185+
// Test from blockheight of 50, with a valid existing chain and valid state
186+
ml := NewMockLedger(mrls, nil, t)
187+
height := uint64(50)
188+
for i := uint64(0); i < height; i++ {
189+
ml.PutBlock(i, SimpleGetBlock(i))
190+
}
191+
ml.state = SimpleGetState(height - 1)
192+
193+
sts := newTestStateTransfer(ml, mrls)
194+
defer sts.Stop()
195+
if err := executeStateTransfer(sts, ml, height-1, height-1, mrls); nil != err {
196+
t.Fatalf("Startup failure: %s", err)
197+
}
198+
199+
}
200+
201+
func TestStartupInvalidStateGenesis(t *testing.T) {
202+
mrls := createRemoteLedgers(1, 3)
203+
204+
// Test from blockheight of 1, with valid genesis block
205+
ml := NewMockLedger(mrls, nil, t)
206+
ml.PutBlock(0, SimpleGetBlock(0))
207+
ml.state = ^ml.state // Ensure the state is wrong
208+
209+
sts := newTestStateTransfer(ml, mrls)
210+
defer sts.Stop()
211+
if err := executeStateTransfer(sts, ml, 0, 0, mrls); nil != err {
212+
t.Fatalf("Startup failure: %s", err)
213+
}
214+
215+
}
216+
217+
func TestStartupInvalidStateExisting(t *testing.T) {
218+
mrls := createRemoteLedgers(1, 3)
219+
220+
// Test from blockheight of 50, with a valid existing chain but invalid state
221+
ml := NewMockLedger(mrls, nil, t)
222+
height := uint64(50)
223+
for i := uint64(0); i < height; i++ {
224+
ml.PutBlock(i, SimpleGetBlock(i))
225+
}
226+
ml.state = ^SimpleGetState(height - 1) // Ensure the state is wrong
227+
228+
sts := newTestStateTransfer(ml, mrls)
229+
defer sts.Stop()
230+
if err := executeStateTransfer(sts, ml, height-1, height-1, mrls); nil != err {
231+
t.Fatalf("Startup failure: %s", err)
232+
}
233+
234+
}
235+
167236
func TestCatchupSimple(t *testing.T) {
168237
mrls := createRemoteLedgers(1, 3)
169238

0 commit comments

Comments
 (0)