From 1764e3efc9042819d127d26ddff0c1bbed698d83 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 21 Apr 2015 18:01:59 -0700 Subject: [PATCH] Retain chain of missing dependencies Change-Id: I170bedb5c667aa30764ad29634910dda40c6cd49 --- taskflow/engines/action_engine/engine.py | 30 ++++++++++++++++++++---- taskflow/tests/unit/test_engines.py | 25 ++++++++++++++++++++ 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index e7763068..bed43e8e 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -225,12 +225,34 @@ class ActionEngine(base.Engine): execution_graph.number_of_edges(), nx.density(execution_graph)) missing = set() - fetch = self.storage.fetch_unsatisfied_args + # Attempt to retain a chain of what was missing (so that the final + # raised exception for the flow has the nodes that had missing + # dependencies). + last_cause = None + last_node = None + missing_nodes = 0 + fetch_func = self.storage.fetch_unsatisfied_args for node in execution_graph.nodes_iter(): - missing.update(fetch(node.name, node.rebind, - optional_args=node.optional)) + node_missing = fetch_func(node.name, node.rebind, + optional_args=node.optional) + if node_missing: + cause = exc.MissingDependencies(node, + sorted(node_missing), + cause=last_cause) + last_cause = cause + last_node = node + missing_nodes += 1 + missing.update(node_missing) if missing: - raise exc.MissingDependencies(self._flow, sorted(missing)) + # For when a task is provided (instead of a flow) and that + # task is the only item in the graph and its missing deps, avoid + # re-wrapping it in yet another exception... + if missing_nodes == 1 and last_node is self._flow: + raise last_cause + else: + raise exc.MissingDependencies(self._flow, + sorted(missing), + cause=last_cause) @lock_utils.locked def prepare(self): diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 04b8fa3a..c6a2af95 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -651,6 +651,26 @@ class EngineGraphFlowTest(utils.EngineTestBase): self.assertIsInstance(graph, gr.DiGraph) +class EngineMissingDepsTest(utils.EngineTestBase): + def test_missing_deps_deep(self): + flow = gf.Flow('missing-many').add( + utils.TaskOneReturn(name='task1', + requires=['a', 'b', 'c']), + utils.TaskMultiArgOneReturn(name='task2', + rebind=['e', 'f', 'g'])) + engine = self._make_engine(flow) + engine.compile() + engine.prepare() + self.assertRaises(exc.MissingDependencies, engine.validate) + c_e = None + try: + engine.validate() + except exc.MissingDependencies as e: + c_e = e + self.assertIsNotNone(c_e) + self.assertIsNotNone(c_e.cause) + + class EngineCheckingTaskTest(utils.EngineTestBase): # FIXME: this test uses a inner class that workers/process engines can't # get to, so we need to do something better to make this test useful for @@ -682,6 +702,7 @@ class SerialEngineTest(EngineTaskTest, EngineLinearAndUnorderedExceptionsTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, + EngineMissingDepsTest, EngineCheckingTaskTest, test.TestCase): def _make_engine(self, flow, @@ -707,6 +728,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, EngineLinearAndUnorderedExceptionsTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, + EngineMissingDepsTest, EngineCheckingTaskTest, test.TestCase): _EXECUTOR_WORKERS = 2 @@ -744,6 +766,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest, EngineLinearAndUnorderedExceptionsTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, + EngineMissingDepsTest, EngineCheckingTaskTest, test.TestCase): @@ -764,6 +787,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest, EngineLinearAndUnorderedExceptionsTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, + EngineMissingDepsTest, test.TestCase): _EXECUTOR_WORKERS = 2 @@ -789,6 +813,7 @@ class WorkerBasedEngineTest(EngineTaskTest, EngineLinearAndUnorderedExceptionsTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, + EngineMissingDepsTest, test.TestCase): def setUp(self): super(WorkerBasedEngineTest, self).setUp()