From 50b5468109f9fcbeb4791e00725a8b050a0b3d74 Mon Sep 17 00:00:00 2001 From: Dmitriy Ukhlov Date: Sat, 19 Mar 2016 16:18:40 +0200 Subject: [PATCH] Small fixes 1) Typos in log messages 2) Change LOG.warn -> LOG.warning 3) Fix names of varibles 4) Stop logic of RPC reply listener fixed Change-Id: Icf4579f25f32f65428a3e7e6383bf8b1e34a6fa6 --- .../_drivers/pika_driver/pika_listener.py | 9 ++++- .../_drivers/pika_driver/pika_message.py | 2 +- .../_drivers/pika_driver/pika_poller.py | 40 ++++++++++++------- 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py index 1e52969b7..a58e54236 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_listener.py +++ b/oslo_messaging/_drivers/pika_driver/pika_listener.py @@ -40,6 +40,7 @@ class RpcReplyPikaListener(object): self._reply_consumer_initialized = False self._reply_consumer_initialization_lock = threading.Lock() self._poller_thread = None + self._shutdown = False def get_reply_qname(self, expiration_time=None): """As result return reply queue name, shared for whole process, @@ -75,7 +76,7 @@ class RpcReplyPikaListener(object): ) ) - self._reply_poller.start() + self._reply_poller.start() # start reply poller job thread if needed if self._poller_thread is None: @@ -93,9 +94,11 @@ class RpcReplyPikaListener(object): """Reply polling job. Poll replies in infinite loop and notify registered features """ - while self._reply_poller: + while True: try: messages = self._reply_poller.poll() + if not messages and self._shutdown: + break for message in messages: try: @@ -132,6 +135,8 @@ class RpcReplyPikaListener(object): def cleanup(self): """Stop replies consuming and cleanup resources""" + self._shutdown = True + if self._reply_poller: self._reply_poller.stop() self._reply_poller.cleanup() diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index 4ee4e9a12..1f687ff68 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -387,7 +387,7 @@ class PikaOutgoingMessage(object): ) except pika_exceptions.UnroutableError as e: raise pika_drv_exc.RoutingException( - "Can not deliver message:[body:{}, properties: {}] to any" + "Can not deliver message:[body:{}, properties: {}] to any " "queue using target: [exchange:{}, " "routing_key:{}]. {}".format( body, properties, exchange, routing_key, str(e) diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index dc3d27912..69f73deb1 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -94,13 +94,13 @@ class PikaPoller(base.Listener): for queue_info in self._queues_to_consume: no_ack = queue_info["no_ack"] - on_message_no_ack_callback = ( + on_message_callback = ( self._on_message_no_ack_callback if no_ack else self._on_message_with_ack_callback ) queue_info["consumer_tag"] = self._channel.basic_consume( - on_message_no_ack_callback, queue_info["queue_name"], + on_message_callback, queue_info["queue_name"], no_ack=no_ack ) except Exception: @@ -208,8 +208,10 @@ class PikaPoller(base.Listener): del self._message_queue[:prefetch_size] return result except pika_drv_exc.EstablishConnectionException as e: - LOG.warn("Problem during establishing connection for" - "pika poller %s", e, exc_info=True) + LOG.warning( + "Problem during establishing connection for pika " + "poller %s", e, exc_info=True + ) time.sleep( self._pika_engine.host_connection_reconnect_delay ) @@ -231,19 +233,25 @@ class PikaPoller(base.Listener): try: self._reconnect() except pika_drv_exc.EstablishConnectionException as exc: - LOG.warn("Can not establishing connection during pika " - "Conecting required during first poll() call. %s", - exc, exc_info=True) + LOG.warning( + "Can not establish connection during pika poller's " + "start(). Connecting is required during first poll() " + "call. %s", exc, exc_info=True + ) except pika_drv_exc.ConnectionException as exc: self._cleanup() - LOG.warn("Connectivity problem during pika poller's start(). " - "Reconnecting required during first poll() call. %s", - exc, exc_info=True) + LOG.warning( + "Connectivity problem during pika poller's start(). " + "Reconnecting required during first poll() call. %s", + exc, exc_info=True + ) except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc: self._cleanup() - LOG.warn("Connectivity problem during pika poller's start(). " - "Reconnecting required during first poll() call. %s", - exc, exc_info=True) + LOG.warning( + "Connectivity problem during pika poller's start(). " + "Reconnecting required during first poll() call. %s", + exc, exc_info=True + ) self._started = True def stop(self): @@ -260,8 +268,10 @@ class PikaPoller(base.Listener): self._stop_consuming() except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc: self._cleanup() - LOG.warn("Connectivity problem detected during consumer " - "cancellation. %s", exc, exc_info=True) + LOG.warning( + "Connectivity problem detected during consumer " + "cancellation. %s", exc, exc_info=True + ) self._started = False def cleanup(self):