nrf52 core based on zephyr

This commit is contained in:
Tomasz Duda 2024-07-06 18:33:15 +02:00
parent 4c6a17e304
commit 2ad8d7d1c5
28 changed files with 1215 additions and 9 deletions

View file

@ -6,6 +6,7 @@ import os
import re import re
import sys import sys
import time import time
import asyncio
from datetime import datetime from datetime import datetime
import argcomplete import argcomplete
@ -36,6 +37,7 @@ from esphome.const import (
PLATFORM_RP2040, PLATFORM_RP2040,
PLATFORM_RTL87XX, PLATFORM_RTL87XX,
SECRETS_FILES, SECRETS_FILES,
PLATFORM_NRF52,
) )
from esphome.core import CORE, EsphomeError, coroutine from esphome.core import CORE, EsphomeError, coroutine
from esphome.helpers import indent, is_ip_address from esphome.helpers import indent, is_ip_address
@ -47,6 +49,13 @@ from esphome.util import (
get_serial_ports, get_serial_ports,
) )
from esphome.log import color, setup_log, Fore from esphome.log import color, setup_log, Fore
from .zephyr_tools import (
logger_scan,
logger_connect,
smpmgr_scan,
smpmgr_upload,
is_mac_address,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -86,19 +95,59 @@ def choose_prompt(options, purpose: str = None):
def choose_upload_log_host( def choose_upload_log_host(
default, check_default, show_ota, show_mqtt, show_api, purpose: str = None default, check_default, show_ota, show_mqtt, show_api, purpose: str = None
): ):
try:
mcuboot = CORE.config["nrf52"]["bootloader"] == "mcuboot"
except KeyError:
mcuboot = False
try:
ble_logger = CORE.config["zephyr_ble_nus"]["log"]
except KeyError:
ble_logger = False
ota = "ota" in CORE.config
options = [] options = []
prefix = ""
if mcuboot and show_ota and ota:
prefix = "mcumgr "
for port in get_serial_ports(): for port in get_serial_ports():
options.append((f"{port.path} ({port.description})", port.path)) options.append(
(f"{prefix}{port.path} ({port.description})", f"{prefix}{port.path}")
)
if default == "SERIAL": if default == "SERIAL":
return choose_prompt(options, purpose=purpose) return choose_prompt(options, purpose=purpose)
if (show_ota and "ota" in CORE.config) or (show_api and "api" in CORE.config): if default == "PYOCD":
options.append((f"Over The Air ({CORE.address})", CORE.address)) if not mcuboot:
if default == "OTA": raise EsphomeError("PYOCD for adafruit is not implemented")
return CORE.address options = [("pyocd", "PYOCD")]
return choose_prompt(options, purpose=purpose)
if not mcuboot:
if (show_ota and ota) or (show_api and "api" in CORE.config):
options.append((f"Over The Air ({CORE.address})", CORE.address))
if default == "OTA":
return CORE.address
elif show_ota and ota:
if default:
options.append((f"OTA over Bluetooth LE ({default})", f"mcumgr {default}"))
return choose_prompt(options, purpose=purpose)
ble_devices = asyncio.run(smpmgr_scan(CORE.config["esphome"]["name"]))
if len(ble_devices) == 0:
_LOGGER.warning("No OTA over Bluetooth LE service found!")
for device in ble_devices:
options.append(
(
f"OTA over Bluetooth LE({device.address}) {device.name}",
f"mcumgr {device.address}",
)
)
if show_mqtt and CONF_MQTT in CORE.config: if show_mqtt and CONF_MQTT in CORE.config:
options.append((f"MQTT ({CORE.config['mqtt'][CONF_BROKER]})", "MQTT")) options.append((f"MQTT ({CORE.config['mqtt'][CONF_BROKER]})", "MQTT"))
if default == "OTA": if default == "OTA":
return "MQTT" return "MQTT"
if "logging" == purpose and ble_logger and default is None:
ble_device = asyncio.run(logger_scan(CORE.config["esphome"]["name"]))
if ble_device:
options.append((f"Bluetooth LE logger ({ble_device})", ble_device.address))
else:
_LOGGER.warning("No logger over Bluetooth LE service found!")
if default is not None: if default is not None:
return default return default
if check_default is not None and check_default in [opt[1] for opt in options]: if check_default is not None and check_default in [opt[1] for opt in options]:
@ -111,6 +160,8 @@ def get_port_type(port):
return "SERIAL" return "SERIAL"
if port == "MQTT": if port == "MQTT":
return "MQTT" return "MQTT"
if is_mac_address(port):
return "BLE"
return "NETWORK" return "NETWORK"
@ -289,10 +340,11 @@ def upload_using_esptool(config, port, file):
return run_esptool(115200) return run_esptool(115200)
def upload_using_platformio(config, port): def upload_using_platformio(config, port, upload_args=None):
from esphome import platformio_api from esphome import platformio_api
upload_args = ["-t", "upload", "-t", "nobuild"] if upload_args is None:
upload_args = ["-t", "upload", "-t", "nobuild"]
if port is not None: if port is not None:
upload_args += ["--upload-port", port] upload_args += ["--upload-port", port]
return platformio_api.run_platformio_cli_run(config, CORE.verbose, *upload_args) return platformio_api.run_platformio_cli_run(config, CORE.verbose, *upload_args)
@ -329,7 +381,19 @@ def upload_program(config, args, host):
if CORE.target_platform in (PLATFORM_BK72XX, PLATFORM_RTL87XX): if CORE.target_platform in (PLATFORM_BK72XX, PLATFORM_RTL87XX):
return upload_using_platformio(config, host) return upload_using_platformio(config, host)
return 1 # Unknown target platform if CORE.target_platform in (PLATFORM_NRF52):
return upload_using_platformio(config, host, ["-t", "upload"])
raise EsphomeError(f"Unknown target platform: {CORE.target_platform}")
if host == "PYOCD":
print(CORE)
return upload_using_platformio(config, host, ["-t", "flash_pyocd"])
if host.startswith("mcumgr"):
firmware = os.path.abspath(
CORE.relative_pioenvs_path(CORE.name, "zephyr", "app_update.bin")
)
return asyncio.run(smpmgr_upload(config, host.split(" ")[1], firmware))
ota_conf = {} ota_conf = {}
for ota_item in config.get(CONF_OTA, []): for ota_item in config.get(CONF_OTA, []):
@ -389,6 +453,9 @@ def show_logs(config, args, port):
config, args.topic, args.username, args.password, args.client_id config, args.topic, args.username, args.password, args.client_id
) )
if get_port_type(port) == "BLE":
return asyncio.run(logger_connect(port))
raise EsphomeError("No remote or local logging method configured (api/mqtt/logger)") raise EsphomeError("No remote or local logging method configured (api/mqtt/logger)")

