diff --git a/esphome/components/mqtt/mqtt_client.cpp b/esphome/components/mqtt/mqtt_client.cpp index aa0cf56c51..acb863244e 100644 --- a/esphome/components/mqtt/mqtt_client.cpp +++ b/esphome/components/mqtt/mqtt_client.cpp @@ -485,16 +485,16 @@ static bool topic_match(const char *message, const char *subscription) { } void MQTTClientComponent::on_message(const std::string &topic, const std::string &payload) { -#ifdef USE_ESP8266 - // on ESP8266, this is called in LWiP thread; some components do not like running - // in an ISR. +#ifdef USE_ARDUINO + // on Arduino, this is called in lwIP/AsyncTCP task; some components do not like running + // from a different task. this->defer([this, topic, payload]() { #endif for (auto &subscription : this->subscriptions_) { if (topic_match(topic.c_str(), subscription.topic.c_str())) subscription.callback(topic, payload); } -#ifdef USE_ESP8266 +#ifdef USE_ARDUINO }); #endif } diff --git a/esphome/core/helpers.cpp b/esphome/core/helpers.cpp index 7ec5d9d23c..4ac9303b20 100644 --- a/esphome/core/helpers.cpp +++ b/esphome/core/helpers.cpp @@ -393,6 +393,18 @@ void hsv_to_rgb(int hue, float saturation, float value, float &red, float &green } // System APIs +#if defined(USE_ESP8266) +// ESP8266 doesn't have mutexes, but that shouldn't be an issue as it's single-core and non-preemptive OS. +Mutex::Mutex() {} +void Mutex::lock() {} +bool Mutex::try_lock() { return true; } +void Mutex::unlock() {} +#elif defined(USE_ESP32) || defined(USE_RP2040) +Mutex::Mutex() { handle_ = xSemaphoreCreateMutex(); } +void Mutex::lock() { xSemaphoreTake(this->handle_, portMAX_DELAY); } +bool Mutex::try_lock() { return xSemaphoreTake(this->handle_, 0) == pdTRUE; } +void Mutex::unlock() { xSemaphoreGive(this->handle_); } +#endif #if defined(USE_ESP8266) IRAM_ATTR InterruptLock::InterruptLock() { state_ = xt_rsil(15); } diff --git a/esphome/core/helpers.h b/esphome/core/helpers.h index cef303941e..0d2a7e298a 100644 --- a/esphome/core/helpers.h +++ b/esphome/core/helpers.h @@ -14,6 +14,14 @@ #include #endif +#if defined(USE_ESP32) +#include +#include +#elif defined(USE_RP2040) +#include +#include +#endif + #define HOT __attribute__((hot)) #define ESPDEPRECATED(msg, when) __attribute__((deprecated(msg))) #define ALWAYS_INLINE __attribute__((always_inline)) @@ -516,6 +524,39 @@ template class Parented { /// @name System APIs ///@{ +/** Mutex implementation, with API based on the unavailable std::mutex. + * + * @note This mutex is non-recursive, so take care not to try to obtain the mutex while it is already taken. + */ +class Mutex { + public: + Mutex(); + Mutex(const Mutex &) = delete; + void lock(); + bool try_lock(); + void unlock(); + + Mutex &operator=(const Mutex &) = delete; + + private: +#if defined(USE_ESP32) || defined(USE_RP2040) + SemaphoreHandle_t handle_; +#endif +}; + +/** Helper class that wraps a mutex with a RAII-style API. + * + * This behaves like std::lock_guard: as long as the object is alive, the mutex is held. + */ +class LockGuard { + public: + LockGuard(Mutex &mutex) : mutex_{mutex} { mutex_.lock(); } + ~LockGuard() { mutex_.unlock(); } + + private: + Mutex &mutex_; +}; + /** Helper class to disable interrupts. * * This behaves like std::lock_guard: as long as the object is alive, all interrupts are disabled. diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index d880f0fda4..0cb148ec13 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -13,6 +13,11 @@ static const uint32_t MAX_LOGICALLY_DELETED_ITEMS = 10; // Uncomment to debug scheduler // #define ESPHOME_DEBUG_SCHEDULER +// A note on locking: the `lock_` lock protects the `items_` and `to_add_` containers. It must be taken when writing to +// them (i.e. when adding/removing items, but not when changing items). As items are only deleted from the loop task, +// iterating over them from the loop task is fine; but iterating from any other context requires the lock to be held to +// avoid the main thread modifying the list while it is being accessed. + void HOT Scheduler::set_timeout(Component *component, const std::string &name, uint32_t timeout, std::function func) { const uint32_t now = this->millis_(); @@ -121,7 +126,7 @@ void HOT Scheduler::set_retry(Component *component, const std::string &name, uin args->backoff_increase_factor = backoff_increase_factor; args->scheduler = this; - // First exectuion of `func` immediately + // First execution of `func` immediately this->set_timeout(component, args->name, 0, [args]() { retry_handler(args); }); } bool HOT Scheduler::cancel_retry(Component *component, const std::string &name) { @@ -150,30 +155,42 @@ void HOT Scheduler::call() { std::vector> old_items; ESP_LOGVV(TAG, "Items: count=%u, now=%u", this->items_.size(), now); while (!this->empty_()) { + this->lock_.lock(); auto item = std::move(this->items_[0]); + this->pop_raw_(); + this->lock_.unlock(); + ESP_LOGVV(TAG, " %s '%s' interval=%u last_execution=%u (%u) next=%u (%u)", item->get_type_str(), item->name.c_str(), item->interval, item->last_execution, item->last_execution_major, item->next_execution(), item->next_execution_major()); - this->pop_raw_(); old_items.push_back(std::move(item)); } ESP_LOGVV(TAG, "\n"); - this->items_ = std::move(old_items); + + { + LockGuard guard{this->lock_}; + this->items_ = std::move(old_items); + } } #endif // ESPHOME_DEBUG_SCHEDULER auto to_remove_was = to_remove_; - auto items_was = items_.size(); + auto items_was = this->items_.size(); // If we have too many items to remove if (to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) { std::vector> valid_items; while (!this->empty_()) { + LockGuard guard{this->lock_}; auto item = std::move(this->items_[0]); this->pop_raw_(); valid_items.push_back(std::move(item)); } - this->items_ = std::move(valid_items); + + { + LockGuard guard{this->lock_}; + this->items_ = std::move(valid_items); + } // The following should not happen unless I'm missing something if (to_remove_ != 0) { @@ -198,6 +215,7 @@ void HOT Scheduler::call() { // Don't run on failed components if (item->component != nullptr && item->component->is_failed()) { + LockGuard guard{this->lock_}; this->pop_raw_(); continue; } @@ -217,6 +235,8 @@ void HOT Scheduler::call() { } { + this->lock_.lock(); + // new scope, item from before might have been moved in the vector auto item = std::move(this->items_[0]); @@ -224,6 +244,8 @@ void HOT Scheduler::call() { // during the function call and know if we were cancelled. this->pop_raw_(); + this->lock_.unlock(); + if (item->remove) { // We were removed/cancelled in the function call, stop to_remove_--; @@ -246,6 +268,7 @@ void HOT Scheduler::call() { this->process_to_add(); } void HOT Scheduler::process_to_add() { + LockGuard guard{this->lock_}; for (auto &it : this->to_add_) { if (it->remove) { continue; @@ -263,15 +286,24 @@ void HOT Scheduler::cleanup_() { return; to_remove_--; - this->pop_raw_(); + + { + LockGuard guard{this->lock_}; + this->pop_raw_(); + } } } void HOT Scheduler::pop_raw_() { std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp); this->items_.pop_back(); } -void HOT Scheduler::push_(std::unique_ptr item) { this->to_add_.push_back(std::move(item)); } +void HOT Scheduler::push_(std::unique_ptr item) { + LockGuard guard{this->lock_}; + this->to_add_.push_back(std::move(item)); +} bool HOT Scheduler::cancel_item_(Component *component, const std::string &name, Scheduler::SchedulerItem::Type type) { + // obtain lock because this function iterates and can be called from non-loop task context + LockGuard guard{this->lock_}; bool ret = false; for (auto &it : this->items_) { if (it->component == component && it->name == name && it->type == type && !it->remove) { diff --git a/esphome/core/scheduler.h b/esphome/core/scheduler.h index a758198b8d..44a58f37f5 100644 --- a/esphome/core/scheduler.h +++ b/esphome/core/scheduler.h @@ -1,9 +1,11 @@ #pragma once -#include "esphome/core/component.h" #include #include +#include "esphome/core/component.h" +#include "esphome/core/helpers.h" + namespace esphome { class Component; @@ -71,6 +73,7 @@ class Scheduler { return this->items_.empty(); } + Mutex lock_; std::vector> items_; std::vector> to_add_; uint32_t last_millis_{0};