Skip to content

Commit ae9f2f2

Browse files
author
Jason Yellick
committed
[FAB-816] Clean multichain integration interfaces
The multichain integration in https://gerrit.hyperledger.org/r/#/c/2721/ hooked the multi-chain manager into the default solo path. This was attempted in a way which kept the diff minimal, but correspondingly produced some unpleasant artifacts in the code. In particular, the integration used one catch-all interface in multichain.Manager to supply support to the various components, this meant that components were forced to drag in unnecessary imports and it complicated mock testing with many panic-ing unimplemented functions. This changeset breaks this interface into pieces, and pushes the definition of the interface back into the components which depend on the definition. Also included in this changeset is substantial cleanup of the solo/consenter_test.go file. This set of tests was originally written in a way which depended on the end to end flow of the system, but as the common components have been factored out, and as the solo 'consensus' has been reduced to its simplest component, more targetted tests are now needed. Also included are some assorted linting fixes. Change-Id: I7c5e20cd7b8c66eb51cc56ad539177ce81cbcbfc Signed-off-by: Jason Yellick <[email protected]>
1 parent da16559 commit ae9f2f2

File tree

18 files changed

+399
-357
lines changed

18 files changed

+399
-357
lines changed

bddtests/docker-compose-with-orderer.yml

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ services:
1212
- ORDERER_GENERAL_MAXWINDOWSIZE=1000
1313
- ORDERER_GENERAL_ORDERERTYPE=solo
1414
- ORDERER_GENERAL_LISTENADDRESS=0.0.0.0
15-
- ORDERER_GENERAL_LISTENPORT=5005
15+
- ORDERER_GENERAL_LISTENPORT=7050
1616
- ORDERER_RAMLEDGER_HISTORY_SIZE=100
1717
working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer
1818
command: orderer
@@ -29,7 +29,7 @@ services:
2929
- CORE_NEXT=true
3030
- CORE_PEER_ENDORSER_ENABLED=true
3131
- CORE_PEER_COMMITTER_ENABLED=true
32-
- CORE_PEER_COMMITTER_LEDGER_ORDERER=orderer:5005
32+
- CORE_PEER_COMMITTER_LEDGER_ORDERER=orderer:7050
3333
volumes:
3434
- /var/run/:/host/var/run/
3535
networks:
@@ -67,7 +67,7 @@ services:
6767
environment:
6868
- CORE_PEER_ID=vp2
6969
- CORE_PEER_PROFILE_ENABLED=true
70-
- CORE_PEER_COMMITTER_LEDGER_ORDERER=orderer:5005
70+
- CORE_PEER_COMMITTER_LEDGER_ORDERER=orderer:7050
7171
- CORE_PEER_GOSSIP_BOOTSTRAP=vp0:10000
7272
- CORE_PEER_GOSSIP_ORGLEADER=false
7373
command: peer node start

bddtests/steps/orderer_util.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ def getABStubForComposeService(self, context, composeService):
188188
self.atomicBroadcastStubsDict[composeService] = newABStub
189189
return newABStub
190190

191+
# The default chain ID when the system is statically bootstrapped for testing
192+
TEST_CHAIN_ID = "**TEST_CHAINID**".encode()
191193

192194
# Registerses a user on a specific composeService
193195
def registerUser(context, secretMsg, composeService):
@@ -217,7 +219,7 @@ def getUserRegistration(context, enrollId):
217219
def createDeliverUpdateMsg(Start, SpecifiedNumber, WindowSize):
218220
seek = ab_pb2.SeekInfo()
219221
startVal = seek.__getattribute__(Start)
220-
seekInfo = ab_pb2.SeekInfo(Start = startVal, SpecifiedNumber = SpecifiedNumber, WindowSize = WindowSize)
222+
seekInfo = ab_pb2.SeekInfo(Start = startVal, SpecifiedNumber = SpecifiedNumber, WindowSize = WindowSize, ChainID = TEST_CHAIN_ID)
221223
deliverUpdateMsg = ab_pb2.DeliverUpdate(Seek = seekInfo)
222224
return deliverUpdateMsg
223225

@@ -227,7 +229,8 @@ def generateBroadcastMessages(numToGenerate = 1, timeToHoldOpen = 1):
227229
for i in range(0, numToGenerate):
228230
envelope = common_pb2.Envelope()
229231
payload = common_pb2.Payload(header = common_pb2.Header(chainHeader = common_pb2.ChainHeader()))
230-
# TODO, appropriately set the header type
232+
payload.header.chainHeader.chainID = TEST_CHAIN_ID
233+
payload.header.chainHeader.type = common_pb2.ENDORSER_TRANSACTION
231234
payload.data = str("BDD test: {0}".format(datetime.datetime.utcnow()))
232235
envelope.payload = payload.SerializeToString()
233236
messages.append(envelope)

orderer/common/blockcutter/blockcutter.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type receiver struct {
5252
curBatch []*cb.Envelope
5353
}
5454

