Test to transfer Data to influxdb

CSV logfile writing can be switched off via settings
influxdb can be switched on via settings

->Some changes will be required
This commit is contained in:
avollkopf 2021-12-14 07:20:22 +01:00
parent b2fe624d08
commit 9b3219d89c
4 changed files with 127 additions and 18 deletions

View file

@ -1 +1 @@
__version__ = "4.0.0.56" __version__ = "4.0.0.57"

View file

@ -9,7 +9,7 @@ class ActorController(BasicController):
self.update_key = "actorupdate" self.update_key = "actorupdate"
async def on(self, id, power=None): async def on(self, id, power=None):
logging.info("Controller_power: {}".format(power)) # logging.info("Controller_power: {}".format(power))
try: try:
item = self.find_by_id(id) item = self.find_by_id(id)
if power is None: if power is None:

View file

@ -6,6 +6,13 @@ from logging.handlers import RotatingFileHandler
from time import strftime, localtime from time import strftime, localtime
import pandas as pd import pandas as pd
import zipfile import zipfile
import base64
import urllib3
from cbpi.api import *
from cbpi.api.config import ConfigType
from cbpi.api.base import CBPiBase
import asyncio
class LogController: class LogController:
@ -16,27 +23,61 @@ class LogController:
''' '''
self.cbpi = cbpi self.cbpi = cbpi
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.configuration = False
self.datalogger = {} self.datalogger = {}
# self.cbpi.config.get does not seem to work here...
# self.influxdbaddr="192.168.163.105"
# self.influxdbport="8086"
# self.influxdbname="cbpi4"
# self.influxdburl='http://' + self.influxdbaddr + ':' + str(self.influxdbport) + '/write?db=' + self.influxdbname
# self.influxdbuser=""
# self.influxdbpwd=""
# self.base64string = base64.b64encode(('%s:%s' % (self.influxdbuser,self.influxdbpwd)).encode())
def log_data(self, name: str, value: str) -> None: def log_data(self, name: str, value: str) -> None:
self.logfiles = self.cbpi.config.get("CSVLOGFILES", "Yes")
self.influxdb = self.cbpi.config.get("INFLUXDB", "No")
if self.logfiles == "Yes":
if name not in self.datalogger:
max_bytes = self.cbpi.config.get("SENSOR_LOG_MAX_BYTES", 1048576)
backup_count = self.cbpi.config.get("SENSOR_LOG_BACKUP_COUNT", 3)
data_logger = logging.getLogger('cbpi.sensor.%s' % name)
data_logger.propagate = False
data_logger.setLevel(logging.DEBUG)
handler = RotatingFileHandler('./logs/sensor_%s.log' % name, maxBytes=max_bytes, backupCount=backup_count)
data_logger.addHandler(handler)
self.datalogger[name] = data_logger
if name not in self.datalogger: formatted_time = strftime("%Y-%m-%d %H:%M:%S", localtime())
max_bytes = self.cbpi.config.get("SENSOR_LOG_MAX_BYTES", 1048576) self.datalogger[name].info("%s,%s" % (formatted_time, value))
backup_count = self.cbpi.config.get("SENSOR_LOG_BACKUP_COUNT", 3)
if self.influxdb == "Yes":
self.influxdb = self.cbpi.config.get("INFLUXDB", "No")
self.influxdbaddr = self.cbpi.config.get("INFLUXDBADDR", None)
self.influxdbport = self.cbpi.config.get("INFLUXDBPORT", None)
self.influxdbname = self.cbpi.config.get("INFLUXDBNAME", None)
self.influxdbuser = self.cbpi.config.get("INFLUXDBUSER", None)
self.influxdbpwd = self.cbpi.config.get("INFLUXDBPWD", None)
self.base64string = base64.b64encode(('%s:%s' % (self.influxdbuser,self.influxdbpwd)).encode())
self.influxdburl='http://' + self.influxdbaddr + ':' + str(self.influxdbport) + '/write?db=' + self.influxdbname
try:
sensor=self.cbpi.sensor.find_by_id(name)
sensorname=sensor.name.replace(" ", "_")
out="measurement,source=" + sensorname + "___" + name + " value="+str(value)
header = {'User-Agent': name, 'Content-Type': 'application/x-www-form-urlencoded','Authorization': 'Basic %s' % self.base64string.decode('utf-8')}
http = urllib3.PoolManager()
req = http.request('POST',self.influxdburl, body=out, headers = header)
except Exception as e:
logging.error("InfluxDB write Error: {}".format(e))
data_logger = logging.getLogger('cbpi.sensor.%s' % name)
data_logger.propagate = False
data_logger.setLevel(logging.DEBUG)
handler = RotatingFileHandler('./logs/sensor_%s.log' % name, maxBytes=max_bytes, backupCount=backup_count)
data_logger.addHandler(handler)
self.datalogger[name] = data_logger
formatted_time = strftime("%Y-%m-%d %H:%M:%S", localtime())
self.datalogger[name].info("%s,%s" % (formatted_time, value))
async def get_data(self, names, sample_rate='60s'): async def get_data(self, names, sample_rate='60s'):
logging.info("Start Log for {}".format(names))
''' '''
:param names: name as string or list of names as string :param names: name as string or list of names as string
:param sample_rate: rate for resampling the data :param sample_rate: rate for resampling the data
@ -70,11 +111,11 @@ class LogController:
# concat all logs # concat all logs
df = pd.concat([pd.read_csv(f, parse_dates=True, date_parser=dateparse, index_col='DateTime', names=['DateTime', name], header=None) for f in all_filenames]) df = pd.concat([pd.read_csv(f, parse_dates=True, date_parser=dateparse, index_col='DateTime', names=['DateTime', name], header=None) for f in all_filenames])
logging.info("Read all files for {}".format(names))
# resample if rate provided # resample if rate provided
if sample_rate is not None: if sample_rate is not None:
df = df[name].resample(sample_rate).max() df = df[name].resample(sample_rate).max()
logging.info("Sampled now for {}".format(names))
df = df.dropna() df = df.dropna()
if result is None: if result is None:
result = df result = df
@ -88,7 +129,7 @@ class LogController:
data[name] = result[name].interpolate(limit_direction='both', limit=10).tolist() data[name] = result[name].interpolate(limit_direction='both', limit=10).tolist()
else: else:
data[name] = result.interpolate().tolist() data[name] = result.interpolate().tolist()
logging.info("Send Log for {}".format(names))
return data return data
async def get_data2(self, ids) -> dict: async def get_data2(self, ids) -> dict:

