Include the 'old_state' in all currently provided listeners

Since the key 'old_state' is now standard across all notifications
sent from flows, retries or tasks we can safely use it in the listeners
without worrying about hitting key errors when it does not exist/is not
provided.

This also adds an example which shows how to use the dynamic logging
listener to view this information.

Change-Id: If4456440674347a3fe4972d9d0fa16ba8373ef9f
This commit is contained in:
Joshua Harlow
2015-01-09 16:48:37 -08:00
committed by Joshua Harlow
parent 96e6d971a0
commit 778e210e17
5 changed files with 95 additions and 24 deletions

View File

@@ -22,6 +22,18 @@ Passing values from and to tasks
:linenos:
:lines: 16-
Using listeners
===============
.. note::
Full source located at :example:`echo_listener`.
.. literalinclude:: ../../taskflow/examples/echo_listener.py
:language: python
:linenos:
:lines: 16-
Making phone calls
==================

View File

@@ -136,14 +136,14 @@ For example, this is how you can use
>>> with printing.PrintingListener(eng):
... eng.run()
...
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved flow 'cat-dog' (...) into state 'RUNNING'
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'CatTalk' (...) into state 'RUNNING'
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved flow 'cat-dog' (...) into state 'RUNNING' from state 'PENDING'
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'CatTalk' (...) into state 'RUNNING' from state 'PENDING'
meow
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'CatTalk' (...) into state 'SUCCESS' with result 'cat' (failure=False)
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'DogTalk' (...) into state 'RUNNING'
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'CatTalk' (...) into state 'SUCCESS' from state 'RUNNING' with result 'cat' (failure=False)
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'DogTalk' (...) into state 'RUNNING' from state 'PENDING'
woof
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'DogTalk' (...) into state 'SUCCESS' with result 'dog' (failure=False)
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved flow 'cat-dog' (...) into state 'SUCCESS'
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'DogTalk' (...) into state 'SUCCESS' from state 'RUNNING' with result 'dog' (failure=False)
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved flow 'cat-dog' (...) into state 'SUCCESS' from state 'RUNNING'
Basic listener
--------------

View File

@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import sys
logging.basicConfig(level=logging.DEBUG)
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
os.pardir))
sys.path.insert(0, top_dir)
from taskflow import engines
from taskflow.listeners import logging as logging_listener
from taskflow.patterns import linear_flow as lf
from taskflow import task
# INTRO: This example walks through a miniature workflow which will do a
# simple echo operation; during this execution a listener is assocated with
# the engine to recieve all notifications about what the flow has performed,
# this example dumps that output to the stdout for viewing (at debug level
# to show all the information which is possible).
class Echo(task.Task):
def execute(self):
print(self.name)
# Generate the work to be done (but don't do it yet).
wf = lf.Flow('abc')
wf.add(Echo('a'))
wf.add(Echo('b'))
wf.add(Echo('c'))
# This will associate the listener with the engine (the listener
# will automatically register for notifications with the engine and deregister
# when the context is exited).
e = engines.load(wf)
with logging_listener.DynamicLoggingListener(e):
e.run()

View File

@@ -187,9 +187,9 @@ class DumpingListener(Listener):
"""Dumps the provided *templated* message to some output."""
def _flow_receiver(self, state, details):
self._dump("%s has moved flow '%s' (%s) into state '%s'",
self._engine, details['flow_name'],
details['flow_uuid'], state)
self._dump("%s has moved flow '%s' (%s) into state '%s'"
" from state '%s'", self._engine, details['flow_name'],
details['flow_uuid'], state, details['old_state'])
def _task_receiver(self, state, details):
if state in FINISH_STATES:
@@ -201,14 +201,14 @@ class DumpingListener(Listener):
exc_info = tuple(result.exc_info)
was_failure = True
self._dump("%s has moved task '%s' (%s) into state '%s'"
" with result '%s' (failure=%s)",
" from state '%s' with result '%s' (failure=%s)",
self._engine, details['task_name'],
details['task_uuid'], state, result, was_failure,
exc_info=exc_info)
details['task_uuid'], state, details['old_state'],
result, was_failure, exc_info=exc_info)
else:
self._dump("%s has moved task '%s' (%s) into state '%s'",
self._engine, details['task_name'],
details['task_uuid'], state)
self._dump("%s has moved task '%s' (%s) into state '%s'"
" from state '%s'", self._engine, details['task_name'],
details['task_uuid'], state, details['old_state'])
# TODO(harlowja): remove in 0.7 or later...

View File

@@ -167,8 +167,9 @@ class DynamicLoggingListener(base.Listener):
exc_info, exc_details = self._format_failure(result)
self._logger.log(self._failure_level,
"Task '%s' (%s) transitioned into state"
" '%s'%s", details['task_name'],
details['task_uuid'], state, exc_details,
" '%s' from state '%s'%s",
details['task_name'], details['task_uuid'],
state, details['old_state'], exc_details,
exc_info=exc_info)
else:
# Otherwise, depending on the enabled logging level/state we
@@ -178,17 +179,19 @@ class DynamicLoggingListener(base.Listener):
if (_isEnabledFor(self._logger, self._level)
or state == states.FAILURE):
self._logger.log(level, "Task '%s' (%s) transitioned into"
" state '%s' with result '%s'",
details['task_name'],
" state '%s' from state '%s' with"
" result '%s'", details['task_name'],
details['task_uuid'], state,
result)
details['old_state'], result)
else:
self._logger.log(level, "Task '%s' (%s) transitioned into"
" state '%s'", details['task_name'],
details['task_uuid'], state)
" state '%s' from state '%s'",
details['task_name'],
details['task_uuid'], state,
details['old_state'])
else:
# Just a intermediary state, carry on!
level = self._task_log_levels.get(state, self._level)
self._logger.log(level, "Task '%s' (%s) transitioned into state"
" '%s'", details['task_name'],
details['task_uuid'], state)
" '%s' from state '%s'", details['task_name'],
details['task_uuid'], state, details['old_state'])