Skip to content

Commit a552e22

Browse files
author
Jason Yellick
committed
[FAB-2552] Allow concurrent config proposals
https://jira.hyperledger.org/browse/FAB-2552 The original config proposal framework was designed under the idea that only one committable proposal was ever given at a time. However, because of the need to support partial updates, there are additional situations where a commit-like path needs to be performed for the config concurrently with other commit-like paths. This CR adds an internal tx identifier to configuration proposals to allow for the safe concurrent processing of configuration. Change-Id: I9f8cb5eb8a6c426fb930591792781e1159f8e601 Signed-off-by: Jason Yellick <[email protected]>
1 parent bcb9259 commit a552e22

19 files changed

+265
-172
lines changed

common/cauthdsl/policy_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,14 @@ func makePolicySource(policyResult bool) *cb.Policy {
5757
}
5858

5959
func addPolicy(manager policies.Proposer, id string, policy *cb.Policy) {
60-
manager.BeginPolicyProposals(nil)
61-
err := manager.ProposePolicy(id, &cb.ConfigPolicy{
60+
manager.BeginPolicyProposals(id, nil)
61+
err := manager.ProposePolicy(id, id, &cb.ConfigPolicy{
6262
Policy: policy,
6363
})
6464
if err != nil {
6565
panic(err)
6666
}
67-
manager.CommitProposals()
67+
manager.CommitProposals(id)
6868
}
6969

7070
func providerMap() map[int32]policies.Provider {

common/config/api.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -86,19 +86,19 @@ type Orderer interface {
8686

8787
type ValueProposer interface {
8888
// BeginValueProposals called when a config proposal is begun
89-
BeginValueProposals(groups []string) ([]ValueProposer, error)
89+
BeginValueProposals(tx interface{}, groups []string) ([]ValueProposer, error)
9090

9191
// ProposeValue called when config is added to a proposal
92-
ProposeValue(key string, configValue *cb.ConfigValue) error
92+
ProposeValue(tx interface{}, key string, configValue *cb.ConfigValue) error
9393

9494
// RollbackProposals called when a config proposal is abandoned
95-
RollbackProposals()
95+
RollbackProposals(tx interface{})
9696

9797
// PreCommit is invoked before committing the config to catch
9898
// any errors which cannot be caught on a per proposal basis
9999
// TODO, rename other methods to remove Value/Proposal references
100-
PreCommit() error
100+
PreCommit(tx interface{}) error
101101

102102
// CommitProposals called when a config proposal is committed
103-
CommitProposals()
103+
CommitProposals(tx interface{})
104104
}

common/config/application.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func NewApplicationConfig(ag *ApplicationGroup) *ApplicationConfig {
7474
}
7575
}
7676

77-
func (ac *ApplicationConfig) Validate(groups map[string]ValueProposer) error {
77+
func (ac *ApplicationConfig) Validate(tx interface{}, groups map[string]ValueProposer) error {
7878
ac.applicationOrgs = make(map[string]ApplicationOrg)
7979
var ok bool
8080
for key, value := range groups {

common/config/applicationorg.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,9 @@ func NewApplicationOrgConfig(aog *ApplicationOrgGroup) *ApplicationOrgConfig {
8686
return aoc
8787
}
8888

89-
func (aoc *ApplicationOrgConfig) Validate(groups map[string]ValueProposer) error {
89+
func (aoc *ApplicationOrgConfig) Validate(tx interface{}, groups map[string]ValueProposer) error {
9090
if logger.IsEnabledFor(logging.DEBUG) {
9191
logger.Debugf("Anchor peers for org %s are %v", aoc.applicationOrgGroup.name, aoc.protos.AnchorPeers)
9292
}
93-
return aoc.OrganizationConfig.Validate(groups)
93+
return aoc.OrganizationConfig.Validate(tx, groups)
9494
}

common/config/channel.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func (cc *ChannelConfig) OrdererAddresses() []string {
159159

160160
// Validate inspects the generated configuration protos, ensures that the values are correct, and
161161
// sets the ChannelConfig fields that may be referenced after Commit
162-
func (cc *ChannelConfig) Validate(groups map[string]ValueProposer) error {
162+
func (cc *ChannelConfig) Validate(tx interface{}, groups map[string]ValueProposer) error {
163163
for _, validator := range []func() error{
164164
cc.validateHashingAlgorithm,
165165
cc.validateBlockDataHashingStructure,

common/config/msp/config.go

+51-21
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package msp
1919
import (
2020
"fmt"
2121
"reflect"
22+
"sync"
2223

2324
"github.com/hyperledger/fabric/msp"
2425
mspprotos "github.com/hyperledger/fabric/protos/msp"
@@ -36,37 +37,59 @@ type mspConfigStore struct {
3637

3738
// MSPConfigHandler
3839
type MSPConfigHandler struct {
39-
pendingConfig *mspConfigStore
40+
pendingConfig map[interface{}]*mspConfigStore
41+
pendingLock sync.RWMutex
4042
msp.MSPManager
4143
}
4244

45+
func NewMSPConfigHandler() *MSPConfigHandler {
46+
return &MSPConfigHandler{
47+
pendingConfig: make(map[interface{}]*mspConfigStore),
48+
}
49+
}
50+
4351
// BeginConfig called when a config proposal is begun
44-
func (bh *MSPConfigHandler) BeginConfig() {
45-
if bh.pendingConfig != nil {
46-
panic("Programming error, called BeginValueProposals while a proposal was in process")
52+
func (bh *MSPConfigHandler) BeginConfig(tx interface{}) {
53+
bh.pendingLock.Lock()
54+
defer bh.pendingLock.Unlock()
55+
_, ok := bh.pendingConfig[tx]
56+
if ok {
57+
panic("Programming error, called BeginConfig mulitply for the same tx")
4758
}
48-
bh.pendingConfig = &mspConfigStore{
59+
bh.pendingConfig[tx] = &mspConfigStore{
4960
idMap: make(map[string]*pendingMSPConfig),
5061
}
5162
}
5263

5364
// RollbackProposals called when a config proposal is abandoned
54-
func (bh *MSPConfigHandler) RollbackProposals() {
55-
bh.pendingConfig = nil
65+
func (bh *MSPConfigHandler) RollbackProposals(tx interface{}) {
66+
bh.pendingLock.Lock()
67+
defer bh.pendingLock.Unlock()
68+
delete(bh.pendingConfig, tx)
5669
}
5770

5871
// CommitProposals called when a config proposal is committed
59-
func (bh *MSPConfigHandler) CommitProposals() {
60-
if bh.pendingConfig == nil {
61-
panic("Programming error, called CommitProposals with no proposal in process")
72+
func (bh *MSPConfigHandler) CommitProposals(tx interface{}) {
73+
bh.pendingLock.Lock()
74+
defer bh.pendingLock.Unlock()
75+
pendingConfig, ok := bh.pendingConfig[tx]
76+
if !ok {
77+
panic("Programming error, called BeginConfig mulitply for the same tx")
6278
}
6379

64-
bh.MSPManager = bh.pendingConfig.proposedMgr
65-
bh.pendingConfig = nil
80+
bh.MSPManager = pendingConfig.proposedMgr
81+
delete(bh.pendingConfig, tx)
6682
}
6783

6884
// ProposeValue called when config is added to a proposal
69-
func (bh *MSPConfigHandler) ProposeMSP(mspConfig *mspprotos.MSPConfig) (msp.MSP, error) {
85+
func (bh *MSPConfigHandler) ProposeMSP(tx interface{}, mspConfig *mspprotos.MSPConfig) (msp.MSP, error) {
86+
bh.pendingLock.RLock()
87+
pendingConfig, ok := bh.pendingConfig[tx]
88+
bh.pendingLock.RUnlock()
89+
if !ok {
90+
panic("Programming error, called BeginConfig mulitply for the same tx")
91+
}
92+
7093
// check that the type for that MSP is supported
7194
if mspConfig.Type != int32(msp.FABRIC) {
7295
return nil, fmt.Errorf("Setup error: unsupported msp type %d", mspConfig.Type)
@@ -90,12 +113,12 @@ func (bh *MSPConfigHandler) ProposeMSP(mspConfig *mspprotos.MSPConfig) (msp.MSP,
90113
return nil, fmt.Errorf("Could not extract msp identifier, err %s", err)
91114
}
92115

93-
existingPendingMSPConfig, ok := bh.pendingConfig.idMap[mspID]
116+
existingPendingMSPConfig, ok := pendingConfig.idMap[mspID]
94117
if ok && !reflect.DeepEqual(existingPendingMSPConfig.mspConfig, mspConfig) {
95118
return nil, fmt.Errorf("Attempted to define two different versions of MSP: %s", mspID)
96119
}
97120

98-
bh.pendingConfig.idMap[mspID] = &pendingMSPConfig{
121+
pendingConfig.idMap[mspID] = &pendingMSPConfig{
99122
mspConfig: mspConfig,
100123
msp: mspInst,
101124
}
@@ -104,20 +127,27 @@ func (bh *MSPConfigHandler) ProposeMSP(mspConfig *mspprotos.MSPConfig) (msp.MSP,
104127
}
105128

106129
// PreCommit instantiates the MSP manager
107-
func (bh *MSPConfigHandler) PreCommit() error {
108-
if len(bh.pendingConfig.idMap) == 0 {
130+
func (bh *MSPConfigHandler) PreCommit(tx interface{}) error {
131+
bh.pendingLock.RLock()
132+
pendingConfig, ok := bh.pendingConfig[tx]
133+
bh.pendingLock.RUnlock()
134+
if !ok {
135+
panic("Programming error, called PreCommit for tx which was not started")
136+
}
137+
138+
if len(pendingConfig.idMap) == 0 {
109139
// Cannot instantiate an MSP manager with no MSPs
110140
return nil
111141
}
112142

113-
mspList := make([]msp.MSP, len(bh.pendingConfig.idMap))
143+
mspList := make([]msp.MSP, len(pendingConfig.idMap))
114144
i := 0
115-
for _, pendingMSP := range bh.pendingConfig.idMap {
145+
for _, pendingMSP := range pendingConfig.idMap {
116146
mspList[i] = pendingMSP.msp
117147
i++
118148
}
119149

120-
bh.pendingConfig.proposedMgr = msp.NewMSPManager()
121-
err := bh.pendingConfig.proposedMgr.Setup(mspList)
150+
pendingConfig.proposedMgr = msp.NewMSPManager()
151+
err := pendingConfig.proposedMgr.Setup(mspList)
122152
return err
123153
}

common/config/msp/config_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ func TestMSPConfigManager(t *testing.T) {
3131
// test success:
3232

3333
// begin/propose/commit
34-
mspCH := &MSPConfigHandler{}
35-
mspCH.BeginConfig()
36-
_, err = mspCH.ProposeMSP(conf)
34+
mspCH := NewMSPConfigHandler()
35+
mspCH.BeginConfig(t)
36+
_, err = mspCH.ProposeMSP(t, conf)
3737
assert.NoError(t, err)
38-
mspCH.PreCommit()
39-
mspCH.CommitProposals()
38+
mspCH.PreCommit(t)
39+
mspCH.CommitProposals(t)
4040

4141
msps, err := mspCH.GetMSPs()
4242
assert.NoError(t, err)
@@ -47,9 +47,9 @@ func TestMSPConfigManager(t *testing.T) {
4747

4848
// test failure
4949
// begin/propose/commit
50-
mspCH.BeginConfig()
51-
_, err = mspCH.ProposeMSP(conf)
50+
mspCH.BeginConfig(t)
51+
_, err = mspCH.ProposeMSP(t, conf)
5252
assert.NoError(t, err)
53-
_, err = mspCH.ProposeMSP(&mspprotos.MSPConfig{Config: []byte("BARF!")})
53+
_, err = mspCH.ProposeMSP(t, &mspprotos.MSPConfig{Config: []byte("BARF!")})
5454
assert.Error(t, err)
5555
}

common/config/orderer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func (oc *OrdererConfig) KafkaBrokers() []string {
142142
return oc.protos.KafkaBrokers.Brokers
143143
}
144144

145-
func (oc *OrdererConfig) Validate(groups map[string]ValueProposer) error {
145+
func (oc *OrdererConfig) Validate(tx interface{}, groups map[string]ValueProposer) error {
146146
for _, validator := range []func() error{
147147
oc.validateConsensusType,
148148
oc.validateBatchSize,

common/config/organization.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -98,19 +98,19 @@ func NewOrganizationConfig(og *OrganizationGroup) *OrganizationConfig {
9898
}
9999

100100
// Validate returns whether the configuration is valid
101-
func (oc *OrganizationConfig) Validate(groups map[string]ValueProposer) error {
102-
return oc.validateMSP()
101+
func (oc *OrganizationConfig) Validate(tx interface{}, groups map[string]ValueProposer) error {
102+
return oc.validateMSP(tx)
103103
}
104104

105105
func (oc *OrganizationConfig) Commit() {
106106
oc.organizationGroup.OrganizationConfig = oc
107107
}
108108

109-
func (oc *OrganizationConfig) validateMSP() error {
109+
func (oc *OrganizationConfig) validateMSP(tx interface{}) error {
110110
var err error
111111

112112
logger.Debugf("Setting up MSP for org %s", oc.organizationGroup.name)
113-
oc.msp, err = oc.organizationGroup.mspConfigHandler.ProposeMSP(oc.protos.MSP)
113+
oc.msp, err = oc.organizationGroup.mspConfigHandler.ProposeMSP(tx, oc.protos.MSP)
114114
if err != nil {
115115
return err
116116
}

0 commit comments

Comments
 (0)