Merge "rabbit: uses kombu instead of builtin stuffs"

This commit is contained in:
Jenkins 2014-11-20 23:19:33 +00:00 committed by Gerrit Code Review
commit e67e9a6aa9
2 changed files with 150 additions and 283 deletions

View File

@ -15,7 +15,6 @@
import functools
import itertools
import logging
import random
import socket
import ssl
import time
@ -24,8 +23,10 @@ import uuid
import kombu
import kombu.connection
import kombu.entity
import kombu.exceptions
import kombu.messaging
import six
from six.moves.urllib import parse
from oslo.config import cfg
from oslo.messaging._drivers import amqp as rpc_amqp
@ -35,6 +36,7 @@ from oslo.messaging._i18n import _
from oslo.messaging import exceptions
from oslo.utils import netutils
rabbit_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
@ -424,6 +426,7 @@ class Connection(object):
def __init__(self, conf, url):
self.consumers = []
self.consumer_num = itertools.count(1)
self.conf = conf
self.max_retries = self.conf.rabbit_max_retries
# Try forever?
@ -433,62 +436,62 @@ class Connection(object):
self.interval_stepping = self.conf.rabbit_retry_backoff
# max retry-interval = 30 seconds
self.interval_max = 30
self.memory_transport = False
ssl_params = self._fetch_ssl_params()
self._ssl_params = self._fetch_ssl_params()
self._login_method = self.conf.rabbit_login_method
if url.virtual_host is not None:
virtual_host = url.virtual_host
else:
virtual_host = self.conf.rabbit_virtual_host
self.brokers_params = []
if url.hosts:
self._url = ''
if self.conf.fake_rabbit:
# TODO(sileht): use memory://virtual_host into
# unit tests to remove cfg.CONF.fake_rabbit
self._url = 'memory://%s/' % virtual_host
elif url.hosts:
for host in url.hosts:
params = {
'hostname': host.hostname,
'port': host.port or 5672,
'userid': host.username or '',
'password': host.password or '',
'login_method': self.conf.rabbit_login_method,
'virtual_host': virtual_host
}
if self.conf.fake_rabbit:
params['transport'] = 'memory'
if self.conf.rabbit_use_ssl:
params['ssl'] = ssl_params
self.brokers_params.append(params)
transport = url.transport.replace('kombu+', '')
transport = url.transport.replace('rabbit', 'amqp')
self._url += '%s%s://%s:%s@%s:%s/%s' % (
";" if self._url else '',
transport,
parse.quote(host.username or ''),
parse.quote(host.password or ''),
host.hostname or '', str(host.port or 5672),
virtual_host)
else:
# Old configuration format
for adr in self.conf.rabbit_hosts:
hostname, port = netutils.parse_host_port(
adr, default_port=self.conf.rabbit_port)
self._url += '%samqp://%s:%s@%s:%s/%s' % (
";" if self._url else '',
parse.quote(self.conf.rabbit_userid),
parse.quote(self.conf.rabbit_password),
hostname, port,
virtual_host)
params = {
'hostname': hostname,
'port': port,
'userid': self.conf.rabbit_userid,
'password': self.conf.rabbit_password,
'login_method': self.conf.rabbit_login_method,
'virtual_host': virtual_host
}
self.do_consume = True
if self.conf.fake_rabbit:
params['transport'] = 'memory'
if self.conf.rabbit_use_ssl:
params['ssl'] = ssl_params
self.channel = None
self.connection = kombu.connection.Connection(
self._url, ssl=self._ssl_params, login_method=self._login_method,
failover_strategy="shuffle")
self.brokers_params.append(params)
LOG.info(_('Connecting to AMQP server on %(hostname)s:%(port)d'),
{'hostname': self.connection.hostname,
'port': self.connection.port})
# NOTE(sileht): just ensure the connection is setuped at startup
self.ensure(error_callback=None,
method=lambda channel: True)
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
{'hostname': self.connection.hostname,
'port': self.connection.port})
random.shuffle(self.brokers_params)
self.brokers = itertools.cycle(self.brokers_params)
self.memory_transport = self.conf.fake_rabbit
self.connection = None
self.do_consume = None
self.reconnect()
if self.conf.fake_rabbit:
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
# FIXME(markmc): use oslo sslutils when it is available as a library
_SSL_PROTOCOLS = {
@ -531,153 +534,87 @@ class Connection(object):
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
# Return the extended behavior or just have the default behavior
return ssl_params or True
return ssl_params or None
def _connect(self, broker):
"""Connect to rabbit. Re-establish any queues that may have
been declared before if we are reconnecting. Exceptions should
be handled by the caller.
def _setup_new_channel(self, new_channel):
"""Callback invoked when the kombu connection have created
a new channel, we use it the reconfigure our consumers.
"""
LOG.info(_("Connecting to AMQP server on "
"%(hostname)s:%(port)d"), broker)
self.connection = kombu.connection.BrokerConnection(**broker)
self.connection_errors = self.connection.connection_errors
self.channel_errors = self.connection.channel_errors
if self.memory_transport:
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
self.do_consume = True
self.consumer_num = itertools.count(1)
self.connection.connect()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
if self.memory_transport:
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
broker)
consumer.reconnect(new_channel)
def ensure(self, error_callback, method, retry=None,
timeout_is_error=True):
"""Will retry up to retry number of times.
retry = None means use the value of rabbit_max_retries
retry = -1 means to retry forever
retry = 0 means no retry
retry = N means N retries
"""
if retry is None:
retry = self.max_retries
if retry is None or retry < 0:
retry = None
def on_error(exc, interval):
error_callback and error_callback(exc)
info = {'hostname': self.connection.hostname,
'port': self.connection.port,
'err_str': exc, 'sleep_time': interval}
if 'Socket closed' in six.text_type(exc):
LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
' the connection. Check login credentials:'
' %(err_str)s'), info)
else:
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.'), info)
def _disconnect(self):
if self.connection:
# XXX(nic): when reconnecting to a RabbitMQ cluster
# with mirrored queues in use, the attempt to release the
# connection can hang "indefinitely" somewhere deep down
# in Kombu. Blocking the thread for a bit prior to
# release seems to kludge around the problem where it is
# otherwise reproduceable.
# TODO(sileht): Check if this is useful since we
# use kombu for HA connection, the interval_step
# should sufficient, because the underlying kombu transport
# connection object freed.
if self.conf.kombu_reconnect_delay > 0:
LOG.info(_("Delaying reconnect for %1.1f seconds...") %
self.conf.kombu_reconnect_delay)
time.sleep(self.conf.kombu_reconnect_delay)
try:
self.connection.release()
except self.connection_errors:
pass
self.connection = None
def reconnect(self, retry=None):
"""Handles reconnecting and re-establishing queues.
Will retry up to retry number of times.
retry = None means use the value of rabbit_max_retries
retry = -1 means to retry forever
retry = 0 means no retry
retry = N means N retries
Sleep between tries, starting at self.interval_start
seconds, backing off self.interval_stepping number of seconds
each attempt.
"""
attempt = 0
loop_forever = False
if retry is None:
retry = self.max_retries
if retry is None or retry < 0:
loop_forever = True
while True:
self._disconnect()
broker = six.next(self.brokers)
attempt += 1
try:
self._connect(broker)
return
except IOError as ex:
e = ex
except self.connection_errors as ex:
e = ex
except Exception as ex:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
# a protocol response. (See paste link in LP888621)
# So, we check all exceptions for 'timeout' in them
# and try to reconnect in this case.
if 'timeout' not in six.text_type(e):
raise
e = ex
log_info = {}
log_info['err_str'] = e
log_info['retry'] = retry or 0
log_info.update(broker)
if not loop_forever and attempt > retry:
msg = _('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(retry)d '
'tries: %(err_str)s') % log_info
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
else:
if attempt == 1:
sleep_time = self.interval_start or 1
elif attempt > 1:
sleep_time += self.interval_stepping
sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time
if 'Socket closed' in six.text_type(e):
LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
' the connection. Check login credentials:'
' %(err_str)s'), log_info)
else:
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.'), log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, retry=None):
while True:
try:
return method()
except self.connection_errors as e:
if error_callback:
error_callback(e)
except self.channel_errors as e:
if error_callback:
error_callback(e)
except (socket.timeout, IOError) as e:
if error_callback:
error_callback(e)
except Exception as e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
# a protocol response. (See paste link in LP888621)
# So, we check all exceptions for 'timeout' in them
# and try to reconnect in this case.
if 'timeout' not in six.text_type(e):
raise
if error_callback:
error_callback(e)
self.reconnect(retry=retry)
def get_channel(self):
"""Convenience call for bin/clear_rabbit_queues."""
return self.channel
recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
try:
autoretry_method = self.connection.autoretry(
method, channel=self.channel,
max_retries=retry,
errback=on_error,
interval_start=self.interval_start or 1,
interval_step=self.interval_stepping,
on_revive=self._setup_new_channel,
)
ret, channel = autoretry_method()
self.channel = channel
return ret
except recoverable_errors as exc:
# NOTE(sileht): number of retry exceeded and the connection
# is still broken
msg = _('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(retry)d '
'tries: %(err_str)s') % {
'hostname': self.connection.hostname,
'port': self.connection.port,
'err_str': exc,
'retry': retry}
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
def close(self):
"""Close/release this connection."""
@ -689,10 +626,8 @@ class Connection(object):
"""Reset a connection so it can be used again."""
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
if self.memory_transport:
self.channel._new_queue('ae.undeliver')
self.consumers = []
self._setup_new_channel(self.channel)
def declare_consumer(self, consumer_cls, topic, callback):
"""Create a Consumer using the class that was passed in and
@ -704,8 +639,8 @@ class Connection(object):
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s"), log_info)
def _declare_consumer():
consumer = consumer_cls(self.conf, self.channel, topic, callback,
def _declare_consumer(channel):
consumer = consumer_cls(self.conf, channel, topic, callback,
six.next(self.consumer_num))
self.consumers.append(consumer)
return consumer
@ -716,15 +651,11 @@ class Connection(object):
"""Return an iterator that will consume from all queues/consumers."""
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.debug('Timed out waiting for RPC response: %s', exc)
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s'),
exc)
self.do_consume = True
LOG.exception(_('Failed to consume message from queue: %s'),
exc)
self.do_consume = True
def _consume():
def _consume(channel):
if self.do_consume:
queues_head = self.consumers[:-1] # not fanout.
queues_tail = self.consumers[-1] # fanout
@ -732,7 +663,11 @@ class Connection(object):
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
self.do_consume = False
return self.connection.drain_events(timeout=timeout)
try:
return self.connection.drain_events(timeout=timeout)
except socket.timeout as exc:
LOG.debug('Timed out waiting for RPC response: %s', exc)
raise rpc_common.Timeout()
for iteration in itertools.count(0):
if limit and iteration >= limit:
@ -748,8 +683,8 @@ class Connection(object):
LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
def _publish():
publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
def _publish(channel):
publisher = cls(self.conf, channel, topic=topic, **kwargs)
publisher.send(msg, timeout)
self.ensure(_error_callback, _publish, retry=retry)

View File

@ -13,7 +13,6 @@
# under the License.
import datetime
import operator
import sys
import threading
import uuid
@ -21,6 +20,7 @@ import uuid
import fixtures
import kombu
import mock
from oslotest import mockpatch
import testscenarios
from oslo import messaging
@ -49,87 +49,43 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
scenarios = [
('none', dict(url=None,
expected=[dict(hostname='localhost',
port=5672,
userid='guest',
password='guest',
virtual_host='/')])),
expected=["amqp://guest:guest@localhost:5672//"])),
('empty',
dict(url='rabbit:///',
expected=[dict(hostname='localhost',
port=5672,
userid='guest',
password='guest',
virtual_host='')])),
expected=['amqp://guest:guest@localhost:5672/'])),
('localhost',
dict(url='rabbit://localhost/',
expected=[dict(hostname='localhost',
port=5672,
userid='',
password='',
virtual_host='')])),
expected=['amqp://:@localhost:5672/'])),
('virtual_host',
dict(url='rabbit:///vhost',
expected=[dict(hostname='localhost',
port=5672,
userid='guest',
password='guest',
virtual_host='vhost')])),
expected=['amqp://guest:guest@localhost:5672/vhost'])),
('no_creds',
dict(url='rabbit://host/virtual_host',
expected=[dict(hostname='host',
port=5672,
userid='',
password='',
virtual_host='virtual_host')])),
expected=['amqp://:@host:5672/virtual_host'])),
('no_port',
dict(url='rabbit://user:password@host/virtual_host',
expected=[dict(hostname='host',
port=5672,
userid='user',
password='password',
virtual_host='virtual_host')])),
expected=['amqp://user:password@host:5672/virtual_host'])),
('full_url',
dict(url='rabbit://user:password@host:10/virtual_host',
expected=[dict(hostname='host',
port=10,
userid='user',
password='password',
virtual_host='virtual_host')])),
expected=['amqp://user:password@host:10/virtual_host'])),
('full_two_url',
dict(url='rabbit://user:password@host:10,'
'user2:password2@host2:12/virtual_host',
expected=[dict(hostname='host',
port=10,
userid='user',
password='password',
virtual_host='virtual_host'),
dict(hostname='host2',
port=12,
userid='user2',
password='password2',
virtual_host='virtual_host')
]
expected=["amqp://user:password@host:10/virtual_host",
"amqp://user2:password2@host2:12/virtual_host"]
)),
]
def test_transport_url(self):
self.messaging_conf.in_memory = True
@mock.patch('oslo.messaging._drivers.impl_rabbit.Connection.ensure')
def test_transport_url(self, fake_ensure):
self.messaging_conf.in_memory = False
transport = messaging.get_transport(self.conf, self.url)
self.addCleanup(transport.cleanup)
driver = transport._driver
brokers_params = driver._get_connection().brokers_params[:]
brokers_params = [dict((k, v) for k, v in broker.items()
if k not in ['transport', 'login_method'])
for broker in brokers_params]
self.assertEqual(sorted(self.expected,
key=operator.itemgetter('hostname')),
sorted(brokers_params,
key=operator.itemgetter('hostname')))
urls = driver._get_connection()._url.split(";")
self.assertEqual(sorted(self.expected), sorted(urls))
class TestSendReceive(test_utils.BaseTestCase):
@ -674,62 +630,38 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
self.config(rabbit_hosts=self.brokers)
hostname_sets = set()
self.info = {'attempt': 0,
'fail': False}
def _connect(myself, params):
# do as little work that is enough to pass connection attempt
myself.connection = kombu.connection.BrokerConnection(**params)
myself.connection_errors = myself.connection.connection_errors
hostname = params['hostname']
self.assertNotIn(hostname, hostname_sets)
hostname_sets.add(hostname)
self.info['attempt'] += 1
if self.info['fail']:
raise IOError('fake fail')
# just make sure connection instantiation does not fail with an
# exception
self.stubs.Set(rabbit_driver.Connection, '_connect', _connect)
self.kombu_connect = mock.Mock()
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.connect',
side_effect=self.kombu_connect))
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.channel'))
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
self.connection = rabbit_driver.Connection(self.conf, url)
self.addCleanup(self.connection.close)
self.info.update({'attempt': 0,
'fail': True})
hostname_sets.clear()
def test_reconnect_order(self):
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.reconnect,
retry=len(self.brokers) - 1)
self.assertEqual(len(self.brokers), self.info['attempt'])
def test_ensure_four_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=4)
self.assertEqual(5, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
self.assertEqual(5, self.kombu_connect.call_count)
self.assertEqual(6, mock_callback.call_count)
def test_ensure_one_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=1)
self.assertEqual(2, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
self.assertEqual(2, self.kombu_connect.call_count)
self.assertEqual(3, mock_callback.call_count)
def test_ensure_no_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=0)
self.assertEqual(1, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
self.assertEqual(1, self.kombu_connect.call_count)
self.assertEqual(2, mock_callback.call_count)