diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index 432355e..5dd97e0 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -2,7 +2,27 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -12,10 +32,10 @@
-
-
-
-
+
+
+
+
@@ -75,105 +95,91 @@
-
-
+
+
-
-
+
+
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
-
+
-
-
+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
@@ -190,36 +196,36 @@
- _done
- _jobs
- bus
- futur
- gtt
- WOOOOHOOO111111
- DONE
- ONNNNN
- ###### ON 22
- ###### ON 2
- ###### ON
- ###### ON
- ###### O
- wait
- emit
- register
- step
+ *bus.fire(
+ a*bus.fire(
+ as*bus.fire(
+ asyc*bus.fire(
+ asy*bus.fire(
+ async*bus.fire(
+ async *bus.fire(
+ async * bus.fire(
+ async bus.fire(
+ async bus.fire(
+ async. bus.fire(
+ async.* bus.fire(
+ async.*bus.fire(
+ bus.fire(
+ @
+ @on
+ @on_
+ @on_e
+
+ c
cbp
- ######
- l
- logger
- logging
- dummy
- /dumy
- _run
- steps
- StepBase
- theme
- handle(
- load_plugin_list
+ cbpi.
+ cbpi.b
+ cbpi.bu
+ cbpi.bus
+ cbpi.bus.
+ cbpi.bus.f
+ cbpi.bus.fi
+ cbpi.bus.fire
+ cbpi.bus.fire
@@ -229,6 +235,7 @@
$PROJECT_DIR$
$PROJECT_DIR$/core/controller
+ $PROJECT_DIR$/core/api
$PROJECT_DIR$/core
@@ -238,9 +245,7 @@
@@ -299,10 +306,10 @@
DEFINITION_ORDER
-
-
-
-
+
+
+
+
@@ -326,8 +333,30 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -337,11 +366,19 @@
-
+
+
+
+
+
+
+
+
+
-
+
@@ -350,16 +387,42 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
-
+
+
-
+
+
+
+
+
@@ -369,19 +432,15 @@
-
-
+
+
-
-
+
+
-
-
-
-
-
+
@@ -391,15 +450,19 @@
-
+
+
+
+
+
+
+
+
+
-
-
-
-
-
+
@@ -409,33 +472,79 @@
-
+
+
+
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
+
-
+
@@ -448,14 +557,6 @@
-
-
-
-
-
-
-
-
@@ -470,110 +571,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -593,32 +590,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -667,38 +638,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -1109,9 +1048,9 @@
-
-
-
+
+
+
@@ -1144,36 +1083,36 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1185,17 +1124,18 @@
-
+
+
-
+
-
-
-
-
+
+
+
+
@@ -1250,247 +1190,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -1558,25 +1257,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -1605,16 +1285,6 @@
-
-
-
-
-
-
-
-
-
-
@@ -1639,16 +1309,6 @@
-
-
-
-
-
-
-
-
-
-
@@ -1657,16 +1317,6 @@
-
-
-
-
-
-
-
-
-
-
@@ -1679,16 +1329,6 @@
-
-
-
-
-
-
-
-
-
-
@@ -1701,14 +1341,6 @@
-
-
-
-
-
-
-
-
@@ -1717,6 +1349,40 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1725,31 +1391,271 @@
-
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/core/api/actor.py b/core/api/actor.py
index 7ee190a..4d44790 100644
--- a/core/api/actor.py
+++ b/core/api/actor.py
@@ -1,3 +1,5 @@
+from abc import ABCMeta
+
__all__ = ["CBPiActor"]
import logging
@@ -6,7 +8,13 @@ from core.api.extension import CBPiExtension
logger = logging.getLogger(__file__)
-class CBPiActor(CBPiExtension):
+class CBPiActor(CBPiExtension, metaclass=ABCMeta):
+
+ def init(self):
+ pass
+
+ def stop(self):
+ pass
def on(self, power):
'''
@@ -34,4 +42,7 @@ class CBPiActor(CBPiExtension):
:return:
'''
- pass
\ No newline at end of file
+ pass
+
+ def reprJSON(self):
+ return dict(state=True)
\ No newline at end of file
diff --git a/core/api/extension.py b/core/api/extension.py
index e01f57b..bbc917a 100644
--- a/core/api/extension.py
+++ b/core/api/extension.py
@@ -1,7 +1,7 @@
import logging
import os
import sys
-from core.utils.utils import load_config as load
+
__all__ = ["CBPiExtension"]
@@ -37,6 +37,7 @@ class CBPiExtension():
super(CBPiExtension, self).__setattr__(name, value)
def load_config(self):
+ from core.utils.utils import load_config as load
path = os.path.dirname(sys.modules[self.__class__.__module__].__file__)
try:
return load("%s/config.yaml" % path)
diff --git a/core/controller/actor_controller.py b/core/controller/actor_controller.py
index 908b96f..69c74d7 100644
--- a/core/controller/actor_controller.py
+++ b/core/controller/actor_controller.py
@@ -1,3 +1,6 @@
+import pprint
+from asyncio import Future
+import asyncio
from aiohttp import web
from core.api.actor import CBPiActor
@@ -17,7 +20,11 @@ class ActorHttp(HttpAPI):
:return:
"""
id = int(request.match_info['id'])
- self.cbpi.bus.fire(topic="actor/%s/switch/on" % id, id=id, power=99)
+ result = await self.cbpi.bus.fire2(topic="actor/%s/switch/on" % id, id=id, power=99)
+ print(result.timeout)
+
+ for key, value in result.results.items():
+ print(key, value.result)
return web.Response(status=204)
@@ -28,7 +35,7 @@ class ActorHttp(HttpAPI):
:return:
"""
id = int(request.match_info['id'])
- self.cbpi.bus.fire(topic="actor/%s/off" % id, id=id)
+ await self.cbpi.bus.fire(topic="actor/%s/off" % id, id=id)
return web.Response(status=204)
@request_mapping(path="/{id:\d+}/toggle", auth_required=False)
@@ -39,7 +46,7 @@ class ActorHttp(HttpAPI):
"""
id = int(request.match_info['id'])
print("ID", id)
- self.cbpi.bus.fire(topic="actor/%s/toggle" % id, id=id)
+ await self.cbpi.bus.fire(topic="actor/%s/toggle" % id, id=id)
return web.Response(status=204)
class ActorController(ActorHttp, CRUDController):
@@ -58,16 +65,6 @@ class ActorController(ActorHttp, CRUDController):
self.types = {}
self.actors = {}
-
- def register(self, name, clazz) -> None:
-
- print("REGISTER", name)
- if issubclass(clazz, CBPiActor):
- print("ITS AN ACTOR")
-
- parse_props(clazz)
- self.types[name] = clazz
-
async def init(self):
'''
This method initializes all actors during startup. It creates actor instances
@@ -76,25 +73,28 @@ class ActorController(ActorHttp, CRUDController):
'''
await super(ActorController, self).init()
- for name, clazz in self.types.items():
- print("Type", name)
-
for id, value in self.cache.items():
+ await self._init_actor(value)
- if value.type in self.types:
- cfg = value.config.copy()
+ async def _init_actor(self, actor):
+ if actor.type in self.types:
+ cfg = actor.config.copy()
+ cfg.update(dict(cbpi=self.cbpi, id=id, name=actor.name))
+ clazz = self.types[actor.type]["class"];
- cfg.update(dict(cbpi=self.cbpi, id=id, name=value.name))
- clazz = self.types[value.type]["class"];
+ self.cache[actor.id].instance = clazz(**cfg)
+ self.cache[actor.id].instance.init()
+ await self.cbpi.bus.fire(topic="actor/%s/initialized" % actor.id, id=actor.id)
- self.cache[id].instance = clazz(**cfg)
- print("gpIO", self.cache[id].instance, self.cache[id].instance.gpio)
+ async def _stop_actor(self, actor):
+ actor.instance.stop()
+ await self.cbpi.bus.fire(topic="actor/%s/stopped" % actor.id, id=actor.id)
@on_event(topic="actor/+/switch/on")
- def on(self, id , power=100, **kwargs) -> None:
+ async def on(self, id , future: Future, power=100, **kwargs) -> None:
'''
Method to switch an actor on.
Supporting Event Topic "actor/+/on"
@@ -109,11 +109,13 @@ class ActorController(ActorHttp, CRUDController):
if id in self.cache:
print("POWER ON")
actor = self.cache[id ].instance
- self.cbpi.bus.fire("actor/%s/on/ok" % id)
+ await self.cbpi.bus.fire("actor/%s/on/ok" % id)
actor.on(power)
+ future.set_result("OK")
+
@on_event(topic="actor/+/toggle")
- def toggle(self, id, power=100, **kwargs) -> None:
+ async def toggle(self, id, power=100, **kwargs) -> None:
'''
Method to toggle an actor on or off
Supporting Event Topic "actor/+/toggle"
@@ -132,7 +134,7 @@ class ActorController(ActorHttp, CRUDController):
actor.on()
@on_event(topic="actor/+/off")
- def off(self, id, **kwargs) -> None:
+ async def off(self, id, **kwargs) -> None:
"""
Method to switch and actor off
@@ -147,3 +149,27 @@ class ActorController(ActorHttp, CRUDController):
if id in self.cache:
actor = self.cache[id].instance
actor.off()
+
+ async def _post_add_callback(self, m):
+ '''
+
+ :param m:
+ :return:
+ '''
+ await self._init_actor(m)
+ pass
+
+ async def _pre_delete_callback(self, actor_id):
+ if int(actor_id) not in self.cache:
+ return
+
+ if self.cache[int(actor_id)].instance is not None:
+ await self._stop_actor(self.cache[int(actor_id)])
+
+ async def _pre_update_callback(self, actor):
+
+ if actor.instance is not None:
+ await self._stop_actor(actor)
+
+ async def _post_update_callback(self, actor):
+ self._init_actor(actor)
diff --git a/core/controller/crud_controller.py b/core/controller/crud_controller.py
index 8845017..aa1ef4b 100644
--- a/core/controller/crud_controller.py
+++ b/core/controller/crud_controller.py
@@ -1,5 +1,10 @@
+import json
+import pprint
from abc import abstractmethod,ABCMeta
+from core.utils.encoder import ComplexEncoder
+
+
class CRUDController(metaclass=ABCMeta):
@@ -61,8 +66,11 @@ class CRUDController(metaclass=ABCMeta):
'''
await self._pre_add_callback(data)
m = await self.model.insert(**data)
- await self._post_add_callback(m)
self.cache[m.id] = m
+ await self._post_add_callback(m)
+
+ await self.cbpi.bus.fire(topic="actor/%s/added" % m.id, actor=m)
+
return m
async def _pre_update_callback(self, m):
@@ -117,21 +125,29 @@ class CRUDController(metaclass=ABCMeta):
async def delete(self, id):
+
+
'''
:param id:
:return:
'''
await self._pre_delete_callback(id)
+
+ if id not in self.cache:
+
+ return
m = await self.model.delete(id)
await self._post_delete_callback(id)
try:
if self.caching is True:
- del self.cache[id]
+ print("DELTE FROM ACHE")
+ del self.cache[int(id)]
except Exception as e:
+ print(e)
pass
-
- #self.cbpi.push("DELETE_%s" % self.key, id)
+ pprint.pprint(self.cache)
+ await self.cbpi.bus.fire(topic="actor/%s/deleted" % id, id=id)
async def delete_all(self):
'''
diff --git a/core/controller/kettle_controller.py b/core/controller/kettle_controller.py
index b9d0e7a..f6347e8 100644
--- a/core/controller/kettle_controller.py
+++ b/core/controller/kettle_controller.py
@@ -72,11 +72,11 @@ class KettleController(CRUDController):
if kettle.logic is None:
return (False, "No Logic defined")
id = kettle.heater
- self.cbpi.bus.fire(topic="kettle/%s/automatic" % id, id=id)
+ await self.cbpi.bus.fire(topic="kettle/%s/automatic" % id, id=id)
return (True, "Logic switched on switched")
@on_event(topic="job/done")
- def job_stop(self, key, **kwargs) -> None:
+ async def job_stop(self, key, **kwargs) -> None:
match = re.match("kettle_logic_(\d+)", key)
if match is not None:
@@ -136,7 +136,7 @@ class KettleController(CRUDController):
if kettle.heater is None:
return (False, "No Heater defined")
id = kettle.heater
- self.cbpi.bus.fire(topic="actor/%s/on" % id, id=id, power=99)
+ await self.cbpi.bus.fire(topic="actor/%s/on" % id, id=id, power=99)
return (True,"Heater switched on")
async def heater_off(self, id):
@@ -153,7 +153,7 @@ class KettleController(CRUDController):
if kettle.heater is None:
return (False, "No Heater defined")
id = kettle.heater
- self.cbpi.bus.fire(topic="actor/%s/off" % id, id=id, power=99)
+ await self.cbpi.bus.fire(topic="actor/%s/off" % id, id=id, power=99)
return (True, "Heater switched off")
async def agitator_on(self, id):
diff --git a/core/controller/notification_controller.py b/core/controller/notification_controller.py
index d44c8a8..1af74d9 100644
--- a/core/controller/notification_controller.py
+++ b/core/controller/notification_controller.py
@@ -17,5 +17,5 @@ class NotificationController():
@on_event(topic="notification/#")
- def _on_event(self, key, message, type, **kwargs):
+ async def _on_event(self, key, message, type, **kwargs):
self.cbpi.ws.send("YES")
\ No newline at end of file
diff --git a/core/controller/step_controller.py b/core/controller/step_controller.py
index 507efcf..d9365d5 100644
--- a/core/controller/step_controller.py
+++ b/core/controller/step_controller.py
@@ -42,7 +42,7 @@ class StepController(HttpAPI, CRUDController):
:param request: web requset
:return: web.Response(text="OK"
'''
- self.cbpi.bus.fire("step/action", action="test")
+ await self.cbpi.bus.fire("step/action", action="test")
return web.Response(text="OK")
@@ -56,7 +56,7 @@ class StepController(HttpAPI, CRUDController):
:return:
'''
- self.cbpi.bus.fire("step/start")
+ await self.cbpi.bus.fire("step/start")
return web.Response(text="OK")
@request_mapping(path="/reset", auth_required=False)
@@ -67,7 +67,7 @@ class StepController(HttpAPI, CRUDController):
:param request:
:return:
'''
- self.cbpi.bus.fire("step/reset")
+ await self.cbpi.bus.fire("step/reset")
return web.Response(text="OK")
@request_mapping(path="/next", auth_required=False)
@@ -79,11 +79,11 @@ class StepController(HttpAPI, CRUDController):
:param request:
:return:
'''
- self.cbpi.bus.fire("step/next")
+ await self.cbpi.bus.fire("step/next")
return web.Response(text="OK")
@on_event("step/action")
- def handle_action(self, action, **kwargs):
+ async def handle_action(self, action, **kwargs):
'''
Event Handler for "step/action".
@@ -99,7 +99,7 @@ class StepController(HttpAPI, CRUDController):
@on_event("step/next")
- def handle_next(self, **kwargs):
+ async def handle_next(self, **kwargs):
'''
Event Handler for "step/next".
It start the next step
@@ -114,7 +114,7 @@ class StepController(HttpAPI, CRUDController):
@on_event("step/start")
- def handle_start(self, **kwargs):
+ async def handle_start(self, **kwargs):
'''
Event Handler for "step/start".
It starts the brewing process
@@ -122,10 +122,10 @@ class StepController(HttpAPI, CRUDController):
:param kwargs:
:return: None
'''
- self.start()
+ await self.start()
@on_event("step/reset")
- def handle_reset(self, **kwargs):
+ async def handle_reset(self, **kwargs):
'''
Event Handler for "step/reset".
Resets the current step
@@ -142,10 +142,10 @@ class StepController(HttpAPI, CRUDController):
self.steps[self.current_step.id]["state"] = None
self.current_step = None
self.current_task = None
- self.start()
+ await self.start()
@on_event("step/stop")
- def handle_stop(self, **kwargs):
+ async def handle_stop(self, **kwargs):
'''
Event Handler for "step/stop".
Stops the current step
@@ -163,7 +163,7 @@ class StepController(HttpAPI, CRUDController):
self.current_step = None
@on_event("step/+/done")
- def handle_done(self, topic, **kwargs):
+ async def handle_done(self, topic, **kwargs):
'''
Event Handler for "step/+/done".
@@ -173,7 +173,7 @@ class StepController(HttpAPI, CRUDController):
:param kwargs:
:return:
'''
- self.start()
+ await self.start()
def _step_done(self, task):
@@ -181,7 +181,7 @@ class StepController(HttpAPI, CRUDController):
self.cache[self.current_step.id].state = "D"
step_id = self.current_step.id
self.current_step = None
- self.cbpi.bus.fire("step/%s/done" % step_id)
+ self.cbpi.bus.sync_fire("step/%s/done" % step_id)
def _get_manged_fields_as_array(self, type_cfg):
print("tYPE", type_cfg)
@@ -191,7 +191,7 @@ class StepController(HttpAPI, CRUDController):
return result
- def start(self):
+ async def start(self):
'''
Start the first step
@@ -215,7 +215,7 @@ class StepController(HttpAPI, CRUDController):
open_step = True
break
if open_step == False:
- self.cbpi.bus.fire("step/berwing/finished")
+ await self.cbpi.bus.fire("step/berwing/finished")
async def stop(self):
pass
diff --git a/core/craftbeerpi.py b/core/craftbeerpi.py
index 7f9fc1d..5b46435 100644
--- a/core/craftbeerpi.py
+++ b/core/craftbeerpi.py
@@ -222,7 +222,7 @@ class CraftBeerPi():
:param type: notification type (info,warning,danger,successs)
:return:
'''
- self.bus.fire(topic="notification/%s" % key, key=key, message=message, type=type)
+ self.bus.sync_fire(topic="notification/%s" % key, key=key, message=message, type=type)
def setup(self):
diff --git a/core/database/model.py b/core/database/model.py
index 0cb2a79..70eb1b8 100644
--- a/core/database/model.py
+++ b/core/database/model.py
@@ -7,6 +7,8 @@ class ActorModel(DBModel):
__json_fields__ = ["config"]
+
+
class SensorModel(DBModel):
__fields__ = ["name", "type", "config"]
__table_name__ = "sensor"
diff --git a/core/eventbus.py b/core/eventbus.py
index 7f56057..e42f2ac 100644
--- a/core/eventbus.py
+++ b/core/eventbus.py
@@ -3,6 +3,9 @@ import inspect
import logging
import json
+import time
+
+
class EventBus(object):
class Node(object):
__slots__ = '_children', '_content'
@@ -12,15 +15,37 @@ class EventBus(object):
self._content = None
class Content(object):
- def __init__(self, parent, topic, method, once):
+ def __init__(self, parent, topic, method, once, supports_future=False):
self.parent = parent
self.method = method
self.name = method.__name__
self.once = once
self.topic = topic
+ self.supports_future = supports_future
+
+ class Result():
+
+ def __init__(self, result, timeout):
+ self.result = result
+ self.timeout = timeout
+
+ class ResultContainer():
+
+ def __init__(self, results, timeout=False):
+ self.results = {}
+ self.timeout = timeout
+ for key, value in results.items():
+ if value.done() is True:
+ self.results[key] = EventBus.Result(value.result(), True)
+ else:
+ self.results[key] = EventBus.Result(None, False)
+
+
def register(self, topic, method, once=False):
+
+
if method in self.registry:
raise RuntimeError("Method %s already registerd. Please unregister first!" % method.__name__)
self.logger.info("Topic %s", topic)
@@ -31,7 +56,14 @@ class EventBus(object):
if not isinstance(node._content, list):
node._content = []
- c = self.Content(node, topic, method, once)
+ sig = inspect.signature(method)
+
+
+ if "future" in sig.parameters:
+ supports_future = True
+ else:
+ supports_future = False
+ c = self.Content(node, topic, method, once, supports_future)
node._content.append(c)
self.registry[method] = c
@@ -71,29 +103,36 @@ class EventBus(object):
print(self.loop)
- def fire(self, topic: str, **kwargs) -> None:
- self.logger.info("EMIT EVENT %s Data: %s", topic, kwargs)
+ def sync_fire(self,topic: str,timeout=1, **kwargs):
+ self.loop.create_task(self.fire(topic=topic, timeout=timeout, **kwargs))
+
+ async def fire(self, topic: str, timeout=1, **kwargs):
+
+ futures = {}
+
+ async def wait(futures):
+ if(len(futures) > 0):
+ await asyncio.wait(futures.values())
+
- #self.cbpi.ws.send(json.dumps(dict(topic=topic, data=dict(**kwargs))))
- trx = dict(i=0)
for e in self.iter_match(topic):
content_array = e
keep_idx = []
for idx, content_obj in enumerate(content_array):
if inspect.iscoroutinefunction(content_obj.method):
- if hasattr(content_obj.method, "future"):
+ if content_obj.supports_future is True:
+
+ fut = self.loop.create_future()
+
+ futures["%s.%s" % (content_obj.method.__module__, content_obj.name)] = fut
+ self.loop.create_task(content_obj.method(**kwargs, topic = topic, future=fut))
- self.loop.create_task(content_obj.method(**kwargs, future=content_obj.method.future, topic=topic))
else:
- self.loop.create_task(content_obj.method(**kwargs, topic = topic))
+ self.loop.create_task(content_obj.method(**kwargs, topic=topic))
else:
- if hasattr(content_obj.method, "future"):
- content_obj.method(**kwargs, future=content_obj.method.future, topic=topic)
- else:
- content_obj.method(**kwargs, topic = topic)
-
-
+ # only asnyc
+ pass
if content_obj.once is False:
keep_idx.append(idx)
@@ -101,6 +140,13 @@ class EventBus(object):
if len(keep_idx) < len(e):
e[0].parent._content = [e[0].parent._content[i] for i in keep_idx]
+ if timeout is not None:
+ try:
+ await asyncio.wait_for(wait(futures), timeout=timeout)
+ is_timedout = False
+ except asyncio.TimeoutError:
+ is_timedout = True
+ return self.ResultContainer(futures, is_timedout)
def dump(self):
diff --git a/core/extension/comp/__init__.py b/core/extension/comp/__init__.py
index e510375..4d93574 100644
--- a/core/extension/comp/__init__.py
+++ b/core/extension/comp/__init__.py
@@ -31,14 +31,15 @@ class MyComp(CBPiExtension, CRUDController, HttpAPI):
@on_event(topic="actor/#")
- def listen(self, **kwargs):
+ async def listen(self, **kwargs):
print("Test", kwargs)
+
@on_event(topic="kettle/+/automatic")
- def listen2(self, **kwargs):
+ async def listen2(self, **kwargs):
print("HANDLE AUTOMATIC", kwargs)
- self.cbpi.bus.fire(topic="actor/%s/toggle" % 1, id=1)
+ await self.cbpi.bus.fire(topic="actor/%s/toggle" % 1, id=1)
def setup(cbpi):
diff --git a/core/extension/dummyactor/__init__.py b/core/extension/dummyactor/__init__.py
index 0d8442c..7b16688 100644
--- a/core/extension/dummyactor/__init__.py
+++ b/core/extension/dummyactor/__init__.py
@@ -8,6 +8,12 @@ class CustomActor(CBPiActor):
# Custom property which can be configured by the user
gpio = Property.Number(label="Test")
+ def init(self):
+ print("#########INIT MY CUSTOM ACTOR")
+
+ def stop(self):
+ print("#########STOP MY CUSTOM ACTOR")
+
@action(key="name", parameters={})
def myAction(self):
diff --git a/core/extension/dummysensor/__init__.py b/core/extension/dummysensor/__init__.py
index 87adfef..c751c98 100644
--- a/core/extension/dummysensor/__init__.py
+++ b/core/extension/dummysensor/__init__.py
@@ -37,7 +37,7 @@ class CustomSensor(CBPiSensor):
await asyncio.sleep(self.interval)
self.value = self.value + 1
- cbpi.bus.fire("sensor/%s" % self.id, value=self.value)
+ await cbpi.bus.fire("sensor/%s" % self.id, value=self.value)
print("SENSOR IS RUNNING", self.value)
diff --git a/core/http_endpoints/http_api.py b/core/http_endpoints/http_api.py
index 694e0f6..3fc3a8b 100644
--- a/core/http_endpoints/http_api.py
+++ b/core/http_endpoints/http_api.py
@@ -130,4 +130,4 @@ class HttpAPI():
"""
id = request.match_info['id']
await self.delete(id)
- return web.Response(str=204)
+ return web.Response(status=204)
diff --git a/core/job/_scheduler.py b/core/job/_scheduler.py
index 4c7fb58..75e8823 100644
--- a/core/job/_scheduler.py
+++ b/core/job/_scheduler.py
@@ -114,7 +114,7 @@ class Scheduler(*bases):
def _done(self, job):
print("JOB DONE")
- self.cbpi.bus.fire("job/done", key=job.name)
+ self.cbpi.bus.sync_fire("job/done", key=job.name)
self._jobs.discard(job)
if not self.pending_count:
return
diff --git a/core/utils/encoder.py b/core/utils/encoder.py
index 1ce1915..3f9eb29 100644
--- a/core/utils/encoder.py
+++ b/core/utils/encoder.py
@@ -10,13 +10,14 @@ class ComplexEncoder(JSONEncoder):
from core.database.model import ActorModel
from core.database.orm_framework import DBModel
from core.api.kettle_logic import CBPiKettleLogic
-
+ print("OBJECT", obj)
try:
if isinstance(obj, DBModel):
return obj.__dict__
-
- elif isinstance(obj, ActorModel):
- return None
+ elif callable(getattr(obj, "reprJSON")):
+ return obj.reprJSON()
+ #elif isinstance(obj, ActorModel):
+ # return None
elif hasattr(obj, "callback"):
return obj()
else:
diff --git a/core/websocket.py b/core/websocket.py
index 5b6f3d5..e76da9f 100644
--- a/core/websocket.py
+++ b/core/websocket.py
@@ -21,8 +21,9 @@ class WebSocket:
async def listen(self, topic, **kwargs):
print("WS", topic)
- self.send(json.dumps(dict(topic=topic, data=dict(**kwargs))))
+ from core.utils.encoder import ComplexEncoder
+ self.send(json.dumps(dict(topic=topic, data=dict(**kwargs)),skipkeys=True, check_circular=True, cls=ComplexEncoder))
def send(self, data):
@@ -64,7 +65,7 @@ class WebSocket:
else:
msg_obj = msg.json()
- self.cbpi.bus.fire(msg_obj["topic"], id=1, power=22)
+ await self.cbpi.bus.fire(msg_obj["topic"], id=1, power=22)
# await self.fire(msg_obj["key"], ws, msg)
# await ws.send_str(msg.data)
diff --git a/craftbeerpi.db b/craftbeerpi.db
index d028309..32086e0 100644
Binary files a/craftbeerpi.db and b/craftbeerpi.db differ
diff --git a/docs_src/source/standards.rst b/docs_src/source/standards.rst
index 2134fee..2d0b474 100644
--- a/docs_src/source/standards.rst
+++ b/docs_src/source/standards.rst
@@ -36,6 +36,8 @@ Here an example how listen on an event.
It's imporante to add **kwargs as parameter to the listening method. This makes sure that maybe addtional event paramenter are not causing an exception.
+A list of all registered events listeners can be found under: `http://:/system/events`
+
HTTP Endpoints
--------------
@@ -80,6 +82,13 @@ The WebSocket Event is having the following structure.
+SQL Files
+---------
+Currently only one SQL file for database initialisation is available.
+It's located under: `./core/sql`
+
+
+
Web User Interface
^^^^^^^^^^^^^^^^^^
The Web UI is based on ReactJS + Redux.
@@ -98,3 +107,8 @@ After server startup you can find the API documentaiton under: `http://