Update docs regarding lost connections while updating jobs (#39)
acaloiaro authored Mar 18, 2023
1 parent eb430e9 commit 4639e53
Showing 2 changed files with 46 additions and 35 deletions.
73 changes: 42 additions & 31 deletions backends/postgres/postgres_backend.go
@@ -504,13 +504,20 @@ func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job,
}

// updateJob updates the status of jobs with: status, run time, error messages, and retries
//
// if the retry count exceeds the maximum number of retries for the job, move the job to the dead jobs queue
//
// if `tx`'s underlying connection dies, it results in this function throwing an error and job status inaccurately
// reflecting the status of the job and its number of retries.
// TODO: Handle dropped connections when updating job status in PgBackend
// e.g. acquiring a new connection in the event of connection failure
// nolint: cyclop
// if `tx`'s underlying connection dies while updating job status, the transaction will fail, and the job's original
// status will be reflected in the database.
//
// The implications of this are:
// - the job's 'error' field will not reflect any errors that occurred in the handler
// - the job's retry count will not be incremented
// - the job's run time will remain its original value
// - the job will retain its original 'status'
//
// Ultimately, this means that any time a database connection is lost while updating job status, the job will be
// processed at least one more time.
func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
status := internal.JobStatusProcessed
errMsg := ""
@@ -566,13 +573,19 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
func (p *PgBackend) start(ctx context.Context, queue string) (err error) {
var h handler.Handler
var ok bool

if h, ok = p.handlers[queue]; !ok {
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
}

listenJobChan := p.listen(ctx, queue) // listen for 'new' jobs
listenJobChan, ready := p.listen(ctx, queue) // listen for 'new' jobs
defer close(ready)

pendingJobsChan := p.pendingJobs(ctx, queue) // process overdue jobs *at startup*

// wait for the listener to connect and be ready to listen
<-ready

// process all future jobs and retries
go func() { p.scheduleFutureJobs(ctx, queue) }()

@@ -782,8 +795,9 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handle

err = tx.Commit(ctx)
if err != nil {
p.logger.Error("unable to commit job transaction. retrying this job may dupliate work", err, "job_id", job.ID)
return fmt.Errorf("unable to commit job transaction. retrying this job may dupliate work: %w", err)
errMsg := "unable to commit job transaction. retrying this job may dupliate work:"
p.logger.Error(errMsg, err, "job_id", job.ID)
return fmt.Errorf("%s %w", errMsg, err)
}

return nil
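
Since the commit failure is wrapped with `%w`, callers can still match on the underlying driver error instead of parsing the message string. A hedged sketch; the calling site is illustrative and the pgx import version is an assumption:

```go
package examples

import (
	"errors"
	"log"

	"github.com/jackc/pgx/v5/pgconn" // assumed pgx major version; match it to the project
)

// inspectCommitErr shows that fmt.Errorf("%s %w", ...) keeps the driver error
// reachable through errors.As, so callers can branch on the SQLSTATE code.
func inspectCommitErr(err error) {
	var pgErr *pgconn.PgError
	if errors.As(err, &pgErr) {
		log.Printf("commit failed with SQLSTATE %s: %s", pgErr.Code, pgErr.Message)
	}
}
```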
@@ -793,59 +807,56 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handle
// TODO: There is currently no handling of listener disconnects in PgBackend.
// This will lead to jobs not getting processed until the worker is restarted.
// Implement disconnect handling.
func (p *PgBackend) listen(ctx context.Context, queue string) (c chan int64) {
func (p *PgBackend) listen(ctx context.Context, queue string) (c chan int64, ready chan bool) {
c = make(chan int64)
ready = make(chan bool)

go func(ctx context.Context) {
for {
conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("unable to acquire new connnection", err)
return
}
conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("unable to acquire new listener connnection", err)
return
}
defer p.release(ctx, conn, queue)

// set this connection's idle in transaction timeout to infinite so it is not intermittently disconnected
_, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue))
if err != nil {
err = fmt.Errorf("unable to configure listener connection: %w", err)
p.logger.Error("unable to configure listener connection", err)
time.Sleep(time.Second) // don't hammer the db
p.release(ctx, conn, queue)
continue
}
// set this connection's idle in transaction timeout to infinite so it is not intermittently disconnected
_, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue))
if err != nil {
err = fmt.Errorf("unable to configure listener connection: %w", err)
p.logger.Error("unable to configure listener connection", err)
return
}

// notify start() that we're ready to listen for jobs
ready <- true

for {
notification, waitErr := conn.Conn().WaitForNotification(ctx)
if waitErr != nil {
if errors.Is(waitErr, context.Canceled) {
p.release(ctx, conn, queue)
return
}

p.logger.Error("failed to wait for notification", waitErr)
p.release(ctx, conn, queue)
continue
}

var jobID int64
if jobID, err = strconv.ParseInt(notification.Payload, 0, 64); err != nil {
p.logger.Error("unable to fetch job", err)
p.release(ctx, conn, queue)
continue
}

// check if Shutdown() has been called
if jobID == shutdownJobID {
p.release(ctx, conn, queue)
return
}

c <- jobID

p.release(ctx, conn, queue)
}
}(ctx)

return c
return c, ready
}
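
The loop above parses each notification payload as a job ID, which implies the producing side notifies the queue's channel with the ID as the payload. A hedged illustration of that notify side — not neoq's actual enqueue code, and the pgx import version is an assumption:

```go
package examples

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool" // assumed pgx major version; match it to the project
)

// notifyQueue wakes any LISTENers on `queue`, passing the job ID as the payload that
// the listener above parses with strconv.ParseInt.
func notifyQueue(ctx context.Context, pool *pgxpool.Pool, queue string, jobID int64) error {
	_, err := pool.Exec(ctx, `SELECT pg_notify($1, $2::text)`, queue, jobID)
	return err
}
```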

func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue string) {
8 changes: 4 additions & 4 deletions backends/postgres/postgres_backend_test.go
@@ -3,6 +3,7 @@ package postgres_test
import (
"context"
"errors"
"fmt"
"os"
"testing"
"time"
@@ -11,6 +12,7 @@ import (
"github.com/acaloiaro/neoq/backends/postgres"
"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/internal"
"github.com/acaloiaro/neoq/jobs"
)

@@ -75,7 +77,6 @@ func TestBasicJobMultipleQueue(t *testing.T) {
const queue2 = "testing2"
done := make(chan bool)
doneCnt := 0
defer close(done)

var timeoutTimer = time.After(5 * time.Second)

@@ -115,7 +116,7 @@ func TestBasicJobMultipleQueue(t *testing.T) {
jid, e := nq.Enqueue(ctx, &jobs.Job{
Queue: queue,
Payload: map[string]interface{}{
"message": "hello world",
"message": fmt.Sprintf("hello world: %d", internal.RandInt(10000000000)),
},
})
if e != nil || jid == jobs.DuplicateJobID {
@@ -125,7 +126,7 @@
jid2, e := nq.Enqueue(ctx, &jobs.Job{
Queue: queue2,
Payload: map[string]interface{}{
"message": "hello world",
"message": fmt.Sprintf("hello world: %d", internal.RandInt(10000000000)),
},
})
if e != nil || jid2 == jobs.DuplicateJobID {
@@ -146,7 +147,6 @@ results_loop:
}
}

time.Sleep(time.Second)
if err != nil {
t.Error(err)
}
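
The switch to `fmt.Sprintf("hello world: %d", internal.RandInt(...))` in the payloads above presumably keeps repeated test runs from tripping duplicate-job detection on identical queue/payload pairs. A small sketch of the same idea; the helper name and the use of a timestamp instead of `internal.RandInt` are illustrative only:

```go
package examples

import (
	"fmt"
	"time"
)

// uniqueMessage builds a payload that differs on every call, so two enqueues to the
// same queue are not collapsed as duplicates (assumed reason for the test change above).
func uniqueMessage() map[string]interface{} {
	return map[string]interface{}{
		"message": fmt.Sprintf("hello world: %d", time.Now().UnixNano()),
	}
}
```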