chore(test): Adding a test for multiple consumers with the pg backend.
This test provides a minimal proof that jobs can be consumed by
multiple workers and that each job is consumed exactly once. If
execCount does not match the expected count, the test fails because
either too many jobs were executed (e.g. one job executed twice) or
a job was dropped when it should not have been.
elliotcourant authored and acaloiaro committed Sep 28, 2023
1 parent a17ea73 commit 52e118e
Showing 1 changed file with 77 additions and 0 deletions.
backends/postgres/postgres_backend_test.go (77 additions, 0 deletions)
@@ -6,6 +6,8 @@ import (
	"fmt"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

@@ -114,6 +116,81 @@ func TestBasicJobProcessing(t *testing.T) {
	})
}

func TestMultipleProcessors(t *testing.T) {
	const queue = "testing"

	connString := os.Getenv("TEST_DATABASE_URL")
	if connString == "" {
		t.Skip("Skipping: TEST_DATABASE_URL not set")
		return
	}

	t.Cleanup(func() {
		flushDB()
	})

	var execCount uint32
	var wg sync.WaitGroup
	count := 8
	neos := make([]neoq.Neoq, 0, count)
	// Create several neoq processors such that we can enqueue several jobs and have them consumed by multiple different
	// workers. We want to make sure that a job is not processed twice in a pool of many different neoq workers.
	for i := 0; i < count; i++ {
		ctx := context.Background()
		nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
		if err != nil {
			t.Fatal(err)
		}
		t.Cleanup(func() {
			nq.Shutdown(ctx)
		})

		h := handler.New(queue, func(_ context.Context) (err error) {
			// Make sure that by wasting some time working on a thing we don't consume two jobs back to back.
			// This should give the other neoq clients enough time to grab a job as well.
			time.Sleep(500 * time.Millisecond)
			atomic.AddUint32(&execCount, 1)
			wg.Done()
			return
		})
		// Make sure that each neoq worker only works on one thing at a time.
		h.Concurrency = 1

		err = nq.Start(ctx, h)
		if err != nil {
			t.Error(err)
		}

		neos = append(neos, nq)
	}

	// From one of the neoq clients, enqueue several jobs. At least one per processor registered above.
	nq := neos[0]
	for i := 0; i < count; i++ {
		wg.Add(1)
		ctx := context.Background()
		deadline := time.Now().UTC().Add(10 * time.Second)
		jid, e := nq.Enqueue(ctx, &jobs.Job{
			Queue: queue,
			Payload: map[string]interface{}{
				"message": fmt.Sprintf("hello world: %d", i),
			},
			Deadline: &deadline,
		})
		if e != nil || jid == jobs.DuplicateJobID {
			t.Error(e)
		}
	}

	// Wait for all jobs to complete.
	wg.Wait()
	// Make sure that we executed the expected number of jobs.
	if execCount != uint32(count) {
		t.Fatalf("mismatch number of executions. Expected: %d Found: %d", count, execCount)
	}
}

// TestDuplicateJobRejection tests that the backend rejects jobs that are duplicates
func TestDuplicateJobRejection(t *testing.T) {
	const queue = "testing"

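To run this test locally against a Postgres instance, something along these lines should work; the TEST_DATABASE_URL variable, package path, and test name come from the diff above, while the connection string value is only illustrative and assumes a local database:

TEST_DATABASE_URL='postgres://postgres:postgres@localhost:5432/neoq?sslmode=disable' go test -v -run TestMultipleProcessors ./backends/postgres/...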