Added EventTimeListner to record when events occur

Change-Id: Iaa3342efb1a98109ca5eb382efcd6bde402d437c
This commit is contained in:
Dan Krause
2015-03-10 12:35:50 -05:00
parent 8c088696ed
commit 7d3ae77dca
2 changed files with 59 additions and 0 deletions

View File

@@ -17,6 +17,7 @@
from __future__ import absolute_import
import itertools
import time
from debtcollector import moves
@@ -117,3 +118,38 @@ class PrintingDurationListener(DurationListener):
PrintingTimingListener = moves.moved_class(
PrintingDurationListener, 'PrintingTimingListener', __name__,
version="0.8", removal_version="?")
class EventTimeListener(base.Listener):
"""Writes task, flow, and retry event timestamps to atom metadata."""
def __init__(self, engine,
task_listen_for=base.DEFAULT_LISTEN_FOR,
flow_listen_for=base.DEFAULT_LISTEN_FOR,
retry_listen_for=base.DEFAULT_LISTEN_FOR):
super(EventTimeListener, self).__init__(
engine, task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for)
def _record_atom_event(self, state, atom_name):
meta_update = {'%s-timestamp' % state: time.time()}
try:
# Don't let storage failures throw exceptions in a listener method.
self._engine.storage.update_atom_metadata(atom_name, meta_update)
except exc.StorageFailure:
LOG.warn("Failure to store timestamp %s for atom %s",
meta_update, atom_name, exc_info=True)
def _flow_receiver(self, state, details):
meta_update = {'%s-timestamp' % state: time.time()}
try:
# Don't let storage failures throw exceptions in a listener method.
self._engine.storage.update_flow_metadata(meta_update)
except exc.StorageFailure:
LOG.warn("Failure to store timestamp %s for flow %s",
meta_update, details['flow_name'], exc_info=True)
def _task_receiver(self, state, details):
self._record_atom_event(state, details['task_name'])
def _retry_receiver(self, state, details):
self._record_atom_event(state, details['retry_name'])

View File

@@ -235,6 +235,29 @@ class TestTimingListener(test.TestCase, EngineMakerMixin):
exc_info=True)
class TestEventTimeListener(test.TestCase, EngineMakerMixin):
def test_event_time(self):
flow = lf.Flow('flow1').add(SleepyTask("task1", sleep_for=0.1))
engine = self._make_engine(flow)
with timing.EventTimeListener(engine):
engine.run()
t_uuid = engine.storage.get_atom_uuid("task1")
td = engine.storage._flowdetail.find(t_uuid)
self.assertIsNotNone(td)
self.assertIsNotNone(td.meta)
running_field = '%s-timestamp' % states.RUNNING
success_field = '%s-timestamp' % states.SUCCESS
self.assertIn(running_field, td.meta)
self.assertIn(success_field, td.meta)
td_duration = td.meta[success_field] - td.meta[running_field]
self.assertGreaterEqual(0.1, td_duration)
fd_meta = engine.storage._flowdetail.meta
self.assertIn(running_field, fd_meta)
self.assertIn(success_field, fd_meta)
fd_duration = fd_meta[success_field] - fd_meta[running_field]
self.assertGreaterEqual(0.1, fd_duration)
class TestLoggingListeners(test.TestCase, EngineMakerMixin):
def _make_logger(self, level=logging.DEBUG):
log = logging.getLogger(