Skip to content

Commit 9d3abd1

Browse files
committed
Gossip Comm tests tweaks
This commit affects gossip comm tests: 0) Makes them run in parallel 1) Reduces gossip comm tests from 32s to 13s: Tweaked TestBlackListPKIid, TestAccept and TestBasic 2) Contains some refactoring work of gossip comm tests 3) Re-writes TestBlackListPKIid from concurrent to serial so it won't fail on occasions But it also contains a tiny change to the flow of createConnection() in gossip/comm/comm_impl.go that only affected wrong logging of messages: If the remote peer was black-listed, it would have logged that the remote peer has sent a nil pkiID, although it hadn't. Change-Id: Iceecbe7b8fe44915c59ff8b28d22ba1853ca4e8a Signed-off-by: Yacov Manevich <[email protected]>
1 parent 7129a55 commit 9d3abd1

File tree

2 files changed

+102
-74
lines changed

2 files changed

+102
-74
lines changed

gossip/comm/comm_impl.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func NewCommInstanceWithServer(port int, sec SecurityProvider, pkID common.PKIid
112112

113113
// NewCommInstance creates a new comm instance that binds itself to the given gRPC server
114114
func NewCommInstance(s *grpc.Server, sec SecurityProvider, PKIID common.PKIidType, dialOpts ...grpc.DialOption) (Comm, error) {
115-
commInst, err := NewCommInstanceWithServer(-1, sec, PKIID, dialOpts ...)
115+
commInst, err := NewCommInstanceWithServer(-1, sec, PKIID, dialOpts...)
116116
if err != nil {
117117
return nil, err
118118
}
@@ -162,12 +162,12 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
162162

163163
if stream, err := cl.GossipStream(context.Background()); err == nil {
164164
pkiID, err := c.authenticateRemotePeer(stream)
165-
if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
166-
// PKIID is nil when we don't know the remote PKI id's
167-
c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
168-
return nil, fmt.Errorf("Authentication failure")
169-
}
170165
if err == nil {
166+
if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
167+
// PKIID is nil when we don't know the remote PKI id's
168+
c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
169+
return nil, fmt.Errorf("Authentication failure")
170+
}
171171
conn := newConnection(cl, cc, stream, nil)
172172
conn.pkiID = pkiID
173173
conn.logger = c.logger

gossip/comm/comm_test.go

+96-68
Original file line numberDiff line numberDiff line change
@@ -165,28 +165,26 @@ func TestHandshake(t *testing.T) {
165165
}
166166

167167
func TestBasic(t *testing.T) {
168+
t.Parallel()
168169
comm1, _ := newCommInstance(2000, naiveSec)
169170
comm2, _ := newCommInstance(3000, naiveSec)
170-
defer comm1.Stop()
171-
defer comm2.Stop()
172-
time.Sleep(time.Duration(3) * time.Second)
173-
msgs := make(chan *proto.GossipMessage, 2)
174-
go func() {
175-
m := <-comm2.Accept(acceptAll)
176-
msgs <- m.GetGossipMessage()
177-
}()
178-
go func() {
179-
m := <-comm1.Accept(acceptAll)
180-
msgs <- m.GetGossipMessage()
181-
}()
182-
comm1.Send(createGossipMsg(), &RemotePeer{PKIID: []byte("localhost:3000"), Endpoint: "localhost:3000"})
183-
time.Sleep(time.Second)
184-
comm2.Send(createGossipMsg(), &RemotePeer{PKIID: []byte("localhost:2000"), Endpoint: "localhost:2000"})
171+
m1 := comm1.Accept(acceptAll)
172+
m2 := comm2.Accept(acceptAll)
173+
out := make(chan uint64, 2)
174+
reader := func(ch <-chan ReceivedMessage) {
175+
m := <- ch
176+
out <- m.GetGossipMessage().Nonce
177+
}
178+
go reader(m1)
179+
go reader(m2)
180+
comm1.Send(createGossipMsg(), remotePeer(3000))
185181
time.Sleep(time.Second)
186-
assert.Equal(t, 2, len(msgs))
182+
comm2.Send(createGossipMsg(), remotePeer(2000))
183+
waitForMessages(t, out, 2, "Didn't receive 2 messages")
187184
}
188185

189186
func TestBlackListPKIid(t *testing.T) {
187+
t.Parallel()
190188
comm1, _ := newCommInstance(1611, naiveSec)
191189
comm2, _ := newCommInstance(1612, naiveSec)
192190
comm3, _ := newCommInstance(1613, naiveSec)
@@ -196,7 +194,7 @@ func TestBlackListPKIid(t *testing.T) {
196194
defer comm3.Stop()
197195
defer comm4.Stop()
198196

199-
reader := func(out chan uint64, in <-chan ReceivedMessage) {
197+
reader := func(instance string, out chan uint64, in <-chan ReceivedMessage) {
200198
for {
201199
msg := <-in
202200
if msg == nil {
@@ -206,52 +204,48 @@ func TestBlackListPKIid(t *testing.T) {
206204
}
207205
}
208206

209-
sender := func(comm Comm, port int, n int) {
210-
endpoint := fmt.Sprintf("localhost:%d", port)
211-
for i := 0; i < n; i++ {
212-
comm.Send(createGossipMsg(), &RemotePeer{Endpoint: endpoint, PKIID: []byte(endpoint)})
213-
time.Sleep(time.Duration(1) * time.Second)
214-
}
215-
}
207+
out1 := make(chan uint64, 4)
208+
out2 := make(chan uint64, 4)
209+
out3 := make(chan uint64, 4)
210+
out4 := make(chan uint64, 4)
216211

217-
out1 := make(chan uint64, 5)
218-
out2 := make(chan uint64, 5)
219-
out3 := make(chan uint64, 10)
220-
out4 := make(chan uint64, 10)
221-
222-
go reader(out1, comm1.Accept(acceptAll))
223-
go reader(out2, comm2.Accept(acceptAll))
224-
go reader(out3, comm3.Accept(acceptAll))
225-
go reader(out4, comm4.Accept(acceptAll))
212+
go reader("comm1", out1, comm1.Accept(acceptAll))
213+
go reader("comm2", out2, comm2.Accept(acceptAll))
214+
go reader("comm3", out3, comm3.Accept(acceptAll))
215+
go reader("comm4", out4, comm4.Accept(acceptAll))
226216

227217
// have comm1 BL comm3
228218
comm1.BlackListPKIid([]byte("localhost:1613"))
229219

230220
// make comm3 send to 1 and 2
231-
go sender(comm3, 1611, 5)
232-
go sender(comm3, 1612, 5)
221+
comm3.Send(createGossipMsg(), remotePeer(1612)) // out2++
222+
comm3.Send(createGossipMsg(), remotePeer(1611))
233223

234-
// make comm1 and comm2 send to comm3
235-
go sender(comm1, 1613, 5)
236-
go sender(comm2, 1613, 5)
224+
waitForMessages(t, out2, 1, "comm2 should have received 1 message")
237225

238-
// make comm1 and comm2 send to comm4 which is not blacklisted
239-
go sender(comm1, 1614, 5)
240-
go sender(comm2, 1614, 5)
226+
// make comm1 and comm2 send to comm3
227+
comm1.Send(createGossipMsg(), remotePeer(1613))
228+
comm2.Send(createGossipMsg(), remotePeer(1613)) // out3++
229+
waitForMessages(t, out3, 1, "comm3 should have received 1 message")
241230

242-
time.Sleep(time.Duration(1) * time.Second)
231+
// make comm1 and comm2 send to comm4 which is not blacklisted // out4 += 4
232+
comm1.Send(createGossipMsg(), remotePeer(1614))
233+
comm2.Send(createGossipMsg(), remotePeer(1614))
234+
comm1.Send(createGossipMsg(), remotePeer(1614))
235+
comm2.Send(createGossipMsg(), remotePeer(1614))
243236

244-
// blacklist comm3 mid-sending
237+
// blacklist comm3 by comm2
245238
comm2.BlackListPKIid([]byte("localhost:1613"))
246-
time.Sleep(time.Duration(5) * time.Second)
247239

248-
assert.Equal(t, 0, len(out1), "Comm instance 1 received messages(%d) from comm3 although comm3 is black listed", len(out1))
249-
assert.True(t, len(out2) < 2, "Comm instance 2 received too many messages(%d) from comm3 although comm3 is black listed", len(out2))
250-
assert.True(t, len(out3) < 3, "Comm instance 3 received too many messages(%d) although black listed", len(out3))
251-
assert.Equal(t, 10, len(out4), "Comm instance 4 didn't receive all messages sent to it")
240+
// send from comm1 and comm2 to comm3 again
241+
comm1.Send(createGossipMsg(), remotePeer(1613)) // shouldn't have an effect
242+
comm2.Send(createGossipMsg(), remotePeer(1613)) // shouldn't have an effect
243+
244+
waitForMessages(t, out4, 4, "comm1 should have received 4 messages")
252245
}
253246

254247
func TestParallelSend(t *testing.T) {
248+
t.Parallel()
255249
comm1, _ := newCommInstance(5611, naiveSec)
256250
comm2, _ := newCommInstance(5612, naiveSec)
257251
defer comm1.Stop()
@@ -266,7 +260,7 @@ func TestParallelSend(t *testing.T) {
266260
emptyMsg := createGossipMsg()
267261
go func() {
268262
defer wg.Done()
269-
comm1.Send(emptyMsg, &RemotePeer{Endpoint: "localhost:5612", PKIID: []byte("localhost:5612")})
263+
comm1.Send(emptyMsg, remotePeer(5612))
270264
}()
271265
}
272266
wg.Wait()
@@ -293,6 +287,7 @@ func TestParallelSend(t *testing.T) {
293287
}
294288

295289
func TestResponses(t *testing.T) {
290+
t.Parallel()
296291
comm1, _ := newCommInstance(8611, naiveSec)
297292
comm2, _ := newCommInstance(8612, naiveSec)
298293

@@ -316,7 +311,7 @@ func TestResponses(t *testing.T) {
316311
responsesFromComm1 := comm2.Accept(acceptAll)
317312

318313
ticker := time.NewTicker(time.Duration(6000) * time.Millisecond)
319-
comm2.Send(msg, &RemotePeer{PKIID: []byte("localhost:8611"), Endpoint: "localhost:8611"})
314+
comm2.Send(msg, remotePeer(8611))
320315
time.Sleep(time.Duration(100) * time.Millisecond)
321316

322317
select {
@@ -331,6 +326,7 @@ func TestResponses(t *testing.T) {
331326
}
332327

333328
func TestAccept(t *testing.T) {
329+
t.Parallel()
334330
comm1, _ := newCommInstance(7611, naiveSec)
335331
comm2, _ := newCommInstance(7612, naiveSec)
336332

@@ -348,11 +344,13 @@ func TestAccept(t *testing.T) {
348344
var evenResults []uint64
349345
var oddResults []uint64
350346

347+
out := make(chan uint64, defRecvBuffSize)
351348
sem := make(chan struct{}, 0)
352349

353350
readIntoSlice := func(a *[]uint64, ch <-chan ReceivedMessage) {
354351
for m := range ch {
355352
*a = append(*a, m.GetGossipMessage().Nonce)
353+
out <- m.GetGossipMessage().Nonce
356354
}
357355
sem <- struct{}{}
358356
}
@@ -361,10 +359,10 @@ func TestAccept(t *testing.T) {
361359
go readIntoSlice(&oddResults, oddNONCES)
362360

363361
for i := 0; i < defRecvBuffSize; i++ {
364-
comm2.Send(createGossipMsg(), &RemotePeer{Endpoint: "localhost:7611", PKIID: []byte("localhost:7611")})
362+
comm2.Send(createGossipMsg(), remotePeer(7611))
365363
}
366364

367-
time.Sleep(time.Duration(5) * time.Second)
365+
waitForMessages(t, out, defRecvBuffSize, "Didn't receive all messages sent")
368366

369367
comm1.Stop()
370368
comm2.Stop()
@@ -386,6 +384,7 @@ func TestAccept(t *testing.T) {
386384
}
387385

388386
func TestReConnections(t *testing.T) {
387+
t.Parallel()
389388
comm1, _ := newCommInstance(3611, naiveSec)
390389
comm2, _ := newCommInstance(3612, naiveSec)
391390

@@ -406,10 +405,10 @@ func TestReConnections(t *testing.T) {
406405
go reader(out2, comm2.Accept(acceptAll))
407406

408407
// comm1 connects to comm2
409-
comm1.Send(createGossipMsg(), &RemotePeer{Endpoint: "localhost:3612", PKIID: []byte("localhost:3612")})
408+
comm1.Send(createGossipMsg(), remotePeer(3612))
410409
time.Sleep(100 * time.Millisecond)
411410
// comm2 sends to comm1
412-
comm2.Send(createGossipMsg(), &RemotePeer{Endpoint: "localhost:3611", PKIID: []byte("localhost:3611")})
411+
comm2.Send(createGossipMsg(), remotePeer(3611))
413412
time.Sleep(100 * time.Millisecond)
414413

415414
assert.Equal(t, 1, len(out2))
@@ -419,39 +418,43 @@ func TestReConnections(t *testing.T) {
419418
comm1, _ = newCommInstance(3611, naiveSec)
420419
go reader(out1, comm1.Accept(acceptAll))
421420
time.Sleep(300 * time.Millisecond)
422-
comm2.Send(createGossipMsg(), &RemotePeer{Endpoint: "localhost:3611", PKIID: []byte("localhost:3611")})
421+
comm2.Send(createGossipMsg(), remotePeer(3611))
423422
time.Sleep(100 * time.Millisecond)
424423
assert.Equal(t, 2, len(out1))
425424
}
426425

427426
func TestProbe(t *testing.T) {
427+
t.Parallel()
428428
comm1, _ := newCommInstance(6611, naiveSec)
429429
defer comm1.Stop()
430430
comm2, _ := newCommInstance(6612, naiveSec)
431431
time.Sleep(time.Duration(1) * time.Second)
432-
assert.NoError(t, comm1.Probe(&RemotePeer{Endpoint: "localhost:6612", PKIID: []byte("localhost:6612")}))
433-
assert.Error(t, comm1.Probe(&RemotePeer{Endpoint: "localhost:9012", PKIID: []byte("localhost:9012")}))
432+
assert.NoError(t, comm1.Probe(remotePeer(6612)))
433+
assert.Error(t, comm1.Probe(remotePeer(9012)))
434434
comm2.Stop()
435435
time.Sleep(time.Second)
436-
assert.Error(t, comm1.Probe(&RemotePeer{Endpoint: "localhost:6612", PKIID: []byte("localhost:6612")}))
436+
assert.Error(t, comm1.Probe(remotePeer(6612)))
437437
comm2, _ = newCommInstance(6612, naiveSec)
438438
defer comm2.Stop()
439439
time.Sleep(time.Duration(1) * time.Second)
440-
assert.NoError(t, comm2.Probe(&RemotePeer{Endpoint: "localhost:6611", PKIID: []byte("localhost:6611")}))
441-
assert.NoError(t, comm1.Probe(&RemotePeer{Endpoint: "localhost:6612", PKIID: []byte("localhost:6612")}))
440+
assert.NoError(t, comm2.Probe(remotePeer(6611)))
441+
assert.NoError(t, comm1.Probe(remotePeer(6612)))
442442
}
443443

444444
func TestPresumedDead(t *testing.T) {
445-
comm1, _ := newCommInstance(7611, naiveSec)
446-
defer comm1.Stop()
447-
comm2, _ := newCommInstance(7612, naiveSec)
448-
go comm1.Send(createGossipMsg(), &RemotePeer{PKIID: []byte("localhost:7612"), Endpoint: "localhost:7612"})
445+
t.Parallel()
446+
comm1, _ := newCommInstance(4611, naiveSec)
447+
comm2, _ := newCommInstance(4612, naiveSec)
448+
go comm1.Send(createGossipMsg(), remotePeer(4612))
449449
<-comm2.Accept(acceptAll)
450450
comm2.Stop()
451-
for i := 0; i < 5; i++ {
452-
comm1.Send(createGossipMsg(), &RemotePeer{Endpoint: "localhost:7612", PKIID: []byte("localhost:7612")})
453-
time.Sleep(time.Second)
454-
}
451+
go func() {
452+
for i := 0; i < 5; i++ {
453+
comm1.Send(createGossipMsg(), remotePeer(4612))
454+
time.Sleep(time.Millisecond * 200)
455+
}
456+
}()
457+
455458
ticker := time.NewTicker(time.Second * time.Duration(3))
456459
select {
457460
case <-ticker.C:
@@ -472,3 +475,28 @@ func createGossipMsg() *proto.GossipMessage {
472475
},
473476
}
474477
}
478+
479+
func remotePeer(port int) *RemotePeer {
480+
endpoint := fmt.Sprintf("localhost:%d", port)
481+
return &RemotePeer{Endpoint: endpoint, PKIID: []byte(endpoint)}
482+
}
483+
484+
func waitForMessages(t *testing.T, msgChan chan uint64, count int, errMsg string) {
485+
c := 0
486+
waiting := true
487+
ticker := time.NewTicker(time.Duration(5) * time.Second)
488+
for waiting {
489+
select {
490+
case <-msgChan:
491+
c++
492+
if c == count {
493+
waiting = false
494+
}
495+
break
496+
case <-ticker.C:
497+
waiting = false
498+
break
499+
}
500+
}
501+
assert.Equal(t, count, c, errMsg)
502+
}

0 commit comments

Comments
 (0)