mirror of
https://github.com/PiBrewing/craftbeerpi4.git
synced 2024-12-28 00:11:45 +01:00
123 lines
3.7 KiB
Python
123 lines
3.7 KiB
Python
import asyncio
|
|
import logging
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from cbpi.api import *
|
|
|
|
|
|
logger = logging.getLogger(__name__)

# Import the Raspberry Pi GPIO bindings. When the import fails for any reason,
# install a MagicMock under the same module names so the rest of this module
# can be imported and exercised without the real library.
# NOTE(review): ``except Exception`` (rather than ``ImportError``) is kept on
# purpose -- presumably the library can fail with other error types on
# unsupported hardware; confirm before narrowing.
try:
    import RPi.GPIO as GPIO
except Exception:
    logger.error("Failed to load RPi.GPIO. Using Mock")
    MockRPi = MagicMock()
    modules = {
        "RPi": MockRPi,
        "RPi.GPIO": MockRPi.GPIO,
    }
    # The patcher is started and never stopped, so the mocked entries stay in
    # ``sys.modules`` and the import below resolves to the mock.
    patcher = patch.dict("sys.modules", modules)
    patcher.start()
    import RPi.GPIO as GPIO

# Only select BCM numbering when no pin-numbering mode has been chosen yet,
# so a mode that is already set is left untouched.
mode = GPIO.getmode()
if mode is None:
    GPIO.setmode(GPIO.BCM)
|
|
|
|
@parameters([
    Property.Select(label="GPIO", options=list(range(28))),
    Property.Select(
        label="Inverted",
        options=["Yes", "No"],
        description="No: Active on high; Yes: Active on low",
    ),
])
class GPIOActor(CBPiActor):
    """Actor that switches a single GPIO output pin on and off.

    Configuration is read from ``self.props``:

    * ``GPIO``     -- pin number handed to ``GPIO.setup`` / ``GPIO.output``.
    * ``Inverted`` -- ``"Yes"`` drives the pin low for ON and high for OFF;
      any other value (default ``"No"``) drives it high for ON.
    """

    # NOTE(review): the action keys below are misspelled ("Cusotm"). They are
    # runtime strings and are left unchanged in case anything refers to them.
    @action(key="Cusotm Action", parameters=[Property.Number("Value", configurable=True), Property.Kettle("Kettle")])
    async def custom_action(self, **kwargs):
        """Sample action: print the keyword arguments it was called with."""
        print("ACTION", kwargs)

    @action(key="Cusotm Action2", parameters=[Property.Number("Value", configurable=True)])
    async def custom_action2(self, **kwargs):
        """Sample action: print a fixed marker; ``kwargs`` are ignored."""
        print("ACTION2")

    def get_GPIO_state(self, state):
        """Translate a logical state into the level to write to the pin.

        :param state: ``1`` for ON, ``0`` for OFF.
        :return: ``1`` or ``0``, flipped when ``self.inverted`` is set;
            ``None`` when ``state`` is neither ``1`` nor ``0``.
        """
        # ON
        if state == 1:
            return 0 if self.inverted else 1
        # OFF
        if state == 0:
            return 1 if self.inverted else 0
        # Unknown logical state: no pin level is defined for it.
        return None

    async def on_start(self):
        """Read the configuration, set the pin up as output and switch it OFF."""
        self.gpio = self.props.GPIO
        self.inverted = self.props.get("Inverted", "No") == "Yes"
        GPIO.setup(self.gpio, GPIO.OUT)
        # Begin in the logical OFF state (which is a high level when inverted).
        GPIO.output(self.gpio, self.get_GPIO_state(0))
        self.state = False

    async def on(self, power=0):
        """Drive the pin to its ON level and record the actor as switched on.

        :param power: accepted for interface compatibility; not used here.
        """
        logger.info("ACTOR %s ON - GPIO %s ", self.id, self.gpio)
        GPIO.output(self.gpio, self.get_GPIO_state(1))
        self.state = True

    async def off(self):
        """Drive the pin to its OFF level and record the actor as switched off."""
        logger.info("ACTOR %s OFF - GPIO %s ", self.id, self.gpio)
        GPIO.output(self.gpio, self.get_GPIO_state(0))
        self.state = False

    def get_state(self):
        """Return ``True`` when the actor was last switched on, else ``False``."""
        return self.state

    async def run(self):
        """Idle in one-second steps for as long as ``self.running`` is set."""
        while self.running:
            await asyncio.sleep(1)
