From 698d7561913c02e267a92676efd8f71582e70087 Mon Sep 17 00:00:00 2001 From: Anastasia Karpinska Date: Thu, 26 Sep 2013 17:33:07 +0300 Subject: [PATCH] Suspend single and multi threaded engines Change-Id: I908c8f4e98f715a4c4b5b6fa7db35e125439ec38 --- taskflow/engines/action_engine/engine.py | 45 +++++++-- .../engines/action_engine/graph_action.py | 24 ++++- taskflow/states.py | 2 + taskflow/tests/unit/test_action_engine.py | 91 +++++++++++++++++-- 4 files changed, 141 insertions(+), 21 deletions(-) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 235c6345..6619fdcb 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -47,14 +47,17 @@ class ActionEngine(object): self._root = None self._flow = flow self._lock = threading.RLock() + self._state_lock = threading.RLock() self.notifier = misc.TransitionNotifier() self.task_notifier = misc.TransitionNotifier() self.storage = storage def _revert(self, current_failure): self._change_state(states.REVERTING) - self._root.revert(self) - self._change_state(states.REVERTED) + state = self._root.revert(self) + self._change_state(state) + if state == states.SUSPENDED: + return self._change_state(states.FAILURE) if self._failures: if len(self._failures) == 1: @@ -68,29 +71,43 @@ class ActionEngine(object): def _reset(self): self._failures = [] + def suspend(self): + self._change_state(states.SUSPENDING) + def get_graph(self): self.compile() return self._root.graph @decorators.locked def run(self): - self.compile() - self._reset() + if self.storage.get_flow_state() != states.SUSPENDED: + self.compile() + self._reset() - external_provides = set(self.storage.fetch_all().keys()) - missing = self._flow.requires - external_provides - if missing: - raise exc.MissingDependencies(self._flow, sorted(missing)) + external_provides = set(self.storage.fetch_all().keys()) + missing = self._flow.requires - external_provides + if missing: + raise exc.MissingDependencies(self._flow, sorted(missing)) + self._run() + elif self._failures: + self._revert(self._failures[-1]) + else: + self._run() + def _run(self): self._change_state(states.RUNNING) try: - self._root.execute(self) + state = self._root.execute(self) except Exception: self._revert(misc.Failure()) else: - self._change_state(states.SUCCESS) + self._change_state(state) + @decorators.locked(lock='_state_lock') def _change_state(self, state): + if (state == states.SUSPENDING and not (self.is_running or + self.is_reverting)): + return self.storage.set_flow_state(state) details = dict(engine=self) self.notifier.notify(state, details) @@ -117,6 +134,14 @@ class ActionEngine(object): if self._root is None: self._root = self._translate_flow_to_action() + @property + def is_running(self): + return self.storage.get_flow_state() == states.RUNNING + + @property + def is_reverting(self): + return self.storage.get_flow_state() == states.REVERTING + class SingleThreadedActionEngine(ActionEngine): # This one attempts to run in a serial manner. diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index b9d27e80..505f0b01 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -24,6 +24,7 @@ from concurrent import futures from taskflow.engines.action_engine import base_action as base from taskflow import exceptions as exc +from taskflow import states as st from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -78,27 +79,35 @@ class SequentialGraphAction(GraphAction): deps_counter = self._get_nodes_dependencies_count() to_execute = self._browse_nodes_to_execute(deps_counter) - while to_execute: + while to_execute and engine.is_running: node = to_execute.pop() action = self._action_mapping[node] action.execute(engine) # raises on failure to_execute += self._resolve_dependencies(node, deps_counter) + if to_execute: + return st.SUSPENDED + return st.SUCCESS + def revert(self, engine): deps_counter = self._get_nodes_dependencies_count(True) to_revert = self._browse_nodes_to_execute(deps_counter) - while to_revert: + while to_revert and engine.is_reverting: node = to_revert.pop() action = self._action_mapping[node] action.revert(engine) # raises on failure to_revert += self._resolve_dependencies(node, deps_counter, True) + if to_revert: + return st.SUSPENDED + return st.REVERTED + class ParallelGraphAction(SequentialGraphAction): def execute(self, engine): """This action executes the provided graph in parallel by selecting - nodes which can run (those which have there dependencies satisified + nodes which can run (those which have there dependencies satisfied or those with no dependencies) and submitting them to the executor to be ran, and then after running this process will be repeated until no more nodes can be ran (or a failure has a occured and all nodes @@ -110,6 +119,7 @@ class ParallelGraphAction(SequentialGraphAction): has_failed = threading.Event() deps_lock = threading.RLock() deps_counter = self._get_nodes_dependencies_count() + self._future_flow_state = st.SUCCESS def submit_followups(node): # Mutating the deps_counter isn't thread safe. @@ -133,7 +143,11 @@ class ParallelGraphAction(SequentialGraphAction): return action = self._action_mapping[node] try: - action.execute(engine) + if engine.is_running: + action.execute(engine) + else: + self._future_flow_state = st.SUSPENDED + return except Exception: # Make sure others don't continue working (although they may # be already actively working, but u can't stop that anyway). @@ -189,3 +203,5 @@ class ParallelGraphAction(SequentialGraphAction): for fail in failures]) elif len(failures) == 1: failures[0].reraise() + + return self._future_flow_state diff --git a/taskflow/states.py b/taskflow/states.py index aeb094e9..0ec3d5c7 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -36,6 +36,8 @@ STARTED = 'STARTED' SUCCESS = SUCCESS CANCELLED = 'CANCELLED' INCOMPLETE = 'INCOMPLETE' +SUSPENDING = 'SUSPENDING' +SUSPENDED = 'SUSPENDED' # Task states. FAILURE = FAILURE diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 5d9d7128..e81d1f9c 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -27,7 +27,7 @@ from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf from taskflow.engines.action_engine import engine as eng -from taskflow import exceptions +from taskflow import exceptions as exc from taskflow.persistence.backends import impl_memory from taskflow.persistence import logbook from taskflow.persistence import utils as p_utils @@ -113,6 +113,27 @@ class MultiDictTask(task.Task): return output +class AutoSuspendingTask(TestTask): + + def execute(self, engine): + result = super(AutoSuspendingTask, self).execute() + engine.suspend() + return result + + def revert(self, egnine, result): + super(AutoSuspendingTask, self).revert(**{'result': result}) + + +class AutoSuspendingTaskOnRevert(TestTask): + + def execute(self, engine): + return super(AutoSuspendingTaskOnRevert, self).execute() + + def revert(self, engine, result): + super(AutoSuspendingTaskOnRevert, self).revert(**{'result': result}) + engine.suspend() + + class EngineTestBase(object): def setUp(self): super(EngineTestBase, self).setUp() @@ -251,7 +272,7 @@ class EngineTaskTest(EngineTestBase): flow = MultiargsTask(provides='result') engine = self._make_engine(flow) engine.storage.inject({'a': 1, 'b': 4, 'x': 17}) - with self.assertRaises(exceptions.MissingDependencies): + with self.assertRaises(exc.MissingDependencies): engine.run() def test_partial_arguments_mapping(self): @@ -285,7 +306,7 @@ class EngineTaskTest(EngineTestBase): rebind={'b': 'z'}) engine = self._make_engine(flow) engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17}) - with self.assertRaises(exceptions.MissingDependencies): + with self.assertRaises(exc.MissingDependencies): engine.run() def test_invalid_argument_name_list(self): @@ -294,7 +315,7 @@ class EngineTaskTest(EngineTestBase): rebind=['a', 'z', 'b']) engine = self._make_engine(flow) engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17}) - with self.assertRaises(exceptions.MissingDependencies): + with self.assertRaises(exc.MissingDependencies): engine.run() def test_bad_rebind_args_value(self): @@ -487,14 +508,14 @@ class EngineGraphFlowTest(EngineTestBase): self.assertEquals(self.values, ['task1', 'task2', 'task3', 'task4']) def test_graph_cyclic_dependency(self): - with self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path'): + with self.assertRaisesRegexp(exc.DependencyFailure, '^No path'): gf.Flow('g-3-cyclic').add( TestTask([], name='task1', provides='a', requires=['b']), TestTask([], name='task2', provides='b', requires=['c']), TestTask([], name='task3', provides='c', requires=['a'])) def test_graph_two_tasks_returns_same_value(self): - with self.assertRaisesRegexp(exceptions.DependencyFailure, + with self.assertRaisesRegexp(exc.DependencyFailure, "task2 provides a but is already being" " provided by task1 and duplicate" " producers are disallowed"): @@ -547,7 +568,7 @@ class EngineGraphFlowTest(EngineTestBase): }) def test_one_task_provides_and_requires_same_data(self): - with self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path'): + with self.assertRaisesRegexp(exc.DependencyFailure, '^No path'): gf.Flow('g-1-req-error').add( TestTask([], name='task1', requires=['a'], provides='a')) @@ -568,10 +589,65 @@ class EngineGraphFlowTest(EngineTestBase): self.assertTrue(isinstance(graph, networkx.DiGraph)) +class SuspendFlowTest(EngineTestBase): + + def test_suspend_one_task(self): + flow = AutoSuspendingTask(self.values, 'a') + engine = self._make_engine(flow) + engine.storage.inject({'engine': engine}) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS) + self.assertEquals(self.values, ['a']) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS) + self.assertEquals(self.values, ['a']) + + def test_suspend_linear_flow(self): + flow = lf.Flow('linear').add( + TestTask(self.values, 'a'), + AutoSuspendingTask(self.values, 'b'), + TestTask(self.values, 'c') + ) + engine = self._make_engine(flow) + engine.storage.inject({'engine': engine}) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED) + self.assertEquals(self.values, ['a', 'b']) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS) + self.assertEquals(self.values, ['a', 'b', 'c']) + + def test_suspend_linear_flow_on_revert(self): + flow = lf.Flow('linear').add( + TestTask(self.values, 'a'), + AutoSuspendingTaskOnRevert(self.values, 'b'), + FailingTask(self.values, 'c') + ) + engine = self._make_engine(flow) + engine.storage.inject({'engine': engine}) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED) + self.assertEquals(self.values, + ['a', + 'b', + 'c reverted(Failure: RuntimeError: Woot!)', + 'b reverted(5)']) + with self.assertRaisesRegexp(RuntimeError, '^Woot'): + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.FAILURE) + self.assertEquals(self.values, + ['a', + 'b', + 'c reverted(Failure: RuntimeError: Woot!)', + 'b reverted(5)', + 'a reverted(5)']) + + class SingleThreadedEngineTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineGraphFlowTest, + SuspendFlowTest, test.TestCase): def _make_engine(self, flow, flow_detail=None): if flow_detail is None: @@ -585,6 +661,7 @@ class MultiThreadedEngineTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineGraphFlowTest, + SuspendFlowTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): if flow_detail is None: