Skip to content

Commit 282ed86

Browse files
committed
Add AnchorPeers to ConfigurationBlock
This commit: 1) Adds anchor peers upon channel creation 2) Extracts them at the peer side in the gossip layer and uses them. 3) Changes an internal API of a method in gossip from GetTimestamp to SequenceNumber, because using sequence numbers is better than using timestamps, that are machine-specific in contrast to sequence numbers which are global. Signed-off-by: Yacov Manevich <[email protected]> Change-Id: Id080585a56a5083d9cd4911ce790e5be389cfa52
1 parent 076923e commit 282ed86

File tree

12 files changed

+380
-45
lines changed

12 files changed

+380
-45
lines changed

common/configtx/test/helper.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ func init() {
6262
}
6363

6464
template = configtx.NewSimpleTemplate(templateProto.Items...)
65-
genesisFactory = genesis.NewFactoryImpl(configtx.NewCompositeTemplate(MSPTemplate{}, template))
65+
gossTemplate := configtx.NewSimpleTemplate(utils.EncodeAnchorPeers())
66+
genesisFactory = genesis.NewFactoryImpl(configtx.NewCompositeTemplate(MSPTemplate{}, template, gossTemplate))
6667
}
6768

6869
func MakeGenesisBlock(chainID string) (*cb.Block, error) {

core/peer/peer.go

+3
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
152152
return err
153153
}
154154

155+
// TODO This should be called from a configtx.Manager but it's not
156+
// implemented yet. When it will be, this needs to move there,
157+
// and the inner fields (AnchorPeers) only should be passed to this.
155158
if err := service.GetGossipService().JoinChannel(c, cb); err != nil {
156159
return err
157160
}

gossip/api/channel.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ limitations under the License.
1717
package api
1818

1919
import (
20-
"time"
21-
2220
"github.com/hyperledger/fabric/gossip/common"
2321
)
2422

