just add topic filters for mqttsensor instead of crating tasks to accommodate new aiomqtt single message queue

This commit is contained in:
avollkopf 2024-02-27 07:20:10 +01:00
parent f659eea65b
commit 1d1a7e8cfb
3 changed files with 12 additions and 40 deletions

View file

@ -1,3 +1,3 @@
__version__ = "4.3.2.a10" __version__ = "4.4.0.a1"
__codename__ = "Winter Storm" __codename__ = "Winter Storm"

View file

@ -157,29 +157,10 @@ class SatelliteController:
except Exception as e: except Exception as e:
self.logger.warning("Failed to send sensorupdate via mqtt: {}".format(e)) self.logger.warning("Failed to send sensorupdate via mqtt: {}".format(e))
def subcribe(self, topic, method): def subscribe(self, topic, method):
task = asyncio.create_task(self._subcribe(topic, method)) self.topic_filters.append((topic,method))
return task return True
async def _subcribe(self, topic, method): def unsubscribe(self, topic, method):
self.error=False self.topic_filters.remove((topic,method))
while True: return True
try:
if self.client._connected.done():
await self.client.subscribe(topic)
async for message in self.client.messages:
if message.topic.matches(topic):
await method(message.payload.decode())
except asyncio.CancelledError:
# Cancel
self.logger.warning("Subscription {} Cancelled".format(topic))
self.error=True
except MqttError as e:
self.logger.error("Sub MQTT Exception: {}".format(e))
except Exception as e:
self.logger.error("Sub Exception: {}".format(e))
# wait before try to resubscribe
if self.error == True:
break
else:
await asyncio.sleep(5)

View file

@ -24,7 +24,7 @@ class MQTTSensor(CBPiSensor):
self.payload_text = self.props.get("PayloadDictionary", None) self.payload_text = self.props.get("PayloadDictionary", None)
if self.payload_text != None: if self.payload_text != None:
self.payload_text = self.payload_text.split('.') self.payload_text = self.payload_text.split('.')
self.mqtt_task = self.cbpi.satellite.subcribe(self.Topic, self.on_message) self.subscribed = self.cbpi.satellite.subscribe(self.Topic, self.on_message)
self.value: float = 999 self.value: float = 999
self.timeout=int(self.props.get("Timeout", 60)) self.timeout=int(self.props.get("Timeout", 60))
self.starttime = time.time() self.starttime = time.time()
@ -55,7 +55,7 @@ class MQTTSensor(CBPiSensor):
pass pass
async def on_message(self, message): async def on_message(self, message):
val = json.loads(message) val = json.loads(message.payload.decode())
try: try:
if self.payload_text is not None: if self.payload_text is not None:
for key in self.payload_text: for key in self.payload_text:
@ -130,17 +130,8 @@ class MQTTSensor(CBPiSensor):
return dict(value=self.value) return dict(value=self.value)
async def on_stop(self): async def on_stop(self):
was_cancelled=False self.subscribed = self.cbpi.satellite.unsubscribe(self.Topic, self.on_message)
if not self.mqtt_task.done():
logging.info("Task not done -> cancelling")
was_cancelled = self.mqtt_task.cancel()
try:
logging.info("Trying to call cancelled task")
await self.mqtt_task
except asyncio.CancelledError:
logging.info("Task has been Cancelled")
pass
logging.info("Task cancelled: {}".format(was_cancelled))
def setup(cbpi): def setup(cbpi):
''' '''