craftbeerpi4-pione/core/mqtt/mqtt.py

91 lines
2.7 KiB
Python
Raw Normal View History

2018-11-01 19:50:04 +01:00
from aiojobs.aiohttp import get_scheduler_from_app
from hbmqtt.broker import Broker
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_1, QOS_0
from typing import Callable
from core.mqtt_matcher import MQTTMatcher
class MQTT():
2018-11-01 21:25:42 +01:00
def __init__(self,cbpi):
2018-11-01 19:50:04 +01:00
self.config = {
'listeners': {
'default': {
'type': 'tcp',
'bind': '0.0.0.0:1885',
},
'ws': {
'bind': '0.0.0.0:8081',
'type': 'ws'
}
},
'sys_interval': 10,
'topic-check': {
'enabled': True,
'plugins': [
'topic_taboo'
]
},
'auth': {
'allow-anonymous': True,
2018-11-01 21:25:42 +01:00
'password-file': '/Users/manuelfritsch/github/aio_sample.cbpi/user.txt'
2018-11-01 19:50:04 +01:00
}
}
2018-11-01 21:25:42 +01:00
self.cbpi = cbpi
2018-11-01 19:50:04 +01:00
self.broker = Broker(self.config, plugin_namespace="hbmqtt.broker.plugins")
self.client = MQTTClient()
self.matcher = MQTTMatcher()
self.mqtt_methods = {"test": self.ok_msg, "$SYS/broker/#": self.sysmsg}
2018-11-01 21:25:42 +01:00
self.cbpi.app.on_startup.append(self.start_broker)
2018-11-01 19:50:04 +01:00
self.count = 0
def sysmsg(self, msg):
print("SYS", msg)
def ok_msg(self, msg):
self.count = self.count + 1
print("MSFG", msg, self.count)
def publish(self, topic, message):
print("PUSH NOW", topic)
2018-11-01 21:25:42 +01:00
self.cbpi.app.loop.create_task(self.client.publish(topic, str.encode(message), QOS_0))
2018-11-01 19:50:04 +01:00
def register_callback(self, func: Callable, topic) -> None:
self.mqtt_methods[topic] = func
async def on_message(self):
while True:
message = await self.client.deliver_message()
matched = False
packet = message.publish_packet
print(message.topic)
#print(message.topic.split('/'))
data = packet.payload.data.decode("utf-8")
for callback in self.matcher.iter_match(message.topic):
print("MATCH")
callback(data)
matched = True
if matched == False:
print("NO HANDLER", data)
async def start_broker(self, app):
await self.broker.start()
#
await self.client.connect('mqtt://username:manuel@localhost:1885')
# await self.client.connect('mqtt://broker.hivemq.com:1883')
for k, v in self.mqtt_methods.items():
print("############MQTT Subscribe:", k, v)
await self.client.subscribe([(k, QOS_1)])
self.matcher[k] = v
await get_scheduler_from_app(app).spawn(self.on_message())