Skip to content

Commit 8a40a51

Browse files
committed
Pull algorithm module for Gossip component
The gossip protocol disseminates messages via push and pull. This commit contains the module that runs the pull protocol which is a separate, timer-based protocol. The patch also contains some gofmt changes Change-Id: I39797631756d76ccd90951af6ac812c1e956fa6b Signed-off-by: Yacov Manevich <[email protected]>
1 parent eefbf7c commit 8a40a51

File tree

5 files changed

+826
-14
lines changed

5 files changed

+826
-14
lines changed

gossip/discovery/discovery_impl.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ func NewDiscoveryService(bootstrapPeers []*NetworkMember, self NetworkMember, co
113113
go d.periodicalReconnectToDead()
114114
go d.handlePresumedDeadPeers()
115115

116-
117116
return d
118117
}
119118

@@ -132,18 +131,17 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {
132131
k = n
133132
}
134133

135-
for _, i := range util.GetRandomIndices(k, n - 1) {
134+
for _, i := range util.GetRandomIndices(k, n-1) {
136135
pulledPeer := d.cachedMembership.Alive[i].Membership
137136
netMember := &NetworkMember{
138-
Id: pulledPeer.Id,
137+
Id: pulledPeer.Id,
139138
Endpoint: pulledPeer.Endpoint,
140139
Metadata: pulledPeer.Metadata,
141140
}
142141
d.comm.SendToPeer(netMember, memReq)
143142
}
144143
}
145144

146-
147145
func (d *gossipDiscoveryImpl) handlePresumedDeadPeers() {
148146
d.stopSignal.Add(1)
149147
defer d.stopSignal.Done()
@@ -231,7 +229,7 @@ func (d *gossipDiscoveryImpl) createMembershipResponse(known []string) *proto.Me
231229
defer d.lock.RUnlock()
232230

233231
alivePeers := make([]*proto.AliveMessage, 0)
234-
deadPeers := make([]*proto.AliveMessage, 0)
232+
deadPeers := make([]*proto.AliveMessage, 0)
235233

236234
for _, am := range d.cachedMembership.Alive {
237235
isKnown := false
@@ -261,7 +259,7 @@ func (d *gossipDiscoveryImpl) createMembershipResponse(known []string) *proto.Me
261259
}
262260
}
263261

