Event Bus cleanup added

This commit is contained in:
manuel83 2018-11-30 23:27:11 +01:00
parent 35ee2fbad9
commit e2d4485f1f
11 changed files with 516 additions and 664 deletions

File diff suppressed because it is too large Load diff

View file

@ -4,6 +4,7 @@ from core.api import request_mapping, on_event
from core.controller.crud_controller import CRUDController
from core.database.model import KettleModel
from core.http_endpoints.http_api import HttpAPI
from core.job.aiohttp import get_scheduler_from_app
from core.utils import json_dumps
@ -72,6 +73,15 @@ class KettleController(CRUDController):
self.cbpi.bus.fire(topic="kettle/%s/automatic" % id, id=id)
return (True, "Logic switched on switched")
@on_event(topic="job/done")
def job_stop(self, key, **kwargs) -> None:
name = key.split("_")
kettle = self.cache[int(name[2])]
kettle.instance = None
print("STOP KETTLE LOGIC", int(name[2]))
@on_event(topic="kettle/+/automatic")
async def handle_automtic_event(self, id, **kwargs):
@ -90,7 +100,7 @@ class KettleController(CRUDController):
if hasattr(kettle, "instance") is False:
kettle.instance = None
self._is_logic_running(id)
if kettle.instance is None:
if kettle.logic in self.types:
clazz = self.types.get("CustomKettleLogic")["class"]
@ -98,13 +108,15 @@ class KettleController(CRUDController):
cfg.update(dict(cbpi=self.cbpi))
kettle.instance = clazz(**cfg)
print("START LOGIC")
await self.cbpi.start_job(kettle.instance.run(), "Kettle_logic_%s" % kettle.id, "kettle_logic")
await self.cbpi.start_job(kettle.instance.run(), "Kettle_logic_%s" % kettle.id, "kettle_logic%s"%id)
else:
kettle.instance.running = False
kettle.instance = None
def _is_logic_running(self, kettle_id):
scheduler = get_scheduler_from_app(self.cbpi.app)
print("JOB KETTLE RUNNING", scheduler.is_running("Kettle_logic_%s"%kettle_id))
async def heater_on(self, id):
'''

View file

@ -35,4 +35,8 @@ class SystemController():
except:
pass
# await j.close()
return web.json_response(data=result)
return web.json_response(data=result)
@request_mapping("/events", method="GET", name="get_all_events", auth_required=False)
def get_all_events(self, request):
return web.json_response(data=self.cbpi.bus.dump())

View file

@ -49,8 +49,8 @@ class CraftBeerPi():
self.app = web.Application(middlewares=middlewares)
self.initializer = []
setup(self.app)
self.bus = EventBus(self)
setup(self.app, self)
self.bus = EventBus(self.app.loop)
self.ws = WebSocket(self)
self.actor = ActorController(self)
self.sensor = SensorController(self)
@ -232,6 +232,8 @@ class CraftBeerPi():
await self.sensor.init()
await self.actor.init()
await self.kettle.init()
import pprint
pprint.pprint(self.bus.dump())
async def load_plugins(app):
#await PluginController.load_plugin_list()

View file

@ -1,14 +1,9 @@
import asyncio
import inspect
import logging
import asyncio
class EventBus(object):
class Node(object):
__slots__ = '_children', '_content'
@ -16,11 +11,8 @@ class EventBus(object):
self._children = {}
self._content = None
class Content(object):
def __init__(self, parent, topic, method, once):
self.parent = parent
self.method = method
self.name = method.__name__
@ -28,11 +20,10 @@ class EventBus(object):
self.topic = topic
def register(self, topic, method, once=False):
print("REGISTER", topic, method)
if method in self.registry:
raise RuntimeError("Method %s already registerd. Please unregister first!" % method.__name__)
self.logger.info("Topic %s", topic)
node = self._root
for sym in topic.split('/'):
node = node._children.setdefault(sym, self.Node())
@ -40,16 +31,10 @@ class EventBus(object):
if not isinstance(node._content, list):
node._content = []
c = self.Content(node, topic, method, once)
node._content.append(c)
print(c, node._content, topic)
self.registry[method] = c
def get_callbacks(self, key):
try:
node = self._root
@ -61,7 +46,6 @@ class EventBus(object):
except KeyError:
raise KeyError(key)
def unregister(self, method):
self.logger.info("Unregister %s", method.__name__)
if method in self.registry:
@ -74,58 +58,67 @@ class EventBus(object):
if clean_idx is not None:
del content.parent._content[clean_idx]
'''
def unregister(self, key, method):
lst = []
try:
parent, node = None, self._root
for k in key.split('/'):
parent, node = node, node._children[k]
lst.append((parent, k, node))
except KeyError:
raise KeyError(key)
else: # cleanup
for parent, k, node in reversed(lst):
if node._children or node._content is not None:
break
del parent._children[k]
'''
def __init__(self, cbpi):
def __init__(self, loop):
self.logger = logging.getLogger(__name__)
self._root = self.Node()
self.registry = {}
self.docs = {}
self.cbpi = cbpi
if loop is not None:
self.loop = loop
else:
self.loop = asyncio.get_event_loop()
print(self.loop)
def fire(self, topic: str, **kwargs) -> None:
print("#### FIRE", topic)
self.logger.info("EMIT EVENT %s", topic)
cleanup_methods = []
for content_array in self.iter_match(topic):
print(content_array)
cleanup = []
trx = dict(i=0)
for e in self.iter_match(topic):
content_array = e
keep_idx = []
for idx, content_obj in enumerate(content_array):
print("#################")
print("TOPIC", content_obj.method, content_obj.topic)
print("#################")
if inspect.iscoroutinefunction(content_obj.method):
self.cbpi.app.loop.create_task(content_obj.method(**kwargs, topic = topic))
if hasattr(content_obj.method, "future"):
self.loop.create_task(content_obj.method(**kwargs, future=content_obj.method.future, topic=topic))
else:
self.loop.create_task(content_obj.method(**kwargs, topic = topic))
else:
content_obj.method(**kwargs, topic = topic)
if hasattr(content_obj.method, "future"):
content_obj.method(**kwargs, future=content_obj.method.future, topic=topic)
else:
content_obj.method(**kwargs, topic = topic)
if content_obj.once is True:
cleanup.append(idx)
for idx in cleanup:
del content_array[idx]
#if inspect.iscoroutinefunction(content_obj.method):
# self.loop.create_task(content_obj.method(**kwargs, trx=trx, topic=topic))
#else:
# content_obj.method(**kwargs, trx=trx, topic=topic)
if content_obj.once is False:
keep_idx.append(idx)
# FILTER only elements with are required
if len(keep_idx) < len(e):
e[0].parent._content = [e[0].parent._content[i] for i in keep_idx]
print(self._root)
print("#### FIRE END ######")
print("DONE", trx)
def dump(self):
def rec(node, i=0):
result = []
if node._content is not None:
for c in node._content:
result.append(dict(topic=c.topic, method=c.method.__name__, path=c.method.__module__, once=c.once))
if node._children is not None:
for c in node._children:
result = result + rec(node._children[c], i + 1)
return result
result = rec(self._root)
return result
def iter_match(self, topic):

132
core/eventbus3.py Normal file
View file

@ -0,0 +1,132 @@
import inspect
import logging
class EventBus(object):
def __init__(self, cbpi):
self.logger = logging.getLogger(__name__)
self._root = self.Node()
self.registry = {}
self.docs = {}
self.cbpi = cbpi
class Node(object):
__slots__ = '_children', '_content'
def __init__(self):
self._children = {}
self._content = None
class Content(object):
def __init__(self, parent, topic, method, once):
self.parent = parent
self.method = method
self.name = method.__name__
self.once = once
self.topic = topic
def register(self, topic, method, once=False):
print("REGISTER", topic, method)
if method in self.registry:
raise RuntimeError("Method %s already registerd. Please unregister first!" % method.__name__)
self.logger.info("Topic %s", topic)
node = self._root
for sym in topic.split('/'):
node = node._children.setdefault(sym, self.Node())
if not isinstance(node._content, list):
node._content = []
c = self.Content(node, topic, method, once)
node._content.append(c)
print(c, node._content, topic)
self.registry[method] = c
def get_callbacks(self, key):
try:
node = self._root
for sym in key.split('/'):
node = node._children[sym]
if node._content is None:
raise KeyError(key)
return node._content
except KeyError:
raise KeyError(key)
def unregister(self, method):
self.logger.info("Unregister %s", method.__name__)
if method in self.registry:
content = self.registry[method]
clean_idx = None
for idx, content_obj in enumerate(content.parent._content):
if method == content_obj.method:
clean_idx = idx
break
if clean_idx is not None:
del content.parent._content[clean_idx]
def fire(self, topic: str, **kwargs) -> None:
self.logger.info("EMIT EVENT %s", topic)
cleanup_methods = []
for content_array in self.iter_match(topic):
print(content_array)
cleanup = []
for idx, content_obj in enumerate(content_array):
if inspect.iscoroutinefunction(content_obj.method):
if hasattr(content_obj.method, "future"):
self.cbpi.app.loop.create_task(content_obj.method(**kwargs, future=content_obj.method.future, topic=topic))
else:
self.cbpi.app.loop.create_task(content_obj.method(**kwargs, topic = topic))
else:
if hasattr(content_obj.method, "future"):
content_obj.method(**kwargs, future=content_obj.method.future, topic=topic)
else:
content_obj.method(**kwargs, topic = topic)
if content_obj.once is True:
cleanup.append(idx)
for idx in cleanup:
del content_array[idx]
def iter_match(self, topic):
lst = topic.split('/')
normal = not topic.startswith('$')
def rec(node, i=0):
if i == len(lst):
if node._content is not None:
yield node._content
else:
part = lst[i]
if part in node._children:
for content in rec(node._children[part], i + 1):
yield content
if '+' in node._children and (normal or i > 0):
for content in rec(node._children['+'], i + 1):
yield content
if '#' in node._children and (normal or i > 0):
content = node._children['#']._content
if content is not None:
yield content
return rec(self._root)

View file

@ -12,36 +12,29 @@ class CustomLogic(CBPiKettleLogic):
running = True
async def wait_for_event(self, topic, timeout=None):
async def wait_for_event(self, topic, callback=None, timeout=None):
future_obj = self.cbpi.app.loop.create_future()
async def callback(id, **kwargs):
print("---------------------------------- CALLBACK ----------------")
print(kwargs)
if int(id) == 1:
self.cbpi.bus.unregister(callback)
future_obj.set_result("HELLO")
elif int(id) == 2:
self.cbpi.bus.unregister(callback)
else:
print("ID", id)
async def default_callback(id, **kwargs):
future_obj.set_result("HELLO")
print("TOPIC", topic)
self.cbpi.bus.register(topic=topic, method=callback)
if callback is None:
self.cbpi.bus.register(topic=topic, method=default_callback)
else:
callback.future = future_obj
self.cbpi.bus.register(topic=topic, method=callback)
if timeout is not None:
try:
print("----> WAIT FOR FUTURE")
await asyncio.wait_for(future_obj, timeout=10.0)
print("------> RETURN RESULT")
await asyncio.wait_for(future_obj, timeout=timeout)
print("------> TIMEOUT")
return future_obj.result()
except asyncio.TimeoutError:
print('timeout!')
@ -54,7 +47,13 @@ class CustomLogic(CBPiKettleLogic):
async def run(self):
result = await self.wait_for_event("actor/+/on")
async def my_callback(id, **kwargs):
self.cbpi.bus.unregister(my_callback)
kwargs["future"].set_result("AMAZING")
return "OK"
result = await self.wait_for_event("actor/+/on", callback=my_callback)
print("THE RESULT", result)

View file

@ -10,13 +10,13 @@ import asyncio
from ._scheduler import Scheduler
async def create_scheduler(*, close_timeout=0.1, limit=100,
async def create_scheduler(cbpi,*, close_timeout=0.1, limit=100,
pending_limit=10000, exception_handler=None):
if exception_handler is not None and not callable(exception_handler):
raise TypeError('A callable object or None is expected, '
'got {!r}'.format(exception_handler))
loop = asyncio.get_event_loop()
return Scheduler(loop=loop, close_timeout=close_timeout,
return Scheduler(cbpi=cbpi,loop=loop, close_timeout=close_timeout,
limit=limit, pending_limit=pending_limit,
exception_handler=exception_handler)

View file

@ -13,9 +13,10 @@ else: # pragma: no cover
class Scheduler(*bases):
def __init__(self, *, close_timeout, limit, pending_limit,
def __init__(self, cbpi, *, close_timeout, limit, pending_limit,
exception_handler, loop):
self._loop = loop
self.cbpi = cbpi
self._jobs = set()
self._close_timeout = close_timeout
self._limit = limit
@ -111,6 +112,9 @@ class Scheduler(*bases):
return self._exception_handler
def _done(self, job):
print("JOB DONE")
self.cbpi.bus.fire("job/done", key=job.name)
self._jobs.discard(job)
if not self.pending_count:
return
@ -127,6 +131,13 @@ class Scheduler(*bases):
new_job._start()
i += 1
def is_running(self, name):
for j in self._jobs:
if name == j.name:
return True
return False
async def _wait_failed(self):
# a coroutine for waiting failed tasks
# without awaiting for failed tasks async raises a warning

View file

@ -40,9 +40,9 @@ def atomic(coro):
return wrapper
def setup(app, **kwargs):
def setup(app, cbpi, **kwargs):
async def on_startup(app):
app['AIOJOBS_SCHEDULER'] = await create_scheduler(**kwargs)
app['AIOJOBS_SCHEDULER'] = await create_scheduler(cbpi, **kwargs)
async def on_cleanup(app):
await app['AIOJOBS_SCHEDULER'].close()

View file

@ -1,6 +1,6 @@
import asyncio
from core.eventbus import EventBus
from core.eventbus3 import EventBus
async def waiter(event):