Language schema 202204 (#3492)

This commit is contained in:
Guillermo Ruffino 2022-06-16 22:46:20 -03:00 committed by GitHub
parent 29d6d0a906
commit f002a23d2d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 188 additions and 904 deletions

View file

@ -12,7 +12,7 @@ from esphome.const import (
CONF_TYPE_ID, CONF_TYPE_ID,
CONF_TIME, CONF_TIME,
) )
from esphome.jsonschema import jschema_extractor from esphome.schema_extractors import SCHEMA_EXTRACT, schema_extractor
from esphome.util import Registry from esphome.util import Registry
@ -23,11 +23,10 @@ def maybe_simple_id(*validators):
def maybe_conf(conf, *validators): def maybe_conf(conf, *validators):
validator = cv.All(*validators) validator = cv.All(*validators)
@jschema_extractor("maybe") @schema_extractor("maybe")
def validate(value): def validate(value):
# pylint: disable=comparison-with-callable if value == SCHEMA_EXTRACT:
if value == jschema_extractor: return (validator, conf)
return validator
if isinstance(value, dict): if isinstance(value, dict):
return validator(value) return validator(value)
@ -111,11 +110,9 @@ def validate_automation(extra_schema=None, extra_validators=None, single=False):
# This should only happen with invalid configs, but let's have a nice error message. # This should only happen with invalid configs, but let's have a nice error message.
return [schema(value)] return [schema(value)]
@jschema_extractor("automation") @schema_extractor("automation")
def validator(value): def validator(value):
# hack to get the schema if value == SCHEMA_EXTRACT:
# pylint: disable=comparison-with-callable
if value == jschema_extractor:
return schema return schema
value = validator_(value) value = validator_(value)

View file

@ -1,4 +1,4 @@
from esphome.jsonschema import jschema_extractor from esphome.schema_extractors import SCHEMA_EXTRACT, schema_extractor
import esphome.codegen as cg import esphome.codegen as cg
import esphome.config_validation as cv import esphome.config_validation as cv
from esphome import automation from esphome import automation
@ -479,11 +479,11 @@ async def addressable_flicker_effect_to_code(config, effect_id):
def validate_effects(allowed_effects): def validate_effects(allowed_effects):
@jschema_extractor("effects") @schema_extractor("effects")
def validator(value): def validator(value):
# pylint: disable=comparison-with-callable if value == SCHEMA_EXTRACT:
if value == jschema_extractor:
return (allowed_effects, EFFECTS_REGISTRY) return (allowed_effects, EFFECTS_REGISTRY)
value = cv.validate_registry("effect", EFFECTS_REGISTRY)(value) value = cv.validate_registry("effect", EFFECTS_REGISTRY)(value)
errors = [] errors = []
names = set() names = set()

View file

@ -32,7 +32,7 @@ from esphome.const import (
CONF_LEVEL, CONF_LEVEL,
) )
from esphome.core import coroutine from esphome.core import coroutine
from esphome.jsonschema import jschema_extractor from esphome.schema_extractors import SCHEMA_EXTRACT, schema_extractor
from esphome.util import Registry, SimpleRegistry from esphome.util import Registry, SimpleRegistry
AUTO_LOAD = ["binary_sensor"] AUTO_LOAD = ["binary_sensor"]
@ -195,14 +195,14 @@ def validate_dumpers(value):
def validate_triggers(base_schema): def validate_triggers(base_schema):
assert isinstance(base_schema, cv.Schema) assert isinstance(base_schema, cv.Schema)
@jschema_extractor("triggers") @schema_extractor("triggers")
def validator(config): def validator(config):
added_keys = {} added_keys = {}
for key, (_, valid) in TRIGGER_REGISTRY.items(): for key, (_, valid) in TRIGGER_REGISTRY.items():
added_keys[cv.Optional(key)] = valid added_keys[cv.Optional(key)] = valid
new_schema = base_schema.extend(added_keys) new_schema = base_schema.extend(added_keys)
# pylint: disable=comparison-with-callable
if config == jschema_extractor: if config == SCHEMA_EXTRACT:
return new_schema return new_schema
return new_schema(config) return new_schema(config)

View file

