From bc8df98894358f7f774ea0cd4e168559c29c31e1 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Sat, 2 Sep 2023 14:15:17 +0200 Subject: [PATCH] feat: Hold connections and transactions for less time Previously, the pg backend would hold connections/transactions during the execution of job handlers. So, e.g. if a job ran for 30 seconds, a transaction and its connection would be held for the entirety of those 30 seconds. It was originally implemented this way because the vision was to pass every job a `tx` that could be used throughout the job. If the job failed, its `tx` would be rolled back in neoq so that users would not have to handle rollbacks and connections themselves, were they to perform database operations in their jobs. But ultimately, that is a lot of hand-holding at the cost of a lot of resources, for a use case that is both unlikely and does not work as soon as the user's application and neoq tables are in different databases. This commit reverses that poor decision and opts to improve performance over giving users an ergonomic transaction handling API. 
--- backends/postgres/postgres_backend.go | 94 +++++++++++---------------- 1 file changed, 39 insertions(+), 55 deletions(-) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 03e2cd4..0777747 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -59,10 +59,7 @@ const ( setIdleInTxSessionTimeout = `SET idle_in_transaction_session_timeout = 0` ) -type contextKey struct{} - var ( - txCtxVarKey contextKey shutdownJobID = "-1" // job ID announced when triggering a shutdown shutdownAnnouncementAllowance = 100 // ms ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings") @@ -184,18 +181,6 @@ func WithTransactionTimeout(txTimeout int) config.Option { } } -// txFromContext gets the transaction from a context, if the transaction is already set -func txFromContext(ctx context.Context) (t pgx.Tx, err error) { - var ok bool - if t, ok = ctx.Value(txCtxVarKey).(pgx.Tx); ok { - return - } - - err = ErrNoTransactionInContext - - return -} - // initializeDB initializes the tables, types, and indices necessary to operate Neoq // //nolint:funlen,gocyclo,cyclop @@ -453,6 +438,7 @@ func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job, // processed at least one more time. 
// nolint: cyclop func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { + var tx pgx.Tx status := internal.JobStatusProcessed errMsg := "" @@ -467,13 +453,22 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { return fmt.Errorf("error getting job from context: %w", err) } - var tx pgx.Tx - if tx, err = txFromContext(ctx); err != nil { - return fmt.Errorf("error getting tx from context: %w", err) + conn, err := p.pool.Acquire(ctx) + if err != nil { + p.logger.Error("failed to acquire database connection to update job", "error", err) + return + } + defer conn.Release() + + tx, err = conn.Begin(ctx) + if err != nil { + return } + defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed if job.Retries >= job.MaxRetries { err = p.moveToDeadQueue(ctx, tx, job, jobErr) + err = tx.Commit(ctx) return } @@ -486,11 +481,17 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3 WHERE id = $4" _, err = tx.Exec(ctx, qstr, time.Now().UTC(), errMsg, status, job.ID) } - if err != nil { return } + err = tx.Commit(ctx) + if err != nil { + errMsg := "unable to commit job transaction. 
retrying this job may dupliate work:" + p.logger.Error(errMsg, "error", err, "job_id", job.ID) + return fmt.Errorf("%s %w", errMsg, err) + } + if time.Until(runAfter) > 0 { p.mu.Lock() p.futureJobs[fmt.Sprint(job.ID)] = runAfter @@ -642,18 +643,9 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) { func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan string) { jobsCh = make(chan string) - - conn, err := p.pool.Acquire(ctx) - if err != nil { - p.logger.Error("failed to acquire database connection to listen for pending queue items", err) - return - } - go func(ctx context.Context) { - defer conn.Release() - for { - jobID, err := p.getPendingJobID(ctx, conn, queue) + jobID, err := p.getPendingJobID(ctx, queue) if err != nil { if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) { break @@ -675,23 +667,12 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan // 2. handleJob secondly calls the handler on the job, and finally updates the job's status func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handler) (err error) { var job *jobs.Job - var tx pgx.Tx - conn, err := p.pool.Acquire(ctx) - if err != nil { - return - } - defer conn.Release() - - tx, err = conn.Begin(ctx) + job, err = p.getPendingJob(ctx, jobID) if err != nil { return } - defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed - job, err = p.getPendingJob(ctx, tx, jobID) - if err != nil { - return - } + ctx = withJobContext(ctx, job) if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) { err = jobs.ErrJobExceededDeadline @@ -700,9 +681,6 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl return } - ctx = withJobContext(ctx, job) - ctx = context.WithValue(ctx, txCtxVarKey, tx) - // check if the job is being retried and increment retry count accordingly if job.Status != 
internal.JobStatusNew { job.Retries++ @@ -720,13 +698,6 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl return err } - err = tx.Commit(ctx) - if err != nil { - errMsg := "unable to commit job transaction. retrying this job may dupliate work:" - p.logger.Error(errMsg, "error", err, "job_id", job.ID) - return fmt.Errorf("%s %w", errMsg, err) - } - return nil } @@ -794,8 +765,14 @@ func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue strin conn.Release() } -func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) { - row, err := tx.Query(ctx, PendingJobQuery, jobID) +func (p *PgBackend) getPendingJob(ctx context.Context, jobID string) (job *jobs.Job, err error) { + conn, err := p.pool.Acquire(ctx) + if err != nil { + return + } + defer conn.Release() + + row, err := conn.Query(ctx, PendingJobQuery, jobID) if err != nil { return } @@ -808,7 +785,14 @@ func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID string) return } -func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID string, err error) { +func (p *PgBackend) getPendingJobID(ctx context.Context, queue string) (jobID string, err error) { + conn, err := p.pool.Acquire(ctx) + if err != nil { + p.logger.Error("failed to acquire database connection to listen for pending queue items", err) + return + } + defer conn.Release() + err = conn.QueryRow(ctx, PendingJobIDQuery, queue).Scan(&jobID) return }