Add a timing listener that also prints the results

Instead of just recording them it can also be quite
useful (especially for debugging) to print the start
and stop timings as they occur.

Also adds an example that shows how this can be used
and an explanation of why it is useful to have this type
of capability.

Part of blueprint more-examples

Change-Id: Id2dc3f8dc9ac94e511470e39f499f325b33537ee
This commit is contained in:
Joshua Harlow
2014-07-08 14:51:33 -07:00
parent 3465e0340b
commit 6bbf85b5a5
4 changed files with 117 additions and 7 deletions

View File

@@ -34,6 +34,18 @@ Building a car
:linenos:
:lines: 16-
Watching execution timing
=========================
.. note::
Full source located at :example:`timing_listener`.
.. literalinclude:: ../../taskflow/examples/timing_listener.py
:language: python
:linenos:
:lines: 16-
Linear equation solver (explicit dependencies)
==============================================

View File

@@ -165,3 +165,5 @@ Timing listener
---------------
.. autoclass:: taskflow.listeners.timing.TimingListener
.. autoclass:: taskflow.listeners.timing.PrintingTimingListener

View File

@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import random
import sys
import time
logging.basicConfig(level=logging.ERROR)
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
os.pardir))
sys.path.insert(0, top_dir)
from taskflow import engines
from taskflow.listeners import timing
from taskflow.patterns import linear_flow as lf
from taskflow import task
# INTRO: in this example we will attach a listener to an engine
# and have variable run time tasks run and show how the listener will print
# out how long those tasks took (when they started and when they finished).
#
# This shows how timing metrics can be gathered (or attached onto a engine)
# after a workflow has been constructed, making it easy to gather metrics
# dynamically for situations where this kind of information is applicable (or
# even adding this information on at a later point in the future when your
# application starts to slow down).
class VariableTask(task.Task):
def __init__(self, name):
super(VariableTask, self).__init__(name)
self._sleepy_time = random.random()
def execute(self):
time.sleep(self._sleepy_time)
f = lf.Flow('root')
f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
e = engines.load(f)
with timing.PrintingTimingListener(e):
e.run()

View File

@@ -16,6 +16,7 @@
from __future__ import absolute_import
import itertools
import logging
from taskflow import exceptions as exc
@@ -23,14 +24,21 @@ from taskflow.listeners import base
from taskflow import states
from taskflow.types import timing as tt
STARTING_STATES = (states.RUNNING, states.REVERTING)
FINISHED_STATES = base.FINISH_STATES + (states.REVERTED,)
WATCH_STATES = frozenset(FINISHED_STATES + STARTING_STATES +
(states.PENDING,))
STARTING_STATES = frozenset((states.RUNNING, states.REVERTING))
FINISHED_STATES = frozenset((base.FINISH_STATES + (states.REVERTED,)))
WATCH_STATES = frozenset(itertools.chain(FINISHED_STATES, STARTING_STATES,
[states.PENDING]))
LOG = logging.getLogger(__name__)
# TODO(harlowja): get rid of this when we can just support python 3.x and use
# its print function directly instead of having to wrap it in a helper function
# due to how python 2.x print is a language built-in and not a function...
def _printer(message):
print(message)
class TimingListener(base.ListenerBase):
"""Listener that captures task duration.
@@ -46,11 +54,17 @@ class TimingListener(base.ListenerBase):
def deregister(self):
super(TimingListener, self).deregister()
# There should be none that still exist at deregistering time, so log a
# warning if there were any that somehow still got left behind...
leftover_timers = len(self._timers)
if leftover_timers:
LOG.warn("%s task(s) did not enter %s states", leftover_timers,
FINISHED_STATES)
self._timers.clear()
def _record_ending(self, timer, task_name):
meta_update = {
'duration': float(timer.elapsed()),
'duration': timer.elapsed(),
}
try:
# Don't let storage failures throw exceptions in a listener method.
@@ -66,5 +80,28 @@ class TimingListener(base.ListenerBase):
elif state in STARTING_STATES:
self._timers[task_name] = tt.StopWatch().start()
elif state in FINISHED_STATES:
if task_name in self._timers:
self._record_ending(self._timers[task_name], task_name)
timer = self._timers.pop(task_name, None)
if timer is not None:
timer.stop()
self._record_ending(timer, task_name)
class PrintingTimingListener(TimingListener):
"""Listener that prints the start & stop timing as well as recording it."""
def __init__(self, engine, printer=None):
super(PrintingTimingListener, self).__init__(engine)
if printer is None:
self._printer = _printer
else:
self._printer = printer
def _record_ending(self, timer, task_name):
super(PrintingTimingListener, self)._record_ending(timer, task_name)
self._printer("It took task '%s' %0.2f seconds to"
" finish." % (task_name, timer.elapsed()))
def _task_receiver(self, state, details):
super(PrintingTimingListener, self)._task_receiver(state, details)
if state in STARTING_STATES:
self._printer("'%s' task started." % (details['task_name']))