diff --git a/cinder/db/api.py b/cinder/db/api.py index b81b04a3811..de2664b7edb 100644 --- a/cinder/db/api.py +++ b/cinder/db/api.py @@ -41,6 +41,7 @@ these objects be simple dictionaries. """ +from eventlet import tpool from oslo.config import cfg from cinder.openstack.common.db import api as db_api @@ -68,12 +69,52 @@ db_opts = [ default='backup-%s', help='Template string to be used to generate backup names'), ] +tpool_opts = [ + cfg.BoolOpt('use_tpool', + default=False, + deprecated_name='dbapi_use_tpool', + deprecated_group='DEFAULT', + help='Enable the experimental use of thread pooling for ' + 'all DB API calls'), +] + + CONF = cfg.CONF CONF.register_opts(db_opts) +CONF.register_opts(tpool_opts, 'database') +CONF.import_opt('backend', 'cinder.openstack.common.db.options', + group='database') _BACKEND_MAPPING = {'sqlalchemy': 'cinder.db.sqlalchemy.api'} -IMPL = db_api.DBAPI(backend_mapping=_BACKEND_MAPPING) + +class CinderDBAPI(object): + """Cinder's DB API wrapper class. + + This wraps the oslo DB API with an option to be able to use eventlet's + thread pooling. Since the CONF variable may not be loaded at the time + this class is instantiated, we must look at it on the first DB API call. + """ + + def __init__(self): + self.__db_api = None + + @property + def _db_api(self): + if not self.__db_api: + cinder_db_api = db_api.DBAPI(CONF.database.backend, + backend_mapping=_BACKEND_MAPPING) + if CONF.database.use_tpool: + self.__db_api = tpool.Proxy(cinder_db_api) + else: + self.__db_api = cinder_db_api + return self.__db_api + + def __getattr__(self, key): + return getattr(self._db_api, key) + + +IMPL = CinderDBAPI() ################### diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py index 219740b1c8f..fd6e74bc508 100644 --- a/cinder/db/sqlalchemy/api.py +++ b/cinder/db/sqlalchemy/api.py @@ -20,6 +20,7 @@ import sys +import threading import uuid import warnings @@ -35,6 +36,7 @@ from cinder.common import sqlalchemyutils from cinder.db.sqlalchemy import models from cinder import exception from cinder.openstack.common.db import exception as db_exc +from cinder.openstack.common.db import options from cinder.openstack.common.db.sqlalchemy import session as db_session from cinder.openstack.common import log as logging from cinder.openstack.common import timeutils @@ -44,11 +46,33 @@ from cinder.openstack.common import uuidutils CONF = cfg.CONF LOG = logging.getLogger(__name__) -db_session.set_defaults(sql_connection='sqlite:///$state_path/$sqlite_db', - sqlite_db='cinder.sqlite') +options.set_defaults(sql_connection='sqlite:///$state_path/$sqlite_db', + sqlite_db='cinder.sqlite') -get_engine = db_session.get_engine -get_session = db_session.get_session +_LOCK = threading.Lock() +_FACADE = None + + +def _create_facade_lazily(): + global _LOCK + with _LOCK: + global _FACADE + if _FACADE is None: + _FACADE = db_session.EngineFacade( + CONF.database.connection, + **dict(CONF.database.iteritems()) + ) + return _FACADE + + +def get_engine(): + facade = _create_facade_lazily() + return facade.get_engine() + + +def get_session(**kwargs): + facade = _create_facade_lazily() + return facade.get_session(**kwargs) _DEFAULT_QUOTA_NAME = 'default' @@ -361,8 +385,11 @@ def service_create(context, values): service_ref.update(values) if not CONF.enable_new_services: service_ref.disabled = True - service_ref.save() - return service_ref + + session = get_session() + with session.begin(): + service_ref.save(session) + return service_ref @require_admin_context @@ -422,11 +449,13 @@ def 
iscsi_target_create_safe(context, values): for (key, value) in values.iteritems(): iscsi_target_ref[key] = value - try: - iscsi_target_ref.save() - return iscsi_target_ref - except IntegrityError: - return None + session = get_session() + with session.begin(): + try: + iscsi_target_ref.save(session) + return iscsi_target_ref + except IntegrityError: + return None ################### @@ -472,8 +501,11 @@ def quota_create(context, project_id, resource, limit): quota_ref.project_id = project_id quota_ref.resource = resource quota_ref.hard_limit = limit - quota_ref.save() - return quota_ref + + session = get_session() + with session.begin(): + quota_ref.save(session) + return quota_ref @require_admin_context @@ -548,8 +580,11 @@ def quota_class_create(context, class_name, resource, limit): quota_class_ref.class_name = class_name quota_class_ref.resource = resource quota_class_ref.hard_limit = limit - quota_class_ref.save() - return quota_class_ref + + session = get_session() + with session.begin(): + quota_class_ref.save(session) + return quota_class_ref @require_admin_context @@ -2591,8 +2626,11 @@ def backup_create(context, values): if not values.get('id'): values['id'] = str(uuid.uuid4()) backup.update(values) - backup.save() - return backup + + session = get_session() + with session.begin(): + backup.save(session) + return backup @require_context diff --git a/cinder/db/sqlalchemy/models.py b/cinder/db/sqlalchemy/models.py index 5a29e6852db..c4342427dbf 100644 --- a/cinder/db/sqlalchemy/models.py +++ b/cinder/db/sqlalchemy/models.py @@ -47,7 +47,7 @@ class CinderBase(models.TimestampMixin, deleted = Column(Boolean, default=False) metadata = None - def delete(self, session=None): + def delete(self, session): """Delete this object.""" self.deleted = True self.deleted_at = timeutils.utcnow() diff --git a/cinder/openstack/common/context.py b/cinder/openstack/common/context.py index 7c5f6a76a66..09019ee3843 100644 --- a/cinder/openstack/common/context.py +++ b/cinder/openstack/common/context.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation. # All Rights Reserved. # @@ -23,12 +21,11 @@ context or provide additional information in their specific WSGI pipeline. """ import itertools - -from cinder.openstack.common import uuidutils +import uuid def generate_request_id(): - return 'req-%s' % uuidutils.generate_uuid() + return 'req-%s' % str(uuid.uuid4()) class RequestContext(object): @@ -39,29 +36,49 @@ class RequestContext(object): accesses the system, as well as additional request information. 
""" - def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False, - read_only=False, show_deleted=False, request_id=None): + user_idt_format = '{user} {tenant} {domain} {user_domain} {p_domain}' + + def __init__(self, auth_token=None, user=None, tenant=None, domain=None, + user_domain=None, project_domain=None, is_admin=False, + read_only=False, show_deleted=False, request_id=None, + instance_uuid=None): self.auth_token = auth_token self.user = user self.tenant = tenant + self.domain = domain + self.user_domain = user_domain + self.project_domain = project_domain self.is_admin = is_admin self.read_only = read_only self.show_deleted = show_deleted + self.instance_uuid = instance_uuid if not request_id: request_id = generate_request_id() self.request_id = request_id def to_dict(self): + user_idt = ( + self.user_idt_format.format(user=self.user or '-', + tenant=self.tenant or '-', + domain=self.domain or '-', + user_domain=self.user_domain or '-', + p_domain=self.project_domain or '-')) + return {'user': self.user, 'tenant': self.tenant, + 'domain': self.domain, + 'user_domain': self.user_domain, + 'project_domain': self.project_domain, 'is_admin': self.is_admin, 'read_only': self.read_only, 'show_deleted': self.show_deleted, 'auth_token': self.auth_token, - 'request_id': self.request_id} + 'request_id': self.request_id, + 'instance_uuid': self.instance_uuid, + 'user_identity': user_idt} -def get_admin_context(show_deleted="no"): +def get_admin_context(show_deleted=False): context = RequestContext(None, tenant=None, is_admin=True, @@ -81,3 +98,14 @@ def get_context_from_function_and_args(function, args, kwargs): return arg return None + + +def is_user_context(context): + """Indicates if the request context is a normal user.""" + if not context: + return False + if context.is_admin: + return False + if not context.user_id or not context.project_id: + return False + return True diff --git a/cinder/openstack/common/db/__init__.py b/cinder/openstack/common/db/__init__.py index 1b9b60dec10..e69de29bb2d 100644 --- a/cinder/openstack/common/db/__init__.py +++ b/cinder/openstack/common/db/__init__.py @@ -1,16 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012 Cloudscaling Group, Inc -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. diff --git a/cinder/openstack/common/db/api.py b/cinder/openstack/common/db/api.py index 5a76184b329..cfd66306566 100644 --- a/cinder/openstack/common/db/api.py +++ b/cinder/openstack/common/db/api.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright (c) 2013 Rackspace Hosting # All Rights Reserved. # @@ -17,90 +15,148 @@ """Multiple DB API backend support. -Supported configuration options: - -The following two parameters are in the 'database' group: -`backend`: DB backend name or full module path to DB backend module. -`use_tpool`: Enable thread pooling of DB API calls. - A DB backend module should implement a method named 'get_backend' which takes no arguments. 
The method can return any object that implements DB API methods.
-
-*NOTE*: There are bugs in eventlet when using tpool combined with
-threading locks. The python logging module happens to use such locks. To
-work around this issue, be sure to specify thread=False with
-eventlet.monkey_patch().
-
-A bug for eventlet has been filed here:
-
-https://bitbucket.org/eventlet/eventlet/issue/137/
 """
+
 import functools
+import logging
+import threading
+import time

-from oslo.config import cfg
-
+from cinder.openstack.common.db import exception
+from cinder.openstack.common.gettextutils import _LE
 from cinder.openstack.common import importutils
-from cinder.openstack.common import lockutils

-db_opts = [
-    cfg.StrOpt('backend',
-               default='sqlalchemy',
-               deprecated_name='db_backend',
-               deprecated_group='DEFAULT',
-               help='The backend to use for db'),
-    cfg.BoolOpt('use_tpool',
-                default=False,
-                deprecated_name='dbapi_use_tpool',
-                deprecated_group='DEFAULT',
-                help='Enable the experimental use of thread pooling for '
-                     'all DB API calls')
-]
+LOG = logging.getLogger(__name__)

-CONF = cfg.CONF
-CONF.register_opts(db_opts, 'database')
+
+
+def safe_for_db_retry(f):
+    """Enable db-retry for the decorated function, if enabled in config."""
+    f.__dict__['enable_retry'] = True
+    return f
+
+
+class wrap_db_retry(object):
+    """Retry db.api methods, if DBConnectionError() is raised.
+
+    Retry decorated db.api methods. If `use_db_reconnect` is enabled
+    in config, this decorator is applied to all db.api functions
+    marked with the @safe_for_db_retry decorator.
+    The decorator catches DBConnectionError() and retries the function
+    in a loop until it succeeds, or until the maximum retry count is
+    reached.
+    """
+
+    def __init__(self, retry_interval, max_retries, inc_retry_interval,
+                 max_retry_interval):
+        super(wrap_db_retry, self).__init__()
+
+        self.retry_interval = retry_interval
+        self.max_retries = max_retries
+        self.inc_retry_interval = inc_retry_interval
+        self.max_retry_interval = max_retry_interval
+
+    def __call__(self, f):
+        @functools.wraps(f)
+        def wrapper(*args, **kwargs):
+            next_interval = self.retry_interval
+            remaining = self.max_retries
+
+            while True:
+                try:
+                    return f(*args, **kwargs)
+                except exception.DBConnectionError as e:
+                    if remaining == 0:
+                        LOG.exception(_LE('DB exceeded retry limit.'))
+                        raise exception.DBError(e)
+                    if remaining != -1:
+                        remaining -= 1
+                    LOG.exception(_LE('DB connection error.'))
+                    # NOTE(vsergeyev): We are using patched time module, so
+                    #                  this effectively yields the execution
+                    #                  context to another green thread.
+                    time.sleep(next_interval)
+                    if self.inc_retry_interval:
+                        next_interval = min(
+                            next_interval * 2,
+                            self.max_retry_interval
+                        )
+        return wrapper


 class DBAPI(object):
-    def __init__(self, backend_mapping=None):
-        if backend_mapping is None:
-            backend_mapping = {}
-        self.__backend = None
-        self.__backend_mapping = backend_mapping
+    def __init__(self, backend_name, backend_mapping=None, lazy=False,
+                 **kwargs):
+        """Initialize the chosen DB API backend.
+ + :param backend_name: name of the backend to load + :type backend_name: str + + :param backend_mapping: backend name -> module/class to load mapping + :type backend_mapping: dict + + :param lazy: load the DB backend lazily on the first DB API method call + :type lazy: bool + + Keyword arguments: + + :keyword use_db_reconnect: retry DB transactions on disconnect or not + :type use_db_reconnect: bool + + :keyword retry_interval: seconds between transaction retries + :type retry_interval: int + + :keyword inc_retry_interval: increase retry interval or not + :type inc_retry_interval: bool + + :keyword max_retry_interval: max interval value between retries + :type max_retry_interval: int + + :keyword max_retries: max number of retries before an error is raised + :type max_retries: int - @lockutils.synchronized('dbapi_backend', 'cinder-') - def __get_backend(self): - """Get the actual backend. May be a module or an instance of - a class. Doesn't matter to us. We do this synchronized as it's - possible multiple greenthreads started very quickly trying to do - DB calls and eventlet can switch threads before self.__backend gets - assigned. """ - if self.__backend: - # Another thread assigned it - return self.__backend - backend_name = CONF.database.backend - self.__use_tpool = CONF.database.use_tpool - if self.__use_tpool: - from eventlet import tpool - self.__tpool = tpool - # Import the untranslated name if we don't have a - # mapping. - backend_path = self.__backend_mapping.get(backend_name, - backend_name) - backend_mod = importutils.import_module(backend_path) - self.__backend = backend_mod.get_backend() - return self.__backend + + self._backend = None + self._backend_name = backend_name + self._backend_mapping = backend_mapping or {} + self._lock = threading.Lock() + + if not lazy: + self._load_backend() + + self.use_db_reconnect = kwargs.get('use_db_reconnect', False) + self.retry_interval = kwargs.get('retry_interval', 1) + self.inc_retry_interval = kwargs.get('inc_retry_interval', True) + self.max_retry_interval = kwargs.get('max_retry_interval', 10) + self.max_retries = kwargs.get('max_retries', 20) + + def _load_backend(self): + with self._lock: + if not self._backend: + # Import the untranslated name if we don't have a mapping + backend_path = self._backend_mapping.get(self._backend_name, + self._backend_name) + backend_mod = importutils.import_module(backend_path) + self._backend = backend_mod.get_backend() def __getattr__(self, key): - backend = self.__backend or self.__get_backend() - attr = getattr(backend, key) - if not self.__use_tpool or not hasattr(attr, '__call__'): + if not self._backend: + self._load_backend() + + attr = getattr(self._backend, key) + if not hasattr(attr, '__call__'): return attr + # NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry + # DB API methods, decorated with @safe_for_db_retry + # on disconnect. 
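+        #                  Backend functions opt in through that decorator;
+        #                  an illustrative sketch (not a function from this
+        #                  patch) of a backend module doing so would be:
+        #
+        #                      @safe_for_db_retry
+        #                      def service_get(context, service_id):
+        #                          ...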
+ if self.use_db_reconnect and hasattr(attr, 'enable_retry'): + attr = wrap_db_retry( + retry_interval=self.retry_interval, + max_retries=self.max_retries, + inc_retry_interval=self.inc_retry_interval, + max_retry_interval=self.max_retry_interval)(attr) - def tpool_wrapper(*args, **kwargs): - return self.__tpool.execute(attr, *args, **kwargs) - - functools.update_wrapper(tpool_wrapper, attr) - return tpool_wrapper + return attr diff --git a/cinder/openstack/common/db/exception.py b/cinder/openstack/common/db/exception.py index 190d234b040..df295f8dafa 100644 --- a/cinder/openstack/common/db/exception.py +++ b/cinder/openstack/common/db/exception.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -18,6 +16,8 @@ """DB related custom exceptions.""" +import six + from cinder.openstack.common.gettextutils import _ @@ -25,7 +25,7 @@ class DBError(Exception): """Wraps an implementation specific exception.""" def __init__(self, inner_exception=None): self.inner_exception = inner_exception - super(DBError, self).__init__(str(inner_exception)) + super(DBError, self).__init__(six.text_type(inner_exception)) class DBDuplicateEntry(DBError): @@ -43,3 +43,14 @@ class DBDeadlock(DBError): class DBInvalidUnicodeParameter(Exception): message = _("Invalid Parameter: " "Unicode is not supported by the current database.") + + +class DbMigrationError(DBError): + """Wraps migration specific exception.""" + def __init__(self, message=None): + super(DbMigrationError, self).__init__(message) + + +class DBConnectionError(DBError): + """Wraps connection specific exception.""" + pass diff --git a/cinder/openstack/common/db/options.py b/cinder/openstack/common/db/options.py new file mode 100644 index 00000000000..e5fd8fa5894 --- /dev/null +++ b/cinder/openstack/common/db/options.py @@ -0,0 +1,171 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy + +from oslo.config import cfg + + +database_opts = [ + cfg.StrOpt('sqlite_db', + deprecated_group='DEFAULT', + default='cinder.sqlite', + help='The file name to use with SQLite'), + cfg.BoolOpt('sqlite_synchronous', + deprecated_group='DEFAULT', + default=True, + help='If True, SQLite uses synchronous mode'), + cfg.StrOpt('backend', + default='sqlalchemy', + deprecated_name='db_backend', + deprecated_group='DEFAULT', + help='The backend to use for db'), + cfg.StrOpt('connection', + help='The SQLAlchemy connection string used to connect to the ' + 'database', + secret=True, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_connection', + group='DATABASE'), + cfg.DeprecatedOpt('connection', + group='sql'), ]), + cfg.StrOpt('mysql_sql_mode', + default='TRADITIONAL', + help='The SQL mode to be used for MySQL sessions. ' + 'This option, including the default, overrides any ' + 'server-set SQL mode. 
To use whatever SQL mode ' + 'is set by the server configuration, ' + 'set this to no value. Example: mysql_sql_mode='), + cfg.IntOpt('idle_timeout', + default=3600, + deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_idle_timeout', + group='DATABASE'), + cfg.DeprecatedOpt('idle_timeout', + group='sql')], + help='Timeout before idle sql connections are reaped'), + cfg.IntOpt('min_pool_size', + default=1, + deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_min_pool_size', + group='DATABASE')], + help='Minimum number of SQL connections to keep open in a ' + 'pool'), + cfg.IntOpt('max_pool_size', + default=None, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_max_pool_size', + group='DATABASE')], + help='Maximum number of SQL connections to keep open in a ' + 'pool'), + cfg.IntOpt('max_retries', + default=10, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_max_retries', + group='DATABASE')], + help='Maximum db connection retries during startup. ' + '(setting -1 implies an infinite retry count)'), + cfg.IntOpt('retry_interval', + default=10, + deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval', + group='DEFAULT'), + cfg.DeprecatedOpt('reconnect_interval', + group='DATABASE')], + help='Interval between retries of opening a sql connection'), + cfg.IntOpt('max_overflow', + default=None, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow', + group='DEFAULT'), + cfg.DeprecatedOpt('sqlalchemy_max_overflow', + group='DATABASE')], + help='If set, use this value for max_overflow with sqlalchemy'), + cfg.IntOpt('connection_debug', + default=0, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug', + group='DEFAULT')], + help='Verbosity of SQL debugging information. 0=None, ' + '100=Everything'), + cfg.BoolOpt('connection_trace', + default=False, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace', + group='DEFAULT')], + help='Add python stack traces to SQL as comment strings'), + cfg.IntOpt('pool_timeout', + default=None, + deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout', + group='DATABASE')], + help='If set, use this value for pool_timeout with sqlalchemy'), + cfg.BoolOpt('use_db_reconnect', + default=False, + help='Enable the experimental use of database reconnect ' + 'on connection lost'), + cfg.IntOpt('db_retry_interval', + default=1, + help='seconds between db connection retries'), + cfg.BoolOpt('db_inc_retry_interval', + default=True, + help='Whether to increase interval between db connection ' + 'retries, up to db_max_retry_interval'), + cfg.IntOpt('db_max_retry_interval', + default=10, + help='max seconds between db connection retries, if ' + 'db_inc_retry_interval is enabled'), + cfg.IntOpt('db_max_retries', + default=20, + help='maximum db connection retries before error is raised. 
' + '(setting -1 implies an infinite retry count)'), +] + +CONF = cfg.CONF +CONF.register_opts(database_opts, 'database') + + +def set_defaults(sql_connection, sqlite_db, max_pool_size=None, + max_overflow=None, pool_timeout=None): + """Set defaults for configuration variables.""" + cfg.set_defaults(database_opts, + connection=sql_connection, + sqlite_db=sqlite_db) + # Update the QueuePool defaults + if max_pool_size is not None: + cfg.set_defaults(database_opts, + max_pool_size=max_pool_size) + if max_overflow is not None: + cfg.set_defaults(database_opts, + max_overflow=max_overflow) + if pool_timeout is not None: + cfg.set_defaults(database_opts, + pool_timeout=pool_timeout) + + +def list_opts(): + """Returns a list of oslo.config options available in the library. + + The returned list includes all oslo.config options which may be registered + at runtime by the library. + + Each element of the list is a tuple. The first element is the name of the + group under which the list of elements in the second element will be + registered. A group name of None corresponds to the [DEFAULT] group in + config files. + + The purpose of this is to allow tools like the Oslo sample config file + generator to discover the options exposed to users by this library. + + :returns: a list of (group_name, opts) tuples + """ + return [('database', copy.deepcopy(database_opts))] diff --git a/cinder/openstack/common/db/sqlalchemy/__init__.py b/cinder/openstack/common/db/sqlalchemy/__init__.py index 1b9b60dec10..e69de29bb2d 100644 --- a/cinder/openstack/common/db/sqlalchemy/__init__.py +++ b/cinder/openstack/common/db/sqlalchemy/__init__.py @@ -1,16 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012 Cloudscaling Group, Inc -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. diff --git a/cinder/openstack/common/db/sqlalchemy/migration.py b/cinder/openstack/common/db/sqlalchemy/migration.py new file mode 100644 index 00000000000..f08df650d1e --- /dev/null +++ b/cinder/openstack/common/db/sqlalchemy/migration.py @@ -0,0 +1,278 @@ +# coding: utf-8 +# +# Copyright (c) 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+# +# Base on code in migrate/changeset/databases/sqlite.py which is under +# the following license: +# +# The MIT License +# +# Copyright (c) 2009 Evan Rosson, Jan Dittberner, Domen Kožar +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import os +import re + +from migrate.changeset import ansisql +from migrate.changeset.databases import sqlite +from migrate import exceptions as versioning_exceptions +from migrate.versioning import api as versioning_api +from migrate.versioning.repository import Repository +import sqlalchemy +from sqlalchemy.schema import UniqueConstraint + +from cinder.openstack.common.db import exception +from cinder.openstack.common.gettextutils import _ + + +def _get_unique_constraints(self, table): + """Retrieve information about existing unique constraints of the table + + This feature is needed for _recreate_table() to work properly. + Unfortunately, it's not available in sqlalchemy 0.7.x/0.8.x. + + """ + + data = table.metadata.bind.execute( + """SELECT sql + FROM sqlite_master + WHERE + type='table' AND + name=:table_name""", + table_name=table.name + ).fetchone()[0] + + UNIQUE_PATTERN = "CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)" + return [ + UniqueConstraint( + *[getattr(table.columns, c.strip(' "')) for c in cols.split(",")], + name=name + ) + for name, cols in re.findall(UNIQUE_PATTERN, data) + ] + + +def _recreate_table(self, table, column=None, delta=None, omit_uniques=None): + """Recreate the table properly + + Unlike the corresponding original method of sqlalchemy-migrate this one + doesn't drop existing unique constraints when creating a new one. 
+ + """ + + table_name = self.preparer.format_table(table) + + # we remove all indexes so as not to have + # problems during copy and re-create + for index in table.indexes: + index.drop() + + # reflect existing unique constraints + for uc in self._get_unique_constraints(table): + table.append_constraint(uc) + # omit given unique constraints when creating a new table if required + table.constraints = set([ + cons for cons in table.constraints + if omit_uniques is None or cons.name not in omit_uniques + ]) + + self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name) + self.execute() + + insertion_string = self._modify_table(table, column, delta) + + table.create(bind=self.connection) + self.append(insertion_string % {'table_name': table_name}) + self.execute() + self.append('DROP TABLE migration_tmp') + self.execute() + + +def _visit_migrate_unique_constraint(self, *p, **k): + """Drop the given unique constraint + + The corresponding original method of sqlalchemy-migrate just + raises NotImplemented error + + """ + + self.recreate_table(p[0].table, omit_uniques=[p[0].name]) + + +def patch_migrate(): + """A workaround for SQLite's inability to alter things + + SQLite abilities to alter tables are very limited (please read + http://www.sqlite.org/lang_altertable.html for more details). + E. g. one can't drop a column or a constraint in SQLite. The + workaround for this is to recreate the original table omitting + the corresponding constraint (or column). + + sqlalchemy-migrate library has recreate_table() method that + implements this workaround, but it does it wrong: + + - information about unique constraints of a table + is not retrieved. So if you have a table with one + unique constraint and a migration adding another one + you will end up with a table that has only the + latter unique constraint, and the former will be lost + + - dropping of unique constraints is not supported at all + + The proper way to fix this is to provide a pull-request to + sqlalchemy-migrate, but the project seems to be dead. So we + can go on with monkey-patching of the lib at least for now. + + """ + + # this patch is needed to ensure that recreate_table() doesn't drop + # existing unique constraints of the table when creating a new one + helper_cls = sqlite.SQLiteHelper + helper_cls.recreate_table = _recreate_table + helper_cls._get_unique_constraints = _get_unique_constraints + + # this patch is needed to be able to drop existing unique constraints + constraint_cls = sqlite.SQLiteConstraintDropper + constraint_cls.visit_migrate_unique_constraint = \ + _visit_migrate_unique_constraint + constraint_cls.__bases__ = (ansisql.ANSIColumnDropper, + sqlite.SQLiteConstraintGenerator) + + +def db_sync(engine, abs_path, version=None, init_version=0, sanity_check=True): + """Upgrade or downgrade a database. + + Function runs the upgrade() or downgrade() functions in change scripts. + + :param engine: SQLAlchemy engine instance for a given database + :param abs_path: Absolute path to migrate repository. + :param version: Database will upgrade/downgrade until this version. + If None - database will update to the latest + available version. 
+ :param init_version: Initial database version + :param sanity_check: Require schema sanity checking for all tables + """ + + if version is not None: + try: + version = int(version) + except ValueError: + raise exception.DbMigrationError( + message=_("version should be an integer")) + + current_version = db_version(engine, abs_path, init_version) + repository = _find_migrate_repo(abs_path) + if sanity_check: + _db_schema_sanity_check(engine) + if version is None or version > current_version: + return versioning_api.upgrade(engine, repository, version) + else: + return versioning_api.downgrade(engine, repository, + version) + + +def _db_schema_sanity_check(engine): + """Ensure all database tables were created with required parameters. + + :param engine: SQLAlchemy engine instance for a given database + + """ + + if engine.name == 'mysql': + onlyutf8_sql = ('SELECT TABLE_NAME,TABLE_COLLATION ' + 'from information_schema.TABLES ' + 'where TABLE_SCHEMA=%s and ' + 'TABLE_COLLATION NOT LIKE "%%utf8%%"') + + # NOTE(morganfainberg): exclude the sqlalchemy-migrate and alembic + # versioning tables from the tables we need to verify utf8 status on. + # Non-standard table names are not supported. + EXCLUDED_TABLES = ['migrate_version', 'alembic_version'] + + table_names = [res[0] for res in + engine.execute(onlyutf8_sql, engine.url.database) if + res[0].lower() not in EXCLUDED_TABLES] + + if len(table_names) > 0: + raise ValueError(_('Tables "%s" have non utf8 collation, ' + 'please make sure all tables are CHARSET=utf8' + ) % ','.join(table_names)) + + +def db_version(engine, abs_path, init_version): + """Show the current version of the repository. + + :param engine: SQLAlchemy engine instance for a given database + :param abs_path: Absolute path to migrate repository + :param version: Initial database version + """ + repository = _find_migrate_repo(abs_path) + try: + return versioning_api.db_version(engine, repository) + except versioning_exceptions.DatabaseNotControlledError: + meta = sqlalchemy.MetaData() + meta.reflect(bind=engine) + tables = meta.tables + if len(tables) == 0 or 'alembic_version' in tables: + db_version_control(engine, abs_path, version=init_version) + return versioning_api.db_version(engine, repository) + else: + raise exception.DbMigrationError( + message=_( + "The database is not under version control, but has " + "tables. Please stamp the current version of the schema " + "manually.")) + + +def db_version_control(engine, abs_path, version=None): + """Mark a database as under this repository's version control. + + Once a database is under version control, schema changes should + only be done via change scripts in this repository. 
+ + :param engine: SQLAlchemy engine instance for a given database + :param abs_path: Absolute path to migrate repository + :param version: Initial database version + """ + repository = _find_migrate_repo(abs_path) + versioning_api.version_control(engine, repository, version) + return version + + +def _find_migrate_repo(abs_path): + """Get the project's change script repository + + :param abs_path: Absolute path to migrate repository + """ + if not os.path.exists(abs_path): + raise exception.DbMigrationError("Path %s not found" % abs_path) + return Repository(abs_path) diff --git a/cinder/openstack/common/db/sqlalchemy/models.py b/cinder/openstack/common/db/sqlalchemy/models.py index 33a91039155..2de190dd698 100644 --- a/cinder/openstack/common/db/sqlalchemy/models.py +++ b/cinder/openstack/common/db/sqlalchemy/models.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright (c) 2011 X.commerce, a business unit of eBay Inc. # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. @@ -22,30 +20,30 @@ SQLAlchemy models. """ +import six + from sqlalchemy import Column, Integer from sqlalchemy import DateTime from sqlalchemy.orm import object_mapper -from cinder.openstack.common.db.sqlalchemy.session import get_session from cinder.openstack.common import timeutils -class ModelBase(object): +class ModelBase(six.Iterator): """Base class for models.""" __table_initialized__ = False - def save(self, session=None): + def save(self, session): """Save this object.""" - if not session: - session = get_session() + # NOTE(boris-42): This part of code should be look like: - # sesssion.add(self) + # session.add(self) # session.flush() # But there is a bug in sqlalchemy and eventlet that # raises NoneType exception if there is no running # transaction and rollback is called. As long as # sqlalchemy has this bug we have to create transaction - # explicity. + # explicitly. with session.begin(subtransactions=True): session.add(self) session.flush() @@ -59,46 +57,62 @@ class ModelBase(object): def get(self, key, default=None): return getattr(self, key, default) + @property + def _extra_keys(self): + """Specifies custom fields + + Subclasses can override this property to return a list + of custom fields that should be included in their dict + representation. + + For reference check tests/db/sqlalchemy/test_models.py + """ + return [] + def __iter__(self): - columns = dict(object_mapper(self).columns).keys() + columns = list(dict(object_mapper(self).columns).keys()) # NOTE(russellb): Allow models to specify other keys that can be looked # up, beyond the actual db columns. An example would be the 'name' # property for an Instance. - if hasattr(self, '_extra_keys'): - columns.extend(self._extra_keys()) + columns.extend(self._extra_keys) self._i = iter(columns) return self - def next(self): - n = self._i.next() + # In Python 3, __next__() has replaced next(). + def __next__(self): + n = six.advance_iterator(self._i) return n, getattr(self, n) + def next(self): + return self.__next__() + def update(self, values): """Make the model object behave like a dict.""" - for k, v in values.iteritems(): + for k, v in six.iteritems(values): setattr(self, k, v) def iteritems(self): """Make the model object behave like a dict. - Includes attributes from joins.""" + Includes attributes from joins. 
+ """ local = dict(self) - joined = dict([(k, v) for k, v in self.__dict__.iteritems() + joined = dict([(k, v) for k, v in six.iteritems(self.__dict__) if not k[0] == '_']) local.update(joined) - return local.iteritems() + return six.iteritems(local) class TimestampMixin(object): - created_at = Column(DateTime, default=timeutils.utcnow) - updated_at = Column(DateTime, onupdate=timeutils.utcnow) + created_at = Column(DateTime, default=lambda: timeutils.utcnow()) + updated_at = Column(DateTime, onupdate=lambda: timeutils.utcnow()) class SoftDeleteMixin(object): deleted_at = Column(DateTime) deleted = Column(Integer, default=0) - def soft_delete(self, session=None): + def soft_delete(self, session): """Mark this object as deleted.""" self.deleted = self.id self.deleted_at = timeutils.utcnow() diff --git a/cinder/openstack/common/db/sqlalchemy/provision.py b/cinder/openstack/common/db/sqlalchemy/provision.py new file mode 100644 index 00000000000..100df1158c0 --- /dev/null +++ b/cinder/openstack/common/db/sqlalchemy/provision.py @@ -0,0 +1,157 @@ +# Copyright 2013 Mirantis.inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Provision test environment for specific DB backends""" + +import argparse +import logging +import os +import random +import string + +from six import moves +import sqlalchemy + +from cinder.openstack.common.db import exception as exc + + +LOG = logging.getLogger(__name__) + + +def get_engine(uri): + """Engine creation + + Call the function without arguments to get admin connection. Admin + connection required to create temporary user and database for each + particular test. Otherwise use existing connection to recreate connection + to the temporary database. 
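+
+    An illustrative flow, mirroring main() below::
+
+        admin = get_engine('sqlite://')       # admin connection
+        test_uri = create_database(admin)     # temporary test database
+        test_engine = get_engine(test_uri)    # connection used by the test
+        drop_database(admin, test_uri)        # cleanup afterwards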
+ """ + return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool) + + +def _execute_sql(engine, sql, driver): + """Initialize connection, execute sql query and close it.""" + try: + with engine.connect() as conn: + if driver == 'postgresql': + conn.connection.set_isolation_level(0) + for s in sql: + conn.execute(s) + except sqlalchemy.exc.OperationalError: + msg = ('%s does not match database admin ' + 'credentials or database does not exist.') + LOG.exception(msg % engine.url) + raise exc.DBConnectionError(msg % engine.url) + + +def create_database(engine): + """Provide temporary user and database for each particular test.""" + driver = engine.name + + auth = { + 'database': ''.join(random.choice(string.ascii_lowercase) + for i in moves.range(10)), + 'user': engine.url.username, + 'passwd': engine.url.password, + } + + sqls = [ + "drop database if exists %(database)s;", + "create database %(database)s;" + ] + + if driver == 'sqlite': + return 'sqlite:////tmp/%s' % auth['database'] + elif driver in ['mysql', 'postgresql']: + sql_query = map(lambda x: x % auth, sqls) + _execute_sql(engine, sql_query, driver) + else: + raise ValueError('Unsupported RDBMS %s' % driver) + + params = auth.copy() + params['backend'] = driver + return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params + + +def drop_database(admin_engine, current_uri): + """Drop temporary database and user after each particular test.""" + + engine = get_engine(current_uri) + driver = engine.name + auth = {'database': engine.url.database, 'user': engine.url.username} + + if driver == 'sqlite': + try: + os.remove(auth['database']) + except OSError: + pass + elif driver in ['mysql', 'postgresql']: + sql = "drop database if exists %(database)s;" + _execute_sql(admin_engine, [sql % auth], driver) + else: + raise ValueError('Unsupported RDBMS %s' % driver) + + +def main(): + """Controller to handle commands + + ::create: Create test user and database with random names. + ::drop: Drop user and database created by previous command. + """ + parser = argparse.ArgumentParser( + description='Controller to handle database creation and dropping' + ' commands.', + epilog='Under normal circumstances is not used directly.' 
+ ' Used in .testr.conf to automate test database creation' + ' and dropping processes.') + subparsers = parser.add_subparsers( + help='Subcommands to manipulate temporary test databases.') + + create = subparsers.add_parser( + 'create', + help='Create temporary test ' + 'databases and users.') + create.set_defaults(which='create') + create.add_argument( + 'instances_count', + type=int, + help='Number of databases to create.') + + drop = subparsers.add_parser( + 'drop', + help='Drop temporary test databases and users.') + drop.set_defaults(which='drop') + drop.add_argument( + 'instances', + nargs='+', + help='List of databases uri to be dropped.') + + args = parser.parse_args() + + connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION', + 'sqlite://') + engine = get_engine(connection_string) + which = args.which + + if which == "create": + for i in range(int(args.instances_count)): + print(create_database(engine)) + elif which == "drop": + for db in args.instances: + drop_database(engine, db) + + +if __name__ == "__main__": + main() diff --git a/cinder/openstack/common/db/sqlalchemy/session.py b/cinder/openstack/common/db/sqlalchemy/session.py index e7d1d1bc53e..29ffc9f4128 100644 --- a/cinder/openstack/common/db/sqlalchemy/session.py +++ b/cinder/openstack/common/db/sqlalchemy/session.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -18,43 +16,34 @@ """Session Handling for SQLAlchemy backend. -Initializing: - -* Call set_defaults with the minimal of the following kwargs: - sql_connection, sqlite_db - - Example: - - session.set_defaults( - sql_connection="sqlite:///var/lib/cinder/sqlite.db", - sqlite_db="/var/lib/cinder/sqlite.db") - Recommended ways to use sessions within this framework: -* Don't use them explicitly; this is like running with AUTOCOMMIT=1. - model_query() will implicitly use a session when called without one +* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``. + `model_query()` will implicitly use a session when called without one supplied. This is the ideal situation because it will allow queries to be automatically retried if the database connection is interrupted. - Note: Automatic retry will be enabled in a future patch. + .. note:: Automatic retry will be enabled in a future patch. It is generally fine to issue several queries in a row like this. Even though they may be run in separate transactions and/or separate sessions, each one will see the data from the prior calls. If needed, undo- or rollback-like functionality should be handled at a logical level. For an example, look at - the code around quotas and reservation_rollback(). + the code around quotas and `reservation_rollback()`. Examples: + .. code:: python + def get_foo(context, foo): - return model_query(context, models.Foo).\ - filter_by(foo=foo).\ - first() + return (model_query(context, models.Foo). + filter_by(foo=foo). + first()) def update_foo(context, id, newfoo): - model_query(context, models.Foo).\ - filter_by(id=id).\ - update({'foo': newfoo}) + (model_query(context, models.Foo). + filter_by(id=id). + update({'foo': newfoo})) def create_foo(context, values): foo_ref = models.Foo() @@ -63,21 +52,29 @@ Recommended ways to use sessions within this framework: return foo_ref -* Within the scope of a single method, keeping all the reads and writes within - the context managed by a single session. 
In this way, the session's __exit__ - handler will take care of calling flush() and commit() for you. - If using this approach, you should not explicitly call flush() or commit(). - Any error within the context of the session will cause the session to emit - a ROLLBACK. If the connection is dropped before this is possible, the - database will implicitly rollback the transaction. +* Within the scope of a single method, keep all the reads and writes within + the context managed by a single session. In this way, the session's + `__exit__` handler will take care of calling `flush()` and `commit()` for + you. If using this approach, you should not explicitly call `flush()` or + `commit()`. Any error within the context of the session will cause the + session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be + raised in `session`'s `__exit__` handler, and any try/except within the + context managed by `session` will not be triggered. And catching other + non-database errors in the session will not trigger the ROLLBACK, so + exception handlers should always be outside the session, unless the + developer wants to do a partial commit on purpose. If the connection is + dropped before this is possible, the database will implicitly roll back the + transaction. - Note: statements in the session scope will not be automatically retried. + .. note:: Statements in the session scope will not be automatically retried. If you create models within the session, they need to be added, but you - do not need to call model.save() + do not need to call `model.save()`: + + .. code:: python def create_many_foo(context, foos): - session = get_session() + session = sessionmaker() with session.begin(): for foo in foos: foo_ref = models.Foo() @@ -85,38 +82,64 @@ Recommended ways to use sessions within this framework: session.add(foo_ref) def update_bar(context, foo_id, newbar): - session = get_session() + session = sessionmaker() with session.begin(): - foo_ref = model_query(context, models.Foo, session).\ - filter_by(id=foo_id).\ - first() - model_query(context, models.Bar, session).\ - filter_by(id=foo_ref['bar_id']).\ - update({'bar': newbar}) + foo_ref = (model_query(context, models.Foo, session). + filter_by(id=foo_id). + first()) + (model_query(context, models.Bar, session). + filter_by(id=foo_ref['bar_id']). + update({'bar': newbar})) - Note: update_bar is a trivially simple example of using "with session.begin". - Whereas create_many_foo is a good example of when a transaction is needed, - it is always best to use as few queries as possible. The two queries in - update_bar can be better expressed using a single query which avoids - the need for an explicit transaction. It can be expressed like so: + .. note:: `update_bar` is a trivially simple example of using + ``with session.begin``. Whereas `create_many_foo` is a good example of + when a transaction is needed, it is always best to use as few queries as + possible. + + The two queries in `update_bar` can be better expressed using a single query + which avoids the need for an explicit transaction. It can be expressed like + so: + + .. code:: python def update_bar(context, foo_id, newbar): - subq = model_query(context, models.Foo.id).\ - filter_by(id=foo_id).\ - limit(1).\ - subquery() - model_query(context, models.Bar).\ - filter_by(id=subq.as_scalar()).\ - update({'bar': newbar}) + subq = (model_query(context, models.Foo.id). + filter_by(id=foo_id). + limit(1). + subquery()) + (model_query(context, models.Bar). + filter_by(id=subq.as_scalar()). 
+ update({'bar': newbar})) - For reference, this emits approximagely the following SQL statement: + For reference, this emits approximately the following SQL statement: + + .. code:: sql UPDATE bar SET bar = ${newbar} WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1); + .. note:: `create_duplicate_foo` is a trivially simple example of catching an + exception while using ``with session.begin``. Here create two duplicate + instances with same primary key, must catch the exception out of context + managed by a single session: + + .. code:: python + + def create_duplicate_foo(context): + foo1 = models.Foo() + foo2 = models.Foo() + foo1.id = foo2.id = 1 + session = sessionmaker() + try: + with session.begin(): + session.add(foo1) + session.add(foo2) + except exception.DBDuplicateEntry as e: + handle_error(e) + * Passing an active session between methods. Sessions should only be passed to private methods. The private method must use a subtransaction; otherwise - SQLAlchemy will throw an error when you call session.begin() on an existing + SQLAlchemy will throw an error when you call `session.begin()` on an existing transaction. Public methods should not accept a session parameter and should not be involved in sessions within the caller's scope. @@ -129,8 +152,10 @@ Recommended ways to use sessions within this framework: becomes less clear in this situation. When this is needed for code clarity, it should be clearly documented. + .. code:: python + def myfunc(foo): - session = get_session() + session = sessionmaker() with session.begin(): # do some database things bar = _private_func(foo, session) @@ -138,7 +163,7 @@ Recommended ways to use sessions within this framework: def _private_func(foo, session=None): if not session: - session = get_session() + session = sessionmaker() with session.begin(subtransaction=True): # do some other database things return bar @@ -148,13 +173,13 @@ There are some things which it is best to avoid: * Don't keep a transaction open any longer than necessary. - This means that your "with session.begin()" block should be as short + This means that your ``with session.begin()`` block should be as short as possible, while still containing all the related calls for that transaction. -* Avoid "with_lockmode('UPDATE')" when possible. +* Avoid ``with_lockmode('UPDATE')`` when possible. - In MySQL/InnoDB, when a "SELECT ... FOR UPDATE" query does not match + In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match any rows, it will take a gap-lock. This is a form of write-lock on the "gap" where no rows exist, and prevents any other writes to that space. This can effectively prevent any INSERT into a table by locking the gap @@ -165,16 +190,19 @@ There are some things which it is best to avoid: number of rows matching a query, and if only one row is returned, then issue the SELECT FOR UPDATE. - The better long-term solution is to use INSERT .. ON DUPLICATE KEY UPDATE. + The better long-term solution is to use + ``INSERT .. ON DUPLICATE KEY UPDATE``. However, this can not be done until the "deleted" columns are removed and proper UNIQUE constraints are added to the tables. Enabling soft deletes: -* To use/enable soft-deletes, the SoftDeleteMixin must be added +* To use/enable soft-deletes, the `SoftDeleteMixin` must be added to your model class. For example: + .. 
code:: python + class NovaBase(models.SoftDeleteMixin, models.ModelBase): pass @@ -182,13 +210,15 @@ Enabling soft deletes: Efficient use of soft deletes: * There are two possible ways to mark a record as deleted: - model.soft_delete() and query.soft_delete(). + `model.soft_delete()` and `query.soft_delete()`. - model.soft_delete() method works with single already fetched entry. - query.soft_delete() makes only one db request for all entries that correspond - to query. + The `model.soft_delete()` method works with a single already-fetched entry. + `query.soft_delete()` makes only one db request for all entries that + correspond to the query. -* In almost all cases you should use query.soft_delete(). Some examples: +* In almost all cases you should use `query.soft_delete()`. Some examples: + + .. code:: python def soft_delete_bar(): count = model_query(BarModel).find(some_condition).soft_delete() @@ -197,35 +227,39 @@ Efficient use of soft deletes: def complex_soft_delete_with_synchronization_bar(session=None): if session is None: - session = get_session() + session = sessionmaker() with session.begin(subtransactions=True): - count = model_query(BarModel).\ - find(some_condition).\ - soft_delete(synchronize_session=True) + count = (model_query(BarModel). + find(some_condition). + soft_delete(synchronize_session=True)) # Here synchronize_session is required, because we # don't know what is going on in outer session. if count == 0: raise Exception("0 entries were soft deleted") -* There is only one situation where model.soft_delete() is appropriate: when +* There is only one situation where `model.soft_delete()` is appropriate: when you fetch a single record, work with it, and mark it as deleted in the same transaction. + .. code:: python + def soft_delete_bar_model(): - session = get_session() + session = sessionmaker() with session.begin(): bar_ref = model_query(BarModel).find(some_condition).first() # Work with bar_ref bar_ref.soft_delete(session=session) However, if you need to work with all entries that correspond to query and - then soft delete them you should use query.soft_delete() method: + then soft delete them you should use the `query.soft_delete()` method: + + .. code:: python def soft_delete_multi_models(): - session = get_session() + session = sessionmaker() with session.begin(): - query = model_query(BarModel, session=session).\ - find(some_condition) + query = (model_query(BarModel, session=session). + find(some_condition)) model_refs = query.all() # Work with model_refs query.soft_delete(synchronize_session=False) @@ -233,131 +267,39 @@ Efficient use of soft deletes: # session and these entries are not used after this. When working with many rows, it is very important to use query.soft_delete, - which issues a single query. Using model.soft_delete(), as in the following + which issues a single query. Using `model.soft_delete()`, as in the following example, is very inefficient. + .. code:: python + for bar_ref in bar_refs: bar_ref.soft_delete(session=session) # This will produce count(bar_refs) db requests. 
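+
+  The single-query equivalent, reusing the names from the example above,
+  avoids those per-row requests:
+
+  .. code:: python
+
+      (model_query(BarModel, session=session).
+          find(some_condition).
+          soft_delete(synchronize_session=False))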
+ """ -import os.path +import functools +import logging import re import time -from eventlet import greenthread -from oslo.config import cfg import six from sqlalchemy import exc as sqla_exc -import sqlalchemy.interfaces from sqlalchemy.interfaces import PoolListener import sqlalchemy.orm from sqlalchemy.pool import NullPool, StaticPool from sqlalchemy.sql.expression import literal_column from cinder.openstack.common.db import exception -from cinder.openstack.common import log as logging -from cinder.openstack.common.gettextutils import _ +from cinder.openstack.common.gettextutils import _LE, _LW from cinder.openstack.common import timeutils -DEFAULT = 'DEFAULT' -sqlite_db_opts = [ - cfg.StrOpt('sqlite_db', - default='cinder.sqlite', - help='the filename to use with sqlite'), - cfg.BoolOpt('sqlite_synchronous', - default=True, - help='If true, use synchronous mode for sqlite'), -] - -database_opts = [ - cfg.StrOpt('connection', - default='sqlite:///' + - os.path.abspath(os.path.join(os.path.dirname(__file__), - '../', '$sqlite_db')), - help='The SQLAlchemy connection string used to connect to the ' - 'database', - deprecated_name='sql_connection', - deprecated_group=DEFAULT, - secret=True), - cfg.IntOpt('idle_timeout', - default=3600, - deprecated_name='sql_idle_timeout', - deprecated_group=DEFAULT, - help='timeout before idle sql connections are reaped'), - cfg.IntOpt('min_pool_size', - default=1, - deprecated_name='sql_min_pool_size', - deprecated_group=DEFAULT, - help='Minimum number of SQL connections to keep open in a ' - 'pool'), - cfg.IntOpt('max_pool_size', - default=5, - deprecated_name='sql_max_pool_size', - deprecated_group=DEFAULT, - help='Maximum number of SQL connections to keep open in a ' - 'pool'), - cfg.IntOpt('max_retries', - default=10, - deprecated_name='sql_max_retries', - deprecated_group=DEFAULT, - help='maximum db connection retries during startup. ' - '(setting -1 implies an infinite retry count)'), - cfg.IntOpt('retry_interval', - default=10, - deprecated_name='sql_retry_interval', - deprecated_group=DEFAULT, - help='interval between retries of opening a sql connection'), - cfg.IntOpt('max_overflow', - default=None, - deprecated_name='sql_max_overflow', - deprecated_group=DEFAULT, - help='If set, use this value for max_overflow with sqlalchemy'), - cfg.IntOpt('connection_debug', - default=0, - deprecated_name='sql_connection_debug', - deprecated_group=DEFAULT, - help='Verbosity of SQL debugging information. 0=None, ' - '100=Everything'), - cfg.BoolOpt('connection_trace', - default=False, - deprecated_name='sql_connection_trace', - deprecated_group=DEFAULT, - help='Add python stack traces to SQL as comment strings'), -] - -CONF = cfg.CONF -CONF.register_opts(sqlite_db_opts) -CONF.register_opts(database_opts, 'database') LOG = logging.getLogger(__name__) -_ENGINE = None -_MAKER = None - - -def set_defaults(sql_connection, sqlite_db): - """Set defaults for configuration variables.""" - cfg.set_defaults(database_opts, - connection=sql_connection) - cfg.set_defaults(sqlite_db_opts, - sqlite_db=sqlite_db) - - -def cleanup(): - global _ENGINE, _MAKER - - if _MAKER: - _MAKER.close_all() - _MAKER = None - if _ENGINE: - _ENGINE.dispose() - _ENGINE = None - class SqliteForeignKeysListener(PoolListener): - """ - Ensures that the foreign key constraints are enforced in SQLite. + """Ensures that the foreign key constraints are enforced in SQLite. 
The foreign key constraints are disabled by default in SQLite, so the foreign key constraints will be enabled here for every @@ -367,19 +309,6 @@ class SqliteForeignKeysListener(PoolListener): dbapi_con.execute('pragma foreign_keys=ON') -def get_session(autocommit=True, expire_on_commit=False, - sqlite_fk=False): - """Return a SQLAlchemy session.""" - global _MAKER - - if _MAKER is None: - engine = get_engine(sqlite_fk=sqlite_fk) - _MAKER = get_maker(engine, autocommit, expire_on_commit) - - session = _MAKER() - return session - - # note(boris-42): In current versions of DB backends unique constraint # violation messages follow the structure: # @@ -387,6 +316,11 @@ def get_session(autocommit=True, expire_on_commit=False, # 1 column - (IntegrityError) column c1 is not unique # N columns - (IntegrityError) column c1, c2, ..., N are not unique # +# sqlite since 3.7.16: +# 1 column - (IntegrityError) UNIQUE constraint failed: tbl.k1 +# +# N columns - (IntegrityError) UNIQUE constraint failed: tbl.k1, tbl.k2 +# # postgres: # 1 column - (IntegrityError) duplicate key value violates unique # constraint "users_c1_key" @@ -398,39 +332,65 @@ def get_session(autocommit=True, expire_on_commit=False, # 'c1'") # N columns - (IntegrityError) (1062, "Duplicate entry 'values joined # with -' for key 'name_of_our_constraint'") +# +# ibm_db_sa: +# N columns - (IntegrityError) SQL0803N One or more values in the INSERT +# statement, UPDATE statement, or foreign key update caused by a +# DELETE statement are not valid because the primary key, unique +# constraint or unique index identified by "2" constrains table +# "NOVA.KEY_PAIRS" from having duplicate values for the index +# key. _DUP_KEY_RE_DB = { - "sqlite": re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), - "postgresql": re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"), - "mysql": re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$") + "sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), + re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")), + "postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),), + "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),), + "ibm_db_sa": (re.compile(r"^.*SQL0803N.*$"),), } def _raise_if_duplicate_entry_error(integrity_error, engine_name): - """ + """Raise exception if two entries are duplicated. + In this function will be raised DBDuplicateEntry exception if integrity error wrap unique constraint violation. """ def get_columns_from_uniq_cons_or_name(columns): - # note(boris-42): UniqueConstraint name convention: "uniq_c1_x_c2_x_c3" - # means that columns c1, c2, c3 are in UniqueConstraint. + # note(vsergeyev): UniqueConstraint name convention: "uniq_t0c10c2" + # where `t` it is table name and columns `c1`, `c2` + # are in UniqueConstraint. uniqbase = "uniq_" if not columns.startswith(uniqbase): if engine_name == "postgresql": return [columns[columns.index("_") + 1:columns.rindex("_")]] return [columns] - return columns[len(uniqbase):].split("_x_") + return columns[len(uniqbase):].split("0")[1:] - if engine_name not in ["mysql", "sqlite", "postgresql"]: + if engine_name not in ("ibm_db_sa", "mysql", "sqlite", "postgresql"): return - m = _DUP_KEY_RE_DB[engine_name].match(integrity_error.message) - if not m: + # FIXME(johannes): The usage of the .message attribute has been + # deprecated since Python 2.6. However, the exceptions raised by + # SQLAlchemy can differ when using unicode() and accessing .message. 
+ # An audit across all three supported engines will be necessary to + # ensure there are no regressions. + for pattern in _DUP_KEY_RE_DB[engine_name]: + match = pattern.match(integrity_error.message) + if match: + break + else: return - columns = m.group(1) + + # NOTE(mriedem): The ibm_db_sa integrity error message doesn't provide the + # columns so we have to omit that from the DBDuplicateEntry error. + columns = '' + + if engine_name != 'ibm_db_sa': + columns = match.group(1) if engine_name == "sqlite": - columns = columns.strip().split(", ") + columns = [c.split('.')[-1] for c in columns.strip().split(", ")] else: columns = get_columns_from_uniq_cons_or_name(columns) raise exception.DBDuplicateEntry(columns, integrity_error) @@ -448,13 +408,19 @@ _DEADLOCK_RE_DB = { def _raise_if_deadlock_error(operational_error, engine_name): - """ + """Raise exception on deadlock condition. + Raise DBDeadlock exception if OperationalError contains a Deadlock condition. """ re = _DEADLOCK_RE_DB.get(engine_name) if re is None: return + # FIXME(johannes): The usage of the .message attribute has been + # deprecated since Python 2.6. However, the exceptions raised by + # SQLAlchemy can differ when using unicode() and accessing .message. + # An audit across all three supported engines will be necessary to + # ensure there are no regressions. m = re.match(operational_error.message) if not m: return @@ -462,43 +428,40 @@ def _raise_if_deadlock_error(operational_error, engine_name): def _wrap_db_error(f): - def _wrap(*args, **kwargs): + @functools.wraps(f) + def _wrap(self, *args, **kwargs): try: - return f(*args, **kwargs) + assert issubclass( + self.__class__, sqlalchemy.orm.session.Session + ), ('_wrap_db_error() can only be applied to methods of ' + 'subclasses of sqlalchemy.orm.session.Session.') + + return f(self, *args, **kwargs) except UnicodeEncodeError: raise exception.DBInvalidUnicodeParameter() - # note(boris-42): We should catch unique constraint violation and - # wrap it by our own DBDuplicateEntry exception. Unique constraint - # violation is wrapped by IntegrityError. except sqla_exc.OperationalError as e: - _raise_if_deadlock_error(e, get_engine().name) + _raise_if_db_connection_lost(e, self.bind) + _raise_if_deadlock_error(e, self.bind.dialect.name) # NOTE(comstud): A lot of code is checking for OperationalError # so let's not wrap it for now. raise + # note(boris-42): We should catch unique constraint violation and + # wrap it by our own DBDuplicateEntry exception. Unique constraint + # violation is wrapped by IntegrityError. except sqla_exc.IntegrityError as e: # note(boris-42): SqlAlchemy doesn't unify errors from different # DBs so we must do this. Also in some tables (for example # instance_types) there are more than one unique constraint. This # means we should get names of columns, which values violate # unique constraint, from error message. 
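
(Editorial illustration: the for/else above tries each pattern in turn; for a post-3.7.16 SQLite message the second regex fires, and the column names are recovered exactly as the sqlite branch below does. The regexes are copied from the patch; the error message literal is a made-up sample.)

.. code:: python

    import re

    patterns = (
        re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
        re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$"),
    )

    message = "(IntegrityError) UNIQUE constraint failed: tbl.k1, tbl.k2"
    for pattern in patterns:
        match = pattern.match(message)
        if match:
            break

    # strip the table prefix, as the sqlite branch of the patch does
    columns = [c.split('.')[-1] for c in match.group(1).strip().split(", ")]
    print(columns)  # ['k1', 'k2']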
- _raise_if_duplicate_entry_error(e, get_engine().name) + _raise_if_duplicate_entry_error(e, self.bind.dialect.name) raise exception.DBError(e) except Exception as e: - LOG.exception(_('DB exception wrapped.')) + LOG.exception(_LE('DB exception wrapped.')) raise exception.DBError(e) - _wrap.func_name = f.func_name return _wrap -def get_engine(sqlite_fk=False): - """Return a SQLAlchemy engine.""" - global _ENGINE - if _ENGINE is None: - _ENGINE = create_engine(CONF.database.connection, - sqlite_fk=sqlite_fk) - return _ENGINE - - def _synchronous_switch_listener(dbapi_conn, connection_rec): """Switch sqlite connections to non-synchronous mode.""" dbapi_conn.execute("PRAGMA synchronous = OFF") @@ -513,88 +476,188 @@ def _add_regexp_listener(dbapi_con, con_record): dbapi_con.create_function('regexp', 2, regexp) -def _greenthread_yield(dbapi_con, con_record): +def _thread_yield(dbapi_con, con_record): + """Ensure other greenthreads get a chance to be executed. + + If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will + execute instead of time.sleep(0). + Force a context switch. With common database backends (eg MySQLdb and + sqlite), there is no implicit yield caused by network I/O since they are + implemented by C libraries that eventlet cannot monkey patch. """ - Ensure other greenthreads get a chance to execute by forcing a context - switch. With common database backends (eg MySQLdb and sqlite), there is - no implicit yield caused by network I/O since they are implemented by - C libraries that eventlet cannot monkey patch. - """ - greenthread.sleep(0) + time.sleep(0) -def _ping_listener(dbapi_conn, connection_rec, connection_proxy): - """ - Ensures that MySQL connections checked out of the - pool are alive. +def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy): + """Ensures that MySQL, PostgreSQL or DB2 connections are alive. Borrowed from: http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f """ + cursor = dbapi_conn.cursor() try: - dbapi_conn.cursor().execute('select 1') - except dbapi_conn.OperationalError as ex: - if ex.args[0] in (2006, 2013, 2014, 2045, 2055): - LOG.warn(_('Got mysql server has gone away: %s'), ex) - raise sqla_exc.DisconnectionError("Database server went away") + ping_sql = 'select 1' + if engine.name == 'ibm_db_sa': + # DB2 requires a table expression + ping_sql = 'select 1 from (values (1)) AS t1' + cursor.execute(ping_sql) + except Exception as ex: + if engine.dialect.is_disconnect(ex, dbapi_conn, cursor): + msg = _LW('Database server has gone away: %s') % ex + LOG.warning(msg) + + # if the database server has gone away, all connections in the pool + # have become invalid and we can safely close all of them here, + # rather than waste time on checking of every single connection + engine.dispose() + + # this will be handled by SQLAlchemy and will force it to create + # a new connection and retry the original action + raise sqla_exc.DisconnectionError(msg) else: raise +def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None): + """Set the sql_mode session variable. + + MySQL supports several server modes. The default is None, but sessions + may choose to enable server modes like TRADITIONAL, ANSI, + several STRICT_* modes and others. + + Note: passing in '' (empty string) for sql_mode clears + the SQL mode for the session, overriding a potentially set + server default. 
+ """ + + cursor = dbapi_con.cursor() + cursor.execute("SET SESSION sql_mode = %s", [sql_mode]) + + +def _mysql_get_effective_sql_mode(engine): + """Returns the effective SQL mode for connections from the engine pool. + + Returns ``None`` if the mode isn't available, otherwise returns the mode. + + """ + # Get the real effective SQL mode. Even when unset by + # our own config, the server may still be operating in a specific + # SQL mode as set by the server configuration. + # Also note that the checkout listener will be called on execute to + # set the mode if it's registered. + row = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone() + if row is None: + return + return row[1] + + +def _mysql_check_effective_sql_mode(engine): + """Logs a message based on the effective SQL mode for MySQL connections.""" + realmode = _mysql_get_effective_sql_mode(engine) + + if realmode is None: + LOG.warning(_LW('Unable to detect effective SQL mode')) + return + + LOG.debug('MySQL server mode set to %s', realmode) + # 'TRADITIONAL' mode enables several other modes, so + # we need a substring match here + if not ('TRADITIONAL' in realmode.upper() or + 'STRICT_ALL_TABLES' in realmode.upper()): + LOG.warning(_LW("MySQL SQL mode is '%s', " + "consider enabling TRADITIONAL or STRICT_ALL_TABLES"), + realmode) + + +def _mysql_set_mode_callback(engine, sql_mode): + if sql_mode is not None: + mode_callback = functools.partial(_set_session_sql_mode, + sql_mode=sql_mode) + sqlalchemy.event.listen(engine, 'connect', mode_callback) + _mysql_check_effective_sql_mode(engine) + + def _is_db_connection_error(args): """Return True if error in connecting to db.""" # NOTE(adam_g): This is currently MySQL specific and needs to be extended # to support Postgres and others. - conn_err_codes = ('2002', '2003', '2006') + # For the db2, the error code is -30081 since the db2 is still not ready + conn_err_codes = ('2002', '2003', '2006', '2013', '-30081') for err_code in conn_err_codes: if args.find(err_code) != -1: return True return False -def create_engine(sql_connection, sqlite_fk=False): +def _raise_if_db_connection_lost(error, engine): + # NOTE(vsergeyev): Function is_disconnect(e, connection, cursor) + # requires connection and cursor in incoming parameters, + # but we have no possibility to create connection if DB + # is not available, so in such case reconnect fails. + # But is_disconnect() ignores these parameters, so it + # makes sense to pass to function None as placeholder + # instead of connection and cursor. 
+ if engine.dialect.is_disconnect(error, None, None): + raise exception.DBConnectionError(error) + + +def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, + idle_timeout=3600, + connection_debug=0, max_pool_size=None, max_overflow=None, + pool_timeout=None, sqlite_synchronous=True, + connection_trace=False, max_retries=10, retry_interval=10): """Return a new SQLAlchemy engine.""" + connection_dict = sqlalchemy.engine.url.make_url(sql_connection) engine_args = { - "pool_recycle": CONF.database.idle_timeout, - "echo": False, + "pool_recycle": idle_timeout, 'convert_unicode': True, } - # Map our SQL debug level to SQLAlchemy's options - if CONF.database.connection_debug >= 100: - engine_args['echo'] = 'debug' - elif CONF.database.connection_debug >= 50: - engine_args['echo'] = True + logger = logging.getLogger('sqlalchemy.engine') + + # Map SQL debug level to Python log level + if connection_debug >= 100: + logger.setLevel(logging.DEBUG) + elif connection_debug >= 50: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.WARNING) if "sqlite" in connection_dict.drivername: if sqlite_fk: engine_args["listeners"] = [SqliteForeignKeysListener()] engine_args["poolclass"] = NullPool - if CONF.database.connection == "sqlite://": + if sql_connection == "sqlite://": engine_args["poolclass"] = StaticPool engine_args["connect_args"] = {'check_same_thread': False} else: - engine_args['pool_size'] = CONF.database.max_pool_size - if CONF.database.max_overflow is not None: - engine_args['max_overflow'] = CONF.database.max_overflow + if max_pool_size is not None: + engine_args['pool_size'] = max_pool_size + if max_overflow is not None: + engine_args['max_overflow'] = max_overflow + if pool_timeout is not None: + engine_args['pool_timeout'] = pool_timeout engine = sqlalchemy.create_engine(sql_connection, **engine_args) - sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield) + sqlalchemy.event.listen(engine, 'checkin', _thread_yield) - if 'mysql' in connection_dict.drivername: - sqlalchemy.event.listen(engine, 'checkout', _ping_listener) + if engine.name in ('ibm_db_sa', 'mysql', 'postgresql'): + ping_callback = functools.partial(_ping_listener, engine) + sqlalchemy.event.listen(engine, 'checkout', ping_callback) + if engine.name == 'mysql': + if mysql_sql_mode: + _mysql_set_mode_callback(engine, mysql_sql_mode) elif 'sqlite' in connection_dict.drivername: - if not CONF.sqlite_synchronous: + if not sqlite_synchronous: sqlalchemy.event.listen(engine, 'connect', _synchronous_switch_listener) sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener) - if (CONF.database.connection_trace and - engine.dialect.dbapi.__name__ == 'MySQLdb'): + if connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb': _patch_mysqldb_with_stacktrace_comments() try: @@ -603,15 +666,15 @@ def create_engine(sql_connection, sqlite_fk=False): if not _is_db_connection_error(e.args[0]): raise - remaining = CONF.database.max_retries + remaining = max_retries if remaining == -1: remaining = 'infinite' while True: - msg = _('SQL connection failed. %s attempts left.') - LOG.warn(msg % remaining) + msg = _LW('SQL connection failed. 
%s attempts left.') + LOG.warning(msg % remaining) if remaining != 'infinite': remaining -= 1 - time.sleep(CONF.database.retry_interval) + time.sleep(retry_interval) try: engine.connect() break @@ -656,8 +719,9 @@ def get_maker(engine, autocommit=True, expire_on_commit=False): def _patch_mysqldb_with_stacktrace_comments(): - """Adds current stack trace as a comment in queries by patching - MySQLdb.cursors.BaseCursor._do_query. + """Adds current stack trace as a comment in queries. + + Patches MySQLdb.cursors.BaseCursor._do_query. """ import MySQLdb.cursors import traceback @@ -666,25 +730,25 @@ def _patch_mysqldb_with_stacktrace_comments(): def _do_query(self, q): stack = '' - for file, line, method, function in traceback.extract_stack(): + for filename, line, method, function in traceback.extract_stack(): # exclude various common things from trace - if file.endswith('session.py') and method == '_do_query': + if filename.endswith('session.py') and method == '_do_query': continue - if file.endswith('api.py') and method == 'wrapper': + if filename.endswith('api.py') and method == 'wrapper': continue - if file.endswith('utils.py') and method == '_inner': + if filename.endswith('utils.py') and method == '_inner': continue - if file.endswith('exception.py') and method == '_wrap': + if filename.endswith('exception.py') and method == '_wrap': continue # db/api is just a wrapper around db/sqlalchemy/api - if file.endswith('db/api.py'): + if filename.endswith('db/api.py'): continue # only trace inside cinder - index = file.rfind('cinder') + index = filename.rfind('cinder') if index == -1: continue stack += "File:%s:%s Method:%s() Line:%s | " \ - % (file[index:], line, method, function) + % (filename[index:], line, method, function) # strip trailing " | " from stack if stack: @@ -695,3 +759,146 @@ def _patch_mysqldb_with_stacktrace_comments(): old_mysql_do_query(self, qq) setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query) + + +class EngineFacade(object): + """A helper class for removing of global engine instances from cinder.db. + + As a library, cinder.db can't decide where to store/when to create engine + and sessionmaker instances, so this must be left for a target application. + + On the other hand, in order to simplify the adoption of cinder.db changes, + we'll provide a helper class, which creates engine and sessionmaker + on its instantiation and provides get_engine()/get_session() methods + that are compatible with corresponding utility functions that currently + exist in target projects, e.g. in Nova. + + engine/sessionmaker instances will still be global (and they are meant to + be global), but they will be stored in the app context, rather that in the + cinder.db context. + + Note: using of this helper is completely optional and you are encouraged to + integrate engine/sessionmaker instances into your apps any way you like + (e.g. one might want to bind a session to a request context). Two important + things to remember: + + 1. An Engine instance is effectively a pool of DB connections, so it's + meant to be shared (and it's thread-safe). + 2. A Session instance is not meant to be shared and represents a DB + transactional context (i.e. it's not thread-safe). sessionmaker is + a factory of sessions. + + """ + + def __init__(self, sql_connection, + sqlite_fk=False, autocommit=True, + expire_on_commit=False, **kwargs): + """Initialize engine and sessionmaker instances. 
+ + :param sqlite_fk: enable foreign keys in SQLite + :type sqlite_fk: bool + + :param autocommit: use autocommit mode for created Session instances + :type autocommit: bool + + :param expire_on_commit: expire session objects on commit + :type expire_on_commit: bool + + Keyword arguments: + + :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions. + (defaults to TRADITIONAL) + :keyword idle_timeout: timeout before idle sql connections are reaped + (defaults to 3600) + :keyword connection_debug: verbosity of SQL debugging information. + 0=None, 100=Everything (defaults to 0) + :keyword max_pool_size: maximum number of SQL connections to keep open + in a pool (defaults to SQLAlchemy settings) + :keyword max_overflow: if set, use this value for max_overflow with + sqlalchemy (defaults to SQLAlchemy settings) + :keyword pool_timeout: if set, use this value for pool_timeout with + sqlalchemy (defaults to SQLAlchemy settings) + :keyword sqlite_synchronous: if True, SQLite uses synchronous mode + (defaults to True) + :keyword connection_trace: add python stack traces to SQL as comment + strings (defaults to False) + :keyword max_retries: maximum db connection retries during startup. + (setting -1 implies an infinite retry count) + (defaults to 10) + :keyword retry_interval: interval between retries of opening a sql + connection (defaults to 10) + + """ + + super(EngineFacade, self).__init__() + + self._engine = create_engine( + sql_connection=sql_connection, + sqlite_fk=sqlite_fk, + mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'), + idle_timeout=kwargs.get('idle_timeout', 3600), + connection_debug=kwargs.get('connection_debug', 0), + max_pool_size=kwargs.get('max_pool_size'), + max_overflow=kwargs.get('max_overflow'), + pool_timeout=kwargs.get('pool_timeout'), + sqlite_synchronous=kwargs.get('sqlite_synchronous', True), + connection_trace=kwargs.get('connection_trace', False), + max_retries=kwargs.get('max_retries', 10), + retry_interval=kwargs.get('retry_interval', 10)) + self._session_maker = get_maker( + engine=self._engine, + autocommit=autocommit, + expire_on_commit=expire_on_commit) + + def get_engine(self): + """Get the engine instance (note, that it's shared).""" + + return self._engine + + def get_session(self, **kwargs): + """Get a Session instance. + + If passed, keyword arguments values override the ones used when the + sessionmaker instance was created. + + :keyword autocommit: use autocommit mode for created Session instances + :type autocommit: bool + + :keyword expire_on_commit: expire session objects on commit + :type expire_on_commit: bool + + """ + + for arg in kwargs: + if arg not in ('autocommit', 'expire_on_commit'): + del kwargs[arg] + + return self._session_maker(**kwargs) + + @classmethod + def from_config(cls, connection_string, conf, + sqlite_fk=False, autocommit=True, expire_on_commit=False): + """Initialize EngineFacade using oslo.config config instance options. 
+ + :param connection_string: SQLAlchemy connection string + :type connection_string: string + + :param conf: oslo.config config instance + :type conf: oslo.config.cfg.ConfigOpts + + :param sqlite_fk: enable foreign keys in SQLite + :type sqlite_fk: bool + + :param autocommit: use autocommit mode for created Session instances + :type autocommit: bool + + :param expire_on_commit: expire session objects on commit + :type expire_on_commit: bool + + """ + + return cls(sql_connection=connection_string, + sqlite_fk=sqlite_fk, + autocommit=autocommit, + expire_on_commit=expire_on_commit, + **dict(conf.database.items())) diff --git a/cinder/openstack/common/db/sqlalchemy/test_base.py b/cinder/openstack/common/db/sqlalchemy/test_base.py new file mode 100644 index 00000000000..3df0828da4a --- /dev/null +++ b/cinder/openstack/common/db/sqlalchemy/test_base.py @@ -0,0 +1,165 @@ +# Copyright (c) 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import functools +import os + +import fixtures +from oslotest import base as test_base +import six + +from cinder.openstack.common.db.sqlalchemy import provision +from cinder.openstack.common.db.sqlalchemy import session +from cinder.openstack.common.db.sqlalchemy import utils + + +class DbFixture(fixtures.Fixture): + """Basic database fixture. + + Allows to run tests on various db backends, such as SQLite, MySQL and + PostgreSQL. By default use sqlite backend. To override default backend + uri set env variable OS_TEST_DBAPI_CONNECTION with database admin + credentials for specific backend. + """ + + def _get_uri(self): + return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://') + + def __init__(self, test): + super(DbFixture, self).__init__() + + self.test = test + + def cleanUp(self): + self.test.engine.dispose() + + def setUp(self): + super(DbFixture, self).setUp() + + self.test.engine = session.create_engine(self._get_uri()) + self.test.sessionmaker = session.get_maker(self.test.engine) + + +class DbTestCase(test_base.BaseTestCase): + """Base class for testing of DB code. + + Using `DbFixture`. Intended to be the main database test case to use all + the tests on a given backend with user defined uri. Backend specific + tests should be decorated with `backend_specific` decorator. + """ + + FIXTURE = DbFixture + + def setUp(self): + super(DbTestCase, self).setUp() + self.useFixture(self.FIXTURE(self)) + + +ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql'] + + +def backend_specific(*dialects): + """Decorator to skip backend specific tests on inappropriate engines. + + ::dialects: list of dialects names under which the test will be launched. + """ + def wrap(f): + @functools.wraps(f) + def ins_wrap(self): + if not set(dialects).issubset(ALLOWED_DIALECTS): + raise ValueError( + "Please use allowed dialects: %s" % ALLOWED_DIALECTS) + if self.engine.name not in dialects: + msg = ('The test "%s" can be run ' + 'only on %s. 
Current engine is %s.') + args = (f.__name__, ' '.join(dialects), self.engine.name) + self.skip(msg % args) + else: + return f(self) + return ins_wrap + return wrap + + +@six.add_metaclass(abc.ABCMeta) +class OpportunisticFixture(DbFixture): + """Base fixture to use default CI databases. + + The databases exist in OpenStack CI infrastructure. But for the + correct functioning in local environment the databases must be + created manually. + """ + + DRIVER = abc.abstractproperty(lambda: None) + DBNAME = PASSWORD = USERNAME = 'openstack_citest' + + def setUp(self): + self._provisioning_engine = provision.get_engine( + utils.get_connect_string(backend=self.DRIVER, + user=self.USERNAME, + passwd=self.PASSWORD, + database=self.DBNAME) + ) + self._uri = provision.create_database(self._provisioning_engine) + + super(OpportunisticFixture, self).setUp() + + def cleanUp(self): + super(OpportunisticFixture, self).cleanUp() + + provision.drop_database(self._provisioning_engine, self._uri) + + def _get_uri(self): + return self._uri + + +@six.add_metaclass(abc.ABCMeta) +class OpportunisticTestCase(DbTestCase): + """Base test case to use default CI databases. + + The subclasses of the test case are running only when openstack_citest + database is available otherwise a tests will be skipped. + """ + + FIXTURE = abc.abstractproperty(lambda: None) + + def setUp(self): + credentials = { + 'backend': self.FIXTURE.DRIVER, + 'user': self.FIXTURE.USERNAME, + 'passwd': self.FIXTURE.PASSWORD, + 'database': self.FIXTURE.DBNAME} + + if self.FIXTURE.DRIVER and not utils.is_backend_avail(**credentials): + msg = '%s backend is not available.' % self.FIXTURE.DRIVER + return self.skip(msg) + + super(OpportunisticTestCase, self).setUp() + + +class MySQLOpportunisticFixture(OpportunisticFixture): + DRIVER = 'mysql' + + +class PostgreSQLOpportunisticFixture(OpportunisticFixture): + DRIVER = 'postgresql' + + +class MySQLOpportunisticTestCase(OpportunisticTestCase): + FIXTURE = MySQLOpportunisticFixture + + +class PostgreSQLOpportunisticTestCase(OpportunisticTestCase): + FIXTURE = PostgreSQLOpportunisticFixture diff --git a/cinder/openstack/common/db/sqlalchemy/test_migrations.py b/cinder/openstack/common/db/sqlalchemy/test_migrations.py new file mode 100644 index 00000000000..2300306c798 --- /dev/null +++ b/cinder/openstack/common/db/sqlalchemy/test_migrations.py @@ -0,0 +1,269 @@ +# Copyright 2010-2011 OpenStack Foundation +# Copyright 2012-2013 IBM Corp. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
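
(Editorial note: for reviewers wondering how the EngineFacade defined earlier in this patch is meant to be consumed, a sketch that assumes nothing beyond its documented constructor and accessors; the in-memory URL is illustrative:)

.. code:: python

    from cinder.openstack.common.db.sqlalchemy import session as db_session

    facade = db_session.EngineFacade('sqlite://', autocommit=True,
                                     expire_on_commit=False)

    engine = facade.get_engine()    # shared pool, safe across threads
    session = facade.get_session()  # one transactional context per caller
    with session.begin():
        pass  # issue queries against `session` here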
+
+import functools
+import logging
+import os
+import subprocess
+
+import lockfile
+from oslotest import base as test_base
+from six import moves
+from six.moves.urllib import parse
+import sqlalchemy
+import sqlalchemy.exc
+
+from cinder.openstack.common.db.sqlalchemy import utils
+from cinder.openstack.common.gettextutils import _LE
+
+LOG = logging.getLogger(__name__)
+
+
+def _have_mysql(user, passwd, database):
+    present = os.environ.get('TEST_MYSQL_PRESENT')
+    if present is None:
+        return utils.is_backend_avail(backend='mysql',
+                                      user=user,
+                                      passwd=passwd,
+                                      database=database)
+    return present.lower() in ('', 'true')
+
+
+def _have_postgresql(user, passwd, database):
+    present = os.environ.get('TEST_POSTGRESQL_PRESENT')
+    if present is None:
+        return utils.is_backend_avail(backend='postgres',
+                                      user=user,
+                                      passwd=passwd,
+                                      database=database)
+    return present.lower() in ('', 'true')
+
+
+def _set_db_lock(lock_path=None, lock_prefix=None):
+    def decorator(f):
+        @functools.wraps(f)
+        def wrapper(*args, **kwargs):
+            try:
+                path = lock_path or os.environ.get("CINDER_LOCK_PATH")
+                lock = lockfile.FileLock(os.path.join(path, lock_prefix))
+                with lock:
+                    LOG.debug('Got lock "%s"' % f.__name__)
+                    return f(*args, **kwargs)
+            finally:
+                LOG.debug('Lock released "%s"' % f.__name__)
+        return wrapper
+    return decorator
+
+
+class BaseMigrationTestCase(test_base.BaseTestCase):
+    """Base class for testing of migration utils."""
+
+    def __init__(self, *args, **kwargs):
+        super(BaseMigrationTestCase, self).__init__(*args, **kwargs)
+
+        self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
+                                                'test_migrations.conf')
+        # Test machines can set the TEST_MIGRATIONS_CONF variable
+        # to override the location of the config file for migration testing
+        self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF',
+                                               self.DEFAULT_CONFIG_FILE)
+        self.test_databases = {}
+        self.migration_api = None
+
+    def setUp(self):
+        super(BaseMigrationTestCase, self).setUp()
+
+        # Load test databases from the config file. Only do this
+        # once. No need to re-run this on each test...
+        LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH)
+        if os.path.exists(self.CONFIG_FILE_PATH):
+            cp = moves.configparser.RawConfigParser()
+            try:
+                cp.read(self.CONFIG_FILE_PATH)
+                defaults = cp.defaults()
+                for key, value in defaults.items():
+                    self.test_databases[key] = value
+            except moves.configparser.ParsingError as e:
+                self.fail("Failed to read test_migrations.conf config "
+                          "file. Got error: %s" % e)
+        else:
+            self.fail("Failed to find test_migrations.conf config "
+                      "file.")
+
+        self.engines = {}
+        for key, value in self.test_databases.items():
+            self.engines[key] = sqlalchemy.create_engine(value)
+
+        # We start each test case with a completely blank slate.
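
(Editorial note: the file parsed by setUp() is a plain ini file whose DEFAULT entries map a label to a connection string, one engine per entry; cp.defaults() returns them as a dict. An illustrative example, not a verbatim copy of any shipped file:)

.. code:: ini

    [DEFAULT]
    # each key becomes a self.test_databases entry; each value is passed
    # straight to sqlalchemy.create_engine()
    sqlite = sqlite://
    mysql = mysql://openstack_citest:openstack_citest@localhost/openstack_citest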
+ self._reset_databases() + + def tearDown(self): + # We destroy the test data store between each test case, + # and recreate it, which ensures that we have no side-effects + # from the tests + self._reset_databases() + super(BaseMigrationTestCase, self).tearDown() + + def execute_cmd(self, cmd=None): + process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + output = process.communicate()[0] + LOG.debug(output) + self.assertEqual(0, process.returncode, + "Failed to run: %s\n%s" % (cmd, output)) + + def _reset_pg(self, conn_pieces): + (user, + password, + database, + host) = utils.get_db_connection_info(conn_pieces) + os.environ['PGPASSWORD'] = password + os.environ['PGUSER'] = user + # note(boris-42): We must create and drop database, we can't + # drop database which we have connected to, so for such + # operations there is a special database template1. + sqlcmd = ("psql -w -U %(user)s -h %(host)s -c" + " '%(sql)s' -d template1") + + sql = ("drop database if exists %s;") % database + droptable = sqlcmd % {'user': user, 'host': host, 'sql': sql} + self.execute_cmd(droptable) + + sql = ("create database %s;") % database + createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql} + self.execute_cmd(createtable) + + os.unsetenv('PGPASSWORD') + os.unsetenv('PGUSER') + + @_set_db_lock(lock_prefix='migration_tests-') + def _reset_databases(self): + for key, engine in self.engines.items(): + conn_string = self.test_databases[key] + conn_pieces = parse.urlparse(conn_string) + engine.dispose() + if conn_string.startswith('sqlite'): + # We can just delete the SQLite database, which is + # the easiest and cleanest solution + db_path = conn_pieces.path.strip('/') + if os.path.exists(db_path): + os.unlink(db_path) + # No need to recreate the SQLite DB. SQLite will + # create it for us if it's not there... + elif conn_string.startswith('mysql'): + # We can execute the MySQL client to destroy and re-create + # the MYSQL database, which is easier and less error-prone + # than using SQLAlchemy to do this via MetaData...trust me. + (user, password, database, host) = \ + utils.get_db_connection_info(conn_pieces) + sql = ("drop database if exists %(db)s; " + "create database %(db)s;") % {'db': database} + cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s " + "-e \"%(sql)s\"") % {'user': user, 'password': password, + 'host': host, 'sql': sql} + self.execute_cmd(cmd) + elif conn_string.startswith('postgresql'): + self._reset_pg(conn_pieces) + + +class WalkVersionsMixin(object): + def _walk_versions(self, engine=None, snake_walk=False, downgrade=True): + # Determine latest version script from the repo, then + # upgrade from 1 through to the latest, with no data + # in the databases. This just checks that the schema itself + # upgrades successfully. + + # Place the database under version control + self.migration_api.version_control(engine, self.REPOSITORY, + self.INIT_VERSION) + self.assertEqual(self.INIT_VERSION, + self.migration_api.db_version(engine, + self.REPOSITORY)) + + LOG.debug('latest version is %s' % self.REPOSITORY.latest) + versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1) + + for version in versions: + # upgrade -> downgrade -> upgrade + self._migrate_up(engine, version, with_data=True) + if snake_walk: + downgraded = self._migrate_down( + engine, version - 1, with_data=True) + if downgraded: + self._migrate_up(engine, version) + + if downgrade: + # Now walk it back down to 0 from the latest, testing + # the downgrade paths. 
+ for version in reversed(versions): + # downgrade -> upgrade -> downgrade + downgraded = self._migrate_down(engine, version - 1) + + if snake_walk and downgraded: + self._migrate_up(engine, version) + self._migrate_down(engine, version - 1) + + def _migrate_down(self, engine, version, with_data=False): + try: + self.migration_api.downgrade(engine, self.REPOSITORY, version) + except NotImplementedError: + # NOTE(sirp): some migrations, namely release-level + # migrations, don't support a downgrade. + return False + + self.assertEqual( + version, self.migration_api.db_version(engine, self.REPOSITORY)) + + # NOTE(sirp): `version` is what we're downgrading to (i.e. the 'target' + # version). So if we have any downgrade checks, they need to be run for + # the previous (higher numbered) migration. + if with_data: + post_downgrade = getattr( + self, "_post_downgrade_%03d" % (version + 1), None) + if post_downgrade: + post_downgrade(engine) + + return True + + def _migrate_up(self, engine, version, with_data=False): + """migrate up to a new version of the db. + + We allow for data insertion and post checks at every + migration version with special _pre_upgrade_### and + _check_### functions in the main test. + """ + # NOTE(sdague): try block is here because it's impossible to debug + # where a failed data migration happens otherwise + try: + if with_data: + data = None + pre_upgrade = getattr( + self, "_pre_upgrade_%03d" % version, None) + if pre_upgrade: + data = pre_upgrade(engine) + + self.migration_api.upgrade(engine, self.REPOSITORY, version) + self.assertEqual(version, + self.migration_api.db_version(engine, + self.REPOSITORY)) + if with_data: + check = getattr(self, "_check_%03d" % version, None) + if check: + check(engine, data) + except Exception: + LOG.error(_LE("Failed to migrate to version %s on engine %s") % + (version, engine)) + raise diff --git a/cinder/openstack/common/db/sqlalchemy/utils.py b/cinder/openstack/common/db/sqlalchemy/utils.py index 6e30d553f34..696cdf9c8c5 100644 --- a/cinder/openstack/common/db/sqlalchemy/utils.py +++ b/cinder/openstack/common/db/sqlalchemy/utils.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # Copyright 2010-2011 OpenStack Foundation. @@ -18,16 +16,43 @@ # License for the specific language governing permissions and limitations # under the License. 
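
(Editorial note: a sketch of how a project-side migration test plugs into the WalkVersionsMixin above. The `_pre_upgrade_NNN`/`_check_NNN`/`_post_downgrade_NNN` hook names are the ones the mixin resolves with getattr(); REPOSITORY, INIT_VERSION, migration_api and migration 004 are placeholders, not part of this patch:)

.. code:: python

    class TestMyMigrations(BaseMigrationTestCase, WalkVersionsMixin):
        # REPOSITORY, INIT_VERSION and migration_api must be supplied by
        # the project; they are assumed to be set up elsewhere.

        def _pre_upgrade_004(self, engine):
            # seed data before migration 004 runs; the return value is
            # handed to the matching _check_ hook by _migrate_up()
            return {'marker': 'value'}

        def _check_004(self, engine, data):
            # verify schema/data after migration 004 has been applied
            self.assertEqual('value', data['marker'])

        def _post_downgrade_004(self, engine):
            # runs after downgrading away from 004 (i.e. back to 003)
            pass

        def test_walk_versions(self):
            for engine in self.engines.values():
                self._walk_versions(engine, snake_walk=True, downgrade=True)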
-"""Implementation of paginate query.""" +import logging +import re import sqlalchemy +from sqlalchemy import Boolean +from sqlalchemy import CheckConstraint +from sqlalchemy import Column +from sqlalchemy.engine import reflection +from sqlalchemy.ext.compiler import compiles +from sqlalchemy import func +from sqlalchemy import Index +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy import or_ +from sqlalchemy.sql.expression import literal_column +from sqlalchemy.sql.expression import UpdateBase +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy.types import NullType -from cinder.openstack.common.gettextutils import _ -from cinder.openstack.common import log as logging +from cinder.openstack.common import context as request_context +from cinder.openstack.common.db.sqlalchemy import models +from cinder.openstack.common.gettextutils import _, _LI, _LW +from cinder.openstack.common import timeutils LOG = logging.getLogger(__name__) +_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+") + + +def sanitize_db_url(url): + match = _DBURL_REGEX.match(url) + if match: + return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):]) + return url + class InvalidSortKey(Exception): message = _("Sort key supplied was not valid.") @@ -69,7 +94,7 @@ def paginate_query(query, model, limit, sort_keys, marker=None, if 'id' not in sort_keys: # TODO(justinsb): If this ever gives a false-positive, check # the actual primary key, rather than assuming its id - LOG.warn(_('Id not in sort_keys; is sort_keys unique?')) + LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?')) assert(not (sort_dir and sort_dirs)) @@ -85,11 +110,14 @@ def paginate_query(query, model, limit, sort_keys, marker=None, # Add sorting for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): - sort_dir_func = { - 'asc': sqlalchemy.asc, - 'desc': sqlalchemy.desc, - }[current_sort_dir] - + try: + sort_dir_func = { + 'asc': sqlalchemy.asc, + 'desc': sqlalchemy.desc, + }[current_sort_dir] + except KeyError: + raise ValueError(_("Unknown sort direction, " + "must be 'desc' or 'asc'")) try: sort_key_attr = getattr(model, current_sort_key) except AttributeError: @@ -105,20 +133,17 @@ def paginate_query(query, model, limit, sort_keys, marker=None, # Build up an array of sort criteria as in the docstring criteria_list = [] - for i in range(0, len(sort_keys)): + for i in range(len(sort_keys)): crit_attrs = [] - for j in range(0, i): + for j in range(i): model_attr = getattr(model, sort_keys[j]) crit_attrs.append((model_attr == marker_values[j])) model_attr = getattr(model, sort_keys[i]) if sort_dirs[i] == 'desc': crit_attrs.append((model_attr < marker_values[i])) - elif sort_dirs[i] == 'asc': - crit_attrs.append((model_attr > marker_values[i])) else: - raise ValueError(_("Unknown sort direction, " - "must be 'desc' or 'asc'")) + crit_attrs.append((model_attr > marker_values[i])) criteria = sqlalchemy.sql.and_(*crit_attrs) criteria_list.append(criteria) @@ -130,3 +155,501 @@ def paginate_query(query, model, limit, sort_keys, marker=None, query = query.limit(limit) return query + + +def _read_deleted_filter(query, db_model, read_deleted): + if 'deleted' not in db_model.__table__.columns: + raise ValueError(_("There is no `deleted` column in `%s` table. 
" + "Project doesn't use soft-deleted feature.") + % db_model.__name__) + + default_deleted_value = db_model.__table__.c.deleted.default.arg + if read_deleted == 'no': + query = query.filter(db_model.deleted == default_deleted_value) + elif read_deleted == 'yes': + pass # omit the filter to include deleted and active + elif read_deleted == 'only': + query = query.filter(db_model.deleted != default_deleted_value) + else: + raise ValueError(_("Unrecognized read_deleted value '%s'") + % read_deleted) + return query + + +def _project_filter(query, db_model, context, project_only): + if project_only and 'project_id' not in db_model.__table__.columns: + raise ValueError(_("There is no `project_id` column in `%s` table.") + % db_model.__name__) + + if request_context.is_user_context(context) and project_only: + if project_only == 'allow_none': + is_none = None + query = query.filter(or_(db_model.project_id == context.project_id, + db_model.project_id == is_none)) + else: + query = query.filter(db_model.project_id == context.project_id) + + return query + + +def model_query(context, model, session, args=None, project_only=False, + read_deleted=None): + """Query helper that accounts for context's `read_deleted` field. + + :param context: context to query under + + :param model: Model to query. Must be a subclass of ModelBase. + :type model: models.ModelBase + + :param session: The session to use. + :type session: sqlalchemy.orm.session.Session + + :param args: Arguments to query. If None - model is used. + :type args: tuple + + :param project_only: If present and context is user-type, then restrict + query to match the context's project_id. If set to + 'allow_none', restriction includes project_id = None. + :type project_only: bool + + :param read_deleted: If present, overrides context's read_deleted field. + :type read_deleted: bool + + Usage: + + ..code:: python + + result = (utils.model_query(context, models.Instance, session=session) + .filter_by(uuid=instance_uuid) + .all()) + + query = utils.model_query( + context, Node, + session=session, + args=(func.count(Node.id), func.sum(Node.ram)) + ).filter_by(project_id=project_id) + + """ + + if not read_deleted: + if hasattr(context, 'read_deleted'): + # NOTE(viktors): some projects use `read_deleted` attribute in + # their contexts instead of `show_deleted`. + read_deleted = context.read_deleted + else: + read_deleted = context.show_deleted + + if not issubclass(model, models.ModelBase): + raise TypeError(_("model should be a subclass of ModelBase")) + + query = session.query(model) if not args else session.query(*args) + query = _read_deleted_filter(query, model, read_deleted) + query = _project_filter(query, model, context, project_only) + + return query + + +def get_table(engine, name): + """Returns an sqlalchemy table dynamically from db. + + Needed because the models don't work for us in migrations + as models will be far out of sync with the current data. + + .. warning:: + + Do not use this method when creating ForeignKeys in database migrations + because sqlalchemy needs the same MetaData object to hold information + about the parent table and the reference table in the ForeignKey. This + method uses a unique MetaData object per table object so it won't work + with ForeignKey creation. + """ + metadata = MetaData() + metadata.bind = engine + return Table(name, metadata, autoload=True) + + +class InsertFromSelect(UpdateBase): + """Form the base for `INSERT INTO table (SELECT ... 
)` statement."""
+    def __init__(self, table, select):
+        self.table = table
+        self.select = select
+
+
+@compiles(InsertFromSelect)
+def visit_insert_from_select(element, compiler, **kw):
+    """Form the `INSERT INTO table (SELECT ... )` statement."""
+    return "INSERT INTO %s %s" % (
+        compiler.process(element.table, asfrom=True),
+        compiler.process(element.select))
+
+
+class ColumnError(Exception):
+    """Error raised when no column or an invalid column is found."""
+
+
+def _get_not_supported_column(col_name_col_instance, column_name):
+    try:
+        column = col_name_col_instance[column_name]
+    except KeyError:
+        msg = _("Please specify column %s in col_name_col_instance "
+                "param. It is required because the column has a type "
+                "unsupported by sqlite.")
+        raise ColumnError(msg % column_name)
+
+    if not isinstance(column, Column):
+        msg = _("col_name_col_instance param has wrong type of "
+                "column instance for column %s. It should be an instance "
+                "of sqlalchemy.Column.")
+        raise ColumnError(msg % column_name)
+    return column
+
+
+def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
+                           **col_name_col_instance):
+    """Drop unique constraint from table.
+
+    DEPRECATED: this function is deprecated and will be removed from cinder.db
+    in a few releases. Please use UniqueConstraint.drop() method directly for
+    sqlalchemy-migrate migration scripts.
+
+    This method drops the UC from the table and works for mysql, postgresql
+    and sqlite. In mysql and postgresql we are able to use "alter table"
+    construction. Sqlalchemy doesn't support some sqlite column types and
+    replaces their type with NullType in metadata. We process these columns
+    and replace NullType with the correct column type.
+
+    :param migrate_engine: sqlalchemy engine
+    :param table_name: name of table that contains uniq constraint.
+    :param uc_name: name of uniq constraint that will be dropped.
+    :param columns: columns that are in uniq constraint.
+    :param col_name_col_instance: contains pair column_name=column_instance.
+                           column_instance is instance of Column. These params
+                           are required only for columns that have unsupported
+                           types by sqlite. For example BigInteger.
+    """
+
+    from migrate.changeset import UniqueConstraint
+
+    meta = MetaData()
+    meta.bind = migrate_engine
+    t = Table(table_name, meta, autoload=True)
+
+    if migrate_engine.name == "sqlite":
+        override_cols = [
+            _get_not_supported_column(col_name_col_instance, col.name)
+            for col in t.columns
+            if isinstance(col.type, NullType)
+        ]
+        for col in override_cols:
+            t.columns.replace(col)
+
+    uc = UniqueConstraint(*columns, table=t, name=uc_name)
+    uc.drop()
+
+
+def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
+                                          use_soft_delete, *uc_column_names):
+    """Drop all old rows having the same values for columns in uc_columns.
+
+    This method drops (or marks as `deleted`, if use_soft_delete is True) old
+    duplicate rows from the table with name `table_name`.
+ + :param migrate_engine: Sqlalchemy engine + :param table_name: Table with duplicates + :param use_soft_delete: If True - values will be marked as `deleted`, + if False - values will be removed from table + :param uc_column_names: Unique constraint columns + """ + meta = MetaData() + meta.bind = migrate_engine + + table = Table(table_name, meta, autoload=True) + columns_for_group_by = [table.c[name] for name in uc_column_names] + + columns_for_select = [func.max(table.c.id)] + columns_for_select.extend(columns_for_group_by) + + duplicated_rows_select = sqlalchemy.sql.select( + columns_for_select, group_by=columns_for_group_by, + having=func.count(table.c.id) > 1) + + for row in migrate_engine.execute(duplicated_rows_select): + # NOTE(boris-42): Do not remove row that has the biggest ID. + delete_condition = table.c.id != row[0] + is_none = None # workaround for pyflakes + delete_condition &= table.c.deleted_at == is_none + for name in uc_column_names: + delete_condition &= table.c[name] == row[name] + + rows_to_delete_select = sqlalchemy.sql.select( + [table.c.id]).where(delete_condition) + for row in migrate_engine.execute(rows_to_delete_select).fetchall(): + LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: " + "%(table)s") % dict(id=row[0], table=table_name)) + + if use_soft_delete: + delete_statement = table.update().\ + where(delete_condition).\ + values({ + 'deleted': literal_column('id'), + 'updated_at': literal_column('updated_at'), + 'deleted_at': timeutils.utcnow() + }) + else: + delete_statement = table.delete().where(delete_condition) + migrate_engine.execute(delete_statement) + + +def _get_default_deleted_value(table): + if isinstance(table.c.id.type, Integer): + return 0 + if isinstance(table.c.id.type, String): + return "" + raise ColumnError(_("Unsupported id columns type")) + + +def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes): + table = get_table(migrate_engine, table_name) + + insp = reflection.Inspector.from_engine(migrate_engine) + real_indexes = insp.get_indexes(table_name) + existing_index_names = dict( + [(index['name'], index['column_names']) for index in real_indexes]) + + # NOTE(boris-42): Restore indexes on `deleted` column + for index in indexes: + if 'deleted' not in index['column_names']: + continue + name = index['name'] + if name in existing_index_names: + column_names = [table.c[c] for c in existing_index_names[name]] + old_index = Index(name, *column_names, unique=index["unique"]) + old_index.drop(migrate_engine) + + column_names = [table.c[c] for c in index['column_names']] + new_index = Index(index["name"], *column_names, unique=index["unique"]) + new_index.create(migrate_engine) + + +def change_deleted_column_type_to_boolean(migrate_engine, table_name, + **col_name_col_instance): + if migrate_engine.name == "sqlite": + return _change_deleted_column_type_to_boolean_sqlite( + migrate_engine, table_name, **col_name_col_instance) + insp = reflection.Inspector.from_engine(migrate_engine) + indexes = insp.get_indexes(table_name) + + table = get_table(migrate_engine, table_name) + + old_deleted = Column('old_deleted', Boolean, default=False) + old_deleted.create(table, populate_default=False) + + table.update().\ + where(table.c.deleted == table.c.id).\ + values(old_deleted=True).\ + execute() + + table.c.deleted.drop() + table.c.old_deleted.alter(name="deleted") + + _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes) + + +def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, 
table_name, + **col_name_col_instance): + insp = reflection.Inspector.from_engine(migrate_engine) + table = get_table(migrate_engine, table_name) + + columns = [] + for column in table.columns: + column_copy = None + if column.name != "deleted": + if isinstance(column.type, NullType): + column_copy = _get_not_supported_column(col_name_col_instance, + column.name) + else: + column_copy = column.copy() + else: + column_copy = Column('deleted', Boolean, default=0) + columns.append(column_copy) + + constraints = [constraint.copy() for constraint in table.constraints] + + meta = table.metadata + new_table = Table(table_name + "__tmp__", meta, + *(columns + constraints)) + new_table.create() + + indexes = [] + for index in insp.get_indexes(table_name): + column_names = [new_table.c[c] for c in index['column_names']] + indexes.append(Index(index["name"], *column_names, + unique=index["unique"])) + + c_select = [] + for c in table.c: + if c.name != "deleted": + c_select.append(c) + else: + c_select.append(table.c.deleted == table.c.id) + + ins = InsertFromSelect(new_table, sqlalchemy.sql.select(c_select)) + migrate_engine.execute(ins) + + table.drop() + [index.create(migrate_engine) for index in indexes] + + new_table.rename(table_name) + new_table.update().\ + where(new_table.c.deleted == new_table.c.id).\ + values(deleted=True).\ + execute() + + +def change_deleted_column_type_to_id_type(migrate_engine, table_name, + **col_name_col_instance): + if migrate_engine.name == "sqlite": + return _change_deleted_column_type_to_id_type_sqlite( + migrate_engine, table_name, **col_name_col_instance) + insp = reflection.Inspector.from_engine(migrate_engine) + indexes = insp.get_indexes(table_name) + + table = get_table(migrate_engine, table_name) + + new_deleted = Column('new_deleted', table.c.id.type, + default=_get_default_deleted_value(table)) + new_deleted.create(table, populate_default=True) + + deleted = True # workaround for pyflakes + table.update().\ + where(table.c.deleted == deleted).\ + values(new_deleted=table.c.id).\ + execute() + table.c.deleted.drop() + table.c.new_deleted.alter(name="deleted") + + _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes) + + +def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name, + **col_name_col_instance): + # NOTE(boris-42): sqlaclhemy-migrate can't drop column with check + # constraints in sqlite DB and our `deleted` column has + # 2 check constraints. So there is only one way to remove + # these constraints: + # 1) Create new table with the same columns, constraints + # and indexes. (except deleted column). + # 2) Copy all data from old to new table. + # 3) Drop old table. + # 4) Rename new table to old table name. + insp = reflection.Inspector.from_engine(migrate_engine) + meta = MetaData(bind=migrate_engine) + table = Table(table_name, meta, autoload=True) + default_deleted_value = _get_default_deleted_value(table) + + columns = [] + for column in table.columns: + column_copy = None + if column.name != "deleted": + if isinstance(column.type, NullType): + column_copy = _get_not_supported_column(col_name_col_instance, + column.name) + else: + column_copy = column.copy() + else: + column_copy = Column('deleted', table.c.id.type, + default=default_deleted_value) + columns.append(column_copy) + + def is_deleted_column_constraint(constraint): + # NOTE(boris-42): There is no other way to check is CheckConstraint + # associated with deleted column. 
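
(Editorial note: a sketch of how a sqlalchemy-migrate script might call the two converters above; the `volumes` table name is illustrative, and the extra **col_name_col_instance kwargs are only needed for column types SQLite cannot reflect:)

.. code:: python

    from cinder.openstack.common.db.sqlalchemy import utils


    def upgrade(migrate_engine):
        # Boolean `deleted` -> id-typed `deleted` (default value for live
        # rows, row id for soft-deleted ones)
        utils.change_deleted_column_type_to_id_type(migrate_engine, 'volumes')


    def downgrade(migrate_engine):
        utils.change_deleted_column_type_to_boolean(migrate_engine, 'volumes')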
+ if not isinstance(constraint, CheckConstraint): + return False + sqltext = str(constraint.sqltext) + return (sqltext.endswith("deleted in (0, 1)") or + sqltext.endswith("deleted IN (:deleted_1, :deleted_2)")) + + constraints = [] + for constraint in table.constraints: + if not is_deleted_column_constraint(constraint): + constraints.append(constraint.copy()) + + new_table = Table(table_name + "__tmp__", meta, + *(columns + constraints)) + new_table.create() + + indexes = [] + for index in insp.get_indexes(table_name): + column_names = [new_table.c[c] for c in index['column_names']] + indexes.append(Index(index["name"], *column_names, + unique=index["unique"])) + + ins = InsertFromSelect(new_table, table.select()) + migrate_engine.execute(ins) + + table.drop() + [index.create(migrate_engine) for index in indexes] + + new_table.rename(table_name) + deleted = True # workaround for pyflakes + new_table.update().\ + where(new_table.c.deleted == deleted).\ + values(deleted=new_table.c.id).\ + execute() + + # NOTE(boris-42): Fix value of deleted column: False -> "" or 0. + deleted = False # workaround for pyflakes + new_table.update().\ + where(new_table.c.deleted == deleted).\ + values(deleted=default_deleted_value).\ + execute() + + +def get_connect_string(backend, database, user=None, passwd=None): + """Get database connection + + Try to get a connection with a very specific set of values, if we get + these then we'll run the tests, otherwise they are skipped + """ + args = {'backend': backend, + 'user': user, + 'passwd': passwd, + 'database': database} + if backend == 'sqlite': + template = '%(backend)s:///%(database)s' + else: + template = "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" + return template % args + + +def is_backend_avail(backend, database, user=None, passwd=None): + try: + connect_uri = get_connect_string(backend=backend, + database=database, + user=user, + passwd=passwd) + engine = sqlalchemy.create_engine(connect_uri) + connection = engine.connect() + except Exception: + # intentionally catch all to handle exceptions even if we don't + # have any backend code loaded. + return False + else: + connection.close() + engine.dispose() + return True + + +def get_db_connection_info(conn_pieces): + database = conn_pieces.path.strip('/') + loc_pieces = conn_pieces.netloc.split('@') + host = loc_pieces[1] + + auth_pieces = loc_pieces[0].split(':') + user = auth_pieces[0] + password = "" + if len(auth_pieces) > 1: + password = auth_pieces[1].strip() + + return (user, password, database, host) diff --git a/cinder/openstack/common/test.py b/cinder/openstack/common/test.py new file mode 100644 index 00000000000..f8f30e916a1 --- /dev/null +++ b/cinder/openstack/common/test.py @@ -0,0 +1,99 @@ +# Copyright (c) 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
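
(Editorial note: the two helpers above are the availability probe used by the opportunistic fixtures. A sketch of standalone use, with the openstack_citest credentials the CI convention assumes:)

.. code:: python

    from cinder.openstack.common.db.sqlalchemy import utils

    creds = {'backend': 'mysql',
             'database': 'openstack_citest',
             'user': 'openstack_citest',
             'passwd': 'openstack_citest'}

    if utils.is_backend_avail(**creds):
        uri = utils.get_connect_string(**creds)
        # hand `uri` to create_engine()/EngineFacade for real queries
    else:
        print('mysql backend not available; tests would be skipped')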
diff --git a/cinder/openstack/common/test.py b/cinder/openstack/common/test.py
new file mode 100644
index 00000000000..f8f30e916a1
--- /dev/null
+++ b/cinder/openstack/common/test.py
@@ -0,0 +1,99 @@
+# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+##############################################################################
+##############################################################################
+##
+## DO NOT MODIFY THIS FILE
+##
+## This file is being graduated to the cindertest library. Please make all
+## changes there, and only backport critical fixes here. - dhellmann
+##
+##############################################################################
+##############################################################################
+
+"""Common utilities used in testing"""
+
+import logging
+import os
+import tempfile
+
+import fixtures
+import testtools
+
+_TRUE_VALUES = ('True', 'true', '1', 'yes')
+_LOG_FORMAT = "%(levelname)8s [%(name)s] %(message)s"
+
+
+class BaseTestCase(testtools.TestCase):
+
+    def setUp(self):
+        super(BaseTestCase, self).setUp()
+        self._set_timeout()
+        self._fake_output()
+        self._fake_logs()
+        self.useFixture(fixtures.NestedTempfile())
+        self.useFixture(fixtures.TempHomeDir())
+        self.tempdirs = []
+
+    def _set_timeout(self):
+        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
+        try:
+            test_timeout = int(test_timeout)
+        except ValueError:
+            # If timeout value is invalid do not set a timeout.
+            test_timeout = 0
+        if test_timeout > 0:
+            self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
+
+    def _fake_output(self):
+        if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
+            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
+            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
+        if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
+            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
+            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
+
+    def _fake_logs(self):
+        if os.environ.get('OS_DEBUG') in _TRUE_VALUES:
+            level = logging.DEBUG
+        else:
+            level = logging.INFO
+        capture_logs = os.environ.get('OS_LOG_CAPTURE') in _TRUE_VALUES
+        if capture_logs:
+            self.useFixture(
+                fixtures.FakeLogger(
+                    format=_LOG_FORMAT,
+                    level=level,
+                    nuke_handlers=capture_logs,
+                )
+            )
+        else:
+            logging.basicConfig(format=_LOG_FORMAT, level=level)
+
+    def create_tempfiles(self, files, ext='.conf'):
+        tempfiles = []
+        for (basename, contents) in files:
+            if not os.path.isabs(basename):
+                (fd, path) = tempfile.mkstemp(prefix=basename, suffix=ext)
+            else:
+                path = basename + ext
+                fd = os.open(path, os.O_CREAT | os.O_WRONLY)
+            tempfiles.append(path)
+            try:
+                os.write(fd, contents)
+            finally:
+                os.close(fd)
+        return tempfiles
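A short sketch of the new base class in use; the subclass name, file contents, and assertions are illustrative, not part of this change:

    # Hypothetical test built on BaseTestCase (names and contents assumed).
    import os

    from cinder.openstack.common import test


    class ExampleTestCase(test.BaseTestCase):

        def test_create_tempfiles(self):
            # Files are created under the NestedTempfile fixture's private
            # directory, so they are removed again at fixture cleanup.
            paths = self.create_tempfiles([('sample',
                                            '[DEFAULT]\nverbose=true\n')])
            self.assertTrue(paths[0].endswith('.conf'))
            self.assertTrue(os.path.exists(paths[0]))

Output capture and log level are driven by the OS_STDOUT_CAPTURE, OS_STDERR_CAPTURE, OS_DEBUG and OS_LOG_CAPTURE environment variables, as the fixture methods above show.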
diff --git a/cinder/test.py b/cinder/test.py
index 1523efc448c..a5968ad1f8a 100644
--- a/cinder/test.py
+++ b/cinder/test.py
@@ -37,7 +37,7 @@ from testtools import matchers
 
 from cinder.common import config  # noqa Need to register global_opts
 from cinder.db import migration
-from cinder.openstack.common.db.sqlalchemy import session
+from cinder.db.sqlalchemy import api as sqla_api
 from cinder.openstack.common import log as oslo_logging
 from cinder.openstack.common import strutils
 from cinder.openstack.common import timeutils
@@ -65,13 +65,13 @@ class TestingException(Exception):
 
 class Database(fixtures.Fixture):
 
-    def __init__(self, db_session, db_migrate, sql_connection,
+    def __init__(self, db_api, db_migrate, sql_connection,
                  sqlite_db, sqlite_clean_db):
         self.sql_connection = sql_connection
         self.sqlite_db = sqlite_db
         self.sqlite_clean_db = sqlite_clean_db
-        self.engine = db_session.get_engine()
+        self.engine = db_api.get_engine()
         self.engine.dispose()
         conn = self.engine.connect()
         if sql_connection == "sqlite://":
@@ -159,13 +159,13 @@ class TestCase(testtools.TestCase):
         self.start = timeutils.utcnow()
 
         CONF.set_default('connection', 'sqlite://', 'database')
-        CONF.set_default('sqlite_synchronous', False)
+        CONF.set_default('sqlite_synchronous', False, 'database')
 
         global _DB_CACHE
         if not _DB_CACHE:
-            _DB_CACHE = Database(session, migration,
+            _DB_CACHE = Database(sqla_api, migration,
                                  sql_connection=CONF.database.connection,
-                                 sqlite_db=CONF.sqlite_db,
+                                 sqlite_db=CONF.database.sqlite_db,
                                  sqlite_clean_db=CONF.sqlite_clean_db)
         self.useFixture(_DB_CACHE)
diff --git a/cinder/tests/conf_fixture.py b/cinder/tests/conf_fixture.py
index e3a913f0821..00153de5c4c 100644
--- a/cinder/tests/conf_fixture.py
+++ b/cinder/tests/conf_fixture.py
@@ -42,7 +42,7 @@ def set_defaults(conf):
     conf.set_default('rpc_backend', 'cinder.openstack.common.rpc.impl_fake')
     conf.set_default('iscsi_num_targets', 8)
     conf.set_default('connection', 'sqlite://', group='database')
-    conf.set_default('sqlite_synchronous', False)
+    conf.set_default('sqlite_synchronous', False, group='database')
     conf.set_default('policy_file', 'cinder/tests/policy.json')
     conf.set_default(
         'xiv_ds8k_proxy',
diff --git a/cinder/tests/test_storwize_svc.py b/cinder/tests/test_storwize_svc.py
index 7898d6570b5..7ca0276e166 100644
--- a/cinder/tests/test_storwize_svc.py
+++ b/cinder/tests/test_storwize_svc.py
@@ -21,6 +21,7 @@ Tests for the IBM Storwize family and SVC volume driver.
 import mock
 import random
 import re
+import testtools
 
 from cinder import context
 from cinder import exception
@@ -2369,6 +2370,7 @@ class StorwizeSVCDriverTestCase(test.TestCase):
         self.assertEqual((7, 2, 0, 0), res['code_level'],
                          'Get code level error')
 
+    @testtools.skip("Bug #1302670")
     def test_storwize_vdisk_copy_ops(self):
         ctxt = testutils.get_test_admin_context()
         volume = self._create_volume()
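The two test changes above follow from the sqlite options moving into the [database] group: a default set without the group name lands in DEFAULT and is never read by code that looks at CONF.database. A small self-contained sketch of that behavior; the explicit option registration is an illustrative stand-in for what the db code does on import:

    # Hypothetical sketch; registration shown only to make it runnable.
    from oslo.config import cfg

    CONF = cfg.CONF
    CONF.register_opts([cfg.StrOpt('sqlite_db', default='cinder.sqlite'),
                        cfg.BoolOpt('sqlite_synchronous', default=True)],
                       group='database')

    # Grouped options must be set and read through the group:
    CONF.set_default('sqlite_synchronous', False, group='database')
    assert CONF.database.sqlite_synchronous is False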
diff --git a/etc/cinder/cinder.conf.sample b/etc/cinder/cinder.conf.sample
index 9a31f3c2e63..363de3d721d 100644
--- a/etc/cinder/cinder.conf.sample
+++ b/etc/cinder/cinder.conf.sample
@@ -694,17 +694,6 @@
 #image_conversion_dir=$state_path/conversion
 
-
-#
-# Options defined in cinder.openstack.common.db.sqlalchemy.session
-#
-
-# the filename to use with sqlite (string value)
-#sqlite_db=cinder.sqlite
-
-# If true, use synchronous mode for sqlite (boolean value)
-#sqlite_synchronous=true
-
-
 #
 # Options defined in cinder.openstack.common.eventlet_backdoor
 #
@@ -1919,13 +1908,9 @@
 [database]
 
 #
-# Options defined in cinder.openstack.common.db.api
+# Options defined in cinder.db.api
 #
 
-# The backend to use for db (string value)
-# Deprecated group/name - [DEFAULT]/db_backend
-#backend=sqlalchemy
-
 # Enable the experimental use of thread pooling for all DB API
 # calls (boolean value)
 # Deprecated group/name - [DEFAULT]/dbapi_use_tpool
@@ -1933,42 +1918,68 @@
 
 
 #
-# Options defined in cinder.openstack.common.db.sqlalchemy.session
+# Options defined in cinder.openstack.common.db.options
 #
 
+# The file name to use with SQLite (string value)
+#sqlite_db=cinder.sqlite
+
+# If True, SQLite uses synchronous mode (boolean value)
+#sqlite_synchronous=true
+
+# The backend to use for db (string value)
+# Deprecated group/name - [DEFAULT]/db_backend
+#backend=sqlalchemy
+
 # The SQLAlchemy connection string used to connect to the
 # database (string value)
 # Deprecated group/name - [DEFAULT]/sql_connection
+# Deprecated group/name - [DATABASE]/sql_connection
+# Deprecated group/name - [sql]/connection
 #connection=sqlite:///$state_path/$sqlite_db
 
-# timeout before idle sql connections are reaped (integer
+# The SQL mode to be used for MySQL sessions. This option,
+# including the default, overrides any server-set SQL mode. To
+# use whatever SQL mode is set by the server configuration,
+# set this to no value. Example: mysql_sql_mode= (string
+# value)
+#mysql_sql_mode=TRADITIONAL
+
+# Timeout before idle sql connections are reaped (integer
 # value)
 # Deprecated group/name - [DEFAULT]/sql_idle_timeout
+# Deprecated group/name - [DATABASE]/sql_idle_timeout
+# Deprecated group/name - [sql]/idle_timeout
 #idle_timeout=3600
 
 # Minimum number of SQL connections to keep open in a pool
 # (integer value)
 # Deprecated group/name - [DEFAULT]/sql_min_pool_size
+# Deprecated group/name - [DATABASE]/sql_min_pool_size
 #min_pool_size=1
 
 # Maximum number of SQL connections to keep open in a pool
 # (integer value)
 # Deprecated group/name - [DEFAULT]/sql_max_pool_size
-#max_pool_size=5
+# Deprecated group/name - [DATABASE]/sql_max_pool_size
+#max_pool_size=
 
-# maximum db connection retries during startup. (setting -1
+# Maximum db connection retries during startup. (setting -1
 # implies an infinite retry count) (integer value)
 # Deprecated group/name - [DEFAULT]/sql_max_retries
+# Deprecated group/name - [DATABASE]/sql_max_retries
 #max_retries=10
 
-# interval between retries of opening a sql connection
+# Interval between retries of opening a sql connection
 # (integer value)
 # Deprecated group/name - [DEFAULT]/sql_retry_interval
+# Deprecated group/name - [DATABASE]/reconnect_interval
 #retry_interval=10
 
 # If set, use this value for max_overflow with sqlalchemy
 # (integer value)
 # Deprecated group/name - [DEFAULT]/sql_max_overflow
+# Deprecated group/name - [DATABASE]/sqlalchemy_max_overflow
 #max_overflow=
 
 # Verbosity of SQL debugging information. 0=None,
@@ -1981,6 +1992,30 @@
 # Deprecated group/name - [DEFAULT]/sql_connection_trace
 #connection_trace=false
 
+# If set, use this value for pool_timeout with sqlalchemy
+# (integer value)
+# Deprecated group/name - [DATABASE]/sqlalchemy_pool_timeout
+#pool_timeout=
+
+# Enable the experimental use of database reconnect on
+# connection loss (boolean value)
+#use_db_reconnect=false
+
+# Seconds between db connection retries (integer value)
+#db_retry_interval=1
+
+# Whether to increase interval between db connection retries,
+# up to db_max_retry_interval (boolean value)
+#db_inc_retry_interval=true
+
+# Max seconds between db connection retries, if
+# db_inc_retry_interval is enabled (integer value)
+#db_max_retry_interval=10
+
+# Maximum db connection retries before error is raised.
+# (setting -1 implies an infinite retry count) (integer value)
+#db_max_retries=20
+
 
 [fc-zone-manager]
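Taken together, the new db_* options describe a bounded-backoff retry loop. A sketch of the wait schedule they imply, assuming the interval doubles each attempt; the option help above only promises that the interval increases up to db_max_retry_interval, so the exact growth factor is an assumption:

    # Hypothetical illustration of the retry schedule; doubling is assumed.
    def retry_waits(db_max_retries=20, db_retry_interval=1,
                    db_inc_retry_interval=True, db_max_retry_interval=10):
        waits = []
        interval = db_retry_interval
        for _ in range(db_max_retries):
            waits.append(interval)
            if db_inc_retry_interval:
                interval = min(interval * 2, db_max_retry_interval)
        return waits

    print(retry_waits(db_max_retries=6))  # [1, 2, 4, 8, 10, 10]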