diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py new file mode 100644 index 00000000..446ded95 --- /dev/null +++ b/taskflow/engines/action_engine/compiler.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections + +from taskflow import exceptions as exc +from taskflow.utils import flow_utils + +# The result of a compilers compile() is this tuple (for now it is just a +# execution graph but in the future it may grow to include more attributes +# that help the runtime units execute in a more optimal/featureful manner). +Compilation = collections.namedtuple("Compilation", ["execution_graph"]) + + +class PatternCompiler(object): + """Compiles patterns & atoms (potentially nested) into an compilation + unit with a *logically* equivalent directed acyclic graph representation. + + NOTE(harlowja): during this pattern translation process any nested flows + will be converted into there equivalent subgraphs. This currently implies + that contained atoms in those nested flows, post-translation will no longer + be associated with there previously containing flow but instead will lose + this identity and what will remain is the logical constraints that there + contained flow mandated. In the future this may be changed so that this + association is not lost via the compilation process (since it is sometime + useful to retain part of this relationship). + """ + def compile(self, root): + graph = flow_utils.flatten(root) + if graph.number_of_nodes() == 0: + # Try to get a name attribute, otherwise just use the object + # string representation directly if that attribute does not exist. + name = getattr(root, 'name', root) + raise exc.Empty("Root container '%s' (%s) is empty." + % (name, type(root))) + return Compilation(graph) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index eecba801..ae69aac2 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -16,6 +16,7 @@ import threading +from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import executor from taskflow.engines.action_engine import graph_action from taskflow.engines.action_engine import graph_analyzer @@ -29,7 +30,6 @@ from taskflow import retry from taskflow import states from taskflow import storage as t_storage -from taskflow.utils import flow_utils from taskflow.utils import lock_utils from taskflow.utils import misc from taskflow.utils import reflection @@ -53,6 +53,7 @@ class ActionEngine(base.EngineBase): _task_action_factory = task_action.TaskAction _task_executor_factory = executor.SerialTaskExecutor _retry_action_factory = retry_action.RetryAction + _compiler_factory = compiler.PatternCompiler def __init__(self, flow, flow_detail, backend, conf): super(ActionEngine, self).__init__(flow, flow_detail, backend, conf) @@ -208,15 +209,18 @@ class ActionEngine(base.EngineBase): return self._task_action_factory(self.storage, self._task_executor, self.task_notifier) + @misc.cachedproperty + def _compiler(self): + return self._compiler_factory() + @lock_utils.locked def compile(self): if self._compiled: return - execution_graph = flow_utils.flatten(self._flow) - if execution_graph.number_of_nodes() == 0: - raise exc.Empty("Flow %s is empty." % self._flow.name) - self._analyzer = self._graph_analyzer_factory(execution_graph, - self.storage) + compilation = self._compiler.compile(self._flow) + if self._analyzer is None: + self._analyzer = self._graph_analyzer_factory( + compilation.execution_graph, self.storage) self._root = self._graph_action_factory(self._analyzer, self.storage, self._task_action,