diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go
index 0910625..d6e29ab 100644
--- a/backends/postgres/postgres_backend.go
+++ b/backends/postgres/postgres_backend.go
@@ -504,13 +504,20 @@ func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job,
 }
 
 // updateJob updates the status of jobs with: status, run time, error messages, and retries
+//
 // if the retry count exceeds the maximum number of retries for the job, move the job to the dead jobs queue
 //
-// if `tx`'s underlying connection dies, it results in this function throwing an error and job status inaccurately
-// reflecting the status of the job and its number of retries.
-// TODO: Handle dropped connections when updating job status in PgBackend
-// e.g. acquiring a new connection in the event of connection failure
-// nolint: cyclop
+// if `tx`'s underlying connection dies while updating job status, the transaction will fail, and the job's original
+// status will be reflected in the database.
+//
+// The implications of this are:
+//   - the job's 'error' field will not reflect any errors that occurred in the handler
+//   - the job's retry count is not incremented
+//   - the job's run time will remain its original value
+//   - the job retains its original 'status'
+//
+// ultimately, this means that any time a database connection is lost while updating job status, the job will be
+// processed at least one more time.
 func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
 	status := internal.JobStatusProcessed
 	errMsg := ""
@@ -566,13 +573,19 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
 func (p *PgBackend) start(ctx context.Context, queue string) (err error) {
 	var h handler.Handler
 	var ok bool
+
 	if h, ok = p.handlers[queue]; !ok {
 		return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
 	}
 
-	listenJobChan := p.listen(ctx, queue)        // listen for 'new' jobs
+	listenJobChan, ready := p.listen(ctx, queue) // listen for 'new' jobs
+	defer close(ready)
+
 	pendingJobsChan := p.pendingJobs(ctx, queue) // process overdue jobs *at startup*
 
+	// wait for the listener to connect and be ready to listen
+	<-ready
+
 	// process all future jobs and retries
 	go func() { p.scheduleFutureJobs(ctx, queue) }()
 
@@ -782,8 +795,9 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handle
 
 	err = tx.Commit(ctx)
 	if err != nil {
-		p.logger.Error("unable to commit job transaction. retrying this job may dupliate work", err, "job_id", job.ID)
-		return fmt.Errorf("unable to commit job transaction. retrying this job may dupliate work: %w", err)
+		errMsg := "unable to commit job transaction. retrying this job may duplicate work:"
+		p.logger.Error(errMsg, err, "job_id", job.ID)
+		return fmt.Errorf("%s %w", errMsg, err)
 	}
 
 	return nil
@@ -793,59 +807,56 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handle
 // TODO: There is currently no handling of listener disconnects in PgBackend.
 // This will lead to jobs not getting processed until the worker is restarted.
 // Implement disconnect handling.
-func (p *PgBackend) listen(ctx context.Context, queue string) (c chan int64) {
+func (p *PgBackend) listen(ctx context.Context, queue string) (c chan int64, ready chan bool) {
 	c = make(chan int64)
+	ready = make(chan bool)
 
 	go func(ctx context.Context) {
-		for {
-			conn, err := p.pool.Acquire(ctx)
-			if err != nil {
-				p.logger.Error("unable to acquire new connnection", err)
-				return
-			}
+		conn, err := p.pool.Acquire(ctx)
+		if err != nil {
+			p.logger.Error("unable to acquire new listener connection", err)
+			return
+		}
+		defer p.release(ctx, conn, queue)
 
-			// set this connection's idle in transaction timeout to infinite so it is not intermittently disconnected
-			_, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue))
-			if err != nil {
-				err = fmt.Errorf("unable to configure listener connection: %w", err)
-				p.logger.Error("unable to configure listener connection", err)
-				time.Sleep(time.Second) // don't hammer the db
-				p.release(ctx, conn, queue)
-				continue
-			}
+		// set this connection's idle in transaction timeout to infinite so it is not intermittently disconnected
+		_, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue))
+		if err != nil {
+			err = fmt.Errorf("unable to configure listener connection: %w", err)
+			p.logger.Error("unable to configure listener connection", err)
+			return
+		}
+
+		// notify start() that we're ready to listen for jobs
+		ready <- true
 
+		for {
 			notification, waitErr := conn.Conn().WaitForNotification(ctx)
 			if waitErr != nil {
 				if errors.Is(waitErr, context.Canceled) {
-					p.release(ctx, conn, queue)
 					return
 				}
 
 				p.logger.Error("failed to wait for notification", waitErr)
-				p.release(ctx, conn, queue)
 				continue
 			}
 
 			var jobID int64
 			if jobID, err = strconv.ParseInt(notification.Payload, 0, 64); err != nil {
 				p.logger.Error("unable to fetch job", err)
-				p.release(ctx, conn, queue)
 				continue
 			}
 
 			// check if Shutdown() has been called
 			if jobID == shutdownJobID {
-				p.release(ctx, conn, queue)
 				return
 			}
 
 			c <- jobID
-
-			p.release(ctx, conn, queue)
 		}
 	}(ctx)
 
-	return c
+	return c, ready
 }
 
 func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue string) {
diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go
index 5c9b6ad..e68444f 100644
--- a/backends/postgres/postgres_backend_test.go
+++ b/backends/postgres/postgres_backend_test.go
@@ -3,6 +3,7 @@ package postgres_test
 import (
 	"context"
 	"errors"
+	"fmt"
 	"os"
 	"testing"
 	"time"
@@ -11,6 +12,7 @@ import (
 	"github.com/acaloiaro/neoq/backends/postgres"
 	"github.com/acaloiaro/neoq/config"
 	"github.com/acaloiaro/neoq/handler"
+	"github.com/acaloiaro/neoq/internal"
 	"github.com/acaloiaro/neoq/jobs"
 )
 
@@ -75,7 +77,6 @@ func TestBasicJobMultipleQueue(t *testing.T) {
 	const queue2 = "testing2"
 	done := make(chan bool)
 	doneCnt := 0
-	defer close(done)
 
 	var timeoutTimer = time.After(5 * time.Second)
 
@@ -115,7 +116,7 @@ func TestBasicJobMultipleQueue(t *testing.T) {
 	jid, e := nq.Enqueue(ctx, &jobs.Job{
 		Queue: queue,
 		Payload: map[string]interface{}{
-			"message": "hello world",
+			"message": fmt.Sprintf("hello world: %d", internal.RandInt(10000000000)),
 		},
 	})
 	if e != nil || jid == jobs.DuplicateJobID {
@@ -125,7 +126,7 @@ func TestBasicJobMultipleQueue(t *testing.T) {
 	jid2, e := nq.Enqueue(ctx, &jobs.Job{
 		Queue: queue2,
 		Payload: map[string]interface{}{
-			"message": "hello world",
+			"message": fmt.Sprintf("hello world: %d", internal.RandInt(10000000000)),
 		},
 	})
 	if e != nil || jid2 == jobs.DuplicateJobID {
@@ -146,7 +147,6 @@ results_loop:
 		}
 	}
 
-	time.Sleep(time.Second)
 	if err != nil {
 		t.Error(err)
 	}
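
For readers skimming the patch, here is a minimal, self-contained sketch of the ready-channel handshake this change introduces between `listen` and `start`. It is illustrative only: the `listen` and `main` functions below are stand-ins rather than the neoq API, and the connection/LISTEN setup is replaced by a `time.Sleep`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// listen mimics the new PgBackend.listen signature: it returns a channel of
// job IDs plus a 'ready' channel that is signaled once the listener has
// finished its setup and can actually receive notifications.
func listen(ctx context.Context) (jobs chan int64, ready chan bool) {
	jobs = make(chan int64)
	ready = make(chan bool)

	go func() {
		// Stand-in for acquiring a pool connection and issuing LISTEN.
		time.Sleep(100 * time.Millisecond)

		// Notify the caller that setup succeeded and notifications can flow.
		ready <- true

		for {
			select {
			case <-ctx.Done():
				return
			case jobs <- 42: // stand-in for a NOTIFY payload carrying a job ID
				time.Sleep(time.Second)
			}
		}
	}()

	return jobs, ready
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs, ready := listen(ctx)
	defer close(ready) // the caller owns the channel, mirroring start()

	<-ready // block until the listener is actually listening, as start() now does
	fmt.Println("listener ready; received job:", <-jobs)
}
```

As in the patched `start`, the caller (not the listener goroutine) closes `ready`, since the goroutine sends on it exactly once and the caller controls the channel's lifetime.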