diff --git a/.gitignore b/.gitignore
index d23da45..6157d3e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,4 +19,5 @@ logs/
 .coverage
 .devcontainer/cbpi-dev-config/*
 cbpi4-*
-temp*
\ No newline at end of file
+temp*
+*.patch
\ No newline at end of file
diff --git a/.vscode/launch.json b/.vscode/launch.json
index e24e1e9..0b0fdc4 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -10,7 +10,11 @@
       "type": "python",
       "request": "launch",
       "module": "run",
-      "args": ["--config-folder-path=./.devcontainer/cbpi-dev-config", "start"],
+      "args": [
+        "--config-folder-path=./.devcontainer/cbpi-dev-config",
+        "--debug-log-level=20",
+        "start"
+      ],
       "preLaunchTask": "copy default cbpi config files if dev config files dont exist"
     },
diff --git a/cbpi/controller/log_file_controller.py b/cbpi/controller/log_file_controller.py
index c752216..fecae47 100644
--- a/cbpi/controller/log_file_controller.py
+++ b/cbpi/controller/log_file_controller.py
@@ -13,6 +13,7 @@ from cbpi.api import *
 from cbpi.api.config import ConfigType
 from cbpi.api.base import CBPiBase
 import asyncio
+import shortuuid
 
 
 class LogController:
@@ -28,66 +29,34 @@ class LogController:
         self.datalogger = {}
         self.logsFolderPath = self.cbpi.config_folder.logsFolderPath
         self.logger.info("Log folder path : " + self.logsFolderPath)
-
-    def log_data(self, name: str, value: str) -> None:
-        self.logfiles = self.cbpi.config.get("CSVLOGFILES", "Yes")
-        self.influxdb = self.cbpi.config.get("INFLUXDB", "No")
-        if self.logfiles == "Yes":
-            if name not in self.datalogger:
-                max_bytes = int(self.cbpi.config.get("SENSOR_LOG_MAX_BYTES", 100000))
-                backup_count = int(self.cbpi.config.get("SENSOR_LOG_BACKUP_COUNT", 3))
+        self.sensor_data_listeners = {}
 
-                data_logger = logging.getLogger('cbpi.sensor.%s' % name)
-                data_logger.propagate = False
-                data_logger.setLevel(logging.DEBUG)
-                handler = RotatingFileHandler(os.path.join(self.logsFolderPath, f"sensor_{name}.log"), maxBytes=max_bytes, backupCount=backup_count)
-                data_logger.addHandler(handler)
-                self.datalogger[name] = data_logger
+    def add_sensor_data_listener(self, method):
+        listener_id = shortuuid.uuid()
+        self.sensor_data_listeners[listener_id] = method
+        return listener_id
+
+    def remove_sensor_data_listener(self, listener_id):
+        try:
+            del self.sensor_data_listeners[listener_id]
+        except:
+            self.logger.error("Failed to remove listener {}".format(listener_id))
 
-            formatted_time = strftime("%Y-%m-%d %H:%M:%S", localtime())
-            self.datalogger[name].info("%s,%s" % (formatted_time, str(value)))
+    async def _call_sensor_data_listeners(self, sensor_id, value, formatted_time, name):
+        for listener_id, method in self.sensor_data_listeners.items():
+            asyncio.create_task(method(self.cbpi, sensor_id, value, formatted_time, name))
 
-        if self.influxdb == "Yes":
-            ## Write to influxdb in an asyncio task
-            self._task = asyncio.create_task(self.log_influx(name,value))
-
-    async def log_influx(self, name:str, value:str):
-        self.influxdbcloud = self.cbpi.config.get("INFLUXDBCLOUD", "No")
-        self.influxdbaddr = self.cbpi.config.get("INFLUXDBADDR", None)
-        self.influxdbname = self.cbpi.config.get("INFLUXDBNAME", None)
-        self.influxdbuser = self.cbpi.config.get("INFLUXDBUSER", None)
-        self.influxdbpwd = self.cbpi.config.get("INFLUXDBPWD", None)
-        self.influxdbmeasurement = self.cbpi.config.get("INFLUXDBMEASUREMENT", "measurement")
-        id = name
-        timeout = Timeout(connect=5.0, read=None)
+    def log_data(self, id: str, value: str) -> None:
+        # all plugin targets:
+        if self.sensor_data_listeners: # true if there are listeners
             try:
-            sensor=self.cbpi.sensor.find_by_id(name)
+                sensor=self.cbpi.sensor.find_by_id(id)
                 if sensor is not None:
-                itemname=sensor.name.replace(" ", "_")
-                out=str(self.influxdbmeasurement)+",source=" + itemname + ",itemID=" + str(id) + " value="+str(value)
+                    name = sensor.name.replace(" ", "_")
+                    formatted_time = strftime("%Y-%m-%d %H:%M:%S", localtime())
+                    asyncio.create_task(self._call_sensor_data_listeners(id, value, formatted_time, name))
             except Exception as e:
-            logging.error("InfluxDB ID Error: {}".format(e))
-
-        if self.influxdbcloud == "Yes":
-            self.influxdburl=self.influxdbaddr + "/api/v2/write?org=" + self.influxdbuser + "&bucket=" + self.influxdbname + "&precision=s"
-            try:
-                header = {'User-Agent': name, 'Authorization': "Token {}".format(self.influxdbpwd)}
-                http = PoolManager(timeout=timeout)
-                req = http.request('POST',self.influxdburl, body=out.encode(), headers = header)
-            except Exception as e:
-                logging.error("InfluxDB cloud write Error: {}".format(e))
-
-        else:
-            self.base64string = base64.b64encode(('%s:%s' % (self.influxdbuser,self.influxdbpwd)).encode())
-            self.influxdburl= self.influxdbaddr + '/write?db=' + self.influxdbname
-            try:
-                header = {'User-Agent': name, 'Content-Type': 'application/x-www-form-urlencoded','Authorization': 'Basic %s' % self.base64string.decode('utf-8')}
-                http = PoolManager(timeout=timeout)
-                req = http.request('POST',self.influxdburl, body=out.encode(), headers = header)
-            except Exception as e:
-                logging.error("InfluxDB write Error: {}".format(e))
-
-
+                logging.error("sensor logging listener exception: {}".format(e))
 
     async def get_data(self, names, sample_rate='60s'):
         logging.info("Start Log for {}".format(names))
@@ -182,7 +151,9 @@ class LogController:
 
     def clear_log(self, name:str ) -> str:
         all_filenames = glob.glob(os.path.join(self.logsFolderPath, f"sensor_{name}.log*"))
-
+
+        logging.info(f'Deleting logfiles for sensor {name}.')
+
         if name in self.datalogger:
             self.datalogger[name].removeHandler(self.datalogger[name].handlers[0])
             del self.datalogger[name]
diff --git a/cbpi/extension/ConfigUpdate/__init__.py b/cbpi/extension/ConfigUpdate/__init__.py
index 419e523..62fee1b 100644
--- a/cbpi/extension/ConfigUpdate/__init__.py
+++ b/cbpi/extension/ConfigUpdate/__init__.py
@@ -278,7 +278,7 @@ class ConfigUpdate(CBPiExtension):
         if logfiles is None:
             logger.info("INIT CSV logfiles")
             try:
-                await self.cbpi.config.add("CSVLOGFILES", "Yes", type=ConfigType.SELECT, description="Write sensor data to csv logfiles",
+                await self.cbpi.config.add("CSVLOGFILES", "Yes", type=ConfigType.SELECT, description="Write sensor data to csv logfiles (enabling requires restart)",
                                            source="craftbeerpi",
                                            options= [{"label": "Yes", "value": "Yes"},
                                                    {"label": "No", "value": "No"}])
@@ -289,7 +289,7 @@ class ConfigUpdate(CBPiExtension):
         if influxdb is None:
             logger.info("INIT Influxdb")
             try:
-                await self.cbpi.config.add("INFLUXDB", "No", type=ConfigType.SELECT, description="Write sensor data to influxdb",
+                await self.cbpi.config.add("INFLUXDB", "No", type=ConfigType.SELECT, description="Write sensor data to influxdb (enabling requires restart)",
                                            source="craftbeerpi",
                                            options= [{"label": "Yes", "value": "Yes"},
                                                    {"label": "No", "value": "No"}])
diff --git a/cbpi/extension/SensorLogTarget_CSV/__init__.py b/cbpi/extension/SensorLogTarget_CSV/__init__.py
new file mode 100644
index 0000000..6360cf7
--- /dev/null
+++ b/cbpi/extension/SensorLogTarget_CSV/__init__.py
@@ -0,0 +1,51 @@
+
+# -*- coding: utf-8 -*-
+import os
+from logging.handlers import RotatingFileHandler
+import logging
+from unittest.mock import MagicMock, patch
+import asyncio
+import random
+from cbpi.api import *
+from cbpi.api.config import ConfigType
+import urllib3
+import base64
+
+logger = logging.getLogger(__name__)
+
+class SensorLogTargetCSV(CBPiExtension):
+
+    def __init__(self, cbpi): # called from cbpi on start
+        self.cbpi = cbpi
+        self.logfiles = self.cbpi.config.get("CSVLOGFILES", "Yes")
+        if self.logfiles == "No":
+            return # never run()
+        self._task = asyncio.create_task(self.run()) # one time run() only
+
+
+    async def run(self): # called by __init__ once on start if CSV is enabled
+        self.listener_ID = self.cbpi.log.add_sensor_data_listener(self.log_data_to_CSV)
+        logger.info("CSV sensor log target listener ID: {}".format(self.listener_ID))
+
+    async def log_data_to_CSV(self, cbpi, id:str, value:str, formatted_time, name): # called by log_data() hook from the log file controller
+        self.logfiles = self.cbpi.config.get("CSVLOGFILES", "Yes")
+        if self.logfiles == "No":
+            # We intentionally do not unsubscribe the listener here because we would have no way of resubscribing it without a restart of cbpi.
+            # As long as cbpi was STARTED with CSVLOGFILES set to Yes this function stays subscribed, so changes can be made on the fly.
+            # But after initially enabling this logging target a restart is required.
+            return
+        if id not in self.cbpi.log.datalogger:
+            max_bytes = int(self.cbpi.config.get("SENSOR_LOG_MAX_BYTES", 100000))
+            backup_count = int(self.cbpi.config.get("SENSOR_LOG_BACKUP_COUNT", 3))
+
+            data_logger = logging.getLogger('cbpi.sensor.%s' % id)
+            data_logger.propagate = False
+            data_logger.setLevel(logging.DEBUG)
+            handler = RotatingFileHandler(os.path.join(self.cbpi.log.logsFolderPath, f"sensor_{id}.log"), maxBytes=max_bytes, backupCount=backup_count)
+            data_logger.addHandler(handler)
+            self.cbpi.log.datalogger[id] = data_logger
+
+        self.cbpi.log.datalogger[id].info("%s,%s" % (formatted_time, str(value)))
+
+def setup(cbpi):
+    cbpi.plugin.register("SensorLogTargetCSV", SensorLogTargetCSV)
diff --git a/cbpi/extension/SensorLogTarget_CSV/config.yaml b/cbpi/extension/SensorLogTarget_CSV/config.yaml
new file mode 100644
index 0000000..d396a0c
--- /dev/null
+++ b/cbpi/extension/SensorLogTarget_CSV/config.yaml
@@ -0,0 +1,3 @@
+name: SensorLogTargetCSV
+version: 4
+active: true
diff --git a/cbpi/extension/SensorLogTarget_InfluxDB/__init__.py b/cbpi/extension/SensorLogTarget_InfluxDB/__init__.py
new file mode 100644
index 0000000..cb9fe75
--- /dev/null
+++ b/cbpi/extension/SensorLogTarget_InfluxDB/__init__.py
@@ -0,0 +1,76 @@
+
+# -*- coding: utf-8 -*-
+import os
+from urllib3 import Timeout, PoolManager
+import logging
+from unittest.mock import MagicMock, patch
+import asyncio
+import random
+from cbpi.api import *
+from cbpi.api.config import ConfigType
+import urllib3
+import base64
+
+logger = logging.getLogger(__name__)
+
+class SensorLogTargetInfluxDB(CBPiExtension):
+
+    def __init__(self, cbpi): # called from cbpi on start
+        self.cbpi = cbpi
+        self.influxdb = self.cbpi.config.get("INFLUXDB", "No")
+        if self.influxdb == "No":
+            return # never run()
+        self._task = asyncio.create_task(self.run()) # one time run() only
+
+
+    async def run(self): # called by __init__ once on start if influx is enabled
+        self.listener_ID = self.cbpi.log.add_sensor_data_listener(self.log_data_to_InfluxDB)
+        logger.info("InfluxDB sensor log target listener ID: {}".format(self.listener_ID))
+
+    async def log_data_to_InfluxDB(self, cbpi, id:str, value:str, timestamp, name): # called by log_data() hook from the log file controller
+        self.influxdb = self.cbpi.config.get("INFLUXDB", "No")
+        if self.influxdb == "No":
+            # We intentionally do not unsubscribe the listener here because we would have no way of resubscribing it without a restart of cbpi.
+            # As long as cbpi was STARTED with INFLUXDB set to Yes this function stays subscribed, so changes can be made on the fly.
+            # But after initially enabling this logging target a restart is required.
+            return
+        self.influxdbcloud = self.cbpi.config.get("INFLUXDBCLOUD", "No")
+        self.influxdbaddr = self.cbpi.config.get("INFLUXDBADDR", None)
+        self.influxdbname = self.cbpi.config.get("INFLUXDBNAME", None)
+        self.influxdbuser = self.cbpi.config.get("INFLUXDBUSER", None)
+        self.influxdbpwd = self.cbpi.config.get("INFLUXDBPWD", None)
+        self.influxdbmeasurement = self.cbpi.config.get("INFLUXDBMEASUREMENT", "measurement")
+        timeout = Timeout(connect=5.0, read=None)
+        try:
+            sensor=self.cbpi.sensor.find_by_id(id)
+            if sensor is not None:
+                itemname=sensor.name.replace(" ", "_")
+                out=str(self.influxdbmeasurement)+",source=" + itemname + ",itemID=" + str(id) + " value="+str(value)
+        except Exception as e:
+            logging.error("InfluxDB ID Error: {}".format(e))
+
+        if self.influxdbcloud == "Yes":
+            self.influxdburl=self.influxdbaddr + "/api/v2/write?org=" + self.influxdbuser + "&bucket=" + self.influxdbname + "&precision=s"
+            try:
+                header = {'User-Agent': id, 'Authorization': "Token {}".format(self.influxdbpwd)}
+                http = PoolManager(timeout=timeout)
+                req = http.request('POST',self.influxdburl, body=out.encode(), headers = header)
+                if req.status != 204:
+                    raise Exception(f'InfluxDB Status code {req.status}')
+            except Exception as e:
+                logging.error("InfluxDB cloud write Error: {}".format(e))
+
+        else:
+            self.base64string = base64.b64encode(('%s:%s' % (self.influxdbuser,self.influxdbpwd)).encode())
+            self.influxdburl= self.influxdbaddr + '/write?db=' + self.influxdbname
+            try:
+                header = {'User-Agent': id, 'Content-Type': 'application/x-www-form-urlencoded','Authorization': 'Basic %s' % self.base64string.decode('utf-8')}
+                http = PoolManager(timeout=timeout)
+                req = http.request('POST',self.influxdburl, body=out.encode(), headers = header)
+                if req.status != 204:
+                    raise Exception(f'InfluxDB Status code {req.status}')
+            except Exception as e:
+                logging.error("InfluxDB write Error: {}".format(e))
+
+def setup(cbpi):
+    cbpi.plugin.register("SensorLogTargetInfluxDB", SensorLogTargetInfluxDB)
diff --git a/cbpi/extension/SensorLogTarget_InfluxDB/config.yaml b/cbpi/extension/SensorLogTarget_InfluxDB/config.yaml
new file mode 100644
index 0000000..13cb8df
--- /dev/null
+++ b/cbpi/extension/SensorLogTarget_InfluxDB/config.yaml
@@ -0,0 +1,3 @@
+name: SensorLogTargetInfluxDB
+version: 4
+active: true
diff --git a/tests/cbpi-test-config/config.json b/tests/cbpi-test-config/config.json
index da7fab4..298b8b0 100644
--- a/tests/cbpi-test-config/config.json
+++ b/tests/cbpi-test-config/config.json
@@ -80,7 +80,7 @@
         "options": null,
         "source": "hidden",
         "type": "string",
-        "value": "4.1.8.a11"
+        "value": "4.1.10.a1"
     },
     "CSVLOGFILES": {
         "description": "Write sensor data to csv logfiles",
@@ -117,7 +117,7 @@
         "value": "No"
     },
     "INFLUXDBADDR": {
-        "description": "IP Address of your influxdb server (If INFLUXDBCLOUD set to Yes use URL Address of your influxdb cloud server)",
+        "description": "URL Address of your influxdb server incl. http:// and port, e.g. http://localhost:8086 (If INFLUXDBCLOUD set to Yes use URL Address of your influxdb cloud server)",
         "name": "INFLUXDBADDR",
         "options": null,
         "source": "craftbeerpi",
@@ -157,14 +157,6 @@
         "type": "string",
         "value": "cbpi4"
     },
-    "INFLUXDBPORT": {
-        "description": "Port of your influxdb server",
-        "name": "INFLUXDBPORT",
-        "options": null,
-        "source": "craftbeerpi",
-        "type": "string",
-        "value": "8086"
-    },
     "INFLUXDBPWD": {
         "description": "Password for your influxdb database (only if required)(If INFLUXDBCLOUD set to Yes use token of your influxdb cloud database)",
         "name": "INFLUXDBPWD",
@@ -174,7 +166,7 @@
         "value": " "
     },
     "INFLUXDBUSER": {
-        "description": "User name for your influxdb database (only if required)(If INFLUXDBCLOUD set to Yes use organisation of your influxdb cloud database)",
+        "description": "User Name for your influxdb database (only if required)(If INFLUXDBCLOUD set to Yes use organisation of your influxdb cloud database)",
         "name": "INFLUXDBUSER",
         "options": null,
         "source": "craftbeerpi",
diff --git a/tests/cbpi-test-config/sensor.json b/tests/cbpi-test-config/sensor.json
index ce96464..4cf8966 100644
--- a/tests/cbpi-test-config/sensor.json
+++ b/tests/cbpi-test-config/sensor.json
@@ -1,3 +1,11 @@
 {
-    "data": []
+    "data": [
+        {
+            "id": "unconfigured_test_sensor_ID",
+            "name": "unconfigured_mqtt_sensor",
+            "props": {},
+            "state": false,
+            "type": "MQTTSensor"
+        }
+    ]
 }
\ No newline at end of file
diff --git a/tests/test_logger.py b/tests/test_logger.py
index 50d729b..633ba00 100644
--- a/tests/test_logger.py
+++ b/tests/test_logger.py
@@ -10,10 +10,10 @@ class LoggerTestCase(CraftBeerPiTestCase):
 
     async def test_log_data(self):
         os.makedirs(os.path.join(".", "tests", "logs"), exist_ok=True)
-        log_name = "test"
+        log_name = "unconfigured_test_sensor_ID"
         #clear all logs
         self.cbpi.log.clear_log(log_name)
-        assert len(glob.glob(os.path.join(".", "tests", "logs", f"sensor_{log_name}.log*"))) == 0
+        assert len(glob.glob(os.path.join(self.cbpi.log.logsFolderPath, f"sensor_{log_name}.log*"))) == 0
 
         # write log entries
         for i in range(5):
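
Note for plugin authors: with this change, the CSV and InfluxDB extensions are just two subscribers of the new listener API in LogController, so a third-party log target can hook into sensor logging the same way. Below is a minimal sketch of a hypothetical extra target; the class name and the stdout output are invented for illustration, while add_sensor_data_listener(), the listener call signature (cbpi, id, value, formatted_time, name) and the setup()/register() pattern are the ones added in this diff.

# Hypothetical example only - not part of this patch.
import asyncio
import logging

from cbpi.api import *

logger = logging.getLogger(__name__)

class SensorLogTargetExample(CBPiExtension):

    def __init__(self, cbpi):  # called from cbpi on start
        self.cbpi = cbpi
        # subscribe once on startup, like the CSV and InfluxDB targets do
        self._task = asyncio.create_task(self.run())

    async def run(self):
        # the returned id could later be passed to cbpi.log.remove_sensor_data_listener()
        self.listener_ID = self.cbpi.log.add_sensor_data_listener(self.log_data_to_stdout)
        logger.info("Example sensor log target listener ID: {}".format(self.listener_ID))

    async def log_data_to_stdout(self, cbpi, id: str, value: str, formatted_time, name):
        # called for every sensor reading via LogController._call_sensor_data_listeners()
        print(f"{formatted_time},{name},{id},{value}")

def setup(cbpi):
    cbpi.plugin.register("SensorLogTargetExample", SensorLogTargetExample)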