mirror of
https://github.com/esphome/esphome.git
synced 2024-12-04 20:48:21 +01:00
124 lines
4.7 KiB
Python
124 lines
4.7 KiB
Python
import esphome.codegen as cg
|
|
import esphome.config_validation as cv
|
|
from esphome import automation
|
|
from esphome.core import CORE, coroutine
|
|
from esphome.const import CONF_ID, CONF_TRIGGER_ID, CONF_DATA
|
|
|
|
CODEOWNERS = ['@mvturnho', '@danielschramm']
|
|
IS_PLATFORM_COMPONENT = True
|
|
|
|
CONF_CAN_ID = 'can_id'
|
|
CONF_USE_EXTENDED_ID = 'use_extended_id'
|
|
CONF_CANBUS_ID = 'canbus_id'
|
|
CONF_BIT_RATE = 'bit_rate'
|
|
CONF_ON_FRAME = 'on_frame'
|
|
CONF_CANBUS_SEND = 'canbus.send'
|
|
|
|
|
|
def validate_id(id_value, id_ext):
|
|
if not id_ext:
|
|
if id_value > 0x7ff:
|
|
raise cv.Invalid("Standard IDs must be 11 Bit (0x000-0x7ff / 0-2047)")
|
|
|
|
|
|
def validate_raw_data(value):
|
|
if isinstance(value, str):
|
|
return value.encode('utf-8')
|
|
if isinstance(value, list):
|
|
return cv.Schema([cv.hex_uint8_t])(value)
|
|
raise cv.Invalid("data must either be a string wrapped in quotes or a list of bytes")
|
|
|
|
|
|
canbus_ns = cg.esphome_ns.namespace('canbus')
|
|
CanbusComponent = canbus_ns.class_('CanbusComponent', cg.Component)
|
|
CanbusTrigger = canbus_ns.class_('CanbusTrigger',
|
|
automation.Trigger.template(cg.std_vector.template(cg.uint8)),
|
|
cg.Component)
|
|
CanSpeed = canbus_ns.enum('CAN_SPEED')
|
|
|
|
CAN_SPEEDS = {
|
|
'5KBPS': CanSpeed.CAN_5KBPS,
|
|
'10KBPS': CanSpeed.CAN_10KBPS,
|
|
'20KBPS': CanSpeed.CAN_20KBPS,
|
|
'31K25BPS': CanSpeed.CAN_31K25BPS,
|
|
'33KBPS': CanSpeed.CAN_33KBPS,
|
|
'40KBPS': CanSpeed.CAN_40KBPS,
|
|
'50KBPS': CanSpeed.CAN_50KBPS,
|
|
'80KBPS': CanSpeed.CAN_80KBPS,
|
|
'83K3BPS': CanSpeed.CAN_83K3BPS,
|
|
'95KBPS': CanSpeed.CAN_95KBPS,
|
|
'100KBPS': CanSpeed.CAN_100KBPS,
|
|
'125KBPS': CanSpeed.CAN_125KBPS,
|
|
'200KBPS': CanSpeed.CAN_200KBPS,
|
|
'250KBPS': CanSpeed.CAN_250KBPS,
|
|
'500KBPS': CanSpeed.CAN_500KBPS,
|
|
'1000KBPS': CanSpeed.CAN_1000KBPS,
|
|
}
|
|
|
|
CONFIG_SCHEMA = cv.Schema({
|
|
cv.GenerateID(): cv.declare_id(CanbusComponent),
|
|
cv.Required(CONF_CAN_ID): cv.int_range(min=0, max=0x1fffffff),
|
|
cv.Optional(CONF_BIT_RATE, default='125KBPS'): cv.enum(CAN_SPEEDS, upper=True),
|
|
cv.Optional(CONF_USE_EXTENDED_ID, default=False): cv.boolean,
|
|
cv.Optional(CONF_ON_FRAME): automation.validate_automation({
|
|
cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(CanbusTrigger),
|
|
cv.GenerateID(CONF_CAN_ID): cv.int_range(min=0, max=0x1fffffff),
|
|
cv.Optional(CONF_USE_EXTENDED_ID, default=False): cv.boolean,
|
|
}),
|
|
}).extend(cv.COMPONENT_SCHEMA)
|
|
|
|
|
|
@coroutine
|
|
def setup_canbus_core_(var, config):
|
|
validate_id(config[CONF_CAN_ID], config[CONF_USE_EXTENDED_ID])
|
|
yield cg.register_component(var, config)
|
|
cg.add(var.set_can_id([config[CONF_CAN_ID]]))
|
|
cg.add(var.set_use_extended_id([config[CONF_USE_EXTENDED_ID]]))
|
|
cg.add(var.set_bitrate(CAN_SPEEDS[config[CONF_BIT_RATE]]))
|
|
|
|
for conf in config.get(CONF_ON_FRAME, []):
|
|
can_id = conf[CONF_CAN_ID]
|
|
ext_id = conf[CONF_USE_EXTENDED_ID]
|
|
validate_id(can_id, ext_id)
|
|
trigger = cg.new_Pvariable(conf[CONF_TRIGGER_ID], var, can_id, ext_id)
|
|
yield cg.register_component(trigger, conf)
|
|
yield automation.build_automation(trigger, [(cg.std_vector.template(cg.uint8), 'x')], conf)
|
|
|
|
|
|
@coroutine
|
|
def register_canbus(var, config):
|
|
if not CORE.has_id(config[CONF_ID]):
|
|
var = cg.new_Pvariable(config[CONF_ID], var)
|
|
yield setup_canbus_core_(var, config)
|
|
|
|
|
|
# Actions
|
|
@automation.register_action(CONF_CANBUS_SEND,
|
|
canbus_ns.class_('CanbusSendAction', automation.Action),
|
|
cv.maybe_simple_value({
|
|
cv.GenerateID(CONF_CANBUS_ID): cv.use_id(CanbusComponent),
|
|
cv.Optional(CONF_CAN_ID): cv.int_range(min=0, max=0x1fffffff),
|
|
cv.Optional(CONF_USE_EXTENDED_ID, default=False): cv.boolean,
|
|
cv.Required(CONF_DATA): cv.templatable(validate_raw_data),
|
|
}, key=CONF_DATA))
|
|
def canbus_action_to_code(config, action_id, template_arg, args):
|
|
validate_id(config[CONF_CAN_ID], config[CONF_USE_EXTENDED_ID])
|
|
var = cg.new_Pvariable(action_id, template_arg)
|
|
yield cg.register_parented(var, config[CONF_CANBUS_ID])
|
|
|
|
if CONF_CAN_ID in config:
|
|
can_id = yield cg.templatable(config[CONF_CAN_ID], args, cg.uint32)
|
|
cg.add(var.set_can_id(can_id))
|
|
|
|
use_extended_id = yield cg.templatable(config[CONF_USE_EXTENDED_ID], args, cg.uint32)
|
|
cg.add(var.set_use_extended_id(use_extended_id))
|
|
|
|
data = config[CONF_DATA]
|
|
if isinstance(data, bytes):
|
|
data = [int(x) for x in data]
|
|
if cg.is_template(data):
|
|
templ = yield cg.templatable(data, args, cg.std_vector.template(cg.uint8))
|
|
cg.add(var.set_data_template(templ))
|
|
else:
|
|
cg.add(var.set_data_static(data))
|
|
yield var
|