mirror of
https://github.com/PiBrewing/craftbeerpi4.git
synced 2024-11-09 17:07:43 +01:00
adapted sattelite_controller init to upcomming asyncio-mqtt requirements
This commit is contained in:
parent
f5d9d4304a
commit
7f76645b05
2 changed files with 55 additions and 68 deletions
|
@ -1,3 +1,3 @@
|
||||||
__version__ = "4.1.0.a3"
|
__version__ = "4.1.0.a4"
|
||||||
__codename__ = "Groundhog Day"
|
__codename__ = "Groundhog Day"
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
from re import M
|
from re import M
|
||||||
from asyncio_mqtt import Client, MqttError, Will, client
|
from asyncio_mqtt import Client, MqttError, Will
|
||||||
from contextlib import AsyncExitStack, asynccontextmanager
|
from contextlib import AsyncExitStack, asynccontextmanager
|
||||||
from cbpi import __version__
|
from cbpi import __version__
|
||||||
import logging
|
import logging
|
||||||
|
@ -34,7 +34,47 @@ class SatelliteController:
|
||||||
self.tasks = set()
|
self.tasks = set()
|
||||||
|
|
||||||
async def init(self):
|
async def init(self):
|
||||||
asyncio.create_task(self.init_client(self.cbpi))
|
|
||||||
|
#not sure if required like done in the old routine
|
||||||
|
async def cancel_tasks(tasks):
|
||||||
|
for task in tasks:
|
||||||
|
print("3232")
|
||||||
|
if task.done():
|
||||||
|
continue
|
||||||
|
task.cancel()
|
||||||
|
try:
|
||||||
|
await task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self.client = Client(self.host, port=self.port, username=self.username, password=self.password, will=Will(topic="cbpi/disconnect", payload="CBPi Server Disconnected"))
|
||||||
|
self.loop = asyncio.get_event_loop()
|
||||||
|
## Listen for mqtt messages in an (unawaited) asyncio task
|
||||||
|
task = self.loop.create_task(self.listen())
|
||||||
|
## Save a reference to the task so it doesn't get garbage collected
|
||||||
|
self.tasks.add(task)
|
||||||
|
task.add_done_callback(self.tasks.remove)
|
||||||
|
|
||||||
|
self.logger.info("MQTT Connected to {}:{}".format(self.host, self.port))
|
||||||
|
|
||||||
|
async def listen(self):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
async with self.client as client:
|
||||||
|
async with client.messages() as messages:
|
||||||
|
await client.subscribe("#")
|
||||||
|
async for message in messages:
|
||||||
|
for topic_filter in self.topic_filters:
|
||||||
|
topic = topic_filter[0]
|
||||||
|
method = topic_filter[1]
|
||||||
|
if message.topic.matches(topic):
|
||||||
|
await (method(message))
|
||||||
|
except MqttError as e:
|
||||||
|
self.logger.error("MQTT Exception: {}".format(e))
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error("MQTT General Exception: {}".format(e))
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
|
|
||||||
async def publish(self, topic, message, retain=False):
|
async def publish(self, topic, message, retain=False):
|
||||||
if self.client is not None and self.client._connected:
|
if self.client is not None and self.client._connected:
|
||||||
|
@ -43,26 +83,25 @@ class SatelliteController:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warning("Failed to push data via mqtt: {}".format(e))
|
self.logger.warning("Failed to push data via mqtt: {}".format(e))
|
||||||
|
|
||||||
async def _actor_on(self, messages):
|
async def _actor_on(self, message):
|
||||||
async for message in messages:
|
|
||||||
try:
|
try:
|
||||||
topic_key = message.topic.split("/")
|
topic_key = str(message.topic).split("/")
|
||||||
await self.cbpi.actor.on(topic_key[2])
|
await self.cbpi.actor.on(topic_key[2])
|
||||||
|
self.logger.warning("Processed actor {} on via mqtt".format(topic_key[2]))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warning("Failed to process actor on via mqtt: {}".format(e))
|
self.logger.warning("Failed to process actor on via mqtt: {}".format(e))
|
||||||
|
|
||||||
async def _actor_off(self, messages):
|
async def _actor_off(self, message):
|
||||||
async for message in messages:
|
|
||||||
try:
|
try:
|
||||||
topic_key = message.topic.split("/")
|
topic_key = str(message.topic).split("/")
|
||||||
await self.cbpi.actor.off(topic_key[2])
|
await self.cbpi.actor.off(topic_key[2])
|
||||||
|
self.logger.warning("Processed actor {} off via mqtt".format(topic_key[2]))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warning("Failed to process actor off via mqtt: {}".format(e))
|
self.logger.warning("Failed to process actor off via mqtt: {}".format(e))
|
||||||
|
|
||||||
async def _actor_power(self, messages):
|
async def _actor_power(self, message):
|
||||||
async for message in messages:
|
|
||||||
try:
|
try:
|
||||||
topic_key = message.topic.split("/")
|
topic_key = str(message.topic).split("/")
|
||||||
try:
|
try:
|
||||||
power=int(message.payload.decode())
|
power=int(message.payload.decode())
|
||||||
if power > 100:
|
if power > 100:
|
||||||
|
@ -76,8 +115,7 @@ class SatelliteController:
|
||||||
except:
|
except:
|
||||||
self.logger.warning("Failed to set actor power via mqtt")
|
self.logger.warning("Failed to set actor power via mqtt")
|
||||||
|
|
||||||
async def _kettleupdate(self, messages):
|
async def _kettleupdate(self, message):
|
||||||
async for message in messages:
|
|
||||||
try:
|
try:
|
||||||
self.kettle=self.kettlecontroller.get_state()
|
self.kettle=self.kettlecontroller.get_state()
|
||||||
for item in self.kettle['data']:
|
for item in self.kettle['data']:
|
||||||
|
@ -85,8 +123,7 @@ class SatelliteController:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warning("Failed to send kettleupdate via mqtt: {}".format(e))
|
self.logger.warning("Failed to send kettleupdate via mqtt: {}".format(e))
|
||||||
|
|
||||||
async def _fermenterupdate(self, messages):
|
async def _fermenterupdate(self, message):
|
||||||
async for message in messages:
|
|
||||||
try:
|
try:
|
||||||
self.fermenter=self.fermentercontroller.get_state()
|
self.fermenter=self.fermentercontroller.get_state()
|
||||||
for item in self.fermenter['data']:
|
for item in self.fermenter['data']:
|
||||||
|
@ -94,8 +131,7 @@ class SatelliteController:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warning("Failed to send fermenterupdate via mqtt: {}".format(e))
|
self.logger.warning("Failed to send fermenterupdate via mqtt: {}".format(e))
|
||||||
|
|
||||||
async def _actorupdate(self, messages):
|
async def _actorupdate(self, message):
|
||||||
async for message in messages:
|
|
||||||
try:
|
try:
|
||||||
self.actor=self.actorcontroller.get_state()
|
self.actor=self.actorcontroller.get_state()
|
||||||
for item in self.actor['data']:
|
for item in self.actor['data']:
|
||||||
|
@ -103,8 +139,7 @@ class SatelliteController:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warning("Failed to send actorupdate via mqtt: {}".format(e))
|
self.logger.warning("Failed to send actorupdate via mqtt: {}".format(e))
|
||||||
|
|
||||||
async def _sensorupdate(self, messages):
|
async def _sensorupdate(self, message):
|
||||||
async for message in messages:
|
|
||||||
try:
|
try:
|
||||||
self.sensor=self.sensorcontroller.get_state()
|
self.sensor=self.sensorcontroller.get_state()
|
||||||
for item in self.sensor['data']:
|
for item in self.sensor['data']:
|
||||||
|
@ -135,51 +170,3 @@ class SatelliteController:
|
||||||
|
|
||||||
# wait before try to resubscribe
|
# wait before try to resubscribe
|
||||||
await asyncio.sleep(5)
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
async def init_client(self, cbpi):
|
|
||||||
|
|
||||||
async def cancel_tasks(tasks):
|
|
||||||
for task in tasks:
|
|
||||||
if task.done():
|
|
||||||
continue
|
|
||||||
task.cancel()
|
|
||||||
try:
|
|
||||||
await task
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# This part needs to be updated in future as filtered_messages() is depracted and will be removed in future from asyncio-mqtt
|
|
||||||
while True:
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with AsyncExitStack() as stack:
|
|
||||||
self.tasks = set()
|
|
||||||
stack.push_async_callback(cancel_tasks, self.tasks)
|
|
||||||
self.client = Client(self.host, port=self.port, username=self.username, password=self.password, will=Will(topic="cbpi/disconnect", payload="CBPi Server Disconnected"))
|
|
||||||
|
|
||||||
await stack.enter_async_context(self.client)
|
|
||||||
|
|
||||||
for topic_filter in self.topic_filters:
|
|
||||||
topic = topic_filter[0]
|
|
||||||
logging.info("Topic: "+topic)
|
|
||||||
method = topic_filter[1]
|
|
||||||
logging.info("Method: "+str(method))
|
|
||||||
manager = self.client.filtered_messages(topic)
|
|
||||||
logging.info("Manager: " +str(manager))
|
|
||||||
messages = await stack.enter_async_context(manager)
|
|
||||||
logging.info("Messages: " +str(messages))
|
|
||||||
task = asyncio.create_task(method(messages))
|
|
||||||
self.tasks.add(task)
|
|
||||||
|
|
||||||
for topic_filter in self.topic_filters:
|
|
||||||
topic = topic_filter[0]
|
|
||||||
await self.client.subscribe(topic)
|
|
||||||
|
|
||||||
self.logger.info("MQTT Connected to {}:{}".format(self.host, self.port))
|
|
||||||
await asyncio.gather(*self.tasks)
|
|
||||||
|
|
||||||
except MqttError as e:
|
|
||||||
self.logger.error("MQTT Exception: {}".format(e))
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.error("MQTT General Exception: {}".format(e))
|
|
||||||
await asyncio.sleep(5)
|
|
||||||
|
|
Loading…
Reference in a new issue