Lock scheduler items while modifying them (#4410)

* Cosmetic fixes to scheduler code

* Add generic Mutex API

* Lock scheduler items while modifying them

* Always defer MQTT callbacks on Arduino
This commit is contained in:
Oxan van Leeuwen 2023-02-26 19:43:08 +01:00 committed by GitHub
parent 1a9141877d
commit 86c0e6114f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 100 additions and 12 deletions

View file

@ -485,16 +485,16 @@ static bool topic_match(const char *message, const char *subscription) {
}
void MQTTClientComponent::on_message(const std::string &topic, const std::string &payload) {
#ifdef USE_ESP8266
// on ESP8266, this is called in LWiP thread; some components do not like running
// in an ISR.
#ifdef USE_ARDUINO
// on Arduino, this is called in lwIP/AsyncTCP task; some components do not like running
// from a different task.
this->defer([this, topic, payload]() {
#endif
for (auto &subscription : this->subscriptions_) {
if (topic_match(topic.c_str(), subscription.topic.c_str()))
subscription.callback(topic, payload);
}
#ifdef USE_ESP8266
#ifdef USE_ARDUINO
});
#endif
}

View file

@ -393,6 +393,18 @@ void hsv_to_rgb(int hue, float saturation, float value, float &red, float &green
}
// System APIs
#if defined(USE_ESP8266)
// ESP8266 doesn't have mutexes, but that shouldn't be an issue as it's single-core and non-preemptive OS.
Mutex::Mutex() {}
void Mutex::lock() {}
bool Mutex::try_lock() { return true; }
void Mutex::unlock() {}
#elif defined(USE_ESP32) || defined(USE_RP2040)
Mutex::Mutex() { handle_ = xSemaphoreCreateMutex(); }
void Mutex::lock() { xSemaphoreTake(this->handle_, portMAX_DELAY); }
bool Mutex::try_lock() { return xSemaphoreTake(this->handle_, 0) == pdTRUE; }
void Mutex::unlock() { xSemaphoreGive(this->handle_); }
#endif
#if defined(USE_ESP8266)
IRAM_ATTR InterruptLock::InterruptLock() { state_ = xt_rsil(15); }

View file

@ -14,6 +14,14 @@
#include <esp_heap_caps.h>
#endif
#if defined(USE_ESP32)
#include <freertos/FreeRTOS.h>
#include <freertos/semphr.h>
#elif defined(USE_RP2040)
#include <FreeRTOS.h>
#include <semphr.h>
#endif
#define HOT __attribute__((hot))
#define ESPDEPRECATED(msg, when) __attribute__((deprecated(msg)))
#define ALWAYS_INLINE __attribute__((always_inline))
@ -516,6 +524,39 @@ template<typename T> class Parented {
/// @name System APIs
///@{
/** Mutex implementation, with API based on the unavailable std::mutex.
*
* @note This mutex is non-recursive, so take care not to try to obtain the mutex while it is already taken.
*/
class Mutex {
public:
Mutex();
Mutex(const Mutex &) = delete;
void lock();
bool try_lock();
void unlock();
Mutex &operator=(const Mutex &) = delete;
private:
#if defined(USE_ESP32) || defined(USE_RP2040)
SemaphoreHandle_t handle_;
#endif
};
/** Helper class that wraps a mutex with a RAII-style API.
*
* This behaves like std::lock_guard: as long as the object is alive, the mutex is held.
*/
class LockGuard {
public:
LockGuard(Mutex &mutex) : mutex_{mutex} { mutex_.lock(); }
~LockGuard() { mutex_.unlock(); }
private:
Mutex &mutex_;
};
/** Helper class to disable interrupts.
*
* This behaves like std::lock_guard: as long as the object is alive, all interrupts are disabled.

View file

@ -13,6 +13,11 @@ static const uint32_t MAX_LOGICALLY_DELETED_ITEMS = 10;
// Uncomment to debug scheduler
// #define ESPHOME_DEBUG_SCHEDULER
// A note on locking: the `lock_` lock protects the `items_` and `to_add_` containers. It must be taken when writing to
// them (i.e. when adding/removing items, but not when changing items). As items are only deleted from the loop task,
// iterating over them from the loop task is fine; but iterating from any other context requires the lock to be held to
// avoid the main thread modifying the list while it is being accessed.
void HOT Scheduler::set_timeout(Component *component, const std::string &name, uint32_t timeout,
std::function<void()> func) {
const uint32_t now = this->millis_();
@ -121,7 +126,7 @@ void HOT Scheduler::set_retry(Component *component, const std::string &name, uin
args->backoff_increase_factor = backoff_increase_factor;
args->scheduler = this;
// First exectuion of `func` immediately
// First execution of `func` immediately
this->set_timeout(component, args->name, 0, [args]() { retry_handler(args); });
}
bool HOT Scheduler::cancel_retry(Component *component, const std::string &name) {
@ -150,30 +155,42 @@ void HOT Scheduler::call() {
std::vector<std::unique_ptr<SchedulerItem>> old_items;
ESP_LOGVV(TAG, "Items: count=%u, now=%u", this->items_.size(), now);
while (!this->empty_()) {
this->lock_.lock();
auto item = std::move(this->items_[0]);
this->pop_raw_();
this->lock_.unlock();
ESP_LOGVV(TAG, " %s '%s' interval=%u last_execution=%u (%u) next=%u (%u)", item->get_type_str(),
item->name.c_str(), item->interval, item->last_execution, item->last_execution_major,
item->next_execution(), item->next_execution_major());
this->pop_raw_();
old_items.push_back(std::move(item));
}
ESP_LOGVV(TAG, "\n");
this->items_ = std::move(old_items);
{
LockGuard guard{this->lock_};
this->items_ = std::move(old_items);
}
}
#endif // ESPHOME_DEBUG_SCHEDULER
auto to_remove_was = to_remove_;
auto items_was = items_.size();
auto items_was = this->items_.size();
// If we have too many items to remove
if (to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) {
std::vector<std::unique_ptr<SchedulerItem>> valid_items;
while (!this->empty_()) {
LockGuard guard{this->lock_};
auto item = std::move(this->items_[0]);
this->pop_raw_();
valid_items.push_back(std::move(item));
}
this->items_ = std::move(valid_items);
{
LockGuard guard{this->lock_};
this->items_ = std::move(valid_items);
}
// The following should not happen unless I'm missing something
if (to_remove_ != 0) {
@ -198,6 +215,7 @@ void HOT Scheduler::call() {
// Don't run on failed components
if (item->component != nullptr && item->component->is_failed()) {
LockGuard guard{this->lock_};
this->pop_raw_();
continue;
}
@ -217,6 +235,8 @@ void HOT Scheduler::call() {
}
{
this->lock_.lock();
// new scope, item from before might have been moved in the vector
auto item = std::move(this->items_[0]);
@ -224,6 +244,8 @@ void HOT Scheduler::call() {
// during the function call and know if we were cancelled.
this->pop_raw_();
this->lock_.unlock();
if (item->remove) {
// We were removed/cancelled in the function call, stop
to_remove_--;
@ -246,6 +268,7 @@ void HOT Scheduler::call() {
this->process_to_add();
}
void HOT Scheduler::process_to_add() {
LockGuard guard{this->lock_};
for (auto &it : this->to_add_) {
if (it->remove) {
continue;
@ -263,15 +286,24 @@ void HOT Scheduler::cleanup_() {
return;
to_remove_--;
this->pop_raw_();
{
LockGuard guard{this->lock_};
this->pop_raw_();
}
}
}
void HOT Scheduler::pop_raw_() {
std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
this->items_.pop_back();
}
void HOT Scheduler::push_(std::unique_ptr<Scheduler::SchedulerItem> item) { this->to_add_.push_back(std::move(item)); }
void HOT Scheduler::push_(std::unique_ptr<Scheduler::SchedulerItem> item) {
LockGuard guard{this->lock_};
this->to_add_.push_back(std::move(item));
}
bool HOT Scheduler::cancel_item_(Component *component, const std::string &name, Scheduler::SchedulerItem::Type type) {
// obtain lock because this function iterates and can be called from non-loop task context
LockGuard guard{this->lock_};
bool ret = false;
for (auto &it : this->items_) {
if (it->component == component && it->name == name && it->type == type && !it->remove) {

View file

@ -1,9 +1,11 @@
#pragma once
#include "esphome/core/component.h"
#include <vector>
#include <memory>
#include "esphome/core/component.h"
#include "esphome/core/helpers.h"
namespace esphome {
class Component;
@ -71,6 +73,7 @@ class Scheduler {
return this->items_.empty();
}
Mutex lock_;
std::vector<std::unique_ptr<SchedulerItem>> items_;
std::vector<std::unique_ptr<SchedulerItem>> to_add_;
uint32_t last_millis_{0};