
Commit b7166b7

[FAB-2913] Prepend channel name to log output
https://jira.hyperledger.org/browse/FAB-2913

As some of our recent debugging sessions have shown, the channel name is a useful bit of info to have on almost every log message that the orderer writes. This changeset delivers that.

This changeset also:

1. Switches all the Panic calls to their logger-enabled equivalents, in accordance with what we're starting to do throughout the `orderer` directory.
2. Lowercases all returned error messages, following the guideline posted in [1].

[1] https://github.com/golang/go/wiki/CodeReviewComments#error-strings

Change-Id: Iebaeac7b622e2766606f66c01358edfe65b9bd44
Signed-off-by: Kostas Christidis <[email protected]>
1 parent 4515d66 commit b7166b7

10 files changed: +61 -49 lines
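
The commit message bundles three conventions at once. As a quick orientation before the per-file diffs, here is a minimal, self-contained Go sketch of all three; the logger name and the demo function are illustrative stand-ins rather than code from this tree, though the op/go-logging calls are the same ones the orderer's logging setup relies on:

package main

import (
	"fmt"

	logging "github.com/op/go-logging"
)

// Illustrative logger; the real one is package-scoped inside orderer/kafka.
var logger = logging.MustGetLogger("orderer/kafka/demo")

func demo(chainID string) error {
	// 1. Prepend the channel name to (almost) every log message.
	logger.Debugf("[channel: %s] Doing something useful", chainID)

	// 2. Prefer the logger-enabled Panicf over a bare panic(fmt.Errorf(...)):
	// the message goes through the logging backend before the panic fires.
	if chainID == "" {
		logger.Panicf("[channel: %s] Missing channel ID", chainID)
	}

	// 3. Returned error strings start lowercase, per the Go CodeReviewComments
	// guideline linked in the commit message.
	return fmt.Errorf("failed to do something useful for channel %s", chainID)
}

func main() {
	if err := demo("mychannel"); err != nil {
		logger.Errorf("%s", err)
	}
}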

orderer/kafka/broker.go (+12 -6)

@@ -53,20 +53,23 @@ func newBroker(brokers []string, cp ChainPartition) (Broker, error) {
 	}
 
 	if connectedBroker == nil {
-		return nil, fmt.Errorf("Failed to connect to any of the given brokers (%v) for metadata request", brokers)
+		return nil, fmt.Errorf("failed to connect to any of the given brokers (%v) for metadata request", brokers)
 	}
 	logger.Debugf("Connected to broker %s", connectedBroker.Addr())
 
 	// Get metadata for the topic that corresponds to this chain
 	metadata, err := connectedBroker.GetMetadata(&sarama.MetadataRequest{Topics: []string{cp.Topic()}})
 	if err != nil {
-		return nil, fmt.Errorf("Failed to get metadata for topic %s: %s", cp, err)
+		return nil, fmt.Errorf("failed to get metadata for topic %s: %s", cp, err)
 	}
 
 	// Get the leader broker for this chain partition
 	if (cp.Partition() >= 0) && (cp.Partition() < int32(len(metadata.Topics[0].Partitions))) {
 		leaderBrokerID := metadata.Topics[0].Partitions[cp.Partition()].Leader
-		logger.Debugf("Leading broker for chain %s is broker ID %d", cp, leaderBrokerID)
+		// ATTN: If we ever switch to more than one partition per topic, the message
+		// below should be updated to print `cp` (i.e. Topic/Partition) instead of
+		// `cp.Topic()`.
+		logger.Debugf("[channel: %s] Leading broker: %d", cp.Topic(), leaderBrokerID)
 		for _, availableBroker := range metadata.Brokers {
 			if availableBroker.ID() == leaderBrokerID {
 				leaderBroker = availableBroker
@@ -76,15 +79,18 @@ func newBroker(brokers []string, cp ChainPartition) (Broker, error) {
 	}
 
 	if leaderBroker == nil {
-		return nil, fmt.Errorf("Can't find leader for chain %s", cp)
+		// ATTN: If we ever switch to more than one partition per topic, the message
+		// below should be updated to print `cp` (i.e. Topic/Partition) instead of
+		// `cp.Topic()`.
+		return nil, fmt.Errorf("[channel: %s] cannot find leader", cp.Topic())
 	}
 
 	// Connect to broker
 	if err := leaderBroker.Open(nil); err != nil {
-		return nil, fmt.Errorf("Failed to connect ho Kafka broker: %s", err)
+		return nil, fmt.Errorf("failed to connect to Kafka broker: %s", err)
 	}
 	if connected, err := leaderBroker.Connected(); !connected {
-		return nil, fmt.Errorf("Failed to connect to Kafka broker: %s", err)
+		return nil, fmt.Errorf("failed to connect to Kafka broker: %s", err)
 	}
 
 	return &brokerImpl{broker: leaderBroker}, nil
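
The two ATTN comments added above hinge on the difference between printing `cp` and printing `cp.Topic()`. Below is a hypothetical, self-contained reconstruction of that distinction; the real ChainPartition type is defined elsewhere in orderer/kafka and is not shown in this diff, so only the Topic/Partition/String shape is assumed here:

package main

import "fmt"

// chainPartition is a stand-in for the package's ChainPartition type.
type chainPartition struct {
	topic     string
	partition int32
}

func (cp chainPartition) Topic() string    { return cp.topic }
func (cp chainPartition) Partition() int32 { return cp.partition }

// Printing the value itself would reveal the partition as well.
func (cp chainPartition) String() string { return fmt.Sprintf("%s/%d", cp.topic, cp.partition) }

func main() {
	cp := chainPartition{topic: "mychannel", partition: 0}
	fmt.Printf("%s\n", cp)         // "mychannel/0" -- Topic/Partition
	fmt.Printf("%s\n", cp.Topic()) // "mychannel"   -- channel name only, as the new messages use
}

With a single partition per topic the two forms identify the same channel, which is why the rewritten messages print `cp.Topic()`.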

orderer/kafka/broker_mock_test.go (+1 -1)

@@ -47,7 +47,7 @@ func mockNewBroker(t *testing.T, cp ChainPartition) (Broker, error) {
 
 	broker := sarama.NewBroker(mockBroker.Addr())
 	if err := broker.Open(nil); err != nil {
-		return nil, fmt.Errorf("Cannot connect to mock broker: %s", err)
+		return nil, fmt.Errorf("cannot connect to mock broker: %s", err)
 	}
 
 	return &mockBrockerImpl{

orderer/kafka/consumer.go (+1 -1)

@@ -47,7 +47,7 @@ func newConsumer(brokers []string, kafkaVersion sarama.KafkaVersion, tls config.
 		parent:    parent,
 		partition: partition,
 	}
-	logger.Debugf("Created new consumer for session (partition %s, beginning offset %d)", cp, offset)
+	logger.Debugf("[channel: %s] Created new consumer for session (beginning offset: %d)", cp.Topic(), offset)
 	return c, nil
 }
 

orderer/kafka/log_test.go (+1 -1)

@@ -21,5 +21,5 @@ import (
 )
 
 func init() {
-	logging.SetLevel(logging.INFO, "") // Silence debug-level outputs when testing
+	logging.SetLevel(logging.DEBUG, "") // Print debug-level outputs when testing
}

orderer/kafka/orderer.go (+35 -27)

@@ -80,15 +80,16 @@ type consenterImpl struct {
 // Implements the multichain.Consenter interface. Called by multichain.newChainSupport(), which
 // is itself called by multichain.NewManagerImpl() when ranging over the ledgerFactory's existingChains.
 func (co *consenterImpl) HandleChain(cs multichain.ConsenterSupport, metadata *cb.Metadata) (multichain.Chain, error) {
-	return newChain(co, cs, getLastOffsetPersisted(metadata)), nil
+	return newChain(co, cs, getLastOffsetPersisted(metadata, cs.ChainID())), nil
 }
 
-func getLastOffsetPersisted(metadata *cb.Metadata) int64 {
+func getLastOffsetPersisted(metadata *cb.Metadata, chainID string) int64 {
 	if metadata.Value != nil {
 		// Extract orderer-related metadata from the tip of the ledger first
 		kafkaMetadata := &ab.KafkaMetadata{}
 		if err := proto.Unmarshal(metadata.Value, kafkaMetadata); err != nil {
-			panic("Ledger may be corrupted: cannot unmarshal orderer metadata in most recent block")
+			logger.Panicf("[channel: %s] Ledger may be corrupted: "+
+				"cannot unmarshal orderer metadata in most recent block", chainID)
 		}
 		return kafkaMetadata.LastOffsetPersisted
 	}
@@ -104,7 +105,7 @@ func getLastOffsetPersisted(metadata *cb.Metadata) int64 {
 // be satisfied by both the actual and the mock object and will allow
 // us to retrieve these constructors.
 func newChain(consenter testableConsenter, support multichain.ConsenterSupport, lastOffsetPersisted int64) *chainImpl {
-	logger.Debug("Starting chain with last persisted offset:", lastOffsetPersisted)
+	logger.Debugf("[channel: %s] Starting chain with last persisted offset: %d", support.ChainID(), lastOffsetPersisted)
 	return &chainImpl{
 		consenter: consenter,
 		support:   support,
@@ -163,18 +164,19 @@ type chainImpl struct {
 func (ch *chainImpl) Start() {
 	// 1. Post the CONNECT message to prevent panicking that occurs
 	// when seeking on a partition that hasn't been created yet.
-	logger.Debug("Posting the CONNECT message...")
+	logger.Debugf("[channel: %s] Posting the CONNECT message...", ch.support.ChainID())
 	if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newConnectMessage())); err != nil {
-		logger.Criticalf("Couldn't post CONNECT message to %s: %s", ch.partition, err)
+		logger.Criticalf("[channel: %s] Cannot post CONNECT message: %s", ch.support.ChainID(), err)
 		close(ch.exitChan)
 		ch.halted = true
 		return
 	}
+	logger.Debugf("[channel: %s] CONNECT message posted successfully", ch.support.ChainID())
 
 	// 2. Set up the listener/consumer for this partition.
 	consumer, err := ch.consenter.consFunc()(ch.support.SharedConfig().KafkaBrokers(), ch.consenter.kafkaVersion(), ch.consenter.tlsConfig(), ch.partition, ch.lastOffsetPersisted+1)
 	if err != nil {
-		logger.Criticalf("Cannot retrieve required offset from Kafka cluster for chain %s: %s", ch.partition, err)
+		logger.Criticalf("[channel: %s] Cannot retrieve requested offset from Kafka cluster: %s", ch.support.ChainID(), err)
 		close(ch.exitChan)
 		ch.halted = true
 		return
@@ -204,7 +206,9 @@ func (ch *chainImpl) Halt() {
 		// This construct is useful because it allows Halt() to be
 		// called multiple times w/o panicking. Recal that a receive
 		// from a closed channel returns (the zero value) immediately.
+		logger.Debugf("[channel: %s] Halting of chain requested again", ch.support.ChainID())
 	default:
+		logger.Debugf("[channel: %s] Halting of chain requested", ch.support.ChainID())
 		close(ch.exitChan)
 	}
 }
@@ -217,11 +221,12 @@ func (ch *chainImpl) Enqueue(env *cb.Envelope) bool {
 		return false
 	}
 
-	logger.Debug("Enqueueing:", env)
+	logger.Debugf("[channel: %s] Enqueueing envelope...", ch.support.ChainID())
 	if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newRegularMessage(utils.MarshalOrPanic(env)))); err != nil {
-		logger.Errorf("Couldn't post to %s: %s", ch.partition, err)
+		logger.Errorf("[channel: %s] Cannot enqueue envelope: %s", ch.support.ChainID(), err)
 		return false
 	}
+	logger.Debugf("[channel: %s] Envelope enqueued successfully", ch.support.ChainID())
 
 	return !ch.halted // If ch.halted has been set to true while sending, we should return false
 }
@@ -242,49 +247,52 @@ func (ch *chainImpl) loop() {
 		case in := <-ch.consumer.Recv():
 			if err := proto.Unmarshal(in.Value, msg); err != nil {
 				// This shouldn't happen, it should be filtered at ingress
-				logger.Critical("Unable to unmarshal consumed message:", err)
+				logger.Criticalf("[channel: %s] Unable to unmarshal consumed message: %s", ch.support.ChainID(), err)
 			}
-			logger.Debug("Received:", msg)
+			logger.Debugf("[channel: %s] Successfully unmarshalled consumed message. Inspecting type...", ch.support.ChainID())
 			switch msg.Type.(type) {
 			case *ab.KafkaMessage_Connect:
-				logger.Debug("It's a connect message - ignoring")
+				logger.Debugf("[channel: %s] It's a connect message - ignoring", ch.support.ChainID())
 				continue
 			case *ab.KafkaMessage_TimeToCut:
 				ttcNumber = msg.GetTimeToCut().BlockNumber
-				logger.Debug("It's a time-to-cut message for block", ttcNumber)
+				logger.Debugf("[channel: %s] It's a time-to-cut message for block %d", ch.support.ChainID(), ttcNumber)
 				if ttcNumber == ch.lastCutBlock+1 {
 					timer = nil
-					logger.Debug("Nil'd the timer")
+					logger.Debugf("[channel: %s] Nil'd the timer", ch.support.ChainID())
 					batch, committers := ch.support.BlockCutter().Cut()
 					if len(batch) == 0 {
-						logger.Warningf("Got right time-to-cut message (%d) but no pending requests - this might indicate a bug", ch.lastCutBlock)
-						logger.Infof("Consenter for chain %s exiting", ch.partition.Topic())
+						logger.Warningf("[channel: %s] Got right time-to-cut message (for block %d),"+
+							" no pending requests though; this might indicate a bug", ch.support.ChainID(), ch.lastCutBlock)
+						logger.Infof("[channel: %s] Consenter for channel exiting", ch.support.ChainID())
 						return
 					}
 					block := ch.support.CreateNextBlock(batch)
 					encodedLastOffsetPersisted = utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: in.Offset})
 					ch.support.WriteBlock(block, committers, encodedLastOffsetPersisted)
 					ch.lastCutBlock++
-					logger.Debug("Proper time-to-cut received, just cut block", ch.lastCutBlock)
+					logger.Debugf("[channel: %s] Proper time-to-cut received, just cut block %d",
						ch.support.ChainID(), ch.lastCutBlock)
 					continue
 				} else if ttcNumber > ch.lastCutBlock+1 {
-					logger.Warningf("Got larger time-to-cut message (%d) than allowed (%d) - this might indicate a bug", ttcNumber, ch.lastCutBlock+1)
-					logger.Infof("Consenter for chain %s exiting", ch.partition.Topic())
+					logger.Warningf("[channel: %s] Got larger time-to-cut message (%d) than allowed (%d)"+
						" - this might indicate a bug", ch.support.ChainID(), ttcNumber, ch.lastCutBlock+1)
+					logger.Infof("[channel: %s] Consenter for channel exiting", ch.support.ChainID())
 					return
 				}
-				logger.Debug("Ignoring stale time-to-cut-message for", ch.lastCutBlock)
+				logger.Debugf("[channel: %s] Ignoring stale time-to-cut-message for block %d", ch.support.ChainID(), ch.lastCutBlock)
 			case *ab.KafkaMessage_Regular:
 				env := new(cb.Envelope)
 				if err := proto.Unmarshal(msg.GetRegular().Payload, env); err != nil {
 					// This shouldn't happen, it should be filtered at ingress
-					logger.Critical("Unable to unmarshal consumed regular message:", err)
+					logger.Criticalf("[channel: %s] Unable to unmarshal consumed regular message: %s", ch.support.ChainID(), err)
 					continue
 				}
 				batches, committers, ok := ch.support.BlockCutter().Ordered(env)
-				logger.Debugf("Ordering results: batches: %v, ok: %v", batches, ok)
+				logger.Debugf("[channel: %s] Ordering results: batches: %v, ok: %v", ch.support.ChainID(), batches, ok)
 				if ok && len(batches) == 0 && timer == nil {
 					timer = time.After(ch.batchTimeout)
-					logger.Debugf("Just began %s batch timer", ch.batchTimeout.String())
+					logger.Debugf("[channel: %s] Just began %s batch timer", ch.support.ChainID(), ch.batchTimeout.String())
 					continue
 				}
 				// If !ok, batches == nil, so this will be skipped
@@ -293,21 +301,21 @@ func (ch *chainImpl) loop() {
 					encodedLastOffsetPersisted = utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: in.Offset})
 					ch.support.WriteBlock(block, committers[i], encodedLastOffsetPersisted)
 					ch.lastCutBlock++
-					logger.Debug("Batch filled, just cut block", ch.lastCutBlock)
+					logger.Debugf("[channel: %s] Batch filled, just cut block %d", ch.support.ChainID(), ch.lastCutBlock)
 				}
 				if len(batches) > 0 {
 					timer = nil
 				}
 			}
 		case <-timer:
-			logger.Debugf("Time-to-cut block %d timer expired", ch.lastCutBlock+1)
+			logger.Debugf("[channel: %s] Time-to-cut block %d timer expired", ch.support.ChainID(), ch.lastCutBlock+1)
 			timer = nil
 			if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newTimeToCutMessage(ch.lastCutBlock+1))); err != nil {
-				logger.Errorf("Couldn't post to %s: %s", ch.partition, err)
+				logger.Errorf("[channel: %s] Cannot post time-to-cut message: %s", ch.support.ChainID(), err)
 				// Do not exit
 			}
 		case <-ch.exitChan: // When Halt() is called
-			logger.Infof("Consenter for chain %s exiting", ch.partition.Topic())
+			logger.Infof("[channel: %s] Consenter for channel exiting", ch.support.ChainID())
 			return
 		}
 	}
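
Putting the first hunk together with the test expectations in orderer_test.go below, the updated getLastOffsetPersisted reads roughly as follows. This is a sketch, not a verbatim copy of the file: the cb, ab, proto, and sarama identifiers are the ones orderer.go already imports, and the empty-ledger default is inferred from TestGetLastOffsetPersistedEmpty rather than shown in this diff:

func getLastOffsetPersisted(metadata *cb.Metadata, chainID string) int64 {
	if metadata.Value != nil {
		// Extract orderer-related metadata from the tip of the ledger first
		kafkaMetadata := &ab.KafkaMetadata{}
		if err := proto.Unmarshal(metadata.Value, kafkaMetadata); err != nil {
			logger.Panicf("[channel: %s] Ledger may be corrupted: "+
				"cannot unmarshal orderer metadata in most recent block", chainID)
		}
		return kafkaMetadata.LastOffsetPersisted
	}
	// Brand-new channel: nothing persisted yet, so start one before the oldest offset.
	return sarama.OffsetOldest - 1
}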

orderer/kafka/orderer_test.go (+3 -3)

@@ -593,15 +593,15 @@ func TestKafkaConsenterTimeToCutLarger(t *testing.T) {
 
 func TestGetLastOffsetPersistedEmpty(t *testing.T) {
 	expected := sarama.OffsetOldest - 1
-	actual := getLastOffsetPersisted(&cb.Metadata{})
+	actual := getLastOffsetPersisted(&cb.Metadata{}, "")
 	if actual != expected {
 		t.Fatalf("Expected last offset %d, got %d", expected, actual)
 	}
 }
 
 func TestGetLastOffsetPersistedRight(t *testing.T) {
 	expected := int64(100)
-	actual := getLastOffsetPersisted(&cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: expected})})
+	actual := getLastOffsetPersisted(&cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: expected})}, "")
 	if actual != expected {
 		t.Fatalf("Expected last offset %d, got %d", expected, actual)
 	}
@@ -655,7 +655,7 @@ func TestKafkaConsenterRestart(t *testing.T) {
 		logger.Fatalf("Error extracting orderer metadata for chain %x: %s", cs.ChainIDVal, err)
 	}
 
-	lastPersistedOffset = getLastOffsetPersisted(metadata)
+	lastPersistedOffset = getLastOffsetPersisted(metadata, ch.support.ChainID())
 	nextProducedOffset = lastPersistedOffset + 1
 
 	co = mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset)

orderer/kafka/producer.go (+4 -5)

@@ -17,7 +17,6 @@ limitations under the License.
 package kafka
 
 import (
-	"fmt"
 	"time"
 
 	"github.com/Shopify/sarama"
@@ -48,7 +47,7 @@ loop:
 	for {
 		select {
 		case <-panicTick.C:
-			panic(fmt.Errorf("Failed to create Kafka producer: %v", err))
+			logger.Panicf("Failed to create Kafka producer: %v", err)
 		case <-repeatTick.C:
 			logger.Debug("Connecting to Kafka cluster:", brokers)
 			p, err = sarama.NewSyncProducer(brokers, brokerConfig)
@@ -72,12 +71,12 @@ func (p *producerImpl) Send(cp ChainPartition, payload []byte) error {
 	prt, ofs, err := p.producer.SendMessage(newProducerMessage(cp, payload))
 	if prt != cp.Partition() {
 		// If this happens, something's up with the partitioner
-		logger.Warningf("Blob destined for partition %d, but posted to %d instead", cp.Partition(), prt)
+		logger.Warningf("[channel: %s] Blob destined for partition %d, but posted to %d instead", cp.Topic(), cp.Partition(), prt)
 	}
 	if err == nil {
-		logger.Debugf("Forwarded blob with offset number %d to chain partition %s on the Kafka cluster", ofs, cp)
+		logger.Debugf("[channel: %s] Posted blob to the Kafka cluster (offset number: %d)", cp.Topic(), ofs)
 	} else {
-		logger.Infof("Failed to send message to chain partition %s on the Kafka cluster: %s", cp, err)
+		logger.Infof("[channel: %s] Failed to post blob to the Kafka cluster: %s", cp.Topic(), err)
 	}
 	return err
 }
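
The second hunk above is part of the producer's connection retry loop: retry on every repeatTick, give up for good when panicTick fires. Here is a self-contained sketch of that pattern, with made-up intervals and a made-up connect function; the real loop calls sarama.NewSyncProducer and gives up via logger.Panicf rather than returning an error:

package main

import (
	"errors"
	"fmt"
	"time"
)

// connectWithRetry keeps retrying on every repeatTick and gives up for good
// once panicTick fires, mirroring the select shape in the hunk above.
func connectWithRetry(connect func() error, retryEvery, giveUpAfter time.Duration) error {
	repeatTick := time.NewTicker(retryEvery)
	panicTick := time.NewTicker(giveUpAfter)
	defer repeatTick.Stop()
	defer panicTick.Stop()

	err := connect() // first attempt happens before the loop
	if err == nil {
		return nil
	}
	for {
		select {
		case <-panicTick.C:
			return fmt.Errorf("giving up: %v", err)
		case <-repeatTick.C:
			if err = connect(); err == nil {
				return nil
			}
		}
	}
}

func main() {
	attempts := 0
	err := connectWithRetry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("broker not ready")
		}
		return nil
	}, 10*time.Millisecond, time.Second)
	fmt.Printf("attempts: %d, err: %v\n", attempts, err)
}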

orderer/kafka/producer_mock_test.go (+1 -1)

@@ -87,7 +87,7 @@ func (mp *mockProducerImpl) init(cp ChainPartition, offset int64) {
 		// on that chain partition gives you blob #offset.
 		mp.testFillWithBlocks(cp, offset-1)
 	} else {
-		panic(fmt.Errorf("Out of range offset (seek number) given to producer: %d", offset))
+		logger.Panicf("Out of range offset (seek number) given to producer: %d", offset)
 	}
 }
 

orderer/kafka/util.go (+2 -3)

@@ -19,7 +19,6 @@ package kafka
 import (
 	"crypto/tls"
 	"crypto/x509"
-	"fmt"
 	"strconv"
 
 	"github.com/Shopify/sarama"
@@ -37,13 +36,13 @@ func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int
 		// create public/private key pair structure
 		keyPair, err := tls.X509KeyPair([]byte(tlsConfig.Certificate), []byte(tlsConfig.PrivateKey))
 		if err != nil {
-			panic(fmt.Errorf("Unable to decode public/private key pair. Error: %v", err))
+			logger.Panicf("Unable to decode public/private key pair: %s", err)
 		}
 		// create root CA pool
 		rootCAs := x509.NewCertPool()
 		for _, certificate := range tlsConfig.RootCAs {
 			if !rootCAs.AppendCertsFromPEM([]byte(certificate)) {
-				panic(fmt.Errorf("Unable to decode certificate. Error: %v", err))
+				logger.Panic("Unable to parse the root certificate authority certificates (Kafka.Tls.RootCAs)")
 			}
 		}
 		brokerConfig.Net.TLS.Config = &tls.Config{
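
The second hunk is part of the broker's TLS setup. Below is a self-contained sketch of that plumbing using only the standard library; the function name and parameters are placeholders, and the real values come from the orderer's Kafka TLS settings. Note that x509.CertPool.AppendCertsFromPEM signals failure through its boolean return rather than an error value, which is why the rewritten panic message no longer prints err:

package main

import (
	"crypto/tls"
	"crypto/x509"
	"log"
)

// newTLSConfig builds a client certificate and a root CA pool from PEM input,
// the same two steps performed in the hunk above.
func newTLSConfig(certPEM, keyPEM string, rootCAPEMs []string) *tls.Config {
	keyPair, err := tls.X509KeyPair([]byte(certPEM), []byte(keyPEM))
	if err != nil {
		log.Panicf("Unable to decode public/private key pair: %s", err)
	}
	rootCAs := x509.NewCertPool()
	for _, pem := range rootCAPEMs {
		if !rootCAs.AppendCertsFromPEM([]byte(pem)) {
			log.Panic("Unable to parse the root certificate authority certificates")
		}
	}
	return &tls.Config{
		Certificates: []tls.Certificate{keyPair},
		RootCAs:      rootCAs,
	}
}

func main() {
	// Calling newTLSConfig with real PEM blocks is left to the reader; with
	// placeholder strings it would panic, which is the intended behavior.
	_ = newTLSConfig
}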

orderer/multichain/manager.go (+1 -1)

@@ -104,7 +104,7 @@ func NewManagerImpl(ledgerFactory ledger.Factory, consenters map[string]Consente
 			ledgerResources,
 			consenters,
 			signer)
-		logger.Infof("Starting with system channel: %s and orderer type %s", chainID, chain.SharedConfig().ConsensusType())
+		logger.Infof("Starting with system channel %s and orderer type %s", chainID, chain.SharedConfig().ConsensusType())
 		ml.chains[string(chainID)] = chain
 		ml.systemChannelID = chainID
 		// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
