Skip to content

Commit b53de80

Browse files
author
Jason Yellick
committed
[FAB-1279] Add dynamic chain creation path
This changeset introduces the runtime chain creation logic into the orderer. It's presently only exercised via unit tests, pending the creation of bdd and sample clients to exercise this behavior. The chain creation checking is still somewhat limited, and does not enforce the sorts of keys which can be created, or do real policy evaluation at this point, but the plug points are there. Change-Id: I44b904884d81f1e6e279c7214238950b8c6b059f Signed-off-by: Jason Yellick <[email protected]>
1 parent 746b873 commit b53de80

11 files changed

+991
-72
lines changed

orderer/common/broadcast/broadcast.go

+26-14
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@ type Handler interface {
3939

4040
// SupportManager provides a way for the Handler to look up the Support for a chain
4141
type SupportManager interface {
42+
// GetChain gets the chain support for a given ChainID
4243
GetChain(chainID string) (Support, bool)
44+
45+
// ProposeChain accepts a configuration transaction for a chain which does not already exists
46+
// The status returned is whether the proposal is accepted for consideration, only after consensus
47+
// occurs will the proposal be committed or rejected
48+
ProposeChain(env *cb.Envelope) cb.Status
4349
}
4450

4551
// Support provides the backing resources needed to support broadcast on a chain
@@ -125,21 +131,27 @@ func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) err
125131

