Skip to content

Commit

Permalink
fix: Revert "feat: Hold connections and transactions for less time"
Browse files Browse the repository at this point in the history
This reverts commit bc8df98.

This commit was totally wrong about the motivations for holding transactions throughout the duration of jobs.

This is an embarrassing mistake because it is the crux of the entire Postgres backend. Transactions must be held while jobs are being processed because `FOR UPDATE` requires the transaction that issued the `SELECT * FOR UPDATE ...` query to remain open to hold the lock (https://www.postgresql.org/docs/current/explicit-locking.html#LOCKING-ROWS).

The reverted commit allowed jobs to be picked up by multiple workers.
  • Loading branch information
acaloiaro committed Sep 22, 2023
1 parent dbda88a commit 040c0de
Showing 1 changed file with 55 additions and 39 deletions.
94 changes: 55 additions & 39 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ const (
setIdleInTxSessionTimeout = `SET idle_in_transaction_session_timeout = 0`
)

type contextKey struct{}

var (
txCtxVarKey contextKey
shutdownJobID = "-1" // job ID announced when triggering a shutdown
shutdownAnnouncementAllowance = 100 // ms
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
Expand Down Expand Up @@ -181,6 +184,18 @@ func WithTransactionTimeout(txTimeout int) neoq.ConfigOption {
}
}

// txFromContext looks up the value stored in ctx under txCtxVarKey and returns
// it when it is a pgx.Tx.
//
// When the key is absent, or the stored value is not a pgx.Tx, the returned
// transaction is nil and err is ErrNoTransactionInContext.
func txFromContext(ctx context.Context) (t pgx.Tx, err error) {
	tx, found := ctx.Value(txCtxVarKey).(pgx.Tx)
	if !found {
		return nil, ErrNoTransactionInContext
	}

	return tx, nil
}

// initializeDB initializes the tables, types, and indices necessary to operate Neoq
//
//nolint:funlen,gocyclo,cyclop
Expand Down Expand Up @@ -435,7 +450,6 @@ func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job,
// processed at least one more time.
// nolint: cyclop
func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
var tx pgx.Tx
status := internal.JobStatusProcessed
errMsg := ""

Expand All @@ -450,22 +464,13 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
return fmt.Errorf("error getting job from context: %w", err)
}

conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("failed to acquire database connection to update job", "error", err)
return
}
defer conn.Release()

tx, err = conn.Begin(ctx)
if err != nil {
return
var tx pgx.Tx
if tx, err = txFromContext(ctx); err != nil {
return fmt.Errorf("error getting tx from context: %w", err)
}
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed

if job.Retries >= job.MaxRetries {
err = p.moveToDeadQueue(ctx, tx, job, jobErr)
err = tx.Commit(ctx)
return
}

Expand All @@ -478,15 +483,9 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3 WHERE id = $4"
_, err = tx.Exec(ctx, qstr, time.Now().UTC(), errMsg, status, job.ID)
}
if err != nil {
return
}

err = tx.Commit(ctx)
if err != nil {
errMsg := "unable to commit job transaction. retrying this job may dupliate work:"
p.logger.Error(errMsg, "error", err, "job_id", job.ID)
return fmt.Errorf("%s %w", errMsg, err)
return
}

if time.Until(runAfter) > 0 {
Expand Down Expand Up @@ -639,9 +638,18 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {

func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan string) {
jobsCh = make(chan string)

conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("failed to acquire database connection to listen for pending queue items", err)
return
}

go func(ctx context.Context) {
defer conn.Release()

for {
jobID, err := p.getPendingJobID(ctx, queue)
jobID, err := p.getPendingJobID(ctx, conn, queue)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) {
break
Expand All @@ -663,12 +671,23 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
// 2. handleJob secondly calls the handler on the job, and finally updates the job's status
func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handler) (err error) {
var job *jobs.Job
job, err = p.getPendingJob(ctx, jobID)
var tx pgx.Tx
conn, err := p.pool.Acquire(ctx)
if err != nil {
return
}
defer conn.Release()

ctx = withJobContext(ctx, job)
tx, err = conn.Begin(ctx)
if err != nil {
return
}
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed

job, err = p.getPendingJob(ctx, tx, jobID)
if err != nil {
return
}

if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) {
err = jobs.ErrJobExceededDeadline
Expand All @@ -677,6 +696,9 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
return
}

ctx = withJobContext(ctx, job)
ctx = context.WithValue(ctx, txCtxVarKey, tx)

// check if the job is being retried and increment retry count accordingly
if job.Status != internal.JobStatusNew {
job.Retries++
Expand All @@ -694,6 +716,13 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
return err
}

err = tx.Commit(ctx)
if err != nil {
errMsg := "unable to commit job transaction. retrying this job may dupliate work:"
p.logger.Error(errMsg, "error", err, "job_id", job.ID)
return fmt.Errorf("%s %w", errMsg, err)
}

return nil
}

Expand Down Expand Up @@ -761,14 +790,8 @@ func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue strin
conn.Release()
}

func (p *PgBackend) getPendingJob(ctx context.Context, jobID string) (job *jobs.Job, err error) {
conn, err := p.pool.Acquire(ctx)
if err != nil {
return
}
defer conn.Release()

row, err := conn.Query(ctx, PendingJobQuery, jobID)
func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) {
row, err := tx.Query(ctx, PendingJobQuery, jobID)
if err != nil {
return
}
Expand All @@ -781,14 +804,7 @@ func (p *PgBackend) getPendingJob(ctx context.Context, jobID string) (job *jobs.
return
}

func (p *PgBackend) getPendingJobID(ctx context.Context, queue string) (jobID string, err error) {
conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("failed to acquire database connection to listen for pending queue items", err)
return
}
defer conn.Release()

// getPendingJobID runs PendingJobIDQuery for the given queue on the supplied
// connection and scans the single resulting row into jobID. Any error reported
// by Scan is returned unchanged.
func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID string, err error) {
	row := conn.QueryRow(ctx, PendingJobIDQuery, queue)
	err = row.Scan(&jobID)

	return jobID, err
}
Expand Down

0 comments on commit 040c0de

Please sign in to comment.