diff --git a/client.go b/client.go index b46ec4e..a817c48 100644 --- a/client.go +++ b/client.go @@ -137,10 +137,10 @@ func (c *Client) execEnqueueWithID(ctx context.Context, j *Job, q adapter.Querya } _, err = q.Exec(ctx, `INSERT INTO gue_jobs -(job_id, queue, priority, run_at, job_type, args, created_at, updated_at) +(job_id, queue, priority, run_at, job_type, args, skip_delete, status, created_at, updated_at) VALUES -($1, $2, $3, $4, $5, $6, $7, $7) -`, idAsString, j.Queue, j.Priority, j.RunAt, j.Type, j.Args, j.CreatedAt) +($1, $2, $3, $4, $5, $6, $7, $8, $9, $9) +`, idAsString, j.Queue, j.Priority, j.RunAt, j.Type, j.Args, boolToInt16(j.SkipDelete), JobStatusTodo, j.CreatedAt) c.logger.Debug( "Tried to enqueue a job", @@ -177,13 +177,13 @@ func (c *Client) execEnqueue(ctx context.Context, j *Job, q adapter.Queryable) e // After the Job has been worked, you must call either Job.Done() or Job.Error() on it // in order to commit transaction to persist Job changes (remove or update it). func (c *Client) LockJob(ctx context.Context, queue string) (*Job, error) { - sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at + sql := `SELECT job_id, queue, priority, run_at, job_type, args, skip_delete, status, error_count, last_error, created_at FROM gue_jobs -WHERE queue = $1 AND run_at <= $2 +WHERE queue = $1 AND run_at <= $2 AND (status = $3 OR status = $4) ORDER BY priority ASC LIMIT 1 FOR UPDATE SKIP LOCKED` - return c.execLockJob(ctx, true, sql, queue, time.Now().UTC()) + return c.execLockJob(ctx, true, sql, queue, time.Now().UTC(), JobStatusTodo, JobStatusRetry) } // LockJobByID attempts to retrieve a specific Job from the database. @@ -197,7 +197,7 @@ LIMIT 1 FOR UPDATE SKIP LOCKED` // After the Job has been worked, you must call either Job.Done() or Job.Error() on it // in order to commit transaction to persist Job changes (remove or update it). 
func (c *Client) LockJobByID(ctx context.Context, id ulid.ULID) (*Job, error) { - sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at + sql := `SELECT job_id, queue, priority, run_at, job_type, args, skip_delete, status, error_count, last_error, created_at FROM gue_jobs WHERE job_id = $1 FOR UPDATE SKIP LOCKED` @@ -218,13 +218,13 @@ WHERE job_id = $1 FOR UPDATE SKIP LOCKED` // After the Job has been worked, you must call either Job.Done() or Job.Error() on it // in order to commit transaction to persist Job changes (remove or update it). func (c *Client) LockNextScheduledJob(ctx context.Context, queue string) (*Job, error) { - sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at + sql := `SELECT job_id, queue, priority, run_at, job_type, args, skip_delete, status, error_count, last_error, created_at FROM gue_jobs -WHERE queue = $1 AND run_at <= $2 +WHERE queue = $1 AND run_at <= $2 AND (status = $3 OR status = $4) ORDER BY run_at, priority ASC LIMIT 1 FOR UPDATE SKIP LOCKED` - return c.execLockJob(ctx, true, sql, queue, time.Now().UTC()) + return c.execLockJob(ctx, true, sql, queue, time.Now().UTC(), JobStatusTodo, JobStatusRetry) } func (c *Client) execLockJob(ctx context.Context, handleErrNoRows bool, sql string, args ...any) (*Job, error) { @@ -236,6 +236,7 @@ func (c *Client) execLockJob(ctx context.Context, handleErrNoRows bool, sql stri j := Job{tx: tx, backoff: c.backoff, logger: c.logger} + var skipDelete int16 err = tx.QueryRow(ctx, sql, args...).Scan( &j.ID, &j.Queue, @@ -243,11 +244,14 @@ func (c *Client) execLockJob(ctx context.Context, handleErrNoRows bool, sql stri &j.RunAt, &j.Type, &j.Args, + &skipDelete, + &j.Status, &j.ErrorCount, &j.LastError, &j.CreatedAt, ) if err == nil { + j.SkipDelete = int16ToBool(skipDelete) c.mLockJob.Add(ctx, 1, metric.WithAttributes(attrJobType.String(j.Type), attrSuccess.Bool(true))) return &j, nil } @@ -279,3 +283,17 @@ func 
(c *Client) initMetrics() (err error) { return nil } + +func boolToInt16(v bool) int16 { + if v { + return 1 + } + return 0 +} + +func int16ToBool(v int16) bool { + if v > 0 { + return true + } + return false +} diff --git a/client_test.go b/client_test.go index 8a1c94d..1fde08d 100644 --- a/client_test.go +++ b/client_test.go @@ -140,7 +140,7 @@ func testLockJobCustomQueue(t *testing.T, connPool adapter.ConnPool) { assert.NoError(t, err) }) - err = j.Delete(ctx) + err = j.Finish(ctx, JobStatusSuccess) require.NoError(t, err) } @@ -405,15 +405,15 @@ func testJobConnRace(t *testing.T, connPool adapter.ConnPool) { wg.Wait() } -func TestJobDelete(t *testing.T) { +func TestJobFinish(t *testing.T) { for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool { t.Run(name, func(t *testing.T) { - testJobDelete(t, openFunc(t)) + testJobFinish(t, openFunc(t)) }) } } -func testJobDelete(t *testing.T, connPool adapter.ConnPool) { +func testJobFinish(t *testing.T, connPool adapter.ConnPool) { ctx := context.Background() c, err := NewClient(connPool) @@ -427,7 +427,7 @@ func testJobDelete(t *testing.T, connPool adapter.ConnPool) { require.NoError(t, err) require.NotNil(t, j) - err = j.Delete(ctx) + err = j.Finish(ctx, JobStatusSuccess) require.NoError(t, err) err = j.Done(ctx) @@ -440,6 +440,48 @@ func testJobDelete(t *testing.T, connPool adapter.ConnPool) { assert.Nil(t, jj) } +func TestJobFinishSkipDelete(t *testing.T) { + for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool { + t.Run(name, func(t *testing.T) { + testJobFinishSkipDelete(t, openFunc(t)) + }) + } +} + +func testJobFinishSkipDelete(t *testing.T, connPool adapter.ConnPool) { + ctx := context.Background() + + c, err := NewClient(connPool) + require.NoError(t, err) + + job := Job{Type: "MyJob", SkipDelete: true} + err = c.Enqueue(ctx, &job) + require.NoError(t, err) + + j, err := c.LockJob(ctx, "") + require.NoError(t, err) + require.NotNil(t, j) + + err = j.Finish(ctx, JobStatusSuccess) + 
require.NoError(t, err) + + err = j.Done(ctx) + require.NoError(t, err) + + // make sure job was not deleted + j2, err := c.LockJobByID(ctx, job.ID) + require.NoError(t, err) + require.NotNil(t, j2) + + t.Cleanup(func() { + err := j2.Done(ctx) + assert.NoError(t, err) + }) + + assert.Equal(t, job.Type, j2.Type) + assert.Equal(t, JobStatusSuccess, j2.Status) +} + func TestJobDone(t *testing.T) { for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool { t.Run(name, func(t *testing.T) { @@ -536,6 +578,50 @@ func testJobError(t *testing.T, connPool adapter.ConnPool) { assert.Equal(t, msg, j2.LastError.String) assert.Equal(t, int32(1), j2.ErrorCount) assert.Greater(t, j2.RunAt.Unix(), job.RunAt.Unix()) + assert.Equal(t, JobStatusRetry, j2.Status) +} + +func TestJobErrorSkipDelete(t *testing.T) { + for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool { + t.Run(name, func(t *testing.T) { + testJobErrorSkipDelete(t, openFunc(t)) + }) + } +} + +func testJobErrorSkipDelete(t *testing.T, connPool adapter.ConnPool) { + ctx := context.Background() + + c, err := NewClient(connPool) + require.NoError(t, err) + + job := &Job{Type: "MyJob", SkipDelete: true} + err = c.Enqueue(ctx, job) + require.NoError(t, err) + + j, err := c.LockJob(ctx, "") + require.NoError(t, err) + require.NotNil(t, j) + + msg := "world\nended" + err = j.Error(ctx, errors.New(msg)) + require.NoError(t, err) + + // make sure job was not deleted + j2, err := c.LockJobByID(ctx, job.ID) + require.NoError(t, err) + require.NotNil(t, j2) + + t.Cleanup(func() { + err := j2.Done(ctx) + assert.NoError(t, err) + }) + + assert.True(t, j2.LastError.Valid) + assert.Equal(t, msg, j2.LastError.String) + assert.Equal(t, int32(1), j2.ErrorCount) + assert.Greater(t, j2.RunAt.Unix(), job.RunAt.Unix()) + assert.Equal(t, JobStatusRetry, j2.Status) } func TestJobErrorCustomBackoff(t *testing.T) { @@ -584,6 +670,83 @@ func testJobErrorCustomBackoff(t *testing.T, connPool adapter.ConnPool) { 
assert.Greater(t, j2.RunAt.Unix(), job.RunAt.Unix()) // a diff in a sec is possible when doing dates math, so allow it assert.WithinDuration(t, job.RunAt.Add(time.Hour), j2.RunAt, time.Second) + assert.Equal(t, JobStatusRetry, j2.Status) +} + +func TestJobErrorBackoffNever(t *testing.T) { + for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool { + t.Run(name, func(t *testing.T) { + testJobErrorBackoffNever(t, openFunc(t)) + }) + } +} + +func testJobErrorBackoffNever(t *testing.T, connPool adapter.ConnPool) { + ctx := context.Background() + + c, err := NewClient(connPool, WithClientBackoff(BackoffNever)) + require.NoError(t, err) + + job := &Job{Type: "MyJob"} + err = c.Enqueue(ctx, job) + require.NoError(t, err) + + j, err := c.LockJob(ctx, "") + require.NoError(t, err) + require.NotNil(t, j) + + msg := "world\nended" + err = j.Error(ctx, errors.New(msg)) + require.NoError(t, err) + + // make sure job was deleted + jj, err := c.LockJobByID(ctx, job.ID) + require.Error(t, err) + assert.True(t, errors.Is(err, adapter.ErrNoRows)) + assert.Nil(t, jj) +} + +func TestJobErrorBackoffNeverSkipDelete(t *testing.T) { + for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool { + t.Run(name, func(t *testing.T) { + testJobErrorBackoffNeverSkipDelete(t, openFunc(t)) + }) + } +} + +func testJobErrorBackoffNeverSkipDelete(t *testing.T, connPool adapter.ConnPool) { + ctx := context.Background() + + c, err := NewClient(connPool, WithClientBackoff(BackoffNever)) + require.NoError(t, err) + + job := &Job{Type: "MyJob", SkipDelete: true} + err = c.Enqueue(ctx, job) + require.NoError(t, err) + + j, err := c.LockJob(ctx, "") + require.NoError(t, err) + require.NotNil(t, j) + + msg := "world\nended" + err = j.Error(ctx, errors.New(msg)) + require.NoError(t, err) + + // make sure job was not deleted + j2, err := c.LockJobByID(ctx, job.ID) + require.NoError(t, err) + require.NotNil(t, j2) + + t.Cleanup(func() { + err := j2.Done(ctx) + assert.NoError(t, err) + }) + + 
assert.True(t, j2.LastError.Valid) + assert.Equal(t, msg, j2.LastError.String) + assert.Equal(t, int32(1), j2.ErrorCount) + assert.Equal(t, j2.RunAt.Unix(), job.RunAt.Unix()) + assert.Equal(t, JobStatusError, j2.Status) } func TestJobPriority(t *testing.T) { @@ -783,6 +946,8 @@ CREATE TABLE gue_jobs run_at TIMESTAMPTZ NOT NULL, job_type TEXT NOT NULL, args BYTEA NOT NULL, + skip_delete SMALLINT NOT NULL, + status TEXT NOT NULL, error_count INTEGER NOT NULL DEFAULT 0, last_error TEXT, queue TEXT NOT NULL, @@ -800,10 +965,10 @@ CREATE INDEX IF NOT EXISTS idx_gue_jobs_selector ON gue_jobs (queue, run_at, pri const queueName string = "some-queue" for i := 0; i < 101; i++ { _, err = connPool.Exec(ctx, `INSERT INTO gue_jobs -(queue, priority, run_at, job_type, args, created_at, updated_at) +(queue, priority, run_at, job_type, args, skip_delete, status, created_at, updated_at) VALUES -($1, $2, $3, $4, $5, $6, $6) -`, queueName, 0, now, "foo-bar", []byte(fmt.Sprintf(`{"job":%d}`, i)), now) +($1, $2, $3, $4, $5, $6, $7, $8, $8) +`, queueName, 0, now, "foo-bar", []byte(fmt.Sprintf(`{"job":%d}`, i)), 0, JobStatusTodo, now) require.NoError(t, err) } @@ -821,7 +986,7 @@ VALUES require.NotNil(t, j1) t.Logf("Locked a job: %s %s", j1.ID.String(), string(j1.Args)) - err = j1.Delete(ctx) + err = j1.Finish(ctx, JobStatusSuccess) require.NoError(t, err) err = j1.Done(ctx) require.NoError(t, err) diff --git a/job.go b/job.go index 7794a80..8ff77f8 100644 --- a/job.go +++ b/job.go @@ -24,6 +24,15 @@ const ( JobPriorityLowest JobPriority = 32767 ) +type JobStatus string + +const ( + JobStatusTodo JobStatus = "todo" + JobStatusRetry JobStatus = "retry" + JobStatusSuccess JobStatus = "success" + JobStatusError JobStatus = "error" +) + // Job is a single unit of work for Gue to perform. type Job struct { // ID is the unique database ID of the Job. It is ignored on job creation. @@ -49,6 +58,14 @@ type Job struct { // Args for the job. 
Args []byte + // SkipDelete reports whether the task must be kept in the postgres queue after successful execution. + // If this field is set to true, gue won't delete the task from the postgres table. + SkipDelete bool + + // Status is the task execution status. + // It is ignored on job creation. + Status JobStatus + // ErrorCount is the number of times this job has attempted to run, but failed with an error. // It is ignored on job creation. // This field is initialised only when the Job is being retrieved from the DB and is not @@ -66,11 +83,11 @@ type Job struct { // whether it makes sense to retry the job or it can be dropped. CreatedAt time.Time - mu sync.Mutex - deleted bool - tx adapter.Tx - backoff Backoff - logger adapter.Logger + mu sync.Mutex + finished bool + tx adapter.Tx + backoff Backoff + logger adapter.Logger } // Tx returns DB transaction that this job is locked to. You may use @@ -81,25 +98,33 @@ func (j *Job) Tx() adapter.Tx { return j.tx } -// Delete marks this job as complete by deleting it from the database. +// Finish marks this job as complete. +// If the job has SkipDelete set to true, Finish marks it as finished by updating its status field. +// Otherwise, it marks the job as complete by deleting it from the database. // // You must also later call Done() to return this job's database connection to // the pool. If you got the job from the worker - it will take care of cleaning up the job and resources, // no need to do this manually in a WorkFunc. 
-func (j *Job) Delete(ctx context.Context) error { +func (j *Job) Finish(ctx context.Context, status JobStatus) error { j.mu.Lock() defer j.mu.Unlock() - if j.deleted { + if j.finished { return nil } - _, err := j.tx.Exec(ctx, `DELETE FROM gue_jobs WHERE job_id = $1`, j.ID.String()) + var err error + if j.SkipDelete { + _, err = j.tx.Exec(ctx, `UPDATE gue_jobs SET status = $1 WHERE job_id = $2`, status, j.ID.String()) + } else { + _, err = j.tx.Exec(ctx, `DELETE FROM gue_jobs WHERE job_id = $1`, j.ID.String()) + } + if err != nil { return err } - j.deleted = true + j.finished = true return nil } @@ -150,14 +175,25 @@ func (j *Job) Error(ctx context.Context, jErr error) (err error) { adapter.F("job-errors", errorCount), adapter.Err(jErr), ) - err = j.Delete(ctx) + + // save last error for history + _, err = j.tx.Exec( + ctx, + `UPDATE gue_jobs SET error_count = $1, last_error = $2, updated_at = $3 WHERE job_id = $4`, + errorCount, jErr.Error(), now, j.ID.String(), + ) + if err != nil { + return + } + + err = j.Finish(ctx, JobStatusError) return } _, err = j.tx.Exec( ctx, - `UPDATE gue_jobs SET error_count = $1, run_at = $2, last_error = $3, updated_at = $4 WHERE job_id = $5`, - errorCount, newRunAt, jErr.Error(), now, j.ID.String(), + `UPDATE gue_jobs SET error_count = $1, run_at = $2, last_error = $3, updated_at = $4, status = $5 WHERE job_id = $6`, + errorCount, newRunAt, jErr.Error(), now, JobStatusRetry, j.ID.String(), ) return err diff --git a/migrations/schema.sql b/migrations/schema.sql index 04a2b6b..b0a4b25 100644 --- a/migrations/schema.sql +++ b/migrations/schema.sql @@ -5,6 +5,8 @@ CREATE TABLE IF NOT EXISTS gue_jobs run_at TIMESTAMPTZ NOT NULL, job_type TEXT NOT NULL, args BYTEA NOT NULL, + skip_delete SMALLINT NOT NULL, + status TEXT NOT NULL, error_count INTEGER NOT NULL DEFAULT 0, last_error TEXT, queue TEXT NOT NULL, diff --git a/worker.go b/worker.go index ccfe6a2..b86f84b 100644 --- a/worker.go +++ b/worker.go @@ -265,7 +265,7 @@ func (w *Worker) 
WorkOne(ctx context.Context) (didWork bool) { hook(ctx, j, nil) } - err = j.Delete(ctx) + err = j.Finish(ctx, JobStatusSuccess) if err != nil { span.RecordError(fmt.Errorf("failed to delete finished job: %w", err)) ll.Error("Got an error on deleting a job", adapter.Err(err))