Skip to content

Commit a14912f

Browse files
committed
[FAB-4201] Fix error validation in deliver
This patch fixes several issues in error validation of deliver: 1. EOF is now caught as normal hangup of gRPC connection, hence exit without returning error. 2. Error of inverse seek position was not caught, which results in undesired behavior. Now it returns BAD_REQUEST status. 3. Some errors were not logged. 4. Explicit logging level checks are removed, i.e. `isEnabledFor` This was intended to improve the performance by reducing memory allocations caused by passing in string arguments. However, the performance gain is not significant (~ns) and we prefer cleaner code. On the other hand, common logic in tests are extracted into a reusable function. Change-Id: Ib2ff52cd1d9ef767f0918728084ec31c075cc38a Signed-off-by: Jay Guo <[email protected]>
1 parent fa63fb9 commit a14912f

File tree

2 files changed

+80
-76
lines changed

2 files changed

+80
-76
lines changed

orderer/common/deliver/deliver.go

+30-34
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package deliver
1818

1919
import (
20+
"io"
21+
2022
"github.com/hyperledger/fabric/common/policies"
2123
"github.com/hyperledger/fabric/orderer/common/filter"
2224
"github.com/hyperledger/fabric/orderer/common/sigfilter"
@@ -66,70 +68,60 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
6668
for {
6769
logger.Debugf("Attempting to read seek info message")
6870
envelope, err := srv.Recv()
71+
if err == io.EOF {
72+
logger.Debugf("Received EOF, hangup")
73+
return nil
74+
}
75+
6976
if err != nil {
70-
if logger.IsEnabledFor(logging.WARNING) {
71-
logger.Warningf("Error reading from stream: %s", err)
72-
}
77+
logger.Warningf("Error reading from stream: %s", err)
7378
return err
7479
}
75-
payload := &cb.Payload{}
76-
if err = proto.Unmarshal(envelope.Payload, payload); err != nil {
77-
if logger.IsEnabledFor(logging.WARNING) {
78-
logger.Warningf("Received an envelope with no payload: %s", err)
79-
}
80+
81+
payload, err := utils.UnmarshalPayload(envelope.Payload)
82+
if err != nil {
83+
logger.Warningf("Received an envelope with no payload: %s", err)
8084
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
8185
}
8286

8387
if payload.Header == nil {
84-
if logger.IsEnabledFor(logging.WARNING) {
85-
logger.Warningf("Malformed envelope received with bad header")
86-
}
88+
logger.Warningf("Malformed envelope received with bad header")
8789
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
8890
}
8991

9092
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
9193
if err != nil {
92-
logger.Error(err)
93-
return err
94+
logger.Warningf("Failed to unmarshal channel header: %s", err)
95+
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
9496
}
9597

9698
chain, ok := ds.sm.GetChain(chdr.ChannelId)
9799
if !ok {
98100
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
99101
// So we would expect our log to be somewhat flooded with these
100-
if logger.IsEnabledFor(logging.DEBUG) {
101-
logger.Debugf("Client request for channel %s not found", chdr.ChannelId)
102-
}
102+
logger.Debugf("Client request for channel %s not found", chdr.ChannelId)
103103
return sendStatusReply(srv, cb.Status_NOT_FOUND)
104104
}
105105

106106
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
107107
result, _ := sf.Apply(envelope)
108108
if result != filter.Forward {
109-
if logger.IsEnabledFor(logging.WARNING) {
110-
logger.Warningf("Received unauthorized deliver request for channel %s", chdr.ChannelId)
111-
}
109+
logger.Warningf("Received unauthorized deliver request for channel %s", chdr.ChannelId)
112110
return sendStatusReply(srv, cb.Status_FORBIDDEN)
113111
}
114112

115113
seekInfo := &ab.SeekInfo{}
116114
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
117-
if logger.IsEnabledFor(logging.WARNING) {
118-
logger.Warningf("Received a signed deliver request with malformed seekInfo payload: %s", err)
119-
}
115+
logger.Warningf("Received a signed deliver request with malformed seekInfo payload: %s", err)
120116
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
121117
}
122118

123119
if seekInfo.Start == nil || seekInfo.Stop == nil {
124-
if logger.IsEnabledFor(logging.WARNING) {
125-
logger.Warningf("Received seekInfo message with missing start or stop %v, %v", seekInfo.Start, seekInfo.Stop)
126-
}
120+
logger.Warningf("Received seekInfo message with missing start or stop %v, %v", seekInfo.Start, seekInfo.Stop)
127121
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
128122
}
129123

130-
if logger.IsEnabledFor(logging.DEBUG) {
131-
logger.Debugf("Received seekInfo (%p) %v for chain %s", seekInfo, seekInfo, chdr.ChannelId)
132-
}
124+
logger.Debugf("Received seekInfo (%p) %v for chain %s", seekInfo, seekInfo, chdr.ChannelId)
133125

134126
cursor, number := chain.Reader().Iterator(seekInfo.Start)
135127
var stopNum uint64
@@ -140,6 +132,10 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
140132
stopNum = chain.Reader().Height() - 1
141133
case *ab.SeekPosition_Specified:
142134
stopNum = stop.Specified.Number
135+
if stopNum < number {
136+
logger.Warningf("Received invalid seekInfo message where start number %d is greater than stop number %d", number, stopNum)
137+
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
138+
}
143139
}
144140

145141
for {
@@ -159,10 +155,10 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
159155
return sendStatusReply(srv, status)
160156
}
161157

162-
if logger.IsEnabledFor(logging.DEBUG) {
163-
logger.Debugf("Delivering block for (%p) channel: %s", seekInfo, chdr.ChannelId)
164-
}
158+
logger.Debugf("Delivering block for (%p) channel: %s", seekInfo, chdr.ChannelId)
159+
165160
if err := sendBlockReply(srv, block); err != nil {
161+
logger.Warningf("Error sending to stream: %s", err)
166162
return err
167163
}
168164

@@ -172,11 +168,11 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
172168
}
173169

