From eaf49950383865438dd5aaef05de7d86977403f5 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 17 Nov 2014 16:28:49 -0800 Subject: [PATCH] Properly handle and skip empty intermediary flows Instead of linking nodes which have elements to predecessors which do not we should search backwards through the prior predecessors and link to one that does have nodes; this ensures that we do not create bad workflows when empty flows are injected. Fixes bug 1392650 Change-Id: Ic362ef3400f9c77e60ed07b0097e3427b999d1cd --- taskflow/engines/action_engine/compiler.py | 280 +++++++++++++++--- taskflow/exceptions.py | 4 + .../tests/unit/action_engine/test_compile.py | 101 +++++++ 3 files changed, 339 insertions(+), 46 deletions(-) diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py index 38b48db65..a71b9d179 100644 --- a/taskflow/engines/action_engine/compiler.py +++ b/taskflow/engines/action_engine/compiler.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import threading from taskflow import exceptions as exc @@ -43,39 +44,241 @@ class Compilation(object): @property def execution_graph(self): + """The execution ordering of atoms (as a graph structure).""" return self._execution_graph @property def hierarchy(self): + """The hierachy of patterns (as a tree structure).""" return self._hierarchy +def _add_update_edges(graph, nodes_from, nodes_to, attr_dict=None): + """Adds/updates edges from nodes to other nodes in the specified graph. + + It will connect the 'nodes_from' to the 'nodes_to' if an edge currently + does *not* exist (if it does already exist then the edges attributes + are just updated instead). When an edge is created the provided edge + attributes dictionary will be applied to the new edge between these two + nodes. + """ + # NOTE(harlowja): give each edge its own attr copy so that if it's + # later modified that the same copy isn't modified... + for u in nodes_from: + for v in nodes_to: + if not graph.has_edge(u, v): + if attr_dict: + graph.add_edge(u, v, attr_dict=attr_dict.copy()) + else: + graph.add_edge(u, v) + else: + # Just update the attr_dict (if any). + if attr_dict: + graph.add_edge(u, v, attr_dict=attr_dict.copy()) + + +class Linker(object): + """Compiler helper that adds pattern(s) constraints onto a graph.""" + + @staticmethod + def _is_not_empty(graph): + # Returns true if the given graph is *not* empty... + return graph.number_of_nodes() > 0 + + @staticmethod + def _find_first_decomposed(node, priors, + decomposed_members, decomposed_filter): + # How this works; traverse backwards and find only the predecessor + # items that are actually connected to this entity, and avoid any + # linkage that is not directly connected. This is guaranteed to be + # valid since we always iter_links() over predecessors before + # successors in all currently known patterns; a queue is used here + # since it is possible for a node to have 2+ different predecessors so + # we must search back through all of them in a reverse BFS order... + # + # Returns the first decomposed graph of those nodes (including the + # passed in node) that passes the provided filter + # function (returns none if none match). + frontier = collections.deque([node]) + # NOTE(harowja): None is in this initial set since the first prior in + # the priors list has None as its predecessor (which we don't want to + # look for a decomposed member of). + visited = set([None]) + while frontier: + node = frontier.popleft() + if node in visited: + continue + node_graph = decomposed_members[node] + if decomposed_filter(node_graph): + return node_graph + visited.add(node) + # TODO(harlowja): optimize this more to avoid searching through + # things already searched... + for (u, v) in reversed(priors): + if node == v: + # Queue its predecessor to be searched in the future... + frontier.append(u) + else: + return None + + def apply_constraints(self, graph, flow, decomposed_members): + # This list is used to track the links that have been previously + # iterated over, so that when we are trying to find a entry to + # connect to that we iterate backwards through this list, finding + # connected nodes to the current target (lets call it v) and find + # the first (u_n, or u_n - 1, u_n - 2...) that was decomposed into + # a non-empty graph. We also retain all predecessors of v so that we + # can correctly locate u_n - 1 if u_n turns out to have decomposed into + # an empty graph (and so on). + priors = [] + # NOTE(harlowja): u, v are flows/tasks (also graph terminology since + # we are compiling things down into a flattened graph), the meaning + # of this link iteration via iter_links() is that u -> v (with the + # provided dictionary attributes, if any). + for (u, v, attr_dict) in flow.iter_links(): + if not priors: + priors.append((None, u)) + v_g = decomposed_members[v] + if not v_g.number_of_nodes(): + priors.append((u, v)) + continue + invariant = any(attr_dict.get(k) for k in _EDGE_INVARIANTS) + if not invariant: + # This is a symbol *only* dependency, connect + # corresponding providers and consumers to allow the consumer + # to be executed immediately after the provider finishes (this + # is an optimization for these types of dependencies...) + u_g = decomposed_members[u] + if not u_g.number_of_nodes(): + # This must always exist, but incase it somehow doesn't... + raise exc.CompilationFailure( + "Non-invariant link being created from '%s' ->" + " '%s' even though the target '%s' was found to be" + " decomposed into an empty graph" % (v, u, u)) + for provider in u_g: + for consumer in v_g: + reasons = provider.provides & consumer.requires + if reasons: + graph.add_edge(provider, consumer, reasons=reasons) + else: + # Connect nodes with no predecessors in v to nodes with no + # successors in the *first* non-empty predecessor of v (thus + # maintaining the edge dependency). + match = self._find_first_decomposed(u, priors, + decomposed_members, + self._is_not_empty) + if match is not None: + _add_update_edges(graph, + match.no_successors_iter(), + list(v_g.no_predecessors_iter()), + attr_dict=attr_dict) + priors.append((u, v)) + + class PatternCompiler(object): - """Compiles a pattern (or task) into a compilation unit.""" + """Compiles a pattern (or task) into a compilation unit. + + Let's dive into the basic idea for how this works: + + The compiler here is provided a 'root' object via its __init__ method, + this object could be a task, or a flow (one of the supported patterns), + the end-goal is to produce a :py:class:`.Compilation` object as the result + with the needed components. If this is not possible a + :py:class:`~.taskflow.exceptions.CompilationFailure` will be raised (or + in the case where a unknown type is being requested to compile + a ``TypeError`` will be raised). + + The complexity of this comes into play when the 'root' is a flow that + contains itself other nested flows (and so-on); to compile this object and + its contained objects into a graph that *preserves* the constraints the + pattern mandates we have to go through a recursive algorithm that creates + subgraphs for each nesting level, and then on the way back up through + the recursion (now with a decomposed mapping from contained patterns or + atoms to there corresponding subgraph) we have to then connect the + subgraphs (and the atom(s) there-in) that were decomposed for a pattern + correctly into a new graph (using a :py:class:`.Linker` object to ensure + the pattern mandated constraints are retained) and then return to the + caller (and they will do the same thing up until the root node, which by + that point one graph is created with all contained atoms in the + pattern/nested patterns mandated ordering). + + Also maintained in the :py:class:`.Compilation` object is a hierarchy of + the nesting of items (which is also built up during the above mentioned + recusion, via a much simpler algorithm); this is typically used later to + determine the prior atoms of a given atom when looking up values that can + be provided to that atom for execution (see the scopes.py file for how this + works). Note that although you *could* think that the graph itself could be + used for this, which in some ways it can (for limited usage) the hierarchy + retains the nested structure (which is useful for scoping analysis/lookup) + to be able to provide back a iterator that gives back the scopes visible + at each level (the graph does not have this information once flattened). + + Let's take an example: + + Given the pattern ``f(a(b, c), d)`` where ``f`` is a + :py:class:`~taskflow.patterns.linear_flow.Flow` with items ``a(b, c)`` + where ``a`` is a :py:class:`~taskflow.patterns.linear_flow.Flow` composed + of tasks ``(b, c)`` and task ``d``. + + The algorithm that will be performed (mirroring the above described logic) + will go through the following steps (the tree hierachy building is left + out as that is more obvious):: + + Compiling f + - Decomposing flow f with no parent (must be the root) + - Compiling a + - Decomposing flow a with parent f + - Compiling b + - Decomposing task b with parent a + - Decomposed b into: + Name: b + Nodes: 1 + - b + Edges: 0 + - Compiling c + - Decomposing task c with parent a + - Decomposed c into: + Name: c + Nodes: 1 + - c + Edges: 0 + - Relinking decomposed b -> decomposed c + - Decomposed a into: + Name: a + Nodes: 2 + - b + - c + Edges: 1 + b -> c ({'invariant': True}) + - Compiling d + - Decomposing task d with parent f + - Decomposed d into: + Name: d + Nodes: 1 + - d + Edges: 0 + - Relinking decomposed a -> decomposed d + - Decomposed f into: + Name: f + Nodes: 3 + - c + - b + - d + Edges: 2 + c -> d ({'invariant': True}) + b -> c ({'invariant': True}) + """ def __init__(self, root, freeze=True): self._root = root self._history = set() + self._linker = Linker() self._freeze = freeze self._lock = threading.Lock() self._compilation = None - def _add_new_edges(self, graph, nodes_from, nodes_to, edge_attrs): - """Adds new edges from nodes to other nodes in the specified graph. - - It will connect the nodes_from to the nodes_to if an edge currently - does *not* exist. When an edge is created the provided edge attributes - will be applied to the new edge between these two nodes. - """ - nodes_to = list(nodes_to) - for u in nodes_from: - for v in nodes_to: - if not graph.has_edge(u, v): - # NOTE(harlowja): give each edge its own attr copy so that - # if it's later modified that the same copy isn't modified. - graph.add_edge(u, v, attr_dict=edge_attrs.copy()) - def _flatten(self, item, parent): + """Flattens a item (pattern, task) into a graph + tree node.""" functor = self._find_flattener(item, parent) self._pre_item_flatten(item) graph, node = functor(item, parent) @@ -106,7 +309,9 @@ class PatternCompiler(object): # All nodes that have no predecessors should depend on this retry. nodes_to = [n for n in graph.no_predecessors_iter() if n is not retry] - self._add_new_edges(graph, [retry], nodes_to, _RETRY_EDGE_DATA) + if nodes_to: + _add_update_edges(graph, [retry], nodes_to, + attr_dict=_RETRY_EDGE_DATA) # Add association for each node of graph that has no existing retry. for n in graph.nodes_iter(): @@ -122,43 +327,26 @@ class PatternCompiler(object): parent.add(node) return graph, node - def _flatten_flow(self, flow, parent): - """Flattens a flow.""" + def _decompose_flow(self, flow, parent): + """Decomposes a flow into a graph, tree node + decomposed subgraphs.""" graph = gr.DiGraph(name=flow.name) node = tr.Node(flow) if parent is not None: parent.add(node) if flow.retry is not None: node.add(tr.Node(flow.retry)) - - # Flatten all nodes into a single subgraph per item (and track origin - # item to its newly expanded graph). - subgraphs = {} + decomposed_members = {} for item in flow: - subgraph = self._flatten(item, node)[0] - subgraphs[item] = subgraph - graph = gr.merge_graphs([graph, subgraph]) - - # Reconnect all items edges to their corresponding subgraphs. - for (u, v, attrs) in flow.iter_links(): - u_g = subgraphs[u] - v_g = subgraphs[v] - if any(attrs.get(k) for k in _EDGE_INVARIANTS): - # Connect nodes with no predecessors in v to nodes with - # no successors in u (thus maintaining the edge dependency). - self._add_new_edges(graph, - u_g.no_successors_iter(), - v_g.no_predecessors_iter(), - edge_attrs=attrs) - else: - # This is symbol dependency edge, connect corresponding - # providers and consumers. - for provider in u_g: - for consumer in v_g: - reasons = provider.provides & consumer.requires - if reasons: - graph.add_edge(provider, consumer, reasons=reasons) + subgraph, subnode = self._flatten(item, node) + decomposed_members[item] = subgraph + if subgraph.number_of_nodes(): + graph = gr.merge_graphs([graph, subgraph]) + return graph, node, decomposed_members + def _flatten_flow(self, flow, parent): + """Flattens a flow.""" + graph, node, decomposed_members = self._decompose_flow(flow, parent) + self._linker.apply_constraints(graph, flow, decomposed_members) if flow.retry is not None: self._connect_retry(flow.retry, graph) return graph, node diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 1c6bc9f16..c66327542 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -135,6 +135,10 @@ class MissingDependencies(DependencyFailure): self.missing_requirements = requirements +class CompilationFailure(TaskFlowException): + """Raised when some type of compilation issue is found.""" + + class IncompatibleVersion(TaskFlowException): """Raised when some type of version incompatibility is found.""" diff --git a/taskflow/tests/unit/action_engine/test_compile.py b/taskflow/tests/unit/action_engine/test_compile.py index 63b3c0b03..a290c50be 100644 --- a/taskflow/tests/unit/action_engine/test_compile.py +++ b/taskflow/tests/unit/action_engine/test_compile.py @@ -264,6 +264,107 @@ class PatternCompileTest(test.TestCase): self.assertItemsEqual([b], g.no_predecessors_iter()) self.assertItemsEqual([a, c], g.no_successors_iter()) + def test_empty_flow_in_linear_flow(self): + flow = lf.Flow('lf') + a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[]) + b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[]) + empty_flow = gf.Flow("empty") + flow.add(a, empty_flow, b) + + compilation = compiler.PatternCompiler(flow).compile() + g = compilation.execution_graph + self.assertItemsEqual(g.edges(data=True), [ + (a, b, {'invariant': True}), + ]) + + def test_many_empty_in_graph_flow(self): + flow = gf.Flow('root') + + a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[]) + flow.add(a) + + b = lf.Flow('b') + b_0 = test_utils.ProvidesRequiresTask('b.0', provides=[], requires=[]) + b_3 = test_utils.ProvidesRequiresTask('b.3', provides=[], requires=[]) + b.add( + b_0, + lf.Flow('b.1'), lf.Flow('b.2'), + b_3, + ) + flow.add(b) + + c = lf.Flow('c') + c.add(lf.Flow('c.0'), lf.Flow('c.1'), lf.Flow('c.2')) + flow.add(c) + + d = test_utils.ProvidesRequiresTask('d', provides=[], requires=[]) + flow.add(d) + + flow.link(b, d) + flow.link(a, d) + flow.link(c, d) + + compilation = compiler.PatternCompiler(flow).compile() + g = compilation.execution_graph + self.assertTrue(g.has_edge(b_0, b_3)) + self.assertTrue(g.has_edge(b_3, d)) + self.assertEqual(4, len(g)) + + def test_empty_flow_in_nested_flow(self): + flow = lf.Flow('lf') + a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[]) + b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[]) + + flow2 = lf.Flow("lf-2") + c = test_utils.ProvidesRequiresTask('c', provides=[], requires=[]) + d = test_utils.ProvidesRequiresTask('d', provides=[], requires=[]) + empty_flow = gf.Flow("empty") + flow2.add(c, empty_flow, d) + flow.add(a, flow2, b) + + compilation = compiler.PatternCompiler(flow).compile() + g = compilation.execution_graph + + self.assertTrue(g.has_edge(a, c)) + self.assertTrue(g.has_edge(c, d)) + self.assertTrue(g.has_edge(d, b)) + + def test_empty_flow_in_graph_flow(self): + flow = lf.Flow('lf') + a = test_utils.ProvidesRequiresTask('a', provides=['a'], requires=[]) + b = test_utils.ProvidesRequiresTask('b', provides=[], requires=['a']) + empty_flow = lf.Flow("empty") + flow.add(a, empty_flow, b) + + compilation = compiler.PatternCompiler(flow).compile() + g = compilation.execution_graph + self.assertTrue(g.has_edge(a, b)) + + def test_empty_flow_in_graph_flow_empty_linkage(self): + flow = gf.Flow('lf') + a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[]) + b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[]) + empty_flow = lf.Flow("empty") + flow.add(a, empty_flow, b) + flow.link(empty_flow, b) + + compilation = compiler.PatternCompiler(flow).compile() + g = compilation.execution_graph + self.assertEqual(0, len(g.edges())) + + def test_empty_flow_in_graph_flow_linkage(self): + flow = gf.Flow('lf') + a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[]) + b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[]) + empty_flow = lf.Flow("empty") + flow.add(a, empty_flow, b) + flow.link(a, b) + + compilation = compiler.PatternCompiler(flow).compile() + g = compilation.execution_graph + self.assertEqual(1, len(g.edges())) + self.assertTrue(g.has_edge(a, b)) + def test_checks_for_dups(self): flo = gf.Flow("test").add( test_utils.DummyTask(name="a"),