Merge "Sync latest db.sqlalchemy from oslo-incubator"

Jenkins 2014-02-07 22:59:37 +00:00 committed by Gerrit Code Review
commit 5e08f52525
10 changed files with 1174 additions and 141 deletions

View File

@ -1534,10 +1534,10 @@
# Options defined in nova.openstack.common.db.sqlalchemy.session # Options defined in nova.openstack.common.db.sqlalchemy.session
# #
# the filename to use with sqlite (string value) # The file name to use with SQLite (string value)
#sqlite_db=nova.sqlite #sqlite_db=nova.sqlite
# If true, use synchronous mode for sqlite (boolean value) # If True, SQLite uses synchronous mode (boolean value)
#sqlite_synchronous=true #sqlite_synchronous=true
@ -2421,16 +2421,18 @@
# database (string value) # database (string value)
# Deprecated group/name - [DEFAULT]/sql_connection # Deprecated group/name - [DEFAULT]/sql_connection
# Deprecated group/name - [DATABASE]/sql_connection # Deprecated group/name - [DATABASE]/sql_connection
# Deprecated group/name - [sql]/connection
#connection=sqlite:////nova/openstack/common/db/$sqlite_db #connection=sqlite:////nova/openstack/common/db/$sqlite_db
# The SQLAlchemy connection string used to connect to the # The SQLAlchemy connection string used to connect to the
# slave database (string value) # slave database (string value)
#slave_connection= #slave_connection=
# timeout before idle sql connections are reaped (integer # Timeout before idle sql connections are reaped (integer
# value) # value)
# Deprecated group/name - [DEFAULT]/sql_idle_timeout # Deprecated group/name - [DEFAULT]/sql_idle_timeout
# Deprecated group/name - [DATABASE]/sql_idle_timeout # Deprecated group/name - [DATABASE]/sql_idle_timeout
# Deprecated group/name - [sql]/idle_timeout
#idle_timeout=3600 #idle_timeout=3600
# Minimum number of SQL connections to keep open in a pool # Minimum number of SQL connections to keep open in a pool
@ -2445,13 +2447,13 @@
# Deprecated group/name - [DATABASE]/sql_max_pool_size # Deprecated group/name - [DATABASE]/sql_max_pool_size
#max_pool_size=<None> #max_pool_size=<None>
# maximum db connection retries during startup. (setting -1 # Maximum db connection retries during startup. (setting -1
# implies an infinite retry count) (integer value) # implies an infinite retry count) (integer value)
# Deprecated group/name - [DEFAULT]/sql_max_retries # Deprecated group/name - [DEFAULT]/sql_max_retries
# Deprecated group/name - [DATABASE]/sql_max_retries # Deprecated group/name - [DATABASE]/sql_max_retries
#max_retries=10 #max_retries=10
# interval between retries of opening a sql connection # Interval between retries of opening a sql connection
# (integer value) # (integer value)
# Deprecated group/name - [DEFAULT]/sql_retry_interval # Deprecated group/name - [DEFAULT]/sql_retry_interval
# Deprecated group/name - [DATABASE]/reconnect_interval # Deprecated group/name - [DATABASE]/reconnect_interval

View File

@ -209,6 +209,7 @@ class Instance(BASE, NovaBase):
base_name = self.uuid base_name = self.uuid
return base_name return base_name
@property
def _extra_keys(self): def _extra_keys(self):
return ['name'] return ['name']
@ -1097,6 +1098,7 @@ class Aggregate(BASE, NovaBase):
'AggregateMetadata.deleted == 0,' 'AggregateMetadata.deleted == 0,'
'Aggregate.deleted == 0)') 'Aggregate.deleted == 0)')
@property
def _extra_keys(self): def _extra_keys(self):
return ['hosts', 'metadetails', 'availability_zone'] return ['hosts', 'metadetails', 'availability_zone']

View File

@ -0,0 +1,265 @@
# coding: utf-8
#
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Based on code in migrate/changeset/databases/sqlite.py, which is under
# the following license:
#
# The MIT License
#
# Copyright (c) 2009 Evan Rosson, Jan Dittberner, Domen Kožar
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
import re
from migrate.changeset import ansisql
from migrate.changeset.databases import sqlite
from migrate import exceptions as versioning_exceptions
from migrate.versioning import api as versioning_api
from migrate.versioning.repository import Repository
import sqlalchemy
from sqlalchemy.schema import UniqueConstraint
from nova.openstack.common.db import exception
from nova.openstack.common.db.sqlalchemy import session as db_session
from nova.openstack.common.gettextutils import _
get_engine = db_session.get_engine
def _get_unique_constraints(self, table):
"""Retrieve information about existing unique constraints of the table
This feature is needed for _recreate_table() to work properly.
Unfortunately, it's not available in sqlalchemy 0.7.x/0.8.x.
"""
data = table.metadata.bind.execute(
"""SELECT sql
FROM sqlite_master
WHERE
type='table' AND
name=:table_name""",
table_name=table.name
).fetchone()[0]
UNIQUE_PATTERN = "CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)"
return [
UniqueConstraint(
*[getattr(table.columns, c.strip(' "')) for c in cols.split(",")],
name=name
)
for name, cols in re.findall(UNIQUE_PATTERN, data)
]
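# Illustrative sketch (hypothetical schema): for a table created as
#   CREATE TABLE users (id INTEGER, name VARCHAR(64),
#                       CONSTRAINT uniq_users0name UNIQUE (name))
# the sqlite_master query above returns that CREATE statement, and the
# regular expression yields
# [UniqueConstraint(users.c.name, name='uniq_users0name')].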
def _recreate_table(self, table, column=None, delta=None, omit_uniques=None):
"""Recreate the table properly
Unlike the corresponding original method of sqlalchemy-migrate this one
doesn't drop existing unique constraints when creating a new one.
"""
table_name = self.preparer.format_table(table)
# we remove all indexes so as not to have
# problems during copy and re-create
for index in table.indexes:
index.drop()
# reflect existing unique constraints
for uc in self._get_unique_constraints(table):
table.append_constraint(uc)
# omit given unique constraints when creating a new table if required
table.constraints = set([
cons for cons in table.constraints
if omit_uniques is None or cons.name not in omit_uniques
])
self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name)
self.execute()
insertion_string = self._modify_table(table, column, delta)
table.create(bind=self.connection)
self.append(insertion_string % {'table_name': table_name})
self.execute()
self.append('DROP TABLE migration_tmp')
self.execute()
def _visit_migrate_unique_constraint(self, *p, **k):
"""Drop the given unique constraint
The corresponding original method of sqlalchemy-migrate just
raises NotImplemented error
"""
self.recreate_table(p[0].table, omit_uniques=[p[0].name])
def patch_migrate():
"""A workaround for SQLite's inability to alter things
SQLite abilities to alter tables are very limited (please read
http://www.sqlite.org/lang_altertable.html for more details).
E. g. one can't drop a column or a constraint in SQLite. The
workaround for this is to recreate the original table omitting
the corresponding constraint (or column).
sqlalchemy-migrate library has recreate_table() method that
implements this workaround, but it does it wrong:
- information about unique constraints of a table
is not retrieved. So if you have a table with one
unique constraint and a migration adding another one
you will end up with a table that has only the
latter unique constraint, and the former will be lost
- dropping of unique constraints is not supported at all
The proper way to fix this is to provide a pull request to
sqlalchemy-migrate, but the project seems to be dead. So we
can go on with monkey-patching the lib, at least for now.
"""
# this patch is needed to ensure that recreate_table() doesn't drop
# existing unique constraints of the table when creating a new one
helper_cls = sqlite.SQLiteHelper
helper_cls.recreate_table = _recreate_table
helper_cls._get_unique_constraints = _get_unique_constraints
# this patch is needed to be able to drop existing unique constraints
constraint_cls = sqlite.SQLiteConstraintDropper
constraint_cls.visit_migrate_unique_constraint = \
_visit_migrate_unique_constraint
constraint_cls.__bases__ = (ansisql.ANSIColumnDropper,
sqlite.SQLiteConstraintGenerator)
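# A minimal usage sketch (hypothetical caller; the repository path is a
# placeholder): apply the monkey-patch once, before any sqlalchemy-migrate
# operations run against a SQLite database, so that dropping and adding
# unique constraints works during migrations.
def _example_sqlite_migration():
    patch_migrate()
    db_sync('/path/to/migrate_repo')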
def db_sync(abs_path, version=None, init_version=0):
"""Upgrade or downgrade a database.
Function runs the upgrade() or downgrade() functions in change scripts.
:param abs_path: Absolute path to migrate repository.
:param version: Database will upgrade/downgrade until this version.
If None, the database will update to the latest
available version.
:param init_version: Initial database version
"""
if version is not None:
try:
version = int(version)
except ValueError:
raise exception.DbMigrationError(
message=_("version should be an integer"))
current_version = db_version(abs_path, init_version)
repository = _find_migrate_repo(abs_path)
_db_schema_sanity_check()
if version is None or version > current_version:
return versioning_api.upgrade(get_engine(), repository, version)
else:
return versioning_api.downgrade(get_engine(), repository,
version)
def _db_schema_sanity_check():
engine = get_engine()
if engine.name == 'mysql':
onlyutf8_sql = ('SELECT TABLE_NAME,TABLE_COLLATION '
'from information_schema.TABLES '
'where TABLE_SCHEMA=%s and '
'TABLE_COLLATION NOT LIKE "%%utf8%%"')
table_names = [res[0] for res in engine.execute(onlyutf8_sql,
engine.url.database)]
if len(table_names) > 0:
raise ValueError(_('Tables "%s" have non utf8 collation, '
'please make sure all tables are CHARSET=utf8'
) % ','.join(table_names))
def db_version(abs_path, init_version):
"""Show the current version of the repository.
:param abs_path: Absolute path to migrate repository
:param init_version: Initial database version
"""
repository = _find_migrate_repo(abs_path)
try:
return versioning_api.db_version(get_engine(), repository)
except versioning_exceptions.DatabaseNotControlledError:
meta = sqlalchemy.MetaData()
engine = get_engine()
meta.reflect(bind=engine)
tables = meta.tables
if len(tables) == 0 or 'alembic_version' in tables:
db_version_control(abs_path, init_version)
return versioning_api.db_version(get_engine(), repository)
else:
raise exception.DbMigrationError(
message=_(
"The database is not under version control, but has "
"tables. Please stamp the current version of the schema "
"manually."))
def db_version_control(abs_path, version=None):
"""Mark a database as under this repository's version control.
Once a database is under version control, schema changes should
only be done via change scripts in this repository.
:param abs_path: Absolute path to migrate repository
:param version: Initial database version
"""
repository = _find_migrate_repo(abs_path)
versioning_api.version_control(get_engine(), repository, version)
return version
def _find_migrate_repo(abs_path):
"""Get the project's change script repository
:param abs_path: Absolute path to migrate repository
"""
if not os.path.exists(abs_path):
raise exception.DbMigrationError("Path %s not found" % abs_path)
return Repository(abs_path)
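# End-to-end sketch (hypothetical; the path and versions are illustrative,
# not part of this sync): stamp an empty database, upgrade it to the
# latest migration, then read back the current version.
def _example_version_workflow():
    repo = '/opt/stack/nova/nova/db/sqlalchemy/migrate_repo'
    db_version_control(repo, version=0)   # mark the db as version-controlled
    db_sync(repo)                         # upgrade to the latest version
    return db_version(repo, init_version=0)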

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 X.commerce, a business unit of eBay Inc. # Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
# Copyright 2010 United States Government as represented by the # Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration. # Administrator of the National Aeronautics and Space Administration.
@ -41,13 +39,13 @@ class ModelBase(object):
if not session: if not session:
session = sa.get_session() session = sa.get_session()
# NOTE(boris-42): This part of the code should look like: # NOTE(boris-42): This part of the code should look like:
# sesssion.add(self) # session.add(self)
# session.flush() # session.flush()
# But there is a bug in sqlalchemy and eventlet that # But there is a bug in sqlalchemy and eventlet that
# raises NoneType exception if there is no running # raises NoneType exception if there is no running
# transaction and rollback is called. As long as # transaction and rollback is called. As long as
# sqlalchemy has this bug we have to create transaction # sqlalchemy has this bug we have to create transaction
# explicity. # explicitly.
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
session.add(self) session.add(self)
session.flush() session.flush()
@ -61,13 +59,24 @@ class ModelBase(object):
def get(self, key, default=None): def get(self, key, default=None):
return getattr(self, key, default) return getattr(self, key, default)
@property
def _extra_keys(self):
"""Specifies custom fields
Subclasses can override this property to return a list
of custom fields that should be included in their dict
representation.
For reference check tests/db/sqlalchemy/test_models.py
"""
return []
def __iter__(self): def __iter__(self):
columns = dict(object_mapper(self).columns).keys() columns = dict(object_mapper(self).columns).keys()
# NOTE(russellb): Allow models to specify other keys that can be looked # NOTE(russellb): Allow models to specify other keys that can be looked
# up, beyond the actual db columns. An example would be the 'name' # up, beyond the actual db columns. An example would be the 'name'
# property for an Instance. # property for an Instance.
if hasattr(self, '_extra_keys'): columns.extend(self._extra_keys)
columns.extend(self._extra_keys())
self._i = iter(columns) self._i = iter(columns)
return self return self
@ -89,12 +98,12 @@ class ModelBase(object):
joined = dict([(k, v) for k, v in six.iteritems(self.__dict__) joined = dict([(k, v) for k, v in six.iteritems(self.__dict__)
if not k[0] == '_']) if not k[0] == '_'])
local.update(joined) local.update(joined)
return local.iteritems() return six.iteritems(local)
class TimestampMixin(object): class TimestampMixin(object):
created_at = Column(DateTime, default=timeutils.utcnow) created_at = Column(DateTime, default=lambda: timeutils.utcnow())
updated_at = Column(DateTime, onupdate=timeutils.utcnow) updated_at = Column(DateTime, onupdate=lambda: timeutils.utcnow())
class SoftDeleteMixin(object): class SoftDeleteMixin(object):

View File

@ -0,0 +1,187 @@
# Copyright 2013 Mirantis.inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Provision test environment for specific DB backends"""
import argparse
import os
import random
import string
from six import moves
import sqlalchemy
from nova.openstack.common.db import exception as exc
SQL_CONNECTION = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION', 'sqlite://')
def _gen_credentials(*names):
"""Generate credentials."""
auth_dict = {}
for name in names:
val = ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10))
auth_dict[name] = val
return auth_dict
def _get_engine(uri=SQL_CONNECTION):
"""Engine creation
By default the uri is SQL_CONNECTION which is admin credentials.
Call the function without arguments to get admin connection. Admin
connection required to create temporary user and database for each
particular test. Otherwise use existing connection to recreate connection
to the temporary database.
"""
return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)
def _execute_sql(engine, sql, driver):
"""Initialize connection, execute sql query and close it."""
try:
with engine.connect() as conn:
if driver == 'postgresql':
conn.connection.set_isolation_level(0)
for s in sql:
conn.execute(s)
except sqlalchemy.exc.OperationalError:
msg = ('%s does not match database admin '
'credentials or database does not exist.')
raise exc.DBConnectionError(msg % SQL_CONNECTION)
def create_database(engine):
"""Provide temporary user and database for each particular test."""
driver = engine.name
auth = _gen_credentials('database', 'user', 'passwd')
sqls = {
'mysql': [
"drop database if exists %(database)s;",
"grant all on %(database)s.* to '%(user)s'@'localhost'"
" identified by '%(passwd)s';",
"create database %(database)s;",
],
'postgresql': [
"drop database if exists %(database)s;",
"drop user if exists %(user)s;",
"create user %(user)s with password '%(passwd)s';",
"create database %(database)s owner %(user)s;",
]
}
if driver == 'sqlite':
return 'sqlite:////tmp/%s' % auth['database']
try:
sql_rows = sqls[driver]
except KeyError:
raise ValueError('Unsupported RDBMS %s' % driver)
sql_query = map(lambda x: x % auth, sql_rows)
_execute_sql(engine, sql_query, driver)
params = auth.copy()
params['backend'] = driver
return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
def drop_database(engine, current_uri):
"""Drop temporary database and user after each particular test."""
engine = _get_engine(current_uri)
admin_engine = _get_engine()
driver = engine.name
auth = {'database': engine.url.database, 'user': engine.url.username}
if driver == 'sqlite':
try:
os.remove(auth['database'])
except OSError:
pass
return
sqls = {
'mysql': [
"drop database if exists %(database)s;",
"drop user '%(user)s'@'localhost';",
],
'postgresql': [
"drop database if exists %(database)s;",
"drop user if exists %(user)s;",
]
}
try:
sql_rows = sqls[driver]
except KeyError:
raise ValueError('Unsupported RDBMS %s' % driver)
sql_query = map(lambda x: x % auth, sql_rows)
_execute_sql(admin_engine, sql_query, driver)
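# Usage sketch (hypothetical): provision a throwaway database, point the
# code under test at the returned uri, then drop it again. Admin
# credentials come from OS_TEST_DBAPI_ADMIN_CONNECTION (SQL_CONNECTION).
def _example_provision_cycle():
    admin_engine = _get_engine()
    test_uri = create_database(admin_engine)
    # ... run tests against test_uri ...
    drop_database(admin_engine, test_uri)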
def main():
"""Controller to handle commands
::create: Create test user and database with random names.
::drop: Drop user and database created by previous command.
"""
parser = argparse.ArgumentParser(
description='Controller to handle database creation and dropping'
' commands.',
epilog='Under normal circumstances this is not used directly.'
' Used in .testr.conf to automate test database creation'
' and dropping processes.')
subparsers = parser.add_subparsers(
help='Subcommands to manipulate temporary test databases.')
create = subparsers.add_parser(
'create',
help='Create temporary test '
'databases and users.')
create.set_defaults(which='create')
create.add_argument(
'instances_count',
type=int,
help='Number of databases to create.')
drop = subparsers.add_parser(
'drop',
help='Drop temporary test databases and users.')
drop.set_defaults(which='drop')
drop.add_argument(
'instances',
nargs='+',
help='List of database URIs to be dropped.')
args = parser.parse_args()
engine = _get_engine()
which = args.which
if which == "create":
for i in range(int(args.instances_count)):
print(create_database(engine))
elif which == "drop":
for db in args.instances:
drop_database(engine, db)
if __name__ == "__main__":
main()
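# Command-line sketch (assuming this module is saved as provision.py and
# wired up via .testr.conf, as the epilog above describes):
#
#     python provision.py create 2    # prints two fresh database URIs
#     python provision.py drop <uri>  # drops the database behind each uri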

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the # Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration. # Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved. # All Rights Reserved.
@ -20,41 +18,45 @@
Initializing: Initializing:
* Call set_defaults with the minimal of the following kwargs: * Call `set_defaults()` with the minimal of the following kwargs:
sql_connection, sqlite_db ``sql_connection``, ``sqlite_db``
Example: Example:
.. code:: python
session.set_defaults( session.set_defaults(
sql_connection="sqlite:///var/lib/nova/sqlite.db", sql_connection="sqlite:///var/lib/nova/sqlite.db",
sqlite_db="/var/lib/nova/sqlite.db") sqlite_db="/var/lib/nova/sqlite.db")
Recommended ways to use sessions within this framework: Recommended ways to use sessions within this framework:
* Don't use them explicitly; this is like running with AUTOCOMMIT=1. * Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
model_query() will implicitly use a session when called without one `model_query()` will implicitly use a session when called without one
supplied. This is the ideal situation because it will allow queries supplied. This is the ideal situation because it will allow queries
to be automatically retried if the database connection is interrupted. to be automatically retried if the database connection is interrupted.
Note: Automatic retry will be enabled in a future patch. .. note:: Automatic retry will be enabled in a future patch.
It is generally fine to issue several queries in a row like this. Even though It is generally fine to issue several queries in a row like this. Even though
they may be run in separate transactions and/or separate sessions, each one they may be run in separate transactions and/or separate sessions, each one
will see the data from the prior calls. If needed, undo- or rollback-like will see the data from the prior calls. If needed, undo- or rollback-like
functionality should be handled at a logical level. For an example, look at functionality should be handled at a logical level. For an example, look at
the code around quotas and reservation_rollback(). the code around quotas and `reservation_rollback()`.
Examples: Examples:
.. code:: python
def get_foo(context, foo): def get_foo(context, foo):
return model_query(context, models.Foo).\ return (model_query(context, models.Foo).
filter_by(foo=foo).\ filter_by(foo=foo).
first() first())
def update_foo(context, id, newfoo): def update_foo(context, id, newfoo):
model_query(context, models.Foo).\ (model_query(context, models.Foo).
filter_by(id=id).\ filter_by(id=id).
update({'foo': newfoo}) update({'foo': newfoo}))
def create_foo(context, values): def create_foo(context, values):
foo_ref = models.Foo() foo_ref = models.Foo()
@ -63,18 +65,26 @@ Recommended ways to use sessions within this framework:
return foo_ref return foo_ref
* Within the scope of a single method, keeping all the reads and writes within * Within the scope of a single method, keep all the reads and writes within
the context managed by a single session. In this way, the session's __exit__ the context managed by a single session. In this way, the session's
handler will take care of calling flush() and commit() for you. `__exit__` handler will take care of calling `flush()` and `commit()` for
If using this approach, you should not explicitly call flush() or commit(). you. If using this approach, you should not explicitly call `flush()` or
Any error within the context of the session will cause the session to emit `commit()`. Any error within the context of the session will cause the
a ROLLBACK. If the connection is dropped before this is possible, the session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be
database will implicitly rollback the transaction. raised in `session`'s `__exit__` handler, and any try/except within the
context managed by `session` will not be triggered. And catching other
non-database errors in the session will not trigger the ROLLBACK, so
exception handlers should always be outside the session, unless the
developer wants to do a partial commit on purpose. If the connection is
dropped before this is possible, the database will implicitly roll back the
transaction.
Note: statements in the session scope will not be automatically retried. .. note:: Statements in the session scope will not be automatically retried.
If you create models within the session, they need to be added, but you If you create models within the session, they need to be added, but you
do not need to call model.save() do not need to call `model.save()`:
.. code:: python
def create_many_foo(context, foos): def create_many_foo(context, foos):
session = get_session() session = get_session()
@ -87,36 +97,60 @@ Recommended ways to use sessions within this framework:
def update_bar(context, foo_id, newbar): def update_bar(context, foo_id, newbar):
session = get_session() session = get_session()
with session.begin(): with session.begin():
foo_ref = model_query(context, models.Foo, session).\ foo_ref = (model_query(context, models.Foo, session).
filter_by(id=foo_id).\ filter_by(id=foo_id).
first() first())
model_query(context, models.Bar, session).\ (model_query(context, models.Bar, session).
filter_by(id=foo_ref['bar_id']).\ filter_by(id=foo_ref['bar_id']).
update({'bar': newbar}) update({'bar': newbar}))
Note: update_bar is a trivially simple example of using "with session.begin". .. note:: `update_bar` is a trivially simple example of using
Whereas create_many_foo is a good example of when a transaction is needed, ``with session.begin``. Whereas `create_many_foo` is a good example of
it is always best to use as few queries as possible. The two queries in when a transaction is needed, it is always best to use as few queries as
update_bar can be better expressed using a single query which avoids possible.
the need for an explicit transaction. It can be expressed like so:
The two queries in `update_bar` can be better expressed using a single query
which avoids the need for an explicit transaction. It can be expressed like
so:
.. code:: python
def update_bar(context, foo_id, newbar): def update_bar(context, foo_id, newbar):
subq = model_query(context, models.Foo.id).\ subq = (model_query(context, models.Foo.id).
filter_by(id=foo_id).\ filter_by(id=foo_id).
limit(1).\ limit(1).
subquery() subquery())
model_query(context, models.Bar).\ (model_query(context, models.Bar).
filter_by(id=subq.as_scalar()).\ filter_by(id=subq.as_scalar()).
update({'bar': newbar}) update({'bar': newbar}))
For reference, this emits approximagely the following SQL statement: For reference, this emits approximately the following SQL statement::
UPDATE bar SET bar = ${newbar} UPDATE bar SET bar = ${newbar}
WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1); WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
.. note:: `create_duplicate_foo` is a trivially simple example of catching an
exception while using ``with session.begin``. Here we create two duplicate
instances with the same primary key; the exception must be caught outside
of the context managed by a single session:
.. code:: python
def create_duplicate_foo(context):
foo1 = models.Foo()
foo2 = models.Foo()
foo1.id = foo2.id = 1
session = get_session()
try:
with session.begin():
session.add(foo1)
session.add(foo2)
except exception.DBDuplicateEntry as e:
handle_error(e)
* Passing an active session between methods. Sessions should only be passed * Passing an active session between methods. Sessions should only be passed
to private methods. The private method must use a subtransaction; otherwise to private methods. The private method must use a subtransaction; otherwise
SQLAlchemy will throw an error when you call session.begin() on an existing SQLAlchemy will throw an error when you call `session.begin()` on an existing
transaction. Public methods should not accept a session parameter and should transaction. Public methods should not accept a session parameter and should
not be involved in sessions within the caller's scope. not be involved in sessions within the caller's scope.
@ -129,6 +163,8 @@ Recommended ways to use sessions within this framework:
becomes less clear in this situation. When this is needed for code clarity, becomes less clear in this situation. When this is needed for code clarity,
it should be clearly documented. it should be clearly documented.
.. code:: python
def myfunc(foo): def myfunc(foo):
session = get_session() session = get_session()
with session.begin(): with session.begin():
@ -148,13 +184,13 @@ There are some things which it is best to avoid:
* Don't keep a transaction open any longer than necessary. * Don't keep a transaction open any longer than necessary.
This means that your "with session.begin()" block should be as short This means that your ``with session.begin()`` block should be as short
as possible, while still containing all the related calls for that as possible, while still containing all the related calls for that
transaction. transaction.
* Avoid "with_lockmode('UPDATE')" when possible. * Avoid ``with_lockmode('UPDATE')`` when possible.
In MySQL/InnoDB, when a "SELECT ... FOR UPDATE" query does not match In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match
any rows, it will take a gap-lock. This is a form of write-lock on the any rows, it will take a gap-lock. This is a form of write-lock on the
"gap" where no rows exist, and prevents any other writes to that space. "gap" where no rows exist, and prevents any other writes to that space.
This can effectively prevent any INSERT into a table by locking the gap This can effectively prevent any INSERT into a table by locking the gap
@ -165,16 +201,19 @@ There are some things which it is best to avoid:
number of rows matching a query, and if only one row is returned, number of rows matching a query, and if only one row is returned,
then issue the SELECT FOR UPDATE. then issue the SELECT FOR UPDATE.
The better long-term solution is to use INSERT .. ON DUPLICATE KEY UPDATE. The better long-term solution is to use
``INSERT .. ON DUPLICATE KEY UPDATE``.
However, this can not be done until the "deleted" columns are removed and However, this can not be done until the "deleted" columns are removed and
proper UNIQUE constraints are added to the tables. proper UNIQUE constraints are added to the tables.
Enabling soft deletes: Enabling soft deletes:
* To use/enable soft-deletes, the SoftDeleteMixin must be added * To use/enable soft-deletes, the `SoftDeleteMixin` must be added
to your model class. For example: to your model class. For example:
.. code:: python
class NovaBase(models.SoftDeleteMixin, models.ModelBase): class NovaBase(models.SoftDeleteMixin, models.ModelBase):
pass pass
@ -182,13 +221,15 @@ Enabling soft deletes:
Efficient use of soft deletes: Efficient use of soft deletes:
* There are two possible ways to mark a record as deleted: * There are two possible ways to mark a record as deleted:
model.soft_delete() and query.soft_delete(). `model.soft_delete()` and `query.soft_delete()`.
model.soft_delete() method works with single already fetched entry. The `model.soft_delete()` method works with a single already-fetched entry.
query.soft_delete() makes only one db request for all entries that correspond `query.soft_delete()` makes only one db request for all entries that
to query. correspond to the query.
* In almost all cases you should use query.soft_delete(). Some examples: * In almost all cases you should use `query.soft_delete()`. Some examples:
.. code:: python
def soft_delete_bar(): def soft_delete_bar():
count = model_query(BarModel).find(some_condition).soft_delete() count = model_query(BarModel).find(some_condition).soft_delete()
@ -199,18 +240,20 @@ Efficient use of soft deletes:
if session is None: if session is None:
session = get_session() session = get_session()
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
count = model_query(BarModel).\ count = (model_query(BarModel).
find(some_condition).\ find(some_condition).
soft_delete(synchronize_session=True) soft_delete(synchronize_session=True))
# Here synchronize_session is required, because we # Here synchronize_session is required, because we
# don't know what is going on in outer session. # don't know what is going on in outer session.
if count == 0: if count == 0:
raise Exception("0 entries were soft deleted") raise Exception("0 entries were soft deleted")
* There is only one situation where model.soft_delete() is appropriate: when * There is only one situation where `model.soft_delete()` is appropriate: when
you fetch a single record, work with it, and mark it as deleted in the same you fetch a single record, work with it, and mark it as deleted in the same
transaction. transaction.
.. code:: python
def soft_delete_bar_model(): def soft_delete_bar_model():
session = get_session() session = get_session()
with session.begin(): with session.begin():
@ -219,13 +262,15 @@ Efficient use of soft deletes:
bar_ref.soft_delete(session=session) bar_ref.soft_delete(session=session)
However, if you need to work with all entries that correspond to the query and However, if you need to work with all entries that correspond to the query and
then soft delete them you should use query.soft_delete() method: then soft delete them you should use the `query.soft_delete()` method:
.. code:: python
def soft_delete_multi_models(): def soft_delete_multi_models():
session = get_session() session = get_session()
with session.begin(): with session.begin():
query = model_query(BarModel, session=session).\ query = (model_query(BarModel, session=session).
find(some_condition) find(some_condition))
model_refs = query.all() model_refs = query.all()
# Work with model_refs # Work with model_refs
query.soft_delete(synchronize_session=False) query.soft_delete(synchronize_session=False)
@ -233,19 +278,23 @@ Efficient use of soft deletes:
# session and these entries are not used after this. # session and these entries are not used after this.
When working with many rows, it is very important to use query.soft_delete, When working with many rows, it is very important to use query.soft_delete,
which issues a single query. Using model.soft_delete(), as in the following which issues a single query. Using `model.soft_delete()`, as in the following
example, is very inefficient. example, is very inefficient.
.. code:: python
for bar_ref in bar_refs: for bar_ref in bar_refs:
bar_ref.soft_delete(session=session) bar_ref.soft_delete(session=session)
# This will produce count(bar_refs) db requests. # This will produce count(bar_refs) db requests.
""" """
import functools
import logging
import os.path import os.path
import re import re
import time import time
from eventlet import greenthread
from oslo.config import cfg from oslo.config import cfg
import six import six
from sqlalchemy import exc as sqla_exc from sqlalchemy import exc as sqla_exc
@ -255,17 +304,16 @@ from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column from sqlalchemy.sql.expression import literal_column
from nova.openstack.common.db import exception from nova.openstack.common.db import exception
from nova.openstack.common.gettextutils import _ # noqa from nova.openstack.common.gettextutils import _
from nova.openstack.common import log as logging
from nova.openstack.common import timeutils from nova.openstack.common import timeutils
sqlite_db_opts = [ sqlite_db_opts = [
cfg.StrOpt('sqlite_db', cfg.StrOpt('sqlite_db',
default='nova.sqlite', default='nova.sqlite',
help='the filename to use with sqlite'), help='The file name to use with SQLite'),
cfg.BoolOpt('sqlite_synchronous', cfg.BoolOpt('sqlite_synchronous',
default=True, default=True,
help='If true, use synchronous mode for sqlite'), help='If True, SQLite uses synchronous mode'),
] ]
database_opts = [ database_opts = [
@ -275,23 +323,27 @@ database_opts = [
'../', '$sqlite_db')), '../', '$sqlite_db')),
help='The SQLAlchemy connection string used to connect to the ' help='The SQLAlchemy connection string used to connect to the '
'database', 'database',
secret=True,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection', deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DEFAULT'), group='DEFAULT'),
cfg.DeprecatedOpt('sql_connection', cfg.DeprecatedOpt('sql_connection',
group='DATABASE')], group='DATABASE'),
secret=True), cfg.DeprecatedOpt('connection',
group='sql'), ]),
cfg.StrOpt('slave_connection', cfg.StrOpt('slave_connection',
default='', default='',
secret=True,
help='The SQLAlchemy connection string used to connect to the ' help='The SQLAlchemy connection string used to connect to the '
'slave database', 'slave database'),
secret=True),
cfg.IntOpt('idle_timeout', cfg.IntOpt('idle_timeout',
default=3600, default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
group='DEFAULT'), group='DEFAULT'),
cfg.DeprecatedOpt('sql_idle_timeout', cfg.DeprecatedOpt('sql_idle_timeout',
group='DATABASE')], group='DATABASE'),
help='timeout before idle sql connections are reaped'), cfg.DeprecatedOpt('idle_timeout',
group='sql')],
help='Timeout before idle sql connections are reaped'),
cfg.IntOpt('min_pool_size', cfg.IntOpt('min_pool_size',
default=1, default=1,
deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size', deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
@ -314,7 +366,7 @@ database_opts = [
group='DEFAULT'), group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_retries', cfg.DeprecatedOpt('sql_max_retries',
group='DATABASE')], group='DATABASE')],
help='maximum db connection retries during startup. ' help='Maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'), '(setting -1 implies an infinite retry count)'),
cfg.IntOpt('retry_interval', cfg.IntOpt('retry_interval',
default=10, default=10,
@ -322,7 +374,7 @@ database_opts = [
group='DEFAULT'), group='DEFAULT'),
cfg.DeprecatedOpt('reconnect_interval', cfg.DeprecatedOpt('reconnect_interval',
group='DATABASE')], group='DATABASE')],
help='interval between retries of opening a sql connection'), help='Interval between retries of opening a sql connection'),
cfg.IntOpt('max_overflow', cfg.IntOpt('max_overflow',
default=None, default=None,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow', deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
@ -408,8 +460,8 @@ class SqliteForeignKeysListener(PoolListener):
dbapi_con.execute('pragma foreign_keys=ON') dbapi_con.execute('pragma foreign_keys=ON')
def get_session(autocommit=True, expire_on_commit=False, def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False,
sqlite_fk=False, slave_session=False): slave_session=False, mysql_traditional_mode=False):
"""Return a SQLAlchemy session.""" """Return a SQLAlchemy session."""
global _MAKER global _MAKER
global _SLAVE_MAKER global _SLAVE_MAKER
@ -419,7 +471,8 @@ def get_session(autocommit=True, expire_on_commit=False,
maker = _SLAVE_MAKER maker = _SLAVE_MAKER
if maker is None: if maker is None:
engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session) engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session,
mysql_traditional_mode=mysql_traditional_mode)
maker = get_maker(engine, autocommit, expire_on_commit) maker = get_maker(engine, autocommit, expire_on_commit)
if slave_session: if slave_session:
@ -438,6 +491,11 @@ def get_session(autocommit=True, expire_on_commit=False,
# 1 column - (IntegrityError) column c1 is not unique # 1 column - (IntegrityError) column c1 is not unique
# N columns - (IntegrityError) column c1, c2, ..., N are not unique # N columns - (IntegrityError) column c1, c2, ..., N are not unique
# #
# sqlite since 3.7.16:
# 1 column - (IntegrityError) UNIQUE constraint failed: tbl.k1
#
# N columns - (IntegrityError) UNIQUE constraint failed: tbl.k1, tbl.k2
#
# postgres: # postgres:
# 1 column - (IntegrityError) duplicate key value violates unique # 1 column - (IntegrityError) duplicate key value violates unique
# constraint "users_c1_key" # constraint "users_c1_key"
@ -450,9 +508,10 @@ def get_session(autocommit=True, expire_on_commit=False,
# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined # N columns - (IntegrityError) (1062, "Duplicate entry 'values joined
# with -' for key 'name_of_our_constraint'") # with -' for key 'name_of_our_constraint'")
_DUP_KEY_RE_DB = { _DUP_KEY_RE_DB = {
"sqlite": re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), "sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
"postgresql": re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"), re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),
"mysql": re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$") "postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),
"mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),)
} }
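# Matching sketch: with SQLite >= 3.7.16 an IntegrityError message such as
#   UNIQUE constraint failed: users.name
# is caught by the second sqlite pattern above; group(1) yields
# "users.name", and the "users." prefix is stripped below so the reported
# columns become ['name'].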
@ -477,13 +536,22 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name):
if engine_name not in ["mysql", "sqlite", "postgresql"]: if engine_name not in ["mysql", "sqlite", "postgresql"]:
return return
m = _DUP_KEY_RE_DB[engine_name].match(integrity_error.message) # FIXME(johannes): The usage of the .message attribute has been
if not m: # deprecated since Python 2.6. However, the exceptions raised by
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
for pattern in _DUP_KEY_RE_DB[engine_name]:
match = pattern.match(integrity_error.message)
if match:
break
else:
return return
columns = m.group(1)
columns = match.group(1)
if engine_name == "sqlite": if engine_name == "sqlite":
columns = columns.strip().split(", ") columns = [c.split('.')[-1] for c in columns.strip().split(", ")]
else: else:
columns = get_columns_from_uniq_cons_or_name(columns) columns = get_columns_from_uniq_cons_or_name(columns)
raise exception.DBDuplicateEntry(columns, integrity_error) raise exception.DBDuplicateEntry(columns, integrity_error)
@ -509,6 +577,11 @@ def _raise_if_deadlock_error(operational_error, engine_name):
re = _DEADLOCK_RE_DB.get(engine_name) re = _DEADLOCK_RE_DB.get(engine_name)
if re is None: if re is None:
return return
# FIXME(johannes): The usage of the .message attribute has been
# deprecated since Python 2.6. However, the exceptions raised by
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
m = re.match(operational_error.message) m = re.match(operational_error.message)
if not m: if not m:
return return
@ -516,6 +589,7 @@ def _raise_if_deadlock_error(operational_error, engine_name):
def _wrap_db_error(f): def _wrap_db_error(f):
@functools.wraps(f)
def _wrap(*args, **kwargs): def _wrap(*args, **kwargs):
try: try:
return f(*args, **kwargs) return f(*args, **kwargs)
@ -540,11 +614,11 @@ def _wrap_db_error(f):
except Exception as e: except Exception as e:
LOG.exception(_('DB exception wrapped.')) LOG.exception(_('DB exception wrapped.'))
raise exception.DBError(e) raise exception.DBError(e)
_wrap.func_name = f.func_name
return _wrap return _wrap
def get_engine(sqlite_fk=False, slave_engine=False): def get_engine(sqlite_fk=False, slave_engine=False,
mysql_traditional_mode=False):
"""Return a SQLAlchemy engine.""" """Return a SQLAlchemy engine."""
global _ENGINE global _ENGINE
global _SLAVE_ENGINE global _SLAVE_ENGINE
@ -556,8 +630,8 @@ def get_engine(sqlite_fk=False, slave_engine=False):
db_uri = CONF.database.slave_connection db_uri = CONF.database.slave_connection
if engine is None: if engine is None:
engine = create_engine(db_uri, engine = create_engine(db_uri, sqlite_fk=sqlite_fk,
sqlite_fk=sqlite_fk) mysql_traditional_mode=mysql_traditional_mode)
if slave_engine: if slave_engine:
_SLAVE_ENGINE = engine _SLAVE_ENGINE = engine
else: else:
@ -580,45 +654,65 @@ def _add_regexp_listener(dbapi_con, con_record):
dbapi_con.create_function('regexp', 2, regexp) dbapi_con.create_function('regexp', 2, regexp)
def _greenthread_yield(dbapi_con, con_record): def _thread_yield(dbapi_con, con_record):
"""Ensure other greenthreads get a chance to be executed. """Ensure other greenthreads get a chance to be executed.
If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
execute instead of time.sleep(0).
Force a context switch. With common database backends (eg MySQLdb and Force a context switch. With common database backends (eg MySQLdb and
sqlite), there is no implicit yield caused by network I/O since they are sqlite), there is no implicit yield caused by network I/O since they are
implemented by C libraries that eventlet cannot monkey patch. implemented by C libraries that eventlet cannot monkey patch.
""" """
greenthread.sleep(0) time.sleep(0)
def _ping_listener(dbapi_conn, connection_rec, connection_proxy): def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
"""Ensures that MySQL connections checked out of the pool are alive. """Ensures that MySQL and DB2 connections are alive.
Borrowed from: Borrowed from:
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
""" """
cursor = dbapi_conn.cursor()
try: try:
dbapi_conn.cursor().execute('select 1') ping_sql = 'select 1'
except dbapi_conn.OperationalError as ex: if engine.name == 'ibm_db_sa':
if ex.args[0] in (2006, 2013, 2014, 2045, 2055): # DB2 requires a table expression
LOG.warn(_('Got mysql server has gone away: %s'), ex) ping_sql = 'select 1 from (values (1)) AS t1'
raise sqla_exc.DisconnectionError("Database server went away") cursor.execute(ping_sql)
except Exception as ex:
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
msg = _('Database server has gone away: %s') % ex
LOG.warning(msg)
raise sqla_exc.DisconnectionError(msg)
else: else:
raise raise
def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy):
"""Set engine mode to 'traditional'.
Required to prevent silent truncates at insert or update operations
under MySQL. By default MySQL truncates an inserted string if it is
longer than the declared field, issuing just a warning. That is
fraught with data corruption.
"""
dbapi_con.cursor().execute("SET SESSION sql_mode = TRADITIONAL;")
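# Opt-in sketch: callers request TRADITIONAL mode through the keyword
# added to get_session()/get_engine() in this sync, so over-long strings
# raise errors instead of being silently truncated.
def _example_traditional_mode_session():
    return get_session(mysql_traditional_mode=True)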
def _is_db_connection_error(args): def _is_db_connection_error(args):
"""Return True if error in connecting to db.""" """Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended # NOTE(adam_g): This is currently MySQL specific and needs to be extended
# to support Postgres and others. # to support Postgres and others.
# For the db2, the error code is -30081 since the db2 is still not ready # For the db2, the error code is -30081 since the db2 is still not ready
conn_err_codes = ('2002', '2003', '2006', '-30081') conn_err_codes = ('2002', '2003', '2006', '2013', '-30081')
for err_code in conn_err_codes: for err_code in conn_err_codes:
if args.find(err_code) != -1: if args.find(err_code) != -1:
return True return True
return False return False
def create_engine(sql_connection, sqlite_fk=False): def create_engine(sql_connection, sqlite_fk=False,
mysql_traditional_mode=False):
"""Return a new SQLAlchemy engine.""" """Return a new SQLAlchemy engine."""
# NOTE(geekinutah): At this point we could be connecting to the normal # NOTE(geekinutah): At this point we could be connecting to the normal
# db handle or the slave db handle. Things like # db handle or the slave db handle. Things like
@ -657,10 +751,21 @@ def create_engine(sql_connection, sqlite_fk=False):
engine = sqlalchemy.create_engine(sql_connection, **engine_args) engine = sqlalchemy.create_engine(sql_connection, **engine_args)
sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield) sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
if 'mysql' in connection_dict.drivername: if engine.name in ['mysql', 'ibm_db_sa']:
sqlalchemy.event.listen(engine, 'checkout', _ping_listener) callback = functools.partial(_ping_listener, engine)
sqlalchemy.event.listen(engine, 'checkout', callback)
if engine.name == 'mysql':
if mysql_traditional_mode:
sqlalchemy.event.listen(engine, 'checkout',
_set_mode_traditional)
else:
LOG.warning(_("This application has not enabled MySQL "
"traditional mode, which means silent "
"data corruption may occur. "
"Please encourage the application "
"developers to enable this mode."))
elif 'sqlite' in connection_dict.drivername: elif 'sqlite' in connection_dict.drivername:
if not CONF.sqlite_synchronous: if not CONF.sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect', sqlalchemy.event.listen(engine, 'connect',
@ -682,7 +787,7 @@ def create_engine(sql_connection, sqlite_fk=False):
remaining = 'infinite' remaining = 'infinite'
while True: while True:
msg = _('SQL connection failed. %s attempts left.') msg = _('SQL connection failed. %s attempts left.')
LOG.warn(msg % remaining) LOG.warning(msg % remaining)
if remaining != 'infinite': if remaining != 'infinite':
remaining -= 1 remaining -= 1
time.sleep(CONF.database.retry_interval) time.sleep(CONF.database.retry_interval)
@ -741,25 +846,25 @@ def _patch_mysqldb_with_stacktrace_comments():
def _do_query(self, q): def _do_query(self, q):
stack = '' stack = ''
for file, line, method, function in traceback.extract_stack(): for filename, line, method, function in traceback.extract_stack():
# exclude various common things from trace # exclude various common things from trace
if file.endswith('session.py') and method == '_do_query': if filename.endswith('session.py') and method == '_do_query':
continue continue
if file.endswith('api.py') and method == 'wrapper': if filename.endswith('api.py') and method == 'wrapper':
continue continue
if file.endswith('utils.py') and method == '_inner': if filename.endswith('utils.py') and method == '_inner':
continue continue
if file.endswith('exception.py') and method == '_wrap': if filename.endswith('exception.py') and method == '_wrap':
continue continue
# db/api is just a wrapper around db/sqlalchemy/api # db/api is just a wrapper around db/sqlalchemy/api
if file.endswith('db/api.py'): if filename.endswith('db/api.py'):
continue continue
# only trace inside nova # only trace inside nova
index = file.rfind('nova') index = filename.rfind('nova')
if index == -1: if index == -1:
continue continue
stack += "File:%s:%s Method:%s() Line:%s | " \ stack += "File:%s:%s Method:%s() Line:%s | " \
% (file[index:], line, method, function) % (filename[index:], line, method, function)
# strip trailing " | " from stack # strip trailing " | " from stack
if stack: if stack:

View File

@ -0,0 +1,154 @@
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import functools
import os
import fixtures
from oslo.config import cfg
import six
from nova.openstack.common.db.sqlalchemy import session
from nova.openstack.common.db.sqlalchemy import utils
from nova.openstack.common import test
class DbFixture(fixtures.Fixture):
"""Basic database fixture.
Allows running tests on various db backends, such as SQLite, MySQL and
PostgreSQL. The sqlite backend is used by default. To override the default
backend uri, set the env variable OS_TEST_DBAPI_CONNECTION with database
admin credentials for the specific backend.
"""
def _get_uri(self):
return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://')
def __init__(self):
super(DbFixture, self).__init__()
self.conf = cfg.CONF
self.conf.import_opt('connection',
'nova.openstack.common.db.sqlalchemy.session',
group='database')
def setUp(self):
super(DbFixture, self).setUp()
self.conf.set_default('connection', self._get_uri(), group='database')
self.addCleanup(self.conf.reset)
class DbTestCase(test.BaseTestCase):
"""Base class for testing of DB code.
Uses `DbFixture`. Intended to be the main database test case, running all
the tests on a given backend with a user-defined uri. Backend-specific
tests should be decorated with the `backend_specific` decorator.
"""
FIXTURE = DbFixture
def setUp(self):
super(DbTestCase, self).setUp()
self.useFixture(self.FIXTURE())
self.addCleanup(session.cleanup)
ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql']
def backend_specific(*dialects):
"""Decorator to skip backend specific tests on inappropriate engines.
::dialects: list of dialects names under which the test will be launched.
"""
def wrap(f):
@functools.wraps(f)
def ins_wrap(self):
if not set(dialects).issubset(ALLOWED_DIALECTS):
raise ValueError(
"Please use allowed dialects: %s" % ALLOWED_DIALECTS)
engine = session.get_engine()
if engine.name not in dialects:
msg = ('The test "%s" can be run '
'only on %s. Current engine is %s.')
args = (f.__name__, ' '.join(dialects), engine.name)
self.skip(msg % args)
else:
return f(self)
return ins_wrap
return wrap
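# Usage sketch (hypothetical test case): the decorated test runs only when
# the configured engine is MySQL and is skipped on other backends.
class _ExampleBackendSpecificCase(DbTestCase):

    @backend_specific('mysql')
    def test_mysql_only_behaviour(self):
        pass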
@six.add_metaclass(abc.ABCMeta)
class OpportunisticFixture(DbFixture):
"""Base fixture to use default CI databases.
The databases exist in the OpenStack CI infrastructure. But for
correct functioning in a local environment, the databases must be
created manually.
"""
DRIVER = abc.abstractproperty(lambda: None)
DBNAME = PASSWORD = USERNAME = 'openstack_citest'
def _get_uri(self):
return utils.get_connect_string(backend=self.DRIVER,
user=self.USERNAME,
passwd=self.PASSWORD,
database=self.DBNAME)
@six.add_metaclass(abc.ABCMeta)
class OpportunisticTestCase(DbTestCase):
"""Base test case to use default CI databases.
Subclasses of this test case run only when the openstack_citest
database is available; otherwise the tests will be skipped.
"""
FIXTURE = abc.abstractproperty(lambda: None)
def setUp(self):
credentials = {
'backend': self.FIXTURE.DRIVER,
'user': self.FIXTURE.USERNAME,
'passwd': self.FIXTURE.PASSWORD,
'database': self.FIXTURE.DBNAME}
if self.FIXTURE.DRIVER and not utils.is_backend_avail(**credentials):
msg = '%s backend is not available.' % self.FIXTURE.DRIVER
return self.skip(msg)
super(OpportunisticTestCase, self).setUp()
class MySQLOpportunisticFixture(OpportunisticFixture):
DRIVER = 'mysql'
class PostgreSQLOpportunisticFixture(OpportunisticFixture):
DRIVER = 'postgresql'
class MySQLOpportunisticTestCase(OpportunisticTestCase):
FIXTURE = MySQLOpportunisticFixture
class PostgreSQLOpportunisticTestCase(OpportunisticTestCase):
FIXTURE = PostgreSQLOpportunisticFixture
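# Usage sketch (hypothetical): opportunistic cases need no configuration;
# the test below runs when a local 'openstack_citest' MySQL database is
# reachable and is skipped otherwise.
class _ExampleMySQLOpportunisticCase(MySQLOpportunisticTestCase):

    def test_mysql_specific_behaviour(self):
        pass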

View File

@ -0,0 +1,269 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright 2012-2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import logging
import os
import subprocess
import lockfile
from six import moves
import sqlalchemy
import sqlalchemy.exc
from nova.openstack.common.db.sqlalchemy import utils
from nova.openstack.common.gettextutils import _
from nova.openstack.common.py3kcompat import urlutils
from nova.openstack.common import test
LOG = logging.getLogger(__name__)
def _have_mysql(user, passwd, database):
present = os.environ.get('TEST_MYSQL_PRESENT')
if present is None:
return utils.is_backend_avail(backend='mysql',
user=user,
passwd=passwd,
database=database)
return present.lower() in ('', 'true')
def _have_postgresql(user, passwd, database):
present = os.environ.get('TEST_POSTGRESQL_PRESENT')
if present is None:
return utils.is_backend_avail(backend='postgres',
user=user,
passwd=passwd,
database=database)
return present.lower() in ('', 'true')
def _set_db_lock(lock_path=None, lock_prefix=None):
def decorator(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
try:
path = lock_path or os.environ.get("NOVA_LOCK_PATH")
lock = lockfile.FileLock(os.path.join(path, lock_prefix))
with lock:
LOG.debug(_('Got lock "%s"') % f.__name__)
return f(*args, **kwargs)
finally:
LOG.debug(_('Lock released "%s"') % f.__name__)
return wrapper
return decorator
class BaseMigrationTestCase(test.BaseTestCase):
"""Base class fort testing of migration utils."""
def __init__(self, *args, **kwargs):
super(BaseMigrationTestCase, self).__init__(*args, **kwargs)
self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
'test_migrations.conf')
# Test machines can set the TEST_MIGRATIONS_CONF variable
# to override the location of the config file for migration testing
self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF',
self.DEFAULT_CONFIG_FILE)
self.test_databases = {}
self.migration_api = None
def setUp(self):
super(BaseMigrationTestCase, self).setUp()
# Load test databases from the config file. Only do this
# once. No need to re-run this on each test...
LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH)
if os.path.exists(self.CONFIG_FILE_PATH):
cp = moves.configparser.RawConfigParser()
try:
cp.read(self.CONFIG_FILE_PATH)
defaults = cp.defaults()
for key, value in defaults.items():
self.test_databases[key] = value
except moves.configparser.ParsingError as e:
self.fail("Failed to read test_migrations.conf config "
"file. Got error: %s" % e)
else:
self.fail("Failed to find test_migrations.conf config "
"file.")
self.engines = {}
for key, value in self.test_databases.items():
self.engines[key] = sqlalchemy.create_engine(value)
# We start each test case with a completely blank slate.
self._reset_databases()
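For reference, a hypothetical test_migrations.conf (URLs invented): setUp() reads only the [DEFAULT] section and treats each key as a database nickname mapped to a SQLAlchemy URL. The same parsing, inlined:

import io

SAMPLE = u"""[DEFAULT]
sqlite = sqlite:///test_migrations.db
mysqlcitest = mysql://openstack_citest:openstack_citest@localhost/openstack_citest
"""

cp = moves.configparser.RawConfigParser()
cp.readfp(io.StringIO(SAMPLE))
print(cp.defaults())
# {'sqlite': 'sqlite:///test_migrations.db', 'mysqlcitest': 'mysql://...'}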
def tearDown(self):
# We destroy the test data store between each test case,
# and recreate it, which ensures that we have no side-effects
# from the tests
self._reset_databases()
super(BaseMigrationTestCase, self).tearDown()
def execute_cmd(self, cmd=None):
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output = process.communicate()[0]
LOG.debug(output)
self.assertEqual(0, process.returncode,
"Failed to run: %s\n%s" % (cmd, output))
def _reset_pg(self, conn_pieces):
(user,
password,
database,
host) = utils.get_db_connection_info(conn_pieces)
os.environ['PGPASSWORD'] = password
os.environ['PGUSER'] = user
        # note(boris-42): We must create and drop the database; we can't
        # drop the database we are connected to, so for such operations
        # we use the special database template1.
sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
" '%(sql)s' -d template1")
sql = ("drop database if exists %s;") % database
droptable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
self.execute_cmd(droptable)
sql = ("create database %s;") % database
createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
self.execute_cmd(createtable)
os.unsetenv('PGPASSWORD')
os.unsetenv('PGUSER')
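Since psql cannot drop the database it is connected to, every command above runs against template1. Expanded with hypothetical values, the generated drop command is:

sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
          " '%(sql)s' -d template1")
print(sqlcmd % {'user': 'citest', 'host': 'localhost',
                'sql': 'drop database if exists citest;'})
# psql -w -U citest -h localhost -c 'drop database if exists citest;' -d template1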
@_set_db_lock(lock_prefix='migration_tests-')
def _reset_databases(self):
for key, engine in self.engines.items():
conn_string = self.test_databases[key]
conn_pieces = urlutils.urlparse(conn_string)
engine.dispose()
if conn_string.startswith('sqlite'):
# We can just delete the SQLite database, which is
# the easiest and cleanest solution
db_path = conn_pieces.path.strip('/')
if os.path.exists(db_path):
os.unlink(db_path)
# No need to recreate the SQLite DB. SQLite will
# create it for us if it's not there...
elif conn_string.startswith('mysql'):
# We can execute the MySQL client to destroy and re-create
                # the MySQL database, which is easier and less error-prone
# than using SQLAlchemy to do this via MetaData...trust me.
(user, password, database, host) = \
utils.get_db_connection_info(conn_pieces)
sql = ("drop database if exists %(db)s; "
"create database %(db)s;") % {'db': database}
cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s "
"-e \"%(sql)s\"") % {'user': user, 'password': password,
'host': host, 'sql': sql}
self.execute_cmd(cmd)
elif conn_string.startswith('postgresql'):
self._reset_pg(conn_pieces)
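Likewise for MySQL, the formatting above collapses to a single client invocation; with invented credentials it expands to:

sql = ("drop database if exists %(db)s; "
       "create database %(db)s;") % {'db': 'citest'}
cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s "
       "-e \"%(sql)s\"") % {'user': 'citest', 'password': 'secret',
                            'host': 'localhost', 'sql': sql}
print(cmd)
# mysql -u "citest" -p"secret" -h localhost -e "drop database if exists citest; create database citest;"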
class WalkVersionsMixin(object):
def _walk_versions(self, engine=None, snake_walk=False, downgrade=True):
# Determine latest version script from the repo, then
# upgrade from 1 through to the latest, with no data
# in the databases. This just checks that the schema itself
# upgrades successfully.
# Place the database under version control
self.migration_api.version_control(engine, self.REPOSITORY,
self.INIT_VERSION)
self.assertEqual(self.INIT_VERSION,
self.migration_api.db_version(engine,
self.REPOSITORY))
LOG.debug('latest version is %s' % self.REPOSITORY.latest)
versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1)
for version in versions:
# upgrade -> downgrade -> upgrade
self._migrate_up(engine, version, with_data=True)
if snake_walk:
downgraded = self._migrate_down(
engine, version - 1, with_data=True)
if downgraded:
self._migrate_up(engine, version)
if downgrade:
# Now walk it back down to 0 from the latest, testing
# the downgrade paths.
for version in reversed(versions):
# downgrade -> upgrade -> downgrade
downgraded = self._migrate_down(engine, version - 1)
if snake_walk and downgraded:
self._migrate_up(engine, version)
self._migrate_down(engine, version - 1)
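A sketch of wiring the mixin into a concrete test case. The repository path is a placeholder, and sqlalchemy-migrate is assumed as the versioning API, consistent with the version_control/db_version calls above.

from migrate.versioning import api as versioning_api
from migrate.versioning import repository

class TestWalkMigrations(BaseMigrationTestCase, WalkVersionsMixin):

    def setUp(self):
        super(TestWalkMigrations, self).setUp()
        self.migration_api = versioning_api
        self.REPOSITORY = repository.Repository('path/to/migrate_repo')
        self.INIT_VERSION = 0

    def test_walk_versions(self):
        # Exercise upgrade (and optionally downgrade) on every
        # engine configured in test_migrations.conf.
        for engine in self.engines.values():
            self._walk_versions(engine, snake_walk=True, downgrade=True)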
def _migrate_down(self, engine, version, with_data=False):
try:
self.migration_api.downgrade(engine, self.REPOSITORY, version)
except NotImplementedError:
# NOTE(sirp): some migrations, namely release-level
# migrations, don't support a downgrade.
return False
self.assertEqual(
version, self.migration_api.db_version(engine, self.REPOSITORY))
# NOTE(sirp): `version` is what we're downgrading to (i.e. the 'target'
# version). So if we have any downgrade checks, they need to be run for
# the previous (higher numbered) migration.
if with_data:
post_downgrade = getattr(
self, "_post_downgrade_%03d" % (version + 1), None)
if post_downgrade:
post_downgrade(engine)
return True
def _migrate_up(self, engine, version, with_data=False):
"""migrate up to a new version of the db.
We allow for data insertion and post checks at every
migration version with special _pre_upgrade_### and
_check_### functions in the main test.
"""
# NOTE(sdague): try block is here because it's impossible to debug
# where a failed data migration happens otherwise
try:
if with_data:
data = None
pre_upgrade = getattr(
self, "_pre_upgrade_%03d" % version, None)
if pre_upgrade:
data = pre_upgrade(engine)
self.migration_api.upgrade(engine, self.REPOSITORY, version)
self.assertEqual(version,
self.migration_api.db_version(engine,
self.REPOSITORY))
if with_data:
check = getattr(self, "_check_%03d" % version, None)
if check:
check(engine, data)
except Exception:
LOG.error("Failed to migrate to version %s on engine %s" %
(version, engine))
raise
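The _pre_upgrade_### and _check_### hooks named in the docstring are looked up on the test class itself. A hypothetical pair for migration 216 (table and column names invented):

def _pre_upgrade_216(self, engine):
    # Seed a row that migration 216 is expected to preserve.
    instances = utils.get_table(engine, 'instances')
    data = {'uuid': 'fake-uuid', 'hostname': 'fake-host'}
    engine.execute(instances.insert(), [data])
    return data

def _check_216(self, engine, data):
    # Verify the seeded row survived the schema change.
    instances = utils.get_table(engine, 'instances')
    row = instances.select(
        instances.c.uuid == data['uuid']).execute().first()
    self.assertEqual(data['hostname'], row['hostname'])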

View File

@ -16,6 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import re
from migrate.changeset import UniqueConstraint
@ -36,9 +37,7 @@ from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.types import NullType
from nova.openstack.common.gettextutils import _
from nova.openstack.common import log as logging
from nova.openstack.common import timeutils
@ -497,3 +496,52 @@ def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name,
        where(new_table.c.deleted == deleted).\
        values(deleted=default_deleted_value).\
        execute()
def get_connect_string(backend, database, user=None, passwd=None):
"""Get database connection
Try to get a connection with a very specific set of values, if we get
these then we'll run the tests, otherwise they are skipped
"""
args = {'backend': backend,
'user': user,
'passwd': passwd,
'database': database}
if backend == 'sqlite':
template = '%(backend)s:///%(database)s'
else:
template = "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s"
return template % args
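For illustration, the two templates produce strings like (values invented):

get_connect_string('sqlite', 'test.db')
# -> 'sqlite:///test.db'
get_connect_string('mysql', 'openstack_citest',
                   user='openstack_citest', passwd='openstack_citest')
# -> 'mysql://openstack_citest:openstack_citest@localhost/openstack_citest'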
def is_backend_avail(backend, database, user=None, passwd=None):
try:
connect_uri = get_connect_string(backend=backend,
database=database,
user=user,
passwd=passwd)
engine = sqlalchemy.create_engine(connect_uri)
connection = engine.connect()
except Exception:
# intentionally catch all to handle exceptions even if we don't
# have any backend code loaded.
return False
else:
connection.close()
engine.dispose()
return True
def get_db_connection_info(conn_pieces):
database = conn_pieces.path.strip('/')
loc_pieces = conn_pieces.netloc.split('@')
host = loc_pieces[1]
auth_pieces = loc_pieces[0].split(':')
user = auth_pieces[0]
password = ""
if len(auth_pieces) > 1:
password = auth_pieces[1].strip()
return (user, password, database, host)
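And taking a URL apart with the helper above (URL hypothetical; the stdlib parser via six stands in for the py3kcompat wrapper used elsewhere):

from six.moves.urllib import parse

pieces = parse.urlparse(
    'mysql://openstack_citest:secret@db.example.org/openstack_citest')
print(get_db_connection_info(pieces))
# ('openstack_citest', 'secret', 'openstack_citest', 'db.example.org')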

View File

@ -4081,14 +4081,9 @@ class VolumeUsageDBApiTestCase(test.TestCase):
    def test_vol_usage_update_no_totals_update(self):
        ctxt = context.get_admin_context()
        now = timeutils.utcnow()
        timeutils.set_time_override(now)
        start_time = now - datetime.timedelta(seconds=10)
        self.mox.StubOutWithMock(timeutils, 'utcnow')
        timeutils.utcnow().AndReturn(now)
        timeutils.utcnow().AndReturn(now)
        timeutils.utcnow().AndReturn(now)
        self.mox.ReplayAll()
        expected_vol_usages = {
            u'1': {'volume_id': u'1',
                   'instance_uuid': 'fake-instance-uuid1',
@ -4154,17 +4149,11 @@ class VolumeUsageDBApiTestCase(test.TestCase):
        ctxt = context.get_admin_context()
        now = datetime.datetime(1, 1, 1, 1, 0, 0)
        start_time = now - datetime.timedelta(seconds=10)
        self.mox.StubOutWithMock(timeutils, 'utcnow')
        timeutils.utcnow().AndReturn(now)
        now1 = now + datetime.timedelta(minutes=1)
        timeutils.utcnow().AndReturn(now1)
        now2 = now + datetime.timedelta(minutes=2)
        timeutils.utcnow().AndReturn(now2)
        now3 = now + datetime.timedelta(minutes=3)
        timeutils.utcnow().AndReturn(now3)
        self.mox.ReplayAll()
        timeutils.set_time_override(now)
        db.vol_usage_update(ctxt, u'1', rd_req=100, rd_bytes=200,
                            wr_req=300, wr_bytes=400,
                            instance_id='fake-instance-uuid',
@ -4175,6 +4164,7 @@ class VolumeUsageDBApiTestCase(test.TestCase):
        self.assertEqual(current_usage['tot_reads'], 0)
        self.assertEqual(current_usage['curr_reads'], 100)
        timeutils.set_time_override(now1)
        db.vol_usage_update(ctxt, u'1', rd_req=200, rd_bytes=300,
                            wr_req=400, wr_bytes=500,
                            instance_id='fake-instance-uuid',
@ -4186,6 +4176,7 @@ class VolumeUsageDBApiTestCase(test.TestCase):
        self.assertEqual(current_usage['tot_reads'], 200)
        self.assertEqual(current_usage['curr_reads'], 0)
        timeutils.set_time_override(now2)
        db.vol_usage_update(ctxt, u'1', rd_req=300, rd_bytes=400,
                            wr_req=500, wr_bytes=600,
                            instance_id='fake-instance-uuid',
@ -4196,6 +4187,7 @@ class VolumeUsageDBApiTestCase(test.TestCase):
        self.assertEqual(current_usage['tot_reads'], 200)
        self.assertEqual(current_usage['curr_reads'], 300)
        timeutils.set_time_override(now3)
        db.vol_usage_update(ctxt, u'1', rd_req=400, rd_bytes=500,
                            wr_req=600, wr_bytes=700,
                            instance_id='fake-instance-uuid',