Documentation cleanups and tweaks
Apply some adjustments on the docs by rewording certain statements, linking to the correct classes and methods and linking to the correct exceptions to make it easier for users to follow the docs and there associated types and links. Change-Id: I03aac77c814fc4c376003f09a45559a0995b3c6c
This commit is contained in:
@@ -1,32 +1,35 @@
|
|||||||
==========================
|
=====================
|
||||||
Atom Arguments and Results
|
Arguments and results
|
||||||
==========================
|
=====================
|
||||||
|
|
||||||
.. |task.execute| replace:: :py:meth:`~taskflow.task.BaseTask.execute`
|
.. |task.execute| replace:: :py:meth:`~taskflow.task.BaseTask.execute`
|
||||||
.. |task.revert| replace:: :py:meth:`~taskflow.task.BaseTask.revert`
|
.. |task.revert| replace:: :py:meth:`~taskflow.task.BaseTask.revert`
|
||||||
.. |retry.execute| replace:: :py:meth:`~taskflow.retry.Retry.execute`
|
.. |retry.execute| replace:: :py:meth:`~taskflow.retry.Retry.execute`
|
||||||
.. |retry.revert| replace:: :py:meth:`~taskflow.retry.Retry.revert`
|
.. |retry.revert| replace:: :py:meth:`~taskflow.retry.Retry.revert`
|
||||||
|
.. |Retry| replace:: :py:class:`~taskflow.retry.Retry`
|
||||||
|
.. |Task| replace:: :py:class:`Task <taskflow.task.BaseTask>`
|
||||||
|
|
||||||
In TaskFlow, all flow and task state goes to (potentially persistent) storage.
|
In TaskFlow, all flow and task state goes to (potentially persistent) storage
|
||||||
That includes all the information that :doc:`atoms <atoms>` (e.g. tasks) in the
|
(see :doc:`persistence <persistence>` for more details). That includes all the
|
||||||
flow need when they are executed, and all the information task produces (via
|
information that :doc:`atoms <atoms>` (e.g. tasks, retry objects...) in the
|
||||||
serializable task results). A developer who implements tasks or flows can
|
workflow need when they are executed, and all the information task/retry
|
||||||
specify what arguments a task accepts and what result it returns in several
|
produces (via serializable results). A developer who implements tasks/retries
|
||||||
ways. This document will help you understand what those ways are and how to use
|
or flows can specify what arguments a task/retry accepts and what result it
|
||||||
those ways to accomplish your desired usage pattern.
|
returns in several ways. This document will help you understand what those ways
|
||||||
|
are and how to use those ways to accomplish your desired usage pattern.
|
||||||
|
|
||||||
.. glossary::
|
.. glossary::
|
||||||
|
|
||||||
Task arguments
|
Task/retry arguments
|
||||||
Set of names of task arguments available as the ``requires``
|
Set of names of task/retry arguments available as the ``requires``
|
||||||
property of the task instance. When a task is about to be executed
|
property of the task/retry instance. When a task or retry object is
|
||||||
values with these names are retrieved from storage and passed to
|
about to be executed values with these names are retrieved from storage
|
||||||
|task.execute| method of the task.
|
and passed to the ``execute`` method of the task/retry.
|
||||||
|
|
||||||
Task results
|
Task/retry results
|
||||||
Set of names of task results (what task provides) available as
|
Set of names of task/retry results (what task/retry provides) available
|
||||||
``provides`` property of task instance. After a task finishes
|
as ``provides`` property of task or retry instance. After a task/retry
|
||||||
successfully, its result(s) (what the task |task.execute| method
|
finishes successfully, its result(s) (what the ``execute`` method
|
||||||
returns) are available by these names from storage (see examples
|
returns) are available by these names from storage (see examples
|
||||||
below).
|
below).
|
||||||
|
|
||||||
@@ -44,8 +47,8 @@ There are different ways to specify the task argument ``requires`` set.
|
|||||||
Arguments inference
|
Arguments inference
|
||||||
-------------------
|
-------------------
|
||||||
|
|
||||||
Task arguments can be inferred from arguments of the |task.execute| method of
|
Task/retry arguments can be inferred from arguments of the |task.execute|
|
||||||
the task.
|
method of a task (or the |retry.execute| of a retry object).
|
||||||
|
|
||||||
.. doctest::
|
.. doctest::
|
||||||
|
|
||||||
@@ -56,10 +59,10 @@ the task.
|
|||||||
>>> sorted(MyTask().requires)
|
>>> sorted(MyTask().requires)
|
||||||
['eggs', 'spam']
|
['eggs', 'spam']
|
||||||
|
|
||||||
Inference from the method signature is the ''simplest'' way to specify task
|
Inference from the method signature is the ''simplest'' way to specify
|
||||||
arguments. Optional arguments (with default values), and special arguments like
|
arguments. Optional arguments (with default values), and special arguments like
|
||||||
``self``, ``*args`` and ``**kwargs`` are ignored on inference (as these names
|
``self``, ``*args`` and ``**kwargs`` are ignored during inference (as these
|
||||||
have special meaning/usage in python).
|
names have special meaning/usage in python).
|
||||||
|
|
||||||
.. doctest::
|
.. doctest::
|
||||||
|
|
||||||
@@ -83,14 +86,14 @@ have special meaning/usage in python).
|
|||||||
Rebinding
|
Rebinding
|
||||||
---------
|
---------
|
||||||
|
|
||||||
**Why:** There are cases when the value you want to pass to a task is stored
|
**Why:** There are cases when the value you want to pass to a task/retry is
|
||||||
with a name other then the corresponding task arguments name. That's when the
|
stored with a name other then the corresponding arguments name. That's when the
|
||||||
``rebind`` task constructor parameter comes in handy. Using it the flow author
|
``rebind`` constructor parameter comes in handy. Using it the flow author
|
||||||
can instruct the engine to fetch a value from storage by one name, but pass it
|
can instruct the engine to fetch a value from storage by one name, but pass it
|
||||||
to a tasks |task.execute| method with another name. There are two possible ways
|
to a tasks/retrys ``execute`` method with another name. There are two possible
|
||||||
of accomplishing this.
|
ways of accomplishing this.
|
||||||
|
|
||||||
The first is to pass a dictionary that maps the task argument name to the name
|
The first is to pass a dictionary that maps the argument name to the name
|
||||||
of a saved value.
|
of a saved value.
|
||||||
|
|
||||||
For example, if you have task::
|
For example, if you have task::
|
||||||
@@ -100,24 +103,25 @@ For example, if you have task::
|
|||||||
def execute(self, vm_name, vm_image_id, **kwargs):
|
def execute(self, vm_name, vm_image_id, **kwargs):
|
||||||
pass # TODO(imelnikov): use parameters to spawn vm
|
pass # TODO(imelnikov): use parameters to spawn vm
|
||||||
|
|
||||||
and you saved 'vm_name' with 'name' key in storage, you can spawn a vm with
|
and you saved ``'vm_name'`` with ``'name'`` key in storage, you can spawn a vm
|
||||||
such 'name' like this::
|
with such ``'name'`` like this::
|
||||||
|
|
||||||
SpawnVMTask(rebind={'vm_name': 'name'})
|
SpawnVMTask(rebind={'vm_name': 'name'})
|
||||||
|
|
||||||
The second way is to pass a tuple/list/dict of argument names. The length of
|
The second way is to pass a tuple/list/dict of argument names. The length of
|
||||||
the tuple/list/dict should not be less then number of task required parameters.
|
the tuple/list/dict should not be less then number of required parameters.
|
||||||
|
|
||||||
For example, you can achieve the same effect as the previous example with::
|
For example, you can achieve the same effect as the previous example with::
|
||||||
|
|
||||||
SpawnVMTask(rebind_args=('name', 'vm_image_id'))
|
SpawnVMTask(rebind_args=('name', 'vm_image_id'))
|
||||||
|
|
||||||
which is equivalent to a more elaborate::
|
This is equivalent to a more elaborate::
|
||||||
|
|
||||||
SpawnVMTask(rebind=dict(vm_name='name',
|
SpawnVMTask(rebind=dict(vm_name='name',
|
||||||
vm_image_id='vm_image_id'))
|
vm_image_id='vm_image_id'))
|
||||||
|
|
||||||
In both cases, if your task accepts arbitrary arguments with ``**kwargs``
|
In both cases, if your task (or retry) accepts arbitrary arguments
|
||||||
construct, you can specify extra arguments.
|
with the ``**kwargs`` construct, you can specify extra arguments.
|
||||||
|
|
||||||
::
|
::
|
||||||
|
|
||||||
@@ -158,7 +162,8 @@ arguments) will appear in the ``kwargs`` of the |task.execute| method.
|
|||||||
|
|
||||||
When constructing a task instance the flow author can also add more
|
When constructing a task instance the flow author can also add more
|
||||||
requirements if desired. Those manual requirements (if they are not functional
|
requirements if desired. Those manual requirements (if they are not functional
|
||||||
arguments) will appear in the ``**kwargs`` the |task.execute| method.
|
arguments) will appear in the ``kwargs`` parameter of the |task.execute|
|
||||||
|
method.
|
||||||
|
|
||||||
.. doctest::
|
.. doctest::
|
||||||
|
|
||||||
@@ -189,12 +194,13 @@ avoid invalid argument mappings.
|
|||||||
Results specification
|
Results specification
|
||||||
=====================
|
=====================
|
||||||
|
|
||||||
In python, function results are not named, so we can not infer what a task
|
In python, function results are not named, so we can not infer what a
|
||||||
returns. This is important since the complete task result (what the
|
task/retry returns. This is important since the complete result (what the
|
||||||
|task.execute| method returns) is saved in (potentially persistent) storage,
|
task |task.execute| or retry |retry.execute| method returns) is saved
|
||||||
and it is typically (but not always) desirable to make those results accessible
|
in (potentially persistent) storage, and it is typically (but not always)
|
||||||
to other tasks. To accomplish this the task specifies names of those values via
|
desirable to make those results accessible to others. To accomplish this
|
||||||
its ``provides`` task constructor parameter or other method (see below).
|
the task/retry specifies names of those values via its ``provides`` constructor
|
||||||
|
parameter or by its default provides attribute.
|
||||||
|
|
||||||
Returning one value
|
Returning one value
|
||||||
-------------------
|
-------------------
|
||||||
@@ -242,14 +248,14 @@ tasks) will be able to get those elements from storage by name:
|
|||||||
|
|
||||||
Provides argument can be shorter then the actual tuple returned by a task --
|
Provides argument can be shorter then the actual tuple returned by a task --
|
||||||
then extra values are ignored (but, as expected, **all** those values are saved
|
then extra values are ignored (but, as expected, **all** those values are saved
|
||||||
and passed to the |task.revert| method).
|
and passed to the task |task.revert| or retry |retry.revert| method).
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
|
|
||||||
Provides arguments tuple can also be longer then the actual tuple returned
|
Provides arguments tuple can also be longer then the actual tuple returned
|
||||||
by task -- when this happens the extra parameters are left undefined: a
|
by task -- when this happens the extra parameters are left undefined: a
|
||||||
warning is printed to logs and if use of such parameter is attempted a
|
warning is printed to logs and if use of such parameter is attempted a
|
||||||
``NotFound`` exception is raised.
|
:py:class:`~taskflow.exceptions.NotFound` exception is raised.
|
||||||
|
|
||||||
Returning a dictionary
|
Returning a dictionary
|
||||||
----------------------
|
----------------------
|
||||||
@@ -290,16 +296,17 @@ will be able to get elements from storage by name:
|
|||||||
and passed to the |task.revert| method). If the provides argument has some
|
and passed to the |task.revert| method). If the provides argument has some
|
||||||
items not present in the actual dict returned by the task -- then extra
|
items not present in the actual dict returned by the task -- then extra
|
||||||
parameters are left undefined: a warning is printed to logs and if use of
|
parameters are left undefined: a warning is printed to logs and if use of
|
||||||
such parameter is attempted a ``NotFound`` exception is raised.
|
such parameter is attempted a :py:class:`~taskflow.exceptions.NotFound`
|
||||||
|
exception is raised.
|
||||||
|
|
||||||
Default provides
|
Default provides
|
||||||
----------------
|
----------------
|
||||||
|
|
||||||
As mentioned above, the default task base class provides nothing, which means
|
As mentioned above, the default base class provides nothing, which means
|
||||||
task results are not accessible to other tasks in the flow.
|
results are not accessible to other tasks/retrys in the flow.
|
||||||
|
|
||||||
The task author can override this and specify default value for provides using
|
The author can override this and specify default value for provides using
|
||||||
``default_provides`` class variable:
|
the ``default_provides`` class/instance variable:
|
||||||
|
|
||||||
::
|
::
|
||||||
|
|
||||||
@@ -314,8 +321,8 @@ Of course, the flow author can override this to change names if needed:
|
|||||||
|
|
||||||
BitsAndPiecesTask(provides=('b', 'p'))
|
BitsAndPiecesTask(provides=('b', 'p'))
|
||||||
|
|
||||||
or to change structure -- e.g. this instance will make whole tuple accessible
|
or to change structure -- e.g. this instance will make tuple accessible
|
||||||
to other tasks by name 'bnp':
|
to other tasks by name ``'bnp'``:
|
||||||
|
|
||||||
::
|
::
|
||||||
|
|
||||||
@@ -331,26 +338,27 @@ the task from other tasks in the flow (e.g. to avoid naming conflicts):
|
|||||||
Revert arguments
|
Revert arguments
|
||||||
================
|
================
|
||||||
|
|
||||||
To revert a task engine calls its |task.revert| method. This method
|
To revert a task the :doc:`engine <engines>` calls the tasks
|
||||||
should accept same arguments as |task.execute| method of the task and one
|
|task.revert| method. This method should accept the same arguments
|
||||||
more special keyword argument, named ``result``.
|
as the |task.execute| method of the task and one more special keyword
|
||||||
|
argument, named ``result``.
|
||||||
|
|
||||||
For ``result`` value, two cases are possible:
|
For ``result`` value, two cases are possible:
|
||||||
|
|
||||||
* if task is being reverted because it failed (an exception was raised from its
|
* If the task is being reverted because it failed (an exception was raised
|
||||||
|task.execute| method), ``result`` value is instance of
|
from its |task.execute| method), the ``result`` value is an instance of a
|
||||||
:py:class:`taskflow.utils.misc.Failure` object that holds exception
|
:py:class:`~taskflow.utils.misc.Failure` object that holds the exception
|
||||||
information;
|
information.
|
||||||
|
|
||||||
* if task is being reverted because some other task failed, and this task
|
* If the task is being reverted because some other task failed, and this task
|
||||||
finished successfully, ``result`` value is task result fetched from storage:
|
finished successfully, ``result`` value is the result fetched from storage:
|
||||||
basically, that's what |task.execute| method returned.
|
ie, what the |task.execute| method returned.
|
||||||
|
|
||||||
All other arguments are fetched from storage in the same way it is done for
|
All other arguments are fetched from storage in the same way it is done for
|
||||||
|task.execute| method.
|
|task.execute| method.
|
||||||
|
|
||||||
To determine if task failed you can check whether ``result`` is instance of
|
To determine if a task failed you can check whether ``result`` is instance of
|
||||||
:py:class:`taskflow.utils.misc.Failure`::
|
:py:class:`~taskflow.utils.misc.Failure`::
|
||||||
|
|
||||||
from taskflow.utils import misc
|
from taskflow.utils import misc
|
||||||
|
|
||||||
@@ -366,20 +374,21 @@ To determine if task failed you can check whether ``result`` is instance of
|
|||||||
else:
|
else:
|
||||||
print("do_something returned %r" % result)
|
print("do_something returned %r" % result)
|
||||||
|
|
||||||
If this task failed (``do_something`` raised exception) it will print ``"This
|
If this task failed (ie ``do_something`` raised an exception) it will print
|
||||||
task failed, exception:"`` and exception message on revert. If this task
|
``"This task failed, exception:"`` and a exception message on revert. If this
|
||||||
finished successfully, it will print ``"do_something returned"`` and
|
task finished successfully, it will print ``"do_something returned"`` and a
|
||||||
representation of result.
|
representation of the ``do_something`` result.
|
||||||
|
|
||||||
Retry arguments
|
Retry arguments
|
||||||
===============
|
===============
|
||||||
|
|
||||||
A Retry controller works with arguments in the same way as a Task. But it has
|
A |Retry| controller works with arguments in the same way as a |Task|. But
|
||||||
an additional parameter 'history' that is a list of tuples. Each tuple contains
|
it has an additional parameter ``'history'`` that is a list of tuples. Each
|
||||||
a result of the previous Retry run and a table where a key is a failed task and
|
tuple contains a result of the previous retry run and a table where the key
|
||||||
a value is a :py:class:`taskflow.utils.misc.Failure`.
|
is a failed task and the value is a
|
||||||
|
:py:class:`~taskflow.utils.misc.Failure` object.
|
||||||
|
|
||||||
Consider the following Retry::
|
Consider the following implementation::
|
||||||
|
|
||||||
class MyRetry(retry.Retry):
|
class MyRetry(retry.Retry):
|
||||||
|
|
||||||
@@ -396,19 +405,24 @@ Consider the following Retry::
|
|||||||
def revert(self, history, *args, **kwargs):
|
def revert(self, history, *args, **kwargs):
|
||||||
print history
|
print history
|
||||||
|
|
||||||
Imagine the following Retry had returned a value '5' and then some task 'A'
|
Imagine the above retry had returned a value ``'5'`` and then some task ``'A'``
|
||||||
failed with some exception. In this case ``on_failure`` method will receive
|
failed with some exception. In this case the above retrys ``on_failure``
|
||||||
the following history::
|
method will receive the following history::
|
||||||
|
|
||||||
[('5', {'A': misc.Failure()})]
|
[('5', {'A': misc.Failure()})]
|
||||||
|
|
||||||
Then the |retry.execute| method will be called again and it'll receive the same
|
At this point (since the implementation returned ``RETRY``) the
|
||||||
history.
|
|retry.execute| method will be called again and it will receive the same
|
||||||
|
history and it can then return a value that subseqent tasks can use to alter
|
||||||
|
there behavior.
|
||||||
|
|
||||||
If the |retry.execute| method raises an exception, the |retry.revert| method of
|
If instead the |retry.execute| method raises an exception, the |retry.revert|
|
||||||
Retry will be called and :py:class:`taskflow.utils.misc.Failure` object will be
|
method of the implementation will be called and
|
||||||
present in the history instead of Retry result::
|
a :py:class:`~taskflow.utils.misc.Failure` object will be present in the
|
||||||
|
history instead of the typical result::
|
||||||
|
|
||||||
[('5', {'A': misc.Failure()}), (misc.Failure(), {})]
|
[('5', {'A': misc.Failure()}), (misc.Failure(), {})]
|
||||||
|
|
||||||
After the Retry has been reverted, the Retry history will be cleaned.
|
.. note::
|
||||||
|
|
||||||
|
After a |Retry| has been reverted, the objects history will be cleaned.
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
------------------------
|
------------------------
|
||||||
Atoms, Tasks and Retries
|
Atoms, tasks and retries
|
||||||
------------------------
|
------------------------
|
||||||
|
|
||||||
Atom
|
Atom
|
||||||
|
|||||||
@@ -13,23 +13,23 @@ and uses it to decide which :doc:`atom <atoms>` to run and when.
|
|||||||
TaskFlow provides different implementations of engines. Some may be easier to
|
TaskFlow provides different implementations of engines. Some may be easier to
|
||||||
use (ie, require no additional infrastructure setup) and understand; others
|
use (ie, require no additional infrastructure setup) and understand; others
|
||||||
might require more complicated setup but provide better scalability. The idea
|
might require more complicated setup but provide better scalability. The idea
|
||||||
and *ideal* is that deployers or developers of a service that uses TaskFlow can
|
and *ideal* is that deployers or developers of a service that use TaskFlow can
|
||||||
select an engine that suites their setup best without modifying the code of
|
select an engine that suites their setup best without modifying the code of
|
||||||
said service.
|
said service.
|
||||||
|
|
||||||
Engines usually have different capabilities and configuration, but all of them
|
Engines usually have different capabilities and configuration, but all of them
|
||||||
**must** implement the same interface and preserve the semantics of patterns
|
**must** implement the same interface and preserve the semantics of patterns
|
||||||
(e.g. parts of :py:class:`linear flow <taskflow.patterns.linear_flow.Flow>`
|
(e.g. parts of a :py:class:`.linear_flow.Flow`
|
||||||
are run one after another, in order, even if engine is *capable* of running
|
are run one after another, in order, even if the selected engine is *capable*
|
||||||
tasks in parallel).
|
of running tasks in parallel).
|
||||||
|
|
||||||
Why they exist
|
Why they exist
|
||||||
--------------
|
--------------
|
||||||
|
|
||||||
An engine being the core component which actually makes your flows progress is
|
An engine being *the* core component which actually makes your flows progress
|
||||||
likely a new concept for many programmers so let's describe how it operates in
|
is likely a new concept for many programmers so let's describe how it operates
|
||||||
more depth and some of the reasoning behind why it exists. This will hopefully
|
in more depth and some of the reasoning behind why it exists. This will
|
||||||
make it more clear on there value add to the TaskFlow library user.
|
hopefully make it more clear on there value add to the TaskFlow library user.
|
||||||
|
|
||||||
First though let us discuss something most are familiar already with; the
|
First though let us discuss something most are familiar already with; the
|
||||||
difference between `declarative`_ and `imperative`_ programming models. The
|
difference between `declarative`_ and `imperative`_ programming models. The
|
||||||
@@ -48,15 +48,15 @@ more of a *pure* function that executes, reverts and may require inputs and
|
|||||||
provide outputs). This is where engines get involved; they do the execution of
|
provide outputs). This is where engines get involved; they do the execution of
|
||||||
the *what* defined via :doc:`atoms <atoms>`, tasks, flows and the relationships
|
the *what* defined via :doc:`atoms <atoms>`, tasks, flows and the relationships
|
||||||
defined there-in and execute these in a well-defined manner (and the engine is
|
defined there-in and execute these in a well-defined manner (and the engine is
|
||||||
responsible for *most* of the state manipulation instead).
|
responsible for any state manipulation instead).
|
||||||
|
|
||||||
This mix of imperative and declarative (with a stronger emphasis on the
|
This mix of imperative and declarative (with a stronger emphasis on the
|
||||||
declarative model) allows for the following functionality to be possible:
|
declarative model) allows for the following functionality to become possible:
|
||||||
|
|
||||||
* Enhancing reliability: Decoupling of state alterations from what should be
|
* Enhancing reliability: Decoupling of state alterations from what should be
|
||||||
accomplished allows for a *natural* way of resuming by allowing the engine to
|
accomplished allows for a *natural* way of resuming by allowing the engine to
|
||||||
track the current state and know at which point a flow is in and how to get
|
track the current state and know at which point a workflow is in and how to
|
||||||
back into that state when resumption occurs.
|
get back into that state when resumption occurs.
|
||||||
* Enhancing scalability: When a engine is responsible for executing your
|
* Enhancing scalability: When a engine is responsible for executing your
|
||||||
desired work it becomes possible to alter the *how* in the future by creating
|
desired work it becomes possible to alter the *how* in the future by creating
|
||||||
new types of execution backends (for example the worker model which does not
|
new types of execution backends (for example the worker model which does not
|
||||||
@@ -83,13 +83,14 @@ Of course these kind of features can come with some drawbacks:
|
|||||||
away from (and this is likely a mindset change for programmers used to the
|
away from (and this is likely a mindset change for programmers used to the
|
||||||
imperative model). We have worked to make this less of a concern by creating
|
imperative model). We have worked to make this less of a concern by creating
|
||||||
and encouraging the usage of :doc:`persistence <persistence>`, to help make
|
and encouraging the usage of :doc:`persistence <persistence>`, to help make
|
||||||
it possible to have some level of provided state transfer mechanism.
|
it possible to have state and tranfer that state via a argument input and
|
||||||
|
output mechanism.
|
||||||
* Depending on how much imperative code exists (and state inside that code)
|
* Depending on how much imperative code exists (and state inside that code)
|
||||||
there can be *significant* rework of that code and converting or refactoring
|
there *may* be *significant* rework of that code and converting or
|
||||||
it to these new concepts. We have tried to help here by allowing you to have
|
refactoring it to these new concepts. We have tried to help here by allowing
|
||||||
tasks that internally use regular python code (and internally can be written
|
you to have tasks that internally use regular python code (and internally can
|
||||||
in an imperative style) as well as by providing examples and these developer
|
be written in an imperative style) as well as by providing
|
||||||
docs; helping this process be as seamless as possible.
|
:doc:`examples <examples>` that show how to use these concepts.
|
||||||
* Another one of the downsides of decoupling the *what* from the *how* is that
|
* Another one of the downsides of decoupling the *what* from the *how* is that
|
||||||
it may become harder to use traditional techniques to debug failures
|
it may become harder to use traditional techniques to debug failures
|
||||||
(especially if remote workers are involved). We try to help here by making it
|
(especially if remote workers are involved). We try to help here by making it
|
||||||
@@ -110,7 +111,7 @@ All engines are mere classes that implement the same interface, and of course
|
|||||||
it is possible to import them and create instances just like with any classes
|
it is possible to import them and create instances just like with any classes
|
||||||
in Python. But the easier (and recommended) way for creating an engine is using
|
in Python. But the easier (and recommended) way for creating an engine is using
|
||||||
the engine helper functions. All of these functions are imported into the
|
the engine helper functions. All of these functions are imported into the
|
||||||
`taskflow.engines` module namespace, so the typical usage of these functions
|
``taskflow.engines`` module namespace, so the typical usage of these functions
|
||||||
might look like::
|
might look like::
|
||||||
|
|
||||||
from taskflow import engines
|
from taskflow import engines
|
||||||
@@ -130,8 +131,8 @@ Usage
|
|||||||
To select which engine to use and pass parameters to an engine you should use
|
To select which engine to use and pass parameters to an engine you should use
|
||||||
the ``engine_conf`` parameter any helper factory function accepts. It may be:
|
the ``engine_conf`` parameter any helper factory function accepts. It may be:
|
||||||
|
|
||||||
* a string, naming engine type;
|
* A string, naming the engine type.
|
||||||
* a dictionary, holding engine type with key ``'engine'`` and possibly
|
* A dictionary, naming engine type with key ``'engine'`` and possibly
|
||||||
type-specific engine configuration parameters.
|
type-specific engine configuration parameters.
|
||||||
|
|
||||||
Single-Threaded
|
Single-Threaded
|
||||||
@@ -139,15 +140,20 @@ Single-Threaded
|
|||||||
|
|
||||||
**Engine type**: ``'serial'``
|
**Engine type**: ``'serial'``
|
||||||
|
|
||||||
Runs all tasks on the single thread -- the same thread `engine.run()` is called
|
Runs all tasks on a single thread -- the same thread ``engine.run()`` is
|
||||||
on. This engine is used by default.
|
called from.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
This engine is used by default.
|
||||||
|
|
||||||
.. tip::
|
.. tip::
|
||||||
|
|
||||||
If eventlet is used then this engine will not block other threads
|
If eventlet is used then this engine will not block other threads
|
||||||
from running as eventlet automatically creates a co-routine system (using
|
from running as eventlet automatically creates a implicit co-routine
|
||||||
greenthreads and monkey patching). See `eventlet <http://eventlet.net/>`_
|
system (using greenthreads and monkey patching). See
|
||||||
and `greenlet <http://greenlet.readthedocs.org/>`_ for more details.
|
`eventlet <http://eventlet.net/>`_ and
|
||||||
|
`greenlet <http://greenlet.readthedocs.org/>`_ for more details.
|
||||||
|
|
||||||
Parallel
|
Parallel
|
||||||
--------
|
--------
|
||||||
|
|||||||
@@ -1,11 +1,11 @@
|
|||||||
==================
|
==================
|
||||||
Inputs and Outputs
|
Inputs and outputs
|
||||||
==================
|
==================
|
||||||
|
|
||||||
In TaskFlow there are multiple ways to provide inputs for your tasks and flows
|
In TaskFlow there are multiple ways to provide inputs for your tasks and flows
|
||||||
and get information from them. This document describes one of them, that
|
and get information from them. This document describes one of them, that
|
||||||
involves task arguments and results. There are also :doc:`notifications
|
involves task arguments and results. There are also :doc:`notifications
|
||||||
<notifications>`, which allow you to get notified when task or flow changed
|
<notifications>`, which allow you to get notified when a task or flow changes
|
||||||
state. You may also opt to use the :doc:`persistence <persistence>` layer
|
state. You may also opt to use the :doc:`persistence <persistence>` layer
|
||||||
itself directly.
|
itself directly.
|
||||||
|
|
||||||
@@ -19,15 +19,16 @@ This is the standard and recommended way to pass data from one task to another.
|
|||||||
Of course not every task argument needs to be provided to some other task of a
|
Of course not every task argument needs to be provided to some other task of a
|
||||||
flow, and not every task result should be consumed by every task.
|
flow, and not every task result should be consumed by every task.
|
||||||
|
|
||||||
If some value is required by one or more tasks of a flow, but is not provided
|
If some value is required by one or more tasks of a flow, but it is not
|
||||||
by any task, it is considered to be flow input, and **must** be put into the
|
provided by any task, it is considered to be flow input, and **must** be put
|
||||||
storage before the flow is run. A set of names required by a flow can be
|
into the storage before the flow is run. A set of names required by a flow can
|
||||||
retrieved via that flow's ``requires`` property. These names can be used to
|
be retrieved via that flow's ``requires`` property. These names can be used to
|
||||||
determine what names may be applicable for placing in storage ahead of time
|
determine what names may be applicable for placing in storage ahead of time
|
||||||
and which names are not applicable.
|
and which names are not applicable.
|
||||||
|
|
||||||
All values provided by tasks of the flow are considered to be flow outputs; the
|
All values provided by tasks of the flow are considered to be flow outputs; the
|
||||||
set of names of such values is available via ``provides`` property of the flow.
|
set of names of such values is available via the ``provides`` property of the
|
||||||
|
flow.
|
||||||
|
|
||||||
.. testsetup::
|
.. testsetup::
|
||||||
|
|
||||||
@@ -59,8 +60,10 @@ As you can see, this flow does not require b, as it is provided by the fist
|
|||||||
task.
|
task.
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
There is no difference between processing of Task and Retry inputs
|
|
||||||
and outputs.
|
There is no difference between processing of
|
||||||
|
:py:class:`Task <taskflow.task.BaseTask>` and
|
||||||
|
:py:class:`~taskflow.retry.Retry` inputs and outputs.
|
||||||
|
|
||||||
------------------
|
------------------
|
||||||
Engine and storage
|
Engine and storage
|
||||||
@@ -146,8 +149,10 @@ Outputs
|
|||||||
|
|
||||||
As you can see from examples above, the run method returns all flow outputs in
|
As you can see from examples above, the run method returns all flow outputs in
|
||||||
a ``dict``. This same data can be fetched via
|
a ``dict``. This same data can be fetched via
|
||||||
:py:meth:`~taskflow.storage.Storage.fetch_all` method of the storage. You can
|
:py:meth:`~taskflow.storage.Storage.fetch_all` method of the engines storage
|
||||||
also get single results using :py:meth:`~taskflow.storage.Storage.fetch`.
|
object. You can also get single results using the
|
||||||
|
engines storage objects :py:meth:`~taskflow.storage.Storage.fetch` method.
|
||||||
|
|
||||||
For example:
|
For example:
|
||||||
|
|
||||||
.. doctest::
|
.. doctest::
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
===========================
|
===========================
|
||||||
Notifications and Listeners
|
Notifications and listeners
|
||||||
===========================
|
===========================
|
||||||
|
|
||||||
.. testsetup::
|
.. testsetup::
|
||||||
@@ -17,9 +17,9 @@ transitions, which is useful for monitoring, logging, metrics, debugging
|
|||||||
and plenty of other tasks.
|
and plenty of other tasks.
|
||||||
|
|
||||||
To receive these notifications you should register a callback with
|
To receive these notifications you should register a callback with
|
||||||
an instance of the the :py:class:`notifier <taskflow.utils.misc.Notifier>`
|
an instance of the :py:class:`~taskflow.utils.misc.Notifier`
|
||||||
class that is attached
|
class that is attached
|
||||||
to :py:class:`engine <taskflow.engines.base.EngineBase>`
|
to :py:class:`Engine <taskflow.engines.base.EngineBase>`
|
||||||
attributes ``task_notifier`` and ``notifier``.
|
attributes ``task_notifier`` and ``notifier``.
|
||||||
|
|
||||||
TaskFlow also comes with a set of predefined :ref:`listeners <listeners>`, and
|
TaskFlow also comes with a set of predefined :ref:`listeners <listeners>`, and
|
||||||
@@ -30,17 +30,14 @@ using raw callbacks.
|
|||||||
Receiving notifications with callbacks
|
Receiving notifications with callbacks
|
||||||
--------------------------------------
|
--------------------------------------
|
||||||
|
|
||||||
To manage notifications instances of
|
|
||||||
:py:class:`~taskflow.utils.misc.Notifier` are used.
|
|
||||||
|
|
||||||
.. autoclass:: taskflow.utils.misc.Notifier
|
|
||||||
|
|
||||||
Flow notifications
|
Flow notifications
|
||||||
------------------
|
------------------
|
||||||
|
|
||||||
To receive notification on flow state changes use
|
To receive notification on flow state changes use the
|
||||||
:py:class:`~taskflow.utils.misc.Notifier` available as
|
:py:class:`~taskflow.utils.misc.Notifier` instance available as the
|
||||||
``notifier`` property of the engine. A basic example is:
|
``notifier`` property of an engine.
|
||||||
|
|
||||||
|
A basic example is:
|
||||||
|
|
||||||
.. doctest::
|
.. doctest::
|
||||||
|
|
||||||
@@ -71,9 +68,11 @@ To receive notification on flow state changes use
|
|||||||
Task notifications
|
Task notifications
|
||||||
------------------
|
------------------
|
||||||
|
|
||||||
To receive notification on task state changes use
|
To receive notification on task state changes use the
|
||||||
:py:class:`~taskflow.utils.misc.Notifier` available as
|
:py:class:`~taskflow.utils.misc.Notifier` instance available as the
|
||||||
``task_notifier`` property of the engine. A basic example is:
|
``task_notifier`` property of an engine.
|
||||||
|
|
||||||
|
A basic example is:
|
||||||
|
|
||||||
.. doctest::
|
.. doctest::
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,11 @@ Cache
|
|||||||
|
|
||||||
.. automodule:: taskflow.types.cache
|
.. automodule:: taskflow.types.cache
|
||||||
|
|
||||||
|
Failure
|
||||||
|
=======
|
||||||
|
|
||||||
|
.. autoclass:: taskflow.utils.misc.Failure
|
||||||
|
|
||||||
FSM
|
FSM
|
||||||
===
|
===
|
||||||
|
|
||||||
@@ -17,6 +22,11 @@ Graph
|
|||||||
|
|
||||||
.. automodule:: taskflow.types.graph
|
.. automodule:: taskflow.types.graph
|
||||||
|
|
||||||
|
Notifier
|
||||||
|
========
|
||||||
|
|
||||||
|
.. autoclass:: taskflow.utils.misc.Notifier
|
||||||
|
|
||||||
Table
|
Table
|
||||||
=====
|
=====
|
||||||
|
|
||||||
|
|||||||
@@ -69,34 +69,16 @@ class Worker(object):
|
|||||||
:param url: broker url
|
:param url: broker url
|
||||||
:param exchange: broker exchange name
|
:param exchange: broker exchange name
|
||||||
:param topic: topic name under which worker is stated
|
:param topic: topic name under which worker is stated
|
||||||
:param tasks: tasks list that worker is capable to perform
|
:param tasks: task list that worker is capable of performing, items in
|
||||||
|
the list can be one of the following types; 1, a string naming the
|
||||||
Tasks list item can be one of the following types:
|
python module name to search for tasks in or the task class name; 2, a
|
||||||
1. String:
|
python module to search for tasks in; 3, a task class object that
|
||||||
|
will be used to create tasks from.
|
||||||
1.1 Python module name:
|
:param executor: custom executor object that can used for processing
|
||||||
|
requests in separate threads (if not provided one will be created)
|
||||||
> tasks=['taskflow.tests.utils']
|
:param threads_count: threads count to be passed to the default executor
|
||||||
|
:param transport: transport to be used (e.g. amqp, memory, etc.)
|
||||||
1.2. Task class (BaseTask subclass) name:
|
:param transport_options: transport specific options
|
||||||
|
|
||||||
> tasks=['taskflow.test.utils.DummyTask']
|
|
||||||
|
|
||||||
3. Python module:
|
|
||||||
|
|
||||||
> from taskflow.tests import utils
|
|
||||||
> tasks=[utils]
|
|
||||||
|
|
||||||
4. Task class (BaseTask subclass):
|
|
||||||
|
|
||||||
> from taskflow.tests import utils
|
|
||||||
> tasks=[utils.DummyTask]
|
|
||||||
|
|
||||||
:param executor: custom executor object that is used for processing
|
|
||||||
requests in separate threads
|
|
||||||
:keyword threads_count: threads count to be passed to the default executor
|
|
||||||
:keyword transport: transport to be used (e.g. amqp, memory, etc.)
|
|
||||||
:keyword transport_options: transport specific options
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, exchange, topic, tasks, executor=None, **kwargs):
|
def __init__(self, exchange, topic, tasks, executor=None, **kwargs):
|
||||||
|
|||||||
@@ -416,7 +416,10 @@ class Notifier(object):
|
|||||||
notification occurs.
|
notification occurs.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
#: Keys that can not be used in callbacks arguments
|
||||||
RESERVED_KEYS = ('details',)
|
RESERVED_KEYS = ('details',)
|
||||||
|
|
||||||
|
#: Kleene star constant that is used to recieve all notifications
|
||||||
ANY = '*'
|
ANY = '*'
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@@ -474,9 +477,9 @@ class Notifier(object):
|
|||||||
|
|
||||||
Callback will be called with provided ``args`` and ``kwargs`` and
|
Callback will be called with provided ``args`` and ``kwargs`` and
|
||||||
when event type occurs (or on any event if ``event_type`` equals to
|
when event type occurs (or on any event if ``event_type`` equals to
|
||||||
``Notifier.ANY``). It will also get additional keyword argument,
|
:attr:`.ANY`). It will also get additional keyword argument,
|
||||||
``details``, that will hold event details provided to
|
``details``, that will hold event details provided to the
|
||||||
:py:meth:`notify` method.
|
:meth:`.notify` method.
|
||||||
"""
|
"""
|
||||||
assert six.callable(callback), "Callback must be callable"
|
assert six.callable(callback), "Callback must be callable"
|
||||||
if self.is_registered(event_type, callback):
|
if self.is_registered(event_type, callback):
|
||||||
@@ -576,9 +579,10 @@ class Failure(object):
|
|||||||
remote worker throws an exception, the WBE based engine will receive that
|
remote worker throws an exception, the WBE based engine will receive that
|
||||||
exception and desire to reraise it to the user/caller of the WBE based
|
exception and desire to reraise it to the user/caller of the WBE based
|
||||||
engine for appropriate handling (this matches the behavior of non-remote
|
engine for appropriate handling (this matches the behavior of non-remote
|
||||||
engines). To accomplish this a failure object (or a to_dict() form) would
|
engines). To accomplish this a failure object (or a
|
||||||
be sent over the WBE channel and the WBE based engine would deserialize it
|
:py:meth:`~misc.Failure.to_dict` form) would be sent over the WBE channel
|
||||||
and use this objects reraise() method to cause an exception that contains
|
and the WBE based engine would deserialize it and use this objects
|
||||||
|
:meth:`.reraise` method to cause an exception that contains
|
||||||
similar/equivalent information as the original exception to be reraised,
|
similar/equivalent information as the original exception to be reraised,
|
||||||
allowing the user (or the WBE engine itself) to then handle the worker
|
allowing the user (or the WBE engine itself) to then handle the worker
|
||||||
failure/exception as they desire.
|
failure/exception as they desire.
|
||||||
@@ -642,6 +646,7 @@ class Failure(object):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_exception(cls, exception):
|
def from_exception(cls, exception):
|
||||||
|
"""Creates a failure object from a exception instance."""
|
||||||
return cls((type(exception), exception, None))
|
return cls((type(exception), exception, None))
|
||||||
|
|
||||||
def _matches(self, other):
|
def _matches(self, other):
|
||||||
@@ -652,6 +657,7 @@ class Failure(object):
|
|||||||
and self.traceback_str == other.traceback_str)
|
and self.traceback_str == other.traceback_str)
|
||||||
|
|
||||||
def matches(self, other):
|
def matches(self, other):
|
||||||
|
"""Checks if another object is equivalent to this object."""
|
||||||
if not isinstance(other, Failure):
|
if not isinstance(other, Failure):
|
||||||
return False
|
return False
|
||||||
if self.exc_info is None or other.exc_info is None:
|
if self.exc_info is None or other.exc_info is None:
|
||||||
@@ -706,9 +712,10 @@ class Failure(object):
|
|||||||
"""Re-raise exceptions if argument is not empty.
|
"""Re-raise exceptions if argument is not empty.
|
||||||
|
|
||||||
If argument is empty list, this method returns None. If
|
If argument is empty list, this method returns None. If
|
||||||
argument is list with single Failure object in it,
|
argument is a list with a single ``Failure`` object in it,
|
||||||
this failure is reraised. Else, WrappedFailure exception
|
that failure is reraised. Else, a
|
||||||
is raised with failures list as causes.
|
:class:`~taskflow.exceptions.WrappedFailure` exception
|
||||||
|
is raised with a failure list as causes.
|
||||||
"""
|
"""
|
||||||
failures = list(failures)
|
failures = list(failures)
|
||||||
if len(failures) == 1:
|
if len(failures) == 1:
|
||||||
@@ -724,7 +731,7 @@ class Failure(object):
|
|||||||
raise exc.WrappedFailure([self])
|
raise exc.WrappedFailure([self])
|
||||||
|
|
||||||
def check(self, *exc_classes):
|
def check(self, *exc_classes):
|
||||||
"""Check if any of exc_classes caused the failure.
|
"""Check if any of ``exc_classes`` caused the failure.
|
||||||
|
|
||||||
Arguments of this method can be exception types or type
|
Arguments of this method can be exception types or type
|
||||||
names (stings). If captured exception is instance of
|
names (stings). If captured exception is instance of
|
||||||
@@ -744,6 +751,7 @@ class Failure(object):
|
|||||||
return self.pformat()
|
return self.pformat()
|
||||||
|
|
||||||
def pformat(self, traceback=False):
|
def pformat(self, traceback=False):
|
||||||
|
"""Pretty formats the failure object into a string."""
|
||||||
buf = six.StringIO()
|
buf = six.StringIO()
|
||||||
buf.write(
|
buf.write(
|
||||||
'Failure: %s: %s' % (self._exc_type_names[0], self._exception_str))
|
'Failure: %s: %s' % (self._exc_type_names[0], self._exception_str))
|
||||||
@@ -766,6 +774,7 @@ class Failure(object):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_dict(cls, data):
|
def from_dict(cls, data):
|
||||||
|
"""Converts this from a dictionary to a object."""
|
||||||
data = dict(data)
|
data = dict(data)
|
||||||
version = data.pop('version', None)
|
version = data.pop('version', None)
|
||||||
if version != cls.DICT_VERSION:
|
if version != cls.DICT_VERSION:
|
||||||
@@ -774,6 +783,7 @@ class Failure(object):
|
|||||||
return cls(**data)
|
return cls(**data)
|
||||||
|
|
||||||
def to_dict(self):
|
def to_dict(self):
|
||||||
|
"""Converts this object to a dictionary."""
|
||||||
return {
|
return {
|
||||||
'exception_str': self.exception_str,
|
'exception_str': self.exception_str,
|
||||||
'traceback_str': self.traceback_str,
|
'traceback_str': self.traceback_str,
|
||||||
@@ -782,6 +792,7 @@ class Failure(object):
|
|||||||
}
|
}
|
||||||
|
|
||||||
def copy(self):
|
def copy(self):
|
||||||
|
"""Copies this object."""
|
||||||
return Failure(exc_info=copy_exc_info(self.exc_info),
|
return Failure(exc_info=copy_exc_info(self.exc_info),
|
||||||
exception_str=self.exception_str,
|
exception_str=self.exception_str,
|
||||||
traceback_str=self.traceback_str,
|
traceback_str=self.traceback_str,
|
||||||
|
|||||||
Reference in New Issue
Block a user