@@ -85,48 +85,140 @@ func NewKVLedger(versionedDBProvider statedb.VersionedDBProvider, ledgerID strin
85
85
86
86
l := & KVLedger {ledgerID , blockStore , txmgmt , historymgmt }
87
87
88
- if err := recoverStateDB (l ); err != nil {
88
+ //Recover both state DB and history DB if they are out of sync with block storage
89
+ if err := recoverDB (l ); err != nil {
89
90
panic (fmt .Errorf (`Error during state DB recovery:%s` , err ))
90
91
}
91
92
92
93
return l , nil
93
94
}
94
95
95
- //Recover the state database by recommitting last valid blocks
96
- func recoverStateDB (l * KVLedger ) error {
96
+ //Recover the state database and history database (if exist)
97
+ //by recommitting last valid blocks
98
+ func recoverDB (l * KVLedger ) error {
97
99
//If there is no block in blockstorage, nothing to recover.
98
100
info , _ := l .blockStore .GetBlockchainInfo ()
99
101
if info .Height == 0 {
102
+ logger .Debugf ("Block storage is empty." )
100
103
return nil
101
104
}
102
105
103
- //Getting savepointValue stored in the state DB
104
106
var err error
105
- var savepointValue uint64
106
- if savepointValue , err = l .txtmgmt .GetBlockNumFromSavepoint (); err != nil {
107
+ var stateDBSavepoint , historyDBSavepoint uint64
108
+ //Default value for bool is false
109
+ var recoverStateDB , recoverHistoryDB bool
110
+
111
+ //Getting savepointValue stored in the state DB
112
+ if stateDBSavepoint , err = l .txtmgmt .GetBlockNumFromSavepoint (); err != nil {
107
113
return err
108
114
}
109
115
110
- //Checking whether the savepointValue is in sync with block storage height
111
- if savepointValue == info .Height {
116
+ //Check whether the state DB is in sync with block storage
117
+ if recoverStateDB , err = isRecoveryNeeded (stateDBSavepoint , info .Height ); err != nil {
118
+ return err
119
+ }
120
+
121
+ if ledgerconfig .IsHistoryDBEnabled () == true {
122
+ //Getting savepointValue stored in the history DB
123
+ if historyDBSavepoint , err = l .historymgmt .GetBlockNumFromSavepoint (); err != nil {
124
+ return err
125
+ }
126
+ //Check whether the history DB is in sync with block storage
127
+ if recoverHistoryDB , err = isRecoveryNeeded (historyDBSavepoint , info .Height ); err != nil {
128
+ return err
129
+ }
130
+ }
131
+
132
+ if recoverHistoryDB == false && recoverStateDB == false {
133
+ //If nothing needs recovery, return
134
+ if ledgerconfig .IsHistoryDBEnabled () == true {
135
+ logger .Debugf ("Both state database and history database are in sync with the block storage. No need to perform recovery operation." )
136
+ } else {
137
+ logger .Debugf ("State database is in sync with the block storage." )
138
+ }
112
139
return nil
113
- } else if savepointValue > info .Height {
114
- return errors .New ("BlockStorage height is behind savepoint by %d blocks. Recovery the BlockStore first" )
140
+ } else if recoverHistoryDB == false && recoverStateDB == true {
141
+ logger .Debugf ("State database is behind block storage by %d blocks. Recovering state database." , info .Height - stateDBSavepoint )
142
+ if err = recommitLostBlocks (l , stateDBSavepoint , info .Height , true , false ); err != nil {
143
+ return err
144
+ }
145
+ } else if recoverHistoryDB == true && recoverStateDB == false {
146
+ logger .Debugf ("History database is behind block storage by %d blocks. Recovering history database." , info .Height - historyDBSavepoint )
147
+ if err = recommitLostBlocks (l , historyDBSavepoint , info .Height , false , true ); err != nil {
148
+ return err
149
+ }
150
+ } else if recoverHistoryDB == true && recoverStateDB == true {
151
+ logger .Debugf ("State database is behind block storage by %d blocks, and history database is behind block storage by %d blocks. Recovering both state and history database." , info .Height - stateDBSavepoint , info .Height - historyDBSavepoint )
152
+ //If both state DB and history DB need to be recovered, first
153
+ //we need to ensure that the state DB and history DB are in same state
154
+ //before recommitting lost blocks.
155
+ if stateDBSavepoint > historyDBSavepoint {
156
+ logger .Debugf ("History database is behind the state database by %d blocks" , stateDBSavepoint - historyDBSavepoint )
157
+ logger .Debugf ("Making the history DB in sync with state DB" )
158
+ if err = recommitLostBlocks (l , historyDBSavepoint , stateDBSavepoint , false , true ); err != nil {
159
+ return err
160
+ }
161
+ logger .Debugf ("Making both history DB and state DB in sync with the block storage" )
162
+ if err = recommitLostBlocks (l , stateDBSavepoint , info .Height , true , true ); err != nil {
163
+ return err
164
+ }
165
+ } else if stateDBSavepoint < historyDBSavepoint {
166
+ logger .Debugf ("State database is behind the history database by %d blocks" , historyDBSavepoint - stateDBSavepoint )
167
+ logger .Debugf ("Making the state DB in sync with history DB" )
168
+ if err = recommitLostBlocks (l , stateDBSavepoint , historyDBSavepoint , true , false ); err != nil {
169
+ return err
170
+ }
171
+ logger .Debugf ("Making both state DB and history DB in sync with the block storage" )
172
+ if err = recommitLostBlocks (l , historyDBSavepoint , info .Height , true , true ); err != nil {
173
+ return err
174
+ }
175
+ } else {
176
+ logger .Debugf ("State and history database are in same state but behind block storage" )
177
+ logger .Debugf ("Making both state DB and history DB in sync with the block storage" )
178
+ if err = recommitLostBlocks (l , stateDBSavepoint , info .Height , true , true ); err != nil {
179
+ return err
180
+ }
181
+ }
115
182
}
183
+ return nil
184
+ }
116
185
186
+ //isRecoveryNeeded compares savepoint and current block height to decide whether
187
+ //to initiate recovery process
188
+ func isRecoveryNeeded (savepoint uint64 , blockHeight uint64 ) (bool , error ) {
189
+ if savepoint > blockHeight {
190
+ return false , errors .New ("BlockStorage height is behind savepoint by %d blocks. Recovery the BlockStore first" )
191
+ } else if savepoint == blockHeight {
192
+ return false , nil
193
+ } else {
194
+ return true , nil
195
+ }
196
+ }
197
+
198
+ //recommitLostBlocks retrieves blocks in specified range and commit the write set to either
199
+ //state DB or history DB or both
200
+ func recommitLostBlocks (l * KVLedger , savepoint uint64 , blockHeight uint64 , recoverStateDB bool , recoverHistoryDB bool ) error {
117
201
//Compute updateSet for each missing savepoint and commit to state DB
118
- for blockNumber := savepointValue + 1 ; blockNumber <= info .Height ; blockNumber ++ {
119
- var block * common.Block
202
+ var err error
203
+ var block * common.Block
204
+ for blockNumber := savepoint + 1 ; blockNumber <= blockHeight ; blockNumber ++ {
120
205
if block , err = l .GetBlockByNumber (blockNumber ); err != nil {
121
206
return err
122
207
}
123
- logger .Debugf ("Constructing updateSet for the block %d" , blockNumber )
124
- if err = l .txtmgmt .ValidateAndPrepare (block , false ); err != nil {
125
- return err
208
+ if recoverStateDB == true {
209
+ logger .Debugf ("Constructing updateSet for the block %d" , blockNumber )
210
+ if err = l .txtmgmt .ValidateAndPrepare (block , false ); err != nil {
211
+ return err
212
+ }
213
+ logger .Debugf ("Committing block %d to state database" , blockNumber )
214
+ if err = l .txtmgmt .Commit (); err != nil {
215
+ return err
216
+ }
126
217
}
127
- logger .Debugf ("Committing block %d to state database" , blockNumber )
128
- if err = l .txtmgmt .Commit (); err != nil {
129
- return err
218
+ if ledgerconfig .IsHistoryDBEnabled () == true && recoverHistoryDB == true {
219
+ if err = l .historymgmt .Commit (block ); err != nil {
220
+ return err
221
+ }
130
222
}
131
223
}
132
224
0 commit comments