Move flattening to the action engine compiler

Since flattening is only one way to compile a flow (and its nested
flows and atoms) into a compilation unit, move this functionality into
the engine module where it is used.

Change-Id: Ifea6b56cf5f2a9c1d16acabfaae6f28aeb6534a0
Joshua Harlow
2014-05-30 17:22:10 -07:00
parent 86e651b9ce
commit a32f9245f9
4 changed files with 247 additions and 246 deletions
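
A minimal usage sketch of the new entry point, assuming only the module
paths and helpers that appear in this diff (the flow built here is
illustrative):

    from taskflow.engines.action_engine import compiler
    from taskflow.patterns import linear_flow as lf
    from taskflow.tests import utils as t_utils

    # Build a tiny linear flow and compile it; the flattened result is
    # exposed on the returned Compilation as 'execution_graph'.
    flo = lf.Flow("demo").add(t_utils.DummyTask(name="a"),
                              t_utils.DummyTask(name="b"))
    compilation = compiler.PatternCompiler().compile(flo)
    graph = compilation.execution_graph
    print(len(graph), graph.number_of_edges())  # expected: 2 nodes, 1 edge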

View File

@@ -22,8 +22,3 @@ The following classes and modules are *recommended* for external usage:
.. autofunction:: taskflow.utils.persistence_utils.temporary_flow_detail
.. autofunction:: taskflow.utils.persistence_utils.pformat
Internal usage
==============
.. automodule:: taskflow.utils.flow_utils

View File

@@ -15,9 +15,17 @@
# under the License.
import collections
import logging
from taskflow import exceptions as exc
from taskflow.utils import flow_utils
from taskflow import flow
from taskflow import retry
from taskflow import task
from taskflow.types import graph as gr
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
# The result of a compiler's compile() is this tuple (for now it is just an
# execution graph but in the future it may grow to include more attributes
@@ -39,7 +47,7 @@ class PatternCompiler(object):
useful to retain part of this relationship).
"""
def compile(self, root):
graph = flow_utils.flatten(root)
graph = _Flattener(root).flatten()
if graph.number_of_nodes() == 0:
# Try to get a name attribute, otherwise just use the object
# string representation directly if that attribute does not exist.
@@ -47,3 +55,151 @@ class PatternCompiler(object):
raise exc.Empty("Root container '%s' (%s) is empty."
% (name, type(root)))
return Compilation(graph)
_RETRY_EDGE_DATA = {
'retry': True,
}
class _Flattener(object):
"""Flattens a root item (task/flow) into a execution graph."""
def __init__(self, root, freeze=True):
self._root = root
self._graph = None
self._history = set()
self._freeze = bool(freeze)
def _add_new_edges(self, graph, nodes_from, nodes_to, edge_attrs):
"""Adds new edges from nodes to other nodes in the specified graph,
with the following edge attributes (defaulting to the class provided
edge_data if None), if the edge does not already exist.
"""
nodes_to = list(nodes_to)
for u in nodes_from:
for v in nodes_to:
if not graph.has_edge(u, v):
# NOTE(harlowja): give each edge its own attr copy so that
# if it's later modified that the same copy isn't modified.
graph.add_edge(u, v, attr_dict=edge_attrs.copy())
def _flatten(self, item):
functor = self._find_flattener(item)
if not functor:
raise TypeError("Unknown type requested to flatten: %s (%s)"
% (item, type(item)))
self._pre_item_flatten(item)
graph = functor(item)
self._post_item_flatten(item, graph)
return graph
def _find_flattener(self, item):
"""Locates the flattening function to use to flatten the given item."""
if isinstance(item, flow.Flow):
return self._flatten_flow
elif isinstance(item, task.BaseTask):
return self._flatten_task
elif isinstance(item, retry.Retry):
raise TypeError("Retry controller %s (%s) is used not as a flow "
"parameter" % (item, type(item)))
else:
return None
def _connect_retry(self, retry, graph):
graph.add_node(retry)
# All graph nodes that have no predecessors should depend on this retry
nodes_to = [n for n in graph.no_predecessors_iter() if n != retry]
self._add_new_edges(graph, [retry], nodes_to, _RETRY_EDGE_DATA)
# Add a link to the retry for each node of the subgraph that doesn't
# already have a parent retry
for n in graph.nodes_iter():
if n != retry and 'retry' not in graph.node[n]:
graph.node[n]['retry'] = retry
def _flatten_task(self, task):
"""Flattens a individual task."""
graph = gr.DiGraph(name=task.name)
graph.add_node(task)
return graph
def _flatten_flow(self, flow):
"""Flattens a graph flow."""
graph = gr.DiGraph(name=flow.name)
# Flatten all nodes into a single subgraph per node.
subgraph_map = {}
for item in flow:
subgraph = self._flatten(item)
subgraph_map[item] = subgraph
graph = gr.merge_graphs([graph, subgraph])
# Reconnect all node edges to their corresponding subgraphs.
for (u, v, attrs) in flow.iter_links():
u_g = subgraph_map[u]
v_g = subgraph_map[v]
if any(attrs.get(k) for k in ('invariant', 'manual', 'retry')):
# Connect nodes with no predecessors in v to nodes with
# no successors in u (thus maintaining the edge dependency).
self._add_new_edges(graph,
u_g.no_successors_iter(),
v_g.no_predecessors_iter(),
edge_attrs=attrs)
else:
# This is a dependency-only edge; connect the corresponding
# providers and consumers.
for provider in u_g:
for consumer in v_g:
reasons = provider.provides & consumer.requires
if reasons:
graph.add_edge(provider, consumer, reasons=reasons)
if flow.retry is not None:
self._connect_retry(flow.retry, graph)
return graph
def _pre_item_flatten(self, item):
"""Called before a item is flattened; any pre-flattening actions."""
if id(item) in self._history:
raise ValueError("Already flattened item: %s (%s), recursive"
" flattening not supported" % (item, id(item)))
self._history.add(id(item))
def _post_item_flatten(self, item, graph):
"""Called before a item is flattened; any post-flattening actions."""
def _pre_flatten(self):
"""Called before the flattening of the item starts."""
self._history.clear()
def _post_flatten(self, graph):
"""Called after the flattening of the item finishes successfully."""
dup_names = misc.get_duplicate_keys(graph.nodes_iter(),
key=lambda node: node.name)
if dup_names:
dup_names = ', '.join(sorted(dup_names))
raise exc.Duplicate("Atoms with duplicate names "
"found: %s" % (dup_names))
self._history.clear()
# NOTE(harlowja): this one can be expensive to calculate (especially
# the cycle detection), so only do it if we know debugging is enabled
# and not under all cases.
if LOG.isEnabledFor(logging.DEBUG):
LOG.debug("Translated '%s' into a graph:", self._root)
for line in graph.pformat().splitlines():
# Indent it so that it's slightly offset from the above line.
LOG.debug(" %s", line)
def flatten(self):
"""Flattens a item (a task or flow) into a single execution graph."""
if self._graph is not None:
return self._graph
self._pre_flatten()
graph = self._flatten(self._root)
self._post_flatten(graph)
self._graph = graph
if self._freeze:
self._graph.freeze()
return self._graph
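
A rough illustration of the edge metadata the flattener attaches,
mirroring the behaviour exercised by the updated tests below (output is
indicative):

    from taskflow.engines.action_engine import compiler
    from taskflow.patterns import linear_flow as lf
    from taskflow import retry
    from taskflow.tests import utils as t_utils

    c = retry.AlwaysRevert("c")
    a = t_utils.DummyTask(name="a")
    b = t_utils.DummyTask(name="b")
    flo = lf.Flow("test", c).add(a, b)

    g = compiler.PatternCompiler().compile(flo).execution_graph
    for (u, v, attrs) in g.edges(data=True):
        # Expect (c, a, {'retry': True}) and (a, b, {'invariant': True}).
        print(u.name, "->", v.name, attrs)
    # Each task node also records its governing retry controller.
    print(g.node[a]["retry"] is c, g.node[b]["retry"] is c)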

View File

@@ -24,7 +24,8 @@ from taskflow import retry
from taskflow import test
from taskflow.tests import utils as t_utils
from taskflow.utils import flow_utils as f_utils
from taskflow.engines.action_engine import compiler
def _make_many(amount):
@@ -35,24 +36,26 @@ def _make_many(amount):
return tasks
class FlattenTest(test.TestCase):
def test_flatten_task(self):
class PatternCompileTest(test.TestCase):
def test_task(self):
task = t_utils.DummyTask(name='a')
g = f_utils.flatten(task)
compilation = compiler.PatternCompiler().compile(task)
g = compilation.execution_graph
self.assertEqual(list(g.nodes()), [task])
self.assertEqual(list(g.edges()), [])
def test_flatten_retry(self):
def test_retry(self):
r = retry.AlwaysRevert('r1')
msg_regex = "^Retry controller .* is used not as a flow parameter"
self.assertRaisesRegexp(TypeError, msg_regex, f_utils.flatten, r)
self.assertRaisesRegexp(TypeError, msg_regex,
compiler.PatternCompiler().compile, r)
def test_flatten_wrong_object(self):
def test_wrong_object(self):
msg_regex = '^Unknown type requested to flatten'
self.assertRaisesRegexp(TypeError, msg_regex, f_utils.flatten, 42)
self.assertRaisesRegexp(TypeError, msg_regex,
compiler.PatternCompiler().compile, 42)
def test_linear_flatten(self):
def test_linear(self):
a, b, c, d = _make_many(4)
flo = lf.Flow("test")
flo.add(a, b, c)
@@ -60,7 +63,8 @@ class FlattenTest(test.TestCase):
sflo.add(d)
flo.add(sflo)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(4, len(g))
order = g.topological_sort()
@@ -71,18 +75,20 @@ class FlattenTest(test.TestCase):
self.assertEqual([d], list(g.no_successors_iter()))
self.assertEqual([a], list(g.no_predecessors_iter()))
def test_invalid_flatten(self):
def test_invalid(self):
a, b, c = _make_many(3)
flo = lf.Flow("test")
flo.add(a, b, c)
flo.add(flo)
self.assertRaises(ValueError, f_utils.flatten, flo)
self.assertRaises(ValueError,
compiler.PatternCompiler().compile, flo)
def test_unordered_flatten(self):
def test_unordered(self):
a, b, c, d = _make_many(4)
flo = uf.Flow("test")
flo.add(a, b, c, d)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertEqual(0, g.number_of_edges())
self.assertEqual(set([a, b, c, d]),
@@ -90,14 +96,16 @@ class FlattenTest(test.TestCase):
self.assertEqual(set([a, b, c, d]),
set(g.no_predecessors_iter()))
def test_linear_nested_flatten(self):
def test_linear_nested(self):
a, b, c, d = _make_many(4)
flo = lf.Flow("test")
flo.add(a, b)
flo2 = uf.Flow("test2")
flo2.add(c, d)
flo.add(flo2)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(4, len(g))
lb = g.subgraph([a, b])
@@ -112,7 +120,7 @@ class FlattenTest(test.TestCase):
self.assertTrue(g.has_edge(b, c))
self.assertTrue(g.has_edge(b, d))
def test_unordered_nested_flatten(self):
def test_unordered_nested(self):
a, b, c, d = _make_many(4)
flo = uf.Flow("test")
flo.add(a, b)
@@ -120,7 +128,8 @@ class FlattenTest(test.TestCase):
flo2.add(c, d)
flo.add(flo2)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(4, len(g))
for n in [a, b]:
self.assertFalse(g.has_edge(n, c))
@@ -134,14 +143,15 @@ class FlattenTest(test.TestCase):
lb = g.subgraph([c, d])
self.assertEqual(1, lb.number_of_edges())
def test_unordered_nested_in_linear_flatten(self):
def test_unordered_nested_in_linear(self):
a, b, c, d = _make_many(4)
flo = lf.Flow('lt').add(
a,
uf.Flow('ut').add(b, c),
d)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(), [
(a, b),
@@ -150,16 +160,17 @@ class FlattenTest(test.TestCase):
(c, d)
])
def test_graph_flatten(self):
def test_graph(self):
a, b, c, d = _make_many(4)
flo = gf.Flow("test")
flo.add(a, b, c, d)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertEqual(0, g.number_of_edges())
def test_graph_flatten_nested(self):
def test_graph_nested(self):
a, b, c, d, e, f, g = _make_many(7)
flo = gf.Flow("test")
flo.add(a, b, c, d)
@@ -168,14 +179,15 @@ class FlattenTest(test.TestCase):
flo2.add(e, f, g)
flo.add(flo2)
graph = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
graph = compilation.execution_graph
self.assertEqual(7, len(graph))
self.assertItemsEqual(graph.edges(data=True), [
(e, f, {'invariant': True}),
(f, g, {'invariant': True})
])
def test_graph_flatten_nested_graph(self):
def test_graph_nested_graph(self):
a, b, c, d, e, f, g = _make_many(7)
flo = gf.Flow("test")
flo.add(a, b, c, d)
@@ -184,11 +196,12 @@ class FlattenTest(test.TestCase):
flo2.add(e, f, g)
flo.add(flo2)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(7, len(g))
self.assertEqual(0, g.number_of_edges())
def test_graph_flatten_links(self):
def test_graph_links(self):
a, b, c, d = _make_many(4)
flo = gf.Flow("test")
flo.add(a, b, c, d)
@@ -196,7 +209,8 @@ class FlattenTest(test.TestCase):
flo.link(b, c)
flo.link(c, d)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
(a, b, {'manual': True}),
@@ -206,12 +220,13 @@ class FlattenTest(test.TestCase):
self.assertItemsEqual([a], g.no_predecessors_iter())
self.assertItemsEqual([d], g.no_successors_iter())
def test_graph_flatten_dependencies(self):
def test_graph_dependencies(self):
a = t_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = t_utils.ProvidesRequiresTask('b', provides=[], requires=['x'])
flo = gf.Flow("test").add(a, b)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(2, len(g))
self.assertItemsEqual(g.edges(data=True), [
(a, b, {'reasons': set(['x'])})
@@ -219,7 +234,7 @@ class FlattenTest(test.TestCase):
self.assertItemsEqual([a], g.no_predecessors_iter())
self.assertItemsEqual([b], g.no_successors_iter())
def test_graph_flatten_nested_requires(self):
def test_graph_nested_requires(self):
a = t_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = t_utils.ProvidesRequiresTask('b', provides=[], requires=[])
c = t_utils.ProvidesRequiresTask('c', provides=[], requires=['x'])
@@ -228,7 +243,8 @@ class FlattenTest(test.TestCase):
lf.Flow("test2").add(b, c)
)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
(a, c, {'reasons': set(['x'])}),
@@ -237,7 +253,7 @@ class FlattenTest(test.TestCase):
self.assertItemsEqual([a, b], g.no_predecessors_iter())
self.assertItemsEqual([c], g.no_successors_iter())
def test_graph_flatten_nested_provides(self):
def test_graph_nested_provides(self):
a = t_utils.ProvidesRequiresTask('a', provides=[], requires=['x'])
b = t_utils.ProvidesRequiresTask('b', provides=['x'], requires=[])
c = t_utils.ProvidesRequiresTask('c', provides=[], requires=[])
@@ -246,7 +262,8 @@ class FlattenTest(test.TestCase):
lf.Flow("test2").add(b, c)
)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
(b, c, {'invariant': True}),
@@ -255,46 +272,50 @@ class FlattenTest(test.TestCase):
self.assertItemsEqual([b], g.no_predecessors_iter())
self.assertItemsEqual([a, c], g.no_successors_iter())
def test_flatten_checks_for_dups(self):
def test_checks_for_dups(self):
flo = gf.Flow("test").add(
t_utils.DummyTask(name="a"),
t_utils.DummyTask(name="a")
)
self.assertRaisesRegexp(exc.Duplicate,
'^Tasks with duplicate names',
f_utils.flatten, flo)
'^Atoms with duplicate names',
compiler.PatternCompiler().compile, flo)
def test_flatten_checks_for_dups_globally(self):
def test_checks_for_dups_globally(self):
flo = gf.Flow("test").add(
gf.Flow("int1").add(t_utils.DummyTask(name="a")),
gf.Flow("int2").add(t_utils.DummyTask(name="a")))
self.assertRaisesRegexp(exc.Duplicate,
'^Tasks with duplicate names',
f_utils.flatten, flo)
'^Atoms with duplicate names',
compiler.PatternCompiler().compile, flo)
def test_flatten_retry_in_linear_flow(self):
def test_retry_in_linear_flow(self):
flo = lf.Flow("test", retry.AlwaysRevert("c"))
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(1, len(g))
self.assertEqual(0, g.number_of_edges())
def test_flatten_retry_in_unordered_flow(self):
def test_retry_in_unordered_flow(self):
flo = uf.Flow("test", retry.AlwaysRevert("c"))
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(1, len(g))
self.assertEqual(0, g.number_of_edges())
def test_flatten_retry_in_graph_flow(self):
def test_retry_in_graph_flow(self):
flo = gf.Flow("test", retry.AlwaysRevert("c"))
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(1, len(g))
self.assertEqual(0, g.number_of_edges())
def test_flatten_retry_in_nested_flows(self):
def test_retry_in_nested_flows(self):
c1 = retry.AlwaysRevert("c1")
c2 = retry.AlwaysRevert("c2")
flo = lf.Flow("test", c1).add(lf.Flow("test2", c2))
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(2, len(g))
self.assertItemsEqual(g.edges(data=True), [
@@ -304,11 +325,13 @@ class FlattenTest(test.TestCase):
self.assertItemsEqual([c1], g.no_predecessors_iter())
self.assertItemsEqual([c2], g.no_successors_iter())
def test_flatten_retry_in_linear_flow_with_tasks(self):
def test_retry_in_linear_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
a, b = _make_many(2)
flo = lf.Flow("test", c).add(a, b)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
(a, b, {'invariant': True}),
@@ -320,11 +343,13 @@ class FlattenTest(test.TestCase):
self.assertIs(c, g.node[a]['retry'])
self.assertIs(c, g.node[b]['retry'])
def test_flatten_retry_in_unordered_flow_with_tasks(self):
def test_retry_in_unordered_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
a, b = _make_many(2)
flo = uf.Flow("test", c).add(a, b)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
(c, a, {'retry': True}),
@@ -336,11 +361,12 @@ class FlattenTest(test.TestCase):
self.assertIs(c, g.node[a]['retry'])
self.assertIs(c, g.node[b]['retry'])
def test_flatten_retry_in_graph_flow_with_tasks(self):
def test_retry_in_graph_flow_with_tasks(self):
r = retry.AlwaysRevert("cp")
a, b, c = _make_many(3)
flo = gf.Flow("test", r).add(a, b, c).link(b, c)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
@@ -355,7 +381,7 @@ class FlattenTest(test.TestCase):
self.assertIs(r, g.node[b]['retry'])
self.assertIs(r, g.node[c]['retry'])
def test_flatten_retries_hierarchy(self):
def test_retries_hierarchy(self):
c1 = retry.AlwaysRevert("cp1")
c2 = retry.AlwaysRevert("cp2")
a, b, c, d = _make_many(4)
@@ -363,7 +389,9 @@ class FlattenTest(test.TestCase):
a,
lf.Flow("test", c2).add(b, c),
d)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(data=True), [
(c1, a, {'retry': True}),
@@ -379,14 +407,16 @@ class FlattenTest(test.TestCase):
self.assertIs(c1, g.node[c2]['retry'])
self.assertIs(None, g.node[c1].get('retry'))
def test_flatten_retry_subflows_hierarchy(self):
def test_retry_subflows_hierarchy(self):
c1 = retry.AlwaysRevert("cp1")
a, b, c, d = _make_many(4)
flo = lf.Flow("test", c1).add(
a,
lf.Flow("test").add(b, c),
d)
g = f_utils.flatten(flo)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [
(c1, a, {'retry': True}),

View File

@@ -1,180 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from taskflow import exceptions
from taskflow import flow
from taskflow import retry
from taskflow import task
from taskflow.types import graph as gr
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
RETRY_EDGE_DATA = {
'retry': True,
}
class Flattener(object):
def __init__(self, root, freeze=True):
self._root = root
self._graph = None
self._history = set()
self._freeze = bool(freeze)
def _add_new_edges(self, graph, nodes_from, nodes_to, edge_attrs):
"""Adds new edges from nodes to other nodes in the specified graph,
with the following edge attributes (defaulting to the class provided
edge_data if None), if the edge does not already exist.
"""
nodes_to = list(nodes_to)
for u in nodes_from:
for v in nodes_to:
if not graph.has_edge(u, v):
# NOTE(harlowja): give each edge its own attr copy so that
# if it's later modified that the same copy isn't modified.
graph.add_edge(u, v, attr_dict=edge_attrs.copy())
def _flatten(self, item):
functor = self._find_flattener(item)
if not functor:
raise TypeError("Unknown type requested to flatten: %s (%s)"
% (item, type(item)))
self._pre_item_flatten(item)
graph = functor(item)
self._post_item_flatten(item, graph)
return graph
def _find_flattener(self, item):
"""Locates the flattening function to use to flatten the given item."""
if isinstance(item, flow.Flow):
return self._flatten_flow
elif isinstance(item, task.BaseTask):
return self._flatten_task
elif isinstance(item, retry.Retry):
raise TypeError("Retry controller %s (%s) is used not as a flow "
"parameter" % (item, type(item)))
else:
return None
def _connect_retry(self, retry, graph):
graph.add_node(retry)
# All graph nodes that have no predecessors should depend on its retry
nodes_to = [n for n in graph.no_predecessors_iter() if n != retry]
self._add_new_edges(graph, [retry], nodes_to, RETRY_EDGE_DATA)
# Add link to retry for each node of subgraph that hasn't
# a parent retry
for n in graph.nodes_iter():
if n != retry and 'retry' not in graph.node[n]:
graph.node[n]['retry'] = retry
def _flatten_task(self, task):
"""Flattens a individual task."""
graph = gr.DiGraph(name=task.name)
graph.add_node(task)
return graph
def _flatten_flow(self, flow):
"""Flattens a graph flow."""
graph = gr.DiGraph(name=flow.name)
# Flatten all nodes into a single subgraph per node.
subgraph_map = {}
for item in flow:
subgraph = self._flatten(item)
subgraph_map[item] = subgraph
graph = gr.merge_graphs([graph, subgraph])
# Reconnect all node edges to their corresponding subgraphs.
for (u, v, attrs) in flow.iter_links():
u_g = subgraph_map[u]
v_g = subgraph_map[v]
if any(attrs.get(k) for k in ('invariant', 'manual', 'retry')):
# Connect nodes with no predecessors in v to nodes with
# no successors in u (thus maintaining the edge dependency).
self._add_new_edges(graph,
u_g.no_successors_iter(),
v_g.no_predecessors_iter(),
edge_attrs=attrs)
else:
# This is dependency-only edge, connect corresponding
# providers and consumers.
for provider in u_g:
for consumer in v_g:
reasons = provider.provides & consumer.requires
if reasons:
graph.add_edge(provider, consumer, reasons=reasons)
if flow.retry is not None:
self._connect_retry(flow.retry, graph)
return graph
def _pre_item_flatten(self, item):
"""Called before a item is flattened; any pre-flattening actions."""
if id(item) in self._history:
raise ValueError("Already flattened item: %s (%s), recursive"
" flattening not supported" % (item, id(item)))
LOG.debug("Starting to flatten '%s'", item)
self._history.add(id(item))
def _post_item_flatten(self, item, graph):
"""Called before a item is flattened; any post-flattening actions."""
LOG.debug("Finished flattening '%s'", item)
# NOTE(harlowja): this one can be expensive to calculate (especially
# the cycle detection), so only do it if we know debugging is enabled
# and not under all cases.
if LOG.isEnabledFor(logging.DEBUG):
LOG.debug("Translated '%s' into a graph:", item)
for line in graph.pformat().splitlines():
# Indent it so that it's slightly offset from the above line.
LOG.debug(" %s", line)
def _pre_flatten(self):
"""Called before the flattening of the item starts."""
self._history.clear()
def _post_flatten(self, graph):
"""Called after the flattening of the item finishes successfully."""
dup_names = misc.get_duplicate_keys(graph.nodes_iter(),
key=lambda node: node.name)
if dup_names:
dup_names = ', '.join(sorted(dup_names))
raise exceptions.Duplicate("Tasks with duplicate names "
"found: %s" % (dup_names))
self._history.clear()
def flatten(self):
"""Flattens a item (a task or flow) into a single execution graph."""
if self._graph is not None:
return self._graph
self._pre_flatten()
graph = self._flatten(self._root)
self._post_flatten(graph)
self._graph = graph
if self._freeze:
self._graph.freeze()
return self._graph
def flatten(item, freeze=True):
"""Flattens a item (a task or flow) into a single execution graph."""
return Flattener(item, freeze=freeze).flatten()
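
For code that previously imported the removed helper, the replacement is
mechanical; a migration sketch (freezing of the graph is now handled
internally by the compiler):

    from taskflow.engines.action_engine import compiler

    def flatten(item):
        # Rough equivalent of the removed flow_utils.flatten(item) helper:
        # compile the task/flow and return its execution graph.
        return compiler.PatternCompiler().compile(item).execution_graph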