diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py index c36a4324b..d0a8eca6e 100644 --- a/oslo_messaging/tests/notify/test_notifier.py +++ b/oslo_messaging/tests/notify/test_notifier.py @@ -18,6 +18,7 @@ import sys import uuid import fixtures +from kombu import connection from oslo_serialization import jsonutils from oslo_utils import strutils from oslo_utils import timeutils @@ -228,6 +229,73 @@ class TestMessagingNotifier(test_utils.BaseTestCase): TestMessagingNotifier.generate_scenarios() +class TestMessagingNotifierRetry(test_utils.BaseTestCase): + + class TestingException(BaseException): + pass + + def test_notifier_retry_connection_fails_rabbit(self): + """This test sets a small retry number for notification sending and + configures a non reachable message bus. The expectation that after the + configured number of retries the driver gives up the message sending. + """ + self.config( + driver=["messagingv2"], + topics=["test-retry"], + retry=2, + group="oslo_messaging_notifications") + transport = oslo_messaging.get_notification_transport( + self.conf, url='rabbit://') + notifier = oslo_messaging.Notifier(transport) + + orig_establish_connection = connection.Connection._establish_connection + calls = [] + + def wrapped_establish_connection(*args, **kwargs): + if len(calls) > 2: + raise self.TestingException( + "Connection should only be retried twice due to " + "configuration") + else: + calls.append((args, kwargs)) + orig_establish_connection(*args, **kwargs) + + with mock.patch( + 'kombu.connection.Connection._establish_connection', + new=wrapped_establish_connection + ): + # FIXME(gibi) This is bug 1917645 as the driver does not stop + # retrying the connection after two retries only our test fixture + # stops the retry by raising TestingException + self.assertRaises( + self.TestingException, + notifier.info, {}, "test", {}) + + def test_notifier_retry_connection_fails_kafka(self): + """This test sets a small retry number for notification sending and + configures a non reachable message bus. The expectation that after the + configured number of retries the driver gives up the message sending. + """ + + self.config( + driver=["messagingv2"], + topics=["test-retry"], + retry=2, + group='oslo_messaging_notifications') + + transport = oslo_messaging.get_notification_transport( + self.conf, url='kafka://') + + notifier = oslo_messaging.Notifier(transport) + + # Kafka's message producer interface is async, and there is no way + # from the oslo interface to force sending a pending message. So this + # call simply returns without i) failing to deliver the message to + # the non existent kafka bus ii) retrying the message delivery twice + # as the configuration requested it. + notifier.info({}, "test", {}) + + class TestSerializer(test_utils.BaseTestCase): def setUp(self):