Skip to content

Commit

Permalink
fix(pg): Fixed error logs for multi node cron jobs. (#94)
Browse files Browse the repository at this point in the history
Resolves #76
  • Loading branch information
elliotcourant authored Oct 6, 2023
1 parent 0db1180 commit 42da4d8
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 82 deletions.
12 changes: 11 additions & 1 deletion backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ func txFromContext(ctx context.Context) (t pgx.Tx, err error) {

// initializeDB initializes the tables, types, and indices necessary to operate Neoq
//
// This will consume the migration files embedded at build time and will connect to the DB using its own tooling and
// perform the migrations. After which it will close its DB connections since they will not be needed after
// initialization.
//
//nolint:funlen,gocyclo,cyclop
func (p *PgBackend) initializeDB() (err error) {
migrations, err := iofs.New(migrationsFS, "migrations")
Expand Down Expand Up @@ -256,6 +260,8 @@ func (p *PgBackend) initializeDB() (err error) {
p.logger.Error("unable to run migrations", "error", err)
return
}
// We don't need the migration tooling to hold its connections to the DB once it has been completed.
defer m.Close()

err = m.Up()
if err != nil && !errors.Is(err, migrate.ErrNoChange) {
Expand Down Expand Up @@ -369,7 +375,11 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha
if err = p.cron.AddFunc(cronSpec, func() {
_, err := p.Enqueue(ctx, &jobs.Job{Queue: queue})
if err != nil {
if errors.Is(err, context.Canceled) {
// When we are working with a cron we want to ignore the canceled and the duplicate job errors. The duplicate job
// error specifically is not one the cron enqueuer needs to concern itself with because that means that another
// worker has already enqueued the job for this cron recurrence. It is not helpful to log the error in that
// scenario since the job will be processed.
if errors.Is(err, context.Canceled) || errors.Is(err, ErrDuplicateJob) {
return
}

Expand Down
182 changes: 101 additions & 81 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,69 +21,75 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
)

const (
ConcurrentWorkers = 8
)

var (
errPeriodicTimeout = errors.New("timed out waiting for periodic job")
conn *pgxpool.Pool
)

func flushDB() {
// prepareAndCleanupDB should be run at the beginning of each test. It will check to see if the TEST_DATABASE_URL is
// present and has a valid connection string. If it does it will connect to the DB and clean up any jobs that might be
// lingering in the jobs table if that table exists. It will then return the connection string it found. If the
// connection string is not present then it will cause the current test to skip automatically. If the connection string
// is invalid or it cannot connect to the DB it will fail the current test.
func prepareAndCleanupDB(t *testing.T) (dbURL string, conn *pgxpool.Pool) {
t.Helper()
ctx := context.Background()
dbURL := os.Getenv("TEST_DATABASE_URL")
dbURL = os.Getenv("TEST_DATABASE_URL")
if dbURL == "" {
return
t.Skip("TEST_DATABASE_URL environment variable is missing, test requires a PostgreSQL database to continue")
return "", nil
}

var err error
var poolConfig *pgxpool.Config
poolConfig, err = pgxpool.ParseConfig(dbURL)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to parse database url: '%s': %v", dbURL, err)
return
t.Fatalf("unable to parse database url: '%s': %+v", dbURL, err)
return dbURL, nil
}
poolConfig.MaxConns = 2

conn, err = pgxpool.NewWithConfig(ctx, poolConfig)
if err != nil {
// an error was encountered connecting to the db. this may simply mean that we're running tests against an
// uninitialized database and the database needs to be created. By creating a new pg backend instance with
// neoq.New, we run the db initialization process.
// if no errors return from `New`, then we've succeeded
var newErr error
_, newErr = neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(dbURL))
if newErr != nil {
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
return
}
t.Fatalf("failed to connect to the database in TEST_DATABASE_URL: %+v", err)
return dbURL, nil
}

_, err = conn.Query(context.Background(), "DELETE FROM neoq_jobs") // nolint: gocritic
if err != nil {
fmt.Fprintf(os.Stderr, "'neoq_jobs' table flush failed: %v\n", err)
}
// Delete everything in the neoq_jobs table if it exists
// We don't _need_ to concern ourselves with an error here because the only way this query would fail is if the table
// does not exist. Which is fine because anything within these tests would simply create that table immediately upon
// starting.
_, _ = conn.Exec(context.Background(), "DELETE FROM neoq_jobs") // nolint: gocritic

// Since this is running at the beginning of each test, make sure that when the test is finished we clean up anything
// we allocated here.
t.Cleanup(func() {
conn.Close()
})

// Return the connection URL and the pool so that the calling test can use them.
return dbURL, conn
}

// TestMain is the entry point for this package's tests. It calls flushDB once before any test runs, executes the
// test suite and then exits the process with the status code reported by the suite.
func TestMain(m *testing.M) {
flushDB()
code := m.Run()
// os.Exit terminates immediately, so deferred functions would not run; the code is captured first for that reason.
os.Exit(code)
}

// TestBasicJobProcessing tests that the postgres backend is able to process the most basic jobs with the
// most basic configuration.
func TestBasicJobProcessing(t *testing.T) {
connString, conn := prepareAndCleanupDB(t)
const queue = "testing"
maxRetries := 5
done := make(chan bool)
defer close(done)

timeoutTimer := time.After(5 * time.Second)

connString := os.Getenv("TEST_DATABASE_URL")
if connString == "" {
t.Skip("Skipping: TEST_DATABASE_URL not set")
return
}

ctx := context.Background()
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
if err != nil {
Expand Down Expand Up @@ -144,32 +150,19 @@ func TestBasicJobProcessing(t *testing.T) {
if jmxrt != maxRetries {
t.Error(fmt.Errorf("job MaxRetries does not match its expected value: %v != %v", jmxrt, maxRetries)) // nolint: goerr113
}

t.Cleanup(func() {
flushDB()
})
}

func TestMultipleProcessors(t *testing.T) {
const queue = "testing"

connString := os.Getenv("TEST_DATABASE_URL")
if connString == "" {
t.Skip("Skipping: TEST_DATABASE_URL not set")
return
}

t.Cleanup(func() {
flushDB()
})
connString, _ := prepareAndCleanupDB(t)

var execCount uint32
var wg sync.WaitGroup
count := 4
neos := make([]neoq.Neoq, 0, count)
neos := make([]neoq.Neoq, 0, ConcurrentWorkers)
// Create several neoq processors such that we can enqueue several jobs and have them consumed by multiple different
// workers. We want to make sure that a job is not processed twice in a pool of many different neoq workers.
for i := 0; i < count; i++ {
for i := 0; i < ConcurrentWorkers; i++ {
ctx := context.Background()
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
if err != nil {
Expand Down Expand Up @@ -200,7 +193,7 @@ func TestMultipleProcessors(t *testing.T) {

// From one of the neoq clients, enqueue several jobs. At least one per processor registered above.
nq := neos[0]
for i := 0; i < count; i++ {
for i := 0; i < ConcurrentWorkers; i++ {
wg.Add(1)
ctx := context.Background()
deadline := time.Now().UTC().Add(10 * time.Second)
Expand All @@ -220,20 +213,16 @@ func TestMultipleProcessors(t *testing.T) {
wg.Wait()

// Make sure that we executed the expected number of jobs.
if execCount != uint32(count) {
t.Fatalf("mismatch number of executions. Expected: %d Found: %d", count, execCount)
if execCount != uint32(ConcurrentWorkers) {
t.Fatalf("mismatch number of executions. Expected: %d Found: %d", ConcurrentWorkers, execCount)
}
}

// TestDuplicateJobRejection tests that the backend rejects jobs that are duplicates
func TestDuplicateJobRejection(t *testing.T) {
const queue = "testing"

connString := os.Getenv("TEST_DATABASE_URL")
if connString == "" {
t.Skip("Skipping: TEST_DATABASE_URL not set")
return
}
connString, _ := prepareAndCleanupDB(t)

ctx := context.TODO()
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
Expand Down Expand Up @@ -267,10 +256,6 @@ func TestDuplicateJobRejection(t *testing.T) {
if err != nil {
t.Error(err)
}

t.Cleanup(func() {
flushDB()
})
}

// TestBasicJobMultipleQueue tests that the postgres backend is able to process jobs on multiple queues
Expand All @@ -282,11 +267,7 @@ func TestBasicJobMultipleQueue(t *testing.T) {

timeoutTimer := time.After(5 * time.Second)

connString := os.Getenv("TEST_DATABASE_URL")
if connString == "" {
t.Skip("Skipping: TEST_DATABASE_URL not set")
return
}
connString, _ := prepareAndCleanupDB(t)

ctx := context.TODO()
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
Expand Down Expand Up @@ -354,21 +335,13 @@ results_loop:
if err != nil {
t.Error(err)
}

t.Cleanup(func() {
flushDB()
})
}

func TestCron(t *testing.T) {
done := make(chan bool, 1)
defer close(done)
const cron = "* * * * * *"
connString := os.Getenv("TEST_DATABASE_URL")
if connString == "" {
t.Skip("Skipping: TEST_DATABASE_URL not set")
return
}
connString, _ := prepareAndCleanupDB(t)

ctx := context.TODO()
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
Expand Down Expand Up @@ -404,22 +377,73 @@ func TestCron(t *testing.T) {
if err != nil {
t.Error(err)
}
}

t.Cleanup(func() {
flushDB()
})
// TestMultipleCronNodes starts ConcurrentWorkers independent neoq instances that all register the same cron spec and
// verifies that (a) at least one cron job gets processed and (b) no job ID is ever handled by more than one worker.
func TestMultipleCronNodes(t *testing.T) {
	const cron = "* * * * * *"
	// Allow time for the listeners to start and for at least one job to process.
	const waitForJobTime = 1100 * time.Millisecond

	connString, _ := prepareAndCleanupDB(t)

	// jobsProcessed records every job ID a handler has seen so a second delivery of the same job can be detected.
	var jobsProcessed sync.Map
	var jobsCompleted uint32
	var duplicateJobs uint32

	for i := 0; i < ConcurrentWorkers; i++ {
		ctx := context.TODO()
		nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
		if err != nil {
			t.Fatal(err)
		}
		t.Cleanup(func() {
			nq.Shutdown(ctx)
		})

		// The handler runs on worker goroutines, not on the test goroutine. t.Fatalf/t.FailNow must only be called
		// from the goroutine running the test, so failures are reported with t.Errorf here and the final verdict is
		// made on the test goroutine below.
		h := handler.NewPeriodic(func(ctx context.Context) error {
			job, err := jobs.FromContext(ctx)
			if err != nil {
				t.Errorf("failed to extract job details from context: %+v", err)
				return nil
			}
			if _, exists := jobsProcessed.LoadOrStore(job.ID, "foo"); exists {
				t.Errorf("job (%d) has already been processed by another worker!", job.ID)
				atomic.AddUint32(&duplicateJobs, 1)
				return nil
			}
			atomic.AddUint32(&jobsCompleted, 1)
			return nil
		})

		h.WithOptions(
			handler.JobTimeout(500*time.Millisecond),
			handler.Concurrency(1),
		)

		if err = nq.StartCron(ctx, cron, h); err != nil {
			t.Error(err)
		}
	}

	time.Sleep(waitForJobTime)

	// The counters are written concurrently by the handlers, so they must be read atomically as well.
	if atomic.LoadUint32(&jobsCompleted) == 0 {
		t.Fatalf("no jobs were completed after %v", waitForJobTime)
	}

	if atomic.LoadUint32(&duplicateJobs) > 0 {
		t.Fatalf("some jobs were processed more than once")
	}
}

// TestBasicJobProcessingWithErrors tests that the postgres backend is able to update the status of jobs that fail
func TestBasicJobProcessingWithErrors(t *testing.T) {
const queue = "testing"
logsChan := make(chan string, 100)
timeoutTimer := time.After(5 * time.Second)
connString := os.Getenv("TEST_DATABASE_URL")
if connString == "" {
t.Skip("Skipping: TEST_DATABASE_URL not set")
return
}
connString, _ := prepareAndCleanupDB(t)

ctx := context.TODO()
nq, err := neoq.New(ctx,
Expand Down Expand Up @@ -472,8 +496,4 @@ results_loop:
if err != nil {
t.Error(err)
}

t.Cleanup(func() {
flushDB()
})
}

0 comments on commit 42da4d8

Please sign in to comment.