diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py index a75717a7..bf9b5229 100644 --- a/taskflow/listeners/timing.py +++ b/taskflow/listeners/timing.py @@ -17,6 +17,7 @@ from __future__ import absolute_import import itertools +import time from debtcollector import moves @@ -117,3 +118,38 @@ class PrintingDurationListener(DurationListener): PrintingTimingListener = moves.moved_class( PrintingDurationListener, 'PrintingTimingListener', __name__, version="0.8", removal_version="?") + + +class EventTimeListener(base.Listener): + """Writes task, flow, and retry event timestamps to atom metadata.""" + def __init__(self, engine, + task_listen_for=base.DEFAULT_LISTEN_FOR, + flow_listen_for=base.DEFAULT_LISTEN_FOR, + retry_listen_for=base.DEFAULT_LISTEN_FOR): + super(EventTimeListener, self).__init__( + engine, task_listen_for=task_listen_for, + flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for) + + def _record_atom_event(self, state, atom_name): + meta_update = {'%s-timestamp' % state: time.time()} + try: + # Don't let storage failures throw exceptions in a listener method. + self._engine.storage.update_atom_metadata(atom_name, meta_update) + except exc.StorageFailure: + LOG.warn("Failure to store timestamp %s for atom %s", + meta_update, atom_name, exc_info=True) + + def _flow_receiver(self, state, details): + meta_update = {'%s-timestamp' % state: time.time()} + try: + # Don't let storage failures throw exceptions in a listener method. + self._engine.storage.update_flow_metadata(meta_update) + except exc.StorageFailure: + LOG.warn("Failure to store timestamp %s for flow %s", + meta_update, details['flow_name'], exc_info=True) + + def _task_receiver(self, state, details): + self._record_atom_event(state, details['task_name']) + + def _retry_receiver(self, state, details): + self._record_atom_event(state, details['retry_name']) diff --git a/taskflow/tests/unit/test_listeners.py b/taskflow/tests/unit/test_listeners.py index d6c64bbd..db46b7fb 100644 --- a/taskflow/tests/unit/test_listeners.py +++ b/taskflow/tests/unit/test_listeners.py @@ -235,6 +235,29 @@ class TestTimingListener(test.TestCase, EngineMakerMixin): exc_info=True) +class TestEventTimeListener(test.TestCase, EngineMakerMixin): + def test_event_time(self): + flow = lf.Flow('flow1').add(SleepyTask("task1", sleep_for=0.1)) + engine = self._make_engine(flow) + with timing.EventTimeListener(engine): + engine.run() + t_uuid = engine.storage.get_atom_uuid("task1") + td = engine.storage._flowdetail.find(t_uuid) + self.assertIsNotNone(td) + self.assertIsNotNone(td.meta) + running_field = '%s-timestamp' % states.RUNNING + success_field = '%s-timestamp' % states.SUCCESS + self.assertIn(running_field, td.meta) + self.assertIn(success_field, td.meta) + td_duration = td.meta[success_field] - td.meta[running_field] + self.assertGreaterEqual(0.1, td_duration) + fd_meta = engine.storage._flowdetail.meta + self.assertIn(running_field, fd_meta) + self.assertIn(success_field, fd_meta) + fd_duration = fd_meta[success_field] - fd_meta[running_field] + self.assertGreaterEqual(0.1, fd_duration) + + class TestLoggingListeners(test.TestCase, EngineMakerMixin): def _make_logger(self, level=logging.DEBUG): log = logging.getLogger(