Skip to content

Commit

Permalink
Add redis support (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro authored Apr 15, 2023
1 parent 219ea84 commit aaed37e
Show file tree
Hide file tree
Showing 22 changed files with 933 additions and 81 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ jobs:
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis:
image: redis
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
container: golang:1.20

Expand All @@ -29,6 +36,7 @@ jobs:
run: make mod test coverage
env:
TEST_DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres
TEST_REDIS_URL: redis:6379
- name: upload results
uses: actions/upload-artifact@v3
with:
Expand Down
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,11 @@ vendor/
# Some tests require environment variables to be set
.env

# it's handy to have a docker-compose available during development
docker-compose.yml

.pre-commit-config.yaml

dist/


45 changes: 38 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,24 @@ Background job processing for Go

# About

Neoq is a background job framework for Go applications. Its purpose is to minimize the infrastructure necessary to run production applications. It does so by implementing queue durability with modular backends.
Neoq is a queue-agnostic background job framework for Go.

This allows application to use the same type of data store for both application data and backround job processing. At the moment an in-memory and Postgres backends are provided. However, the goal is to have backends for every major datastore: Postgres, Redis, MySQL, etc.
Neoq job handlers are the same, whether queues are in-memory for development/testing, or Postgres, Redis, or a custom queue for production -- allowing queue infrastructure to change without code change.

Neoq does not aim to be the _fastest_ background job processor. It aims to be _fast_, _reliable_, and demand a minimal infrastructure footprint.
Developing/testing or don't need a durable queue? Use the in-memory queue.

Running an application in production? Use Postgres.

Have higher throughput demands in production? Use Redis.

Neoq does not aim to be the _fastest_ background job processor. It aims to be _fast_, _reliable_, and demand a _minimal infrastructure footprint_.

# What it does

- **Background job Processing**: Neoq has an in-memory and Postgres backend out of the box. Users may supply their own without changing neoq directly.
- **Multiple Backends**: In-memory, Postgres, Redis, or user-supplied custom backends.
- **Retries**: Jobs may be retried a configurable number of times with exponential backoff and jitter to prevent thundering herds
- **Job uniqueness**: jobs are fingerprinted based on their payload and status to prevent job duplication (multiple unprocessed jobs with the same payload cannot be queued)
- **Deadlines**: Queue handlers can be configured with per-job time deadlines with millisecond accuracy
- **Configurable transaction idle time**: Don't let your background worker transactions run away with db resources. By default, worker transactions may idle for 60 seconds.
- **Job uniqueness**: jobs are fingerprinted based on their payload and status to prevent job duplication (multiple jobs with the same payload are not re-queued)
- **Job Timeouts**: Queue handlers can be configured with per-job timeouts with millisecond accuracy
- **Periodic Jobs**: Jobs can be scheduled periodically using standard cron syntax
- **Future Jobs**: Jobs can be scheduled either for the future or immediate execution
- **Concurrency**: Concurrency is configurable for every queue
Expand Down Expand Up @@ -70,6 +75,32 @@ nq.Enqueue(ctx, &jobs.Job{
})
```

## Redis

**Example**: Process jobs on the "hello_world" queue and add a job to it using the redis backend

```go
ctx := context.Background()
nq, _ := neoq.New(ctx,
neoq.WithBackend(redis.Backend),
redis.WithAddr("localhost:6379"),
redis.WithPassword(""),
)

nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
j, _ := jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
return
}))

