Merge "Update listeners to ensure they correctly handle all atoms"

This commit is contained in:
Jenkins 2014-12-06 02:02:49 +00:00 committed by Gerrit Code Review
commit 361835ddc9
3 changed files with 95 additions and 50 deletions

View File

@ -36,32 +36,80 @@ FINISH_STATES = (states.FAILURE, states.SUCCESS)
DEFAULT_LISTEN_FOR = (notifier.Notifier.ANY,) DEFAULT_LISTEN_FOR = (notifier.Notifier.ANY,)
def _task_matcher(details):
"""Matches task details emitted."""
if not details:
return False
if 'task_name' in details and 'task_uuid' in details:
return True
return False
def _retry_matcher(details):
"""Matches retry details emitted."""
if not details:
return False
if 'retry_name' in details and 'retry_uuid' in details:
return True
return False
def _bulk_deregister(notifier, registered, details_filter=None):
"""Bulk deregisters callbacks associated with many states."""
while registered:
state, cb = registered.pop()
notifier.deregister(state, cb,
details_filter=details_filter)
def _bulk_register(watch_states, notifier, cb, details_filter=None):
"""Bulk registers a callback associated with many states."""
registered = []
try:
for state in watch_states:
if not notifier.is_registered(state, cb,
details_filter=details_filter):
notifier.register(state, cb,
details_filter=details_filter)
registered.append((state, cb))
except ValueError:
with excutils.save_and_reraise_exception():
_bulk_deregister(notifier, registered,
details_filter=details_filter)
else:
return registered
class ListenerBase(object): class ListenerBase(object):
"""Base class for listeners. """Base class for listeners.
A listener can be attached to an engine to do various actions on flow and A listener can be attached to an engine to do various actions on flow and
task state transitions. It implements context manager protocol to be able atom state transitions. It implements the context manager protocol to be
to register and unregister with a given engine automatically when a context able to register and unregister with a given engine automatically when a
is entered and when it is exited. context is entered and when it is exited.
To implement a listener, derive from this class and override To implement a listener, derive from this class and override
``_flow_receiver`` and/or ``_task_receiver`` methods (in this class, ``_flow_receiver`` and/or ``_task_receiver`` and/or ``_retry_receiver``
they do nothing). methods (in this class, they do nothing).
""" """
def __init__(self, engine, def __init__(self, engine,
task_listen_for=DEFAULT_LISTEN_FOR, task_listen_for=DEFAULT_LISTEN_FOR,
flow_listen_for=DEFAULT_LISTEN_FOR): flow_listen_for=DEFAULT_LISTEN_FOR,
retry_listen_for=DEFAULT_LISTEN_FOR):
if not task_listen_for: if not task_listen_for:
task_listen_for = [] task_listen_for = []
if not retry_listen_for:
retry_listen_for = []
if not flow_listen_for: if not flow_listen_for:
flow_listen_for = [] flow_listen_for = []
self._listen_for = { self._listen_for = {
'task': list(task_listen_for), 'task': list(task_listen_for),
'retry': list(retry_listen_for),
'flow': list(flow_listen_for), 'flow': list(flow_listen_for),
} }
self._engine = engine self._engine = engine
self._registered = False self._registered = {}
def _flow_receiver(self, state, details): def _flow_receiver(self, state, details):
pass pass
@ -69,43 +117,38 @@ class ListenerBase(object):
def _task_receiver(self, state, details): def _task_receiver(self, state, details):
pass pass
def _retry_receiver(self, state, details):
pass
def deregister(self): def deregister(self):
if not self._registered: if 'task' in self._registered:
return _bulk_deregister(self._engine.atom_notifier,
self._registered['task'],
def _deregister(watch_states, notifier, cb): details_filter=_task_matcher)
for s in watch_states: del self._registered['task']
notifier.deregister(s, cb) if 'retry' in self._registered:
_bulk_deregister(self._engine.atom_notifier,
_deregister(self._listen_for['task'], self._engine.task_notifier, self._registered['retry'],
self._task_receiver) details_filter=_retry_matcher)
_deregister(self._listen_for['flow'], self._engine.notifier, del self._registered['retry']
self._flow_receiver) if 'flow' in self._registered:
_bulk_deregister(self._engine.notifier,
self._registered = False self._registered['flow'])
del self._registered['flow']
def register(self): def register(self):
if self._registered: if 'task' not in self._registered:
return self._registered['task'] = _bulk_register(
self._listen_for['task'], self._engine.atom_notifier,
def _register(watch_states, notifier, cb): self._task_receiver, details_filter=_task_matcher)
registered = [] if 'retry' not in self._registered:
try: self._registered['retry'] = _bulk_register(
for s in watch_states: self._listen_for['retry'], self._engine.atom_notifier,
if not notifier.is_registered(s, cb): self._retry_receiver, details_filter=_retry_matcher)
notifier.register(s, cb) if 'flow' not in self._registered:
registered.append((s, cb)) self._registered['flow'] = _bulk_register(
except ValueError: self._listen_for['flow'], self._engine.notifier,
with excutils.save_and_reraise_exception(): self._flow_receiver)
for (s, cb) in registered:
notifier.deregister(s, cb)
_register(self._listen_for['task'], self._engine.task_notifier,
self._task_receiver)
_register(self._listen_for['flow'], self._engine.notifier,
self._flow_receiver)
self._registered = True
def __enter__(self): def __enter__(self):
self.register() self.register()

