Add retry to Flow patterns

Implement dependency checks when adding items to flows:
- retry can't be depended on tasks from it's subflow.
- retry can't provide same values as tasks or other retries

Change-Id: Iffa8e673fd2de39407ae22cd38ad523d484cbba7
This commit is contained in:
Anastasia Karpinska
2013-12-26 09:16:10 +02:00
parent c698f848d8
commit 1222bda50d
5 changed files with 212 additions and 9 deletions

View File

@@ -47,14 +47,32 @@ class Flow(object):
- provides - provides
""" """
def __init__(self, name): def __init__(self, name, retry=None):
self._name = six.text_type(name) self._name = six.text_type(name)
self._retry = retry
# If retry doesn't have a name,
# the name of its owner will be assigned
if self._retry:
self._retry_provides = self.retry.provides
self._retry_requires = self.retry.requires
if not self._retry.name:
self._retry.set_name(self.name + "_retry")
else:
self._retry_provides = set()
self._retry_requires = set()
@property @property
def name(self): def name(self):
"""A non-unique name for this flow (human readable).""" """A non-unique name for this flow (human readable)."""
return self._name return self._name
@property
def retry(self):
"""A retry object that will affect control how (and if) this flow
retries while execution is underway.
"""
return self._retry
@abc.abstractmethod @abc.abstractmethod
def __len__(self): def __len__(self):
"""Returns how many items are in this flow.""" """Returns how many items are in this flow."""

View File

@@ -37,8 +37,8 @@ class Flow(flow.Flow):
Note: Cyclic dependencies are not allowed. Note: Cyclic dependencies are not allowed.
""" """
def __init__(self, name): def __init__(self, name, retry=None):
super(Flow, self).__init__(name) super(Flow, self).__init__(name, retry)
self._graph = nx.freeze(nx.DiGraph()) self._graph = nx.freeze(nx.DiGraph())
def _validate(self, graph=None): def _validate(self, graph=None):
@@ -107,6 +107,11 @@ class Flow(flow.Flow):
for value in node.provides: for value in node.provides:
provided[value] = node provided[value] = node
if self.retry:
update_requirements(self.retry)
provided.update(dict((k,
self.retry) for k in self._retry_provides))
# NOTE(harlowja): Add items and edges to a temporary copy of the # NOTE(harlowja): Add items and edges to a temporary copy of the
# underlying graph and only if that is successful added to do we then # underlying graph and only if that is successful added to do we then
# swap with the underlying graph. # swap with the underlying graph.
@@ -123,6 +128,15 @@ class Flow(flow.Flow):
% dict(item=item.name, % dict(item=item.name,
flow=provided[value].name, flow=provided[value].name,
value=value)) value=value))
if value in self._retry_requires:
raise exc.InvariantViolation(
"Flows retry controller %(retry)s requires %(value)s "
"but item %(item)s being added to the flow produces "
"that item, this creates a cyclic dependency and is "
"disallowed"
% dict(item=item.name,
retry=self.retry.name,
value=value))
provided[value] = item provided[value] = item
for value in item.requires: for value in item.requires:
@@ -149,6 +163,7 @@ class Flow(flow.Flow):
@property @property
def provides(self): def provides(self):
provides = set() provides = set()
provides.update(self._retry_provides)
for subflow in self: for subflow in self:
provides.update(subflow.provides) provides.update(subflow.provides)
return provides return provides
@@ -156,6 +171,7 @@ class Flow(flow.Flow):
@property @property
def requires(self): def requires(self):
requires = set() requires = set()
requires.update(self._retry_requires)
for subflow in self: for subflow in self:
requires.update(subflow.requires) requires.update(subflow.requires)
return requires - self.provides return requires - self.provides

View File

@@ -29,8 +29,8 @@ class Flow(flow.Flow):
depend on outputs (provided names/values) of tasks/flows that follow it. depend on outputs (provided names/values) of tasks/flows that follow it.
""" """
def __init__(self, name): def __init__(self, name, retry=None):
super(Flow, self).__init__(name) super(Flow, self).__init__(name, retry)
self._children = [] self._children = []
def add(self, *items): def add(self, *items):
@@ -77,6 +77,7 @@ class Flow(flow.Flow):
@property @property
def provides(self): def provides(self):
provides = set() provides = set()
provides.update(self._retry_provides)
for subflow in self._children: for subflow in self._children:
provides.update(subflow.provides) provides.update(subflow.provides)
return provides return provides
@@ -85,6 +86,8 @@ class Flow(flow.Flow):
def requires(self): def requires(self):
requires = set() requires = set()
provides = set() provides = set()
requires.update(self._retry_requires)
provides.update(self._retry_provides)
for subflow in self._children: for subflow in self._children:
requires.update(subflow.requires - provides) requires.update(subflow.requires - provides)
provides.update(subflow.provides) provides.update(subflow.provides)

