Update openstack.common.db

This updates the openstack.common.db part of Oslo so it stops using
openstack.common.exception. The openstack.common.exception is now
deprecated and has been removed from oslo-incubator.

Change-Id: Ic6ef0e698e5060614859bc3894eb71524c8fce21
This commit is contained in:
Julien Danjou 2013-10-16 11:02:16 +02:00
parent 4603a7ab5d
commit a05dc4b9cd
5 changed files with 159 additions and 17 deletions

View File

@ -43,3 +43,9 @@ class DBDeadlock(DBError):
class DBInvalidUnicodeParameter(Exception):
message = _("Invalid Parameter: "
"Unicode is not supported by the current database.")
class DbMigrationError(DBError):
"""Wraps migration specific exception."""
def __init__(self, message=None):
super(DbMigrationError, self).__init__(str(message))

View File

@ -38,12 +38,53 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
import distutils.version as dist_version
import os
import re
import migrate
from migrate.changeset import ansisql
from migrate.changeset.databases import sqlite
from migrate.versioning import util as migrate_util
import sqlalchemy
from sqlalchemy.schema import UniqueConstraint
from heat.openstack.common.db import exception
from heat.openstack.common.db.sqlalchemy import session as db_session
from heat.openstack.common.gettextutils import _ # noqa
@migrate_util.decorator
def patched_with_engine(f, *a, **kw):
url = a[0]
engine = migrate_util.construct_engine(url, **kw)
try:
kw['engine'] = engine
return f(*a, **kw)
finally:
if isinstance(engine, migrate_util.Engine) and engine is not url:
migrate_util.log.debug('Disposing SQLAlchemy engine %s', engine)
engine.dispose()
# TODO(jkoelker) When migrate 0.7.3 is released and nova depends
# on that version or higher, this can be removed
MIN_PKG_VERSION = dist_version.StrictVersion('0.7.3')
if (not hasattr(migrate, '__version__') or
dist_version.StrictVersion(migrate.__version__) < MIN_PKG_VERSION):
migrate_util.with_engine = patched_with_engine
# NOTE(jkoelker) Delay importing migrate until we are patched
from migrate import exceptions as versioning_exceptions
from migrate.versioning import api as versioning_api
from migrate.versioning.repository import Repository
_REPOSITORY = None
get_engine = db_session.get_engine
def _get_unique_constraints(self, table):
"""Retrieve information about existing unique constraints of the table
@ -157,3 +198,81 @@ def patch_migrate():
_visit_migrate_unique_constraint
constraint_cls.__bases__ = (ansisql.ANSIColumnDropper,
sqlite.SQLiteConstraintGenerator)
def db_sync(abs_path, version=None, init_version=0):
"""Upgrade or downgrade a database.
Function runs the upgrade() or downgrade() functions in change scripts.
:param abs_path: Absolute path to migrate repository.
:param version: Database will upgrade/downgrade until this version.
If None - database will update to the latest
available version.
:param init_version: Initial database version
"""
if version is not None:
try:
version = int(version)
except ValueError:
raise exception.DbMigrationError(
message=_("version should be an integer"))
current_version = db_version(abs_path, init_version)
repository = _find_migrate_repo(abs_path)
if version is None or version > current_version:
return versioning_api.upgrade(get_engine(), repository, version)
else:
return versioning_api.downgrade(get_engine(), repository,
version)
def db_version(abs_path, init_version):
"""Show the current version of the repository.
:param abs_path: Absolute path to migrate repository
:param version: Initial database version
"""
repository = _find_migrate_repo(abs_path)
try:
return versioning_api.db_version(get_engine(), repository)
except versioning_exceptions.DatabaseNotControlledError:
meta = sqlalchemy.MetaData()
engine = get_engine()
meta.reflect(bind=engine)
tables = meta.tables
if len(tables) == 0:
db_version_control(abs_path, init_version)
return versioning_api.db_version(get_engine(), repository)
else:
# Some pre-Essex DB's may not be version controlled.
# Require them to upgrade using Essex first.
raise exception.DbMigrationError(
message=_("Upgrade DB using Essex release first."))
def db_version_control(abs_path, version=None):
"""Mark a database as under this repository's version control.
Once a database is under version control, schema changes should
only be done via change scripts in this repository.
:param abs_path: Absolute path to migrate repository
:param version: Initial database version
"""
repository = _find_migrate_repo(abs_path)
versioning_api.version_control(get_engine(), repository, version)
return version
def _find_migrate_repo(abs_path):
"""Get the project's change script repository
:param abs_path: Absolute path to migrate repository
"""
global _REPOSITORY
if not os.path.exists(abs_path):
raise exception.DbMigrationError("Path %s not found" % abs_path)
if _REPOSITORY is None:
_REPOSITORY = Repository(abs_path)
return _REPOSITORY

View File

@ -61,13 +61,15 @@ class ModelBase(object):
def get(self, key, default=None):
return getattr(self, key, default)
def _get_extra_keys(self):
return []
def __iter__(self):
columns = dict(object_mapper(self).columns).keys()
# NOTE(russellb): Allow models to specify other keys that can be looked
# up, beyond the actual db columns. An example would be the 'name'
# property for an Instance.
if hasattr(self, '_extra_keys'):
columns.extend(self._extra_keys())
columns.extend(self._get_extra_keys())
self._i = iter(columns)
return self

View File

@ -241,11 +241,11 @@ Efficient use of soft deletes:
# This will produce count(bar_refs) db requests.
"""
import functools
import os.path
import re
import time
from eventlet import greenthread
from oslo.config import cfg
import six
from sqlalchemy import exc as sqla_exc
@ -279,13 +279,13 @@ database_opts = [
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_connection',
group='DATABASE')],
secret=True),
group='DATABASE'),
cfg.DeprecatedOpt('connection',
group='sql'), ]),
cfg.StrOpt('slave_connection',
default='',
help='The SQLAlchemy connection string used to connect to the '
'slave database',
secret=True),
'slave database'),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
@ -527,6 +527,7 @@ def _raise_if_deadlock_error(operational_error, engine_name):
def _wrap_db_error(f):
@functools.wraps(f)
def _wrap(*args, **kwargs):
try:
return f(*args, **kwargs)
@ -551,7 +552,6 @@ def _wrap_db_error(f):
except Exception as e:
LOG.exception(_('DB exception wrapped.'))
raise exception.DBError(e)
_wrap.func_name = f.func_name
return _wrap
@ -591,14 +591,16 @@ def _add_regexp_listener(dbapi_con, con_record):
dbapi_con.create_function('regexp', 2, regexp)
def _greenthread_yield(dbapi_con, con_record):
def _thread_yield(dbapi_con, con_record):
"""Ensure other greenthreads get a chance to be executed.
If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
execute instead of time.sleep(0).
Force a context switch. With common database backends (eg MySQLdb and
sqlite), there is no implicit yield caused by network I/O since they are
implemented by C libraries that eventlet cannot monkey patch.
"""
greenthread.sleep(0)
time.sleep(0)
def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
@ -667,7 +669,7 @@ def create_engine(sql_connection, sqlite_fk=False):
engine = sqlalchemy.create_engine(sql_connection, **engine_args)
sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield)
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(engine, 'checkout', _ping_listener)

