2021-03-14 11:52:46 +01:00
# -*- coding: utf-8 -*-
import asyncio
2023-01-22 19:53:54 +01:00
from cbpi . api . dataclasses import NotificationAction , NotificationType
2021-03-18 19:27:03 +01:00
from cbpi . api import parameters , Property , CBPiSensor
2021-11-22 17:33:46 +01:00
from cbpi . api import *
import logging
import json
2023-01-22 19:53:54 +01:00
import time
2023-01-23 07:15:29 +01:00
from datetime import datetime
2021-03-14 11:52:46 +01:00
2024-07-06 09:06:48 +02:00
2021-11-22 17:33:46 +01:00
@parameters([
    Property.Text(label="Topic", configurable=True, description="MQTT Topic"),
    Property.Text(label="PayloadDictionary", configurable=True, default_value="",
                  description="Where to find msg in payload, leave blank for raw payload"),
    Property.Kettle(label="Kettle", description="Reduced logging if Kettle is inactive / range warning in dashboard (only Kettle or Fermenter to be selected)"),
    Property.Fermenter(label="Fermenter", description="Reduced logging if Fermenter is inactive / range warning in dashboard (only Kettle or Fermenter to be selected)"),
    Property.Number(label="ReducedLogging", configurable=True, description="Reduced logging frequency in seconds if selected Kettle or Fermenter is inactive (default:60 sec | 0 disabled)"),
    Property.Number(label="Timeout", configurable=True, unit="sec",
                    description="Timeout in seconds to send notification (default:60 | deactivated: 0)"),
    Property.Number(label="TempRange", configurable=True, unit="degree",
                    description="Temp range in degree between reading and target temp of fermenter/kettle. Larger difference shows different color in dashboard (default:0 | deactivated: 0)"),
])
class MQTTSensor(CBPiSensor):
    """Sensor that takes its value from JSON messages received on an MQTT topic.

    Each message is decoded as JSON. When ``PayloadDictionary`` is set, it is
    treated as a dot-separated path of keys into the decoded payload;
    otherwise the decoded payload itself is used as the reading.

    If exactly one of Kettle / Fermenter is selected, readings are written to
    the log on every message while that vessel's ``instance.state`` is truthy,
    and at most every ``ReducedLogging`` seconds otherwise. If ``Timeout`` is
    non-zero, a warning notification is raised once no valid reading has
    arrived for that many seconds.
    """

    def __init__(self, cbpi, id, props):
        """Read the configured properties and subscribe to the MQTT topic.

        :param cbpi: the cbpi core
        :param id: id of this sensor
        :param props: configured properties of this sensor
        """
        super(MQTTSensor, self).__init__(cbpi, id, props)
        self.Topic = self.props.get("Topic", None)

        # A blank path means "use the raw payload". Splitting an empty string
        # would yield [''] and turn that into a lookup of the key '' instead.
        payload_path = self.props.get("PayloadDictionary", None)
        if isinstance(payload_path, str) and payload_path.strip():
            self.payload_text = payload_path.split('.')
        else:
            self.payload_text = None

        self.subscribed = self.cbpi.satellite.subscribe(self.Topic, self.on_message)
        self.value: float = 999  # placeholder until the first reading arrives
        self.timeout = int(self.props.get("Timeout", 60))
        self.temprange = float(self.props.get("TempRange", 0))
        self.starttime = time.time()
        self.notificationsend = False
        self.nextchecktime = self.starttime + self.timeout
        self.lastdata = time.time()
        self.lastlog = 0
        self.sensor = self.get_sensor(self.id)
        # Negative frequencies are treated like 0 (reduced logging disabled).
        self.reducedfrequency = max(0, int(self.props.get("ReducedLogging", 60)))

        # Normalise "nothing selected" to None so that the truthiness test and
        # the conflict test below agree on what counts as a selection.
        # NOTE(review): assumes an unselected vessel may arrive as '' - confirm.
        self.kettleid = self.props.get("Kettle", None) or None
        self.fermenterid = self.props.get("Fermenter", None) or None
        self.reducedlogging = bool(self.kettleid or self.fermenterid)
        if self.kettleid is not None and self.fermenterid is not None:
            # Ambiguous configuration: fall back to regular logging and warn.
            self.reducedlogging = False
            self.cbpi.notify("MQTTSensor", "Sensor '" + str(self.sensor.name) + "' cant't have Fermenter and Kettle defined for reduced logging / range warning.", NotificationType.WARNING, action=[NotificationAction("OK", self.Confirm)])

    async def Confirm(self, **kwargs):
        """Notification "OK" action: restart the timeout window and re-arm the warning."""
        self.nextchecktime = time.time() + self.timeout
        self.notificationsend = False

    async def message(self):
        """Raise the timeout warning, including the time of the last valid reading."""
        target_timestring = datetime.fromtimestamp(self.lastdata)
        self.cbpi.notify("MQTTSensor Timeout", "Sensor '" + str(self.sensor.name) + "' did not respond. Last data received: " + target_timestring.strftime("%D %H:%M"), NotificationType.WARNING, action=[NotificationAction("OK", self.Confirm)])

    async def on_message(self, message):
        """Handle one MQTT message: extract the reading, publish it and log it.

        :param message: received message; ``message.payload`` is decoded and
            parsed as JSON
        """
        try:
            # Decoding/parsing happens inside the try so that a malformed
            # payload is logged here instead of raising out of the callback.
            val = json.loads(message.payload.decode())
            if self.payload_text is not None:
                for key in self.payload_text:
                    val = val.get(key, None)

            if isinstance(val, (int, float, str)):
                self.value = float(val)
                self.push_update(self.value)
                if self.reducedlogging:
                    await self.logvalue()
                else:
                    logging.info("MQTTSensor {} regular logging".format(self.sensor.name))
                    self.log_data(self.value)
                    self.lastlog = time.time()

                if self.timeout != 0:
                    # A valid reading restarts the timeout window.
                    self.nextchecktime = time.time() + self.timeout
                    self.notificationsend = False
                    self.lastdata = time.time()
        except Exception as e:
            logging.error("MQTT Sensor Error {}".format(e))

    async def logvalue(self):
        """Log the current value, throttled while the selected vessel is inactive."""
        self.kettle = self.get_kettle(self.kettleid) if self.kettleid is not None else None
        self.fermenter = self.get_fermenter(self.fermenterid) if self.fermenterid is not None else None
        now = time.time()
        if self.kettle is not None:
            self._log_for_vessel(self.kettle, "Kettle", now)
        if self.fermenter is not None:
            self._log_for_vessel(self.fermenter, "Fermenter", now)

    def _log_for_vessel(self, vessel, label, now):
        """Log on every call while ``vessel`` is active, else at the reduced frequency.

        :param vessel: kettle or fermenter object whose ``instance.state`` is read
        :param label: "Kettle" or "Fermenter", used in the log messages
        :param now: timestamp taken when the reading was received
        """
        try:
            active = vessel.instance.state
        except Exception:
            # Best effort: a vessel without a usable logic instance counts as
            # inactive. Exception (not a bare except) keeps task cancellation
            # and interpreter shutdown signals propagating.
            active = False

        if active:
            self.log_data(self.value)
            logging.info("MQTTSensor {} {} Active".format(self.sensor.name, label))
            self.lastlog = time.time()
            return

        logging.info("MQTTSensor {} {} Inactive".format(self.sensor.name, label))
        if self.reducedfrequency != 0 and now >= self.lastlog + self.reducedfrequency:
            self.log_data(self.value)
            self.lastlog = time.time()
            logging.info("Logged with reduced freqency")

    async def run(self):
        """Check once per second whether the timeout elapsed and warn once per outage."""
        while self.running:
            if self.timeout != 0:
                if time.time() > self.nextchecktime and not self.notificationsend:
                    await self.message()
                    self.notificationsend = True
            await asyncio.sleep(1)

    def get_state(self):
        """Return the current reading as ``{"value": <float>}``."""
        return dict(value=self.value)

    async def on_stop(self):
        """Unsubscribe the message handler from the MQTT topic."""
        self.subscribed = self.cbpi.satellite.unsubscribe(self.Topic, self.on_message)