View file

@ -0,0 +1,107 @@
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import (
CONF_BOARD,
KEY_CORE,
KEY_TARGET_FRAMEWORK,
KEY_TARGET_PLATFORM,
PLATFORM_NRF52,
CONF_TYPE,
CONF_FRAMEWORK,
CONF_PLATFORM_VERSION,
)
from esphome.core import CORE, coroutine_with_priority
from esphome.components.zephyr import (
zephyr_set_core_data,
zephyr_to_code,
)
from esphome.components.zephyr.const import (
KEY_ZEPHYR,
KEY_BOOTLOADER,
BOOTLOADER_MCUBOOT,
)
from .boards_zephyr import BOARDS_ZEPHYR
from .const import (
BOOTLOADER_ADAFRUIT,
)
# force import gpio to register pin schema
from .gpio import nrf52_pin_to_code # noqa
AUTO_LOAD = ["zephyr"]
def set_core_data(config):
zephyr_set_core_data(config)
CORE.data[KEY_CORE][KEY_TARGET_PLATFORM] = PLATFORM_NRF52
CORE.data[KEY_CORE][KEY_TARGET_FRAMEWORK] = KEY_ZEPHYR
return config
BOOTLOADERS = [
BOOTLOADER_ADAFRUIT,
BOOTLOADER_MCUBOOT,
]
def _detect_bootloader(value):
value = value.copy()
bootloader = None
if (
value[CONF_BOARD] in BOARDS_ZEPHYR
and KEY_BOOTLOADER in BOARDS_ZEPHYR[value[CONF_BOARD]]
):
bootloader = BOARDS_ZEPHYR[value[CONF_BOARD]][KEY_BOOTLOADER]
if KEY_BOOTLOADER not in value:
if bootloader is None:
bootloader = BOOTLOADER_MCUBOOT
value[KEY_BOOTLOADER] = bootloader
else:
if bootloader is not None and bootloader != value[KEY_BOOTLOADER]:
raise cv.Invalid(
f"{value[CONF_FRAMEWORK][CONF_TYPE]} does not support '{bootloader}' bootloader for {value[CONF_BOARD]}"
)
return value
CONFIG_SCHEMA = cv.All(
cv.Schema(
{
cv.Required(CONF_BOARD): cv.string_strict,
cv.Optional(KEY_BOOTLOADER): cv.one_of(*BOOTLOADERS, lower=True),
}
),
_detect_bootloader,
set_core_data,
)
@coroutine_with_priority(1000)
async def to_code(config):
cg.add_platformio_option("board", config[CONF_BOARD])
cg.add_build_flag("-DUSE_NRF52")
cg.add_define("ESPHOME_BOARD", config[CONF_BOARD])
cg.add_define("ESPHOME_VARIANT", "NRF52")
conf = {CONF_PLATFORM_VERSION: "platformio/nordicnrf52@10.3.0"}
cg.add_platformio_option(CONF_FRAMEWORK, CORE.data[KEY_CORE][KEY_TARGET_FRAMEWORK])
cg.add_platformio_option("platform", conf[CONF_PLATFORM_VERSION])
cg.add_platformio_option(
"platform_packages",
[
"platformio/framework-zephyr@https://github.com/tomaszduda23/framework-sdk-nrf",
"platformio/toolchain-gccarmnoneeabi@https://github.com/tomaszduda23/toolchain-sdk-ng",
],
)
if config[KEY_BOOTLOADER] == BOOTLOADER_ADAFRUIT:
# make sure that firmware.zip is created
# for Adafruit_nRF52_Bootloader
cg.add_platformio_option("board_upload.protocol", "nrfutil")
cg.add_platformio_option("board_upload.use_1200bps_touch", "true")
cg.add_platformio_option("board_upload.require_upload_port", "true")
cg.add_platformio_option("board_upload.wait_for_upload_port", "true")
#
zephyr_to_code(conf)

View file

@ -0,0 +1,6 @@
from esphome.components.zephyr.const import KEY_BOOTLOADER
from .const import BOOTLOADER_ADAFRUIT
BOARDS_ZEPHYR = {
"adafruit_itsybitsy_nrf52840": {KEY_BOOTLOADER: BOOTLOADER_ADAFRUIT},
}

View file

@ -0,0 +1 @@
BOOTLOADER_ADAFRUIT = "adafruit"

View file

@ -0,0 +1,78 @@
from esphome import pins
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import (
CONF_ID,
CONF_MODE,
CONF_INVERTED,
CONF_NUMBER,
CONF_ANALOG,
)
from esphome.components.zephyr.const import (
zephyr_ns,
)
GPIOPin = zephyr_ns.class_("ZephyrGPIOPin", cg.InternalGPIOPin)
def _translate_pin(value):
if isinstance(value, dict) or value is None:
raise cv.Invalid(
"This variable only supports pin numbers, not full pin schemas "
"(with inverted and mode)."
)
if isinstance(value, int):
return value
try:
return int(value)
except ValueError:
pass
# e.g. P0.27
if len(value) >= len("P0.0") and value[0] == "P" and value[2] == ".":
return cv.int_(value[len("P")].strip()) * 32 + cv.int_(
value[len("P0.") :].strip()
)
raise cv.Invalid(f"Invalid pin: {value}")
ADC_INPUTS = [
"AIN0",
"AIN1",
"AIN2",
"AIN3",
"AIN4",
"AIN5",
"AIN6",
"AIN7",
"VDD",
"VDDHDIV5",
]
def validate_gpio_pin(value):
if value in ADC_INPUTS:
return value
value = _translate_pin(value)
if value < 0 or value > (32 + 16):
raise cv.Invalid(f"NRF52: Invalid pin number: {value}")
return value
NRF52_PIN_SCHEMA = cv.All(
pins.gpio_base_schema(
GPIOPin,
validate_gpio_pin,
modes=pins.GPIO_STANDARD_MODES + (CONF_ANALOG,),
),
)
@pins.PIN_SCHEMA_REGISTRY.register("nrf52", NRF52_PIN_SCHEMA)
async def nrf52_pin_to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
num = config[CONF_NUMBER]
cg.add(var.set_pin(num))
cg.add(var.set_inverted(config[CONF_INVERTED]))
cg.add(var.set_flags(pins.gpio_flags_expr(config[CONF_MODE])))
return var

View file

