diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 5a579d8..7bb0652 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -52,6 +52,11 @@ const ( AND run_after <= NOW() FOR UPDATE SKIP LOCKED LIMIT 1` + PendingJobCountQuery = `SELECT COUNT(*) + FROM neoq_jobs + WHERE queue = $1 + AND status NOT IN ('processed') + AND run_after <= NOW()` FutureJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error FROM neoq_jobs WHERE queue = $1 @@ -98,9 +103,7 @@ type PgBackend struct { logger logging.Logger // backend-wide logger mu *sync.RWMutex // protects concurrent access to fields on PgBackend pool *pgxpool.Pool // connection pool for backend, used to process and enqueue jobs - successGauage metric.Int64Counter // opentelemetry gauge for successful jobs - failureGauge metric.Int64Counter // opentelemetry gauge for failed jobs - depthGauge metric.Int64Gauge // opentelemetry gauge for queue depth + metricPack neoq.MetricPack // a collection of opentelemetry counters and gauges for telemetry reporting } // Backend initializes a new postgres-backed neoq backend @@ -144,6 +147,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err mu: &sync.RWMutex{}, listenCancelCh: make(chan context.CancelFunc, 1), listenConnDown: make(chan bool), + metricPack: neoq.MetricPack{}, } // Set all options @@ -194,21 +198,9 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err // initialize otel meters for observability if p.config.OpentelemetryMeterProvider != nil { - m := p.config.OpentelemetryMeterProvider.Meter("github.com/acaloiaro/neoq") - p.successGauage, err = m.Int64Counter("neoq.queue.success", metric.WithDescription("jobs that have succeeded")) + err = p.metricPack.Initialize(p.config.OpentelemetryMeterProvider) if err != nil { - p.logger.Error("unable to initialize opentelemetry 
success metrics", slog.Any("error", err)) - return nil, fmt.Errorf("unable to initialize opentelemetry success metrics: %w", err) - } - p.failureGauge, err = m.Int64Counter("neoq.queue.failure", metric.WithDescription("jobs that have failed")) - if err != nil { - p.logger.Error("unable to initialize opentelemetry failure metrics", slog.Any("error", err)) - return nil, fmt.Errorf("unable to initialize opentelemetry failure metrics: %w", err) - } - p.depthGauge, err = m.Int64Gauge("neoq.queue.depth", metric.WithDescription("depth of the queue")) - if err != nil { - p.logger.Error("unable to initialize opentelemetry queue depth metrics", slog.Any("error", err)) - return nil, fmt.Errorf("unable to initialize opentelemetry queue depth: %w", err) + p.logger.Error("unable to initialize open telemetry metrics", slog.Any("error", err)) } } @@ -483,6 +475,12 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e ) } + if p.metricPack.DepthCounter != nil { + p.metricPack.DepthCounter.Add(ctx, 1, metric.WithAttributes( + attribute.Key("queue").String(job.Queue), + )) + } + return jobID, nil } @@ -861,6 +857,28 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan return } + pendingCount, err := p.countPendingJobs(ctx, conn, queue) + if err != nil { + p.logger.Error("failed to count pending jobs", + slog.String("queue", queue), + slog.Any("error", err), + ) + conn.Release() + return + } + + p.logger.Debug("pending jobs", slog.Int64("count", pendingCount), slog.String("queue", queue)) + if pendingCount == 0 { + conn.Release() + return + } + + if p.metricPack.DepthCounter != nil { + p.metricPack.DepthCounter.Add(ctx, pendingCount, metric.WithAttributes( + attribute.Key("queue").String(queue), + )) + } + go func(ctx context.Context) { defer conn.Release() @@ -910,12 +924,12 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) { attribute.Key("queue").String(job.Queue), ) // the job ended with an error, incrementing the failing counter - if jobErr != nil && p.failureGauge != nil { - p.failureGauge.Add(ctx, 1, 
telemetryAttrs) + if jobErr != nil && p.metricPack.FailureCounter != nil { + p.metricPack.FailureCounter.Add(ctx, 1, telemetryAttrs) } // the job ended in success, increment success metrics - if jobErr == nil && p.successGauage != nil { - p.successGauage.Add(ctx, 1, telemetryAttrs) + if jobErr == nil && p.metricPack.SuccessCounter != nil { + p.metricPack.SuccessCounter.Add(ctx, 1, telemetryAttrs) } }() @@ -1078,6 +1092,11 @@ func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, que return } +func (p *PgBackend) countPendingJobs(ctx context.Context, conn *pgxpool.Conn, queue string) (count int64, err error) { + err = conn.QueryRow(ctx, PendingJobCountQuery, queue).Scan(&count) + return +} + // acquire acquires connections from the connection pool with a timeout // // the purpose of this function is to skirt pgxpool's default blocking behavior with connection acquisition preemption diff --git a/neoq.go b/neoq.go index 5d55a55..3559859 100644 --- a/neoq.go +++ b/neoq.go @@ -3,11 +3,13 @@ package neoq import ( "context" "errors" + "fmt" "time" "github.com/acaloiaro/neoq/handler" "github.com/acaloiaro/neoq/jobs" "github.com/acaloiaro/neoq/logging" + otel "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/metric" ) @@ -45,6 +47,32 @@ type Config struct { OpentelemetryMeterProvider *metric.MeterProvider } +// MetricPack is a collection of opentelemetry counters/gauges for telemetry reporting +type MetricPack struct { + SuccessCounter otel.Int64Counter // opentelemetry successful job counter + FailureCounter otel.Int64Counter // opentelemetry failed job counter + DepthCounter otel.Int64Counter // opentelemetry queue depth counter +} + +// Initialize initializes all meters and gauges +func (m *MetricPack) Initialize(mp *metric.MeterProvider) (err error) { + meter := mp.Meter("github.com/acaloiaro/neoq") + m.SuccessCounter, err = meter.Int64Counter("neoq.queue.success", otel.WithDescription("successful jobs")) + if err != nil { + return 
fmt.Errorf("unable to initialize opentelemetry success metrics: %w", err) + } + m.FailureCounter, err = meter.Int64Counter("neoq.queue.failure", otel.WithDescription("failed jobs")) + if err != nil { + return fmt.Errorf("unable to initialize opentelemetry failure metrics: %w", err) + } + m.DepthCounter, err = meter.Int64Counter("neoq.queue.depth", otel.WithDescription("queue depth")) + if err != nil { + return fmt.Errorf("unable to initialize opentelemetry queue depth: %w", err) + } + + return +} + // ConfigOption is a function that sets optional backend configuration type ConfigOption func(c *Config)