Skip to content

Commit 157479b

Browse files
author
Jason Yellick
committed
Create multichain manager
The multichain manager will ultimately supplant much of what is main.go and generally coordinate the creation of the consenter backed chains and provide an interface for the common broadcast/deliver logic. This changeset introduces the manager but makes no attempt to hook it into the rest of the system (for ease of review). Change-Id: I89072ff497dbf0395f1864587e034d5906217cea Signed-off-by: Jason Yellick <[email protected]> Signed-off-by: Yacov Manevich <[email protected]>
1 parent 69a3aa6 commit 157479b

File tree

4 files changed

+548
-0
lines changed

4 files changed

+548
-0
lines changed

orderer/multichain/chainsupport.go

+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package multichain
18+
19+
import (
20+
"github.com/hyperledger/fabric/orderer/common/blockcutter"
21+
"github.com/hyperledger/fabric/orderer/common/broadcastfilter"
22+
"github.com/hyperledger/fabric/orderer/common/configtx"
23+
"github.com/hyperledger/fabric/orderer/common/policies"
24+
"github.com/hyperledger/fabric/orderer/rawledger"
25+
cb "github.com/hyperledger/fabric/protos/common"
26+
)
27+
28+
const XXXBatchSize = 10 // XXX
29+
30+
// Consenter defines the backing ordering mechanism
31+
type Consenter interface {
32+
// HandleChain should create a return a reference to a Chain for the given set of resources
33+
// It will only be invoked for a given chain once per process. See the description of Chain
34+
// for more details
35+
HandleChain(configManager configtx.Manager, cutter blockcutter.Receiver, rl rawledger.Writer, metadata []byte) Chain
36+
}
37+
38+
// Chain defines a way to inject messages for ordering
39+
// Note, that in order to allow flexibility in the implementation, it is the responsibility of the implementer
40+
// to take the ordered messages, send them through the blockcutter.Receiver supplied via HandleChain to cut blocks,
41+
// and ultimately write the ledger also supplied via HandleChain. This flow allows for two primary flows
42+
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
43+
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
44+
type Chain interface {
45+
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
46+
Enqueue(env *cb.Envelope) bool
47+
48+
// Start should allocate whatever resources are needed for staying up to date with the chain
49+
// Typically, this involves creating a thread which reads from the ordering source, passes those
50+
// messages to a block cutter, and writes the resulting blocks to the ledger
51+
Start()
52+
53+
// Halt frees the resources which were allocated for this Chain
54+
Halt()
55+
}
56+
57+
// ChainSupport provides a wrapper for the resources backing a chain
58+
type ChainSupport interface {
59+
// ConfigManager returns the current config for the chain
60+
ConfigManager() configtx.Manager
61+
62+
// PolicyManager returns the current policy manager as specified by the chain configuration
63+
PolicyManager() policies.Manager
64+
65+
// Filters returns the set of broadcast filters for this chain
66+
Filters() *broadcastfilter.RuleSet
67+
68+
// Reader returns the chain Reader for the chain
69+
Reader() rawledger.Reader
70+
71+
// Chain returns the consenter backed chain
72+
Chain() Chain
73+
}
74+
75+
type chainSupport struct {
76+
chain Chain
77+
configManager configtx.Manager
78+
policyManager policies.Manager
79+
reader rawledger.Reader
80+
writer rawledger.Writer
81+
filters *broadcastfilter.RuleSet
82+
}
83+
84+
func newChainSupport(configManager configtx.Manager, policyManager policies.Manager, backing rawledger.ReadWriter, consenters map[string]Consenter) *chainSupport {
85+
batchSize := XXXBatchSize // XXX Pull this from chain config
86+
filters := createBroadcastRuleset(configManager)
87+
cutter := blockcutter.NewReceiverImpl(batchSize, filters, configManager)
88+
consenterType := "solo" // XXX retrieve this from the chain config
89+
consenter, ok := consenters[consenterType]
90+
if !ok {
91+
logger.Fatalf("Error retrieving consenter of type: %s", consenterType)
92+
}
93+
94+
cs := &chainSupport{
95+
configManager: configManager,
96+
policyManager: policyManager,
97+
filters: filters,
98+
reader: backing,
99+
writer: newWriteInterceptor(configManager, backing),
100+
}
101+
102+
cs.chain = consenter.HandleChain(configManager, cutter, cs.writer, nil)
103+
104+
return cs
105+
}
106+
107+
func createBroadcastRuleset(configManager configtx.Manager) *broadcastfilter.RuleSet {
108+
return broadcastfilter.NewRuleSet([]broadcastfilter.Rule{
109+
broadcastfilter.EmptyRejectRule,
110+
// configfilter.New(configManager),
111+
broadcastfilter.AcceptRule,
112+
})
113+
}
114+
115+
func (cs *chainSupport) start() {
116+
cs.chain.Start()
117+
}
118+
119+
func (cs *chainSupport) ConfigManager() configtx.Manager {
120+
return cs.configManager
121+
}
122+
123+
func (cs *chainSupport) PolicyManager() policies.Manager {
124+
return cs.policyManager
125+
}
126+
127+
func (cs *chainSupport) Filters() *broadcastfilter.RuleSet {
128+
return cs.filters
129+
}
130+
131+
func (cs *chainSupport) Reader() rawledger.Reader {
132+
return cs.reader
133+
}
134+
135+
func (cs *chainSupport) Chain() Chain {
136+
return cs.chain
137+
}
138+
139+
type writeInterceptor struct {
140+
backing rawledger.Writer
141+
}
142+
143+
// TODO ultimately set write interception policy by config
144+
func newWriteInterceptor(configManager configtx.Manager, backing rawledger.Writer) *writeInterceptor {
145+
return &writeInterceptor{
146+
backing: backing,
147+
}
148+
}
149+
150+
func (wi *writeInterceptor) Append(blockContents []*cb.Envelope, metadata [][]byte) *cb.Block {
151+
return wi.backing.Append(blockContents, metadata)
152+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package multichain
18+
19+
import (
20+
"github.com/hyperledger/fabric/orderer/common/blockcutter"
21+
"github.com/hyperledger/fabric/orderer/common/configtx"
22+
"github.com/hyperledger/fabric/orderer/rawledger"
23+
cb "github.com/hyperledger/fabric/protos/common"
24+
)
25+
26+
type mockConsenter struct {
27+
}
28+
29+
func (mc *mockConsenter) HandleChain(configManager configtx.Manager, cutter blockcutter.Receiver, rl rawledger.Writer, metadata []byte) Chain {
30+
return &mockChain{
31+
queue: make(chan *cb.Envelope),
32+
ledger: rl,
33+
cutter: cutter,
34+
}
35+
}
36+
37+
type mockChain struct {
38+
queue chan *cb.Envelope
39+
ledger rawledger.Writer
40+
cutter blockcutter.Receiver
41+
}
42+
43+
func (mch *mockChain) Enqueue(env *cb.Envelope) bool {
44+
mch.queue <- env
45+
return true
46+
}
47+
48+
func (mch *mockChain) Start() {
49+
go func() {
50+
for {
51+
msg, ok := <-mch.queue
52+
if !ok {
53+
return
54+
}
55+
batches, _ := mch.cutter.Ordered(msg)
56+
for _, batch := range batches {
57+
mch.ledger.Append(batch, nil)
58+
}
59+
}
60+
}()
61+
}
62+
63+
func (mch *mockChain) Halt() {
64+
close(mch.queue)
65+
}

orderer/multichain/manager.go

+174
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package multichain
18+
19+
import (
20+
"sync"
21+
22+
"github.com/hyperledger/fabric/orderer/common/configtx"
23+
"github.com/hyperledger/fabric/orderer/common/policies"
24+
"github.com/hyperledger/fabric/orderer/rawledger"
25+
cb "github.com/hyperledger/fabric/protos/common"
26+
ab "github.com/hyperledger/fabric/protos/orderer"
27+
28+
"github.com/golang/protobuf/proto"
29+
"github.com/op/go-logging"
30+
)
31+
32+
var logger = logging.MustGetLogger("orderer/multichain")
33+
34+
// XXX This crypto helper is a stand in until we have a real crypto handler
35+
// it considers all signatures to be valid
36+
type xxxCryptoHelper struct{}
37+
38+
func (xxx xxxCryptoHelper) VerifySignature(msg []byte, ids []byte, sigs []byte) bool {
39+
return true
40+
}
41+
42+
func init() {
43+
logging.SetLevel(logging.DEBUG, "")
44+
}
45+
46+
// Manager coordinates the creation and access of chains
47+
type Manager interface {
48+
// GetChain retrieves the chain support for a chain (and whether it exists)
49+
GetChain(chainID []byte) (ChainSupport, bool)
50+
}
51+
52+
type multiLedger struct {
53+
chains map[string]*chainSupport
54+
consenters map[string]Consenter
55+
ledgerFactory rawledger.Factory
56+
mutex sync.Mutex
57+
}
58+
59+
// getConfigTx, this should ultimately be done more intelligently, but for now, we search the whole chain for txs and pick the last config one
60+
func getConfigTx(reader rawledger.Reader) *cb.Envelope {
61+
var lastConfigTx *cb.Envelope
62+
63+
it, _ := reader.Iterator(ab.SeekInfo_OLDEST, 0)
64+
// Iterate over the blockchain, looking for config transactions, track the most recent one encountered
65+
// this will be the transaction which is returned
66+
for {
67+
select {
68+
case <-it.ReadyChan():
69+
block, status := it.Next()
70+
if status != cb.Status_SUCCESS {
71+
logger.Fatalf("Error parsing blockchain at startup: %v", status)
72+
}
73+
// ConfigTxs should always be by themselves
74+
if len(block.Data.Data) != 1 {
75+
continue
76+
}
77+
78+
maybeConfigTx := &cb.Envelope{}
79+
80+
err := proto.Unmarshal(block.Data.Data[0], maybeConfigTx)
81+
82+
if err != nil {
83+
logger.Fatalf("Found data which was not an envelope: %s", err)
84+
}
85+
86+
payload := &cb.Payload{}
87+
err = proto.Unmarshal(maybeConfigTx.Payload, payload)
88+
89+
if payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) {
90+
continue
91+
}
92+
93+
logger.Debugf("Found configuration transaction for chain %x at block %d", payload.Header.ChainHeader.ChainID, block.Header.Number)
94+
lastConfigTx = maybeConfigTx
95+
default:
96+
return lastConfigTx
97+
}
98+
}
99+
}
100+
101+
// NewManagerImpl produces an instance of a Manager
102+
func NewManagerImpl(ledgerFactory rawledger.Factory, consenters map[string]Consenter) Manager {
103+
ml := &multiLedger{
104+
chains: make(map[string]*chainSupport),
105+
ledgerFactory: ledgerFactory,
106+
}
107+
108+
existingChains := ledgerFactory.ChainIDs()
109+
for _, chainID := range existingChains {
110+
rl, err := ledgerFactory.GetOrCreate(chainID)
111+
if err != nil {
112+
logger.Fatalf("Ledger factory reported chainID %x but could not retrieve it: %s", chainID, err)
113+
}
114+
configTx := getConfigTx(rl)
115+
if configTx == nil {
116+
logger.Fatalf("Could not find configuration transaction for chain %x", chainID)
117+
}
118+
configManager, policyManager, backingLedger := ml.newResources(configTx)
119+
chainID := configManager.ChainID()
120+
ml.chains[string(chainID)] = newChainSupport(configManager, policyManager, backingLedger, consenters)
121+
}
122+
123+
for _, cs := range ml.chains {
124+
cs.start()
125+
}
126+
127+
return ml
128+
}
129+
130+
// GetChain retrieves the chain support for a chain (and whether it exists)
131+
func (ml *multiLedger) GetChain(chainID []byte) (ChainSupport, bool) {
132+
cs, ok := ml.chains[string(chainID)]
133+
return cs, ok
134+
}
135+
136+
func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, policies.Manager, rawledger.ReadWriter) {
137+
policyManager := policies.NewManagerImpl(xxxCryptoHelper{})
138+
configHandlerMap := make(map[cb.ConfigurationItem_ConfigurationType]configtx.Handler)
139+
for ctype := range cb.ConfigurationItem_ConfigurationType_name {
140+
rtype := cb.ConfigurationItem_ConfigurationType(ctype)
141+
switch rtype {
142+
case cb.ConfigurationItem_Policy:
143+
configHandlerMap[rtype] = policyManager
144+
default:
145+
configHandlerMap[rtype] = configtx.NewBytesHandler()
146+
}
147+
}
148+
149+
payload := &cb.Payload{}
150+
err := proto.Unmarshal(configTx.Payload, payload)
151+
if err != nil {
152+
logger.Fatalf("Error unmarshaling a config transaction payload: %s", err)
153+
}
154+
155+
configEnvelope := &cb.ConfigurationEnvelope{}
156+
err = proto.Unmarshal(payload.Data, configEnvelope)
157+
if err != nil {
158+
logger.Fatalf("Error unmarshaling a config transaction to config envelope: %s", err)
159+
}
160+
161+
configManager, err := configtx.NewConfigurationManager(configEnvelope, policyManager, configHandlerMap)
162+
if err != nil {
163+
logger.Fatalf("Error unpacking configuration transaction: %s", err)
164+
}
165+
166+
chainID := configManager.ChainID()
167+
168+
ledger, err := ml.ledgerFactory.GetOrCreate(chainID)
169+
if err != nil {
170+
logger.Fatalf("Error getting ledger for %x", chainID)
171+
}
172+
173+
return configManager, policyManager, ledger
174+
}

0 commit comments

Comments
 (0)