diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 12f5feac..3dda9125 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -82,10 +82,18 @@ class ActionEngine(base.Engine): self._state_lock = threading.RLock() self._storage_ensured = False + def _check(self, name, check_compiled, check_storage_ensured): + """Check (and raise) if the engine has not reached a certain stage.""" + if check_compiled and not self._compiled: + raise exc.InvalidState("Can not %s an engine which" + " has not been compiled" % name) + if check_storage_ensured and not self._storage_ensured: + raise exc.InvalidState("Can not %s an engine" + " which has not has its storage" + " populated" % name) + def suspend(self): - if not self._compiled: - raise exc.InvalidState("Can not suspend an engine" - " which has not been compiled") + self._check('suspend', True, False) self._change_state(states.SUSPENDING) @property @@ -216,10 +224,7 @@ class ActionEngine(base.Engine): @lock_utils.locked def validate(self): - if not self._storage_ensured: - raise exc.InvalidState("Can not validate an engine" - " which has not has its storage" - " populated") + self._check('validate', True, True) # At this point we can check to ensure all dependencies are either # flow/task provided or storage provided, if there are still missing # dependencies then this flow will fail at runtime (which we can avoid @@ -263,9 +268,7 @@ class ActionEngine(base.Engine): @lock_utils.locked def prepare(self): - if not self._compiled: - raise exc.InvalidState("Can not prepare an engine" - " which has not been compiled") + self._check('prepare', True, False) if not self._storage_ensured: # Set our own state to resuming -> (ensure atoms exist # in storage) -> suspended in the storage unit and notify any