diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index 739db49c5..f69bb87e4 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -36,32 +36,80 @@ FINISH_STATES = (states.FAILURE, states.SUCCESS) DEFAULT_LISTEN_FOR = (notifier.Notifier.ANY,) +def _task_matcher(details): + """Matches task details emitted.""" + if not details: + return False + if 'task_name' in details and 'task_uuid' in details: + return True + return False + + +def _retry_matcher(details): + """Matches retry details emitted.""" + if not details: + return False + if 'retry_name' in details and 'retry_uuid' in details: + return True + return False + + +def _bulk_deregister(notifier, registered, details_filter=None): + """Bulk deregisters callbacks associated with many states.""" + while registered: + state, cb = registered.pop() + notifier.deregister(state, cb, + details_filter=details_filter) + + +def _bulk_register(watch_states, notifier, cb, details_filter=None): + """Bulk registers a callback associated with many states.""" + registered = [] + try: + for state in watch_states: + if not notifier.is_registered(state, cb, + details_filter=details_filter): + notifier.register(state, cb, + details_filter=details_filter) + registered.append((state, cb)) + except ValueError: + with excutils.save_and_reraise_exception(): + _bulk_deregister(notifier, registered, + details_filter=details_filter) + else: + return registered + + class ListenerBase(object): """Base class for listeners. A listener can be attached to an engine to do various actions on flow and - task state transitions. It implements context manager protocol to be able - to register and unregister with a given engine automatically when a context - is entered and when it is exited. + atom state transitions. It implements the context manager protocol to be + able to register and unregister with a given engine automatically when a + context is entered and when it is exited. To implement a listener, derive from this class and override - ``_flow_receiver`` and/or ``_task_receiver`` methods (in this class, - they do nothing). + ``_flow_receiver`` and/or ``_task_receiver`` and/or ``_retry_receiver`` + methods (in this class, they do nothing). """ def __init__(self, engine, task_listen_for=DEFAULT_LISTEN_FOR, - flow_listen_for=DEFAULT_LISTEN_FOR): + flow_listen_for=DEFAULT_LISTEN_FOR, + retry_listen_for=DEFAULT_LISTEN_FOR): if not task_listen_for: task_listen_for = [] + if not retry_listen_for: + retry_listen_for = [] if not flow_listen_for: flow_listen_for = [] self._listen_for = { 'task': list(task_listen_for), + 'retry': list(retry_listen_for), 'flow': list(flow_listen_for), } self._engine = engine - self._registered = False + self._registered = {} def _flow_receiver(self, state, details): pass @@ -69,43 +117,38 @@ class ListenerBase(object): def _task_receiver(self, state, details): pass + def _retry_receiver(self, state, details): + pass + def deregister(self): - if not self._registered: - return - - def _deregister(watch_states, notifier, cb): - for s in watch_states: - notifier.deregister(s, cb) - - _deregister(self._listen_for['task'], self._engine.task_notifier, - self._task_receiver) - _deregister(self._listen_for['flow'], self._engine.notifier, - self._flow_receiver) - - self._registered = False + if 'task' in self._registered: + _bulk_deregister(self._engine.atom_notifier, + self._registered['task'], + details_filter=_task_matcher) + del self._registered['task'] + if 'retry' in self._registered: + _bulk_deregister(self._engine.atom_notifier, + self._registered['retry'], + details_filter=_retry_matcher) + del self._registered['retry'] + if 'flow' in self._registered: + _bulk_deregister(self._engine.notifier, + self._registered['flow']) + del self._registered['flow'] def register(self): - if self._registered: - return - - def _register(watch_states, notifier, cb): - registered = [] - try: - for s in watch_states: - if not notifier.is_registered(s, cb): - notifier.register(s, cb) - registered.append((s, cb)) - except ValueError: - with excutils.save_and_reraise_exception(): - for (s, cb) in registered: - notifier.deregister(s, cb) - - _register(self._listen_for['task'], self._engine.task_notifier, - self._task_receiver) - _register(self._listen_for['flow'], self._engine.notifier, - self._flow_receiver) - - self._registered = True + if 'task' not in self._registered: + self._registered['task'] = _bulk_register( + self._listen_for['task'], self._engine.atom_notifier, + self._task_receiver, details_filter=_task_matcher) + if 'retry' not in self._registered: + self._registered['retry'] = _bulk_register( + self._listen_for['retry'], self._engine.atom_notifier, + self._retry_receiver, details_filter=_retry_matcher) + if 'flow' not in self._registered: + self._registered['flow'] = _bulk_register( + self._listen_for['flow'], self._engine.notifier, + self._flow_receiver) def __enter__(self): self.register() diff --git a/taskflow/listeners/logging.py b/taskflow/listeners/logging.py index 20ff1baaf..51bf693c5 100644 --- a/taskflow/listeners/logging.py +++ b/taskflow/listeners/logging.py @@ -53,11 +53,12 @@ class LoggingListener(base.LoggingBase): def __init__(self, engine, task_listen_for=base.DEFAULT_LISTEN_FOR, flow_listen_for=base.DEFAULT_LISTEN_FOR, + retry_listen_for=base.DEFAULT_LISTEN_FOR, log=None, level=logging.DEBUG): - super(LoggingListener, self).__init__(engine, - task_listen_for=task_listen_for, - flow_listen_for=flow_listen_for) + super(LoggingListener, self).__init__( + engine, task_listen_for=task_listen_for, + flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for) if not log: self._logger = LOG else: @@ -101,12 +102,12 @@ class DynamicLoggingListener(base.ListenerBase): def __init__(self, engine, task_listen_for=base.DEFAULT_LISTEN_FOR, flow_listen_for=base.DEFAULT_LISTEN_FOR, + retry_listen_for=base.DEFAULT_LISTEN_FOR, log=None, failure_level=logging.WARNING, level=logging.DEBUG): super(DynamicLoggingListener, self).__init__( - engine, - task_listen_for=task_listen_for, - flow_listen_for=flow_listen_for) + engine, task_listen_for=task_listen_for, + flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for) self._failure_level = failure_level self._level = level self._task_log_levels = { diff --git a/taskflow/listeners/printing.py b/taskflow/listeners/printing.py index 397914ed4..719d20424 100644 --- a/taskflow/listeners/printing.py +++ b/taskflow/listeners/printing.py @@ -27,10 +27,11 @@ class PrintingListener(base.LoggingBase): def __init__(self, engine, task_listen_for=base.DEFAULT_LISTEN_FOR, flow_listen_for=base.DEFAULT_LISTEN_FOR, + retry_listen_for=base.DEFAULT_LISTEN_FOR, stderr=False): - super(PrintingListener, self).__init__(engine, - task_listen_for=task_listen_for, - flow_listen_for=flow_listen_for) + super(PrintingListener, self).__init__( + engine, task_listen_for=task_listen_for, + flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for) if stderr: self._file = sys.stderr else: