Skip to content

Commit 945c4f7

Browse files
committed
[FAB-1924]: Rework delivery client
Move ordering service delivery client management into gossip service, this to be able to maintain single connection to the ordering service. Moreover this to be widely used after gossip based leader election integrated. Change-Id: Iea9a70a1d6ba82caa55716444c54f3ddbc19673b Signed-off-by: Artem Barger <[email protected]>
1 parent 8762744 commit 945c4f7

13 files changed

+789
-200
lines changed

core/deliverservice/client.go core/deliverservice/blocksprovider/blocksprovider.go

+119-134
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright IBM Corp. 2016 All Rights Reserved.
2+
Copyright IBM Corp. 2017 All Rights Reserved.
33
44
Licensed under the Apache License, Version 2.0 (the "License");
55
you may not use this file except in compliance with the License.
@@ -14,150 +14,172 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package deliverclient
17+
package blocksprovider
1818

1919
import (
2020
"math"
21-
"time"
21+
"sync/atomic"
2222

2323
"github.com/golang/protobuf/proto"
24-
"github.com/hyperledger/fabric/core/committer"
2524
gossipcommon "github.com/hyperledger/fabric/gossip/common"
25+
"github.com/hyperledger/fabric/gossip/discovery"
26+
2627
gossip_proto "github.com/hyperledger/fabric/gossip/proto"
27-
"github.com/hyperledger/fabric/gossip/service"
2828
"github.com/hyperledger/fabric/protos/common"
2929
"github.com/hyperledger/fabric/protos/orderer"
3030
"github.com/hyperledger/fabric/protos/utils"
3131
"github.com/op/go-logging"
32-
"github.com/spf13/viper"
33-
"golang.org/x/net/context"
34-
"google.golang.org/grpc"
3532
)
3633

37-
var logger *logging.Logger // package-level logger
34+
// LedgerInfo an adapter to provide the interface to query
35+
// the ledger committer for current ledger height
36+
type LedgerInfo interface {
37+
// LedgerHeight returns current local ledger height
38+
LedgerHeight() (uint64, error)
39+
}
3840

