diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 650d93fd..9f85efe9 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -40,6 +40,7 @@ class Flow(flow.Flow): # NOTE(imelnikov): we add item to the end of flow, so it should # not provide anything previous items of the flow require requires = self.requires + provides = self.provides for item in items: requires |= item.requires out_of_order = requires & item.provides @@ -49,6 +50,15 @@ class Flow(flow.Flow): "by previous item(s) of linear flow %(flow)s" % dict(item=item.name, flow=self.name, oo=sorted(out_of_order))) + same_provides = provides & item.provides + if same_provides: + raise exceptions.DependencyFailure( + "%(item)s provides %(value)s but is already being" + " provided by %(flow)s and duplicate producers" + " are disallowed" + % dict(item=item.name, flow=self.name, + value=sorted(same_provides))) + provides |= item.provides self._children.extend(items) return self diff --git a/taskflow/patterns/unordered_flow.py b/taskflow/patterns/unordered_flow.py index 294176e6..200fbdd0 100644 --- a/taskflow/patterns/unordered_flow.py +++ b/taskflow/patterns/unordered_flow.py @@ -52,7 +52,15 @@ class Flow(flow.Flow): "by other item(s) of unordered flow %(flow)s" % dict(item=item.name, flow=self.name, oo=sorted(bad_provs))) - provides |= item_provides + same_provides = provides & item.provides + if same_provides: + raise exceptions.DependencyFailure( + "%(item)s provides %(value)s but is already being" + " provided by %(flow)s and duplicate producers" + " are disallowed" + % dict(item=item.name, flow=self.name, + value=sorted(same_provides))) + provides |= item.provides for item in items: bad_reqs = provides & item.requires diff --git a/taskflow/tests/unit/test_flow_dependencies.py b/taskflow/tests/unit/test_flow_dependencies.py index 0bdb7abf..cf2f8f9a 100644 --- a/taskflow/tests/unit/test_flow_dependencies.py +++ b/taskflow/tests/unit/test_flow_dependencies.py @@ -110,12 +110,25 @@ class FlowDependenciesTest(test.TestCase): self.assertEqual(flow.provides, set(['x', 'y', 'q', 'i', 'j', 'k'])) def test_linear_flow_self_requires(self): - flow = lf.Flow('uf') + flow = lf.Flow('lf') self.assertRaises(exceptions.InvariantViolationException, flow.add, utils.TaskNoRequiresNoReturns(rebind=['x'], provides='x')) + def test_linear_flow_provides_same_values(self): + flow = lf.Flow('lf').add(utils.TaskOneReturn(provides='x')) + self.assertRaises(exceptions.DependencyFailure, + flow.add, + utils.TaskOneReturn(provides='x')) + + def test_linear_flow_provides_same_values_one_add(self): + flow = lf.Flow('lf') + self.assertRaises(exceptions.DependencyFailure, + flow.add, + utils.TaskOneReturn(provides='x'), + utils.TaskOneReturn(provides='x')) + def test_unordered_flow_without_dependencies(self): flow = uf.Flow('uf').add( utils.TaskNoRequiresNoReturns('task1'), @@ -182,6 +195,19 @@ class FlowDependenciesTest(test.TestCase): self.assertEqual(flow.requires, set(['a', 'b', 'c', 'x', 'y', 'z'])) self.assertEqual(flow.provides, set(['d', 'e', 'f', 'i', 'j', 'k'])) + def test_unordered_flow_provides_same_values(self): + flow = uf.Flow('uf').add(utils.TaskOneReturn(provides='x')) + self.assertRaises(exceptions.DependencyFailure, + flow.add, + utils.TaskOneReturn(provides='x')) + + def test_unordered_flow_provides_same_values_one_add(self): + flow = uf.Flow('uf') + self.assertRaises(exceptions.DependencyFailure, + flow.add, + utils.TaskOneReturn(provides='x'), + utils.TaskOneReturn(provides='x')) + def test_nested_flows_requirements(self): flow = uf.Flow('uf').add( lf.Flow('lf').add( @@ -196,6 +222,13 @@ class FlowDependenciesTest(test.TestCase): self.assertEqual(flow.requires, set(['a', 'b', 'c'])) self.assertEqual(flow.provides, set(['x', 'y', 'z', 'q'])) + def test_nested_flows_provides_same_values(self): + flow = lf.Flow('lf').add( + uf.Flow('uf').add(utils.TaskOneReturn(provides='x'))) + self.assertRaises(exceptions.DependencyFailure, + flow.add, + gf.Flow('gf').add(utils.TaskOneReturn(provides='x'))) + def test_graph_flow_without_dependencies(self): flow = gf.Flow('gf').add( utils.TaskNoRequiresNoReturns('task1'),