2021-02-16 20:37:51 +01:00
import asyncio
2021-02-02 00:07:58 +01:00
import logging
from unittest . mock import MagicMock , patch
from cbpi . api import *
logger = logging . getLogger ( __name__ )
try :
import RPi . GPIO as GPIO
except Exception :
logger . error ( " Failed to load RPi.GPIO. Using Mock " )
MockRPi = MagicMock ( )
modules = {
" RPi " : MockRPi ,
" RPi.GPIO " : MockRPi . GPIO
}
patcher = patch . dict ( " sys.modules " , modules )
patcher . start ( )
import RPi . GPIO as GPIO
2021-02-09 18:48:45 +01:00
mode = GPIO . getmode ( )
if ( mode == None ) :
GPIO . setmode ( GPIO . BCM )
2021-02-02 00:07:58 +01:00
2021-08-10 13:01:45 +02:00
@parameters ( [ Property . Select ( label = " GPIO " , options = [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 , 18 , 19 , 20 , 21 , 22 , 23 , 24 , 25 , 26 , 27 ] ) , Property . Select ( label = " Inverted " , options = [ " Yes " , " No " ] , description = " No: Active on high; Yes: Active on low " ) ] )
2021-02-02 00:07:58 +01:00
class GPIOActor ( CBPiActor ) :
2021-11-08 19:01:53 +01:00
# Custom property which can be configured by the user
@action ( " Set Power " , parameters = [ Property . Number ( label = " Power " , configurable = True , description = " Power Setting [0-100] " ) ] )
async def setpower ( self , Power = 100 , * * kwargs ) :
self . power = int ( Power )
if self . power < 0 :
self . power = 0
if self . power > 100 :
self . power = 100
await self . set_power ( self . power )
2021-02-27 20:09:19 +01:00
2021-02-02 00:07:58 +01:00
def get_GPIO_state ( self , state ) :
# ON
if state == 1 :
2021-02-07 12:07:48 +01:00
return 1 if self . inverted == False else 0
2021-02-02 00:07:58 +01:00
# OFF
if state == 0 :
2021-02-07 12:07:48 +01:00
return 0 if self . inverted == False else 1
2021-02-02 00:07:58 +01:00
2021-02-16 20:37:51 +01:00
async def on_start ( self ) :
2021-11-08 19:01:53 +01:00
self . sampleTime = 5
2021-11-06 15:15:11 +01:00
self . power = 100
2021-02-16 20:37:51 +01:00
self . gpio = self . props . GPIO
2021-02-02 00:07:58 +01:00
self . inverted = True if self . props . get ( " Inverted " , " No " ) == " Yes " else False
GPIO . setup ( self . gpio , GPIO . OUT )
GPIO . output ( self . gpio , self . get_GPIO_state ( 0 ) )
self . state = False
2021-11-05 19:32:34 +01:00
async def on ( self , power = None ) :
2021-11-08 19:01:53 +01:00
if power is not None :
self . power = power
2021-02-02 00:07:58 +01:00
logger . info ( " ACTOR %s ON - GPIO %s " % ( self . id , self . gpio ) )
GPIO . output ( self . gpio , self . get_GPIO_state ( 1 ) )
self . state = True
async def off ( self ) :
logger . info ( " ACTOR %s OFF - GPIO %s " % ( self . id , self . gpio ) )
GPIO . output ( self . gpio , self . get_GPIO_state ( 0 ) )
self . state = False
def get_state ( self ) :
return self . state
async def run ( self ) :
2021-03-08 01:34:13 +01:00
while self . running == True :
2021-11-08 19:01:53 +01:00
if self . state == True :
heating_time = self . sampleTime * ( self . power / 100 )
wait_time = self . sampleTime - heating_time
if heating_time > 0 :
#logging.info("Heating Time: {}".format(heating_time))
GPIO . output ( self . gpio , self . get_GPIO_state ( 1 ) )
await asyncio . sleep ( heating_time )
if wait_time > 0 :
#logging.info("Wait Time: {}".format(wait_time))
GPIO . output ( self . gpio , self . get_GPIO_state ( 0 ) )
await asyncio . sleep ( wait_time )
else :
await asyncio . sleep ( 1 )
async def set_power ( self , power ) :
self . power = power
await self . cbpi . actor . set_power ( self . id , power )
pass
2021-02-16 20:37:51 +01:00
2021-02-02 00:07:58 +01:00
2021-11-05 19:32:34 +01:00
@parameters ( [ Property . Select ( label = " GPIO " , options = [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 , 18 , 19 , 20 , 21 , 22 , 23 , 24 , 25 , 26 , 27 ] ) , Property . Number ( label = " Frequency " , configurable = True ) ] )
2021-02-02 00:07:58 +01:00
class GPIOPWMActor ( CBPiActor ) :
# Custom property which can be configured by the user
2021-11-05 19:32:34 +01:00
@action ( " Set Power " , parameters = [ Property . Number ( label = " Power " , configurable = True , description = " Power Setting [0-100] " ) ] )
async def setpower ( self , Power = 100 , * * kwargs ) :
logging . info ( Power )
self . power = int ( Power )
if self . power < 0 :
self . power = 0
if self . power > 100 :
2021-11-06 15:15:11 +01:00
self . power = 100
await self . set_power ( self . power )
2021-11-05 19:32:34 +01:00
async def on_start ( self ) :
self . gpio = self . props . get ( " GPIO " , None )
self . frequency = self . props . get ( " Frequency " , 0.5 )
if self . gpio is not None :
GPIO . setup ( self . gpio , GPIO . OUT )
GPIO . output ( self . gpio , 0 )
2021-02-02 00:07:58 +01:00
self . state = False
2021-11-05 19:32:34 +01:00
self . power = 100
self . p = None
2021-02-02 00:07:58 +01:00
pass
2021-11-05 19:32:34 +01:00
async def on ( self , power = None ) :
2021-11-08 14:04:28 +01:00
if power is not None :
self . power = power
2021-11-05 19:32:34 +01:00
logger . info ( " PWM ACTOR %s ON - GPIO %s - Frequency %s - Power %s " % ( self . id , self . gpio , self . frequency , self . power ) )
2021-02-02 00:07:58 +01:00
try :
2021-11-05 19:32:34 +01:00
if self . p is None :
self . p = GPIO . PWM ( int ( self . gpio ) , float ( self . frequency ) )
self . p . start ( self . power )
self . state = True
2021-02-02 00:07:58 +01:00
except :
pass
async def off ( self ) :
logger . info ( " PWM ACTOR %s OFF - GPIO %s " % ( self . id , self . gpio ) )
2021-11-05 19:32:34 +01:00
self . p . ChangeDutyCycle ( 0 )
2021-02-02 00:07:58 +01:00
self . state = False
2021-11-06 15:15:11 +01:00
async def set_power ( self , power ) :
if self . p and self . state == True :
self . p . ChangeDutyCycle ( power )
2021-11-08 14:04:28 +01:00
await self . cbpi . actor . set_power ( self . id , power )
2021-11-06 15:15:11 +01:00
pass
2021-02-02 00:07:58 +01:00
def get_state ( self ) :
return self . state
async def run ( self ) :
2021-03-15 13:02:35 +01:00
while self . running == True :
2021-02-16 20:37:51 +01:00
await asyncio . sleep ( 1 )
2021-02-02 00:07:58 +01:00
def setup ( cbpi ) :
'''
This method is called by the server during startup
Here you need to register your plugins at the server
: param cbpi : the cbpi core
: return :
'''
cbpi . plugin . register ( " GPIOActor " , GPIOActor )
cbpi . plugin . register ( " GPIOPWMActor " , GPIOPWMActor )