From 3d261494a7f69913b76e39da0c65be9ffe7da958 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Sun, 3 Dec 2023 17:30:22 -0700 Subject: [PATCH] feat: Multiplex the listener connection --- backends/postgres/postgres_backend.go | 240 +++++++++++---------- backends/postgres/postgres_backend_test.go | 42 ++-- 2 files changed, 143 insertions(+), 139 deletions(-) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 42b84bf..42c42ae 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -24,6 +24,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/jsuar/go-cron-descriptor/pkg/crondescriptor" "github.com/robfig/cron" + "golang.org/x/exp/slices" "golang.org/x/exp/slog" ) @@ -53,6 +54,7 @@ const ( LIMIT 100 FOR UPDATE SKIP LOCKED` setIdleInTxSessionTimeout = `SET idle_in_transaction_session_timeout = 0` + pgConnectionBusyRetries = 10 // the number of times to retry busy postgres connections, i.e. PgConn.IsBusy() ) type contextKey struct{} @@ -72,16 +74,19 @@ var ( // PgBackend is a Postgres-based Neoq backend type PgBackend struct { neoq.Neoq - cancelFuncs []context.CancelFunc // A collection of cancel functions to be called upon Shutdown() - config *neoq.Config - cron *cron.Cron - futureJobs map[string]*jobs.Job // map of future job IDs to the corresponding job record - handlers map[string]handler.Handler // a map of queue names to queue handlers - listenCancel context.CancelFunc // cancel function for the LISTEN loop - listenerConn *pgx.Conn // dedicated connection that LISTENs for jobs across all queues - logger logging.Logger - mu *sync.RWMutex // mutex to protect mutating state on a pgWorker - pool *pgxpool.Pool + cancelFuncs []context.CancelFunc // cancel functions to be called upon Shutdown() + config *neoq.Config // backend configuration + cron *cron.Cron // scheduler for periodic jobs + futureJobs map[string]*jobs.Job // map of future job IDs to the corresponding job 
record + handlers map[string]handler.Handler // a map of queue names to queue handlers + newQueues chan string // a channel that indicates that new queues are ready to be processed + readyQueues chan string // a channel that indicates which queues are ready to have jobs processed. + listenCancelCh chan context.CancelFunc // cancellation channel for the listenerConn's WaitForNotification call. + listenerConn *pgx.Conn // dedicated connection that LISTENs for jobs across all queues + listenerConnMu *sync.RWMutex // listenerConnMu protects the listener connection from concurrent access + logger logging.Logger // backend-wide logger + mu *sync.RWMutex // protects concurrent access to fields on PgBackend + pool *pgxpool.Pool // connection pool for backend, used to process and enqueue jobs } // Backend initializes a new postgres-backed neoq backend @@ -114,12 +119,16 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err cfg.PGConnectionTimeout = DefaultConnectionTimeout p := &PgBackend{ - mu: &sync.RWMutex{}, - config: cfg, - handlers: make(map[string]handler.Handler), - futureJobs: make(map[string]*jobs.Job), - cron: cron.New(), - cancelFuncs: []context.CancelFunc{}, + cancelFuncs: []context.CancelFunc{}, + config: cfg, + cron: cron.New(), + futureJobs: make(map[string]*jobs.Job), + handlers: make(map[string]handler.Handler), + newQueues: make(chan string), + readyQueues: make(chan string), + listenerConnMu: &sync.RWMutex{}, + mu: &sync.RWMutex{}, + listenCancelCh: make(chan context.CancelFunc, 1), } // Set all options @@ -170,9 +179,12 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err p.listenerConn, err = p.newListenerConn(ctx) if err != nil { - slog.Error("unable to initialize listener connection", slog.Any("error", err)) + p.logger.Error("unable to initialize listener connection", slog.Any("error", err)) } + // monitor handlers for changes and LISTEN when new queues are added + go p.newQueueMonitor(ctx) + 
p.cron.Start() pb = p @@ -180,19 +192,64 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err return pb, nil } +// newQueueMonitor monitors for new queues and instructs the listener connection to LISTEN for jobs on them +func (p *PgBackend) newQueueMonitor(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case newQueue := <-p.newQueues: + p.logger.Debug("configure new handler", "queue", newQueue) + setup_listeners: + // drain p.listenCancelCh before setting up new listeners + select { + case cancelListener := <-p.listenCancelCh: + p.logger.Debug("canceling previous wait listeners", "queue", newQueue) + cancelListener() + goto setup_listeners + default: + } + + p.listenerConnMu.Lock() + // note: 'LISTEN, channel' is idempotent + _, err := p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, newQueue)) + p.listenerConnMu.Unlock() + if err != nil { + err = fmt.Errorf("unable to configure listener connection: %w", err) + p.logger.Error("FATAL ERROR unable to listen for new jobs", slog.String("queue", newQueue), slog.Any("error", err)) + return + } + + p.logger.Debug("listening on queue", "queue", newQueue) + p.readyQueues <- newQueue + } + } +} + func (p *PgBackend) newListenerConn(ctx context.Context) (conn *pgx.Conn, err error) { var pgxCfg *pgx.ConnConfig pgxCfg, err = pgx.ParseConfig(p.config.ConnectionString) if err != nil { return } - pgxCfg.RuntimeParams = nil // TODO determine correct thing to do here, we don't want to wipe these out completely + + // remove any pgxpool parameters before creating a new connection + customPgxParams := []string{ + "pool_max_conns", "pool_min_conns", + "pool_max_conn_lifetime", "pool_max_conn_idle_time", "pool_health_check_period", + "pool_max_conn_lifetime_jitter", + } + for param := range pgxCfg.RuntimeParams { + if slices.Contains(customPgxParams, param) { + delete(pgxCfg.RuntimeParams, param) + } + } conn, err = pgx.ConnectConfig(ctx, pgxCfg) if err != nil { -
slog.Error("unable to acquire listener connection", "error", err) + p.logger.Error("unable to acquire listener connection", slog.Any("error", err)) return } - conn.Exec(ctx, "SET idle_in_transaction_session_timeout = 0") + _, err = conn.Exec(ctx, "SET idle_in_transaction_session_timeout = 0") return } @@ -381,6 +438,8 @@ func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) { p.handlers[h.Queue] = h p.mu.Unlock() + p.newQueues <- h.Queue + err = p.start(ctx, h) if err != nil { p.logger.Error("unable to start processing queue", slog.String("queue", h.Queue), slog.Any("error", err)) @@ -573,22 +632,25 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { // nolint: cyclop func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) { var ok bool + var listenJobChan chan *pgconn.Notification + var errCh chan error if h, ok = p.handlers[h.Queue]; !ok { return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, h.Queue) } - listenJobChan, ready, errCh := p.listen(ctx) // listen for 'new' jobs - defer close(ready) - pendingJobsChan := p.pendingJobs(ctx, h.Queue) // process overdue jobs *at startup* // wait for the listener to connect and be ready to listen - select { - case <-ready: - break - case err = <-errCh: - return + for q := range p.readyQueues { + if q == h.Queue { + listenJobChan, errCh = p.listen(ctx) + break + } + + p.logger.Debug("Picked up a queue that a different start() will be waiting for. 
Adding back to ready list", + slog.String("queue", q)) + p.readyQueues <- q } // process all future jobs and retries @@ -597,16 +659,19 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) { for i := 0; i < h.Concurrency; i++ { go func() { var err error - var jobID string + var n *pgconn.Notification for { select { - case jobID = <-listenJobChan: - err = p.handleJob(ctx, jobID) - case jobID = <-pendingJobsChan: - err = p.handleJob(ctx, jobID) + case n = <-listenJobChan: + err = p.handleJob(ctx, n.Payload) + case n = <-pendingJobsChan: + err = p.handleJob(ctx, n.Payload) case <-ctx.Done(): return + case <-errCh: + p.logger.Error("error hanlding job", "error", err) + continue } if err != nil { @@ -619,7 +684,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) { "job failed", slog.String("queue", h.Queue), slog.Any("error", err), - slog.String("job_id", jobID), + slog.String("job_id", n.Payload), ) continue @@ -723,8 +788,8 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) { } } -func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan string) { - jobsCh = make(chan string) +func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan *pgconn.Notification) { + jobsCh = make(chan *pgconn.Notification) conn, err := p.acquire(ctx) if err != nil { @@ -753,7 +818,7 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan slog.String("job_id", jobID), ) } else { - jobsCh <- jobID + jobsCh <- &pgconn.Notification{Channel: queue, Payload: jobID} } } }(ctx) @@ -765,6 +830,7 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan // it receives pending, periodic, and retry job ids asynchronously // 1. handleJob first creates a transactions inside of which a row lock is acquired for the job to be processed. // 2. 
handleJob secondly calls the handler on the job, and finally updates the job's status +// nolint: cyclop func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) { var job *jobs.Job var tx pgx.Tx @@ -803,7 +869,9 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) { var jobErr error h, ok := p.handlers[job.Queue] if !ok { - slog.Error("received a job for which no handler is configured", "queue", job.Queue, "job_id", job.ID) + p.logger.Error("received a job for which no handler is configured", + slog.String("queue", job.Queue), + slog.Int64("job_id", job.ID)) return handler.ErrNoHandlerForQueue } @@ -833,77 +901,41 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) { // TODO: There is currently no handling of listener disconnects in PgBackend. // This will lead to jobs not getting processed until the worker is restarted. // Implement disconnect handling. -func (p *PgBackend) listen(ctx context.Context) (c chan string, ready chan bool, errCh chan error) { - c = make(chan string) - ready = make(chan bool) +func (p *PgBackend) listen(ctx context.Context) (c chan *pgconn.Notification, errCh chan error) { + c = make(chan *pgconn.Notification) errCh = make(chan error) - go func(ctx context.Context) { - // a previously running listen loop needs to be cancelled - if p.listenCancel != nil { - ctx, newCancel := context.WithCancel(ctx) - newConn, err := p.newListenerConn(ctx) - if err != nil { - errCh <- err - newCancel() - return - } - for queue := range p.handlers { - _, err := newConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, queue)) - if err != nil { - err = fmt.Errorf("unable to configure listener connection: %w", err) - p.logger.Error("unable to configure listener connection!", slog.String("queue", queue), slog.Any("error", err)) - errCh <- err - newCancel() - return - } - } - - p.listenCancel() // cancel previously running listen loop - p.mu.Lock() - p.listenerConn = newConn - p.listenCancel = 
newCancel - p.mu.Unlock() - } else { - p.mu.Lock() - ctx, p.listenCancel = context.WithCancel(ctx) - p.mu.Unlock() - for queue := range p.handlers { - // note: LISTEN is idempotent - _, err := p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, queue)) - if err != nil { - err = fmt.Errorf("unable to configure listener connection: %w", err) - p.logger.Error("unable to configure listener connection", slog.String("queue", queue), slog.Any("error", err)) - errCh <- err - return - } - - } - } - - // notify start() that we're ready to listen for jobs - ready <- true - - p.mu.Lock() - conn := p.listenerConn - p.mu.Unlock() + waitForNotificationCtx, cancel := context.WithCancel(ctx) + p.listenCancelCh <- cancel + go func(ctx context.Context) { var notification *pgconn.Notification var waitErr error for { select { case <-ctx.Done(): + // our context has been canceled, the system is shutting down return default: - notification, waitErr = conn.WaitForNotification(ctx) + p.listenerConnMu.Lock() + notification, waitErr = p.listenerConn.WaitForNotification(waitForNotificationCtx) + p.listenerConnMu.Unlock() } if waitErr != nil { if errors.Is(waitErr, context.Canceled) { + // this is likely not a system shutdown, but an interrupt from the goroutine that manages changes to + // the list of handlers. 
It needs the connection to be unbusy so that it can instruct the connection + // to start listening on any new queues + p.logger.Debug("Stopping notifications processing") return } - if conn.PgConn().IsBusy() { - time.Sleep(500 * time.Millisecond) + // The connection is busy adding new LISTENers + if p.listenerConn.PgConn().IsBusy() { + p.logger.Debug("listen connection is busy, trying to acquire listener connection again...") + waitForNotificationCtx, cancel = context.WithCancel(ctx) + p.listenCancelCh <- cancel + continue } p.logger.Error("failed to wait for notification", slog.Any("error", waitErr)) @@ -921,29 +953,11 @@ func (p *PgBackend) listen(ctx context.Context) (c chan string, ready chan bool, return } - c <- notification.Payload + c <- notification } }(ctx) - return c, ready, errCh -} - -func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue string) { - query := fmt.Sprintf("SET idle_in_transaction_session_timeout = '%d'; UNLISTEN %q", p.config.IdleTransactionTimeout, queue) - _, err := conn.Exec(ctx, query) - if err != nil { - if errors.Is(err, context.Canceled) { - return - } - - p.logger.Error( - "unable to reset connection config before release", - slog.String("queue", queue), - slog.Any("error", err), - ) - } - - conn.Release() + return c, errCh } func (p *PgBackend) getJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) { diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index 22492f7..c41aef4 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -4,9 +4,7 @@ import ( "context" "errors" "fmt" - "log" "os" - "regexp" "strings" "sync" "sync/atomic" @@ -155,11 +153,19 @@ func TestBasicJobProcessing(t *testing.T) { func TestMultipleProcessors(t *testing.T) { const queue = "testing" + var execCount uint32 + var wg sync.WaitGroup connString, _ := prepareAndCleanupDB(t) - var execCount uint32 - var wg 
sync.WaitGroup + h := handler.New(queue, func(_ context.Context) (err error) { + atomic.AddUint32(&execCount, 1) + wg.Done() + return + }) + // Make sure that each neoq worker only works on one thing at a time. + h.Concurrency = 1 + neos := make([]neoq.Neoq, 0, ConcurrentWorkers) // Create several neoq processors such that we can enqueue several jobs and have them consumed by multiple different // workers. We want to make sure that a job is not processed twice in a pool of many different neoq workers. @@ -173,17 +179,6 @@ func TestMultipleProcessors(t *testing.T) { nq.Shutdown(ctx) }) - h := handler.New(queue, func(_ context.Context) (err error) { - // Make sure that by wasting some time working on a thing we don't consume two jobs back to back. - // This should give the other neoq clients enough time to grab a job as well. - time.Sleep(500 * time.Millisecond) - atomic.AddUint32(&execCount, 1) - wg.Done() - return - }) - // Make sure that each neoq worker only works on one thing at a time. 
- h.Concurrency = 1 - err = nq.Start(ctx, h) if err != nil { t.Error(err) @@ -271,7 +266,12 @@ func TestBasicJobMultipleQueue(t *testing.T) { connString, _ := prepareAndCleanupDB(t) ctx := context.TODO() - nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString)) + nq, err := neoq.New(ctx, + neoq.WithBackend(postgres.Backend), + postgres.WithConnectionString(connString), + neoq.WithLogLevel(logging.LogLevelDebug), + postgres.WithConnectionTimeout(1*time.Second), + ) if err != nil { t.Fatal(err) } @@ -292,7 +292,6 @@ func TestBasicJobMultipleQueue(t *testing.T) { t.Error(err) } - time.Sleep(500 * time.Millisecond) err = nq.Start(ctx, h2) if err != nil { t.Error(err) @@ -776,12 +775,3 @@ func Test_ConnectionTimeout(t *testing.T) { t.Error(err) } } - -func maxConnsDBUrl(maxConns int) (dbURL string) { - dbURL = os.Getenv("TEST_DATABASE_URL") - r := regexp.MustCompile(`pool_max_conns=\d+`) - dbURL = string(r.ReplaceAll([]byte(dbURL), []byte(fmt.Sprintf("pool_max_conns=%d", maxConns)))) - - log.Println("URL", dbURL) - return -}