From 973301aa70527171749fa34897276c43898aeeb2 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Mon, 17 Nov 2014 16:46:46 +0100 Subject: [PATCH] rabbit: uses kombu instead of builtin stuffs The rabbit driver have a custom code to reconnect to a broker, to change the broker in HA configuration, to retry a to send a message, to handle the interval between reconnection. But all of that exists in kombu, so just use it. Using the kombu Connection object with the url make also the rabbit driver more generic. Futher patches can rename rabbit* oslo.config options to a more generic name and add a new driver entry_point 'kombu' to allow to use this driver with any borker supported by kombu. Change-Id: Id6b89d5448126ca652b46fe6ce5a9b3ed5839795 --- oslo/messaging/_drivers/impl_rabbit.py | 311 ++++++++++--------------- tests/drivers/test_impl_rabbit.py | 122 +++------- 2 files changed, 150 insertions(+), 283 deletions(-) diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 0297e1e30..939a3cec2 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -15,7 +15,6 @@ import functools import itertools import logging -import random import socket import ssl import time @@ -24,8 +23,10 @@ import uuid import kombu import kombu.connection import kombu.entity +import kombu.exceptions import kombu.messaging import six +from six.moves.urllib import parse from oslo.config import cfg from oslo.messaging._drivers import amqp as rpc_amqp @@ -35,6 +36,7 @@ from oslo.messaging._i18n import _ from oslo.messaging import exceptions from oslo.utils import netutils + rabbit_opts = [ cfg.StrOpt('kombu_ssl_version', default='', @@ -424,6 +426,7 @@ class Connection(object): def __init__(self, conf, url): self.consumers = [] + self.consumer_num = itertools.count(1) self.conf = conf self.max_retries = self.conf.rabbit_max_retries # Try forever? @@ -433,62 +436,62 @@ class Connection(object): self.interval_stepping = self.conf.rabbit_retry_backoff # max retry-interval = 30 seconds self.interval_max = 30 - self.memory_transport = False - ssl_params = self._fetch_ssl_params() + self._ssl_params = self._fetch_ssl_params() + self._login_method = self.conf.rabbit_login_method if url.virtual_host is not None: virtual_host = url.virtual_host else: virtual_host = self.conf.rabbit_virtual_host - self.brokers_params = [] - if url.hosts: + self._url = '' + if self.conf.fake_rabbit: + # TODO(sileht): use memory://virtual_host into + # unit tests to remove cfg.CONF.fake_rabbit + self._url = 'memory://%s/' % virtual_host + elif url.hosts: for host in url.hosts: - params = { - 'hostname': host.hostname, - 'port': host.port or 5672, - 'userid': host.username or '', - 'password': host.password or '', - 'login_method': self.conf.rabbit_login_method, - 'virtual_host': virtual_host - } - if self.conf.fake_rabbit: - params['transport'] = 'memory' - if self.conf.rabbit_use_ssl: - params['ssl'] = ssl_params - - self.brokers_params.append(params) + transport = url.transport.replace('kombu+', '') + transport = url.transport.replace('rabbit', 'amqp') + self._url += '%s%s://%s:%s@%s:%s/%s' % ( + ";" if self._url else '', + transport, + parse.quote(host.username or ''), + parse.quote(host.password or ''), + host.hostname or '', str(host.port or 5672), + virtual_host) else: - # Old configuration format for adr in self.conf.rabbit_hosts: hostname, port = netutils.parse_host_port( adr, default_port=self.conf.rabbit_port) + self._url += '%samqp://%s:%s@%s:%s/%s' % ( + ";" if self._url else '', + parse.quote(self.conf.rabbit_userid), + parse.quote(self.conf.rabbit_password), + hostname, port, + virtual_host) - params = { - 'hostname': hostname, - 'port': port, - 'userid': self.conf.rabbit_userid, - 'password': self.conf.rabbit_password, - 'login_method': self.conf.rabbit_login_method, - 'virtual_host': virtual_host - } + self.do_consume = True - if self.conf.fake_rabbit: - params['transport'] = 'memory' - if self.conf.rabbit_use_ssl: - params['ssl'] = ssl_params + self.channel = None + self.connection = kombu.connection.Connection( + self._url, ssl=self._ssl_params, login_method=self._login_method, + failover_strategy="shuffle") - self.brokers_params.append(params) + LOG.info(_('Connecting to AMQP server on %(hostname)s:%(port)d'), + {'hostname': self.connection.hostname, + 'port': self.connection.port}) + # NOTE(sileht): just ensure the connection is setuped at startup + self.ensure(error_callback=None, + method=lambda channel: True) + LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), + {'hostname': self.connection.hostname, + 'port': self.connection.port}) - random.shuffle(self.brokers_params) - self.brokers = itertools.cycle(self.brokers_params) - - self.memory_transport = self.conf.fake_rabbit - - self.connection = None - self.do_consume = None - self.reconnect() + if self.conf.fake_rabbit: + # Kludge to speed up tests. + self.connection.transport.polling_interval = 0.0 # FIXME(markmc): use oslo sslutils when it is available as a library _SSL_PROTOCOLS = { @@ -531,153 +534,87 @@ class Connection(object): ssl_params['cert_reqs'] = ssl.CERT_REQUIRED # Return the extended behavior or just have the default behavior - return ssl_params or True + return ssl_params or None - def _connect(self, broker): - """Connect to rabbit. Re-establish any queues that may have - been declared before if we are reconnecting. Exceptions should - be handled by the caller. + def _setup_new_channel(self, new_channel): + """Callback invoked when the kombu connection have created + a new channel, we use it the reconfigure our consumers. """ - LOG.info(_("Connecting to AMQP server on " - "%(hostname)s:%(port)d"), broker) - self.connection = kombu.connection.BrokerConnection(**broker) - self.connection_errors = self.connection.connection_errors - self.channel_errors = self.connection.channel_errors - if self.memory_transport: - # Kludge to speed up tests. - self.connection.transport.polling_interval = 0.0 - self.do_consume = True self.consumer_num = itertools.count(1) - self.connection.connect() - self.channel = self.connection.channel() - # work around 'memory' transport bug in 1.1.3 - if self.memory_transport: - self.channel._new_queue('ae.undeliver') for consumer in self.consumers: - consumer.reconnect(self.channel) - LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), - broker) + consumer.reconnect(new_channel) + + def ensure(self, error_callback, method, retry=None, + timeout_is_error=True): + """Will retry up to retry number of times. + retry = None means use the value of rabbit_max_retries + retry = -1 means to retry forever + retry = 0 means no retry + retry = N means N retries + """ + + if retry is None: + retry = self.max_retries + if retry is None or retry < 0: + retry = None + + def on_error(exc, interval): + error_callback and error_callback(exc) + + info = {'hostname': self.connection.hostname, + 'port': self.connection.port, + 'err_str': exc, 'sleep_time': interval} + + if 'Socket closed' in six.text_type(exc): + LOG.error(_('AMQP server %(hostname)s:%(port)d closed' + ' the connection. Check login credentials:' + ' %(err_str)s'), info) + else: + LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' + 'unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.'), info) - def _disconnect(self): - if self.connection: # XXX(nic): when reconnecting to a RabbitMQ cluster # with mirrored queues in use, the attempt to release the # connection can hang "indefinitely" somewhere deep down # in Kombu. Blocking the thread for a bit prior to # release seems to kludge around the problem where it is # otherwise reproduceable. + # TODO(sileht): Check if this is useful since we + # use kombu for HA connection, the interval_step + # should sufficient, because the underlying kombu transport + # connection object freed. if self.conf.kombu_reconnect_delay > 0: LOG.info(_("Delaying reconnect for %1.1f seconds...") % self.conf.kombu_reconnect_delay) time.sleep(self.conf.kombu_reconnect_delay) - try: - self.connection.release() - except self.connection_errors: - pass - self.connection = None - - def reconnect(self, retry=None): - """Handles reconnecting and re-establishing queues. - Will retry up to retry number of times. - retry = None means use the value of rabbit_max_retries - retry = -1 means to retry forever - retry = 0 means no retry - retry = N means N retries - Sleep between tries, starting at self.interval_start - seconds, backing off self.interval_stepping number of seconds - each attempt. - """ - - attempt = 0 - loop_forever = False - if retry is None: - retry = self.max_retries - if retry is None or retry < 0: - loop_forever = True - - while True: - self._disconnect() - - broker = six.next(self.brokers) - attempt += 1 - try: - self._connect(broker) - return - except IOError as ex: - e = ex - except self.connection_errors as ex: - e = ex - except Exception as ex: - # NOTE(comstud): Unfortunately it's possible for amqplib - # to return an error not covered by its transport - # connection_errors in the case of a timeout waiting for - # a protocol response. (See paste link in LP888621) - # So, we check all exceptions for 'timeout' in them - # and try to reconnect in this case. - if 'timeout' not in six.text_type(e): - raise - e = ex - - log_info = {} - log_info['err_str'] = e - log_info['retry'] = retry or 0 - log_info.update(broker) - - if not loop_forever and attempt > retry: - msg = _('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(retry)d ' - 'tries: %(err_str)s') % log_info - LOG.error(msg) - raise exceptions.MessageDeliveryFailure(msg) - else: - if attempt == 1: - sleep_time = self.interval_start or 1 - elif attempt > 1: - sleep_time += self.interval_stepping - - sleep_time = min(sleep_time, self.interval_max) - - log_info['sleep_time'] = sleep_time - if 'Socket closed' in six.text_type(e): - LOG.error(_('AMQP server %(hostname)s:%(port)d closed' - ' the connection. Check login credentials:' - ' %(err_str)s'), log_info) - else: - LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' - 'unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.'), log_info) - time.sleep(sleep_time) - - def ensure(self, error_callback, method, retry=None): - while True: - try: - return method() - except self.connection_errors as e: - if error_callback: - error_callback(e) - except self.channel_errors as e: - if error_callback: - error_callback(e) - except (socket.timeout, IOError) as e: - if error_callback: - error_callback(e) - except Exception as e: - # NOTE(comstud): Unfortunately it's possible for amqplib - # to return an error not covered by its transport - # connection_errors in the case of a timeout waiting for - # a protocol response. (See paste link in LP888621) - # So, we check all exceptions for 'timeout' in them - # and try to reconnect in this case. - if 'timeout' not in six.text_type(e): - raise - if error_callback: - error_callback(e) - self.reconnect(retry=retry) - - def get_channel(self): - """Convenience call for bin/clear_rabbit_queues.""" - return self.channel + recoverable_errors = (self.connection.recoverable_channel_errors + + self.connection.recoverable_connection_errors) + try: + autoretry_method = self.connection.autoretry( + method, channel=self.channel, + max_retries=retry, + errback=on_error, + interval_start=self.interval_start or 1, + interval_step=self.interval_stepping, + on_revive=self._setup_new_channel, + ) + ret, channel = autoretry_method() + self.channel = channel + return ret + except recoverable_errors as exc: + # NOTE(sileht): number of retry exceeded and the connection + # is still broken + msg = _('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(retry)d ' + 'tries: %(err_str)s') % { + 'hostname': self.connection.hostname, + 'port': self.connection.port, + 'err_str': exc, + 'retry': retry} + LOG.error(msg) + raise exceptions.MessageDeliveryFailure(msg) def close(self): """Close/release this connection.""" @@ -689,10 +626,8 @@ class Connection(object): """Reset a connection so it can be used again.""" self.channel.close() self.channel = self.connection.channel() - # work around 'memory' transport bug in 1.1.3 - if self.memory_transport: - self.channel._new_queue('ae.undeliver') self.consumers = [] + self._setup_new_channel(self.channel) def declare_consumer(self, consumer_cls, topic, callback): """Create a Consumer using the class that was passed in and @@ -704,8 +639,8 @@ class Connection(object): LOG.error(_("Failed to declare consumer for topic '%(topic)s': " "%(err_str)s"), log_info) - def _declare_consumer(): - consumer = consumer_cls(self.conf, self.channel, topic, callback, + def _declare_consumer(channel): + consumer = consumer_cls(self.conf, channel, topic, callback, six.next(self.consumer_num)) self.consumers.append(consumer) return consumer @@ -716,15 +651,11 @@ class Connection(object): """Return an iterator that will consume from all queues/consumers.""" def _error_callback(exc): - if isinstance(exc, socket.timeout): - LOG.debug('Timed out waiting for RPC response: %s', exc) - raise rpc_common.Timeout() - else: - LOG.exception(_('Failed to consume message from queue: %s'), - exc) - self.do_consume = True + LOG.exception(_('Failed to consume message from queue: %s'), + exc) + self.do_consume = True - def _consume(): + def _consume(channel): if self.do_consume: queues_head = self.consumers[:-1] # not fanout. queues_tail = self.consumers[-1] # fanout @@ -732,7 +663,11 @@ class Connection(object): queue.consume(nowait=True) queues_tail.consume(nowait=False) self.do_consume = False - return self.connection.drain_events(timeout=timeout) + try: + return self.connection.drain_events(timeout=timeout) + except socket.timeout as exc: + LOG.debug('Timed out waiting for RPC response: %s', exc) + raise rpc_common.Timeout() for iteration in itertools.count(0): if limit and iteration >= limit: @@ -748,8 +683,8 @@ class Connection(object): LOG.exception(_("Failed to publish message to topic " "'%(topic)s': %(err_str)s"), log_info) - def _publish(): - publisher = cls(self.conf, self.channel, topic=topic, **kwargs) + def _publish(channel): + publisher = cls(self.conf, channel, topic=topic, **kwargs) publisher.send(msg, timeout) self.ensure(_error_callback, _publish, retry=retry) diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py index de331ee8f..dc0d1a3f8 100644 --- a/tests/drivers/test_impl_rabbit.py +++ b/tests/drivers/test_impl_rabbit.py @@ -13,7 +13,6 @@ # under the License. import datetime -import operator import sys import threading import uuid @@ -21,6 +20,7 @@ import uuid import fixtures import kombu import mock +from oslotest import mockpatch import testscenarios from oslo import messaging @@ -49,87 +49,43 @@ class TestRabbitTransportURL(test_utils.BaseTestCase): scenarios = [ ('none', dict(url=None, - expected=[dict(hostname='localhost', - port=5672, - userid='guest', - password='guest', - virtual_host='/')])), + expected=["amqp://guest:guest@localhost:5672//"])), ('empty', dict(url='rabbit:///', - expected=[dict(hostname='localhost', - port=5672, - userid='guest', - password='guest', - virtual_host='')])), + expected=['amqp://guest:guest@localhost:5672/'])), ('localhost', dict(url='rabbit://localhost/', - expected=[dict(hostname='localhost', - port=5672, - userid='', - password='', - virtual_host='')])), + expected=['amqp://:@localhost:5672/'])), ('virtual_host', dict(url='rabbit:///vhost', - expected=[dict(hostname='localhost', - port=5672, - userid='guest', - password='guest', - virtual_host='vhost')])), + expected=['amqp://guest:guest@localhost:5672/vhost'])), ('no_creds', dict(url='rabbit://host/virtual_host', - expected=[dict(hostname='host', - port=5672, - userid='', - password='', - virtual_host='virtual_host')])), + expected=['amqp://:@host:5672/virtual_host'])), ('no_port', dict(url='rabbit://user:password@host/virtual_host', - expected=[dict(hostname='host', - port=5672, - userid='user', - password='password', - virtual_host='virtual_host')])), + expected=['amqp://user:password@host:5672/virtual_host'])), ('full_url', dict(url='rabbit://user:password@host:10/virtual_host', - expected=[dict(hostname='host', - port=10, - userid='user', - password='password', - virtual_host='virtual_host')])), + expected=['amqp://user:password@host:10/virtual_host'])), ('full_two_url', dict(url='rabbit://user:password@host:10,' 'user2:password2@host2:12/virtual_host', - expected=[dict(hostname='host', - port=10, - userid='user', - password='password', - virtual_host='virtual_host'), - dict(hostname='host2', - port=12, - userid='user2', - password='password2', - virtual_host='virtual_host') - ] + expected=["amqp://user:password@host:10/virtual_host", + "amqp://user2:password2@host2:12/virtual_host"] )), - ] - def test_transport_url(self): - self.messaging_conf.in_memory = True + @mock.patch('oslo.messaging._drivers.impl_rabbit.Connection.ensure') + def test_transport_url(self, fake_ensure): + self.messaging_conf.in_memory = False transport = messaging.get_transport(self.conf, self.url) self.addCleanup(transport.cleanup) driver = transport._driver - brokers_params = driver._get_connection().brokers_params[:] - brokers_params = [dict((k, v) for k, v in broker.items() - if k not in ['transport', 'login_method']) - for broker in brokers_params] - - self.assertEqual(sorted(self.expected, - key=operator.itemgetter('hostname')), - sorted(brokers_params, - key=operator.itemgetter('hostname'))) + urls = driver._get_connection()._url.split(";") + self.assertEqual(sorted(self.expected), sorted(urls)) class TestSendReceive(test_utils.BaseTestCase): @@ -674,62 +630,38 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] self.config(rabbit_hosts=self.brokers) - hostname_sets = set() - self.info = {'attempt': 0, - 'fail': False} - - def _connect(myself, params): - # do as little work that is enough to pass connection attempt - myself.connection = kombu.connection.BrokerConnection(**params) - myself.connection_errors = myself.connection.connection_errors - - hostname = params['hostname'] - self.assertNotIn(hostname, hostname_sets) - hostname_sets.add(hostname) - - self.info['attempt'] += 1 - if self.info['fail']: - raise IOError('fake fail') - - # just make sure connection instantiation does not fail with an - # exception - self.stubs.Set(rabbit_driver.Connection, '_connect', _connect) + self.kombu_connect = mock.Mock() + self.useFixture(mockpatch.Patch( + 'kombu.connection.Connection.connect', + side_effect=self.kombu_connect)) + self.useFixture(mockpatch.Patch( + 'kombu.connection.Connection.channel')) # starting from the first broker in the list url = messaging.TransportURL.parse(self.conf, None) self.connection = rabbit_driver.Connection(self.conf, url) self.addCleanup(self.connection.close) - self.info.update({'attempt': 0, - 'fail': True}) - hostname_sets.clear() - - def test_reconnect_order(self): - self.assertRaises(messaging.MessageDeliveryFailure, - self.connection.reconnect, - retry=len(self.brokers) - 1) - self.assertEqual(len(self.brokers), self.info['attempt']) - def test_ensure_four_retry(self): mock_callback = mock.Mock(side_effect=IOError) self.assertRaises(messaging.MessageDeliveryFailure, self.connection.ensure, None, mock_callback, retry=4) - self.assertEqual(5, self.info['attempt']) - self.assertEqual(1, mock_callback.call_count) + self.assertEqual(5, self.kombu_connect.call_count) + self.assertEqual(6, mock_callback.call_count) def test_ensure_one_retry(self): mock_callback = mock.Mock(side_effect=IOError) self.assertRaises(messaging.MessageDeliveryFailure, self.connection.ensure, None, mock_callback, retry=1) - self.assertEqual(2, self.info['attempt']) - self.assertEqual(1, mock_callback.call_count) + self.assertEqual(2, self.kombu_connect.call_count) + self.assertEqual(3, mock_callback.call_count) def test_ensure_no_retry(self): mock_callback = mock.Mock(side_effect=IOError) self.assertRaises(messaging.MessageDeliveryFailure, self.connection.ensure, None, mock_callback, retry=0) - self.assertEqual(1, self.info['attempt']) - self.assertEqual(1, mock_callback.call_count) + self.assertEqual(1, self.kombu_connect.call_count) + self.assertEqual(2, mock_callback.call_count)