Skip to content

Commit

Permalink
Support job deadlines
Browse files Browse the repository at this point in the history
Fixes #52
  • Loading branch information
acaloiaro committed Aug 28, 2023
1 parent a4c54e0 commit 7259486
Show file tree
Hide file tree
Showing 14 changed files with 289 additions and 103 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Neoq aims to be _simple_, _reliable_, _easy to integrate_, and demand a _minimal
- **Periodic Jobs**: Jobs can be scheduled periodically using standard cron syntax
- **Future Jobs**: Jobs can be scheduled in the future
- **Concurrency**: Concurrency is configurable for every queue
- **Job Deadlines**: If a job doesn't complete before a specific `time.Time`, the job expires

# Getting Started

Expand Down
11 changes: 8 additions & 3 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,

// Make sure RunAfter is set to a non-zero value if not provided by the caller
// if already set, schedule the future job
now := time.Now()
now := time.Now().UTC()
if job.RunAfter.IsZero() {
job.RunAfter = now
}
Expand Down Expand Up @@ -237,7 +237,7 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {
return
}

m.logger.Error("job failed", err, "job_id", job.ID)
m.logger.Error("job failed", "error", err, "job_id", job.ID)
runAfter := internal.CalculateBackoff(job.Retries)
job.RunAfter = runAfter
m.queueFutureJob(job)
Expand Down Expand Up @@ -307,7 +307,12 @@ func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Han
job.Retries++
}

// execute the queue handler of this job
if job.Deadline != nil && job.Deadline.UTC().Before(time.Now().UTC()) {
m.logger.Debug("job deadline is in the past, skipping", "job_id", job.ID)
err = jobs.ErrJobExceededDeadline
return
}

err = handler.Exec(ctx, h)
if err != nil {
job.Error = null.StringFrom(err.Error())
Expand Down
2 changes: 1 addition & 1 deletion backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestFutureJobScheduling(t *testing.T) {
Payload: map[string]interface{}{
"message": "hello world",
},
RunAfter: time.Now().Add(5 * time.Second),
RunAfter: time.Now().UTC().Add(5 * time.Second),
})
if err != nil || jid == jobs.DuplicateJobID {
err = fmt.Errorf("job was not enqueued. either it was duplicate or this error caused it: %w", err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE neoq_jobs DROP COLUMN IF EXISTS deadline;
ALTER TABLE neoq_dead_jobs DROP COLUMN IF EXISTS deadline;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE neoq_jobs ADD COLUMN IF NOT EXISTS deadline timestamp with time zone;
ALTER TABLE neoq_dead_jobs ADD COLUMN IF NOT EXISTS deadline timestamp with time zone;
37 changes: 22 additions & 15 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
AND run_after <= NOW()
FOR UPDATE SKIP LOCKED
LIMIT 1`
PendingJobQuery = `SELECT id,fingerprint,queue,status,payload,retries,max_retries,run_after,ran_at,created_at,error
PendingJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
FROM neoq_jobs
WHERE id = $1
AND status NOT IN ('processed')
Expand Down Expand Up @@ -248,7 +248,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e

// Make sure RunAfter is set to a non-zero value if not provided by the caller
// if already set, schedule the future job
now := time.Now()
now := time.Now().UTC()
if job.RunAfter.IsZero() {
p.logger.Debug("RunAfter not set, job will run immediately after being enqueued")
job.RunAfter = now
Expand Down Expand Up @@ -345,7 +345,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha
return
}

p.logger.Error("error queueing cron job", err)
p.logger.Error("error queueing cron job", "error", err)
}
}); err != nil {
return fmt.Errorf("error adding cron: %w", err)
Expand Down Expand Up @@ -407,9 +407,9 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (job
}

p.logger.Debug("adding job to the queue")
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after)
VALUES ($1, $2, $3, $4) RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter).Scan(&jobID)
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline)
VALUES ($1, $2, $3, $4, $5) RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline).Scan(&jobID)
if err != nil {
err = fmt.Errorf("unable add job to queue: %w", err)
return
Expand All @@ -425,9 +425,9 @@ func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job,
return
}

_, err = tx.Exec(ctx, `INSERT INTO neoq_dead_jobs(id, queue, fingerprint, payload, retries, max_retries, error)
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
j.ID, j.Queue, j.Fingerprint, j.Payload, j.Retries, j.MaxRetries, jobErr.Error())
_, err = tx.Exec(ctx, `INSERT INTO neoq_dead_jobs(id, queue, fingerprint, payload, retries, max_retries, error, deadline)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
j.ID, j.Queue, j.Fingerprint, j.Payload, j.Retries, j.MaxRetries, jobErr.Error(), j.Deadline)

return
}
Expand All @@ -453,6 +453,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
errMsg := ""

if jobErr != nil {
p.logger.Error("job failed", "job_error", jobErr)
status = internal.JobStatusFailed
errMsg = jobErr.Error()
}
Expand All @@ -476,10 +477,10 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
if status == internal.JobStatusFailed {
runAfter = internal.CalculateBackoff(job.Retries)
qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3, retries = $4, run_after = $5 WHERE id = $6"
_, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.Retries, runAfter, job.ID)
_, err = tx.Exec(ctx, qstr, time.Now().UTC(), errMsg, status, job.Retries, runAfter, job.ID)
} else {
qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3 WHERE id = $4"
_, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.ID)
_, err = tx.Exec(ctx, qstr, time.Now().UTC(), errMsg, status, job.ID)
}