View file

@ -35,6 +35,15 @@ class ConfigUpdate(CBPiExtension):
cooldown_step = self.cbpi.config.get("steps_cooldown", None) cooldown_step = self.cbpi.config.get("steps_cooldown", None)
max_dashboard_number = self.cbpi.config.get("max_dashboard_number", None) max_dashboard_number = self.cbpi.config.get("max_dashboard_number", None)
current_dashboard_number = self.cbpi.config.get("current_dashboard_number", None) current_dashboard_number = self.cbpi.config.get("current_dashboard_number", None)
logfiles = self.cbpi.config.get("CSVLOGFILES", None)
influxdb = self.cbpi.config.get("INFLUXDB", None)
influxdbaddr = self.cbpi.config.get("INFLUXDBADDR", None)
influxdbport = self.cbpi.config.get("INFLUXDBPORT", None)
influxdbname = self.cbpi.config.get("INFLUXDBNAME", None)
influxdbuser = self.cbpi.config.get("INFLUXDBUSER", None)
influxdbpwd = self.cbpi.config.get("INFLUXDBPWD", None)
if boil_temp is None: if boil_temp is None:
logger.info("INIT Boil Temp Setting") logger.info("INIT Boil Temp Setting")
@ -174,6 +183,65 @@ class ConfigUpdate(CBPiExtension):
except: except:
logger.warning('Unable to update config') logger.warning('Unable to update config')
## Check if CSV logfiles is on config
if logfiles is None:
logger.info("INIT CSV logfiles")
try:
await self.cbpi.config.add("CSVLOGFILES", "Yes", ConfigType.SELECT, "Write sensor data to csv logfiles",
[{"label": "Yes", "value": "Yes"},
{"label": "No", "value": "No"}])
except:
logger.warning('Unable to update config')
## Check if CSV logfiles is on config
if influxdb is None:
logger.info("INIT Influxdb")
try:
await self.cbpi.config.add("INFLUXDB", "No", ConfigType.SELECT, "Write sensor data to influxdb",
[{"label": "Yes", "value": "Yes"},
{"label": "No", "value": "No"}])
except:
logger.warning('Unable to update config')
## Check if influxdbaddr is in config
if influxdbaddr is None:
logger.info("INIT Influxdbaddr")
try:
await self.cbpi.config.add("INFLUXDBADDR", "localhost", ConfigType.STRING, "IP Address of your influxdb server")
except:
logger.warning('Unable to update config')
## Check if influxdbport is in config
if influxdbport is None:
logger.info("INIT Influxdbport")
try:
await self.cbpi.config.add("INFLUXDBPORT", "8086", ConfigType.STRING, "Port of your influxdb server")
except:
logger.warning('Unable to update config')
## Check if influxdbname is in config
if influxdbname is None:
logger.info("INIT Influxdbname")
try:
await self.cbpi.config.add("INFLUXDBNAME", "cbpi4", ConfigType.STRING, "Name of your influxdb database name")
except:
logger.warning('Unable to update config')
## Check if influxdber is in config
if influxdbuser is None:
logger.info("INIT Influxdbuser")
try:
await self.cbpi.config.add("INFLUXDBUSER", " ", ConfigType.STRING, "User name for your influxdb database (only if required)")
except:
logger.warning('Unable to update config')
## Check if influxdber is in config
if influxdbpwd is None:
logger.info("INIT Influxdbpwd")
try:
await self.cbpi.config.add("INFLUXDBPWD", " ", ConfigType.STRING, "Password for your influxdb database (only if required)")
except:
logger.warning('Unable to update config')
def setup(cbpi): def setup(cbpi):
cbpi.plugin.register("ConfigUpdate", ConfigUpdate) cbpi.plugin.register("ConfigUpdate", ConfigUpdate)