Skip to content

Commit

Permalink
Bug fixing for postgres and memory backends, polish the unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pconstantinou committed Jan 18, 2024
1 parent 22fe80c commit 68ff6c5
Show file tree
Hide file tree
Showing 12 changed files with 451 additions and 165 deletions.
24 changes: 16 additions & 8 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,23 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ...n
if !options.Override {
return jobs.DuplicateJobID, jobs.ErrJobFingerprintConflict
}
}
m.fingerprints.Store(job.Fingerprint, job)

m.mu.Lock()
m.jobCount++
m.mu.Unlock()
oldJob, found := m.fingerprints.Swap(job.Fingerprint, job)
if found {
// Return the same JobID to make it the same as posgres
job.ID = oldJob.(*jobs.Job).ID
} else {
m.logger.Info("Expected to get job but none was returned for fingerprint %s", job.Fingerprint)
}
jobID = fmt.Sprint(job.ID)

job.ID = m.jobCount
jobID = fmt.Sprint(m.jobCount)
} else {
m.fingerprints.Store(job.Fingerprint, job)
m.mu.Lock()
m.jobCount++
m.mu.Unlock()
job.ID = m.jobCount
jobID = fmt.Sprint(m.jobCount)
}

if job.RunAfter.Equal(now) {
queueChan <- job
Expand Down
8 changes: 3 additions & 5 deletions backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,13 +428,11 @@ result_loop:
}
}

func TestFutureJobSchedulingOverride(t *testing.T) {
func TestSuite(t *testing.T) {
ctx := context.Background()
nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
n, err := neoq.New(ctx, neoq.WithBackend(memory.Backend), neoq.WithLogLevel(logging.LogLevelDebug))
if err != nil {
t.Fatal(err)
}
defer nq.Shutdown(ctx)

backends.TestOverrideFingerprint(t, ctx, nq)
backends.NewNeoQTestSuite(n).Run(t)
}
140 changes: 0 additions & 140 deletions backends/override_fingerprint.go

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
DROP CONSTRAINT neoq_jobs_fingerprint_constraint_idx;
CREATE INDEX IF NOT EXISTS neoq_jobs_fingerprint_idx ON neoq_jobs (fingerprint, status);
CREATE UNIQUE INDEX IF NOT EXISTS neoq_jobs_fingerprint_unique_idx ON neoq_jobs (queue, fingerprint, status) WHERE NOT (status = 'processed');
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
ALTER TABLE neoq_jobs ADD CONSTRAINT neoq_jobs_fingerprint_constraint_idx UNIQUE (queue, fingerprint, status, ran_at);
DROP INDEX IF EXISTS neoq_jobs_fingerprint_idx;
DROP INDEX IF EXISTS neoq_jobs_fingerprint_unique_idx;

CREATE UNIQUE INDEX IF NOT EXISTS neoq_jobs_fingerprint_unique_idx ON neoq_jobs (queue, status, fingerprint, ran_at);
11 changes: 7 additions & 4 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,11 +548,14 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job, opti
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
} else {
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (queue, fingerprint, status, ran_at) DO
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (queue, status, fingerprint, ran_at) DO
UPDATE SET
payload=$3, run_after=$4, deadline=$5, max_retries=$6
RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
if err != nil {
p.logger.Error("error enqueueing override job", slog.Any("error", err))
}
}

if err != nil {
Expand Down Expand Up @@ -849,19 +852,19 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) {
var tx pgx.Tx
conn, err := p.acquire(ctx)
if err != nil {
return
return err
}
defer conn.Release()

tx, err = conn.Begin(ctx)
if err != nil {
return
return err

Check failure on line 861 in backends/postgres/postgres_backend.go

View workflow job for this annotation

GitHub Actions / lint

error returned from external package is unwrapped: sig: func (*github.com/jackc/pgx/v5/pgxpool.Conn).Begin(ctx context.Context) (github.com/jackc/pgx/v5.Tx, error) (wrapcheck)
}
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed

job, err = p.getJob(ctx, tx, jobID)
if err != nil {
return
return err
}

if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) {
Expand Down
11 changes: 7 additions & 4 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,15 +777,18 @@ func Test_ConnectionTimeout(t *testing.T) {
t.Error(err)
}
}

func TestFutureJobSchedulingOverride(t *testing.T) {
func setup(t *testing.T) (neoq.Neoq, context.Context) {

Check failure on line 780 in backends/postgres/postgres_backend_test.go

View workflow job for this annotation

GitHub Actions / lint

test helper function should start from t.Helper() (thelper)
connString, _ := prepareAndCleanupDB(t)

ctx := context.TODO()
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
if err != nil {
t.Fatal(err)
}
defer nq.Shutdown(ctx)
backends.TestOverrideFingerprint(t, ctx, nq)
return nq, ctx
}

func TestSuite(t *testing.T) {
n, _ := setup(t)
backends.NewNeoQTestSuite(n).Run(t)
}
6 changes: 3 additions & 3 deletions backends/redis/redis_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ result_loop:
}
}

func TestFutureJobSchedulingOverride(t *testing.T) {
func TestSuite(t *testing.T) {
connString := os.Getenv("TEST_REDIS_URL")
if connString == "" {
t.Skip("Skipping: TEST_REDIS_URL not set")
Expand All @@ -461,6 +461,6 @@ func TestFutureJobSchedulingOverride(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer nq.Shutdown(ctx)
backends.TestOverrideFingerprint(t, ctx, nq)

backends.NewNeoQTestSuite(nq).Run(t)
}
30 changes: 30 additions & 0 deletions backends/suite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package backends

import (
"context"
"testing"

"github.com/acaloiaro/neoq"
"github.com/stretchr/testify/suite"
)

type NeoQTestSuite struct {
suite.Suite
NeoQ neoq.Neoq
}

// NewNeoQTestSuite constructs a new NeoQ test suite that can be used to test
// any impementation of the queue
func NewNeoQTestSuite(q neoq.Neoq) *NeoQTestSuite {
n := new(NeoQTestSuite)
n.NeoQ = q
return n
}

func (s *NeoQTestSuite) Run(t *testing.T) {

Check failure on line 24 in backends/suite.go

View workflow job for this annotation

GitHub Actions / lint

test helper function should start from t.Helper() (thelper)
suite.Run(t, s)
}

func (s *NeoQTestSuite) TearDownSuite() {
s.NeoQ.Shutdown(context.Background())
}
Loading

0 comments on commit 68ff6c5

Please sign in to comment.