diff --git a/README.md b/README.md index aa3109a..3c307d8 100644 --- a/README.md +++ b/README.md @@ -156,11 +156,12 @@ func Push(mgr worker.Manager, job *faktory.Job) error { } func syntheticPush(mgr worker.Manager, job *faktory.Job) error { - if mgr.IsRegistered(job.Type) { - return mgr.Dispatch(job) + err := mgr.InlineDispatch(job) + if err != nil { + return errors.Wrap(err, "syntheticPush failed") } - return fmt.Errorf("inline job execution failed, unregistered job type %s", job.Type) + return nil } func realPush(job *faktory.Job) error { diff --git a/manager.go b/manager.go index 6337b3f..6ef453e 100644 --- a/manager.go +++ b/manager.go @@ -48,22 +48,42 @@ func (mgr *Manager) Register(name string, fn Perform) { } } -// IsRegistered checks if a given job name is registered with the manager. +// isRegistered checks if a given job name is registered with the manager. // -// mgr.IsRegistered("SomeJob") -func (mgr *Manager) IsRegistered(name string) bool { +// mgr.isRegistered("SomeJob") +func (mgr *Manager) isRegistered(name string) bool { _, ok := mgr.jobHandlers[name] return ok } -// Dispatch immediately executes a job, including all middleware. -func (mgr *Manager) Dispatch(job *faktory.Job) error { +// dispatch immediately executes a job, including all middleware. +func (mgr *Manager) dispatch(job *faktory.Job) error { perform := mgr.jobHandlers[job.Type] return dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform) } +// InlineDispatch is designed for testing. It immediate executes a job, including all middleware, +// as well as performs manager setup if needed. +func (mgr *Manager) InlineDispatch(job *faktory.Job) error { + if !mgr.isRegistered(job.Type) { + return fmt.Errorf("failed to dispatch inline for job type %s; job not registered", job.Type) + } + + err := mgr.setUpWorkerProcess() + if err != nil { + return fmt.Errorf("couldn't set up worker process for inline dispatch - %w", err) + } + + err = mgr.dispatch(job) + if err != nil { + return fmt.Errorf("job was dispatched inline but failed. Job type %s, with args %+v - %w", job.Type, job.Args, err) + } + + return nil +} + // Register a callback to be fired when a process lifecycle event occurs. // These are useful for hooking into process startup or shutdown. func (mgr *Manager) On(event lifecycleEventType, fn LifecycleEventHandler) { diff --git a/runner.go b/runner.go index f552324..2482c25 100644 --- a/runner.go +++ b/runner.go @@ -144,7 +144,7 @@ func processOne(mgr *Manager) error { } } - if !mgr.IsRegistered(job.Type) { + if !mgr.isRegistered(job.Type) { je := &NoHandlerError{JobType: job.Type} err := mgr.with(func(c *faktory.Client) error { return c.Fail(job.Jid, je, nil) @@ -155,7 +155,7 @@ func processOne(mgr *Manager) error { return je } - joberr := mgr.Dispatch(job) + joberr := mgr.dispatch(job) if joberr != nil { // job errors are normal and expected, we don't return early from them mgr.Logger.Errorf("Error running %s job %s: %v", job.Type, job.Jid, joberr)