diff --git a/esphome/config.py b/esphome/config.py index f5a1ebb8d7..c5764dd4f2 100644 --- a/esphome/config.py +++ b/esphome/config.py @@ -1,10 +1,11 @@ +from __future__ import annotations import abc import functools import heapq import logging import re -from typing import Optional, Union +from typing import Union, Any from contextlib import contextmanager import contextvars @@ -76,7 +77,7 @@ def _path_begins_with(path: ConfigPath, other: ConfigPath) -> bool: @functools.total_ordering class _ValidationStepTask: - def __init__(self, priority: float, id_number: int, step: "ConfigValidationStep"): + def __init__(self, priority: float, id_number: int, step: ConfigValidationStep): self.priority = priority self.id_number = id_number self.step = step @@ -130,7 +131,7 @@ class Config(OrderedDict, fv.FinalValidateConfig): ) self.errors.append(error) - def add_validation_step(self, step: "ConfigValidationStep"): + def add_validation_step(self, step: ConfigValidationStep): id_num = self._validation_tasks_id self._validation_tasks_id += 1 heapq.heappush( @@ -172,7 +173,7 @@ class Config(OrderedDict, fv.FinalValidateConfig): conf = conf[key] conf[path[-1]] = value - def get_error_for_path(self, path: ConfigPath) -> Optional[vol.Invalid]: + def get_error_for_path(self, path: ConfigPath) -> vol.Invalid | None: for err in self.errors: if self.get_deepest_path(err.path) == path: self.errors.remove(err) @@ -181,7 +182,7 @@ class Config(OrderedDict, fv.FinalValidateConfig): def get_deepest_document_range_for_path( self, path: ConfigPath, get_key: bool = False - ) -> Optional[ESPHomeDataBase]: + ) -> ESPHomeDataBase | None: data = self doc_range = None for index, path_item in enumerate(path): @@ -733,7 +734,9 @@ class PinUseValidationCheck(ConfigValidationStep): pins.PIN_SCHEMA_REGISTRY.final_validate(result) -def validate_config(config, command_line_substitutions) -> Config: +def validate_config( + config: dict[str, Any], command_line_substitutions: dict[str, Any] +) -> Config: result = Config() loader.clear_component_meta_finders() @@ -897,24 +900,23 @@ class InvalidYAMLError(EsphomeError): self.base_exc = base_exc -def _load_config(command_line_substitutions): +def _load_config(command_line_substitutions: dict[str, Any]) -> Config: + """Load the configuration file.""" try: config = yaml_util.load_yaml(CORE.config_path) except EsphomeError as e: raise InvalidYAMLError(e) from e try: - result = validate_config(config, command_line_substitutions) + return validate_config(config, command_line_substitutions) except EsphomeError: raise except Exception: _LOGGER.error("Unexpected exception while reading configuration:") raise - return result - -def load_config(command_line_substitutions): +def load_config(command_line_substitutions: dict[str, Any]) -> Config: try: return _load_config(command_line_substitutions) except vol.Invalid as err: diff --git a/esphome/config_helpers.py b/esphome/config_helpers.py index ac52c6ede2..7b47e097c8 100644 --- a/esphome/config_helpers.py +++ b/esphome/config_helpers.py @@ -1,9 +1,4 @@ -import json -import os - from esphome.const import CONF_ID -from esphome.core import CORE -from esphome.helpers import read_file class Extend: @@ -38,25 +33,6 @@ class Remove: return isinstance(b, Remove) and self.value == b.value -def read_config_file(path: str) -> str: - if CORE.vscode and ( - not CORE.ace or os.path.abspath(path) == os.path.abspath(CORE.config_path) - ): - print( - json.dumps( - { - "type": "read_file", - "path": path, - } - ) - ) - data = json.loads(input()) - assert data["type"] == "file_response" - return data["content"] - - return read_file(path) - - def merge_config(full_old, full_new): def merge(old, new): if isinstance(new, dict): diff --git a/esphome/vscode.py b/esphome/vscode.py index cb2f51976f..8198d2659a 100644 --- a/esphome/vscode.py +++ b/esphome/vscode.py @@ -1,20 +1,22 @@ +from __future__ import annotations import json import os +from io import StringIO +from typing import Any -from typing import Optional - -from esphome.config import load_config, _format_vol_invalid, Config +from esphome.yaml_util import parse_yaml +from esphome.config import validate_config, _format_vol_invalid, Config from esphome.core import CORE, DocumentRange import esphome.config_validation as cv -def _get_invalid_range(res: Config, invalid: cv.Invalid) -> Optional[DocumentRange]: +def _get_invalid_range(res: Config, invalid: cv.Invalid) -> DocumentRange | None: return res.get_deepest_document_range_for_path( invalid.path, invalid.error_message == "extra keys not allowed" ) -def _dump_range(range: Optional[DocumentRange]) -> Optional[dict]: +def _dump_range(range: DocumentRange | None) -> dict | None: if range is None: return None return { @@ -56,6 +58,25 @@ class VSCodeResult: ) +def _read_file_content_from_json_on_stdin() -> str: + """Read the content of a json encoded file from stdin.""" + data = json.loads(input()) + assert data["type"] == "file_response" + return data["content"] + + +def _print_file_read_event(path: str) -> None: + """Print a file read event.""" + print( + json.dumps( + { + "type": "read_file", + "path": path, + } + ) + ) + + def read_config(args): while True: CORE.reset() @@ -68,9 +89,17 @@ def read_config(args): CORE.config_path = os.path.join(args.configuration, f) else: CORE.config_path = data["file"] + + file_name = CORE.config_path + _print_file_read_event(file_name) + raw_yaml = _read_file_content_from_json_on_stdin() + command_line_substitutions: dict[str, Any] = ( + dict(args.substitution) if args.substitution else {} + ) vs = VSCodeResult() try: - res = load_config(dict(args.substitution) if args.substitution else {}) + config = parse_yaml(file_name, StringIO(raw_yaml)) + res = validate_config(config, command_line_substitutions) except Exception as err: # pylint: disable=broad-except vs.add_yaml_error(str(err)) else: diff --git a/esphome/yaml_util.py b/esphome/yaml_util.py index 60705082b6..c7aa78201f 100644 --- a/esphome/yaml_util.py +++ b/esphome/yaml_util.py @@ -417,20 +417,25 @@ def load_yaml(fname: str, clear_secrets: bool = True) -> Any: return _load_yaml_internal(fname) +def parse_yaml(file_name: str, file_handle: TextIOWrapper) -> Any: + """Parse a YAML file.""" + try: + return _load_yaml_internal_with_type(ESPHomeLoader, file_name, file_handle) + except EsphomeError: + # Loading failed, so we now load with the Python loader which has more + # readable exceptions + # Rewind the stream so we can try again + file_handle.seek(0, 0) + return _load_yaml_internal_with_type( + ESPHomePurePythonLoader, file_name, file_handle + ) + + def _load_yaml_internal(fname: str) -> Any: """Load a YAML file.""" try: with open(fname, encoding="utf-8") as f_handle: - try: - return _load_yaml_internal_with_type(ESPHomeLoader, fname, f_handle) - except EsphomeError: - # Loading failed, so we now load with the Python loader which has more - # readable exceptions - # Rewind the stream so we can try again - f_handle.seek(0, 0) - return _load_yaml_internal_with_type( - ESPHomePurePythonLoader, fname, f_handle - ) + return parse_yaml(fname, f_handle) except (UnicodeDecodeError, OSError) as err: raise EsphomeError(f"Error reading file {fname}: {err}") from err