@ -57,11 +57,12 @@ from esphome.core import (
TimePeriodMinutes, TimePeriodMinutes,
) )
from esphome.helpers import list_starts_with, add_class_to_obj from esphome.helpers import list_starts_with, add_class_to_obj
from esphome.jsonschema import ( from esphome.schema_extractors import (
jschema_list, SCHEMA_EXTRACT,
jschema_extractor, schema_extractor_list,
jschema_registry, schema_extractor,
jschema_typed, schema_extractor_registry,
schema_extractor_typed,
) )
from esphome.util import parse_esphome_version from esphome.util import parse_esphome_version
from esphome.voluptuous_schema import _Schema from esphome.voluptuous_schema import _Schema
@ -327,7 +328,7 @@ def boolean(value):
) )
@jschema_list @schema_extractor_list
def ensure_list(*validators): def ensure_list(*validators):
"""Validate this configuration option to be a list. """Validate this configuration option to be a list.
@ -452,7 +453,11 @@ def validate_id_name(value):
def use_id(type): def use_id(type):
"""Declare that this configuration option should point to an ID with the given type.""" """Declare that this configuration option should point to an ID with the given type."""
@schema_extractor("use_id")
def validator(value): def validator(value):
if value == SCHEMA_EXTRACT:
return type
check_not_templatable(value) check_not_templatable(value)
if value is None: if value is None:
return core.ID(None, is_declaration=False, type=type) return core.ID(None, is_declaration=False, type=type)
@ -475,7 +480,11 @@ def declare_id(type):
If two IDs with the same name exist, a validation error is thrown. If two IDs with the same name exist, a validation error is thrown.
""" """
@schema_extractor("declare_id")
def validator(value): def validator(value):
if value == SCHEMA_EXTRACT:
return type
check_not_templatable(value) check_not_templatable(value)
if value is None: if value is None:
return core.ID(None, is_declaration=True, type=type) return core.ID(None, is_declaration=True, type=type)
@ -494,11 +503,11 @@ def templatable(other_validators):
""" """
schema = Schema(other_validators) schema = Schema(other_validators)
@jschema_extractor("templatable") @schema_extractor("templatable")
def validator(value): def validator(value):
# pylint: disable=comparison-with-callable if value == SCHEMA_EXTRACT:
if value == jschema_extractor:
return other_validators return other_validators
if isinstance(value, Lambda): if isinstance(value, Lambda):
return returning_lambda(value) return returning_lambda(value)
if isinstance(other_validators, dict): if isinstance(other_validators, dict):
@ -1177,10 +1186,9 @@ def one_of(*values, **kwargs):
if kwargs: if kwargs:
raise ValueError raise ValueError
@jschema_extractor("one_of") @schema_extractor("one_of")
def validator(value): def validator(value):
# pylint: disable=comparison-with-callable if value == SCHEMA_EXTRACT:
if value == jschema_extractor:
return values return values
if string_: if string_:
@ -1220,10 +1228,9 @@ def enum(mapping, **kwargs):
assert isinstance(mapping, dict) assert isinstance(mapping, dict)
one_of_validator = one_of(*mapping, **kwargs) one_of_validator = one_of(*mapping, **kwargs)
@jschema_extractor("enum") @schema_extractor("enum")
def validator(value): def validator(value):
# pylint: disable=comparison-with-callable if value == SCHEMA_EXTRACT:
if value == jschema_extractor:
return mapping return mapping
value = one_of_validator(value) value = one_of_validator(value)
@ -1396,7 +1403,7 @@ def extract_keys(schema):
return keys return keys
@jschema_typed @schema_extractor_typed
def typed_schema(schemas, **kwargs): def typed_schema(schemas, **kwargs):
"""Create a schema that has a key to distinguish between schemas""" """Create a schema that has a key to distinguish between schemas"""
key = kwargs.pop("key", CONF_TYPE) key = kwargs.pop("key", CONF_TYPE)
@ -1510,7 +1517,7 @@ def validate_registry_entry(name, registry):
) )
ignore_keys = extract_keys(base_schema) ignore_keys = extract_keys(base_schema)
@jschema_registry(registry) @schema_extractor_registry(registry)
def validator(value): def validator(value):
if isinstance(value, str): if isinstance(value, str):
value = {value: {}} value = {value: {}}
@ -1555,12 +1562,15 @@ def validate_registry(name, registry):
return ensure_list(validate_registry_entry(name, registry)) return ensure_list(validate_registry_entry(name, registry))
@jschema_list
def maybe_simple_value(*validators, **kwargs): def maybe_simple_value(*validators, **kwargs):
key = kwargs.pop("key", CONF_VALUE) key = kwargs.pop("key", CONF_VALUE)
validator = All(*validators) validator = All(*validators)
@schema_extractor("maybe")
def validate(value): def validate(value):
if value == SCHEMA_EXTRACT:
return (validator, key)
if isinstance(value, dict) and key in value: if isinstance(value, dict) and key in value:
return validator(value) return validator(value)
return validator({key: value}) return validator({key: value})

View file

@ -9,9 +9,9 @@ However there is a property to further disable decorator
impact.""" impact."""
# This is set to true by script/build_jsonschema.py # This is set to true by script/build_language_schema.py
# only, so data is collected (again functionality is not modified) # only, so data is collected (again functionality is not modified)
EnableJsonSchemaCollect = False EnableSchemaExtraction = False
extended_schemas = {} extended_schemas = {}
list_schemas = {} list_schemas = {}
@ -19,9 +19,12 @@ registry_schemas = {}
hidden_schemas = {} hidden_schemas = {}
typed_schemas = {} typed_schemas = {}
# This key is used to generate schema files of Esphome configuration.
SCHEMA_EXTRACT = object()
def jschema_extractor(validator_name):
if EnableJsonSchemaCollect: def schema_extractor(validator_name):
if EnableSchemaExtraction:
def decorator(func): def decorator(func):
hidden_schemas[repr(func)] = validator_name hidden_schemas[repr(func)] = validator_name
@ -35,8 +38,8 @@ def jschema_extractor(validator_name):
return dummy return dummy
def jschema_extended(func): def schema_extractor_extended(func):
if EnableJsonSchemaCollect: if EnableSchemaExtraction:
def decorate(*args, **kwargs): def decorate(*args, **kwargs):
ret = func(*args, **kwargs) ret = func(*args, **kwargs)
@ -49,8 +52,8 @@ def jschema_extended(func):
return func return func
def jschema_list(func): def schema_extractor_list(func):
if EnableJsonSchemaCollect: if EnableSchemaExtraction:
def decorate(*args, **kwargs): def decorate(*args, **kwargs):
ret = func(*args, **kwargs) ret = func(*args, **kwargs)
@ -63,8 +66,8 @@ def jschema_list(func):
return func return func
def jschema_registry(registry): def schema_extractor_registry(registry):
if EnableJsonSchemaCollect: if EnableSchemaExtraction:
def decorator(func): def decorator(func):
registry_schemas[repr(func)] = registry registry_schemas[repr(func)] = registry
@ -78,8 +81,8 @@ def jschema_registry(registry):
return dummy return dummy
def jschema_typed(func): def schema_extractor_typed(func):
if EnableJsonSchemaCollect: if EnableSchemaExtraction:
def decorate(*args, **kwargs): def decorate(*args, **kwargs):
ret = func(*args, **kwargs) ret = func(*args, **kwargs)

View file

@ -2,7 +2,7 @@ import difflib
import itertools import itertools
import voluptuous as vol import voluptuous as vol
from esphome.jsonschema import jschema_extended from esphome.schema_extractors import schema_extractor_extended
class ExtraKeysInvalid(vol.Invalid): class ExtraKeysInvalid(vol.Invalid):
@ -203,7 +203,7 @@ class _Schema(vol.Schema):
self._extra_schemas.append(validator) self._extra_schemas.append(validator)
return self return self
@jschema_extended @schema_extractor_extended
# pylint: disable=signature-differs # pylint: disable=signature-differs
def extend(self, *schemas, **kwargs): def extend(self, *schemas, **kwargs):
extra = kwargs.pop("extra", None) extra = kwargs.pop("extra", None)

