mirror of
https://github.com/PiBrewing/craftbeerpi4.git
synced 2024-11-14 02:58:16 +01:00
97 lines
3.1 KiB
Python
97 lines
3.1 KiB
Python
import logging
|
|
import weakref
|
|
from collections import defaultdict
|
|
|
|
import aiohttp
|
|
from aiohttp import web
|
|
from voluptuous import Schema
|
|
|
|
from cbpi.utils import json_dumps
|
|
|
|
|
|
class CBPiWebSocket:
|
|
def __init__(self, cbpi) -> None:
|
|
self.cbpi = cbpi
|
|
self._callbacks = defaultdict(set)
|
|
self._clients = weakref.WeakSet()
|
|
self.logger = logging.getLogger(__name__)
|
|
self.cbpi.app.add_routes([web.get('/ws', self.websocket_handler)])
|
|
self.cbpi.bus.register_object(self)
|
|
|
|
#if self.cbpi.config.static.get("ws_push_all", False):
|
|
self.cbpi.bus.register("#", self.listen)
|
|
|
|
|
|
async def listen(self, topic, **kwargs):
|
|
data = dict(topic=topic, data=dict(**kwargs))
|
|
self.logger.debug("PUSH %s " % data)
|
|
self.send(data)
|
|
|
|
|
|
def send(self, data, sorting=False):
|
|
self.logger.debug("broadcast to ws clients. Data: %s" % data)
|
|
for ws in self._clients:
|
|
async def send_data(ws, data):
|
|
try:
|
|
if sorting:
|
|
try:
|
|
data['data'].sort(key=lambda x: x.get('name').upper())
|
|
except:
|
|
pass
|
|
await ws.send_json(data=data, dumps=json_dumps)
|
|
except Exception as e:
|
|
self.logger.error("Error with client %s: %s" % (ws, str(e)))
|
|
|
|
self.cbpi.app.loop.create_task(send_data(ws, data))
|
|
|
|
async def websocket_handler(self, request):
|
|
|
|
|
|
|
|
ws = web.WebSocketResponse()
|
|
await ws.prepare(request)
|
|
self._clients.add(ws)
|
|
try:
|
|
peername = request.transport.get_extra_info('peername')
|
|
if peername is not None:
|
|
|
|
host = peername[0]
|
|
port = peername[1]
|
|
else:
|
|
host, port = "Unknowen"
|
|
self.logger.info("Client Connected - Host: %s Port: %s - client count: %s " % (host, port, len(self._clients)))
|
|
except Exception as e:
|
|
pass
|
|
|
|
|
|
try:
|
|
await ws.send_json(data=dict(topic="connection/success"))
|
|
async for msg in ws:
|
|
if msg.type == aiohttp.WSMsgType.TEXT:
|
|
|
|
msg_obj = msg.json()
|
|
schema = Schema({"topic": str, "data": dict})
|
|
schema(msg_obj)
|
|
|
|
topic = msg_obj.get("topic")
|
|
data = msg_obj.get("data")
|
|
if topic == "close":
|
|
await ws.close()
|
|
else:
|
|
if data is not None:
|
|
await self.cbpi.bus.fire(topic=topic, **data)
|
|
else:
|
|
await self.cbpi.bus.fire(topic=topic)
|
|
elif msg.type == aiohttp.WSMsgType.ERROR:
|
|
self.logger.error('ws connection closed with exception %s' % ws.exception())
|
|
|
|
except Exception as e:
|
|
self.logger.error("%s - Received Data %s" % (str(e), msg.data))
|
|
|
|
finally:
|
|
self._clients.discard(ws)
|
|
|
|
self.logger.info("Web Socket Close")
|
|
|
|
return ws
|
|
|