174170
if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil {
171+
logger.Warningf("Error sending to stream: %s", err)
175172
return err
176173
}
177-
if logger.IsEnabledFor(logging.DEBUG) {
178-
logger.Debugf("Done delivering for (%p), waiting for new SeekInfo", seekInfo)
179-
}
174+
175+
logger.Debugf("Done delivering for (%p), waiting for new SeekInfo", seekInfo)
180176
}
181177
}
182178

orderer/common/deliver/deliver_test.go

+50-42
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package deliver
1818

1919
import (
2020
"fmt"
21+
"io"
2122
"testing"
2223
"time"
2324

@@ -30,6 +31,7 @@ import (
3031
ab "github.com/hyperledger/fabric/protos/orderer"
3132
"github.com/hyperledger/fabric/protos/utils"
3233
logging "github.com/op/go-logging"
34+
"github.com/stretchr/testify/assert"
3335
"google.golang.org/grpc"
3436
)
3537

@@ -64,7 +66,7 @@ func (m *mockD) Send(br *ab.DeliverResponse) error {
6466
func (m *mockD) Recv() (*cb.Envelope, error) {
6567
msg, ok := <-m.recvChan
6668
if !ok {
67-
return msg, fmt.Errorf("Channel closed")
69+
return msg, io.EOF
6870
}
6971
return msg, nil
7072
}
@@ -98,6 +100,16 @@ func NewRAMLedger() ledger.ReadWriter {
98100
return rl
99101
}
100102

103+
func initializeDeliverHandler() Handler {
104+
mm := newMockMultichainManager()
105+
for i := 1; i < ledgerSize; i++ {
106+
l := mm.chains[systemChainID].ledger
107+
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}))
108+
}
109+
110+
return NewHandlerImpl(mm)
111+
}
112+
101113
func newMockMultichainManager() *mockSupportManager {
102114
rl := NewRAMLedger()
103115
mm := &mockSupportManager{
@@ -132,16 +144,10 @@ func makeSeek(chainID string, seekInfo *ab.SeekInfo) *cb.Envelope {
132144
}
133145

134146
func TestOldestSeek(t *testing.T) {
135-
mm := newMockMultichainManager()
136-
for i := 1; i < ledgerSize; i++ {
137-
l := mm.chains[systemChainID].ledger
138-
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}))
139-
}
140-
141147
m := newMockD()
142148
defer close(m.recvChan)
143-
ds := NewHandlerImpl(mm)
144149

150+
ds := initializeDeliverHandler()
145151
go ds.Handle(m)
146152

147153
m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekOldest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})
@@ -170,27 +176,18 @@ func TestOldestSeek(t *testing.T) {
170176
}
171177

