2021-02-16 20:37:51 +01:00
import asyncio
2021-03-14 11:52:46 +01:00
import json
from re import M
2023-01-15 08:25:01 +01:00
from asyncio_mqtt import Client , MqttError , Will
2021-02-16 20:37:51 +01:00
from contextlib import AsyncExitStack , asynccontextmanager
2021-03-14 11:52:46 +01:00
from cbpi import __version__
import logging
2023-01-15 09:33:11 +01:00
import shortuuid
2021-03-14 11:52:46 +01:00
2021-02-16 20:37:51 +01:00
class SatelliteController:
    """MQTT bridge for a cbpi server instance.

    The controller owns a single MQTT client.  ``listen`` subscribes to all
    topics and dispatches messages matching ``self.topic_filters`` to the
    private handlers below (actor on/off/power and the ``cbpi/update*``
    state requests).  ``publish`` sends outgoing data and ``subcribe`` lets
    other components register their own topic callbacks.
    """

    def __init__(self, cbpi):
        """Read the MQTT settings from ``cbpi.static_config`` and set up the
        topic dispatch table.  No connection is opened here; see ``init``.
        """
        self.client_id = shortuuid.uuid()
        self.cbpi = cbpi
        self.kettlecontroller = cbpi.kettle
        self.fermentercontroller = cbpi.fermenter
        self.sensorcontroller = cbpi.sensor
        self.actorcontroller = cbpi.actor
        self.logger = logging.getLogger(__name__)
        self.host = cbpi.static_config.get("mqtt_host", "localhost")
        self.port = cbpi.static_config.get("mqtt_port", 1883)
        self.username = cbpi.static_config.get("mqtt_username", None)
        self.password = cbpi.static_config.get("mqtt_password", None)
        self.client = None
        # (topic filter, coroutine handler) pairs evaluated by ``listen``.
        self.topic_filters = [
            ("cbpi/actor/+/on", self._actor_on),
            ("cbpi/actor/+/off", self._actor_off),
            ("cbpi/actor/+/power", self._actor_power),
            ("cbpi/updateactor", self._actorupdate),
            ("cbpi/updatekettle", self._kettleupdate),
            ("cbpi/updatesensor", self._sensorupdate),
            ("cbpi/updatefermenter", self._fermenterupdate),
        ]
        # Strong references to background tasks so they are not garbage
        # collected while still running.
        self.tasks = set()

    async def init(self):
        """Create the MQTT client and start ``listen`` as a background task.

        The task is not awaited; the actual connection is established inside
        ``listen`` once the task runs.
        """
        self.client = Client(
            self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            will=Will(topic="cbpi/disconnect", payload="CBPi Server Disconnected"),
            client_id=self.client_id,
        )
        # We are inside a coroutine, so a running loop is guaranteed.
        self.loop = asyncio.get_running_loop()
        task = self.loop.create_task(self.listen())
        self.tasks.add(task)
        # ``discard`` (unlike ``remove``) cannot raise from the callback if the
        # task was already dropped from the set.
        task.add_done_callback(self.tasks.discard)
        self.logger.info("MQTT Connected to {}:{}".format(self.host, self.port))

    def _is_connected(self):
        """Return True when a client exists and reports a finished connect.

        ``_connected`` is a private attribute of the client.  It is treated as
        a future (``.done()``), matching its use in ``_subcribe``; a plain
        truthy/falsy value is accepted as a fallback.
        """
        if self.client is None:
            return False
        connected = getattr(self.client, "_connected", None)
        done = getattr(connected, "done", None)
        if callable(done):
            return bool(done())
        return bool(connected)

    async def listen(self):
        """Connect, subscribe to every topic and dispatch matching messages.

        Runs forever: whenever the connection block exits (error or end of
        the message stream) the failure is logged and a reconnect is
        attempted after 5 seconds.
        """
        while True:
            try:
                async with self.client as client:
                    async with client.messages() as messages:
                        await client.subscribe("#")
                        async for message in messages:
                            for topic, method in self.topic_filters:
                                if message.topic.matches(topic):
                                    await method(message)
            except MqttError as e:
                self.logger.error("MQTT Exception: {}".format(e))
            except Exception as e:
                self.logger.error("MQTT General Exception: {}".format(e))
            # Back off before trying to reconnect.
            await asyncio.sleep(5)

    async def publish(self, topic, message, retain=False):
        """Publish ``message`` on ``topic`` with QoS 1.

        Does nothing when no client exists or the connection has not been
        established yet.  Publish failures are logged, never raised.
        """
        if not self._is_connected():
            return
        try:
            await self.client.publish(topic, message, qos=1, retain=retain)
        except Exception as e:
            self.logger.warning("Failed to push data via mqtt: {}".format(e))

    async def _actor_on(self, message):
        """Switch on the actor whose id is the third segment of the topic."""
        try:
            topic_key = str(message.topic).split("/")
            await self.cbpi.actor.on(topic_key[2])
            self.logger.warning("Processed actor {} on via mqtt".format(topic_key[2]))
        except Exception as e:
            self.logger.warning("Failed to process actor on via mqtt: {}".format(e))

    async def _actor_off(self, message):
        """Switch off the actor whose id is the third segment of the topic."""
        try:
            topic_key = str(message.topic).split("/")
            await self.cbpi.actor.off(topic_key[2])
            self.logger.warning("Processed actor {} off via mqtt".format(topic_key[2]))
        except Exception as e:
            self.logger.warning("Failed to process actor off via mqtt: {}".format(e))

    async def _actor_power(self, message):
        """Set the power of the actor named in the topic.

        The payload must decode to an integer; it is clamped to 0..100.
        An unparsable payload and a failing actor call are logged with
        different messages.  Task cancellation is not swallowed.
        """
        try:
            topic_key = str(message.topic).split("/")
            try:
                power = int(message.payload.decode())
            except (AttributeError, TypeError, ValueError):
                # Covers missing/non-bytes payloads, undecodable bytes and
                # non-integer text (UnicodeDecodeError is a ValueError).
                self.logger.warning("Failed to set actor power via mqtt. No valid power in message")
                return
            power = max(0, min(100, power))
            await self.cbpi.actor.set_power(topic_key[2], power)
        except Exception:
            self.logger.warning("Failed to set actor power via mqtt")

    def _push_items(self, update_name, state):
        """Push every entry of ``state['data']`` to ``cbpi/<update_name>/<id>``."""
        for item in state['data']:
            self.cbpi.push_update("cbpi/{}/{}".format(update_name, item['id']), item)

    async def _kettleupdate(self, message):
        """Re-publish the current state of every kettle."""
        try:
            self.kettle = self.kettlecontroller.get_state()
            self._push_items("kettleupdate", self.kettle)
        except Exception as e:
            self.logger.warning("Failed to send kettleupdate via mqtt: {}".format(e))

    async def _fermenterupdate(self, message):
        """Re-publish the current state of every fermenter."""
        try:
            self.fermenter = self.fermentercontroller.get_state()
            self._push_items("fermenterupdate", self.fermenter)
        except Exception as e:
            self.logger.warning("Failed to send fermenterupdate via mqtt: {}".format(e))

    async def _actorupdate(self, message):
        """Re-publish the current state of every actor."""
        try:
            self.actor = self.actorcontroller.get_state()
            self._push_items("actorupdate", self.actor)
        except Exception as e:
            self.logger.warning("Failed to send actorupdate via mqtt: {}".format(e))

    async def _sensorupdate(self, message):
        """Re-publish the current state of every sensor."""
        try:
            self.sensor = self.sensorcontroller.get_state()
            self._push_items("sensorupdate", self.sensor)
        except Exception as e:
            self.logger.warning("Failed to send sensorupdate via mqtt: {}".format(e))

    def subcribe(self, topic, method):
        """Start a background subscription for ``topic`` and return its task.

        ``method`` is awaited with the decoded payload of each matching
        message.  Must be called while an event loop is running.
        """
        task = asyncio.create_task(self._subcribe(topic, method))
        return task

    async def _subcribe(self, topic, method):
        """Subscription loop behind ``subcribe``.

        Retries every 5 seconds after an error or while the client is not
        connected.  Cancellation ends this loop only: the decision to stop is
        local, so cancelling one subscription no longer terminates the loops
        of other subscriptions through the shared ``self.error`` flag.
        """
        # ``self.error`` is still written for code that may inspect it, but it
        # no longer drives the control flow.
        self.error = False
        while True:
            try:
                if self._is_connected():
                    async with self.client.messages() as messages:
                        await self.client.subscribe(topic)
                        async for message in messages:
                            if message.topic.matches(topic):
                                await method(message.payload.decode())
            except asyncio.CancelledError:
                self.logger.warning("Sub Cancelled")
                self.error = True
                return
            except MqttError as e:
                self.logger.error("Sub MQTT Exception: {}".format(e))
            except Exception as e:
                self.logger.error("Sub Exception: {}".format(e))
            # wait before trying to resubscribe
            await asyncio.sleep(5)