From 6bbf85b5a50437a65a8ce2acba9eb73c5003ff78 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 8 Jul 2014 14:51:33 -0700 Subject: [PATCH] Add a timing listener that also prints the results Instead of just recording them it can also be quite useful (especially for debugging) to print the start and stop timings as they occur. Also adds an example that shows how this can be used and an explanation of why it is useful to have this type of capability. Part of blueprint more-examples Change-Id: Id2dc3f8dc9ac94e511470e39f499f325b33537ee --- doc/source/examples.rst | 12 ++++++ doc/source/notifications.rst | 2 + taskflow/examples/timing_listener.py | 59 ++++++++++++++++++++++++++++ taskflow/listeners/timing.py | 51 ++++++++++++++++++++---- 4 files changed, 117 insertions(+), 7 deletions(-) create mode 100644 taskflow/examples/timing_listener.py diff --git a/doc/source/examples.rst b/doc/source/examples.rst index 9199bc11..365794af 100644 --- a/doc/source/examples.rst +++ b/doc/source/examples.rst @@ -34,6 +34,18 @@ Building a car :linenos: :lines: 16- +Watching execution timing +========================= + +.. note:: + + Full source located at :example:`timing_listener`. + +.. literalinclude:: ../../taskflow/examples/timing_listener.py + :language: python + :linenos: + :lines: 16- + Linear equation solver (explicit dependencies) ============================================== diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst index 3fe430de..249c3a7f 100644 --- a/doc/source/notifications.rst +++ b/doc/source/notifications.rst @@ -165,3 +165,5 @@ Timing listener --------------- .. autoclass:: taskflow.listeners.timing.TimingListener + +.. autoclass:: taskflow.listeners.timing.PrintingTimingListener diff --git a/taskflow/examples/timing_listener.py b/taskflow/examples/timing_listener.py new file mode 100644 index 00000000..ab53a9aa --- /dev/null +++ b/taskflow/examples/timing_listener.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import random +import sys +import time + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +from taskflow import engines +from taskflow.listeners import timing +from taskflow.patterns import linear_flow as lf +from taskflow import task + +# INTRO: in this example we will attach a listener to an engine +# and have variable run time tasks run and show how the listener will print +# out how long those tasks took (when they started and when they finished). +# +# This shows how timing metrics can be gathered (or attached onto a engine) +# after a workflow has been constructed, making it easy to gather metrics +# dynamically for situations where this kind of information is applicable (or +# even adding this information on at a later point in the future when your +# application starts to slow down). + + +class VariableTask(task.Task): + def __init__(self, name): + super(VariableTask, self).__init__(name) + self._sleepy_time = random.random() + + def execute(self): + time.sleep(self._sleepy_time) + + +f = lf.Flow('root') +f.add(VariableTask('a'), VariableTask('b'), VariableTask('c')) +e = engines.load(f) +with timing.PrintingTimingListener(e): + e.run() diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py index e21dd642..4a08256e 100644 --- a/taskflow/listeners/timing.py +++ b/taskflow/listeners/timing.py @@ -16,6 +16,7 @@ from __future__ import absolute_import +import itertools import logging from taskflow import exceptions as exc @@ -23,14 +24,21 @@ from taskflow.listeners import base from taskflow import states from taskflow.types import timing as tt -STARTING_STATES = (states.RUNNING, states.REVERTING) -FINISHED_STATES = base.FINISH_STATES + (states.REVERTED,) -WATCH_STATES = frozenset(FINISHED_STATES + STARTING_STATES + - (states.PENDING,)) +STARTING_STATES = frozenset((states.RUNNING, states.REVERTING)) +FINISHED_STATES = frozenset((base.FINISH_STATES + (states.REVERTED,))) +WATCH_STATES = frozenset(itertools.chain(FINISHED_STATES, STARTING_STATES, + [states.PENDING])) LOG = logging.getLogger(__name__) +# TODO(harlowja): get rid of this when we can just support python 3.x and use +# its print function directly instead of having to wrap it in a helper function +# due to how python 2.x print is a language built-in and not a function... +def _printer(message): + print(message) + + class TimingListener(base.ListenerBase): """Listener that captures task duration. @@ -46,11 +54,17 @@ class TimingListener(base.ListenerBase): def deregister(self): super(TimingListener, self).deregister() + # There should be none that still exist at deregistering time, so log a + # warning if there were any that somehow still got left behind... + leftover_timers = len(self._timers) + if leftover_timers: + LOG.warn("%s task(s) did not enter %s states", leftover_timers, + FINISHED_STATES) self._timers.clear() def _record_ending(self, timer, task_name): meta_update = { - 'duration': float(timer.elapsed()), + 'duration': timer.elapsed(), } try: # Don't let storage failures throw exceptions in a listener method. @@ -66,5 +80,28 @@ class TimingListener(base.ListenerBase): elif state in STARTING_STATES: self._timers[task_name] = tt.StopWatch().start() elif state in FINISHED_STATES: - if task_name in self._timers: - self._record_ending(self._timers[task_name], task_name) + timer = self._timers.pop(task_name, None) + if timer is not None: + timer.stop() + self._record_ending(timer, task_name) + + +class PrintingTimingListener(TimingListener): + """Listener that prints the start & stop timing as well as recording it.""" + + def __init__(self, engine, printer=None): + super(PrintingTimingListener, self).__init__(engine) + if printer is None: + self._printer = _printer + else: + self._printer = printer + + def _record_ending(self, timer, task_name): + super(PrintingTimingListener, self)._record_ending(timer, task_name) + self._printer("It took task '%s' %0.2f seconds to" + " finish." % (task_name, timer.elapsed())) + + def _task_receiver(self, state, details): + super(PrintingTimingListener, self)._task_receiver(state, details) + if state in STARTING_STATES: + self._printer("'%s' task started." % (details['task_name']))