From ef6585681fc9babb1079977b6cd1829e96952b19 Mon Sep 17 00:00:00 2001
From: David Woodhouse
Date: Tue, 25 Feb 2025 15:02:33 +0000
Subject: [PATCH] WIP: Submit esp-mqtt sub/unsub/pub requests asynchronously

The ESP-IDF MQTT component is fairly unusable for low-latency setups
such as ESPHome. By default, ESPHome calls esp_mqtt_client_publish()
directly from the MQTT component's loop() function, or when publishing
status updates for other components. This may block for up to 20
seconds(!!) in adverse network conditions.

With the `idf_send_async` option, subscribe and unsubscribe requests can
still block the loop thread for multiple seconds, but publishing sensor
updates is queued for esp-mqtt's own thread to actually send them, which
it does very slowly: no more than one per second, as discussed in
https://github.com/espressif/esp-idf/issues/15458

And to top it all off, even with `idf_send_async` set, the so-called
'asynchronous' send can still block for ten seconds, because it takes
the same MQTT_API_LOCK that the esp-mqtt thread holds while it runs its
loop and a network send is timing out. This is reported in
https://github.com/espressif/esp-idf/issues/13078

The only way I can see to use esp-mqtt sanely is to use a thread of our
own, queueing all sub/unsub/publish requests and invoking the esp-mqtt
APIs from that thread. The existing RingBuffer abstraction works nicely
for this, as it already handles all the atomicity and waking when data
are available.

I've chosen to avoid allocations by passing the actual data through the
ring buffer, which means we impose a hard limit on the total
topic+payload size for each request. An alternative would be to allocate
copies in the enqueue() function and pass *pointers* through the ring
buffer (which could then be a different type of queue, if we wanted to
reinvent things).
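For comparison, here is a rough sketch of that pointer-passing
alternative, using a FreeRTOS queue rather than the RingBuffer. The
names (QueuedRequest, enqueue_ptr) are hypothetical; this is not part
of the patch:

    #include <new>
    #include <string>
    #include <freertos/FreeRTOS.h>
    #include <freertos/queue.h>
    #include <mqtt_client.h>

    // Each request is heap-allocated and only the pointer travels through
    // the queue, so there is no fixed topic+payload size limit, at the
    // cost of one allocation (which can fail) per request.
    struct QueuedRequest {
      esp_mqtt_event_id_t type;
      int qos;
      bool retain;
      std::string topic;
      std::string payload;
    };

    // 'queue' must have been created with
    // xQueueCreate(depth, sizeof(QueuedRequest *)).
    static bool enqueue_ptr(QueueHandle_t queue, esp_mqtt_event_id_t type,
                            const char *topic, int qos, bool retain,
                            const char *payload, size_t len) {
      auto *req = new (std::nothrow) QueuedRequest{
          type, qos, retain, topic,
          payload ? std::string(payload, len) : std::string()};
      if (req == nullptr)
        return false;
      // Zero timeout: never block the loop thread, just fail if the queue
      // is full.
      if (xQueueSend(queue, &req, 0) != pdPASS) {
        delete req;
        return false;
      }
      return true;  // the consumer task takes ownership and deletes it
    }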
Fixes: #6810
---
 .../components/mqtt/mqtt_backend_esp32.cpp    | 63 +++++++++++++++++++
 esphome/components/mqtt/mqtt_backend_esp32.h  | 59 ++++++++++++++++-
 2 files changed, 120 insertions(+), 2 deletions(-)

diff --git a/esphome/components/mqtt/mqtt_backend_esp32.cpp b/esphome/components/mqtt/mqtt_backend_esp32.cpp
index 2cccb957eb3e..b8a0f2318cbe 100644
--- a/esphome/components/mqtt/mqtt_backend_esp32.cpp
+++ b/esphome/components/mqtt/mqtt_backend_esp32.cpp
@@ -94,6 +94,14 @@ bool MQTTBackendESP32::initialize_() {
   } else {
     mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_TCP;
   }
+#endif
+#ifdef ESPHOME_MQTT_THREAD
+  this->ring_buffer_ = RingBuffer::create(MQTT_BUFFER_SIZE);
+  xTaskCreate(esphome_mqtt_task, "esphome_mqtt", TASK_STACK_SIZE, (void *) this, TASK_PRIORITY, &this->task_handle_);
+  if (this->task_handle_ == nullptr) {
+    ESP_LOGE(TAG, "Failed to start MQTT thread");
+    return false;
+  }
 #endif
   auto *mqtt_client = esp_mqtt_client_init(&mqtt_cfg_);
   if (mqtt_client) {
@@ -188,6 +196,61 @@ void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t b
   }
 }
 
+#ifdef ESPHOME_MQTT_THREAD
+void MQTTBackendESP32::esphome_mqtt_task(void *params) {
+  MQTTBackendESP32 *this_mqtt = (MQTTBackendESP32 *) params;
+  char recv_buf[MQTT_BUFFER_SIZE];
+  const char *topic = recv_buf;
+  const char *payload;
+
+  while (1) {
+    struct RingElement elem;
+    size_t r;
+
+    r = this_mqtt->ring_buffer_->read(&elem, sizeof(elem), portMAX_DELAY);
+    if (r != sizeof(elem) || elem.topic_len > MQTT_BUFFER_SIZE || elem.payload_len > MQTT_BUFFER_SIZE ||
+        elem.topic_len + elem.payload_len > MQTT_BUFFER_SIZE) {
+      ESP_LOGE(TAG, "Invalid header read %zd from MQTT ring buffer", r);
+      this_mqtt->ring_buffer_->reset();
+      continue;
+    }
+
+    r = this_mqtt->ring_buffer_->read(recv_buf, elem.topic_len, portMAX_DELAY);
+    if (r != elem.topic_len || recv_buf[elem.topic_len - 1] != '\0') {
+      ESP_LOGE(TAG, "Invalid topic read (%zd of %zd) from MQTT ring buffer", r, elem.topic_len);
+      this_mqtt->ring_buffer_->reset();
+      continue;
+    }
+
+    r = this_mqtt->ring_buffer_->read(recv_buf + elem.topic_len, elem.payload_len, portMAX_DELAY);
+    if (r != elem.payload_len) {
+      ESP_LOGE(TAG, "Invalid payload read (%zd of %zd) from MQTT ring buffer", r, elem.payload_len);
+      this_mqtt->ring_buffer_->reset();
+      continue;
+    }
+
+    switch (elem.type) {
+      case MQTT_EVENT_SUBSCRIBED:
+        esp_mqtt_client_subscribe(this_mqtt->handler_.get(), topic, elem.qos);
+        break;
+
+      case MQTT_EVENT_UNSUBSCRIBED:
+        esp_mqtt_client_unsubscribe(this_mqtt->handler_.get(), topic);
+        break;
+
+      case MQTT_EVENT_PUBLISHED:
+        payload = &recv_buf[elem.topic_len];
+        esp_mqtt_client_publish(this_mqtt->handler_.get(), topic, payload, elem.payload_len, elem.qos, elem.retain);
+        break;
+
+      default:
+        ESP_LOGE(TAG, "Invalid operation type from MQTT ring buffer");
+        this_mqtt->ring_buffer_->reset();
+        break;
+    }
+  }
+}
+#endif
 }  // namespace mqtt
 }  // namespace esphome
 #endif  // USE_ESP32

diff --git a/esphome/components/mqtt/mqtt_backend_esp32.h b/esphome/components/mqtt/mqtt_backend_esp32.h
index 9054702115a9..afb66039e8ef 100644
--- a/esphome/components/mqtt/mqtt_backend_esp32.h
+++ b/esphome/components/mqtt/mqtt_backend_esp32.h
@@ -4,11 +4,14 @@
 #ifdef USE_MQTT
 #ifdef USE_ESP32
 
+#define ESPHOME_MQTT_THREAD
+
 #include <string>
 #include <memory>
 #include <mqtt_client.h>
 #include "esphome/components/network/ip_address.h"
 #include "esphome/core/helpers.h"
+#include "esphome/core/ring_buffer.h"
 
 namespace esphome {
 namespace mqtt {
@@ -42,9 +45,19 @@ struct Event {
         error_handle(*event.error_handle) {}
 };
 
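+// One queued sub/unsub/publish request. This fixed-size header is written
+// into the ring buffer first, followed by topic_len bytes of NUL-terminated
+// topic and then payload_len bytes of payload.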
+struct RingElement {
+  esp_mqtt_event_id_t type;
+  int qos;
+  bool retain;
+  uint32_t topic_len;
+  uint32_t payload_len;
+};
+
 class MQTTBackendESP32 final : public MQTTBackend {
  public:
   static const size_t MQTT_BUFFER_SIZE = 4096;
+  static const size_t TASK_STACK_SIZE = 4096 + MQTT_BUFFER_SIZE;
+  static const ssize_t TASK_PRIORITY = 5;
 
   void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; }
   void set_client_id(const char *client_id) final { this->client_id_ = client_id; }
@@ -105,12 +118,24 @@
   }
 
   bool subscribe(const char *topic, uint8_t qos) final {
+#ifdef ESPHOME_MQTT_THREAD
+    return enqueue(MQTT_EVENT_SUBSCRIBED, topic, qos);
+#else
     return esp_mqtt_client_subscribe(handler_.get(), topic, qos) != -1;
+#endif
+  }
+
+  bool unsubscribe(const char *topic) final {
+#ifdef ESPHOME_MQTT_THREAD
+    return enqueue(MQTT_EVENT_UNSUBSCRIBED, topic);
+#else
+    return esp_mqtt_client_unsubscribe(handler_.get(), topic) != -1;
+#endif
   }
-  bool unsubscribe(const char *topic) final { return esp_mqtt_client_unsubscribe(handler_.get(), topic) != -1; }
 
   bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final {
-#if defined(USE_MQTT_IDF_ENQUEUE)
+#ifdef ESPHOME_MQTT_THREAD
+    return enqueue(MQTT_EVENT_PUBLISHED, topic, qos, retain, payload, length);
+#elif defined(USE_MQTT_IDF_ENQUEUE)
     // use the non-blocking version
     // it can delay sending a couple of seconds but won't block
     return esp_mqtt_client_enqueue(handler_.get(), topic, payload, length, qos, retain, true) != -1;
@@ -160,6 +185,36 @@ class MQTTBackendESP32 final : public MQTTBackend {
   optional<std::string> cl_certificate_;
   optional<std::string> cl_key_;
   bool skip_cert_cn_check_{false};
+#ifdef ESPHOME_MQTT_THREAD
+  static void esphome_mqtt_task(void *params);
+  std::unique_ptr<RingBuffer> ring_buffer_;
+  TaskHandle_t task_handle_;
+  bool enqueue(esp_mqtt_event_id_t type, const char *topic, int qos = 0, bool retain = false,
+               const char *payload = NULL, size_t len = 0) {
+    struct RingElement elem;
+
+    elem.type = type;
+    elem.qos = qos;
+    elem.retain = retain;
+    elem.topic_len = strlen(topic) + 1;  // include the trailing NUL
+    elem.payload_len = len;
+
+    /*
+     * Assert that current_task == loop_task_handle, which is what makes it
+     * OK that these writes (and the free() check) aren't atomic.
+     */
+
+    if (this->ring_buffer_->free() < sizeof(elem) + elem.topic_len + len)
+      return false;
+
+    this->ring_buffer_->write(&elem, sizeof(elem));
+    this->ring_buffer_->write(topic, elem.topic_len);
+    if (len)
+      this->ring_buffer_->write(payload, len);
+
+    return true;
+  }
+#endif
 
   // callbacks
   CallbackManager<on_connect_callback_t> on_connect_;