diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index cec68f32..4e38dfa5 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -159,6 +159,60 @@ class EngineMultipleResultsTest(utils.EngineTestBase): result = engine.storage.fetch('x') self.assertEqual(result, 1) + def test_many_results_visible_to(self): + flow = lf.Flow("flow") + flow.add(utils.AddOneSameProvidesRequires( + 'a', rebind={'value': 'source'})) + flow.add(utils.AddOneSameProvidesRequires('b')) + flow.add(utils.AddOneSameProvidesRequires('c')) + engine = self._make_engine(flow, store={'source': 0}) + engine.run() + + # Check what each task in the prior should be seeing... + atoms = list(flow) + a = atoms[0] + a_kwargs = engine.storage.fetch_mapped_args(a.rebind, + atom_name='a') + self.assertEqual({'value': 0}, a_kwargs) + + b = atoms[1] + b_kwargs = engine.storage.fetch_mapped_args(b.rebind, + atom_name='b') + self.assertEqual({'value': 1}, b_kwargs) + + c = atoms[2] + c_kwargs = engine.storage.fetch_mapped_args(c.rebind, + atom_name='c') + self.assertEqual({'value': 2}, c_kwargs) + + def test_many_results_storage_provided_visible_to(self): + # This works as expected due to docs listed at + # + # http://docs.openstack.org/developer/taskflow/engines.html#scoping + flow = lf.Flow("flow") + flow.add(utils.AddOneSameProvidesRequires('a')) + flow.add(utils.AddOneSameProvidesRequires('b')) + flow.add(utils.AddOneSameProvidesRequires('c')) + engine = self._make_engine(flow, store={'value': 0}) + engine.run() + + # Check what each task in the prior should be seeing... + atoms = list(flow) + a = atoms[0] + a_kwargs = engine.storage.fetch_mapped_args(a.rebind, + atom_name='a') + self.assertEqual({'value': 0}, a_kwargs) + + b = atoms[1] + b_kwargs = engine.storage.fetch_mapped_args(b.rebind, + atom_name='b') + self.assertEqual({'value': 0}, b_kwargs) + + c = atoms[2] + c_kwargs = engine.storage.fetch_mapped_args(c.rebind, + atom_name='c') + self.assertEqual({'value': 0}, c_kwargs) + def test_fetch_with_two_results(self): flow = lf.Flow("flow") flow.add(utils.TaskOneReturn(provides='x')) diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index a475c51d..3acf245b 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -33,7 +33,7 @@ class TestWorker(test.MockTestCase): self.broker_url = 'test-url' self.exchange = 'test-exchange' self.topic = 'test-topic' - self.endpoint_count = 24 + self.endpoint_count = 25 # patch classes self.executor_mock, self.executor_inst_mock = self.patchClass( diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 031dd706..43f208bb 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -89,6 +89,13 @@ class DummyTask(task.Task): pass +class AddOneSameProvidesRequires(task.Task): + default_provides = 'value' + + def execute(self, value): + return value + 1 + + class AddOne(task.Task): default_provides = 'result'