Skip to content

Commit 41e842f

Browse files
committed
Add Committer service API interface.
Introduce committer service API with basic functionality required for gossip layer. Additionally refactored a toy example of committer service implementation within "noopssinglechain" package, separating beetwen the block delivery and committing functionality. Change-Id: Id2b05c4dae9af55c7f14801051ea510eaf54fcbb Signed-off-by: Artem Barger <[email protected]>
1 parent ed5ad4d commit 41e842f

File tree

6 files changed

+340
-229
lines changed

6 files changed

+340
-229
lines changed

core/committer/committer.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ limitations under the License.
1616

1717
package committer
1818

19+
import (
20+
"github.com/hyperledger/fabric/protos"
21+
)
22+
1923
// Committer is the interface supported by committers
2024
// The only committer is noopssinglechain committer.
2125
// The interface is intentionally sparse with the sole
@@ -24,6 +28,13 @@ package committer
2428
// more support (such as Gossip) this interface will
2529
// change
2630
type Committer interface {
27-
//Start registers and opens communications
28-
Start() error
31+
32+
// Commit block to the ledger
33+
CommitBlock(block *protos.Block2) error
34+
35+
// Get recent block sequence number
36+
LedgerHeight() (uint64, error)
37+
38+
// Gets blocks with sequence numbers provided in the slice
39+
GetBlocks(blockSeqs []uint64) []*protos.Block2
2940
}

core/committer/committer_impl.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package committer
18+
19+
import (
20+
"github.com/hyperledger/fabric/core/ledger"
21+
"github.com/hyperledger/fabric/protos"
22+
pb "github.com/hyperledger/fabric/protos"
23+
"github.com/op/go-logging"
24+
)
25+
26+
//--------!!!IMPORTANT!!-!!IMPORTANT!!-!!IMPORTANT!!---------
27+
// This is used merely to complete the loop for the "skeleton"
28+
// path so we can reason about and modify committer component
29+
// more effectively using code.
30+
31+
var logger *logging.Logger // package-level logger
32+
33+
func init() {
34+
logger = logging.MustGetLogger("committer")
35+
}
36+
37+
type LedgerCommitter struct {
38+
ledger ledger.ValidatedLedger
39+
}
40+
41+
// NewLedgerCommitter is a factory function to create an instance of the committer
42+
func NewLedgerCommitter(ledger ledger.ValidatedLedger) *LedgerCommitter {
43+
return &LedgerCommitter{ledger}
44+
}
45+
46+
// CommitBlock commits block to into the ledger
47+
func (lc *LedgerCommitter) CommitBlock(block *protos.Block2) error {
48+
if _, _, err := lc.ledger.RemoveInvalidTransactionsAndPrepare(block); err != nil {
49+
return err
50+
}
51+
if err := lc.ledger.Commit(); err != nil {
52+
return err
53+
}
54+
return nil
55+
}
56+
57+
// LedgerHeight returns recently committed block sequence number
58+
func (lc *LedgerCommitter) LedgerHeight() (uint64, error) {
59+
var info *pb.BlockchainInfo
60+
var err error
61+
if info, err = lc.ledger.GetBlockchainInfo(); err != nil {
62+
logger.Errorf("Cannot get blockchain info, %s\n", info)
63+
return uint64(0), err
64+
}
65+
66+
return info.Height, nil
67+
}
68+
69+
// GetBlocks used to retrieve blocks with sequence numbers provided in the slice
70+
func (lc *LedgerCommitter) GetBlocks(blockSeqs []uint64) []*protos.Block2 {
71+
blocks := make([]*protos.Block2, 0)
72+
73+
for _, seqNum := range blockSeqs {
74+
var block *protos.Block2
75+
var err error
76+
if block, err = lc.ledger.GetBlockByNumber(seqNum); err != nil {
77+
logger.Errorf("Could not able to acquire block num %d, from the ledger skipping...\n", seqNum)
78+
continue
79+
}
80+
blocks = append(blocks, block)
81+
}
82+
83+
return blocks
84+
}

