From aaed37eeb575dbe51377acea78e6c901b4bfc2d1 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Fri, 14 Apr 2023 19:11:22 -0600 Subject: [PATCH] Add redis support (#44) --- .github/workflows/test.yml | 8 + .gitignore | 6 + README.md | 45 ++- backends/memory/memory_backend.go | 4 +- backends/memory/memory_backend_test.go | 6 +- backends/postgres/postgres_backend.go | 60 ++-- backends/postgres/postgres_backend_test.go | 2 +- backends/redis/redis_backend.go | 312 ++++++++++++++++++ backends/redis/redis_backend_test.go | 310 +++++++++++++++++ config/config.go | 8 + doc.go | 15 +- env.sample | 1 + .../add_job_with_custom_concurrency/main.go | 4 +- .../main.go | 8 +- examples/add_periodic_jobs/main.go | 2 +- examples/add_redis_job/main.go | 39 +++ go.mod | 12 + go.sum | 126 ++++++- handler/handler.go | 34 +- jobs/jobs.go | 4 +- neoq_test.go | 6 +- types/types.go | 2 +- 22 files changed, 933 insertions(+), 81 deletions(-) create mode 100644 backends/redis/redis_backend.go create mode 100644 backends/redis/redis_backend_test.go rename examples/{add_job_with_deadline => add_job_with_timeout}/main.go (89%) create mode 100644 examples/add_redis_job/main.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e9c0ba3..a10382a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -20,6 +20,13 @@ jobs: --health-interval 10s --health-timeout 5s --health-retries 5 + redis: + image: redis + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 container: golang:1.20 @@ -29,6 +36,7 @@ jobs: run: make mod test coverage env: TEST_DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres + TEST_REDIS_URL: redis:6379 - name: upload results uses: actions/upload-artifact@v3 with: diff --git a/.gitignore b/.gitignore index 42fd900..87e57dc 100644 --- a/.gitignore +++ b/.gitignore @@ -18,5 +18,11 @@ vendor/ # Some tests require environment variables to be set .env +# it's handy to have a docker-compose available during development +docker-compose.yml + +.pre-commit-config.yaml dist/ + + diff --git a/README.md b/README.md index d16af57..945e431 100644 --- a/README.md +++ b/README.md @@ -10,19 +10,24 @@ Background job processing for Go # About -Neoq is a background job framework for Go applications. Its purpose is to minimize the infrastructure necessary to run production applications. It does so by implementing queue durability with modular backends. +Neoq is a queue-agnostic background job framework for Go. -This allows application to use the same type of data store for both application data and backround job processing. At the moment an in-memory and Postgres backends are provided. However, the goal is to have backends for every major datastore: Postgres, Redis, MySQL, etc. +Neoq job handlers are the same, whether queues are in-memory for development/testing, or Postgres, Redis, or a custom queue for production -- allowing queue infrastructure to change without code change. -Neoq does not aim to be the _fastest_ background job processor. It aims to be _fast_, _reliable_, and demand a minimal infrastructure footprint. +Developing/testing or don't need a durable queue? Use the in-memory queue. + +Running an application in production? Use Postgres. + +Have higher throughput demands in production? Use Redis. + +Neoq does not aim to be the _fastest_ background job processor. It aims to be _fast_, _reliable_, and demand a _minimal infrastructure footprint_. # What it does -- **Background job Processing**: Neoq has an in-memory and Postgres backend out of the box. Users may supply their own without changing neoq directly. +- **Multiple Backends**: In-memory, Postgres, Redis, or user-supplied custom backends. - **Retries**: Jobs may be retried a configurable number of times with exponential backoff and jitter to prevent thundering herds -- **Job uniqueness**: jobs are fingerprinted based on their payload and status to prevent job duplication (multiple unprocessed jobs with the same payload cannot be queued) -- **Deadlines**: Queue handlers can be configured with per-job time deadlines with millisecond accuracy -- **Configurable transaction idle time**: Don't let your background worker transactions run away with db resources. By default, worker transactions may idle for 60 seconds. +- **Job uniqueness**: jobs are fingerprinted based on their payload and status to prevent job duplication (multiple jobs with the same payload are not re-queued) +- **Job Timeouts**: Queue handlers can be configured with per-job timeouts with millisecond accuracy - **Periodic Jobs**: Jobs can be scheduled periodically using standard cron syntax - **Future Jobs**: Jobs can be scheduled either for the future or immediate execution - **Concurrency**: Concurrency is configurable for every queue @@ -70,6 +75,32 @@ nq.Enqueue(ctx, &jobs.Job{ }) ``` +## Redis + +**Example**: Process jobs on the "hello_world" queue and add a job to it using the redis backend + +```go +ctx := context.Background() +nq, _ := neoq.New(ctx, + neoq.WithBackend(redis.Backend), + redis.WithAddr("localhost:6379"), + redis.WithPassword(""), +) + +nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { + j, _ := jobs.FromContext(ctx) + log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) + return +})) + +nq.Enqueue(ctx, &jobs.Job{ + Queue: "hello_world", + Payload: map[string]interface{}{ + "message": "hello world", + }, +}) +``` + ## Postgres **Example**: Process jobs on the "hello_world" queue and add a job to it using the postgres backend diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index c47b440..27bc010 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -67,7 +67,7 @@ func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend, } // Enqueue queues jobs to be executed asynchronously -func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, err error) { +func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) { var queueChan chan *jobs.Job var qc any var ok bool @@ -107,7 +107,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, e m.mu.Unlock() job.ID = m.jobCount - jobID = m.jobCount + jobID = fmt.Sprint(m.jobCount) if job.RunAfter.Equal(now) { queueChan <- job diff --git a/backends/memory/memory_backend_test.go b/backends/memory/memory_backend_test.go index 8c7d17c..33aa4b4 100644 --- a/backends/memory/memory_backend_test.go +++ b/backends/memory/memory_backend_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "strconv" "strings" "sync" "testing" @@ -186,8 +187,9 @@ func TestFutureJobScheduling(t *testing.T) { t.Error(err) } + jobID, _ := strconv.ParseInt(jid, 0, 64) var ok bool - if _, ok = testFutureJobs.Load(jid); !ok { + if _, ok = testFutureJobs.Load(jobID); !ok { t.Error(err) } } @@ -208,7 +210,7 @@ func TestCron(t *testing.T) { }) h.WithOptions( - handler.Deadline(500*time.Millisecond), + handler.JobTimeout(500*time.Millisecond), handler.Concurrency(1), ) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index c52da3e..4fc0008 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "os" - "strconv" "sync" "time" @@ -55,11 +54,11 @@ type contextKey struct{} var ( txCtxVarKey contextKey - shutdownJobID int64 = -1 // job ID announced when triggering a shutdown - shutdownAnnouncementAllowance = 100 // ms - ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings") - ErrDuplicateJobID = errors.New("duplicate job id") - ErrNoTransactionInContext = errors.New("context does not have a Tx set") + shutdownJobID = "-1" // job ID announced when triggering a shutdown + shutdownAnnouncementAllowance = 100 // ms + ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings") + ErrDuplicateJobID = errors.New("duplicate job id") + ErrNoTransactionInContext = errors.New("context does not have a Tx set") ) // PgBackend is a Postgres-based Neoq backend @@ -70,7 +69,7 @@ type PgBackend struct { cron *cron.Cron mu *sync.Mutex // mutex to protect mutating state on a pgWorker pool *pgxpool.Pool - futureJobs map[int64]time.Time // map of future job IDs to their due time + futureJobs map[string]time.Time // map of future job IDs to their due time handlers map[string]handler.Handler // a map of queue names to queue handlers cancelFuncs []context.CancelFunc // A collection of cancel functions to be called upon Shutdown() } @@ -104,7 +103,7 @@ func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err mu: &sync.Mutex{}, config: config.New(), handlers: make(map[string]handler.Handler), - futureJobs: make(map[int64]time.Time), + futureJobs: make(map[string]time.Time), logger: slog.New(slog.NewTextHandler(os.Stdout)), cron: cron.New(), cancelFuncs: []context.CancelFunc{}, @@ -321,7 +320,7 @@ func (p *PgBackend) initializeDB(ctx context.Context) (err error) { } // Enqueue adds jobs to the specified queue -func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, err error) { +func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) { ctx, cancel := context.WithCancel(ctx) p.mu.Lock() p.cancelFuncs = append(p.cancelFuncs, cancel) @@ -359,6 +358,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, er if err != nil { err = fmt.Errorf("error enqueuing job: %w", err) } + if jobID == jobs.DuplicateJobID { err = ErrDuplicateJobID return @@ -366,7 +366,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, er // notify listeners that a new job has arrived if it's not a future job if job.RunAfter.Equal(now) { - _, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%d'", job.Queue, jobID)) + _, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%s'", job.Queue, jobID)) if err != nil { err = fmt.Errorf("error executing transaction: %w", err) } @@ -465,7 +465,7 @@ func (p *PgBackend) Shutdown(ctx context.Context) { // // Jobs that are not already fingerprinted are fingerprinted before being added // Duplicate jobs are not added to the queue. Any two unprocessed jobs with the same fingerprint are duplicates -func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (jobID int64, err error) { +func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (jobID string, err error) { err = jobs.FingerprintJob(j) if err != nil { return @@ -569,7 +569,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { if time.Until(runAfter) > 0 { p.mu.Lock() - p.futureJobs[job.ID] = runAfter + p.futureJobs[fmt.Sprint(job.ID)] = runAfter p.mu.Unlock() } @@ -600,7 +600,7 @@ func (p *PgBackend) start(ctx context.Context, queue string) (err error) { for i := 0; i < h.Concurrency; i++ { go func() { var err error - var jobID int64 + var jobID string for { select { @@ -630,7 +630,7 @@ func (p *PgBackend) start(ctx context.Context, queue string) (err error) { } // removeFutureJob removes a future job from the in-memory list of jobs that will execute in the future -func (p *PgBackend) removeFutureJob(jobID int64) { +func (p *PgBackend) removeFutureJob(jobID string) { if _, ok := p.futureJobs[jobID]; ok { p.mu.Lock() delete(p.futureJobs, jobID) @@ -647,7 +647,7 @@ func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error return } - var id int64 + var id string var runAfter time.Time _, err = pgx.ForEachRow(rows, []any{&id, &runAfter}, func() error { p.mu.Lock() @@ -675,7 +675,7 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) { timeUntillRunAfter := time.Until(runAfter) if timeUntillRunAfter <= p.config.FutureJobWindow { p.removeFutureJob(jobID) - go func(jid int64) { + go func(jid string) { scheduleCh := time.After(timeUntillRunAfter) <-scheduleCh p.announceJob(ctx, queue, jid) @@ -695,7 +695,7 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) { // announceJob announces jobs to queue listeners. // // Announced jobs are executed by the first worker to respond to the announcement. -func (p *PgBackend) announceJob(ctx context.Context, queue string, jobID int64) { +func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) { conn, err := p.pool.Acquire(ctx) if err != nil { return @@ -712,7 +712,7 @@ func (p *PgBackend) announceJob(ctx context.Context, queue string, jobID int64) defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // notify listeners that a job is ready to run - _, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%d'", queue, jobID)) + _, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%s'", queue, jobID)) if err != nil { return } @@ -723,8 +723,8 @@ func (p *PgBackend) announceJob(ctx context.Context, queue string, jobID int64) } } -func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan int64) { - jobsCh = make(chan int64) +func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan string) { + jobsCh = make(chan string) conn, err := p.pool.Acquire(ctx) if err != nil { @@ -756,7 +756,7 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan // it receives pending, periodic, and retry job ids asynchronously // 1. handleJob first creates a transactions inside of which a row lock is acquired for the job to be processed. // 2. handleJob secondly calls the handler on the job, and finally updates the job's status -func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handler) (err error) { +func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handler) (err error) { var job *jobs.Job var tx pgx.Tx conn, err := p.pool.Acquire(ctx) @@ -815,8 +815,8 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handle // TODO: There is currently no handling of listener disconnects in PgBackend. // This will lead to jobs not getting processed until the worker is restarted. // Implement disconnect handling. -func (p *PgBackend) listen(ctx context.Context, queue string) (c chan int64, ready chan bool) { - c = make(chan int64) +func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, ready chan bool) { + c = make(chan string) ready = make(chan bool) go func(ctx context.Context) { @@ -849,18 +849,12 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan int64, rea continue } - var jobID int64 - if jobID, err = strconv.ParseInt(notification.Payload, 0, 64); err != nil { - p.logger.Error("unable to fetch job", err) - continue - } - // check if Shutdown() has been called - if jobID == shutdownJobID { + if notification.Payload == shutdownJobID { return } - c <- jobID + c <- notification.Payload } }(ctx) @@ -881,7 +875,7 @@ func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue strin conn.Release() } -func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID int64) (job *jobs.Job, err error) { +func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) { row, err := tx.Query(ctx, PendingJobQuery, jobID) if err != nil { return @@ -895,7 +889,7 @@ func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID int64) ( return } -func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID int64, err error) { +func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID string, err error) { err = conn.QueryRow(ctx, PendingJobIDQuery, queue).Scan(&jobID) return } diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index cce01a0..2d2dc2a 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -174,7 +174,7 @@ func TestCron(t *testing.T) { }) h.WithOptions( - handler.Deadline(500*time.Millisecond), + handler.JobTimeout(500*time.Millisecond), handler.Concurrency(1), ) diff --git a/backends/redis/redis_backend.go b/backends/redis/redis_backend.go new file mode 100644 index 0000000..baefda1 --- /dev/null +++ b/backends/redis/redis_backend.go @@ -0,0 +1,312 @@ +package redis + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "os" + "runtime" + "strings" + "sync" + "time" + + "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/internal" + "github.com/acaloiaro/neoq/jobs" + "github.com/acaloiaro/neoq/logging" + "github.com/acaloiaro/neoq/types" + "github.com/hibiken/asynq" + "github.com/iancoleman/strcase" + "github.com/jsuar/go-cron-descriptor/pkg/crondescriptor" + "golang.org/x/exp/slog" +) + +var ( + // ErrInvalidAddr indicates that the provided address is not a valid redis connection string + ErrInvalidAddr = errors.New("invalid connecton string: see documentation for valid connection strings") +) + +// redisBackend is a Redis-backed neoq backend +type redisBackend struct { + types.Backend + client *asynq.Client + server *asynq.Server + mux *asynq.ServeMux + config *config.Config + logger logging.Logger + mu *sync.Mutex // mutext to protect mutating backend state + taskProvider *MemoryTaskConfigProvider + mgr *asynq.PeriodicTaskManager +} + +type MemoryTaskConfigProvider struct { + mu *sync.Mutex + configs []*asynq.PeriodicTaskConfig +} + +// NewMemoryTaskConfigProvider returns a new asynq MemoryTaskConfigProvider +func NewMemoryTaskConfigProvider() (p *MemoryTaskConfigProvider) { + p = &MemoryTaskConfigProvider{ + mu: &sync.Mutex{}, + configs: []*asynq.PeriodicTaskConfig{}, + } + return +} + +// GetConfigs returns this provider's periodic task configurations +func (m *MemoryTaskConfigProvider) GetConfigs() (c []*asynq.PeriodicTaskConfig, err error) { + m.mu.Lock() + cfgs := m.configs + m.mu.Unlock() + return cfgs, nil +} + +// addConfig adds a periodic task configuration to this provider's configs +func (m *MemoryTaskConfigProvider) addConfig(taskConfig *asynq.PeriodicTaskConfig) { + m.mu.Lock() + m.configs = append(m.configs, taskConfig) + m.mu.Unlock() +} + +// Backend is a [config.BackendInitializer] that initializes a new Redis-backed neoq backend +func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend, err error) { + b := &redisBackend{ + config: config.New(), + mu: &sync.Mutex{}, + logger: slog.New(slog.NewTextHandler(os.Stdout)), + taskProvider: NewMemoryTaskConfigProvider(), + } + + for _, opt := range opts { + opt(b.config) + } + + if b.config.ConnectionString == "" { + err = ErrInvalidAddr + return + } + + if b.config.BackendConcurrency <= 0 { + b.config.BackendConcurrency = runtime.NumCPU() + } + + clientOpt := asynq.RedisClientOpt{Addr: b.config.ConnectionString} + if b.config.BackendAuthPassword != "" { + clientOpt.Password = b.config.BackendAuthPassword + } + b.client = asynq.NewClient(clientOpt) + b.server = asynq.NewServer( + clientOpt, + asynq.Config{ + Concurrency: b.config.BackendConcurrency, + ShutdownTimeout: b.config.ShutdownTimeout, + }, + ) + + b.mux = asynq.NewServeMux() + + b.mgr, err = asynq.NewPeriodicTaskManager( + asynq.PeriodicTaskManagerOpts{ + RedisConnOpt: clientOpt, + PeriodicTaskConfigProvider: b.taskProvider, + SyncInterval: 500 * time.Millisecond, + SchedulerOpts: &asynq.SchedulerOpts{ + PostEnqueueFunc: func(info *asynq.TaskInfo, err error) { + if err != nil { + b.logger.Error("unable to schedule task", err) + } + }, + }, + }) + if err != nil { + err = fmt.Errorf("failed to initialize periodic task manager: %w", err) + log.Fatal(err) + } + + go func() { + if err := b.mgr.Run(); err != nil { + log.Fatal(err) + } + }() + + go func() { + if err := b.server.Run(b.mux); err != nil { + log.Fatal(err) + } + }() + + backend = b + + return backend, err +} + +// WithAddr configures neoq to connect to Redis with the given address +func WithAddr(addr string) config.Option { + return func(c *config.Config) { + c.ConnectionString = addr + } +} + +// WithPassword configures neoq to connect to Redis with the given password +func WithPassword(password string) config.Option { + return func(c *config.Config) { + c.BackendAuthPassword = password + } +} + +// WithConcurrency configures the number of workers available to process jobs across all queues +func WithConcurrency(concurrency int) config.Option { + return func(c *config.Config) { + c.BackendConcurrency = concurrency + } +} + +// WithShutdownTimeout specifies the duration to wait to let workers finish their tasks +// before forcing them to abort durning Shutdown() +// +// If unset or zero, default timeout of 8 seconds is used. +func WithShutdownTimeout(timeout time.Duration) config.Option { + return func(c *config.Config) { + c.ShutdownTimeout = timeout + } +} + +// Enqueue queues jobs to be executed asynchronously +func (b *redisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) { + if job.Queue == "" { + err = jobs.ErrNoQueueSpecified + return + } + + err = jobs.FingerprintJob(job) + if err != nil { + return + } + + var payload []byte + payload, err = json.Marshal(job.Payload) + if err != nil { + return + } + task := asynq.NewTask(job.Queue, payload) + if job.RunAfter.IsZero() { + _, err = b.client.EnqueueContext(ctx, task, asynq.TaskID(job.Fingerprint)) + } else { + _, err = b.client.EnqueueContext(ctx, task, asynq.TaskID(job.Fingerprint), asynq.ProcessAt(job.RunAfter)) + } + + if err != nil { + err = fmt.Errorf("unable to enqueue task: %w", err) + } + + return job.Fingerprint, err +} + +// Start starts processing jobs with the specified queue and handler +func (b *redisBackend) Start(_ context.Context, queue string, h handler.Handler) (err error) { + b.mux.HandleFunc(queue, func(ctx context.Context, t *asynq.Task) (err error) { + var p map[string]any + if err = json.Unmarshal(t.Payload(), &p); err != nil { + b.logger.Info("job has no payload") + } + + job := &jobs.Job{ + CreatedAt: time.Now(), + Queue: queue, + Payload: p, + } + + ctx = withJobContext(ctx, job) + err = handler.Exec(ctx, h) + if err != nil { + b.logger.Error("error handling job", err) + } + + return + }) + + return +} + +// StartCron starts processing jobs with the specified cron schedule and handler +// +// See: https://pkg.go.dev/github.com/robfig/cron?#hdr-CRON_Expression_Format for details on the cron spec format +func (b *redisBackend) StartCron(ctx context.Context, cronSpec string, h handler.Handler) (err error) { + cd, err := crondescriptor.NewCronDescriptor(cronSpec) + if err != nil { + return fmt.Errorf("error creating cron descriptor: %w", err) + } + + cdStr, err := cd.GetDescription(crondescriptor.Full) + if err != nil { + return fmt.Errorf("error getting cron description: %w", err) + } + + queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr)) + + err = b.Start(ctx, queue, h) + if err != nil { + return + } + + aCronSpec := toAsynqCronspec(cronSpec) + c := &asynq.PeriodicTaskConfig{ + Cronspec: aCronSpec, + Task: asynq.NewTask(queue, nil), + } + b.taskProvider.addConfig(c) + + return +} + +// Asynq does not currently support the seconds field in cron specs. However, it does supports seconds using the +// alternative syntax: @every Xs, where X is the number of seconds between executions +// +// Because of this, when cron specs have six fields (contain seconds), we must convert the seconds field to asynq +// seconds format. +// +// # This implementation is very crude, and unlikely covers the full breadth of the cron spec, panic()ing when necessary +// +// TODO: Refactor if asynq merges https://github.com/hibiken/asynq/pull/644 +func toAsynqCronspec(cronSpec string) string { + // nolint: nestif, gomnd + if strings.Count(cronSpec, "*") == 6 { + fields := strings.Fields(cronSpec) + // nolint: gomnd + if len(fields) == 6 { + secondsField := fields[0] + if secondsField == "*" { + return "@every 1s" + } + + // Handle cronspec divisor syntax, e.g. */30 is every 30 seconds, or @every 30s + if strings.Contains(secondsField, "/") { + seconds := strings.Split(secondsField, "/")[1] + return fmt.Sprintf("@every %ss", seconds) + } + } else { + panic(fmt.Sprintf("invalid cron spec: %s", cronSpec)) + } + } + + return cronSpec +} + +// SetLogger sets this backend's logger +func (b *redisBackend) SetLogger(logger logging.Logger) { + b.logger = logger +} + +// Shutdown halts the worker +func (b *redisBackend) Shutdown(ctx context.Context) { + b.client.Close() + b.server.Shutdown() +} + +// withJobContext creates a new context with the Job set +func withJobContext(ctx context.Context, j *jobs.Job) context.Context { + return context.WithValue(ctx, internal.JobCtxVarKey, j) +} diff --git a/backends/redis/redis_backend_test.go b/backends/redis/redis_backend_test.go new file mode 100644 index 0000000..0a80cf9 --- /dev/null +++ b/backends/redis/redis_backend_test.go @@ -0,0 +1,310 @@ +package redis + +import ( + "context" + "fmt" + "log" + "os" + "strings" + "testing" + "time" + + "github.com/acaloiaro/neoq" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/internal" + "github.com/acaloiaro/neoq/jobs" + "github.com/acaloiaro/neoq/testutils" + "github.com/hibiken/asynq" +) + +const queue = "testing" +const queue2 = "testing2" + +func init() { + var err error + var connString = os.Getenv("TEST_REDIS_URL") + if connString == "" { + return + } + + password := os.Getenv("REDIS_PASSWORD") + clientOpt := asynq.RedisClientOpt{Addr: connString} + if password != "" { + clientOpt.Password = password + } + inspector := asynq.NewInspector(clientOpt) + queues, err := inspector.Queues() + if err != nil { + panic(err) + } + + for _, queue := range queues { + _, err = inspector.DeleteAllPendingTasks(queue) + if err != nil { + panic(err) + } + + _, err = inspector.DeleteAllCompletedTasks(queue) + if err != nil { + panic(err) + } + + _, err = inspector.DeleteAllRetryTasks(queue) + if err != nil { + panic(err) + } + + _, err = inspector.DeleteAllArchivedTasks(queue) + if err != nil { + panic(err) + } + } +} + +func TestBasicJobProcessing(t *testing.T) { + var timeoutTimer = time.After(5 * time.Second) + var done = make(chan bool) + + var connString = os.Getenv("TEST_REDIS_URL") + if connString == "" { + t.Skip("Skipping: TEST_REDIS_URL not set") + return + } + + password := os.Getenv("REDIS_PASSWORD") + ctx := context.TODO() + nq, err := neoq.New( + ctx, + neoq.WithBackend(Backend), + WithAddr(connString), + WithPassword(password), + WithShutdownTimeout(time.Millisecond)) + if err != nil { + t.Fatal(err) + } + defer nq.Shutdown(ctx) + + h := handler.New(func(_ context.Context) (err error) { + done <- true + return + }) + + err = nq.Start(ctx, queue, h) + if err != nil { + t.Error(err) + } + + jid, e := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": fmt.Sprintf("hello world: %d", internal.RandInt(10000000000)), + }, + }) + if e != nil || jid == jobs.DuplicateJobID { + t.Error(e) + } + + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + case <-done: + log.Println("DONE") + } + + if err != nil { + t.Error(err) + } +} + +// TestBasicJobMultipleQueue tests that the redis backend is able to process jobs on multiple queues +func TestBasicJobMultipleQueue(t *testing.T) { + var done = make(chan bool) + var doneCnt = 0 + + var timeoutTimer = time.After(30 * time.Second) + + var connString = os.Getenv("TEST_REDIS_URL") + if connString == "" { + t.Skip("Skipping: TEST_REDIS_URL not set") + return + } + + password := os.Getenv("REDIS_PASSWORD") + ctx := context.TODO() + nq, err := neoq.New(ctx, + neoq.WithBackend(Backend), + WithAddr(connString), + WithPassword(password), + WithShutdownTimeout(time.Millisecond)) + if err != nil { + t.Fatal(err) + } + defer nq.Shutdown(ctx) + + h := handler.New(func(_ context.Context) (err error) { + done <- true + return + }) + + h2 := handler.New(func(_ context.Context) (err error) { + done <- true + return + }) + + err = nq.Start(ctx, queue, h) + if err != nil { + t.Error(err) + } + + err = nq.Start(ctx, queue2, h2) + if err != nil { + t.Error(err) + } + + jid, e := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": fmt.Sprintf("hello world: %d", internal.RandInt(10000000000)), + }, + }) + if e != nil || jid == jobs.DuplicateJobID { + t.Error(e) + } + + jid2, e := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue2, + Payload: map[string]interface{}{ + "message": fmt.Sprintf("hello world: %d", internal.RandInt(10000000000)), + }, + }) + if e != nil || jid2 == jobs.DuplicateJobID { + t.Error(e) + } + +results_loop: + for { + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + break results_loop + case <-done: + log.Println("DONE") + doneCnt++ + if doneCnt == 2 { + break results_loop + } + } + } + + if err != nil { + t.Error(err) + } +} + +func TestStartCron(t *testing.T) { + done := make(chan bool) + + var timeoutTimer = time.After(5 * time.Second) + + var connString = os.Getenv("TEST_REDIS_URL") + if connString == "" { + t.Skip("Skipping: TEST_REDIS_URL not set") + return + } + + password := os.Getenv("REDIS_PASSWORD") + ctx := context.TODO() + nq, err := neoq.New(ctx, neoq.WithBackend(Backend), WithAddr(connString), WithPassword(password)) + if err != nil { + t.Fatal(err) + } + defer nq.Shutdown(ctx) + + h := handler.New(func(_ context.Context) (err error) { + done <- true + return + }) + + err = nq.StartCron(ctx, "* * * * * *", h) + if err != nil { + t.Error(err) + } + + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + case <-done: + } + + if err != nil { + t.Error(err) + } +} + +func TestJobProcessingWithOptions(t *testing.T) { + const queue = "testing" + var timeoutTimer = time.After(5 * time.Second) + var done = make(chan bool) + + var connString = os.Getenv("TEST_REDIS_URL") + if connString == "" { + t.Skip("Skipping: TEST_REDIS_URL not set") + return + } + + password := os.Getenv("REDIS_PASSWORD") + ctx := context.TODO() + nq, err := neoq.New( + ctx, + neoq.WithBackend(Backend), + WithAddr(connString), + WithPassword(password), + WithShutdownTimeout(time.Millisecond)) + if err != nil { + t.Fatal(err) + } + defer nq.Shutdown(ctx) + + buf := &strings.Builder{} + nq.SetLogger(testutils.TestLogger{L: log.New(buf, "", 0), Done: done}) + + h := handler.New(func(_ context.Context) (err error) { + time.Sleep(50 * time.Millisecond) + done <- true + return + }) + h.WithOptions( + handler.JobTimeout(1*time.Millisecond), + handler.Concurrency(1), + ) + + err = nq.Start(ctx, queue, h) + if err != nil { + t.Error(err) + } + + jid, e := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": fmt.Sprintf("hello world: %d", internal.RandInt(10000000000)), + }, + }) + if e != nil || jid == jobs.DuplicateJobID { + t.Error(e) + } + + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + case <-done: + expectedLogMsg := "error handling job job exceeded its 1ms timeout: context deadline exceeded" //nolint: dupword + actualLogMsg := strings.Trim(buf.String(), "\n") + if actualLogMsg != expectedLogMsg { + t.Error(fmt.Errorf("%s != %s", actualLogMsg, expectedLogMsg)) //nolint:all + } + } + + if err != nil { + t.Error(err) + } +} diff --git a/config/config.go b/config/config.go index c58d9fe..f259159 100644 --- a/config/config.go +++ b/config/config.go @@ -17,12 +17,20 @@ const ( DefaultJobCheckInterval = 5 * time.Second ) +// Config configures neoq and its backends +// +// This configuration struct includes options for all backends. As such, some of its options are not implicable to all +// backends. [BackendConcurrency], for example, is only used by the redis backend. Other backends manage concurrency on a +// per-handler basis. type Config struct { BackendInitializer BackendInitializer + BackendAuthPassword string // password with which to authenticate to the backend's data provider + BackendConcurrency int // total number of backend processes available to process jobs ConnectionString string // a string containing connection details for the backend JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed + ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown } // Option is a function that sets optional backend configuration diff --git a/doc.go b/doc.go index bf25292..52bb592 100644 --- a/doc.go +++ b/doc.go @@ -1,8 +1,13 @@ -// Package neoq provides background job processing for Go applications. +// Package neoq is a queue-agnostic background job framework for Go. // -// Neoq's goal is to minimize the infrastructure necessary to add background job processing to -// Go applications. It does so by implementing queue durability with modular backends, rather -// than introducing a strict dependency on a particular backend such as Redis. +// Neoq job handlers are the same, whether queues are in-memory for development/testing, +// or Postgres, Redis, or a custom queue for production -- allowing queue infrastructure to +// change without code change. // -// An in-memory and Postgres backend are provided out of the box. +// Developing/testing or don't need a durable queue? Use the in-memory queue. +// Running an application in production? Use Postgres. +// Have higher throughput demands in production? Use Redis. + +// Neoq does not aim to be the _fastest_ background job processor. It aims to be _fast_, _reliable_, and demand a +// _minimal infrastructure footprint_. package neoq diff --git a/env.sample b/env.sample index fd2f701..4484946 100644 --- a/env.sample +++ b/env.sample @@ -1,3 +1,4 @@ TEST_DATABASE_URL= TEST_REDIS_URL= +REDIS_PASSWORD= diff --git a/examples/add_job_with_custom_concurrency/main.go b/examples/add_job_with_custom_concurrency/main.go index 778f53f..b41554c 100644 --- a/examples/add_job_with_custom_concurrency/main.go +++ b/examples/add_job_with_custom_concurrency/main.go @@ -54,7 +54,7 @@ func main() { <-done - // job's status will be 'failed' and 'error' will be 'job exceeded its 10ms deadline' - // until either the job's Sleep statement is decreased/removed or the handler's deadline is increased + // job's status will be 'failed' and 'error' will be 'job exceeded its 10ms timeout' + // until either the job's Sleep statement is decreased/removed or the handler's timeout is increased // this job will continue to fail and ultimately land on the dead jobs queue } diff --git a/examples/add_job_with_deadline/main.go b/examples/add_job_with_timeout/main.go similarity index 89% rename from examples/add_job_with_deadline/main.go rename to examples/add_job_with_timeout/main.go index 03482de..425309e 100644 --- a/examples/add_job_with_deadline/main.go +++ b/examples/add_job_with_timeout/main.go @@ -37,8 +37,8 @@ func main() { return }) - // this 10ms deadline will cause our job that sleeps for 1s to fail - h.WithOptions(handler.Deadline(10 * time.Millisecond)) + // this 10ms timeout will cause our job that sleeps for 1s to fail + h.WithOptions(handler.JobTimeout(10 * time.Millisecond)) err = nq.Start(ctx, queue, h) if err != nil { @@ -57,7 +57,7 @@ func main() { <-done - // job's status will be 'failed' and 'error' will be 'job exceeded its 10ms deadline' - // until either the job's Sleep statement is decreased/removed or the handler's deadline is increased + // job's status will be 'failed' and 'error' will be 'job exceeded its 10ms timeout' + // until either the job's Sleep statement is decreased/removed or the handler's timeout is increased // this job will continue to fail and ultimately land on the dead jobs queue } diff --git a/examples/add_periodic_jobs/main.go b/examples/add_periodic_jobs/main.go index 96bc8de..a95774f 100644 --- a/examples/add_periodic_jobs/main.go +++ b/examples/add_periodic_jobs/main.go @@ -23,7 +23,7 @@ func main() { return }) h.WithOptions( - handler.Deadline(500*time.Millisecond), + handler.JobTimeout(500*time.Millisecond), handler.Concurrency(1), ) diff --git a/examples/add_redis_job/main.go b/examples/add_redis_job/main.go new file mode 100644 index 0000000..889b66e --- /dev/null +++ b/examples/add_redis_job/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "context" + "log" + + "github.com/acaloiaro/neoq" + "github.com/acaloiaro/neoq/backends/redis" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/jobs" +) + +func main() { + + done := make(chan bool) + ctx := context.Background() + nq, _ := neoq.New(ctx, + neoq.WithBackend(redis.Backend), + redis.WithAddr("localhost:6379"), + redis.WithPassword(""), + ) + + nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { + j, _ := jobs.FromContext(ctx) + log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) + done <- true + return + })) + + nq.Enqueue(ctx, &jobs.Job{ + Queue: "hello_world", + Payload: map[string]interface{}{ + "message": "hello world", + }, + }) + + <-done + nq.Shutdown(ctx) +} diff --git a/go.mod b/go.mod index b9010fb..30103ca 100644 --- a/go.mod +++ b/go.mod @@ -13,13 +13,25 @@ require ( ) require ( + github.com/cespare/xxhash/v2 v2.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-redis/redis/v8 v8.11.2 // indirect + github.com/golang/protobuf v1.4.2 // indirect + github.com/google/uuid v1.2.0 // indirect + github.com/hibiken/asynq v0.24.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/puddle/v2 v2.2.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/spf13/cast v1.3.1 // indirect go.uber.org/atomic v1.10.0 // indirect + go.uber.org/goleak v1.1.12 // indirect go.uber.org/multierr v1.9.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect + golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect + google.golang.org/protobuf v1.25.0 // indirect ) diff --git a/go.sum b/go.sum index 10ff1bd..2bd0328 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,47 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-redis/redis/v8 v8.11.2 h1:WqlSpAwz8mxDSMCvbyz1Mkiqe0LE5OY4j3lgkvu1Ts0= +github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/guregu/null v4.0.0+incompatible h1:4zw0ckM7ECd6FNNddc3Fu4aty9nTlpkkzH7dPn4/4Gw= github.com/guregu/null v4.0.0+incompatible/go.mod h1:ePGpQaN9cw0tj45IR5E5ehMvsFlLlQZAkkOXZurJ3NM= +github.com/hibiken/asynq v0.24.0 h1:r1CiSVYCy1vGq9REKGI/wdB2D5n/QmtzihYHHXOuBUs= +github.com/hibiken/asynq v0.24.0/go.mod h1:FVnRfUTm6gcoDkM/EjF4OIh5/06ergCPUO6pS2B2y+w= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= @@ -23,22 +58,38 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= +github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= @@ -48,32 +99,105 @@ go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb h1:PaBZQdo+iSDyHT053FjUCgZQ/9uqVwPOcl7KSWhKn6w= golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/handler/handler.go b/handler/handler.go index 2db354a..a67eb95 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -9,7 +9,7 @@ import ( ) const ( - DefaultHandlerDeadline = 30 * time.Second + DefaultHandlerTimeout = 30 * time.Second ) var ( @@ -25,7 +25,7 @@ type Func func(ctx context.Context) error type Handler struct { Handle Func Concurrency int - Deadline time.Duration + JobTimeout time.Duration QueueCapacity int64 } @@ -39,12 +39,12 @@ func (h *Handler) WithOptions(opts ...Option) { } } -// Deadline configures handlers with a time deadline for every executed job -// The deadline is the amount of time that can be spent executing the handler's Func -// when a deadline is exceeded, the job is failed and enters its retry phase -func Deadline(d time.Duration) Option { +// JobTimeout configures handlers with a time deadline for every executed job +// The timeout is the amount of time that can be spent executing the handler's Func +// when a timeout is exceeded, the job fails and enters its retry phase +func JobTimeout(d time.Duration) Option { return func(h *Handler) { - h.Deadline = d + h.JobTimeout = d } } @@ -72,22 +72,22 @@ func New(f Func, opts ...Option) (h Handler) { h.WithOptions(opts...) - // default to running one fewer threads than CPUs + // default to running on as many goroutines as there are CPUs if h.Concurrency == 0 { - Concurrency(runtime.NumCPU() - 1)(&h) + Concurrency(runtime.NumCPU())(&h) } - // always set a job deadline if none is set - if h.Deadline == 0 { - Deadline(DefaultHandlerDeadline)(&h) + // always set a job timeout if none is set + if h.JobTimeout == 0 { + JobTimeout(DefaultHandlerTimeout)(&h) } return } -// Exec executes handler functions with a concrete time deadline +// Exec executes handler functions with a concrete timeout func Exec(ctx context.Context, handler Handler) (err error) { - deadlineCtx, cancel := context.WithDeadline(ctx, time.Now().Add(handler.Deadline)) + timeoutCtx, cancel := context.WithTimeout(ctx, handler.JobTimeout) defer cancel() var errCh = make(chan error, 1) @@ -104,10 +104,10 @@ func Exec(ctx context.Context, handler Handler) (err error) { err = fmt.Errorf("job failed to process: %w", err) } - case <-deadlineCtx.Done(): - ctxErr := deadlineCtx.Err() + case <-timeoutCtx.Done(): + ctxErr := timeoutCtx.Err() if errors.Is(ctxErr, context.DeadlineExceeded) { - err = fmt.Errorf("job exceeded its %s deadline: %w", handler.Deadline, ctxErr) + err = fmt.Errorf("job exceeded its %s timeout: %w", handler.JobTimeout, ctxErr) } else if errors.Is(ctxErr, context.Canceled) { err = ctxErr } else { diff --git a/jobs/jobs.go b/jobs/jobs.go index c0ca048..db601d9 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -20,8 +20,8 @@ var ( ) const ( - DuplicateJobID = -1 - UnqueuedJobID = -2 + DuplicateJobID = "-1" + UnqueuedJobID = "-2" ) // Job contains all the data pertaining to jobs diff --git a/neoq_test.go b/neoq_test.go index 6c95322..337a646 100644 --- a/neoq_test.go +++ b/neoq_test.go @@ -105,7 +105,7 @@ func TestStart(t *testing.T) { return }) h.WithOptions( - handler.Deadline(500*time.Millisecond), + handler.JobTimeout(500*time.Millisecond), handler.Concurrency(1), ) @@ -125,7 +125,7 @@ func TestStart(t *testing.T) { "message": fmt.Sprintf("hello world: %d", i), }, }) - if err != nil || jid == -1 { + if err != nil || jid == jobs.DuplicateJobID { t.Fatal("job was not enqueued. either it was duplicate or this error caused it:", err) } } @@ -169,7 +169,7 @@ func TestStartCron(t *testing.T) { }) h.WithOptions( - handler.Deadline(500*time.Millisecond), + handler.JobTimeout(500*time.Millisecond), handler.Concurrency(1), ) diff --git a/types/types.go b/types/types.go index 76dece1..17e27c6 100644 --- a/types/types.go +++ b/types/types.go @@ -15,7 +15,7 @@ import ( // - [pkg/github.com/acaloiaro/neoq/backends/postgres.PgBackend] type Backend interface { // Enqueue queues jobs to be executed asynchronously - Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, err error) + Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) // Start starts processing jobs with the specified queue and handler Start(ctx context.Context, queue string, h handler.Handler) (err error)