Simplify flow action engine compilation

Instead of the added complexity of discarding flow nodes
we can simplify the compilation process by just retaining
them and jumping over them in further iteration and graph
and tree runtime usage.

This change moves toward a model that does just this, which
makes it also easier to in the future use the newly added
flow graph nodes to do meaningful things (like use them as
a point to change which flow_detail is used).

Change-Id: Icb1695f4b995a0392f940837514774768f222db4
This commit is contained in:
Joshua Harlow
2015-09-04 13:14:25 -07:00
committed by Joshua Harlow
parent ba4704cd18
commit 79d25e69e8
12 changed files with 535 additions and 526 deletions

View File

@@ -18,10 +18,31 @@ import abc
import itertools
import weakref
from networkx.algorithms import traversal
import six
from taskflow.engines.action_engine import compiler as co
from taskflow import states as st
from taskflow.utils import iter_utils
def _depth_first_iterate(graph, connected_to_functors, initial_nodes_iter):
"""Iterates connected nodes in execution graph (from starting set).
Jumps over nodes with ``noop`` attribute (does not yield them back).
"""
stack = list(initial_nodes_iter)
while stack:
node = stack.pop()
node_attrs = graph.node[node]
if not node_attrs.get('noop'):
yield node
try:
node_kind = node_attrs['kind']
connected_to_functor = connected_to_functors[node_kind]
except KeyError:
pass
else:
stack.extend(connected_to_functor(node))
@six.add_metaclass(abc.ABCMeta)
@@ -74,8 +95,8 @@ class IgnoreDecider(Decider):
state to ``IGNORE`` so that they are ignored in future runtime
activities.
"""
successors_iter = runtime.analyzer.iterate_subgraph(self._atom)
runtime.reset_nodes(itertools.chain([self._atom], successors_iter),
successors_iter = runtime.analyzer.iterate_connected_atoms(self._atom)
runtime.reset_atoms(itertools.chain([self._atom], successors_iter),
state=st.IGNORE, intention=st.IGNORE)
@@ -105,66 +126,67 @@ class Analyzer(object):
self._storage = runtime.storage
self._execution_graph = runtime.compilation.execution_graph
def get_next_nodes(self, node=None):
"""Get next nodes to run (originating from node or all nodes)."""
if node is None:
execute = self.browse_nodes_for_execute()
revert = self.browse_nodes_for_revert()
return execute + revert
state = self.get_state(node)
intention = self._storage.get_atom_intention(node.name)
def iter_next_atoms(self, atom=None):
"""Iterate next atoms to run (originating from atom or all atoms)."""
if atom is None:
return iter_utils.unique_seen(self.browse_atoms_for_execute(),
self.browse_atoms_for_revert())
state = self.get_state(atom)
intention = self._storage.get_atom_intention(atom.name)
if state == st.SUCCESS:
if intention == st.REVERT:
return [
(node, NoOpDecider()),
]
return iter([
(atom, NoOpDecider()),
])
elif intention == st.EXECUTE:
return self.browse_nodes_for_execute(node)
return self.browse_atoms_for_execute(atom=atom)
else:
return []
return iter([])
elif state == st.REVERTED:
return self.browse_nodes_for_revert(node)
return self.browse_atoms_for_revert(atom=atom)
elif state == st.FAILURE:
return self.browse_nodes_for_revert()
return self.browse_atoms_for_revert()
else:
return []
return iter([])
def browse_nodes_for_execute(self, node=None):
"""Browse next nodes to execute.
def browse_atoms_for_execute(self, atom=None):
"""Browse next atoms to execute.
This returns a collection of nodes that *may* be ready to be
executed, if given a specific node it will only examine the successors
of that node, otherwise it will examine the whole graph.
This returns a iterator of atoms that *may* be ready to be
executed, if given a specific atom, it will only examine the successors
of that atom, otherwise it will examine the whole graph.
"""
if node is not None:
nodes = self._execution_graph.successors(node)
if atom is None:
atom_it = self.iterate_nodes(co.ATOMS)
else:
nodes = self._execution_graph.nodes_iter()
ready_nodes = []
for node in nodes:
is_ready, late_decider = self._get_maybe_ready_for_execute(node)
successors_iter = self._execution_graph.successors_iter
atom_it = _depth_first_iterate(self._execution_graph,
{co.FLOW: successors_iter},
successors_iter(atom))
for atom in atom_it:
is_ready, late_decider = self._get_maybe_ready_for_execute(atom)
if is_ready:
ready_nodes.append((node, late_decider))
return ready_nodes
yield (atom, late_decider)
def browse_nodes_for_revert(self, node=None):
"""Browse next nodes to revert.
def browse_atoms_for_revert(self, atom=None):
"""Browse next atoms to revert.
This returns a collection of nodes that *may* be ready to be be
reverted, if given a specific node it will only examine the
predecessors of that node, otherwise it will examine the whole
This returns a iterator of atoms that *may* be ready to be be
reverted, if given a specific atom it will only examine the
predecessors of that atom, otherwise it will examine the whole
graph.
"""
if node is not None:
nodes = self._execution_graph.predecessors(node)
if atom is None:
atom_it = self.iterate_nodes(co.ATOMS)
else:
nodes = self._execution_graph.nodes_iter()
ready_nodes = []
for node in nodes:
is_ready, late_decider = self._get_maybe_ready_for_revert(node)
predecessors_iter = self._execution_graph.predecessors_iter
atom_it = _depth_first_iterate(self._execution_graph,
{co.FLOW: predecessors_iter},
predecessors_iter(atom))
for atom in atom_it:
is_ready, late_decider = self._get_maybe_ready_for_revert(atom)
if is_ready:
ready_nodes.append((node, late_decider))
return ready_nodes
yield (atom, late_decider)
def _get_maybe_ready(self, atom, transition_to, allowed_intentions,
connected_fetcher, connected_checker,
@@ -187,59 +209,71 @@ class Analyzer(object):
def _get_maybe_ready_for_execute(self, atom):
"""Returns if an atom is *likely* ready to be executed."""
def decider_fetcher(atom):
edge_deciders = self._runtime.fetch_edge_deciders(atom)
if edge_deciders:
return IgnoreDecider(atom, edge_deciders)
else:
return NoOpDecider()
predecessors_iter = self._execution_graph.predecessors_iter
connected_fetcher = lambda atom: \
_depth_first_iterate(self._execution_graph,
{co.FLOW: predecessors_iter},
predecessors_iter(atom))
connected_checker = lambda connected_iter: \
all(state == st.SUCCESS and intention == st.EXECUTE
for state, intention in connected_iter)
connected_fetcher = self._execution_graph.predecessors_iter
return self._get_maybe_ready(atom, st.RUNNING, [st.EXECUTE],
connected_fetcher, connected_checker,
decider_fetcher)
def _get_maybe_ready_for_revert(self, atom):
"""Returns if an atom is *likely* ready to be reverted."""
successors_iter = self._execution_graph.successors_iter
connected_fetcher = lambda atom: \
_depth_first_iterate(self._execution_graph,
{co.FLOW: successors_iter},
successors_iter(atom))
connected_checker = lambda connected_iter: \
all(state in (st.PENDING, st.REVERTED)
for state, _intention in connected_iter)
decider_fetcher = lambda atom: NoOpDecider()
connected_fetcher = self._execution_graph.successors_iter
return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY],
connected_fetcher, connected_checker,
decider_fetcher)
def iterate_subgraph(self, atom):
"""Iterates a subgraph connected to given atom."""
for _src, dst in traversal.dfs_edges(self._execution_graph, atom):
yield dst
def iterate_connected_atoms(self, atom):
"""Iterates **all** successor atoms connected to given atom."""
successors_iter = self._execution_graph.successors_iter
return _depth_first_iterate(
self._execution_graph, {
co.FLOW: successors_iter,
co.TASK: successors_iter,
co.RETRY: successors_iter,
}, successors_iter(atom))
def iterate_retries(self, state=None):
"""Iterates retry atoms that match the provided state.
If no state is provided it will yield back all retry atoms.
"""
for atom in self._runtime.fetch_atoms_by_kind('retry'):
for atom in self.iterate_nodes((co.RETRY,)):
if not state or self.get_state(atom) == state:
yield atom
def iterate_all_nodes(self):
"""Yields back all nodes in the execution graph."""
for node in self._execution_graph.nodes_iter():
yield node
def iterate_nodes(self, allowed_kinds):
"""Yields back all nodes of specified kinds in the execution graph."""
for node, node_data in self._execution_graph.nodes_iter(data=True):
if node_data['kind'] in allowed_kinds:
yield node
def find_atom_retry(self, atom):
"""Returns the retry atom associated to the given atom (or none)."""
return self._execution_graph.node[atom].get('retry')
def find_retry(self, node):
"""Returns the retry atom associated to the given node (or none)."""
return self._execution_graph.node[node].get(co.RETRY)
def is_success(self):
"""Checks if all nodes in the execution graph are in 'happy' state."""
for atom in self.iterate_all_nodes():
"""Checks if all atoms in the execution graph are in 'happy' state."""
for atom in self.iterate_nodes(co.ATOMS):
atom_state = self.get_state(atom)
if atom_state == st.IGNORE:
continue

View File

@@ -49,7 +49,7 @@ class MachineMemory(object):
"""State machine memory."""
def __init__(self):
self.next_nodes = set()
self.next_up = set()
self.not_done = set()
self.failures = []
self.done = set()
@@ -115,24 +115,25 @@ class MachineBuilder(object):
# Checks if the storage says the flow is still runnable...
return self._storage.get_flow_state() == st.RUNNING
def iter_next_nodes(target_node=None, apply_deciders=True):
# Yields and filters and tweaks the next nodes to execute...
maybe_nodes = self._analyzer.get_next_nodes(node=target_node)
for node, late_decider in maybe_nodes:
def iter_next_atoms(atom=None, apply_deciders=True):
# Yields and filters and tweaks the next atoms to run...
maybe_atoms_it = self._analyzer.iter_next_atoms(atom=atom)
for atom, late_decider in maybe_atoms_it:
if apply_deciders:
proceed = late_decider.check_and_affect(self._runtime)
if proceed:
yield node
yield atom
else:
yield node
yield atom
def resume(old_state, new_state, event):
# This reaction function just updates the state machines memory
# to include any nodes that need to be executed (from a previous
# attempt, which may be empty if never ran before) and any nodes
# that are now ready to be ran.
memory.next_nodes.update(self._completer.resume())
memory.next_nodes.update(iter_next_nodes())
memory.next_up.update(
iter_utils.unique_seen(self._completer.resume(),
iter_next_atoms()))
return SCHEDULE
def game_over(old_state, new_state, event):
@@ -142,17 +143,17 @@ class MachineBuilder(object):
# it is *always* called before the final state is entered.
if memory.failures:
return FAILED
leftover_nodes = iter_utils.count(
leftover_atoms = iter_utils.count(
# Avoid activating the deciders, since at this point
# the engine is finishing and there will be no more further
# work done anyway...
iter_next_nodes(apply_deciders=False))
if leftover_nodes:
iter_next_atoms(apply_deciders=False))
if leftover_atoms:
# Ok we didn't finish (either reverting or executing...) so
# that means we must of been stopped at some point...
LOG.blather("Suspension determined to have been reacted to"
" since (at least) %s nodes have been left in an"
" unfinished state", leftover_nodes)
" since (at least) %s atoms have been left in an"
" unfinished state", leftover_atoms)
return SUSPENDED
elif self._analyzer.is_success():
return SUCCESS
@@ -165,13 +166,13 @@ class MachineBuilder(object):
# if the user of this engine has requested the engine/storage
# that holds this information to stop or suspend); handles failures
# that occur during this process safely...
if is_runnable() and memory.next_nodes:
not_done, failures = do_schedule(memory.next_nodes)
if is_runnable() and memory.next_up:
not_done, failures = do_schedule(memory.next_up)
if not_done:
memory.not_done.update(not_done)
if failures:
memory.failures.extend(failures)
memory.next_nodes.intersection_update(not_done)
memory.next_up.intersection_update(not_done)
return WAIT
def wait(old_state, new_state, event):
@@ -190,13 +191,13 @@ class MachineBuilder(object):
# out what nodes are now ready to be ran (and then triggering those
# nodes to be scheduled in the future); handles failures that
# occur during this process safely...
next_nodes = set()
next_up = set()
while memory.done:
fut = memory.done.pop()
node = fut.atom
atom = fut.atom
try:
event, result = fut.result()
retain = do_complete(node, event, result)
retain = do_complete(atom, event, result)
if isinstance(result, failure.Failure):
if retain:
memory.failures.append(result)
@@ -208,24 +209,24 @@ class MachineBuilder(object):
# is not enabled, which would suck...)
if LOG.isEnabledFor(logging.DEBUG):
intention = self._storage.get_atom_intention(
node.name)
atom.name)
LOG.debug("Discarding failure '%s' (in"
" response to event '%s') under"
" completion units request during"
" completion of node '%s' (intention"
" completion of atom '%s' (intention"
" is to %s)", result, event,
node, intention)
atom, intention)
except Exception:
memory.failures.append(failure.Failure())
else:
try:
more_nodes = set(iter_next_nodes(target_node=node))
more_work = set(iter_next_atoms(atom=atom))
except Exception:
memory.failures.append(failure.Failure())
else:
next_nodes.update(more_nodes)
if is_runnable() and next_nodes and not memory.failures:
memory.next_nodes.update(next_nodes)
next_up.update(more_work)
if is_runnable() and next_up and not memory.failures:
memory.next_up.update(next_up)
return SCHEDULE
elif memory.not_done:
return WAIT

View File

@@ -14,10 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import threading
import fasteners
import six
from taskflow import exceptions as exc
from taskflow import flow
@@ -28,18 +28,35 @@ from taskflow.types import tree as tr
from taskflow.utils import iter_utils
from taskflow.utils import misc
from taskflow.flow import (LINK_INVARIANT, LINK_RETRY) # noqa
LOG = logging.getLogger(__name__)
_RETRY_EDGE_DATA = {
flow.LINK_RETRY: True,
}
_EDGE_INVARIANTS = (flow.LINK_INVARIANT, flow.LINK_MANUAL, flow.LINK_RETRY)
_EDGE_REASONS = flow.LINK_REASONS
# Constants attached to node attributes in the execution graph (and tree
# node metadata), provided as constants here and constants in the compilation
# class (so that users will not have to import this file to access them); but
# provide them as module constants so that internal code can more
# easily access them...
TASK = 'task'
RETRY = 'retry'
FLOW = 'flow'
# Quite often used together, so make a tuple everyone can share...
ATOMS = (TASK, RETRY)
class Compilation(object):
"""The result of a compilers compile() is this *immutable* object."""
#: Task nodes will have a ``kind`` attribute/metadata key with this value.
TASK = TASK
#: Retry nodes will have a ``kind`` attribute/metadata key with this value.
RETRY = RETRY
#: Flow nodes will have a ``kind`` attribute/metadata key with this value.
FLOW = FLOW
def __init__(self, execution_graph, hierarchy):
self._execution_graph = execution_graph
self._hierarchy = hierarchy
@@ -55,6 +72,12 @@ class Compilation(object):
return self._hierarchy
def _overlap_occurence_detector(to_graph, from_graph):
"""Returns how many nodes in 'from' graph are in 'to' graph (if any)."""
return iter_utils.count(node for node in from_graph.nodes_iter()
if node in to_graph)
def _add_update_edges(graph, nodes_from, nodes_to, attr_dict=None):
"""Adds/updates edges from nodes to other nodes in the specified graph.
@@ -79,118 +102,7 @@ def _add_update_edges(graph, nodes_from, nodes_to, attr_dict=None):
graph.add_edge(u, v, attr_dict=attr_dict.copy())
class Linker(object):
"""Compiler helper that adds pattern(s) constraints onto a graph."""
@staticmethod
def _is_not_empty(graph):
# Returns true if the given graph is *not* empty...
return graph.number_of_nodes() > 0
@staticmethod
def _find_first_decomposed(node, priors,
decomposed_members, decomposed_filter):
# How this works; traverse backwards and find only the predecessor
# items that are actually connected to this entity, and avoid any
# linkage that is not directly connected. This is guaranteed to be
# valid since we always iter_links() over predecessors before
# successors in all currently known patterns; a queue is used here
# since it is possible for a node to have 2+ different predecessors so
# we must search back through all of them in a reverse BFS order...
#
# Returns the first decomposed graph of those nodes (including the
# passed in node) that passes the provided filter
# function (returns none if none match).
frontier = collections.deque([node])
# NOTE(harowja): None is in this initial set since the first prior in
# the priors list has None as its predecessor (which we don't want to
# look for a decomposed member of).
visited = set([None])
while frontier:
node = frontier.popleft()
if node in visited:
continue
node_graph = decomposed_members[node]
if decomposed_filter(node_graph):
return node_graph
visited.add(node)
# TODO(harlowja): optimize this more to avoid searching through
# things already searched...
for (u, v) in reversed(priors):
if node == v:
# Queue its predecessor to be searched in the future...
frontier.append(u)
else:
return None
def apply_constraints(self, graph, flow, decomposed_members):
# This list is used to track the links that have been previously
# iterated over, so that when we are trying to find a entry to
# connect to that we iterate backwards through this list, finding
# connected nodes to the current target (lets call it v) and find
# the first (u_n, or u_n - 1, u_n - 2...) that was decomposed into
# a non-empty graph. We also retain all predecessors of v so that we
# can correctly locate u_n - 1 if u_n turns out to have decomposed into
# an empty graph (and so on).
priors = []
# NOTE(harlowja): u, v are flows/tasks (also graph terminology since
# we are compiling things down into a flattened graph), the meaning
# of this link iteration via iter_links() is that u -> v (with the
# provided dictionary attributes, if any).
for (u, v, attr_dict) in flow.iter_links():
if not priors:
priors.append((None, u))
v_g = decomposed_members[v]
if not v_g.number_of_nodes():
priors.append((u, v))
continue
invariant = any(attr_dict.get(k) for k in _EDGE_INVARIANTS)
if not invariant:
# This is a symbol *only* dependency, connect
# corresponding providers and consumers to allow the consumer
# to be executed immediately after the provider finishes (this
# is an optimization for these types of dependencies...)
u_g = decomposed_members[u]
if not u_g.number_of_nodes():
# This must always exist, but incase it somehow doesn't...
raise exc.CompilationFailure(
"Non-invariant link being created from '%s' ->"
" '%s' even though the target '%s' was found to be"
" decomposed into an empty graph" % (v, u, u))
for u in u_g.nodes_iter():
for v in v_g.nodes_iter():
# This is using the intersection() method vs the &
# operator since the latter doesn't work with frozen
# sets (when used in combination with ordered sets).
#
# If this is not done the following happens...
#
# TypeError: unsupported operand type(s)
# for &: 'frozenset' and 'OrderedSet'
depends_on = u.provides.intersection(v.requires)
if depends_on:
edge_attrs = {
_EDGE_REASONS: frozenset(depends_on),
}
_add_update_edges(graph,
[u], [v],
attr_dict=edge_attrs)
else:
# Connect nodes with no predecessors in v to nodes with no
# successors in the *first* non-empty predecessor of v (thus
# maintaining the edge dependency).
match = self._find_first_decomposed(u, priors,
decomposed_members,
self._is_not_empty)
if match is not None:
_add_update_edges(graph,
match.no_successors_iter(),
list(v_g.no_predecessors_iter()),
attr_dict=attr_dict)
priors.append((u, v))
class _TaskCompiler(object):
class TaskCompiler(object):
"""Non-recursive compiler of tasks."""
@staticmethod
@@ -199,71 +111,67 @@ class _TaskCompiler(object):
def compile(self, task, parent=None):
graph = gr.DiGraph(name=task.name)
graph.add_node(task)
node = tr.Node(task)
graph.add_node(task, kind=TASK)
node = tr.Node(task, kind=TASK)
if parent is not None:
parent.add(node)
return graph, node
class _FlowCompiler(object):
class FlowCompiler(object):
"""Recursive compiler of flows."""
@staticmethod
def handles(obj):
return isinstance(obj, flow.Flow)
def __init__(self, deep_compiler_func, linker):
def __init__(self, deep_compiler_func):
self._deep_compiler_func = deep_compiler_func
self._linker = linker
def _connect_retry(self, retry, graph):
graph.add_node(retry)
# All nodes that have no predecessors should depend on this retry.
nodes_to = [n for n in graph.no_predecessors_iter() if n is not retry]
if nodes_to:
_add_update_edges(graph, [retry], nodes_to,
attr_dict=_RETRY_EDGE_DATA)
# Add association for each node of graph that has no existing retry.
for n in graph.nodes_iter():
if n is not retry and flow.LINK_RETRY not in graph.node[n]:
graph.node[n][flow.LINK_RETRY] = retry
@staticmethod
def _occurence_detector(to_graph, from_graph):
return iter_utils.count(node for node in from_graph.nodes_iter()
if node in to_graph)
def _decompose_flow(self, flow, parent=None):
"""Decomposes a flow into a graph, tree node + decomposed subgraphs."""
graph = gr.DiGraph(name=flow.name)
node = tr.Node(flow)
if parent is not None:
parent.add(node)
if flow.retry is not None:
node.add(tr.Node(flow.retry))
decomposed_members = {}
for item in flow:
subgraph, _subnode = self._deep_compiler_func(item, parent=node)
decomposed_members[item] = subgraph
if subgraph.number_of_nodes():
graph = gr.merge_graphs(
graph, subgraph,
# We can specialize this to be simpler than the default
# algorithm which creates overhead that we don't
# need for our purposes...
overlap_detector=self._occurence_detector)
return graph, node, decomposed_members
def compile(self, flow, parent=None):
graph, node, decomposed_members = self._decompose_flow(flow,
parent=parent)
self._linker.apply_constraints(graph, flow, decomposed_members)
"""Decomposes a flow into a graph and scope tree hierarchy."""
graph = gr.DiGraph(name=flow.name)
graph.add_node(flow, kind=FLOW, noop=True)
tree_node = tr.Node(flow, kind=FLOW, noop=True)
if parent is not None:
parent.add(tree_node)
if flow.retry is not None:
self._connect_retry(flow.retry, graph)
return graph, node
tree_node.add(tr.Node(flow.retry, kind=RETRY))
decomposed = dict(
(child, self._deep_compiler_func(child, parent=tree_node)[0])
for child in flow)
decomposed_graphs = list(six.itervalues(decomposed))
graph = gr.merge_graphs(graph, *decomposed_graphs,
overlap_detector=_overlap_occurence_detector)
for u, v, attr_dict in flow.iter_links():
u_graph = decomposed[u]
v_graph = decomposed[v]
_add_update_edges(graph, u_graph.no_successors_iter(),
list(v_graph.no_predecessors_iter()),
attr_dict=attr_dict)
if flow.retry is not None:
graph.add_node(flow.retry, kind=RETRY)
_add_update_edges(graph, [flow], [flow.retry],
attr_dict={LINK_INVARIANT: True})
for node in graph.nodes_iter():
if node is not flow.retry and node is not flow:
graph.node[node].setdefault(RETRY, flow.retry)
from_nodes = [flow.retry]
connected_attr_dict = {LINK_INVARIANT: True, LINK_RETRY: True}
else:
from_nodes = [flow]
connected_attr_dict = {LINK_INVARIANT: True}
connected_to = [
node for node in graph.no_predecessors_iter() if node is not flow
]
if connected_to:
# Ensure all nodes in this graph(s) that have no
# predecessors depend on this flow (or this flow's retry) so that
# we can depend on the flow being traversed before its
# children (even though at the current time it will be skipped).
_add_update_edges(graph, from_nodes, connected_to,
attr_dict=connected_attr_dict)
return graph, tree_node
class PatternCompiler(object):
@@ -288,8 +196,8 @@ class PatternCompiler(object):
the recursion (now with a decomposed mapping from contained patterns or
atoms to there corresponding subgraph) we have to then connect the
subgraphs (and the atom(s) there-in) that were decomposed for a pattern
correctly into a new graph (using a :py:class:`.Linker` object to ensure
the pattern mandated constraints are retained) and then return to the
correctly into a new graph and then ensure the pattern mandated
constraints are retained. Finally we then return to the
caller (and they will do the same thing up until the root node, which by
that point one graph is created with all contained atoms in the
pattern/nested patterns mandated ordering).
@@ -364,14 +272,10 @@ class PatternCompiler(object):
def __init__(self, root, freeze=True):
self._root = root
self._history = set()
self._linker = Linker()
self._freeze = freeze
self._lock = threading.Lock()
self._compilation = None
self._matchers = [
_FlowCompiler(self._compile, self._linker),
_TaskCompiler(),
]
self._matchers = (FlowCompiler(self._compile), TaskCompiler())
self._level = 0
def _compile(self, item, parent=None):
@@ -418,12 +322,17 @@ class PatternCompiler(object):
def _post_compile(self, graph, node):
"""Called after the compilation of the root finishes successfully."""
dup_names = misc.get_duplicate_keys(graph.nodes_iter(),
key=lambda node: node.name)
dup_names = misc.get_duplicate_keys(
(node for node, node_attrs in graph.nodes_iter(data=True)
if node_attrs['kind'] in ATOMS),
key=lambda node: node.name)
if dup_names:
raise exc.Duplicate(
"Atoms with duplicate names found: %s" % (sorted(dup_names)))
if graph.number_of_nodes() == 0:
atoms = iter_utils.count(
node for node, node_attrs in graph.nodes_iter(data=True)
if node_attrs['kind'] in ATOMS)
if atoms == 0:
raise exc.Empty("Root container '%s' (%s) is empty"
% (self._root, type(self._root)))
self._history.clear()

