Skip to content

Commit 470cb3c

Browse files
committed
FAB-1405 Ledger History query framework
Ledger History queries and API implementation. The initial implementation will be to get the history of transactions for a given key. Change-Id: Ic2eb4bb6d4a53a534f434c0e4d81fed8f919a7d0 Signed-off-by: Mari Wade <[email protected]>
1 parent ccb94c5 commit 470cb3c

9 files changed

+203
-21
lines changed

core/ledger/history/couchdb_histmgr.go

+75
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"strconv"
2222

23+
"github.com/hyperledger/fabric/core/ledger"
2324
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwset"
2425
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
2526
"github.com/hyperledger/fabric/protos/common"
@@ -29,6 +30,8 @@ import (
2930

3031
var logger = logging.MustGetLogger("history")
3132

33+
var compositeKeySep = []byte{0x00}
34+
3235
// CouchDBHistMgr a simple implementation of interface `histmgmt.HistMgr'.
3336
// TODO This implementation does not currently use a lock but may need one to ensure query's are consistent
3437
type CouchDBHistMgr struct {
@@ -49,8 +52,14 @@ func NewCouchDBHistMgr(couchDBConnectURL string, dbName string, id string, pw st
4952
return &CouchDBHistMgr{couchDB: couchDB}
5053
}
5154

55+
// NewHistoryQueryExecutor implements method in interface `histmgmt.HistMgr'.
56+
func (histmgr *CouchDBHistMgr) NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error) {
57+
return &CouchDBHistQueryExecutor{histmgr}, nil
58+
}
59+
5260
// Commit implements method in interface `histmgmt.HistMgr`
5361
// This writes to a separate history database.
62+
// TODO dpending on how invalid transactions are handled may need to filter what history commits.
5463
func (histmgr *CouchDBHistMgr) Commit(block *common.Block) error {
5564
logger.Debugf("===HISTORYDB=== Entering CouchDBHistMgr.Commit()")
5665

@@ -124,6 +133,22 @@ func (histmgr *CouchDBHistMgr) Commit(block *common.Block) error {
124133
return nil
125134
}
126135

136+
//getTransactionsForNsKey contructs composite start and end keys based on the namespace and key then calls the CouchDB range scanner
137+
func (histmgr *CouchDBHistMgr) getTransactionsForNsKey(namespace string, key string, includeValues bool) (*histScanner, error) {
138+
var compositeStartKey []byte
139+
var compositeEndKey []byte
140+
if key != "" {
141+
compositeStartKey = constructPartialCompositeKey(namespace, key, false)
142+
compositeEndKey = constructPartialCompositeKey(namespace, key, true)
143+
}
144+
145+
//TODO the limit should not be hardcoded. Need the config.
146+
//TODO Implement includeValues so that values are not returned in the readDocRange
147+
queryResult, _ := histmgr.couchDB.ReadDocRange(string(compositeStartKey), string(compositeEndKey), 1000, 0)
148+
149+
return newHistScanner(compositeStartKey, *queryResult), nil
150+
}
151+
127152
func constructCompositeKey(ns string, key string, blocknum uint64, trannum uint64) string {
128153
//History Key is: "namespace key blocknum trannum"", with namespace being the chaincode id
129154

@@ -139,3 +164,53 @@ func constructCompositeKey(ns string, key string, blocknum uint64, trannum uint6
139164

140165
return buffer.String()
141166
}
167+
168+
func constructPartialCompositeKey(ns string, key string, endkey bool) []byte {
169+
compositeKey := []byte(ns)
170+
compositeKey = append(compositeKey, compositeKeySep...)
171+
compositeKey = append(compositeKey, []byte(key)...)
172+
if endkey {
173+
compositeKey = append(compositeKey, []byte("1")...)
174+
}
175+
return compositeKey
176+
}
177+
178+
func splitCompositeKey(compositePartialKey []byte, compositeKey []byte) (string, string) {
179+
split := bytes.SplitN(compositeKey, compositePartialKey, 2)
180+
return string(split[0]), string(split[1])
181+
}
182+
183+
type histScanner struct {
184+
cursor int
185+
compositePartialKey []byte
186+
results []couchdb.QueryResult
187+
}
188+
189+
type historicValue struct {
190+
blockNumTranNum string
191+
value []byte
192+
}
193+
194+
func newHistScanner(compositePartialKey []byte, queryResults []couchdb.QueryResult) *histScanner {
195+
return &histScanner{-1, compositePartialKey, queryResults}
196+
}
197+
198+
func (scanner *histScanner) next() (*historicValue, error) {
199+
200+
scanner.cursor++
201+
202+
if scanner.cursor >= len(scanner.results) {
203+
return nil, nil
204+
}
205+
206+
selectedValue := scanner.results[scanner.cursor]
207+
208+
_, blockNumTranNum := splitCompositeKey(scanner.compositePartialKey, []byte(selectedValue.ID))
209+
210+
return &historicValue{blockNumTranNum, selectedValue.Value}, nil
211+
212+
}
213+
214+
func (scanner *histScanner) close() {
215+
scanner = nil
216+
}

core/ledger/history/couchdb_histmgr_test.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestHistoryDatabaseAutoCreate(t *testing.T) {
3535

3636
//call a helper method to load the core.yaml
3737
testutil.SetupCoreYAMLConfig("./../../../peer")
38-
logger.Debugf("===HISTORYDB=== TestHistoryDatabaseAutoCreate IsCouchDBEnabled()value: %v , IsHistoryDBEnabled()value: %v\n",
38+
logger.Debugf("===HISTORYDB=== TestHistoryDatabaseAutoCreate IsCouchDBEnabled()value: %v , IsHistoryDBEnabled()value: %v\n",
3939
ledgerconfig.IsCouchDBEnabled(), ledgerconfig.IsHistoryDBEnabled())
4040

4141
if ledgerconfig.IsHistoryDBEnabled() == true {
@@ -82,3 +82,17 @@ func TestConstructCompositeKey(t *testing.T) {
8282

8383
testutil.AssertEquals(t, compositeKey, "ns1"+strKeySep+"key1"+strKeySep+"1"+strKeySep+"1")
8484
}
85+
86+
//History Database commit and read is being tested with kv_ledger_test.go.
87+
//This test will push some of the testing down into history itself
88+
func TestHistoryDatabaseCommit(t *testing.T) {
89+
//call a helper method to load the core.yaml
90+
testutil.SetupCoreYAMLConfig("./../../../peer")
91+
logger.Debugf("===HISTORYDB=== TestHistoryDatabaseCommit IsCouchDBEnabled()value: %v , IsHistoryDBEnabled()value: %v\n",
92+
ledgerconfig.IsCouchDBEnabled(), ledgerconfig.IsHistoryDBEnabled())
93+
94+
if ledgerconfig.IsHistoryDBEnabled() == true {
95+
//TODO Build the necessary infrastructure so that history can be tested iwthout ledger
96+
97+
}
98+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package history
18+
19+
import "github.com/hyperledger/fabric/core/ledger"
20+
21+
// CouchDBHistQueryExecutor is a query executor used in `CouchDBHistMgr`
22+
type CouchDBHistQueryExecutor struct {
23+
histmgr *CouchDBHistMgr
24+
}
25+
26+
// GetTransactionsForKey implements method in interface `ledger.HistoryQueryExecutor`
27+
func (q *CouchDBHistQueryExecutor) GetTransactionsForKey(namespace string, key string, includeValues bool, includeTransactions bool) (ledger.ResultsIterator, error) {
28+
//TODO includeTransactions has not been implemented yet.
29+
scanner, err := q.histmgr.getTransactionsForNsKey(namespace, key, includeValues)
30+
if err != nil {
31+
return nil, err
32+
}
33+
return &qHistoryItr{scanner}, nil
34+
}
35+
36+
type qHistoryItr struct {
37+
q *histScanner
38+
}
39+
40+
// Next implements Next() method in ledger.ResultsIterator
41+
func (itr *qHistoryItr) Next() (ledger.QueryResult, error) {
42+
historicValue, err := itr.q.next()
43+
if err != nil {
44+
return nil, err
45+
}
46+
if historicValue == nil {
47+
return nil, nil
48+
}
49+
//TODO Returning blockNumTrannum as TxID for now but eventually will return txID instead
50+
return &ledger.KeyModification{TxID: historicValue.blockNumTranNum, Value: historicValue.value}, nil
51+
}
52+
53+
// Close implements Close() method in ledger.ResultsIterator
54+
func (itr *qHistoryItr) Close() {
55+
itr.q.close()
56+
}

core/ledger/history/histmgmt.go

+2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ limitations under the License.
1717
package history
1818

1919
import "github.com/hyperledger/fabric/protos/common"
20+
import "github.com/hyperledger/fabric/core/ledger"
2021

2122
// HistMgr - an interface that a history manager should implement
2223
type HistMgr interface {
24+
NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error)
2325
Commit(block *common.Block) error
2426
}

core/ledger/kvledger/kv_ledger.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -204,13 +204,20 @@ func (l *KVLedger) NewTxSimulator() (ledger.TxSimulator, error) {
204204
return l.txtmgmt.NewTxSimulator()
205205
}
206206

207-
// NewQueryExecutor gives handle to a query executer.
207+
// NewQueryExecutor gives handle to a query executor.
208208
// A client can obtain more than one 'QueryExecutor's for parallel execution.
209209
// Any synchronization should be performed at the implementation level if required
210210
func (l *KVLedger) NewQueryExecutor() (ledger.QueryExecutor, error) {
211211
return l.txtmgmt.NewQueryExecutor()
212212
}
213213

214+
// NewHistoryQueryExecutor gives handle to a history query executor.
215+
// A client can obtain more than one 'HistoryQueryExecutor's for parallel execution.
216+
// Any synchronization should be performed at the implementation level if required
217+
func (l *KVLedger) NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error) {
218+
return l.historymgmt.NewHistoryQueryExecutor()
219+
}
220+
214221
// Commit commits the valid block (returned in the method RemoveInvalidTransactionsAndPrepare) and related state changes
215222
func (l *KVLedger) Commit(block *common.Block) error {
216223
var err error
@@ -231,8 +238,10 @@ func (l *KVLedger) Commit(block *common.Block) error {
231238
panic(fmt.Errorf(`Error during commit to txmgr:%s`, err))
232239
}
233240

234-
//TODO future will want to run async with state db writes. History needs to wait for chain (FSBlock) to write but not the state db
235-
logger.Debugf("===HISTORYDB=== Commit() will write to hisotry if enabled else will be by-passed if not enabled: vledgerconfig.IsHistoryDBEnabled(): %v\n", ledgerconfig.IsHistoryDBEnabled())
241+
//TODO There are still some decisions to be made regarding how consistent history
242+
//needs to stay with the state. At min will want this to run async with state db writes.
243+
//(History needs to wait for the block (FSBlock) to write but not the state db)
244+
logger.Debugf("===HISTORYDB=== Commit() will write to history if enabled else will be by-passed if not enabled: vledgerconfig.IsHistoryDBEnabled(): %v\n", ledgerconfig.IsHistoryDBEnabled())
236245
if ledgerconfig.IsHistoryDBEnabled() == true {
237246
logger.Debugf("Committing transactions to history database")
238247
if err := l.historymgmt.Commit(block); err != nil {

core/ledger/kvledger/kv_ledger_test.go

+25-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package kvledger
1818

1919
import (
20+
"fmt"
2021
"testing"
2122

2223
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
@@ -170,6 +171,8 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
170171
ledger, _ := NewKVLedger(env.conf)
171172
defer ledger.Close()
172173

174+
//testNs := "ns1"
175+
173176
bcInfo, _ := ledger.GetBlockchainInfo()
174177
testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{
175178
Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil})
@@ -204,6 +207,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
204207
simulationResults = append(simulationResults, simRes)
205208
//add a 2nd transaction
206209
simulator2, _ := ledger.NewTxSimulator()
210+
simulator2.SetState("ns1", "key7", []byte("{\"shipmentID\":\"161003PKC7600\",\"customsInvoice\":{\"methodOfTransport\":\"TRAIN\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
207211
simulator2.SetState("ns1", "key9", []byte("value5"))
208212
simulator2.SetState("ns1", "key10", []byte("{\"shipmentID\":\"261003PKC8000\",\"customsInvoice\":{\"methodOfTransport\":\"DONKEY\",\"invoiceNumber\":\"00091626\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
209213
simulator2.Done()
@@ -230,7 +234,27 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
230234
b2, _ = ledger.GetBlockByNumber(2)
231235
testutil.AssertEquals(t, b2, block2)
232236

237+
//TODO move this test to history.
233238
if ledgerconfig.IsHistoryDBEnabled() == true {
234-
//TODO history specific test once the query api's are in and we can validate content
239+
qhistory, err := ledger.NewHistoryQueryExecutor()
240+
testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to retrieve history database executor"))
241+
242+
itr, err2 := qhistory.GetTransactionsForKey("ns1", "key7", true, false)
243+
testutil.AssertNoError(t, err2, fmt.Sprintf("Error when trying to retrieve history database executor"))
244+
245+
count := 0
246+
for {
247+
kmod, _ := itr.Next()
248+
if kmod == nil {
249+
break
250+
}
251+
//TODO TEST CONTENT - need to point to ledger import and not the KVledger
252+
//TODO MOVE TEST TO HISTORY
253+
//bt := kmod.(*ledger.KeyModification).TxID
254+
//v := kmod.(*ledger.KeyModification).Value
255+
//t.Logf("Retrieved for ns=%s, key=key7 : k=%s, v=%s at count=%d start=%s end=%s", testNs, bt, v, count)
256+
count++
257+
}
258+
testutil.AssertEquals(t, count, 3)
235259
}
236260
}

core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_query_executer.go

-5
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,6 @@ func (q *CouchDBQueryExecutor) GetStateRangeScanIterator(namespace string, start
6262
return &qKVItr{scanner}, nil
6363
}
6464

65-
// GetTransactionsForKey - implements method in interface `ledger.QueryExecutor`
66-
func (q *CouchDBQueryExecutor) GetTransactionsForKey(namespace string, key string) (ledger.ResultsIterator, error) {
67-
return nil, errors.New("Not yet implemented")
68-
}
69-
7065
// ExecuteQuery implements method in interface `ledger.QueryExecutor`
7166
func (q *CouchDBQueryExecutor) ExecuteQuery(query string) (ledger.ResultsIterator, error) {
7267
return nil, errors.New("Not supported by KV data model")

core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_query_executer.go

-7
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ limitations under the License.
1717
package lockbasedtxmgr
1818

1919
import (
20-
"errors"
21-
2220
"github.com/hyperledger/fabric/core/ledger"
2321
"github.com/hyperledger/fabric/core/util"
2422
)
@@ -54,11 +52,6 @@ func (q *lockBasedQueryExecutor) GetStateRangeScanIterator(namespace string, sta
5452
return q.helper.getStateRangeScanIterator(namespace, startKey, endKey)
5553
}
5654

57-
// GetTransactionsForKey - implements method in interface `ledger.QueryExecutor`
58-
func (q *lockBasedQueryExecutor) GetTransactionsForKey(namespace string, key string) (ledger.ResultsIterator, error) {
59-
return nil, errors.New("Not yet implemented")
60-
}
61-
6255
// ExecuteQuery implements method in interface `ledger.QueryExecutor`
6356
func (q *lockBasedQueryExecutor) ExecuteQuery(query string) (ledger.ResultsIterator, error) {
6457
return q.helper.executeQuery(query)

core/ledger/ledger_interface.go

+18-4
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,14 @@ type ValidatedLedger interface {
5757
// A client can obtain more than one 'TxSimulator's for parallel execution.
5858
// Any snapshoting/synchronization should be performed at the implementation level if required
5959
NewTxSimulator() (TxSimulator, error)
60-
// NewQueryExecuter gives handle to a query executor.
60+
// NewQueryExecutor gives handle to a query executor.
6161
// A client can obtain more than one 'QueryExecutor's for parallel execution.
6262
// Any synchronization should be performed at the implementation level if required
6363
NewQueryExecutor() (QueryExecutor, error)
64+
// NewHistoryQueryExecutor gives handle to a history query executor.
65+
// A client can obtain more than one 'HistoryQueryExecutor's for parallel execution.
66+
// Any synchronization should be performed at the implementation level if required
67+
NewHistoryQueryExecutor() (HistoryQueryExecutor, error)
6468
// Commits block into the ledger
6569
Commit(block *common.Block) error
6670
}
@@ -78,15 +82,18 @@ type QueryExecutor interface {
7882
// GetStateRangeScanIterator returns an iterator that contains all the key-values between given key ranges.
7983
// The returned ResultsIterator contains results of type *KV
8084
GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ResultsIterator, error)
81-
// GetTransactionsForKey returns an iterator that contains all the transactions that modified the given key.
82-
// The returned ResultsIterator contains results of type *msgs.Transaction
83-
GetTransactionsForKey(namespace string, key string) (ResultsIterator, error)
8485
// ExecuteQuery executes the given query and returns an iterator that contains results of type specific to the underlying data store.
8586
ExecuteQuery(query string) (ResultsIterator, error)
8687
// Done releases resources occupied by the QueryExecutor
8788
Done()
8889
}
8990

91+
// HistoryQueryExecutor executes the history queries
92+
type HistoryQueryExecutor interface {
93+
// GetTransactionsForKey retrieves the set of transactons that updated this key by doing a key range query.
94+
GetTransactionsForKey(namespace string, key string, includeValues bool, includeTransactions bool) (ResultsIterator, error)
95+
}
96+
9097
// TxSimulator simulates a transaction on a consistent snapshot of the 'as recent state as possible'
9198
// Set* methods are for supporting KV-based data model. ExecuteUpdate method is for supporting a rich datamodel and query support
9299
type TxSimulator interface {
@@ -128,6 +135,13 @@ type KV struct {
128135
Value []byte
129136
}
130137

138+
// KeyModification - QueryResult for History.
139+
type KeyModification struct {
140+
TxID string
141+
Value []byte
142+
Transaction *pb.Transaction
143+
}
144+
131145
// BlockHolder holds block returned by the iterator in GetBlocksIterator.
132146
// The sole purpose of this holder is to avoid desrialization if block is desired in raw bytes form (e.g., for transfer)
133147
type BlockHolder interface {

0 commit comments

Comments
 (0)