@@ -17,6 +17,8 @@ limitations under the License.
17
17
package kafka
18
18
19
19
import (
20
+ "time"
21
+
20
22
"github.com/Shopify/sarama"
21
23
"github.com/golang/protobuf/proto"
22
24
"github.com/hyperledger/fabric/orderer/localconfig"
@@ -93,6 +95,7 @@ func newChain(consenter testableConsenter, support multichain.ConsenterSupport)
93
95
consenter : consenter ,
94
96
support : support ,
95
97
partition : newChainPartition (support .ChainID (), rawPartition ),
98
+ batchTimeout : support .SharedConfig ().BatchTimeout (),
96
99
lastProcessed : sarama .OffsetOldest - 1 , // TODO This should be retrieved by ConsenterSupport; also see note in loop() below
97
100
producer : consenter .prodFunc ()(support .SharedConfig ().KafkaBrokers (), consenter .kafkaVersion (), consenter .retryOptions ()),
98
101
halted : false , // Redundant as the default value for booleans is false but added for readability
@@ -123,7 +126,9 @@ type chainImpl struct {
123
126
support multichain.ConsenterSupport
124
127
125
128
partition ChainPartition
129
+ batchTimeout time.Duration
126
130
lastProcessed int64
131
+ lastCutBlock uint64
127
132
128
133
producer Producer
129
134
consumer Consumer
@@ -199,46 +204,82 @@ func (ch *chainImpl) Enqueue(env *cb.Envelope) bool {
199
204
200
205
func (ch * chainImpl ) loop () {
201
206
msg := new (ab.KafkaMessage )
207
+ var timer <- chan time.Time
208
+ var ttcNumber uint64
202
209
203
210
defer close (ch .haltedChan )
204
211
defer ch .producer .Close ()
205
212
defer func () { ch .halted = true }()
206
213
defer ch .consumer .Close ()
207
214
208
- // TODO Add support for time-based block cutting
209
-
210
215
for {
211
216
select {
212
217
case in := <- ch .consumer .Recv ():
213
- logger .Debug ("Received:" , in )
214
218
if err := proto .Unmarshal (in .Value , msg ); err != nil {
215
219
// This shouldn't happen, it should be filtered at ingress
216
220
logger .Critical ("Unable to unmarshal consumed message:" , err )
217
221
}
218
- logger .Debug ("Unmarshaled to :" , msg )
222
+ logger .Debug ("Received :" , msg )
219
223
switch msg .Type .(type ) {
220
- case * ab.KafkaMessage_Connect , * ab. KafkaMessage_TimeToCut :
221
- logger .Debugf ( "Ignoring message" )
224
+ case * ab.KafkaMessage_Connect :
225
+ logger .Debug ( "It's a connect message - ignoring " )
222
226
continue
227
+ case * ab.KafkaMessage_TimeToCut :
228
+ ttcNumber = msg .GetTimeToCut ().BlockNumber
229
+ logger .Debug ("It's a time-to-cut message for block" , ttcNumber )
230
+ if ttcNumber == ch .lastCutBlock + 1 {
231
+ timer = nil
232
+ logger .Debug ("Nil'd the timer" )
233
+ batch , committers := ch .support .BlockCutter ().Cut ()
234
+ if len (batch ) == 0 {
235
+ logger .Warningf ("Got right time-to-cut message (%d) but no pending requests - this might indicate a bug" , ch .lastCutBlock )
236
+ logger .Infof ("Consenter for chain %s exiting" , ch .partition .Topic ())
237
+ return
238
+ }
239
+ block := ch .support .CreateNextBlock (batch )
240
+ ch .support .WriteBlock (block , committers )
241
+ ch .lastCutBlock ++
242
+ logger .Debug ("Proper time-to-cut received, just cut block" , ch .lastCutBlock )
243
+ continue
244
+ } else if ttcNumber > ch .lastCutBlock + 1 {
245
+ logger .Warningf ("Got larger time-to-cut message (%d) than allowed (%d) - this might indicate a bug" , ttcNumber , ch .lastCutBlock + 1 )
246
+ logger .Infof ("Consenter for chain %s exiting" , ch .partition .Topic ())
247
+ return
248
+ }
249
+ logger .Debug ("Ignoring stale time-to-cut-message for" , ch .lastCutBlock )
223
250
case * ab.KafkaMessage_Regular :
224
251
env := new (cb.Envelope )
225
252
if err := proto .Unmarshal (msg .GetRegular ().Payload , env ); err != nil {
226
253
// This shouldn't happen, it should be filtered at ingress
227
- logger .Critical ("Unable to unmarshal consumed message:" , err )
254
+ logger .Critical ("Unable to unmarshal consumed regular message:" , err )
228
255
continue
229
256
}
230
257
batches , committers , ok := ch .support .BlockCutter ().Ordered (env )
231
258
logger .Debugf ("Ordering results: batches: %v, ok: %v" , batches , ok )
232
- if ok && len (batches ) == 0 {
259
+ if ok && len (batches ) == 0 && timer == nil {
260
+ timer = time .After (ch .batchTimeout )
261
+ logger .Debugf ("Just began %s batch timer" , ch .batchTimeout .String ())
233
262
continue
234
263
}
235
264
// If !ok, batches == nil, so this will be skipped
236
265
for i , batch := range batches {
237
266
block := ch .support .CreateNextBlock (batch )
238
267
ch .support .WriteBlock (block , committers [i ])
268
+ ch .lastCutBlock ++
269
+ logger .Debug ("Batch filled, just cut block" , ch .lastCutBlock )
270
+ }
271
+ if len (batches ) > 0 {
272
+ timer = nil
239
273
}
240
274
}
241
- case <- ch .exitChan : // when Halt() is called
275
+ case <- timer :
276
+ logger .Debugf ("Time-to-cut block %d timer expired" , ch .lastCutBlock + 1 )
277
+ timer = nil
278
+ if err := ch .producer .Send (ch .partition , utils .MarshalOrPanic (newTimeToCutMessage (ch .lastCutBlock + 1 ))); err != nil {
279
+ logger .Errorf ("Couldn't post to %s: %s" , ch .partition , err )
280
+ // Do not exit
281
+ }
282
+ case <- ch .exitChan : // When Halt() is called
242
283
logger .Infof ("Consenter for chain %s exiting" , ch .partition .Topic ())
243
284
return
244
285
}
0 commit comments