Add retry to execution graph

Change-Id: I8d9a608257fee66c462bd45150383774a93e90b0
This commit is contained in:
Anastasia Karpinska
2014-01-08 16:30:42 +02:00
parent 1222bda50d
commit 030554d86c
2 changed files with 126 additions and 0 deletions

View File

@@ -22,6 +22,7 @@ from taskflow import exceptions as exc
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import retry
from taskflow import test
from taskflow.tests import utils as t_utils
@@ -183,3 +184,110 @@ class FlattenTest(test.TestCase):
self.assertRaisesRegexp(exc.InvariantViolation,
'^Tasks with duplicate names',
f_utils.flatten, flo)
def test_flatten_retry_in_linear_flow(self):
flo = lf.Flow("test", retry.AlwaysRevert("c"))
g = f_utils.flatten(flo)
self.assertEqual(1, len(g))
self.assertEqual(0, g.number_of_edges())
def test_flatten_retry_in_unordered_flow(self):
flo = uf.Flow("test", retry.AlwaysRevert("c"))
g = f_utils.flatten(flo)
self.assertEqual(1, len(g))
self.assertEqual(0, g.number_of_edges())
def test_flatten_retry_in_graph_flow(self):
flo = gf.Flow("test", retry.AlwaysRevert("c"))
g = f_utils.flatten(flo)
self.assertEqual(1, len(g))
self.assertEqual(0, g.number_of_edges())
def test_flatten_retry_in_nested_flows(self):
c1 = retry.AlwaysRevert("c1")
c2 = retry.AlwaysRevert("c2")
flo = lf.Flow("test", c1).add(lf.Flow("test2", c2))
g = f_utils.flatten(flo)
self.assertEqual(2, len(g))
self.assertEqual(1, g.number_of_edges())
self.assertEqual(set([c1]),
set(g_utils.get_no_predecessors(g)))
self.assertEqual(set([c2]),
set(g_utils.get_no_successors(g)))
def test_flatten_retry_in_linear_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
a, b = _make_many(2)
flo = lf.Flow("test", c).add(a, b)
g = f_utils.flatten(flo)
self.assertEqual(3, len(g))
self.assertEqual(2, g.number_of_edges())
self.assertEqual(set([c]),
set(g_utils.get_no_predecessors(g)))
self.assertEqual(set([b]),
set(g_utils.get_no_successors(g)))
self.assertEqual(c, g.node[a]['retry'])
self.assertEqual(c, g.node[b]['retry'])
def test_flatten_retry_in_unordered_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
a, b = _make_many(2)
flo = uf.Flow("test", c).add(a, b)
g = f_utils.flatten(flo)
self.assertEqual(3, len(g))
self.assertEqual(2, g.number_of_edges())
self.assertEqual(set([c]),
set(g_utils.get_no_predecessors(g)))
self.assertEqual(set([a, b]),
set(g_utils.get_no_successors(g)))
self.assertEqual(c, g.node[a]['retry'])
self.assertEqual(c, g.node[b]['retry'])
def test_flatten_retry_in_graph_flow_with_tasks(self):
c = retry.AlwaysRevert("cp")
a, b, d = _make_many(3)
flo = gf.Flow("test", c).add(a, b, d).link(b, d)
g = f_utils.flatten(flo)
self.assertEqual(4, len(g))
self.assertEqual(3, g.number_of_edges())
self.assertEqual(set([c]),
set(g_utils.get_no_predecessors(g)))
self.assertEqual(set([a, d]),
set(g_utils.get_no_successors(g)))
self.assertEqual(c, g.node[a]['retry'])
self.assertEqual(c, g.node[b]['retry'])
self.assertEqual(c, g.node[d]['retry'])
def test_flatten_retries_hierarchy(self):
c1 = retry.AlwaysRevert("cp1")
c2 = retry.AlwaysRevert("cp2")
a, b, c, d = _make_many(4)
flo = lf.Flow("test", c1).add(
a,
lf.Flow("test", c2).add(b, c),
d)
g = f_utils.flatten(flo)
self.assertEqual(6, len(g))
self.assertEqual(5, g.number_of_edges())
self.assertEqual(c1, g.node[a]['retry'])
self.assertEqual(c1, g.node[d]['retry'])
self.assertEqual(c2, g.node[b]['retry'])
self.assertEqual(c2, g.node[c]['retry'])
self.assertEqual(c1, g.node[c2]['retry'])
self.assertEqual(None, g.node[c1].get('retry'))
def test_flatten_retry_subflows_hierarchy(self):
c1 = retry.AlwaysRevert("cp1")
a, b, c, d = _make_many(4)
flo = lf.Flow("test", c1).add(
a,
lf.Flow("test").add(b, c),
d)
g = f_utils.flatten(flo)
self.assertEqual(5, len(g))
self.assertEqual(4, g.number_of_edges())
self.assertEqual(c1, g.node[a]['retry'])
self.assertEqual(c1, g.node[d]['retry'])
self.assertEqual(c1, g.node[b]['retry'])
self.assertEqual(c1, g.node[c]['retry'])
self.assertEqual(None, g.node[c1].get('retry'))

View File

@@ -20,6 +20,7 @@ import threading
import networkx as nx
from taskflow import exceptions
from taskflow import flow
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
@@ -87,6 +88,21 @@ class Flattener(object):
else:
return None
def _connect_retry(self, retry, graph):
graph.add_node(retry)
# All graph nodes that has not predecessors should be depended on its
# retry
for n in gu.get_no_predecessors(graph):
if n != retry:
# modified that the same copy isn't modified.
graph.add_edge(retry, n, FLATTEN_EDGE_DATA.copy())
# Add link to retry for each node of subgraph that hasn't
# a parent retry
for n in graph.nodes_iter():
if n != retry and 'retry' not in graph.node[n]:
graph.add_node(n, {'retry': retry})
def _flatten_linear(self, flow):
"""Flattens a linear flow."""
graph = nx.DiGraph(name=flow.name)
@@ -154,6 +170,8 @@ class Flattener(object):
def _post_item_flatten(self, item, graph):
"""Called before a item is flattened; any post-flattening actions."""
if isinstance(item, flow.Flow) and item.retry:
self._connect_retry(item.retry, graph)
LOG.debug("Finished flattening '%s'", item)
# NOTE(harlowja): this one can be expensive to calculate (especially
# the cycle detection), so only do it if we know debugging is enabled