First Version of Step Controller

This commit is contained in:
manuel83 2018-12-03 22:16:03 +01:00
parent e2d4485f1f
commit 2b22f9ee4a
20 changed files with 1343 additions and 584 deletions

File diff suppressed because it is too large Load diff

11
core/api/step.py Normal file
View file

@ -0,0 +1,11 @@
class Step(object):
def __init__(self, key=None, cbpi=None):
self.cbpi = cbpi
self.id = key
async def run(self):
pass
def stop(self):
pass

View file

@ -17,7 +17,7 @@ class ActorHttp(HttpAPI):
:return:
"""
id = int(request.match_info['id'])
self.cbpi.bus.fire(topic="actor/%s/on" % id, id=id, power=99)
self.cbpi.bus.fire(topic="actor/%s/switch/on" % id, id=id, power=99)
return web.Response(status=204)
@ -90,16 +90,10 @@ class ActorController(ActorHttp, CRUDController):
self.cache[id].instance = clazz(**cfg)
print("gpIO", self.cache[id].instance, self.cache[id].instance.gpio)
@on_event(topic="actor/1/on")
def on1(self, **kwargs) -> None:
print("WOOOOHOOO111111")
@on_event(topic="actor/1/on")
def on3(self, **kwargs) -> None:
print("WOOOOHOOO22222")
@on_event(topic="actor/+/on")
@on_event(topic="actor/+/switch/on")
def on(self, id , power=100, **kwargs) -> None:
'''
Method to switch an actor on.
@ -115,7 +109,7 @@ class ActorController(ActorHttp, CRUDController):
if id in self.cache:
print("POWER ON")
actor = self.cache[id ].instance
print("ONNNNN", actor)
self.cbpi.bus.fire("actor/%s/on/ok" % id)
actor.on(power)
@on_event(topic="actor/+/toggle")

View file

@ -1,3 +1,5 @@
import re
from aiohttp import web
from core.api import request_mapping, on_event
@ -76,11 +78,13 @@ class KettleController(CRUDController):
@on_event(topic="job/done")
def job_stop(self, key, **kwargs) -> None:
name = key.split("_")
kettle = self.cache[int(name[2])]
kettle.instance = None
match = re.match("kettle_logic_(\d+)", key)
if match is not None:
kid = match.group(1)
kettle = self.cache[int(kid)]
kettle.instance = None
print("STOP KETTLE LOGIC", int(name[2]))
print("STOP KETTLE LOGIC", kid)
@on_event(topic="kettle/+/automatic")
async def handle_automtic_event(self, id, **kwargs):

View file

@ -13,6 +13,7 @@ from core.api.extension import CBPiExtension
from core.api.kettle_logic import CBPiKettleLogic
from core.api.property import Property
from core.api.sensor import CBPiSensor
from core.api.step import Step
from core.utils.utils import load_config, json_dumps
logger = logging.getLogger(__file__)
@ -101,13 +102,16 @@ class PluginController():
if issubclass(clazz, CBPiKettleLogic):
self.cbpi.kettle.types[name] = {"class": clazz, "config": self._parse_props(clazz)}
if issubclass(clazz, Step):
print("NAME", name)
self.cbpi.step.types[name] = {"class": clazz, "config": self._parse_props(clazz)}
if issubclass(clazz, CBPiExtension):
self.c = clazz(self.cbpi)
def _parse_props(self, cls):
print("PARSE", cls)
name = cls.__name__
result = {"name": name, "class": cls, "properties": [], "actions": []}

View file

@ -1,8 +1,69 @@
import asyncio
from aiohttp import web
from core.api import on_event, request_mapping
class StepController():
def __init__(self, cbpi):
self.cbpi = cbpi
self.types = {}
self.steps = {
1: dict(name="S1", config=dict(time=1), type="CustomStep", state=None),
2: dict(name="S2", config=dict(time=1), type="CustomStep", state=None),
3: dict(name="S3", config=dict(time=1), type="CustomStep", state=None)
}
self.current_step = None
self.cbpi.register(self, "/step")
async def start(self):
async def init(self):
#self.start()
pass
@request_mapping(path="/start", auth_required=False)
async def http_start(self, request):
self.cbpi.bus.fire("step/start")
return web.Response(text="OK")
@request_mapping(path="/reset", auth_required=False)
async def http_reset(self, request):
self.cbpi.bus.fire("step/reset")
return web.Response(text="OK")
@on_event("step/start")
def handle_start(self, topic, **kwargs):
self.start()
@on_event("step/reset")
def handle_rest(self, topic, **kwargs):
for key, step in self.steps.items():
step["state"] = None
self.current_step = Nonecd
@on_event("step/+/done")
def handle(self, topic, **kwargs):
self.start()
def _step_done(self, task):
self.steps[self.current_step.id]["state"] = "D"
step_id = self.current_step.id
self.current_step = None
self.cbpi.bus.fire("step/%s/done" % step_id)
def start(self):
if self.current_step is None:
loop = asyncio.get_event_loop()
open_step = False
for key, step in self.steps.items():
if step["state"] is None:
type = self.types.get(step["type"])
self.current_step = type["class"](key, self.cbpi)
task = loop.create_task(self.current_step.run())
task.add_done_callback(self._step_done)
open_step = True
break
if open_step == False:
self.cbpi.bus.fire("step/berwing/finished")
async def stop(self):
pass

View file

@ -12,6 +12,7 @@ from aiohttp_swagger import setup_swagger
from core.controller.config_controller import ConfigController
from core.controller.kettle_controller import KettleController
from core.controller.step_controller import StepController
from core.job.aiohttp import setup, get_scheduler_from_app
from core.controller.actor_controller import ActorController
@ -58,10 +59,13 @@ class CraftBeerPi():
self.system = SystemController(self)
self.config2 = ConfigController(self)
self.kettle = KettleController(self)
self.step = StepController(self)
self.notification = NotificationController(self)
self.login = Login(self)
self.register_events(self.ws)
def register_events(self, obj):
@ -173,6 +177,7 @@ class CraftBeerPi():
switcher[http_method]()
if url_prefix is not None:
print("Prefx", url_prefix)
sub = web.Application()
sub.add_routes(routes)
self.app.add_subapp(url_prefix, sub)
@ -230,11 +235,14 @@ class CraftBeerPi():
async def init_controller(app):
await self.sensor.init()
await self.step.init()
await self.actor.init()
await self.kettle.init()
import pprint
pprint.pprint(self.bus.dump())
async def load_plugins(app):
#await PluginController.load_plugin_list()
await self.plugin.load_plugins()

View file

@ -71,7 +71,7 @@ class EventBus(object):
print(self.loop)
def fire(self, topic: str, **kwargs) -> None:
self.logger.info("EMIT EVENT %s", topic)
self.logger.info("EMIT EVENT %s Data: %s", topic, kwargs)
trx = dict(i=0)
for e in self.iter_match(topic):
@ -91,10 +91,7 @@ class EventBus(object):
else:
content_obj.method(**kwargs, topic = topic)
#if inspect.iscoroutinefunction(content_obj.method):
# self.loop.create_task(content_obj.method(**kwargs, trx=trx, topic=topic))
#else:
# content_obj.method(**kwargs, trx=trx, topic=topic)
if content_obj.once is False:
keep_idx.append(idx)
@ -102,7 +99,7 @@ class EventBus(object):
if len(keep_idx) < len(e):
e[0].parent._content = [e[0].parent._content[i] for i in keep_idx]
print("DONE", trx)
def dump(self):
def rec(node, i=0):

View file

@ -1,132 +0,0 @@
import inspect
import logging
class EventBus(object):
def __init__(self, cbpi):
self.logger = logging.getLogger(__name__)
self._root = self.Node()
self.registry = {}
self.docs = {}
self.cbpi = cbpi
class Node(object):
__slots__ = '_children', '_content'
def __init__(self):
self._children = {}
self._content = None
class Content(object):
def __init__(self, parent, topic, method, once):
self.parent = parent
self.method = method
self.name = method.__name__
self.once = once
self.topic = topic
def register(self, topic, method, once=False):
print("REGISTER", topic, method)
if method in self.registry:
raise RuntimeError("Method %s already registerd. Please unregister first!" % method.__name__)
self.logger.info("Topic %s", topic)
node = self._root
for sym in topic.split('/'):
node = node._children.setdefault(sym, self.Node())
if not isinstance(node._content, list):
node._content = []
c = self.Content(node, topic, method, once)
node._content.append(c)
print(c, node._content, topic)
self.registry[method] = c
def get_callbacks(self, key):
try:
node = self._root
for sym in key.split('/'):
node = node._children[sym]
if node._content is None:
raise KeyError(key)
return node._content
except KeyError:
raise KeyError(key)
def unregister(self, method):
self.logger.info("Unregister %s", method.__name__)
if method in self.registry:
content = self.registry[method]
clean_idx = None
for idx, content_obj in enumerate(content.parent._content):
if method == content_obj.method:
clean_idx = idx
break
if clean_idx is not None:
del content.parent._content[clean_idx]
def fire(self, topic: str, **kwargs) -> None:
self.logger.info("EMIT EVENT %s", topic)
cleanup_methods = []
for content_array in self.iter_match(topic):
print(content_array)
cleanup = []
for idx, content_obj in enumerate(content_array):
if inspect.iscoroutinefunction(content_obj.method):
if hasattr(content_obj.method, "future"):
self.cbpi.app.loop.create_task(content_obj.method(**kwargs, future=content_obj.method.future, topic=topic))
else:
self.cbpi.app.loop.create_task(content_obj.method(**kwargs, topic = topic))
else:
if hasattr(content_obj.method, "future"):
content_obj.method(**kwargs, future=content_obj.method.future, topic=topic)
else:
content_obj.method(**kwargs, topic = topic)
if content_obj.once is True:
cleanup.append(idx)
for idx in cleanup:
del content_array[idx]
def iter_match(self, topic):
lst = topic.split('/')
normal = not topic.startswith('$')
def rec(node, i=0):
if i == len(lst):
if node._content is not None:
yield node._content
else:
part = lst[i]
if part in node._children:
for content in rec(node._children[part], i + 1):
yield content
if '+' in node._children and (normal or i > 0):
for content in rec(node._children['+'], i + 1):
yield content
if '#' in node._children and (normal or i > 0):
content = node._children['#']._content
if content is not None:
yield content
return rec(self._root)

View file

@ -23,7 +23,7 @@ class CustomActor(CBPiActor):
def on(self, power=100):
print("###### ON", self.gpio)
self.state = True

View file

@ -48,12 +48,15 @@ class CustomLogic(CBPiKettleLogic):
async def run(self):
async def my_callback(id, **kwargs):
self.cbpi.bus.unregister(my_callback)
kwargs["future"].set_result("AMAZING")
return "OK"
async def my_callback(value, **kwargs):
result = await self.wait_for_event("actor/+/on", callback=my_callback)
if value == 5:
self.cbpi.bus.unregister(my_callback)
kwargs["future"].set_result("AMAZING")
else:
print("OTHER VALUE", value)
result = await self.wait_for_event("sensor/1", callback=my_callback)
print("THE RESULT", result)
@ -68,7 +71,7 @@ class CustomLogic(CBPiKettleLogic):
break
await asyncio.sleep(1)
'''
print("STOP LOGIC")
print("YES IM FINISHED STOP LOGIC")
def setup(cbpi):

View file

@ -1,5 +1,6 @@
import asyncio
import logging
import random
from core.api import CBPiActor, Property, action, background_task
from core.api.sensor import CBPiSensor
@ -27,10 +28,12 @@ class CustomSensor(CBPiSensor):
async def run(self, cbpi):
self.value = 0
while True:
await asyncio.sleep(self.interval)
#await asyncio.sleep(self.interval)
await asyncio.sleep(random.uniform(0, 1))
self.value = self.value + 1
print("SENSOR IS RUNNING")
cbpi.bus.fire("sensor/%s" % self.id, value=self.value)
print("SENSOR IS RUNNING", self.value)

View file

@ -0,0 +1,25 @@
import asyncio
from core.api.step import Step
class CustomStep(Step):
async def run(self):
i = 0
while i < 3:
await asyncio.sleep(1)
print("RUN STEP")
i = i + 1
def setup(cbpi):
'''
This method is called by the server during startup
Here you need to register your plugins at the server
:param cbpi: the cbpi core
:return:
'''
cbpi.plugin.register("CustomStep", CustomStep)

View file

@ -0,0 +1,2 @@
name: Manuel
version: 4

View file

@ -1,11 +1,13 @@
import logging
import weakref
from collections import defaultdict
import json
import aiohttp
from aiohttp import web
from typing import Iterable, Callable
from core.api import on_event
class WebSocket:
def __init__(self, cbpi) -> None:
@ -15,6 +17,13 @@ class WebSocket:
self.logger = logging.getLogger(__name__)
self.cbpi.app.add_routes([web.get('/ws', self.websocket_handler)])
@on_event(topic="#")
async def listen(self, topic, **kwargs):
print("WS", topic)
self.send(json.dumps(dict(topic=topic, data=dict(**kwargs))))
def send(self, data):
for ws in self._clients:

Binary file not shown.

BIN
hello.mp3 Normal file

Binary file not shown.

BIN
hello3.mp3 Normal file

Binary file not shown.

View file

@ -1,4 +1,5 @@
import asyncio
import re
from core.eventbus3 import EventBus
@ -66,6 +67,9 @@ id2 = bus.register("test/name", test2)
print(id1, id2)
from gtts import gTTS
print(hex(id(test2)))
@ -77,4 +81,10 @@ bus.fire("test/name")
bus.unregister(test2)
bus.fire("test/name")
bus.fire("test/name")
m0 = re.match("kettle_logic_(\d+)", "kettle_1logic_22")
print(m0)
#print(m0.group(1))

View file

@ -14,6 +14,17 @@ class KettleTestCase(AioHTTPTestCase):
@unittest_run_loop
async def test_example(self):
assert await self.cbpi.kettle.toggle_automtic(1) is True
assert await self.cbpi.kettle.toggle_automtic(1) is True
assert await self.cbpi.kettle.toggle_automtic(99) is False
result = await self.cbpi.kettle.toggle_automtic(1)
print("#### RESULT", result)
assert result[0] is True
print("FIRE")
await asyncio.sleep(1)
self.cbpi.bus.fire("actor/1/on", id=1)
await asyncio.sleep(5)
#assert await self.cbpi.kettle.toggle_automtic(1) is True
#assert await self.cbpi.kettle.toggle_automtic(99) is False