event bus changed to async

This commit is contained in:
manuel83 2018-12-13 21:45:33 +01:00
parent a8060fff08
commit 5e701e6d61
21 changed files with 799 additions and 758 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,3 +1,5 @@
from abc import ABCMeta
__all__ = ["CBPiActor"]
import logging
@ -6,7 +8,13 @@ from core.api.extension import CBPiExtension
logger = logging.getLogger(__file__)
class CBPiActor(CBPiExtension):
class CBPiActor(CBPiExtension, metaclass=ABCMeta):
def init(self):
pass
def stop(self):
pass
def on(self, power):
'''
@ -35,3 +43,6 @@ class CBPiActor(CBPiExtension):
'''
pass
def reprJSON(self):
return dict(state=True)

View file

@ -1,7 +1,7 @@
import logging
import os
import sys
from core.utils.utils import load_config as load
__all__ = ["CBPiExtension"]
@ -37,6 +37,7 @@ class CBPiExtension():
super(CBPiExtension, self).__setattr__(name, value)
def load_config(self):
from core.utils.utils import load_config as load
path = os.path.dirname(sys.modules[self.__class__.__module__].__file__)
try:
return load("%s/config.yaml" % path)

View file

@ -1,3 +1,6 @@
import pprint
from asyncio import Future
import asyncio
from aiohttp import web
from core.api.actor import CBPiActor
@ -17,7 +20,11 @@ class ActorHttp(HttpAPI):
:return:
"""
id = int(request.match_info['id'])
self.cbpi.bus.fire(topic="actor/%s/switch/on" % id, id=id, power=99)
result = await self.cbpi.bus.fire2(topic="actor/%s/switch/on" % id, id=id, power=99)
print(result.timeout)
for key, value in result.results.items():
print(key, value.result)
return web.Response(status=204)
@ -28,7 +35,7 @@ class ActorHttp(HttpAPI):
:return:
"""
id = int(request.match_info['id'])
self.cbpi.bus.fire(topic="actor/%s/off" % id, id=id)
await self.cbpi.bus.fire(topic="actor/%s/off" % id, id=id)
return web.Response(status=204)
@request_mapping(path="/{id:\d+}/toggle", auth_required=False)
@ -39,7 +46,7 @@ class ActorHttp(HttpAPI):
"""
id = int(request.match_info['id'])
print("ID", id)
self.cbpi.bus.fire(topic="actor/%s/toggle" % id, id=id)
await self.cbpi.bus.fire(topic="actor/%s/toggle" % id, id=id)
return web.Response(status=204)
class ActorController(ActorHttp, CRUDController):
@ -58,16 +65,6 @@ class ActorController(ActorHttp, CRUDController):
self.types = {}
self.actors = {}
def register(self, name, clazz) -> None:
print("REGISTER", name)
if issubclass(clazz, CBPiActor):
print("ITS AN ACTOR")
parse_props(clazz)
self.types[name] = clazz
async def init(self):
'''
This method initializes all actors during startup. It creates actor instances
@ -76,25 +73,28 @@ class ActorController(ActorHttp, CRUDController):
'''
await super(ActorController, self).init()
for name, clazz in self.types.items():
print("Type", name)
for id, value in self.cache.items():
await self._init_actor(value)
if value.type in self.types:
cfg = value.config.copy()
async def _init_actor(self, actor):
if actor.type in self.types:
cfg = actor.config.copy()
cfg.update(dict(cbpi=self.cbpi, id=id, name=actor.name))
clazz = self.types[actor.type]["class"];
cfg.update(dict(cbpi=self.cbpi, id=id, name=value.name))
clazz = self.types[value.type]["class"];
self.cache[actor.id].instance = clazz(**cfg)
self.cache[actor.id].instance.init()
await self.cbpi.bus.fire(topic="actor/%s/initialized" % actor.id, id=actor.id)
self.cache[id].instance = clazz(**cfg)
print("gpIO", self.cache[id].instance, self.cache[id].instance.gpio)
async def _stop_actor(self, actor):
actor.instance.stop()
await self.cbpi.bus.fire(topic="actor/%s/stopped" % actor.id, id=actor.id)
@on_event(topic="actor/+/switch/on")
def on(self, id , power=100, **kwargs) -> None:
async def on(self, id , future: Future, power=100, **kwargs) -> None:
'''
Method to switch an actor on.
Supporting Event Topic "actor/+/on"
@ -109,11 +109,13 @@ class ActorController(ActorHttp, CRUDController):
if id in self.cache:
print("POWER ON")
actor = self.cache[id ].instance
self.cbpi.bus.fire("actor/%s/on/ok" % id)
await self.cbpi.bus.fire("actor/%s/on/ok" % id)
actor.on(power)
future.set_result("OK")
@on_event(topic="actor/+/toggle")
def toggle(self, id, power=100, **kwargs) -> None:
async def toggle(self, id, power=100, **kwargs) -> None:
'''
Method to toggle an actor on or off
Supporting Event Topic "actor/+/toggle"
@ -132,7 +134,7 @@ class ActorController(ActorHttp, CRUDController):
actor.on()
@on_event(topic="actor/+/off")
def off(self, id, **kwargs) -> None:
async def off(self, id, **kwargs) -> None:
"""
Method to switch and actor off
@ -147,3 +149,27 @@ class ActorController(ActorHttp, CRUDController):
if id in self.cache:
actor = self.cache[id].instance
actor.off()
async def _post_add_callback(self, m):
'''
:param m:
:return:
'''
await self._init_actor(m)
pass
async def _pre_delete_callback(self, actor_id):
if int(actor_id) not in self.cache:
return
if self.cache[int(actor_id)].instance is not None:
await self._stop_actor(self.cache[int(actor_id)])
async def _pre_update_callback(self, actor):
if actor.instance is not None:
await self._stop_actor(actor)
async def _post_update_callback(self, actor):
self._init_actor(actor)

