Rework documentation of notifications
- move notifications docs from inputs and outputs to separate page; - listeners and TransitionNotifier documented; - docstrings improved, missing docstrings added; - documentation put to notifications page; - inputs and outputs page edited. Change-Id: Ib283836173a806fbec81aa07b3292e2672bf6483
This commit is contained in:
@@ -13,8 +13,9 @@ Contents
|
||||
atoms
|
||||
arguments_and_results
|
||||
patterns
|
||||
inputs_and_outputs
|
||||
engines
|
||||
inputs_and_outputs
|
||||
notifications
|
||||
storage
|
||||
persistence
|
||||
exceptions
|
||||
|
@@ -2,28 +2,27 @@
|
||||
Inputs and Outputs
|
||||
==================
|
||||
|
||||
--------
|
||||
Overview
|
||||
--------
|
||||
In TaskFlow there are multiple ways to provide inputs for your tasks and flows
|
||||
and get information from them. This document describes one of them, that
|
||||
involves task arguments and results. There are also :doc:`notifications`, which
|
||||
allow you to get notified when task or flow changed state. You may also opt to
|
||||
use :doc:`persistence` directly.
|
||||
|
||||
In taskflow there are multiple ways to design how your tasks/flows and engines get inputs and produce outputs. This document will help you understand what those ways are and how to use those ways to accomplish your desired taskflow usage pattern as well as include examples that show common ways of providing input and getting output.
|
||||
-----------------------
|
||||
Flow Inputs and Outputs
|
||||
-----------------------
|
||||
|
||||
------------------------------
|
||||
Task & Flow Inputs and Outputs
|
||||
------------------------------
|
||||
Tasks accept inputs via task arguments and provide outputs via task results (see :doc:`arguments_and_results` for more details). This the standard and recommended way to pass data from one task to another. Of course not every task argument needs to be provided to some other task of a flow, and not every task result should be consumed by every task.
|
||||
|
||||
A task accept inputs via task arguments and provides outputs via task results (see :doc:`arguments_and_results` for more details). This the standard and recommended way to pass data from one task to another. Of course not every task argument needs to be provided to some other task of a flow, and not every task result should be consumed by every task.
|
||||
If some value is required by one or more tasks of a flow, but is not provided by any task, it is considered to be flow input, and **must** be put into the storage before the flow is run. A set of names required by a flow can be retrieved via that flow's ``requires`` property. These names can be used to determine what names may be applicable for placing in storage ahead of time and which names are not applicable.
|
||||
|
||||
If some value is required by one or more tasks of a flow, but is not provided by any task, it is considered to be flow input, and MUST be put into the storage before the flow is run. A set of names required by a flow can be retrieved via that flows requires property. These names can be used to determine what names may be applicable for placing in storage ahead of time and which names are not applicable.
|
||||
|
||||
All values provided by tasks of the flow are considered to be flow outputs; the set of names of such values is available via the provides property of the flow.
|
||||
All values provided by tasks of the flow are considered to be flow outputs; the set of names of such values is available via ``provides`` property of the flow.
|
||||
|
||||
.. testsetup::
|
||||
|
||||
from taskflow import task
|
||||
from taskflow.patterns import linear_flow
|
||||
from taskflow import engines
|
||||
from taskflow.listeners import printing
|
||||
|
||||
For example:
|
||||
|
||||
@@ -42,26 +41,23 @@ For example:
|
||||
>>> sorted(flow.provides)
|
||||
['b', 'c', 'd']
|
||||
|
||||
.. make vim syntax highlighter happy**
|
||||
|
||||
As you can see, this flow does not require b, as it is provided by the fist task.
|
||||
|
||||
.. note::
|
||||
There is no difference between processing of Task and Retry inputs and outputs.
|
||||
|
||||
-------------------------
|
||||
Engine Inputs and Outputs
|
||||
-------------------------
|
||||
------------------
|
||||
Engine and Storage
|
||||
------------------
|
||||
|
||||
Storage
|
||||
=======
|
||||
|
||||
The storage layer is how an engine persists flow and task details.
|
||||
|
||||
For more in-depth design details: :doc:`persistence`.
|
||||
The storage layer is how an engine persists flow and task details. For more in-depth design details see :doc:`persistence` and :doc:`storage`.
|
||||
|
||||
Inputs
|
||||
------
|
||||
|
||||
**The problem:** you should prepopulate storage with all required flow inputs before running it:
|
||||
As mentioned above, if some value is required by one or more tasks of a flow, but is not provided by any task, it is considered to be flow input, and **must** be put into the storage before the flow is run. On failure to do so :py:class:`~taskflow.exceptions.MissingDependencies` is raised by engine:
|
||||
|
||||
.. doctest::
|
||||
|
||||
@@ -84,7 +80,7 @@ Inputs
|
||||
taskflow.exceptions.MissingDependencies: taskflow.patterns.linear_flow.Flow: cat-dog;
|
||||
2 requires ['meow', 'woof'] but no other entity produces said requirements
|
||||
|
||||
**The solution:** provide necessary data via store parameter of engines.run:
|
||||
The recommended way to provide flow inputs is to use ``store`` parameter of engine helpers (:py:func:`~taskflow.engines.helpers.run` or :py:func:`~taskflow.engines.helpers.load`):
|
||||
|
||||
.. doctest::
|
||||
|
||||
@@ -106,8 +102,7 @@ Inputs
|
||||
woof
|
||||
{'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}
|
||||
|
||||
.. note::
|
||||
You can also directly interact with the engine storage layer to add additional values although you must use the load method instead.
|
||||
You can also directly interact with the engine storage layer to add additional values, also you can't use :py:func:`~taskflow.engines.helpers.run` in this case:
|
||||
|
||||
.. doctest::
|
||||
|
||||
@@ -123,9 +118,7 @@ Inputs
|
||||
Outputs
|
||||
-------
|
||||
|
||||
As you can see from examples above, run method returns all flow outputs in a dict. This same data can be fetched via fetch_all method of the storage, or in a more precise manner by using fetch method.
|
||||
|
||||
For example:
|
||||
As you can see from examples above, run method returns all flow outputs in a ``dict``. This same data can be fetched via :py:meth:`~taskflow.storage.Storage.fetch_all` method of the storage. You can also get single results using :py:meth:`~taskflow.storage.Storage.fetch_all`. For example:
|
||||
|
||||
.. doctest::
|
||||
|
||||
@@ -138,116 +131,3 @@ For example:
|
||||
>>> print(eng.storage.fetch("dog"))
|
||||
dog
|
||||
|
||||
Notifications
|
||||
=============
|
||||
|
||||
**What:** engines provide a way to receive notification on task and flow state transitions.
|
||||
|
||||
**Why:** state transition notifications are useful for monitoring, logging, metrics, debugging, affecting further engine state (and other unknown future usage).
|
||||
|
||||
Flow notifications
|
||||
------------------
|
||||
|
||||
A basic example is the following:
|
||||
|
||||
.. doctest::
|
||||
|
||||
>>> class CatTalk(task.Task):
|
||||
... def execute(self, meow):
|
||||
... print(meow)
|
||||
... return "cat"
|
||||
...
|
||||
>>> class DogTalk(task.Task):
|
||||
... def execute(self, woof):
|
||||
... print(woof)
|
||||
... return 'dog'
|
||||
...
|
||||
>>> def flow_transition(state, details):
|
||||
... print("Flow '%s' transition to state %s" % (details['flow_name'], state))
|
||||
...
|
||||
>>>
|
||||
>>> flo = linear_flow.Flow("cat-dog")
|
||||
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
|
||||
<taskflow.patterns.linear_flow.Flow object at 0x...>
|
||||
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
|
||||
>>> eng.notifier.register("*", flow_transition)
|
||||
>>> eng.run()
|
||||
Flow 'cat-dog' transition to state RUNNING
|
||||
meow
|
||||
woof
|
||||
Flow 'cat-dog' transition to state SUCCESS
|
||||
|
||||
Task notifications
|
||||
------------------
|
||||
|
||||
A basic example is the following:
|
||||
|
||||
.. doctest::
|
||||
|
||||
>>> class CatTalk(task.Task):
|
||||
... def execute(self, meow):
|
||||
... print(meow)
|
||||
... return "cat"
|
||||
...
|
||||
>>> class DogTalk(task.Task):
|
||||
... def execute(self, woof):
|
||||
... print(woof)
|
||||
... return 'dog'
|
||||
...
|
||||
>>> def task_transition(state, details):
|
||||
... print("Task '%s' transition to state %s" % (details['task_name'], state))
|
||||
...
|
||||
>>>
|
||||
>>> flo = linear_flow.Flow("cat-dog")
|
||||
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
|
||||
<taskflow.patterns.linear_flow.Flow object at 0x...>
|
||||
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
|
||||
>>> eng.task_notifier.register("*", task_transition)
|
||||
>>> eng.run()
|
||||
Task 'CatTalk' transition to state RUNNING
|
||||
meow
|
||||
Task 'CatTalk' transition to state SUCCESS
|
||||
Task 'DogTalk' transition to state RUNNING
|
||||
woof
|
||||
Task 'DogTalk' transition to state SUCCESS
|
||||
|
||||
Common notification classes
|
||||
---------------------------
|
||||
|
||||
There exists common helper classes that can be used to accomplish common ways of notifying.
|
||||
|
||||
Helper to output to stderr/stdout
|
||||
Helper to output to a logging backend
|
||||
|
||||
A basic example is the following:
|
||||
|
||||
.. doctest::
|
||||
|
||||
>>> class CatTalk(task.Task):
|
||||
... def execute(self, meow):
|
||||
... print(meow)
|
||||
... return "cat"
|
||||
...
|
||||
>>> class DogTalk(task.Task):
|
||||
... def execute(self, woof):
|
||||
... print(woof)
|
||||
... return 'dog'
|
||||
...
|
||||
>>>
|
||||
>>> flo = linear_flow.Flow("cat-dog")
|
||||
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
|
||||
<taskflow.patterns.linear_flow.Flow object at 0x...>
|
||||
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
|
||||
>>> with printing.PrintingListener(eng):
|
||||
... eng.run()
|
||||
...
|
||||
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved flow 'cat-dog' (...) into state 'RUNNING'
|
||||
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'CatTalk' (...) into state 'RUNNING'
|
||||
meow
|
||||
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'CatTalk' (...) into state 'SUCCESS' with result 'cat' (failure=False)
|
||||
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'DogTalk' (...) into state 'RUNNING'
|
||||
woof
|
||||
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'DogTalk' (...) into state 'SUCCESS' with result 'dog' (failure=False)
|
||||
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved flow 'cat-dog' (...) into state 'SUCCESS'
|
||||
|
||||
|
||||
|
166
doc/source/notifications.rst
Normal file
166
doc/source/notifications.rst
Normal file
@@ -0,0 +1,166 @@
|
||||
===========================
|
||||
Notifications and Listeners
|
||||
===========================
|
||||
|
||||
.. testsetup::
|
||||
|
||||
from taskflow import task
|
||||
from taskflow.patterns import linear_flow
|
||||
from taskflow import engines
|
||||
|
||||
--------
|
||||
Overview
|
||||
--------
|
||||
|
||||
Engines provide a way to receive notification on task and flow state
|
||||
transitions, which is useful for monitoring, logging, metrics, debugging
|
||||
and plenty of other tasks.
|
||||
|
||||
To receive these notifications you should register a callback in
|
||||
:py:class:`~taskflow.utils.misc.TransitionNotifier` provided by engine.
|
||||
Each engine provides two of them: one notifies about flow state changes,
|
||||
and another notifies about changes of tasks.
|
||||
|
||||
TaskFlow also has a set of predefined :ref:`listeners`, and provides
|
||||
means to write your own listeners, which can be more convenient than
|
||||
using raw callbacks.
|
||||
|
||||
--------------------------------------
|
||||
Receiving Notifications with Callbacks
|
||||
--------------------------------------
|
||||
|
||||
To manage notifications instances of
|
||||
:py:class:`~taskflow.utils.misc.TransitionNotifier` are used.
|
||||
|
||||
.. autoclass:: taskflow.utils.misc.TransitionNotifier
|
||||
|
||||
Flow Notifications
|
||||
------------------
|
||||
|
||||
To receive notification on flow state changes use
|
||||
:py:class:`~taskflow.utils.misc.TransitionNotifier` available as
|
||||
``notifier`` property of the engine. A basic example is:
|
||||
|
||||
.. doctest::
|
||||
|
||||
>>> class CatTalk(task.Task):
|
||||
... def execute(self, meow):
|
||||
... print(meow)
|
||||
... return "cat"
|
||||
...
|
||||
>>> class DogTalk(task.Task):
|
||||
... def execute(self, woof):
|
||||
... print(woof)
|
||||
... return 'dog'
|
||||
...
|
||||
>>> def flow_transition(state, details):
|
||||
... print("Flow '%s' transition to state %s" % (details['flow_name'], state))
|
||||
...
|
||||
>>>
|
||||
>>> flo = linear_flow.Flow("cat-dog").add(
|
||||
... CatTalk(), DogTalk(provides="dog"))
|
||||
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
|
||||
>>> eng.notifier.register("*", flow_transition)
|
||||
>>> eng.run()
|
||||
Flow 'cat-dog' transition to state RUNNING
|
||||
meow
|
||||
woof
|
||||
Flow 'cat-dog' transition to state SUCCESS
|
||||
|
||||
Task notifications
|
||||
------------------
|
||||
|
||||
To receive notification on task state changes use
|
||||
:py:class:`~taskflow.utils.misc.TransitionNotifier` available as
|
||||
``task_notifier`` property of the engine. A basic example is:
|
||||
|
||||
.. doctest::
|
||||
|
||||
>>> class CatTalk(task.Task):
|
||||
... def execute(self, meow):
|
||||
... print(meow)
|
||||
... return "cat"
|
||||
...
|
||||
>>> class DogTalk(task.Task):
|
||||
... def execute(self, woof):
|
||||
... print(woof)
|
||||
... return 'dog'
|
||||
...
|
||||
>>> def task_transition(state, details):
|
||||
... print("Task '%s' transition to state %s" % (details['task_name'], state))
|
||||
...
|
||||
>>>
|
||||
>>> flo = linear_flow.Flow("cat-dog")
|
||||
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
|
||||
<taskflow.patterns.linear_flow.Flow object at 0x...>
|
||||
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
|
||||
>>> eng.task_notifier.register("*", task_transition)
|
||||
>>> eng.run()
|
||||
Task 'CatTalk' transition to state RUNNING
|
||||
meow
|
||||
Task 'CatTalk' transition to state SUCCESS
|
||||
Task 'DogTalk' transition to state RUNNING
|
||||
woof
|
||||
Task 'DogTalk' transition to state SUCCESS
|
||||
|
||||
.. _listeners:
|
||||
|
||||
---------
|
||||
Listeners
|
||||
---------
|
||||
|
||||
TaskFlow comes with a set of predefined listeners -- helper classes that can be
|
||||
used to do various actions on flow and/or tasks transitions. You can also
|
||||
create your own listeners easily, which may be more convenient than using raw
|
||||
callbacks for some use cases.
|
||||
|
||||
For example, this is how you can use
|
||||
:py:class:`~taskflow.listeners.printing.PrintingListener`:
|
||||
|
||||
.. doctest::
|
||||
|
||||
>>> from taskflow.listeners import printing
|
||||
>>> class CatTalk(task.Task):
|
||||
... def execute(self, meow):
|
||||
... print(meow)
|
||||
... return "cat"
|
||||
...
|
||||
>>> class DogTalk(task.Task):
|
||||
... def execute(self, woof):
|
||||
... print(woof)
|
||||
... return 'dog'
|
||||
...
|
||||
>>>
|
||||
>>> flo = linear_flow.Flow("cat-dog").add(
|
||||
... CatTalk(), DogTalk(provides="dog"))
|
||||
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
|
||||
>>> with printing.PrintingListener(eng):
|
||||
... eng.run()
|
||||
...
|
||||
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved flow 'cat-dog' (...) into state 'RUNNING'
|
||||
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'CatTalk' (...) into state 'RUNNING'
|
||||
meow
|
||||
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'CatTalk' (...) into state 'SUCCESS' with result 'cat' (failure=False)
|
||||
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'DogTalk' (...) into state 'RUNNING'
|
||||
woof
|
||||
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'DogTalk' (...) into state 'SUCCESS' with result 'dog' (failure=False)
|
||||
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved flow 'cat-dog' (...) into state 'SUCCESS'
|
||||
|
||||
Basic Listener
|
||||
--------------
|
||||
|
||||
.. autoclass:: taskflow.listeners.base.ListenerBase
|
||||
|
||||
Printing and Logging Listeners
|
||||
------------------------------
|
||||
|
||||
.. autoclass:: taskflow.listeners.base.LoggingBase
|
||||
|
||||
.. autoclass:: taskflow.listeners.logging.LoggingListener
|
||||
|
||||
.. autoclass:: taskflow.listeners.printing.PrintingListener
|
||||
|
||||
Timing Listener
|
||||
---------------
|
||||
|
||||
.. autoclass:: taskflow.listeners.timing.TimingListener
|
@@ -33,11 +33,16 @@ FINISH_STATES = (states.FAILURE, states.SUCCESS)
|
||||
|
||||
|
||||
class ListenerBase(object):
|
||||
"""This provides a simple listener that can be attached to an engine which
|
||||
can be derived from to do some action on various flow and task state
|
||||
transitions. It provides a useful context manager access to be able to
|
||||
register and unregister with a given engine automatically when a context
|
||||
"""Base class for listeners.
|
||||
|
||||
A listener can be attached to an engine to do various actions on flow and
|
||||
task state transitions. It implements context manager protocol to be able
|
||||
to register and unregister with a given engine automatically when a context
|
||||
is entered and when it is exited.
|
||||
|
||||
To implement a listener, derive from this class and override
|
||||
``_flow_receiver`` and/or ``_task_receiver`` methods (in this class,
|
||||
they do nothing).
|
||||
"""
|
||||
|
||||
def __init__(self, engine,
|
||||
@@ -112,11 +117,14 @@ class ListenerBase(object):
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class LoggingBase(ListenerBase):
|
||||
"""This provides a simple listener that can be attached to an engine which
|
||||
can be derived from to log the received actions to some logging backend. It
|
||||
provides a useful context manager access to be able to register and
|
||||
unregister with a given engine automatically when a context is entered and
|
||||
when it is exited.
|
||||
"""Abstract base class for logging listeners.
|
||||
|
||||
This provides a simple listener that can be attached to an engine which can
|
||||
be derived from to log task and/or flow state transitions to some logging
|
||||
backend.
|
||||
|
||||
To implement your own logging listener derive form this class and
|
||||
override ``_log`` method.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
|
@@ -25,10 +25,12 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LoggingListener(base.LoggingBase):
|
||||
"""Listens for task and flow notifications and writes those notifications
|
||||
to a provided logging backend (if none is provided then this modules
|
||||
logger is used instead) using a configurable logging level (logging.DEBUG
|
||||
if not provided).
|
||||
"""Listener that logs notifications it receives.
|
||||
|
||||
It listens for task and flow notifications and writes those
|
||||
notifications to provided logger, or logger of its module
|
||||
(``taskflow.listeners.logging``) if none provided. Log level
|
||||
can also be configured, ``logging.DEBUG`` is used by default.
|
||||
"""
|
||||
def __init__(self, engine,
|
||||
task_listen_for=(misc.TransitionNotifier.ANY,),
|
||||
|
@@ -32,6 +32,12 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TimingListener(base.ListenerBase):
|
||||
"""Listener that captures task duration.
|
||||
|
||||
It records how long a task took to execute (or fail)
|
||||
to storage. It saves the duration in seconds as float value
|
||||
to task metadata with key ``'duration'``.
|
||||
"""
|
||||
def __init__(self, engine):
|
||||
super(TimingListener, self).__init__(engine,
|
||||
task_listen_for=WATCH_STATES,
|
||||
|
@@ -412,6 +412,7 @@ class TransitionNotifier(object):
|
||||
return count
|
||||
|
||||
def is_registered(self, state, callback):
|
||||
"""Check if a callback is registered."""
|
||||
listeners = list(self._listeners.get(state, []))
|
||||
for (cb, _args, _kwargs) in listeners:
|
||||
if reflection.is_same_callback(cb, callback):
|
||||
@@ -419,9 +420,18 @@ class TransitionNotifier(object):
|
||||
return False
|
||||
|
||||
def reset(self):
|
||||
"""Forget all previously registered callbacks."""
|
||||
self._listeners.clear()
|
||||
|
||||
def notify(self, state, details):
|
||||
"""Notify about state change.
|
||||
|
||||
All callbacks registered to receive notifications about given
|
||||
state will be called.
|
||||
|
||||
:param state: state we moved to
|
||||
:param details: addition transition details
|
||||
"""
|
||||
listeners = list(self._listeners.get(self.ANY, []))
|
||||
for i in self._listeners[state]:
|
||||
if i not in listeners:
|
||||
@@ -442,6 +452,14 @@ class TransitionNotifier(object):
|
||||
callback, state, details, exc_info=True)
|
||||
|
||||
def register(self, state, callback, args=None, kwargs=None):
|
||||
"""Register a callback to be called when state is changed.
|
||||
|
||||
Callback will be called with provided ``args`` and ``kwargs`` and
|
||||
when state is changed to ``state`` (or on any state change if
|
||||
``state`` equals to ``TransitionNotifier.ANY``). It will also
|
||||
get additional keyword argument, ``details``, that will hold
|
||||
transition details provided to :py:meth:`notify` method.
|
||||
"""
|
||||
assert six.callable(callback), "Callback must be callable"
|
||||
if self.is_registered(state, callback):
|
||||
raise ValueError("Callback %s already registered" % (callback))
|
||||
@@ -456,6 +474,7 @@ class TransitionNotifier(object):
|
||||
self._listeners[state].append((callback, args, kwargs))
|
||||
|
||||
def deregister(self, state, callback):
|
||||
"""Remove callback from listening to state ``state``."""
|
||||
if state not in self._listeners:
|
||||
return
|
||||
for i, (cb, args, kwargs) in enumerate(self._listeners[state]):
|
||||
|
Reference in New Issue
Block a user