Merge "Implement more complete kombu reconnecting"
This commit is contained in:
@@ -14,10 +14,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import kombu
|
||||
import kombu.entity
|
||||
import kombu.messaging
|
||||
import kombu.connection
|
||||
import inspect
|
||||
import itertools
|
||||
import sys
|
||||
@@ -29,17 +25,18 @@ import eventlet
|
||||
from eventlet import greenpool
|
||||
from eventlet import pools
|
||||
import greenlet
|
||||
import kombu
|
||||
import kombu.entity
|
||||
import kombu.messaging
|
||||
import kombu.connection
|
||||
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova import flags
|
||||
from nova.rpc import common as rpc_common
|
||||
from nova.rpc.common import RemoteError, LOG
|
||||
|
||||
# Needed for tests
|
||||
eventlet.monkey_patch()
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
LOG = rpc_common.LOG
|
||||
|
||||
|
||||
class ConsumerBase(object):
|
||||
@@ -340,58 +337,115 @@ class Connection(object):
|
||||
self.connection = None
|
||||
self.reconnect()
|
||||
|
||||
def reconnect(self):
|
||||
"""Handles reconnecting and re-establishing queues"""
|
||||
def _connect(self):
|
||||
"""Connect to rabbit. Re-establish any queues that may have
|
||||
been declared before if we are reconnecting. Exceptions should
|
||||
be handled by the caller.
|
||||
"""
|
||||
if self.connection:
|
||||
LOG.info(_("Reconnecting to AMQP server on "
|
||||
"%(hostname)s:%(port)d") % self.params)
|
||||
try:
|
||||
self.connection.close()
|
||||
except self.connection.connection_errors:
|
||||
except self.connection_errors:
|
||||
pass
|
||||
time.sleep(1)
|
||||
self.connection = kombu.connection.BrokerConnection(**self.params)
|
||||
# Setting this in case the next statement fails, though
|
||||
# it shouldn't be doing any network operations, yet.
|
||||
self.connection = None
|
||||
self.connection = kombu.connection.BrokerConnection(
|
||||
**self.params)
|
||||
self.connection_errors = self.connection.connection_errors
|
||||
if self.memory_transport:
|
||||
# Kludge to speed up tests.
|
||||
self.connection.transport.polling_interval = 0.0
|
||||
self.consumer_num = itertools.count(1)
|
||||
|
||||
try:
|
||||
self.connection.ensure_connection(errback=self.connect_error,
|
||||
max_retries=self.max_retries,
|
||||
interval_start=self.interval_start,
|
||||
interval_step=self.interval_stepping,
|
||||
interval_max=self.interval_max)
|
||||
except self.connection.connection_errors, e:
|
||||
# We should only get here if max_retries is set. We'll go
|
||||
# ahead and exit in this case.
|
||||
err_str = str(e)
|
||||
max_retries = self.max_retries
|
||||
LOG.error(_('Unable to connect to AMQP server '
|
||||
'after %(max_retries)d tries: %(err_str)s') % locals())
|
||||
sys.exit(1)
|
||||
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' %
|
||||
self.params))
|
||||
self.connection.connect()
|
||||
self.channel = self.connection.channel()
|
||||
# work around 'memory' transport bug in 1.1.3
|
||||
if self.memory_transport:
|
||||
self.channel._new_queue('ae.undeliver')
|
||||
for consumer in self.consumers:
|
||||
consumer.reconnect(self.channel)
|
||||
if self.consumers:
|
||||
LOG.debug(_("Re-established AMQP queues"))
|
||||
LOG.info(_('Connected to AMQP server on '
|
||||
'%(hostname)s:%(port)d' % self.params))
|
||||
|
||||
def reconnect(self):
    """Handles reconnecting and re-establishing queues.

    Calls self._connect() until it succeeds.  A failure is retried when
    the exception is one of self.connection_errors or when its message
    contains 'timeout'; any other exception is re-raised to the caller.

    Will retry up to self.max_retries number of times.
    self.max_retries = 0 means to retry forever.
    Sleep between tries, starting at self.interval_start seconds
    (1 second when that is unset/zero), backing off
    self.interval_stepping number of seconds each attempt, and never
    sleeping longer than self.interval_max seconds when that is set.

    Exits the process with status 1 once self.max_retries attempts
    have failed.
    """

    attempt = 0
    while True:
        attempt += 1
        try:
            self._connect()
            return
        except self.connection_errors as exc:
            # Keep our own reference; the handler variable must not be
            # relied upon once the except clause has finished.
            failure = exc
        except Exception as exc:
            # NOTE(comstud): Unfortunately it's possible for amqplib
            # to return an error not covered by its transport
            # connection_errors in the case of a timeout waiting for
            # a protocol response.  (See paste link in LP888621)
            # So, we check all exceptions for 'timeout' in them
            # and try to reconnect in this case.
            if 'timeout' not in str(exc):
                raise
            failure = exc

        log_info = {}
        log_info['err_str'] = str(failure)
        log_info['max_retries'] = self.max_retries
        log_info.update(self.params)

        if self.max_retries and attempt == self.max_retries:
            LOG.exception(_('Unable to connect to AMQP server on '
                    '%(hostname)s:%(port)d after %(max_retries)d '
                    'tries: %(err_str)s') % log_info)
            # NOTE(comstud): Copied from original code.  There's
            # really no better recourse because if this was a queue we
            # need to consume on, we have no way to consume anymore.
            sys.exit(1)

        # NOTE(review): the retry settings are read from the same
        # interval_* attributes as interval_max / max_retries; confirm
        # that __init__ populates them from the rabbit_retry_* flags.
        if attempt == 1:
            sleep_time = self.interval_start or 1
        elif attempt > 1:
            sleep_time += self.interval_stepping
        if self.interval_max:
            sleep_time = min(sleep_time, self.interval_max)

        log_info['sleep_time'] = sleep_time
        LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
                ' unreachable: %(err_str)s. Trying again in '
                '%(sleep_time)d seconds.') % log_info)
        time.sleep(sleep_time)