@ -0,0 +1,41 @@
#ifdef USE_NRF52
#include <zephyr/init.h>
#include <hal/nrf_power.h>
namespace esphome {
namespace nrf52 {
static int board_esphome_init(void) {
/* if the board is powered from USB
* (high voltage mode), GPIO output voltage is set to 1.8 volts by
* default and that is not enough to turn the green and blue LEDs on.
* Increase GPIO voltage to 3.3 volts.
*/
if ((nrf_power_mainregstatus_get(NRF_POWER) == NRF_POWER_MAINREGSTATUS_HIGH) &&
((NRF_UICR->REGOUT0 & UICR_REGOUT0_VOUT_Msk) == (UICR_REGOUT0_VOUT_DEFAULT << UICR_REGOUT0_VOUT_Pos))) {
NRF_NVMC->CONFIG = NVMC_CONFIG_WEN_Wen << NVMC_CONFIG_WEN_Pos;
while (NRF_NVMC->READY == NVMC_READY_READY_Busy) {
;
}
NRF_UICR->REGOUT0 =
(NRF_UICR->REGOUT0 & ~((uint32_t) UICR_REGOUT0_VOUT_Msk)) | (UICR_REGOUT0_VOUT_3V0 << UICR_REGOUT0_VOUT_Pos);
NRF_NVMC->CONFIG = NVMC_CONFIG_WEN_Ren << NVMC_CONFIG_WEN_Pos;
while (NRF_NVMC->READY == NVMC_READY_READY_Busy) {
;
}
/* a reset is required for changes to take effect */
NVIC_SystemReset();
}
return 0;
}
} // namespace nrf52
} // namespace esphome
static int board_esphome_init(void) { return esphome::nrf52::board_esphome_init(); }
SYS_INIT(board_esphome_init, PRE_KERNEL_1, CONFIG_KERNEL_INIT_PRIORITY_DEFAULT);
#endif

View file

@ -0,0 +1,222 @@
import os
from typing import Union
import esphome.codegen as cg
from esphome.core import CORE
from esphome.helpers import (
write_file_if_changed,
copy_file_if_changed,
)
from esphome.const import (
CONF_BOARD,
KEY_NAME,
)
from .const import (
KEY_ZEPHYR,
KEY_PRJ_CONF,
KEY_OVERLAY,
zephyr_ns,
BOOTLOADER_MCUBOOT,
KEY_EXTRA_BUILD_FILES,
KEY_PATH,
KEY_BOOTLOADER,
)
AUTO_LOAD = ["preferences"]
KEY_BOARD = "board"
KEY_USER = "user"
def zephyr_set_core_data(config):
CORE.data[KEY_ZEPHYR] = {}
CORE.data[KEY_ZEPHYR][KEY_BOARD] = config[CONF_BOARD]
CORE.data[KEY_ZEPHYR][KEY_PRJ_CONF] = {}
CORE.data[KEY_ZEPHYR][KEY_OVERLAY] = ""
CORE.data[KEY_ZEPHYR][KEY_USER] = {}
CORE.data[KEY_ZEPHYR][KEY_BOOTLOADER] = config[KEY_BOOTLOADER]
CORE.data[KEY_ZEPHYR][KEY_EXTRA_BUILD_FILES] = {}
return config
PrjConfValueType = Union[bool, str, int]
def zephyr_add_prj_conf(name: str, value: PrjConfValueType, required: bool = True):
"""Set an zephyr prj conf value."""
if not name.startswith("CONFIG_"):
name = "CONFIG_" + name
if name in CORE.data[KEY_ZEPHYR][KEY_PRJ_CONF]:
old_value = CORE.data[KEY_ZEPHYR][KEY_PRJ_CONF][name]
if old_value[0] != value and old_value[1]:
raise ValueError(
f"{name} already set with value '{old_value[0]}', cannot set again to '{value}'"
)
if required:
CORE.data[KEY_ZEPHYR][KEY_PRJ_CONF][name] = (value, required)
else:
CORE.data[KEY_ZEPHYR][KEY_PRJ_CONF][name] = (value, required)
def zephyr_add_user(key, value):
if key not in CORE.data[KEY_ZEPHYR][KEY_USER]:
CORE.data[KEY_ZEPHYR][KEY_USER][key] = []
CORE.data[KEY_ZEPHYR][KEY_USER][key] += [value]
def zephyr_add_overlay(content):
CORE.data[KEY_ZEPHYR][KEY_OVERLAY] += content
def add_extra_build_file(filename: str, path: str) -> bool:
"""Add an extra build file to the project."""
if filename not in CORE.data[KEY_ZEPHYR][KEY_EXTRA_BUILD_FILES]:
CORE.data[KEY_ZEPHYR][KEY_EXTRA_BUILD_FILES][filename] = {
KEY_NAME: filename,
KEY_PATH: path,
}
return True
return False
def add_extra_script(stage: str, filename: str, path: str):
"""Add an extra script to the project."""
key = f"{stage}:{filename}"
if add_extra_build_file(filename, path):
cg.add_platformio_option("extra_scripts", [key])
def zephyr_to_code(conf):
cg.add(zephyr_ns.setup_preferences())
cg.add_build_flag("-DUSE_ZEPHYR")
# build is done by west so bypass board checking in platformio
cg.add_platformio_option("boards_dir", CORE.relative_build_path("boards"))
# c++ support
zephyr_add_prj_conf("NEWLIB_LIBC", True)
zephyr_add_prj_conf("CONFIG_FPU", True)
zephyr_add_prj_conf("NEWLIB_LIBC_FLOAT_PRINTF", True)
zephyr_add_prj_conf("CPLUSPLUS", True)
zephyr_add_prj_conf("LIB_CPLUSPLUS", True)
# preferences
zephyr_add_prj_conf("SETTINGS", True)
zephyr_add_prj_conf("NVS", True)
zephyr_add_prj_conf("FLASH_MAP", True)
zephyr_add_prj_conf("CONFIG_FLASH", True)
# watchdog
zephyr_add_prj_conf("WATCHDOG", True)
zephyr_add_prj_conf("WDT_DISABLE_AT_BOOT", False)
# disable console
zephyr_add_prj_conf("UART_CONSOLE", False)
zephyr_add_prj_conf("CONSOLE", False, False)
# TODO move to nrf52
# use NFC pins as GPIO
zephyr_add_prj_conf("NFCT_PINS_AS_GPIOS", True)
add_extra_script(
"pre",
"pre_build.py",
os.path.join(os.path.dirname(__file__), "pre_build.py.script"),
)
def _format_prj_conf_val(value: PrjConfValueType) -> str:
if isinstance(value, bool):
return "y" if value else "n"
if isinstance(value, int):
return str(value)
if isinstance(value, str):
return f'"{value}"'
raise ValueError
def zephyr_add_cdc_acm(config, id):
zephyr_add_prj_conf("USB_DEVICE_STACK", True)
zephyr_add_prj_conf("USB_CDC_ACM", True)
# prevent device to go to susspend, without this communication stop working in python
# there should be a way to solve it
zephyr_add_prj_conf("USB_DEVICE_REMOTE_WAKEUP", False)
# prevent logging when buffer is full
zephyr_add_prj_conf("USB_CDC_ACM_LOG_LEVEL_WRN", True)
zephyr_add_overlay(
f"""
&zephyr_udc0 {{
cdc_acm_uart{id}: cdc_acm_uart{id} {{
compatible = "zephyr,cdc-acm-uart";
}};
}};
"""
)
# Called by writer.py
def copy_files():
want_opts = CORE.data[KEY_ZEPHYR][KEY_PRJ_CONF]
prj_conf = (
"\n".join(
f"{name}={_format_prj_conf_val(value[0])}"
for name, value in sorted(want_opts.items())
)
+ "\n"
)
write_file_if_changed(CORE.relative_build_path("zephyr/prj.conf"), prj_conf)
if CORE.data[KEY_ZEPHYR][KEY_USER]:
zephyr_add_overlay(
f"""
/ {{
zephyr,user {{
{[f"{key} = {', '.join(value)};" for key, value in CORE.data[KEY_ZEPHYR][KEY_USER].items()][0]}
}};
}};"""
)
write_file_if_changed(
CORE.relative_build_path("zephyr/app.overlay"),
CORE.data[KEY_ZEPHYR][KEY_OVERLAY],
)
# write_file_if_changed(
# CORE.relative_build_path("zephyr/child_image/mcuboot.conf"),
# """
# CONFIG_MCUBOOT_SERIAL=y
# CONFIG_BOOT_SERIAL_PIN_RESET=y
# CONFIG_UART_CONSOLE=n
# CONFIG_BOOT_SERIAL_ENTRANCE_GPIO=n
# CONFIG_BOOT_SERIAL_CDC_ACM=y
# CONFIG_UART_NRFX=n
# CONFIG_LOG=n
# CONFIG_ASSERT_VERBOSE=n
# CONFIG_BOOT_BANNER=n
# CONFIG_PRINTK=n
# CONFIG_CBPRINTF_LIBC_SUBSTS=n
# """,
# )
if CORE.data[KEY_ZEPHYR][KEY_BOOTLOADER] == BOOTLOADER_MCUBOOT:
fake_board_manifest = """
{
"frameworks": [
"zephyr"
],
"name": "esphome nrf52",
"upload": {
"maximum_ram_size": 248832,
"maximum_size": 815104
},
"url": "https://esphome.io/",
"vendor": "esphome"
}
"""
write_file_if_changed(
CORE.relative_build_path(f"boards/{CORE.data[KEY_ZEPHYR][KEY_BOARD]}.json"),
fake_board_manifest,
)
for _, file in CORE.data[KEY_ZEPHYR][KEY_EXTRA_BUILD_FILES].items():
copy_file_if_changed(
file[KEY_PATH],
CORE.relative_build_path(file[KEY_NAME]),
)

