Update oslo, use new configuration generator

Change-Id: I28f079a3466a247bce7b4614db3e4c960d769ed7
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2013-05-28 16:01:57 +02:00 committed by Gordon Chung
parent 45640c55cc
commit 4afe645fdc
27 changed files with 995 additions and 285 deletions

View File

@ -18,10 +18,8 @@
# #
# @author: Zhongyue Luo, SINA Corporation. # @author: Zhongyue Luo, SINA Corporation.
# #
"""Extracts OpenStack config option info from module(s).""" """Extracts OpenStack config option info from module(s)."""
import gettext
import imp import imp
import os import os
import re import re
@ -31,9 +29,10 @@ import textwrap
from oslo.config import cfg from oslo.config import cfg
from ceilometer.openstack.common import gettextutils
from ceilometer.openstack.common import importutils from ceilometer.openstack.common import importutils
gettext.install('ceilometer', unicode=1) gettextutils.install('ceilometer')
STROPT = "StrOpt" STROPT = "StrOpt"
BOOLOPT = "BoolOpt" BOOLOPT = "BoolOpt"
@ -61,7 +60,7 @@ BASEDIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
WORDWRAP_WIDTH = 60 WORDWRAP_WIDTH = 60
def main(srcfiles): def generate(srcfiles):
mods_by_pkg = dict() mods_by_pkg = dict()
for filepath in srcfiles: for filepath in srcfiles:
pkg_name = filepath.split(os.sep)[1] pkg_name = filepath.split(os.sep)[1]
@ -107,16 +106,14 @@ def _import_module(mod_str):
return sys.modules[mod_str[4:]] return sys.modules[mod_str[4:]]
else: else:
return importutils.import_module(mod_str) return importutils.import_module(mod_str)
except (ValueError, AttributeError), err: except ImportError as ie:
return None
except ImportError, ie:
sys.stderr.write("%s\n" % str(ie)) sys.stderr.write("%s\n" % str(ie))
return None return None
except Exception, e: except Exception:
return None return None
def _is_in_group (opt, group): def _is_in_group(opt, group):
"Check if opt is in group." "Check if opt is in group."
for key, value in group._opts.items(): for key, value in group._opts.items():
if value['opt'] == opt: if value['opt'] == opt:
@ -135,7 +132,11 @@ def _guess_groups(opt, mod_obj):
if _is_in_group(opt, value._group): if _is_in_group(opt, value._group):
return value._group.name return value._group.name
raise RuntimeError("Unable to find group for option %s" % opt.name) raise RuntimeError(
"Unable to find group for option %s, "
"maybe it's defined twice in the same group?"
% opt.name
)
def _list_opts(obj): def _list_opts(obj):
@ -193,7 +194,7 @@ def _sanitize_default(s):
elif s == _get_my_ip(): elif s == _get_my_ip():
return '10.0.0.1' return '10.0.0.1'
elif s == socket.getfqdn(): elif s == socket.getfqdn():
return 'nova' return 'ceilometer'
elif s.strip() != s: elif s.strip() != s:
return '"%s"' % s return '"%s"' % s
return s return s
@ -242,8 +243,11 @@ def _print_opt(opt):
sys.exit(1) sys.exit(1)
if __name__ == '__main__': def main():
if len(sys.argv) < 2: if len(sys.argv) < 2:
print "usage: python %s [srcfile]...\n" % sys.argv[0] print "usage: %s [srcfile]...\n" % sys.argv[0]
sys.exit(0) sys.exit(0)
main(sys.argv[1:]) generate(sys.argv[1:])
if __name__ == '__main__':
main()

View File

@ -19,8 +19,9 @@
Supported configuration options: Supported configuration options:
`db_backend`: DB backend name or full module path to DB backend module. The following two parameters are in the 'database' group:
`dbapi_use_tpool`: Enable thread pooling of DB API calls. `backend`: DB backend name or full module path to DB backend module.
`use_tpool`: Enable thread pooling of DB API calls.
A DB backend module should implement a method named 'get_backend' which A DB backend module should implement a method named 'get_backend' which
takes no arguments. The method can return any object that implements DB takes no arguments. The method can return any object that implements DB
@ -44,17 +45,21 @@ from ceilometer.openstack.common import lockutils
db_opts = [ db_opts = [
cfg.StrOpt('db_backend', cfg.StrOpt('backend',
default='sqlalchemy', default='sqlalchemy',
deprecated_name='db_backend',
deprecated_group='DEFAULT',
help='The backend to use for db'), help='The backend to use for db'),
cfg.BoolOpt('dbapi_use_tpool', cfg.BoolOpt('use_tpool',
default=False, default=False,
deprecated_name='dbapi_use_tpool',
deprecated_group='DEFAULT',
help='Enable the experimental use of thread pooling for ' help='Enable the experimental use of thread pooling for '
'all DB API calls') 'all DB API calls')
] ]
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opts(db_opts) CONF.register_opts(db_opts, 'database')
class DBAPI(object): class DBAPI(object):
@ -75,8 +80,8 @@ class DBAPI(object):
if self.__backend: if self.__backend:
# Another thread assigned it # Another thread assigned it
return self.__backend return self.__backend
backend_name = CONF.db_backend backend_name = CONF.database.backend
self.__use_tpool = CONF.dbapi_use_tpool self.__use_tpool = CONF.database.use_tpool
if self.__use_tpool: if self.__use_tpool:
from eventlet import tpool from eventlet import tpool
self.__tpool = tpool self.__tpool = tpool

View File

@ -33,9 +33,6 @@ from ceilometer.openstack.common import timeutils
class ModelBase(object): class ModelBase(object):
"""Base class for models.""" """Base class for models."""
__table_initialized__ = False __table_initialized__ = False
created_at = Column(DateTime, default=timeutils.utcnow)
updated_at = Column(DateTime, onupdate=timeutils.utcnow)
metadata = None
def save(self, session=None): def save(self, session=None):
"""Save this object.""" """Save this object."""
@ -92,6 +89,11 @@ class ModelBase(object):
return local.iteritems() return local.iteritems()
class TimestampMixin(object):
created_at = Column(DateTime, default=timeutils.utcnow)
updated_at = Column(DateTime, onupdate=timeutils.utcnow)
class SoftDeleteMixin(object): class SoftDeleteMixin(object):
deleted_at = Column(DateTime) deleted_at = Column(DateTime)
deleted = Column(Integer, default=0) deleted = Column(Integer, default=0)

View File

