Skip to content

Commit b043904

Browse files
manish-sethi authored and mastersingh24 committed
[FAB-6779] Allow rebuilding block storage indexes
This CR allows rebuilding of block storage indexes. To rebuild the indexes, the existing index folder needs to be dropped. However, please note that this drops (and rebuilds) the indexes for all the chains because they share the underlying leveldb. Also, enabled the flush/sync of batch writes to leveldb (statedb, block indexes, and historydb). Change-Id: I6a926ab765df4bbb6543d6a3960359d95d60fd68 Signed-off-by: manish <[email protected]>
1 parent 837acd0 commit b043904

File tree

7 files changed

+259
-21
lines changed

7 files changed

+259
-21
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package fsblkstorage
8+
9+
import (
10+
"fmt"
11+
"io/ioutil"
12+
"os"
13+
"strconv"
14+
"strings"
15+
16+
"github.com/davecgh/go-spew/spew"
17+
"github.com/hyperledger/fabric/protos/common"
18+
)
19+
20+
// constructCheckpointInfoFromBlockFiles scans the last blockfile (if any) and construct the checkpoint info
21+
// if the last file contains no block or only a partially written block (potentially because of a crash while writing block to the file),
22+
// this scans the second last file (if any)
23+
func constructCheckpointInfoFromBlockFiles(rootDir string) (*checkpointInfo, error) {
24+
logger.Debugf("Retrieving checkpoint info from block files")
25+
var lastFileNum int
26+
var numBlocksInFile int
27+
var endOffsetLastBlock int64
28+
var lastBlockNumber uint64
29+
30+
var lastBlockBytes []byte
31+
var lastBlock *common.Block
32+
var err error
33+
34+
if lastFileNum, err = retrieveLastFileSuffix(rootDir); err != nil {
35+
return nil, err
36+
}
37+
logger.Debugf("Last file number found = %d", lastFileNum)
38+
39+
if lastFileNum == -1 {
40+
cpInfo := &checkpointInfo{0, 0, true, 0}
41+
logger.Info("No block file found")
42+
return cpInfo, nil
43+
}
44+
45+
fileInfo := getFileInfoOrPanic(rootDir, lastFileNum)
46+
logger.Infof("Last Block file info: FileName=[%s], FileSize=[%d]", fileInfo.Name(), fileInfo.Size())
47+
if lastBlockBytes, endOffsetLastBlock, numBlocksInFile, err = scanForLastCompleteBlock(rootDir, lastFileNum, 0); err != nil {
48+
logger.Errorf("Error while scanning last file [file num=%d]: %s", lastFileNum, err)
49+
return nil, err
50+
}
51+
52+
if numBlocksInFile == 0 && lastFileNum > 0 {
53+
secondLastFileNum := lastFileNum - 1
54+
fileInfo := getFileInfoOrPanic(rootDir, secondLastFileNum)
55+
logger.Infof("Second last Block file info: FileName=[%s], FileSize=[%d]", fileInfo.Name(), fileInfo.Size())
56+
if lastBlockBytes, _, _, err = scanForLastCompleteBlock(rootDir, secondLastFileNum, 0); err != nil {
57+
logger.Errorf("Error while scanning second last file [file num=%d]: %s", secondLastFileNum, err)
58+
return nil, err
59+
}
60+
}
61+
62+
if lastBlockBytes != nil {
63+
if lastBlock, err = deserializeBlock(lastBlockBytes); err != nil {
64+
logger.Errorf("Error deserializing last block: %s. Block bytes length = %d", err, len(lastBlockBytes))
65+
return nil, err
66+
}
67+
lastBlockNumber = lastBlock.Header.Number
68+
}
69+
70+
cpInfo := &checkpointInfo{
71+
lastBlockNumber: lastBlockNumber,
72+
latestFileChunksize: int(endOffsetLastBlock),
73+
latestFileChunkSuffixNum: lastFileNum,
74+
isChainEmpty: lastFileNum == 0 && numBlocksInFile == 0,
75+
}
76+
logger.Debugf("Checkpoint info constructed from file system = %s", spew.Sdump(cpInfo))
77+
return cpInfo, nil
78+
}
79+
80+
func retrieveLastFileSuffix(rootDir string) (int, error) {
81+
logger.Debugf("retrieveLastFileSuffix()")
82+
biggestFileNum := -1
83+
filesInfo, err := ioutil.ReadDir(rootDir)
84+
if err != nil {
85+
return -1, err
86+
}
87+
for _, fileInfo := range filesInfo {
88+
name := fileInfo.Name()
89+
if fileInfo.IsDir() || !isBlockFileName(name) {
90+
logger.Debugf("Skipping File name = %s", name)
91+
continue
92+
}
93+
fileSuffix := strings.TrimPrefix(name, blockfilePrefix)
94+
fileNum, err := strconv.Atoi(fileSuffix)
95+
if err != nil {
96+
return -1, err
97+
}
98+
if fileNum > biggestFileNum {
99+
biggestFileNum = fileNum
100+
}
101+
}
102+
logger.Debugf("retrieveLastFileSuffix() - biggestFileNum = %d", biggestFileNum)
103+
return biggestFileNum, err
104+
}
105+
106+
func isBlockFileName(name string) bool {
107+
return strings.HasPrefix(name, blockfilePrefix)
108+
}
109+
110+
func getFileInfoOrPanic(rootDir string, fileNum int) os.FileInfo {
111+
filePath := deriveBlockfilePath(rootDir, fileNum)
112+
fileInfo, err := os.Lstat(filePath)
113+
if err != nil {
114+
panic(fmt.Errorf("Error in retrieving file info for file num = %d", fileNum))
115+
}
116+
return fileInfo
117+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package fsblkstorage
8+
9+
import (
10+
"os"
11+
"testing"
12+
13+
"github.com/golang/protobuf/proto"
14+
"github.com/hyperledger/fabric/common/ledger/testutil"
15+
"github.com/hyperledger/fabric/common/ledger/util"
16+
)
17+
18+
func TestConstructCheckpointInfoFromBlockFiles(t *testing.T) {
19+
testPath := "/tmp/tests/fabric/common/ledger/blkstorage/fsblkstorage"
20+
ledgerid := "testLedger"
21+
conf := NewConf(testPath, 0)
22+
blkStoreDir := conf.getLedgerBlockDir(ledgerid)
23+
env := newTestEnv(t, conf)
24+
util.CreateDirIfMissing(blkStoreDir)
25+
defer env.Cleanup()
26+
27+
// checkpoint constructed on an empty block folder should return CPInfo with isChainEmpty: true
28+
cpInfo, err := constructCheckpointInfoFromBlockFiles(blkStoreDir)
29+
testutil.AssertNoError(t, err, "")
30+
testutil.AssertEquals(t, cpInfo, &checkpointInfo{isChainEmpty: true, lastBlockNumber: 0, latestFileChunksize: 0, latestFileChunkSuffixNum: 0})
31+
32+
w := newTestBlockfileWrapper(env, ledgerid)
33+
defer w.close()
34+
blockfileMgr := w.blockfileMgr
35+
bg, gb := testutil.NewBlockGenerator(t, ledgerid, false)
36+
37+
// Add a few blocks and verify that cpinfo derived from filesystem should be same as from the blockfile manager
38+
blockfileMgr.addBlock(gb)
39+
for _, blk := range bg.NextTestBlocks(3) {
40+
blockfileMgr.addBlock(blk)
41+
}
42+
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)
43+
44+
// Move the chain to new file and check cpinfo derived from file system
45+
blockfileMgr.moveToNextFile()
46+
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)
47+
48+
// Add a few blocks that would go to new file and verify that cpinfo derived from filesystem should be same as from the blockfile manager
49+
for _, blk := range bg.NextTestBlocks(3) {
50+
blockfileMgr.addBlock(blk)
51+
}
52+
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)
53+
54+
// Write a partial block (to simulate a crash) and verify that cpinfo derived from filesystem should be same as from the blockfile manager
55+
lastTestBlk := bg.NextTestBlocks(1)[0]
56+
blockBytes, _, err := serializeBlock(lastTestBlk)
57+
testutil.AssertNoError(t, err, "")
58+
partialByte := append(proto.EncodeVarint(uint64(len(blockBytes))), blockBytes[len(blockBytes)/2:]...)
59+
blockfileMgr.currentFileWriter.append(partialByte, true)
60+
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)
61+
62+
// Close the block storage, drop the index and restart and verify
63+
cpInfoBeforeClose := blockfileMgr.cpInfo
64+
w.close()
65+
env.provider.Close()
66+
indexFolder := conf.getIndexDir()
67+
testutil.AssertNoError(t, os.RemoveAll(indexFolder), "")
68+
69+
env = newTestEnv(t, conf)
70+
w = newTestBlockfileWrapper(env, ledgerid)
71+
blockfileMgr = w.blockfileMgr
72+
testutil.AssertEquals(t, blockfileMgr.cpInfo, cpInfoBeforeClose)
73+
74+
lastBlkIndexed, err := blockfileMgr.index.getLastBlockIndexed()
75+
testutil.AssertNoError(t, err, "")
76+
testutil.AssertEquals(t, lastBlkIndexed, uint64(6))
77+
78+
// Add the last block again after start and check cpinfo again
79+
testutil.AssertNoError(t, blockfileMgr.addBlock(lastTestBlk), "")
80+
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)
81+
}
82+
83+
func checkCPInfoFromFile(t *testing.T, blkStoreDir string, expectedCPInfo *checkpointInfo) {
84+
cpInfo, err := constructCheckpointInfoFromBlockFiles(blkStoreDir)
85+
testutil.AssertNoError(t, err, "")
86+
testutil.AssertEquals(t, cpInfo, expectedCPInfo)
87+
}