View file

@ -0,0 +1,12 @@
import esphome.codegen as cg
KEY_ZEPHYR = "zephyr"
KEY_PRJ_CONF = "prj_conf"
KEY_OVERLAY = "overlay"
KEY_BOOTLOADER = "bootloader"
KEY_EXTRA_BUILD_FILES = "extra_build_files"
KEY_PATH = "path"
BOOTLOADER_MCUBOOT = "mcuboot"
zephyr_ns = cg.esphome_ns.namespace("zephyr")

View file

@ -0,0 +1,53 @@
#ifdef USE_ZEPHYR
#include <zephyr/kernel.h>
#include <zephyr/drivers/watchdog.h>
#include <zephyr/sys/reboot.h>
namespace esphome {
static int wdt_channel_id = -EINVAL;
const device *wdt = nullptr;
void yield() { ::k_yield(); }
uint32_t millis() { return k_ticks_to_ms_floor32(k_uptime_ticks()); }
void delay(uint32_t ms) { ::k_msleep(ms); }
uint32_t micros() { return k_ticks_to_us_floor32(k_uptime_ticks()); }
void arch_init() {
wdt = DEVICE_DT_GET(DT_ALIAS(watchdog0));
if (device_is_ready(wdt)) {
static wdt_timeout_cfg wdt_config{};
wdt_config.flags = WDT_FLAG_RESET_SOC;
wdt_config.window.max = 2000;
wdt_channel_id = wdt_install_timeout(wdt, &wdt_config);
if (wdt_channel_id >= 0) {
wdt_setup(wdt, WDT_OPT_PAUSE_HALTED_BY_DBG | WDT_OPT_PAUSE_IN_SLEEP);
}
}
}
void arch_feed_wdt() {
if (wdt_channel_id >= 0) {
wdt_feed(wdt, wdt_channel_id);
}
}
void arch_restart() { sys_reboot(SYS_REBOOT_COLD); }
} // namespace esphome
void setup();
void loop();
int main() {
setup();
while (1) {
loop();
esphome::yield();
}
return 0;
}
#endif

View file

