Skip to content

Commit

Permalink
WIP: Submit esp-mqtt sub/unsub/pub requests asynchronously
Browse files Browse the repository at this point in the history
The ESP-IDF MQTT component is fairly unusable for low-latency setups such
as ESPHome. By default, ESPHome calls esp_mqtt_client_publish() directly
from the MQTT component's loop() function, or when publishing status
updates for other components. This may block for up to 20 seconds(!!) in
adverse network conditions.

With the `idf_send_async` option, subscribe and unsubscribe requests can
still block the loop thread for multiple seconds, but publishing sensor
updates is queued for esp-mqtt's own thread to actually send them. Which
it does very slowly, no more than one per second, as discussed in
espressif/esp-idf#15458

And to top it all off, even with `idf_send_async` set, the so-called
'asynchronous' send can still block for ten seconds because it takes
the same MQTT_API_LOCK that the esp-mqtt thread holds while it runs
its loop and a network send is timing out. This is reported in
espressif/esp-idf#13078

The only way I can see to use esp-mqtt sanely is to use a thread of our
own, queueing all sub/unsub/publish requests and invoking the esp-mqtt
APIs from that thread.

The existing RingBuffer abstraction works nicely for this as it already
handles all the atomicity and waking when data are available. I've chosen
to avoid allocations by passing the actual data through the ringbuffer,
which means we impose a hard limit on the total topic+payload size for
each request. An alternative would be to allocate copies in the enqueue()
function and to pass *pointers* through the ringbuffer (which could be
a different type of queue then, if we wanted to reinvent things).

Fixes: esphome#6810
  • Loading branch information
dwmw2 committed Feb 25, 2025
1 parent bfa3254 commit ef65856
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 2 deletions.
63 changes: 63 additions & 0 deletions esphome/components/mqtt/mqtt_backend_esp32.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ bool MQTTBackendESP32::initialize_() {
} else {
mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_TCP;
}
#endif
#ifdef ESPHOME_MQTT_THREAD
this->ring_buffer_ = RingBuffer::create(MQTT_BUFFER_SIZE);
xTaskCreate(esphome_mqtt_task, "esphome_mqtt", TASK_STACK_SIZE, (void *) this, TASK_PRIORITY, &this->task_handle_);
if (this->task_handle_ == nullptr) {
ESP_LOGE(TAG, "Failed to start MQTT thread");
return false;
}
#endif
auto *mqtt_client = esp_mqtt_client_init(&mqtt_cfg_);
if (mqtt_client) {
Expand Down Expand Up @@ -188,6 +196,61 @@ void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t b
}
}

#ifdef ESPHOME_MQTT_THREAD
void MQTTBackendESP32::esphome_mqtt_task(void *params) {
MQTTBackendESP32 *this_mqtt = (MQTTBackendESP32 *) params;
char recv_buf[MQTT_BUFFER_SIZE];
const char *topic = recv_buf;
const char *payload;

while (1) {
struct RingElement elem;
size_t r;

r = this_mqtt->ring_buffer_->read(&elem, sizeof(elem), portMAX_DELAY);
if (r != sizeof(elem) || elem.topic_len > MQTT_BUFFER_SIZE || elem.payload_len > MQTT_BUFFER_SIZE ||
elem.topic_len + elem.payload_len > MQTT_BUFFER_SIZE) {
ESP_LOGE(TAG, "Invalid header read %zd from MQTT ring buffer", r);
this_mqtt->ring_buffer_->reset();
continue;
}

r = this_mqtt->ring_buffer_->read(recv_buf, elem.topic_len, portMAX_DELAY);
if (r != elem.topic_len || recv_buf[elem.topic_len - 1] != '\0') {
ESP_LOGE(TAG, "Invalid topic read (%zd of %zd) from MQTT ring buffer", r, elem.topic_len);
this_mqtt->ring_buffer_->reset();
continue;
}

r = this_mqtt->ring_buffer_->read(recv_buf + elem.topic_len, elem.payload_len, portMAX_DELAY);
if (r != elem.payload_len) {
ESP_LOGE(TAG, "Invalid payload read (%zd of %zd) from MQTT ring buffer", r, elem.payload_len);
this_mqtt->ring_buffer_->reset();
continue;
}

switch (elem.type) {
case MQTT_EVENT_SUBSCRIBED:
esp_mqtt_client_subscribe(this_mqtt->handler_.get(), topic, elem.qos);
break;

case MQTT_EVENT_UNSUBSCRIBED:
esp_mqtt_client_unsubscribe(this_mqtt->handler_.get(), topic);
break;

case MQTT_EVENT_PUBLISHED:
payload = &recv_buf[elem.topic_len];
esp_mqtt_client_publish(this_mqtt->handler_.get(), topic, payload, elem.payload_len, elem.qos, elem.retain);
break;

default:
ESP_LOGE(TAG, "Invalid operation type from MQTT ring buffer");
this_mqtt->ring_buffer_->reset();
break;
}
}
}
#endif
} // namespace mqtt
} // namespace esphome
#endif // USE_ESP32
Expand Down
59 changes: 57 additions & 2 deletions esphome/components/mqtt/mqtt_backend_esp32.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
#ifdef USE_MQTT
#ifdef USE_ESP32

