Skip to content

Commit aa84135

Browse files
[FAB-2778] Msg store update
Update message store to automatically expire messages. Includes new go routine and expiration callback functions Change-Id: I7c4ec11c7392015e65de5d0553bd1e4c8aca3e5c Signed-off-by: Gennady Laventman <[email protected]>
1 parent 2663d8b commit aa84135

File tree

2 files changed

+294
-11
lines changed

2 files changed

+294
-11
lines changed

gossip/gossip/msgstore/msgs.go

+153-11
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@ package msgstore
1919
import (
2020
"sync"
2121

22+
"time"
23+
2224
"github.com/hyperledger/fabric/gossip/common"
2325
)
2426

27+
var noopLock = func() {}
28+
2529
// invalidationTrigger is invoked on each message that was invalidated because of a message addition
2630
// i.e: if add(0), add(1) was called one after the other, and the store has only {1} after the sequence of invocations
2731
// then the invalidation trigger on 0 was called when 1 was added.
@@ -30,7 +34,49 @@ type invalidationTrigger func(message interface{})
3034
// NewMessageStore returns a new MessageStore with the message replacing
3135
// policy and invalidation trigger passed.
3236
func NewMessageStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) MessageStore {
33-
return &messageStoreImpl{pol: pol, lock: &sync.RWMutex{}, messages: make([]*msg, 0), invTrigger: trigger}
37+
return newMsgStore(pol, trigger)
38+
}
39+
40+
// NewMessageStoreExpirable returns a new MessageStore with the message replacing
41+
// policy and invalidation trigger passed. It supports old message expiration after msgTTL, during expiration first external
42+
// lock taken, expiration callback invoked and external lock released. Callback and external lock can be nil.
43+
func NewMessageStoreExpirable(pol common.MessageReplacingPolicy, trigger invalidationTrigger, msgTTL time.Duration, externalLock func(), externalUnlock func(), externalExpire func(interface{})) MessageStore {
44+
store := newMsgStore(pol, trigger)
45+
46+
store.expirable = true
47+
store.msgTTL = msgTTL
48+
49+
if externalLock != nil {
50+
store.externalLock = externalLock
51+
}
52+
53+
if externalUnlock != nil {
54+
store.externalUnlock = externalUnlock
55+
}
56+
57+
if externalExpire != nil {
58+
store.expireMsgCallback = externalExpire
59+
}
60+
61+
go store.expirationRoutine()
62+
return store
63+
}
64+
65+
func newMsgStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) *messageStoreImpl {
66+
return &messageStoreImpl{
67+
pol: pol,
68+
messages: make([]*msg, 0),
69+
invTrigger: trigger,
70+
71+
expirable: false,
72+
externalLock: noopLock,
73+
externalUnlock: noopLock,
74+
expireMsgCallback: func(m interface{}) {},
75+
expiredCount: 0,
76+
77+
doneCh: make(chan struct{}),
78+
}
79+
3480
}
3581

3682
// MessageStore adds messages to an internal buffer.
@@ -44,22 +90,42 @@ type MessageStore interface {
4490
// returns true or false whether the message was added to the store
4591
Add(msg interface{}) bool
4692

93+
// Checks if message is valid for insertion to store
94+
// returns true or false whether the message can be added to the store
95+
CheckValid(msg interface{}) bool
96+
4797
// size returns the amount of messages in the store
4898
Size() int
4999

50100
// get returns all messages in the store
51101
Get() []interface{}
102+
103+
// Stop all associated go routines
104+
Stop()
52105
}
53106

54107
type messageStoreImpl struct {
55108
pol common.MessageReplacingPolicy
56-
lock *sync.RWMutex
109+
lock sync.RWMutex
57110
messages []*msg
58111
invTrigger invalidationTrigger
112+
113+
expirable bool
114+
msgTTL time.Duration
115+
expiredCount int
116+
117+
externalLock func()
118+
externalUnlock func()
119+
expireMsgCallback func(msg interface{})
120+
121+
doneCh chan struct{}
122+
stopOnce sync.Once
59123
}
60124

61125
type msg struct {
62-
data interface{}
126+
data interface{}
127+
created time.Time
128+
expired bool
63129
}
64130

65131
// add adds a message to the store
@@ -78,32 +144,108 @@ func (s *messageStoreImpl) Add(message interface{}) bool {
78144
s.messages = append(s.messages[:i], s.messages[i+1:]...)
79145
n--
80146
i--
81-
break
82-
default:
83-
break
84147
}
85148
}
86149

