From c7e8c868cd85c878ab9896c868f9989adc7dfc09 Mon Sep 17 00:00:00 2001 From: Rick van de Loo Date: Wed, 6 May 2015 09:56:25 +0200 Subject: [PATCH] Allow same deps for requires and provides in task This change removes the DependencyFailure that is raised when a task requires the same dependency as it provides. Taskflow returns a frozenset([]) instead of a frozenset() when more than one value with the same name is in the store. This prevents the need for an inconvenient rename function when you want to update a store variable with a new value. Example case: class Inc(task.Task): def execute(self, a): return a + 1 class AwkwardRename(task.Task): def execute(self, b): return b store = { 'a': 1 } f = linear_flow.Flow('inc-flow') f.add( Inc('t1', provides='b', requires='a', ), AwkwardRename('t2', provides='a', requires='b')) e = engines.load(f, store=store) e.run() print e.storage.fetch('a', many_handler=lambda x: x[-1]) Now with ability to have the same provides as requires: class Inc(task.Task): def execute(self, a): return a + 1 store = { 'a': 1 } f = linear_flow.Flow('inc-flow') f.add( Inc('t3', provides='a', requires='a'), ) e = engines.load(f, store=store) e.run() print e.storage.fetch('a', many_handler=lambda x: x[-1]) Change-Id: I421e1ab33508c25baf78bd76df158bb6116d6fb0 --- taskflow/atom.py | 11 ---- taskflow/tests/unit/test_engines.py | 52 +++++++++++++++++++ taskflow/tests/unit/test_flow_dependencies.py | 16 +++--- 3 files changed, 61 insertions(+), 18 deletions(-) diff --git a/taskflow/atom.py b/taskflow/atom.py index ebc5bad3..82671df7 100644 --- a/taskflow/atom.py +++ b/taskflow/atom.py @@ -28,7 +28,6 @@ from oslo_utils import reflection import six from six.moves import zip as compat_zip -from taskflow import exceptions from taskflow.types import sets from taskflow.utils import misc @@ -162,10 +161,6 @@ class Atom(object): some action that furthers the overall flows progress. It usually also produces some of its own named output as a result of this process. - NOTE(harlowja): there can be no intersection between what this atom - requires and what it produces (since this would be an impossible - dependency to satisfy). - :param name: Meaningful name for this atom, should be something that is distinguishable and understandable for notification, debugging, storing and any other similar purposes. @@ -229,12 +224,6 @@ class Atom(object): inject_keys = frozenset(six.iterkeys(self.inject)) self.requires -= inject_keys self.optional -= inject_keys - out_of_order = self.provides.intersection(self.requires) - if out_of_order: - raise exceptions.DependencyFailure( - "Atom %(item)s provides %(oo)s that are required " - "by this atom" - % dict(item=self.name, oo=sorted(out_of_order))) @abc.abstractmethod def execute(self, *args, **kwargs): diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index c6a2af95..cec68f32 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -149,6 +149,53 @@ class EngineOptionalRequirementsTest(utils.EngineTestBase): self.assertEqual(result, {'a': 3, 'b': 7, 'result': 3000}) +class EngineMultipleResultsTest(utils.EngineTestBase): + def test_fetch_with_a_single_result(self): + flow = lf.Flow("flow") + flow.add(utils.TaskOneReturn(provides='x')) + + engine = self._make_engine(flow) + engine.run() + result = engine.storage.fetch('x') + self.assertEqual(result, 1) + + def test_fetch_with_two_results(self): + flow = lf.Flow("flow") + flow.add(utils.TaskOneReturn(provides='x')) + + engine = self._make_engine(flow, store={'x': 0}) + engine.run() + result = engine.storage.fetch('x') + self.assertEqual(result, 0) + + def test_fetch_all_with_a_single_result(self): + flow = lf.Flow("flow") + flow.add(utils.TaskOneReturn(provides='x')) + + engine = self._make_engine(flow) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'x': 1}) + + def test_fetch_all_with_two_results(self): + flow = lf.Flow("flow") + flow.add(utils.TaskOneReturn(provides='x')) + + engine = self._make_engine(flow, store={'x': 0}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'x': [0, 1]}) + + def test_task_can_update_value(self): + flow = lf.Flow("flow") + flow.add(utils.TaskOneArgOneReturn(requires='x', provides='x')) + + engine = self._make_engine(flow, store={'x': 0}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'x': [0, 1]}) + + class EngineLinearFlowTest(utils.EngineTestBase): def test_run_empty_flow(self): @@ -697,6 +744,7 @@ class EngineCheckingTaskTest(utils.EngineTestBase): class SerialEngineTest(EngineTaskTest, + EngineMultipleResultsTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, @@ -723,6 +771,7 @@ class SerialEngineTest(EngineTaskTest, class ParallelEngineWithThreadsTest(EngineTaskTest, + EngineMultipleResultsTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, @@ -761,6 +810,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') class ParallelEngineWithEventletTest(EngineTaskTest, + EngineMultipleResultsTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, @@ -782,6 +832,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest, class ParallelEngineWithProcessTest(EngineTaskTest, + EngineMultipleResultsTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, @@ -808,6 +859,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest, class WorkerBasedEngineTest(EngineTaskTest, + EngineMultipleResultsTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, diff --git a/taskflow/tests/unit/test_flow_dependencies.py b/taskflow/tests/unit/test_flow_dependencies.py index 9627a696..bdb427d9 100644 --- a/taskflow/tests/unit/test_flow_dependencies.py +++ b/taskflow/tests/unit/test_flow_dependencies.py @@ -243,10 +243,10 @@ class FlowDependenciesTest(test.TestCase): requires=['a'])) def test_task_requires_and_provides_same_values(self): - self.assertRaises(exceptions.DependencyFailure, - utils.TaskOneArgOneReturn, - requires='a', - provides='a') + flow = lf.Flow('lf', utils.TaskOneArgOneReturn('rt', requires='x', + provides='x')) + self.assertEqual(flow.requires, set('x')) + self.assertEqual(flow.provides, set('x')) def test_retry_in_linear_flow_no_requirements_no_provides(self): flow = lf.Flow('lf', retry.AlwaysRevert('rt')) @@ -271,9 +271,11 @@ class FlowDependenciesTest(test.TestCase): self.assertEqual(flow.provides, set(['a', 'b'])) def test_retry_requires_and_provides_same_value(self): - self.assertRaises(exceptions.DependencyFailure, - retry.AlwaysRevert, - 'rt', requires=['x', 'y'], provides=['x', 'y']) + flow = lf.Flow('lf', retry.AlwaysRevert('rt', + requires=['x', 'y'], + provides=['x', 'y'])) + self.assertEqual(flow.requires, set(['x', 'y'])) + self.assertEqual(flow.provides, set(['x', 'y'])) def test_retry_in_unordered_flow_no_requirements_no_provides(self): flow = uf.Flow('uf', retry.AlwaysRevert('rt'))