add mcuboot support

This commit is contained in:
Tomasz Duda 2024-07-19 09:28:46 +02:00
parent c5b77f4590
commit 644fd263a3
4 changed files with 254 additions and 8 deletions

View file

@ -6,6 +6,7 @@ import os
import re
import sys
import time
import asyncio
from datetime import datetime
import argcomplete
@ -36,6 +37,7 @@ from esphome.const import (
PLATFORM_RP2040,
PLATFORM_RTL87XX,
SECRETS_FILES,
PLATFORM_NRF52,
)
from esphome.core import CORE, EsphomeError, coroutine
from esphome.helpers import indent, is_ip_address
@ -47,6 +49,13 @@ from esphome.util import (
get_serial_ports,
)
from esphome.log import color, setup_log, Fore
from .zephyr_tools import (
logger_scan,
logger_connect,
smpmgr_scan,
smpmgr_upload,
is_mac_address,
)
_LOGGER = logging.getLogger(__name__)
@ -86,19 +95,59 @@ def choose_prompt(options, purpose: str = None):
def choose_upload_log_host(
default, check_default, show_ota, show_mqtt, show_api, purpose: str = None
):
try:
mcuboot = CORE.config["nrf52"]["bootloader"] == "mcuboot"
except KeyError:
mcuboot = False
try:
ble_logger = CORE.config["zephyr_ble_nus"]["log"]
except KeyError:
ble_logger = False
ota = "ota" in CORE.config
options = []
prefix = ""
if mcuboot and show_ota and ota:
prefix = "mcumgr "
for port in get_serial_ports():
options.append((f"{port.path} ({port.description})", port.path))
options.append(
(f"{prefix}{port.path} ({port.description})", f"{prefix}{port.path}")
)
if default == "SERIAL":
return choose_prompt(options, purpose=purpose)
if (show_ota and "ota" in CORE.config) or (show_api and "api" in CORE.config):
if default == "PYOCD":
if not mcuboot:
raise EsphomeError("PYOCD for adafruit is not implemented")
options = [("pyocd", "PYOCD")]
return choose_prompt(options, purpose=purpose)
if not mcuboot:
if (show_ota and ota) or (show_api and "api" in CORE.config):
options.append((f"Over The Air ({CORE.address})", CORE.address))
if default == "OTA":
return CORE.address
elif show_ota and ota:
if default:
options.append((f"OTA over Bluetooth LE ({default})", f"mcumgr {default}"))
return choose_prompt(options, purpose=purpose)
ble_devices = asyncio.run(smpmgr_scan(CORE.config["esphome"]["name"]))
if len(ble_devices) == 0:
_LOGGER.warning("No OTA over Bluetooth LE service found!")
for device in ble_devices:
options.append(
(
f"OTA over Bluetooth LE({device.address}) {device.name}",
f"mcumgr {device.address}",
)
)
if show_mqtt and CONF_MQTT in CORE.config:
options.append((f"MQTT ({CORE.config['mqtt'][CONF_BROKER]})", "MQTT"))
if default == "OTA":
return "MQTT"
if "logging" == purpose and ble_logger and default is None:
ble_device = asyncio.run(logger_scan(CORE.config["esphome"]["name"]))
if ble_device:
options.append((f"Bluetooth LE logger ({ble_device})", ble_device.address))
else:
_LOGGER.warning("No logger over Bluetooth LE service found!")
if default is not None:
return default
if check_default is not None and check_default in [opt[1] for opt in options]:
@ -111,6 +160,8 @@ def get_port_type(port):
return "SERIAL"
if port == "MQTT":
return "MQTT"
if is_mac_address(port):
return "BLE"
return "NETWORK"
@ -289,9 +340,10 @@ def upload_using_esptool(config, port, file):
return run_esptool(115200)
def upload_using_platformio(config, port):
def upload_using_platformio(config, port, upload_args=None):
from esphome import platformio_api
if upload_args is None:
upload_args = ["-t", "upload", "-t", "nobuild"]
if port is not None:
upload_args += ["--upload-port", port]
@ -329,7 +381,19 @@ def upload_program(config, args, host):
if CORE.target_platform in (PLATFORM_BK72XX, PLATFORM_RTL87XX):
return upload_using_platformio(config, host)
return 1 # Unknown target platform
if CORE.target_platform in (PLATFORM_NRF52):
return upload_using_platformio(config, host, ["-t", "upload"])
raise EsphomeError(f"Unknown target platform: {CORE.target_platform}")
if host == "PYOCD":
print(CORE)
return upload_using_platformio(config, host, ["-t", "flash_pyocd"])
if host.startswith("mcumgr"):
firmware = os.path.abspath(
CORE.relative_pioenvs_path(CORE.name, "zephyr", "app_update.bin")
)
return asyncio.run(smpmgr_upload(config, host.split(" ")[1], firmware))
ota_conf = {}
for ota_item in config.get(CONF_OTA, []):
@ -389,6 +453,9 @@ def show_logs(config, args, port):
config, args.topic, args.username, args.password, args.client_id
)
if get_port_type(port) == "BLE":
return asyncio.run(logger_connect(port))
raise EsphomeError("No remote or local logging method configured (api/mqtt/logger)")

View file

