
Remove add multiple handlers per queue (#42)
acaloiaro authored Apr 2, 2023
1 parent d47845d commit d8c6378
Showing 4 changed files with 43 additions and 94 deletions.
74 changes: 41 additions & 33 deletions backends/memory/memory_backend.go
@@ -31,9 +31,10 @@ type MemBackend struct {
     types.Backend
     config       *config.Config
     logger       logging.Logger
-    handlers     *sync.Map // map queue names [string] to queue handlers [*Handler]
+    handlers     *sync.Map // map queue names [string] to queue handlers [Handler]
     fingerprints *sync.Map // map fingerprints [string] to job [Job]
     futureJobs   *sync.Map // map jobIDs [int64] to job [Job]
+    queues       *sync.Map // map queue names [string] to queue handler channels [chan Job]
     cron         *cron.Cron
     mu           *sync.Mutex          // mutex to protect mutating state on a pgWorker
     cancelFuncs  []context.CancelFunc // A collection of cancel functions to be called upon Shutdown()
@@ -46,6 +47,7 @@ func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend,
         config:       config.New(),
         cron:         cron.New(),
         mu:           &sync.Mutex{},
+        queues:       &sync.Map{},
         handlers:     &sync.Map{},
         futureJobs:   &sync.Map{},
         fingerprints: &sync.Map{},
@@ -66,14 +68,15 @@ func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend,

 // Enqueue queues jobs to be executed asynchronously
 func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, err error) {
-    var ha any
+    var queueChan chan *jobs.Job
+    var qc any
     var ok bool

-    if ha, ok = m.handlers.Load(job.Queue); !ok {
+    if qc, ok = m.queues.Load(job.Queue); !ok {
         return jobs.UnqueuedJobID, fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, job.Queue)
     }

-    handlers := ha.([]*handler.Handler)
+    queueChan = qc.(chan *jobs.Job)

     // Make sure RunAfter is set to a non-zero value if not provided by the caller
     // if already set, schedule the future job
@@ -107,9 +110,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, err error) {
     jobID = m.jobCount

     if job.RunAfter.Equal(now) {
-        for _, h := range handlers {
-            h.JobsChannel <- job
-        }
+        queueChan <- job
     } else {
         m.queueFutureJob(job)
     }
@@ -119,30 +120,21 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, err error) {

 // Start starts processing jobs with the specified queue and handler
 func (m *MemBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) {
-    var ha any
-    var ok bool
-
     var queueCapacity = h.QueueCapacity
     if queueCapacity == emptyCapacity {
         queueCapacity = defaultMemQueueCapacity
     }

-    h.JobsChannel = make(chan *jobs.Job, queueCapacity)
-    if ha, ok = m.handlers.Load(queue); ok {
-        handlers := ha.([]*handler.Handler)
-        handlers = append(handlers, &h)
-        m.handlers.Store(queue, handlers)
-    } else {
-        m.handlers.Store(queue, []*handler.Handler{&h})
-    }
+    m.handlers.Store(queue, h)
+    m.queues.Store(queue, make(chan *jobs.Job, queueCapacity))

     ctx, cancel := context.WithCancel(ctx)

     m.mu.Lock()
     m.cancelFuncs = append(m.cancelFuncs, cancel)
     m.mu.Unlock()

-    err = m.start(ctx, queue, h)
+    err = m.start(ctx, queue)
     if err != nil {
         return
     }
@@ -200,19 +192,35 @@ func (m *MemBackend) Shutdown(ctx context.Context) {
 }

 // start starts a processor that handles new incoming jobs and future jobs
-func (m *MemBackend) start(ctx context.Context, queue string, h handler.Handler) (err error) {
+func (m *MemBackend) start(ctx context.Context, queue string) (err error) {
+    var queueChan chan *jobs.Job
+    var qc any
+    var ht any
+    var h handler.Handler
+    var ok bool
+
+    if ht, ok = m.handlers.Load(queue); !ok {
+        return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
+    }
+
+    if qc, ok = m.queues.Load(queue); !ok {
+        return fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, queue)
+    }
+
     go func() { m.scheduleFutureJobs(ctx, queue) }()

+    h = ht.(handler.Handler)
+    queueChan = qc.(chan *jobs.Job)
+
     for i := 0; i < h.Concurrency; i++ {
         go func() {
-            var retries int
             var err error
             var job *jobs.Job

             for {
                 select {
-                case job = <-h.JobsChannel:
-                    retries, err = m.handleJob(ctx, job, h)
+                case job = <-queueChan:
+                    err = m.handleJob(ctx, job, h)
                 case <-ctx.Done():
                     return
                 }
@@ -223,10 +231,8 @@ func (m *MemBackend) start(ctx context.Context, queue string, h handler.Handler) (err error) {
                 }

                 m.logger.Error("job failed", err, "job_id", job.ID)
-                runAfter := internal.CalculateBackoff(retries)
+                runAfter := internal.CalculateBackoff(job.Retries)
                 job.RunAfter = runAfter
-                job.Retries = retries
-                job.Error = null.StringFrom(err.Error())
                 m.queueFutureJob(job)
             }

@@ -246,18 +252,17 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context, queue string) {
     // loop over list of future jobs, scheduling goroutines to wait for jobs that are due within the next 30 seconds
     m.futureJobs.Range(func(_, v any) bool {
         job := v.(*jobs.Job)
+        var queueChan chan *jobs.Job

         timeUntilRunAfter := time.Until(job.RunAfter)
         if timeUntilRunAfter <= m.config.FutureJobWindow {
             m.removeFutureJob(job.ID)
             go func(j *jobs.Job) {
                 scheduleCh := time.After(timeUntilRunAfter)
                 <-scheduleCh
-                if ha, ok := m.handlers.Load(queue); ok {
-                    handlers := ha.([]*handler.Handler)
-                    for _, h := range handlers {
-                        h.JobsChannel <- j
-                    }
+                if qc, ok := m.queues.Load(queue); ok {
+                    queueChan = qc.(chan *jobs.Job)
+                    queueChan <- j
                 } else {
                     m.logger.Error(fmt.Sprintf("no queue processor for queue '%s'", queue), handler.ErrNoHandlerForQueue)
                 }
@@ -275,16 +280,19 @@
         }
     }

-func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Handler) (retries int, err error) {
+func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Handler) (err error) {
     ctx = withJobContext(ctx, job)

     // check if the job is being retried and increment retry count accordingly
     if job.Status != internal.JobStatusNew {
-        retries = job.Retries + 1
+        job.Retries++
     }

     // execute the queue handler of this job
     err = handler.Exec(ctx, h)
+    if err != nil {
+        job.Error = null.StringFrom(err.Error())
+    }

     return
 }
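For context on the retry path above: a failed job is re-enqueued as a future job whose RunAfter comes from internal.CalculateBackoff(job.Retries). That function's body is outside this diff; the sketch below shows one common shape for such a function — exponential growth with jitter — and is an assumption for illustration, not neoq's actual formula.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// calculateBackoff is a hypothetical stand-in for internal.CalculateBackoff:
// it returns when a job on its Nth retry should run again, doubling the
// delay on each attempt and adding jitter so retries don't synchronize.
func calculateBackoff(retries int) time.Time {
    const (
        base = 2 * time.Second
        max  = 10 * time.Minute
    )
    backoff := base << uint(retries) // 2s, 4s, 8s, ...
    if backoff <= 0 || backoff > max {
        backoff = max // clamp both shift overflow and very long delays
    }
    jitter := time.Duration(rand.Int63n(int64(backoff / 4))) // up to 25% extra
    return time.Now().Add(backoff + jitter)
}

func main() {
    for r := 0; r < 5; r++ {
        fmt.Printf("retry %d: run after %v\n", r, time.Until(calculateBackoff(r)).Round(time.Second))
    }
}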
3 changes: 2 additions & 1 deletion backends/memory/memory_backend_helper_test.go
@@ -22,7 +22,8 @@ func TestingBackend(conf *config.Config,
         config:       conf,
         cron:         c,
         mu:           &sync.Mutex{},
-        handlers:     &sync.Map{},
+        queues:       queues,
+        handlers:     h,
         futureJobs:   futureJobs,
         fingerprints: fingerprints,
         logger:       logger,
59 changes: 0 additions & 59 deletions backends/memory/memory_backend_test.go
@@ -230,62 +230,3 @@ func TestCron(t *testing.T) {
         t.Error(err)
     }
 }
-
-func TestFutureMultipleHandlerOnSameQueue(t *testing.T) {
-    ctx := context.Background()
-    var done = make(chan bool, 1)
-    doneCnt := 0
-    nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
-    if err != nil {
-        t.Fatal(err)
-    }
-    defer nq.Shutdown(ctx)
-
-    h1 := handler.New(func(ctx context.Context) (err error) {
-        done <- true
-        return
-    })
-
-    h2 := handler.New(func(ctx context.Context) (err error) {
-        done <- true
-        return
-    })
-
-    if err := nq.Start(ctx, queue, h1); err != nil {
-        t.Fatal(err)
-    }
-    if err := nq.Start(ctx, queue, h2); err != nil {
-        t.Fatal(err)
-    }
-
-    jid, err := nq.Enqueue(ctx, &jobs.Job{
-        Queue: queue,
-        Payload: map[string]interface{}{
-            "message": "hello multie queue world",
-        },
-    })
-    if err != nil || jid == jobs.DuplicateJobID {
-        err = fmt.Errorf("job was not enqueued. either it was duplicate or this error caused it: %w", err)
-        t.Error(err)
-    }
-
-    timeout := time.After(2 * time.Second)
-
-result_loop:
-    for {
-        select {
-        case <-timeout:
-            err = errors.New("timeout")
-            break result_loop
-        case <-done:
-            doneCnt++
-        }
-        if doneCnt == 2 {
-            break result_loop
-        }
-    }
-
-    if err != nil {
-        t.Error(err)
-    }
-}
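With handler accumulation gone, a second nq.Start on the same queue now overwrites the stored handler and channel (m.handlers.Store / m.queues.Store replace the entries) rather than fanning jobs out. A single-handler replacement for the deleted test could look like the following minimal sketch; it reuses only identifiers that already appear in this diff (neoq.New, memory.Backend, handler.New, nq.Start, nq.Enqueue, and the test package's queue name).

func TestSingleHandlerOnQueue(t *testing.T) {
    ctx := context.Background()
    done := make(chan bool, 1)

    nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
    if err != nil {
        t.Fatal(err)
    }
    defer nq.Shutdown(ctx)

    // exactly one handler per queue now; a second Start on the same queue
    // would replace this handler rather than add another
    h := handler.New(func(ctx context.Context) (err error) {
        done <- true
        return
    })

    if err := nq.Start(ctx, queue, h); err != nil {
        t.Fatal(err)
    }

    if _, err := nq.Enqueue(ctx, &jobs.Job{
        Queue:   queue,
        Payload: map[string]interface{}{"message": "hello single-handler world"},
    }); err != nil {
        t.Error(err)
    }

    select {
    case <-done:
    case <-time.After(2 * time.Second):
        t.Error(errors.New("timeout waiting for the job to run"))
    }
}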
1 change: 0 additions & 1 deletion handler/handler.go
@@ -30,7 +30,6 @@ type Handler struct {
     Concurrency   int
     Deadline      time.Duration
     QueueCapacity int64
-    JobsChannel   chan *jobs.Job
 }

 // Option is function that sets optional configuration for Handlers
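With JobsChannel removed, Handler carries only configuration, and the per-queue channel lives in the memory backend's queues map. The Option type referenced in the trailing comment is defined outside this diff; a plausible shape, with a hypothetical option name, might be:

// Assumed definition, matching the comment above; the real one is not
// shown in this diff.
type Option func(h *Handler)

// WithConcurrency is a hypothetical example option: it sets how many
// worker goroutines the memory backend's start() launches for a queue.
func WithConcurrency(n int) Option {
    return func(h *Handler) {
        h.Concurrency = n
    }
}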
