From 778e210e17e2a3858f644e7f11418f6ad2c465f1 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 9 Jan 2015 16:48:37 -0800 Subject: [PATCH] Include the 'old_state' in all currently provided listeners Since the key 'old_state' is now standard across all notifications sent from flows, retries or tasks we can safely use it in the listeners without worrying about hitting key errors when it does not exist/is not provided. This also adds an example which shows how to use the dynamic logging listener to view this information. Change-Id: If4456440674347a3fe4972d9d0fa16ba8373ef9f --- doc/source/examples.rst | 12 +++++++ doc/source/notifications.rst | 12 +++---- taskflow/examples/echo_listener.py | 56 ++++++++++++++++++++++++++++++ taskflow/listeners/base.py | 18 +++++----- taskflow/listeners/logging.py | 21 ++++++----- 5 files changed, 95 insertions(+), 24 deletions(-) create mode 100644 taskflow/examples/echo_listener.py diff --git a/doc/source/examples.rst b/doc/source/examples.rst index f839e353..d30bd85f 100644 --- a/doc/source/examples.rst +++ b/doc/source/examples.rst @@ -22,6 +22,18 @@ Passing values from and to tasks :linenos: :lines: 16- +Using listeners +=============== + +.. note:: + + Full source located at :example:`echo_listener`. + +.. literalinclude:: ../../taskflow/examples/echo_listener.py + :language: python + :linenos: + :lines: 16- + Making phone calls ================== diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst index 13c550ee..3fd35e2b 100644 --- a/doc/source/notifications.rst +++ b/doc/source/notifications.rst @@ -136,14 +136,14 @@ For example, this is how you can use >>> with printing.PrintingListener(eng): ... eng.run() ... - has moved flow 'cat-dog' (...) into state 'RUNNING' - has moved task 'CatTalk' (...) into state 'RUNNING' + has moved flow 'cat-dog' (...) into state 'RUNNING' from state 'PENDING' + has moved task 'CatTalk' (...) into state 'RUNNING' from state 'PENDING' meow - has moved task 'CatTalk' (...) into state 'SUCCESS' with result 'cat' (failure=False) - has moved task 'DogTalk' (...) into state 'RUNNING' + has moved task 'CatTalk' (...) into state 'SUCCESS' from state 'RUNNING' with result 'cat' (failure=False) + has moved task 'DogTalk' (...) into state 'RUNNING' from state 'PENDING' woof - has moved task 'DogTalk' (...) into state 'SUCCESS' with result 'dog' (failure=False) - has moved flow 'cat-dog' (...) into state 'SUCCESS' + has moved task 'DogTalk' (...) into state 'SUCCESS' from state 'RUNNING' with result 'dog' (failure=False) + has moved flow 'cat-dog' (...) into state 'SUCCESS' from state 'RUNNING' Basic listener -------------- diff --git a/taskflow/examples/echo_listener.py b/taskflow/examples/echo_listener.py new file mode 100644 index 00000000..a8eebf60 --- /dev/null +++ b/taskflow/examples/echo_listener.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +logging.basicConfig(level=logging.DEBUG) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +from taskflow import engines +from taskflow.listeners import logging as logging_listener +from taskflow.patterns import linear_flow as lf +from taskflow import task + +# INTRO: This example walks through a miniature workflow which will do a +# simple echo operation; during this execution a listener is assocated with +# the engine to recieve all notifications about what the flow has performed, +# this example dumps that output to the stdout for viewing (at debug level +# to show all the information which is possible). + + +class Echo(task.Task): + def execute(self): + print(self.name) + + +# Generate the work to be done (but don't do it yet). +wf = lf.Flow('abc') +wf.add(Echo('a')) +wf.add(Echo('b')) +wf.add(Echo('c')) + +# This will associate the listener with the engine (the listener +# will automatically register for notifications with the engine and deregister +# when the context is exited). +e = engines.load(wf) +with logging_listener.DynamicLoggingListener(e): + e.run() diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index 0101e8de..42dc87e9 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -187,9 +187,9 @@ class DumpingListener(Listener): """Dumps the provided *templated* message to some output.""" def _flow_receiver(self, state, details): - self._dump("%s has moved flow '%s' (%s) into state '%s'", - self._engine, details['flow_name'], - details['flow_uuid'], state) + self._dump("%s has moved flow '%s' (%s) into state '%s'" + " from state '%s'", self._engine, details['flow_name'], + details['flow_uuid'], state, details['old_state']) def _task_receiver(self, state, details): if state in FINISH_STATES: @@ -201,14 +201,14 @@ class DumpingListener(Listener): exc_info = tuple(result.exc_info) was_failure = True self._dump("%s has moved task '%s' (%s) into state '%s'" - " with result '%s' (failure=%s)", + " from state '%s' with result '%s' (failure=%s)", self._engine, details['task_name'], - details['task_uuid'], state, result, was_failure, - exc_info=exc_info) + details['task_uuid'], state, details['old_state'], + result, was_failure, exc_info=exc_info) else: - self._dump("%s has moved task '%s' (%s) into state '%s'", - self._engine, details['task_name'], - details['task_uuid'], state) + self._dump("%s has moved task '%s' (%s) into state '%s'" + " from state '%s'", self._engine, details['task_name'], + details['task_uuid'], state, details['old_state']) # TODO(harlowja): remove in 0.7 or later... diff --git a/taskflow/listeners/logging.py b/taskflow/listeners/logging.py index 03055257..3245d2ba 100644 --- a/taskflow/listeners/logging.py +++ b/taskflow/listeners/logging.py @@ -167,8 +167,9 @@ class DynamicLoggingListener(base.Listener): exc_info, exc_details = self._format_failure(result) self._logger.log(self._failure_level, "Task '%s' (%s) transitioned into state" - " '%s'%s", details['task_name'], - details['task_uuid'], state, exc_details, + " '%s' from state '%s'%s", + details['task_name'], details['task_uuid'], + state, details['old_state'], exc_details, exc_info=exc_info) else: # Otherwise, depending on the enabled logging level/state we @@ -178,17 +179,19 @@ class DynamicLoggingListener(base.Listener): if (_isEnabledFor(self._logger, self._level) or state == states.FAILURE): self._logger.log(level, "Task '%s' (%s) transitioned into" - " state '%s' with result '%s'", - details['task_name'], + " state '%s' from state '%s' with" + " result '%s'", details['task_name'], details['task_uuid'], state, - result) + details['old_state'], result) else: self._logger.log(level, "Task '%s' (%s) transitioned into" - " state '%s'", details['task_name'], - details['task_uuid'], state) + " state '%s' from state '%s'", + details['task_name'], + details['task_uuid'], state, + details['old_state']) else: # Just a intermediary state, carry on! level = self._task_log_levels.get(state, self._level) self._logger.log(level, "Task '%s' (%s) transitioned into state" - " '%s'", details['task_name'], - details['task_uuid'], state) + " '%s' from state '%s'", details['task_name'], + details['task_uuid'], state, details['old_state'])