-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reconnect and shared connection #237
Comments
Hi Arseny, Thank you for the reproducible example 👍 The code does not correctly initialize the client object. Specifically, you need to call
Very true; that's a nice opportunity for a self-contained first contribution to update the docs on that 😉 |
Hi, I ran and debugged the example, Client initialization is placed in the event, otherwise different event loop errors will be generated, sleep for 0.5s to switch to connect. This way is not recommended. I'm still thinking about how to gracefully handle the two scenarios of receiving messages in an infinite loop and sending messages through routing in the web framework. import uvicorn
import asyncio
from fastapi import FastAPI
from aiomqtt import Client, MqttError
def create_app():
app = FastAPI()
@app.on_event('startup')
async def startup():
loop = asyncio.get_running_loop()
+ client = Client(hostname="localhost")
loop.create_task(consume_mqtt(client), name='blabla')
+ await asyncio.sleep(0.5)
loop.create_task(publish_some_messages(client))
return app
async def _handle_mqtt_msg(message):
print(message.payload)
async def consume_mqtt(client):
reconnect_interval = 5
while True:
try:
await client.__aenter__()
async with client.messages() as messages:
await client.subscribe('test')
await client.publish(topic='test', payload='test')
async for message in messages:
await _handle_mqtt_msg(message=message)
except MqttError as e:
print(e)
await client.__aexit__(None, None, None)
await asyncio.sleep(reconnect_interval)
async def publish_some_messages(client):
await client.publish(topic='test', payload='test')
if __name__ == '__main__':
uvicorn.run(app=create_app(), access_log=False)
|
Seems like a worry-free solution, The disadvantage is if routing is not restricted, concurrency will lead to too many connections. import uvicorn
import asyncio
from fastapi import FastAPI
from aiomqtt import Client, MqttError, ProtocolVersion
app = FastAPI()
@app.on_event('startup')
async def startup():
asyncio.create_task(consume_mqtt())
@app.on_event("shutdown")
async def shutdown():
...
async def consume_mqtt():
reconnect_interval = 5
while True:
try:
async with Client(hostname="localhost") as client:
await client.subscribe('vehicles/v1')
await client.publish(topic='vehicles/v1', payload='test')
async with client.messages() as messages:
async for message in messages:
await asyncio.sleep(0.1)
print(message.payload)
except MqttError as e:
print(e)
await asyncio.sleep(reconnect_interval)
@app.get("/qwer")
async def q():
async with Client(hostname="localhost") as client:
await client.publish(topic='vehicles/v1', payload='test')
return "success"
if __name__ == '__main__':
uvicorn.run(app=app, access_log=False) |
I also have this issue where I have a consumer and producer task and At first I got However, in attempting to correct this error and removing the redundant Your proposed solution @vvanglro would not work for me as I am connecting to IoT Core which only allows a single connection per Client ID, so I wish to re-use the same Client and therefore client ID for both subscribe and publish operations. I'm using aiomqtt for the first time today and am also fairly new to asyncio in general so I may be doing something wrong, but I will just have to use the redundant context manager for now until I can perhaps provide more detail/reproduction steps in addition to the author's. |
Hi Dylan, Maybe the documentation on sharing the connection might help you? From what you wrote I don't believe your problem has the same root cause as this issue. We also have some information specifically on aiomqtt with FastAPI in case you haven't seen that, yet. Let me know if that helps 😋 |
I was following your Sharing the connection doc already, but with the added complexity of a TaskGroup. Attached is my reproduction of the issue, cut down as much as I could before losing all semblance of representing my actual code. The jist is that I have two classes, one for handling messages to and from an MQTT broker and another for to and from RabbitMQ. That is why I have a TaskGroup, to subscribe to both brokers and forward messages from one to the other. I've mocked out the MQ side in this base case. Thankfully, I think I found the issue. (Or at least the most visible one. I know not what other sins I may be committing with your library. For one I should probably now move the retry loop to main() to go along with the context manager.) Running the code as-is I get an error on publishing that the client isn't connected, however, simply switching the ordering of the context managers, the TaskGroup and mqtt_client in main(), that solves my issue! Not sure if this is expected or not. My issue may now be unrelated to the original author's issue, but I along with them and perhaps others could probably benefit from some documentation on what is the best practice for this kind of two-way communications scenario, or any potential gotchas. The sharing the connection demo is just a simple fire and forget two publish calls, not a long-running example encompassing all your guides like for handling Reconnection or parallel processing. I guess I had issues merging them all together myself. I thought my code was close enough to your Listening without blocking example, as my TaskGroup is on the outset as well, but perhaps this has some gotchas when trying to share the connection as well. Or just let me know this was some asyncio rookie mistake and I can go kick rocks. 😂 (Edit for syntax highlighting.) import asyncio
import logging
from asyncio import sleep as AsyncSleep
from functools import partial
from time import time
from typing import Any, Awaitable, Callable
import aiomqtt
from aiomqtt import Client as MQTTClient
from aiomqtt import MqttError
from aiomqtt.client import Message as MQTTMessage
from aiomqtt.types import PayloadType
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class MqttDao:
def __init__(self):
self.client = aiomqtt.Client("test.mosquitto.org")
async def start(self, client: aiomqtt.Client, callback: Callable[[MQTTMessage], Awaitable[Any]]):
retry_delay = 0 - 5
while True:
try:
# FIXME - This redundant context manager is illegal: https://sbtinstruments.github.io/aiomqtt/reconnection.html#reconnection
# async with client:
async with client.messages() as messages:
logger.info('[start] Subscribing to topics temperature/outside and humidity/inside...')
await client.subscribe('temperature/outside', qos=1)
await client.subscribe('humidity/inside', qos=1)
retry_delay = 0 - 5
async for message in messages:
logger.debug(f'[start] Invoking callback for message from topic {message.topic}')
await callback(message)
except MqttError:
logger.exception('[start] An MqttError occured...')
if retry_delay <= 30:
retry_delay += 5
logger.info(f"[start] Reconnecting to MQTT broker in {retry_delay} seconds ...")
await AsyncSleep(retry_delay)
except Exception as e:
logger.exception('[start] An unknown exception occurred...')
raise e
# Publishes a given payload to our MQTT broker with the given topic and QoS level
async def publish_message(self, client: MQTTClient, topic: str, payload: PayloadType, qos: int):
logger.debug(f"[publish_message] Publishing MQTT message to topic '{topic}' with qos {qos}: {payload}")
await client.publish(topic=topic, payload=payload, qos=qos)
logger.debug(f"[publish_message] Successfully published MQTT message to topic '{topic}' with qos {qos}: {payload}")
class MqDao:
def __init__(self):
pass
async def start(self, callback: Callable[[MqttDao, MQTTClient, dict], Awaitable[Any]], mqtt_dao: MqttDao, mqtt_client: MQTTClient):
while True:
logger.info('[start] Sleep for 2 seconds instead of waiting on a message from RabbitMQ...')
await AsyncSleep(2)
await callback(mqtt_dao, mqtt_client, {'body': 'Mock message from mock broker!', 'routing_key': 'humidity.inside'})
async def publish_message(self, body: bytes, routing_key: str, timestamp: float = time()):
logger.info('[publish_message] Sleep for 0.25 seconds instead of actually sending a message to RabbitMQ...')
await AsyncSleep(0.25)
# Callback to be invoked by MQTT client responsible for converting MQTT messages into MQ Messages to be published
async def inbound_consumer(mq_dao: MqDao, mqtt_message: MQTTMessage):
payload: bytes = mqtt_message.payload # type: ignore - Our payloads are bytes not floats
routing_key = mqtt_message.topic.value.replace('/', '.') # Slash delimiters in MQTT are dots in AMQP
await mq_dao.publish_message(body=payload, routing_key=routing_key)
# Callback to be invoked by AMQP client responsible for converting MQTT messages into MQ Messages to be published
async def outbound_consumer(mqtt_dao: MqttDao, mqtt_client: MQTTClient, amqp_message: dict):
logger.debug(f"[outbound_consumer] Received MQ message with routing key '{amqp_message['routing_key']}': {amqp_message['body']}")
topic = amqp_message['routing_key'].replace('.', '/') # type: ignore - We will always have a routing key defined
await mqtt_dao.publish_message(client=mqtt_client, topic=topic, payload=amqp_message['body'], qos=1)
def init():
mqtt_dao = MqttDao()
mq_dao = MqDao()
return (mqtt_dao, mq_dao)
async def main():
(mqtt_dao, mq_dao) = init()
try:
async with asyncio.TaskGroup() as tg:
async with mqtt_dao.client as mqtt_client:
inbound_producer_task = tg.create_task(mqtt_dao.start(mqtt_client, partial(inbound_consumer, mq_dao)))
outbound_consumer_task = tg.create_task(mq_dao.start(outbound_consumer, mqtt_dao, mqtt_client))
logger.info('[main] Producer and consumer tasks have now finished: '
+ f'{inbound_producer_task.result()}, {outbound_consumer_task.result()}')
except (asyncio.CancelledError, Exception) as e:
logger.exception('[main] An error occured during the consumer TaskGroup.')
raise e
asyncio.run(main()) |
That's a fun one 😄 I simlified your example down to this, which still gives the same error, and still let's us switch the import asyncio
import aiomqtt
class ABC:
async def start(self, client):
await asyncio.sleep(1)
await client.publish(topic='foo')
class XYZ:
async def start(self, client):
await asyncio.sleep(1)
await client.publish(topic='bar')
async def main():
client = aiomqtt.Client("test.mosquitto.org")
async with asyncio.TaskGroup() as tg:
async with client:
task_1 = tg.create_task(ABC().start(client))
task_2 = tg.create_task(XYZ().start(client))
asyncio.run(main()) The asyncio documentation describes a TaskGroup as "An asynchronous context manager holding a group of tasks. Tasks can be added to the group using The problem here is that both So what happens is something like:
Does that make sense? I hope that also explains why switching the two
This is out of scope for our docs in my opinion. We're explicitely focusing on small examples to keep them understandable and to cover the most amount of use cases. |
Ahhhhh, I believe that makes sense to me! You're a legend for this response. If I understand correctly from your explanation and some additional reading, what you're saying is that the client connects when entering its context, but since nothing is being awaited inside its context (because no awaits occur until reaching the end of the outer context, the TaskGroup), the code is able to progress to the end of the client's context manager, which triggers a disconnect. Context managers are cool, but I've never coded one before myself. Looking at the aiomqtt source code for the magic aenter/aexit functions helped me understand further. It's nice to be back to Python development so I can learn how async is done here after only doing it myself with Typescript for several years. I agree that this seems beyond the scope of the docs, and hope your excellent response may help another confused individual in future digging through Issues like me. 😄 |
I have the following code example which I am trying to get to work.
The point is that I have two coroutines that I run on a startup event in fastapi, one should consume messages, and the other should produce.
It is not possible to use the latest version of fastapi with lifespan now (although how to ensure reconnect is still a question there). Also I need to provide
reconnect so that any crash does not stop the application.
I'm trying to run the following piece of code and it doesn't work.
Surely this is some kind of common use case, but for some reason I don’t see a solution. Can you tell me how to run this code
The text was updated successfully, but these errors were encountered: