Use oslo db for db access

Change-Id: I5f33d747543b5ba936b8ac46e68bf43b1792e231
Partial-bug: #1552282
This commit is contained in:
gong yong sheng 2016-06-03 15:15:22 +08:00
parent 70190b66ba
commit 2a257882fb
17 changed files with 22 additions and 2426 deletions

View File

@ -25,6 +25,7 @@ six>=1.9.0 # MIT
stevedore>=1.10.0 # Apache-2.0 stevedore>=1.10.0 # Apache-2.0
oslo.concurrency>=3.8.0 # Apache-2.0 oslo.concurrency>=3.8.0 # Apache-2.0
oslo.config>=3.10.0 # Apache-2.0 oslo.config>=3.10.0 # Apache-2.0
oslo.db>=4.1.0 # Apache-2.0
oslo.log>=1.14.0 # Apache-2.0 oslo.log>=1.14.0 # Apache-2.0
oslo.messaging>=4.5.0 # Apache-2.0 oslo.messaging>=4.5.0 # Apache-2.0
oslo.rootwrap>=2.0.0 # Apache-2.0 oslo.rootwrap>=2.0.0 # Apache-2.0

View File

@ -20,12 +20,12 @@ Routines for configuring Tacker
import os import os
from oslo_config import cfg from oslo_config import cfg
from oslo_db import options as db_options
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging import oslo_messaging
from paste import deploy from paste import deploy
from tacker.common import utils from tacker.common import utils
from tacker.openstack.common.db import options as db_options
from tacker import version from tacker import version
@ -95,12 +95,18 @@ cfg.CONF.register_cli_opts(core_cli_opts)
# Ensure that the control exchange is set correctly # Ensure that the control exchange is set correctly
oslo_messaging.set_transport_defaults(control_exchange='tacker') oslo_messaging.set_transport_defaults(control_exchange='tacker')
_SQL_CONNECTION_DEFAULT = 'sqlite://'
# Update the default QueuePool parameters. These can be tweaked by the
# configuration variables - max_pool_size, max_overflow and pool_timeout def set_db_defaults():
db_options.set_defaults(sql_connection=_SQL_CONNECTION_DEFAULT, # Update the default QueuePool parameters. These can be tweaked by the
sqlite_db='', max_pool_size=10, # conf variables - max_pool_size, max_overflow and pool_timeout
max_overflow=20, pool_timeout=10) db_options.set_defaults(
cfg.CONF,
connection='sqlite://',
sqlite_db='', max_pool_size=10,
max_overflow=20, pool_timeout=10)
set_db_defaults()
def init(args, **kwargs): def init(args, **kwargs):

View File

@ -14,17 +14,13 @@
# under the License. # under the License.
from oslo_config import cfg from oslo_config import cfg
from oslo_db.sqlalchemy import enginefacade
from oslo_log import log as logging from oslo_log import log as logging
import sqlalchemy as sql
from tacker.db import model_base
from tacker.openstack.common.db.sqlalchemy import session
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
cfg.CONF.import_opt('connection', context_manager = enginefacade.transaction_context()
'tacker.openstack.common.db.options',
group='database')
_FACADE = None _FACADE = None
@ -33,25 +29,12 @@ def _create_facade_lazily():
global _FACADE global _FACADE
if _FACADE is None: if _FACADE is None:
_FACADE = session.EngineFacade.from_config( context_manager.configure(sqlite_fk=True, **cfg.CONF.database)
cfg.CONF.database.connection, cfg.CONF, sqlite_fk=True) _FACADE = context_manager._factory.get_legacy_facade()
return _FACADE return _FACADE
def configure_db():
"""Configure database.
Establish the database, create an engine if needed, and register
the models.
"""
register_models()
def clear_db(base=model_base.BASE):
unregister_models(base)
def get_engine(): def get_engine():
"""Helper method to grab engine.""" """Helper method to grab engine."""
facade = _create_facade_lazily() facade = _create_facade_lazily()
@ -63,25 +46,3 @@ def get_session(autocommit=True, expire_on_commit=False):
facade = _create_facade_lazily() facade = _create_facade_lazily()
return facade.get_session(autocommit=autocommit, return facade.get_session(autocommit=autocommit,
expire_on_commit=expire_on_commit) expire_on_commit=expire_on_commit)
def register_models(base=model_base.BASE):
"""Register Models and create properties."""
try:
facade = _create_facade_lazily()
engine = facade.get_engine()
base.metadata.create_all(engine)
except sql.exc.OperationalError as e:
LOG.info(_("Database registration exception: %s"), e)
return False
return True
def unregister_models(base=model_base.BASE):
"""Unregister Models, useful clearing out data before testing."""
try:
facade = _create_facade_lazily()
engine = facade.get_engine()
base.metadata.drop_all(engine)
except Exception:
LOG.exception(_("Database exception"))

View File

@ -13,11 +13,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from oslo_db.sqlalchemy import models
from sqlalchemy.ext import declarative from sqlalchemy.ext import declarative
from sqlalchemy import orm from sqlalchemy import orm
from tacker.openstack.common.db.sqlalchemy import models
class TackerBase(models.ModelBase): class TackerBase(models.ModelBase):
"""Base class for Tacker Models.""" """Base class for Tacker Models."""

View File

@ -16,19 +16,18 @@
import uuid import uuid
from oslo_db import exception
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy import orm from sqlalchemy import orm
from sqlalchemy.orm import exc as orm_exc from sqlalchemy.orm import exc as orm_exc
from sqlalchemy import sql from sqlalchemy import sql
from tacker.db import api as tdbapi
from tacker.db import db_base from tacker.db import db_base
from tacker.db import model_base from tacker.db import model_base
from tacker.db import models_v1 from tacker.db import models_v1
from tacker.db.vm import vm_db from tacker.db.vm import vm_db
from tacker.extensions import nfvo from tacker.extensions import nfvo
from tacker import manager from tacker import manager
from tacker.openstack.common.db import exception
from tacker.openstack.common import uuidutils from tacker.openstack.common import uuidutils
@ -64,7 +63,6 @@ class VimAuth(model_base.BASE, models_v1.HasId):
class NfvoPluginDb(nfvo.NFVOPluginBase, db_base.CommonDbMixin): class NfvoPluginDb(nfvo.NFVOPluginBase, db_base.CommonDbMixin):
def __init__(self): def __init__(self):
tdbapi.register_models()
super(NfvoPluginDb, self).__init__() super(NfvoPluginDb, self).__init__()
@property @property

View File

@ -23,7 +23,6 @@ from sqlalchemy.orm import exc as orm_exc
from tacker.api.v1 import attributes from tacker.api.v1 import attributes
from tacker import context as t_context from tacker import context as t_context
from tacker.db import api as qdbapi
from tacker.db import db_base from tacker.db import db_base
from tacker.db import model_base from tacker.db import model_base
from tacker.db import models_v1 from tacker.db import models_v1
@ -148,7 +147,6 @@ class VNFMPluginDb(vnfm.VNFMPluginBase, db_base.CommonDbMixin):
return subnet['network_id'] return subnet['network_id']
def __init__(self): def __init__(self):
qdbapi.register_models()
super(VNFMPluginDb, self).__init__() super(VNFMPluginDb, self).__init__()
def _get_resource(self, context, model, id): def _get_resource(self, context, model, id):

View File