|
||||
|
||||
def ensure(self, error_callback, method, *args, **kwargs):
    """Call ``method`` until it succeeds, reconnecting after failures.

    ``method(*args, **kwargs)`` is invoked in a loop.  When it raises
    one of ``self.connection_errors``, or any other exception whose
    message contains 'timeout', ``error_callback`` (if given) is called
    with that exception, ``self.reconnect()`` is run and the call is
    tried again.

    :param error_callback: callable taking the caught exception, or a
                           false value to skip the notification
    :param method: callable to invoke with ``*args`` / ``**kwargs``
    :returns: whatever ``method`` returns once it succeeds
    :raises: any exception from ``method`` that is neither one of
             ``self.connection_errors`` nor mentions 'timeout'
    """
    while True:
        try:
            return method(*args, **kwargs)
        except self.connection_errors as exc:
            # Keep our own reference; the handler variable must not be
            # relied upon once the except clause has finished.
            failure = exc
        except Exception as exc:
            # NOTE(comstud): Unfortunately it's possible for amqplib
            # to return an error not covered by its transport
            # connection_errors in the case of a timeout waiting for
            # a protocol response.  (See paste link in LP888621)
            # So, we check all exceptions for 'timeout' in them
            # and try to reconnect in this case.
            if 'timeout' not in str(exc):
                raise
            failure = exc
        if error_callback:
            error_callback(failure)
        self.reconnect()
|
||||
|
||||
def get_channel(self):
    """Return the channel currently held by this connection.

    Convenience call for bin/clear_rabbit_queues.
    """
    channel = self.channel
    return channel
|
||||
|
||||
def connect_error(self, exc, interval):
    """Log a connection failure that kombu is about to retry.

    Passed as the ``errback`` to kombu's ensure_connection(); ``exc``
    is reported as the error and ``interval`` as the number of seconds
    before the next attempt.
    """
    # Work on a copy so the extra formatting keys never end up in
    # self.params.
    details = self.params.copy()
    details.update(intv=interval, e=exc)
    LOG.error(_('AMQP server on %(hostname)s:%(port)d is'
            ' unreachable: %(e)s. Trying again in %(intv)d'
            ' seconds.') % details)
