2021-02-16 20:37:51 +01:00
import asyncio
2021-02-02 00:07:58 +01:00
import logging
from unittest . mock import MagicMock , patch
from cbpi . api import *
logger = logging.getLogger(__name__)

# Import the GPIO driver. Any failure (not only ImportError) falls back to a
# MagicMock stand-in that is registered through patch.dict before the import
# is retried, so the rest of this module can keep using the name `GPIO`.
try:
    import RPi.GPIO as GPIO
except Exception:
    logger.error(" Failed to load RPi.GPIO. Using Mock ")
    MockRPi = MagicMock()
    modules = {
        " RPi ": MockRPi,
        " RPi.GPIO ": MockRPi.GPIO,
    }
    # The patcher is started and intentionally never stopped: the mock
    # entries stay registered for as long as the process runs.
    patcher = patch.dict(" sys.modules ", modules)
    patcher.start()
    import RPi.GPIO as GPIO

# Only choose a pin numbering mode when none has been set yet, so a mode
# selected earlier by other code is left untouched.
mode = GPIO.getmode()
if mode is None:
    GPIO.setmode(GPIO.BCM)
2021-02-02 00:07:58 +01:00
2021-02-07 12:07:48 +01:00
@parameters([
    Property.Select(label=" GPIO ", options=list(range(28))),
    Property.Select(
        label=" Inverted ",
        options=[" Yes ", " No "],
        description=" No: Active on high; Yes: Active on low ",
    ),
])
class GPIOActor(CBPiActor):
    """Actor that switches one GPIO output pin on or off.

    The pin is taken from the "GPIO" property. The "Inverted" property
    selects the polarity: when it is "Yes" the pin is driven low for ON and
    high for OFF, otherwise high for ON and low for OFF.
    """

    def get_GPIO_state(self, state):
        """Map a logical state to the level that must be written to the pin.

        :param state: 1 for ON, 0 for OFF.
        :return: 0 or 1, flipped when the actor is inverted; ``None`` when
            ``state`` is neither 0 nor 1.
        """
        if state == 1:
            return 0 if self.inverted else 1
        if state == 0:
            return 1 if self.inverted else 0
        return None

    async def on_start(self):
        """Read the configuration, set the pin up as output and drive it OFF."""
        self.gpio = self.props.GPIO
        # Anything other than the "Yes" option means non-inverted.
        self.inverted = self.props.get(" Inverted ", " No ") == " Yes "
        GPIO.setup(self.gpio, GPIO.OUT)
        GPIO.output(self.gpio, self.get_GPIO_state(0))
        self.state = False

    async def on(self, power=0):
        """Drive the pin to its ON level and record the new state.

        :param power: accepted but not used by this actor.
        """
        logger.info(" ACTOR %s ON - GPIO %s ", self.id, self.gpio)
        GPIO.output(self.gpio, self.get_GPIO_state(1))
        self.state = True

    async def off(self):
        """Drive the pin to its OFF level and record the new state."""
        logger.info(" ACTOR %s OFF - GPIO %s ", self.id, self.gpio)
        GPIO.output(self.gpio, self.get_GPIO_state(0))
        self.state = False

    def get_state(self):
        """Return True when the actor was last switched on, else False."""
        return self.state

    async def run(self):
        """Idle forever, sleeping one second per iteration.

        The actor has no periodic work of its own; the loop only yields to
        the event loop.
        """
        while True:
            await asyncio.sleep(1)
2021-02-02 00:07:58 +01:00
@parameters([
    Property.Select(label=" GPIO ", options=list(range(28))),
    Property.Number(" Frequency ", configurable=True),
])
class GPIOPWMActor(CBPiActor):
    """Actor that drives one GPIO pin with a PWM signal.

    The pin comes from the "GPIO" property and the PWM frequency from the
    "Frequency" property. The PWM handle is kept in ``self.p`` and exists
    only after ``on()`` has created it successfully.
    """

    @action(" test ", parameters={})
    async def power(self, **kwargs):
        """Action handler: set the duty cycle of the active PWM to 1.

        Does nothing (apart from a warning) when no PWM handle exists yet,
        instead of failing with an AttributeError.
        """
        pwm = getattr(self, "p", None)
        if pwm is None:
            logger.warning(
                "PWM ACTOR %s: no PWM handle yet, duty cycle not changed", self.id
            )
            return
        pwm.ChangeDutyCycle(1)

    async def start(self):
        """Read the configuration, set the pin up as output and drive it low."""
        await super().start()
        self.gpio = self.props.get(" GPIO ")
        self.frequency = self.props.get(" Frequency ")
        # No PWM handle until on() creates one.
        self.p = None
        GPIO.setup(self.gpio, GPIO.OUT)
        GPIO.output(self.gpio, 0)
        self.state = False

    async def on(self, power=0):
        """Create a PWM on the configured pin and start it at duty cycle 1.

        If creating or starting the PWM fails, the error is logged with its
        traceback and the state is left unchanged, so the actor does not
        report ON while nothing is being driven.

        :param power: accepted but not used; the duty cycle is fixed at 1.
        """
        logger.info(" PWM ACTOR %s ON - GPIO %s ", self.id, self.gpio)
        try:
            self.p = GPIO.PWM(int(self.gpio), float(self.frequency))
            self.p.start(1)
        except Exception:
            # Covers e.g. an unset pin or frequency (int()/float() fail) as
            # well as errors raised by the GPIO library itself.
            logger.exception(
                "PWM ACTOR %s: failed to start PWM on GPIO %s", self.id, self.gpio
            )
            return
        self.state = True

    async def off(self):
        """Stop the PWM, if one exists, and record the actor as off."""
        logger.info(" PWM ACTOR %s OFF - GPIO %s ", self.id, self.gpio)
        # off() may be called before any successful on(); then there is
        # nothing to stop.
        pwm = getattr(self, "p", None)
        if pwm is not None:
            pwm.stop()
        self.state = False

    def get_state(self):
        """Return True when the actor was last switched on, else False."""
        return self.state

    async def run(self):
        """Idle forever, sleeping one second per iteration."""
        while True:
            await asyncio.sleep(1)
2021-02-02 00:07:58 +01:00
def setup(cbpi):
    """Register this module's actor plugins with the server.

    The server calls this function during startup.

    :param cbpi: the cbpi core; its ``plugin.register`` is called once per
        actor class
    :return: None
    """
    actor_plugins = (
        (" GPIOActor ", GPIOActor),
        (" GPIOPWMActor ", GPIOPWMActor),
    )
    for plugin_name, plugin_class in actor_plugins:
        cbpi.plugin.register(plugin_name, plugin_class)