changes in plugin controller

This commit is contained in:
manuel83 2019-07-31 07:58:54 +02:00
parent 8ba6b4c506
commit 8db6251fb7
22 changed files with 1028 additions and 807 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,3 +1,7 @@
from functools import wraps
from voluptuous import Schema
__all__ = ["request_mapping", "on_startup", "on_event", "on_mqtt_message", "on_websocket_message", "action", "background_task"]
from aiohttp_auth import auth
@ -9,7 +13,7 @@ def composed(*decs):
return f
return deco
def request_mapping(path, name=None, method="GET", auth_required=True):
def request_mapping(path, name=None, method="GET", auth_required=True, json_schema=None):
def on_http_request(path, name=None):
def real_decorator(func):
@ -21,14 +25,34 @@ def request_mapping(path, name=None, method="GET", auth_required=True):
return real_decorator
def validate_json_body(func):
@wraps(func)
async def wrapper(*args):
if json_schema is not None:
data = await args[-1].json()
schema = Schema(json_schema)
schema(data)
return await func(*args)
return wrapper
if auth_required is True:
return composed(
on_http_request(path, name),
auth.auth_required
on_http_request(path, name),
auth.auth_required,
validate_json_body
)
else:
return composed(
on_http_request(path, name)
on_http_request(path, name),
validate_json_body
)
def on_websocket_message(path, name=None):

View file

@ -1,19 +1,15 @@
import asyncio
import logging
import os
from importlib import import_module
import datetime
import aiohttp
import yaml
from aiohttp import web
from cbpi.api import *
from cbpi.utils.utils import load_config, json_dumps
logger = logging.getLogger(__name__)
import subprocess
import sys
from cbpi.api import *
from cbpi.utils.utils import load_config
logger = logging.getLogger(__name__)
class PluginController():
modules = {}
@ -21,53 +17,87 @@ class PluginController():
def __init__(self, cbpi):
self.cbpi = cbpi
self.cbpi.register(self, "/plugin")
@classmethod
self.plugins = {}
self.plugins = load_config("./config/plugin_list.txt")
async def load_plugin_list(self):
async with aiohttp.ClientSession() as session:
async with session.get('https://raw.githubusercontent.com/Manuel83/craftbeerpi-plugins/master/plugins_v4.yaml') as resp:
async with session.get('http://localhost:2202/list') as resp:
if (resp.status == 200):
data = yaml.load(await resp.text())
self.plugins = data
return data
def installed_plugins(self):
return self.plugins
async def install(self, package_name):
async def install(cbpi, plugins, package_name):
data = subprocess.check_output([sys.executable, "-m", "pip", "install", package_name])
data = data.decode('UTF-8')
if package_name not in self.plugins:
now = datetime.datetime.now()
self.plugins[package_name] = dict(version="1.0", installation_date=now.strftime("%Y-%m-%d %H:%M:%S"))
with open('./config/plugin_list.txt', 'w') as outfile:
yaml.dump(self.plugins, outfile, default_flow_style=False)
if data.startswith('Requirement already satisfied'):
self.cbpi.notify(key="p", message="Plugin already installed ", type="warning")
else:
self.cbpi.notify(key="p", message="Plugin installed ", type="success")
async with aiohttp.ClientSession() as session:
async with session.get('http://localhost:2202/get/%s' % package_name) as resp:
if (resp.status == 200):
data = await resp.json()
await self.cbpi.job.start_job(install(self.cbpi, self.plugins, data["package_name"]), data["package_name"], "plugins_install")
return True
else:
self.cbpi.notify(key="p", message="Failed to install Plugin %s " % package_name, type="danger")
return False
async def uninstall(self, package_name):
async def uninstall(cbpi, plugins, package_name):
print("try to uninstall", package_name)
try:
data = subprocess.check_output([sys.executable, "-m", "pip", "uninstall", "-y", package_name])
data = data.decode('UTF-8')
if data.startswith("Successfully uninstalled"):
cbpi.notify(key="p", message="Plugin %s Uninstalled" % package_name, type="success")
else:
cbpi.notify(key="p", message=data, type="success")
except Exception as e:
print(e)
if package_name in self.plugins:
print("Uninstall", self.plugins[package_name])
await self.cbpi.job.start_job(uninstall(self.cbpi, self.plugins, package_name), package_name, "plugins_uninstall")
def load_plugins(self):
this_directory = os.path.dirname(__file__)
for filename in os.listdir(os.path.join(this_directory, "../extension")):
if os.path.isdir(os.path.join(this_directory, "../extension/") + filename) is False or filename == "__pycache__":
continue
try:
logger.info("Trying to load plugin %s" % filename)
data = load_config(os.path.join(this_directory, "../extension/%s/config.yaml" % filename))
if (data.get("version") == 4):
self.modules[filename] = import_module("cbpi.extension.%s" % (filename))
self.modules[filename].setup(self.cbpi)
logger.info("Plugin %s loaded successful" % filename)
else:
logger.warning("Plugin %s is not supporting version 4" % filename)
except Exception as e:
print(e)
logger.error(e)
def load_plugins_from_evn(self):
plugins = []
this_directory = os.path.dirname(__file__)
with open("./config/plugin_list.txt") as f:
plugins = f.read().splitlines()
plugins = list(set(plugins))
for p in plugins:
for p in self.plugins:
logger.debug("Load Plugin %s" % p)
try:
logger.info("Try to load plugin: %s " % p)
@ -79,53 +109,6 @@ class PluginController():
logger.error("FAILED to load plugin %s " % p)
logger.error(e)
@on_event("job/plugins_install/done")
async def done(self, **kwargs):
self.cbpi.notify(key="p", message="Plugin installed ", type="success")
print("DONE INSTALL PLUGIN", kwargs)
@request_mapping(path="/install", method="GET", auth_required=False)
async def install_plugin(self, request):
"""
---
description: Install Plugin
tags:
- Plugin
produces:
- application/json
responses:
"204":
description: successful operation. Return "pong" text
"405":
description: invalid HTTP Method
"""
async def install(name):
await asyncio.sleep(5)
subprocess.call([sys.executable, "-m", "pip", "install", name])
print("OK")
await self.cbpi.job.start_job(install('requests'), "requests", "plugins_install")
return web.Response(status=204)
@request_mapping(path="/list", method="GET", auth_required=False)
async def get_plugins(self, request):
"""
---
description: Get a list of avialable plugins
tags:
- Plugin
produces:
- application/json
responses:
"200":
description: successful operation. Return "pong" text
"405":
description: invalid HTTP Method
"""
return web.json_response(await self.load_plugin_list(), dumps=json_dumps)
def register(self, name, clazz) -> None:
'''
Register a new actor type

View file

@ -1,9 +1,11 @@
import json
import logging
import time
from cbpi.api import *
from cbpi.controller.crud_controller import CRUDController
from cbpi.database.model import StepModel
from utils.encoder import ComplexEncoder
class StepController(CRUDController):
@ -31,20 +33,7 @@ class StepController(CRUDController):
else:
return False
@on_event("step/action")
async def handle_action(self, action, **kwargs):
'''
Event Handler for "step/action".
It invokes the provided method name on the current step
:param action: the method name which will be invoked
:param kwargs:
:return: None
'''
if self.current_step is not None:
self.current_step.__getattribute__(action)()
def _get_manged_fields_as_array(self, type_cfg):
@ -106,7 +95,7 @@ class StepController(CRUDController):
self.current_step = next_step
# start the step job
self.current_job = await self.cbpi.job.start_job(self.current_step.instance.run(), next_step.name, "step")
await self.cbpi.bus.fire("step/%s/started" % self.current_step.id)
await self.cbpi.bus.fire("step/%s/started" % self.current_step.id, step=next_step)
else:
await self.cbpi.bus.fire("step/brewing/finished")
else:
@ -132,6 +121,7 @@ class StepController(CRUDController):
self.current_step.state = "D"
await self.model.update_state(self.current_step.id, "D", int(time.time()))
await self.cbpi.bus.fire("step/%s/done" % self.current_step.id, step=self.current_step)
self.current_step = None
# start the next step
@ -197,3 +187,10 @@ class StepController(CRUDController):
async def get_state(self):
return dict(items=await self.get_all(),types=self.types,is_running=self.is_running(),current_step=self.current_step)
@on_event(topic="step/action")
async def call_action(self, name, parameter, **kwargs) -> None:
print(name, parameter)
if self.current_step is not None:
self.current_step.instance.__getattribute__(name)(**parameter)

View file

@ -1,6 +1,7 @@
import datetime
import re
import aiohttp
from aiohttp import web
import os
from aiojobs.aiohttp import get_scheduler_from_app
@ -15,7 +16,20 @@ class SystemController():
def __init__(self, cbpi):
self.cbpi = cbpi
self.service = cbpi.actor
self.cbpi.register(self, "/system")
self.cbpi.app.on_startup.append(self.check_for_update)
async def check_for_update(self, app):
timeout = aiohttp.ClientTimeout(total=1)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post('http://localhost:2202/check', json=dict(version=app["cbpi"].version)) as resp:
if (resp.status == 200):
data = await resp.json()
print(data)
@request_mapping("/", method="GET", auth_required=False)
async def state(self, request):

View file

@ -32,6 +32,7 @@ from cbpi.http_endpoints.http_sensor import SensorHttpEndpoints
from cbpi.http_endpoints.http_step import StepHttpEndpoints
from cbpi.controller.translation_controller import TranslationController
from cbpi.http_endpoints.http_translation import TranslationHttpEndpoint
from http_endpoints.http_plugin import PluginHttpEndpoints
logger = logging.getLogger(__name__)
@ -61,6 +62,8 @@ class CraftBeerPi():
def __init__(self):
self.version = "4.0.0.1"
self.static_config = load_config("./config/config.yaml")
self.database_file = "./craftbeerpi.db"
logger.info("Init CraftBeerPI")
@ -68,6 +71,7 @@ class CraftBeerPi():
policy = auth.SessionTktAuthentication(urandom(32), 60, include_ip=True)
middlewares = [web.normalize_path_middleware(), session_middleware(EncryptedCookieStorage(urandom(32))), auth.auth_middleware(policy), error_middleware]
self.app = web.Application(middlewares=middlewares)
self.app["cbpi"] = self
self._setup_shutdownhook()
self.initializer = []
@ -94,7 +98,7 @@ class CraftBeerPi():
self.http_kettle = KettleHttpEndpoints(self)
self.http_dashboard = DashBoardHttpEndpoints(self)
self.http_translation = TranslationHttpEndpoint(self)
self.http_plugin = PluginHttpEndpoints(self)
self.notification = NotificationController(self)
self.login = Login(self)
@ -125,7 +129,7 @@ class CraftBeerPi():
'''
self.register_http_endpoints(obj, url_prefix, static)
self.bus.register_object(obj)
#self.ws.register_object(obj)
#self.ws.register_object(obj)
self.job.register_background_task(obj)
self.register_on_startup(obj)