@ -1,162 +0,0 @@
# Copyright (c) 2013 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Multiple DB API backend support.
A DB backend module should implement a method named 'get_backend' which
takes no arguments. The method can return any object that implements DB
API methods.
"""
import functools
import logging
import threading
import time
from tacker.openstack.common.db import exception
from tacker.openstack.common.gettextutils import _LE
from tacker.openstack.common import importutils
LOG = logging.getLogger(__name__)
def safe_for_db_retry(f):
"""Enable db-retry for decorated function, if config option enabled."""
f.__dict__['enable_retry'] = True
return f
class wrap_db_retry(object):
"""Retry db.api methods, if DBConnectionError() raised
Retry decorated db.api methods. If we enabled `use_db_reconnect`
in config, this decorator will be applied to all db.api functions,
marked with @safe_for_db_retry decorator.
Decorator catchs DBConnectionError() and retries function in a
loop until it succeeds, or until maximum retries count will be reached.
"""
def __init__(self, retry_interval, max_retries, inc_retry_interval,
max_retry_interval):
super(wrap_db_retry, self).__init__()
self.retry_interval = retry_interval
self.max_retries = max_retries
self.inc_retry_interval = inc_retry_interval
self.max_retry_interval = max_retry_interval
def __call__(self, f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
next_interval = self.retry_interval
remaining = self.max_retries
while True:
try:
return f(*args, **kwargs)
except exception.DBConnectionError as e:
if remaining == 0:
LOG.exception(_LE('DB exceeded retry limit.'))
raise exception.DBError(e)
if remaining != -1:
remaining -= 1
LOG.exception(_LE('DB connection error.'))
# NOTE(vsergeyev): We are using patched time module, so
# this effectively yields the execution
# context to another green thread.
time.sleep(next_interval)
if self.inc_retry_interval:
next_interval = min(
next_interval * 2,
self.max_retry_interval
)
return wrapper
class DBAPI(object):
def __init__(self, backend_name, backend_mapping=None, lazy=False,
**kwargs):
"""Initialize the chosen DB API backend.
:param backend_name: name of the backend to load
:type backend_name: str
:param backend_mapping: backend name -> module/class to load mapping
:type backend_mapping: dict
:param lazy: load the DB backend lazily on the first DB API method call
:type lazy: bool
Keyword arguments:
:keyword use_db_reconnect: retry DB transactions on disconnect or not
:type use_db_reconnect: bool
:keyword retry_interval: seconds between transaction retries
:type retry_interval: int
:keyword inc_retry_interval: increase retry interval or not
:type inc_retry_interval: bool
:keyword max_retry_interval: max interval value between retries
:type max_retry_interval: int
:keyword max_retries: max number of retries before an error is raised
:type max_retries: int
"""
self._backend = None
self._backend_name = backend_name
self._backend_mapping = backend_mapping or {}
self._lock = threading.Lock()
if not lazy:
self._load_backend()
self.use_db_reconnect = kwargs.get('use_db_reconnect', False)
self.retry_interval = kwargs.get('retry_interval', 1)
self.inc_retry_interval = kwargs.get('inc_retry_interval', True)
self.max_retry_interval = kwargs.get('max_retry_interval', 10)
self.max_retries = kwargs.get('max_retries', 20)
def _load_backend(self):
with self._lock:
if not self._backend:
# Import the untranslated name if we don't have a mapping
backend_path = self._backend_mapping.get(self._backend_name,
self._backend_name)
backend_mod = importutils.import_module(backend_path)
self._backend = backend_mod.get_backend()
def __getattr__(self, key):
if not self._backend:
self._load_backend()
attr = getattr(self._backend, key)
if not hasattr(attr, '__call__'):
return attr
# NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry
# DB API methods, decorated with @safe_for_db_retry
# on disconnect.
if self.use_db_reconnect and hasattr(attr, 'enable_retry'):
attr = wrap_db_retry(
retry_interval=self.retry_interval,
max_retries=self.max_retries,
inc_retry_interval=self.inc_retry_interval,
max_retry_interval=self.max_retry_interval)(attr)
return attr

View File

@ -1,56 +0,0 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""DB related custom exceptions."""
import six
from tacker.openstack.common.gettextutils import _
class DBError(Exception):
"""Wraps an implementation specific exception."""
def __init__(self, inner_exception=None):
self.inner_exception = inner_exception
super(DBError, self).__init__(six.text_type(inner_exception))
class DBDuplicateEntry(DBError):
"""Wraps an implementation specific exception."""
def __init__(self, columns=None, inner_exception=None):
self.columns = columns if columns else []
super(DBDuplicateEntry, self).__init__(inner_exception)
class DBDeadlock(DBError):
def __init__(self, inner_exception=None):
super(DBDeadlock, self).__init__(inner_exception)
class DBInvalidUnicodeParameter(Exception):
message = _("Invalid Parameter: "
"Unicode is not supported by the current database.")
class DbMigrationError(DBError):
"""Wraps migration specific exception."""
def __init__(self, message=None):
super(DbMigrationError, self).__init__(message)
class DBConnectionError(DBError):
"""Wraps connection specific exception."""
pass

View File

@ -1,171 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo_config import cfg
database_opts = [
cfg.StrOpt('sqlite_db',
deprecated_group='DEFAULT',
default='tacker.sqlite',
help='The file name to use with SQLite'),
cfg.BoolOpt('sqlite_synchronous',
deprecated_group='DEFAULT',
default=True,
help='If True, SQLite uses synchronous mode'),
cfg.StrOpt('backend',
default='sqlalchemy',
deprecated_name='db_backend',
deprecated_group='DEFAULT',
help='The backend to use for db'),
cfg.StrOpt('connection',
help='The SQLAlchemy connection string used to connect to the '
'database',
secret=True,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_connection',
group='DATABASE'),
cfg.DeprecatedOpt('connection',
group='sql'), ]),
cfg.StrOpt('mysql_sql_mode',
default='TRADITIONAL',
help='The SQL mode to be used for MySQL sessions. '
'This option, including the default, overrides any '
'server-set SQL mode. To use whatever SQL mode '
'is set by the server configuration, '
'set this to no value. Example: mysql_sql_mode='),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_idle_timeout',
group='DATABASE'),
cfg.DeprecatedOpt('idle_timeout',
group='sql')],
help='Timeout before idle sql connections are reaped'),
cfg.IntOpt('min_pool_size',
default=1,
deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_min_pool_size',
group='DATABASE')],
help='Minimum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_pool_size',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_pool_size',
group='DATABASE')],
help='Maximum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_retries',
default=10,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_retries',
group='DATABASE')],
help='Maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'),
cfg.IntOpt('retry_interval',
default=10,
deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
group='DEFAULT'),
cfg.DeprecatedOpt('reconnect_interval',
group='DATABASE')],
help='Interval between retries of opening a sql connection'),
cfg.IntOpt('max_overflow',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
group='DEFAULT'),
cfg.DeprecatedOpt('sqlalchemy_max_overflow',
group='DATABASE')],
help='If set, use this value for max_overflow with sqlalchemy'),
cfg.IntOpt('connection_debug',
default=0,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
group='DEFAULT')],
help='Verbosity of SQL debugging information. 0=None, '
'100=Everything'),
cfg.BoolOpt('connection_trace',
default=False,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
group='DEFAULT')],
help='Add python stack traces to SQL as comment strings'),
cfg.IntOpt('pool_timeout',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
group='DATABASE')],
help='If set, use this value for pool_timeout with sqlalchemy'),
cfg.BoolOpt('use_db_reconnect',
default=False,
help='Enable the experimental use of database reconnect '
'on connection lost'),
cfg.IntOpt('db_retry_interval',
default=1,
help='seconds between db connection retries'),
cfg.BoolOpt('db_inc_retry_interval',
default=True,
help='Whether to increase interval between db connection '
'retries, up to db_max_retry_interval'),
cfg.IntOpt('db_max_retry_interval',
default=10,
help='max seconds between db connection retries, if '
'db_inc_retry_interval is enabled'),
cfg.IntOpt('db_max_retries',
default=20,
help='maximum db connection retries before error is raised. '
'(setting -1 implies an infinite retry count)'),
]
CONF = cfg.CONF
CONF.register_opts(database_opts, 'database')
def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
max_overflow=None, pool_timeout=None):
"""Set defaults for configuration variables."""
cfg.set_defaults(database_opts,
connection=sql_connection,
sqlite_db=sqlite_db)
# Update the QueuePool defaults
if max_pool_size is not None:
cfg.set_defaults(database_opts,
max_pool_size=max_pool_size)
if max_overflow is not None:
cfg.set_defaults(database_opts,
max_overflow=max_overflow)
if pool_timeout is not None:
cfg.set_defaults(database_opts,
pool_timeout=pool_timeout)
def list_opts():
"""Returns a list of oslo_config options available in the library.
The returned list includes all oslo_config options which may be registered
at runtime by the library.
Each element of the list is a tuple. The first element is the name of the
group under which the list of elements in the second element will be
registered. A group name of None corresponds to the [DEFAULT] group in
config files.
The purpose of this is to allow tools like the Oslo sample config file
generator to discover the options exposed to users by this library.
:returns: a list of (group_name, opts) tuples
"""
return [('database', copy.deepcopy(database_opts))]

View File

@ -1,119 +0,0 @@
# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Piston Cloud Computing, Inc.
# Copyright 2012 Cloudscaling Group, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
SQLAlchemy models.
"""
import six
from sqlalchemy import Column, Integer
from sqlalchemy import DateTime
from sqlalchemy.orm import object_mapper
from tacker.openstack.common import timeutils
class ModelBase(six.Iterator):
"""Base class for models."""
__table_initialized__ = False
def save(self, session):
"""Save this object."""
# NOTE(boris-42): This part of code should be look like:
# session.add(self)
# session.flush()
# But there is a bug in sqlalchemy and eventlet that
# raises NoneType exception if there is no running
# transaction and rollback is called. As long as
# sqlalchemy has this bug we have to create transaction
# explicitly.
with session.begin(subtransactions=True):
session.add(self)
session.flush()
def __setitem__(self, key, value):
setattr(self, key, value)
def __getitem__(self, key):
return getattr(self, key)
def get(self, key, default=None):
return getattr(self, key, default)
@property
def _extra_keys(self):
"""Specifies custom fields
Subclasses can override this property to return a list
of custom fields that should be included in their dict
representation.
For reference check tests/db/sqlalchemy/test_models.py
"""
return []
def __iter__(self):
columns = dict(object_mapper(self).columns).keys()
# NOTE(russellb): Allow models to specify other keys that can be looked
# up, beyond the actual db columns. An example would be the 'name'
# property for an Instance.
columns.extend(self._extra_keys)
self._i = iter(columns)
return self
# In Python 3, __next__() has replaced next().
def __next__(self):
n = six.advance_iterator(self._i)
return n, getattr(self, n)
def next(self):
return self.__next__()
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in six.iteritems(values):
setattr(self, k, v)
def iteritems(self):
"""Make the model object behave like a dict.
Includes attributes from joins.
"""
local = dict(self)
joined = dict([(k, v) for k, v in six.iteritems(self.__dict__)
if not k[0] == '_'])
local.update(joined)
return six.iteritems(local)
class TimestampMixin(object):
created_at = Column(DateTime, default=lambda: timeutils.utcnow())
updated_at = Column(DateTime, onupdate=lambda: timeutils.utcnow())
class SoftDeleteMixin(object):
deleted_at = Column(DateTime)
deleted = Column(Integer, default=0)
def soft_delete(self, session):
"""Mark this object as deleted."""
self.deleted = self.id
self.deleted_at = timeutils.utcnow()
self.save(session=session)

View File

@ -1,157 +0,0 @@
# Copyright 2013 Mirantis.inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Provision test environment for specific DB backends"""
import argparse
import logging
import os
import random
import string
from six import moves
import sqlalchemy
from tacker.openstack.common.db import exception as exc
LOG = logging.getLogger(__name__)
def get_engine(uri):
"""Engine creation
Call the function without arguments to get admin connection. Admin
connection required to create temporary user and database for each
particular test. Otherwise use existing connection to recreate connection
to the temporary database.
"""
return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)
def _execute_sql(engine, sql, driver):
"""Initialize connection, execute sql query and close it."""
try:
with engine.connect() as conn:
if driver == 'postgresql':
conn.connection.set_isolation_level(0)
for s in sql:
conn.execute(s)
except sqlalchemy.exc.OperationalError:
msg = ('%s does not match database admin '
'credentials or database does not exist.')
LOG.exception(msg % engine.url)
raise exc.DBConnectionError(msg % engine.url)
def create_database(engine):
"""Provide temporary user and database for each particular test."""
driver = engine.name
auth = {
'database': ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10)),
'user': engine.url.username,
'passwd': engine.url.password,
}
sqls = [
"drop database if exists %(database)s;",
"create database %(database)s;"
]
if driver == 'sqlite':
return 'sqlite:////tmp/%s' % auth['database']
elif driver in ['mysql', 'postgresql']:
sql_query = map(lambda x: x % auth, sqls)
_execute_sql(engine, sql_query, driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
params = auth.copy()
params['backend'] = driver
return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
def drop_database(admin_engine, current_uri):
"""Drop temporary database and user after each particular test."""
engine = get_engine(current_uri)
driver = engine.name
auth = {'database': engine.url.database, 'user': engine.url.username}
if driver == 'sqlite':
try:
os.remove(auth['database'])
except OSError:
pass
elif driver in ['mysql', 'postgresql']:
sql = "drop database if exists %(database)s;"
_execute_sql(admin_engine, [sql % auth], driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
def main():
"""Controller to handle commands
::create: Create test user and database with random names.
::drop: Drop user and database created by previous command.
"""
parser = argparse.ArgumentParser(
description='Controller to handle database creation and dropping'
' commands.',
epilog='Under normal circumstances is not used directly.'
' Used in .testr.conf to automate test database creation'
' and dropping processes.')
subparsers = parser.add_subparsers(
help='Subcommands to manipulate temporary test databases.')
create = subparsers.add_parser(
'create',
help='Create temporary test '
'databases and users.')
create.set_defaults(which='create')
create.add_argument(
'instances_count',
type=int,
help='Number of databases to create.')
drop = subparsers.add_parser(
'drop',
help='Drop temporary test databases and users.')
drop.set_defaults(which='drop')
drop.add_argument(
'instances',
nargs='+',
help='List of databases uri to be dropped.')
args = parser.parse_args()
connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
'sqlite://')
engine = get_engine(connection_string)
which = args.which
if which == "create":
for i in range(int(args.instances_count)):
print(create_database(engine))
elif which == "drop":
for db in args.instances:
drop_database(engine, db)
if __name__ == "__main__":
main()

View File

@ -1,904 +0,0 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Session Handling for SQLAlchemy backend.
Recommended ways to use sessions within this framework:
* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
`model_query()` will implicitly use a session when called without one
supplied. This is the ideal situation because it will allow queries
to be automatically retried if the database connection is interrupted.
.. note:: Automatic retry will be enabled in a future patch.
It is generally fine to issue several queries in a row like this. Even though
they may be run in separate transactions and/or separate sessions, each one
will see the data from the prior calls. If needed, undo- or rollback-like
functionality should be handled at a logical level. For an example, look at
the code around quotas and `reservation_rollback()`.
Examples:
.. code:: python
def get_foo(context, foo):
return (model_query(context, models.Foo).
filter_by(foo=foo).
first())
def update_foo(context, id, newfoo):
(model_query(context, models.Foo).
filter_by(id=id).
update({'foo': newfoo}))
def create_foo(context, values):
foo_ref = models.Foo()
foo_ref.update(values)
foo_ref.save()
return foo_ref
* Within the scope of a single method, keep all the reads and writes within
the context managed by a single session. In this way, the session's
`__exit__` handler will take care of calling `flush()` and `commit()` for
you. If using this approach, you should not explicitly call `flush()` or
`commit()`. Any error within the context of the session will cause the
session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be
raised in `session`'s `__exit__` handler, and any try/except within the
context managed by `session` will not be triggered. And catching other
non-database errors in the session will not trigger the ROLLBACK, so
exception handlers should always be outside the session, unless the
developer wants to do a partial commit on purpose. If the connection is
dropped before this is possible, the database will implicitly roll back the
transaction.
.. note:: Statements in the session scope will not be automatically retried.
If you create models within the session, they need to be added, but you
do not need to call `model.save()`:
.. code:: python
def create_many_foo(context, foos):
session = sessionmaker()
with session.begin():
for foo in foos:
foo_ref = models.Foo()
foo_ref.update(foo)
session.add(foo_ref)
def update_bar(context, foo_id, newbar):
session = sessionmaker()
with session.begin():
foo_ref = (model_query(context, models.Foo, session).
filter_by(id=foo_id).
first())
(model_query(context, models.Bar, session).
filter_by(id=foo_ref['bar_id']).
update({'bar': newbar}))
.. note:: `update_bar` is a trivially simple example of using
``with session.begin``. Whereas `create_many_foo` is a good example of
when a transaction is needed, it is always best to use as few queries as
possible.
The two queries in `update_bar` can be better expressed using a single query
which avoids the need for an explicit transaction. It can be expressed like
so:
.. code:: python
def update_bar(context, foo_id, newbar):
subq = (model_query(context, models.Foo.id).
filter_by(id=foo_id).
limit(1).
subquery())
(model_query(context, models.Bar).
filter_by(id=subq.as_scalar()).
update({'bar': newbar}))
For reference, this emits approximately the following SQL statement:
.. code:: sql
UPDATE bar SET bar = ${newbar}
WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
.. note:: `create_duplicate_foo` is a trivially simple example of catching an
exception while using ``with session.begin``. Here create two duplicate
instances with same primary key, must catch the exception out of context
managed by a single session:
.. code:: python
def create_duplicate_foo(context):
foo1 = models.Foo()
foo2 = models.Foo()
foo1.id = foo2.id = 1
session = sessionmaker()
try:
with session.begin():
session.add(foo1)
session.add(foo2)
except exception.DBDuplicateEntry as e:
handle_error(e)
* Passing an active session between methods. Sessions should only be passed
to private methods. The private method must use a subtransaction; otherwise
SQLAlchemy will throw an error when you call `session.begin()` on an existing
transaction. Public methods should not accept a session parameter and should
not be involved in sessions within the caller's scope.
Note that this incurs more overhead in SQLAlchemy than the above means
due to nesting transactions, and it is not possible to implicitly retry
failed database operations when using this approach.
This also makes code somewhat more difficult to read and debug, because a
single database transaction spans more than one method. Error handling
becomes less clear in this situation. When this is needed for code clarity,
it should be clearly documented.
.. code:: python
def myfunc(foo):
session = sessionmaker()
with session.begin():
# do some database things
bar = _private_func(foo, session)
return bar
def _private_func(foo, session=None):
if not session:
session = sessionmaker()
with session.begin(subtransaction=True):
# do some other database things
return bar
There are some things which it is best to avoid:
* Don't keep a transaction open any longer than necessary.
This means that your ``with session.begin()`` block should be as short
as possible, while still containing all the related calls for that
transaction.
* Avoid ``with_lockmode('UPDATE')`` when possible.
In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match
any rows, it will take a gap-lock. This is a form of write-lock on the
"gap" where no rows exist, and prevents any other writes to that space.
This can effectively prevent any INSERT into a table by locking the gap
at the end of the index. Similar problems will occur if the SELECT FOR UPDATE
has an overly broad WHERE clause, or doesn't properly use an index.
One idea proposed at ODS Fall '12 was to use a normal SELECT to test the
number of rows matching a query, and if only one row is returned,
then issue the SELECT FOR UPDATE.
The better long-term solution is to use
``INSERT .. ON DUPLICATE KEY UPDATE``.
However, this can not be done until the "deleted" columns are removed and
proper UNIQUE constraints are added to the tables.
Enabling soft deletes:
* To use/enable soft-deletes, the `SoftDeleteMixin` must be added
to your model class. For example:
.. code:: python
class NovaBase(models.SoftDeleteMixin, models.ModelBase):
pass
Efficient use of soft deletes:
* There are two possible ways to mark a record as deleted:
`model.soft_delete()` and `query.soft_delete()`.
The `model.soft_delete()` method works with a single already-fetched entry.
`query.soft_delete()` makes only one db request for all entries that
correspond to the query.
* In almost all cases you should use `query.soft_delete()`. Some examples:
.. code:: python
def soft_delete_bar():
count = model_query(BarModel).find(some_condition).soft_delete()
if count == 0:
raise Exception("0 entries were soft deleted")
def complex_soft_delete_with_synchronization_bar(session=None):
if session is None:
session = sessionmaker()
with session.begin(subtransactions=True):
count = (model_query(BarModel).
find(some_condition).
soft_delete(synchronize_session=True))
# Here synchronize_session is required, because we
# don't know what is going on in outer session.
if count == 0:
raise Exception("0 entries were soft deleted")
* There is only one situation where `model.soft_delete()` is appropriate: when
you fetch a single record, work with it, and mark it as deleted in the same
transaction.
.. code:: python
def soft_delete_bar_model():
session = sessionmaker()
with session.begin():
bar_ref = model_query(BarModel).find(some_condition).first()
# Work with bar_ref
bar_ref.soft_delete(session=session)
However, if you need to work with all entries that correspond to query and
then soft delete them you should use the `query.soft_delete()` method:
.. code:: python
def soft_delete_multi_models():
session = sessionmaker()
with session.begin():
query = (model_query(BarModel, session=session).
find(some_condition))
model_refs = query.all()
# Work with model_refs
query.soft_delete(synchronize_session=False)
# synchronize_session=False should be set if there is no outer
# session and these entries are not used after this.
When working with many rows, it is very important to use query.soft_delete,
which issues a single query. Using `model.soft_delete()`, as in the following
example, is very inefficient.
.. code:: python
for bar_ref in bar_refs:
bar_ref.soft_delete(session=session)
# This will produce count(bar_refs) db requests.
"""
import functools
import logging
import re
import time
import six
from sqlalchemy import exc as sqla_exc
from sqlalchemy.interfaces import PoolListener
import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column
from tacker.openstack.common.db import exception
from tacker.openstack.common.gettextutils import _LE, _LW
from tacker.openstack.common import timeutils
LOG = logging.getLogger(__name__)
class SqliteForeignKeysListener(PoolListener):
"""Ensures that the foreign key constraints are enforced in SQLite.
The foreign key constraints are disabled by default in SQLite,
so the foreign key constraints will be enabled here for every
database connection
"""
def connect(self, dbapi_con, con_record):
dbapi_con.execute('pragma foreign_keys=ON')
# note(boris-42): In current versions of DB backends unique constraint
# violation messages follow the structure:
#
# sqlite:
# 1 column - (IntegrityError) column c1 is not unique
# N columns - (IntegrityError) column c1, c2, ..., N are not unique
#
# sqlite since 3.7.16:
# 1 column - (IntegrityError) UNIQUE constraint failed: tbl.k1
#
# N columns - (IntegrityError) UNIQUE constraint failed: tbl.k1, tbl.k2
#
# postgres:
# 1 column - (IntegrityError) duplicate key value violates unique
# constraint "users_c1_key"
# N columns - (IntegrityError) duplicate key value violates unique
# constraint "name_of_our_constraint"
#
# mysql:
# 1 column - (IntegrityError) (1062, "Duplicate entry 'value_of_c1' for key
# 'c1'")
# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined
# with -' for key 'name_of_our_constraint'")
#
# ibm_db_sa:
# N columns - (IntegrityError) SQL0803N One or more values in the INSERT
# statement, UPDATE statement, or foreign key update caused by a
# DELETE statement are not valid because the primary key, unique
# constraint or unique index identified by "2" constrains table
# "NOVA.KEY_PAIRS" from having duplicate values for the index
# key.
_DUP_KEY_RE_DB = {
"sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),
"postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),
"mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),),
"ibm_db_sa": (re.compile(r"^.*SQL0803N.*$"),),
}
def _raise_if_duplicate_entry_error(integrity_error, engine_name):
"""Raise exception if two entries are duplicated.
In this function will be raised DBDuplicateEntry exception if integrity
error wrap unique constraint violation.
"""
def get_columns_from_uniq_cons_or_name(columns):
# note(vsergeyev): UniqueConstraint name convention: "uniq_t0c10c2"
# where `t` it is table name and columns `c1`, `c2`
# are in UniqueConstraint.
uniqbase = "uniq_"
if not columns.startswith(uniqbase):
if engine_name == "postgresql":
return [columns[columns.index("_") + 1:columns.rindex("_")]]
return [columns]
return columns[len(uniqbase):].split("0")[1:]
if engine_name not in ("ibm_db_sa", "mysql", "sqlite", "postgresql"):
return
# FIXME(johannes): The usage of the .message attribute has been
# deprecated since Python 2.6. However, the exceptions raised by
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
for pattern in _DUP_KEY_RE_DB[engine_name]:
match = pattern.match(integrity_error.message)
if match:
break
else:
return
# NOTE(mriedem): The ibm_db_sa integrity error message doesn't provide the
# columns so we have to omit that from the DBDuplicateEntry error.
columns = ''
if engine_name != 'ibm_db_sa':
columns = match.group(1)
if engine_name == "sqlite":
columns = [c.split('.')[-1] for c in columns.strip().split(", ")]
else:
columns = get_columns_from_uniq_cons_or_name(columns)
raise exception.DBDuplicateEntry(columns, integrity_error)
# NOTE(comstud): In current versions of DB backends, Deadlock violation
# messages follow the structure:
#
# mysql:
# (OperationalError) (1213, 'Deadlock found when trying to get lock; try '
# 'restarting transaction') <query_str> <query_args>
_DEADLOCK_RE_DB = {
"mysql": re.compile(r"^.*\(1213, 'Deadlock.*")
}
def _raise_if_deadlock_error(operational_error, engine_name):
"""Raise exception on deadlock condition.
Raise DBDeadlock exception if OperationalError contains a Deadlock
condition.
"""
re = _DEADLOCK_RE_DB.get(engine_name)
if re is None:
return
# FIXME(johannes): The usage of the .message attribute has been
# deprecated since Python 2.6. However, the exceptions raised by
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
m = re.match(operational_error.message)
if not m:
return
raise exception.DBDeadlock(operational_error)
def _wrap_db_error(f):
@functools.wraps(f)
def _wrap(self, *args, **kwargs):
try:
assert issubclass(
self.__class__, sqlalchemy.orm.session.Session
), ('_wrap_db_error() can only be applied to methods of '
'subclasses of sqlalchemy.orm.session.Session.')
return f(self, *args, **kwargs)
except UnicodeEncodeError:
raise exception.DBInvalidUnicodeParameter()
except sqla_exc.OperationalError as e:
_raise_if_db_connection_lost(e, self.bind)
_raise_if_deadlock_error(e, self.bind.dialect.name)
# NOTE(comstud): A lot of code is checking for OperationalError
# so let's not wrap it for now.
raise
# note(boris-42): We should catch unique constraint violation and
# wrap it by our own DBDuplicateEntry exception. Unique constraint
# violation is wrapped by IntegrityError.
except sqla_exc.IntegrityError as e:
# note(boris-42): SqlAlchemy doesn't unify errors from different
# DBs so we must do this. Also in some tables (for example
# instance_types) there are more than one unique constraint. This
# means we should get names of columns, which values violate
# unique constraint, from error message.
_raise_if_duplicate_entry_error(e, self.bind.dialect.name)
raise exception.DBError(e)
except Exception as e:
LOG.exception(_LE('DB exception wrapped.'))
raise exception.DBError(e)
return _wrap
def _synchronous_switch_listener(dbapi_conn, connection_rec):
"""Switch sqlite connections to non-synchronous mode."""
dbapi_conn.execute("PRAGMA synchronous = OFF")
def _add_regexp_listener(dbapi_con, con_record):
"""Add REGEXP function to sqlite connections."""
def regexp(expr, item):
reg = re.compile(expr)
return reg.search(six.text_type(item)) is not None
dbapi_con.create_function('regexp', 2, regexp)
def _thread_yield(dbapi_con, con_record):
"""Ensure other greenthreads get a chance to be executed.
If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
execute instead of time.sleep(0).
Force a context switch. With common database backends (eg MySQLdb and
sqlite), there is no implicit yield caused by network I/O since they are
implemented by C libraries that eventlet cannot monkey patch.
"""
time.sleep(0)
def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
"""Ensures that MySQL, PostgreSQL or DB2 connections are alive.
Borrowed from:
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
"""
cursor = dbapi_conn.cursor()
try:
ping_sql = 'select 1'
if engine.name == 'ibm_db_sa':
# DB2 requires a table expression
ping_sql = 'select 1 from (values (1)) AS t1'
cursor.execute(ping_sql)
except Exception as ex:
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
msg = _LW('Database server has gone away: %s') % ex
LOG.warning(msg)
# if the database server has gone away, all connections in the pool
# have become invalid and we can safely close all of them here,
# rather than waste time on checking of every single connection
engine.dispose()
# this will be handled by SQLAlchemy and will force it to create
# a new connection and retry the original action
raise sqla_exc.DisconnectionError(msg)
else:
raise
def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None):
"""Set the sql_mode session variable.
MySQL supports several server modes. The default is None, but sessions
may choose to enable server modes like TRADITIONAL, ANSI,
several STRICT_* modes and others.
Note: passing in '' (empty string) for sql_mode clears
the SQL mode for the session, overriding a potentially set
server default.
"""
cursor = dbapi_con.cursor()
cursor.execute("SET SESSION sql_mode = %s", [sql_mode])
def _mysql_get_effective_sql_mode(engine):
"""Returns the effective SQL mode for connections from the engine pool.
Returns ``None`` if the mode isn't available, otherwise returns the mode.
"""
# Get the real effective SQL mode. Even when unset by
# our own config, the server may still be operating in a specific
# SQL mode as set by the server configuration.
# Also note that the checkout listener will be called on execute to
# set the mode if it's registered.
row = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone()
if row is None:
return
return row[1]
def _mysql_check_effective_sql_mode(engine):
"""Logs a message based on the effective SQL mode for MySQL connections."""
realmode = _mysql_get_effective_sql_mode(engine)
if realmode is None:
LOG.warning(_LW('Unable to detect effective SQL mode'))
return
LOG.debug('MySQL server mode set to %s', realmode)
# 'TRADITIONAL' mode enables several other modes, so
# we need a substring match here
if not ('TRADITIONAL' in realmode.upper() or
'STRICT_ALL_TABLES' in realmode.upper()):
LOG.warning(_LW("MySQL SQL mode is '%s', "
"consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
realmode)
def _mysql_set_mode_callback(engine, sql_mode):
if sql_mode is not None:
mode_callback = functools.partial(_set_session_sql_mode,
sql_mode=sql_mode)
sqlalchemy.event.listen(engine, 'connect', mode_callback)
_mysql_check_effective_sql_mode(engine)
def _is_db_connection_error(args):
"""Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended
# to support Postgres and others.
# For the db2, the error code is -30081 since the db2 is still not ready
conn_err_codes = ('2002', '2003', '2006', '2013', '-30081')
for err_code in conn_err_codes:
if args.find(err_code) != -1:
return True
return False
def _raise_if_db_connection_lost(error, engine):
# NOTE(vsergeyev): Function is_disconnect(e, connection, cursor)
# requires connection and cursor in incoming parameters,
# but we have no possibility to create connection if DB
# is not available, so in such case reconnect fails.
# But is_disconnect() ignores these parameters, so it
# makes sense to pass to function None as placeholder
# instead of connection and cursor.
if engine.dialect.is_disconnect(error, None, None):
raise exception.DBConnectionError(error)
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
idle_timeout=3600,
connection_debug=0, max_pool_size=None, max_overflow=None,
pool_timeout=None, sqlite_synchronous=True,
connection_trace=False, max_retries=10, retry_interval=10):
"""Return a new SQLAlchemy engine."""
connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
engine_args = {
"pool_recycle": idle_timeout,
'convert_unicode': True,
}
logger = logging.getLogger('sqlalchemy.engine')
# Map SQL debug level to Python log level
if connection_debug >= 100:
logger.setLevel(logging.DEBUG)
elif connection_debug >= 50:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.WARNING)
if "sqlite" in connection_dict.drivername:
if sqlite_fk:
engine_args["listeners"] = [SqliteForeignKeysListener()]
engine_args["poolclass"] = NullPool
if sql_connection == "sqlite://":
engine_args["poolclass"] = StaticPool
engine_args["connect_args"] = {'check_same_thread': False}
else:
if max_pool_size is not None:
engine_args['pool_size'] = max_pool_size
if max_overflow is not None:
engine_args['max_overflow'] = max_overflow
if pool_timeout is not None:
engine_args['pool_timeout'] = pool_timeout
engine = sqlalchemy.create_engine(sql_connection, **engine_args)
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
if engine.name in ('ibm_db_sa', 'mysql', 'postgresql'):
ping_callback = functools.partial(_ping_listener, engine)
sqlalchemy.event.listen(engine, 'checkout', ping_callback)
if engine.name == 'mysql':
if mysql_sql_mode:
_mysql_set_mode_callback(engine, mysql_sql_mode)
elif 'sqlite' in connection_dict.drivername:
if not sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect',
_synchronous_switch_listener)
sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
if connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb':
_patch_mysqldb_with_stacktrace_comments()
try:
engine.connect()
except sqla_exc.OperationalError as e:
if not _is_db_connection_error(e.args[0]):
raise
remaining = max_retries
if remaining == -1:
remaining = 'infinite'
while True:
msg = _LW('SQL connection failed. %s attempts left.')
LOG.warning(msg % remaining)
if remaining != 'infinite':
remaining -= 1
time.sleep(retry_interval)
try:
engine.connect()
break
except sqla_exc.OperationalError as e:
if (remaining != 'infinite' and remaining == 0) or \
not _is_db_connection_error(e.args[0]):
raise
return engine
class Query(sqlalchemy.orm.query.Query):
"""Subclass of sqlalchemy.query with soft_delete() method."""
def soft_delete(self, synchronize_session='evaluate'):
return self.update({'deleted': literal_column('id'),
'updated_at': literal_column('updated_at'),
'deleted_at': timeutils.utcnow()},
synchronize_session=synchronize_session)
class Session(sqlalchemy.orm.session.Session):
"""Custom Session class to avoid SqlAlchemy Session monkey patching."""
@_wrap_db_error
def query(self, *args, **kwargs):
return super(Session, self).query(*args, **kwargs)
@_wrap_db_error
def flush(self, *args, **kwargs):
return super(Session, self).flush(*args, **kwargs)
@_wrap_db_error
def execute(self, *args, **kwargs):
return super(Session, self).execute(*args, **kwargs)
def get_maker(engine, autocommit=True, expire_on_commit=False):
"""Return a SQLAlchemy sessionmaker using the given engine."""
return sqlalchemy.orm.sessionmaker(bind=engine,
class_=Session,
autocommit=autocommit,
expire_on_commit=expire_on_commit,
query_cls=Query)
def _patch_mysqldb_with_stacktrace_comments():
"""Adds current stack trace as a comment in queries.
Patches MySQLdb.cursors.BaseCursor._do_query.
"""
import MySQLdb.cursors
import traceback
old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query
def _do_query(self, q):
stack = ''
for filename, line, method, function in traceback.extract_stack():
# exclude various common things from trace
if filename.endswith('session.py') and method == '_do_query':
continue
if filename.endswith('api.py') and method == 'wrapper':
continue
if filename.endswith('utils.py') and method == '_inner':
continue
if filename.endswith('exception.py') and method == '_wrap':
continue
# db/api is just a wrapper around db/sqlalchemy/api
if filename.endswith('db/api.py'):
continue
# only trace inside tacker
index = filename.rfind('tacker')
if index == -1:
continue
stack += "File:%s:%s Method:%s() Line:%s | " \
% (filename[index:], line, method, function)
# strip trailing " | " from stack
if stack:
stack = stack[:-3]
qq = "%s /* %s */" % (q, stack)
else:
qq = q
old_mysql_do_query(self, qq)
setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
class EngineFacade(object):
"""A helper class for removing of global engine instances from tacker.db.
As a library, tacker.db can't decide where to store/when to create engine
and sessionmaker instances, so this must be left for a target application.
On the other hand, in order to simplify the adoption of tacker.db changes,
we'll provide a helper class, which creates engine and sessionmaker
on its instantiation and provides get_engine()/get_session() methods
that are compatible with corresponding utility functions that currently
exist in target projects, e.g. in Nova.
engine/sessionmaker instances will still be global (and they are meant to
be global), but they will be stored in the app context, rather that in the
tacker.db context.
Note: using of this helper is completely optional and you are encouraged to
integrate engine/sessionmaker instances into your apps any way you like
(e.g. one might want to bind a session to a request context). Two important
things to remember:
1. An Engine instance is effectively a pool of DB connections, so it's
meant to be shared (and it's thread-safe).
2. A Session instance is not meant to be shared and represents a DB
transactional context (i.e. it's not thread-safe). sessionmaker is
a factory of sessions.
"""
def __init__(self, sql_connection,
sqlite_fk=False, autocommit=True,
expire_on_commit=False, **kwargs):
"""Initialize engine and sessionmaker instances.
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
Keyword arguments:
:keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
(defaults to TRADITIONAL)
:keyword idle_timeout: timeout before idle sql connections are reaped
(defaults to 3600)
:keyword connection_debug: verbosity of SQL debugging information.
0=None, 100=Everything (defaults to 0)
:keyword max_pool_size: maximum number of SQL connections to keep open
in a pool (defaults to SQLAlchemy settings)
:keyword max_overflow: if set, use this value for max_overflow with
sqlalchemy (defaults to SQLAlchemy settings)
:keyword pool_timeout: if set, use this value for pool_timeout with
sqlalchemy (defaults to SQLAlchemy settings)
:keyword sqlite_synchronous: if True, SQLite uses synchronous mode
(defaults to True)
:keyword connection_trace: add python stack traces to SQL as comment
strings (defaults to False)
:keyword max_retries: maximum db connection retries during startup.
(setting -1 implies an infinite retry count)
(defaults to 10)
:keyword retry_interval: interval between retries of opening a sql
connection (defaults to 10)
"""
super(EngineFacade, self).__init__()
self._engine = create_engine(
sql_connection=sql_connection,
sqlite_fk=sqlite_fk,
mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
idle_timeout=kwargs.get('idle_timeout', 3600),
connection_debug=kwargs.get('connection_debug', 0),
max_pool_size=kwargs.get('max_pool_size'),
max_overflow=kwargs.get('max_overflow'),
pool_timeout=kwargs.get('pool_timeout'),
sqlite_synchronous=kwargs.get('sqlite_synchronous', True),
connection_trace=kwargs.get('connection_trace', False),
max_retries=kwargs.get('max_retries', 10),
retry_interval=kwargs.get('retry_interval', 10))
self._session_maker = get_maker(
engine=self._engine,
autocommit=autocommit,
expire_on_commit=expire_on_commit)
def get_engine(self):
"""Get the engine instance (note, that it's shared)."""
return self._engine
def get_session(self, **kwargs):
"""Get a Session instance.
If passed, keyword arguments values override the ones used when the
sessionmaker instance was created.
:keyword autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:keyword expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
"""
for arg in kwargs:
if arg not in ('autocommit', 'expire_on_commit'):
del kwargs[arg]
return self._session_maker(**kwargs)
@classmethod
def from_config(cls, connection_string, conf,
sqlite_fk=False, autocommit=True, expire_on_commit=False):
"""Initialize EngineFacade using oslo_config config instance options.
:param connection_string: SQLAlchemy connection string
:type connection_string: string
:param conf: oslo_config config instance
:type conf: oslo_config.cfg.ConfigOpts
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
"""
return cls(sql_connection=connection_string,
sqlite_fk=sqlite_fk,
autocommit=autocommit,
expire_on_commit=expire_on_commit,
**dict(conf.database.items()))

