diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 15cbccd..779ef2a 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,23 +2,16 @@ - - + - - - - - - + - - - - - + + + + - - + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + @@ -176,35 +117,101 @@ - - + + - + - - + + - - + + - - + + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -216,11 +223,6 @@ - try - logging - theme - slee - setup swagger du dum @@ -246,6 +248,11 @@ __init__ 0x1048b31d0 Content + asyncio + _done + _jobs + bus + futur @@ -264,7 +271,6 @@ @@ -345,53 +352,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -399,130 +359,8 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -539,36 +377,6 @@ + + + + - - - - - - - - - - - - - - - - - - - @@ -1294,18 +979,19 @@ + + - - - + + @@ -1337,11 +1023,6 @@ file:///usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py 25 - - file://$PROJECT_DIR$/core/eventbus.py - 101 - @@ -1359,52 +1040,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1477,7 +1112,6 @@ - @@ -1489,28 +1123,28 @@ - - - - - + + + + + - - - - - + + + + + @@ -1526,14 +1160,6 @@ - - - - - - - - @@ -1603,13 +1229,6 @@ - - - - - - - @@ -1632,23 +1251,6 @@ - - - - - - - - - - - - - - - - - @@ -1657,14 +1259,6 @@ - - - - - - - - @@ -1677,7 +1271,6 @@ - @@ -1699,26 +1292,10 @@ - + - - - - - - - - - - - - - - - - - - + + @@ -1731,30 +1308,152 @@ - + - + - + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + - + diff --git a/core/controller/kettle_controller.py b/core/controller/kettle_controller.py index d185595..9817e4e 100644 --- a/core/controller/kettle_controller.py +++ b/core/controller/kettle_controller.py @@ -4,6 +4,7 @@ from core.api import request_mapping, on_event from core.controller.crud_controller import CRUDController from core.database.model import KettleModel from core.http_endpoints.http_api import HttpAPI +from core.job.aiohttp import get_scheduler_from_app from core.utils import json_dumps @@ -72,6 +73,15 @@ class KettleController(CRUDController): self.cbpi.bus.fire(topic="kettle/%s/automatic" % id, id=id) return (True, "Logic switched on switched") + @on_event(topic="job/done") + def job_stop(self, key, **kwargs) -> None: + + name = key.split("_") + kettle = self.cache[int(name[2])] + kettle.instance = None + + print("STOP KETTLE LOGIC", int(name[2])) + @on_event(topic="kettle/+/automatic") async def handle_automtic_event(self, id, **kwargs): @@ -90,7 +100,7 @@ class KettleController(CRUDController): if hasattr(kettle, "instance") is False: kettle.instance = None - + self._is_logic_running(id) if kettle.instance is None: if kettle.logic in self.types: clazz = self.types.get("CustomKettleLogic")["class"] @@ -98,13 +108,15 @@ class KettleController(CRUDController): cfg.update(dict(cbpi=self.cbpi)) kettle.instance = clazz(**cfg) print("START LOGIC") - await self.cbpi.start_job(kettle.instance.run(), "Kettle_logic_%s" % kettle.id, "kettle_logic") + await self.cbpi.start_job(kettle.instance.run(), "Kettle_logic_%s" % kettle.id, "kettle_logic%s"%id) else: kettle.instance.running = False kettle.instance = None - + def _is_logic_running(self, kettle_id): + scheduler = get_scheduler_from_app(self.cbpi.app) + print("JOB KETTLE RUNNING", scheduler.is_running("Kettle_logic_%s"%kettle_id)) async def heater_on(self, id): ''' diff --git a/core/controller/system_controller.py b/core/controller/system_controller.py index 64e0ae5..57ad276 100644 --- a/core/controller/system_controller.py +++ b/core/controller/system_controller.py @@ -35,4 +35,8 @@ class SystemController(): except: pass # await j.close() - return web.json_response(data=result) \ No newline at end of file + return web.json_response(data=result) + + @request_mapping("/events", method="GET", name="get_all_events", auth_required=False) + def get_all_events(self, request): + return web.json_response(data=self.cbpi.bus.dump()) \ No newline at end of file diff --git a/core/craftbeerpi.py b/core/craftbeerpi.py index 7c66f15..661c91f 100644 --- a/core/craftbeerpi.py +++ b/core/craftbeerpi.py @@ -49,8 +49,8 @@ class CraftBeerPi(): self.app = web.Application(middlewares=middlewares) self.initializer = [] - setup(self.app) - self.bus = EventBus(self) + setup(self.app, self) + self.bus = EventBus(self.app.loop) self.ws = WebSocket(self) self.actor = ActorController(self) self.sensor = SensorController(self) @@ -232,6 +232,8 @@ class CraftBeerPi(): await self.sensor.init() await self.actor.init() await self.kettle.init() + import pprint + pprint.pprint(self.bus.dump()) async def load_plugins(app): #await PluginController.load_plugin_list() diff --git a/core/eventbus.py b/core/eventbus.py index 7a75e82..b9cb89e 100644 --- a/core/eventbus.py +++ b/core/eventbus.py @@ -1,14 +1,9 @@ +import asyncio import inspect import logging -import asyncio - class EventBus(object): - - - - class Node(object): __slots__ = '_children', '_content' @@ -16,11 +11,8 @@ class EventBus(object): self._children = {} self._content = None - - class Content(object): def __init__(self, parent, topic, method, once): - self.parent = parent self.method = method self.name = method.__name__ @@ -28,11 +20,10 @@ class EventBus(object): self.topic = topic def register(self, topic, method, once=False): - print("REGISTER", topic, method) + if method in self.registry: raise RuntimeError("Method %s already registerd. Please unregister first!" % method.__name__) self.logger.info("Topic %s", topic) - node = self._root for sym in topic.split('/'): node = node._children.setdefault(sym, self.Node()) @@ -40,16 +31,10 @@ class EventBus(object): if not isinstance(node._content, list): node._content = [] - c = self.Content(node, topic, method, once) - - node._content.append(c) - print(c, node._content, topic) self.registry[method] = c - - def get_callbacks(self, key): try: node = self._root @@ -61,7 +46,6 @@ class EventBus(object): except KeyError: raise KeyError(key) - def unregister(self, method): self.logger.info("Unregister %s", method.__name__) if method in self.registry: @@ -74,58 +58,67 @@ class EventBus(object): if clean_idx is not None: del content.parent._content[clean_idx] - ''' - def unregister(self, key, method): - lst = [] - try: - parent, node = None, self._root - for k in key.split('/'): - parent, node = node, node._children[k] - lst.append((parent, k, node)) - except KeyError: - raise KeyError(key) - else: # cleanup - for parent, k, node in reversed(lst): - if node._children or node._content is not None: - break - del parent._children[k] - ''' - - def __init__(self, cbpi): + def __init__(self, loop): self.logger = logging.getLogger(__name__) self._root = self.Node() self.registry = {} self.docs = {} - self.cbpi = cbpi + if loop is not None: + self.loop = loop + else: + self.loop = asyncio.get_event_loop() + + print(self.loop) def fire(self, topic: str, **kwargs) -> None: - print("#### FIRE", topic) self.logger.info("EMIT EVENT %s", topic) - cleanup_methods = [] - for content_array in self.iter_match(topic): - - print(content_array) - cleanup = [] + trx = dict(i=0) + for e in self.iter_match(topic): + content_array = e + keep_idx = [] for idx, content_obj in enumerate(content_array): - print("#################") - print("TOPIC", content_obj.method, content_obj.topic) - print("#################") if inspect.iscoroutinefunction(content_obj.method): - self.cbpi.app.loop.create_task(content_obj.method(**kwargs, topic = topic)) + if hasattr(content_obj.method, "future"): + + self.loop.create_task(content_obj.method(**kwargs, future=content_obj.method.future, topic=topic)) + else: + self.loop.create_task(content_obj.method(**kwargs, topic = topic)) else: - content_obj.method(**kwargs, topic = topic) + if hasattr(content_obj.method, "future"): + content_obj.method(**kwargs, future=content_obj.method.future, topic=topic) + else: + content_obj.method(**kwargs, topic = topic) - if content_obj.once is True: - cleanup.append(idx) - for idx in cleanup: - del content_array[idx] + #if inspect.iscoroutinefunction(content_obj.method): + # self.loop.create_task(content_obj.method(**kwargs, trx=trx, topic=topic)) + #else: + # content_obj.method(**kwargs, trx=trx, topic=topic) + if content_obj.once is False: + keep_idx.append(idx) + # FILTER only elements with are required + if len(keep_idx) < len(e): + e[0].parent._content = [e[0].parent._content[i] for i in keep_idx] - print(self._root) - print("#### FIRE END ######") + print("DONE", trx) + def dump(self): + def rec(node, i=0): + result = [] + if node._content is not None: + for c in node._content: + result.append(dict(topic=c.topic, method=c.method.__name__, path=c.method.__module__, once=c.once)) + + if node._children is not None: + for c in node._children: + result = result + rec(node._children[c], i + 1) + return result + + result = rec(self._root) + + return result def iter_match(self, topic): diff --git a/core/eventbus3.py b/core/eventbus3.py new file mode 100644 index 0000000..d6b0cfd --- /dev/null +++ b/core/eventbus3.py @@ -0,0 +1,132 @@ +import inspect +import logging + +class EventBus(object): + + def __init__(self, cbpi): + self.logger = logging.getLogger(__name__) + self._root = self.Node() + self.registry = {} + self.docs = {} + self.cbpi = cbpi + + class Node(object): + __slots__ = '_children', '_content' + + def __init__(self): + self._children = {} + self._content = None + + + class Content(object): + def __init__(self, parent, topic, method, once): + + self.parent = parent + self.method = method + self.name = method.__name__ + self.once = once + self.topic = topic + + def register(self, topic, method, once=False): + print("REGISTER", topic, method) + if method in self.registry: + raise RuntimeError("Method %s already registerd. Please unregister first!" % method.__name__) + self.logger.info("Topic %s", topic) + + node = self._root + for sym in topic.split('/'): + node = node._children.setdefault(sym, self.Node()) + + if not isinstance(node._content, list): + node._content = [] + + + c = self.Content(node, topic, method, once) + + + node._content.append(c) + print(c, node._content, topic) + self.registry[method] = c + + + + def get_callbacks(self, key): + try: + node = self._root + for sym in key.split('/'): + node = node._children[sym] + if node._content is None: + raise KeyError(key) + return node._content + except KeyError: + raise KeyError(key) + + + def unregister(self, method): + self.logger.info("Unregister %s", method.__name__) + if method in self.registry: + content = self.registry[method] + clean_idx = None + for idx, content_obj in enumerate(content.parent._content): + if method == content_obj.method: + clean_idx = idx + break + if clean_idx is not None: + del content.parent._content[clean_idx] + + + + def fire(self, topic: str, **kwargs) -> None: + + self.logger.info("EMIT EVENT %s", topic) + + cleanup_methods = [] + for content_array in self.iter_match(topic): + + print(content_array) + cleanup = [] + for idx, content_obj in enumerate(content_array): + + if inspect.iscoroutinefunction(content_obj.method): + if hasattr(content_obj.method, "future"): + + self.cbpi.app.loop.create_task(content_obj.method(**kwargs, future=content_obj.method.future, topic=topic)) + else: + self.cbpi.app.loop.create_task(content_obj.method(**kwargs, topic = topic)) + else: + if hasattr(content_obj.method, "future"): + content_obj.method(**kwargs, future=content_obj.method.future, topic=topic) + else: + content_obj.method(**kwargs, topic = topic) + + if content_obj.once is True: + cleanup.append(idx) + for idx in cleanup: + del content_array[idx] + + + + + def iter_match(self, topic): + + lst = topic.split('/') + normal = not topic.startswith('$') + + def rec(node, i=0): + if i == len(lst): + if node._content is not None: + yield node._content + else: + part = lst[i] + if part in node._children: + for content in rec(node._children[part], i + 1): + yield content + if '+' in node._children and (normal or i > 0): + for content in rec(node._children['+'], i + 1): + yield content + if '#' in node._children and (normal or i > 0): + content = node._children['#']._content + if content is not None: + yield content + + return rec(self._root) diff --git a/core/extension/dummylogic/__init__.py b/core/extension/dummylogic/__init__.py index 88f6593..6448635 100644 --- a/core/extension/dummylogic/__init__.py +++ b/core/extension/dummylogic/__init__.py @@ -12,36 +12,29 @@ class CustomLogic(CBPiKettleLogic): running = True - async def wait_for_event(self, topic, timeout=None): + async def wait_for_event(self, topic, callback=None, timeout=None): future_obj = self.cbpi.app.loop.create_future() - async def callback(id, **kwargs): - print("---------------------------------- CALLBACK ----------------") - print(kwargs) - - if int(id) == 1: - self.cbpi.bus.unregister(callback) - future_obj.set_result("HELLO") - elif int(id) == 2: - self.cbpi.bus.unregister(callback) - else: - print("ID", id) + async def default_callback(id, **kwargs): + future_obj.set_result("HELLO") - - print("TOPIC", topic) - self.cbpi.bus.register(topic=topic, method=callback) + if callback is None: + self.cbpi.bus.register(topic=topic, method=default_callback) + else: + callback.future = future_obj + self.cbpi.bus.register(topic=topic, method=callback) if timeout is not None: try: print("----> WAIT FOR FUTURE") - await asyncio.wait_for(future_obj, timeout=10.0) - print("------> RETURN RESULT") + await asyncio.wait_for(future_obj, timeout=timeout) + print("------> TIMEOUT") return future_obj.result() except asyncio.TimeoutError: print('timeout!') @@ -54,7 +47,13 @@ class CustomLogic(CBPiKettleLogic): async def run(self): - result = await self.wait_for_event("actor/+/on") + + async def my_callback(id, **kwargs): + self.cbpi.bus.unregister(my_callback) + kwargs["future"].set_result("AMAZING") + return "OK" + + result = await self.wait_for_event("actor/+/on", callback=my_callback) print("THE RESULT", result) diff --git a/core/job/__init__.py b/core/job/__init__.py index 3df543f..0c5dddb 100644 --- a/core/job/__init__.py +++ b/core/job/__init__.py @@ -10,13 +10,13 @@ import asyncio from ._scheduler import Scheduler -async def create_scheduler(*, close_timeout=0.1, limit=100, +async def create_scheduler(cbpi,*, close_timeout=0.1, limit=100, pending_limit=10000, exception_handler=None): if exception_handler is not None and not callable(exception_handler): raise TypeError('A callable object or None is expected, ' 'got {!r}'.format(exception_handler)) loop = asyncio.get_event_loop() - return Scheduler(loop=loop, close_timeout=close_timeout, + return Scheduler(cbpi=cbpi,loop=loop, close_timeout=close_timeout, limit=limit, pending_limit=pending_limit, exception_handler=exception_handler) diff --git a/core/job/_scheduler.py b/core/job/_scheduler.py index 6019c6b..4c7fb58 100644 --- a/core/job/_scheduler.py +++ b/core/job/_scheduler.py @@ -13,9 +13,10 @@ else: # pragma: no cover class Scheduler(*bases): - def __init__(self, *, close_timeout, limit, pending_limit, + def __init__(self, cbpi, *, close_timeout, limit, pending_limit, exception_handler, loop): self._loop = loop + self.cbpi = cbpi self._jobs = set() self._close_timeout = close_timeout self._limit = limit @@ -111,6 +112,9 @@ class Scheduler(*bases): return self._exception_handler def _done(self, job): + + print("JOB DONE") + self.cbpi.bus.fire("job/done", key=job.name) self._jobs.discard(job) if not self.pending_count: return @@ -127,6 +131,13 @@ class Scheduler(*bases): new_job._start() i += 1 + def is_running(self, name): + + for j in self._jobs: + if name == j.name: + return True + return False + async def _wait_failed(self): # a coroutine for waiting failed tasks # without awaiting for failed tasks async raises a warning diff --git a/core/job/aiohttp.py b/core/job/aiohttp.py index d11d409..1f78204 100644 --- a/core/job/aiohttp.py +++ b/core/job/aiohttp.py @@ -40,9 +40,9 @@ def atomic(coro): return wrapper -def setup(app, **kwargs): +def setup(app, cbpi, **kwargs): async def on_startup(app): - app['AIOJOBS_SCHEDULER'] = await create_scheduler(**kwargs) + app['AIOJOBS_SCHEDULER'] = await create_scheduler(cbpi, **kwargs) async def on_cleanup(app): await app['AIOJOBS_SCHEDULER'].close() diff --git a/main2.py b/main2.py index 23a7a00..023194e 100644 --- a/main2.py +++ b/main2.py @@ -1,6 +1,6 @@ import asyncio -from core.eventbus import EventBus +from core.eventbus3 import EventBus async def waiter(event):