mirror of
https://github.com/PiBrewing/craftbeerpi4.git
synced 2024-11-10 01:17:42 +01:00
89 lines
2.5 KiB
Python
89 lines
2.5 KiB
Python
from aiojobs.aiohttp import get_scheduler_from_app
|
|
from core.mqtt_matcher import MQTTMatcher
|
|
from hbmqtt.broker import Broker
|
|
from hbmqtt.client import MQTTClient
|
|
from hbmqtt.mqtt.constants import QOS_1, QOS_0
|
|
from typing import Callable
|
|
|
|
|
|
class MQTT():
|
|
def __init__(self,cbpi):
|
|
|
|
self.config = {
|
|
'listeners': {
|
|
'default': {
|
|
'type': 'tcp',
|
|
'bind': '0.0.0.0:1885',
|
|
},
|
|
'ws': {
|
|
'bind': '0.0.0.0:8081',
|
|
'type': 'ws'
|
|
}
|
|
},
|
|
'sys_interval': 10,
|
|
'topic-check': {
|
|
'enabled': True,
|
|
'plugins': [
|
|
'topic_taboo'
|
|
]
|
|
},
|
|
'auth': {
|
|
'allow-anonymous': True,
|
|
'password-file': '/Users/manuelfritsch/github/aio_sample.cbpi/user.txt'
|
|
}
|
|
}
|
|
|
|
self.cbpi = cbpi
|
|
self.broker = Broker(self.config, plugin_namespace="hbmqtt.broker.plugins")
|
|
self.client = MQTTClient()
|
|
self.matcher = MQTTMatcher()
|
|
self.mqtt_methods = {"test": self.ok_msg, "$SYS/broker/#": self.sysmsg}
|
|
self.cbpi.app.on_startup.append(self.start_broker)
|
|
self.count = 0
|
|
|
|
def sysmsg(self, msg):
|
|
|
|
pass
|
|
|
|
def ok_msg(self, msg):
|
|
self.count = self.count + 1
|
|
|
|
|
|
def publish(self, topic, message):
|
|
|
|
self.cbpi.app.loop.create_task(self.client.publish(topic, str.encode(message), QOS_0))
|
|
|
|
def register_callback(self, func: Callable, topic) -> None:
|
|
|
|
self.mqtt_methods[topic] = func
|
|
|
|
async def on_message(self):
|
|
while True:
|
|
|
|
message = await self.client.deliver_message()
|
|
matched = False
|
|
packet = message.publish_packet
|
|
|
|
#print(message.topic.split('/'))
|
|
data = packet.payload.data.decode("utf-8")
|
|
|
|
for callback in self.matcher.iter_match(message.topic):
|
|
|
|
callback(data)
|
|
matched = True
|
|
|
|
if matched == False:
|
|
print("NO HANDLER", data)
|
|
|
|
async def start_broker(self, app):
|
|
|
|
await self.broker.start()
|
|
#
|
|
await self.client.connect('mqtt://username:manuel@localhost:1885')
|
|
# await self.client.connect('mqtt://broker.hivemq.com:1883')
|
|
|
|
for k, v in self.mqtt_methods.items():
|
|
|
|
await self.client.subscribe([(k, QOS_1)])
|
|
self.matcher[k] = v
|
|
await get_scheduler_from_app(app).spawn(self.on_message())
|