migrated to lwmqtt

This commit is contained in:
hermlon 2024-09-03 22:28:43 +02:00
parent 81f03772ea
commit b56eeec9d9
3 changed files with 91 additions and 81 deletions

View file

@ -303,7 +303,7 @@ async def to_code(config):
await cg.register_component(var, config) await cg.register_component(var, config)
# Add required libraries for ESP8266 and LibreTiny # Add required libraries for ESP8266 and LibreTiny
if CORE.is_esp8266 or CORE.is_libretiny: if CORE.is_esp8266 or CORE.is_libretiny:
cg.add_library("knolleary/PubSubClient", "2.8") cg.add_library("256dpi/MQTT", "2.5.2")
cg.add_define("USE_MQTT") cg.add_define("USE_MQTT")
cg.add_global(mqtt_ns.using) cg.add_global(mqtt_ns.using)

View file

@ -9,15 +9,12 @@ namespace mqtt {
static const char *const TAG = "mqtt-backend-esp8266"; static const char *const TAG = "mqtt-backend-esp8266";
MQTTBackendESP8266 *object; void MQTTBackendESP8266::on_mqtt_message_wrapper_(MQTTClient *client, char topic[], char bytes[], int length) {
static_cast<MQTTBackendESP8266 *>(client->ref)->on_mqtt_message_(client, topic, bytes, length);
void MQTTBackendESP8266::on_mqtt_message_wrapper_(const char *topic, unsigned char *payload, unsigned int length) {
object->on_mqtt_message_(topic, reinterpret_cast<const char *>(payload), length);
} }
void MQTTBackendESP8266::on_mqtt_message_(const char *topic, const char *payload, unsigned int length) { void MQTTBackendESP8266::on_mqtt_message_(MQTTClient *client, char topic[], char bytes[], int length) {
/* no fragmented messages supported, so current_data_offset = 0 and total_data_len = length*/ this->on_message_.call(topic, bytes, length, 0, length);
this->on_message_.call(topic, payload, length, 0, length);
} }
void MQTTBackendESP8266::initialize_() { void MQTTBackendESP8266::initialize_() {
@ -31,61 +28,72 @@ void MQTTBackendESP8266::initialize_() {
} }
#endif #endif
object = this; this->mqtt_client_.ref = this;
mqtt_client_.setCallback(MQTTBackendESP8266::on_mqtt_message_wrapper_); mqtt_client_.onMessageAdvanced(MQTTBackendESP8266::on_mqtt_message_wrapper_);
this->is_initalized_ = true; this->is_initalized_ = true;
} }
void MQTTBackendESP8266::loop() { void MQTTBackendESP8266::handleErrors_() {
if (!this->is_initalized_) lwmqtt_err_t error = this->mqtt_client_.lastError();
return; lwmqtt_return_code_t return_code = this->mqtt_client_.returnCode();
if (this->mqtt_client_.loop()) { if (error != LWMQTT_SUCCESS) {
if (!this->is_connected_) { ESP_LOGD(TAG, "Error: %d, returnCode: %d", error, return_code);
this->is_connected_ = true;
/*
* PubSubClient doesn't expose session_present flag in CONNACK, passing the clean_session flag
* assumes the broker remembered it correctly
*/
this->on_connect_.call(this->clean_session_);
}
} else {
if (this->is_connected_) {
this->is_connected_ = false;
MQTTClientDisconnectReason reason = MQTTClientDisconnectReason::TCP_DISCONNECTED;
switch (this->mqtt_client_.state()) {
case MQTT_CONNECTION_TIMEOUT:
case MQTT_CONNECTION_LOST:
case MQTT_CONNECT_FAILED:
case MQTT_DISCONNECTED:
reason = MQTTClientDisconnectReason::TCP_DISCONNECTED;
break;
case MQTT_CONNECT_BAD_PROTOCOL:
reason = MQTTClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION;
break;
case MQTT_CONNECT_BAD_CLIENT_ID:
reason = MQTTClientDisconnectReason::MQTT_IDENTIFIER_REJECTED;
break;
case MQTT_CONNECT_UNAVAILABLE:
reason = MQTTClientDisconnectReason::MQTT_SERVER_UNAVAILABLE;
break;
case MQTT_CONNECT_BAD_CREDENTIALS:
reason = MQTTClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS;
break;
case MQTT_CONNECT_UNAUTHORIZED:
reason = MQTTClientDisconnectReason::MQTT_NOT_AUTHORIZED;
break;
case MQTT_CONNECTED:
assert(false);
break;
}
this->on_disconnect_.call(reason);
}
char buffer[128]; char buffer[128];
int code = this->wifi_client_.getLastSSLError(buffer, sizeof(buffer)); int code = this->wifi_client_.getLastSSLError(buffer, sizeof(buffer));
if (code != 0) { if (code != 0) {
ESP_LOGD(TAG, "SSL error code %d: %s", code, buffer); ESP_LOGD(TAG, "SSL error code %d: %s", code, buffer);
this->disconnect();
} }
MQTTClientDisconnectReason reason = MQTTClientDisconnectReason::TCP_DISCONNECTED;
if (return_code != LWMQTT_CONNECTION_ACCEPTED) {
switch (return_code) {
case LWMQTT_CONNECTION_ACCEPTED:
assert(false);
break;
case LWMQTT_UNACCEPTABLE_PROTOCOL:
reason = MQTTClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION;
break;
case LWMQTT_IDENTIFIER_REJECTED:
reason = MQTTClientDisconnectReason::MQTT_IDENTIFIER_REJECTED;
break;
case LWMQTT_SERVER_UNAVAILABLE:
reason = MQTTClientDisconnectReason::MQTT_SERVER_UNAVAILABLE;
break;
case LWMQTT_BAD_USERNAME_OR_PASSWORD:
reason = MQTTClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS;
break;
case LWMQTT_NOT_AUTHORIZED:
reason = MQTTClientDisconnectReason::MQTT_NOT_AUTHORIZED;
break;
case LWMQTT_UNKNOWN_RETURN_CODE:
reason = MQTTClientDisconnectReason::TCP_DISCONNECTED;
break;
}
}
this->on_disconnect_.call(reason);
}
}
void MQTTBackendESP8266::connect() {
if (!this->is_initalized_) {
this->initialize_();
}
this->mqtt_client_.begin(this->host_.c_str(), this->port_, this->wifi_client_);
this->mqtt_client_.connect(this->client_id_.c_str(), this->username_.c_str(), this->password_.c_str());
this->handleErrors_();
}
void MQTTBackendESP8266::loop() {
this->mqtt_client_.loop();
if (!this->is_connected_ && this->mqtt_client_.connected()) {
this->is_connected_ = true;
this->on_connect_.call(this->clean_session_);
}
if (this->is_connected_ && !this->mqtt_client_.connected()) {
this->is_connected_ = false;
this->on_disconnect_.call(MQTTClientDisconnectReason::TCP_DISCONNECTED);
} }
} }

