Support for several HA RabbitMQ servers.

Port from openstack-common: https://review.openstack.org/#/c/10305/

Change-Id: Ib44abf115b42c3df42771344f6722ce1db043bbd
This commit is contained in:
Eugene Kirpichov 2012-09-25 20:33:28 +00:00
parent 0a410e71bf
commit 83fa8ce3e8
3 changed files with 141 additions and 39 deletions

View File

@ -0,0 +1,68 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Network-related utilities and helper functions.
"""
import logging
LOG = logging.getLogger(__name__)
def parse_host_port(address, default_port=None):
"""
Interpret a string as a host:port pair.
An IPv6 address MUST be escaped if accompanied by a port,
because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
means both [2001:db8:85a3::8a2e:370:7334] and
[2001:db8:85a3::8a2e:370]:7334.
>>> parse_host_port('server01:80')
('server01', 80)
>>> parse_host_port('server01')
('server01', None)
>>> parse_host_port('server01', default_port=1234)
('server01', 1234)
>>> parse_host_port('[::1]:80')
('::1', 80)
>>> parse_host_port('[::1]')
('::1', None)
>>> parse_host_port('[::1]', default_port=1234)
('::1', 1234)
>>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
('2001:db8:85a3::8a2e:370:7334', 1234)
"""
if address[0] == '[':
# Escaped ipv6
_host, _port = address[1:].split(']')
host = _host
if ':' in _port:
port = _port.split(':')[1]
else:
port = default_port
else:
if address.count(':') == 1:
host, port = address.split(':')
else:
# 0 means ipv4, >1 means ipv6.
# We prohibit unescaped ipv6 addresses with port.
host = address
port = default_port
return (host, None if port is None else int(port))

View File

@ -33,6 +33,7 @@ from nova.openstack.common import cfg
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _
from nova.openstack.common.rpc import amqp as rpc_amqp from nova.openstack.common.rpc import amqp as rpc_amqp
from nova.openstack.common.rpc import common as rpc_common from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common import network_utils
kombu_opts = [ kombu_opts = [
cfg.StrOpt('kombu_ssl_version', cfg.StrOpt('kombu_ssl_version',
@ -50,10 +51,13 @@ kombu_opts = [
'(valid only if SSL enabled)')), '(valid only if SSL enabled)')),
cfg.StrOpt('rabbit_host', cfg.StrOpt('rabbit_host',
default='localhost', default='localhost',
help='the RabbitMQ host'), help='The RabbitMQ broker address where a single node is used'),
cfg.IntOpt('rabbit_port', cfg.IntOpt('rabbit_port',
default=5672, default=5672,
help='the RabbitMQ port'), help='The RabbitMQ broker port where a single node is used'),
cfg.ListOpt('rabbit_hosts',
default=['$rabbit_host:$rabbit_port'],
help='RabbitMQ HA cluster host:port pairs'),
cfg.BoolOpt('rabbit_use_ssl', cfg.BoolOpt('rabbit_use_ssl',
default=False, default=False,
help='connect over SSL for RabbitMQ'), help='connect over SSL for RabbitMQ'),
@ -80,6 +84,11 @@ kombu_opts = [
cfg.BoolOpt('rabbit_durable_queues', cfg.BoolOpt('rabbit_durable_queues',
default=False, default=False,
help='use durable queues in RabbitMQ'), help='use durable queues in RabbitMQ'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
help='use H/A queues in RabbitMQ (x-ha-policy: all).'
'You need to wipe RabbitMQ database when '
'changing this option.'),
] ]
@ -88,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts)
LOG = rpc_common.LOG LOG = rpc_common.LOG
def _get_queue_arguments(conf):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we declare a mirrored queue
as described here:
http://www.rabbitmq.com/ha.html
Setting x-ha-policy to all means that the queue will be mirrored
to all nodes in the cluster.
"""
return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
class ConsumerBase(object): class ConsumerBase(object):
"""Consumer base class.""" """Consumer base class."""
@ -207,6 +230,7 @@ class TopicConsumer(ConsumerBase):
""" """
# Default options # Default options
options = {'durable': conf.rabbit_durable_queues, options = {'durable': conf.rabbit_durable_queues,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False, 'auto_delete': False,
'exclusive': False} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
@ -331,6 +355,7 @@ class NotifyPublisher(TopicPublisher):
def __init__(self, conf, channel, topic, **kwargs): def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
def reconnect(self, channel): def reconnect(self, channel):
@ -343,7 +368,8 @@ class NotifyPublisher(TopicPublisher):
exchange=self.exchange, exchange=self.exchange,
durable=self.durable, durable=self.durable,
name=self.routing_key, name=self.routing_key,
routing_key=self.routing_key) routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
queue.declare() queue.declare()
@ -368,31 +394,37 @@ class Connection(object):
if server_params is None: if server_params is None:
server_params = {} server_params = {}
# Keys to translate from server_params to kombu params # Keys to translate from server_params to kombu params
server_params_to_kombu_params = {'username': 'userid'} server_params_to_kombu_params = {'username': 'userid'}
params = {} ssl_params = self._fetch_ssl_params()
for sp_key, value in server_params.iteritems(): params_list = []
p_key = server_params_to_kombu_params.get(sp_key, sp_key) for adr in self.conf.rabbit_hosts:
params[p_key] = value hostname, port = network_utils.parse_host_port(
adr, default_port=self.conf.rabbit_port)
params.setdefault('hostname', self.conf.rabbit_host) params = {}
params.setdefault('port', self.conf.rabbit_port)
params.setdefault('userid', self.conf.rabbit_userid)
params.setdefault('password', self.conf.rabbit_password)
params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
self.params = params for sp_key, value in server_params.iteritems():
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
if self.conf.fake_rabbit: params.setdefault('hostname', hostname)
self.params['transport'] = 'memory' params.setdefault('port', port)
self.memory_transport = True params.setdefault('userid', self.conf.rabbit_userid)
else: params.setdefault('password', self.conf.rabbit_password)
self.memory_transport = False params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
if self.conf.rabbit_use_ssl: if self.conf.fake_rabbit:
self.params['ssl'] = self._fetch_ssl_params() params['transport'] = 'memory'
if self.conf.rabbit_use_ssl:
params['ssl'] = ssl_params
params_list.append(params)
self.params_list = params_list
self.memory_transport = self.conf.fake_rabbit
self.connection = None self.connection = None
self.reconnect() self.reconnect()
@ -422,14 +454,14 @@ class Connection(object):
# Return the extended behavior # Return the extended behavior
return ssl_params return ssl_params
def _connect(self): def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have """Connect to rabbit. Re-establish any queues that may have
been declared before if we are reconnecting. Exceptions should been declared before if we are reconnecting. Exceptions should
be handled by the caller. be handled by the caller.
""" """
if self.connection: if self.connection:
LOG.info(_("Reconnecting to AMQP server on " LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % self.params) "%(hostname)s:%(port)d") % params)
try: try:
self.connection.close() self.connection.close()
except self.connection_errors: except self.connection_errors:
@ -437,7 +469,7 @@ class Connection(object):
# Setting this in case the next statement fails, though # Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet. # it shouldn't be doing any network operations, yet.
self.connection = None self.connection = None
self.connection = kombu.connection.BrokerConnection(**self.params) self.connection = kombu.connection.BrokerConnection(**params)
self.connection_errors = self.connection.connection_errors self.connection_errors = self.connection.connection_errors
if self.memory_transport: if self.memory_transport:
# Kludge to speed up tests. # Kludge to speed up tests.
@ -450,8 +482,8 @@ class Connection(object):
self.channel._new_queue('ae.undeliver') self.channel._new_queue('ae.undeliver')
for consumer in self.consumers: for consumer in self.consumers:
consumer.reconnect(self.channel) consumer.reconnect(self.channel)
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
self.params) params)
def reconnect(self): def reconnect(self):
"""Handles reconnecting and re-establishing queues. """Handles reconnecting and re-establishing queues.
@ -464,11 +496,12 @@ class Connection(object):
attempt = 0 attempt = 0
while True: while True:
params = self.params_list[attempt % len(self.params_list)]
attempt += 1 attempt += 1
try: try:
self._connect() self._connect(params)
return return
except (self.connection_errors, IOError), e: except (IOError, self.connection_errors) as e:
pass pass
except Exception, e: except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib # NOTE(comstud): Unfortunately it's possible for amqplib
@ -483,12 +516,12 @@ class Connection(object):
log_info = {} log_info = {}
log_info['err_str'] = str(e) log_info['err_str'] = str(e)
log_info['max_retries'] = self.max_retries log_info['max_retries'] = self.max_retries
log_info.update(self.params) log_info.update(params)
if self.max_retries and attempt == self.max_retries: if self.max_retries and attempt == self.max_retries:
LOG.exception(_('Unable to connect to AMQP server on ' LOG.error(_('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d ' '%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info) 'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's # NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we # really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore. # need to consume on, we have no way to consume anymore.
@ -502,9 +535,9 @@ class Connection(object):
sleep_time = min(sleep_time, self.interval_max) sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time log_info['sleep_time'] = sleep_time
LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
' unreachable: %(err_str)s. Trying again in ' 'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info) '%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time) time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs): def ensure(self, error_callback, method, *args, **kwargs):
@ -512,7 +545,8 @@ class Connection(object):
try: try:
return method(*args, **kwargs) return method(*args, **kwargs)
except (self.connection_errors, socket.timeout, IOError), e: except (self.connection_errors, socket.timeout, IOError), e:
pass if error_callback:
error_callback(e)
except Exception, e: except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib # NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport # to return an error not covered by its transport
@ -522,8 +556,8 @@ class Connection(object):
# and try to reconnect in this case. # and try to reconnect in this case.
if 'timeout' not in str(e): if 'timeout' not in str(e):
raise raise
if error_callback: if error_callback:
error_callback(e) error_callback(e)
self.reconnect() self.reconnect()
def get_channel(self): def get_channel(self):

View File

@ -1,7 +1,7 @@
[DEFAULT] [DEFAULT]
# The list of modules to copy from openstack-common # The list of modules to copy from openstack-common
modules=cfg,context,excutils,gettextutils,importutils,iniparser,jsonutils,local,log,notifier,plugin,policy,setup,timeutils,rpc modules=cfg,context,excutils,gettextutils,importutils,iniparser,jsonutils,local,log,network_utils,notifier,plugin,policy,setup,timeutils,rpc
# The base module to hold the copy of openstack.common # The base module to hold the copy of openstack.common
base=nova base=nova