Skip to content

Commit da5effe

Browse files
committed
FAB-1292 Gossip pull refactoring: Mediator
This commit introduces: 1) A pull.Mediator, an object that offloads handling of messages and implementation of algo.PullAdapter from the rest of the gossip code, so that it can be used for any type of message we want to sync using the pull mechanism. 2) A certStore object, that holds certificates that are going to be replicated using the pull mechanism. 3) An addition of an identity message which is a certificate that's replicated among peers. Change-Id: Ia7682c3c9874ee40fa3588706239f914f1e334fb Signed-off-by: Yacov Manevich <[email protected]>
1 parent 82d6870 commit da5effe

13 files changed

+1141
-268
lines changed

gossip/comm/comm_impl.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ func (c *commImpl) isStopping() bool {
254254

255255
func (c *commImpl) Probe(peer *RemotePeer) error {
256256
if c.isStopping() {
257-
return fmt.Errorf("Stopping!")
257+
return fmt.Errorf("Stopping")
258258
}
259259
c.logger.Debug("Entering, endpoint:", peer.Endpoint, "PKIID:", peer.PKIID)
260260
var err error

gossip/comm/comm_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func TestBasic(t *testing.T) {
173173
m2 := comm2.Accept(acceptAll)
174174
out := make(chan uint64, 2)
175175
reader := func(ch <-chan ReceivedMessage) {
176-
m := <- ch
176+
m := <-ch
177177
out <- m.GetGossipMessage().Nonce
178178
}
179179
go reader(m1)

gossip/gossip/certstore.go

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package gossip
18+
19+
import (
20+
"sync"
21+
22+
prot "github.com/golang/protobuf/proto"
23+
"github.com/hyperledger/fabric/gossip/api"
24+
"github.com/hyperledger/fabric/gossip/comm"
25+
"github.com/hyperledger/fabric/gossip/common"
26+
"github.com/hyperledger/fabric/gossip/gossip/pull"
27+
"github.com/hyperledger/fabric/gossip/identity"
28+
"github.com/hyperledger/fabric/gossip/proto"
29+
"github.com/hyperledger/fabric/gossip/util"
30+
)
31+
32+
// certStore supports pull dissemination of identity messages
33+
type certStore struct {
34+
sync.RWMutex
35+
selfIdentity api.PeerIdentityType
36+
idMapper identity.Mapper
37+
pull pull.Mediator
38+
logger *util.Logger
39+
}
40+
41+
func newCertStore(mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType, pullMed pull.Mediator) *certStore {
42+
certStore := &certStore{
43+
idMapper: identity.NewIdentityMapper(mcs),
44+
selfIdentity: selfIdentity,
45+
pull: pullMed,
46+
}
47+
48+
selfPKIID := certStore.idMapper.GetPKIidOfCert(selfIdentity)
49+
50+
certStore.logger = util.GetLogger("certStore", string(selfPKIID))
51+
52+
if err := certStore.idMapper.Put(selfPKIID, selfIdentity); err != nil {
53+
certStore.logger.Panic("Failed associating self PKIID to cert:", err)
54+
}
55+
56+
pullMed.Add(certStore.createIdentityMessage())
57+
58+
pullMed.RegisterMsgHook(pull.ResponseMsgType, func(_ []string, msgs []*proto.GossipMessage, _ comm.ReceivedMessage) {
59+
for _, msg := range msgs {
60+
pkiID := common.PKIidType(msg.GetPeerIdentity().PkiID)
61+
cert := api.PeerIdentityType(msg.GetPeerIdentity().Cert)
62+
if err := certStore.idMapper.Put(pkiID, cert); err != nil {
63+
certStore.logger.Warning("Failed adding identity", cert, ", reason:", err)
64+
}
65+
}
66+
})
67+
68+
return certStore
69+
}
70+
71+
func (cs *certStore) createIdentityMessage() *proto.GossipMessage {
72+
identity := &proto.PeerIdentity{
73+
Cert: cs.selfIdentity,
74+
Metadata: nil,
75+
PkiID: cs.idMapper.GetPKIidOfCert(cs.selfIdentity),
76+
Sig: nil,
77+
}
78+
79+
b, err := prot.Marshal(identity)
80+
if err != nil {
81+
cs.logger.Warning("Failed marshalling identity message:", err)
82+
return nil
83+
}
84+
85+
sig, err := cs.idMapper.Sign(b)
86+
if err != nil {
87+
cs.logger.Warning("Failed signing identity message:", err)
88+
return nil
89+
}
90+
identity.Sig = sig
91+
92+
return &proto.GossipMessage{
93+
Channel: nil,
94+
Nonce: 0,
95+
Tag: proto.GossipMessage_EMPTY,
96+
Content: &proto.GossipMessage_PeerIdentity{
97+
PeerIdentity: identity,
98+
},
99+
}
100+
}
101+
102+
func (cs *certStore) stop() {
103+
cs.pull.Stop()
104+
}

0 commit comments

Comments
 (0)