craftbeerpi4-pione/core/mqtt/mqtt.py
2018-11-04 00:47:26 +01:00

89 lines
2.5 KiB
Python

from aiojobs.aiohttp import get_scheduler_from_app
from core.mqtt_matcher import MQTTMatcher
from hbmqtt.broker import Broker
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_1, QOS_0
from typing import Callable
class MQTT():
def __init__(self,cbpi):
self.config = {
'listeners': {
'default': {
'type': 'tcp',
'bind': '0.0.0.0:1885',
},
'ws': {
'bind': '0.0.0.0:8081',
'type': 'ws'
}
},
'sys_interval': 10,
'topic-check': {
'enabled': True,
'plugins': [
'topic_taboo'
]
},
'auth': {
'allow-anonymous': True,
'password-file': '/Users/manuelfritsch/github/aio_sample.cbpi/user.txt'
}
}
self.cbpi = cbpi
self.broker = Broker(self.config, plugin_namespace="hbmqtt.broker.plugins")
self.client = MQTTClient()
self.matcher = MQTTMatcher()
self.mqtt_methods = {"test": self.ok_msg, "$SYS/broker/#": self.sysmsg}
self.cbpi.app.on_startup.append(self.start_broker)
self.count = 0
def sysmsg(self, msg):
pass
def ok_msg(self, msg):
self.count = self.count + 1
def publish(self, topic, message):
self.cbpi.app.loop.create_task(self.client.publish(topic, str.encode(message), QOS_0))
def register_callback(self, func: Callable, topic) -> None:
self.mqtt_methods[topic] = func
async def on_message(self):
while True:
message = await self.client.deliver_message()
matched = False
packet = message.publish_packet
#print(message.topic.split('/'))
data = packet.payload.data.decode("utf-8")
for callback in self.matcher.iter_match(message.topic):
callback(data)
matched = True
if matched == False:
print("NO HANDLER", data)
async def start_broker(self, app):
await self.broker.start()
#
await self.client.connect('mqtt://username:manuel@localhost:1885')
# await self.client.connect('mqtt://broker.hivemq.com:1883')
for k, v in self.mqtt_methods.items():
await self.client.subscribe([(k, QOS_1)])
self.matcher[k] = v
await get_scheduler_from_app(app).spawn(self.on_message())