@@ -29,6 +29,7 @@ import (
29
29
"github.com/op/go-logging"
30
30
)
31
31
32
+ // Constants go here.
32
33
const (
33
34
HelloMsgType PullMsgType = iota
34
35
DigestMsgType
@@ -40,8 +41,9 @@ const (
40
41
type PullMsgType int
41
42
42
43
// MessageHook defines a function that will run after a certain pull message is received
43
- type MessageHook func (itemIds []string , items []* proto.GossipMessage , msg comm.ReceivedMessage )
44
+ type MessageHook func (itemIDs []string , items []* proto.GossipMessage , msg comm.ReceivedMessage )
44
45
46
+ // Sender sends messages to remote peers
45
47
type Sender interface {
46
48
// Send sends a message to a list of remote peers
47
49
Send (msg * proto.GossipMessage , peers ... * comm.RemotePeer )
@@ -55,7 +57,7 @@ type MembershipService interface {
55
57
56
58
// PullConfig defines the configuration of the pull mediator
57
59
type PullConfig struct {
58
- Id string
60
+ ID string
59
61
PullInterval time.Duration // Duration between pull invocations
60
62
PeerCountToSelect int // Number of peers to initiate pull with
61
63
Tag proto.GossipMessage_Tag
@@ -64,7 +66,7 @@ type PullConfig struct {
64
66
}
65
67
66
68
// Mediator is a component wrap a PullEngine and provides the methods
67
- // it needs to perform pull synchronization..
69
+ // it needs to perform pull synchronization.
68
70
// The specialization of a pull mediator to a certain type of message is
69
71
// done by the configuration, a IdentifierExtractor, IdentifierExtractor
70
72
// given at construction, and also hooks that can be registered for each
@@ -86,28 +88,29 @@ type Mediator interface {
86
88
HandleMessage (msg comm.ReceivedMessage )
87
89
}
88
90
89
- // pullStoreImpl is an implementation of PullStore
91
+ // pullMediatorImpl is an implementation of Mediator
90
92
type pullMediatorImpl struct {
93
+ sync.RWMutex
94
+ Sender
91
95
msgType2Hook map [PullMsgType ][]MessageHook
92
96
idExtractor proto.IdentifierExtractor
93
97
msgCons proto.MsgConsumer
94
98
config PullConfig
95
99
logger * logging.Logger
96
- sync.RWMutex
97
- itemId2msg map [string ]* proto.GossipMessage
98
- Sender
99
- memBvc MembershipService
100
- engine * algo.PullEngine
100
+ itemID2Msg map [string ]* proto.GossipMessage
101
+ memBvc MembershipService
102
+ engine * algo.PullEngine
101
103
}
102
104
105
+ // NewPullMediator returns a new Mediator
103
106
func NewPullMediator (config PullConfig , sndr Sender , memSvc MembershipService , idExtractor proto.IdentifierExtractor , msgCons proto.MsgConsumer ) Mediator {
104
107
p := & pullMediatorImpl {
105
108
msgCons : msgCons ,
106
109
msgType2Hook : make (map [PullMsgType ][]MessageHook ),
107
110
idExtractor : idExtractor ,
108
111
config : config ,
109
- logger : util .GetLogger (util .LoggingPullModule , config .Id ),
110
- itemId2msg : make (map [string ]* proto.GossipMessage ),
112
+ logger : util .GetLogger (util .LoggingPullModule , config .ID ),
113
+ itemID2Msg : make (map [string ]* proto.GossipMessage ),
111
114
memBvc : memSvc ,
112
115
Sender : sndr ,
113
116
}
@@ -128,7 +131,7 @@ func (p *pullMediatorImpl) HandleMessage(m comm.ReceivedMessage) {
128
131
129
132
p .logger .Debug (msg )
130
133
131
- itemIds := []string {}
134
+ itemIDs := []string {}
132
135
items := []* proto.GossipMessage {}
133
136
var pullMsgType PullMsgType
134
137
@@ -137,33 +140,33 @@ func (p *pullMediatorImpl) HandleMessage(m comm.ReceivedMessage) {
137
140
p .engine .OnHello (helloMsg .Nonce , m )
138
141
}
139
142
if digest := msg .GetDataDig (); digest != nil {
140
- itemIds = digest .Digests
143
+ itemIDs = digest .Digests
141
144
pullMsgType = DigestMsgType
142
145
p .engine .OnDigest (digest .Digests , digest .Nonce , m )
143
146
}
144
147
if req := msg .GetDataReq (); req != nil {
145
- itemIds = req .Digests
148
+ itemIDs = req .Digests
146
149
pullMsgType = RequestMsgType
147
150
p .engine .OnReq (req .Digests , req .Nonce , m )
148
151
}
149
152
if res := msg .GetDataUpdate (); res != nil {
150
- itemIds = make ([]string , len (res .Data ))
153
+ itemIDs = make ([]string , len (res .Data ))
151
154
items = make ([]* proto.GossipMessage , len (res .Data ))
152
155
pullMsgType = ResponseMsgType
153
156
for i , pulledMsg := range res .Data {
154
157
p .msgCons (pulledMsg )
155
- itemIds [i ] = p .idExtractor (pulledMsg )
158
+ itemIDs [i ] = p .idExtractor (pulledMsg )
156
159
items [i ] = pulledMsg
157
160
p .Lock ()
158
- p.itemId2msg [ itemIds [i ]] = pulledMsg
161
+ p.itemID2Msg [ itemIDs [i ]] = pulledMsg
159
162
p .Unlock ()
160
163
}
161
- p .engine .OnRes (itemIds , res .Nonce )
164
+ p .engine .OnRes (itemIDs , res .Nonce )
162
165
}
163
166
164
167
// Invoke hooks for relevant message type
165
168
for _ , h := range p .hooksByMsgType (pullMsgType ) {
166
- h (itemIds , items , m )
169
+ h (itemIDs , items , m )
167
170
}
168
171
}
169
172
@@ -183,18 +186,18 @@ func (p *pullMediatorImpl) RegisterMsgHook(pullMsgType PullMsgType, hook Message
183
186
func (p * pullMediatorImpl ) Add (msg * proto.GossipMessage ) {
184
187
p .Lock ()
185
188
defer p .Unlock ()
186
- itemId := p .idExtractor (msg )
187
- p .itemId2msg [ itemId ] = msg
188
- p .engine .Add (itemId )
189
+ itemID := p .idExtractor (msg )
190
+ p .itemID2Msg [ itemID ] = msg
191
+ p .engine .Add (itemID )
189
192
}
190
193
191
194
// Remove removes a GossipMessage from the store
192
195
func (p * pullMediatorImpl ) Remove (msg * proto.GossipMessage ) {
193
196
p .Lock ()
194
197
defer p .Unlock ()
195
- itemId := p .idExtractor (msg )
196
- delete (p .itemId2msg , itemId )
197
- p .engine .Remove (itemId )
198
+ itemID := p .idExtractor (msg )
199
+ delete (p .itemID2Msg , itemID )
200
+ p .engine .Remove (itemID )
198
201
}
199
202
200
203
// SelectPeers returns a slice of peers which the engine will initiate the protocol with
@@ -271,7 +274,7 @@ func (p *pullMediatorImpl) SendRes(items []string, context interface{}, nonce ui
271
274
p .RLock ()
272
275
defer p .RUnlock ()
273
276
for _ , item := range items {
274
- if msg , exists := p .itemId2msg [item ]; exists {
277
+ if msg , exists := p .itemID2Msg [item ]; exists {
275
278
items2return = append (items2return , msg )
276
279
}
277
280
}
@@ -314,6 +317,7 @@ func (p *pullMediatorImpl) hooksByMsgType(msgType PullMsgType) []MessageHook {
314
317
return returnedHooks
315
318
}
316
319
320
+ // SelectEndpoints select k peers from peerPool and returns them.
317
321
func SelectEndpoints (k int , peerPool []discovery.NetworkMember ) []* comm.RemotePeer {
318
322
if len (peerPool ) < k {
319
323
k = len (peerPool )
0 commit comments