Skip to content

Commit 5455c58

Browse files
author
Jason Yellick
committed
[FAB-2366] Convert channel creation to common path
https://jira.hyperledger.org/browse/FAB-2366 With FAB-2364, a common code path for config updates and channel creation was introduced, but not hooked into the production code path. This CR switches broadcast to use this new path, and removes the old channel creation path from the multichain manager. Change-Id: I59fc9c5e47b6abd8c6e18f9d8c9dd9cbf9a3c114 Signed-off-by: Jason Yellick <[email protected]>
1 parent 1219131 commit 5455c58

12 files changed

+185
-363
lines changed

common/configtx/manager.go

+4
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ func NewManagerImpl(configEnv *cb.ConfigEnvelope, initializer api.Initializer, c
108108
return nil, fmt.Errorf("Nil config envelope Config")
109109
}
110110

111+
if configEnv.Config.Channel == nil {
112+
return nil, fmt.Errorf("Nil config envelope Config.Channel")
113+
}
114+
111115
if configEnv.Config.Header == nil {
112116
return nil, fmt.Errorf("Nil config envelop Config Header")
113117
}

orderer/common/broadcast/broadcast.go

+25-16
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,14 @@ import (
2929

3030
var logger = logging.MustGetLogger("orderer/common/broadcast")
3131

32+
// ConfigUpdateProcessor is used to transform CONFIG_UPDATE transactions which are used to generate other envelope
33+
// message types with preprocessing by the orderer
34+
type ConfigUpdateProcessor interface {
35+
// Process takes in an envelope of type CONFIG_UPDATE and proceses it
36+
// to transform it either into another envelope type
37+
Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error)
38+
}
39+
3240
// Handler defines an interface which handles broadcasts
3341
type Handler interface {
3442
// Handle starts a service thread for a given gRPC connection and services the broadcast connection
@@ -37,13 +45,10 @@ type Handler interface {
3745

3846
// SupportManager provides a way for the Handler to look up the Support for a chain
3947
type SupportManager interface {
48+
ConfigUpdateProcessor
49+
4050
// GetChain gets the chain support for a given ChannelId
4151
GetChain(chainID string) (Support, bool)
42-
43-
// ProposeChain accepts a configuration transaction for a chain which does not already exists
44-
// The status returned is whether the proposal is accepted for consideration, only after consensus
45-
// occurs will the proposal be committed or rejected
46-
ProposeChain(env *cb.Envelope) cb.Status
4752
}
4853

4954
// Support provides the backing resources needed to support broadcast on a chain
@@ -84,23 +89,27 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
8489
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
8590
}
8691

87-
support, ok := bh.sm.GetChain(payload.Header.ChannelHeader.ChannelId)
88-
if !ok {
89-
// Chain not found, maybe create one?
90-
if payload.Header.ChannelHeader.Type != int32(cb.HeaderType_CONFIG_UPDATE) {
91-
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
92+
if payload.Header.ChannelHeader.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
93+
logger.Debugf("Preprocessing CONFIG_UPDATE")
94+
msg, err = bh.sm.Process(msg)
95+
if err != nil {
96+
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
9297
}
9398

94-
logger.Debugf("Proposing new chain")
95-
err = srv.Send(&ab.BroadcastResponse{Status: bh.sm.ProposeChain(msg)})
96-
if err != nil {
97-
return err
99+
err = proto.Unmarshal(msg.Payload, payload)
100+
if payload.Header == nil || payload.Header.ChannelHeader == nil || payload.Header.ChannelHeader.ChannelId == "" {
101+
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing")
102+
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
98103
}
99-
continue
104+
}
105+
106+
support, ok := bh.sm.GetChain(payload.Header.ChannelHeader.ChannelId)
107+
if !ok {
108+
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
100109
}
101110

102111
if logger.IsEnabledFor(logging.DEBUG) {
103-
logger.Debugf("Broadcast is filtering message for chain %s", payload.Header.ChannelHeader.ChannelId)
112+
logger.Debugf("Broadcast is filtering message for channel %s", payload.Header.ChannelHeader.ChannelId)
104113
}
105114

106115
// Normal transaction for existing chain

orderer/common/broadcast/broadcast_test.go

+20-22
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/hyperledger/fabric/protos/utils"
2828

2929
logging "github.com/op/go-logging"
30+
"github.com/stretchr/testify/assert"
3031
"google.golang.org/grpc"
3132
)
3233

@@ -63,24 +64,20 @@ func (m *mockB) Recv() (*cb.Envelope, error) {
6364
}
6465

6566
type mockSupportManager struct {
66-
chains map[string]*mockSupport
67+
chains map[string]*mockSupport
68+
ProcessVal *cb.Envelope
6769
}
6870

6971
func (mm *mockSupportManager) GetChain(chainID string) (Support, bool) {
7072
chain, ok := mm.chains[chainID]
7173
return chain, ok
7274
}
7375

74-
func (mm *mockSupportManager) ProposeChain(configTx *cb.Envelope) cb.Status {
75-
payload := utils.ExtractPayloadOrPanic(configTx)
76-
77-
mm.chains[string(payload.Header.ChannelHeader.ChannelId)] = &mockSupport{
78-
filters: filter.NewRuleSet([]filter.Rule{
79-
filter.EmptyRejectRule,
80-
filter.AcceptRule,
81-
}),
76+
func (mm *mockSupportManager) Process(configTx *cb.Envelope) (*cb.Envelope, error) {
77+
if mm.ProcessVal == nil {
78+
return nil, fmt.Errorf("Nil result implies error")
8279
}
83-
return cb.Status_SUCCESS
80+
return mm.ProcessVal, nil
8481
}
8582

8683
type mockSupport struct {
@@ -222,8 +219,9 @@ func TestBadChannelId(t *testing.T) {
222219
}
223220
}
224221

225-
func TestNewChannelId(t *testing.T) {
222+
func TestGoodConfigUpdate(t *testing.T) {
226223
mm, _ := getMockSupportManager()
224+
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: &cb.ChannelHeader{ChannelId: systemChain}}})}
227225
bh := NewHandlerImpl(mm)
228226
m := newMockB()
229227
defer close(m.recvChan)
@@ -232,17 +230,17 @@ func TestNewChannelId(t *testing.T) {
232230

233231
m.recvChan <- makeConfigMessage(newChannelId)
234232
reply := <-m.sendChan
235-
if reply.Status != cb.Status_SUCCESS {
236-
t.Fatalf("Should have created a new chain, got %d", reply.Status)
237-
}
233+
assert.Equal(t, cb.Status_SUCCESS, reply.Status, "Should have allowed a good CONFIG_UPDATE")
234+
}
238235

239-
if len(mm.chains) != 2 {
240-
t.Fatalf("Should have created a new chain")
241-
}
236+
func TestBadConfigUpdate(t *testing.T) {
237+
mm, _ := getMockSupportManager()
238+
bh := NewHandlerImpl(mm)
239+
m := newMockB()
240+
defer close(m.recvChan)
241+
go bh.Handle(m)
242242

243-
m.recvChan <- makeMessage(newChannelId, []byte("Some bytes"))
244-
reply = <-m.sendChan
245-
if reply.Status != cb.Status_SUCCESS {
246-
t.Fatalf("Should have successfully sent message to new chain, got %v", reply)
247-
}
243+
m.recvChan <- makeConfigMessage(systemChain)
244+
reply := <-m.sendChan
245+
assert.NotEqual(t, cb.Status_SUCCESS, reply.Status, "Should have rejected CONFIG_UPDATE")
248246
}

orderer/main.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,13 @@ func main() {
151151
consenters["kafka"] = kafka.New(conf.Kafka.Version, conf.Kafka.Retry, conf.Kafka.TLS)
152152
consenters["sbft"] = sbft.New(makeSbftConsensusConfig(conf), makeSbftStackConfig(conf))
153153

154-
manager := multichain.NewManagerImpl(lf, consenters, localmsp.NewSigner())
154+
signer := localmsp.NewSigner()
155+
156+
manager := multichain.NewManagerImpl(lf, consenters, signer)
155157

156158
server := NewServer(
157159
manager,
158-
int(conf.General.QueueSize),
159-
int(conf.General.MaxWindowSize),
160+
signer,
160161
)
161162

162163
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)

orderer/multichain/chainsupport.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ type ChainSupport interface {
8585

8686
broadcast.Support
8787
ConsenterSupport
88+
89+
// ProposeConfigUpdate applies a CONFIG_UPDATE to an existing config to produce a *cb.ConfigEnvelope
90+
ProposeConfigUpdate(env *cb.Envelope) (*cb.ConfigEnvelope, error)
8891
}
8992

9093
type chainSupport struct {
@@ -155,7 +158,7 @@ func createSystemChainFilters(ml *multiLedger, ledgerResources *ledgerResources)
155158
filter.EmptyRejectRule,
156159
sizefilter.MaxBytesRule(ledgerResources.SharedConfig().BatchSize().AbsoluteMaxBytes),
157160
sigfilter.New(ledgerResources.SharedConfig().IngressPolicyNames, ledgerResources.PolicyManager()),
158-
newSystemChainFilter(ml),
161+
newSystemChainFilter(ledgerResources, ml),
159162
configtxfilter.NewFilter(ledgerResources),
160163
filter.AcceptRule,
161164
})

orderer/multichain/manager.go

+13-22
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,8 @@ type Manager interface {
3838
// GetChain retrieves the chain support for a chain (and whether it exists)
3939
GetChain(chainID string) (ChainSupport, bool)
4040

41-
// ProposeChain accepts a config transaction for a chain which does not already exists
42-
// The status returned is whether the proposal is accepted for consideration, only after consensus
43-
// occurs will the proposal be committed or rejected
44-
ProposeChain(env *cb.Envelope) cb.Status
41+
// SystemChannelID returns the channel ID for the system channel
42+
SystemChannelID() string
4543
}
4644

4745
type configResources struct {
@@ -58,11 +56,11 @@ type ledgerResources struct {
5856
}
5957

6058
type multiLedger struct {
61-
chains map[string]*chainSupport
62-
consenters map[string]Consenter
63-
ledgerFactory ordererledger.Factory
64-
sysChain *systemChain
65-
signer crypto.LocalSigner
59+
chains map[string]*chainSupport
60+
consenters map[string]Consenter
61+
ledgerFactory ordererledger.Factory
62+
signer crypto.LocalSigner
63+
systemChannelID string
6664
}
6765

6866
func getConfigTx(reader ordererledger.Reader) *cb.Envelope {
@@ -102,16 +100,16 @@ func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]C
102100
chainID := ledgerResources.ChainID()
103101

104102
if ledgerResources.SharedConfig().ChainCreationPolicyNames() != nil {
105-
if ml.sysChain != nil {
106-
logger.Fatalf("There appear to be two system chains %s and %s", ml.sysChain.support.ChainID(), chainID)
103+
if ml.systemChannelID != "" {
104+
logger.Fatalf("There appear to be two system chains %s and %s", ml.systemChannelID, chainID)
107105
}
108106
chain := newChainSupport(createSystemChainFilters(ml, ledgerResources),
109107
ledgerResources,
110108
consenters,
111109
signer)
112110
logger.Infof("Starting with system channel: %s and orderer type %s", chainID, chain.SharedConfig().ConsensusType())
113111
ml.chains[string(chainID)] = chain
114-
ml.sysChain = newSystemChain(chain)
112+
ml.systemChannelID = chainID
115113
// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
116114
defer chain.start()
117115
} else {
@@ -126,18 +124,15 @@ func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]C
126124

127125
}
128126

129-
if ml.sysChain == nil {
127+
if ml.systemChannelID == "" {
130128
logger.Panicf("No system chain found")
131129
}
132130

133131
return ml
134132
}
135133

136-
// ProposeChain accepts a config transaction for a chain which does not already exists
137-
// The status returned is whether the proposal is accepted for consideration, only after consensus
138-
// occurs will the proposal be committed or rejected
139-
func (ml *multiLedger) ProposeChain(env *cb.Envelope) cb.Status {
140-
return ml.sysChain.proposeChain(env)
134+
func (ml *multiLedger) SystemChannelID() string {
135+
return ml.systemChannelID
141136
}
142137

143138
// GetChain retrieves the chain support for a chain (and whether it exists)
@@ -190,10 +185,6 @@ func (ml *multiLedger) newLedgerResources(configTx *cb.Envelope) *ledgerResource
190185
}
191186
}
192187

193-
func (ml *multiLedger) systemChain() *systemChain {
194-
return ml.sysChain
195-
}
196-
197188
func (ml *multiLedger) newChain(configtx *cb.Envelope) {
198189
ledgerResources := ml.newLedgerResources(configtx)
199190
ledgerResources.ledger.Append(ordererledger.CreateNextBlock(ledgerResources.ledger, []*cb.Envelope{configtx}))

orderer/multichain/manager_test.go

+12-31
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"time"
2323

2424
"github.com/hyperledger/fabric/common/configtx"
25+
configtxtest "github.com/hyperledger/fabric/common/configtx/test"
2526
genesisconfig "github.com/hyperledger/fabric/common/configtx/tool/localconfig"
2627
"github.com/hyperledger/fabric/common/configtx/tool/provisional"
2728
mockcrypto "github.com/hyperledger/fabric/common/mocks/crypto"
@@ -31,7 +32,6 @@ import (
3132
ab "github.com/hyperledger/fabric/protos/orderer"
3233
"github.com/hyperledger/fabric/protos/utils"
3334

34-
"github.com/hyperledger/fabric/msp"
3535
logging "github.com/op/go-logging"
3636
"github.com/stretchr/testify/assert"
3737
)
@@ -223,23 +223,18 @@ func TestNewChain(t *testing.T) {
223223

224224
manager := NewManagerImpl(lf, consenters, mockCrypto())
225225

226-
generator := provisional.New(conf)
227-
channelTemplate := generator.ChannelTemplate()
228-
229-
signer, err := msp.NewNoopMsp().GetDefaultSigningIdentity()
230-
assert.NoError(t, err)
231-
232226
newChainID := "TestNewChain"
233-
newChainMessage, err := configtx.MakeChainCreationTransaction(provisional.AcceptAllPolicyKey, newChainID, signer, channelTemplate)
227+
228+
configEnv, err := configtx.NewChainCreationTemplate(provisional.AcceptAllPolicyKey, configtxtest.CompositeTemplate()).Envelope(newChainID)
234229
if err != nil {
235-
t.Fatalf("Error producing config transaction: %s", err)
230+
t.Fatalf("Error constructing configtx")
236231
}
232+
ingressTx := makeConfigTxFromConfigUpdateEnvelope(newChainID, configEnv)
233+
wrapped := wrapConfigTx(ingressTx)
237234

238-
status := manager.ProposeChain(newChainMessage)
239-
240-
if status != cb.Status_SUCCESS {
241-
t.Fatalf("Error submitting chain creation request")
242-
}
235+
chainSupport, ok := manager.GetChain(manager.SystemChannelID())
236+
assert.True(t, ok, "Could not find system channel")
237+
chainSupport.Enqueue(wrapped)
243238

244239
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
245240
select {
@@ -251,21 +246,13 @@ func TestNewChain(t *testing.T) {
251246
if len(block.Data.Data) != 1 {
252247
t.Fatalf("Should have had only one message in the orderer transaction block")
253248
}
254-
configEnv, err := configtx.UnmarshalConfigEnvelope(utils.UnmarshalPayloadOrPanic(
255-
utils.UnmarshalEnvelopeOrPanic(utils.UnmarshalPayloadOrPanic(utils.ExtractEnvelopeOrPanic(block, 0).Payload).Data).Payload).Data)
256249

257-
if err != nil {
258-
t.Fatal(err)
259-
}
260-
261-
if !reflect.DeepEqual(configEnv.LastUpdate, newChainMessage) {
262-
t.Errorf("Orderer config block contains wrong transaction, expected %v got %v", configEnv.LastUpdate, newChainMessage)
263-
}
250+
assert.Equal(t, wrapped, utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0]), "Orderer config block contains wrong transaction")
264251
case <-time.After(time.Second):
265252
t.Fatalf("Block 1 not produced after timeout in system chain")
266253
}
267254

268-
chainSupport, ok := manager.GetChain(newChainID)
255+
chainSupport, ok = manager.GetChain(newChainID)
269256

270257
if !ok {
271258
t.Fatalf("Should have gotten new chain which was created")
@@ -290,14 +277,8 @@ func TestNewChain(t *testing.T) {
290277
if len(block.Data.Data) != 1 {
291278
t.Fatalf("Should have had only one message in the new genesis block")
292279
}
293-
configEnv, err := configtx.ConfigEnvelopeFromBlock(block)
294-
if err != nil {
295-
t.Fatal(err)
296-
}
297280

298-
if !reflect.DeepEqual(configEnv.LastUpdate, newChainMessage) {
299-
t.Errorf("Genesis block contains wrong transaction, expected %v got %v", configEnv.LastUpdate, newChainMessage)
300-
}
281+
assert.Equal(t, ingressTx, utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0]), "Genesis block contains wrong transaction")
301282
case <-time.After(time.Second):
302283
t.Fatalf("Block 1 not produced after timeout in system chain")
303284
}

0 commit comments

Comments
 (0)