diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index 45c69b5..5412617 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -6,6 +6,8 @@ import ( "fmt" "os" "strings" + "sync" + "sync/atomic" "testing" "time" @@ -114,6 +116,81 @@ func TestBasicJobProcessing(t *testing.T) { }) } +func TestMultipleProcessors(t *testing.T) { + const queue = "testing" + + connString := os.Getenv("TEST_DATABASE_URL") + if connString == "" { + t.Skip("Skipping: TEST_DATABASE_URL not set") + return + } + + t.Cleanup(func() { + flushDB() + }) + + var execCount uint32 + var wg sync.WaitGroup + count := 8 + neos := make([]neoq.Neoq, 0, count) + // Create several neoq processors such that we can enqueue several jobs and have them consumed by multiple different + // workers. We want to make sure that a job is not processed twice in a pool of many different neoq workers. + for i := 0; i < count; i++ { + ctx := context.Background() + nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString)) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + nq.Shutdown(ctx) + }) + + h := handler.New(queue, func(_ context.Context) (err error) { + // Make sure that by wasting some time working on a thing we don't consume two jobs back to back. + // This should give the other neoq clients enough time to grab a job as well. + time.Sleep(500 * time.Millisecond) + atomic.AddUint32(&execCount, 1) + wg.Done() + return + }) + // Make sure that each neoq worker only works on one thing at a time. + h.Concurrency = 1 + + err = nq.Start(ctx, h) + if err != nil { + t.Error(err) + } + + neos = append(neos, nq) + } + + // From one of the neoq clients, enqueue several jobs. At least one per processor registered above. + nq := neos[0] + for i := 0; i < count; i++ { + wg.Add(1) + ctx := context.Background() + deadline := time.Now().UTC().Add(10 * time.Second) + jid, e := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": fmt.Sprintf("hello world: %d", i), + }, + Deadline: &deadline, + }) + if e != nil || jid == jobs.DuplicateJobID { + t.Error(e) + } + } + + // Wait for all jobs to complete. + wg.Wait() + + // Make sure that we executed the expected number of jobs. + if execCount != uint32(count) { + t.Fatalf("mismatch number of executions. Expected: %d Found: %d", count, execCount) + } +} + // TestDuplicateJobRejection tests that the backend rejects jobs that are duplicates func TestDuplicateJobRejection(t *testing.T) { const queue = "testing"