@@ -21,34 +21,39 @@ import (
21
21
22
22
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
23
23
"github.com/hyperledger/fabric/orderer/common/broadcastfilter"
24
+ "github.com/hyperledger/fabric/orderer/common/configtx"
24
25
"github.com/hyperledger/fabric/orderer/rawledger"
26
+
27
+ "github.com/golang/protobuf/proto"
25
28
)
26
29
27
30
type broadcastServer struct {
28
- queueSize int
29
- batchSize int
30
- batchTimeout time.Duration
31
- rl rawledger.Writer
32
- filter * broadcastfilter.RuleSet
33
- sendChan chan * ab.BroadcastMessage
34
- exitChan chan struct {}
31
+ queueSize int
32
+ batchSize int
33
+ batchTimeout time.Duration
34
+ rl rawledger.Writer
35
+ filter * broadcastfilter.RuleSet
36
+ configManager configtx.Manager
37
+ sendChan chan * ab.BroadcastMessage
38
+ exitChan chan struct {}
35
39
}
36
40
37
- func newBroadcastServer (queueSize , batchSize int , batchTimeout time.Duration , rl rawledger.Writer ) * broadcastServer {
38
- bs := newPlainBroadcastServer (queueSize , batchSize , batchTimeout , rl )
41
+ func newBroadcastServer (queueSize , batchSize int , batchTimeout time.Duration , rl rawledger.Writer , filters * broadcastfilter. RuleSet , configManager configtx. Manager ) * broadcastServer {
42
+ bs := newPlainBroadcastServer (queueSize , batchSize , batchTimeout , rl , filters , configManager )
39
43
go bs .main ()
40
44
return bs
41
45
}
42
46
43
- func newPlainBroadcastServer (queueSize , batchSize int , batchTimeout time.Duration , rl rawledger.Writer ) * broadcastServer {
47
+ func newPlainBroadcastServer (queueSize , batchSize int , batchTimeout time.Duration , rl rawledger.Writer , filters * broadcastfilter. RuleSet , configManager configtx. Manager ) * broadcastServer {
44
48
bs := & broadcastServer {
45
- queueSize : queueSize ,
46
- batchSize : batchSize ,
47
- batchTimeout : batchTimeout ,
48
- rl : rl ,
49
- filter : broadcastfilter .NewRuleSet ([]broadcastfilter.Rule {broadcastfilter .EmptyRejectRule , broadcastfilter .AcceptRule }),
50
- sendChan : make (chan * ab.BroadcastMessage ),
51
- exitChan : make (chan struct {}),
49
+ queueSize : queueSize ,
50
+ batchSize : batchSize ,
51
+ batchTimeout : batchTimeout ,
52
+ rl : rl ,
53
+ filter : filters ,
54
+ configManager : configManager ,
55
+ sendChan : make (chan * ab.BroadcastMessage ),
56
+ exitChan : make (chan struct {}),
52
57
}
53
58
return bs
54
59
}
@@ -59,41 +64,64 @@ func (bs *broadcastServer) halt() {
59
64
60
65
func (bs * broadcastServer ) main () {
61
66
var curBatch []* ab.BroadcastMessage
62
- outer:
67
+ var timer <- chan time.Time
68
+
69
+ cutBatch := func () {
70
+ bs .rl .Append (curBatch , nil )
71
+ curBatch = nil
72
+ timer = nil
73
+ }
74
+
63
75
for {
64
- timer := time .After (bs .batchTimeout )
65
- for {
66
- select {
67
- case msg := <- bs .sendChan :
68
- // The messages must be filtered a second time in case configuration has changed since the message was received
69
- action , _ := bs .filter .Apply (msg )
70
- switch action {
71
- case broadcastfilter .Accept :
72
- curBatch = append (curBatch , msg )
73
- if len (curBatch ) < bs .batchSize {
74
- continue
75
- }
76
+ select {
77
+ case msg := <- bs .sendChan :
78
+ // The messages must be filtered a second time in case configuration has changed since the message was received
79
+ action , _ := bs .filter .Apply (msg )
80
+ switch action {
81
+ case broadcastfilter .Accept :
82
+ curBatch = append (curBatch , msg )
83
+
84
+ if len (curBatch ) >= bs .batchSize {
76
85
logger .Debugf ("Batch size met, creating block" )
77
- case broadcastfilter .Forward :
78
- logger .Debugf ("Ignoring message because it was not accepted by a filter" )
79
- default :
80
- // TODO add support for other cases, unreachable for now
81
- logger .Fatalf ("NOT IMPLEMENTED YET" )
86
+ cutBatch ()
87
+ } else if len (curBatch ) == 1 {
88
+ // If this is the first request in a batch, start the batch timer
89
+ timer = time .After (bs .batchTimeout )
82
90
}
83
- case <- timer :
84
- if len (curBatch ) == 0 {
85
- continue outer
91
+ case broadcastfilter .Reconfigure :
92
+ // TODO, this is unmarshaling for a second time, we need a cleaner interface, maybe Apply returns a second arg with thing to put in the batch
93
+ newConfig := & ab.ConfigurationEnvelope {}
94
+ if err := proto .Unmarshal (msg .Data , newConfig ); err != nil {
95
+ logger .Errorf ("A change was flagged as configuration, but could not be unmarshaled: %v" , err )
96
+ continue
86
97
}
87
- logger .Debugf ("Batch timer expired, creating block" )
88
- case <- bs .exitChan :
89
- logger .Debugf ("Exiting" )
90
- return
98
+ err := bs .configManager .Apply (newConfig )
99
+ if err != nil {
100
+ logger .Warningf ("A configuration change made it through the ingress filter but could not be included in a batch: %v" , err )
101
+ continue
102
+ }
103
+
104
+ logger .Debugf ("Configuration change applied successfully, committing previous block and configuration block" )
105
+ cutBatch ()
106
+ bs .rl .Append ([]* ab.BroadcastMessage {msg }, nil )
107
+ case broadcastfilter .Reject :
108
+ fallthrough
109
+ case broadcastfilter .Forward :
110
+ logger .Debugf ("Ignoring message because it was not accepted by a filter" )
111
+ default :
112
+ logger .Fatalf ("Received an unknown rule response: %v" , action )
91
113
}
92
- break
114
+ case <- timer :
115
+ if len (curBatch ) == 0 {
116
+ logger .Warningf ("Batch timer expired with no pending requests, this might indicate a bug" )
117
+ continue
118
+ }
119
+ logger .Debugf ("Batch timer expired, creating block" )
120
+ cutBatch ()
121
+ case <- bs .exitChan :
122
+ logger .Debugf ("Exiting" )
123
+ return
93
124
}
94
-
95
- bs .rl .Append (curBatch , nil )
96
- curBatch = nil
97
125
}
98
126
}
99
127
@@ -139,6 +167,8 @@ func (b *broadcaster) queueBroadcastMessages(srv ab.AtomicBroadcast_BroadcastSer
139
167
action , _ := b .bs .filter .Apply (msg )
140
168
141
169
switch action {
170
+ case broadcastfilter .Reconfigure :
171
+ fallthrough
142
172
case broadcastfilter .Accept :
143
173
select {
144
174
case b .queue <- msg :
@@ -151,8 +181,7 @@ func (b *broadcaster) queueBroadcastMessages(srv ab.AtomicBroadcast_BroadcastSer
151
181
case broadcastfilter .Reject :
152
182
err = srv .Send (& ab.BroadcastResponse {ab .Status_BAD_REQUEST })
153
183
default :
154
- // TODO add support for other cases, unreachable for now
155
- logger .Fatalf ("NOT IMPLEMENTED YET" )
184
+ logger .Fatalf ("Unknown filter action :%v" , action )
156
185
}
157
186
158
187
if err != nil {
0 commit comments