diff --git a/doc/source/examples.rst b/doc/source/examples.rst index 9199bc11..365794af 100644 --- a/doc/source/examples.rst +++ b/doc/source/examples.rst @@ -34,6 +34,18 @@ Building a car :linenos: :lines: 16- +Watching execution timing +========================= + +.. note:: + + Full source located at :example:`timing_listener`. + +.. literalinclude:: ../../taskflow/examples/timing_listener.py + :language: python + :linenos: + :lines: 16- + Linear equation solver (explicit dependencies) ============================================== diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst index 3fe430de..249c3a7f 100644 --- a/doc/source/notifications.rst +++ b/doc/source/notifications.rst @@ -165,3 +165,5 @@ Timing listener --------------- .. autoclass:: taskflow.listeners.timing.TimingListener + +.. autoclass:: taskflow.listeners.timing.PrintingTimingListener diff --git a/taskflow/examples/timing_listener.py b/taskflow/examples/timing_listener.py new file mode 100644 index 00000000..ab53a9aa --- /dev/null +++ b/taskflow/examples/timing_listener.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import random +import sys +import time + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +from taskflow import engines +from taskflow.listeners import timing +from taskflow.patterns import linear_flow as lf +from taskflow import task + +# INTRO: in this example we will attach a listener to an engine +# and have variable run time tasks run and show how the listener will print +# out how long those tasks took (when they started and when they finished). +# +# This shows how timing metrics can be gathered (or attached onto a engine) +# after a workflow has been constructed, making it easy to gather metrics +# dynamically for situations where this kind of information is applicable (or +# even adding this information on at a later point in the future when your +# application starts to slow down). + + +class VariableTask(task.Task): + def __init__(self, name): + super(VariableTask, self).__init__(name) + self._sleepy_time = random.random() + + def execute(self): + time.sleep(self._sleepy_time) + + +f = lf.Flow('root') +f.add(VariableTask('a'), VariableTask('b'), VariableTask('c')) +e = engines.load(f) +with timing.PrintingTimingListener(e): + e.run() diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py index e21dd642..4a08256e 100644 --- a/taskflow/listeners/timing.py +++ b/taskflow/listeners/timing.py @@ -16,6 +16,7 @@ from __future__ import absolute_import +import itertools import logging from taskflow import exceptions as exc @@ -23,14 +24,21 @@ from taskflow.listeners import base from taskflow import states from taskflow.types import timing as tt -STARTING_STATES = (states.RUNNING, states.REVERTING) -FINISHED_STATES = base.FINISH_STATES + (states.REVERTED,) -WATCH_STATES = frozenset(FINISHED_STATES + STARTING_STATES + - (states.PENDING,)) +STARTING_STATES = frozenset((states.RUNNING, states.REVERTING)) +FINISHED_STATES = frozenset((base.FINISH_STATES + (states.REVERTED,))) +WATCH_STATES = frozenset(itertools.chain(FINISHED_STATES, STARTING_STATES, + [states.PENDING])) LOG = logging.getLogger(__name__) +# TODO(harlowja): get rid of this when we can just support python 3.x and use +# its print function directly instead of having to wrap it in a helper function +# due to how python 2.x print is a language built-in and not a function... +def _printer(message): + print(message) + + class TimingListener(base.ListenerBase): """Listener that captures task duration. @@ -46,11 +54,17 @@ class TimingListener(base.ListenerBase): def deregister(self): super(TimingListener, self).deregister() + # There should be none that still exist at deregistering time, so log a + # warning if there were any that somehow still got left behind... + leftover_timers = len(self._timers) + if leftover_timers: + LOG.warn("%s task(s) did not enter %s states", leftover_timers, + FINISHED_STATES) self._timers.clear() def _record_ending(self, timer, task_name): meta_update = { - 'duration': float(timer.elapsed()), + 'duration': timer.elapsed(), } try: # Don't let storage failures throw exceptions in a listener method. @@ -66,5 +80,28 @@ class TimingListener(base.ListenerBase): elif state in STARTING_STATES: self._timers[task_name] = tt.StopWatch().start() elif state in FINISHED_STATES: - if task_name in self._timers: - self._record_ending(self._timers[task_name], task_name) + timer = self._timers.pop(task_name, None) + if timer is not None: + timer.stop() + self._record_ending(timer, task_name) + + +class PrintingTimingListener(TimingListener): + """Listener that prints the start & stop timing as well as recording it.""" + + def __init__(self, engine, printer=None): + super(PrintingTimingListener, self).__init__(engine) + if printer is None: + self._printer = _printer + else: + self._printer = printer + + def _record_ending(self, timer, task_name): + super(PrintingTimingListener, self)._record_ending(timer, task_name) + self._printer("It took task '%s' %0.2f seconds to" + " finish." % (task_name, timer.elapsed())) + + def _task_receiver(self, state, details): + super(PrintingTimingListener, self)._task_receiver(state, details) + if state in STARTING_STATES: + self._printer("'%s' task started." % (details['task_name']))