From 34cf55ac1c43e46421be316f342c6d04a6b5499c Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 19 May 2014 17:06:54 -0700 Subject: [PATCH] Finish factoring apart the graph_action module Factor out the scheduling, running and completion components of graph_action so that we can allow this to be plugged in with other types of scheduling, running and completion strategies. The newly added components are the following: - A runtime container class (serves as a holder of some small utility functions) and all the other runtime components. - A runner class that acts as the action engines run loop. - A scheduler class that schedules nodes using a provided executor and returns futures that can be used to introspect there results as they complete. - A completer class that completes nodes and futures that the scheduler started, persisting there results and doing any further post-execution analysis. Part of blueprint plug-engine Change-Id: I1dbf46654377fc34e9d90eeabf7b0062020bdc5e --- doc/source/engines.rst | 61 +++-- taskflow/engines/action_engine/engine.py | 74 ++---- .../engines/action_engine/graph_action.py | 233 ---------------- taskflow/engines/action_engine/runner.py | 137 ++++++++++ taskflow/engines/action_engine/runtime.py | 250 ++++++++++++++++++ 5 files changed, 455 insertions(+), 300 deletions(-) delete mode 100644 taskflow/engines/action_engine/graph_action.py create mode 100644 taskflow/engines/action_engine/runner.py create mode 100644 taskflow/engines/action_engine/runtime.py diff --git a/doc/source/engines.rst b/doc/source/engines.rst index 6aac42e0..03526491 100644 --- a/doc/source/engines.rst +++ b/doc/source/engines.rst @@ -204,15 +204,20 @@ Compiling --------- During this stage the flow will be converted into an internal graph -representation using a flow :py:func:`~taskflow.utils.flow_utils.flatten` -function. This function converts the flow objects and contained atoms into a +representation using a +:py:class:`~taskflow.engines.action_engine.compiler.Compiler` (the default +implementation for patterns is the +:py:class:`~taskflow.engines.action_engine.compiler.PatternCompiler`). This +class compiles/converts the flow objects and contained atoms into a `networkx`_ directed graph that contains the equivalent atoms defined in the flow and any nested flows & atoms as well as the constraints that are created by the application of the different flow patterns. This graph is then what will be analyzed & traversed during the engines execution. At this point a few helper object are also created and saved to internal engine variables (these object help in execution of atoms, analyzing the graph and performing other -internal engine activities). +internal engine activities). At the finishing of this stage a +:py:class:`~taskflow.engines.action_engine.runtime.Runtime` object is created +which contains references to all needed runtime components. Preparation ----------- @@ -231,7 +236,7 @@ Execution The graph (and helper objects) previously created are now used for guiding further execution. The flow is put into the ``RUNNING`` :doc:`state ` and a -:py:class:`~taskflow.engines.action_engine.graph_action.FutureGraphAction` +:py:class:`~taskflow.engines.action_engine.runner.Runner` implementation object starts to take over and begins going through the stages listed below (for a more visual diagram/representation see the :ref:`engine state diagram `). @@ -252,35 +257,45 @@ for things like retry atom which can influence what a tasks intention should be object which was designed to provide helper methods for this analysis). Once these intentions are determined and associated with each task (the intention is also stored in the :py:class:`~taskflow.persistence.logbook.AtomDetail` object) -the scheduling stage starts. +the :ref:`scheduling ` stage starts. + +.. _scheduling: Scheduling ^^^^^^^^^^ -This stage selects which atoms are eligible to run (looking at there intention, -checking if predecessor atoms have ran and so-on, again using the +This stage selects which atoms are eligible to run by using a +:py:class:`~taskflow.engines.action_engine.runtime.Scheduler` implementation +(the default implementation looks at there intention, checking if predecessor +atoms have ran and so-on, using a :py:class:`~taskflow.engines.action_engine.graph_analyzer.GraphAnalyzer` helper -object) and submits those atoms to a previously provided compatible -`executor`_ for asynchronous execution. This executor will return a `future`_ -object for each atom submitted; all of which are collected into a list of not -done futures. This will end the initial round of scheduling and at this point -the engine enters the waiting stage. +object as needed) and submits those atoms to a previously provided compatible +`executor`_ for asynchronous execution. This +:py:class:`~taskflow.engines.action_engine.runtime.Scheduler` will return a +`future`_ object for each atom scheduled; all of which are collected into a +list of not done futures. This will end the initial round of scheduling and at +this point the engine enters the :ref:`waiting ` stage. + +.. _waiting: Waiting ^^^^^^^ In this stage the engine waits for any of the future objects previously submitted to complete. Once one of the future objects completes (or fails) that -atoms result will be examined and persisted to the persistence backend (saved +atoms result will be examined and finalized using a +:py:class:`~taskflow.engines.action_engine.runtime.Completer` implementation. +It typically will persist results to a provided persistence backend (saved into the corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail` -object) and the state of the atom is changed. At this point what happens falls -into two categories, one for if that atom failed and one for if it did not. If -the atom failed it may be set to a new intention such as ``RETRY`` or +and :py:class:`~taskflow.persistence.logbook.FlowDetail` objects) and reflect +the new state of the atom. At this point what typically happens falls into two +categories, one for if that atom failed and one for if it did not. If the atom +failed it may be set to a new intention such as ``RETRY`` or ``REVERT`` (other atoms that were predecessors of this failing atom may also have there intention altered). Once this intention adjustment has happened a -new round of scheduling occurs and this process repeats until the engine -succeeds or fails (if the process running the engine dies the above stages will -be restarted and resuming will occur). +new round of :ref:`scheduling ` occurs and this process repeats +until the engine succeeds or fails (if the process running the engine dies the +above stages will be restarted and resuming will occur). .. note:: @@ -293,7 +308,7 @@ Finishing --------- At this point the -:py:class:`~taskflow.engines.action_engine.graph_action.FutureGraphAction` has +:py:class:`~taskflow.engines.action_engine.runner.Runner` has now finished successfully, failed, or the execution was suspended. Depending on which one of these occurs will cause the flow to enter a new state (typically one of ``FAILURE``, ``SUSPENDED``, ``SUCCESS`` or ``REVERTED``). @@ -307,10 +322,12 @@ saved for this execution. Interfaces ========== -.. automodule:: taskflow.engines.base +.. automodule:: taskflow.engines.action_engine.compiler .. automodule:: taskflow.engines.action_engine.engine -.. automodule:: taskflow.engines.action_engine.graph_action .. automodule:: taskflow.engines.action_engine.graph_analyzer +.. automodule:: taskflow.engines.action_engine.runner +.. automodule:: taskflow.engines.action_engine.runtime +.. automodule:: taskflow.engines.base Hierarchy ========= diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index ef979168..e63aeb31 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -18,10 +18,8 @@ import threading from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import executor -from taskflow.engines.action_engine import graph_action -from taskflow.engines.action_engine import graph_analyzer -from taskflow.engines.action_engine import retry_action -from taskflow.engines.action_engine import task_action +from taskflow.engines.action_engine import runner +from taskflow.engines.action_engine import runtime from taskflow.engines import base from taskflow import exceptions as exc @@ -38,28 +36,27 @@ from taskflow.utils import reflection class ActionEngine(base.EngineBase): """Generic action-based engine. - This engine flattens the flow (and any subflows) into a execution graph + This engine compiles the flow (and any subflows) into a compilation unit which contains the full runtime definition to be executed and then uses - this graph in combination with the action classes & storage to attempt to - run your flow (and any subflows & contained tasks) to completion. + this compilation unit in combination with the executor, runtime, runner + and storage classes to attempt to run your flow (and any subflows & + contained atoms) to completion. - During this process it is permissible and valid to have a task or multiple - tasks in the execution graph fail, which will cause the process of - reversion to commence. See the valid states in the states module to learn - more about what other states the tasks & flow being ran can go through. + NOTE(harlowja): during this process it is permissible and valid to have a + task or multiple tasks in the execution graph fail (at the same time even), + which will cause the process of reversion or retrying to commence. See the + valid states in the states module to learn more about what other states + the tasks and flow being ran can go through. """ - _graph_action_factory = graph_action.FutureGraphAction - _graph_analyzer_factory = graph_analyzer.GraphAnalyzer - _task_action_factory = task_action.TaskAction - _task_executor_factory = executor.SerialTaskExecutor - _retry_action_factory = retry_action.RetryAction _compiler_factory = compiler.PatternCompiler + _task_executor_factory = executor.SerialTaskExecutor def __init__(self, flow, flow_detail, backend, conf): super(ActionEngine, self).__init__(flow, flow_detail, backend, conf) - self._analyzer = None - self._root = None + self._runner = None + self._runtime = None self._compiled = False + self._compilation = None self._lock = threading.RLock() self._state_lock = threading.RLock() self._storage_ensured = False @@ -80,8 +77,8 @@ class ActionEngine(base.EngineBase): NOTE(harlowja): Only accessible after compilation has completed. """ g = None - if self._compiled and self._analyzer: - g = self._analyzer.execution_graph + if self._compiled: + g = self._compilation.execution_graph return g def run(self): @@ -119,7 +116,7 @@ class ActionEngine(base.EngineBase): state = None try: self._change_state(states.RUNNING) - for state in self._root.execute_iter(timeout=timeout): + for state in self._runner.run_iter(timeout=timeout): try: try_suspend = yield state except GeneratorExit: @@ -131,7 +128,7 @@ class ActionEngine(base.EngineBase): with excutils.save_and_reraise_exception(): self._change_state(states.FAILURE) else: - ignorable_states = getattr(self._root, 'ignorable_states', []) + ignorable_states = getattr(self._runner, 'ignorable_states', []) if state and state not in ignorable_states: self._change_state(state) if state != states.SUSPENDED and state != states.SUCCESS: @@ -162,12 +159,12 @@ class ActionEngine(base.EngineBase): old_state=old_state) self.notifier.notify(state, details) - def _ensure_storage_for(self, execution_graph): + def _ensure_storage(self): # NOTE(harlowja): signal to the tasks that exist that we are about to # resume, if they have a previous state, they will now transition to # a resuming state (and then to suspended). self._change_state(states.RESUMING) # does nothing in PENDING state - for node in execution_graph.nodes_iter(): + for node in self._compilation.execution_graph.nodes_iter(): version = misc.get_version_string(node) if isinstance(node, retry.Retry): self.storage.ensure_retry(node.name, version, node.save_as) @@ -175,7 +172,6 @@ class ActionEngine(base.EngineBase): self.storage.ensure_task(node.name, version, node.save_as) if node.inject: self.storage.inject_task_args(node.name, node.inject) - self._change_state(states.SUSPENDED) # does nothing in PENDING state @lock_utils.locked @@ -184,7 +180,7 @@ class ActionEngine(base.EngineBase): raise exc.InvalidState("Can not prepare an engine" " which has not been compiled") if not self._storage_ensured: - self._ensure_storage_for(self.execution_graph) + self._ensure_storage() self._storage_ensured = True # At this point we can check to ensure all dependencies are either # flow/task provided or storage provided, if there are still missing @@ -196,22 +192,13 @@ class ActionEngine(base.EngineBase): raise exc.MissingDependencies(self._flow, sorted(missing)) # Reset everything back to pending (if we were previously reverted). if self.storage.get_flow_state() == states.REVERTED: - self._root.reset_all() + self._runtime.reset_all() self._change_state(states.PENDING) - @misc.cachedproperty - def _retry_action(self): - return self._retry_action_factory(self.storage, self.task_notifier) - @misc.cachedproperty def _task_executor(self): return self._task_executor_factory() - @misc.cachedproperty - def _task_action(self): - return self._task_action_factory(self.storage, self._task_executor, - self.task_notifier) - @misc.cachedproperty def _compiler(self): return self._compiler_factory() @@ -220,16 +207,13 @@ class ActionEngine(base.EngineBase): def compile(self): if self._compiled: return - compilation = self._compiler.compile(self._flow) - if self._analyzer is None: - self._analyzer = self._graph_analyzer_factory( - compilation.execution_graph, self.storage) - self._root = self._graph_action_factory(self._analyzer, - self.storage, - self._task_action, - self._retry_action) + self._compilation = self._compiler.compile(self._flow) + self._runtime = runtime.Runtime(self._compilation, + self.storage, + self.task_notifier, + self._task_executor) + self._runner = runner.Runner(self._runtime, self._task_executor) self._compiled = True - return class SingleThreadedActionEngine(ActionEngine): diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py deleted file mode 100644 index c7bad3b7..00000000 --- a/taskflow/engines/action_engine/graph_action.py +++ /dev/null @@ -1,233 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from taskflow.engines.action_engine import executor as ex -from taskflow import exceptions as excp -from taskflow import retry as retry_atom -from taskflow import states as st -from taskflow import task as task_atom -from taskflow.utils import misc - - -_WAITING_TIMEOUT = 60 # in seconds - - -class FutureGraphAction(object): - """Graph action build around futures returned by task action. - - This graph action schedules all task it can for execution and than - waits on returned futures. If task executor is able to execute tasks - in parallel, this enables parallel flow run and reversion. - """ - - # Informational states this action yields while running, not useful to - # have the engine record but useful to provide to end-users when doing - # execution iterations. - ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING) - - def __init__(self, analyzer, storage, task_action, retry_action): - self._analyzer = analyzer - self._storage = storage - self._task_action = task_action - self._retry_action = retry_action - - def is_running(self): - return self._storage.get_flow_state() == st.RUNNING - - def _schedule_node(self, node): - """Schedule a single node for execution.""" - if isinstance(node, task_atom.BaseTask): - return self._schedule_task(node) - elif isinstance(node, retry_atom.Retry): - return self._schedule_retry(node) - else: - raise TypeError("Unknown how to schedule node %s" % node) - - def _schedule(self, nodes): - """Schedule a group of nodes for execution.""" - futures = set() - for node in nodes: - try: - futures.add(self._schedule_node(node)) - except Exception: - # Immediately stop scheduling future work so that we can - # exit execution early (rather than later) if a single task - # fails to schedule correctly. - return (futures, [misc.Failure()]) - return (futures, []) - - def execute_iter(self, timeout=None): - if timeout is None: - timeout = _WAITING_TIMEOUT - - # Prepare flow to be resumed - yield st.RESUMING - next_nodes = self._prepare_flow_for_resume() - next_nodes.update(self._analyzer.get_next_nodes()) - - # Schedule nodes to be worked on - yield st.SCHEDULING - if self.is_running(): - not_done, failures = self._schedule(next_nodes) - else: - not_done, failures = (set(), []) - - # Run! - # - # At this point we need to ensure we wait for all active nodes to - # finish running (even if we are asked to suspend) since we can not - # preempt those tasks (maybe in the future we will be better able to do - # this). - while not_done: - yield st.WAITING - - # TODO(harlowja): maybe we should start doing 'yield from' this - # call sometime in the future, or equivalent that will work in - # py2 and py3. - done, not_done = self._task_action.wait_for_any(not_done, timeout) - - # Analyze the results and schedule more nodes (unless we had - # failures). If failures occurred just continue processing what - # is running (so that we don't leave it abandoned) but do not - # schedule anything new. - yield st.ANALYZING - next_nodes = set() - for future in done: - try: - node, event, result = future.result() - if isinstance(node, task_atom.BaseTask): - self._complete_task(node, event, result) - if isinstance(result, misc.Failure): - if event == ex.EXECUTED: - self._process_atom_failure(node, result) - else: - failures.append(result) - except Exception: - failures.append(misc.Failure()) - else: - try: - more_nodes = self._analyzer.get_next_nodes(node) - except Exception: - failures.append(misc.Failure()) - else: - next_nodes.update(more_nodes) - if next_nodes and not failures and self.is_running(): - yield st.SCHEDULING - # Recheck incase someone suspended it. - if self.is_running(): - more_not_done, failures = self._schedule(next_nodes) - not_done.update(more_not_done) - - if failures: - misc.Failure.reraise_if_any(failures) - if self._analyzer.get_next_nodes(): - yield st.SUSPENDED - elif self._analyzer.is_success(): - yield st.SUCCESS - else: - yield st.REVERTED - - def _schedule_task(self, task): - """Schedules the given task for revert or execute depending - on its intention. - """ - intention = self._storage.get_atom_intention(task.name) - if intention == st.EXECUTE: - return self._task_action.schedule_execution(task) - elif intention == st.REVERT: - return self._task_action.schedule_reversion(task) - else: - raise excp.ExecutionFailure("Unknown how to schedule task with" - " intention: %s" % intention) - - def _complete_task(self, task, event, result): - """Completes the given task, process task failure.""" - if event == ex.EXECUTED: - self._task_action.complete_execution(task, result) - else: - self._task_action.complete_reversion(task, result) - - def _schedule_retry(self, retry): - """Schedules the given retry for revert or execute depending - on its intention. - """ - intention = self._storage.get_atom_intention(retry.name) - if intention == st.EXECUTE: - return self._retry_action.execute(retry) - elif intention == st.REVERT: - return self._retry_action.revert(retry) - elif intention == st.RETRY: - self._retry_action.change_state(retry, st.RETRYING) - self._retry_subflow(retry) - return self._retry_action.execute(retry) - else: - raise excp.ExecutionFailure("Unknown how to schedule retry with" - " intention: %s" % intention) - - def _process_atom_failure(self, atom, failure): - """On atom failure find its retry controller, ask for the action to - perform with failed subflow and set proper intention for subflow nodes. - """ - retry = self._analyzer.find_atom_retry(atom) - if retry: - # Ask retry controller what to do in case of failure - action = self._retry_action.on_failure(retry, atom, failure) - if action == retry_atom.RETRY: - # Prepare subflow for revert - self._storage.set_atom_intention(retry.name, st.RETRY) - for node in self._analyzer.iterate_subgraph(retry): - self._storage.set_atom_intention(node.name, st.REVERT) - elif action == retry_atom.REVERT: - # Ask parent checkpoint - self._process_atom_failure(retry, failure) - elif action == retry_atom.REVERT_ALL: - # Prepare all flow for revert - self._revert_all() - else: - self._revert_all() - - def _revert_all(self): - for node in self._analyzer.iterate_all_nodes(): - self._storage.set_atom_intention(node.name, st.REVERT) - - def _prepare_flow_for_resume(self): - for node in self._analyzer.iterate_all_nodes(): - if self._analyzer.get_state(node) == st.FAILURE: - self._process_atom_failure(node, self._storage.get(node.name)) - for retry in self._analyzer.iterate_retries(st.RETRYING): - self._retry_subflow(retry) - next_nodes = set() - for node in self._analyzer.iterate_all_nodes(): - if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING): - next_nodes.add(node) - return next_nodes - - def _reset_nodes(self, nodes_iter, intention=st.EXECUTE): - for node in nodes_iter: - if isinstance(node, task_atom.BaseTask): - self._task_action.change_state(node, st.PENDING, progress=0.0) - elif isinstance(node, retry_atom.Retry): - self._retry_action.change_state(node, st.PENDING) - else: - raise TypeError("Unknown how to reset node %s" % node) - self._storage.set_atom_intention(node.name, intention) - - def reset_all(self): - self._reset_nodes(self._analyzer.iterate_all_nodes()) - - def _retry_subflow(self, retry): - self._storage.set_atom_intention(retry.name, st.EXECUTE) - self._reset_nodes(self._analyzer.iterate_subgraph(retry)) diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/runner.py new file mode 100644 index 00000000..dc8c1003 --- /dev/null +++ b/taskflow/engines/action_engine/runner.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow import states as st +from taskflow.utils import misc + + +_WAITING_TIMEOUT = 60 # in seconds + + +class Runner(object): + """Runner that iterates while executing nodes using the given runtime. + + This runner acts as the action engine run loop, it resumes the workflow, + schedules all task it can for execution using the runtimes scheduler and + analyzer components, and than waits on returned futures and then activates + the runtimes completion component to finish up those tasks. + + This process repeats until the analzyer runs out of next nodes, when the + scheduler can no longer schedule tasks or when the the engine has been + suspended or a task has failed and that failure could not be resolved. + + NOTE(harlowja): If the runtimes scheduler component is able to schedule + tasks in parallel, this enables parallel running and/or reversion. + """ + + # Informational states this action yields while running, not useful to + # have the engine record but useful to provide to end-users when doing + # execution iterations. + ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING) + + def __init__(self, runtime, waiter): + self._runtime = runtime + self._scheduler = runtime.scheduler + self._completer = runtime.completer + self._storage = runtime.storage + self._analyzer = runtime.graph_analyzer + self._waiter = waiter + + def is_running(self): + return self._storage.get_flow_state() == st.RUNNING + + def run_iter(self, timeout=None): + """Runs the nodes using the runtime components. + + NOTE(harlowja): the states that this generator will go through are: + + RESUMING -> SCHEDULING + SCHEDULING -> WAITING + WAITING -> ANALYZING + ANALYZING -> SCHEDULING + + Between any of these yielded states if the engine has been suspended + or the engine has failed (due to a non-resolveable task failure or + scheduling failure) the engine will stop executing new tasks (currently + running tasks will be allowed to complete) and this iteration loop + will be broken. + """ + if timeout is None: + timeout = _WAITING_TIMEOUT + + # Prepare flow to be resumed + yield st.RESUMING + next_nodes = self._completer.resume() + next_nodes.update(self._analyzer.get_next_nodes()) + + # Schedule nodes to be worked on + yield st.SCHEDULING + if self.is_running(): + not_done, failures = self._scheduler.schedule(next_nodes) + else: + not_done, failures = (set(), []) + + # Run! + # + # At this point we need to ensure we wait for all active nodes to + # finish running (even if we are asked to suspend) since we can not + # preempt those tasks (maybe in the future we will be better able to do + # this). + while not_done: + yield st.WAITING + + # TODO(harlowja): maybe we should start doing 'yield from' this + # call sometime in the future, or equivalent that will work in + # py2 and py3. + done, not_done = self._waiter.wait_for_any(not_done, timeout) + + # Analyze the results and schedule more nodes (unless we had + # failures). If failures occurred just continue processing what + # is running (so that we don't leave it abandoned) but do not + # schedule anything new. + yield st.ANALYZING + next_nodes = set() + for future in done: + try: + node, event, result = future.result() + retain = self._completer.complete(node, event, result) + if retain and isinstance(result, misc.Failure): + failures.append(result) + except Exception: + failures.append(misc.Failure()) + else: + try: + more_nodes = self._analyzer.get_next_nodes(node) + except Exception: + failures.append(misc.Failure()) + else: + next_nodes.update(more_nodes) + if next_nodes and not failures and self.is_running(): + yield st.SCHEDULING + # Recheck incase someone suspended it. + if self.is_running(): + more_not_done, failures = self._scheduler.schedule( + next_nodes) + not_done.update(more_not_done) + + if failures: + misc.Failure.reraise_if_any(failures) + if self._analyzer.get_next_nodes(): + yield st.SUSPENDED + elif self._analyzer.is_success(): + yield st.SUCCESS + else: + yield st.REVERTED diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py new file mode 100644 index 00000000..709ff78a --- /dev/null +++ b/taskflow/engines/action_engine/runtime.py @@ -0,0 +1,250 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow import exceptions as excp +from taskflow import retry as retry_atom +from taskflow import states as st +from taskflow import task as task_atom +from taskflow.utils import misc + +from taskflow.engines.action_engine import executor as ex +from taskflow.engines.action_engine import graph_analyzer as ga +from taskflow.engines.action_engine import retry_action as ra +from taskflow.engines.action_engine import task_action as ta + + +class Runtime(object): + """An object that contains various utility methods and properties that + represent the collection of runtime components and functionality needed + for an action engine to run to completion. + """ + + def __init__(self, compilation, storage, task_notifier, task_executor): + self._task_notifier = task_notifier + self._task_executor = task_executor + self._storage = storage + self._compilation = compilation + + @property + def compilation(self): + return self._compilation + + @property + def storage(self): + return self._storage + + @misc.cachedproperty + def graph_analyzer(self): + return ga.GraphAnalyzer(self._compilation.execution_graph, + self._storage) + + @misc.cachedproperty + def completer(self): + return Completer(self) + + @misc.cachedproperty + def scheduler(self): + return Scheduler(self) + + @misc.cachedproperty + def retry_action(self): + return ra.RetryAction(self.storage, self._task_notifier) + + @misc.cachedproperty + def task_action(self): + return ta.TaskAction(self.storage, self._task_executor, + self._task_notifier) + + def reset_nodes(self, nodes, state=st.PENDING, intention=st.EXECUTE): + for node in nodes: + if state: + if isinstance(node, task_atom.BaseTask): + self.task_action.change_state(node, state, progress=0.0) + elif isinstance(node, retry_atom.Retry): + self.retry_action.change_state(node, state) + else: + raise TypeError("Unknown how to reset node %s, %s" + % (node, type(node))) + if intention: + self.storage.set_atom_intention(node.name, intention) + + def reset_all(self, state=st.PENDING, intention=st.EXECUTE): + self.reset_nodes(self.graph_analyzer.iterate_all_nodes(), + state=state, intention=intention) + + def reset_subgraph(self, node, state=st.PENDING, intention=st.EXECUTE): + self.reset_nodes(self.graph_analyzer.iterate_subgraph(node), + state=state, intention=intention) + + +# Various helper methods used by completer and scheduler. +def _retry_subflow(retry, runtime): + runtime.storage.set_atom_intention(retry.name, st.EXECUTE) + runtime.reset_subgraph(retry) + + +class Completer(object): + """Completes atoms using actions to complete them.""" + + def __init__(self, runtime): + self._analyzer = runtime.graph_analyzer + self._retry_action = runtime.retry_action + self._runtime = runtime + self._storage = runtime.storage + self._task_action = runtime.task_action + + def _complete_task(self, task, event, result): + """Completes the given task, processes task failure.""" + if event == ex.EXECUTED: + self._task_action.complete_execution(task, result) + else: + self._task_action.complete_reversion(task, result) + + def resume(self): + """Resumes nodes in the contained graph. + + This is done to allow any previously completed or failed nodes to + be analyzed, there results processed and any potential nodes affected + to be adjusted as needed. + + This should return a set of nodes which should be the initial set of + nodes that were previously not finished (due to a RUNNING or REVERTING + attempt not previously finishing). + """ + for node in self._analyzer.iterate_all_nodes(): + if self._analyzer.get_state(node) == st.FAILURE: + self._process_atom_failure(node, self._storage.get(node.name)) + for retry in self._analyzer.iterate_retries(st.RETRYING): + _retry_subflow(retry, self._runtime) + unfinished_nodes = set() + for node in self._analyzer.iterate_all_nodes(): + if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING): + unfinished_nodes.add(node) + return unfinished_nodes + + def complete(self, node, event, result): + """Performs post-execution completion of a node. + + Returns whether the result should be saved into an accumulator of + failures or whether this should not be done. + """ + if isinstance(node, task_atom.BaseTask): + self._complete_task(node, event, result) + if isinstance(result, misc.Failure): + if event == ex.EXECUTED: + self._process_atom_failure(node, result) + else: + return True + return False + + def _process_atom_failure(self, atom, failure): + """On atom failure find its retry controller, ask for the action to + perform with failed subflow and set proper intention for subflow nodes. + """ + retry = self._analyzer.find_atom_retry(atom) + if retry: + # Ask retry controller what to do in case of failure + action = self._retry_action.on_failure(retry, atom, failure) + if action == retry_atom.RETRY: + # Prepare subflow for revert + self._storage.set_atom_intention(retry.name, st.RETRY) + self._runtime.reset_subgraph(retry, state=None, + intention=st.REVERT) + elif action == retry_atom.REVERT: + # Ask parent checkpoint + self._process_atom_failure(retry, failure) + elif action == retry_atom.REVERT_ALL: + # Prepare all flow for revert + self._revert_all() + else: + # Prepare all flow for revert + self._revert_all() + + def _revert_all(self): + """Attempts to set all nodes to the REVERT intention.""" + self._runtime.reset_nodes(self._analyzer.iterate_all_nodes(), + state=None, intention=st.REVERT) + + +class Scheduler(object): + """Schedules atoms using actions to schedule.""" + + def __init__(self, runtime): + self._analyzer = runtime.graph_analyzer + self._retry_action = runtime.retry_action + self._runtime = runtime + self._storage = runtime.storage + self._task_action = runtime.task_action + + def _schedule_node(self, node): + """Schedule a single node for execution.""" + if isinstance(node, task_atom.BaseTask): + return self._schedule_task(node) + elif isinstance(node, retry_atom.Retry): + return self._schedule_retry(node) + else: + raise TypeError("Unknown how to schedule node %s, %s" + % (node, type(node))) + + def _schedule_retry(self, retry): + """Schedules the given retry for revert or execute depending + on its intention. + """ + intention = self._storage.get_atom_intention(retry.name) + if intention == st.EXECUTE: + return self._retry_action.execute(retry) + elif intention == st.REVERT: + return self._retry_action.revert(retry) + elif intention == st.RETRY: + self._retry_action.change_state(retry, st.RETRYING) + _retry_subflow(retry, self._runtime) + return self._retry_action.execute(retry) + else: + raise excp.ExecutionFailure("Unknown how to schedule retry with" + " intention: %s" % intention) + + def _schedule_task(self, task): + """Schedules the given task for revert or execute depending + on its intention. + """ + intention = self._storage.get_atom_intention(task.name) + if intention == st.EXECUTE: + return self._task_action.schedule_execution(task) + elif intention == st.REVERT: + return self._task_action.schedule_reversion(task) + else: + raise excp.ExecutionFailure("Unknown how to schedule task with" + " intention: %s" % intention) + + def schedule(self, nodes): + """Schedules the provided nodes for *future* completion. + + This method should schedule a future for each node provided and return + a set of those futures to be waited on (or used for other similar + purposes). It should also return any failure objects that represented + scheduling failures that may have occurred during this scheduling + process. + """ + futures = set() + for node in nodes: + try: + futures.add(self._schedule_node(node)) + except Exception: + # Immediately stop scheduling future work so that we can + # exit execution early (rather than later) if a single task + # fails to schedule correctly. + return (futures, [misc.Failure()]) + return (futures, [])