Messages queued with esp_mqtt_client_enqueue() are delayed (IDFGH-14719) #15458

Open · 3 tasks done
dwmw2 opened this issue Feb 24, 2025 · 2 comments
Labels: Status: Opened (Issue is new) · Type: Bug (bugs in IDF)

Comments

dwmw2 commented Feb 24, 2025

Answers checklist.

  • I have read the documentation ESP-IDF Programming Guide and the issue is not addressed there.
  • I have updated my IDF branch (master or release) to the latest version and checked that the issue is present there.
  • I have searched the issue tracker for a similar issue and not found a similar issue.

IDF version.

v5.1.5

Espressif SoC revision.

ESP32-S3

Operating System used.

Linux

How did you build your project?

Other (please specify in More Information)

If you are using Windows, please specify command line type.

None

Development Kit.

Custom Board

Power Supply used.

USB

What is the expected behavior?

Using ESPHome with the idf_send_async option to make it use esp_mqtt_client_enqueue(), I expected it to send my queued messages promptly.

(I set this option because I didn't like it when esp_mqtt_client_publish() blocks for 20 seconds when the network is down).
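
For reference, the two call styles differ only in where the blocking happens. A minimal sketch using the standard esp-mqtt signatures; the client handle, topic and payload here are illustrative:

#include "mqtt_client.h"

void send_reading(esp_mqtt_client_handle_t client)
{
    // Blocking: waits for the network send to complete, which can stall
    // for the full network timeout when the connection is down.
    esp_mqtt_client_publish(client, "sensors/temp", "21.5", 0, 0, 0);

    // Non-blocking: only places the message in the outbox and returns;
    // esp_mqtt_task() is expected to send it on a later loop iteration.
    // The final 'store' argument keeps the message in the outbox.
    esp_mqtt_client_enqueue(client, "sensors/temp", "21.5", 0, 0, 0, true);
}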

What is the actual behavior?

Each time around the loop in esp_mqtt_task(), it attempts to send precisely one message, then goes to sleep for another second before sending the next, leading to a large backlog and lost messages.
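
For context, the send path in esp_mqtt_task() is shaped roughly like this (paraphrased from mqtt_client.c, not verbatim):

while (client->run) {
    // ... event loop, outbox expiry, connection state machine ...

    // Dequeues and sends at most ONE queued message per iteration:
    outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
    if (item) {
        mqtt_resend_queued(client, item);
        // (qos0 publishes are then deleted from the outbox)
    }

    // ... and then blocks in select() for up to MQTT_POLL_READ_TIMEOUT_MS:
    esp_transport_poll_read(client->transport,
                            max_poll_timeout(client, MQTT_POLL_READ_TIMEOUT_MS));
}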

Steps to reproduce.

Set idf_send_async in an ESPHome configuration and watch how long it takes for MQTT messages to be sent.

Debug Logs.


More Information.

This patch helps a bit by flushing the whole queue on each pass. But there's still a delay of up to a second (MQTT_POLL_READ_TIMEOUT_MS) before each message is sent. Can we wake the task when a message is queued?

--- components/mqtt/esp-mqtt/mqtt_client.c~	2024-10-31 17:03:14.000000000 +0000
+++ components/mqtt/esp-mqtt/mqtt_client.c	2025-02-24 15:40:44.868028678 +0000
@@ -1662,13 +1662,16 @@ static void esp_mqtt_task(void *pv)
 
 
             // resend all non-transmitted messages first
-            outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
+            outbox_item_handle_t item;
+            do {
+            item = outbox_dequeue(client->outbox, QUEUED, NULL);
             if (item) {
                 if (mqtt_resend_queued(client, item) == ESP_OK) {
                     if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) {
                         // delete all qos0 publish messages once we process them
                         if (outbox_delete_item(client->outbox, item) != ESP_OK) {
                             ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox");
+                            break;
                         }
                     }
                     if (client->mqtt_state.pending_publish_qos > 0) {
@@ -1679,7 +1682,7 @@ static void esp_mqtt_task(void *pv)
                         }
 #endif
                     }
-                }
+                } else break;
                 // resend other "transmitted" messages after 1s
             } else if (has_timed_out(last_retransmit, client->config->message_retransmit_timeout)) {
                 last_retransmit = platform_tick_get_ms();
@@ -1691,9 +1694,10 @@ static void esp_mqtt_task(void *pv)
                             esp_mqtt5_increment_packet_counter(client);
                         }
 #endif
-                    }
+                    } else break;
                 }
-            }
+            } else break;
+            } while (item /* && this thread doesn't need to voluntarily yield */);
 
             if (process_keepalive(client) != ESP_OK) {
                 break;
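
The wake-up question has a generic answer that doesn't need special FreeRTOS support: a self-wake datagram socket added to the select() read set. Nothing like this exists in esp-mqtt today; the sketch below assumes lwIP's loopback interface is enabled, and s_wake_fd / wake_init() / wake_task() are hypothetical names:

#include <string.h>
#include "lwip/sockets.h"

static int s_wake_fd = -1;               // watched by the task's select()
static struct sockaddr_in s_wake_addr;   // loopback address to poke

static int wake_init(void)
{
    s_wake_fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (s_wake_fd < 0) {
        return -1;
    }
    memset(&s_wake_addr, 0, sizeof(s_wake_addr));
    s_wake_addr.sin_family = AF_INET;
    s_wake_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    s_wake_addr.sin_port = 0;            // let the stack pick a port
    if (bind(s_wake_fd, (struct sockaddr *)&s_wake_addr, sizeof(s_wake_addr)) < 0) {
        return -1;
    }
    socklen_t len = sizeof(s_wake_addr); // read back the chosen port
    return getsockname(s_wake_fd, (struct sockaddr *)&s_wake_addr, &len);
}

// Would be called at the end of esp_mqtt_client_enqueue(): one datagram
// to ourselves makes the select() in the task return immediately.
static void wake_task(void)
{
    char c = 0;
    sendto(s_wake_fd, &c, 1, 0,
           (struct sockaddr *)&s_wake_addr, sizeof(s_wake_addr));
}

esp_mqtt_task() would then watch s_wake_fd alongside the transport socket; a readable s_wake_fd just means "drain one byte with recv() and re-check the outbox immediately".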
dwmw2 added the "Type: Bug (bugs in IDF)" label Feb 24, 2025
github-actions bot changed the title from "Message queued with esp_mqtt_client_enqueue() are delayed" to "Message queued with esp_mqtt_client_enqueue() are delayed (IDFGH-14719)" Feb 24, 2025
espressif-bot added the "Status: Opened (Issue is new)" label Feb 24, 2025

dwmw2 commented Feb 25, 2025

Perhaps max_poll_timeout() should return 10ms instead of max_timeout if there are any QUEUED messages in the outbox?
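
A rough sketch of that idea, ignoring whatever else the real max_poll_timeout() takes into account; treating outbox_dequeue() as a non-destructive peek matches how the task loop above uses it:

static uint32_t max_poll_timeout(esp_mqtt_client_handle_t client, uint32_t max_timeout)
{
    // If anything is still QUEUED, wake again quickly instead of
    // sleeping for the full poll timeout.
    if (outbox_dequeue(client->outbox, QUEUED, NULL) != NULL) {
        return 10; // ms
    }
    return max_timeout; // otherwise keep the existing behaviour
}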


dwmw2 commented Feb 25, 2025

This helps to flush the already-queued messages without waiting a whole second between each one, but it doesn't solve the problem that a whole second can still pass after calling esp_mqtt_client_publish() before the first message is sent. I can't see a way in FreeRTOS to wake a thread that's waiting in esp_transport_poll_read() / select().

Attempting to publish directly with esp_mqtt_client_publish() and falling back to queuing doesn't seem to be an option, because the direct attempt can block for a long time. If there were a way to use esp_mqtt_client_publish() with a much shorter network timeout (and without aborting the connection if it does time out), that might work?

--- mqtt_client.c.orig	2025-02-25 10:41:20.280544113 +0000
+++ mqtt_client.c	2025-02-25 10:45:39.571513453 +0000
@@ -1577,6 +1577,7 @@ static void esp_mqtt_task(void *pv)
     client->state = MQTT_STATE_INIT;
     xEventGroupClearBits(client->status_bits, STOPPED_BIT);
     while (client->run) {
+        uint32_t next_timeout = MQTT_POLL_READ_TIMEOUT_MS;
         MQTT_API_LOCK(client);
         run_event_loop(client);
         // delete long pending messages
@@ -1665,6 +1666,7 @@ static void esp_mqtt_task(void *pv)
             outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
             if (item) {
                 if (mqtt_resend_queued(client, item) == ESP_OK) {
+                    next_timeout = 10;
                     if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) {
                         // delete all qos0 publish messages once we process them
                         if (outbox_delete_item(client->outbox, item) != ESP_OK) {
@@ -1693,7 +1695,7 @@ static void esp_mqtt_task(void *pv)
 #endif
                     }
                 }
-            }
+            } // else reduce next_timeout if it *will* time out before the max.
 
             if (process_keepalive(client) != ESP_OK) {
                 break;
@@ -1704,7 +1706,7 @@ static void esp_mqtt_task(void *pv)
                 ESP_LOGD(TAG, "Refreshing the connection...");
                 esp_mqtt_abort_connection(client);
                 client->state = MQTT_STATE_INIT;
-            }
+            } // else reduce next_timeout if it *will* time out before the max.
 
             break;
         case MQTT_STATE_WAIT_RECONNECT:
@@ -1733,7 +1735,7 @@ static void esp_mqtt_task(void *pv)
         }
         MQTT_API_UNLOCK(client);
         if (MQTT_STATE_CONNECTED == client->state) {
-            if (esp_transport_poll_read(client->transport, max_poll_timeout(client, MQTT_POLL_READ_TIMEOUT_MS)) < 0) {
+            if (esp_transport_poll_read(client->transport, max_poll_timeout(client, next_timeout)) < 0) {
                 ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno);
                 esp_mqtt_abort_connection(client);
             }
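
On the shorter-network-timeout idea: the v5.x client config does expose a network timeout, though whether a small value here avoids the long block without also tearing the connection down on expiry is exactly the open question above. A sketch (the broker URI is a placeholder):

esp_mqtt_client_config_t cfg = {
    .broker.address.uri = "mqtt://broker.example.com",
    .network.timeout_ms = 500,   // default is 10000 ms
};
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&cfg);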

dwmw2 added a commit to dwmw2/esphome that referenced this issue Feb 25, 2025
The ESP-IDF MQTT component is fairly unusable for low-latency setups such
as ESPHome. By default, ESPHome calls esp_mqtt_client_publish() directly
from the MQTT component's loop() function, or when publishing status
updates for other components. This may block for up to 20 seconds(!!) in
adverse network conditions.

With the `idf_send_async` option, subscribe and unsubscribe requests can
still block the loop thread for multiple seconds, but publishing sensor
updates is queued for esp-mqtt's own thread to send, which it does very
slowly, no more than one per second, as discussed in
espressif/esp-idf#15458.

And to top it all off, even with `idf_send_async` set, the so-called
'asynchronous' send can still block for ten seconds, because it takes
the same MQTT_API_LOCK that the esp-mqtt thread holds while running its
loop, including while a network send is timing out. This is reported in
espressif/esp-idf#13078.

The only way I can see to use esp-mqtt sanely is to use a thread of our
own, queueing all sub/unsub/publish requests and invoking the esp-mqtt
APIs from that thread.

The existing RingBuffer abstraction works nicely for this as it already
handles all the atomicity and waking when data are available. I've chosen
to avoid allocations by passing the actual data through the ringbuffer,
which means we impose a hard limit on the total topic+payload size for
each request. An alternative would be to allocate copies in the enqueue()
function and to pass *pointers* through the ringbuffer (which could be
a different type of queue then, if we wanted to reinvent things).
dwmw2 added two further commits to dwmw2/esphome that referenced this issue Feb 25, 2025, with the same message plus the trailer:

Fixes: esphome#6810
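
A sketch of the worker-thread approach the commit message describes, substituting ESP-IDF's FreeRTOS ring buffer (freertos/ringbuf.h) for ESPHome's RingBuffer; mqtt_request_t, enqueue_publish(), mqtt_worker() and worker_init() are invented names, and the buffer size, stack size and priority are placeholders:

#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/ringbuf.h"
#include "mqtt_client.h"

// One inline record per request; topic and payload travel through the
// ring buffer itself, so no per-message heap allocation is needed.
typedef struct {
    uint16_t topic_len;          // includes the NUL terminator
    uint16_t payload_len;
    char data[];                 // topic, then payload
} mqtt_request_t;

static RingbufHandle_t s_ring;

// Called from loop(): copies the request into the ring buffer and
// returns immediately, without ever taking esp-mqtt's API lock.
static bool enqueue_publish(const char *topic, const char *payload, size_t payload_len)
{
    size_t topic_len = strlen(topic) + 1;
    mqtt_request_t *req;
    if (xRingbufferSendAcquire(s_ring, (void **)&req,
                               sizeof(*req) + topic_len + payload_len, 0) != pdTRUE) {
        return false;            // buffer full: fail fast rather than block
    }
    req->topic_len = topic_len;
    req->payload_len = payload_len;
    memcpy(req->data, topic, topic_len);
    memcpy(req->data + topic_len, payload, payload_len);
    return xRingbufferSendComplete(s_ring, req) == pdTRUE;
}

// The only task that calls into esp-mqtt, so any blocking inside
// esp_mqtt_client_publish() stalls this task and nothing else.
static void mqtt_worker(void *arg)
{
    esp_mqtt_client_handle_t client = arg;
    for (;;) {
        size_t size;
        mqtt_request_t *req = xRingbufferReceive(s_ring, &size, portMAX_DELAY);
        if (req != NULL) {
            esp_mqtt_client_publish(client, req->data,
                                    req->data + req->topic_len,
                                    req->payload_len, /*qos=*/0, /*retain=*/0);
            vRingbufferReturnItem(s_ring, req);
        }
    }
}

static bool worker_init(esp_mqtt_client_handle_t client)
{
    s_ring = xRingbufferCreate(4096, RINGBUF_TYPE_NOSPLIT);
    return s_ring != NULL &&
           xTaskCreate(mqtt_worker, "mqtt_worker", 4096, client, 5, NULL) == pdPASS;
}

The NOSPLIT buffer type keeps each request contiguous, which is what makes the flexible-array layout above workable; the trade-off, as the commit message notes, is a hard cap on topic+payload size per request.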