From eb36ed6926aa9f10d83860dae378a4ef490a6e9b Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 20 Dec 2023 21:54:32 +0800 Subject: [PATCH 1/3] xdepthmaker: remove the shared trade collector and order store, add mutex for covered position --- pkg/strategy/xdepthmaker/strategy.go | 59 +++++++++++----------------- 1 file changed, 22 insertions(+), 37 deletions(-) diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index 912a44d802..ad2e9a37f2 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -12,7 +12,6 @@ import ( "golang.org/x/time/rate" "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" @@ -48,14 +47,9 @@ type CrossExchangeMarketMakingStrategy struct { Position *types.Position `json:"position,omitempty" persistence:"position"` ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"` CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty" persistence:"covered_position"` + mu sync.Mutex MakerOrderExecutor, HedgeOrderExecutor *bbgo.GeneralOrderExecutor - - // orderStore is a shared order store between the maker session and the hedge session - orderStore *core.OrderStore - - // tradeCollector is a shared trade collector between the maker session and the hedge session - tradeCollector *core.TradeCollector } func (s *CrossExchangeMarketMakingStrategy) Initialize( @@ -129,14 +123,7 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize( // bbgo.Sync(ctx, s) }) - // global order store - s.orderStore = core.NewOrderStore(s.Position.Symbol) - s.orderStore.BindStream(hedgeSession.UserDataStream) - s.orderStore.BindStream(makerSession.UserDataStream) - - // global trade collector - s.tradeCollector = core.NewTradeCollector(symbol, s.Position, s.orderStore) - s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { + coveredFunc := func(trade types.Trade, profit, netProfit fixedpoint.Value) { c := trade.PositionChange() // sync covered position @@ -148,11 +135,13 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize( // 2) short position -> increase short position if trade.Exchange == s.hedgeSession.ExchangeName { // TODO: make this atomic + s.mu.Lock() s.CoveredPosition = s.CoveredPosition.Add(c) + s.mu.Unlock() } - }) - s.tradeCollector.BindStream(s.hedgeSession.UserDataStream) - s.tradeCollector.BindStream(s.makerSession.UserDataStream) + } + s.MakerOrderExecutor.TradeCollector().OnTrade(coveredFunc) + s.HedgeOrderExecutor.TradeCollector().OnTrade(coveredFunc) return nil } @@ -339,11 +328,7 @@ func (s *Strategy) CrossRun( s.stopC = make(chan struct{}) if s.RecoverTrade { - s.tradeCollector.OnRecover(func(trade types.Trade) { - bbgo.Notify("Recovered trade", trade) - }) - - go s.runTradeRecover(ctx) + // go s.runTradeRecover(ctx) } s.authedC = make(chan struct{}, 2) @@ -373,7 +358,7 @@ func (s *Strategy) CrossRun( defer fullReplenishTicker.Stop() // clean up the previous open orders - if err := s.cleanUpOpenOrders(ctx); err != nil { + if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil { log.WithError(err).Errorf("error cleaning up open orders") } @@ -426,10 +411,10 @@ func (s *Strategy) CrossRun( // // For negative position: // uncover position = -5 - -3 (covered position) = -2 - s.tradeCollector.Process() + s.HedgeOrderExecutor.TradeCollector().Process() + s.MakerOrderExecutor.TradeCollector().Process() position := s.Position.GetBase() - uncoverPosition := position.Sub(s.CoveredPosition) absPos := uncoverPosition.Abs() if absPos.Compare(s.hedgeMarket.MinQuantity) > 0 { @@ -550,7 +535,7 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) { log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity) bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity) - createdOrders, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{ + _, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{ Market: s.hedgeMarket, Symbol: s.Symbol, Type: types.OrderTypeMarket, @@ -564,14 +549,16 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) { return } - s.orderStore.Add(createdOrders...) - // if the hedge is on sell side, then we should add positive position switch side { case types.SideTypeSell: + s.mu.Lock() s.CoveredPosition = s.CoveredPosition.Add(quantity) + s.mu.Unlock() case types.SideTypeBuy: + s.mu.Lock() s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg()) + s.mu.Unlock() } } @@ -597,11 +584,11 @@ func (s *Strategy) runTradeRecover(ctx context.Context) { if s.RecoverTrade { startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod) - if err := s.tradeCollector.Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { + if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { log.WithError(err).Errorf("query trades error") } - if err := s.tradeCollector.Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { + if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { log.WithError(err).Errorf("query trades error") } } @@ -853,17 +840,15 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) { return } - createdOrders, err := s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...) + _, err = s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...) if err != nil { log.WithError(err).Errorf("order error: %s", err.Error()) return } - - s.orderStore.Add(createdOrders...) } -func (s *Strategy) cleanUpOpenOrders(ctx context.Context) error { - openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.makerSession.Exchange, s.Symbol) +func (s *Strategy) cleanUpOpenOrders(ctx context.Context, session *bbgo.ExchangeSession) error { + openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol) if err != nil { return err } @@ -875,7 +860,7 @@ func (s *Strategy) cleanUpOpenOrders(ctx context.Context) error { log.Infof("found existing open orders:") types.OrderSlice(openOrders).Print() - if err := s.makerSession.Exchange.CancelOrders(ctx, openOrders...); err != nil { + if err := session.Exchange.CancelOrders(ctx, openOrders...); err != nil { return err } From 58321e8aa529295750bd5a62de45f7a5bd4190cb Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 20 Dec 2023 22:20:40 +0800 Subject: [PATCH 2/3] xdepthmaker: update instance id format --- pkg/strategy/xdepthmaker/strategy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index ad2e9a37f2..4ff76b85df 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -218,7 +218,7 @@ func (s *Strategy) ID() string { } func (s *Strategy) InstanceID() string { - return fmt.Sprintf("%s:%s", ID, s.Symbol) + return fmt.Sprintf("%s:%s:%s-%s", ID, s.Symbol, s.MakerExchange, s.HedgeExchange) } func (s *Strategy) Initialize() error { From 3ba16215901b74480b762068ba4e8274f3aa4055 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 20 Dec 2023 22:28:20 +0800 Subject: [PATCH 3/3] xdepthmaker: simplify covered handler registration --- pkg/strategy/xdepthmaker/strategy.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index 4ff76b85df..26c24ebea1 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -123,7 +123,7 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize( // bbgo.Sync(ctx, s) }) - coveredFunc := func(trade types.Trade, profit, netProfit fixedpoint.Value) { + s.HedgeOrderExecutor.TradeCollector().OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { c := trade.PositionChange() // sync covered position @@ -133,15 +133,12 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize( // buy trade -> positive delta -> // 1) short position -> reduce short position // 2) short position -> increase short position - if trade.Exchange == s.hedgeSession.ExchangeName { - // TODO: make this atomic - s.mu.Lock() - s.CoveredPosition = s.CoveredPosition.Add(c) - s.mu.Unlock() - } - } - s.MakerOrderExecutor.TradeCollector().OnTrade(coveredFunc) - s.HedgeOrderExecutor.TradeCollector().OnTrade(coveredFunc) + + // TODO: make this atomic + s.mu.Lock() + s.CoveredPosition = s.CoveredPosition.Add(c) + s.mu.Unlock() + }) return nil }