@ -0,0 +1,120 @@
#ifdef USE_ZEPHYR
#include "gpio.h"
#include "esphome/core/log.h"
#include <zephyr/drivers/gpio.h>
namespace esphome {
namespace zephyr {
static const char *const TAG = "zephyr";
static int flags_to_mode(gpio::Flags flags, bool inverted, bool value) {
int ret = 0;
if (flags & gpio::FLAG_INPUT) {
ret |= GPIO_INPUT;
}
if (flags & gpio::FLAG_OUTPUT) {
ret |= GPIO_OUTPUT;
if (value != inverted) {
ret |= GPIO_OUTPUT_INIT_HIGH;
} else {
ret |= GPIO_OUTPUT_INIT_LOW;
}
}
if (flags & gpio::FLAG_PULLUP) {
ret |= GPIO_PULL_UP;
}
if (flags & gpio::FLAG_PULLDOWN) {
ret |= GPIO_PULL_DOWN;
}
if (flags & gpio::FLAG_OPEN_DRAIN) {
ret |= GPIO_OPEN_DRAIN;
}
return ret;
}
struct ISRPinArg {
uint8_t pin;
bool inverted;
};
ISRInternalGPIOPin ZephyrGPIOPin::to_isr() const {
auto *arg = new ISRPinArg{};
arg->pin = pin_;
arg->inverted = inverted_;
return ISRInternalGPIOPin((void *) arg);
}
void ZephyrGPIOPin::attach_interrupt(void (*func)(void *), void *arg, gpio::InterruptType type) const {
// TODO
}
void ZephyrGPIOPin::setup() {
const struct device *gpio = nullptr;
if (pin_ < 32) {
#define GPIO0 DT_NODELABEL(gpio0)
#if DT_NODE_HAS_STATUS(GPIO0, okay)
gpio = DEVICE_DT_GET(GPIO0);
#else
#error "gpio0 is disabled"
#endif
} else {
#define GPIO1 DT_NODELABEL(gpio1)
#if DT_NODE_HAS_STATUS(GPIO1, okay)
gpio = DEVICE_DT_GET(GPIO1);
#else
#error "gpio1 is disabled"
#endif
}
if (device_is_ready(gpio)) {
gpio_ = gpio;
} else {
ESP_LOGE(TAG, "gpio %u is not ready.", pin_);
return;
}
pin_mode(flags_);
}
void ZephyrGPIOPin::pin_mode(gpio::Flags flags) {
if (nullptr == gpio_) {
return;
}
gpio_pin_configure(gpio_, pin_ % 32, flags_to_mode(flags, inverted_, value_));
}
std::string ZephyrGPIOPin::dump_summary() const {
char buffer[32];
snprintf(buffer, sizeof(buffer), "GPIO%u", pin_);
return buffer;
}
bool ZephyrGPIOPin::digital_read() {
if (nullptr == gpio_) {
return false;
}
return bool(gpio_pin_get(gpio_, pin_ % 32) != inverted_);
}
void ZephyrGPIOPin::digital_write(bool value) {
// make sure that value is not ignored since it can be inverted e.g. on switch side
// that way init state should be correct
value_ = value;
if (nullptr == gpio_) {
return;
}
gpio_pin_set(gpio_, pin_ % 32, value != inverted_ ? 1 : 0);
}
void ZephyrGPIOPin::detach_interrupt() const {
// TODO
}
} // namespace zephyr
bool IRAM_ATTR ISRInternalGPIOPin::digital_read() {
// TODO
return false;
}
} // namespace esphome
#endif

View file

@ -0,0 +1,37 @@
#pragma once
#ifdef USE_ZEPHYR
#include "esphome/core/hal.h"
struct device;
namespace esphome {
namespace zephyr {
class ZephyrGPIOPin : public InternalGPIOPin {
public:
void set_pin(uint8_t pin) { pin_ = pin; }
void set_inverted(bool inverted) { inverted_ = inverted; }
void set_flags(gpio::Flags flags) { flags_ = flags; }
void setup() override;
void pin_mode(gpio::Flags flags) override;
bool digital_read() override;
void digital_write(bool value) override;
std::string dump_summary() const override;
void detach_interrupt() const override;
ISRInternalGPIOPin to_isr() const override;
uint8_t get_pin() const override { return pin_; }
bool is_inverted() const override { return inverted_; }
protected:
void attach_interrupt(void (*func)(void *), void *arg, gpio::InterruptType type) const override;
uint8_t pin_;
bool inverted_;
gpio::Flags flags_;
const device *gpio_ = nullptr;
bool value_ = false;
};
} // namespace zephyr
} // namespace esphome
#endif // USE_ZEPHYR

View file

@ -0,0 +1,4 @@
Import("env")
board_config = env.BoardConfig()
board_config.update("frameworks", ["arduino", "zephyr"])

View file

@ -0,0 +1,155 @@
#ifdef USE_ZEPHYR
#include "esphome/core/preferences.h"
#include "esphome/core/log.h"
#include <zephyr/settings/settings.h>
namespace esphome {
namespace zephyr {
static const char *const TAG = "zephyr.preferences";
#define ESPHOME_SETTINGS_KEY "esphome"
class ZephyrPreferenceBackend : public ESPPreferenceBackend {
public:
ZephyrPreferenceBackend(uint32_t type) { this->type_ = type; }
ZephyrPreferenceBackend(uint32_t type, std::vector<uint8_t> &&data) : data(std::move(data)) { this->type_ = type; }
bool save(const uint8_t *data, size_t len) override {
this->data.resize(len);
std::memcpy(this->data.data(), data, len);
ESP_LOGVV(TAG, "save key: %u, len: %d", type_, len);
return true;
}
bool load(uint8_t *data, size_t len) override {
if (len != this->data.size()) {
ESP_LOGE(TAG, "size of setting key %s changed, from: %u, to: %u", get_key().c_str(), this->data.size(), len);
return false;
}
std::memcpy(data, this->data.data(), len);
ESP_LOGVV(TAG, "load key: %u, len: %d", type_, len);
return true;
}
const uint32_t get_type() const { return type_; }
const std::string get_key() const { return str_sprintf(ESPHOME_SETTINGS_KEY "/%" PRIx32, type_); }
std::vector<uint8_t> data;
protected:
uint32_t type_ = 0;
};
class ZephyrPreferences : public ESPPreferences {
public:
void open() {
int err = settings_subsys_init();
if (err) {
ESP_LOGE(TAG, "Failed to initialize settings subsystem, err: %d", err);
return;
}
static struct settings_handler settings_cb = {
.name = ESPHOME_SETTINGS_KEY,
.h_set = load_setting_,
.h_export = export_settings_,
};
err = settings_register(&settings_cb);
if (err) {
ESP_LOGE(TAG, "setting_register failed, err, %d", err);
return;
}
err = settings_load_subtree(ESPHOME_SETTINGS_KEY);
if (err) {
ESP_LOGE(TAG, "Cannot load settings, err: %d", err);
return;
}
ESP_LOGD(TAG, "Loaded %u settings.", backends_.size());
}
ESPPreferenceObject make_preference(size_t length, uint32_t type, bool in_flash) override {
return make_preference(length, type);
}
ESPPreferenceObject make_preference(size_t length, uint32_t type) override {
for (auto backend : backends_) {
if (backend->get_type() == type) {
return ESPPreferenceObject(backend);
}
}
printf("type %u size %u\n", type, backends_.size());
auto *pref = new ZephyrPreferenceBackend(type);
ESP_LOGD(TAG, "Add new setting %s.", pref->get_key().c_str());
backends_.push_back(pref);
return ESPPreferenceObject(pref);
}
bool sync() override {
ESP_LOGD(TAG, "Save settings");
int err = settings_save();
if (err) {
ESP_LOGE(TAG, "Cannot save settings, err: %d", err);
return false;
}
return true;
}
bool reset() override {
ESP_LOGD(TAG, "Reset settings");
for (auto backend : backends_) {
// save empty delete data
backend->data.clear();
}
sync();
return true;
}
protected:
std::vector<ZephyrPreferenceBackend *> backends_;
static int load_setting_(const char *name, size_t len, settings_read_cb read_cb, void *cb_arg) {
auto type = parse_hex<uint32_t>(name);
if (!type.has_value()) {
std::string full_name(ESPHOME_SETTINGS_KEY);
full_name += "/";
full_name += name;
// Delete unusable keys. Otherwise it will stay in flash forever.
settings_delete(full_name.c_str());
return 1;
}
std::vector<uint8_t> data(len);
int err = read_cb(cb_arg, data.data(), len);
ESP_LOGD(TAG, "load setting, name: %s(%u), len %u, err %u", name, *type, len, err);
auto *pref = new ZephyrPreferenceBackend(*type, std::move(data));
static_cast<ZephyrPreferences *>(global_preferences)->backends_.push_back(pref);
return 0;
}
static int export_settings_(int (*cb)(const char *name, const void *value, size_t val_len)) {
for (auto backend : static_cast<ZephyrPreferences *>(global_preferences)->backends_) {
auto name = backend->get_key();
int err = cb(name.c_str(), backend->data.data(), backend->data.size());
ESP_LOGD(TAG, "save in flash, name %s, len %u, err %d", name.c_str(), backend->data.size(), err);
}
return 0;
}
};
void setup_preferences() {
auto *prefs = new ZephyrPreferences();
global_preferences = prefs;
prefs->open();
}
} // namespace zephyr
ESPPreferences *global_preferences; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
} // namespace esphome
#endif

