Skip to content

Commit b2f9d56

Browse files
committed
FAB-2055 GetHistoryForKey() returns timestamp
Updated Ledger's GetHistoryForKey() API to return key, value, timestamp and delete marker. Further, query response from ledger is directly forwarded to shim layer to cast the response appropriately and return to chaincode as a struct. We have made the following changes to realize this. (i) Renamed QueryStateResponse proto message to QueryResponse. The QueryResult from ledger is converted to a 2D byte array (encoding using gob) and appropriately set in QueryResponse struct. Hence, the QueryResponse is consistent across different query types. This approach reduces the repetition of code in peer's handler.go that process the QueryResult from ledger. (iii) Introduced two types of iterator in chaincode shim: One for iterating over QueryResult of range/execute query and another for iterating over QueryResult of history query. Now, these iterator processes QueryResult (i.e., 2D byte array, decoded using gob) and returns a struct to chaincode: - ledger.KV struct for range and execute query, - ledger.KeyModification struct for history query. As a result, chaincode examples (map.go, marbles_chaincode.go) and mockstub have been modified (iv) Added test for historyQueries (from shim layer) (v) As java shim is significnatly behind go shim, I have commented out range query functionality for now. As a separate changeset, need to update java shim's range query with updated chaincode proto and add other query functionalities. Change-Id: I5e0cfd5f712f686361f8610d69db57646ad52b4f Signed-off-by: senthil <[email protected]>
1 parent 375ccf1 commit b2f9d56

File tree

22 files changed

+530
-347
lines changed

22 files changed

+530
-347
lines changed

core/chaincode/exectransaction_test.go

+58
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,13 @@ func startTxSimulation(ctxt context.Context, chainID string) (context.Context, l
138138
if err != nil {
139139
return nil, nil, err
140140
}
141+
historyQueryExecutor, err := lgr.NewHistoryQueryExecutor()
142+
if err != nil {
143+
return nil, nil, err
144+
}
141145

142146
ctxt = context.WithValue(ctxt, TXSimulatorKey, txsim)
147+
ctxt = context.WithValue(ctxt, HistoryQueryExecutorKey, historyQueryExecutor)
143148
return ctxt, txsim, nil
144149
}
145150