View file

@ -1,5 +1,10 @@
import json
import pprint
from abc import abstractmethod,ABCMeta
from core.utils.encoder import ComplexEncoder
class CRUDController(metaclass=ABCMeta):
@ -61,8 +66,11 @@ class CRUDController(metaclass=ABCMeta):
'''
await self._pre_add_callback(data)
m = await self.model.insert(**data)
await self._post_add_callback(m)
self.cache[m.id] = m
await self._post_add_callback(m)
await self.cbpi.bus.fire(topic="actor/%s/added" % m.id, actor=m)
return m
async def _pre_update_callback(self, m):
@ -117,21 +125,29 @@ class CRUDController(metaclass=ABCMeta):
async def delete(self, id):
'''
:param id:
:return:
'''
await self._pre_delete_callback(id)
if id not in self.cache:
return
m = await self.model.delete(id)
await self._post_delete_callback(id)
try:
if self.caching is True:
del self.cache[id]
print("DELTE FROM ACHE")
del self.cache[int(id)]
except Exception as e:
print(e)
pass
#self.cbpi.push("DELETE_%s" % self.key, id)
pprint.pprint(self.cache)
await self.cbpi.bus.fire(topic="actor/%s/deleted" % id, id=id)
async def delete_all(self):
'''

View file

@ -72,11 +72,11 @@ class KettleController(CRUDController):
if kettle.logic is None:
return (False, "No Logic defined")
id = kettle.heater
self.cbpi.bus.fire(topic="kettle/%s/automatic" % id, id=id)
await self.cbpi.bus.fire(topic="kettle/%s/automatic" % id, id=id)
return (True, "Logic switched on switched")
@on_event(topic="job/done")
def job_stop(self, key, **kwargs) -> None:
async def job_stop(self, key, **kwargs) -> None:
match = re.match("kettle_logic_(\d+)", key)
if match is not None:
@ -136,7 +136,7 @@ class KettleController(CRUDController):
if kettle.heater is None:
return (False, "No Heater defined")
id = kettle.heater
self.cbpi.bus.fire(topic="actor/%s/on" % id, id=id, power=99)
await self.cbpi.bus.fire(topic="actor/%s/on" % id, id=id, power=99)
return (True,"Heater switched on")
async def heater_off(self, id):
@ -153,7 +153,7 @@ class KettleController(CRUDController):
if kettle.heater is None:
return (False, "No Heater defined")
id = kettle.heater
self.cbpi.bus.fire(topic="actor/%s/off" % id, id=id, power=99)
await self.cbpi.bus.fire(topic="actor/%s/off" % id, id=id, power=99)
return (True, "Heater switched off")
async def agitator_on(self, id):

View file