View File

@@ -29,8 +29,8 @@ class Flow(flow.Flow):
task/flow outputs (provided names/values). task/flow outputs (provided names/values).
""" """
def __init__(self, name): def __init__(self, name, retry=None):
super(Flow, self).__init__(name) super(Flow, self).__init__(name, retry)
# NOTE(imelnikov): A unordered flow is unordered, so we use # NOTE(imelnikov): A unordered flow is unordered, so we use
# set instead of list to save children, children so that # set instead of list to save children, children so that
# people using it don't depend on the ordering # people using it don't depend on the ordering
@@ -43,7 +43,10 @@ class Flow(flow.Flow):
# NOTE(harlowja): check that items to be added are actually # NOTE(harlowja): check that items to be added are actually
# independent. # independent.
provides = self.provides provides = set()
for subflow in self:
provides.update(subflow.provides)
old_requires = self.requires old_requires = self.requires
for item in items: for item in items:
item_provides = item.provides item_provides = item.provides
@@ -54,7 +57,7 @@ class Flow(flow.Flow):
"by other item(s) of unordered flow %(flow)s" "by other item(s) of unordered flow %(flow)s"
% dict(item=item.name, flow=self.name, % dict(item=item.name, flow=self.name,
oo=sorted(bad_provs))) oo=sorted(bad_provs)))
same_provides = provides & item.provides same_provides = (provides | self._retry_provides) & item.provides
if same_provides: if same_provides:
raise exceptions.DependencyFailure( raise exceptions.DependencyFailure(
"%(item)s provides %(value)s but is already being" "%(item)s provides %(value)s but is already being"
@@ -79,6 +82,7 @@ class Flow(flow.Flow):
@property @property
def provides(self): def provides(self):
provides = set() provides = set()
provides.update(self._retry_provides)
for subflow in self: for subflow in self:
provides.update(subflow.provides) provides.update(subflow.provides)
return provides return provides
@@ -88,6 +92,8 @@ class Flow(flow.Flow):
requires = set() requires = set()
for subflow in self: for subflow in self:
requires.update(subflow.requires) requires.update(subflow.requires)
requires.update(self._retry_requires)
requires -= self._retry_provides
return requires return requires
def __len__(self): def __len__(self):

View File

@@ -19,6 +19,7 @@ from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf from taskflow.patterns import unordered_flow as uf
from taskflow import exceptions from taskflow import exceptions
from taskflow import retry
from taskflow import test from taskflow import test
from taskflow.tests import utils from taskflow.tests import utils
@@ -281,3 +282,162 @@ class FlowDependenciesTest(test.TestCase):
utils.TaskOneArgOneReturn, utils.TaskOneArgOneReturn,
requires='a', requires='a',
provides='a') provides='a')
def test_retry_in_linear_flow_no_requirements_no_provides(self):
flow = lf.Flow('lf', retry.AlwaysRevert('rt'))
self.assertEqual(flow.requires, set())
self.assertEqual(flow.provides, set())
def test_retry_in_linear_flow_with_requirements(self):
flow = lf.Flow('lf', retry.AlwaysRevert('rt', requires=['x', 'y']))
self.assertEqual(flow.requires, set(['x', 'y']))
self.assertEqual(flow.provides, set())
def test_retry_in_linear_flow_with_provides(self):
flow = lf.Flow('lf', retry.AlwaysRevert('rt', provides=['x', 'y']))
self.assertEqual(flow.requires, set())
self.assertEqual(flow.provides, set(['x', 'y']))
def test_retry_in_linear_flow_requires_and_provides(self):
flow = lf.Flow('lf', retry.AlwaysRevert('rt',
requires=['x', 'y'],
provides=['a', 'b']))
self.assertEqual(flow.requires, set(['x', 'y']))
self.assertEqual(flow.provides, set(['a', 'b']))
def test_retry_requires_and_provides_same_value(self):
self.assertRaises(exceptions.InvariantViolation,
retry.AlwaysRevert,
'rt', requires=['x', 'y'], provides=['x', 'y'])
def test_retry_in_unordered_flow_no_requirements_no_provides(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt'))
self.assertEqual(flow.requires, set())
self.assertEqual(flow.provides, set())
def test_retry_in_unordered_flow_with_requirements(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt', requires=['x', 'y']))
self.assertEqual(flow.requires, set(['x', 'y']))
self.assertEqual(flow.provides, set())
def test_retry_in_unordered_flow_with_provides(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['x', 'y']))
self.assertEqual(flow.requires, set())
self.assertEqual(flow.provides, set(['x', 'y']))
def test_retry_in_unordered_flow_requires_and_provides(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt',
requires=['x', 'y'],
provides=['a', 'b']))
self.assertEqual(flow.requires, set(['x', 'y']))
self.assertEqual(flow.provides, set(['a', 'b']))
def test_retry_in_graph_flow_no_requirements_no_provides(self):
flow = gf.Flow('gf', retry.AlwaysRevert('rt'))
self.assertEqual(flow.requires, set())
self.assertEqual(flow.provides, set())
def test_retry_in_graph_flow_with_requirements(self):
flow = gf.Flow('gf', retry.AlwaysRevert('rt', requires=['x', 'y']))
self.assertEqual(flow.requires, set(['x', 'y']))
self.assertEqual(flow.provides, set())
def test_retry_in_graph_flow_with_provides(self):
flow = gf.Flow('gf', retry.AlwaysRevert('rt', provides=['x', 'y']))
self.assertEqual(flow.requires, set())
self.assertEqual(flow.provides, set(['x', 'y']))
def test_retry_in_graph_flow_requires_and_provides(self):
flow = gf.Flow('gf', retry.AlwaysRevert('rt',
requires=['x', 'y'],
provides=['a', 'b']))
self.assertEqual(flow.requires, set(['x', 'y']))
self.assertEqual(flow.provides, set(['a', 'b']))
def test_linear_flow_retry_and_task(self):
flow = lf.Flow('lf', retry.AlwaysRevert('rt',
requires=['x', 'y'],
provides=['a', 'b']))
flow.add(utils.TaskMultiArgOneReturn(rebind=['a', 'x', 'c'],
provides=['z']))
self.assertEqual(flow.requires, set(['x', 'y', 'c']))
self.assertEqual(flow.provides, set(['a', 'b', 'z']))
def test_linear_flow_retry_and_task_dependency_conflict(self):
flow = lf.Flow('lf', retry.AlwaysRevert('rt', requires=['x']))
self.assertRaises(exceptions.InvariantViolation,
flow.add,
utils.TaskOneReturn(provides=['x']))
def test_linear_flow_retry_and_task_provide_same_value(self):
flow = lf.Flow('lf', retry.AlwaysRevert('rt', provides=['x']))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn('t1', provides=['x']))
def test_unordered_flow_retry_and_task(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt',
requires=['x', 'y'],
provides=['a', 'b']))
flow.add(utils.TaskMultiArgOneReturn(rebind=['a', 'x', 'c'],
provides=['z']))
self.assertEqual(flow.requires, set(['x', 'y', 'c']))
self.assertEqual(flow.provides, set(['a', 'b', 'z']))
def test_unordered_flow_retry_and_task_dependency_conflict(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt', requires=['x']))
self.assertRaises(exceptions.InvariantViolation,
flow.add,
utils.TaskOneReturn(provides=['x']))
def test_unordered_flow_retry_and_task_provide_same_value(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['x']))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn('t1', provides=['x']))
def test_unordered_flow_retry_two_tasks_provide_same_value(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['y']))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn('t1', provides=['x']),
utils.TaskOneReturn('t2', provides=['x']))
def test_graph_flow_retry_and_task(self):
flow = gf.Flow('gf', retry.AlwaysRevert('rt',
requires=['x', 'y'],
provides=['a', 'b']))
flow.add(utils.TaskMultiArgOneReturn(rebind=['a', 'x', 'c'],
provides=['z']))
self.assertEqual(flow.requires, set(['x', 'y', 'c']))
self.assertEqual(flow.provides, set(['a', 'b', 'z']))
def test_graph_flow_retry_and_task_dependency_conflict(self):
flow = gf.Flow('gf', retry.AlwaysRevert('rt', requires=['x']))
self.assertRaises(exceptions.InvariantViolation,
flow.add,
utils.TaskOneReturn(provides=['x']))
def test_graph_flow_retry_and_task_provide_same_value(self):
flow = gf.Flow('gf', retry.AlwaysRevert('rt', provides=['x']))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn('t1', provides=['x']))
def test_two_retries_provide_same_values_in_nested_flows(self):
flow = lf.Flow('lf', retry.AlwaysRevert('rt1', provides=['x']))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
lf.Flow('lf1', retry.AlwaysRevert('rt2',
provides=['x'])))
def test_two_retries_provide_same_values(self):
flow = lf.Flow('lf').add(
lf.Flow('lf1', retry.AlwaysRevert('rt1', provides=['x'])))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
lf.Flow('lf2', retry.AlwaysRevert('rt2',
provides=['x'])))