Messages queued with esp_mqtt_client_enqueue() are delayed (IDFGH-14719) #15458
Perhaps something like the patch below? It helps to flush the already-queued messages without waiting a whole second between each one, but it doesn't solve the problem that it's still a whole second before the first message is sent after calling esp_mqtt_client_enqueue().

--- mqtt_client.c.orig	2025-02-25 10:41:20.280544113 +0000
+++ mqtt_client.c 2025-02-25 10:45:39.571513453 +0000
@@ -1577,6 +1577,7 @@ static void esp_mqtt_task(void *pv)
client->state = MQTT_STATE_INIT;
xEventGroupClearBits(client->status_bits, STOPPED_BIT);
while (client->run) {
+ uint32_t next_timeout = MQTT_POLL_READ_TIMEOUT_MS;
MQTT_API_LOCK(client);
run_event_loop(client);
// delete long pending messages
@@ -1665,6 +1666,7 @@ static void esp_mqtt_task(void *pv)
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
if (item) {
if (mqtt_resend_queued(client, item) == ESP_OK) {
+ next_timeout = 10;
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) {
// delete all qos0 publish messages once we process them
if (outbox_delete_item(client->outbox, item) != ESP_OK) {
@@ -1693,7 +1695,7 @@ static void esp_mqtt_task(void *pv)
#endif
}
}
- }
+ } // else reduce next_timeout if it *will* time out before the max. */
if (process_keepalive(client) != ESP_OK) {
break;
@@ -1704,7 +1706,7 @@ static void esp_mqtt_task(void *pv)
ESP_LOGD(TAG, "Refreshing the connection...");
esp_mqtt_abort_connection(client);
client->state = MQTT_STATE_INIT;
- }
+ } // else reduce next_timeout if it *will* time out before the max. */
break;
case MQTT_STATE_WAIT_RECONNECT:
@@ -1733,7 +1735,7 @@ static void esp_mqtt_task(void *pv)
}
MQTT_API_UNLOCK(client);
if (MQTT_STATE_CONNECTED == client->state) {
- if (esp_transport_poll_read(client->transport, max_poll_timeout(client, MQTT_POLL_READ_TIMEOUT_MS)) < 0) {
+ if (esp_transport_poll_read(client->transport, max_poll_timeout(client, next_timeout)) < 0) {
ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno);
esp_mqtt_abort_connection(client);
}
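For context, here is a minimal usage sketch (not from the issue itself) of the call whose latency is at issue. The topic, payload and burst size are illustrative, and a started, connected client handle is assumed:

#include <stdio.h>
#include "mqtt_client.h"

// Queue a burst of QoS-0 messages without blocking the caller.
// With the current task loop, each trip around the loop sends at most one
// queued message and then sleeps for up to MQTT_POLL_READ_TIMEOUT_MS
// (1 second by default), so a burst like this drains at roughly one
// message per second.
static void publish_burst(esp_mqtt_client_handle_t client)
{
    for (int i = 0; i < 10; i++) {
        char payload[32];
        snprintf(payload, sizeof(payload), "reading %d", i);
        // len = 0 means the payload length is taken from strlen();
        // store = true keeps the message in the outbox until the task sends it.
        int msg_id = esp_mqtt_client_enqueue(client, "sensors/demo", payload,
                                             0, 0 /* qos */, 0 /* retain */,
                                             true /* store */);
        if (msg_id < 0) {
            // enqueue failed, e.g. the outbox is full
        }
    }
}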
The ESP-IDF MQTT component is fairly unusable for low-latency setups such as ESPHome. By default, ESPHome calls esp_mqtt_client_publish() directly from the MQTT component's loop() function, or when publishing status updates for other components. This may block for up to 20 seconds(!) under adverse network conditions.

With the `idf_send_async` option, subscribe and unsubscribe requests can still block the loop thread for multiple seconds, but publishing sensor updates is queued for esp-mqtt's own thread to send. It does so very slowly, no more than one message per second, as discussed in espressif/esp-idf#15458.

To top it all off, even with `idf_send_async` set, the so-called 'asynchronous' send can still block for ten seconds, because it takes the same MQTT_API_LOCK that the esp-mqtt thread holds while it runs its loop and a network send is timing out. This is reported in espressif/esp-idf#13078.

The only way I can see to use esp-mqtt sanely is to use a thread of our own, queueing all subscribe/unsubscribe/publish requests and invoking the esp-mqtt APIs from that thread. The existing RingBuffer abstraction works nicely for this, as it already handles all the atomicity and waking when data are available. I've chosen to avoid allocations by passing the actual data through the ring buffer, which means we impose a hard limit on the total topic+payload size for each request. An alternative would be to allocate copies in the enqueue() function and to pass *pointers* through the ring buffer (which could then be a different type of queue, if we wanted to reinvent things).

Fixes: esphome#6810
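A rough sketch of that approach, using ESP-IDF's ring buffer directly rather than ESPHome's RingBuffer helper; the struct layout, buffer sizes and task parameters below are illustrative assumptions, not the actual ESPHome implementation:

#include <stdbool.h>
#include <stdio.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/ringbuf.h"
#include "mqtt_client.h"

// Fixed-size request passed by value through the ring buffer, so no
// allocations are needed but topic+payload are limited to these sizes.
typedef struct {
    char topic[64];
    char payload[256];
    int qos;
    int retain;
} mqtt_request_t;

static RingbufHandle_t s_queue;
static esp_mqtt_client_handle_t s_client;

// Called from the application loop: copy the request into the ring buffer
// and return immediately; never touches the esp-mqtt API (or its lock).
static bool queue_publish(const char *topic, const char *payload, int qos, int retain)
{
    mqtt_request_t req = { .qos = qos, .retain = retain };
    snprintf(req.topic, sizeof(req.topic), "%s", topic);
    snprintf(req.payload, sizeof(req.payload), "%s", payload);
    return xRingbufferSend(s_queue, &req, sizeof(req), 0) == pdTRUE;
}

// Dedicated sender task: the only place that calls into esp-mqtt, so any
// blocking inside esp_mqtt_client_publish() stalls only this task.
static void mqtt_sender_task(void *arg)
{
    for (;;) {
        size_t len = 0;
        mqtt_request_t *req = xRingbufferReceive(s_queue, &len, portMAX_DELAY);
        if (req != NULL) {
            esp_mqtt_client_publish(s_client, req->topic, req->payload, 0,
                                    req->qos, req->retain);
            vRingbufferReturnItem(s_queue, req);
        }
    }
}

// Setup with illustrative sizes: a no-split buffer keeps each request contiguous.
static void start_sender(esp_mqtt_client_handle_t client)
{
    s_client = client;
    s_queue = xRingbufferCreate(8 * sizeof(mqtt_request_t), RINGBUF_TYPE_NOSPLIT);
    xTaskCreate(mqtt_sender_task, "mqtt_sender", 4096, NULL, 5, NULL);
}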
Answers checklist.
IDF version.
v5.1.5
Espressif SoC revision.
ESP32-S3
Operating System used.
Linux
How did you build your project?
Other (please specify in More Information)
If you are using Windows, please specify command line type.
None
Development Kit.
Custom Board
Power Supply used.
USB
What is the expected behavior?
Using ESPHome with the idf_send_async option to make it use esp_mqtt_client_enqueue(), I expected it to send my queued messages promptly. (I set this option because I didn't like it when esp_mqtt_client_publish() blocks for 20 seconds when the network is down.)
What is the actual behavior?
Each time around the loop in esp_mqtt_task(), it attempts to send precisely one message, then goes to sleep for another second before sending the next, leading to a large backlog and lost messages.
Steps to reproduce.
Set idf_send_async in an ESPHome configuration and watch how long it takes for MQTT messages to be sent.
Debug Logs.
More Information.
This patch helps a bit, by flushing the whole queue every time. But there's still up to a second (MQTT_POLL_READ_TIMEOUT_MS) before each message is sent. Can we wake the task when a message is queued?
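One possible direction, sketched here only as an illustration and not a tested fix: choose the poll timeout per loop iteration, so the task only sleeps for the full MQTT_POLL_READ_TIMEOUT_MS when the outbox has nothing left queued. The helper below is hypothetical and not part of esp-mqtt; inside mqtt_client.c the "still_queued" input would come from the outbox, e.g. the same outbox_dequeue(client->outbox, QUEUED, NULL) check the task loop already makes in the patch above.

#include <stdbool.h>
#include <stdint.h>

// Hypothetical helper: pick the transport poll timeout for one iteration of
// the client task.  While messages remain queued, poll with a short timeout
// so they drain back to back; once the outbox is empty, fall back to the
// usual idle timeout (MQTT_POLL_READ_TIMEOUT_MS, 1000 ms by default).
static uint32_t next_poll_timeout_ms(bool still_queued,
                                     uint32_t idle_timeout_ms,   // e.g. 1000
                                     uint32_t drain_timeout_ms)  // e.g. 10
{
    return still_queued ? drain_timeout_ms : idle_timeout_ms;
}

This still does not truly wake the task the moment a message is enqueued (the task is blocked in esp_transport_poll_read() on the socket, so a plain FreeRTOS notification will not interrupt it), but it would bound the added latency to the short drain timeout.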