|
||||
|
||||
def close(self):
|
||||
"""Close/release this connection"""
|
||||
self.cancel_consumer_thread()
|
||||
@@ -412,29 +466,44 @@ class Connection(object):
|
||||
"""Create a Consumer using the class that was passed in and
|
||||
add it to our list of consumers
|
||||
"""
|
||||
consumer = consumer_cls(self.channel, topic, callback,
|
||||
self.consumer_num.next())
|
||||
self.consumers.append(consumer)
|
||||
return consumer
|
||||
|
||||
def _connect_error(exc):
|
||||
log_info = {'topic': topic, 'err_str': str(exc)}
|
||||
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
|
||||
"%(err_str)s") % log_info)
|
||||
|
||||
def _declare_consumer():
|
||||
consumer = consumer_cls(self.channel, topic, callback,
|
||||
self.consumer_num.next())
|
||||
self.consumers.append(consumer)
|
||||
return consumer
|
||||
|
||||
return self.ensure(_connect_error, _declare_consumer)
|
||||
|
||||
def iterconsume(self, limit=None):
|
||||
"""Return an iterator that will consume from all queues/consumers"""
|
||||
while True:
|
||||
try:
|
||||
|
||||
info = {'do_consume': True}
|
||||
|
||||
def _error_callback(exc):
|
||||
LOG.exception(_('Failed to consume message from queue: %s') %
|
||||
str(exc))
|
||||
info['do_consume'] = True
|
||||
|
||||
def _consume():
|
||||
if info['do_consume']:
|
||||
queues_head = self.consumers[:-1]
|
||||
queues_tail = self.consumers[-1]
|
||||
for queue in queues_head:
|
||||
queue.consume(nowait=True)
|
||||
queues_tail.consume(nowait=False)
|
||||
info['do_consume'] = False
|
||||
return self.connection.drain_events()
|
||||
|
||||
for iteration in itertools.count(0):
|
||||
if limit and iteration >= limit:
|
||||
raise StopIteration
|
||||
yield self.connection.drain_events()
|
||||
except self.connection.connection_errors, e:
|
||||
LOG.exception(_('Failed to consume message from queue: '
|
||||
'%s' % str(e)))
|
||||
self.reconnect()
|
||||
for iteration in itertools.count(0):
|
||||
if limit and iteration >= limit:
|
||||
raise StopIteration
|
||||
yield self.ensure(_error_callback, _consume)
|
||||
|
||||
def cancel_consumer_thread(self):
|
||||
"""Cancel a consumer thread"""
|
||||
@@ -448,17 +517,17 @@ class Connection(object):
|
||||
|
||||
def publisher_send(self, cls, topic, msg, **kwargs):
|
||||
"""Send to a publisher based on the publisher class"""
|
||||
while True:
|
||||
try:
|
||||
publisher = cls(self.channel, topic, **kwargs)
|
||||
publisher.send(msg)
|
||||
return
|
||||
except self.connection.connection_errors, e:
|
||||
LOG.exception(_('Failed to publish message %s' % str(e)))
|
||||
try:
|
||||
self.reconnect()
|
||||
except self.connection.connection_errors, e:
|
||||
pass
|
||||
|
||||
def _error_callback(exc):
|
||||
log_info = {'topic': topic, 'err_str': str(exc)}
|
||||
LOG.exception(_("Failed to publish message to topic "
|
||||
"'%(topic)s': %(err_str)s") % log_info)
|
||||
|
||||
def _publish():
|
||||
publisher = cls(self.channel, topic, **kwargs)
|
||||
publisher.send(msg)
|
||||
|
||||
self.ensure(_error_callback, _publish)
|
||||
|
||||
def declare_direct_consumer(self, topic, callback):
|
||||
"""Create a 'direct' queue.
|
||||
@@ -723,7 +792,7 @@ class MulticallWaiter(object):
|
||||
def __call__(self, data):
|
||||
"""The consume() callback will call this. Store the result."""
|
||||
if data['failure']:
|
||||
self._result = RemoteError(*data['failure'])
|
||||
self._result = rpc_common.RemoteError(*data['failure'])
|
||||
elif data.get('ending', False):
|
||||
self._got_ending = True
|
||||
else:
|
||||
|
||||
@@ -29,6 +29,23 @@ from nova.tests.rpc import common
|
||||
LOG = logging.getLogger('nova.tests.rpc')
|
||||
|
||||
|
||||
class MyException(Exception):
    """Error raised by the stubs in this module to simulate a failure."""
|
||||
|
||||
|
||||
def _raise_exc_stub(stubs, times, obj, method, exc_msg):
    """Stub ``obj.<method>`` so that its first ``times`` calls raise.

    The replacement raises ``MyException(exc_msg)`` on each of its first
    ``times`` invocations; afterwards it delegates to the original
    method and passes that method's return value through.

    :param stubs: stub manager; its ``Set(obj, method, replacement)``
                  is used to install the replacement
    :param times: number of leading calls that should fail
    :param obj: object (or class) owning the method to replace
    :param method: name of the attribute to replace
    :param exc_msg: message given to the raised MyException
    :returns: dict whose 'called' entry counts invocations of the stub
    """
    info = {'called': 0}
    orig_method = getattr(obj, method)

    def _raise_stub(*args, **kwargs):
        info['called'] += 1
        if info['called'] <= times:
            raise MyException(exc_msg)
        # Hand back the real result so callers of the stubbed method
        # still see what the original would have returned.
        return orig_method(*args, **kwargs)

    stubs.Set(obj, method, _raise_stub)
    return info
|
||||
|
||||
|
||||
class RpcKombuTestCase(common._BaseRpcTestCase):
|
||||
def setUp(self):
|
||||
self.rpc = impl_kombu
|
||||
@@ -108,3 +125,100 @@ class RpcKombuTestCase(common._BaseRpcTestCase):
|
||||
conn2.consume(limit=1)
|
||||
conn2.close()
|
||||
self.assertEqual(self.received_message, message)
|
||||
|
||||
def test_declare_consumer_errors_will_reconnect(self):
    consumer_cls = self.rpc.DirectConsumer

    # Scenario 1: an exception that merely mentions 'timeout' in its
    # message must cause a reconnection.
    stub_state = _raise_exc_stub(self.stubs, 2, consumer_cls,
                                 '__init__', 'foo timeout foo')

    conn = self.rpc.Connection()
    consumer = conn.declare_consumer(consumer_cls, 'test_topic', None)

    # Two stubbed failures followed by the call that goes through.
    self.assertEqual(stub_state['called'], 3)
    self.assertTrue(isinstance(consumer, consumer_cls))

    # Scenario 2: an exception listed in the connection's
    # connection_errors must cause a reconnection as well.
    self.stubs.UnsetAll()

    stub_state = _raise_exc_stub(self.stubs, 1, consumer_cls,
                                 '__init__', 'meow')

    conn = self.rpc.Connection()
    conn.connection_errors = (MyException, )

    consumer = conn.declare_consumer(consumer_cls, 'test_topic', None)

    # One stubbed failure followed by the call that goes through.
    self.assertEqual(stub_state['called'], 2)
    self.assertTrue(isinstance(consumer, consumer_cls))
|
||||
|
||||
def test_publishing_errors_will_reconnect(self):
    publisher_cls = self.rpc.DirectPublisher

    # Each row: (failures to inject, method to stub, exception message,
    # whether MyException is registered as a connection error).
    #
    # The first two rows check that any exception with 'timeout' in it
    # causes a reconnection when declaring the publisher class and when
    # calling send(); the last two check the same for exceptions listed
    # in connection_errors.
    scenarios = (
        (2, '__init__', 'foo timeout foo', False),
        (2, 'send', 'foo timeout foo', False),
        (1, '__init__', 'meow', True),
        (1, 'send', 'meow', True),
    )

    for position, scenario in enumerate(scenarios):
        failures, stubbed_method, exc_msg, as_conn_error = scenario

        # Drop the previous scenario's stub before installing a new one.
        if position:
            self.stubs.UnsetAll()

        stub_state = _raise_exc_stub(self.stubs, failures, publisher_cls,
                                     stubbed_method, exc_msg)

        conn = self.rpc.Connection()
        if as_conn_error:
            conn.connection_errors = (MyException, )

        conn.publisher_send(publisher_cls, 'test_topic', 'msg')

        # Every injected failure plus the call that finally succeeds.
        self.assertEqual(stub_state['called'], failures + 1)
|
||||
|
||||
def test_iterconsume_errors_will_reconnect(self):
    expected = 'reconnect test message'
    self.received_message = None

    def _store(payload):
        # Remember what the consumer callback was handed.
        self.received_message = payload

    conn = self.rpc.Connection()
    conn.declare_direct_consumer('a_direct', _store)
    conn.direct_send('a_direct', expected)

    # Make the first drain_events() call on the live connection fail
    # with a 'timeout' error.
    stub_state = _raise_exc_stub(self.stubs, 1, conn.connection,
                                 'drain_events', 'foo timeout foo')
    conn.consume(limit=1)
    conn.close()

    self.assertEqual(self.received_message, expected)
    # Only called once, because our stub goes away during reconnection
    self.assertEqual(stub_state['called'], 1)
|
||||
|
||||
Reference in New Issue
Block a user