common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go

+41-16
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"sync"
2323
"sync/atomic"
2424

25+
"github.com/davecgh/go-spew/spew"
26+
2527
"github.com/golang/protobuf/proto"
2628
"github.com/hyperledger/fabric/common/flogging"
2729
"github.com/hyperledger/fabric/common/ledger/blkstorage"
@@ -112,16 +114,23 @@ func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
112114
if err != nil {
113115
panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err))
114116
}
115-
if cpInfo == nil { //if no cpInfo stored in db initiate to zero
116-
cpInfo = &checkpointInfo{0, 0, true, 0}
117-
err = mgr.saveCurrentInfo(cpInfo, true)
118-
if err != nil {
119-
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
117+
if cpInfo == nil {
118+
logger.Info(`No info about blocks file found in the db.
119+
This could happen if this is the first time the ledger is constructed or the index is dropped.
120+
Scanning blocks dir for the latest info`)
121+
if cpInfo, err = constructCheckpointInfoFromBlockFiles(rootDir); err != nil {
122+
panic(fmt.Sprintf("Could not build checkpoint info from block files: %s", err))
120123
}
124+
logger.Infof("Info constructed by scanning the blocks dir = %s", spew.Sdump(cpInfo))
125+
} else {
126+
logger.Info(`Synching the info about block files`)
127+
syncCPInfoFromFS(rootDir, cpInfo)
128+
}
129+
err = mgr.saveCurrentInfo(cpInfo, true)
130+
if err != nil {
131+
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
121132
}
122-
//Verify that the checkpoint stored in db is accurate with what is actually stored in block file system
123-
// If not the same, sync the cpInfo and the file system
124-
syncCPInfoFromFS(rootDir, cpInfo)
133+
125134
//Open a writer to the file identified by the number and truncate it to only contain the latest block
126135
// that was completely saved (file system, index, cpinfo, etc)
127136
currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
@@ -193,7 +202,7 @@ func syncCPInfoFromFS(rootDir string, cpInfo *checkpointInfo) {
193202
return
194203
}
195204
//Scan the file system to verify that the checkpoint info stored in db is correct
196-
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(
205+
_, endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(
197206
rootDir, cpInfo.latestFileChunkSuffixNum, int64(cpInfo.latestFileChunksize))
198207
if err != nil {
199208
panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err))
@@ -325,25 +334,36 @@ func (mgr *blockfileMgr) syncIndex() error {
325334
}
326335
indexEmpty = true
327336
}
337+
328338
//initialize index to file number:zero, offset:zero and blockNum:0
329339
startFileNum := 0
330340
startOffset := 0
331-
blockNum := uint64(0)
332341
skipFirstBlock := false
333342
//get the last file that blocks were added to using the checkpoint info
334343
endFileNum := mgr.cpInfo.latestFileChunkSuffixNum
344+
startingBlockNum := uint64(0)
345+
335346
//if the index stored in the db has value, update the index information with those values
336347
if !indexEmpty {
348+
if lastBlockIndexed == mgr.cpInfo.lastBlockNumber {
349+
logger.Infof("Both the block files and indices are in sync.")
350+
return nil
351+
}
352+
logger.Infof("Last block indexed [%d], Last block present in block files=[%d]", lastBlockIndexed, mgr.cpInfo.lastBlockNumber)
337353
var flp *fileLocPointer
338354
if flp, err = mgr.index.getBlockLocByBlockNum(lastBlockIndexed); err != nil {
339355
return err
340356
}
341357
startFileNum = flp.fileSuffixNum
342358
startOffset = flp.locPointer.offset
343-
blockNum = lastBlockIndexed
344359
skipFirstBlock = true
360+
startingBlockNum = lastBlockIndexed + 1
361+
} else {
362+
logger.Infof("No block indexed, Last block present in block files=[%d]", mgr.cpInfo.lastBlockNumber)
345363
}
346364

365+
logger.Infof("Start building index from block [%d]", startingBlockNum)
366+
347367
//open a blockstream to the file location that was stored in the index
348368
var stream *blockStream
349369
if stream, err = newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum); err != nil {
@@ -365,6 +385,7 @@ func (mgr *blockfileMgr) syncIndex() error {
365385
//Should be at the last block already, but go ahead and loop looking for next blockBytes.
366386
//If there is another block, add it to the index.
367387
//This will ensure block indexes are correct, for example if peer had crashed before indexes got updated.
388+
blockIdxInfo := &blockIdxInfo{}
368389
for {
369390
if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
370391
return err
@@ -385,7 +406,6 @@ func (mgr *blockfileMgr) syncIndex() error {
385406
}
386407

387408
//Update the blockIndexInfo with what was actually stored in file system
388-
blockIdxInfo := &blockIdxInfo{}
389409
blockIdxInfo.blockHash = info.blockHeader.Hash()
390410
blockIdxInfo.blockNum = info.blockHeader.Number
391411
blockIdxInfo.flp = &fileLocPointer{fileSuffixNum: blockPlacementInfo.fileNum,
@@ -397,8 +417,11 @@ func (mgr *blockfileMgr) syncIndex() error {
397417
if err = mgr.index.indexBlock(blockIdxInfo); err != nil {
398418
return err
399419
}
400-
blockNum++
420+
if blockIdxInfo.blockNum%10000 == 0 {
421+
logger.Infof("Indexed block number [%d]", blockIdxInfo.blockNum)
422+
}
401423
}
424+
logger.Infof("Finished building index. Last block indexed [%d]", blockIdxInfo.blockNum)
402425
return nil
403426
}
404427

@@ -581,12 +604,13 @@ func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, sync bool) error {
581604

582605
// scanForLastCompleteBlock scan a given block file and detects the last offset in the file
583606
// after which there may lie a block partially written (towards the end of the file in a crash scenario).
584-
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) (int64, int, error) {
607+
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) ([]byte, int64, int, error) {
585608
//scan the passed file number suffix starting from the passed offset to find the last completed block
586609
numBlocks := 0
610+
var lastBlockBytes []byte
587611
blockStream, errOpen := newBlockfileStream(rootDir, fileNum, startingOffset)
588612
if errOpen != nil {
589-
return 0, 0, errOpen
613+
return nil, 0, 0, errOpen
590614
}
591615
defer blockStream.close()
592616
var errRead error
@@ -596,6 +620,7 @@ func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64)
596620
if blockBytes == nil || errRead != nil {
597621
break
598622
}
623+
lastBlockBytes = blockBytes
599624
numBlocks++
600625
}
601626
if errRead == ErrUnexpectedEndOfBlockfile {
@@ -605,7 +630,7 @@ func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64)
605630
errRead = nil
606631
}
607632
logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentOffset)
608-
return blockStream.currentOffset, numBlocks, errRead
633+
return lastBlockBytes, blockStream.currentOffset, numBlocks, errRead
609634
}
610635