172178
func TestNewestSeek(t *testing.T) {
173-
mm := newMockMultichainManager()
174-
for i := 1; i < ledgerSize; i++ {
175-
l := mm.chains[systemChainID].ledger
176-
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}))
177-
}
178-
179179
m := newMockD()
180180
defer close(m.recvChan)
181-
ds := NewHandlerImpl(mm)
182181

182+
ds := initializeDeliverHandler()
183183
go ds.Handle(m)
184184

185185
m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})
186186

187187
select {
188188
case deliverReply := <-m.sendChan:
189189
if deliverReply.GetBlock() == nil {
190-
if deliverReply.GetStatus() != cb.Status_SUCCESS {
191-
t.Fatalf("Received an error on the reply channel")
192-
}
193-
return
190+
t.Fatalf("Received an error on the reply channel")
194191
}
195192
if deliverReply.GetBlock().Header.Number != uint64(ledgerSize-1) {
196193
t.Fatalf("Expected only the most recent block")
@@ -201,20 +198,14 @@ func TestNewestSeek(t *testing.T) {
201198
}
202199

203200
func TestSpecificSeek(t *testing.T) {
204-
mm := newMockMultichainManager()
205-
for i := 1; i < ledgerSize; i++ {
206-
l := mm.chains[systemChainID].ledger
207-
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}))
208-
}
209-
210201
m := newMockD()
211202
defer close(m.recvChan)
212-
ds := NewHandlerImpl(mm)
213-
specifiedStart := uint64(3)
214-
specifiedStop := uint64(7)
215203

204+
ds := initializeDeliverHandler()
216205
go ds.Handle(m)
217206

207+
specifiedStart := uint64(3)
208+
specifiedStop := uint64(7)
218209
m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(specifiedStart), Stop: seekSpecified(specifiedStop), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})
219210

220211
count := uint64(0)
@@ -264,16 +255,10 @@ func TestUnauthorizedSeek(t *testing.T) {
264255
}
265256

266257
func TestBadSeek(t *testing.T) {
267-
mm := newMockMultichainManager()
268-
for i := 1; i < ledgerSize; i++ {
269-
l := mm.chains[systemChainID].ledger
270-
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}))
271-
}
272-
273258
m := newMockD()
274259
defer close(m.recvChan)
275-
ds := NewHandlerImpl(mm)
276260

261+
ds := initializeDeliverHandler()
277262
go ds.Handle(m)
278263

279264
m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(3 * ledgerSize)), Stop: seekSpecified(uint64(3 * ledgerSize)), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})
@@ -289,16 +274,10 @@ func TestBadSeek(t *testing.T) {
289274
}
290275

291276
func TestFailFastSeek(t *testing.T) {
292-
mm := newMockMultichainManager()
293-
for i := 1; i < ledgerSize; i++ {
294-
l := mm.chains[systemChainID].ledger
295-
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}))
296-
}
297-
298277
m := newMockD()
299278
defer close(m.recvChan)
300-
ds := NewHandlerImpl(mm)
301279

280+
ds := initializeDeliverHandler()
302281
go ds.Handle(m)
303282

304283
m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(ledgerSize - 1)), Stop: seekSpecified(ledgerSize), Behavior: ab.SeekInfo_FAIL_IF_NOT_READY})
@@ -373,3 +352,32 @@ func TestBlockingSeek(t *testing.T) {
373352
t.Fatalf("Timed out waiting to get all blocks")
374353
}
375354
}
355+
356+
func TestSGracefulShutdown(t *testing.T) {
357+
m := newMockD()
358+
ds := NewHandlerImpl(nil)
359+
360+
close(m.recvChan)
361+
assert.NoError(t, ds.Handle(m), "Expected no error for hangup")
362+
}
363+
364+
func TestReversedSeqSeek(t *testing.T) {
365+
m := newMockD()
366+
defer close(m.recvChan)
367+
368+
ds := initializeDeliverHandler()
369+
go ds.Handle(m)
370+
371+
specifiedStart := uint64(7)
372+
specifiedStop := uint64(3)
373+
m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(specifiedStart), Stop: seekSpecified(specifiedStop), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})
374+
375+
select {
376+
case deliverReply := <-m.sendChan:
377+
if deliverReply.GetStatus() != cb.Status_BAD_REQUEST {
378+
t.Fatalf("Received wrong error on the reply channel")
379+
}
380+
case <-time.After(time.Second):
381+
t.Fatalf("Timed out waiting to get all blocks")
382+
}
383+
}

0 commit comments

Comments
 (0)