Skip to content

Commit

Permalink
feat: Improve postgres duplicate job detection performance
Browse files Browse the repository at this point in the history
Previously, neoq would run a query to check whether each new job's
(fingerprint, status) tuple was unique. This change adds a unique index on
(fingerprint, status) and simply allows unique key constraint violations to be
returned from `Enqueue` instead. This saves one additional query
for every new queue item.
  • Loading branch information
acaloiaro committed Sep 2, 2023
1 parent 5a00fa1 commit 6ea1bb6
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Recreate the plain (non-unique) index on (fingerprint, status), then drop the
-- unique partial index neoq_jobs_fingerprint_unique_idx. Both statements are
-- idempotent (IF NOT EXISTS / IF EXISTS), so re-running them is harmless.
CREATE INDEX IF NOT EXISTS neoq_jobs_fingerprint_idx ON neoq_jobs (fingerprint, status);
DROP INDEX IF EXISTS neoq_jobs_fingerprint_unique_idx;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
--- This unique partial index prevents multiple unprocessed jobs with the same payload from being queued
--- Only rows whose status is not 'processed' are covered by the index, so processed jobs never conflict.
--- NOTE(review): uniqueness is enforced on the (fingerprint, status) pair, not on fingerprint alone, so two
--- rows with the same fingerprint but different non-'processed' statuses are both accepted -- confirm intended.
--- NOTE(review): creating the unique index fails if the table already holds conflicting rows -- confirm
--- existing data is deduplicated before this runs.
CREATE UNIQUE INDEX IF NOT EXISTS neoq_jobs_fingerprint_unique_idx ON neoq_jobs (fingerprint, status) WHERE NOT (status = 'processed');
--- The non-unique index on the same columns is dropped once the unique index exists.
DROP INDEX IF EXISTS neoq_jobs_fingerprint_idx;
49 changes: 15 additions & 34 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
_ "github.com/golang-migrate/migrate/v4/database/postgres" // nolint: revive
"github.com/golang-migrate/migrate/v4/source/iofs"
"github.com/iancoleman/strcase"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jsuar/go-cron-descriptor/pkg/crondescriptor"
"github.com/robfig/cron"
Expand Down Expand Up @@ -64,7 +66,7 @@ var (
shutdownJobID = "-1" // job ID announced when triggering a shutdown
shutdownAnnouncementAllowance = 100 // ms
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
ErrDuplicateJobID = errors.New("duplicate job id")
ErrDuplicateJob = errors.New("duplicate job")
ErrNoTransactionInContext = errors.New("context does not have a Tx set")
)

Expand Down Expand Up @@ -278,45 +280,40 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
}

if job.Queue == "" {
p.logger.Debug("duplicate job fingerprint", "fingerprint", job.Fingerprint)
err = jobs.ErrNoQueueSpecified
return
}

jobID, err = p.enqueueJob(ctx, tx, job)
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Code == pgerrcode.UniqueViolation {
err = ErrDuplicateJob
return
}
}
p.logger.Error("error enqueueing job", "error", err)
err = fmt.Errorf("error enqueuing job: %w", err)
}

p.logger.Debug("job added to queue:", "job_id", jobID)
if jobID == jobs.DuplicateJobID {
err = ErrDuplicateJobID
err = tx.Commit(ctx)
if err != nil {
err = fmt.Errorf("error committing transaction: %w", err)
return
}
p.logger.Debug("job added to queue:", "job_id", jobID)

// notify listeners that a new job has arrived if it's not a future job
if job.RunAfter.Equal(now) {
p.logger.Debug("sending NOTIFY for job:", "job_id", jobID)
_, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%s'", job.Queue, jobID))
if err != nil {
p.logger.Error("error NOTIFYING", "error", err)
err = fmt.Errorf("error executing transaction: %w", err)
}
p.announceJob(ctx, job.Queue, jobID)
} else {
p.mu.Lock()
p.futureJobs[jobID] = job.RunAfter
p.mu.Unlock()
p.logger.Debug("added job to future jobs list", "job_id", jobID, "run_after", job.RunAfter)
}

err = tx.Commit(ctx)
if err != nil {
err = fmt.Errorf("error committing transaction: %w", err)
return
}

p.logger.Debug("transaction committed. job is in the queue:", "job_id", jobID)
return jobID, nil
}

