From 81845a23036efdf5132fd878c3e783d5a9692de3 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Sat, 1 Apr 2023 17:54:05 -0700 Subject: [PATCH] Move handlers.FromJobContext() -> jobs.FromContext() (#41) --- README.md | 4 ++-- backends/postgres/postgres_backend.go | 3 ++- config/config.go | 2 +- env.sample | 3 +++ examples/add_job_with_custom_concurrency/main.go | 2 +- examples/add_job_with_deadline/main.go | 2 +- examples/add_postgres_job/main.go | 2 +- examples/start_processing_jobs/main.go | 2 +- handler/handler.go | 11 ----------- jobs/jobs.go | 13 +++++++++++++ 10 files changed, 25 insertions(+), 19 deletions(-) create mode 100644 env.sample diff --git a/README.md b/README.md index ca4cf06..64b02e6 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ Queue Handlers are simple Go functions that accept a `Context` parameter. ctx := context.Background() nq, _ := neoq.New(ctx) nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { - j, _ := handler.JobFromContext(ctx) + j, _ := jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) return })) @@ -82,7 +82,7 @@ nq, _ := neoq.New(ctx, ) nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { - j, _ := handler.JobFromContext(ctx) + j, _ := jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) return })) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 1091fbc..b6f396f 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -518,6 +518,7 @@ func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job, // // ultimately, this means that any time a database connection is lost while updating job status, then the job will be // processed at least one more time. +// nolint: cyclop func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { status := internal.JobStatusProcessed errMsg := "" @@ -528,7 +529,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { } var job *jobs.Job - if job, err = handler.JobFromContext(ctx); err != nil { + if job, err = jobs.FromContext(ctx); err != nil { return fmt.Errorf("error getting job from context: %w", err) } diff --git a/config/config.go b/config/config.go index 8ec1b40..e31b1ad 100644 --- a/config/config.go +++ b/config/config.go @@ -21,8 +21,8 @@ const ( type Config struct { BackendInitializer BackendInitializer ConnectionString string // a string containing connection details for the backend - FutureJobWindow time.Duration // the window of time between now and RunAfter that goroutines are scheduled for future jobs JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs + FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed } diff --git a/env.sample b/env.sample new file mode 100644 index 0000000..fd2f701 --- /dev/null +++ b/env.sample @@ -0,0 +1,3 @@ +TEST_DATABASE_URL= +TEST_REDIS_URL= + diff --git a/examples/add_job_with_custom_concurrency/main.go b/examples/add_job_with_custom_concurrency/main.go index 9730af6..778f53f 100644 --- a/examples/add_job_with_custom_concurrency/main.go +++ b/examples/add_job_with_custom_concurrency/main.go @@ -27,7 +27,7 @@ func main() { // Option 1: add options when creating the handler h := handler.New(func(ctx context.Context) (err error) { var j *jobs.Job - j, err = handler.JobFromContext(ctx) + j, err = jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) done <- true return diff --git a/examples/add_job_with_deadline/main.go b/examples/add_job_with_deadline/main.go index 33fb364..03482de 100644 --- a/examples/add_job_with_deadline/main.go +++ b/examples/add_job_with_deadline/main.go @@ -31,7 +31,7 @@ func main() { h := handler.New(func(ctx context.Context) (err error) { var j *jobs.Job time.Sleep(1 * time.Second) - j, err = handler.JobFromContext(ctx) + j, err = jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) done <- true return diff --git a/examples/add_postgres_job/main.go b/examples/add_postgres_job/main.go index e7c031d..308bfb9 100644 --- a/examples/add_postgres_job/main.go +++ b/examples/add_postgres_job/main.go @@ -28,7 +28,7 @@ func main() { h := handler.New(func(ctx context.Context) (err error) { var j *jobs.Job time.Sleep(1 * time.Second) - j, err = handler.JobFromContext(ctx) + j, err = jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) done <- true return diff --git a/examples/start_processing_jobs/main.go b/examples/start_processing_jobs/main.go index 7632929..fea70cb 100644 --- a/examples/start_processing_jobs/main.go +++ b/examples/start_processing_jobs/main.go @@ -23,7 +23,7 @@ func main() { h := handler.New(func(ctx context.Context) (err error) { var j *jobs.Job - j, err = handler.JobFromContext(ctx) + j, err = jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) return }) diff --git a/handler/handler.go b/handler/handler.go index da78d57..755e4d6 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -7,7 +7,6 @@ import ( "runtime" "time" - "github.com/acaloiaro/neoq/internal" "github.com/acaloiaro/neoq/jobs" ) @@ -120,13 +119,3 @@ func Exec(ctx context.Context, handler Handler) (err error) { return } - -// JobFromContext fetches the job from a context if the job context variable is already set -func JobFromContext(ctx context.Context) (j *jobs.Job, err error) { - var ok bool - if j, ok = ctx.Value(internal.JobCtxVarKey).(*jobs.Job); ok { - return - } - - return nil, ErrContextHasNoJob -} diff --git a/jobs/jobs.go b/jobs/jobs.go index ea6ed19..c0ca048 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -1,6 +1,7 @@ package jobs import ( + "context" "crypto/md5" // nolint: gosec "encoding/json" "errors" @@ -8,10 +9,12 @@ import ( "io" "time" + "github.com/acaloiaro/neoq/internal" "github.com/guregu/null" ) var ( + ErrContextHasNoJob = errors.New("context has no Job") ErrJobTimeout = errors.New("timed out waiting for job(s)") ErrNoQueueSpecified = errors.New("this job does not specify a queue. please specify a queue") ) @@ -68,3 +71,13 @@ func FingerprintJob(j *Job) (err error) { return } + +// FromContext fetches the job from a context if the job context variable is set +func FromContext(ctx context.Context) (j *Job, err error) { + var ok bool + if j, ok = ctx.Value(internal.JobCtxVarKey).(*Job); ok { + return + } + + return nil, ErrContextHasNoJob +}