From b8e975e885e4f5aaab52edcd23fee99d450273ad Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 24 Nov 2014 19:44:52 -0800 Subject: [PATCH] Update listeners to ensure they correctly handle all atoms Instead of looking for keys that are task specific (as well as using the deprecated 'task_notifier') we need to update the listeners to be agnostic to atoms (retry or task) that are sent to them so that key errors do not occur when extracting any data sent along with the event notification. Fixes bug 1395966 Change-Id: Ib61b34b83203f5999f92b6e8616efd90cb259f81 --- taskflow/listeners/base.py | 125 ++++++++++++++++++++++----------- taskflow/listeners/logging.py | 13 ++-- taskflow/listeners/printing.py | 7 +- 3 files changed, 95 insertions(+), 50 deletions(-) diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index 739db49c5..f69bb87e4 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -36,32 +36,80 @@ FINISH_STATES = (states.FAILURE, states.SUCCESS) DEFAULT_LISTEN_FOR = (notifier.Notifier.ANY,) +def _task_matcher(details): + """Matches task details emitted.""" + if not details: + return False + if 'task_name' in details and 'task_uuid' in details: + return True + return False + + +def _retry_matcher(details): + """Matches retry details emitted.""" + if not details: + return False + if 'retry_name' in details and 'retry_uuid' in details: + return True + return False + + +def _bulk_deregister(notifier, registered, details_filter=None): + """Bulk deregisters callbacks associated with many states.""" + while registered: + state, cb = registered.pop() + notifier.deregister(state, cb, + details_filter=details_filter) + + +def _bulk_register(watch_states, notifier, cb, details_filter=None): + """Bulk registers a callback associated with many states.""" + registered = [] + try: + for state in watch_states: + if not notifier.is_registered(state, cb, + details_filter=details_filter): + notifier.register(state, cb, + details_filter=details_filter) + registered.append((state, cb)) + except ValueError: + with excutils.save_and_reraise_exception(): + _bulk_deregister(notifier, registered, + details_filter=details_filter) + else: + return registered + + class ListenerBase(object): """Base class for listeners. A listener can be attached to an engine to do various actions on flow and - task state transitions. It implements context manager protocol to be able - to register and unregister with a given engine automatically when a context - is entered and when it is exited. + atom state transitions. It implements the context manager protocol to be + able to register and unregister with a given engine automatically when a + context is entered and when it is exited. To implement a listener, derive from this class and override - ``_flow_receiver`` and/or ``_task_receiver`` methods (in this class, - they do nothing). + ``_flow_receiver`` and/or ``_task_receiver`` and/or ``_retry_receiver`` + methods (in this class, they do nothing). """ def __init__(self, engine, task_listen_for=DEFAULT_LISTEN_FOR, - flow_listen_for=DEFAULT_LISTEN_FOR): + flow_listen_for=DEFAULT_LISTEN_FOR, + retry_listen_for=DEFAULT_LISTEN_FOR): if not task_listen_for: task_listen_for = [] + if not retry_listen_for: + retry_listen_for = [] if not flow_listen_for: flow_listen_for = [] self._listen_for = { 'task': list(task_listen_for), + 'retry': list(retry_listen_for), 'flow': list(flow_listen_for), } self._engine = engine - self._registered = False + self._registered = {} def _flow_receiver(self, state, details): pass @@ -69,43 +117,38 @@ class ListenerBase(object): def _task_receiver(self, state, details): pass + def _retry_receiver(self, state, details): + pass + def deregister(self): - if not self._registered: - return - - def _deregister(watch_states, notifier, cb): - for s in watch_states: - notifier.deregister(s, cb) - - _deregister(self._listen_for['task'], self._engine.task_notifier, - self._task_receiver) - _deregister(self._listen_for['flow'], self._engine.notifier, - self._flow_receiver) - - self._registered = False + if 'task' in self._registered: + _bulk_deregister(self._engine.atom_notifier, + self._registered['task'], + details_filter=_task_matcher) + del self._registered['task'] + if 'retry' in self._registered: + _bulk_deregister(self._engine.atom_notifier, + self._registered['retry'], + details_filter=_retry_matcher) + del self._registered['retry'] + if 'flow' in self._registered: + _bulk_deregister(self._engine.notifier, + self._registered['flow']) + del self._registered['flow'] def register(self): - if self._registered: - return - - def _register(watch_states, notifier, cb): - registered = [] - try: - for s in watch_states: - if not notifier.is_registered(s, cb): - notifier.register(s, cb) - registered.append((s, cb)) - except ValueError: - with excutils.save_and_reraise_exception(): - for (s, cb) in registered: - notifier.deregister(s, cb) - - _register(self._listen_for['task'], self._engine.task_notifier, - self._task_receiver) - _register(self._listen_for['flow'], self._engine.notifier, - self._flow_receiver) - - self._registered = True + if 'task' not in self._registered: + self._registered['task'] = _bulk_register( + self._listen_for['task'], self._engine.atom_notifier, + self._task_receiver, details_filter=_task_matcher) + if 'retry' not in self._registered: + self._registered['retry'] = _bulk_register( + self._listen_for['retry'], self._engine.atom_notifier, + self._retry_receiver, details_filter=_retry_matcher) + if 'flow' not in self._registered: + self._registered['flow'] = _bulk_register( + self._listen_for['flow'], self._engine.notifier, + self._flow_receiver) def __enter__(self): self.register() diff --git a/taskflow/listeners/logging.py b/taskflow/listeners/logging.py index 20ff1baaf..51bf693c5 100644 --- a/taskflow/listeners/logging.py +++ b/taskflow/listeners/logging.py @@ -53,11 +53,12 @@ class LoggingListener(base.LoggingBase): def __init__(self, engine, task_listen_for=base.DEFAULT_LISTEN_FOR, flow_listen_for=base.DEFAULT_LISTEN_FOR, + retry_listen_for=base.DEFAULT_LISTEN_FOR, log=None, level=logging.DEBUG): - super(LoggingListener, self).__init__(engine, - task_listen_for=task_listen_for, - flow_listen_for=flow_listen_for) + super(LoggingListener, self).__init__( + engine, task_listen_for=task_listen_for, + flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for) if not log: self._logger = LOG else: @@ -101,12 +102,12 @@ class DynamicLoggingListener(base.ListenerBase): def __init__(self, engine, task_listen_for=base.DEFAULT_LISTEN_FOR, flow_listen_for=base.DEFAULT_LISTEN_FOR, + retry_listen_for=base.DEFAULT_LISTEN_FOR, log=None, failure_level=logging.WARNING, level=logging.DEBUG): super(DynamicLoggingListener, self).__init__( - engine, - task_listen_for=task_listen_for, - flow_listen_for=flow_listen_for) + engine, task_listen_for=task_listen_for, + flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for) self._failure_level = failure_level self._level = level self._task_log_levels = { diff --git a/taskflow/listeners/printing.py b/taskflow/listeners/printing.py index 397914ed4..719d20424 100644 --- a/taskflow/listeners/printing.py +++ b/taskflow/listeners/printing.py @@ -27,10 +27,11 @@ class PrintingListener(base.LoggingBase): def __init__(self, engine, task_listen_for=base.DEFAULT_LISTEN_FOR, flow_listen_for=base.DEFAULT_LISTEN_FOR, + retry_listen_for=base.DEFAULT_LISTEN_FOR, stderr=False): - super(PrintingListener, self).__init__(engine, - task_listen_for=task_listen_for, - flow_listen_for=flow_listen_for) + super(PrintingListener, self).__init__( + engine, task_listen_for=task_listen_for, + flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for) if stderr: self._file = sys.stderr else: