diff --git a/taskflow/patterns/unordered_flow.py b/taskflow/patterns/unordered_flow.py index b32fbc09..52bd286e 100644 --- a/taskflow/patterns/unordered_flow.py +++ b/taskflow/patterns/unordered_flow.py @@ -14,70 +14,25 @@ # License for the specific language governing permissions and limitations # under the License. -from taskflow import exceptions from taskflow import flow class Flow(flow.Flow): - """Unordered Flow pattern. + """Unordered flow pattern. A unordered (potentially nested) flow of *tasks/flows* that can be executed in any order as one unit and rolled back as one unit. - - NOTE(harlowja): Since the flow is unordered there can *not* be any - dependency between task/flow inputs (requirements) and - task/flow outputs (provided names/values). """ def __init__(self, name, retry=None): super(Flow, self).__init__(name, retry) # NOTE(imelnikov): A unordered flow is unordered, so we use # set instead of list to save children, children so that - # people using it don't depend on the ordering + # people using it don't depend on the ordering. self._children = set() def add(self, *items): """Adds a given task/tasks/flow/flows to this flow.""" - if not items: - return self - - # check that items don't provide anything that other - # part of flow provides or requires - provides = self.provides - old_requires = self.requires - for item in items: - item_provides = item.provides - bad_provs = item_provides & old_requires - if bad_provs: - raise exceptions.DependencyFailure( - "%(item)s provides %(oo)s that are required " - "by other item(s) of unordered flow %(flow)s" - % dict(item=item.name, flow=self.name, - oo=sorted(bad_provs))) - same_provides = provides & item.provides - if same_provides: - raise exceptions.DependencyFailure( - "%(item)s provides %(value)s but is already being" - " provided by %(flow)s and duplicate producers" - " are disallowed" - % dict(item=item.name, flow=self.name, - value=sorted(same_provides))) - provides |= item.provides - - # check that items don't require anything other children provides - if self.retry: - # NOTE(imelnikov): it is allowed to depend on value provided - # by retry controller of the flow - provides -= self.retry.provides - for item in items: - bad_reqs = provides & item.requires - if bad_reqs: - raise exceptions.DependencyFailure( - "%(item)s requires %(oo)s that are provided " - "by other item(s) of unordered flow %(flow)s" - % dict(item=item.name, flow=self.name, - oo=sorted(bad_reqs))) - self._children.update(items) return self diff --git a/taskflow/tests/unit/patterns/test_unordered_flow.py b/taskflow/tests/unit/patterns/test_unordered_flow.py index a4043fe2..e55cfad0 100644 --- a/taskflow/tests/unit/patterns/test_unordered_flow.py +++ b/taskflow/tests/unit/patterns/test_unordered_flow.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -from taskflow import exceptions as exc from taskflow.patterns import unordered_flow as uf from taskflow import retry from taskflow import test @@ -59,7 +58,7 @@ class UnorderedFlowTest(test.TestCase): self.assertEqual(f.requires, set(['a', 'b'])) self.assertEqual(f.provides, set(['c', 'd'])) - def test_unordered_flow_two_independent_tasks(self): + def test_unordered_flow_two_tasks(self): task1 = _task(name='task1') task2 = _task(name='task2') f = uf.Flow('test').add(task1, task2) @@ -68,35 +67,29 @@ class UnorderedFlowTest(test.TestCase): self.assertEqual(set(f), set([task1, task2])) self.assertEqual(list(f.iter_links()), []) - def test_unordered_flow_two_dependent_tasks(self): - task1 = _task(name='task1', provides=['a']) - task2 = _task(name='task2', requires=['a']) - f = uf.Flow('test') - self.assertRaises(exc.DependencyFailure, f.add, task1, task2) - - def test_unordered_flow_two_dependent_tasks_two_different_calls(self): + def test_unordered_flow_two_tasks_two_different_calls(self): task1 = _task(name='task1', provides=['a']) task2 = _task(name='task2', requires=['a']) f = uf.Flow('test').add(task1) - self.assertRaises(exc.DependencyFailure, f.add, task2) + f.add(task2) + self.assertEqual(len(f), 2) + self.assertEqual(set(['a']), f.requires) + self.assertEqual(set(['a']), f.provides) - def test_unordered_flow_two_dependent_tasks_reverse_order(self): + def test_unordered_flow_two_tasks_reverse_order(self): task1 = _task(name='task1', provides=['a']) task2 = _task(name='task2', requires=['a']) - f = uf.Flow('test') - self.assertRaises(exc.DependencyFailure, f.add, task2, task1) - - def test_unordered_flow_two_dependent_tasks_reverse_order2(self): - task1 = _task(name='task1', provides=['a']) - task2 = _task(name='task2', requires=['a']) - f = uf.Flow('test').add(task2) - self.assertRaises(exc.DependencyFailure, f.add, task1) + f = uf.Flow('test').add(task2).add(task1) + self.assertEqual(len(f), 2) + self.assertEqual(set(['a']), f.requires) + self.assertEqual(set(['a']), f.provides) def test_unordered_flow_two_task_same_provide(self): task1 = _task(name='task1', provides=['a', 'b']) task2 = _task(name='task2', provides=['a', 'c']) f = uf.Flow('test') - self.assertRaises(exc.DependencyFailure, f.add, task2, task1) + f.add(task2, task1) + self.assertEqual(len(f), 2) def test_unordered_flow_with_retry(self): ret = retry.AlwaysRevert(requires=['a'], provides=['b']) @@ -106,3 +99,12 @@ class UnorderedFlowTest(test.TestCase): self.assertEqual(f.requires, set(['a'])) self.assertEqual(f.provides, set(['b'])) + + def test_unordered_flow_with_retry_fully_satisfies(self): + ret = retry.AlwaysRevert(provides=['b', 'a']) + f = uf.Flow('test', ret) + f.add(_task(name='task1', requires=['a'])) + self.assertIs(f.retry, ret) + self.assertEqual(ret.name, 'test_retry') + self.assertEqual(f.requires, set([])) + self.assertEqual(f.provides, set(['b', 'a'])) diff --git a/taskflow/tests/unit/test_flow_dependencies.py b/taskflow/tests/unit/test_flow_dependencies.py index 3f8e7509..47604e81 100644 --- a/taskflow/tests/unit/test_flow_dependencies.py +++ b/taskflow/tests/unit/test_flow_dependencies.py @@ -130,24 +130,27 @@ class FlowDependenciesTest(test.TestCase): def test_unordered_flow_provides_required_values(self): flow = uf.Flow('uf') - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn('task1', provides='x'), - utils.TaskOneArg('task2')) + flow.add(utils.TaskOneReturn('task1', provides='x'), + utils.TaskOneArg('task2')) + flow.add(utils.TaskOneReturn('task1', provides='x'), + utils.TaskOneArg('task2')) + self.assertEqual(set(['x']), flow.provides) + self.assertEqual(set(['x']), flow.requires) def test_unordered_flow_requires_provided_value_other_call(self): flow = uf.Flow('uf') flow.add(utils.TaskOneReturn('task1', provides='x')) - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneArg('task2')) + flow.add(utils.TaskOneArg('task2')) + self.assertEqual(set(['x']), flow.provides) + self.assertEqual(set(['x']), flow.requires) def test_unordered_flow_provides_required_value_other_call(self): flow = uf.Flow('uf') flow.add(utils.TaskOneArg('task2')) - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn('task1', provides='x')) + flow.add(utils.TaskOneReturn('task1', provides='x')) + self.assertEqual(2, len(flow)) + self.assertEqual(set(['x']), flow.provides) + self.assertEqual(set(['x']), flow.requires) def test_unordered_flow_multi_provides_and_requires_values(self): flow = uf.Flow('uf').add( @@ -161,16 +164,14 @@ class FlowDependenciesTest(test.TestCase): def test_unordered_flow_provides_same_values(self): flow = uf.Flow('uf').add(utils.TaskOneReturn(provides='x')) - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn(provides='x')) + flow.add(utils.TaskOneReturn(provides='x')) + self.assertEqual(set(['x']), flow.provides) def test_unordered_flow_provides_same_values_one_add(self): flow = uf.Flow('uf') - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn(provides='x'), - utils.TaskOneReturn(provides='x')) + flow.add(utils.TaskOneReturn(provides='x'), + utils.TaskOneReturn(provides='x')) + self.assertEqual(set(['x']), flow.provides) def test_nested_flows_requirements(self): flow = uf.Flow('uf').add( @@ -339,24 +340,22 @@ class FlowDependenciesTest(test.TestCase): self.assertEqual(flow.requires, set(['x', 'y', 'c'])) self.assertEqual(flow.provides, set(['a', 'b', 'z'])) - def test_unordered_flow_retry_and_task_dependency_conflict(self): + def test_unordered_flow_retry_and_task_same_requires_provides(self): flow = uf.Flow('uf', retry.AlwaysRevert('rt', requires=['x'])) - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn(provides=['x'])) + flow.add(utils.TaskOneReturn(provides=['x'])) + self.assertEqual(set(['x']), flow.requires) + self.assertEqual(set(['x']), flow.provides) def test_unordered_flow_retry_and_task_provide_same_value(self): flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['x'])) - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn('t1', provides=['x'])) + flow.add(utils.TaskOneReturn('t1', provides=['x'])) + self.assertEqual(set(['x']), flow.provides) def test_unordered_flow_retry_two_tasks_provide_same_value(self): flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['y'])) - self.assertRaises(exceptions.DependencyFailure, - flow.add, - utils.TaskOneReturn('t1', provides=['x']), - utils.TaskOneReturn('t2', provides=['x'])) + flow.add(utils.TaskOneReturn('t1', provides=['x']), + utils.TaskOneReturn('t2', provides=['x'])) + self.assertEqual(set(['x', 'y']), flow.provides) def test_graph_flow_retry_and_task(self): flow = gf.Flow('gf', retry.AlwaysRevert('rt',