2024-07-06 09:06:48 +02:00
@parameters([
    Property.Text(label="Topic", configurable=True, description="MQTT Topic"),
    Property.Text(label="PayloadDictionary", configurable=True, default_value="",
                  description="Where to find msg in payload, leave blank for raw payload"),
    Property.Kettle(label="Kettle", description="Reduced logging if Kettle is inactive / range warning in dashboard (only Kettle or Fermenter to be selected)"),
    Property.Fermenter(label="Fermenter", description="Reduced logging if Fermenter is inactive / range warning in dashboard (only Kettle or Fermenter to be selected)"),
    Property.Number(label="Offset", configurable=True, description="Offset for MQTT Sensor (default is 0). !!! Use this only with caution as offset for MQTT sensor should be defined on Sensor side !!!"),
    Property.Number(label="ReducedLogging", configurable=True, description="Reduced logging frequency in seconds if selected Kettle or Fermenter is inactive (default:60 sec | 0 disabled)"),
    Property.Number(label="Timeout", configurable=True, unit="sec",
                    description="Timeout in seconds to send notification (default:60 | deactivated: 0)"),
    Property.Number(label="TempRange", configurable=True, unit="degree",
                    description="Temp range in degree between reading and target temp of fermenter/kettle. Larger difference shows different color in dashboard (default:0 | deactivated: 0)"),
])
class MQTTSensorOffset(CBPiSensor):
    """MQTT-backed sensor that adds a configurable offset to every reading.

    Each message is decoded as JSON. When ``PayloadDictionary`` is set, it is
    treated as a dot-separated path of keys into the decoded payload;
    otherwise the decoded payload itself is used. The ``Offset`` property is
    added to the extracted number before it is published and logged.

    If exactly one of Kettle / Fermenter is selected, readings are written to
    the log on every message while that vessel's ``instance.state`` is truthy,
    and at most every ``ReducedLogging`` seconds otherwise. If ``Timeout`` is
    non-zero, a warning notification is raised once no valid reading has
    arrived for that many seconds.
    """

    def __init__(self, cbpi, id, props):
        """Read the configured properties and subscribe to the MQTT topic.

        :param cbpi: the cbpi core
        :param id: id of this sensor
        :param props: configured properties of this sensor
        """
        super(MQTTSensorOffset, self).__init__(cbpi, id, props)
        self.Topic = self.props.get("Topic", None)
        self.offset = float(self.props.get("Offset", 0))

        # A blank path means "use the raw payload". Splitting an empty string
        # would yield [''] and turn that into a lookup of the key '' instead.
        payload_path = self.props.get("PayloadDictionary", None)
        if isinstance(payload_path, str) and payload_path.strip():
            self.payload_text = payload_path.split('.')
        else:
            self.payload_text = None

        self.subscribed = self.cbpi.satellite.subscribe(self.Topic, self.on_message)
        self.value: float = 999  # placeholder until the first reading arrives
        self.timeout = int(self.props.get("Timeout", 60))
        self.temprange = float(self.props.get("TempRange", 0))
        self.starttime = time.time()
        self.notificationsend = False
        self.nextchecktime = self.starttime + self.timeout
        self.lastdata = time.time()
        self.lastlog = 0
        self.sensor = self.get_sensor(self.id)
        # Negative frequencies are treated like 0 (reduced logging disabled).
        self.reducedfrequency = max(0, int(self.props.get("ReducedLogging", 60)))

        # Normalise "nothing selected" to None so that the truthiness test and
        # the conflict test below agree on what counts as a selection.
        # NOTE(review): assumes an unselected vessel may arrive as '' - confirm.
        self.kettleid = self.props.get("Kettle", None) or None
        self.fermenterid = self.props.get("Fermenter", None) or None
        self.reducedlogging = bool(self.kettleid or self.fermenterid)
        if self.kettleid is not None and self.fermenterid is not None:
            # Ambiguous configuration: fall back to regular logging and warn.
            self.reducedlogging = False
            self.cbpi.notify("MQTTSensor", "Sensor '" + str(self.sensor.name) + "' cant't have Fermenter and Kettle defined for reduced logging / range warning.", NotificationType.WARNING, action=[NotificationAction("OK", self.Confirm)])

    async def Confirm(self, **kwargs):
        """Notification "OK" action: restart the timeout window and re-arm the warning."""
        self.nextchecktime = time.time() + self.timeout
        self.notificationsend = False

    async def message(self):
        """Raise the timeout warning, including the time of the last valid reading."""
        target_timestring = datetime.fromtimestamp(self.lastdata)
        self.cbpi.notify("MQTTSensor Timeout", "Sensor '" + str(self.sensor.name) + "' did not respond. Last data received: " + target_timestring.strftime("%D %H:%M"), NotificationType.WARNING, action=[NotificationAction("OK", self.Confirm)])

    async def on_message(self, message):
        """Handle one MQTT message: extract the reading, apply the offset, publish and log.

        :param message: received message; ``message.payload`` is decoded and
            parsed as JSON
        """
        try:
            # Decoding/parsing happens inside the try so that a malformed
            # payload is logged here instead of raising out of the callback.
            val = json.loads(message.payload.decode())
            if self.payload_text is not None:
                for key in self.payload_text:
                    val = val.get(key, None)

            if isinstance(val, (int, float, str)):
                self.value = float(val) + self.offset
                self.push_update(self.value)
                if self.reducedlogging:
                    await self.logvalue()
                else:
                    logging.info("MQTTSensor {} regular logging".format(self.sensor.name))
                    self.log_data(self.value)
                    self.lastlog = time.time()

                if self.timeout != 0:
                    # A valid reading restarts the timeout window.
                    self.nextchecktime = time.time() + self.timeout
                    self.notificationsend = False
                    self.lastdata = time.time()
        except Exception as e:
            logging.error("MQTT Sensor Error {}".format(e))

    async def logvalue(self):
        """Log the current value, throttled while the selected vessel is inactive."""
        self.kettle = self.get_kettle(self.kettleid) if self.kettleid is not None else None
        self.fermenter = self.get_fermenter(self.fermenterid) if self.fermenterid is not None else None
        now = time.time()
        if self.kettle is not None:
            self._log_for_vessel(self.kettle, "Kettle", now)
        if self.fermenter is not None:
            self._log_for_vessel(self.fermenter, "Fermenter", now)

    def _log_for_vessel(self, vessel, label, now):
        """Log on every call while ``vessel`` is active, else at the reduced frequency.

        :param vessel: kettle or fermenter object whose ``instance.state`` is read
        :param label: "Kettle" or "Fermenter", used in the log messages
        :param now: timestamp taken when the reading was received
        """
        try:
            active = vessel.instance.state
        except Exception:
            # Best effort: a vessel without a usable logic instance counts as
            # inactive. Exception (not a bare except) keeps task cancellation
            # and interpreter shutdown signals propagating.
            active = False

        if active:
            self.log_data(self.value)
            logging.info("MQTTSensor {} {} Active".format(self.sensor.name, label))
            self.lastlog = time.time()
            return

        logging.info("MQTTSensor {} {} Inactive".format(self.sensor.name, label))
        if self.reducedfrequency != 0 and now >= self.lastlog + self.reducedfrequency:
            self.log_data(self.value)
            self.lastlog = time.time()
            logging.info("Logged with reduced freqency")

    async def run(self):
        """Check once per second whether the timeout elapsed and warn once per outage."""
        while self.running:
            if self.timeout != 0:
                if time.time() > self.nextchecktime and not self.notificationsend:
                    await self.message()
                    self.notificationsend = True
            await asyncio.sleep(1)

    def get_state(self):
        """Return the current reading as ``{"value": <float>}``."""
        return dict(value=self.value)

    async def on_stop(self):
        """Unsubscribe the message handler from the MQTT topic."""
        self.subscribed = self.cbpi.satellite.unsubscribe(self.Topic, self.on_message)
2021-03-14 11:52:46 +01:00
2021-03-18 19:27:03 +01:00
def setup(cbpi):
    '''
    Plugin entry point, called by the server during startup.

    Registers the MQTT sensor under the name "MQTTSensor", but only when the
    static config has "mqtt" set to true. The offset-aware implementation is
    registered unless "mqtt_offset" is false (its default).

    :param cbpi: the cbpi core
    :return:
    '''
    mqtt_enabled = str(cbpi.static_config.get("mqtt", False)).lower() == "true"
    if not mqtt_enabled:
        return

    offset_setting = str(cbpi.static_config.get("mqtt_offset", False)).lower()
    sensor_class = MQTTSensor if offset_setting == "false" else MQTTSensorOffset
    cbpi.plugin.register("MQTTSensor", sensor_class)