diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py index 38b48db65..a71b9d179 100644 --- a/taskflow/engines/action_engine/compiler.py +++ b/taskflow/engines/action_engine/compiler.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import threading from taskflow import exceptions as exc @@ -43,39 +44,241 @@ class Compilation(object): @property def execution_graph(self): + """The execution ordering of atoms (as a graph structure).""" return self._execution_graph @property def hierarchy(self): + """The hierachy of patterns (as a tree structure).""" return self._hierarchy +def _add_update_edges(graph, nodes_from, nodes_to, attr_dict=None): + """Adds/updates edges from nodes to other nodes in the specified graph. + + It will connect the 'nodes_from' to the 'nodes_to' if an edge currently + does *not* exist (if it does already exist then the edges attributes + are just updated instead). When an edge is created the provided edge + attributes dictionary will be applied to the new edge between these two + nodes. + """ + # NOTE(harlowja): give each edge its own attr copy so that if it's + # later modified that the same copy isn't modified... + for u in nodes_from: + for v in nodes_to: + if not graph.has_edge(u, v): + if attr_dict: + graph.add_edge(u, v, attr_dict=attr_dict.copy()) + else: + graph.add_edge(u, v) + else: + # Just update the attr_dict (if any). + if attr_dict: + graph.add_edge(u, v, attr_dict=attr_dict.copy()) + + +class Linker(object): + """Compiler helper that adds pattern(s) constraints onto a graph.""" + + @staticmethod + def _is_not_empty(graph): + # Returns true if the given graph is *not* empty... 
+ return graph.number_of_nodes() > 0 + + @staticmethod + def _find_first_decomposed(node, priors, + decomposed_members, decomposed_filter): + # How this works; traverse backwards and find only the predecessor + # items that are actually connected to this entity, and avoid any + # linkage that is not directly connected. This is guaranteed to be + # valid since we always iter_links() over predecessors before + # successors in all currently known patterns; a queue is used here + # since it is possible for a node to have 2+ different predecessors so + # we must search back through all of them in a reverse BFS order... + # + # Returns the first decomposed graph of those nodes (including the + # passed in node) that passes the provided filter + # function (returns none if none match). + frontier = collections.deque([node]) + # NOTE(harowja): None is in this initial set since the first prior in + # the priors list has None as its predecessor (which we don't want to + # look for a decomposed member of). + visited = set([None]) + while frontier: + node = frontier.popleft() + if node in visited: + continue + node_graph = decomposed_members[node] + if decomposed_filter(node_graph): + return node_graph + visited.add(node) + # TODO(harlowja): optimize this more to avoid searching through + # things already searched... + for (u, v) in reversed(priors): + if node == v: + # Queue its predecessor to be searched in the future... + frontier.append(u) + else: + return None + + def apply_constraints(self, graph, flow, decomposed_members): + # This list is used to track the links that have been previously + # iterated over, so that when we are trying to find a entry to + # connect to that we iterate backwards through this list, finding + # connected nodes to the current target (lets call it v) and find + # the first (u_n, or u_n - 1, u_n - 2...) that was decomposed into + # a non-empty graph. 
We also retain all predecessors of v so that we + # can correctly locate u_n - 1 if u_n turns out to have decomposed into + # an empty graph (and so on). + priors = [] + # NOTE(harlowja): u, v are flows/tasks (also graph terminology since + # we are compiling things down into a flattened graph), the meaning + # of this link iteration via iter_links() is that u -> v (with the + # provided dictionary attributes, if any). + for (u, v, attr_dict) in flow.iter_links(): + if not priors: + priors.append((None, u)) + v_g = decomposed_members[v] + if not v_g.number_of_nodes(): + priors.append((u, v)) + continue + invariant = any(attr_dict.get(k) for k in _EDGE_INVARIANTS) + if not invariant: + # This is a symbol *only* dependency, connect + # corresponding providers and consumers to allow the consumer + # to be executed immediately after the provider finishes (this + # is an optimization for these types of dependencies...) + u_g = decomposed_members[u] + if not u_g.number_of_nodes(): + # This must always exist, but incase it somehow doesn't... + raise exc.CompilationFailure( + "Non-invariant link being created from '%s' ->" + " '%s' even though the target '%s' was found to be" + " decomposed into an empty graph" % (v, u, u)) + for provider in u_g: + for consumer in v_g: + reasons = provider.provides & consumer.requires + if reasons: + graph.add_edge(provider, consumer, reasons=reasons) + else: + # Connect nodes with no predecessors in v to nodes with no + # successors in the *first* non-empty predecessor of v (thus + # maintaining the edge dependency). + match = self._find_first_decomposed(u, priors, + decomposed_members, + self._is_not_empty) + if match is not None: + _add_update_edges(graph, + match.no_successors_iter(), + list(v_g.no_predecessors_iter()), + attr_dict=attr_dict) + priors.append((u, v)) + + class PatternCompiler(object): - """Compiles a pattern (or task) into a compilation unit.""" + """Compiles a pattern (or task) into a compilation unit. 
+ + Let's dive into the basic idea for how this works: + + The compiler here is provided a 'root' object via its __init__ method, + this object could be a task, or a flow (one of the supported patterns), + the end-goal is to produce a :py:class:`.Compilation` object as the result + with the needed components. If this is not possible a + :py:class:`~.taskflow.exceptions.CompilationFailure` will be raised (or + in the case where an unknown type is being requested to compile + a ``TypeError`` will be raised). + + The complexity of this comes into play when the 'root' is a flow that + itself contains other nested flows (and so-on); to compile this object and + its contained objects into a graph that *preserves* the constraints the + pattern mandates we have to go through a recursive algorithm that creates + subgraphs for each nesting level, and then on the way back up through + the recursion (now with a decomposed mapping from contained patterns or + atoms to their corresponding subgraph) we have to then connect the + subgraphs (and the atom(s) there-in) that were decomposed for a pattern + correctly into a new graph (using a :py:class:`.Linker` object to ensure + the pattern mandated constraints are retained) and then return to the + caller (and they will do the same thing up until the root node, which by + that point one graph is created with all contained atoms in the + pattern/nested patterns mandated ordering). + + Also maintained in the :py:class:`.Compilation` object is a hierarchy of + the nesting of items (which is also built up during the above mentioned + recursion, via a much simpler algorithm); this is typically used later to + determine the prior atoms of a given atom when looking up values that can + be provided to that atom for execution (see the scopes.py file for how this + works).
Note that although you *could* think that the graph itself could be + used for this, which in some ways it can (for limited usage) the hierarchy + retains the nested structure (which is useful for scoping analysis/lookup) + to be able to provide back a iterator that gives back the scopes visible + at each level (the graph does not have this information once flattened). + + Let's take an example: + + Given the pattern ``f(a(b, c), d)`` where ``f`` is a + :py:class:`~taskflow.patterns.linear_flow.Flow` with items ``a(b, c)`` + where ``a`` is a :py:class:`~taskflow.patterns.linear_flow.Flow` composed + of tasks ``(b, c)`` and task ``d``. + + The algorithm that will be performed (mirroring the above described logic) + will go through the following steps (the tree hierachy building is left + out as that is more obvious):: + + Compiling f + - Decomposing flow f with no parent (must be the root) + - Compiling a + - Decomposing flow a with parent f + - Compiling b + - Decomposing task b with parent a + - Decomposed b into: + Name: b + Nodes: 1 + - b + Edges: 0 + - Compiling c + - Decomposing task c with parent a + - Decomposed c into: + Name: c + Nodes: 1 + - c + Edges: 0 + - Relinking decomposed b -> decomposed c + - Decomposed a into: + Name: a + Nodes: 2 + - b + - c + Edges: 1 + b -> c ({'invariant': True}) + - Compiling d + - Decomposing task d with parent f + - Decomposed d into: + Name: d + Nodes: 1 + - d + Edges: 0 + - Relinking decomposed a -> decomposed d + - Decomposed f into: + Name: f + Nodes: 3 + - c + - b + - d + Edges: 2 + c -> d ({'invariant': True}) + b -> c ({'invariant': True}) + """ def __init__(self, root, freeze=True): self._root = root self._history = set() + self._linker = Linker() self._freeze = freeze self._lock = threading.Lock() self._compilation = None - def _add_new_edges(self, graph, nodes_from, nodes_to, edge_attrs): - """Adds new edges from nodes to other nodes in the specified graph. 
- - It will connect the nodes_from to the nodes_to if an edge currently - does *not* exist. When an edge is created the provided edge attributes - will be applied to the new edge between these two nodes. - """ - nodes_to = list(nodes_to) - for u in nodes_from: - for v in nodes_to: - if not graph.has_edge(u, v): - # NOTE(harlowja): give each edge its own attr copy so that - # if it's later modified that the same copy isn't modified. - graph.add_edge(u, v, attr_dict=edge_attrs.copy()) - def _flatten(self, item, parent): + """Flattens a item (pattern, task) into a graph + tree node.""" functor = self._find_flattener(item, parent) self._pre_item_flatten(item) graph, node = functor(item, parent) @@ -106,7 +309,9 @@ class PatternCompiler(object): # All nodes that have no predecessors should depend on this retry. nodes_to = [n for n in graph.no_predecessors_iter() if n is not retry] - self._add_new_edges(graph, [retry], nodes_to, _RETRY_EDGE_DATA) + if nodes_to: + _add_update_edges(graph, [retry], nodes_to, + attr_dict=_RETRY_EDGE_DATA) # Add association for each node of graph that has no existing retry. for n in graph.nodes_iter(): @@ -122,43 +327,26 @@ class PatternCompiler(object): parent.add(node) return graph, node - def _flatten_flow(self, flow, parent): - """Flattens a flow.""" + def _decompose_flow(self, flow, parent): + """Decomposes a flow into a graph, tree node + decomposed subgraphs.""" graph = gr.DiGraph(name=flow.name) node = tr.Node(flow) if parent is not None: parent.add(node) if flow.retry is not None: node.add(tr.Node(flow.retry)) - - # Flatten all nodes into a single subgraph per item (and track origin - # item to its newly expanded graph). - subgraphs = {} + decomposed_members = {} for item in flow: - subgraph = self._flatten(item, node)[0] - subgraphs[item] = subgraph - graph = gr.merge_graphs([graph, subgraph]) - - # Reconnect all items edges to their corresponding subgraphs. 
- for (u, v, attrs) in flow.iter_links(): - u_g = subgraphs[u] - v_g = subgraphs[v] - if any(attrs.get(k) for k in _EDGE_INVARIANTS): - # Connect nodes with no predecessors in v to nodes with - # no successors in u (thus maintaining the edge dependency). - self._add_new_edges(graph, - u_g.no_successors_iter(), - v_g.no_predecessors_iter(), - edge_attrs=attrs) - else: - # This is symbol dependency edge, connect corresponding - # providers and consumers. - for provider in u_g: - for consumer in v_g: - reasons = provider.provides & consumer.requires - if reasons: - graph.add_edge(provider, consumer, reasons=reasons) + subgraph, subnode = self._flatten(item, node) + decomposed_members[item] = subgraph + if subgraph.number_of_nodes(): + graph = gr.merge_graphs([graph, subgraph]) + return graph, node, decomposed_members + def _flatten_flow(self, flow, parent): + """Flattens a flow.""" + graph, node, decomposed_members = self._decompose_flow(flow, parent) + self._linker.apply_constraints(graph, flow, decomposed_members) if flow.retry is not None: self._connect_retry(flow.retry, graph) return graph, node diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 1c6bc9f16..c66327542 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -135,6 +135,10 @@ class MissingDependencies(DependencyFailure): self.missing_requirements = requirements +class CompilationFailure(TaskFlowException): + """Raised when some type of compilation issue is found.""" + + class IncompatibleVersion(TaskFlowException): """Raised when some type of version incompatibility is found.""" diff --git a/taskflow/tests/unit/action_engine/test_compile.py b/taskflow/tests/unit/action_engine/test_compile.py index 63b3c0b03..a290c50be 100644 --- a/taskflow/tests/unit/action_engine/test_compile.py +++ b/taskflow/tests/unit/action_engine/test_compile.py @@ -264,6 +264,107 @@ class PatternCompileTest(test.TestCase): self.assertItemsEqual([b], g.no_predecessors_iter()) self.assertItemsEqual([a, 
c], g.no_successors_iter()) + def test_empty_flow_in_linear_flow(self): + flow = lf.Flow('lf') + a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[]) + b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[]) + empty_flow = gf.Flow("empty") + flow.add(a, empty_flow, b) + + compilation = compiler.PatternCompiler(flow).compile() + g = compilation.execution_graph + self.assertItemsEqual(g.edges(data=True), [ + (a, b, {'invariant': True}), + ]) + + def test_many_empty_in_graph_flow(self): + flow = gf.Flow('root') + + a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[]) + flow.add(a) + + b = lf.Flow('b') + b_0 = test_utils.ProvidesRequiresTask('b.0', provides=[], requires=[]) + b_3 = test_utils.ProvidesRequiresTask('b.3', provides=[], requires=[]) + b.add( + b_0, + lf.Flow('b.1'), lf.Flow('b.2'), + b_3, + ) + flow.add(b) + + c = lf.Flow('c') + c.add(lf.Flow('c.0'), lf.Flow('c.1'), lf.Flow('c.2')) + flow.add(c) + + d = test_utils.ProvidesRequiresTask('d', provides=[], requires=[]) + flow.add(d) + + flow.link(b, d) + flow.link(a, d) + flow.link(c, d) + + compilation = compiler.PatternCompiler(flow).compile() + g = compilation.execution_graph + self.assertTrue(g.has_edge(b_0, b_3)) + self.assertTrue(g.has_edge(b_3, d)) + self.assertEqual(4, len(g)) + + def test_empty_flow_in_nested_flow(self): + flow = lf.Flow('lf') + a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[]) + b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[]) + + flow2 = lf.Flow("lf-2") + c = test_utils.ProvidesRequiresTask('c', provides=[], requires=[]) + d = test_utils.ProvidesRequiresTask('d', provides=[], requires=[]) + empty_flow = gf.Flow("empty") + flow2.add(c, empty_flow, d) + flow.add(a, flow2, b) + + compilation = compiler.PatternCompiler(flow).compile() + g = compilation.execution_graph + + self.assertTrue(g.has_edge(a, c)) + self.assertTrue(g.has_edge(c, d)) + self.assertTrue(g.has_edge(d, b)) + + def 
test_empty_flow_in_graph_flow(self): + flow = lf.Flow('lf') + a = test_utils.ProvidesRequiresTask('a', provides=['a'], requires=[]) + b = test_utils.ProvidesRequiresTask('b', provides=[], requires=['a']) + empty_flow = lf.Flow("empty") + flow.add(a, empty_flow, b) + + compilation = compiler.PatternCompiler(flow).compile() + g = compilation.execution_graph + self.assertTrue(g.has_edge(a, b)) + + def test_empty_flow_in_graph_flow_empty_linkage(self): + flow = gf.Flow('lf') + a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[]) + b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[]) + empty_flow = lf.Flow("empty") + flow.add(a, empty_flow, b) + flow.link(empty_flow, b) + + compilation = compiler.PatternCompiler(flow).compile() + g = compilation.execution_graph + self.assertEqual(0, len(g.edges())) + + def test_empty_flow_in_graph_flow_linkage(self): + flow = gf.Flow('lf') + a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[]) + b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[]) + empty_flow = lf.Flow("empty") + flow.add(a, empty_flow, b) + flow.link(a, b) + + compilation = compiler.PatternCompiler(flow).compile() + g = compilation.execution_graph + self.assertEqual(1, len(g.edges())) + self.assertTrue(g.has_edge(a, b)) + def test_checks_for_dups(self): flo = gf.Flow("test").add( test_utils.DummyTask(name="a"),