View File

@ -1,153 +0,0 @@
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import functools
import os
import fixtures
import six
from tacker.openstack.common.db.sqlalchemy import session
from tacker.openstack.common.db.sqlalchemy import utils
from tacker.openstack.common.fixture import lockutils
from tacker.openstack.common import test
class DbFixture(fixtures.Fixture):
"""Basic database fixture.
Allows to run tests on various db backends, such as SQLite, MySQL and
PostgreSQL. By default use sqlite backend. To override default backend
uri set env variable OS_TEST_DBAPI_CONNECTION with database admin
credentials for specific backend.
"""
def _get_uri(self):
return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://')
def __init__(self, test):
super(DbFixture, self).__init__()
self.test = test
def setUp(self):
super(DbFixture, self).setUp()
self.test.engine = session.create_engine(self._get_uri())
self.test.sessionmaker = session.get_maker(self.test.engine)
self.addCleanup(self.test.engine.dispose)
class DbTestCase(test.BaseTestCase):
"""Base class for testing of DB code.
Using `DbFixture`. Intended to be the main database test case to use all
the tests on a given backend with user defined uri. Backend specific
tests should be decorated with `backend_specific` decorator.
"""
FIXTURE = DbFixture
def setUp(self):
super(DbTestCase, self).setUp()
self.useFixture(self.FIXTURE(self))
ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql']
def backend_specific(*dialects):
"""Decorator to skip backend specific tests on inappropriate engines.
::dialects: list of dialects names under which the test will be launched.
"""
def wrap(f):
@functools.wraps(f)
def ins_wrap(self):
if not set(dialects).issubset(ALLOWED_DIALECTS):
raise ValueError(
"Please use allowed dialects: %s" % ALLOWED_DIALECTS)
if self.engine.name not in dialects:
msg = ('The test "%s" can be run '
'only on %s. Current engine is %s.')
args = (f.__name__, ' '.join(dialects), self.engine.name)
self.skip(msg % args)
else:
return f(self)
return ins_wrap
return wrap
@six.add_metaclass(abc.ABCMeta)
class OpportunisticFixture(DbFixture):
"""Base fixture to use default CI databases.
The databases exist in OpenStack CI infrastructure. But for the
correct functioning in local environment the databases must be
created manually.
"""
DRIVER = abc.abstractproperty(lambda: None)
DBNAME = PASSWORD = USERNAME = 'openstack_citest'
def _get_uri(self):
return utils.get_connect_string(backend=self.DRIVER,
user=self.USERNAME,
passwd=self.PASSWORD,
database=self.DBNAME)
@six.add_metaclass(abc.ABCMeta)
class OpportunisticTestCase(DbTestCase):
"""Base test case to use default CI databases.
The subclasses of the test case are running only when openstack_citest
database is available otherwise a tests will be skipped.
"""
FIXTURE = abc.abstractproperty(lambda: None)
def setUp(self):
# TODO(bnemec): Remove this once infra is ready for
# https://review.openstack.org/#/c/74963/ to merge.
self.useFixture(lockutils.LockFixture('opportunistic-db'))
credentials = {
'backend': self.FIXTURE.DRIVER,
'user': self.FIXTURE.USERNAME,
'passwd': self.FIXTURE.PASSWORD,
'database': self.FIXTURE.DBNAME}
if self.FIXTURE.DRIVER and not utils.is_backend_avail(**credentials):
msg = '%s backend is not available.' % self.FIXTURE.DRIVER
return self.skip(msg)
super(OpportunisticTestCase, self).setUp()
class MySQLOpportunisticFixture(OpportunisticFixture):
DRIVER = 'mysql'
class PostgreSQLOpportunisticFixture(OpportunisticFixture):
DRIVER = 'postgresql'
class MySQLOpportunisticTestCase(OpportunisticTestCase):
FIXTURE = MySQLOpportunisticFixture
class PostgreSQLOpportunisticTestCase(OpportunisticTestCase):
FIXTURE = PostgreSQLOpportunisticFixture

