Update latest openstack-common code

This fixes bug 1073999 (quantum/openstack/common/rpc/impl_qpid.py)

In addition to this the common code is updated.

Change-Id: I41223963baf34772edcd0d6d7ef5686a5fad1035
This commit is contained in:
Gary Kotton 2012-11-08 21:16:01 +00:00
parent 9fb4563e7d
commit 8eb8327151
10 changed files with 52 additions and 81 deletions

View File

@ -236,7 +236,7 @@ log files:
This module also contains a global instance of the CommonConfigOpts class This module also contains a global instance of the CommonConfigOpts class
in order to support a common usage pattern in OpenStack: in order to support a common usage pattern in OpenStack:
from openstack.common import cfg from quantum.openstack.common import cfg
opts = [ opts = [
cfg.StrOpt('bind_host', default='0.0.0.0'), cfg.StrOpt('bind_host', default='0.0.0.0'),

View File

@ -22,18 +22,6 @@ Exceptions common to OpenStack projects
import logging import logging
class ProcessExecutionError(IOError):
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
description=None):
if description is None:
description = "Unexpected error while running command."
if exit_code is None:
exit_code = '-'
message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % (
description, cmd, exit_code, stdout, stderr)
IOError.__init__(self, message)
class Error(Exception): class Error(Exception):
def __init__(self, message=None): def __init__(self, message=None):
super(Error, self).__init__(message) super(Error, self).__init__(message)

View File

@ -20,7 +20,7 @@ gettext for openstack-common modules.
Usual usage in an openstack.common module: Usual usage in an openstack.common module:
from openstack.common.gettextutils import _ from quantum.openstack.common.gettextutils import _
""" """
import gettext import gettext

View File

@ -257,7 +257,7 @@ class JSONFormatter(logging.Formatter):
class PublishErrorsHandler(logging.Handler): class PublishErrorsHandler(logging.Handler):
def emit(self, record): def emit(self, record):
if ('openstack.common.notifier.log_notifier' in if ('quantum.openstack.common.notifier.log_notifier' in
CONF.notification_driver): CONF.notification_driver):
return return
notifier.api.notify(None, 'error.publisher', notifier.api.notify(None, 'error.publisher',

View File

@ -250,7 +250,7 @@ def queue_get_for(context, topic, host):
Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
<host>. <host>.
""" """
return '%s.%s' % (topic, host) return '%s.%s' % (topic, host) if host else topic
_RPCIMPL = None _RPCIMPL = None

View File

@ -267,6 +267,7 @@ class FanoutConsumer(ConsumerBase):
# Default options # Default options
options = {'durable': False, options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': True}
options.update(kwargs) options.update(kwargs)
@ -408,18 +409,18 @@ class Connection(object):
hostname, port = network_utils.parse_host_port( hostname, port = network_utils.parse_host_port(
adr, default_port=self.conf.rabbit_port) adr, default_port=self.conf.rabbit_port)
params = {} params = {
'hostname': hostname,
'port': port,
'userid': self.conf.rabbit_userid,
'password': self.conf.rabbit_password,
'virtual_host': self.conf.rabbit_virtual_host,
}
for sp_key, value in server_params.iteritems(): for sp_key, value in server_params.iteritems():
p_key = server_params_to_kombu_params.get(sp_key, sp_key) p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value params[p_key] = value
params.setdefault('hostname', hostname)
params.setdefault('port', port)
params.setdefault('userid', self.conf.rabbit_userid)
params.setdefault('password', self.conf.rabbit_password)
params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
if self.conf.fake_rabbit: if self.conf.fake_rabbit:
params['transport'] = 'memory' params['transport'] = 'memory'
if self.conf.rabbit_use_ssl: if self.conf.rabbit_use_ssl:
@ -776,7 +777,7 @@ def cast_to_server(conf, context, server_params, topic, msg):
def fanout_cast_to_server(conf, context, server_params, topic, msg): def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server.""" """Sends a message on a fanout exchange to a specific server."""
return rpc_amqp.cast_to_server( return rpc_amqp.fanout_cast_to_server(
conf, context, server_params, topic, msg, conf, context, server_params, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection))

View File

@ -50,24 +50,6 @@ qpid_opts = [
cfg.StrOpt('qpid_sasl_mechanisms', cfg.StrOpt('qpid_sasl_mechanisms',
default='', default='',
help='Space separated list of SASL mechanisms to use for auth'), help='Space separated list of SASL mechanisms to use for auth'),
cfg.BoolOpt('qpid_reconnect',
default=True,
help='Automatically reconnect'),
cfg.IntOpt('qpid_reconnect_timeout',
default=0,
help='Reconnection timeout in seconds'),
cfg.IntOpt('qpid_reconnect_limit',
default=0,
help='Max reconnections before giving up'),
cfg.IntOpt('qpid_reconnect_interval_min',
default=0,
help='Minimum seconds between reconnection attempts'),
cfg.IntOpt('qpid_reconnect_interval_max',
default=0,
help='Maximum seconds between reconnection attempts'),
cfg.IntOpt('qpid_reconnect_interval',
default=0,
help='Equivalent to setting max and min to the same value'),
cfg.IntOpt('qpid_heartbeat', cfg.IntOpt('qpid_heartbeat',
default=60, default=60,
help='Seconds between connection keepalive heartbeats'), help='Seconds between connection keepalive heartbeats'),
@ -294,50 +276,36 @@ class Connection(object):
self.consumer_thread = None self.consumer_thread = None
self.conf = conf self.conf = conf
if server_params is None: params = {
server_params = {} 'hostname': self.conf.qpid_hostname,
'port': self.conf.qpid_port,
default_params = dict(hostname=self.conf.qpid_hostname, 'username': self.conf.qpid_username,
port=self.conf.qpid_port, 'password': self.conf.qpid_password,
username=self.conf.qpid_username, }
password=self.conf.qpid_password) params.update(server_params or {})
params = server_params
for key in default_params.keys():
params.setdefault(key, default_params[key])
self.broker = params['hostname'] + ":" + str(params['port']) self.broker = params['hostname'] + ":" + str(params['port'])
self.username = params['username']
self.password = params['password']
self.connection_create()
self.reconnect()
def connection_create(self):
# Create the connection - this does not open the connection # Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(self.broker) self.connection = qpid.messaging.Connection(self.broker)
# Check if flags are set and if so set them for the connection # Check if flags are set and if so set them for the connection
# before we call open # before we call open
self.connection.username = params['username'] self.connection.username = self.username
self.connection.password = params['password'] self.connection.password = self.password
self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
self.connection.reconnect = self.conf.qpid_reconnect # Reconnection is done by self.reconnect()
if self.conf.qpid_reconnect_timeout: self.connection.reconnect = False
self.connection.reconnect_timeout = (
self.conf.qpid_reconnect_timeout)
if self.conf.qpid_reconnect_limit:
self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
if self.conf.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
self.conf.qpid_reconnect_interval_max)
if self.conf.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
self.conf.qpid_reconnect_interval_min)
if self.conf.qpid_reconnect_interval:
self.connection.reconnect_interval = (
self.conf.qpid_reconnect_interval)
self.connection.heartbeat = self.conf.qpid_heartbeat self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
# Open is part of reconnect -
# NOTE(WGH) not sure we need this with the reconnect flags
self.reconnect()
def _register_consumer(self, consumer): def _register_consumer(self, consumer):
self.consumers[str(consumer.get_receiver())] = consumer self.consumers[str(consumer.get_receiver())] = consumer
@ -352,12 +320,18 @@ class Connection(object):
except qpid.messaging.exceptions.ConnectionError: except qpid.messaging.exceptions.ConnectionError:
pass pass
delay = 1
while True: while True:
try: try:
self.connection_create()
self.connection.open() self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e: except qpid.messaging.exceptions.ConnectionError, e:
LOG.error(_('Unable to connect to AMQP server: %s'), e) msg_dict = dict(e=e, delay=delay)
time.sleep(self.conf.qpid_reconnect_interval or 1) msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
LOG.error(msg)
time.sleep(delay)
delay = min(2 * delay, 60)
else: else:
break break
@ -365,10 +339,14 @@ class Connection(object):
self.session = self.connection.session() self.session = self.connection.session()
for consumer in self.consumers.itervalues():
consumer.reconnect(self.session)
if self.consumers: if self.consumers:
consumers = self.consumers
self.consumers = {}
for consumer in consumers.itervalues():
consumer.reconnect(self.session)
self._register_consumer(consumer)
LOG.debug(_("Re-established AMQP queues")) LOG.debug(_("Re-established AMQP queues"))
def ensure(self, error_callback, method, *args, **kwargs): def ensure(self, error_callback, method, *args, **kwargs):

View File

@ -546,7 +546,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
timeout = timeout or CONF.rpc_response_timeout timeout = timeout or CONF.rpc_response_timeout
# The msg_id is used to track replies. # The msg_id is used to track replies.
msg_id = str(uuid.uuid4().hex) msg_id = uuid.uuid4().hex
# Replies always come into the reply service. # Replies always come into the reply service.
reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host

View File

@ -47,7 +47,7 @@ class Thread(object):
self.thread.link(_thread_done, group=group, thread=self) self.thread.link(_thread_done, group=group, thread=self)
def stop(self): def stop(self):
self.thread.cancel() self.thread.kill()
def wait(self): def wait(self):
return self.thread.wait() return self.thread.wait()

View File

@ -22,6 +22,10 @@ UUID related utilities and helper functions.
import uuid import uuid
def generate_uuid():
return str(uuid.uuid4())
def is_uuid_like(val): def is_uuid_like(val):
"""Returns validation of a value as a UUID. """Returns validation of a value as a UUID.