From 040c0de7db4e5d2b2e4bb6f9156af9b45354740a Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Thu, 21 Sep 2023 09:29:42 -0600 Subject: [PATCH] fix: Revert "feat: Hold connections and transactions for less time" This reverts commit bc8df98894358f7f774ea0cd4e168559c29c31e1. This commit was totally wrong about the motivations for holding transactions throughout the duration of jobs. This is an embarrassing mistake because it is the crux of the entire Postgres backend. Transactions must be held while jobs are being processed because `FOR UPDATE` requires the transaction that issued the `SELECT * FOR UPDATE ...` query to remain open to hold the lock (https://www.postgresql.org/docs/current/explicit-locking.html#LOCKING-ROWS) The reverted commit allowed jobs to be picked up by multiple workers. --- backends/postgres/postgres_backend.go | 94 ++++++++++++++++----------- 1 file changed, 55 insertions(+), 39 deletions(-) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index eb789a8..1d76b71 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -56,7 +56,10 @@ const ( setIdleInTxSessionTimeout = `SET idle_in_transaction_session_timeout = 0` ) +type contextKey struct{} + var ( + txCtxVarKey contextKey shutdownJobID = "-1" // job ID announced when triggering a shutdown shutdownAnnouncementAllowance = 100 // ms ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings") @@ -181,6 +184,18 @@ func WithTransactionTimeout(txTimeout int) neoq.ConfigOption { } } +// txFromContext gets the transaction from a context, if the transaction is already set +func txFromContext(ctx context.Context) (t pgx.Tx, err error) { + var ok bool + if t, ok = ctx.Value(txCtxVarKey).(pgx.Tx); ok { + return + } + + err = ErrNoTransactionInContext + + return +} + // initializeDB initializes the tables, types, and indices necessary to operate Neoq // 
//nolint:funlen,gocyclo,cyclop @@ -435,7 +450,6 @@ func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job, // processed at least one more time. // nolint: cyclop func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { - var tx pgx.Tx status := internal.JobStatusProcessed errMsg := "" @@ -450,22 +464,13 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { return fmt.Errorf("error getting job from context: %w", err) } - conn, err := p.pool.Acquire(ctx) - if err != nil { - p.logger.Error("failed to acquire database connection to update job", "error", err) - return - } - defer conn.Release() - - tx, err = conn.Begin(ctx) - if err != nil { - return + var tx pgx.Tx + if tx, err = txFromContext(ctx); err != nil { + return fmt.Errorf("error getting tx from context: %w", err) } - defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed if job.Retries >= job.MaxRetries { err = p.moveToDeadQueue(ctx, tx, job, jobErr) - err = tx.Commit(ctx) return } @@ -478,15 +483,9 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3 WHERE id = $4" _, err = tx.Exec(ctx, qstr, time.Now().UTC(), errMsg, status, job.ID) } - if err != nil { - return - } - err = tx.Commit(ctx) if err != nil { - errMsg := "unable to commit job transaction. 
retrying this job may dupliate work:" - p.logger.Error(errMsg, "error", err, "job_id", job.ID) - return fmt.Errorf("%s %w", errMsg, err) + return } if time.Until(runAfter) > 0 { @@ -639,9 +638,18 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) { func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan string) { jobsCh = make(chan string) + + conn, err := p.pool.Acquire(ctx) + if err != nil { + p.logger.Error("failed to acquire database connection to listen for pending queue items", err) + return + } + go func(ctx context.Context) { + defer conn.Release() + for { - jobID, err := p.getPendingJobID(ctx, queue) + jobID, err := p.getPendingJobID(ctx, conn, queue) if err != nil { if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) { break @@ -663,12 +671,23 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan // 2. handleJob secondly calls the handler on the job, and finally updates the job's status func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handler) (err error) { var job *jobs.Job - job, err = p.getPendingJob(ctx, jobID) + var tx pgx.Tx + conn, err := p.pool.Acquire(ctx) if err != nil { return } + defer conn.Release() - ctx = withJobContext(ctx, job) + tx, err = conn.Begin(ctx) + if err != nil { + return + } + defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed + + job, err = p.getPendingJob(ctx, tx, jobID) + if err != nil { + return + } if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) { err = jobs.ErrJobExceededDeadline @@ -677,6 +696,9 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl return } + ctx = withJobContext(ctx, job) + ctx = context.WithValue(ctx, txCtxVarKey, tx) + // check if the job is being retried and increment retry count accordingly if job.Status != internal.JobStatusNew { job.Retries++ @@ -694,6 
+716,13 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl return err } + err = tx.Commit(ctx) + if err != nil { + errMsg := "unable to commit job transaction. retrying this job may dupliate work:" + p.logger.Error(errMsg, "error", err, "job_id", job.ID) + return fmt.Errorf("%s %w", errMsg, err) + } + return nil } @@ -761,14 +790,8 @@ func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue strin conn.Release() } -func (p *PgBackend) getPendingJob(ctx context.Context, jobID string) (job *jobs.Job, err error) { - conn, err := p.pool.Acquire(ctx) - if err != nil { - return - } - defer conn.Release() - - row, err := conn.Query(ctx, PendingJobQuery, jobID) +func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) { + row, err := tx.Query(ctx, PendingJobQuery, jobID) if err != nil { return } @@ -781,14 +804,7 @@ func (p *PgBackend) getPendingJob(ctx context.Context, jobID string) (job *jobs. return } -func (p *PgBackend) getPendingJobID(ctx context.Context, queue string) (jobID string, err error) { - conn, err := p.pool.Acquire(ctx) - if err != nil { - p.logger.Error("failed to acquire database connection to listen for pending queue items", err) - return - } - defer conn.Release() - +func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID string, err error) { err = conn.QueryRow(ctx, PendingJobIDQuery, queue).Scan(&jobID) return }