View file

@ -10,16 +10,19 @@
#include <BearSSLHelpers.h> #include <BearSSLHelpers.h>
#include <WiFiClientSecure.h> #include <WiFiClientSecure.h>
#include <PubSubClient.h> #include <MQTT.h>
namespace esphome { namespace esphome {
namespace mqtt { namespace mqtt {
class MQTTBackendESP8266 final : public MQTTBackend { class MQTTBackendESP8266 final : public MQTTBackend {
public: public:
void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; } void set_keep_alive(uint16_t keep_alive) final { this->mqtt_client_.setKeepAlive(keep_alive); }
void set_client_id(const char *client_id) final { this->client_id_ = client_id; } void set_client_id(const char *client_id) final { this->client_id_ = client_id; }
void set_clean_session(bool clean_session) final { this->clean_session_ = clean_session; } void set_clean_session(bool clean_session) final {
this->clean_session_ = clean_session;
this->mqtt_client_.setCleanSession(clean_session);
}
void set_credentials(const char *username, const char *password) final { void set_credentials(const char *username, const char *password) final {
if (username) if (username)
this->username_ = username; this->username_ = username;
@ -29,20 +32,19 @@ class MQTTBackendESP8266 final : public MQTTBackend {
void set_will(const char *topic, uint8_t qos, bool retain, const char *payload) final { void set_will(const char *topic, uint8_t qos, bool retain, const char *payload) final {
if (topic) if (topic)
this->lwt_topic_ = topic; this->lwt_topic_ = topic;
this->lwt_qos_ = qos;
if (payload) if (payload)
this->lwt_message_ = payload; this->lwt_message_ = payload;
this->lwt_retain_ = retain; this->mqtt_client_.setWill(this->lwt_topic_.c_str(), this->lwt_message_.c_str(), retain, qos);
} }
void set_server(network::IPAddress ip, uint16_t port) final { void set_server(network::IPAddress ip, uint16_t port) final {
this->host_ = ip.str(); this->host_ = ip.str();
this->port_ = port; this->port_ = port;
this->mqtt_client_.setServer(ip, port); this->mqtt_client_.setHost(ip, port);
} }
void set_server(const char *host, uint16_t port) final { void set_server(const char *host, uint16_t port) final {
this->host_ = host; this->host_ = host;
this->port_ = port; this->port_ = port;
this->mqtt_client_.setServer(this->host_.c_str(), port); this->mqtt_client_.setHost(this->host_.c_str(), port);
} }
void set_on_connect(std::function<on_connect_callback_t> &&callback) final { void set_on_connect(std::function<on_connect_callback_t> &&callback) final {
@ -65,23 +67,25 @@ class MQTTBackendESP8266 final : public MQTTBackend {
} }
bool connected() const final { return this->is_connected_; } bool connected() const final { return this->is_connected_; }
void connect() final { void connect() final;
if (!this->is_initalized_) {
this->initialize_();
}
this->mqtt_client_.connect(this->client_id_.c_str(), this->username_.c_str(), this->password_.c_str(),
this->lwt_topic_.c_str(), this->lwt_qos_, this->lwt_retain_, this->lwt_message_.c_str(),
this->clean_session_);
}
void disconnect() final { void disconnect() final {
if (this->is_initalized_) if (this->is_initalized_)
this->mqtt_client_.disconnect(); this->mqtt_client_.disconnect();
} }
bool subscribe(const char *topic, uint8_t qos) final { return this->mqtt_client_.subscribe(topic, qos); } bool subscribe(const char *topic, uint8_t qos) final {
bool unsubscribe(const char *topic) final { return this->mqtt_client_.unsubscribe(topic); } bool res = this->mqtt_client_.subscribe(topic, qos);
this->on_subscribe_.call(this->mqtt_client_.lastPacketID(), qos);
return res;
}
bool unsubscribe(const char *topic) final {
bool res = this->mqtt_client_.unsubscribe(topic);
this->on_unsubscribe_.call(this->mqtt_client_.lastPacketID());
return res;
}
bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final { bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final {
/* qos parameter is ignored, as PubSubClient can only publish QoS 0 messages */ bool res = this->mqtt_client_.publish(topic, payload, length, retain, qos);
return this->mqtt_client_.publish(topic, reinterpret_cast<const uint8_t *>(payload), length, retain); this->on_publish_.call(this->mqtt_client_.lastPacketID());
return res;
} }
using MQTTBackend::publish; using MQTTBackend::publish;
@ -92,30 +96,28 @@ class MQTTBackendESP8266 final : public MQTTBackend {
protected: protected:
void initialize_(); void initialize_();
static void on_mqtt_message_wrapper_(const char *topic, unsigned char *payload, unsigned int length); void handleErrors_();
void on_mqtt_message_(const char *topic, const char *payload, unsigned int length); static void on_mqtt_message_wrapper_(MQTTClient *client, char topic[], char bytes[], int length);
void on_mqtt_message_(MQTTClient *client, char topic[], char bytes[], int length);
#ifdef USE_MQTT_SECURE_CLIENT #ifdef USE_MQTT_SECURE_CLIENT
WiFiClientSecure wifi_client_; WiFiClientSecure wifi_client_;
#else #else
WiFiClient wifi_client_; WiFiClient wifi_client_;
#endif #endif
PubSubClient mqtt_client_{wifi_client_}; MQTTClient mqtt_client_;
bool is_connected_{false}; bool is_connected_{false};
bool is_initalized_{false}; bool is_initalized_{false};
std::string host_; std::string host_;
uint16_t port_; uint16_t port_;
bool clean_session_{true};
std::string username_; std::string username_;
std::string password_; std::string password_;
std::string lwt_topic_; std::string lwt_topic_;
std::string lwt_message_; std::string lwt_message_;
uint8_t lwt_qos_;
bool lwt_retain_;
std::string client_id_; std::string client_id_;
uint16_t keep_alive_;
bool clean_session_;
optional<std::string> ca_certificate_str_; optional<std::string> ca_certificate_str_;
BearSSL::X509List ca_certificate_; BearSSL::X509List ca_certificate_;
bool skip_cert_cn_check_{false}; bool skip_cert_cn_check_{false};