craftbeerpi4-pione/cbpi/extension/httpsensor/__init__.py

117 lines
2.7 KiB
Python
Raw Normal View History

2019-08-16 21:36:55 +02:00
# -*- coding: utf-8 -*-
import asyncio
from aiohttp import web
from cbpi.api import *
import re
import random
cache = {}
class HTTPSensor(CBPiSensor):
# Custom Properties which will can be configured by the user
key = Property.Text(label="Key", configurable=True)
def init(self):
super().init()
self.state = True
def get_state(self):
return self.state
def get_value(self):
return self.value
def stop(self):
pass
async def run(self, cbpi):
self.value = 0
while True:
await asyncio.sleep(1)
try:
value = cache.pop(self.key, None)
if value is not None:
self.log_data(value)
await cbpi.bus.fire("sensor/%s/data" % self.id, value=value)
except Exception as e:
print(e)
pass
class HTTPSensorEndpoint(CBPiExtension):
def __init__(self, cbpi):
'''
Initializer
:param cbpi:
'''
self.pattern_check = re.compile("^[a-zA-Z0-9,.]{0,10}$")
self.cbpi = cbpi
# register component for http, events
# In addtion the sub folder static is exposed to access static content via http
self.cbpi.register(self, "/httpsensor")
@request_mapping(path="/{key}/{value}", auth_required=False)
async def http_new_value2(self, request):
"""
---
description: Kettle Heater on
tags:
- HttpSensor
parameters:
- name: "key"
in: "path"
description: "Sensor Key"
required: true
type: "string"
- name: "value"
in: "path"
description: "Value"
required: true
type: "integer"
format: "int64"
responses:
"204":
description: successful operation
"""
global cache
key = request.match_info['key']
value = request.match_info['value']
if self.pattern_check.match(key) is None:
return web.json_response(status=422, data={'error': "Key not matching pattern ^[a-zA-Z0-9,.]{0,10}$"})
if self.pattern_check.match(value) is None:
return web.json_response(status=422, data={'error': "Data not matching pattern ^[a-zA-Z0-9,.]{0,10}$"})
print("HTTP SENSOR ", key, value)
cache[key] = value
return web.Response(status=204)
def setup(cbpi):
'''
This method is called by the server during startup
Here you need to register your plugins at the server
:param cbpi: the cbpi core
:return:
'''
cbpi.plugin.register("HTTPSensor", HTTPSensor)
cbpi.plugin.register("HTTPSensorEndpoint", HTTPSensorEndpoint)