diff --git a/neutron/openstack/common/db/__init__.py b/neutron/openstack/common/db/__init__.py index 1b9b60dec1..e69de29bb2 100644 --- a/neutron/openstack/common/db/__init__.py +++ b/neutron/openstack/common/db/__init__.py @@ -1,16 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012 Cloudscaling Group, Inc -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. diff --git a/neutron/openstack/common/db/api.py b/neutron/openstack/common/db/api.py index e39fdc9034..a0cec8562d 100644 --- a/neutron/openstack/common/db/api.py +++ b/neutron/openstack/common/db/api.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright (c) 2013 Rackspace Hosting # All Rights Reserved. # @@ -21,27 +19,21 @@ Supported configuration options: The following two parameters are in the 'database' group: `backend`: DB backend name or full module path to DB backend module. -`use_tpool`: Enable thread pooling of DB API calls. A DB backend module should implement a method named 'get_backend' which takes no arguments. The method can return any object that implements DB API methods. - -*NOTE*: There are bugs in eventlet when using tpool combined with -threading locks. The python logging module happens to use such locks. To -work around this issue, be sure to specify thread=False with -eventlet.monkey_patch(). - -A bug for eventlet has been filed here: - -https://bitbucket.org/eventlet/eventlet/issue/137/ """ + import functools +import logging +import time from oslo.config import cfg +from neutron.openstack.common.db import exception +from neutron.openstack.common.gettextutils import _ # noqa from neutron.openstack.common import importutils -from neutron.openstack.common import lockutils db_opts = [ @@ -50,57 +42,95 @@ db_opts = [ deprecated_name='db_backend', deprecated_group='DEFAULT', help='The backend to use for db'), - cfg.BoolOpt('use_tpool', + cfg.BoolOpt('use_db_reconnect', default=False, - deprecated_name='dbapi_use_tpool', - deprecated_group='DEFAULT', - help='Enable the experimental use of thread pooling for ' - 'all DB API calls') + help='Enable the experimental use of database reconnect ' + 'on connection lost'), + cfg.IntOpt('db_retry_interval', + default=1, + help='seconds between db connection retries'), + cfg.BoolOpt('db_inc_retry_interval', + default=True, + help='Whether to increase interval between db connection ' + 'retries, up to db_max_retry_interval'), + cfg.IntOpt('db_max_retry_interval', + default=10, + help='max seconds between db connection retries, if ' + 'db_inc_retry_interval is enabled'), + cfg.IntOpt('db_max_retries', + default=20, + help='maximum db connection retries before error is raised. ' + '(setting -1 implies an infinite retry count)'), ] CONF = cfg.CONF CONF.register_opts(db_opts, 'database') +LOG = logging.getLogger(__name__) + + +def safe_for_db_retry(f): + """Enable db-retry for decorated function, if config option enabled.""" + f.__dict__['enable_retry'] = True + return f + + +def _wrap_db_retry(f): + """Retry db.api methods, if DBConnectionError() raised + + Retry decorated db.api methods. If we enabled `use_db_reconnect` + in config, this decorator will be applied to all db.api functions, + marked with @safe_for_db_retry decorator. + Decorator catchs DBConnectionError() and retries function in a + loop until it succeeds, or until maximum retries count will be reached. + """ + @functools.wraps(f) + def wrapper(*args, **kwargs): + next_interval = CONF.database.db_retry_interval + remaining = CONF.database.db_max_retries + + while True: + try: + return f(*args, **kwargs) + except exception.DBConnectionError as e: + if remaining == 0: + LOG.exception(_('DB exceeded retry limit.')) + raise exception.DBError(e) + if remaining != -1: + remaining -= 1 + LOG.exception(_('DB connection error.')) + # NOTE(vsergeyev): We are using patched time module, so this + # effectively yields the execution context to + # another green thread. + time.sleep(next_interval) + if CONF.database.db_inc_retry_interval: + next_interval = min( + next_interval * 2, + CONF.database.db_max_retry_interval + ) + return wrapper + class DBAPI(object): def __init__(self, backend_mapping=None): if backend_mapping is None: backend_mapping = {} - self.__backend = None - self.__backend_mapping = backend_mapping - - @lockutils.synchronized('dbapi_backend', 'neutron-') - def __get_backend(self): - """Get the actual backend. May be a module or an instance of - a class. Doesn't matter to us. We do this synchronized as it's - possible multiple greenthreads started very quickly trying to do - DB calls and eventlet can switch threads before self.__backend gets - assigned. - """ - if self.__backend: - # Another thread assigned it - return self.__backend backend_name = CONF.database.backend - self.__use_tpool = CONF.database.use_tpool - if self.__use_tpool: - from eventlet import tpool - self.__tpool = tpool # Import the untranslated name if we don't have a # mapping. - backend_path = self.__backend_mapping.get(backend_name, - backend_name) + backend_path = backend_mapping.get(backend_name, backend_name) backend_mod = importutils.import_module(backend_path) self.__backend = backend_mod.get_backend() - return self.__backend def __getattr__(self, key): - backend = self.__backend or self.__get_backend() - attr = getattr(backend, key) - if not self.__use_tpool or not hasattr(attr, '__call__'): + attr = getattr(self.__backend, key) + + if not hasattr(attr, '__call__'): return attr + # NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry + # DB API methods, decorated with @safe_for_db_retry + # on disconnect. + if CONF.database.use_db_reconnect and hasattr(attr, 'enable_retry'): + attr = _wrap_db_retry(attr) - def tpool_wrapper(*args, **kwargs): - return self.__tpool.execute(attr, *args, **kwargs) - - functools.update_wrapper(tpool_wrapper, attr) - return tpool_wrapper + return attr diff --git a/neutron/openstack/common/db/exception.py b/neutron/openstack/common/db/exception.py index ed435777bc..ee92ccfa6a 100644 --- a/neutron/openstack/common/db/exception.py +++ b/neutron/openstack/common/db/exception.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -18,6 +16,8 @@ """DB related custom exceptions.""" +import six + from neutron.openstack.common.gettextutils import _ @@ -25,7 +25,7 @@ class DBError(Exception): """Wraps an implementation specific exception.""" def __init__(self, inner_exception=None): self.inner_exception = inner_exception - super(DBError, self).__init__(str(inner_exception)) + super(DBError, self).__init__(six.text_type(inner_exception)) class DBDuplicateEntry(DBError): @@ -43,3 +43,14 @@ class DBDeadlock(DBError): class DBInvalidUnicodeParameter(Exception): message = _("Invalid Parameter: " "Unicode is not supported by the current database.") + + +class DbMigrationError(DBError): + """Wraps migration specific exception.""" + def __init__(self, message=None): + super(DbMigrationError, self).__init__(message) + + +class DBConnectionError(DBError): + """Wraps connection specific exception.""" + pass diff --git a/neutron/openstack/common/db/sqlalchemy/__init__.py b/neutron/openstack/common/db/sqlalchemy/__init__.py index 1b9b60dec1..e69de29bb2 100644 --- a/neutron/openstack/common/db/sqlalchemy/__init__.py +++ b/neutron/openstack/common/db/sqlalchemy/__init__.py @@ -1,16 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012 Cloudscaling Group, Inc -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. diff --git a/neutron/openstack/common/db/sqlalchemy/models.py b/neutron/openstack/common/db/sqlalchemy/models.py index a72c4aa3e0..688f8c8874 100644 --- a/neutron/openstack/common/db/sqlalchemy/models.py +++ b/neutron/openstack/common/db/sqlalchemy/models.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright (c) 2011 X.commerce, a business unit of eBay Inc. # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. @@ -22,11 +20,13 @@ SQLAlchemy models. """ +import six + from sqlalchemy import Column, Integer from sqlalchemy import DateTime from sqlalchemy.orm import object_mapper -from neutron.openstack.common.db.sqlalchemy.session import get_session +from neutron.openstack.common.db.sqlalchemy import session as sa from neutron.openstack.common import timeutils @@ -37,15 +37,15 @@ class ModelBase(object): def save(self, session=None): """Save this object.""" if not session: - session = get_session() + session = sa.get_session() # NOTE(boris-42): This part of code should be look like: - # sesssion.add(self) + # session.add(self) # session.flush() # But there is a bug in sqlalchemy and eventlet that # raises NoneType exception if there is no running # transaction and rollback is called. As long as # sqlalchemy has this bug we have to create transaction - # explicity. + # explicitly. with session.begin(subtransactions=True): session.add(self) session.flush() @@ -59,23 +59,34 @@ class ModelBase(object): def get(self, key, default=None): return getattr(self, key, default) + @property + def _extra_keys(self): + """Specifies custom fields + + Subclasses can override this property to return a list + of custom fields that should be included in their dict + representation. + + For reference check tests/db/sqlalchemy/test_models.py + """ + return [] + def __iter__(self): columns = dict(object_mapper(self).columns).keys() # NOTE(russellb): Allow models to specify other keys that can be looked # up, beyond the actual db columns. An example would be the 'name' # property for an Instance. - if hasattr(self, '_extra_keys'): - columns.extend(self._extra_keys()) + columns.extend(self._extra_keys) self._i = iter(columns) return self def next(self): - n = self._i.next() + n = six.advance_iterator(self._i) return n, getattr(self, n) def update(self, values): """Make the model object behave like a dict.""" - for k, v in values.iteritems(): + for k, v in six.iteritems(values): setattr(self, k, v) def iteritems(self): @@ -84,15 +95,15 @@ class ModelBase(object): Includes attributes from joins. """ local = dict(self) - joined = dict([(k, v) for k, v in self.__dict__.iteritems() + joined = dict([(k, v) for k, v in six.iteritems(self.__dict__) if not k[0] == '_']) local.update(joined) - return local.iteritems() + return six.iteritems(local) class TimestampMixin(object): - created_at = Column(DateTime, default=timeutils.utcnow) - updated_at = Column(DateTime, onupdate=timeutils.utcnow) + created_at = Column(DateTime, default=lambda: timeutils.utcnow()) + updated_at = Column(DateTime, onupdate=lambda: timeutils.utcnow()) class SoftDeleteMixin(object): diff --git a/neutron/openstack/common/db/sqlalchemy/session.py b/neutron/openstack/common/db/sqlalchemy/session.py index 32793431dd..9e095f0d64 100644 --- a/neutron/openstack/common/db/sqlalchemy/session.py +++ b/neutron/openstack/common/db/sqlalchemy/session.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -20,41 +18,45 @@ Initializing: -* Call set_defaults with the minimal of the following kwargs: - sql_connection, sqlite_db +* Call `set_defaults()` with the minimal of the following kwargs: + ``sql_connection``, ``sqlite_db`` Example: + .. code:: python + session.set_defaults( sql_connection="sqlite:///var/lib/neutron/sqlite.db", sqlite_db="/var/lib/neutron/sqlite.db") Recommended ways to use sessions within this framework: -* Don't use them explicitly; this is like running with AUTOCOMMIT=1. - model_query() will implicitly use a session when called without one +* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``. + `model_query()` will implicitly use a session when called without one supplied. This is the ideal situation because it will allow queries to be automatically retried if the database connection is interrupted. - Note: Automatic retry will be enabled in a future patch. + .. note:: Automatic retry will be enabled in a future patch. It is generally fine to issue several queries in a row like this. Even though they may be run in separate transactions and/or separate sessions, each one will see the data from the prior calls. If needed, undo- or rollback-like functionality should be handled at a logical level. For an example, look at - the code around quotas and reservation_rollback(). + the code around quotas and `reservation_rollback()`. Examples: + .. code:: python + def get_foo(context, foo): - return model_query(context, models.Foo).\ - filter_by(foo=foo).\ - first() + return (model_query(context, models.Foo). + filter_by(foo=foo). + first()) def update_foo(context, id, newfoo): - model_query(context, models.Foo).\ - filter_by(id=id).\ - update({'foo': newfoo}) + (model_query(context, models.Foo). + filter_by(id=id). + update({'foo': newfoo})) def create_foo(context, values): foo_ref = models.Foo() @@ -63,18 +65,26 @@ Recommended ways to use sessions within this framework: return foo_ref -* Within the scope of a single method, keeping all the reads and writes within - the context managed by a single session. In this way, the session's __exit__ - handler will take care of calling flush() and commit() for you. - If using this approach, you should not explicitly call flush() or commit(). - Any error within the context of the session will cause the session to emit - a ROLLBACK. If the connection is dropped before this is possible, the - database will implicitly rollback the transaction. +* Within the scope of a single method, keep all the reads and writes within + the context managed by a single session. In this way, the session's + `__exit__` handler will take care of calling `flush()` and `commit()` for + you. If using this approach, you should not explicitly call `flush()` or + `commit()`. Any error within the context of the session will cause the + session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be + raised in `session`'s `__exit__` handler, and any try/except within the + context managed by `session` will not be triggered. And catching other + non-database errors in the session will not trigger the ROLLBACK, so + exception handlers should always be outside the session, unless the + developer wants to do a partial commit on purpose. If the connection is + dropped before this is possible, the database will implicitly roll back the + transaction. - Note: statements in the session scope will not be automatically retried. + .. note:: Statements in the session scope will not be automatically retried. If you create models within the session, they need to be added, but you - do not need to call model.save() + do not need to call `model.save()`: + + .. code:: python def create_many_foo(context, foos): session = get_session() @@ -87,36 +97,62 @@ Recommended ways to use sessions within this framework: def update_bar(context, foo_id, newbar): session = get_session() with session.begin(): - foo_ref = model_query(context, models.Foo, session).\ - filter_by(id=foo_id).\ - first() - model_query(context, models.Bar, session).\ - filter_by(id=foo_ref['bar_id']).\ - update({'bar': newbar}) + foo_ref = (model_query(context, models.Foo, session). + filter_by(id=foo_id). + first()) + (model_query(context, models.Bar, session). + filter_by(id=foo_ref['bar_id']). + update({'bar': newbar})) - Note: update_bar is a trivially simple example of using "with session.begin". - Whereas create_many_foo is a good example of when a transaction is needed, - it is always best to use as few queries as possible. The two queries in - update_bar can be better expressed using a single query which avoids - the need for an explicit transaction. It can be expressed like so: + .. note:: `update_bar` is a trivially simple example of using + ``with session.begin``. Whereas `create_many_foo` is a good example of + when a transaction is needed, it is always best to use as few queries as + possible. + + The two queries in `update_bar` can be better expressed using a single query + which avoids the need for an explicit transaction. It can be expressed like + so: + + .. code:: python def update_bar(context, foo_id, newbar): - subq = model_query(context, models.Foo.id).\ - filter_by(id=foo_id).\ - limit(1).\ - subquery() - model_query(context, models.Bar).\ - filter_by(id=subq.as_scalar()).\ - update({'bar': newbar}) + subq = (model_query(context, models.Foo.id). + filter_by(id=foo_id). + limit(1). + subquery()) + (model_query(context, models.Bar). + filter_by(id=subq.as_scalar()). + update({'bar': newbar})) - For reference, this emits approximagely the following SQL statement: + For reference, this emits approximately the following SQL statement: + + .. code:: sql UPDATE bar SET bar = ${newbar} WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1); + .. note:: `create_duplicate_foo` is a trivially simple example of catching an + exception while using ``with session.begin``. Here create two duplicate + instances with same primary key, must catch the exception out of context + managed by a single session: + + .. code:: python + + def create_duplicate_foo(context): + foo1 = models.Foo() + foo2 = models.Foo() + foo1.id = foo2.id = 1 + session = get_session() + try: + with session.begin(): + session.add(foo1) + session.add(foo2) + except exception.DBDuplicateEntry as e: + handle_error(e) + * Passing an active session between methods. Sessions should only be passed to private methods. The private method must use a subtransaction; otherwise - SQLAlchemy will throw an error when you call session.begin() on an existing + SQLAlchemy will throw an error when you call `session.begin()` on an existing transaction. Public methods should not accept a session parameter and should not be involved in sessions within the caller's scope. @@ -129,6 +165,8 @@ Recommended ways to use sessions within this framework: becomes less clear in this situation. When this is needed for code clarity, it should be clearly documented. + .. code:: python + def myfunc(foo): session = get_session() with session.begin(): @@ -148,13 +186,13 @@ There are some things which it is best to avoid: * Don't keep a transaction open any longer than necessary. - This means that your "with session.begin()" block should be as short + This means that your ``with session.begin()`` block should be as short as possible, while still containing all the related calls for that transaction. -* Avoid "with_lockmode('UPDATE')" when possible. +* Avoid ``with_lockmode('UPDATE')`` when possible. - In MySQL/InnoDB, when a "SELECT ... FOR UPDATE" query does not match + In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match any rows, it will take a gap-lock. This is a form of write-lock on the "gap" where no rows exist, and prevents any other writes to that space. This can effectively prevent any INSERT into a table by locking the gap @@ -165,16 +203,19 @@ There are some things which it is best to avoid: number of rows matching a query, and if only one row is returned, then issue the SELECT FOR UPDATE. - The better long-term solution is to use INSERT .. ON DUPLICATE KEY UPDATE. + The better long-term solution is to use + ``INSERT .. ON DUPLICATE KEY UPDATE``. However, this can not be done until the "deleted" columns are removed and proper UNIQUE constraints are added to the tables. Enabling soft deletes: -* To use/enable soft-deletes, the SoftDeleteMixin must be added +* To use/enable soft-deletes, the `SoftDeleteMixin` must be added to your model class. For example: + .. code:: python + class NovaBase(models.SoftDeleteMixin, models.ModelBase): pass @@ -182,13 +223,15 @@ Enabling soft deletes: Efficient use of soft deletes: * There are two possible ways to mark a record as deleted: - model.soft_delete() and query.soft_delete(). + `model.soft_delete()` and `query.soft_delete()`. - model.soft_delete() method works with single already fetched entry. - query.soft_delete() makes only one db request for all entries that correspond - to query. + The `model.soft_delete()` method works with a single already-fetched entry. + `query.soft_delete()` makes only one db request for all entries that + correspond to the query. -* In almost all cases you should use query.soft_delete(). Some examples: +* In almost all cases you should use `query.soft_delete()`. Some examples: + + .. code:: python def soft_delete_bar(): count = model_query(BarModel).find(some_condition).soft_delete() @@ -199,18 +242,20 @@ Efficient use of soft deletes: if session is None: session = get_session() with session.begin(subtransactions=True): - count = model_query(BarModel).\ - find(some_condition).\ - soft_delete(synchronize_session=True) + count = (model_query(BarModel). + find(some_condition). + soft_delete(synchronize_session=True)) # Here synchronize_session is required, because we # don't know what is going on in outer session. if count == 0: raise Exception("0 entries were soft deleted") -* There is only one situation where model.soft_delete() is appropriate: when +* There is only one situation where `model.soft_delete()` is appropriate: when you fetch a single record, work with it, and mark it as deleted in the same transaction. + .. code:: python + def soft_delete_bar_model(): session = get_session() with session.begin(): @@ -219,13 +264,15 @@ Efficient use of soft deletes: bar_ref.soft_delete(session=session) However, if you need to work with all entries that correspond to query and - then soft delete them you should use query.soft_delete() method: + then soft delete them you should use the `query.soft_delete()` method: + + .. code:: python def soft_delete_multi_models(): session = get_session() with session.begin(): - query = model_query(BarModel, session=session).\ - find(some_condition) + query = (model_query(BarModel, session=session). + find(some_condition)) model_refs = query.all() # Work with model_refs query.soft_delete(synchronize_session=False) @@ -233,23 +280,26 @@ Efficient use of soft deletes: # session and these entries are not used after this. When working with many rows, it is very important to use query.soft_delete, - which issues a single query. Using model.soft_delete(), as in the following + which issues a single query. Using `model.soft_delete()`, as in the following example, is very inefficient. + .. code:: python + for bar_ref in bar_refs: bar_ref.soft_delete(session=session) # This will produce count(bar_refs) db requests. + """ +import functools +import logging import os.path import re import time -from eventlet import greenthread from oslo.config import cfg import six from sqlalchemy import exc as sqla_exc -import sqlalchemy.interfaces from sqlalchemy.interfaces import PoolListener import sqlalchemy.orm from sqlalchemy.pool import NullPool, StaticPool @@ -257,18 +307,15 @@ from sqlalchemy.sql.expression import literal_column from neutron.openstack.common.db import exception from neutron.openstack.common.gettextutils import _ -from neutron.openstack.common import log as logging from neutron.openstack.common import timeutils -DEFAULT = 'DEFAULT' - sqlite_db_opts = [ cfg.StrOpt('sqlite_db', default='neutron.sqlite', - help='the filename to use with sqlite'), + help='The file name to use with SQLite'), cfg.BoolOpt('sqlite_synchronous', default=True, - help='If true, use synchronous mode for sqlite'), + help='If True, SQLite uses synchronous mode'), ] database_opts = [ @@ -278,76 +325,80 @@ database_opts = [ '../', '$sqlite_db')), help='The SQLAlchemy connection string used to connect to the ' 'database', - deprecated_name='sql_connection', - deprecated_group=DEFAULT, + secret=True, deprecated_opts=[cfg.DeprecatedOpt('sql_connection', - group='DATABASE')], - secret=True), + group='DEFAULT'), + cfg.DeprecatedOpt('sql_connection', + group='DATABASE'), + cfg.DeprecatedOpt('connection', + group='sql'), ]), cfg.StrOpt('slave_connection', default='', + secret=True, help='The SQLAlchemy connection string used to connect to the ' - 'slave database', - secret=True), + 'slave database'), cfg.IntOpt('idle_timeout', default=3600, - deprecated_name='sql_idle_timeout', - deprecated_group=DEFAULT, deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', - group='DATABASE')], - help='timeout before idle sql connections are reaped'), + group='DEFAULT'), + cfg.DeprecatedOpt('sql_idle_timeout', + group='DATABASE'), + cfg.DeprecatedOpt('idle_timeout', + group='sql')], + help='Timeout before idle sql connections are reaped'), cfg.IntOpt('min_pool_size', default=1, - deprecated_name='sql_min_pool_size', - deprecated_group=DEFAULT, deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_min_pool_size', group='DATABASE')], help='Minimum number of SQL connections to keep open in a ' 'pool'), cfg.IntOpt('max_pool_size', default=None, - deprecated_name='sql_max_pool_size', - deprecated_group=DEFAULT, deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_max_pool_size', group='DATABASE')], help='Maximum number of SQL connections to keep open in a ' 'pool'), cfg.IntOpt('max_retries', default=10, - deprecated_name='sql_max_retries', - deprecated_group=DEFAULT, deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_max_retries', group='DATABASE')], - help='maximum db connection retries during startup. ' + help='Maximum db connection retries during startup. ' '(setting -1 implies an infinite retry count)'), cfg.IntOpt('retry_interval', default=10, - deprecated_name='sql_retry_interval', - deprecated_group=DEFAULT, - deprecated_opts=[cfg.DeprecatedOpt('reconnect_interval', + deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval', + group='DEFAULT'), + cfg.DeprecatedOpt('reconnect_interval', group='DATABASE')], - help='interval between retries of opening a sql connection'), + help='Interval between retries of opening a sql connection'), cfg.IntOpt('max_overflow', default=None, - deprecated_name='sql_max_overflow', - deprecated_group=DEFAULT, - deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_max_overflow', + deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow', + group='DEFAULT'), + cfg.DeprecatedOpt('sqlalchemy_max_overflow', group='DATABASE')], help='If set, use this value for max_overflow with sqlalchemy'), cfg.IntOpt('connection_debug', default=0, - deprecated_name='sql_connection_debug', - deprecated_group=DEFAULT, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug', + group='DEFAULT')], help='Verbosity of SQL debugging information. 0=None, ' '100=Everything'), cfg.BoolOpt('connection_trace', default=False, - deprecated_name='sql_connection_trace', - deprecated_group=DEFAULT, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace', + group='DEFAULT')], help='Add python stack traces to SQL as comment strings'), cfg.IntOpt('pool_timeout', default=None, - deprecated_name='sqlalchemy_pool_timeout', - deprecated_group='DATABASE', + deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout', + group='DATABASE')], help='If set, use this value for pool_timeout with sqlalchemy'), ] @@ -411,8 +462,8 @@ class SqliteForeignKeysListener(PoolListener): dbapi_con.execute('pragma foreign_keys=ON') -def get_session(autocommit=True, expire_on_commit=False, - sqlite_fk=False, slave_session=False): +def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False, + slave_session=False, mysql_traditional_mode=False): """Return a SQLAlchemy session.""" global _MAKER global _SLAVE_MAKER @@ -422,7 +473,8 @@ def get_session(autocommit=True, expire_on_commit=False, maker = _SLAVE_MAKER if maker is None: - engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session) + engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session, + mysql_traditional_mode=mysql_traditional_mode) maker = get_maker(engine, autocommit, expire_on_commit) if slave_session: @@ -441,6 +493,11 @@ def get_session(autocommit=True, expire_on_commit=False, # 1 column - (IntegrityError) column c1 is not unique # N columns - (IntegrityError) column c1, c2, ..., N are not unique # +# sqlite since 3.7.16: +# 1 column - (IntegrityError) UNIQUE constraint failed: tbl.k1 +# +# N columns - (IntegrityError) UNIQUE constraint failed: tbl.k1, tbl.k2 +# # postgres: # 1 column - (IntegrityError) duplicate key value violates unique # constraint "users_c1_key" @@ -453,9 +510,10 @@ def get_session(autocommit=True, expire_on_commit=False, # N columns - (IntegrityError) (1062, "Duplicate entry 'values joined # with -' for key 'name_of_our_constraint'") _DUP_KEY_RE_DB = { - "sqlite": re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), - "postgresql": re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"), - "mysql": re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$") + "sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), + re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")), + "postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),), + "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),) } @@ -480,13 +538,22 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name): if engine_name not in ["mysql", "sqlite", "postgresql"]: return - m = _DUP_KEY_RE_DB[engine_name].match(integrity_error.message) - if not m: + # FIXME(johannes): The usage of the .message attribute has been + # deprecated since Python 2.6. However, the exceptions raised by + # SQLAlchemy can differ when using unicode() and accessing .message. + # An audit across all three supported engines will be necessary to + # ensure there are no regressions. + for pattern in _DUP_KEY_RE_DB[engine_name]: + match = pattern.match(integrity_error.message) + if match: + break + else: return - columns = m.group(1) + + columns = match.group(1) if engine_name == "sqlite": - columns = columns.strip().split(", ") + columns = [c.split('.')[-1] for c in columns.strip().split(", ")] else: columns = get_columns_from_uniq_cons_or_name(columns) raise exception.DBDuplicateEntry(columns, integrity_error) @@ -512,6 +579,11 @@ def _raise_if_deadlock_error(operational_error, engine_name): re = _DEADLOCK_RE_DB.get(engine_name) if re is None: return + # FIXME(johannes): The usage of the .message attribute has been + # deprecated since Python 2.6. However, the exceptions raised by + # SQLAlchemy can differ when using unicode() and accessing .message. + # An audit across all three supported engines will be necessary to + # ensure there are no regressions. m = re.match(operational_error.message) if not m: return @@ -519,19 +591,21 @@ def _raise_if_deadlock_error(operational_error, engine_name): def _wrap_db_error(f): + @functools.wraps(f) def _wrap(*args, **kwargs): try: return f(*args, **kwargs) except UnicodeEncodeError: raise exception.DBInvalidUnicodeParameter() - # note(boris-42): We should catch unique constraint violation and - # wrap it by our own DBDuplicateEntry exception. Unique constraint - # violation is wrapped by IntegrityError. except sqla_exc.OperationalError as e: + _raise_if_db_connection_lost(e, get_engine()) _raise_if_deadlock_error(e, get_engine().name) # NOTE(comstud): A lot of code is checking for OperationalError # so let's not wrap it for now. raise + # note(boris-42): We should catch unique constraint violation and + # wrap it by our own DBDuplicateEntry exception. Unique constraint + # violation is wrapped by IntegrityError. except sqla_exc.IntegrityError as e: # note(boris-42): SqlAlchemy doesn't unify errors from different # DBs so we must do this. Also in some tables (for example @@ -543,11 +617,11 @@ def _wrap_db_error(f): except Exception as e: LOG.exception(_('DB exception wrapped.')) raise exception.DBError(e) - _wrap.func_name = f.func_name return _wrap -def get_engine(sqlite_fk=False, slave_engine=False): +def get_engine(sqlite_fk=False, slave_engine=False, + mysql_traditional_mode=False): """Return a SQLAlchemy engine.""" global _ENGINE global _SLAVE_ENGINE @@ -559,8 +633,8 @@ def get_engine(sqlite_fk=False, slave_engine=False): db_uri = CONF.database.slave_connection if engine is None: - engine = create_engine(db_uri, - sqlite_fk=sqlite_fk) + engine = create_engine(db_uri, sqlite_fk=sqlite_fk, + mysql_traditional_mode=mysql_traditional_mode) if slave_engine: _SLAVE_ENGINE = engine else: @@ -583,44 +657,77 @@ def _add_regexp_listener(dbapi_con, con_record): dbapi_con.create_function('regexp', 2, regexp) -def _greenthread_yield(dbapi_con, con_record): +def _thread_yield(dbapi_con, con_record): """Ensure other greenthreads get a chance to be executed. + If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will + execute instead of time.sleep(0). Force a context switch. With common database backends (eg MySQLdb and sqlite), there is no implicit yield caused by network I/O since they are implemented by C libraries that eventlet cannot monkey patch. """ - greenthread.sleep(0) + time.sleep(0) -def _ping_listener(dbapi_conn, connection_rec, connection_proxy): - """Ensures that MySQL connections checked out of the pool are alive. +def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy): + """Ensures that MySQL and DB2 connections are alive. Borrowed from: http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f """ + cursor = dbapi_conn.cursor() try: - dbapi_conn.cursor().execute('select 1') - except dbapi_conn.OperationalError as ex: - if ex.args[0] in (2006, 2013, 2014, 2045, 2055): - LOG.warn(_('Got mysql server has gone away: %s'), ex) - raise sqla_exc.DisconnectionError("Database server went away") + ping_sql = 'select 1' + if engine.name == 'ibm_db_sa': + # DB2 requires a table expression + ping_sql = 'select 1 from (values (1)) AS t1' + cursor.execute(ping_sql) + except Exception as ex: + if engine.dialect.is_disconnect(ex, dbapi_conn, cursor): + msg = _('Database server has gone away: %s') % ex + LOG.warning(msg) + raise sqla_exc.DisconnectionError(msg) else: raise +def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy): + """Set engine mode to 'traditional'. + + Required to prevent silent truncates at insert or update operations + under MySQL. By default MySQL truncates inserted string if it longer + than a declared field just with warning. That is fraught with data + corruption. + """ + dbapi_con.cursor().execute("SET SESSION sql_mode = TRADITIONAL;") + + def _is_db_connection_error(args): """Return True if error in connecting to db.""" # NOTE(adam_g): This is currently MySQL specific and needs to be extended # to support Postgres and others. - conn_err_codes = ('2002', '2003', '2006') + # For the db2, the error code is -30081 since the db2 is still not ready + conn_err_codes = ('2002', '2003', '2006', '2013', '-30081') for err_code in conn_err_codes: if args.find(err_code) != -1: return True return False -def create_engine(sql_connection, sqlite_fk=False): +def _raise_if_db_connection_lost(error, engine): + # NOTE(vsergeyev): Function is_disconnect(e, connection, cursor) + # requires connection and cursor in incoming parameters, + # but we have no possibility to create connection if DB + # is not available, so in such case reconnect fails. + # But is_disconnect() ignores these parameters, so it + # makes sense to pass to function None as placeholder + # instead of connection and cursor. + if engine.dialect.is_disconnect(error, None, None): + raise exception.DBConnectionError(error) + + +def create_engine(sql_connection, sqlite_fk=False, + mysql_traditional_mode=False): """Return a new SQLAlchemy engine.""" # NOTE(geekinutah): At this point we could be connecting to the normal # db handle or the slave db handle. Things like @@ -659,10 +766,21 @@ def create_engine(sql_connection, sqlite_fk=False): engine = sqlalchemy.create_engine(sql_connection, **engine_args) - sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield) + sqlalchemy.event.listen(engine, 'checkin', _thread_yield) - if 'mysql' in connection_dict.drivername: - sqlalchemy.event.listen(engine, 'checkout', _ping_listener) + if engine.name in ['mysql', 'ibm_db_sa']: + callback = functools.partial(_ping_listener, engine) + sqlalchemy.event.listen(engine, 'checkout', callback) + if engine.name == 'mysql': + if mysql_traditional_mode: + sqlalchemy.event.listen(engine, 'checkout', + _set_mode_traditional) + else: + LOG.warning(_("This application has not enabled MySQL " + "traditional mode, which means silent " + "data corruption may occur. " + "Please encourage the application " + "developers to enable this mode.")) elif 'sqlite' in connection_dict.drivername: if not CONF.sqlite_synchronous: sqlalchemy.event.listen(engine, 'connect', @@ -684,7 +802,7 @@ def create_engine(sql_connection, sqlite_fk=False): remaining = 'infinite' while True: msg = _('SQL connection failed. %s attempts left.') - LOG.warn(msg % remaining) + LOG.warning(msg % remaining) if remaining != 'infinite': remaining -= 1 time.sleep(CONF.database.retry_interval) @@ -743,25 +861,25 @@ def _patch_mysqldb_with_stacktrace_comments(): def _do_query(self, q): stack = '' - for file, line, method, function in traceback.extract_stack(): + for filename, line, method, function in traceback.extract_stack(): # exclude various common things from trace - if file.endswith('session.py') and method == '_do_query': + if filename.endswith('session.py') and method == '_do_query': continue - if file.endswith('api.py') and method == 'wrapper': + if filename.endswith('api.py') and method == 'wrapper': continue - if file.endswith('utils.py') and method == '_inner': + if filename.endswith('utils.py') and method == '_inner': continue - if file.endswith('exception.py') and method == '_wrap': + if filename.endswith('exception.py') and method == '_wrap': continue # db/api is just a wrapper around db/sqlalchemy/api - if file.endswith('db/api.py'): + if filename.endswith('db/api.py'): continue # only trace inside neutron - index = file.rfind('neutron') + index = filename.rfind('neutron') if index == -1: continue stack += "File:%s:%s Method:%s() Line:%s | " \ - % (file[index:], line, method, function) + % (filename[index:], line, method, function) # strip trailing " | " from stack if stack: diff --git a/neutron/openstack/common/db/sqlalchemy/utils.py b/neutron/openstack/common/db/sqlalchemy/utils.py index f3d56bfdf1..ff35a12762 100644 --- a/neutron/openstack/common/db/sqlalchemy/utils.py +++ b/neutron/openstack/common/db/sqlalchemy/utils.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # Copyright 2010-2011 OpenStack Foundation. @@ -18,16 +16,42 @@ # License for the specific language governing permissions and limitations # under the License. -"""Implementation of paginate query.""" +import logging +import re +from migrate.changeset import UniqueConstraint import sqlalchemy +from sqlalchemy import Boolean +from sqlalchemy import CheckConstraint +from sqlalchemy import Column +from sqlalchemy.engine import reflection +from sqlalchemy.ext.compiler import compiles +from sqlalchemy import func +from sqlalchemy import Index +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy.sql.expression import literal_column +from sqlalchemy.sql.expression import UpdateBase +from sqlalchemy.sql import select +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy.types import NullType from neutron.openstack.common.gettextutils import _ -from neutron.openstack.common import log as logging +from neutron.openstack.common import timeutils LOG = logging.getLogger(__name__) +_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+") + + +def sanitize_db_url(url): + match = _DBURL_REGEX.match(url) + if match: + return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):]) + return url + class InvalidSortKey(Exception): message = _("Sort key supplied was not valid.") @@ -69,7 +93,7 @@ def paginate_query(query, model, limit, sort_keys, marker=None, if 'id' not in sort_keys: # TODO(justinsb): If this ever gives a false-positive, check # the actual primary key, rather than assuming its id - LOG.warn(_('Id not in sort_keys; is sort_keys unique?')) + LOG.warning(_('Id not in sort_keys; is sort_keys unique?')) assert(not (sort_dir and sort_dirs)) @@ -85,11 +109,14 @@ def paginate_query(query, model, limit, sort_keys, marker=None, # Add sorting for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): - sort_dir_func = { - 'asc': sqlalchemy.asc, - 'desc': sqlalchemy.desc, - }[current_sort_dir] - + try: + sort_dir_func = { + 'asc': sqlalchemy.asc, + 'desc': sqlalchemy.desc, + }[current_sort_dir] + except KeyError: + raise ValueError(_("Unknown sort direction, " + "must be 'desc' or 'asc'")) try: sort_key_attr = getattr(model, current_sort_key) except AttributeError: @@ -105,20 +132,17 @@ def paginate_query(query, model, limit, sort_keys, marker=None, # Build up an array of sort criteria as in the docstring criteria_list = [] - for i in range(0, len(sort_keys)): + for i in range(len(sort_keys)): crit_attrs = [] - for j in range(0, i): + for j in range(i): model_attr = getattr(model, sort_keys[j]) crit_attrs.append((model_attr == marker_values[j])) model_attr = getattr(model, sort_keys[i]) if sort_dirs[i] == 'desc': crit_attrs.append((model_attr < marker_values[i])) - elif sort_dirs[i] == 'asc': - crit_attrs.append((model_attr > marker_values[i])) else: - raise ValueError(_("Unknown sort direction, " - "must be 'desc' or 'asc'")) + crit_attrs.append((model_attr > marker_values[i])) criteria = sqlalchemy.sql.and_(*crit_attrs) criteria_list.append(criteria) @@ -130,3 +154,394 @@ def paginate_query(query, model, limit, sort_keys, marker=None, query = query.limit(limit) return query + + +def get_table(engine, name): + """Returns an sqlalchemy table dynamically from db. + + Needed because the models don't work for us in migrations + as models will be far out of sync with the current data. + """ + metadata = MetaData() + metadata.bind = engine + return Table(name, metadata, autoload=True) + + +class InsertFromSelect(UpdateBase): + """Form the base for `INSERT INTO table (SELECT ... )` statement.""" + def __init__(self, table, select): + self.table = table + self.select = select + + +@compiles(InsertFromSelect) +def visit_insert_from_select(element, compiler, **kw): + """Form the `INSERT INTO table (SELECT ... )` statement.""" + return "INSERT INTO %s %s" % ( + compiler.process(element.table, asfrom=True), + compiler.process(element.select)) + + +class ColumnError(Exception): + """Error raised when no column or an invalid column is found.""" + + +def _get_not_supported_column(col_name_col_instance, column_name): + try: + column = col_name_col_instance[column_name] + except KeyError: + msg = _("Please specify column %s in col_name_col_instance " + "param. It is required because column has unsupported " + "type by sqlite).") + raise ColumnError(msg % column_name) + + if not isinstance(column, Column): + msg = _("col_name_col_instance param has wrong type of " + "column instance for column %s It should be instance " + "of sqlalchemy.Column.") + raise ColumnError(msg % column_name) + return column + + +def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns, + **col_name_col_instance): + """Drop unique constraint from table. + + This method drops UC from table and works for mysql, postgresql and sqlite. + In mysql and postgresql we are able to use "alter table" construction. + Sqlalchemy doesn't support some sqlite column types and replaces their + type with NullType in metadata. We process these columns and replace + NullType with the correct column type. + + :param migrate_engine: sqlalchemy engine + :param table_name: name of table that contains uniq constraint. + :param uc_name: name of uniq constraint that will be dropped. + :param columns: columns that are in uniq constraint. + :param col_name_col_instance: contains pair column_name=column_instance. + column_instance is instance of Column. These params + are required only for columns that have unsupported + types by sqlite. For example BigInteger. + """ + + meta = MetaData() + meta.bind = migrate_engine + t = Table(table_name, meta, autoload=True) + + if migrate_engine.name == "sqlite": + override_cols = [ + _get_not_supported_column(col_name_col_instance, col.name) + for col in t.columns + if isinstance(col.type, NullType) + ] + for col in override_cols: + t.columns.replace(col) + + uc = UniqueConstraint(*columns, table=t, name=uc_name) + uc.drop() + + +def drop_old_duplicate_entries_from_table(migrate_engine, table_name, + use_soft_delete, *uc_column_names): + """Drop all old rows having the same values for columns in uc_columns. + + This method drop (or mark ad `deleted` if use_soft_delete is True) old + duplicate rows form table with name `table_name`. + + :param migrate_engine: Sqlalchemy engine + :param table_name: Table with duplicates + :param use_soft_delete: If True - values will be marked as `deleted`, + if False - values will be removed from table + :param uc_column_names: Unique constraint columns + """ + meta = MetaData() + meta.bind = migrate_engine + + table = Table(table_name, meta, autoload=True) + columns_for_group_by = [table.c[name] for name in uc_column_names] + + columns_for_select = [func.max(table.c.id)] + columns_for_select.extend(columns_for_group_by) + + duplicated_rows_select = select(columns_for_select, + group_by=columns_for_group_by, + having=func.count(table.c.id) > 1) + + for row in migrate_engine.execute(duplicated_rows_select): + # NOTE(boris-42): Do not remove row that has the biggest ID. + delete_condition = table.c.id != row[0] + is_none = None # workaround for pyflakes + delete_condition &= table.c.deleted_at == is_none + for name in uc_column_names: + delete_condition &= table.c[name] == row[name] + + rows_to_delete_select = select([table.c.id]).where(delete_condition) + for row in migrate_engine.execute(rows_to_delete_select).fetchall(): + LOG.info(_("Deleting duplicated row with id: %(id)s from table: " + "%(table)s") % dict(id=row[0], table=table_name)) + + if use_soft_delete: + delete_statement = table.update().\ + where(delete_condition).\ + values({ + 'deleted': literal_column('id'), + 'updated_at': literal_column('updated_at'), + 'deleted_at': timeutils.utcnow() + }) + else: + delete_statement = table.delete().where(delete_condition) + migrate_engine.execute(delete_statement) + + +def _get_default_deleted_value(table): + if isinstance(table.c.id.type, Integer): + return 0 + if isinstance(table.c.id.type, String): + return "" + raise ColumnError(_("Unsupported id columns type")) + + +def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes): + table = get_table(migrate_engine, table_name) + + insp = reflection.Inspector.from_engine(migrate_engine) + real_indexes = insp.get_indexes(table_name) + existing_index_names = dict( + [(index['name'], index['column_names']) for index in real_indexes]) + + # NOTE(boris-42): Restore indexes on `deleted` column + for index in indexes: + if 'deleted' not in index['column_names']: + continue + name = index['name'] + if name in existing_index_names: + column_names = [table.c[c] for c in existing_index_names[name]] + old_index = Index(name, *column_names, unique=index["unique"]) + old_index.drop(migrate_engine) + + column_names = [table.c[c] for c in index['column_names']] + new_index = Index(index["name"], *column_names, unique=index["unique"]) + new_index.create(migrate_engine) + + +def change_deleted_column_type_to_boolean(migrate_engine, table_name, + **col_name_col_instance): + if migrate_engine.name == "sqlite": + return _change_deleted_column_type_to_boolean_sqlite( + migrate_engine, table_name, **col_name_col_instance) + insp = reflection.Inspector.from_engine(migrate_engine) + indexes = insp.get_indexes(table_name) + + table = get_table(migrate_engine, table_name) + + old_deleted = Column('old_deleted', Boolean, default=False) + old_deleted.create(table, populate_default=False) + + table.update().\ + where(table.c.deleted == table.c.id).\ + values(old_deleted=True).\ + execute() + + table.c.deleted.drop() + table.c.old_deleted.alter(name="deleted") + + _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes) + + +def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name, + **col_name_col_instance): + insp = reflection.Inspector.from_engine(migrate_engine) + table = get_table(migrate_engine, table_name) + + columns = [] + for column in table.columns: + column_copy = None + if column.name != "deleted": + if isinstance(column.type, NullType): + column_copy = _get_not_supported_column(col_name_col_instance, + column.name) + else: + column_copy = column.copy() + else: + column_copy = Column('deleted', Boolean, default=0) + columns.append(column_copy) + + constraints = [constraint.copy() for constraint in table.constraints] + + meta = table.metadata + new_table = Table(table_name + "__tmp__", meta, + *(columns + constraints)) + new_table.create() + + indexes = [] + for index in insp.get_indexes(table_name): + column_names = [new_table.c[c] for c in index['column_names']] + indexes.append(Index(index["name"], *column_names, + unique=index["unique"])) + + c_select = [] + for c in table.c: + if c.name != "deleted": + c_select.append(c) + else: + c_select.append(table.c.deleted == table.c.id) + + ins = InsertFromSelect(new_table, select(c_select)) + migrate_engine.execute(ins) + + table.drop() + [index.create(migrate_engine) for index in indexes] + + new_table.rename(table_name) + new_table.update().\ + where(new_table.c.deleted == new_table.c.id).\ + values(deleted=True).\ + execute() + + +def change_deleted_column_type_to_id_type(migrate_engine, table_name, + **col_name_col_instance): + if migrate_engine.name == "sqlite": + return _change_deleted_column_type_to_id_type_sqlite( + migrate_engine, table_name, **col_name_col_instance) + insp = reflection.Inspector.from_engine(migrate_engine) + indexes = insp.get_indexes(table_name) + + table = get_table(migrate_engine, table_name) + + new_deleted = Column('new_deleted', table.c.id.type, + default=_get_default_deleted_value(table)) + new_deleted.create(table, populate_default=True) + + deleted = True # workaround for pyflakes + table.update().\ + where(table.c.deleted == deleted).\ + values(new_deleted=table.c.id).\ + execute() + table.c.deleted.drop() + table.c.new_deleted.alter(name="deleted") + + _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes) + + +def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name, + **col_name_col_instance): + # NOTE(boris-42): sqlaclhemy-migrate can't drop column with check + # constraints in sqlite DB and our `deleted` column has + # 2 check constraints. So there is only one way to remove + # these constraints: + # 1) Create new table with the same columns, constraints + # and indexes. (except deleted column). + # 2) Copy all data from old to new table. + # 3) Drop old table. + # 4) Rename new table to old table name. + insp = reflection.Inspector.from_engine(migrate_engine) + meta = MetaData(bind=migrate_engine) + table = Table(table_name, meta, autoload=True) + default_deleted_value = _get_default_deleted_value(table) + + columns = [] + for column in table.columns: + column_copy = None + if column.name != "deleted": + if isinstance(column.type, NullType): + column_copy = _get_not_supported_column(col_name_col_instance, + column.name) + else: + column_copy = column.copy() + else: + column_copy = Column('deleted', table.c.id.type, + default=default_deleted_value) + columns.append(column_copy) + + def is_deleted_column_constraint(constraint): + # NOTE(boris-42): There is no other way to check is CheckConstraint + # associated with deleted column. + if not isinstance(constraint, CheckConstraint): + return False + sqltext = str(constraint.sqltext) + return (sqltext.endswith("deleted in (0, 1)") or + sqltext.endswith("deleted IN (:deleted_1, :deleted_2)")) + + constraints = [] + for constraint in table.constraints: + if not is_deleted_column_constraint(constraint): + constraints.append(constraint.copy()) + + new_table = Table(table_name + "__tmp__", meta, + *(columns + constraints)) + new_table.create() + + indexes = [] + for index in insp.get_indexes(table_name): + column_names = [new_table.c[c] for c in index['column_names']] + indexes.append(Index(index["name"], *column_names, + unique=index["unique"])) + + ins = InsertFromSelect(new_table, table.select()) + migrate_engine.execute(ins) + + table.drop() + [index.create(migrate_engine) for index in indexes] + + new_table.rename(table_name) + deleted = True # workaround for pyflakes + new_table.update().\ + where(new_table.c.deleted == deleted).\ + values(deleted=new_table.c.id).\ + execute() + + # NOTE(boris-42): Fix value of deleted column: False -> "" or 0. + deleted = False # workaround for pyflakes + new_table.update().\ + where(new_table.c.deleted == deleted).\ + values(deleted=default_deleted_value).\ + execute() + + +def get_connect_string(backend, database, user=None, passwd=None): + """Get database connection + + Try to get a connection with a very specific set of values, if we get + these then we'll run the tests, otherwise they are skipped + """ + args = {'backend': backend, + 'user': user, + 'passwd': passwd, + 'database': database} + if backend == 'sqlite': + template = '%(backend)s:///%(database)s' + else: + template = "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" + return template % args + + +def is_backend_avail(backend, database, user=None, passwd=None): + try: + connect_uri = get_connect_string(backend=backend, + database=database, + user=user, + passwd=passwd) + engine = sqlalchemy.create_engine(connect_uri) + connection = engine.connect() + except Exception: + # intentionally catch all to handle exceptions even if we don't + # have any backend code loaded. + return False + else: + connection.close() + engine.dispose() + return True + + +def get_db_connection_info(conn_pieces): + database = conn_pieces.path.strip('/') + loc_pieces = conn_pieces.netloc.split('@') + host = loc_pieces[1] + + auth_pieces = loc_pieces[0].split(':') + user = auth_pieces[0] + password = "" + if len(auth_pieces) > 1: + password = auth_pieces[1].strip() + + return (user, password, database, host)