View file

@ -82,7 +82,7 @@ class HTTPSensor(CBPiSensor):
try:
value = cache.pop(self.key, None)
print("HTTP SENSOR READ", value)
if value is not None:
self.log_data(value)
await cbpi.bus.fire("sensor/%s/data" % self.id, value=value)

View file

@ -0,0 +1,36 @@
import json
from cbpi.utils.encoder import ComplexEncoder
from hbmqtt.mqtt.constants import QOS_0
from hbmqtt.client import MQTTClient
class CBPiMqttClient:
def __init__(self, cbpi):
self.cbpi = cbpi
self.cbpi.bus.register("#", self.listen)
self.client = None
self.cbpi.app.on_startup.append(self.init_client)
async def init_client(self, cbpi):
self.client = MQTTClient()
await self.client.connect('mqtt://localhost:1883')
async def listen(self, topic, **kwargs):
if self.client is not None:
print(topic, kwargs)
await self.client.publish(topic, str.encode(json.dumps(kwargs, cls=ComplexEncoder)), QOS_0)
def setup(cbpi):
'''
This method is called by the server during startup
Here you need to register your plugins at the server
:param cbpi: the cbpi core
:return:
'''
print("MQTT REGISTER-------------")
c = CBPiMqttClient(cbpi)

View file

