Skip to content

Commit 2d24b31

Browse files
author
Jason Yellick
committed
[FAB-1298] Remove queueing from broadcast
https://jira.hyperledger.org/browse/FAB-1298 Per some testing from Bishop Brock, the underlying HTTP2 stream for gRPC seems to handle windowing on its own gracefully and likely more efficiently (and certainly more simply) than we are able to do it at a higher layer. This changeset removes the queueing concept from Broadcast, and pushes the flow control back into the HTTP2/gRPC layer. This simplifies the code significantly, and appears to give us a performance improvement, so this seems like a win across the board. Change-Id: I4f519a188f0138d5b6fa8abf2432bf5e4a59342d Signed-off-by: Jason Yellick <[email protected]>
1 parent f9b68d4 commit 2d24b31

File tree

3 files changed

+78
-143
lines changed

3 files changed

+78
-143
lines changed

orderer/common/broadcast/broadcast.go

+26-68
Original file line numberDiff line numberDiff line change
@@ -58,65 +58,19 @@ type Support interface {
5858
}
5959

6060
type handlerImpl struct {
61-
queueSize int
62-
sm SupportManager
61+
sm SupportManager
6362
}
6463

6564
// NewHandlerImpl constructs a new implementation of the Handler interface
66-
func NewHandlerImpl(sm SupportManager, queueSize int) Handler {
65+
func NewHandlerImpl(sm SupportManager) Handler {
6766
return &handlerImpl{
68-
queueSize: queueSize,
69-
sm: sm,
67+
sm: sm,
7068
}
7169
}
7270

7371
// Handle starts a service thread for a given gRPC connection and services the broadcast connection
7472
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
75-
b := newBroadcaster(bh)
76-
defer close(b.queue)
77-
go b.drainQueue()
78-
return b.queueEnvelopes(srv)
79-
}
80-
81-
type msgAndSupport struct {
82-
msg *cb.Envelope
83-
support Support
84-
}
85-
86-
type broadcaster struct {
87-
bs *handlerImpl
88-
queue chan *msgAndSupport
89-
exitChan chan struct{}
90-
}
91-
92-
func newBroadcaster(bs *handlerImpl) *broadcaster {
93-
b := &broadcaster{
94-
bs: bs,
95-
queue: make(chan *msgAndSupport, bs.queueSize),
96-
exitChan: make(chan struct{}),
97-
}
98-
return b
99-
}
100-
101-
func (b *broadcaster) drainQueue() {
102-
defer close(b.exitChan)
103-
for msgAndSupport := range b.queue {
104-
if !msgAndSupport.support.Enqueue(msgAndSupport.msg) {
105-
logger.Debugf("Consenter instructed us to shut down")
106-
return
107-
}
108-
}
109-
logger.Debugf("Exiting because the queue channel closed")
110-
}
111-
112-
func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) error {
113-
11473
for {
115-
select {
116-
case <-b.exitChan:
117-
return nil
118-
default:
119-
}
12074
msg, err := srv.Recv()
12175
if err != nil {
12276
return err
@@ -129,32 +83,36 @@ func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) err
12983
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
13084
}
13185

132-
support, ok := b.bs.sm.GetChain(payload.Header.ChainHeader.ChainID)
86+
support, ok := bh.sm.GetChain(payload.Header.ChainHeader.ChainID)
13387
if !ok {
13488
// Chain not found, maybe create one?
13589
if payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) {
136-
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
137-
} else {
138-
logger.Debugf("Proposing new chain")
139-
err = srv.Send(&ab.BroadcastResponse{Status: b.bs.sm.ProposeChain(msg)})
90+
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
14091
}
141-
} else {
142-
// Normal transaction for existing chain
143-
_, filterErr := support.Filters().Apply(msg)
144-
145-
if filterErr != nil {
146-
logger.Debugf("Rejecting broadcast message")
147-
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
148-
} else {
149-
select {
150-
case b.queue <- &msgAndSupport{msg: msg, support: support}:
151-
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
152-
default:
153-
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
154-
}
92+
93+
logger.Debugf("Proposing new chain")
94+
err = srv.Send(&ab.BroadcastResponse{Status: bh.sm.ProposeChain(msg)})
95+
if err != nil {
96+
return err
15597
}
98+
continue
99+
}
100+
101+
// Normal transaction for existing chain
102+
_, filterErr := support.Filters().Apply(msg)
103+
104+
if filterErr != nil {
105+
logger.Debugf("Rejecting broadcast message")
106+
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
107+
}
108+
109+
if !support.Enqueue(msg) {
110+
logger.Debugf("Consenter instructed us to shut down")
111+
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
156112
}
157113

114+
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
115+
158116
if err != nil {
159117
return err
160118
}

orderer/common/broadcast/broadcast_test.go

+51-74
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package broadcast
1919
import (
2020
"fmt"
2121
"testing"
22+
"time"
2223

2324
"github.com/hyperledger/fabric/orderer/common/filter"
2425
cb "github.com/hyperledger/fabric/protos/common"
@@ -73,21 +74,13 @@ func (mm *mockSupportManager) ProposeChain(configTx *cb.Envelope) cb.Status {
7374
filter.EmptyRejectRule,
7475
filter.AcceptRule,
7576
}),
76-
queue: make(chan *cb.Envelope),
7777
}
7878
return cb.Status_SUCCESS
7979
}
8080

