From a1f9321c3fda9af6ed4a240091f9c0c4917dc75d Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 13 Mar 2015 00:03:35 -0700 Subject: [PATCH] Ensure we register & deregister conductor listeners Instead of just registering engine listeners that were returned, make sure we also deregister them when the engine has either finished or failed. This ensures that if a listener has hold of any resources (or other) that it can clean those up and be sure that its deregister call will be made. Change-Id: Ia1420c435156362698702fed2bda11c2a0fef803 --- taskflow/conductors/backends/impl_blocking.py | 15 +++++++-- taskflow/conductors/base.py | 2 -- taskflow/utils/misc.py | 32 +++++++++++++++++++ 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py index 1f6a9ee61..b53452af1 100644 --- a/taskflow/conductors/backends/impl_blocking.py +++ b/taskflow/conductors/backends/impl_blocking.py @@ -21,6 +21,7 @@ from taskflow import logging from taskflow.types import timing as tt from taskflow.utils import async_utils from taskflow.utils import deprecation +from taskflow.utils import misc from taskflow.utils import threading_utils LOG = logging.getLogger(__name__) @@ -88,11 +89,19 @@ class BlockingConductor(base.Conductor): def dispatching(self): return not self._dead.is_set() + def _listeners_from_job(self, job, engine): + listeners = super(BlockingConductor, self)._listeners_from_job(job, + engine) + listeners.append(logging_listener.LoggingListener(engine, log=LOG)) + return listeners + def _dispatch_job(self, job): engine = self._engine_from_job(job) - consume = True - with logging_listener.LoggingListener(engine, log=LOG): + listeners = self._listeners_from_job(job, engine) + with misc.ListenerStack(LOG) as stack: + stack.register(listeners) LOG.debug("Dispatching engine %s for job: %s", engine, job) + consume = True try: engine.run() except excp.WrappedFailure as e: @@ -117,7 +126,7 @@ class BlockingConductor(base.Conductor): job, exc_info=True) else: LOG.info("Job completed successfully: %s", job) - return async_utils.make_completed_future(consume) + return async_utils.make_completed_future(consume) def run(self): self._dead.clear() diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index 33c4441d5..7a6b8ce89 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -92,8 +92,6 @@ class Conductor(object): engine=self._engine, backend=self._persistence, **self._engine_options) - for listener in self._listeners_from_job(job, engine): - listener.register() return engine def _listeners_from_job(self, job, engine): diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 57a28ea5f..5b1cb209c 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -436,6 +436,38 @@ def get_duplicate_keys(iterable, key=None): return duplicates +class ListenerStack(object): + """Listeners that are deregistered on context manager exit. + + TODO(harlowja): replace this with ``contextlib.ExitStack`` or equivalent + in the future (that code is in python3.2+ and in a few backports that + provide nearly equivalent functionality). When/if + https://review.openstack.org/#/c/164222/ merges we should be able to + remove this since listeners are already context managers. + """ + + def __init__(self, log): + self._registered = [] + self._log = log + + def register(self, listeners): + for listener in listeners: + listener.register() + self._registered.append(listener) + + def __enter__(self): + return self + + def __exit__(self, type, value, tb): + while self._registered: + listener = self._registered.pop() + try: + listener.deregister() + except Exception: + self._log.warn("Failed deregistering listener '%s'", + listener, exc_info=True) + + class ExponentialBackoff(object): """An iterable object that will yield back an exponential delay sequence.