From 9b3219d89c6fe25cd250f984df6715afd1c6ad09 Mon Sep 17 00:00:00 2001 From: avollkopf <43980694+avollkopf@users.noreply.github.com> Date: Tue, 14 Dec 2021 07:20:22 +0100 Subject: [PATCH] Test to transfer Data to influxdb CSV logfile writing can be switched off via settings influxdb can be switched on via settings ->Some changes will be required --- cbpi/__init__.py | 2 +- cbpi/controller/actor_controller.py | 2 +- cbpi/controller/log_file_controller.py | 73 +++++++++++++++++++------ cbpi/extension/ConfigUpdate/__init__.py | 68 +++++++++++++++++++++++ 4 files changed, 127 insertions(+), 18 deletions(-) diff --git a/cbpi/__init__.py b/cbpi/__init__.py index 6fa1e66..75aa62c 100644 --- a/cbpi/__init__.py +++ b/cbpi/__init__.py @@ -1 +1 @@ -__version__ = "4.0.0.56" +__version__ = "4.0.0.57" diff --git a/cbpi/controller/actor_controller.py b/cbpi/controller/actor_controller.py index dca5a44..75bd623 100644 --- a/cbpi/controller/actor_controller.py +++ b/cbpi/controller/actor_controller.py @@ -9,7 +9,7 @@ class ActorController(BasicController): self.update_key = "actorupdate" async def on(self, id, power=None): - logging.info("Controller_power: {}".format(power)) +# logging.info("Controller_power: {}".format(power)) try: item = self.find_by_id(id) if power is None: diff --git a/cbpi/controller/log_file_controller.py b/cbpi/controller/log_file_controller.py index d4d6576..8546e7d 100644 --- a/cbpi/controller/log_file_controller.py +++ b/cbpi/controller/log_file_controller.py @@ -6,6 +6,13 @@ from logging.handlers import RotatingFileHandler from time import strftime, localtime import pandas as pd import zipfile +import base64 +import urllib3 +from cbpi.api import * +from cbpi.api.config import ConfigType +from cbpi.api.base import CBPiBase +import asyncio + class LogController: @@ -16,27 +23,61 @@ class LogController: ''' self.cbpi = cbpi self.logger = logging.getLogger(__name__) - + self.configuration = False self.datalogger = {} +# self.cbpi.config.get does not seem to work here... +# self.influxdbaddr="192.168.163.105" +# self.influxdbport="8086" +# self.influxdbname="cbpi4" +# self.influxdburl='http://' + self.influxdbaddr + ':' + str(self.influxdbport) + '/write?db=' + self.influxdbname +# self.influxdbuser="" +# self.influxdbpwd="" +# self.base64string = base64.b64encode(('%s:%s' % (self.influxdbuser,self.influxdbpwd)).encode()) + def log_data(self, name: str, value: str) -> None: + self.logfiles = self.cbpi.config.get("CSVLOGFILES", "Yes") + self.influxdb = self.cbpi.config.get("INFLUXDB", "No") + if self.logfiles == "Yes": + if name not in self.datalogger: + max_bytes = self.cbpi.config.get("SENSOR_LOG_MAX_BYTES", 1048576) + backup_count = self.cbpi.config.get("SENSOR_LOG_BACKUP_COUNT", 3) + + data_logger = logging.getLogger('cbpi.sensor.%s' % name) + data_logger.propagate = False + data_logger.setLevel(logging.DEBUG) + handler = RotatingFileHandler('./logs/sensor_%s.log' % name, maxBytes=max_bytes, backupCount=backup_count) + data_logger.addHandler(handler) + self.datalogger[name] = data_logger - if name not in self.datalogger: - max_bytes = self.cbpi.config.get("SENSOR_LOG_MAX_BYTES", 1048576) - backup_count = self.cbpi.config.get("SENSOR_LOG_BACKUP_COUNT", 3) + formatted_time = strftime("%Y-%m-%d %H:%M:%S", localtime()) + self.datalogger[name].info("%s,%s" % (formatted_time, value)) + + if self.influxdb == "Yes": + self.influxdb = self.cbpi.config.get("INFLUXDB", "No") + self.influxdbaddr = self.cbpi.config.get("INFLUXDBADDR", None) + self.influxdbport = self.cbpi.config.get("INFLUXDBPORT", None) + self.influxdbname = self.cbpi.config.get("INFLUXDBNAME", None) + self.influxdbuser = self.cbpi.config.get("INFLUXDBUSER", None) + self.influxdbpwd = self.cbpi.config.get("INFLUXDBPWD", None) + self.base64string = base64.b64encode(('%s:%s' % (self.influxdbuser,self.influxdbpwd)).encode()) + self.influxdburl='http://' + self.influxdbaddr + ':' + str(self.influxdbport) + '/write?db=' + self.influxdbname + + + try: + sensor=self.cbpi.sensor.find_by_id(name) + sensorname=sensor.name.replace(" ", "_") + out="measurement,source=" + sensorname + "___" + name + " value="+str(value) + header = {'User-Agent': name, 'Content-Type': 'application/x-www-form-urlencoded','Authorization': 'Basic %s' % self.base64string.decode('utf-8')} + http = urllib3.PoolManager() + req = http.request('POST',self.influxdburl, body=out, headers = header) + except Exception as e: + logging.error("InfluxDB write Error: {}".format(e)) - data_logger = logging.getLogger('cbpi.sensor.%s' % name) - data_logger.propagate = False - data_logger.setLevel(logging.DEBUG) - handler = RotatingFileHandler('./logs/sensor_%s.log' % name, maxBytes=max_bytes, backupCount=backup_count) - data_logger.addHandler(handler) - self.datalogger[name] = data_logger - formatted_time = strftime("%Y-%m-%d %H:%M:%S", localtime()) - self.datalogger[name].info("%s,%s" % (formatted_time, value)) async def get_data(self, names, sample_rate='60s'): - + logging.info("Start Log for {}".format(names)) ''' :param names: name as string or list of names as string :param sample_rate: rate for resampling the data @@ -70,11 +111,11 @@ class LogController: # concat all logs df = pd.concat([pd.read_csv(f, parse_dates=True, date_parser=dateparse, index_col='DateTime', names=['DateTime', name], header=None) for f in all_filenames]) - + logging.info("Read all files for {}".format(names)) # resample if rate provided if sample_rate is not None: df = df[name].resample(sample_rate).max() - + logging.info("Sampled now for {}".format(names)) df = df.dropna() if result is None: result = df @@ -88,7 +129,7 @@ class LogController: data[name] = result[name].interpolate(limit_direction='both', limit=10).tolist() else: data[name] = result.interpolate().tolist() - + logging.info("Send Log for {}".format(names)) return data async def get_data2(self, ids) -> dict: diff --git a/cbpi/extension/ConfigUpdate/__init__.py b/cbpi/extension/ConfigUpdate/__init__.py index 7cd874d..7d7cc63 100644 --- a/cbpi/extension/ConfigUpdate/__init__.py +++ b/cbpi/extension/ConfigUpdate/__init__.py @@ -35,6 +35,15 @@ class ConfigUpdate(CBPiExtension): cooldown_step = self.cbpi.config.get("steps_cooldown", None) max_dashboard_number = self.cbpi.config.get("max_dashboard_number", None) current_dashboard_number = self.cbpi.config.get("current_dashboard_number", None) + logfiles = self.cbpi.config.get("CSVLOGFILES", None) + influxdb = self.cbpi.config.get("INFLUXDB", None) + influxdbaddr = self.cbpi.config.get("INFLUXDBADDR", None) + influxdbport = self.cbpi.config.get("INFLUXDBPORT", None) + influxdbname = self.cbpi.config.get("INFLUXDBNAME", None) + influxdbuser = self.cbpi.config.get("INFLUXDBUSER", None) + influxdbpwd = self.cbpi.config.get("INFLUXDBPWD", None) + + if boil_temp is None: logger.info("INIT Boil Temp Setting") @@ -174,6 +183,65 @@ class ConfigUpdate(CBPiExtension): except: logger.warning('Unable to update config') + ## Check if CSV logfiles is on config + if logfiles is None: + logger.info("INIT CSV logfiles") + try: + await self.cbpi.config.add("CSVLOGFILES", "Yes", ConfigType.SELECT, "Write sensor data to csv logfiles", + [{"label": "Yes", "value": "Yes"}, + {"label": "No", "value": "No"}]) + except: + logger.warning('Unable to update config') + + ## Check if CSV logfiles is on config + if influxdb is None: + logger.info("INIT Influxdb") + try: + await self.cbpi.config.add("INFLUXDB", "No", ConfigType.SELECT, "Write sensor data to influxdb", + [{"label": "Yes", "value": "Yes"}, + {"label": "No", "value": "No"}]) + except: + logger.warning('Unable to update config') + + ## Check if influxdbaddr is in config + if influxdbaddr is None: + logger.info("INIT Influxdbaddr") + try: + await self.cbpi.config.add("INFLUXDBADDR", "localhost", ConfigType.STRING, "IP Address of your influxdb server") + except: + logger.warning('Unable to update config') + + ## Check if influxdbport is in config + if influxdbport is None: + logger.info("INIT Influxdbport") + try: + await self.cbpi.config.add("INFLUXDBPORT", "8086", ConfigType.STRING, "Port of your influxdb server") + except: + logger.warning('Unable to update config') + + ## Check if influxdbname is in config + if influxdbname is None: + logger.info("INIT Influxdbname") + try: + await self.cbpi.config.add("INFLUXDBNAME", "cbpi4", ConfigType.STRING, "Name of your influxdb database name") + except: + logger.warning('Unable to update config') + + ## Check if influxdber is in config + if influxdbuser is None: + logger.info("INIT Influxdbuser") + try: + await self.cbpi.config.add("INFLUXDBUSER", " ", ConfigType.STRING, "User name for your influxdb database (only if required)") + except: + logger.warning('Unable to update config') + + ## Check if influxdber is in config + if influxdbpwd is None: + logger.info("INIT Influxdbpwd") + try: + await self.cbpi.config.add("INFLUXDBPWD", " ", ConfigType.STRING, "Password for your influxdb database (only if required)") + except: + logger.warning('Unable to update config') def setup(cbpi): cbpi.plugin.register("ConfigUpdate", ConfigUpdate)