2021-03-14 11:52:46 +01:00
# -*- coding: utf-8 -*-
import asyncio
2023-01-22 19:53:54 +01:00
from cbpi . api . dataclasses import NotificationAction , NotificationType
2021-03-18 19:27:03 +01:00
from cbpi . api import parameters , Property , CBPiSensor
2021-11-22 17:33:46 +01:00
from cbpi . api import *
import logging
import json
2023-01-22 19:53:54 +01:00
import time
2023-01-23 07:15:29 +01:00
from datetime import datetime
2021-03-14 11:52:46 +01:00
2021-11-22 17:33:46 +01:00
# NOTE(review): the string literals in this block carry padding spaces
# (e.g. " Topic "), which looks like an extraction artifact. They are kept
# verbatim because the property labels double as the lookup keys used in
# __init__ -- confirm against a deployed configuration before normalising them.
@parameters([Property.Text(label=" Topic ", configurable=True, description=" MQTT Topic "),
             Property.Text(label=" PayloadDictionary ", configurable=True, default_value=" ",
                           description=" Where to find msg in payload, leave blank for raw payload "),
             Property.Kettle(label=" Kettle ", description=" Reduced logging if Kettle is inactive (only Kettle or Fermenter to be selected) "),
             Property.Fermenter(label=" Fermenter ", description=" Reduced logging in seconds if Fermenter is inactive (only Kettle or Fermenter to be selected) "),
             Property.Number(label=" ReducedLogging ", configurable=True, description=" Reduced logging frequency in seconds if selected Kettle or Fermenter is inactive (default is 60 sec) "),
             Property.Number(label=" Timeout ", configurable=True, unit=" sec ",
                             description=" Timeout in seconds to send notification (default:60 | deactivated: 0) ")])