View File

@@ -20,6 +20,7 @@ import weakref
from oslo_utils import reflection
import six
from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import executor as ex
from taskflow import logging
from taskflow import retry as retry_atom
@@ -62,7 +63,7 @@ class RevertAndRetry(Strategy):
self._retry = retry
def apply(self):
tweaked = self._runtime.reset_nodes([self._retry], state=None,
tweaked = self._runtime.reset_atoms([self._retry], state=None,
intention=st.RETRY)
tweaked.extend(self._runtime.reset_subgraph(self._retry, state=None,
intention=st.REVERT))
@@ -79,8 +80,9 @@ class RevertAll(Strategy):
self._analyzer = runtime.analyzer
def apply(self):
return self._runtime.reset_nodes(self._analyzer.iterate_all_nodes(),
state=None, intention=st.REVERT)
return self._runtime.reset_atoms(
self._analyzer.iterate_nodes(co.ATOMS),
state=None, intention=st.REVERT)
class Revert(Strategy):
@@ -93,7 +95,7 @@ class Revert(Strategy):
self._atom = atom
def apply(self):
tweaked = self._runtime.reset_nodes([self._atom], state=None,
tweaked = self._runtime.reset_atoms([self._atom], state=None,
intention=st.REVERT)
tweaked.extend(self._runtime.reset_subgraph(self._atom, state=None,
intention=st.REVERT))
@@ -126,26 +128,26 @@ class Completer(object):
self._retry_action.complete_reversion(retry, result)
def resume(self):
"""Resumes nodes in the contained graph.
"""Resumes atoms in the contained graph.
This is done to allow any previously completed or failed nodes to
be analyzed, there results processed and any potential nodes affected
This is done to allow any previously completed or failed atoms to
be analyzed, there results processed and any potential atoms affected
to be adjusted as needed.
This should return a set of nodes which should be the initial set of
nodes that were previously not finished (due to a RUNNING or REVERTING
This should return a set of atoms which should be the initial set of
atoms that were previously not finished (due to a RUNNING or REVERTING
attempt not previously finishing).
"""
for node in self._analyzer.iterate_all_nodes():
if self._analyzer.get_state(node) == st.FAILURE:
self._process_atom_failure(node, self._storage.get(node.name))
for atom in self._analyzer.iterate_nodes(co.ATOMS):
if self._analyzer.get_state(atom) == st.FAILURE:
self._process_atom_failure(atom, self._storage.get(atom.name))
for retry in self._analyzer.iterate_retries(st.RETRYING):
self._runtime.retry_subflow(retry)
unfinished_nodes = set()
for node in self._analyzer.iterate_all_nodes():
if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING):
unfinished_nodes.add(node)
return unfinished_nodes
unfinished_atoms = set()
for atom in self._analyzer.iterate_nodes(co.ATOMS):
if self._analyzer.get_state(atom) in (st.RUNNING, st.REVERTING):
unfinished_atoms.add(atom)
return unfinished_atoms
def complete(self, node, event, result):
"""Performs post-execution completion of a node.
@@ -167,7 +169,7 @@ class Completer(object):
def _determine_resolution(self, atom, failure):
"""Determines which resolution strategy to activate/apply."""
retry = self._analyzer.find_atom_retry(atom)
retry = self._analyzer.find_retry(atom)
if retry is not None:
# Ask retry controller what to do in case of failure.
strategy = self._retry_action.on_failure(retry, atom, failure)

View File

@@ -241,11 +241,10 @@ class ActionEngine(base.Engine):
transient = strutils.bool_from_string(
self._options.get('inject_transient', True))
self.storage.ensure_atoms(
self._compilation.execution_graph.nodes_iter())
for node in self._compilation.execution_graph.nodes_iter():
if node.inject:
self.storage.inject_atom_args(node.name,
node.inject,
self._runtime.analyzer.iterate_nodes(compiler.ATOMS))
for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
if atom.inject:
self.storage.inject_atom_args(atom.name, atom.inject,
transient=transient)
@fasteners.locked
@@ -255,8 +254,8 @@ class ActionEngine(base.Engine):
# flow/task provided or storage provided, if there are still missing
# dependencies then this flow will fail at runtime (which we can avoid
# by failing at validation time).
execution_graph = self._compilation.execution_graph
if LOG.isEnabledFor(logging.BLATHER):
execution_graph = self._compilation.execution_graph
LOG.blather("Validating scoping and argument visibility for"
" execution graph with %s nodes and %s edges with"
" density %0.3f", execution_graph.number_of_nodes(),
@@ -269,18 +268,17 @@ class ActionEngine(base.Engine):
last_cause = None
last_node = None
missing_nodes = 0
fetch_func = self.storage.fetch_unsatisfied_args
for node in execution_graph.nodes_iter():
node_missing = fetch_func(node.name, node.rebind,
optional_args=node.optional)
if node_missing:
cause = exc.MissingDependencies(node,
sorted(node_missing),
for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
atom_missing = self.storage.fetch_unsatisfied_args(
atom.name, atom.rebind, optional_args=atom.optional)
if atom_missing:
cause = exc.MissingDependencies(atom,
sorted(atom_missing),
cause=last_cause)
last_cause = cause
last_node = node
last_node = atom
missing_nodes += 1
missing.update(node_missing)
missing.update(atom_missing)
if missing:
# For when a task is provided (instead of a flow) and that
# task is the only item in the graph and its missing deps, avoid

View File

@@ -22,12 +22,13 @@ from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
from taskflow.engines.action_engine import analyzer as an
from taskflow.engines.action_engine import builder as bu
from taskflow.engines.action_engine import compiler as com
from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
from taskflow import flow
from taskflow import exceptions as exc
from taskflow.flow import LINK_DECIDER
from taskflow import states as st
from taskflow import task
from taskflow.utils import misc
@@ -47,7 +48,6 @@ class Runtime(object):
self._storage = storage
self._compilation = compilation
self._atom_cache = {}
self._atoms_by_kind = {}
def compile(self):
"""Compiles & caches frequently used execution helper objects.
@@ -59,47 +59,47 @@ class Runtime(object):
specific scheduler and so-on).
"""
change_state_handlers = {
'task': functools.partial(self.task_action.change_state,
progress=0.0),
'retry': self.retry_action.change_state,
com.TASK: functools.partial(self.task_action.change_state,
progress=0.0),
com.RETRY: self.retry_action.change_state,
}
schedulers = {
'retry': self.retry_scheduler,
'task': self.task_scheduler,
com.RETRY: self.retry_scheduler,
com.TASK: self.task_scheduler,
}
execution_graph = self._compilation.execution_graph
all_retry_atoms = []
all_task_atoms = []
for atom in self.analyzer.iterate_all_nodes():
metadata = {}
walker = sc.ScopeWalker(self.compilation, atom, names_only=True)
if isinstance(atom, task.BaseTask):
check_transition_handler = st.check_task_transition
change_state_handler = change_state_handlers['task']
scheduler = schedulers['task']
all_task_atoms.append(atom)
check_transition_handlers = {
com.TASK: st.check_task_transition,
com.RETRY: st.check_retry_transition,
}
graph = self._compilation.execution_graph
for node, node_data in graph.nodes_iter(data=True):
node_kind = node_data['kind']
if node_kind == com.FLOW:
continue
elif node_kind in com.ATOMS:
check_transition_handler = check_transition_handlers[node_kind]
change_state_handler = change_state_handlers[node_kind]
scheduler = schedulers[node_kind]
else:
check_transition_handler = st.check_retry_transition
change_state_handler = change_state_handlers['retry']
scheduler = schedulers['retry']
all_retry_atoms.append(atom)
raise exc.CompilationFailure("Unknown node kind '%s'"
" encountered" % node_kind)
metadata = {}
walker = sc.ScopeWalker(self.compilation, node, names_only=True)
edge_deciders = {}
for previous_atom in execution_graph.predecessors(atom):
for prev_node in graph.predecessors_iter(node):
# If there is any link function that says if this connection
# is able to run (or should not) ensure we retain it and use
# it later as needed.
u_v_data = execution_graph.adj[previous_atom][atom]
u_v_decider = u_v_data.get(flow.LINK_DECIDER)
u_v_data = graph.adj[prev_node][node]
u_v_decider = u_v_data.get(LINK_DECIDER)
if u_v_decider is not None:
edge_deciders[previous_atom.name] = u_v_decider
edge_deciders[prev_node.name] = u_v_decider
metadata['scope_walker'] = walker
metadata['check_transition_handler'] = check_transition_handler
metadata['change_state_handler'] = change_state_handler
metadata['scheduler'] = scheduler
metadata['edge_deciders'] = edge_deciders
self._atom_cache[atom.name] = metadata
self._atoms_by_kind['retry'] = all_retry_atoms
self._atoms_by_kind['task'] = all_task_atoms
self._atom_cache[node.name] = metadata
@property
def compilation(self):
@@ -162,15 +162,6 @@ class Runtime(object):
metadata = self._atom_cache[atom.name]
return metadata['edge_deciders']
def fetch_atoms_by_kind(self, kind):
"""Fetches all the atoms of a given kind.
NOTE(harlowja): Currently only ``task`` or ``retry`` are valid
kinds of atoms (requesting other kinds will just
return empty lists).
"""
return self._atoms_by_kind.get(kind, [])
def fetch_scheduler(self, atom):
"""Fetches the cached specific scheduler for the given atom."""
# This does not check if the name exists (since this is only used
@@ -197,7 +188,7 @@ class Runtime(object):
# Various helper methods used by the runtime components; not for public
# consumption...
def reset_nodes(self, atoms, state=st.PENDING, intention=st.EXECUTE):
def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE):
"""Resets all the provided atoms to the given state and intention."""
tweaked = []
for atom in atoms:
@@ -213,7 +204,7 @@ class Runtime(object):
def reset_all(self, state=st.PENDING, intention=st.EXECUTE):
"""Resets all atoms to the given state and intention."""
return self.reset_nodes(self.analyzer.iterate_all_nodes(),
return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS),
state=state, intention=intention)
def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):
@@ -221,8 +212,9 @@ class Runtime(object):
The subgraph is contained of all of the atoms successors.
"""
return self.reset_nodes(self.analyzer.iterate_subgraph(atom),
state=state, intention=intention)
return self.reset_atoms(
self.analyzer.iterate_connected_atoms(atom),
state=state, intention=intention)
def retry_subflow(self, retry):
"""Prepares a retrys + its subgraph for execution.

View File

@@ -14,14 +14,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import atom as atom_type
from taskflow import flow as flow_type
from taskflow.engines.action_engine import compiler as co
from taskflow import logging
LOG = logging.getLogger(__name__)
def _extract_atoms_iter(node, idx=-1):
def _depth_first_reverse_iterate(node, idx=-1):
"""Iterates connected (in reverse) nodes in tree (from starting node)."""
# Always go left to right, since right to left is the pattern order
# and we want to go backwards and not forwards through that ordering...
if idx == -1:
@@ -29,15 +29,17 @@ def _extract_atoms_iter(node, idx=-1):
else:
children_iter = reversed(node[0:idx])
for child in children_iter:
if isinstance(child.item, flow_type.Flow):
for atom in _extract_atoms_iter(child):
child_kind = child.metadata['kind']
if child_kind == co.FLOW:
# Jump through these...
#
# TODO(harlowja): make this non-recursive and remove this
# style of doing this when
# https://review.openstack.org/#/c/205731/ merges...
for atom in _depth_first_reverse_iterate(child):
yield atom
elif isinstance(child.item, atom_type.Atom):
yield child.item
else:
raise TypeError(
"Unknown extraction item '%s' (%s)" % (child.item,
type(child.item)))
yield child.item
class ScopeWalker(object):
@@ -57,13 +59,10 @@ class ScopeWalker(object):
" hierarchy" % atom)
self._level_cache = {}
self._atom = atom
self._graph = compilation.execution_graph
self._execution_graph = compilation.execution_graph
self._names_only = names_only
self._predecessors = None
#: Function that extracts the *associated* atoms of a given tree node.
_extract_atoms_iter = staticmethod(_extract_atoms_iter)
def __iter__(self):
"""Iterates over the visible scopes.
@@ -99,10 +98,14 @@ class ScopeWalker(object):
nodes (aka we have reached the top of the tree) or we run out of
predecessors.
"""
graph = self._execution_graph
if self._predecessors is None:
pred_iter = self._graph.bfs_predecessors_iter(self._atom)
self._predecessors = set(pred_iter)
predecessors = self._predecessors.copy()
predecessors = set(
node for node in graph.bfs_predecessors_iter(self._atom)
if graph.node[node]['kind'] in co.ATOMS)
self._predecessors = predecessors.copy()
else:
predecessors = self._predecessors.copy()
last = self._node
for lvl, parent in enumerate(self._node.path_iter(include_self=False)):
if not predecessors:
@@ -114,7 +117,7 @@ class ScopeWalker(object):
except KeyError:
visible = []
removals = set()
for atom in self._extract_atoms_iter(parent, idx=last_idx):
for atom in _depth_first_reverse_iterate(parent, idx=last_idx):
if atom in predecessors:
predecessors.remove(atom)
removals.add(atom)

View File

@@ -16,6 +16,7 @@
import functools
from taskflow.engines.action_engine import compiler
from taskflow import exceptions as exc
from taskflow import states
from taskflow.types import tree
@@ -45,7 +46,8 @@ def _fetch_predecessor_tree(graph, atom):
while stack:
parent, node = stack.pop()
for pred_node in graph.predecessors_iter(node):
child = tree.Node(pred_node)
child = tree.Node(pred_node,
**graph.node[pred_node])
parent.add(child)
stack.append((child, pred_node))
seen.add(pred_node)
@@ -62,8 +64,13 @@ class FailureFormatter(object):
def __init__(self, engine, hide_inputs_outputs_of=()):
self._hide_inputs_outputs_of = hide_inputs_outputs_of
self._engine = engine
self._formatter_funcs = {
compiler.FLOW: self._format_flow,
}
for kind in compiler.ATOMS:
self._formatter_funcs[kind] = self._format_atom
def _format_node(self, storage, cache, node):
def _format_atom(self, storage, cache, node):
"""Formats a single tree node (atom) into a string version."""
atom = node.item
atom_name = atom.name
@@ -101,6 +108,16 @@ class FailureFormatter(object):
else:
return "Atom '%s'" % (atom_name)
def _format_flow(self, storage, cache, node):
"""Formats a single tree node (flow) into a string version."""
flow = node.item
return flow.name
def _format_node(self, storage, cache, node):
"""Formats a single tree node into a string version."""
formatter_func = self. _formatter_funcs[node.metadata['kind']]
return formatter_func(storage, cache, node)
def format(self, fail, atom_matcher):
"""Returns a (exc_info, details) tuple about the failure.

View File

@@ -37,18 +37,19 @@ class BuildersTest(test.TestCase):
compilation = compiler.PatternCompiler(flow).compile()
flow_detail = pu.create_flow_detail(flow)
store = storage.Storage(flow_detail)
# This ensures the tasks exist in storage...
for task in compilation.execution_graph:
store.ensure_atom(task)
nodes_iter = compilation.execution_graph.nodes_iter(data=True)
for node, node_attrs in nodes_iter:
if node_attrs['kind'] in ('task', 'retry'):
store.ensure_atom(node)
if initial_state:
store.set_flow_state(initial_state)
task_notifier = notifier.Notifier()
atom_notifier = notifier.Notifier()
task_executor = executor.SerialTaskExecutor()
retry_executor = executor.SerialRetryExecutor()
task_executor.start()
self.addCleanup(task_executor.stop)
r = runtime.Runtime(compilation, store,
task_notifier, task_executor,
atom_notifier, task_executor,
retry_executor)
r.compile()
return r
@@ -305,6 +306,6 @@ class BuildersTest(test.TestCase):
self.assertEqual(1, occurrences.get((builder.GAME_OVER, st.SUCCESS)))
self.assertEqual(1, occurrences.get((builder.UNDEFINED, st.RESUMING)))
self.assertEqual(0, len(memory.next_nodes))
self.assertEqual(0, len(memory.next_up))
self.assertEqual(0, len(memory.not_done))
self.assertEqual(0, len(memory.failures))

View File

@@ -49,21 +49,22 @@ class PatternCompileTest(test.TestCase):
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow("test")
flo.add(a, b, c)
sflo = lf.Flow("sub-test")
sflo.add(d)
flo.add(sflo)
inner_flo = lf.Flow("sub-test")
inner_flo.add(d)
flo.add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertEqual(6, len(g))
order = g.topological_sort()
self.assertEqual([a, b, c, d], order)
self.assertTrue(g.has_edge(c, d))
self.assertEqual(g.get_edge_data(c, d), {'invariant': True})
self.assertEqual([flo, a, b, c, inner_flo, d], order)
self.assertTrue(g.has_edge(c, inner_flo))
self.assertTrue(g.has_edge(inner_flo, d))
self.assertEqual(g.get_edge_data(inner_flo, d), {'invariant': True})
self.assertEqual([d], list(g.no_successors_iter()))
self.assertEqual([a], list(g.no_predecessors_iter()))
self.assertEqual([flo], list(g.no_predecessors_iter()))
def test_invalid(self):
a, b, c = test_utils.make_many(3)
@@ -79,36 +80,42 @@ class PatternCompileTest(test.TestCase):
flo.add(a, b, c, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertEqual(0, g.number_of_edges())
self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(), [
(flo, a),
(flo, b),
(flo, c),
(flo, d),
])
self.assertEqual(set([a, b, c, d]),
set(g.no_successors_iter()))
self.assertEqual(set([a, b, c, d]),
self.assertEqual(set([flo]),
set(g.no_predecessors_iter()))
def test_linear_nested(self):
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow("test")
flo.add(a, b)
flo2 = uf.Flow("test2")
flo2.add(c, d)
flo.add(flo2)
inner_flo = uf.Flow("test2")
inner_flo.add(c, d)
flo.add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(4, len(g))
graph = compilation.execution_graph
self.assertEqual(6, len(graph))
lb = g.subgraph([a, b])
lb = graph.subgraph([a, b])
self.assertFalse(lb.has_edge(b, a))
self.assertTrue(lb.has_edge(a, b))
self.assertEqual(g.get_edge_data(a, b), {'invariant': True})
self.assertEqual(graph.get_edge_data(a, b), {'invariant': True})
ub = g.subgraph([c, d])
ub = graph.subgraph([c, d])
self.assertEqual(0, ub.number_of_edges())
# This ensures that c and d do not start executing until after b.
self.assertTrue(g.has_edge(b, c))
self.assertTrue(g.has_edge(b, d))
self.assertTrue(graph.has_edge(b, inner_flo))
self.assertTrue(graph.has_edge(inner_flo, c))
self.assertTrue(graph.has_edge(inner_flo, d))
def test_unordered_nested(self):
a, b, c, d = test_utils.make_many(4)
@@ -120,34 +127,30 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(4, len(g))
for n in [a, b]:
self.assertFalse(g.has_edge(n, c))
self.assertFalse(g.has_edge(n, d))
self.assertFalse(g.has_edge(d, c))
self.assertTrue(g.has_edge(c, d))
self.assertEqual(g.get_edge_data(c, d), {'invariant': True})
ub = g.subgraph([a, b])
self.assertEqual(0, ub.number_of_edges())
lb = g.subgraph([c, d])
self.assertEqual(1, lb.number_of_edges())
self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(), [
(flo, a),
(flo, b),
(flo, flo2),
(flo2, c),
(c, d)
])
def test_unordered_nested_in_linear(self):
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow('lt').add(
a,
uf.Flow('ut').add(b, c),
d)
inner_flo = uf.Flow('ut').add(b, c)
flo = lf.Flow('lt').add(a, inner_flo, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(), [
(a, b),
(a, c),
(flo, a),
(a, inner_flo),
(inner_flo, b),
(inner_flo, c),
(b, d),
(c, d)
(c, d),
])
def test_graph(self):
@@ -157,8 +160,8 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertEqual(0, g.number_of_edges())
self.assertEqual(5, len(g))
self.assertEqual(4, g.number_of_edges())
def test_graph_nested(self):
a, b, c, d, e, f, g = test_utils.make_many(7)
@@ -171,10 +174,17 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
graph = compilation.execution_graph
self.assertEqual(7, len(graph))
self.assertItemsEqual(graph.edges(data=True), [
(e, f, {'invariant': True}),
(f, g, {'invariant': True})
self.assertEqual(9, len(graph))
self.assertItemsEqual(graph.edges(), [
(flo, a),
(flo, b),
(flo, c),
(flo, d),
(flo, flo2),
(flo2, e),
(e, f),
(f, g),
])
def test_graph_nested_graph(self):
@@ -187,9 +197,19 @@ class PatternCompileTest(test.TestCase):
flo.add(flo2)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(7, len(g))
self.assertEqual(0, g.number_of_edges())
graph = compilation.execution_graph
self.assertEqual(9, len(graph))
self.assertItemsEqual(graph.edges(), [
(flo, a),
(flo, b),
(flo, c),
(flo, d),
(flo, flo2),
(flo2, e),
(flo2, f),
(flo2, g),
])
def test_graph_links(self):
a, b, c, d = test_utils.make_many(4)
@@ -201,13 +221,15 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [
(flo, a, {'invariant': True}),
(a, b, {'manual': True}),
(b, c, {'manual': True}),
(c, d, {'manual': True}),
])
self.assertItemsEqual([a], g.no_predecessors_iter())
self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([d], g.no_successors_iter())
def test_graph_dependencies(self):
@@ -217,96 +239,112 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(2, len(g))
self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
(flo, a, {'invariant': True}),
(a, b, {'reasons': set(['x'])})
])
self.assertItemsEqual([a], g.no_predecessors_iter())
self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([b], g.no_successors_iter())
def test_graph_nested_requires(self):
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
c = test_utils.ProvidesRequiresTask('c', provides=[], requires=['x'])
flo = gf.Flow("test").add(
a,
lf.Flow("test2").add(b, c)
)
inner_flo = lf.Flow("test2").add(b, c)
flo = gf.Flow("test").add(a, inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
(a, c, {'reasons': set(['x'])}),
(b, c, {'invariant': True})
graph = compilation.execution_graph
self.assertEqual(5, len(graph))
self.assertItemsEqual(graph.edges(data=True), [
(flo, a, {'invariant': True}),
(inner_flo, b, {'invariant': True}),
(a, inner_flo, {'reasons': set(['x'])}),
(b, c, {'invariant': True}),
])
self.assertItemsEqual([a, b], g.no_predecessors_iter())
self.assertItemsEqual([c], g.no_successors_iter())
self.assertItemsEqual([flo], graph.no_predecessors_iter())
self.assertItemsEqual([c], graph.no_successors_iter())
def test_graph_nested_provides(self):
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=['x'])
b = test_utils.ProvidesRequiresTask('b', provides=['x'], requires=[])
c = test_utils.ProvidesRequiresTask('c', provides=[], requires=[])
flo = gf.Flow("test").add(
a,
lf.Flow("test2").add(b, c)
)
inner_flo = lf.Flow("test2").add(b, c)
flo = gf.Flow("test").add(a, inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
graph = compilation.execution_graph
self.assertEqual(5, len(graph))
self.assertItemsEqual(graph.edges(data=True), [
(flo, inner_flo, {'invariant': True}),
(inner_flo, b, {'invariant': True}),
(b, c, {'invariant': True}),
(b, a, {'reasons': set(['x'])})
(c, a, {'reasons': set(['x'])}),
])
self.assertItemsEqual([b], g.no_predecessors_iter())
self.assertItemsEqual([a, c], g.no_successors_iter())
self.assertItemsEqual([flo], graph.no_predecessors_iter())
self.assertItemsEqual([a], graph.no_successors_iter())
def test_empty_flow_in_linear_flow(self):
flow = lf.Flow('lf')
flo = lf.Flow('lf')
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
empty_flow = gf.Flow("empty")
flow.add(a, empty_flow, b)
empty_flo = gf.Flow("empty")
flo.add(a, empty_flo, b)
compilation = compiler.PatternCompiler(flow).compile()
g = compilation.execution_graph
self.assertItemsEqual(g.edges(data=True), [
(a, b, {'invariant': True}),
compilation = compiler.PatternCompiler(flo).compile()
graph = compilation.execution_graph
self.assertItemsEqual(graph.edges(), [
(flo, a),
(a, empty_flo),
(empty_flo, b),
])
def test_many_empty_in_graph_flow(self):
flow = gf.Flow('root')
flo = gf.Flow('root')
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
flow.add(a)
flo.add(a)
b = lf.Flow('b')
b_0 = test_utils.ProvidesRequiresTask('b.0', provides=[], requires=[])
b_1 = lf.Flow('b.1')
b_2 = lf.Flow('b.2')
b_3 = test_utils.ProvidesRequiresTask('b.3', provides=[], requires=[])
b.add(
b_0,
lf.Flow('b.1'), lf.Flow('b.2'),
b_3,
)
flow.add(b)
b.add(b_0, b_1, b_2, b_3)
flo.add(b)
c = lf.Flow('c')
c.add(lf.Flow('c.0'), lf.Flow('c.1'), lf.Flow('c.2'))
flow.add(c)
c_0 = lf.Flow('c.0')
c_1 = lf.Flow('c.1')
c_2 = lf.Flow('c.2')
c.add(c_0, c_1, c_2)
flo.add(c)
d = test_utils.ProvidesRequiresTask('d', provides=[], requires=[])
flow.add(d)
flo.add(d)
flow.link(b, d)
flow.link(a, d)
flow.link(c, d)
flo.link(b, d)
flo.link(a, d)
flo.link(c, d)
compilation = compiler.PatternCompiler(flow).compile()
g = compilation.execution_graph
self.assertTrue(g.has_edge(b_0, b_3))
self.assertTrue(g.has_edge(b_3, d))
self.assertEqual(4, len(g))
compilation = compiler.PatternCompiler(flo).compile()
graph = compilation.execution_graph
self.assertTrue(graph.has_edge(flo, a))
self.assertTrue(graph.has_edge(flo, b))
self.assertTrue(graph.has_edge(b_0, b_1))
self.assertTrue(graph.has_edge(b_1, b_2))
self.assertTrue(graph.has_edge(b_2, b_3))
self.assertTrue(graph.has_edge(flo, c))
self.assertTrue(graph.has_edge(c_0, c_1))
self.assertTrue(graph.has_edge(c_1, c_2))
self.assertTrue(graph.has_edge(b_3, d))
self.assertEqual(12, len(graph))
def test_empty_flow_in_nested_flow(self):
flow = lf.Flow('lf')
@@ -323,9 +361,10 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flow).compile()
g = compilation.execution_graph
self.assertTrue(g.has_edge(a, c))
self.assertTrue(g.has_edge(c, d))
self.assertTrue(g.has_edge(d, b))
for source, target in [(flow, a), (a, flow2),
(flow2, c), (c, empty_flow),
(empty_flow, d), (d, b)]:
self.assertTrue(g.has_edge(source, target))
def test_empty_flow_in_graph_flow(self):
flow = lf.Flow('lf')
@@ -336,19 +375,9 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flow).compile()
g = compilation.execution_graph
self.assertTrue(g.has_edge(a, b))
def test_empty_flow_in_graph_flow_empty_linkage(self):
flow = gf.Flow('lf')
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
empty_flow = lf.Flow("empty")
flow.add(a, empty_flow, b)
flow.link(empty_flow, b)
compilation = compiler.PatternCompiler(flow).compile()
g = compilation.execution_graph
self.assertEqual(0, len(g.edges()))
self.assertTrue(g.has_edge(flow, a))
self.assertTrue(g.has_edge(a, empty_flow))
self.assertTrue(g.has_edge(empty_flow, b))
def test_empty_flow_in_graph_flow_linkage(self):
flow = gf.Flow('lf')
@@ -360,8 +389,9 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flow).compile()
g = compilation.execution_graph
self.assertEqual(1, len(g.edges()))
self.assertTrue(g.has_edge(a, b))
self.assertTrue(g.has_edge(flow, a))
self.assertTrue(g.has_edge(flow, empty_flow))
def test_checks_for_dups(self):
flo = gf.Flow("test").add(
@@ -384,36 +414,39 @@ class PatternCompileTest(test.TestCase):
flo = lf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(1, len(g))
self.assertEqual(0, g.number_of_edges())
self.assertEqual(2, len(g))
self.assertEqual(1, g.number_of_edges())
def test_retry_in_unordered_flow(self):
flo = uf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(1, len(g))
self.assertEqual(0, g.number_of_edges())
self.assertEqual(2, len(g))
self.assertEqual(1, g.number_of_edges())
def test_retry_in_graph_flow(self):
flo = gf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(1, len(g))
self.assertEqual(0, g.number_of_edges())
self.assertEqual(2, len(g))
self.assertEqual(1, g.number_of_edges())
def test_retry_in_nested_flows(self):
c1 = retry.AlwaysRevert("c1")
c2 = retry.AlwaysRevert("c2")
flo = lf.Flow("test", c1).add(lf.Flow("test2", c2))
inner_flo = lf.Flow("test2", c2)
flo = lf.Flow("test", c1).add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(2, len(g))
self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
(c1, c2, {'retry': True})
(flo, c1, {'invariant': True}),
(c1, inner_flo, {'invariant': True, 'retry': True}),
(inner_flo, c2, {'invariant': True}),
])
self.assertIs(c1, g.node[c2]['retry'])
self.assertItemsEqual([c1], g.no_predecessors_iter())
self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([c2], g.no_successors_iter())
def test_retry_in_linear_flow_with_tasks(self):
@@ -423,13 +456,14 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(3, len(g))
self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
(flo, c, {'invariant': True}),
(a, b, {'invariant': True}),
(c, a, {'retry': True})
(c, a, {'invariant': True, 'retry': True})
])
self.assertItemsEqual([c], g.no_predecessors_iter())
self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([b], g.no_successors_iter())
self.assertIs(c, g.node[a]['retry'])
self.assertIs(c, g.node[b]['retry'])
@@ -441,13 +475,14 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(3, len(g))
self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
(c, a, {'retry': True}),
(c, b, {'retry': True})
(flo, c, {'invariant': True}),
(c, a, {'invariant': True, 'retry': True}),
(c, b, {'invariant': True, 'retry': True}),
])
self.assertItemsEqual([c], g.no_predecessors_iter())
self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([a, b], g.no_successors_iter())
self.assertIs(c, g.node[a]['retry'])
self.assertIs(c, g.node[b]['retry'])
@@ -458,15 +493,16 @@ class PatternCompileTest(test.TestCase):
flo = gf.Flow("test", r).add(a, b, c).link(b, c)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [
(r, a, {'retry': True}),
(r, b, {'retry': True}),
(flo, r, {'invariant': True}),
(r, a, {'invariant': True, 'retry': True}),
(r, b, {'invariant': True, 'retry': True}),
(b, c, {'manual': True})
])
self.assertItemsEqual([r], g.no_predecessors_iter())
self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([a, c], g.no_successors_iter())
self.assertIs(r, g.node[a]['retry'])
self.assertIs(r, g.node[b]['retry'])
@@ -476,18 +512,18 @@ class PatternCompileTest(test.TestCase):
c1 = retry.AlwaysRevert("cp1")
c2 = retry.AlwaysRevert("cp2")
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow("test", c1).add(
a,
lf.Flow("test", c2).add(b, c),
d)
inner_flo = lf.Flow("test", c2).add(b, c)
flo = lf.Flow("test", c1).add(a, inner_flo, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(6, len(g))
self.assertEqual(8, len(g))
self.assertItemsEqual(g.edges(data=True), [
(c1, a, {'retry': True}),
(a, c2, {'invariant': True}),
(c2, b, {'retry': True}),
(flo, c1, {'invariant': True}),
(c1, a, {'invariant': True, 'retry': True}),
(a, inner_flo, {'invariant': True}),
(inner_flo, c2, {'invariant': True}),
(c2, b, {'invariant': True, 'retry': True}),
(b, c, {'invariant': True}),
(c, d, {'invariant': True}),
])
@@ -501,17 +537,17 @@ class PatternCompileTest(test.TestCase):
def test_retry_subflows_hierarchy(self):
c1 = retry.AlwaysRevert("cp1")
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow("test", c1).add(
a,
lf.Flow("test").add(b, c),
d)
inner_flo = lf.Flow("test").add(b, c)
flo = lf.Flow("test", c1).add(a, inner_flo, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
self.assertEqual(5, len(g))
self.assertEqual(7, len(g))
self.assertItemsEqual(g.edges(data=True), [
(c1, a, {'retry': True}),
(a, b, {'invariant': True}),
(flo, c1, {'invariant': True}),
(c1, a, {'invariant': True, 'retry': True}),
(a, inner_flo, {'invariant': True}),
(inner_flo, b, {'invariant': True}),
(b, c, {'invariant': True}),
(c, d, {'invariant': True}),
])

View File

@@ -28,8 +28,11 @@ def _common_format(g, edge_notation):
lines.append("Frozen: %s" % nx.is_frozen(g))
lines.append("Density: %0.3f" % nx.density(g))
lines.append("Nodes: %s" % g.number_of_nodes())
for n in g.nodes_iter():
lines.append(" - %s" % n)
for n, n_data in g.nodes_iter(data=True):
if n_data:
lines.append(" - %s (%s)" % (n, n_data))
else:
lines.append(" - %s" % n)
lines.append("Edges: %s" % g.number_of_edges())
for (u, v, e_data) in g.edges_iter(data=True):
if e_data:

View File

@@ -16,12 +16,25 @@
# License for the specific language governing permissions and limitations
# under the License.
import itertools
def count(it):
"""Returns how many values in the iterator (depletes the iterator)."""
return sum(1 for _value in it)
def unique_seen(it, *its):
"""Yields unique values from iterator(s) (and retains order)."""
seen = set()
for value in itertools.chain(it, *its):
if value in seen:
continue
else:
yield value
seen.add(value)
def find_first_match(it, matcher, not_found_value=None):
"""Searches iterator for first value that matcher callback returns true."""
for value in it: