From 6c2995048f14e9dd5f85eecc134cf7205e309249 Mon Sep 17 00:00:00 2001 From: chiahung Date: Tue, 17 Dec 2024 16:09:02 +0800 Subject: [PATCH] DEBUG: [max] add log to debug depth buffer --- pkg/depth/buffer.go | 27 ++++++++++++++++++++------- pkg/exchange/binance/stream.go | 2 ++ pkg/exchange/kucoin/stream.go | 2 ++ pkg/exchange/max/stream.go | 5 ++++- 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/pkg/depth/buffer.go b/pkg/depth/buffer.go index 6d95b6753e..684a96154f 100644 --- a/pkg/depth/buffer.go +++ b/pkg/depth/buffer.go @@ -5,7 +5,7 @@ import ( "sync" "time" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" @@ -41,6 +41,8 @@ type Buffer struct { // bufferingPeriod is used to buffer the update message before we get the full depth bufferingPeriod time.Duration + + logger *logrus.Entry } func NewBuffer(fetcher SnapshotFetcher, bufferingPeriod time.Duration) *Buffer { @@ -48,15 +50,20 @@ func NewBuffer(fetcher SnapshotFetcher, bufferingPeriod time.Duration) *Buffer { fetcher: fetcher, fetchC: make(chan struct{}, 1), bufferingPeriod: bufferingPeriod, + logger: logrus.NewEntry(logrus.StandardLogger()), } } +func (b *Buffer) SetLogger(logger *logrus.Entry) { + b.logger = logger +} + func (b *Buffer) SetUpdateTimeout(d time.Duration) { b.updateTimeout = d } func (b *Buffer) resetSnapshot() { - log.Info("resetting the snapshot") + b.logger.Info("resetting the snapshot") b.snapshot = nil b.finalUpdateID = 0 } @@ -64,6 +71,7 @@ func (b *Buffer) resetSnapshot() { // emitFetch emits the fetch signal, and in the next call of AddUpdate, the buffer will try to fetch the snapshot // if the fetch signal is already emitted, it will be ignored func (b *Buffer) emitFetch() { + b.logger.Info("emitting fetch signal") select { case b.fetchC <- struct{}{}: default: @@ -71,6 +79,7 @@ func (b *Buffer) emitFetch() { } func (b *Buffer) Reset() { + b.logger.Info("resetting this buffer") b.mu.Lock() b.resetSnapshot() b.emitFetch() @@ -92,7 +101,7 @@ func (b *Buffer) SetSnapshot(snapshot types.SliceOrderBook, firstUpdateID int64, return nil } - log.Info("setting the snapshot") + b.logger.Info("setting the snapshot") // set the final update ID so that we will know if there is an update missing b.finalUpdateID = finalUpdateID @@ -123,10 +132,12 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg select { case <-b.fetchC: + b.logger.Info("fetch signal received") b.buffer = append(b.buffer, u) b.resetSnapshot() b.once.Reset() b.once.Do(func() { + b.logger.Info("try fetching the snapshot due to fetch signal received") go b.tryFetch() }) b.mu.Unlock() @@ -141,6 +152,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg if b.snapshot == nil { b.buffer = append(b.buffer, u) b.once.Do(func() { + b.logger.Info("try fetching the snapshot due to no snapshot") go b.tryFetch() }) b.mu.Unlock() @@ -151,7 +163,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg // skip older events if u.FinalUpdateID <= b.finalUpdateID { - log.Infof("the final update id %d of event is less than equal to the final update id %d of the snapshot, skip", u.FinalUpdateID, b.finalUpdateID) + b.logger.Infof("the final update id %d of event is less than equal to the final update id %d of the snapshot, skip", u.FinalUpdateID, b.finalUpdateID) b.mu.Unlock() return nil } @@ -163,6 +175,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg b.resetSnapshot() b.once.Reset() b.once.Do(func() { + b.logger.Info("try fetching the snapshot due to missing update") go b.tryFetch() }) @@ -175,7 +188,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg u.FirstUpdateID-b.finalUpdateID) } - log.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID) + b.logger.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID) b.finalUpdateID = u.FinalUpdateID b.EmitPush(u) @@ -191,7 +204,7 @@ func (b *Buffer) tryFetch() { err := b.fetchAndPush() if err != nil { - log.WithError(err).Errorf("snapshot fetch failed, retry in %s", b.bufferingPeriod) + b.logger.WithError(err).Errorf("snapshot fetch failed, retry in %s", b.bufferingPeriod) continue } @@ -206,7 +219,7 @@ func (b *Buffer) fetchAndPush() error { } b.mu.Lock() - log.Debugf("fetched depth snapshot, final update id %d", finalUpdateID) + b.logger.Infof("fetched depth snapshot, final update id %d", finalUpdateID) if len(b.buffer) > 0 { // the snapshot is too early, we should re-fetch the snapshot diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index 27c7566e57..c6e9371b68 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -7,6 +7,7 @@ import ( "github.com/c9s/bbgo/pkg/depth" "github.com/c9s/bbgo/pkg/util" + "github.com/sirupsen/logrus" "github.com/adshao/go-binance/v2" "github.com/adshao/go-binance/v2/futures" @@ -91,6 +92,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie log.Infof("fetching %s depth...", e.Symbol) return ex.QueryDepth(context.Background(), e.Symbol) }, 3*time.Second) + f.SetLogger(logrus.WithFields(logrus.Fields{"exchange": "binance", "symbol": e.Symbol, "component": "depthBuffer"})) f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { stream.EmitBookSnapshot(snapshot) for _, u := range updates { diff --git a/pkg/exchange/kucoin/stream.go b/pkg/exchange/kucoin/stream.go index ddc5cb626b..4b216d5887 100644 --- a/pkg/exchange/kucoin/stream.go +++ b/pkg/exchange/kucoin/stream.go @@ -6,6 +6,7 @@ import ( "github.com/gorilla/websocket" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/depth" "github.com/c9s/bbgo/pkg/exchange/kucoin/kucoinapi" @@ -81,6 +82,7 @@ func (s *Stream) handleOrderBookL2Event(e *WebSocketOrderBookL2Event) { f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) { return s.exchange.QueryDepth(context.Background(), e.Symbol) }, 3*time.Second) + f.SetLogger(logrus.WithFields(logrus.Fields{"exchange": "kucoin", "symbol": e.Symbol, "component": "depthBuffer"})) s.depthBuffers[e.Symbol] = f f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { if valid, err := snapshot.IsValid(); !valid { diff --git a/pkg/exchange/max/stream.go b/pkg/exchange/max/stream.go index 3e0c00b468..1cd0617fce 100644 --- a/pkg/exchange/max/stream.go +++ b/pkg/exchange/max/stream.go @@ -10,6 +10,7 @@ import ( "time" "github.com/google/uuid" + "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/depth" max "github.com/c9s/bbgo/pkg/exchange/max/maxapi" @@ -197,7 +198,7 @@ func (s *Stream) handleConnect() { } func (s *Stream) handleDisconnect() { - log.Debugf("resetting depth snapshots...") + log.Info("resetting depth snapshots...") for _, f := range s.depthBuffers { f.Reset() } @@ -272,6 +273,7 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) { // the depth of websocket orderbook event is 50 by default, so we use 50 as limit here return ex.QueryDepth(context.Background(), e.Market, bookDepth) }, 3*time.Second) + f.SetLogger(logrus.WithFields(logrus.Fields{"exchange": "max", "symbol": symbol, "component": "depthBuffer"})) f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { s.EmitBookSnapshot(snapshot) for _, u := range updates { @@ -287,6 +289,7 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) { // if we receive orderbook event with both asks and bids are empty, it means we need to rebuild this orderbook shouldReset := len(e.Asks) == 0 && len(e.Bids) == 0 if shouldReset { + log.Infof("resetting %s orderbook due to both empty asks/bids...", e.Market) f.Reset() return }