Small fixes

1) Typos in log messages
2) Change LOG.warn -> LOG.warning
3) Fix names of varibles
4) Stop logic of RPC reply listener fixed

Change-Id: Icf4579f25f32f65428a3e7e6383bf8b1e34a6fa6
This commit is contained in:
Dmitriy Ukhlov 2016-03-19 16:18:40 +02:00
parent f0d251d19d
commit 50b5468109
3 changed files with 33 additions and 18 deletions

View File

@ -40,6 +40,7 @@ class RpcReplyPikaListener(object):
self._reply_consumer_initialized = False
self._reply_consumer_initialization_lock = threading.Lock()
self._poller_thread = None
self._shutdown = False
def get_reply_qname(self, expiration_time=None):
"""As result return reply queue name, shared for whole process,
@ -75,7 +76,7 @@ class RpcReplyPikaListener(object):
)
)
self._reply_poller.start()
self._reply_poller.start()
# start reply poller job thread if needed
if self._poller_thread is None:
@ -93,9 +94,11 @@ class RpcReplyPikaListener(object):
"""Reply polling job. Poll replies in infinite loop and notify
registered features
"""
while self._reply_poller:
while True:
try:
messages = self._reply_poller.poll()
if not messages and self._shutdown:
break
for message in messages:
try:
@ -132,6 +135,8 @@ class RpcReplyPikaListener(object):
def cleanup(self):
"""Stop replies consuming and cleanup resources"""
self._shutdown = True
if self._reply_poller:
self._reply_poller.stop()
self._reply_poller.cleanup()

View File

@ -387,7 +387,7 @@ class PikaOutgoingMessage(object):
)
except pika_exceptions.UnroutableError as e:
raise pika_drv_exc.RoutingException(
"Can not deliver message:[body:{}, properties: {}] to any"
"Can not deliver message:[body:{}, properties: {}] to any "
"queue using target: [exchange:{}, "
"routing_key:{}]. {}".format(
body, properties, exchange, routing_key, str(e)

View File

@ -94,13 +94,13 @@ class PikaPoller(base.Listener):
for queue_info in self._queues_to_consume:
no_ack = queue_info["no_ack"]
on_message_no_ack_callback = (
on_message_callback = (
self._on_message_no_ack_callback if no_ack
else self._on_message_with_ack_callback
)
queue_info["consumer_tag"] = self._channel.basic_consume(
on_message_no_ack_callback, queue_info["queue_name"],
on_message_callback, queue_info["queue_name"],
no_ack=no_ack
)
except Exception:
@ -208,8 +208,10 @@ class PikaPoller(base.Listener):
del self._message_queue[:prefetch_size]
return result
except pika_drv_exc.EstablishConnectionException as e:
LOG.warn("Problem during establishing connection for"
"pika poller %s", e, exc_info=True)
LOG.warning(
"Problem during establishing connection for pika "
"poller %s", e, exc_info=True
)
time.sleep(
self._pika_engine.host_connection_reconnect_delay
)
@ -231,19 +233,25 @@ class PikaPoller(base.Listener):
try:
self._reconnect()
except pika_drv_exc.EstablishConnectionException as exc:
LOG.warn("Can not establishing connection during pika "
"Conecting required during first poll() call. %s",
exc, exc_info=True)
LOG.warning(
"Can not establish connection during pika poller's "
"start(). Connecting is required during first poll() "
"call. %s", exc, exc_info=True
)
except pika_drv_exc.ConnectionException as exc:
self._cleanup()
LOG.warn("Connectivity problem during pika poller's start(). "
"Reconnecting required during first poll() call. %s",
exc, exc_info=True)
LOG.warning(
"Connectivity problem during pika poller's start(). "
"Reconnecting required during first poll() call. %s",
exc, exc_info=True
)
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
self._cleanup()
LOG.warn("Connectivity problem during pika poller's start(). "
"Reconnecting required during first poll() call. %s",
exc, exc_info=True)
LOG.warning(
"Connectivity problem during pika poller's start(). "
"Reconnecting required during first poll() call. %s",
exc, exc_info=True
)
self._started = True
def stop(self):
@ -260,8 +268,10 @@ class PikaPoller(base.Listener):
self._stop_consuming()
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
self._cleanup()
LOG.warn("Connectivity problem detected during consumer "
"cancellation. %s", exc, exc_info=True)
LOG.warning(
"Connectivity problem detected during consumer "
"cancellation. %s", exc, exc_info=True
)
self._started = False
def cleanup(self):