This project is a complete worker and client implementation for the Faktory job server. You can use it to either consume jobs from Faktory or push jobs to the Faktory server to be processed.
Requires Python 3.7+.
Supported Faktory versions:

- ❌ 0.5.0
- ✅ 0.6
- ✅ 0.7
- ✅ 0.8
- ✅ 1.0 and up
- Creating a worker to run jobs from Faktory
- Concurrency (with multiple processes, or threads with the `use_threads=True` option)
- Pushing work to Faktory from Python (with retries, custom metadata and scheduled jobs)
- Pushing exceptions/errors from Python back up to Faktory
- Sends worker status back to Faktory
- Supports quiet and terminate from the Faktory web UI
- Password authentication
- TLS support
- Graceful worker shutdown (Ctrl-C will allow 15s for pending jobs to finish)
- Documentation (in progress, help would be appreciated)
- Tests (in progress, help would be appreciated)
- Django integration (`./manage.py runworker` and `app/tasks.py` support)
```
pip install faktory
```
There is a client context manager that you can use like this:
```python
import faktory

with faktory.connection() as client:
    client.queue('test', args=(1, 2))
    client.queue('test', args=(4, 5), queue='other')
```
`test` doesn't need to be implemented by the Python worker; it can be any of the available worker implementations.
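The feature list above mentions retries, custom metadata and scheduled jobs. Here is a hedged sketch of what pushing such a job might look like: `args` and `queue` appear in the example above, but the `retry`, `custom` and `at` keyword names are assumptions mirroring the Faktory job payload fields, so check your client version's `queue` signature before relying on them:

```python
from datetime import datetime, timedelta, timezone

import faktory

with faktory.connection() as client:
    # retry / custom / at are assumed keyword names, taken from the
    # corresponding Faktory job fields; verify against the client source.
    client.queue(
        'send_report',              # illustrative job name
        args=(42,),
        queue='reports',
        retry=3,                    # Faktory 'retry' field
        custom={'tenant': 'acme'},  # Faktory 'custom' field: arbitrary metadata
        # Faktory's 'at' field is an RFC3339 timestamp; whether the client
        # accepts a datetime or wants a pre-formatted string is an assumption.
        at=(datetime.now(timezone.utc) + timedelta(hours=1)).isoformat(),
    )
```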
To create a Faktory worker (to process jobs from the server), you'll need something like this:
```python
from faktory import Worker

def your_function(x, y):
    return x + y

w = Worker(queues=['default'], concurrency=1)
w.register('test', your_function)
w.run()  # runs until Ctrl-C or worker shutdown from the Faktory web UI
```
The default concurrency mode uses a `ProcessPoolExecutor`: multiple processes are started, the number controlled by the `concurrency` keyword argument of the `Worker` class. Processes are started only once and stay up, processing jobs from the queue. You can use threads instead of processes as the concurrency mechanism by passing `use_threads=True` when creating the `Worker`. As with processes, threads are started once and reused for each job. When doing so, be mindful of the consequences of using threads in your code, such as concurrent access to global variables, or the fact that initialization code outside the registered functions runs only once at worker startup, not once per thread.
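As a concrete illustration, here is a minimal sketch of a thread-based worker; the job name and function are made up, but the `Worker` arguments are the ones described above:

```python
from faktory import Worker

def send_email(to, subject):
    # Runs in a worker thread: all threads share one interpreter, so
    # guard any mutable global state with locks.
    print(f"sending {subject!r} to {to}")

# use_threads=True swaps the process pool for threads; `concurrency`
# still controls how many jobs run at the same time.
w = Worker(queues=['default'], concurrency=4, use_threads=True)
w.register('send_email', send_email)
w.run()
```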
There is a very basic example worker and an example producer that you can use as a basis for your project.
faktory_worker_python uses this format for the Faktory URL:
```
tcp://:password@localhost:7419
```
or with TLS:
```
tcp+tls://:password@localhost:7419
```
If the environment variable `FAKTORY_URL` is set, that is used. Otherwise you can pass the server URL to the `Worker` or `Client` constructor, like this:
```python
w = Worker(faktory="tcp://localhost:7419")
```
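Equivalently, a minimal sketch relying on the environment variable instead (setting it from Python here purely for illustration):

```python
import os

from faktory import Worker

# FAKTORY_URL takes precedence as described above; normally you would
# set it in the shell or your process manager rather than in code.
os.environ['FAKTORY_URL'] = 'tcp://:password@localhost:7419'

w = Worker(queues=['default'], concurrency=1)  # no faktory= argument needed
```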
The worker uses Python's built-in `logging` module, which you can enable like this before calling `.run()`:
```python
import logging

logging.basicConfig(level=logging.DEBUG)
```
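If you want debug output from this library without debug noise from the rest of your application, you can raise the root level and lower only this library's logger; the logger name `faktory` is an assumption based on the package name:

```python
import logging

logging.basicConfig(level=logging.WARNING)
# Assumed logger name; most libraries log under their package name.
logging.getLogger('faktory').setLevel(logging.DEBUG)
```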
When using the default multiprocessing mode of concurrency, the underlying process pool uses the standard library's `pickle` module to serialize registered functions. However, a function can only be pickled if it is defined directly at the top level of a module. If the function is instead produced by a decorator, pickling won't work. A workaround for this issue is to switch the concurrency mode and use threads instead:
```python
w = Worker(..., use_threads=True)
```
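For illustration, a sketch of the failure mode with a hypothetical decorator; the decorated `add` is really the inner `wrapper`, which `pickle` cannot locate by its qualified name:

```python
from faktory import Worker

def logged(fn):
    # A plain decorator: the returned wrapper is defined inside
    # logged(), so its qualified name is 'logged.<locals>.wrapper'.
    def wrapper(*args, **kwargs):
        print(f"running {fn.__name__}")
        return fn(*args, **kwargs)
    return wrapper

@logged
def add(x, y):
    return x + y

# With the default process pool, submitting 'add' would fail to pickle.
# With threads nothing is pickled, so the decorated function works:
w = Worker(queues=['default'], concurrency=2, use_threads=True)
w.register('add', add)
```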