From 149d814fab0980a4948806e014306d587805eadf Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 15 Nov 2023 20:49:56 -0600 Subject: [PATCH] dashboard: Centralize dashboard entries into DashboardEntries class (#5774) * Centralize dashboard entries into DashboardEntries class * preen * preen * preen * preen * preen --- esphome/dashboard/core.py | 11 +- esphome/dashboard/entries.py | 202 ++++++++++++++++++++++++++----- esphome/dashboard/settings.py | 86 ++----------- esphome/dashboard/status/mdns.py | 7 +- esphome/dashboard/status/mqtt.py | 7 +- esphome/dashboard/status/ping.py | 3 +- esphome/dashboard/web_server.py | 26 ++-- 7 files changed, 209 insertions(+), 133 deletions(-) diff --git a/esphome/dashboard/core.py b/esphome/dashboard/core.py index 4cc2938bb1..f18da92d80 100644 --- a/esphome/dashboard/core.py +++ b/esphome/dashboard/core.py @@ -6,7 +6,7 @@ import threading from typing import TYPE_CHECKING from ..zeroconf import DiscoveredImport -from .entries import DashboardEntry +from .entries import DashboardEntries from .settings import DashboardSettings if TYPE_CHECKING: @@ -15,15 +15,11 @@ if TYPE_CHECKING: _LOGGER = logging.getLogger(__name__) -def list_dashboard_entries() -> list[DashboardEntry]: - """List all dashboard entries.""" - return DASHBOARD.settings.entries() - - class ESPHomeDashboard: """Class that represents the dashboard.""" __slots__ = ( + "entries", "loop", "ping_result", "import_result", @@ -36,6 +32,7 @@ class ESPHomeDashboard: def __init__(self) -> None: """Initialize the ESPHomeDashboard.""" + self.entries: DashboardEntries | None = None self.loop: asyncio.AbstractEventLoop | None = None self.ping_result: dict[str, bool | None] = {} self.import_result: dict[str, DiscoveredImport] = {} @@ -49,12 +46,14 @@ class ESPHomeDashboard: """Setup the dashboard.""" self.loop = asyncio.get_running_loop() self.ping_request = asyncio.Event() + self.entries = DashboardEntries(self.settings.config_dir) async def async_run(self) -> None: """Run the dashboard.""" settings = self.settings mdns_task: asyncio.Task | None = None ping_status_task: asyncio.Task | None = None + await self.entries.async_update_entries() if settings.status_use_ping: from .status.ping import PingStatus diff --git a/esphome/dashboard/entries.py b/esphome/dashboard/entries.py index 582073d655..ff539fc620 100644 --- a/esphome/dashboard/entries.py +++ b/esphome/dashboard/entries.py @@ -1,10 +1,150 @@ from __future__ import annotations +import asyncio +import logging import os -from esphome import const +from esphome import const, util from esphome.storage_json import StorageJSON, ext_storage_path +_LOGGER = logging.getLogger(__name__) + +DashboardCacheKeyType = tuple[int, int, float, int] + + +class DashboardEntries: + """Represents all dashboard entries.""" + + __slots__ = ("_loop", "_config_dir", "_entries", "_loaded_entries", "_update_lock") + + def __init__(self, config_dir: str) -> None: + """Initialize the DashboardEntries.""" + self._loop = asyncio.get_running_loop() + self._config_dir = config_dir + # Entries are stored as + # { + # "path/to/file.yaml": DashboardEntry, + # ... + # } + self._entries: dict[str, DashboardEntry] = {} + self._loaded_entries = False + self._update_lock = asyncio.Lock() + + def get(self, path: str) -> DashboardEntry | None: + """Get an entry by path.""" + return self._entries.get(path) + + async def _async_all(self) -> list[DashboardEntry]: + """Return all entries.""" + return list(self._entries.values()) + + def all(self) -> list[DashboardEntry]: + """Return all entries.""" + return asyncio.run_coroutine_threadsafe(self._async_all, self._loop).result() + + def async_all(self) -> list[DashboardEntry]: + """Return all entries.""" + return list(self._entries.values()) + + async def async_request_update_entries(self) -> None: + """Request an update of the dashboard entries from disk. + + If an update is already in progress, this will do nothing. + """ + if self._update_lock.locked(): + _LOGGER.debug("Dashboard entries are already being updated") + return + await self.async_update_entries() + + async def async_update_entries(self) -> None: + """Update the dashboard entries from disk.""" + async with self._update_lock: + await self._async_update_entries() + + def _load_entries( + self, entries: dict[DashboardEntry, DashboardCacheKeyType] + ) -> None: + """Load all entries from disk.""" + for entry, cache_key in entries.items(): + _LOGGER.debug( + "Loading dashboard entry %s because cache key changed: %s", + entry.path, + cache_key, + ) + entry.load_from_disk(cache_key) + + async def _async_update_entries(self) -> list[DashboardEntry]: + """Sync the dashboard entries from disk.""" + _LOGGER.debug("Updating dashboard entries") + # At some point it would be nice to use watchdog to avoid polling + + path_to_cache_key = await self._loop.run_in_executor( + None, self._get_path_to_cache_key + ) + added: dict[DashboardEntry, DashboardCacheKeyType] = {} + updated: dict[DashboardEntry, DashboardCacheKeyType] = {} + removed: set[DashboardEntry] = { + entry + for filename, entry in self._entries.items() + if filename not in path_to_cache_key + } + entries = self._entries + for path, cache_key in path_to_cache_key.items(): + if entry := self._entries.get(path): + if entry.cache_key != cache_key: + updated[entry] = cache_key + else: + entry = DashboardEntry(path, cache_key) + added[entry] = cache_key + + if added or updated: + await self._loop.run_in_executor( + None, self._load_entries, {**added, **updated} + ) + + for entry in added: + _LOGGER.debug("Added dashboard entry %s", entry.path) + entries[entry.path] = entry + + if entry in removed: + _LOGGER.debug("Removed dashboard entry %s", entry.path) + entries.pop(entry.path) + + for entry in updated: + _LOGGER.debug("Updated dashboard entry %s", entry.path) + # In the future we can fire events when entries are added/removed/updated + + def _get_path_to_cache_key(self) -> dict[str, DashboardCacheKeyType]: + """Return a dict of path to cache key.""" + path_to_cache_key: dict[str, DashboardCacheKeyType] = {} + # + # The cache key is (inode, device, mtime, size) + # which allows us to avoid locking since it ensures + # every iteration of this call will always return the newest + # items from disk at the cost of a stat() call on each + # file which is much faster than reading the file + # for the cache hit case which is the common case. + # + for file in util.list_yaml_files([self._config_dir]): + try: + # Prefer the json storage path if it exists + stat = os.stat(ext_storage_path(os.path.basename(file))) + except OSError: + try: + # Fallback to the yaml file if the storage + # file does not exist or could not be generated + stat = os.stat(file) + except OSError: + # File was deleted, ignore + continue + path_to_cache_key[file] = ( + stat.st_ino, + stat.st_dev, + stat.st_mtime, + stat.st_size, + ) + return path_to_cache_key + class DashboardEntry: """Represents a single dashboard entry. @@ -12,13 +152,15 @@ class DashboardEntry: This class is thread-safe and read-only. """ - __slots__ = ("path", "_storage", "_loaded_storage") + __slots__ = ("path", "filename", "_storage_path", "cache_key", "storage") - def __init__(self, path: str) -> None: + def __init__(self, path: str, cache_key: DashboardCacheKeyType) -> None: """Initialize the DashboardEntry.""" self.path = path - self._storage = None - self._loaded_storage = False + self.filename = os.path.basename(path) + self._storage_path = ext_storage_path(self.filename) + self.cache_key = cache_key + self.storage: StorageJSON | None = None def __repr__(self): """Return the representation of this entry.""" @@ -30,87 +172,91 @@ class DashboardEntry: f"no_mdns={self.no_mdns})" ) - @property - def filename(self): - """Return the filename of this entry.""" - return os.path.basename(self.path) + def load_from_disk(self, cache_key: DashboardCacheKeyType | None = None) -> None: + """Load this entry from disk.""" + self.storage = StorageJSON.load(self._storage_path) + # + # Currently StorageJSON.load() will return None if the file does not exist + # + # StorageJSON currently does not provide an updated cache key so we use the + # one that is passed in. + # + # The cache key was read from the disk moments ago and may be stale but + # it does not matter since we are polling anyways, and the next call to + # async_update_entries() will load it again in the extremely rare case that + # it changed between the two calls. + # + if cache_key: + self.cache_key = cache_key @property - def storage(self) -> StorageJSON | None: - """Return the StorageJSON object for this entry.""" - if not self._loaded_storage: - self._storage = StorageJSON.load(ext_storage_path(self.filename)) - self._loaded_storage = True - return self._storage - - @property - def address(self): + def address(self) -> str | None: """Return the address of this entry.""" if self.storage is None: return None return self.storage.address @property - def no_mdns(self): + def no_mdns(self) -> bool | None: """Return the no_mdns of this entry.""" if self.storage is None: return None return self.storage.no_mdns @property - def web_port(self): + def web_port(self) -> int | None: """Return the web port of this entry.""" if self.storage is None: return None return self.storage.web_port @property - def name(self): + def name(self) -> str: """Return the name of this entry.""" if self.storage is None: return self.filename.replace(".yml", "").replace(".yaml", "") return self.storage.name @property - def friendly_name(self): + def friendly_name(self) -> str: """Return the friendly name of this entry.""" if self.storage is None: return self.name return self.storage.friendly_name @property - def comment(self): + def comment(self) -> str | None: """Return the comment of this entry.""" if self.storage is None: return None return self.storage.comment @property - def target_platform(self): + def target_platform(self) -> str | None: """Return the target platform of this entry.""" if self.storage is None: return None return self.storage.target_platform @property - def update_available(self): + def update_available(self) -> bool: """Return if an update is available for this entry.""" if self.storage is None: return True return self.update_old != self.update_new @property - def update_old(self): + def update_old(self) -> str: if self.storage is None: return "" return self.storage.esphome_version or "" @property - def update_new(self): + def update_new(self) -> str: return const.__version__ @property - def loaded_integrations(self): + def loaded_integrations(self) -> list[str]: if self.storage is None: return [] return self.storage.loaded_integrations diff --git a/esphome/dashboard/settings.py b/esphome/dashboard/settings.py index 888616f6f7..76633e1bf2 100644 --- a/esphome/dashboard/settings.py +++ b/esphome/dashboard/settings.py @@ -4,29 +4,23 @@ import hmac import os from pathlib import Path -from esphome import util from esphome.core import CORE from esphome.helpers import get_bool_env -from esphome.storage_json import ext_storage_path -from .entries import DashboardEntry from .util.password import password_hash class DashboardSettings: """Settings for the dashboard.""" - def __init__(self): - self.config_dir = "" - self.password_hash = "" - self.username = "" - self.using_password = False - self.on_ha_addon = False - self.cookie_secret = None - self.absolute_config_dir = None - self._entry_cache: dict[ - str, tuple[tuple[int, int, float, int], DashboardEntry] - ] = {} + def __init__(self) -> None: + self.config_dir: str = "" + self.password_hash: str = "" + self.username: str = "" + self.using_password: bool = False + self.on_ha_addon: bool = False + self.cookie_secret: str | None = None + self.absolute_config_dir: Path | None = None def parse_args(self, args): self.on_ha_addon: bool = args.ha_addon @@ -80,67 +74,3 @@ class DashboardSettings: # Raises ValueError if not relative to ESPHome config folder Path(joined_path).resolve().relative_to(self.absolute_config_dir) return joined_path - - def list_yaml_files(self) -> list[str]: - return util.list_yaml_files([self.config_dir]) - - def entries(self) -> list[DashboardEntry]: - """Fetch all dashboard entries, thread-safe.""" - path_to_cache_key: dict[str, tuple[int, int, float, int]] = {} - # - # The cache key is (inode, device, mtime, size) - # which allows us to avoid locking since it ensures - # every iteration of this call will always return the newest - # items from disk at the cost of a stat() call on each - # file which is much faster than reading the file - # for the cache hit case which is the common case. - # - # Because there is no lock the cache may - # get built more than once but that's fine as its still - # thread-safe and results in orders of magnitude less - # reads from disk than if we did not cache at all and - # does not have a lock contention issue. - # - for file in self.list_yaml_files(): - try: - # Prefer the json storage path if it exists - stat = os.stat(ext_storage_path(os.path.basename(file))) - except OSError: - try: - # Fallback to the yaml file if the storage - # file does not exist or could not be generated - stat = os.stat(file) - except OSError: - # File was deleted, ignore - continue - path_to_cache_key[file] = ( - stat.st_ino, - stat.st_dev, - stat.st_mtime, - stat.st_size, - ) - - entry_cache = self._entry_cache - - # Remove entries that no longer exist - removed: list[str] = [] - for file in entry_cache: - if file not in path_to_cache_key: - removed.append(file) - - for file in removed: - entry_cache.pop(file) - - dashboard_entries: list[DashboardEntry] = [] - for file, cache_key in path_to_cache_key.items(): - if cached_entry := entry_cache.get(file): - entry_key, dashboard_entry = cached_entry - if entry_key == cache_key: - dashboard_entries.append(dashboard_entry) - continue - - dashboard_entry = DashboardEntry(file) - dashboard_entries.append(dashboard_entry) - entry_cache[file] = (cache_key, dashboard_entry) - - return dashboard_entries diff --git a/esphome/dashboard/status/mdns.py b/esphome/dashboard/status/mdns.py index 454bba9cb5..51d11390b7 100644 --- a/esphome/dashboard/status/mdns.py +++ b/esphome/dashboard/status/mdns.py @@ -10,7 +10,7 @@ from esphome.zeroconf import ( DashboardStatus, ) -from ..core import DASHBOARD, list_dashboard_entries +from ..core import DASHBOARD class MDNSStatus: @@ -41,12 +41,13 @@ class MDNSStatus: async def async_refresh_hosts(self): """Refresh the hosts to track.""" - entries = await self._loop.run_in_executor(None, list_dashboard_entries) + dashboard = DASHBOARD + entries = dashboard.entries.async_all() host_name_with_mdns_enabled = self.host_name_with_mdns_enabled host_mdns_state = self.host_mdns_state host_name_to_filename = self.host_name_to_filename filename_to_host_name = self.filename_to_host_name - ping_result = DASHBOARD.ping_result + ping_result = dashboard.ping_result for entry in entries: name = entry.name diff --git a/esphome/dashboard/status/mqtt.py b/esphome/dashboard/status/mqtt.py index 109b60e133..2fd3a332a7 100644 --- a/esphome/dashboard/status/mqtt.py +++ b/esphome/dashboard/status/mqtt.py @@ -7,7 +7,7 @@ import threading from esphome import mqtt -from ..core import DASHBOARD, list_dashboard_entries +from ..core import DASHBOARD class MqttStatusThread(threading.Thread): @@ -16,7 +16,7 @@ class MqttStatusThread(threading.Thread): def run(self) -> None: """Run the status thread.""" dashboard = DASHBOARD - entries = list_dashboard_entries() + entries = dashboard.entries.all() config = mqtt.config_from_env() topic = "esphome/discover/#" @@ -51,8 +51,7 @@ class MqttStatusThread(threading.Thread): client.loop_start() while not dashboard.stop_event.wait(2): - # update entries - entries = list_dashboard_entries() + entries = dashboard.entries.all() # will be set to true on on_message for entry in entries: diff --git a/esphome/dashboard/status/ping.py b/esphome/dashboard/status/ping.py index 678d7844ae..35fb2259f0 100644 --- a/esphome/dashboard/status/ping.py +++ b/esphome/dashboard/status/ping.py @@ -6,7 +6,6 @@ from typing import cast from ..core import DASHBOARD from ..entries import DashboardEntry -from ..core import list_dashboard_entries from ..util.itertools import chunked from ..util.subprocess import async_system_command_status @@ -32,7 +31,7 @@ class PingStatus: # Only ping if the dashboard is open await dashboard.ping_request.wait() dashboard.ping_result.clear() - entries = await self._loop.run_in_executor(None, list_dashboard_entries) + entries = dashboard.entries.async_all() to_ping: list[DashboardEntry] = [ entry for entry in entries if entry.address is not None ] diff --git a/esphome/dashboard/web_server.py b/esphome/dashboard/web_server.py index 76faa43015..9a5de0a933 100644 --- a/esphome/dashboard/web_server.py +++ b/esphome/dashboard/web_server.py @@ -36,10 +36,9 @@ from esphome.storage_json import StorageJSON, ext_storage_path, trash_storage_pa from esphome.util import get_serial_ports, shlex_quote from esphome.yaml_util import FastestAvailableSafeLoader -from .core import DASHBOARD, list_dashboard_entries -from .entries import DashboardEntry -from .util.text import friendly_name_slugify +from .core import DASHBOARD from .util.subprocess import async_run_system_command +from .util.text import friendly_name_slugify _LOGGER = logging.getLogger(__name__) @@ -601,11 +600,11 @@ class EsphomeVersionHandler(BaseHandler): class ListDevicesHandler(BaseHandler): @authenticated async def get(self): - loop = asyncio.get_running_loop() - entries = await loop.run_in_executor(None, list_dashboard_entries) + dashboard = DASHBOARD + await dashboard.entries.async_request_update_entries() + entries = dashboard.entries.async_all() self.set_header("content-type", "application/json") configured = {entry.name for entry in entries} - dashboard = DASHBOARD self.write( json.dumps( @@ -658,8 +657,10 @@ class MainRequestHandler(BaseHandler): class PrometheusServiceDiscoveryHandler(BaseHandler): @authenticated - def get(self): - entries = list_dashboard_entries() + async def get(self): + dashboard = DASHBOARD + await dashboard.entries.async_request_update_entries() + entries = dashboard.entries.async_all() self.set_header("content-type", "application/json") sd = [] for entry in entries: @@ -733,16 +734,17 @@ class PingRequestHandler(BaseHandler): class InfoRequestHandler(BaseHandler): @authenticated @bind_config - def get(self, configuration=None): + async def get(self, configuration=None): yaml_path = settings.rel_path(configuration) - all_yaml_files = settings.list_yaml_files() + dashboard = DASHBOARD + entry = dashboard.entries.get(yaml_path) - if yaml_path not in all_yaml_files: + if not entry: self.set_status(404) return self.set_header("content-type", "application/json") - self.write(DashboardEntry(yaml_path).storage.to_json()) + self.write(entry.storage.to_json()) class EditRequestHandler(BaseHandler):