Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve inline testing example #75

Merged
merged 2 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,12 @@ func Push(mgr worker.Manager, job *faktory.Job) error {
}

func syntheticPush(mgr worker.Manager, job *faktory.Job) error {
if mgr.IsRegistered(job.Type) {
return mgr.Dispatch(job)
err := mgr.InlineDispatch(job)
if err != nil {
return errors.Wrap(err, "syntheticPush failed")
}

return fmt.Errorf("inline job execution failed, unregistered job type %s", job.Type)
return nil
}

func realPush(job *faktory.Job) error {
Expand Down
30 changes: 25 additions & 5 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,42 @@ func (mgr *Manager) Register(name string, fn Perform) {
}
}

// IsRegistered checks if a given job name is registered with the manager.
// isRegistered checks if a given job name is registered with the manager.
//
// mgr.IsRegistered("SomeJob")
func (mgr *Manager) IsRegistered(name string) bool {
// mgr.isRegistered("SomeJob")
func (mgr *Manager) isRegistered(name string) bool {
_, ok := mgr.jobHandlers[name]

return ok
}

// Dispatch immediately executes a job, including all middleware.
func (mgr *Manager) Dispatch(job *faktory.Job) error {
// dispatch immediately executes a job, including all middleware.
func (mgr *Manager) dispatch(job *faktory.Job) error {
perform := mgr.jobHandlers[job.Type]

return dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform)
}

// InlineDispatch is designed for testing. It immediate executes a job, including all middleware,
// as well as performs manager setup if needed.
func (mgr *Manager) InlineDispatch(job *faktory.Job) error {
if !mgr.isRegistered(job.Type) {
return fmt.Errorf("failed to dispatch inline for job type %s; job not registered", job.Type)
}

err := mgr.setUpWorkerProcess()
if err != nil {
return fmt.Errorf("couldn't set up worker process for inline dispatch - %w", err)
}

err = mgr.dispatch(job)
if err != nil {
return fmt.Errorf("job was dispatched inline but failed. Job type %s, with args %+v - %w", job.Type, job.Args, err)
}

return nil
}

// Register a callback to be fired when a process lifecycle event occurs.
// These are useful for hooking into process startup or shutdown.
func (mgr *Manager) On(event lifecycleEventType, fn LifecycleEventHandler) {
Expand Down
4 changes: 2 additions & 2 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func processOne(mgr *Manager) error {
}
}

if !mgr.IsRegistered(job.Type) {
if !mgr.isRegistered(job.Type) {
je := &NoHandlerError{JobType: job.Type}
err := mgr.with(func(c *faktory.Client) error {
return c.Fail(job.Jid, je, nil)
Expand All @@ -155,7 +155,7 @@ func processOne(mgr *Manager) error {
return je
}

joberr := mgr.Dispatch(job)
joberr := mgr.dispatch(job)
if joberr != nil {
// job errors are normal and expected, we don't return early from them
mgr.Logger.Errorf("Error running %s job %s: %v", job.Type, job.Jid, joberr)
Expand Down
Loading