264-
return &proto.MembershipResponse {
262+
return &proto.MembershipResponse{
265263
Alive: append(alivePeers, aliveMsg),
266264
Dead: deadPeers,
267265
}

gossip/discovery/discovery_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (comm *dummyCommModule) Ping(peer *NetworkMember) bool {
9292

9393
_, alreadyExists := comm.streams[peer.Id]
9494
if !alreadyExists {
95-
newConn, err := grpc.Dial(peer.Endpoint, grpc.WithInsecure(), grpc.WithTimeout(time.Duration(500) * time.Millisecond))
95+
newConn, err := grpc.Dial(peer.Endpoint, grpc.WithInsecure(), grpc.WithTimeout(time.Duration(500)*time.Millisecond))
9696
if err != nil {
9797
//fmt.Printf("Error dialing: to %v: %v\n",peer.Endpoint, err)
9898
return false
@@ -133,7 +133,7 @@ func (comm *dummyCommModule) CloseConn(id string) {
133133
comm.streams[id].CloseSend()
134134
comm.conns[id].Close()
135135

136-
delete(comm.streams,id)
136+
delete(comm.streams, id)
137137
delete(comm.conns, id)
138138
}
139139

gossip/gossip/algo/pull.go

+288
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package algo
18+
19+
import (
20+
"math/rand"
21+
"sync"
22+
"sync/atomic"
23+
"time"
24+
25+
"github.com/hyperledger/fabric/gossip/util"
26+
)
27+
28+
/* PullEngine is an object that performs pull-based gossip, and maintains an internal state of items
29+
identified by uint64 numbers.
30+
The protocol is as follows:
31+
1) The Initiator sends a Hello message with a specific NONCE to a set of remote peers.
32+
2) Each remote peer responds with a digest of its messages and returns that NONCE.
33+
3) The initiator checks the validity of the NONCEs received, aggregates the digests,
34+
and crafts a request containing specific item ids it wants to receive from each remote peer and then
35+
sends each request to its corresponding peer.
36+
4) Each peer sends back the response containing the items requested, if it still holds them and the NONCE.
37+
38+
Other peer Initiator
39+
O <-------- Hello <NONCE> ------------------------- O
40+
/|\ --------- Digest <[3,5,8, 10...], NONCE> --------> /|\
41+
| <-------- Request <[3,8], NONCE> ----------------- |
42+
/ \ --------- Response <[item3, item8], NONCE>-------> / \
43+
44+
*/
45+
46+
const (
47+
DEF_DIGEST_WAIT_TIME = time.Duration(4) * time.Second
48+
DEF_REQUEST_WAIT_TIME = time.Duration(4) * time.Second
49+
DEF_RESPONSE_WAIT_TIME = time.Duration(7) * time.Second
50+
)
51+
52+
func init() {
53+
rand.Seed(42)
54+
}
55+
56+
var defaultDigestWaitTime = DEF_DIGEST_WAIT_TIME
57+
var defaultRequestWaitTime = DEF_REQUEST_WAIT_TIME
58+
var defaultResponseWaitTime = DEF_RESPONSE_WAIT_TIME
59+
60+
// PullAdapter is needed by the PullEngine in order to
61+
// send messages to the remote PullEngine instances.
62+
// The PullEngine expects to be invoked with
63+
// OnHello, OnDigest, OnReq, OnRes when the respective message arrives
64+
// from a remote PullEngine
65+
type PullAdapter interface {
66+
// SelectPeers returns a slice of peers which the engine will initiate the protocol with
67+
SelectPeers() []string
68+
69+
// Hello sends a hello message to initiate the protocol
70+
// and returns an NONCE that is expected to be returned
71+
// in the digest message.
72+
Hello(dest string, nonce uint64)
73+
74+
// SendDigest sends a digest to a remote PullEngine.
75+
// The context parameter specifies the remote engine to send to.
76+
SendDigest(digest []uint64, nonce uint64, context interface{})
77+
78+
// SendReq sends an array of items to a certain remote PullEngine identified
79+
// by a string
80+
SendReq(dest string, items []uint64, nonce uint64)
81+
82+
// SendRes sends an array of items to a remote PullEngine identified by a context.
83+
SendRes(items []uint64, context interface{}, nonce uint64)
84+
}
85+
86+
type PullEngine struct {
87+
PullAdapter
88+
stopFlag int32
89+
state *util.Set
90+
item2owners map[uint64][]string
91+
peers2nonces map[string]uint64
92+
nonces2peers map[uint64]string
93+
acceptingDigests int32
94+
acceptingResponses int32
95+
lock sync.Mutex
96+
nonces *util.Set
97+
}
98+
99+
func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine {
100+
engine := &PullEngine{
101+
PullAdapter: participant,
102+
stopFlag: int32(0),
103+
state: util.NewSet(),
104+
item2owners: make(map[uint64][]string),
105+
peers2nonces: make(map[string]uint64),
106+
nonces2peers: make(map[uint64]string),
107+
acceptingDigests: int32(0),
108+
acceptingResponses: int32(0),
109+
nonces: util.NewSet(),
110+
}
111+
112+
go func() {
113+
for !engine.toDie() {
114+
time.Sleep(sleepTime)
115+
engine.initiatePull()
116+
117+
}
118+
}()
119+
120+
return engine
121+
}
122+
123+
func (engine *PullEngine) toDie() bool {
124+
return (atomic.LoadInt32(&(engine.stopFlag)) == int32(1))
125+
}
126+
127+
func (engine *PullEngine) acceptResponses() {
128+
atomic.StoreInt32(&(engine.acceptingResponses), int32(1))
129+
}
130+
131+
func (engine *PullEngine) isAcceptingResponses() bool {
132+
return atomic.LoadInt32(&(engine.acceptingResponses)) == int32(1)
133+
}
134+
135+
func (engine *PullEngine) acceptDigests() {
136+
atomic.StoreInt32(&(engine.acceptingDigests), int32(1))
137+
}
138+
139+
func (engine *PullEngine) isAcceptingDigests() bool {
140+
return atomic.LoadInt32(&(engine.acceptingDigests)) == int32(1)
141+
}
142+
143+
func (engine *PullEngine) ignoreDigests() {
144+
atomic.StoreInt32(&(engine.acceptingDigests), int32(0))
145+
}
146+
147+
func (engine *PullEngine) Stop() {
148+
atomic.StoreInt32(&(engine.stopFlag), int32(1))
149+
}
150+
151+
func (engine *PullEngine) initiatePull() {
152+
engine.lock.Lock()
153+
defer engine.lock.Unlock()
154+
155+
engine.acceptDigests()
156+
for _, peer := range engine.SelectPeers() {
157+
nonce := engine.newNONCE()
158+
engine.nonces.Add(nonce)
159+
engine.nonces2peers[nonce] = peer
160+
engine.peers2nonces[peer] = nonce
161+
engine.Hello(peer, nonce)
162+
}
163+
164+
time.AfterFunc(defaultDigestWaitTime, func() {
165+
engine.processIncomingDigests()
166+
})
167+
}
168+
169+
func (engine *PullEngine) processIncomingDigests() {
170+
engine.ignoreDigests()
171+
172+
engine.lock.Lock()
173+
defer engine.lock.Unlock()
174+
175+
requestMapping := make(map[string][]uint64)
176+
for n, sources := range engine.item2owners {
177+
// select a random source
178+
source := sources[rand.Intn(len(sources))]
179+
if _, exists := requestMapping[source]; !exists {
180+
requestMapping[source] = make([]uint64, 0)
181+
}
182+
// append the number to that source
183+
requestMapping[source] = append(requestMapping[source], n)
184+
}
185+
186+
engine.acceptResponses()
187+
188+
for dest, seqsToReq := range requestMapping {
189+
engine.SendReq(dest, seqsToReq, engine.peers2nonces[dest])
190+
}
191+
192+
time.AfterFunc(defaultResponseWaitTime, engine.endPull)
193+
194+
}
195+
196+
func (engine *PullEngine) endPull() {
197+
engine.lock.Lock()
198+
defer engine.lock.Unlock()
199+
200+
atomic.StoreInt32(&(engine.acceptingResponses), int32(0))
201+
engine.nonces.Clear()
202+
203+
engine.item2owners = make(map[uint64][]string)
204+
engine.peers2nonces = make(map[string]uint64)
205+
engine.nonces2peers = make(map[uint64]string)
206+
}
207+
208+
func (engine *PullEngine) OnDigest(digest []uint64, nonce uint64, context interface{}) {
209+
if !engine.isAcceptingDigests() || !engine.nonces.Exists(nonce) {
210+
return
211+
}
212+
213+
engine.lock.Lock()
214+
defer engine.lock.Unlock()
215+
216+
for _, n := range digest {
217+
if engine.state.Exists(n) {
218+
continue
219+
}
220+
221+
if _, exists := engine.item2owners[n]; !exists {
222+
engine.item2owners[n] = make([]string, 0)
223+
}
224+
225+
engine.item2owners[n] = append(engine.item2owners[n], engine.nonces2peers[nonce])
226+
}
227+
}
228+
229+
func (engine *PullEngine) Add(seqs ...uint64) {
230+
for _, seq := range seqs {
231+
engine.state.Add(seq)
232+
}
233+
}
234+
235+
func (engine *PullEngine) Remove(seqs ...uint64) {
236+
for _, seq := range seqs {
237+
engine.state.Remove(seq)
238+
}
239+
}
240+
241+
func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {
242+
engine.nonces.Add(nonce)
243+
time.AfterFunc(defaultRequestWaitTime, func() {
244+
engine.nonces.Remove(nonce)
245+
})
246+
247+
a := engine.state.ToArray()
248+
digest := make([]uint64, len(a))
249+
for i, item := range a {
250+
digest[i] = item.(uint64)
251+
}
252+
engine.SendDigest(digest, nonce, context)
253+
}
254+
255+
func (engine *PullEngine) OnReq(items []uint64, nonce uint64, context interface{}) {
256+
if !engine.nonces.Exists(nonce) {
257+
return
258+
}
259+
engine.lock.Lock()
260+
defer engine.lock.Unlock()
261+
262+
items2Send := make([]uint64, 0)
263+
for _, item := range items {
264+
if engine.state.Exists(item) {
265+
items2Send = append(items2Send, item)
266+
}
267+
}
268+
269+
engine.SendRes(items2Send, context, nonce)
270+
}
271+
272+
func (engine *PullEngine) OnRes(items []uint64, nonce uint64) {
273+
if !engine.nonces.Exists(nonce) || !engine.isAcceptingResponses() {
274+
return
275+
}
276+
277+
engine.Add(items...)
278+
}
279+
280+
func (engine *PullEngine) newNONCE() uint64 {
281+
n := uint64(0)
282+
for {
283+
n = uint64(rand.Int63())
284+
if !engine.nonces.Exists(n) {
285+
return n
286+
}
287+
}
288+
}

0 commit comments

Comments
 (0)