Skip to content

Commit

Permalink
feat: Announce jobs using PG trigger
Browse files Browse the repository at this point in the history
Previously, new, non-future jobs were announced by executing `NOTIFY` in
Go code. Triggers are much better suited for this, and reduces neoq
complexity by allowing PG to perform notification work for most jobs.

"Future" jobs continue to use the `announceJob` method on a timer.
  • Loading branch information
acaloiaro committed Oct 3, 2023
1 parent fab0736 commit 35085be
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TRIGGER IF EXISTS announce_job ON neoq_jobs CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE OR REPLACE FUNCTION announce_job() RETURNS trigger AS $$
DECLARE
BEGIN
PERFORM pg_notify(CAST(NEW.queue AS text), CAST(NEW.id AS text));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER announce_job
AFTER INSERT ON neoq_jobs FOR EACH ROW
WHEN (NEW.run_after <= timezone('utc', NEW.created_at))
EXECUTE PROCEDURE announce_job();
18 changes: 5 additions & 13 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,6 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
// Rollback is safe to call even if the tx is already closed, so if
// the tx commits successfully, this is a no-op
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed

// Make sure RunAfter is set to a non-zero value if not provided by the caller
// if already set, schedule the future job
now := time.Now().UTC()
if job.RunAfter.IsZero() {
p.logger.Debug("RunAfter not set, job will run immediately after being enqueued")
job.RunAfter = now
}

jobID, err = p.enqueueJob(ctx, tx, job)
if err != nil {
var pgErr *pgconn.PgError
Expand All @@ -300,10 +291,8 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
}
p.logger.Debug("job added to queue:", "job_id", jobID)

// notify listeners that a new job has arrived if it's not a future job
if job.RunAfter.Equal(now) {
p.announceJob(ctx, job.Queue, jobID)
} else {
// add future jobs to the future job list
if job.RunAfter.After(time.Now().UTC()) {
p.mu.Lock()
p.futureJobs[jobID] = job.RunAfter
p.mu.Unlock()
Expand Down Expand Up @@ -607,6 +596,9 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {

// announceJob announces jobs to queue listeners.
//
// When jobs are inserted into the neoq_jobs table, a trigger announces the new job's arrival. This function is to be
// used for announcing jobs that have not been recently inserted into the neoq_jobs table.
//
// Announced jobs are executed by the first worker to respond to the announcement.
func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {
conn, err := p.pool.Acquire(ctx)
Expand Down

0 comments on commit 35085be

Please sign in to comment.