Skip to content

Commit

Permalink
[+] switch to VirtualXID from TransactionXID, solves #674 (#675)
Browse files Browse the repository at this point in the history
It turned out that issuing `txid_current()` at the beginning of every chain
transaction creates a session that sits idle in transaction for
the duration of the entire chain. In the current case, with a single task
`{kind == PROGRAM}`, that idle transaction doesn't do anything aside
from pinning the `xmin` horizon and blocking vacuum for 6+ hours.
The same issue occurs for SQL tasks that are `Remote` or `Autonomous`.
  • Loading branch information
pashagolub authored Feb 6, 2025
1 parent da8486a commit 5046c1a
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 10 deletions.
2 changes: 1 addition & 1 deletion internal/pgengine/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, c
VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10, $11)`,
task.ChainID, task.TaskID, task.Script, task.Kind,
fmt.Sprintf("%f seconds", float64(task.Duration)/1000000),
retCode, pge.Getsid(), strings.TrimSpace(output), pge.ClientName, task.Txid,
retCode, pge.Getsid(), strings.TrimSpace(output), pge.ClientName, task.Vxid,
task.IgnoreError)
if err != nil {
pge.l.WithError(err).Error("Failed to log chain element execution status")
Expand Down
9 changes: 6 additions & 3 deletions internal/pgengine/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)

// StartTransaction returns transaction object, transaction id and error
func (pge *PgEngine) StartTransaction(ctx context.Context) (tx pgx.Tx, txid int64, err error) {
// StartTransaction returns transaction object, virtual transaction id and error
func (pge *PgEngine) StartTransaction(ctx context.Context) (tx pgx.Tx, vxid int64, err error) {
if tx, err = pge.ConfigDb.Begin(ctx); err != nil {
return
}
err = tx.QueryRow(ctx, "SELECT txid_current()").Scan(&txid)
err = tx.QueryRow(ctx, `SELECT
(split_part(virtualxid, '/', 1)::int8 << 32) | split_part(virtualxid, '/', 2)::int8
FROM pg_locks
WHERE pid = pg_backend_pid() AND virtualxid IS NOT NULL`).Scan(&vxid)
return
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pgengine/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestStartTransaction(t *testing.T) {
assert.Error(t, err)

mockPool.ExpectBegin()
mockPool.ExpectQuery("SELECT txid_current()").WillReturnRows(pgxmock.NewRows([]string{"txid"}).AddRow(int64(42)))
mockPool.ExpectQuery("SELECT").WillReturnRows(pgxmock.NewRows([]string{"txid"}).AddRow(int64(42)))
tx, txid, err := pge.StartTransaction(ctx)
assert.NotNil(t, tx)
assert.EqualValues(t, 42, txid)
Expand Down
2 changes: 1 addition & 1 deletion internal/pgengine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type ChainTask struct {
Timeout int `db:"timeout"` // in milliseconds
StartedAt time.Time `db:"-"`
Duration int64 `db:"-"` // in microseconds
Txid int64 `db:"-"`
Vxid int64 `db:"-"`
}

func (task *ChainTask) IsRemote() bool {
Expand Down
8 changes: 4 additions & 4 deletions internal/scheduler/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,20 +192,20 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
var ChainTasks []pgengine.ChainTask
var bctx context.Context
var cancel context.CancelFunc
var txid int64
var vxid int64

chainCtx, cancel := getTimeoutContext(ctx, sch.Config().Resource.ChainTimeout, chain.Timeout)
if cancel != nil {
defer cancel()
}

chainL := sch.l.WithField("chain", chain.ChainID)
tx, txid, err := sch.pgengine.StartTransaction(chainCtx)
tx, vxid, err := sch.pgengine.StartTransaction(chainCtx)
if err != nil {
chainL.WithError(err).Error("Cannot start transaction")
return
}
chainL = chainL.WithField("txid", txid)
chainL = chainL.WithField("vxid", vxid)

err = sch.pgengine.GetChainElements(chainCtx, &ChainTasks, chain.ChainID)
if err != nil {
Expand All @@ -217,7 +217,7 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
/* now we can loop through every element of the task chain */
for _, task := range ChainTasks {
task.ChainID = chain.ChainID
task.Txid = txid
task.Vxid = vxid
l := chainL.WithField("task", task.TaskID)
l.Info("Starting task")
taskCtx := log.WithLogger(chainCtx, l)
Expand Down

0 comments on commit 5046c1a

Please sign in to comment.