==================
Inputs and Outputs
==================

In TaskFlow there are multiple ways to provide inputs for your tasks and flows
and to get information back from them. This document describes one of them,
which involves task arguments and results. There are also :doc:`notifications`,
which allow you to get notified when a task or flow changes state. You may also
opt to use :doc:`persistence` directly.

-----------------------
Flow Inputs and Outputs
-----------------------

Tasks accept inputs via task arguments and provide outputs via task results
(see :doc:`arguments_and_results` for more details). This is the standard and
recommended way to pass data from one task to another. Of course, not every
task argument needs to be provided by another task of the flow, and not every
task result needs to be consumed by every task.

If some value is required by one or more tasks of a flow, but is not provided
by any task, it is considered to be a flow input, and **must** be put into the
storage before the flow is run. The set of names required by a flow can be
retrieved via that flow's ``requires`` property. These names can be used to
determine which names may need to be placed in storage ahead of time and which
names are not applicable.

All values provided by tasks of the flow are considered to be flow outputs; the
set of names of such values is available via the ``provides`` property of the
flow.

.. testsetup::

    from taskflow import task
    from taskflow.patterns import linear_flow
    from taskflow import engines

For example:

.. doctest::

    >>> class MyTask(task.Task):
    ...     def execute(self, **kwargs):
    ...         return 1, 2
    ...
    >>> flow = linear_flow.Flow('test').add(
    ...     MyTask(requires='a', provides=('b', 'c')),
    ...     MyTask(requires='b', provides='d')
    ... )
    >>> sorted(flow.requires)
    ['a']
    >>> sorted(flow.provides)
    ['b', 'c', 'd']

.. make vim syntax highlighter happy**

As you can see, this flow does not require ``b``, as it is provided by the
first task.

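
The reason ``b`` is absent from ``requires`` can be pictured as set arithmetic.
The following is a minimal sketch in plain Python, not TaskFlow's actual
implementation: ``flow_requires_and_provides`` and the tuple layout are
hypothetical names chosen for illustration, and the sketch deliberately ignores
task ordering.

.. code-block:: python

    def flow_requires_and_provides(tasks):
        """Combine per-task (requires, provides) pairs into flow-level sets.

        Hypothetical helper: a name counts as a flow input only if no task
        in the flow provides it.
        """
        required = set()
        provided = set()
        for task_requires, task_provides in tasks:
            required |= set(task_requires)
            provided |= set(task_provides)
        # Names that some task produces are not flow inputs.
        return required - provided, provided

    # The two tasks from the example above, written as plain tuples.
    tasks = [
        (('a',), ('b', 'c')),
        (('b',), ('d',)),
    ]
    flow_requires, flow_provides = flow_requires_and_provides(tasks)
    print(sorted(flow_requires))   # ['a']
    print(sorted(flow_provides))   # ['b', 'c', 'd']

Under these assumptions ``b`` drops out of the required set because the first
tuple lists it among its provided names.
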
.. note::

    Task and Retry inputs and outputs are processed in the same way.

------------------
Engine and Storage
------------------

The storage layer is how an engine persists flow and task details. For more
in-depth design details see :doc:`persistence`.

Inputs
------

As mentioned above, if some value is required by one or more tasks of a flow,
but is not provided by any task, it is considered to be a flow input, and
**must** be put into the storage before the flow is run. If this is not done,
the engine raises :py:class:`~taskflow.exceptions.MissingDependencies` before
running the flow:

.. doctest::

    >>> class CatTalk(task.Task):
    ...     def execute(self, meow):
    ...         print(meow)
    ...         return "cat"
    ...
    >>> class DogTalk(task.Task):
    ...     def execute(self, woof):
    ...         print(woof)
    ...         return "dog"
    ...
    >>> flo = linear_flow.Flow("cat-dog")
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
    <taskflow.patterns.linear_flow.Flow object at 0x...>
    >>> engines.run(flo)
    Traceback (most recent call last):
       ...
    taskflow.exceptions.MissingDependencies: taskflow.patterns.linear_flow.Flow: cat-dog;
    2 requires ['meow', 'woof'] but no other entity produces said requirements

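
To catch this situation before handing the flow to an engine, the names in the
flow's ``requires`` property (described earlier) can be compared with the values
you plan to put into storage. The following is a minimal sketch that assumes
only that the flow object exposes ``requires`` as a collection of names;
``missing_inputs`` is a hypothetical helper, and ``FakeFlow`` is a stand-in used
so that the snippet runs without TaskFlow installed.

.. code-block:: python

    from collections import namedtuple

    # Stand-in for a flow object; only the ``requires`` attribute is modelled.
    FakeFlow = namedtuple('FakeFlow', ['requires'])


    def missing_inputs(flow, store):
        """Return the sorted names the flow requires but the store lacks."""
        return sorted(set(flow.requires) - set(store))


    flo_like = FakeFlow(requires={'meow', 'woof'})
    print(missing_inputs(flo_like, {'meow': 'meow'}))                  # ['woof']
    print(missing_inputs(flo_like, {'meow': 'meow', 'woof': 'woof'}))  # []

An empty list means every flow input has a value, so under these assumptions
the dependency check described above should not fail for lack of inputs.
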
The recommended way to provide flow inputs is to use the ``store`` parameter
of the engine helpers (:py:func:`~taskflow.engines.helpers.run` or
:py:func:`~taskflow.engines.helpers.load`):

.. doctest::

    >>> class CatTalk(task.Task):
    ...     def execute(self, meow):
    ...         print(meow)
    ...         return "cat"
    ...
    >>> class DogTalk(task.Task):
    ...     def execute(self, woof):
    ...         print(woof)
    ...         return "dog"
    ...
    >>> flo = linear_flow.Flow("cat-dog")
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
    <taskflow.patterns.linear_flow.Flow object at 0x...>
    >>> engines.run(flo, store={'meow': 'meow', 'woof': 'woof'})
    meow
    woof
    {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}

You can also interact with the engine storage layer directly to add
additional values. Note that if this route is used, you can't use
:py:func:`~taskflow.engines.helpers.run` to run your engine; instead, you must
call the engine's ``run`` method directly:

.. doctest::

    >>> flo = linear_flow.Flow("cat-dog")
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
    <taskflow.patterns.linear_flow.Flow object at 0x...>
    >>> eng = engines.load(flo, store={'meow': 'meow'})
    >>> eng.storage.inject({"woof": "bark"})
    >>> eng.run()
    meow
    bark

Outputs
-------

As you can see from the examples above, the
:py:func:`~taskflow.engines.helpers.run` helper returns all flow outputs in
a ``dict``. The same data can be fetched via the
:py:meth:`~taskflow.storage.Storage.fetch_all` method of the storage. You can
also get single results using :py:meth:`~taskflow.storage.Storage.fetch`. For
example:

.. doctest::

    >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
    >>> eng.run()
    meow
    woof
    >>> print(eng.storage.fetch_all())
    {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}
    >>> print(eng.storage.fetch("dog"))
    dog

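
Note that the fetched ``dict`` contains the injected inputs (``meow`` and
``woof``) as well as the task result (``dog``). If only task-produced values are
wanted, one option is to filter the results by the flow's ``provides`` names.
The following is a minimal sketch in plain Python using the values from the
example above; ``only_outputs`` is a hypothetical helper, not part of TaskFlow,
and the ``provides`` set is written out by hand on the assumption that
``dog`` is the only name the cat-dog flow provides.

.. code-block:: python

    def only_outputs(results, provides):
        """Keep only the entries whose names were provided by tasks."""
        return {name: value for name, value in results.items()
                if name in provides}


    # Values copied from the fetch_all() example above.
    results = {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}
    # Assumed contents of the flow's ``provides`` property for this flow.
    provides = {'dog'}

    print(only_outputs(results, provides))   # {'dog': 'dog'}
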