Skip to content

Commit

Permalink
WIP: postgres done?
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro committed Oct 27, 2024
1 parent bdb8a7b commit 4ae3a10
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 21 deletions.
61 changes: 40 additions & 21 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ const (
AND run_after <= NOW()
FOR UPDATE SKIP LOCKED
LIMIT 1`
PendingJobCountQuery = `SELECT COUNT(*)
FROM neoq_jobs
WHERE queue = $1
AND status NOT IN ('processed')
AND run_after <= NOW()`
FutureJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
FROM neoq_jobs
WHERE queue = $1
Expand Down Expand Up @@ -98,9 +103,7 @@ type PgBackend struct {
logger logging.Logger // backend-wide logger
mu *sync.RWMutex // protects concurrent access to fields on PgBackend
pool *pgxpool.Pool // connection pool for backend, used to process and enqueue jobs
successGauage metric.Int64Counter // opentelemetry gauge for successful jobs
failureGauge metric.Int64Counter // opentelemetry gauge for failed jobs
depthGauge metric.Int64Gauge // opentelemetry gauge for queue depth
metricPack neoq.MetricPack // a collection opentelemetry counters and gauges for telemetry reporting
}

// Backend initializes a new postgres-backed neoq backend
Expand Down Expand Up @@ -144,6 +147,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
mu: &sync.RWMutex{},
listenCancelCh: make(chan context.CancelFunc, 1),
listenConnDown: make(chan bool),
metricPack: neoq.MetricPack{},
}

// Set all options
Expand Down Expand Up @@ -194,21 +198,9 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err

// initialize otel meters for observability
if p.config.OpentelemetryMeterProvider != nil {
m := p.config.OpentelemetryMeterProvider.Meter("github.com/acaloiaro/neoq")
p.successGauage, err = m.Int64Counter("neoq.queue.success", metric.WithDescription("jobs that have succeeded"))
err = p.metricPack.Initialize(p.config.OpentelemetryMeterProvider)
if err != nil {
p.logger.Error("unable to initialize opentelemetry success metrics", slog.Any("error", err))
return nil, fmt.Errorf("unable to initialize opentelemetry success metrics: %w", err)
}
p.failureGauge, err = m.Int64Counter("neoq.queue.failure", metric.WithDescription("jobs that have failed"))
if err != nil {
p.logger.Error("unable to initialize opentelemetry failure metrics", slog.Any("error", err))
return nil, fmt.Errorf("unable to initialize opentelemetry failure metrics: %w", err)
}
p.depthGauge, err = m.Int64Gauge("neoq.queue.depth", metric.WithDescription("depth of the queue"))
if err != nil {
p.logger.Error("unable to initialize opentelemetry queue depth metrics", slog.Any("error", err))
return nil, fmt.Errorf("unable to initialize opentelemetry queue depth: %w", err)
p.logger.Error("unable to initialize open telemetry metrics", slog.Any("error", err))
}
}

Expand Down Expand Up @@ -483,6 +475,10 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
)
}

p.metricPack.DepthCounter.Add(ctx, 1, metric.WithAttributes(
attribute.Key("queue").String(job.Queue),
))

return jobID, nil
}

Expand Down Expand Up @@ -861,6 +857,24 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
return
}

pendingCount, err := p.countPendingJobs(ctx, conn, queue)
if err != nil {
p.logger.Error("failed to count pending jobs",
slog.String("queue", queue),
slog.Any("error", err),
)
return
}

p.logger.Debug("pending jobs", slog.Int64("count", pendingCount), slog.String("queue", queue))
if pendingCount == 0 {
return
}

p.metricPack.DepthCounter.Add(ctx, pendingCount, metric.WithAttributes(
attribute.Key("queue").String(queue),
))

go func(ctx context.Context) {
defer conn.Release()

Expand Down Expand Up @@ -910,12 +924,12 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) {
attribute.Key("queue").String(job.Queue),
)
// the job ended with an error, incrementing the failing counter
if jobErr != nil && p.failureGauge != nil {
p.failureGauge.Add(ctx, 1, telemetryAttrs)
if jobErr != nil && p.metricPack.FailureCounter != nil {
p.metricPack.DepthCounter.Add(ctx, 1, telemetryAttrs)
}
// the job ended in success, increment success metrics
if jobErr == nil && p.successGauage != nil {
p.successGauage.Add(ctx, 1, telemetryAttrs)
if jobErr == nil && p.metricPack.SuccessCounter != nil {
p.metricPack.SuccessCounter.Add(ctx, 1, telemetryAttrs)
}
}()

Expand Down Expand Up @@ -1078,6 +1092,11 @@ func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, que
return
}

func (p *PgBackend) countPendingJobs(ctx context.Context, conn *pgxpool.Conn, queue string) (count int64, err error) {
err = conn.QueryRow(ctx, PendingJobCountQuery, queue).Scan(&count)
return
}

// acquire acquires connections from the connection pool with a timeout
//
// the purpose of this function is to skirt pgxpool's default blocking behavior with connection acquisition preemption
Expand Down
28 changes: 28 additions & 0 deletions neoq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package neoq
import (
"context"
"errors"
"fmt"
"time"

"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
otel "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric"
)

Expand Down Expand Up @@ -45,6 +47,32 @@ type Config struct {
OpentelemetryMeterProvider *metric.MeterProvider
}

// MetricPack is a collection of opentelemetry counters/gauges for telemetry reporting
type MetricPack struct {
SuccessCounter otel.Int64Counter // opentelemetry successful job counter
FailureCounter otel.Int64Counter // opentelemetry failed job counter
DepthCounter otel.Int64Counter // opentelemetry queue depth counter
}

// Initialize initializes all meters and gauages
func (m *MetricPack) Initialize(mp *metric.MeterProvider) (err error) {
meter := mp.Meter("github.com/acaloiaro/neoq")
m.SuccessCounter, err = meter.Int64Counter("neoq.queue.success", otel.WithDescription("successful jobs"))
if err != nil {
return fmt.Errorf("unable to initialize opentelemetry success metrics: %w", err)
}
m.FailureCounter, err = meter.Int64Counter("neoq.queue.failure", otel.WithDescription("failed jobs"))
if err != nil {
return fmt.Errorf("unable to initialize opentelemetry failure metrics: %w", err)
}
m.DepthCounter, err = meter.Int64Counter("neoq.queue.depth", otel.WithDescription("queue depth"))
if err != nil {
return fmt.Errorf("unable to initialize opentelemetry queue depth: %w", err)
}

return
}

// ConfigOption is a function that sets optional backend configuration
type ConfigOption func(c *Config)

Expand Down

0 comments on commit 4ae3a10

Please sign in to comment.