@ -247,8 +247,10 @@ import time
from eventlet import greenthread from eventlet import greenthread
from oslo.config import cfg from oslo.config import cfg
import six
from sqlalchemy import exc as sqla_exc from sqlalchemy import exc as sqla_exc
import sqlalchemy.interfaces import sqlalchemy.interfaces
from sqlalchemy.interfaces import PoolListener
import sqlalchemy.orm import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column from sqlalchemy.sql.expression import literal_column
@ -258,53 +260,76 @@ from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
DEFAULT = 'DEFAULT'
sql_opts = [ sqlite_db_opts = [
cfg.StrOpt('sql_connection', cfg.StrOpt('sqlite_db',
default='ceilometer.sqlite',
help='the filename to use with sqlite'),
cfg.BoolOpt('sqlite_synchronous',
default=True,
help='If true, use synchronous mode for sqlite'),
]
database_opts = [
cfg.StrOpt('connection',
default='sqlite:///' + default='sqlite:///' +
os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.abspath(os.path.join(os.path.dirname(__file__),
'../', '$sqlite_db')), '../', '$sqlite_db')),
help='The SQLAlchemy connection string used to connect to the ' help='The SQLAlchemy connection string used to connect to the '
'database', 'database',
deprecated_name='sql_connection',
deprecated_group=DEFAULT,
secret=True), secret=True),
cfg.StrOpt('sqlite_db', cfg.IntOpt('idle_timeout',
default='ceilometer.sqlite',
help='the filename to use with sqlite'),
cfg.IntOpt('sql_idle_timeout',
default=3600, default=3600,
deprecated_name='sql_idle_timeout',
deprecated_group=DEFAULT,
help='timeout before idle sql connections are reaped'), help='timeout before idle sql connections are reaped'),
cfg.BoolOpt('sqlite_synchronous', cfg.IntOpt('min_pool_size',
default=True,
help='If passed, use synchronous mode for sqlite'),
cfg.IntOpt('sql_min_pool_size',
default=1, default=1,
deprecated_name='sql_min_pool_size',
deprecated_group=DEFAULT,
help='Minimum number of SQL connections to keep open in a ' help='Minimum number of SQL connections to keep open in a '
'pool'), 'pool'),
cfg.IntOpt('sql_max_pool_size', cfg.IntOpt('max_pool_size',
default=5, default=5,
deprecated_name='sql_max_pool_size',
deprecated_group=DEFAULT,
help='Maximum number of SQL connections to keep open in a ' help='Maximum number of SQL connections to keep open in a '
'pool'), 'pool'),
cfg.IntOpt('sql_max_retries', cfg.IntOpt('max_retries',
default=10, default=10,
deprecated_name='sql_max_retries',
deprecated_group=DEFAULT,
help='maximum db connection retries during startup. ' help='maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'), '(setting -1 implies an infinite retry count)'),
cfg.IntOpt('sql_retry_interval', cfg.IntOpt('retry_interval',
default=10, default=10,
deprecated_name='sql_retry_interval',
deprecated_group=DEFAULT,
help='interval between retries of opening a sql connection'), help='interval between retries of opening a sql connection'),
cfg.IntOpt('sql_max_overflow', cfg.IntOpt('max_overflow',
default=None, default=None,
deprecated_name='sql_max_overflow',
deprecated_group=DEFAULT,
help='If set, use this value for max_overflow with sqlalchemy'), help='If set, use this value for max_overflow with sqlalchemy'),
cfg.IntOpt('sql_connection_debug', cfg.IntOpt('connection_debug',
default=0, default=0,
deprecated_name='sql_connection_debug',
deprecated_group=DEFAULT,
help='Verbosity of SQL debugging information. 0=None, ' help='Verbosity of SQL debugging information. 0=None, '
'100=Everything'), '100=Everything'),
cfg.BoolOpt('sql_connection_trace', cfg.BoolOpt('connection_trace',
default=False, default=False,
deprecated_name='sql_connection_trace',
deprecated_group=DEFAULT,
help='Add python stack traces to SQL as comment strings'), help='Add python stack traces to SQL as comment strings'),
] ]
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opts(sql_opts) CONF.register_opts(sqlite_db_opts)
CONF.register_opts(database_opts, 'database')
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
_ENGINE = None _ENGINE = None
@ -313,17 +338,42 @@ _MAKER = None
def set_defaults(sql_connection, sqlite_db): def set_defaults(sql_connection, sqlite_db):
"""Set defaults for configuration variables.""" """Set defaults for configuration variables."""
cfg.set_defaults(sql_opts, cfg.set_defaults(database_opts,
sql_connection=sql_connection, connection=sql_connection)
cfg.set_defaults(sqlite_db_opts,
sqlite_db=sqlite_db) sqlite_db=sqlite_db)
def get_session(autocommit=True, expire_on_commit=False): def cleanup():
global _ENGINE, _MAKER
if _MAKER:
_MAKER.close_all()
_MAKER = None
if _ENGINE:
_ENGINE.dispose()
_ENGINE = None
class SqliteForeignKeysListener(PoolListener):
"""
Ensures that the foreign key constraints are enforced in SQLite.
The foreign key constraints are disabled by default in SQLite,
so the foreign key constraints will be enabled here for every
database connection
"""
def connect(self, dbapi_con, con_record):
dbapi_con.execute('pragma foreign_keys=ON')
def get_session(autocommit=True, expire_on_commit=False,
sqlite_fk=False):
"""Return a SQLAlchemy session.""" """Return a SQLAlchemy session."""
global _MAKER global _MAKER
if _MAKER is None: if _MAKER is None:
engine = get_engine() engine = get_engine(sqlite_fk=sqlite_fk)
_MAKER = get_maker(engine, autocommit, expire_on_commit) _MAKER = get_maker(engine, autocommit, expire_on_commit)
session = _MAKER() session = _MAKER()
@ -355,21 +405,22 @@ _DUP_KEY_RE_DB = {
} }
def raise_if_duplicate_entry_error(integrity_error, engine_name): def _raise_if_duplicate_entry_error(integrity_error, engine_name):
""" """
In this function will be raised DBDuplicateEntry exception if integrity In this function will be raised DBDuplicateEntry exception if integrity
error wrap unique constraint violation. error wrap unique constraint violation.
""" """
def get_columns_from_uniq_cons_or_name(columns): def get_columns_from_uniq_cons_or_name(columns):
# note(boris-42): UniqueConstraint name convention: "uniq_c1_x_c2_x_c3" # note(vsergeyev): UniqueConstraint name convention: "uniq_t$c1$c2"
# means that columns c1, c2, c3 are in UniqueConstraint. # where `t` it is table name and columns `c1`, `c2`
# are in UniqueConstraint.
uniqbase = "uniq_" uniqbase = "uniq_"
if not columns.startswith(uniqbase): if not columns.startswith(uniqbase):
if engine_name == "postgresql": if engine_name == "postgresql":
return [columns[columns.index("_") + 1:columns.rindex("_")]] return [columns[columns.index("_") + 1:columns.rindex("_")]]
return [columns] return [columns]
return columns[len(uniqbase):].split("_x_") return columns[len(uniqbase):].split("$")[1:]
if engine_name not in ["mysql", "sqlite", "postgresql"]: if engine_name not in ["mysql", "sqlite", "postgresql"]:
return return
@ -397,7 +448,7 @@ _DEADLOCK_RE_DB = {
} }
def raise_if_deadlock_error(operational_error, engine_name): def _raise_if_deadlock_error(operational_error, engine_name):
""" """
Raise DBDeadlock exception if OperationalError contains a Deadlock Raise DBDeadlock exception if OperationalError contains a Deadlock
condition. condition.
@ -411,7 +462,7 @@ def raise_if_deadlock_error(operational_error, engine_name):
raise exception.DBDeadlock(operational_error) raise exception.DBDeadlock(operational_error)
def wrap_db_error(f): def _wrap_db_error(f):
def _wrap(*args, **kwargs): def _wrap(*args, **kwargs):
try: try:
return f(*args, **kwargs) return f(*args, **kwargs)
@ -420,49 +471,50 @@ def wrap_db_error(f):
# note(boris-42): We should catch unique constraint violation and # note(boris-42): We should catch unique constraint violation and
# wrap it by our own DBDuplicateEntry exception. Unique constraint # wrap it by our own DBDuplicateEntry exception. Unique constraint
# violation is wrapped by IntegrityError. # violation is wrapped by IntegrityError.
except sqla_exc.OperationalError, e: except sqla_exc.OperationalError as e:
raise_if_deadlock_error(e, get_engine().name) _raise_if_deadlock_error(e, get_engine().name)
# NOTE(comstud): A lot of code is checking for OperationalError # NOTE(comstud): A lot of code is checking for OperationalError
# so let's not wrap it for now. # so let's not wrap it for now.
raise raise
except sqla_exc.IntegrityError, e: except sqla_exc.IntegrityError as e:
# note(boris-42): SqlAlchemy doesn't unify errors from different # note(boris-42): SqlAlchemy doesn't unify errors from different
# DBs so we must do this. Also in some tables (for example # DBs so we must do this. Also in some tables (for example
# instance_types) there are more than one unique constraint. This # instance_types) there are more than one unique constraint. This
# means we should get names of columns, which values violate # means we should get names of columns, which values violate
# unique constraint, from error message. # unique constraint, from error message.
raise_if_duplicate_entry_error(e, get_engine().name) _raise_if_duplicate_entry_error(e, get_engine().name)
raise exception.DBError(e) raise exception.DBError(e)
except Exception, e: except Exception as e:
LOG.exception(_('DB exception wrapped.')) LOG.exception(_('DB exception wrapped.'))
raise exception.DBError(e) raise exception.DBError(e)
_wrap.func_name = f.func_name _wrap.func_name = f.func_name
return _wrap return _wrap
def get_engine(): def get_engine(sqlite_fk=False):
"""Return a SQLAlchemy engine.""" """Return a SQLAlchemy engine."""
global _ENGINE global _ENGINE
if _ENGINE is None: if _ENGINE is None:
_ENGINE = create_engine(CONF.sql_connection) _ENGINE = create_engine(CONF.database.connection,
sqlite_fk=sqlite_fk)
return _ENGINE return _ENGINE
def synchronous_switch_listener(dbapi_conn, connection_rec): def _synchronous_switch_listener(dbapi_conn, connection_rec):
"""Switch sqlite connections to non-synchronous mode.""" """Switch sqlite connections to non-synchronous mode."""
dbapi_conn.execute("PRAGMA synchronous = OFF") dbapi_conn.execute("PRAGMA synchronous = OFF")
def add_regexp_listener(dbapi_con, con_record): def _add_regexp_listener(dbapi_con, con_record):
"""Add REGEXP function to sqlite connections.""" """Add REGEXP function to sqlite connections."""
def regexp(expr, item): def regexp(expr, item):
reg = re.compile(expr) reg = re.compile(expr)
return reg.search(unicode(item)) is not None return reg.search(six.text_type(item)) is not None
dbapi_con.create_function('regexp', 2, regexp) dbapi_con.create_function('regexp', 2, regexp)
def greenthread_yield(dbapi_con, con_record): def _greenthread_yield(dbapi_con, con_record):
""" """
Ensure other greenthreads get a chance to execute by forcing a context Ensure other greenthreads get a chance to execute by forcing a context
switch. With common database backends (eg MySQLdb and sqlite), there is switch. With common database backends (eg MySQLdb and sqlite), there is
@ -472,7 +524,7 @@ def greenthread_yield(dbapi_con, con_record):
greenthread.sleep(0) greenthread.sleep(0)
def ping_listener(dbapi_conn, connection_rec, connection_proxy): def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
""" """
Ensures that MySQL connections checked out of the Ensures that MySQL connections checked out of the
pool are alive. pool are alive.
@ -482,7 +534,7 @@ def ping_listener(dbapi_conn, connection_rec, connection_proxy):
""" """
try: try:
dbapi_conn.cursor().execute('select 1') dbapi_conn.cursor().execute('select 1')
except dbapi_conn.OperationalError, ex: except dbapi_conn.OperationalError as ex:
if ex.args[0] in (2006, 2013, 2014, 2045, 2055): if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
LOG.warn(_('Got mysql server has gone away: %s'), ex) LOG.warn(_('Got mysql server has gone away: %s'), ex)
raise sqla_exc.DisconnectionError("Database server went away") raise sqla_exc.DisconnectionError("Database server went away")
@ -490,7 +542,7 @@ def ping_listener(dbapi_conn, connection_rec, connection_proxy):
raise raise
def is_db_connection_error(args): def _is_db_connection_error(args):
"""Return True if error in connecting to db.""" """Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended # NOTE(adam_g): This is currently MySQL specific and needs to be extended
# to support Postgres and others. # to support Postgres and others.
@ -501,56 +553,58 @@ def is_db_connection_error(args):
return False return False
def create_engine(sql_connection): def create_engine(sql_connection, sqlite_fk=False):
"""Return a new SQLAlchemy engine.""" """Return a new SQLAlchemy engine."""
connection_dict = sqlalchemy.engine.url.make_url(sql_connection) connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
engine_args = { engine_args = {
"pool_recycle": CONF.sql_idle_timeout, "pool_recycle": CONF.database.idle_timeout,
"echo": False, "echo": False,
'convert_unicode': True, 'convert_unicode': True,
} }
# Map our SQL debug level to SQLAlchemy's options # Map our SQL debug level to SQLAlchemy's options
if CONF.sql_connection_debug >= 100: if CONF.database.connection_debug >= 100:
engine_args['echo'] = 'debug' engine_args['echo'] = 'debug'
elif CONF.sql_connection_debug >= 50: elif CONF.database.connection_debug >= 50:
engine_args['echo'] = True engine_args['echo'] = True
if "sqlite" in connection_dict.drivername: if "sqlite" in connection_dict.drivername:
if sqlite_fk:
engine_args["listeners"] = [SqliteForeignKeysListener()]
engine_args["poolclass"] = NullPool engine_args["poolclass"] = NullPool
if CONF.sql_connection == "sqlite://": if CONF.database.connection == "sqlite://":
engine_args["poolclass"] = StaticPool engine_args["poolclass"] = StaticPool
engine_args["connect_args"] = {'check_same_thread': False} engine_args["connect_args"] = {'check_same_thread': False}
else: else:
engine_args['pool_size'] = CONF.sql_max_pool_size engine_args['pool_size'] = CONF.database.max_pool_size
if CONF.sql_max_overflow is not None: if CONF.database.max_overflow is not None:
engine_args['max_overflow'] = CONF.sql_max_overflow engine_args['max_overflow'] = CONF.database.max_overflow
engine = sqlalchemy.create_engine(sql_connection, **engine_args) engine = sqlalchemy.create_engine(sql_connection, **engine_args)
sqlalchemy.event.listen(engine, 'checkin', greenthread_yield) sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield)
if 'mysql' in connection_dict.drivername: if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(engine, 'checkout', ping_listener) sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
elif 'sqlite' in connection_dict.drivername: elif 'sqlite' in connection_dict.drivername:
if not CONF.sqlite_synchronous: if not CONF.sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect', sqlalchemy.event.listen(engine, 'connect',
synchronous_switch_listener) _synchronous_switch_listener)
sqlalchemy.event.listen(engine, 'connect', add_regexp_listener) sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
if (CONF.sql_connection_trace and if (CONF.database.connection_trace and
engine.dialect.dbapi.__name__ == 'MySQLdb'): engine.dialect.dbapi.__name__ == 'MySQLdb'):
patch_mysqldb_with_stacktrace_comments() _patch_mysqldb_with_stacktrace_comments()
try: try:
engine.connect() engine.connect()
except sqla_exc.OperationalError, e: except sqla_exc.OperationalError as e:
if not is_db_connection_error(e.args[0]): if not _is_db_connection_error(e.args[0]):
raise raise
remaining = CONF.sql_max_retries remaining = CONF.database.max_retries
if remaining == -1: if remaining == -1:
remaining = 'infinite' remaining = 'infinite'
while True: while True:
@ -558,13 +612,13 @@ def create_engine(sql_connection):
LOG.warn(msg % remaining) LOG.warn(msg % remaining)
if remaining != 'infinite': if remaining != 'infinite':
remaining -= 1 remaining -= 1
time.sleep(CONF.sql_retry_interval) time.sleep(CONF.database.retry_interval)
try: try:
engine.connect() engine.connect()
break break
except sqla_exc.OperationalError, e: except sqla_exc.OperationalError as e:
if (remaining != 'infinite' and remaining == 0) or \ if (remaining != 'infinite' and remaining == 0) or \
not is_db_connection_error(e.args[0]): not _is_db_connection_error(e.args[0]):
raise raise
return engine return engine
@ -580,15 +634,15 @@ class Query(sqlalchemy.orm.query.Query):
class Session(sqlalchemy.orm.session.Session): class Session(sqlalchemy.orm.session.Session):
"""Custom Session class to avoid SqlAlchemy Session monkey patching.""" """Custom Session class to avoid SqlAlchemy Session monkey patching."""
@wrap_db_error @_wrap_db_error
def query(self, *args, **kwargs): def query(self, *args, **kwargs):
return super(Session, self).query(*args, **kwargs) return super(Session, self).query(*args, **kwargs)
@wrap_db_error @_wrap_db_error
def flush(self, *args, **kwargs): def flush(self, *args, **kwargs):
return super(Session, self).flush(*args, **kwargs) return super(Session, self).flush(*args, **kwargs)
@wrap_db_error @_wrap_db_error
def execute(self, *args, **kwargs): def execute(self, *args, **kwargs):
return super(Session, self).execute(*args, **kwargs) return super(Session, self).execute(*args, **kwargs)
@ -602,7 +656,7 @@ def get_maker(engine, autocommit=True, expire_on_commit=False):
query_cls=Query) query_cls=Query)
def patch_mysqldb_with_stacktrace_comments(): def _patch_mysqldb_with_stacktrace_comments():
"""Adds current stack trace as a comment in queries by patching """Adds current stack trace as a comment in queries by patching
MySQLdb.cursors.BaseCursor._do_query. MySQLdb.cursors.BaseCursor._do_query.
""" """

View File

@ -105,9 +105,9 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
# Build up an array of sort criteria as in the docstring # Build up an array of sort criteria as in the docstring
criteria_list = [] criteria_list = []
for i in xrange(0, len(sort_keys)): for i in range(0, len(sort_keys)):
crit_attrs = [] crit_attrs = []
for j in xrange(0, i): for j in range(0, i):
model_attr = getattr(model, sort_keys[j]) model_attr = getattr(model, sort_keys[j])
crit_attrs.append((model_attr == marker_values[j])) crit_attrs.append((model_attr == marker_values[j]))

View File

@ -16,6 +16,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from __future__ import print_function
import gc import gc
import pprint import pprint
import sys import sys
@ -37,7 +39,7 @@ CONF.register_opts(eventlet_backdoor_opts)
def _dont_use_this(): def _dont_use_this():
print "Don't use this, just disconnect instead" print("Don't use this, just disconnect instead")
def _find_objects(t): def _find_objects(t):
@ -46,16 +48,16 @@ def _find_objects(t):
def _print_greenthreads(): def _print_greenthreads():
for i, gt in enumerate(_find_objects(greenlet.greenlet)): for i, gt in enumerate(_find_objects(greenlet.greenlet)):
print i, gt print(i, gt)
traceback.print_stack(gt.gr_frame) traceback.print_stack(gt.gr_frame)
print print()
def _print_nativethreads(): def _print_nativethreads():
for threadId, stack in sys._current_frames().items(): for threadId, stack in sys._current_frames().items():
print threadId print(threadId)
traceback.print_stack(stack) traceback.print_stack(stack)
print print()
def initialize_if_enabled(): def initialize_if_enabled():

View File

@ -0,0 +1,35 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import os
def ensure_tree(path):
"""Create a directory (and any ancestor directories required)
:param path: Directory to create
"""
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST:
if not os.path.isdir(path):
raise
else:
raise

View File

@ -41,6 +41,8 @@ import json
import types import types
import xmlrpclib import xmlrpclib
import six
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
@ -93,7 +95,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# value of itertools.count doesn't get caught by nasty_type_tests # value of itertools.count doesn't get caught by nasty_type_tests
# and results in infinite loop when list(value) is called. # and results in infinite loop when list(value) is called.
if type(value) == itertools.count: if type(value) == itertools.count:
return unicode(value) return six.text_type(value)
# FIXME(vish): Workaround for LP bug 852095. Without this workaround, # FIXME(vish): Workaround for LP bug 852095. Without this workaround,
# tests that raise an exception in a mocked method that # tests that raise an exception in a mocked method that
@ -137,12 +139,12 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
return recursive(value.__dict__, level=level + 1) return recursive(value.__dict__, level=level + 1)
else: else:
if any(test(value) for test in _nasty_type_tests): if any(test(value) for test in _nasty_type_tests):
return unicode(value) return six.text_type(value)
return value return value
except TypeError: except TypeError:
# Class objects are tricky since they may define something like # Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list(). # __iter__ defined but it isn't callable as list().
return unicode(value) return six.text_type(value)
def dumps(value, default=to_primitive, **kwargs): def dumps(value, default=to_primitive, **kwargs):

View File

@ -0,0 +1,278 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import functools
import os
import shutil
import tempfile
import time
import weakref
from eventlet import semaphore
from oslo.config import cfg
from ceilometer.openstack.common import fileutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import local
from ceilometer.openstack.common import log as logging
LOG = logging.getLogger(__name__)
util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path',
help=('Directory to use for lock files. Default to a '
'temp directory'))
]
CONF = cfg.CONF
CONF.register_opts(util_opts)
def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path)
class _InterProcessLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
descriptor rather than outside of the process, the lock gets dropped
automatically if the process crashes, even if __exit__ is not executed.
There are no guarantees regarding usage by multiple green threads in a
single process here. This lock works only between processes. Exclusive
access between local threads should be achieved using the semaphores
in the @synchronized decorator.
Note these locks are released when the descriptor is closed, so it's not
safe to close the file descriptor while another green thread holds the
lock. Just opening and closing the lock file can break synchronisation,
so lock files must be accessed only using this abstraction.
"""
def __init__(self, name):
self.lockfile = None
self.fname = name
def __enter__(self):
self.lockfile = open(self.fname, 'w')
while True:
try:
# Using non-blocking locks since green threads are not
# patched to deal with blocking locking calls.
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
return self
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
time.sleep(0.01)
else:
raise
def __exit__(self, exc_type, exc_val, exc_tb):
try:
self.unlock()
self.lockfile.close()
except IOError:
LOG.exception(_("Could not release the acquired lock `%s`"),
self.fname)
def trylock(self):
raise NotImplementedError()
def unlock(self):
raise NotImplementedError()
class _WindowsLock(_InterProcessLock):
def trylock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
def unlock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock):
def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
def unlock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
if os.name == 'nt':
import msvcrt
InterProcessLock = _WindowsLock
else:
import fcntl
InterProcessLock = _PosixLock
_semaphores = weakref.WeakValueDictionary()
def synchronized(name, lock_file_prefix, external=False, lock_path=None):
"""Synchronization decorator.
Decorating a method like so::
@synchronized('mylock')
def foo(self, *args):
...
ensures that only one thread will execute the foo method at a time.
Different methods can share the same lock::
@synchronized('mylock')
def foo(self, *args):
...
@synchronized('mylock')
def bar(self, *args):
...
This way only one of either foo or bar can be executing at a time.
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix. The prefix should end with a hyphen ('-') if specified.
The external keyword argument denotes whether this lock should work across
multiple processes. This means that if two different workers both run a
a method decorated with @synchronized('mylock', external=True), only one
of them will execute at a time.
The lock_path keyword argument is used to specify a special location for
external lock files to live. If nothing is set, then CONF.lock_path is
used as a default.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s" '
'for method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = CONF.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
# NOTE(mikal): the lock name cannot contain directory
# separators
safe_name = name.replace(os.sep, '_')
lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
lock_file_path = os.path.join(local_lock_path,
lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock:
LOG.debug(_('Got file lock "%(lock)s" at '
'%(path)s for method '
'"%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
retval = f(*args, **kwargs)
finally:
LOG.debug(_('Released file lock "%(lock)s" at '
'%(path)s for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to
# cleanup the locks left behind by unit
# tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
retval = f(*args, **kwargs)
finally:
local.strong_store.locks_held.remove(name)
return retval
return inner
return wrap
def synchronized_with_prefix(lock_file_prefix):
"""Partial object generator for the synchronization decorator.
Redefine @synchronized in each project like so::
(in nova/utils.py)
from nova.openstack.common import lockutils
synchronized = lockutils.synchronized_with_prefix('nova-')
(in nova/foo.py)
from nova import utils
@utils.synchronized('mylock')
def bar(self, *args):
...
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix. The prefix should end with a hyphen ('-') if specified.
"""
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)

View File

@ -43,12 +43,11 @@ import traceback
from oslo.config import cfg from oslo.config import cfg
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import local from ceilometer.openstack.common import local
from ceilometer.openstack.common import notifier
_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
common_cli_opts = [ common_cli_opts = [
@ -73,11 +72,13 @@ logging_cli_opts = [
'documentation for details on logging configuration ' 'documentation for details on logging configuration '
'files.'), 'files.'),
cfg.StrOpt('log-format', cfg.StrOpt('log-format',
default=_DEFAULT_LOG_FORMAT, default=None,
metavar='FORMAT', metavar='FORMAT',
help='A logging.Formatter log message format string which may ' help='A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. ' 'use any of the available logging.LogRecord attributes. '
'Default: %(default)s'), 'This option is deprecated. Please use '
'logging_context_format_string and '
'logging_default_format_string instead.'),
cfg.StrOpt('log-date-format', cfg.StrOpt('log-date-format',
default=_DEFAULT_LOG_DATE_FORMAT, default=_DEFAULT_LOG_DATE_FORMAT,
metavar='DATE_FORMAT', metavar='DATE_FORMAT',
@ -321,17 +322,6 @@ class JSONFormatter(logging.Formatter):
return jsonutils.dumps(message) return jsonutils.dumps(message)
class PublishErrorsHandler(logging.Handler):
def emit(self, record):
if ('ceilometer.openstack.common.notifier.log_notifier' in
CONF.notification_driver):
return
notifier.api.notify(None, 'error.publisher',
'error_notification',
notifier.api.ERROR,
dict(error=record.msg))
def _create_logging_excepthook(product_name): def _create_logging_excepthook(product_name):
def logging_excepthook(type, value, tb): def logging_excepthook(type, value, tb):
extra = {} extra = {}
@ -427,15 +417,22 @@ def _setup_logging_from_conf():
log_root.addHandler(streamlog) log_root.addHandler(streamlog)
if CONF.publish_errors: if CONF.publish_errors:
log_root.addHandler(PublishErrorsHandler(logging.ERROR)) handler = importutils.import_object(
"ceilometer.openstack.common.log_handler.PublishErrorsHandler",
logging.ERROR)
log_root.addHandler(handler)
datefmt = CONF.log_date_format
for handler in log_root.handlers: for handler in log_root.handlers:
datefmt = CONF.log_date_format # NOTE(alaski): CONF.log_format overrides everything currently. This
# should be deprecated in favor of context aware formatting.
if CONF.log_format: if CONF.log_format:
handler.setFormatter(logging.Formatter(fmt=CONF.log_format, handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
datefmt=datefmt)) datefmt=datefmt))
log_root.info('Deprecated: log_format is now deprecated and will '
'be removed in the next release')
else: else:
handler.setFormatter(LegacyFormatter(datefmt=datefmt)) handler.setFormatter(ContextFormatter(datefmt=datefmt))
if CONF.debug: if CONF.debug:
log_root.setLevel(logging.DEBUG) log_root.setLevel(logging.DEBUG)
@ -481,7 +478,7 @@ class WritableLogger(object):
self.logger.log(self.level, msg) self.logger.log(self.level, msg)
class LegacyFormatter(logging.Formatter): class ContextFormatter(logging.Formatter):
"""A context.RequestContext aware formatter configured through flags. """A context.RequestContext aware formatter configured through flags.
The flags used to set format strings are: logging_context_format_string The flags used to set format strings are: logging_context_format_string

View File

@ -60,6 +60,7 @@ import abc
import re import re
import urllib import urllib
import six
import urllib2 import urllib2
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
@ -436,7 +437,7 @@ def _parse_list_rule(rule):
or_list.append(AndCheck(and_list)) or_list.append(AndCheck(and_list))
# If we have only one check, omit the "or" # If we have only one check, omit the "or"
if len(or_list) == 0: if not or_list:
return FalseCheck() return FalseCheck()
elif len(or_list) == 1: elif len(or_list) == 1:
return or_list[0] return or_list[0]
@ -775,5 +776,5 @@ class GenericCheck(Check):
# TODO(termie): do dict inspection via dot syntax # TODO(termie): do dict inspection via dot syntax
match = self.match % target match = self.match % target
if self.kind in creds: if self.kind in creds:
return match == unicode(creds[self.kind]) return match == six.text_type(creds[self.kind])
return False return False

View File

@ -19,8 +19,10 @@
System-level utilities and helper functions. System-level utilities and helper functions.
""" """
import os
import random import random
import shlex import shlex
import signal
from eventlet.green import subprocess from eventlet.green import subprocess
from eventlet import greenthread from eventlet import greenthread
@ -32,6 +34,11 @@ from ceilometer.openstack.common import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class InvalidArgumentError(Exception):
def __init__(self, message=None):
super(InvalidArgumentError, self).__init__(message)
class UnknownArgumentError(Exception): class UnknownArgumentError(Exception):
def __init__(self, message=None): def __init__(self, message=None):
super(UnknownArgumentError, self).__init__(message) super(UnknownArgumentError, self).__init__(message)
@ -40,6 +47,12 @@ class UnknownArgumentError(Exception):
class ProcessExecutionError(Exception): class ProcessExecutionError(Exception):
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None, def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
description=None): description=None):
self.exit_code = exit_code
self.stderr = stderr
self.stdout = stdout
self.cmd = cmd
self.description = description
if description is None: if description is None:
description = "Unexpected error while running command." description = "Unexpected error while running command."
if exit_code is None: if exit_code is None:
@ -49,6 +62,17 @@ class ProcessExecutionError(Exception):
super(ProcessExecutionError, self).__init__(message) super(ProcessExecutionError, self).__init__(message)
class NoRootWrapSpecified(Exception):
def __init__(self, message=None):
super(NoRootWrapSpecified, self).__init__(message)
def _subprocess_setup():
# Python installs a SIGPIPE handler by default. This is usually not what
# non-Python subprocesses expect.
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
def execute(*cmd, **kwargs): def execute(*cmd, **kwargs):
""" """
Helper method to shell out and execute a command through subprocess with Helper method to shell out and execute a command through subprocess with
@ -58,11 +82,11 @@ def execute(*cmd, **kwargs):
:type cmd: string :type cmd: string
:param process_input: Send to opened process. :param process_input: Send to opened process.
:type proces_input: string :type proces_input: string
:param check_exit_code: Defaults to 0. Will raise :param check_exit_code: Single bool, int, or list of allowed exit
:class:`ProcessExecutionError` codes. Defaults to [0]. Raise
if the command exits without returning this value :class:`ProcessExecutionError` unless
as a returncode program exits with one of these code.
:type check_exit_code: int :type check_exit_code: boolean, int, or [int]
:param delay_on_retry: True | False. Defaults to True. If set to True, :param delay_on_retry: True | False. Defaults to True. If set to True,
wait a short amount of time before retrying. wait a short amount of time before retrying.
:type delay_on_retry: boolean :type delay_on_retry: boolean
@ -72,8 +96,12 @@ def execute(*cmd, **kwargs):
the command is prefixed by the command specified the command is prefixed by the command specified
in the root_helper kwarg. in the root_helper kwarg.
:type run_as_root: boolean :type run_as_root: boolean
:param root_helper: command to prefix all cmd's with :param root_helper: command to prefix to commands called with
run_as_root=True
:type root_helper: string :type root_helper: string
:param shell: whether or not there should be a shell used to
execute this command. Defaults to false.
:type shell: boolean
:returns: (stdout, stderr) from process execution :returns: (stdout, stderr) from process execution
:raises: :class:`UnknownArgumentError` on :raises: :class:`UnknownArgumentError` on
receiving unknown arguments receiving unknown arguments
@ -81,16 +109,31 @@ def execute(*cmd, **kwargs):
""" """
process_input = kwargs.pop('process_input', None) process_input = kwargs.pop('process_input', None)
check_exit_code = kwargs.pop('check_exit_code', 0) check_exit_code = kwargs.pop('check_exit_code', [0])
ignore_exit_code = False
delay_on_retry = kwargs.pop('delay_on_retry', True) delay_on_retry = kwargs.pop('delay_on_retry', True)
attempts = kwargs.pop('attempts', 1) attempts = kwargs.pop('attempts', 1)
run_as_root = kwargs.pop('run_as_root', False) run_as_root = kwargs.pop('run_as_root', False)
root_helper = kwargs.pop('root_helper', '') root_helper = kwargs.pop('root_helper', '')
if len(kwargs): shell = kwargs.pop('shell', False)
if isinstance(check_exit_code, bool):
ignore_exit_code = not check_exit_code
check_exit_code = [0]
elif isinstance(check_exit_code, int):
check_exit_code = [check_exit_code]
if kwargs:
raise UnknownArgumentError(_('Got unknown keyword args ' raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs) 'to utils.execute: %r') % kwargs)
if run_as_root:
if run_as_root and os.geteuid() != 0:
if not root_helper:
raise NoRootWrapSpecified(
message=('Command requested root, but did not specify a root '
'helper.'))
cmd = shlex.split(root_helper) + list(cmd) cmd = shlex.split(root_helper) + list(cmd)
cmd = map(str, cmd) cmd = map(str, cmd)
while attempts > 0: while attempts > 0:
@ -98,11 +141,21 @@ def execute(*cmd, **kwargs):
try: try:
LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd)) LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
_PIPE = subprocess.PIPE # pylint: disable=E1101 _PIPE = subprocess.PIPE # pylint: disable=E1101
if os.name == 'nt':
preexec_fn = None
close_fds = False
else:
preexec_fn = _subprocess_setup
close_fds = True
obj = subprocess.Popen(cmd, obj = subprocess.Popen(cmd,
stdin=_PIPE, stdin=_PIPE,
stdout=_PIPE, stdout=_PIPE,
stderr=_PIPE, stderr=_PIPE,
close_fds=True) close_fds=close_fds,
preexec_fn=preexec_fn,
shell=shell)
result = None result = None
if process_input is not None: if process_input is not None:
result = obj.communicate(process_input) result = obj.communicate(process_input)
@ -112,9 +165,7 @@ def execute(*cmd, **kwargs):
_returncode = obj.returncode # pylint: disable=E1101 _returncode = obj.returncode # pylint: disable=E1101
if _returncode: if _returncode:
LOG.debug(_('Result was %s') % _returncode) LOG.debug(_('Result was %s') % _returncode)
if (isinstance(check_exit_code, int) and if not ignore_exit_code and _returncode not in check_exit_code:
not isinstance(check_exit_code, bool) and
_returncode != check_exit_code):
(stdout, stderr) = result (stdout, stderr) = result
raise ProcessExecutionError(exit_code=_returncode, raise ProcessExecutionError(exit_code=_returncode,
stdout=stdout, stdout=stdout,
@ -133,3 +184,64 @@ def execute(*cmd, **kwargs):
# call clean something up in between calls, without # call clean something up in between calls, without
# it two execute calls in a row hangs the second one # it two execute calls in a row hangs the second one
greenthread.sleep(0) greenthread.sleep(0)
def trycmd(*args, **kwargs):
"""
A wrapper around execute() to more easily handle warnings and errors.
Returns an (out, err) tuple of strings containing the output of
the command's stdout and stderr. If 'err' is not empty then the
command can be considered to have failed.
:discard_warnings True | False. Defaults to False. If set to True,
then for succeeding commands, stderr is cleared
"""
discard_warnings = kwargs.pop('discard_warnings', False)
try:
out, err = execute(*args, **kwargs)
failed = False
except ProcessExecutionError, exn:
out, err = '', str(exn)
failed = True
if not failed and discard_warnings and err:
# Handle commands that output to stderr but otherwise succeed
err = ''
return out, err
def ssh_execute(ssh, cmd, process_input=None,
addl_env=None, check_exit_code=True):
LOG.debug(_('Running cmd (SSH): %s'), cmd)
if addl_env:
raise InvalidArgumentError(_('Environment not supported over SSH'))
if process_input:
# This is (probably) fixable if we need it...
raise InvalidArgumentError(_('process_input not supported over SSH'))
stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
channel = stdout_stream.channel
# NOTE(justinsb): This seems suspicious...
# ...other SSH clients have buffering issues with this approach
stdout = stdout_stream.read()
stderr = stderr_stream.read()
stdin_stream.close()
exit_status = channel.recv_exit_status()
# exit_status == -1 if no exit code was returned
if exit_status != -1:
LOG.debug(_('Result was %s') % exit_status)
if check_exit_code and exit_status != 0:
raise ProcessExecutionError(exit_code=exit_status,
stdout=stdout,
stderr=stderr,
cmd=cmd)
return (stdout, stderr)

View File

@ -197,8 +197,9 @@ class ReplyProxy(ConnectionContext):
msg_id = message_data.pop('_msg_id', None) msg_id = message_data.pop('_msg_id', None)
waiter = self._call_waiters.get(msg_id) waiter = self._call_waiters.get(msg_id)
if not waiter: if not waiter:
LOG.warn(_('no calling threads waiting for msg_id : %s' LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s'
', message : %s') % (msg_id, message_data)) ', message : %(data)s'), {'msg_id': msg_id,
'data': message_data})
else: else:
waiter.put(message_data) waiter.put(message_data)

View File

@ -22,6 +22,7 @@ import sys
import traceback import traceback
from oslo.config import cfg from oslo.config import cfg
import six
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils from ceilometer.openstack.common import importutils
@ -157,6 +158,10 @@ class UnsupportedRpcEnvelopeVersion(RPCException):
"not supported by this endpoint.") "not supported by this endpoint.")
class RpcVersionCapError(RPCException):
message = _("Specified RPC version cap, %(version_cap)s, is too low")
class Connection(object): class Connection(object):
"""A connection, returned by rpc.create_connection(). """A connection, returned by rpc.create_connection().
@ -299,7 +304,8 @@ def serialize_remote_exception(failure_info, log_failure=True):
tb = traceback.format_exception(*failure_info) tb = traceback.format_exception(*failure_info)
failure = failure_info[1] failure = failure_info[1]
if log_failure: if log_failure:
LOG.error(_("Returning exception %s to caller"), unicode(failure)) LOG.error(_("Returning exception %s to caller"),
six.text_type(failure))
LOG.error(tb) LOG.error(tb)
kwargs = {} kwargs = {}
@ -309,7 +315,7 @@ def serialize_remote_exception(failure_info, log_failure=True):
data = { data = {
'class': str(failure.__class__.__name__), 'class': str(failure.__class__.__name__),
'module': str(failure.__class__.__module__), 'module': str(failure.__class__.__module__),
'message': unicode(failure), 'message': six.text_type(failure),
'tb': tb, 'tb': tb,
'args': failure.args, 'args': failure.args,
'kwargs': kwargs 'kwargs': kwargs

View File

@ -84,6 +84,7 @@ minimum version that supports the new parameter should be specified.
""" """
from ceilometer.openstack.common.rpc import common as rpc_common from ceilometer.openstack.common.rpc import common as rpc_common
from ceilometer.openstack.common.rpc import serializer as rpc_serializer
class RpcDispatcher(object): class RpcDispatcher(object):
@ -93,16 +94,38 @@ class RpcDispatcher(object):
contains a list of underlying managers that have an API_VERSION attribute. contains a list of underlying managers that have an API_VERSION attribute.
""" """
def __init__(self, callbacks): def __init__(self, callbacks, serializer=None):
"""Initialize the rpc dispatcher. """Initialize the rpc dispatcher.
:param callbacks: List of proxy objects that are an instance :param callbacks: List of proxy objects that are an instance
of a class with rpc methods exposed. Each proxy of a class with rpc methods exposed. Each proxy
object should have an RPC_API_VERSION attribute. object should have an RPC_API_VERSION attribute.
:param serializer: The Serializer object that will be used to
deserialize arguments before the method call and
to serialize the result after it returns.
""" """
self.callbacks = callbacks self.callbacks = callbacks
if serializer is None:
serializer = rpc_serializer.NoOpSerializer()
self.serializer = serializer
super(RpcDispatcher, self).__init__() super(RpcDispatcher, self).__init__()
def _deserialize_args(self, context, kwargs):
"""Helper method called to deserialize args before dispatch.
This calls our serializer on each argument, returning a new set of
args that have been deserialized.
:param context: The request context
:param kwargs: The arguments to be deserialized
:returns: A new set of deserialized args
"""
new_kwargs = dict()
for argname, arg in kwargs.iteritems():
new_kwargs[argname] = self.serializer.deserialize_entity(context,
arg)
return new_kwargs
def dispatch(self, ctxt, version, method, namespace, **kwargs): def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version. """Dispatch a message based on a requested version.
@ -145,7 +168,9 @@ class RpcDispatcher(object):
if not hasattr(proxyobj, method): if not hasattr(proxyobj, method):
continue continue
if is_compatible: if is_compatible:
return getattr(proxyobj, method)(ctxt, **kwargs) kwargs = self._deserialize_args(ctxt, kwargs)
result = getattr(proxyobj, method)(ctxt, **kwargs)
return self.serializer.serialize_entity(ctxt, result)
if had_compatible: if had_compatible:
raise AttributeError("No such RPC function '%s'" % method) raise AttributeError("No such RPC function '%s'" % method)

