diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 24fdbc737..991bf46c3 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -601,9 +601,10 @@ class AMQPDriverBase(base.BaseDriver): def _get_exchange(self, target): return target.exchange or self._default_exchange - def _get_connection(self, purpose=rpc_common.PURPOSE_SEND): + def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None): return rpc_common.ConnectionContext(self._connection_pool, - purpose=purpose) + purpose=purpose, + retry=retry) def _get_reply_q(self): with self._reply_q_lock: @@ -649,7 +650,7 @@ class AMQPDriverBase(base.BaseDriver): log_msg = "CAST unique_id: %s " % unique_id try: - with self._get_connection(rpc_common.PURPOSE_SEND) as conn: + with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn: if notify: exchange = self._get_exchange(target) LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'" diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py index 54c6f7fcb..b6c3adb55 100644 --- a/oslo_messaging/_drivers/common.py +++ b/oslo_messaging/_drivers/common.py @@ -392,7 +392,7 @@ class ConnectionContext(Connection): If possible the function makes sure to return a connection to the pool. 
""" - def __init__(self, connection_pool, purpose): + def __init__(self, connection_pool, purpose, retry): """Create a new connection, or get one from the pool.""" self.connection = None self.connection_pool = connection_pool @@ -420,7 +420,7 @@ class ConnectionContext(Connection): pooled = purpose == PURPOSE_SEND if pooled: - self.connection = connection_pool.get() + self.connection = connection_pool.get(retry=retry) else: self.connection = connection_pool.create(purpose) self.pooled = pooled diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 20e30adb1..3ba541876 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -452,13 +452,14 @@ class ConnectionLock(DummyConnectionLock): class Connection(object): """Connection object.""" - def __init__(self, conf, url, purpose): + def __init__(self, conf, url, purpose, retry=None): # NOTE(viktors): Parse config options driver_conf = conf.oslo_messaging_rabbit self.interval_start = driver_conf.rabbit_retry_interval self.interval_stepping = driver_conf.rabbit_retry_backoff self.interval_max = driver_conf.rabbit_interval_max + self.max_retries = retry self.login_method = driver_conf.rabbit_login_method self.rabbit_ha_queues = driver_conf.rabbit_ha_queues @@ -728,7 +729,13 @@ class Connection(object): str(exc), interval) self._set_current_channel(None) - self.connection.ensure_connection(errback=on_error) + self.connection.ensure_connection( + errback=on_error, + max_retries=self.max_retries, + interval_start=self.interval_start or 1, + interval_step=self.interval_stepping, + interval_max=self.interval_max, + ) self._set_current_channel(self.connection.channel()) self.set_transport_socket_timeout() diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py index 8090e8d26..9e5288d68 100644 --- a/oslo_messaging/_drivers/pool.py +++ b/oslo_messaging/_drivers/pool.py @@ -69,7 +69,7 @@ class Pool(object, 
metaclass=abc.ABCMeta): self._items.append((ttl_watch, item)) self._cond.notify() - def get(self): + def get(self, retry=None): """Return an item from the pool, when one is available. This may cause the calling thread to block. @@ -95,7 +95,7 @@ class Pool(object, metaclass=abc.ABCMeta): # We've grabbed a slot and dropped the lock, now do the creation try: - return self.create() + return self.create(retry=retry) except Exception: with self._cond: self._current_size -= 1 @@ -111,7 +111,7 @@ class Pool(object, metaclass=abc.ABCMeta): return @abc.abstractmethod - def create(self): + def create(self, retry=None): """Construct a new item.""" @@ -130,9 +130,9 @@ class ConnectionPool(Pool): LOG.debug("Idle connection has expired and been closed." " Pool size: %d" % len(self._items)) - def create(self, purpose=common.PURPOSE_SEND): + def create(self, purpose=common.PURPOSE_SEND, retry=None): LOG.debug('Pool creating new connection') - return self.connection_cls(self.conf, self.url, purpose) + return self.connection_cls(self.conf, self.url, purpose, retry=retry) def empty(self): for item in self.iter_free(): diff --git a/oslo_messaging/notify/messaging.py b/oslo_messaging/notify/messaging.py index 61c7357c3..da633d892 100644 --- a/oslo_messaging/notify/messaging.py +++ b/oslo_messaging/notify/messaging.py @@ -21,19 +21,30 @@ Notification drivers for sending notifications via messaging. The messaging drivers publish notification messages to notification listeners. -The driver will block the notifier's thread until the notification message has -been passed to the messaging transport. There is no guarantee that the -notification message will be consumed by a notification listener. +In case of the rabbit backend the driver will block the notifier's thread +until the notification message has been passed to the messaging transport. +There is no guarantee that the notification message will be consumed by a +notification listener. 
+ +In case of the kafka backend the driver will not block the notifier's thread +but return immediately. The driver will try to deliver the message in the +background. Notification messages are sent 'at-most-once' - ensuring that they are not duplicated. If the connection to the messaging service is not active when a notification is -sent this driver will block waiting for the connection to complete. If the -connection fails to complete, the driver will try to re-establish that +sent, the rabbit backend will block waiting for the connection to complete. +If the connection fails to complete, the driver will try to re-establish that connection. By default this will continue indefinitely until the connection completes. However, the retry parameter can be used to have the notification -send fail with a MessageDeliveryFailure after the given number of retries. +send fail after the given number of retries. In this case an error is logged +and the notifier's thread is resumed without an exception being raised. + +If the connection to the messaging service is not active when a notification is +sent, the kafka backend will return immediately and try to establish the +connection and deliver the messages in the background. 
+ """ import logging diff --git a/oslo_messaging/tests/drivers/test_pool.py b/oslo_messaging/tests/drivers/test_pool.py index d5c642014..82a10e1ba 100644 --- a/oslo_messaging/tests/drivers/test_pool.py +++ b/oslo_messaging/tests/drivers/test_pool.py @@ -44,7 +44,7 @@ class PoolTestCase(test_utils.BaseTestCase): class TestPool(pool.Pool): - def create(self): + def create(self, retry=None): return uuid.uuid4() class ThreadWaitWaiter(object): @@ -82,7 +82,7 @@ class PoolTestCase(test_utils.BaseTestCase): p = self.TestPool(**kwargs) if self.create_error: - def create_error(): + def create_error(retry=None): raise RuntimeError orig_create = p.create self.useFixture(fixtures.MockPatchObject( diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py index d0a8eca6e..330bdaba5 100644 --- a/oslo_messaging/tests/notify/test_notifier.py +++ b/oslo_messaging/tests/notify/test_notifier.py @@ -244,6 +244,10 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase): topics=["test-retry"], retry=2, group="oslo_messaging_notifications") + self.config( + # just to speed up the test execution + rabbit_retry_backoff=0, + group="oslo_messaging_rabbit") transport = oslo_messaging.get_notification_transport( self.conf, url='rabbit://') notifier = oslo_messaging.Notifier(transport) @@ -264,12 +268,15 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase): 'kombu.connection.Connection._establish_connection', new=wrapped_establish_connection ): - # FIXME(gibi) This is bug 1917645 as the driver does not stop - # retrying the connection after two retries only our test fixture - # stops the retry by raising TestingException - self.assertRaises( - self.TestingException, - notifier.info, {}, "test", {}) + with mock.patch( + 'oslo_messaging.notify.messaging.LOG.exception' + ) as mock_log: + notifier.info({}, "test", {}) + + # one normal call plus two retries + self.assertEqual(3, len(calls)) + # the error was caught and logged + 
mock_log.assert_called_once() def test_notifier_retry_connection_fails_kafka(self): """This test sets a small retry number for notification sending and diff --git a/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml new file mode 100644 index 000000000..d3d62cb9d --- /dev/null +++ b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml @@ -0,0 +1,8 @@ +--- +fixes: + - | + As a fix for `bug 1917645 <https://launchpad.net/bugs/1917645>`_ the rabbit + backend is changed to use the ``[oslo_messaging_notifications]retry`` + parameter when the driver tries to connect to the message bus during + notification sending. Before this fix the rabbit backend retried the + connection forever, blocking the caller thread.