step controller restart logic added

This commit is contained in:
manuel83 2018-12-16 21:42:47 +01:00
parent a134463646
commit d5dac67c35
16 changed files with 535 additions and 359 deletions

File diff suppressed because it is too large Load diff

View file

@ -60,7 +60,12 @@ class CBPiSimpleStep(metaclass=ABCMeta):
if self.is_dirty(): if self.is_dirty():
# Now we have to store the managed props # Now we have to store the managed props
state = {}
for field in self.managed_fields:
state[field] = self.__getattribute__(field)
#step_controller.model.update_step_state(step_controller.current_step.id, state)
print("STATE",state)
await self.cbpi.step.model.update_step_state(self.id, state)
self.reset_dirty() self.reset_dirty()

View file

@ -1,4 +1,6 @@
import asyncio import asyncio
import time
from aiohttp import web from aiohttp import web
from core.api import on_event, request_mapping from core.api import on_event, request_mapping
from core.controller.crud_controller import CRUDController from core.controller.crud_controller import CRUDController
@ -34,7 +36,33 @@ class StepController(HttpAPI, CRUDController):
:return: :return:
''' '''
await super(StepController, self).init() await super(StepController, self).init()
pass print("INIT LAST STEP")
await self.init_after_startup()
async def init_after_startup(self):
step = await self.model.get_by_state('A')
# We have an active step
if step is not None:
print("INIT LAST STEP", step.__dict__)
# get the type
print(self.types)
step_type = self.types.get(step.type)
if step_type is None:
# step type not found. cant restart step
print("STEP TYPE NONT FOUND")
return
if step.stepstate is not None:
cfg = step.stepstate.copy()
else:
cfg = {}
cfg.update(dict(cbpi=self.cbpi, id=step.id, managed_fields=self._get_manged_fields_as_array(step_type)))
self.current_step = step_type["class"](**cfg)
self.current_job = await self.cbpi.start_job(self.current_step.run(), step.name, "step")
@request_mapping(path="/action", auth_required=False) @request_mapping(path="/action", auth_required=False)
async def http_action(self, request): async def http_action(self, request):
@ -111,6 +139,7 @@ class StepController(HttpAPI, CRUDController):
:return: None :return: None
''' '''
if self.current_step is not None: if self.current_step is not None:
self.current_step.next() self.current_step.next()
pass pass
@ -136,16 +165,9 @@ class StepController(HttpAPI, CRUDController):
:param kwargs: :param kwargs:
:return: None :return: None
''' '''
await self.model.reset_all_steps()
if self.current_step is not None:
self.current_job.stop()
self.current_step.reset()
self.steps[self.current_step.id]["state"] = None
self.current_step = None
self.current_task = None
await self.start()
@on_event("step/stop") @on_event("step/stop")
async def handle_stop(self, **kwargs): async def handle_stop(self, **kwargs):
@ -179,6 +201,9 @@ class StepController(HttpAPI, CRUDController):
:return: :return:
''' '''
print("IS SHUTODONW", self.cbpi.shutdown)
if self.cbpi.shutdown:
return
print("JOB DONE STEP") print("JOB DONE STEP")
self.cache[self.current_step.id].state = "D" self.cache[self.current_step.id].state = "D"
step_id = self.current_step.id step_id = self.current_step.id
@ -207,20 +232,31 @@ class StepController(HttpAPI, CRUDController):
if self.current_step is None: if self.current_step is None:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
open_step = False open_step = False
for key, step in self.cache.items():
if step.state is None: inactive = await self.model.get_by_state("I")
active = await self.model.get_by_state("A")
print("STEPES", inactive, active)
if active is not None:
active.state = 'D'
active.end = int(time.time())
# self.stop_step()
self.current_step = None
await self.model.update(**active.__dict__)
if inactive is not None:
step_type = self.types["CustomStepCBPi"] step_type = self.types["CustomStepCBPi"]
config = dict(cbpi = self.cbpi, id=key, name=step.name, managed_fields=self._get_manged_fields_as_array(step_type)) config = dict(cbpi=self.cbpi, id=inactive.id, name=inactive.name, managed_fields=self._get_manged_fields_as_array(step_type))
self.current_step = step_type["class"](**config) self.current_step = step_type["class"](**config)
self.current_job = await self.cbpi.start_job(self.current_step.run(), step.name, "step") inactive.state = 'A'
await asyncio.sleep(4) inactive.stepstate = inactive.config
await self.current_job.close() inactive.start = int(time.time())
await self.model.update(**inactive.__dict__)
open_step = True self.current_job = await self.cbpi.start_job(self.current_step.run(), inactive.name, "step")
break else:
if open_step == False:
await self.cbpi.bus.fire("step/berwing/finished") await self.cbpi.bus.fire("step/berwing/finished")
async def stop(self): async def stop(self):

