Skip to content

Commit 216ae65

Browse files
committed
gossip discovery tests improvements
Added some more tests to discovery module and also did some refactoring of existing tests code-coverage is now 84% instead of 70% Changes in discovery_impl are only gofmt stuff Change-Id: I1125d08365e653ec87d6409ce90f8f389261ef17 Signed-off-by: Yacov Manevich <[email protected]>
1 parent b8c6b2f commit 216ae65

File tree

2 files changed

+115
-50
lines changed

2 files changed

+115
-50
lines changed

gossip/discovery/discovery_impl.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -96,25 +96,25 @@ type gossipDiscoveryImpl struct {
9696
// NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed
9797
func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommService, crypt CryptoService) Discovery {
9898
d := &gossipDiscoveryImpl{
99-
endpoint: self.Endpoint,
100-
incTime: uint64(time.Now().UnixNano()),
101-
metadata: self.Metadata,
102-
pkiID: self.PKIid,
103-
seqNum: uint64(0),
99+
endpoint: self.Endpoint,
100+
incTime: uint64(time.Now().UnixNano()),
101+
metadata: self.Metadata,
102+
pkiID: self.PKIid,
103+
seqNum: uint64(0),
104104
deadLastTS: make(map[string]*timestamp),
105105
aliveLastTS: make(map[string]*timestamp),
106106
id2Member: make(map[string]*NetworkMember),
107107
cachedMembership: &proto.MembershipResponse{
108108
Alive: make([]*proto.AliveMessage, 0),
109109
Dead: make([]*proto.AliveMessage, 0),
110110
},
111-
crpypt: crypt,
111+
crpypt: crypt,
112112
bootstrapPeers: bootstrapPeers,
113-
comm: comm,
114-
lock: &sync.RWMutex{},
115-
toDieChan: make(chan struct{}, 1),
116-
toDieFlag: int32(0),
117-
logger: util.GetLogger(util.LOGGING_DISCOVERY_MODULE, self.Endpoint),
113+
comm: comm,
114+
lock: &sync.RWMutex{},
115+
toDieChan: make(chan struct{}, 1),
116+
toDieFlag: int32(0),
117+
logger: util.GetLogger(util.LOGGING_DISCOVERY_MODULE, self.Endpoint),
118118
}
119119

120120
d.logger.SetLevel(logging.WARNING)

gossip/discovery/discovery_test.go

+104-39
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"io"
2222
"net"
2323
"sync"
24+
"sync/atomic"
2425
"testing"
2526
"time"
2627

@@ -42,6 +43,7 @@ type dummyCommModule struct {
4243
lock *sync.RWMutex
4344
incMsgs chan *proto.GossipMessage
4445
lastSeqs map[string]uint64
46+
shouldGossip bool
4547
}
4648

4749
type gossipMsg struct {
@@ -55,8 +57,9 @@ func (m *gossipMsg) GetGossipMessage() *proto.GossipMessage {
5557
type gossipInstance struct {
5658
comm *dummyCommModule
5759
Discovery
58-
gRGCserv *grpc.Server
59-
lsnr net.Listener
60+
gRGCserv *grpc.Server
61+
lsnr net.Listener
62+
shouldGossip bool
6063
}
6164

6265
func (comm *dummyCommModule) ValidateAliveMsg(am *proto.AliveMessage) bool {
@@ -68,6 +71,9 @@ func (comm *dummyCommModule) SignMessage(am *proto.AliveMessage) *proto.AliveMes
6871
}
6972

7073
func (comm *dummyCommModule) Gossip(msg *proto.GossipMessage) {
74+
if !comm.shouldGossip {
75+
return
76+
}
7177
comm.lock.Lock()
7278
defer comm.lock.Unlock()
7379
for _, conn := range comm.streams {
@@ -203,6 +209,14 @@ func (g *gossipInstance) Ping(context.Context, *proto.Empty) (*proto.Empty, erro
203209
}
204210

205211
func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *gossipInstance {
212+
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, true)
213+
}
214+
215+
func createDiscoveryInstanceWithNoGossip(port int, id string, bootstrapPeers []string) *gossipInstance {
216+
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false)
217+
}
218+
219+
func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []string, shouldGossip bool) *gossipInstance {
206220
comm := &dummyCommModule{
207221
conns: make(map[string]*grpc.ClientConn),
208222
streams: make(map[string]proto.Gossip_GossipStreamClient),
@@ -212,6 +226,7 @@ func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *goss
212226
detectedDead: make(chan string, 10000),
213227
lock: &sync.RWMutex{},
214228
lastSeqs: make(map[string]uint64),
229+
shouldGossip: shouldGossip,
215230
}
216231

217232
endpoint := fmt.Sprintf("localhost:%d", port)
@@ -230,7 +245,7 @@ func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *goss
230245

231246
discSvc := NewDiscoveryService(bootstrapPeers, self, comm, comm)
232247
discSvc.(*gossipDiscoveryImpl).logger.SetLevel(logging.WARNING)
233-
gossInst := &gossipInstance{comm: comm, gRGCserv: s, Discovery: discSvc, lsnr: ll}
248+
gossInst := &gossipInstance{comm: comm, gRGCserv: s, Discovery: discSvc, lsnr: ll, shouldGossip: shouldGossip}
234249

235250
proto.RegisterGossipServer(s, gossInst)
236251
go s.Serve(ll)
@@ -287,19 +302,34 @@ func TestUpdate(t *testing.T) {
287302
return true
288303
}
289304

290-
291305
waitUntilOrFail(t, checkMembership)
306+
stopInstances(t, instances)
307+
}
292308

293-
stopAction := &sync.WaitGroup{}
294-
for _, inst := range instances {
295-
stopAction.Add(1)
296-
go func(inst *gossipInstance) {
297-
defer stopAction.Done()
298-
inst.Stop()
299-
}(inst)
300-
}
309+
func TestInitiateSync(t *testing.T) {
310+
nodeNum := 10
311+
bootPeers := []string{bootPeer(3611), bootPeer(3612)}
312+
instances := []*gossipInstance{}
301313

302-
waitUntilOrFailBlocking(t, stopAction.Wait)
314+
toDie := int32(0)
315+
for i := 1; i <= nodeNum; i++ {
316+
id := fmt.Sprintf("d%d", i)
317+
inst := createDiscoveryInstanceWithNoGossip(3610+i, id, bootPeers)
318+
instances = append(instances, inst)
319+
go func() {
320+
for {
321+
if atomic.LoadInt32(&toDie) == int32(1) {
322+
return
323+
}
324+
time.Sleep(aliveExpirationTimeout / 3)
325+
inst.InitiateSync(9)
326+
}
327+
}()
328+
}
329+
time.Sleep(aliveExpirationTimeout * 4)
330+
assertMembership(t, instances, nodeNum-1)
331+
atomic.StoreInt32(&toDie, int32(1))
332+
stopInstances(t, instances)
303333
}
304334

305335
func TestExpiration(t *testing.T) {
@@ -319,21 +349,12 @@ func TestExpiration(t *testing.T) {
319349
instances = append(instances, inst)
320350
}
321351

322-
fullMembership := func() bool {
323-
return nodeNum-1 == len(instances[nodeNum-1].GetMembership())
324-
}
325-
326-
waitUntilOrFail(t, fullMembership)
352+
assertMembership(t, instances, nodeNum-1)
327353

328354
waitUntilOrFailBlocking(t, instances[nodeNum-1].Stop)
329355
waitUntilOrFailBlocking(t, instances[nodeNum-2].Stop)
330356

331-
time.Sleep(time.Duration(2) * time.Second)
332-
membershipReduced := func() bool {
333-
return nodeNum-3 == len(instances[0].GetMembership())
334-
}
335-
336-
waitUntilOrFail(t, membershipReduced)
357+
assertMembership(t, instances, nodeNum-3)
337358

338359
stopAction := &sync.WaitGroup{}
339360
for i, inst := range instances {
@@ -367,21 +388,8 @@ func TestGetFullMembership(t *testing.T) {
367388
instances = append(instances, inst)
368389
}
369390

370-
fullMembership := func() bool {
371-
return nodeNum - 1 == len(instances[nodeNum-1].GetMembership())
372-
}
373-
waitUntilOrFail(t, fullMembership)
374-
375-
stopAction := &sync.WaitGroup{}
376-
for _, inst := range instances {
377-
stopAction.Add(1)
378-
go func(inst *gossipInstance) {
379-
defer stopAction.Done()
380-
inst.Stop()
381-
}(inst)
382-
}
383-
384-
waitUntilOrFailBlocking(t, stopAction.Wait)
391+
assertMembership(t, instances, nodeNum-1)
392+
stopInstances(t, instances)
385393
}
386394

387395
func TestGossipDiscoveryStopping(t *testing.T) {
@@ -391,6 +399,38 @@ func TestGossipDiscoveryStopping(t *testing.T) {
391399

392400
}
393401

402+
func TestConvergence(t *testing.T) {
403+
// scenario:
404+
// {boot peer: [peer list]}
405+
// {d1: d2, d3, d4}
406+
// {d5: d6, d7, d8}
407+
// {d9: d10, d11, d12}
408+
// connect all boot peers with d13
409+
// take down d13
410+
// ensure still full membership
411+
instances := []*gossipInstance{}
412+
for _, i := range []int{1, 5, 9} {
413+
bootPort := 4610 + i
414+
id := fmt.Sprintf("d%d", i)
415+
leader := createDiscoveryInstance(bootPort, id, []string{})
416+
instances = append(instances, leader)
417+
for minionIndex := 1; minionIndex <= 3; minionIndex++ {
418+
id := fmt.Sprintf("d%d", i+minionIndex)
419+
minion := createDiscoveryInstance(4610+minionIndex+i, id, []string{bootPeer(bootPort)})
420+
instances = append(instances, minion)
421+
}
422+
}
423+
424+
assertMembership(t, instances, 3)
425+
connector := createDiscoveryInstance(4623, fmt.Sprintf("d13"), []string{bootPeer(4611), bootPeer(4615), bootPeer(4619)})
426+
instances = append(instances, connector)
427+
assertMembership(t, instances, 12)
428+
connector.Stop()
429+
instances = instances[:len(instances)-1]
430+
assertMembership(t, instances, 11)
431+
stopInstances(t, instances)
432+
}
433+
394434
func waitUntilOrFail(t *testing.T, pred func() bool) {
395435
start := time.Now()
396436
limit := start.UnixNano() + timeout.Nanoseconds()
@@ -417,3 +457,28 @@ func waitUntilOrFailBlocking(t *testing.T, f func()) {
417457
}
418458
assert.Fail(t, "Timeout expired!")
419459
}
460+
461+
func stopInstances(t *testing.T, instances []*gossipInstance) {
462+
stopAction := &sync.WaitGroup{}
463+
for _, inst := range instances {
464+
stopAction.Add(1)
465+
go func(inst *gossipInstance) {
466+
defer stopAction.Done()
467+
inst.Stop()
468+
}(inst)
469+
}
470+
471+
waitUntilOrFailBlocking(t, stopAction.Wait)
472+
}
473+
474+
func assertMembership(t *testing.T, instances []*gossipInstance, expectedNum int) {
475+
fullMembership := func() bool {
476+
for _, inst := range instances {
477+
if len(inst.GetMembership()) == expectedNum {
478+
return true
479+
}
480+
}
481+
return false
482+
}
483+
waitUntilOrFail(t, fullMembership)
484+
}

0 commit comments

Comments
 (0)