diff --git a/taskflow/utils/flow_utils.py b/taskflow/utils/flow_utils.py index 61fc0825..8ba4e719 100644 --- a/taskflow/utils/flow_utils.py +++ b/taskflow/utils/flow_utils.py @@ -16,8 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. -import copy import logging +import threading import networkx as nx @@ -27,6 +27,7 @@ from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf from taskflow import task from taskflow.utils import graph_utils as gu +from taskflow.utils import lock_utils as lu from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -39,15 +40,123 @@ FLATTEN_EDGE_DATA = { } -def _graph_name(flow): - return "F:%s" % flow.name +class Flattener(object): + def __init__(self, root, freeze=True): + self._root = root + self._graph = None + self._history = set() + self._freeze = bool(freeze) + self._lock = threading.Lock() + self._edge_data = FLATTEN_EDGE_DATA.copy() + def _add_new_edges(self, graph, nodes_from, nodes_to, edge_attrs=None): + """Adds new edges from nodes to other nodes in the specified graph, + with the following edge attributes (defaulting to the class provided + edge_data if None), if the edge does not already exist. + """ + if edge_attrs is None: + edge_attrs = self._edge_data + else: + edge_attrs = edge_attrs.copy() + edge_attrs.update(self._edge_data) + for u in nodes_from: + for v in nodes_to: + if not graph.has_edge(u, v): + # NOTE(harlowja): give each edge its own attr copy so that + # if it's later modified that the same copy isn't modified. + graph.add_edge(u, v, attr_dict=edge_attrs.copy()) -def _log_flatten(func): + def _flatten(self, item): + functor = self._find_flattener(item) + if not functor: + raise TypeError("Unknown type requested to flatten: %s (%s)" + % (item, type(item))) + self._pre_item_flatten(item) + graph = functor(item) + self._post_item_flatten(item, graph) + return graph - @misc.wraps(func) - def wrapper(item, flattened): - graph = func(item, flattened) + def _find_flattener(self, item): + """Locates the flattening function to use to flatten the given item.""" + if isinstance(item, lf.Flow): + return self._flatten_linear + elif isinstance(item, uf.Flow): + return self._flatten_unordered + elif isinstance(item, gf.Flow): + return self._flatten_graph + elif isinstance(item, task.BaseTask): + return self._flatten_task + else: + return None + + def _flatten_linear(self, flow): + """Flattens a linear flow.""" + graph = nx.DiGraph(name=flow.name) + previous_nodes = [] + for item in flow: + subgraph = self._flatten(item) + graph = gu.merge_graphs([graph, subgraph]) + # Find nodes that have no predecessor, make them have a predecessor + # of the previous nodes so that the linearity ordering is + # maintained. Find the ones with no successors and use this list + # to connect the next subgraph (if any). + self._add_new_edges(graph, + previous_nodes, + list(gu.get_no_predecessors(subgraph))) + # There should always be someone without successors, otherwise we + # have a cycle A -> B -> A situation, which should not be possible. + previous_nodes = list(gu.get_no_successors(subgraph)) + return graph + + def _flatten_unordered(self, flow): + """Flattens a unordered flow.""" + graph = nx.DiGraph(name=flow.name) + for item in flow: + # NOTE(harlowja): we do *not* connect the graphs together, this + # retains that each item (translated to subgraph) is disconnected + # from each other which will result in unordered execution while + # running. + graph = gu.merge_graphs([graph, self._flatten(item)]) + return graph + + def _flatten_task(self, task): + """Flattens a individual task.""" + graph = nx.DiGraph(name=task.name) + graph.add_node(task) + return graph + + def _flatten_graph(self, flow): + """Flattens a graph flow.""" + graph = nx.DiGraph(name=flow.name) + # Flatten all nodes into a single subgraph per node. + subgraph_map = {} + for item in flow: + subgraph = self._flatten(item) + subgraph_map[item] = subgraph + graph = gu.merge_graphs([graph, subgraph]) + # Reconnect all node edges to there corresponding subgraphs. + for (u, v) in flow.graph.edges_iter(): + # Retain and update the original edge attributes. + u_v_attrs = gu.get_edge_attrs(flow.graph, u, v) + # Connect the ones with no predecessors in v to the ones with no + # successors in u (thus maintaining the edge dependency). + self._add_new_edges(graph, + list(gu.get_no_successors(subgraph_map[u])), + list(gu.get_no_predecessors(subgraph_map[v])), + edge_attrs=u_v_attrs) + return graph + + def _pre_item_flatten(self, item): + """Called before a item is flattened; any pre-flattening actions.""" + if id(item) in self._history: + raise ValueError("Already flattened item: %s (%s), recursive" + " flattening not supported" % (item, id(item))) + LOG.debug("Starting to flatten '%s'", item) + self._history.add(id(item)) + + def _post_item_flatten(self, item, graph): + """Called before a item is flattened; any post-flattening actions.""" + LOG.debug("Finished flattening '%s'", item) # NOTE(harlowja): this one can be expensive to calculate (especially # the cycle detection), so only do it if we know debugging is enabled # and not under all cases. @@ -56,107 +165,36 @@ def _log_flatten(func): for line in gu.pformat(graph).splitlines(): # Indent it so that it's slightly offset from the above line. LOG.debug(" %s", line) - return graph - return wrapper + def _pre_flatten(self): + """Called before the flattening of the item starts.""" + self._history.clear() + def _post_flatten(self, graph): + """Called after the flattening of the item finishes successfully.""" + dup_names = misc.get_duplicate_keys(graph.nodes_iter(), + key=lambda node: node.name) + if dup_names: + dup_names = ', '.join(sorted(dup_names)) + raise exceptions.InvariantViolation("Tasks with duplicate names " + "found: %s" % (dup_names)) + self._history.clear() -def _flatten_linear(flow, flattened): - graph = nx.DiGraph(name=_graph_name(flow)) - previous_nodes = [] - for f in flow: - subgraph = _flatten(f, flattened) - graph = gu.merge_graphs([graph, subgraph]) - # Find nodes that have no predecessor, make them have a predecessor of - # the previous nodes so that the linearity ordering is maintained. Find - # the ones with no successors and use this list to connect the next - # subgraph (if any). - for n in gu.get_no_predecessors(subgraph): - # NOTE(harlowja): give each edge its own copy so that if its later - # modified that the same copy isn't modified. - graph.add_edges_from(((n2, n, FLATTEN_EDGE_DATA.copy()) - for n2 in previous_nodes - if not graph.has_edge(n2, n))) - # There should always be someone without successors, otherwise we have - # a cycle A -> B -> A situation, which should not be possible. - previous_nodes = list(gu.get_no_successors(subgraph)) - return graph - - -def _flatten_unordered(flow, flattened): - graph = nx.DiGraph(name=_graph_name(flow)) - for f in flow: - graph = gu.merge_graphs([graph, _flatten(f, flattened)]) - return graph - - -def _flatten_task(task): - graph = nx.DiGraph(name='T:%s' % (task)) - graph.add_node(task) - return graph - - -def _flatten_graph(flow, flattened): - graph = nx.DiGraph(name=_graph_name(flow)) - subgraph_map = {} - # Flatten all nodes. - for n in flow.graph.nodes_iter(): - subgraph = _flatten(n, flattened) - subgraph_map[n] = subgraph - graph = gu.merge_graphs([graph, subgraph]) - # Reconnect all nodes to there corresponding subgraphs. - for (u, v) in flow.graph.edges_iter(): - # Retain and update the original edge attributes. - u_v_attrs = gu.get_edge_attrs(flow.graph, u, v) - if not u_v_attrs: - u_v_attrs = FLATTEN_EDGE_DATA.copy() + @lu.locked + def flatten(self): + """Flattens a item (a task or flow) into a single execution graph.""" + if self._graph is not None: + return self._graph + self._pre_flatten() + graph = self._flatten(self._root) + self._post_flatten(graph) + if self._freeze: + self._graph = nx.freeze(graph) else: - u_v_attrs.update(FLATTEN_EDGE_DATA) - u_no_succ = list(gu.get_no_successors(subgraph_map[u])) - # Connect the ones with no predecessors in v to the ones with no - # successors in u (thus maintaining the edge dependency). - for n in gu.get_no_predecessors(subgraph_map[v]): - # NOTE(harlowja): give each edge its own copy so that if its later - # modified that the same copy isn't modified. - graph.add_edges_from(((n2, n, copy.deepcopy(u_v_attrs)) - for n2 in u_no_succ - if not graph.has_edge(n2, n))) - return graph - - -@_log_flatten -def _flatten(item, flattened): - """Flattens a item (task/flow+subflows) into an execution graph.""" - if item in flattened: - raise ValueError("Already flattened item: %s" % (item)) - if isinstance(item, lf.Flow): - f = _flatten_linear(item, flattened) - elif isinstance(item, uf.Flow): - f = _flatten_unordered(item, flattened) - elif isinstance(item, gf.Flow): - f = _flatten_graph(item, flattened) - elif isinstance(item, task.BaseTask): - f = _flatten_task(item) - else: - raise TypeError("Unknown item: %r, %s" % (type(item), item)) - flattened.add(item) - return f - - -def _post_flatten(graph): - dup_names = misc.get_duplicate_keys(graph.nodes_iter(), - key=lambda node: node.name) - if dup_names: - raise exceptions.InvariantViolation( - "Tasks with duplicate names found: %s" - % ', '.join(sorted(dup_names))) - return graph + self._graph = graph + return self._graph def flatten(item, freeze=True): """Flattens a item (a task or flow) into a single execution graph.""" - graph = _post_flatten(_flatten(item, set())) - if freeze: - # Frozen graph can't be modified... - return nx.freeze(graph) - return graph + return Flattener(item, freeze=freeze).flatten()