diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 08cb20517..2d4f7558b 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -885,6 +885,13 @@ class Connection(object): exc) def _consume(): + # NOTE(sileht): in case the acknowledgement or requeue of a + # message fail, the kombu transport can be disconnected + # In this case, we must redeclare our consumers, so raise + # a recoverable error to trigger the reconnection code. + if not self.connection.connected: + raise self.connection.recoverable_connection_errors[0] + if self._new_consumers: for tag, consumer in enumerate(self._consumers): if consumer in self._new_consumers: diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index c346019ee..945c5c018 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -208,6 +208,21 @@ class TestRabbitConsume(test_utils.BaseTestCase): conn.reset() self.assertEqual(channel, conn.channel) + def test_connection_ack_have_disconnected_kombu_connection(self): + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN + ).connection + channel = conn.channel + with mock.patch('kombu.connection.Connection.connected', + new_callable=mock.PropertyMock, + return_value=False): + self.assertRaises(driver_common.Timeout, + conn.consume, timeout=0.01) + # Ensure a new channel have been setuped + self.assertNotEqual(channel, conn.channel) + class TestRabbitTransportURL(test_utils.BaseTestCase):