diff --git a/nova/openstack/common/network_utils.py b/nova/openstack/common/network_utils.py new file mode 100644 index 000000000..69f673216 --- /dev/null +++ b/nova/openstack/common/network_utils.py @@ -0,0 +1,68 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Network-related utilities and helper functions. +""" + +import logging + +LOG = logging.getLogger(__name__) + + +def parse_host_port(address, default_port=None): + """ + Interpret a string as a host:port pair. + An IPv6 address MUST be escaped if accompanied by a port, + because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334 + means both [2001:db8:85a3::8a2e:370:7334] and + [2001:db8:85a3::8a2e:370]:7334. + + >>> parse_host_port('server01:80') + ('server01', 80) + >>> parse_host_port('server01') + ('server01', None) + >>> parse_host_port('server01', default_port=1234) + ('server01', 1234) + >>> parse_host_port('[::1]:80') + ('::1', 80) + >>> parse_host_port('[::1]') + ('::1', None) + >>> parse_host_port('[::1]', default_port=1234) + ('::1', 1234) + >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234) + ('2001:db8:85a3::8a2e:370:7334', 1234) + + """ + if address[0] == '[': + # Escaped ipv6 + _host, _port = address[1:].split(']') + host = _host + if ':' in _port: + port = _port.split(':')[1] + else: + port = default_port + else: + if address.count(':') == 1: + host, port = address.split(':') + else: + # 0 means ipv4, >1 means ipv6. + # We prohibit unescaped ipv6 addresses with port. + host = address + port = default_port + + return (host, None if port is None else int(port)) diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py index fff1ed9ce..979ad88bd 100644 --- a/nova/openstack/common/rpc/impl_kombu.py +++ b/nova/openstack/common/rpc/impl_kombu.py @@ -33,6 +33,7 @@ from nova.openstack.common import cfg from nova.openstack.common.gettextutils import _ from nova.openstack.common.rpc import amqp as rpc_amqp from nova.openstack.common.rpc import common as rpc_common +from nova.openstack.common import network_utils kombu_opts = [ cfg.StrOpt('kombu_ssl_version', @@ -50,10 +51,13 @@ kombu_opts = [ '(valid only if SSL enabled)')), cfg.StrOpt('rabbit_host', default='localhost', - help='the RabbitMQ host'), + help='The RabbitMQ broker address where a single node is used'), cfg.IntOpt('rabbit_port', default=5672, - help='the RabbitMQ port'), + help='The RabbitMQ broker port where a single node is used'), + cfg.ListOpt('rabbit_hosts', + default=['$rabbit_host:$rabbit_port'], + help='RabbitMQ HA cluster host:port pairs'), cfg.BoolOpt('rabbit_use_ssl', default=False, help='connect over SSL for RabbitMQ'), @@ -80,6 +84,11 @@ kombu_opts = [ cfg.BoolOpt('rabbit_durable_queues', default=False, help='use durable queues in RabbitMQ'), + cfg.BoolOpt('rabbit_ha_queues', + default=False, + help='use H/A queues in RabbitMQ (x-ha-policy: all).' + 'You need to wipe RabbitMQ database when ' + 'changing this option.'), ] @@ -88,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts) LOG = rpc_common.LOG +def _get_queue_arguments(conf): + """Construct the arguments for declaring a queue. + + If the rabbit_ha_queues option is set, we declare a mirrored queue + as described here: + + http://www.rabbitmq.com/ha.html + + Setting x-ha-policy to all means that the queue will be mirrored + to all nodes in the cluster. + """ + return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} + + class ConsumerBase(object): """Consumer base class.""" @@ -207,6 +230,7 @@ class TopicConsumer(ConsumerBase): """ # Default options options = {'durable': conf.rabbit_durable_queues, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': False, 'exclusive': False} options.update(kwargs) @@ -331,6 +355,7 @@ class NotifyPublisher(TopicPublisher): def __init__(self, conf, channel, topic, **kwargs): self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) def reconnect(self, channel): @@ -343,7 +368,8 @@ class NotifyPublisher(TopicPublisher): exchange=self.exchange, durable=self.durable, name=self.routing_key, - routing_key=self.routing_key) + routing_key=self.routing_key, + queue_arguments=self.queue_arguments) queue.declare() @@ -368,31 +394,37 @@ class Connection(object): if server_params is None: server_params = {} - # Keys to translate from server_params to kombu params server_params_to_kombu_params = {'username': 'userid'} - params = {} - for sp_key, value in server_params.iteritems(): - p_key = server_params_to_kombu_params.get(sp_key, sp_key) - params[p_key] = value + ssl_params = self._fetch_ssl_params() + params_list = [] + for adr in self.conf.rabbit_hosts: + hostname, port = network_utils.parse_host_port( + adr, default_port=self.conf.rabbit_port) - params.setdefault('hostname', self.conf.rabbit_host) - params.setdefault('port', self.conf.rabbit_port) - params.setdefault('userid', self.conf.rabbit_userid) - params.setdefault('password', self.conf.rabbit_password) - params.setdefault('virtual_host', self.conf.rabbit_virtual_host) + params = {} - self.params = params + for sp_key, value in server_params.iteritems(): + p_key = server_params_to_kombu_params.get(sp_key, sp_key) + params[p_key] = value - if self.conf.fake_rabbit: - self.params['transport'] = 'memory' - self.memory_transport = True - else: - self.memory_transport = False + params.setdefault('hostname', hostname) + params.setdefault('port', port) + params.setdefault('userid', self.conf.rabbit_userid) + params.setdefault('password', self.conf.rabbit_password) + params.setdefault('virtual_host', self.conf.rabbit_virtual_host) - if self.conf.rabbit_use_ssl: - self.params['ssl'] = self._fetch_ssl_params() + if self.conf.fake_rabbit: + params['transport'] = 'memory' + if self.conf.rabbit_use_ssl: + params['ssl'] = ssl_params + + params_list.append(params) + + self.params_list = params_list + + self.memory_transport = self.conf.fake_rabbit self.connection = None self.reconnect() @@ -422,14 +454,14 @@ class Connection(object): # Return the extended behavior return ssl_params - def _connect(self): + def _connect(self, params): """Connect to rabbit. Re-establish any queues that may have been declared before if we are reconnecting. Exceptions should be handled by the caller. """ if self.connection: LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % self.params) + "%(hostname)s:%(port)d") % params) try: self.connection.close() except self.connection_errors: @@ -437,7 +469,7 @@ class Connection(object): # Setting this in case the next statement fails, though # it shouldn't be doing any network operations, yet. self.connection = None - self.connection = kombu.connection.BrokerConnection(**self.params) + self.connection = kombu.connection.BrokerConnection(**params) self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. @@ -450,8 +482,8 @@ class Connection(object): self.channel._new_queue('ae.undeliver') for consumer in self.consumers: consumer.reconnect(self.channel) - LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), - self.params) + LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') % + params) def reconnect(self): """Handles reconnecting and re-establishing queues. @@ -464,11 +496,12 @@ class Connection(object): attempt = 0 while True: + params = self.params_list[attempt % len(self.params_list)] attempt += 1 try: - self._connect() + self._connect(params) return - except (self.connection_errors, IOError), e: + except (IOError, self.connection_errors) as e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib @@ -483,12 +516,12 @@ class Connection(object): log_info = {} log_info['err_str'] = str(e) log_info['max_retries'] = self.max_retries - log_info.update(self.params) + log_info.update(params) if self.max_retries and attempt == self.max_retries: - LOG.exception(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) + LOG.error(_('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) # NOTE(comstud): Copied from original code. There's # really no better recourse because if this was a queue we # need to consume on, we have no way to consume anymore. @@ -502,9 +535,9 @@ class Connection(object): sleep_time = min(sleep_time, self.interval_max) log_info['sleep_time'] = sleep_time - LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' - ' unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.') % log_info) + LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' + 'unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) time.sleep(sleep_time) def ensure(self, error_callback, method, *args, **kwargs): @@ -512,7 +545,8 @@ class Connection(object): try: return method(*args, **kwargs) except (self.connection_errors, socket.timeout, IOError), e: - pass + if error_callback: + error_callback(e) except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib # to return an error not covered by its transport @@ -522,8 +556,8 @@ class Connection(object): # and try to reconnect in this case. if 'timeout' not in str(e): raise - if error_callback: - error_callback(e) + if error_callback: + error_callback(e) self.reconnect() def get_channel(self): diff --git a/openstack-common.conf b/openstack-common.conf index 7014ff9f4..398bfb4ae 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,7 +1,7 @@ [DEFAULT] # The list of modules to copy from openstack-common -modules=cfg,context,excutils,gettextutils,importutils,iniparser,jsonutils,local,log,notifier,plugin,policy,setup,timeutils,rpc +modules=cfg,context,excutils,gettextutils,importutils,iniparser,jsonutils,local,log,network_utils,notifier,plugin,policy,setup,timeutils,rpc # The base module to hold the copy of openstack.common base=nova