From d5dac67c3515e84aa5718b462c0f002c070e7ba4 Mon Sep 17 00:00:00 2001 From: manuel83 Date: Sun, 16 Dec 2018 21:42:47 +0100 Subject: [PATCH] step controller restart logic added --- .../_metadata_/metadata | Bin 65536 -> 65536 bytes .../_metadata_/metadata.keystream | Bin 4096 -> 4096 bytes .../_metadata_/metadata.keystream.len | Bin 8 -> 8 bytes .../_metadata_/metadata.len | Bin 8 -> 8 bytes .../_metadata_/metadata.values.at | Bin 69 -> 89 bytes .../_metadata_/metadata_i | Bin 32768 -> 32768 bytes .../_metadata_/metadata_i.len | Bin 8 -> 8 bytes .idea/workspace.xml | 753 ++++++++++-------- core/api/step.py | 7 +- core/controller/step_controller.py | 76 +- core/craftbeerpi.py | 10 + core/database/model.py | 39 +- core/database/orm_framework.py | 2 +- core/extension/dummystep/__init__.py | 5 +- core/job/_scheduler.py | 2 +- craftbeerpi.db | Bin 53248 -> 53248 bytes 16 files changed, 535 insertions(+), 359 deletions(-) diff --git a/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata b/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata index bbf0401c192c9fb399ae206b28d0a76286ca6f2f..a638799362906e605d22fc182b38b94d3fb99ff4 100644 GIT binary patch delta 105 zcmZo@U}6<1`ZG(NdE@{5EEGqh>eTSxY2QaJH)`65V`;W delta 61 ucmZo@U}t{m{LEnpaB3fW*3|Q diff --git a/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata.keystream b/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata.keystream index 08e7df176454f3ee5eeda13efa0adaa54828dfd8..f886e175fc70bf9ce73bc8f710161baa835f8b54 100644 GIT binary patch delta 24 fcmZorXi(s1WMIin%*ms~O;-ma delta 11 ScmZorXi(TF@PU7#f&c&*aRb5t diff --git a/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata.keystream.len b/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata.keystream.len index 1b1cb4d44c57c2d7a5122870fa6ac3e62ff7e94e..c0f177dfcd03fb7f74f69f76ba46d6dc69d4913c 100644 GIT binary patch literal 8 LcmZQz00RL402lxf literal 8 KcmZQzfB*mh2mk>9 diff --git a/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata.len b/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata.len index 1b1cb4d44c57c2d7a5122870fa6ac3e62ff7e94e..60e2d8aa9e10da6b90c32ba9742bd89d145b31f8 100644 GIT binary patch literal 8 McmZQz0D%S$009sHi2wiq literal 8 KcmZQzfB*mh2mk>9 diff --git a/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata.values.at b/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata.values.at index 2bf01cfd49958f478dd1d22fd37a9711a6440a92..9210250bb9a438e4edf3297ea242c2a883f52116 100644 GIT binary patch delta 7 OcmZ>boZvdqPy_%94FXC4 delta 4 Lcma!ao!|-p1DpYL diff --git a/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata_i b/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata_i index 12f3be4dd3b5a2b5146f36630acbf7e99e490797..143861c0f5407b3df56e48b3728dcc170f68bee5 100644 GIT binary patch delta 27 icmZo@U}|V!VqsupVBW}hmV>>4fgwGb|H4K_jdB28Q3nkG delta 16 XcmZo@U}|V!+Wde+p?vcPjt3k7I4K6o diff --git a/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata_i.len b/.idea/dataSources/5067e7fe-480d-4433-bc40-f2d1c38362a2/_metadata_/metadata_i.len index 1b1cb4d44c57c2d7a5122870fa6ac3e62ff7e94e..131e265740f37d77b7c4a3676d2a7704ca3e4a29 100644 GIT binary patch literal 8 McmZQz0D%Su009U9fdBvi literal 8 KcmZQzfB*mh2mk>9 diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 9d09ff3..a3ed28a 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,16 +2,21 @@ - - - - - - + + + + + + + - + + + + + @@ -213,8 +222,6 @@ - as*bus.fire( - asyc*bus.fire( asy*bus.fire( async*bus.fire( async *bus.fire( @@ -240,9 +247,11 @@ cbpi.bus.f cbpi.bus.fi cbpi.bus.fire - cbpi.bus.fire fire + update_step_state schedul + reset + setup @@ -262,11 +271,9 @@ @@ -323,8 +332,8 @@ DEFINITION_ORDER - @@ -343,6 +352,53 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -373,7 +429,7 @@ @@ -391,13 +447,17 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1141,19 +1278,18 @@ - + + - + + - - - - + @@ -1162,6 +1298,7 @@ + @@ -1207,66 +1344,10 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1290,7 +1371,6 @@ - @@ -1306,7 +1386,6 @@ - @@ -1318,19 +1397,10 @@ - - - - - - - - - @@ -1338,7 +1408,6 @@ - @@ -1346,9 +1415,6 @@ - - - @@ -1356,7 +1422,6 @@ - @@ -1364,7 +1429,6 @@ - @@ -1380,7 +1444,6 @@ - @@ -1388,7 +1451,6 @@ - @@ -1396,7 +1458,6 @@ - @@ -1404,9 +1465,6 @@ - - - @@ -1438,22 +1496,6 @@ - - - - - - - - - - - - - - - - @@ -1461,7 +1503,6 @@ - @@ -1469,7 +1510,6 @@ - @@ -1497,7 +1537,6 @@ - @@ -1510,7 +1549,6 @@ - @@ -1572,17 +1610,6 @@ - - - - - - - - - - - @@ -1594,14 +1621,7 @@ - - - - - - - - + @@ -1620,44 +1640,111 @@ - + - - + + + + + + + + + + - + - + - - - + + + + + + + + + + + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + + + + + + + + + @@ -1671,10 +1758,18 @@ + + + + + + + + - - + + diff --git a/core/api/step.py b/core/api/step.py index e8e2201..f1f1425 100644 --- a/core/api/step.py +++ b/core/api/step.py @@ -60,7 +60,12 @@ class CBPiSimpleStep(metaclass=ABCMeta): if self.is_dirty(): # Now we have to store the managed props - + state = {} + for field in self.managed_fields: + state[field] = self.__getattribute__(field) + #step_controller.model.update_step_state(step_controller.current_step.id, state) + print("STATE",state) + await self.cbpi.step.model.update_step_state(self.id, state) self.reset_dirty() diff --git a/core/controller/step_controller.py b/core/controller/step_controller.py index 1fc7242..c5bacbd 100644 --- a/core/controller/step_controller.py +++ b/core/controller/step_controller.py @@ -1,4 +1,6 @@ import asyncio + +import time from aiohttp import web from core.api import on_event, request_mapping from core.controller.crud_controller import CRUDController @@ -34,7 +36,33 @@ class StepController(HttpAPI, CRUDController): :return: ''' await super(StepController, self).init() - pass + print("INIT LAST STEP") + await self.init_after_startup() + + async def init_after_startup(self): + step = await self.model.get_by_state('A') + # We have an active step + + + if step is not None: + print("INIT LAST STEP", step.__dict__) + # get the type + print(self.types) + step_type = self.types.get(step.type) + + if step_type is None: + # step type not found. cant restart step + print("STEP TYPE NONT FOUND") + return + + if step.stepstate is not None: + cfg = step.stepstate.copy() + else: + cfg = {} + cfg.update(dict(cbpi=self.cbpi, id=step.id, managed_fields=self._get_manged_fields_as_array(step_type))) + + self.current_step = step_type["class"](**cfg) + self.current_job = await self.cbpi.start_job(self.current_step.run(), step.name, "step") @request_mapping(path="/action", auth_required=False) async def http_action(self, request): @@ -111,6 +139,7 @@ class StepController(HttpAPI, CRUDController): :return: None ''' if self.current_step is not None: + self.current_step.next() pass @@ -136,16 +165,9 @@ class StepController(HttpAPI, CRUDController): :param kwargs: :return: None ''' + await self.model.reset_all_steps() - if self.current_step is not None: - self.current_job.stop() - self.current_step.reset() - - self.steps[self.current_step.id]["state"] = None - self.current_step = None - self.current_task = None - await self.start() @on_event("step/stop") async def handle_stop(self, **kwargs): @@ -179,6 +201,9 @@ class StepController(HttpAPI, CRUDController): :return: ''' + print("IS SHUTODONW", self.cbpi.shutdown) + if self.cbpi.shutdown: + return print("JOB DONE STEP") self.cache[self.current_step.id].state = "D" step_id = self.current_step.id @@ -207,20 +232,31 @@ class StepController(HttpAPI, CRUDController): if self.current_step is None: loop = asyncio.get_event_loop() open_step = False - for key, step in self.cache.items(): - if step.state is None: - step_type = self.types["CustomStepCBPi"] - config = dict(cbpi = self.cbpi, id=key, name=step.name, managed_fields=self._get_manged_fields_as_array(step_type)) - self.current_step = step_type["class"](**config) + inactive = await self.model.get_by_state("I") + active = await self.model.get_by_state("A") - self.current_job = await self.cbpi.start_job(self.current_step.run(), step.name, "step") - await asyncio.sleep(4) - await self.current_job.close() + print("STEPES", inactive, active) - open_step = True - break - if open_step == False: + if active is not None: + active.state = 'D' + active.end = int(time.time()) + # self.stop_step() + self.current_step = None + await self.model.update(**active.__dict__) + + if inactive is not None: + step_type = self.types["CustomStepCBPi"] + + config = dict(cbpi=self.cbpi, id=inactive.id, name=inactive.name, managed_fields=self._get_manged_fields_as_array(step_type)) + self.current_step = step_type["class"](**config) + + inactive.state = 'A' + inactive.stepstate = inactive.config + inactive.start = int(time.time()) + await self.model.update(**inactive.__dict__) + self.current_job = await self.cbpi.start_job(self.current_step.run(), inactive.name, "step") + else: await self.cbpi.bus.fire("step/berwing/finished") async def stop(self): diff --git a/core/craftbeerpi.py b/core/craftbeerpi.py index 5b46435..b7257ee 100644 --- a/core/craftbeerpi.py +++ b/core/craftbeerpi.py @@ -50,8 +50,18 @@ class CraftBeerPi(): middlewares = [session_middleware(EncryptedCookieStorage(urandom(32))), auth.auth_middleware(policy)] self.app = web.Application(middlewares=middlewares) self.initializer = [] + self.shutdown = False + + async def on_cleanup(app): + self.shutdown = True + + + self.app.on_cleanup.append(on_cleanup) + setup(self.app, self) + + self.bus = EventBus(self.app.loop, self) self.ws = WebSocket(self) self.actor = ActorController(self) diff --git a/core/database/model.py b/core/database/model.py index 70eb1b8..c2c79b0 100644 --- a/core/database/model.py +++ b/core/database/model.py @@ -1,5 +1,9 @@ -from core.database.orm_framework import DBModel +import json +import aiosqlite + +from core.database.orm_framework import DBModel +TEST_DB = "./craftbeerpi.db" class ActorModel(DBModel): __fields__ = ["name", "type", "config"] @@ -31,6 +35,33 @@ class StepModel(DBModel): __table_name__ = "step" __json_fields__ = ["config", "stepstate"] + @classmethod + async def update_step_state(cls, step_id, state): + async with aiosqlite.connect(TEST_DB) as db: + cursor = await db.execute("UPDATE %s SET stepstate = ? WHERE id = ?" % cls.__table_name__, (json.dumps(state), step_id)) + await db.commit() + + @classmethod + async def get_by_state(cls, state, order=True): + + + async with aiosqlite.connect(TEST_DB) as db: + db.row_factory = aiosqlite.Row + db.row_factory = DBModel.dict_factory + async with db.execute("SELECT * FROM %s WHERE state = ? ORDER BY %s.'order'" % (cls.__table_name__, cls.__table_name__,), state) as cursor: + row = await cursor.fetchone() + if row is not None: + return cls(row) + else: + return None + + @classmethod + async def reset_all_steps(cls): + async with aiosqlite.connect(TEST_DB) as db: + cursor = await db.execute("UPDATE %s SET state = 'I', stepstate = NULL , start = NULL, end = NULL " % cls.__table_name__) + await db.commit() + + ''' @classmethod @@ -63,11 +94,7 @@ class StepModel(DBModel): else: return None - @classmethod - def update_step_state(cls, id, state): - cur = get_db().cursor() - cur.execute("UPDATE %s SET stepstate = ? WHERE id =?" % cls.__table_name__, (json.dumps(state), id)) - get_db().commit() + @classmethod def reset_all_steps(cls): diff --git a/core/database/orm_framework.py b/core/database/orm_framework.py index 59b9b3d..09cf051 100644 --- a/core/database/orm_framework.py +++ b/core/database/orm_framework.py @@ -121,7 +121,7 @@ class DBModel(object): else: data = data + (kwargs.get(f),) - + print("INSERT DATA", query, data) cursor = await db.execute(query, data) await db.commit() diff --git a/core/extension/dummystep/__init__.py b/core/extension/dummystep/__init__.py index 3fd009e..93a1525 100644 --- a/core/extension/dummystep/__init__.py +++ b/core/extension/dummystep/__init__.py @@ -19,7 +19,10 @@ class CustomStepCBPi(CBPiSimpleStep): #await asyncio.sleep(1) self.i = self.i + 1 - self.cbpi.notify(key="step", message="OH YES") + self.name="HALLO WELT" + if self.i == 20: + self.next() + self.cbpi.notify(key="step", message="HELLO FROM STEP") print("RUN STEP", self.id, self.name, self.__dict__) diff --git a/core/job/_scheduler.py b/core/job/_scheduler.py index 2d9d73b..20d5706 100644 --- a/core/job/_scheduler.py +++ b/core/job/_scheduler.py @@ -86,7 +86,7 @@ class Scheduler(*bases): if self._closed: return self._closed = True # prevent adding new jobs - + print("####### CLOSE") jobs = self._jobs if jobs: # cleanup pending queue diff --git a/craftbeerpi.db b/craftbeerpi.db index d8154ee73427f0c1d055a358df1faaaf81aabc49..be4384ba1e95df950066668bf67ccc764a2d5003 100644 GIT binary patch delta 435 zcmZozz}&Ead4e?K#fdV`j2Aa1EYX+Y;mcs)zsJ9ae-?iQe;B_BzYyOCzH@wQ_`3Kq zHY*r-@G&XyOuk^J$jA*Qxj^LR^L9p#{QLn7EPO8*`CjnP;n(87y;)FT9bY}a3xg|z zxS}OL3yUa2NGksfhFAu1O(2&6#4Ro{aV{+`$n`+