Switches pika driver to eager connection to RabbitMQ

In previous implementation PikaPoller returned by PikaDriver.listen
is not connected to RabbitMQ. Connection is established only when
executor calls first poll method

This behaviour is different in comparison with kombu driver and causes
neutron rpc workers hanging.

I tried to fix it there:
https://review.openstack.org/#/c/270832/

But oslo.service team don't accept this workaround
The main problem is that neutron uses olso libraries in wrong way and
it is necessary to change it in neutron. This patch only mask this
problem

Change-Id: Idd6b692bcf2025fc2a3b37b27f4d1c1b01922fdf
This commit is contained in:
dukhlov
2016-02-03 16:52:08 +02:00
committed by Dmitriy Ukhlov
parent 72bba177f0
commit 1cc3b9c80e
3 changed files with 14 additions and 36 deletions

View File

@@ -101,14 +101,10 @@ class PikaEngine(object):
# processing rpc options # processing rpc options
self.default_rpc_exchange = ( self.default_rpc_exchange = (
conf.oslo_messaging_pika.default_rpc_exchange if conf.oslo_messaging_pika.default_rpc_exchange
conf.oslo_messaging_pika.default_rpc_exchange else
default_exchange
) )
self.rpc_reply_exchange = ( self.rpc_reply_exchange = (
conf.oslo_messaging_pika.rpc_reply_exchange if conf.oslo_messaging_pika.rpc_reply_exchange
conf.oslo_messaging_pika.rpc_reply_exchange else
default_exchange
) )
self.allowed_remote_exmods = [_EXCEPTIONS_MODULE] self.allowed_remote_exmods = [_EXCEPTIONS_MODULE]
@@ -149,9 +145,7 @@ class PikaEngine(object):
# processing notification options # processing notification options
self.default_notification_exchange = ( self.default_notification_exchange = (
conf.oslo_messaging_pika.default_notification_exchange if conf.oslo_messaging_pika.default_notification_exchange
conf.oslo_messaging_pika.default_notification_exchange else
default_exchange
) )
self.notification_persistence = ( self.notification_persistence = (

View File

@@ -77,7 +77,7 @@ class RpcReplyPikaListener(object):
) )
) )
self._reply_poller.start(timeout=expiration_time - time.time()) self._reply_poller.start()
# start reply poller job thread if needed # start reply poller job thread if needed
if self._poller_thread is None: if self._poller_thread is None:

View File

@@ -17,9 +17,9 @@ import time
from oslo_log import log as logging from oslo_log import log as logging
import pika_pool import pika_pool
import retrying
import six import six
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@@ -190,6 +190,8 @@ class PikaPoller(object):
""" """
self._started = True self._started = True
self.reconnect()
def stop(self): def stop(self):
"""Stops poller. Should be called when polling is not needed anymore to """Stops poller. Should be called when polling is not needed anymore to
stop new message consuming. After that it is necessary to poll already stop new message consuming. After that it is necessary to poll already
@@ -207,9 +209,14 @@ class PikaPoller(object):
self._cleanup() self._cleanup()
try: try:
self._reconnect() self._reconnect()
except Exception: except Exception as exc:
self._cleanup() self._cleanup()
raise if isinstance(exc, pika_pool.Connection.connectivity_errors):
raise pika_drv_exc.ConnectionException(
"Connectivity problem detected during establishing "
"poller's connection. " + str(exc))
else:
raise exc
def cleanup(self): def cleanup(self):
"""Safe version of _cleanup. Cleans up allocated resources (channel, """Safe version of _cleanup. Cleans up allocated resources (channel,
@@ -325,29 +332,6 @@ class RpcReplyPikaPoller(PikaPoller):
return {self._queue: False} return {self._queue: False}
def start(self, timeout=None):
"""Overrides default behaviour of start method. Base start method
does not create connection to RabbitMQ during start method (uses
lazy connecting during first poll method call). This class should be
connected after start call to ensure that exchange and queue for reply
delivery are created before RPC request sending
"""
super(RpcReplyPikaPoller, self).start()
def on_exception(ex):
LOG.warn(str(ex))
return True
retrier = retrying.retry(
stop_max_attempt_number=self._pika_engine.rpc_reply_retry_attempts,
stop_max_delay=None if timeout is None else timeout * 1000,
wait_fixed=self._pika_engine.rpc_reply_retry_delay * 1000,
retry_on_exception=on_exception,
)
retrier(self.reconnect)()
class NotificationPikaPoller(PikaPoller): class NotificationPikaPoller(PikaPoller):
"""PikaPoller implementation for polling Notification messages. Overrides """PikaPoller implementation for polling Notification messages. Overrides