craftbeerpi4-pione/cbpi/extension/mqtt/__init__.py
2021-02-02 21:24:29 +01:00

42 lines
1.3 KiB
Python

import json
from cbpi.utils.encoder import ComplexEncoder
from hbmqtt.mqtt.constants import QOS_0
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_1, QOS_2
from asyncio_mqtt import Client, MqttError, Will
import asyncio
class CBPiMqttClient:
def __init__(self, cbpi):
self.cbpi = cbpi
self.cbpi.bus.register("#", self.listen)
self.client = None
self._loop = asyncio.get_event_loop()
self._loop.create_task(self.init_client(self.cbpi))
async def init_client(self, cbpi):
async with Client("localhost", will=Will(topic="cbpi/diconnect", payload="MY CLIENT"))as client:
async with client.filtered_messages("cbpi/#") as messages:
await client.subscribe("cbpi/#")
async for message in messages:
await self.cbpi.actor.on("YwGzXvWMpmbLb6XobesL8n")
print(message.topic, message.payload.decode())
async def listen(self, topic, **kwargs):
if self.client is not None:
await self.client.publish(topic, str.encode(json.dumps(kwargs, cls=ComplexEncoder)), QOS_0)
def setup(cbpi):
'''
This method is called by the server during startup
Here you need to register your plugins at the server
:param cbpi: the cbpi core
:return:
'''
client = CBPiMqttClient(cbpi)