From fd4772ca84a79c2bea45a6d8571a3644c4055f8a Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 5 Mar 2015 15:54:47 -0800 Subject: [PATCH] Specialize checking for overlaps When merging a child graph into it's parents graph we can specialize checking for overlaps to avoid some of overhead of the default subgraph algorithm (which builds a full graph, with edges) since we only care about the number of duplicate nodes (not needing a full subgraph with retained edges). Change-Id: Ib211460c58efca3ddb5a254da11aafe44716a639 --- taskflow/engines/action_engine/compiler.py | 12 +++- taskflow/tests/unit/test_types.py | 65 ++++++++++++++++++++++ taskflow/types/graph.py | 33 +++++++---- 3 files changed, 97 insertions(+), 13 deletions(-) diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py index 23e75bf7..6116cb01 100644 --- a/taskflow/engines/action_engine/compiler.py +++ b/taskflow/engines/action_engine/compiler.py @@ -220,6 +220,11 @@ class _FlowCompiler(object): if n is not retry and flow.LINK_RETRY not in graph.node[n]: graph.node[n][flow.LINK_RETRY] = retry + @staticmethod + def _occurence_detector(to_graph, from_graph): + return sum(1 for node in from_graph.nodes_iter() + if node in to_graph) + def _decompose_flow(self, flow, parent=None): """Decomposes a flow into a graph, tree node + decomposed subgraphs.""" graph = gr.DiGraph(name=flow.name) @@ -233,7 +238,12 @@ class _FlowCompiler(object): subgraph, _subnode = self._deep_compiler_func(item, parent=node) decomposed_members[item] = subgraph if subgraph.number_of_nodes(): - graph = gr.merge_graphs([graph, subgraph]) + graph = gr.merge_graphs( + graph, subgraph, + # We can specialize this to be simpler than the default + # algorithm which creates overhead that we don't + # need for our purposes... + overlap_detector=self._occurence_detector) return graph, node, decomposed_members def compile(self, flow, parent=None): diff --git a/taskflow/tests/unit/test_types.py b/taskflow/tests/unit/test_types.py index 7c05dd2c..baaca5b9 100644 --- a/taskflow/tests/unit/test_types.py +++ b/taskflow/tests/unit/test_types.py @@ -78,6 +78,71 @@ class GraphTest(test.TestCase): g.freeze() self.assertRaises(nx.NetworkXError, g.add_node, "c") + def test_merge(self): + g = graph.DiGraph() + g.add_node("a") + g.add_node("b") + + g2 = graph.DiGraph() + g2.add_node('c') + + g3 = graph.merge_graphs(g, g2) + self.assertEqual(3, len(g3)) + + def test_merge_edges(self): + g = graph.DiGraph() + g.add_node("a") + g.add_node("b") + g.add_edge('a', 'b') + + g2 = graph.DiGraph() + g2.add_node('c') + g2.add_node('d') + g2.add_edge('c', 'd') + + g3 = graph.merge_graphs(g, g2) + self.assertEqual(4, len(g3)) + self.assertTrue(g3.has_edge('c', 'd')) + self.assertTrue(g3.has_edge('a', 'b')) + + def test_overlap_detector(self): + g = graph.DiGraph() + g.add_node("a") + g.add_node("b") + g.add_edge('a', 'b') + + g2 = graph.DiGraph() + g2.add_node('a') + g2.add_node('d') + g2.add_edge('a', 'd') + + self.assertRaises(ValueError, + graph.merge_graphs, g, g2) + + def occurence_detector(to_graph, from_graph): + return sum(1 for node in from_graph.nodes_iter() + if node in to_graph) + + self.assertRaises(ValueError, + graph.merge_graphs, g, g2, + overlap_detector=occurence_detector) + + g3 = graph.merge_graphs(g, g2, allow_overlaps=True) + self.assertEqual(3, len(g3)) + self.assertTrue(g3.has_edge('a', 'b')) + self.assertTrue(g3.has_edge('a', 'd')) + + def test_invalid_detector(self): + g = graph.DiGraph() + g.add_node("a") + + g2 = graph.DiGraph() + g2.add_node('c') + + self.assertRaises(ValueError, + graph.merge_graphs, g, g2, + overlap_detector='b') + class TreeTest(test.TestCase): def _make_species(self): diff --git a/taskflow/types/graph.py b/taskflow/types/graph.py index 068a8e20..53eddba6 100644 --- a/taskflow/types/graph.py +++ b/taskflow/types/graph.py @@ -122,26 +122,35 @@ class DiGraph(nx.DiGraph): queue.append(pred_pred) -def merge_graphs(graphs, allow_overlaps=False): - """Merges a bunch of graphs into a single graph.""" - if not graphs: - return None - graph = graphs[0] - for g in graphs[1:]: +def merge_graphs(graph, *graphs, **kwargs): + """Merges a bunch of graphs into a new graph. + + If no additional graphs are provided the first graph is + returned unmodified otherwise the merged graph is returned. + """ + tmp_graph = graph + allow_overlaps = kwargs.get('allow_overlaps', False) + overlap_detector = kwargs.get('overlap_detector') + if overlap_detector is not None and not six.callable(overlap_detector): + raise ValueError("Overlap detection callback expected to be callable") + elif overlap_detector is None: + overlap_detector = (lambda to_graph, from_graph: + len(to_graph.subgraph(from_graph.nodes_iter()))) + for g in graphs: # This should ensure that the nodes to be merged do not already exist # in the graph that is to be merged into. This could be problematic if # there are duplicates. if not allow_overlaps: # Attempt to induce a subgraph using the to be merged graphs nodes # and see if any graph results. - overlaps = graph.subgraph(g.nodes_iter()) - if len(overlaps): + overlaps = overlap_detector(graph, g) + if overlaps: raise ValueError("Can not merge graph %s into %s since there " "are %s overlapping nodes (and we do not " "support merging nodes)" % (g, graph, - len(overlaps))) - # Keep the target graphs name. - name = graph.name + overlaps)) graph = nx.algorithms.compose(graph, g) - graph.name = name + # Keep the first graphs name. + if graphs: + graph.name = tmp_graph.name return graph