View file

@ -0,0 +1,13 @@
#pragma once
#ifdef USE_ZEPHYR
namespace esphome {
namespace zephyr {
void setup_preferences();
} // namespace zephyr
} // namespace esphome
#endif

View file

@ -62,6 +62,7 @@ from esphome.const import (
TYPE_LOCAL, TYPE_LOCAL,
VALID_SUBSTITUTIONS_CHARACTERS, VALID_SUBSTITUTIONS_CHARACTERS,
__version__ as ESPHOME_VERSION, __version__ as ESPHOME_VERSION,
PLATFORM_NRF52,
) )
from esphome.core import ( from esphome.core import (
CORE, CORE,
@ -604,8 +605,10 @@ def only_with_framework(frameworks):
only_on_esp32 = only_on(PLATFORM_ESP32) only_on_esp32 = only_on(PLATFORM_ESP32)
only_on_esp8266 = only_on(PLATFORM_ESP8266) only_on_esp8266 = only_on(PLATFORM_ESP8266)
only_on_rp2040 = only_on(PLATFORM_RP2040) only_on_rp2040 = only_on(PLATFORM_RP2040)
only_on_nrf52 = only_on(PLATFORM_NRF52)
only_with_arduino = only_with_framework("arduino") only_with_arduino = only_with_framework("arduino")
only_with_esp_idf = only_with_framework("esp-idf") only_with_esp_idf = only_with_framework("esp-idf")
only_with_zephyr = only_with_framework("zephyr")
# Adapted from: # Adapted from:
@ -1648,6 +1651,7 @@ class SplitDefault(Optional):
bk72xx=vol.UNDEFINED, bk72xx=vol.UNDEFINED,
rtl87xx=vol.UNDEFINED, rtl87xx=vol.UNDEFINED,
host=vol.UNDEFINED, host=vol.UNDEFINED,
nrf52=vol.UNDEFINED,
): ):
super().__init__(key) super().__init__(key)
self._esp8266_default = vol.default_factory(esp8266) self._esp8266_default = vol.default_factory(esp8266)
@ -1679,6 +1683,7 @@ class SplitDefault(Optional):
self._bk72xx_default = vol.default_factory(bk72xx) self._bk72xx_default = vol.default_factory(bk72xx)
self._rtl87xx_default = vol.default_factory(rtl87xx) self._rtl87xx_default = vol.default_factory(rtl87xx)
self._host_default = vol.default_factory(host) self._host_default = vol.default_factory(host)
self._nrf52_default = vol.default_factory(nrf52)
@property @property
def default(self): def default(self):
@ -1721,6 +1726,8 @@ class SplitDefault(Optional):
return self._rtl87xx_default return self._rtl87xx_default
if CORE.is_host: if CORE.is_host:
return self._host_default return self._host_default
if CORE.is_nrf52:
return self._nrf52_default
raise NotImplementedError raise NotImplementedError
@default.setter @default.setter

View file

@ -14,6 +14,7 @@ PLATFORM_HOST = "host"
PLATFORM_BK72XX = "bk72xx" PLATFORM_BK72XX = "bk72xx"
PLATFORM_RTL87XX = "rtl87xx" PLATFORM_RTL87XX = "rtl87xx"
PLATFORM_LIBRETINY_OLDSTYLE = "libretiny" PLATFORM_LIBRETINY_OLDSTYLE = "libretiny"
PLATFORM_NRF52 = "nrf52"
TARGET_PLATFORMS = [ TARGET_PLATFORMS = [
PLATFORM_ESP32, PLATFORM_ESP32,
@ -23,6 +24,7 @@ TARGET_PLATFORMS = [
PLATFORM_BK72XX, PLATFORM_BK72XX,
PLATFORM_RTL87XX, PLATFORM_RTL87XX,
PLATFORM_LIBRETINY_OLDSTYLE, PLATFORM_LIBRETINY_OLDSTYLE,
PLATFORM_NRF52,
] ]
SOURCE_FILE_EXTENSIONS = {".cpp", ".hpp", ".h", ".c", ".tcc", ".ino"} SOURCE_FILE_EXTENSIONS = {".cpp", ".hpp", ".h", ".c", ".tcc", ".ino"}

View file

@ -21,6 +21,7 @@ from esphome.const import (
PLATFORM_RTL87XX, PLATFORM_RTL87XX,
PLATFORM_RP2040, PLATFORM_RP2040,
PLATFORM_HOST, PLATFORM_HOST,
PLATFORM_NRF52,
) )
from esphome.coroutine import FakeAwaitable as _FakeAwaitable from esphome.coroutine import FakeAwaitable as _FakeAwaitable
from esphome.coroutine import FakeEventLoop as _FakeEventLoop from esphome.coroutine import FakeEventLoop as _FakeEventLoop
@ -661,6 +662,10 @@ class EsphomeCore:
def is_host(self): def is_host(self):
return self.target_platform == PLATFORM_HOST return self.target_platform == PLATFORM_HOST
@property
def is_nrf52(self):
return self.target_platform == PLATFORM_NRF52
@property @property
def target_framework(self): def target_framework(self):
return self.data[KEY_CORE][KEY_TARGET_FRAMEWORK] return self.data[KEY_CORE][KEY_TARGET_FRAMEWORK]
@ -673,6 +678,10 @@ class EsphomeCore:
def using_esp_idf(self): def using_esp_idf(self):
return self.target_framework == "esp-idf" return self.target_framework == "esp-idf"
@property
def using_zephyr(self):
return self.target_framework == "zephyr"
def add_job(self, func, *args, **kwargs): def add_job(self, func, *args, **kwargs):
self.event_loop.add_job(func, *args, **kwargs) self.event_loop.add_job(func, *args, **kwargs)

