mirror of
https://github.com/PiBrewing/craftbeerpi4.git
synced 2025-01-08 22:01:48 +01:00
f924e1a683
because cbpi already writes timestamps into the logs and we dont need two timestamps when exporting. This commit also reverts to using the same formatter for all logHandlers.
304 lines
12 KiB
Python
304 lines
12 KiB
Python
import logging
|
|
import os
|
|
import shutil
|
|
import psutil
|
|
import pathlib
|
|
import json
|
|
import aiohttp
|
|
from voluptuous.schema_builder import message
|
|
from cbpi.api.dataclasses import NotificationAction, NotificationType
|
|
from cbpi.api.base import CBPiBase
|
|
from cbpi.api.config import ConfigType
|
|
from cbpi.api import *
|
|
import zipfile
|
|
import socket
|
|
|
|
class SystemController:
|
|
|
|
def __init__(self, cbpi):
|
|
self.cbpi = cbpi
|
|
self.service = cbpi.actor
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
self.cbpi.app.on_startup.append(self.check_for_update)
|
|
|
|
|
|
async def check_for_update(self, app):
|
|
pass
|
|
|
|
async def restart(self):
|
|
logging.info("RESTART")
|
|
os.system('systemctl reboot')
|
|
pass
|
|
|
|
async def shutdown(self):
|
|
logging.info("SHUTDOWN")
|
|
os.system('systemctl poweroff')
|
|
pass
|
|
|
|
async def backupConfig(self):
|
|
output_filename = "cbpi4_config"
|
|
dir_name = pathlib.Path(self.cbpi.config_folder.get_file_path(''))
|
|
shutil.make_archive(output_filename, 'zip', dir_name)
|
|
|
|
async def downloadlog(self, logtime):
|
|
filename = "cbpi4.log"
|
|
fullname = pathlib.Path(os.path.join(".",filename))
|
|
pluginname = "cbpi4_plugins.txt"
|
|
fullpluginname = pathlib.Path(os.path.join(".",pluginname))
|
|
actorname = "cbpi4_actors.txt"
|
|
fullactorname = pathlib.Path(os.path.join(".",actorname))
|
|
sensorname = "cbpi4_sensors.txt"
|
|
fullsensorname = pathlib.Path(os.path.join(".",sensorname))
|
|
kettlename = "cbpi4_kettles.txt"
|
|
fullkettlename = pathlib.Path(os.path.join(".",kettlename))
|
|
|
|
output_filename="cbpi4_log.zip"
|
|
|
|
if logtime == "b":
|
|
os.system('journalctl -b -u craftbeerpi.service --output cat > {}'.format(fullname))
|
|
else:
|
|
os.system('journalctl --since \"{} hours ago\" -u craftbeerpi.service --output cat > {}'.format(logtime, fullname))
|
|
|
|
os.system('cbpi plugins > {}'.format(fullpluginname))
|
|
|
|
try:
|
|
actors = self.cbpi.actor.get_state()
|
|
json.dump(actors['data'],open(fullactorname,'w'),indent=4, sort_keys=True)
|
|
sensors = self.cbpi.sensor.get_state()
|
|
json.dump(sensors['data'],open(fullsensorname,'w'),indent=4, sort_keys=True)
|
|
kettles = self.cbpi.kettle.get_state()
|
|
json.dump(kettles['data'],open(fullkettlename,'w'),indent=4, sort_keys=True)
|
|
except Exception as e:
|
|
logging.info(e)
|
|
self.cbpi.notify("Error", "Creation of files failed: {}".format(e), NotificationType.ERROR)
|
|
|
|
try:
|
|
zipObj=zipfile.ZipFile(output_filename , 'w', zipfile.ZIP_DEFLATED)
|
|
zipObj.write(fullname)
|
|
zipObj.write(fullpluginname)
|
|
zipObj.write(fullactorname)
|
|
zipObj.write(fullsensorname)
|
|
zipObj.write(fullkettlename)
|
|
zipObj.close()
|
|
except Exception as e:
|
|
logging.info(e)
|
|
self.cbpi.notify("Error", "Zip creation failed: {}".format(e), NotificationType.ERROR)
|
|
|
|
try:
|
|
os.remove(fullname)
|
|
os.remove(fullpluginname)
|
|
os.remove(fullactorname)
|
|
os.remove(fullsensorname)
|
|
os.remove(fullkettlename)
|
|
except Exception as e:
|
|
logging.info(e)
|
|
self.cbpi.notify("Error", "Removal of original files failed: {}".format(e), NotificationType.ERROR)
|
|
|
|
|
|
def allowed_file(self, filename, extension):
|
|
return '.' in filename and filename.rsplit('.', 1)[1] in set([extension])
|
|
|
|
def recursive_chown(self, path, owner, group):
|
|
for dirpath, dirnames, filenames in os.walk(path):
|
|
shutil.chown(dirpath, owner, group)
|
|
for filename in filenames:
|
|
shutil.chown(os.path.join(dirpath, filename), owner, group)
|
|
|
|
async def restoreConfig(self, data):
|
|
fileData = data['File']
|
|
filename = fileData.filename
|
|
backup_file = fileData.file
|
|
content_type = fileData.content_type
|
|
required_content=['dashboard/', 'recipes/', 'upload/', 'config.json', 'config.yaml']
|
|
|
|
if content_type == 'application/x-zip-compressed':
|
|
try:
|
|
content = backup_file.read()
|
|
if backup_file and self.allowed_file(filename, 'zip'):
|
|
self.path = os.path.join(self.cbpi.config_folder.configFolderPath, "restored_config.zip")
|
|
|
|
f=open(self.path, "wb")
|
|
f.write(content)
|
|
f.close()
|
|
zip=zipfile.ZipFile(self.path)
|
|
zip_content_list = zip.namelist()
|
|
zip_content = True
|
|
for content in required_content:
|
|
try:
|
|
check = zip_content_list.index(content)
|
|
except:
|
|
zip_content = False
|
|
if zip_content == True:
|
|
self.cbpi.notify("Success", "Config backup has been uploaded", NotificationType.SUCCESS)
|
|
self.cbpi.notify("Action Required!", "Please restart the server", NotificationType.WARNING)
|
|
else:
|
|
self.cbpi.notify("Error", "Wrong content type. Upload failed", NotificationType.ERROR)
|
|
os.remove(self.path)
|
|
except:
|
|
self.cbpi.notify("Error", "Config backup upload failed", NotificationType.ERROR)
|
|
pass
|
|
else:
|
|
self.cbpi.notify("Error", "Wrong content type. Upload failed", NotificationType.ERROR)
|
|
|
|
async def uploadSVG(self, data):
|
|
fileData = data['File']
|
|
filename = fileData.filename
|
|
svg_file = fileData.file
|
|
content_type = fileData.content_type
|
|
|
|
logging.info(content_type)
|
|
|
|
if content_type == 'image/svg+xml':
|
|
try:
|
|
content = svg_file.read().decode('utf-8','replace')
|
|
if svg_file and self.allowed_file(filename, 'svg'):
|
|
self.path = os.path.join(self.cbpi.config_folder.get_file_path("dashboard"),"widgets", filename)
|
|
logging.info(self.path)
|
|
|
|
f=open(self.path, "w")
|
|
f.write(content)
|
|
f.close()
|
|
self.cbpi.notify("Success", "SVG file ({}) has been uploaded.".format(filename), NotificationType.SUCCESS)
|
|
except:
|
|
self.cbpi.notify("Error", "SVG upload failed", NotificationType.ERROR)
|
|
pass
|
|
else:
|
|
self.cbpi.notify("Error", "Wrong content type. Upload failed", NotificationType.ERROR)
|
|
|
|
async def systeminfo(self):
|
|
logging.info("SYSTEMINFO")
|
|
system = ""
|
|
temp = 0
|
|
cpuload = 0
|
|
cpucount = 0
|
|
cpufreq = 0
|
|
totalmem = 0
|
|
availmem = 0
|
|
mempercent = 0
|
|
eth0IP = "N/A"
|
|
wlan0IP = "N/A"
|
|
eth0speed = "N/A"
|
|
wlan0speed = "N/A"
|
|
|
|
TEMP_UNIT=self.cbpi.config.get("TEMP_UNIT", "C")
|
|
FAHRENHEIT = False if TEMP_UNIT == "C" else True
|
|
|
|
af_map = { socket.AF_INET: 'IPv4',
|
|
socket.AF_INET6: 'IPv6',
|
|
}
|
|
|
|
try:
|
|
if psutil.LINUX == True:
|
|
system = "Linux"
|
|
elif psutil.WINDOWS == True:
|
|
system = "Windows"
|
|
elif psutil.MACOS == True:
|
|
system = "MacOS"
|
|
cpuload = round(psutil.cpu_percent(interval=None),1)
|
|
cpucount = psutil.cpu_count(logical=False)
|
|
cpufreq = psutil.cpu_freq()
|
|
mem = psutil.virtual_memory()
|
|
availmem = round((int(mem.available) / (1024*1024)),1)
|
|
mempercent = round(float(mem.percent),1)
|
|
totalmem = round((int(mem.total) / (1024*1024)),1)
|
|
if system == "Linux":
|
|
try:
|
|
temps = psutil.sensors_temperatures(fahrenheit=FAHRENHEIT)
|
|
for name, entries in temps.items():
|
|
for entry in entries:
|
|
if name == "cpu_thermal":
|
|
temp = round(float(entry.current),1)
|
|
except:
|
|
pass
|
|
else:
|
|
temp = "N/A"
|
|
if system == "Linux":
|
|
try:
|
|
ethernet = psutil.net_if_addrs()
|
|
for nic, addrs in ethernet.items():
|
|
if nic == "eth0":
|
|
for addr in addrs:
|
|
if str(addr.family) == "AddressFamily.AF_INET":
|
|
if addr.address:
|
|
eth0IP = addr.address
|
|
if nic == "wlan0":
|
|
for addr in addrs:
|
|
if str(addr.family) == "AddressFamily.AF_INET":
|
|
if addr.address:
|
|
wlan0IP = addr.address
|
|
info = psutil.net_if_stats()
|
|
try:
|
|
for nic in info:
|
|
if nic == 'eth0':
|
|
if info[nic].isup == True:
|
|
if info[nic].speed:
|
|
eth0speed = info[nic].speed
|
|
else:
|
|
eth0speed = "down"
|
|
if nic == 'wlan0':
|
|
if info[nic].isup == True:
|
|
ratestring = os.popen('iwlist wlan0 rate | grep Rate').read()
|
|
start = ratestring.find("=") + 1
|
|
end = ratestring.find(" Mb/s")
|
|
wlan0speed = ratestring[start:end]
|
|
else:
|
|
wlan0speed = "down"
|
|
except Exception as e:
|
|
logging.info(e)
|
|
except:
|
|
pass
|
|
|
|
if system == "Windows":
|
|
try:
|
|
ethernet = psutil.net_if_addrs()
|
|
for nic, addrs in ethernet.items():
|
|
if nic == "Ethernet":
|
|
for addr in addrs:
|
|
if str(addr.family) == "AddressFamily.AF_INET":
|
|
if addr.address:
|
|
eth0IP = addr.address
|
|
if nic == "WLAN":
|
|
for addr in addrs:
|
|
if str(addr.family) == "AddressFamily.AF_INET":
|
|
if addr.address:
|
|
wlan0IP = addr.address
|
|
info = psutil.net_if_stats()
|
|
try:
|
|
for nic in info:
|
|
if nic == 'Ethernet':
|
|
if info[nic].isup == True:
|
|
if info[nic].speed:
|
|
eth0speed = info[nic].speed
|
|
else:
|
|
eth0speed = "down"
|
|
if nic == 'WLAN':
|
|
if info[nic].isup == True:
|
|
if info[nic].speed:
|
|
wlan0speed = info[nic].speed
|
|
else:
|
|
wlan0speed = "down"
|
|
except Exception as e:
|
|
logging.info(e)
|
|
except:
|
|
pass
|
|
|
|
except:
|
|
pass
|
|
|
|
systeminfo = {'system': system,
|
|
'cpuload': cpuload,
|
|
'cpucount': cpucount,
|
|
'cpufreq': cpufreq.current,
|
|
'totalmem': totalmem,
|
|
'availmem': availmem,
|
|
'mempercent': mempercent,
|
|
'temp': temp,
|
|
'temp_unit': TEMP_UNIT,
|
|
'eth0': eth0IP,
|
|
'wlan0': wlan0IP,
|
|
'eth0speed': eth0speed,
|
|
'wlan0speed': wlan0speed}
|
|
return systeminfo
|
|
|
|
|