View File

@ -53,11 +53,12 @@ class LoggingListener(base.LoggingBase):
def __init__(self, engine, def __init__(self, engine,
task_listen_for=base.DEFAULT_LISTEN_FOR, task_listen_for=base.DEFAULT_LISTEN_FOR,
flow_listen_for=base.DEFAULT_LISTEN_FOR, flow_listen_for=base.DEFAULT_LISTEN_FOR,
retry_listen_for=base.DEFAULT_LISTEN_FOR,
log=None, log=None,
level=logging.DEBUG): level=logging.DEBUG):
super(LoggingListener, self).__init__(engine, super(LoggingListener, self).__init__(
task_listen_for=task_listen_for, engine, task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for) flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for)
if not log: if not log:
self._logger = LOG self._logger = LOG
else: else:
@ -101,12 +102,12 @@ class DynamicLoggingListener(base.ListenerBase):
def __init__(self, engine, def __init__(self, engine,
task_listen_for=base.DEFAULT_LISTEN_FOR, task_listen_for=base.DEFAULT_LISTEN_FOR,
flow_listen_for=base.DEFAULT_LISTEN_FOR, flow_listen_for=base.DEFAULT_LISTEN_FOR,
retry_listen_for=base.DEFAULT_LISTEN_FOR,
log=None, failure_level=logging.WARNING, log=None, failure_level=logging.WARNING,
level=logging.DEBUG): level=logging.DEBUG):
super(DynamicLoggingListener, self).__init__( super(DynamicLoggingListener, self).__init__(
engine, engine, task_listen_for=task_listen_for,
task_listen_for=task_listen_for, flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for)
flow_listen_for=flow_listen_for)
self._failure_level = failure_level self._failure_level = failure_level
self._level = level self._level = level
self._task_log_levels = { self._task_log_levels = {

View File

@ -27,10 +27,11 @@ class PrintingListener(base.LoggingBase):
def __init__(self, engine, def __init__(self, engine,
task_listen_for=base.DEFAULT_LISTEN_FOR, task_listen_for=base.DEFAULT_LISTEN_FOR,
flow_listen_for=base.DEFAULT_LISTEN_FOR, flow_listen_for=base.DEFAULT_LISTEN_FOR,
retry_listen_for=base.DEFAULT_LISTEN_FOR,
stderr=False): stderr=False):
super(PrintingListener, self).__init__(engine, super(PrintingListener, self).__init__(
task_listen_for=task_listen_for, engine, task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for) flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for)
if stderr: if stderr:
self._file = sys.stderr self._file = sys.stderr
else: else: