Skip to content

Commit 2292080

Browse files
committed
[FAB-4263] Improve UT coverage of broadcast
Some minor style fixes in broadcast and more unit tests. Change-Id: Ia6d85e4034d04ebef3ae431fbde9505359953646 Signed-off-by: Jay Guo <[email protected]>
1 parent fa63fb9 commit 2292080

File tree

2 files changed

+175
-22
lines changed

2 files changed

+175
-22
lines changed

orderer/common/broadcast/broadcast.go

+10-21
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,17 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
7878
for {
7979
msg, err := srv.Recv()
8080
if err == io.EOF {
81+
logger.Debugf("Received EOF, hangup")
8182
return nil
8283
}
8384
if err != nil {
85+
logger.Warningf("Error reading from stream: %s", err)
8486
return err
8587
}
8688

87-
payload := &cb.Payload{}
88-
err = proto.Unmarshal(msg.Payload, payload)
89+
payload, err := utils.UnmarshalPayload(msg.Payload)
8990
if err != nil {
90-
if logger.IsEnabledFor(logging.WARNING) {
91-
logger.Warningf("Received malformed message, dropping connection: %s", err)
92-
}
91+
logger.Warningf("Received malformed message, dropping connection: %s", err)
9392
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
9493
}
9594

@@ -100,19 +99,15 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
10099

101100
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
102101
if err != nil {
103-
if logger.IsEnabledFor(logging.WARNING) {
104-
logger.Warningf("Received malformed message (bad channel header), dropping connection: %s", err)
105-
}
102+
logger.Warningf("Received malformed message (bad channel header), dropping connection: %s", err)
106103
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
107104
}
108105

109106
if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
110107
logger.Debugf("Preprocessing CONFIG_UPDATE")
111108
msg, err = bh.sm.Process(msg)
112109
if err != nil {
113-
if logger.IsEnabledFor(logging.WARNING) {
114-
logger.Warningf("Rejecting CONFIG_UPDATE because: %s", err)
115-
}
110+
logger.Warningf("Rejecting CONFIG_UPDATE because: %s", err)
116111
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
117112
}
118113

@@ -136,23 +131,17 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
136131

137132
support, ok := bh.sm.GetChain(chdr.ChannelId)
138133
if !ok {
139-
if logger.IsEnabledFor(logging.WARNING) {
140-
logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId)
141-
}
134+
logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId)
142135
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
143136
}
144137

145-
if logger.IsEnabledFor(logging.DEBUG) {
146-
logger.Debugf("Broadcast is filtering message of type %s for channel %s", cb.HeaderType_name[chdr.Type], chdr.ChannelId)
147-
}
138+
logger.Debugf("Broadcast is filtering message of type %s for channel %s", cb.HeaderType_name[chdr.Type], chdr.ChannelId)
148139

149140
// Normal transaction for existing chain
150141
_, filterErr := support.Filters().Apply(msg)
151142

152143
if filterErr != nil {
153-
if logger.IsEnabledFor(logging.WARNING) {
154-
logger.Warningf("Rejecting broadcast message because of filter error: %s", filterErr)
155-
}
144+
logger.Warningf("Rejecting broadcast message because of filter error: %s", filterErr)
156145
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
157146
}
158147

@@ -166,8 +155,8 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
166155
}
167156

168157
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
169-
170158
if err != nil {
159+
logger.Warningf("Error sending to stream: %s", err)
171160
return err
172161
}
173162
}

orderer/common/broadcast/broadcast_test.go

+165-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package broadcast
1818