#define ESPHOME_MQTT_THREAD

#include <string>
#include <queue>
#include <mqtt_client.h>
#include "esphome/components/network/ip_address.h"
#include "esphome/core/helpers.h"
#include "esphome/core/ring_buffer.h"

namespace esphome {
namespace mqtt {
Expand Down Expand Up @@ -42,9 +45,19 @@ struct Event {
error_handle(*event.error_handle) {}
};

struct RingElement {
esp_mqtt_event_id_t type;
int qos;
bool retain;
uint32_t topic_len;
uint32_t payload_len;
};

class MQTTBackendESP32 final : public MQTTBackend {
public:
static const size_t MQTT_BUFFER_SIZE = 4096;
static const size_t TASK_STACK_SIZE = 4096 + MQTT_BUFFER_SIZE;
static const ssize_t TASK_PRIORITY = 5;

void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; }
void set_client_id(const char *client_id) final { this->client_id_ = client_id; }
Expand Down Expand Up @@ -105,12 +118,24 @@ class MQTTBackendESP32 final : public MQTTBackend {
}

bool subscribe(const char *topic, uint8_t qos) final {
#ifdef ESPHOME_MQTT_THREAD
return enqueue(MQTT_EVENT_SUBSCRIBED, topic, qos);
#else
return esp_mqtt_client_subscribe(handler_.get(), topic, qos) != -1;
#endif
}
bool unsubscribe(const char *topic) final {
#ifdef ESPHOME_MQTT_THREAD
return enqueue(MQTT_EVENT_UNSUBSCRIBED, topic);
#else
return esp_mqtt_client_unsubscribe(handler_.get(), topic) != -1;
#endif
}
bool unsubscribe(const char *topic) final { return esp_mqtt_client_unsubscribe(handler_.get(), topic) != -1; }

bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final {
#if defined(USE_MQTT_IDF_ENQUEUE)
#ifdef ESPHOME_MQTT_THREAD
return enqueue(MQTT_EVENT_PUBLISHED, topic, qos, retain, payload, length);
#elif defined(USE_MQTT_IDF_ENQUEUE)
// use the non-blocking version
// it can delay sending a couple of seconds but won't block
return esp_mqtt_client_enqueue(handler_.get(), topic, payload, length, qos, retain, true) != -1;
Expand Down Expand Up @@ -160,6 +185,36 @@ class MQTTBackendESP32 final : public MQTTBackend {
optional<std::string> cl_certificate_;
optional<std::string> cl_key_;
bool skip_cert_cn_check_{false};
#ifdef ESPHOME_MQTT_THREAD
static void esphome_mqtt_task(void *params);
std::unique_ptr<RingBuffer> ring_buffer_;
TaskHandle_t task_handle_;
bool enqueue(esp_mqtt_event_id_t type, const char *topic, int qos = 0, bool retain = false,
const char *payload = NULL, size_t len = 0) {
struct RingElement elem;

elem.type = type;
elem.qos = qos;
elem.retain = retain;
elem.topic_len = strlen(topic) + 1; // include the trailing NUL
elem.payload_len = len;

/*
* Assert that current_task == loop_task_handle, which is what makes it
* OK that these writes (and the free() check) aren't atomic.
*/

if (this->ring_buffer_->free() < sizeof(elem) + elem.topic_len + len)
return false;

this->ring_buffer_->write(&elem, sizeof(elem));
this->ring_buffer_->write(topic, elem.topic_len);
if (len)
this->ring_buffer_->write(payload, len);

return true;
}
#endif

// callbacks
CallbackManager<on_connect_callback_t> on_connect_;
Expand Down

0 comments on commit ef65856

Please sign in to comment.