Add flow durations to DurationListener

This modifies the private interface a little so if
people have custom subclasses, it will require them
to modify their _record_ending function to take additional
arguments.  Afaik, we're the only ones insane enough to
do such a thing and we're ok with the change.

Change-Id: I24444d65a566023afcd1546e4f784ecae36d6126
Fixes: #1510248
This commit is contained in:
Greg Hill
2015-10-28 12:01:36 -05:00
parent 0095b0439a
commit 634f8a5923
2 changed files with 55 additions and 25 deletions

View File

@@ -17,11 +17,13 @@
from __future__ import absolute_import from __future__ import absolute_import
import itertools import itertools
import six
import time import time
from debtcollector import moves from debtcollector import moves
from oslo_utils import timeutils from oslo_utils import timeutils
from taskflow.engines.action_engine import compiler as co
from taskflow import exceptions as exc from taskflow import exceptions as exc
from taskflow.listeners import base from taskflow.listeners import base
from taskflow import logging from taskflow import logging
@@ -52,41 +54,54 @@ class DurationListener(base.Listener):
def __init__(self, engine): def __init__(self, engine):
super(DurationListener, self).__init__(engine, super(DurationListener, self).__init__(engine,
task_listen_for=WATCH_STATES, task_listen_for=WATCH_STATES,
flow_listen_for=[]) flow_listen_for=WATCH_STATES)
self._timers = {} self._timers = {co.TASK: {}, co.FLOW: {}}
def deregister(self): def deregister(self):
super(DurationListener, self).deregister() super(DurationListener, self).deregister()
# There should be none that still exist at deregistering time, so log a # There should be none that still exist at deregistering time, so log a
# warning if there were any that somehow still got left behind... # warning if there were any that somehow still got left behind...
leftover_timers = len(self._timers) for item_type, timers in six.iteritems(self._timers):
leftover_timers = len(timers)
if leftover_timers: if leftover_timers:
LOG.warn("%s task(s) did not enter %s states", leftover_timers, LOG.warn("%s %s(s) did not enter %s states", leftover_timers,
FINISHED_STATES) item_type, FINISHED_STATES)
self._timers.clear() timers.clear()
def _record_ending(self, timer, task_name): def _record_ending(self, timer, item_type, item_name, state):
meta_update = { meta_update = {
'duration': timer.elapsed(), 'duration': timer.elapsed(),
} }
try: try:
storage = self._engine.storage
# Don't let storage failures throw exceptions in a listener method. # Don't let storage failures throw exceptions in a listener method.
self._engine.storage.update_atom_metadata(task_name, meta_update) if item_type == co.FLOW:
storage.update_flow_metadata(meta_update)
else:
storage.update_atom_metadata(item_name, meta_update)
except exc.StorageFailure: except exc.StorageFailure:
LOG.warn("Failure to store duration update %s for task %s", LOG.warn("Failure to store duration update %s for %s %s",
meta_update, task_name, exc_info=True) meta_update, item_type, item_name, exc_info=True)
def _task_receiver(self, state, details): def _task_receiver(self, state, details):
task_name = details['task_name'] task_name = details['task_name']
self._receiver(co.TASK, task_name, state)
def _flow_receiver(self, state, details):
flow_name = details['flow_name']
self._receiver(co.FLOW, flow_name, state)
def _receiver(self, item_type, item_name, state):
if state == states.PENDING: if state == states.PENDING:
self._timers.pop(task_name, None) self._timers[item_type].pop(item_name, None)
elif state in STARTING_STATES: elif state in STARTING_STATES:
self._timers[task_name] = timeutils.StopWatch().start() self._timers[item_type][item_name] = timeutils.StopWatch().start()
elif state in FINISHED_STATES: elif state in FINISHED_STATES:
timer = self._timers.pop(task_name, None) timer = self._timers[item_type].pop(item_name, None)
if timer is not None: if timer is not None:
timer.stop() timer.stop()
self._record_ending(timer, task_name) self._record_ending(timer, item_type, item_name, state)
TimingListener = moves.moved_class(DurationListener, TimingListener = moves.moved_class(DurationListener,
@@ -104,15 +119,17 @@ class PrintingDurationListener(DurationListener):
else: else:
self._printer = printer self._printer = printer
def _record_ending(self, timer, task_name): def _record_ending(self, timer, item_type, item_name, state):
super(PrintingDurationListener, self)._record_ending(timer, task_name) super(PrintingDurationListener, self)._record_ending(
self._printer("It took task '%s' %0.2f seconds to" timer, item_type, item_name, state)
" finish." % (task_name, timer.elapsed())) self._printer("It took %s '%s' %0.2f seconds to"
" finish." % (item_type, item_name, timer.elapsed()))
def _task_receiver(self, state, details): def _receiver(self, item_type, item_name, state):
super(PrintingDurationListener, self)._task_receiver(state, details) super(PrintingDurationListener, self)._receiver(item_type,
item_name, state)
if state in STARTING_STATES: if state in STARTING_STATES:
self._printer("'%s' task started." % (details['task_name'])) self._printer("'%s' %s started." % (item_name, item_type))
PrintingTimingListener = moves.moved_class( PrintingTimingListener = moves.moved_class(

View File

@@ -214,7 +214,7 @@ class TestDurationListener(test.TestCase, EngineMakerMixin):
l.register() l.register()
l.deregister() l.deregister()
def test_duration(self): def test_task_duration(self):
with contextlib.closing(impl_memory.MemoryBackend()) as be: with contextlib.closing(impl_memory.MemoryBackend()) as be:
flow = lf.Flow("test") flow = lf.Flow("test")
flow.add(SleepyTask("test-1", sleep_for=0.1)) flow.add(SleepyTask("test-1", sleep_for=0.1))
@@ -229,6 +229,19 @@ class TestDurationListener(test.TestCase, EngineMakerMixin):
self.assertIn('duration', td.meta) self.assertIn('duration', td.meta)
self.assertGreaterEqual(0.1, td.meta['duration']) self.assertGreaterEqual(0.1, td.meta['duration'])
def test_flow_duration(self):
with contextlib.closing(impl_memory.MemoryBackend()) as be:
flow = lf.Flow("test")
flow.add(SleepyTask("test-1", sleep_for=0.1))
(lb, fd) = persistence_utils.temporary_flow_detail(be)
e = self._make_engine(flow, fd, be)
with timing.DurationListener(e):
e.run()
self.assertIsNotNone(fd)
self.assertIsNotNone(fd.meta)
self.assertIn('duration', fd.meta)
self.assertGreaterEqual(0.1, fd.meta['duration'])
@mock.patch.object(timing.LOG, 'warn') @mock.patch.object(timing.LOG, 'warn')
def test_record_ending_exception(self, mocked_warn): def test_record_ending_exception(self, mocked_warn):
with contextlib.closing(impl_memory.MemoryBackend()) as be: with contextlib.closing(impl_memory.MemoryBackend()) as be:
@@ -242,8 +255,8 @@ class TestDurationListener(test.TestCase, EngineMakerMixin):
mocked_uam.side_effect = exc.StorageFailure('Woot!') mocked_uam.side_effect = exc.StorageFailure('Woot!')
with duration_listener: with duration_listener:
e.run() e.run()
mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'test-1', mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'task',
exc_info=True) 'test-1', exc_info=True)
class TestEventTimeListener(test.TestCase, EngineMakerMixin): class TestEventTimeListener(test.TestCase, EngineMakerMixin):