@@ -80,7 +80,7 @@ type gossipDiscoveryImpl struct {
80
80
aliveMembership * util.MembershipStore
81
81
deadMembership * util.MembershipStore
82
82
83
- msgStore msgstore. MessageStore
83
+ msgStore * aliveMsgStore
84
84
85
85
bootstrapPeers []string
86
86
@@ -114,25 +114,7 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
114
114
disclosurePolicy : disPol ,
115
115
}
116
116
117
- policy := proto .NewGossipMessageComparator (0 )
118
- trigger := func (m interface {}) {}
119
- aliveMsgTTL := getAliveExpirationTimeout () * msgExpirationFactor
120
- externalLock := func () { d .lock .Lock () }
121
- externalUnlock := func () { d .lock .Unlock () }
122
- callback := func (m interface {}) {
123
- msg := m .(* proto.SignedGossipMessage )
124
- if ! msg .IsAliveMsg () {
125
- return
126
- }
127
- id := msg .GetAliveMsg ().Membership .PkiId
128
- d .aliveMembership .Remove (id )
129
- d .deadMembership .Remove (id )
130
- delete (d .id2Member , string (id ))
131
- delete (d .deadLastTS , string (id ))
132
- delete (d .aliveLastTS , string (id ))
133
- }
134
-
135
- d .msgStore = msgstore .NewMessageStoreExpirable (policy , trigger , aliveMsgTTL , externalLock , externalUnlock , callback )
117
+ d .msgStore = newAliveMsgStore (d )
136
118
137
119
go d .periodicalSendAlive ()
138
120
go d .periodicalCheckAlive ()
@@ -325,7 +307,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
325
307
return
326
308
}
327
309
328
- if d .msgStore .CheckValid (m ) {
310
+ if d .msgStore .CheckValid (selfInfoGossipMsg ) {
329
311
d .handleAliveMessage (selfInfoGossipMsg )
330
312
}
331
313
@@ -364,10 +346,9 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
364
346
return
365
347
}
366
348
367
- if d .msgStore .CheckValid (m ) {
349
+ if d .msgStore .CheckValid (am ) {
368
350
d .handleAliveMessage (am )
369
351
}
370
-
371
352
}
372
353
373
354
for _ , env := range memResp .Dead {
@@ -381,7 +362,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
381
362
continue
382
363
}
383
364
384
- if ! d .msgStore .CheckValid (m ) {
365
+ if ! d .msgStore .CheckValid (dm ) {
385
366
//Newer alive message exist
386
367
return
387
368
}
@@ -965,3 +946,46 @@ func filterOutLocalhost(endpoints []string, port int) []string {
965
946
}
966
947
return returnedEndpoints
967
948
}
949
+
950
+ type aliveMsgStore struct {
951
+ msgstore.MessageStore
952
+ }
953
+
954
+ func newAliveMsgStore (d * gossipDiscoveryImpl ) * aliveMsgStore {
955
+ policy := proto .NewGossipMessageComparator (0 )
956
+ trigger := func (m interface {}) {}
957
+ aliveMsgTTL := getAliveExpirationTimeout () * msgExpirationFactor
958
+ externalLock := func () { d .lock .Lock () }
959
+ externalUnlock := func () { d .lock .Unlock () }
960
+ callback := func (m interface {}) {
961
+ msg := m .(* proto.SignedGossipMessage )
962
+ if ! msg .IsAliveMsg () {
963
+ return
964
+ }
965
+ id := msg .GetAliveMsg ().Membership .PkiId
966
+ d .aliveMembership .Remove (id )
967
+ d .deadMembership .Remove (id )
968
+ delete (d .id2Member , string (id ))
969
+ delete (d .deadLastTS , string (id ))
970
+ delete (d .aliveLastTS , string (id ))
971
+ }
972
+
973
+ s := & aliveMsgStore {
974
+ MessageStore : msgstore .NewMessageStoreExpirable (policy , trigger , aliveMsgTTL , externalLock , externalUnlock , callback ),
975
+ }
976
+ return s
977
+ }
978
+
979
+ func (s * aliveMsgStore ) Add (msg interface {}) bool {
980
+ if ! msg .(* proto.SignedGossipMessage ).IsAliveMsg () {
981
+ panic (fmt .Sprint ("Msg " , msg , " is not AliveMsg" ))
982
+ }
983
+ return s .MessageStore .Add (msg )
984
+ }
985
+
986
+ func (s * aliveMsgStore ) CheckValid (msg interface {}) bool {
987
+ if ! msg .(* proto.SignedGossipMessage ).IsAliveMsg () {
988
+ panic (fmt .Sprint ("Msg " , msg , " is not AliveMsg" ))
989
+ }
990
+ return s .MessageStore .CheckValid (msg )
991
+ }
0 commit comments