Merge "Properly handle and skip empty intermediary flows"
commit 22415f7c1d
@@ -14,6 +14,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import collections
 import threading
 
 from taskflow import exceptions as exc
@@ -43,39 +44,241 @@ class Compilation(object):
 
     @property
     def execution_graph(self):
         """The execution ordering of atoms (as a graph structure)."""
         return self._execution_graph
 
     @property
     def hierarchy(self):
         """The hierarchy of patterns (as a tree structure)."""
         return self._hierarchy
 
 
+def _add_update_edges(graph, nodes_from, nodes_to, attr_dict=None):
+    """Adds/updates edges from nodes to other nodes in the specified graph.
+
+    It will connect the 'nodes_from' to the 'nodes_to' if an edge currently
+    does *not* exist (if it does already exist then the edge's attributes
+    are just updated instead). When an edge is created the provided edge
+    attributes dictionary will be applied to the new edge between these two
+    nodes.
+    """
+    # NOTE(harlowja): give each edge its own attr copy so that if it's
+    # later modified the same copy isn't modified by other edges...
+    for u in nodes_from:
+        for v in nodes_to:
+            if not graph.has_edge(u, v):
+                if attr_dict:
+                    graph.add_edge(u, v, attr_dict=attr_dict.copy())
+                else:
+                    graph.add_edge(u, v)
+            else:
+                # Just update the attr_dict (if any).
+                if attr_dict:
+                    graph.add_edge(u, v, attr_dict=attr_dict.copy())
+
+
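A minimal, runnable sketch of the add-or-update behavior above, using plain networkx (taskflow's gr.DiGraph wraps networkx; the helper name and modern keyword-argument style here are illustrative, not the library's exact API):

    import networkx as nx

    def add_update_edges(graph, nodes_from, nodes_to, attr_dict=None):
        # Mirror of _add_update_edges: create missing edges with a *copy*
        # of the attributes, update attributes on edges that already exist.
        for u in nodes_from:
            for v in nodes_to:
                if not graph.has_edge(u, v):
                    graph.add_edge(u, v, **dict(attr_dict or {}))
                elif attr_dict:
                    graph[u][v].update(dict(attr_dict))

    g = nx.DiGraph()
    add_update_edges(g, ['a'], ['b', 'c'], {'invariant': True})
    add_update_edges(g, ['a'], ['b'], {'manual': True})  # updates a->b only
    assert g['a']['b'] == {'invariant': True, 'manual': True}
    assert g['a']['c'] == {'invariant': True}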
+class Linker(object):
+    """Compiler helper that adds pattern(s) constraints onto a graph."""
+
+    @staticmethod
+    def _is_not_empty(graph):
+        # Returns true if the given graph is *not* empty...
+        return graph.number_of_nodes() > 0
+
+    @staticmethod
+    def _find_first_decomposed(node, priors,
+                               decomposed_members, decomposed_filter):
+        # How this works: traverse backwards and find only the predecessor
+        # items that are actually connected to this entity, and avoid any
+        # linkage that is not directly connected. This is guaranteed to be
+        # valid since we always iter_links() over predecessors before
+        # successors in all currently known patterns; a queue is used here
+        # since it is possible for a node to have 2+ different predecessors so
+        # we must search back through all of them in a reverse BFS order...
+        #
+        # Returns the first decomposed graph of those nodes (including the
+        # passed in node) that passes the provided filter
+        # function (returns None if none match).
+        frontier = collections.deque([node])
+        # NOTE(harlowja): None is in this initial set since the first prior in
+        # the priors list has None as its predecessor (which we don't want to
+        # look for a decomposed member of).
+        visited = set([None])
+        while frontier:
+            node = frontier.popleft()
+            if node in visited:
+                continue
+            node_graph = decomposed_members[node]
+            if decomposed_filter(node_graph):
+                return node_graph
+            visited.add(node)
+            # TODO(harlowja): optimize this more to avoid searching through
+            # things already searched...
+            for (u, v) in reversed(priors):
+                if node == v:
+                    # Queue its predecessor to be searched in the future...
+                    frontier.append(u)
+        else:
+            return None
+
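The reverse BFS above can be exercised standalone; a toy sketch in plain Python, with truthy/falsy strings standing in for non-empty/empty decomposed graphs (all names illustrative):

    import collections

    def find_first_decomposed(node, priors, decomposed_members,
                              decomposed_filter):
        # Same shape as Linker._find_first_decomposed above.
        frontier = collections.deque([node])
        visited = set([None])
        while frontier:
            node = frontier.popleft()
            if node in visited:
                continue
            node_graph = decomposed_members[node]
            if decomposed_filter(node_graph):
                return node_graph
            visited.add(node)
            for (u, v) in reversed(priors):
                if node == v:
                    frontier.append(u)
        return None

    # a -> empty1 -> empty2: searching back from 'empty2' skips the empty
    # decompositions and lands on a's (non-empty) stand-in graph.
    priors = [(None, 'a'), ('a', 'empty1'), ('empty1', 'empty2')]
    members = {'a': 'a-graph', 'empty1': '', 'empty2': ''}
    assert find_first_decomposed('empty2', priors, members, bool) == 'a-graph'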
+    def apply_constraints(self, graph, flow, decomposed_members):
+        # This list is used to track the links that have been previously
+        # iterated over, so that when we are trying to find an entry to
+        # connect to we can iterate backwards through this list, finding
+        # connected nodes to the current target (let's call it v) and find
+        # the first (u_n, or u_n - 1, u_n - 2...) that was decomposed into
+        # a non-empty graph. We also retain all predecessors of v so that we
+        # can correctly locate u_n - 1 if u_n turns out to have decomposed into
+        # an empty graph (and so on).
+        priors = []
+        # NOTE(harlowja): u, v are flows/tasks (also graph terminology since
+        # we are compiling things down into a flattened graph); the meaning
+        # of this link iteration via iter_links() is that u -> v (with the
+        # provided dictionary attributes, if any).
+        for (u, v, attr_dict) in flow.iter_links():
+            if not priors:
+                priors.append((None, u))
+            v_g = decomposed_members[v]
+            if not v_g.number_of_nodes():
+                priors.append((u, v))
+                continue
+            invariant = any(attr_dict.get(k) for k in _EDGE_INVARIANTS)
+            if not invariant:
+                # This is a symbol *only* dependency, connect
+                # corresponding providers and consumers to allow the consumer
+                # to be executed immediately after the provider finishes (this
+                # is an optimization for these types of dependencies...)
+                u_g = decomposed_members[u]
+                if not u_g.number_of_nodes():
+                    # This must always exist, but in case it somehow doesn't...
+                    raise exc.CompilationFailure(
+                        "Non-invariant link being created from '%s' ->"
+                        " '%s' even though the target '%s' was found to be"
+                        " decomposed into an empty graph" % (v, u, u))
+                for provider in u_g:
+                    for consumer in v_g:
+                        reasons = provider.provides & consumer.requires
+                        if reasons:
+                            graph.add_edge(provider, consumer, reasons=reasons)
+            else:
+                # Connect nodes with no predecessors in v to nodes with no
+                # successors in the *first* non-empty predecessor of v (thus
+                # maintaining the edge dependency).
+                match = self._find_first_decomposed(u, priors,
+                                                    decomposed_members,
+                                                    self._is_not_empty)
+                if match is not None:
+                    _add_update_edges(graph,
+                                      match.no_successors_iter(),
+                                      list(v_g.no_predecessors_iter()),
+                                      attr_dict=attr_dict)
+            priors.append((u, v))
+
+
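The net effect of apply_constraints is what the new tests below verify; for instance (mirroring test_empty_flow_in_linear_flow, with import paths assumed from taskflow's usual layout):

    from taskflow.engines.action_engine import compiler
    from taskflow.patterns import graph_flow as gf
    from taskflow.patterns import linear_flow as lf
    from taskflow.tests import utils as test_utils

    flow = lf.Flow('lf')
    a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
    b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
    flow.add(a, gf.Flow("empty"), b)

    g = compiler.PatternCompiler(flow).compile().execution_graph
    # The empty flow decomposed into zero nodes, so the linker walked the
    # `priors` list back from it and connected a -> b directly.
    assert g.has_edge(a, b)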
 class PatternCompiler(object):
-    """Compiles a pattern (or task) into a compilation unit."""
+    """Compiles a pattern (or task) into a compilation unit.
+
+    Let's dive into the basic idea for how this works:
+
+    The compiler here is provided a 'root' object via its __init__ method;
+    this object could be a task, or a flow (one of the supported patterns).
+    The end-goal is to produce a :py:class:`.Compilation` object as the result
+    with the needed components. If this is not possible a
+    :py:class:`~taskflow.exceptions.CompilationFailure` will be raised (or
+    in the case where an unknown type is being requested to compile
+    a ``TypeError`` will be raised).
+
+    The complexity of this comes into play when the 'root' is a flow that
+    itself contains other nested flows (and so on); to compile this object and
+    its contained objects into a graph that *preserves* the constraints the
+    pattern mandates we have to go through a recursive algorithm that creates
+    subgraphs for each nesting level, and then on the way back up through
+    the recursion (now with a decomposed mapping from contained patterns or
+    atoms to their corresponding subgraphs) we have to then connect the
+    subgraphs (and the atom(s) therein) that were decomposed for a pattern
+    correctly into a new graph (using a :py:class:`.Linker` object to ensure
+    the pattern-mandated constraints are retained) and then return to the
+    caller (who will do the same thing up until the root node, by which
+    point one graph is created with all contained atoms in the
+    pattern/nested patterns mandated ordering).
+
+    Also maintained in the :py:class:`.Compilation` object is a hierarchy of
+    the nesting of items (which is also built up during the above mentioned
+    recursion, via a much simpler algorithm); this is typically used later to
+    determine the prior atoms of a given atom when looking up values that can
+    be provided to that atom for execution (see the scopes.py file for how
+    this works). Note that although you *could* think that the graph itself
+    could be used for this, and in some limited ways it can, the hierarchy
+    retains the nested structure (which is useful for scoping
+    analysis/lookup) and is able to provide back an iterator that gives back
+    the scopes visible at each level (the graph does not have this
+    information once flattened).
+
+    Let's take an example:
+
+    Given the pattern ``f(a(b, c), d)`` where ``f`` is a
+    :py:class:`~taskflow.patterns.linear_flow.Flow` with items ``a(b, c)``
+    where ``a`` is a :py:class:`~taskflow.patterns.linear_flow.Flow` composed
+    of tasks ``(b, c)`` and task ``d``.
+
+    The algorithm that will be performed (mirroring the above described
+    logic) will go through the following steps (the tree hierarchy building
+    is left out as that is more obvious)::
+
+        Compiling f
+         - Decomposing flow f with no parent (must be the root)
+         - Compiling a
+           - Decomposing flow a with parent f
+           - Compiling b
+             - Decomposing task b with parent a
+             - Decomposed b into:
+               Name: b
+               Nodes: 1
+                 - b
+               Edges: 0
+           - Compiling c
+             - Decomposing task c with parent a
+             - Decomposed c into:
+               Name: c
+               Nodes: 1
+                 - c
+               Edges: 0
+           - Relinking decomposed b -> decomposed c
+           - Decomposed a into:
+             Name: a
+             Nodes: 2
+               - b
+               - c
+             Edges: 1
+               b -> c ({'invariant': True})
+         - Compiling d
+           - Decomposing task d with parent f
+           - Decomposed d into:
+             Name: d
+             Nodes: 1
+               - d
+             Edges: 0
+         - Relinking decomposed a -> decomposed d
+         - Decomposed f into:
+           Name: f
+           Nodes: 3
+             - c
+             - b
+             - d
+           Edges: 2
+             c -> d ({'invariant': True})
+             b -> c ({'invariant': True})
+    """
 
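A short sketch of compiling the docstring's example pattern f(a(b, c), d); a hedged usage sample, where the trivial Task subclass is illustrative and everything else is taskflow's public API:

    from taskflow.engines.action_engine.compiler import PatternCompiler
    from taskflow.patterns import linear_flow as lf
    from taskflow import task

    class T(task.Task):
        def execute(self):
            pass

    b, c, d = T(name='b'), T(name='c'), T(name='d')
    f = lf.Flow('f').add(lf.Flow('a').add(b, c), d)

    g = PatternCompiler(f).compile().execution_graph
    assert g.number_of_nodes() == 3              # b, c and d
    assert g.has_edge(b, c) and g.has_edge(c, d)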
     def __init__(self, root, freeze=True):
         self._root = root
         self._history = set()
+        self._linker = Linker()
         self._freeze = freeze
         self._lock = threading.Lock()
         self._compilation = None
 
     def _add_new_edges(self, graph, nodes_from, nodes_to, edge_attrs):
         """Adds new edges from nodes to other nodes in the specified graph.
 
         It will connect the nodes_from to the nodes_to if an edge currently
         does *not* exist. When an edge is created the provided edge attributes
         will be applied to the new edge between these two nodes.
         """
         nodes_to = list(nodes_to)
         for u in nodes_from:
             for v in nodes_to:
                 if not graph.has_edge(u, v):
                     # NOTE(harlowja): give each edge its own attr copy so that
                     # if it's later modified the same copy isn't modified.
                     graph.add_edge(u, v, attr_dict=edge_attrs.copy())
 
     def _flatten(self, item, parent):
         """Flattens an item (pattern, task) into a graph + tree node."""
         functor = self._find_flattener(item, parent)
         self._pre_item_flatten(item)
         graph, node = functor(item, parent)
@@ -106,7 +309,9 @@ class PatternCompiler(object):
 
         # All nodes that have no predecessors should depend on this retry.
         nodes_to = [n for n in graph.no_predecessors_iter() if n is not retry]
-        self._add_new_edges(graph, [retry], nodes_to, _RETRY_EDGE_DATA)
+        if nodes_to:
+            _add_update_edges(graph, [retry], nodes_to,
+                              attr_dict=_RETRY_EDGE_DATA)
 
         # Add association for each node of graph that has no existing retry.
         for n in graph.nodes_iter():
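With the guard in place, a retry only gains edges when the flow body actually decomposed into nodes; the normal case looks like this hedged sketch (retry.AlwaysRevert and the Task subclass are assumptions based on taskflow's public API):

    from taskflow.engines.action_engine.compiler import PatternCompiler
    from taskflow.patterns import linear_flow as lf
    from taskflow import retry, task

    class T(task.Task):
        def execute(self):
            pass

    r = retry.AlwaysRevert('r')
    t = T(name='t')
    flow = lf.Flow('f', retry=r).add(t)

    g = PatternCompiler(flow).compile().execution_graph
    assert g.has_edge(r, t)   # the edge carries the _RETRY_EDGE_DATA attrs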
@@ -122,43 +327,26 @@ class PatternCompiler(object):
             parent.add(node)
         return graph, node
 
-    def _flatten_flow(self, flow, parent):
-        """Flattens a flow."""
+    def _decompose_flow(self, flow, parent):
+        """Decomposes a flow into a graph, tree node + decomposed subgraphs."""
         graph = gr.DiGraph(name=flow.name)
         node = tr.Node(flow)
         if parent is not None:
             parent.add(node)
         if flow.retry is not None:
             node.add(tr.Node(flow.retry))
-
-        # Flatten all nodes into a single subgraph per item (and track origin
-        # item to its newly expanded graph).
-        subgraphs = {}
+        decomposed_members = {}
         for item in flow:
-            subgraph = self._flatten(item, node)[0]
-            subgraphs[item] = subgraph
-            graph = gr.merge_graphs([graph, subgraph])
-
-        # Reconnect all items edges to their corresponding subgraphs.
-        for (u, v, attrs) in flow.iter_links():
-            u_g = subgraphs[u]
-            v_g = subgraphs[v]
-            if any(attrs.get(k) for k in _EDGE_INVARIANTS):
-                # Connect nodes with no predecessors in v to nodes with
-                # no successors in u (thus maintaining the edge dependency).
-                self._add_new_edges(graph,
-                                    u_g.no_successors_iter(),
-                                    v_g.no_predecessors_iter(),
-                                    edge_attrs=attrs)
-            else:
-                # This is symbol dependency edge, connect corresponding
-                # providers and consumers.
-                for provider in u_g:
-                    for consumer in v_g:
-                        reasons = provider.provides & consumer.requires
-                        if reasons:
-                            graph.add_edge(provider, consumer, reasons=reasons)
+            subgraph, subnode = self._flatten(item, node)
+            decomposed_members[item] = subgraph
+            if subgraph.number_of_nodes():
+                graph = gr.merge_graphs([graph, subgraph])
+        return graph, node, decomposed_members
+
+    def _flatten_flow(self, flow, parent):
+        """Flattens a flow."""
+        graph, node, decomposed_members = self._decompose_flow(flow, parent)
+        self._linker.apply_constraints(graph, flow, decomposed_members)
+        if flow.retry is not None:
+            self._connect_retry(flow.retry, graph)
+        return graph, node
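The decompose-then-link split above, reduced to a runnable toy (pure Python, not taskflow code): children decompose into node sets, empty ones merge as no-ops, and a linker pass adds ordering edges afterwards while skipping the empties:

    def flatten(items, decompose, link):
        # Decompose-then-link in miniature: merge non-empty subgraphs first,
        # then apply ordering constraints over the decomposed mapping.
        decomposed = {}
        nodes = set()
        for item in items:
            sub_nodes = decompose(item)
            decomposed[item] = sub_nodes
            nodes |= sub_nodes            # merging an empty set is a no-op
        return nodes, link(items, decomposed)

    def decompose(item):
        # Tasks become single-node graphs; 'empty*' flows decompose to nothing.
        return set() if item.startswith('empty') else {item}

    def link(items, decomposed):
        # Linear-flow style constraints that skip empty decompositions.
        edges, prev = set(), None
        for item in items:
            if not decomposed[item]:
                continue
            if prev is not None:
                edges.add((prev, item))
            prev = item
        return edges

    assert flatten(['a', 'empty', 'b'], decompose, link) == \
        ({'a', 'b'}, {('a', 'b')})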
@@ -135,6 +135,10 @@ class MissingDependencies(DependencyFailure):
         self.missing_requirements = requirements
 
 
+class CompilationFailure(TaskFlowException):
+    """Raised when some type of compilation issue is found."""
+
+
 class IncompatibleVersion(TaskFlowException):
     """Raised when some type of version incompatibility is found."""
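A hedged sketch of the new exception in use (the wrapper function is illustrative; flow construction is elided and would be done as in the tests below):

    from taskflow import exceptions as exc
    from taskflow.engines.action_engine import compiler

    def compile_or_report(flow):
        try:
            return compiler.PatternCompiler(flow).compile()
        except exc.CompilationFailure as e:
            # Raised, e.g., when a symbol-only link's source decomposed
            # into an empty graph (see apply_constraints above).
            raise RuntimeError("compilation failed: %s" % e)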
@@ -264,6 +264,107 @@ class PatternCompileTest(test.TestCase):
         self.assertItemsEqual([b], g.no_predecessors_iter())
         self.assertItemsEqual([a, c], g.no_successors_iter())
 
+    def test_empty_flow_in_linear_flow(self):
+        flow = lf.Flow('lf')
+        a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
+        b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
+        empty_flow = gf.Flow("empty")
+        flow.add(a, empty_flow, b)
+
+        compilation = compiler.PatternCompiler(flow).compile()
+        g = compilation.execution_graph
+        self.assertItemsEqual(g.edges(data=True), [
+            (a, b, {'invariant': True}),
+        ])
+
+    def test_many_empty_in_graph_flow(self):
+        flow = gf.Flow('root')
+
+        a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
+        flow.add(a)
+
+        b = lf.Flow('b')
+        b_0 = test_utils.ProvidesRequiresTask('b.0', provides=[], requires=[])
+        b_3 = test_utils.ProvidesRequiresTask('b.3', provides=[], requires=[])
+        b.add(
+            b_0,
+            lf.Flow('b.1'), lf.Flow('b.2'),
+            b_3,
+        )
+        flow.add(b)
+
+        c = lf.Flow('c')
+        c.add(lf.Flow('c.0'), lf.Flow('c.1'), lf.Flow('c.2'))
+        flow.add(c)
+
+        d = test_utils.ProvidesRequiresTask('d', provides=[], requires=[])
+        flow.add(d)
+
+        flow.link(b, d)
+        flow.link(a, d)
+        flow.link(c, d)
+
+        compilation = compiler.PatternCompiler(flow).compile()
+        g = compilation.execution_graph
+        self.assertTrue(g.has_edge(b_0, b_3))
+        self.assertTrue(g.has_edge(b_3, d))
+        self.assertEqual(4, len(g))
+
+    def test_empty_flow_in_nested_flow(self):
+        flow = lf.Flow('lf')
+        a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
+        b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
+
+        flow2 = lf.Flow("lf-2")
+        c = test_utils.ProvidesRequiresTask('c', provides=[], requires=[])
+        d = test_utils.ProvidesRequiresTask('d', provides=[], requires=[])
+        empty_flow = gf.Flow("empty")
+        flow2.add(c, empty_flow, d)
+        flow.add(a, flow2, b)
+
+        compilation = compiler.PatternCompiler(flow).compile()
+        g = compilation.execution_graph
+
+        self.assertTrue(g.has_edge(a, c))
+        self.assertTrue(g.has_edge(c, d))
+        self.assertTrue(g.has_edge(d, b))
+
+    def test_empty_flow_in_graph_flow(self):
+        flow = lf.Flow('lf')
+        a = test_utils.ProvidesRequiresTask('a', provides=['a'], requires=[])
+        b = test_utils.ProvidesRequiresTask('b', provides=[], requires=['a'])
+        empty_flow = lf.Flow("empty")
+        flow.add(a, empty_flow, b)
+
+        compilation = compiler.PatternCompiler(flow).compile()
+        g = compilation.execution_graph
+        self.assertTrue(g.has_edge(a, b))
+
+    def test_empty_flow_in_graph_flow_empty_linkage(self):
+        flow = gf.Flow('lf')
+        a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
+        b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
+        empty_flow = lf.Flow("empty")
+        flow.add(a, empty_flow, b)
+        flow.link(empty_flow, b)
+
+        compilation = compiler.PatternCompiler(flow).compile()
+        g = compilation.execution_graph
+        self.assertEqual(0, len(g.edges()))
+
+    def test_empty_flow_in_graph_flow_linkage(self):
+        flow = gf.Flow('lf')
+        a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
+        b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
+        empty_flow = lf.Flow("empty")
+        flow.add(a, empty_flow, b)
+        flow.link(a, b)
+
+        compilation = compiler.PatternCompiler(flow).compile()
+        g = compilation.execution_graph
+        self.assertEqual(1, len(g.edges()))
+        self.assertTrue(g.has_edge(a, b))
+
     def test_checks_for_dups(self):
         flo = gf.Flow("test").add(
             test_utils.DummyTask(name="a"),