diff --git a/ceilometer/openstack/common/config/__init__.py b/ceilometer/openstack/common/config/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/conf/extract_opts.py b/ceilometer/openstack/common/config/generator.py similarity index 93% rename from tools/conf/extract_opts.py rename to ceilometer/openstack/common/config/generator.py index 1ad162bf7e..1ef56dd287 100755 --- a/tools/conf/extract_opts.py +++ b/ceilometer/openstack/common/config/generator.py @@ -18,10 +18,8 @@ # # @author: Zhongyue Luo, SINA Corporation. # - """Extracts OpenStack config option info from module(s).""" -import gettext import imp import os import re @@ -31,9 +29,10 @@ import textwrap from oslo.config import cfg +from ceilometer.openstack.common import gettextutils from ceilometer.openstack.common import importutils -gettext.install('ceilometer', unicode=1) +gettextutils.install('ceilometer') STROPT = "StrOpt" BOOLOPT = "BoolOpt" @@ -61,7 +60,7 @@ BASEDIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) WORDWRAP_WIDTH = 60 -def main(srcfiles): +def generate(srcfiles): mods_by_pkg = dict() for filepath in srcfiles: pkg_name = filepath.split(os.sep)[1] @@ -107,16 +106,14 @@ def _import_module(mod_str): return sys.modules[mod_str[4:]] else: return importutils.import_module(mod_str) - except (ValueError, AttributeError), err: - return None - except ImportError, ie: + except ImportError as ie: sys.stderr.write("%s\n" % str(ie)) return None - except Exception, e: + except Exception: return None -def _is_in_group (opt, group): +def _is_in_group(opt, group): "Check if opt is in group." for key, value in group._opts.items(): if value['opt'] == opt: @@ -135,7 +132,11 @@ def _guess_groups(opt, mod_obj): if _is_in_group(opt, value._group): return value._group.name - raise RuntimeError("Unable to find group for option %s" % opt.name) + raise RuntimeError( + "Unable to find group for option %s, " + "maybe it's defined twice in the same group?" + % opt.name + ) def _list_opts(obj): @@ -193,7 +194,7 @@ def _sanitize_default(s): elif s == _get_my_ip(): return '10.0.0.1' elif s == socket.getfqdn(): - return 'nova' + return 'ceilometer' elif s.strip() != s: return '"%s"' % s return s @@ -242,8 +243,11 @@ def _print_opt(opt): sys.exit(1) -if __name__ == '__main__': +def main(): if len(sys.argv) < 2: - print "usage: python %s [srcfile]...\n" % sys.argv[0] + print "usage: %s [srcfile]...\n" % sys.argv[0] sys.exit(0) - main(sys.argv[1:]) + generate(sys.argv[1:]) + +if __name__ == '__main__': + main() diff --git a/ceilometer/openstack/common/db/api.py b/ceilometer/openstack/common/db/api.py index c3f1e2c6f3..e39e18bd0b 100644 --- a/ceilometer/openstack/common/db/api.py +++ b/ceilometer/openstack/common/db/api.py @@ -19,8 +19,9 @@ Supported configuration options: -`db_backend`: DB backend name or full module path to DB backend module. -`dbapi_use_tpool`: Enable thread pooling of DB API calls. +The following two parameters are in the 'database' group: +`backend`: DB backend name or full module path to DB backend module. +`use_tpool`: Enable thread pooling of DB API calls. A DB backend module should implement a method named 'get_backend' which takes no arguments. The method can return any object that implements DB @@ -44,17 +45,21 @@ from ceilometer.openstack.common import lockutils db_opts = [ - cfg.StrOpt('db_backend', + cfg.StrOpt('backend', default='sqlalchemy', + deprecated_name='db_backend', + deprecated_group='DEFAULT', help='The backend to use for db'), - cfg.BoolOpt('dbapi_use_tpool', + cfg.BoolOpt('use_tpool', default=False, + deprecated_name='dbapi_use_tpool', + deprecated_group='DEFAULT', help='Enable the experimental use of thread pooling for ' 'all DB API calls') ] CONF = cfg.CONF -CONF.register_opts(db_opts) +CONF.register_opts(db_opts, 'database') class DBAPI(object): @@ -75,8 +80,8 @@ class DBAPI(object): if self.__backend: # Another thread assigned it return self.__backend - backend_name = CONF.db_backend - self.__use_tpool = CONF.dbapi_use_tpool + backend_name = CONF.database.backend + self.__use_tpool = CONF.database.use_tpool if self.__use_tpool: from eventlet import tpool self.__tpool = tpool diff --git a/ceilometer/openstack/common/db/sqlalchemy/models.py b/ceilometer/openstack/common/db/sqlalchemy/models.py index 8e3d333fe4..e98fa14bac 100644 --- a/ceilometer/openstack/common/db/sqlalchemy/models.py +++ b/ceilometer/openstack/common/db/sqlalchemy/models.py @@ -33,9 +33,6 @@ from ceilometer.openstack.common import timeutils class ModelBase(object): """Base class for models.""" __table_initialized__ = False - created_at = Column(DateTime, default=timeutils.utcnow) - updated_at = Column(DateTime, onupdate=timeutils.utcnow) - metadata = None def save(self, session=None): """Save this object.""" @@ -92,6 +89,11 @@ class ModelBase(object): return local.iteritems() +class TimestampMixin(object): + created_at = Column(DateTime, default=timeutils.utcnow) + updated_at = Column(DateTime, onupdate=timeutils.utcnow) + + class SoftDeleteMixin(object): deleted_at = Column(DateTime) deleted = Column(Integer, default=0) diff --git a/ceilometer/openstack/common/db/sqlalchemy/session.py b/ceilometer/openstack/common/db/sqlalchemy/session.py index 60e784aca7..e3a7812166 100644 --- a/ceilometer/openstack/common/db/sqlalchemy/session.py +++ b/ceilometer/openstack/common/db/sqlalchemy/session.py @@ -247,8 +247,10 @@ import time from eventlet import greenthread from oslo.config import cfg +import six from sqlalchemy import exc as sqla_exc import sqlalchemy.interfaces +from sqlalchemy.interfaces import PoolListener import sqlalchemy.orm from sqlalchemy.pool import NullPool, StaticPool from sqlalchemy.sql.expression import literal_column @@ -258,53 +260,76 @@ from ceilometer.openstack.common import log as logging from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import timeutils +DEFAULT = 'DEFAULT' -sql_opts = [ - cfg.StrOpt('sql_connection', +sqlite_db_opts = [ + cfg.StrOpt('sqlite_db', + default='ceilometer.sqlite', + help='the filename to use with sqlite'), + cfg.BoolOpt('sqlite_synchronous', + default=True, + help='If true, use synchronous mode for sqlite'), +] + +database_opts = [ + cfg.StrOpt('connection', default='sqlite:///' + os.path.abspath(os.path.join(os.path.dirname(__file__), '../', '$sqlite_db')), help='The SQLAlchemy connection string used to connect to the ' 'database', + deprecated_name='sql_connection', + deprecated_group=DEFAULT, secret=True), - cfg.StrOpt('sqlite_db', - default='ceilometer.sqlite', - help='the filename to use with sqlite'), - cfg.IntOpt('sql_idle_timeout', + cfg.IntOpt('idle_timeout', default=3600, + deprecated_name='sql_idle_timeout', + deprecated_group=DEFAULT, help='timeout before idle sql connections are reaped'), - cfg.BoolOpt('sqlite_synchronous', - default=True, - help='If passed, use synchronous mode for sqlite'), - cfg.IntOpt('sql_min_pool_size', + cfg.IntOpt('min_pool_size', default=1, + deprecated_name='sql_min_pool_size', + deprecated_group=DEFAULT, help='Minimum number of SQL connections to keep open in a ' 'pool'), - cfg.IntOpt('sql_max_pool_size', + cfg.IntOpt('max_pool_size', default=5, + deprecated_name='sql_max_pool_size', + deprecated_group=DEFAULT, help='Maximum number of SQL connections to keep open in a ' 'pool'), - cfg.IntOpt('sql_max_retries', + cfg.IntOpt('max_retries', default=10, + deprecated_name='sql_max_retries', + deprecated_group=DEFAULT, help='maximum db connection retries during startup. ' '(setting -1 implies an infinite retry count)'), - cfg.IntOpt('sql_retry_interval', + cfg.IntOpt('retry_interval', default=10, + deprecated_name='sql_retry_interval', + deprecated_group=DEFAULT, help='interval between retries of opening a sql connection'), - cfg.IntOpt('sql_max_overflow', + cfg.IntOpt('max_overflow', default=None, + deprecated_name='sql_max_overflow', + deprecated_group=DEFAULT, help='If set, use this value for max_overflow with sqlalchemy'), - cfg.IntOpt('sql_connection_debug', + cfg.IntOpt('connection_debug', default=0, + deprecated_name='sql_connection_debug', + deprecated_group=DEFAULT, help='Verbosity of SQL debugging information. 0=None, ' '100=Everything'), - cfg.BoolOpt('sql_connection_trace', + cfg.BoolOpt('connection_trace', default=False, + deprecated_name='sql_connection_trace', + deprecated_group=DEFAULT, help='Add python stack traces to SQL as comment strings'), ] CONF = cfg.CONF -CONF.register_opts(sql_opts) +CONF.register_opts(sqlite_db_opts) +CONF.register_opts(database_opts, 'database') LOG = logging.getLogger(__name__) _ENGINE = None @@ -313,17 +338,42 @@ _MAKER = None def set_defaults(sql_connection, sqlite_db): """Set defaults for configuration variables.""" - cfg.set_defaults(sql_opts, - sql_connection=sql_connection, + cfg.set_defaults(database_opts, + connection=sql_connection) + cfg.set_defaults(sqlite_db_opts, sqlite_db=sqlite_db) -def get_session(autocommit=True, expire_on_commit=False): +def cleanup(): + global _ENGINE, _MAKER + + if _MAKER: + _MAKER.close_all() + _MAKER = None + if _ENGINE: + _ENGINE.dispose() + _ENGINE = None + + +class SqliteForeignKeysListener(PoolListener): + """ + Ensures that the foreign key constraints are enforced in SQLite. + + The foreign key constraints are disabled by default in SQLite, + so the foreign key constraints will be enabled here for every + database connection + """ + def connect(self, dbapi_con, con_record): + dbapi_con.execute('pragma foreign_keys=ON') + + +def get_session(autocommit=True, expire_on_commit=False, + sqlite_fk=False): """Return a SQLAlchemy session.""" global _MAKER if _MAKER is None: - engine = get_engine() + engine = get_engine(sqlite_fk=sqlite_fk) _MAKER = get_maker(engine, autocommit, expire_on_commit) session = _MAKER() @@ -355,21 +405,22 @@ _DUP_KEY_RE_DB = { } -def raise_if_duplicate_entry_error(integrity_error, engine_name): +def _raise_if_duplicate_entry_error(integrity_error, engine_name): """ In this function will be raised DBDuplicateEntry exception if integrity error wrap unique constraint violation. """ def get_columns_from_uniq_cons_or_name(columns): - # note(boris-42): UniqueConstraint name convention: "uniq_c1_x_c2_x_c3" - # means that columns c1, c2, c3 are in UniqueConstraint. + # note(vsergeyev): UniqueConstraint name convention: "uniq_t$c1$c2" + # where `t` it is table name and columns `c1`, `c2` + # are in UniqueConstraint. uniqbase = "uniq_" if not columns.startswith(uniqbase): if engine_name == "postgresql": return [columns[columns.index("_") + 1:columns.rindex("_")]] return [columns] - return columns[len(uniqbase):].split("_x_") + return columns[len(uniqbase):].split("$")[1:] if engine_name not in ["mysql", "sqlite", "postgresql"]: return @@ -397,7 +448,7 @@ _DEADLOCK_RE_DB = { } -def raise_if_deadlock_error(operational_error, engine_name): +def _raise_if_deadlock_error(operational_error, engine_name): """ Raise DBDeadlock exception if OperationalError contains a Deadlock condition. @@ -411,7 +462,7 @@ def raise_if_deadlock_error(operational_error, engine_name): raise exception.DBDeadlock(operational_error) -def wrap_db_error(f): +def _wrap_db_error(f): def _wrap(*args, **kwargs): try: return f(*args, **kwargs) @@ -420,49 +471,50 @@ def wrap_db_error(f): # note(boris-42): We should catch unique constraint violation and # wrap it by our own DBDuplicateEntry exception. Unique constraint # violation is wrapped by IntegrityError. - except sqla_exc.OperationalError, e: - raise_if_deadlock_error(e, get_engine().name) + except sqla_exc.OperationalError as e: + _raise_if_deadlock_error(e, get_engine().name) # NOTE(comstud): A lot of code is checking for OperationalError # so let's not wrap it for now. raise - except sqla_exc.IntegrityError, e: + except sqla_exc.IntegrityError as e: # note(boris-42): SqlAlchemy doesn't unify errors from different # DBs so we must do this. Also in some tables (for example # instance_types) there are more than one unique constraint. This # means we should get names of columns, which values violate # unique constraint, from error message. - raise_if_duplicate_entry_error(e, get_engine().name) + _raise_if_duplicate_entry_error(e, get_engine().name) raise exception.DBError(e) - except Exception, e: + except Exception as e: LOG.exception(_('DB exception wrapped.')) raise exception.DBError(e) _wrap.func_name = f.func_name return _wrap -def get_engine(): +def get_engine(sqlite_fk=False): """Return a SQLAlchemy engine.""" global _ENGINE if _ENGINE is None: - _ENGINE = create_engine(CONF.sql_connection) + _ENGINE = create_engine(CONF.database.connection, + sqlite_fk=sqlite_fk) return _ENGINE -def synchronous_switch_listener(dbapi_conn, connection_rec): +def _synchronous_switch_listener(dbapi_conn, connection_rec): """Switch sqlite connections to non-synchronous mode.""" dbapi_conn.execute("PRAGMA synchronous = OFF") -def add_regexp_listener(dbapi_con, con_record): +def _add_regexp_listener(dbapi_con, con_record): """Add REGEXP function to sqlite connections.""" def regexp(expr, item): reg = re.compile(expr) - return reg.search(unicode(item)) is not None + return reg.search(six.text_type(item)) is not None dbapi_con.create_function('regexp', 2, regexp) -def greenthread_yield(dbapi_con, con_record): +def _greenthread_yield(dbapi_con, con_record): """ Ensure other greenthreads get a chance to execute by forcing a context switch. With common database backends (eg MySQLdb and sqlite), there is @@ -472,7 +524,7 @@ def greenthread_yield(dbapi_con, con_record): greenthread.sleep(0) -def ping_listener(dbapi_conn, connection_rec, connection_proxy): +def _ping_listener(dbapi_conn, connection_rec, connection_proxy): """ Ensures that MySQL connections checked out of the pool are alive. @@ -482,7 +534,7 @@ def ping_listener(dbapi_conn, connection_rec, connection_proxy): """ try: dbapi_conn.cursor().execute('select 1') - except dbapi_conn.OperationalError, ex: + except dbapi_conn.OperationalError as ex: if ex.args[0] in (2006, 2013, 2014, 2045, 2055): LOG.warn(_('Got mysql server has gone away: %s'), ex) raise sqla_exc.DisconnectionError("Database server went away") @@ -490,7 +542,7 @@ def ping_listener(dbapi_conn, connection_rec, connection_proxy): raise -def is_db_connection_error(args): +def _is_db_connection_error(args): """Return True if error in connecting to db.""" # NOTE(adam_g): This is currently MySQL specific and needs to be extended # to support Postgres and others. @@ -501,56 +553,58 @@ def is_db_connection_error(args): return False -def create_engine(sql_connection): +def create_engine(sql_connection, sqlite_fk=False): """Return a new SQLAlchemy engine.""" connection_dict = sqlalchemy.engine.url.make_url(sql_connection) engine_args = { - "pool_recycle": CONF.sql_idle_timeout, + "pool_recycle": CONF.database.idle_timeout, "echo": False, 'convert_unicode': True, } # Map our SQL debug level to SQLAlchemy's options - if CONF.sql_connection_debug >= 100: + if CONF.database.connection_debug >= 100: engine_args['echo'] = 'debug' - elif CONF.sql_connection_debug >= 50: + elif CONF.database.connection_debug >= 50: engine_args['echo'] = True if "sqlite" in connection_dict.drivername: + if sqlite_fk: + engine_args["listeners"] = [SqliteForeignKeysListener()] engine_args["poolclass"] = NullPool - if CONF.sql_connection == "sqlite://": + if CONF.database.connection == "sqlite://": engine_args["poolclass"] = StaticPool engine_args["connect_args"] = {'check_same_thread': False} else: - engine_args['pool_size'] = CONF.sql_max_pool_size - if CONF.sql_max_overflow is not None: - engine_args['max_overflow'] = CONF.sql_max_overflow + engine_args['pool_size'] = CONF.database.max_pool_size + if CONF.database.max_overflow is not None: + engine_args['max_overflow'] = CONF.database.max_overflow engine = sqlalchemy.create_engine(sql_connection, **engine_args) - sqlalchemy.event.listen(engine, 'checkin', greenthread_yield) + sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield) if 'mysql' in connection_dict.drivername: - sqlalchemy.event.listen(engine, 'checkout', ping_listener) + sqlalchemy.event.listen(engine, 'checkout', _ping_listener) elif 'sqlite' in connection_dict.drivername: if not CONF.sqlite_synchronous: sqlalchemy.event.listen(engine, 'connect', - synchronous_switch_listener) - sqlalchemy.event.listen(engine, 'connect', add_regexp_listener) + _synchronous_switch_listener) + sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener) - if (CONF.sql_connection_trace and + if (CONF.database.connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb'): - patch_mysqldb_with_stacktrace_comments() + _patch_mysqldb_with_stacktrace_comments() try: engine.connect() - except sqla_exc.OperationalError, e: - if not is_db_connection_error(e.args[0]): + except sqla_exc.OperationalError as e: + if not _is_db_connection_error(e.args[0]): raise - remaining = CONF.sql_max_retries + remaining = CONF.database.max_retries if remaining == -1: remaining = 'infinite' while True: @@ -558,13 +612,13 @@ def create_engine(sql_connection): LOG.warn(msg % remaining) if remaining != 'infinite': remaining -= 1 - time.sleep(CONF.sql_retry_interval) + time.sleep(CONF.database.retry_interval) try: engine.connect() break - except sqla_exc.OperationalError, e: + except sqla_exc.OperationalError as e: if (remaining != 'infinite' and remaining == 0) or \ - not is_db_connection_error(e.args[0]): + not _is_db_connection_error(e.args[0]): raise return engine @@ -580,15 +634,15 @@ class Query(sqlalchemy.orm.query.Query): class Session(sqlalchemy.orm.session.Session): """Custom Session class to avoid SqlAlchemy Session monkey patching.""" - @wrap_db_error + @_wrap_db_error def query(self, *args, **kwargs): return super(Session, self).query(*args, **kwargs) - @wrap_db_error + @_wrap_db_error def flush(self, *args, **kwargs): return super(Session, self).flush(*args, **kwargs) - @wrap_db_error + @_wrap_db_error def execute(self, *args, **kwargs): return super(Session, self).execute(*args, **kwargs) @@ -602,7 +656,7 @@ def get_maker(engine, autocommit=True, expire_on_commit=False): query_cls=Query) -def patch_mysqldb_with_stacktrace_comments(): +def _patch_mysqldb_with_stacktrace_comments(): """Adds current stack trace as a comment in queries by patching MySQLdb.cursors.BaseCursor._do_query. """ diff --git a/ceilometer/openstack/common/db/sqlalchemy/utils.py b/ceilometer/openstack/common/db/sqlalchemy/utils.py index 048c009066..b3762ad0b3 100644 --- a/ceilometer/openstack/common/db/sqlalchemy/utils.py +++ b/ceilometer/openstack/common/db/sqlalchemy/utils.py @@ -105,9 +105,9 @@ def paginate_query(query, model, limit, sort_keys, marker=None, # Build up an array of sort criteria as in the docstring criteria_list = [] - for i in xrange(0, len(sort_keys)): + for i in range(0, len(sort_keys)): crit_attrs = [] - for j in xrange(0, i): + for j in range(0, i): model_attr = getattr(model, sort_keys[j]) crit_attrs.append((model_attr == marker_values[j])) diff --git a/ceilometer/openstack/common/eventlet_backdoor.py b/ceilometer/openstack/common/eventlet_backdoor.py index c0ad460fe6..57b89ae914 100644 --- a/ceilometer/openstack/common/eventlet_backdoor.py +++ b/ceilometer/openstack/common/eventlet_backdoor.py @@ -16,6 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. +from __future__ import print_function + import gc import pprint import sys @@ -37,7 +39,7 @@ CONF.register_opts(eventlet_backdoor_opts) def _dont_use_this(): - print "Don't use this, just disconnect instead" + print("Don't use this, just disconnect instead") def _find_objects(t): @@ -46,16 +48,16 @@ def _find_objects(t): def _print_greenthreads(): for i, gt in enumerate(_find_objects(greenlet.greenlet)): - print i, gt + print(i, gt) traceback.print_stack(gt.gr_frame) - print + print() def _print_nativethreads(): for threadId, stack in sys._current_frames().items(): - print threadId + print(threadId) traceback.print_stack(stack) - print + print() def initialize_if_enabled(): diff --git a/ceilometer/openstack/common/fileutils.py b/ceilometer/openstack/common/fileutils.py new file mode 100644 index 0000000000..b988ad03d5 --- /dev/null +++ b/ceilometer/openstack/common/fileutils.py @@ -0,0 +1,35 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import errno +import os + + +def ensure_tree(path): + """Create a directory (and any ancestor directories required) + + :param path: Directory to create + """ + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST: + if not os.path.isdir(path): + raise + else: + raise diff --git a/ceilometer/openstack/common/jsonutils.py b/ceilometer/openstack/common/jsonutils.py index 7e8ace2fae..e34aee4e75 100644 --- a/ceilometer/openstack/common/jsonutils.py +++ b/ceilometer/openstack/common/jsonutils.py @@ -41,6 +41,8 @@ import json import types import xmlrpclib +import six + from ceilometer.openstack.common import timeutils @@ -93,7 +95,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, # value of itertools.count doesn't get caught by nasty_type_tests # and results in infinite loop when list(value) is called. if type(value) == itertools.count: - return unicode(value) + return six.text_type(value) # FIXME(vish): Workaround for LP bug 852095. Without this workaround, # tests that raise an exception in a mocked method that @@ -137,12 +139,12 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, return recursive(value.__dict__, level=level + 1) else: if any(test(value) for test in _nasty_type_tests): - return unicode(value) + return six.text_type(value) return value except TypeError: # Class objects are tricky since they may define something like # __iter__ defined but it isn't callable as list(). - return unicode(value) + return six.text_type(value) def dumps(value, default=to_primitive, **kwargs): diff --git a/ceilometer/openstack/common/lockutils.py b/ceilometer/openstack/common/lockutils.py new file mode 100644 index 0000000000..9d85bec2a9 --- /dev/null +++ b/ceilometer/openstack/common/lockutils.py @@ -0,0 +1,278 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import errno +import functools +import os +import shutil +import tempfile +import time +import weakref + +from eventlet import semaphore +from oslo.config import cfg + +from ceilometer.openstack.common import fileutils +from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import local +from ceilometer.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +util_opts = [ + cfg.BoolOpt('disable_process_locking', default=False, + help='Whether to disable inter-process locks'), + cfg.StrOpt('lock_path', + help=('Directory to use for lock files. Default to a ' + 'temp directory')) +] + + +CONF = cfg.CONF +CONF.register_opts(util_opts) + + +def set_defaults(lock_path): + cfg.set_defaults(util_opts, lock_path=lock_path) + + +class _InterProcessLock(object): + """Lock implementation which allows multiple locks, working around + issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does + not require any cleanup. Since the lock is always held on a file + descriptor rather than outside of the process, the lock gets dropped + automatically if the process crashes, even if __exit__ is not executed. + + There are no guarantees regarding usage by multiple green threads in a + single process here. This lock works only between processes. Exclusive + access between local threads should be achieved using the semaphores + in the @synchronized decorator. + + Note these locks are released when the descriptor is closed, so it's not + safe to close the file descriptor while another green thread holds the + lock. Just opening and closing the lock file can break synchronisation, + so lock files must be accessed only using this abstraction. + """ + + def __init__(self, name): + self.lockfile = None + self.fname = name + + def __enter__(self): + self.lockfile = open(self.fname, 'w') + + while True: + try: + # Using non-blocking locks since green threads are not + # patched to deal with blocking locking calls. + # Also upon reading the MSDN docs for locking(), it seems + # to have a laughable 10 attempts "blocking" mechanism. + self.trylock() + return self + except IOError as e: + if e.errno in (errno.EACCES, errno.EAGAIN): + # external locks synchronise things like iptables + # updates - give it some time to prevent busy spinning + time.sleep(0.01) + else: + raise + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + self.unlock() + self.lockfile.close() + except IOError: + LOG.exception(_("Could not release the acquired lock `%s`"), + self.fname) + + def trylock(self): + raise NotImplementedError() + + def unlock(self): + raise NotImplementedError() + + +class _WindowsLock(_InterProcessLock): + def trylock(self): + msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1) + + def unlock(self): + msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1) + + +class _PosixLock(_InterProcessLock): + def trylock(self): + fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) + + def unlock(self): + fcntl.lockf(self.lockfile, fcntl.LOCK_UN) + + +if os.name == 'nt': + import msvcrt + InterProcessLock = _WindowsLock +else: + import fcntl + InterProcessLock = _PosixLock + +_semaphores = weakref.WeakValueDictionary() + + +def synchronized(name, lock_file_prefix, external=False, lock_path=None): + """Synchronization decorator. + + Decorating a method like so:: + + @synchronized('mylock') + def foo(self, *args): + ... + + ensures that only one thread will execute the foo method at a time. + + Different methods can share the same lock:: + + @synchronized('mylock') + def foo(self, *args): + ... + + @synchronized('mylock') + def bar(self, *args): + ... + + This way only one of either foo or bar can be executing at a time. + + The lock_file_prefix argument is used to provide lock files on disk with a + meaningful prefix. The prefix should end with a hyphen ('-') if specified. + + The external keyword argument denotes whether this lock should work across + multiple processes. This means that if two different workers both run a + a method decorated with @synchronized('mylock', external=True), only one + of them will execute at a time. + + The lock_path keyword argument is used to specify a special location for + external lock files to live. If nothing is set, then CONF.lock_path is + used as a default. + """ + + def wrap(f): + @functools.wraps(f) + def inner(*args, **kwargs): + # NOTE(soren): If we ever go natively threaded, this will be racy. + # See http://stackoverflow.com/questions/5390569/dyn + # amically-allocating-and-destroying-mutexes + sem = _semaphores.get(name, semaphore.Semaphore()) + if name not in _semaphores: + # this check is not racy - we're already holding ref locally + # so GC won't remove the item and there was no IO switch + # (only valid in greenthreads) + _semaphores[name] = sem + + with sem: + LOG.debug(_('Got semaphore "%(lock)s" for method ' + '"%(method)s"...'), {'lock': name, + 'method': f.__name__}) + + # NOTE(mikal): I know this looks odd + if not hasattr(local.strong_store, 'locks_held'): + local.strong_store.locks_held = [] + local.strong_store.locks_held.append(name) + + try: + if external and not CONF.disable_process_locking: + LOG.debug(_('Attempting to grab file lock "%(lock)s" ' + 'for method "%(method)s"...'), + {'lock': name, 'method': f.__name__}) + cleanup_dir = False + + # We need a copy of lock_path because it is non-local + local_lock_path = lock_path + if not local_lock_path: + local_lock_path = CONF.lock_path + + if not local_lock_path: + cleanup_dir = True + local_lock_path = tempfile.mkdtemp() + + if not os.path.exists(local_lock_path): + fileutils.ensure_tree(local_lock_path) + + # NOTE(mikal): the lock name cannot contain directory + # separators + safe_name = name.replace(os.sep, '_') + lock_file_name = '%s%s' % (lock_file_prefix, safe_name) + lock_file_path = os.path.join(local_lock_path, + lock_file_name) + + try: + lock = InterProcessLock(lock_file_path) + with lock: + LOG.debug(_('Got file lock "%(lock)s" at ' + '%(path)s for method ' + '"%(method)s"...'), + {'lock': name, + 'path': lock_file_path, + 'method': f.__name__}) + retval = f(*args, **kwargs) + finally: + LOG.debug(_('Released file lock "%(lock)s" at ' + '%(path)s for method "%(method)s"...'), + {'lock': name, + 'path': lock_file_path, + 'method': f.__name__}) + # NOTE(vish): This removes the tempdir if we needed + # to create one. This is used to + # cleanup the locks left behind by unit + # tests. + if cleanup_dir: + shutil.rmtree(local_lock_path) + else: + retval = f(*args, **kwargs) + + finally: + local.strong_store.locks_held.remove(name) + + return retval + return inner + return wrap + + +def synchronized_with_prefix(lock_file_prefix): + """Partial object generator for the synchronization decorator. + + Redefine @synchronized in each project like so:: + + (in nova/utils.py) + from nova.openstack.common import lockutils + + synchronized = lockutils.synchronized_with_prefix('nova-') + + + (in nova/foo.py) + from nova import utils + + @utils.synchronized('mylock') + def bar(self, *args): + ... + + The lock_file_prefix argument is used to provide lock files on disk with a + meaningful prefix. The prefix should end with a hyphen ('-') if specified. + """ + + return functools.partial(synchronized, lock_file_prefix=lock_file_prefix) diff --git a/ceilometer/openstack/common/log.py b/ceilometer/openstack/common/log.py index 00cc22c6d4..19f4ba0733 100644 --- a/ceilometer/openstack/common/log.py +++ b/ceilometer/openstack/common/log.py @@ -43,12 +43,11 @@ import traceback from oslo.config import cfg from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import importutils from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import local -from ceilometer.openstack.common import notifier -_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s" _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" common_cli_opts = [ @@ -73,11 +72,13 @@ logging_cli_opts = [ 'documentation for details on logging configuration ' 'files.'), cfg.StrOpt('log-format', - default=_DEFAULT_LOG_FORMAT, + default=None, metavar='FORMAT', help='A logging.Formatter log message format string which may ' 'use any of the available logging.LogRecord attributes. ' - 'Default: %(default)s'), + 'This option is deprecated. Please use ' + 'logging_context_format_string and ' + 'logging_default_format_string instead.'), cfg.StrOpt('log-date-format', default=_DEFAULT_LOG_DATE_FORMAT, metavar='DATE_FORMAT', @@ -321,17 +322,6 @@ class JSONFormatter(logging.Formatter): return jsonutils.dumps(message) -class PublishErrorsHandler(logging.Handler): - def emit(self, record): - if ('ceilometer.openstack.common.notifier.log_notifier' in - CONF.notification_driver): - return - notifier.api.notify(None, 'error.publisher', - 'error_notification', - notifier.api.ERROR, - dict(error=record.msg)) - - def _create_logging_excepthook(product_name): def logging_excepthook(type, value, tb): extra = {} @@ -427,15 +417,22 @@ def _setup_logging_from_conf(): log_root.addHandler(streamlog) if CONF.publish_errors: - log_root.addHandler(PublishErrorsHandler(logging.ERROR)) + handler = importutils.import_object( + "ceilometer.openstack.common.log_handler.PublishErrorsHandler", + logging.ERROR) + log_root.addHandler(handler) + datefmt = CONF.log_date_format for handler in log_root.handlers: - datefmt = CONF.log_date_format + # NOTE(alaski): CONF.log_format overrides everything currently. This + # should be deprecated in favor of context aware formatting. if CONF.log_format: handler.setFormatter(logging.Formatter(fmt=CONF.log_format, datefmt=datefmt)) + log_root.info('Deprecated: log_format is now deprecated and will ' + 'be removed in the next release') else: - handler.setFormatter(LegacyFormatter(datefmt=datefmt)) + handler.setFormatter(ContextFormatter(datefmt=datefmt)) if CONF.debug: log_root.setLevel(logging.DEBUG) @@ -481,7 +478,7 @@ class WritableLogger(object): self.logger.log(self.level, msg) -class LegacyFormatter(logging.Formatter): +class ContextFormatter(logging.Formatter): """A context.RequestContext aware formatter configured through flags. The flags used to set format strings are: logging_context_format_string diff --git a/ceilometer/openstack/common/policy.py b/ceilometer/openstack/common/policy.py index 04d4e6d9ab..03bea7ffa6 100644 --- a/ceilometer/openstack/common/policy.py +++ b/ceilometer/openstack/common/policy.py @@ -60,6 +60,7 @@ import abc import re import urllib +import six import urllib2 from ceilometer.openstack.common.gettextutils import _ @@ -436,7 +437,7 @@ def _parse_list_rule(rule): or_list.append(AndCheck(and_list)) # If we have only one check, omit the "or" - if len(or_list) == 0: + if not or_list: return FalseCheck() elif len(or_list) == 1: return or_list[0] @@ -775,5 +776,5 @@ class GenericCheck(Check): # TODO(termie): do dict inspection via dot syntax match = self.match % target if self.kind in creds: - return match == unicode(creds[self.kind]) + return match == six.text_type(creds[self.kind]) return False diff --git a/ceilometer/openstack/common/processutils.py b/ceilometer/openstack/common/processutils.py index 3541626c35..f93fd9384e 100644 --- a/ceilometer/openstack/common/processutils.py +++ b/ceilometer/openstack/common/processutils.py @@ -19,8 +19,10 @@ System-level utilities and helper functions. """ +import os import random import shlex +import signal from eventlet.green import subprocess from eventlet import greenthread @@ -32,6 +34,11 @@ from ceilometer.openstack.common import log as logging LOG = logging.getLogger(__name__) +class InvalidArgumentError(Exception): + def __init__(self, message=None): + super(InvalidArgumentError, self).__init__(message) + + class UnknownArgumentError(Exception): def __init__(self, message=None): super(UnknownArgumentError, self).__init__(message) @@ -40,6 +47,12 @@ class UnknownArgumentError(Exception): class ProcessExecutionError(Exception): def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None, description=None): + self.exit_code = exit_code + self.stderr = stderr + self.stdout = stdout + self.cmd = cmd + self.description = description + if description is None: description = "Unexpected error while running command." if exit_code is None: @@ -49,6 +62,17 @@ class ProcessExecutionError(Exception): super(ProcessExecutionError, self).__init__(message) +class NoRootWrapSpecified(Exception): + def __init__(self, message=None): + super(NoRootWrapSpecified, self).__init__(message) + + +def _subprocess_setup(): + # Python installs a SIGPIPE handler by default. This is usually not what + # non-Python subprocesses expect. + signal.signal(signal.SIGPIPE, signal.SIG_DFL) + + def execute(*cmd, **kwargs): """ Helper method to shell out and execute a command through subprocess with @@ -58,11 +82,11 @@ def execute(*cmd, **kwargs): :type cmd: string :param process_input: Send to opened process. :type proces_input: string - :param check_exit_code: Defaults to 0. Will raise - :class:`ProcessExecutionError` - if the command exits without returning this value - as a returncode - :type check_exit_code: int + :param check_exit_code: Single bool, int, or list of allowed exit + codes. Defaults to [0]. Raise + :class:`ProcessExecutionError` unless + program exits with one of these code. + :type check_exit_code: boolean, int, or [int] :param delay_on_retry: True | False. Defaults to True. If set to True, wait a short amount of time before retrying. :type delay_on_retry: boolean @@ -72,8 +96,12 @@ def execute(*cmd, **kwargs): the command is prefixed by the command specified in the root_helper kwarg. :type run_as_root: boolean - :param root_helper: command to prefix all cmd's with + :param root_helper: command to prefix to commands called with + run_as_root=True :type root_helper: string + :param shell: whether or not there should be a shell used to + execute this command. Defaults to false. + :type shell: boolean :returns: (stdout, stderr) from process execution :raises: :class:`UnknownArgumentError` on receiving unknown arguments @@ -81,16 +109,31 @@ def execute(*cmd, **kwargs): """ process_input = kwargs.pop('process_input', None) - check_exit_code = kwargs.pop('check_exit_code', 0) + check_exit_code = kwargs.pop('check_exit_code', [0]) + ignore_exit_code = False delay_on_retry = kwargs.pop('delay_on_retry', True) attempts = kwargs.pop('attempts', 1) run_as_root = kwargs.pop('run_as_root', False) root_helper = kwargs.pop('root_helper', '') - if len(kwargs): + shell = kwargs.pop('shell', False) + + if isinstance(check_exit_code, bool): + ignore_exit_code = not check_exit_code + check_exit_code = [0] + elif isinstance(check_exit_code, int): + check_exit_code = [check_exit_code] + + if kwargs: raise UnknownArgumentError(_('Got unknown keyword args ' 'to utils.execute: %r') % kwargs) - if run_as_root: + + if run_as_root and os.geteuid() != 0: + if not root_helper: + raise NoRootWrapSpecified( + message=('Command requested root, but did not specify a root ' + 'helper.')) cmd = shlex.split(root_helper) + list(cmd) + cmd = map(str, cmd) while attempts > 0: @@ -98,11 +141,21 @@ def execute(*cmd, **kwargs): try: LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd)) _PIPE = subprocess.PIPE # pylint: disable=E1101 + + if os.name == 'nt': + preexec_fn = None + close_fds = False + else: + preexec_fn = _subprocess_setup + close_fds = True + obj = subprocess.Popen(cmd, stdin=_PIPE, stdout=_PIPE, stderr=_PIPE, - close_fds=True) + close_fds=close_fds, + preexec_fn=preexec_fn, + shell=shell) result = None if process_input is not None: result = obj.communicate(process_input) @@ -112,9 +165,7 @@ def execute(*cmd, **kwargs): _returncode = obj.returncode # pylint: disable=E1101 if _returncode: LOG.debug(_('Result was %s') % _returncode) - if (isinstance(check_exit_code, int) and - not isinstance(check_exit_code, bool) and - _returncode != check_exit_code): + if not ignore_exit_code and _returncode not in check_exit_code: (stdout, stderr) = result raise ProcessExecutionError(exit_code=_returncode, stdout=stdout, @@ -133,3 +184,64 @@ def execute(*cmd, **kwargs): # call clean something up in between calls, without # it two execute calls in a row hangs the second one greenthread.sleep(0) + + +def trycmd(*args, **kwargs): + """ + A wrapper around execute() to more easily handle warnings and errors. + + Returns an (out, err) tuple of strings containing the output of + the command's stdout and stderr. If 'err' is not empty then the + command can be considered to have failed. + + :discard_warnings True | False. Defaults to False. If set to True, + then for succeeding commands, stderr is cleared + + """ + discard_warnings = kwargs.pop('discard_warnings', False) + + try: + out, err = execute(*args, **kwargs) + failed = False + except ProcessExecutionError, exn: + out, err = '', str(exn) + failed = True + + if not failed and discard_warnings and err: + # Handle commands that output to stderr but otherwise succeed + err = '' + + return out, err + + +def ssh_execute(ssh, cmd, process_input=None, + addl_env=None, check_exit_code=True): + LOG.debug(_('Running cmd (SSH): %s'), cmd) + if addl_env: + raise InvalidArgumentError(_('Environment not supported over SSH')) + + if process_input: + # This is (probably) fixable if we need it... + raise InvalidArgumentError(_('process_input not supported over SSH')) + + stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd) + channel = stdout_stream.channel + + # NOTE(justinsb): This seems suspicious... + # ...other SSH clients have buffering issues with this approach + stdout = stdout_stream.read() + stderr = stderr_stream.read() + stdin_stream.close() + + exit_status = channel.recv_exit_status() + + # exit_status == -1 if no exit code was returned + if exit_status != -1: + LOG.debug(_('Result was %s') % exit_status) + if check_exit_code and exit_status != 0: + raise ProcessExecutionError(exit_code=exit_status, + stdout=stdout, + stderr=stderr, + cmd=cmd) + + return (stdout, stderr) diff --git a/ceilometer/openstack/common/rpc/amqp.py b/ceilometer/openstack/common/rpc/amqp.py index 3261f65044..35848dfbc7 100644 --- a/ceilometer/openstack/common/rpc/amqp.py +++ b/ceilometer/openstack/common/rpc/amqp.py @@ -197,8 +197,9 @@ class ReplyProxy(ConnectionContext): msg_id = message_data.pop('_msg_id', None) waiter = self._call_waiters.get(msg_id) if not waiter: - LOG.warn(_('no calling threads waiting for msg_id : %s' - ', message : %s') % (msg_id, message_data)) + LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s' + ', message : %(data)s'), {'msg_id': msg_id, + 'data': message_data}) else: waiter.put(message_data) diff --git a/ceilometer/openstack/common/rpc/common.py b/ceilometer/openstack/common/rpc/common.py index 92e6c00e18..a99e16f5d1 100644 --- a/ceilometer/openstack/common/rpc/common.py +++ b/ceilometer/openstack/common/rpc/common.py @@ -22,6 +22,7 @@ import sys import traceback from oslo.config import cfg +import six from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import importutils @@ -157,6 +158,10 @@ class UnsupportedRpcEnvelopeVersion(RPCException): "not supported by this endpoint.") +class RpcVersionCapError(RPCException): + message = _("Specified RPC version cap, %(version_cap)s, is too low") + + class Connection(object): """A connection, returned by rpc.create_connection(). @@ -299,7 +304,8 @@ def serialize_remote_exception(failure_info, log_failure=True): tb = traceback.format_exception(*failure_info) failure = failure_info[1] if log_failure: - LOG.error(_("Returning exception %s to caller"), unicode(failure)) + LOG.error(_("Returning exception %s to caller"), + six.text_type(failure)) LOG.error(tb) kwargs = {} @@ -309,7 +315,7 @@ def serialize_remote_exception(failure_info, log_failure=True): data = { 'class': str(failure.__class__.__name__), 'module': str(failure.__class__.__module__), - 'message': unicode(failure), + 'message': six.text_type(failure), 'tb': tb, 'args': failure.args, 'kwargs': kwargs diff --git a/ceilometer/openstack/common/rpc/dispatcher.py b/ceilometer/openstack/common/rpc/dispatcher.py index 33734509b8..ff41de5b25 100644 --- a/ceilometer/openstack/common/rpc/dispatcher.py +++ b/ceilometer/openstack/common/rpc/dispatcher.py @@ -84,6 +84,7 @@ minimum version that supports the new parameter should be specified. """ from ceilometer.openstack.common.rpc import common as rpc_common +from ceilometer.openstack.common.rpc import serializer as rpc_serializer class RpcDispatcher(object): @@ -93,16 +94,38 @@ class RpcDispatcher(object): contains a list of underlying managers that have an API_VERSION attribute. """ - def __init__(self, callbacks): + def __init__(self, callbacks, serializer=None): """Initialize the rpc dispatcher. :param callbacks: List of proxy objects that are an instance of a class with rpc methods exposed. Each proxy object should have an RPC_API_VERSION attribute. + :param serializer: The Serializer object that will be used to + deserialize arguments before the method call and + to serialize the result after it returns. """ self.callbacks = callbacks + if serializer is None: + serializer = rpc_serializer.NoOpSerializer() + self.serializer = serializer super(RpcDispatcher, self).__init__() + def _deserialize_args(self, context, kwargs): + """Helper method called to deserialize args before dispatch. + + This calls our serializer on each argument, returning a new set of + args that have been deserialized. + + :param context: The request context + :param kwargs: The arguments to be deserialized + :returns: A new set of deserialized args + """ + new_kwargs = dict() + for argname, arg in kwargs.iteritems(): + new_kwargs[argname] = self.serializer.deserialize_entity(context, + arg) + return new_kwargs + def dispatch(self, ctxt, version, method, namespace, **kwargs): """Dispatch a message based on a requested version. @@ -145,7 +168,9 @@ class RpcDispatcher(object): if not hasattr(proxyobj, method): continue if is_compatible: - return getattr(proxyobj, method)(ctxt, **kwargs) + kwargs = self._deserialize_args(ctxt, kwargs) + result = getattr(proxyobj, method)(ctxt, **kwargs) + return self.serializer.serialize_entity(ctxt, result) if had_compatible: raise AttributeError("No such RPC function '%s'" % method) diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py index e52c26eb6f..76ee72578f 100644 --- a/ceilometer/openstack/common/rpc/impl_qpid.py +++ b/ceilometer/openstack/common/rpc/impl_qpid.py @@ -331,15 +331,16 @@ class Connection(object): def reconnect(self): """Handles reconnecting and re-establishing sessions and queues""" - if self.connection.opened(): - try: - self.connection.close() - except qpid_exceptions.ConnectionError: - pass - attempt = 0 delay = 1 while True: + # Close the session if necessary + if self.connection.opened(): + try: + self.connection.close() + except qpid_exceptions.ConnectionError: + pass + broker = self.brokers[attempt % len(self.brokers)] attempt += 1 @@ -374,7 +375,7 @@ class Connection(object): try: return method(*args, **kwargs) except (qpid_exceptions.Empty, - qpid_exceptions.ConnectionError), e: + qpid_exceptions.ConnectionError) as e: if error_callback: error_callback(e) self.reconnect() diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py index dbf91df29a..995207c876 100644 --- a/ceilometer/openstack/common/rpc/impl_zmq.py +++ b/ceilometer/openstack/common/rpc/impl_zmq.py @@ -180,7 +180,7 @@ class ZmqSocket(object): return # We must unsubscribe, or we'll leak descriptors. - if len(self.subscriptions) > 0: + if self.subscriptions: for f in self.subscriptions: try: self.sock.setsockopt(zmq.UNSUBSCRIBE, f) @@ -763,7 +763,7 @@ def _multi_send(method, context, topic, msg, timeout=None, LOG.debug(_("Sending message(s) to: %s"), queues) # Don't stack if we have no matchmaker results - if len(queues) == 0: + if not queues: LOG.warn(_("No matchmaker results. Not casting.")) # While not strictly a timeout, callers know how to handle # this exception and a timeout isn't too big a lie. @@ -846,6 +846,11 @@ def _get_ctxt(): def _get_matchmaker(*args, **kwargs): global matchmaker if not matchmaker: - matchmaker = importutils.import_object( - CONF.rpc_zmq_matchmaker, *args, **kwargs) + mm = CONF.rpc_zmq_matchmaker + if mm.endswith('matchmaker.MatchMakerRing'): + mm.replace('matchmaker', 'matchmaker_ring') + LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use' + ' %(new)s instead') % dict( + orig=CONF.rpc_zmq_matchmaker, new=mm)) + matchmaker = importutils.import_object(mm, *args, **kwargs) return matchmaker diff --git a/ceilometer/openstack/common/rpc/matchmaker.py b/ceilometer/openstack/common/rpc/matchmaker.py index 571ab99ae6..659ca5a86a 100644 --- a/ceilometer/openstack/common/rpc/matchmaker.py +++ b/ceilometer/openstack/common/rpc/matchmaker.py @@ -19,8 +19,6 @@ return keys for direct exchanges, per (approximate) AMQP parlance. """ import contextlib -import itertools -import json import eventlet from oslo.config import cfg @@ -30,10 +28,6 @@ from ceilometer.openstack.common import log as logging matchmaker_opts = [ - # Matchmaker ring file - cfg.StrOpt('matchmaker_ringfile', - default='/etc/nova/matchmaker_ring.json', - help='Matchmaker ring file (JSON)'), cfg.IntOpt('matchmaker_heartbeat_freq', default=300, help='Heartbeat frequency'), @@ -236,7 +230,8 @@ class HeartbeatMatchMakerBase(MatchMakerBase): self.hosts.discard(host) self.backend_unregister(key, '.'.join((key, host))) - LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host))) + LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"), + {'key': key, 'host': host}) def start_heartbeat(self): """ @@ -245,7 +240,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase): yielding for CONF.matchmaker_heartbeat_freq seconds between iterations. """ - if len(self.hosts) == 0: + if not self.hosts: raise MatchMakerException( _("Register before starting heartbeat.")) @@ -304,67 +299,6 @@ class StubExchange(Exchange): return [(key, None)] -class RingExchange(Exchange): - """ - Match Maker where hosts are loaded from a static file containing - a hashmap (JSON formatted). - - __init__ takes optional ring dictionary argument, otherwise - loads the ringfile from CONF.mathcmaker_ringfile. - """ - def __init__(self, ring=None): - super(RingExchange, self).__init__() - - if ring: - self.ring = ring - else: - fh = open(CONF.matchmaker_ringfile, 'r') - self.ring = json.load(fh) - fh.close() - - self.ring0 = {} - for k in self.ring.keys(): - self.ring0[k] = itertools.cycle(self.ring[k]) - - def _ring_has(self, key): - if key in self.ring0: - return True - return False - - -class RoundRobinRingExchange(RingExchange): - """A Topic Exchange based on a hashmap.""" - def __init__(self, ring=None): - super(RoundRobinRingExchange, self).__init__(ring) - - def run(self, key): - if not self._ring_has(key): - LOG.warn( - _("No key defining hosts for topic '%s', " - "see ringfile") % (key, ) - ) - return [] - host = next(self.ring0[key]) - return [(key + '.' + host, host)] - - -class FanoutRingExchange(RingExchange): - """Fanout Exchange based on a hashmap.""" - def __init__(self, ring=None): - super(FanoutRingExchange, self).__init__(ring) - - def run(self, key): - # Assume starts with "fanout~", strip it for lookup. - nkey = key.split('fanout~')[1:][0] - if not self._ring_has(nkey): - LOG.warn( - _("No key defining hosts for topic '%s', " - "see ringfile") % (nkey, ) - ) - return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey]) - - class LocalhostExchange(Exchange): """Exchange where all direct topics are local.""" def __init__(self, host='localhost'): @@ -388,17 +322,6 @@ class DirectExchange(Exchange): return [(key, e)] -class MatchMakerRing(MatchMakerBase): - """ - Match Maker where hosts are loaded from a static hashmap. - """ - def __init__(self, ring=None): - super(MatchMakerRing, self).__init__() - self.add_binding(FanoutBinding(), FanoutRingExchange(ring)) - self.add_binding(DirectBinding(), DirectExchange()) - self.add_binding(TopicBinding(), RoundRobinRingExchange(ring)) - - class MatchMakerLocalhost(MatchMakerBase): """ Match Maker where all bare topics resolve to localhost. diff --git a/ceilometer/openstack/common/rpc/matchmaker_ring.py b/ceilometer/openstack/common/rpc/matchmaker_ring.py new file mode 100644 index 0000000000..e733d17b99 --- /dev/null +++ b/ceilometer/openstack/common/rpc/matchmaker_ring.py @@ -0,0 +1,114 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011-2013 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +The MatchMaker classes should except a Topic or Fanout exchange key and +return keys for direct exchanges, per (approximate) AMQP parlance. +""" + +import itertools +import json + +from oslo.config import cfg + +from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import log as logging +from ceilometer.openstack.common.rpc import matchmaker as mm + + +matchmaker_opts = [ + # Matchmaker ring file + cfg.StrOpt('ringfile', + deprecated_name='matchmaker_ringfile', + deprecated_group='DEFAULT', + default='/etc/oslo/matchmaker_ring.json', + help='Matchmaker ring file (JSON)'), +] + +CONF = cfg.CONF +CONF.register_opts(matchmaker_opts, 'matchmaker_ring') +LOG = logging.getLogger(__name__) + + +class RingExchange(mm.Exchange): + """ + Match Maker where hosts are loaded from a static file containing + a hashmap (JSON formatted). + + __init__ takes optional ring dictionary argument, otherwise + loads the ringfile from CONF.mathcmaker_ringfile. + """ + def __init__(self, ring=None): + super(RingExchange, self).__init__() + + if ring: + self.ring = ring + else: + fh = open(CONF.matchmaker_ring.ringfile, 'r') + self.ring = json.load(fh) + fh.close() + + self.ring0 = {} + for k in self.ring.keys(): + self.ring0[k] = itertools.cycle(self.ring[k]) + + def _ring_has(self, key): + if key in self.ring0: + return True + return False + + +class RoundRobinRingExchange(RingExchange): + """A Topic Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(RoundRobinRingExchange, self).__init__(ring) + + def run(self, key): + if not self._ring_has(key): + LOG.warn( + _("No key defining hosts for topic '%s', " + "see ringfile") % (key, ) + ) + return [] + host = next(self.ring0[key]) + return [(key + '.' + host, host)] + + +class FanoutRingExchange(RingExchange): + """Fanout Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(FanoutRingExchange, self).__init__(ring) + + def run(self, key): + # Assume starts with "fanout~", strip it for lookup. + nkey = key.split('fanout~')[1:][0] + if not self._ring_has(nkey): + LOG.warn( + _("No key defining hosts for topic '%s', " + "see ringfile") % (nkey, ) + ) + return [] + return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + + +class MatchMakerRing(mm.MatchMakerBase): + """ + Match Maker where hosts are loaded from a static hashmap. + """ + def __init__(self, ring=None): + super(MatchMakerRing, self).__init__() + self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring)) + self.add_binding(mm.DirectBinding(), mm.DirectExchange()) + self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring)) diff --git a/ceilometer/openstack/common/rpc/proxy.py b/ceilometer/openstack/common/rpc/proxy.py index 192d9f6de1..2ea0f1a005 100644 --- a/ceilometer/openstack/common/rpc/proxy.py +++ b/ceilometer/openstack/common/rpc/proxy.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2012 Red Hat, Inc. +# Copyright 2012-2013 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -23,6 +23,8 @@ For more information about rpc API version numbers, see: from ceilometer.openstack.common import rpc +from ceilometer.openstack.common.rpc import common as rpc_common +from ceilometer.openstack.common.rpc import serializer as rpc_serializer class RpcProxy(object): @@ -34,16 +36,28 @@ class RpcProxy(object): rpc API. """ - def __init__(self, topic, default_version): + # The default namespace, which can be overriden in a subclass. + RPC_API_NAMESPACE = None + + def __init__(self, topic, default_version, version_cap=None, + serializer=None): """Initialize an RpcProxy. :param topic: The topic to use for all messages. :param default_version: The default API version to request in all outgoing messages. This can be overridden on a per-message basis. + :param version_cap: Optionally cap the maximum version used for sent + messages. + :param serializer: Optionaly (de-)serialize entities with a + provided helper. """ self.topic = topic self.default_version = default_version + self.version_cap = version_cap + if serializer is None: + serializer = rpc_serializer.NoOpSerializer() + self.serializer = serializer super(RpcProxy, self).__init__() def _set_version(self, msg, vers): @@ -52,7 +66,11 @@ class RpcProxy(object): :param msg: The message having a version added to it. :param vers: The version number to add to the message. """ - msg['version'] = vers if vers else self.default_version + v = vers if vers else self.default_version + if (self.version_cap and not + rpc_common.version_is_compatible(self.version_cap, v)): + raise rpc_common.RpcVersionCapError(version=self.version_cap) + msg['version'] = v def _get_topic(self, topic): """Return the topic to use for a message.""" @@ -62,9 +80,25 @@ class RpcProxy(object): def make_namespaced_msg(method, namespace, **kwargs): return {'method': method, 'namespace': namespace, 'args': kwargs} - @staticmethod - def make_msg(method, **kwargs): - return RpcProxy.make_namespaced_msg(method, None, **kwargs) + def make_msg(self, method, **kwargs): + return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE, + **kwargs) + + def _serialize_msg_args(self, context, kwargs): + """Helper method called to serialize message arguments. + + This calls our serializer on each argument, returning a new + set of args that have been serialized. + + :param context: The request context + :param kwargs: The arguments to serialize + :returns: A new set of serialized arguments + """ + new_kwargs = dict() + for argname, arg in kwargs.iteritems(): + new_kwargs[argname] = self.serializer.serialize_entity(context, + arg) + return new_kwargs def call(self, context, msg, topic=None, version=None, timeout=None): """rpc.call() a remote method. @@ -81,9 +115,11 @@ class RpcProxy(object): :returns: The return value from the remote method. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) real_topic = self._get_topic(topic) try: - return rpc.call(context, real_topic, msg, timeout) + result = rpc.call(context, real_topic, msg, timeout) + return self.serializer.deserialize_entity(context, result) except rpc.common.Timeout as exc: raise rpc.common.Timeout( exc.info, real_topic, msg.get('method')) @@ -104,9 +140,11 @@ class RpcProxy(object): from the remote method as they arrive. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) real_topic = self._get_topic(topic) try: - return rpc.multicall(context, real_topic, msg, timeout) + result = rpc.multicall(context, real_topic, msg, timeout) + return self.serializer.deserialize_entity(context, result) except rpc.common.Timeout as exc: raise rpc.common.Timeout( exc.info, real_topic, msg.get('method')) @@ -124,6 +162,7 @@ class RpcProxy(object): remote method. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.cast(context, self._get_topic(topic), msg) def fanout_cast(self, context, msg, topic=None, version=None): @@ -139,6 +178,7 @@ class RpcProxy(object): from the remote method. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.fanout_cast(context, self._get_topic(topic), msg) def cast_to_server(self, context, server_params, msg, topic=None, @@ -157,6 +197,7 @@ class RpcProxy(object): return values. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) def fanout_cast_to_server(self, context, server_params, msg, topic=None, @@ -175,5 +216,6 @@ class RpcProxy(object): return values. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.fanout_cast_to_server(context, server_params, self._get_topic(topic), msg) diff --git a/ceilometer/openstack/common/rpc/serializer.py b/ceilometer/openstack/common/rpc/serializer.py new file mode 100644 index 0000000000..0a2c9c4f11 --- /dev/null +++ b/ceilometer/openstack/common/rpc/serializer.py @@ -0,0 +1,52 @@ +# Copyright 2013 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Provides the definition of an RPC serialization handler""" + +import abc + + +class Serializer(object): + """Generic (de-)serialization definition base class""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def serialize_entity(self, context, entity): + """Serialize something to primitive form. + + :param context: Security context + :param entity: Entity to be serialized + :returns: Serialized form of entity + """ + pass + + @abc.abstractmethod + def deserialize_entity(self, context, entity): + """Deserialize something from primitive form. + + :param context: Security context + :param entity: Primitive to be deserialized + :returns: Deserialized form of entity + """ + pass + + +class NoOpSerializer(Serializer): + """A serializer that does nothing""" + + def serialize_entity(self, context, entity): + return entity + + def deserialize_entity(self, context, entity): + return entity diff --git a/ceilometer/openstack/common/service.py b/ceilometer/openstack/common/service.py index d906f0c9b6..ba094c83e0 100644 --- a/ceilometer/openstack/common/service.py +++ b/ceilometer/openstack/common/service.py @@ -52,7 +52,7 @@ class Launcher(object): """ self._services = threadgroup.ThreadGroup() - eventlet_backdoor.initialize_if_enabled() + self.backdoor_port = eventlet_backdoor.initialize_if_enabled() @staticmethod def run_service(service): @@ -72,6 +72,7 @@ class Launcher(object): :returns: None """ + service.backdoor_port = self.backdoor_port self._services.add_thread(self.run_service, service) def stop(self): diff --git a/ceilometer/openstack/common/threadgroup.py b/ceilometer/openstack/common/threadgroup.py index b8f29e3554..08b39d4e53 100644 --- a/ceilometer/openstack/common/threadgroup.py +++ b/ceilometer/openstack/common/threadgroup.py @@ -61,6 +61,13 @@ class ThreadGroup(object): self.threads = [] self.timers = [] + def add_dynamic_timer(self, callback, initial_delay=None, + periodic_interval_max=None, *args, **kwargs): + timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs) + timer.start(initial_delay=initial_delay, + periodic_interval_max=periodic_interval_max) + self.timers.append(timer) + def add_timer(self, interval, callback, initial_delay=None, *args, **kwargs): pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs) diff --git a/etc/ceilometer/ceilometer.conf.sample b/etc/ceilometer/ceilometer.conf.sample index 7d58d74318..dc23a7a4d4 100644 --- a/etc/ceilometer/ceilometer.conf.sample +++ b/etc/ceilometer/ceilometer.conf.sample @@ -143,48 +143,12 @@ # Options defined in ceilometer.openstack.common.db.sqlalchemy.session # -# The SQLAlchemy connection string used to connect to the -# database (string value) -#sql_connection=sqlite:////ceilometer/openstack/common/db/$sqlite_db - # the filename to use with sqlite (string value) #sqlite_db=ceilometer.sqlite -# timeout before idle sql connections are reaped (integer -# value) -#sql_idle_timeout=3600 - -# If passed, use synchronous mode for sqlite (boolean value) +# If true, use synchronous mode for sqlite (boolean value) #sqlite_synchronous=true -# Minimum number of SQL connections to keep open in a pool -# (integer value) -#sql_min_pool_size=1 - -# Maximum number of SQL connections to keep open in a pool -# (integer value) -#sql_max_pool_size=5 - -# maximum db connection retries during startup. (setting -1 -# implies an infinite retry count) (integer value) -#sql_max_retries=10 - -# interval between retries of opening a sql connection -# (integer value) -#sql_retry_interval=10 - -# If set, use this value for max_overflow with sqlalchemy -# (integer value) -#sql_max_overflow= - -# Verbosity of SQL debugging information. 0=None, -# 100=Everything (integer value) -#sql_connection_debug=0 - -# Add python stack traces to SQL as comment strings (boolean -# value) -#sql_connection_trace=false - # # Options defined in ceilometer.openstack.common.eventlet_backdoor @@ -194,6 +158,18 @@ #backdoor_port= +# +# Options defined in ceilometer.openstack.common.lockutils +# + +# Whether to disable inter-process locks (boolean value) +#disable_process_locking=false + +# Directory to use for lock files. Default to a temp directory +# (string value) +#lock_path= + + # # Options defined in ceilometer.openstack.common.log # @@ -250,9 +226,11 @@ #log_config= # A logging.Formatter log message format string which may use -# any of the available logging.LogRecord attributes. Default: -# %(default)s (string value) -#log_format=%(asctime)s %(levelname)8s [%(name)s] %(message)s +# any of the available logging.LogRecord attributes. This +# option is deprecated. Please use +# logging_context_format_string and +# logging_default_format_string instead. (string value) +#log_format= # Format string for %%(asctime)s in log records. Default: # %(default)s (string value) @@ -474,16 +452,13 @@ # Name of this node. Must be a valid hostname, FQDN, or IP # address. Must match "host" option, if running Nova. (string # value) -#rpc_zmq_host=nova +#rpc_zmq_host=dex # # Options defined in ceilometer.openstack.common.rpc.matchmaker # -# Matchmaker ring file (JSON) (string value) -#matchmaker_ringfile=/etc/nova/matchmaker_ring.json - # Heartbeat frequency (integer value) #matchmaker_heartbeat_freq=300 @@ -529,6 +504,61 @@ #port=4952 +[database] + +# +# Options defined in ceilometer.openstack.common.db.api +# + +# The backend to use for db (string value) +#backend=sqlalchemy + +# Enable the experimental use of thread pooling for all DB API +# calls (boolean value) +#use_tpool=false + + +# +# Options defined in ceilometer.openstack.common.db.sqlalchemy.session +# + +# The SQLAlchemy connection string used to connect to the +# database (string value) +#connection=sqlite:////common/db/$sqlite_db + +# timeout before idle sql connections are reaped (integer +# value) +#idle_timeout=3600 + +# Minimum number of SQL connections to keep open in a pool +# (integer value) +#min_pool_size=1 + +# Maximum number of SQL connections to keep open in a pool +# (integer value) +#max_pool_size=5 + +# maximum db connection retries during startup. (setting -1 +# implies an infinite retry count) (integer value) +#max_retries=10 + +# interval between retries of opening a sql connection +# (integer value) +#retry_interval=10 + +# If set, use this value for max_overflow with sqlalchemy +# (integer value) +#max_overflow= + +# Verbosity of SQL debugging information. 0=None, +# 100=Everything (integer value) +#connection_debug=0 + +# Add python stack traces to SQL as comment strings (boolean +# value) +#connection_trace=false + + [publisher_meter] # @@ -586,4 +616,14 @@ #udp_port=4952 -# Total option count: 115 +[matchmaker_ring] + +# +# Options defined in ceilometer.openstack.common.rpc.matchmaker_ring +# + +# Matchmaker ring file (JSON) (string value) +#ringfile=/etc/oslo/matchmaker_ring.json + + +# Total option count: 119 diff --git a/openstack-common.conf b/openstack-common.conf index 2b2351154b..e15ba6dc57 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -17,4 +17,5 @@ module=rpc module=service module=threadgroup module=timeutils +module=config base=ceilometer diff --git a/tools/conf/generate_sample.sh b/tools/conf/generate_sample.sh index 5379abcad9..9d368a0065 100755 --- a/tools/conf/generate_sample.sh +++ b/tools/conf/generate_sample.sh @@ -21,5 +21,5 @@ FILES=$(find ceilometer -type f -name "*.py" ! -path "ceilometer/tests/*" -exec grep -l "Opt(" {} \; | sort -u) PYTHONPATH=./:${PYTHONPATH} \ - python $(dirname "$0")/extract_opts.py ${FILES} > \ + python $(dirname "$0")/../../ceilometer/openstack/common/config/generator.py ${FILES} > \ etc/ceilometer/ceilometer.conf.sample