Finish factoring apart the graph_action module

Factor out the scheduling, running and completion components
of graph_action so that other types of scheduling, running and
completion strategies can be plugged in.

The newly added components are the following:

- A runtime container class that serves as a holder of some small
  utility functions and of all the other runtime components.
- A runner class that acts as the action engine's run loop.
- A scheduler class that schedules nodes using a provided executor
  and returns futures that can be used to introspect their results as
  they complete.
- A completer class that completes nodes and futures that the
  scheduler started, persisting their results and doing any further
  post-execution analysis.

Part of blueprint plug-engine

Change-Id: I1dbf46654377fc34e9d90eeabf7b0062020bdc5e
Author: Joshua Harlow
Date: 2014-05-19 17:06:54 -07:00
parent 55c4267cc0
commit 7f525de0f9
5 changed files with 455 additions and 300 deletions


@@ -204,15 +204,20 @@ Compiling
---------
During this stage the flow will be converted into an internal graph
representation using a flow :py:func:`~taskflow.utils.flow_utils.flatten`
function. This function converts the flow objects and contained atoms into a
representation using a
:py:class:`~taskflow.engines.action_engine.compiler.Compiler` (the default
implementation for patterns is the
:py:class:`~taskflow.engines.action_engine.compiler.PatternCompiler`). This
class compiles/converts the flow objects and contained atoms into a
`networkx`_ directed graph that contains the equivalent atoms defined in the
flow and any nested flows & atoms as well as the constraints that are created
by the application of the different flow patterns. This graph is then what will
be analyzed & traversed during the engine's execution. At this point a few
helper objects are also created and saved to internal engine variables (these
objects help in the execution of atoms, analyzing the graph and performing other
internal engine activities).
internal engine activities). At the end of this stage a
:py:class:`~taskflow.engines.action_engine.runtime.Runtime` object is created
which contains references to all needed runtime components.
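For illustration only, a small snippet along these lines (it is not part of
this change, and the ``Hello`` task and flow name are made up) shows what
compilation produces when the default pattern compiler is used::

    from taskflow.engines.action_engine import compiler
    from taskflow.patterns import linear_flow
    from taskflow import task

    class Hello(task.Task):
        def execute(self):
            return "hello"

    flow = linear_flow.Flow("demo").add(Hello("a"), Hello("b"))
    compilation = compiler.PatternCompiler().compile(flow)
    # The execution graph is a networkx directed graph of the atoms.
    print(compilation.execution_graph.nodes())
    print(compilation.execution_graph.edges())
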
Preparation
-----------
@@ -231,7 +236,7 @@ Execution
The graph (and helper objects) previously created are now used for guiding
further execution. The flow is put into the ``RUNNING`` :doc:`state <states>`
and a
:py:class:`~taskflow.engines.action_engine.graph_action.FutureGraphAction`
:py:class:`~taskflow.engines.action_engine.runner.Runner` implementation
object starts to take over and begins going through the stages listed
below (for a more visual diagram/representation see
the :ref:`engine state diagram <engine states>`).
@@ -252,35 +257,45 @@ for things like retry atom which can influence what a tasks intention should be
object which was designed to provide helper methods for this analysis). Once
these intentions are determined and associated with each task (the intention is
also stored in the :py:class:`~taskflow.persistence.logbook.AtomDetail` object)
the scheduling stage starts.
the :ref:`scheduling <scheduling>` stage starts.
.. _scheduling:
Scheduling
^^^^^^^^^^
This stage selects which atoms are eligible to run (looking at their intention,
checking if predecessor atoms have run and so on, again using the
This stage selects which atoms are eligible to run by using a
:py:class:`~taskflow.engines.action_engine.runtime.Scheduler` implementation
(the default implementation looks at their intention, checking if predecessor
atoms have run and so on, using a
:py:class:`~taskflow.engines.action_engine.graph_analyzer.GraphAnalyzer` helper
object) and submits those atoms to a previously provided compatible
`executor`_ for asynchronous execution. This executor will return a `future`_
object for each atom submitted; all of which are collected into a list of not
done futures. This will end the initial round of scheduling and at this point
the engine enters the waiting stage.
object as needed) and submits those atoms to a previously provided compatible
`executor`_ for asynchronous execution. This
:py:class:`~taskflow.engines.action_engine.runtime.Scheduler` will return a
`future`_ object for each atom scheduled; all of which are collected into a
list of not done futures. This will end the initial round of scheduling and at
this point the engine enters the :ref:`waiting <waiting>` stage.
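As a rough analogy (this is plain ``concurrent.futures`` usage, not taskflow
code), the scheduling and waiting stages behave much like submitting callables
to an executor and then repeatedly waiting for whichever future finishes
first::

    from concurrent import futures

    def run_atom(name):
        # Stand-in for executing (or reverting) a single atom.
        return "%s finished" % name

    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        not_done = set(executor.submit(run_atom, name)
                       for name in ("a", "b", "c"))
        while not_done:
            done, not_done = futures.wait(
                not_done, return_when=futures.FIRST_COMPLETED)
            for fut in done:
                print(fut.result())
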
.. _waiting:
Waiting
^^^^^^^
In this stage the engine waits for any of the future objects previously
submitted to complete. Once one of the future objects completes (or fails) that
atom's result will be examined and persisted to the persistence backend (saved
atom's result will be examined and finalized using a
:py:class:`~taskflow.engines.action_engine.runtime.Completer` implementation.
It typically will persist results to a provided persistence backend (saved
into the corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail`
object) and the state of the atom is changed. At this point what happens falls
into two categories, one for if that atom failed and one for if it did not. If
the atom failed it may be set to a new intention such as ``RETRY`` or
and :py:class:`~taskflow.persistence.logbook.FlowDetail` objects) and reflect
the new state of the atom. At this point what typically happens falls into two
categories, one for if that atom failed and one for if it did not. If the atom
failed it may be set to a new intention such as ``RETRY`` or
``REVERT`` (other atoms that were predecessors of this failing atom may also
have their intention altered). Once this intention adjustment has happened a
new round of scheduling occurs and this process repeats until the engine
succeeds or fails (if the process running the engine dies the above stages will
be restarted and resuming will occur).
new round of :ref:`scheduling <scheduling>` occurs and this process repeats
until the engine succeeds or fails (if the process running the engine dies the
above stages will be restarted and resuming will occur).
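Putting these stages together, a minimal end-to-end run (a hypothetical
example, not part of this change, using the typical ``taskflow.engines.load``
helper and a made-up ``Hello`` task) looks like the following; when the loop
described above finishes, the flow state reflects the outcome::

    import taskflow.engines
    from taskflow.patterns import linear_flow
    from taskflow import states
    from taskflow import task

    class Hello(task.Task):
        def execute(self):
            return "hello"

    flow = linear_flow.Flow("demo").add(Hello("a"), Hello("b"))
    engine = taskflow.engines.load(flow)
    engine.run()
    print(engine.storage.get_flow_state() == states.SUCCESS)
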
.. note::
@@ -293,7 +308,7 @@ Finishing
---------
At this point the
:py:class:`~taskflow.engines.action_engine.graph_action.FutureGraphAction` has
:py:class:`~taskflow.engines.action_engine.runner.Runner` has
now finished successfully, failed, or the execution was suspended. Depending on
which one of these occurs will cause the flow to enter a new state (typically
one of ``FAILURE``, ``SUSPENDED``, ``SUCCESS`` or ``REVERTED``).
@@ -307,10 +322,12 @@ saved for this execution.
Interfaces
==========
.. automodule:: taskflow.engines.base
.. automodule:: taskflow.engines.action_engine.compiler
.. automodule:: taskflow.engines.action_engine.engine
.. automodule:: taskflow.engines.action_engine.graph_action
.. automodule:: taskflow.engines.action_engine.graph_analyzer
.. automodule:: taskflow.engines.action_engine.runner
.. automodule:: taskflow.engines.action_engine.runtime
.. automodule:: taskflow.engines.base
Hierarchy
=========


@@ -18,10 +18,8 @@ import threading
from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import executor
from taskflow.engines.action_engine import graph_action
from taskflow.engines.action_engine import graph_analyzer
from taskflow.engines.action_engine import retry_action
from taskflow.engines.action_engine import task_action
from taskflow.engines.action_engine import runner
from taskflow.engines.action_engine import runtime
from taskflow.engines import base
from taskflow import exceptions as exc
@@ -38,28 +36,27 @@ from taskflow.utils import reflection
class ActionEngine(base.EngineBase):
    """Generic action-based engine.

    This engine flattens the flow (and any subflows) into an execution graph
    This engine compiles the flow (and any subflows) into a compilation unit
    which contains the full runtime definition to be executed and then uses
    this graph in combination with the action classes & storage to attempt to
    run your flow (and any subflows & contained tasks) to completion.
    this compilation unit in combination with the executor, runtime, runner
    and storage classes to attempt to run your flow (and any subflows &
    contained atoms) to completion.

    During this process it is permissible and valid to have a task or multiple
    tasks in the execution graph fail, which will cause the process of
    reversion to commence. See the valid states in the states module to learn
    more about what other states the tasks & flow being run can go through.
    NOTE(harlowja): during this process it is permissible and valid to have a
    task or multiple tasks in the execution graph fail (at the same time even),
    which will cause the process of reversion or retrying to commence. See the
    valid states in the states module to learn more about what other states
    the tasks and flow being run can go through.
    """

    _graph_action_factory = graph_action.FutureGraphAction
    _graph_analyzer_factory = graph_analyzer.GraphAnalyzer
    _task_action_factory = task_action.TaskAction
    _task_executor_factory = executor.SerialTaskExecutor
    _retry_action_factory = retry_action.RetryAction
    _compiler_factory = compiler.PatternCompiler
    _task_executor_factory = executor.SerialTaskExecutor

    def __init__(self, flow, flow_detail, backend, conf):
        super(ActionEngine, self).__init__(flow, flow_detail, backend, conf)
        self._analyzer = None
        self._root = None
        self._runner = None
        self._runtime = None
        self._compiled = False
        self._compilation = None
        self._lock = threading.RLock()
        self._state_lock = threading.RLock()
        self._storage_ensured = False
@@ -80,8 +77,8 @@ class ActionEngine(base.EngineBase):
        NOTE(harlowja): Only accessible after compilation has completed.
        """
        g = None
        if self._compiled and self._analyzer:
            g = self._analyzer.execution_graph
        if self._compiled:
            g = self._compilation.execution_graph
        return g

    def run(self):
@@ -119,7 +116,7 @@ class ActionEngine(base.EngineBase):
        state = None
        try:
            self._change_state(states.RUNNING)
            for state in self._root.execute_iter(timeout=timeout):
            for state in self._runner.run_iter(timeout=timeout):
                try:
                    try_suspend = yield state
                except GeneratorExit:
@@ -131,7 +128,7 @@ class ActionEngine(base.EngineBase):
            with excutils.save_and_reraise_exception():
                self._change_state(states.FAILURE)
        else:
            ignorable_states = getattr(self._root, 'ignorable_states', [])
            ignorable_states = getattr(self._runner, 'ignorable_states', [])
            if state and state not in ignorable_states:
                self._change_state(state)
                if state != states.SUSPENDED and state != states.SUCCESS:
@@ -162,12 +159,12 @@ class ActionEngine(base.EngineBase):
                       old_state=old_state)
        self.notifier.notify(state, details)

    def _ensure_storage_for(self, execution_graph):
    def _ensure_storage(self):
        # NOTE(harlowja): signal to the tasks that exist that we are about to
        # resume, if they have a previous state, they will now transition to
        # a resuming state (and then to suspended).
        self._change_state(states.RESUMING)  # does nothing in PENDING state
        for node in execution_graph.nodes_iter():
        for node in self._compilation.execution_graph.nodes_iter():
            version = misc.get_version_string(node)
            if isinstance(node, retry.Retry):
                self.storage.ensure_retry(node.name, version, node.save_as)
@@ -175,7 +172,6 @@ class ActionEngine(base.EngineBase):
                self.storage.ensure_task(node.name, version, node.save_as)
            if node.inject:
                self.storage.inject_task_args(node.name, node.inject)
        self._change_state(states.SUSPENDED)  # does nothing in PENDING state

    @lock_utils.locked
@@ -184,7 +180,7 @@ class ActionEngine(base.EngineBase):
            raise exc.InvalidState("Can not prepare an engine"
                                   " which has not been compiled")
        if not self._storage_ensured:
            self._ensure_storage_for(self.execution_graph)
            self._ensure_storage()
            self._storage_ensured = True
        # At this point we can check to ensure all dependencies are either
        # flow/task provided or storage provided, if there are still missing
@@ -196,22 +192,13 @@ class ActionEngine(base.EngineBase):
            raise exc.MissingDependencies(self._flow, sorted(missing))
        # Reset everything back to pending (if we were previously reverted).
        if self.storage.get_flow_state() == states.REVERTED:
            self._root.reset_all()
            self._runtime.reset_all()
            self._change_state(states.PENDING)

    @misc.cachedproperty
    def _retry_action(self):
        return self._retry_action_factory(self.storage, self.task_notifier)

    @misc.cachedproperty
    def _task_executor(self):
        return self._task_executor_factory()

    @misc.cachedproperty
    def _task_action(self):
        return self._task_action_factory(self.storage, self._task_executor,
                                         self.task_notifier)

    @misc.cachedproperty
    def _compiler(self):
        return self._compiler_factory()
@@ -220,16 +207,13 @@ class ActionEngine(base.EngineBase):
    def compile(self):
        if self._compiled:
            return
        compilation = self._compiler.compile(self._flow)
        if self._analyzer is None:
            self._analyzer = self._graph_analyzer_factory(
                compilation.execution_graph, self.storage)
        self._root = self._graph_action_factory(self._analyzer,
                                                self.storage,
                                                self._task_action,
                                                self._retry_action)
        self._compilation = self._compiler.compile(self._flow)
        self._runtime = runtime.Runtime(self._compilation,
                                        self.storage,
                                        self.task_notifier,
                                        self._task_executor)
        self._runner = runner.Runner(self._runtime, self._task_executor)
        self._compiled = True
        return


class SingleThreadedActionEngine(ActionEngine):


@@ -1,233 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.engines.action_engine import executor as ex
from taskflow import exceptions as excp
from taskflow import retry as retry_atom
from taskflow import states as st
from taskflow import task as task_atom
from taskflow.utils import misc
_WAITING_TIMEOUT = 60 # in seconds
class FutureGraphAction(object):
    """Graph action built around futures returned by the task action.

    This graph action schedules all the tasks it can for execution and then
    waits on the returned futures. If the task executor is able to execute
    tasks in parallel, this enables parallel flow running and reversion.
    """

    # Informational states this action yields while running, not useful to
    # have the engine record but useful to provide to end-users when doing
    # execution iterations.
    ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)

    def __init__(self, analyzer, storage, task_action, retry_action):
        self._analyzer = analyzer
        self._storage = storage
        self._task_action = task_action
        self._retry_action = retry_action

    def is_running(self):
        return self._storage.get_flow_state() == st.RUNNING

    def _schedule_node(self, node):
        """Schedule a single node for execution."""
        if isinstance(node, task_atom.BaseTask):
            return self._schedule_task(node)
        elif isinstance(node, retry_atom.Retry):
            return self._schedule_retry(node)
        else:
            raise TypeError("Unknown how to schedule node %s" % node)

    def _schedule(self, nodes):
        """Schedule a group of nodes for execution."""
        futures = set()
        for node in nodes:
            try:
                futures.add(self._schedule_node(node))
            except Exception:
                # Immediately stop scheduling future work so that we can
                # exit execution early (rather than later) if a single task
                # fails to schedule correctly.
                return (futures, [misc.Failure()])
        return (futures, [])

    def execute_iter(self, timeout=None):
        if timeout is None:
            timeout = _WAITING_TIMEOUT
        # Prepare flow to be resumed
        yield st.RESUMING
        next_nodes = self._prepare_flow_for_resume()
        next_nodes.update(self._analyzer.get_next_nodes())
        # Schedule nodes to be worked on
        yield st.SCHEDULING
        if self.is_running():
            not_done, failures = self._schedule(next_nodes)
        else:
            not_done, failures = (set(), [])
        # Run!
        #
        # At this point we need to ensure we wait for all active nodes to
        # finish running (even if we are asked to suspend) since we can not
        # preempt those tasks (maybe in the future we will be better able to
        # do this).
        while not_done:
            yield st.WAITING
            # TODO(harlowja): maybe we should start doing 'yield from' this
            # call sometime in the future, or equivalent that will work in
            # py2 and py3.
            done, not_done = self._task_action.wait_for_any(not_done, timeout)
            # Analyze the results and schedule more nodes (unless we had
            # failures). If failures occurred just continue processing what
            # is running (so that we don't leave it abandoned) but do not
            # schedule anything new.
            yield st.ANALYZING
            next_nodes = set()
            for future in done:
                try:
                    node, event, result = future.result()
                    if isinstance(node, task_atom.BaseTask):
                        self._complete_task(node, event, result)
                    if isinstance(result, misc.Failure):
                        if event == ex.EXECUTED:
                            self._process_atom_failure(node, result)
                        else:
                            failures.append(result)
                except Exception:
                    failures.append(misc.Failure())
                else:
                    try:
                        more_nodes = self._analyzer.get_next_nodes(node)
                    except Exception:
                        failures.append(misc.Failure())
                    else:
                        next_nodes.update(more_nodes)
            if next_nodes and not failures and self.is_running():
                yield st.SCHEDULING
                # Recheck in case someone suspended it.
                if self.is_running():
                    more_not_done, failures = self._schedule(next_nodes)
                    not_done.update(more_not_done)
        if failures:
            misc.Failure.reraise_if_any(failures)
        if self._analyzer.get_next_nodes():
            yield st.SUSPENDED
        elif self._analyzer.is_success():
            yield st.SUCCESS
        else:
            yield st.REVERTED

    def _schedule_task(self, task):
        """Schedules the given task for revert or execute depending
        on its intention.
        """
        intention = self._storage.get_atom_intention(task.name)
        if intention == st.EXECUTE:
            return self._task_action.schedule_execution(task)
        elif intention == st.REVERT:
            return self._task_action.schedule_reversion(task)
        else:
            raise excp.ExecutionFailure("Unknown how to schedule task with"
                                        " intention: %s" % intention)

    def _complete_task(self, task, event, result):
        """Completes the given task, processes task failure."""
        if event == ex.EXECUTED:
            self._task_action.complete_execution(task, result)
        else:
            self._task_action.complete_reversion(task, result)

    def _schedule_retry(self, retry):
        """Schedules the given retry for revert or execute depending
        on its intention.
        """
        intention = self._storage.get_atom_intention(retry.name)
        if intention == st.EXECUTE:
            return self._retry_action.execute(retry)
        elif intention == st.REVERT:
            return self._retry_action.revert(retry)
        elif intention == st.RETRY:
            self._retry_action.change_state(retry, st.RETRYING)
            self._retry_subflow(retry)
            return self._retry_action.execute(retry)
        else:
            raise excp.ExecutionFailure("Unknown how to schedule retry with"
                                        " intention: %s" % intention)

    def _process_atom_failure(self, atom, failure):
        """On atom failure find its retry controller, ask for the action to
        perform with failed subflow and set proper intention for subflow nodes.
        """
        retry = self._analyzer.find_atom_retry(atom)
        if retry:
            # Ask retry controller what to do in case of failure
            action = self._retry_action.on_failure(retry, atom, failure)
            if action == retry_atom.RETRY:
                # Prepare subflow for revert
                self._storage.set_atom_intention(retry.name, st.RETRY)
                for node in self._analyzer.iterate_subgraph(retry):
                    self._storage.set_atom_intention(node.name, st.REVERT)
            elif action == retry_atom.REVERT:
                # Ask parent checkpoint
                self._process_atom_failure(retry, failure)
            elif action == retry_atom.REVERT_ALL:
                # Prepare all flow for revert
                self._revert_all()
        else:
            self._revert_all()

    def _revert_all(self):
        for node in self._analyzer.iterate_all_nodes():
            self._storage.set_atom_intention(node.name, st.REVERT)

    def _prepare_flow_for_resume(self):
        for node in self._analyzer.iterate_all_nodes():
            if self._analyzer.get_state(node) == st.FAILURE:
                self._process_atom_failure(node, self._storage.get(node.name))
        for retry in self._analyzer.iterate_retries(st.RETRYING):
            self._retry_subflow(retry)
        next_nodes = set()
        for node in self._analyzer.iterate_all_nodes():
            if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING):
                next_nodes.add(node)
        return next_nodes

    def _reset_nodes(self, nodes_iter, intention=st.EXECUTE):
        for node in nodes_iter:
            if isinstance(node, task_atom.BaseTask):
                self._task_action.change_state(node, st.PENDING, progress=0.0)
            elif isinstance(node, retry_atom.Retry):
                self._retry_action.change_state(node, st.PENDING)
            else:
                raise TypeError("Unknown how to reset node %s" % node)
            self._storage.set_atom_intention(node.name, intention)

    def reset_all(self):
        self._reset_nodes(self._analyzer.iterate_all_nodes())

    def _retry_subflow(self, retry):
        self._storage.set_atom_intention(retry.name, st.EXECUTE)
        self._reset_nodes(self._analyzer.iterate_subgraph(retry))


@@ -0,0 +1,137 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import states as st
from taskflow.utils import misc
_WAITING_TIMEOUT = 60 # in seconds
class Runner(object):
    """Runner that iterates while executing nodes using the given runtime.

    This runner acts as the action engine's run loop: it resumes the
    workflow, schedules all the tasks it can for execution using the
    runtime's scheduler and analyzer components, then waits on the returned
    futures and activates the runtime's completion component to finish up
    those tasks.

    This process repeats until the analyzer runs out of next nodes, the
    scheduler can no longer schedule tasks, or the engine has been suspended
    or a task has failed and that failure could not be resolved.

    NOTE(harlowja): If the runtime's scheduler component is able to schedule
    tasks in parallel, this enables parallel running and/or reversion.
    """

    # Informational states this action yields while running, not useful to
    # have the engine record but useful to provide to end-users when doing
    # execution iterations.
    ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)

    def __init__(self, runtime, waiter):
        self._runtime = runtime
        self._scheduler = runtime.scheduler
        self._completer = runtime.completer
        self._storage = runtime.storage
        self._analyzer = runtime.graph_analyzer
        self._waiter = waiter

    def is_running(self):
        return self._storage.get_flow_state() == st.RUNNING

    def run_iter(self, timeout=None):
        """Runs the nodes using the runtime components.

        NOTE(harlowja): the states that this generator will go through are:

        RESUMING -> SCHEDULING
        SCHEDULING -> WAITING
        WAITING -> ANALYZING
        ANALYZING -> SCHEDULING

        Between any of these yielded states, if the engine has been suspended
        or the engine has failed (due to a non-resolvable task failure or
        scheduling failure) the engine will stop executing new tasks
        (currently running tasks will be allowed to complete) and this
        iteration loop will be broken.
        """
        if timeout is None:
            timeout = _WAITING_TIMEOUT
        # Prepare flow to be resumed
        yield st.RESUMING
        next_nodes = self._completer.resume()
        next_nodes.update(self._analyzer.get_next_nodes())
        # Schedule nodes to be worked on
        yield st.SCHEDULING
        if self.is_running():
            not_done, failures = self._scheduler.schedule(next_nodes)
        else:
            not_done, failures = (set(), [])
        # Run!
        #
        # At this point we need to ensure we wait for all active nodes to
        # finish running (even if we are asked to suspend) since we can not
        # preempt those tasks (maybe in the future we will be better able to
        # do this).
        while not_done:
            yield st.WAITING
            # TODO(harlowja): maybe we should start doing 'yield from' this
            # call sometime in the future, or equivalent that will work in
            # py2 and py3.
            done, not_done = self._waiter.wait_for_any(not_done, timeout)
            # Analyze the results and schedule more nodes (unless we had
            # failures). If failures occurred just continue processing what
            # is running (so that we don't leave it abandoned) but do not
            # schedule anything new.
            yield st.ANALYZING
            next_nodes = set()
            for future in done:
                try:
                    node, event, result = future.result()
                    retain = self._completer.complete(node, event, result)
                    if retain and isinstance(result, misc.Failure):
                        failures.append(result)
                except Exception:
                    failures.append(misc.Failure())
                else:
                    try:
                        more_nodes = self._analyzer.get_next_nodes(node)
                    except Exception:
                        failures.append(misc.Failure())
                    else:
                        next_nodes.update(more_nodes)
            if next_nodes and not failures and self.is_running():
                yield st.SCHEDULING
                # Recheck in case someone suspended it.
                if self.is_running():
                    more_not_done, failures = self._scheduler.schedule(
                        next_nodes)
                    not_done.update(more_not_done)
        if failures:
            misc.Failure.reraise_if_any(failures)
        if self._analyzer.get_next_nodes():
            yield st.SUSPENDED
        elif self._analyzer.is_success():
            yield st.SUCCESS
        else:
            yield st.REVERTED


@@ -0,0 +1,250 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import exceptions as excp
from taskflow import retry as retry_atom
from taskflow import states as st
from taskflow import task as task_atom
from taskflow.utils import misc
from taskflow.engines.action_engine import executor as ex
from taskflow.engines.action_engine import graph_analyzer as ga
from taskflow.engines.action_engine import retry_action as ra
from taskflow.engines.action_engine import task_action as ta
class Runtime(object):
    """An object that contains various utility methods and properties that
    represent the collection of runtime components and functionality needed
    for an action engine to run to completion.
    """

    def __init__(self, compilation, storage, task_notifier, task_executor):
        self._task_notifier = task_notifier
        self._task_executor = task_executor
        self._storage = storage
        self._compilation = compilation

    @property
    def compilation(self):
        return self._compilation

    @property
    def storage(self):
        return self._storage

    @misc.cachedproperty
    def graph_analyzer(self):
        return ga.GraphAnalyzer(self._compilation.execution_graph,
                                self._storage)

    @misc.cachedproperty
    def completer(self):
        return Completer(self)

    @misc.cachedproperty
    def scheduler(self):
        return Scheduler(self)

    @misc.cachedproperty
    def retry_action(self):
        return ra.RetryAction(self.storage, self._task_notifier)

    @misc.cachedproperty
    def task_action(self):
        return ta.TaskAction(self.storage, self._task_executor,
                             self._task_notifier)

    def reset_nodes(self, nodes, state=st.PENDING, intention=st.EXECUTE):
        for node in nodes:
            if state:
                if isinstance(node, task_atom.BaseTask):
                    self.task_action.change_state(node, state, progress=0.0)
                elif isinstance(node, retry_atom.Retry):
                    self.retry_action.change_state(node, state)
                else:
                    raise TypeError("Unknown how to reset node %s, %s"
                                    % (node, type(node)))
            if intention:
                self.storage.set_atom_intention(node.name, intention)

    def reset_all(self, state=st.PENDING, intention=st.EXECUTE):
        self.reset_nodes(self.graph_analyzer.iterate_all_nodes(),
                         state=state, intention=intention)

    def reset_subgraph(self, node, state=st.PENDING, intention=st.EXECUTE):
        self.reset_nodes(self.graph_analyzer.iterate_subgraph(node),
                         state=state, intention=intention)


# Various helper methods used by the completer and scheduler.

def _retry_subflow(retry, runtime):
    runtime.storage.set_atom_intention(retry.name, st.EXECUTE)
    runtime.reset_subgraph(retry)


class Completer(object):
    """Completes atoms using actions to complete them."""

    def __init__(self, runtime):
        self._analyzer = runtime.graph_analyzer
        self._retry_action = runtime.retry_action
        self._runtime = runtime
        self._storage = runtime.storage
        self._task_action = runtime.task_action

    def _complete_task(self, task, event, result):
        """Completes the given task, processes task failure."""
        if event == ex.EXECUTED:
            self._task_action.complete_execution(task, result)
        else:
            self._task_action.complete_reversion(task, result)

    def resume(self):
        """Resumes nodes in the contained graph.

        This is done to allow any previously completed or failed nodes to
        be analyzed, their results processed and any potentially affected
        nodes to be adjusted as needed.

        This should return a set of nodes which should be the initial set of
        nodes that were previously not finished (due to a RUNNING or
        REVERTING attempt not previously finishing).
        """
        for node in self._analyzer.iterate_all_nodes():
            if self._analyzer.get_state(node) == st.FAILURE:
                self._process_atom_failure(node, self._storage.get(node.name))
        for retry in self._analyzer.iterate_retries(st.RETRYING):
            _retry_subflow(retry, self._runtime)
        unfinished_nodes = set()
        for node in self._analyzer.iterate_all_nodes():
            if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING):
                unfinished_nodes.add(node)
        return unfinished_nodes

    def complete(self, node, event, result):
        """Performs post-execution completion of a node.

        Returns whether the result should be saved into an accumulator of
        failures or whether this should not be done.
        """
        if isinstance(node, task_atom.BaseTask):
            self._complete_task(node, event, result)
        if isinstance(result, misc.Failure):
            if event == ex.EXECUTED:
                self._process_atom_failure(node, result)
            else:
                return True
        return False

    def _process_atom_failure(self, atom, failure):
        """On atom failure find its retry controller, ask for the action to
        perform with failed subflow and set proper intention for subflow nodes.
        """
        retry = self._analyzer.find_atom_retry(atom)
        if retry:
            # Ask retry controller what to do in case of failure
            action = self._retry_action.on_failure(retry, atom, failure)
            if action == retry_atom.RETRY:
                # Prepare subflow for revert
                self._storage.set_atom_intention(retry.name, st.RETRY)
                self._runtime.reset_subgraph(retry, state=None,
                                             intention=st.REVERT)
            elif action == retry_atom.REVERT:
                # Ask parent checkpoint
                self._process_atom_failure(retry, failure)
            elif action == retry_atom.REVERT_ALL:
                # Prepare all flow for revert
                self._revert_all()
        else:
            # Prepare all flow for revert
            self._revert_all()

    def _revert_all(self):
        """Attempts to set all nodes to the REVERT intention."""
        self._runtime.reset_nodes(self._analyzer.iterate_all_nodes(),
                                  state=None, intention=st.REVERT)


class Scheduler(object):
    """Schedules atoms using actions to schedule."""

    def __init__(self, runtime):
        self._analyzer = runtime.graph_analyzer
        self._retry_action = runtime.retry_action
        self._runtime = runtime
        self._storage = runtime.storage
        self._task_action = runtime.task_action

    def _schedule_node(self, node):
        """Schedule a single node for execution."""
        if isinstance(node, task_atom.BaseTask):
            return self._schedule_task(node)
        elif isinstance(node, retry_atom.Retry):
            return self._schedule_retry(node)
        else:
            raise TypeError("Unknown how to schedule node %s, %s"
                            % (node, type(node)))

    def _schedule_retry(self, retry):
        """Schedules the given retry for revert or execute depending
        on its intention.
        """
        intention = self._storage.get_atom_intention(retry.name)
        if intention == st.EXECUTE:
            return self._retry_action.execute(retry)
        elif intention == st.REVERT:
            return self._retry_action.revert(retry)
        elif intention == st.RETRY:
            self._retry_action.change_state(retry, st.RETRYING)
            _retry_subflow(retry, self._runtime)
            return self._retry_action.execute(retry)
        else:
            raise excp.ExecutionFailure("Unknown how to schedule retry with"
                                        " intention: %s" % intention)

    def _schedule_task(self, task):
        """Schedules the given task for revert or execute depending
        on its intention.
        """
        intention = self._storage.get_atom_intention(task.name)
        if intention == st.EXECUTE:
            return self._task_action.schedule_execution(task)
        elif intention == st.REVERT:
            return self._task_action.schedule_reversion(task)
        else:
            raise excp.ExecutionFailure("Unknown how to schedule task with"
                                        " intention: %s" % intention)

    def schedule(self, nodes):
        """Schedules the provided nodes for *future* completion.

        This method should schedule a future for each node provided and
        return a set of those futures to be waited on (or used for other
        similar purposes). It should also return any failure objects that
        represented scheduling failures that may have occurred during this
        scheduling process.
        """
        futures = set()
        for node in nodes:
            try:
                futures.add(self._schedule_node(node))
            except Exception:
                # Immediately stop scheduling future work so that we can
                # exit execution early (rather than later) if a single task
                # fails to schedule correctly.
                return (futures, [misc.Failure()])
        return (futures, [])