Most TaskFlow exception don't have 'Exception' suffix in their names, so we drop it from the few that have it, for sake of consistency and more concise and expressive code. MissingDependencies parent object is changed from InvalidState to InvariantViolation, which is more appropriate. Unused ClosedException is dropped. Breaking change: any client code that used renamed exceptions, ClosedException or relied on catching MissingDependencies as InvariantViolationException is affected. Change-Id: I649b7760d382ccb6df333bdffd667ba6b67a431e
163 lines
5.7 KiB
Python
163 lines
5.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import copy
|
|
import logging
|
|
|
|
import networkx as nx
|
|
|
|
from taskflow import exceptions
|
|
from taskflow.patterns import graph_flow as gf
|
|
from taskflow.patterns import linear_flow as lf
|
|
from taskflow.patterns import unordered_flow as uf
|
|
from taskflow import task
|
|
from taskflow.utils import graph_utils as gu
|
|
from taskflow.utils import misc
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
# Use the 'flatten' attribute as the need to add an edge here, which is useful
|
|
# for doing later analysis of the edges (to determine why the edges were
|
|
# created).
|
|
FLATTEN_EDGE_DATA = {
|
|
'flatten': True,
|
|
}
|
|
|
|
|
|
def _graph_name(flow):
|
|
return "F:%s" % flow.name
|
|
|
|
|
|
def _log_flatten(func):
|
|
|
|
@misc.wraps(func)
|
|
def wrapper(item, flattened):
|
|
graph = func(item, flattened)
|
|
# NOTE(harlowja): this one can be expensive to calculate (especially
|
|
# the cycle detection), so only do it if we know debugging is enabled
|
|
# and not under all cases.
|
|
if LOG.isEnabledFor(logging.DEBUG):
|
|
LOG.debug("Translated '%s' into a graph:", item)
|
|
for line in gu.pformat(graph).splitlines():
|
|
# Indent it so that it's slightly offset from the above line.
|
|
LOG.debug(" %s", line)
|
|
return graph
|
|
|
|
return wrapper
|
|
|
|
|
|
def _flatten_linear(flow, flattened):
|
|
graph = nx.DiGraph(name=_graph_name(flow))
|
|
previous_nodes = []
|
|
for f in flow:
|
|
subgraph = _flatten(f, flattened)
|
|
graph = gu.merge_graphs([graph, subgraph])
|
|
# Find nodes that have no predecessor, make them have a predecessor of
|
|
# the previous nodes so that the linearity ordering is maintained. Find
|
|
# the ones with no successors and use this list to connect the next
|
|
# subgraph (if any).
|
|
for n in gu.get_no_predecessors(subgraph):
|
|
# NOTE(harlowja): give each edge its own copy so that if its later
|
|
# modified that the same copy isn't modified.
|
|
graph.add_edges_from(((n2, n, FLATTEN_EDGE_DATA.copy())
|
|
for n2 in previous_nodes
|
|
if not graph.has_edge(n2, n)))
|
|
# There should always be someone without successors, otherwise we have
|
|
# a cycle A -> B -> A situation, which should not be possible.
|
|
previous_nodes = list(gu.get_no_successors(subgraph))
|
|
return graph
|
|
|
|
|
|
def _flatten_unordered(flow, flattened):
|
|
graph = nx.DiGraph(name=_graph_name(flow))
|
|
for f in flow:
|
|
graph = gu.merge_graphs([graph, _flatten(f, flattened)])
|
|
return graph
|
|
|
|
|
|
def _flatten_task(task):
|
|
graph = nx.DiGraph(name='T:%s' % (task))
|
|
graph.add_node(task)
|
|
return graph
|
|
|
|
|
|
def _flatten_graph(flow, flattened):
|
|
graph = nx.DiGraph(name=_graph_name(flow))
|
|
subgraph_map = {}
|
|
# Flatten all nodes
|
|
for n in flow.graph.nodes_iter():
|
|
subgraph = _flatten(n, flattened)
|
|
subgraph_map[n] = subgraph
|
|
graph = gu.merge_graphs([graph, subgraph])
|
|
# Reconnect all nodes to there corresponding subgraphs
|
|
for (u, v) in flow.graph.edges_iter():
|
|
# Retain and update the original edge attributes.
|
|
u_v_attrs = gu.get_edge_attrs(flow.graph, u, v)
|
|
if not u_v_attrs:
|
|
u_v_attrs = FLATTEN_EDGE_DATA.copy()
|
|
else:
|
|
u_v_attrs.update(FLATTEN_EDGE_DATA)
|
|
u_no_succ = list(gu.get_no_successors(subgraph_map[u]))
|
|
# Connect the ones with no predecessors in v to the ones with no
|
|
# successors in u (thus maintaining the edge dependency).
|
|
for n in gu.get_no_predecessors(subgraph_map[v]):
|
|
# NOTE(harlowja): give each edge its own copy so that if its later
|
|
# modified that the same copy isn't modified.
|
|
graph.add_edges_from(((n2, n, copy.deepcopy(u_v_attrs))
|
|
for n2 in u_no_succ
|
|
if not graph.has_edge(n2, n)))
|
|
return graph
|
|
|
|
|
|
@_log_flatten
|
|
def _flatten(item, flattened):
|
|
"""Flattens a item (task/flow+subflows) into an execution graph."""
|
|
if item in flattened:
|
|
raise ValueError("Already flattened item: %s" % (item))
|
|
if isinstance(item, lf.Flow):
|
|
f = _flatten_linear(item, flattened)
|
|
elif isinstance(item, uf.Flow):
|
|
f = _flatten_unordered(item, flattened)
|
|
elif isinstance(item, gf.Flow):
|
|
f = _flatten_graph(item, flattened)
|
|
elif isinstance(item, task.BaseTask):
|
|
f = _flatten_task(item)
|
|
else:
|
|
raise TypeError("Unknown item: %r, %s" % (type(item), item))
|
|
flattened.add(item)
|
|
return f
|
|
|
|
|
|
def _post_flatten(graph):
|
|
dup_names = misc.get_duplicate_keys(graph.nodes_iter(),
|
|
key=lambda node: node.name)
|
|
if dup_names:
|
|
raise exceptions.InvariantViolation(
|
|
"Tasks with duplicate names found: %s"
|
|
% ', '.join(sorted(dup_names)))
|
|
return graph
|
|
|
|
|
|
def flatten(item, freeze=True):
|
|
"""Flattens a item (a task or flow) into a single execution graph."""
|
|
graph = _post_flatten(_flatten(item, set()))
|
|
if freeze:
|
|
# Frozen graph can't be modified...
|
|
return nx.freeze(graph)
|
|
return graph
|