Skip to content

Commit 99a6669

Browse files
author
Srinivasan Muralidharan
committed
FAB-860 Fix table crash
GetRows exibits two kinds of errors . iterator gets closed asynchronously while chaincode is traversing rows . channel panic on iterator getting closed (this happens when chaincode does not drain the iterator completely) The two issues are unrelated to each other. Typically the first issue would kick in and users won't see the second. For small tables users would see the second if the chaincode does not read all rows. First issue is fixed by removing the iterator.Close call in the GetRows processing. Transaction clean up logic will close it anyway. Second issue is fixed by trapping the panic and recovering. This is based on the fact that the panic can happen only from the channel close. NOTE - this is not checked into master yet as current master does not completely support Table yet. Change-Id: I73697b7b5b91d809940c6a969281ecbb8042a763 Signed-off-by: Srinivasan Muralidharan <[email protected]>
1 parent 775faf8 commit 99a6669

File tree

3 files changed

+236
-1
lines changed

3 files changed

+236
-1
lines changed

core/chaincode/exectransaction_test.go

+106
Original file line numberDiff line numberDiff line change
@@ -1453,6 +1453,112 @@ func TestGetEvent(t *testing.T) {
14531453
closeListenerAndSleep(lis)
14541454
}
14551455

1456+
// TestGetRows gets large and small rows from a table and tests border conditions (FAB-860)
1457+
func TestGetRows(t *testing.T) {
1458+
testDBWrapper.CleanDB(t)
1459+
var opts []grpc.ServerOption
1460+
if viper.GetBool("peer.tls.enabled") {
1461+
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
1462+
if err != nil {
1463+
grpclog.Fatalf("Failed to generate credentials %v", err)
1464+
}
1465+
opts = []grpc.ServerOption{grpc.Creds(creds)}
1466+
}
1467+
grpcServer := grpc.NewServer(opts...)
1468+
viper.Set("peer.fileSystemPath", "/var/hyperledger/test/tmpdb")
1469+
1470+
//use a different address than what we usually use for "peer"
1471+
//we override the peerAddress set in chaincode_support.go
1472+
peerAddress := "0.0.0.0:21212"
1473+
1474+
lis, err := net.Listen("tcp", peerAddress)
1475+
if err != nil {
1476+
t.Fail()
1477+
t.Logf("Error starting peer listener %s", err)
1478+
return
1479+
}
1480+
1481+
getPeerEndpoint := func() (*pb.PeerEndpoint, error) {
1482+
return &pb.PeerEndpoint{ID: &pb.PeerID{Name: "testpeer"}, Address: peerAddress}, nil
1483+
}
1484+
1485+
ccStartupTimeout := time.Duration(chaincodeStartupTimeoutDefault) * time.Millisecond
1486+
pb.RegisterChaincodeSupportServer(grpcServer, NewChaincodeSupport(DefaultChain, getPeerEndpoint, false, ccStartupTimeout, nil))
1487+
1488+
go grpcServer.Serve(lis)
1489+
1490+
var ctxt = context.Background()
1491+
1492+
url := "github.com/hyperledger/fabric/examples/chaincode/go/largerowsiter"
1493+
cID := &pb.ChaincodeID{Path: url}
1494+
1495+
f := "init"
1496+
args := util.ToChaincodeArgs(f, "")
1497+
1498+
spec := &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, CtorMsg: &pb.ChaincodeInput{Args: args}}
1499+
1500+
_, err = deploy(ctxt, spec)
1501+
chaincodeID := spec.ChaincodeID.Name
1502+
if err != nil {
1503+
t.Fail()
1504+
t.Logf("Error initializing chaincode %s(%s)", chaincodeID, err)
1505+
GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
1506+
closeListenerAndSleep(lis)
1507+
return
1508+
}
1509+
1510+
//query using GetRows 1000, ie all rows
1511+
f = "query"
1512+
args = util.ToChaincodeArgs(f, "1000")
1513+
1514+
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, CtorMsg: &pb.ChaincodeInput{Args: args}}
1515+
1516+
var retval []byte
1517+
_, _, retval, err = invoke(ctxt, spec, pb.Transaction_CHAINCODE_QUERY)
1518+
1519+
if err != nil || retval == nil {
1520+
t.Fail()
1521+
t.Logf("Error invoking <%s>: %s", chaincodeID, err)
1522+
GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
1523+
closeListenerAndSleep(lis)
1524+
return
1525+
}
1526+
1527+
if string(retval) != "1000" {
1528+
t.Fail()
1529+
t.Logf("Invalid return value <%s>: %s", chaincodeID, string(retval))
1530+
GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
1531+
closeListenerAndSleep(lis)
1532+
return
1533+
}
1534+
1535+
//query just 10 rows
1536+
f = "query"
1537+
args = util.ToChaincodeArgs(f, "10")
1538+
1539+
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, CtorMsg: &pb.ChaincodeInput{Args: args}}
1540+
_, _, retval, err = invoke(ctxt, spec, pb.Transaction_CHAINCODE_QUERY)
1541+
1542+
if err != nil || retval == nil {
1543+
t.Fail()
1544+
t.Logf("Error invoking <%s>: %s", chaincodeID, err)
1545+
GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
1546+
closeListenerAndSleep(lis)
1547+
return
1548+
}
1549+
1550+
if string(retval) != "10" {
1551+
t.Fail()
1552+
t.Logf("Invalid return value <%s>: %s", chaincodeID, string(retval))
1553+
GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
1554+
closeListenerAndSleep(lis)
1555+
return
1556+
}
1557+
1558+
GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
1559+
closeListenerAndSleep(lis)
1560+
}
1561+
14561562
func TestMain(m *testing.M) {
14571563
SetupTestConfig()
14581564
os.Exit(m.Run())

core/chaincode/shim/chaincode.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -586,11 +586,13 @@ func (stub *ChaincodeStub) GetRows(tableName string, key []Column) (<-chan Row,
586586
if err != nil {
587587
return nil, fmt.Errorf("Error fetching rows: %s", err)
588588
}
589-
defer iter.Close()
590589

591590
rows := make(chan Row)
592591

593592
go func() {
593+
defer func() {
594+
recover()
595+
}()
594596
for iter.HasNext() {
595597
_, rowBytes, err := iter.Next()
596598
if err != nil {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package main
15+
16+
import (
17+
"fmt"
18+
"strconv"
19+
20+
"github.com/hyperledger/fabric/core/chaincode/shim"
21+
)
22+
23+
const (
24+
totalRows = 1000
25+
)
26+
27+
type largeRowsetChaincode struct {
28+
}
29+
30+
func (lrc *largeRowsetChaincode) retInAdd(ok bool, err error) ([]byte, error) {
31+
if err != nil {
32+
return nil, fmt.Errorf("operation failed. %s", err)
33+
}
34+
if !ok {
35+
return nil, err
36+
}
37+
return nil, nil
38+
}
39+
40+
// Init called for initializing the chaincode
41+
func (lrc *largeRowsetChaincode) Init(stub shim.ChaincodeStubInterface, function string, args []string) ([]byte, error) {
42+
// Create a new table
43+
if err := stub.CreateTable("LargeTable", []*shim.ColumnDefinition{
44+
{Name: "Key", Type: shim.ColumnDefinition_STRING, Key: true},
45+
{"Name", shim.ColumnDefinition_STRING, false},
46+
{"Value", shim.ColumnDefinition_STRING, false},
47+
}); err != nil {
48+
//just assume the table exists and was populated
49+
return nil, nil
50+
}
51+
52+
for i := 0; i < totalRows; i++ {
53+
col1 := fmt.Sprintf("Key_%d", i)
54+
col2 := fmt.Sprintf("Name_%d", i)
55+
col3 := fmt.Sprintf("Value_%d", i)
56+
if _, err := lrc.retInAdd(stub.InsertRow("LargeTable", shim.Row{Columns: []*shim.Column{
57+
&shim.Column{Value: &shim.Column_String_{String_: col1}},
58+
&shim.Column{Value: &shim.Column_String_{String_: col2}},
59+
&shim.Column{Value: &shim.Column_String_{String_: col3}},
60+
}})); err != nil {
61+
return nil, err
62+
}
63+
}
64+
65+
return nil, nil
66+
}
67+
68+
// Run callback representing the invocation of a chaincode
69+
func (lrc *largeRowsetChaincode) Invoke(stub shim.ChaincodeStubInterface, function string, args []string) ([]byte, error) {
70+
return nil, nil
71+
}
72+
73+
// Query callback representing the query of a chaincode.
74+
func (lrc *largeRowsetChaincode) Query(stub shim.ChaincodeStubInterface, function string, args []string) ([]byte, error) {
75+
var err error
76+
//stop at 1 greater than total rows by default (ie, read all rows)
77+
var stopAtRow = totalRows
78+
79+
if len(args) > 0 {
80+
stopAtRow, err = strconv.Atoi(string(args[0]))
81+
if err != nil {
82+
return nil, err
83+
}
84+
}
85+
model := "LargeTable"
86+
87+
var rowChannel <-chan shim.Row
88+
89+
rowChannel, err = stub.GetRows(model, []shim.Column{})
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
i := stopAtRow
95+
96+
var rows []*shim.Row
97+
for {
98+
select {
99+
case row, ok := <-rowChannel:
100+
if !ok {
101+
rowChannel = nil
102+
} else {
103+
rows = append(rows, &row)
104+
}
105+
}
106+
107+
i = i - 1
108+
if rowChannel == nil || i == 0 {
109+
break
110+
}
111+
}
112+
113+
col1 := shim.Column{Value: &shim.Column_String_{String_: "Key_2"}}
114+
_, err = stub.GetRow(model, []shim.Column{col1})
115+
if err != nil {
116+
return nil, err
117+
}
118+
119+
return []byte(fmt.Sprintf("%d", len(rows))), nil
120+
}
121+
122+
func main() {
123+
err := shim.Start(new(largeRowsetChaincode))
124+
if err != nil {
125+
fmt.Printf("Error starting the chaincode: %s", err)
126+
}
127+
}

0 commit comments

Comments
 (0)