Move validations

This commit is contained in:
Rapsssito 2024-08-19 13:59:22 +02:00
parent 96026e8d07
commit f5a0d9f816

View file

@ -32,7 +32,8 @@ CONF_DESCRIPTORS = "descriptors"
CONF_VALUE_ACTION_ID_ = "value_action_id_" CONF_VALUE_ACTION_ID_ = "value_action_id_"
# Core key to store the global configuration # Core key to store the global configuration
KEY_ESP32_BLE_SERVER = "esp32_ble_server" KEY_ESP32_BLE_SERVER_NOTIFY_REQUIRED = "esp32_ble_server_notify_required"
KEY_ESP32_BLE_SERVER_NOTIFY_PROVIDED = "esp32_ble_server_notify_provided"
esp32_ble_server_ns = cg.esphome_ns.namespace("esp32_ble_server") esp32_ble_server_ns = cg.esphome_ns.namespace("esp32_ble_server")
ESPBTUUID_ns = cg.esphome_ns.namespace("esp32_ble").namespace("ESPBTUUID") ESPBTUUID_ns = cg.esphome_ns.namespace("esp32_ble").namespace("ESPBTUUID")
@ -69,11 +70,44 @@ PROPERTY_MAP = {
CONF_WRITE_NO_RESPONSE: BLECharacteristic_ns.PROPERTY_WRITE_NR, CONF_WRITE_NO_RESPONSE: BLECharacteristic_ns.PROPERTY_WRITE_NR,
} }
def validate_uuid(value): def validate_uuid(value):
if len(value) != 36: if len(value) != 36:
raise cv.Invalid("UUID must be exactly 36 characters long") raise cv.Invalid("UUID must be exactly 36 characters long")
return value return value
def validate_on_write(char_config):
if CONF_ON_WRITE in char_config:
if not char_config[CONF_WRITE] and not char_config[CONF_WRITE_NO_RESPONSE]:
raise cv.Invalid(
f"{CONF_ON_WRITE} requires the {CONF_WRITE} or {CONF_WRITE_NO_RESPONSE} property to be set"
)
return char_config
def validate_notify_characteristic(char_config):
if KEY_ESP32_BLE_SERVER_NOTIFY_PROVIDED not in CORE.data:
CORE.data[KEY_ESP32_BLE_SERVER_NOTIFY_PROVIDED] = dict()
CORE.data[KEY_ESP32_BLE_SERVER_NOTIFY_PROVIDED][char_config[CONF_ID]] = char_config[CONF_NOTIFY]
# Check if the NOTIFY property is set if the characteristic has a notify action
char_ids = CORE.data.get(KEY_ESP32_BLE_SERVER_NOTIFY_REQUIRED, set())
if not char_config[CONF_NOTIFY] and char_config[CONF_ID] in char_ids:
raise cv.Invalid("Characteristic has a notify action, but the NOTIFY property is not set")
return char_config
def validate_notify_action(action_char_id):
if KEY_ESP32_BLE_SERVER_NOTIFY_REQUIRED not in CORE.data:
CORE.data[KEY_ESP32_BLE_SERVER_NOTIFY_REQUIRED] = set()
CORE.data[KEY_ESP32_BLE_SERVER_NOTIFY_REQUIRED].add(action_char_id)
# Check if the NOTIFY property is set for the characteristic
char_notify_value = CORE.data.get(KEY_ESP32_BLE_SERVER_NOTIFY_PROVIDED, dict()).get(action_char_id, None)
if char_notify_value is not None and not char_notify_value:
raise cv.Invalid("Missing NOTIFY property for characteristic with notify action")
return action_char_id
UUID_SCHEMA = cv.Any(cv.All(cv.string, validate_uuid), cv.hex_uint32_t) UUID_SCHEMA = cv.Any(cv.All(cv.string, validate_uuid), cv.hex_uint32_t)
DESCRIPTOR_VALUE_SCHEMA = cv.Any( DESCRIPTOR_VALUE_SCHEMA = cv.Any(
@ -119,10 +153,9 @@ SERVICE_CHARACTERISTIC_SCHEMA = cv.Schema(
cv.Optional(CONF_ON_WRITE): automation.validate_automation( cv.Optional(CONF_ON_WRITE): automation.validate_automation(
{cv.GenerateID(): cv.declare_id(BLECharacteristic)}, single=True {cv.GenerateID(): cv.declare_id(BLECharacteristic)}, single=True
), ),
} },
).extend({ extra_schemas=[validate_on_write, validate_notify_characteristic],
cv.Optional(k, default=False): cv.boolean for k in PROPERTY_MAP.keys() ).extend({cv.Optional(k, default=False): cv.boolean for k in PROPERTY_MAP.keys()})
})
SERVICE_SCHEMA = cv.Schema( SERVICE_SCHEMA = cv.Schema(
{ {
@ -149,7 +182,10 @@ CONFIG_SCHEMA = cv.Schema(
def parse_properties(char_conf): def parse_properties(char_conf):
return sum((PROPERTY_MAP[k] for k in char_conf if k in PROPERTY_MAP and char_conf[k]), start=0) return sum(
(PROPERTY_MAP[k] for k in char_conf if k in PROPERTY_MAP and char_conf[k]),
start=0,
)
def parse_uuid(uuid): def parse_uuid(uuid):
@ -174,6 +210,7 @@ def parse_descriptor_value(value):
]: ]:
try: try:
val = val_method(value) val = val_method(value)
# TODO: What about ByteBuffer and strings
buffer = ByteBuffer_ns.wrap(val) buffer = ByteBuffer_ns.wrap(val)
return buffer, buffer.get_capacity() return buffer, buffer.get_capacity()
except cv.Invalid: except cv.Invalid:
@ -188,6 +225,37 @@ def parse_descriptor_value(value):
raise cv.Invalid(f"Could not find type for value: {value}") raise cv.Invalid(f"Could not find type for value: {value}")
async def parse_characteristic_value(value, args):
if isinstance(value, cv.Lambda):
return await cg.templatable(
value,
args,
ByteBuffer,
ByteBuffer_ns.wrap,
)
for val_method in [
cv.boolean,
cv.float_,
cv.hex_uint8_t,
cv.hex_uint16_t,
cv.hex_uint32_t,
cv.int_,
cv.string,
]:
try:
val = val_method(value)
return ByteBuffer_ns.wrap(val)
except cv.Invalid:
pass
# Assume it's a list of bytes
try:
val = cv.All(cv.ensure_list(cv.hex_uint8_t), cv.Length(min=1))(value)
return ByteBuffer_ns.wrap(cg.std_vector.template(cg.uint8)(val))
except cv.Invalid:
pass
raise cv.Invalid(f"Could not find type for value: {value}")
def calculate_num_handles(service_config): def calculate_num_handles(service_config):
total = 1 total = 1
for char_conf in service_config[CONF_CHARACTERISTICS]: for char_conf in service_config[CONF_CHARACTERISTICS]:
@ -199,8 +267,6 @@ def calculate_num_handles(service_config):
async def to_code(config): async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID]) var = cg.new_Pvariable(config[CONF_ID])
# Store the configuration
CORE.data[KEY_ESP32_BLE_SERVER] = config
await cg.register_component(var, config) await cg.register_component(var, config)
@ -236,10 +302,6 @@ async def to_code(config):
) )
if CONF_ON_WRITE in char_conf: if CONF_ON_WRITE in char_conf:
on_write_conf = char_conf[CONF_ON_WRITE] on_write_conf = char_conf[CONF_ON_WRITE]
if not char_conf[CONF_WRITE] and not char_conf[CONF_WRITE_NO_RESPONSE]:
raise cv.Invalid(
f"on_write requires the {CONF_WRITE} or {CONF_WRITE_NO_RESPONSE} property to be set"
)
await automation.build_automation( await automation.build_automation(
BLETriggers_ns.create_on_write_trigger(char_var), BLETriggers_ns.create_on_write_trigger(char_var),
[(cg.std_vector.template(cg.uint8), "x")], [(cg.std_vector.template(cg.uint8), "x")],
@ -275,37 +337,6 @@ async def to_code(config):
add_idf_sdkconfig_option("CONFIG_BT_ENABLED", True) add_idf_sdkconfig_option("CONFIG_BT_ENABLED", True)
async def parse_characteristic_value(value, args):
if isinstance(value, cv.Lambda):
return await cg.templatable(
value,
args,
ByteBuffer,
ByteBuffer_ns.wrap,
)
for val_method in [
cv.boolean,
cv.float_,
cv.hex_uint8_t,
cv.hex_uint16_t,
cv.hex_uint32_t,
cv.int_,
cv.string,
]:
try:
val = val_method(value)
return ByteBuffer_ns.wrap(val)
except cv.Invalid:
pass
# Assume it's a list of bytes
try:
val = cv.All(cv.ensure_list(cv.hex_uint8_t), cv.Length(min=1))(value)
return ByteBuffer_ns.wrap(cg.std_vector.template(cg.uint8)(val))
except cv.Invalid:
pass
raise cv.Invalid(f"Could not find type for value: {value}")
@automation.register_action( @automation.register_action(
"ble_server.characteristic.set_value", "ble_server.characteristic.set_value",
BLECharacteristicSetValueAction, BLECharacteristicSetValueAction,
@ -329,21 +360,11 @@ async def ble_server_characteristic_set_value(config, action_id, template_arg, a
BLECharacteristicNotifyAction, BLECharacteristicNotifyAction,
cv.Schema( cv.Schema(
{ {
cv.Required(CONF_ID): cv.use_id(BLECharacteristic), cv.Required(CONF_ID): cv.All(cv.use_id(BLECharacteristic), validate_notify_action),
} }
), ),
) )
async def ble_server_characteristic_notify(config, action_id, template_arg, args): async def ble_server_characteristic_notify(config, action_id, template_arg, args):
paren = await cg.get_variable(config[CONF_ID]) paren = await cg.get_variable(config[CONF_ID])
# Check if the NOTIFY property is set from the global configuration
ble_server_config = CORE.data[KEY_ESP32_BLE_SERVER]
for service_config in ble_server_config[CONF_SERVICES]:
for char_conf in service_config[CONF_CHARACTERISTICS]:
if char_conf[CONF_ID] == config[CONF_ID]:
if not char_conf[CONF_NOTIFY]:
raise cv.Invalid(
f'Characteristic "{char_conf[CONF_ID]}" does not have the NOTIFY property set'
)
break
var = cg.new_Pvariable(action_id, template_arg, paren) var = cg.new_Pvariable(action_id, template_arg, paren)
return var return var