diff --git a/README.md b/README.md index 61da849..0b1a65e 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ Neoq aims to be _simple_, _reliable_, _easy to integrate_, and demand a _minimal - **Periodic Jobs**: Jobs can be scheduled periodically using standard cron syntax - **Future Jobs**: Jobs can be scheduled in the future - **Concurrency**: Concurrency is configurable for every queue +- **Job Deadlines**: If a job doesn't complete before a specific `time.Time`, the job expires # Getting Started diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index 2e03eab..920f231 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -83,7 +83,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, // Make sure RunAfter is set to a non-zero value if not provided by the caller // if already set, schedule the future job - now := time.Now() + now := time.Now().UTC() if job.RunAfter.IsZero() { job.RunAfter = now } @@ -237,7 +237,7 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) { return } - m.logger.Error("job failed", err, "job_id", job.ID) + m.logger.Error("job failed", "error", err, "job_id", job.ID) runAfter := internal.CalculateBackoff(job.Retries) job.RunAfter = runAfter m.queueFutureJob(job) @@ -307,7 +307,12 @@ func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Han job.Retries++ } - // execute the queue handler of this job + if job.Deadline != nil && job.Deadline.UTC().Before(time.Now().UTC()) { + m.logger.Debug("job deadline is in the past, skipping", "job_id", job.ID) + err = jobs.ErrJobExceededDeadline + return + } + err = handler.Exec(ctx, h) if err != nil { job.Error = null.StringFrom(err.Error()) diff --git a/backends/memory/memory_backend_test.go b/backends/memory/memory_backend_test.go index 73de01e..cc5d3f3 100644 --- a/backends/memory/memory_backend_test.go +++ b/backends/memory/memory_backend_test.go @@ -185,7 +185,7 @@ func TestFutureJobScheduling(t *testing.T) { Payload: map[string]interface{}{ "message": "hello world", }, - RunAfter: time.Now().Add(5 * time.Second), + RunAfter: time.Now().UTC().Add(5 * time.Second), }) if err != nil || jid == jobs.DuplicateJobID { err = fmt.Errorf("job was not enqueued. 
either it was duplicate or this error caused it: %w", err) diff --git a/backends/postgres/migrations/20230827174530_add_deadline_field_to_jobs.down.sql b/backends/postgres/migrations/20230827174530_add_deadline_field_to_jobs.down.sql new file mode 100644 index 0000000..5863e65 --- /dev/null +++ b/backends/postgres/migrations/20230827174530_add_deadline_field_to_jobs.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE neoq_jobs DROP COLUMN IF EXISTS deadline; +ALTER TABLE neoq_dead_jobs DROP COLUMN IF EXISTS deadline; diff --git a/backends/postgres/migrations/20230827174530_add_deadline_field_to_jobs.up.sql b/backends/postgres/migrations/20230827174530_add_deadline_field_to_jobs.up.sql new file mode 100644 index 0000000..75accca --- /dev/null +++ b/backends/postgres/migrations/20230827174530_add_deadline_field_to_jobs.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE neoq_jobs ADD COLUMN IF NOT EXISTS deadline timestamp with time zone; +ALTER TABLE neoq_dead_jobs ADD COLUMN IF NOT EXISTS deadline timestamp with time zone; \ No newline at end of file diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 78a1ee8..da07719 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -39,7 +39,7 @@ const ( AND run_after <= NOW() FOR UPDATE SKIP LOCKED LIMIT 1` - PendingJobQuery = `SELECT id,fingerprint,queue,status,payload,retries,max_retries,run_after,ran_at,created_at,error + PendingJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error FROM neoq_jobs WHERE id = $1 AND status NOT IN ('processed') @@ -248,7 +248,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e // Make sure RunAfter is set to a non-zero value if not provided by the caller // if already set, schedule the future job - now := time.Now() + now := time.Now().UTC() if job.RunAfter.IsZero() { p.logger.Debug("RunAfter not set, job will run immediately after being enqueued") job.RunAfter = now @@ -345,7 +345,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha return } - p.logger.Error("error queueing cron job", err) + p.logger.Error("error queueing cron job", "error", err) } }); err != nil { return fmt.Errorf("error adding cron: %w", err) @@ -407,9 +407,9 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (job } p.logger.Debug("adding job to the queue") - err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after) - VALUES ($1, $2, $3, $4) RETURNING id`, - j.Queue, j.Fingerprint, j.Payload, j.RunAfter).Scan(&jobID) + err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline) + VALUES ($1, $2, $3, $4, $5) RETURNING id`, + j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline).Scan(&jobID) if err != nil { err = fmt.Errorf("unable add job to queue: %w", err) return @@ -425,9 +425,9 @@ func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job, return } - _, err = tx.Exec(ctx, `INSERT INTO neoq_dead_jobs(id, queue, fingerprint, payload, retries, max_retries, error) - VALUES ($1, $2, $3, $4, $5, $6, $7)`, - j.ID, j.Queue, j.Fingerprint, j.Payload, j.Retries, j.MaxRetries, jobErr.Error()) + _, err = tx.Exec(ctx, `INSERT INTO neoq_dead_jobs(id, queue, fingerprint, payload, retries, max_retries, error, deadline) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, + j.ID, j.Queue, j.Fingerprint, j.Payload, j.Retries, j.MaxRetries, jobErr.Error(), j.Deadline) 
return } @@ -453,6 +453,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { errMsg := "" if jobErr != nil { + p.logger.Error("job failed", "job_error", jobErr) status = internal.JobStatusFailed errMsg = jobErr.Error() } @@ -476,10 +477,10 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { if status == internal.JobStatusFailed { runAfter = internal.CalculateBackoff(job.Retries) qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3, retries = $4, run_after = $5 WHERE id = $6" - _, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.Retries, runAfter, job.ID) + _, err = tx.Exec(ctx, qstr, time.Now().UTC(), errMsg, status, job.Retries, runAfter, job.ID) } else { qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3 WHERE id = $4" - _, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.ID) + _, err = tx.Exec(ctx, qstr, time.Now().UTC(), errMsg, status, job.ID) } if err != nil { @@ -537,7 +538,7 @@ func (p *PgBackend) start(ctx context.Context, queue string) (err error) { continue } - p.logger.Error("job failed", err, "job_id", jobID) + p.logger.Error("job failed", "error", err, "job_id", jobID) continue } @@ -654,7 +655,7 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan break } - p.logger.Error("failed to fetch pending job", err, "job_id", jobID) + p.logger.Error("failed to fetch pending job", "error", err, "job_id", jobID) } else { jobsCh <- jobID } @@ -688,6 +689,13 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl return } + if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) { + err = jobs.ErrJobExceededDeadline + p.logger.Debug("job deadline is in the past, skipping", "job_id", job.ID) + err = p.updateJob(ctx, err) + return + } + ctx = withJobContext(ctx, job) ctx = context.WithValue(ctx, txCtxVarKey, tx) @@ -698,7 +706,6 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl // execute the queue handler of this job jobErr := handler.Exec(ctx, h) - err = p.updateJob(ctx, jobErr) if err != nil { if errors.Is(err, context.Canceled) { @@ -712,7 +719,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl err = tx.Commit(ctx) if err != nil { errMsg := "unable to commit job transaction. retrying this job may dupliate work:" - p.logger.Error(errMsg, err, "job_id", job.ID) + p.logger.Error(errMsg, "error", err, "job_id", job.ID) return fmt.Errorf("%s %w", errMsg, err) } diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index aa71131..c15b7c9 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -18,12 +18,12 @@ import ( "github.com/acaloiaro/neoq/logging" "github.com/acaloiaro/neoq/testutils" "github.com/jackc/pgx/v5" - "golang.org/x/exp/slog" ) var errPeriodicTimeout = errors.New("timed out waiting for periodic job") func flushDB() { + ctx := context.Background() dbURL := os.Getenv("TEST_DATABASE_URL") if dbURL == "" { return } @@ -31,8 +31,16 @@ conn, err := pgx.Connect(context.Background(), dbURL) if err != nil { - fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err) - os.Exit(1) + // an error was encountered connecting to the db. this may simply mean that we're running tests against an + // uninitialized database and the database needs to be created. 
By creating a new pg backend instance with + // neoq.New, we run the db initialization process. + // if no errors return from `New`, then we've succeeded + var newErr error + _, newErr = neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(dbURL)) + if newErr != nil { + fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err) + return + } } defer conn.Close(context.Background()) @@ -80,11 +88,13 @@ func TestBasicJobProcessing(t *testing.T) { t.Error(err) } + deadline := time.Now().UTC().Add(5 * time.Second) jid, e := nq.Enqueue(ctx, &jobs.Job{ Queue: queue, Payload: map[string]interface{}{ "message": "hello world", }, + Deadline: &deadline, }) if e != nil || jid == jobs.DuplicateJobID { t.Error(e) @@ -243,11 +253,8 @@ func TestCron(t *testing.T) { // TestBasicJobProcessingWithErrors tests that the postgres backend is able to update the status of jobs that fail func TestBasicJobProcessingWithErrors(t *testing.T) { const queue = "testing" - done := make(chan bool, 10) - defer close(done) - + logsChan := make(chan string, 100) timeoutTimer := time.After(5 * time.Second) - connString := os.Getenv("TEST_DATABASE_URL") if connString == "" { t.Skip("Skipping: TEST_DATABASE_URL not set") @@ -269,8 +276,7 @@ func TestBasicJobProcessingWithErrors(t *testing.T) { return }) - buf := &strings.Builder{} - nq.SetLogger(testutils.TestLogger{L: log.New(buf, "", 0), Done: done}) + nq.SetLogger(testutils.TestLogger{L: log.New(testutils.ChanWriter{Ch: logsChan}, "", 0)}) err = nq.Start(ctx, queue, h) if err != nil { @@ -287,17 +293,19 @@ func TestBasicJobProcessingWithErrors(t *testing.T) { t.Error(e) } - select { - case <-timeoutTimer: - err = jobs.ErrJobTimeout - case <-done: - // the error is returned after the done channel receives its message, so we need to give time for the logger to - // have logged the error that was returned by the handler - time.Sleep(100 * time.Millisecond) - expectedLogMsg := "job failed to process: something bad happened" - actualLogMsg := strings.Trim(buf.String(), "\n") - if strings.Contains(actualLogMsg, expectedLogMsg) { - t.Error(fmt.Errorf("'%s' NOT CONTAINS '%s'", actualLogMsg, expectedLogMsg)) //nolint:all +results_loop: + for { + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + break results_loop + case actualLogMsg := <-logsChan: + expectedLogMsg := "job failed to process: something bad happened" + if strings.Contains(actualLogMsg, expectedLogMsg) { + err = nil + break results_loop + } + err = fmt.Errorf("'%s' NOT CONTAINS '%s'", actualLogMsg, expectedLogMsg) //nolint:all } } @@ -305,7 +313,6 @@ func TestBasicJobProcessingWithErrors(t *testing.T) { t.Error(err) } - nq.SetLogger(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: logging.LogLevelDebug}))) t.Cleanup(func() { flushDB() }) diff --git a/backends/redis/redis_backend.go b/backends/redis/redis_backend.go index e20c95b..05b8963 100644 --- a/backends/redis/redis_backend.go +++ b/backends/redis/redis_backend.go @@ -24,10 +24,12 @@ import ( "golang.org/x/exp/slog" ) -var ( - // ErrInvalidAddr indicates that the provided address is not a valid redis connection string - ErrInvalidAddr = errors.New("invalid connecton string: see documentation for valid connection strings") -) +// All jobs are placed on the same 'default' queue (until a compelling case is made for using different asynq queues +// for every job) +const defaultAsynqQueue = "default" + +// ErrInvalidAddr indicates that the provided address is not a valid redis connection string +var 
ErrInvalidAddr = errors.New("invalid connecton string: see documentation for valid connection strings") // RedisBackend is a Redis-backed neoq backend // nolint: revive @@ -35,6 +37,7 @@ type RedisBackend struct { types.Backend client *asynq.Client server *asynq.Server + inspector *asynq.Inspector mux *asynq.ServeMux config *config.Config logger logging.Logger @@ -73,7 +76,7 @@ func (m *memoryTaskConfigProvider) addConfig(taskConfig *asynq.PeriodicTaskConfi } // Backend is a [config.BackendInitializer] that initializes a new Redis-backed neoq backend -func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend, err error) { +func Backend(_ context.Context, opts ...config.Option) (backend types.Backend, err error) { b := &RedisBackend{ config: config.New(), mu: &sync.Mutex{}, @@ -98,6 +101,7 @@ func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend, if b.config.BackendAuthPassword != "" { clientOpt.Password = b.config.BackendAuthPassword } + b.inspector = asynq.NewInspector(clientOpt) b.client = asynq.NewClient(clientOpt) b.server = asynq.NewServer( clientOpt, @@ -115,7 +119,7 @@ func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend, PeriodicTaskConfigProvider: b.taskProvider, SyncInterval: 500 * time.Millisecond, SchedulerOpts: &asynq.SchedulerOpts{ - PostEnqueueFunc: func(info *asynq.TaskInfo, err error) { + PostEnqueueFunc: func(_ *asynq.TaskInfo, err error) { if err != nil { b.logger.Error("unable to schedule task", err) } @@ -193,12 +197,7 @@ func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string return } task := asynq.NewTask(job.Queue, payload) - if job.RunAfter.IsZero() { - _, err = b.client.EnqueueContext(ctx, task, asynq.TaskID(job.Fingerprint)) - } else { - _, err = b.client.EnqueueContext(ctx, task, asynq.TaskID(job.Fingerprint), asynq.ProcessAt(job.RunAfter)) - } - + _, err = b.client.EnqueueContext(ctx, task, jobToTaskOptions(job)...) 
if err != nil { err = fmt.Errorf("unable to enqueue task: %w", err) } @@ -209,27 +208,41 @@ func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string // Start starts processing jobs with the specified queue and handler func (b *RedisBackend) Start(_ context.Context, queue string, h handler.Handler) (err error) { b.mux.HandleFunc(queue, func(ctx context.Context, t *asynq.Task) (err error) { + taskID := t.ResultWriter().TaskID() var p map[string]any if err = json.Unmarshal(t.Payload(), &p); err != nil { - b.logger.Info("job has no payload") + b.logger.Info("job has no payload", "task_id", taskID) + } + + ti, err := b.inspector.GetTaskInfo(defaultAsynqQueue, taskID) + if err != nil { + b.logger.Error("unable to process job", "error", err) + return + } + if !ti.Deadline.IsZero() && ti.Deadline.UTC().Before(time.Now().UTC()) { + err = jobs.ErrJobExceededDeadline + b.logger.Debug("job deadline is in the past, skipping", "task_id", taskID) + return } job := &jobs.Job{ - CreatedAt: time.Now(), + CreatedAt: time.Now().UTC(), Queue: queue, Payload: p, + Deadline: &ti.Deadline, + RunAfter: ti.NextProcessAt, } ctx = withJobContext(ctx, job) err = handler.Exec(ctx, h) if err != nil { - b.logger.Error("error handling job", err) + b.logger.Error("error handling job", "error", err) } return }) - return + return nil } // StartCron starts processing jobs with the specified cron schedule and handler @@ -263,6 +276,21 @@ func (b *RedisBackend) StartCron(ctx context.Context, cronSpec string, h handler return } +// jobToTaskOptions converts jobs.Job to a slice of asynq.Option that corresponds with its settings +func jobToTaskOptions(job *jobs.Job) (opts []asynq.Option) { + opts = append(opts, asynq.TaskID(job.Fingerprint)) + + if !job.RunAfter.IsZero() { + opts = append(opts, asynq.ProcessAt(job.RunAfter)) + } + + if job.Deadline != nil { + opts = append(opts, asynq.Deadline(*job.Deadline)) + } + + return +} + // Asynq does not currently support the seconds field in cron specs. 
However, it does supports seconds using the // alternative syntax: @every Xs, where X is the number of seconds between executions // diff --git a/backends/redis/redis_backend_test.go b/backends/redis/redis_backend_test.go index 634d045..0bf0d0e 100644 --- a/backends/redis/redis_backend_test.go +++ b/backends/redis/redis_backend_test.go @@ -2,10 +2,10 @@ package redis import ( "context" + "errors" "fmt" "log" "os" - "strings" "testing" "time" @@ -17,12 +17,14 @@ import ( "github.com/hibiken/asynq" ) -const queue = "testing" -const queue2 = "testing2" +const ( + queue = "testing" + queue2 = "testing2" +) func init() { var err error - var connString = os.Getenv("TEST_REDIS_URL") + connString := os.Getenv("TEST_REDIS_URL") if connString == "" { return } @@ -62,10 +64,10 @@ func init() { } func TestBasicJobProcessing(t *testing.T) { - var timeoutTimer = time.After(5 * time.Second) - var done = make(chan bool) + timeoutTimer := time.After(5 * time.Second) + done := make(chan bool) - var connString = os.Getenv("TEST_REDIS_URL") + connString := os.Getenv("TEST_REDIS_URL") if connString == "" { t.Skip("Skipping: TEST_REDIS_URL not set") return @@ -118,12 +120,12 @@ func TestBasicJobProcessing(t *testing.T) { // TestBasicJobMultipleQueue tests that the redis backend is able to process jobs on multiple queues func TestBasicJobMultipleQueue(t *testing.T) { - var done = make(chan bool) - var doneCnt = 0 + done := make(chan bool) + doneCnt := 0 - var timeoutTimer = time.After(30 * time.Second) + timeoutTimer := time.After(30 * time.Second) - var connString = os.Getenv("TEST_REDIS_URL") + connString := os.Getenv("TEST_REDIS_URL") if connString == "" { t.Skip("Skipping: TEST_REDIS_URL not set") return @@ -188,7 +190,6 @@ results_loop: err = jobs.ErrJobTimeout break results_loop case <-done: - log.Println("DONE") doneCnt++ if doneCnt == 2 { break results_loop @@ -204,9 +205,9 @@ results_loop: func TestStartCron(t *testing.T) { done := make(chan bool) - var timeoutTimer = time.After(5 * time.Second) + timeoutTimer := time.After(5 * time.Second) - var connString = os.Getenv("TEST_REDIS_URL") + connString := os.Getenv("TEST_REDIS_URL") if connString == "" { t.Skip("Skipping: TEST_REDIS_URL not set") return @@ -243,34 +244,33 @@ func TestStartCron(t *testing.T) { func TestJobProcessingWithOptions(t *testing.T) { const queue = "testing" - var timeoutTimer = time.After(5 * time.Second) - var done = make(chan bool) + timeoutTimer := time.After(5 * time.Second) + logsChan := make(chan string, 1) - var connString = os.Getenv("TEST_REDIS_URL") + connString := os.Getenv("TEST_REDIS_URL") if connString == "" { t.Skip("Skipping: TEST_REDIS_URL not set") return } password := os.Getenv("REDIS_PASSWORD") - ctx := context.TODO() + ctx := context.Background() nq, err := neoq.New( ctx, neoq.WithBackend(Backend), WithAddr(connString), WithPassword(password), - WithShutdownTimeout(time.Millisecond)) + WithShutdownTimeout(500*time.Millisecond)) if err != nil { t.Fatal(err) } defer nq.Shutdown(ctx) - buf := &strings.Builder{} - nq.SetLogger(testutils.TestLogger{L: log.New(buf, "", 0), Done: done}) + logger := testutils.TestLogger{L: log.New(&testutils.ChanWriter{Ch: logsChan}, "", 0)} + nq.SetLogger(logger) h := handler.New(func(_ context.Context) (err error) { time.Sleep(50 * time.Millisecond) - done <- true return }) h.WithOptions( @@ -293,15 +293,76 @@ func TestJobProcessingWithOptions(t *testing.T) { t.Error(e) } + expectedLogMsg := "error handling job [error job exceeded its 1ms timeout: context deadline exceeded]" 
//nolint: dupword select { case <-timeoutTimer: err = jobs.ErrJobTimeout - case <-done: - expectedLogMsg := "error handling job [job exceeded its 1ms timeout: context deadline exceeded]" //nolint: dupword - actualLogMsg := strings.Trim(buf.String(), "\n") - if actualLogMsg != expectedLogMsg { - t.Error(fmt.Errorf("%s != %s", actualLogMsg, expectedLogMsg)) //nolint:all + case actualLogMsg := <-logsChan: + if actualLogMsg == expectedLogMsg { + err = nil + break } + + err = fmt.Errorf("%s != %s", actualLogMsg, expectedLogMsg) //nolint:all + } + + if err != nil { + t.Error(err) + } +} + +func TestJobProcessingWithJobDeadline(t *testing.T) { + const queue = "testing" + timeoutTimer := time.After(100 * time.Millisecond) + done := make(chan bool) + + connString := os.Getenv("TEST_REDIS_URL") + if connString == "" { + t.Skip("Skipping: TEST_REDIS_URL not set") + return + } + + password := os.Getenv("REDIS_PASSWORD") + ctx := context.Background() + nq, err := neoq.New( + ctx, + neoq.WithBackend(Backend), + WithAddr(connString), + WithPassword(password), + WithShutdownTimeout(500*time.Millisecond)) + if err != nil { + t.Fatal(err) + } + defer nq.Shutdown(ctx) + + h := handler.New(func(_ context.Context) (err error) { + time.Sleep(50 * time.Millisecond) + done <- true + return + }) + + err = nq.Start(ctx, queue, h) + if err != nil { + t.Error(err) + } + + dl := time.Now().UTC() + jid, e := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": fmt.Sprintf("hello world: %d", internal.RandInt(10000000000)), + }, + Deadline: &dl, + }) + if e != nil || jid == jobs.DuplicateJobID { + t.Error(e) + } + + select { + case <-timeoutTimer: + err = nil + case <-done: + err = errors.New("job should not have completed, but did") //nolint:all } if err != nil { diff --git a/examples/add_job_with_deadline/main.go b/examples/add_job_with_deadline/main.go new file mode 100644 index 0000000..783b076 --- /dev/null +++ b/examples/add_job_with_deadline/main.go @@ -0,0 +1,52 @@ +package main + +import ( + "context" + "log" + "time" + + "github.com/acaloiaro/neoq" + "github.com/acaloiaro/neoq/backends/memory" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/jobs" +) + +func main() { + var err error + const queue = "foobar" + ctx := context.Background() + + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) + if err != nil { + log.Fatalf("error initializing neoq: %v", err) + } + + // we use a done channel here to make sure that our test doesn't exit before the job finishes running + // this is probably not a pattern you want to use in production jobs and you see it here only for testing reasons + done := make(chan bool) + + h := handler.New(func(_ context.Context) (err error) { + done <- true + return + }) + + err = nq.Start(ctx, queue, h) + if err != nil { + log.Println("error listening to queue", err) + } + + // this job must complete before 5 seconds from now + deadline := time.Now().Add(5 * time.Second) + _, err = nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": "hello, world", + }, + Deadline: &deadline, + }) + if err != nil { + log.Println("error adding job", err) + } + + <-done +} diff --git a/internal/internal.go b/internal/internal.go index 700796d..0243152 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -25,7 +25,7 @@ func CalculateBackoff(retryCount int) time.Time { const backoffExponent = 4 const maxInt = 30 p := int(math.Round(math.Pow(float64(retryCount), backoffExponent))) - return 
time.Now().Add(time.Duration(p+15+RandInt(maxInt)*retryCount+1) * time.Second) + return time.Now().UTC().Add(time.Duration(p+15+RandInt(maxInt)*retryCount+1) * time.Second) } // RandInt returns a random integer up to max diff --git a/jobs/jobs.go b/jobs/jobs.go index db601d9..6a6c012 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -14,9 +14,10 @@ import ( ) var ( - ErrContextHasNoJob = errors.New("context has no Job") - ErrJobTimeout = errors.New("timed out waiting for job(s)") - ErrNoQueueSpecified = errors.New("this job does not specify a queue. please specify a queue") + ErrContextHasNoJob = errors.New("context has no Job") + ErrJobTimeout = errors.New("timed out waiting for job(s)") + ErrNoQueueSpecified = errors.New("this job does not specify a queue. please specify a queue") + ErrJobExceededDeadline = errors.New("the job did not complete before its deadline") ) const ( @@ -36,6 +37,7 @@ type Job struct { Status string `db:"status"` // The status of the job Queue string `db:"queue"` // The queue the job is on Payload map[string]any `db:"payload"` // JSON job payload for more complex jobs + Deadline *time.Time `db:"deadline"` // The time after which the job should no longer be run RunAfter time.Time `db:"run_after"` // The time after which the job is elligible to be picked up by a worker RanAt null.Time `db:"ran_at"` // The last time the job ran Error null.String `db:"error"` // The last error the job elicited diff --git a/neoq_test.go b/neoq_test.go index 6eb026d..89d711e 100644 --- a/neoq_test.go +++ b/neoq_test.go @@ -195,10 +195,10 @@ func TestStartCron(t *testing.T) { } func TestSetLogger(t *testing.T) { + timeoutTimer := time.After(5 * time.Second) const queue = "testing" - done := make(chan bool, 1) - buf := &strings.Builder{} - ctx := context.TODO() + logsChan := make(chan string, 10) + ctx := context.Background() nq, err := New(ctx, WithBackend(memory.Backend)) if err != nil { @@ -206,7 +206,7 @@ func TestSetLogger(t *testing.T) { } defer nq.Shutdown(ctx) - nq.SetLogger(testutils.TestLogger{L: log.New(buf, "", 0), Done: done}) + nq.SetLogger(testutils.TestLogger{L: log.New(testutils.ChanWriter{Ch: logsChan}, "", 0)}) h := handler.New(func(ctx context.Context) (err error) { err = errTrigger @@ -215,21 +215,32 @@ func TestSetLogger(t *testing.T) { if err != nil { t.Error(err) } - err = nq.Start(ctx, queue, h) if err != nil { t.Error(err) } - _, err = nq.Enqueue(ctx, &jobs.Job{Queue: queue}) if err != nil { t.Error(err) } - <-done expectedLogMsg := "adding a new job [queue testing]" - actualLogMsg := strings.Trim(buf.String(), "\n") - if actualLogMsg != expectedLogMsg { - t.Error(fmt.Errorf("%s != %s", actualLogMsg, expectedLogMsg)) //nolint:all +results_loop: + for { + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + break results_loop + case actualLogMsg := <-logsChan: + if strings.Contains(actualLogMsg, expectedLogMsg) { + err = nil + break results_loop + } + err = fmt.Errorf("'%s' NOT CONTAINS '%s'", actualLogMsg, expectedLogMsg) //nolint:all + } + } + + if err != nil { + t.Error(err) } } diff --git a/testutils/testutils.go b/testutils/testutils.go index 3edde53..e0ff466 100644 --- a/testutils/testutils.go +++ b/testutils/testutils.go @@ -2,28 +2,36 @@ package testutils -import "log" +import ( + "log" + "strings" +) // TestLogger is a utility for logging in tests type TestLogger struct { - L *log.Logger - Done chan bool + L *log.Logger } // Info prints to stdout and signals its done channel func (h TestLogger) Info(m string, args ...any) { h.L.Println(m, args) - 
h.Done <- true } // Debug prints to stdout and signals its done channel func (h TestLogger) Debug(m string, args ...any) { h.L.Println(m, args) - h.Done <- true } // Error prints to stdout and signals its done channel func (h TestLogger) Error(m string, args ...any) { h.L.Println(m, args) - h.Done <- true +} + +type ChanWriter struct { + Ch chan string +} + +func (c ChanWriter) Write(p []byte) (n int, err error) { + c.Ch <- strings.Trim(string(p), "\n") + return len(p), nil }