Acquire a new connection for every queue listener (#38)
acaloiaro authored Mar 18, 2023
1 parent aabb567 commit eb430e9
Showing 7 changed files with 173 additions and 56 deletions.
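The heart of the change is backends/postgres/postgres_backend.go: rather than hijacking one pooled connection and sharing it across every queue's LISTEN, each queue listener now acquires its own connection from the pool for each listen/notify cycle and returns it afterwards (see the new listen() and release() below), and Shutdown() wakes blocked listeners by announcing a sentinel job ID. The cycle, condensed into a self-contained sketch for orientation (import paths assume pgx v5; this illustrates the pattern rather than copying the new code verbatim):

package example

import (
	"context"
	"fmt"
	"strconv"

	"github.com/jackc/pgx/v5/pgxpool"
)

// listenOnce mirrors one iteration of the new listener loop: acquire a pooled
// connection, LISTEN on the queue's channel, block until a notification
// arrives, then UNLISTEN and release the connection back to the pool.
func listenOnce(ctx context.Context, pool *pgxpool.Pool, queue string) (int64, error) {
	conn, err := pool.Acquire(ctx)
	if err != nil {
		return 0, fmt.Errorf("unable to acquire listener connection: %w", err)
	}
	defer func() {
		// stop listening before the connection is handed back for reuse
		_, _ = conn.Exec(ctx, fmt.Sprintf("UNLISTEN %s", queue))
		conn.Release()
	}()

	// disable the idle-in-transaction timeout so the blocked listener is not disconnected
	if _, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue)); err != nil {
		return 0, err
	}

	notification, err := conn.Conn().WaitForNotification(ctx)
	if err != nil {
		return 0, err
	}

	// notification payloads carry the job ID as a string
	return strconv.ParseInt(notification.Payload, 0, 64)
}

Releasing the connection after every notification keeps LISTEN sessions from permanently occupying pool slots, at the cost of one extra acquire and LISTEN round trip per announcement.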
2 changes: 1 addition & 1 deletion .github/workflows/goreleaser.yml
@@ -20,7 +20,7 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
WITH_V: true
DEFAULT_BUMP: patch
DEFAULT_BUMP: minor
-
name: Set up Go
uses: actions/setup-go@v3
2 changes: 1 addition & 1 deletion Makefile
@@ -58,7 +58,7 @@ test-watch:
.PHONY: lint
lint:
@clear
@golangci-lint run .
@golangci-lint run

.PHONY: lint-watch
lint-watch: install-reflex
4 changes: 2 additions & 2 deletions backends/memory/memory_backend.go
@@ -86,7 +86,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, e
}

if job.Queue == "" {
err = errors.New("this job does not specify a Queue. Please specify a queue")
err = jobs.ErrNoQueueSpecified

return
}
@@ -264,7 +264,7 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context, queue string) {
queueChan = qc.(chan *jobs.Job)
queueChan <- j
} else {
m.logger.Error(fmt.Sprintf("no queue processor for queue '%s'", queue), errors.New("no queue processor configured"))
m.logger.Error(fmt.Sprintf("no queue processor for queue '%s'", queue), handler.ErrNoHandlerForQueue)
}
}(job)
}
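The memory backend now returns shared sentinel errors (jobs.ErrNoQueueSpecified above, handler.ErrNoHandlerForQueue below) instead of ad hoc errors.New values, so callers can branch on the failure mode with errors.Is. A minimal sketch; the jobs import path is an assumption, and Enqueue's signature is taken from the hunk header above:

package example

import (
	"context"
	"errors"
	"log"

	"github.com/acaloiaro/neoq/jobs"
)

// enqueuer captures just the method used here; the concrete type would be a
// memory or postgres backend satisfying the library's backend interface.
type enqueuer interface {
	Enqueue(ctx context.Context, job *jobs.Job) (int64, error)
}

// enqueueOrExplain shows the sentinel error being matched with errors.Is
// rather than by comparing error strings.
func enqueueOrExplain(ctx context.Context, q enqueuer, job *jobs.Job) {
	if _, err := q.Enqueue(ctx, job); err != nil {
		if errors.Is(err, jobs.ErrNoQueueSpecified) {
			log.Println("job rejected: no queue specified on the job")
			return
		}
		log.Println("enqueue failed:", err)
	}
}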
4 changes: 2 additions & 2 deletions backends/memory/memory_backend_test.go
@@ -193,7 +193,7 @@ func TestFutureJobScheduling(t *testing.T) {
}

func TestCron(t *testing.T) {
const cron = "* * * * * *"
const cronSpec = "* * * * * *"
ctx := context.TODO()
nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
if err != nil {
@@ -212,7 +212,7 @@ func TestCron(t *testing.T) {
handler.Concurrency(1),
)

err = nq.StartCron(ctx, cron, h)
err = nq.StartCron(ctx, cronSpec, h)
if err != nil {
t.Error(err)
}
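The renamed cronSpec constant drives the same public API this commit touches elsewhere. A typical caller wires it up roughly as follows; handler.New and the module import paths are assumptions, since only neoq.New, neoq.WithBackend, handler.Concurrency, and StartCron are visible in this diff:

package example

import (
	"context"
	"log"

	"github.com/acaloiaro/neoq"
	"github.com/acaloiaro/neoq/backends/memory"
	"github.com/acaloiaro/neoq/handler"
)

// runEverySecond mirrors TestCron: register a handler that fires on a
// six-field (seconds-resolution) cron spec, then run until the context ends.
func runEverySecond(ctx context.Context) error {
	nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
	if err != nil {
		return err
	}

	// handler.New's exact signature is assumed; only handler.Concurrency(1)
	// appears in the test above.
	h := handler.New(func(ctx context.Context) error {
		log.Println("tick")
		return nil
	}, handler.Concurrency(1))

	if err := nq.StartCron(ctx, "* * * * * *", h); err != nil {
		return err
	}

	<-ctx.Done() // run until the caller cancels the context

	// Shutdown on the returned value is assumed here; this diff only shows
	// Shutdown on PgBackend.
	nq.Shutdown(context.Background())
	return nil
}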
129 changes: 81 additions & 48 deletions backends/postgres/postgres_backend.go
@@ -54,11 +54,12 @@ const (
type contextKey struct{}

var (
txCtxVarKey contextKey
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
ErrDuplicateJobID = errors.New("duplicate job id")
ErrNoQueue = errors.New("no queue specified")
ErrNoTransactionInContext = errors.New("context does not have a Tx set")
txCtxVarKey contextKey
shutdownJobID int64 = -1 // job ID announced when triggering a shutdown
shutdownAnnouncementAllowance = 100 // ms
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
ErrDuplicateJobID = errors.New("duplicate job id")
ErrNoTransactionInContext = errors.New("context does not have a Tx set")
)

// PgBackend is a Postgres-based Neoq backend
@@ -67,7 +68,6 @@ type PgBackend struct {
config *config.Config
logger logging.Logger
cron *cron.Cron
listenConn *pgx.Conn
mu *sync.Mutex // mutex to protect mutating state on a pgWorker
pool *pgxpool.Pool
futureJobs map[int64]time.Time // map of future job IDs to their due time
@@ -155,7 +155,7 @@ func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err

pb = p

return
return pb, nil
}

// WithTransactionTimeout sets the time that PgBackend's transactions may be idle before its underlying connection is
@@ -344,7 +344,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, er
}

if job.Queue == "" {
err = ErrNoQueue
err = jobs.ErrNoQueueSpecified
return
}

@@ -418,15 +418,17 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha
if err = p.cron.AddFunc(cronSpec, func() {
_, err := p.Enqueue(ctx, &jobs.Job{Queue: queue})
if err != nil {
if errors.Is(err, context.Canceled) {
return
}

p.logger.Error("error queueing cron job", err)
}
}); err != nil {
return fmt.Errorf("error adding cron: %w", err)
}

err = p.Start(ctx, queue, h)

return
return p.Start(ctx, queue, h)
}

// SetLogger sets this backend's logger
@@ -435,13 +437,20 @@ func (p *PgBackend) SetLogger(logger logging.Logger) {
}

func (p *PgBackend) Shutdown(ctx context.Context) {
p.pool.Close() // also closes the hijacked listenConn
p.cron.Stop()
for queue := range p.handlers {
p.announceJob(ctx, queue, shutdownJobID)
}

// wait for the announcement to process
time.Sleep(time.Duration(shutdownAnnouncementAllowance) * time.Millisecond)

for _, f := range p.cancelFuncs {
f()
}

p.pool.Close()
p.cron.Stop()

p.cancelFuncs = nil
}
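Shutdown() no longer relies on closing the pool to stop listeners: it first announces shutdownJobID (-1) on every handled queue, waits shutdownAnnouncementAllowance (100 ms) for the announcement to propagate, then cancels worker contexts and closes the pool. announceJob itself is outside this diff; the following hypothetical sketch shows what such an announcement amounts to in pgx terms:

package example

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

// announceShutdown is a hypothetical stand-in for p.announceJob(ctx, queue, shutdownJobID):
// it publishes the sentinel job ID on the queue's NOTIFY channel so each blocked
// listener wakes up, sees shutdownJobID, and returns.
func announceShutdown(ctx context.Context, pool *pgxpool.Pool, queue string) error {
	conn, err := pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()

	// NOTIFY payloads are strings; listeners parse them back into an int64 job ID
	_, err = conn.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%d'", queue, int64(-1)))
	return err
}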

@@ -560,26 +569,11 @@ func (p *PgBackend) start(ctx context.Context, queue string) (err error) {
if h, ok = p.handlers[queue]; !ok {
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
}
conn, err := p.pool.Acquire(ctx)
if err != nil {
return
}

// use a single connection to listen for jobs on all queues
// TODO: Give more thought to the implications of hijacking connections to LISTEN on in PgBackend
// should this connecton not come from the pool, to avoid tainting it with connections that don't have an idle in
// transaction time out set?
p.mu.Lock()
if p.listenConn == nil {
p.listenConn = conn.Hijack()
}
p.mu.Unlock()

listenJobChan := p.listen(ctx, queue) // listen for 'new' jobs
pendingJobsChan := p.pendingJobs(ctx, queue) // process overdue jobs *at startup*

// process all future jobs and retries
// TODO consider performance implications of `scheduleFutureJobs` in PgBackend
go func() { p.scheduleFutureJobs(ctx, queue) }()

for i := 0; i < h.Concurrency; i++ {
@@ -598,7 +592,7 @@ }
}

if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) {
err = nil
continue
}
@@ -625,26 +619,31 @@ func (p *PgBackend) removeFutureJob(jobID int64) {

// initFutureJobs is intended to be run once to initialize the list of future jobs that must be monitored for
// execution. it should be run only during system startup.
func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) {
func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error) {
rows, err := p.pool.Query(ctx, FutureJobQuery, queue)
if err != nil {
p.logger.Error("error fetching future jobs list", err)
p.logger.Error("failed to fetch future jobs list", err)
return
}

var id int64
var runAfter time.Time
_, _ = pgx.ForEachRow(rows, []any{&id, &runAfter}, func() error {
_, err = pgx.ForEachRow(rows, []any{&id, &runAfter}, func() error {
p.mu.Lock()
p.futureJobs[id] = runAfter
p.mu.Unlock()
return nil
})

return
}

// scheduleFutureJobs announces future jobs using NOTIFY on an interval
func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
p.initFutureJobs(ctx, queue)
err := p.initFutureJobs(ctx, queue)
if err != nil {
return
}

// check for new future jobs on an interval
ticker := time.NewTicker(p.config.JobCheckInterval)
@@ -718,12 +717,11 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
for {
jobID, err := p.getPendingJobID(ctx, conn, queue)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
p.logger.Error("failed to fetch pending job", err, "job_id", jobID)
} else {
// done fetching pending jobs
if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) {
break
}

p.logger.Error("failed to fetch pending job", err, "job_id", jobID)
} else {
jobsCh <- jobID
}
@@ -774,6 +772,10 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handle

err = p.updateJob(ctx, jobErr)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}

err = fmt.Errorf("error updating job status: %w", err)
return err
}
@@ -792,43 +794,74 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handle
// This will lead to jobs not getting processed until the worker is restarted.
// Implement disconnect handling.
func (p *PgBackend) listen(ctx context.Context, queue string) (c chan int64) {
var err error
c = make(chan int64)

// set this connection's idle in transaction timeout to infinite so it is not intermittently disconnected
_, err = p.listenConn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue))
if err != nil {
err = fmt.Errorf("unable to create database connection for listener: %w", err)
p.logger.Error("unablet o create database connection for listener", err)
return
}

go func(ctx context.Context) {
for {
notification, waitErr := p.listenConn.WaitForNotification(ctx)
conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("unable to acquire new connnection", err)
return
}

// set this connection's idle in transaction timeout to infinite so it is not intermittently disconnected
_, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue))
if err != nil {
err = fmt.Errorf("unable to configure listener connection: %w", err)
p.logger.Error("unable to configure listener connection", err)
time.Sleep(time.Second) // don't hammer the db
p.release(ctx, conn, queue)
continue
}

notification, waitErr := conn.Conn().WaitForNotification(ctx)
if waitErr != nil {
if errors.Is(waitErr, context.Canceled) {
p.release(ctx, conn, queue)
return
}

p.logger.Error("failed to wait for notification", waitErr)
time.Sleep(1 * time.Second)
p.release(ctx, conn, queue)
continue
}

var jobID int64
if jobID, err = strconv.ParseInt(notification.Payload, 0, 64); err != nil {
p.logger.Error("unable to fetch job", err)
p.release(ctx, conn, queue)
continue
}

// check if Shutdown() has been called
if jobID == shutdownJobID {
p.release(ctx, conn, queue)
return
}

c <- jobID

p.release(ctx, conn, queue)
}
}(ctx)

return c
}

func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue string) {
query := fmt.Sprintf("SET idle_in_transaction_session_timeout = '%d'; UNLISTEN %s", p.config.IdleTransactionTimeout, queue)
_, err := conn.Exec(ctx, query)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}

p.logger.Error("unable to reset connection config before release", err)
}

conn.Release()
}

func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID int64) (job *jobs.Job, err error) {
row, err := tx.Query(ctx, PendingJobQuery, jobID)
if err != nil {
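The context lines above cut off the worker loop that start() launches h.Concurrency times. From the channels it builds (listenJobChan, pendingJobsChan) and the error handling in the @@ -598,7 +592,7 @@ hunk, that loop presumably multiplexes both channels into handleJob. A sketch with the receiver and logger abstracted into parameters, since the real loop is not shown here:

package example

import (
	"context"
	"errors"

	"github.com/jackc/pgx/v5"
)

// consume is an assumed reconstruction of the per-worker loop: drain job IDs
// from both the LISTEN channel and the pending-jobs channel, treating
// pgx.ErrNoRows and context.Canceled as benign, as the hunk in start() does.
func consume(
	ctx context.Context,
	listenJobChan, pendingJobsChan <-chan int64,
	handleJob func(context.Context, int64) error,
	logError func(msg string, err error),
) {
	for {
		var jobID int64
		select {
		case <-ctx.Done():
			return
		case jobID = <-listenJobChan:
		case jobID = <-pendingJobsChan:
		}

		if err := handleJob(ctx, jobID); err != nil {
			if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) {
				continue
			}
			logError("job failed", err)
		}
	}
}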
(Diff truncated: the remaining changed files were not loaded on this page.)