nq.Enqueue(ctx, &jobs.Job{
Queue: "hello_world",
Payload: map[string]interface{}{
"message": "hello world",
},
})
```

## Postgres

**Example**: Process jobs on the "hello_world" queue and add a job to it using the postgres backend
Expand Down
4 changes: 2 additions & 2 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend,
}

// Enqueue queues jobs to be executed asynchronously
func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, err error) {
func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) {
var queueChan chan *jobs.Job
var qc any
var ok bool
Expand Down Expand Up @@ -107,7 +107,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, e
m.mu.Unlock()

job.ID = m.jobCount
jobID = m.jobCount
jobID = fmt.Sprint(m.jobCount)

if job.RunAfter.Equal(now) {
queueChan <- job
Expand Down
6 changes: 4 additions & 2 deletions backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -186,8 +187,9 @@ func TestFutureJobScheduling(t *testing.T) {
t.Error(err)
}

jobID, _ := strconv.ParseInt(jid, 0, 64)
var ok bool
if _, ok = testFutureJobs.Load(jid); !ok {
if _, ok = testFutureJobs.Load(jobID); !ok {
t.Error(err)
}
}
Expand All @@ -208,7 +210,7 @@ func TestCron(t *testing.T) {
})

h.WithOptions(
handler.Deadline(500*time.Millisecond),
handler.JobTimeout(500*time.Millisecond),
handler.Concurrency(1),
)

Expand Down
60 changes: 27 additions & 33 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"os"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -55,11 +54,11 @@ type contextKey struct{}

var (
txCtxVarKey contextKey
shutdownJobID int64 = -1 // job ID announced when triggering a shutdown
shutdownAnnouncementAllowance = 100 // ms
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
ErrDuplicateJobID = errors.New("duplicate job id")
ErrNoTransactionInContext = errors.New("context does not have a Tx set")
shutdownJobID = "-1" // job ID announced when triggering a shutdown
shutdownAnnouncementAllowance = 100 // ms
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
ErrDuplicateJobID = errors.New("duplicate job id")
ErrNoTransactionInContext = errors.New("context does not have a Tx set")
)

// PgBackend is a Postgres-based Neoq backend
Expand All @@ -70,7 +69,7 @@ type PgBackend struct {
cron *cron.Cron
mu *sync.Mutex // mutex to protect mutating state on a pgWorker
pool *pgxpool.Pool
futureJobs map[int64]time.Time // map of future job IDs to their due time
futureJobs map[string]time.Time // map of future job IDs to their due time
handlers map[string]handler.Handler // a map of queue names to queue handlers
cancelFuncs []context.CancelFunc // A collection of cancel functions to be called upon Shutdown()
}
Expand Down Expand Up @@ -104,7 +103,7 @@ func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err
mu: &sync.Mutex{},
config: config.New(),
handlers: make(map[string]handler.Handler),
futureJobs: make(map[int64]time.Time),
futureJobs: make(map[string]time.Time),
logger: slog.New(slog.NewTextHandler(os.Stdout)),
cron: cron.New(),
cancelFuncs: []context.CancelFunc{},
Expand Down Expand Up @@ -321,7 +320,7 @@ func (p *PgBackend) initializeDB(ctx context.Context) (err error) {
}

// Enqueue adds jobs to the specified queue
func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, err error) {
func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) {
ctx, cancel := context.WithCancel(ctx)
p.mu.Lock()
p.cancelFuncs = append(p.cancelFuncs, cancel)
Expand Down Expand Up @@ -359,14 +358,15 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, er
if err != nil {
err = fmt.Errorf("error enqueuing job: %w", err)
}

if jobID == jobs.DuplicateJobID {
err = ErrDuplicateJobID
return
}

// notify listeners that a new job has arrived if it's not a future job
if job.RunAfter.Equal(now) {
_, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%d'", job.Queue, jobID))
_, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%s'", job.Queue, jobID))
if err != nil {
err = fmt.Errorf("error executing transaction: %w", err)
}
Expand Down Expand Up @@ -465,7 +465,7 @@ func (p *PgBackend) Shutdown(ctx context.Context) {
//
// Jobs that are not already fingerprinted are fingerprinted before being added
// Duplicate jobs are not added to the queue. Any two unprocessed jobs with the same fingerprint are duplicates
func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (jobID int64, err error) {
func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (jobID string, err error) {
err = jobs.FingerprintJob(j)
if err != nil {
return
Expand Down Expand Up @@ -569,7 +569,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {

if time.Until(runAfter) > 0 {
p.mu.Lock()
p.futureJobs[job.ID] = runAfter
p.futureJobs[fmt.Sprint(job.ID)] = runAfter
p.mu.Unlock()
}

Expand Down Expand Up @@ -600,7 +600,7 @@ func (p *PgBackend) start(ctx context.Context, queue string) (err error) {
for i := 0; i < h.Concurrency; i++ {
go func() {
var err error
var jobID int64
var jobID string

for {
select {
Expand Down Expand Up @@ -630,7 +630,7 @@ func (p *PgBackend) start(ctx context.Context, queue string) (err error) {
}

// removeFutureJob removes a future job from the in-memory list of jobs that will execute in the future
func (p *PgBackend) removeFutureJob(jobID int64) {
func (p *PgBackend) removeFutureJob(jobID string) {
if _, ok := p.futureJobs[jobID]; ok {
p.mu.Lock()
delete(p.futureJobs, jobID)
Expand All @@ -647,7 +647,7 @@ func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error
return
}

var id int64
var id string
var runAfter time.Time
_, err = pgx.ForEachRow(rows, []any{&id, &runAfter}, func() error {
p.mu.Lock()
Expand Down Expand Up @@ -675,7 +675,7 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
timeUntillRunAfter := time.Until(runAfter)
if timeUntillRunAfter <= p.config.FutureJobWindow {
p.removeFutureJob(jobID)
go func(jid int64) {
go func(jid string) {
scheduleCh := time.After(timeUntillRunAfter)
<-scheduleCh
p.announceJob(ctx, queue, jid)
Expand All @@ -695,7 +695,7 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
// announceJob announces jobs to queue listeners.
//
// Announced jobs are executed by the first worker to respond to the announcement.
func (p *PgBackend) announceJob(ctx context.Context, queue string, jobID int64) {
func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {
conn, err := p.pool.Acquire(ctx)
if err != nil {
return
Expand All @@ -712,7 +712,7 @@ func (p *PgBackend) announceJob(ctx context.Context, queue string, jobID int64)
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx)

// notify listeners that a job is ready to run
_, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%d'", queue, jobID))
_, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%s'", queue, jobID))
if err != nil {
return
}
Expand All @@ -723,8 +723,8 @@ func (p *PgBackend) announceJob(ctx context.Context, queue string, jobID int64)
}
}

func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan int64) {
jobsCh = make(chan int64)
func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan string) {
jobsCh = make(chan string)

conn, err := p.pool.Acquire(ctx)
if err != nil {
Expand Down Expand Up @@ -756,7 +756,7 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
// it receives pending, periodic, and retry job ids asynchronously
// 1. handleJob first creates a transactions inside of which a row lock is acquired for the job to be processed.
// 2. handleJob secondly calls the handler on the job, and finally updates the job's status
func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handler) (err error) {
func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handler) (err error) {
var job *jobs.Job
var tx pgx.Tx
conn, err := p.pool.Acquire(ctx)
Expand Down Expand Up @@ -815,8 +815,8 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handle
// TODO: There is currently no handling of listener disconnects in PgBackend.
// This will lead to jobs not getting processed until the worker is restarted.
// Implement disconnect handling.
func (p *PgBackend) listen(ctx context.Context, queue string) (c chan int64, ready chan bool) {
c = make(chan int64)
func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, ready chan bool) {
c = make(chan string)
ready = make(chan bool)

go func(ctx context.Context) {
Expand Down Expand Up @@ -849,18 +849,12 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan int64, rea
continue
}

var jobID int64
if jobID, err = strconv.ParseInt(notification.Payload, 0, 64); err != nil {
p.logger.Error("unable to fetch job", err)
continue
}

// check if Shutdown() has been called
if jobID == shutdownJobID {
if notification.Payload == shutdownJobID {
return
}

c <- jobID
c <- notification.Payload
}
}(ctx)

Expand All @@ -881,7 +875,7 @@ func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue strin
conn.Release()
}

func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID int64) (job *jobs.Job, err error) {
func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) {
row, err := tx.Query(ctx, PendingJobQuery, jobID)
if err != nil {
return
Expand All @@ -895,7 +889,7 @@ func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID int64) (
return
}

func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID int64, err error) {
func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID string, err error) {
err = conn.QueryRow(ctx, PendingJobIDQuery, queue).Scan(&jobID)
return
}
Expand Down
2 changes: 1 addition & 1 deletion backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestCron(t *testing.T) {
})

h.WithOptions(
handler.Deadline(500*time.Millisecond),
handler.JobTimeout(500*time.Millisecond),
handler.Concurrency(1),
)

Expand Down
Loading

0 comments on commit aaed37e

Please sign in to comment.