diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py index be1db685..fe907f59 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -101,14 +101,10 @@ class PikaEngine(object): # processing rpc options self.default_rpc_exchange = ( - conf.oslo_messaging_pika.default_rpc_exchange if - conf.oslo_messaging_pika.default_rpc_exchange else - default_exchange + conf.oslo_messaging_pika.default_rpc_exchange ) self.rpc_reply_exchange = ( - conf.oslo_messaging_pika.rpc_reply_exchange if - conf.oslo_messaging_pika.rpc_reply_exchange else - default_exchange + conf.oslo_messaging_pika.rpc_reply_exchange ) self.allowed_remote_exmods = [_EXCEPTIONS_MODULE] @@ -149,9 +145,7 @@ class PikaEngine(object): # processing notification options self.default_notification_exchange = ( - conf.oslo_messaging_pika.default_notification_exchange if - conf.oslo_messaging_pika.default_notification_exchange else - default_exchange + conf.oslo_messaging_pika.default_notification_exchange ) self.notification_persistence = ( diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py index 2c33168e..54ede123 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_listener.py +++ b/oslo_messaging/_drivers/pika_driver/pika_listener.py @@ -77,7 +77,7 @@ class RpcReplyPikaListener(object): ) ) - self._reply_poller.start(timeout=expiration_time - time.time()) + self._reply_poller.start() # start reply poller job thread if needed if self._poller_thread is None: diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index 4edfa0d5..838bf4a4 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -17,9 +17,9 @@ import time from oslo_log import log as logging import pika_pool -import retrying import six +from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg LOG = logging.getLogger(__name__) @@ -190,6 +190,8 @@ class PikaPoller(object): """ self._started = True + self.reconnect() + def stop(self): """Stops poller. Should be called when polling is not needed anymore to stop new message consuming. After that it is necessary to poll already @@ -207,9 +209,14 @@ class PikaPoller(object): self._cleanup() try: self._reconnect() - except Exception: + except Exception as exc: self._cleanup() - raise + if isinstance(exc, pika_pool.Connection.connectivity_errors): + raise pika_drv_exc.ConnectionException( + "Connectivity problem detected during establishing " + "poller's connection. " + str(exc)) + else: + raise exc def cleanup(self): """Safe version of _cleanup. Cleans up allocated resources (channel, @@ -325,29 +332,6 @@ class RpcReplyPikaPoller(PikaPoller): return {self._queue: False} - def start(self, timeout=None): - """Overrides default behaviour of start method. Base start method - does not create connection to RabbitMQ during start method (uses - lazy connecting during first poll method call). This class should be - connected after start call to ensure that exchange and queue for reply - delivery are created before RPC request sending - """ - super(RpcReplyPikaPoller, self).start() - - def on_exception(ex): - LOG.warn(str(ex)) - - return True - - retrier = retrying.retry( - stop_max_attempt_number=self._pika_engine.rpc_reply_retry_attempts, - stop_max_delay=None if timeout is None else timeout * 1000, - wait_fixed=self._pika_engine.rpc_reply_retry_delay * 1000, - retry_on_exception=on_exception, - ) - - retrier(self.reconnect)() - class NotificationPikaPoller(PikaPoller): """PikaPoller implementation for polling Notification messages. Overrides