126132
support, ok := b.bs.sm.GetChain(payload.Header.ChainHeader.ChainID)
127133
if !ok {
128-
// XXX Hook in chain creation logic here
129-
panic("Unimplemented")
130-
}
131-
132-
_, filterErr := support.Filters().Apply(msg)
133-
134-
if filterErr != nil {
135-
logger.Debugf("Rejecting broadcast message")
136-
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
134+
// Chain not found, maybe create one?
135+
if payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) {
136+
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
137+
} else {
138+
logger.Debugf("Proposing new chain")
139+
err = srv.Send(&ab.BroadcastResponse{Status: b.bs.sm.ProposeChain(msg)})
140+
}
137141
} else {
138-
select {
139-
case b.queue <- &msgAndSupport{msg: msg, support: support}:
140-
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
141-
default:
142-
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
142+
// Normal transaction for existing chain
143+
_, filterErr := support.Filters().Apply(msg)
144+
145+
if filterErr != nil {
146+
logger.Debugf("Rejecting broadcast message")
147+
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
148+
} else {
149+
select {
150+
case b.queue <- &msgAndSupport{msg: msg, support: support}:
151+
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
152+
default:
153+
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
154+
}
143155
}
144156
}
145157

orderer/common/broadcast/broadcast_test.go

+72-8
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ import (
2020
"fmt"
2121
"testing"
2222

23-
"github.com/golang/protobuf/proto"
2423
"github.com/hyperledger/fabric/orderer/common/filter"
2524
cb "github.com/hyperledger/fabric/protos/common"
2625
ab "github.com/hyperledger/fabric/protos/orderer"
26+
"github.com/hyperledger/fabric/protos/utils"
2727

2828
"google.golang.org/grpc"
2929
)
@@ -65,6 +65,19 @@ func (mm *mockSupportManager) GetChain(chainID string) (Support, bool) {
6565
return chain, ok
6666
}
6767

68+
func (mm *mockSupportManager) ProposeChain(configTx *cb.Envelope) cb.Status {
69+
payload := utils.ExtractPayloadOrPanic(configTx)
70+
71+
mm.chains[string(payload.Header.ChainHeader.ChainID)] = &mockSupport{
72+
filters: filter.NewRuleSet([]filter.Rule{
73+
filter.EmptyRejectRule,
74+
filter.AcceptRule,
75+
}),
76+
queue: make(chan *cb.Envelope),
77+
}
78+
return cb.Status_SUCCESS
79+
}
80+
6881
func (mm *mockSupportManager) halt() {
6982
for _, chain := range mm.chains {
7083
chain.halt()
@@ -95,6 +108,21 @@ func (ms *mockSupport) halt() {
95108
}
96109
}
97110

111+
func makeConfigMessage(chainID string) *cb.Envelope {
112+
payload := &cb.Payload{
113+
Data: utils.MarshalOrPanic(&cb.ConfigurationEnvelope{}),
114+
Header: &cb.Header{
115+
ChainHeader: &cb.ChainHeader{
116+
ChainID: chainID,
117+
Type: int32(cb.HeaderType_CONFIGURATION_TRANSACTION),
118+
},
119+
},
120+
}
121+
return &cb.Envelope{
122+
Payload: utils.MarshalOrPanic(payload),
123+
}
124+
}
125+
98126
func makeMessage(chainID string, data []byte) *cb.Envelope {
99127
payload := &cb.Payload{
100128
Data: data,
@@ -104,14 +132,9 @@ func makeMessage(chainID string, data []byte) *cb.Envelope {
104132
},
105133
},
106134
}
107-
data, err := proto.Marshal(payload)
108-
if err != nil {
109-
panic(err)
135+
return &cb.Envelope{
136+
Payload: utils.MarshalOrPanic(payload),
110137
}
111-
env := &cb.Envelope{
112-
Payload: data,
113-
}
114-
return env
115138
}
116139

117140
func getMultichainManager() *mockSupportManager {
@@ -200,3 +223,44 @@ func TestEmptyEnvelope(t *testing.T) {
200223
}
201224

202225
}
226+
227+
func TestBadChainID(t *testing.T) {
228+
mm := getMultichainManager()
229+
defer mm.halt()
230+
bh := NewHandlerImpl(mm, 2)
231+
m := newMockB()
232+
defer close(m.recvChan)
233+
go bh.Handle(m)
234+
235+
m.recvChan <- makeMessage("Wrong chain", []byte("Some bytes"))
236+
reply := <-m.sendChan
237+
if reply.Status != cb.Status_NOT_FOUND {
238+
t.Fatalf("Should have rejected message to a chain which does not exist")
239+
}
240+
241+
}
242+
243+
func TestNewChainID(t *testing.T) {
244+
mm := getMultichainManager()
245+
defer mm.halt()
246+
bh := NewHandlerImpl(mm, 2)
247+
m := newMockB()
248+
defer close(m.recvChan)
249+
go bh.Handle(m)
250+
251+
m.recvChan <- makeConfigMessage("New chain")
252+
reply := <-m.sendChan
253+
if reply.Status != cb.Status_SUCCESS {
254+
t.Fatalf("Should have created a new chain, got %d", reply.Status)
255+
}
256+
257+
if len(mm.chains) != 2 {
258+
t.Fatalf("Should have created a new chain")
259+
}
260+
261+
m.recvChan <- makeMessage("New chain", []byte("Some bytes"))
262+
reply = <-m.sendChan
263+
if reply.Status != cb.Status_SUCCESS {
264+
t.Fatalf("Should have successfully sent message to new chain")
265+
}
266+
}

orderer/multichain/chainsupport.go

+33-4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ type ChainSupport interface {
6767
broadcast.Support
6868
deliver.Support
6969
ConsenterSupport
70+
71+
// ChainID returns the ChainID for this chain support
72+
ChainID() string
73+
74+
// ConfigTxManager returns the corresponding configtx.Manager for this chain
75+
ConfigTxManager() configtx.Manager
7076
}
7177

7278
type chainSupport struct {
@@ -79,9 +85,16 @@ type chainSupport struct {
7985
filters *filter.RuleSet
8086
}
8187

82-
func newChainSupport(configManager configtx.Manager, policyManager policies.Manager, backing rawledger.ReadWriter, sharedConfigManager sharedconfig.Manager, consenters map[string]Consenter) *chainSupport {
88+
func newChainSupport(
89+
filters *filter.RuleSet,
90+
configManager configtx.Manager,
91+
policyManager policies.Manager,
92+
backing rawledger.ReadWriter,
93+
sharedConfigManager sharedconfig.Manager,
94+
consenters map[string]Consenter,
95+
) *chainSupport {
96+
8397
batchSize := sharedConfigManager.BatchSize() // XXX this needs to be pushed deeper so that the blockcutter queries it after each write for reconfiguration support
84-
filters := createBroadcastRuleset(configManager)
8598
cutter := blockcutter.NewReceiverImpl(batchSize, filters)
8699
consenterType := sharedConfigManager.ConsensusType()
87100
consenter, ok := consenters[consenterType]
@@ -107,12 +120,24 @@ func newChainSupport(configManager configtx.Manager, policyManager policies.Mana
107120
return cs
108121
}
109122

110-
func createBroadcastRuleset(configManager configtx.Manager) *filter.RuleSet {
123+
// createStandardFilters creates the set of filters for a normal (non-system) chain
124+
func createStandardFilters(configManager configtx.Manager) *filter.RuleSet {
111125
return filter.NewRuleSet([]filter.Rule{
112126
filter.EmptyRejectRule,
113127
configtx.NewFilter(configManager),
114128
filter.AcceptRule,
115129
})
130+
131+
}
132+
133+
// createSystemChainFilters creates the set of filters for the ordering system chain
134+
func createSystemChainFilters(ml *multiLedger, configManager configtx.Manager) *filter.RuleSet {
135+
return filter.NewRuleSet([]filter.Rule{
136+
filter.EmptyRejectRule,
137+
newSystemChainFilter(ml),
138+
configtx.NewFilter(configManager),
139+
filter.AcceptRule,
140+
})
116141
}
117142

118143
func (cs *chainSupport) start() {
@@ -123,10 +148,14 @@ func (cs *chainSupport) SharedConfig() sharedconfig.Manager {
123148
return cs.sharedConfigManager
124149
}
125150

126-
func (cs *chainSupport) ConfigManager() configtx.Manager {
151+
func (cs *chainSupport) ConfigTxManager() configtx.Manager {
127152
return cs.configManager
128153
}
129154

155+
func (cs *chainSupport) ChainID() string {
156+
return cs.configManager.ChainID()
157+
}
158+
130159
func (cs *chainSupport) PolicyManager() policies.Manager {
131160
return cs.policyManager
132161
}

orderer/multichain/manager.go

+70-9
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ limitations under the License.
1717
package multichain
1818

1919
import (
20-
"sync"
20+
"fmt"
2121

2222
"github.com/hyperledger/fabric/orderer/common/configtx"
2323
"github.com/hyperledger/fabric/orderer/common/policies"
@@ -48,13 +48,18 @@ func init() {
4848
type Manager interface {
4949
// GetChain retrieves the chain support for a chain (and whether it exists)
5050
GetChain(chainID string) (ChainSupport, bool)
51+
52+
// ProposeChain accepts a configuration transaction for a chain which does not already exists
53+
// The status returned is whether the proposal is accepted for consideration, only after consensus
54+
// occurs will the proposal be committed or rejected
55+
ProposeChain(env *cb.Envelope) cb.Status
5156
}
5257

5358
type multiLedger struct {
5459
chains map[string]*chainSupport
5560
consenters map[string]Consenter
5661
ledgerFactory rawledger.Factory
57-
mutex sync.Mutex
62+
sysChain *systemChain
5863
}
5964

6065
// getConfigTx, this should ultimately be done more intelligently, but for now, we search the whole chain for txs and pick the last config one
@@ -104,6 +109,7 @@ func NewManagerImpl(ledgerFactory rawledger.Factory, consenters map[string]Conse
104109
ml := &multiLedger{
105110
chains: make(map[string]*chainSupport),
106111
ledgerFactory: ledgerFactory,
112+
consenters: consenters,
107113
}
108114

109115
existingChains := ledgerFactory.ChainIDs()
@@ -118,23 +124,43 @@ func NewManagerImpl(ledgerFactory rawledger.Factory, consenters map[string]Conse
118124
}
119125
configManager, policyManager, backingLedger, sharedConfigManager := ml.newResources(configTx)
120126
chainID := configManager.ChainID()
121-
ml.chains[chainID] = newChainSupport(configManager, policyManager, backingLedger, sharedConfigManager, consenters)
122-
}
123127

124-
for _, cs := range ml.chains {
125-
cs.start()
128+
if sharedConfigManager.ChainCreators() != nil {
129+
if ml.sysChain != nil {
130+
logger.Fatalf("There appear to be two system chains %x and %x", ml.sysChain.support.ChainID(), chainID)
131+
}
132+
logger.Debugf("Starting with system chain: %x", chainID)
133+
chain := newChainSupport(createSystemChainFilters(ml, configManager), configManager, policyManager, backingLedger, sharedConfigManager, consenters)
134+
ml.chains[string(chainID)] = chain
135+
ml.sysChain = newSystemChain(chain)
136+
// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
137+
defer chain.start()
138+
} else {
139+
logger.Debugf("Starting chain: %x", chainID)
140+
chain := newChainSupport(createStandardFilters(configManager), configManager, policyManager, backingLedger, sharedConfigManager, consenters)
141+
ml.chains[string(chainID)] = chain
142+
chain.start()
143+
}
144+
126145
}
127146

128147
return ml
129148
}
130149

150+
// ProposeChain accepts a configuration transaction for a chain which does not already exists
151+
// The status returned is whether the proposal is accepted for consideration, only after consensus
152+
// occurs will the proposal be committed or rejected
153+
func (ml *multiLedger) ProposeChain(env *cb.Envelope) cb.Status {
154+
return ml.sysChain.proposeChain(env)
155+
}
156+
131157
// GetChain retrieves the chain support for a chain (and whether it exists)
132158
func (ml *multiLedger) GetChain(chainID string) (ChainSupport, bool) {
133159
cs, ok := ml.chains[chainID]
134160
return cs, ok
135161
}
136162

137-
func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, policies.Manager, rawledger.ReadWriter, sharedconfig.Manager) {
163+
func newConfigTxManagerAndHandlers(configEnvelope *cb.ConfigurationEnvelope) (configtx.Manager, policies.Manager, sharedconfig.Manager, error) {
138164
policyManager := policies.NewManagerImpl(xxxCryptoHelper{})
139165
sharedConfigManager := sharedconfig.NewManagerImpl()
140166
configHandlerMap := make(map[cb.ConfigurationItem_ConfigurationType]configtx.Handler)
@@ -150,6 +176,15 @@ func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, po
150176
}
151177
}
152178

179+
configManager, err := configtx.NewConfigurationManager(configEnvelope, policyManager, configHandlerMap)
180+
if err != nil {
181+
return nil, nil, nil, fmt.Errorf("Error unpacking configuration transaction: %s", err)
182+
}
183+
184+
return configManager, policyManager, sharedConfigManager, nil
185+
}
186+
187+
func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, policies.Manager, rawledger.ReadWriter, sharedconfig.Manager) {
153188
payload := &cb.Payload{}
154189
err := proto.Unmarshal(configTx.Payload, payload)
155190
if err != nil {
@@ -162,9 +197,10 @@ func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, po
162197
logger.Fatalf("Error unmarshaling a config transaction to config envelope: %s", err)
163198
}
164199

165-
configManager, err := configtx.NewConfigurationManager(configEnvelope, policyManager, configHandlerMap)
200+
configManager, policyManager, sharedConfigManager, err := newConfigTxManagerAndHandlers(configEnvelope)
201+
166202
if err != nil {
167-
logger.Fatalf("Error unpacking configuration transaction: %s", err)
203+
logger.Fatalf("Error creating configtx manager and handlers: %s", err)
168204
}
169205

170206
chainID := configManager.ChainID()
@@ -176,3 +212,28 @@ func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, po
176212

177213
return configManager, policyManager, ledger, sharedConfigManager
178214
}
215+
216+
func (ml *multiLedger) systemChain() *systemChain {
217+
return ml.sysChain
218+
}
219+
220+
func (ml *multiLedger) newChain(configtx *cb.Envelope) {
221+
configManager, policyManager, backingLedger, sharedConfig := ml.newResources(configtx)
222+
backingLedger.Append([]*cb.Envelope{configtx}, nil)
223+
224+
// Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is
225+
newChains := make(map[string]*chainSupport)
226+
for key, value := range ml.chains {
227+
newChains[key] = value
228+
}
229+
230+
cs := newChainSupport(createStandardFilters(configManager), configManager, policyManager, backingLedger, sharedConfig, ml.consenters)
231+
chainID := configManager.ChainID()
232+
233+
logger.Debugf("Created and starting new chain %s", chainID)
234+
235+
newChains[string(chainID)] = cs
236+
cs.start()
237+
238+
ml.chains = newChains
239+
}

0 commit comments

Comments
 (0)