Implement more complete kombu reconnecting

Fixes bug 888621

We were missing some wrapping around when consumers are declared and
a case where we had an exception we weren't trapping.  In the latter
case, it's not easy to trap it because you'd have to bypass the kombu
interface and import amqplib and try to trap one of its exceptions.
What I've implemented here looks for 'timeout' in any exception, even
though I really don't like it. :)

Fixes HACKING violations while I'm at it.

Change-Id: I0132fbc4377e221b0a366d0340652147ddb33c87
This commit is contained in:
Chris Behrens
2012-01-11 12:35:42 -08:00
parent 6d80851279
commit 59e8ae1362
2 changed files with 250 additions and 67 deletions

View File

@@ -14,10 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import kombu
import kombu.entity
import kombu.messaging
import kombu.connection
import inspect
import itertools
import sys
@@ -29,17 +25,18 @@ import eventlet
from eventlet import greenpool
from eventlet import pools
import greenlet
import kombu
import kombu.entity
import kombu.messaging
import kombu.connection
from nova import context
from nova import exception
from nova import flags
from nova.rpc import common as rpc_common
from nova.rpc.common import RemoteError, LOG
# Needed for tests
eventlet.monkey_patch()
FLAGS = flags.FLAGS
LOG = rpc_common.LOG
class ConsumerBase(object):
@@ -340,58 +337,115 @@ class Connection(object):
self.connection = None
self.reconnect()
def reconnect(self):
"""Handles reconnecting and re-establishing queues"""
def _connect(self):
"""Connect to rabbit. Re-establish any queues that may have
been declared before if we are reconnecting. Exceptions should
be handled by the caller.
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % self.params)
try:
self.connection.close()
except self.connection.connection_errors:
except self.connection_errors:
pass
time.sleep(1)
self.connection = kombu.connection.BrokerConnection(**self.params)
# Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet.
self.connection = None
self.connection = kombu.connection.BrokerConnection(
**self.params)
self.connection_errors = self.connection.connection_errors
if self.memory_transport:
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
self.consumer_num = itertools.count(1)
try:
self.connection.ensure_connection(errback=self.connect_error,
max_retries=self.max_retries,
interval_start=self.interval_start,
interval_step=self.interval_stepping,
interval_max=self.interval_max)
except self.connection.connection_errors, e:
# We should only get here if max_retries is set. We'll go
# ahead and exit in this case.
err_str = str(e)
max_retries = self.max_retries
LOG.error(_('Unable to connect to AMQP server '
'after %(max_retries)d tries: %(err_str)s') % locals())
sys.exit(1)
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' %
self.params))
self.connection.connect()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
if self.memory_transport:
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
if self.consumers:
LOG.debug(_("Re-established AMQP queues"))
LOG.info(_('Connected to AMQP server on '
'%(hostname)s:%(port)d' % self.params))
def reconnect(self):
"""Handles reconnecting and re-establishing queues.
Will retry up to self.max_retries number of times.
self.max_retries = 0 means to retry forever.
Sleep between tries, starting at self.rabbit_retry_interval
seconds, backing off self.rabbit_retry_backoff number of seconds
each attempt.
"""
attempt = 0
while True:
attempt += 1
try:
self._connect()
return
except self.connection_errors, e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
# a protocol response. (See paste link in LP888621)
# So, we check all exceptions for 'timeout' in them
# and try to reconnect in this case.
if 'timeout' not in str(e):
raise
log_info = {}
log_info['err_str'] = str(e)
log_info['max_retries'] = self.max_retries
log_info.update(self.params)
if self.max_retries and attempt == self.max_retries:
LOG.exception(_('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore.
sys.exit(1)
if attempt == 1:
sleep_time = self.rabbit_retry_interval or 1
elif attempt > 1:
sleep_time += self.rabbit_retry_backoff
if self.interval_max:
sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time
LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
' unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
while True:
try:
return method(*args, **kwargs)
except self.connection_errors, e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
# a protocol response. (See paste link in LP888621)
# So, we check all exceptions for 'timeout' in them
# and try to reconnect in this case.
if 'timeout' not in str(e):
raise
if error_callback:
error_callback(e)
self.reconnect()
def get_channel(self):
"""Convenience call for bin/clear_rabbit_queues"""
return self.channel
def connect_error(self, exc, interval):
"""Callback when there are connection re-tries by kombu"""
info = self.params.copy()
info['intv'] = interval
info['e'] = exc
LOG.error(_('AMQP server on %(hostname)s:%(port)d is'
' unreachable: %(e)s. Trying again in %(intv)d'
' seconds.') % info)
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
@@ -412,29 +466,44 @@ class Connection(object):
"""Create a Consumer using the class that was passed in and
add it to our list of consumers
"""
consumer = consumer_cls(self.channel, topic, callback,
self.consumer_num.next())
self.consumers.append(consumer)
return consumer
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)
def _declare_consumer():
consumer = consumer_cls(self.channel, topic, callback,
self.consumer_num.next())
self.consumers.append(consumer)
return consumer
return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None):
"""Return an iterator that will consume from all queues/consumers"""
while True:
try:
info = {'do_consume': True}
def _error_callback(exc):
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
info['do_consume'] = True
def _consume():
if info['do_consume']:
queues_head = self.consumers[:-1]
queues_tail = self.consumers[-1]
for queue in queues_head:
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
info['do_consume'] = False
return self.connection.drain_events()
for iteration in itertools.count(0):
if limit and iteration >= limit:
raise StopIteration
yield self.connection.drain_events()
except self.connection.connection_errors, e:
LOG.exception(_('Failed to consume message from queue: '
'%s' % str(e)))
self.reconnect()
for iteration in itertools.count(0):
if limit and iteration >= limit:
raise StopIteration
yield self.ensure(_error_callback, _consume)
def cancel_consumer_thread(self):
"""Cancel a consumer thread"""
@@ -448,17 +517,17 @@ class Connection(object):
def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class"""
while True:
try:
publisher = cls(self.channel, topic, **kwargs)
publisher.send(msg)
return
except self.connection.connection_errors, e:
LOG.exception(_('Failed to publish message %s' % str(e)))
try:
self.reconnect()
except self.connection.connection_errors, e:
pass
def _error_callback(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s") % log_info)
def _publish():
publisher = cls(self.channel, topic, **kwargs)
publisher.send(msg)
self.ensure(_error_callback, _publish)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
@@ -723,7 +792,7 @@ class MulticallWaiter(object):
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
if data['failure']:
self._result = RemoteError(*data['failure'])
self._result = rpc_common.RemoteError(*data['failure'])
elif data.get('ending', False):
self._got_ending = True
else:

View File

@@ -29,6 +29,23 @@ from nova.tests.rpc import common
LOG = logging.getLogger('nova.tests.rpc')
class MyException(Exception):
pass
def _raise_exc_stub(stubs, times, obj, method, exc_msg):
info = {'called': 0}
orig_method = getattr(obj, method)
def _raise_stub(*args, **kwargs):
info['called'] += 1
if info['called'] <= times:
raise MyException(exc_msg)
orig_method(*args, **kwargs)
stubs.Set(obj, method, _raise_stub)
return info
class RpcKombuTestCase(common._BaseRpcTestCase):
def setUp(self):
self.rpc = impl_kombu
@@ -108,3 +125,100 @@ class RpcKombuTestCase(common._BaseRpcTestCase):
conn2.consume(limit=1)
conn2.close()
self.assertEqual(self.received_message, message)
def test_declare_consumer_errors_will_reconnect(self):
# Test that any exception with 'timeout' in it causes a
# reconnection
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
'__init__', 'foo timeout foo')
conn = self.rpc.Connection()
result = conn.declare_consumer(self.rpc.DirectConsumer,
'test_topic', None)
self.assertEqual(info['called'], 3)
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
# Test that any exception in transport.connection_errors causes
# a reconnection
self.stubs.UnsetAll()
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer,
'__init__', 'meow')
conn = self.rpc.Connection()
conn.connection_errors = (MyException, )
result = conn.declare_consumer(self.rpc.DirectConsumer,
'test_topic', None)
self.assertEqual(info['called'], 2)
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
def test_publishing_errors_will_reconnect(self):
# Test that any exception with 'timeout' in it causes a
# reconnection when declaring the publisher class and when
# calling send()
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
'__init__', 'foo timeout foo')
conn = self.rpc.Connection()
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
self.assertEqual(info['called'], 3)
self.stubs.UnsetAll()
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
'send', 'foo timeout foo')
conn = self.rpc.Connection()
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
self.assertEqual(info['called'], 3)
# Test that any exception in transport.connection_errors causes
# a reconnection when declaring the publisher class and when
# calling send()
self.stubs.UnsetAll()
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
'__init__', 'meow')
conn = self.rpc.Connection()
conn.connection_errors = (MyException, )
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
self.assertEqual(info['called'], 2)
self.stubs.UnsetAll()
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
'send', 'meow')
conn = self.rpc.Connection()
conn.connection_errors = (MyException, )
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
self.assertEqual(info['called'], 2)
def test_iterconsume_errors_will_reconnect(self):
conn = self.rpc.Connection()
message = 'reconnect test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_direct_consumer('a_direct', _callback)
conn.direct_send('a_direct', message)
info = _raise_exc_stub(self.stubs, 1, conn.connection,
'drain_events', 'foo timeout foo')
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
# Only called once, because our stub goes away during reconnection
self.assertEqual(info['called'], 1)