-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] transition to anyio #302
Conversation
This commit subclasses the paho MQTT client code (somewhat intrusively), replacing its threading with anyio-style tasks.
Recursion is bad for you. More to the point it's not necessary. Also, several common non-matches can be checked for up front.
Pre-split the topic so the comparison code doesn't need to run `split` each time it's called.
Drop the trailing hash from the split wildcard topic. This further simplifies the code and prepares for the next bit.
Subscriptions are arranged into a tree that's traversed when a message is dispatched.
No sense in doing this in the comparator
tests/conftest.py
Outdated
|
||
return ("asyncio", {"policy": WindowsSelectorEventLoopPolicy()}) | ||
return ("asyncio", {}) | ||
return ("trio", {}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This means that the test suite will only run against Trio now. The proper fix is to delete the fixture completely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's temporary until the next anyio release and/or I can figure out the reason for the test failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
Use `contextlib.aclosing` to cleanly stop the iterator
instead, synchronize via task_status.started()
If the server is busy it might delay forwarding messages until after it sees the unsubscription.
anyio 4.4 warns when that doesn't happen
This change adds subscriptions with separate message queues. Does not yet handle duplicate subscriptions.
needs just a bool return, not a counter
This test doesn't yet support subscribing to a topic multiple times. TODO.
This change skips walking the topic tree, assuming server support.
Now includes a async with Client("test.mosquitto.org") as client:
async with client.subscription("foo/bar") as msgs:
async for m in msgs:
await process(m) |
Hi there, very cool to see so much work on this! 👍 This seems to be the go-ahead for our yearly discussion on this subject 😄 For reference, we had a PR in the past (#152) to switch to anyio that we ultimately decided to abandon. In discussion #44, people were reluctant about a switch to anyio, especially if the changes would impact the interface. All that being said, we focus a lot on keeping aiomqtt as small and maintainable as possible. I can see an internal switch to anyio if it works towards that goal. In the best case, this would be non-breaking. The trio support would be a nice extra, but wouldn't make the switch worth it to me if it increases maintenance cost. I am also still on the edge if it's worth adding another dependency if we get e.g. task groups "for free" once we drop Python <3.11. @frederikaalund, @JonathanPlasse, what are your thoughts on this? 🙂
We had something similar to this in the past with |
import anyio | ||
from anyio import create_memory_object_stream | ||
|
||
from outcome import Error, Value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't see outcome
in the dependencies. I also don't think it's worth the trouble to implement a wrapper on top of memory object streams.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. I'll fix the dependency.
The wrapper is intended for those who want to use their own asyncio.queue
-based implementation (e.g. LIFO, or something priority-based). If the maintainers decide to drop backwards compatibility and use anyio's object streams instead, I'd be happy to remove it.
Is it absolutely necessary to depend on the |
More fixes required …
@agronholm Paho-MQTT at least has a working MQTTv5 implementation and complete data classes for all the fiddly bits; reimplementing all that is not exactly trivial, thus the straightest way to working threads-free async-only code was to anyio-ify it by way of a subclass. I'm the first person to admit that this subclass is way more intrusive than any sane programmer should be comfortable with, but you got to start somewhere. I do plan to build a reasonable sans-io MQTT core eventually. Ideally I'd then be able to teach the current paho.mqtt.Client class to use that (should be a net reduction in code size …). Assuming that its maintainers are amenable to this, which I'm a bit skeptical about, but we'll see. In the meantime I've submitted a somewhat less intrusive clean-up patch to them. |
@empicano asyncio taskgroups may have the same name, but not the same features of anyio taskgroups. For instance, they don't have cancel scopes; you need to cancel the task the group runs in, which is not the same thing. There are other differences which @agronholm is way more qualified than I am to educate people about. ;-) As to changing the interface, my patch manages not to do that. Admittedly it does this by way of somewhat-invasively subclassing the Paho client but frankly the Paho code could do with a bit of improvement. :-/ As far as "filtered_messages", IMHO it's far more confusing, or rather detrimental to modularity, to require a central "async with client.messages()" loop to do the message dispatching. You can't do a "this task deals with temperatures and that task deals with humidity" pattern that way. It also requires more CPU. IMHO it's a question of designing the right interface. The old NB 9c6a7f4 documents the new |
Fair enough. |
turns out that it obfuscates more than it helps, at least here
I hope Frederik will still comment here, he can frankly review this better than me. I'll say that I'd be pretty neat to get rid of the threads and clean up the code, I'm just concerned about maintainability when adding as much code as we're already stretched thin as is. I agree that that sans-io "backend" would be really cool to see in aiomqtt 👍 On the interface topic:
Maybe I misunderstand you here, I'd say this example in the documentation does that, or not? The flexibility of this approach was in fact one of the main reasons I pushed for the change. Generally, I want this library to serve the most basic use case (which I think is a single task and queue) as well as possible, with flexibility to use it for as much as possible after that. I'd like to avoid having multiple queues by default, because when we still had Apart from that, I'd expect a fixed number of tasks working a single priority queue to fit better in most use cases where we need to process message concurrently, e.g. because the number of messages coming over different topics is often skewed. What do you think about that?
In my opinion, subscriptions and routing should be separate. I admit that context managers for subscriptions are elegant, but most of the time we either don't need to unsubscribe (clean session) or don't want to unsubscribe (persistent session) before disconnection. I want this library to support as many use cases as possible, and dynamic un/subscriptions are part of that. We can still have those via task cancellation with the proposed changes, but I'd like to find a simpler solution. Let me propose an alternative idea to improve the message handling interface: router = aiomqtt.Router()
@router.match("humidity/+/inside/#")
async def handle(message, client, *args): # automatically extract wildcards into *args
print(message.payload)
async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
await client.subscribe("humidity/#")
async for message in client.messages:
await client.route(message) Where we can process messages concurrently e.g. like this: router = aiomqtt.Router()
@router.match("humidity/+/inside/#")
async def handle(message, client, *args): # automatically extract wildcards into *args
print(message.payload)
async def work(client):
async for message in client._messages():
await client.route(message)
async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
await client.subscribe("humidity/#")
async with asyncio.TaskGroup() as tg:
tg.create_task(work(client))
tg.create_task(work(client)) ( I opened #304 with a proof of concept. I'd be glad to hear what you think! |
Well, the downside of separating subscription and routing is, as your example demonstrates, that the router needs to match every message to all your wildcard patterns. My code uses the fact that the server already did that work. As to sans-io – in an ideal world that'd be part of Paho and thus not add to aiomqtt's maintainance burden. We'll see. |
Did I ever mention https://github.com/agronholm/mqttproto ? Caveat: it only supports MQTT v5. |
Umm no you didn't. I'm afraid that this is as good a place as any to cut my losses and switch to that. If nothing else, its overhead is way smaller than anything we could reasonably massage the Paho code into. The fact that the Paho people are very reluctant to accept patches doesn't help either. |
As per #226 here's a draft of an anyio-ified aiomqtt. It abuses paho-mqtt 2.0 by way of subclassing its
Client
class.No more threads. No more issues with locking. Trio compatibility.
Tests pass (one failure on asyncio) but there's still a lot to do; checking robustness and handling reconnections and getting typing back up to standard and finishing the paho callback API v2 transition and support for paho-mqtt 2.1 and general clean-up and support for more MQTTv5 features and whatnot.
I need this code in order to replace my
moat-mqtt
package (which is a hbmqtt port to anyio that shows its age). No guarantees but I plan to keep poking at this.