1
0
Fork 0
mirror of https://github.com/PiBrewing/craftbeerpi4.git synced 2025-01-21 20:06:04 +01:00
craftbeerpi4-pione/cbpi/controller/plugin_controller.py

217 lines
9.3 KiB
Python

import importlib
import logging
import os
import pkgutil
from importlib import import_module
from cbpi.api import *
from cbpi.utils.utils import load_config
from importlib_metadata import metadata, version
logger = logging.getLogger(__name__)
class PluginController():
    """Discover CraftBeerPi plugins and register the types they provide.

    Plugins come from two places: sub-directories of the bundled
    ``../extension`` folder (``load_plugins``) and importable top-level
    modules whose name starts with ``cbpi`` (``load_plugins_from_evn``).
    Loaded plugins call back into ``register`` to publish their classes.
    """

    # NOTE: these dicts are class attributes, so they are shared by every
    # PluginController instance. ``load_plugins`` stores the imported
    # extension modules in ``modules`` keyed by directory name.
    modules = {}
    types = {}

    def __init__(self, cbpi):
        """Store the ``cbpi`` object that is handed to plugins and registries."""
        self.cbpi = cbpi

    def load_plugins(self) -> None:
        """Load the extensions that live in the ``../extension`` directory.

        Every sub-directory (except ``__pycache__``) is expected to contain a
        ``config.yaml``. The extension is imported as
        ``cbpi.extension.<dirname>`` and its ``setup(cbpi)`` is called only
        when the config has ``active`` set to ``True`` and ``version`` equal
        to ``4``. Any error raised while handling one extension is logged and
        does not stop the remaining extensions from loading.
        """
        this_directory = os.path.dirname(os.path.abspath(__file__))
        extension_dir = os.path.join(this_directory, "../extension")

        for filename in os.listdir(extension_dir):
            is_plugin_dir = os.path.isdir(os.path.join(extension_dir, filename))
            if not is_plugin_dir or filename == "__pycache__":
                continue
            try:
                logger.info("Trying to load plugin %s", filename)
                data = load_config(
                    os.path.join(extension_dir, filename, "config.yaml"))
                if data.get("active") is True and data.get("version") == 4:
                    module = import_module("cbpi.extension.%s" % filename)
                    self.modules[filename] = module
                    module.setup(self.cbpi)
                else:
                    # Also reached when the plugin is merely inactive.
                    logger.warning(
                        "Plugin %s is not supporting version 4", filename)
            except Exception as e:
                # Best effort: a broken extension must not block the others.
                logger.error(e)

    @staticmethod
    def _discover_plugin_names():
        """Return names of importable top-level modules that look like plugins.

        A module qualifies when its name starts with ``cbpi`` and is longer
        than four characters (which excludes the ``cbpi`` package itself).
        """
        return [
            name
            for _finder, name, _ispkg in pkgutil.iter_modules()
            if name.startswith('cbpi') and len(name) > 4
        ]

    def _import_discovered_plugins(self):
        """Import every discovered plugin module and return ``{name: module}``.

        Each import is guarded separately, so a plugin that fails to import
        is logged and skipped instead of aborting the discovery of all the
        other plugins.
        """
        discovered = {}
        for name in self._discover_plugin_names():
            try:
                discovered[name] = importlib.import_module(name)
            except Exception as e:
                logger.error("FAILED to load plugin %s ", name)
                logger.error(e)
        return discovered

    def load_plugins_from_evn(self) -> None:
        """Import installed ``cbpi*`` plugin modules and call their ``setup``.

        For each plugin the installed version is looked up and logged before
        ``setup(cbpi)`` is invoked. A failure of either step is logged for
        that plugin only; the remaining plugins are still processed.
        """
        # Local import keeps using the stdlib implementation, exactly as the
        # original per-iteration import did.
        from importlib.metadata import version

        for key, value in self._import_discovered_plugins().items():
            try:
                logger.info("Try to load plugin: %s == %s ", key, version(key))
                value.setup(self.cbpi)
                logger.info("Plugin %s loaded successfully", key)
            except Exception as e:
                logger.error("FAILED to load plugin %s ", key)
                logger.error(e)

    def register(self, name, clazz) -> None:
        '''
        Register a plugin class under the given name.

        The class is added to every registry whose base class it derives
        from (actor, kettle logic, fermenter logic, sensor, step,
        fermentation step). The checks are independent, so a class deriving
        from several bases is registered in each matching registry. A
        CBPiExtension subclass is instantiated immediately with ``cbpi``.

        :param name: name the type is registered under
        :param clazz: class provided by the plugin
        :return: None
        '''
        logger.debug("Register %s Class %s", name, clazz.__name__)

        if issubclass(clazz, CBPiActor):
            self.cbpi.actor.types[name] = self._parse_step_props(clazz, name)

        if issubclass(clazz, CBPiKettleLogic):
            self.cbpi.kettle.types[name] = self._parse_step_props(clazz, name)

        if issubclass(clazz, CBPiFermenterLogic):
            self.cbpi.fermenter.types[name] = self._parse_step_props(
                clazz, name)

        if issubclass(clazz, CBPiSensor):
            self.cbpi.sensor.types[name] = self._parse_step_props(clazz, name)

        if issubclass(clazz, CBPiStep):
            self.cbpi.step.types[name] = self._parse_step_props(clazz, name)

        if issubclass(clazz, CBPiFermentationStep):
            self.cbpi.fermenter.steptypes[name] = self._parse_step_props(
                clazz, name)

        if issubclass(clazz, CBPiExtension):
            # Only the most recently registered extension instance is kept.
            self.c = clazz(self.cbpi)

    def _parse_property_object(self, p):
        """Convert a ``Property.*`` object into a plain description dict.

        :param p: property object declared by a plugin
        :return: dict describing the property, or ``None`` when ``p`` is not
                 one of the known ``Property`` types
        """
        if isinstance(p, Property.Number):
            return {"label": p.label, "type": "number",
                    "configurable": p.configurable,
                    "description": p.description,
                    "default_value": p.default_value}
        if isinstance(p, Property.Text):
            return {"label": p.label, "type": "text",
                    "configurable": p.configurable,
                    "default_value": p.default_value,
                    "description": p.description}
        if isinstance(p, Property.Select):
            # Select properties are always reported as configurable.
            return {"label": p.label, "type": "select", "configurable": True,
                    "options": p.options, "description": p.description}

        # The remaining property types all share the same dict layout and
        # differ only in the value of "type".
        reference_types = (
            (Property.Actor, "actor"),
            (Property.Sensor, "sensor"),
            (Property.Kettle, "kettle"),
            (Property.Fermenter, "fermenter"),
        )
        for property_type, type_name in reference_types:
            if isinstance(p, property_type):
                return {"label": p.label, "type": type_name,
                        "configurable": p.configurable,
                        "description": p.description}
        return None

    def _parse_step_props(self, cls, name):
        """Build the registry entry for a plugin class.

        Properties are read from the optional ``cbpi_parameters`` class
        attribute. Actions are the entries of ``cls.__dict__`` that carry an
        ``action`` attribute; their ``key`` becomes the label and their
        ``parameters`` are converted like properties.

        :param cls: plugin class to describe
        :param name: name the class is registered under
        :return: dict with the keys ``name``, ``class``, ``properties`` and
                 ``actions``
        """
        result = {"name": name, "class": cls, "properties": [], "actions": []}

        if hasattr(cls, "cbpi_parameters"):
            result["properties"] = [
                self._parse_property_object(p) for p in cls.cbpi_parameters
            ]

        for method_name, method in cls.__dict__.items():
            if hasattr(method, "action"):
                parameters = [
                    self._parse_property_object(p)
                    for p in getattr(method, "parameters")
                ]
                result["actions"].append(
                    {"method": method_name, "label": getattr(method, "key"),
                     "parameters": parameters})

        return result

    def _parse_props(self, cls):
        """Describe a class by inspecting a temporary instance of it.

        The class is instantiated with ``cbpi=None, managed_fields=None`` and
        every non-callable attribute whose name does not start with ``__`` is
        checked; attributes holding a known ``Property`` type are reported
        together with the attribute name.

        :param cls: class to describe
        :return: dict with the keys ``name``, ``class``, ``properties`` and
                 ``actions``
        """
        name = cls.__name__
        result = {"name": name, "class": cls, "properties": [], "actions": []}

        tmpObj = cls(cbpi=None, managed_fields=None)
        for attr in dir(tmpObj):
            if attr.startswith("__"):
                continue
            value = getattr(tmpObj, attr)
            if callable(value):
                continue
            parsed = self._parse_property_object(value)
            if parsed is not None:
                result["properties"].append({"name": attr, **parsed})

        for method_name, method in cls.__dict__.items():
            if hasattr(method, "action"):
                # NOTE(review): unlike _parse_step_props, the action
                # parameters are passed through unconverted here.
                result["actions"].append(
                    {"method": method_name, "label": getattr(method, "key"),
                     "parameters": getattr(method, "parameters")})

        return result

    async def load_plugin_list(self):
        """Return the package metadata of every installed ``cbpi*`` plugin.

        :return: list with one dict per plugin, mapping each metadata field
                 name to its value; plugins whose import or metadata lookup
                 fails are logged and skipped. An empty list is returned
                 when discovery itself fails.
        """
        result = []
        try:
            from importlib.metadata import metadata
            discovered_plugins = self._import_discovered_plugins()
        except Exception as e:
            logger.error(e)
            return []

        for key in discovered_plugins:
            try:
                meta = metadata(key)
                result.append({row: meta[row] for row in meta})
            except Exception as e:
                logger.error("FAILED to load plugin %s ", key)
                logger.error(e)
        return result