View File

@ -18,6 +18,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
from migrate.changeset import UniqueConstraint
import sqlalchemy
from sqlalchemy import Boolean
@ -38,13 +40,21 @@ from sqlalchemy.types import NullType
from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import exception
from heat.openstack.common import log as logging
from heat.openstack.common import timeutils
LOG = logging.getLogger(__name__)
_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+")
def sanitize_db_url(url):
match = _DBURL_REGEX.match(url)
if match:
return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):])
return url
class InvalidSortKey(Exception):
message = _("Sort key supplied was not valid.")
@ -175,6 +185,10 @@ def visit_insert_from_select(element, compiler, **kw):
compiler.process(element.select))
class ColumnError(Exception):
"""Error raised when no column or an invalid column is found."""
def _get_not_supported_column(col_name_col_instance, column_name):
try:
column = col_name_col_instance[column_name]
@ -182,13 +196,13 @@ def _get_not_supported_column(col_name_col_instance, column_name):
msg = _("Please specify column %s in col_name_col_instance "
"param. It is required because column has unsupported "
"type by sqlite).")
raise exception.OpenstackException(message=msg % column_name)
raise ColumnError(msg % column_name)
if not isinstance(column, Column):
msg = _("col_name_col_instance param has wrong type of "
"column instance for column %s It should be instance "
"of sqlalchemy.Column.")
raise exception.OpenstackException(message=msg % column_name)
raise ColumnError(msg % column_name)
return column
@ -286,8 +300,7 @@ def _get_default_deleted_value(table):
return 0
if isinstance(table.c.id.type, String):
return ""
raise exception.OpenstackException(
message=_("Unsupported id columns type"))
raise ColumnError(_("Unsupported id columns type"))
def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes):
@ -357,7 +370,7 @@ def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
constraints = [constraint.copy() for constraint in table.constraints]
meta = MetaData(bind=migrate_engine)
meta = table.metadata
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()