View file

@ -1,828 +0,0 @@
#!/usr/bin/env python3
from esphome.cpp_generator import MockObj
import json
import argparse
import os
import re
from pathlib import Path
import voluptuous as vol
# NOTE: Cannot import other esphome components globally as a modification in jsonschema
# is needed before modules are loaded
import esphome.jsonschema as ejs
ejs.EnableJsonSchemaCollect = True
DUMP_COMMENTS = False
JSC_ACTION = "automation.ACTION_REGISTRY"
JSC_ALLOF = "allOf"
JSC_ANYOF = "anyOf"
JSC_COMMENT = "$comment"
JSC_CONDITION = "automation.CONDITION_REGISTRY"
JSC_DESCRIPTION = "description"
JSC_ONEOF = "oneOf"
JSC_PROPERTIES = "properties"
JSC_REF = "$ref"
# this should be required, but YAML Language server completion does not work properly if required are specified.
# still needed for other features / checks
JSC_REQUIRED = "required_"
SIMPLE_AUTOMATION = "simple_automation"
schema_names = {}
schema_registry = {}
components = {}
modules = {}
registries = []
pending_refs = []
definitions = {}
base_props = {}
parser = argparse.ArgumentParser()
parser.add_argument(
"--output", default="esphome.json", help="Output filename", type=os.path.abspath
)
args = parser.parse_args()
def get_ref(definition):
return {JSC_REF: "#/definitions/" + definition}
def is_ref(jschema):
return isinstance(jschema, dict) and JSC_REF in jschema
def unref(jschema):
return definitions.get(jschema[JSC_REF][len("#/definitions/") :])
def add_definition_array_or_single_object(ref):
return {JSC_ANYOF: [{"type": "array", "items": ref}, ref]}
def add_core():
from esphome.core.config import CONFIG_SCHEMA
base_props["esphome"] = get_jschema("esphome", CONFIG_SCHEMA)
def add_buses():
# uart
from esphome.components.uart import UART_DEVICE_SCHEMA
get_jschema("uart_bus", UART_DEVICE_SCHEMA)
# spi
from esphome.components.spi import spi_device_schema
get_jschema("spi_bus", spi_device_schema(False))
# i2c
from esphome.components.i2c import i2c_device_schema
get_jschema("i2c_bus", i2c_device_schema(None))
def add_registries():
for domain, module in modules.items():
add_module_registries(domain, module)
def add_module_registries(domain, module):
from esphome.util import Registry
for c in dir(module):
m = getattr(module, c)
if isinstance(m, Registry):
add_registry(domain + "." + c, m)
def add_registry(registry_name, registry):
validators = []
registries.append((registry, registry_name))
for name in registry.keys():
schema = get_jschema(str(name), registry[name].schema, create_return_ref=False)
if not schema:
schema = {"type": "null"}
o_schema = {"type": "object", JSC_PROPERTIES: {name: schema}}
o_schema = create_ref(
registry_name + "-" + name, str(registry[name].schema) + "x", o_schema
)
validators.append(o_schema)
definitions[registry_name] = {JSC_ANYOF: validators}
def get_registry_ref(registry):
# we don't know yet
ref = {JSC_REF: "pending"}
pending_refs.append((ref, registry))
return ref
def solve_pending_refs():
for ref, registry in pending_refs:
for registry_match, name in registries:
if registry == registry_match:
ref[JSC_REF] = "#/definitions/" + name
def add_module_schemas(name, module):
import esphome.config_validation as cv
for c in dir(module):
v = getattr(module, c)
if isinstance(v, cv.Schema):
get_jschema(name + "." + c, v)
def get_dirs():
from esphome.loader import CORE_COMPONENTS_PATH
dir_names = [
d
for d in os.listdir(CORE_COMPONENTS_PATH)
if not d.startswith("__")
and os.path.isdir(os.path.join(CORE_COMPONENTS_PATH, d))
]
return dir_names
def get_logger_tags():
from esphome.loader import CORE_COMPONENTS_PATH
import glob
pattern = re.compile(r'^static const char(\*\s|\s\*)TAG = "(\w.*)";', re.MULTILINE)
tags = [
"app",
"component",
"esphal",
"helpers",
"preferences",
"scheduler",
"api.service",
]
for x in os.walk(CORE_COMPONENTS_PATH):
for y in glob.glob(os.path.join(x[0], "*.cpp")):
with open(y) as file:
data = file.read()
match = pattern.search(data)
if match:
tags.append(match.group(2))
return tags
def load_components():
import esphome.config_validation as cv
from esphome.config import get_component
modules["cv"] = cv
from esphome import automation
modules["automation"] = automation
for domain in get_dirs():
components[domain] = get_component(domain)
modules[domain] = components[domain].module
def add_components():
from esphome.config import get_platform
for domain, c in components.items():
if c.is_platform_component:
# this is a platform_component, e.g. binary_sensor
platform_schema = [
{
"type": "object",
"properties": {"platform": {"type": "string"}},
}
]
if domain not in ("output", "display"):
# output bases are either FLOAT or BINARY so don't add common base for this
# display bases are either simple or FULL so don't add common base for this
platform_schema = [
{"$ref": f"#/definitions/{domain}.{domain.upper()}_SCHEMA"}
] + platform_schema
base_props[domain] = {"type": "array", "items": {"allOf": platform_schema}}
add_module_registries(domain, c.module)
add_module_schemas(domain, c.module)
# need first to iterate all platforms then iterate components
# a platform component can have other components as properties,
# e.g. climate components usually have a temperature sensor
for domain, c in components.items():
if (c.config_schema is not None) or c.is_platform_component:
if c.is_platform_component:
platform_schema = base_props[domain]["items"]["allOf"]
for platform in get_dirs():
p = get_platform(domain, platform)
if p is not None:
# this is a platform element, e.g.
# - platform: gpio
schema = get_jschema(
domain + "-" + platform,
p.config_schema,
create_return_ref=False,
)
if (
schema
): # for invalid schemas, None is returned thus is deprecated
platform_schema.append(
{
"if": {
JSC_PROPERTIES: {
"platform": {"const": platform}
}
},
"then": schema,
}
)
elif c.config_schema is not None:
# adds root components which are not platforms, e.g. api: logger:
if c.multi_conf:
schema = get_jschema(domain, c.config_schema)
schema = add_definition_array_or_single_object(schema)
else:
schema = get_jschema(domain, c.config_schema, False)
base_props[domain] = schema
def get_automation_schema(name, vschema):
from esphome.automation import AUTOMATION_SCHEMA
# ensure SIMPLE_AUTOMATION
if SIMPLE_AUTOMATION not in definitions:
simple_automation = add_definition_array_or_single_object(get_ref(JSC_ACTION))
simple_automation[JSC_ANYOF].append(
get_jschema(AUTOMATION_SCHEMA.__module__, AUTOMATION_SCHEMA)
)
definitions[schema_names[str(AUTOMATION_SCHEMA)]][JSC_PROPERTIES][
"then"
] = add_definition_array_or_single_object(get_ref(JSC_ACTION))
definitions[SIMPLE_AUTOMATION] = simple_automation
extra_vschema = None
if AUTOMATION_SCHEMA == ejs.extended_schemas[str(vschema)][0]:
extra_vschema = ejs.extended_schemas[str(vschema)][1]
if not extra_vschema:
return get_ref(SIMPLE_AUTOMATION)
# add then property
extra_jschema = get_jschema(name, extra_vschema, False)
if is_ref(extra_jschema):
return extra_jschema
if not JSC_PROPERTIES in extra_jschema:
# these are interval: and exposure_notifications, featuring automations a component
extra_jschema[JSC_ALLOF][0][JSC_PROPERTIES][
"then"
] = add_definition_array_or_single_object(get_ref(JSC_ACTION))
ref = create_ref(name, extra_vschema, extra_jschema)
return add_definition_array_or_single_object(ref)
# automations can be either
# * a single action,
# * an array of action,
# * an object with automation's schema and a then key
# with again a single action or an array of actions
if len(extra_jschema[JSC_PROPERTIES]) == 0:
return get_ref(SIMPLE_AUTOMATION)
extra_jschema[JSC_PROPERTIES]["then"] = add_definition_array_or_single_object(
get_ref(JSC_ACTION)
)
# if there is a required element in extra_jschema then this automation does not support
# directly a list of actions
if JSC_REQUIRED in extra_jschema:
return create_ref(name, extra_vschema, extra_jschema)
jschema = add_definition_array_or_single_object(get_ref(JSC_ACTION))
jschema[JSC_ANYOF].append(extra_jschema)
return create_ref(name, extra_vschema, jschema)
def get_entry(parent_key, vschema):
from esphome.voluptuous_schema import _Schema as schema_type
entry = {}
# annotate schema validator info
if DUMP_COMMENTS:
entry[JSC_COMMENT] = "entry: " + parent_key + "/" + str(vschema)
if isinstance(vschema, dict):
entry = {"what": "is_this"}
elif isinstance(vschema, list):
ref = get_jschema(parent_key + "[]", vschema[0])
entry = {"type": "array", "items": ref}
elif isinstance(vschema, schema_type) and hasattr(vschema, "schema"):
entry = get_jschema(parent_key, vschema, False)
elif hasattr(vschema, "validators"):
entry = get_jschema(parent_key, vschema, False)
elif vschema in schema_registry:
entry = schema_registry[vschema].copy()
elif str(vschema) in ejs.registry_schemas:
entry = get_registry_ref(ejs.registry_schemas[str(vschema)])
elif str(vschema) in ejs.list_schemas:
ref = get_jschema(parent_key, ejs.list_schemas[str(vschema)][0])
entry = {JSC_ANYOF: [ref, {"type": "array", "items": ref}]}
elif str(vschema) in ejs.typed_schemas:
schema_types = [{"type": "object", "properties": {"type": {"type": "string"}}}]
entry = {"allOf": schema_types}
for schema_key, vschema_type in ejs.typed_schemas[str(vschema)][0][0].items():
schema_types.append(
{
"if": {"properties": {"type": {"const": schema_key}}},
"then": get_jschema(f"{parent_key}-{schema_key}", vschema_type),
}
)
elif str(vschema) in ejs.hidden_schemas:
# get the schema from the automation schema
type = ejs.hidden_schemas[str(vschema)]
inner_vschema = vschema(ejs.jschema_extractor)
if type == "automation":
entry = get_automation_schema(parent_key, inner_vschema)
elif type == "maybe":
entry = get_jschema(parent_key, inner_vschema)
elif type == "one_of":
entry = {"enum": list(inner_vschema)}
elif type == "enum":
entry = {"enum": list(inner_vschema.keys())}
elif type == "effects":
# Like list schema but subset from list.
subset_list = inner_vschema[0]
# get_jschema('strobex', registry['strobe'].schema)
registry_schemas = []
for name in subset_list:
registry_schemas.append(get_ref("light.EFFECTS_REGISTRY-" + name))
entry = {
JSC_ANYOF: [{"type": "array", "items": {JSC_ANYOF: registry_schemas}}]
}
else:
raise ValueError("Unknown extracted schema type")
elif str(vschema).startswith("<function invalid."):
# deprecated options, don't list as valid schema
return None
else:
# everything else just accept string and let ESPHome validate
try:
from esphome.core import ID
from esphome.automation import Trigger, Automation
v = vschema(None)
if isinstance(v, ID):
if (
v.type.base != "script::Script"
and v.type.base != "switch_::Switch"
and (v.type.inherits_from(Trigger) or v.type == Automation)
):
return None
entry = {"type": "string", "id_type": v.type.base}
elif isinstance(v, str):
entry = {"type": "string"}
elif isinstance(v, list):
entry = {"type": "array"}
else:
entry = default_schema()
except:
entry = default_schema()
return entry
def default_schema():
# Accept anything
return {"type": ["null", "object", "string", "array", "number"]}
def is_default_schema(jschema):
if jschema is None:
return False
if is_ref(jschema):
jschema = unref(jschema)
if not jschema:
return False
return is_default_schema(jschema)
return "type" in jschema and jschema["type"] == default_schema()["type"]
def get_jschema(path, vschema, create_return_ref=True):
name = schema_names.get(get_schema_str(vschema))
if name:
return get_ref(name)
jschema = convert_schema(path, vschema)
if jschema is None:
return None
if is_ref(jschema):
# this can happen when returned extended
# schemas where all properties found in previous extended schema
return jschema
if not create_return_ref:
return jschema
return create_ref(path, vschema, jschema)
def get_schema_str(vschema):
# Hack on cs.use_id, in the future this can be improved by tracking which type is required by
# the id, this information can be added somehow to schema (not supported by jsonschema) and
# completion can be improved listing valid ids only Meanwhile it's a problem because it makes
# all partial schemas with cv.use_id different, e.g. i2c
return re.sub(
pattern="function use_id.<locals>.validator at 0[xX][0-9a-fA-F]+>",
repl="function use_id.<locals>.validator<>",
string=str(vschema),
)
def create_ref(name, vschema, jschema):
if jschema is None:
raise ValueError("Cannot create a ref with null jschema for " + name)
if name in schema_names:
raise ValueError("Not supported")
schema_str = get_schema_str(vschema)
schema_names[schema_str] = name
definitions[name] = jschema
return get_ref(name)
def get_all_properties(jschema):
if JSC_PROPERTIES in jschema:
return list(jschema[JSC_PROPERTIES].keys())
if is_ref(jschema):
return get_all_properties(unref(jschema))
arr = jschema.get(JSC_ALLOF, jschema.get(JSC_ANYOF))
props = []
for x in arr:
props = props + get_all_properties(x)
return props
def merge(arr, element):
# arr is an array of dicts, dicts can have keys like, properties, $ref, required:[], etc
# element is a single dict which might have several keys to
# the result should be an array with only one element containing properties, required, etc
# and other elements for needed $ref elements
# NOTE: json schema supports allof with properties in different elements, but that makes
# complex for later adding docs to the schema
for k, v in element.items():
if k == JSC_PROPERTIES:
props_found = False
for a_dict in arr:
if JSC_PROPERTIES in a_dict:
# found properties
arr_props = a_dict[JSC_PROPERTIES]
for v_k, v_v in v.items():
arr_props[v_k] = v_v # add or overwrite
props_found = True
if not props_found:
arr.append(element)
elif k == JSC_REF:
ref_found = False
for a_dict in arr:
if k in a_dict and a_dict[k] == v:
ref_found = True
continue
if not ref_found:
arr.append(element)
else:
# TODO: Required might require special handling
pass
def convert_schema(path, vschema, un_extend=True):
import esphome.config_validation as cv
# analyze input key, if it is not a Required or Optional, then it is an array
output = {}
if str(vschema) in ejs.hidden_schemas:
if ejs.hidden_schemas[str(vschema)] == "automation":
vschema = vschema(ejs.jschema_extractor)
jschema = get_jschema(path, vschema, True)
return add_definition_array_or_single_object(jschema)
else:
vschema = vschema(ejs.jschema_extractor)
if un_extend:
extended = ejs.extended_schemas.get(str(vschema))
if extended:
lhs = get_jschema(path, extended[0], False)
# The midea actions are extending an empty schema (resulted in the templatize not templatizing anything)
# this causes a recursion in that this extended looks the same in extended schema as the extended[1]
if ejs.extended_schemas.get(str(vschema)) == ejs.extended_schemas.get(
str(extended[1])
):
assert path.startswith("midea_ac")
return convert_schema(path, extended[1], False)
rhs = get_jschema(path, extended[1], False)
# check if we are not merging properties which are already in base component
lprops = get_all_properties(lhs)
rprops = get_all_properties(rhs)
if all(item in lprops for item in rprops):
return lhs
if all(item in rprops for item in lprops):
return rhs
# merge
if JSC_ALLOF in lhs and JSC_ALLOF in rhs:
output = lhs
for k in rhs[JSC_ALLOF]:
merge(output[JSC_ALLOF], k)
elif JSC_ALLOF in lhs:
output = lhs
merge(output[JSC_ALLOF], rhs)
elif JSC_ALLOF in rhs:
output = rhs
merge(output[JSC_ALLOF], lhs)
else:
output = {JSC_ALLOF: [lhs]}
merge(output[JSC_ALLOF], rhs)
return output
# When schema contains all, all also has a schema which points
# back to the containing schema
if isinstance(vschema, MockObj):
return output
while hasattr(vschema, "schema") and not hasattr(vschema, "validators"):
vschema = vschema.schema
if hasattr(vschema, "validators"):
output = default_schema()
for v in vschema.validators:
if v:
# we should take the valid schema,
# commonly all is used to validate a schema, and then a function which
# is not a schema es also given, get_schema will then return a default_schema()
if v == dict:
continue # this is a dict in the SCHEMA of packages
val_schema = get_jschema(path, v, False)
if is_default_schema(val_schema):
if not output:
output = val_schema
else:
if is_default_schema(output):
output = val_schema
else:
output = {**output, **val_schema}
return output
if not vschema:
return output
if not hasattr(vschema, "keys"):
return get_entry(path, vschema)
key = list(vschema.keys())[0]
# used for platformio_options in core_config
# pylint: disable=comparison-with-callable
if key == cv.string_strict:
output["type"] = "object"
return output
props = output[JSC_PROPERTIES] = {}
required = []
output["type"] = ["object", "null"]
if DUMP_COMMENTS:
output[JSC_COMMENT] = "converted: " + path + "/" + str(vschema)
if path == "logger-logs":
tags = get_logger_tags()
for k in tags:
props[k] = {
"enum": [
"NONE",
"ERROR",
"WARN",
"INFO",
"DEBUG",
"VERBOSE",
"VERY_VERBOSE",
]
}
else:
for k in vschema:
if str(k).startswith("<function"):
# generate all logger tags
# TODO handle key functions
continue
v = vschema[k]
prop = {}
if isinstance(v, vol.Schema):
prop = get_jschema(path + "-" + str(k), v.schema)
elif hasattr(v, "validators"):
prop = convert_schema(path + "-" + str(k), v, False)
else:
prop = get_entry(path + "-" + str(k), v)
if prop: # Deprecated (cv.Invalid) properties not added
props[str(k)] = prop
# TODO: see required, sometimes completions doesn't show up because of this...
if isinstance(k, cv.Required):
required.append(str(k))
try:
if str(k.default) != "...":
default_value = k.default()
# Yaml validator fails if `"default": null` ends up in the json schema
if default_value is not None:
if prop["type"] == "string":
default_value = str(default_value)
prop["default"] = default_value
except:
pass
if len(required) > 0:
output[JSC_REQUIRED] = required
return output
def add_pin_schema():
from esphome import pins
add_module_schemas("PIN", pins)
def add_pin_registry():
from esphome import pins
pin_registry = pins.PIN_SCHEMA_REGISTRY
assert len(pin_registry) > 0
# Here are schemas for pcf8574, mcp23xxx and other port expanders which add
# gpio registers
# ESPHome validates pins schemas if it founds a key in the pin configuration.
# This key is added to a required in jsonschema, and all options are part of a
# oneOf section, so only one is selected. Also internal schema adds number as required.
for mode in ("INPUT", "OUTPUT"):
schema_name = f"PIN.GPIO_FULL_{mode}_PIN_SCHEMA"
# TODO: get pin definitions properly
if schema_name not in definitions:
definitions[schema_name] = {"type": ["object", "null"], JSC_PROPERTIES: {}}
internal = definitions[schema_name]
definitions[schema_name]["additionalItems"] = False
definitions[f"PIN.{mode}_INTERNAL"] = internal
internal[JSC_PROPERTIES]["number"] = {"type": ["number", "string"]}
schemas = [get_ref(f"PIN.{mode}_INTERNAL")]
schemas[0]["required"] = ["number"]
# accept string and object, for internal shorthand pin IO:
definitions[schema_name] = {"oneOf": schemas, "type": ["string", "object"]}
for k, v in pin_registry.items():
if isinstance(v[1], vol.validators.All):
pin_jschema = get_jschema(f"PIN.{mode}_" + k, v[1])
if unref(pin_jschema):
pin_jschema["required"] = [k]
schemas.append(pin_jschema)
def dump_schema():
import esphome.config_validation as cv
from esphome import automation
from esphome.automation import validate_potentially_and_condition
from esphome import pins
from esphome.core import CORE
from esphome.helpers import write_file_if_changed
from esphome.components import remote_base
# The root directory of the repo
root = Path(__file__).parent.parent
# Fake some directory so that get_component works
CORE.config_path = str(root)
file_path = args.output
schema_registry[cv.boolean] = {"type": "boolean"}
for v in [
cv.int_,
cv.int_range,
cv.positive_int,
cv.float_,
cv.positive_float,
cv.positive_float,
cv.positive_not_null_int,
cv.negative_one_to_one_float,
cv.port,
]:
schema_registry[v] = {"type": "number"}
for v in [
cv.string,
cv.string_strict,
cv.valid_name,
cv.hex_int,
cv.hex_int_range,
pins.gpio_output_pin_schema,
pins.gpio_input_pin_schema,
pins.gpio_input_pullup_pin_schema,
cv.float_with_unit,
cv.subscribe_topic,
cv.publish_topic,
cv.mqtt_payload,
cv.ssid,
cv.percentage_int,
cv.percentage,
cv.possibly_negative_percentage,
cv.positive_time_period,
cv.positive_time_period_microseconds,
cv.positive_time_period_milliseconds,
cv.positive_time_period_minutes,
cv.positive_time_period_seconds,
]:
schema_registry[v] = {"type": "string"}
schema_registry[validate_potentially_and_condition] = get_ref("condition_list")
for v in [pins.gpio_input_pin_schema, pins.gpio_input_pullup_pin_schema]:
schema_registry[v] = get_ref("PIN.GPIO_FULL_INPUT_PIN_SCHEMA")
for v in [pins.internal_gpio_input_pin_schema, pins.gpio_input_pin_schema]:
schema_registry[v] = get_ref("PIN.INPUT_INTERNAL")
for v in [pins.gpio_output_pin_schema, pins.internal_gpio_output_pin_schema]:
schema_registry[v] = get_ref("PIN.GPIO_FULL_OUTPUT_PIN_SCHEMA")
for v in [pins.internal_gpio_output_pin_schema, pins.gpio_output_pin_schema]:
schema_registry[v] = get_ref("PIN.OUTPUT_INTERNAL")
add_module_schemas("CONFIG", cv)
get_jschema("POLLING_COMPONENT", cv.polling_component_schema("60s"))
add_pin_schema()
add_module_schemas("REMOTE_BASE", remote_base)
add_module_schemas("AUTOMATION", automation)
load_components()
add_registries()
definitions["condition_list"] = {
JSC_ONEOF: [
{"type": "array", "items": get_ref(JSC_CONDITION)},
get_ref(JSC_CONDITION),
]
}
output = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"definitions": definitions,
JSC_PROPERTIES: base_props,
}
add_core()
add_buses()
add_components()
add_registries() # need second pass, e.g. climate.pid.autotune
add_pin_registry()
solve_pending_refs()
write_file_if_changed(file_path, json.dumps(output))
print(f"Wrote {file_path}")
dump_schema()

