Skip to content

Commit 8b63a26

Browse files
committed
add ability to unregister for events
dropping the grpc connection was the only method of clearing out event interest registration. As use of events increases, the ability for the sdk and clients to drop interest individual events will be a performance concern. Short lived and one time interests will be common. Change-Id: Ib16405c7b919ce244f3ea3f07a9bbaec48c5fa26 Signed-off-by: Patrick Mullaney <[email protected]>
1 parent d48a1c2 commit 8b63a26

File tree

8 files changed

+301
-72
lines changed

8 files changed

+301
-72
lines changed

events/consumer/consumer.go

+91-5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package consumer
1919
import (
2020
"fmt"
2121
"io"
22+
"sync"
2223
"time"
2324

2425
"golang.org/x/net/context"
@@ -30,14 +31,24 @@ import (
3031

3132
//EventsClient holds the stream and adapter for consumer to work with
3233
type EventsClient struct {
34+
sync.RWMutex
3335
peerAddress string
36+
regTimeout time.Duration
3437
stream ehpb.Events_ChatClient
3538
adapter EventAdapter
3639
}
3740

3841
//NewEventsClient Returns a new grpc.ClientConn to the configured local PEER.
39-
func NewEventsClient(peerAddress string, adapter EventAdapter) *EventsClient {
40-
return &EventsClient{peerAddress, nil, adapter}
42+
func NewEventsClient(peerAddress string, regTimeout time.Duration, adapter EventAdapter) (*EventsClient, error) {
43+
var err error
44+
if regTimeout < 100*time.Millisecond {
45+
regTimeout = 100 * time.Millisecond
46+
err = fmt.Errorf("regTimeout >= 0, setting to 100 msec")
47+
} else if regTimeout > 60*time.Second {
48+
regTimeout = 60 * time.Second
49+
err = fmt.Errorf("regTimeout > 60, setting to 60 sec")
50+
}
51+
return &EventsClient{sync.RWMutex{}, peerAddress, regTimeout, nil, adapter}, err
4152
}
4253

4354
//newEventsClientConnectionWithAddress Returns a new grpc.ClientConn to the configured local PEER.
@@ -48,11 +59,26 @@ func newEventsClientConnectionWithAddress(peerAddress string) (*grpc.ClientConn,
4859
return comm.NewClientConnectionWithAddress(peerAddress, true, false, nil)
4960
}
5061

51-
func (ec *EventsClient) register(ies []*ehpb.Interest) error {
62+
func (ec *EventsClient) send(emsg *ehpb.Event) error {
63+
ec.Lock()
64+
defer ec.Unlock()
65+
return ec.stream.Send(emsg)
66+
}
67+
68+
// RegisterAsync - registers interest in a event and doesn't wait for a response
69+
func (ec *EventsClient) RegisterAsync(ies []*ehpb.Interest) error {
5270
emsg := &ehpb.Event{Event: &ehpb.Event_Register{Register: &ehpb.Register{Events: ies}}}
5371
var err error
54-
if err = ec.stream.Send(emsg); err != nil {
72+
if err = ec.send(emsg); err != nil {
5573
fmt.Printf("error on Register send %s\n", err)
74+
}
75+
return err
76+
}
77+
78+
// register - registers interest in a event
79+
func (ec *EventsClient) register(ies []*ehpb.Interest) error {
80+
var err error
81+
if err = ec.RegisterAsync(ies); err != nil {
5682
return err
5783
}
5884

@@ -74,12 +100,72 @@ func (ec *EventsClient) register(ies []*ehpb.Interest) error {
74100
}()
75101
select {
76102
case <-regChan:
77-
case <-time.After(5 * time.Second):
103+
case <-time.After(ec.regTimeout):
78104
err = fmt.Errorf("timeout waiting for registration")
79105
}
80106
return err
81107
}
82108

109+
// UnregisterAsync - Unregisters interest in a event and doesn't wait for a response
110+
func (ec *EventsClient) UnregisterAsync(ies []*ehpb.Interest) error {
111+
emsg := &ehpb.Event{Event: &ehpb.Event_Unregister{Unregister: &ehpb.Unregister{Events: ies}}}
112+
var err error
113+
if err = ec.send(emsg); err != nil {
114+
err = fmt.Errorf("error on unregister send %s\n", err)
115+
}
116+
117+
return err
118+
}
119+
120+
// unregister - unregisters interest in a event
121+
func (ec *EventsClient) unregister(ies []*ehpb.Interest) error {
122+
var err error
123+
if err = ec.UnregisterAsync(ies); err != nil {
124+
return err
125+
}
126+
127+
regChan := make(chan struct{})
128+
go func() {
129+
defer close(regChan)
130+
in, inerr := ec.stream.Recv()
131+
if inerr != nil {
132+
err = inerr
133+
return
134+
}
135+
switch in.Event.(type) {
136+
case *ehpb.Event_Unregister:
137+
case nil:
138+
err = fmt.Errorf("invalid nil object for unregister")
139+
default:
140+
err = fmt.Errorf("invalid unregistration object")
141+
}
142+
}()
143+
select {
144+
case <-regChan:
145+
case <-time.After(ec.regTimeout):
146+
err = fmt.Errorf("timeout waiting for unregistration")
147+
}
148+
return err
149+
}
150+
151+
// Recv recieves next event - use when client has not called Start
152+
func (ec *EventsClient) Recv() (*ehpb.Event, error) {
153+
in, err := ec.stream.Recv()
154+
if err == io.EOF {
155+
// read done.
156+
if ec.adapter != nil {
157+
ec.adapter.Disconnected(nil)
158+
}
159+
return nil, err
160+
}
161+
if err != nil {
162+
if ec.adapter != nil {
163+
ec.adapter.Disconnected(err)
164+
}
165+
return nil, err
166+
}
167+
return in, nil
168+
}
83169
func (ec *EventsClient) processEvents() error {
84170
defer ec.stream.CloseSend()
85171
for {

events/events_test.go

+105-22
Original file line numberDiff line numberDiff line change
@@ -47,30 +47,30 @@ func (a *Adapter) GetInterestedEvents() ([]*ehpb.Interest, error) {
4747
return []*ehpb.Interest{
4848
&ehpb.Interest{EventType: ehpb.EventType_BLOCK},
4949
&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeID: "0xffffffff", EventName: "event1"}}},
50-
&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeID: "0xffffffff", EventName: ""}}},
50+
&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeID: "0xffffffff", EventName: "event2"}}},
5151
}, nil
5252
//return []*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_BLOCK}}, nil
5353
}
5454