@ -17,5 +17,5 @@ class NotificationController():
@on_event(topic="notification/#")
def _on_event(self, key, message, type, **kwargs):
async def _on_event(self, key, message, type, **kwargs):
self.cbpi.ws.send("YES")

View file

@ -42,7 +42,7 @@ class StepController(HttpAPI, CRUDController):
:param request: web requset
:return: web.Response(text="OK"
'''
self.cbpi.bus.fire("step/action", action="test")
await self.cbpi.bus.fire("step/action", action="test")
return web.Response(text="OK")
@ -56,7 +56,7 @@ class StepController(HttpAPI, CRUDController):
:return:
'''
self.cbpi.bus.fire("step/start")
await self.cbpi.bus.fire("step/start")
return web.Response(text="OK")
@request_mapping(path="/reset", auth_required=False)
@ -67,7 +67,7 @@ class StepController(HttpAPI, CRUDController):
:param request:
:return:
'''
self.cbpi.bus.fire("step/reset")
await self.cbpi.bus.fire("step/reset")
return web.Response(text="OK")
@request_mapping(path="/next", auth_required=False)
@ -79,11 +79,11 @@ class StepController(HttpAPI, CRUDController):
:param request:
:return:
'''
self.cbpi.bus.fire("step/next")
await self.cbpi.bus.fire("step/next")
return web.Response(text="OK")
@on_event("step/action")
def handle_action(self, action, **kwargs):
async def handle_action(self, action, **kwargs):
'''
Event Handler for "step/action".
@ -99,7 +99,7 @@ class StepController(HttpAPI, CRUDController):
@on_event("step/next")
def handle_next(self, **kwargs):
async def handle_next(self, **kwargs):
'''
Event Handler for "step/next".
It start the next step
@ -114,7 +114,7 @@ class StepController(HttpAPI, CRUDController):
@on_event("step/start")
def handle_start(self, **kwargs):
async def handle_start(self, **kwargs):
'''
Event Handler for "step/start".
It starts the brewing process
@ -122,10 +122,10 @@ class StepController(HttpAPI, CRUDController):
:param kwargs:
:return: None
'''
self.start()
await self.start()
@on_event("step/reset")
def handle_reset(self, **kwargs):
async def handle_reset(self, **kwargs):
'''
Event Handler for "step/reset".
Resets the current step
@ -142,10 +142,10 @@ class StepController(HttpAPI, CRUDController):
self.steps[self.current_step.id]["state"] = None
self.current_step = None
self.current_task = None
self.start()
await self.start()
@on_event("step/stop")
def handle_stop(self, **kwargs):
async def handle_stop(self, **kwargs):
'''
Event Handler for "step/stop".
Stops the current step
@ -163,7 +163,7 @@ class StepController(HttpAPI, CRUDController):
self.current_step = None
@on_event("step/+/done")
def handle_done(self, topic, **kwargs):
async def handle_done(self, topic, **kwargs):
'''
Event Handler for "step/+/done".
@ -173,7 +173,7 @@ class StepController(HttpAPI, CRUDController):
:param kwargs:
:return:
'''
self.start()
await self.start()
def _step_done(self, task):
@ -181,7 +181,7 @@ class StepController(HttpAPI, CRUDController):
self.cache[self.current_step.id].state = "D"
step_id = self.current_step.id
self.current_step = None
self.cbpi.bus.fire("step/%s/done" % step_id)
self.cbpi.bus.sync_fire("step/%s/done" % step_id)
def _get_manged_fields_as_array(self, type_cfg):
print("tYPE", type_cfg)
@ -191,7 +191,7 @@ class StepController(HttpAPI, CRUDController):
return result
def start(self):
async def start(self):
'''
Start the first step
@ -215,7 +215,7 @@ class StepController(HttpAPI, CRUDController):
open_step = True
break
if open_step == False:
self.cbpi.bus.fire("step/berwing/finished")
await self.cbpi.bus.fire("step/berwing/finished")
async def stop(self):
pass

View file

@ -222,7 +222,7 @@ class CraftBeerPi():
:param type: notification type (info,warning,danger,successs)
:return:
'''
self.bus.fire(topic="notification/%s" % key, key=key, message=message, type=type)
self.bus.sync_fire(topic="notification/%s" % key, key=key, message=message, type=type)
def setup(self):

View file

@ -7,6 +7,8 @@ class ActorModel(DBModel):
__json_fields__ = ["config"]
class SensorModel(DBModel):
__fields__ = ["name", "type", "config"]
__table_name__ = "sensor"

View file

@ -3,6 +3,9 @@ import inspect
import logging
import json
import time
class EventBus(object):
class Node(object):
__slots__ = '_children', '_content'
@ -12,15 +15,37 @@ class EventBus(object):
self._content = None
class Content(object):
def __init__(self, parent, topic, method, once):
def __init__(self, parent, topic, method, once, supports_future=False):
self.parent = parent
self.method = method
self.name = method.__name__
self.once = once
self.topic = topic
self.supports_future = supports_future
class Result():
def __init__(self, result, timeout):
self.result = result
self.timeout = timeout
class ResultContainer():
def __init__(self, results, timeout=False):
self.results = {}
self.timeout = timeout
for key, value in results.items():
if value.done() is True:
self.results[key] = EventBus.Result(value.result(), True)
else:
self.results[key] = EventBus.Result(None, False)
def register(self, topic, method, once=False):
if method in self.registry:
raise RuntimeError("Method %s already registerd. Please unregister first!" % method.__name__)
self.logger.info("Topic %s", topic)
@ -31,7 +56,14 @@ class EventBus(object):
if not isinstance(node._content, list):
node._content = []
c = self.Content(node, topic, method, once)
sig = inspect.signature(method)
if "future" in sig.parameters:
supports_future = True
else:
supports_future = False
c = self.Content(node, topic, method, once, supports_future)
node._content.append(c)
self.registry[method] = c
@ -71,29 +103,36 @@ class EventBus(object):
print(self.loop)
def fire(self, topic: str, **kwargs) -> None:
self.logger.info("EMIT EVENT %s Data: %s", topic, kwargs)
def sync_fire(self,topic: str,timeout=1, **kwargs):
self.loop.create_task(self.fire(topic=topic, timeout=timeout, **kwargs))
async def fire(self, topic: str, timeout=1, **kwargs):
futures = {}
async def wait(futures):
if(len(futures) > 0):
await asyncio.wait(futures.values())
#self.cbpi.ws.send(json.dumps(dict(topic=topic, data=dict(**kwargs))))
trx = dict(i=0)
for e in self.iter_match(topic):
content_array = e
keep_idx = []
for idx, content_obj in enumerate(content_array):
if inspect.iscoroutinefunction(content_obj.method):
if hasattr(content_obj.method, "future"):
if content_obj.supports_future is True:
fut = self.loop.create_future()
futures["%s.%s" % (content_obj.method.__module__, content_obj.name)] = fut
self.loop.create_task(content_obj.method(**kwargs, topic = topic, future=fut))
self.loop.create_task(content_obj.method(**kwargs, future=content_obj.method.future, topic=topic))
else:
self.loop.create_task(content_obj.method(**kwargs, topic = topic))
self.loop.create_task(content_obj.method(**kwargs, topic=topic))
else:
if hasattr(content_obj.method, "future"):
content_obj.method(**kwargs, future=content_obj.method.future, topic=topic)
else:
content_obj.method(**kwargs, topic = topic)
# only asnyc
pass
if content_obj.once is False:
keep_idx.append(idx)
@ -101,6 +140,13 @@ class EventBus(object):
if len(keep_idx) < len(e):
e[0].parent._content = [e[0].parent._content[i] for i in keep_idx]
if timeout is not None:
try:
await asyncio.wait_for(wait(futures), timeout=timeout)
is_timedout = False
except asyncio.TimeoutError:
is_timedout = True
return self.ResultContainer(futures, is_timedout)
def dump(self):

View file

@ -31,14 +31,15 @@ class MyComp(CBPiExtension, CRUDController, HttpAPI):
@on_event(topic="actor/#")
def listen(self, **kwargs):
async def listen(self, **kwargs):
print("Test", kwargs)
@on_event(topic="kettle/+/automatic")
def listen2(self, **kwargs):
async def listen2(self, **kwargs):
print("HANDLE AUTOMATIC", kwargs)
self.cbpi.bus.fire(topic="actor/%s/toggle" % 1, id=1)
await self.cbpi.bus.fire(topic="actor/%s/toggle" % 1, id=1)
def setup(cbpi):

View file

@ -8,6 +8,12 @@ class CustomActor(CBPiActor):
# Custom property which can be configured by the user
gpio = Property.Number(label="Test")
def init(self):
print("#########INIT MY CUSTOM ACTOR")
def stop(self):
print("#########STOP MY CUSTOM ACTOR")
@action(key="name", parameters={})
def myAction(self):

View file

@ -37,7 +37,7 @@ class CustomSensor(CBPiSensor):
await asyncio.sleep(self.interval)
self.value = self.value + 1
cbpi.bus.fire("sensor/%s" % self.id, value=self.value)
await cbpi.bus.fire("sensor/%s" % self.id, value=self.value)
print("SENSOR IS RUNNING", self.value)

View file

@ -130,4 +130,4 @@ class HttpAPI():
"""
id = request.match_info['id']
await self.delete(id)
return web.Response(str=204)
return web.Response(status=204)

View file

@ -114,7 +114,7 @@ class Scheduler(*bases):
def _done(self, job):
print("JOB DONE")
self.cbpi.bus.fire("job/done", key=job.name)
self.cbpi.bus.sync_fire("job/done", key=job.name)
self._jobs.discard(job)
if not self.pending_count:
return

View file

@ -10,13 +10,14 @@ class ComplexEncoder(JSONEncoder):
from core.database.model import ActorModel
from core.database.orm_framework import DBModel
from core.api.kettle_logic import CBPiKettleLogic
print("OBJECT", obj)
try:
if isinstance(obj, DBModel):
return obj.__dict__
elif isinstance(obj, ActorModel):
return None
elif callable(getattr(obj, "reprJSON")):
return obj.reprJSON()
#elif isinstance(obj, ActorModel):
# return None
elif hasattr(obj, "callback"):
return obj()
else:

View file

@ -21,8 +21,9 @@ class WebSocket:
async def listen(self, topic, **kwargs):
print("WS", topic)
self.send(json.dumps(dict(topic=topic, data=dict(**kwargs))))
from core.utils.encoder import ComplexEncoder
self.send(json.dumps(dict(topic=topic, data=dict(**kwargs)),skipkeys=True, check_circular=True, cls=ComplexEncoder))
def send(self, data):
@ -64,7 +65,7 @@ class WebSocket:
else:
msg_obj = msg.json()
self.cbpi.bus.fire(msg_obj["topic"], id=1, power=22)
await self.cbpi.bus.fire(msg_obj["topic"], id=1, power=22)
# await self.fire(msg_obj["key"], ws, msg)
# await ws.send_str(msg.data)

Binary file not shown.

View file

@ -36,6 +36,8 @@ Here an example how listen on an event.
It's imporante to add **kwargs as parameter to the listening method. This makes sure that maybe addtional event paramenter are not causing an exception.
A list of all registered events listeners can be found under: `http://<IP_ADDRESS>:<PORT>/system/events`
HTTP Endpoints
--------------
@ -80,6 +82,13 @@ The WebSocket Event is having the following structure.
SQL Files
---------
Currently only one SQL file for database initialisation is available.
It's located under: `./core/sql`
Web User Interface
^^^^^^^^^^^^^^^^^^
The Web UI is based on ReactJS + Redux.
@ -98,3 +107,8 @@ After server startup you can find the API documentaiton under: `http://<IP_ADDRE
To generate the swagger file `aiohttp-swagger` is used. for more information see: https://aiohttp-swagger.readthedocs.io/en/latest/
Custom Extensions & Pluins
^^^^^^^^^^^^^^^^^^^^^^^^^^
Custom Extension should be placed under `./core/extensions`

View file

@ -14,6 +14,15 @@ class KettleTestCase(AioHTTPTestCase):
@unittest_run_loop
async def test_example(self):
await asyncio.sleep(10)
for i in range(100):
resp = await self.client.request("GET", "/actor/")
print(resp)
resp = await self.client.post(path="/actor/", json={ "name": "Test", "type": "CustomActor", "config": {"gpio": 22 }})
print(resp)
'''
result = await self.cbpi.kettle.toggle_automtic(1)
print("#### RESULT", result)
assert result[0] is True
@ -28,3 +37,4 @@ class KettleTestCase(AioHTTPTestCase):
await asyncio.sleep(5)
#assert await self.cbpi.kettle.toggle_automtic(1) is True
#assert await self.cbpi.kettle.toggle_automtic(99) is False
'''