Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep finished jobs in the database for history, but skip it while processing the queue #255

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ func (c *Client) execEnqueueWithID(ctx context.Context, j *Job, q adapter.Querya
}

_, err = q.Exec(ctx, `INSERT INTO gue_jobs
(job_id, queue, priority, run_at, job_type, args, created_at, updated_at)
(job_id, queue, priority, run_at, job_type, args, skip_delete, status, created_at, updated_at)
VALUES
($1, $2, $3, $4, $5, $6, $7, $7)
`, idAsString, j.Queue, j.Priority, j.RunAt, j.Type, j.Args, j.CreatedAt)
($1, $2, $3, $4, $5, $6, $7, $8, $9, $9)
`, idAsString, j.Queue, j.Priority, j.RunAt, j.Type, j.Args, boolToInt16(j.SkipDelete), JobStatusTodo, j.CreatedAt)

c.logger.Debug(
"Tried to enqueue a job",
Expand Down Expand Up @@ -177,13 +177,13 @@ func (c *Client) execEnqueue(ctx context.Context, j *Job, q adapter.Queryable) e
// After the Job has been worked, you must call either Job.Done() or Job.Error() on it
// in order to commit transaction to persist Job changes (remove or update it).
func (c *Client) LockJob(ctx context.Context, queue string) (*Job, error) {
sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at
sql := `SELECT job_id, queue, priority, run_at, job_type, args, skip_delete, status, error_count, last_error, created_at
FROM gue_jobs
WHERE queue = $1 AND run_at <= $2
WHERE queue = $1 AND run_at <= $2 AND (status = $3 OR status = $4)
ORDER BY priority ASC
LIMIT 1 FOR UPDATE SKIP LOCKED`

return c.execLockJob(ctx, true, sql, queue, time.Now().UTC())
return c.execLockJob(ctx, true, sql, queue, time.Now().UTC(), JobStatusTodo, JobStatusRetry)
}

// LockJobByID attempts to retrieve a specific Job from the database.
Expand All @@ -197,7 +197,7 @@ LIMIT 1 FOR UPDATE SKIP LOCKED`
// After the Job has been worked, you must call either Job.Done() or Job.Error() on it
// in order to commit transaction to persist Job changes (remove or update it).
func (c *Client) LockJobByID(ctx context.Context, id ulid.ULID) (*Job, error) {
sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at
sql := `SELECT job_id, queue, priority, run_at, job_type, args, skip_delete, status, error_count, last_error, created_at
FROM gue_jobs
WHERE job_id = $1 FOR UPDATE SKIP LOCKED`

Expand All @@ -218,13 +218,13 @@ WHERE job_id = $1 FOR UPDATE SKIP LOCKED`
// After the Job has been worked, you must call either Job.Done() or Job.Error() on it
// in order to commit transaction to persist Job changes (remove or update it).
func (c *Client) LockNextScheduledJob(ctx context.Context, queue string) (*Job, error) {
sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at
sql := `SELECT job_id, queue, priority, run_at, job_type, args, skip_delete, status, error_count, last_error, created_at
FROM gue_jobs
WHERE queue = $1 AND run_at <= $2
WHERE queue = $1 AND run_at <= $2 AND (status = $3 OR status = $4)
ORDER BY run_at, priority ASC
LIMIT 1 FOR UPDATE SKIP LOCKED`

return c.execLockJob(ctx, true, sql, queue, time.Now().UTC())
return c.execLockJob(ctx, true, sql, queue, time.Now().UTC(), JobStatusTodo, JobStatusRetry)
}

func (c *Client) execLockJob(ctx context.Context, handleErrNoRows bool, sql string, args ...any) (*Job, error) {
Expand All @@ -236,18 +236,22 @@ func (c *Client) execLockJob(ctx context.Context, handleErrNoRows bool, sql stri

j := Job{tx: tx, backoff: c.backoff, logger: c.logger}

var skipDelete int16
err = tx.QueryRow(ctx, sql, args...).Scan(
&j.ID,
&j.Queue,
&j.Priority,
&j.RunAt,
&j.Type,
&j.Args,
&skipDelete,
&j.Status,
&j.ErrorCount,
&j.LastError,
&j.CreatedAt,
)
if err == nil {
j.SkipDelete = int16ToBool(skipDelete)
c.mLockJob.Add(ctx, 1, metric.WithAttributes(attrJobType.String(j.Type), attrSuccess.Bool(true)))
return &j, nil
}
Expand Down Expand Up @@ -279,3 +283,17 @@ func (c *Client) initMetrics() (err error) {

return nil
}

func boolToInt16(v bool) int16 {
if v {
return 1
}
return 0
}

func int16ToBool(v int16) bool {
if v > 0 {
return true
}
return false
}
183 changes: 174 additions & 9 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func testLockJobCustomQueue(t *testing.T, connPool adapter.ConnPool) {
assert.NoError(t, err)
})

err = j.Delete(ctx)
err = j.Finish(ctx, JobStatusSuccess)
require.NoError(t, err)
}

Expand Down Expand Up @@ -405,15 +405,15 @@ func testJobConnRace(t *testing.T, connPool adapter.ConnPool) {
wg.Wait()
}

func TestJobDelete(t *testing.T) {
func TestJobFinish(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
t.Run(name, func(t *testing.T) {
testJobDelete(t, openFunc(t))
testJobFinish(t, openFunc(t))
})
}
}

func testJobDelete(t *testing.T, connPool adapter.ConnPool) {
func testJobFinish(t *testing.T, connPool adapter.ConnPool) {
ctx := context.Background()

c, err := NewClient(connPool)
Expand All @@ -427,7 +427,7 @@ func testJobDelete(t *testing.T, connPool adapter.ConnPool) {
require.NoError(t, err)
require.NotNil(t, j)

err = j.Delete(ctx)
err = j.Finish(ctx, JobStatusSuccess)
require.NoError(t, err)

err = j.Done(ctx)
Expand All @@ -440,6 +440,48 @@ func testJobDelete(t *testing.T, connPool adapter.ConnPool) {
assert.Nil(t, jj)
}

func TestJobFinishSkipDelete(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
t.Run(name, func(t *testing.T) {
testJobFinishSkipDelete(t, openFunc(t))
})
}
}

func testJobFinishSkipDelete(t *testing.T, connPool adapter.ConnPool) {
ctx := context.Background()

c, err := NewClient(connPool)
require.NoError(t, err)

job := Job{Type: "MyJob", SkipDelete: true}
err = c.Enqueue(ctx, &job)
require.NoError(t, err)

j, err := c.LockJob(ctx, "")
require.NoError(t, err)
require.NotNil(t, j)

err = j.Finish(ctx, JobStatusSuccess)
require.NoError(t, err)

err = j.Done(ctx)
require.NoError(t, err)

// make sure job was not deleted
j2, err := c.LockJobByID(ctx, job.ID)
require.NoError(t, err)
require.NotNil(t, j2)

t.Cleanup(func() {
err := j2.Done(ctx)
assert.NoError(t, err)
})

assert.Equal(t, job.Type, j2.Type)
assert.Equal(t, JobStatusSuccess, j2.Status)
}

func TestJobDone(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -536,6 +578,50 @@ func testJobError(t *testing.T, connPool adapter.ConnPool) {
assert.Equal(t, msg, j2.LastError.String)
assert.Equal(t, int32(1), j2.ErrorCount)
assert.Greater(t, j2.RunAt.Unix(), job.RunAt.Unix())
assert.Equal(t, JobStatusRetry, j2.Status)
}

func TestJobErrorSkipDelete(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
t.Run(name, func(t *testing.T) {
testJobErrorSkipDelete(t, openFunc(t))
})
}
}

func testJobErrorSkipDelete(t *testing.T, connPool adapter.ConnPool) {
ctx := context.Background()

c, err := NewClient(connPool)
require.NoError(t, err)

job := &Job{Type: "MyJob", SkipDelete: true}
err = c.Enqueue(ctx, job)
require.NoError(t, err)

j, err := c.LockJob(ctx, "")
require.NoError(t, err)
require.NotNil(t, j)

msg := "world\nended"
err = j.Error(ctx, errors.New(msg))
require.NoError(t, err)

// make sure job was not deleted
j2, err := c.LockJobByID(ctx, job.ID)
require.NoError(t, err)
require.NotNil(t, j2)

t.Cleanup(func() {
err := j2.Done(ctx)
assert.NoError(t, err)
})

assert.True(t, j2.LastError.Valid)
assert.Equal(t, msg, j2.LastError.String)
assert.Equal(t, int32(1), j2.ErrorCount)
assert.Greater(t, j2.RunAt.Unix(), job.RunAt.Unix())
assert.Equal(t, JobStatusRetry, j2.Status)
}

func TestJobErrorCustomBackoff(t *testing.T) {
Expand Down Expand Up @@ -584,6 +670,83 @@ func testJobErrorCustomBackoff(t *testing.T, connPool adapter.ConnPool) {
assert.Greater(t, j2.RunAt.Unix(), job.RunAt.Unix())
// a diff in a sec is possible when doing dates math, so allow it
assert.WithinDuration(t, job.RunAt.Add(time.Hour), j2.RunAt, time.Second)
assert.Equal(t, JobStatusRetry, j2.Status)
}

func TestJobErrorBackoffNever(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
t.Run(name, func(t *testing.T) {
testJobErrorBackoffNever(t, openFunc(t))
})
}
}

func testJobErrorBackoffNever(t *testing.T, connPool adapter.ConnPool) {
ctx := context.Background()

c, err := NewClient(connPool, WithClientBackoff(BackoffNever))
require.NoError(t, err)

job := &Job{Type: "MyJob"}
err = c.Enqueue(ctx, job)
require.NoError(t, err)

j, err := c.LockJob(ctx, "")
require.NoError(t, err)
require.NotNil(t, j)

msg := "world\nended"
err = j.Error(ctx, errors.New(msg))
require.NoError(t, err)

// make sure job was deleted
jj, err := c.LockJobByID(ctx, job.ID)
require.Error(t, err)
assert.True(t, errors.Is(err, adapter.ErrNoRows))
assert.Nil(t, jj)
}

func TestJobErrorBackoffNeverSkipDelete(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
t.Run(name, func(t *testing.T) {
testJobErrorBackoffNeverSkipDelete(t, openFunc(t))
})
}
}

func testJobErrorBackoffNeverSkipDelete(t *testing.T, connPool adapter.ConnPool) {
ctx := context.Background()

c, err := NewClient(connPool, WithClientBackoff(BackoffNever))
require.NoError(t, err)

job := &Job{Type: "MyJob", SkipDelete: true}
err = c.Enqueue(ctx, job)
require.NoError(t, err)

j, err := c.LockJob(ctx, "")
require.NoError(t, err)
require.NotNil(t, j)

msg := "world\nended"
err = j.Error(ctx, errors.New(msg))
require.NoError(t, err)

// make sure job was not deleted
j2, err := c.LockJobByID(ctx, job.ID)
require.NoError(t, err)
require.NotNil(t, j2)

t.Cleanup(func() {
err := j2.Done(ctx)
assert.NoError(t, err)
})

assert.True(t, j2.LastError.Valid)
assert.Equal(t, msg, j2.LastError.String)
assert.Equal(t, int32(1), j2.ErrorCount)
assert.Equal(t, j2.RunAt.Unix(), job.RunAt.Unix())
assert.Equal(t, JobStatusError, j2.Status)
}

func TestJobPriority(t *testing.T) {
Expand Down Expand Up @@ -783,6 +946,8 @@ CREATE TABLE gue_jobs
run_at TIMESTAMPTZ NOT NULL,
job_type TEXT NOT NULL,
args BYTEA NOT NULL,
skip_delete SMALLINT NOT NULL,
status TEXT NOT NULL,
error_count INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
queue TEXT NOT NULL,
Expand All @@ -800,10 +965,10 @@ CREATE INDEX IF NOT EXISTS idx_gue_jobs_selector ON gue_jobs (queue, run_at, pri
const queueName string = "some-queue"
for i := 0; i < 101; i++ {
_, err = connPool.Exec(ctx, `INSERT INTO gue_jobs
(queue, priority, run_at, job_type, args, created_at, updated_at)
(queue, priority, run_at, job_type, args, skip_delete, status, created_at, updated_at)
VALUES
($1, $2, $3, $4, $5, $6, $6)
`, queueName, 0, now, "foo-bar", []byte(fmt.Sprintf(`{"job":%d}`, i)), now)
($1, $2, $3, $4, $5, $6, $7, $8, $8)
`, queueName, 0, now, "foo-bar", []byte(fmt.Sprintf(`{"job":%d}`, i)), 0, JobStatusTodo, now)
require.NoError(t, err)
}

Expand All @@ -821,7 +986,7 @@ VALUES
require.NotNil(t, j1)
t.Logf("Locked a job: %s %s", j1.ID.String(), string(j1.Args))

err = j1.Delete(ctx)
err = j1.Finish(ctx, JobStatusSuccess)
require.NoError(t, err)
err = j1.Done(ctx)
require.NoError(t, err)
Expand Down
Loading