View file

@ -1,15 +1,16 @@
import inspect import inspect
import json import json
import argparse import argparse
from operator import truediv
import os import os
import glob
import re
import voluptuous as vol import voluptuous as vol
# NOTE: Cannot import other esphome components globally as a modification in jsonschema # NOTE: Cannot import other esphome components globally as a modification in vol_schema
# is needed before modules are loaded # is needed before modules are loaded
import esphome.jsonschema as ejs import esphome.schema_extractors as ejs
ejs.EnableJsonSchemaCollect = True ejs.EnableSchemaExtraction = True
# schema format: # schema format:
# Schemas are splitted in several files in json format, one for core stuff, one for each platform (sensor, binary_sensor, etc) and # Schemas are splitted in several files in json format, one for core stuff, one for each platform (sensor, binary_sensor, etc) and
@ -60,15 +61,6 @@ solve_registry = []
def get_component_names(): def get_component_names():
# return [
# "esphome",
# "esp32",
# "esp8266",
# "logger",
# "sensor",
# "remote_receiver",
# "binary_sensor",
# ]
from esphome.loader import CORE_COMPONENTS_PATH from esphome.loader import CORE_COMPONENTS_PATH
component_names = ["esphome", "sensor"] component_names = ["esphome", "sensor"]
@ -100,7 +92,7 @@ from esphome import automation
from esphome import pins from esphome import pins
from esphome.components import remote_base from esphome.components import remote_base
from esphome.const import CONF_TYPE from esphome.const import CONF_TYPE
from esphome.loader import get_platform from esphome.loader import get_platform, CORE_COMPONENTS_PATH
from esphome.helpers import write_file_if_changed from esphome.helpers import write_file_if_changed
from esphome.util import Registry from esphome.util import Registry
@ -120,10 +112,12 @@ def write_file(name, obj):
def register_module_schemas(key, module, manifest=None): def register_module_schemas(key, module, manifest=None):
for name, schema in module_schemas(module): for name, schema in module_schemas(module):
register_known_schema(key, name, schema) register_known_schema(key, name, schema)
if (
manifest and manifest.multi_conf and S_CONFIG_SCHEMA in output[key][S_SCHEMAS] if manifest:
): # not sure about 2nd part of the if, might be useless config (e.g. as3935) # Multi conf should allow list of components
output[key][S_SCHEMAS][S_CONFIG_SCHEMA]["is_list"] = True # not sure about 2nd part of the if, might be useless config (e.g. as3935)
if manifest.multi_conf and S_CONFIG_SCHEMA in output[key][S_SCHEMAS]:
output[key][S_SCHEMAS][S_CONFIG_SCHEMA]["is_list"] = True
def register_known_schema(module, name, schema): def register_known_schema(module, name, schema):
@ -265,13 +259,58 @@ def do_esp8266():
def fix_remote_receiver(): def fix_remote_receiver():
output["remote_receiver.binary_sensor"]["schemas"]["CONFIG_SCHEMA"] = { remote_receiver_schema = output["remote_receiver.binary_sensor"]["schemas"]
remote_receiver_schema["CONFIG_SCHEMA"] = {
"type": "schema", "type": "schema",
"schema": { "schema": {
"extends": ["binary_sensor.BINARY_SENSOR_SCHEMA", "core.COMPONENT_SCHEMA"], "extends": ["binary_sensor.BINARY_SENSOR_SCHEMA", "core.COMPONENT_SCHEMA"],
"config_vars": output["remote_base"]["binary"], "config_vars": output["remote_base"].pop("binary"),
}, },
} }
remote_receiver_schema["CONFIG_SCHEMA"]["schema"]["config_vars"]["receiver_id"] = {
"key": "GeneratedID",
"use_id_type": "remote_base::RemoteReceiverBase",
"type": "use_id",
}
def fix_script():
output["script"][S_SCHEMAS][S_CONFIG_SCHEMA][S_TYPE] = S_SCHEMA
config_schema = output["script"][S_SCHEMAS][S_CONFIG_SCHEMA]
config_schema[S_SCHEMA][S_CONFIG_VARS]["id"]["id_type"] = {
"class": "script::Script"
}
config_schema["is_list"] = True
def get_logger_tags():
pattern = re.compile(r'^static const char \*const TAG = "(\w.*)";', re.MULTILINE)
# tags not in components dir
tags = [
"app",
"component",
"entity_base",
"scheduler",
"api.service",
]
for x in os.walk(CORE_COMPONENTS_PATH):
for y in glob.glob(os.path.join(x[0], "*.cpp")):
with open(y, encoding="utf-8") as file:
data = file.read()
match = pattern.search(data)
if match:
tags.append(match.group(1))
return tags
def add_logger_tags():
tags = get_logger_tags()
logs = output["logger"]["schemas"]["CONFIG_SCHEMA"]["schema"]["config_vars"][
"logs"
]["schema"]["config_vars"]
for t in tags:
logs[t] = logs["string"].copy()
logs.pop("string")
def add_referenced_recursive(referenced_schemas, config_var, path, eat_schema=False): def add_referenced_recursive(referenced_schemas, config_var, path, eat_schema=False):
@ -401,7 +440,7 @@ def shrink():
else: else:
print("expected extends here!" + x) print("expected extends here!" + x)
arr_s = merge(key_s, arr_s) arr_s = merge(key_s, arr_s)
if arr_s[S_TYPE] == "enum": if arr_s[S_TYPE] in ["enum", "typed"]:
arr_s.pop(S_SCHEMA) arr_s.pop(S_SCHEMA)
else: else:
arr_s.pop(S_EXTENDS) arr_s.pop(S_EXTENDS)
@ -491,14 +530,20 @@ def build_schema():
if domain not in platforms: if domain not in platforms:
if manifest.config_schema is not None: if manifest.config_schema is not None:
core_components[domain] = {} core_components[domain] = {}
if len(manifest.dependencies) > 0:
core_components[domain]["dependencies"] = manifest.dependencies
register_module_schemas(domain, manifest.module, manifest) register_module_schemas(domain, manifest.module, manifest)
for platform in platforms: for platform in platforms:
platform_manifest = get_platform(domain=platform, platform=domain) platform_manifest = get_platform(domain=platform, platform=domain)
if platform_manifest is not None: if platform_manifest is not None:
output[platform][S_COMPONENTS][domain] = {} output[platform][S_COMPONENTS][domain] = {}
if len(platform_manifest.dependencies) > 0:
output[platform][S_COMPONENTS][domain][
"dependencies"
] = platform_manifest.dependencies
register_module_schemas( register_module_schemas(
f"{domain}.{platform}", platform_manifest.module f"{domain}.{platform}", platform_manifest.module, platform_manifest
) )
# Do registries # Do registries
@ -517,6 +562,8 @@ def build_schema():
do_esp8266() do_esp8266()
do_esp32() do_esp32()
fix_remote_receiver() fix_remote_receiver()
fix_script()
add_logger_tags()
shrink() shrink()
# aggregate components, so all component info is in same file, otherwise we have dallas.json, dallas.sensor.json, etc. # aggregate components, so all component info is in same file, otherwise we have dallas.json, dallas.sensor.json, etc.
@ -585,7 +632,7 @@ def convert_1(schema, config_var, path):
assert S_EXTENDS not in config_var assert S_EXTENDS not in config_var
if not S_TYPE in config_var: if not S_TYPE in config_var:
config_var[S_TYPE] = S_SCHEMA config_var[S_TYPE] = S_SCHEMA
assert config_var[S_TYPE] == S_SCHEMA # assert config_var[S_TYPE] == S_SCHEMA
if S_SCHEMA not in config_var: if S_SCHEMA not in config_var:
config_var[S_SCHEMA] = {} config_var[S_SCHEMA] = {}
@ -662,7 +709,7 @@ def convert_1(schema, config_var, path):
elif repr_schema in ejs.hidden_schemas: elif repr_schema in ejs.hidden_schemas:
schema_type = ejs.hidden_schemas[repr_schema] schema_type = ejs.hidden_schemas[repr_schema]
data = schema(ejs.jschema_extractor) data = schema(ejs.SCHEMA_EXTRACT)
# enums, e.g. esp32/variant # enums, e.g. esp32/variant
if schema_type == "one_of": if schema_type == "one_of":
@ -672,8 +719,9 @@ def convert_1(schema, config_var, path):
config_var[S_TYPE] = "enum" config_var[S_TYPE] = "enum"
config_var["values"] = list(data.keys()) config_var["values"] = list(data.keys())
elif schema_type == "maybe": elif schema_type == "maybe":
config_var[S_TYPE] = "maybe" config_var[S_TYPE] = S_SCHEMA
config_var["schema"] = convert_config(data, path + "/maybe")["schema"] config_var["maybe"] = data[1]
config_var["schema"] = convert_config(data[0], path + "/maybe")["schema"]
# esphome/on_boot # esphome/on_boot
elif schema_type == "automation": elif schema_type == "automation":
extra_schema = None extra_schema = None
@ -717,8 +765,50 @@ def convert_1(schema, config_var, path):
elif schema_type == "sensor": elif schema_type == "sensor":
schema = data schema = data
convert_1(data, config_var, path + "/trigger") convert_1(data, config_var, path + "/trigger")
elif schema_type == "declare_id":
# pylint: disable=protected-access
parents = data._parents
config_var["id_type"] = {
"class": str(data.base),
"parents": [str(x.base) for x in parents]
if isinstance(parents, list)
else None,
}
elif schema_type == "use_id":
if inspect.ismodule(data):
m_attr_obj = getattr(data, "CONFIG_SCHEMA")
use_schema = known_schemas.get(repr(m_attr_obj))
if use_schema:
[output_module, output_name] = use_schema[0][1].split(".")
use_id_config = output[output_module][S_SCHEMAS][output_name]
config_var["use_id_type"] = use_id_config["schema"]["config_vars"][
"id"
]["id_type"]["class"]
config_var[S_TYPE] = "use_id"
else:
print("TODO deferred?")
else:
if isinstance(data, str):
# TODO: Figure out why pipsolar does this
config_var["use_id_type"] = data
else:
config_var["use_id_type"] = str(data.base)
config_var[S_TYPE] = "use_id"
else: else:
raise Exception("Unknown extracted schema type") raise Exception("Unknown extracted schema type")
elif config_var.get("key") == "GeneratedID":
if path == "i2c/CONFIG_SCHEMA/extL/all/id":
config_var["id_type"] = {"class": "i2c::I2CBus", "parents": ["Component"]}
elif path == "uart/CONFIG_SCHEMA/val 1/extL/all/id":
config_var["id_type"] = {
"class": "uart::UARTComponent",
"parents": ["Component"],
}
elif path == "pins/esp32/val 1/id":
config_var["id_type"] = "pin"
else:
raise Exception("Cannot determine id_type for " + path)
elif repr_schema in ejs.registry_schemas: elif repr_schema in ejs.registry_schemas:
solve_registry.append((ejs.registry_schemas[repr_schema], config_var)) solve_registry.append((ejs.registry_schemas[repr_schema], config_var))
@ -787,7 +877,13 @@ def convert_keys(converted, schema, path):
result["key"] = "Optional" result["key"] = "Optional"
else: else:
converted["key"] = "String" converted["key"] = "String"
converted["key_dump"] = str(k) key_string_match = re.search(
r"<function (\w*) at \w*>", str(k), re.IGNORECASE
)
if key_string_match:
converted["key_type"] = key_string_match.group(1)
else:
converted["key_type"] = str(k)
esphome_core.CORE.data = { esphome_core.CORE.data = {
esphome_core.KEY_CORE: {esphome_core.KEY_TARGET_PLATFORM: "esp8266"} esphome_core.KEY_CORE: {esphome_core.KEY_TARGET_PLATFORM: "esp8266"}
@ -808,6 +904,12 @@ def convert_keys(converted, schema, path):
if base_k in result and base_v == result[base_k]: if base_k in result and base_v == result[base_k]:
result.pop(base_k) result.pop(base_k)
converted["schema"][S_CONFIG_VARS][str(k)] = result converted["schema"][S_CONFIG_VARS][str(k)] = result
if "key" in converted and converted["key"] == "String":
config_vars = converted["schema"]["config_vars"]
assert len(config_vars) == 1
key = list(config_vars.keys())[0]
assert key.startswith("<")
config_vars["string"] = config_vars.pop(key)
build_schema() build_schema()