craftbeerpi4-pione/cbpi/eventbus.py

209 lines
6.7 KiB
Python
Raw Normal View History

2018-11-30 23:27:11 +01:00
import asyncio
2018-11-18 23:09:17 +01:00
import inspect
2018-11-01 19:50:04 +01:00
import logging
2018-12-13 21:45:33 +01:00
2019-01-05 20:43:48 +01:00
from cbpi.api import *
2019-01-02 00:48:36 +01:00
2018-12-13 21:45:33 +01:00
class CBPiEventBus(object):
    """Topic based publish/subscribe bus running on an asyncio event loop.

    Topics are ``/`` separated paths stored in a tree of ``Node`` objects.
    Subscriptions may use ``+`` to match exactly one level and ``#`` to match
    all remaining levels.  Wildcards at the first level never match topics
    that start with ``$``.
    """

    class Node(object):
        """One level of the topic tree."""

        __slots__ = '_children', '_content'

        def __init__(self):
            # Maps the next topic segment to its child Node.
            self._children = {}
            # None until a handler is registered on this exact path,
            # afterwards a list of Content objects.
            self._content = None

    class Content(object):
        """A single registered handler together with its registration data."""

        def __init__(self, parent, topic, method, once, supports_future=False):
            self.parent = parent  # Node whose _content list holds this entry
            self.method = method
            self.name = method.__name__
            self.once = once
            self.topic = topic
            self.supports_future = supports_future

    class Result:
        """Outcome of one future-aware handler.

        NOTE(review): despite its name, ``timeout`` is set to True by
        ``ResultContainer`` when the handler's future completed and to False
        when it did not.  The semantics are kept as-is for existing callers.
        """

        def __init__(self, result, timeout):
            self.result = result
            self.timeout = timeout

    class ResultContainer:
        """Snapshot of all handler futures taken when ``fire`` returns."""

        def __init__(self, results, timeout=False):
            """Build the snapshot.

            :param results: dict mapping a handler key to its future
            :param timeout: True when waiting for the futures timed out
            """
            self.results = {}
            self.timeout = timeout
            self._jobs = set()
            for key, value in results.items():
                if value.done() is True:
                    self.results[key] = CBPiEventBus.Result(value.result(), True)
                else:
                    self.results[key] = CBPiEventBus.Result(None, False)

        def get(self, key):
            """Return ``(result, flag)`` for the handler stored under *key*.

            ``flag`` is True when the handler's future had completed.

            :raises CBPiException: if no handler was recorded under *key*
            """
            entry = self.results.get(key)
            if entry is None:
                raise CBPiException("Event Key %s not found." % key)
            return (entry.result, entry.timeout)

    def register(self, topic, method, once=False):
        """Subscribe *method* to *topic*.

        A handler that declares a ``future`` parameter is flagged as
        future-aware and receives a future when the topic is fired.

        :param topic: ``/`` separated topic, may contain ``+`` and ``#``
        :param method: callable to invoke; only coroutine functions are
                       actually called by ``fire``
        :param once: if not exactly ``False`` the handler is removed after
                     the first matching ``fire``
        :raises RuntimeError: if *method* is already registered
        """
        if method in self.registry:
            raise RuntimeError("Method %s already registerd. Please unregister first!" % method.__name__)
        self.logger.info("Topic %s", topic)

        # Inspect the handler before touching the tree so that a callable
        # without an introspectable signature leaves the tree unchanged.
        supports_future = "future" in inspect.signature(method).parameters

        node = self._root
        for sym in topic.split('/'):
            child = node._children.get(sym)
            if child is None:
                child = node._children[sym] = self.Node()
            node = child
        if not isinstance(node._content, list):
            node._content = []

        c = self.Content(node, topic, method, once, supports_future)
        node._content.append(c)
        self.registry[method] = c

    def get_callbacks(self, key):
        """Return the list of Content objects registered exactly on *key*.

        No wildcard matching is performed.

        :raises KeyError: if nothing was ever registered on *key*
        """
        try:
            node = self._root
            for sym in key.split('/'):
                node = node._children[sym]
            if node._content is None:
                raise KeyError(key)
            return node._content
        except KeyError:
            raise KeyError(key)

    def unregister(self, method):
        """Remove *method* from the bus; unknown methods are ignored.

        The handler is removed from the topic tree and from the registry so
        that it can be registered again afterwards.
        """
        self.logger.info("Unregister %s", method.__name__)
        content = self.registry.pop(method, None)
        if content is None:
            return
        siblings = content.parent._content
        for idx, content_obj in enumerate(siblings):
            if method == content_obj.method:
                del siblings[idx]
                break

    def __init__(self, loop, cbpi):
        """Create an empty bus.

        :param loop: event loop to schedule handlers on; when None the loop
                     returned by ``asyncio.get_event_loop()`` is used
        :param cbpi: application object, stored as ``self.cbpi``
        """
        self.logger = logging.getLogger(__name__)
        self.cbpi = cbpi
        self._root = self.Node()
        self.registry = {}  # method -> Content
        self.docs = {}
        if loop is not None:
            self.loop = loop
        else:
            self.loop = asyncio.get_event_loop()

    def sync_fire(self, topic: str, timeout=1, **kwargs):
        """Schedule ``fire`` as a task on the bus loop without awaiting it."""
        self.loop.create_task(self.fire(topic=topic, timeout=timeout, **kwargs))

    async def fire(self, topic: str, timeout=0.5, **kwargs):
        """Dispatch *topic* to every matching coroutine handler.

        Each handler is started as a task and called with ``**kwargs`` plus
        ``topic``.  Future-aware handlers additionally receive ``future`` and
        are expected to resolve it.  Handlers that are not coroutine
        functions are skipped.

        :param topic: concrete topic to publish
        :param timeout: seconds to wait for the futures of future-aware
                        handlers; ``None`` returns without waiting
        :return: ResultContainer keyed by ``"<module>.<handler name>"``
        """
        futures = {}

        async def wait(futures):
            if len(futures) > 0:
                await asyncio.wait(futures.values())

        for content_array in self.iter_match(topic):
            keep = []
            for content_obj in content_array:
                if inspect.iscoroutinefunction(content_obj.method):
                    if content_obj.supports_future is True:
                        fut = self.loop.create_future()
                        futures["%s.%s" % (content_obj.method.__module__, content_obj.name)] = fut
                        self.loop.create_task(content_obj.method(**kwargs, topic=topic, future=fut))
                    else:
                        self.loop.create_task(content_obj.method(**kwargs, topic=topic))

                if content_obj.once is False:
                    keep.append(content_obj)
                else:
                    # One-shot handlers leave the registry as well, otherwise
                    # they could never be registered again.
                    self.registry.pop(content_obj.method, None)

            # Only replace the node's handler list when something was dropped.
            if len(keep) < len(content_array):
                content_array[0].parent._content = keep

        # Defined up front so that timeout=None also yields a valid result.
        is_timedout = False
        if timeout is not None:
            try:
                await asyncio.wait_for(wait(futures), timeout=timeout)
            except asyncio.TimeoutError:
                is_timedout = True
        return self.ResultContainer(futures, is_timedout)

    def dump(self):
        """Return one dict per registered handler, in depth-first tree order.

        Each dict has the keys ``topic``, ``supports_future``, ``method``,
        ``path`` (the handler's module) and ``once``.
        """
        def rec(node):
            result = []
            if node._content is not None:
                for c in node._content:
                    result.append(dict(topic=c.topic, supports_future=c.supports_future, method=c.method.__name__, path=c.method.__module__, once=c.once))
            if node._children is not None:
                for child in node._children.values():
                    result.extend(rec(child))
            return result

        return rec(self._root)

    def iter_match(self, topic):
        """Yield the handler list of every subscription matching *topic*."""
        lst = topic.split('/')
        # First-level wildcards must not match topics starting with '$'.
        normal = not topic.startswith('$')

        def rec(node, i=0):
            if i == len(lst):
                if node._content is not None:
                    yield node._content
            else:
                part = lst[i]
                if part in node._children:
                    yield from rec(node._children[part], i + 1)
                if '+' in node._children and (normal or i > 0):
                    yield from rec(node._children['+'], i + 1)
            if '#' in node._children and (normal or i > 0):
                content = node._children['#']._content
                if content is not None:
                    yield content

        return rec(self._root)

    def register_object(self, obj):
        """Register every callable attribute of *obj* marked with ``eventbus``.

        The topic is read from the attribute's ``topic`` attribute.
        """
        # Collect first so that attribute access errors surface before any
        # handler has been registered.
        methods = []
        for attr_name in dir(obj):
            candidate = getattr(obj, attr_name)
            if callable(candidate) and hasattr(candidate, "eventbus"):
                methods.append(candidate)
        for method in methods:
            self.register(getattr(method, "topic"), method)