Python 2 & 3 compatible.
A producer-consumer result store, backed by AMQP or Redis, that facilitates IPC between a client and a Celery worker process. Alternatively, it may be used in place of a result backend, or as a standalone result store for producer-consumer style applications. The consumer blocks until a message arrives, while the producer is fire-and-forget. With AMQP, the producer may optionally wait for an ack.
Installation:
pip install resultstore
Or, to install from source:
python setup.py install
[Standalone usage]
>>> from resultstore.amqp import BlockingProducer, BlockingConsumer, uid
>>> correlation_id = uid()
>>> p = BlockingProducer(task_id=correlation_id)
>>> c = BlockingConsumer(task_id=correlation_id)
>>> p.send_message('hello world!')
>>> print(c.get())
hello world!
>>>
The producer and consumer above can live in different processes, as long as they can communicate or otherwise agree upon a common task-id.
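The pairing by task-id can be illustrated without a broker. The following is only a minimal in-memory sketch (Python 3, standard library only), not how resultstore is implemented: the names SketchProducer, SketchConsumer and run_demo are hypothetical, uuid stands in for uid(), and threads stand in for separate processes.

```python
import queue
import threading
import uuid
from collections import defaultdict

# Hypothetical in-memory stand-in for a broker: one queue per task id.
_channels = defaultdict(queue.Queue)
_channels_lock = threading.Lock()


def _channel(task_id):
    # Look up (or lazily create) the queue shared by both ends.
    with _channels_lock:
        return _channels[task_id]


class SketchProducer:
    """Fire-and-forget sender bound to one task id (illustrative only)."""

    def __init__(self, task_id):
        self._queue = _channel(task_id)

    def send_message(self, message):
        # Returns immediately; no consumer has to be listening yet.
        self._queue.put(message)


class SketchConsumer:
    """Blocking receiver bound to one task id (illustrative only)."""

    def __init__(self, task_id):
        self._queue = _channel(task_id)

    def get(self, timeout=None):
        # Blocks until a message arrives; raises queue.Empty on timeout.
        return self._queue.get(timeout=timeout)


def run_demo():
    task_id = uuid.uuid4().hex  # stand-in for uid()
    consumer = SketchConsumer(task_id)

    def worker():
        # The worker only needs to know the shared task id.
        SketchProducer(task_id).send_message('hello world!')

    thread = threading.Thread(target=worker)
    thread.start()
    message = consumer.get(timeout=5)
    thread.join()
    return message


if __name__ == '__main__':
    print(run_demo())
```

The only thing the two ends share is the task id, which is why it has to be passed along (for example as a task keyword argument) when the producer and consumer are created in different places.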
[Usage with Celery]
AMQP producer-consumer
Client code (consumer): calls a Celery task asynchronously, then blocks until a message arrives from the Celery worker process.
from resultstore.amqp import BlockingConsumer, uid
task_id = uid()
consumer = BlockingConsumer(task_id)
# pass task_id along to celery task
async_result = my_celery_task.apply_async(args=(), kwargs=dict(task_id=task_id))
# Block on a message from Producer
message = consumer.get()
Task code (producer)
from resultstore.amqp import BlockingProducer
# app is a celery.Celery() instance
@app.task
def hello_world(*args, **kwargs):
    task_id = kwargs.get('task_id')
    # communicates with the consumer that was created with the matching task-id
    producer = BlockingProducer(task_id=task_id)
    # continue with task computation
    # ...
    # communicate with client
    producer.send_message('hello world!')
Note: The producer-consumer pair may be reversed. In other words, the task may be the consumer while the client code is the producer.
Redis producer-consumer
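Client code (consumer)
from pyredis import RedisConsumer
from resultstore.amqp import uid  # assumed: same id helper as in the standalone example
task_id = uid()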
consumer = RedisConsumer(task_id, poll_interval=0.5)
# pass task_id along to celery task
async_result = my_celery_task.apply_async(args=(), kwargs=dict(task_id=task_id))
# Block on a message from Producer
message = consumer.get()
Task code (producer)
from pyredis import RedisProducer
# app is a celery.Celery() instance
@app.task
def hello_world(*args, **kwargs):
    task_id = kwargs.get('task_id')
    # communicates with the consumer that was created with the matching task-id
    producer = RedisProducer(task_id=task_id)
    # continue with task computation
    # ...
    # communicate with client
    producer.send_message('hello world!')
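The poll_interval argument suggests that the Redis consumer blocks by polling. The sketch below shows what such a loop could look like, assuming that poll_interval is the number of seconds to sleep between checks. It is not resultstore's implementation: PollingConsumerSketch and run_polling_demo are hypothetical names, and a plain dict stands in for the Redis server.

```python
import threading
import time

_MISSING = object()  # sentinel, so that None can be a valid message


class PollingConsumerSketch:
    """Blocks in get() by polling a shared store (illustrative only)."""

    def __init__(self, store, task_id, poll_interval=0.5):
        self._store = store
        self._task_id = task_id
        self._poll_interval = poll_interval

    def get(self, timeout=None):
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            # Take the message if the producer has stored one.
            value = self._store.pop(self._task_id, _MISSING)
            if value is not _MISSING:
                return value
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError('no message for task %r' % self._task_id)
            # Nothing yet: sleep, then check again.
            time.sleep(self._poll_interval)


def run_polling_demo():
    store = {}  # stand-in for the Redis server
    task_id = 'demo-task'
    consumer = PollingConsumerSketch(store, task_id, poll_interval=0.05)

    def produce():
        time.sleep(0.1)  # simulate task computation
        store[task_id] = 'hello world!'  # fire-and-forget write

    thread = threading.Thread(target=produce)
    thread.start()
    message = consumer.get(timeout=5)
    thread.join()
    return message


if __name__ == '__main__':
    print(run_polling_demo())
```

Under this assumption, a smaller poll_interval lowers the delay before the consumer notices a message, at the cost of more round trips to the store.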