diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index 41129e7c2..6e1451807 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -17,7 +17,6 @@ __all__ = ['AMQPDriverBase'] import logging import threading -import time import uuid from six import moves @@ -94,6 +93,7 @@ class AMQPListener(base.Listener): self.conn = conn self.msg_id_cache = rpc_amqp._MsgIdCache() self.incoming = [] + self._stopped = threading.Event() def __call__(self, message): # FIXME(markmc): logging isn't driver specific @@ -110,23 +110,17 @@ class AMQPListener(base.Listener): ctxt.reply_q)) def poll(self, timeout=None): - if timeout is not None: - deadline = time.time() + timeout - else: - deadline = None - while True: + while not self._stopped.is_set(): if self.incoming: return self.incoming.pop(0) - if deadline is not None: - timeout = deadline - time.time() - if timeout < 0: - return None - try: - self.conn.consume(limit=1, timeout=timeout) - except rpc_common.Timeout: - return None - else: - self.conn.consume(limit=1) + try: + self.conn.consume(limit=1, timeout=timeout) + except rpc_common.Timeout: + return None + + def stop(self): + self._stopped.set() + self.conn.stop_consuming() def cleanup(self): # Closes listener connection diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index ec24460f5..ffaebe206 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -56,9 +56,15 @@ class Listener(object): def poll(self, timeout=None): """Blocking until a message is pending and return IncomingMessage. Return None after timeout seconds if timeout is set and no message is - ending. + ending or if the listener have been stopped. """ + def stop(self): + """Stop listener. + Stop the listener message polling + """ + pass + def cleanup(self): """Cleanup listener. Close connection used by listener if any. For some listeners like diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index 457ef0a22..1a7e87a7b 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -46,6 +46,7 @@ class FakeListener(base.Listener): self._exchange_manager = exchange_manager self._targets = targets self._pool = pool + self._stopped = threading.Event() # NOTE(sileht): Ensure that all needed queues exists even the listener # have not been polled yet @@ -58,7 +59,7 @@ class FakeListener(base.Listener): deadline = time.time() + timeout else: deadline = None - while True: + while not self._stopped.is_set(): for target in self._targets: exchange = self._exchange_manager.get_exchange(target.exchange) (ctxt, message, reply_q, requeue) = exchange.poll(target, @@ -77,6 +78,9 @@ class FakeListener(base.Listener): time.sleep(pause) return None + def stop(self): + self._stopped.set() + class FakeExchange(object): diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index 43c7f57e3..8c9b86d8c 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -460,6 +460,8 @@ class Connection(object): self.consumers = {} self.conf = conf + self._consume_loop_stopped = False + self.brokers_params = [] if url.hosts: for host in url.hosts: @@ -651,8 +653,16 @@ class Connection(object): LOG.exception(_('Failed to consume message from queue: %s'), exc) def _consume(): + # NOTE(sileht): + # maximun value choosen according the best practice from kombu: + # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop poll_timeout = 1 if timeout is None else min(timeout, 1) + while True: + if self._consume_loop_stopped: + self._consume_loop_stopped = False + raise StopIteration + try: nxt_receiver = self.session.next_receiver( timeout=poll_timeout) @@ -745,6 +755,9 @@ class Connection(object): except StopIteration: return + def stop_consuming(self): + self._consume_loop_stopped = True + class QpidDriver(amqpdriver.AMQPDriverBase): diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 05219ffcd..4d9d7c893 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -497,6 +497,7 @@ class Connection(object): self._initial_pid = os.getpid() self.do_consume = True + self._consume_loop_stopped = False self.channel = None self.connection = kombu.connection.Connection( @@ -715,8 +716,15 @@ class Connection(object): queues_tail.consume(nowait=False) self.do_consume = False + # NOTE(sileht): + # maximun value choosen according the best practice from kombu: + # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop poll_timeout = 1 if timeout is None else min(timeout, 1) while True: + if self._consume_loop_stopped: + self._consume_loop_stopped = False + raise StopIteration + try: return self.connection.drain_events(timeout=poll_timeout) except socket.timeout as exc: @@ -790,6 +798,9 @@ class Connection(object): except StopIteration: return + def stop_consuming(self): + self._consume_loop_stopped = True + class RabbitDriver(amqpdriver.AMQPDriverBase): diff --git a/oslo/messaging/_executors/base.py b/oslo/messaging/_executors/base.py index 095394ffb..8019017b3 100644 --- a/oslo/messaging/_executors/base.py +++ b/oslo/messaging/_executors/base.py @@ -16,10 +16,6 @@ import abc import six -# NOTE(sileht): value choosen according the best practice from kombu -# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop -POLL_TIMEOUT = 1 - @six.add_metaclass(abc.ABCMeta) class ExecutorBase(object): diff --git a/oslo/messaging/_executors/impl_blocking.py b/oslo/messaging/_executors/impl_blocking.py index d659482f8..1b039b991 100644 --- a/oslo/messaging/_executors/impl_blocking.py +++ b/oslo/messaging/_executors/impl_blocking.py @@ -42,7 +42,7 @@ class BlockingExecutor(base.ExecutorBase): self._running = True while self._running: try: - incoming = self.listener.poll(timeout=base.POLL_TIMEOUT) + incoming = self.listener.poll() if incoming is not None: with self.dispatcher(incoming) as callback: callback() @@ -51,6 +51,7 @@ class BlockingExecutor(base.ExecutorBase): def stop(self): self._running = False + self.listener.stop() def wait(self): pass diff --git a/oslo/messaging/_executors/impl_eventlet.py b/oslo/messaging/_executors/impl_eventlet.py index 13eeeb108..c3391675a 100644 --- a/oslo/messaging/_executors/impl_eventlet.py +++ b/oslo/messaging/_executors/impl_eventlet.py @@ -85,7 +85,7 @@ class EventletExecutor(base.ExecutorBase): def _executor_thread(): try: while self._running: - incoming = self.listener.poll(timeout=base.POLL_TIMEOUT) + incoming = self.listener.poll() if incoming is not None: spawn_with(ctxt=self.dispatcher(incoming), pool=self._greenpool) @@ -99,6 +99,7 @@ class EventletExecutor(base.ExecutorBase): if self._thread is None: return self._running = False + self.listener.stop() self._thread.cancel() def wait(self): diff --git a/tests/executors/test_executor.py b/tests/executors/test_executor.py index ef247c288..445e2283b 100644 --- a/tests/executors/test_executor.py +++ b/tests/executors/test_executor.py @@ -39,12 +39,10 @@ class TestExecutor(test_utils.BaseTestCase): @classmethod def generate_scenarios(cls): - impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor, - stop_before_return=True))] + impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor))] if impl_eventlet is not None: impl.append( - ('eventlet', dict(executor=impl_eventlet.EventletExecutor, - stop_before_return=False))) + ('eventlet', dict(executor=impl_eventlet.EventletExecutor))) cls.scenarios = testscenarios.multiply_scenarios(impl) @staticmethod @@ -72,13 +70,9 @@ class TestExecutor(test_utils.BaseTestCase): message={'payload': 'data'}) def fake_poll(timeout=None): - if self.stop_before_return: - executor.stop() + if listener.poll.call_count == 1: return incoming_message - else: - if listener.poll.call_count == 1: - return incoming_message - executor.stop() + executor.stop() listener.poll.side_effect = fake_poll