Merge "Create and use a new compilation module"

This commit is contained in:
Jenkins
2014-05-29 04:10:42 +00:00
committed by Gerrit Code Review
2 changed files with 59 additions and 6 deletions

View File

@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from taskflow import exceptions as exc
from taskflow.utils import flow_utils
# The result of a compilers compile() is this tuple (for now it is just a
# execution graph but in the future it may grow to include more attributes
# that help the runtime units execute in a more optimal/featureful manner).
Compilation = collections.namedtuple("Compilation", ["execution_graph"])
class PatternCompiler(object):
"""Compiles patterns & atoms (potentially nested) into an compilation
unit with a *logically* equivalent directed acyclic graph representation.
NOTE(harlowja): during this pattern translation process any nested flows
will be converted into there equivalent subgraphs. This currently implies
that contained atoms in those nested flows, post-translation will no longer
be associated with there previously containing flow but instead will lose
this identity and what will remain is the logical constraints that there
contained flow mandated. In the future this may be changed so that this
association is not lost via the compilation process (since it is sometime
useful to retain part of this relationship).
"""
def compile(self, root):
graph = flow_utils.flatten(root)
if graph.number_of_nodes() == 0:
# Try to get a name attribute, otherwise just use the object
# string representation directly if that attribute does not exist.
name = getattr(root, 'name', root)
raise exc.Empty("Root container '%s' (%s) is empty."
% (name, type(root)))
return Compilation(graph)

View File

@@ -16,6 +16,7 @@
import threading
from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import executor
from taskflow.engines.action_engine import graph_action
from taskflow.engines.action_engine import graph_analyzer
@@ -29,7 +30,6 @@ from taskflow import retry
from taskflow import states
from taskflow import storage as t_storage
from taskflow.utils import flow_utils
from taskflow.utils import lock_utils
from taskflow.utils import misc
from taskflow.utils import reflection
@@ -53,6 +53,7 @@ class ActionEngine(base.EngineBase):
_task_action_factory = task_action.TaskAction
_task_executor_factory = executor.SerialTaskExecutor
_retry_action_factory = retry_action.RetryAction
_compiler_factory = compiler.PatternCompiler
def __init__(self, flow, flow_detail, backend, conf):
super(ActionEngine, self).__init__(flow, flow_detail, backend, conf)
@@ -211,15 +212,18 @@ class ActionEngine(base.EngineBase):
return self._task_action_factory(self.storage, self._task_executor,
self.task_notifier)
@misc.cachedproperty
def _compiler(self):
return self._compiler_factory()
@lock_utils.locked
def compile(self):
if self._compiled:
return
execution_graph = flow_utils.flatten(self._flow)
if execution_graph.number_of_nodes() == 0:
raise exc.Empty("Flow %s is empty." % self._flow.name)
self._analyzer = self._graph_analyzer_factory(execution_graph,
self.storage)
compilation = self._compiler.compile(self._flow)
if self._analyzer is None:
self._analyzer = self._graph_analyzer_factory(
compilation.execution_graph, self.storage)
self._root = self._graph_action_factory(self._analyzer,
self.storage,
self._task_action,