@@ -17,6 +17,7 @@ limitations under the License.
17
17
package election
18
18
19
19
import (
20
+ "bytes"
20
21
"fmt"
21
22
"sync"
22
23
"sync/atomic"
@@ -113,16 +114,18 @@ type LeaderElectionService interface {
113
114
Stop ()
114
115
}
115
116
117
+ type peerID []byte
118
+
116
119
// Peer describes a remote peer
117
120
type Peer interface {
118
121
// ID returns the ID of the peer
119
- ID () string
122
+ ID () peerID
120
123
}
121
124
122
125
// Msg describes a message sent from a remote peer
123
126
type Msg interface {
124
127
// SenderID returns the ID of the peer sent the message
125
- SenderID () string
128
+ SenderID () peerID
126
129
// IsProposal returns whether this message is a leadership proposal
127
130
IsProposal () bool
128
131
// IsDeclaration returns whether this message is a leadership declaration
@@ -138,7 +141,7 @@ func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback
138
141
panic (fmt .Errorf ("Empty id" ))
139
142
}
140
143
le := & leaderElectionSvcImpl {
141
- id : id ,
144
+ id : peerID ( id ) ,
142
145
proposals : util .NewSet (),
143
146
adapter : adapter ,
144
147
stopChan : make (chan struct {}, 1 ),
@@ -157,7 +160,7 @@ func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback
157
160
158
161
// leaderElectionSvcImpl is an implementation of a LeaderElectionService
159
162
type leaderElectionSvcImpl struct {
160
- id string
163
+ id peerID
161
164
proposals * util.Set
162
165
sync.Mutex
163
166
stopChan chan struct {}
@@ -209,13 +212,13 @@ func (le *leaderElectionSvcImpl) handleMessage(msg Msg) {
209
212
defer le .Unlock ()
210
213
211
214
if msg .IsProposal () {
212
- le .proposals .Add (msg .SenderID ())
215
+ le .proposals .Add (string ( msg .SenderID () ))
213
216
} else if msg .IsDeclaration () {
214
217
atomic .StoreInt32 (& le .leaderExists , int32 (1 ))
215
218
if le .sleeping && len (le .interruptChan ) == 0 {
216
219
le .interruptChan <- struct {}{}
217
220
}
218
- if msg .SenderID () < le .id && le .IsLeader () {
221
+ if bytes . Compare ( msg .SenderID (), le .id ) < 0 && le .IsLeader () {
219
222
le .stopBeingLeader ()
220
223
}
221
224
} else {
@@ -281,7 +284,7 @@ func (le *leaderElectionSvcImpl) leaderElection() {
281
284
// for being a leader
282
285
for _ , o := range le .proposals .ToArray () {
283
286
id := o .(string )
284
- if id < le .id {
287
+ if bytes . Compare ( peerID ( id ), le .id ) < 0 {
285
288
return
286
289
}
287
290
}
@@ -344,9 +347,9 @@ func (le *leaderElectionSvcImpl) drainInterruptChannel() {
344
347
}
345
348
346
349
// isAlive returns whether peer of given id is considered alive
347
- func (le * leaderElectionSvcImpl ) isAlive (id string ) bool {
350
+ func (le * leaderElectionSvcImpl ) isAlive (id peerID ) bool {
348
351
for _ , p := range le .adapter .Peers () {
349
- if p .ID () == id {
352
+ if bytes . Equal ( p .ID (), id ) {
350
353
return true
351
354
}
352
355
}
0 commit comments