mirror of
https://github.com/PiBrewing/craftbeerpi4.git
synced 2024-12-23 14:04:58 +01:00
9caca9074a
If sensor is set to active, GPIO pin should be high under normal conditions. If inverted, GPIO pin should be low when sensor is active. This logic is aligned with the old craftbeerpi3 GPIOSystem plugin
110 lines
3.1 KiB
Python
110 lines
3.1 KiB
Python
import logging
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from cbpi.api import *
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
import RPi.GPIO as GPIO
|
|
except Exception:
|
|
logger.error("Failed to load RPi.GPIO. Using Mock")
|
|
MockRPi = MagicMock()
|
|
modules = {
|
|
"RPi": MockRPi,
|
|
"RPi.GPIO": MockRPi.GPIO
|
|
}
|
|
patcher = patch.dict("sys.modules", modules)
|
|
patcher.start()
|
|
import RPi.GPIO as GPIO
|
|
|
|
|
|
@parameters([Property.Select(label="GPIO", options=[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27]), Property.Select(label="Inverted", options=["Yes", "No"],description="No: Active on high; Yes: Active on low")])
|
|
class GPIOActor(CBPiActor):
|
|
|
|
|
|
def get_GPIO_state(self, state):
|
|
# ON
|
|
if state == 1:
|
|
return 1 if self.inverted == False else 0
|
|
# OFF
|
|
if state == 0:
|
|
return 0 if self.inverted == False else 1
|
|
|
|
async def start(self):
|
|
await super().start()
|
|
self.gpio = self.props.get("GPIO")
|
|
self.inverted = True if self.props.get("Inverted", "No") == "Yes" else False
|
|
GPIO.setup(self.gpio, GPIO.OUT)
|
|
GPIO.output(self.gpio, self.get_GPIO_state(0))
|
|
self.state = False
|
|
pass
|
|
|
|
async def on(self, power=0):
|
|
logger.info("ACTOR %s ON - GPIO %s " % (self.id, self.gpio))
|
|
GPIO.output(self.gpio, self.get_GPIO_state(1))
|
|
self.state = True
|
|
|
|
async def off(self):
|
|
logger.info("ACTOR %s OFF - GPIO %s " % (self.id, self.gpio))
|
|
GPIO.output(self.gpio, self.get_GPIO_state(0))
|
|
self.state = False
|
|
|
|
def get_state(self):
|
|
|
|
return self.state
|
|
|
|
async def run(self):
|
|
pass
|
|
|
|
@parameters([Property.Select(label="GPIO", options=[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27]), Property.Number("Frequency", configurable=True)])
|
|
class GPIOPWMActor(CBPiActor):
|
|
|
|
# Custom property which can be configured by the user
|
|
@action("test", parameters={})
|
|
async def power(self, **kwargs):
|
|
self.p.ChangeDutyCycle(1)
|
|
|
|
|
|
async def start(self):
|
|
await super().start()
|
|
self.gpio = self.props.get("GPIO")
|
|
self.frequency = self.props.get("Frequency")
|
|
GPIO.setup(self.gpio, GPIO.OUT)
|
|
GPIO.output(self.gpio, 0)
|
|
self.state = False
|
|
pass
|
|
|
|
async def on(self, power=0):
|
|
logger.info("PWM ACTOR %s ON - GPIO %s " % (self.id, self.gpio))
|
|
try:
|
|
self.p = GPIO.PWM(int(self.gpio), float(self.frequency))
|
|
self.p.start(1)
|
|
except:
|
|
pass
|
|
self.state = True
|
|
|
|
async def off(self):
|
|
logger.info("PWM ACTOR %s OFF - GPIO %s " % (self.id, self.gpio))
|
|
self.p.stop()
|
|
self.state = False
|
|
|
|
def get_state(self):
|
|
return self.state
|
|
|
|
async def run(self):
|
|
pass
|
|
|
|
def setup(cbpi):
|
|
|
|
'''
|
|
This method is called by the server during startup
|
|
Here you need to register your plugins at the server
|
|
|
|
:param cbpi: the cbpi core
|
|
:return:
|
|
'''
|
|
|
|
cbpi.plugin.register("GPIOActor", GPIOActor)
|
|
cbpi.plugin.register("GPIOPWMActor", GPIOPWMActor)
|