@ -14,6 +14,7 @@ PLATFORM_HOST = "host"
PLATFORM_BK72XX = "bk72xx"
PLATFORM_RTL87XX = "rtl87xx"
PLATFORM_LIBRETINY_OLDSTYLE = "libretiny"
PLATFORM_NRF52 = "nrf52"
TARGET_PLATFORMS = [
PLATFORM_ESP32,
@ -23,6 +24,7 @@ TARGET_PLATFORMS = [
PLATFORM_BK72XX,
PLATFORM_RTL87XX,
PLATFORM_LIBRETINY_OLDSTYLE,
PLATFORM_NRF52,
]
SOURCE_FILE_EXTENSIONS = {".cpp", ".hpp", ".h", ".c", ".tcc", ".ino"}

173
esphome/zephyr_tools.py Normal file
View file

@ -0,0 +1,173 @@
import time
import asyncio
import logging
import re
from typing import Final
from rich.pretty import pprint
from bleak import BleakScanner, BleakClient
from bleak.exc import BleakDeviceNotFoundError, BleakDBusError
from smpclient.transport.ble import SMPBLETransport
from smpclient.transport import SMPTransportDisconnected
from smpclient.transport.serial import SMPSerialTransport
from smpclient import SMPClient
from smpclient.mcuboot import IMAGE_TLV, ImageInfo, TLVNotFound, MCUBootImageError
from smpclient.requests.image_management import ImageStatesRead, ImageStatesWrite
from smpclient.requests.os_management import ResetWrite
from smpclient.generics import error, success
from smp.exceptions import SMPBadStartDelimiter
from esphome.espota2 import ProgressBar
SMP_SERVICE_UUID = "8D53DC1D-1DB7-4CD3-868B-8A527460AA84"
NUS_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
NUS_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
MAC_ADDRESS_PATTERN: Final = re.compile(
r"([0-9A-F]{2}[:]){5}[0-9A-F]{2}$", flags=re.IGNORECASE
)
_LOGGER = logging.getLogger(__name__)
def is_mac_address(value):
return MAC_ADDRESS_PATTERN.match(value)
async def logger_scan(name):
_LOGGER.info("Scanning bluetooth for %s...", name)
device = await BleakScanner.find_device_by_name(name)
return device
async def logger_connect(host):
disconnected_event = asyncio.Event()
def handle_disconnect(client):
disconnected_event.set()
def handle_rx(_, data: bytearray):
print(data.decode("utf-8"), end="")
_LOGGER.info("Connecting %s...", host)
async with BleakClient(host, disconnected_callback=handle_disconnect) as client:
_LOGGER.info("Connected %s...", host)
try:
await client.start_notify(NUS_TX_CHAR_UUID, handle_rx)
except BleakDBusError as e:
_LOGGER.error("Bluetooth LE logger: %s", e)
disconnected_event.set()
await disconnected_event.wait()
async def smpmgr_scan(name):
_LOGGER.info("Scanning bluetooth for %s...", name)
devices = []
for device in await BleakScanner.discover(service_uuids=[SMP_SERVICE_UUID]):
if device.name == name:
devices += [device]
return devices
def get_image_tlv_sha256(file):
_LOGGER.info("Checking image: %s", str(file))
try:
image_info = ImageInfo.load_file(str(file))
pprint(image_info.header)
_LOGGER.debug(str(image_info))
except MCUBootImageError as e:
_LOGGER.error("Inspection of FW image failed: %s", e)
return None
try:
image_tlv_sha256 = image_info.get_tlv(IMAGE_TLV.SHA256)
_LOGGER.debug("IMAGE_TLV_SHA256: %s", image_tlv_sha256)
except TLVNotFound:
_LOGGER.error("Could not find IMAGE_TLV_SHA256 in image.")
return None
return image_tlv_sha256.value
async def smpmgr_upload(config, host, firmware):
try:
return await smpmgr_upload_(config, host, firmware)
except SMPTransportDisconnected:
_LOGGER.error("%s was disconnected.", host)
return 1
async def smpmgr_upload_(config, host, firmware):
image_tlv_sha256 = get_image_tlv_sha256(firmware)
if image_tlv_sha256 is None:
return 1
if is_mac_address(host):
smp_client = SMPClient(SMPBLETransport(), host)
else:
smp_client = SMPClient(SMPSerialTransport(), host)
_LOGGER.info("Connecting %s...", host)
try:
await smp_client.connect()
except BleakDeviceNotFoundError:
_LOGGER.error("Device %s not found", host)
return 1
_LOGGER.info("Connected %s...", host)
try:
image_state = await smp_client.request(ImageStatesRead(), 2.5)
except SMPBadStartDelimiter as e:
_LOGGER.error("mcumgr is not supported by device (%s)", e)
return 1
already_uploaded = False
if error(image_state):
_LOGGER.error(image_state)
return 1
if success(image_state):
if len(image_state.images) == 0:
_LOGGER.warning("No images on device!")
for image in image_state.images:
pprint(image)
if image.active and not image.confirmed:
_LOGGER.error("No free slot")
return 1
if image.hash == image_tlv_sha256:
if already_uploaded:
_LOGGER.error("Both slots have the same image")
return 1
if image.confirmed:
_LOGGER.error("Image already confirmted")
return 1
_LOGGER.warning("The same image already uploaded")
already_uploaded = True
if not already_uploaded:
with open(firmware, "rb") as file:
image = file.read()
file.close()
upload_size = len(image)
progress = ProgressBar()
progress.update(0)
try:
async for offset in smp_client.upload(image):
progress.update(offset / upload_size)
finally:
progress.done()
_LOGGER.info("Mark image for testing")
r = await smp_client.request(ImageStatesWrite(hash=image_tlv_sha256), 1.0)
if error(r):
_LOGGER.error(r)
return 1
# give a chance to execute completion callback
time.sleep(1)
_LOGGER.info("Reset")
r = await smp_client.request(ResetWrite(), 1.0)
if error(r):
_LOGGER.error(r)
return 1
return 0

View file

@ -27,3 +27,7 @@ pyparsing >= 3.0
# For autocompletion
argcomplete>=2.0.0
# for mcumgr
rich==13.7.0
smpclient==3.2.0