From 3b5a0543e97619ca8f8cf98193f6b6375d77cbf2 Mon Sep 17 00:00:00 2001 From: Balazs Gibizer Date: Tue, 23 Nov 2021 16:58:05 +0100 Subject: [PATCH] [rabbit] use retry parameters during notification sending The rabbit backend now applies the [oslo_messaging_notifications]retry, [oslo_messaging_rabbit]rabbit_retry_interval, rabbit_retry_backoff and rabbit_interval_max configuration parameters when tries to establish the connection to the message bus during notification sending. This patch also clarifies the differences between the behavior of the kafka and the rabbit drivers in this regard. Closes-Bug: #1917645 Change-Id: Id4ccafc95314c86ae918336e42cca64a6acd4d94 (cherry picked from commit 7b3968d9b012e873a9b393fcefa578c46fca18c6) --- oslo_messaging/_drivers/amqpdriver.py | 7 +++--- oslo_messaging/_drivers/common.py | 4 ++-- oslo_messaging/_drivers/impl_rabbit.py | 11 +++++++-- oslo_messaging/_drivers/pool.py | 10 ++++---- oslo_messaging/notify/messaging.py | 23 ++++++++++++++----- oslo_messaging/tests/drivers/test_pool.py | 4 ++-- oslo_messaging/tests/notify/test_notifier.py | 19 ++++++++++----- ...er-for-notifications-3f7c508ab4437579.yaml | 8 +++++++ 8 files changed, 60 insertions(+), 26 deletions(-) create mode 100644 releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 24fdbc737..991bf46c3 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -601,9 +601,10 @@ class AMQPDriverBase(base.BaseDriver): def _get_exchange(self, target): return target.exchange or self._default_exchange - def _get_connection(self, purpose=rpc_common.PURPOSE_SEND): + def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None): return rpc_common.ConnectionContext(self._connection_pool, - purpose=purpose) + purpose=purpose, + retry=retry) def _get_reply_q(self): with self._reply_q_lock: @@ -649,7 
+650,7 @@ class AMQPDriverBase(base.BaseDriver): log_msg = "CAST unique_id: %s " % unique_id try: - with self._get_connection(rpc_common.PURPOSE_SEND) as conn: + with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn: if notify: exchange = self._get_exchange(target) LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'" diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py index 54c6f7fcb..b6c3adb55 100644 --- a/oslo_messaging/_drivers/common.py +++ b/oslo_messaging/_drivers/common.py @@ -392,7 +392,7 @@ class ConnectionContext(Connection): If possible the function makes sure to return a connection to the pool. """ - def __init__(self, connection_pool, purpose): + def __init__(self, connection_pool, purpose, retry): """Create a new connection, or get one from the pool.""" self.connection = None self.connection_pool = connection_pool @@ -420,7 +420,7 @@ class ConnectionContext(Connection): pooled = purpose == PURPOSE_SEND if pooled: - self.connection = connection_pool.get() + self.connection = connection_pool.get(retry=retry) else: self.connection = connection_pool.create(purpose) self.pooled = pooled diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 9d99822d5..d603f89ea 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -465,13 +465,14 @@ class ConnectionLock(DummyConnectionLock): class Connection(object): """Connection object.""" - def __init__(self, conf, url, purpose): + def __init__(self, conf, url, purpose, retry=None): # NOTE(viktors): Parse config options driver_conf = conf.oslo_messaging_rabbit self.interval_start = driver_conf.rabbit_retry_interval self.interval_stepping = driver_conf.rabbit_retry_backoff self.interval_max = driver_conf.rabbit_interval_max + self.max_retries = retry self.login_method = driver_conf.rabbit_login_method self.rabbit_ha_queues = driver_conf.rabbit_ha_queues @@ -741,7 +742,13 @@ class 
Connection(object): str(exc), interval) self._set_current_channel(None) - self.connection.ensure_connection(errback=on_error) + self.connection.ensure_connection( + errback=on_error, + max_retries=self.max_retries, + interval_start=self.interval_start or 1, + interval_step=self.interval_stepping, + interval_max=self.interval_max, + ) self._set_current_channel(self.connection.channel()) self.set_transport_socket_timeout() diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py index 8090e8d26..9e5288d68 100644 --- a/oslo_messaging/_drivers/pool.py +++ b/oslo_messaging/_drivers/pool.py @@ -69,7 +69,7 @@ class Pool(object, metaclass=abc.ABCMeta): self._items.append((ttl_watch, item)) self._cond.notify() - def get(self): + def get(self, retry=None): """Return an item from the pool, when one is available. This may cause the calling thread to block. @@ -95,7 +95,7 @@ class Pool(object, metaclass=abc.ABCMeta): # We've grabbed a slot and dropped the lock, now do the creation try: - return self.create() + return self.create(retry=retry) except Exception: with self._cond: self._current_size -= 1 @@ -111,7 +111,7 @@ class Pool(object, metaclass=abc.ABCMeta): return @abc.abstractmethod - def create(self): + def create(self, retry=None): """Construct a new item.""" @@ -130,9 +130,9 @@ class ConnectionPool(Pool): LOG.debug("Idle connection has expired and been closed." 
" Pool size: %d" % len(self._items)) - def create(self, purpose=common.PURPOSE_SEND): + def create(self, purpose=common.PURPOSE_SEND, retry=None): LOG.debug('Pool creating new connection') - return self.connection_cls(self.conf, self.url, purpose) + return self.connection_cls(self.conf, self.url, purpose, retry=retry) def empty(self): for item in self.iter_free(): diff --git a/oslo_messaging/notify/messaging.py b/oslo_messaging/notify/messaging.py index 61c7357c3..da633d892 100644 --- a/oslo_messaging/notify/messaging.py +++ b/oslo_messaging/notify/messaging.py @@ -21,19 +21,30 @@ Notification drivers for sending notifications via messaging. The messaging drivers publish notification messages to notification listeners. -The driver will block the notifier's thread until the notification message has -been passed to the messaging transport. There is no guarantee that the -notification message will be consumed by a notification listener. +In case of the rabbit backend the driver will block the notifier's thread +until the notification message has been passed to the messaging transport. +There is no guarantee that the notification message will be consumed by a +notification listener. + +In case of the kafka backend the driver will not block the notifier's thread +but return immediately. The driver will try to deliver the message in the +background. Notification messages are sent 'at-most-once' - ensuring that they are not duplicated. If the connection to the messaging service is not active when a notification is -sent this driver will block waiting for the connection to complete. If the -connection fails to complete, the driver will try to re-establish that +sent the rabbit backend will block waiting for the connection to complete. +If the connection fails to complete, the driver will try to re-establish that connection. By default this will continue indefinitely until the connection completes. 
However, the retry parameter can be used to have the notification -send fail with a MessageDeliveryFailure after the given number of retries. +send fail. In this case an error is logged and the notifier's thread is resumed +without any error. + +If the connection to the messaging service is not active when a notification is +sent the kafka backend will return immediately and the backend tries to +establish the connection and deliver the messages in the background. + """ import logging diff --git a/oslo_messaging/tests/drivers/test_pool.py b/oslo_messaging/tests/drivers/test_pool.py index d5c642014..82a10e1ba 100644 --- a/oslo_messaging/tests/drivers/test_pool.py +++ b/oslo_messaging/tests/drivers/test_pool.py @@ -44,7 +44,7 @@ class PoolTestCase(test_utils.BaseTestCase): class TestPool(pool.Pool): - def create(self): + def create(self, retry=None): return uuid.uuid4() class ThreadWaitWaiter(object): @@ -82,7 +82,7 @@ class PoolTestCase(test_utils.BaseTestCase): p = self.TestPool(**kwargs) if self.create_error: - def create_error(): + def create_error(retry=None): raise RuntimeError orig_create = p.create self.useFixture(fixtures.MockPatchObject( diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py index d0a8eca6e..330bdaba5 100644 --- a/oslo_messaging/tests/notify/test_notifier.py +++ b/oslo_messaging/tests/notify/test_notifier.py @@ -244,6 +244,10 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase): topics=["test-retry"], retry=2, group="oslo_messaging_notifications") + self.config( + # just to speed up the test execution + rabbit_retry_backoff=0, + group="oslo_messaging_rabbit") transport = oslo_messaging.get_notification_transport( self.conf, url='rabbit://') notifier = oslo_messaging.Notifier(transport) @@ -264,12 +268,15 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase): 'kombu.connection.Connection._establish_connection', new=wrapped_establish_connection ): - # FIXME(gibi) This is bug 
1917645 as the driver does not stop - # retrying the connection after two retries only our test fixture - # stops the retry by raising TestingException - self.assertRaises( - self.TestingException, - notifier.info, {}, "test", {}) + with mock.patch( + 'oslo_messaging.notify.messaging.LOG.exception' + ) as mock_log: + notifier.info({}, "test", {}) + + # one normal call plus two retries + self.assertEqual(3, len(calls)) + # the error was caught and logged + mock_log.assert_called_once() def test_notifier_retry_connection_fails_kafka(self): """This test sets a small retry number for notification sending and diff --git a/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml new file mode 100644 index 000000000..d3d62cb9d --- /dev/null +++ b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml @@ -0,0 +1,8 @@ +--- +fixes: + - | + As a fix for `bug 1917645 <https://launchpad.net/bugs/1917645>`_ the rabbit + backend is changed to use the ``[oslo_messaging_notifications]retry`` + parameter when the driver tries to connect to the message bus during + notification sending. Before this fix the rabbit backend retried the + connection forever blocking the caller thread.