Ensure we register & deregister conductor listeners

Instead of just registering engine listeners that were
returned, make sure we also deregister them when the engine
has either finished or failed. This ensures that if a listener
has hold of any resources (or other) that it can clean those
up and be sure that its deregister call will be made.

Change-Id: Ia1420c435156362698702fed2bda11c2a0fef803
This commit is contained in:
Joshua Harlow
2015-03-13 00:03:35 -07:00
parent 384eb9a065
commit a1f9321c3f
3 changed files with 44 additions and 5 deletions

View File

@@ -21,6 +21,7 @@ from taskflow import logging
from taskflow.types import timing as tt
from taskflow.utils import async_utils
from taskflow.utils import deprecation
from taskflow.utils import misc
from taskflow.utils import threading_utils
LOG = logging.getLogger(__name__)
@@ -88,11 +89,19 @@ class BlockingConductor(base.Conductor):
def dispatching(self):
return not self._dead.is_set()
def _listeners_from_job(self, job, engine):
listeners = super(BlockingConductor, self)._listeners_from_job(job,
engine)
listeners.append(logging_listener.LoggingListener(engine, log=LOG))
return listeners
def _dispatch_job(self, job):
engine = self._engine_from_job(job)
consume = True
with logging_listener.LoggingListener(engine, log=LOG):
listeners = self._listeners_from_job(job, engine)
with misc.ListenerStack(LOG) as stack:
stack.register(listeners)
LOG.debug("Dispatching engine %s for job: %s", engine, job)
consume = True
try:
engine.run()
except excp.WrappedFailure as e:
@@ -117,7 +126,7 @@ class BlockingConductor(base.Conductor):
job, exc_info=True)
else:
LOG.info("Job completed successfully: %s", job)
return async_utils.make_completed_future(consume)
return async_utils.make_completed_future(consume)
def run(self):
self._dead.clear()

View File

@@ -92,8 +92,6 @@ class Conductor(object):
engine=self._engine,
backend=self._persistence,
**self._engine_options)
for listener in self._listeners_from_job(job, engine):
listener.register()
return engine
def _listeners_from_job(self, job, engine):

View File

@@ -436,6 +436,38 @@ def get_duplicate_keys(iterable, key=None):
return duplicates
class ListenerStack(object):
"""Listeners that are deregistered on context manager exit.
TODO(harlowja): replace this with ``contextlib.ExitStack`` or equivalent
in the future (that code is in python3.2+ and in a few backports that
provide nearly equivalent functionality). When/if
https://review.openstack.org/#/c/164222/ merges we should be able to
remove this since listeners are already context managers.
"""
def __init__(self, log):
self._registered = []
self._log = log
def register(self, listeners):
for listener in listeners:
listener.register()
self._registered.append(listener)
def __enter__(self):
return self
def __exit__(self, type, value, tb):
while self._registered:
listener = self._registered.pop()
try:
listener.deregister()
except Exception:
self._log.warn("Failed deregistering listener '%s'",
listener, exc_info=True)
class ExponentialBackoff(object):
"""An iterable object that will yield back an exponential delay sequence.