Skip to content

Commit 506e786

Browse files
committed
Fix incorrect channel register
Before receiving a hello message from fanin message channel, we can't get peerID of fanin message channel. So RegisterChannel always gets RegisterChannel(nil, channel). PeerID here is supposed to do something like session management (eg. duplicated peer connection check). But at this time we can't get peerID. The session management is done by class "Impl" with RegisterHandler and DeregisterHandler. Here PeerID is unnecessary, and it will always be nil. Incorrect warning of "Received duplicate connection from nil, switching to new connection" will be emitted everytime there is a new connection. Change-Id: I9f1bf6287576497acefb46a543fd4ac62d062665 Signed-off-by: jiangyaoguo <[email protected]>
1 parent 0f959c0 commit 506e786

File tree

3 files changed

+18
-22
lines changed

3 files changed

+18
-22
lines changed

consensus/helper/handler.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,8 @@ func NewConsensusHandler(coord peer.MessageHandlerCoordinator,
6969
consensusQueueSize = DefaultConsensusQueueSize
7070
}
7171

72-
pe, _ := handler.To()
73-
7472
handler.consenterChan = make(chan *util.Message, consensusQueueSize)
75-
getEngineImpl().consensusFan.RegisterChannel(pe.ID, handler.consenterChan)
73+
getEngineImpl().consensusFan.AddFaninChannel(handler.consenterChan)
7674

7775
return handler, nil
7876
}

consensus/util/messagefan.go

+15-12
Original file line numberDiff line numberDiff line change
@@ -38,43 +38,46 @@ type Message struct {
3838

3939
// MessageFan contains the reference to the peer's MessageHandlerCoordinator
4040
type MessageFan struct {
41-
ins map[*pb.PeerID]<-chan *Message
41+
ins []<-chan *Message
4242
out chan *Message
4343
lock sync.Mutex
4444
}
4545

4646
// NewMessageFan will return an initialized MessageFan
4747
func NewMessageFan() *MessageFan {
4848
return &MessageFan{
49-
ins: make(map[*pb.PeerID]<-chan *Message),
49+
ins: []<-chan *Message{},
5050
out: make(chan *Message),
5151
}
5252
}
5353

54-
// RegisterChannel is intended to be invoked by Handler to add a channel to be fan-ed in
55-
func (fan *MessageFan) RegisterChannel(sender *pb.PeerID, channel <-chan *Message) {
54+
// AddFaninChannel is intended to be invoked by Handler to add a channel to be fan-ed in
55+
func (fan *MessageFan) AddFaninChannel(channel <-chan *Message) {
5656
fan.lock.Lock()
5757
defer fan.lock.Unlock()
5858

59-
if _, ok := fan.ins[sender]; ok {
60-
logger.Warningf("Received duplicate connection from %v, switching to new connection", sender)
61-
} else {
62-
logger.Infof("Registering connection from %v", sender)
59+
for _, c := range fan.ins {
60+
if c == channel {
61+
logger.Warningf("Received duplicate connection")
62+
return
63+
}
6364
}
6465

65-
fan.ins[sender] = channel
66+
fan.ins = append(fan.ins, channel)
6667

6768
go func() {
6869
for msg := range channel {
6970
fan.out <- msg
7071
}
7172

72-
logger.Infof("Connection from peer %v terminated", sender)
73-
7473
fan.lock.Lock()
7574
defer fan.lock.Unlock()
7675

77-
delete(fan.ins, sender)
76+
for i, c := range fan.ins {
77+
if c == channel {
78+
fan.ins = append(fan.ins[:i], fan.ins[i+1:]...)
79+
}
80+
}
7881
}()
7982
}
8083

consensus/util/messagefan_test.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,8 @@ limitations under the License.
1717
package util
1818

1919
import (
20-
"fmt"
2120
"testing"
2221
"time"
23-
24-
pb "github.com/hyperledger/fabric/protos"
2522
)
2623

2724
func TestFanIn(t *testing.T) {
@@ -32,8 +29,7 @@ func TestFanIn(t *testing.T) {
3229

3330
for i := 0; i < Channels; i++ {
3431
c := make(chan *Message, Messages/2)
35-
pid := &pb.PeerID{Name: fmt.Sprintf("%d", i)}
36-
fh.RegisterChannel(pid, c)
32+
fh.AddFaninChannel(c)
3733
go func() {
3834
for j := 0; j < Messages; j++ {
3935
c <- &Message{}
@@ -67,8 +63,7 @@ func TestFanIn(t *testing.T) {
6763
func TestFanChannelClose(t *testing.T) {
6864
fh := NewMessageFan()
6965
c := make(chan *Message)
70-
pid := &pb.PeerID{Name: "1"}
71-
fh.RegisterChannel(pid, c)
66+
fh.AddFaninChannel(c)
7267
close(c)
7368

7469
for i := 0; i < 100; i++ {

0 commit comments

Comments
 (0)