import asyncio
import copy
import json
import logging
import os.path

import shortuuid
from tabulate import tabulate

from ..api.step import CBPiStep, Stop_Reason


class StepController:
    """Manage the ordered list of steps (the "profile") and its persistence.

    Every step is a plain dict with the keys ``id``, ``name``, ``type``,
    ``status``, ``props`` and ``instance``.  ``instance`` is the runtime
    object built by :meth:`create_step`; it is ``None`` when the step type is
    not registered in ``self.types`` or construction failed.

    Status codes as they are used in this class:

    * ``"I"`` - default for new steps and the value written by ``reset_all``
    * ``"A"`` - the step that was started (at most one is expected)
    * ``"P"`` - written by ``stop``; picked up again by ``start``/``resume``
    * ``"D"`` - written when a step is finished via ``next``/``done``
    * ``"E"`` - written by ``done`` when the reason is ``"MAX_EXCEPTIONS"``
    """

    def __init__(self, cbpi):
        """Store the application handle and register the cleanup hook.

        Args:
            cbpi: application object; this class uses ``cbpi.app``,
                ``cbpi.ws`` and ``cbpi.notification``.
        """
        self.cbpi = cbpi
        # NOTE(review): looks like leftover debug state; kept because other
        # modules may still read it.
        self.woohoo = "HALLLO"
        self.logger = logging.getLogger(__name__)
        self.path = os.path.join(".", 'config', "step_data.json")
        self._loop = asyncio.get_event_loop()
        self.basic_data = {}
        # Initialised here so the find_by_* helpers work before load() ran.
        self.profile = []
        self.step = None
        self.types = {}
        self.cbpi.app.on_cleanup.append(self.shutdown)

    async def init(self):
        """Load the stored profile and re-start a step stored as active."""
        logging.info("INIT STEP Controller")
        self.load(startActive=True)

    def load(self, startActive=False):
        """Read the profile from ``self.path`` and build the step instances.

        The file (and its directory) is created with an empty profile when it
        does not exist yet.

        Args:
            startActive: when true, schedule ``start_step`` for the first step
                whose stored status is ``"A"``.
        """
        if not os.path.exists(self.path):
            config_dir = os.path.dirname(self.path)
            if config_dir:
                os.makedirs(config_dir, exist_ok=True)
            with open(self.path, "w") as file:
                json.dump(dict(basic={}, profile=[]), file, indent=4, sort_keys=True)

        with open(self.path) as json_file:
            data = json.load(json_file)

        self.basic_data = data.get("basic", {})
        # Same defaults as add(), so later item["status"] / item["props"]
        # lookups cannot fail on a hand-edited or older file.
        self.profile = [
            {
                "status": "I",
                "props": {},
                **item,
                "instance": self.create_step(
                    item.get("id"),
                    item.get("type"),
                    item.get("name"),
                    item.get("props", {}),
                ),
            }
            for item in data.get("profile", [])
        ]

        if startActive:
            active_step = self.find_by_status("A")
            if active_step is not None:
                self._loop.create_task(self.start_step(active_step))

    async def add(self, data):
        """Append a new step built from ``data`` and persist the profile.

        Args:
            data: dict with the step fields (``name``, ``type``, ``props``...).

        Returns:
            The stored step dict, including the generated ``id`` and the
            ``instance``.
        """
        logging.info("Add step")
        step_id = shortuuid.uuid()
        item = {
            "status": "I",
            "props": {},
            **data,
            "id": step_id,
            "instance": self.create_step(
                step_id, data.get("type"), data.get("name"), data.get("props", {})
            ),
        }
        self.profile.append(item)
        await self.save()
        return item

    async def update(self, id, data):
        """Merge ``data`` into the step with the given id and persist.

        The step instance is rebuilt from the *merged* values, so a partial
        update (for example only ``name``) still yields an instance that
        matches the stored step.

        Returns:
            The updated step dict, or ``None`` when no step has this id.
        """
        logging.info("update step")
        updated_profile = []
        for old in self.profile:
            if old["id"] != id:
                updated_profile.append(old)
                continue
            # Pin the id so the payload cannot re-key the step.
            step = {**old, **data, "id": id}
            step["instance"] = self.create_step(
                id, step.get("type"), step.get("name"), step.get("props") or {}
            )
            if step["instance"] is None:
                logging.error("Faild create step instance during update props")
            updated_profile.append(step)
        self.profile = updated_profile
        await self.save()
        return self.find_by_id(id)

    async def save(self):
        """Write basic data and the profile to ``self.path`` and push an update.

        Only the serialisable fields of each step are written; the runtime
        ``instance`` is left out.
        """
        logging.debug("save profile")
        data = dict(
            basic=self.basic_data,
            profile=[
                dict(
                    name=x["name"],
                    type=x.get("type"),
                    id=x["id"],
                    status=x["status"],
                    props=x["props"],
                )
                for x in self.profile
            ],
        )
        with open(self.path, "w") as file:
            json.dump(data, file, indent=4, sort_keys=True)
        self.push_udpate()

    async def start(self):
        """Start the next step: first a ``"P"`` step, otherwise an ``"I"`` step.

        Does nothing when a step with status ``"A"`` exists.  When neither a
        ``"P"`` nor an ``"I"`` step is left, a notification is sent.
        """
        if self.find_by_status("A") is not None:
            logging.error("Steps already running")
            return

        step = self.find_by_status("P")
        if step is not None:
            logging.info("Resume step")
            await self.start_step(step)
            await self.save()
            return

        step = self.find_by_status("I")
        if step is not None:
            logging.info("####### Start Step")
            await self.start_step(step)
            await self.save()
            return

        # NOTE(review): "HALLO" looks like a placeholder message - confirm.
        await self.cbpi.notification.notify(message="HALLO")
        logging.info("BREWING COMPLETE")

    async def next(self):
        """Ask the active step to finish, then mark a ``"P"`` step as done.

        NOTE(review): the second half only acts on a step with status ``"P"``;
        presumably ``instance.next()`` / ``done`` handle the active step -
        confirm against the step implementation.
        """
        logging.info("Trigger Next")
        step = self.find_by_status("A")
        if step is not None:
            instance = step.get("instance")
            if instance is not None:
                await instance.next()

        step = self.find_by_status("P")
        if step is not None:
            instance = step.get("instance")
            if instance is not None:
                step["status"] = "D"
                await self.save()
                await self.start()
        else:
            logging.info("No Step is running")

    async def resume(self):
        """Start the first step with status ``"P"`` again, if there is one."""
        step = self.find_by_status("P")
        if step is not None:
            instance = step.get("instance")
            if instance is not None:
                await self.start_step(step)
        else:
            logging.info("Nothing to resume")

    async def stop(self):
        """Stop the active step, set its status to ``"P"`` and persist."""
        logging.info("STOP STEP")
        step = self.find_by_status("A")
        if step is not None and step.get("instance") is not None:
            logging.info("CALLING STOP STEP")
            instance = step.get("instance")
            await instance.stop()
            logging.info("STEP STOPPED")
            step["status"] = "P"
            await self.save()

    async def reset_all(self):
        """Set every step back to ``"I"`` and reset its instance.

        Refused while a step is active.  The result is persisted so the reset
        survives a restart; ``save`` also pushes the update to clients.
        """
        step = self.find_by_status("A")
        if step is not None:
            logging.error("Please stop before reset")
            return
        for item in self.profile:
            logging.info("Reset %s", item.get("name"))
            item["status"] = "I"
            instance = item.get("instance")
            # Steps of an unknown type have no instance to reset.
            if instance is not None:
                await instance.reset()
        await self.save()

    def create_step(self, id, type, name, props):
        """Instantiate the step class registered for ``type``.

        Args:
            id: step id passed to the step class.
            type: key into ``self.types``.
            name: display name passed to the step class.
            props: step properties; a shallow copy is handed to the instance.

        Returns:
            The new step instance, or ``None`` when the type is unknown or the
            constructor raised.
        """
        logging.debug("create step %s %s %s %s", id, type, name, props)
        type_cfg = self.types.get(type)
        if type_cfg is None:
            logging.warning("Unknown step type %s for step %s (%s)", type, id, name)
            return None
        try:
            clazz = type_cfg.get("class")
            return clazz(self.cbpi, id, name, {**(props or {})})
        except Exception:
            # Best effort as before (callers get None), but leave a trace.
            logging.exception("Failed to create step %s (%s) of type %s", id, name, type)
            return None

    def create_dict(self, data):
        """Return the client-facing view of a step dict.

        ``state_text`` is ``None`` for steps without an instance.
        """
        instance = data.get("instance")
        state_text = instance.get_state() if instance is not None else None
        return dict(
            name=data["name"],
            id=data["id"],
            type=data.get("type"),
            status=data["status"],
            props=data["props"],
            state_text=state_text,
        )

    def get_types2(self):
        """Return name, properties and actions of every registered step type."""
        return {
            key: dict(
                name=value.get("name"),
                properties=value.get("properties"),
                actions=value.get("actions"),
            )
            for key, value in self.types.items()
        }

    def get_state(self):
        """Return basic data, the profile view and the registered types."""
        return {
            "basic": self.basic_data,
            "profile": [self.create_dict(x) for x in self.profile],
            "types": self.get_types2(),
        }

    async def move(self, id, direction):
        """Swap a step with its neighbour and persist the new order.

        Args:
            id: id of the step to move.
            direction: ``-1`` (towards the start) or ``1`` (towards the end).

        Unknown ids and moves past either end of the list are ignored, so the
        first step can no longer wrap around to the last position.
        """
        if direction not in (-1, 1):
            self.logger.error("Cant move. Direction 1 and -1 allowed")
            return
        index = self.get_index_by_id(id)
        if index is None:
            self.logger.error("Cant move. Step %s not found", id)
            return
        target = index + direction
        if not 0 <= target < len(self.profile):
            self.logger.info("Cant move. Step %s is already at the end of the list", id)
            return
        self.profile[index], self.profile[target] = self.profile[target], self.profile[index]
        # save() already pushes the update to the clients.
        await self.save()

    async def delete(self, id):
        """Remove the step with the given id unless it is active."""
        step = self.find_by_id(id)
        if step is None:
            logging.error("Cant delete Step %s. Step not found", id)
            return
        if step.get("status") == "A":
            logging.error("Cant delete active Step %s", id)
            return
        self.profile = [x for x in self.profile if x["id"] != id]
        await self.save()

    async def shutdown(self, app):
        """Cleanup hook: stop every step whose task is still running, then save.

        Args:
            app: the application object passed by the cleanup signal (unused).
        """
        logging.info("Mash Profile Shutdonw")
        for p in self.profile:
            instance = p.get("instance")
            if instance is None:
                continue
            if instance.task is not None and not instance.task.done():
                logging.info("Stop Step")
                await instance.stop()
                await instance.task
        await self.save()

    def done(self, task):
        """Handle the result of a finished step task.

        ``task.result()`` is expected to be an ``(id, reason)`` pair.  With
        reason ``"MAX_EXCEPTIONS"`` the step is marked ``"E"``; with
        ``Stop_Reason.NEXT`` the active step is marked ``"D"`` and the next
        step is started.
        """
        step_id, reason = task.result()
        logging.info("DONE %s %s", step_id, reason)

        if reason == "MAX_EXCEPTIONS":
            step_current = self.find_by_id(step_id)
            if step_current is None:
                logging.error("Finished step %s not found in profile", step_id)
                return
            step_current["status"] = "E"
            self._loop.create_task(self.save())
            return

        if reason == Stop_Reason.NEXT:
            step_current = self.find_by_status("A")
            if step_current is not None:
                step_current["status"] = "D"

                async def wrapper():
                    ## TODO DONT CALL SAVE
                    await self.save()
                    await self.start()

                self._loop.create_task(wrapper())

    def find_by_status(self, status):
        """Return the first step with the given status, or ``None``."""
        return next((item for item in self.profile if item["status"] == status), None)

    def find_by_id(self, id):
        """Return the step with the given id, or ``None``."""
        return next((item for item in self.profile if item["id"] == id), None)

    def get_index_by_id(self, id):
        """Return the list position of the step with the given id, or ``None``."""
        return next((i for i, item in enumerate(self.profile) if item["id"] == id), None)

    def push_udpate(self):
        """Send the current profile view on the ``step_update`` topic."""
        self.cbpi.ws.send(
            dict(topic="step_update", data=[self.create_dict(x) for x in self.profile])
        )

    async def start_step(self, step):
        """Start the instance of ``step`` and mark the step as ``"A"``.

        NOTE(review): the step is marked active even when starting fails
        (unchanged behaviour); the failure is now logged with a traceback.
        """
        logging.info("Start Step")
        try:
            await step["instance"].start()
        except Exception:
            logging.exception("Failed to start step %s", step.get("name"))
        step["status"] = "A"
        logging.debug("STARTED %s", step)

    async def update_props(self, id, props):
        """Replace the ``props`` of the step with the given id and persist."""
        logging.info("SAVE PROPS")
        step = self.find_by_id(id)
        if step is None:
            logging.error("Cant save props. Step %s not found", id)
            return
        step["props"] = props
        # save() already pushes the update to the clients.
        await self.save()

    async def save_basic(self, data):
        """Merge ``data`` into the basic profile data and persist."""
        logging.info("SAVE Basic Data")
        self.basic_data = {**self.basic_data, **data}
        # save() already pushes the update to the clients.
        await self.save()