Merge "Small fixes"

This commit is contained in:
Jenkins 2016-03-21 19:09:44 +00:00 committed by Gerrit Code Review
commit 39910533fc
3 changed files with 33 additions and 18 deletions

View File

@ -40,6 +40,7 @@ class RpcReplyPikaListener(object):
self._reply_consumer_initialized = False self._reply_consumer_initialized = False
self._reply_consumer_initialization_lock = threading.Lock() self._reply_consumer_initialization_lock = threading.Lock()
self._poller_thread = None self._poller_thread = None
self._shutdown = False
def get_reply_qname(self, expiration_time=None): def get_reply_qname(self, expiration_time=None):
"""As result return reply queue name, shared for whole process, """As result return reply queue name, shared for whole process,
@ -75,7 +76,7 @@ class RpcReplyPikaListener(object):
) )
) )
self._reply_poller.start() self._reply_poller.start()
# start reply poller job thread if needed # start reply poller job thread if needed
if self._poller_thread is None: if self._poller_thread is None:
@ -93,9 +94,11 @@ class RpcReplyPikaListener(object):
"""Reply polling job. Poll replies in infinite loop and notify """Reply polling job. Poll replies in infinite loop and notify
registered features registered features
""" """
while self._reply_poller: while True:
try: try:
messages = self._reply_poller.poll() messages = self._reply_poller.poll()
if not messages and self._shutdown:
break
for message in messages: for message in messages:
try: try:
@ -132,6 +135,8 @@ class RpcReplyPikaListener(object):
def cleanup(self): def cleanup(self):
"""Stop replies consuming and cleanup resources""" """Stop replies consuming and cleanup resources"""
self._shutdown = True
if self._reply_poller: if self._reply_poller:
self._reply_poller.stop() self._reply_poller.stop()
self._reply_poller.cleanup() self._reply_poller.cleanup()

View File

@ -387,7 +387,7 @@ class PikaOutgoingMessage(object):
) )
except pika_exceptions.UnroutableError as e: except pika_exceptions.UnroutableError as e:
raise pika_drv_exc.RoutingException( raise pika_drv_exc.RoutingException(
"Can not deliver message:[body:{}, properties: {}] to any" "Can not deliver message:[body:{}, properties: {}] to any "
"queue using target: [exchange:{}, " "queue using target: [exchange:{}, "
"routing_key:{}]. {}".format( "routing_key:{}]. {}".format(
body, properties, exchange, routing_key, str(e) body, properties, exchange, routing_key, str(e)

View File

@ -94,13 +94,13 @@ class PikaPoller(base.Listener):
for queue_info in self._queues_to_consume: for queue_info in self._queues_to_consume:
no_ack = queue_info["no_ack"] no_ack = queue_info["no_ack"]
on_message_no_ack_callback = ( on_message_callback = (
self._on_message_no_ack_callback if no_ack self._on_message_no_ack_callback if no_ack
else self._on_message_with_ack_callback else self._on_message_with_ack_callback
) )
queue_info["consumer_tag"] = self._channel.basic_consume( queue_info["consumer_tag"] = self._channel.basic_consume(
on_message_no_ack_callback, queue_info["queue_name"], on_message_callback, queue_info["queue_name"],
no_ack=no_ack no_ack=no_ack
) )
except Exception: except Exception:
@ -208,8 +208,10 @@ class PikaPoller(base.Listener):
del self._message_queue[:prefetch_size] del self._message_queue[:prefetch_size]
return result return result
except pika_drv_exc.EstablishConnectionException as e: except pika_drv_exc.EstablishConnectionException as e:
LOG.warn("Problem during establishing connection for" LOG.warning(
"pika poller %s", e, exc_info=True) "Problem during establishing connection for pika "
"poller %s", e, exc_info=True
)
time.sleep( time.sleep(
self._pika_engine.host_connection_reconnect_delay self._pika_engine.host_connection_reconnect_delay
) )
@ -231,19 +233,25 @@ class PikaPoller(base.Listener):
try: try:
self._reconnect() self._reconnect()
except pika_drv_exc.EstablishConnectionException as exc: except pika_drv_exc.EstablishConnectionException as exc:
LOG.warn("Can not establishing connection during pika " LOG.warning(
"Conecting required during first poll() call. %s", "Can not establish connection during pika poller's "
exc, exc_info=True) "start(). Connecting is required during first poll() "
"call. %s", exc, exc_info=True
)
except pika_drv_exc.ConnectionException as exc: except pika_drv_exc.ConnectionException as exc:
self._cleanup() self._cleanup()
LOG.warn("Connectivity problem during pika poller's start(). " LOG.warning(
"Reconnecting required during first poll() call. %s", "Connectivity problem during pika poller's start(). "
exc, exc_info=True) "Reconnecting required during first poll() call. %s",
exc, exc_info=True
)
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc: except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
self._cleanup() self._cleanup()
LOG.warn("Connectivity problem during pika poller's start(). " LOG.warning(
"Reconnecting required during first poll() call. %s", "Connectivity problem during pika poller's start(). "
exc, exc_info=True) "Reconnecting required during first poll() call. %s",
exc, exc_info=True
)
self._started = True self._started = True
def stop(self): def stop(self):
@ -260,8 +268,10 @@ class PikaPoller(base.Listener):
self._stop_consuming() self._stop_consuming()
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc: except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
self._cleanup() self._cleanup()
LOG.warn("Connectivity problem detected during consumer " LOG.warning(
"cancellation. %s", exc, exc_info=True) "Connectivity problem detected during consumer "
"cancellation. %s", exc, exc_info=True
)
self._started = False self._started = False
def cleanup(self): def cleanup(self):