From 76641d86b89cdba23ac49d8c65011467a098f6dc Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 18 Jul 2014 12:58:18 -0700 Subject: [PATCH] Relax the unordered flow symbol constraints In order to make it possible to have a symbol tree we need to relax and remove the constraints that are being imposed by the unordered constraints and later move those constraint checks and validations into the engines compilation stage. Part of blueprint taskflow-improved-scoping Change-Id: I80718b4bc01fbf0dce6a95cd2fac7e6e2e1814d1 --- taskflow/patterns/unordered_flow.py | 49 +---------------- .../unit/patterns/test_unordered_flow.py | 42 +++++++------- taskflow/tests/unit/test_flow_dependencies.py | 55 +++++++++---------- 3 files changed, 51 insertions(+), 95 deletions(-) diff --git a/taskflow/patterns/unordered_flow.py b/taskflow/patterns/unordered_flow.py index b32fbc09..52bd286e 100644 --- a/taskflow/patterns/unordered_flow.py +++ b/taskflow/patterns/unordered_flow.py @@ -14,70 +14,25 @@ # License for the specific language governing permissions and limitations # under the License. -from taskflow import exceptions from taskflow import flow class Flow(flow.Flow): - """Unordered Flow pattern. + """Unordered flow pattern. A unordered (potentially nested) flow of *tasks/flows* that can be executed in any order as one unit and rolled back as one unit. - - NOTE(harlowja): Since the flow is unordered there can *not* be any - dependency between task/flow inputs (requirements) and - task/flow outputs (provided names/values). """ def __init__(self, name, retry=None): super(Flow, self).__init__(name, retry) # NOTE(imelnikov): A unordered flow is unordered, so we use # set instead of list to save children, children so that - # people using it don't depend on the ordering + # people using it don't depend on the ordering. self._children = set() def add(self, *items): """Adds a given task/tasks/flow/flows to this flow.""" - if not items: - return self - - # check that items don't provide anything that other - # part of flow provides or requires - provides = self.provides - old_requires = self.requires - for item in items: - item_provides = item.provides - bad_provs = item_provides & old_requires - if bad_provs: - raise exceptions.DependencyFailure( - "%(item)s provides %(oo)s that are required " - "by other item(s) of unordered flow %(flow)s" - % dict(item=item.name, flow=self.name, - oo=sorted(bad_provs))) - same_provides = provides & item.provides - if same_provides: - raise exceptions.DependencyFailure( - "%(item)s provides %(value)s but is already being" - " provided by %(flow)s and duplicate producers" - " are disallowed" - % dict(item=item.name, flow=self.name, - value=sorted(same_provides))) - provides |= item.provides - - # check that items don't require anything other children provides - if self.retry: - # NOTE(imelnikov): it is allowed to depend on value provided - # by retry controller of the flow - provides -= self.retry.provides - for item in items: - bad_reqs = provides & item.requires - if bad_reqs: - raise exceptions.DependencyFailure( - "%(item)s requires %(oo)s that are provided " - "by other item(s) of unordered flow %(flow)s" - % dict(item=item.name, flow=self.name, - oo=sorted(bad_reqs))) - self._children.update(items) return self diff --git a/taskflow/tests/unit/patterns/test_unordered_flow.py b/taskflow/tests/unit/patterns/test_unordered_flow.py index a4043fe2..e55cfad0 100644 --- a/taskflow/tests/unit/patterns/test_unordered_flow.py +++ b/taskflow/tests/unit/patterns/test_unordered_flow.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -from taskflow import exceptions as exc from taskflow.patterns import unordered_flow as uf from taskflow import retry from taskflow import test @@ -59,7 +58,7 @@ class UnorderedFlowTest(test.TestCase): self.assertEqual(f.requires, set(['a', 'b'])) self.assertEqual(f.provides, set(['c', 'd'])) - def test_unordered_flow_two_independent_tasks(self): + def test_unordered_flow_two_tasks(self): task1 = _task(name='task1') task2 = _task(name='task2') f = uf.Flow('test').add(task1, task2) @@ -68,35 +67,29 @@ class UnorderedFlowTest(test.TestCase): self.assertEqual(set(f), set([task1, task2])) self.assertEqual(list(f.iter_links()), []) - def test_unordered_flow_two_dependent_tasks(self): - task1 = _task(name='task1', provides=['a']) - task2 = _task(name='task2', requires=['a']) - f = uf.Flow('test') - self.assertRaises(exc.DependencyFailure, f.add, task1, task2) - - def test_unordered_flow_two_dependent_tasks_two_different_calls(self): + def test_unordered_flow_two_tasks_two_different_calls(self): task1 = _task(name='task1', provides=['a']) task2 = _task(name='task2', requires=['a']) f = uf.Flow('test').add(task1) - self.assertRaises(exc.DependencyFailure, f.add, task2) + f.add(task2) + self.assertEqual(len(f), 2) + self.assertEqual(set(['a']), f.requires) + self.assertEqual(set(['a']), f.provides) - def test_unordered_flow_two_dependent_tasks_reverse_order(self): + def test_unordered_flow_two_tasks_reverse_order(self): task1 = _task(name='task1', provides=['a']) task2 = _task(name='task2', requires=['a']) - f = uf.Flow('test') - self.assertRaises(exc.DependencyFailure, f.add, task2, task1) - - def test_unordered_flow_two_dependent_tasks_reverse_order2(self): - task1 = _task(name='task1', provides=['a']) - task2 = _task(name='task2', requires=['a']) - f = uf.Flow('test').add(task2) - self.assertRaises(exc.DependencyFailure, f.add, task1) + f = uf.Flow('test').add(task2).add(task1) + self.assertEqual(len(f), 2) + self.assertEqual(set(['a']), f.requires) + self.assertEqual(set(['a']), f.provides) def test_unordered_flow_two_task_same_provide(self): task1 = _task(name='task1', provides=['a', 'b']) task2 = _task(name='task2', provides=['a', 'c']) f = uf.Flow('test') - self.assertRaises(exc.DependencyFailure, f.add, task2, task1) + f.add(task2, task1) + self.assertEqual(len(f), 2) def test_unordered_flow_with_retry(self): ret = retry.AlwaysRevert(requires=['a'], provides=['b']) @@ -106,3 +99,12 @@ class UnorderedFlowTest(test.TestCase): self.assertEqual(f.requires, set(['a'])) self.assertEqual(f.provides, set(['b'])) + + def test_unordered_flow_with_retry_fully_satisfies(self): + ret = retry.AlwaysRevert(provides=['b', 'a']) + f = uf.Flow('test', ret) + f.add(_task(name='task1', requires=['a'])) + self.assertIs(f.retry, ret) + self.assertEqual(ret.name, 'test_retry') + self.assertEqual(f.requires, set([])) + self.assertEqual(f.provides, set(['b', 'a'])) diff --git a/taskflow/tests/unit/test_flow_dependencies.py b/taskflow/tests/unit/test_flow_dependencies.py index 3f8e7509..47604e81 100644 --- a/taskflow/tests/unit/test_flow_dependencies.py +++ b/taskflow/tests/unit/test_flow_dependencies.py @@ -130,24 +130,27 @@ class FlowDependenciesTest(test.TestCase): def test_unordered_flow_provides_required_values(self): flow = uf.Flow('uf') - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn('task1', provides='x'), - utils.TaskOneArg('task2')) + flow.add(utils.TaskOneReturn('task1', provides='x'), + utils.TaskOneArg('task2')) + flow.add(utils.TaskOneReturn('task1', provides='x'), + utils.TaskOneArg('task2')) + self.assertEqual(set(['x']), flow.provides) + self.assertEqual(set(['x']), flow.requires) def test_unordered_flow_requires_provided_value_other_call(self): flow = uf.Flow('uf') flow.add(utils.TaskOneReturn('task1', provides='x')) - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneArg('task2')) + flow.add(utils.TaskOneArg('task2')) + self.assertEqual(set(['x']), flow.provides) + self.assertEqual(set(['x']), flow.requires) def test_unordered_flow_provides_required_value_other_call(self): flow = uf.Flow('uf') flow.add(utils.TaskOneArg('task2')) - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn('task1', provides='x')) + flow.add(utils.TaskOneReturn('task1', provides='x')) + self.assertEqual(2, len(flow)) + self.assertEqual(set(['x']), flow.provides) + self.assertEqual(set(['x']), flow.requires) def test_unordered_flow_multi_provides_and_requires_values(self): flow = uf.Flow('uf').add( @@ -161,16 +164,14 @@ class FlowDependenciesTest(test.TestCase): def test_unordered_flow_provides_same_values(self): flow = uf.Flow('uf').add(utils.TaskOneReturn(provides='x')) - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn(provides='x')) + flow.add(utils.TaskOneReturn(provides='x')) + self.assertEqual(set(['x']), flow.provides) def test_unordered_flow_provides_same_values_one_add(self): flow = uf.Flow('uf') - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn(provides='x'), - utils.TaskOneReturn(provides='x')) + flow.add(utils.TaskOneReturn(provides='x'), + utils.TaskOneReturn(provides='x')) + self.assertEqual(set(['x']), flow.provides) def test_nested_flows_requirements(self): flow = uf.Flow('uf').add( @@ -339,24 +340,22 @@ class FlowDependenciesTest(test.TestCase): self.assertEqual(flow.requires, set(['x', 'y', 'c'])) self.assertEqual(flow.provides, set(['a', 'b', 'z'])) - def test_unordered_flow_retry_and_task_dependency_conflict(self): + def test_unordered_flow_retry_and_task_same_requires_provides(self): flow = uf.Flow('uf', retry.AlwaysRevert('rt', requires=['x'])) - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn(provides=['x'])) + flow.add(utils.TaskOneReturn(provides=['x'])) + self.assertEqual(set(['x']), flow.requires) + self.assertEqual(set(['x']), flow.provides) def test_unordered_flow_retry_and_task_provide_same_value(self): flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['x'])) - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn('t1', provides=['x'])) + flow.add(utils.TaskOneReturn('t1', provides=['x'])) + self.assertEqual(set(['x']), flow.provides) def test_unordered_flow_retry_two_tasks_provide_same_value(self): flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['y'])) - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn('t1', provides=['x']), - utils.TaskOneReturn('t2', provides=['x'])) + flow.add(utils.TaskOneReturn('t1', provides=['x']), + utils.TaskOneReturn('t2', provides=['x'])) + self.assertEqual(set(['x', 'y']), flow.provides) def test_graph_flow_retry_and_task(self): flow = gf.Flow('gf', retry.AlwaysRevert('rt',