From 6ea1bb692f9f987bfd4561a2cfb780a8d8008da9 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Sat, 2 Sep 2023 12:55:56 +0200 Subject: [PATCH] feat: Improve postgres duplicate job detection performance Previously, neoq would run a query to check whether each new job's (fingerprint, status) tuple was unique. This change adds a unique index on (fingerprint, status) and simply allows unique key constraint violations to be returned from `Enqueue` instead. This saves one additional query for every new queue item. --- ...301_make_fingerprint_index_unique.down.sql | 2 + ...05301_make_fingerprint_index_unique.up.sql | 3 ++ backends/postgres/postgres_backend.go | 49 ++++++------------- backends/postgres/postgres_backend_test.go | 48 ++++++++++++++++++ go.mod | 1 + go.sum | 2 + 6 files changed, 71 insertions(+), 34 deletions(-) create mode 100644 backends/postgres/migrations/20230902105301_make_fingerprint_index_unique.down.sql create mode 100644 backends/postgres/migrations/20230902105301_make_fingerprint_index_unique.up.sql diff --git a/backends/postgres/migrations/20230902105301_make_fingerprint_index_unique.down.sql b/backends/postgres/migrations/20230902105301_make_fingerprint_index_unique.down.sql new file mode 100644 index 0000000..744c2ca --- /dev/null +++ b/backends/postgres/migrations/20230902105301_make_fingerprint_index_unique.down.sql @@ -0,0 +1,2 @@ +CREATE INDEX IF NOT EXISTS neoq_jobs_fingerprint_idx ON neoq_jobs (fingerprint, status); +DROP INDEX IF EXISTS neoq_jobs_fingerprint_unique_idx; diff --git a/backends/postgres/migrations/20230902105301_make_fingerprint_index_unique.up.sql b/backends/postgres/migrations/20230902105301_make_fingerprint_index_unique.up.sql new file mode 100644 index 0000000..9025c99 --- /dev/null +++ b/backends/postgres/migrations/20230902105301_make_fingerprint_index_unique.up.sql @@ -0,0 +1,3 @@ +--- This unique partial index prevents multiple unprocessed jobs with the same payload from being queued +CREATE UNIQUE INDEX IF NOT EXISTS 
neoq_jobs_fingerprint_unique_idx ON neoq_jobs (fingerprint, status) WHERE NOT (status = 'processed'); +DROP INDEX IF EXISTS neoq_jobs_fingerprint_idx; \ No newline at end of file diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 67c290e..03e2cd4 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -19,7 +19,9 @@ import ( _ "github.com/golang-migrate/migrate/v4/database/postgres" // nolint: revive "github.com/golang-migrate/migrate/v4/source/iofs" "github.com/iancoleman/strcase" + "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "github.com/jsuar/go-cron-descriptor/pkg/crondescriptor" "github.com/robfig/cron" @@ -64,7 +66,7 @@ var ( shutdownJobID = "-1" // job ID announced when triggering a shutdown shutdownAnnouncementAllowance = 100 // ms ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings") - ErrDuplicateJobID = errors.New("duplicate job id") + ErrDuplicateJob = errors.New("duplicate job") ErrNoTransactionInContext = errors.New("context does not have a Tx set") ) @@ -278,31 +280,33 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e } if job.Queue == "" { - p.logger.Debug("duplicate job fingerprint", "fingerprint", job.Fingerprint) err = jobs.ErrNoQueueSpecified return } jobID, err = p.enqueueJob(ctx, tx, job) if err != nil { + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) { + if pgErr.Code == pgerrcode.UniqueViolation { + err = ErrDuplicateJob + return + } + } p.logger.Error("error enqueueing job", "error", err) err = fmt.Errorf("error enqueuing job: %w", err) } - p.logger.Debug("job added to queue:", "job_id", jobID) - if jobID == jobs.DuplicateJobID { - err = ErrDuplicateJobID + err = tx.Commit(ctx) + if err != nil { + err = fmt.Errorf("error committing transaction: %w", err) return } + p.logger.Debug("job 
added to queue:", "job_id", jobID) // notify listeners that a new job has arrived if it's not a future job if job.RunAfter.Equal(now) { - p.logger.Debug("sending NOTIFY for job:", "job_id", jobID) - _, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%s'", job.Queue, jobID)) - if err != nil { - p.logger.Error("error NOTIFYING", "error", err) - err = fmt.Errorf("error executing transaction: %w", err) - } + p.announceJob(ctx, job.Queue, jobID) } else { p.mu.Lock() p.futureJobs[jobID] = job.RunAfter @@ -310,13 +314,6 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e p.logger.Debug("added job to future jobs list", "job_id", jobID, "run_after", job.RunAfter) } - err = tx.Commit(ctx) - if err != nil { - err = fmt.Errorf("error committing transaction: %w", err) - return - } - - p.logger.Debug("transaction committed. job is in the queue:", "job_id", jobID) return jobID, nil } @@ -413,22 +410,6 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (job return } - p.logger.Debug("checking if job fingerprint already exists (is duplicate)", "fingerprint", j.Fingerprint) - var rowCount int64 - countRow := tx.QueryRow(ctx, `SELECT COUNT(*) as row_count - FROM neoq_jobs - WHERE fingerprint = $1 - AND status NOT IN ('processed')`, j.Fingerprint) - err = countRow.Scan(&rowCount) - if err != nil { - return - } - - // this is a duplicate job; skip it - if rowCount > 0 { - return jobs.DuplicateJobID, nil - } - p.logger.Debug("adding job to the queue") err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline) VALUES ($1, $2, $3, $4, $5) RETURNING id`, diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index c15b7c9..b169820 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -115,6 +115,54 @@ func TestBasicJobProcessing(t *testing.T) { }) } +// TestDuplicateJobRejection tests that the 
backend rejects jobs that are duplicates +func TestDuplicateJobRejection(t *testing.T) { + const queue = "testing" + + connString := os.Getenv("TEST_DATABASE_URL") + if connString == "" { + t.Skip("Skipping: TEST_DATABASE_URL not set") + return + } + + ctx := context.TODO() + nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString)) + if err != nil { + t.Fatal(err) + } + defer nq.Shutdown(ctx) + + jid, e := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": "hello world", + }, + }) + if e != nil || jid == jobs.DuplicateJobID { + err = e + } + + _, e2 := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": "hello world", + }, + }) + + // we submitted two duplicate jobs; the error should be a duplicate job error + if !errors.Is(e2, postgres.ErrDuplicateJob) { + err = e2 + } + + if err != nil { + t.Error(err) + } + + t.Cleanup(func() { + flushDB() + }) +} + // TestBasicJobMultipleQueue tests that the postgres backend is able to process jobs on multiple queues func TestBasicJobMultipleQueue(t *testing.T) { const queue = "testing" diff --git a/go.mod b/go.mod index e44c2f2..cf04dcd 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/guregu/null v4.0.0+incompatible github.com/hibiken/asynq v0.24.0 github.com/iancoleman/strcase v0.2.0 + github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa github.com/jackc/pgx/v5 v5.3.1 github.com/jsuar/go-cron-descriptor v0.1.0 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 1fe1382..aad01f8 100644 --- a/go.sum +++ b/go.sum @@ -66,6 +66,8 @@ github.com/hibiken/asynq v0.24.0/go.mod h1:FVnRfUTm6gcoDkM/EjF4OIh5/06ergCPUO6pS github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0= github.com/iancoleman/strcase v0.2.0/go.mod 
h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= +github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6hFJUX53drDT4UsSW3DEhKn0ifuHw= +github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=