55+
func (a *Adapter) updateCountNotify() {
56+
a.Lock()
57+
a.count--
58+
if a.count <= 0 {
59+
a.notfy <- struct{}{}
60+
}
61+
a.Unlock()
62+
}
63+
5564
func (a *Adapter) Recv(msg *ehpb.Event) (bool, error) {
56-
//fmt.Printf("Adapter received %+v\n", msg.Event)
5765
switch x := msg.Event.(type) {
58-
case *ehpb.Event_Block:
59-
case *ehpb.Event_ChaincodeEvent:
66+
case *ehpb.Event_Block, *ehpb.Event_ChaincodeEvent, *ehpb.Event_Register, *ehpb.Event_Unregister:
67+
a.updateCountNotify()
6068
case nil:
6169
// The field is not set.
62-
fmt.Printf("event not set\n")
6370
return false, fmt.Errorf("event not set")
6471
default:
65-
fmt.Printf("unexpected type %T\n", x)
6672
return false, fmt.Errorf("unexpected type %T", x)
6773
}
68-
a.Lock()
69-
a.count--
70-
if a.count <= 0 {
71-
a.notfy <- struct{}{}
72-
}
73-
a.Unlock()
7474
return true, nil
7575
}
7676

@@ -107,17 +107,13 @@ func TestReceiveMessage(t *testing.T) {
107107
t.Logf("Error sending message %s", err)
108108
}
109109

110-
//receive 2 messages
111-
for i := 0; i < 2; i++ {
112-
select {
113-
case <-adapter.notfy:
114-
case <-time.After(5 * time.Second):
115-
t.Fail()
116-
t.Logf("timed out on messge")
117-
}
110+
select {
111+
case <-adapter.notfy:
112+
case <-time.After(2 * time.Second):
113+
t.Fail()
114+
t.Logf("timed out on messge")
118115
}
119116
}
120-
121117
func TestReceiveAnyMessage(t *testing.T) {
122118
var err error
123119

@@ -144,6 +140,42 @@ func TestReceiveAnyMessage(t *testing.T) {
144140
}
145141
}
146142
}
143+
func TestReceiveCCWildcard(t *testing.T) {
144+
var err error
145+
146+
adapter.count = 1
147+
obcEHClient.RegisterAsync([]*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeID: "0xffffffff", EventName: ""}}}})
148+
149+
select {
150+
case <-adapter.notfy:
151+
case <-time.After(2 * time.Second):
152+
t.Fail()
153+
t.Logf("timed out on messge")
154+
}
155+
156+
adapter.count = 1
157+
emsg := createTestChaincodeEvent("0xffffffff", "wildcardevent")
158+
if err = producer.Send(emsg); err != nil {
159+
t.Fail()
160+
t.Logf("Error sending message %s", err)
161+
}
162+
163+
select {
164+
case <-adapter.notfy:
165+
case <-time.After(2 * time.Second):
166+
t.Fail()
167+
t.Logf("timed out on messge")
168+
}
169+
adapter.count = 1
170+
obcEHClient.UnregisterAsync([]*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeID: "0xffffffff", EventName: ""}}}})
171+
172+
select {
173+
case <-adapter.notfy:
174+
case <-time.After(2 * time.Second):
175+
t.Fail()
176+
t.Logf("timed out on messge")
177+
}
178+
}
147179

