Merge "Update oslo db"

Jenkins 2014-01-29 10:28:28 +00:00 committed by Gerrit Code Review
commit 8edfb48d34
11 changed files with 164 additions and 155 deletions

View File

@@ -755,11 +755,6 @@
# Deprecated group/name - [DEFAULT]/db_backend
#backend=sqlalchemy
# Enable the experimental use of thread pooling for all DB API
# calls (boolean value)
# Deprecated group/name - [DEFAULT]/dbapi_use_tpool
#use_tpool=false
#
# Options defined in heat.openstack.common.db.sqlalchemy.session
@@ -780,6 +775,7 @@
# value)
# Deprecated group/name - [DEFAULT]/sql_idle_timeout
# Deprecated group/name - [DATABASE]/sql_idle_timeout
# Deprecated group/name - [sql]/idle_timeout
#idle_timeout=3600
# Minimum number of SQL connections to keep open in a pool

View File

@@ -1,14 +0,0 @@
# Copyright 2012 Cloudscaling Group, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -19,27 +19,15 @@ Supported configuration options:
The following two parameters are in the 'database' group:
`backend`: DB backend name or full module path to DB backend module.
`use_tpool`: Enable thread pooling of DB API calls.
A DB backend module should implement a method named 'get_backend' which
takes no arguments. The method can return any object that implements DB
API methods.
*NOTE*: There are bugs in eventlet when using tpool combined with
threading locks. The python logging module happens to use such locks. To
work around this issue, be sure to specify thread=False with
eventlet.monkey_patch().
A bug for eventlet has been filed here:
https://bitbucket.org/eventlet/eventlet/issue/137/
"""
import functools
from oslo.config import cfg
from heat.openstack.common import importutils
from heat.openstack.common import lockutils
db_opts = [
@@ -48,12 +36,6 @@ db_opts = [
deprecated_name='db_backend',
deprecated_group='DEFAULT',
help='The backend to use for db'),
cfg.BoolOpt('use_tpool',
default=False,
deprecated_name='dbapi_use_tpool',
deprecated_group='DEFAULT',
help='Enable the experimental use of thread pooling for '
'all DB API calls')
]
CONF = cfg.CONF
@@ -64,41 +46,12 @@ class DBAPI(object):
def __init__(self, backend_mapping=None):
if backend_mapping is None:
backend_mapping = {}
self.__backend = None
self.__backend_mapping = backend_mapping
@lockutils.synchronized('dbapi_backend', 'heat-')
def __get_backend(self):
"""Get the actual backend. May be a module or an instance of
a class. Doesn't matter to us. We do this synchronized as it's
possible multiple greenthreads started very quickly trying to do
DB calls and eventlet can switch threads before self.__backend gets
assigned.
"""
if self.__backend:
# Another thread assigned it
return self.__backend
backend_name = CONF.database.backend
self.__use_tpool = CONF.database.use_tpool
if self.__use_tpool:
from eventlet import tpool
self.__tpool = tpool
# Import the untranslated name if we don't have a
# mapping.
backend_path = self.__backend_mapping.get(backend_name,
backend_name)
backend_path = backend_mapping.get(backend_name, backend_name)
backend_mod = importutils.import_module(backend_path)
self.__backend = backend_mod.get_backend()
return self.__backend
def __getattr__(self, key):
backend = self.__backend or self.__get_backend()
attr = getattr(backend, key)
if not self.__use_tpool or not hasattr(attr, '__call__'):
return attr
def tpool_wrapper(*args, **kwargs):
return self.__tpool.execute(attr, *args, **kwargs)
functools.update_wrapper(tpool_wrapper, attr)
return tpool_wrapper
return getattr(self.__backend, key)
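As a hedged aside: the docstring above requires only that a backend module expose a no-argument get_backend(). A minimal sketch follows; the module path, function name, and mapping are hypothetical, not part of this commit:

import sys

# hypothetical backend module, e.g. myservice/db/sqlalchemy/api.py
def get_foo(foo_id):
    # a real backend would run a database query here
    return {'id': foo_id}

def get_backend():
    # Any object exposing the DB API methods will do; returning the
    # module itself is a common pattern.
    return sys.modules[__name__]

DBAPI would then resolve it via a mapping such as DBAPI(backend_mapping={'sqlalchemy': 'myservice.db.sqlalchemy.api'}), with CONF.database.backend left at its default.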

View File

@@ -16,7 +16,7 @@
"""DB related custom exceptions."""
from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common.gettextutils import _
class DBError(Exception):

View File

@@ -1,14 +0,0 @@
# Copyright 2012 Cloudscaling Group, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -82,7 +82,6 @@ from migrate import exceptions as versioning_exceptions
from migrate.versioning import api as versioning_api
from migrate.versioning.repository import Repository
_REPOSITORY = None
get_engine = db_session.get_engine
@@ -221,6 +220,7 @@ def db_sync(abs_path, version=None, init_version=0):
current_version = db_version(abs_path, init_version)
repository = _find_migrate_repo(abs_path)
_db_schema_sanity_check()
if version is None or version > current_version:
return versioning_api.upgrade(get_engine(), repository, version)
else:
@@ -228,6 +228,22 @@
version)
def _db_schema_sanity_check():
engine = get_engine()
if engine.name == 'mysql':
onlyutf8_sql = ('SELECT TABLE_NAME,TABLE_COLLATION '
'from information_schema.TABLES '
'where TABLE_SCHEMA=%s and '
'TABLE_COLLATION NOT LIKE "%%utf8%%"')
table_names = [res[0] for res in engine.execute(onlyutf8_sql,
engine.url.database)]
if len(table_names) > 0:
raise ValueError(_('Tables "%s" have non utf8 collation, '
'please make sure all tables are CHARSET=utf8'
) % ','.join(table_names))
def db_version(abs_path, init_version):
"""Show the current version of the repository.
@@ -246,10 +262,11 @@ def db_version(abs_path, init_version):
db_version_control(abs_path, init_version)
return versioning_api.db_version(get_engine(), repository)
else:
# Some pre-Essex DB's may not be version controlled.
# Require them to upgrade using Essex first.
raise exception.DbMigrationError(
message=_("Upgrade DB using Essex release first."))
message=_(
"The database is not under version control, but has "
"tables. Please stamp the current version of the schema "
"manually."))
def db_version_control(abs_path, version=None):
@@ -271,9 +288,6 @@ def _find_migrate_repo(abs_path):
:param abs_path: Absolute path to migrate repository
"""
global _REPOSITORY
if not os.path.exists(abs_path):
raise exception.DbMigrationError("Path %s not found" % abs_path)
if _REPOSITORY is None:
_REPOSITORY = Repository(abs_path)
return _REPOSITORY
return Repository(abs_path)
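A hedged usage sketch of the helpers above; the repository path is hypothetical:

from heat.openstack.common.db.sqlalchemy import migration

repo = '/path/to/heat/db/sqlalchemy/migrate_repo'  # hypothetical

# db_sync() now calls _db_schema_sanity_check() first, so on MySQL any
# table with a non-utf8 collation raises ValueError before migrating.
migration.db_sync(repo)

# Report the current schema version; init_version is the version the
# repository starts counting from.
current = migration.db_version(repo, init_version=0)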

View File

@@ -59,7 +59,16 @@ class ModelBase(object):
def get(self, key, default=None):
return getattr(self, key, default)
def _get_extra_keys(self):
@property
def _extra_keys(self):
"""Specifies custom fields
Subclasses can override this property to return a list
of custom fields that should be included in their dict
representation.
For reference check tests/db/sqlalchemy/test_models.py
"""
return []
def __iter__(self):
@@ -67,7 +76,7 @@ class ModelBase(object):
# NOTE(russellb): Allow models to specify other keys that can be looked
# up, beyond the actual db columns. An example would be the 'name'
# property for an Instance.
columns.extend(self._get_extra_keys())
columns.extend(self._extra_keys)
self._i = iter(columns)
return self
@@ -89,12 +98,12 @@ class ModelBase(object):
joined = dict([(k, v) for k, v in six.iteritems(self.__dict__)
if not k[0] == '_'])
local.update(joined)
return local.iteritems()
return six.iteritems(local)
class TimestampMixin(object):
created_at = Column(DateTime, default=timeutils.utcnow)
updated_at = Column(DateTime, onupdate=timeutils.utcnow)
created_at = Column(DateTime, default=lambda: timeutils.utcnow())
updated_at = Column(DateTime, onupdate=lambda: timeutils.utcnow())
class SoftDeleteMixin(object):
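A hedged sketch of how a model would use the new _extra_keys property consumed by __iter__ above; the class and attribute names are hypothetical, and a real model would also inherit a declarative base:

class Instance(ModelBase):
    @property
    def name(self):
        # computed attribute, not a db column
        return 'instance-%s' % getattr(self, 'uuid', 'unknown')

    @property
    def _extra_keys(self):
        # picked up by __iter__ above, so dict(instance) includes 'name'
        return ['name']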

View File

@@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Mirantis.inc
# All Rights Reserved.
#
@@ -22,6 +20,7 @@ import os
import random
import string
from six import moves
import sqlalchemy
from heat.openstack.common.db import exception as exc
@@ -34,7 +33,8 @@ def _gen_credentials(*names):
"""Generate credentials."""
auth_dict = {}
for name in names:
val = ''.join(random.choice(string.lowercase) for i in xrange(10))
val = ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10))
auth_dict[name] = val
return auth_dict
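For clarity, a hedged usage note: each requested name maps to ten random lowercase characters, now generated portably on Python 2 and 3:

creds = _gen_credentials('user', 'passwd', 'database')
# e.g. {'user': 'kqzowxhjbd', 'passwd': '...', 'database': '...'}
# (values shown are illustrative; they are random on every call)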

View File

@@ -21,7 +21,7 @@ Initializing:
* Call set_defaults with at least the following kwargs:
sql_connection, sqlite_db
Example:
Example::
session.set_defaults(
sql_connection="sqlite:///var/lib/heat/sqlite.db",
@@ -42,17 +42,17 @@ Recommended ways to use sessions within this framework:
functionality should be handled at a logical level. For an example, look at
the code around quotas and reservation_rollback().
Examples:
Examples::
def get_foo(context, foo):
return model_query(context, models.Foo).\
filter_by(foo=foo).\
first()
return (model_query(context, models.Foo).
filter_by(foo=foo).
first())
def update_foo(context, id, newfoo):
model_query(context, models.Foo).\
filter_by(id=id).\
update({'foo': newfoo})
(model_query(context, models.Foo).
filter_by(id=id).
update({'foo': newfoo}))
def create_foo(context, values):
foo_ref = models.Foo()
@@ -66,14 +66,21 @@ Recommended ways to use sessions within this framework:
handler will take care of calling flush() and commit() for you.
If using this approach, you should not explicitly call flush() or commit().
Any error within the context of the session will cause the session to emit
a ROLLBACK. If the connection is dropped before this is possible, the
database will implicitly rollback the transaction.
a ROLLBACK. Database errors like IntegrityError will be raised in the
session's __exit__ handler, so a try/except inside the context managed by
the session will not catch them. Conversely, catching a non-database error
inside the session prevents the ROLLBACK, so exception handlers should
always sit outside the session, unless the developer wants to do a partial
commit on purpose. If the connection is dropped before this is possible,
the database will implicitly roll back the transaction.
Note: statements in the session scope will not be automatically retried.
If you create models within the session, they need to be added, but you
do not need to call model.save()
::
def create_many_foo(context, foos):
session = get_session()
with session.begin():
@@ -85,33 +92,50 @@ Recommended ways to use sessions within this framework:
def update_bar(context, foo_id, newbar):
session = get_session()
with session.begin():
foo_ref = model_query(context, models.Foo, session).\
filter_by(id=foo_id).\
first()
model_query(context, models.Bar, session).\
filter_by(id=foo_ref['bar_id']).\
update({'bar': newbar})
foo_ref = (model_query(context, models.Foo, session).
filter_by(id=foo_id).
first())
(model_query(context, models.Bar, session).
filter_by(id=foo_ref['bar_id']).
update({'bar': newbar}))
Note: update_bar is a trivially simple example of using "with session.begin".
While create_many_foo is a good example of when a transaction is needed,
it is always best to use as few queries as possible. The two queries in
update_bar can be better expressed using a single query which avoids
the need for an explicit transaction. It can be expressed like so:
the need for an explicit transaction. It can be expressed like so::
def update_bar(context, foo_id, newbar):
subq = model_query(context, models.Foo.id).\
filter_by(id=foo_id).\
limit(1).\
subquery()
model_query(context, models.Bar).\
filter_by(id=subq.as_scalar()).\
update({'bar': newbar})
subq = (model_query(context, models.Foo.id).
filter_by(id=foo_id).
limit(1).
subquery())
(model_query(context, models.Bar).
filter_by(id=subq.as_scalar()).
update({'bar': newbar}))
For reference, this emits approximately the following SQL statement:
For reference, this emits approximately the following SQL statement::
UPDATE bar SET bar = ${newbar}
WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
Note: create_duplicate_foo is a trivially simple example of catching an
exception while using "with session.begin". Here we create two duplicate
instances with the same primary key; the exception must be caught outside
of the context managed by a single session::
def create_duplicate_foo(context):
foo1 = models.Foo()
foo2 = models.Foo()
foo1.id = foo2.id = 1
session = get_session()
try:
with session.begin():
session.add(foo1)
session.add(foo2)
except exception.DBDuplicateEntry as e:
handle_error(e)
* Passing an active session between methods. Sessions should only be passed
to private methods. The private method must use a subtransaction; otherwise
SQLAlchemy will throw an error when you call session.begin() on an existing
@@ -127,6 +151,8 @@ Recommended ways to use sessions within this framework:
becomes less clear in this situation. When this is needed for code clarity,
it should be clearly documented.
::
def myfunc(foo):
session = get_session()
with session.begin():
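As a hedged sketch of the subtransaction rule described above (model and helper names hypothetical, in the style of the docstring's other examples):

def create_foo_with_bar(context):
    session = get_session()
    with session.begin():
        foo_ref = models.Foo()
        session.add(foo_ref)
        session.flush()  # assigns foo_ref.id before the helper needs it
        _create_bar(foo_ref.id, session=session)

def _create_bar(foo_id, session):
    # Private helper receiving the active session: it must nest via
    # subtransactions=True, or session.begin() raises because a
    # transaction is already in progress.
    with session.begin(subtransactions=True):
        bar_ref = models.Bar()
        bar_ref.foo_id = foo_id
        session.add(bar_ref)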
@@ -171,7 +197,7 @@ There are some things which it is best to avoid:
Enabling soft deletes:
* To use/enable soft-deletes, the SoftDeleteMixin must be added
to your model class. For example:
to your model class. For example::
class NovaBase(models.SoftDeleteMixin, models.ModelBase):
pass
@@ -179,14 +205,15 @@ Enabling soft deletes:
Efficient use of soft deletes:
* There are two possible ways to mark a record as deleted:
* There are two possible ways to mark a record as deleted::
model.soft_delete() and query.soft_delete().
The model.soft_delete() method works with a single, already-fetched entry.
query.soft_delete() makes only one db request for all entries that
correspond to the query.
* In almost all cases you should use query.soft_delete(). Some examples:
* In almost all cases you should use query.soft_delete(). Some examples::
def soft_delete_bar():
count = model_query(BarModel).find(some_condition).soft_delete()
@@ -197,9 +224,9 @@ Efficient use of soft deletes:
if session is None:
session = get_session()
with session.begin(subtransactions=True):
count = model_query(BarModel).\
find(some_condition).\
soft_delete(synchronize_session=True)
count = (model_query(BarModel).
find(some_condition).
soft_delete(synchronize_session=True))
# Here synchronize_session is required, because we
# don't know what is going on in the outer session.
if count == 0:
@@ -209,6 +236,8 @@ Efficient use of soft deletes:
you fetch a single record, work with it, and mark it as deleted in the same
transaction.
::
def soft_delete_bar_model():
session = get_session()
with session.begin():
@@ -217,13 +246,13 @@ Efficient use of soft deletes:
bar_ref.soft_delete(session=session)
However, if you need to work with all entries that correspond to the query and
then soft delete them you should use query.soft_delete() method:
then soft delete them, you should use the query.soft_delete() method::
def soft_delete_multi_models():
session = get_session()
with session.begin():
query = model_query(BarModel, session=session).\
find(some_condition)
query = (model_query(BarModel, session=session).
find(some_condition))
model_refs = query.all()
# Work with model_refs
query.soft_delete(synchronize_session=False)
@@ -234,6 +263,8 @@ Efficient use of soft deletes:
which issues a single query. Using model.soft_delete(), as in the following
example, is very inefficient.
::
for bar_ref in bar_refs:
bar_ref.soft_delete(session=session)
# This will produce count(bar_refs) db requests.
@@ -247,14 +278,13 @@ import time
from oslo.config import cfg
import six
from sqlalchemy import exc as sqla_exc
import sqlalchemy.interfaces
from sqlalchemy.interfaces import PoolListener
import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column
from heat.openstack.common.db import exception
from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common.gettextutils import _
from heat.openstack.common import log as logging
from heat.openstack.common import timeutils
@@ -274,6 +304,7 @@ database_opts = [
'../', '$sqlite_db')),
help='The SQLAlchemy connection string used to connect to the '
'database',
secret=True,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_connection',
@@ -282,6 +313,7 @@ database_opts = [
group='sql'), ]),
cfg.StrOpt('slave_connection',
default='',
secret=True,
help='The SQLAlchemy connection string used to connect to the '
'slave database'),
cfg.IntOpt('idle_timeout',
@@ -289,7 +321,9 @@ database_opts = [
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_idle_timeout',
group='DATABASE')],
group='DATABASE'),
cfg.DeprecatedOpt('idle_timeout',
group='sql')],
help='timeout before idle sql connections are reaped'),
cfg.IntOpt('min_pool_size',
default=1,
@@ -407,8 +441,8 @@ class SqliteForeignKeysListener(PoolListener):
dbapi_con.execute('pragma foreign_keys=ON')
def get_session(autocommit=True, expire_on_commit=False,
sqlite_fk=False, slave_session=False):
def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False,
slave_session=False, mysql_traditional_mode=False):
"""Return a SQLAlchemy session."""
global _MAKER
global _SLAVE_MAKER
@@ -418,7 +452,8 @@ def get_session(autocommit=True, expire_on_commit=False,
maker = _SLAVE_MAKER
if maker is None:
engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session)
engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session,
mysql_traditional_mode=mysql_traditional_mode)
maker = get_maker(engine, autocommit, expire_on_commit)
if slave_session:
@@ -437,6 +472,11 @@ def get_session(autocommit=True, expire_on_commit=False,
# 1 column - (IntegrityError) column c1 is not unique
# N columns - (IntegrityError) column c1, c2, ..., N are not unique
#
# sqlite since 3.7.16:
# 1 column - (IntegrityError) UNIQUE constraint failed: k1
#
# N columns - (IntegrityError) UNIQUE constraint failed: k1, k2
#
# postgres:
# 1 column - (IntegrityError) duplicate key value violates unique
# constraint "users_c1_key"
@@ -449,9 +489,10 @@ def get_session(autocommit=True, expire_on_commit=False,
# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined
# with -' for key 'name_of_our_constraint'")
_DUP_KEY_RE_DB = {
"sqlite": re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
"postgresql": re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),
"mysql": re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$")
"sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),
"postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),
"mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),)
}
@@ -481,10 +522,14 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name):
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
m = _DUP_KEY_RE_DB[engine_name].match(integrity_error.message)
if not m:
for pattern in _DUP_KEY_RE_DB[engine_name]:
match = pattern.match(integrity_error.message)
if match:
break
else:
return
columns = m.group(1)
columns = match.group(1)
if engine_name == "sqlite":
columns = columns.strip().split(", ")
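As an illustration of what the loop above now tolerates (messages paraphrased from the comment block earlier in this file, not captured output):

# pre-3.7.16 sqlite:  "column name is not unique"
#   -> the first pattern matches; group(1) is roughly " name "
# sqlite 3.7.16+:     "UNIQUE constraint failed: foo.name"
#   -> the second pattern matches instead; the for/else "else" branch
#      only runs (returning early) when no pattern matched at all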
@@ -553,7 +598,8 @@ def _wrap_db_error(f):
return _wrap
def get_engine(sqlite_fk=False, slave_engine=False):
def get_engine(sqlite_fk=False, slave_engine=False,
mysql_traditional_mode=False):
"""Return a SQLAlchemy engine."""
global _ENGINE
global _SLAVE_ENGINE
@@ -565,8 +611,8 @@ def get_engine(sqlite_fk=False, slave_engine=False):
db_uri = CONF.database.slave_connection
if engine is None:
engine = create_engine(db_uri,
sqlite_fk=sqlite_fk)
engine = create_engine(db_uri, sqlite_fk=sqlite_fk,
mysql_traditional_mode=mysql_traditional_mode)
if slave_engine:
_SLAVE_ENGINE = engine
else:
@@ -623,6 +669,17 @@ def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
raise
def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy):
"""Set engine mode to 'traditional'.
Required to prevent silent truncation during insert or update operations
under MySQL. By default MySQL truncates an inserted string if it is longer
than the declared column and raises only a warning, which invites data
corruption.
"""
dbapi_con.cursor().execute("SET SESSION sql_mode = TRADITIONAL;")
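A hedged usage sketch of the new keyword added in this change; the model instance is hypothetical:

from heat.openstack.common.db.sqlalchemy import session as db_session

sess = db_session.get_session(mysql_traditional_mode=True)
with sess.begin():
    # On MySQL, an oversized string insert now raises an error instead
    # of being silently truncated to the declared column length.
    sess.add(foo_ref)  # hypothetical model instance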
def _is_db_connection_error(args):
"""Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended
@@ -635,7 +692,8 @@ def _is_db_connection_error(args):
return False
def create_engine(sql_connection, sqlite_fk=False):
def create_engine(sql_connection, sqlite_fk=False,
mysql_traditional_mode=False):
"""Return a new SQLAlchemy engine."""
# NOTE(geekinutah): At this point we could be connecting to the normal
# db handle or the slave db handle. Things like
@@ -679,6 +737,13 @@ def create_engine(sql_connection, sqlite_fk=False):
if engine.name in ['mysql', 'ibm_db_sa']:
callback = functools.partial(_ping_listener, engine)
sqlalchemy.event.listen(engine, 'checkout', callback)
if mysql_traditional_mode:
sqlalchemy.event.listen(engine, 'checkout', _set_mode_traditional)
else:
LOG.warning(_("This application has not enabled MySQL traditional"
" mode, which means silent data corruption may"
" occur. Please encourage the application"
" developers to enable this mode."))
elif 'sqlite' in connection_dict.drivername:
if not CONF.sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect',

View File

@@ -14,12 +14,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import ConfigParser
import functools
import os
import subprocess
import lockfile
from six import moves
import sqlalchemy
import sqlalchemy.exc
@@ -130,13 +130,13 @@ class BaseMigrationTestCase(test.BaseTestCase):
# once. No need to re-run this on each test...
LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH)
if os.path.exists(self.CONFIG_FILE_PATH):
cp = ConfigParser.RawConfigParser()
cp = moves.configparser.RawConfigParser()
try:
cp.read(self.CONFIG_FILE_PATH)
defaults = cp.defaults()
for key, value in defaults.items():
self.test_databases[key] = value
except ConfigParser.ParsingError as e:
except moves.configparser.ParsingError as e:
self.fail("Failed to read test_migrations.conf config "
"file. Got error: %s" % e)
else:
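For context, a hedged sketch of the test_migrations.conf file the code above parses; the connection strings are placeholders, and every key in [DEFAULT] becomes an entry in self.test_databases:

[DEFAULT]
sqlite = sqlite://
mysql = mysql+mysqldb://user:secret@localhost/test_migrations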

View File

@@ -36,7 +36,7 @@ from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.types import NullType
from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common.gettextutils import _
from heat.openstack.common import log as logging
from heat.openstack.common import timeutils
@@ -133,9 +133,9 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
# Build up an array of sort criteria as in the docstring
criteria_list = []
for i in range(0, len(sort_keys)):
for i in range(len(sort_keys)):
crit_attrs = []
for j in range(0, i):
for j in range(i):
model_attr = getattr(model, sort_keys[j])
crit_attrs.append((model_attr == marker_values[j]))
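Finally, a hedged usage sketch of paginate_query based on the signature above; the model, session, and marker row are hypothetical:

from heat.openstack.common.db.sqlalchemy import utils

query = session.query(models.Foo)
page = utils.paginate_query(query, models.Foo, limit=20,
                            sort_keys=['created_at', 'id'],
                            marker=prev_page_last_row,
                            sort_dir='desc')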