View file

@ -50,8 +50,18 @@ class CraftBeerPi():
middlewares = [session_middleware(EncryptedCookieStorage(urandom(32))), auth.auth_middleware(policy)] middlewares = [session_middleware(EncryptedCookieStorage(urandom(32))), auth.auth_middleware(policy)]
self.app = web.Application(middlewares=middlewares) self.app = web.Application(middlewares=middlewares)
self.initializer = [] self.initializer = []
self.shutdown = False
async def on_cleanup(app):
self.shutdown = True
self.app.on_cleanup.append(on_cleanup)
setup(self.app, self) setup(self.app, self)
self.bus = EventBus(self.app.loop, self) self.bus = EventBus(self.app.loop, self)
self.ws = WebSocket(self) self.ws = WebSocket(self)
self.actor = ActorController(self) self.actor = ActorController(self)

View file

@ -1,5 +1,9 @@
from core.database.orm_framework import DBModel import json
import aiosqlite
from core.database.orm_framework import DBModel
TEST_DB = "./craftbeerpi.db"
class ActorModel(DBModel): class ActorModel(DBModel):
__fields__ = ["name", "type", "config"] __fields__ = ["name", "type", "config"]
@ -31,6 +35,33 @@ class StepModel(DBModel):
__table_name__ = "step" __table_name__ = "step"
__json_fields__ = ["config", "stepstate"] __json_fields__ = ["config", "stepstate"]
@classmethod
async def update_step_state(cls, step_id, state):
async with aiosqlite.connect(TEST_DB) as db:
cursor = await db.execute("UPDATE %s SET stepstate = ? WHERE id = ?" % cls.__table_name__, (json.dumps(state), step_id))
await db.commit()
@classmethod
async def get_by_state(cls, state, order=True):
async with aiosqlite.connect(TEST_DB) as db:
db.row_factory = aiosqlite.Row
db.row_factory = DBModel.dict_factory
async with db.execute("SELECT * FROM %s WHERE state = ? ORDER BY %s.'order'" % (cls.__table_name__, cls.__table_name__,), state) as cursor:
row = await cursor.fetchone()
if row is not None:
return cls(row)
else:
return None
@classmethod
async def reset_all_steps(cls):
async with aiosqlite.connect(TEST_DB) as db:
cursor = await db.execute("UPDATE %s SET state = 'I', stepstate = NULL , start = NULL, end = NULL " % cls.__table_name__)
await db.commit()
''' '''
@classmethod @classmethod
@ -63,11 +94,7 @@ class StepModel(DBModel):
else: else:
return None return None
@classmethod
def update_step_state(cls, id, state):
cur = get_db().cursor()
cur.execute("UPDATE %s SET stepstate = ? WHERE id =?" % cls.__table_name__, (json.dumps(state), id))
get_db().commit()
@classmethod @classmethod
def reset_all_steps(cls): def reset_all_steps(cls):

View file

@ -121,7 +121,7 @@ class DBModel(object):
else: else:
data = data + (kwargs.get(f),) data = data + (kwargs.get(f),)
print("INSERT DATA", query, data)
cursor = await db.execute(query, data) cursor = await db.execute(query, data)
await db.commit() await db.commit()

View file

@ -19,7 +19,10 @@ class CustomStepCBPi(CBPiSimpleStep):
#await asyncio.sleep(1) #await asyncio.sleep(1)
self.i = self.i + 1 self.i = self.i + 1
self.cbpi.notify(key="step", message="OH YES") self.name="HALLO WELT"
if self.i == 20:
self.next()
self.cbpi.notify(key="step", message="HELLO FROM STEP")
print("RUN STEP", self.id, self.name, self.__dict__) print("RUN STEP", self.id, self.name, self.__dict__)

View file

@ -86,7 +86,7 @@ class Scheduler(*bases):
if self._closed: if self._closed:
return return
self._closed = True # prevent adding new jobs self._closed = True # prevent adding new jobs
print("####### CLOSE")
jobs = self._jobs jobs = self._jobs
if jobs: if jobs:
# cleanup pending queue # cleanup pending queue

Binary file not shown.