View File

@ -1,647 +0,0 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2010-2011 OpenStack Foundation.
# Copyright 2012 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import re
import sqlalchemy
from sqlalchemy import Boolean
from sqlalchemy import CheckConstraint
from sqlalchemy import Column
from sqlalchemy.engine import reflection
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.types import NullType
from tacker.openstack.common import context as request_context
from tacker.openstack.common.db.sqlalchemy import models
from tacker.openstack.common.gettextutils import _, _LI, _LW
from tacker.openstack.common import timeutils
LOG = logging.getLogger(__name__)
_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+")
def sanitize_db_url(url):
match = _DBURL_REGEX.match(url)
if match:
return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):])
return url
class InvalidSortKey(Exception):
message = _("Sort key supplied was not valid.")
# copy from glance/db/sqlalchemy/api.py
def paginate_query(query, model, limit, sort_keys, marker=None,
sort_dir=None, sort_dirs=None):
"""Returns a query with sorting / pagination criteria added.
Pagination works by requiring a unique sort_key, specified by sort_keys.
(If sort_keys is not unique, then we risk looping through values.)
We use the last row in the previous page as the 'marker' for pagination.
So we must return values that follow the passed marker in the order.
With a single-valued sort_key, this would be easy: sort_key > X.
With a compound-values sort_key, (k1, k2, k3) we must do this to repeat
the lexicographical ordering:
(k1 > X1) or (k1 == X1 && k2 > X2) or (k1 == X1 && k2 == X2 && k3 > X3)
We also have to cope with different sort_directions.
Typically, the id of the last row is used as the client-facing pagination
marker, then the actual marker object must be fetched from the db and
passed in to us as marker.
:param query: the query object to which we should add paging/sorting
:param model: the ORM model class
:param limit: maximum number of items to return
:param sort_keys: array of attributes by which results should be sorted
:param marker: the last item of the previous page; we returns the next
results after this value.
:param sort_dir: direction in which results should be sorted (asc, desc)
:param sort_dirs: per-column array of sort_dirs, corresponding to sort_keys
:rtype: sqlalchemy.orm.query.Query
:return: The query with sorting/pagination added.
"""
if 'id' not in sort_keys:
# TODO(justinsb): If this ever gives a false-positive, check
# the actual primary key, rather than assuming its id
LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?'))
assert(not (sort_dir and sort_dirs))
# Default the sort direction to ascending
if sort_dirs is None and sort_dir is None:
sort_dir = 'asc'
# Ensure a per-column sort direction
if sort_dirs is None:
sort_dirs = [sort_dir for _sort_key in sort_keys]
assert(len(sort_dirs) == len(sort_keys))
# Add sorting
for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs):
try:
sort_dir_func = {
'asc': sqlalchemy.asc,
'desc': sqlalchemy.desc,
}[current_sort_dir]
except KeyError:
raise ValueError(_("Unknown sort direction, "
"must be 'desc' or 'asc'"))
try:
sort_key_attr = getattr(model, current_sort_key)
except AttributeError:
raise InvalidSortKey()
query = query.order_by(sort_dir_func(sort_key_attr))
# Add pagination
if marker is not None:
marker_values = []
for sort_key in sort_keys:
v = getattr(marker, sort_key)
marker_values.append(v)
# Build up an array of sort criteria as in the docstring
criteria_list = []
for i in range(len(sort_keys)):
crit_attrs = []
for j in range(i):
model_attr = getattr(model, sort_keys[j])
crit_attrs.append((model_attr == marker_values[j]))
model_attr = getattr(model, sort_keys[i])
if sort_dirs[i] == 'desc':
crit_attrs.append((model_attr < marker_values[i]))
else:
crit_attrs.append((model_attr > marker_values[i]))
criteria = sqlalchemy.sql.and_(*crit_attrs)
criteria_list.append(criteria)
f = sqlalchemy.sql.or_(*criteria_list)
query = query.filter(f)
if limit is not None:
query = query.limit(limit)
return query
def _read_deleted_filter(query, db_model, read_deleted):
if 'deleted' not in db_model.__table__.columns:
raise ValueError(_("There is no `deleted` column in `%s` table. "
"Project doesn't use soft-deleted feature.")
% db_model.__name__)
default_deleted_value = db_model.__table__.c.deleted.default.arg
if read_deleted == 'no':
query = query.filter(db_model.deleted == default_deleted_value)
elif read_deleted == 'yes':
pass # omit the filter to include deleted and active
elif read_deleted == 'only':
query = query.filter(db_model.deleted != default_deleted_value)
else:
raise ValueError(_("Unrecognized read_deleted value '%s'")
% read_deleted)
return query
def _project_filter(query, db_model, context, project_only):
if project_only and 'project_id' not in db_model.__table__.columns:
raise ValueError(_("There is no `project_id` column in `%s` table.")
% db_model.__name__)
if request_context.is_user_context(context) and project_only:
if project_only == 'allow_none':
is_none = None
query = query.filter(or_(db_model.project_id == context.project_id,
db_model.project_id == is_none))
else:
query = query.filter(db_model.project_id == context.project_id)
return query
def model_query(context, model, session, args=None, project_only=False,
read_deleted=None):
"""Query helper that accounts for context's `read_deleted` field.
:param context: context to query under
:param model: Model to query. Must be a subclass of ModelBase.
:type model: models.ModelBase
:param session: The session to use.
:type session: sqlalchemy.orm.session.Session
:param args: Arguments to query. If None - model is used.
:type args: tuple
:param project_only: If present and context is user-type, then restrict
query to match the context's project_id. If set to
'allow_none', restriction includes project_id = None.
:type project_only: bool
:param read_deleted: If present, overrides context's read_deleted field.
:type read_deleted: bool
Usage:
..code:: python
result = (utils.model_query(context, models.Instance, session=session)
.filter_by(uuid=instance_uuid)
.all())
query = utils.model_query(
context, Node,
session=session,
args=(func.count(Node.id), func.sum(Node.ram))
).filter_by(project_id=project_id)
"""
if not read_deleted:
if hasattr(context, 'read_deleted'):
# NOTE(viktors): some projects use `read_deleted` attribute in
# their contexts instead of `show_deleted`.
read_deleted = context.read_deleted
else:
read_deleted = context.show_deleted
if not issubclass(model, models.ModelBase):
raise TypeError(_("model should be a subclass of ModelBase"))
query = session.query(model) if not args else session.query(*args)
query = _read_deleted_filter(query, model, read_deleted)
query = _project_filter(query, model, context, project_only)
return query
def get_table(engine, name):
"""Returns an sqlalchemy table dynamically from db.
Needed because the models don't work for us in migrations
as models will be far out of sync with the current data.
"""
metadata = MetaData()
metadata.bind = engine
return Table(name, metadata, autoload=True)
class InsertFromSelect(UpdateBase):
"""Form the base for `INSERT INTO table (SELECT ... )` statement."""
def __init__(self, table, select):
self.table = table
self.select = select
@compiles(InsertFromSelect)
def visit_insert_from_select(element, compiler, **kw):
"""Form the `INSERT INTO table (SELECT ... )` statement."""
return "INSERT INTO %s %s" % (
compiler.process(element.table, asfrom=True),
compiler.process(element.select))
class ColumnError(Exception):
"""Error raised when no column or an invalid column is found."""
def _get_not_supported_column(col_name_col_instance, column_name):
try:
column = col_name_col_instance[column_name]
except KeyError:
msg = _("Please specify column %s in col_name_col_instance "
"param. It is required because column has unsupported "
"type by sqlite).")
raise ColumnError(msg % column_name)
if not isinstance(column, Column):
msg = _("col_name_col_instance param has wrong type of "
"column instance for column %s It should be instance "
"of sqlalchemy.Column.")
raise ColumnError(msg % column_name)
return column
def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
**col_name_col_instance):
"""Drop unique constraint from table.
DEPRECATED: this function is deprecated and will be removed from tacker.db
in a few releases. Please use UniqueConstraint.drop() method directly for
sqlalchemy-migrate migration scripts.
This method drops UC from table and works for mysql, postgresql and sqlite.
In mysql and postgresql we are able to use "alter table" construction.
Sqlalchemy doesn't support some sqlite column types and replaces their
type with NullType in metadata. We process these columns and replace
NullType with the correct column type.
:param migrate_engine: sqlalchemy engine
:param table_name: name of table that contains uniq constraint.
:param uc_name: name of uniq constraint that will be dropped.
:param columns: columns that are in uniq constraint.
:param col_name_col_instance: contains pair column_name=column_instance.
column_instance is instance of Column. These params
are required only for columns that have unsupported
types by sqlite. For example BigInteger.
"""
from migrate.changeset import UniqueConstraint
meta = MetaData()
meta.bind = migrate_engine
t = Table(table_name, meta, autoload=True)
if migrate_engine.name == "sqlite":
override_cols = [
_get_not_supported_column(col_name_col_instance, col.name)
for col in t.columns
if isinstance(col.type, NullType)
]
for col in override_cols:
t.columns.replace(col)
uc = UniqueConstraint(*columns, table=t, name=uc_name)
uc.drop()
def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
use_soft_delete, *uc_column_names):
"""Drop all old rows having the same values for columns in uc_columns.
This method drop (or mark ad `deleted` if use_soft_delete is True) old
duplicate rows form table with name `table_name`.
:param migrate_engine: Sqlalchemy engine
:param table_name: Table with duplicates
:param use_soft_delete: If True - values will be marked as `deleted`,
if False - values will be removed from table
:param uc_column_names: Unique constraint columns
"""
meta = MetaData()
meta.bind = migrate_engine
table = Table(table_name, meta, autoload=True)
columns_for_group_by = [table.c[name] for name in uc_column_names]
columns_for_select = [func.max(table.c.id)]
columns_for_select.extend(columns_for_group_by)
duplicated_rows_select = sqlalchemy.sql.select(
columns_for_select, group_by=columns_for_group_by,
having=func.count(table.c.id) > 1)
for row in migrate_engine.execute(duplicated_rows_select):
# NOTE(boris-42): Do not remove row that has the biggest ID.
delete_condition = table.c.id != row[0]
is_none = None # workaround for pyflakes
delete_condition &= table.c.deleted_at == is_none
for name in uc_column_names:
delete_condition &= table.c[name] == row[name]
rows_to_delete_select = sqlalchemy.sql.select(
[table.c.id]).where(delete_condition)
for row in migrate_engine.execute(rows_to_delete_select).fetchall():
LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: "
"%(table)s") % dict(id=row[0], table=table_name))
if use_soft_delete:
delete_statement = table.update().\
where(delete_condition).\
values({
'deleted': literal_column('id'),
'updated_at': literal_column('updated_at'),
'deleted_at': timeutils.utcnow()
})
else:
delete_statement = table.delete().where(delete_condition)
migrate_engine.execute(delete_statement)
def _get_default_deleted_value(table):
if isinstance(table.c.id.type, Integer):
return 0
if isinstance(table.c.id.type, String):
return ""
raise ColumnError(_("Unsupported id columns type"))
def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes):
table = get_table(migrate_engine, table_name)
insp = reflection.Inspector.from_engine(migrate_engine)
real_indexes = insp.get_indexes(table_name)
existing_index_names = dict(
[(index['name'], index['column_names']) for index in real_indexes])
# NOTE(boris-42): Restore indexes on `deleted` column
for index in indexes:
if 'deleted' not in index['column_names']:
continue
name = index['name']
if name in existing_index_names:
column_names = [table.c[c] for c in existing_index_names[name]]
old_index = Index(name, *column_names, unique=index["unique"])
old_index.drop(migrate_engine)
column_names = [table.c[c] for c in index['column_names']]
new_index = Index(index["name"], *column_names, unique=index["unique"])
new_index.create(migrate_engine)
def change_deleted_column_type_to_boolean(migrate_engine, table_name,
**col_name_col_instance):
if migrate_engine.name == "sqlite":
return _change_deleted_column_type_to_boolean_sqlite(
migrate_engine, table_name, **col_name_col_instance)
insp = reflection.Inspector.from_engine(migrate_engine)
indexes = insp.get_indexes(table_name)
table = get_table(migrate_engine, table_name)
old_deleted = Column('old_deleted', Boolean, default=False)
old_deleted.create(table, populate_default=False)
table.update().\
where(table.c.deleted == table.c.id).\
values(old_deleted=True).\
execute()
table.c.deleted.drop()
table.c.old_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
**col_name_col_instance):
insp = reflection.Inspector.from_engine(migrate_engine)
table = get_table(migrate_engine, table_name)
columns = []
for column in table.columns:
column_copy = None
if column.name != "deleted":
if isinstance(column.type, NullType):
column_copy = _get_not_supported_column(col_name_col_instance,
column.name)
else:
column_copy = column.copy()
else:
column_copy = Column('deleted', Boolean, default=0)
columns.append(column_copy)
constraints = [constraint.copy() for constraint in table.constraints]
meta = table.metadata
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()
indexes = []
for index in insp.get_indexes(table_name):
column_names = [new_table.c[c] for c in index['column_names']]
indexes.append(Index(index["name"], *column_names,
unique=index["unique"]))
c_select = []
for c in table.c:
if c.name != "deleted":
c_select.append(c)
else:
c_select.append(table.c.deleted == table.c.id)
ins = InsertFromSelect(new_table, sqlalchemy.sql.select(c_select))
migrate_engine.execute(ins)
table.drop()
[index.create(migrate_engine) for index in indexes]
new_table.rename(table_name)
new_table.update().\
where(new_table.c.deleted == new_table.c.id).\
values(deleted=True).\
execute()
def change_deleted_column_type_to_id_type(migrate_engine, table_name,
**col_name_col_instance):
if migrate_engine.name == "sqlite":
return _change_deleted_column_type_to_id_type_sqlite(
migrate_engine, table_name, **col_name_col_instance)
insp = reflection.Inspector.from_engine(migrate_engine)
indexes = insp.get_indexes(table_name)
table = get_table(migrate_engine, table_name)
new_deleted = Column('new_deleted', table.c.id.type,
default=_get_default_deleted_value(table))
new_deleted.create(table, populate_default=True)
deleted = True # workaround for pyflakes
table.update().\
where(table.c.deleted == deleted).\
values(new_deleted=table.c.id).\
execute()
table.c.deleted.drop()
table.c.new_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name,
**col_name_col_instance):
# NOTE(boris-42): sqlaclhemy-migrate can't drop column with check
# constraints in sqlite DB and our `deleted` column has
# 2 check constraints. So there is only one way to remove
# these constraints:
# 1) Create new table with the same columns, constraints
# and indexes. (except deleted column).
# 2) Copy all data from old to new table.
# 3) Drop old table.
# 4) Rename new table to old table name.
insp = reflection.Inspector.from_engine(migrate_engine)
meta = MetaData(bind=migrate_engine)
table = Table(table_name, meta, autoload=True)
default_deleted_value = _get_default_deleted_value(table)
columns = []
for column in table.columns:
column_copy = None
if column.name != "deleted":
if isinstance(column.type, NullType):
column_copy = _get_not_supported_column(col_name_col_instance,
column.name)
else:
column_copy = column.copy()
else:
column_copy = Column('deleted', table.c.id.type,
default=default_deleted_value)
columns.append(column_copy)
def is_deleted_column_constraint(constraint):
# NOTE(boris-42): There is no other way to check is CheckConstraint
# associated with deleted column.
if not isinstance(constraint, CheckConstraint):
return False
sqltext = str(constraint.sqltext)
return (sqltext.endswith("deleted in (0, 1)") or
sqltext.endswith("deleted IN (:deleted_1, :deleted_2)"))
constraints = []
for constraint in table.constraints:
if not is_deleted_column_constraint(constraint):
constraints.append(constraint.copy())
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()
indexes = []
for index in insp.get_indexes(table_name):
column_names = [new_table.c[c] for c in index['column_names']]
indexes.append(Index(index["name"], *column_names,
unique=index["unique"]))
ins = InsertFromSelect(new_table, table.select())
migrate_engine.execute(ins)
table.drop()
[index.create(migrate_engine) for index in indexes]
new_table.rename(table_name)
deleted = True # workaround for pyflakes
new_table.update().\
where(new_table.c.deleted == deleted).\
values(deleted=new_table.c.id).\
execute()
# NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
deleted = False # workaround for pyflakes
new_table.update().\
where(new_table.c.deleted == deleted).\
values(deleted=default_deleted_value).\
execute()
def get_connect_string(backend, database, user=None, passwd=None):
"""Get database connection
Try to get a connection with a very specific set of values, if we get
these then we'll run the tests, otherwise they are skipped
"""
args = {'backend': backend,
'user': user,
'passwd': passwd,
'database': database}
if backend == 'sqlite':
template = '%(backend)s:///%(database)s'
else:
template = "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s"
return template % args
def is_backend_avail(backend, database, user=None, passwd=None):
try:
connect_uri = get_connect_string(backend=backend,
database=database,
user=user,
passwd=passwd)
engine = sqlalchemy.create_engine(connect_uri)
connection = engine.connect()
except Exception:
# intentionally catch all to handle exceptions even if we don't
# have any backend code loaded.
return False
else:
connection.close()
engine.dispose()
return True
def get_db_connection_info(conn_pieces):
database = conn_pieces.path.strip('/')
loc_pieces = conn_pieces.netloc.split('@')
host = loc_pieces[1]
auth_pieces = loc_pieces[0].split(':')
user = auth_pieces[0]
password = ""
if len(auth_pieces) > 1:
password = auth_pieces[1].strip()
return (user, password, database, host)

View File

@ -15,6 +15,7 @@
import fixtures import fixtures
from tacker.common import config
from tacker.db import api as db_api from tacker.db import api as db_api
from tacker.db import model_base from tacker.db import model_base
from tacker.tests.unit import base from tacker.tests.unit import base
@ -45,5 +46,6 @@ class SqlFixture(fixtures.Fixture):
class SqlTestCase(base.TestCase): class SqlTestCase(base.TestCase):
def setUp(self): def setUp(self):
config.set_db_defaults()
super(SqlTestCase, self).setUp() super(SqlTestCase, self).setUp()
self.useFixture(SqlFixture()) self.useFixture(SqlFixture())