diff --git a/cbpi/__init__.py b/cbpi/__init__.py index 892692a..60d9b8e 100644 --- a/cbpi/__init__.py +++ b/cbpi/__init__.py @@ -1,3 +1,3 @@ -__version__ = "4.1.0.a1" -__codename__ = "November Rain" +__version__ = "4.1.0.a2" +__codename__ = "Groundhog Day" diff --git a/cbpi/controller/satellite_controller.py b/cbpi/controller/satellite_controller.py index 230193e..73edc3d 100644 --- a/cbpi/controller/satellite_controller.py +++ b/cbpi/controller/satellite_controller.py @@ -120,10 +120,11 @@ class SatelliteController: while True: try: if self.client._connected.done(): - async with self.client.filtered_messages(topic) as messages: + async with self.client.messages() as messages: await self.client.subscribe(topic) async for message in messages: - await method(message.payload.decode()) + if message.topic.matches(topic): + await method(message.payload.decode()) except asyncio.CancelledError: # Cancel self.logger.warning("Sub Cancelled") @@ -147,7 +148,9 @@ class SatelliteController: except asyncio.CancelledError: pass + # This part needs to be updated in future as filtered_messages() is depracted and will be removed in future from asyncio-mqtt while True: + try: async with AsyncExitStack() as stack: self.tasks = set() @@ -158,9 +161,13 @@ class SatelliteController: for topic_filter in self.topic_filters: topic = topic_filter[0] + logging.info("Topic: "+topic) method = topic_filter[1] + logging.info("Method: "+str(method)) manager = self.client.filtered_messages(topic) + logging.info("Manager: " +str(manager)) messages = await stack.enter_async_context(manager) + logging.info("Messages: " +str(messages)) task = asyncio.create_task(method(messages)) self.tasks.add(task) diff --git a/setup.py b/setup.py index 5ae9677..9b97dc3 100644 --- a/setup.py +++ b/setup.py @@ -54,7 +54,7 @@ setup(name='cbpi4', 'click==8.1.3', 'shortuuid==1.0.11', 'tabulate==0.9.0', - 'asyncio-mqtt', + 'asyncio-mqtt==0.16.1', 'inquirer==3.1.1', 'colorama==0.4.6', 'psutil==5.9.4',