148180
func TestFailReceive(t *testing.T) {
149181
var err error
@@ -163,6 +195,56 @@ func TestFailReceive(t *testing.T) {
163195
}
164196
}
165197

198+
func TestUnregister(t *testing.T) {
199+
var err error
200+
obcEHClient.RegisterAsync([]*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeID: "0xffffffff", EventName: "event10"}}}})
201+
202+
adapter.count = 1
203+
select {
204+
case <-adapter.notfy:
205+
case <-time.After(2 * time.Second):
206+
t.Fail()
207+
t.Logf("timed out on messge")
208+
}
209+
210+
emsg := createTestChaincodeEvent("0xffffffff", "event10")
211+
if err = producer.Send(emsg); err != nil {
212+
t.Fail()
213+
t.Logf("Error sending message %s", err)
214+
}
215+
216+
adapter.count = 1
217+
select {
218+
case <-adapter.notfy:
219+
case <-time.After(2 * time.Second):
220+
t.Fail()
221+
t.Logf("timed out on messge")
222+
}
223+
obcEHClient.UnregisterAsync([]*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeID: "0xffffffff", EventName: "event10"}}}})
224+
adapter.count = 1
225+
select {
226+
case <-adapter.notfy:
227+
case <-time.After(2 * time.Second):
228+
t.Fail()
229+
t.Logf("should have received unreg")
230+
}
231+
232+
adapter.count = 1
233+
emsg = createTestChaincodeEvent("0xffffffff", "event10")
234+
if err = producer.Send(emsg); err != nil {
235+
t.Fail()
236+
t.Logf("Error sending message %s", err)
237+
}
238+
239+
select {
240+
case <-adapter.notfy:
241+
t.Fail()
242+
t.Logf("should NOT have received event1")
243+
case <-time.After(5 * time.Second):
244+
}
245+
246+
}
247+
166248
func BenchmarkMessages(b *testing.B) {
167249
numMessages := 10000
168250

@@ -220,9 +302,10 @@ func TestMain(m *testing.M) {
220302
fmt.Printf("Starting events server\n")
221303
go grpcServer.Serve(lis)
222304

305+
var regTimeout = 5 * time.Second
223306
done := make(chan struct{})
224307
adapter = &Adapter{notfy: done}
225-
obcEHClient = consumer.NewEventsClient(peerAddress, adapter)
308+
obcEHClient, _ = consumer.NewEventsClient(peerAddress, regTimeout, adapter)
226309
if err = obcEHClient.Start(); err != nil {
227310
fmt.Printf("could not start chat %s\n", err)
228311
obcEHClient.Stop()

events/producer/events.go

-4
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,18 @@ import (
3434
//and the big lock should have no performance impact
3535
//
3636
type handlerList interface {
37-
//find() *handler
3837
add(ie *pb.Interest, h *handler) (bool, error)
3938
del(ie *pb.Interest, h *handler) (bool, error)
4039
foreach(ie *pb.Event, action func(h *handler))
4140
}
4241

4342
type genericHandlerList struct {
4443
sync.RWMutex
45-
// this map used as a list - add/del/iterate
4644
handlers map[*handler]bool
4745
}
4846

4947
type chaincodeHandlerList struct {
5048
sync.RWMutex
51-
// this map used as a list - add/del/iterate
5249
handlers map[string]map[string]map[*handler]bool
5350
}
5451

@@ -113,7 +110,6 @@ func (hl *chaincodeHandlerList) del(ie *pb.Interest, h *handler) (bool, error) {
113110
//the handler is not registered for the event type
114111
return false, fmt.Errorf("handler not registered for event name %s for chaincode ID %s", ie.GetChaincodeRegInfo().EventName, ie.GetChaincodeRegInfo().ChaincodeID)
115112
}
116-
117113
//remove the handler from the map
118114
delete(handlerMap, h)
119115

0 commit comments

Comments
 (0)