diff --git a/cbpi/controller/satellite_controller.py b/cbpi/controller/satellite_controller.py index 7b8e545..d37ea2c 100644 --- a/cbpi/controller/satellite_controller.py +++ b/cbpi/controller/satellite_controller.py @@ -53,7 +53,7 @@ class SatelliteController: except asyncio.CancelledError: pass - self.client = Client(self.host, port=self.port, username=self.username, password=self.password, will=Will(topic="cbpi/disconnect", payload="CBPi Server Disconnected"),client_id=self.client_id) + self.client = Client(self.host, port=self.port, username=self.username, password=self.password, will=Will(topic="cbpi/disconnect", payload="CBPi Server Disconnected"),identifier=self.client_id) self.loop = asyncio.get_event_loop() ## Listen for mqtt messages in an (unawaited) asyncio task task = self.loop.create_task(self.listen()) @@ -67,9 +67,8 @@ class SatelliteController: while True: try: async with self.client as client: - async with client.messages() as messages: await client.subscribe("#") - async for message in messages: + async for message in client.messages: for topic_filter in self.topic_filters: topic = topic_filter[0] method = topic_filter[1] @@ -167,11 +166,10 @@ class SatelliteController: while True: try: if self.client._connected.done(): - async with self.client.messages() as messages: - await self.client.subscribe(topic) - async for message in messages: - if message.topic.matches(topic): - await method(message.payload.decode()) + await self.client.subscribe(topic) + async for message in self.client.messages: + if message.topic.matches(topic): + await method(message.payload.decode()) except asyncio.CancelledError: # Cancel self.logger.warning("Subscription {} Cancelled".format(topic)) diff --git a/setup.py b/setup.py index 8def85b..cda918b 100644 --- a/setup.py +++ b/setup.py @@ -56,7 +56,7 @@ setup(name='cbpi4', 'click==8.1.7', 'shortuuid==1.0.11', 'tabulate==0.9.0', - 'aiomqtt==1.2.1', + 'aiomqtt==2.0.0', 'inquirer==3.1.3', 'colorama==0.4.6', 'psutil==5.9.6',