Merge "Reproduce bug 1917645"

This commit is contained in:
Zuul 2021-12-21 12:27:45 +00:00 committed by Gerrit Code Review
commit 5eeccdd425

View File

@ -18,6 +18,7 @@ import sys
import uuid
import fixtures
from kombu import connection
from oslo_serialization import jsonutils
from oslo_utils import strutils
from oslo_utils import timeutils
@ -228,6 +229,73 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
TestMessagingNotifier.generate_scenarios()
class TestMessagingNotifierRetry(test_utils.BaseTestCase):
class TestingException(BaseException):
pass
def test_notifier_retry_connection_fails_rabbit(self):
"""This test sets a small retry number for notification sending and
configures a non reachable message bus. The expectation that after the
configured number of retries the driver gives up the message sending.
"""
self.config(
driver=["messagingv2"],
topics=["test-retry"],
retry=2,
group="oslo_messaging_notifications")
transport = oslo_messaging.get_notification_transport(
self.conf, url='rabbit://')
notifier = oslo_messaging.Notifier(transport)
orig_establish_connection = connection.Connection._establish_connection
calls = []
def wrapped_establish_connection(*args, **kwargs):
if len(calls) > 2:
raise self.TestingException(
"Connection should only be retried twice due to "
"configuration")
else:
calls.append((args, kwargs))
orig_establish_connection(*args, **kwargs)
with mock.patch(
'kombu.connection.Connection._establish_connection',
new=wrapped_establish_connection
):
# FIXME(gibi) This is bug 1917645 as the driver does not stop
# retrying the connection after two retries only our test fixture
# stops the retry by raising TestingException
self.assertRaises(
self.TestingException,
notifier.info, {}, "test", {})
def test_notifier_retry_connection_fails_kafka(self):
"""This test sets a small retry number for notification sending and
configures a non reachable message bus. The expectation that after the
configured number of retries the driver gives up the message sending.
"""
self.config(
driver=["messagingv2"],
topics=["test-retry"],
retry=2,
group='oslo_messaging_notifications')
transport = oslo_messaging.get_notification_transport(
self.conf, url='kafka://')
notifier = oslo_messaging.Notifier(transport)
# Kafka's message producer interface is async, and there is no way
# from the oslo interface to force sending a pending message. So this
# call simply returns without i) failing to deliver the message to
# the non existent kafka bus ii) retrying the message delivery twice
# as the configuration requested it.
notifier.info({}, "test", {})
class TestSerializer(test_utils.BaseTestCase):
def setUp(self):