diff --git a/taskflow/flow.py b/taskflow/flow.py index d94daee2..98b3c49f 100644 --- a/taskflow/flow.py +++ b/taskflow/flow.py @@ -38,13 +38,6 @@ class Flow(object): a flow is just a 'structuring' concept this is typically a behavior that should not be worried about (as it is not visible to the user), but it is worth mentioning here. - - Flows are expected to provide the following methods/properties: - - - add - - __len__ - - requires - - provides """ def __init__(self, name, retry=None): @@ -77,6 +70,21 @@ class Flow(object): def __len__(self): """Returns how many items are in this flow.""" + @abc.abstractmethod + def __iter__(self): + """Iterates over the children of the flow.""" + + @abc.abstractmethod + def iter_links(self): + """Iterates over dependency links between children of the flow. + + Iterates over 3-tuples ``(A, B, meta)``, where + * ``A`` is the child (atom or subflow) the link starts from; + * ``B`` is the child (atom or subflow) the link points to; it is + said that ``B`` depends on ``A`` or ``B`` requires ``A``; + * ``meta`` is link metadata, a dictionary. + """ + def __str__(self): lines = ["%s: %s" % (reflection.get_class_name(self), self.name)] lines.append("%s" % (len(self))) diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 5eb73ca8..a835e0b3 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -153,13 +153,25 @@ class Flow(flow.Flow): self._swap(tmp_graph) return self + def _get_subgraph(self): + """Get the active subgraph of _graph. + + Descendants may override this to make only part of self._graph + visible. 
+ """ + return self._graph + def __len__(self): - return self.graph.number_of_nodes() + return self._get_subgraph().number_of_nodes() def __iter__(self): - for n in self.graph.nodes_iter(): + for n in self._get_subgraph().nodes_iter(): yield n + def iter_links(self): + for (u, v, e_data) in self._get_subgraph().edges_iter(data=True): + yield (u, v, e_data) + @property def provides(self): provides = set() @@ -176,10 +188,6 @@ class Flow(flow.Flow): requires.update(subflow.requires) return requires - self.provides - @property - def graph(self): - return self._graph - class TargetedFlow(Flow): """Graph flow with a target. @@ -227,8 +235,7 @@ class TargetedFlow(Flow): self._subgraph = None return self - @property - def graph(self): + def _get_subgraph(self): if self._subgraph is not None: return self._subgraph if self._target is None: diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 5ffd2116..89fab08a 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -18,6 +18,10 @@ from taskflow import exceptions from taskflow import flow +# TODO(imelnikov): add metadata describing link here +_LINK_METADATA = dict() + + class Flow(flow.Flow): """Linear Flow pattern. @@ -71,6 +75,11 @@ class Flow(flow.Flow): for child in self._children: yield child + def iter_links(self): + for src, dst in zip(self._children[:-1], + self._children[1:]): + yield (src, dst, _LINK_METADATA.copy()) + def __getitem__(self, index): return self._children[index] diff --git a/taskflow/patterns/unordered_flow.py b/taskflow/patterns/unordered_flow.py index efa226ea..32a21c1c 100644 --- a/taskflow/patterns/unordered_flow.py +++ b/taskflow/patterns/unordered_flow.py @@ -102,3 +102,8 @@ class Flow(flow.Flow): def __iter__(self): for child in self._children: yield child + + def iter_links(self): + # NOTE(imelnikov): children in unordered flow have no dependencies + # between each other by construction. 
+ return iter(()) diff --git a/taskflow/tests/unit/test_graph_flow.py b/taskflow/tests/unit/test_graph_flow.py index 7aac75c9..4618c24d 100644 --- a/taskflow/tests/unit/test_graph_flow.py +++ b/taskflow/tests/unit/test_graph_flow.py @@ -54,11 +54,12 @@ class GraphFlowTest(test.TestCase): provides=[], requires=['c']) wf.add(test_1, test_2, test_3) - self.assertTrue(wf.graph.has_edge(test_1, test_2)) - self.assertTrue(wf.graph.has_edge(test_2, test_3)) - self.assertEqual(3, len(wf.graph)) - self.assertEqual([test_1], list(gu.get_no_predecessors(wf.graph))) - self.assertEqual([test_3], list(gu.get_no_successors(wf.graph))) + self.assertEqual(3, len(wf)) + + edges = [(src, dst) for src, dst, _meta in wf.iter_links()] + self.assertIn((test_1, test_2), edges) + self.assertIn((test_2, test_3), edges) + self.assertEqual(2, len(edges)) def test_basic_edge_reasons(self): wf = gw.Flow("the-test-action") @@ -69,17 +70,17 @@ class GraphFlowTest(test.TestCase): provides=['c'], requires=['a', 'b']) wf.add(test_1, test_2) - self.assertTrue(wf.graph.has_edge(test_1, test_2)) + edges = list(wf.iter_links()) + self.assertEqual(len(edges), 1) + + from_task, to_task, edge_attrs = edges[0] + self.assertIs(from_task, test_1) + self.assertIs(to_task, test_2) - edge_attrs = gu.get_edge_attrs(wf.graph, test_1, test_2) self.assertTrue(len(edge_attrs) > 0) self.assertIn('reasons', edge_attrs) self.assertEqual(set(['a', 'b']), edge_attrs['reasons']) - # 2 -> 1 should not be linked, and therefore have no attrs - no_edge_attrs = gu.get_edge_attrs(wf.graph, test_2, test_1) - self.assertFalse(no_edge_attrs) - def test_linked_edge_reasons(self): wf = gw.Flow("the-test-action") test_1 = utils.ProvidesRequiresTask('test-1', @@ -89,11 +90,16 @@ class GraphFlowTest(test.TestCase): provides=[], requires=[]) wf.add(test_1, test_2) - self.assertFalse(wf.graph.has_edge(test_1, test_2)) - wf.link(test_1, test_2) - self.assertTrue(wf.graph.has_edge(test_1, test_2)) + 
self.assertEqual(len(list(wf.iter_links())), 0) + + wf.link(test_1, test_2) + edges = list(wf.iter_links()) + self.assertEqual(len(edges), 1) + + from_task, to_task, edge_attrs = edges[0] + self.assertIs(from_task, test_1) + self.assertIs(to_task, test_2) - edge_attrs = gu.get_edge_attrs(wf.graph, test_1, test_2) self.assertTrue(len(edge_attrs) > 0) self.assertTrue(edge_attrs.get('manual')) @@ -176,11 +182,11 @@ class TargetedGraphFlowTest(test.TestCase): provides=[], requires=['a']) wf.add(test_1) wf.set_target(test_1) - self.assertEqual(1, len(wf.graph)) + self.assertEqual(1, len(wf)) test_2 = utils.ProvidesRequiresTask('test-2', provides=['a'], requires=[]) wf.add(test_2) - self.assertEqual(2, len(wf.graph)) + self.assertEqual(2, len(wf)) def test_recache_on_add_no_deps(self): wf = gw.TargetedFlow("test") @@ -188,11 +194,11 @@ class TargetedGraphFlowTest(test.TestCase): provides=[], requires=[]) wf.add(test_1) wf.set_target(test_1) - self.assertEqual(1, len(wf.graph)) + self.assertEqual(1, len(wf)) test_2 = utils.ProvidesRequiresTask('test-2', provides=[], requires=[]) wf.add(test_2) - self.assertEqual(1, len(wf.graph)) + self.assertEqual(1, len(wf)) def test_recache_on_link(self): wf = gw.TargetedFlow("test") @@ -202,7 +208,8 @@ class TargetedGraphFlowTest(test.TestCase): provides=[], requires=[]) wf.add(test_1, test_2) wf.set_target(test_1) - self.assertEqual(1, len(wf.graph)) + self.assertEqual(1, len(wf)) wf.link(test_2, test_1) - self.assertEqual(2, len(wf.graph)) - self.assertEqual([(test_2, test_1)], list(wf.graph.edges())) + self.assertEqual(2, len(wf)) + edges = [(src, dst) for src, dst, _meta in wf.iter_links()] + self.assertEqual([(test_2, test_1)], edges) diff --git a/taskflow/utils/flow_utils.py b/taskflow/utils/flow_utils.py index 7fdb103d..36c5149e 100644 --- a/taskflow/utils/flow_utils.py +++ b/taskflow/utils/flow_utils.py @@ -21,15 +21,13 @@ import networkx as nx from taskflow import exceptions from taskflow import flow -from taskflow.patterns 
import graph_flow as gf -from taskflow.patterns import linear_flow as lf -from taskflow.patterns import unordered_flow as uf from taskflow import retry from taskflow import task from taskflow.utils import graph_utils as gu from taskflow.utils import lock_utils as lu from taskflow.utils import misc + LOG = logging.getLogger(__name__) # Use the 'flatten' attribute as the need to add an edge here, which is useful @@ -78,12 +76,8 @@ class Flattener(object): def _find_flattener(self, item): """Locates the flattening function to use to flatten the given item.""" - if isinstance(item, lf.Flow): - return self._flatten_linear - elif isinstance(item, uf.Flow): - return self._flatten_unordered - elif isinstance(item, gf.Flow): - return self._flatten_graph + if isinstance(item, flow.Flow): + return self._flatten_flow elif isinstance(item, task.BaseTask): return self._flatten_task elif isinstance(item, retry.Retry): @@ -107,43 +101,13 @@ class Flattener(object): if n != retry and 'retry' not in graph.node[n]: graph.add_node(n, {'retry': retry}) - def _flatten_linear(self, flow): - """Flattens a linear flow.""" - graph = nx.DiGraph(name=flow.name) - previous_nodes = [] - for item in flow: - subgraph = self._flatten(item) - graph = gu.merge_graphs([graph, subgraph]) - # Find nodes that have no predecessor, make them have a predecessor - # of the previous nodes so that the linearity ordering is - # maintained. Find the ones with no successors and use this list - # to connect the next subgraph (if any). - self._add_new_edges(graph, - previous_nodes, - list(gu.get_no_predecessors(subgraph))) - # There should always be someone without successors, otherwise we - # have a cycle A -> B -> A situation, which should not be possible. 
- previous_nodes = list(gu.get_no_successors(subgraph)) - return graph - - def _flatten_unordered(self, flow): - """Flattens a unordered flow.""" - graph = nx.DiGraph(name=flow.name) - for item in flow: - # NOTE(harlowja): we do *not* connect the graphs together, this - # retains that each item (translated to subgraph) is disconnected - # from each other which will result in unordered execution while - # running. - graph = gu.merge_graphs([graph, self._flatten(item)]) - return graph - def _flatten_task(self, task): """Flattens a individual task.""" graph = nx.DiGraph(name=task.name) graph.add_node(task) return graph - def _flatten_graph(self, flow): + def _flatten_flow(self, flow): """Flattens a graph flow.""" graph = nx.DiGraph(name=flow.name) # Flatten all nodes into a single subgraph per node. @@ -153,15 +117,15 @@ class Flattener(object): subgraph_map[item] = subgraph graph = gu.merge_graphs([graph, subgraph]) # Reconnect all node edges to there corresponding subgraphs. - for (u, v) in flow.graph.edges_iter(): - # Retain and update the original edge attributes. - u_v_attrs = gu.get_edge_attrs(flow.graph, u, v) + for (u, v, u_v_attrs) in flow.iter_links(): # Connect the ones with no predecessors in v to the ones with no # successors in u (thus maintaining the edge dependency). self._add_new_edges(graph, list(gu.get_no_successors(subgraph_map[u])), list(gu.get_no_predecessors(subgraph_map[v])), edge_attrs=u_v_attrs) + if flow.retry is not None: + self._connect_retry(flow.retry, graph) return graph def _pre_item_flatten(self, item): @@ -174,8 +138,6 @@ class Flattener(object): def _post_item_flatten(self, item, graph): """Called before a item is flattened; any post-flattening actions.""" - if isinstance(item, flow.Flow) and item.retry: - self._connect_retry(item.retry, graph) LOG.debug("Finished flattening '%s'", item) # NOTE(harlowja): this one can be expensive to calculate (especially # the cycle detection), so only do it if we know debugging is enabled