diff --git a/cbpi/__init__.py b/cbpi/__init__.py index f419ca4..254756f 100644 --- a/cbpi/__init__.py +++ b/cbpi/__init__.py @@ -1,3 +1,3 @@ -__version__ = "4.1.0.a3" +__version__ = "4.1.0.a4" __codename__ = "Groundhog Day" diff --git a/cbpi/controller/satellite_controller.py b/cbpi/controller/satellite_controller.py index 73edc3d..e53a086 100644 --- a/cbpi/controller/satellite_controller.py +++ b/cbpi/controller/satellite_controller.py @@ -2,7 +2,7 @@ import asyncio import json from re import M -from asyncio_mqtt import Client, MqttError, Will, client +from asyncio_mqtt import Client, MqttError, Will from contextlib import AsyncExitStack, asynccontextmanager from cbpi import __version__ import logging @@ -34,7 +34,47 @@ class SatelliteController: self.tasks = set() async def init(self): - asyncio.create_task(self.init_client(self.cbpi)) + + #not sure if required like done in the old routine + async def cancel_tasks(tasks): + for task in tasks: + print("3232") + if task.done(): + continue + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + self.client = Client(self.host, port=self.port, username=self.username, password=self.password, will=Will(topic="cbpi/disconnect", payload="CBPi Server Disconnected")) + self.loop = asyncio.get_event_loop() + ## Listen for mqtt messages in an (unawaited) asyncio task + task = self.loop.create_task(self.listen()) + ## Save a reference to the task so it doesn't get garbage collected + self.tasks.add(task) + task.add_done_callback(self.tasks.remove) + + self.logger.info("MQTT Connected to {}:{}".format(self.host, self.port)) + + async def listen(self): + while True: + try: + async with self.client as client: + async with client.messages() as messages: + await client.subscribe("#") + async for message in messages: + for topic_filter in self.topic_filters: + topic = topic_filter[0] + method = topic_filter[1] + if message.topic.matches(topic): + await (method(message)) + except MqttError as e: + self.logger.error("MQTT Exception: {}".format(e)) + except Exception as e: + self.logger.error("MQTT General Exception: {}".format(e)) + await asyncio.sleep(5) + async def publish(self, topic, message, retain=False): if self.client is not None and self.client._connected: @@ -43,26 +83,25 @@ class SatelliteController: except Exception as e: self.logger.warning("Failed to push data via mqtt: {}".format(e)) - async def _actor_on(self, messages): - async for message in messages: + async def _actor_on(self, message): try: - topic_key = message.topic.split("/") + topic_key = str(message.topic).split("/") await self.cbpi.actor.on(topic_key[2]) + self.logger.warning("Processed actor {} on via mqtt".format(topic_key[2])) except Exception as e: self.logger.warning("Failed to process actor on via mqtt: {}".format(e)) - async def _actor_off(self, messages): - async for message in messages: + async def _actor_off(self, message): try: - topic_key = message.topic.split("/") + topic_key = str(message.topic).split("/") await self.cbpi.actor.off(topic_key[2]) + self.logger.warning("Processed actor {} off via mqtt".format(topic_key[2])) except Exception as e: self.logger.warning("Failed to process actor off via mqtt: {}".format(e)) - async def _actor_power(self, messages): - async for message in messages: + async def _actor_power(self, message): try: - topic_key = message.topic.split("/") + topic_key = str(message.topic).split("/") try: power=int(message.payload.decode()) if power > 100: @@ -76,8 +115,7 @@ class SatelliteController: except: self.logger.warning("Failed to set actor power via mqtt") - async def _kettleupdate(self, messages): - async for message in messages: + async def _kettleupdate(self, message): try: self.kettle=self.kettlecontroller.get_state() for item in self.kettle['data']: @@ -85,8 +123,7 @@ class SatelliteController: except Exception as e: self.logger.warning("Failed to send kettleupdate via mqtt: {}".format(e)) - async def _fermenterupdate(self, messages): - async for message in messages: + async def _fermenterupdate(self, message): try: self.fermenter=self.fermentercontroller.get_state() for item in self.fermenter['data']: @@ -94,8 +131,7 @@ class SatelliteController: except Exception as e: self.logger.warning("Failed to send fermenterupdate via mqtt: {}".format(e)) - async def _actorupdate(self, messages): - async for message in messages: + async def _actorupdate(self, message): try: self.actor=self.actorcontroller.get_state() for item in self.actor['data']: @@ -103,8 +139,7 @@ class SatelliteController: except Exception as e: self.logger.warning("Failed to send actorupdate via mqtt: {}".format(e)) - async def _sensorupdate(self, messages): - async for message in messages: + async def _sensorupdate(self, message): try: self.sensor=self.sensorcontroller.get_state() for item in self.sensor['data']: @@ -135,51 +170,3 @@ class SatelliteController: # wait before try to resubscribe await asyncio.sleep(5) - - async def init_client(self, cbpi): - - async def cancel_tasks(tasks): - for task in tasks: - if task.done(): - continue - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - - # This part needs to be updated in future as filtered_messages() is depracted and will be removed in future from asyncio-mqtt - while True: - - try: - async with AsyncExitStack() as stack: - self.tasks = set() - stack.push_async_callback(cancel_tasks, self.tasks) - self.client = Client(self.host, port=self.port, username=self.username, password=self.password, will=Will(topic="cbpi/disconnect", payload="CBPi Server Disconnected")) - - await stack.enter_async_context(self.client) - - for topic_filter in self.topic_filters: - topic = topic_filter[0] - logging.info("Topic: "+topic) - method = topic_filter[1] - logging.info("Method: "+str(method)) - manager = self.client.filtered_messages(topic) - logging.info("Manager: " +str(manager)) - messages = await stack.enter_async_context(manager) - logging.info("Messages: " +str(messages)) - task = asyncio.create_task(method(messages)) - self.tasks.add(task) - - for topic_filter in self.topic_filters: - topic = topic_filter[0] - await self.client.subscribe(topic) - - self.logger.info("MQTT Connected to {}:{}".format(self.host, self.port)) - await asyncio.gather(*self.tasks) - - except MqttError as e: - self.logger.error("MQTT Exception: {}".format(e)) - except Exception as e: - self.logger.error("MQTT General Exception: {}".format(e)) - await asyncio.sleep(5)