Browse Source

rabbit: uses kombu instead of builtin stuffs

The rabbit driver have a custom code to reconnect to a broker,
to change the broker in HA configuration, to retry a to send
a message, to handle the interval between reconnection.

But all of that exists in kombu, so just use it.

Using the kombu Connection object with the url make also the rabbit
driver more generic.

Futher patches can rename rabbit* oslo.config options to a more generic
name and add a new driver entry_point 'kombu' to allow to use this driver
with any borker supported by kombu.

Change-Id: Id6b89d5448126ca652b46fe6ce5a9b3ed5839795
changes/75/135275/8
Mehdi Abaakouk 7 years ago
parent
commit
973301aa70
  1. 311
      oslo/messaging/_drivers/impl_rabbit.py
  2. 122
      tests/drivers/test_impl_rabbit.py

311
oslo/messaging/_drivers/impl_rabbit.py

@ -15,7 +15,6 @@
import functools
import itertools
import logging
import random
import socket
import ssl
import time
@ -24,8 +23,10 @@ import uuid
import kombu
import kombu.connection
import kombu.entity
import kombu.exceptions
import kombu.messaging
import six
from six.moves.urllib import parse
from oslo.config import cfg
from oslo.messaging._drivers import amqp as rpc_amqp
@ -35,6 +36,7 @@ from oslo.messaging._i18n import _
from oslo.messaging import exceptions
from oslo.utils import netutils
rabbit_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
@ -424,6 +426,7 @@ class Connection(object):
def __init__(self, conf, url):
self.consumers = []
self.consumer_num = itertools.count(1)
self.conf = conf
self.max_retries = self.conf.rabbit_max_retries
# Try forever?
@ -433,62 +436,62 @@ class Connection(object):
self.interval_stepping = self.conf.rabbit_retry_backoff
# max retry-interval = 30 seconds
self.interval_max = 30
self.memory_transport = False
ssl_params = self._fetch_ssl_params()
self._ssl_params = self._fetch_ssl_params()
self._login_method = self.conf.rabbit_login_method
if url.virtual_host is not None:
virtual_host = url.virtual_host
else:
virtual_host = self.conf.rabbit_virtual_host
self.brokers_params = []
if url.hosts:
self._url = ''
if self.conf.fake_rabbit:
# TODO(sileht): use memory://virtual_host into
# unit tests to remove cfg.CONF.fake_rabbit
self._url = 'memory://%s/' % virtual_host
elif url.hosts:
for host in url.hosts:
params = {
'hostname': host.hostname,
'port': host.port or 5672,
'userid': host.username or '',
'password': host.password or '',
'login_method': self.conf.rabbit_login_method,
'virtual_host': virtual_host
}
if self.conf.fake_rabbit:
params['transport'] = 'memory'
if self.conf.rabbit_use_ssl:
params['ssl'] = ssl_params
self.brokers_params.append(params)
transport = url.transport.replace('kombu+', '')
transport = url.transport.replace('rabbit', 'amqp')
self._url += '%s%s://%s:%s@%s:%s/%s' % (
";" if self._url else '',
transport,
parse.quote(host.username or ''),
parse.quote(host.password or ''),
host.hostname or '', str(host.port or 5672),
virtual_host)
else:
# Old configuration format
for adr in self.conf.rabbit_hosts:
hostname, port = netutils.parse_host_port(
adr, default_port=self.conf.rabbit_port)
self._url += '%samqp://%s:%s@%s:%s/%s' % (
";" if self._url else '',
parse.quote(self.conf.rabbit_userid),
parse.quote(self.conf.rabbit_password),
hostname, port,
virtual_host)
params = {
'hostname': hostname,
'port': port,
'userid': self.conf.rabbit_userid,
'password': self.conf.rabbit_password,
'login_method': self.conf.rabbit_login_method,
'virtual_host': virtual_host
}
if self.conf.fake_rabbit:
params['transport'] = 'memory'
if self.conf.rabbit_use_ssl:
params['ssl'] = ssl_params
self.brokers_params.append(params)
random.shuffle(self.brokers_params)
self.brokers = itertools.cycle(self.brokers_params)
self.do_consume = True
self.memory_transport = self.conf.fake_rabbit
self.channel = None
self.connection = kombu.connection.Connection(
self._url, ssl=self._ssl_params, login_method=self._login_method,
failover_strategy="shuffle")
LOG.info(_('Connecting to AMQP server on %(hostname)s:%(port)d'),
{'hostname': self.connection.hostname,
'port': self.connection.port})
# NOTE(sileht): just ensure the connection is setuped at startup
self.ensure(error_callback=None,
method=lambda channel: True)
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
{'hostname': self.connection.hostname,
'port': self.connection.port})
self.connection = None
self.do_consume = None
self.reconnect()
if self.conf.fake_rabbit:
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
# FIXME(markmc): use oslo sslutils when it is available as a library
_SSL_PROTOCOLS = {
@ -531,153 +534,87 @@ class Connection(object):
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
# Return the extended behavior or just have the default behavior
return ssl_params or True
return ssl_params or None
def _connect(self, broker):
"""Connect to rabbit. Re-establish any queues that may have
been declared before if we are reconnecting. Exceptions should
be handled by the caller.
def _setup_new_channel(self, new_channel):
"""Callback invoked when the kombu connection have created
a new channel, we use it the reconfigure our consumers.
"""
LOG.info(_("Connecting to AMQP server on "
"%(hostname)s:%(port)d"), broker)
self.connection = kombu.connection.BrokerConnection(**broker)
self.connection_errors = self.connection.connection_errors
self.channel_errors = self.connection.channel_errors
if self.memory_transport:
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
self.do_consume = True
self.consumer_num = itertools.count(1)
self.connection.connect()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
if self.memory_transport:
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
broker)
def _disconnect(self):
if self.connection:
# XXX(nic): when reconnecting to a RabbitMQ cluster
# with mirrored queues in use, the attempt to release the
# connection can hang "indefinitely" somewhere deep down
# in Kombu. Blocking the thread for a bit prior to
# release seems to kludge around the problem where it is
# otherwise reproduceable.
if self.conf.kombu_reconnect_delay > 0:
LOG.info(_("Delaying reconnect for %1.1f seconds...") %
self.conf.kombu_reconnect_delay)
time.sleep(self.conf.kombu_reconnect_delay)
try:
self.connection.release()
except self.connection_errors:
pass
self.connection = None
consumer.reconnect(new_channel)
def reconnect(self, retry=None):
"""Handles reconnecting and re-establishing queues.
Will retry up to retry number of times.
def ensure(self, error_callback, method, retry=None,
timeout_is_error=True):
"""Will retry up to retry number of times.
retry = None means use the value of rabbit_max_retries
retry = -1 means to retry forever
retry = 0 means no retry
retry = N means N retries
Sleep between tries, starting at self.interval_start
seconds, backing off self.interval_stepping number of seconds
each attempt.
"""
attempt = 0
loop_forever = False
if retry is None:
retry = self.max_retries
if retry is None or retry < 0:
loop_forever = True
retry = None
while True:
self._disconnect()
def on_error(exc, interval):
error_callback and error_callback(exc)
broker = six.next(self.brokers)
attempt += 1
try:
self._connect(broker)
return
except IOError as ex:
e = ex
except self.connection_errors as ex:
e = ex
except Exception as ex:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
# a protocol response. (See paste link in LP888621)
# So, we check all exceptions for 'timeout' in them
# and try to reconnect in this case.
if 'timeout' not in six.text_type(e):
raise
e = ex
log_info = {}
log_info['err_str'] = e
log_info['retry'] = retry or 0
log_info.update(broker)
if not loop_forever and attempt > retry:
msg = _('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(retry)d '
'tries: %(err_str)s') % log_info
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
info = {'hostname': self.connection.hostname,
'port': self.connection.port,
'err_str': exc, 'sleep_time': interval}
if 'Socket closed' in six.text_type(exc):
LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
' the connection. Check login credentials:'
' %(err_str)s'), info)
else:
if attempt == 1:
sleep_time = self.interval_start or 1
elif attempt > 1:
sleep_time += self.interval_stepping
sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time
if 'Socket closed' in six.text_type(e):
LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
' the connection. Check login credentials:'
' %(err_str)s'), log_info)
else:
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.'), log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, retry=None):
while True:
try:
return method()
except self.connection_errors as e:
if error_callback:
error_callback(e)
except self.channel_errors as e:
if error_callback:
error_callback(e)
except (socket.timeout, IOError) as e:
if error_callback:
error_callback(e)
except Exception as e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
# a protocol response. (See paste link in LP888621)
# So, we check all exceptions for 'timeout' in them
# and try to reconnect in this case.
if 'timeout' not in six.text_type(e):
raise
if error_callback:
error_callback(e)
self.reconnect(retry=retry)
def get_channel(self):
"""Convenience call for bin/clear_rabbit_queues."""
return self.channel
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.'), info)
# XXX(nic): when reconnecting to a RabbitMQ cluster
# with mirrored queues in use, the attempt to release the
# connection can hang "indefinitely" somewhere deep down
# in Kombu. Blocking the thread for a bit prior to
# release seems to kludge around the problem where it is
# otherwise reproduceable.
# TODO(sileht): Check if this is useful since we
# use kombu for HA connection, the interval_step
# should sufficient, because the underlying kombu transport
# connection object freed.
if self.conf.kombu_reconnect_delay > 0:
LOG.info(_("Delaying reconnect for %1.1f seconds...") %
self.conf.kombu_reconnect_delay)
time.sleep(self.conf.kombu_reconnect_delay)
recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
try:
autoretry_method = self.connection.autoretry(
method, channel=self.channel,
max_retries=retry,
errback=on_error,
interval_start=self.interval_start or 1,
interval_step=self.interval_stepping,
on_revive=self._setup_new_channel,
)
ret, channel = autoretry_method()
self.channel = channel
return ret
except recoverable_errors as exc:
# NOTE(sileht): number of retry exceeded and the connection
# is still broken
msg = _('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(retry)d '
'tries: %(err_str)s') % {
'hostname': self.connection.hostname,
'port': self.connection.port,
'err_str': exc,
'retry': retry}
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
def close(self):
"""Close/release this connection."""
@ -689,10 +626,8 @@ class Connection(object):
"""Reset a connection so it can be used again."""
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
if self.memory_transport:
self.channel._new_queue('ae.undeliver')
self.consumers = []
self._setup_new_channel(self.channel)
def declare_consumer(self, consumer_cls, topic, callback):
"""Create a Consumer using the class that was passed in and
@ -704,8 +639,8 @@ class Connection(object):
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s"), log_info)
def _declare_consumer():
consumer = consumer_cls(self.conf, self.channel, topic, callback,
def _declare_consumer(channel):
consumer = consumer_cls(self.conf, channel, topic, callback,
six.next(self.consumer_num))
self.consumers.append(consumer)
return consumer
@ -716,15 +651,11 @@ class Connection(object):
"""Return an iterator that will consume from all queues/consumers."""
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.debug('Timed out waiting for RPC response: %s', exc)
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s'),
exc)
self.do_consume = True
LOG.exception(_('Failed to consume message from queue: %s'),
exc)
self.do_consume = True
def _consume():
def _consume(channel):
if self.do_consume:
queues_head = self.consumers[:-1] # not fanout.
queues_tail = self.consumers[-1] # fanout
@ -732,7 +663,11 @@ class Connection(object):
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
self.do_consume = False
return self.connection.drain_events(timeout=timeout)
try:
return self.connection.drain_events(timeout=timeout)
except socket.timeout as exc:
LOG.debug('Timed out waiting for RPC response: %s', exc)
raise rpc_common.Timeout()
for iteration in itertools.count(0):
if limit and iteration >= limit:
@ -748,8 +683,8 @@ class Connection(object):
LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
def _publish():
publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
def _publish(channel):
publisher = cls(self.conf, channel, topic=topic, **kwargs)
publisher.send(msg, timeout)
self.ensure(_error_callback, _publish, retry=retry)

122
tests/drivers/test_impl_rabbit.py

@ -13,7 +13,6 @@
# under the License.
import datetime
import operator
import sys
import threading
import uuid
@ -21,6 +20,7 @@ import uuid
import fixtures
import kombu
import mock
from oslotest import mockpatch
import testscenarios
from oslo import messaging
@ -49,87 +49,43 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
scenarios = [
('none', dict(url=None,
expected=[dict(hostname='localhost',
port=5672,
userid='guest',
password='guest',
virtual_host='/')])),
expected=["amqp://guest:guest@localhost:5672//"])),
('empty',
dict(url='rabbit:///',
expected=[dict(hostname='localhost',
port=5672,
userid='guest',
password='guest',
virtual_host='')])),
expected=['amqp://guest:guest@localhost:5672/'])),
('localhost',
dict(url='rabbit://localhost/',
expected=[dict(hostname='localhost',
port=5672,
userid='',
password='',
virtual_host='')])),
expected=['amqp://:@localhost:5672/'])),
('virtual_host',
dict(url='rabbit:///vhost',
expected=[dict(hostname='localhost',
port=5672,
userid='guest',
password='guest',
virtual_host='vhost')])),
expected=['amqp://guest:guest@localhost:5672/vhost'])),
('no_creds',
dict(url='rabbit://host/virtual_host',
expected=[dict(hostname='host',
port=5672,
userid='',
password='',
virtual_host='virtual_host')])),
expected=['amqp://:@host:5672/virtual_host'])),
('no_port',
dict(url='rabbit://user:password@host/virtual_host',
expected=[dict(hostname='host',
port=5672,
userid='user',
password='password',
virtual_host='virtual_host')])),
expected=['amqp://user:password@host:5672/virtual_host'])),
('full_url',
dict(url='rabbit://user:password@host:10/virtual_host',
expected=[dict(hostname='host',
port=10,
userid='user',
password='password',
virtual_host='virtual_host')])),
expected=['amqp://user:password@host:10/virtual_host'])),
('full_two_url',
dict(url='rabbit://user:password@host:10,'
'user2:password2@host2:12/virtual_host',
expected=[dict(hostname='host',
port=10,
userid='user',
password='password',
virtual_host='virtual_host'),
dict(hostname='host2',
port=12,
userid='user2',
password='password2',
virtual_host='virtual_host')
]
expected=["amqp://user:password@host:10/virtual_host",
"amqp://user2:password2@host2:12/virtual_host"]
)),
]
def test_transport_url(self):
self.messaging_conf.in_memory = True
@mock.patch('oslo.messaging._drivers.impl_rabbit.Connection.ensure')
def test_transport_url(self, fake_ensure):
self.messaging_conf.in_memory = False
transport = messaging.get_transport(self.conf, self.url)
self.addCleanup(transport.cleanup)
driver = transport._driver
brokers_params = driver._get_connection().brokers_params[:]
brokers_params = [dict((k, v) for k, v in broker.items()
if k not in ['transport', 'login_method'])
for broker in brokers_params]
self.assertEqual(sorted(self.expected,
key=operator.itemgetter('hostname')),
sorted(brokers_params,
key=operator.itemgetter('hostname')))
urls = driver._get_connection()._url.split(";")
self.assertEqual(sorted(self.expected), sorted(urls))
class TestSendReceive(test_utils.BaseTestCase):
@ -674,62 +630,38 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
self.config(rabbit_hosts=self.brokers)
hostname_sets = set()
self.info = {'attempt': 0,
'fail': False}
def _connect(myself, params):
# do as little work that is enough to pass connection attempt
myself.connection = kombu.connection.BrokerConnection(**params)
myself.connection_errors = myself.connection.connection_errors
hostname = params['hostname']
self.assertNotIn(hostname, hostname_sets)
hostname_sets.add(hostname)
self.info['attempt'] += 1
if self.info['fail']:
raise IOError('fake fail')
# just make sure connection instantiation does not fail with an
# exception
self.stubs.Set(rabbit_driver.Connection, '_connect', _connect)
self.kombu_connect = mock.Mock()
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.connect',
side_effect=self.kombu_connect))
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.channel'))
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
self.connection = rabbit_driver.Connection(self.conf, url)
self.addCleanup(self.connection.close)
self.info.update({'attempt': 0,
'fail': True})
hostname_sets.clear()
def test_reconnect_order(self):
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.reconnect,
retry=len(self.brokers) - 1)
self.assertEqual(len(self.brokers), self.info['attempt'])
def test_ensure_four_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=4)
self.assertEqual(5, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
self.assertEqual(5, self.kombu_connect.call_count)
self.assertEqual(6, mock_callback.call_count)
def test_ensure_one_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=1)
self.assertEqual(2, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
self.assertEqual(2, self.kombu_connect.call_count)
self.assertEqual(3, mock_callback.call_count)
def test_ensure_no_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=0)
self.assertEqual(1, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
self.assertEqual(1, self.kombu_connect.call_count)
self.assertEqual(2, mock_callback.call_count)
Loading…
Cancel
Save