diff --git a/ceilometer/openstack/common/config/generator.py b/ceilometer/openstack/common/config/generator.py index 1f73956f..34d50cee 100755 --- a/ceilometer/openstack/common/config/generator.py +++ b/ceilometer/openstack/common/config/generator.py @@ -205,6 +205,7 @@ def _print_opt(opt): opt_name, opt_default, opt_help = opt.dest, opt.default, opt.help if not opt_help: sys.stderr.write('WARNING: "%s" is missing help string.\n' % opt_name) + opt_help = "" opt_type = None try: opt_type = OPTION_REGEX.search(str(type(opt))).group(0) diff --git a/ceilometer/openstack/common/db/sqlalchemy/session.py b/ceilometer/openstack/common/db/sqlalchemy/session.py index efc460dd..8e4f00a3 100644 --- a/ceilometer/openstack/common/db/sqlalchemy/session.py +++ b/ceilometer/openstack/common/db/sqlalchemy/session.py @@ -260,8 +260,6 @@ from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log as logging from ceilometer.openstack.common import timeutils -DEFAULT = 'DEFAULT' - sqlite_db_opts = [ cfg.StrOpt('sqlite_db', default='ceilometer.sqlite', @@ -278,8 +276,10 @@ database_opts = [ '../', '$sqlite_db')), help='The SQLAlchemy connection string used to connect to the ' 'database', - deprecated_name='sql_connection', - deprecated_group=DEFAULT, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_connection', + group='DATABASE')], secret=True), cfg.StrOpt('slave_connection', default='', @@ -288,56 +288,71 @@ database_opts = [ secret=True), cfg.IntOpt('idle_timeout', default=3600, - deprecated_name='sql_idle_timeout', - deprecated_group=DEFAULT, + deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_idle_timeout', + group='DATABASE')], help='timeout before idle sql connections are reaped'), cfg.IntOpt('min_pool_size', default=1, - deprecated_name='sql_min_pool_size', - deprecated_group=DEFAULT, + deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_min_pool_size', + group='DATABASE')], help='Minimum number of SQL connections to keep open in a ' 'pool'), cfg.IntOpt('max_pool_size', default=None, - deprecated_name='sql_max_pool_size', - deprecated_group=DEFAULT, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_max_pool_size', + group='DATABASE')], help='Maximum number of SQL connections to keep open in a ' 'pool'), cfg.IntOpt('max_retries', default=10, - deprecated_name='sql_max_retries', - deprecated_group=DEFAULT, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_max_retries', + group='DATABASE')], help='maximum db connection retries during startup. ' '(setting -1 implies an infinite retry count)'), cfg.IntOpt('retry_interval', default=10, - deprecated_name='sql_retry_interval', - deprecated_group=DEFAULT, + deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval', + group='DEFAULT'), + cfg.DeprecatedOpt('reconnect_interval', + group='DATABASE')], help='interval between retries of opening a sql connection'), cfg.IntOpt('max_overflow', default=None, - deprecated_name='sql_max_overflow', - deprecated_group=DEFAULT, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow', + group='DEFAULT'), + cfg.DeprecatedOpt('sqlalchemy_max_overflow', + group='DATABASE')], help='If set, use this value for max_overflow with sqlalchemy'), cfg.IntOpt('connection_debug', default=0, - deprecated_name='sql_connection_debug', - deprecated_group=DEFAULT, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug', + group='DEFAULT')], help='Verbosity of SQL debugging information. 0=None, ' '100=Everything'), cfg.BoolOpt('connection_trace', default=False, - deprecated_name='sql_connection_trace', - deprecated_group=DEFAULT, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace', + group='DEFAULT')], help='Add python stack traces to SQL as comment strings'), cfg.IntOpt('pool_timeout', default=None, + deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout', + group='DATABASE')], help='If set, use this value for pool_timeout with sqlalchemy'), ] CONF = cfg.CONF CONF.register_opts(sqlite_db_opts) CONF.register_opts(database_opts, 'database') + LOG = logging.getLogger(__name__) _ENGINE = None diff --git a/ceilometer/openstack/common/excutils.py b/ceilometer/openstack/common/excutils.py index fce39595..24a65c20 100644 --- a/ceilometer/openstack/common/excutils.py +++ b/ceilometer/openstack/common/excutils.py @@ -22,6 +22,7 @@ Exception related utilities. import contextlib import logging import sys +import time import traceback from ceilometer.openstack.common.gettextutils import _ @@ -49,3 +50,33 @@ def save_and_reraise_exception(): traceback.format_exception(type_, value, tb)) raise raise type_, value, tb + + +def forever_retry_uncaught_exceptions(infunc): + def inner_func(*args, **kwargs): + last_log_time = 0 + last_exc_message = None + exc_count = 0 + while True: + try: + return infunc(*args, **kwargs) + except Exception as exc: + if exc.message == last_exc_message: + exc_count += 1 + else: + exc_count = 1 + # Do not log any more frequently than once a minute unless + # the exception message changes + cur_time = int(time.time()) + if (cur_time - last_log_time > 60 or + exc.message != last_exc_message): + logging.exception( + _('Unexpected exception occurred %d time(s)... ' + 'retrying.') % exc_count) + last_log_time = cur_time + last_exc_message = exc.message + exc_count = 0 + # This should be a very rare event. In case it isn't, do + # a sleep. + time.sleep(1) + return inner_func diff --git a/ceilometer/openstack/common/network_utils.py b/ceilometer/openstack/common/network_utils.py index afefd17d..dbed1ceb 100644 --- a/ceilometer/openstack/common/network_utils.py +++ b/ceilometer/openstack/common/network_utils.py @@ -19,10 +19,7 @@ Network-related utilities and helper functions. """ -from ceilometer.openstack.common import log as logging - - -LOG = logging.getLogger(__name__) +import urlparse def parse_host_port(address, default_port=None): @@ -67,3 +64,18 @@ def parse_host_port(address, default_port=None): port = default_port return (host, None if port is None else int(port)) + + +def urlsplit(url, scheme='', allow_fragments=True): + """Parse a URL using urlparse.urlsplit(), splitting query and fragments. + This function papers over Python issue9374 when needed. + + The parameters are the same as urlparse.urlsplit. + """ + scheme, netloc, path, query, fragment = urlparse.urlsplit( + url, scheme, allow_fragments) + if allow_fragments and '#' in path: + path, fragment = path.split('#', 1) + if '?' in path: + path, query = path.split('?', 1) + return urlparse.SplitResult(scheme, netloc, path, query, fragment) diff --git a/ceilometer/openstack/common/rpc/amqp.py b/ceilometer/openstack/common/rpc/amqp.py index e9702c53..4b746b94 100644 --- a/ceilometer/openstack/common/rpc/amqp.py +++ b/ceilometer/openstack/common/rpc/amqp.py @@ -221,12 +221,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, failure = rpc_common.serialize_remote_exception(failure, log_failure) - try: - msg = {'result': reply, 'failure': failure} - except TypeError: - msg = {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure} + msg = {'result': reply, 'failure': failure} if ending: msg['ending'] = True _add_unique_id(msg) diff --git a/ceilometer/openstack/common/rpc/impl_kombu.py b/ceilometer/openstack/common/rpc/impl_kombu.py index 9e7e04e3..d9209ef2 100644 --- a/ceilometer/openstack/common/rpc/impl_kombu.py +++ b/ceilometer/openstack/common/rpc/impl_kombu.py @@ -30,6 +30,7 @@ import kombu.entity import kombu.messaging from oslo.config import cfg +from ceilometer.openstack.common import excutils from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import network_utils from ceilometer.openstack.common.rpc import amqp as rpc_amqp @@ -748,6 +749,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py index 24ecbe6a..7a1fd4a2 100644 --- a/ceilometer/openstack/common/rpc/impl_qpid.py +++ b/ceilometer/openstack/common/rpc/impl_qpid.py @@ -24,6 +24,7 @@ import eventlet import greenlet from oslo.config import cfg +from ceilometer.openstack.common import excutils from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import importutils from ceilometer.openstack.common import jsonutils @@ -118,10 +119,17 @@ class ConsumerBase(object): self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) - self.reconnect(session) + self.connect(session) + + def connect(self, session): + """Declare the reciever on connect.""" + self._declare_receiver(session) def reconnect(self, session): """Re-declare the receiver after a qpid reconnect.""" + self._declare_receiver(session) + + def _declare_receiver(self, session): self.session = session self.receiver = session.receiver(self.address) self.receiver.capacity = 1 @@ -158,6 +166,9 @@ class ConsumerBase(object): def get_receiver(self): return self.receiver + def get_node_name(self): + return self.address.split(';')[0] + class DirectConsumer(ConsumerBase): """Queue/consumer class for 'direct'.""" @@ -207,6 +218,7 @@ class FanoutConsumer(ConsumerBase): 'topic' is the topic to listen on 'callback' is the callback to call when messages are received """ + self.conf = conf super(FanoutConsumer, self).__init__( session, callback, @@ -215,6 +227,18 @@ class FanoutConsumer(ConsumerBase): "%s_fanout_%s" % (topic, uuid.uuid4().hex), {"exclusive": True}) + def reconnect(self, session): + topic = self.get_node_name() + params = { + 'session': session, + 'topic': topic, + 'callback': self.callback, + } + + self.__init__(conf=self.conf, **params) + + super(FanoutConsumer, self).reconnect(session) + class Publisher(object): """Base Publisher class.""" @@ -576,6 +600,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() diff --git a/requirements.txt b/requirements.txt index e94746de..3cc449e3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,5 +22,5 @@ lxml requests>=1.1,<1.2.1 wsme>=0.5b2 pyyaml -oslo.config>=1.1.0 +http://tarballs.openstack.org/oslo.config/oslo.config-1.2.0a2.tar.gz#egg=oslo.config-1.2.0a2 happybase>=0.4