Skip to content

Commit 0567b34

Browse files
author
Chris Elder
committed
FAB-1395 - Generic query API for CouchDB
Implement the following ledger interface: ExecuteQuery(query string) (ResultsIterator, error) Tests are included in the CouchDB layer in util. Tests are included for CouchDB transaction manager. Change-Id: I5c6d38ef6f3ee555d3bb9b0b9f8bfd6127ab3c01 Signed-off-by: Chris Elder <[email protected]>
1 parent 2d1ac2a commit 0567b34

File tree

7 files changed

+373
-35
lines changed

7 files changed

+373
-35
lines changed

core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_query_executer.go

+31-7
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ limitations under the License.
1717
package couchdbtxmgmt
1818

1919
import (
20-
"errors"
21-
2220
"github.com/hyperledger/fabric/core/ledger"
2321
)
2422

@@ -55,7 +53,7 @@ func (q *CouchDBQueryExecutor) GetStateMultipleKeys(namespace string, keys []str
5553
// GetStateRangeScanIterator implements method in interface `ledger.QueryExecutor`
5654
func (q *CouchDBQueryExecutor) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ledger.ResultsIterator, error) {
5755
//q.checkDone()
58-
scanner, err := q.txmgr.getCommittedRangeScanner(namespace, startKey, endKey)
56+
scanner, err := q.txmgr.getRangeScanner(namespace, startKey, endKey)
5957
if err != nil {
6058
return nil, err
6159
}
@@ -64,7 +62,11 @@ func (q *CouchDBQueryExecutor) GetStateRangeScanIterator(namespace string, start
6462

6563
// ExecuteQuery implements method in interface `ledger.QueryExecutor`
6664
func (q *CouchDBQueryExecutor) ExecuteQuery(query string) (ledger.ResultsIterator, error) {
67-
return nil, errors.New("Not supported by KV data model")
65+
scanner, err := q.txmgr.getQuery(query)
66+
if err != nil {
67+
return nil, err
68+
}
69+
return &qQueryItr{scanner}, nil
6870
}
6971

7072
// Done implements method in interface `ledger.QueryExecutor`
@@ -76,20 +78,42 @@ type qKVItr struct {
7678
s *kvScanner
7779
}
7880

81+
type qQueryItr struct {
82+
s *queryScanner
83+
}
84+
7985
// Next implements Next() method in ledger.ResultsIterator
8086
func (itr *qKVItr) Next() (ledger.QueryResult, error) {
81-
committedKV, err := itr.s.next()
87+
KV, err := itr.s.next()
8288
if err != nil {
8389
return nil, err
8490
}
85-
if committedKV == nil {
91+
if KV == nil {
8692
return nil, nil
8793
}
8894

89-
return &ledger.KV{Key: committedKV.key, Value: committedKV.value}, nil
95+
return &ledger.KV{Key: KV.key, Value: KV.value}, nil
9096
}
9197

9298
// Close implements Close() method in ledger.ResultsIterator
9399
func (itr *qKVItr) Close() {
94100
itr.s.close()
95101
}
102+
103+
// Next implements Next() method in ledger.ResultsIterator
104+
func (itr *qQueryItr) Next() (ledger.QueryResult, error) {
105+
queryRecord, err := itr.s.next()
106+
if err != nil {
107+
return nil, err
108+
}
109+
if queryRecord == nil {
110+
return nil, nil
111+
}
112+
113+
return &ledger.QueryRecord{Namespace: queryRecord.namespace, Key: queryRecord.key, Record: queryRecord.record}, nil
114+
}
115+
116+
// Close implements Close() method in ledger.ResultsIterator
117+
func (itr *qQueryItr) Close() {
118+
itr.s.close()
119+
}

core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_tx_simulator.go

+43-6
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,22 @@ func (s *CouchDBTxSimulator) GetState(ns string, key string) ([]byte, error) {
112112
// GetStateRangeScanIterator implements method in interface `ledger.QueryExecutor`
113113
func (s *CouchDBTxSimulator) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ledger.ResultsIterator, error) {
114114
//s.checkDone()
115-
scanner, err := s.txmgr.getCommittedRangeScanner(namespace, startKey, endKey)
115+
scanner, err := s.txmgr.getRangeScanner(namespace, startKey, endKey)
116116
if err != nil {
117117
return nil, err
118118
}
119119
return &sKVItr{scanner, s}, nil
120120
}
121121

122+
// ExecuteQuery implements method in interface `ledger.QueryExecutor`
123+
func (s *CouchDBTxSimulator) ExecuteQuery(query string) (ledger.ResultsIterator, error) {
124+
scanner, err := s.txmgr.getQuery(query)
125+
if err != nil {
126+
return nil, err
127+
}
128+
return &sQueryItr{scanner, s}, nil
129+
}
130+
122131
// SetState implements method in interface `ledger.TxSimulator`
123132
func (s *CouchDBTxSimulator) SetState(ns string, key string, value []byte) error {
124133
logger.Debugf("===COUCHDB=== Entering CouchDBTxSimulator.SetState()")
@@ -224,28 +233,56 @@ type sKVItr struct {
224233
simulator *CouchDBTxSimulator
225234
}
226235

236+
type sQueryItr struct {
237+
scanner *queryScanner
238+
simulator *CouchDBTxSimulator
239+
}
240+
227241
// Next implements Next() method in ledger.ResultsIterator
228242
// Returns the next item in the result set. The `QueryResult` is expected to be nil when
229243
// the iterator gets exhausted
230244
func (itr *sKVItr) Next() (ledger.QueryResult, error) {
231-
committedKV, err := itr.scanner.next()
245+
kv, err := itr.scanner.next()
232246
if err != nil {
233247
return nil, err
234248
}
235-
if committedKV == nil {
249+
if kv == nil {
236250
return nil, nil
237251
}
238252

239253
// Get existing cache for RW at the namespace of the result set if it exists. If none exists, then create it.
240254
nsRWs := itr.simulator.getOrCreateNsRWHolder(itr.scanner.namespace)
241-
nsRWs.readMap[committedKV.key] = &kvReadCache{
242-
&rwset.KVRead{Key: committedKV.key, Version: committedKV.version}, committedKV.value}
255+
nsRWs.readMap[kv.key] = &kvReadCache{
256+
&rwset.KVRead{Key: kv.key, Version: kv.version}, kv.value}
243257

244-
return &ledger.KV{Key: committedKV.key, Value: committedKV.value}, nil
258+
return &ledger.KV{Key: kv.key, Value: kv.value}, nil
245259
}
246260

247261
// Close implements Close() method in ledger.ResultsIterator
248262
// which releases resources occupied by the iterator.
249263
func (itr *sKVItr) Close() {
250264
itr.scanner.close()
251265
}
266+
267+
// Next implements Next() method in ledger.ResultsIterator
268+
func (itr *sQueryItr) Next() (ledger.QueryResult, error) {
269+
queryRecord, err := itr.scanner.next()
270+
if err != nil {
271+
return nil, err
272+
}
273+
if queryRecord == nil {
274+
return nil, nil
275+
}
276+
277+
// Get existing cache for RW at the namespace of the result set if it exists. If none exists, then create it.
278+
nsRWs := itr.simulator.getOrCreateNsRWHolder(queryRecord.namespace)
279+
nsRWs.readMap[queryRecord.key] = &kvReadCache{
280+
&rwset.KVRead{Key: queryRecord.key, Version: queryRecord.version}, queryRecord.record}
281+
282+
return &ledger.QueryRecord{Namespace: queryRecord.namespace, Key: queryRecord.key, Record: queryRecord.record}, nil
283+
}
284+
285+
// Close implements Close() method in ledger.ResultsIterator
286+
func (itr *sQueryItr) Close() {
287+
itr.scanner.close()
288+
}

core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go

+93
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ limitations under the License.
1717
package couchdbtxmgmt
1818

1919
import (
20+
"encoding/json"
2021
"fmt"
2122
"os"
2223
"testing"
2324

25+
"github.com/hyperledger/fabric/core/ledger"
26+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
2427
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
2528
"github.com/hyperledger/fabric/core/ledger/testutil"
2629
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
@@ -139,3 +142,93 @@ func TestSavepoint(t *testing.T) {
139142
txMgr.Shutdown()
140143
}
141144
}
145+
146+
func TestDatabaseQuery(t *testing.T) {
147+
148+
//call a helper method to load the core.yaml
149+
testutil.SetupCoreYAMLConfig("./../../../../../peer")
150+
151+
//Only run the tests if CouchDB is explitily enabled in the code,
152+
//otherwise CouchDB may not be installed and all the tests would fail
153+
//TODO replace this with external config property rather than config within the code
154+
if ledgerconfig.IsCouchDBEnabled() == true {
155+
156+
env := newTestEnv(t)
157+
//env.Cleanup() //cleanup at the beginning to ensure the database doesn't exist already
158+
//defer env.Cleanup() //and cleanup at the end
159+
160+
txMgr := NewCouchDBTxMgr(env.conf,
161+
env.couchDBAddress, //couchDB Address
162+
env.couchDatabaseName, //couchDB db name
163+
env.couchUsername, //enter couchDB id
164+
env.couchPassword) //enter couchDB pw
165+
166+
type Asset struct {
167+
ID string `json:"_id"`
168+
Rev string `json:"_rev"`
169+
AssetName string `json:"asset_name"`
170+
Color string `json:"color"`
171+
Size string `json:"size"`
172+
Owner string `json:"owner"`
173+
}
174+
175+
s1, _ := txMgr.NewTxSimulator()
176+
177+
s1.SetState("ns1", "key1", []byte("value1"))
178+
s1.SetState("ns1", "key2", []byte("value2"))
179+
s1.SetState("ns1", "key3", []byte("value3"))
180+
s1.SetState("ns1", "key4", []byte("value4"))
181+
s1.SetState("ns1", "key5", []byte("value5"))
182+
s1.SetState("ns1", "key6", []byte("value6"))
183+
s1.SetState("ns1", "key7", []byte("value7"))
184+
s1.SetState("ns1", "key8", []byte("value8"))
185+
186+
s1.SetState("ns1", "key9", []byte(`{"asset_name":"marble1","color":"red","size":"25","owner":"jerry"}`))
187+
s1.SetState("ns1", "key10", []byte(`{"asset_name":"marble2","color":"blue","size":"10","owner":"bob"}`))
188+
s1.SetState("ns1", "key11", []byte(`{"asset_name":"marble3","color":"blue","size":"35","owner":"jerry"}`))
189+
s1.SetState("ns1", "key12", []byte(`{"asset_name":"marble4","color":"green","size":"15","owner":"bob"}`))
190+
s1.SetState("ns1", "key13", []byte(`{"asset_name":"marble5","color":"red","size":"35","owner":"jerry"}`))
191+
s1.SetState("ns1", "key14", []byte(`{"asset_name":"marble6","color":"blue","size":"25","owner":"bob"}`))
192+
193+
s1.Done()
194+
195+
// validate and commit RWset
196+
txRWSet := s1.(*CouchDBTxSimulator).getTxReadWriteSet()
197+
isValid, err := txMgr.validateTx(txRWSet)
198+
testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err))
199+
testutil.AssertSame(t, isValid, true)
200+
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(1, 1))
201+
err = txMgr.Commit()
202+
testutil.AssertNoError(t, err, fmt.Sprintf("Error while calling commit(): %s", err))
203+
204+
queryExecuter, _ := txMgr.NewQueryExecutor()
205+
queryString := "{\"selector\":{\"owner\": {\"$eq\": \"bob\"}},\"limit\": 10,\"skip\": 0}"
206+
207+
itr, _ := queryExecuter.ExecuteQuery(queryString)
208+
209+
counter := 0
210+
for {
211+
queryRecord, _ := itr.Next()
212+
if queryRecord == nil {
213+
break
214+
}
215+
216+
//Unmarshal the document to Asset structure
217+
assetResp := &Asset{}
218+
json.Unmarshal(queryRecord.(*ledger.QueryRecord).Record, &assetResp)
219+
220+
//Verify the owner retrieved matches
221+
testutil.AssertEquals(t, assetResp.Owner, "bob")
222+
223+
counter++
224+
225+
}
226+
227+
//Ensure the query returns 3 documents
228+
testutil.AssertEquals(t, counter, 3)
229+
230+
txMgr.Shutdown()
231+
232+
}
233+
234+
}

core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go

+52-2
Original file line numberDiff line numberDiff line change
@@ -406,8 +406,9 @@ func (txmgr *CouchDBTxMgr) getCommittedValueAndVersion(ns string, key string) ([
406406
return docBytes, ver, nil
407407
}
408408

409-
//getCommittedRangeScanner contructs composite start and end keys based on the namespace then calls the CouchDB range scanner
410-
func (txmgr *CouchDBTxMgr) getCommittedRangeScanner(namespace string, startKey string, endKey string) (*kvScanner, error) {
409+
//getRangeScanner contructs composite start and end keys based on the namespace then calls the CouchDB range scanner
410+
//TODO the limit and offset are currently hard coded. The limit should eventually be a config option
411+
func (txmgr *CouchDBTxMgr) getRangeScanner(namespace string, startKey string, endKey string) (*kvScanner, error) {
411412
var compositeStartKey []byte
412413
var compositeEndKey []byte
413414
if startKey != "" {
@@ -422,6 +423,17 @@ func (txmgr *CouchDBTxMgr) getCommittedRangeScanner(namespace string, startKey s
422423
return newKVScanner(namespace, *queryResult), nil
423424
}
424425

426+
//getQuery calls the CouchDB query documents method (CouchDB _find API)
427+
//TODO the limit and offset are currently hard coded. The limit should eventually be a config option
428+
func (txmgr *CouchDBTxMgr) getQuery(query string) (*queryScanner, error) {
429+
430+
//TODO - limit is currently set at 1000, eventually this will need to be changed
431+
//to reflect a config option and potentially return an exception if the threshold is exceeded
432+
queryResult, _ := txmgr.couchDB.QueryDocuments(query, 1000, 0)
433+
434+
return newQueryScanner(*queryResult), nil
435+
}
436+
425437
func encodeValue(value []byte, version uint64) []byte {
426438
versionBytes := proto.EncodeVarint(version)
427439
deleteMarker := 0
@@ -495,3 +507,41 @@ func (scanner *kvScanner) close() {
495507

496508
scanner = nil
497509
}
510+
511+
type queryScanner struct {
512+
cursor int
513+
results []couchdb.QueryResult
514+
}
515+
516+
type queryRecord struct {
517+
namespace string
518+
key string
519+
version *version.Height
520+
record []byte
521+
}
522+
523+
func newQueryScanner(queryResults []couchdb.QueryResult) *queryScanner {
524+
return &queryScanner{-1, queryResults}
525+
}
526+
527+
func (scanner *queryScanner) next() (*queryRecord, error) {
528+
529+
scanner.cursor++
530+
531+
if scanner.cursor >= len(scanner.results) {
532+
return nil, nil
533+
}
534+
535+
selectedValue := scanner.results[scanner.cursor]
536+
537+
namespace, key := splitCompositeKey([]byte(selectedValue.ID))
538+
539+
//TODO - change hardcoded version when version support is available in CouchDB
540+
return &queryRecord{namespace, key, version.NewHeight(1, 1), selectedValue.Value}, nil
541+
542+
}
543+
544+
func (scanner *queryScanner) close() {
545+
546+
scanner = nil
547+
}

core/ledger/ledger_interface.go

+9
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type QueryExecutor interface {
8383
// The returned ResultsIterator contains results of type *KV
8484
GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ResultsIterator, error)
8585
// ExecuteQuery executes the given query and returns an iterator that contains results of type specific to the underlying data store.
86+
// Only used for state databases that support query
8687
ExecuteQuery(query string) (ResultsIterator, error)
8788
// Done releases resources occupied by the QueryExecutor
8889
Done()
@@ -142,6 +143,14 @@ type KeyModification struct {
142143
Transaction *pb.Transaction
143144
}
144145

146+
// QueryRecord - Result structure for query records. Holds a namespace, key and record.
147+
// Only used for state databases that support query
148+
type QueryRecord struct {
149+
Namespace string
150+
Key string
151+
Record []byte
152+
}
153+
145154
// BlockHolder holds block returned by the iterator in GetBlocksIterator.
146155
// The sole purpose of this holder is to avoid desrialization if block is desired in raw bytes form (e.g., for transfer)
147156
type BlockHolder interface {

0 commit comments

Comments
 (0)