From 034cbe9bec124baffd29becdfd51814e79709318 Mon Sep 17 00:00:00 2001 From: hermlon Date: Wed, 28 Feb 2024 22:03:52 +0100 Subject: [PATCH] Switched mqtt backend for esp8266 to PubSubClient --- esphome/components/mqtt/__init__.py | 49 ++----- .../components/mqtt/mqtt_backend_esp8266.cpp | 86 +++++++++++ .../components/mqtt/mqtt_backend_esp8266.h | 135 +++++++++++++----- esphome/components/mqtt/mqtt_client.cpp | 7 - esphome/components/mqtt/mqtt_client.h | 21 +-- 5 files changed, 207 insertions(+), 91 deletions(-) create mode 100644 esphome/components/mqtt/mqtt_backend_esp8266.cpp diff --git a/esphome/components/mqtt/__init__.py b/esphome/components/mqtt/__init__.py index 240b407819..87ba59a7f5 100644 --- a/esphome/components/mqtt/__init__.py +++ b/esphome/components/mqtt/__init__.py @@ -1,5 +1,3 @@ -import re - from esphome import automation from esphome.automation import Condition import esphome.codegen as cg @@ -38,7 +36,6 @@ from esphome.const import ( CONF_REBOOT_TIMEOUT, CONF_RETAIN, CONF_SHUTDOWN_MESSAGE, - CONF_SSL_FINGERPRINTS, CONF_STATE_TOPIC, CONF_TOPIC, CONF_TOPIC_PREFIX, @@ -50,10 +47,15 @@ from esphome.const import ( PLATFORM_ESP32, PLATFORM_ESP8266, ) -from esphome.core import CORE, coroutine_with_priority +from esphome.core import coroutine_with_priority, CORE DEPENDENCIES = ["network"] +# required for WifiSecureClient to have current time to validate the certificate +if CORE.is_esp8266: + DEPENDENCIES.append("time") + +AUTO_LOAD = ["json"] def AUTO_LOAD(): if CORE.is_esp8266 or CORE.is_libretiny: @@ -194,13 +196,6 @@ def validate_config(value): return out -def validate_fingerprint(value): - value = cv.string(value) - if re.match(r"^[0-9a-f]{40}$", value) is None: - raise cv.Invalid("fingerprint must be valid SHA1 hash") - return value - - CONFIG_SCHEMA = cv.All( cv.Schema( { @@ -213,18 +208,14 @@ CONFIG_SCHEMA = cv.All( cv.SplitDefault(CONF_IDF_SEND_ASYNC, esp32_idf=False): cv.All( cv.boolean, cv.only_with_esp_idf ), - cv.Optional(CONF_CERTIFICATE_AUTHORITY): cv.All( - cv.string, cv.only_with_esp_idf - ), + cv.Optional(CONF_CERTIFICATE_AUTHORITY): cv.All(cv.string), cv.Inclusive(CONF_CLIENT_CERTIFICATE, "cert-key-pair"): cv.All( cv.string, cv.only_on_esp32 ), cv.Inclusive(CONF_CLIENT_CERTIFICATE_KEY, "cert-key-pair"): cv.All( cv.string, cv.only_on_esp32 ), - cv.SplitDefault(CONF_SKIP_CERT_CN_CHECK, esp32_idf=False): cv.All( - cv.boolean, cv.only_with_esp_idf - ), + cv.Optional(CONF_SKIP_CERT_CN_CHECK, default=False): cv.All(cv.boolean), cv.Optional(CONF_DISCOVERY, default=True): cv.Any( cv.boolean, cv.one_of("CLEAN", upper=True) ), @@ -253,9 +244,6 @@ CONFIG_SCHEMA = cv.All( ), validate_message_just_topic, ), - cv.Optional(CONF_SSL_FINGERPRINTS): cv.All( - cv.only_on_esp8266, cv.ensure_list(validate_fingerprint) - ), cv.Optional(CONF_KEEPALIVE, default="15s"): cv.positive_time_period_seconds, cv.Optional( CONF_REBOOT_TIMEOUT, default="15min" @@ -315,8 +303,7 @@ async def to_code(config): await cg.register_component(var, config) # Add required libraries for ESP8266 and LibreTiny if CORE.is_esp8266 or CORE.is_libretiny: - # https://github.com/heman/async-mqtt-client/blob/master/library.json - cg.add_library("heman/AsyncMqttClient-esphome", "2.0.0") + cg.add_library("knolleary/PubSubClient", "2.8") cg.add_define("USE_MQTT") cg.add_global(mqtt_ns.using) @@ -392,33 +379,25 @@ async def to_code(config): if CONF_LEVEL in log_topic: cg.add(var.set_log_level(logger.LOG_LEVELS[log_topic[CONF_LEVEL]])) - if CONF_SSL_FINGERPRINTS in config: - for fingerprint in config[CONF_SSL_FINGERPRINTS]: - arr = [ - cg.RawExpression(f"0x{fingerprint[i:i + 2]}") for i in range(0, 40, 2) - ] - cg.add(var.add_ssl_fingerprint(arr)) - cg.add_build_flag("-DASYNC_TCP_SSL_ENABLED=1") - cg.add(var.set_keep_alive(config[CONF_KEEPALIVE])) cg.add(var.set_reboot_timeout(config[CONF_REBOOT_TIMEOUT])) - # esp-idf only if CONF_CERTIFICATE_AUTHORITY in config: + cg.add_define("USE_MQTT_SECURE_CLIENT") cg.add(var.set_ca_certificate(config[CONF_CERTIFICATE_AUTHORITY])) cg.add(var.set_skip_cert_cn_check(config[CONF_SKIP_CERT_CN_CHECK])) if CONF_CLIENT_CERTIFICATE in config: cg.add(var.set_cl_certificate(config[CONF_CLIENT_CERTIFICATE])) cg.add(var.set_cl_key(config[CONF_CLIENT_CERTIFICATE_KEY])) - # prevent error -0x428e - # See https://github.com/espressif/esp-idf/issues/139 - add_idf_sdkconfig_option("CONFIG_MBEDTLS_HARDWARE_MPI", False) + if CORE.is_esp32: + # prevent error -0x428e + # See https://github.com/espressif/esp-idf/issues/139 + add_idf_sdkconfig_option("CONFIG_MBEDTLS_HARDWARE_MPI", False) if CONF_IDF_SEND_ASYNC in config and config[CONF_IDF_SEND_ASYNC]: cg.add_define("USE_MQTT_IDF_ENQUEUE") - # end esp-idf for conf in config.get(CONF_ON_MESSAGE, []): trig = cg.new_Pvariable(conf[CONF_TRIGGER_ID], conf[CONF_TOPIC]) diff --git a/esphome/components/mqtt/mqtt_backend_esp8266.cpp b/esphome/components/mqtt/mqtt_backend_esp8266.cpp new file mode 100644 index 0000000000..8822d59c62 --- /dev/null +++ b/esphome/components/mqtt/mqtt_backend_esp8266.cpp @@ -0,0 +1,86 @@ +#include + +#include "mqtt_backend_esp8266.h" + +#include "esphome/core/helpers.h" + +namespace esphome { +namespace mqtt { + +static const char *const TAG = "mqtt-backend-esp8266"; + +MQTTBackendESP8266 *object; + +void MQTTBackendESP8266::on_mqtt_message_wrapper_(const char *topic, unsigned char *payload, unsigned int length) { + object->on_mqtt_message_(topic, reinterpret_cast(payload), length); +} + +void MQTTBackendESP8266::on_mqtt_message_(const char *topic, const char *payload, unsigned int length) { + /* no fragmented messages supported, so current_data_offset = 0 and total_data_len = length*/ + this->on_message_.call(topic, payload, length, 0, length); +} + +void MQTTBackendESP8266::initialize_() { + #ifdef USE_MQTT_SECURE_CLIENT + if (this->ca_certificate_str_.has_value()) { + this->ca_certificate_.append(this->ca_certificate_str_.value().c_str()); + this->wifi_client_.setTrustAnchors(&this->ca_certificate_); + if (this->skip_cert_cn_check_) { + this->wifi_client_.setInsecure(); + } + } + #endif + + object = this; + mqtt_client_.setCallback(MQTTBackendESP8266::on_mqtt_message_wrapper_); + this->is_initalized_ = true; +} + +void MQTTBackendESP8266::loop() { + if (!this->is_initalized_) + return; + if (this->mqtt_client_.loop()) { + if (!this->is_connected_) { + this->is_connected_ = true; + /* + * PubSubClient doesn't expose session_present flag in CONNACK, passing the clean_session flag + * assumes the broker remembered it correctly + */ + this->on_connect_.call(this->clean_session_); + } + } else { + if (this->is_connected_) { + this->is_connected_ = false; + MQTTClientDisconnectReason reason = MQTTClientDisconnectReason::TCP_DISCONNECTED; + switch (this->mqtt_client_.state()) { + case MQTT_CONNECTION_TIMEOUT: + case MQTT_CONNECTION_LOST: + case MQTT_CONNECT_FAILED: + case MQTT_DISCONNECTED: + reason = MQTTClientDisconnectReason::TCP_DISCONNECTED; break; + case MQTT_CONNECT_BAD_PROTOCOL: + reason = MQTTClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION; break; + case MQTT_CONNECT_BAD_CLIENT_ID: + reason = MQTTClientDisconnectReason::MQTT_IDENTIFIER_REJECTED; break; + case MQTT_CONNECT_UNAVAILABLE: + reason = MQTTClientDisconnectReason::MQTT_SERVER_UNAVAILABLE; break; + case MQTT_CONNECT_BAD_CREDENTIALS: + reason = MQTTClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS; break; + case MQTT_CONNECT_UNAUTHORIZED: + reason = MQTTClientDisconnectReason::MQTT_NOT_AUTHORIZED; break; + case MQTT_CONNECTED: + assert(false); break; + } + this->on_disconnect_.call(reason); + } + char buffer[128]; + int code = this->wifi_client_.getLastSSLError(buffer, sizeof(buffer)); + if (code != 0) { + ESP_LOGD(TAG, "SSL error code %d: %s", code, buffer); + this->disconnect(); + } + } +} + +} // namespace mqtt +} // namespace esphome diff --git a/esphome/components/mqtt/mqtt_backend_esp8266.h b/esphome/components/mqtt/mqtt_backend_esp8266.h index a979634bf4..1435d418db 100644 --- a/esphome/components/mqtt/mqtt_backend_esp8266.h +++ b/esphome/components/mqtt/mqtt_backend_esp8266.h @@ -4,67 +4,138 @@ #ifdef USE_MQTT #ifdef USE_ESP8266 -#include +#include "mqtt_backend.h" + +#include "esphome/core/log.h" + +#include +#include +#include namespace esphome { namespace mqtt { class MQTTBackendESP8266 final : public MQTTBackend { public: - void set_keep_alive(uint16_t keep_alive) final { mqtt_client_.setKeepAlive(keep_alive); } - void set_client_id(const char *client_id) final { mqtt_client_.setClientId(client_id); } - void set_clean_session(bool clean_session) final { mqtt_client_.setCleanSession(clean_session); } + void set_keep_alive(uint16_t keep_alive) final { + this->keep_alive_ = keep_alive; + } + void set_client_id(const char *client_id) final { + this->client_id_ = client_id; + } + void set_clean_session(bool clean_session) final { + this->clean_session_ = clean_session; + } void set_credentials(const char *username, const char *password) final { - mqtt_client_.setCredentials(username, password); + if (username) + this->username_ = username; + if (password) + this->password_ = password; } void set_will(const char *topic, uint8_t qos, bool retain, const char *payload) final { - mqtt_client_.setWill(topic, qos, retain, payload); + if (topic) + this->lwt_topic_ = topic; + this->lwt_qos_ = qos; + if (payload) + this->lwt_message_ = payload; + this->lwt_retain_ = retain; + } + void set_server(network::IPAddress ip, uint16_t port) final { + ESP_LOGD("mqtt", "setting by ip"); + this->host_ = ip.str(); + this->port_ = port; + this->mqtt_client_.setServer(ip, port); + } + void set_server(const char *host, uint16_t port) final { + ESP_LOGD("mqtt", "setting by host %s, port: %d", host, port); + this->host_ = host; + this->port_ = port; + this->mqtt_client_.setServer(this->host_.c_str(), port); } - void set_server(network::IPAddress ip, uint16_t port) final { mqtt_client_.setServer(ip, port); } - void set_server(const char *host, uint16_t port) final { mqtt_client_.setServer(host, port); } -#if ASYNC_TCP_SSL_ENABLED - void set_secure(bool secure) { mqtt_client.setSecure(secure); } - void add_server_fingerprint(const uint8_t *fingerprint) { mqtt_client.addServerFingerprint(fingerprint); } -#endif void set_on_connect(std::function &&callback) final { - this->mqtt_client_.onConnect(std::move(callback)); + this->on_connect_.add(std::move(callback)); } void set_on_disconnect(std::function &&callback) final { - auto async_callback = [callback](AsyncMqttClientDisconnectReason reason) { - // int based enum so casting isn't a problem - callback(static_cast(reason)); - }; - this->mqtt_client_.onDisconnect(std::move(async_callback)); + this->on_disconnect_.add(std::move(callback)); } void set_on_subscribe(std::function &&callback) final { - this->mqtt_client_.onSubscribe(std::move(callback)); + this->on_subscribe_.add(std::move(callback)); } void set_on_unsubscribe(std::function &&callback) final { - this->mqtt_client_.onUnsubscribe(std::move(callback)); + this->on_unsubscribe_.add(std::move(callback)); } void set_on_message(std::function &&callback) final { - auto async_callback = [callback](const char *topic, const char *payload, - AsyncMqttClientMessageProperties async_properties, size_t len, size_t index, - size_t total) { callback(topic, payload, len, index, total); }; - mqtt_client_.onMessage(std::move(async_callback)); + this->on_message_.add(std::move(callback)); } void set_on_publish(std::function &&callback) final { - this->mqtt_client_.onPublish(std::move(callback)); + this->on_publish_.add(std::move(callback)); } - bool connected() const final { return mqtt_client_.connected(); } - void connect() final { mqtt_client_.connect(); } - void disconnect() final { mqtt_client_.disconnect(true); } - bool subscribe(const char *topic, uint8_t qos) final { return mqtt_client_.subscribe(topic, qos) != 0; } - bool unsubscribe(const char *topic) final { return mqtt_client_.unsubscribe(topic) != 0; } + bool connected() const final { + return this->is_connected_; + } + void connect() final { + if (!this->is_initalized_) { + this->initialize_(); + } + this->mqtt_client_.connect(this->client_id_.c_str(), this->username_.c_str(), this->password_.c_str(), + this->lwt_topic_.c_str(), this->lwt_qos_, this->lwt_retain_, this->lwt_message_.c_str(), this->clean_session_); + } + void disconnect() final { + if (this->is_initalized_) + this->mqtt_client_.disconnect(); + } + bool subscribe(const char *topic, uint8_t qos) final { return this->mqtt_client_.subscribe(topic, qos); } + bool unsubscribe(const char *topic) final { return this->mqtt_client_.unsubscribe(topic); } bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final { - return mqtt_client_.publish(topic, qos, retain, payload, length, false, 0) != 0; + /* qos parameter is ignored, as PubSubClient can only publish QoS 0 messages */ + return this->mqtt_client_.publish(topic, reinterpret_cast(payload), length, retain); } using MQTTBackend::publish; + void loop() final; + + void set_ca_certificate(const std::string &cert) { ca_certificate_str_ = cert; } + void set_skip_cert_cn_check(bool skip_check) { skip_cert_cn_check_ = skip_check; } + protected: - AsyncMqttClient mqtt_client_; + void initialize_(); + static void on_mqtt_message_wrapper_(const char *topic, unsigned char *payload, unsigned int length); + void on_mqtt_message_(const char *topic, const char *payload, unsigned int length); + + #ifdef USE_MQTT_SECURE_CLIENT + WiFiClientSecure wifi_client_; + #else + WiFiClient wifi_client_; + #endif + PubSubClient mqtt_client_{wifi_client_}; + + bool is_connected_{false}; + bool is_initalized_{false}; + + std::string host_; + uint16_t port_; + std::string username_; + std::string password_; + std::string lwt_topic_; + std::string lwt_message_; + uint8_t lwt_qos_; + bool lwt_retain_; + std::string client_id_; + uint16_t keep_alive_; + bool clean_session_; + optional ca_certificate_str_; + BearSSL::X509List ca_certificate_; + bool skip_cert_cn_check_{false}; + + // callbacks + CallbackManager on_connect_; + CallbackManager on_disconnect_; + CallbackManager on_subscribe_; + CallbackManager on_unsubscribe_; + CallbackManager on_message_; + CallbackManager on_publish_; }; } // namespace mqtt diff --git a/esphome/components/mqtt/mqtt_client.cpp b/esphome/components/mqtt/mqtt_client.cpp index c19b24c0cf..4dc70419de 100644 --- a/esphome/components/mqtt/mqtt_client.cpp +++ b/esphome/components/mqtt/mqtt_client.cpp @@ -655,13 +655,6 @@ void MQTTClientComponent::set_on_disconnect(mqtt_on_disconnect_callback_t &&call this->mqtt_backend_.set_on_disconnect(std::forward(callback)); } -#if ASYNC_TCP_SSL_ENABLED -void MQTTClientComponent::add_ssl_fingerprint(const std::array &fingerprint) { - this->mqtt_backend_.setSecure(true); - this->mqtt_backend_.addServerFingerprint(fingerprint.data()); -} -#endif - MQTTClientComponent *global_mqtt_client = nullptr; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) // MQTTMessageTrigger diff --git a/esphome/components/mqtt/mqtt_client.h b/esphome/components/mqtt/mqtt_client.h index b0d3bbe66d..21e49fbfd8 100644 --- a/esphome/components/mqtt/mqtt_client.h +++ b/esphome/components/mqtt/mqtt_client.h @@ -132,27 +132,14 @@ class MQTTClientComponent : public Component { bool is_discovery_enabled() const; bool is_discovery_ip_enabled() const; -#if ASYNC_TCP_SSL_ENABLED - /** Add a SSL fingerprint to use for TCP SSL connections to the MQTT broker. - * - * To use this feature you first have to globally enable the `ASYNC_TCP_SSL_ENABLED` define flag. - * This function can be called multiple times and any certificate that matches any of the provided fingerprints - * will match. Calling this method will also automatically disable all non-ssl connections. - * - * @warning This is *not* secure and *not* how SSL is usually done. You'll have to add - * a separate fingerprint for every certificate you use. Additionally, the hashing - * algorithm used here due to the constraints of the MCU, SHA1, is known to be insecure. - * - * @param fingerprint The SSL fingerprint as a 20 value long std::array. - */ - void add_ssl_fingerprint(const std::array &fingerprint); -#endif -#ifdef USE_ESP32 void set_ca_certificate(const char *cert) { this->mqtt_backend_.set_ca_certificate(cert); } + void set_skip_cert_cn_check(bool skip_check) { this->mqtt_backend_.set_skip_cert_cn_check(skip_check); } + +#ifdef USE_ESP32 void set_cl_certificate(const char *cert) { this->mqtt_backend_.set_cl_certificate(cert); } void set_cl_key(const char *key) { this->mqtt_backend_.set_cl_key(key); } - void set_skip_cert_cn_check(bool skip_check) { this->mqtt_backend_.set_skip_cert_cn_check(skip_check); } #endif + const Availability &get_availability(); /** Set the topic prefix that will be prepended to all topics together with "/". This will, in most cases,