Skip to content

Commit b40cd9a

Browse files
committed
Gossip communication layer
This commit adds the gossip communication layer Change-Id: I371445a5b84ef0c0e0764050bf0901db2b246df8 Signed-off-by: Yacov Manevich <[email protected]>
1 parent 8249ddd commit b40cd9a

File tree

8 files changed

+1719
-3
lines changed

8 files changed

+1719
-3
lines changed

gossip/comm/comm_impl.go

+539
Large diffs are not rendered by default.

gossip/comm/comm_test.go

+460
Large diffs are not rendered by default.

gossip/comm/conn.go

+330
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package comm
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
"sync/atomic"
23+
24+
"github.com/hyperledger/fabric/gossip/proto"
25+
"github.com/hyperledger/fabric/gossip/util"
26+
"google.golang.org/grpc"
27+
)
28+
29+
type handler func(*proto.GossipMessage)
30+
31+
type connFactory interface {
32+
createConnection(endpoint string, pkiID PKIidType) (*connection, error)
33+
}
34+
35+
type connectionStore struct {
36+
logger *util.Logger // logger
37+
selfPKIid PKIidType // pkiID of this peer
38+
isClosing bool // whether this connection store is shutting down
39+
connFactory connFactory // creates a connection to remote peer
40+
sync.RWMutex // synchronize access to shared variables
41+
pki2Conn map[string]*connection // mapping between pkiID to connections
42+
destinationLocks map[string]*sync.RWMutex //mapping between pkiIDs and locks,
43+
// used to prevent concurrent connection establishment to the same remote endpoint
44+
}
45+
46+
func newConnStore(connFactory connFactory, pkiID PKIidType, logger *util.Logger) *connectionStore {
47+
return &connectionStore{
48+
connFactory: connFactory,
49+
isClosing: false,
50+
pki2Conn: make(map[string]*connection),
51+
selfPKIid: pkiID,
52+
destinationLocks: make(map[string]*sync.RWMutex),
53+
logger: logger,
54+
}
55+
}
56+
57+
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
58+
cs.RLock()
59+
isClosing := cs.isClosing
60+
cs.RUnlock()
61+
62+
if isClosing {
63+
return nil, fmt.Errorf("Shutting down")
64+
}
65+
66+
pkiID := peer.PKIID
67+
endpoint := peer.Endpoint
68+
69+
cs.Lock()
70+
destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]
71+
if !hasConnected {
72+
destinationLock = &sync.RWMutex{}
73+
cs.destinationLocks[string(pkiID)] = destinationLock
74+
}
75+
cs.Unlock()
76+
77+
destinationLock.Lock()
78+
79+
cs.RLock()
80+
conn, exists := cs.pki2Conn[string(pkiID)]
81+
if exists {
82+
cs.RUnlock()
83+
destinationLock.Unlock()
84+
return conn, nil
85+
}
86+
cs.RUnlock()
87+
88+
createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)
89+
90+
destinationLock.Unlock()
91+
92+
cs.Lock()
93+
delete(cs.destinationLocks, string(pkiID))
94+
defer cs.Unlock()
95+
96+
// check again, maybe someone connected to us during the connection creation?
97+
conn, exists = cs.pki2Conn[string(pkiID)]
98+
99+
if exists {
100+
if createdConnection != nil {
101+
createdConnection.close()
102+
}
103+
return conn, nil
104+
}
105+
106+
// no one connected to us AND we failed connecting!
107+
if err != nil {
108+
return nil, err
109+
}
110+
111+
// at this point in the code, we created a connection to a remote peer
112+
conn = createdConnection
113+
cs.pki2Conn[string(createdConnection.pkiID)] = conn
114+
115+
go conn.serviceInput()
116+
117+
return conn, nil
118+
}
119+
120+
func (cs *connectionStore) connNum() int {
121+
cs.RLock()
122+
defer cs.RUnlock()
123+
return len(cs.pki2Conn)
124+
}
125+
126+
func (cs *connectionStore) closeConn(peer *RemotePeer) {
127+
cs.Lock()
128+
defer cs.Unlock()
129+
130+
if conn, exists := cs.pki2Conn[string(peer.PKIID)]; exists {
131+
conn.close()
132+
delete(cs.pki2Conn, string(conn.pkiID))
133+
}
134+
}
135+
136+
func (cs *connectionStore) shutdown() {
137+
cs.Lock()
138+
cs.isClosing = true
139+
pkiIds2conn := cs.pki2Conn
140+
cs.Unlock()
141+
142+
wg := sync.WaitGroup{}
143+
for _, conn := range pkiIds2conn {
144+
wg.Add(1)
145+
go func(conn *connection) {
146+
cs.closeByPKIid(conn.pkiID)
147+
wg.Done()
148+
}(conn)
149+
}
150+
wg.Wait()
151+
}
152+
153+
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, pkiID PKIidType) *connection {
154+
cs.Lock()
155+
defer cs.Unlock()
156+
157+
if c, exists := cs.pki2Conn[string(pkiID)]; exists {
158+
c.close()
159+
}
160+
161+
return cs.registerConn(pkiID, serverStream)
162+
}
163+
164+
func (cs *connectionStore) registerConn(pkiID PKIidType, serverStream proto.Gossip_GossipStreamServer) *connection {
165+
conn := newConnection(nil, nil, nil, serverStream)
166+
conn.pkiID = pkiID
167+
conn.logger = cs.logger
168+
cs.pki2Conn[string(pkiID)] = conn
169+
return conn
170+
}
171+
172+
func (cs *connectionStore) closeByPKIid(pkiID PKIidType) {
173+
cs.Lock()
174+
defer cs.Unlock()
175+
if conn, exists := cs.pki2Conn[string(pkiID)]; exists {
176+
conn.close()
177+
delete(cs.pki2Conn, string(pkiID))
178+
}
179+
}
180+
181+
func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection {
182+
connection := &connection{
183+
cl: cl,
184+
conn: c,
185+
clientStream: cs,
186+
serverStream: ss,
187+
stopFlag: int32(0),
188+
stopChan: make(chan struct{}, 1),
189+
}
190+
191+
return connection
192+
}
193+
194+
type connection struct {
195+
logger *util.Logger // logger
196+
pkiID PKIidType // pkiID of the remote endpoint
197+
handler handler // function to invoke upon a message reception
198+
conn *grpc.ClientConn // gRPC connection to remote endpoint
199+
cl proto.GossipClient // gRPC stub of remote endpoint
200+
clientStream proto.Gossip_GossipStreamClient // client-side stream to remote endpoint
201+
serverStream proto.Gossip_GossipStreamServer // server-side stream to remote endpoint
202+
stopFlag int32 // indicates whether this connection is in process of stopping
203+
stopChan chan struct{} // a method to stop the server-side gRPC call from a different go-routine
204+
sync.RWMutex // synchronizes access to shared variables
205+
}
206+
207+
func (conn *connection) close() {
208+
if conn.toDie() {
209+
return
210+
}
211+
212+
amIFirst := atomic.CompareAndSwapInt32(&conn.stopFlag, int32(0), int32(1))
213+
if !amIFirst {
214+
return
215+
}
216+
217+
conn.stopChan <- struct{}{}
218+
219+
conn.Lock()
220+
221+
if conn.clientStream != nil {
222+
conn.clientStream.CloseSend()
223+
}
224+
if conn.conn != nil {
225+
conn.conn.Close()
226+
}
227+
228+
conn.Unlock()
229+
230+
}
231+
232+
func (conn *connection) toDie() bool {
233+
return atomic.LoadInt32(&(conn.stopFlag)) == int32(1)
234+
}
235+
236+
func (conn *connection) send(msg *proto.GossipMessage) error {
237+
conn.Lock()
238+
defer conn.Unlock()
239+
240+
if conn.toDie() {
241+
return fmt.Errorf("Connection aborted")
242+
}
243+
244+
if conn.clientStream != nil {
245+
return conn.clientStream.Send(msg)
246+
}
247+
248+
if conn.serverStream != nil {
249+
return conn.serverStream.Send(msg)
250+
}
251+
252+
return fmt.Errorf("Both streams are nil")
253+
}
254+
255+
func (conn *connection) serviceInput() error {
256+
errChan := make(chan error, 1)
257+
msgChan := make(chan *proto.GossipMessage, defRecvBuffSize)
258+
defer close(msgChan)
259+
260+
// Call stream.Recv() asynchronously in readFromStream(),
261+
// and wait for either the Recv() call to end,
262+
// or a signal to close the connection, which exits
263+
// the method and makes the Recv() call to fail in the
264+
// readFromStream() method
265+
go conn.readFromStream(errChan, msgChan)
266+
267+
for !conn.toDie() {
268+
select {
269+
case stop := <-conn.stopChan:
270+
conn.logger.Warning("Closing reading from stream")
271+
conn.stopChan <- stop
272+
return nil
273+
case err := <-errChan:
274+
return err
275+
case msg := <-msgChan:
276+
conn.handler(msg)
277+
}
278+
}
279+
return nil
280+
}
281+
282+
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.GossipMessage) {
283+
defer func() {
284+
recover()
285+
}() // msgChan might be closed
286+
for !conn.toDie() {
287+
stream := conn.getStream()
288+
if stream == nil {
289+
conn.logger.Error(conn.pkiID, "Stream is nil, aborting!")
290+
errChan <- fmt.Errorf("Stream is nil")
291+
return
292+
}
293+
msg, err := stream.Recv()
294+
if conn.toDie() {
295+
conn.logger.Warning(conn.pkiID, "canceling read because closing")
296+
return
297+
}
298+
if err != nil {
299+
errChan <- err
300+
conn.logger.Warning(conn.pkiID, "Got error, aborting:", err)
301+
return
302+
}
303+
msgChan <- msg
304+
}
305+
}
306+
307+
func (conn *connection) getStream() stream {
308+
conn.Lock()
309+
defer conn.Unlock()
310+
311+
if conn.clientStream != nil && conn.serverStream != nil {
312+
e := "Both client and server stream are not nil, something went wrong"
313+
conn.logger.Error(e)
314+
}
315+
316+
if conn.clientStream != nil {
317+
return conn.clientStream
318+
}
319+
320+
if conn.serverStream != nil {
321+
return conn.serverStream
322+
}
323+
324+
return nil
325+
}
326+
327+
type msgSending struct {
328+
msg *proto.GossipMessage
329+
onErr func(error)
330+
}

0 commit comments

Comments
 (0)