diff --git a/cbpi/__init__.py b/cbpi/__init__.py index ae93b5b..87f292b 100644 --- a/cbpi/__init__.py +++ b/cbpi/__init__.py @@ -1,3 +1,3 @@ -__version__ = "4.3.3.a1" +__version__ = "4.4.0.a1" __codename__ = "Winter Storm" diff --git a/cbpi/controller/satellite_controller.py b/cbpi/controller/satellite_controller.py index 7b8e545..19f2dc5 100644 --- a/cbpi/controller/satellite_controller.py +++ b/cbpi/controller/satellite_controller.py @@ -53,7 +53,7 @@ class SatelliteController: except asyncio.CancelledError: pass - self.client = Client(self.host, port=self.port, username=self.username, password=self.password, will=Will(topic="cbpi/disconnect", payload="CBPi Server Disconnected"),client_id=self.client_id) + self.client = Client(self.host, port=self.port, username=self.username, password=self.password, will=Will(topic="cbpi/disconnect", payload="CBPi Server Disconnected"),identifier=self.client_id) self.loop = asyncio.get_event_loop() ## Listen for mqtt messages in an (unawaited) asyncio task task = self.loop.create_task(self.listen()) @@ -67,9 +67,8 @@ class SatelliteController: while True: try: async with self.client as client: - async with client.messages() as messages: await client.subscribe("#") - async for message in messages: + async for message in client.messages: for topic_filter in self.topic_filters: topic = topic_filter[0] method = topic_filter[1] @@ -158,30 +157,10 @@ class SatelliteController: except Exception as e: self.logger.warning("Failed to send sensorupdate via mqtt: {}".format(e)) - def subcribe(self, topic, method): - task = asyncio.create_task(self._subcribe(topic, method)) - return task - - async def _subcribe(self, topic, method): - self.error=False - while True: - try: - if self.client._connected.done(): - async with self.client.messages() as messages: - await self.client.subscribe(topic) - async for message in messages: - if message.topic.matches(topic): - await method(message.payload.decode()) - except asyncio.CancelledError: - # Cancel - self.logger.warning("Subscription {} Cancelled".format(topic)) - self.error=True - except MqttError as e: - self.logger.error("Sub MQTT Exception: {}".format(e)) - except Exception as e: - self.logger.error("Sub Exception: {}".format(e)) - # wait before try to resubscribe - if self.error == True: - break - else: - await asyncio.sleep(5) + def subscribe(self, topic, method): + self.topic_filters.append((topic,method)) + return True + + def unsubscribe(self, topic, method): + self.topic_filters.remove((topic,method)) + return True diff --git a/cbpi/extension/mqtt_sensor/__init__.py b/cbpi/extension/mqtt_sensor/__init__.py index 47114cf..c5b0c0b 100644 --- a/cbpi/extension/mqtt_sensor/__init__.py +++ b/cbpi/extension/mqtt_sensor/__init__.py @@ -24,7 +24,7 @@ class MQTTSensor(CBPiSensor): self.payload_text = self.props.get("PayloadDictionary", None) if self.payload_text != None: self.payload_text = self.payload_text.split('.') - self.mqtt_task = self.cbpi.satellite.subcribe(self.Topic, self.on_message) + self.subscribed = self.cbpi.satellite.subscribe(self.Topic, self.on_message) self.value: float = 999 self.timeout=int(self.props.get("Timeout", 60)) self.starttime = time.time() @@ -55,7 +55,7 @@ class MQTTSensor(CBPiSensor): pass async def on_message(self, message): - val = json.loads(message) + val = json.loads(message.payload.decode()) try: if self.payload_text is not None: for key in self.payload_text: @@ -130,17 +130,8 @@ class MQTTSensor(CBPiSensor): return dict(value=self.value) async def on_stop(self): - was_cancelled=False - if not self.mqtt_task.done(): - logging.info("Task not done -> cancelling") - was_cancelled = self.mqtt_task.cancel() - try: - logging.info("Trying to call cancelled task") - await self.mqtt_task - except asyncio.CancelledError: - logging.info("Task has been Cancelled") - pass - logging.info("Task cancelled: {}".format(was_cancelled)) + self.subscribed = self.cbpi.satellite.unsubscribe(self.Topic, self.on_message) + def setup(cbpi): ''' diff --git a/setup.py b/setup.py index 6b7b604..8cd0905 100644 --- a/setup.py +++ b/setup.py @@ -56,7 +56,7 @@ setup(name='cbpi4', 'click==8.1.7', 'shortuuid==1.0.11', 'tabulate==0.9.0', - 'aiomqtt==1.2.1', + 'aiomqtt==2.0.0', 'inquirer==3.1.3', 'colorama==0.4.6', 'psutil==5.9.6',