
Specific workers per job type(s) #291

Open
kydemy-fran opened this issue Aug 4, 2024 · 3 comments

Comments

@kydemy-fran

kydemy-fran commented Aug 4, 2024

Hi, thank you for this awesome library.
We have been using it for a while now, but we have a new requirement in our projects that I am not sure we can implement with the current functionality.

Basically, we have several services running; each one has its own job types, and they all run in parallel against the same DB.
But now we have a new service that needs several job types within the same queue, and those job types are of a different nature: some tasks are really fast (<20 ms), others really slow (>2 minutes).

Problem A

We need many workers for the quick tasks, and only a couple of workers for the slow tasks.
Right now you can only specify the number of workers per pool/queue.

Problem B (the biggest one)

The quick tasks must be processed in near real time; the slow tasks can wait.
If I create a workmap for this queue and we have many slow tasks, the workers get stuck on the slow tasks, and the quick tasks are not resolved until all the slow tasks created before them are processed.
(This happens when tasks fail: they are prioritized mainly by run_at, and the priority is ignored. Unfortunately, we have a lot of failures due to unreliable third-party services.)
If we separate the tasks into different workmaps or worker pools, then, because they share the same queue, we get a lot of jobs with an unknown type, as each pool tries to pull and lock job types belonging to the other. (Even if we handle the unknown type, we do not want those tasks to be locked constantly until the right pool picks them up.)
This is also a problem if your queue contains more job types than your service understands, since several services may listen on the same queue with workers/handlers for different job types. Even if you can ignore them, right now a worker fetches jobs of any type, which is not ideal/optimal.

Solutions?

So, first of all, is there a way to achieve this with the current v5 version? (sorry if we have missed it)

If it is not possible, would you be ok for us to create a pull request with the following changes:

  • Create a WorkerOption WithWorkerJobTypes(types ...string) and store these job types in the worker
  • When the polling query is generated, if job types are defined, add them to the WHERE clause of the query: job_type = ANY(types)
  • Add the queue_type column to the index in the sample migration (or provide both options)

I believe with these changes:

  • You can have several services (or pools) listening only for the job types they understand. In other words: you can share queues among services without conflicts or query collisions.
  • We can create different pools with different numbers of workers using the same queue, and only fetch the intended job_types.
  • It gives a simpler way to manage tasks without having to configure an unknown-job-type handler.

Let us know how we can proceed.
Thank you!

@vgarvardt
Owner

The easiest way for you would be to use different queues - one for slow and one for fast tasks. This way you would be able to scale the workers in the pool for fast and slow jobs independently. A queue is the entity for grouping similar tasks.

Another option could be task priorities - worker and pool have the options WithWorkerPollStrategy/WithPoolPollStrategy, so you can use PriorityPollStrategy and assign a higher priority to the fast-running jobs and a lower one to the slow-running ones.

Another option could be to use the WithWorkerUnknownJobWorkFunc/WithPoolUnknownJobWorkFunc worker/pool hooks to handle unknown job types, and I would combine it with the first approach - different queues. That way you do not need to change the logic for enqueuing jobs, but route slow tasks to another queue with its own worker once the main worker that handles fast jobs finds them.

@kydemy-fran
Author

kydemy-fran commented Aug 4, 2024

> The easiest way for you would be to use different queues - one for slow and one for fast tasks. This way you would be able to scale the workers in the pool for fast and slow jobs independently. A queue is the entity for grouping similar tasks.

We already have more than 10 queues (named after services). If we need to create new queue names per queue+job-group combination, it is going to get very messy.
Maybe it is just us, but it made sense to name queues per use case or per service. In those cases, you might have many job types that need different handling.

> Another option could be task priorities - worker and pool have the options WithWorkerPollStrategy/WithPoolPollStrategy, so you can use PriorityPollStrategy and assign a higher priority to the fast-running jobs and a lower one to the slow-running ones.

The problem here is with the failed jobs: I believe they are always retried according to run_at and not priority. Anyhow, if our workers are all busy processing slow jobs, there will not be any available worker for the quick jobs when they get registered.

> Another option could be to use the WithWorkerUnknownJobWorkFunc/WithPoolUnknownJobWorkFunc worker/pool hooks to handle unknown job types, and I would combine it with the first approach - different queues. That way you do not need to change the logic for enqueuing jobs, but route slow tasks to another queue with its own worker once the main worker that handles fast jobs finds them.

This is what we were doing, but either it gets much more complex, or many useless queries get generated, along with locks on the table/rows that prevent other workers from picking up those jobs.

Please take a look at the PR and let me know if it is acceptable (it is backwards compatible).
Thank you!

@vgarvardt
Owner

> The problem here is with the failed jobs: I believe they are always retried according to run_at and not priority

Retry jobs work the same way as regular jobs - the worker is not aware that a job failed previously; the error count and last error are available to the caller only.
