From eb430e944446542dd3ffe097d594d98aefb153e1 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Sat, 18 Mar 2023 09:52:48 -0700 Subject: [PATCH] Acquire a new connection for every queue listener (#38) --- .github/workflows/goreleaser.yml | 2 +- Makefile | 2 +- backends/memory/memory_backend.go | 4 +- backends/memory/memory_backend_test.go | 4 +- backends/postgres/postgres_backend.go | 129 +++++++++++++-------- backends/postgres/postgres_backend_test.go | 85 +++++++++++++- jobs/jobs.go | 3 +- 7 files changed, 173 insertions(+), 56 deletions(-) diff --git a/.github/workflows/goreleaser.yml b/.github/workflows/goreleaser.yml index e62e438..2c531c2 100644 --- a/.github/workflows/goreleaser.yml +++ b/.github/workflows/goreleaser.yml @@ -20,7 +20,7 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} WITH_V: true - DEFAULT_BUMP: patch + DEFAULT_BUMP: minor - name: Set up Go uses: actions/setup-go@v3 diff --git a/Makefile b/Makefile index 4fa15cf..51764d8 100644 --- a/Makefile +++ b/Makefile @@ -58,7 +58,7 @@ test-watch: .PHONY: lint lint: @clear - @golangci-lint run . + @golangci-lint run .PHONY: lint-watch lint-watch: install-reflex diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index 9ddb8ca..4f60828 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -86,7 +86,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, e } if job.Queue == "" { - err = errors.New("this job does not specify a Queue. Please specify a queue") + err = jobs.ErrNoQueueSpecified return } @@ -264,7 +264,7 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context, queue string) { queueChan = qc.(chan *jobs.Job) queueChan <- j } else { - m.logger.Error(fmt.Sprintf("no queue processor for queue '%s'", queue), errors.New("no queue processor configured")) + m.logger.Error(fmt.Sprintf("no queue processor for queue '%s'", queue), handler.ErrNoHandlerForQueue) } }(job) } diff --git a/backends/memory/memory_backend_test.go b/backends/memory/memory_backend_test.go index 4edd164..8c7d17c 100644 --- a/backends/memory/memory_backend_test.go +++ b/backends/memory/memory_backend_test.go @@ -193,7 +193,7 @@ func TestFutureJobScheduling(t *testing.T) { } func TestCron(t *testing.T) { - const cron = "* * * * * *" + const cronSpec = "* * * * * *" ctx := context.TODO() nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) if err != nil { @@ -212,7 +212,7 @@ func TestCron(t *testing.T) { handler.Concurrency(1), ) - err = nq.StartCron(ctx, cron, h) + err = nq.StartCron(ctx, cronSpec, h) if err != nil { t.Error(err) } diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index e0be4fa..0910625 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -54,11 +54,12 @@ const ( type contextKey struct{} var ( - txCtxVarKey contextKey - ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings") - ErrDuplicateJobID = errors.New("duplicate job id") - ErrNoQueue = errors.New("no queue specified") - ErrNoTransactionInContext = errors.New("context does not have a Tx set") + txCtxVarKey contextKey + shutdownJobID int64 = -1 // job ID announced when triggering a shutdown + shutdownAnnouncementAllowance = 100 // ms + ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings") + ErrDuplicateJobID = errors.New("duplicate job id") + ErrNoTransactionInContext = errors.New("context does 
not have a Tx set") ) // PgBackend is a Postgres-based Neoq backend @@ -67,7 +68,6 @@ type PgBackend struct { config *config.Config logger logging.Logger cron *cron.Cron - listenConn *pgx.Conn mu *sync.Mutex // mutex to protect mutating state on a pgWorker pool *pgxpool.Pool futureJobs map[int64]time.Time // map of future job IDs to their due time @@ -155,7 +155,7 @@ func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err pb = p - return + return pb, nil } // WithTransactionTimeout sets the time that PgBackend's transactions may be idle before its underlying connection is @@ -344,7 +344,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, er } if job.Queue == "" { - err = ErrNoQueue + err = jobs.ErrNoQueueSpecified return } @@ -418,15 +418,17 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha if err = p.cron.AddFunc(cronSpec, func() { _, err := p.Enqueue(ctx, &jobs.Job{Queue: queue}) if err != nil { + if errors.Is(err, context.Canceled) { + return + } + p.logger.Error("error queueing cron job", err) } }); err != nil { return fmt.Errorf("error adding cron: %w", err) } - err = p.Start(ctx, queue, h) - - return + return p.Start(ctx, queue, h) } // SetLogger sets this backend's logger @@ -435,13 +437,20 @@ func (p *PgBackend) SetLogger(logger logging.Logger) { } func (p *PgBackend) Shutdown(ctx context.Context) { - p.pool.Close() // also closes the hijacked listenConn - p.cron.Stop() + for queue := range p.handlers { + p.announceJob(ctx, queue, shutdownJobID) + } + + // wait for the announcement to process + time.Sleep(time.Duration(shutdownAnnouncementAllowance) * time.Millisecond) for _, f := range p.cancelFuncs { f() } + p.pool.Close() + p.cron.Stop() + p.cancelFuncs = nil } @@ -560,26 +569,11 @@ func (p *PgBackend) start(ctx context.Context, queue string) (err error) { if h, ok = p.handlers[queue]; !ok { return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue) } - conn, err := p.pool.Acquire(ctx) - if err != nil { - return - } - - // use a single connection to listen for jobs on all queues - // TODO: Give more thought to the implications of hijacking connections to LISTEN on in PgBackend - // should this connecton not come from the pool, to avoid tainting it with connections that don't have an idle in - // transaction time out set? - p.mu.Lock() - if p.listenConn == nil { - p.listenConn = conn.Hijack() - } - p.mu.Unlock() listenJobChan := p.listen(ctx, queue) // listen for 'new' jobs pendingJobsChan := p.pendingJobs(ctx, queue) // process overdue jobs *at startup* // process all future jobs and retries - // TODO consider performance implications of `scheduleFutureJobs` in PgBackend go func() { p.scheduleFutureJobs(ctx, queue) }() for i := 0; i < h.Concurrency; i++ { @@ -598,7 +592,7 @@ func (p *PgBackend) start(ctx context.Context, queue string) (err error) { } if err != nil { - if errors.Is(err, pgx.ErrNoRows) { + if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) { err = nil continue } @@ -625,26 +619,31 @@ func (p *PgBackend) removeFutureJob(jobID int64) { // initFutureJobs is intended to be run once to initialize the list of future jobs that must be monitored for // execution. it should be run only during system startup. 
-func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) { +func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error) { rows, err := p.pool.Query(ctx, FutureJobQuery, queue) if err != nil { - p.logger.Error("error fetching future jobs list", err) + p.logger.Error("failed to fetch future jobs list", err) return } var id int64 var runAfter time.Time - _, _ = pgx.ForEachRow(rows, []any{&id, &runAfter}, func() error { + _, err = pgx.ForEachRow(rows, []any{&id, &runAfter}, func() error { p.mu.Lock() p.futureJobs[id] = runAfter p.mu.Unlock() return nil }) + + return } // scheduleFutureJobs announces future jobs using NOTIFY on an interval func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) { - p.initFutureJobs(ctx, queue) + err := p.initFutureJobs(ctx, queue) + if err != nil { + return + } // check for new future jobs on an interval ticker := time.NewTicker(p.config.JobCheckInterval) @@ -718,12 +717,11 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan for { jobID, err := p.getPendingJobID(ctx, conn, queue) if err != nil { - if !errors.Is(err, pgx.ErrNoRows) { - p.logger.Error("failed to fetch pending job", err, "job_id", jobID) - } else { - // done fetching pending jobs + if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) { break } + + p.logger.Error("failed to fetch pending job", err, "job_id", jobID) } else { jobsCh <- jobID } @@ -774,6 +772,10 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handle err = p.updateJob(ctx, jobErr) if err != nil { + if errors.Is(err, context.Canceled) { + return + } + err = fmt.Errorf("error updating job status: %w", err) return err } @@ -792,43 +794,74 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handle // This will lead to jobs not getting processed until the worker is restarted. // Implement disconnect handling. 
func (p *PgBackend) listen(ctx context.Context, queue string) (c chan int64) { - var err error c = make(chan int64) - // set this connection's idle in transaction timeout to infinite so it is not intermittently disconnected - _, err = p.listenConn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue)) - if err != nil { - err = fmt.Errorf("unable to create database connection for listener: %w", err) - p.logger.Error("unablet o create database connection for listener", err) - return - } - go func(ctx context.Context) { for { - notification, waitErr := p.listenConn.WaitForNotification(ctx) + conn, err := p.pool.Acquire(ctx) + if err != nil { + p.logger.Error("unable to acquire new connnection", err) + return + } + + // set this connection's idle in transaction timeout to infinite so it is not intermittently disconnected + _, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue)) + if err != nil { + err = fmt.Errorf("unable to configure listener connection: %w", err) + p.logger.Error("unable to configure listener connection", err) + time.Sleep(time.Second) // don't hammer the db + p.release(ctx, conn, queue) + continue + } + + notification, waitErr := conn.Conn().WaitForNotification(ctx) if waitErr != nil { if errors.Is(waitErr, context.Canceled) { + p.release(ctx, conn, queue) return } p.logger.Error("failed to wait for notification", waitErr) - time.Sleep(1 * time.Second) + p.release(ctx, conn, queue) continue } var jobID int64 if jobID, err = strconv.ParseInt(notification.Payload, 0, 64); err != nil { p.logger.Error("unable to fetch job", err) + p.release(ctx, conn, queue) continue } + // check if Shutdown() has been called + if jobID == shutdownJobID { + p.release(ctx, conn, queue) + return + } + c <- jobID + + p.release(ctx, conn, queue) } }(ctx) return c } +func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue string) { + query := fmt.Sprintf("SET idle_in_transaction_session_timeout = '%d'; UNLISTEN %s", p.config.IdleTransactionTimeout, queue) + _, err := conn.Exec(ctx, query) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + + p.logger.Error("unable to reset connection config before release", err) + } + + conn.Release() +} + func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID int64) (job *jobs.Job, err error) { row, err := tx.Query(ctx, PendingJobQuery, jobID) if err != nil { diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index 41fd36a..5c9b6ad 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -19,7 +19,7 @@ var errPeriodicTimeout = errors.New("timed out waiting for periodic job") // TestBasicJobProcessing tests that the postgres backend is able to process the most basic jobs with the // most basic configuration. 
func TestBasicJobProcessing(t *testing.T) { - queue := "testing" + const queue = "testing" done := make(chan bool) defer close(done) @@ -69,6 +69,89 @@ func TestBasicJobProcessing(t *testing.T) { } } +// TestBasicJobMultipleQueue tests that the postgres backend is able to process jobs on multiple queues +func TestBasicJobMultipleQueue(t *testing.T) { + const queue = "testing" + const queue2 = "testing2" + done := make(chan bool) + doneCnt := 0 + defer close(done) + + var timeoutTimer = time.After(5 * time.Second) + + var connString = os.Getenv("TEST_DATABASE_URL") + if connString == "" { + t.Skip("Skipping: TEST_DATABASE_URL not set") + return + } + + ctx := context.TODO() + nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), config.WithConnectionString(connString)) + if err != nil { + t.Fatal(err) + } + defer nq.Shutdown(ctx) + + h := handler.New(func(_ context.Context) (err error) { + done <- true + return + }) + + h2 := handler.New(func(_ context.Context) (err error) { + done <- true + return + }) + + err = nq.Start(ctx, queue, h) + if err != nil { + t.Error(err) + } + + err = nq.Start(ctx, queue2, h2) + if err != nil { + t.Error(err) + } + + jid, e := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": "hello world", + }, + }) + if e != nil || jid == jobs.DuplicateJobID { + t.Error(e) + } + + jid2, e := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue2, + Payload: map[string]interface{}{ + "message": "hello world", + }, + }) + if e != nil || jid2 == jobs.DuplicateJobID { + t.Error(e) + } + +results_loop: + for { + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + break results_loop + case <-done: + doneCnt++ + if doneCnt == 2 { + break results_loop + } + } + } + + time.Sleep(time.Second) + if err != nil { + t.Error(err) + } +} + func TestCron(t *testing.T) { done := make(chan bool, 1) defer close(done) diff --git a/jobs/jobs.go b/jobs/jobs.go index 2634860..ea6ed19 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -12,7 +12,8 @@ import ( ) var ( - ErrJobTimeout = errors.New("timed out waiting for job(s)") + ErrJobTimeout = errors.New("timed out waiting for job(s)") + ErrNoQueueSpecified = errors.New("this job does not specify a queue. please specify a queue") ) const (
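
The heart of this patch is the change to the Postgres listener: instead of hijacking a single pool connection and LISTENing on it for the life of the process, each queue's listener acquires a connection from the pool for every wait, LISTENs, blocks in WaitForNotification, and then releases the connection after restoring its idle-in-transaction timeout and UNLISTENing; Shutdown unblocks waiting listeners by NOTIFYing each queue with a sentinel job ID (-1). Below is a condensed, standalone sketch of that pattern, not code from the patch: the package name, Listener type, logger, and the 30s reset value are assumptions, while the pgxpool/pgx calls (Acquire, Exec, Conn, WaitForNotification, Release) are real APIs.

// Illustrative sketch only -- not part of the patch. Package and type names,
// the logger, and the 30s reset value are assumptions; the pgxpool/pgx calls
// used here are real pgx/v5 APIs.
package pglisten

import (
	"context"
	"errors"
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// shutdownJobID is the sentinel payload a shutdown announces so that a
// listener blocked in WaitForNotification wakes up and returns.
const shutdownJobID = -1

type Listener struct {
	Pool *pgxpool.Pool
}

// Listen acquires a pool connection per wait, LISTENs on the queue's channel,
// and delivers job IDs; the connection goes back to the pool after every
// notification instead of being hijacked for the lifetime of the process.
func (l *Listener) Listen(ctx context.Context, queue string) <-chan int64 {
	jobIDs := make(chan int64)

	go func() {
		defer close(jobIDs)
		for {
			conn, err := l.Pool.Acquire(ctx)
			if err != nil {
				log.Printf("acquire listener connection: %v", err)
				return
			}

			// Disable the idle-in-transaction timeout while this connection
			// waits, so long gaps between jobs do not get it disconnected.
			if _, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue)); err != nil {
				log.Printf("configure listener connection: %v", err)
				time.Sleep(time.Second) // back off instead of hammering the database
				l.release(ctx, conn, queue)
				continue
			}

			notification, err := conn.Conn().WaitForNotification(ctx)
			if err != nil {
				l.release(ctx, conn, queue)
				if errors.Is(err, context.Canceled) {
					return
				}
				log.Printf("wait for notification: %v", err)
				continue
			}

			jobID, err := strconv.ParseInt(notification.Payload, 10, 64)
			if err != nil {
				log.Printf("parse notification payload %q: %v", notification.Payload, err)
				l.release(ctx, conn, queue)
				continue
			}

			if jobID == shutdownJobID { // Shutdown announced the sentinel
				l.release(ctx, conn, queue)
				return
			}

			jobIDs <- jobID
			l.release(ctx, conn, queue)
		}
	}()

	return jobIDs
}

// release restores a finite idle-in-transaction timeout and UNLISTENs before
// returning the connection to the pool, so other pool users never inherit a
// subscribed connection with the timeout disabled.
func (l *Listener) release(ctx context.Context, conn *pgxpool.Conn, queue string) {
	if _, err := conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '30000'; UNLISTEN %s", queue)); err != nil && !errors.Is(err, context.Canceled) {
		log.Printf("reset listener connection: %v", err)
	}
	conn.Release()
}

// AnnounceShutdown mirrors what the patch's Shutdown relies on: NOTIFY the
// queue's channel with the sentinel ID so its blocked listener can exit.
func (l *Listener) AnnounceShutdown(ctx context.Context, queue string) {
	if _, err := l.Pool.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%d'", queue, shutdownJobID)); err != nil {
		log.Printf("announce shutdown for %s: %v", queue, err)
	}
}

The design trade-off the sketch illustrates: acquiring and releasing around each notification lets the pool own connection health, and pairing LISTEN with UNLISTEN plus the timeout reset keeps pooled connections in a clean state for other callers; the sentinel NOTIFY, together with the 100 ms shutdownAnnouncementAllowance the patch waits before closing the pool, is what lets blocked listeners exit cleanly during Shutdown.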