|
|
|
|
|
|
@parameters([
    Property.Select(label="GPIO", options=list(range(28))),
    Property.Number("Frequency", configurable=True),
])
class GPIOPWMActor(CBPiActor):
    """Actor that drives a GPIO pin with a PWM signal.

    Configuration is read from ``self.props``:

    * ``GPIO``      -- pin number, converted with ``int`` for ``GPIO.PWM``.
    * ``Frequency`` -- PWM frequency, converted with ``float`` for ``GPIO.PWM``.

    The PWM handle is kept in ``self.p``; it is ``None`` until ``on()`` has
    created one successfully.
    """

    # Custom property which can be configured by the user
    @action("test", parameters={})
    async def power(self, **kwargs):
        """Test action: set the duty cycle of the active PWM handle to 1.

        Does nothing (apart from a log entry) when no PWM handle exists yet,
        instead of failing with ``AttributeError``.
        """
        pwm = getattr(self, "p", None)
        if pwm is None:
            logger.warning("PWM ACTOR %s has no active PWM - GPIO %s ", self.id, self.gpio)
            return
        pwm.ChangeDutyCycle(1)

    async def start(self):
        """Run the base-class start, then set the pin up as output at level 0."""
        await super().start()
        self.gpio = self.props.get("GPIO")
        self.frequency = self.props.get("Frequency")
        # No PWM handle yet; keep an existing one if something already set it.
        self.p = getattr(self, "p", None)
        GPIO.setup(self.gpio, GPIO.OUT)
        GPIO.output(self.gpio, 0)
        self.state = False

    async def on(self, power=0):
        """Create a PWM handle for the pin and start it with duty cycle 1.

        Starting the PWM stays best-effort: a failure is logged with its
        traceback instead of being silently discarded, and the actor is still
        marked as switched on, exactly as before.

        :param power: accepted for interface compatibility; not used here.
        """
        logger.info("PWM ACTOR %s ON - GPIO %s ", self.id, self.gpio)
        try:
            self.p = GPIO.PWM(int(self.gpio), float(self.frequency))
            self.p.start(1)
        except Exception:
            # NOTE(review): possible causes include an unset/invalid frequency
            # or a handle that already exists for this pin -- confirm on target.
            logger.exception("PWM ACTOR %s failed to start PWM - GPIO %s ", self.id, self.gpio)
        self.state = True

    async def off(self):
        """Stop the PWM handle, if there is one, and mark the actor as off."""
        logger.info("PWM ACTOR %s OFF - GPIO %s ", self.id, self.gpio)
        pwm = getattr(self, "p", None)
        # ``off()`` may run before any successful ``on()``; nothing to stop then.
        if pwm is not None:
            pwm.stop()
        self.state = False

    def get_state(self):
        """Return ``True`` when the actor was last switched on, else ``False``."""
        return self.state

    async def run(self):
        """Idle in one-second steps for as long as ``self.running`` is set."""
        while self.running:
            await asyncio.sleep(1)
|
|
|
|
def setup(cbpi):
    """Plugin entry point: register this module's actors with the server.

    The server calls this function during startup; each actor class is
    registered under its own class name via ``cbpi.plugin.register``.

    :param cbpi: the cbpi core
    :return: None
    """
    registrations = (
        ("GPIOActor", GPIOActor),
        ("GPIOPWMActor", GPIOPWMActor),
    )
    for plugin_name, plugin_class in registrations:
        cbpi.plugin.register(plugin_name, plugin_class)
|