From 127f282d88240749845c21d69f8efbb3edc005e6 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Thu, 29 Mar 2012 18:44:16 +0000 Subject: [PATCH] Make kombu failures retry on IOError Fixes bug 797770 Unfortunately if rabbit decides protocol negotiation is taking too long, it will close the socket on us. This ends up raising IOError with a 'socket closed' message. This patch will catch IOError and re-try. Change-Id: I9110c845b71118c0fad760d90e91c585e6db46ed --- nova/rpc/impl_kombu.py | 4 ++-- nova/tests/rpc/test_kombu.py | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index f78005e3..f3b78edc 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -445,7 +445,7 @@ class Connection(object): try: self._connect() return - except self.connection_errors, e: + except (self.connection_errors, IOError), e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib @@ -488,7 +488,7 @@ class Connection(object): while True: try: return method(*args, **kwargs) - except (self.connection_errors, socket.timeout), e: + except (self.connection_errors, socket.timeout, IOError), e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py index 47d15f84..aa49b5d5 100644 --- a/nova/tests/rpc/test_kombu.py +++ b/nova/tests/rpc/test_kombu.py @@ -35,14 +35,15 @@ class MyException(Exception): pass -def _raise_exc_stub(stubs, times, obj, method, exc_msg): +def _raise_exc_stub(stubs, times, obj, method, exc_msg, + exc_class=MyException): info = {'called': 0} orig_method = getattr(obj, method) def _raise_stub(*args, **kwargs): info['called'] += 1 if info['called'] <= times: - raise MyException(exc_msg) + raise exc_class(exc_msg) orig_method(*args, **kwargs) stubs.Set(obj, method, _raise_stub) return info @@ -213,6 +214,18 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.assertEqual(info['called'], 2) self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) + def test_declare_consumer_ioerrors_will_reconnect(self): + """Test that an IOError exception causes a reconnection""" + info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer, + '__init__', 'Socket closed', exc_class=IOError) + + conn = self.rpc.Connection() + result = conn.declare_consumer(self.rpc.DirectConsumer, + 'test_topic', None) + + self.assertEqual(info['called'], 3) + self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) + def test_publishing_errors_will_reconnect(self): # Test that any exception with 'timeout' in it causes a # reconnection when declaring the publisher class and when