diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 1ad00a0d..b6e953bd 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -64,6 +64,7 @@ class ActionEngine(base.EngineBase): self._task_executor = None self._task_action = None self._retry_action = None + self._storage_ensured = False def __str__(self): return "%s: %s" % (reflection.get_class_name(self), id(self)) @@ -83,15 +84,7 @@ class ActionEngine(base.EngineBase): def run(self): """Runs the flow in the engine to completion.""" self.compile() - external_provides = set(self.storage.fetch_all().keys()) - missing = self._flow.requires - external_provides - if missing: - raise exc.MissingDependencies(self._flow, sorted(missing)) - - if self.storage.get_flow_state() == states.REVERTED: - self._root.reset_all() - self._change_state(states.PENDING) - + self.prepare() self._task_executor.start() try: self._run() @@ -145,6 +138,27 @@ class ActionEngine(base.EngineBase): self.storage.ensure_task(node.name, version, node.save_as) self._change_state(states.SUSPENDED) # does nothing in PENDING state + @lock_utils.locked + def prepare(self): + if not self._compiled: + raise exc.InvalidState("Can not prepare an engine" + " which has not been compiled") + if not self._storage_ensured: + self._ensure_storage_for(self.execution_graph) + self._storage_ensured = True + # At this point we can check to ensure all dependencies are either + # flow/task provided or storage provided, if there are still missing + # dependencies then this flow will fail at runtime (which we can avoid + # by failing at preparation time). + external_provides = set(self.storage.fetch_all().keys()) + missing = self._flow.requires - external_provides + if missing: + raise exc.MissingDependencies(self._flow, sorted(missing)) + # Reset everything back to pending (if we were previously reverted). + if self.storage.get_flow_state() == states.REVERTED: + self._root.reset_all() + self._change_state(states.PENDING) + @lock_utils.locked def compile(self): if self._compiled: @@ -167,13 +181,8 @@ class ActionEngine(base.EngineBase): self.storage, self._task_action, self._retry_action) - # NOTE(harlowja): Perform initial state manipulation and setup. - # - # TODO(harlowja): This doesn't seem like it should be in a compilation - # function since compilation seems like it should not modify any - # external state. - self._ensure_storage_for(execution_graph) self._compiled = True + return class SingleThreadedActionEngine(ActionEngine): diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index 3a7c261e..e015798a 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -56,6 +56,16 @@ class EngineBase(object): indicating why this compilation could not be achieved. """ + @abc.abstractmethod + def prepare(self): + """Performs any pre-run, but post-compilation actions. + + NOTE(harlowja): During preparation it is currently assumed that the + underlying storage will be initialized, all final dependencies + will be verified, the tasks will be reset and the engine will enter + the PENDING state. + """ + @abc.abstractmethod def run(self): """Runs the flow in the engine to completion (or die trying).""" diff --git a/taskflow/tests/unit/test_progress.py b/taskflow/tests/unit/test_progress.py index 987c237c..f1fa15d3 100644 --- a/taskflow/tests/unit/test_progress.py +++ b/taskflow/tests/unit/test_progress.py @@ -49,6 +49,7 @@ class TestProgress(test.TestCase): flow_detail=flow_detail, backend=backend) e.compile() + e.prepare() return e def tearDown(self): diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 42f4a5fe..d9c60903 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -394,6 +394,7 @@ class RetryTest(utils.EngineTestBase): ) engine = self._make_engine(flow) engine.compile() + engine.prepare() utils.register_notifiers(engine, self.values) engine.storage.set_atom_state('r1', st.RETRYING) engine.storage.set_atom_state('t1', st.PENDING) @@ -425,6 +426,7 @@ class RetryTest(utils.EngineTestBase): ) engine = self._make_engine(flow) engine.compile() + engine.prepare() utils.register_notifiers(engine, self.values) engine.storage.set_atom_intention('r1', st.RETRY) engine.storage.set_atom_state('r1', st.SUCCESS) @@ -551,6 +553,7 @@ class RetryTest(utils.EngineTestBase): utils.SaveOrderTask('task1')) engine = self._make_engine(flow) engine.compile() + engine.prepare() # imagine we run engine engine.storage.set_flow_state(st.RUNNING) engine.storage.set_atom_intention('flow-1_retry', st.EXECUTE) @@ -662,6 +665,7 @@ class RetryTest(utils.EngineTestBase): utils.FailingTask('c'))) engine = self._make_engine(flow) engine.compile() + engine.prepare() engine.storage.save('test2_retry', 1) engine.storage.save('b', 11) engine.storage.save('a', 10) @@ -687,6 +691,7 @@ class RetryTest(utils.EngineTestBase): utils.SaveOrderTask('c'))) engine = self._make_engine(flow) engine.compile() + engine.prepare() engine.storage.save('test2_retry', 1) engine.storage.save('b', 11) # pretend that 'c' failed