fix: MaxRetries not persisting in PG backend
acaloiaro committed Oct 4, 2023
1 parent de2e101 commit afd1f4c
Showing 4 changed files with 46 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -20,6 +20,7 @@ jobs:
       postgres:
         image: postgres:14-alpine
         env:
+          TEST_DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres?sslmode=disable&pool_max_conns=2
           POSTGRES_PASSWORD: postgres
         options: >-
           --health-cmd pg_isready
@@ -41,7 +42,6 @@ jobs:
       - name: test
         run: make mod test coverage
         env:
-          TEST_DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres?sslmode=disable
           TEST_REDIS_URL: redis:6379
       - name: upload results
         uses: actions/upload-artifact@v3
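Note: pgxpool reads pool settings such as pool_max_conns straight from the connection string's query parameters, which is why the new TEST_DATABASE_URL carries pool_max_conns=2. A minimal sketch of that behavior, assuming a locally reachable database URL (the URL below is illustrative, not the CI value):

    package main

    import (
        "fmt"
        "log"

        "github.com/jackc/pgx/v5/pgxpool"
    )

    func main() {
        // pool_max_conns in the query string becomes MaxConns on the parsed config.
        cfg, err := pgxpool.ParseConfig("postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable&pool_max_conns=2")
        if err != nil {
            log.Fatalf("unable to parse database url: %v", err)
        }
        fmt.Println(cfg.MaxConns) // 2
    }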
10 changes: 5 additions & 5 deletions backends/postgres/postgres_backend.go
@@ -133,7 +133,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
        return
    }

-   if p.pool == nil {
+   if p.pool == nil { //nolint: nestif
        var poolConfig *pgxpool.Config
        poolConfig, err = pgxpool.ParseConfig(p.config.ConnectionString)
        if err != nil || p.config.ConnectionString == "" {
@@ -419,9 +419,9 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (job
    }

    p.logger.Debug("adding job to the queue")
-   err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline)
-       VALUES ($1, $2, $3, $4, $5) RETURNING id`,
-       j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline).Scan(&jobID)
+   err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
+       VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`,
+       j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
    if err != nil {
        err = fmt.Errorf("unable add job to queue: %w", err)
        return
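Note: with max_retries now included in the INSERT, whatever a caller sets on jobs.Job.MaxRetries is persisted at enqueue time instead of being silently dropped. A hedged sketch of constructing such a job (queue name and payload are illustrative; enqueueing itself goes through the backend's Enqueue, as in the test further below):

    package main

    import (
        "fmt"
        "time"

        "github.com/acaloiaro/neoq/jobs"
    )

    func main() {
        maxRetries := 3
        deadline := time.Now().UTC().Add(time.Minute)

        // MaxRetries is a *int (see jobs/jobs.go below), so take the address of a local.
        j := &jobs.Job{
            Queue:      "example_queue",
            Payload:    map[string]interface{}{"message": "hello world"},
            Deadline:   &deadline,
            MaxRetries: &maxRetries,
        }
        fmt.Printf("enqueue-ready job: queue=%s max_retries=%d\n", j.Queue, *j.MaxRetries)
    }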
@@ -480,7 +480,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
        return fmt.Errorf("error getting tx from context: %w", err)
    }

-   if job.Retries >= job.MaxRetries {
+   if job.MaxRetries != nil && job.Retries >= *job.MaxRetries {
        err = p.moveToDeadQueue(ctx, tx, job, jobErr)
        return
    }
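Note: because MaxRetries is now a pointer, this guard only dead-letters a job when a cap was explicitly set; a nil MaxRetries means this branch never sends the job to the dead queue. A minimal sketch of the same check in isolation (the function name is made up for illustration):

    package main

    import "fmt"

    // shouldDeadLetter mirrors the guard added in updateJob: the retry cap is
    // only enforced when MaxRetries was explicitly set.
    func shouldDeadLetter(retries int, maxRetries *int) bool {
        return maxRetries != nil && retries >= *maxRetries
    }

    func main() {
        limit := 5
        fmt.Println(shouldDeadLetter(5, &limit)) // true: cap reached
        fmt.Println(shouldDeadLetter(5, nil))    // false: no cap configured
    }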
44 changes: 39 additions & 5 deletions backends/postgres/postgres_backend_test.go
@@ -18,10 +18,13 @@ import (
    "github.com/acaloiaro/neoq/jobs"
    "github.com/acaloiaro/neoq/logging"
    "github.com/acaloiaro/neoq/testutils"
-   "github.com/jackc/pgx/v5"
+   "github.com/jackc/pgx/v5/pgxpool"
 )

-var errPeriodicTimeout = errors.New("timed out waiting for periodic job")
+var (
+   errPeriodicTimeout = errors.New("timed out waiting for periodic job")
+   conn               *pgxpool.Pool
+)

 func flushDB() {
    ctx := context.Background()
@@ -30,7 +33,16 @@ func flushDB() {
        return
    }

-   conn, err := pgx.Connect(context.Background(), dbURL)
+   var err error
+   var poolConfig *pgxpool.Config
+   poolConfig, err = pgxpool.ParseConfig(dbURL)
+   if err != nil {
+       fmt.Fprintf(os.Stderr, "unable to parse database url: '%s': %v", dbURL, err)
+       return
+   }
+   poolConfig.MaxConns = 2
+
+   conn, err = pgxpool.NewWithConfig(ctx, poolConfig)
    if err != nil {
        // an error was encountered connecting to the db. this may simply mean that we're running tests against an
        // uninitialized database and the database needs to be created. By creating a new pg backend instance with
@@ -43,7 +55,6 @@ func flushDB() {
            return
        }
    }
-   defer conn.Close(context.Background())

    _, err = conn.Query(context.Background(), "DELETE FROM neoq_jobs") // nolint: gocritic
    if err != nil {
@@ -61,6 +72,7 @@ func TestMain(m *testing.M) {
 // most basic configuration.
 func TestBasicJobProcessing(t *testing.T) {
    const queue = "testing"
+   maxRetries := 5
    done := make(chan bool)
    defer close(done)

@@ -95,7 +107,8 @@ func TestBasicJobProcessing(t *testing.T) {
        Payload: map[string]interface{}{
            "message": "hello world",
        },
-       Deadline: &deadline,
+       Deadline:   &deadline,
+       MaxRetries: &maxRetries,
    })
    if e != nil || jid == jobs.DuplicateJobID {
        t.Error(e)
@@ -106,11 +119,32 @@ func TestBasicJobProcessing(t *testing.T) {
        err = jobs.ErrJobTimeout
    case <-done:
    }
    if err != nil {
        t.Error(err)
    }

+   // ensure job has fields set correctly
+   var jdl time.Time
+   var jmxrt int
+
+   err = conn.
+       QueryRow(context.Background(), "SELECT deadline,max_retries FROM neoq_jobs WHERE id = $1", jid).
+       Scan(&jdl, &jmxrt)
+   if err != nil {
+       t.Error(err)
+   }
+
+   jdl = jdl.In(time.UTC)
+   // dates from postgres come out with only 6 decimal places of millisecond precision, naively format dates as
+   // strings for comparison reasons. Ref https://www.postgresql.org/docs/current/datatype-datetime.html
+   if jdl.Format(time.RFC3339) != deadline.Format(time.RFC3339) {
+       t.Error(fmt.Errorf("job deadline does not match its expected value: %v != %v", jdl, deadline)) // nolint: goerr113
+   }
+
+   if jmxrt != maxRetries {
+       t.Error(fmt.Errorf("job MaxRetries does not match its expected value: %v != %v", jmxrt, maxRetries)) // nolint: goerr113
+   }
+
    t.Cleanup(func() {
        flushDB()
    })
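Note: the test compares the round-tripped deadline as RFC3339 strings because Postgres timestamp columns keep at most microsecond precision, while Go's time.Time carries nanoseconds. A small sketch of why a direct equality check would fail (values are illustrative):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Go time.Time has nanosecond precision; Postgres keeps at most microseconds,
        // so a deadline read back from the database loses the low digits.
        want := time.Date(2023, 10, 4, 12, 0, 0, 123456789, time.UTC)
        got := want.Truncate(time.Microsecond) // roughly what Postgres hands back

        fmt.Println(got.Equal(want))                                       // false: nanoseconds dropped
        fmt.Println(got.Format(time.RFC3339) == want.Format(time.RFC3339)) // true: second-level comparison
    }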
2 changes: 1 addition & 1 deletion jobs/jobs.go
@@ -42,7 +42,7 @@ type Job struct {
    RanAt      null.Time   `db:"ran_at"`      // The last time the job ran
    Error      null.String `db:"error"`       // The last error the job elicited
    Retries    int         `db:"retries"`     // The number of times the job has retried
-   MaxRetries int         `db:"max_retries"` // The maximum number of times the job can retry
+   MaxRetries *int        `db:"max_retries"` // The maximum number of times the job can retry
    CreatedAt  time.Time   `db:"created_at"`  // The time the job was created
 }

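Note: switching MaxRetries from int to *int distinguishes "no limit configured" (nil) from an explicit cap, but it also means callers can no longer assign a literal directly. A hedged sketch of updating a call site (the ptr helper is hypothetical, not part of neoq):

    package main

    import "fmt"

    // ptr is a hypothetical helper for taking the address of a literal; neoq
    // itself does not necessarily provide one.
    func ptr[T any](v T) *T { return &v }

    type Job struct { // stand-in for jobs.Job, trimmed to the field that changed
        MaxRetries *int
    }

    func main() {
        // Before this commit: Job{MaxRetries: 3}
        // After: the field needs an addressable value.
        j := Job{MaxRetries: ptr(3)}
        fmt.Println(*j.MaxRetries) // 3
    }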
