[rabbit] use retry parameters during notification sending

The rabbit backend now applies the [oslo_messaging_notifications]retry,
[oslo_messaging_rabbit]rabbit_retry_interval, rabbit_retry_backoff and
rabbit_interval_max configuration parameters when tries to establish the
connection to the message bus during notification sending.

This patch also clarifies the differences between the behavior
of the kafka and the rabbit drivers in this regard.

Closes-Bug: #1917645
Change-Id: Id4ccafc95314c86ae918336e42cca64a6acd4d94
(cherry picked from commit 7b3968d9b0)
This commit is contained in:
Balazs Gibizer 2021-11-23 16:58:05 +01:00
parent 7390034e47
commit 3b5a0543e9
8 changed files with 60 additions and 26 deletions

View File

@ -601,9 +601,10 @@ class AMQPDriverBase(base.BaseDriver):
def _get_exchange(self, target):
return target.exchange or self._default_exchange
def _get_connection(self, purpose=rpc_common.PURPOSE_SEND):
def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None):
return rpc_common.ConnectionContext(self._connection_pool,
purpose=purpose)
purpose=purpose,
retry=retry)
def _get_reply_q(self):
with self._reply_q_lock:
@ -649,7 +650,7 @@ class AMQPDriverBase(base.BaseDriver):
log_msg = "CAST unique_id: %s " % unique_id
try:
with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn:
if notify:
exchange = self._get_exchange(target)
LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"

View File

@ -392,7 +392,7 @@ class ConnectionContext(Connection):
If possible the function makes sure to return a connection to the pool.
"""
def __init__(self, connection_pool, purpose):
def __init__(self, connection_pool, purpose, retry):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
@ -420,7 +420,7 @@ class ConnectionContext(Connection):
pooled = purpose == PURPOSE_SEND
if pooled:
self.connection = connection_pool.get()
self.connection = connection_pool.get(retry=retry)
else:
self.connection = connection_pool.create(purpose)
self.pooled = pooled

View File

@ -465,13 +465,14 @@ class ConnectionLock(DummyConnectionLock):
class Connection(object):
"""Connection object."""
def __init__(self, conf, url, purpose):
def __init__(self, conf, url, purpose, retry=None):
# NOTE(viktors): Parse config options
driver_conf = conf.oslo_messaging_rabbit
self.interval_start = driver_conf.rabbit_retry_interval
self.interval_stepping = driver_conf.rabbit_retry_backoff
self.interval_max = driver_conf.rabbit_interval_max
self.max_retries = retry
self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
@ -741,7 +742,13 @@ class Connection(object):
str(exc), interval)
self._set_current_channel(None)
self.connection.ensure_connection(errback=on_error)
self.connection.ensure_connection(
errback=on_error,
max_retries=self.max_retries,
interval_start=self.interval_start or 1,
interval_step=self.interval_stepping,
interval_max=self.interval_max,
)
self._set_current_channel(self.connection.channel())
self.set_transport_socket_timeout()

View File

@ -69,7 +69,7 @@ class Pool(object, metaclass=abc.ABCMeta):
self._items.append((ttl_watch, item))
self._cond.notify()
def get(self):
def get(self, retry=None):
"""Return an item from the pool, when one is available.
This may cause the calling thread to block.
@ -95,7 +95,7 @@ class Pool(object, metaclass=abc.ABCMeta):
# We've grabbed a slot and dropped the lock, now do the creation
try:
return self.create()
return self.create(retry=retry)
except Exception:
with self._cond:
self._current_size -= 1
@ -111,7 +111,7 @@ class Pool(object, metaclass=abc.ABCMeta):
return
@abc.abstractmethod
def create(self):
def create(self, retry=None):
"""Construct a new item."""
@ -130,9 +130,9 @@ class ConnectionPool(Pool):
LOG.debug("Idle connection has expired and been closed."
" Pool size: %d" % len(self._items))
def create(self, purpose=common.PURPOSE_SEND):
def create(self, purpose=common.PURPOSE_SEND, retry=None):
LOG.debug('Pool creating new connection')
return self.connection_cls(self.conf, self.url, purpose)
return self.connection_cls(self.conf, self.url, purpose, retry=retry)
def empty(self):
for item in self.iter_free():

View File

@ -21,19 +21,30 @@ Notification drivers for sending notifications via messaging.
The messaging drivers publish notification messages to notification
listeners.
The driver will block the notifier's thread until the notification message has
been passed to the messaging transport. There is no guarantee that the
notification message will be consumed by a notification listener.
In case of the rabbit backend the driver will block the notifier's thread
until the notification message has been passed to the messaging transport.
There is no guarantee that the notification message will be consumed by a
notification listener.
In case of the kafka backend the driver will not block the notifier's thread
but return immediately. The driver will try to deliver the message in the
background.
Notification messages are sent 'at-most-once' - ensuring that they are not
duplicated.
If the connection to the messaging service is not active when a notification is
sent this driver will block waiting for the connection to complete. If the
connection fails to complete, the driver will try to re-establish that
sent the rabbit backend will block waiting for the connection to complete.
If the connection fails to complete, the driver will try to re-establish that
connection. By default this will continue indefinitely until the connection
completes. However, the retry parameter can be used to have the notification
send fail with a MessageDeliveryFailure after the given number of retries.
send fail. In this case an error is logged and the notifier's thread is resumed
without any error.
If the connection to the messaging service is not active when a notification is
sent the kafka backend will return immediately and the backend tries to
establish the connection and deliver the messages in the background.
"""
import logging

View File

@ -44,7 +44,7 @@ class PoolTestCase(test_utils.BaseTestCase):
class TestPool(pool.Pool):
def create(self):
def create(self, retry=None):
return uuid.uuid4()
class ThreadWaitWaiter(object):
@ -82,7 +82,7 @@ class PoolTestCase(test_utils.BaseTestCase):
p = self.TestPool(**kwargs)
if self.create_error:
def create_error():
def create_error(retry=None):
raise RuntimeError
orig_create = p.create
self.useFixture(fixtures.MockPatchObject(

View File

@ -244,6 +244,10 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
topics=["test-retry"],
retry=2,
group="oslo_messaging_notifications")
self.config(
# just to speed up the test execution
rabbit_retry_backoff=0,
group="oslo_messaging_rabbit")
transport = oslo_messaging.get_notification_transport(
self.conf, url='rabbit://')
notifier = oslo_messaging.Notifier(transport)
@ -264,12 +268,15 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
'kombu.connection.Connection._establish_connection',
new=wrapped_establish_connection
):
# FIXME(gibi) This is bug 1917645 as the driver does not stop
# retrying the connection after two retries only our test fixture
# stops the retry by raising TestingException
self.assertRaises(
self.TestingException,
notifier.info, {}, "test", {})
with mock.patch(
'oslo_messaging.notify.messaging.LOG.exception'
) as mock_log:
notifier.info({}, "test", {})
# one normal call plus two retries
self.assertEqual(3, len(calls))
# the error was caught and logged
mock_log.assert_called_once()
def test_notifier_retry_connection_fails_kafka(self):
"""This test sets a small retry number for notification sending and

View File

@ -0,0 +1,8 @@
---
fixes:
- |
As a fix for `bug 1917645 <https://launchpad.net/bugs/1917645>`_ the rabbit
backend is changed to use the ``[oslo_messaging_notifications]retry``
parameter when driver tries to connect to the message bus during
notification sending. Before this fix the rabbit backend retried the
connection forever blocking the caller thread.