Skip to content

Commit

Permalink
Move handlers.FromJobContext() -> jobs.FromContext() (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro authored Apr 2, 2023
1 parent d8c6378 commit 81845a2
Show file tree
Hide file tree
Showing 10 changed files with 25 additions and 19 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Queue Handlers are simple Go functions that accept a `Context` parameter.
ctx := context.Background()
nq, _ := neoq.New(ctx)
nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
j, _ := handler.JobFromContext(ctx)
j, _ := jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
return
}))
Expand Down Expand Up @@ -82,7 +82,7 @@ nq, _ := neoq.New(ctx,
)

nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
j, _ := handler.JobFromContext(ctx)
j, _ := jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
return
}))
Expand Down
3 changes: 2 additions & 1 deletion backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job,
//
// ultimately, this means that any time a database connection is lost while updating job status, then the job will be
// processed at least one more time.
// nolint: cyclop
func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
status := internal.JobStatusProcessed
errMsg := ""
Expand All @@ -528,7 +529,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
}

var job *jobs.Job
if job, err = handler.JobFromContext(ctx); err != nil {
if job, err = jobs.FromContext(ctx); err != nil {
return fmt.Errorf("error getting job from context: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ const (
type Config struct {
BackendInitializer BackendInitializer
ConnectionString string // a string containing connection details for the backend
FutureJobWindow time.Duration // the window of time between now and RunAfter that goroutines are scheduled for future jobs
JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs
IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed
}

Expand Down
3 changes: 3 additions & 0 deletions env.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
TEST_DATABASE_URL=<TEST_DATABASE_URL>
TEST_REDIS_URL=<TEST_REDIS_URL>

2 changes: 1 addition & 1 deletion examples/add_job_with_custom_concurrency/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func main() {
// Option 1: add options when creating the handler
h := handler.New(func(ctx context.Context) (err error) {
var j *jobs.Job
j, err = handler.JobFromContext(ctx)
j, err = jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
done <- true
return
Expand Down
2 changes: 1 addition & 1 deletion examples/add_job_with_deadline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func main() {
h := handler.New(func(ctx context.Context) (err error) {
var j *jobs.Job
time.Sleep(1 * time.Second)
j, err = handler.JobFromContext(ctx)
j, err = jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
done <- true
return
Expand Down
2 changes: 1 addition & 1 deletion examples/add_postgres_job/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func main() {
h := handler.New(func(ctx context.Context) (err error) {
var j *jobs.Job
time.Sleep(1 * time.Second)
j, err = handler.JobFromContext(ctx)
j, err = jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
done <- true
return
Expand Down
2 changes: 1 addition & 1 deletion examples/start_processing_jobs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {

h := handler.New(func(ctx context.Context) (err error) {
var j *jobs.Job
j, err = handler.JobFromContext(ctx)
j, err = jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
return
})
Expand Down
11 changes: 0 additions & 11 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"runtime"
"time"

"github.com/acaloiaro/neoq/internal"
"github.com/acaloiaro/neoq/jobs"
)

Expand Down Expand Up @@ -120,13 +119,3 @@ func Exec(ctx context.Context, handler Handler) (err error) {

return
}

// JobFromContext fetches the job from a context if the job context variable is already set
func JobFromContext(ctx context.Context) (j *jobs.Job, err error) {
var ok bool
if j, ok = ctx.Value(internal.JobCtxVarKey).(*jobs.Job); ok {
return
}

return nil, ErrContextHasNoJob
}
13 changes: 13 additions & 0 deletions jobs/jobs.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package jobs

import (
"context"
"crypto/md5" // nolint: gosec
"encoding/json"
"errors"
"fmt"
"io"
"time"

"github.com/acaloiaro/neoq/internal"
"github.com/guregu/null"
)

var (
ErrContextHasNoJob = errors.New("context has no Job")
ErrJobTimeout = errors.New("timed out waiting for job(s)")
ErrNoQueueSpecified = errors.New("this job does not specify a queue. please specify a queue")
)
Expand Down Expand Up @@ -68,3 +71,13 @@ func FingerprintJob(j *Job) (err error) {

return
}

// FromContext fetches the job from a context if the job context variable is set
func FromContext(ctx context.Context) (j *Job, err error) {
var ok bool
if j, ok = ctx.Value(internal.JobCtxVarKey).(*Job); ok {
return
}

return nil, ErrContextHasNoJob
}

0 comments on commit 81845a2

Please sign in to comment.