Sync with openstack/oslo-incubator

Revision 234f64d608266f43d8856ff98c89ceba6699d752

Excludes openstack.common.policy

Change-Id: I06d15da024df08cae25cf89e5f1c37520a5f6ac5
This commit is contained in:
Kiall Mac Innes
2014-04-10 21:15:21 +01:00
parent a9a30bd130
commit 2617644513
30 changed files with 743 additions and 314 deletions
+5 -1
View File
@@ -1,4 +1,8 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ ./ $LISTOPT $IDOPTION
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-160} \
${PYTHON:-python} -m subunit.run discover -t ./ ./designate/tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list
+17
View File
@@ -0,0 +1,17 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox'))
+12 -1
View File
@@ -25,7 +25,7 @@ import uuid
def generate_request_id():
return 'req-%s' % str(uuid.uuid4())
return b'req-' + str(uuid.uuid4()).encode('ascii')
class RequestContext(object):
@@ -98,3 +98,14 @@ def get_context_from_function_and_args(function, args, kwargs):
return arg
return None
def is_user_context(context):
"""Indicates if the request context is a normal user."""
if not context:
return False
if context.is_admin:
return False
if not context.user_id or not context.project_id:
return False
return True
+29 -14
View File
@@ -22,10 +22,11 @@ API methods.
import functools
import logging
import threading
import time
from designate.openstack.common.db import exception
from designate.openstack.common.gettextutils import _ # noqa
from designate.openstack.common.gettextutils import _LE
from designate.openstack.common import importutils
@@ -68,11 +69,11 @@ class wrap_db_retry(object):
return f(*args, **kwargs)
except exception.DBConnectionError as e:
if remaining == 0:
LOG.exception(_('DB exceeded retry limit.'))
LOG.exception(_LE('DB exceeded retry limit.'))
raise exception.DBError(e)
if remaining != -1:
remaining -= 1
LOG.exception(_('DB connection error.'))
LOG.exception(_LE('DB connection error.'))
# NOTE(vsergeyev): We are using patched time module, so
# this effectively yields the execution
# context to another green thread.
@@ -86,8 +87,9 @@ class wrap_db_retry(object):
class DBAPI(object):
def __init__(self, backend_name, backend_mapping=None, **kwargs):
"""Initialize the choosen DB API backend.
def __init__(self, backend_name, backend_mapping=None, lazy=False,
**kwargs):
"""Initialize the chosen DB API backend.
:param backend_name: name of the backend to load
:type backend_name: str
@@ -95,6 +97,9 @@ class DBAPI(object):
:param backend_mapping: backend name -> module/class to load mapping
:type backend_mapping: dict
:param lazy: load the DB backend lazily on the first DB API method call
:type lazy: bool
Keyword arguments:
:keyword use_db_reconnect: retry DB transactions on disconnect or not
@@ -114,14 +119,13 @@ class DBAPI(object):
"""
if backend_mapping is None:
backend_mapping = {}
self._backend = None
self._backend_name = backend_name
self._backend_mapping = backend_mapping or {}
self._lock = threading.Lock()
# Import the untranslated name if we don't have a
# mapping.
backend_path = backend_mapping.get(backend_name, backend_name)
backend_mod = importutils.import_module(backend_path)
self.__backend = backend_mod.get_backend()
if not lazy:
self._load_backend()
self.use_db_reconnect = kwargs.get('use_db_reconnect', False)
self.retry_interval = kwargs.get('retry_interval', 1)
@@ -129,9 +133,20 @@ class DBAPI(object):
self.max_retry_interval = kwargs.get('max_retry_interval', 10)
self.max_retries = kwargs.get('max_retries', 20)
def __getattr__(self, key):
attr = getattr(self.__backend, key)
def _load_backend(self):
with self._lock:
if not self._backend:
# Import the untranslated name if we don't have a mapping
backend_path = self._backend_mapping.get(self._backend_name,
self._backend_name)
backend_mod = importutils.import_module(backend_path)
self._backend = backend_mod.get_backend()
def __getattr__(self, key):
if not self._backend:
self._load_backend()
attr = getattr(self._backend, key)
if not hasattr(attr, '__call__'):
return attr
# NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry
+12 -18
View File
@@ -11,30 +11,25 @@
# under the License.
import copy
import os
from oslo.config import cfg
sqlite_db_opts = [
database_opts = [
cfg.StrOpt('sqlite_db',
deprecated_group='DEFAULT',
default='designate.sqlite',
help='The file name to use with SQLite'),
cfg.BoolOpt('sqlite_synchronous',
deprecated_group='DEFAULT',
default=True,
help='If True, SQLite uses synchronous mode'),
]
database_opts = [
cfg.StrOpt('backend',
default='sqlalchemy',
deprecated_name='db_backend',
deprecated_group='DEFAULT',
help='The backend to use for db'),
cfg.StrOpt('connection',
default='sqlite:///' +
os.path.abspath(os.path.join(os.path.dirname(__file__),
'../', '$sqlite_db')),
help='The SQLAlchemy connection string used to connect to the '
'database',
secret=True,
@@ -44,6 +39,13 @@ database_opts = [
group='DATABASE'),
cfg.DeprecatedOpt('connection',
group='sql'), ]),
cfg.StrOpt('mysql_sql_mode',
default='TRADITIONAL',
help='The SQL mode to be used for MySQL sessions. '
'This option, including the default, overrides any '
'server-set SQL mode. To use whatever SQL mode '
'is set by the server configuration, '
'set this to no value. Example: mysql_sql_mode='),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
@@ -129,7 +131,6 @@ database_opts = [
]
CONF = cfg.CONF
CONF.register_opts(sqlite_db_opts)
CONF.register_opts(database_opts, 'database')
@@ -137,8 +138,7 @@ def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
max_overflow=None, pool_timeout=None):
"""Set defaults for configuration variables."""
cfg.set_defaults(database_opts,
connection=sql_connection)
cfg.set_defaults(sqlite_db_opts,
connection=sql_connection,
sqlite_db=sqlite_db)
# Update the QueuePool defaults
if max_pool_size is not None:
@@ -152,12 +152,6 @@ def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
pool_timeout=pool_timeout)
_opts = [
(None, sqlite_db_opts),
('database', database_opts),
]
def list_opts():
"""Returns a list of oslo.config options available in the library.
@@ -174,4 +168,4 @@ def list_opts():
:returns: a list of (group_name, opts) tuples
"""
return [(g, copy.deepcopy(o)) for g, o in _opts]
return [('database', copy.deepcopy(database_opts))]
@@ -168,7 +168,7 @@ def patch_migrate():
sqlite.SQLiteConstraintGenerator)
def db_sync(engine, abs_path, version=None, init_version=0):
def db_sync(engine, abs_path, version=None, init_version=0, sanity_check=True):
"""Upgrade or downgrade a database.
Function runs the upgrade() or downgrade() functions in change scripts.
@@ -179,7 +179,9 @@ def db_sync(engine, abs_path, version=None, init_version=0):
If None - database will update to the latest
available version.
:param init_version: Initial database version
:param sanity_check: Require schema sanity checking for all tables
"""
if version is not None:
try:
version = int(version)
@@ -189,7 +191,8 @@ def db_sync(engine, abs_path, version=None, init_version=0):
current_version = db_version(engine, abs_path, init_version)
repository = _find_migrate_repo(abs_path)
_db_schema_sanity_check(engine)
if sanity_check:
_db_schema_sanity_check(engine)
if version is None or version > current_version:
return versioning_api.upgrade(engine, repository, version)
else:
@@ -210,8 +213,15 @@ def _db_schema_sanity_check(engine):
'where TABLE_SCHEMA=%s and '
'TABLE_COLLATION NOT LIKE "%%utf8%%"')
table_names = [res[0] for res in engine.execute(onlyutf8_sql,
engine.url.database)]
# NOTE(morganfainberg): exclude the sqlalchemy-migrate and alembic
# versioning tables from the tables we need to verify utf8 status on.
# Non-standard table names are not supported.
EXCLUDED_TABLES = ['migrate_version', 'alembic_version']
table_names = [res[0] for res in
engine.execute(onlyutf8_sql, engine.url.database) if
res[0].lower() not in EXCLUDED_TABLES]
if len(table_names) > 0:
raise ValueError(_('Tables "%s" have non utf8 collation, '
'please make sure all tables are CHARSET=utf8'
@@ -233,7 +243,7 @@ def db_version(engine, abs_path, init_version):
meta.reflect(bind=engine)
tables = meta.tables
if len(tables) == 0 or 'alembic_version' in tables:
db_version_control(abs_path, init_version)
db_version_control(engine, abs_path, version=init_version)
return versioning_api.db_version(engine, repository)
else:
raise exception.DbMigrationError(
@@ -29,7 +29,7 @@ from sqlalchemy.orm import object_mapper
from designate.openstack.common import timeutils
class ModelBase(object):
class ModelBase(six.Iterator):
"""Base class for models."""
__table_initialized__ = False
@@ -78,10 +78,14 @@ class ModelBase(object):
self._i = iter(columns)
return self
def next(self):
# In Python 3, __next__() has replaced next().
def __next__(self):
n = six.advance_iterator(self._i)
return n, getattr(self, n)
def next(self):
return self.__next__()
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in six.iteritems(values):
@@ -16,6 +16,7 @@
"""Provision test environment for specific DB backends"""
import argparse
import logging
import os
import random
import string
@@ -26,23 +27,12 @@ import sqlalchemy
from designate.openstack.common.db import exception as exc
SQL_CONNECTION = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION', 'sqlite://')
LOG = logging.getLogger(__name__)
def _gen_credentials(*names):
"""Generate credentials."""
auth_dict = {}
for name in names:
val = ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10))
auth_dict[name] = val
return auth_dict
def _get_engine(uri=SQL_CONNECTION):
def get_engine(uri):
"""Engine creation
By default the uri is SQL_CONNECTION which is admin credentials.
Call the function without arguments to get admin connection. Admin
connection required to create temporary user and database for each
particular test. Otherwise use existing connection to recreate connection
@@ -62,50 +52,43 @@ def _execute_sql(engine, sql, driver):
except sqlalchemy.exc.OperationalError:
msg = ('%s does not match database admin '
'credentials or database does not exist.')
raise exc.DBConnectionError(msg % SQL_CONNECTION)
LOG.exception(msg % engine.url)
raise exc.DBConnectionError(msg % engine.url)
def create_database(engine):
"""Provide temporary user and database for each particular test."""
driver = engine.name
auth = _gen_credentials('database', 'user', 'passwd')
sqls = {
'mysql': [
"drop database if exists %(database)s;",
"grant all on %(database)s.* to '%(user)s'@'localhost'"
" identified by '%(passwd)s';",
"create database %(database)s;",
],
'postgresql': [
"drop database if exists %(database)s;",
"drop user if exists %(user)s;",
"create user %(user)s with password '%(passwd)s';",
"create database %(database)s owner %(user)s;",
]
auth = {
'database': ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10)),
'user': engine.url.username,
'passwd': engine.url.password,
}
sqls = [
"drop database if exists %(database)s;",
"create database %(database)s;"
]
if driver == 'sqlite':
return 'sqlite:////tmp/%s' % auth['database']
try:
sql_rows = sqls[driver]
except KeyError:
elif driver in ['mysql', 'postgresql']:
sql_query = map(lambda x: x % auth, sqls)
_execute_sql(engine, sql_query, driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
sql_query = map(lambda x: x % auth, sql_rows)
_execute_sql(engine, sql_query, driver)
params = auth.copy()
params['backend'] = driver
return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
def drop_database(engine, current_uri):
def drop_database(admin_engine, current_uri):
"""Drop temporary database and user after each particular test."""
engine = _get_engine(current_uri)
admin_engine = _get_engine()
engine = get_engine(current_uri)
driver = engine.name
auth = {'database': engine.url.database, 'user': engine.url.username}
@@ -114,26 +97,11 @@ def drop_database(engine, current_uri):
os.remove(auth['database'])
except OSError:
pass
return
sqls = {
'mysql': [
"drop database if exists %(database)s;",
"drop user '%(user)s'@'localhost';",
],
'postgresql': [
"drop database if exists %(database)s;",
"drop user if exists %(user)s;",
]
}
try:
sql_rows = sqls[driver]
except KeyError:
elif driver in ['mysql', 'postgresql']:
sql = "drop database if exists %(database)s;"
_execute_sql(admin_engine, [sql % auth], driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
sql_query = map(lambda x: x % auth, sql_rows)
_execute_sql(admin_engine, sql_query, driver)
def main():
@@ -172,7 +140,9 @@ def main():
args = parser.parse_args()
engine = _get_engine()
connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
'sqlite://')
engine = get_engine(connection_string)
which = args.which
if which == "create":
@@ -291,7 +291,7 @@ from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column
from designate.openstack.common.db import exception
from designate.openstack.common.gettextutils import _
from designate.openstack.common.gettextutils import _LE, _LW
from designate.openstack.common import timeutils
@@ -332,11 +332,20 @@ class SqliteForeignKeysListener(PoolListener):
# 'c1'")
# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined
# with -' for key 'name_of_our_constraint'")
#
# ibm_db_sa:
# N columns - (IntegrityError) SQL0803N One or more values in the INSERT
# statement, UPDATE statement, or foreign key update caused by a
# DELETE statement are not valid because the primary key, unique
# constraint or unique index identified by "2" constrains table
# "NOVA.KEY_PAIRS" from having duplicate values for the index
# key.
_DUP_KEY_RE_DB = {
"sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),
"postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),
"mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),)
"mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),),
"ibm_db_sa": (re.compile(r"^.*SQL0803N.*$"),),
}
@@ -358,7 +367,7 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name):
return [columns]
return columns[len(uniqbase):].split("0")[1:]
if engine_name not in ["mysql", "sqlite", "postgresql"]:
if engine_name not in ["ibm_db_sa", "mysql", "sqlite", "postgresql"]:
return
# FIXME(johannes): The usage of the .message attribute has been
@@ -373,7 +382,12 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name):
else:
return
columns = match.group(1)
# NOTE(mriedem): The ibm_db_sa integrity error message doesn't provide the
# columns so we have to omit that from the DBDuplicateEntry error.
columns = ''
if engine_name != 'ibm_db_sa':
columns = match.group(1)
if engine_name == "sqlite":
columns = [c.split('.')[-1] for c in columns.strip().split(", ")]
@@ -414,13 +428,14 @@ def _raise_if_deadlock_error(operational_error, engine_name):
def _wrap_db_error(f):
#TODO(rpodolyaka): in a subsequent commit make this a class decorator to
# ensure it can only applied to Session subclasses instances (as we use
# Session instance bind attribute below)
@functools.wraps(f)
def _wrap(self, *args, **kwargs):
try:
assert issubclass(
self.__class__, sqlalchemy.orm.session.Session
), ('_wrap_db_error() can only be applied to methods of '
'subclasses of sqlalchemy.orm.session.Session.')
return f(self, *args, **kwargs)
except UnicodeEncodeError:
raise exception.DBInvalidUnicodeParameter()
@@ -442,7 +457,7 @@ def _wrap_db_error(f):
_raise_if_duplicate_entry_error(e, self.bind.dialect.name)
raise exception.DBError(e)
except Exception as e:
LOG.exception(_('DB exception wrapped.'))
LOG.exception(_LE('DB exception wrapped.'))
raise exception.DBError(e)
return _wrap
@@ -488,22 +503,78 @@ def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
cursor.execute(ping_sql)
except Exception as ex:
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
msg = _('Database server has gone away: %s') % ex
msg = _LW('Database server has gone away: %s') % ex
LOG.warning(msg)
# if the database server has gone away, all connections in the pool
# have become invalid and we can safely close all of them here,
# rather than waste time on checking of every single connection
engine.dispose()
# this will be handled by SQLAlchemy and will force it to create
# a new connection and retry the original action
raise sqla_exc.DisconnectionError(msg)
else:
raise
def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy):
"""Set engine mode to 'traditional'.
def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None):
"""Set the sql_mode session variable.
Required to prevent silent truncates at insert or update operations
under MySQL. By default MySQL truncates inserted string if it longer
than a declared field just with warning. That is fraught with data
corruption.
MySQL supports several server modes. The default is None, but sessions
may choose to enable server modes like TRADITIONAL, ANSI,
several STRICT_* modes and others.
Note: passing in '' (empty string) for sql_mode clears
the SQL mode for the session, overriding a potentially set
server default.
"""
dbapi_con.cursor().execute("SET SESSION sql_mode = TRADITIONAL;")
cursor = dbapi_con.cursor()
cursor.execute("SET SESSION sql_mode = %s", [sql_mode])
def _mysql_get_effective_sql_mode(engine):
"""Returns the effective SQL mode for connections from the engine pool.
Returns ``None`` if the mode isn't available, otherwise returns the mode.
"""
# Get the real effective SQL mode. Even when unset by
# our own config, the server may still be operating in a specific
# SQL mode as set by the server configuration.
# Also note that the checkout listener will be called on execute to
# set the mode if it's registered.
row = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone()
if row is None:
return
return row[1]
def _mysql_check_effective_sql_mode(engine):
"""Logs a message based on the effective SQL mode for MySQL connections."""
realmode = _mysql_get_effective_sql_mode(engine)
if realmode is None:
LOG.warning(_LW('Unable to detect effective SQL mode'))
return
LOG.debug('MySQL server mode set to %s', realmode)
# 'TRADITIONAL' mode enables several other modes, so
# we need a substring match here
if not ('TRADITIONAL' in realmode.upper() or
'STRICT_ALL_TABLES' in realmode.upper()):
LOG.warning(_LW("MySQL SQL mode is '%s', "
"consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
realmode)
def _mysql_set_mode_callback(engine, sql_mode):
if sql_mode is not None:
mode_callback = functools.partial(_set_session_sql_mode,
sql_mode=sql_mode)
sqlalchemy.event.listen(engine, 'connect', mode_callback)
_mysql_check_effective_sql_mode(engine)
def _is_db_connection_error(args):
@@ -530,8 +601,8 @@ def _raise_if_db_connection_lost(error, engine):
raise exception.DBConnectionError(error)
def create_engine(sql_connection, sqlite_fk=False,
mysql_traditional_mode=False, idle_timeout=3600,
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
idle_timeout=3600,
connection_debug=0, max_pool_size=None, max_overflow=None,
pool_timeout=None, sqlite_synchronous=True,
connection_trace=False, max_retries=10, retry_interval=10):
@@ -541,15 +612,18 @@ def create_engine(sql_connection, sqlite_fk=False,
engine_args = {
"pool_recycle": idle_timeout,
"echo": False,
'convert_unicode': True,
}
# Map our SQL debug level to SQLAlchemy's options
logger = logging.getLogger('sqlalchemy.engine')
# Map SQL debug level to Python log level
if connection_debug >= 100:
engine_args['echo'] = 'debug'
logger.setLevel(logging.DEBUG)
elif connection_debug >= 50:
engine_args['echo'] = True
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.WARNING)
if "sqlite" in connection_dict.drivername:
if sqlite_fk:
@@ -572,18 +646,11 @@ def create_engine(sql_connection, sqlite_fk=False,
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
if engine.name in ['mysql', 'ibm_db_sa']:
callback = functools.partial(_ping_listener, engine)
sqlalchemy.event.listen(engine, 'checkout', callback)
ping_callback = functools.partial(_ping_listener, engine)
sqlalchemy.event.listen(engine, 'checkout', ping_callback)
if engine.name == 'mysql':
if mysql_traditional_mode:
sqlalchemy.event.listen(engine, 'checkout',
_set_mode_traditional)
else:
LOG.warning(_("This application has not enabled MySQL "
"traditional mode, which means silent "
"data corruption may occur. "
"Please encourage the application "
"developers to enable this mode."))
if mysql_sql_mode:
_mysql_set_mode_callback(engine, mysql_sql_mode)
elif 'sqlite' in connection_dict.drivername:
if not sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect',
@@ -603,7 +670,7 @@ def create_engine(sql_connection, sqlite_fk=False,
if remaining == -1:
remaining = 'infinite'
while True:
msg = _('SQL connection failed. %s attempts left.')
msg = _LW('SQL connection failed. %s attempts left.')
LOG.warning(msg % remaining)
if remaining != 'infinite':
remaining -= 1
@@ -714,25 +781,23 @@ class EngineFacade(object):
integrate engine/sessionmaker instances into your apps any way you like
(e.g. one might want to bind a session to a request context). Two important
things to remember:
1. An Engine instance is effectively a pool of DB connections, so it's
meant to be shared (and it's thread-safe).
2. A Session instance is not meant to be shared and represents a DB
transactional context (i.e. it's not thread-safe). sessionmaker is
a factory of sessions.
1. An Engine instance is effectively a pool of DB connections, so it's
meant to be shared (and it's thread-safe).
2. A Session instance is not meant to be shared and represents a DB
transactional context (i.e. it's not thread-safe). sessionmaker is
a factory of sessions.
"""
def __init__(self, sql_connection,
sqlite_fk=False, mysql_traditional_mode=False,
autocommit=True, expire_on_commit=False, **kwargs):
sqlite_fk=False, autocommit=True,
expire_on_commit=False, **kwargs):
"""Initialize engine and sessionmaker instances.
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param mysql_traditional_mode: enable traditional mode in MySQL
:type mysql_traditional_mode: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
@@ -741,6 +806,8 @@ class EngineFacade(object):
Keyword arguments:
:keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
(defaults to TRADITIONAL)
:keyword idle_timeout: timeout before idle sql connections are reaped
(defaults to 3600)
:keyword connection_debug: verbosity of SQL debugging information.
@@ -768,12 +835,12 @@ class EngineFacade(object):
self._engine = create_engine(
sql_connection=sql_connection,
sqlite_fk=sqlite_fk,
mysql_traditional_mode=mysql_traditional_mode,
mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
idle_timeout=kwargs.get('idle_timeout', 3600),
connection_debug=kwargs.get('connection_debug', 0),
max_pool_size=kwargs.get('max_pool_size', None),
max_overflow=kwargs.get('max_overflow', None),
pool_timeout=kwargs.get('pool_timeout', None),
max_pool_size=kwargs.get('max_pool_size'),
max_overflow=kwargs.get('max_overflow'),
pool_timeout=kwargs.get('pool_timeout'),
sqlite_synchronous=kwargs.get('sqlite_synchronous', True),
connection_trace=kwargs.get('connection_trace', False),
max_retries=kwargs.get('max_retries', 10),
@@ -807,3 +874,31 @@ class EngineFacade(object):
del kwargs[arg]
return self._session_maker(**kwargs)
@classmethod
def from_config(cls, connection_string, conf,
sqlite_fk=False, autocommit=True, expire_on_commit=False):
"""Initialize EngineFacade using oslo.config config instance options.
:param connection_string: SQLAlchemy connection string
:type connection_string: string
:param conf: oslo.config config instance
:type conf: oslo.config.cfg.ConfigOpts
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
"""
return cls(sql_connection=connection_string,
sqlite_fk=sqlite_fk,
autocommit=autocommit,
expire_on_commit=expire_on_commit,
**dict(conf.database.items()))
@@ -22,6 +22,7 @@ import six
from designate.openstack.common.db.sqlalchemy import session
from designate.openstack.common.db.sqlalchemy import utils
from designate.openstack.common.fixture import lockutils
from designate.openstack.common import test
@@ -120,6 +121,9 @@ class OpportunisticTestCase(DbTestCase):
FIXTURE = abc.abstractproperty(lambda: None)
def setUp(self):
# TODO(bnemec): Remove this once infra is ready for
# https://review.openstack.org/#/c/74963/ to merge.
self.useFixture(lockutils.LockFixture('opportunistic-db'))
credentials = {
'backend': self.FIXTURE.DRIVER,
'user': self.FIXTURE.USERNAME,
@@ -26,7 +26,7 @@ import sqlalchemy
import sqlalchemy.exc
from designate.openstack.common.db.sqlalchemy import utils
from designate.openstack.common.gettextutils import _
from designate.openstack.common.gettextutils import _LE
from designate.openstack.common import test
LOG = logging.getLogger(__name__)
@@ -60,10 +60,10 @@ def _set_db_lock(lock_path=None, lock_prefix=None):
path = lock_path or os.environ.get("DESIGNATE_LOCK_PATH")
lock = lockfile.FileLock(os.path.join(path, lock_prefix))
with lock:
LOG.debug(_('Got lock "%s"') % f.__name__)
LOG.debug('Got lock "%s"' % f.__name__)
return f(*args, **kwargs)
finally:
LOG.debug(_('Lock released "%s"') % f.__name__)
LOG.debug('Lock released "%s"' % f.__name__)
return wrapper
return decorator
@@ -264,6 +264,6 @@ class WalkVersionsMixin(object):
if check:
check(engine, data)
except Exception:
LOG.error("Failed to migrate to version %s on engine %s" %
LOG.error(_LE("Failed to migrate to version %s on engine %s") %
(version, engine))
raise
+111 -11
View File
@@ -19,7 +19,6 @@
import logging
import re
from migrate.changeset import UniqueConstraint
import sqlalchemy
from sqlalchemy import Boolean
from sqlalchemy import CheckConstraint
@@ -30,14 +29,16 @@ from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.types import NullType
from designate.openstack.common.gettextutils import _
from designate.openstack.common import context as request_context
from designate.openstack.common.db.sqlalchemy import models
from designate.openstack.common.gettextutils import _, _LI, _LW
from designate.openstack.common import timeutils
@@ -93,7 +94,7 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
if 'id' not in sort_keys:
# TODO(justinsb): If this ever gives a false-positive, check
# the actual primary key, rather than assuming its id
LOG.warning(_('Id not in sort_keys; is sort_keys unique?'))
LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?'))
assert(not (sort_dir and sort_dirs))
@@ -156,6 +157,98 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
return query
def _read_deleted_filter(query, db_model, read_deleted):
if 'deleted' not in db_model.__table__.columns:
raise ValueError(_("There is no `deleted` column in `%s` table. "
"Project doesn't use soft-deleted feature.")
% db_model.__name__)
default_deleted_value = db_model.__table__.c.deleted.default.arg
if read_deleted == 'no':
query = query.filter(db_model.deleted == default_deleted_value)
elif read_deleted == 'yes':
pass # omit the filter to include deleted and active
elif read_deleted == 'only':
query = query.filter(db_model.deleted != default_deleted_value)
else:
raise ValueError(_("Unrecognized read_deleted value '%s'")
% read_deleted)
return query
def _project_filter(query, db_model, context, project_only):
if project_only and 'project_id' not in db_model.__table__.columns:
raise ValueError(_("There is no `project_id` column in `%s` table.")
% db_model.__name__)
if request_context.is_user_context(context) and project_only:
if project_only == 'allow_none':
is_none = None
query = query.filter(or_(db_model.project_id == context.project_id,
db_model.project_id == is_none))
else:
query = query.filter(db_model.project_id == context.project_id)
return query
def model_query(context, model, session, args=None, project_only=False,
read_deleted=None):
"""Query helper that accounts for context's `read_deleted` field.
:param context: context to query under
:param model: Model to query. Must be a subclass of ModelBase.
:type model: models.ModelBase
:param session: The session to use.
:type session: sqlalchemy.orm.session.Session
:param args: Arguments to query. If None - model is used.
:type args: tuple
:param project_only: If present and context is user-type, then restrict
query to match the context's project_id. If set to
'allow_none', restriction includes project_id = None.
:type project_only: bool
:param read_deleted: If present, overrides context's read_deleted field.
:type read_deleted: bool
Usage:
..code:: python
result = (utils.model_query(context, models.Instance, session=session)
.filter_by(uuid=instance_uuid)
.all())
query = utils.model_query(
context, Node,
session=session,
args=(func.count(Node.id), func.sum(Node.ram))
).filter_by(project_id=project_id)
"""
if not read_deleted:
if hasattr(context, 'read_deleted'):
# NOTE(viktors): some projects use `read_deleted` attribute in
# their contexts instead of `show_deleted`.
read_deleted = context.read_deleted
else:
read_deleted = context.show_deleted
if not issubclass(model, models.ModelBase):
raise TypeError(_("model should be a subclass of ModelBase"))
query = session.query(model) if not args else session.query(*args)
query = _read_deleted_filter(query, model, read_deleted)
query = _project_filter(query, model, context, project_only)
return query
def get_table(engine, name):
"""Returns an sqlalchemy table dynamically from db.
@@ -207,6 +300,10 @@ def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
**col_name_col_instance):
"""Drop unique constraint from table.
DEPRECATED: this function is deprecated and will be removed from designate.db
in a few releases. Please use UniqueConstraint.drop() method directly for
sqlalchemy-migrate migration scripts.
This method drops UC from table and works for mysql, postgresql and sqlite.
In mysql and postgresql we are able to use "alter table" construction.
Sqlalchemy doesn't support some sqlite column types and replaces their
@@ -223,6 +320,8 @@ def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
types by sqlite. For example BigInteger.
"""
from migrate.changeset import UniqueConstraint
meta = MetaData()
meta.bind = migrate_engine
t = Table(table_name, meta, autoload=True)
@@ -262,9 +361,9 @@ def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
columns_for_select = [func.max(table.c.id)]
columns_for_select.extend(columns_for_group_by)
duplicated_rows_select = select(columns_for_select,
group_by=columns_for_group_by,
having=func.count(table.c.id) > 1)
duplicated_rows_select = sqlalchemy.sql.select(
columns_for_select, group_by=columns_for_group_by,
having=func.count(table.c.id) > 1)
for row in migrate_engine.execute(duplicated_rows_select):
# NOTE(boris-42): Do not remove row that has the biggest ID.
@@ -274,10 +373,11 @@ def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
for name in uc_column_names:
delete_condition &= table.c[name] == row[name]
rows_to_delete_select = select([table.c.id]).where(delete_condition)
rows_to_delete_select = sqlalchemy.sql.select(
[table.c.id]).where(delete_condition)
for row in migrate_engine.execute(rows_to_delete_select).fetchall():
LOG.info(_("Deleting duplicated row with id: %(id)s from table: "
"%(table)s") % dict(id=row[0], table=table_name))
LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: "
"%(table)s") % dict(id=row[0], table=table_name))
if use_soft_delete:
delete_statement = table.update().\
@@ -385,7 +485,7 @@ def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
else:
c_select.append(table.c.deleted == table.c.id)
ins = InsertFromSelect(new_table, select(c_select))
ins = InsertFromSelect(new_table, sqlalchemy.sql.select(c_select))
migrate_engine.execute(ins)
table.drop()
@@ -29,7 +29,7 @@ import eventlet.backdoor
import greenlet
from oslo.config import cfg
from designate.openstack.common.gettextutils import _
from designate.openstack.common.gettextutils import _LI
from designate.openstack.common import log as logging
help_for_backdoor_port = (
@@ -137,8 +137,10 @@ def initialize_if_enabled():
# In the case of backdoor port being zero, a port number is assigned by
# listen(). In any case, pull the port number out here.
port = sock.getsockname()[1]
LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') %
{'port': port, 'pid': os.getpid()})
LOG.info(
_LI('Eventlet backdoor listening on %(port)s for process %(pid)d') %
{'port': port, 'pid': os.getpid()}
)
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
locals=backdoor_locals)
return port
+23 -9
View File
@@ -24,7 +24,7 @@ import traceback
import six
from designate.openstack.common.gettextutils import _
from designate.openstack.common.gettextutils import _LE
class save_and_reraise_exception(object):
@@ -49,9 +49,22 @@ class save_and_reraise_exception(object):
decide_if_need_reraise()
if not should_be_reraised:
ctxt.reraise = False
If another exception occurs and reraise flag is False,
the saved exception will not be logged.
If the caller wants to raise new exception during exception handling
he/she sets reraise to False initially with an ability to set it back to
True if needed::
except Exception:
with save_and_reraise_exception(reraise=False) as ctxt:
[if statements to determine whether to raise a new exception]
# Not raising a new exception, so reraise
ctxt.reraise = True
"""
def __init__(self):
self.reraise = True
def __init__(self, reraise=True):
self.reraise = reraise
def __enter__(self):
self.type_, self.value, self.tb, = sys.exc_info()
@@ -59,10 +72,11 @@ class save_and_reraise_exception(object):
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
logging.error(_('Original exception being dropped: %s'),
traceback.format_exception(self.type_,
self.value,
self.tb))
if self.reraise:
logging.error(_LE('Original exception being dropped: %s'),
traceback.format_exception(self.type_,
self.value,
self.tb))
return False
if self.reraise:
six.reraise(self.type_, self.value, self.tb)
@@ -88,8 +102,8 @@ def forever_retry_uncaught_exceptions(infunc):
if (cur_time - last_log_time > 60 or
this_exc_message != last_exc_message):
logging.exception(
_('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count)
_LE('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count)
last_log_time = cur_time
last_exc_message = this_exc_message
exc_count = 0
+1 -2
View File
@@ -19,7 +19,6 @@ import os
import tempfile
from designate.openstack.common import excutils
from designate.openstack.common.gettextutils import _
from designate.openstack.common import log as logging
LOG = logging.getLogger(__name__)
@@ -59,7 +58,7 @@ def read_cached_file(filename, force_reload=False):
cache_info = _FILE_CACHE.setdefault(filename, {})
if not cache_info or mtime > cache_info.get('mtime', 0):
LOG.debug(_("Reloading cached file %s") % filename)
LOG.debug("Reloading cached file %s" % filename)
with open(filename) as fap:
cache_info['data'] = fap.read()
cache_info['mtime'] = mtime
@@ -48,4 +48,4 @@ class LockFixture(fixtures.Fixture):
def setUp(self):
super(LockFixture, self).setUp()
self.addCleanup(self.mgr.__exit__, None, None, None)
self.mgr.__enter__()
self.lock = self.mgr.__enter__()
@@ -15,6 +15,17 @@
# License for the specific language governing permissions and limitations
# under the License.
##############################################################################
##############################################################################
##
## DO NOT MODIFY THIS FILE
##
## This file is being graduated to the designatetest library. Please make all
## changes there, and only backport critical fixes here. - dhellmann
##
##############################################################################
##############################################################################
import fixtures
import mock
@@ -15,8 +15,19 @@
# License for the specific language governing permissions and limitations
# under the License.
##############################################################################
##############################################################################
##
## DO NOT MODIFY THIS FILE
##
## This file is being graduated to the designatetest library. Please make all
## changes there, and only backport critical fixes here. - dhellmann
##
##############################################################################
##############################################################################
import fixtures
from six.moves import mox # noqa
from six.moves import mox
class MoxStubout(fixtures.Fixture):
+17 -42
View File
@@ -28,7 +28,6 @@ import gettext
import locale
from logging import handlers
import os
import re
from babel import localedata
import six
@@ -248,47 +247,22 @@ class Message(six.text_type):
if other is None:
params = (other,)
elif isinstance(other, dict):
params = self._trim_dictionary_parameters(other)
# Merge the dictionaries
# Copy each item in case one does not support deep copy.
params = {}
if isinstance(self.params, dict):
for key, val in self.params.items():
params[key] = self._copy_param(val)
for key, val in other.items():
params[key] = self._copy_param(val)
else:
params = self._copy_param(other)
return params
def _trim_dictionary_parameters(self, dict_param):
"""Return a dict that only has matching entries in the msgid."""
# NOTE(luisg): Here we trim down the dictionary passed as parameters
# to avoid carrying a lot of unnecessary weight around in the message
# object, for example if someone passes in Message() % locals() but
# only some params are used, and additionally we prevent errors for
# non-deepcopyable objects by unicoding() them.
# Look for %(param) keys in msgid;
# Skip %% and deal with the case where % is first character on the line
keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', self.msgid)
# If we don't find any %(param) keys but have a %s
if not keys and re.findall('(?:[^%]|^)%[a-z]', self.msgid):
# Apparently the full dictionary is the parameter
params = self._copy_param(dict_param)
else:
params = {}
# Save our existing parameters as defaults to protect
# ourselves from losing values if we are called through an
# (erroneous) chain that builds a valid Message with
# arguments, and then does something like "msg % kwds"
# where kwds is an empty dictionary.
src = {}
if isinstance(self.params, dict):
src.update(self.params)
src.update(dict_param)
for key in keys:
params[key] = self._copy_param(src[key])
return params
def _copy_param(self, param):
try:
return copy.deepcopy(param)
except TypeError:
except Exception:
# Fallback to casting to unicode this will handle the
# python code-like objects that can't be deep-copied
return six.text_type(param)
@@ -300,13 +274,14 @@ class Message(six.text_type):
def __radd__(self, other):
return self.__add__(other)
def __str__(self):
# NOTE(luisg): Logging in python 2.6 tries to str() log records,
# and it expects specifically a UnicodeError in order to proceed.
msg = _('Message objects do not support str() because they may '
'contain non-ascii characters. '
'Please use unicode() or translate() instead.')
raise UnicodeError(msg)
if six.PY2:
def __str__(self):
# NOTE(luisg): Logging in python 2.6 tries to str() log records,
# and it expects specifically a UnicodeError in order to proceed.
msg = _('Message objects do not support str() because they may '
'contain non-ascii characters. '
'Please use unicode() or translate() instead.')
raise UnicodeError(msg)
def get_available_languages(domain):
+1 -9
View File
@@ -36,17 +36,9 @@ import functools
import inspect
import itertools
import json
try:
import xmlrpclib
except ImportError:
# NOTE(jaypipes): xmlrpclib was renamed to xmlrpc.client in Python3
# however the function and object call signatures
# remained the same. This whole try/except block should
# be removed and replaced with a call to six.moves once
# six 1.4.2 is released. See http://bit.ly/1bqrVzu
import xmlrpc.client as xmlrpclib
import six
import six.moves.xmlrpc_client as xmlrpclib
from designate.openstack.common import gettextutils
from designate.openstack.common import importutils
+111 -34
View File
@@ -15,6 +15,7 @@
import contextlib
import errno
import fcntl
import functools
import os
import shutil
@@ -28,7 +29,7 @@ import weakref
from oslo.config import cfg
from designate.openstack.common import fileutils
from designate.openstack.common.gettextutils import _
from designate.openstack.common.gettextutils import _, _LE, _LI
from designate.openstack.common import log as logging
@@ -52,7 +53,7 @@ def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path)
class _InterProcessLock(object):
class _FileLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
@@ -79,7 +80,7 @@ class _InterProcessLock(object):
if not os.path.exists(basedir):
fileutils.ensure_tree(basedir)
LOG.info(_('Created lock path: %s'), basedir)
LOG.info(_LI('Created lock path: %s'), basedir)
self.lockfile = open(self.fname, 'w')
@@ -90,7 +91,7 @@ class _InterProcessLock(object):
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
LOG.debug(_('Got file lock "%s"'), self.fname)
LOG.debug('Got file lock "%s"', self.fname)
return True
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
@@ -114,14 +115,17 @@ class _InterProcessLock(object):
try:
self.unlock()
self.lockfile.close()
LOG.debug(_('Released file lock "%s"'), self.fname)
LOG.debug('Released file lock "%s"', self.fname)
except IOError:
LOG.exception(_("Could not release the acquired lock `%s`"),
LOG.exception(_LE("Could not release the acquired lock `%s`"),
self.fname)
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def exists(self):
return os.path.exists(self.fname)
def trylock(self):
raise NotImplementedError()
@@ -129,7 +133,7 @@ class _InterProcessLock(object):
raise NotImplementedError()
class _WindowsLock(_InterProcessLock):
class _WindowsLock(_FileLock):
def trylock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
@@ -137,7 +141,7 @@ class _WindowsLock(_InterProcessLock):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock):
class _FcntlLock(_FileLock):
def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
@@ -145,35 +149,106 @@ class _PosixLock(_InterProcessLock):
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
class _PosixLock(object):
def __init__(self, name):
# Hash the name because it's not valid to have POSIX semaphore
# names with things like / in them. Then use base64 to encode
# the digest() instead taking the hexdigest() because the
# result is shorter and most systems can't have shm sempahore
# names longer than 31 characters.
h = hashlib.sha1()
h.update(name.encode('ascii'))
self.name = str((b'/' + base64.urlsafe_b64encode(
h.digest())).decode('ascii'))
def acquire(self, timeout=None):
self.semaphore = posix_ipc.Semaphore(self.name,
flags=posix_ipc.O_CREAT,
initial_value=1)
self.semaphore.acquire(timeout)
return self
def __enter__(self):
self.acquire()
return self
def release(self):
self.semaphore.release()
self.semaphore.close()
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def exists(self):
try:
semaphore = posix_ipc.Semaphore(self.name)
except posix_ipc.ExistentialError:
return False
else:
semaphore.close()
return True
if os.name == 'nt':
import msvcrt
InterProcessLock = _WindowsLock
FileLock = _WindowsLock
else:
import fcntl
import base64
import hashlib
import posix_ipc
InterProcessLock = _PosixLock
FileLock = _FcntlLock
_semaphores = weakref.WeakValueDictionary()
_semaphores_lock = threading.Lock()
def external_lock(name, lock_file_prefix=None):
with internal_lock(name):
LOG.debug(_('Attempting to grab external lock "%(lock)s"'),
{'lock': name})
def _get_lock_path(name, lock_file_prefix, lock_path=None):
# NOTE(mikal): the lock name cannot contain directory
# separators
name = name.replace(os.sep, '_')
if lock_file_prefix:
sep = '' if lock_file_prefix.endswith('-') else '-'
name = '%s%s%s' % (lock_file_prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
name = name.replace(os.sep, '_')
if lock_file_prefix:
sep = '' if lock_file_prefix.endswith('-') else '-'
name = '%s%s%s' % (lock_file_prefix, sep, name)
local_lock_path = lock_path or CONF.lock_path
if not CONF.lock_path:
if not local_lock_path:
# NOTE(bnemec): Create a fake lock path for posix locks so we don't
# unnecessarily raise the RequiredOptError below.
if InterProcessLock is not _PosixLock:
raise cfg.RequiredOptError('lock_path')
local_lock_path = 'posixlock:/'
lock_file_path = os.path.join(CONF.lock_path, name)
return os.path.join(local_lock_path, name)
return InterProcessLock(lock_file_path)
def external_lock(name, lock_file_prefix=None, lock_path=None):
LOG.debug('Attempting to grab external lock "%(lock)s"',
{'lock': name})
lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
# NOTE(bnemec): If an explicit lock_path was passed to us then it
# means the caller is relying on file-based locking behavior, so
# we can't use posix locks for those calls.
if lock_path:
return FileLock(lock_file_path)
return InterProcessLock(lock_file_path)
def remove_external_lock_file(name, lock_file_prefix=None):
"""Remove a external lock file when it's not used anymore
This will be helpful when we have a lot of lock files
"""
with internal_lock(name):
lock_file_path = _get_lock_path(name, lock_file_prefix)
try:
os.remove(lock_file_path)
except OSError:
LOG.info(_LI('Failed to remove file %(file)s'),
{'file': lock_file_path})
def internal_lock(name):
@@ -184,12 +259,12 @@ def internal_lock(name):
sem = threading.Semaphore()
_semaphores[name] = sem
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
LOG.debug('Got semaphore "%(lock)s"', {'lock': name})
return sem
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False):
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
This function yields a `threading.Semaphore` instance (if we don't use
@@ -204,15 +279,17 @@ def lock(name, lock_file_prefix=None, external=False):
workers both run a a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
"""
if external and not CONF.disable_process_locking:
lock = external_lock(name, lock_file_prefix)
else:
lock = internal_lock(name)
with lock:
yield lock
int_lock = internal_lock(name)
with int_lock:
if external and not CONF.disable_process_locking:
ext_lock = external_lock(name, lock_file_prefix, lock_path)
with ext_lock:
yield ext_lock
else:
yield int_lock
def synchronized(name, lock_file_prefix=None, external=False):
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
"""Synchronization decorator.
Decorating a method like so::
@@ -240,12 +317,12 @@ def synchronized(name, lock_file_prefix=None, external=False):
@functools.wraps(f)
def inner(*args, **kwargs):
try:
with lock(name, lock_file_prefix, external):
LOG.debug(_('Got semaphore / lock "%(function)s"'),
with lock(name, lock_file_prefix, external, lock_path):
LOG.debug('Got semaphore / lock "%(function)s"',
{'function': f.__name__})
return f(*args, **kwargs)
finally:
LOG.debug(_('Semaphore / lock released "%(function)s"'),
LOG.debug('Semaphore / lock released "%(function)s"',
{'function': f.__name__})
return inner
return wrap
+5 -4
View File
@@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Openstack logging handler.
"""OpenStack logging handler.
This module adds to logging functionality by adding the option to specify
a context object when calling the various log methods. If the context object
@@ -163,6 +163,7 @@ log_opts = [
'qpid=WARN',
'sqlalchemy=WARN',
'suds=INFO',
'oslo.messaging=INFO',
'iso8601=WARN',
'requests.packages.urllib3.connectionpool=WARN'
],
@@ -357,7 +358,7 @@ class ContextAdapter(BaseLoggerAdapter):
extra.update(_dictify_context(context))
instance = kwargs.pop('instance', None)
instance_uuid = (extra.get('instance_uuid', None) or
instance_uuid = (extra.get('instance_uuid') or
kwargs.pop('instance_uuid', None))
instance_extra = ''
if instance:
@@ -650,11 +651,11 @@ class ContextFormatter(logging.Formatter):
# NOTE(sdague): default the fancier formatting params
# to an empty string so we don't throw an exception if
# they get used
for key in ('instance', 'color'):
for key in ('instance', 'color', 'user_identity'):
if key not in record.__dict__:
record.__dict__[key] = ''
if record.__dict__.get('request_id', None):
if record.__dict__.get('request_id'):
self._fmt = CONF.logging_context_format_string
else:
self._fmt = CONF.logging_default_format_string
+6 -6
View File
@@ -20,7 +20,7 @@ import sys
from eventlet import event
from eventlet import greenthread
from designate.openstack.common.gettextutils import _
from designate.openstack.common.gettextutils import _LE, _LW
from designate.openstack.common import log as logging
from designate.openstack.common import timeutils
@@ -79,14 +79,14 @@ class FixedIntervalLoopingCall(LoopingCallBase):
break
delay = interval - timeutils.delta_seconds(start, end)
if delay <= 0:
LOG.warn(_('task run outlasted interval by %s sec') %
LOG.warn(_LW('task run outlasted interval by %s sec') %
-delay)
greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_('in fixed duration looping call'))
LOG.exception(_LE('in fixed duration looping call'))
done.send_exception(*sys.exc_info())
return
else:
@@ -126,14 +126,14 @@ class DynamicLoopingCall(LoopingCallBase):
if periodic_interval_max is not None:
idle = min(idle, periodic_interval_max)
LOG.debug(_('Dynamic looping call sleeping for %.02f '
'seconds'), idle)
LOG.debug('Dynamic looping call sleeping for %.02f '
'seconds', idle)
greenthread.sleep(idle)
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_('in dynamic looping call'))
LOG.exception(_LE('in dynamic looping call'))
done.send_exception(*sys.exc_info())
return
else:
+6 -5
View File
@@ -151,7 +151,8 @@ def execute(*cmd, **kwargs):
while attempts > 0:
attempts -= 1
try:
LOG.log(loglevel, _('Running cmd (subprocess): %s'), ' '.join(cmd))
LOG.log(loglevel, 'Running cmd (subprocess): %s',
' '.join(cmd))
_PIPE = subprocess.PIPE # pylint: disable=E1101
if os.name == 'nt':
@@ -184,7 +185,7 @@ def execute(*cmd, **kwargs):
break
obj.stdin.close() # pylint: disable=E1101
_returncode = obj.returncode # pylint: disable=E1101
LOG.log(loglevel, _('Result was %s') % _returncode)
LOG.log(loglevel, 'Result was %s' % _returncode)
if not ignore_exit_code and _returncode not in check_exit_code:
(stdout, stderr) = result
raise ProcessExecutionError(exit_code=_returncode,
@@ -196,7 +197,7 @@ def execute(*cmd, **kwargs):
if not attempts:
raise
else:
LOG.log(loglevel, _('%r failed. Retrying.'), cmd)
LOG.log(loglevel, '%r failed. Retrying.', cmd)
if delay_on_retry:
greenthread.sleep(random.randint(20, 200) / 100.0)
finally:
@@ -235,7 +236,7 @@ def trycmd(*args, **kwargs):
def ssh_execute(ssh, cmd, process_input=None,
addl_env=None, check_exit_code=True):
LOG.debug(_('Running cmd (SSH): %s'), cmd)
LOG.debug('Running cmd (SSH): %s', cmd)
if addl_env:
raise InvalidArgumentError(_('Environment not supported over SSH'))
@@ -256,7 +257,7 @@ def ssh_execute(ssh, cmd, process_input=None,
# exit_status == -1 if no exit code was returned
if exit_status != -1:
LOG.debug(_('Result was %s') % exit_status)
LOG.debug('Result was %s' % exit_status)
if check_exit_code and exit_status != 0:
raise ProcessExecutionError(exit_code=exit_status,
stdout=stdout,
+32 -27
View File
@@ -38,9 +38,10 @@ from eventlet import event
from oslo.config import cfg
from designate.openstack.common import eventlet_backdoor
from designate.openstack.common.gettextutils import _
from designate.openstack.common.gettextutils import _LE, _LI, _LW
from designate.openstack.common import importutils
from designate.openstack.common import log as logging
from designate.openstack.common import systemd
from designate.openstack.common import threadgroup
@@ -163,7 +164,7 @@ class ServiceLauncher(Launcher):
status = None
signo = 0
LOG.debug(_('Full set of CONF:'))
LOG.debug('Full set of CONF:')
CONF.log_opt_values(LOG, std_logging.DEBUG)
try:
@@ -172,7 +173,7 @@ class ServiceLauncher(Launcher):
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
LOG.info(_LI('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
@@ -184,7 +185,7 @@ class ServiceLauncher(Launcher):
rpc.cleanup()
except Exception:
# We're shutting down, so it doesn't matter at this point.
LOG.exception(_('Exception during rpc cleanup.'))
LOG.exception(_LE('Exception during rpc cleanup.'))
return status, signo
@@ -235,7 +236,7 @@ class ProcessLauncher(object):
# dies unexpectedly
self.readpipe.read()
LOG.info(_('Parent process has died unexpectedly, exiting'))
LOG.info(_LI('Parent process has died unexpectedly, exiting'))
sys.exit(1)
@@ -266,13 +267,13 @@ class ProcessLauncher(object):
launcher.wait()
except SignalExit as exc:
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
LOG.info(_LI('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
LOG.exception(_LE('Unhandled exception'))
status = 2
finally:
launcher.stop()
@@ -305,7 +306,7 @@ class ProcessLauncher(object):
# start up quickly but ensure we don't fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info(_('Forking too fast, sleeping'))
LOG.info(_LI('Forking too fast, sleeping'))
time.sleep(1)
wrap.forktimes.pop(0)
@@ -324,7 +325,7 @@ class ProcessLauncher(object):
os._exit(status)
LOG.info(_('Started child %d'), pid)
LOG.info(_LI('Started child %d'), pid)
wrap.children.add(pid)
self.children[pid] = wrap
@@ -334,7 +335,7 @@ class ProcessLauncher(object):
def launch_service(self, service, workers=1):
wrap = ServiceWrapper(service, workers)
LOG.info(_('Starting %d workers'), wrap.workers)
LOG.info(_LI('Starting %d workers'), wrap.workers)
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
@@ -351,15 +352,15 @@ class ProcessLauncher(object):
if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'),
dict(pid=pid, sig=sig))
else:
code = os.WEXITSTATUS(status)
LOG.info(_('Child %(pid)s exited with status %(code)d'),
LOG.info(_LI('Child %(pid)s exited with status %(code)d'),
dict(pid=pid, code=code))
if pid not in self.children:
LOG.warning(_('pid %d not in child list'), pid)
LOG.warning(_LW('pid %d not in child list'), pid)
return None
wrap = self.children.pop(pid)
@@ -381,22 +382,25 @@ class ProcessLauncher(object):
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
LOG.debug('Full set of CONF:')
CONF.log_opt_values(LOG, std_logging.DEBUG)
while True:
self.handle_signal()
self._respawn_children()
if self.sigcaught:
signame = _signo_to_signame(self.sigcaught)
LOG.info(_('Caught %s, stopping children'), signame)
if not _is_sighup_and_daemon(self.sigcaught):
break
try:
while True:
self.handle_signal()
self._respawn_children()
if self.sigcaught:
signame = _signo_to_signame(self.sigcaught)
LOG.info(_LI('Caught %s, stopping children'), signame)
if not _is_sighup_and_daemon(self.sigcaught):
break
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
except eventlet.greenlet.GreenletExit:
LOG.info(_LI("Wait called after thread killed. Cleaning up."))
for pid in self.children:
try:
@@ -407,7 +411,7 @@ class ProcessLauncher(object):
# Wait for children to die
if self.children:
LOG.info(_('Waiting on %d children to exit'), len(self.children))
LOG.info(_LI('Waiting on %d children to exit'), len(self.children))
while self.children:
self._wait_child()
@@ -484,6 +488,7 @@ class Services(object):
"""
service.start()
systemd.notify_once()
done.wait()
+3 -3
View File
@@ -24,15 +24,15 @@ ssl_opts = [
cfg.StrOpt('ca_file',
default=None,
help="CA certificate file to use to verify "
"connecting clients"),
"connecting clients."),
cfg.StrOpt('cert_file',
default=None,
help="Certificate file to use when starting "
"the server securely"),
"the server securely."),
cfg.StrOpt('key_file',
default=None,
help="Private key file to use when starting "
"the server securely"),
"the server securely."),
]
+5 -4
View File
@@ -98,7 +98,8 @@ def bool_from_string(subject, strict=False, default=False):
def safe_decode(text, incoming=None, errors='strict'):
"""Decodes incoming str using `incoming` if they're not already unicode.
"""Decodes incoming text/bytes string using `incoming` if they're not
already unicode.
:param incoming: Text's current encoding
:param errors: Errors handling policy. See here for valid
@@ -107,7 +108,7 @@ def safe_decode(text, incoming=None, errors='strict'):
representation of it.
:raises TypeError: If text is not an instance of str
"""
if not isinstance(text, six.string_types):
if not isinstance(text, (six.string_types, six.binary_type)):
raise TypeError("%s can't be decoded" % type(text))
if isinstance(text, six.text_type):
@@ -137,7 +138,7 @@ def safe_decode(text, incoming=None, errors='strict'):
def safe_encode(text, incoming=None,
encoding='utf-8', errors='strict'):
"""Encodes incoming str/unicode using `encoding`.
"""Encodes incoming text/bytes string using `encoding`.
If incoming is not specified, text is expected to be encoded with
current python's default encoding. (`sys.getdefaultencoding`)
@@ -150,7 +151,7 @@ def safe_encode(text, incoming=None,
representation of it.
:raises TypeError: If text is not an instance of str
"""
if not isinstance(text, six.string_types):
if not isinstance(text, (six.string_types, six.binary_type)):
raise TypeError("%s can't be encoded" % type(text))
if not incoming:
+104
View File
@@ -0,0 +1,104 @@
# Copyright 2012-2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Helper module for systemd service readiness notification.
"""
import os
import socket
import sys
from designate.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def _abstractify(socket_name):
if socket_name.startswith('@'):
# abstract namespace socket
socket_name = '\0%s' % socket_name[1:]
return socket_name
def _sd_notify(unset_env, msg):
notify_socket = os.getenv('NOTIFY_SOCKET')
if notify_socket:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
sock.connect(_abstractify(notify_socket))
sock.sendall(msg)
if unset_env:
del os.environ['NOTIFY_SOCKET']
except EnvironmentError:
LOG.debug("Systemd notification failed", exc_info=True)
finally:
sock.close()
def notify():
"""Send notification to Systemd that service is ready.
For details see
http://www.freedesktop.org/software/systemd/man/sd_notify.html
"""
_sd_notify(False, 'READY=1')
def notify_once():
"""Send notification once to Systemd that service is ready.
Systemd sets NOTIFY_SOCKET environment variable with the name of the
socket listening for notifications from services.
This method removes the NOTIFY_SOCKET environment variable to ensure
notification is sent only once.
"""
_sd_notify(True, 'READY=1')
def onready(notify_socket, timeout):
"""Wait for systemd style notification on the socket.
:param notify_socket: local socket address
:type notify_socket: string
:param timeout: socket timeout
:type timeout: float
:returns: 0 service ready
1 service not ready
2 timeout occured
"""
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.settimeout(timeout)
sock.bind(_abstractify(notify_socket))
try:
msg = sock.recv(512)
except socket.timeout:
return 2
finally:
sock.close()
if 'READY=1' in msg:
return 0
else:
return 1
if __name__ == '__main__':
# simple CLI for testing
if len(sys.argv) == 1:
notify()
elif len(sys.argv) >= 2:
timeout = float(sys.argv[1])
notify_socket = os.getenv('NOTIFY_SOCKET')
if notify_socket:
retval = onready(notify_socket, timeout)
sys.exit(retval)
+11
View File
@@ -13,6 +13,17 @@
# License for the specific language governing permissions and limitations
# under the License.
##############################################################################
##############################################################################
##
## DO NOT MODIFY THIS FILE
##
## This file is being graduated to the designatetest library. Please make all
## changes there, and only backport critical fixes here. - dhellmann
##
##############################################################################
##############################################################################
"""Common utilities used in testing"""
import logging
+1
View File
@@ -23,3 +23,4 @@ sqlalchemy-migrate>=0.8.2,!=0.8.4
stevedore>=0.14
WebOb>=1.2.3
dnspython>=1.9.4
posix_ipc