611636
// checkpointInfo

common/ledger/blkstorage/fsblkstorage/blockfile_scan_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,13 @@ func TestBlockFileScanSmallTxOnly(t *testing.T) {
4343
_, fileSize, err := util.FileExists(filePath)
4444
testutil.AssertNoError(t, err, "")
4545

46-
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
46+
lastBlockBytes, endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
4747
testutil.AssertNoError(t, err, "")
4848
testutil.AssertEquals(t, numBlocks, len(blocks))
4949
testutil.AssertEquals(t, endOffsetLastBlock, fileSize)
50+
51+
expectedLastBlockBytes, _, err := serializeBlock(blocks[len(blocks)-1])
52+
testutil.AssertEquals(t, lastBlockBytes, expectedLastBlockBytes)
5053
}
5154

5255
func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
@@ -72,7 +75,10 @@ func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
7275
err = file.Truncate(fileSize - 1)
7376
testutil.AssertNoError(t, err, "")
7477

75-
_, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
78+
lastBlockBytes, _, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
7679
testutil.AssertNoError(t, err, "")
7780
testutil.AssertEquals(t, numBlocks, len(blocks)-1)
81+
82+
expectedLastBlockBytes, _, err := serializeBlock(blocks[len(blocks)-2])
83+
testutil.AssertEquals(t, lastBlockBytes, expectedLastBlockBytes)
7884
}

common/ledger/blkstorage/fsblkstorage/blockindex.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
156156
}
157157

158158
batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
159-
if err := index.db.WriteBatch(batch, false); err != nil {
159+
// Setting sync to true as a precaution; false may be an OK optimization after further testing.
160+
if err := index.db.WriteBatch(batch, true); err != nil {
160161
return err
161162
}
162163
return nil

0 commit comments

Comments
 (0)