diff --git a/esphome/dashboard/web_server.py b/esphome/dashboard/web_server.py index 9bbf0b28dc..4552aebf7b 100644 --- a/esphome/dashboard/web_server.py +++ b/esphome/dashboard/web_server.py @@ -13,9 +13,9 @@ import secrets import shutil import subprocess import threading -from pathlib import Path -from typing import Any, Callable, TypeVar from collections.abc import Iterable +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable, TypeVar import tornado import tornado.concurrent @@ -45,12 +45,17 @@ from .util.file import write_file from .util.subprocess import async_run_system_command from .util.text import friendly_name_slugify +if TYPE_CHECKING: + from requests import Response + + _LOGGER = logging.getLogger(__name__) ENV_DEV = "ESPHOME_DASHBOARD_DEV" +COOKIE_AUTHENTICATED_YES = b"yes" -cookie_authenticated_yes = b"yes" +AUTH_COOKIE_NAME = "authenticated" settings = DASHBOARD.settings @@ -89,18 +94,18 @@ def authenticated(func: T) -> T: return decorator -def is_authenticated(request_handler): +def is_authenticated(handler: BaseHandler) -> bool: + """Check if the request is authenticated.""" if settings.on_ha_addon: # Handle ingress - disable auth on ingress port # X-HA-Ingress is automatically stripped on the non-ingress server in nginx - header = request_handler.request.headers.get("X-HA-Ingress", "NO") + header = handler.request.headers.get("X-HA-Ingress", "NO") if str(header) == "YES": return True + if settings.using_auth: - return ( - request_handler.get_secure_cookie("authenticated") - == cookie_authenticated_yes - ) + return handler.get_secure_cookie(AUTH_COOKIE_NAME) == COOKIE_AUTHENTICATED_YES + return True @@ -860,38 +865,45 @@ class LoginHandler(BaseHandler): **template_args(), ) - def post_ha_addon_login(self) -> None: + def _make_supervisor_auth_request(self) -> Response: + """Make a request to the supervisor auth endpoint.""" import requests - headers = { - "X-Supervisor-Token": os.getenv("SUPERVISOR_TOKEN"), - } - + headers = {"X-Supervisor-Token": os.getenv("SUPERVISOR_TOKEN")} data = { "username": self.get_argument("username", ""), "password": self.get_argument("password", ""), } + return requests.post( + "http://supervisor/auth", headers=headers, json=data, timeout=30 + ) + + async def post_ha_addon_login(self) -> None: + loop = asyncio.get_running_loop() try: - req = requests.post( - "http://supervisor/auth", headers=headers, json=data, timeout=30 - ) - if req.status_code == 200: - self.set_secure_cookie("authenticated", cookie_authenticated_yes) - self.redirect("/") - return + req = await loop.run_in_executor(None, self._make_supervisor_auth_request) except Exception as err: # pylint: disable=broad-except _LOGGER.warning("Error during Hass.io auth request: %s", err) self.set_status(500) self.render_login_page(error="Internal server error") return + + if req.status_code == 200: + self._set_authenticated() + self.redirect("/") + return self.set_status(401) self.render_login_page(error="Invalid username or password") + def _set_authenticated(self) -> None: + """Set the authenticated cookie.""" + self.set_secure_cookie(AUTH_COOKIE_NAME, COOKIE_AUTHENTICATED_YES) + def post_native_login(self) -> None: username = self.get_argument("username", "") password = self.get_argument("password", "") if settings.check_password(username, password): - self.set_secure_cookie("authenticated", cookie_authenticated_yes) + self._set_authenticated() self.redirect("./") return error_str = ( @@ -900,9 +912,9 @@ class LoginHandler(BaseHandler): self.set_status(401) self.render_login_page(error=error_str) - def post(self) -> None: + async def post(self): if settings.using_ha_addon_auth: - self.post_ha_addon_login() + await self.post_ha_addon_login() else: self.post_native_login() @@ -910,7 +922,7 @@ class LoginHandler(BaseHandler): class LogoutHandler(BaseHandler): @authenticated def get(self) -> None: - self.clear_cookie("authenticated") + self.clear_cookie(AUTH_COOKIE_NAME) self.redirect("./login")