Merge "Add a test that checks for task result visibility"
This commit is contained in:
@@ -159,6 +159,60 @@ class EngineMultipleResultsTest(utils.EngineTestBase):
|
||||
result = engine.storage.fetch('x')
|
||||
self.assertEqual(result, 1)
|
||||
|
||||
def test_many_results_visible_to(self):
|
||||
flow = lf.Flow("flow")
|
||||
flow.add(utils.AddOneSameProvidesRequires(
|
||||
'a', rebind={'value': 'source'}))
|
||||
flow.add(utils.AddOneSameProvidesRequires('b'))
|
||||
flow.add(utils.AddOneSameProvidesRequires('c'))
|
||||
engine = self._make_engine(flow, store={'source': 0})
|
||||
engine.run()
|
||||
|
||||
# Check what each task in the prior should be seeing...
|
||||
atoms = list(flow)
|
||||
a = atoms[0]
|
||||
a_kwargs = engine.storage.fetch_mapped_args(a.rebind,
|
||||
atom_name='a')
|
||||
self.assertEqual({'value': 0}, a_kwargs)
|
||||
|
||||
b = atoms[1]
|
||||
b_kwargs = engine.storage.fetch_mapped_args(b.rebind,
|
||||
atom_name='b')
|
||||
self.assertEqual({'value': 1}, b_kwargs)
|
||||
|
||||
c = atoms[2]
|
||||
c_kwargs = engine.storage.fetch_mapped_args(c.rebind,
|
||||
atom_name='c')
|
||||
self.assertEqual({'value': 2}, c_kwargs)
|
||||
|
||||
def test_many_results_storage_provided_visible_to(self):
|
||||
# This works as expected due to docs listed at
|
||||
#
|
||||
# http://docs.openstack.org/developer/taskflow/engines.html#scoping
|
||||
flow = lf.Flow("flow")
|
||||
flow.add(utils.AddOneSameProvidesRequires('a'))
|
||||
flow.add(utils.AddOneSameProvidesRequires('b'))
|
||||
flow.add(utils.AddOneSameProvidesRequires('c'))
|
||||
engine = self._make_engine(flow, store={'value': 0})
|
||||
engine.run()
|
||||
|
||||
# Check what each task in the prior should be seeing...
|
||||
atoms = list(flow)
|
||||
a = atoms[0]
|
||||
a_kwargs = engine.storage.fetch_mapped_args(a.rebind,
|
||||
atom_name='a')
|
||||
self.assertEqual({'value': 0}, a_kwargs)
|
||||
|
||||
b = atoms[1]
|
||||
b_kwargs = engine.storage.fetch_mapped_args(b.rebind,
|
||||
atom_name='b')
|
||||
self.assertEqual({'value': 0}, b_kwargs)
|
||||
|
||||
c = atoms[2]
|
||||
c_kwargs = engine.storage.fetch_mapped_args(c.rebind,
|
||||
atom_name='c')
|
||||
self.assertEqual({'value': 0}, c_kwargs)
|
||||
|
||||
def test_fetch_with_two_results(self):
|
||||
flow = lf.Flow("flow")
|
||||
flow.add(utils.TaskOneReturn(provides='x'))
|
||||
|
||||
@@ -33,7 +33,7 @@ class TestWorker(test.MockTestCase):
|
||||
self.broker_url = 'test-url'
|
||||
self.exchange = 'test-exchange'
|
||||
self.topic = 'test-topic'
|
||||
self.endpoint_count = 24
|
||||
self.endpoint_count = 25
|
||||
|
||||
# patch classes
|
||||
self.executor_mock, self.executor_inst_mock = self.patchClass(
|
||||
|
||||
@@ -89,6 +89,13 @@ class DummyTask(task.Task):
|
||||
pass
|
||||
|
||||
|
||||
class AddOneSameProvidesRequires(task.Task):
|
||||
default_provides = 'value'
|
||||
|
||||
def execute(self, value):
|
||||
return value + 1
|
||||
|
||||
|
||||
class AddOne(task.Task):
|
||||
default_provides = 'result'
|
||||
|
||||
|
||||
Reference in New Issue
Block a user