diff --git a/doc/source/img/flow_states.svg b/doc/source/img/flow_states.svg index 80bf1a0a..cf60000f 100644 --- a/doc/source/img/flow_states.svg +++ b/doc/source/img/flow_states.svg @@ -1,8 +1,8 @@ - - -Flow statesPENDINGRUNNINGFAILURESUSPENDINGREVERTEDSUCCESSRESUMINGSUSPENDEDstart + +Flow statesPENDINGRUNNINGFAILURESUSPENDINGREVERTEDSUCCESSRESUMINGSUSPENDEDstart diff --git a/doc/source/states.rst b/doc/source/states.rst index 3d42bad1..d8e19eae 100644 --- a/doc/source/states.rst +++ b/doc/source/states.rst @@ -50,10 +50,11 @@ Flow :align: center :alt: Flow state transitions -**PENDING** - A flow starts its execution lifecycle in this state (it has no -state prior to being ran by an engine, since flow(s) are just pattern(s) -that define the semantics and ordering of their contents and flows gain -state only when they are executed). +**PENDING** - A flow starts (or +via :py:meth:`~taskflow.engines.base.Engine.reset`) its execution lifecycle +in this state (it has no state prior to being ran by an engine, since +flow(s) are just pattern(s) that define the semantics and ordering of their +contents and flows gain state only when they are executed). **RUNNING** - In this state the engine running a flow progresses through the flow. diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index fa6247d8..ef557f93 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -290,8 +290,19 @@ class ActionEngine(base.Engine): self._storage_ensured = True # Reset everything back to pending (if we were previously reverted). if self.storage.get_flow_state() == states.REVERTED: - self._runtime.reset_all() - self._change_state(states.PENDING) + self.reset() + + @fasteners.locked + def reset(self): + if not self._storage_ensured: + raise exc.InvalidState("Can not reset an engine" + " which has not has its storage" + " populated") + # This transitions *all* contained atoms back into the PENDING state + # with an intention to EXECUTE (or dies trying to do that) and then + # changes the state of the flow to PENDING so that it can then run... + self._runtime.reset_all() + self._change_state(states.PENDING) @fasteners.locked def compile(self): diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index 4b6a648d..a500dd47 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -92,13 +92,26 @@ class Engine(object): could not be achieved. """ + @abc.abstractmethod + def reset(self): + """Reset back to the ``PENDING`` state. + + If a flow had previously ended up (from a prior engine + :py:func:`.run`) in the ``FAILURE``, ``SUCCESS`` or ``REVERTED`` + states (or for some reason it ended up in an intermediary state) it + can be desireable to make it possible to run it again. Calling this + method enables that to occur (without causing a state transition + failure, which would typically occur if :py:meth:`.run` is called + directly without doing a reset). + """ + @abc.abstractmethod def prepare(self): """Performs any pre-run, but post-compilation actions. NOTE(harlowja): During preparation it is currently assumed that the underlying storage will be initialized, the atoms will be reset and - the engine will enter the PENDING state. + the engine will enter the ``PENDING`` state. """ @abc.abstractmethod diff --git a/taskflow/states.py b/taskflow/states.py index 07e70dd1..1939012b 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -103,6 +103,7 @@ _ALLOWED_FLOW_TRANSITIONS = frozenset(( (FAILURE, RUNNING), # see note below (REVERTED, PENDING), # try again + (SUCCESS, PENDING), # run it again (SUSPENDING, SUSPENDED), # suspend finished (SUSPENDING, SUCCESS), # all tasks finished while we were waiting diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 0c3ac031..c56d7569 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -16,6 +16,7 @@ import contextlib import functools +import threading import futurist import six @@ -798,6 +799,138 @@ class EngineMissingDepsTest(utils.EngineTestBase): self.assertIsNotNone(c_e.cause) +class EngineResetTests(utils.EngineTestBase): + def test_completed_reset_run_again(self): + task1 = utils.ProgressingTask(name='task1') + task2 = utils.ProgressingTask(name='task2') + task3 = utils.ProgressingTask(name='task3') + + flow = lf.Flow('root') + flow.add(task1, task2, task3) + + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = [ + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + + 'task2.t RUNNING', + 'task2.t SUCCESS(5)', + + 'task3.t RUNNING', + 'task3.t SUCCESS(5)', + ] + self.assertEqual(expected, capturer.values) + + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + self.assertEqual([], capturer.values) + + engine.reset() + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + self.assertEqual(expected, capturer.values) + + def test_failed_reset_run_again(self): + task1 = utils.ProgressingTask(name='task1') + task2 = utils.ProgressingTask(name='task2') + task3 = utils.FailingTask(name='task3') + + flow = lf.Flow('root') + flow.add(task1, task2, task3) + engine = self._make_engine(flow) + + with utils.CaptureListener(engine, capture_flow=False) as capturer: + # Also allow a WrappedFailure exception so that when this is used + # with the WBE engine (as it can't re-raise the original + # exception) that we will work correctly.... + self.assertRaises((RuntimeError, exc.WrappedFailure), engine.run) + + expected = [ + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'task2.t RUNNING', + 'task2.t SUCCESS(5)', + 'task3.t RUNNING', + + 'task3.t FAILURE(Failure: RuntimeError: Woot!)', + + 'task3.t REVERTING', + 'task3.t REVERTED(None)', + 'task2.t REVERTING', + 'task2.t REVERTED(None)', + 'task1.t REVERTING', + 'task1.t REVERTED(None)', + ] + self.assertEqual(expected, capturer.values) + + engine.reset() + with utils.CaptureListener(engine, capture_flow=False) as capturer: + self.assertRaises((RuntimeError, exc.WrappedFailure), engine.run) + self.assertEqual(expected, capturer.values) + + def test_suspended_reset_run_again(self): + task1 = utils.ProgressingTask(name='task1') + task2 = utils.ProgressingTask(name='task2') + task3 = utils.ProgressingTask(name='task3') + + flow = lf.Flow('root') + flow.add(task1, task2, task3) + engine = self._make_engine(flow) + suspend_at = object() + expected_states = [ + states.RESUMING, + states.SCHEDULING, + states.WAITING, + states.ANALYZING, + states.SCHEDULING, + states.WAITING, + # Stop/suspend here... + suspend_at, + states.SUSPENDED, + ] + with utils.CaptureListener(engine, capture_flow=False) as capturer: + for i, st in enumerate(engine.run_iter()): + expected = expected_states[i] + if expected is suspend_at: + engine.suspend() + else: + self.assertEqual(expected, st) + + expected = [ + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + + 'task2.t RUNNING', + 'task2.t SUCCESS(5)', + ] + self.assertEqual(expected, capturer.values) + + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = [ + 'task3.t RUNNING', + 'task3.t SUCCESS(5)', + ] + self.assertEqual(expected, capturer.values) + + engine.reset() + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = [ + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + + 'task2.t RUNNING', + 'task2.t SUCCESS(5)', + + 'task3.t RUNNING', + 'task3.t SUCCESS(5)', + ] + self.assertEqual(expected, capturer.values) + + class EngineGraphConditionalFlowTest(utils.EngineTestBase): def test_graph_flow_conditional(self): @@ -829,6 +962,54 @@ class EngineGraphConditionalFlowTest(utils.EngineTestBase): ]) self.assertEqual(expected, set(capturer.values)) + def test_graph_flow_conditional_ignore_reset(self): + allow_execute = threading.Event() + flow = gf.Flow('root') + + task1 = utils.ProgressingTask(name='task1') + task2 = utils.ProgressingTask(name='task2') + task3 = utils.ProgressingTask(name='task3') + + flow.add(task1, task2, task3) + flow.link(task1, task2) + flow.link(task2, task3, decider=lambda history: allow_execute.is_set()) + + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + + expected = set([ + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + + 'task2.t RUNNING', + 'task2.t SUCCESS(5)', + + 'task3.t IGNORE', + ]) + self.assertEqual(expected, set(capturer.values)) + self.assertEqual(states.IGNORE, + engine.storage.get_atom_state('task3')) + self.assertEqual(states.IGNORE, + engine.storage.get_atom_intention('task3')) + + engine.reset() + allow_execute.set() + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + + expected = set([ + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + + 'task2.t RUNNING', + 'task2.t SUCCESS(5)', + + 'task3.t RUNNING', + 'task3.t SUCCESS(5)', + ]) + self.assertEqual(expected, set(capturer.values)) + def test_graph_flow_diamond_ignored(self): flow = gf.Flow('root') @@ -951,6 +1132,7 @@ class SerialEngineTest(EngineTaskTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineMissingDepsTest, + EngineResetTests, EngineGraphConditionalFlowTest, EngineCheckingTaskTest, test.TestCase): @@ -978,6 +1160,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, EngineLinearAndUnorderedExceptionsTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, + EngineResetTests, EngineMissingDepsTest, EngineGraphConditionalFlowTest, EngineCheckingTaskTest, @@ -1018,6 +1201,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest, EngineLinearAndUnorderedExceptionsTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, + EngineResetTests, EngineMissingDepsTest, EngineGraphConditionalFlowTest, EngineCheckingTaskTest, @@ -1041,6 +1225,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest, EngineLinearAndUnorderedExceptionsTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, + EngineResetTests, EngineMissingDepsTest, EngineGraphConditionalFlowTest, test.TestCase): @@ -1069,6 +1254,7 @@ class WorkerBasedEngineTest(EngineTaskTest, EngineLinearAndUnorderedExceptionsTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, + EngineResetTests, EngineMissingDepsTest, EngineGraphConditionalFlowTest, test.TestCase):