View File

@ -331,15 +331,16 @@ class Connection(object):
def reconnect(self): def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues""" """Handles reconnecting and re-establishing sessions and queues"""
if self.connection.opened():
try:
self.connection.close()
except qpid_exceptions.ConnectionError:
pass
attempt = 0 attempt = 0
delay = 1 delay = 1
while True: while True:
# Close the session if necessary
if self.connection.opened():
try:
self.connection.close()
except qpid_exceptions.ConnectionError:
pass
broker = self.brokers[attempt % len(self.brokers)] broker = self.brokers[attempt % len(self.brokers)]
attempt += 1 attempt += 1
@ -374,7 +375,7 @@ class Connection(object):
try: try:
return method(*args, **kwargs) return method(*args, **kwargs)
except (qpid_exceptions.Empty, except (qpid_exceptions.Empty,
qpid_exceptions.ConnectionError), e: qpid_exceptions.ConnectionError) as e:
if error_callback: if error_callback:
error_callback(e) error_callback(e)
self.reconnect() self.reconnect()

View File

@ -180,7 +180,7 @@ class ZmqSocket(object):
return return
# We must unsubscribe, or we'll leak descriptors. # We must unsubscribe, or we'll leak descriptors.
if len(self.subscriptions) > 0: if self.subscriptions:
for f in self.subscriptions: for f in self.subscriptions:
try: try:
self.sock.setsockopt(zmq.UNSUBSCRIBE, f) self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
@ -763,7 +763,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
LOG.debug(_("Sending message(s) to: %s"), queues) LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results # Don't stack if we have no matchmaker results
if len(queues) == 0: if not queues:
LOG.warn(_("No matchmaker results. Not casting.")) LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle # While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie. # this exception and a timeout isn't too big a lie.
@ -846,6 +846,11 @@ def _get_ctxt():
def _get_matchmaker(*args, **kwargs): def _get_matchmaker(*args, **kwargs):
global matchmaker global matchmaker
if not matchmaker: if not matchmaker:
matchmaker = importutils.import_object( mm = CONF.rpc_zmq_matchmaker
CONF.rpc_zmq_matchmaker, *args, **kwargs) if mm.endswith('matchmaker.MatchMakerRing'):
mm.replace('matchmaker', 'matchmaker_ring')
LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
' %(new)s instead') % dict(
orig=CONF.rpc_zmq_matchmaker, new=mm))
matchmaker = importutils.import_object(mm, *args, **kwargs)
return matchmaker return matchmaker

View File

@ -19,8 +19,6 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
""" """
import contextlib import contextlib
import itertools
import json
import eventlet import eventlet
from oslo.config import cfg from oslo.config import cfg
@ -30,10 +28,6 @@ from ceilometer.openstack.common import log as logging
matchmaker_opts = [ matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('matchmaker_ringfile',
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
cfg.IntOpt('matchmaker_heartbeat_freq', cfg.IntOpt('matchmaker_heartbeat_freq',
default=300, default=300,
help='Heartbeat frequency'), help='Heartbeat frequency'),
@ -236,7 +230,8 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
self.hosts.discard(host) self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host))) self.backend_unregister(key, '.'.join((key, host)))
LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host))) LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
{'key': key, 'host': host})
def start_heartbeat(self): def start_heartbeat(self):
""" """
@ -245,7 +240,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
yielding for CONF.matchmaker_heartbeat_freq seconds yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations. between iterations.
""" """
if len(self.hosts) == 0: if not self.hosts:
raise MatchMakerException( raise MatchMakerException(
_("Register before starting heartbeat.")) _("Register before starting heartbeat."))
@ -304,67 +299,6 @@ class StubExchange(Exchange):
return [(key, None)] return [(key, None)]
class RingExchange(Exchange):
"""
Match Maker where hosts are loaded from a static file containing
a hashmap (JSON formatted).
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
"""
def __init__(self, ring=None):
super(RingExchange, self).__init__()
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ringfile, 'r')
self.ring = json.load(fh)
fh.close()
self.ring0 = {}
for k in self.ring.keys():
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
if key in self.ring0:
return True
return False
class RoundRobinRingExchange(RingExchange):
"""A Topic Exchange based on a hashmap."""
def __init__(self, ring=None):
super(RoundRobinRingExchange, self).__init__(ring)
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
)
return []
host = next(self.ring0[key])
return [(key + '.' + host, host)]
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(FanoutRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "fanout~", strip it for lookup.
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class LocalhostExchange(Exchange): class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local.""" """Exchange where all direct topics are local."""
def __init__(self, host='localhost'): def __init__(self, host='localhost'):
@ -388,17 +322,6 @@ class DirectExchange(Exchange):
return [(key, e)] return [(key, e)]
class MatchMakerRing(MatchMakerBase):
"""
Match Maker where hosts are loaded from a static hashmap.
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
class MatchMakerLocalhost(MatchMakerBase): class MatchMakerLocalhost(MatchMakerBase):
""" """
Match Maker where all bare topics resolve to localhost. Match Maker where all bare topics resolve to localhost.

View File

@ -0,0 +1,114 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011-2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import itertools
import json
from oslo.config import cfg
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.rpc import matchmaker as mm
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('ringfile',
deprecated_name='matchmaker_ringfile',
deprecated_group='DEFAULT',
default='/etc/oslo/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
]
CONF = cfg.CONF
CONF.register_opts(matchmaker_opts, 'matchmaker_ring')
LOG = logging.getLogger(__name__)
class RingExchange(mm.Exchange):
"""
Match Maker where hosts are loaded from a static file containing
a hashmap (JSON formatted).
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
"""
def __init__(self, ring=None):
super(RingExchange, self).__init__()
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ring.ringfile, 'r')
self.ring = json.load(fh)
fh.close()
self.ring0 = {}
for k in self.ring.keys():
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
if key in self.ring0:
return True
return False
class RoundRobinRingExchange(RingExchange):
"""A Topic Exchange based on a hashmap."""
def __init__(self, ring=None):
super(RoundRobinRingExchange, self).__init__(ring)
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
)
return []
host = next(self.ring0[key])
return [(key + '.' + host, host)]
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(FanoutRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "fanout~", strip it for lookup.
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class MatchMakerRing(mm.MatchMakerBase):
"""
Match Maker where hosts are loaded from a static hashmap.
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(mm.DirectBinding(), mm.DirectExchange())
self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring))

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Red Hat, Inc. # Copyright 2012-2013 Red Hat, Inc.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -23,6 +23,8 @@ For more information about rpc API version numbers, see:
from ceilometer.openstack.common import rpc from ceilometer.openstack.common import rpc
from ceilometer.openstack.common.rpc import common as rpc_common
from ceilometer.openstack.common.rpc import serializer as rpc_serializer
class RpcProxy(object): class RpcProxy(object):
@ -34,16 +36,28 @@ class RpcProxy(object):
rpc API. rpc API.
""" """
def __init__(self, topic, default_version): # The default namespace, which can be overriden in a subclass.
RPC_API_NAMESPACE = None
def __init__(self, topic, default_version, version_cap=None,
serializer=None):
"""Initialize an RpcProxy. """Initialize an RpcProxy.
:param topic: The topic to use for all messages. :param topic: The topic to use for all messages.
:param default_version: The default API version to request in all :param default_version: The default API version to request in all
outgoing messages. This can be overridden on a per-message outgoing messages. This can be overridden on a per-message
basis. basis.
:param version_cap: Optionally cap the maximum version used for sent
messages.
:param serializer: Optionaly (de-)serialize entities with a
provided helper.
""" """
self.topic = topic self.topic = topic
self.default_version = default_version self.default_version = default_version
self.version_cap = version_cap
if serializer is None:
serializer = rpc_serializer.NoOpSerializer()
self.serializer = serializer
super(RpcProxy, self).__init__() super(RpcProxy, self).__init__()
def _set_version(self, msg, vers): def _set_version(self, msg, vers):
@ -52,7 +66,11 @@ class RpcProxy(object):
:param msg: The message having a version added to it. :param msg: The message having a version added to it.
:param vers: The version number to add to the message. :param vers: The version number to add to the message.
""" """
msg['version'] = vers if vers else self.default_version v = vers if vers else self.default_version
if (self.version_cap and not
rpc_common.version_is_compatible(self.version_cap, v)):
raise rpc_common.RpcVersionCapError(version=self.version_cap)
msg['version'] = v
def _get_topic(self, topic): def _get_topic(self, topic):
"""Return the topic to use for a message.""" """Return the topic to use for a message."""
@ -62,9 +80,25 @@ class RpcProxy(object):
def make_namespaced_msg(method, namespace, **kwargs): def make_namespaced_msg(method, namespace, **kwargs):
return {'method': method, 'namespace': namespace, 'args': kwargs} return {'method': method, 'namespace': namespace, 'args': kwargs}
@staticmethod def make_msg(self, method, **kwargs):
def make_msg(method, **kwargs): return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
return RpcProxy.make_namespaced_msg(method, None, **kwargs) **kwargs)
def _serialize_msg_args(self, context, kwargs):
"""Helper method called to serialize message arguments.
This calls our serializer on each argument, returning a new
set of args that have been serialized.
:param context: The request context
:param kwargs: The arguments to serialize
:returns: A new set of serialized arguments
"""
new_kwargs = dict()
for argname, arg in kwargs.iteritems():
new_kwargs[argname] = self.serializer.serialize_entity(context,
arg)
return new_kwargs
def call(self, context, msg, topic=None, version=None, timeout=None): def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method. """rpc.call() a remote method.
@ -81,9 +115,11 @@ class RpcProxy(object):
:returns: The return value from the remote method. :returns: The return value from the remote method.
""" """
self._set_version(msg, version) self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic) real_topic = self._get_topic(topic)
try: try:
return rpc.call(context, real_topic, msg, timeout) result = rpc.call(context, real_topic, msg, timeout)
return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc: except rpc.common.Timeout as exc:
raise rpc.common.Timeout( raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method')) exc.info, real_topic, msg.get('method'))
@ -104,9 +140,11 @@ class RpcProxy(object):
from the remote method as they arrive. from the remote method as they arrive.
""" """
self._set_version(msg, version) self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic) real_topic = self._get_topic(topic)
try: try:
return rpc.multicall(context, real_topic, msg, timeout) result = rpc.multicall(context, real_topic, msg, timeout)
return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc: except rpc.common.Timeout as exc:
raise rpc.common.Timeout( raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method')) exc.info, real_topic, msg.get('method'))
@ -124,6 +162,7 @@ class RpcProxy(object):
remote method. remote method.
""" """
self._set_version(msg, version) self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast(context, self._get_topic(topic), msg) rpc.cast(context, self._get_topic(topic), msg)
def fanout_cast(self, context, msg, topic=None, version=None): def fanout_cast(self, context, msg, topic=None, version=None):
@ -139,6 +178,7 @@ class RpcProxy(object):
from the remote method. from the remote method.
""" """
self._set_version(msg, version) self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast(context, self._get_topic(topic), msg) rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None, def cast_to_server(self, context, server_params, msg, topic=None,
@ -157,6 +197,7 @@ class RpcProxy(object):
return values. return values.
""" """
self._set_version(msg, version) self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
def fanout_cast_to_server(self, context, server_params, msg, topic=None, def fanout_cast_to_server(self, context, server_params, msg, topic=None,
@ -175,5 +216,6 @@ class RpcProxy(object):
return values. return values.
""" """
self._set_version(msg, version) self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast_to_server(context, server_params, rpc.fanout_cast_to_server(context, server_params,
self._get_topic(topic), msg) self._get_topic(topic), msg)

View File

@ -0,0 +1,52 @@
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Provides the definition of an RPC serialization handler"""
import abc
class Serializer(object):
"""Generic (de-)serialization definition base class"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def serialize_entity(self, context, entity):
"""Serialize something to primitive form.
:param context: Security context
:param entity: Entity to be serialized
:returns: Serialized form of entity
"""
pass
@abc.abstractmethod
def deserialize_entity(self, context, entity):
"""Deserialize something from primitive form.
:param context: Security context
:param entity: Primitive to be deserialized
:returns: Deserialized form of entity
"""
pass
class NoOpSerializer(Serializer):
"""A serializer that does nothing"""
def serialize_entity(self, context, entity):
return entity
def deserialize_entity(self, context, entity):
return entity

View File

@ -52,7 +52,7 @@ class Launcher(object):
""" """
self._services = threadgroup.ThreadGroup() self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled() self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
@staticmethod @staticmethod
def run_service(service): def run_service(service):
@ -72,6 +72,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
service.backdoor_port = self.backdoor_port
self._services.add_thread(self.run_service, service) self._services.add_thread(self.run_service, service)
def stop(self): def stop(self):

View File

@ -61,6 +61,13 @@ class ThreadGroup(object):
self.threads = [] self.threads = []
self.timers = [] self.timers = []
def add_dynamic_timer(self, callback, initial_delay=None,
periodic_interval_max=None, *args, **kwargs):
timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
timer.start(initial_delay=initial_delay,
periodic_interval_max=periodic_interval_max)
self.timers.append(timer)
def add_timer(self, interval, callback, initial_delay=None, def add_timer(self, interval, callback, initial_delay=None,
*args, **kwargs): *args, **kwargs):
pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs) pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)

View File

@ -143,48 +143,12 @@
# Options defined in ceilometer.openstack.common.db.sqlalchemy.session # Options defined in ceilometer.openstack.common.db.sqlalchemy.session
# #
# The SQLAlchemy connection string used to connect to the
# database (string value)
#sql_connection=sqlite:////ceilometer/openstack/common/db/$sqlite_db
# the filename to use with sqlite (string value) # the filename to use with sqlite (string value)
#sqlite_db=ceilometer.sqlite #sqlite_db=ceilometer.sqlite
# timeout before idle sql connections are reaped (integer # If true, use synchronous mode for sqlite (boolean value)
# value)
#sql_idle_timeout=3600
# If passed, use synchronous mode for sqlite (boolean value)
#sqlite_synchronous=true #sqlite_synchronous=true
# Minimum number of SQL connections to keep open in a pool
# (integer value)
#sql_min_pool_size=1
# Maximum number of SQL connections to keep open in a pool
# (integer value)
#sql_max_pool_size=5
# maximum db connection retries during startup. (setting -1
# implies an infinite retry count) (integer value)
#sql_max_retries=10
# interval between retries of opening a sql connection
# (integer value)
#sql_retry_interval=10
# If set, use this value for max_overflow with sqlalchemy
# (integer value)
#sql_max_overflow=<None>
# Verbosity of SQL debugging information. 0=None,
# 100=Everything (integer value)
#sql_connection_debug=0
# Add python stack traces to SQL as comment strings (boolean
# value)
#sql_connection_trace=false
# #
# Options defined in ceilometer.openstack.common.eventlet_backdoor # Options defined in ceilometer.openstack.common.eventlet_backdoor
@ -194,6 +158,18 @@
#backdoor_port=<None> #backdoor_port=<None>
#
# Options defined in ceilometer.openstack.common.lockutils
#
# Whether to disable inter-process locks (boolean value)
#disable_process_locking=false
# Directory to use for lock files. Default to a temp directory
# (string value)
#lock_path=<None>
# #
# Options defined in ceilometer.openstack.common.log # Options defined in ceilometer.openstack.common.log
# #
@ -250,9 +226,11 @@
#log_config=<None> #log_config=<None>
# A logging.Formatter log message format string which may use # A logging.Formatter log message format string which may use
# any of the available logging.LogRecord attributes. Default: # any of the available logging.LogRecord attributes. This
# %(default)s (string value) # option is deprecated. Please use
#log_format=%(asctime)s %(levelname)8s [%(name)s] %(message)s # logging_context_format_string and
# logging_default_format_string instead. (string value)
#log_format=<None>
# Format string for %%(asctime)s in log records. Default: # Format string for %%(asctime)s in log records. Default:
# %(default)s (string value) # %(default)s (string value)
@ -474,16 +452,13 @@
# Name of this node. Must be a valid hostname, FQDN, or IP # Name of this node. Must be a valid hostname, FQDN, or IP
# address. Must match "host" option, if running Nova. (string # address. Must match "host" option, if running Nova. (string
# value) # value)
#rpc_zmq_host=nova #rpc_zmq_host=dex
# #
# Options defined in ceilometer.openstack.common.rpc.matchmaker # Options defined in ceilometer.openstack.common.rpc.matchmaker
# #
# Matchmaker ring file (JSON) (string value)
#matchmaker_ringfile=/etc/nova/matchmaker_ring.json
# Heartbeat frequency (integer value) # Heartbeat frequency (integer value)
#matchmaker_heartbeat_freq=300 #matchmaker_heartbeat_freq=300
@ -529,6 +504,61 @@
#port=4952 #port=4952
[database]
#
# Options defined in ceilometer.openstack.common.db.api
#
# The backend to use for db (string value)
#backend=sqlalchemy
# Enable the experimental use of thread pooling for all DB API
# calls (boolean value)
#use_tpool=false
#
# Options defined in ceilometer.openstack.common.db.sqlalchemy.session
#
# The SQLAlchemy connection string used to connect to the
# database (string value)
#connection=sqlite:////common/db/$sqlite_db
# timeout before idle sql connections are reaped (integer
# value)
#idle_timeout=3600
# Minimum number of SQL connections to keep open in a pool
# (integer value)
#min_pool_size=1
# Maximum number of SQL connections to keep open in a pool
# (integer value)
#max_pool_size=5
# maximum db connection retries during startup. (setting -1
# implies an infinite retry count) (integer value)
#max_retries=10
# interval between retries of opening a sql connection
# (integer value)
#retry_interval=10
# If set, use this value for max_overflow with sqlalchemy
# (integer value)
#max_overflow=<None>
# Verbosity of SQL debugging information. 0=None,
# 100=Everything (integer value)
#connection_debug=0
# Add python stack traces to SQL as comment strings (boolean
# value)
#connection_trace=false
[publisher_meter] [publisher_meter]
# #
@ -586,4 +616,14 @@
#udp_port=4952 #udp_port=4952
# Total option count: 115 [matchmaker_ring]
#
# Options defined in ceilometer.openstack.common.rpc.matchmaker_ring
#
# Matchmaker ring file (JSON) (string value)
#ringfile=/etc/oslo/matchmaker_ring.json
# Total option count: 119

View File

@ -17,4 +17,5 @@ module=rpc
module=service module=service
module=threadgroup module=threadgroup
module=timeutils module=timeutils
module=config
base=ceilometer base=ceilometer

View File

@ -21,5 +21,5 @@ FILES=$(find ceilometer -type f -name "*.py" ! -path "ceilometer/tests/*" -exec
grep -l "Opt(" {} \; | sort -u) grep -l "Opt(" {} \; | sort -u)
PYTHONPATH=./:${PYTHONPATH} \ PYTHONPATH=./:${PYTHONPATH} \
python $(dirname "$0")/extract_opts.py ${FILES} > \ python $(dirname "$0")/../../ceilometer/openstack/common/config/generator.py ${FILES} > \
etc/ceilometer/ceilometer.conf.sample etc/ceilometer/ceilometer.conf.sample