Skip to content

Commit d74b1c5

Browse files
committed
Make pbft broadcast timeout configurable
Add a new parameter general.broadcastTimeout for pbft broadcast progress. Default set to 1s. Add test for pbft config setting and overriding. Change-Id: I72acf90be5a5b27cf0c45e249609216e63a7e007 Signed-off-by: jiangyaoguo <[email protected]>
1 parent 8ea25a9 commit d74b1c5

File tree

6 files changed

+188
-21
lines changed

6 files changed

+188
-21
lines changed

consensus/pbft/batch.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatc
8484
op.pbft = newPbftCore(id, config, op, etf)
8585
op.manager.Start()
8686
op.externalEventReceiver.manager = op.manager
87-
op.broadcaster = newBroadcaster(id, op.pbft.N, op.pbft.f, stack)
87+
op.broadcaster = newBroadcaster(id, op.pbft.N, op.pbft.f, op.pbft.broadcastTimeout, stack)
8888

8989
op.batchSize = config.GetInt("general.batchsize")
9090
op.batchStore = nil

consensus/pbft/broadcast.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -33,26 +33,28 @@ type communicator interface {
3333
type broadcaster struct {
3434
comm communicator
3535

36-
f int
37-
msgChans map[uint64]chan *sendRequest
38-
closed sync.WaitGroup
39-
closedCh chan struct{}
36+
f int
37+
broadcastTimeout time.Duration
38+
msgChans map[uint64]chan *sendRequest
39+
closed sync.WaitGroup
40+
closedCh chan struct{}
4041
}
4142

4243
type sendRequest struct {
4344
msg *pb.Message
4445
done chan bool
4546
}
4647

47-
func newBroadcaster(self uint64, N int, f int, c communicator) *broadcaster {
48+
func newBroadcaster(self uint64, N int, f int, broadcastTimeout time.Duration, c communicator) *broadcaster {
4849
queueSize := 10 // XXX increase after testing
4950

5051
chans := make(map[uint64]chan *sendRequest)
5152
b := &broadcaster{
52-
comm: c,
53-
f: f,
54-
msgChans: chans,
55-
closedCh: make(chan struct{}),
53+
comm: c,
54+
f: f,
55+
broadcastTimeout: broadcastTimeout,
56+
msgChans: chans,
57+
closedCh: make(chan struct{}),
5658
}
5759
for i := 0; i < N; i++ {
5860
if uint64(i) == self {
@@ -172,7 +174,7 @@ func (b *broadcaster) send(msg *pb.Message, dest *uint64) error {
172174
}
173175

174176
succeeded := 0
175-
timer := time.NewTimer(time.Second) // TODO, make this configurable
177+
timer := time.NewTimer(b.broadcastTimeout)
176178

177179
// This loop will try to send, until one of:
178180
// a) the required number of sends succeed

consensus/pbft/broadcast_test.go

+40-5
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func TestBroadcast(t *testing.T) {
6969
}
7070
}()
7171

72-
b := newBroadcaster(1, 4, 1, m)
72+
b := newBroadcaster(1, 4, 1, time.Second, m)
7373

7474
msg := &pb.Message{Payload: []byte("hi")}
7575
b.Broadcast(msg)
@@ -123,7 +123,7 @@ func TestBroadcastStuck(t *testing.T) {
123123
}
124124
}()
125125

126-
b := newBroadcaster(1, 4, 1, m)
126+
b := newBroadcaster(1, 4, 1, time.Second, m)
127127

128128
maxc := 20
129129
for c := 0; c < maxc; c++ {
@@ -168,7 +168,7 @@ func TestBroadcastUnicast(t *testing.T) {
168168
}
169169
}()
170170

171-
b := newBroadcaster(1, 4, 1, m)
171+
b := newBroadcaster(1, 4, 1, time.Second, m)
172172

173173
msg := &pb.Message{Payload: []byte("hi")}
174174
b.Unicast(msg, 0)
@@ -206,7 +206,7 @@ func TestBroadcastAllFail(t *testing.T) {
206206
done: make(chan struct{}),
207207
}
208208

209-
b := newBroadcaster(1, 4, 1, m)
209+
b := newBroadcaster(1, 4, 1, time.Second, m)
210210

211211
maxc := 20
212212
for c := 0; c < maxc; c++ {
@@ -228,6 +228,41 @@ func TestBroadcastAllFail(t *testing.T) {
228228
}
229229
}
230230

231+
func TestBroadcastTimeout(t *testing.T) {
232+
expectTime := 10 * time.Second
233+
deltaTime := 50 * time.Millisecond
234+
m := &mockIndefinitelyStuckComm{
235+
mockComm: mockComm{
236+
self: 1,
237+
n: 4,
238+
msgCh: make(chan mockMsg),
239+
},
240+
done: make(chan struct{}),
241+
}
242+
243+
b := newBroadcaster(1, 4, 1, expectTime, m)
244+
broadcastDone := make(chan time.Time)
245+
246+
beginTime := time.Now()
247+
go func() {
248+
b.Broadcast(&pb.Message{Payload: []byte(fmt.Sprintf("%d", 1))})
249+
broadcastDone <- time.Now()
250+
}()
251+
252+
checkTime := expectTime + deltaTime
253+
select {
254+
case endTime := <-broadcastDone:
255+
t.Log("Broadcast consume time: ", endTime.Sub(beginTime))
256+
close(broadcastDone)
257+
close(m.done)
258+
return
259+
case <-time.After(checkTime):
260+
close(broadcastDone)
261+
close(m.done)
262+
t.Fatalf("Broadcast timeout after %v, expected %v", checkTime, expectTime)
263+
}
264+
}
265+
231266
type mockIndefinitelyStuckComm struct {
232267
mockComm
233268
done chan struct{}
@@ -250,7 +285,7 @@ func TestBroadcastIndefinitelyStuck(t *testing.T) {
250285
done: make(chan struct{}),
251286
}
252287

253-
b := newBroadcaster(1, 4, 1, m)
288+
b := newBroadcaster(1, 4, 1, time.Second, m)
254289

255290
broadcastDone := make(chan struct{})
256291

consensus/pbft/config.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ general:
5959
# Interval to send "keep-alive" null requests. Set to 0 to disable. If enabled, must be greater than request timeout
6060
nullrequest: 0s
6161

62+
# How long may a message broadcast take.
63+
broadcast: 1s
64+
6265
################################################################################
6366
#
6467
# SECTION: EXECUTOR

consensus/pbft/pbft-core.go

+6
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ type pbftCore struct {
155155
newViewTimeout time.Duration // progress timeout for new views
156156
newViewTimerReason string // what triggered the timer
157157
lastNewViewTimeout time.Duration // last timeout we used during this view change
158+
broadcastTimeout time.Duration // progress timeout for broadcast
158159
outstandingReqBatches map[string]*RequestBatch // track whether we are waiting for request batches to execute
159160

160161
nullRequestTimer events.Timer // timeout triggering a null request
@@ -255,6 +256,10 @@ func newPbftCore(id uint64, config *viper.Viper, consumer innerStack, etf events
255256
if err != nil {
256257
instance.nullRequestTimeout = 0
257258
}
259+
instance.broadcastTimeout, err = time.ParseDuration(config.GetString("general.timeout.broadcast"))
260+
if err != nil {
261+
panic(fmt.Errorf("Cannot parse new broadcast timeout: %s", err))
262+
}
258263

259264
instance.activeView = true
260265
instance.replicaCount = instance.N
@@ -266,6 +271,7 @@ func newPbftCore(id uint64, config *viper.Viper, consumer innerStack, etf events
266271
logger.Infof("PBFT request timeout = %v", instance.requestTimeout)
267272
logger.Infof("PBFT view change timeout = %v", instance.newViewTimeout)
268273
logger.Infof("PBFT Checkpoint period (K) = %v", instance.K)
274+
logger.Infof("PBFT broadcast timeout = %v", instance.broadcastTimeout)
269275
logger.Infof("PBFT Log multiplier = %v", instance.logMultiplier)
270276
logger.Infof("PBFT log size (L) = %v", instance.L)
271277
if instance.nullRequestTimeout > 0 {

consensus/pbft/pbft-core_test.go

+126-5
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,41 @@ func init() {
3636
logging.SetLevel(logging.DEBUG, "")
3737
}
3838

39+
func TestConfigSet(t *testing.T) {
40+
config := loadConfig()
41+
42+
testKeys := []string{
43+
"general.mode",
44+
"general.N",
45+
"general.f",
46+
"general.K",
47+
"general.logmultiplier",
48+
"general.batchsize",
49+
"general.byzantine",
50+
"general.viewchangeperiod",
51+
"general.timeout.batch",
52+
"general.timeout.request",
53+
"general.timeout.viewchange",
54+
"general.timeout.resendviewchange",
55+
"general.timeout.nullrequest",
56+
"general.timeout.broadcast",
57+
"executor.queuesize",
58+
}
59+
60+
for _, key := range testKeys {
61+
if ok := config.IsSet(key); !ok {
62+
t.Errorf("Cannot test env override because \"%s\" does not seem to be set", key)
63+
}
64+
}
65+
}
66+
3967
func TestEnvOverride(t *testing.T) {
4068
config := loadConfig()
4169

4270
key := "general.mode" // for a key that exists
4371
envName := "CORE_PBFT_GENERAL_MODE" // env override name
4472
overrideValue := "overide_test" // value to override default value with
4573

46-
// test key
47-
if ok := config.IsSet("general.mode"); !ok {
48-
t.Fatalf("Cannot test env override because \"%s\" does not seem to be set", key)
49-
}
50-
5174
os.Setenv(envName, overrideValue)
5275
// The override config value will cause other calls to fail unless unset.
5376
defer func() {
@@ -66,6 +89,104 @@ func TestEnvOverride(t *testing.T) {
6689

6790
}
6891

92+
func TestIntEnvOverride(t *testing.T) {
93+
config := loadConfig()
94+
95+
tests := []struct {
96+
key string
97+
envName string
98+
overrideValue string
99+
expectValue int
100+
}{
101+
{"general.N", "CORE_PBFT_GENERAL_N", "8", 8},
102+
{"general.f", "CORE_PBFT_GENERAL_F", "2", 2},
103+
{"general.K", "CORE_PBFT_GENERAL_K", "20", 20},
104+
{"general.logmultiplier", "CORE_PBFT_GENERAL_LOGMULTIPLIER", "6", 6},
105+
{"general.batchsize", "CORE_PBFT_GENERAL_BATCHSIZE", "200", 200},
106+
{"general.viewchangeperiod", "CORE_PBFT_GENERAL_VIEWCHANGEPERIOD", "5", 5},
107+
{"executor.queuesize", "CORE_PBFT_EXECUTOR_QUEUESIZE", "50", 50},
108+
}
109+
110+
for _, test := range tests {
111+
os.Setenv(test.envName, test.overrideValue)
112+
113+
if ok := config.IsSet(test.key); !ok {
114+
t.Errorf("Env override in place, and key \"%s\" is not set", test.key)
115+
}
116+
117+
configVal := config.GetInt(test.key)
118+
if configVal != test.expectValue {
119+
t.Errorf("Env override in place, expected key \"%s\" to be \"%v\" but instead got \"%d\"", test.key, test.expectValue, configVal)
120+
}
121+
122+
os.Unsetenv(test.envName)
123+
}
124+
}
125+
126+
func TestDurationEnvOverride(t *testing.T) {
127+
config := loadConfig()
128+
129+
tests := []struct {
130+
key string
131+
envName string
132+
overrideValue string
133+
expectValue time.Duration
134+
}{
135+
{"general.timeout.batch", "CORE_PBFT_GENERAL_TIMEOUT_BATCH", "2s", 2 * time.Second},
136+
{"general.timeout.request", "CORE_PBFT_GENERAL_TIMEOUT_REQUEST", "4s", 4 * time.Second},
137+
{"general.timeout.viewchange", "CORE_PBFT_GENERAL_TIMEOUT_VIEWCHANGE", "5s", 5 * time.Second},
138+
{"general.timeout.resendviewchange", "CORE_PBFT_GENERAL_TIMEOUT_RESENDVIEWCHANGE", "200ms", 200 * time.Millisecond},
139+
{"general.timeout.nullrequest", "CORE_PBFT_GENERAL_TIMEOUT_NULLREQUEST", "1s", time.Second},
140+
{"general.timeout.broadcast", "CORE_PBFT_GENERAL_TIMEOUT_BROADCAST", "1m", time.Minute},
141+
}
142+
143+
for _, test := range tests {
144+
os.Setenv(test.envName, test.overrideValue)
145+
146+
if ok := config.IsSet(test.key); !ok {
147+
t.Errorf("Env override in place, and key \"%s\" is not set", test.key)
148+
}
149+
150+
configVal := config.GetDuration(test.key)
151+
if configVal != test.expectValue {
152+
t.Errorf("Env override in place, expected key \"%s\" to be \"%v\" but instead got \"%v\"", test.key, test.expectValue, configVal)
153+
}
154+
155+
os.Unsetenv(test.envName)
156+
}
157+
}
158+
159+
func TestBoolEnvOverride(t *testing.T) {
160+
config := loadConfig()
161+
162+
tests := []struct {
163+
key string
164+
envName string
165+
overrideValue string
166+
expectValue bool
167+
}{
168+
{"general.byzantine", "CORE_PBFT_GENERAL_BYZANTINE", "false", false},
169+
{"general.byzantine", "CORE_PBFT_GENERAL_BYZANTINE", "0", false},
170+
{"general.byzantine", "CORE_PBFT_GENERAL_BYZANTINE", "true", true},
171+
{"general.byzantine", "CORE_PBFT_GENERAL_BYZANTINE", "1", true},
172+
}
173+
174+
for i, test := range tests {
175+
os.Setenv(test.envName, test.overrideValue)
176+
177+
if ok := config.IsSet(test.key); !ok {
178+
t.Errorf("Env override in place, and key \"%s\" is not set", test.key)
179+
}
180+
181+
configVal := config.GetBool(test.key)
182+
if configVal != test.expectValue {
183+
t.Errorf("Test %d Env override in place, expected key \"%s\" to be \"%v\" but instead got \"%v\"", i, test.key, test.expectValue, configVal)
184+
}
185+
186+
os.Unsetenv(test.envName)
187+
}
188+
}
189+
69190
func TestMaliciousPrePrepare(t *testing.T) {
70191
mock := &omniProto{
71192
broadcastImpl: func(msgPayload []byte) {

0 commit comments

Comments
 (0)