craftbeerpi4-pione/core/websocket.py

107 lines
2.9 KiB
Python
Raw Normal View History

2018-11-01 19:50:04 +01:00
import logging
import weakref
from collections import defaultdict
2018-12-03 22:16:03 +01:00
import json
2018-11-01 19:50:04 +01:00
import aiohttp
from aiohttp import web
from typing import Iterable, Callable
2018-12-29 00:27:19 +01:00
from cbpi_api import *
2018-12-03 22:16:03 +01:00
2018-11-01 19:50:04 +01:00
class WebSocket:
    """WebSocket endpoint that pushes bus events to connected clients.

    On construction the handler is registered on the aiohttp application at
    ``/ws``.  Every event received by :meth:`listen` is serialised to JSON and
    broadcast to all currently connected sockets.  Text frames received from a
    client are decoded as JSON and their ``topic`` is fired on ``cbpi.bus``.
    """

    def __init__(self, cbpi) -> None:
        """Store the application object and register the ``/ws`` route.

        :param cbpi: application object; this class uses ``cbpi.app`` (the
            aiohttp application) and ``cbpi.bus`` (the event bus).
        """
        self.cbpi = cbpi
        # event name -> set of coroutine callbacks registered via add_callback
        self._callbacks = defaultdict(set)
        # Weak references: a socket that is garbage-collected drops out of the
        # set without explicit bookkeeping.
        self._clients = weakref.WeakSet()
        self.logger = logging.getLogger(__name__)
        self.cbpi.app.add_routes([web.get('/ws', self.websocket_handler)])

    # NOTE(review): "#" is presumably the bus wildcard matching every topic -
    # confirm against the cbpi_api ``on_event`` documentation.
    @on_event(topic="#")
    async def listen(self, topic, **kwargs):
        """Serialise an event to JSON and broadcast it to all clients.

        :param topic: topic of the event that was fired.
        :param kwargs: event payload, sent under the ``data`` key.
        """
        # Imported here as in the original code (kept at function scope).
        from core.utils.encoder import ComplexEncoder

        data = json.dumps(
            dict(topic=topic, data=dict(**kwargs)),
            skipkeys=True,
            check_circular=True,
            cls=ComplexEncoder,
        )
        # Lazy logger arguments: the message is only formatted if INFO is on.
        self.logger.info("PUSH %s ", data)
        self.send(data)

    def send(self, data):
        """Schedule ``data`` to be sent as a text frame to every open client.

        Sending is fire-and-forget: one task per client is created on the
        application's event loop and this method returns immediately.

        :param data: text to send (already serialised).
        """
        for ws in self._clients:
            # A socket that is already closed cannot accept frames; skipping
            # it avoids scheduling a task that is guaranteed to fail.
            if ws.closed:
                continue
            self.cbpi.app.loop.create_task(self._send_to_client(ws, data))

    async def _send_to_client(self, ws, data):
        """Send ``data`` to one client, logging instead of leaking failures."""
        try:
            await ws.send_str(data)
        except (ConnectionError, RuntimeError):
            # The client went away between scheduling and sending.  Without
            # this handler the failure would only surface as an
            # "exception was never retrieved" warning on the orphaned task.
            self.logger.exception("Failed to push data to websocket client")

    def add_callback(self, func: Callable, event: str) -> None:
        """Register ``func`` to be awaited whenever ``event`` is emitted."""
        self._callbacks[event].add(func)

    async def emit(self, event: str, *args, **kwargs) -> None:
        """Await every callback registered for ``event``, one after another.

        :param event: event name used with :meth:`add_callback`.
        :param args: positional arguments forwarded to each callback.
        :param kwargs: keyword arguments forwarded to each callback.
        """
        for func in self._event_funcs(event):
            await func(*args, **kwargs)

    def _event_funcs(self, event: str) -> Iterable[Callable]:
        """Yield the callbacks registered for ``event``.

        ``.get`` is used so that looking up an unknown event does not insert
        an empty entry into the defaultdict, and the callbacks are copied into
        a tuple so that a callback registering another callback while
        :meth:`emit` is awaiting does not change the set during iteration.
        """
        yield from tuple(self._callbacks.get(event, ()))

    async def websocket_handler(self, request):
        """Serve one WebSocket connection until the client disconnects.

        :param request: the aiohttp request being upgraded.
        :return: the ``WebSocketResponse`` used for the connection.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        self._clients.add(ws)
        # Number of clients that were connected before this one.
        c = len(self._clients) - 1
        self.logger.info(ws)
        self.logger.info(c)
        try:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    if msg.data == 'close':
                        await ws.close()
                        self.logger.info("WS Close")
                    else:
                        await self._dispatch_client_message(msg)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    self.logger.error('ws connection closed with exception %s', ws.exception())
        finally:
            # Always unregister, even if the loop above raised.
            self._clients.discard(ws)

        self.logger.info("Web Socket Close")
        return ws

    async def _dispatch_client_message(self, msg):
        """Decode a client text frame and fire its topic on the bus.

        Frames that are not valid JSON, or whose JSON value has no ``topic``
        entry, are logged and ignored so that a single bad message does not
        tear down the connection.
        """
        try:
            topic = msg.json()["topic"]
        except (ValueError, KeyError, TypeError):
            # ValueError: not JSON; KeyError: no "topic"; TypeError: the JSON
            # value is not a mapping (e.g. a list or a number).
            self.logger.warning("Ignoring malformed websocket message: %r", msg.data)
            return
        # NOTE(review): the payload is hard-coded (id=1, power=22) and anything
        # else in the client message is discarded.  Kept unchanged to preserve
        # behaviour - confirm whether the message data should be forwarded.
        await self.cbpi.bus.fire(topic, id=1, power=22)
2018-11-16 20:35:59 +01:00
'''
2018-11-01 19:50:04 +01:00
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
_ws.append(ws)
c = len(_ws) - 1
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
else:
await ws.send_str(msg.data)
elif msg.type == aiohttp.WSMsgType.ERROR:
print('ws connection closed with exception %s' %
ws.exception())
del _ws[c]
print('websocket connection closed')
2018-11-04 01:55:54 +01:00
return ws
2018-11-16 20:35:59 +01:00
'''