From 030554d86c95a5016b87457155d357f953b507cb Mon Sep 17 00:00:00 2001 From: Anastasia Karpinska Date: Wed, 8 Jan 2014 16:30:42 +0200 Subject: [PATCH] Add retry to execution graph Change-Id: I8d9a608257fee66c462bd45150383774a93e90b0 --- taskflow/tests/unit/test_flattening.py | 108 +++++++++++++++++++++++++ taskflow/utils/flow_utils.py | 18 +++++ 2 files changed, 126 insertions(+) diff --git a/taskflow/tests/unit/test_flattening.py b/taskflow/tests/unit/test_flattening.py index becbd738..4e948d9d 100644 --- a/taskflow/tests/unit/test_flattening.py +++ b/taskflow/tests/unit/test_flattening.py @@ -22,6 +22,7 @@ from taskflow import exceptions as exc from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf +from taskflow import retry from taskflow import test from taskflow.tests import utils as t_utils @@ -183,3 +184,110 @@ class FlattenTest(test.TestCase): self.assertRaisesRegexp(exc.InvariantViolation, '^Tasks with duplicate names', f_utils.flatten, flo) + + def test_flatten_retry_in_linear_flow(self): + flo = lf.Flow("test", retry.AlwaysRevert("c")) + g = f_utils.flatten(flo) + self.assertEqual(1, len(g)) + self.assertEqual(0, g.number_of_edges()) + + def test_flatten_retry_in_unordered_flow(self): + flo = uf.Flow("test", retry.AlwaysRevert("c")) + g = f_utils.flatten(flo) + self.assertEqual(1, len(g)) + self.assertEqual(0, g.number_of_edges()) + + def test_flatten_retry_in_graph_flow(self): + flo = gf.Flow("test", retry.AlwaysRevert("c")) + g = f_utils.flatten(flo) + self.assertEqual(1, len(g)) + self.assertEqual(0, g.number_of_edges()) + + def test_flatten_retry_in_nested_flows(self): + c1 = retry.AlwaysRevert("c1") + c2 = retry.AlwaysRevert("c2") + flo = lf.Flow("test", c1).add(lf.Flow("test2", c2)) + g = f_utils.flatten(flo) + self.assertEqual(2, len(g)) + self.assertEqual(1, g.number_of_edges()) + self.assertEqual(set([c1]), + set(g_utils.get_no_predecessors(g))) + self.assertEqual(set([c2]), + set(g_utils.get_no_successors(g))) + + def test_flatten_retry_in_linear_flow_with_tasks(self): + c = retry.AlwaysRevert("c") + a, b = _make_many(2) + flo = lf.Flow("test", c).add(a, b) + g = f_utils.flatten(flo) + self.assertEqual(3, len(g)) + self.assertEqual(2, g.number_of_edges()) + self.assertEqual(set([c]), + set(g_utils.get_no_predecessors(g))) + self.assertEqual(set([b]), + set(g_utils.get_no_successors(g))) + self.assertEqual(c, g.node[a]['retry']) + self.assertEqual(c, g.node[b]['retry']) + + def test_flatten_retry_in_unordered_flow_with_tasks(self): + c = retry.AlwaysRevert("c") + a, b = _make_many(2) + flo = uf.Flow("test", c).add(a, b) + g = f_utils.flatten(flo) + self.assertEqual(3, len(g)) + self.assertEqual(2, g.number_of_edges()) + self.assertEqual(set([c]), + set(g_utils.get_no_predecessors(g))) + self.assertEqual(set([a, b]), + set(g_utils.get_no_successors(g))) + self.assertEqual(c, g.node[a]['retry']) + self.assertEqual(c, g.node[b]['retry']) + + def test_flatten_retry_in_graph_flow_with_tasks(self): + c = retry.AlwaysRevert("cp") + a, b, d = _make_many(3) + flo = gf.Flow("test", c).add(a, b, d).link(b, d) + g = f_utils.flatten(flo) + self.assertEqual(4, len(g)) + self.assertEqual(3, g.number_of_edges()) + self.assertEqual(set([c]), + set(g_utils.get_no_predecessors(g))) + self.assertEqual(set([a, d]), + set(g_utils.get_no_successors(g))) + self.assertEqual(c, g.node[a]['retry']) + self.assertEqual(c, g.node[b]['retry']) + self.assertEqual(c, g.node[d]['retry']) + + def test_flatten_retries_hierarchy(self): + c1 = retry.AlwaysRevert("cp1") + c2 = retry.AlwaysRevert("cp2") + a, b, c, d = _make_many(4) + flo = lf.Flow("test", c1).add( + a, + lf.Flow("test", c2).add(b, c), + d) + g = f_utils.flatten(flo) + self.assertEqual(6, len(g)) + self.assertEqual(5, g.number_of_edges()) + self.assertEqual(c1, g.node[a]['retry']) + self.assertEqual(c1, g.node[d]['retry']) + self.assertEqual(c2, g.node[b]['retry']) + self.assertEqual(c2, g.node[c]['retry']) + self.assertEqual(c1, g.node[c2]['retry']) + self.assertEqual(None, g.node[c1].get('retry')) + + def test_flatten_retry_subflows_hierarchy(self): + c1 = retry.AlwaysRevert("cp1") + a, b, c, d = _make_many(4) + flo = lf.Flow("test", c1).add( + a, + lf.Flow("test").add(b, c), + d) + g = f_utils.flatten(flo) + self.assertEqual(5, len(g)) + self.assertEqual(4, g.number_of_edges()) + self.assertEqual(c1, g.node[a]['retry']) + self.assertEqual(c1, g.node[d]['retry']) + self.assertEqual(c1, g.node[b]['retry']) + self.assertEqual(c1, g.node[c]['retry']) + self.assertEqual(None, g.node[c1].get('retry')) diff --git a/taskflow/utils/flow_utils.py b/taskflow/utils/flow_utils.py index 21685d16..47abb53f 100644 --- a/taskflow/utils/flow_utils.py +++ b/taskflow/utils/flow_utils.py @@ -20,6 +20,7 @@ import threading import networkx as nx from taskflow import exceptions +from taskflow import flow from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf @@ -87,6 +88,21 @@ class Flattener(object): else: return None + def _connect_retry(self, retry, graph): + graph.add_node(retry) + # All graph nodes that has not predecessors should be depended on its + # retry + for n in gu.get_no_predecessors(graph): + if n != retry: + # modified that the same copy isn't modified. + graph.add_edge(retry, n, FLATTEN_EDGE_DATA.copy()) + + # Add link to retry for each node of subgraph that hasn't + # a parent retry + for n in graph.nodes_iter(): + if n != retry and 'retry' not in graph.node[n]: + graph.add_node(n, {'retry': retry}) + def _flatten_linear(self, flow): """Flattens a linear flow.""" graph = nx.DiGraph(name=flow.name) @@ -154,6 +170,8 @@ class Flattener(object): def _post_item_flatten(self, item, graph): """Called before a item is flattened; any post-flattening actions.""" + if isinstance(item, flow.Flow) and item.retry: + self._connect_retry(item.retry, graph) LOG.debug("Finished flattening '%s'", item) # NOTE(harlowja): this one can be expensive to calculate (especially # the cycle detection), so only do it if we know debugging is enabled