from __future__ import print_function import argparse from collections import OrderedDict from datetime import datetime import logging import os import random import sys from esphomeyaml import const, core_config, mqtt, platformio_api, wizard, writer, yaml_util from esphomeyaml.api.client import run_logs from esphomeyaml.config import get_component, iter_components, read_config, strip_default_ids from esphomeyaml.const import CONF_BAUD_RATE, CONF_ESPHOMEYAML, CONF_LOGGER, CONF_USE_CUSTOM_CODE, \ CONF_BROKER from esphomeyaml.core import CORE, EsphomeyamlError from esphomeyaml.cpp_generator import Expression, RawStatement, add, statement from esphomeyaml.helpers import color, indent from esphomeyaml.py_compat import safe_input, text_type, IS_PY2 from esphomeyaml.storage_json import StorageJSON, esphomeyaml_storage_path, \ start_update_check_thread, storage_path from esphomeyaml.util import run_external_command, safe_print _LOGGER = logging.getLogger(__name__) PRE_INITIALIZE = ['esphomeyaml', 'logger', 'wifi', 'ethernet', 'ota', 'mqtt', 'web_server', 'api', 'i2c'] def get_serial_ports(): # from https://github.com/pyserial/pyserial/blob/master/serial/tools/list_ports.py from serial.tools.list_ports import comports result = [] for port, desc, info in comports(): if not port: continue if "VID:PID" in info: result.append((port, desc)) result.sort(key=lambda x: x[0]) return result def choose_prompt(options): if not options: raise ValueError if len(options) == 1: return options[0][1] safe_print(u"Found multiple options, please choose one:") for i, (desc, _) in enumerate(options): safe_print(u" [{}] {}".format(i + 1, desc)) while True: opt = safe_input('(number): ') if opt in options: opt = options.index(opt) break try: opt = int(opt) if opt < 1 or opt > len(options): raise ValueError break except ValueError: safe_print(color('red', u"Invalid option: '{}'".format(opt))) return options[opt - 1][1] def choose_upload_log_host(default, check_default, show_ota, show_mqtt, show_api): options = [] for res, desc in get_serial_ports(): options.append((u"{} ({})".format(res, desc), res)) if (show_ota and 'ota' in CORE.config) or (show_api and 'api' in CORE.config): options.append((u"Over The Air ({})".format(CORE.address), CORE.address)) if default == 'OTA': return CORE.address if show_mqtt and 'mqtt' in CORE.config: options.append((u"MQTT ({})".format(CORE.config['mqtt'][CONF_BROKER]), 'MQTT')) if default == 'OTA': return 'MQTT' if default is not None: return default if check_default is not None and check_default in [opt[1] for opt in options]: return check_default return choose_prompt(options) def get_port_type(port): if port.startswith('/') or port.startswith('COM'): return 'SERIAL' if port == 'MQTT': return 'MQTT' return 'NETWORK' def run_miniterm(config, port): import serial if CONF_LOGGER not in config: _LOGGER.info("Logger is not enabled. Not starting UART logs.") return baud_rate = config['logger'][CONF_BAUD_RATE] if baud_rate == 0: _LOGGER.info("UART logging is disabled (baud_rate=0). Not starting UART logs.") _LOGGER.info("Starting log output from %s with baud rate %s", port, baud_rate) backtrace_state = False with serial.Serial(port, baudrate=baud_rate) as ser: while True: try: raw = ser.readline() except serial.SerialException: _LOGGER.error("Serial port closed!") return if IS_PY2: line = raw.replace('\r', '').replace('\n', '') else: line = raw.replace(b'\r', b'').replace(b'\n', b'').decode('utf8', 'backslashreplace') time = datetime.now().time().strftime('[%H:%M:%S]') message = time + line safe_print(message) backtrace_state = platformio_api.process_stacktrace( config, line, backtrace_state=backtrace_state) def write_cpp(config): _LOGGER.info("Generating C++ source...") CORE.add_job(core_config.to_code, config[CONF_ESPHOMEYAML], domain='esphomeyaml') for domain in PRE_INITIALIZE: if domain == CONF_ESPHOMEYAML or domain not in config: continue CORE.add_job(get_component(domain).to_code, config[domain], domain=domain) for domain, component, conf in iter_components(config): if domain in PRE_INITIALIZE or not hasattr(component, 'to_code'): continue CORE.add_job(component.to_code, conf, domain=domain) CORE.flush_tasks() add(RawStatement('')) add(RawStatement('')) all_code = [] for exp in CORE.expressions: if not config[CONF_ESPHOMEYAML][CONF_USE_CUSTOM_CODE]: if isinstance(exp, Expression) and not exp.required: continue all_code.append(text_type(statement(exp))) writer.write_platformio_project() code_s = indent('\n'.join(line.rstrip() for line in all_code)) writer.write_cpp(code_s) return 0 def compile_program(args, config): _LOGGER.info("Compiling app...") update_check = not os.getenv('ESPHOMEYAML_NO_UPDATE_CHECK', '') if update_check: thread = start_update_check_thread(esphomeyaml_storage_path(CORE.config_dir)) rc = platformio_api.run_compile(config, args.verbose) if update_check: thread.join() return rc def upload_using_esptool(config, port): import esptool path = os.path.join(CORE.build_path, '.pioenvs', CORE.name, 'firmware.bin') cmd = ['esptool.py', '--before', 'default_reset', '--after', 'hard_reset', '--chip', 'esp8266', '--port', port, 'write_flash', '0x0', path] # pylint: disable=protected-access return run_external_command(esptool._main, *cmd) def upload_program(config, args, host): # if upload is to a serial port use platformio, otherwise assume ota if get_port_type(host) == 'SERIAL': if CORE.is_esp8266: return upload_using_esptool(config, host) return platformio_api.run_upload(config, args.verbose, host) from esphomeyaml.components import ota from esphomeyaml import espota2 if args.host_port is not None: host_port = args.host_port else: host_port = int(os.getenv('ESPHOMEYAML_OTA_HOST_PORT', random.randint(10000, 60000))) verbose = args.verbose remote_port = ota.get_port(config) password = ota.get_auth(config) storage = StorageJSON.load(storage_path()) res = espota2.run_ota(host, remote_port, password, CORE.firmware_bin) if res == 0: if storage is not None and storage.use_legacy_ota: storage.use_legacy_ota = False storage.save(storage_path()) return res if storage is not None and not storage.use_legacy_ota: return res _LOGGER.warning("OTA v2 method failed. Trying with legacy OTA...") return espota2.run_legacy_ota(verbose, host_port, host, remote_port, password, CORE.firmware_bin) def show_logs(config, args, port): if 'logger' not in config: raise EsphomeyamlError("Logger is not configured!") if get_port_type(port) == 'SERIAL': run_miniterm(config, port) return 0 if get_port_type(port) == 'NETWORK' and 'api' in config: return run_logs(config, port) if get_port_type(port) == 'MQTT' and 'mqtt' in config: return mqtt.show_logs(config, args.topic, args.username, args.password, args.client_id) raise ValueError def clean_mqtt(config, args): return mqtt.clear_topic(config, args.topic, args.username, args.password, args.client_id) def setup_log(debug=False): log_level = logging.DEBUG if debug else logging.INFO logging.basicConfig(level=log_level) fmt = "%(levelname)s %(message)s" colorfmt = "%(log_color)s{}%(reset)s".format(fmt) datefmt = '%H:%M:%S' logging.getLogger('urllib3').setLevel(logging.WARNING) try: from colorlog import ColoredFormatter logging.getLogger().handlers[0].setFormatter(ColoredFormatter( colorfmt, datefmt=datefmt, reset=True, log_colors={ 'DEBUG': 'cyan', 'INFO': 'green', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red', } )) except ImportError: pass def command_wizard(args): return wizard.wizard(args.configuration) def command_config(args, config): _LOGGER.info("Configuration is valid!") if not args.verbose: config = strip_default_ids(config) safe_print(yaml_util.dump(config)) return 0 def command_compile(args, config): exit_code = write_cpp(config) if exit_code != 0: return exit_code if args.only_generate: _LOGGER.info(u"Successfully generated source code.") return 0 exit_code = compile_program(args, config) if exit_code != 0: return exit_code _LOGGER.info(u"Successfully compiled program.") return 0 def command_upload(args, config): port = choose_upload_log_host(default=args.upload_port, check_default=None, show_ota=True, show_mqtt=False, show_api=False) exit_code = upload_program(config, args, port) if exit_code != 0: return exit_code _LOGGER.info(u"Successfully uploaded program.") return 0 def command_logs(args, config): port = choose_upload_log_host(default=args.serial_port, check_default=None, show_ota=False, show_mqtt=True, show_api=True) return show_logs(config, args, port) def command_run(args, config): exit_code = write_cpp(config) if exit_code != 0: return exit_code exit_code = compile_program(args, config) if exit_code != 0: return exit_code _LOGGER.info(u"Successfully compiled program.") port = choose_upload_log_host(default=args.upload_port, check_default=None, show_ota=True, show_mqtt=False, show_api=True) exit_code = upload_program(config, args, port) if exit_code != 0: return exit_code _LOGGER.info(u"Successfully uploaded program.") if args.no_logs: return 0 port = choose_upload_log_host(default=args.upload_port, check_default=port, show_ota=False, show_mqtt=True, show_api=True) return show_logs(config, args, port) def command_clean_mqtt(args, config): return clean_mqtt(config, args) def command_mqtt_fingerprint(args, config): return mqtt.get_fingerprint(config) def command_version(args): safe_print(u"Version: {}".format(const.__version__)) return 0 def command_clean(args, config): try: writer.clean_build() except OSError as err: _LOGGER.error("Error deleting build files: %s", err) return 1 _LOGGER.info("Done!") return 0 def command_hass_config(args, config): from esphomeyaml.components import mqtt as mqtt_component _LOGGER.info("This is what you should put in your Home Assistant YAML configuration.") _LOGGER.info("Please note this is only necessary if you're not using MQTT discovery.") data = mqtt_component.GenerateHassConfigData(config) hass_config = OrderedDict() for domain, component, conf in iter_components(config): if not hasattr(component, 'to_hass_config'): continue func = getattr(component, 'to_hass_config') ret = func(data, conf) if not isinstance(ret, (list, tuple)): ret = [ret] ret = [x for x in ret if x is not None] domain_conf = hass_config.setdefault(domain.split('.')[0], []) domain_conf += ret safe_print(yaml_util.dump(hass_config)) return 0 def command_dashboard(args): from esphomeyaml.dashboard import dashboard return dashboard.start_web_server(args) PRE_CONFIG_ACTIONS = { 'wizard': command_wizard, 'version': command_version, 'dashboard': command_dashboard } POST_CONFIG_ACTIONS = { 'config': command_config, 'compile': command_compile, 'upload': command_upload, 'logs': command_logs, 'run': command_run, 'clean-mqtt': command_clean_mqtt, 'mqtt-fingerprint': command_mqtt_fingerprint, 'clean': command_clean, 'hass-config': command_hass_config, } def parse_args(argv): parser = argparse.ArgumentParser(prog='esphomeyaml') parser.add_argument('-v', '--verbose', help="Enable verbose esphomeyaml logs.", action='store_true') parser.add_argument('--dashboard', help="Internal flag to set if the command is run from the " "dashboard.", action='store_true') parser.add_argument('configuration', help='Your YAML configuration file.') subparsers = parser.add_subparsers(help='Commands', dest='command') subparsers.required = True subparsers.add_parser('config', help='Validate the configuration and spit it out.') parser_compile = subparsers.add_parser('compile', help='Read the configuration and compile a program.') parser_compile.add_argument('--only-generate', help="Only generate source code, do not compile.", action='store_true') parser_upload = subparsers.add_parser('upload', help='Validate the configuration ' 'and upload the latest binary.') parser_upload.add_argument('--upload-port', help="Manually specify the upload port to use. " "For example /dev/cu.SLAB_USBtoUART.") parser_upload.add_argument('--host-port', help="Specify the host port.", type=int) parser_logs = subparsers.add_parser('logs', help='Validate the configuration ' 'and show all MQTT logs.') parser_logs.add_argument('--topic', help='Manually set the topic to subscribe to.') parser_logs.add_argument('--username', help='Manually set the username.') parser_logs.add_argument('--password', help='Manually set the password.') parser_logs.add_argument('--client-id', help='Manually set the client id.') parser_logs.add_argument('--serial-port', help="Manually specify a serial port to use" "For example /dev/cu.SLAB_USBtoUART.") parser_run = subparsers.add_parser('run', help='Validate the configuration, create a binary, ' 'upload it, and start MQTT logs.') parser_run.add_argument('--upload-port', help="Manually specify the upload port/ip to use. " "For example /dev/cu.SLAB_USBtoUART.") parser_run.add_argument('--host-port', help="Specify the host port to use for OTA", type=int) parser_run.add_argument('--no-logs', help='Disable starting MQTT logs.', action='store_true') parser_run.add_argument('--topic', help='Manually set the topic to subscribe to for logs.') parser_run.add_argument('--username', help='Manually set the MQTT username for logs.') parser_run.add_argument('--password', help='Manually set the MQTT password for logs.') parser_run.add_argument('--client-id', help='Manually set the client id for logs.') parser_clean = subparsers.add_parser('clean-mqtt', help="Helper to clear an MQTT topic from " "retain messages.") parser_clean.add_argument('--topic', help='Manually set the topic to subscribe to.') parser_clean.add_argument('--username', help='Manually set the username.') parser_clean.add_argument('--password', help='Manually set the password.') parser_clean.add_argument('--client-id', help='Manually set the client id.') subparsers.add_parser('wizard', help="A helpful setup wizard that will guide " "you through setting up esphomeyaml.") subparsers.add_parser('mqtt-fingerprint', help="Get the SSL fingerprint from a MQTT broker.") subparsers.add_parser('version', help="Print the esphomeyaml version and exit.") subparsers.add_parser('clean', help="Delete all temporary build files.") dashboard = subparsers.add_parser('dashboard', help="Create a simple web server for a dashboard.") dashboard.add_argument("--port", help="The HTTP port to open connections on. Defaults to 6052.", type=int, default=6052) dashboard.add_argument("--password", help="The optional password to require for all requests.", type=str, default='') dashboard.add_argument("--open-ui", help="Open the dashboard UI in a browser.", action='store_true') dashboard.add_argument("--hassio", help="Internal flag used to tell esphomeyaml is started as a Hass.io " "add-on.", action="store_true") dashboard.add_argument("--socket", help="Make the dashboard serve under a unix socket", type=str) subparsers.add_parser('hass-config', help="Dump the configuration entries that should be added " "to Home Assistant when not using MQTT discovery.") return parser.parse_args(argv[1:]) def run_esphomeyaml(argv): args = parse_args(argv) CORE.dashboard = args.dashboard setup_log(args.verbose) if args.command in PRE_CONFIG_ACTIONS: try: return PRE_CONFIG_ACTIONS[args.command](args) except EsphomeyamlError as e: _LOGGER.error(e) return 1 CORE.config_path = args.configuration config = read_config(args.verbose) if config is None: return 1 CORE.config = config if args.command in POST_CONFIG_ACTIONS: try: return POST_CONFIG_ACTIONS[args.command](args, config) except EsphomeyamlError as e: _LOGGER.error(e) return 1 safe_print(u"Unknown command {}".format(args.command)) return 1 def main(): try: return run_esphomeyaml(sys.argv) except EsphomeyamlError as e: _LOGGER.error(e) return 1 except KeyboardInterrupt: return 1 if __name__ == "__main__": sys.exit(main())