# -*- coding: utf-8 -*-

#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import collections
import threading

import fasteners

from taskflow import exceptions as exc
from taskflow import flow
from taskflow import logging
from taskflow import task
from taskflow.types import graph as gr
from taskflow.types import tree as tr
from taskflow.utils import misc

LOG = logging.getLogger(__name__)

_RETRY_EDGE_DATA = {
    flow.LINK_RETRY: True,
}
_EDGE_INVARIANTS = (flow.LINK_INVARIANT, flow.LINK_MANUAL, flow.LINK_RETRY)
_EDGE_REASONS = flow.LINK_REASONS


class Compilation(object):
    """The result of a compiler's compile() is this *immutable* object."""

    def __init__(self, execution_graph, hierarchy):
        self._execution_graph = execution_graph
        self._hierarchy = hierarchy

    @property
    def execution_graph(self):
        """The execution ordering of atoms (as a graph structure)."""
        return self._execution_graph

    @property
    def hierarchy(self):
        """The hierarchy of patterns (as a tree structure)."""
        return self._hierarchy


def _add_update_edges(graph, nodes_from, nodes_to, attr_dict=None):
    """Adds/updates edges from nodes to other nodes in the specified graph.

    It will connect the 'nodes_from' to the 'nodes_to' if an edge currently
    does *not* exist (if it does already exist then the edge's attributes
    are just updated instead). When an edge is created the provided edge
    attributes dictionary will be applied to the new edge between these two
    nodes.
    """
    # NOTE(harlowja): give each edge its own attr copy so that if the copy
    # is later modified the original (shared) dictionary is not also
    # unintentionally modified...
    for u in nodes_from:
        for v in nodes_to:
            if not graph.has_edge(u, v):
                if attr_dict:
                    graph.add_edge(u, v, attr_dict=attr_dict.copy())
                else:
                    graph.add_edge(u, v)
            else:
                # Just update the attr_dict (if any).
                if attr_dict:
                    graph.add_edge(u, v, attr_dict=attr_dict.copy())


class Linker(object):
    """Compiler helper that adds pattern(s) constraints onto a graph."""

    @staticmethod
    def _is_not_empty(graph):
        # Returns true if the given graph is *not* empty...
        return graph.number_of_nodes() > 0

    @staticmethod
    def _find_first_decomposed(node, priors,
                               decomposed_members, decomposed_filter):
        # How this works: traverse backwards and find only the predecessor
        # items that are actually connected to this entity, and avoid any
        # linkage that is not directly connected. This is guaranteed to be
        # valid since we always iter_links() over predecessors before
        # successors in all currently known patterns; a queue is used here
        # since it is possible for a node to have 2+ different predecessors so
        # we must search back through all of them in a reverse BFS order...
        #
        # Returns the first decomposed graph of those nodes (including the
        # passed in node) that passes the provided filter
        # function (returns None if none match).
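        #
        # For example (an illustrative sketch, the names are hypothetical):
        # with priors [(None, a), (a, b), (b, c)] and node c, the search
        # checks c's decomposed graph first, then b's, then a's, returning
        # the first one that the filter accepts (for apply_constraints()
        # below this means the first non-empty one).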
        frontier = collections.deque([node])
        # NOTE(harlowja): None is in this initial set since the first prior in
        # the priors list has None as its predecessor (which we don't want to
        # look for a decomposed member of).
        visited = set([None])
        while frontier:
            node = frontier.popleft()
            if node in visited:
                continue
            node_graph = decomposed_members[node]
            if decomposed_filter(node_graph):
                return node_graph
            visited.add(node)
            # TODO(harlowja): optimize this more to avoid searching through
            # things already searched...
            for (u, v) in reversed(priors):
                if node == v:
                    # Queue its predecessor to be searched in the future...
                    frontier.append(u)
        else:
            return None

    def apply_constraints(self, graph, flow, decomposed_members):
        # This list is used to track the links that have been previously
        # iterated over, so that when we are trying to find an entry to
        # connect to we can iterate backwards through this list, finding
        # connected nodes to the current target (let's call it v) and find
        # the first (u_n, or u_n - 1, u_n - 2...) that was decomposed into
        # a non-empty graph. We also retain all predecessors of v so that we
        # can correctly locate u_n - 1 if u_n turns out to have decomposed into
        # an empty graph (and so on).
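        #
        # For example (an illustrative sketch, the names are hypothetical):
        # given invariant links a -> b -> c where flow b decomposed into an
        # empty graph, priors will already contain (a, b) by the time the
        # b -> c link is processed, letting _find_first_decomposed() locate
        # a (the first non-empty predecessor) to connect to c instead.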
        priors = []
        # NOTE(harlowja): u, v are flows/tasks (also graph terminology since
        # we are compiling things down into a flattened graph), the meaning
        # of this link iteration via iter_links() is that u -> v (with the
        # provided dictionary attributes, if any).
        for (u, v, attr_dict) in flow.iter_links():
            if not priors:
                priors.append((None, u))
            v_g = decomposed_members[v]
            if not v_g.number_of_nodes():
                priors.append((u, v))
                continue
            invariant = any(attr_dict.get(k) for k in _EDGE_INVARIANTS)
            if not invariant:
                # This is a symbol *only* dependency, connect
                # corresponding providers and consumers to allow the consumer
                # to be executed immediately after the provider finishes (this
                # is an optimization for these types of dependencies...)
                u_g = decomposed_members[u]
                if not u_g.number_of_nodes():
                    # This must always exist, but in case it somehow doesn't...
                    raise exc.CompilationFailure(
                        "Non-invariant link being created from '%s' ->"
                        " '%s' even though the source '%s' was found to be"
                        " decomposed into an empty graph" % (u, v, u))
                for u in u_g.nodes_iter():
                    for v in v_g.nodes_iter():
                        # This is using the intersection() method vs the &
                        # operator since the latter doesn't work with frozen
                        # sets (when used in combination with ordered sets).
                        #
                        # If this is not done the following happens...
                        #
                        # TypeError: unsupported operand type(s)
                        # for &: 'frozenset' and 'OrderedSet'
                        depends_on = u.provides.intersection(v.requires)
                        if depends_on:
                            edge_attrs = {
                                _EDGE_REASONS: frozenset(depends_on),
                            }
                            _add_update_edges(graph,
                                              [u], [v],
                                              attr_dict=edge_attrs)
            else:
                # Connect nodes with no predecessors in v to nodes with no
                # successors in the *first* non-empty predecessor of v (thus
                # maintaining the edge dependency).
                match = self._find_first_decomposed(u, priors,
                                                    decomposed_members,
                                                    self._is_not_empty)
                if match is not None:
                    _add_update_edges(graph,
                                      match.no_successors_iter(),
                                      list(v_g.no_predecessors_iter()),
                                      attr_dict=attr_dict)
            priors.append((u, v))


class _TaskCompiler(object):
    """Non-recursive compiler of tasks."""

    @staticmethod
    def handles(obj):
        return isinstance(obj, task.BaseTask)

    def compile(self, task, parent=None):
        graph = gr.DiGraph(name=task.name)
        graph.add_node(task)
        node = tr.Node(task)
        if parent is not None:
            parent.add(node)
        return graph, node


class _FlowCompiler(object):
    """Recursive compiler of flows."""

    @staticmethod
    def handles(obj):
        return isinstance(obj, flow.Flow)

    def __init__(self, deep_compiler_func, linker):
        self._deep_compiler_func = deep_compiler_func
        self._linker = linker

    def _connect_retry(self, retry, graph):
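        # For example (an illustrative sketch, the names are hypothetical):
        # given a graph a -> b and retry r, this adds the edge r -> a (so
        # that r runs first) and associates both a and b with r via their
        # flow.LINK_RETRY node attribute (unless a deeper nested flow has
        # already associated its own retry with them).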
        graph.add_node(retry)

        # All nodes that have no predecessors should depend on this retry.
        nodes_to = [n for n in graph.no_predecessors_iter() if n is not retry]
        if nodes_to:
            _add_update_edges(graph, [retry], nodes_to,
                              attr_dict=_RETRY_EDGE_DATA)

        # Add association for each node of graph that has no existing retry.
        for n in graph.nodes_iter():
            if n is not retry and flow.LINK_RETRY not in graph.node[n]:
                graph.node[n][flow.LINK_RETRY] = retry

    @staticmethod
    def _occurrence_detector(to_graph, from_graph):
        return sum(1 for node in from_graph.nodes_iter()
                   if node in to_graph)

    def _decompose_flow(self, flow, parent=None):
        """Decomposes a flow into a graph, tree node + decomposed subgraphs."""
        graph = gr.DiGraph(name=flow.name)
        node = tr.Node(flow)
        if parent is not None:
            parent.add(node)
        if flow.retry is not None:
            node.add(tr.Node(flow.retry))
        decomposed_members = {}
        for item in flow:
            subgraph, _subnode = self._deep_compiler_func(item, parent=node)
            decomposed_members[item] = subgraph
            if subgraph.number_of_nodes():
                graph = gr.merge_graphs(
                    graph, subgraph,
                    # We can specialize this to be simpler than the default
                    # algorithm which creates overhead that we don't
                    # need for our purposes...
                    overlap_detector=self._occurrence_detector)
        return graph, node, decomposed_members

    def compile(self, flow, parent=None):
        graph, node, decomposed_members = self._decompose_flow(flow,
                                                               parent=parent)
        self._linker.apply_constraints(graph, flow, decomposed_members)
        if flow.retry is not None:
            self._connect_retry(flow.retry, graph)
        return graph, node


class PatternCompiler(object):
    """Compiles a flow pattern (or task) into a compilation unit.

    Let's dive into the basic idea for how this works:

    The compiler here is provided a 'root' object via its __init__ method,
    this object could be a task, or a flow (one of the supported patterns),
    the end-goal is to produce a :py:class:`.Compilation` object as the result
    with the needed components. If this is not possible a
    :py:class:`~.taskflow.exceptions.CompilationFailure` will be raised.
    In the case where an **unknown** type is being requested to compile
    a ``TypeError`` will be raised and when a duplicate object (one that
    has **already** been compiled) is encountered a ``ValueError`` is raised.

    The complexity of this comes into play when the 'root' is a flow that
    itself contains other nested flows (and so on); to compile this object and
    its contained objects into a graph that *preserves* the constraints the
    pattern mandates we have to go through a recursive algorithm that creates
    subgraphs for each nesting level, and then on the way back up through
    the recursion (now with a decomposed mapping from contained patterns or
    atoms to their corresponding subgraph) we have to then connect the
    subgraphs (and the atom(s) there-in) that were decomposed for a pattern
    correctly into a new graph (using a :py:class:`.Linker` object to ensure
    the pattern mandated constraints are retained) and then return to the
    caller (and they will do the same thing up until the root node, at which
    point one graph is created with all contained atoms in the
    pattern/nested patterns mandated ordering).

    Also maintained in the :py:class:`.Compilation` object is a hierarchy of
    the nesting of items (which is also built up during the above mentioned
    recursion, via a much simpler algorithm); this is typically used later to
    determine the prior atoms of a given atom when looking up values that can
    be provided to that atom for execution (see the scopes.py file for how this
    works). Note that although you *could* think that the graph itself could be
    used for this, and in some limited ways it can be, the hierarchy retains
    the nested structure (which is useful for scoping analysis/lookup) and is
    able to provide back an iterator that gives back the scopes visible at
    each level (the graph does not have this information once flattened).

    Let's take an example:

    Given the pattern ``f(a(b, c), d)`` where ``f`` is a
    :py:class:`~taskflow.patterns.linear_flow.Flow` with items ``a(b, c)``
    where ``a`` is a :py:class:`~taskflow.patterns.linear_flow.Flow` composed
    of tasks ``(b, c)`` and task ``d``.

    The algorithm that will be performed (mirroring the above described logic)
    will go through the following steps (the tree hierarchy building is left
    out as that is more obvious)::

        Compiling f
          - Decomposing flow f with no parent (must be the root)
          - Compiling a
              - Decomposing flow a with parent f
              - Compiling b
                  - Decomposing task b with parent a
                  - Decomposed b into:
                    Name: b
                    Nodes: 1
                      - b
                    Edges: 0
              - Compiling c
                  - Decomposing task c with parent a
                  - Decomposed c into:
                    Name: c
                    Nodes: 1
                      - c
                    Edges: 0
              - Relinking decomposed b -> decomposed c
              - Decomposed a into:
                Name: a
                Nodes: 2
                  - b
                  - c
                Edges: 1
                  b -> c ({'invariant': True})
          - Compiling d
              - Decomposing task d with parent f
              - Decomposed d into:
                Name: d
                Nodes: 1
                  - d
                Edges: 0
          - Relinking decomposed a -> decomposed d
          - Decomposed f into:
            Name: f
            Nodes: 3
              - c
              - b
              - d
            Edges: 2
              c -> d ({'invariant': True})
              b -> c ({'invariant': True})
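
    For reference, building and compiling that same pattern might look like
    the following minimal sketch (``MyTask`` is a hypothetical no-op task
    subclass used purely for illustration)::

        from taskflow.patterns import linear_flow
        from taskflow import task

        class MyTask(task.Task):
            def execute(self):
                pass

        a = linear_flow.Flow('a')
        a.add(MyTask('b'), MyTask('c'))
        f = linear_flow.Flow('f')
        f.add(a, MyTask('d'))

        compilation = PatternCompiler(f).compile()
        # Three atoms (b, c, d); the flows themselves are flattened away.
        print(compilation.execution_graph.number_of_nodes())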
    """

    def __init__(self, root, freeze=True):
        self._root = root
        self._history = set()
        self._linker = Linker()
        self._freeze = freeze
        self._lock = threading.Lock()
        self._compilation = None
        self._matchers = [
            _FlowCompiler(self._compile, self._linker),
            _TaskCompiler(),
        ]

    def _compile(self, item, parent=None):
        """Compiles an item (pattern, task) into a graph + tree node."""
        for m in self._matchers:
            if m.handles(item):
                self._pre_item_compile(item)
                graph, node = m.compile(item, parent=parent)
                self._post_item_compile(item, graph, node)
                return graph, node
        else:
            raise TypeError("Unknown object '%s' (%s) requested to compile"
                            % (item, type(item)))

    def _pre_item_compile(self, item):
        """Called before an item is compiled; any pre-compilation actions."""
        if item in self._history:
            raise ValueError("Already compiled item '%s' (%s), duplicate"
                             " and/or recursive compiling is not"
                             " supported" % (item, type(item)))
        self._history.add(item)

    def _post_item_compile(self, item, graph, node):
        """Called after an item is compiled; any post-compilation actions."""

    def _pre_compile(self):
        """Called before the compilation of the root starts."""
        self._history.clear()

    def _post_compile(self, graph, node):
        """Called after the compilation of the root finishes successfully."""
        dup_names = misc.get_duplicate_keys(graph.nodes_iter(),
                                            key=lambda node: node.name)
        if dup_names:
            raise exc.Duplicate(
                "Atoms with duplicate names found: %s" % (sorted(dup_names)))
        if graph.number_of_nodes() == 0:
            raise exc.Empty("Root container '%s' (%s) is empty"
                            % (self._root, type(self._root)))
        self._history.clear()
        # NOTE(harlowja): this one can be expensive to calculate (especially
        # the cycle detection), so only do it if we know BLATHER is enabled
        # and not under all cases.
        if LOG.isEnabledFor(logging.BLATHER):
            LOG.blather("Translated '%s'", self._root)
            LOG.blather("Graph:")
            for line in graph.pformat().splitlines():
                # Indent it so that it's slightly offset from the above line.
                LOG.blather("  %s", line)
            LOG.blather("Hierarchy:")
            for line in node.pformat().splitlines():
                # Indent it so that it's slightly offset from the above line.
                LOG.blather("  %s", line)

    @fasteners.locked
    def compile(self):
        """Compiles the contained item into a compiled equivalent."""
        if self._compilation is None:
            self._pre_compile()
            graph, node = self._compile(self._root, parent=None)
            self._post_compile(graph, node)
            if self._freeze:
                graph.freeze()
                node.freeze()
            self._compilation = Compilation(graph, node)
        return self._compilation