@ -0,0 +1,2 @@
name: MQTT
version: 4.1

View file

@ -72,7 +72,7 @@ class ActorHttpEndpoints(HttpCrudEndpoints):
- in: body
name: body
description: Created an actor
required: false
required: true
schema:
type: object
properties:

View file

@ -9,7 +9,6 @@ class Login():
def __init__(self,cbpi):
self.cbpi = cbpi
self.cbpi.register(self, url_prefix="/")
self.db = {cbpi.static_config.get("username", "cbpi"): cbpi.static_config.get("password", "cbpi")}
@request_mapping(path="/logout", name="Logout", method="GET", auth_required=True)

View file

@ -0,0 +1,86 @@
from aiohttp import web
from api import request_mapping
from utils import json_dumps
class PluginHttpEndpoints:
def __init__(self,cbpi):
self.cbpi = cbpi
self.cbpi.register(self, url_prefix="/plugin")
@request_mapping(path="/install/", method="POST", auth_required=False, json_schema={"package_name": str})
async def install(self, request):
"""
---
description: Install Plugin
tags:
- Plugin
parameters:
- in: body
name: body
description: Install a plugin
required: true
schema:
type: object
properties:
package_name:
type: string
produces:
- application/json
responses:
"204":
description: successful operation. Return "pong" text
"405":
description: invalid HTTP Method
"""
data = await request.json()
return web.Response(status=204) if await self.cbpi.plugin.install(data["package_name"]) is True else web.Response(status=500)
@request_mapping(path="/uninstall", method="POST", auth_required=False, json_schema={"package_name": str})
async def uninstall(self, request):
"""
---
description: Uninstall Plugin
tags:
- Plugin
parameters:
- in: body
name: body
description: Uninstall a plugin
required: true
schema:
type: object
properties:
package_name:
type: string
produces:
- application/json
responses:
"204":
description: successful operation. Return "pong" text
"405":
description: invalid HTTP Method
"""
data = await request.json()
return web.Response(status=204) if await self.cbpi.plugin.uninstall(data["package_name"]) is True else web.Response(status=500)
@request_mapping(path="/list", method="GET", auth_required=False)
async def list(self, request):
"""
---
description: Get a list of avialable plugins
tags:
- Plugin
produces:
- application/json
responses:
"200":
description: successful operation. Return "pong" text
"405":
description: invalid HTTP Method
"""
return web.json_response(await self.cbpi.plugin.load_plugin_list(), dumps=json_dumps)

View file

@ -159,18 +159,35 @@ class StepHttpEndpoints(HttpCrudEndpoints):
return web.Response(status=204)
@request_mapping(path="/action", auth_required=False)
@request_mapping(path="/action", method="POST", auth_required=False, json_schema={"action": str, "parameter": dict})
async def http_action(self, request):
"""
---
description: Call step action
tags:
- Step
responses:
"204":
description: successful operation
"""
await self.cbpi.bus.fire("step/action", action="test")
---
description: Call Step Action
tags:
- Step
parameters:
- in: body
name: body
description: Step Action
required: true
schema:
type: object
properties:
action:
type: string
parameter:
type: object
produces:
- application/json
responses:
"204":
description: successful operation
"405":
description: invalid HTTP Method
"""
data = await request.json()
await self.cbpi.bus.fire("step/action", name=data["action"], parameter=data["parameter"])
return web.Response(text="OK")
@request_mapping(path="/start", auth_required=False)

View file

View file

@ -1,89 +0,0 @@
from aiojobs.aiohttp import get_scheduler_from_app
from cbpi.mqtt_matcher import MQTTMatcher
from hbmqtt.broker import Broker
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_1, QOS_0
from typing import Callable
class MQTT():
def __init__(self,cbpi):
self.config = {
'listeners': {
'default': {
'type': 'tcp',
'bind': '0.0.0.0:1885',
},
'ws': {
'bind': '0.0.0.0:8081',
'type': 'ws'
}
},
'sys_interval': 10,
'topic-check': {
'enabled': True,
'plugins': [
'topic_taboo'
]
},
'auth': {
'allow-anonymous': True,
'password-file': '/Users/manuelfritsch/github/aio_sample.cbpi/user.txt'
}
}
self.cbpi = cbpi
self.broker = Broker(self.config, plugin_namespace="hbmqtt.broker.plugins")
self.client = MQTTClient()
self.matcher = MQTTMatcher()
self.mqtt_methods = {"test": self.ok_msg, "$SYS/broker/#": self.sysmsg}
self.cbpi.app.on_startup.append(self.start_broker)
self.count = 0
def sysmsg(self, msg):
pass
def ok_msg(self, msg):
self.count = self.count + 1
def publish(self, topic, message):
self.cbpi.app.loop.create_task(self.client.publish(topic, str.encode(message), QOS_0))
def register_callback(self, func: Callable, topic) -> None:
self.mqtt_methods[topic] = func
async def on_message(self):
while True:
message = await self.client.deliver_message()
matched = False
packet = message.publish_packet
#print(message.topic.split('/'))
data = packet.payload.data.decode("utf-8")
for callback in self.matcher.iter_match(message.topic):
callback(data)
matched = True
if matched == False:
print("NO HANDLER", data)
async def start_broker(self, app):
await self.broker.start()
#
await self.client.connect('mqtt://username:manuel@localhost:1885')
# await self.client.connect('mqtt://broker.hivemq.com:1883')
for k, v in self.mqtt_methods.items():
await self.client.subscribe([(k, QOS_1)])
self.matcher[k] = v
await get_scheduler_from_app(app).spawn(self.on_message())

View file

@ -1,167 +0,0 @@
class MQTTMatcher(object):
class Node(object):
__slots__ = '_children', '_content'
def __init__(self):
self._children = {}
self._content = None
def register(self, key, value):
node = self._root
for sym in key.split('/'):
node = node._children.setdefault(sym, self.Node())
if not isinstance(node._content, list):
node._content = []
node._content.append(value)
def get_callbacks(self, key):
try:
node = self._root
for sym in key.split('/'):
node = node._children[sym]
if node._content is None:
raise KeyError(key)
return node._content
except KeyError:
raise KeyError(key)
def unregister(self, key, method=None):
lst = []
try:
parent, node = None, self._root
for k in key.split('/'):
parent, node = node, node._children[k]
lst.append((parent, k, node))
# TODO
if method is not None:
node._content = None
else:
node._content = None
except KeyError:
raise KeyError(key)
else: # cleanup
for parent, k, node in reversed(lst):
if node._children or node._content is not None:
break
del parent._children[k]
def __init__(self):
self._root = self.Node()
def __setitem__(self, key, value):
node = self._root
for sym in key.split('/'):
node = node._children.setdefault(sym, self.Node())
if not isinstance(node._content, list):
#print("new array")
node._content = []
node._content.append(value)
#node._content = value
def __getitem__(self, key):
try:
node = self._root
for sym in key.split('/'):
node = node._children[sym]
if node._content is None:
raise KeyError(key)
return node._content
except KeyError:
raise KeyError(key)
'''
def __delitem__(self, thekey):
print("DELETE")
if isinstance(thekey, tuple):
key = thekey[1]
methods = thekey[0]
print(methods.__module__, methods.__name__)
else:
methods = None
key = thekey
lst = []
try:
parent, node = None, self._root
for k in key.split('/'):
parent, node = node, node._children[k]
lst.append((parent, k, node))
# TODO
print(node._content)
if methods is not None:
node._content = None
else:
node._content = None
except KeyError:
raise KeyError(key)
else: # cleanup
for parent, k, node in reversed(lst):
if node._children or node._content is not None:
break
del parent._children[k]
'''
def iter_match(self, topic):
lst = topic.split('/')
normal = not topic.startswith('$')
def rec(node, i=0):
if i == len(lst):
if node._content is not None:
yield node._content
else:
part = lst[i]
if part in node._children:
for content in rec(node._children[part], i + 1):
yield content
if '+' in node._children and (normal or i > 0):
for content in rec(node._children['+'], i + 1):
yield content
if '#' in node._children and (normal or i > 0):
content = node._children['#']._content
if content is not None:
yield content
return rec(self._root)
if __name__ == "__main__":
m = MQTTMatcher()
def test_name():
print("actor/1/on")
def test_name2():
print("actor/2/on")
def test_name3():
print("actor/#")
def test_name4():
print("actor/+/on")
m.register("actor/1/on", test_name)
m.register("actor/1/on", test_name)
m.register("actor/1/on", test_name)
m.unregister("actor/1/on")
for methods in m.iter_match("actor/1/on"):
for f in methods:
f()

View file

@ -0,0 +1,14 @@
listeners:
default:
type: tcp
bind: 0.0.0.0:1883
sys_interval: 20
auth:
allow-anonymous: true
plugins:
- auth_file
- auth_anonymous
topic-check:
enabled': True
plugins':
- topic_taboo

View file

@ -0,0 +1,13 @@
SampleActor:
description: A sample Actor for CraftBeerPi
api: 4.0
author: CraftBeerPi11
pip: requests
repo_url: https://github.com/craftbeerpi/sample_actor
SampleActor2:
description: A sample Actor2 for CraftBeerPi
api: 4.0
author: CraftBeerPi
pip: requests
repo_url: https://github.com/craftbeerpi/sample_actor

65
cbpi_cloud/run.py Normal file
View file

@ -0,0 +1,65 @@
import yaml
from aiohttp import web
def load_yaml():
try:
with open('./repo/plugins.yaml', 'rt') as f:
data = yaml.load(f)
return data
except Exception as e:
print(e)
pass
data = load_yaml()
for k, v in data.items():
del v["pip"]
data2 = load_yaml()
async def check(request):
peername = request.transport.get_extra_info('peername')
if peername is not None:
host, port = peername
print(host, port)
data = await request.json()
print(data)
return web.json_response(data=dict(latestversion="4.0.0.3"))
async def reload_yaml(request):
global data, data2
file = load_yaml()
for k, v in file.items():
del v["pip"]
data = file
data2 = load_yaml()
return web.json_response(data=data2)
async def get_list(request):
print("Request List")
return web.json_response(data=data)
async def get_package_name(request):
print("Request Package")
name = request.match_info.get('plugin_name', None)
if name in data2:
package_name = data2[name]["pip"]
else:
package_name = None
return web.json_response(data=dict(package_name=package_name))
app = web.Application()
app.add_routes([
web.get('/list', get_list),
web.post('/check', check),
web.get('/reload', reload_yaml),
web.get('/get/{plugin_name}', get_package_name)])
web.run_app(app, port=2202)

View file

@ -1,2 +1,13 @@
cbpi-actor
cbpi-ui
SampleActor:
api: 4.0
author: CraftBeerPi11
description: A sample Actor for CraftBeerPi
repo_url: https://github.com/craftbeerpi/sample_actor
SampleActor2:
api: 4.0
author: CraftBeerPi
description: A sample Actor2 for CraftBeerPi
repo_url: https://github.com/craftbeerpi/sample_actor
requests:
installation_date: '2019-07-29 23:02:25'
version: '1.0'

Binary file not shown.

View file

@ -1,13 +1,10 @@
import asyncio
from unittest import mock
from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop
from cbpi.craftbeerpi import CraftBeerPi
class StepTestCase(AioHTTPTestCase):
async def get_application(self):
self.cbpi = CraftBeerPi()
await self.cbpi.init_serivces()
@ -71,14 +68,11 @@ class StepTestCase(AioHTTPTestCase):
if future in done:
pass
@unittest_run_loop
async def test_process(self):
step_ctlr = self.cbpi.step
await step_ctlr.clear_all()
await step_ctlr.add(**{"name": "Kettle1", "type": "CustomStepCBPi", "config": {"name1": "1", "temp": 99}})
await step_ctlr.add(**{"name": "Kettle1", "type": "CustomStepCBPi", "config": {"name1": "1", "temp": 99}})
@ -97,11 +91,9 @@ class StepTestCase(AioHTTPTestCase):
await self.print_steps()
async def print_steps(self):
s = await self.cbpi.step.get_all()
print(s)
for k, v in s.items():
print(k, v.to_json())
print(k, v.to_json())