87-
s.messages = append(s.messages, &msg{data: message})
150+
s.messages = append(s.messages, &msg{data: message, created: time.Now()})
151+
return true
152+
}
153+
154+
// Checks if message is valid for insertion to store
155+
func (s *messageStoreImpl) CheckValid(message interface{}) bool {
156+
s.lock.RLock()
157+
defer s.lock.RUnlock()
158+
159+
for _, m := range s.messages {
160+
if s.pol(message, m.data) == common.MessageInvalidated {
161+
return false
162+
}
163+
}
88164
return true
89165
}
90166

91167
// size returns the amount of messages in the store
92168
func (s *messageStoreImpl) Size() int {
93169
s.lock.RLock()
94170
defer s.lock.RUnlock()
95-
return len(s.messages)
171+
return len(s.messages) - s.expiredCount
96172
}
97173

98174
// get returns all messages in the store
99175
func (s *messageStoreImpl) Get() []interface{} {
176+
res := make([]interface{}, 0)
177+
100178
s.lock.RLock()
101179
defer s.lock.RUnlock()
102180

181+
for _, msg := range s.messages {
182+
if !msg.expired {
183+
res = append(res, msg.data)
184+
}
185+
}
186+
return res
187+
}
188+
189+
func (s *messageStoreImpl) expireMessages() {
190+
s.externalLock()
191+
s.lock.Lock()
192+
defer s.lock.Unlock()
193+
defer s.externalUnlock()
194+
103195
n := len(s.messages)
104-
res := make([]interface{}, n)
105196
for i := 0; i < n; i++ {
106-
res[i] = s.messages[i].data
197+
m := s.messages[i]
198+
if !m.expired {
199+
if time.Since(m.created) > s.msgTTL {
200+
m.expired = true
201+
s.expireMsgCallback(m.data)
202+
s.expiredCount++
203+
}
204+
} else {
205+
if time.Since(m.created) > (s.msgTTL * 2) {
206+
s.messages = append(s.messages[:i], s.messages[i+1:]...)
207+
n--
208+
i--
209+
s.expiredCount--
210+
}
211+
212+
}
107213
}
108-
return res
214+
}
215+
216+
func (s *messageStoreImpl) needToExpire() bool {
217+
s.lock.RLock()
218+
defer s.lock.RUnlock()
219+
for _, msg := range s.messages {
220+
if !msg.expired && time.Since(msg.created) > s.msgTTL {
221+
return true
222+
} else if time.Since(msg.created) > (s.msgTTL * 2) {
223+
return true
224+
}
225+
}
226+
return false
227+
}
228+
229+
func (s *messageStoreImpl) expirationRoutine() {
230+
for {
231+
select {
232+
case <-s.doneCh:
233+
return
234+
case <-time.After(s.expirationCheckInterval()):
235+
if s.needToExpire() {
236+
s.expireMessages()
237+
}
238+
}
239+
}
240+
}
241+
242+
func (s *messageStoreImpl) Stop() {
243+
stopFunc := func() {
244+
close(s.doneCh)
245+
}
246+
s.stopOnce.Do(stopFunc)
247+
}
248+
249+
func (s *messageStoreImpl) expirationCheckInterval() time.Duration {
250+
return s.msgTTL / 100
109251
}

gossip/gossip/msgstore/msgs_test.go

+141
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"testing"
2323
"time"
2424

25+
"sync"
26+
2527
"github.com/hyperledger/fabric/gossip/common"
2628
"github.com/stretchr/testify/assert"
2729
)
@@ -51,6 +53,16 @@ func compareInts(this interface{}, that interface{}) common.InvalidationResult {
5153
return common.MessageInvalidated
5254
}
5355

