Update Oslo

Change-Id: Ia657b83f6d1b9f1fe887c6813834efef04c0cbf2
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2013-06-27 15:41:07 +02:00
parent 132c85e075
commit afba75f61b
8 changed files with 113 additions and 32 deletions

View File

@ -205,6 +205,7 @@ def _print_opt(opt):
opt_name, opt_default, opt_help = opt.dest, opt.default, opt.help
if not opt_help:
sys.stderr.write('WARNING: "%s" is missing help string.\n' % opt_name)
opt_help = ""
opt_type = None
try:
opt_type = OPTION_REGEX.search(str(type(opt))).group(0)

View File

@ -260,8 +260,6 @@ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import timeutils
DEFAULT = 'DEFAULT'
sqlite_db_opts = [
cfg.StrOpt('sqlite_db',
default='ceilometer.sqlite',
@ -278,8 +276,10 @@ database_opts = [
'../', '$sqlite_db')),
help='The SQLAlchemy connection string used to connect to the '
'database',
deprecated_name='sql_connection',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_connection',
group='DATABASE')],
secret=True),
cfg.StrOpt('slave_connection',
default='',
@ -288,56 +288,71 @@ database_opts = [
secret=True),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_name='sql_idle_timeout',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_idle_timeout',
group='DATABASE')],
help='timeout before idle sql connections are reaped'),
cfg.IntOpt('min_pool_size',
default=1,
deprecated_name='sql_min_pool_size',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_min_pool_size',
group='DATABASE')],
help='Minimum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_pool_size',
default=None,
deprecated_name='sql_max_pool_size',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_pool_size',
group='DATABASE')],
help='Maximum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_retries',
default=10,
deprecated_name='sql_max_retries',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_retries',
group='DATABASE')],
help='maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'),
cfg.IntOpt('retry_interval',
default=10,
deprecated_name='sql_retry_interval',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
group='DEFAULT'),
cfg.DeprecatedOpt('reconnect_interval',
group='DATABASE')],
help='interval between retries of opening a sql connection'),
cfg.IntOpt('max_overflow',
default=None,
deprecated_name='sql_max_overflow',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
group='DEFAULT'),
cfg.DeprecatedOpt('sqlalchemy_max_overflow',
group='DATABASE')],
help='If set, use this value for max_overflow with sqlalchemy'),
cfg.IntOpt('connection_debug',
default=0,
deprecated_name='sql_connection_debug',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
group='DEFAULT')],
help='Verbosity of SQL debugging information. 0=None, '
'100=Everything'),
cfg.BoolOpt('connection_trace',
default=False,
deprecated_name='sql_connection_trace',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
group='DEFAULT')],
help='Add python stack traces to SQL as comment strings'),
cfg.IntOpt('pool_timeout',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
group='DATABASE')],
help='If set, use this value for pool_timeout with sqlalchemy'),
]
CONF = cfg.CONF
CONF.register_opts(sqlite_db_opts)
CONF.register_opts(database_opts, 'database')
LOG = logging.getLogger(__name__)
_ENGINE = None

View File

@ -22,6 +22,7 @@ Exception related utilities.
import contextlib
import logging
import sys
import time
import traceback
from ceilometer.openstack.common.gettextutils import _
@ -49,3 +50,33 @@ def save_and_reraise_exception():
traceback.format_exception(type_, value, tb))
raise
raise type_, value, tb
def forever_retry_uncaught_exceptions(infunc):
def inner_func(*args, **kwargs):
last_log_time = 0
last_exc_message = None
exc_count = 0
while True:
try:
return infunc(*args, **kwargs)
except Exception as exc:
if exc.message == last_exc_message:
exc_count += 1
else:
exc_count = 1
# Do not log any more frequently than once a minute unless
# the exception message changes
cur_time = int(time.time())
if (cur_time - last_log_time > 60 or
exc.message != last_exc_message):
logging.exception(
_('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count)
last_log_time = cur_time
last_exc_message = exc.message
exc_count = 0
# This should be a very rare event. In case it isn't, do
# a sleep.
time.sleep(1)
return inner_func

View File

@ -19,10 +19,7 @@
Network-related utilities and helper functions.
"""
from ceilometer.openstack.common import log as logging
LOG = logging.getLogger(__name__)
import urlparse
def parse_host_port(address, default_port=None):
@ -67,3 +64,18 @@ def parse_host_port(address, default_port=None):
port = default_port
return (host, None if port is None else int(port))
def urlsplit(url, scheme='', allow_fragments=True):
"""Parse a URL using urlparse.urlsplit(), splitting query and fragments.
This function papers over Python issue9374 when needed.
The parameters are the same as urlparse.urlsplit.
"""
scheme, netloc, path, query, fragment = urlparse.urlsplit(
url, scheme, allow_fragments)
if allow_fragments and '#' in path:
path, fragment = path.split('#', 1)
if '?' in path:
path, query = path.split('?', 1)
return urlparse.SplitResult(scheme, netloc, path, query, fragment)

View File

@ -221,12 +221,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
try:
msg = {'result': reply, 'failure': failure}
except TypeError:
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
if ending:
msg['ending'] = True
_add_unique_id(msg)

View File

@ -30,6 +30,7 @@ import kombu.entity
import kombu.messaging
from oslo.config import cfg
from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import network_utils
from ceilometer.openstack.common.rpc import amqp as rpc_amqp
@ -748,6 +749,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()

View File

@ -24,6 +24,7 @@ import eventlet
import greenlet
from oslo.config import cfg
from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils
@ -118,10 +119,17 @@ class ConsumerBase(object):
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.reconnect(session)
self.connect(session)
def connect(self, session):
"""Declare the reciever on connect."""
self._declare_receiver(session)
def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect."""
self._declare_receiver(session)
def _declare_receiver(self, session):
self.session = session
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
@ -158,6 +166,9 @@ class ConsumerBase(object):
def get_receiver(self):
return self.receiver
def get_node_name(self):
return self.address.split(';')[0]
class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'."""
@ -207,6 +218,7 @@ class FanoutConsumer(ConsumerBase):
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
"""
self.conf = conf
super(FanoutConsumer, self).__init__(
session, callback,
@ -215,6 +227,18 @@ class FanoutConsumer(ConsumerBase):
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True})
def reconnect(self, session):
topic = self.get_node_name()
params = {
'session': session,
'topic': topic,
'callback': self.callback,
}
self.__init__(conf=self.conf, **params)
super(FanoutConsumer, self).reconnect(session)
class Publisher(object):
"""Base Publisher class."""
@ -576,6 +600,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()

View File

@ -22,5 +22,5 @@ lxml
requests>=1.1,<1.2.1
wsme>=0.5b2
pyyaml
oslo.config>=1.1.0
http://tarballs.openstack.org/oslo.config/oslo.config-1.2.0a2.tar.gz#egg=oslo.config-1.2.0a2
happybase>=0.4