diff --git a/taskflow/flow.py b/taskflow/flow.py index 34ee58e2d..d94daee24 100644 --- a/taskflow/flow.py +++ b/taskflow/flow.py @@ -47,14 +47,32 @@ class Flow(object): - provides """ - def __init__(self, name): + def __init__(self, name, retry=None): self._name = six.text_type(name) + self._retry = retry + # If retry doesn't have a name, + # the name of its owner will be assigned + if self._retry: + self._retry_provides = self.retry.provides + self._retry_requires = self.retry.requires + if not self._retry.name: + self._retry.set_name(self.name + "_retry") + else: + self._retry_provides = set() + self._retry_requires = set() @property def name(self): """A non-unique name for this flow (human readable).""" return self._name + @property + def retry(self): + """A retry object that will affect control how (and if) this flow + retries while execution is underway. + """ + return self._retry + @abc.abstractmethod def __len__(self): """Returns how many items are in this flow.""" diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 1c9a9be2f..5eb73ca8b 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -37,8 +37,8 @@ class Flow(flow.Flow): Note: Cyclic dependencies are not allowed. """ - def __init__(self, name): - super(Flow, self).__init__(name) + def __init__(self, name, retry=None): + super(Flow, self).__init__(name, retry) self._graph = nx.freeze(nx.DiGraph()) def _validate(self, graph=None): @@ -107,6 +107,11 @@ class Flow(flow.Flow): for value in node.provides: provided[value] = node + if self.retry: + update_requirements(self.retry) + provided.update(dict((k, + self.retry) for k in self._retry_provides)) + # NOTE(harlowja): Add items and edges to a temporary copy of the # underlying graph and only if that is successful added to do we then # swap with the underlying graph. @@ -123,6 +128,15 @@ class Flow(flow.Flow): % dict(item=item.name, flow=provided[value].name, value=value)) + if value in self._retry_requires: + raise exc.InvariantViolation( + "Flows retry controller %(retry)s requires %(value)s " + "but item %(item)s being added to the flow produces " + "that item, this creates a cyclic dependency and is " + "disallowed" + % dict(item=item.name, + retry=self.retry.name, + value=value)) provided[value] = item for value in item.requires: @@ -149,6 +163,7 @@ class Flow(flow.Flow): @property def provides(self): provides = set() + provides.update(self._retry_provides) for subflow in self: provides.update(subflow.provides) return provides @@ -156,6 +171,7 @@ class Flow(flow.Flow): @property def requires(self): requires = set() + requires.update(self._retry_requires) for subflow in self: requires.update(subflow.requires) return requires - self.provides diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index c364541aa..5ffd2116c 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -29,8 +29,8 @@ class Flow(flow.Flow): depend on outputs (provided names/values) of tasks/flows that follow it. """ - def __init__(self, name): - super(Flow, self).__init__(name) + def __init__(self, name, retry=None): + super(Flow, self).__init__(name, retry) self._children = [] def add(self, *items): @@ -77,6 +77,7 @@ class Flow(flow.Flow): @property def provides(self): provides = set() + provides.update(self._retry_provides) for subflow in self._children: provides.update(subflow.provides) return provides @@ -85,6 +86,8 @@ class Flow(flow.Flow): def requires(self): requires = set() provides = set() + requires.update(self._retry_requires) + provides.update(self._retry_provides) for subflow in self._children: requires.update(subflow.requires - provides) provides.update(subflow.provides) diff --git a/taskflow/patterns/unordered_flow.py b/taskflow/patterns/unordered_flow.py index 672a5335b..efa226ea4 100644 --- a/taskflow/patterns/unordered_flow.py +++ b/taskflow/patterns/unordered_flow.py @@ -29,8 +29,8 @@ class Flow(flow.Flow): task/flow outputs (provided names/values). """ - def __init__(self, name): - super(Flow, self).__init__(name) + def __init__(self, name, retry=None): + super(Flow, self).__init__(name, retry) # NOTE(imelnikov): A unordered flow is unordered, so we use # set instead of list to save children, children so that # people using it don't depend on the ordering @@ -43,7 +43,10 @@ class Flow(flow.Flow): # NOTE(harlowja): check that items to be added are actually # independent. - provides = self.provides + provides = set() + for subflow in self: + provides.update(subflow.provides) + old_requires = self.requires for item in items: item_provides = item.provides @@ -54,7 +57,7 @@ class Flow(flow.Flow): "by other item(s) of unordered flow %(flow)s" % dict(item=item.name, flow=self.name, oo=sorted(bad_provs))) - same_provides = provides & item.provides + same_provides = (provides | self._retry_provides) & item.provides if same_provides: raise exceptions.DependencyFailure( "%(item)s provides %(value)s but is already being" @@ -79,6 +82,7 @@ class Flow(flow.Flow): @property def provides(self): provides = set() + provides.update(self._retry_provides) for subflow in self: provides.update(subflow.provides) return provides @@ -88,6 +92,8 @@ class Flow(flow.Flow): requires = set() for subflow in self: requires.update(subflow.requires) + requires.update(self._retry_requires) + requires -= self._retry_provides return requires def __len__(self): diff --git a/taskflow/tests/unit/test_flow_dependencies.py b/taskflow/tests/unit/test_flow_dependencies.py index edb21e13d..cec2c6dde 100644 --- a/taskflow/tests/unit/test_flow_dependencies.py +++ b/taskflow/tests/unit/test_flow_dependencies.py @@ -19,6 +19,7 @@ from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf from taskflow import exceptions +from taskflow import retry from taskflow import test from taskflow.tests import utils @@ -281,3 +282,162 @@ class FlowDependenciesTest(test.TestCase): utils.TaskOneArgOneReturn, requires='a', provides='a') + + def test_retry_in_linear_flow_no_requirements_no_provides(self): + flow = lf.Flow('lf', retry.AlwaysRevert('rt')) + self.assertEqual(flow.requires, set()) + self.assertEqual(flow.provides, set()) + + def test_retry_in_linear_flow_with_requirements(self): + flow = lf.Flow('lf', retry.AlwaysRevert('rt', requires=['x', 'y'])) + self.assertEqual(flow.requires, set(['x', 'y'])) + self.assertEqual(flow.provides, set()) + + def test_retry_in_linear_flow_with_provides(self): + flow = lf.Flow('lf', retry.AlwaysRevert('rt', provides=['x', 'y'])) + self.assertEqual(flow.requires, set()) + self.assertEqual(flow.provides, set(['x', 'y'])) + + def test_retry_in_linear_flow_requires_and_provides(self): + flow = lf.Flow('lf', retry.AlwaysRevert('rt', + requires=['x', 'y'], + provides=['a', 'b'])) + self.assertEqual(flow.requires, set(['x', 'y'])) + self.assertEqual(flow.provides, set(['a', 'b'])) + + def test_retry_requires_and_provides_same_value(self): + self.assertRaises(exceptions.InvariantViolation, + retry.AlwaysRevert, + 'rt', requires=['x', 'y'], provides=['x', 'y']) + + def test_retry_in_unordered_flow_no_requirements_no_provides(self): + flow = uf.Flow('uf', retry.AlwaysRevert('rt')) + self.assertEqual(flow.requires, set()) + self.assertEqual(flow.provides, set()) + + def test_retry_in_unordered_flow_with_requirements(self): + flow = uf.Flow('uf', retry.AlwaysRevert('rt', requires=['x', 'y'])) + self.assertEqual(flow.requires, set(['x', 'y'])) + self.assertEqual(flow.provides, set()) + + def test_retry_in_unordered_flow_with_provides(self): + flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['x', 'y'])) + self.assertEqual(flow.requires, set()) + self.assertEqual(flow.provides, set(['x', 'y'])) + + def test_retry_in_unordered_flow_requires_and_provides(self): + flow = uf.Flow('uf', retry.AlwaysRevert('rt', + requires=['x', 'y'], + provides=['a', 'b'])) + self.assertEqual(flow.requires, set(['x', 'y'])) + self.assertEqual(flow.provides, set(['a', 'b'])) + + def test_retry_in_graph_flow_no_requirements_no_provides(self): + flow = gf.Flow('gf', retry.AlwaysRevert('rt')) + self.assertEqual(flow.requires, set()) + self.assertEqual(flow.provides, set()) + + def test_retry_in_graph_flow_with_requirements(self): + flow = gf.Flow('gf', retry.AlwaysRevert('rt', requires=['x', 'y'])) + self.assertEqual(flow.requires, set(['x', 'y'])) + self.assertEqual(flow.provides, set()) + + def test_retry_in_graph_flow_with_provides(self): + flow = gf.Flow('gf', retry.AlwaysRevert('rt', provides=['x', 'y'])) + self.assertEqual(flow.requires, set()) + self.assertEqual(flow.provides, set(['x', 'y'])) + + def test_retry_in_graph_flow_requires_and_provides(self): + flow = gf.Flow('gf', retry.AlwaysRevert('rt', + requires=['x', 'y'], + provides=['a', 'b'])) + self.assertEqual(flow.requires, set(['x', 'y'])) + self.assertEqual(flow.provides, set(['a', 'b'])) + + def test_linear_flow_retry_and_task(self): + flow = lf.Flow('lf', retry.AlwaysRevert('rt', + requires=['x', 'y'], + provides=['a', 'b'])) + flow.add(utils.TaskMultiArgOneReturn(rebind=['a', 'x', 'c'], + provides=['z'])) + + self.assertEqual(flow.requires, set(['x', 'y', 'c'])) + self.assertEqual(flow.provides, set(['a', 'b', 'z'])) + + def test_linear_flow_retry_and_task_dependency_conflict(self): + flow = lf.Flow('lf', retry.AlwaysRevert('rt', requires=['x'])) + self.assertRaises(exceptions.InvariantViolation, + flow.add, + utils.TaskOneReturn(provides=['x'])) + + def test_linear_flow_retry_and_task_provide_same_value(self): + flow = lf.Flow('lf', retry.AlwaysRevert('rt', provides=['x'])) + self.assertRaises(exceptions.DependencyFailure, + flow.add, + utils.TaskOneReturn('t1', provides=['x'])) + + def test_unordered_flow_retry_and_task(self): + flow = uf.Flow('uf', retry.AlwaysRevert('rt', + requires=['x', 'y'], + provides=['a', 'b'])) + flow.add(utils.TaskMultiArgOneReturn(rebind=['a', 'x', 'c'], + provides=['z'])) + + self.assertEqual(flow.requires, set(['x', 'y', 'c'])) + self.assertEqual(flow.provides, set(['a', 'b', 'z'])) + + def test_unordered_flow_retry_and_task_dependency_conflict(self): + flow = uf.Flow('uf', retry.AlwaysRevert('rt', requires=['x'])) + self.assertRaises(exceptions.InvariantViolation, + flow.add, + utils.TaskOneReturn(provides=['x'])) + + def test_unordered_flow_retry_and_task_provide_same_value(self): + flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['x'])) + self.assertRaises(exceptions.DependencyFailure, + flow.add, + utils.TaskOneReturn('t1', provides=['x'])) + + def test_unordered_flow_retry_two_tasks_provide_same_value(self): + flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['y'])) + self.assertRaises(exceptions.DependencyFailure, + flow.add, + utils.TaskOneReturn('t1', provides=['x']), + utils.TaskOneReturn('t2', provides=['x'])) + + def test_graph_flow_retry_and_task(self): + flow = gf.Flow('gf', retry.AlwaysRevert('rt', + requires=['x', 'y'], + provides=['a', 'b'])) + flow.add(utils.TaskMultiArgOneReturn(rebind=['a', 'x', 'c'], + provides=['z'])) + + self.assertEqual(flow.requires, set(['x', 'y', 'c'])) + self.assertEqual(flow.provides, set(['a', 'b', 'z'])) + + def test_graph_flow_retry_and_task_dependency_conflict(self): + flow = gf.Flow('gf', retry.AlwaysRevert('rt', requires=['x'])) + self.assertRaises(exceptions.InvariantViolation, + flow.add, + utils.TaskOneReturn(provides=['x'])) + + def test_graph_flow_retry_and_task_provide_same_value(self): + flow = gf.Flow('gf', retry.AlwaysRevert('rt', provides=['x'])) + self.assertRaises(exceptions.DependencyFailure, + flow.add, + utils.TaskOneReturn('t1', provides=['x'])) + + def test_two_retries_provide_same_values_in_nested_flows(self): + flow = lf.Flow('lf', retry.AlwaysRevert('rt1', provides=['x'])) + self.assertRaises(exceptions.DependencyFailure, + flow.add, + lf.Flow('lf1', retry.AlwaysRevert('rt2', + provides=['x']))) + + def test_two_retries_provide_same_values(self): + flow = lf.Flow('lf').add( + lf.Flow('lf1', retry.AlwaysRevert('rt1', provides=['x']))) + self.assertRaises(exceptions.DependencyFailure, + flow.add, + lf.Flow('lf2', retry.AlwaysRevert('rt2', + provides=['x'])))