From 6b56c212e35b873a563a34166f57de56590c50e6 Mon Sep 17 00:00:00 2001 From: Boris Pavlovic Date: Sun, 25 Aug 2013 14:48:22 +0400 Subject: [PATCH] Add DB utils from Oslo There is a lot of work done around DB and sqlalachemy in OSLO: 1) session and egnine wrappers 2) base models 3) tests for migration on real backends 4) exception wrappers 5) provide work around unqiue constraints ... --- openstack-common.conf | 2 + rally/openstack/common/db/__init__.py | 16 + rally/openstack/common/db/api.py | 106 +++ rally/openstack/common/db/exception.py | 51 ++ .../common/db/sqlalchemy/__init__.py | 16 + .../common/db/sqlalchemy/migration.py | 278 ++++++ .../openstack/common/db/sqlalchemy/models.py | 108 +++ .../openstack/common/db/sqlalchemy/session.py | 793 ++++++++++++++++++ .../common/db/sqlalchemy/test_migrations.py | 289 +++++++ rally/openstack/common/db/sqlalchemy/utils.py | 501 +++++++++++ rally/openstack/common/excutils.py | 99 +++ rally/openstack/common/fileutils.py | 110 +++ rally/openstack/common/lockutils.py | 276 ++++++ 13 files changed, 2645 insertions(+) create mode 100644 rally/openstack/common/db/__init__.py create mode 100644 rally/openstack/common/db/api.py create mode 100644 rally/openstack/common/db/exception.py create mode 100644 rally/openstack/common/db/sqlalchemy/__init__.py create mode 100644 rally/openstack/common/db/sqlalchemy/migration.py create mode 100644 rally/openstack/common/db/sqlalchemy/models.py create mode 100644 rally/openstack/common/db/sqlalchemy/session.py create mode 100644 rally/openstack/common/db/sqlalchemy/test_migrations.py create mode 100644 rally/openstack/common/db/sqlalchemy/utils.py create mode 100644 rally/openstack/common/excutils.py create mode 100644 rally/openstack/common/fileutils.py create mode 100644 rally/openstack/common/lockutils.py diff --git a/openstack-common.conf b/openstack-common.conf index db5a720462..4d3b8ee9ea 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -3,6 +3,8 @@ # The list of modules to copy from oslo-incubator.git module=cliutils module=config +module=db +module=db.sqlalchemy module=gettextutils module=importutils module=log diff --git a/rally/openstack/common/db/__init__.py b/rally/openstack/common/db/__init__.py new file mode 100644 index 0000000000..1b9b60dec1 --- /dev/null +++ b/rally/openstack/common/db/__init__.py @@ -0,0 +1,16 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Cloudscaling Group, Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/rally/openstack/common/db/api.py b/rally/openstack/common/db/api.py new file mode 100644 index 0000000000..f9fa3d1fbe --- /dev/null +++ b/rally/openstack/common/db/api.py @@ -0,0 +1,106 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Multiple DB API backend support. + +Supported configuration options: + +The following two parameters are in the 'database' group: +`backend`: DB backend name or full module path to DB backend module. +`use_tpool`: Enable thread pooling of DB API calls. + +A DB backend module should implement a method named 'get_backend' which +takes no arguments. The method can return any object that implements DB +API methods. + +*NOTE*: There are bugs in eventlet when using tpool combined with +threading locks. The python logging module happens to use such locks. To +work around this issue, be sure to specify thread=False with +eventlet.monkey_patch(). + +A bug for eventlet has been filed here: + +https://bitbucket.org/eventlet/eventlet/issue/137/ +""" +import functools + +from oslo.config import cfg + +from rally.openstack.common import importutils +from rally.openstack.common import lockutils + + +db_opts = [ + cfg.StrOpt('backend', + default='sqlalchemy', + deprecated_name='db_backend', + deprecated_group='DEFAULT', + help='The backend to use for db'), + cfg.BoolOpt('use_tpool', + default=False, + deprecated_name='dbapi_use_tpool', + deprecated_group='DEFAULT', + help='Enable the experimental use of thread pooling for ' + 'all DB API calls') +] + +CONF = cfg.CONF +CONF.register_opts(db_opts, 'database') + + +class DBAPI(object): + def __init__(self, backend_mapping=None): + if backend_mapping is None: + backend_mapping = {} + self.__backend = None + self.__backend_mapping = backend_mapping + + @lockutils.synchronized('dbapi_backend', 'rally-') + def __get_backend(self): + """Get the actual backend. May be a module or an instance of + a class. Doesn't matter to us. We do this synchronized as it's + possible multiple greenthreads started very quickly trying to do + DB calls and eventlet can switch threads before self.__backend gets + assigned. + """ + if self.__backend: + # Another thread assigned it + return self.__backend + backend_name = CONF.database.backend + self.__use_tpool = CONF.database.use_tpool + if self.__use_tpool: + from eventlet import tpool + self.__tpool = tpool + # Import the untranslated name if we don't have a + # mapping. + backend_path = self.__backend_mapping.get(backend_name, + backend_name) + backend_mod = importutils.import_module(backend_path) + self.__backend = backend_mod.get_backend() + return self.__backend + + def __getattr__(self, key): + backend = self.__backend or self.__get_backend() + attr = getattr(backend, key) + if not self.__use_tpool or not hasattr(attr, '__call__'): + return attr + + def tpool_wrapper(*args, **kwargs): + return self.__tpool.execute(attr, *args, **kwargs) + + functools.update_wrapper(tpool_wrapper, attr) + return tpool_wrapper diff --git a/rally/openstack/common/db/exception.py b/rally/openstack/common/db/exception.py new file mode 100644 index 0000000000..e5a9fa59f5 --- /dev/null +++ b/rally/openstack/common/db/exception.py @@ -0,0 +1,51 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""DB related custom exceptions.""" + +from rally.openstack.common.gettextutils import _ # noqa + + +class DBError(Exception): + """Wraps an implementation specific exception.""" + def __init__(self, inner_exception=None): + self.inner_exception = inner_exception + super(DBError, self).__init__(str(inner_exception)) + + +class DBDuplicateEntry(DBError): + """Wraps an implementation specific exception.""" + def __init__(self, columns=[], inner_exception=None): + self.columns = columns + super(DBDuplicateEntry, self).__init__(inner_exception) + + +class DBDeadlock(DBError): + def __init__(self, inner_exception=None): + super(DBDeadlock, self).__init__(inner_exception) + + +class DBInvalidUnicodeParameter(Exception): + message = _("Invalid Parameter: " + "Unicode is not supported by the current database.") + + +class DbMigrationError(DBError): + """Wraps migration specific exception.""" + def __init__(self, message=None): + super(DbMigrationError, self).__init__(str(message)) diff --git a/rally/openstack/common/db/sqlalchemy/__init__.py b/rally/openstack/common/db/sqlalchemy/__init__.py new file mode 100644 index 0000000000..1b9b60dec1 --- /dev/null +++ b/rally/openstack/common/db/sqlalchemy/__init__.py @@ -0,0 +1,16 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Cloudscaling Group, Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/rally/openstack/common/db/sqlalchemy/migration.py b/rally/openstack/common/db/sqlalchemy/migration.py new file mode 100644 index 0000000000..13c525a4b5 --- /dev/null +++ b/rally/openstack/common/db/sqlalchemy/migration.py @@ -0,0 +1,278 @@ +# coding: utf-8 +# +# Copyright (c) 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# Base on code in migrate/changeset/databases/sqlite.py which is under +# the following license: +# +# The MIT License +# +# Copyright (c) 2009 Evan Rosson, Jan Dittberner, Domen Kožar +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE + +import distutils.version as dist_version +import os +import re + +import migrate +from migrate.changeset import ansisql +from migrate.changeset.databases import sqlite +from migrate.versioning import util as migrate_util +import sqlalchemy +from sqlalchemy.schema import UniqueConstraint + +from rally.openstack.common.db import exception +from rally.openstack.common.db.sqlalchemy import session as db_session +from rally.openstack.common.gettextutils import _ # noqa + + +@migrate_util.decorator +def patched_with_engine(f, *a, **kw): + url = a[0] + engine = migrate_util.construct_engine(url, **kw) + + try: + kw['engine'] = engine + return f(*a, **kw) + finally: + if isinstance(engine, migrate_util.Engine) and engine is not url: + migrate_util.log.debug('Disposing SQLAlchemy engine %s', engine) + engine.dispose() + + +# TODO(jkoelker) When migrate 0.7.3 is released and nova depends +# on that version or higher, this can be removed +MIN_PKG_VERSION = dist_version.StrictVersion('0.7.3') +if (not hasattr(migrate, '__version__') or + dist_version.StrictVersion(migrate.__version__) < MIN_PKG_VERSION): + migrate_util.with_engine = patched_with_engine + + +# NOTE(jkoelker) Delay importing migrate until we are patched +from migrate import exceptions as versioning_exceptions +from migrate.versioning import api as versioning_api +from migrate.versioning.repository import Repository + +_REPOSITORY = None + +get_engine = db_session.get_engine + + +def _get_unique_constraints(self, table): + """Retrieve information about existing unique constraints of the table + + This feature is needed for _recreate_table() to work properly. + Unfortunately, it's not available in sqlalchemy 0.7.x/0.8.x. + + """ + + data = table.metadata.bind.execute( + """SELECT sql + FROM sqlite_master + WHERE + type='table' AND + name=:table_name""", + table_name=table.name + ).fetchone()[0] + + UNIQUE_PATTERN = "CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)" + return [ + UniqueConstraint( + *[getattr(table.columns, c.strip(' "')) for c in cols.split(",")], + name=name + ) + for name, cols in re.findall(UNIQUE_PATTERN, data) + ] + + +def _recreate_table(self, table, column=None, delta=None, omit_uniques=None): + """Recreate the table properly + + Unlike the corresponding original method of sqlalchemy-migrate this one + doesn't drop existing unique constraints when creating a new one. + + """ + + table_name = self.preparer.format_table(table) + + # we remove all indexes so as not to have + # problems during copy and re-create + for index in table.indexes: + index.drop() + + # reflect existing unique constraints + for uc in self._get_unique_constraints(table): + table.append_constraint(uc) + # omit given unique constraints when creating a new table if required + table.constraints = set([ + cons for cons in table.constraints + if omit_uniques is None or cons.name not in omit_uniques + ]) + + self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name) + self.execute() + + insertion_string = self._modify_table(table, column, delta) + + table.create(bind=self.connection) + self.append(insertion_string % {'table_name': table_name}) + self.execute() + self.append('DROP TABLE migration_tmp') + self.execute() + + +def _visit_migrate_unique_constraint(self, *p, **k): + """Drop the given unique constraint + + The corresponding original method of sqlalchemy-migrate just + raises NotImplemented error + + """ + + self.recreate_table(p[0].table, omit_uniques=[p[0].name]) + + +def patch_migrate(): + """A workaround for SQLite's inability to alter things + + SQLite abilities to alter tables are very limited (please read + http://www.sqlite.org/lang_altertable.html for more details). + E. g. one can't drop a column or a constraint in SQLite. The + workaround for this is to recreate the original table omitting + the corresponding constraint (or column). + + sqlalchemy-migrate library has recreate_table() method that + implements this workaround, but it does it wrong: + + - information about unique constraints of a table + is not retrieved. So if you have a table with one + unique constraint and a migration adding another one + you will end up with a table that has only the + latter unique constraint, and the former will be lost + + - dropping of unique constraints is not supported at all + + The proper way to fix this is to provide a pull-request to + sqlalchemy-migrate, but the project seems to be dead. So we + can go on with monkey-patching of the lib at least for now. + + """ + + # this patch is needed to ensure that recreate_table() doesn't drop + # existing unique constraints of the table when creating a new one + helper_cls = sqlite.SQLiteHelper + helper_cls.recreate_table = _recreate_table + helper_cls._get_unique_constraints = _get_unique_constraints + + # this patch is needed to be able to drop existing unique constraints + constraint_cls = sqlite.SQLiteConstraintDropper + constraint_cls.visit_migrate_unique_constraint = \ + _visit_migrate_unique_constraint + constraint_cls.__bases__ = (ansisql.ANSIColumnDropper, + sqlite.SQLiteConstraintGenerator) + + +def db_sync(abs_path, version=None, init_version=0): + """Upgrade or downgrade a database. + + Function runs the upgrade() or downgrade() functions in change scripts. + + :param abs_path: Absolute path to migrate repository. + :param version: Database will upgrade/downgrade until this version. + If None - database will update to the latest + available version. + :param init_version: Initial database version + """ + if version is not None: + try: + version = int(version) + except ValueError: + raise exception.DbMigrationError( + message=_("version should be an integer")) + + current_version = db_version(abs_path, init_version) + repository = _find_migrate_repo(abs_path) + if version is None or version > current_version: + return versioning_api.upgrade(get_engine(), repository, version) + else: + return versioning_api.downgrade(get_engine(), repository, + version) + + +def db_version(abs_path, init_version): + """Show the current version of the repository. + + :param abs_path: Absolute path to migrate repository + :param version: Initial database version + """ + repository = _find_migrate_repo(abs_path) + try: + return versioning_api.db_version(get_engine(), repository) + except versioning_exceptions.DatabaseNotControlledError: + meta = sqlalchemy.MetaData() + engine = get_engine() + meta.reflect(bind=engine) + tables = meta.tables + if len(tables) == 0: + db_version_control(abs_path, init_version) + return versioning_api.db_version(get_engine(), repository) + else: + # Some pre-Essex DB's may not be version controlled. + # Require them to upgrade using Essex first. + raise exception.DbMigrationError( + message=_("Upgrade DB using Essex release first.")) + + +def db_version_control(abs_path, version=None): + """Mark a database as under this repository's version control. + + Once a database is under version control, schema changes should + only be done via change scripts in this repository. + + :param abs_path: Absolute path to migrate repository + :param version: Initial database version + """ + repository = _find_migrate_repo(abs_path) + versioning_api.version_control(get_engine(), repository, version) + return version + + +def _find_migrate_repo(abs_path): + """Get the project's change script repository + + :param abs_path: Absolute path to migrate repository + """ + global _REPOSITORY + if not os.path.exists(abs_path): + raise exception.DbMigrationError("Path %s not found" % abs_path) + if _REPOSITORY is None: + _REPOSITORY = Repository(abs_path) + return _REPOSITORY diff --git a/rally/openstack/common/db/sqlalchemy/models.py b/rally/openstack/common/db/sqlalchemy/models.py new file mode 100644 index 0000000000..8e8346c9f3 --- /dev/null +++ b/rally/openstack/common/db/sqlalchemy/models.py @@ -0,0 +1,108 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2011 X.commerce, a business unit of eBay Inc. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Piston Cloud Computing, Inc. +# Copyright 2012 Cloudscaling Group, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +SQLAlchemy models. +""" + +import six + +from sqlalchemy import Column, Integer +from sqlalchemy import DateTime +from sqlalchemy.orm import object_mapper + +from rally.openstack.common.db.sqlalchemy import session as sa +from rally.openstack.common import timeutils + + +class ModelBase(object): + """Base class for models.""" + __table_initialized__ = False + + def save(self, session=None): + """Save this object.""" + if not session: + session = sa.get_session() + # NOTE(boris-42): This part of code should be look like: + # sesssion.add(self) + # session.flush() + # But there is a bug in sqlalchemy and eventlet that + # raises NoneType exception if there is no running + # transaction and rollback is called. As long as + # sqlalchemy has this bug we have to create transaction + # explicity. + with session.begin(subtransactions=True): + session.add(self) + session.flush() + + def __setitem__(self, key, value): + setattr(self, key, value) + + def __getitem__(self, key): + return getattr(self, key) + + def get(self, key, default=None): + return getattr(self, key, default) + + def __iter__(self): + columns = dict(object_mapper(self).columns).keys() + # NOTE(russellb): Allow models to specify other keys that can be looked + # up, beyond the actual db columns. An example would be the 'name' + # property for an Instance. + if hasattr(self, '_extra_keys'): + columns.extend(self._extra_keys()) + self._i = iter(columns) + return self + + def next(self): + n = six.advance_iterator(self._i) + return n, getattr(self, n) + + def update(self, values): + """Make the model object behave like a dict.""" + for k, v in six.iteritems(values): + setattr(self, k, v) + + def iteritems(self): + """Make the model object behave like a dict. + + Includes attributes from joins. + """ + local = dict(self) + joined = dict([(k, v) for k, v in six.iteritems(self.__dict__) + if not k[0] == '_']) + local.update(joined) + return local.iteritems() + + +class TimestampMixin(object): + created_at = Column(DateTime, default=timeutils.utcnow) + updated_at = Column(DateTime, onupdate=timeutils.utcnow) + + +class SoftDeleteMixin(object): + deleted_at = Column(DateTime) + deleted = Column(Integer, default=0) + + def soft_delete(self, session=None): + """Mark this object as deleted.""" + self.deleted = self.id + self.deleted_at = timeutils.utcnow() + self.save(session=session) diff --git a/rally/openstack/common/db/sqlalchemy/session.py b/rally/openstack/common/db/sqlalchemy/session.py new file mode 100644 index 0000000000..0232fe279c --- /dev/null +++ b/rally/openstack/common/db/sqlalchemy/session.py @@ -0,0 +1,793 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Session Handling for SQLAlchemy backend. + +Initializing: + +* Call set_defaults with the minimal of the following kwargs: + sql_connection, sqlite_db + + Example: + + session.set_defaults( + sql_connection="sqlite:///var/lib/rally/sqlite.db", + sqlite_db="/var/lib/rally/sqlite.db") + +Recommended ways to use sessions within this framework: + +* Don't use them explicitly; this is like running with AUTOCOMMIT=1. + model_query() will implicitly use a session when called without one + supplied. This is the ideal situation because it will allow queries + to be automatically retried if the database connection is interrupted. + + Note: Automatic retry will be enabled in a future patch. + + It is generally fine to issue several queries in a row like this. Even though + they may be run in separate transactions and/or separate sessions, each one + will see the data from the prior calls. If needed, undo- or rollback-like + functionality should be handled at a logical level. For an example, look at + the code around quotas and reservation_rollback(). + + Examples: + + def get_foo(context, foo): + return model_query(context, models.Foo).\ + filter_by(foo=foo).\ + first() + + def update_foo(context, id, newfoo): + model_query(context, models.Foo).\ + filter_by(id=id).\ + update({'foo': newfoo}) + + def create_foo(context, values): + foo_ref = models.Foo() + foo_ref.update(values) + foo_ref.save() + return foo_ref + + +* Within the scope of a single method, keeping all the reads and writes within + the context managed by a single session. In this way, the session's __exit__ + handler will take care of calling flush() and commit() for you. + If using this approach, you should not explicitly call flush() or commit(). + Any error within the context of the session will cause the session to emit + a ROLLBACK. If the connection is dropped before this is possible, the + database will implicitly rollback the transaction. + + Note: statements in the session scope will not be automatically retried. + + If you create models within the session, they need to be added, but you + do not need to call model.save() + + def create_many_foo(context, foos): + session = get_session() + with session.begin(): + for foo in foos: + foo_ref = models.Foo() + foo_ref.update(foo) + session.add(foo_ref) + + def update_bar(context, foo_id, newbar): + session = get_session() + with session.begin(): + foo_ref = model_query(context, models.Foo, session).\ + filter_by(id=foo_id).\ + first() + model_query(context, models.Bar, session).\ + filter_by(id=foo_ref['bar_id']).\ + update({'bar': newbar}) + + Note: update_bar is a trivially simple example of using "with session.begin". + Whereas create_many_foo is a good example of when a transaction is needed, + it is always best to use as few queries as possible. The two queries in + update_bar can be better expressed using a single query which avoids + the need for an explicit transaction. It can be expressed like so: + + def update_bar(context, foo_id, newbar): + subq = model_query(context, models.Foo.id).\ + filter_by(id=foo_id).\ + limit(1).\ + subquery() + model_query(context, models.Bar).\ + filter_by(id=subq.as_scalar()).\ + update({'bar': newbar}) + + For reference, this emits approximagely the following SQL statement: + + UPDATE bar SET bar = ${newbar} + WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1); + +* Passing an active session between methods. Sessions should only be passed + to private methods. The private method must use a subtransaction; otherwise + SQLAlchemy will throw an error when you call session.begin() on an existing + transaction. Public methods should not accept a session parameter and should + not be involved in sessions within the caller's scope. + + Note that this incurs more overhead in SQLAlchemy than the above means + due to nesting transactions, and it is not possible to implicitly retry + failed database operations when using this approach. + + This also makes code somewhat more difficult to read and debug, because a + single database transaction spans more than one method. Error handling + becomes less clear in this situation. When this is needed for code clarity, + it should be clearly documented. + + def myfunc(foo): + session = get_session() + with session.begin(): + # do some database things + bar = _private_func(foo, session) + return bar + + def _private_func(foo, session=None): + if not session: + session = get_session() + with session.begin(subtransaction=True): + # do some other database things + return bar + + +There are some things which it is best to avoid: + +* Don't keep a transaction open any longer than necessary. + + This means that your "with session.begin()" block should be as short + as possible, while still containing all the related calls for that + transaction. + +* Avoid "with_lockmode('UPDATE')" when possible. + + In MySQL/InnoDB, when a "SELECT ... FOR UPDATE" query does not match + any rows, it will take a gap-lock. This is a form of write-lock on the + "gap" where no rows exist, and prevents any other writes to that space. + This can effectively prevent any INSERT into a table by locking the gap + at the end of the index. Similar problems will occur if the SELECT FOR UPDATE + has an overly broad WHERE clause, or doesn't properly use an index. + + One idea proposed at ODS Fall '12 was to use a normal SELECT to test the + number of rows matching a query, and if only one row is returned, + then issue the SELECT FOR UPDATE. + + The better long-term solution is to use INSERT .. ON DUPLICATE KEY UPDATE. + However, this can not be done until the "deleted" columns are removed and + proper UNIQUE constraints are added to the tables. + + +Enabling soft deletes: + +* To use/enable soft-deletes, the SoftDeleteMixin must be added + to your model class. For example: + + class NovaBase(models.SoftDeleteMixin, models.ModelBase): + pass + + +Efficient use of soft deletes: + +* There are two possible ways to mark a record as deleted: + model.soft_delete() and query.soft_delete(). + + model.soft_delete() method works with single already fetched entry. + query.soft_delete() makes only one db request for all entries that correspond + to query. + +* In almost all cases you should use query.soft_delete(). Some examples: + + def soft_delete_bar(): + count = model_query(BarModel).find(some_condition).soft_delete() + if count == 0: + raise Exception("0 entries were soft deleted") + + def complex_soft_delete_with_synchronization_bar(session=None): + if session is None: + session = get_session() + with session.begin(subtransactions=True): + count = model_query(BarModel).\ + find(some_condition).\ + soft_delete(synchronize_session=True) + # Here synchronize_session is required, because we + # don't know what is going on in outer session. + if count == 0: + raise Exception("0 entries were soft deleted") + +* There is only one situation where model.soft_delete() is appropriate: when + you fetch a single record, work with it, and mark it as deleted in the same + transaction. + + def soft_delete_bar_model(): + session = get_session() + with session.begin(): + bar_ref = model_query(BarModel).find(some_condition).first() + # Work with bar_ref + bar_ref.soft_delete(session=session) + + However, if you need to work with all entries that correspond to query and + then soft delete them you should use query.soft_delete() method: + + def soft_delete_multi_models(): + session = get_session() + with session.begin(): + query = model_query(BarModel, session=session).\ + find(some_condition) + model_refs = query.all() + # Work with model_refs + query.soft_delete(synchronize_session=False) + # synchronize_session=False should be set if there is no outer + # session and these entries are not used after this. + + When working with many rows, it is very important to use query.soft_delete, + which issues a single query. Using model.soft_delete(), as in the following + example, is very inefficient. + + for bar_ref in bar_refs: + bar_ref.soft_delete(session=session) + # This will produce count(bar_refs) db requests. +""" + +import functools +import os.path +import re +import time + +from eventlet import greenthread +from oslo.config import cfg +import six +from sqlalchemy import exc as sqla_exc +import sqlalchemy.interfaces +from sqlalchemy.interfaces import PoolListener +import sqlalchemy.orm +from sqlalchemy.pool import NullPool, StaticPool +from sqlalchemy.sql.expression import literal_column + +from rally.openstack.common.db import exception +from rally.openstack.common.gettextutils import _ # noqa +from rally.openstack.common import log as logging +from rally.openstack.common import timeutils + +sqlite_db_opts = [ + cfg.StrOpt('sqlite_db', + default='rally.sqlite', + help='the filename to use with sqlite'), + cfg.BoolOpt('sqlite_synchronous', + default=True, + help='If true, use synchronous mode for sqlite'), +] + +database_opts = [ + cfg.StrOpt('connection', + default='sqlite:///' + + os.path.abspath(os.path.join(os.path.dirname(__file__), + '../', '$sqlite_db')), + help='The SQLAlchemy connection string used to connect to the ' + 'database', + deprecated_opts=[cfg.DeprecatedOpt('sql_connection', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_connection', + group='DATABASE')]), + cfg.StrOpt('slave_connection', + default='', + help='The SQLAlchemy connection string used to connect to the ' + 'slave database'), + cfg.IntOpt('idle_timeout', + default=3600, + deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_idle_timeout', + group='DATABASE')], + help='timeout before idle sql connections are reaped'), + cfg.IntOpt('min_pool_size', + default=1, + deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_min_pool_size', + group='DATABASE')], + help='Minimum number of SQL connections to keep open in a ' + 'pool'), + cfg.IntOpt('max_pool_size', + default=None, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_max_pool_size', + group='DATABASE')], + help='Maximum number of SQL connections to keep open in a ' + 'pool'), + cfg.IntOpt('max_retries', + default=10, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_max_retries', + group='DATABASE')], + help='maximum db connection retries during startup. ' + '(setting -1 implies an infinite retry count)'), + cfg.IntOpt('retry_interval', + default=10, + deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval', + group='DEFAULT'), + cfg.DeprecatedOpt('reconnect_interval', + group='DATABASE')], + help='interval between retries of opening a sql connection'), + cfg.IntOpt('max_overflow', + default=None, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow', + group='DEFAULT'), + cfg.DeprecatedOpt('sqlalchemy_max_overflow', + group='DATABASE')], + help='If set, use this value for max_overflow with sqlalchemy'), + cfg.IntOpt('connection_debug', + default=0, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug', + group='DEFAULT')], + help='Verbosity of SQL debugging information. 0=None, ' + '100=Everything'), + cfg.BoolOpt('connection_trace', + default=False, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace', + group='DEFAULT')], + help='Add python stack traces to SQL as comment strings'), + cfg.IntOpt('pool_timeout', + default=None, + deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout', + group='DATABASE')], + help='If set, use this value for pool_timeout with sqlalchemy'), +] + +CONF = cfg.CONF +CONF.register_opts(sqlite_db_opts) +CONF.register_opts(database_opts, 'database') + +LOG = logging.getLogger(__name__) + +_ENGINE = None +_MAKER = None +_SLAVE_ENGINE = None +_SLAVE_MAKER = None + + +def set_defaults(sql_connection, sqlite_db, max_pool_size=None, + max_overflow=None, pool_timeout=None): + """Set defaults for configuration variables.""" + cfg.set_defaults(database_opts, + connection=sql_connection) + cfg.set_defaults(sqlite_db_opts, + sqlite_db=sqlite_db) + # Update the QueuePool defaults + if max_pool_size is not None: + cfg.set_defaults(database_opts, + max_pool_size=max_pool_size) + if max_overflow is not None: + cfg.set_defaults(database_opts, + max_overflow=max_overflow) + if pool_timeout is not None: + cfg.set_defaults(database_opts, + pool_timeout=pool_timeout) + + +def cleanup(): + global _ENGINE, _MAKER + global _SLAVE_ENGINE, _SLAVE_MAKER + + if _MAKER: + _MAKER.close_all() + _MAKER = None + if _ENGINE: + _ENGINE.dispose() + _ENGINE = None + if _SLAVE_MAKER: + _SLAVE_MAKER.close_all() + _SLAVE_MAKER = None + if _SLAVE_ENGINE: + _SLAVE_ENGINE.dispose() + _SLAVE_ENGINE = None + + +class SqliteForeignKeysListener(PoolListener): + """Ensures that the foreign key constraints are enforced in SQLite. + + The foreign key constraints are disabled by default in SQLite, + so the foreign key constraints will be enabled here for every + database connection + """ + def connect(self, dbapi_con, con_record): + dbapi_con.execute('pragma foreign_keys=ON') + + +def get_session(autocommit=True, expire_on_commit=False, + sqlite_fk=False, slave_session=False): + """Return a SQLAlchemy session.""" + global _MAKER + global _SLAVE_MAKER + maker = _MAKER + + if slave_session: + maker = _SLAVE_MAKER + + if maker is None: + engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session) + maker = get_maker(engine, autocommit, expire_on_commit) + + if slave_session: + _SLAVE_MAKER = maker + else: + _MAKER = maker + + session = maker() + return session + + +# note(boris-42): In current versions of DB backends unique constraint +# violation messages follow the structure: +# +# sqlite: +# 1 column - (IntegrityError) column c1 is not unique +# N columns - (IntegrityError) column c1, c2, ..., N are not unique +# +# postgres: +# 1 column - (IntegrityError) duplicate key value violates unique +# constraint "users_c1_key" +# N columns - (IntegrityError) duplicate key value violates unique +# constraint "name_of_our_constraint" +# +# mysql: +# 1 column - (IntegrityError) (1062, "Duplicate entry 'value_of_c1' for key +# 'c1'") +# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined +# with -' for key 'name_of_our_constraint'") +_DUP_KEY_RE_DB = { + "sqlite": re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), + "postgresql": re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"), + "mysql": re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$") +} + + +def _raise_if_duplicate_entry_error(integrity_error, engine_name): + """Raise exception if two entries are duplicated. + + In this function will be raised DBDuplicateEntry exception if integrity + error wrap unique constraint violation. + """ + + def get_columns_from_uniq_cons_or_name(columns): + # note(vsergeyev): UniqueConstraint name convention: "uniq_t0c10c2" + # where `t` it is table name and columns `c1`, `c2` + # are in UniqueConstraint. + uniqbase = "uniq_" + if not columns.startswith(uniqbase): + if engine_name == "postgresql": + return [columns[columns.index("_") + 1:columns.rindex("_")]] + return [columns] + return columns[len(uniqbase):].split("0")[1:] + + if engine_name not in ["mysql", "sqlite", "postgresql"]: + return + + # FIXME(johannes): The usage of the .message attribute has been + # deprecated since Python 2.6. However, the exceptions raised by + # SQLAlchemy can differ when using unicode() and accessing .message. + # An audit across all three supported engines will be necessary to + # ensure there are no regressions. + m = _DUP_KEY_RE_DB[engine_name].match(integrity_error.message) + if not m: + return + columns = m.group(1) + + if engine_name == "sqlite": + columns = columns.strip().split(", ") + else: + columns = get_columns_from_uniq_cons_or_name(columns) + raise exception.DBDuplicateEntry(columns, integrity_error) + + +# NOTE(comstud): In current versions of DB backends, Deadlock violation +# messages follow the structure: +# +# mysql: +# (OperationalError) (1213, 'Deadlock found when trying to get lock; try ' +# 'restarting transaction') +_DEADLOCK_RE_DB = { + "mysql": re.compile(r"^.*\(1213, 'Deadlock.*") +} + + +def _raise_if_deadlock_error(operational_error, engine_name): + """Raise exception on deadlock condition. + + Raise DBDeadlock exception if OperationalError contains a Deadlock + condition. + """ + re = _DEADLOCK_RE_DB.get(engine_name) + if re is None: + return + # FIXME(johannes): The usage of the .message attribute has been + # deprecated since Python 2.6. However, the exceptions raised by + # SQLAlchemy can differ when using unicode() and accessing .message. + # An audit across all three supported engines will be necessary to + # ensure there are no regressions. + m = re.match(operational_error.message) + if not m: + return + raise exception.DBDeadlock(operational_error) + + +def _wrap_db_error(f): + @functools.wraps(f) + def _wrap(*args, **kwargs): + try: + return f(*args, **kwargs) + except UnicodeEncodeError: + raise exception.DBInvalidUnicodeParameter() + # note(boris-42): We should catch unique constraint violation and + # wrap it by our own DBDuplicateEntry exception. Unique constraint + # violation is wrapped by IntegrityError. + except sqla_exc.OperationalError as e: + _raise_if_deadlock_error(e, get_engine().name) + # NOTE(comstud): A lot of code is checking for OperationalError + # so let's not wrap it for now. + raise + except sqla_exc.IntegrityError as e: + # note(boris-42): SqlAlchemy doesn't unify errors from different + # DBs so we must do this. Also in some tables (for example + # instance_types) there are more than one unique constraint. This + # means we should get names of columns, which values violate + # unique constraint, from error message. + _raise_if_duplicate_entry_error(e, get_engine().name) + raise exception.DBError(e) + except Exception as e: + LOG.exception(_('DB exception wrapped.')) + raise exception.DBError(e) + return _wrap + + +def get_engine(sqlite_fk=False, slave_engine=False): + """Return a SQLAlchemy engine.""" + global _ENGINE + global _SLAVE_ENGINE + engine = _ENGINE + db_uri = CONF.database.connection + + if slave_engine: + engine = _SLAVE_ENGINE + db_uri = CONF.database.slave_connection + + if engine is None: + engine = create_engine(db_uri, + sqlite_fk=sqlite_fk) + if slave_engine: + _SLAVE_ENGINE = engine + else: + _ENGINE = engine + + return engine + + +def _synchronous_switch_listener(dbapi_conn, connection_rec): + """Switch sqlite connections to non-synchronous mode.""" + dbapi_conn.execute("PRAGMA synchronous = OFF") + + +def _add_regexp_listener(dbapi_con, con_record): + """Add REGEXP function to sqlite connections.""" + + def regexp(expr, item): + reg = re.compile(expr) + return reg.search(six.text_type(item)) is not None + dbapi_con.create_function('regexp', 2, regexp) + + +def _greenthread_yield(dbapi_con, con_record): + """Ensure other greenthreads get a chance to be executed. + + Force a context switch. With common database backends (eg MySQLdb and + sqlite), there is no implicit yield caused by network I/O since they are + implemented by C libraries that eventlet cannot monkey patch. + """ + greenthread.sleep(0) + + +def _ping_listener(dbapi_conn, connection_rec, connection_proxy): + """Ensures that MySQL connections checked out of the pool are alive. + + Borrowed from: + http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f + """ + try: + dbapi_conn.cursor().execute('select 1') + except dbapi_conn.OperationalError as ex: + if ex.args[0] in (2006, 2013, 2014, 2045, 2055): + LOG.warn(_('Got mysql server has gone away: %s'), ex) + raise sqla_exc.DisconnectionError("Database server went away") + else: + raise + + +def _is_db_connection_error(args): + """Return True if error in connecting to db.""" + # NOTE(adam_g): This is currently MySQL specific and needs to be extended + # to support Postgres and others. + conn_err_codes = ('2002', '2003', '2006') + for err_code in conn_err_codes: + if args.find(err_code) != -1: + return True + return False + + +def create_engine(sql_connection, sqlite_fk=False): + """Return a new SQLAlchemy engine.""" + # NOTE(geekinutah): At this point we could be connecting to the normal + # db handle or the slave db handle. Things like + # _wrap_db_error aren't going to work well if their + # backends don't match. Let's check. + _assert_matching_drivers() + connection_dict = sqlalchemy.engine.url.make_url(sql_connection) + + engine_args = { + "pool_recycle": CONF.database.idle_timeout, + "echo": False, + 'convert_unicode': True, + } + + # Map our SQL debug level to SQLAlchemy's options + if CONF.database.connection_debug >= 100: + engine_args['echo'] = 'debug' + elif CONF.database.connection_debug >= 50: + engine_args['echo'] = True + + if "sqlite" in connection_dict.drivername: + if sqlite_fk: + engine_args["listeners"] = [SqliteForeignKeysListener()] + engine_args["poolclass"] = NullPool + + if CONF.database.connection == "sqlite://": + engine_args["poolclass"] = StaticPool + engine_args["connect_args"] = {'check_same_thread': False} + else: + if CONF.database.max_pool_size is not None: + engine_args['pool_size'] = CONF.database.max_pool_size + if CONF.database.max_overflow is not None: + engine_args['max_overflow'] = CONF.database.max_overflow + if CONF.database.pool_timeout is not None: + engine_args['pool_timeout'] = CONF.database.pool_timeout + + engine = sqlalchemy.create_engine(sql_connection, **engine_args) + + sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield) + + if 'mysql' in connection_dict.drivername: + sqlalchemy.event.listen(engine, 'checkout', _ping_listener) + elif 'sqlite' in connection_dict.drivername: + if not CONF.sqlite_synchronous: + sqlalchemy.event.listen(engine, 'connect', + _synchronous_switch_listener) + sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener) + + if (CONF.database.connection_trace and + engine.dialect.dbapi.__name__ == 'MySQLdb'): + _patch_mysqldb_with_stacktrace_comments() + + try: + engine.connect() + except sqla_exc.OperationalError as e: + if not _is_db_connection_error(e.args[0]): + raise + + remaining = CONF.database.max_retries + if remaining == -1: + remaining = 'infinite' + while True: + msg = _('SQL connection failed. %s attempts left.') + LOG.warn(msg % remaining) + if remaining != 'infinite': + remaining -= 1 + time.sleep(CONF.database.retry_interval) + try: + engine.connect() + break + except sqla_exc.OperationalError as e: + if (remaining != 'infinite' and remaining == 0) or \ + not _is_db_connection_error(e.args[0]): + raise + return engine + + +class Query(sqlalchemy.orm.query.Query): + """Subclass of sqlalchemy.query with soft_delete() method.""" + def soft_delete(self, synchronize_session='evaluate'): + return self.update({'deleted': literal_column('id'), + 'updated_at': literal_column('updated_at'), + 'deleted_at': timeutils.utcnow()}, + synchronize_session=synchronize_session) + + +class Session(sqlalchemy.orm.session.Session): + """Custom Session class to avoid SqlAlchemy Session monkey patching.""" + @_wrap_db_error + def query(self, *args, **kwargs): + return super(Session, self).query(*args, **kwargs) + + @_wrap_db_error + def flush(self, *args, **kwargs): + return super(Session, self).flush(*args, **kwargs) + + @_wrap_db_error + def execute(self, *args, **kwargs): + return super(Session, self).execute(*args, **kwargs) + + +def get_maker(engine, autocommit=True, expire_on_commit=False): + """Return a SQLAlchemy sessionmaker using the given engine.""" + return sqlalchemy.orm.sessionmaker(bind=engine, + class_=Session, + autocommit=autocommit, + expire_on_commit=expire_on_commit, + query_cls=Query) + + +def _patch_mysqldb_with_stacktrace_comments(): + """Adds current stack trace as a comment in queries. + + Patches MySQLdb.cursors.BaseCursor._do_query. + """ + import MySQLdb.cursors + import traceback + + old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query + + def _do_query(self, q): + stack = '' + for file, line, method, function in traceback.extract_stack(): + # exclude various common things from trace + if file.endswith('session.py') and method == '_do_query': + continue + if file.endswith('api.py') and method == 'wrapper': + continue + if file.endswith('utils.py') and method == '_inner': + continue + if file.endswith('exception.py') and method == '_wrap': + continue + # db/api is just a wrapper around db/sqlalchemy/api + if file.endswith('db/api.py'): + continue + # only trace inside rally + index = file.rfind('rally') + if index == -1: + continue + stack += "File:%s:%s Method:%s() Line:%s | " \ + % (file[index:], line, method, function) + + # strip trailing " | " from stack + if stack: + stack = stack[:-3] + qq = "%s /* %s */" % (q, stack) + else: + qq = q + old_mysql_do_query(self, qq) + + setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query) + + +def _assert_matching_drivers(): + """Make sure slave handle and normal handle have the same driver.""" + # NOTE(geekinutah): There's no use case for writing to one backend and + # reading from another. Who knows what the future holds? + if CONF.database.slave_connection == '': + return + + normal = sqlalchemy.engine.url.make_url(CONF.database.connection) + slave = sqlalchemy.engine.url.make_url(CONF.database.slave_connection) + assert normal.drivername == slave.drivername diff --git a/rally/openstack/common/db/sqlalchemy/test_migrations.py b/rally/openstack/common/db/sqlalchemy/test_migrations.py new file mode 100644 index 0000000000..a7a4794fe5 --- /dev/null +++ b/rally/openstack/common/db/sqlalchemy/test_migrations.py @@ -0,0 +1,289 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010-2011 OpenStack Foundation +# Copyright 2012-2013 IBM Corp. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import commands +import ConfigParser +import os +import urlparse + +import sqlalchemy +import sqlalchemy.exc + +from rally.openstack.common import lockutils +from rally.openstack.common import log as logging +from rally.openstack.common import test + +LOG = logging.getLogger(__name__) + + +def _get_connect_string(backend, user, passwd, database): + """Get database connection + + Try to get a connection with a very specific set of values, if we get + these then we'll run the tests, otherwise they are skipped + """ + if backend == "postgres": + backend = "postgresql+psycopg2" + elif backend == "mysql": + backend = "mysql+mysqldb" + else: + raise Exception("Unrecognized backend: '%s'" % backend) + + return ("%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" + % {'backend': backend, 'user': user, 'passwd': passwd, + 'database': database}) + + +def _is_backend_avail(backend, user, passwd, database): + try: + connect_uri = _get_connect_string(backend, user, passwd, database) + engine = sqlalchemy.create_engine(connect_uri) + connection = engine.connect() + except Exception: + # intentionally catch all to handle exceptions even if we don't + # have any backend code loaded. + return False + else: + connection.close() + engine.dispose() + return True + + +def _have_mysql(user, passwd, database): + present = os.environ.get('TEST_MYSQL_PRESENT') + if present is None: + return _is_backend_avail('mysql', user, passwd, database) + return present.lower() in ('', 'true') + + +def _have_postgresql(user, passwd, database): + present = os.environ.get('TEST_POSTGRESQL_PRESENT') + if present is None: + return _is_backend_avail('postgres', user, passwd, database) + return present.lower() in ('', 'true') + + +def get_db_connection_info(conn_pieces): + database = conn_pieces.path.strip('/') + loc_pieces = conn_pieces.netloc.split('@') + host = loc_pieces[1] + + auth_pieces = loc_pieces[0].split(':') + user = auth_pieces[0] + password = "" + if len(auth_pieces) > 1: + password = auth_pieces[1].strip() + + return (user, password, database, host) + + +class BaseMigrationTestCase(test.BaseTestCase): + """Base class fort testing of migration utils.""" + + def __init__(self, *args, **kwargs): + super(BaseMigrationTestCase, self).__init__(*args, **kwargs) + + self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__), + 'test_migrations.conf') + # Test machines can set the TEST_MIGRATIONS_CONF variable + # to override the location of the config file for migration testing + self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF', + self.DEFAULT_CONFIG_FILE) + self.test_databases = {} + self.migration_api = None + + def setUp(self): + super(BaseMigrationTestCase, self).setUp() + + # Load test databases from the config file. Only do this + # once. No need to re-run this on each test... + LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH) + if os.path.exists(self.CONFIG_FILE_PATH): + cp = ConfigParser.RawConfigParser() + try: + cp.read(self.CONFIG_FILE_PATH) + defaults = cp.defaults() + for key, value in defaults.items(): + self.test_databases[key] = value + except ConfigParser.ParsingError as e: + self.fail("Failed to read test_migrations.conf config " + "file. Got error: %s" % e) + else: + self.fail("Failed to find test_migrations.conf config " + "file.") + + self.engines = {} + for key, value in self.test_databases.items(): + self.engines[key] = sqlalchemy.create_engine(value) + + # We start each test case with a completely blank slate. + self._reset_databases() + + def tearDown(self): + # We destroy the test data store between each test case, + # and recreate it, which ensures that we have no side-effects + # from the tests + self._reset_databases() + super(BaseMigrationTestCase, self).tearDown() + + def execute_cmd(self, cmd=None): + status, output = commands.getstatusoutput(cmd) + LOG.debug(output) + self.assertEqual(0, status, + "Failed to run: %s\n%s" % (cmd, output)) + + @lockutils.synchronized('pgadmin', 'tests-', external=True) + def _reset_pg(self, conn_pieces): + (user, password, database, host) = get_db_connection_info(conn_pieces) + os.environ['PGPASSWORD'] = password + os.environ['PGUSER'] = user + # note(boris-42): We must create and drop database, we can't + # drop database which we have connected to, so for such + # operations there is a special database template1. + sqlcmd = ("psql -w -U %(user)s -h %(host)s -c" + " '%(sql)s' -d template1") + + sql = ("drop database if exists %s;") % database + droptable = sqlcmd % {'user': user, 'host': host, 'sql': sql} + self.execute_cmd(droptable) + + sql = ("create database %s;") % database + createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql} + self.execute_cmd(createtable) + + os.unsetenv('PGPASSWORD') + os.unsetenv('PGUSER') + + def _reset_databases(self): + for key, engine in self.engines.items(): + conn_string = self.test_databases[key] + conn_pieces = urlparse.urlparse(conn_string) + engine.dispose() + if conn_string.startswith('sqlite'): + # We can just delete the SQLite database, which is + # the easiest and cleanest solution + db_path = conn_pieces.path.strip('/') + if os.path.exists(db_path): + os.unlink(db_path) + # No need to recreate the SQLite DB. SQLite will + # create it for us if it's not there... + elif conn_string.startswith('mysql'): + # We can execute the MySQL client to destroy and re-create + # the MYSQL database, which is easier and less error-prone + # than using SQLAlchemy to do this via MetaData...trust me. + (user, password, database, host) = \ + get_db_connection_info(conn_pieces) + sql = ("drop database if exists %(db)s; " + "create database %(db)s;") % {'db': database} + cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s " + "-e \"%(sql)s\"") % {'user': user, 'password': password, + 'host': host, 'sql': sql} + self.execute_cmd(cmd) + elif conn_string.startswith('postgresql'): + self._reset_pg(conn_pieces) + + +class WalkVersionsMixin(object): + def _walk_versions(self, engine=None, snake_walk=False, downgrade=True): + # Determine latest version script from the repo, then + # upgrade from 1 through to the latest, with no data + # in the databases. This just checks that the schema itself + # upgrades successfully. + + # Place the database under version control + self.migration_api.version_control(engine, self.REPOSITORY, + self.INIT_VERSION) + self.assertEqual(self.INIT_VERSION, + self.migration_api.db_version(engine, + self.REPOSITORY)) + + LOG.debug('latest version is %s' % self.REPOSITORY.latest) + versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1) + + for version in versions: + # upgrade -> downgrade -> upgrade + self._migrate_up(engine, version, with_data=True) + if snake_walk: + downgraded = self._migrate_down( + engine, version - 1, with_data=True) + if downgraded: + self._migrate_up(engine, version) + + if downgrade: + # Now walk it back down to 0 from the latest, testing + # the downgrade paths. + for version in reversed(versions): + # downgrade -> upgrade -> downgrade + downgraded = self._migrate_down(engine, version - 1) + + if snake_walk and downgraded: + self._migrate_up(engine, version) + self._migrate_down(engine, version - 1) + + def _migrate_down(self, engine, version, with_data=False): + try: + self.migration_api.downgrade(engine, self.REPOSITORY, version) + except NotImplementedError: + # NOTE(sirp): some migrations, namely release-level + # migrations, don't support a downgrade. + return False + + self.assertEqual( + version, self.migration_api.db_version(engine, self.REPOSITORY)) + + # NOTE(sirp): `version` is what we're downgrading to (i.e. the 'target' + # version). So if we have any downgrade checks, they need to be run for + # the previous (higher numbered) migration. + if with_data: + post_downgrade = getattr( + self, "_post_downgrade_%03d" % (version + 1), None) + if post_downgrade: + post_downgrade(engine) + + return True + + def _migrate_up(self, engine, version, with_data=False): + """migrate up to a new version of the db. + + We allow for data insertion and post checks at every + migration version with special _pre_upgrade_### and + _check_### functions in the main test. + """ + # NOTE(sdague): try block is here because it's impossible to debug + # where a failed data migration happens otherwise + try: + if with_data: + data = None + pre_upgrade = getattr( + self, "_pre_upgrade_%03d" % version, None) + if pre_upgrade: + data = pre_upgrade(engine) + + self.migration_api.upgrade(engine, self.REPOSITORY, version) + self.assertEqual(version, + self.migration_api.db_version(engine, + self.REPOSITORY)) + if with_data: + check = getattr(self, "_check_%03d" % version, None) + if check: + check(engine, data) + except Exception: + LOG.error("Failed to migrate to version %s on engine %s" % + (version, engine)) + raise diff --git a/rally/openstack/common/db/sqlalchemy/utils.py b/rally/openstack/common/db/sqlalchemy/utils.py new file mode 100644 index 0000000000..82671b94a8 --- /dev/null +++ b/rally/openstack/common/db/sqlalchemy/utils.py @@ -0,0 +1,501 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2010-2011 OpenStack Foundation. +# Copyright 2012 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re + +from migrate.changeset import UniqueConstraint +import sqlalchemy +from sqlalchemy import Boolean +from sqlalchemy import CheckConstraint +from sqlalchemy import Column +from sqlalchemy.engine import reflection +from sqlalchemy.ext.compiler import compiles +from sqlalchemy import func +from sqlalchemy import Index +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy.sql.expression import literal_column +from sqlalchemy.sql.expression import UpdateBase +from sqlalchemy.sql import select +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy.types import NullType + +from rally.openstack.common.gettextutils import _ # noqa + +from rally.openstack.common import log as logging +from rally.openstack.common import timeutils + + +LOG = logging.getLogger(__name__) + +_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+") + + +def sanitize_db_url(url): + match = _DBURL_REGEX.match(url) + if match: + return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):]) + return url + + +class InvalidSortKey(Exception): + message = _("Sort key supplied was not valid.") + + +# copy from glance/db/sqlalchemy/api.py +def paginate_query(query, model, limit, sort_keys, marker=None, + sort_dir=None, sort_dirs=None): + """Returns a query with sorting / pagination criteria added. + + Pagination works by requiring a unique sort_key, specified by sort_keys. + (If sort_keys is not unique, then we risk looping through values.) + We use the last row in the previous page as the 'marker' for pagination. + So we must return values that follow the passed marker in the order. + With a single-valued sort_key, this would be easy: sort_key > X. + With a compound-values sort_key, (k1, k2, k3) we must do this to repeat + the lexicographical ordering: + (k1 > X1) or (k1 == X1 && k2 > X2) or (k1 == X1 && k2 == X2 && k3 > X3) + + We also have to cope with different sort_directions. + + Typically, the id of the last row is used as the client-facing pagination + marker, then the actual marker object must be fetched from the db and + passed in to us as marker. + + :param query: the query object to which we should add paging/sorting + :param model: the ORM model class + :param limit: maximum number of items to return + :param sort_keys: array of attributes by which results should be sorted + :param marker: the last item of the previous page; we returns the next + results after this value. + :param sort_dir: direction in which results should be sorted (asc, desc) + :param sort_dirs: per-column array of sort_dirs, corresponding to sort_keys + + :rtype: sqlalchemy.orm.query.Query + :return: The query with sorting/pagination added. + """ + + if 'id' not in sort_keys: + # TODO(justinsb): If this ever gives a false-positive, check + # the actual primary key, rather than assuming its id + LOG.warn(_('Id not in sort_keys; is sort_keys unique?')) + + assert(not (sort_dir and sort_dirs)) + + # Default the sort direction to ascending + if sort_dirs is None and sort_dir is None: + sort_dir = 'asc' + + # Ensure a per-column sort direction + if sort_dirs is None: + sort_dirs = [sort_dir for _sort_key in sort_keys] + + assert(len(sort_dirs) == len(sort_keys)) + + # Add sorting + for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): + try: + sort_dir_func = { + 'asc': sqlalchemy.asc, + 'desc': sqlalchemy.desc, + }[current_sort_dir] + except KeyError: + raise ValueError(_("Unknown sort direction, " + "must be 'desc' or 'asc'")) + try: + sort_key_attr = getattr(model, current_sort_key) + except AttributeError: + raise InvalidSortKey() + query = query.order_by(sort_dir_func(sort_key_attr)) + + # Add pagination + if marker is not None: + marker_values = [] + for sort_key in sort_keys: + v = getattr(marker, sort_key) + marker_values.append(v) + + # Build up an array of sort criteria as in the docstring + criteria_list = [] + for i in range(0, len(sort_keys)): + crit_attrs = [] + for j in range(0, i): + model_attr = getattr(model, sort_keys[j]) + crit_attrs.append((model_attr == marker_values[j])) + + model_attr = getattr(model, sort_keys[i]) + if sort_dirs[i] == 'desc': + crit_attrs.append((model_attr < marker_values[i])) + else: + crit_attrs.append((model_attr > marker_values[i])) + + criteria = sqlalchemy.sql.and_(*crit_attrs) + criteria_list.append(criteria) + + f = sqlalchemy.sql.or_(*criteria_list) + query = query.filter(f) + + if limit is not None: + query = query.limit(limit) + + return query + + +def get_table(engine, name): + """Returns an sqlalchemy table dynamically from db. + + Needed because the models don't work for us in migrations + as models will be far out of sync with the current data. + """ + metadata = MetaData() + metadata.bind = engine + return Table(name, metadata, autoload=True) + + +class InsertFromSelect(UpdateBase): + """Form the base for `INSERT INTO table (SELECT ... )` statement.""" + def __init__(self, table, select): + self.table = table + self.select = select + + +@compiles(InsertFromSelect) +def visit_insert_from_select(element, compiler, **kw): + """Form the `INSERT INTO table (SELECT ... )` statement.""" + return "INSERT INTO %s %s" % ( + compiler.process(element.table, asfrom=True), + compiler.process(element.select)) + + +class ColumnError(Exception): + """Error raised when no column or an invalid column is found.""" + + +def _get_not_supported_column(col_name_col_instance, column_name): + try: + column = col_name_col_instance[column_name] + except KeyError: + msg = _("Please specify column %s in col_name_col_instance " + "param. It is required because column has unsupported " + "type by sqlite).") + raise ColumnError(msg % column_name) + + if not isinstance(column, Column): + msg = _("col_name_col_instance param has wrong type of " + "column instance for column %s It should be instance " + "of sqlalchemy.Column.") + raise ColumnError(msg % column_name) + return column + + +def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns, + **col_name_col_instance): + """Drop unique constraint from table. + + This method drops UC from table and works for mysql, postgresql and sqlite. + In mysql and postgresql we are able to use "alter table" construction. + Sqlalchemy doesn't support some sqlite column types and replaces their + type with NullType in metadata. We process these columns and replace + NullType with the correct column type. + + :param migrate_engine: sqlalchemy engine + :param table_name: name of table that contains uniq constraint. + :param uc_name: name of uniq constraint that will be dropped. + :param columns: columns that are in uniq constraint. + :param col_name_col_instance: contains pair column_name=column_instance. + column_instance is instance of Column. These params + are required only for columns that have unsupported + types by sqlite. For example BigInteger. + """ + + meta = MetaData() + meta.bind = migrate_engine + t = Table(table_name, meta, autoload=True) + + if migrate_engine.name == "sqlite": + override_cols = [ + _get_not_supported_column(col_name_col_instance, col.name) + for col in t.columns + if isinstance(col.type, NullType) + ] + for col in override_cols: + t.columns.replace(col) + + uc = UniqueConstraint(*columns, table=t, name=uc_name) + uc.drop() + + +def drop_old_duplicate_entries_from_table(migrate_engine, table_name, + use_soft_delete, *uc_column_names): + """Drop all old rows having the same values for columns in uc_columns. + + This method drop (or mark ad `deleted` if use_soft_delete is True) old + duplicate rows form table with name `table_name`. + + :param migrate_engine: Sqlalchemy engine + :param table_name: Table with duplicates + :param use_soft_delete: If True - values will be marked as `deleted`, + if False - values will be removed from table + :param uc_column_names: Unique constraint columns + """ + meta = MetaData() + meta.bind = migrate_engine + + table = Table(table_name, meta, autoload=True) + columns_for_group_by = [table.c[name] for name in uc_column_names] + + columns_for_select = [func.max(table.c.id)] + columns_for_select.extend(columns_for_group_by) + + duplicated_rows_select = select(columns_for_select, + group_by=columns_for_group_by, + having=func.count(table.c.id) > 1) + + for row in migrate_engine.execute(duplicated_rows_select): + # NOTE(boris-42): Do not remove row that has the biggest ID. + delete_condition = table.c.id != row[0] + is_none = None # workaround for pyflakes + delete_condition &= table.c.deleted_at == is_none + for name in uc_column_names: + delete_condition &= table.c[name] == row[name] + + rows_to_delete_select = select([table.c.id]).where(delete_condition) + for row in migrate_engine.execute(rows_to_delete_select).fetchall(): + LOG.info(_("Deleting duplicated row with id: %(id)s from table: " + "%(table)s") % dict(id=row[0], table=table_name)) + + if use_soft_delete: + delete_statement = table.update().\ + where(delete_condition).\ + values({ + 'deleted': literal_column('id'), + 'updated_at': literal_column('updated_at'), + 'deleted_at': timeutils.utcnow() + }) + else: + delete_statement = table.delete().where(delete_condition) + migrate_engine.execute(delete_statement) + + +def _get_default_deleted_value(table): + if isinstance(table.c.id.type, Integer): + return 0 + if isinstance(table.c.id.type, String): + return "" + raise ColumnError(_("Unsupported id columns type")) + + +def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes): + table = get_table(migrate_engine, table_name) + + insp = reflection.Inspector.from_engine(migrate_engine) + real_indexes = insp.get_indexes(table_name) + existing_index_names = dict( + [(index['name'], index['column_names']) for index in real_indexes]) + + # NOTE(boris-42): Restore indexes on `deleted` column + for index in indexes: + if 'deleted' not in index['column_names']: + continue + name = index['name'] + if name in existing_index_names: + column_names = [table.c[c] for c in existing_index_names[name]] + old_index = Index(name, *column_names, unique=index["unique"]) + old_index.drop(migrate_engine) + + column_names = [table.c[c] for c in index['column_names']] + new_index = Index(index["name"], *column_names, unique=index["unique"]) + new_index.create(migrate_engine) + + +def change_deleted_column_type_to_boolean(migrate_engine, table_name, + **col_name_col_instance): + if migrate_engine.name == "sqlite": + return _change_deleted_column_type_to_boolean_sqlite( + migrate_engine, table_name, **col_name_col_instance) + insp = reflection.Inspector.from_engine(migrate_engine) + indexes = insp.get_indexes(table_name) + + table = get_table(migrate_engine, table_name) + + old_deleted = Column('old_deleted', Boolean, default=False) + old_deleted.create(table, populate_default=False) + + table.update().\ + where(table.c.deleted == table.c.id).\ + values(old_deleted=True).\ + execute() + + table.c.deleted.drop() + table.c.old_deleted.alter(name="deleted") + + _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes) + + +def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name, + **col_name_col_instance): + insp = reflection.Inspector.from_engine(migrate_engine) + table = get_table(migrate_engine, table_name) + + columns = [] + for column in table.columns: + column_copy = None + if column.name != "deleted": + if isinstance(column.type, NullType): + column_copy = _get_not_supported_column(col_name_col_instance, + column.name) + else: + column_copy = column.copy() + else: + column_copy = Column('deleted', Boolean, default=0) + columns.append(column_copy) + + constraints = [constraint.copy() for constraint in table.constraints] + + meta = table.metadata + new_table = Table(table_name + "__tmp__", meta, + *(columns + constraints)) + new_table.create() + + indexes = [] + for index in insp.get_indexes(table_name): + column_names = [new_table.c[c] for c in index['column_names']] + indexes.append(Index(index["name"], *column_names, + unique=index["unique"])) + + c_select = [] + for c in table.c: + if c.name != "deleted": + c_select.append(c) + else: + c_select.append(table.c.deleted == table.c.id) + + ins = InsertFromSelect(new_table, select(c_select)) + migrate_engine.execute(ins) + + table.drop() + [index.create(migrate_engine) for index in indexes] + + new_table.rename(table_name) + new_table.update().\ + where(new_table.c.deleted == new_table.c.id).\ + values(deleted=True).\ + execute() + + +def change_deleted_column_type_to_id_type(migrate_engine, table_name, + **col_name_col_instance): + if migrate_engine.name == "sqlite": + return _change_deleted_column_type_to_id_type_sqlite( + migrate_engine, table_name, **col_name_col_instance) + insp = reflection.Inspector.from_engine(migrate_engine) + indexes = insp.get_indexes(table_name) + + table = get_table(migrate_engine, table_name) + + new_deleted = Column('new_deleted', table.c.id.type, + default=_get_default_deleted_value(table)) + new_deleted.create(table, populate_default=True) + + deleted = True # workaround for pyflakes + table.update().\ + where(table.c.deleted == deleted).\ + values(new_deleted=table.c.id).\ + execute() + table.c.deleted.drop() + table.c.new_deleted.alter(name="deleted") + + _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes) + + +def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name, + **col_name_col_instance): + # NOTE(boris-42): sqlaclhemy-migrate can't drop column with check + # constraints in sqlite DB and our `deleted` column has + # 2 check constraints. So there is only one way to remove + # these constraints: + # 1) Create new table with the same columns, constraints + # and indexes. (except deleted column). + # 2) Copy all data from old to new table. + # 3) Drop old table. + # 4) Rename new table to old table name. + insp = reflection.Inspector.from_engine(migrate_engine) + meta = MetaData(bind=migrate_engine) + table = Table(table_name, meta, autoload=True) + default_deleted_value = _get_default_deleted_value(table) + + columns = [] + for column in table.columns: + column_copy = None + if column.name != "deleted": + if isinstance(column.type, NullType): + column_copy = _get_not_supported_column(col_name_col_instance, + column.name) + else: + column_copy = column.copy() + else: + column_copy = Column('deleted', table.c.id.type, + default=default_deleted_value) + columns.append(column_copy) + + def is_deleted_column_constraint(constraint): + # NOTE(boris-42): There is no other way to check is CheckConstraint + # associated with deleted column. + if not isinstance(constraint, CheckConstraint): + return False + sqltext = str(constraint.sqltext) + return (sqltext.endswith("deleted in (0, 1)") or + sqltext.endswith("deleted IN (:deleted_1, :deleted_2)")) + + constraints = [] + for constraint in table.constraints: + if not is_deleted_column_constraint(constraint): + constraints.append(constraint.copy()) + + new_table = Table(table_name + "__tmp__", meta, + *(columns + constraints)) + new_table.create() + + indexes = [] + for index in insp.get_indexes(table_name): + column_names = [new_table.c[c] for c in index['column_names']] + indexes.append(Index(index["name"], *column_names, + unique=index["unique"])) + + ins = InsertFromSelect(new_table, table.select()) + migrate_engine.execute(ins) + + table.drop() + [index.create(migrate_engine) for index in indexes] + + new_table.rename(table_name) + deleted = True # workaround for pyflakes + new_table.update().\ + where(new_table.c.deleted == deleted).\ + values(deleted=new_table.c.id).\ + execute() + + # NOTE(boris-42): Fix value of deleted column: False -> "" or 0. + deleted = False # workaround for pyflakes + new_table.update().\ + where(new_table.c.deleted == deleted).\ + values(deleted=default_deleted_value).\ + execute() diff --git a/rally/openstack/common/excutils.py b/rally/openstack/common/excutils.py new file mode 100644 index 0000000000..58cbb139fb --- /dev/null +++ b/rally/openstack/common/excutils.py @@ -0,0 +1,99 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation. +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Exception related utilities. +""" + +import logging +import sys +import time +import traceback + +from rally.openstack.common.gettextutils import _ # noqa + + +class save_and_reraise_exception(object): + """Save current exception, run some code and then re-raise. + + In some cases the exception context can be cleared, resulting in None + being attempted to be re-raised after an exception handler is run. This + can happen when eventlet switches greenthreads or when running an + exception handler, code raises and catches an exception. In both + cases the exception context will be cleared. + + To work around this, we save the exception state, run handler code, and + then re-raise the original exception. If another exception occurs, the + saved exception is logged and the new exception is re-raised. + + In some cases the caller may not want to re-raise the exception, and + for those circumstances this context provides a reraise flag that + can be used to suppress the exception. For example: + + except Exception: + with save_and_reraise_exception() as ctxt: + decide_if_need_reraise() + if not should_be_reraised: + ctxt.reraise = False + """ + def __init__(self): + self.reraise = True + + def __enter__(self): + self.type_, self.value, self.tb, = sys.exc_info() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + logging.error(_('Original exception being dropped: %s'), + traceback.format_exception(self.type_, + self.value, + self.tb)) + return False + if self.reraise: + raise self.type_, self.value, self.tb + + +def forever_retry_uncaught_exceptions(infunc): + def inner_func(*args, **kwargs): + last_log_time = 0 + last_exc_message = None + exc_count = 0 + while True: + try: + return infunc(*args, **kwargs) + except Exception as exc: + this_exc_message = unicode(exc) + if this_exc_message == last_exc_message: + exc_count += 1 + else: + exc_count = 1 + # Do not log any more frequently than once a minute unless + # the exception message changes + cur_time = int(time.time()) + if (cur_time - last_log_time > 60 or + this_exc_message != last_exc_message): + logging.exception( + _('Unexpected exception occurred %d time(s)... ' + 'retrying.') % exc_count) + last_log_time = cur_time + last_exc_message = this_exc_message + exc_count = 0 + # This should be a very rare event. In case it isn't, do + # a sleep. + time.sleep(1) + return inner_func diff --git a/rally/openstack/common/fileutils.py b/rally/openstack/common/fileutils.py new file mode 100644 index 0000000000..943050a9e9 --- /dev/null +++ b/rally/openstack/common/fileutils.py @@ -0,0 +1,110 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import contextlib +import errno +import os + +from rally.openstack.common import excutils +from rally.openstack.common.gettextutils import _ # noqa +from rally.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + +_FILE_CACHE = {} + + +def ensure_tree(path): + """Create a directory (and any ancestor directories required) + + :param path: Directory to create + """ + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST: + if not os.path.isdir(path): + raise + else: + raise + + +def read_cached_file(filename, force_reload=False): + """Read from a file if it has been modified. + + :param force_reload: Whether to reload the file. + :returns: A tuple with a boolean specifying if the data is fresh + or not. + """ + global _FILE_CACHE + + if force_reload and filename in _FILE_CACHE: + del _FILE_CACHE[filename] + + reloaded = False + mtime = os.path.getmtime(filename) + cache_info = _FILE_CACHE.setdefault(filename, {}) + + if not cache_info or mtime > cache_info.get('mtime', 0): + LOG.debug(_("Reloading cached file %s") % filename) + with open(filename) as fap: + cache_info['data'] = fap.read() + cache_info['mtime'] = mtime + reloaded = True + return (reloaded, cache_info['data']) + + +def delete_if_exists(path): + """Delete a file, but ignore file not found error. + + :param path: File to delete + """ + + try: + os.unlink(path) + except OSError as e: + if e.errno == errno.ENOENT: + return + else: + raise + + +@contextlib.contextmanager +def remove_path_on_error(path): + """Protect code that wants to operate on PATH atomically. + Any exception will cause PATH to be removed. + + :param path: File to work with + """ + try: + yield + except Exception: + with excutils.save_and_reraise_exception(): + delete_if_exists(path) + + +def file_open(*args, **kwargs): + """Open file + + see built-in file() documentation for more details + + Note: The reason this is kept in a separate module is to easily + be able to provide a stub module that doesn't alter system + state at all (for unit tests) + """ + return file(*args, **kwargs) diff --git a/rally/openstack/common/lockutils.py b/rally/openstack/common/lockutils.py new file mode 100644 index 0000000000..954338dfb1 --- /dev/null +++ b/rally/openstack/common/lockutils.py @@ -0,0 +1,276 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import contextlib +import errno +import functools +import os +import time +import weakref + +from eventlet import semaphore +from oslo.config import cfg + +from rally.openstack.common import fileutils +from rally.openstack.common.gettextutils import _ # noqa +from rally.openstack.common import local +from rally.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +util_opts = [ + cfg.BoolOpt('disable_process_locking', default=False, + help='Whether to disable inter-process locks'), + cfg.StrOpt('lock_path', + help=('Directory to use for lock files.')) +] + + +CONF = cfg.CONF +CONF.register_opts(util_opts) + + +def set_defaults(lock_path): + cfg.set_defaults(util_opts, lock_path=lock_path) + + +class _InterProcessLock(object): + """Lock implementation which allows multiple locks, working around + issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does + not require any cleanup. Since the lock is always held on a file + descriptor rather than outside of the process, the lock gets dropped + automatically if the process crashes, even if __exit__ is not executed. + + There are no guarantees regarding usage by multiple green threads in a + single process here. This lock works only between processes. Exclusive + access between local threads should be achieved using the semaphores + in the @synchronized decorator. + + Note these locks are released when the descriptor is closed, so it's not + safe to close the file descriptor while another green thread holds the + lock. Just opening and closing the lock file can break synchronisation, + so lock files must be accessed only using this abstraction. + """ + + def __init__(self, name): + self.lockfile = None + self.fname = name + + def __enter__(self): + self.lockfile = open(self.fname, 'w') + + while True: + try: + # Using non-blocking locks since green threads are not + # patched to deal with blocking locking calls. + # Also upon reading the MSDN docs for locking(), it seems + # to have a laughable 10 attempts "blocking" mechanism. + self.trylock() + return self + except IOError as e: + if e.errno in (errno.EACCES, errno.EAGAIN): + # external locks synchronise things like iptables + # updates - give it some time to prevent busy spinning + time.sleep(0.01) + else: + raise + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + self.unlock() + self.lockfile.close() + except IOError: + LOG.exception(_("Could not release the acquired lock `%s`"), + self.fname) + + def trylock(self): + raise NotImplementedError() + + def unlock(self): + raise NotImplementedError() + + +class _WindowsLock(_InterProcessLock): + def trylock(self): + msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1) + + def unlock(self): + msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1) + + +class _PosixLock(_InterProcessLock): + def trylock(self): + fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) + + def unlock(self): + fcntl.lockf(self.lockfile, fcntl.LOCK_UN) + + +if os.name == 'nt': + import msvcrt + InterProcessLock = _WindowsLock +else: + import fcntl + InterProcessLock = _PosixLock + +_semaphores = weakref.WeakValueDictionary() + + +@contextlib.contextmanager +def lock(name, lock_file_prefix=None, external=False, lock_path=None): + """Context based lock + + This function yields a `semaphore.Semaphore` instance unless external is + True, in which case, it'll yield an InterProcessLock instance. + + :param lock_file_prefix: The lock_file_prefix argument is used to provide + lock files on disk with a meaningful prefix. + + :param external: The external keyword argument denotes whether this lock + should work across multiple processes. This means that if two different + workers both run a a method decorated with @synchronized('mylock', + external=True), only one of them will execute at a time. + + :param lock_path: The lock_path keyword argument is used to specify a + special location for external lock files to live. If nothing is set, then + CONF.lock_path is used as a default. + """ + # NOTE(soren): If we ever go natively threaded, this will be racy. + # See http://stackoverflow.com/questions/5390569/dyn + # amically-allocating-and-destroying-mutexes + sem = _semaphores.get(name, semaphore.Semaphore()) + if name not in _semaphores: + # this check is not racy - we're already holding ref locally + # so GC won't remove the item and there was no IO switch + # (only valid in greenthreads) + _semaphores[name] = sem + + with sem: + LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name}) + + # NOTE(mikal): I know this looks odd + if not hasattr(local.strong_store, 'locks_held'): + local.strong_store.locks_held = [] + local.strong_store.locks_held.append(name) + + try: + if external and not CONF.disable_process_locking: + LOG.debug(_('Attempting to grab file lock "%(lock)s"'), + {'lock': name}) + + # We need a copy of lock_path because it is non-local + local_lock_path = lock_path or CONF.lock_path + if not local_lock_path: + raise cfg.RequiredOptError('lock_path') + + if not os.path.exists(local_lock_path): + fileutils.ensure_tree(local_lock_path) + LOG.info(_('Created lock path: %s'), local_lock_path) + + def add_prefix(name, prefix): + if not prefix: + return name + sep = '' if prefix.endswith('-') else '-' + return '%s%s%s' % (prefix, sep, name) + + # NOTE(mikal): the lock name cannot contain directory + # separators + lock_file_name = add_prefix(name.replace(os.sep, '_'), + lock_file_prefix) + + lock_file_path = os.path.join(local_lock_path, lock_file_name) + + try: + lock = InterProcessLock(lock_file_path) + with lock as lock: + LOG.debug(_('Got file lock "%(lock)s" at %(path)s'), + {'lock': name, 'path': lock_file_path}) + yield lock + finally: + LOG.debug(_('Released file lock "%(lock)s" at %(path)s'), + {'lock': name, 'path': lock_file_path}) + else: + yield sem + + finally: + local.strong_store.locks_held.remove(name) + + +def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): + """Synchronization decorator. + + Decorating a method like so:: + + @synchronized('mylock') + def foo(self, *args): + ... + + ensures that only one thread will execute the foo method at a time. + + Different methods can share the same lock:: + + @synchronized('mylock') + def foo(self, *args): + ... + + @synchronized('mylock') + def bar(self, *args): + ... + + This way only one of either foo or bar can be executing at a time. + """ + + def wrap(f): + @functools.wraps(f) + def inner(*args, **kwargs): + with lock(name, lock_file_prefix, external, lock_path): + LOG.debug(_('Got semaphore / lock "%(function)s"'), + {'function': f.__name__}) + return f(*args, **kwargs) + + LOG.debug(_('Semaphore / lock released "%(function)s"'), + {'function': f.__name__}) + return inner + return wrap + + +def synchronized_with_prefix(lock_file_prefix): + """Partial object generator for the synchronization decorator. + + Redefine @synchronized in each project like so:: + + (in nova/utils.py) + from nova.openstack.common import lockutils + + synchronized = lockutils.synchronized_with_prefix('nova-') + + + (in nova/foo.py) + from nova import utils + + @utils.synchronized('mylock') + def bar(self, *args): + ... + + The lock_file_prefix argument is used to provide lock files on disk with a + meaningful prefix. + """ + + return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)