feat: Multiplex the listener connection
acaloiaro committed Dec 5, 2023
1 parent 080a162 commit 3d26149
Showing 2 changed files with 143 additions and 139 deletions.
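At a high level, the change replaces the per-start() LISTEN setup with one shared listener connection coordinated through three channels on PgBackend: newQueues, readyQueues, and listenCancelCh. The self-contained Go sketch below illustrates only that coordination pattern; the monitor type, its listening map, and the printed messages are simplified stand-ins for illustration, not code from this commit.

// A minimal, self-contained sketch (not part of the commit) of the channel
// coordination introduced here: one goroutine owns the shared listener, is
// told about new queues via newQueues, interrupts a blocked wait via cancel
// functions sent on listenCancelCh, and reports back on readyQueues.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type monitor struct {
	mu             sync.Mutex
	listening      map[string]bool         // stands in for channels the shared conn LISTENs on
	newQueues      chan string             // queues whose handlers were just registered
	readyQueues    chan string             // queues the listener is now LISTENing on
	listenCancelCh chan context.CancelFunc // cancels a blocked WaitForNotification-style call
}

func (m *monitor) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case q := <-m.newQueues:
			// Drain any pending cancel func so a blocked wait releases the shared connection.
			select {
			case cancel := <-m.listenCancelCh:
				cancel()
			default:
			}
			m.mu.Lock()
			m.listening[q] = true // stands in for executing LISTEN on the shared connection
			m.mu.Unlock()
			m.readyQueues <- q
		}
	}
}

func main() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	m := &monitor{
		listening:      map[string]bool{},
		newQueues:      make(chan string),
		readyQueues:    make(chan string),
		listenCancelCh: make(chan context.CancelFunc, 1),
	}
	go m.run(ctx)

	// Simulate a blocked notification wait that must be interrupted before
	// the shared connection can LISTEN on a newly added queue.
	waitCtx, cancelWait := context.WithCancel(ctx)
	m.listenCancelCh <- cancelWait
	go func() {
		<-waitCtx.Done()
		fmt.Println("blocked wait interrupted; connection free to LISTEN on the new queue")
	}()

	m.newQueues <- "example_queue" // what Start() does for each newly registered handler
	fmt.Println("ready to process:", <-m.readyQueues)
	time.Sleep(50 * time.Millisecond) // give the interrupted goroutine time to print
}

In the real backend, marking a queue as listening corresponds to executing LISTEN %q on listenerConn, and the drained cancel function interrupts a blocked WaitForNotification call so the shared connection is free to take on the new channel.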
240 changes: 127 additions & 113 deletions backends/postgres/postgres_backend.go
@@ -24,6 +24,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jsuar/go-cron-descriptor/pkg/crondescriptor"
"github.com/robfig/cron"
"golang.org/x/exp/slices"
"golang.org/x/exp/slog"
)

@@ -53,6 +54,7 @@ const (
LIMIT 100
FOR UPDATE SKIP LOCKED`
setIdleInTxSessionTimeout = `SET idle_in_transaction_session_timeout = 0`
pgConnectionBusyRetries = 10 // the number of times to retry busy postgres connections, i.e. PgConn.IsBusy()
)

type contextKey struct{}
@@ -72,16 +74,19 @@ var (
// PgBackend is a Postgres-based Neoq backend
type PgBackend struct {
neoq.Neoq
cancelFuncs []context.CancelFunc // A collection of cancel functions to be called upon Shutdown()
config *neoq.Config
cron *cron.Cron
futureJobs map[string]*jobs.Job // map of future job IDs to the corresponding job record
handlers map[string]handler.Handler // a map of queue names to queue handlers
listenCancel context.CancelFunc // cancel function for the LISTEN loop
listenerConn *pgx.Conn // dedicated connection that LISTENs for jobs across all queues
logger logging.Logger
mu *sync.RWMutex // mutex to protect mutating state on a pgWorker
pool *pgxpool.Pool
cancelFuncs []context.CancelFunc // cancel functions to be called upon Shutdown()
config *neoq.Config // backend configuration
cron *cron.Cron // scheduler for periodic jobs
futureJobs map[string]*jobs.Job // map of future job IDs to the corresponding job record
handlers map[string]handler.Handler // a map of queue names to queue handlers
newQueues chan string // a channel that indicates that new queues are ready to be processed
readyQueues chan string // a channel that indicates which queues are ready to have jobs processed.
listenCancelCh chan context.CancelFunc // cancellation channel for the listenerConn's WaitForNotification call.
listenerConn *pgx.Conn // dedicated connection that LISTENs for jobs across all queues
listenerConnMu *sync.RWMutex // listenerConnMu protects the listener connection from concurrent access
logger logging.Logger // backend-wide logger
mu *sync.RWMutex // protects concurrent access to fields on PgBackend
pool *pgxpool.Pool // connection pool for backend, used to process and enqueue jobs
}

// Backend initializes a new postgres-backed neoq backend
@@ -114,12 +119,16 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
cfg.PGConnectionTimeout = DefaultConnectionTimeout

p := &PgBackend{
mu: &sync.RWMutex{},
config: cfg,
handlers: make(map[string]handler.Handler),
futureJobs: make(map[string]*jobs.Job),
cron: cron.New(),
cancelFuncs: []context.CancelFunc{},
cancelFuncs: []context.CancelFunc{},
config: cfg,
cron: cron.New(),
futureJobs: make(map[string]*jobs.Job),
handlers: make(map[string]handler.Handler),
newQueues: make(chan string),
readyQueues: make(chan string),
listenerConnMu: &sync.RWMutex{},
mu: &sync.RWMutex{},
listenCancelCh: make(chan context.CancelFunc, 1),
}

// Set all options
@@ -170,29 +179,77 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err

p.listenerConn, err = p.newListenerConn(ctx)
if err != nil {
slog.Error("unable to initialize listener connection", slog.Any("error", err))
p.logger.Error("unable to initialize listener connection", slog.Any("error", err))
}

// monitor handlers for changes and LISTEN when new queues are added
go p.newQueueMonitor(ctx)

p.cron.Start()

pb = p

return pb, nil
}

// newQueueMonitor monitors for new queues and instructs the listener connection to LISTEN for jobs on them
func (p *PgBackend) newQueueMonitor(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case newQueue := <-p.newQueues:
p.logger.Debug("configure new handler", "queue", newQueue)
setup_listeners:
// drain p.listenCancelCh before setting up new listeners
select {
case cancelListener := <-p.listenCancelCh:
p.logger.Debug("canceling previous wait listeners", "queue", newQueue)
cancelListener()
goto setup_listeners
default:
}

p.listenerConnMu.Lock()
// note: 'LISTEN channel' is idempotent
_, err := p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, newQueue))
p.listenerConnMu.Unlock()
if err != nil {
err = fmt.Errorf("unable to configure listener connection: %w", err)
p.logger.Error("FATAL ERROR unable to listen for new jobs", slog.String("queue", newQueue), slog.Any("error", err))
return
}

p.logger.Debug("listening on queue", "queue", newQueue)
p.readyQueues <- newQueue
}
}
}

func (p *PgBackend) newListenerConn(ctx context.Context) (conn *pgx.Conn, err error) {
var pgxCfg *pgx.ConnConfig
pgxCfg, err = pgx.ParseConfig(p.config.ConnectionString)
if err != nil {
return
}
pgxCfg.RuntimeParams = nil // TODO determine correct thing to do here, we don't want to wipe these out completely

// remove any pgxpool parameters before creating a new connection
customPgxParams := []string{
"pool_max_conns", "pool_min_conns",
"pool_max_conn_lifetime", "pool_max_conn_idle_time", "pool_health_check_period",
"pool_max_conn_lifetime_jitter",
}
for param := range pgxCfg.RuntimeParams {
if slices.Contains(customPgxParams, param) {
delete(pgxCfg.RuntimeParams, param)
}
}
conn, err = pgx.ConnectConfig(ctx, pgxCfg)
if err != nil {
slog.Error("unable to acquire listener connection", "error", err)
p.logger.Error("unable to acquire listener connection", slog.Any("error", err))
return
}
conn.Exec(ctx, "SET idle_in_transaction_session_timeout = 0")
_, err = conn.Exec(ctx, "SET idle_in_transaction_session_timeout = 0")

return
}
@@ -381,6 +438,8 @@ func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) {
p.handlers[h.Queue] = h
p.mu.Unlock()

p.newQueues <- h.Queue

err = p.start(ctx, h)
if err != nil {
p.logger.Error("unable to start processing queue", slog.String("queue", h.Queue), slog.Any("error", err))
@@ -573,22 +632,25 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
// nolint: cyclop
func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
var ok bool
var listenJobChan chan *pgconn.Notification
var errCh chan error

if h, ok = p.handlers[h.Queue]; !ok {
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, h.Queue)
}

listenJobChan, ready, errCh := p.listen(ctx) // listen for 'new' jobs
defer close(ready)

pendingJobsChan := p.pendingJobs(ctx, h.Queue) // process overdue jobs *at startup*

// wait for the listener to connect and be ready to listen
select {
case <-ready:
break
case err = <-errCh:
return
for q := range p.readyQueues {
if q == h.Queue {
listenJobChan, errCh = p.listen(ctx)
break
}

p.logger.Debug("Picked up a queue that a different start() will be waiting for. Adding back to ready list",
slog.String("queue", q))
p.readyQueues <- q
}

// process all future jobs and retries
Expand All @@ -597,16 +659,19 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
for i := 0; i < h.Concurrency; i++ {
go func() {
var err error
var jobID string
var n *pgconn.Notification

for {
select {
case jobID = <-listenJobChan:
err = p.handleJob(ctx, jobID)
case jobID = <-pendingJobsChan:
err = p.handleJob(ctx, jobID)
case n = <-listenJobChan:
err = p.handleJob(ctx, n.Payload)
case n = <-pendingJobsChan:
err = p.handleJob(ctx, n.Payload)
case <-ctx.Done():
return
case err = <-errCh:
p.logger.Error("error handling job", "error", err)
continue
}

if err != nil {
Expand All @@ -619,7 +684,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
"job failed",
slog.String("queue", h.Queue),
slog.Any("error", err),
slog.String("job_id", jobID),
slog.String("job_id", n.Payload),
)

continue
@@ -723,8 +788,8 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {
}
}

func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan string) {
jobsCh = make(chan string)
func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan *pgconn.Notification) {
jobsCh = make(chan *pgconn.Notification)

conn, err := p.acquire(ctx)
if err != nil {
@@ -753,7 +818,7 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
slog.String("job_id", jobID),
)
} else {
jobsCh <- jobID
jobsCh <- &pgconn.Notification{Channel: queue, Payload: jobID}
}
}
}(ctx)
@@ -765,6 +830,7 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
// it receives pending, periodic, and retry job ids asynchronously
// 1. handleJob first creates a transaction inside of which a row lock is acquired for the job to be processed.
// 2. handleJob secondly calls the handler on the job, and finally updates the job's status
// nolint: cyclop
func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) {
var job *jobs.Job
var tx pgx.Tx
@@ -803,7 +869,9 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) {
var jobErr error
h, ok := p.handlers[job.Queue]
if !ok {
slog.Error("received a job for which no handler is configured", "queue", job.Queue, "job_id", job.ID)
p.logger.Error("received a job for which no handler is configured",
slog.String("queue", job.Queue),
slog.Int64("job_id", job.ID))
return handler.ErrNoHandlerForQueue
}

@@ -833,77 +901,41 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) {
// TODO: There is currently no handling of listener disconnects in PgBackend.
// This will lead to jobs not getting processed until the worker is restarted.
// Implement disconnect handling.
func (p *PgBackend) listen(ctx context.Context) (c chan string, ready chan bool, errCh chan error) {
c = make(chan string)
ready = make(chan bool)
func (p *PgBackend) listen(ctx context.Context) (c chan *pgconn.Notification, errCh chan error) {
c = make(chan *pgconn.Notification)
errCh = make(chan error)

go func(ctx context.Context) {
// a previously running listen loop needs to be cancelled
if p.listenCancel != nil {
ctx, newCancel := context.WithCancel(ctx)
newConn, err := p.newListenerConn(ctx)
if err != nil {
errCh <- err
newCancel()
return
}
for queue := range p.handlers {
_, err := newConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, queue))
if err != nil {
err = fmt.Errorf("unable to configure listener connection: %w", err)
p.logger.Error("unable to configure listener connection!", slog.String("queue", queue), slog.Any("error", err))
errCh <- err
newCancel()
return
}
}

p.listenCancel() // cancel previously running listen loop
p.mu.Lock()
p.listenerConn = newConn
p.listenCancel = newCancel
p.mu.Unlock()
} else {
p.mu.Lock()
ctx, p.listenCancel = context.WithCancel(ctx)
p.mu.Unlock()
for queue := range p.handlers {
// note: LISTEN is idempotent
_, err := p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, queue))
if err != nil {
err = fmt.Errorf("unable to configure listener connection: %w", err)
p.logger.Error("unable to configure listener connection", slog.String("queue", queue), slog.Any("error", err))
errCh <- err
return
}

}
}

// notify start() that we're ready to listen for jobs
ready <- true

p.mu.Lock()
conn := p.listenerConn
p.mu.Unlock()
waitForNotificationCtx, cancel := context.WithCancel(ctx)
p.listenCancelCh <- cancel

go func(ctx context.Context) {
var notification *pgconn.Notification
var waitErr error
for {
select {
case <-ctx.Done():
// our context has been canceled, the system is shutting down
return
default:
notification, waitErr = conn.WaitForNotification(ctx)
p.listenerConnMu.Lock()
notification, waitErr = p.listenerConn.WaitForNotification(waitForNotificationCtx)
p.listenerConnMu.Unlock()
}
if waitErr != nil {
if errors.Is(waitErr, context.Canceled) {
// this is likely not a system shutdown, but an interrupt from the goroutine that manages changes to
// the list of handlers. It needs the connection to be unbusy so that it can instruct the connection
// to start listening on any new queues
p.logger.Debug("Stopping notifications processing")
return
}

if conn.PgConn().IsBusy() {
time.Sleep(500 * time.Millisecond)
// The connection is busy adding new LISTENers
if p.listenerConn.PgConn().IsBusy() {
p.logger.Debug("listen connection is busy, trying to acquire listener connection again...")
waitForNotificationCtx, cancel = context.WithCancel(ctx)
p.listenCancelCh <- cancel
continue
}

p.logger.Error("failed to wait for notification", slog.Any("error", waitErr))
Expand All @@ -921,29 +953,11 @@ func (p *PgBackend) listen(ctx context.Context) (c chan string, ready chan bool,
return
}

c <- notification.Payload
c <- notification
}
}(ctx)

return c, ready, errCh
}

func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue string) {
query := fmt.Sprintf("SET idle_in_transaction_session_timeout = '%d'; UNLISTEN %q", p.config.IdleTransactionTimeout, queue)
_, err := conn.Exec(ctx, query)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}

p.logger.Error(
"unable to reset connection config before release",
slog.String("queue", queue),
slog.Any("error", err),
)
}

conn.Release()
return c, errCh
}

func (p *PgBackend) getJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) {

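For reference, the pgx v5 LISTEN/NOTIFY primitives that the multiplexed listener connection builds on look roughly like the sketch below. This is a hedged, minimal example: the connection string and the "greetings" channel name are placeholders, not values taken from this repository.

// A standalone sketch of LISTEN plus WaitForNotification with a cancellable
// context, the mechanism the shared listener connection relies on.
package main

import (
	"context"
	"log"
	"time"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()

	// Placeholder DSN; adjust for a real database.
	conn, err := pgx.Connect(ctx, "postgres://postgres:postgres@localhost:5432/neoq")
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer conn.Close(ctx)

	// LISTEN is idempotent: issuing it again for the same channel is harmless,
	// which is what lets one connection accumulate channels as queues are added.
	if _, err := conn.Exec(ctx, `LISTEN "greetings"`); err != nil {
		log.Fatalf("listen: %v", err)
	}

	// WaitForNotification blocks until a NOTIFY arrives or the context ends.
	waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	n, err := conn.WaitForNotification(waitCtx)
	if err != nil {
		if waitCtx.Err() != nil {
			log.Println("no notification before the wait context ended")
			return
		}
		log.Fatalf("wait: %v", err)
	}
	log.Printf("notified on channel %q with payload %q", n.Channel, n.Payload)
}

Cancelling the context passed to WaitForNotification is the same mechanism the backend uses to interrupt a blocked wait so the shared connection can issue LISTEN for newly registered queues.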