Merge "Make kombu failures retry on IOError"
This commit is contained in:
@@ -445,7 +445,7 @@ class Connection(object):
|
|||||||
try:
|
try:
|
||||||
self._connect()
|
self._connect()
|
||||||
return
|
return
|
||||||
except self.connection_errors, e:
|
except (self.connection_errors, IOError), e:
|
||||||
pass
|
pass
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
# NOTE(comstud): Unfortunately it's possible for amqplib
|
# NOTE(comstud): Unfortunately it's possible for amqplib
|
||||||
@@ -488,7 +488,7 @@ class Connection(object):
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
return method(*args, **kwargs)
|
return method(*args, **kwargs)
|
||||||
except (self.connection_errors, socket.timeout), e:
|
except (self.connection_errors, socket.timeout, IOError), e:
|
||||||
pass
|
pass
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
# NOTE(comstud): Unfortunately it's possible for amqplib
|
# NOTE(comstud): Unfortunately it's possible for amqplib
|
||||||
|
|||||||
@@ -35,14 +35,15 @@ class MyException(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def _raise_exc_stub(stubs, times, obj, method, exc_msg):
|
def _raise_exc_stub(stubs, times, obj, method, exc_msg,
|
||||||
|
exc_class=MyException):
|
||||||
info = {'called': 0}
|
info = {'called': 0}
|
||||||
orig_method = getattr(obj, method)
|
orig_method = getattr(obj, method)
|
||||||
|
|
||||||
def _raise_stub(*args, **kwargs):
|
def _raise_stub(*args, **kwargs):
|
||||||
info['called'] += 1
|
info['called'] += 1
|
||||||
if info['called'] <= times:
|
if info['called'] <= times:
|
||||||
raise MyException(exc_msg)
|
raise exc_class(exc_msg)
|
||||||
orig_method(*args, **kwargs)
|
orig_method(*args, **kwargs)
|
||||||
stubs.Set(obj, method, _raise_stub)
|
stubs.Set(obj, method, _raise_stub)
|
||||||
return info
|
return info
|
||||||
@@ -213,6 +214,18 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
self.assertEqual(info['called'], 2)
|
self.assertEqual(info['called'], 2)
|
||||||
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
|
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
|
||||||
|
|
||||||
|
def test_declare_consumer_ioerrors_will_reconnect(self):
|
||||||
|
"""Test that an IOError exception causes a reconnection"""
|
||||||
|
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
|
||||||
|
'__init__', 'Socket closed', exc_class=IOError)
|
||||||
|
|
||||||
|
conn = self.rpc.Connection()
|
||||||
|
result = conn.declare_consumer(self.rpc.DirectConsumer,
|
||||||
|
'test_topic', None)
|
||||||
|
|
||||||
|
self.assertEqual(info['called'], 3)
|
||||||
|
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
|
||||||
|
|
||||||
def test_publishing_errors_will_reconnect(self):
|
def test_publishing_errors_will_reconnect(self):
|
||||||
# Test that any exception with 'timeout' in it causes a
|
# Test that any exception with 'timeout' in it causes a
|
||||||
# reconnection when declaring the publisher class and when
|
# reconnection when declaring the publisher class and when
|
||||||
|
|||||||
Reference in New Issue
Block a user