Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DEBUG: [max] add log to debug depth buffer #1870

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions pkg/depth/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
"time"

log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"

"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
Expand Down Expand Up @@ -41,36 +41,45 @@ type Buffer struct {

// bufferingPeriod is used to buffer the update message before we get the full depth
bufferingPeriod time.Duration

logger *logrus.Entry
}

func NewBuffer(fetcher SnapshotFetcher, bufferingPeriod time.Duration) *Buffer {
return &Buffer{
fetcher: fetcher,
fetchC: make(chan struct{}, 1),
bufferingPeriod: bufferingPeriod,
logger: logrus.NewEntry(logrus.StandardLogger()),
}
}

func (b *Buffer) SetLogger(logger *logrus.Entry) {
b.logger = logger
}

func (b *Buffer) SetUpdateTimeout(d time.Duration) {
b.updateTimeout = d
}

func (b *Buffer) resetSnapshot() {
log.Info("resetting the snapshot")
b.logger.Info("resetting the snapshot")
b.snapshot = nil
b.finalUpdateID = 0
}

// emitFetch emits the fetch signal, and in the next call of AddUpdate, the buffer will try to fetch the snapshot
// if the fetch signal is already emitted, it will be ignored
func (b *Buffer) emitFetch() {
b.logger.Info("emitting fetch signal")
select {
case b.fetchC <- struct{}{}:
default:
}
}

func (b *Buffer) Reset() {
b.logger.Info("resetting this buffer")
b.mu.Lock()
b.resetSnapshot()
b.emitFetch()
Expand All @@ -92,7 +101,7 @@ func (b *Buffer) SetSnapshot(snapshot types.SliceOrderBook, firstUpdateID int64,
return nil
}

log.Info("setting the snapshot")
b.logger.Info("setting the snapshot")
// set the final update ID so that we will know if there is an update missing
b.finalUpdateID = finalUpdateID

Expand Down Expand Up @@ -123,10 +132,12 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg

select {
case <-b.fetchC:
b.logger.Info("fetch signal received")
b.buffer = append(b.buffer, u)
b.resetSnapshot()
b.once.Reset()
b.once.Do(func() {
b.logger.Info("try fetching the snapshot due to fetch signal received")
go b.tryFetch()
})
b.mu.Unlock()
Expand All @@ -141,6 +152,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
if b.snapshot == nil {
b.buffer = append(b.buffer, u)
b.once.Do(func() {
b.logger.Info("try fetching the snapshot due to no snapshot")
go b.tryFetch()
})
b.mu.Unlock()
Expand All @@ -151,7 +163,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg

// skip older events
if u.FinalUpdateID <= b.finalUpdateID {
log.Infof("the final update id %d of event is less than equal to the final update id %d of the snapshot, skip", u.FinalUpdateID, b.finalUpdateID)
b.logger.Infof("the final update id %d of event is less than equal to the final update id %d of the snapshot, skip", u.FinalUpdateID, b.finalUpdateID)
b.mu.Unlock()
return nil
}
Expand All @@ -163,6 +175,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
b.resetSnapshot()
b.once.Reset()
b.once.Do(func() {
b.logger.Info("try fetching the snapshot due to missing update")
go b.tryFetch()
})

Expand All @@ -175,7 +188,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
u.FirstUpdateID-b.finalUpdateID)
}

log.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID)
b.logger.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID)
b.finalUpdateID = u.FinalUpdateID
b.EmitPush(u)

Expand All @@ -191,7 +204,7 @@ func (b *Buffer) tryFetch() {

err := b.fetchAndPush()
if err != nil {
log.WithError(err).Errorf("snapshot fetch failed, retry in %s", b.bufferingPeriod)
b.logger.WithError(err).Errorf("snapshot fetch failed, retry in %s", b.bufferingPeriod)
continue
}

Expand All @@ -206,7 +219,7 @@ func (b *Buffer) fetchAndPush() error {
}

b.mu.Lock()
log.Debugf("fetched depth snapshot, final update id %d", finalUpdateID)
b.logger.Infof("fetched depth snapshot, final update id %d", finalUpdateID)

if len(b.buffer) > 0 {
// the snapshot is too early, we should re-fetch the snapshot
Expand Down
2 changes: 2 additions & 0 deletions pkg/exchange/binance/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/c9s/bbgo/pkg/depth"
"github.com/c9s/bbgo/pkg/util"
"github.com/sirupsen/logrus"

"github.com/adshao/go-binance/v2"
"github.com/adshao/go-binance/v2/futures"
Expand Down Expand Up @@ -91,6 +92,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
log.Infof("fetching %s depth...", e.Symbol)
return ex.QueryDepth(context.Background(), e.Symbol)
}, 3*time.Second)
f.SetLogger(logrus.WithFields(logrus.Fields{"exchange": "binance", "symbol": e.Symbol, "component": "depthBuffer"}))
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
stream.EmitBookSnapshot(snapshot)
for _, u := range updates {
Expand Down
2 changes: 2 additions & 0 deletions pkg/exchange/kucoin/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/c9s/bbgo/pkg/depth"
"github.com/c9s/bbgo/pkg/exchange/kucoin/kucoinapi"
Expand Down Expand Up @@ -81,6 +82,7 @@ func (s *Stream) handleOrderBookL2Event(e *WebSocketOrderBookL2Event) {
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
return s.exchange.QueryDepth(context.Background(), e.Symbol)
}, 3*time.Second)
f.SetLogger(logrus.WithFields(logrus.Fields{"exchange": "kucoin", "symbol": e.Symbol, "component": "depthBuffer"}))
s.depthBuffers[e.Symbol] = f
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
if valid, err := snapshot.IsValid(); !valid {
Expand Down
5 changes: 4 additions & 1 deletion pkg/exchange/max/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"

"github.com/c9s/bbgo/pkg/depth"
max "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
Expand Down Expand Up @@ -197,7 +198,7 @@ func (s *Stream) handleConnect() {
}

func (s *Stream) handleDisconnect() {
log.Debugf("resetting depth snapshots...")
log.Info("resetting depth snapshots...")
for _, f := range s.depthBuffers {
f.Reset()
}
Expand Down Expand Up @@ -272,6 +273,7 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) {
// the depth of websocket orderbook event is 50 by default, so we use 50 as limit here
return ex.QueryDepth(context.Background(), e.Market, bookDepth)
}, 3*time.Second)
f.SetLogger(logrus.WithFields(logrus.Fields{"exchange": "max", "symbol": symbol, "component": "depthBuffer"}))
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
s.EmitBookSnapshot(snapshot)
for _, u := range updates {
Expand All @@ -287,6 +289,7 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) {
// if we receive orderbook event with both asks and bids are empty, it means we need to rebuild this orderbook
shouldReset := len(e.Asks) == 0 && len(e.Bids) == 0
if shouldReset {
log.Infof("resetting %s orderbook due to both empty asks/bids...", e.Market)
f.Reset()
return
}
Expand Down
Loading