1919
import (
2020
"fmt"
21+
"io"
2122
"testing"
2223
"time"
2324

@@ -58,11 +59,48 @@ func (m *mockB) Send(br *ab.BroadcastResponse) error {
5859
func (m *mockB) Recv() (*cb.Envelope, error) {
5960
msg, ok := <-m.recvChan
6061
if !ok {
61-
return msg, fmt.Errorf("Channel closed")
62+
return msg, io.EOF
6263
}
6364
return msg, nil
6465
}
6566

67+
type erroneousRecvMockB struct {
68+
grpc.ServerStream
69+
}
70+
71+
func (m *erroneousRecvMockB) Send(br *ab.BroadcastResponse) error {
72+
return nil
73+
}
74+
75+
func (m *erroneousRecvMockB) Recv() (*cb.Envelope, error) {
76+
// The point here is to simulate an error other than EOF.
77+
// We don't bother to create a new custom error type.
78+
return nil, io.ErrUnexpectedEOF
79+
}
80+
81+
type erroneousSendMockB struct {
82+
grpc.ServerStream
83+
recvVal *cb.Envelope
84+
}
85+
86+
func (m *erroneousSendMockB) Send(br *ab.BroadcastResponse) error {
87+
// The point here is to simulate an error other than EOF.
88+
// We don't bother to create a new custom error type.
89+
return io.ErrUnexpectedEOF
90+
}
91+
92+
func (m *erroneousSendMockB) Recv() (*cb.Envelope, error) {
93+
return m.recvVal, nil
94+
}
95+
96+
var RejectRule = filter.Rule(rejectRule{})
97+
98+
type rejectRule struct{}
99+
100+
func (r rejectRule) Apply(message *cb.Envelope) (filter.Action, filter.Committer) {
101+
return filter.Reject, nil
102+
}
103+
66104
type mockSupportManager struct {
67105
chains map[string]*mockSupport
68106
ProcessVal *cb.Envelope
@@ -244,3 +282,129 @@ func TestBadConfigUpdate(t *testing.T) {
244282
reply := <-m.sendChan
245283
assert.NotEqual(t, cb.Status_SUCCESS, reply.Status, "Should have rejected CONFIG_UPDATE")
246284
}
285+
286+
func TestGracefulShutdown(t *testing.T) {
287+
bh := NewHandlerImpl(nil)
288+
m := newMockB()
289+
close(m.recvChan)
290+
assert.NoError(t, bh.Handle(m), "Should exit normally upon EOF")
291+
}
292+
293+
func TestRejected(t *testing.T) {
294+
filters := filter.NewRuleSet([]filter.Rule{RejectRule})
295+
mm := &mockSupportManager{
296+
chains: map[string]*mockSupport{string(systemChain): {filters: filters}},
297+
}
298+
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{ChannelId: systemChain})}})}
299+
bh := NewHandlerImpl(mm)
300+
m := newMockB()
301+
defer close(m.recvChan)
302+
go bh.Handle(m)
303+
304+
newChannelId := "New Chain"
305+
306+
m.recvChan <- makeConfigMessage(newChannelId)
307+
reply := <-m.sendChan
308+
assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected CONFIG_UPDATE")
309+
}
310+
311+
func TestBadStreamRecv(t *testing.T) {
312+
bh := NewHandlerImpl(nil)
313+
assert.Error(t, bh.Handle(&erroneousRecvMockB{}), "Should catch unexpected stream error")
314+
}
315+
316+
func TestBadStreamSend(t *testing.T) {
317+
mm, _ := getMockSupportManager()
318+
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{ChannelId: systemChain})}})}
319+
bh := NewHandlerImpl(mm)
320+
m := &erroneousSendMockB{recvVal: makeConfigMessage("New Chain")}
321+
assert.Error(t, bh.Handle(m), "Should catch unexpected stream error")
322+
}
323+
324+
func TestMalformedEnvelope(t *testing.T) {
325+
mm, _ := getMockSupportManager()
326+
bh := NewHandlerImpl(mm)
327+
m := newMockB()
328+
defer close(m.recvChan)
329+
go bh.Handle(m)
330+
331+
m.recvChan <- &cb.Envelope{Payload: []byte("foo")}
332+
reply := <-m.sendChan
333+
assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected the malformed message")
334+
}
335+
336+
func TestMissingHeader(t *testing.T) {
337+
mm, _ := getMockSupportManager()
338+
bh := NewHandlerImpl(mm)
339+
m := newMockB()
340+
defer close(m.recvChan)
341+
go bh.Handle(m)
342+
343+
m.recvChan <- &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{})}
344+
reply := <-m.sendChan
345+
assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected the payload without header")
346+
}
347+
348+
func TestBadChannelHeader(t *testing.T) {
349+
mm, _ := getMockSupportManager()
350+
bh := NewHandlerImpl(mm)
351+
m := newMockB()
352+
defer close(m.recvChan)
353+
go bh.Handle(m)
354+
355+
m.recvChan <- &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: []byte("foo")}})}
356+
reply := <-m.sendChan
357+
assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected bad header")
358+
}
359+
360+
func TestBadPayloadAfterProcessing(t *testing.T) {
361+
mm, _ := getMockSupportManager()
362+
mm.ProcessVal = &cb.Envelope{Payload: []byte("foo")}
363+
bh := NewHandlerImpl(mm)
364+
m := newMockB()
365+
defer close(m.recvChan)
366+
go bh.Handle(m)
367+
368+
m.recvChan <- makeConfigMessage("New Chain")
369+
reply := <-m.sendChan
370+
assert.Equal(t, cb.Status_INTERNAL_SERVER_ERROR, reply.Status, "Should respond with internal server error")
371+
}
372+
373+
func TestNilHeaderAfterProcessing(t *testing.T) {
374+
mm, _ := getMockSupportManager()
375+
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{})}
376+
bh := NewHandlerImpl(mm)
377+
m := newMockB()
378+
defer close(m.recvChan)
379+
go bh.Handle(m)
380+
381+
m.recvChan <- makeConfigMessage("New Chain")
382+
reply := <-m.sendChan
383+
assert.Equal(t, cb.Status_INTERNAL_SERVER_ERROR, reply.Status, "Should respond with internal server error")
384+
}
385+
386+
func TestBadChannelHeaderAfterProcessing(t *testing.T) {
387+
mm, _ := getMockSupportManager()
388+
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: []byte("foo")}})}
389+
bh := NewHandlerImpl(mm)
390+
m := newMockB()
391+
defer close(m.recvChan)
392+
go bh.Handle(m)
393+
394+
m.recvChan <- makeConfigMessage("New Chain")
395+
reply := <-m.sendChan
396+
assert.Equal(t, cb.Status_INTERNAL_SERVER_ERROR, reply.Status, "Should respond with internal server error")
397+
}
398+
399+
func TestEmptyChannelIDAfterProcessing(t *testing.T) {
400+
mm, _ := getMockSupportManager()
401+
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{})}})}
402+
bh := NewHandlerImpl(mm)
403+
m := newMockB()
404+
defer close(m.recvChan)
405+
go bh.Handle(m)
406+
407+
m.recvChan <- makeConfigMessage("New Chain")
408+
reply := <-m.sendChan
409+
assert.Equal(t, cb.Status_INTERNAL_SERVER_ERROR, reply.Status, "Should respond with internal server error")
410+
}

0 commit comments

Comments
 (0)