@@ -47,8 +45,9 @@ type ChannelNotifier interface {
4745
// among the peers
4846
type JoinChannelMessage interface {
4947

50-
// GetTimestamp returns the timestamp of the message's creation
51-
GetTimestamp() time.Time
48+
// SequenceNumber returns the sequence number of the configuration block
49+
// the JoinChannelMessage originated from
50+
SequenceNumber() uint64
5251

5352
// AnchorPeers returns all the anchor peers that are in the channel
5453
AnchorPeers() []AnchorPeer

gossip/gossip/channel/channel.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,8 @@ func (gc *gossipChannel) ConfigureChannel(joinMsg api.JoinChannelMessage) {
329329
gc.joinMsg = joinMsg
330330
}
331331

332-
if gc.joinMsg.GetTimestamp().After(joinMsg.GetTimestamp()) {
333-
gc.logger.Warning("Already have a more updated JoinChannel message(", gc.joinMsg.GetTimestamp(), ") than", gc.joinMsg.GetTimestamp())
332+
if gc.joinMsg.SequenceNumber() > (joinMsg.SequenceNumber()) {
333+
gc.logger.Warning("Already have a more updated JoinChannel message(", gc.joinMsg.SequenceNumber(), ") than", gc.joinMsg.SequenceNumber())
334334
return
335335
}
336336
orgs := []api.OrgIdentityType{}

gossip/gossip/channel/channel_test.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,14 @@ type joinChanMsg struct {
6868
anchorPeers func() []api.AnchorPeer
6969
}
7070

71-
// GetTimestamp returns the timestamp of the message's creation
72-
func (jcm *joinChanMsg) GetTimestamp() time.Time {
71+
// SequenceNumber returns the sequence number of the block
72+
// this joinChanMsg was derived from.
73+
// I use timestamps here just for the test.
74+
func (jcm *joinChanMsg) SequenceNumber() uint64 {
7375
if jcm.getTS != nil {
74-
return jcm.getTS()
76+
return uint64(jcm.getTS().UnixNano())
7577
}
76-
return time.Now()
78+
return uint64(time.Now().UnixNano())
7779
}
7880

7981
// AnchorPeers returns all the anchor peers that are in the channel

gossip/gossip/gossip_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,10 @@ func acceptData(m interface{}) bool {
6565
type joinChanMsg struct {
6666
}
6767

68-
// GetTimestamp returns the timestamp of the message's creation
69-
func (*joinChanMsg) GetTimestamp() time.Time {
70-
return time.Now()
68+
// SequenceNumber returns the sequence number of the block this joinChanMsg
69+
// is derived from
70+
func (*joinChanMsg) SequenceNumber() uint64 {
71+
return uint64(time.Now().UnixNano())
7172
}
7273

7374
// AnchorPeers returns all the anchor peers that are in the channel

gossip/service/channel.go

+147
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
Copyright IBM Corp. 2017 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package service
18+
19+
import (
20+
"fmt"
21+
22+
"github.com/hyperledger/fabric/gossip/api"
23+
"github.com/hyperledger/fabric/protos/common"
24+
"github.com/hyperledger/fabric/protos/peer"
25+
"github.com/golang/protobuf/proto"
26+
"github.com/hyperledger/fabric/protos/utils"
27+
)
28+
29+
// unMarshal is used to un-marshall proto-buffer types.
30+
// In tests, I override this variable with a function
31+
var unMarshal func (buf []byte, pb proto.Message) error
32+
33+
func init() {
34+
unMarshal = proto.Unmarshal
35+
}
36+
37+
// This is an implementation of api.JoinChannelMessage.
38+
// This object is created from a *common.Block
39+
type joinChannelMessage struct {
40+
seqNum uint64
41+
anchorPeers []api.AnchorPeer
42+
}
43+
44+
// JoinChannelMessageFromBlock returns an api.JoinChannelMessage, nil
45+
// or nil, error if the block is not a valid channel configuration block
46+
func JoinChannelMessageFromBlock(block *common.Block) (api.JoinChannelMessage, error) {
47+
anchorPeers, err := parseBlock(block)
48+
if err != nil {
49+
return nil, err
50+
}
51+
jcm := &joinChannelMessage{seqNum: block.Header.Number, anchorPeers: []api.AnchorPeer{}}
52+
for _, ap := range anchorPeers.AnchorPees {
53+
anchorPeer := api.AnchorPeer{
54+
Host: ap.Host,
55+
Port: int(ap.Port),
56+
Cert: api.PeerIdentityType(ap.Cert),
57+
}
58+
jcm.anchorPeers = append(jcm.anchorPeers, anchorPeer)
59+
}
60+
return jcm, nil
61+
}
62+
63+
// parseBlock parses a configuration block, and returns error
64+
// if it's not a valid channel configuration block
65+
func parseBlock(block *common.Block) (*peer.AnchorPeers, error) {
66+
confEnvelope, err := extractConfigurationEnvelope(block)
67+
if err != nil {
68+
return nil, err
69+
}
70+
// Find anchor peer configuration
71+
found := false
72+
var anchorPeerConfig *common.ConfigurationItem
73+
for _, item := range confEnvelope.Items {
74+
rawConfItem := item.ConfigurationItem
75+
confItem := &common.ConfigurationItem{}
76+
if err := unMarshal(rawConfItem, confItem); err != nil {
77+
return nil, fmt.Errorf("Failed unmarshalling configuration item")
78+
}
79+
if confItem.Header.Type != int32(common.HeaderType_CONFIGURATION_ITEM) {
80+
continue
81+
}
82+
if confItem.Type != common.ConfigurationItem_Peer {
83+
continue
84+
}
85+
if confItem.Key != utils.AnchorPeerConfItemKey {
86+
continue
87+
}
88+
if found {
89+
return nil, fmt.Errorf("Found multiple definition of AnchorPeers instead of a single one")
90+
}
91+
found = true
92+
anchorPeerConfig = confItem
93+
}
94+
if ! found {
95+
return nil, fmt.Errorf("Didn't find AnchorPeer definition in configuration block")
96+
}
97+
rawAnchorPeersBytes := anchorPeerConfig.Value
98+
anchorPeers := &peer.AnchorPeers{}
99+
if err := unMarshal(rawAnchorPeersBytes, anchorPeers); err != nil {
100+
return nil, fmt.Errorf("Failed deserializing anchor peers from configuration item")
101+
}
102+
if len(anchorPeers.AnchorPees) == 0 {
103+
return nil, fmt.Errorf("AnchorPeers field in configuration block was found, but is empty")
104+
}
105+
return anchorPeers, nil
106+
}
107+
108+
// extractConfigurationEnvelope extracts the configuration envelope from a block,
109+
// or returns nil and an error if extraction fails
110+
func extractConfigurationEnvelope(block *common.Block) (*common.ConfigurationEnvelope, error) {
111+
if block.Header == nil {
112+
return nil, fmt.Errorf("Block header is empty")
113+
}
114+
if block.Data == nil {
115+
return nil, fmt.Errorf("Block data is empty")
116+
}
117+
env := &common.Envelope{}
118+
if len(block.Data.Data) == 0 {
119+
return nil, fmt.Errorf("Empty data in block")
120+
}
121+
if len(block.Data.Data) != 1 {
122+
return nil, fmt.Errorf("More than 1 transaction in a block")
123+
}
124+
if err := unMarshal(block.Data.Data[0], env); err != nil {
125+
return nil, fmt.Errorf("Failed unmarshalling envelope from block: %v", err)
126+
}
127+
payload := &common.Payload{}
128+
if err := unMarshal(env.Payload, payload); err != nil {
129+
return nil, fmt.Errorf("Failed unmarshalling payload from envelope: %v", err)
130+
}
131+
confEnvelope := &common.ConfigurationEnvelope{}
132+
if err := unMarshal(payload.Data, confEnvelope); err != nil {
133+
return nil, fmt.Errorf("Failed unmarshalling configuration envelope from payload: %v", err)
134+
}
135+
if len(confEnvelope.Items) == 0 {
136+
return nil, fmt.Errorf("Empty configuration envelope, no items detected")
137+
}
138+
return confEnvelope, nil
139+
}
140+
141+
func (jcm *joinChannelMessage) SequenceNumber() uint64 {
142+
return jcm.seqNum
143+
}
144+
145+
func (jcm *joinChannelMessage) AnchorPeers() []api.AnchorPeer {
146+
return jcm.anchorPeers
147+
}

0 commit comments

Comments
 (0)