Brewing Step

StepController

class core.controller.step_controller.StepController(cbpi)

Bases: core.http_endpoints.http_api.HttpAPI, core.controller.crud_controller.CRUDController

_step_done(task)
get_manged_fields_as_array(type_cfg)
handle(topic, **kwargs)
handle_action(topic, action, **kwargs)
handle_next(**kwargs)
handle_reset(topic, **kwargs)
handle_start(topic, **kwargs)
handle_stop(topic, **kwargs)
http_action(request)
http_reset(request)
http_start(request)
init()
Returns:
model

alias of core.database.model.StepModel

start()
stop()

SimpleStep

class core.api.step.SimpleStep(*args, **kwargs)

Bases: object

_SimpleStep__dirty = False
_exception_count = 0
_interval = 1
_max_exceptions = 2
is_dirty()
managed_fields = []
next()
reset()
reset_dirty()
run()
run_cycle()
running()
stop()

Custom Step

__init__.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio

from core.api import Property, action
from core.api.step import SimpleStep


class CustomStep(SimpleStep):

    name = Property.Number(label="Test")

    i = 0
    
    @action(key="name", parameters=None)
    def test(self, **kwargs):
        self.name="WOOHOO"

    async def run_cycle(self):


        #await asyncio.sleep(1)
        self.i = self.i + 1

        print("RUN STEP", self.id, self.name, self.__dict__)


def setup(cbpi):
    '''
    This method is called by the server during startup 
    Here you need to register your plugins at the server

    :param cbpi: the cbpi core 
    :return: 
    '''

    cbpi.plugin.register("CustomStep", CustomStep)

config.yaml

1
2
name: DummyStep
version: 4