Expand Down Expand Up @@ -413,22 +410,6 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (job
return
}

p.logger.Debug("checking if job fingerprint already exists (is duplicate)", "fingerprint", j.Fingerprint)
var rowCount int64
countRow := tx.QueryRow(ctx, `SELECT COUNT(*) as row_count
FROM neoq_jobs
WHERE fingerprint = $1
AND status NOT IN ('processed')`, j.Fingerprint)
err = countRow.Scan(&rowCount)
if err != nil {
return
}

// this is a duplicate job; skip it
if rowCount > 0 {
return jobs.DuplicateJobID, nil
}

p.logger.Debug("adding job to the queue")
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline)
VALUES ($1, $2, $3, $4, $5) RETURNING id`,
Expand Down
48 changes: 48 additions & 0 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,54 @@ func TestBasicJobProcessing(t *testing.T) {
})
}

// TestDuplicateJobRejection tests that the backend rejects jobs that are duplicates.
//
// The same payload is enqueued twice on the same queue. The first Enqueue must
// succeed and return a real job ID; the second must fail with
// postgres.ErrDuplicateJob. The test is skipped when TEST_DATABASE_URL is unset.
func TestDuplicateJobRejection(t *testing.T) {
	const queue = "testing"

	connString := os.Getenv("TEST_DATABASE_URL")
	if connString == "" {
		t.Skip("Skipping: TEST_DATABASE_URL not set")
	}

	ctx := context.TODO()
	nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
	if err != nil {
		t.Fatal(err)
	}
	defer nq.Shutdown(ctx)

	// Register cleanup before any assertion so the database is flushed even
	// when the test aborts early via t.Fatalf.
	t.Cleanup(func() {
		flushDB()
	})

	// newJob builds a fresh, identical job each time so both Enqueue calls
	// submit the same queue and payload.
	newJob := func() *jobs.Job {
		return &jobs.Job{
			Queue: queue,
			Payload: map[string]interface{}{
				"message": "hello world",
			},
		}
	}

	jid, e := nq.Enqueue(ctx, newJob())
	if e != nil {
		t.Fatalf("enqueueing the first job: %v", e)
	}
	if jid == jobs.DuplicateJobID {
		t.Fatalf("first job was unexpectedly reported as a duplicate (job id %v)", jid)
	}

	_, e2 := nq.Enqueue(ctx, newJob())

	// We submitted two duplicate jobs; the second must be rejected with a
	// duplicate job error. A nil error here means the duplicate was accepted,
	// which is exactly the failure this test exists to catch.
	if !errors.Is(e2, postgres.ErrDuplicateJob) {
		t.Errorf("enqueueing a duplicate job: got error %v, want %v", e2, postgres.ErrDuplicateJob)
	}
}

// TestBasicJobMultipleQueue tests that the postgres backend is able to process jobs on multiple queues
func TestBasicJobMultipleQueue(t *testing.T) {
const queue = "testing"
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/guregu/null v4.0.0+incompatible
github.com/hibiken/asynq v0.24.0
github.com/iancoleman/strcase v0.2.0
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pgx/v5 v5.3.1
github.com/jsuar/go-cron-descriptor v0.1.0
github.com/pkg/errors v0.9.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ github.com/hibiken/asynq v0.24.0/go.mod h1:FVnRfUTm6gcoDkM/EjF4OIh5/06ergCPUO6pS
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0=
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6hFJUX53drDT4UsSW3DEhKn0ifuHw=
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
Expand Down

0 comments on commit 6ea1bb6

Please sign in to comment.