A wrapper around Pika for asyncio and humans. See the examples and the tutorial in the documentation.
If you are new to RabbitMQ, start with the adapted official RabbitMQ tutorial.
- Completely asynchronous API.
- Object oriented API.
- Transparent auto-reconnects with complete state recovery via connect_robust (e.g. declared queues and exchanges, consuming state, and bindings).
- Python 3.5+ compatible (including 3.7).
- For Python 3.4 users, aio-pika<4 is available.
- Transparent publisher confirms support.
- Transactions support.
pip install aio-pika
Simple consumer:
    import asyncio
    import aio_pika


    async def main(loop):
        connection = await aio_pika.connect_robust(
            "amqp://guest:guest@127.0.0.1/", loop=loop
        )

        async with connection:
            queue_name = "test_queue"

            # Creating channel
            channel = await connection.channel()    # type: aio_pika.Channel

            # Declaring queue
            queue = await channel.declare_queue(
                queue_name,
                auto_delete=True
            )   # type: aio_pika.Queue

            async for message in queue:
                with message.process():
                    print(message.body)

                    if queue.name in message.body.decode():
                        break


    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loop))
        loop.close()
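The connection string passed to connect_robust is an ordinary URL, so its parts can be inspected with the standard library. The sketch below is only an illustration: describe_amqp_url is a hypothetical helper, not part of aio-pika, and it assumes that the default AMQP port 5672 applies when the URL does not name one.

```python
from urllib.parse import urlsplit

DEFAULT_AMQP_PORT = 5672  # assumption: RabbitMQ's default AMQP port


def describe_amqp_url(url):
    """Split an AMQP URL into its parts (hypothetical helper, not an aio-pika API)."""
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        # No explicit port in the URL: fall back to the assumed default.
        "port": parts.port or DEFAULT_AMQP_PORT,
        "path": parts.path,
    }


if __name__ == "__main__":
    print(describe_amqp_url("amqp://guest:guest@127.0.0.1/"))
```

This can help when a connection fails and you want to confirm which user, host, and port the URL actually encodes.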
Simple publisher:
    import asyncio
    import aio_pika


    async def main(loop):
        connection = await aio_pika.connect_robust(
            "amqp://guest:guest@127.0.0.1/", loop=loop
        )

        routing_key = "test_queue"

        channel = await connection.channel()    # type: aio_pika.Channel

        await channel.default_exchange.publish(
            aio_pika.Message(
                body='Hello {}'.format(routing_key).encode()
            ),
            routing_key=routing_key
        )

        await connection.close()


    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loop))
        loop.close()
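The publisher and the simple consumer are designed to work together: the consumer leaves its loop once a message body contains the queue name, and the publisher sends exactly such a body. The following sketch replays that check without a broker, using only plain strings and bytes; the variable names are illustrative.

```python
queue_name = "test_queue"

# What the publisher sends: the formatted text encoded to bytes
# (str.encode() defaults to UTF-8).
body = "Hello {}".format(queue_name).encode()

# What the consumer checks: the body decoded back to text must
# contain the queue name for the loop to stop.
should_stop = queue_name in body.decode()

print(body)
print(should_stop)
```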
Get single message example:
    import asyncio
    from aio_pika import connect_robust, Message


    async def main(loop):
        connection = await connect_robust(
            "amqp://guest:guest@127.0.0.1/",
            loop=loop
        )

        queue_name = "test_queue"
        routing_key = "test_queue"

        # Creating channel
        channel = await connection.channel()

        # Declaring exchange
        exchange = await channel.declare_exchange('direct', auto_delete=True)

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        # Binding queue
        await queue.bind(exchange, routing_key)

        await exchange.publish(
            Message(
                bytes('Hello', 'utf-8'),
                content_type='text/plain',
                headers={'foo': 'bar'}
            ),
            routing_key
        )

        # Receiving message
        incoming_message = await queue.get(timeout=5)

        # Confirm message
        incoming_message.ack()

        await queue.unbind(exchange, routing_key)
        await queue.delete()
        await connection.close()


    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loop))
See other examples and the tutorial in the documentation.
This software follows Semantic Versioning.
Clone the project:
git clone https://github.com/mosquito/aio-pika.git
cd aio-pika
Create a new virtualenv for `aio_pika`_:
virtualenv -p python3.5 env
Install all requirements for `aio_pika`_:
env/bin/pip install -e '.[develop]'
NOTE: In order to run the tests locally you need to run a RabbitMQ instance with default user/password (guest/guest) and port (5672).
- ProTip: Use Docker for this:
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
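Before running the tests it can be handy to confirm that something is listening on the broker port. The sketch below is a plain TCP check using only asyncio; broker_is_reachable is a hypothetical helper, it does not speak AMQP or verify credentials, and it uses asyncio.run, which requires Python 3.7 or newer.

```python
import asyncio


async def broker_is_reachable(host="127.0.0.1", port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout
        )
    except (OSError, asyncio.TimeoutError):
        # Refused, unreachable, or too slow: treat as not reachable.
        return False
    # Connection worked; close it cleanly before reporting success.
    writer.close()
    await writer.wait_closed()
    return True


if __name__ == "__main__":
    print(asyncio.run(broker_is_reachable()))
```

A True result only means the port accepts connections; the test suite still needs the guest/guest account to be available.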
To test just run:
make test
Feel free to create a pull request, but please describe your use cases and add some examples.
The changes should follow simple rules:
- When your changes break the public API, you must increase the major version.
- When your changes are safe for the public API (e.g. an added argument with a default value), you must increase the minor version.
- You have to add test cases (see tests/ folder)
- You must add docstrings
- Feel free to add yourself to the "thank's to" section.
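The versioning rules can be pictured with a small sketch. It assumes plain MAJOR.MINOR.PATCH version strings and, following Semantic Versioning, that a change which is safe for the public API increments the minor version; bump_version is a hypothetical helper and not part of the project's tooling.

```python
def bump_version(version, breaks_public_api):
    """Return the next version string (hypothetical helper for illustration)."""
    major, minor, _patch = (int(part) for part in version.split("."))
    if breaks_public_api:
        # Breaking change: new major version, minor and patch reset to zero.
        return "{}.0.0".format(major + 1)
    # Backward-compatible change: new minor version, patch resets to zero.
    return "{}.{}.0".format(major, minor + 1)


if __name__ == "__main__":
    print(bump_version("4.9.1", breaks_public_api=True))
    print(bump_version("4.9.1", breaks_public_api=False))
```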