diff --git a/backends/postgres/migrations/20230928141356_create-new-job-trigger.down.sql b/backends/postgres/migrations/20230928141356_create-new-job-trigger.down.sql new file mode 100644 index 0000000..c28d4b1 --- /dev/null +++ b/backends/postgres/migrations/20230928141356_create-new-job-trigger.down.sql @@ -0,0 +1 @@ +DROP TRIGGER IF EXISTS announce_job ON neoq_jobs CASCADE; \ No newline at end of file diff --git a/backends/postgres/migrations/20230928141356_create-new-job-trigger.up.sql b/backends/postgres/migrations/20230928141356_create-new-job-trigger.up.sql new file mode 100644 index 0000000..865d442 --- /dev/null +++ b/backends/postgres/migrations/20230928141356_create-new-job-trigger.up.sql @@ -0,0 +1,12 @@ +CREATE OR REPLACE FUNCTION announce_job() RETURNS trigger AS $$ +DECLARE +BEGIN + PERFORM pg_notify(CAST(NEW.queue AS text), CAST(NEW.id AS text)); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER announce_job + AFTER INSERT ON neoq_jobs FOR EACH ROW + WHEN (NEW.run_after <= timezone('utc', NEW.created_at)) + EXECUTE PROCEDURE announce_job(); \ No newline at end of file diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 1d76b71..cb1fc22 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -271,15 +271,6 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e // Rollback is safe to call even if the tx is already closed, so if // the tx commits successfully, this is a no-op defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed - - // Make sure RunAfter is set to a non-zero value if not provided by the caller - // if already set, schedule the future job - now := time.Now().UTC() - if job.RunAfter.IsZero() { - p.logger.Debug("RunAfter not set, job will run immediately after being enqueued") - job.RunAfter = now - } - jobID, err = p.enqueueJob(ctx, tx, job) if err != nil { var pgErr *pgconn.PgError @@ -300,10 +291,8 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e } p.logger.Debug("job added to queue:", "job_id", jobID) - // notify listeners that a new job has arrived if it's not a future job - if job.RunAfter.Equal(now) { - p.announceJob(ctx, job.Queue, jobID) - } else { + // add future jobs to the future job list + if job.RunAfter.After(time.Now().UTC()) { p.mu.Lock() p.futureJobs[jobID] = job.RunAfter p.mu.Unlock() @@ -607,6 +596,9 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) { // announceJob announces jobs to queue listeners. // +// When jobs are inserted into the neoq_jobs table, a trigger announces the new job's arrival. This function is to be +// used for announcing jobs that have not been recently inserted into the neoq_jobs table. +// // Announced jobs are executed by the first worker to respond to the announcement. func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) { conn, err := p.pool.Acquire(ctx)