if err != nil {
Expand Down Expand Up @@ -537,7 +538,7 @@ func (p *PgBackend) start(ctx context.Context, queue string) (err error) {
continue
}

p.logger.Error("job failed", err, "job_id", jobID)
p.logger.Error("job failed", "error", err, "job_id", jobID)

continue
}
Expand Down Expand Up @@ -654,7 +655,7 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
break
}

p.logger.Error("failed to fetch pending job", err, "job_id", jobID)
p.logger.Error("failed to fetch pending job", "error", err, "job_id", jobID)
} else {
jobsCh <- jobID
}
Expand Down Expand Up @@ -688,6 +689,13 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
return
}

if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) {
err = jobs.ErrJobExceededDeadline
p.logger.Debug("job deadline is in the past, skipping", "job_id", job.ID)
err = p.updateJob(ctx, err)
return
}

ctx = withJobContext(ctx, job)
ctx = context.WithValue(ctx, txCtxVarKey, tx)

Expand All @@ -698,7 +706,6 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl

// execute the queue handler of this job
jobErr := handler.Exec(ctx, h)

err = p.updateJob(ctx, jobErr)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand All @@ -712,7 +719,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
err = tx.Commit(ctx)
if err != nil {
errMsg := "unable to commit job transaction. retrying this job may dupliate work:"
p.logger.Error(errMsg, err, "job_id", job.ID)
p.logger.Error(errMsg, "error", err, "job_id", job.ID)
return fmt.Errorf("%s %w", errMsg, err)
}

Expand Down
49 changes: 28 additions & 21 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,29 @@ import (
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/testutils"
"github.com/jackc/pgx/v5"
"golang.org/x/exp/slog"
)

var errPeriodicTimeout = errors.New("timed out waiting for periodic job")

func flushDB() {
ctx := context.Background()
dbURL := os.Getenv("TEST_DATABASE_URL")
if dbURL == "" {
return
}

conn, err := pgx.Connect(context.Background(), dbURL)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
os.Exit(1)
// An error was encountered connecting to the db. This may simply mean that we're running tests against an
// uninitialized database and the database needs to be created. By creating a new pg backend instance with
// neoq.New, we run the db initialization process.
// If no error is returned from `New`, then we've succeeded.
var newErr error
_, newErr = neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(dbURL))
if newErr != nil {
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
return
}
}
defer conn.Close(context.Background())

Expand Down Expand Up @@ -80,11 +88,13 @@ func TestBasicJobProcessing(t *testing.T) {
t.Error(err)
}

deadline := time.Now().UTC().Add(5 * time.Second)
jid, e := nq.Enqueue(ctx, &jobs.Job{
Queue: queue,
Payload: map[string]interface{}{
"message": "hello world",
},
Deadline: &deadline,
})
if e != nil || jid == jobs.DuplicateJobID {
t.Error(e)
Expand Down Expand Up @@ -243,11 +253,8 @@ func TestCron(t *testing.T) {
// TestBasicJobProcessingWithErrors tests that the postgres backend is able to update the status of jobs that fail
func TestBasicJobProcessingWithErrors(t *testing.T) {
const queue = "testing"
done := make(chan bool, 10)
defer close(done)

logsChan := make(chan string, 100)
timeoutTimer := time.After(5 * time.Second)

connString := os.Getenv("TEST_DATABASE_URL")
if connString == "" {
t.Skip("Skipping: TEST_DATABASE_URL not set")
Expand All @@ -269,8 +276,7 @@ func TestBasicJobProcessingWithErrors(t *testing.T) {
return
})

buf := &strings.Builder{}
nq.SetLogger(testutils.TestLogger{L: log.New(buf, "", 0), Done: done})
nq.SetLogger(testutils.TestLogger{L: log.New(testutils.ChanWriter{Ch: logsChan}, "", 0)})

err = nq.Start(ctx, queue, h)
if err != nil {
Expand All @@ -287,25 +293,26 @@ func TestBasicJobProcessingWithErrors(t *testing.T) {
t.Error(e)
}

select {
case <-timeoutTimer:
err = jobs.ErrJobTimeout
case <-done:
// the error is returned after the done channel receives its message, so we need to give time for the logger to
// have logged the error that was returned by the handler
time.Sleep(100 * time.Millisecond)
expectedLogMsg := "job failed to process: something bad happened"
actualLogMsg := strings.Trim(buf.String(), "\n")
if strings.Contains(actualLogMsg, expectedLogMsg) {
t.Error(fmt.Errorf("'%s' NOT CONTAINS '%s'", actualLogMsg, expectedLogMsg)) //nolint:all
results_loop:
for {
select {
case <-timeoutTimer:
err = jobs.ErrJobTimeout
break results_loop
case actualLogMsg := <-logsChan:
expectedLogMsg := "job failed to process: something bad happened"
if strings.Contains(actualLogMsg, expectedLogMsg) {
err = nil
break results_loop
}
err = fmt.Errorf("'%s' NOT CONTAINS '%s'", actualLogMsg, expectedLogMsg) //nolint:all
}
}

if err != nil {
t.Error(err)
}

nq.SetLogger(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: logging.LogLevelDebug})))
t.Cleanup(func() {
flushDB()
})
Expand Down
Loading

0 comments on commit 7259486

Please sign in to comment.