diff --git a/taskflow/atom.py b/taskflow/atom.py index ebc5bad3..82671df7 100644 --- a/taskflow/atom.py +++ b/taskflow/atom.py @@ -28,7 +28,6 @@ from oslo_utils import reflection import six from six.moves import zip as compat_zip -from taskflow import exceptions from taskflow.types import sets from taskflow.utils import misc @@ -162,10 +161,6 @@ class Atom(object): some action that furthers the overall flows progress. It usually also produces some of its own named output as a result of this process. - NOTE(harlowja): there can be no intersection between what this atom - requires and what it produces (since this would be an impossible - dependency to satisfy). - :param name: Meaningful name for this atom, should be something that is distinguishable and understandable for notification, debugging, storing and any other similar purposes. @@ -229,12 +224,6 @@ class Atom(object): inject_keys = frozenset(six.iterkeys(self.inject)) self.requires -= inject_keys self.optional -= inject_keys - out_of_order = self.provides.intersection(self.requires) - if out_of_order: - raise exceptions.DependencyFailure( - "Atom %(item)s provides %(oo)s that are required " - "by this atom" - % dict(item=self.name, oo=sorted(out_of_order))) @abc.abstractmethod def execute(self, *args, **kwargs): diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index c6a2af95..cec68f32 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -149,6 +149,53 @@ class EngineOptionalRequirementsTest(utils.EngineTestBase): self.assertEqual(result, {'a': 3, 'b': 7, 'result': 3000}) +class EngineMultipleResultsTest(utils.EngineTestBase): + def test_fetch_with_a_single_result(self): + flow = lf.Flow("flow") + flow.add(utils.TaskOneReturn(provides='x')) + + engine = self._make_engine(flow) + engine.run() + result = engine.storage.fetch('x') + self.assertEqual(result, 1) + + def test_fetch_with_two_results(self): + flow = lf.Flow("flow") + flow.add(utils.TaskOneReturn(provides='x')) + + engine = self._make_engine(flow, store={'x': 0}) + engine.run() + result = engine.storage.fetch('x') + self.assertEqual(result, 0) + + def test_fetch_all_with_a_single_result(self): + flow = lf.Flow("flow") + flow.add(utils.TaskOneReturn(provides='x')) + + engine = self._make_engine(flow) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'x': 1}) + + def test_fetch_all_with_two_results(self): + flow = lf.Flow("flow") + flow.add(utils.TaskOneReturn(provides='x')) + + engine = self._make_engine(flow, store={'x': 0}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'x': [0, 1]}) + + def test_task_can_update_value(self): + flow = lf.Flow("flow") + flow.add(utils.TaskOneArgOneReturn(requires='x', provides='x')) + + engine = self._make_engine(flow, store={'x': 0}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'x': [0, 1]}) + + class EngineLinearFlowTest(utils.EngineTestBase): def test_run_empty_flow(self): @@ -697,6 +744,7 @@ class EngineCheckingTaskTest(utils.EngineTestBase): class SerialEngineTest(EngineTaskTest, + EngineMultipleResultsTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, @@ -723,6 +771,7 @@ class SerialEngineTest(EngineTaskTest, class ParallelEngineWithThreadsTest(EngineTaskTest, + EngineMultipleResultsTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, @@ -761,6 +810,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') class ParallelEngineWithEventletTest(EngineTaskTest, + EngineMultipleResultsTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, @@ -782,6 +832,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest, class ParallelEngineWithProcessTest(EngineTaskTest, + EngineMultipleResultsTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, @@ -808,6 +859,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest, class WorkerBasedEngineTest(EngineTaskTest, + EngineMultipleResultsTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, diff --git a/taskflow/tests/unit/test_flow_dependencies.py b/taskflow/tests/unit/test_flow_dependencies.py index 9627a696..bdb427d9 100644 --- a/taskflow/tests/unit/test_flow_dependencies.py +++ b/taskflow/tests/unit/test_flow_dependencies.py @@ -243,10 +243,10 @@ class FlowDependenciesTest(test.TestCase): requires=['a'])) def test_task_requires_and_provides_same_values(self): - self.assertRaises(exceptions.DependencyFailure, - utils.TaskOneArgOneReturn, - requires='a', - provides='a') + flow = lf.Flow('lf', utils.TaskOneArgOneReturn('rt', requires='x', + provides='x')) + self.assertEqual(flow.requires, set('x')) + self.assertEqual(flow.provides, set('x')) def test_retry_in_linear_flow_no_requirements_no_provides(self): flow = lf.Flow('lf', retry.AlwaysRevert('rt')) @@ -271,9 +271,11 @@ class FlowDependenciesTest(test.TestCase): self.assertEqual(flow.provides, set(['a', 'b'])) def test_retry_requires_and_provides_same_value(self): - self.assertRaises(exceptions.DependencyFailure, - retry.AlwaysRevert, - 'rt', requires=['x', 'y'], provides=['x', 'y']) + flow = lf.Flow('lf', retry.AlwaysRevert('rt', + requires=['x', 'y'], + provides=['x', 'y'])) + self.assertEqual(flow.requires, set(['x', 'y'])) + self.assertEqual(flow.provides, set(['x', 'y'])) def test_retry_in_unordered_flow_no_requirements_no_provides(self): flow = uf.Flow('uf', retry.AlwaysRevert('rt'))