81-
func (mm *mockSupportManager) halt() {
82-
for _, chain := range mm.chains {
83-
chain.halt()
84-
}
85-
}
86-
8781
type mockSupport struct {
88-
filters *filter.RuleSet
89-
queue chan *cb.Envelope
90-
done bool
82+
filters *filter.RuleSet
83+
rejectEnqueue bool
9184
}
9285

9386
func (ms *mockSupport) Filters() *filter.RuleSet {
@@ -96,16 +89,7 @@ func (ms *mockSupport) Filters() *filter.RuleSet {
9689

9790
// Enqueue sends a message for ordering
9891
func (ms *mockSupport) Enqueue(env *cb.Envelope) bool {
99-
ms.queue <- env
100-
return !ms.done
101-
}
102-
103-
func (ms *mockSupport) halt() {
104-
ms.done = true
105-
select {
106-
case <-ms.queue:
107-
default:
108-
}
92+
return !ms.rejectEnqueue
10993
}
11094

11195
func makeConfigMessage(chainID string) *cb.Envelope {
@@ -137,29 +121,31 @@ func makeMessage(chainID string, data []byte) *cb.Envelope {
137121
}
138122
}
139123

140-
func getMultichainManager() *mockSupportManager {
124+
func getMockSupportManager() (*mockSupportManager, *mockSupport) {
141125
filters := filter.NewRuleSet([]filter.Rule{
142126
filter.EmptyRejectRule,
143127
filter.AcceptRule,
144128
})
145129
mm := &mockSupportManager{
146130
chains: make(map[string]*mockSupport),
147131
}
148-
mm.chains[string(systemChain)] = &mockSupport{
132+
mSysChain := &mockSupport{
149133
filters: filters,
150-
queue: make(chan *cb.Envelope),
151134
}
152-
return mm
135+
mm.chains[string(systemChain)] = mSysChain
136+
return mm, mSysChain
153137
}
154138

155-
func TestQueueOverflow(t *testing.T) {
156-
mm := getMultichainManager()
157-
defer mm.halt()
158-
bh := NewHandlerImpl(mm, 2)
139+
func TestEnqueueFailure(t *testing.T) {
140+
mm, mSysChain := getMockSupportManager()
141+
bh := NewHandlerImpl(mm)
159142
m := newMockB()
160143
defer close(m.recvChan)
161-
b := newBroadcaster(bh.(*handlerImpl))
162-
go b.queueEnvelopes(m)
144+
done := make(chan struct{})
145+
go func() {
146+
bh.Handle(m)
147+
close(done)
148+
}()
163149

164150
for i := 0; i < 2; i++ {
165151
m.recvChan <- makeMessage(systemChain, []byte("Some bytes"))
@@ -169,86 +155,77 @@ func TestQueueOverflow(t *testing.T) {
169155
}
170156
}
171157

158+
mSysChain.rejectEnqueue = true
172159
m.recvChan <- makeMessage(systemChain, []byte("Some bytes"))
173160
reply := <-m.sendChan
174161
if reply.Status != cb.Status_SERVICE_UNAVAILABLE {
175162
t.Fatalf("Should not have successfully queued the message")
176163
}
177164

178-
}
179-
180-
func TestMultiQueueOverflow(t *testing.T) {
181-
mm := getMultichainManager()
182-
defer mm.halt()
183-
bh := NewHandlerImpl(mm, 2)
184-
ms := []*mockB{newMockB(), newMockB(), newMockB()}
185-
186-
for _, m := range ms {
187-
defer close(m.recvChan)
188-
b := newBroadcaster(bh.(*handlerImpl))
189-
go b.queueEnvelopes(m)
190-
}
191-
192-
for _, m := range ms {
193-
for i := 0; i < 2; i++ {
194-
m.recvChan <- makeMessage(systemChain, []byte("Some bytes"))
195-
reply := <-m.sendChan
196-
if reply.Status != cb.Status_SUCCESS {
197-
t.Fatalf("Should have successfully queued the message")
198-
}
199-
}
200-
}
201-
202-
for _, m := range ms {
203-
m.recvChan <- makeMessage(systemChain, []byte("Some bytes"))
204-
reply := <-m.sendChan
205-
if reply.Status != cb.Status_SERVICE_UNAVAILABLE {
206-
t.Fatalf("Should not have successfully queued the message")
207-
}
165+
select {
166+
case <-done:
167+
case <-time.After(time.Second):
168+
t.Fatalf("Should have terminated the stream")
208169
}
209170
}
210171

211172
func TestEmptyEnvelope(t *testing.T) {
212-
mm := getMultichainManager()
213-
defer mm.halt()
214-
bh := NewHandlerImpl(mm, 2)
173+
mm, _ := getMockSupportManager()
174+
bh := NewHandlerImpl(mm)
215175
m := newMockB()
216176
defer close(m.recvChan)
217-
go bh.Handle(m)
177+
done := make(chan struct{})
178+
go func() {
179+
bh.Handle(m)
180+
close(done)
181+
}()
218182

219183
m.recvChan <- &cb.Envelope{}
220184
reply := <-m.sendChan
221185
if reply.Status != cb.Status_BAD_REQUEST {
222186
t.Fatalf("Should have rejected the null message")
223187
}
224188

189+
select {
190+
case <-done:
191+
case <-time.After(time.Second):
192+
t.Fatalf("Should have terminated the stream")
193+
}
225194
}
226195

227196
func TestBadChainID(t *testing.T) {
228-
mm := getMultichainManager()
229-
defer mm.halt()
230-
bh := NewHandlerImpl(mm, 2)
197+
mm, _ := getMockSupportManager()
198+
bh := NewHandlerImpl(mm)
231199
m := newMockB()
232200
defer close(m.recvChan)
233-
go bh.Handle(m)
201+
done := make(chan struct{})
202+
go func() {
203+
bh.Handle(m)
204+
close(done)
205+
}()
234206

235207
m.recvChan <- makeMessage("Wrong chain", []byte("Some bytes"))
236208
reply := <-m.sendChan
237209
if reply.Status != cb.Status_NOT_FOUND {
238210
t.Fatalf("Should have rejected message to a chain which does not exist")
239211
}
240212

213+
select {
214+
case <-done:
215+
case <-time.After(time.Second):
216+
t.Fatalf("Should have terminated the stream")
217+
}
241218
}
242219

243220
func TestNewChainID(t *testing.T) {
244-
mm := getMultichainManager()
245-
defer mm.halt()
246-
bh := NewHandlerImpl(mm, 2)
221+
mm, _ := getMockSupportManager()
222+
bh := NewHandlerImpl(mm)
247223
m := newMockB()
248224
defer close(m.recvChan)
249225
go bh.Handle(m)
226+
newChainID := "New Chain"
250227

251-
m.recvChan <- makeConfigMessage("New chain")
228+
m.recvChan <- makeConfigMessage(newChainID)
252229
reply := <-m.sendChan
253230
if reply.Status != cb.Status_SUCCESS {
254231
t.Fatalf("Should have created a new chain, got %d", reply.Status)
@@ -258,9 +235,9 @@ func TestNewChainID(t *testing.T) {
258235
t.Fatalf("Should have created a new chain")
259236
}
260237

261-
m.recvChan <- makeMessage("New chain", []byte("Some bytes"))
238+
m.recvChan <- makeMessage(newChainID, []byte("Some bytes"))
262239
reply = <-m.sendChan
263240
if reply.Status != cb.Status_SUCCESS {
264-
t.Fatalf("Should have successfully sent message to new chain")
241+
t.Fatalf("Should have successfully sent message to new chain, got %v", reply)
265242
}
266243
}

orderer/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ func NewServer(ml multichain.Manager, queueSize, maxWindowSize int) ab.AtomicBro
4949
logger.Infof("Starting orderer")
5050

5151
s := &server{
52-
bh: broadcast.NewHandlerImpl(broadcastSupport{ml}, queueSize),
5352
dh: deliver.NewHandlerImpl(deliverSupport{ml}),
53+
bh: broadcast.NewHandlerImpl(broadcastSupport{ml}),
5454
}
5555
return s
5656
}

0 commit comments

Comments
 (0)