test: add wait time for influxdb if no connection can be established after several retries -> currently, there seems to be interference with mqtt.

This commit is contained in:
avollkopf 2023-06-29 19:16:27 +02:00
parent 355376a1ba
commit a301276725
2 changed files with 44 additions and 24 deletions

View file

@ -1,3 +1,3 @@
__version__ = "4.1.11.a1" __version__ = "4.1.11.a2"
__codename__ = "Groundhog Day" __codename__ = "Groundhog Day"

View file

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import os import os
from urllib3 import Timeout, PoolManager from urllib3 import Timeout, PoolManager, Retry
import logging import logging
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import asyncio import asyncio
@ -21,6 +21,9 @@ class SensorLogTargetInfluxDB(CBPiExtension):
if self.influxdb == "No": if self.influxdb == "No":
return # never run() return # never run()
self._task = asyncio.create_task(self.run()) # one time run() only self._task = asyncio.create_task(self.run()) # one time run() only
self.counter = 0
self.max_retries = 2
self.send=True
async def run(self): # called by __init__ once on start if influx is enabled async def run(self): # called by __init__ once on start if influx is enabled
@ -40,7 +43,7 @@ class SensorLogTargetInfluxDB(CBPiExtension):
self.influxdbuser = self.cbpi.config.get("INFLUXDBUSER", None) self.influxdbuser = self.cbpi.config.get("INFLUXDBUSER", None)
self.influxdbpwd = self.cbpi.config.get("INFLUXDBPWD", None) self.influxdbpwd = self.cbpi.config.get("INFLUXDBPWD", None)
self.influxdbmeasurement = self.cbpi.config.get("INFLUXDBMEASUREMENT", "measurement") self.influxdbmeasurement = self.cbpi.config.get("INFLUXDBMEASUREMENT", "measurement")
timeout = Timeout(connect=5.0, read=None) timeout = Timeout(connect=2.0, read=None)
try: try:
sensor=self.cbpi.sensor.find_by_id(id) sensor=self.cbpi.sensor.find_by_id(id)
if sensor is not None: if sensor is not None:
@ -49,28 +52,45 @@ class SensorLogTargetInfluxDB(CBPiExtension):
except Exception as e: except Exception as e:
logging.error("InfluxDB ID Error: {}".format(e)) logging.error("InfluxDB ID Error: {}".format(e))
if self.influxdbcloud == "Yes": if self.influxdbcloud == "Yes" and self.send == True:
if self.counter <= self.max_retries:
self.influxdburl=self.influxdbaddr + "/api/v2/write?org=" + self.influxdbuser + "&bucket=" + self.influxdbname + "&precision=s" self.influxdburl=self.influxdbaddr + "/api/v2/write?org=" + self.influxdbuser + "&bucket=" + self.influxdbname + "&precision=s"
try: try:
header = {'User-Agent': id, 'Authorization': "Token {}".format(self.influxdbpwd)} header = {'User-Agent': id, 'Authorization': "Token {}".format(self.influxdbpwd)}
http = PoolManager(timeout=timeout) http = PoolManager(timeout=timeout)
req = http.request('POST',self.influxdburl, body=out.encode(), headers = header) req = http.request('POST',self.influxdburl, body=out.encode(), headers = header, retries=Retry(2))
if req.status != 204: if req.status != 204:
raise Exception(f'InfluxDB Status code {req.status}') raise Exception(f'InfluxDB Status code {req.status}')
except Exception as e: except Exception as e:
logging.error("InfluxDB cloud write Error: {}".format(e)) self.counter += 1
logging.error("InfluxDB cloud write Error #{}: {}".format(self.counter, e))
else: else:
logging.warning("Waiting 3 Minutes before connecting to INFLUXDB again")
self.send=False
await asyncio.sleep(180)
self.counter = 0
self.send = True
elif self.influxdbcloud == "No" and self.send == True:
if self.counter <= self.max_retries:
self.base64string = base64.b64encode(('%s:%s' % (self.influxdbuser,self.influxdbpwd)).encode()) self.base64string = base64.b64encode(('%s:%s' % (self.influxdbuser,self.influxdbpwd)).encode())
self.influxdburl= self.influxdbaddr + '/write?db=' + self.influxdbname self.influxdburl= self.influxdbaddr + '/write?db=' + self.influxdbname
try: try:
header = {'User-Agent': id, 'Content-Type': 'application/x-www-form-urlencoded','Authorization': 'Basic %s' % self.base64string.decode('utf-8')} header = {'User-Agent': id, 'Content-Type': 'application/x-www-form-urlencoded','Authorization': 'Basic %s' % self.base64string.decode('utf-8')}
http = PoolManager(timeout=timeout) http = PoolManager(timeout=timeout)
req = http.request('POST',self.influxdburl, body=out.encode(), headers = header) req = http.request('POST',self.influxdburl, body=out.encode(), headers = header, retries=Retry(2))
if req.status != 204: if req.status != 204:
raise Exception(f'InfluxDB Status code {req.status}') raise Exception(f'InfluxDB Status code {req.status}')
except Exception as e: except Exception as e:
logging.error("InfluxDB write Error: {}".format(e)) self.counter += 1
logging.error("InfluxDB write Error #{}: {}".format(self.counter, e))
else:
logging.warning("Waiting 3 Minutes before connecting to INFLUXDB again")
self.send=False
await asyncio.sleep(180)
self.counter = 0
self.send = True
def setup(cbpi): def setup(cbpi):
cbpi.plugin.register("SensorLogTargetInfluxDB", SensorLogTargetInfluxDB) cbpi.plugin.register("SensorLogTargetInfluxDB", SensorLogTargetInfluxDB)