added filters for mqtt updates that can be triggered via mqtt

cbpi/updatekettle
cbpi/updateactor
cbpi/updatesensor
cbpi/updatefermenter
This commit is contained in:
avollkopf 2022-02-03 18:03:53 +01:00
parent 0bf2b64c3e
commit c1dacd691e
3 changed files with 47 additions and 4 deletions

View file

@ -1 +1 @@
__version__ = "4.0.1.10"
__version__ = "4.0.1.11"

View file

@ -1,5 +1,4 @@
import asyncio
import json
from re import M
@ -8,11 +7,14 @@ from contextlib import AsyncExitStack, asynccontextmanager
from cbpi import __version__
import logging
class SatelliteController:
def __init__(self, cbpi):
self.cbpi = cbpi
self.kettlecontroller = cbpi.kettle
self.fermentercontroller = cbpi.fermenter
self.sensorcontroller = cbpi.sensor
self.actorcontroller = cbpi.actor
self.logger = logging.getLogger(__name__)
self.host = cbpi.static_config.get("mqtt_host", "localhost")
self.port = cbpi.static_config.get("mqtt_port", 1883)
@ -23,6 +25,11 @@ class SatelliteController:
("cbpi/actor/+/on", self._actor_on),
("cbpi/actor/+/off", self._actor_off),
("cbpi/actor/+/power", self._actor_power),
("cbpi/updateactor", self._actorupdate),
("cbpi/updatekettle", self._kettleupdate),
("cbpi/updatesensor", self._sensorupdate),
("cbpi/updatefermenter", self._fermenterupdate),
]
self.tasks = set()
@ -69,6 +76,42 @@ class SatelliteController:
except:
self.logger.warning("Failed to set actor power via mqtt")
async def _kettleupdate(self, messages):
async for message in messages:
try:
self.kettle=self.kettlecontroller.get_state()
for item in self.kettle['data']:
self.cbpi.push_update("cbpi/{}/{}".format("kettleupdate",item['id']), item)
except Exception as e:
self.logger.warning("Failed to send kettleupdate via mqtt: {}".format(e))
async def _fermenterupdate(self, messages):
async for message in messages:
try:
self.fermenter=self.fermentercontroller.get_state()
for item in self.fermenter['data']:
self.cbpi.push_update("cbpi/{}/{}".format("fermenterupdate",item['id']), item)
except Exception as e:
self.logger.warning("Failed to send fermenterupdate via mqtt: {}".format(e))
async def _actorupdate(self, messages):
async for message in messages:
try:
self.actor=self.actorcontroller.get_state()
for item in self.actor['data']:
self.cbpi.push_update("cbpi/{}/{}".format("actorupdate",item['id']), item)
except Exception as e:
self.logger.warning("Failed to send actorupdate via mqtt: {}".format(e))
async def _sensorupdate(self, messages):
async for message in messages:
try:
self.sensor=self.sensorcontroller.get_state()
for item in self.sensor['data']:
self.cbpi.push_update("cbpi/{}/{}".format("sensorupdate",item['id']), item)
except Exception as e:
self.logger.warning("Failed to send sensorupdate via mqtt: {}".format(e))
def subcribe(self, topic, method):
task = asyncio.create_task(self._subcribe(topic, method))
return task

View file

@ -109,6 +109,7 @@ class CraftBeerPi:
self.log = LogController(self)
self.system = SystemController(self)
self.kettle = KettleController(self)
self.fermenter : FermentationController = FermentationController(self)
self.step : StepController = StepController(self)
self.recipe : RecipeController = RecipeController(self)
self.upload : UploadController = UploadController(self)
@ -117,7 +118,6 @@ class CraftBeerPi:
if str(self.static_config.get("mqtt", False)).lower() == "true":
self.satellite: SatelliteController = SatelliteController(self)
self.dashboard = DashboardController(self)
self.fermenter : FermentationController = FermentationController(self)
self.http_step = StepHttpEndpoints(self)
self.http_recipe = RecipeHttpEndpoints(self)