From b56eeec9d97b329539d2830d185e16aafcbb903d Mon Sep 17 00:00:00 2001 From: hermlon Date: Tue, 3 Sep 2024 22:28:43 +0200 Subject: [PATCH] migrated to lwmqtt --- esphome/components/mqtt/__init__.py | 2 +- .../components/mqtt/mqtt_backend_esp8266.cpp | 116 ++++++++++-------- .../components/mqtt/mqtt_backend_esp8266.h | 54 ++++---- 3 files changed, 91 insertions(+), 81 deletions(-) diff --git a/esphome/components/mqtt/__init__.py b/esphome/components/mqtt/__init__.py index 5ec1ddb6e3..40570f0d01 100644 --- a/esphome/components/mqtt/__init__.py +++ b/esphome/components/mqtt/__init__.py @@ -303,7 +303,7 @@ async def to_code(config): await cg.register_component(var, config) # Add required libraries for ESP8266 and LibreTiny if CORE.is_esp8266 or CORE.is_libretiny: - cg.add_library("knolleary/PubSubClient", "2.8") + cg.add_library("256dpi/MQTT", "2.5.2") cg.add_define("USE_MQTT") cg.add_global(mqtt_ns.using) diff --git a/esphome/components/mqtt/mqtt_backend_esp8266.cpp b/esphome/components/mqtt/mqtt_backend_esp8266.cpp index a06fe3ce04..2fc9ce3b6d 100644 --- a/esphome/components/mqtt/mqtt_backend_esp8266.cpp +++ b/esphome/components/mqtt/mqtt_backend_esp8266.cpp @@ -9,15 +9,12 @@ namespace mqtt { static const char *const TAG = "mqtt-backend-esp8266"; -MQTTBackendESP8266 *object; - -void MQTTBackendESP8266::on_mqtt_message_wrapper_(const char *topic, unsigned char *payload, unsigned int length) { - object->on_mqtt_message_(topic, reinterpret_cast(payload), length); +void MQTTBackendESP8266::on_mqtt_message_wrapper_(MQTTClient *client, char topic[], char bytes[], int length) { + static_cast(client->ref)->on_mqtt_message_(client, topic, bytes, length); } -void MQTTBackendESP8266::on_mqtt_message_(const char *topic, const char *payload, unsigned int length) { - /* no fragmented messages supported, so current_data_offset = 0 and total_data_len = length*/ - this->on_message_.call(topic, payload, length, 0, length); +void MQTTBackendESP8266::on_mqtt_message_(MQTTClient *client, char topic[], char bytes[], int length) { + this->on_message_.call(topic, bytes, length, 0, length); } void MQTTBackendESP8266::initialize_() { @@ -31,61 +28,72 @@ void MQTTBackendESP8266::initialize_() { } #endif - object = this; - mqtt_client_.setCallback(MQTTBackendESP8266::on_mqtt_message_wrapper_); + this->mqtt_client_.ref = this; + mqtt_client_.onMessageAdvanced(MQTTBackendESP8266::on_mqtt_message_wrapper_); this->is_initalized_ = true; } -void MQTTBackendESP8266::loop() { - if (!this->is_initalized_) - return; - if (this->mqtt_client_.loop()) { - if (!this->is_connected_) { - this->is_connected_ = true; - /* - * PubSubClient doesn't expose session_present flag in CONNACK, passing the clean_session flag - * assumes the broker remembered it correctly - */ - this->on_connect_.call(this->clean_session_); - } - } else { - if (this->is_connected_) { - this->is_connected_ = false; - MQTTClientDisconnectReason reason = MQTTClientDisconnectReason::TCP_DISCONNECTED; - switch (this->mqtt_client_.state()) { - case MQTT_CONNECTION_TIMEOUT: - case MQTT_CONNECTION_LOST: - case MQTT_CONNECT_FAILED: - case MQTT_DISCONNECTED: - reason = MQTTClientDisconnectReason::TCP_DISCONNECTED; - break; - case MQTT_CONNECT_BAD_PROTOCOL: - reason = MQTTClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION; - break; - case MQTT_CONNECT_BAD_CLIENT_ID: - reason = MQTTClientDisconnectReason::MQTT_IDENTIFIER_REJECTED; - break; - case MQTT_CONNECT_UNAVAILABLE: - reason = MQTTClientDisconnectReason::MQTT_SERVER_UNAVAILABLE; - break; - case MQTT_CONNECT_BAD_CREDENTIALS: - reason = MQTTClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS; - break; - case MQTT_CONNECT_UNAUTHORIZED: - reason = MQTTClientDisconnectReason::MQTT_NOT_AUTHORIZED; - break; - case MQTT_CONNECTED: - assert(false); - break; - } - this->on_disconnect_.call(reason); - } +void MQTTBackendESP8266::handleErrors_() { + lwmqtt_err_t error = this->mqtt_client_.lastError(); + lwmqtt_return_code_t return_code = this->mqtt_client_.returnCode(); + if (error != LWMQTT_SUCCESS) { + ESP_LOGD(TAG, "Error: %d, returnCode: %d", error, return_code); + char buffer[128]; int code = this->wifi_client_.getLastSSLError(buffer, sizeof(buffer)); if (code != 0) { ESP_LOGD(TAG, "SSL error code %d: %s", code, buffer); - this->disconnect(); } + + MQTTClientDisconnectReason reason = MQTTClientDisconnectReason::TCP_DISCONNECTED; + + if (return_code != LWMQTT_CONNECTION_ACCEPTED) { + switch (return_code) { + case LWMQTT_CONNECTION_ACCEPTED: + assert(false); + break; + case LWMQTT_UNACCEPTABLE_PROTOCOL: + reason = MQTTClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION; + break; + case LWMQTT_IDENTIFIER_REJECTED: + reason = MQTTClientDisconnectReason::MQTT_IDENTIFIER_REJECTED; + break; + case LWMQTT_SERVER_UNAVAILABLE: + reason = MQTTClientDisconnectReason::MQTT_SERVER_UNAVAILABLE; + break; + case LWMQTT_BAD_USERNAME_OR_PASSWORD: + reason = MQTTClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS; + break; + case LWMQTT_NOT_AUTHORIZED: + reason = MQTTClientDisconnectReason::MQTT_NOT_AUTHORIZED; + break; + case LWMQTT_UNKNOWN_RETURN_CODE: + reason = MQTTClientDisconnectReason::TCP_DISCONNECTED; + break; + } + } + this->on_disconnect_.call(reason); + } +} + +void MQTTBackendESP8266::connect() { + if (!this->is_initalized_) { + this->initialize_(); + } + this->mqtt_client_.begin(this->host_.c_str(), this->port_, this->wifi_client_); + this->mqtt_client_.connect(this->client_id_.c_str(), this->username_.c_str(), this->password_.c_str()); + this->handleErrors_(); +} + +void MQTTBackendESP8266::loop() { + this->mqtt_client_.loop(); + if (!this->is_connected_ && this->mqtt_client_.connected()) { + this->is_connected_ = true; + this->on_connect_.call(this->clean_session_); + } + if (this->is_connected_ && !this->mqtt_client_.connected()) { + this->is_connected_ = false; + this->on_disconnect_.call(MQTTClientDisconnectReason::TCP_DISCONNECTED); } } diff --git a/esphome/components/mqtt/mqtt_backend_esp8266.h b/esphome/components/mqtt/mqtt_backend_esp8266.h index 9ab9e431e3..bc0b902533 100644 --- a/esphome/components/mqtt/mqtt_backend_esp8266.h +++ b/esphome/components/mqtt/mqtt_backend_esp8266.h @@ -10,16 +10,19 @@ #include #include -#include +#include namespace esphome { namespace mqtt { class MQTTBackendESP8266 final : public MQTTBackend { public: - void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; } + void set_keep_alive(uint16_t keep_alive) final { this->mqtt_client_.setKeepAlive(keep_alive); } void set_client_id(const char *client_id) final { this->client_id_ = client_id; } - void set_clean_session(bool clean_session) final { this->clean_session_ = clean_session; } + void set_clean_session(bool clean_session) final { + this->clean_session_ = clean_session; + this->mqtt_client_.setCleanSession(clean_session); + } void set_credentials(const char *username, const char *password) final { if (username) this->username_ = username; @@ -29,20 +32,19 @@ class MQTTBackendESP8266 final : public MQTTBackend { void set_will(const char *topic, uint8_t qos, bool retain, const char *payload) final { if (topic) this->lwt_topic_ = topic; - this->lwt_qos_ = qos; if (payload) this->lwt_message_ = payload; - this->lwt_retain_ = retain; + this->mqtt_client_.setWill(this->lwt_topic_.c_str(), this->lwt_message_.c_str(), retain, qos); } void set_server(network::IPAddress ip, uint16_t port) final { this->host_ = ip.str(); this->port_ = port; - this->mqtt_client_.setServer(ip, port); + this->mqtt_client_.setHost(ip, port); } void set_server(const char *host, uint16_t port) final { this->host_ = host; this->port_ = port; - this->mqtt_client_.setServer(this->host_.c_str(), port); + this->mqtt_client_.setHost(this->host_.c_str(), port); } void set_on_connect(std::function &&callback) final { @@ -65,23 +67,25 @@ class MQTTBackendESP8266 final : public MQTTBackend { } bool connected() const final { return this->is_connected_; } - void connect() final { - if (!this->is_initalized_) { - this->initialize_(); - } - this->mqtt_client_.connect(this->client_id_.c_str(), this->username_.c_str(), this->password_.c_str(), - this->lwt_topic_.c_str(), this->lwt_qos_, this->lwt_retain_, this->lwt_message_.c_str(), - this->clean_session_); - } + void connect() final; void disconnect() final { if (this->is_initalized_) this->mqtt_client_.disconnect(); } - bool subscribe(const char *topic, uint8_t qos) final { return this->mqtt_client_.subscribe(topic, qos); } - bool unsubscribe(const char *topic) final { return this->mqtt_client_.unsubscribe(topic); } + bool subscribe(const char *topic, uint8_t qos) final { + bool res = this->mqtt_client_.subscribe(topic, qos); + this->on_subscribe_.call(this->mqtt_client_.lastPacketID(), qos); + return res; + } + bool unsubscribe(const char *topic) final { + bool res = this->mqtt_client_.unsubscribe(topic); + this->on_unsubscribe_.call(this->mqtt_client_.lastPacketID()); + return res; + } bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final { - /* qos parameter is ignored, as PubSubClient can only publish QoS 0 messages */ - return this->mqtt_client_.publish(topic, reinterpret_cast(payload), length, retain); + bool res = this->mqtt_client_.publish(topic, payload, length, retain, qos); + this->on_publish_.call(this->mqtt_client_.lastPacketID()); + return res; } using MQTTBackend::publish; @@ -92,30 +96,28 @@ class MQTTBackendESP8266 final : public MQTTBackend { protected: void initialize_(); - static void on_mqtt_message_wrapper_(const char *topic, unsigned char *payload, unsigned int length); - void on_mqtt_message_(const char *topic, const char *payload, unsigned int length); + void handleErrors_(); + static void on_mqtt_message_wrapper_(MQTTClient *client, char topic[], char bytes[], int length); + void on_mqtt_message_(MQTTClient *client, char topic[], char bytes[], int length); #ifdef USE_MQTT_SECURE_CLIENT WiFiClientSecure wifi_client_; #else WiFiClient wifi_client_; #endif - PubSubClient mqtt_client_{wifi_client_}; + MQTTClient mqtt_client_; bool is_connected_{false}; bool is_initalized_{false}; std::string host_; uint16_t port_; + bool clean_session_{true}; std::string username_; std::string password_; std::string lwt_topic_; std::string lwt_message_; - uint8_t lwt_qos_; - bool lwt_retain_; std::string client_id_; - uint16_t keep_alive_; - bool clean_session_; optional ca_certificate_str_; BearSSL::X509List ca_certificate_; bool skip_cert_cn_check_{false};