56+
func nonReplaceInts(this interface{}, that interface{}) common.InvalidationResult {
57+
a := this.(int)
58+
b := that.(int)
59+
if a == b {
60+
return common.MessageInvalidated
61+
}
62+
63+
return common.MessageNoAction
64+
}
65+
5466
func TestSize(t *testing.T) {
5567
msgStore := NewMessageStore(alwaysNoAction, noopTrigger)
5668
msgStore.Add(0)
@@ -108,6 +120,7 @@ func TestNewMessagesInvalidated(t *testing.T) {
108120
}
109121

110122
func TestConcurrency(t *testing.T) {
123+
t.Parallel()
111124
stopFlag := int32(0)
112125
msgStore := NewMessageStore(compareInts, noopTrigger)
113126
looper := func(f func()) func() {
@@ -141,3 +154,131 @@ func TestConcurrency(t *testing.T) {
141154

142155
atomic.CompareAndSwapInt32(&stopFlag, 0, 1)
143156
}
157+
158+
func TestExpiration(t *testing.T) {
159+
t.Parallel()
160+
expired := make([]int, 0)
161+
msgTTL := time.Second * 3
162+
163+
msgStore := NewMessageStoreExpirable(nonReplaceInts, noopTrigger, msgTTL, nil, nil, func(m interface{}) {
164+
expired = append(expired, m.(int))
165+
})
166+
167+
for i := 0; i < 10; i++ {
168+
assert.True(t, msgStore.Add(i), "Adding", i)
169+
}
170+
171+
assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - first batch")
172+
173+
time.Sleep(time.Second * 2)
174+
175+
for i := 0; i < 10; i++ {
176+
assert.False(t, msgStore.CheckValid(i))
177+
assert.False(t, msgStore.Add(i))
178+
}
179+
180+
for i := 10; i < 20; i++ {
181+
assert.True(t, msgStore.CheckValid(i))
182+
assert.True(t, msgStore.Add(i))
183+
assert.False(t, msgStore.CheckValid(i))
184+
}
185+
assert.Equal(t, 20, msgStore.Size(), "Wrong number of items in store - second batch")
186+
187+
time.Sleep(time.Second * 2)
188+
189+
for i := 0; i < 20; i++ {
190+
assert.False(t, msgStore.Add(i))
191+
}
192+
193+
assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - after first batch expiration")
194+
assert.Equal(t, 10, len(expired), "Wrong number of expired msgs - after first batch expiration")
195+
196+
time.Sleep(time.Second * 4)
197+
198+
assert.Equal(t, 0, msgStore.Size(), "Wrong number of items in store - after second batch expiration")
199+
assert.Equal(t, 20, len(expired), "Wrong number of expired msgs - after second batch expiration")
200+
201+
for i := 0; i < 10; i++ {
202+
assert.True(t, msgStore.CheckValid(i))
203+
assert.True(t, msgStore.Add(i))
204+
assert.False(t, msgStore.CheckValid(i))
205+
}
206+
207+
assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - after second batch expiration and first banch re-added")
208+
209+
}
210+
211+
func TestExpirationConcurrency(t *testing.T) {
212+
t.Parallel()
213+
expired := make([]int, 0)
214+
msgTTL := time.Second * 3
215+
lock := &sync.RWMutex{}
216+
217+
msgStore := NewMessageStoreExpirable(nonReplaceInts, noopTrigger, msgTTL,
218+
func() {
219+
lock.Lock()
220+
},
221+
func() {
222+
lock.Unlock()
223+
},
224+
func(m interface{}) {
225+
expired = append(expired, m.(int))
226+
})
227+
228+
lock.Lock()
229+
for i := 0; i < 10; i++ {
230+
assert.True(t, msgStore.Add(i), "Adding", i)
231+
}
232+
assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - first batch")
233+
lock.Unlock()
234+
235+
time.Sleep(time.Second * 2)
236+
237+
lock.Lock()
238+
time.Sleep(time.Second * 2)
239+
240+
for i := 0; i < 10; i++ {
241+
assert.False(t, msgStore.Add(i))
242+
}
243+
244+
assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - after first batch expiration, external lock taken")
245+
assert.Equal(t, 0, len(expired), "Wrong number of expired msgs - after first batch expiration, external lock taken")
246+
lock.Unlock()
247+
248+
time.Sleep(time.Second * 1)
249+
250+
lock.Lock()
251+
for i := 0; i < 10; i++ {
252+
assert.False(t, msgStore.Add(i))
253+
}
254+
255+
assert.Equal(t, 0, msgStore.Size(), "Wrong number of items in store - after first batch expiration, expiration should run")
256+
assert.Equal(t, 10, len(expired), "Wrong number of expired msgs - after first batch expiration, expiration should run")
257+
258+
lock.Unlock()
259+
}
260+
261+
func TestStop(t *testing.T) {
262+
t.Parallel()
263+
expired := make([]int, 0)
264+
msgTTL := time.Second * 3
265+
266+
msgStore := NewMessageStoreExpirable(nonReplaceInts, noopTrigger, msgTTL, nil, nil, func(m interface{}) {
267+
expired = append(expired, m.(int))
268+
})
269+
270+
for i := 0; i < 10; i++ {
271+
assert.True(t, msgStore.Add(i), "Adding", i)
272+
}
273+
274+
assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - first batch")
275+
276+
msgStore.Stop()
277+
278+
time.Sleep(time.Second * 4)
279+
280+
assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - after first batch expiration, but store was stopped, so no expiration")
281+
assert.Equal(t, 0, len(expired), "Wrong number of expired msgs - after first batch expiration, but store was stopped, so no expiration")
282+
283+
msgStore.Stop()
284+
}

0 commit comments

Comments
 (0)