Merge "Add flow durations to DurationListener"

This commit is contained in:
Jenkins 2016-01-20 20:23:11 +00:00 committed by Gerrit Code Review
commit 1b45e5eaec
2 changed files with 55 additions and 25 deletions

View File

@ -17,11 +17,13 @@
from __future__ import absolute_import
import itertools
import six
import time
from debtcollector import moves
from oslo_utils import timeutils
from taskflow.engines.action_engine import compiler as co
from taskflow import exceptions as exc
from taskflow.listeners import base
from taskflow import logging
@ -52,41 +54,54 @@ class DurationListener(base.Listener):
def __init__(self, engine):
super(DurationListener, self).__init__(engine,
task_listen_for=WATCH_STATES,
flow_listen_for=[])
self._timers = {}
flow_listen_for=WATCH_STATES)
self._timers = {co.TASK: {}, co.FLOW: {}}
def deregister(self):
super(DurationListener, self).deregister()
# There should be none that still exist at deregistering time, so log a
# warning if there were any that somehow still got left behind...
leftover_timers = len(self._timers)
if leftover_timers:
LOG.warn("%s task(s) did not enter %s states", leftover_timers,
FINISHED_STATES)
self._timers.clear()
for item_type, timers in six.iteritems(self._timers):
leftover_timers = len(timers)
if leftover_timers:
LOG.warn("%s %s(s) did not enter %s states", leftover_timers,
item_type, FINISHED_STATES)
timers.clear()
def _record_ending(self, timer, task_name):
def _record_ending(self, timer, item_type, item_name, state):
meta_update = {
'duration': timer.elapsed(),
}
try:
storage = self._engine.storage
# Don't let storage failures throw exceptions in a listener method.
self._engine.storage.update_atom_metadata(task_name, meta_update)
if item_type == co.FLOW:
storage.update_flow_metadata(meta_update)
else:
storage.update_atom_metadata(item_name, meta_update)
except exc.StorageFailure:
LOG.warn("Failure to store duration update %s for task %s",
meta_update, task_name, exc_info=True)
LOG.warn("Failure to store duration update %s for %s %s",
meta_update, item_type, item_name, exc_info=True)
def _task_receiver(self, state, details):
task_name = details['task_name']
self._receiver(co.TASK, task_name, state)
def _flow_receiver(self, state, details):
flow_name = details['flow_name']
self._receiver(co.FLOW, flow_name, state)
def _receiver(self, item_type, item_name, state):
if state == states.PENDING:
self._timers.pop(task_name, None)
self._timers[item_type].pop(item_name, None)
elif state in STARTING_STATES:
self._timers[task_name] = timeutils.StopWatch().start()
self._timers[item_type][item_name] = timeutils.StopWatch().start()
elif state in FINISHED_STATES:
timer = self._timers.pop(task_name, None)
timer = self._timers[item_type].pop(item_name, None)
if timer is not None:
timer.stop()
self._record_ending(timer, task_name)
self._record_ending(timer, item_type, item_name, state)
TimingListener = moves.moved_class(DurationListener,
@ -104,15 +119,17 @@ class PrintingDurationListener(DurationListener):
else:
self._printer = printer
def _record_ending(self, timer, task_name):
super(PrintingDurationListener, self)._record_ending(timer, task_name)
self._printer("It took task '%s' %0.2f seconds to"
" finish." % (task_name, timer.elapsed()))
def _record_ending(self, timer, item_type, item_name, state):
super(PrintingDurationListener, self)._record_ending(
timer, item_type, item_name, state)
self._printer("It took %s '%s' %0.2f seconds to"
" finish." % (item_type, item_name, timer.elapsed()))
def _task_receiver(self, state, details):
super(PrintingDurationListener, self)._task_receiver(state, details)
def _receiver(self, item_type, item_name, state):
super(PrintingDurationListener, self)._receiver(item_type,
item_name, state)
if state in STARTING_STATES:
self._printer("'%s' task started." % (details['task_name']))
self._printer("'%s' %s started." % (item_name, item_type))
PrintingTimingListener = moves.moved_class(

View File

@ -214,7 +214,7 @@ class TestDurationListener(test.TestCase, EngineMakerMixin):
l.register()
l.deregister()
def test_duration(self):
def test_task_duration(self):
with contextlib.closing(impl_memory.MemoryBackend()) as be:
flow = lf.Flow("test")
flow.add(SleepyTask("test-1", sleep_for=0.1))
@ -229,6 +229,19 @@ class TestDurationListener(test.TestCase, EngineMakerMixin):
self.assertIn('duration', td.meta)
self.assertGreaterEqual(0.1, td.meta['duration'])
def test_flow_duration(self):
with contextlib.closing(impl_memory.MemoryBackend()) as be:
flow = lf.Flow("test")
flow.add(SleepyTask("test-1", sleep_for=0.1))
(lb, fd) = persistence_utils.temporary_flow_detail(be)
e = self._make_engine(flow, fd, be)
with timing.DurationListener(e):
e.run()
self.assertIsNotNone(fd)
self.assertIsNotNone(fd.meta)
self.assertIn('duration', fd.meta)
self.assertGreaterEqual(0.1, fd.meta['duration'])
@mock.patch.object(timing.LOG, 'warn')
def test_record_ending_exception(self, mocked_warn):
with contextlib.closing(impl_memory.MemoryBackend()) as be:
@ -242,8 +255,8 @@ class TestDurationListener(test.TestCase, EngineMakerMixin):
mocked_uam.side_effect = exc.StorageFailure('Woot!')
with duration_listener:
e.run()
mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'test-1',
exc_info=True)
mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'task',
'test-1', exc_info=True)
class TestEventTimeListener(test.TestCase, EngineMakerMixin):