Update listeners to ensure they correctly handle all atoms

Instead of looking for keys that are task specific (as well
as using the deprecated 'task_notifier') we need to update the
listeners to be agnostic to atoms (retry or task) that are
sent to them so that key errors do not occur when extracting
any data sent along with the event notification.

Fixes bug 1395966

Change-Id: Ib61b34b83203f5999f92b6e8616efd90cb259f81
This commit is contained in:
Joshua Harlow 2014-11-24 19:44:52 -08:00
parent cf45a70459
commit b8e975e885
3 changed files with 95 additions and 50 deletions

View File

@ -36,32 +36,80 @@ FINISH_STATES = (states.FAILURE, states.SUCCESS)
DEFAULT_LISTEN_FOR = (notifier.Notifier.ANY,) DEFAULT_LISTEN_FOR = (notifier.Notifier.ANY,)
def _task_matcher(details):
"""Matches task details emitted."""
if not details:
return False
if 'task_name' in details and 'task_uuid' in details:
return True
return False
def _retry_matcher(details):
"""Matches retry details emitted."""
if not details:
return False
if 'retry_name' in details and 'retry_uuid' in details:
return True
return False
def _bulk_deregister(notifier, registered, details_filter=None):
"""Bulk deregisters callbacks associated with many states."""
while registered:
state, cb = registered.pop()
notifier.deregister(state, cb,
details_filter=details_filter)
def _bulk_register(watch_states, notifier, cb, details_filter=None):
"""Bulk registers a callback associated with many states."""
registered = []
try:
for state in watch_states:
if not notifier.is_registered(state, cb,
details_filter=details_filter):
notifier.register(state, cb,
details_filter=details_filter)
registered.append((state, cb))
except ValueError:
with excutils.save_and_reraise_exception():
_bulk_deregister(notifier, registered,
details_filter=details_filter)
else:
return registered
class ListenerBase(object): class ListenerBase(object):
"""Base class for listeners. """Base class for listeners.
A listener can be attached to an engine to do various actions on flow and A listener can be attached to an engine to do various actions on flow and
task state transitions. It implements context manager protocol to be able atom state transitions. It implements the context manager protocol to be
to register and unregister with a given engine automatically when a context able to register and unregister with a given engine automatically when a
is entered and when it is exited. context is entered and when it is exited.
To implement a listener, derive from this class and override To implement a listener, derive from this class and override
``_flow_receiver`` and/or ``_task_receiver`` methods (in this class, ``_flow_receiver`` and/or ``_task_receiver`` and/or ``_retry_receiver``
they do nothing). methods (in this class, they do nothing).
""" """
def __init__(self, engine, def __init__(self, engine,
task_listen_for=DEFAULT_LISTEN_FOR, task_listen_for=DEFAULT_LISTEN_FOR,
flow_listen_for=DEFAULT_LISTEN_FOR): flow_listen_for=DEFAULT_LISTEN_FOR,
retry_listen_for=DEFAULT_LISTEN_FOR):
if not task_listen_for: if not task_listen_for:
task_listen_for = [] task_listen_for = []
if not retry_listen_for:
retry_listen_for = []
if not flow_listen_for: if not flow_listen_for:
flow_listen_for = [] flow_listen_for = []
self._listen_for = { self._listen_for = {
'task': list(task_listen_for), 'task': list(task_listen_for),
'retry': list(retry_listen_for),
'flow': list(flow_listen_for), 'flow': list(flow_listen_for),
} }
self._engine = engine self._engine = engine
self._registered = False self._registered = {}
def _flow_receiver(self, state, details): def _flow_receiver(self, state, details):
pass pass
@ -69,44 +117,39 @@ class ListenerBase(object):
def _task_receiver(self, state, details): def _task_receiver(self, state, details):
pass pass
def _retry_receiver(self, state, details):
pass
def deregister(self): def deregister(self):
if not self._registered: if 'task' in self._registered:
return _bulk_deregister(self._engine.atom_notifier,
self._registered['task'],
def _deregister(watch_states, notifier, cb): details_filter=_task_matcher)
for s in watch_states: del self._registered['task']
notifier.deregister(s, cb) if 'retry' in self._registered:
_bulk_deregister(self._engine.atom_notifier,
_deregister(self._listen_for['task'], self._engine.task_notifier, self._registered['retry'],
self._task_receiver) details_filter=_retry_matcher)
_deregister(self._listen_for['flow'], self._engine.notifier, del self._registered['retry']
self._flow_receiver) if 'flow' in self._registered:
_bulk_deregister(self._engine.notifier,
self._registered = False self._registered['flow'])
del self._registered['flow']
def register(self): def register(self):
if self._registered: if 'task' not in self._registered:
return self._registered['task'] = _bulk_register(
self._listen_for['task'], self._engine.atom_notifier,
def _register(watch_states, notifier, cb): self._task_receiver, details_filter=_task_matcher)
registered = [] if 'retry' not in self._registered:
try: self._registered['retry'] = _bulk_register(
for s in watch_states: self._listen_for['retry'], self._engine.atom_notifier,
if not notifier.is_registered(s, cb): self._retry_receiver, details_filter=_retry_matcher)
notifier.register(s, cb) if 'flow' not in self._registered:
registered.append((s, cb)) self._registered['flow'] = _bulk_register(
except ValueError: self._listen_for['flow'], self._engine.notifier,
with excutils.save_and_reraise_exception():
for (s, cb) in registered:
notifier.deregister(s, cb)
_register(self._listen_for['task'], self._engine.task_notifier,
self._task_receiver)
_register(self._listen_for['flow'], self._engine.notifier,
self._flow_receiver) self._flow_receiver)
self._registered = True
def __enter__(self): def __enter__(self):
self.register() self.register()

