diff --git a/esphome/__main__.py b/esphome/__main__.py index e1ed2240af..bee5a73542 100644 --- a/esphome/__main__.py +++ b/esphome/__main__.py @@ -6,7 +6,6 @@ import os import re import sys import time -import asyncio from datetime import datetime import argcomplete @@ -49,13 +48,6 @@ from esphome.util import ( get_serial_ports, ) from esphome.log import color, setup_log, Fore -from .zephyr_tools import ( - logger_scan, - logger_connect, - smpmgr_scan, - smpmgr_upload, - is_mac_address, -) _LOGGER = logging.getLogger(__name__) @@ -95,59 +87,19 @@ def choose_prompt(options, purpose: str = None): def choose_upload_log_host( default, check_default, show_ota, show_mqtt, show_api, purpose: str = None ): - try: - mcuboot = CORE.config["nrf52"]["bootloader"] == "mcuboot" - except KeyError: - mcuboot = False - try: - ble_logger = CORE.config["zephyr_ble_nus"]["log"] - except KeyError: - ble_logger = False - ota = "ota" in CORE.config options = [] - prefix = "" - if mcuboot and show_ota and ota: - prefix = "mcumgr " for port in get_serial_ports(): - options.append( - (f"{prefix}{port.path} ({port.description})", f"{prefix}{port.path}") - ) + options.append((f"{port.path} ({port.description})", port.path)) if default == "SERIAL": return choose_prompt(options, purpose=purpose) - if default == "PYOCD": - if not mcuboot: - raise EsphomeError("PYOCD for adafruit is not implemented") - options = [("pyocd", "PYOCD")] - return choose_prompt(options, purpose=purpose) - if not mcuboot: - if (show_ota and ota) or (show_api and "api" in CORE.config): - options.append((f"Over The Air ({CORE.address})", CORE.address)) - if default == "OTA": - return CORE.address - elif show_ota and ota: - if default: - options.append((f"OTA over Bluetooth LE ({default})", f"mcumgr {default}")) - return choose_prompt(options, purpose=purpose) - ble_devices = asyncio.run(smpmgr_scan(CORE.config["esphome"]["name"])) - if len(ble_devices) == 0: - _LOGGER.warning("No OTA over Bluetooth LE service found!") - for device in ble_devices: - options.append( - ( - f"OTA over Bluetooth LE({device.address}) {device.name}", - f"mcumgr {device.address}", - ) - ) + if (show_ota and "ota" in CORE.config) or (show_api and "api" in CORE.config): + options.append((f"Over The Air ({CORE.address})", CORE.address)) + if default == "OTA": + return CORE.address if show_mqtt and CONF_MQTT in CORE.config: options.append((f"MQTT ({CORE.config['mqtt'][CONF_BROKER]})", "MQTT")) if default == "OTA": return "MQTT" - if "logging" == purpose and ble_logger and default is None: - ble_device = asyncio.run(logger_scan(CORE.config["esphome"]["name"])) - if ble_device: - options.append((f"Bluetooth LE logger ({ble_device})", ble_device.address)) - else: - _LOGGER.warning("No logger over Bluetooth LE service found!") if default is not None: return default if check_default is not None and check_default in [opt[1] for opt in options]: @@ -160,8 +112,6 @@ def get_port_type(port): return "SERIAL" if port == "MQTT": return "MQTT" - if is_mac_address(port): - return "BLE" return "NETWORK" @@ -388,11 +338,6 @@ def upload_program(config, args, host): if host == "PYOCD": return upload_using_platformio(config, host, ["-t", "flash_pyocd"]) - if host.startswith("mcumgr"): - firmware = os.path.abspath( - CORE.relative_pioenvs_path(CORE.name, "zephyr", "app_update.bin") - ) - return asyncio.run(smpmgr_upload(config, host.split(" ")[1], firmware)) ota_conf = {} for ota_item in config.get(CONF_OTA, []): @@ -452,9 +397,6 @@ def show_logs(config, args, port): config, args.topic, args.username, args.password, args.client_id ) - if get_port_type(port) == "BLE": - return asyncio.run(logger_connect(port)) - raise EsphomeError("No remote or local logging method configured (api/mqtt/logger)") diff --git a/esphome/zephyr_tools.py b/esphome/zephyr_tools.py deleted file mode 100644 index d32418e403..0000000000 --- a/esphome/zephyr_tools.py +++ /dev/null @@ -1,173 +0,0 @@ -import time -import asyncio -import logging -import re -from typing import Final -from rich.pretty import pprint -from bleak import BleakScanner, BleakClient -from bleak.exc import BleakDeviceNotFoundError, BleakDBusError -from smpclient.transport.ble import SMPBLETransport -from smpclient.transport import SMPTransportDisconnected -from smpclient.transport.serial import SMPSerialTransport -from smpclient import SMPClient -from smpclient.mcuboot import IMAGE_TLV, ImageInfo, TLVNotFound, MCUBootImageError -from smpclient.requests.image_management import ImageStatesRead, ImageStatesWrite -from smpclient.requests.os_management import ResetWrite -from smpclient.generics import error, success -from smp.exceptions import SMPBadStartDelimiter -from esphome.espota2 import ProgressBar - -SMP_SERVICE_UUID = "8D53DC1D-1DB7-4CD3-868B-8A527460AA84" -NUS_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" -NUS_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" -MAC_ADDRESS_PATTERN: Final = re.compile( - r"([0-9A-F]{2}[:]){5}[0-9A-F]{2}$", flags=re.IGNORECASE -) - -_LOGGER = logging.getLogger(__name__) - - -def is_mac_address(value): - return MAC_ADDRESS_PATTERN.match(value) - - -async def logger_scan(name): - _LOGGER.info("Scanning bluetooth for %s...", name) - device = await BleakScanner.find_device_by_name(name) - return device - - -async def logger_connect(host): - disconnected_event = asyncio.Event() - - def handle_disconnect(client): - disconnected_event.set() - - def handle_rx(_, data: bytearray): - print(data.decode("utf-8"), end="") - - _LOGGER.info("Connecting %s...", host) - async with BleakClient(host, disconnected_callback=handle_disconnect) as client: - _LOGGER.info("Connected %s...", host) - try: - await client.start_notify(NUS_TX_CHAR_UUID, handle_rx) - except BleakDBusError as e: - _LOGGER.error("Bluetooth LE logger: %s", e) - disconnected_event.set() - await disconnected_event.wait() - - -async def smpmgr_scan(name): - _LOGGER.info("Scanning bluetooth for %s...", name) - devices = [] - for device in await BleakScanner.discover(service_uuids=[SMP_SERVICE_UUID]): - if device.name == name: - devices += [device] - return devices - - -def get_image_tlv_sha256(file): - _LOGGER.info("Checking image: %s", str(file)) - try: - image_info = ImageInfo.load_file(str(file)) - pprint(image_info.header) - _LOGGER.debug(str(image_info)) - except MCUBootImageError as e: - _LOGGER.error("Inspection of FW image failed: %s", e) - return None - - try: - image_tlv_sha256 = image_info.get_tlv(IMAGE_TLV.SHA256) - _LOGGER.debug("IMAGE_TLV_SHA256: %s", image_tlv_sha256) - except TLVNotFound: - _LOGGER.error("Could not find IMAGE_TLV_SHA256 in image.") - return None - return image_tlv_sha256.value - - -async def smpmgr_upload(config, host, firmware): - try: - return await smpmgr_upload_(config, host, firmware) - except SMPTransportDisconnected: - _LOGGER.error("%s was disconnected.", host) - return 1 - - -async def smpmgr_upload_(config, host, firmware): - image_tlv_sha256 = get_image_tlv_sha256(firmware) - if image_tlv_sha256 is None: - return 1 - - if is_mac_address(host): - smp_client = SMPClient(SMPBLETransport(), host) - else: - smp_client = SMPClient(SMPSerialTransport(), host) - - _LOGGER.info("Connecting %s...", host) - try: - await smp_client.connect() - except BleakDeviceNotFoundError: - _LOGGER.error("Device %s not found", host) - return 1 - - _LOGGER.info("Connected %s...", host) - - try: - image_state = await smp_client.request(ImageStatesRead(), 2.5) - except SMPBadStartDelimiter as e: - _LOGGER.error("mcumgr is not supported by device (%s)", e) - return 1 - - already_uploaded = False - - if error(image_state): - _LOGGER.error(image_state) - return 1 - if success(image_state): - if len(image_state.images) == 0: - _LOGGER.warning("No images on device!") - for image in image_state.images: - pprint(image) - if image.active and not image.confirmed: - _LOGGER.error("No free slot") - return 1 - if image.hash == image_tlv_sha256: - if already_uploaded: - _LOGGER.error("Both slots have the same image") - return 1 - if image.confirmed: - _LOGGER.error("Image already confirmted") - return 1 - _LOGGER.warning("The same image already uploaded") - already_uploaded = True - - if not already_uploaded: - with open(firmware, "rb") as file: - image = file.read() - file.close() - upload_size = len(image) - progress = ProgressBar() - progress.update(0) - try: - async for offset in smp_client.upload(image): - progress.update(offset / upload_size) - finally: - progress.done() - - _LOGGER.info("Mark image for testing") - r = await smp_client.request(ImageStatesWrite(hash=image_tlv_sha256), 1.0) - - if error(r): - _LOGGER.error(r) - return 1 - - # give a chance to execute completion callback - time.sleep(1) - _LOGGER.info("Reset") - r = await smp_client.request(ResetWrite(), 1.0) - - if error(r): - _LOGGER.error(r) - return 1 - - return 0 diff --git a/requirements.txt b/requirements.txt index b860fc3104..0cbe5e7265 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,7 +27,3 @@ pyparsing >= 3.0 # For autocompletion argcomplete>=2.0.0 - -# for mcumgr -rich==13.7.0 -smpclient==3.2.0