class MQTTSensor(CBPiSensor):
    """Sensor fed by JSON messages received on an MQTT topic.

    Every message is decoded as JSON, optionally narrowed down by following
    the configured payload key path, converted to ``float`` and pushed as the
    current sensor value. The value is also written to the sensor log, either
    on every message or throttled while the selected kettle/fermenter is
    inactive. ``run`` raises a warning notification when no value has arrived
    for ``timeout`` seconds (``0`` disables the check).
    """

    def __init__(self, cbpi, id, props):
        """Read the configuration and subscribe to the configured topic.

        :param cbpi: the cbpi core object
        :param id: id of this sensor
        :param props: configured property values, keyed by property label
        """
        super(MQTTSensor, self).__init__(cbpi, id, props)
        self.Topic = self.props.get(" Topic ", None)
        # None means "use the raw payload"; otherwise a list of nested keys.
        self.payload_text = self._parse_payload_path(self.props.get(" PayloadDictionary ", None))
        # "subcribe" (sic) is the name this code calls on cbpi.satellite.
        self.mqtt_task = self.cbpi.satellite.subcribe(self.Topic, self.on_message)
        self.value: float = 999
        self.timeout = self._int_prop(" Timeout ", 60)
        self.starttime = time.time()
        self.notificationsend = False
        self.nextchecktime = self.starttime + self.timeout
        self.lastdata = time.time()
        self.lastlog = 0
        self.sensor = self.get_sensor(self.id)
        self.reducedfrequency = self._int_prop(" ReducedLogging ", 60)
        self.kettleid = self.props.get(" Kettle ", None)
        self.fermenterid = self.props.get(" Fermenter ", None)
        has_kettle = self.kettleid is not None
        has_fermenter = self.fermenterid is not None
        # Throttled logging needs exactly one reference vessel. With none
        # selected there is nothing to throttle against, so every value must
        # be logged (previously nothing was logged at all in that case).
        self.reducedlogging = has_kettle != has_fermenter
        if has_kettle and has_fermenter:
            self.cbpi.notify(" MQTTSensor ", " Sensor ' " + str(self.sensor.name) + " ' cant ' t have Fermenter and Kettle defined for reduced logging. ", NotificationType.WARNING, action=[NotificationAction(" OK ", self.Confirm)])
        self.kettle = self.get_kettle(self.kettleid) if has_kettle else None
        self.fermenter = self.get_fermenter(self.fermenterid) if has_fermenter else None

    @staticmethod
    def _parse_payload_path(raw):
        """Turn the configured payload path into a list of keys.

        :param raw: the configured path text, or None
        :return: list of keys, or None when the path is unset or blank
        """
        # A blank setting means "raw payload". Splitting it would yield a
        # single empty key and make every raw (non-dict) message fail.
        if raw is None or not str(raw).strip():
            return None
        # NOTE(review): the separator literal is kept verbatim from the
        # original; the padding around the dot may be an extraction artifact.
        return raw.split(' . ')

    def _int_prop(self, key, default):
        """Return property ``key`` as int, or ``default`` if unset/invalid."""
        try:
            return int(self.props.get(key, default))
        except (TypeError, ValueError):
            # Property present but None, empty or otherwise not a number.
            return default

    async def Confirm(self, **kwargs):
        """Notification "OK" action: re-arm the timeout check."""
        self.nextchecktime = time.time() + self.timeout
        self.notificationsend = False

    async def message(self):
        """Send the timeout warning, including when data was last received."""
        target_timestring = datetime.fromtimestamp(self.lastdata)
        # "%m/%d/%y" is the portable spelling of "%D"; the original format
        # string had spaces between "%" and the directive letters.
        self.cbpi.notify(" MQTTSensor Timeout ", " Sensor ' " + str(self.sensor.name) + " ' did not respond. Last data received: " + target_timestring.strftime("%m/%d/%y %H:%M"), NotificationType.WARNING, action=[NotificationAction(" OK ", self.Confirm)])

    async def on_message(self, message):
        """Handle one MQTT message: extract the value, publish and log it.

        :param message: the payload, expected to be JSON text
        """
        try:
            # Decoding happens inside the try so that a malformed payload is
            # logged like any other per-message error.
            val = json.loads(message)
            if self.payload_text is not None:
                for key in self.payload_text:
                    val = val.get(key, None)
            if isinstance(val, (int, float, str)):
                self.value = float(val)
                self.push_update(self.value)
                if self.reducedlogging:
                    await self.logvalue()
                else:
                    self.log_data(self.value)
                    self.lastlog = time.time()
                if self.timeout != 0:
                    self.nextchecktime = time.time() + self.timeout
                    self.notificationsend = False
                # NOTE(review): the original nesting of this statement was
                # lost with the indentation; it is only read by message().
                self.lastdata = time.time()
        except Exception as e:
            logging.error(" MQTT Sensor Error {} ".format(e))

    def _log_throttled(self, vessel, now, active_msg, inactive_msg):
        """Log the current value, throttled while ``vessel`` is inactive.

        :param vessel: kettle or fermenter object whose ``instance.state`` is read
        :param now: timestamp taken when the logging pass started
        :param active_msg: info message emitted when the vessel is active
        :param inactive_msg: info message emitted when the vessel is inactive
        """
        try:
            active = vessel.instance.state
        except Exception:
            # State not readable: treat the vessel as inactive.
            active = False
        if active:
            self.log_data(self.value)
            logging.info(active_msg)
            self.lastlog = time.time()
            return
        logging.info(inactive_msg)
        if now >= self.lastlog + self.reducedfrequency:
            self.log_data(self.value)
            self.lastlog = time.time()
            logging.info(" Logged with reduced freqency ")

    async def logvalue(self):
        """Log the current value, honouring the reduced logging frequency."""
        now = time.time()
        if self.kettle is not None:
            self._log_throttled(self.kettle, now, " Kettle Active ", " Kettle Inactive ")
        if self.fermenter is not None:
            self._log_throttled(self.fermenter, now, " Fermenter Active ", " Fermenter Inactive ")

    async def run(self):
        """Check once per second whether the data timeout has expired."""
        while self.running:
            if self.timeout != 0:
                # Notify only once per outage; Confirm or new data re-arms it.
                if time.time() > self.nextchecktime and self.notificationsend == False:
                    await self.message()
                    self.notificationsend = True
            await asyncio.sleep(1)

    def get_state(self):
        """Return the current value as ``{"value": ...}``."""
        return dict(value=self.value)

    async def on_stop(self):
        """Cancel the MQTT subscription task and wait for it to finish."""
        if self.mqtt_task.done() is False:
            self.mqtt_task.cancel()
            try:
                await self.mqtt_task
            except asyncio.CancelledError:
                pass
2021-03-18 19:27:03 +01:00
def setup ( cbpi ) :
2021-03-14 11:52:46 +01:00
'''
2021-03-18 19:27:03 +01:00
This method is called by the server during startup
2021-03-14 11:52:46 +01:00
Here you need to register your plugins at the server
2021-03-18 19:27:03 +01:00
: param cbpi : the cbpi core
: return :
2021-03-14 11:52:46 +01:00
'''
2021-11-23 17:33:58 +01:00
if str ( cbpi . static_config . get ( " mqtt " , False ) ) . lower ( ) == " true " :
2021-03-14 11:52:46 +01:00
cbpi . plugin . register ( " MQTTSensor " , MQTTSensor )