55+
// NewReceiverImpl creates a Receiver implementation based on the given batchsize, filters, and configtx manager
5556
func NewReceiverImpl(batchSize int, filters *broadcastfilter.RuleSet, configManager configtx.Manager) Receiver {
5657
return &receiver{
5758
batchSize: batchSize,
@@ -106,9 +107,8 @@ func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool) {
106107
secondBatch := []*cb.Envelope{msg}
107108
if firstBatch == nil {
108109
return [][]*cb.Envelope{secondBatch}, true
109-
} else {
110-
return [][]*cb.Envelope{firstBatch, secondBatch}, true
111110
}
111+
return [][]*cb.Envelope{firstBatch, secondBatch}, true
112112
case broadcastfilter.Reject:
113113
logger.Debugf("Rejecting message")
114114
return nil, false

orderer/common/bootstrap/static/static.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ import (
2626
ab "github.com/hyperledger/fabric/protos/orderer"
2727
)
2828

29-
var TestChainID = "**TEST_CHAINID**"
29+
// TestChainID is the default chain ID which is used by all statically bootstrapped networks
30+
// This is necessary to allow test clients to connect without being rejected for targetting
31+
// a chain which does not exist
32+
const TestChainID = "**TEST_CHAINID**"
3033

3134
const msgVersion = int32(1)
3235

@@ -38,12 +41,20 @@ type bootstrapper struct {
3841
batchSize int32
3942
}
4043

44+
const (
45+
// DefaultBatchSize is the default value of BatchSizeKey
46+
DefaultBatchSize = 10
47+
48+
// DefaultConsensusType is the default value of ConsensusTypeKey
49+
DefaultConsensusType = "solo"
50+
)
51+
4152
// New returns a new static bootstrap helper.
4253
func New() bootstrap.Helper {
4354
return &bootstrapper{
4455
chainID: TestChainID,
45-
consensusType: "solo",
46-
batchSize: 10,
56+
consensusType: DefaultConsensusType,
57+
batchSize: DefaultBatchSize,
4758
}
4859
}
4960

orderer/common/broadcast/broadcast.go

+39-27
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package broadcast
1818

1919
import (
2020
"github.com/hyperledger/fabric/orderer/common/broadcastfilter"
21-
"github.com/hyperledger/fabric/orderer/multichain"
2221
cb "github.com/hyperledger/fabric/protos/common"
2322
ab "github.com/hyperledger/fabric/protos/orderer"
2423
"github.com/op/go-logging"
@@ -38,18 +37,30 @@ type Handler interface {
3837
Handle(srv ab.AtomicBroadcast_BroadcastServer) error
3938
}
4039

40+
// SupportManager provides a way for the Handler to look up the Support for a chain
41+
type SupportManager interface {
42+
GetChain(chainID string) (Support, bool)
43+
}
44+
45+
// Support provides the backing resources needed to support broadcast on a chain
46+
type Support interface {
47+
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
48+
Enqueue(env *cb.Envelope) bool
49+
50+
// Filters returns the set of broadcast filters for this chain
51+
Filters() *broadcastfilter.RuleSet
52+
}
53+
4154
type handlerImpl struct {
4255
queueSize int
43-
ml multichain.Manager
44-
exitChan chan struct{}
56+
sm SupportManager
4557
}
4658

4759
// NewHandlerImpl constructs a new implementation of the Handler interface
48-
func NewHandlerImpl(queueSize int, ml multichain.Manager) Handler {
60+
func NewHandlerImpl(sm SupportManager, queueSize int) Handler {
4961
return &handlerImpl{
5062
queueSize: queueSize,
51-
ml: ml,
52-
exitChan: make(chan struct{}),
63+
sm: sm,
5364
}
5465
}
5566

@@ -61,44 +72,45 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
6172
return b.queueEnvelopes(srv)
6273
}
6374

64-
type msgAndChainSupport struct {
65-
msg *cb.Envelope
66-
chainSupport multichain.ChainSupport
75+
type msgAndSupport struct {
76+
msg *cb.Envelope
77+
support Support
6778
}
6879

6980
type broadcaster struct {
70-
bs *handlerImpl
71-
queue chan *msgAndChainSupport
81+
bs *handlerImpl
82+
queue chan *msgAndSupport
83+
exitChan chan struct{}
7284
}
7385

7486
func newBroadcaster(bs *handlerImpl) *broadcaster {
7587
b := &broadcaster{
76-
bs: bs,
77-
queue: make(chan *msgAndChainSupport, bs.queueSize),
88+
bs: bs,
89+
queue: make(chan *msgAndSupport, bs.queueSize),
90+
exitChan: make(chan struct{}),
7891
}
7992
return b
8093
}
8194

8295
func (b *broadcaster) drainQueue() {
83-
for {
84-
select {
85-
case msgAndChainSupport, ok := <-b.queue:
86-
if ok {
87-
if !msgAndChainSupport.chainSupport.Chain().Enqueue(msgAndChainSupport.msg) {
88-
return
89-
}
90-
} else {
91-
return
92-
}
93-
case <-b.bs.exitChan:
96+
defer close(b.exitChan)
97+
for msgAndSupport := range b.queue {
98+
if !msgAndSupport.support.Enqueue(msgAndSupport.msg) {
99+
logger.Debugf("Consenter instructed us to shut down")
94100
return
95101
}
96102
}
103+
logger.Debugf("Exiting because the queue channel closed")
97104
}
98105

99106
func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) error {
100107

101108
for {
109+
select {
110+
case <-b.exitChan:
111+
return nil
112+
default:
113+
}
102114
msg, err := srv.Recv()
103115
if err != nil {
104116
return err
@@ -111,20 +123,20 @@ func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) err
111123
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
112124
}
113125

114-
chainSupport, ok := b.bs.ml.GetChain(payload.Header.ChainHeader.ChainID)
126+
support, ok := b.bs.sm.GetChain(payload.Header.ChainHeader.ChainID)
115127
if !ok {
116128
// XXX Hook in chain creation logic here
117129
panic("Unimplemented")
118130
}
119131

120-
action, _ := chainSupport.Filters().Apply(msg)
132+
action, _ := support.Filters().Apply(msg)
121133

122134
switch action {
123135
case broadcastfilter.Reconfigure:
124136
fallthrough
125137
case broadcastfilter.Accept:
126138
select {
127-
case b.queue <- &msgAndChainSupport{msg: msg, chainSupport: chainSupport}:
139+
case b.queue <- &msgAndSupport{msg: msg, support: support}:
128140
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
129141
default:
130142
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})

0 commit comments

Comments
 (0)