diff --git a/esphome/dashboard/async_adapter.py b/esphome/dashboard/async_adapter.py deleted file mode 100644 index 44d2f42ce0..0000000000 --- a/esphome/dashboard/async_adapter.py +++ /dev/null @@ -1,31 +0,0 @@ -from __future__ import annotations - -import asyncio - - -class AsyncEvent: - """This is a shim around asyncio.Event.""" - - def __init__(self) -> None: - """Initialize the ThreadedAsyncEvent.""" - self.async_event: asyncio.Event | None = None - self.loop: asyncio.AbstractEventLoop | None = None - - def async_setup( - self, loop: asyncio.AbstractEventLoop, async_event: asyncio.Event - ) -> None: - """Set the asyncio.Event instance.""" - self.loop = loop - self.async_event = async_event - - def async_set(self) -> None: - """Set the asyncio.Event instance.""" - self.async_event.set() - - async def async_wait(self) -> None: - """Wait the event async.""" - await self.async_event.wait() - - def async_clear(self) -> None: - """Clear the event async.""" - self.async_event.clear() diff --git a/esphome/dashboard/core.py b/esphome/dashboard/core.py new file mode 100644 index 0000000000..4cc2938bb1 --- /dev/null +++ b/esphome/dashboard/core.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import asyncio +import logging +import threading +from typing import TYPE_CHECKING + +from ..zeroconf import DiscoveredImport +from .entries import DashboardEntry +from .settings import DashboardSettings + +if TYPE_CHECKING: + from .status.mdns import MDNSStatus + +_LOGGER = logging.getLogger(__name__) + + +def list_dashboard_entries() -> list[DashboardEntry]: + """List all dashboard entries.""" + return DASHBOARD.settings.entries() + + +class ESPHomeDashboard: + """Class that represents the dashboard.""" + + __slots__ = ( + "loop", + "ping_result", + "import_result", + "stop_event", + "ping_request", + "mqtt_ping_request", + "mdns_status", + "settings", + ) + + def __init__(self) -> None: + """Initialize the ESPHomeDashboard.""" + self.loop: asyncio.AbstractEventLoop | None = None + self.ping_result: dict[str, bool | None] = {} + self.import_result: dict[str, DiscoveredImport] = {} + self.stop_event = threading.Event() + self.ping_request: asyncio.Event | None = None + self.mqtt_ping_request = threading.Event() + self.mdns_status: MDNSStatus | None = None + self.settings: DashboardSettings = DashboardSettings() + + async def async_setup(self) -> None: + """Setup the dashboard.""" + self.loop = asyncio.get_running_loop() + self.ping_request = asyncio.Event() + + async def async_run(self) -> None: + """Run the dashboard.""" + settings = self.settings + mdns_task: asyncio.Task | None = None + ping_status_task: asyncio.Task | None = None + + if settings.status_use_ping: + from .status.ping import PingStatus + + ping_status = PingStatus() + ping_status_task = asyncio.create_task(ping_status.async_run()) + else: + from .status.mdns import MDNSStatus + + mdns_status = MDNSStatus() + await mdns_status.async_refresh_hosts() + self.mdns_status = mdns_status + mdns_task = asyncio.create_task(mdns_status.async_run()) + + if settings.status_use_mqtt: + from .status.mqtt import MqttStatusThread + + status_thread_mqtt = MqttStatusThread() + status_thread_mqtt.start() + + shutdown_event = asyncio.Event() + try: + await shutdown_event.wait() + finally: + _LOGGER.info("Shutting down...") + self.stop_event.set() + self.ping_request.set() + if ping_status_task: + ping_status_task.cancel() + if mdns_task: + mdns_task.cancel() + if settings.status_use_mqtt: + status_thread_mqtt.join() + self.mqtt_ping_request.set() + await asyncio.sleep(0) + + +DASHBOARD = ESPHomeDashboard() diff --git a/esphome/dashboard/dashboard.py b/esphome/dashboard/dashboard.py index c4fb8704e6..52f2701df4 100644 --- a/esphome/dashboard/dashboard.py +++ b/esphome/dashboard/dashboard.py @@ -2,12 +2,10 @@ from __future__ import annotations import asyncio import base64 -import binascii import datetime import functools import gzip import hashlib -import hmac import json import logging import os @@ -16,7 +14,7 @@ import shutil import subprocess import threading from pathlib import Path -from typing import Any, cast +from typing import Any import tornado import tornado.concurrent @@ -32,8 +30,7 @@ import tornado.websocket import yaml from tornado.log import access_log -from esphome import const, platformio_api, util, yaml_util -from esphome.core import CORE +from esphome import const, platformio_api, yaml_util from esphome.helpers import get_bool_env, mkdir_p, run_system_command from esphome.storage_json import ( EsphomeStorageJSON, @@ -43,158 +40,22 @@ from esphome.storage_json import ( trash_storage_path, ) from esphome.util import get_serial_ports, shlex_quote -from esphome.zeroconf import ( - ESPHOME_SERVICE_TYPE, - AsyncEsphomeZeroconf, - DashboardBrowser, - DashboardImportDiscovery, - DashboardStatus, -) -from .async_adapter import AsyncEvent -from .util import chunked, friendly_name_slugify, password_hash +from .core import DASHBOARD, list_dashboard_entries +from .entries import DashboardEntry +from .util import friendly_name_slugify _LOGGER = logging.getLogger(__name__) ENV_DEV = "ESPHOME_DASHBOARD_DEV" -class DashboardSettings: - def __init__(self): - self.config_dir = "" - self.password_hash = "" - self.username = "" - self.using_password = False - self.on_ha_addon = False - self.cookie_secret = None - self.absolute_config_dir = None - self._entry_cache: dict[ - str, tuple[tuple[int, int, float, int], DashboardEntry] - ] = {} - - def parse_args(self, args): - self.on_ha_addon = args.ha_addon - password = args.password or os.getenv("PASSWORD", "") - if not self.on_ha_addon: - self.username = args.username or os.getenv("USERNAME", "") - self.using_password = bool(password) - if self.using_password: - self.password_hash = password_hash(password) - self.config_dir = args.configuration - self.absolute_config_dir = Path(self.config_dir).resolve() - CORE.config_path = os.path.join(self.config_dir, ".") - - @property - def relative_url(self): - return os.getenv("ESPHOME_DASHBOARD_RELATIVE_URL", "/") - - @property - def status_use_ping(self): - return get_bool_env("ESPHOME_DASHBOARD_USE_PING") - - @property - def status_use_mqtt(self): - return get_bool_env("ESPHOME_DASHBOARD_USE_MQTT") - - @property - def using_ha_addon_auth(self): - if not self.on_ha_addon: - return False - return not get_bool_env("DISABLE_HA_AUTHENTICATION") - - @property - def using_auth(self): - return self.using_password or self.using_ha_addon_auth - - @property - def streamer_mode(self): - return get_bool_env("ESPHOME_STREAMER_MODE") - - def check_password(self, username, password): - if not self.using_auth: - return True - if username != self.username: - return False - - # Compare password in constant running time (to prevent timing attacks) - return hmac.compare_digest(self.password_hash, password_hash(password)) - - def rel_path(self, *args): - joined_path = os.path.join(self.config_dir, *args) - # Raises ValueError if not relative to ESPHome config folder - Path(joined_path).resolve().relative_to(self.absolute_config_dir) - return joined_path - - def list_yaml_files(self) -> list[str]: - return util.list_yaml_files([self.config_dir]) - - def entries(self) -> list[DashboardEntry]: - """Fetch all dashboard entries, thread-safe.""" - path_to_cache_key: dict[str, tuple[int, int, float, int]] = {} - # - # The cache key is (inode, device, mtime, size) - # which allows us to avoid locking since it ensures - # every iteration of this call will always return the newest - # items from disk at the cost of a stat() call on each - # file which is much faster than reading the file - # for the cache hit case which is the common case. - # - # Because there is no lock the cache may - # get built more than once but that's fine as its still - # thread-safe and results in orders of magnitude less - # reads from disk than if we did not cache at all and - # does not have a lock contention issue. - # - for file in self.list_yaml_files(): - try: - # Prefer the json storage path if it exists - stat = os.stat(ext_storage_path(os.path.basename(file))) - except OSError: - try: - # Fallback to the yaml file if the storage - # file does not exist or could not be generated - stat = os.stat(file) - except OSError: - # File was deleted, ignore - continue - path_to_cache_key[file] = ( - stat.st_ino, - stat.st_dev, - stat.st_mtime, - stat.st_size, - ) - - entry_cache = self._entry_cache - - # Remove entries that no longer exist - removed: list[str] = [] - for file in entry_cache: - if file not in path_to_cache_key: - removed.append(file) - - for file in removed: - entry_cache.pop(file) - - dashboard_entries: list[DashboardEntry] = [] - for file, cache_key in path_to_cache_key.items(): - if cached_entry := entry_cache.get(file): - entry_key, dashboard_entry = cached_entry - if entry_key == cache_key: - dashboard_entries.append(dashboard_entry) - continue - - dashboard_entry = DashboardEntry(file) - dashboard_entries.append(dashboard_entry) - entry_cache[file] = (cache_key, dashboard_entry) - - return dashboard_entries - - -settings = DashboardSettings() - cookie_authenticated_yes = b"yes" +settings = DASHBOARD.settings + + def template_args(): version = const.__version__ if "b" in version: @@ -412,12 +273,13 @@ class EsphomePortCommandWebSocket(EsphomeCommandWebSocket): self, args: list[str], json_message: dict[str, Any] ) -> list[str]: """Build the command to run.""" + dashboard = DASHBOARD configuration = json_message["configuration"] config_file = settings.rel_path(configuration) port = json_message["port"] if ( port == "OTA" - and (mdns := MDNS_CONTAINER.get_mdns()) + and (mdns := dashboard.mdns_status) and (host_name := mdns.filename_to_host_name_thread_safe(configuration)) and (address := await mdns.async_resolve_host(host_name)) ): @@ -458,7 +320,7 @@ class EsphomeRenameHandler(EsphomeCommandWebSocket): return # Remove the old ping result from the cache - PING_RESULT.pop(self.old_name, None) + DASHBOARD.ping_result.pop(self.old_name, None) class EsphomeUploadHandler(EsphomePortCommandWebSocket): @@ -575,6 +437,7 @@ class ImportRequestHandler(BaseHandler): def post(self): from esphome.components.dashboard_import import import_config + dashboard = DASHBOARD args = json.loads(self.request.body.decode()) try: name = args["name"] @@ -582,7 +445,12 @@ class ImportRequestHandler(BaseHandler): encryption = args.get("encryption", False) imported_device = next( - (res for res in IMPORT_RESULT.values() if res.device_name == name), None + ( + res + for res in dashboard.import_result.values() + if res.device_name == name + ), + None, ) if imported_device is not None: @@ -602,7 +470,7 @@ class ImportRequestHandler(BaseHandler): encryption, ) # Make sure the device gets marked online right away - PING_REQUEST.async_set() + dashboard.ping_request.set() except FileExistsError: self.set_status(500) self.write("File already exists") @@ -734,127 +602,15 @@ class EsphomeVersionHandler(BaseHandler): self.finish() -def _list_dashboard_entries() -> list[DashboardEntry]: - return settings.entries() - - -class DashboardEntry: - """Represents a single dashboard entry. - - This class is thread-safe and read-only. - """ - - __slots__ = ("path", "_storage", "_loaded_storage") - - def __init__(self, path: str) -> None: - """Initialize the DashboardEntry.""" - self.path = path - self._storage = None - self._loaded_storage = False - - def __repr__(self): - """Return the representation of this entry.""" - return ( - f"DashboardEntry({self.path} " - f"address={self.address} " - f"web_port={self.web_port} " - f"name={self.name} " - f"no_mdns={self.no_mdns})" - ) - - @property - def filename(self): - """Return the filename of this entry.""" - return os.path.basename(self.path) - - @property - def storage(self) -> StorageJSON | None: - """Return the StorageJSON object for this entry.""" - if not self._loaded_storage: - self._storage = StorageJSON.load(ext_storage_path(self.filename)) - self._loaded_storage = True - return self._storage - - @property - def address(self): - """Return the address of this entry.""" - if self.storage is None: - return None - return self.storage.address - - @property - def no_mdns(self): - """Return the no_mdns of this entry.""" - if self.storage is None: - return None - return self.storage.no_mdns - - @property - def web_port(self): - """Return the web port of this entry.""" - if self.storage is None: - return None - return self.storage.web_port - - @property - def name(self): - """Return the name of this entry.""" - if self.storage is None: - return self.filename.replace(".yml", "").replace(".yaml", "") - return self.storage.name - - @property - def friendly_name(self): - """Return the friendly name of this entry.""" - if self.storage is None: - return self.name - return self.storage.friendly_name - - @property - def comment(self): - """Return the comment of this entry.""" - if self.storage is None: - return None - return self.storage.comment - - @property - def target_platform(self): - """Return the target platform of this entry.""" - if self.storage is None: - return None - return self.storage.target_platform - - @property - def update_available(self): - """Return if an update is available for this entry.""" - if self.storage is None: - return True - return self.update_old != self.update_new - - @property - def update_old(self): - if self.storage is None: - return "" - return self.storage.esphome_version or "" - - @property - def update_new(self): - return const.__version__ - - @property - def loaded_integrations(self): - if self.storage is None: - return [] - return self.storage.loaded_integrations - - class ListDevicesHandler(BaseHandler): @authenticated async def get(self): loop = asyncio.get_running_loop() - entries = await loop.run_in_executor(None, _list_dashboard_entries) + entries = await loop.run_in_executor(None, list_dashboard_entries) self.set_header("content-type", "application/json") configured = {entry.name for entry in entries} + dashboard = DASHBOARD + self.write( json.dumps( { @@ -883,7 +639,7 @@ class ListDevicesHandler(BaseHandler): "project_version": res.project_version, "network": res.network, } - for res in IMPORT_RESULT.values() + for res in dashboard.import_result.values() if res.device_name not in configured ], } @@ -907,7 +663,7 @@ class MainRequestHandler(BaseHandler): class PrometheusServiceDiscoveryHandler(BaseHandler): @authenticated def get(self): - entries = _list_dashboard_entries() + entries = list_dashboard_entries() self.set_header("content-type", "application/json") sd = [] for entry in entries: @@ -967,207 +723,15 @@ class BoardsRequestHandler(BaseHandler): self.write(json.dumps(output)) -class MDNSStatus: - """Class that updates the mdns status.""" - - def __init__(self) -> None: - """Initialize the MDNSStatus class.""" - super().__init__() - self.aiozc: AsyncEsphomeZeroconf | None = None - # This is the current mdns state for each host (True, False, None) - self.host_mdns_state: dict[str, bool | None] = {} - # This is the hostnames to filenames mapping - self.host_name_to_filename: dict[str, str] = {} - self.filename_to_host_name: dict[str, str] = {} - # This is a set of host names to track (i.e no_mdns = false) - self.host_name_with_mdns_enabled: set[set] = set() - self._loop = asyncio.get_running_loop() - - def filename_to_host_name_thread_safe(self, filename: str) -> str | None: - """Resolve a filename to an address in a thread-safe manner.""" - return self.filename_to_host_name.get(filename) - - async def async_resolve_host(self, host_name: str) -> str | None: - """Resolve a host name to an address in a thread-safe manner.""" - if aiozc := self.aiozc: - return await aiozc.async_resolve_host(host_name) - return None - - async def async_refresh_hosts(self): - """Refresh the hosts to track.""" - entries = await self._loop.run_in_executor(None, _list_dashboard_entries) - host_name_with_mdns_enabled = self.host_name_with_mdns_enabled - host_mdns_state = self.host_mdns_state - host_name_to_filename = self.host_name_to_filename - filename_to_host_name = self.filename_to_host_name - - for entry in entries: - name = entry.name - # If no_mdns is set, remove it from the set - if entry.no_mdns: - host_name_with_mdns_enabled.discard(name) - continue - - # We are tracking this host - host_name_with_mdns_enabled.add(name) - filename = entry.filename - - # If we just adopted/imported this host, we likely - # already have a state for it, so we should make sure - # to set it so the dashboard shows it as online - if name in host_mdns_state: - PING_RESULT[filename] = host_mdns_state[name] - - # Make sure the mapping is up to date - # so when we get an mdns update we can map it back - # to the filename - host_name_to_filename[name] = filename - filename_to_host_name[filename] = name - - async def async_run(self) -> None: - global IMPORT_RESULT - - aiozc = AsyncEsphomeZeroconf() - self.aiozc = aiozc - host_mdns_state = self.host_mdns_state - host_name_to_filename = self.host_name_to_filename - host_name_with_mdns_enabled = self.host_name_with_mdns_enabled - - def on_update(dat: dict[str, bool | None]) -> None: - """Update the global PING_RESULT dict.""" - for name, result in dat.items(): - host_mdns_state[name] = result - if name in host_name_with_mdns_enabled: - filename = host_name_to_filename[name] - PING_RESULT[filename] = result - - stat = DashboardStatus(on_update) - imports = DashboardImportDiscovery() - browser = DashboardBrowser( - aiozc.zeroconf, - ESPHOME_SERVICE_TYPE, - [stat.browser_callback, imports.browser_callback], - ) - - while not STOP_EVENT.is_set(): - await self.async_refresh_hosts() - IMPORT_RESULT = imports.import_state - await PING_REQUEST.async_wait() - PING_REQUEST.async_clear() - - await browser.async_cancel() - await aiozc.async_close() - self.aiozc = None - - -async def _async_ping_host(host: str) -> bool: - """Ping a host.""" - ping_command = ["ping", "-n" if os.name == "nt" else "-c", "1"] - process = await asyncio.create_subprocess_exec( - *ping_command, - host, - stdin=asyncio.subprocess.DEVNULL, - stdout=asyncio.subprocess.DEVNULL, - stderr=asyncio.subprocess.DEVNULL, - close_fds=False, # Required for posix_spawn - ) - await process.wait() - return process.returncode == 0 - - -class PingStatus: - def __init__(self) -> None: - """Initialize the PingStatus class.""" - super().__init__() - self._loop = asyncio.get_running_loop() - - async def async_run(self) -> None: - """Run the ping status.""" - while not STOP_EVENT.is_set(): - # Only ping if the dashboard is open - await PING_REQUEST.async_wait() - PING_REQUEST.async_clear() - entries = await self._loop.run_in_executor(None, _list_dashboard_entries) - to_ping: list[DashboardEntry] = [ - entry for entry in entries if entry.address is not None - ] - for ping_group in chunked(to_ping, 16): - ping_group = cast(list[DashboardEntry], ping_group) - results = await asyncio.gather( - *(_async_ping_host(entry.address) for entry in ping_group), - return_exceptions=True, - ) - for entry, result in zip(ping_group, results): - if isinstance(result, Exception): - result = False - elif isinstance(result, BaseException): - raise result - PING_RESULT[entry.filename] = result - - -class MqttStatusThread(threading.Thread): - def run(self): - from esphome import mqtt - - entries = _list_dashboard_entries() - - config = mqtt.config_from_env() - topic = "esphome/discover/#" - - def on_message(client, userdata, msg): - nonlocal entries - - payload = msg.payload.decode(errors="backslashreplace") - if len(payload) > 0: - data = json.loads(payload) - if "name" not in data: - return - for entry in entries: - if entry.name == data["name"]: - PING_RESULT[entry.filename] = True - return - - def on_connect(client, userdata, flags, return_code): - client.publish("esphome/discover", None, retain=False) - - mqttid = str(binascii.hexlify(os.urandom(6)).decode()) - - client = mqtt.prepare( - config, - [topic], - on_message, - on_connect, - None, - None, - f"esphome-dashboard-{mqttid}", - ) - client.loop_start() - - while not STOP_EVENT.wait(2): - # update entries - entries = _list_dashboard_entries() - - # will be set to true on on_message - for entry in entries: - if entry.no_mdns: - PING_RESULT[entry.filename] = False - - client.publish("esphome/discover", None, retain=False) - MQTT_PING_REQUEST.wait() - MQTT_PING_REQUEST.clear() - - client.disconnect() - client.loop_stop() - - class PingRequestHandler(BaseHandler): @authenticated def get(self): - PING_REQUEST.async_set() + dashboard = DASHBOARD + dashboard.ping_request.set() if settings.status_use_mqtt: - MQTT_PING_REQUEST.set() + dashboard.mqtt_ping_request.set() self.set_header("content-type", "application/json") - self.write(json.dumps(PING_RESULT)) + self.write(json.dumps(dashboard.ping_result)) class InfoRequestHandler(BaseHandler): @@ -1224,7 +788,7 @@ class DeleteRequestHandler(BaseHandler): shutil.rmtree(build_folder, os.path.join(trash_path, name)) # Remove the old ping result from the cache - PING_RESULT.pop(configuration, None) + DASHBOARD.ping_result.pop(configuration, None) class UndoDeleteRequestHandler(BaseHandler): @@ -1236,28 +800,6 @@ class UndoDeleteRequestHandler(BaseHandler): shutil.move(os.path.join(trash_path, configuration), config_file) -class MDNSContainer: - def __init__(self) -> None: - """Initialize the MDNSContainer.""" - self._mdns: MDNSStatus | None = None - - def set_mdns(self, mdns: MDNSStatus) -> None: - """Set the MDNSStatus instance.""" - self._mdns = mdns - - def get_mdns(self) -> MDNSStatus | None: - """Return the MDNSStatus instance.""" - return self._mdns - - -PING_RESULT: dict = {} -IMPORT_RESULT = {} -STOP_EVENT = threading.Event() -PING_REQUEST = AsyncEvent() -MQTT_PING_REQUEST = threading.Event() -MDNS_CONTAINER = MDNSContainer() - - class LoginHandler(BaseHandler): def get(self): if is_authenticated(self): @@ -1519,14 +1061,14 @@ def start_web_server(args): settings.cookie_secret = storage.cookie_secret try: - asyncio.run(async_start_web_server(args)) + asyncio.run(async_start(args)) except KeyboardInterrupt: pass -async def async_start_web_server(args): - loop = asyncio.get_event_loop() - PING_REQUEST.async_setup(loop, asyncio.Event()) +async def async_start(args) -> None: + dashboard = DASHBOARD + await dashboard.async_setup() app = make_app(args.verbose) if args.socket is not None: @@ -1552,36 +1094,8 @@ async def async_start_web_server(args): webbrowser.open(f"http://{args.address}:{args.port}") - mdns_task: asyncio.Task | None = None - ping_status_task: asyncio.Task | None = None - if settings.status_use_ping: - ping_status = PingStatus() - ping_status_task = asyncio.create_task(ping_status.async_run()) - else: - mdns_status = MDNSStatus() - await mdns_status.async_refresh_hosts() - MDNS_CONTAINER.set_mdns(mdns_status) - mdns_task = asyncio.create_task(mdns_status.async_run()) - - if settings.status_use_mqtt: - status_thread_mqtt = MqttStatusThread() - status_thread_mqtt.start() - - shutdown_event = asyncio.Event() try: - await shutdown_event.wait() + await dashboard.async_run() finally: - _LOGGER.info("Shutting down...") - STOP_EVENT.set() - PING_REQUEST.async_set() - if ping_status_task: - ping_status_task.cancel() - MDNS_CONTAINER.set_mdns(None) - if mdns_task: - mdns_task.cancel() - if settings.status_use_mqtt: - status_thread_mqtt.join() - MQTT_PING_REQUEST.set() if args.socket is not None: os.remove(args.socket) - await asyncio.sleep(0) diff --git a/esphome/dashboard/entries.py b/esphome/dashboard/entries.py new file mode 100644 index 0000000000..582073d655 --- /dev/null +++ b/esphome/dashboard/entries.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +import os + +from esphome import const +from esphome.storage_json import StorageJSON, ext_storage_path + + +class DashboardEntry: + """Represents a single dashboard entry. + + This class is thread-safe and read-only. + """ + + __slots__ = ("path", "_storage", "_loaded_storage") + + def __init__(self, path: str) -> None: + """Initialize the DashboardEntry.""" + self.path = path + self._storage = None + self._loaded_storage = False + + def __repr__(self): + """Return the representation of this entry.""" + return ( + f"DashboardEntry({self.path} " + f"address={self.address} " + f"web_port={self.web_port} " + f"name={self.name} " + f"no_mdns={self.no_mdns})" + ) + + @property + def filename(self): + """Return the filename of this entry.""" + return os.path.basename(self.path) + + @property + def storage(self) -> StorageJSON | None: + """Return the StorageJSON object for this entry.""" + if not self._loaded_storage: + self._storage = StorageJSON.load(ext_storage_path(self.filename)) + self._loaded_storage = True + return self._storage + + @property + def address(self): + """Return the address of this entry.""" + if self.storage is None: + return None + return self.storage.address + + @property + def no_mdns(self): + """Return the no_mdns of this entry.""" + if self.storage is None: + return None + return self.storage.no_mdns + + @property + def web_port(self): + """Return the web port of this entry.""" + if self.storage is None: + return None + return self.storage.web_port + + @property + def name(self): + """Return the name of this entry.""" + if self.storage is None: + return self.filename.replace(".yml", "").replace(".yaml", "") + return self.storage.name + + @property + def friendly_name(self): + """Return the friendly name of this entry.""" + if self.storage is None: + return self.name + return self.storage.friendly_name + + @property + def comment(self): + """Return the comment of this entry.""" + if self.storage is None: + return None + return self.storage.comment + + @property + def target_platform(self): + """Return the target platform of this entry.""" + if self.storage is None: + return None + return self.storage.target_platform + + @property + def update_available(self): + """Return if an update is available for this entry.""" + if self.storage is None: + return True + return self.update_old != self.update_new + + @property + def update_old(self): + if self.storage is None: + return "" + return self.storage.esphome_version or "" + + @property + def update_new(self): + return const.__version__ + + @property + def loaded_integrations(self): + if self.storage is None: + return [] + return self.storage.loaded_integrations diff --git a/esphome/dashboard/settings.py b/esphome/dashboard/settings.py new file mode 100644 index 0000000000..3409938e0a --- /dev/null +++ b/esphome/dashboard/settings.py @@ -0,0 +1,146 @@ +from __future__ import annotations + +import hmac +import os +from pathlib import Path + +from esphome import util +from esphome.core import CORE +from esphome.helpers import get_bool_env +from esphome.storage_json import ext_storage_path + +from .entries import DashboardEntry +from .util import password_hash + + +class DashboardSettings: + """Settings for the dashboard.""" + + def __init__(self): + self.config_dir = "" + self.password_hash = "" + self.username = "" + self.using_password = False + self.on_ha_addon = False + self.cookie_secret = None + self.absolute_config_dir = None + self._entry_cache: dict[ + str, tuple[tuple[int, int, float, int], DashboardEntry] + ] = {} + + def parse_args(self, args): + self.on_ha_addon = args.ha_addon + password = args.password or os.getenv("PASSWORD", "") + if not self.on_ha_addon: + self.username = args.username or os.getenv("USERNAME", "") + self.using_password = bool(password) + if self.using_password: + self.password_hash = password_hash(password) + self.config_dir = args.configuration + self.absolute_config_dir = Path(self.config_dir).resolve() + CORE.config_path = os.path.join(self.config_dir, ".") + + @property + def relative_url(self): + return os.getenv("ESPHOME_DASHBOARD_RELATIVE_URL", "/") + + @property + def status_use_ping(self): + return get_bool_env("ESPHOME_DASHBOARD_USE_PING") + + @property + def status_use_mqtt(self): + return get_bool_env("ESPHOME_DASHBOARD_USE_MQTT") + + @property + def using_ha_addon_auth(self): + if not self.on_ha_addon: + return False + return not get_bool_env("DISABLE_HA_AUTHENTICATION") + + @property + def using_auth(self): + return self.using_password or self.using_ha_addon_auth + + @property + def streamer_mode(self): + return get_bool_env("ESPHOME_STREAMER_MODE") + + def check_password(self, username, password): + if not self.using_auth: + return True + if username != self.username: + return False + + # Compare password in constant running time (to prevent timing attacks) + return hmac.compare_digest(self.password_hash, password_hash(password)) + + def rel_path(self, *args): + joined_path = os.path.join(self.config_dir, *args) + # Raises ValueError if not relative to ESPHome config folder + Path(joined_path).resolve().relative_to(self.absolute_config_dir) + return joined_path + + def list_yaml_files(self) -> list[str]: + return util.list_yaml_files([self.config_dir]) + + def entries(self) -> list[DashboardEntry]: + """Fetch all dashboard entries, thread-safe.""" + path_to_cache_key: dict[str, tuple[int, int, float, int]] = {} + # + # The cache key is (inode, device, mtime, size) + # which allows us to avoid locking since it ensures + # every iteration of this call will always return the newest + # items from disk at the cost of a stat() call on each + # file which is much faster than reading the file + # for the cache hit case which is the common case. + # + # Because there is no lock the cache may + # get built more than once but that's fine as its still + # thread-safe and results in orders of magnitude less + # reads from disk than if we did not cache at all and + # does not have a lock contention issue. + # + for file in self.list_yaml_files(): + try: + # Prefer the json storage path if it exists + stat = os.stat(ext_storage_path(os.path.basename(file))) + except OSError: + try: + # Fallback to the yaml file if the storage + # file does not exist or could not be generated + stat = os.stat(file) + except OSError: + # File was deleted, ignore + continue + path_to_cache_key[file] = ( + stat.st_ino, + stat.st_dev, + stat.st_mtime, + stat.st_size, + ) + + entry_cache = self._entry_cache + + # Remove entries that no longer exist + removed: list[str] = [] + for file in entry_cache: + if file not in path_to_cache_key: + removed.append(file) + + for file in removed: + entry_cache.pop(file) + + dashboard_entries: list[DashboardEntry] = [] + for file, cache_key in path_to_cache_key.items(): + if cached_entry := entry_cache.get(file): + entry_key, dashboard_entry = cached_entry + if entry_key == cache_key: + dashboard_entries.append(dashboard_entry) + continue + + dashboard_entry = DashboardEntry(file) + dashboard_entries.append(dashboard_entry) + entry_cache[file] = (cache_key, dashboard_entry) + + return dashboard_entries diff --git a/esphome/dashboard/status/__init__.py b/esphome/dashboard/status/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/esphome/dashboard/status/mdns.py b/esphome/dashboard/status/mdns.py new file mode 100644 index 0000000000..454bba9cb5 --- /dev/null +++ b/esphome/dashboard/status/mdns.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import asyncio + +from esphome.zeroconf import ( + ESPHOME_SERVICE_TYPE, + AsyncEsphomeZeroconf, + DashboardBrowser, + DashboardImportDiscovery, + DashboardStatus, +) + +from ..core import DASHBOARD, list_dashboard_entries + + +class MDNSStatus: + """Class that updates the mdns status.""" + + def __init__(self) -> None: + """Initialize the MDNSStatus class.""" + super().__init__() + self.aiozc: AsyncEsphomeZeroconf | None = None + # This is the current mdns state for each host (True, False, None) + self.host_mdns_state: dict[str, bool | None] = {} + # This is the hostnames to filenames mapping + self.host_name_to_filename: dict[str, str] = {} + self.filename_to_host_name: dict[str, str] = {} + # This is a set of host names to track (i.e no_mdns = false) + self.host_name_with_mdns_enabled: set[set] = set() + self._loop = asyncio.get_running_loop() + + def filename_to_host_name_thread_safe(self, filename: str) -> str | None: + """Resolve a filename to an address in a thread-safe manner.""" + return self.filename_to_host_name.get(filename) + + async def async_resolve_host(self, host_name: str) -> str | None: + """Resolve a host name to an address in a thread-safe manner.""" + if aiozc := self.aiozc: + return await aiozc.async_resolve_host(host_name) + return None + + async def async_refresh_hosts(self): + """Refresh the hosts to track.""" + entries = await self._loop.run_in_executor(None, list_dashboard_entries) + host_name_with_mdns_enabled = self.host_name_with_mdns_enabled + host_mdns_state = self.host_mdns_state + host_name_to_filename = self.host_name_to_filename + filename_to_host_name = self.filename_to_host_name + ping_result = DASHBOARD.ping_result + + for entry in entries: + name = entry.name + # If no_mdns is set, remove it from the set + if entry.no_mdns: + host_name_with_mdns_enabled.discard(name) + continue + + # We are tracking this host + host_name_with_mdns_enabled.add(name) + filename = entry.filename + + # If we just adopted/imported this host, we likely + # already have a state for it, so we should make sure + # to set it so the dashboard shows it as online + if name in host_mdns_state: + ping_result[filename] = host_mdns_state[name] + + # Make sure the mapping is up to date + # so when we get an mdns update we can map it back + # to the filename + host_name_to_filename[name] = filename + filename_to_host_name[filename] = name + + async def async_run(self) -> None: + dashboard = DASHBOARD + + aiozc = AsyncEsphomeZeroconf() + self.aiozc = aiozc + host_mdns_state = self.host_mdns_state + host_name_to_filename = self.host_name_to_filename + host_name_with_mdns_enabled = self.host_name_with_mdns_enabled + ping_result = dashboard.ping_result + + def on_update(dat: dict[str, bool | None]) -> None: + """Update the global PING_RESULT dict.""" + for name, result in dat.items(): + host_mdns_state[name] = result + if name in host_name_with_mdns_enabled: + filename = host_name_to_filename[name] + ping_result[filename] = result + + stat = DashboardStatus(on_update) + imports = DashboardImportDiscovery() + dashboard.import_result = imports.import_state + + browser = DashboardBrowser( + aiozc.zeroconf, + ESPHOME_SERVICE_TYPE, + [stat.browser_callback, imports.browser_callback], + ) + + while not dashboard.stop_event.is_set(): + await self.async_refresh_hosts() + await dashboard.ping_request.wait() + dashboard.ping_request.clear() + + await browser.async_cancel() + await aiozc.async_close() + self.aiozc = None diff --git a/esphome/dashboard/status/mqtt.py b/esphome/dashboard/status/mqtt.py new file mode 100644 index 0000000000..109b60e133 --- /dev/null +++ b/esphome/dashboard/status/mqtt.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +import binascii +import json +import os +import threading + +from esphome import mqtt + +from ..core import DASHBOARD, list_dashboard_entries + + +class MqttStatusThread(threading.Thread): + """Status thread to get the status of the devices via MQTT.""" + + def run(self) -> None: + """Run the status thread.""" + dashboard = DASHBOARD + entries = list_dashboard_entries() + + config = mqtt.config_from_env() + topic = "esphome/discover/#" + + def on_message(client, userdata, msg): + nonlocal entries + + payload = msg.payload.decode(errors="backslashreplace") + if len(payload) > 0: + data = json.loads(payload) + if "name" not in data: + return + for entry in entries: + if entry.name == data["name"]: + dashboard.ping_result[entry.filename] = True + return + + def on_connect(client, userdata, flags, return_code): + client.publish("esphome/discover", None, retain=False) + + mqttid = str(binascii.hexlify(os.urandom(6)).decode()) + + client = mqtt.prepare( + config, + [topic], + on_message, + on_connect, + None, + None, + f"esphome-dashboard-{mqttid}", + ) + client.loop_start() + + while not dashboard.stop_event.wait(2): + # update entries + entries = list_dashboard_entries() + + # will be set to true on on_message + for entry in entries: + if entry.no_mdns: + dashboard.ping_result[entry.filename] = False + + client.publish("esphome/discover", None, retain=False) + dashboard.mqtt_ping_request.wait() + dashboard.mqtt_ping_request.clear() + + client.disconnect() + client.loop_stop() diff --git a/esphome/dashboard/status/ping.py b/esphome/dashboard/status/ping.py new file mode 100644 index 0000000000..17c1254c9d --- /dev/null +++ b/esphome/dashboard/status/ping.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +import asyncio +import os +from typing import cast + +from ..core import DASHBOARD +from ..entries import DashboardEntry +from ..core import list_dashboard_entries +from ..util import chunked + + +async def _async_ping_host(host: str) -> bool: + """Ping a host.""" + ping_command = ["ping", "-n" if os.name == "nt" else "-c", "1"] + process = await asyncio.create_subprocess_exec( + *ping_command, + host, + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + close_fds=False, + ) + await process.wait() + return process.returncode == 0 + + +class PingStatus: + def __init__(self) -> None: + """Initialize the PingStatus class.""" + super().__init__() + self._loop = asyncio.get_running_loop() + + async def async_run(self) -> None: + """Run the ping status.""" + dashboard = DASHBOARD + + while not dashboard.stop_event.is_set(): + # Only ping if the dashboard is open + await dashboard.ping_request.wait() + dashboard.ping_result.clear() + entries = await self._loop.run_in_executor(None, list_dashboard_entries) + to_ping: list[DashboardEntry] = [ + entry for entry in entries if entry.address is not None + ] + for ping_group in chunked(to_ping, 16): + ping_group = cast(list[DashboardEntry], ping_group) + results = await asyncio.gather( + *(_async_ping_host(entry.address) for entry in ping_group), + return_exceptions=True, + ) + for entry, result in zip(ping_group, results): + if isinstance(result, Exception): + result = False + elif isinstance(result, BaseException): + raise result + dashboard.ping_result[entry.filename] = result