View file

@ -40,6 +40,7 @@ from esphome.const import (
) )
from esphome.core import CORE, coroutine_with_priority from esphome.core import CORE, coroutine_with_priority
from esphome.helpers import copy_file_if_changed, get_str_env, walk_files from esphome.helpers import copy_file_if_changed, get_str_env, walk_files
from esphome.components.zephyr import zephyr_add_prj_conf
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -361,6 +362,9 @@ async def to_code(config):
) )
) )
if CORE.using_zephyr:
zephyr_add_prj_conf("BT_DEVICE_NAME", config[CONF_NAME])
CORE.add_job(_add_automations, config) CORE.add_job(_add_automations, config)
cg.add_build_flag("-fno-exceptions") cg.add_build_flag("-fno-exceptions")

View file

@ -10,6 +10,7 @@
#include <cstdarg> #include <cstdarg>
#include <cstdio> #include <cstdio>
#include <cstring> #include <cstring>
#include <strings.h>
#ifdef USE_HOST #ifdef USE_HOST
#ifndef _WIN32 #ifndef _WIN32
@ -55,6 +56,10 @@
#include <WiFi.h> // for macAddress() #include <WiFi.h> // for macAddress()
#endif #endif
#ifdef USE_ZEPHYR
#include <zephyr/random/rand32.h>
#endif
namespace esphome { namespace esphome {
static const char *const TAG = "helpers"; static const char *const TAG = "helpers";
@ -209,6 +214,8 @@ uint32_t random_uint32() {
std::mt19937 rng(dev()); std::mt19937 rng(dev());
std::uniform_int_distribution<uint32_t> dist(0, std::numeric_limits<uint32_t>::max()); std::uniform_int_distribution<uint32_t> dist(0, std::numeric_limits<uint32_t>::max());
return dist(rng); return dist(rng);
#elif defined(USE_ZEPHYR)
return rand();
#else #else
#error "No random source available for this configuration." #error "No random source available for this configuration."
#endif #endif
@ -246,6 +253,9 @@ bool random_bytes(uint8_t *data, size_t len) {
} }
fclose(fp); fclose(fp);
return true; return true;
#elif defined(USE_ZEPHYR)
sys_rand_get(data, len);
return true;
#else #else
#error "No random source available for this configuration." #error "No random source available for this configuration."
#endif #endif
@ -624,6 +634,11 @@ Mutex::Mutex() {}
void Mutex::lock() {} void Mutex::lock() {}
bool Mutex::try_lock() { return true; } bool Mutex::try_lock() { return true; }
void Mutex::unlock() {} void Mutex::unlock() {}
#elif defined(USE_ZEPHYR)
Mutex::Mutex() { k_mutex_init(&handle_); }
void Mutex::lock() { k_mutex_lock(&this->handle_, K_FOREVER); }
bool Mutex::try_lock() { return k_mutex_lock(&this->handle_, K_NO_WAIT) == 0; }
void Mutex::unlock() { k_mutex_unlock(&this->handle_); }
#elif defined(USE_ESP32) || defined(USE_LIBRETINY) #elif defined(USE_ESP32) || defined(USE_LIBRETINY)
Mutex::Mutex() { handle_ = xSemaphoreCreateMutex(); } Mutex::Mutex() { handle_ = xSemaphoreCreateMutex(); }
void Mutex::lock() { xSemaphoreTake(this->handle_, portMAX_DELAY); } void Mutex::lock() { xSemaphoreTake(this->handle_, portMAX_DELAY); }
@ -681,6 +696,13 @@ void get_mac_address_raw(uint8_t *mac) { // NOLINT(readability-non-const-parame
WiFi.macAddress(mac); WiFi.macAddress(mac);
#elif defined(USE_LIBRETINY) #elif defined(USE_LIBRETINY)
WiFi.macAddress(mac); WiFi.macAddress(mac);
#elif defined(USE_NRF52)
mac[0] = ((NRF_FICR->DEVICEADDR[1] & 0xFFFF) >> 8) | 0xC0;
mac[1] = NRF_FICR->DEVICEADDR[1] & 0xFFFF;
mac[2] = NRF_FICR->DEVICEADDR[0] >> 24;
mac[3] = NRF_FICR->DEVICEADDR[0] >> 16;
mac[4] = NRF_FICR->DEVICEADDR[0] >> 8;
mac[5] = NRF_FICR->DEVICEADDR[0];
#else #else
// this should be an error, but that messes with CI checks. #error No mac address method defined // this should be an error, but that messes with CI checks. #error No mac address method defined
#endif #endif

View file

@ -7,6 +7,8 @@
#include <string> #include <string>
#include <type_traits> #include <type_traits>
#include <vector> #include <vector>
#include <limits>
#include <array>
#include "esphome/core/optional.h" #include "esphome/core/optional.h"
@ -20,6 +22,8 @@
#elif defined(USE_LIBRETINY) #elif defined(USE_LIBRETINY)
#include <FreeRTOS.h> #include <FreeRTOS.h>
#include <semphr.h> #include <semphr.h>
#elif defined(USE_ZEPHYR)
#include <zephyr/kernel.h>
#endif #endif
#define HOT __attribute__((hot)) #define HOT __attribute__((hot))
@ -552,7 +556,9 @@ class Mutex {
Mutex &operator=(const Mutex &) = delete; Mutex &operator=(const Mutex &) = delete;
private: private:
#if defined(USE_ESP32) || defined(USE_LIBRETINY) #if defined(USE_ZEPHYR)
k_mutex handle_;
#elif defined(USE_ESP32) || defined(USE_LIBRETINY)
SemaphoreHandle_t handle_; SemaphoreHandle_t handle_;
#endif #endif
}; };

View file

@ -310,6 +310,10 @@ def copy_src_tree():
CORE.relative_src_path("esphome.h"), CORE.relative_src_path("esphome.h"),
ESPHOME_H_FORMAT.format(include_s + '\n#include "pio_includes.h"'), ESPHOME_H_FORMAT.format(include_s + '\n#include "pio_includes.h"'),
) )
elif CORE.using_zephyr:
from esphome.components.zephyr import copy_files
copy_files()
def generate_defines_h(): def generate_defines_h():

173
esphome/zephyr_tools.py Normal file
View file