View File

@ -53,11 +53,12 @@ class LoggingListener(base.LoggingBase):
def __init__(self, engine, def __init__(self, engine,
task_listen_for=base.DEFAULT_LISTEN_FOR, task_listen_for=base.DEFAULT_LISTEN_FOR,
flow_listen_for=base.DEFAULT_LISTEN_FOR, flow_listen_for=base.DEFAULT_LISTEN_FOR,
retry_listen_for=base.DEFAULT_LISTEN_FOR,
log=None, log=None,
level=logging.DEBUG): level=logging.DEBUG):
super(LoggingListener, self).__init__(engine, super(LoggingListener, self).__init__(
task_listen_for=task_listen_for, engine, task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for) flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for)
if not log: if not log:
self._logger = LOG self._logger = LOG
else: else:
@ -101,12 +102,12 @@ class DynamicLoggingListener(base.ListenerBase):
def __init__(self, engine, def __init__(self, engine,
task_listen_for=base.DEFAULT_LISTEN_FOR, task_listen_for=base.DEFAULT_LISTEN_FOR,
flow_listen_for=base.DEFAULT_LISTEN_FOR, flow_listen_for=base.DEFAULT_LISTEN_FOR,
retry_listen_for=base.DEFAULT_LISTEN_FOR,
log=None, failure_level=logging.WARNING, log=None, failure_level=logging.WARNING,
level=logging.DEBUG): level=logging.DEBUG):
super(DynamicLoggingListener, self).__init__( super(DynamicLoggingListener, self).__init__(
engine, engine, task_listen_for=task_listen_for,
task_listen_for=task_listen_for, flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for)
flow_listen_for=flow_listen_for)
self._failure_level = failure_level self._failure_level = failure_level
self._level = level self._level = level
self._task_log_levels = { self._task_log_levels = {

View File

@ -27,10 +27,11 @@ class PrintingListener(base.LoggingBase):
def __init__(self, engine, def __init__(self, engine,
task_listen_for=base.DEFAULT_LISTEN_FOR, task_listen_for=base.DEFAULT_LISTEN_FOR,
flow_listen_for=base.DEFAULT_LISTEN_FOR, flow_listen_for=base.DEFAULT_LISTEN_FOR,
retry_listen_for=base.DEFAULT_LISTEN_FOR,
stderr=False): stderr=False):
super(PrintingListener, self).__init__(engine, super(PrintingListener, self).__init__(
task_listen_for=task_listen_for, engine, task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for) flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for)
if stderr: if stderr:
self._file = sys.stderr self._file = sys.stderr
else: else: