Merge "Check documentation for simple style requirements"

This commit is contained in:
Jenkins
2014-05-08 18:14:43 +00:00
committed by Gerrit Code Review
13 changed files with 570 additions and 362 deletions


@@ -11,10 +11,10 @@ Atom Arguments and Results
In taskflow, all flow and task state goes to (potentially persistent) storage.
That includes all the information that :doc:`atoms <atoms>` (e.g. tasks) in the
flow need when they are executed, and all the information a task produces (via
serializable task results). A developer who implements tasks or flows can
specify what arguments a task accepts and what result it returns in several
ways. This document will help you understand what those ways are and how to
use them to accomplish your desired usage pattern.
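
As a quick taste of the ways described below, a task can declare its inputs
implicitly via its |task.execute| method signature and name its result via the
``provides`` constructor parameter (a minimal sketch; the class, argument and
result names here are hypothetical, not taken from the sections below)::

    from taskflow import task

    class MakeBreakfast(task.Task):

        # Arguments are inferred from the execute() signature ('spam' and
        # 'eggs' become required inputs of this task).
        def execute(self, spam, eggs):
            # The return value is stored under the name given by 'provides'
            # at construction time.
            return spam + eggs

    # 'provides' names the result; 'rebind' remaps the flow-level name
    # 'bacon' onto the execute() argument 'spam'.
    breakfast = MakeBreakfast(provides='meal', rebind={'spam': 'bacon'})
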
.. glossary::
@@ -191,11 +191,11 @@ Results Specification
=====================
In Python, function results are not named, so we cannot infer what a task
returns. This is important since the complete task result (what the
|task.execute| method returns) is saved in (potentially persistent) storage,
and it is typically (but not always) desirable to make those results
accessible to other tasks. To accomplish this, the task specifies names of
those values via its ``provides`` task constructor parameter or other method
(see below).
Returning One Value
-------------------
@@ -267,7 +267,8 @@ Another option is to return several values as a dictionary (aka a ``dict``).
            'pieces': 'PIECEs'
        }
TaskFlow expects that a dict will be returned if the ``provides`` argument is
a ``set``:
::
@@ -314,15 +315,15 @@ Of course, the flow author can override this to change names if needed:
    BitsAndPiecesTask(provides=('b', 'p'))
or to change the structure -- e.g. this instance will make the whole tuple
accessible to other tasks by the name ``'bnp'``:
::
    BitsAndPiecesTask(provides='bnp')
or the flow author may want to return to the default behavior and hide the
results of the task from other tasks in the flow (e.g. to avoid naming
conflicts):
::
@@ -339,7 +340,8 @@ For ``result`` value, two cases are possible:
* if the task is being reverted because it failed (an exception was raised
  from its |task.execute| method), the ``result`` value is an instance of a
  :py:class:`taskflow.utils.misc.Failure` object that holds the exception
  information;
* if the task is being reverted because some other task failed, and this task
  finished successfully, the ``result`` value is the task result fetched from
  storage:
@@ -360,7 +362,8 @@ To determine if task failed you can check whether ``result`` is instance of
    def revert(self, result, spam, eggs):
        if isinstance(result, misc.Failure):
            print("This task failed, exception: %s"
                  % result.exception_str)
        else:
            print("do_something returned %r" % result)
@@ -372,9 +375,10 @@ representation of result.
Retry Arguments
===============
A Retry controller works with arguments in the same way as a Task, but it has
an additional ``history`` parameter that is a list of tuples. Each tuple
contains the result of a previous Retry run and a table that maps each failed
task to its :py:class:`taskflow.utils.misc.Failure`.
Consider the following Retry::
@@ -393,15 +397,18 @@ Consider the following Retry::
    def revert(self, history, *args, **kwargs):
        print(history)
Imagine that this Retry had returned a value ``'5'`` and then some task
``'A'`` failed with some exception. In this case the ``on_failure`` method
will receive the following history::
    [('5', {'A': misc.Failure()})]
Then the |retry.execute| method will be called again and it will receive the
same history.
If the |retry.execute| method raises an exception, the |retry.revert| method
of the Retry will be called and a :py:class:`taskflow.utils.misc.Failure`
object will be present in the history instead of the Retry result::
    [('5', {'A': misc.Failure()}), (misc.Failure(), {})]
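
As one possible way to use this history, a Retry controller might stop
retrying after a fixed number of attempts (a hedged sketch; the ``retry.Retry``
base class and the ``RETRY``/``REVERT`` strategy constants are assumed from
the retry module of this era, and the class name and limit are illustrative)::

    from taskflow import retry

    class LimitedRetry(retry.Retry):

        def on_failure(self, history, *args, **kwargs):
            # Each failure adds a tuple to the history, so its length
            # approximates the number of attempts made so far.
            if len(history) < 3:
                return retry.RETRY
            return retry.REVERT

        def execute(self, history, *args, **kwargs):
            # This value becomes the first element of the next history tuple.
            return 'attempt-%s' % len(history)

        def revert(self, history, *args, **kwargs):
            print(history)
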


@@ -7,8 +7,8 @@ Overview
Engines are what **really** runs your atoms.
An *engine* takes a flow structure (described by :doc:`patterns <patterns>`)
and uses it to decide which :doc:`atom <atoms>` to run and when.
TaskFlow provides different implementations of engines. Some may be easier to
use (i.e., require no additional infrastructure setup) and understand; others
@@ -18,10 +18,10 @@ select an engine that suites their setup best without modifying the code of
said service.
Engines usually have different capabilities and configuration, but all of them
**must** implement the same interface and preserve the semantics of patterns
(e.g. parts of :py:class:`linear flow <taskflow.patterns.linear_flow.Flow>`
are run one after another, in order, even if the engine is *capable* of
running tasks in parallel).
Why they exist
--------------
@@ -31,66 +31,71 @@ likely a new concept for many programmers so let's describe how it operates in
more depth and some of the reasoning behind why it exists. This will hopefully
make their value to the TaskFlow library user more clear.
First, though, let us discuss something most are already familiar with: the
difference between `declarative`_ and `imperative`_ programming models. The
imperative model involves establishing statements that accomplish a program's
action (likely using conditionals and other such language features to do
this). This kind of program embeds the *how* to accomplish a goal while also
defining *what* the goal actually is (and the state of this is maintained in
memory or on the stack while these statements execute). In contrast there is
the declarative model, which instead of combining the *how* to accomplish a
goal alongside the *what* is to be accomplished, splits these two into only
declaring what the intended goal is and not the *how*. In TaskFlow
terminology the *what* is the structure of your flows and the tasks and other
atoms you have inside those flows, but the *how* is not defined (the line
becomes blurred since tasks themselves contain imperative code, but for now
consider a task as more of a *pure* function that executes, reverts and may
require inputs and provide outputs). This is where engines get involved; they
do the execution of the *what* defined via :doc:`atoms <atoms>`, tasks, flows
and the relationships defined therein and execute these in a well-defined
manner (and the engine is responsible for *most* of the state manipulation
instead).
This mix of imperative and declarative (with a stronger emphasis on the
declarative model) allows for the following functionality to be possible:
* Enhancing reliability: Decoupling of state alterations from what should be
  accomplished allows for a *natural* way of resuming by allowing the engine
  to track the current state and know which point a flow is at and how to get
  back into that state when resumption occurs.
* Enhancing scalability: When an engine is responsible for executing your
  desired work it becomes possible to alter the *how* in the future by
  creating new types of execution backends (for example the worker model which
  does not execute locally). Without the decoupling of the *what* and the
  *how* it is not possible to provide such a feature (since by the very nature
  of that coupling this kind of functionality is inherently hard to provide).
* Enhancing consistency: Since the engine is responsible for executing atoms
  and the associated workflow, it can be one of the primary entities (if not
  the only one) working to keep the execution model in a consistent state.
  Coupled with atoms which *should* be immutable and have limited (if any)
  internal state, the ability to reason about and obtain consistency can be
  vastly improved.
* With future features around locking (using `tooz`_ to help) engines can
  also help ensure that resources being accessed by tasks are reliably
  obtained and mutated. This will help ensure that other processes, threads,
  or other types of entities are not also executing tasks that manipulate
  those same resources (further increasing consistency).
Of course these kinds of features can come with some drawbacks:
* The downside of decoupling the *how* and the *what* is that the imperative
  model where functions control & manipulate state must start to be shifted
  away from (and this is likely a mindset change for programmers used to the
  imperative model). We have worked to make this less of a concern by creating
  and encouraging the usage of :doc:`persistence <persistence>`, to help
  provide some level of state transfer mechanism.
* Depending on how much imperative code exists (and state inside that code)
  there can be *significant* rework needed to convert or refactor that code to
  these new concepts. We have tried to help here by allowing you to have tasks
  that internally use regular Python code (and internally can be written in an
  imperative style) as well as by providing examples and these developer docs;
  helping make this process as seamless as possible.
* Another one of the downsides of decoupling the *what* from the *how* is that
  it may become harder to use traditional techniques to debug failures
  (especially if remote workers are involved). We try to help here by making
  it easy to track, monitor and introspect the actions & state changes that
  are occurring inside an engine (see :doc:`notifications <notifications>` for
  how to use some of these capabilities).
.. _declarative: http://en.wikipedia.org/wiki/Declarative_programming
.. _imperative: http://en.wikipedia.org/wiki/Imperative_programming
@@ -112,7 +117,8 @@ might look like::
    ...
    flow = make_flow()
    engine = engines.load(flow, engine_conf=my_conf,
                          backend=my_persistence_conf)
    engine.run()
@@ -153,10 +159,10 @@ Parallel engine schedules tasks onto different threads to run them in parallel.
Additional supported keyword arguments:
* ``executor``: an object that implements a :pep:`3148` compatible `executor`_
  interface; it will be used for scheduling tasks. You can use instances of a
  `thread pool executor`_ or a :py:class:`green executor
  <taskflow.utils.eventlet_utils.GreenExecutor>` (which internally uses
  `eventlet <http://eventlet.net/>`_ and greenthread pools).
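
For instance, a parallel engine might be loaded with an explicit thread pool
along these lines (a sketch only; it assumes ``flow`` is a previously built
flow and that the ``engine_conf`` dictionary style shown earlier accepts the
``executor`` key, which should be verified against the version in use)::

    from concurrent import futures

    from taskflow import engines

    # Run up to four tasks at a time on separate threads.
    executor = futures.ThreadPoolExecutor(max_workers=4)
    engine = engines.load(flow,
                          engine_conf={'engine': 'parallel',
                                       'executor': executor})
    engine.run()
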
.. tip::
@@ -190,22 +196,23 @@ Creation
The first thing that occurs is that the user creates an engine for a given
flow, providing a flow detail (where results will be saved into a provided
:doc:`persistence <persistence>` backend). This is typically accomplished via
the methods described above in `creating engines`_. At this point the engine
will have references to your flow and backends, and other internal variables
are set up.
Compiling
---------
During this stage the flow will be converted into an internal graph
representation using a flow :py:func:`~taskflow.utils.flow_utils.flatten`
function. This function converts the flow objects and contained atoms into a
`networkx`_ directed graph that contains the equivalent atoms defined in the
flow and any nested flows & atoms as well as the constraints that are created
by the application of the different flow patterns. This graph is then what
will be analyzed & traversed during the engine's execution. At this point a
few helper objects are also created and saved to internal engine variables
(these objects help in execution of atoms, analyzing the graph and performing
other internal engine activities).
Preparation
-----------
@@ -213,35 +220,37 @@ Preparation
This stage starts by setting up the storage needed for all atoms in the
previously created graph, ensuring that corresponding
:py:class:`~taskflow.persistence.logbook.AtomDetail` (or subclasses thereof)
objects are created for each node in the graph. Once this is done final
validation occurs on the requirements that are needed to start execution and
what storage provides. If any atom or flow requirements are not satisfied then
execution will not be allowed to continue.
Execution
---------
The graph (and helper objects) previously created are now used for guiding
further execution. The flow is put into the ``RUNNING`` :doc:`state <states>`
and a
:py:class:`~taskflow.engines.action_engine.graph_action.FutureGraphAction`
object starts to take over and begins going through the stages listed below.
Resumption
^^^^^^^^^^
One of the first stages is to analyze the :doc:`state <states>` of the tasks
in the graph, determining which ones have failed, which ones were previously
running and determining what the intention of that task should now be
(typically an intention can be that it should ``REVERT``, or that it should
``EXECUTE`` or that it should be ``IGNORED``). This intention is determined by
analyzing the current state of the task, which is determined by looking at the
state in the task detail object for that task and analyzing edges of the graph
for things like retry atoms which can influence what a task's intention should
be (this is aided by the usage of the
:py:class:`~taskflow.engines.action_engine.graph_analyzer.GraphAnalyzer`
helper object which was designed to provide helper methods for this analysis).
Once these intentions are determined and associated with each task (the
intention is also stored in the
:py:class:`~taskflow.persistence.logbook.AtomDetail` object) the scheduling
stage starts.
Scheduling
^^^^^^^^^^
@@ -250,25 +259,26 @@ This stage selects which atoms are eligible to run (looking at there intention,
checking if predecessor atoms have run and so on, again using the
:py:class:`~taskflow.engines.action_engine.graph_analyzer.GraphAnalyzer` helper
object) and submits those atoms to a previously provided compatible
`executor`_ for asynchronous execution. This executor will return a `future`_
object for each atom submitted, all of which are collected into a list of
not-done futures. This will end the initial round of scheduling and at this
point the engine enters the waiting stage.
Waiting
^^^^^^^
In this stage the engine waits for any of the future objects previously
submitted to complete. Once one of the future objects completes (or fails)
that atom's result will be examined and persisted to the persistence backend
(saved into the corresponding
:py:class:`~taskflow.persistence.logbook.AtomDetail` object) and the state of
the atom is changed. At this point what happens falls into two categories, one
for if that atom failed and one for if it did not. If the atom failed it may
be set to a new intention such as ``RETRY`` or ``REVERT`` (other atoms that
were predecessors of this failing atom may also have their intention altered).
Once this intention adjustment has happened a new round of scheduling occurs
and this process repeats until the engine succeeds or fails (if the process
running the engine dies the above stages will be restarted and resuming will
occur).
.. note::
@@ -280,15 +290,17 @@ dies the above stages will be restarted and resuming will occur).
Finishing
---------
At this point the
:py:class:`~taskflow.engines.action_engine.graph_action.FutureGraphAction` has
now finished successfully, failed, or the execution was suspended. Whichever
one of these occurs will cause the flow to enter a new state (typically one of
``FAILURE``, ``SUSPENDED``, ``SUCCESS`` or ``REVERTED``).
:doc:`Notifications <notifications>` will be sent out about this final state
change (other state changes also send out notifications) and any failures that
occurred will be reraised (the failure objects are wrapped exceptions). If no
failures have occurred then the engine will have finished and if so desired
the :doc:`persistence <persistence>` can be used to clean up any details that
were saved for this execution.
Interfaces
==========


@@ -6,7 +6,8 @@ easy, consistent, and reliable.*
.. note::
    Additional documentation is also hosted on the wiki:
    https://wiki.openstack.org/wiki/TaskFlow
Contents
========


@@ -4,19 +4,19 @@ Inputs and Outputs
In TaskFlow there are multiple ways to provide inputs for your tasks and flows
and get information from them. This document describes the one that involves
task arguments and results. There are also :doc:`notifications
<notifications>`, which allow you to get notified when a task or flow changes
state. You may also opt to use the :doc:`persistence <persistence>` layer
itself directly.
-----------------------
Flow Inputs and Outputs
-----------------------
Tasks accept inputs via task arguments and provide outputs via task results
(see :doc:`arguments and results <arguments_and_results>` for more details).
This is the standard and recommended way to pass data from one task to
another. Of course not every task argument needs to be provided to some other
task of a flow, and not every task result should be consumed by every task.
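
A minimal sketch of that standard pattern (hypothetical task and value names;
``default_provides`` is used so the first task's result is named without
constructor arguments)::

    from taskflow import engines
    from taskflow import task
    from taskflow.patterns import linear_flow

    class ProduceValue(task.Task):
        default_provides = 'value'

        def execute(self):
            return 42

    class ConsumeValue(task.Task):
        # 'value' is inferred as a required input from this signature.
        def execute(self, value):
            print("got %s" % value)

    flow = linear_flow.Flow('produce-consume').add(
        ProduceValue(),
        ConsumeValue(),
    )
    engines.run(flow)
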
If some value is required by one or more tasks of a flow, but is not provided
@@ -54,10 +54,12 @@ For example:
.. make vim syntax highlighter happy**
As you can see, this flow does not require ``b``, as it is provided by the
first task.
.. note::
    There is no difference between processing of Task and Retry inputs
    and outputs.
------------------
Engine and Storage
@@ -93,7 +95,8 @@ prior to running:
    >>> engines.run(flo)
    Traceback (most recent call last):
        ...
    taskflow.exceptions.MissingDependencies:
    taskflow.patterns.linear_flow.Flow: cat-dog;
    2 requires ['meow', 'woof'] but no other entity produces said requirements
The recommended way to provide flow inputs is to use the ``store`` parameter
@@ -120,10 +123,10 @@ of the engine helpers (:py:func:`~taskflow.engines.helpers.run` or
    woof
    {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}
You can also directly interact with the engine storage layer to add additional
values. Note that if this route is used you can't use
:py:func:`~taskflow.engines.helpers.run` to run your engine (instead you must
call the engine's run method directly):
.. doctest::
@@ -142,8 +145,8 @@ Outputs
As you can see from examples above, the run method returns all flow outputs in
a ``dict``. This same data can be fetched via the
:py:meth:`~taskflow.storage.Storage.fetch_all` method of the storage. You can
also get single results using :py:meth:`~taskflow.storage.Storage.fetch`. For
example:
.. doctest::


@@ -6,13 +6,13 @@ Overview
========
Jobs and jobboards are a **novel** concept that taskflow provides to allow for
automatic ownership transfer of workflows between capable owners (those owners
usually then use :doc:`engines <engines>` to complete the workflow). They
provide the necessary semantics to be able to atomically transfer a job from a
producer to a consumer in a reliable and fault-tolerant manner. They are
modeled on the concept used to post and acquire work in the physical world
(typically a job listing in a newspaper or online website serves a similar
role).
**TLDR:** It's similar to a queue, but consumers lock items on the queue when
claiming them, and only remove them from the queue when they're done with the
@@ -25,20 +25,22 @@ Definitions
===========
Jobs
  A :py:class:`job <taskflow.jobs.job.Job>` consists of a unique identifier,
  name, and a reference to a :py:class:`logbook
  <taskflow.persistence.logbook.LogBook>` which contains the details of the
  work that has been or should be/will be completed to finish that job.
Jobboards
  A :py:class:`jobboard <taskflow.jobs.jobboard.JobBoard>` is responsible for
  managing the posting, ownership, and delivery of jobs. It acts as the
  location where jobs can be posted, claimed and searched for; typically by
  iteration or notification. Jobboards may be backed by different *capable*
  implementations (each with potentially differing configuration) but all
  jobboards implement the same interface and semantics so that the backend
  usage is as transparent as possible. This allows deployers or developers of
  a service that uses TaskFlow to select a jobboard implementation that fits
  their setup (and their intended usage) best.
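
As a rough sketch of that usage (hedged; it assumes a ZooKeeper-backed
jobboard, the ``job_backends.fetch()`` entrypoint helper and a previously
created ``my_logbook``, all of which should be checked against the version in
use)::

    import contextlib

    from taskflow.jobs import backends as job_backends

    conf = {'board': 'zookeeper', 'hosts': ['127.0.0.1:2181']}
    with contextlib.closing(job_backends.fetch('my-board', conf)) as board:
        board.connect()
        # Post work so that some capable owner can claim and complete it.
        board.post('convert-video', book=my_logbook)
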
Features
========
@@ -197,18 +199,19 @@ non-issues but for now they are worth mentioning.
Dual-engine jobs
----------------
**What:** Since atoms and engines are not currently `preemptable`_ we cannot
force an engine (or the threads/remote workers... it is using to run) to stop
working on an atom (it is generally bad behavior to force code to stop without
its consent anyway) if it has already started working on an atom (short of
doing a ``kill -9`` on the running interpreter). This could cause problems
since the only points at which an engine can notice that it no longer owns a
claim are at :doc:`state <states>` changes (transitioning to a new atom or
recording a result for example), whereupon noticing the claim has been lost
the engine can immediately stop doing further work. The effect this causes is
that when a claim is lost another engine can immediately attempt to acquire
the claim that was previously lost and it *could* begin working on the
unfinished tasks that the previous engine may also still be executing (since
that engine is not yet aware that it has lost the claim).
**TLDR:** not `preemptable`_, possible to become aware of losing a claim
after the fact (at the next state change), another engine could have acquired
@@ -219,17 +222,18 @@ the claim by then, therefore both would be *working* on a job.
#. Ensure your atoms are `idempotent`_; this will allow an engine that may be
   executing the same atom to continue executing without causing any
   conflicts/problems (idempotency guarantees this).
#. On claiming jobs that have been claimed previously, enforce a policy that
   happens before the job's workflow begins to execute (possibly prior to an
   engine beginning the job's work) that ensures that any prior work has been
   rolled back before continuing rolling forward. For example:
   * Rolling back the last atom/set of atoms that finished.
   * Rolling back the last state change that occurred.
#. Delay claiming partially completed work by adding a wait period (to allow
   the previous engine to coalesce) before working on a partially completed
   job (combine this with the prior suggestions and dual-engine issues should
   be avoided).
.. _idempotent: http://en.wikipedia.org/wiki/Idempotence
.. _preemptable: http://en.wikipedia.org/wiki/Preemption_%28computing%29


@@ -21,8 +21,8 @@ To receive these notifications you should register a callback in
Each engine provides two of them: one notifies about flow state changes,
and another notifies about changes of tasks.
TaskFlow also has a set of predefined :ref:`listeners <listeners>`, and
provides means to write your own listeners, which can be more convenient than
using raw callbacks.
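
For example, raw callbacks can be registered along these lines (a small
sketch; it assumes an already loaded ``engine``, and the ``'*'`` wildcard and
``details`` dictionary keys are assumptions about this era of the API)::

    def flow_watch(state, details):
        print("Flow transitioned into state '%s'" % state)

    def task_watch(state, details):
        print("Task '%s' transitioned into state '%s'"
              % (details.get('task_name'), state))

    # Register for every state transition ('*' acts as a wildcard).
    engine.notifier.register('*', flow_watch)
    engine.task_notifier.register('*', task_watch)
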
--------------------------------------


@@ -17,17 +17,18 @@ This abstraction serves the following *major* purposes:
* Tracking of what was done (introspection).
* Saving *memory* which allows for restarting from the last saved state
  which is a critical feature to restart and resume workflows (checkpointing).
* Associating additional metadata with atoms while running (without having
  those atoms need to save this data themselves). This makes it possible to
  add new metadata in the future without having to change the atoms
  themselves. For example the following can be saved:
  * Timing information (how long a task took to run).
  * User information (who the task ran as).
  * When an atom/workflow was run (and why).
* Saving historical data (failures, successes, intermediary results...)
  to allow for retry atoms to be able to decide if they should continue
  vs. stop.
* *Something you create...*
.. _stevedore: http://stevedore.readthedocs.org/
@@ -35,39 +36,47 @@ This abstraction serves the following *major* purposes:
How it is used
==============
On :doc:`engine <engines>` construction typically a backend (it can be
optional) will be provided which satisfies the
:py:class:`~taskflow.persistence.backends.base.Backend` abstraction. Along
with providing a backend object a
:py:class:`~taskflow.persistence.logbook.FlowDetail` object will also be
created and provided (this object will contain the details about the flow to
be run) to the engine constructor (or associated :py:meth:`load()
<taskflow.engines.helpers.load>` helper functions). Typically a
:py:class:`~taskflow.persistence.logbook.FlowDetail` object is created from a
:py:class:`~taskflow.persistence.logbook.LogBook` object (the book object acts
as a type of container for :py:class:`~taskflow.persistence.logbook.FlowDetail`
and :py:class:`~taskflow.persistence.logbook.AtomDetail` objects).
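
A rough sketch of that wiring (hedged; the helpers from
``taskflow.utils.persistence_utils`` and the ``engines.load()`` keyword names
are assumptions that should be verified against the version in use, and
``backend`` and ``flow`` are assumed to already exist)::

    from taskflow import engines
    from taskflow.utils import persistence_utils

    # Create a book to contain the flow detail, then a flow detail for
    # the flow that is about to be run.
    book = persistence_utils.temporary_log_book(backend)
    flow_detail = persistence_utils.create_flow_detail(flow, book=book,
                                                       backend=backend)
    engine = engines.load(flow, flow_detail=flow_detail, book=book,
                          backend=backend)
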
**Preparation**: Once an engine starts to run it will create a
:py:class:`~taskflow.storage.Storage` object which will act as the engine's
interface to the underlying backend storage objects (it provides helper
functions that are commonly used by the engine, avoiding repeating code when
interacting with the provided
:py:class:`~taskflow.persistence.logbook.FlowDetail` and
:py:class:`~taskflow.persistence.backends.base.Backend` objects). As an engine
initializes it will extract (or create)
:py:class:`~taskflow.persistence.logbook.AtomDetail` objects for each atom in
the workflow the engine will be executing.
**Execution:** When an engine begins to execute (see :doc:`engine <engines>`
for more of the details about how an engine goes about this process) it will
examine any previously existing
:py:class:`~taskflow.persistence.logbook.AtomDetail` objects to see if they
can be used for resuming; see :doc:`resumption <resumption>` for more details
on this subject. For atoms which have not finished (or did not finish
correctly from a previous run) they will begin executing only after any
dependent inputs are ready. This is done by analyzing the execution graph and
looking at predecessor
:py:class:`~taskflow.persistence.logbook.AtomDetail` outputs and states (which
may have been persisted in a past run). This will result in either using their
previous information or running those predecessors and saving their output to
the :py:class:`~taskflow.persistence.logbook.FlowDetail` and
:py:class:`~taskflow.persistence.backends.base.Backend` objects. This
execution, analysis and interaction with the storage objects continues (what
is described here is a simplification of what really happens, which is quite a
bit more complex) until the engine has finished running (at which point the
engine will have succeeded or failed in its attempt to run the workflow).
**Post-execution:** Typically when an engine is done running the logbook would
be discarded (to avoid creating a stockpile of useless data) and the backend
@@ -91,23 +100,24 @@ A few scenarios come to mind:
It should be emphasized that the logbook is the authoritative, and,
preferably, the **only** (see :doc:`inputs and outputs <inputs_and_outputs>`)
source of run-time state information (breaking this principle makes it
hard/impossible to restart or resume in any type of automated fashion). When
an atom returns a result, it should be written directly to a logbook. When
atom or flow state changes in any way, the logbook is the first to know (see
:doc:`notifications <notifications>` for how a user may also get notified of
those same state changes). The logbook and a backend and associated storage
helper class are responsible for storing the actual data. These components
used together specify the persistence mechanism (how data is saved and where
-- memory, database, whatever...) and the persistence policy (when data is
saved -- every time it changes or at some particular moments or simply never).
Usage
=====
To select which persistence backend to use you should use the :py:meth:`fetch()
<taskflow.persistence.backends.fetch>` function which uses entrypoints
(internally using `stevedore`_) to fetch and configure your backend. This makes
it simpler than accessing the backend data types directly and provides a common
function from which a backend can be fetched.
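
For example (a minimal sketch using the in-memory connection type; other
connection strings follow the same pattern)::

    import contextlib

    from taskflow.persistence import backends

    backend = backends.fetch({'connection': 'memory://'})
    with contextlib.closing(backend.get_connection()) as conn:
        # Ensure any schemas/structures the backend needs exist.
        conn.upgrade()
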
@@ -158,11 +168,11 @@ Sqlalchemy
**Connection**: ``'mysql'`` or ``'postgres'`` or ``'sqlite'``
Retains all data in an `ACID`_ compliant database using the `sqlalchemy`_
library for schemas, connections, and database interaction functionality.
Useful when you need a higher level of durability than offered by the previous
solutions. When using these connection types it is possible to resume an
engine from a peer machine (this does not apply when using sqlite).
.. _sqlalchemy: http://www.sqlalchemy.org/docs/
.. _ACID: https://en.wikipedia.org/wiki/ACID


@@ -5,34 +5,34 @@ Resumption
Overview
========
**Question**: *How can we persist the flow so that it can be resumed, restarted
or rolled-back on engine failure?*
**Answer:** Since a flow is a set of :doc:`atoms <atoms>` and relations
between atoms we need to create a model and corresponding information that
allows us to persist the *right* amount of information to preserve, resume,
and roll back a flow on software or hardware failure.
To allow for resumption taskflow must be able to re-create the flow and
re-connect the links between atoms (and between atoms->atom details and so on)
in order to revert those atoms or resume those atoms in the correct ordering.
Taskflow provides a pattern that can help in automating this process (it does
**not** prohibit the user from creating their own strategies for doing this).
Factories
=========
The default provided way is to provide a `factory`_ function which will create
(or recreate) your workflow. This function can be provided when loading a flow
and corresponding engine via the provided :py:meth:`load_from_factory()
<taskflow.engines.helpers.load_from_factory>` method. This `factory`_ function
is expected to be a function (or ``staticmethod``) which is reimportable (aka
has a well defined name that can be located by the ``__import__`` function in
Python; this excludes ``lambda`` style functions and ``instance`` methods).
The `factory`_ function name will be saved into the logbook and it will be
imported and called to create the workflow objects (or recreate them if
resumption happens). This allows for the flow to be recreated if and when that
is needed (even on remote machines, as long as the reimportable name can be
located).
.. _factory: https://en.wikipedia.org/wiki/Factory_%28object-oriented_programming%29 .. _factory: https://en.wikipedia.org/wiki/Factory_%28object-oriented_programming%29
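
For example, a minimal sketch (the task, flow name, and module layout are
invented for illustration) of a reimportable factory function and its use:

::

    import taskflow.engines
    from taskflow.patterns import linear_flow
    from taskflow import task


    class CallJim(task.Task):
        def execute(self):
            print("Calling jim.")


    def flow_factory():
        # A module-level function with a well defined importable name (a
        # lambda or an instance method would not satisfy the requirement).
        return linear_flow.Flow('example-flow').add(CallJim())


    engine = taskflow.engines.load_from_factory(flow_factory)
    engine.run()
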
@@ -40,10 +40,10 @@ Names
=====

When a flow is created it is expected that each atom has a unique name; this
name serves a special purpose in the resumption process (as well as serving a
useful purpose when running, allowing for atom identification in the
:doc:`notification <notifications>` process). The reason for having names is
that an atom in a flow needs to be somehow matched with (a potentially)
existing :py:class:`~taskflow.persistence.logbook.AtomDetail` during engine
resumption & subsequent running.
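
For instance, a unique name can be given to each atom when it is constructed
(a sketch; the class and names here are invented), rather than relying on the
default class-derived name:

::

    from taskflow import task


    class GrabNumbers(task.Task):
        def execute(self):
            return [1, 2, 3]


    # Explicit unique names allow these two otherwise identical atoms to be
    # matched with their stored AtomDetail objects on resumption.
    atom_one = GrabNumbers(name='grab-numbers-1')
    atom_two = GrabNumbers(name='grab-numbers-2')
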
@@ -61,27 +61,29 @@ Names provide this although they do have weaknesses:
.. note::

    Even with these weaknesses, names were selected as a *good enough*
    solution for the above matching requirements (until something better is
    invented/created that can satisfy those same requirements).

Scenarios
=========

When a new flow is loaded into an engine, there is no persisted data for it
yet, so a corresponding :py:class:`~taskflow.persistence.logbook.FlowDetail`
object will be created, as well as a
:py:class:`~taskflow.persistence.logbook.AtomDetail` object for each atom that
is contained in it. These will be immediately saved into the persistence
backend that is configured. If no persistence backend is configured, then as
expected nothing will be saved and the atoms and flow will be run in a
non-persistent manner.

**Subsequent run:** When we resume the flow from a persistent backend (for
example, if the flow was interrupted and the engine destroyed to save resources
or if the service was restarted), we need to re-create the flow. For that, we
will call the function that was saved on first-time loading that builds the
flow for us (aka the flow factory function described above) and the engine will
run. The following scenarios explain some expected structural changes and how
they can be accommodated (and what the effect will be when resuming & running).

Same atoms
----------

@@ -96,61 +98,64 @@ and then the engine resumes.
Atom was added
--------------

When the factory function mentioned above alters the flow by adding a new atom
(for example, to change the runtime structure of what was previously run in
the first run).

**Runtime change:** By default when the engine resumes it will notice that a
corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail` does not
exist and one will be created and associated.

Atom was removed
----------------

When the factory function mentioned above alters the flow by removing an
existing atom (for example, to change the runtime structure of what was
previously run in the first run).

**Runtime change:** Nothing should be done -- the flow structure is reloaded
from the factory function, and the removed atom is not in it -- so the flow
will be run as if it was not there, and any results it returned if it was
completed before will be ignored.

Atom code was changed
---------------------

When the factory function mentioned above alters the flow by deciding that a
newer version of a previously existing atom should be run (possibly to perform
some kind of upgrade or to fix a bug in a prior atom's code).

**Factory change:** The atom name & version will have to be altered. The
factory should replace this name where it was being used previously.

**Runtime change:** This will fall under the same runtime adjustments that
exist when a new atom is added. In the future taskflow could make this easier
by providing an ``upgrade()`` function that can be used to give users the
ability to upgrade atoms before running (manual introspection & modification of
a :py:class:`~taskflow.persistence.logbook.LogBook` can be done before engine
loading and running to accomplish this in the meantime).

Atom was split in two atoms or merged from two (or more) to one atom
--------------------------------------------------------------------

When the factory function mentioned above alters the flow by deciding that a
previously existing atom should be split into N atoms or the factory function
decides that N atoms should be merged into <N atoms (typically occurring
during refactoring).

**Runtime change:** This will fall under the same runtime adjustments that
exist when a new atom is added or removed. In the future taskflow could make
this easier by providing a ``migrate()`` function that can be used to give
users the ability to migrate atoms' previous data before running (manual
introspection & modification of a
:py:class:`~taskflow.persistence.logbook.LogBook` can be done before engine
loading and running to accomplish this in the meantime).

Flow structure was changed
--------------------------

If manual links were added or removed from the graph, or task requirements
were changed, or the flow was refactored (atoms moved into or out of subflows,
a linear flow was replaced with a graph flow, tasks were reordered in a linear
flow, etc.).

**Runtime change:** Nothing should be done.


@@ -12,23 +12,52 @@ Flow States
**PENDING** - A flow starts its life in this state.

**RUNNING** - In this state the flow makes progress, executing and/or
reverting its tasks.

**SUCCESS** - Once all tasks have finished successfully the flow transitions
to the SUCCESS state.

**REVERTED** - The flow transitions to this state when it has been reverted
successfully after a failure.

**FAILURE** - The flow transitions to this state when it can not be reverted
after a failure.

**SUSPENDING** - In the RUNNING state the flow can be suspended. When this
happens, the flow transitions to the SUSPENDING state immediately. In that
state the engine running the flow waits for running tasks to finish (since the
engine can not preempt tasks that are active).

**SUSPENDED** - When no tasks are running and all results received so far are
saved, the flow transitions from the SUSPENDING state to SUSPENDED. It may
also go to the SUCCESS state if all tasks were in fact run, or to the REVERTED
state if the flow was reverting and all tasks were reverted while the engine
was waiting for running tasks to finish, or to the FAILURE state if tasks were
run or reverted and some of them failed.
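
As an illustration, suspension might be requested from another thread while
the engine runs (a minimal sketch: the flow construction is elided, and the
``suspend()`` and ``storage.get_flow_state()`` engine attributes are assumed
to behave as described here):

::

    import threading

    import taskflow.engines

    engine = taskflow.engines.load(flow)  # 'flow' built elsewhere

    # Ask the engine to suspend shortly after the run starts; it will let
    # currently active tasks finish and then stop making further progress.
    threading.Timer(0.1, engine.suspend).start()
    engine.run()
    print(engine.storage.get_flow_state())  # SUSPENDED (or SUCCESS if done)
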
**RESUMING** - When the flow is interrupted 'in a hard way' (e.g. the server
crashed), it can be loaded from storage in any state. If the state is not
PENDING (aka, the flow was never run) or SUCCESS, FAILURE or REVERTED (in
which case the flow has already finished), the flow gets set to the RESUMING
state for the short time period while it is being loaded from backend storage
[a database, a filesystem...] (this transition is not shown on the diagram).
When the flow is finally loaded, it goes to the SUSPENDED state.

From the SUCCESS, FAILURE or REVERTED states the flow can be run again (and
thus it goes back into the RUNNING state). One of the possible use cases for
this transition is to allow for alteration of a flow or flow details
associated with a previously run flow after the flow has finished, and client
code wants to ensure that each task from this new (potentially updated) flow
has its chance to run.

.. note::

    The current code also contains strong checks during each flow state
    transition using the model described above and raises the InvalidState
    exception if an invalid transition is attempted. This exception being
    triggered usually means there is some kind of bug in the engine code or
    some type of misuse/state violation is occurring, and should be reported
    as such.

Task States
@@ -39,17 +68,25 @@ Task States
   :align: right
   :alt: Task state transitions

**PENDING** - When a task is added to a flow, it starts in the PENDING state,
which means it can be executed immediately or waits for all of the tasks it
depends on to complete. The task transitions to the PENDING state after it was
reverted and its flow was restarted or retried.

**RUNNING** - When the flow starts to execute the task, it transitions to the
RUNNING state, and stays in this state until its execute() method returns.

**SUCCESS** - The task transitions to this state after it has finished
successfully.

**FAILURE** - The task transitions to this state after it has finished with an
error. When the flow containing this task is being reverted, all its tasks are
walked in a particular order.

**REVERTING** - The task transitions to this state when the flow starts to
revert it and its revert() method is called. Only tasks in the SUCCESS or
FAILURE state can be reverted. If this method fails (raises an exception), the
task goes to the FAILURE state.

**REVERTED** - A task that has been reverted appears in this state.
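
As a small illustration (the task name and return value are invented), the
revert() method that runs while a task is in the REVERTING state might look
like:

::

    from taskflow import task


    class ProvisionServer(task.Task):
        def execute(self):
            # Runs while the task is in the RUNNING state.
            return 'server-42'

        def revert(self, result, flow_failures, **kwargs):
            # Runs while the task is in the REVERTING state; 'result' is what
            # execute() returned (or a failure object if it raised).
            print('Undoing provisioning of %s' % (result,))
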
@@ -64,20 +101,30 @@ Retry States
A retry has the same states as a task and one additional state.

**PENDING** - When a retry is added to a flow, it starts in the PENDING state,
which means it can be executed immediately or waits for all of the tasks it
depends on to complete. The retry transitions to the PENDING state after it
was reverted and its flow was restarted or retried.

**RUNNING** - When the flow starts to execute the retry, it transitions to the
RUNNING state, and stays in this state until its execute() method returns.

**SUCCESS** - The retry transitions to this state after it has finished
successfully.

**FAILURE** - The retry transitions to this state after it has finished with
an error. When the flow containing this retry is being reverted, all its tasks
are walked in a particular order.

**REVERTING** - The retry transitions to this state when the flow starts to
revert it and its revert() method is called. Only retries in the SUCCESS or
FAILURE state can be reverted. If this method fails (raises an exception), the
retry goes to the FAILURE state.

**REVERTED** - A retry that has been reverted appears in this state.

**RETRYING** - If the flow that is managed by the current retry failed and was
reverted, the retry prepares it for the next run and transitions to the
RETRYING state.


@@ -7,8 +7,8 @@ Overview
This is an engine that schedules tasks to **workers** -- separate processes
dedicated to the execution of certain atoms, possibly running on other
machines, connected via `amqp`_ (or other supported `kombu
<http://kombu.readthedocs.org/>`_ transports).
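
A minimal sketch of starting such a worker (the broker url, exchange, topic
and task names here are invented for the example):

::

    from taskflow.engines.worker_based import worker as w

    worker = w.Worker(
        url='amqp://guest:guest@localhost:5672//',
        exchange='taskflow-exchange',
        topic='worker-topic-1',
        # Reimportable names of the task(s) this worker can execute.
        tasks=['myapp.tasks:ProvisionServer'],
    )
    worker.run()
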
.. note::
@@ -43,8 +43,9 @@ Worker
  configured to run in as many threads (green or not) as desired.

Proxy
  Executors interact with workers via a proxy. The proxy maintains the
  underlying transport and publishes messages (and invokes callbacks on
  message reception).

Requirements
------------
@@ -122,12 +123,13 @@ engine executor in the following manner:
1. The executor initiates task execution/reversion using a proxy object.
2. :py:class:`~taskflow.engines.worker_based.proxy.Proxy` publishes a task
   request (the format is described below) into a named exchange using a
   routing key that is used to deliver the request to a particular worker's
   topic. The executor then waits for the task request to be accepted and
   confirmed by workers. If the executor doesn't get a task confirmation from
   workers within the given timeout the task is considered as timed-out and a
   timeout exception is raised.
3. A worker receives a request message and starts a new thread for processing
   it.

   1. The worker dispatches the request (gets the desired endpoint that
      actually executes the task).
@@ -141,8 +143,8 @@ engine executor in the following manner:
   handled by the engine, dispatching to listeners and so-on).

4. The executor gets the task request confirmation from the worker and the
   task request state changes from the ``PENDING`` to the ``RUNNING`` state.
   Once a task request is in the ``RUNNING`` state it can't be timed-out
   (considering that the task execution process may take an unpredictable
   amount of time).

5. The executor gets the task execution result from the worker and passes it
   back to the executor and worker-based engine to finish task processing (this
@@ -303,7 +305,8 @@ Additional supported keyword arguments:
* ``executor``: a class that provides a
  :py:class:`~taskflow.engines.worker_based.executor.WorkerTaskExecutor`
  interface; it will be used for executing, reverting and waiting for remote
  tasks.
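
A sketch of how such keyword arguments might be supplied when creating the
engine (the values are hypothetical, and ``engine_conf='worker-based'`` is
assumed to select this engine type with the remaining keyword arguments
forwarded to it):

::

    import taskflow.engines

    engine = taskflow.engines.load(
        flow,  # built elsewhere
        engine_conf='worker-based',
        url='amqp://guest:guest@localhost:5672//',
        exchange='taskflow-exchange',
        topics=['worker-topic-1'],
    )
    engine.run()
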
Limitations
===========

tools/check_doc.py (new file)

@@ -0,0 +1,114 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (C) 2014 Ivan Melnikov <iv at altlinux dot org>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Check documentation for simple style requirements.

What is checked:
    - lines should not be longer than 79 characters
      - exception: line with no whitespace except maybe in the beginning
      - exception: line that starts with '..' -- longer directives are
        allowed, including footnotes
    - no tabulation for indentation
    - no trailing whitespace
"""

import fnmatch
import os
import re
import sys


FILE_PATTERNS = ['*.rst', '*.txt']
MAX_LINE_LENGTH = 79
TRAILING_WHITESPACE_REGEX = re.compile(r'\s$')
STARTING_WHITESPACE_REGEX = re.compile(r'^(\s+)')


def check_max_length(line):
    if len(line) > MAX_LINE_LENGTH:
        stripped = line.strip()
        if not any((
            line.startswith('..'),  # this is a directive
            stripped.startswith('>>>'),  # this is a doctest
            stripped.startswith('...'),  # and so is this
            stripped.startswith('taskflow.'),
            ' ' not in stripped  # line can't be split
        )):
            yield ('D001', 'Line too long')


def check_trailing_whitespace(line):
    if TRAILING_WHITESPACE_REGEX.search(line):
        yield ('D002', 'Trailing whitespace')


def check_indentation_no_tab(line):
    match = STARTING_WHITESPACE_REGEX.search(line)
    if match:
        spaces = match.group(1)
        if '\t' in spaces:
            yield ('D003', 'Tabulation used for indentation')


LINE_CHECKS = (check_max_length,
               check_trailing_whitespace,
               check_indentation_no_tab)


def check_lines(lines):
    # Run every line-level check against every line, yielding any style
    # violations found (line numbers are reported 1-based).
    for idx, line in enumerate(lines, 1):
        line = line.rstrip('\n')
        for check in LINE_CHECKS:
            for code, message in check(line):
                yield idx, code, message


def check_files(filenames):
    for fn in filenames:
        with open(fn) as f:
            for line_num, code, message in check_lines(f):
                yield fn, line_num, code, message


def find_files(pathes, patterns):
    # Yield files given directly, plus any files under given directories
    # whose names match one of the patterns (e.g. '*.rst').
    for path in pathes:
        if os.path.isfile(path):
            yield path
        elif os.path.isdir(path):
            for root, dirnames, filenames in os.walk(path):
                for filename in filenames:
                    if any(fnmatch.fnmatch(filename, pattern)
                           for pattern in patterns):
                        yield os.path.join(root, filename)
        else:
            print('Invalid path: %s' % path)


def main():
    ok = True
    if len(sys.argv) > 1:
        dirs = sys.argv[1:]
    else:
        dirs = ['.']
    for error in check_files(find_files(dirs, FILE_PATTERNS)):
        ok = False
        print('%s:%s: %s %s' % error)
    sys.exit(0 if ok else 1)


if __name__ == '__main__':
    main()
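
Run by hand, the checker walks the given paths and prints one line per
violation, exiting non-zero when any were found (a usage sketch; the file and
line number shown are invented, the output format follows the ``print`` call
above):

::

    $ python tools/check_doc.py doc/source
    doc/source/states.rst:12: D001 Line too long
    $ echo $?
    1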


@@ -54,6 +54,7 @@ deps = -r{toxinidir}/requirements.txt
commands =
    python setup.py testr --slowest --testr-args='{posargs}'
    sphinx-build -b doctest doc/source doc/build
    python tools/check_doc.py doc/source

[testenv:py33]
deps = {[testenv]deps}

View File

@@ -81,6 +81,7 @@ deps = -r{toxinidir}/requirements.txt
commands =
    python setup.py testr --slowest --testr-args='{posargs}'
    sphinx-build -b doctest doc/source doc/build
    python tools/check_doc.py doc/source

[testenv:py33]
deps = {[testenv]deps}