Skip to content

Commit a069559

Browse files
committed
[FAB-4155] Make deliveryClient failover on 503 error
This change set makes the client side of the peer <---> orderer connection resilient to internal errors in the ordering service, namely Kafka connection issues. This commit: - Exposes a method of the DeliveryClient, Disconnect(), that makes the client reconnect to some ordering service node (possibly the same one, since it is chosen at random) - Adds logic in the blocks provider to invoke Disconnect() upon receiving a ServiceUnavailable (503) status from the orderer node - Fixes a small bug: the seeding of the random number generator with the current time was missing from the connection producer, and as a result it always chose orderer endpoints deterministically. Change-Id: I949453237c88cb27f9a4fa96f7acd81974f29712 Signed-off-by: Yacov Manevich <[email protected]>
1 parent 2590cce commit a069559

File tree

8 files changed

+248
-20
lines changed

8 files changed

+248
-20
lines changed

core/comm/producer.go

+2-13
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"math/rand"
2222
"sync"
23+
"time"
2324

2425
"github.com/hyperledger/fabric/common/flogging"
2526
"google.golang.org/grpc"
@@ -91,22 +92,10 @@ func (cp *connProducer) UpdateEndpoints(endpoints []string) {
9192
// shuffle returns a new slice holding the elements of a in a random order.
// The input slice is left untouched.
//
// The permutation is drawn from a generator that is private to this call and
// seeded with the current time. Calling rand.Seed here instead would reset the
// process-wide source on every call, disturbing every other user of math/rand,
// and rand.Seed is deprecated.
func shuffle(a []string) []string {
	n := len(a)
	returnedSlice := make([]string, n)
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	for i, idx := range rng.Perm(n) {
		returnedSlice[i] = a[idx]
	}
	return returnedSlice
}
100-
101-
// filterOut returns the elements of a that differ from filteredOut,
// keeping their original order. The result is nil when nothing is kept.
func filterOut(a []string, filteredOut string) []string {
	var kept []string
	for i := range a {
		if a[i] != filteredOut {
			kept = append(kept, a[i])
		}
	}
	return kept
}

core/deliverservice/blocksprovider/blocksprovider.go

+7
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ type streamClient interface {
7979

8080
// Close closes the stream and its underlying connection
8181
Close()
82+
83+
// Disconnect disconnects from the remote node
84+
Disconnect()
8285
}
8386

8487
// blocksProviderImpl the actual implementation for BlocksProvider interface
@@ -127,6 +130,10 @@ func (b *blocksProviderImpl) DeliverBlocks() {
127130
return
128131
}
129132
logger.Warningf("[%s] Got error %v", b.chainID, t)
133+
if t.Status == common.Status_SERVICE_UNAVAILABLE {
134+
b.client.Disconnect()
135+
continue
136+
}
130137
case *orderer.DeliverResponse_Block:
131138
seqNum := t.Block.Header.Number
132139

core/deliverservice/blocksprovider/blocksprovider_test.go

+66-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func TestBlocksProvider_CheckTerminationDeliveryResponseStatus(t *testing.T) {
127127
tmp := struct{ mocks.MockBlocksDeliverer }{}
128128

129129
// Making mocked Recv() function to return DeliverResponse_Status to force block
130-
// provider to fail and exit, cheking that in that case to block was actually
130+
// provider to fail and exit, checking that in that case no block was actually
131131
// delivered.
132132
tmp.MockRecv = func(mock *mocks.MockBlocksDeliverer) (*orderer.DeliverResponse, error) {
133133
return &orderer.DeliverResponse{
@@ -179,6 +179,71 @@ func TestBlocksProvider_CheckTerminationDeliveryResponseStatus(t *testing.T) {
179179
}
180180
}
181181

182+
func TestBlocksProvider_DeliveryServiceUnavailable(t *testing.T) {
183+
sendBlock := func(seqNum uint64) *orderer.DeliverResponse {
184+
return &orderer.DeliverResponse{
185+
Type: &orderer.DeliverResponse_Block{
186+
Block: &common.Block{
187+
Header: &common.BlockHeader{
188+
Number: seqNum,
189+
DataHash: []byte{},
190+
PreviousHash: []byte{},
191+
},
192+
Data: &common.BlockData{
193+
Data: [][]byte{},
194+
},
195+
}},
196+
}
197+
}
198+
sendStatus := func(status common.Status) *orderer.DeliverResponse {
199+
return &orderer.DeliverResponse{
200+
Type: &orderer.DeliverResponse_Status{
201+
Status: status,
202+
},
203+
}
204+
}
205+
206+
bd := mocks.MockBlocksDeliverer{DisconnectCalled: make(chan struct{}, 1)}
207+
mcs := &mockMCS{}
208+
mcs.On("VerifyBlock", mock.Anything).Return(nil)
209+
gossipServiceAdapter := &mocks.MockGossipServiceAdapter{GossipBlockDisseminations: make(chan uint64, 2)}
210+
provider := &blocksProviderImpl{
211+
chainID: "***TEST_CHAINID***",
212+
gossip: gossipServiceAdapter,
213+
client: &bd,
214+
mcs: mcs,
215+
}
216+
217+
attempts := int32(0)
218+
bd.MockRecv = func(mock *mocks.MockBlocksDeliverer) (*orderer.DeliverResponse, error) {
219+
atomic.AddInt32(&attempts, 1)
220+
switch atomic.LoadInt32(&attempts) {
221+
case int32(1):
222+
return sendBlock(0), nil
223+
case int32(2):
224+
return sendStatus(common.Status_SERVICE_UNAVAILABLE), nil
225+
case int32(3):
226+
return sendBlock(1), nil
227+
default:
228+
provider.Stop()
229+
return nil, errors.New("Stopping")
230+
}
231+
}
232+
233+
go provider.DeliverBlocks()
234+
assert.Len(t, bd.DisconnectCalled, 0)
235+
for i := 0; i < 2; i++ {
236+
select {
237+
case seq := <-gossipServiceAdapter.GossipBlockDisseminations:
238+
assert.Equal(t, uint64(i), seq)
239+
case <-time.After(time.Second * 3):
240+
assert.Fail(t, "Didn't receive a block within a timely manner")
241+
}
242+
}
243+
// Make sure disconnect was called in between the deliveries
244+
assert.Len(t, bd.DisconnectCalled, 1)
245+
}
246+
182247
func TestBlockFetchFailure(t *testing.T) {
183248
rcvr := func(mock *mocks.MockBlocksDeliverer) (*orderer.DeliverResponse, error) {
184249
return nil, errors.New("Failed fetching block")

core/deliverservice/client.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func (bc *broadcastClient) doAction(action func() (interface{}, error)) (interfa
120120
}
121121
resp, err := action()
122122
if err != nil {
123-
bc.disconnect()
123+
bc.Disconnect()
124124
return nil, err
125125
}
126126
return resp, nil
@@ -151,7 +151,7 @@ func (bc *broadcastClient) connect() error {
151151
}
152152
// If we reached here, lets make sure connection is closed
153153
// and nullified before we return
154-
bc.disconnect()
154+
bc.Disconnect()
155155
return err
156156
}
157157

@@ -191,6 +191,7 @@ func (bc *broadcastClient) shouldStop() bool {
191191
return atomic.LoadInt32(&bc.stopFlag) == int32(1)
192192
}
193193

194+
// Close makes the client close its connection and shut down
194195
func (bc *broadcastClient) Close() {
195196
bc.Lock()
196197
defer bc.Unlock()
@@ -205,7 +206,8 @@ func (bc *broadcastClient) Close() {
205206
bc.conn.Close()
206207
}
207208

208-
func (bc *broadcastClient) disconnect() {
209+
// Disconnect makes the client close the existing connection
210+
func (bc *broadcastClient) Disconnect() {
209211
bc.Lock()
210212
defer bc.Unlock()
211213
if bc.conn == nil {

core/deliverservice/client_test.go

+71-2
Original file line numberDiff line numberDiff line change
@@ -490,11 +490,14 @@ func TestCloseWhileRecv(t *testing.T) {
490490
}
491491
bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy)
492492
var flag int32
493-
time.AfterFunc(time.Second, func() {
493+
go func() {
494+
for fakeOrderer.ConnCount() == 0 {
495+
time.Sleep(time.Second)
496+
}
494497
atomic.StoreInt32(&flag, int32(1))
495498
bc.Close()
496499
bc.Close() // Try to close a second time
497-
})
500+
}()
498501
resp, err := bc.Recv()
499502
// Ensure we returned because bc.Close() was called and not because some other reason
500503
assert.Equal(t, int32(1), atomic.LoadInt32(&flag), "Recv returned before bc.Close() was called")
@@ -595,6 +598,72 @@ func TestProductionUsage(t *testing.T) {
595598
cl.Close()
596599
}
597600

601+
func TestDisconnect(t *testing.T) {
602+
// Scenario: spawn 2 ordering service instances
603+
// and a client.
604+
// Have the client try to Recv() from one of them,
605+
// and disconnect the client until it tries connecting
606+
// to the other instance.
607+
608+
defer ensureNoGoroutineLeak(t)()
609+
os1 := mocks.NewOrderer(5613, t)
610+
os1.SetNextExpectedSeek(5)
611+
os2 := mocks.NewOrderer(5614, t)
612+
os2.SetNextExpectedSeek(5)
613+
614+
defer os1.Shutdown()
615+
defer os2.Shutdown()
616+
617+
waitForConnectionToSomeOSN := func() {
618+
for {
619+
if os1.ConnCount() > 0 || os2.ConnCount() > 0 {
620+
return
621+
}
622+
time.Sleep(time.Millisecond * 100)
623+
}
624+
}
625+
626+
connFact := func(endpoint string) (*grpc.ClientConn, error) {
627+
return grpc.Dial(endpoint, grpc.WithInsecure(), grpc.WithBlock())
628+
}
629+
prod := comm.NewConnectionProducer(connFact, []string{"localhost:5613", "localhost:5614"})
630+
clFact := func(cc *grpc.ClientConn) orderer.AtomicBroadcastClient {
631+
return orderer.NewAtomicBroadcastClient(cc)
632+
}
633+
onConnect := func(bd blocksprovider.BlocksDeliverer) error {
634+
return nil
635+
}
636+
retryPol := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) {
637+
return time.Millisecond * 10, attemptNum < 100
638+
}
639+
640+
cl := NewBroadcastClient(prod, clFact, onConnect, retryPol)
641+
stopChan := make(chan struct{})
642+
go func() {
643+
cl.Recv()
644+
stopChan <- struct{}{}
645+
}()
646+
waitForConnectionToSomeOSN()
647+
cl.Disconnect()
648+
649+
i := 0
650+
for (os1.ConnCount() == 0 || os2.ConnCount() == 0) && i < 100 {
651+
t.Log("Attempt", i, "os1ConnCount()=", os1.ConnCount(), "os2ConnCount()=", os2.ConnCount())
652+
i++
653+
if i == 100 {
654+
assert.Fail(t, "Didn't switch to other instance after many attempts")
655+
}
656+
cl.Disconnect()
657+
time.Sleep(time.Millisecond * 500)
658+
}
659+
cl.Close()
660+
select {
661+
case <-stopChan:
662+
case <-time.After(time.Second * 20):
663+
assert.Fail(t, "Didn't stop within a timely manner")
664+
}
665+
}
666+
598667
func newTestSeekInfo() *orderer.SeekInfo {
599668
return &orderer.SeekInfo{Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: 5}}},
600669
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},

core/deliverservice/deliveryclient_test.go

+61
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,67 @@ func TestDeliverServiceFailover(t *testing.T) {
245245
service.Stop()
246246
}
247247

248+
func TestDeliverServiceServiceUnavailable(t *testing.T) {
249+
defer ensureNoGoroutineLeak(t)()
250+
// Scenario: bring up 2 ordering service instances,
251+
// Make the instance the client connects to fail after a delivery of a block and send SERVICE_UNAVAILABLE
252+
// whenever subsequent seeks are sent to it.
253+
// The client is expected to connect to the other instance, and to ask for a block sequence that is the next block
254+
// after the last block it got from the first ordering service node.
255+
256+
os1 := mocks.NewOrderer(5615, t)
257+
os2 := mocks.NewOrderer(5616, t)
258+
259+
time.Sleep(time.Second)
260+
gossipServiceAdapter := &mocks.MockGossipServiceAdapter{GossipBlockDisseminations: make(chan uint64)}
261+
262+
service, err := NewDeliverService(&Config{
263+
Endpoints: []string{"localhost:5615", "localhost:5616"},
264+
Gossip: gossipServiceAdapter,
265+
CryptoSvc: &mockMCS{},
266+
ABCFactory: DefaultABCFactory,
267+
ConnFactory: DefaultConnectionFactory,
268+
})
269+
assert.NoError(t, err)
270+
li := &mocks.MockLedgerInfo{Height: 100}
271+
os1.SetNextExpectedSeek(100)
272+
os2.SetNextExpectedSeek(100)
273+
274+
err = service.StartDeliverForChannel("TEST_CHAINID", li)
275+
assert.NoError(t, err, "can't start delivery")
276+
// We need to discover to which instance the client connected to
277+
go os1.SendBlock(100)
278+
// Is it the first instance?
279+
instance2fail := os1
280+
nextBlockSeek := uint64(100)
281+
select {
282+
case seq := <-gossipServiceAdapter.GossipBlockDisseminations:
283+
// Just for sanity check, ensure we got block seq 100
284+
assert.Equal(t, uint64(100), seq)
285+
// Connected to the first instance
286+
// Advance ledger's height by 1
287+
atomic.StoreUint64(&li.Height, 101)
288+
// Backup instance should expect a seek of 101 since we got 100
289+
os2.SetNextExpectedSeek(uint64(101))
290+
nextBlockSeek = uint64(101)
291+
// Have backup instance prepare to send a block
292+
os2.SendBlock(101)
293+
case <-time.After(time.Second * 5):
294+
// We didn't get a block on time, so seems like we're connected to the 2nd instance
295+
// and not to the first.
296+
instance2fail = os2
297+
}
298+
299+
instance2fail.Fail()
300+
// Ensure the client asks blocks from the other ordering service node
301+
assertBlockDissemination(nextBlockSeek, gossipServiceAdapter.GossipBlockDisseminations, t)
302+
303+
// Cleanup
304+
os1.Shutdown()
305+
os2.Shutdown()
306+
service.Stop()
307+
}
308+
248309
func TestDeliverServiceShutdown(t *testing.T) {
249310
defer ensureNoGoroutineLeak(t)()
250311
// Scenario: Launch an ordering service node and let the client pull some blocks.

core/deliverservice/mocks/blocksprovider.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ func (mock *MockGossipServiceAdapter) Gossip(msg *gossip_proto.GossipMessage) {
6969
// MockBlocksDeliverer mocking structure of BlocksDeliverer interface to initialize
7070
// the blocks provider implementation
7171
type MockBlocksDeliverer struct {
72-
Pos uint64
72+
DisconnectCalled chan struct{}
73+
Pos uint64
7374
grpc.ClientStream
7475
RecvCnt int32
7576
MockRecv func(mock *MockBlocksDeliverer) (*orderer.DeliverResponse, error)
@@ -121,6 +122,10 @@ func (mock *MockBlocksDeliverer) Send(env *common.Envelope) error {
121122
return nil
122123
}
123124

125+
func (mock *MockBlocksDeliverer) Disconnect() {
126+
mock.DisconnectCalled <- struct{}{}
127+
}
128+
124129
// Close does nothing in this mock.
func (mock *MockBlocksDeliverer) Close() {
}
125130

126131
// MockLedgerInfo mocking implementation of LedgerInfo interface, needed

0 commit comments

Comments
 (0)