diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index ae1f325..26ffdc2 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -20,6 +20,7 @@ jobs:
       postgres:
         image: postgres:14-alpine
         env:
+          TEST_DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres?sslmode=disable&pool_max_conns=2
           POSTGRES_PASSWORD: postgres
         options: >-
           --health-cmd pg_isready
@@ -41,7 +42,6 @@ jobs:
       - name: test
         run: make mod test coverage
         env:
-          TEST_DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres?sslmode=disable
           TEST_REDIS_URL: redis:6379
       - name: upload results
         uses: actions/upload-artifact@v3
diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go
index 7889ea5..f582c6e 100644
--- a/backends/postgres/postgres_backend.go
+++ b/backends/postgres/postgres_backend.go
@@ -133,7 +133,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
         return
     }
 
-    if p.pool == nil {
+    if p.pool == nil { //nolint: nestif
         var poolConfig *pgxpool.Config
         poolConfig, err = pgxpool.ParseConfig(p.config.ConnectionString)
         if err != nil || p.config.ConnectionString == "" {
@@ -419,9 +419,9 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (job
     }
 
     p.logger.Debug("adding job to the queue")
-    err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline)
-        VALUES ($1, $2, $3, $4, $5) RETURNING id`,
-        j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline).Scan(&jobID)
+    err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
+        VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`,
+        j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
     if err != nil {
         err = fmt.Errorf("unable add job to queue: %w", err)
         return
@@ -480,7 +480,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
         return fmt.Errorf("error getting tx from context: %w", err)
     }
 
-    if job.Retries >= job.MaxRetries {
+    if job.MaxRetries != nil && job.Retries >= *job.MaxRetries {
         err = p.moveToDeadQueue(ctx, tx, job, jobErr)
         return
     }
diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go
index 5412617..17692e2 100644
--- a/backends/postgres/postgres_backend_test.go
+++ b/backends/postgres/postgres_backend_test.go
@@ -18,10 +18,13 @@ import (
     "github.com/acaloiaro/neoq/jobs"
     "github.com/acaloiaro/neoq/logging"
     "github.com/acaloiaro/neoq/testutils"
-    "github.com/jackc/pgx/v5"
+    "github.com/jackc/pgx/v5/pgxpool"
 )
 
-var errPeriodicTimeout = errors.New("timed out waiting for periodic job")
+var (
+    errPeriodicTimeout = errors.New("timed out waiting for periodic job")
+    conn               *pgxpool.Pool
+)
 
 func flushDB() {
     ctx := context.Background()
@@ -30,7 +33,16 @@ func flushDB() {
         return
     }
 
-    conn, err := pgx.Connect(context.Background(), dbURL)
+    var err error
+    var poolConfig *pgxpool.Config
+    poolConfig, err = pgxpool.ParseConfig(dbURL)
+    if err != nil {
+        fmt.Fprintf(os.Stderr, "unable to parse database url: '%s': %v", dbURL, err)
+        return
+    }
+    poolConfig.MaxConns = 2
+
+    conn, err = pgxpool.NewWithConfig(ctx, poolConfig)
     if err != nil {
         // an error was encountered connecting to the db. this may simply mean that we're running tests against an
         // uninitialized database and the database needs to be created. By creating a new pg backend instance with
@@ -43,7 +55,6 @@ func flushDB() {
             return
         }
     }
-    defer conn.Close(context.Background())
 
     _, err = conn.Query(context.Background(), "DELETE FROM neoq_jobs") // nolint: gocritic
     if err != nil {
@@ -61,6 +72,7 @@ func TestMain(m *testing.M) {
 // most basic configuration.
 func TestBasicJobProcessing(t *testing.T) {
     const queue = "testing"
+    maxRetries := 5
     done := make(chan bool)
     defer close(done)
 
@@ -95,7 +107,8 @@ func TestBasicJobProcessing(t *testing.T) {
         Payload: map[string]interface{}{
             "message": "hello world",
         },
-        Deadline: &deadline,
+        Deadline:   &deadline,
+        MaxRetries: &maxRetries,
     })
     if e != nil || jid == jobs.DuplicateJobID {
         t.Error(e)
@@ -106,11 +119,32 @@ func TestBasicJobProcessing(t *testing.T) {
         err = jobs.ErrJobTimeout
     case <-done:
     }
 
+    if err != nil {
+        t.Error(err)
+    }
+
+    // ensure job has fields set correctly
+    var jdl time.Time
+    var jmxrt int
+    err = conn.
+        QueryRow(context.Background(), "SELECT deadline,max_retries FROM neoq_jobs WHERE id = $1", jid).
+        Scan(&jdl, &jmxrt)
     if err != nil {
         t.Error(err)
     }
 
+    jdl = jdl.In(time.UTC)
+    // dates from postgres come out with only 6 decimal places of millisecond precision, naively format dates as
+    // strings for comparison reasons. Ref https://www.postgresql.org/docs/current/datatype-datetime.html
+    if jdl.Format(time.RFC3339) != deadline.Format(time.RFC3339) {
+        t.Error(fmt.Errorf("job deadline does not match its expected value: %v != %v", jdl, deadline)) // nolint: goerr113
+    }
+
+    if jmxrt != maxRetries {
+        t.Error(fmt.Errorf("job MaxRetries does not match its expected value: %v != %v", jmxrt, maxRetries)) // nolint: goerr113
+    }
+
     t.Cleanup(func() {
         flushDB()
     })
diff --git a/jobs/jobs.go b/jobs/jobs.go
index 6a6c012..5ad628a 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -42,7 +42,7 @@ type Job struct {
     RanAt      null.Time   `db:"ran_at"`      // The last time the job ran
     Error      null.String `db:"error"`       // The last error the job elicited
     Retries    int         `db:"retries"`     // The number of times the job has retried
-    MaxRetries int         `db:"max_retries"` // The maximum number of times the job can retry
+    MaxRetries *int        `db:"max_retries"` // The maximum number of times the job can retry
     CreatedAt  time.Time   `db:"created_at"`  // The time the job was created
 }
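
The net effect of changing `MaxRetries` to `*int` is that a nil value now distinguishes "no explicit limit set on this job" from a limit of zero, and only a non-nil value is compared against `Retries` before `updateJob` moves a job to the dead queue. The sketch below is illustrative only: `retriesExhausted` is a hypothetical helper that mirrors the nil-aware check added in this diff, not a function in neoq, and the job values are modeled on `TestBasicJobProcessing`.

```go
package main

import (
	"fmt"
	"time"

	"github.com/acaloiaro/neoq/jobs"
)

// retriesExhausted is a hypothetical helper mirroring the check added in
// PgBackend.updateJob: only a job with a non-nil MaxRetries can exhaust it.
func retriesExhausted(j *jobs.Job) bool {
	return j.MaxRetries != nil && j.Retries >= *j.MaxRetries
}

func main() {
	maxRetries := 5
	deadline := time.Now().UTC().Add(5 * time.Second)

	// Job with an explicit retry limit, as constructed in TestBasicJobProcessing.
	limited := &jobs.Job{
		Queue:      "testing",
		Payload:    map[string]interface{}{"message": "hello world"},
		Deadline:   &deadline,
		MaxRetries: &maxRetries,
		Retries:    5,
	}

	// Job that never set MaxRetries; nil is now distinct from 0.
	unlimited := &jobs.Job{Queue: "testing"}

	fmt.Println(retriesExhausted(limited))   // true: Retries (5) >= *MaxRetries (5), so it would be dead-queued
	fmt.Println(retriesExhausted(unlimited)) // false: a nil MaxRetries never satisfies this particular check
}
```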
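The test changes also cap connection usage: `flushDB` now builds a `pgxpool.Pool` with `MaxConns = 2`, and the CI `TEST_DATABASE_URL` carries `pool_max_conns=2`, which `pgxpool.ParseConfig` reads directly from the connection string. A minimal sketch of both approaches, assuming a placeholder local DSN:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	// Placeholder DSN for illustration; pool_max_conns is a query parameter
	// recognized by pgxpool.ParseConfig, as in the CI TEST_DATABASE_URL.
	dsn := "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable&pool_max_conns=2"

	cfg, err := pgxpool.ParseConfig(dsn)
	if err != nil {
		log.Fatalf("unable to parse database url: %v", err)
	}
	fmt.Println(cfg.MaxConns) // 2, taken from the pool_max_conns query parameter

	// The same cap can be set programmatically, as flushDB now does.
	cfg.MaxConns = 2

	pool, err := pgxpool.NewWithConfig(context.Background(), cfg)
	if err != nil {
		log.Fatalf("unable to create pool: %v", err)
	}
	defer pool.Close()
}
```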