Merge "Change engine 'self._check' into a decorator"
This commit is contained in:
@@ -56,6 +56,32 @@ def _start_stop(task_executor, retry_executor):
|
|||||||
task_executor.stop()
|
task_executor.stop()
|
||||||
|
|
||||||
|
|
||||||
|
def _pre_check(check_compiled=True, check_storage_ensured=True,
|
||||||
|
check_validated=True):
|
||||||
|
"""Engine state precondition checking decorator."""
|
||||||
|
|
||||||
|
def decorator(meth):
|
||||||
|
do_what = meth.__name__
|
||||||
|
|
||||||
|
@six.wraps(meth)
|
||||||
|
def wrapper(self, *args, **kwargs):
|
||||||
|
if check_compiled and not self._compiled:
|
||||||
|
raise exc.InvalidState("Can not %s an engine which"
|
||||||
|
" has not been compiled" % do_what)
|
||||||
|
if check_storage_ensured and not self._storage_ensured:
|
||||||
|
raise exc.InvalidState("Can not %s an engine"
|
||||||
|
" which has not had its storage"
|
||||||
|
" populated" % do_what)
|
||||||
|
if check_validated and not self._validated:
|
||||||
|
raise exc.InvalidState("Can not %s an engine which"
|
||||||
|
" has not been validated" % do_what)
|
||||||
|
return meth(self, *args, **kwargs)
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
class ActionEngine(base.Engine):
|
class ActionEngine(base.Engine):
|
||||||
"""Generic action-based engine.
|
"""Generic action-based engine.
|
||||||
|
|
||||||
@@ -131,24 +157,21 @@ class ActionEngine(base.Engine):
|
|||||||
self._lock = threading.RLock()
|
self._lock = threading.RLock()
|
||||||
self._state_lock = threading.RLock()
|
self._state_lock = threading.RLock()
|
||||||
self._storage_ensured = False
|
self._storage_ensured = False
|
||||||
|
self._validated = False
|
||||||
# Retries are not *currently* executed out of the engines process
|
# Retries are not *currently* executed out of the engines process
|
||||||
# or thread (this could change in the future if we desire it to).
|
# or thread (this could change in the future if we desire it to).
|
||||||
self._retry_executor = executor.SerialRetryExecutor()
|
self._retry_executor = executor.SerialRetryExecutor()
|
||||||
self._inject_transient = strutils.bool_from_string(
|
self._inject_transient = strutils.bool_from_string(
|
||||||
self._options.get('inject_transient', True))
|
self._options.get('inject_transient', True))
|
||||||
|
|
||||||
def _check(self, name, check_compiled, check_storage_ensured):
|
@_pre_check(check_compiled=True,
|
||||||
"""Check (and raise) if the engine has not reached a certain stage."""
|
# NOTE(harlowja): We can alter the state of the
|
||||||
if check_compiled and not self._compiled:
|
# flow without ensuring its storage is setup for
|
||||||
raise exc.InvalidState("Can not %s an engine which"
|
# its atoms (since this state change does not affect
|
||||||
" has not been compiled" % name)
|
# those units).
|
||||||
if check_storage_ensured and not self._storage_ensured:
|
check_storage_ensured=False,
|
||||||
raise exc.InvalidState("Can not %s an engine"
|
check_validated=False)
|
||||||
" which has not has its storage"
|
|
||||||
" populated" % name)
|
|
||||||
|
|
||||||
def suspend(self):
|
def suspend(self):
|
||||||
self._check('suspend', True, False)
|
|
||||||
self._change_state(states.SUSPENDING)
|
self._change_state(states.SUSPENDING)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -300,8 +323,8 @@ class ActionEngine(base.Engine):
|
|||||||
transient=self._inject_transient)
|
transient=self._inject_transient)
|
||||||
|
|
||||||
@fasteners.locked
|
@fasteners.locked
|
||||||
|
@_pre_check(check_validated=False)
|
||||||
def validate(self):
|
def validate(self):
|
||||||
self._check('validate', True, True)
|
|
||||||
# At this point we can check to ensure all dependencies are either
|
# At this point we can check to ensure all dependencies are either
|
||||||
# flow/task provided or storage provided, if there are still missing
|
# flow/task provided or storage provided, if there are still missing
|
||||||
# dependencies then this flow will fail at runtime (which we can avoid
|
# dependencies then this flow will fail at runtime (which we can avoid
|
||||||
@@ -341,10 +364,11 @@ class ActionEngine(base.Engine):
|
|||||||
raise exc.MissingDependencies(self._flow,
|
raise exc.MissingDependencies(self._flow,
|
||||||
sorted(missing),
|
sorted(missing),
|
||||||
cause=last_cause)
|
cause=last_cause)
|
||||||
|
self._validated = True
|
||||||
|
|
||||||
@fasteners.locked
|
@fasteners.locked
|
||||||
|
@_pre_check(check_storage_ensured=False, check_validated=False)
|
||||||
def prepare(self):
|
def prepare(self):
|
||||||
self._check('prepare', True, False)
|
|
||||||
if not self._storage_ensured:
|
if not self._storage_ensured:
|
||||||
# Set our own state to resuming -> (ensure atoms exist
|
# Set our own state to resuming -> (ensure atoms exist
|
||||||
# in storage) -> suspended in the storage unit and notify any
|
# in storage) -> suspended in the storage unit and notify any
|
||||||
@@ -358,8 +382,8 @@ class ActionEngine(base.Engine):
|
|||||||
self.reset()
|
self.reset()
|
||||||
|
|
||||||
@fasteners.locked
|
@fasteners.locked
|
||||||
|
@_pre_check(check_validated=False)
|
||||||
def reset(self):
|
def reset(self):
|
||||||
self._check('reset', True, True)
|
|
||||||
# This transitions *all* contained atoms back into the PENDING state
|
# This transitions *all* contained atoms back into the PENDING state
|
||||||
# with an intention to EXECUTE (or dies trying to do that) and then
|
# with an intention to EXECUTE (or dies trying to do that) and then
|
||||||
# changes the state of the flow to PENDING so that it can then run...
|
# changes the state of the flow to PENDING so that it can then run...
|
||||||
|
Reference in New Issue
Block a user