From 7bce31a2d14de29f1fc306938c7085f5205c9110 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Thu, 27 Nov 2014 11:13:50 +0100 Subject: [PATCH] Always use a poll timeout in the executor This change allows to stop the eventlet executor without using eventlet.kill. And also, the kombu docs actually recommend that drain_events by default have a 1 timeout. Co-Authored-by: Joshua Harlow Change-Id: I3a3919e38d08267439843a127346300fd2131540 --- oslo/messaging/_executors/base.py | 4 ++++ oslo/messaging/_executors/impl_blocking.py | 17 ++++++++++++----- oslo/messaging/_executors/impl_eventlet.py | 14 +++++++++----- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/oslo/messaging/_executors/base.py b/oslo/messaging/_executors/base.py index 8019017b3..095394ffb 100644 --- a/oslo/messaging/_executors/base.py +++ b/oslo/messaging/_executors/base.py @@ -16,6 +16,10 @@ import abc import six +# NOTE(sileht): value choosen according the best practice from kombu +# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop +POLL_TIMEOUT = 1 + @six.add_metaclass(abc.ABCMeta) class ExecutorBase(object): diff --git a/oslo/messaging/_executors/impl_blocking.py b/oslo/messaging/_executors/impl_blocking.py index 68c1632e1..d659482f8 100644 --- a/oslo/messaging/_executors/impl_blocking.py +++ b/oslo/messaging/_executors/impl_blocking.py @@ -13,7 +13,12 @@ # License for the specific language governing permissions and limitations # under the License. +import logging + from oslo.messaging._executors import base +from oslo.messaging._i18n import _ + +LOG = logging.getLogger(__name__) class BlockingExecutor(base.ExecutorBase): @@ -36,11 +41,13 @@ class BlockingExecutor(base.ExecutorBase): def start(self): self._running = True while self._running: - message = self.listener.poll(timeout=0.01) - if not message: - continue - with self.dispatcher(message) as callback: - callback() + try: + incoming = self.listener.poll(timeout=base.POLL_TIMEOUT) + if incoming is not None: + with self.dispatcher(incoming) as callback: + callback() + except Exception: + LOG.exception(_("Unexpected exception occurred.")) def stop(self): self._running = False diff --git a/oslo/messaging/_executors/impl_eventlet.py b/oslo/messaging/_executors/impl_eventlet.py index ed0fe8645..13eeeb108 100644 --- a/oslo/messaging/_executors/impl_eventlet.py +++ b/oslo/messaging/_executors/impl_eventlet.py @@ -75,6 +75,7 @@ class EventletExecutor(base.ExecutorBase): self.conf.register_opts(_eventlet_opts) self._thread = None self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size) + self._running = False def start(self): if self._thread is not None: @@ -83,19 +84,22 @@ class EventletExecutor(base.ExecutorBase): @excutils.forever_retry_uncaught_exceptions def _executor_thread(): try: - while True: - incoming = self.listener.poll() - spawn_with(ctxt=self.dispatcher(incoming), - pool=self._greenpool) + while self._running: + incoming = self.listener.poll(timeout=base.POLL_TIMEOUT) + if incoming is not None: + spawn_with(ctxt=self.dispatcher(incoming), + pool=self._greenpool) except greenlet.GreenletExit: return + self._running = True self._thread = eventlet.spawn(_executor_thread) def stop(self): if self._thread is None: return - self._thread.kill() + self._running = False + self._thread.cancel() def wait(self): if self._thread is None: