esphome/esphome/yaml_util.py
Otto Winter c2a0c22bd9
Automatically hide secrets in validation (#455)
* Hide secrets in validation

* Lint
2019-02-26 18:32:20 +01:00

408 lines
14 KiB
Python

from __future__ import print_function
import codecs
from collections import OrderedDict
import fnmatch
import logging
import os
import uuid
import yaml
import yaml.constructor
from esphome import core
from esphome.core import EsphomeError, HexInt, IPAddress, Lambda, MACAddress, TimePeriod
from esphome.py_compat import string_types, text_type, IS_PY2
_LOGGER = logging.getLogger(__name__)
# Mostly copied from Home Assistant because that code works fine and
# let's not reinvent the wheel here
SECRET_YAML = u'secrets.yaml'
_SECRET_CACHE = {}
_SECRET_VALUES = {}
class NodeListClass(list):
"""Wrapper class to be able to add attributes on a list."""
class NodeStrClass(text_type):
"""Wrapper class to be able to add attributes on a string."""
class SafeLineLoader(yaml.SafeLoader): # pylint: disable=too-many-ancestors
"""Loader class that keeps track of line numbers."""
def compose_node(self, parent, index):
"""Annotate a node with the first line it was seen."""
last_line = self.line # type: int
node = super(SafeLineLoader, self).compose_node(parent, index) # type: yaml.nodes.Node
node.__line__ = last_line + 1
return node
def load_yaml(fname):
_SECRET_VALUES.clear()
_SECRET_CACHE.clear()
return _load_yaml_internal(fname)
def _load_yaml_internal(fname):
"""Load a YAML file."""
try:
with codecs.open(fname, encoding='utf-8') as conf_file:
return yaml.load(conf_file, Loader=SafeLineLoader) or OrderedDict()
except yaml.YAMLError as exc:
raise EsphomeError(exc)
except IOError as exc:
raise EsphomeError(u"Error accessing file {}: {}".format(fname, exc))
except UnicodeDecodeError as exc:
_LOGGER.error(u"Unable to read file %s: %s", fname, exc)
raise EsphomeError(exc)
def dump(dict_):
"""Dump YAML to a string and remove null."""
return yaml.safe_dump(
dict_, default_flow_style=False, allow_unicode=True)
def custom_construct_pairs(loader, node):
pairs = []
for kv in node.value:
if isinstance(kv, yaml.ScalarNode):
obj = loader.construct_object(kv)
if not isinstance(obj, dict):
raise EsphomeError(
"Expected mapping for anchored include tag, got {}".format(type(obj)))
for key, value in obj.items():
pairs.append((key, value))
else:
key_node, value_node = kv
key = loader.construct_object(key_node)
value = loader.construct_object(value_node)
pairs.append((key, value))
return pairs
def custom_flatten_mapping(loader, node):
pre_merge = []
post_merge = []
index = 0
while index < len(node.value):
if isinstance(node.value[index], yaml.ScalarNode):
index += 1
continue
key_node, value_node = node.value[index]
if key_node.tag == u'tag:yaml.org,2002:merge':
del node.value[index]
if isinstance(value_node, yaml.MappingNode):
custom_flatten_mapping(loader, value_node)
node.value = node.value[:index] + value_node.value + node.value[index:]
elif isinstance(value_node, yaml.SequenceNode):
submerge = []
for subnode in value_node.value:
if not isinstance(subnode, yaml.MappingNode):
raise yaml.constructor.ConstructorError(
"while constructing a mapping", node.start_mark,
"expected a mapping for merging, but found %{}".format(subnode.id),
subnode.start_mark)
custom_flatten_mapping(loader, subnode)
submerge.append(subnode.value)
# submerge.reverse()
node.value = node.value[:index] + submerge + node.value[index:]
elif isinstance(value_node, yaml.ScalarNode):
node.value = node.value[:index] + [value_node] + node.value[index:]
# post_merge.append(value_node)
else:
raise yaml.constructor.ConstructorError(
"while constructing a mapping", node.start_mark,
"expected a mapping or list of mappings for merging, "
"but found {}".format(value_node.id), value_node.start_mark)
elif key_node.tag == u'tag:yaml.org,2002:value':
key_node.tag = u'tag:yaml.org,2002:str'
index += 1
else:
index += 1
if pre_merge:
node.value = pre_merge + node.value
if post_merge:
node.value = node.value + post_merge
def _ordered_dict(loader, node):
"""Load YAML mappings into an ordered dictionary to preserve key order."""
custom_flatten_mapping(loader, node)
nodes = custom_construct_pairs(loader, node)
seen = {}
for (key, _), nv in zip(nodes, node.value):
if isinstance(nv, yaml.ScalarNode):
line = nv.start_mark.line
else:
line = nv[0].start_mark.line
try:
hash(key)
except TypeError:
fname = getattr(loader.stream, 'name', '')
raise yaml.MarkedYAMLError(
context="invalid key: \"{}\"".format(key),
context_mark=yaml.Mark(fname, 0, line, -1, None, None)
)
if key in seen:
fname = getattr(loader.stream, 'name', '')
raise EsphomeError(u'YAML file {} contains duplicate key "{}". '
u'Check lines {} and {}.'.format(fname, key, seen[key], line))
seen[key] = line
return _add_reference(OrderedDict(nodes), loader, node)
def _construct_seq(loader, node):
"""Add line number and file name to Load YAML sequence."""
obj, = loader.construct_yaml_seq(node)
return _add_reference(obj, loader, node)
def _add_reference(obj, loader, node):
"""Add file reference information to an object."""
if isinstance(obj, string_types):
obj = NodeStrClass(obj)
if isinstance(obj, list):
obj = NodeListClass(obj)
setattr(obj, '__config_file__', loader.name)
setattr(obj, '__line__', node.start_mark.line)
return obj
def _env_var_yaml(_, node):
"""Load environment variables and embed it into the configuration YAML."""
args = node.value.split()
# Check for a default value
if len(args) > 1:
return os.getenv(args[0], u' '.join(args[1:]))
if args[0] in os.environ:
return os.environ[args[0]]
raise EsphomeError(u"Environment variable {} not defined.".format(node.value))
def _include_yaml(loader, node):
"""Load another YAML file and embeds it using the !include tag.
Example:
device_tracker: !include device_tracker.yaml
"""
fname = os.path.join(os.path.dirname(loader.name), node.value)
return _add_reference(_load_yaml_internal(fname), loader, node)
def _is_file_valid(name):
"""Decide if a file is valid."""
return not name.startswith(u'.')
def _find_files(directory, pattern):
"""Recursively load files in a directory."""
for root, dirs, files in os.walk(directory, topdown=True):
dirs[:] = [d for d in dirs if _is_file_valid(d)]
for basename in files:
if _is_file_valid(basename) and fnmatch.fnmatch(basename, pattern):
filename = os.path.join(root, basename)
yield filename
def _include_dir_named_yaml(loader, node):
"""Load multiple files from directory as a dictionary."""
mapping = OrderedDict() # type: OrderedDict
loc = os.path.join(os.path.dirname(loader.name), node.value)
for fname in _find_files(loc, '*.yaml'):
filename = os.path.splitext(os.path.basename(fname))[0]
mapping[filename] = _load_yaml_internal(fname)
return _add_reference(mapping, loader, node)
def _include_dir_merge_named_yaml(loader, node):
"""Load multiple files from directory as a merged dictionary."""
mapping = OrderedDict() # type: OrderedDict
loc = os.path.join(os.path.dirname(loader.name), node.value)
for fname in _find_files(loc, '*.yaml'):
if os.path.basename(fname) == SECRET_YAML:
continue
loaded_yaml = _load_yaml_internal(fname)
if isinstance(loaded_yaml, dict):
mapping.update(loaded_yaml)
return _add_reference(mapping, loader, node)
def _include_dir_list_yaml(loader, node):
"""Load multiple files from directory as a list."""
loc = os.path.join(os.path.dirname(loader.name), node.value)
return [_load_yaml_internal(f) for f in _find_files(loc, '*.yaml')
if os.path.basename(f) != SECRET_YAML]
def _include_dir_merge_list_yaml(loader, node):
"""Load multiple files from directory as a merged list."""
path = os.path.join(os.path.dirname(loader.name), node.value)
merged_list = []
for fname in _find_files(path, '*.yaml'):
if os.path.basename(fname) == SECRET_YAML:
continue
loaded_yaml = _load_yaml_internal(fname)
if isinstance(loaded_yaml, list):
merged_list.extend(loaded_yaml)
return _add_reference(merged_list, loader, node)
def is_secret(value):
try:
return _SECRET_VALUES[text_type(value)]
except (KeyError, ValueError):
return None
# pylint: disable=protected-access
def _secret_yaml(loader, node):
"""Load secrets and embed it into the configuration YAML."""
secret_path = os.path.join(os.path.dirname(loader.name), SECRET_YAML)
secrets = _load_yaml_internal(secret_path)
if node.value not in secrets:
raise EsphomeError(u"Secret {} not defined".format(node.value))
val = secrets[node.value]
_SECRET_VALUES[text_type(val)] = node.value
return val
def _lambda(loader, node):
return Lambda(text_type(node.value))
yaml.SafeLoader.add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, _ordered_dict)
yaml.SafeLoader.add_constructor(yaml.resolver.BaseResolver.DEFAULT_SEQUENCE_TAG, _construct_seq)
yaml.SafeLoader.add_constructor('!env_var', _env_var_yaml)
yaml.SafeLoader.add_constructor('!secret', _secret_yaml)
yaml.SafeLoader.add_constructor('!include', _include_yaml)
yaml.SafeLoader.add_constructor('!include_dir_list', _include_dir_list_yaml)
yaml.SafeLoader.add_constructor('!include_dir_merge_list',
_include_dir_merge_list_yaml)
yaml.SafeLoader.add_constructor('!include_dir_named', _include_dir_named_yaml)
yaml.SafeLoader.add_constructor('!include_dir_merge_named',
_include_dir_merge_named_yaml)
yaml.SafeLoader.add_constructor('!lambda', _lambda)
# From: https://gist.github.com/miracle2k/3184458
# pylint: disable=redefined-outer-name
def represent_odict(dump, tag, mapping, flow_style=None):
"""Like BaseRepresenter.represent_mapping but does not issue the sort()."""
value = []
node = yaml.MappingNode(tag, value, flow_style=flow_style)
if dump.alias_key is not None:
dump.represented_objects[dump.alias_key] = node
best_style = True
if hasattr(mapping, 'items'):
mapping = list(mapping.items())
for item_key, item_value in mapping:
node_key = dump.represent_data(item_key)
node_value = dump.represent_data(item_value)
if not (isinstance(node_key, yaml.ScalarNode) and not node_key.style):
best_style = False
if not (isinstance(node_value, yaml.ScalarNode) and
not node_value.style):
best_style = False
value.append((node_key, node_value))
if flow_style is None:
if dump.default_flow_style is not None:
node.flow_style = dump.default_flow_style
else:
node.flow_style = best_style
return node
def represent_secret(value):
return yaml.ScalarNode(tag=u'!secret', value=_SECRET_VALUES[value])
def unicode_representer(_, uni):
if is_secret(uni):
return represent_secret(uni)
node = yaml.ScalarNode(tag=u'tag:yaml.org,2002:str', value=uni)
return node
def hex_int_representer(_, data):
if is_secret(data):
return represent_secret(data)
node = yaml.ScalarNode(tag=u'tag:yaml.org,2002:int', value=str(data))
return node
def stringify_representer(_, data):
if is_secret(data):
return represent_secret(data)
node = yaml.ScalarNode(tag=u'tag:yaml.org,2002:str', value=str(data))
return node
TIME_PERIOD_UNIT_MAP = {
'microseconds': 'us',
'milliseconds': 'ms',
'seconds': 's',
'minutes': 'min',
'hours': 'h',
'days': 'd',
}
def represent_time_period(dumper, data):
dictionary = data.as_dict()
if len(dictionary) == 1:
unit, value = dictionary.popitem()
out = '{}{}'.format(value, TIME_PERIOD_UNIT_MAP[unit])
return yaml.ScalarNode(tag=u'tag:yaml.org,2002:str', value=out)
return represent_odict(dumper, 'tag:yaml.org,2002:map', dictionary)
def represent_lambda(_, data):
if is_secret(data.value):
return represent_secret(data.value)
node = yaml.ScalarNode(tag='!lambda', value=data.value, style='|')
return node
def represent_id(_, data):
if is_secret(data.id):
return represent_secret(data.id)
return yaml.ScalarNode(tag=u'tag:yaml.org,2002:str', value=data.id)
yaml.SafeDumper.add_representer(
OrderedDict,
lambda dumper, value:
represent_odict(dumper, 'tag:yaml.org,2002:map', value)
)
yaml.SafeDumper.add_representer(
NodeListClass,
lambda dumper, value:
dumper.represent_sequence('tag:yaml.org,2002:seq', value)
)
yaml.SafeDumper.add_representer(str, unicode_representer)
if IS_PY2:
yaml.SafeDumper.add_representer(unicode, unicode_representer)
yaml.SafeDumper.add_representer(HexInt, hex_int_representer)
yaml.SafeDumper.add_representer(IPAddress, stringify_representer)
yaml.SafeDumper.add_representer(MACAddress, stringify_representer)
yaml.SafeDumper.add_multi_representer(TimePeriod, represent_time_period)
yaml.SafeDumper.add_multi_representer(Lambda, represent_lambda)
yaml.SafeDumper.add_multi_representer(core.ID, represent_id)
yaml.SafeDumper.add_multi_representer(uuid.UUID, stringify_representer)