Skip to content

Commit 8c33fe0

Browse files
committed
Store hashes in rwset for phantom reads issue
https://jira.hyperledger.org/browse/FAB-1935 This CR enables storing of hashes instead of the full read-set for a range query as the range query results can be significant large in some cases. More detail is captured in the Jira item. I tried to contain this CR to some limit but seems unavoidable because, in addition to new functionality, it requires a number of tests and detailed comments. I have tried to put the new code in new files and minimize the changes to the existing files so as to make the review relatively easy. Change-Id: I1c89272be73cde397279d965e4c78020ea2cd696 Signed-off-by: manish <[email protected]>
1 parent 4e051ed commit 8c33fe0

14 files changed

+910
-138
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package rwset
18+
19+
import (
20+
"fmt"
21+
22+
"github.com/golang/protobuf/proto"
23+
"github.com/hyperledger/fabric/bccsp"
24+
bccspfactory "github.com/hyperledger/fabric/bccsp/factory"
25+
)
26+
27+
const (
28+
leafLevel = MerkleTreeLevel(1)
29+
)
30+
31+
var (
32+
hashOpts = &bccsp.SHA256Opts{}
33+
)
34+
35+
// RangeQueryResultsHelper helps preparing range query results for phantom items detection during validation.
36+
// The results are expected to be fed as they are being iterated over.
37+
// If the `hashingEnabled` is set to true, a merkle tree is built of the hashes over the results.
38+
// The merkle tree helps reducing the size of the RWSet which otherwise would need to store all the raw KVReads
39+
//
40+
// The mental model of the tree can be described as below:
41+
// All the results are treated as leaf nodes (level 0) of the tree. Next up level of the tree is built by collecting 'maxDegree + 1'
42+
// items from the previous level and hashing the entire collection.
43+
// Further upper levels of the tree are built in similar manner however the only difference is that unlike level-0
44+
// (where collection consists of raw KVReads), collection at level 1 and above, consists of the hashes
45+
// (of the collection of previous level).
46+
// This is repeated until we reach at a level where we are left with the number of items less than or equals to `maxDegree`.
47+
// In the last collection, the number of items can be less than 'maxDegree' (except if this is the only collection at the given level).
48+
//
49+
// As a result, if the number of total input results are less than or equals to 'maxDegree', no hashing is performed at all.
50+
// And the final output of the computation is either the collection of raw results (if less that or equals to 'maxDegree') or
51+
// a collection of hashes (that or equals to 'maxDegree') at some level in the tree.
52+
//
53+
// `AddResult` function should be invoke to supply the next result and at the end `Done` function should be invoked.
54+
// The `Done` function does the final processing and returns the final output
55+
type RangeQueryResultsHelper struct {
56+
pendingResults []*KVRead
57+
hashingEnabled bool
58+
maxDegree int
59+
mt *merkleTree
60+
}
61+
62+
// NewRangeQueryResultsHelper constructs a RangeQueryResultsHelper
63+
func NewRangeQueryResultsHelper(enableHashing bool, maxDegree int) (*RangeQueryResultsHelper, error) {
64+
helper := &RangeQueryResultsHelper{nil, enableHashing, maxDegree, nil}
65+
if enableHashing {
66+
var err error
67+
if helper.mt, err = newMerkleTree(maxDegree); err != nil {
68+
return nil, err
69+
}
70+
}
71+
return helper, nil
72+
}
73+
74+
// AddResult adds a new query result for processing.
75+
// Put the result into the list of pending results. If the number of pending results exceeds `maxDegree`,
76+
// consume the results for incrementally update the merkle tree
77+
func (helper *RangeQueryResultsHelper) AddResult(kvRead *KVRead) error {
78+
logger.Debug("Adding a result")
79+
helper.pendingResults = append(helper.pendingResults, kvRead)
80+
if helper.hashingEnabled && len(helper.pendingResults) > helper.maxDegree {
81+
logger.Debug("Processing the accumulated results")
82+
if err := helper.processPendingResults(); err != nil {
83+
return err
84+
}
85+
}
86+
return nil
87+
}
88+
89+
// Done processes any pending results if needed
90+
// This returns the final pending results (i.e., []*KVRead) and hashes of the results (i.e., *MerkleSummary)
91+
// Only one of these two will be non-nil (except when no results are ever added).
92+
// `MerkleSummary` will be nil if and only if either `enableHashing` is set to false
93+
// or the number of total results are less than `maxDegree`
94+
func (helper *RangeQueryResultsHelper) Done() ([]*KVRead, *MerkleSummary, error) {
95+
// The merkle tree will be empty if total results are less than or equals to 'maxDegree'
96+
// i.e., not even once the results were processed for hashing
97+
if !helper.hashingEnabled || helper.mt.isEmpty() {
98+
return helper.pendingResults, nil, nil
99+
}
100+
if len(helper.pendingResults) != 0 {
101+
logger.Debug("Processing the pending results")
102+
if err := helper.processPendingResults(); err != nil {
103+
return helper.pendingResults, nil, err
104+
}
105+
}
106+
helper.mt.done()
107+
return helper.pendingResults, helper.mt.getSummery(), nil
108+
}
109+
110+
// GetMerkleSummary return the current state of the MerkleSummary
111+
// This intermediate state of the merkle tree helps during validation to detect a mismatch early on.
112+
// That helps by not requiring to build the complete merkle tree during validation
113+
// if there is a mismatch in early portion of the result-set.
114+
func (helper *RangeQueryResultsHelper) GetMerkleSummary() *MerkleSummary {
115+
if !helper.hashingEnabled {
116+
return nil
117+
}
118+
return helper.mt.getSummery()
119+
}
120+
121+
func (helper *RangeQueryResultsHelper) processPendingResults() error {
122+
var b []byte
123+
var err error
124+
if b, err = serializeKVReads(helper.pendingResults); err != nil {
125+
return err
126+
}
127+
helper.pendingResults = nil
128+
hash, err := computeHash(b)
129+
if err != nil {
130+
return err
131+
}
132+
helper.mt.update(hash)
133+
return nil
134+
}
135+
136+
func serializeKVReads(kvReads []*KVRead) ([]byte, error) {
137+
buf := proto.NewBuffer(nil)
138+
if err := buf.EncodeVarint(uint64(len(kvReads))); err != nil {
139+
return nil, err
140+
}
141+
for i := 0; i < len(kvReads); i++ {
142+
if err := kvReads[i].Marshal(buf); err != nil {
143+
return nil, err
144+
}
145+
}
146+
return buf.Bytes(), nil
147+
}
148+
149+
//////////// Merkle tree building code ///////
150+
151+
type merkleTree struct {
152+
maxDegree int
153+
tree map[MerkleTreeLevel][]Hash
154+
maxLevel MerkleTreeLevel
155+
}
156+
157+
func newMerkleTree(maxDegree int) (*merkleTree, error) {
158+
if maxDegree < 2 {
159+
return nil, fmt.Errorf("maxDegree [is %d] should not be less than 2 in the merkle tree", maxDegree)
160+
}
161+
return &merkleTree{maxDegree, make(map[MerkleTreeLevel][]Hash), 1}, nil
162+
}
163+
164+
// update takes a hash that forms the next leaf level (level-1) node in the merkle tree.
165+
// Also, complete the merkle tree as much as possible with the addition of this new leaf node -
166+
// i.e. recursively build the higher level nodes and delete the underlying sub-tree.
167+
func (m *merkleTree) update(nextLeafLevelHash Hash) error {
168+
logger.Debugf("Before update() = %s", m)
169+
defer logger.Debugf("After update() = %s", m)
170+
m.tree[leafLevel] = append(m.tree[leafLevel], nextLeafLevelHash)
171+
currentLevel := leafLevel
172+
for {
173+
currentLevelHashes := m.tree[currentLevel]
174+
if len(currentLevelHashes) <= m.maxDegree {
175+
return nil
176+
}
177+
nextLevelHash, err := computeCombinedHash(currentLevelHashes)
178+
if err != nil {
179+
return err
180+
}
181+
delete(m.tree, currentLevel)
182+
nextLevel := currentLevel + 1
183+
m.tree[nextLevel] = append(m.tree[nextLevel], nextLevelHash)
184+
if nextLevel > m.maxLevel {
185+
m.maxLevel = nextLevel
186+
}
187+
currentLevel = nextLevel
188+
}
189+
}
190+
191+
// done completes the merkle tree.
192+
// There may have been some nodes that are at the levels lower than the maxLevel (maximum level seen by the tree so far).
193+
// Make the parent nodes out of such nodes till we complete the tree at the level of maxLevel (or maxLevel+1).
194+
func (m *merkleTree) done() error {
195+
logger.Debugf("Before done() = %s", m)
196+
defer logger.Debugf("After done() = %s", m)
197+
currentLevel := leafLevel
198+
var h Hash
199+
var err error
200+
for currentLevel < m.maxLevel {
201+
currentLevelHashes := m.tree[currentLevel]
202+
switch len(currentLevelHashes) {
203+
case 0:
204+
currentLevel++
205+
continue
206+
case 1:
207+
h = currentLevelHashes[0]
208+
default:
209+
if h, err = computeCombinedHash(currentLevelHashes); err != nil {
210+
return err
211+
}
212+
}
213+
delete(m.tree, currentLevel)
214+
currentLevel++
215+
m.tree[currentLevel] = append(m.tree[currentLevel], h)
216+
}
217+
218+
finalHashes := m.tree[m.maxLevel]
219+
if len(finalHashes) > m.maxDegree {
220+
delete(m.tree, m.maxLevel)
221+
m.maxLevel++
222+
combinedHash, err := computeCombinedHash(finalHashes)
223+
if err != nil {
224+
return err
225+
}
226+
m.tree[m.maxLevel] = []Hash{combinedHash}
227+
}
228+
return nil
229+
}
230+
231+
func (m *merkleTree) getSummery() *MerkleSummary {
232+
return &MerkleSummary{m.maxDegree, m.maxLevel, m.tree[m.maxLevel]}
233+
}
234+
235+
func (m *merkleTree) getMaxLevel() MerkleTreeLevel {
236+
return m.maxLevel
237+
}
238+
239+
func (m *merkleTree) getMaxLevelHashes() []Hash {
240+
return m.tree[m.maxLevel]
241+
}
242+
243+
func (m *merkleTree) isEmpty() bool {
244+
return m.maxLevel == 1 && len(m.tree[m.maxLevel]) == 0
245+
}
246+
247+
func (m *merkleTree) String() string {
248+
return fmt.Sprintf("tree := %#v", m.tree)
249+
}
250+
251+
func computeCombinedHash(hashes []Hash) (Hash, error) {
252+
combinedHash := []byte{}
253+
for _, h := range hashes {
254+
combinedHash = append(combinedHash, h...)
255+
}
256+
return computeHash(combinedHash)
257+
}
258+
259+
func computeHash(data []byte) (Hash, error) {
260+
bccsp, err := bccspfactory.GetDefault()
261+
if err != nil {
262+
return nil, err
263+
}
264+
return bccsp.Hash(data, hashOpts)
265+
}

0 commit comments

Comments
 (0)