@@ -1137,6 +1142,59 @@ func TestQueries(t *testing.T) {
11371142
//Reset the query limit to default
11381143
viper.Set("ledger.state.queryLimit", 10000)
11391144

1145+
if ledgerconfig.IsHistoryDBEnabled() == true {
1146+
1147+
f = "put"
1148+
args = util.ToChaincodeArgs(f, "marble12", "{\"docType\":\"marble\",\"name\":\"marble12\",\"color\":\"red\",\"size\":30,\"owner\":\"jerry\"}")
1149+
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
1150+
_, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber)
1151+
nextBlockNumber++
1152+
if err != nil {
1153+
t.Fail()
1154+
t.Logf("Error invoking <%s>: %s", ccID, err)
1155+
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
1156+
return
1157+
}
1158+
1159+
f = "put"
1160+
args = util.ToChaincodeArgs(f, "marble12", "{\"docType\":\"marble\",\"name\":\"marble12\",\"color\":\"red\",\"size\":30,\"owner\":\"jerry\"}")
1161+
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
1162+
_, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber)
1163+
nextBlockNumber++
1164+
if err != nil {
1165+
t.Fail()
1166+
t.Logf("Error invoking <%s>: %s", ccID, err)
1167+
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
1168+
return
1169+
}
1170+
1171+
//The following history query for "marble12" should return 3 records
1172+
f = "history"
1173+
args = util.ToChaincodeArgs(f, "marble12")
1174+
1175+
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
1176+
_, _, retval, err := invoke(ctxt, chainID, spec, nextBlockNumber)
1177+
nextBlockNumber++
1178+
if err != nil {
1179+
t.Fail()
1180+
t.Logf("Error invoking <%s>: %s", ccID, err)
1181+
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
1182+
return
1183+
}
1184+
1185+
var history []interface{}
1186+
err = json.Unmarshal(retval, &history)
1187+
1188+
//default query limit of 10000 is used, query should return all records that meet the criteria
1189+
if len(history) != 3 {
1190+
t.Fail()
1191+
t.Logf("Error detected with the history query, should have returned 3 but returned %v", len(keys))
1192+
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
1193+
return
1194+
}
1195+
1196+
}
1197+
11401198
if ledgerconfig.IsCouchDBEnabled() == true {
11411199

11421200
//The following rich query for should return 9 marbles

core/chaincode/handler.go

+88-115
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package chaincode
1818

1919
import (
2020
"bytes"
21+
"encoding/gob"
2122
"fmt"
2223
"io"
2324
"sync"
@@ -193,7 +194,8 @@ func (handler *Handler) createTxContext(ctxt context.Context, chainID string, tx
193194
if handler.txCtxs[txid] != nil {
194195
return nil, fmt.Errorf("txid:%s exists", txid)
195196
}
196-
txctx := &transactionContext{chainID: chainID, signedProp: signedProp, proposal: prop, responseNotifier: make(chan *pb.ChaincodeMessage, 1),
197+
txctx := &transactionContext{chainID: chainID, signedProp: signedProp,
198+
proposal: prop, responseNotifier: make(chan *pb.ChaincodeMessage, 1),
197199
queryIteratorMap: make(map[string]commonledger.ResultsIterator)}
198200
handler.txCtxs[txid] = txctx
199201
txctx.txsimulator = getTxSimulator(ctxt)
@@ -720,38 +722,20 @@ func (handler *Handler) handleGetStateByRange(msg *pb.ChaincodeMessage) {
720722
}
721723

722724
handler.putQueryIterator(txContext, iterID, rangeIter)
725+
var payload *pb.QueryResponse
726+
payload, err = getQueryResponse(handler, txContext, rangeIter, iterID)
723727

724-
var keysAndValues []*pb.QueryStateKeyValue
725-
var i = 0
726-
var queryLimit = ledgerconfig.GetQueryLimit()
727-
var qresult commonledger.QueryResult
728-
for ; i < queryLimit; i++ {
729-
qresult, err = rangeIter.Next()
730-
if err != nil {
731-
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
732-
return
733-
}
734-
if qresult == nil {
735-
break
736-
}
737-
kv := qresult.(*ledger.KV)
738-
keyAndValue := pb.QueryStateKeyValue{Key: kv.Key, Value: kv.Value}
739-
keysAndValues = append(keysAndValues, &keyAndValue)
740-
}
741-
if qresult != nil {
728+
if err != nil {
742729
rangeIter.Close()
743730
handler.deleteQueryIterator(txContext, iterID)
744-
//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
745-
//following changes to the future paging design.
746-
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
731+
payload := []byte(err.Error())
732+
chaincodeLogger.Errorf("Failed to get query result. Sending %s", pb.ChaincodeMessage_ERROR)
733+
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
734+
return
747735
}
748736

749-
//TODO - HasMore is set to false until the requery issue for the peer is resolved
750-
//FAB-2462 - Re-introduce paging for range queries and rich queries
751-
//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}
752-
753-
payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID}
754-
payloadBytes, err := proto.Marshal(payload)
737+
var payloadBytes []byte
738+
payloadBytes, err = proto.Marshal(payload)
755739
if err != nil {
756740
rangeIter.Close()
757741
handler.deleteQueryIterator(txContext, iterID)
@@ -769,6 +753,60 @@ func (handler *Handler) handleGetStateByRange(msg *pb.ChaincodeMessage) {
769753
}()
770754
}
771755

756+
func getBytes(qresult interface{}) ([]byte, error) {
757+
var buf bytes.Buffer
758+
enc := gob.NewEncoder(&buf)
759+
err := enc.Encode(qresult)
760+
if err != nil {
761+
return nil, err
762+
}
763+
return buf.Bytes(), nil
764+
}
765+
766+
//getQueryResponse takes an iterator and fetch state to construct QueryResponse
767+
func getQueryResponse(handler *Handler, txContext *transactionContext, iter commonledger.ResultsIterator,
768+
iterID string) (*pb.QueryResponse, error) {
769+
770+
var i = 0
771+
var err error
772+
var queryLimit = ledgerconfig.GetQueryLimit()
773+
var queryResult commonledger.QueryResult
774+
var queryResultsBytes []*pb.QueryResultBytes
775+
776+
for ; i < queryLimit; i++ {
777+
queryResult, err = iter.Next()
778+
if err != nil {
779+
return nil, err
780+
}
781+
if queryResult == nil {
782+
break
783+
}
784+
var resultBytes []byte
785+
resultBytes, err = getBytes(queryResult)
786+
if err != nil {
787+
return nil, err
788+
}
789+
790+
qresultBytes := pb.QueryResultBytes{ResultBytes: resultBytes}
791+
queryResultsBytes = append(queryResultsBytes, &qresultBytes)
792+
}
793+
794+
if queryResult == nil {
795+
iter.Close()
796+
handler.deleteQueryIterator(txContext, iterID)
797+
} else {
798+
//TODO: remove this else part completely when paging design is implemented
799+
//FAB-2462 - Re-introduce paging for range queries and rich queries
800+
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
801+
iter.Close()
802+
handler.deleteQueryIterator(txContext, iterID)
803+
}
804+
//TODO - HasMore is set to false until the requery issue for the peer is resolved
805+
//FAB-2462 - Re-introduce paging for range queries and rich queries
806+
//payload := &pb.QueryResponse{Data: data, HasMore: qresult != nil, Id: iterID}
807+
return &pb.QueryResponse{Results: queryResultsBytes, HasMore: false, Id: iterID}, nil
808+
}
809+
772810
// afterQueryStateNext handles a QUERY_STATE_NEXT request from the chaincode.
773811
func (handler *Handler) afterQueryStateNext(e *fsm.Event, state string) {
774812
msg, ok := e.Args[0].(*pb.ChaincodeMessage)
@@ -824,39 +862,17 @@ func (handler *Handler) handleQueryStateNext(msg *pb.ChaincodeMessage) {
824862
return
825863
}
826864

827-
var keysAndValues []*pb.QueryStateKeyValue
828-
var i = 0
829-
var queryLimit = ledgerconfig.GetQueryLimit()
830-
831-
var qresult commonledger.QueryResult
832-
var err error
833-
for ; i < queryLimit; i++ {
834-
qresult, err = queryIter.Next()
835-
if err != nil {
836-
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
837-
return
838-
}
839-
if qresult != nil {
840-
break
841-
}
842-
kv := qresult.(*ledger.KV)
843-
keyAndValue := pb.QueryStateKeyValue{Key: kv.Key, Value: kv.Value}
844-
keysAndValues = append(keysAndValues, &keyAndValue)
845-
}
865+
payload, err := getQueryResponse(handler, txContext, queryIter, queryStateNext.Id)
846866

847-
if qresult != nil {
867+
if err != nil {
848868
queryIter.Close()
849869
handler.deleteQueryIterator(txContext, queryStateNext.Id)
850-
851-
//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
852-
//following changes to the future paging design.
853-
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
870+
payload := []byte(err.Error())
871+
chaincodeLogger.Errorf("Failed to get query result. Sending %s", pb.ChaincodeMessage_ERROR)
872+
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
873+
return
854874
}
855875

856-
//TODO - HasMore is set to false until the requery issue for the peer is resolved
857-
//FAB-2462 - Re-introduce paging for range queries and rich queries
858-
//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: queryStateNext.Id}
859-
payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: queryStateNext.Id}
860876
payloadBytes, err := proto.Marshal(payload)
861877
if err != nil {
862878
queryIter.Close()
@@ -927,7 +943,7 @@ func (handler *Handler) handleQueryStateClose(msg *pb.ChaincodeMessage) {
927943
handler.deleteQueryIterator(txContext, queryStateClose.Id)
928944
}
929945

930-
payload := &pb.QueryStateResponse{HasMore: false, Id: queryStateClose.Id}
946+
payload := &pb.QueryResponse{HasMore: false, Id: queryStateClose.Id}
931947
payloadBytes, err := proto.Marshal(payload)
932948
if err != nil {
933949

@@ -1010,48 +1026,27 @@ func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) {
10101026
}
10111027

10121028
handler.putQueryIterator(txContext, iterID, executeIter)
1029+
var payload *pb.QueryResponse
1030+
payload, err = getQueryResponse(handler, txContext, executeIter, iterID)
10131031

1014-
var keysAndValues []*pb.QueryStateKeyValue
1015-
var i = 0
1016-
var queryLimit = ledgerconfig.GetQueryLimit()
1017-
var qresult commonledger.QueryResult
1018-
for ; i < queryLimit; i++ {
1019-
qresult, err = executeIter.Next()
1020-
if err != nil {
1021-
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
1022-
return
1023-
}
1024-
if qresult == nil {
1025-
break
1026-
}
1027-
queryRecord := qresult.(*ledger.QueryRecord)
1028-
keyAndValue := pb.QueryStateKeyValue{Key: queryRecord.Key, Value: queryRecord.Record}
1029-
keysAndValues = append(keysAndValues, &keyAndValue)
1030-
}
1031-
1032-
if qresult != nil {
1032+
if err != nil {
10331033
executeIter.Close()
10341034
handler.deleteQueryIterator(txContext, iterID)
1035-
1036-
//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
1037-
//following changes to the future paging design.
1038-
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
1035+
payload := []byte(err.Error())
1036+
chaincodeLogger.Errorf("Failed to get query result. Sending %s", pb.ChaincodeMessage_ERROR)
1037+
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
1038+
return
10391039
}
10401040

10411041
var payloadBytes []byte
1042-
1043-
//TODO - HasMore is set to false until the requery issue for the peer is resolved
1044-
//FAB-2462 - Re-introduce paging for range queries and rich queries
1045-
//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}
1046-
payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID}
10471042
payloadBytes, err = proto.Marshal(payload)
10481043
if err != nil {
10491044
executeIter.Close()
10501045
handler.deleteQueryIterator(txContext, iterID)
10511046

10521047
// Send error msg back to chaincode. GetState will not trigger event
10531048
payload := []byte(err.Error())
1054-
chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
1049+
chaincodeLogger.Errorf("Failed marshall response. Sending %s", pb.ChaincodeMessage_ERROR)
10551050
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
10561051
return
10571052
}
@@ -1128,41 +1123,19 @@ func (handler *Handler) handleGetHistoryForKey(msg *pb.ChaincodeMessage) {
11281123

11291124
handler.putQueryIterator(txContext, iterID, historyIter)
11301125

1131-
// TODO QueryStateKeyValue can be re-used for now since history records have a string (TxID)
1132-
// and value (value). But we'll need to use another structure if we add other fields like timestamp.
1133-
var keysAndValues []*pb.QueryStateKeyValue
1134-
var i = 0
1135-
var queryLimit = ledgerconfig.GetQueryLimit()
1136-
var qresult commonledger.QueryResult
1137-
for ; i < queryLimit; i++ {
1138-
qresult, err = historyIter.Next()
1139-
if err != nil {
1140-
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
1141-
return
1142-
}
1143-
if qresult == nil {
1144-
break
1145-
}
1146-
queryRecord := qresult.(*ledger.KeyModification)
1147-
keyAndValue := pb.QueryStateKeyValue{Key: queryRecord.TxID, Value: queryRecord.Value}
1148-
keysAndValues = append(keysAndValues, &keyAndValue)
1149-
}
1126+
var payload *pb.QueryResponse
1127+
payload, err = getQueryResponse(handler, txContext, historyIter, iterID)
11501128

1151-
if qresult != nil {
1129+
if err != nil {
11521130
historyIter.Close()
11531131
handler.deleteQueryIterator(txContext, iterID)
1154-
1155-
//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
1156-
//following changes to the future paging design.
1157-
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
1132+
payload := []byte(err.Error())
1133+
chaincodeLogger.Errorf("Failed to get query result. Sending %s", pb.ChaincodeMessage_ERROR)
1134+
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
1135+
return
11581136
}
11591137

11601138
var payloadBytes []byte
1161-
1162-
//TODO - HasMore is set to false until the requery issue for the peer is resolved
1163-
//FAB-2462 - Re-introduce paging for range queries and rich queries
1164-
//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}
1165-
payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID}
11661139
payloadBytes, err = proto.Marshal(payload)
11671140
if err != nil {
11681141
historyIter.Close()

0 commit comments

Comments
 (0)