Merge "Refactor task/flow flattening"
This commit is contained in:
@@ -16,8 +16,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import logging
|
||||
import threading
|
||||
|
||||
import networkx as nx
|
||||
|
||||
@@ -27,6 +27,7 @@ from taskflow.patterns import linear_flow as lf
|
||||
from taskflow.patterns import unordered_flow as uf
|
||||
from taskflow import task
|
||||
from taskflow.utils import graph_utils as gu
|
||||
from taskflow.utils import lock_utils as lu
|
||||
from taskflow.utils import misc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@@ -39,15 +40,123 @@ FLATTEN_EDGE_DATA = {
|
||||
}
|
||||
|
||||
|
||||
def _graph_name(flow):
|
||||
return "F:%s" % flow.name
|
||||
class Flattener(object):
|
||||
def __init__(self, root, freeze=True):
|
||||
self._root = root
|
||||
self._graph = None
|
||||
self._history = set()
|
||||
self._freeze = bool(freeze)
|
||||
self._lock = threading.Lock()
|
||||
self._edge_data = FLATTEN_EDGE_DATA.copy()
|
||||
|
||||
def _add_new_edges(self, graph, nodes_from, nodes_to, edge_attrs=None):
|
||||
"""Adds new edges from nodes to other nodes in the specified graph,
|
||||
with the following edge attributes (defaulting to the class provided
|
||||
edge_data if None), if the edge does not already exist.
|
||||
"""
|
||||
if edge_attrs is None:
|
||||
edge_attrs = self._edge_data
|
||||
else:
|
||||
edge_attrs = edge_attrs.copy()
|
||||
edge_attrs.update(self._edge_data)
|
||||
for u in nodes_from:
|
||||
for v in nodes_to:
|
||||
if not graph.has_edge(u, v):
|
||||
# NOTE(harlowja): give each edge its own attr copy so that
|
||||
# if it's later modified that the same copy isn't modified.
|
||||
graph.add_edge(u, v, attr_dict=edge_attrs.copy())
|
||||
|
||||
def _log_flatten(func):
|
||||
def _flatten(self, item):
|
||||
functor = self._find_flattener(item)
|
||||
if not functor:
|
||||
raise TypeError("Unknown type requested to flatten: %s (%s)"
|
||||
% (item, type(item)))
|
||||
self._pre_item_flatten(item)
|
||||
graph = functor(item)
|
||||
self._post_item_flatten(item, graph)
|
||||
return graph
|
||||
|
||||
@misc.wraps(func)
|
||||
def wrapper(item, flattened):
|
||||
graph = func(item, flattened)
|
||||
def _find_flattener(self, item):
|
||||
"""Locates the flattening function to use to flatten the given item."""
|
||||
if isinstance(item, lf.Flow):
|
||||
return self._flatten_linear
|
||||
elif isinstance(item, uf.Flow):
|
||||
return self._flatten_unordered
|
||||
elif isinstance(item, gf.Flow):
|
||||
return self._flatten_graph
|
||||
elif isinstance(item, task.BaseTask):
|
||||
return self._flatten_task
|
||||
else:
|
||||
return None
|
||||
|
||||
def _flatten_linear(self, flow):
|
||||
"""Flattens a linear flow."""
|
||||
graph = nx.DiGraph(name=flow.name)
|
||||
previous_nodes = []
|
||||
for item in flow:
|
||||
subgraph = self._flatten(item)
|
||||
graph = gu.merge_graphs([graph, subgraph])
|
||||
# Find nodes that have no predecessor, make them have a predecessor
|
||||
# of the previous nodes so that the linearity ordering is
|
||||
# maintained. Find the ones with no successors and use this list
|
||||
# to connect the next subgraph (if any).
|
||||
self._add_new_edges(graph,
|
||||
previous_nodes,
|
||||
list(gu.get_no_predecessors(subgraph)))
|
||||
# There should always be someone without successors, otherwise we
|
||||
# have a cycle A -> B -> A situation, which should not be possible.
|
||||
previous_nodes = list(gu.get_no_successors(subgraph))
|
||||
return graph
|
||||
|
||||
def _flatten_unordered(self, flow):
|
||||
"""Flattens a unordered flow."""
|
||||
graph = nx.DiGraph(name=flow.name)
|
||||
for item in flow:
|
||||
# NOTE(harlowja): we do *not* connect the graphs together, this
|
||||
# retains that each item (translated to subgraph) is disconnected
|
||||
# from each other which will result in unordered execution while
|
||||
# running.
|
||||
graph = gu.merge_graphs([graph, self._flatten(item)])
|
||||
return graph
|
||||
|
||||
def _flatten_task(self, task):
|
||||
"""Flattens a individual task."""
|
||||
graph = nx.DiGraph(name=task.name)
|
||||
graph.add_node(task)
|
||||
return graph
|
||||
|
||||
def _flatten_graph(self, flow):
|
||||
"""Flattens a graph flow."""
|
||||
graph = nx.DiGraph(name=flow.name)
|
||||
# Flatten all nodes into a single subgraph per node.
|
||||
subgraph_map = {}
|
||||
for item in flow:
|
||||
subgraph = self._flatten(item)
|
||||
subgraph_map[item] = subgraph
|
||||
graph = gu.merge_graphs([graph, subgraph])
|
||||
# Reconnect all node edges to there corresponding subgraphs.
|
||||
for (u, v) in flow.graph.edges_iter():
|
||||
# Retain and update the original edge attributes.
|
||||
u_v_attrs = gu.get_edge_attrs(flow.graph, u, v)
|
||||
# Connect the ones with no predecessors in v to the ones with no
|
||||
# successors in u (thus maintaining the edge dependency).
|
||||
self._add_new_edges(graph,
|
||||
list(gu.get_no_successors(subgraph_map[u])),
|
||||
list(gu.get_no_predecessors(subgraph_map[v])),
|
||||
edge_attrs=u_v_attrs)
|
||||
return graph
|
||||
|
||||
def _pre_item_flatten(self, item):
|
||||
"""Called before a item is flattened; any pre-flattening actions."""
|
||||
if id(item) in self._history:
|
||||
raise ValueError("Already flattened item: %s (%s), recursive"
|
||||
" flattening not supported" % (item, id(item)))
|
||||
LOG.debug("Starting to flatten '%s'", item)
|
||||
self._history.add(id(item))
|
||||
|
||||
def _post_item_flatten(self, item, graph):
|
||||
"""Called before a item is flattened; any post-flattening actions."""
|
||||
LOG.debug("Finished flattening '%s'", item)
|
||||
# NOTE(harlowja): this one can be expensive to calculate (especially
|
||||
# the cycle detection), so only do it if we know debugging is enabled
|
||||
# and not under all cases.
|
||||
@@ -56,107 +165,36 @@ def _log_flatten(func):
|
||||
for line in gu.pformat(graph).splitlines():
|
||||
# Indent it so that it's slightly offset from the above line.
|
||||
LOG.debug(" %s", line)
|
||||
return graph
|
||||
|
||||
return wrapper
|
||||
def _pre_flatten(self):
|
||||
"""Called before the flattening of the item starts."""
|
||||
self._history.clear()
|
||||
|
||||
def _post_flatten(self, graph):
|
||||
"""Called after the flattening of the item finishes successfully."""
|
||||
dup_names = misc.get_duplicate_keys(graph.nodes_iter(),
|
||||
key=lambda node: node.name)
|
||||
if dup_names:
|
||||
dup_names = ', '.join(sorted(dup_names))
|
||||
raise exceptions.InvariantViolation("Tasks with duplicate names "
|
||||
"found: %s" % (dup_names))
|
||||
self._history.clear()
|
||||
|
||||
def _flatten_linear(flow, flattened):
|
||||
graph = nx.DiGraph(name=_graph_name(flow))
|
||||
previous_nodes = []
|
||||
for f in flow:
|
||||
subgraph = _flatten(f, flattened)
|
||||
graph = gu.merge_graphs([graph, subgraph])
|
||||
# Find nodes that have no predecessor, make them have a predecessor of
|
||||
# the previous nodes so that the linearity ordering is maintained. Find
|
||||
# the ones with no successors and use this list to connect the next
|
||||
# subgraph (if any).
|
||||
for n in gu.get_no_predecessors(subgraph):
|
||||
# NOTE(harlowja): give each edge its own copy so that if its later
|
||||
# modified that the same copy isn't modified.
|
||||
graph.add_edges_from(((n2, n, FLATTEN_EDGE_DATA.copy())
|
||||
for n2 in previous_nodes
|
||||
if not graph.has_edge(n2, n)))
|
||||
# There should always be someone without successors, otherwise we have
|
||||
# a cycle A -> B -> A situation, which should not be possible.
|
||||
previous_nodes = list(gu.get_no_successors(subgraph))
|
||||
return graph
|
||||
|
||||
|
||||
def _flatten_unordered(flow, flattened):
|
||||
graph = nx.DiGraph(name=_graph_name(flow))
|
||||
for f in flow:
|
||||
graph = gu.merge_graphs([graph, _flatten(f, flattened)])
|
||||
return graph
|
||||
|
||||
|
||||
def _flatten_task(task):
|
||||
graph = nx.DiGraph(name='T:%s' % (task))
|
||||
graph.add_node(task)
|
||||
return graph
|
||||
|
||||
|
||||
def _flatten_graph(flow, flattened):
|
||||
graph = nx.DiGraph(name=_graph_name(flow))
|
||||
subgraph_map = {}
|
||||
# Flatten all nodes.
|
||||
for n in flow.graph.nodes_iter():
|
||||
subgraph = _flatten(n, flattened)
|
||||
subgraph_map[n] = subgraph
|
||||
graph = gu.merge_graphs([graph, subgraph])
|
||||
# Reconnect all nodes to there corresponding subgraphs.
|
||||
for (u, v) in flow.graph.edges_iter():
|
||||
# Retain and update the original edge attributes.
|
||||
u_v_attrs = gu.get_edge_attrs(flow.graph, u, v)
|
||||
if not u_v_attrs:
|
||||
u_v_attrs = FLATTEN_EDGE_DATA.copy()
|
||||
@lu.locked
|
||||
def flatten(self):
|
||||
"""Flattens a item (a task or flow) into a single execution graph."""
|
||||
if self._graph is not None:
|
||||
return self._graph
|
||||
self._pre_flatten()
|
||||
graph = self._flatten(self._root)
|
||||
self._post_flatten(graph)
|
||||
if self._freeze:
|
||||
self._graph = nx.freeze(graph)
|
||||
else:
|
||||
u_v_attrs.update(FLATTEN_EDGE_DATA)
|
||||
u_no_succ = list(gu.get_no_successors(subgraph_map[u]))
|
||||
# Connect the ones with no predecessors in v to the ones with no
|
||||
# successors in u (thus maintaining the edge dependency).
|
||||
for n in gu.get_no_predecessors(subgraph_map[v]):
|
||||
# NOTE(harlowja): give each edge its own copy so that if its later
|
||||
# modified that the same copy isn't modified.
|
||||
graph.add_edges_from(((n2, n, copy.deepcopy(u_v_attrs))
|
||||
for n2 in u_no_succ
|
||||
if not graph.has_edge(n2, n)))
|
||||
return graph
|
||||
|
||||
|
||||
@_log_flatten
|
||||
def _flatten(item, flattened):
|
||||
"""Flattens a item (task/flow+subflows) into an execution graph."""
|
||||
if item in flattened:
|
||||
raise ValueError("Already flattened item: %s" % (item))
|
||||
if isinstance(item, lf.Flow):
|
||||
f = _flatten_linear(item, flattened)
|
||||
elif isinstance(item, uf.Flow):
|
||||
f = _flatten_unordered(item, flattened)
|
||||
elif isinstance(item, gf.Flow):
|
||||
f = _flatten_graph(item, flattened)
|
||||
elif isinstance(item, task.BaseTask):
|
||||
f = _flatten_task(item)
|
||||
else:
|
||||
raise TypeError("Unknown item: %r, %s" % (type(item), item))
|
||||
flattened.add(item)
|
||||
return f
|
||||
|
||||
|
||||
def _post_flatten(graph):
|
||||
dup_names = misc.get_duplicate_keys(graph.nodes_iter(),
|
||||
key=lambda node: node.name)
|
||||
if dup_names:
|
||||
raise exceptions.InvariantViolation(
|
||||
"Tasks with duplicate names found: %s"
|
||||
% ', '.join(sorted(dup_names)))
|
||||
return graph
|
||||
self._graph = graph
|
||||
return self._graph
|
||||
|
||||
|
||||
def flatten(item, freeze=True):
|
||||
"""Flattens a item (a task or flow) into a single execution graph."""
|
||||
graph = _post_flatten(_flatten(item, set()))
|
||||
if freeze:
|
||||
# Frozen graph can't be modified...
|
||||
return nx.freeze(graph)
|
||||
return graph
|
||||
return Flattener(item, freeze=freeze).flatten()
|
||||
|
||||
Reference in New Issue
Block a user