Skip to content

Commit f9fa8d6

Browse files
committed
[FAB-2828] Add resilient delivery client to peer
This commit adds a resilient delivery client to the peer's deliveryService. This client: - Accepts the following arguments at creation: - Connection Producer (previous change set) - clientFactory (creates AtomicBroadcastClient from ClientConn) - broadcastSetup (function that is going to be used to send a Send() with SeekInfo to orderer) - backoffStrategy - retry logic descriptor, a function that can implement any kind of backoff policy, i.e exponential backoff, etc. etc. - Able to reconnect to the ordering service when the connection is broken - Hides all failure handling and reconnection logic from its user In a later commit I will: - Move the connection creation code that is invoked only once at creation instead of when needed, to the factory of the core/comm/producer.go that was created in the previous commit. - Change the blocksprovider accordingly to use this newly introduced broadcastClient Provided 12 test cases with code coverage of 100% Change-Id: I96a46b76e8fb227eb8bea4c8ded9b788e4fd0eef Signed-off-by: Yacov Manevich <[email protected]>
1 parent 046a667 commit f9fa8d6

File tree

3 files changed

+776
-0
lines changed

3 files changed

+776
-0
lines changed

core/deliverservice/client.go

+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
Copyright IBM Corp. 2017 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package deliverclient
18+
19+
import (
20+
"errors"
21+
"fmt"
22+
"sync"
23+
"sync/atomic"
24+
"time"
25+
26+
"github.com/hyperledger/fabric/core/comm"
27+
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
28+
"github.com/hyperledger/fabric/protos/common"
29+
"github.com/hyperledger/fabric/protos/orderer"
30+
"golang.org/x/net/context"
31+
"google.golang.org/grpc"
32+
)
33+
34+
// broadcastSetup is a function that is called by the broadcastClient immediately after each
35+
// successful connection to the ordering service
36+
type broadcastSetup func(blocksprovider.BlocksDeliverer) error
37+
38+
// retryPolicy receives as parameters the number of times the attempt has failed
39+
// and a duration that specifies the total elapsed time passed since the first attempt.
40+
// If further attempts should be made, it returns:
41+
// - a time duration after which the next attempt would be made, true
42+
// Else, a zero duration, false
43+
type retryPolicy func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool)
44+
45+
// clientFactory creates a gRPC broadcast client out of a ClientConn
46+
type clientFactory func(*grpc.ClientConn) orderer.AtomicBroadcastClient
47+
48+
type broadcastClient struct {
49+
stopFlag int32
50+
sync.RWMutex
51+
stopChan chan struct{}
52+
createClient clientFactory
53+
shouldRetry retryPolicy
54+
onConnect broadcastSetup
55+
prod comm.ConnectionProducer
56+
blocksprovider.BlocksDeliverer
57+
conn *grpc.ClientConn
58+
}
59+
60+
// NewBroadcastClient returns a broadcastClient with the given params
61+
func NewBroadcastClient(prod comm.ConnectionProducer, clFactory clientFactory, onConnect broadcastSetup, bos retryPolicy) *broadcastClient {
62+
return &broadcastClient{prod: prod, onConnect: onConnect, shouldRetry: bos, createClient: clFactory, stopChan: make(chan struct{}, 1)}
63+
}
64+
65+
// Recv receives a message from the ordering service
66+
func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) {
67+
o, err := bc.try(func() (interface{}, error) {
68+
return bc.BlocksDeliverer.Recv()
69+
})
70+
if err != nil {
71+
return nil, err
72+
}
73+
return o.(*orderer.DeliverResponse), nil
74+
}
75+
76+
// Send sends a message to the ordering service
77+
func (bc *broadcastClient) Send(msg *common.Envelope) error {
78+
_, err := bc.try(func() (interface{}, error) {
79+
return nil, bc.BlocksDeliverer.Send(msg)
80+
})
81+
return err
82+
}
83+
84+
func (bc *broadcastClient) try(action func() (interface{}, error)) (interface{}, error) {
85+
attempt := 0
86+
start := time.Now()
87+
var backoffDuration time.Duration
88+
retry := true
89+
for retry && !bc.shouldStop() {
90+
attempt++
91+
resp, err := bc.doAction(action)
92+
if err != nil {
93+
backoffDuration, retry = bc.shouldRetry(attempt, time.Since(start))
94+
if !retry {
95+
break
96+
}
97+
bc.sleep(backoffDuration)
98+
continue
99+
}
100+
return resp, nil
101+
}
102+
if bc.shouldStop() {
103+
return nil, errors.New("Client is closing")
104+
}
105+
return nil, fmt.Errorf("Attempts (%d) or elapsed time (%v) exhausted", attempt, time.Since(start))
106+
}
107+
108+
func (bc *broadcastClient) doAction(action func() (interface{}, error)) (interface{}, error) {
109+
if bc.BlocksDeliverer == nil {
110+
err := bc.connect()
111+
if err != nil {
112+
return nil, err
113+
}
114+
}
115+
resp, err := action()
116+
if err != nil {
117+
bc.conn.Close()
118+
bc.BlocksDeliverer = nil
119+
bc.conn = nil
120+
return nil, err
121+
}
122+
return resp, nil
123+
}
124+
125+
func (bc *broadcastClient) sleep(duration time.Duration) {
126+
select {
127+
case <-time.After(duration):
128+
case <-bc.stopChan:
129+
}
130+
}
131+
132+
func (bc *broadcastClient) connect() error {
133+
conn, endpoint, err := bc.prod.NewConnection()
134+
if err != nil {
135+
logger.Error("Failed obtaining connection:", err)
136+
return err
137+
}
138+
abc, err := bc.createClient(conn).Deliver(context.Background())
139+
if err != nil {
140+
logger.Error("Connection to ", endpoint, "established but was unable to create gRPC stream:", err)
141+
conn.Close()
142+
return err
143+
}
144+
err = bc.onConnect(bc)
145+
if err == nil {
146+
bc.Lock()
147+
bc.conn = conn
148+
bc.Unlock()
149+
bc.BlocksDeliverer = abc
150+
return nil
151+
}
152+
logger.Error("Failed setting up broadcast:", err)
153+
conn.Close()
154+
return err
155+
}
156+
157+
func (bc *broadcastClient) shouldStop() bool {
158+
return atomic.LoadInt32(&bc.stopFlag) == int32(1)
159+
}
160+
161+
func (bc *broadcastClient) Close() {
162+
atomic.StoreInt32(&bc.stopFlag, int32(1))
163+
bc.stopChan <- struct{}{}
164+
bc.RLock()
165+
defer bc.RUnlock()
166+
if bc.conn == nil {
167+
return
168+
}
169+
bc.conn.Close()
170+
}

0 commit comments

Comments
 (0)