39-
func init() {
40-
logger = logging.MustGetLogger("deliveryService")
41+
// GossipServiceAdapter serves to provide basic functionality
42+
// required from gossip service by delivery service
43+
type GossipServiceAdapter interface {
44+
// PeersOfChannel returns slice with members of specified channel
45+
PeersOfChannel(gossipcommon.ChainID) []discovery.NetworkMember
46+
47+
// AddPayload adds payload to the local state sync buffer
48+
AddPayload(chainID string, payload *gossip_proto.Payload) error
49+
50+
// Gossip the message across the peers
51+
Gossip(msg *gossip_proto.GossipMessage)
52+
}
53+
54+
// BlocksProvider used to read blocks from the ordering service
55+
// for specified chain it subscribed to
56+
type BlocksProvider interface {
57+
// RequestBlock acquire new blocks from ordering service based on
58+
// information provided by ledger info instance
59+
RequestBlocks(ledgerInfoProvider LedgerInfo) error
60+
61+
// DeliverBlocks starts delivering and disseminating blocks
62+
DeliverBlocks()
63+
64+
// Stop shutdowns blocks provider and stops delivering new blocks
65+
Stop()
4166
}
4267

43-
// DeliverService used to communicate with orderers to obtain
44-
// new block and send the to the committer service
45-
type DeliverService struct {
46-
client orderer.AtomicBroadcast_DeliverClient
68+
// BlocksDeliverer defines interface which actually helps
69+
// to abstract the AtomicBroadcast_DeliverClient with only
70+
// required method for blocks provider. This also help to
71+
// build up mocking facilities for testing purposes
72+
type BlocksDeliverer interface {
73+
// Recv capable to bring new blocks from the ordering service
74+
Recv() (*orderer.DeliverResponse, error)
4775

76+
// Send used to send request to the ordering service to obtain new blocks
77+
Send(*common.Envelope) error
78+
}
79+
80+
// blocksProviderImpl the actual implementation for BlocksProvider interface
81+
type blocksProviderImpl struct {
4882
chainID string
49-
conn *grpc.ClientConn
83+
84+
client BlocksDeliverer
85+
86+
gossip GossipServiceAdapter
87+
88+
done int32
5089
}
5190

52-
// StopDeliveryService sends stop to the delivery service reference
53-
func StopDeliveryService(service *DeliverService) {
54-
if service != nil {
55-
service.Stop()
91+
var logger *logging.Logger // package-level logger
92+
93+
func init() {
94+
logger = logging.MustGetLogger("blocksProvider")
95+
}
96+
97+
// NewBlocksProvider constructor function to creare blocks deliverer instance
98+
func NewBlocksProvider(chainID string, client BlocksDeliverer, gossip GossipServiceAdapter) BlocksProvider {
99+
return &blocksProviderImpl{
100+
chainID: chainID,
101+
client: client,
102+
gossip: gossip,
56103
}
57104
}
58105

59-
// NewDeliverService construction function to create and initilize
60-
// delivery service instance
61-
func NewDeliverService(chainID string) *DeliverService {
62-
if viper.GetBool("peer.committer.enabled") {
63-
logger.Infof("Creating committer for single noops endorser")
64-
deliverService := &DeliverService{
65-
// Instance of RawLedger
66-
chainID: chainID,
106+
// DeliverBlocks used to pull out blocks from the ordering service to
107+
// distributed them across peers
108+
func (b *blocksProviderImpl) DeliverBlocks() {
109+
for !b.isDone() {
110+
msg, err := b.client.Recv()
111+
if err != nil {
112+
logger.Warningf("Receive error: %s", err.Error())
113+
return
67114
}
115+
switch t := msg.Type.(type) {
116+
case *orderer.DeliverResponse_Status:
117+
if t.Status == common.Status_SUCCESS {
118+
logger.Warning("ERROR! Received success for a seek that should never complete")
119+
return
120+
}
121+
logger.Warning("Got error ", t)
122+
case *orderer.DeliverResponse_Block:
123+
seqNum := t.Block.Header.Number
68124

69-
return deliverService
125+
numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
126+
// Create payload with a block received
127+
payload := createPayload(seqNum, t.Block)
128+
// Use payload to create gossip message
129+
gossipMsg := createGossipMsg(b.chainID, payload)
130+
131+
logger.Debugf("Adding payload locally, buffer seqNum = [%d], peers number [%d]", seqNum, numberOfPeers)
132+
// Add payload to local state payloads buffer
133+
b.gossip.AddPayload(b.chainID, payload)
134+
135+
// Gossip messages with other nodes
136+
logger.Debugf("Gossiping block [%d], peers number [%d]", seqNum, numberOfPeers)
137+
b.gossip.Gossip(gossipMsg)
138+
default:
139+
logger.Warning("Received unknown: ", t)
140+
return
141+
}
70142
}
71-
logger.Infof("Committer disabled")
72-
return nil
73143
}
74144

75-
func (d *DeliverService) startDeliver(committer committer.Committer) error {
76-
logger.Info("Starting deliver service client")
77-
err := d.initDeliver()
145+
// Stops blocks delivery provider
146+
func (b *blocksProviderImpl) Stop() {
147+
atomic.StoreInt32(&b.done, 1)
148+
}
78149

79-
if err != nil {
80-
logger.Errorf("Can't initiate deliver protocol [%s]", err)
81-
return err
82-
}
150+
// Check whenever provider is stopped
151+
func (b *blocksProviderImpl) isDone() bool {
152+
return atomic.LoadInt32(&b.done) == 1
153+
}
83154

84-
height, err := committer.LedgerHeight()
155+
func (b *blocksProviderImpl) RequestBlocks(ledgerInfoProvider LedgerInfo) error {
156+
height, err := ledgerInfoProvider.LedgerHeight()
85157
if err != nil {
86158
logger.Errorf("Can't get legder height from committer [%s]", err)
87159
return err
88160
}
89161

90162
if height > 0 {
91163
logger.Debugf("Starting deliver with block [%d]", height)
92-
if err := d.seekLatestFromCommitter(height); err != nil {
164+
if err := b.seekLatestFromCommitter(height); err != nil {
93165
return err
94166
}
95-
96167
} else {
97168
logger.Debug("Starting deliver with olders block")
98-
if err := d.seekOldest(); err != nil {
169+
if err := b.seekOldest(); err != nil {
99170
return err
100171
}
101-
102172
}
103173

104-
d.readUntilClose()
105-
106-
return nil
107-
}
108-
109-
func (d *DeliverService) initDeliver() error {
110-
opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(3 * time.Second), grpc.WithBlock()}
111-
endpoint := viper.GetString("peer.committer.ledger.orderer")
112-
conn, err := grpc.Dial(endpoint, opts...)
113-
if err != nil {
114-
logger.Errorf("Cannot dial to %s, because of %s", endpoint, err)
115-
return err
116-
}
117-
var abc orderer.AtomicBroadcast_DeliverClient
118-
abc, err = orderer.NewAtomicBroadcastClient(conn).Deliver(context.TODO())
119-
if err != nil {
120-
logger.Errorf("Unable to initialize atomic broadcast, due to %s", err)
121-
return err
122-
}
123-
124-
// Atomic Broadcast Deliver Client
125-
d.client = abc
126-
d.conn = conn
127174
return nil
128-
129175
}
130176

131-
func (d *DeliverService) stopDeliver() {
132-
if d.conn != nil {
133-
d.conn.Close()
134-
}
135-
}
136-
137-
// Stop all service and release resources
138-
func (d *DeliverService) Stop() {
139-
d.stopDeliver()
140-
}
141-
142-
// Start delivery service
143-
func (d *DeliverService) Start(committer committer.Committer) {
144-
go d.checkLeaderAndRunDeliver(committer)
145-
}
146-
147-
func (d *DeliverService) checkLeaderAndRunDeliver(committer committer.Committer) {
148-
isLeader := viper.GetBool("peer.gossip.orgLeader")
149-
150-
if isLeader {
151-
d.startDeliver(committer)
152-
}
153-
}
154-
155-
func (d *DeliverService) seekOldest() error {
156-
return d.client.Send(&common.Envelope{
177+
func (b *blocksProviderImpl) seekOldest() error {
178+
return b.client.Send(&common.Envelope{
157179
Payload: utils.MarshalOrPanic(&common.Payload{
158180
Header: &common.Header{
159181
ChainHeader: &common.ChainHeader{
160-
ChainID: d.chainID,
182+
ChainID: b.chainID,
161183
},
162184
SignatureHeader: &common.SignatureHeader{},
163185
},
@@ -170,12 +192,12 @@ func (d *DeliverService) seekOldest() error {
170192
})
171193
}
172194

173-
func (d *DeliverService) seekLatestFromCommitter(height uint64) error {
174-
return d.client.Send(&common.Envelope{
195+
func (b *blocksProviderImpl) seekLatestFromCommitter(height uint64) error {
196+
return b.client.Send(&common.Envelope{
175197
Payload: utils.MarshalOrPanic(&common.Payload{
176198
Header: &common.Header{
177199
ChainHeader: &common.ChainHeader{
178-
ChainID: d.chainID,
200+
ChainID: b.chainID,
179201
},
180202
SignatureHeader: &common.SignatureHeader{},
181203
},
@@ -188,43 +210,6 @@ func (d *DeliverService) seekLatestFromCommitter(height uint64) error {
188210
})
189211
}
190212

191-
func (d *DeliverService) readUntilClose() {
192-
for {
193-
msg, err := d.client.Recv()
194-
if err != nil {
195-
logger.Warningf("Receive error: %s", err.Error())
196-
return
197-
}
198-
switch t := msg.Type.(type) {
199-
case *orderer.DeliverResponse_Status:
200-
if t.Status == common.Status_SUCCESS {
201-
logger.Warning("ERROR! Received success for a seek that should never complete")
202-
return
203-
}
204-
logger.Warning("Got error ", t)
205-
case *orderer.DeliverResponse_Block:
206-
seqNum := t.Block.Header.Number
207-
208-
numberOfPeers := len(service.GetGossipService().PeersOfChannel(gossipcommon.ChainID(d.chainID)))
209-
// Create payload with a block received
210-
payload := createPayload(seqNum, t.Block)
211-
// Use payload to create gossip message
212-
gossipMsg := createGossipMsg(d.chainID, payload)
213-
214-
logger.Debugf("Adding payload locally, buffer seqNum = [%d], peers number [%d]", seqNum, numberOfPeers)
215-
// Add payload to local state payloads buffer
216-
service.GetGossipService().AddPayload(d.chainID, payload)
217-
218-
// Gossip messages with other nodes
219-
logger.Debugf("Gossiping block [%d], peers number [%d]", seqNum, numberOfPeers)
220-
service.GetGossipService().Gossip(gossipMsg)
221-
default:
222-
logger.Warning("Received unknown: ", t)
223-
return
224-
}
225-
}
226-
}
227-
228213
func createGossipMsg(chainID string, payload *gossip_proto.Payload) *gossip_proto.GossipMessage {
229214
gossipMsg := &gossip_proto.GossipMessage{
230215
Nonce: 0,

0 commit comments

Comments
 (0)