Skip to content

Commit 742443e

Browse files
committed
FAB-872 Multichannel support: message extension
https://jira.hyperledger.org/browse/FAB-872 This commit extends the gossip message to introduce 2 fields: 1) Channel - obviously, since we'll do per-channel routing 2) Tag - This is for disseminating information inside an organization, and for specifying per-channel routing is enabled This commit also refactors the code related to the gossip internal message store comparison. Last but not least- since state transfer is going to be per channel, the commit introduces a new message type - StateInfo that'll be used for the state transfer mechanism but per-channel and not globally. Change-Id: Ibba0b5494ef4ef35420e2c6b09c058f17e56db0c Signed-off-by: Yacov Manevich <[email protected]>
1 parent f4dcb08 commit 742443e

13 files changed

+576
-320
lines changed

gossip/comm/comm_impl.go

+1
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,7 @@ func readWithTimeout(stream interface{}, timeout time.Duration) *proto.GossipMes
488488

489489
func createConnectionMsg(pkiID common.PKIidType, sig []byte) *proto.GossipMessage {
490490
return &proto.GossipMessage{
491+
Tag: proto.GossipMessage_EMPTY,
491492
Nonce: 0,
492493
Content: &proto.GossipMessage_Conn{
493494
Conn: &proto.ConnEstablish{

gossip/comm/comm_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ import (
2626

2727
"crypto/tls"
2828

29+
"github.com/hyperledger/fabric/gossip/common"
2930
"github.com/hyperledger/fabric/gossip/proto"
3031
"github.com/stretchr/testify/assert"
3132
"golang.org/x/net/context"
3233
"google.golang.org/grpc"
3334
"google.golang.org/grpc/credentials"
34-
"github.com/hyperledger/fabric/gossip/common"
3535
)
3636

3737
func init() {
@@ -465,6 +465,7 @@ func TestPresumedDead(t *testing.T) {
465465

466466
func createGossipMsg() *proto.GossipMessage {
467467
return &proto.GossipMessage{
468+
Tag: proto.GossipMessage_EMPTY,
468469
Nonce: uint64(rand.Int()),
469470
Content: &proto.GossipMessage_DataMsg{
470471
DataMsg: &proto.DataMessage{},

gossip/comm/conn.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ import (
2121
"sync"
2222
"sync/atomic"
2323

24+
"github.com/hyperledger/fabric/gossip/common"
2425
"github.com/hyperledger/fabric/gossip/proto"
2526
"github.com/hyperledger/fabric/gossip/util"
2627
"google.golang.org/grpc"
27-
"github.com/hyperledger/fabric/gossip/common"
2828
)
2929

3030
type handler func(*proto.GossipMessage)
@@ -35,7 +35,7 @@ type connFactory interface {
3535

3636
type connectionStore struct {
3737
logger *util.Logger // logger
38-
selfPKIid common.PKIidType // pkiID of this peer
38+
selfPKIid common.PKIidType // pkiID of this peer
3939
isClosing bool // whether this connection store is shutting down
4040
connFactory connFactory // creates a connection to remote peer
4141
sync.RWMutex // synchronize access to shared variables
@@ -200,7 +200,7 @@ func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_Go
200200
type connection struct {
201201
outBuff chan *msgSending
202202
logger *util.Logger // logger
203-
pkiID common.PKIidType // pkiID of the remote endpoint
203+
pkiID common.PKIidType // pkiID of the remote endpoint
204204
handler handler // function to invoke upon a message reception
205205
conn *grpc.ClientConn // gRPC connection to remote endpoint
206206
cl proto.GossipClient // gRPC stub of remote endpoint

gossip/common/common.go

+19
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,22 @@ type Payload struct {
3535

3636
// ChainID defines the identity representation of a chain
3737
type ChainID []byte
38+
39+
// MessageReplacingPolicy Returns:
40+
// MESSAGE_INVALIDATES if this message invalidates that
41+
// MESSAGE_INVALIDATED if this message is invalidated by that
42+
// MESSAGE_NO_ACTION otherwise
43+
type MessageReplacingPolicy func(this interface{}, that interface{}) InvalidationResult
44+
45+
// InvalidationResult determines how a message affects another message
46+
// when it is put into gossip message store
47+
type InvalidationResult int
48+
49+
const (
50+
// MessageNoAction means messages have no relation
51+
MessageNoAction = iota
52+
// MessageInvalidates means message invalidates the other message
53+
MessageInvalidates
54+
// MessageInvalidated means message is invalidated by the other message
55+
MessageInvalidated
56+
)

gossip/discovery/discovery_impl.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ import (
2323
"sync/atomic"
2424
"time"
2525

26+
"github.com/hyperledger/fabric/gossip/common"
2627
"github.com/hyperledger/fabric/gossip/proto"
2728
"github.com/hyperledger/fabric/gossip/util"
2829
"github.com/op/go-logging"
29-
"github.com/hyperledger/fabric/gossip/common"
3030
)
3131

3232
const defaultHelloInterval = time.Duration(5) * time.Second
@@ -285,6 +285,7 @@ func (d *gossipDiscoveryImpl) sendMemResponse(member *proto.Member, known [][]by
285285
Metadata: member.Metadata,
286286
PKIid: member.PkiID,
287287
}, &proto.GossipMessage{
288+
Tag: proto.GossipMessage_EMPTY,
288289
Nonce: uint64(0),
289290
Content: &proto.GossipMessage_MemRes{
290291
MemRes: memResp,
@@ -455,6 +456,7 @@ func (d *gossipDiscoveryImpl) createMembershipRequest() *proto.GossipMessage {
455456
Known: d.getKnownPeers(),
456457
}
457458
return &proto.GossipMessage{
459+
Tag: proto.GossipMessage_EMPTY,
458460
Nonce: uint64(0),
459461
Content: &proto.GossipMessage_MemReq{
460462
MemReq: req,
@@ -559,6 +561,7 @@ func (d *gossipDiscoveryImpl) periodicalSendAlive() {
559561
d.logger.Debug("Sleeping", aliveTimeInterval)
560562
time.Sleep(aliveTimeInterval)
561563
msg2Gossip := &proto.GossipMessage{
564+
Tag: proto.GossipMessage_EMPTY,
562565
Content: &proto.GossipMessage_AliveMsg{AliveMsg: d.createAliveMessage()},
563566
}
564567
d.comm.Gossip(msg2Gossip)

gossip/discovery/discovery_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ import (
2525
"testing"
2626
"time"
2727

28+
"github.com/hyperledger/fabric/gossip/common"
2829
"github.com/hyperledger/fabric/gossip/proto"
2930
"github.com/op/go-logging"
3031
"github.com/stretchr/testify/assert"
3132
"golang.org/x/net/context"
3233
"google.golang.org/grpc"
33-
"github.com/hyperledger/fabric/gossip/common"
3434
)
3535

3636
var timeout = time.Second * time.Duration(15)

gossip/gossip/gossip_impl.go

+14-60
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ import (
3131
"github.com/op/go-logging"
3232
)
3333

34+
const (
35+
presumedDeadChanSize = 100
36+
acceptChanSize = 100
37+
)
38+
3439
type gossipServiceImpl struct {
3540
presumedDead chan common.PKIidType
3641
disc discovery.Discovery
@@ -51,7 +56,7 @@ type gossipServiceImpl struct {
5156
// NewGossipService creates a new gossip instance
5257
func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService) Gossip {
5358
g := &gossipServiceImpl{
54-
presumedDead: make(chan common.PKIidType, 100),
59+
presumedDead: make(chan common.PKIidType, presumedDeadChanSize),
5560
disc: nil,
5661
comm: c,
5762
conf: conf,
@@ -75,7 +80,7 @@ func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService)
7580

7681
g.pushPull = algo.NewPullEngine(g, conf.PullInterval)
7782

78-
g.msgStore = newMessageStore(g.invalidationPolicy, func(m interface{}) {
83+
g.msgStore = newMessageStore(proto.NewGossipMessageComparator(g.conf.MaxMessageCountToStore), func(m interface{}) {
7984
if dataMsg, isDataMsg := m.(*proto.DataMessage); isDataMsg {
8085
g.pushPull.Remove(dataMsg.Payload.SeqNum)
8186
}
@@ -92,61 +97,6 @@ func (g *gossipServiceImpl) toDie() bool {
9297
return atomic.LoadInt32(&g.stopFlag) == int32(1)
9398
}
9499

95-
func (g *gossipServiceImpl) invalidationPolicy(this interface{}, that interface{}) invalidationResult {
96-
thisMsg := this.(*proto.GossipMessage)
97-
thatMsg := that.(*proto.GossipMessage)
98-
thisAliveMsg, thisIsAliveMessage := thisMsg.GetAliveMsg(), thisMsg.GetAliveMsg() != nil
99-
thatAliveMsg, thatIsAliveMessage := thatMsg.GetAliveMsg(), thatMsg.GetAliveMsg() != nil
100-
101-
if thisIsAliveMessage && thatIsAliveMessage {
102-
return aliveInvalidationPolicy(thisAliveMsg, thatAliveMsg)
103-
}
104-
105-
thisDataMsg, thisIsDataMessage := thisMsg.GetDataMsg(), thisMsg.GetDataMsg() != nil
106-
thatDataMsg, thatIsDataMessage := thatMsg.GetDataMsg(), thatMsg.GetDataMsg() != nil
107-
108-
if thisIsDataMessage && thatIsDataMessage {
109-
if thisDataMsg.Payload.SeqNum == thatDataMsg.Payload.SeqNum {
110-
if thisDataMsg.Payload.Hash == thatDataMsg.Payload.Hash {
111-
return messageInvalidated
112-
}
113-
return messageNoAction
114-
}
115-
116-
diff := util.Abs(thisDataMsg.Payload.SeqNum, thatDataMsg.Payload.SeqNum)
117-
if diff <= uint64(g.conf.MaxMessageCountToStore) {
118-
return messageNoAction
119-
}
120-
121-
if thisDataMsg.Payload.SeqNum > thatDataMsg.Payload.SeqNum {
122-
return messageInvalidates
123-
}
124-
return messageInvalidated
125-
}
126-
return messageNoAction
127-
}
128-
129-
func aliveInvalidationPolicy(thisMsg *proto.AliveMessage, thatMsg *proto.AliveMessage) invalidationResult {
130-
if !equalPKIIds(thisMsg.Membership.PkiID, thatMsg.Membership.PkiID) {
131-
return messageNoAction
132-
}
133-
134-
if thisMsg.Timestamp.IncNumber == thatMsg.Timestamp.IncNumber {
135-
if thisMsg.Timestamp.SeqNum > thatMsg.Timestamp.SeqNum {
136-
return messageInvalidates
137-
}
138-
139-
if thisMsg.Timestamp.SeqNum < thatMsg.Timestamp.SeqNum {
140-
return messageInvalidated
141-
}
142-
return messageInvalidated
143-
}
144-
if thisMsg.Timestamp.IncNumber < thatMsg.Timestamp.IncNumber {
145-
return messageInvalidated
146-
}
147-
return messageInvalidates
148-
}
149-
150100
func (g *gossipServiceImpl) handlePresumedDead() {
151101
defer g.logger.Debug("Exiting")
152102
g.stopSignal.Add(1)
@@ -183,11 +133,10 @@ func (g *gossipServiceImpl) start() {
183133
return false
184134
}
185135

186-
isAck := gMsg.GetGossipMessage().GetAckMsg() != nil
187136
isConn := gMsg.GetGossipMessage().GetConn() != nil
188137
isEmpty := gMsg.GetGossipMessage().GetEmpty() != nil
189138

190-
return !(isAck || isConn || isEmpty)
139+
return !(isConn || isEmpty)
191140
}
192141

193142
incMsgs := g.comm.Accept(msgSelector)
@@ -224,6 +173,7 @@ func (g *gossipServiceImpl) SelectPeers() []string {
224173

225174
func (g *gossipServiceImpl) Hello(dest string, nonce uint64) {
226175
helloMsg := &proto.GossipMessage{
176+
Tag: proto.GossipMessage_EMPTY,
227177
Nonce: 0,
228178
Content: &proto.GossipMessage_Hello{
229179
Hello: &proto.GossipHello{
@@ -239,6 +189,7 @@ func (g *gossipServiceImpl) Hello(dest string, nonce uint64) {
239189

240190
func (g *gossipServiceImpl) SendDigest(digest []uint64, nonce uint64, context interface{}) {
241191
digMsg := &proto.GossipMessage{
192+
Tag: proto.GossipMessage_EMPTY,
242193
Nonce: 0,
243194
Content: &proto.GossipMessage_DataDig{
244195
DataDig: &proto.DataDigest{
@@ -253,6 +204,7 @@ func (g *gossipServiceImpl) SendDigest(digest []uint64, nonce uint64, context in
253204

254205
func (g *gossipServiceImpl) SendReq(dest string, items []uint64, nonce uint64) {
255206
req := &proto.GossipMessage{
207+
Tag: proto.GossipMessage_EMPTY,
256208
Nonce: 0,
257209
Content: &proto.GossipMessage_DataReq{
258210
DataReq: &proto.DataRequest{
@@ -282,6 +234,7 @@ func (g *gossipServiceImpl) SendRes(requestedItems []uint64, context interface{}
282234
}
283235

284236
returnedUpdate := &proto.GossipMessage{
237+
Tag: proto.GossipMessage_EMPTY,
285238
Nonce: 0,
286239
Content: &proto.GossipMessage_DataUpdate{
287240
DataUpdate: &proto.DataUpdate{
@@ -355,6 +308,7 @@ func (g *gossipServiceImpl) handlePushPullMsg(msg comm.ReceivedMessage) {
355308
items := make([]uint64, len(res.Data))
356309
for i, data := range res.Data {
357310
dataMsg := &proto.GossipMessage{
311+
Tag: proto.GossipMessage_EMPTY,
358312
Content: &proto.GossipMessage_DataMsg{
359313
DataMsg: data,
360314
},
@@ -444,7 +398,7 @@ func (g *gossipServiceImpl) UpdateMetadata(md []byte) {
444398

445399
func (g *gossipServiceImpl) Accept(acceptor common.MessageAcceptor) <-chan *proto.GossipMessage {
446400
inCh := g.AddChannel(acceptor)
447-
outCh := make(chan *proto.GossipMessage, 100)
401+
outCh := make(chan *proto.GossipMessage, acceptChanSize)
448402
go func() {
449403
for {
450404
select {

0 commit comments

Comments
 (0)