Skip to content

Commit 1785d26

Browse files
gennadylaventmanyacovm
authored andcommitted
[FAB-4559] Handling Deliver errors
Currently peer ignores Deliver errors other than SERVICE_UNAVAILABLE This change add handle to rest of errors in same way as SERVICE_UNAVAILABLE status. For BAD_REQUEST and FORBIDDEN Critical log level msg used and extra retry mechanism added - more that 10 msgs ot the kind in row will cause DeliverBlocks to exit. Change-Id: I47daeca71c169e2fa484a94402b559e2b1981490 Signed-off-by: Gennady Laventman <[email protected]>
1 parent 0a72230 commit 1785d26

File tree

4 files changed

+154
-18
lines changed

4 files changed

+154
-18
lines changed

core/deliverservice/blocksprovider/blocksprovider.go

+34-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ limitations under the License.
1717
package blocksprovider
1818

1919
import (
20+
"math"
21+
"time"
22+
2023
"sync/atomic"
2124

2225
"github.com/golang/protobuf/proto"
@@ -95,8 +98,14 @@ type blocksProviderImpl struct {
9598
mcs api.MessageCryptoService
9699

97100
done int32
101+
102+
wrongStatusThreshold int
98103
}
99104

105+
const wrongStatusThreshold = 10
106+
107+
var maxRetryDelay = time.Second * 10
108+
100109
var logger *logging.Logger // package-level logger
101110

102111
func init() {
@@ -106,16 +115,19 @@ func init() {
106115
// NewBlocksProvider constructor function to create blocks deliverer instance
107116
func NewBlocksProvider(chainID string, client streamClient, gossip GossipServiceAdapter, mcs api.MessageCryptoService) BlocksProvider {
108117
return &blocksProviderImpl{
109-
chainID: chainID,
110-
client: client,
111-
gossip: gossip,
112-
mcs: mcs,
118+
chainID: chainID,
119+
client: client,
120+
gossip: gossip,
121+
mcs: mcs,
122+
wrongStatusThreshold: wrongStatusThreshold,
113123
}
114124
}
115125

116126
// DeliverBlocks used to pull out blocks from the ordering service to
117127
// distributed them across peers
118128
func (b *blocksProviderImpl) DeliverBlocks() {
129+
errorStatusCounter := 0
130+
statusCounter := 0
119131
defer b.client.Close()
120132
for !b.isDone() {
121133
msg, err := b.client.Recv()
@@ -129,12 +141,26 @@ func (b *blocksProviderImpl) DeliverBlocks() {
129141
logger.Warningf("[%s] ERROR! Received success for a seek that should never complete", b.chainID)
130142
return
131143
}
132-
logger.Warningf("[%s] Got error %v", b.chainID, t)
133-
if t.Status == common.Status_SERVICE_UNAVAILABLE {
134-
b.client.Disconnect()
135-
continue
144+
if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN {
145+
logger.Errorf("[%s] Got error %v", b.chainID, t)
146+
errorStatusCounter++
147+
if errorStatusCounter > b.wrongStatusThreshold {
148+
logger.Criticalf("[%s] Wrong statuses threshold passed, stopping block provider", b.chainID)
149+
return
150+
}
151+
} else {
152+
errorStatusCounter = 0
153+
logger.Warningf("[%s] Got error %v", b.chainID, t)
136154
}
155+
maxDelay := float64(maxRetryDelay)
156+
currDelay := float64(time.Duration(math.Pow(2, float64(statusCounter))) * 100 * time.Millisecond)
157+
time.Sleep(time.Duration(math.Min(maxDelay, currDelay)))
158+
statusCounter++
159+
b.client.Disconnect()
160+
continue
137161
case *orderer.DeliverResponse_Block:
162+
errorStatusCounter = 0
163+
statusCounter = 0
138164
seqNum := t.Block.Header.Number
139165

140166
marshaledBlock, err := proto.Marshal(t.Block)

core/deliverservice/blocksprovider/blocksprovider_test.go

+112-8
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ import (
3131
"github.com/stretchr/testify/mock"
3232
)
3333

34+
func init() {
35+
maxRetryDelay = time.Second
36+
}
37+
3438
type mockMCS struct {
3539
mock.Mock
3640
}
@@ -103,6 +107,19 @@ func assertDelivery(t *testing.T, ga *mocks.MockGossipServiceAdapter, deliverer
103107
}
104108
}
105109

110+
func waitUntilOrFail(t *testing.T, pred func() bool) {
111+
timeout := time.Second * 30
112+
start := time.Now()
113+
limit := start.UnixNano() + timeout.Nanoseconds()
114+
for time.Now().UnixNano() < limit {
115+
if pred() {
116+
return
117+
}
118+
time.Sleep(timeout / 60)
119+
}
120+
assert.Fail(t, "Timeout expired!")
121+
}
122+
106123
/*
107124
Test to check whenever blocks provider starts calling new blocks from the
108125
oldest and that eventually it terminates after the Stop method has been called.
@@ -179,7 +196,7 @@ func TestBlocksProvider_CheckTerminationDeliveryResponseStatus(t *testing.T) {
179196
}
180197
}
181198

182-
func TestBlocksProvider_DeliveryServiceUnavailable(t *testing.T) {
199+
func TestBlocksProvider_DeliveryWrongStatus(t *testing.T) {
183200
sendBlock := func(seqNum uint64) *orderer.DeliverResponse {
184201
return &orderer.DeliverResponse{
185202
Type: &orderer.DeliverResponse_Block{
@@ -203,15 +220,16 @@ func TestBlocksProvider_DeliveryServiceUnavailable(t *testing.T) {
203220
}
204221
}
205222

206-
bd := mocks.MockBlocksDeliverer{DisconnectCalled: make(chan struct{}, 1)}
223+
bd := mocks.MockBlocksDeliverer{DisconnectCalled: make(chan struct{}, 10)}
207224
mcs := &mockMCS{}
208225
mcs.On("VerifyBlock", mock.Anything).Return(nil)
209226
gossipServiceAdapter := &mocks.MockGossipServiceAdapter{GossipBlockDisseminations: make(chan uint64, 2)}
210227
provider := &blocksProviderImpl{
211-
chainID: "***TEST_CHAINID***",
212-
gossip: gossipServiceAdapter,
213-
client: &bd,
214-
mcs: mcs,
228+
chainID: "***TEST_CHAINID***",
229+
gossip: gossipServiceAdapter,
230+
client: &bd,
231+
mcs: mcs,
232+
wrongStatusThreshold: wrongStatusThreshold,
215233
}
216234

217235
attempts := int32(0)
@@ -223,6 +241,14 @@ func TestBlocksProvider_DeliveryServiceUnavailable(t *testing.T) {
223241
case int32(2):
224242
return sendStatus(common.Status_SERVICE_UNAVAILABLE), nil
225243
case int32(3):
244+
return sendStatus(common.Status_BAD_REQUEST), nil
245+
case int32(4):
246+
return sendStatus(common.Status_FORBIDDEN), nil
247+
case int32(5):
248+
return sendStatus(common.Status_NOT_FOUND), nil
249+
case int32(6):
250+
return sendStatus(common.Status_INTERNAL_SERVER_ERROR), nil
251+
case int32(7):
226252
return sendBlock(1), nil
227253
default:
228254
provider.Stop()
@@ -236,12 +262,90 @@ func TestBlocksProvider_DeliveryServiceUnavailable(t *testing.T) {
236262
select {
237263
case seq := <-gossipServiceAdapter.GossipBlockDisseminations:
238264
assert.Equal(t, uint64(i), seq)
239-
case <-time.After(time.Second * 3):
265+
case <-time.After(time.Second * 10):
240266
assert.Fail(t, "Didn't receive a block within a timely manner")
241267
}
242268
}
243269
// Make sure disconnect was called in between the deliveries
244-
assert.Len(t, bd.DisconnectCalled, 1)
270+
assert.Len(t, bd.DisconnectCalled, 5)
271+
}
272+
273+
func TestBlocksProvider_DeliveryWrongStatusClose(t *testing.T) {
274+
// Test emulate receive of wrong statuses from orderer
275+
// Once test get sequence of wrongStatusThreshold (5) FORBIDDEN or BAD_REQUEST statuses,
276+
// it stop blocks deliver go routine
277+
// At start is sends all 5 statuses and check is each one of them caused disconnect and reconnect
278+
// but blocks deliver still running
279+
// At next step it sends 2 FORBIDDEN or BAD_REQUEST statuses, followed by SERVICE_UNAVAILABLE
280+
// and 4 more FORBIDDEN or BAD_REQUEST statuses. It checks if enough disconnects called, but
281+
// blocks deliver still running
282+
// At the end, it send 2 FORBIDDEN or BAD_REQUEST statuses and check is blocks deliver stopped
283+
284+
sendStatus := func(status common.Status) *orderer.DeliverResponse {
285+
return &orderer.DeliverResponse{
286+
Type: &orderer.DeliverResponse_Status{
287+
Status: status,
288+
},
289+
}
290+
}
291+
292+
bd := mocks.MockBlocksDeliverer{DisconnectCalled: make(chan struct{}, 100), CloseCalled: make(chan struct{}, 1)}
293+
mcs := &mockMCS{}
294+
mcs.On("VerifyBlock", mock.Anything).Return(nil)
295+
gossipServiceAdapter := &mocks.MockGossipServiceAdapter{GossipBlockDisseminations: make(chan uint64, 2)}
296+
provider := &blocksProviderImpl{
297+
chainID: "***TEST_CHAINID***",
298+
gossip: gossipServiceAdapter,
299+
client: &bd,
300+
mcs: mcs,
301+
wrongStatusThreshold: 5,
302+
}
303+
304+
incomingMsgs := make(chan *orderer.DeliverResponse)
305+
306+
bd.MockRecv = func(mock *mocks.MockBlocksDeliverer) (*orderer.DeliverResponse, error) {
307+
inMsg := <-incomingMsgs
308+
return inMsg, nil
309+
}
310+
311+
go provider.DeliverBlocks()
312+
313+
incomingMsgs <- sendStatus(common.Status_SERVICE_UNAVAILABLE)
314+
incomingMsgs <- sendStatus(common.Status_BAD_REQUEST)
315+
incomingMsgs <- sendStatus(common.Status_FORBIDDEN)
316+
incomingMsgs <- sendStatus(common.Status_NOT_FOUND)
317+
incomingMsgs <- sendStatus(common.Status_INTERNAL_SERVER_ERROR)
318+
319+
waitUntilOrFail(t, func() bool {
320+
return len(bd.DisconnectCalled) == 5
321+
})
322+
323+
waitUntilOrFail(t, func() bool {
324+
return len(bd.CloseCalled) == 0
325+
})
326+
327+
incomingMsgs <- sendStatus(common.Status_FORBIDDEN)
328+
incomingMsgs <- sendStatus(common.Status_BAD_REQUEST)
329+
incomingMsgs <- sendStatus(common.Status_SERVICE_UNAVAILABLE)
330+
incomingMsgs <- sendStatus(common.Status_FORBIDDEN)
331+
incomingMsgs <- sendStatus(common.Status_BAD_REQUEST)
332+
incomingMsgs <- sendStatus(common.Status_FORBIDDEN)
333+
incomingMsgs <- sendStatus(common.Status_BAD_REQUEST)
334+
335+
waitUntilOrFail(t, func() bool {
336+
return len(bd.DisconnectCalled) == 12
337+
})
338+
339+
waitUntilOrFail(t, func() bool {
340+
return len(bd.CloseCalled) == 0
341+
})
342+
343+
incomingMsgs <- sendStatus(common.Status_BAD_REQUEST)
344+
incomingMsgs <- sendStatus(common.Status_FORBIDDEN)
345+
346+
waitUntilOrFail(t, func() bool {
347+
return len(bd.CloseCalled) == 1
348+
})
245349
}
246350

247351
func TestBlockFetchFailure(t *testing.T) {

core/deliverservice/mocks/blocksprovider.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func (mock *MockGossipServiceAdapter) Gossip(msg *gossip_proto.GossipMessage) {
7070
// the blocks provider implementation
7171
type MockBlocksDeliverer struct {
7272
DisconnectCalled chan struct{}
73+
CloseCalled chan struct{}
7374
Pos uint64
7475
grpc.ClientStream
7576
RecvCnt int32
@@ -126,7 +127,12 @@ func (mock *MockBlocksDeliverer) Disconnect() {
126127
mock.DisconnectCalled <- struct{}{}
127128
}
128129

129-
func (mock *MockBlocksDeliverer) Close() {}
130+
func (mock *MockBlocksDeliverer) Close() {
131+
if mock.CloseCalled == nil {
132+
return
133+
}
134+
mock.CloseCalled <- struct{}{}
135+
}
130136

131137
// MockLedgerInfo mocking implementation of LedgerInfo interface, needed
132138
// for test initialization purposes

core/deliverservice/mocks/blocksprovider_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package mocks
1818

1919
import (
20-
"context"
2120
"math"
2221
"sync/atomic"
2322
"testing"
@@ -29,6 +28,7 @@ import (
2928
proto "github.com/hyperledger/fabric/protos/gossip"
3029
"github.com/hyperledger/fabric/protos/orderer"
3130
"github.com/stretchr/testify/assert"
31+
"golang.org/x/net/context"
3232
)
3333

3434
func TestMockBlocksDeliverer(t *testing.T) {

0 commit comments

Comments
 (0)