From 7b75b7c6aaa9618f2db6221c50c33debcd799f42 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 18 Sep 2015 11:18:59 -0700 Subject: [PATCH] Change engine 'self._check' into a decorator Instead of having 'self._check' be called at the stat of every using function it can be made into a decorator and placed on the function instead. This way it is a little more explicit than having it called at the start of each method. This also adds a check to ensure the engine has been validated before it is ran (and the decorator takes into this into account as well). Change-Id: I67ee8fe8629392003f7777d4d4fab918b1a071dd --- taskflow/engines/action_engine/engine.py | 52 +++++++++++++++++------- 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 36ab11b2..1a32b2e8 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -56,6 +56,32 @@ def _start_stop(task_executor, retry_executor): task_executor.stop() +def _pre_check(check_compiled=True, check_storage_ensured=True, + check_validated=True): + """Engine state precondition checking decorator.""" + + def decorator(meth): + do_what = meth.__name__ + + @six.wraps(meth) + def wrapper(self, *args, **kwargs): + if check_compiled and not self._compiled: + raise exc.InvalidState("Can not %s an engine which" + " has not been compiled" % do_what) + if check_storage_ensured and not self._storage_ensured: + raise exc.InvalidState("Can not %s an engine" + " which has not had its storage" + " populated" % do_what) + if check_validated and not self._validated: + raise exc.InvalidState("Can not %s an engine which" + " has not been validated" % do_what) + return meth(self, *args, **kwargs) + + return wrapper + + return decorator + + class ActionEngine(base.Engine): """Generic action-based engine. @@ -131,24 +157,21 @@ class ActionEngine(base.Engine): self._lock = threading.RLock() self._state_lock = threading.RLock() self._storage_ensured = False + self._validated = False # Retries are not *currently* executed out of the engines process # or thread (this could change in the future if we desire it to). self._retry_executor = executor.SerialRetryExecutor() self._inject_transient = strutils.bool_from_string( self._options.get('inject_transient', True)) - def _check(self, name, check_compiled, check_storage_ensured): - """Check (and raise) if the engine has not reached a certain stage.""" - if check_compiled and not self._compiled: - raise exc.InvalidState("Can not %s an engine which" - " has not been compiled" % name) - if check_storage_ensured and not self._storage_ensured: - raise exc.InvalidState("Can not %s an engine" - " which has not has its storage" - " populated" % name) - + @_pre_check(check_compiled=True, + # NOTE(harlowja): We can alter the state of the + # flow without ensuring its storage is setup for + # its atoms (since this state change does not affect + # those units). + check_storage_ensured=False, + check_validated=False) def suspend(self): - self._check('suspend', True, False) self._change_state(states.SUSPENDING) @property @@ -300,8 +323,8 @@ class ActionEngine(base.Engine): transient=self._inject_transient) @fasteners.locked + @_pre_check(check_validated=False) def validate(self): - self._check('validate', True, True) # At this point we can check to ensure all dependencies are either # flow/task provided or storage provided, if there are still missing # dependencies then this flow will fail at runtime (which we can avoid @@ -341,10 +364,11 @@ class ActionEngine(base.Engine): raise exc.MissingDependencies(self._flow, sorted(missing), cause=last_cause) + self._validated = True @fasteners.locked + @_pre_check(check_storage_ensured=False, check_validated=False) def prepare(self): - self._check('prepare', True, False) if not self._storage_ensured: # Set our own state to resuming -> (ensure atoms exist # in storage) -> suspended in the storage unit and notify any @@ -358,8 +382,8 @@ class ActionEngine(base.Engine): self.reset() @fasteners.locked + @_pre_check(check_validated=False) def reset(self): - self._check('reset', True, True) # This transitions *all* contained atoms back into the PENDING state # with an intention to EXECUTE (or dies trying to do that) and then # changes the state of the flow to PENDING so that it can then run...