@ -0,0 +1,173 @@
import time
import asyncio
import logging
import re
from typing import Final
from rich.pretty import pprint
from bleak import BleakScanner, BleakClient
from bleak.exc import BleakDeviceNotFoundError, BleakDBusError
from smpclient.transport.ble import SMPBLETransport
from smpclient.transport import SMPTransportDisconnected
from smpclient.transport.serial import SMPSerialTransport
from smpclient import SMPClient
from smpclient.mcuboot import IMAGE_TLV, ImageInfo, TLVNotFound, MCUBootImageError
from smpclient.requests.image_management import ImageStatesRead, ImageStatesWrite
from smpclient.requests.os_management import ResetWrite
from smpclient.generics import error, success
from smp.exceptions import SMPBadStartDelimiter
from esphome.espota2 import ProgressBar
SMP_SERVICE_UUID = "8D53DC1D-1DB7-4CD3-868B-8A527460AA84"
NUS_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
NUS_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
MAC_ADDRESS_PATTERN: Final = re.compile(
r"([0-9A-F]{2}[:]){5}[0-9A-F]{2}$", flags=re.IGNORECASE
)
_LOGGER = logging.getLogger(__name__)
def is_mac_address(value):
return MAC_ADDRESS_PATTERN.match(value)
async def logger_scan(name):
_LOGGER.info("Scanning bluetooth for %s...", name)
device = await BleakScanner.find_device_by_name(name)
return device
async def logger_connect(host):
disconnected_event = asyncio.Event()
def handle_disconnect(client):
disconnected_event.set()
def handle_rx(_, data: bytearray):
print(data.decode("utf-8"), end="")
_LOGGER.info("Connecting %s...", host)
async with BleakClient(host, disconnected_callback=handle_disconnect) as client:
_LOGGER.info("Connected %s...", host)
try:
await client.start_notify(NUS_TX_CHAR_UUID, handle_rx)
except BleakDBusError as e:
_LOGGER.error("Bluetooth LE logger: %s", e)
disconnected_event.set()
await disconnected_event.wait()
async def smpmgr_scan(name):
_LOGGER.info("Scanning bluetooth for %s...", name)
devices = []
for device in await BleakScanner.discover(service_uuids=[SMP_SERVICE_UUID]):
if device.name == name:
devices += [device]
return devices
def get_image_tlv_sha256(file):
_LOGGER.info("Checking image: %s", str(file))
try:
image_info = ImageInfo.load_file(str(file))
pprint(image_info.header)
_LOGGER.debug(str(image_info))
except MCUBootImageError as e:
_LOGGER.error("Inspection of FW image failed: %s", e)
return None
try:
image_tlv_sha256 = image_info.get_tlv(IMAGE_TLV.SHA256)
_LOGGER.debug("IMAGE_TLV_SHA256: %s", image_tlv_sha256)
except TLVNotFound:
_LOGGER.error("Could not find IMAGE_TLV_SHA256 in image.")
return None
return image_tlv_sha256.value
async def smpmgr_upload(config, host, firmware):
try:
return await smpmgr_upload_(config, host, firmware)
except SMPTransportDisconnected:
_LOGGER.error("%s was disconnected.", host)
return 1
async def smpmgr_upload_(config, host, firmware):
image_tlv_sha256 = get_image_tlv_sha256(firmware)
if image_tlv_sha256 is None:
return 1
if is_mac_address(host):
smp_client = SMPClient(SMPBLETransport(), host)
else:
smp_client = SMPClient(SMPSerialTransport(), host)
_LOGGER.info("Connecting %s...", host)
try:
await smp_client.connect()
except BleakDeviceNotFoundError:
_LOGGER.error("Device %s not found", host)
return 1
_LOGGER.info("Connected %s...", host)
try:
image_state = await smp_client.request(ImageStatesRead(), 2.5)
except SMPBadStartDelimiter as e:
_LOGGER.error("mcumgr is not supported by device (%s)", e)
return 1
already_uploaded = False
if error(image_state):
_LOGGER.error(image_state)
return 1
if success(image_state):
if len(image_state.images) == 0:
_LOGGER.warning("No images on device!")
for image in image_state.images:
pprint(image)
if image.active and not image.confirmed:
_LOGGER.error("No free slot")
return 1
if image.hash == image_tlv_sha256:
if already_uploaded:
_LOGGER.error("Both slots have the same image")
return 1
if image.confirmed:
_LOGGER.error("Image already confirmted")
return 1
_LOGGER.warning("The same image already uploaded")
already_uploaded = True
if not already_uploaded:
with open(firmware, "rb") as file:
image = file.read()
file.close()
upload_size = len(image)
progress = ProgressBar()
progress.update(0)
try:
async for offset in smp_client.upload(image):
progress.update(offset / upload_size)
finally:
progress.done()
_LOGGER.info("Mark image for testing")
r = await smp_client.request(ImageStatesWrite(hash=image_tlv_sha256), 1.0)
if error(r):
_LOGGER.error(r)
return 1
# give a chance to execute completion callback
time.sleep(1)
_LOGGER.info("Reset")
r = await smp_client.request(ResetWrite(), 1.0)
if error(r):
_LOGGER.error(r)
return 1
return 0

View file

@ -27,3 +27,7 @@ pyparsing >= 3.0
# For autocompletion # For autocompletion
argcomplete>=2.0.0 argcomplete>=2.0.0
# for mcumgr
rich==13.7.0
smpclient==3.2.0

View file

@ -540,6 +540,7 @@ def lint_relative_py_import(fname):
"esphome/components/rp2040/core.cpp", "esphome/components/rp2040/core.cpp",
"esphome/components/libretiny/core.cpp", "esphome/components/libretiny/core.cpp",
"esphome/components/host/core.cpp", "esphome/components/host/core.cpp",
"esphome/components/zephyr/core.cpp",
], ],
) )
def lint_namespace(fname, content): def lint_namespace(fname, content):

View file

@ -0,0 +1,14 @@
binary_sensor:
- platform: gpio
pin: 2
id: gpio_binary_sensor
output:
- platform: gpio
pin: 3
id: gpio_output
switch:
- platform: gpio
pin: 4
id: gpio_switch

View file

@ -0,0 +1,14 @@
binary_sensor:
- platform: gpio
pin: 2
id: gpio_binary_sensor
output:
- platform: gpio
pin: 3
id: gpio_output
switch:
- platform: gpio
pin: 4
id: gpio_switch

View file

@ -0,0 +1,15 @@
esphome:
name: componenttestnrf52
friendly_name: $component_name
nrf52:
board: adafruit_itsybitsy_nrf52840
logger:
level: VERY_VERBOSE
packages:
component_under_test: !include
file: $component_test_file
vars:
component_test_file: $component_test_file

View file

@ -0,0 +1,15 @@
esphome:
name: componenttestnrf52
friendly_name: $component_name
nrf52:
board: adafruit_feather_nrf52840
logger:
level: VERY_VERBOSE
packages:
component_under_test: !include
file: $component_test_file
vars:
component_test_file: $component_test_file