core/committer/committer_test.go

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package committer
18+
19+
import (
20+
"os"
21+
"testing"
22+
23+
"github.com/hyperledger/fabric/core/ledger/kvledger"
24+
"github.com/hyperledger/fabric/core/ledger/testutil"
25+
"github.com/hyperledger/fabric/protos"
26+
"github.com/stretchr/testify/assert"
27+
)
28+
29+
func TestKVLedgerBlockStorage(t *testing.T) {
30+
conf := kvledger.NewConf("/tmp/tests/ledger/", 0)
31+
defer os.RemoveAll("/tmp/tests/ledger/")
32+
33+
ledger, _ := kvledger.NewKVLedger(conf)
34+
defer ledger.Close()
35+
36+
committer := NewLedgerCommitter(ledger)
37+
38+
var err error
39+
40+
height, err := committer.LedgerHeight()
41+
assert.Equal(t, uint64(0), height)
42+
assert.NoError(t, err)
43+
44+
bcInfo, _ := ledger.GetBlockchainInfo()
45+
testutil.AssertEquals(t, bcInfo, &protos.BlockchainInfo{
46+
Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil})
47+
48+
simulator, _ := ledger.NewTxSimulator()
49+
simulator.SetState("ns1", "key1", []byte("value1"))
50+
simulator.SetState("ns1", "key2", []byte("value2"))
51+
simulator.SetState("ns1", "key3", []byte("value3"))
52+
simulator.Done()
53+
54+
simRes, _ := simulator.GetTxSimulationResults()
55+
block1 := testutil.ConstructBlockForSimulationResults(t, [][]byte{simRes})
56+
57+
err = committer.CommitBlock(block1)
58+
assert.NoError(t, err)
59+
60+
height, err = committer.LedgerHeight()
61+
assert.Equal(t, uint64(1), height)
62+
assert.NoError(t, err)
63+
64+
blocks := committer.GetBlocks([]uint64{1})
65+
assert.Equal(t, 1, len(blocks))
66+
assert.NoError(t, err)
67+
68+
bcInfo, _ = ledger.GetBlockchainInfo()
69+
serBlock1, _ := protos.ConstructSerBlock2(block1)
70+
block1Hash := serBlock1.ComputeHash()
71+
testutil.AssertEquals(t, bcInfo, &protos.BlockchainInfo{
72+
Height: 1, CurrentBlockHash: block1Hash, PreviousBlockHash: []byte{}})
73+
}
+168
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package noopssinglechain
18+
19+
import (
20+
"fmt"
21+
"time"
22+
23+
"github.com/golang/protobuf/proto"
24+
"github.com/hyperledger/fabric/core/chaincode"
25+
"github.com/hyperledger/fabric/core/committer"
26+
"github.com/hyperledger/fabric/core/ledger/kvledger"
27+
"github.com/hyperledger/fabric/protos"
28+
"github.com/hyperledger/fabric/protos/common"
29+
"github.com/hyperledger/fabric/protos/orderer"
30+
putils "github.com/hyperledger/fabric/protos/utils"
31+
"github.com/op/go-logging"
32+
"github.com/spf13/viper"
33+
"golang.org/x/net/context"
34+
"google.golang.org/grpc"
35+
)
36+
37+
var logger *logging.Logger // package-level logger
38+
39+
func init() {
40+
logger = logging.MustGetLogger("committer")
41+
}
42+
43+
// DeliverService used to communicate with orderers to obtain
44+
// new block and send the to the committer service
45+
type DeliverService struct {
46+
client orderer.AtomicBroadcast_DeliverClient
47+
windowSize uint64
48+
unAcknowledged uint64
49+
committer *committer.LedgerCommitter
50+
}
51+
52+
// NewDeliverService construction function to create and initilize
53+
// delivery service instance
54+
func NewDeliverService() *DeliverService {
55+
if viper.GetBool("peer.committer.enabled") {
56+
logger.Infof("Creating committer for single noops endorser")
57+
58+
var opts []grpc.DialOption
59+
opts = append(opts, grpc.WithInsecure())
60+
opts = append(opts, grpc.WithTimeout(3*time.Second))
61+
opts = append(opts, grpc.WithBlock())
62+
endpoint := viper.GetString("peer.committer.ledger.orderer")
63+
conn, err := grpc.Dial(endpoint, opts...)
64+
if err != nil {
65+
logger.Errorf("Cannot dial to %s, because of %s", endpoint, err)
66+
return nil
67+
}
68+
var abc orderer.AtomicBroadcast_DeliverClient
69+
abc, err = orderer.NewAtomicBroadcastClient(conn).Deliver(context.TODO())
70+
if err != nil {
71+
logger.Errorf("Unable to initialize atomic broadcast, due to %s", err)
72+
return nil
73+
}
74+
75+
deliverService := &DeliverService{
76+
// Atomic Broadcast Deliver Clienet
77+
client: abc,
78+
// Instance of RawLedger
79+
committer: committer.NewLedgerCommitter(kvledger.GetLedger(string(chaincode.DefaultChain))),
80+
windowSize: 10,
81+
}
82+
return deliverService
83+
}
84+
logger.Infof("Committer disabled")
85+
return nil
86+
}
87+
88+
// Start the delivery service to read the block via delivery
89+
// protocol from the orderers
90+
func (d *DeliverService) Start() error {
91+
if err := d.seekOldest(); err != nil {
92+
return err
93+
}
94+
95+
d.readUntilClose()
96+
return nil
97+
}
98+
99+
func (d *DeliverService) seekOldest() error {
100+
return d.client.Send(&orderer.DeliverUpdate{
101+
Type: &orderer.DeliverUpdate_Seek{
102+
Seek: &orderer.SeekInfo{
103+
Start: orderer.SeekInfo_OLDEST,
104+
WindowSize: d.windowSize,
105+
},
106+
},
107+
})
108+
}
109+
110+
func (d *DeliverService) readUntilClose() {
111+
for {
112+
msg, err := d.client.Recv()
113+
if err != nil {
114+
return
115+
}
116+
117+
switch t := msg.Type.(type) {
118+
case *orderer.DeliverResponse_Error:
119+
if t.Error == common.Status_SUCCESS {
120+
fmt.Println("ERROR! Received success in error field")
121+
return
122+
}
123+
fmt.Println("Got error ", t)
124+
case *orderer.DeliverResponse_Block:
125+
block := &protos.Block2{}
126+
for _, d := range t.Block.Data.Data {
127+
if d != nil {
128+
if tx, err := putils.GetEndorserTxFromBlock(d); err != nil {
129+
fmt.Printf("Error getting tx from block(%s)\n", err)
130+
} else if tx != nil {
131+
if t, err := proto.Marshal(tx); err == nil {
132+
block.Transactions = append(block.Transactions, t)
133+
} else {
134+
fmt.Printf("Cannot marshal transactoins %s\n", err)
135+
}
136+
} else {
137+
fmt.Printf("Nil tx from block\n")
138+
}
139+
}
140+
}
141+
// Once block is constructed need to commit into the ledger
142+
if err = d.committer.CommitBlock(block); err != nil {
143+
fmt.Printf("Got error while committing(%s)\n", err)
144+
} else {
145+
fmt.Printf("Commit success, created a block!\n")
146+
}
147+
148+
d.unAcknowledged++
149+
if d.unAcknowledged >= d.windowSize/2 {
150+
fmt.Println("Sending acknowledgement")
151+
err = d.client.Send(&orderer.DeliverUpdate{
152+
Type: &orderer.DeliverUpdate_Acknowledgement{
153+
Acknowledgement: &orderer.Acknowledgement{
154+
Number: t.Block.Header.Number,
155+
},
156+
},
157+
})
158+
if err != nil {
159+
return
160+
}
161+
d.unAcknowledged = 0
162+
}
163+
default:
164+
fmt.Println("Received unknown: ", t)
165+
return
166+
}
167+
}
168+
}

0 commit comments

Comments
 (0)