Files
deb-python-taskflow/doc/source/inputs_and_outputs.rst
Ivan A. Melnikov 98fade1412 Rework documentation of notifications
- move notifications docs from inputs and outputs to separate page;
- listeners and TransitionNotifier documented;
  - docstrings improved, missing docstrings added;
  - documentation put to notifications page;
- inputs and outputs page edited.

Change-Id: Ib283836173a806fbec81aa07b3292e2672bf6483
2014-03-28 23:54:56 +04:00

4.9 KiB

Inputs and Outputs

In TaskFlow there are multiple ways to provide inputs for your tasks and flows and get information from them. This document describes one of them, that involves task arguments and results. There are also notifications, which allow you to get notified when task or flow changed state. You may also opt to use persistence directly.

Flow Inputs and Outputs

Tasks accept inputs via task arguments and provide outputs via task results (see arguments_and_results for more details). This the standard and recommended way to pass data from one task to another. Of course not every task argument needs to be provided to some other task of a flow, and not every task result should be consumed by every task.

If some value is required by one or more tasks of a flow, but is not provided by any task, it is considered to be flow input, and must be put into the storage before the flow is run. A set of names required by a flow can be retrieved via that flow's requires property. These names can be used to determine what names may be applicable for placing in storage ahead of time and which names are not applicable.

All values provided by tasks of the flow are considered to be flow outputs; the set of names of such values is available via provides property of the flow.

from taskflow import task from taskflow.patterns import linear_flow from taskflow import engines

For example:

>>> class MyTask(task.Task): ... def execute(self, **kwargs): ... return 1, 2 ... >>> flow = linear_flow.Flow('test').add( ... MyTask(requires='a', provides=('b', 'c')), ... MyTask(requires='b', provides='d') ... ) >>> flow.requires set(['a']) >>> sorted(flow.provides) ['b', 'c', 'd']

As you can see, this flow does not require b, as it is provided by the fist task.

Note

There is no difference between processing of Task and Retry inputs and outputs.

Engine and Storage

The storage layer is how an engine persists flow and task details. For more in-depth design details see persistence and storage.

Inputs

As mentioned above, if some value is required by one or more tasks of a flow, but is not provided by any task, it is considered to be flow input, and must be put into the storage before the flow is run. On failure to do so :py~taskflow.exceptions.MissingDependencies is raised by engine:

>>> class CatTalk(task.Task): ... def execute(self, meow): ... print meow ... return "cat" ... >>> class DogTalk(task.Task): ... def execute(self, woof): ... print woof ... return "dog" ... >>> flo = linear_flow.Flow("cat-dog") >>> flo.add(CatTalk(), DogTalk(provides="dog")) <taskflow.patterns.linear_flow.Flow object at 0x...> >>> engines.run(flo) Traceback (most recent call last): ... taskflow.exceptions.MissingDependencies: taskflow.patterns.linear_flow.Flow: cat-dog; 2 requires ['meow', 'woof'] but no other entity produces said requirements

The recommended way to provide flow inputs is to use store parameter of engine helpers (:py~taskflow.engines.helpers.run or :py~taskflow.engines.helpers.load):

>>> class CatTalk(task.Task): ... def execute(self, meow): ... print meow ... return "cat" ... >>> class DogTalk(task.Task): ... def execute(self, woof): ... print woof ... return "dog" ... >>> flo = linear_flow.Flow("cat-dog") >>> flo.add(CatTalk(), DogTalk(provides="dog")) <taskflow.patterns.linear_flow.Flow object at 0x...> >>> engines.run(flo, store={'meow': 'meow', 'woof': 'woof'}) meow woof {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}

You can also directly interact with the engine storage layer to add additional values, also you can't use :py~taskflow.engines.helpers.run in this case:

>>> flo = linear_flow.Flow("cat-dog") >>> flo.add(CatTalk(), DogTalk(provides="dog")) <taskflow.patterns.linear_flow.Flow object at 0x...> >>> eng = engines.load(flo, store={'meow': 'meow'}) >>> eng.storage.inject({"woof": "bark"}) >>> eng.run() meow bark

Outputs

As you can see from examples above, run method returns all flow outputs in a dict. This same data can be fetched via :py~taskflow.storage.Storage.fetch_all method of the storage. You can also get single results using :py~taskflow.storage.Storage.fetch_all. For example:

>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'}) >>> eng.run() meow woof >>> print(eng.storage.fetch_all()) {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'} >>> print(eng.storage.fetch("dog")) dog