Merge "Enhanced fixtures for enginefacade-based provisioning"

This commit is contained in:
Jenkins 2016-11-09 18:29:02 +00:00 committed by Gerrit Code Review
commit a198b0f788
19 changed files with 916 additions and 39 deletions

View File

@ -262,6 +262,46 @@ class _TransactionFactory(object):
return self._legacy_facade
def get_writer_engine(self):
"""Return the writer engine for this factory.
Implies start.
"""
if not self._started:
self._start()
return self._writer_engine
def get_reader_engine(self):
"""Return the reader engine for this factory.
Implies start.
"""
if not self._started:
self._start()
return self._reader_engine
def get_writer_maker(self):
"""Return the writer sessionmaker for this factory.
Implies start.
"""
if not self._started:
self._start()
return self._writer_maker
def get_reader_maker(self):
"""Return the reader sessionmaker for this factory.
Implies start.
"""
if not self._started:
self._start()
return self._reader_maker
def _create_connection(self, mode):
if not self._started:
self._start()
@ -666,6 +706,36 @@ class _TransactionContextManager(object):
return self._factory.get_legacy_facade()
def get_engine(self):
"""Return the Engine in use.
This will be based on the state being WRITER or READER.
This implies a start operation.
"""
if self._mode is _WRITER:
return self._factory.get_writer_engine()
elif self._mode is _READER:
return self._factory.get_reader_engine()
else:
raise ValueError("mode should be WRITER or READER")
def get_sessionmaker(self):
"""Return the sessionmaker in use.
This will be based on the state being WRITER or READER.
This implies a start operation.
"""
if self._mode is _WRITER:
return self._factory.get_writer_maker()
elif self._mode is _READER:
return self._factory.get_reader_maker()
else:
raise ValueError("mode should be WRITER or READER")
def dispose_pool(self):
"""Call engine.pool.dispose() on underlying Engine objects."""
self._factory.dispose_pool()

View File

@ -76,14 +76,24 @@ class Schema(object):
class BackendResource(testresources.TestResourceManager):
def __init__(self, database_type):
def __init__(self, database_type, ad_hoc_url=None):
super(BackendResource, self).__init__()
self.database_type = database_type
self.backend = Backend.backend_for_database_type(self.database_type)
self.ad_hoc_url = ad_hoc_url
if ad_hoc_url is None:
self.backend = Backend.backend_for_database_type(
self.database_type)
else:
self.backend = Backend(self.database_type, ad_hoc_url)
self.backend._verify()
def make(self, dependency_resources):
return self.backend
def clean(self, resource):
self.backend._dispose()
def isDirty(self):
return False
@ -100,9 +110,11 @@ class DatabaseResource(testresources.TestResourceManager):
"""
def __init__(self, database_type, _enginefacade=None):
def __init__(self, database_type, _enginefacade=None,
provision_new_database=False, ad_hoc_url=None):
super(DatabaseResource, self).__init__()
self.database_type = database_type
self.provision_new_database = provision_new_database
# NOTE(zzzeek) the _enginefacade is an optional argument
# here in order to accomodate Neutron's current direct use
@ -114,38 +126,42 @@ class DatabaseResource(testresources.TestResourceManager):
else:
self._enginefacade = enginefacade._context_manager
self.resources = [
('backend', BackendResource(database_type))
('backend', BackendResource(database_type, ad_hoc_url))
]
def make(self, dependency_resources):
backend = dependency_resources['backend']
_enginefacade = self._enginefacade.make_new_manager()
db_token = _random_ident()
url = backend.provisioned_database_url(db_token)
if self.provision_new_database:
db_token = _random_ident()
url = backend.provisioned_database_url(db_token)
LOG.info(
"CREATE BACKEND %s TOKEN %s", backend.engine.url, db_token)
backend.create_named_database(db_token, conditional=True)
else:
db_token = None
url = backend.url
_enginefacade.configure(
logging_name="%s@%s" % (self.database_type, db_token))
LOG.info(
"CREATE BACKEND %s TOKEN %s", backend.engine.url, db_token)
backend.create_named_database(db_token, conditional=True)
_enginefacade._factory._start(connection=url)
engine = _enginefacade._factory._writer_engine
return ProvisionedDatabase(backend, _enginefacade, engine, db_token)
def clean(self, resource):
resource.engine.dispose()
LOG.info(
"DROP BACKEND %s TOKEN %s",
resource.backend.engine, resource.db_token)
resource.backend.drop_named_database(resource.db_token)
if self.provision_new_database:
LOG.info(
"DROP BACKEND %s TOKEN %s",
resource.backend.engine, resource.db_token)
resource.backend.drop_named_database(resource.db_token)
def isDirty(self):
return False
@debtcollector.removals.removed_class("TransactionResource")
class TransactionResource(testresources.TestResourceManager):
def __init__(self, database_resource, schema_resource):
@ -299,6 +315,10 @@ class Backend(object):
conn.close()
return eng
def _dispose(self):
"""Dispose main resources of this backend."""
self.impl.dispose(self.engine)
def create_named_database(self, ident, conditional=False):
"""Create a database with the given name."""
@ -400,6 +420,10 @@ class BackendImpl(object):
supports_drop_fk = True
def dispose(self, engine):
LOG.info("DISPOSE ENGINE %s", engine)
engine.dispose()
@classmethod
def all_impls(cls):
"""Return an iterator of all possible BackendImpl objects.
@ -567,6 +591,17 @@ class SQLiteBackendImpl(BackendImpl):
supports_drop_fk = False
def dispose(self, engine):
LOG.info("DISPOSE ENGINE %s", engine)
engine.dispose()
url = engine.url
self._drop_url_file(url, True)
def _drop_url_file(self, url, conditional):
filename = url.database
if filename and (not conditional or os.access(filename, os.F_OK)):
os.remove(filename)
def create_opportunistic_driver_url(self):
return "sqlite://"

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import debtcollector
import fixtures
import testresources
import testscenarios
@ -35,6 +36,7 @@ from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import session
@debtcollector.removals.removed_class("DbFixture")
class DbFixture(fixtures.Fixture):
"""Basic database fixture.
@ -90,6 +92,7 @@ class DbFixture(fixtures.Fixture):
self.addCleanup(self.test.enginefacade.dispose_global)
@debtcollector.removals.removed_class("DbTestCase")
class DbTestCase(test_base.BaseTestCase):
"""Base class for testing of DB code.
@ -191,6 +194,7 @@ class DbTestCase(test_base.BaseTestCase):
"implemented within generate_schema().")
@debtcollector.removals.removed_class("OpportunisticTestCase")
class OpportunisticTestCase(DbTestCase):
"""Placeholder for backwards compatibility."""
@ -220,18 +224,22 @@ def backend_specific(*dialects):
return wrap
@debtcollector.removals.removed_class("MySQLOpportunisticFixture")
class MySQLOpportunisticFixture(DbFixture):
DRIVER = 'mysql'
@debtcollector.removals.removed_class("PostgreSQLOpportunisticFixture")
class PostgreSQLOpportunisticFixture(DbFixture):
DRIVER = 'postgresql'
@debtcollector.removals.removed_class("MySQLOpportunisticTestCase")
class MySQLOpportunisticTestCase(OpportunisticTestCase):
FIXTURE = MySQLOpportunisticFixture
@debtcollector.removals.removed_class("PostgreSQLOpportunisticTestCase")
class PostgreSQLOpportunisticTestCase(OpportunisticTestCase):
FIXTURE = PostgreSQLOpportunisticFixture

View File

@ -0,0 +1,546 @@
# Copyright (c) 2016 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import logging
import testresources
from oslo_db import exception
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import utils
LOG = logging.getLogger(__name__)
class ReplaceEngineFacadeFixture(fixtures.Fixture):
"""A fixture that will plug the engine of one enginefacade into another.
This fixture can be used by test suites that already have their own non-
oslo_db database setup / teardown schemes, to plug any URL or test-oriented
enginefacade as-is into an enginefacade-oriented API.
For applications that use oslo.db's testing fixtures, the
ReplaceEngineFacade fixture is used internally.
E.g.::
class MyDBTest(TestCase):
def setUp(self):
from myapplication.api import main_enginefacade
my_test_enginefacade = enginefacade.transaction_context()
my_test_enginefacade.configure(connection=my_test_url)
self.useFixture(
ReplaceEngineFacadeFixture(
main_enginefacade, my_test_enginefacade))
Above, the main_enginefacade object is the normal application level
one, and my_test_enginefacade is a local one that we've created to
refer to some testing database. Throughout the fixture's setup,
the application level enginefacade will use the engine factory and
engines of the testing enginefacade, and at fixture teardown will be
replaced back.
"""
def __init__(self, enginefacade, replace_with_enginefacade):
super(ReplaceEngineFacadeFixture, self).__init__()
self.enginefacade = enginefacade
self.replace_with_enginefacade = replace_with_enginefacade
def _setUp(self):
_reset_facade = self.enginefacade.patch_factory(
self.replace_with_enginefacade._factory
)
self.addCleanup(_reset_facade)
class BaseDbFixture(fixtures.Fixture):
"""Base database provisioning fixture.
This serves as the base class for the other fixtures, but by itself
does not implement _setUp(). It provides the basis for the flags
implemented by the various capability mixins (GenerateSchema,
DeletesFromSchema, etc.) as well as providing an abstraction over
the provisioning objects, which are specific to testresources.
Overall, consumers of this fixture just need to use the right classes
and the testresources mechanics are taken care of.
"""
DRIVER = "sqlite"
_DROP_SCHEMA_PER_TEST = True
_BUILD_SCHEMA = False
_BUILD_WITH_MIGRATIONS = False
_database_resources = {}
_db_not_available = {}
_schema_resources = {}
def __init__(self, driver=None, ident=None):
super(BaseDbFixture, self).__init__()
self.driver = driver or self.DRIVER
self.ident = ident or "default"
self.resource_key = (self.driver, self.__class__, self.ident)
def get_enginefacade(self):
"""Return an enginefacade._TransactionContextManager.
This is typically a global variable like "context_manager" declared
in the db/api.py module and is the object returned by
enginefacade.transaction_context().
If left not implemented, the global enginefacade manager is used.
For the case where a project uses per-object or per-test enginefacades
like Gnocchi, the get_per_test_enginefacade()
method should also be implemented.
"""
return enginefacade._context_manager
def get_per_test_enginefacade(self):
"""Return an enginefacade._TransactionContextManager per test.
This facade should be the one that the test expects the code to
use. Usually this is the same one returned by get_engineafacade()
which is the default. For special applications like Gnocchi,
this can be overridden to provide an instance-level facade.
"""
return self.get_enginefacade()
def _get_db_resource_not_available_reason(self):
return self._db_not_available.get(self.resource_key, None)
def _has_db_resource(self):
return self._database_resources.get(
self.resource_key, None) is not None
def _generate_schema_resource(self, database_resource):
return provision.SchemaResource(
database_resource,
None if not self._BUILD_SCHEMA
else self.generate_schema_create_all
if not self._BUILD_WITH_MIGRATIONS
else self.generate_schema_migrations,
self._DROP_SCHEMA_PER_TEST
)
def _get_resources(self):
key = self.resource_key
# the DatabaseResource and SchemaResource provision objects
# can be used by testresources as a marker outside of an individual
# test to indicate that this database / schema can be used across
# multiple tests. To make this work, many instances of this
# fixture have to return the *same* resource object given the same
# inputs. so we cache these in class-level dictionaries.
if key not in self._database_resources:
_enginefacade = self.get_enginefacade()
try:
self._database_resources[key] = \
self._generate_database_resource(_enginefacade)
except exception.BackendNotAvailable as bne:
self._database_resources[key] = None
self._db_not_available[key] = str(bne)
database_resource = self._database_resources[key]
if database_resource is None:
return []
else:
if key in self._schema_resources:
schema_resource = self._schema_resources[key]
else:
schema_resource = self._schema_resources[key] = \
self._generate_schema_resource(database_resource)
return [
('_schema_%s' % self.ident, schema_resource),
('_db_%s' % self.ident, database_resource)
]
class GeneratesSchema(object):
"""Mixin defining a fixture as generating a schema using create_all().
This is a "capability" mixin that works in conjunction with classes
that include BaseDbFixture as a base.
"""
_BUILD_SCHEMA = True
_BUILD_WITH_MIGRATIONS = False
def generate_schema_create_all(self, engine):
"""A hook which should generate the model schema using create_all().
This hook is called within the scope of creating the database
assuming BUILD_WITH_MIGRATIONS is False.
"""
class GeneratesSchemaFromMigrations(GeneratesSchema):
"""Mixin defining a fixture as generating a schema using migrations.
This is a "capability" mixin that works in conjunction with classes
that include BaseDbFixture as a base.
"""
_BUILD_WITH_MIGRATIONS = True
def generate_schema_migrations(self, engine):
"""A hook which should generate the model schema using migrations.
This hook is called within the scope of creating the database
assuming BUILD_WITH_MIGRATIONS is True.
"""
class ResetsData(object):
"""Mixin defining a fixture that resets schema data without dropping."""
_DROP_SCHEMA_PER_TEST = False
def setup_for_reset(self, engine, enginefacade):
""""Perform setup that may be needed before the test runs."""
def reset_schema_data(self, engine, enginefacade):
"""Reset the data in the schema."""
class DeletesFromSchema(ResetsData):
"""Mixin defining a fixture that can delete from all tables in place.
When DeletesFromSchema is present in a fixture,
_DROP_SCHEMA_PER_TEST is now False; this means that the
"teardown" flag of provision.SchemaResource will be False, which
prevents SchemaResource from dropping all objects within the schema
after each test.
This is a "capability" mixin that works in conjunction with classes
that include BaseDbFixture as a base.
"""
def reset_schema_data(self, engine, facade):
self.delete_from_schema(engine)
def delete_from_schema(self, engine):
"""A hook which should delete all data from an existing schema.
Should *not* drop any objects, just remove data from tables
that needs to be reset between tests.
"""
class RollsBackTransaction(ResetsData):
"""Fixture class that maintains a database transaction per test.
"""
def setup_for_reset(self, engine, facade):
conn = engine.connect()
engine = utils.NonCommittingEngine(conn)
self._reset_engine = enginefacade._TestTransactionFactory.apply_engine(
engine, facade)
def reset_schema_data(self, engine, facade):
self._reset_engine()
engine._dispose()
class SimpleDbFixture(BaseDbFixture):
"""Fixture which provides an engine from a fixed URL.
The SimpleDbFixture is generally appropriate only for a SQLite memory
database, as this database is naturally isolated from other processes and
does not require management of schemas. For tests that need to
run specifically against MySQL or Postgresql, the OpportunisticDbFixture
is more appropriate.
The database connection information itself comes from the provisoning
system, matching the desired driver (typically sqlite) to the default URL
that provisioning provides for this driver (in the case of sqlite, it's
the SQLite memory URL, e.g. sqlite://. For MySQL and Postgresql, it's
the familiar "openstack_citest" URL on localhost).
There are a variety of create/drop schemes that can take place:
* The default is to procure a database connection on setup,
and at teardown, an instruction is issued to "drop" all
objects in the schema (e.g. tables, indexes). The SQLAlchemy
engine itself remains referenced at the class level for subsequent
re-use.
* When the GeneratesSchema or GeneratesSchemaFromMigrations mixins
are implemented, the appropriate generate_schema method is also
called when the fixture is set up, by default this is per test.
* When the DeletesFromSchema mixin is implemented, the generate_schema
method is now only called **once**, and the "drop all objects"
system is replaced with the delete_from_schema method. This
allows the same database to remain set up with all schema objects
intact, so that expensive migrations need not be run on every test.
* The fixture does **not** dispose the engine at the end of a test.
It is assumed the same engine will be re-used many times across
many tests. The AdHocDbFixture extends this one to provide
engine.dispose() at the end of a test.
This fixture is intended to work without needing a reference to
the test itself, and therefore cannot take advantage of the
OptimisingTestSuite.
"""
_dependency_resources = {}
def _get_provisioned_db(self):
return self._dependency_resources["_db_%s" % self.ident]
def _generate_database_resource(self, _enginefacade):
return provision.DatabaseResource(self.driver, _enginefacade)
def _setUp(self):
super(SimpleDbFixture, self)._setUp()
cls = self.__class__
if "_db_%s" % self.ident not in cls._dependency_resources:
resources = self._get_resources()
# initialize resources the same way that testresources does.
for name, resource in resources:
cls._dependency_resources[name] = resource.getResource()
provisioned_db = self._get_provisioned_db()
if not self._DROP_SCHEMA_PER_TEST:
self.setup_for_reset(
provisioned_db.engine, provisioned_db.enginefacade)
self.useFixture(ReplaceEngineFacadeFixture(
self.get_per_test_enginefacade(),
provisioned_db.enginefacade
))
if not self._DROP_SCHEMA_PER_TEST:
self.addCleanup(
self.reset_schema_data,
provisioned_db.engine, provisioned_db.enginefacade)
self.addCleanup(self._cleanup)
def _teardown_resources(self):
for name, resource in self._get_resources():
dep = self._dependency_resources.pop(name)
resource.finishedWith(dep)
def _cleanup(self):
pass
class AdHocDbFixture(SimpleDbFixture):
""""Fixture which creates and disposes a database engine per test.
Also allows a specific URL to be passed, meaning the fixture can
be hardcoded to a specific SQLite file.
For a SQLite, this fixture will create the named database upon setup
and tear it down upon teardown. For other databases, the
database is assumed to exist already and will remain after teardown.
"""
def __init__(self, url=None):
if url:
self.url = provision.sa_url.make_url(str(url))
driver = self.url.get_backend_name()
else:
driver = None
self.url = None
BaseDbFixture.__init__(
self, driver=driver,
ident=provision._random_ident())
self.url = url
def _generate_database_resource(self, _enginefacade):
return provision.DatabaseResource(
self.driver, _enginefacade, ad_hoc_url=self.url)
def _cleanup(self):
self._teardown_resources()
class OpportunisticDbFixture(BaseDbFixture):
"""Fixture which uses testresources fully for optimised runs.
This fixture relies upon the use of the OpportunisticDBTestMixin to supply
a test.resources attribute, and also works much more effectively when
combined the testresources.OptimisingTestSuite. The
optimize_db_test_loader() function should be used at the module and package
levels to optimize database provisioning across many tests.
"""
def __init__(self, test, driver=None, ident=None):
super(OpportunisticDbFixture, self).__init__(
driver=driver, ident=ident)
self.test = test
def _get_provisioned_db(self):
return getattr(self.test, "_db_%s" % self.ident)
def _generate_database_resource(self, _enginefacade):
return provision.DatabaseResource(
self.driver, _enginefacade, provision_new_database=True)
def _setUp(self):
super(OpportunisticDbFixture, self)._setUp()
if not self._has_db_resource():
return
provisioned_db = self._get_provisioned_db()
if not self._DROP_SCHEMA_PER_TEST:
self.setup_for_reset(
provisioned_db.engine, provisioned_db.enginefacade)
self.useFixture(ReplaceEngineFacadeFixture(
self.get_per_test_enginefacade(),
provisioned_db.enginefacade
))
if not self._DROP_SCHEMA_PER_TEST:
self.addCleanup(
self.reset_schema_data,
provisioned_db.engine, provisioned_db.enginefacade)
class OpportunisticDBTestMixin(object):
"""Test mixin that integrates the test suite with testresources.
There are three goals to this system:
1. Allow creation of "stub" test suites that will run all the tests in a
parent suite against a specific kind of database (e.g. Mysql,
Postgresql), where the entire suite will be skipped if that target
kind of database is not available to the suite.
2. provide a test with a process-local, anonymously named schema within a
target database, so that the test can run concurrently with other tests
without conflicting data
3. provide compatibility with the testresources.OptimisingTestSuite, which
organizes TestCase instances ahead of time into groups that all
make use of the same type of database, setting up and tearing down
a database schema once for the scope of any number of tests within.
This technique is essential when testing against a non-SQLite database
because building of a schema is expensive, and also is most ideally
accomplished using the applications schema migration which are
even more vastly slow than a straight create_all().
This mixin provides the .resources attribute required by testresources when
using the OptimisingTestSuite.The .resources attribute then provides a
collection of testresources.TestResourceManager objects, which are defined
here in oslo_db.sqlalchemy.provision. These objects know how to find
available database backends, build up temporary databases, and invoke
schema generation and teardown instructions. The actual "build the schema
objects" part of the equation, and optionally a "delete from all the
tables" step, is provided by the implementing application itself.
"""
SKIP_ON_UNAVAILABLE_DB = True
FIXTURE = OpportunisticDbFixture
_collected_resources = None
_instantiated_fixtures = None
@property
def resources(self):
"""Provide a collection of TestResourceManager objects.
The collection here is memoized, both at the level of the test
case itself, as well as in the fixture object(s) which provide
those resources.
"""
if self._collected_resources is not None:
return self._collected_resources
fixtures = self._instantiate_fixtures()
self._collected_resources = []
for fixture in fixtures:
self._collected_resources.extend(fixture._get_resources())
return self._collected_resources
def setUp(self):
self._setup_fixtures()
super(OpportunisticDBTestMixin, self).setUp()
def _get_default_provisioned_db(self):
return self._db_default
def _instantiate_fixtures(self):
if self._instantiated_fixtures:
return self._instantiated_fixtures
self._instantiated_fixtures = utils.to_list(self.generate_fixtures())
return self._instantiated_fixtures
def generate_fixtures(self):
return self.FIXTURE(test=self)
def _setup_fixtures(self):
testresources.setUpResources(
self, self.resources, testresources._get_result())
self.addCleanup(
testresources.tearDownResources,
self, self.resources, testresources._get_result()
)
fixtures = self._instantiate_fixtures()
for fixture in fixtures:
self.useFixture(fixture)
if not fixture._has_db_resource():
msg = fixture._get_db_resource_not_available_reason()
if self.SKIP_ON_UNAVAILABLE_DB:
self.skip(msg)
else:
self.fail(msg)
class MySQLOpportunisticFixture(OpportunisticDbFixture):
DRIVER = 'mysql'
class PostgresqlOpportunisticFixture(OpportunisticDbFixture):
DRIVER = 'postgresql'

View File

@ -32,6 +32,7 @@ import sqlalchemy.types as types
from oslo_db._i18n import _LE
from oslo_db import exception as exc
from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import utils
LOG = logging.getLogger(__name__)
@ -595,7 +596,9 @@ class ModelsMigrationsSync(object):
' for running of this test: %s' % e)
# drop all tables after a test run
self.addCleanup(functools.partial(self.db.backend.drop_all_objects,
backend = provision.Backend.backend_for_database_type(
self.get_engine().name)
self.addCleanup(functools.partial(backend.drop_all_objects,
self.get_engine()))
# run migration scripts

View File

@ -0,0 +1,43 @@
# Copyright (c) 2016 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy.test_base import backend_specific # noqa
from oslo_db.sqlalchemy import test_fixtures as db_fixtures
from oslotest import base as test_base
@enginefacade.transaction_context_provider
class Context(object):
pass
context = Context()
class DbTestCase(db_fixtures.OpportunisticDBTestMixin, test_base.BaseTestCase):
def setUp(self):
super(DbTestCase, self).setUp()
self.engine = enginefacade.writer.get_engine()
self.sessionmaker = enginefacade.writer.get_sessionmaker()
class MySQLOpportunisticTestCase(DbTestCase):
FIXTURE = db_fixtures.MySQLOpportunisticFixture
class PostgreSQLOpportunisticTestCase(DbTestCase):
FIXTURE = db_fixtures.PostgresqlOpportunisticFixture

View File

@ -24,8 +24,8 @@ from sqlalchemy.ext import declarative as sa_decl
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import models
from oslo_db.sqlalchemy import test_base
from oslo_db import tests
from oslo_db.tests.sqlalchemy import base as test_base
class EventletTestMixin(object):

View File

@ -33,7 +33,7 @@ from oslo_db import options
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import engines as oslo_engines
from oslo_db.sqlalchemy import orm
from oslo_db.sqlalchemy import test_base
from oslo_db.tests.sqlalchemy import base as test_base
enginefacade.transaction_context_provider(oslo_context.RequestContext)

View File

@ -30,7 +30,7 @@ from sqlalchemy.orm import mapper
from oslo_db import exception
from oslo_db.sqlalchemy import engines
from oslo_db.sqlalchemy import exc_filters
from oslo_db.sqlalchemy import test_base
from oslo_db.tests.sqlalchemy import base as test_base
from oslo_db.tests import utils as test_utils
_TABLE_NAME = '__tmp__test__tmp__'

View File

@ -11,9 +11,12 @@
# under the License.
import mock
import testresources
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import test_base
from oslo_db.sqlalchemy import test_base as legacy_test_base
from oslo_db.sqlalchemy import test_fixtures
from oslotest import base as oslo_test_base
@ -21,10 +24,12 @@ class BackendSkipTest(oslo_test_base.BaseTestCase):
def test_skip_no_dbapi(self):
class FakeDatabaseOpportunisticFixture(test_base.DbFixture):
class FakeDatabaseOpportunisticFixture(
test_fixtures.OpportunisticDbFixture):
DRIVER = 'postgresql'
class SomeTest(test_base.DbTestCase):
class SomeTest(test_fixtures.OpportunisticDBTestMixin,
oslo_test_base.BaseTestCase):
FIXTURE = FakeDatabaseOpportunisticFixture
def runTest(self):
@ -61,10 +66,13 @@ class BackendSkipTest(oslo_test_base.BaseTestCase):
def test_skip_no_such_backend(self):
class FakeDatabaseOpportunisticFixture(test_base.DbFixture):
class FakeDatabaseOpportunisticFixture(
test_fixtures.OpportunisticDbFixture):
DRIVER = 'postgresql+nosuchdbapi'
class SomeTest(test_base.DbTestCase):
class SomeTest(test_fixtures.OpportunisticDBTestMixin,
oslo_test_base.BaseTestCase):
FIXTURE = FakeDatabaseOpportunisticFixture
def runTest(self):
@ -81,3 +89,110 @@ class BackendSkipTest(oslo_test_base.BaseTestCase):
"Backend 'postgresql+nosuchdbapi' is unavailable: No such backend",
str(ex)
)
def test_skip_no_dbapi_legacy(self):
class FakeDatabaseOpportunisticFixture(
legacy_test_base.DbFixture):
DRIVER = 'postgresql'
class SomeTest(legacy_test_base.DbTestCase):
FIXTURE = FakeDatabaseOpportunisticFixture
def runTest(self):
pass
st = SomeTest()
# patch in replacement lookup dictionaries to avoid
# leaking from/to other tests
with mock.patch(
"oslo_db.sqlalchemy.provision."
"Backend.backends_by_database_type", {
"postgresql":
provision.Backend("postgresql", "postgresql://")}):
st._database_resources = {}
st._db_not_available = {}
st._schema_resources = {}
with mock.patch(
"sqlalchemy.create_engine",
mock.Mock(side_effect=ImportError())):
self.assertEqual([], st.resources)
ex = self.assertRaises(
self.skipException,
st.setUp
)
self.assertEqual(
"Backend 'postgresql' is unavailable: No DBAPI installed",
str(ex)
)
def test_skip_no_such_backend_legacy(self):
class FakeDatabaseOpportunisticFixture(
legacy_test_base.DbFixture):
DRIVER = 'postgresql+nosuchdbapi'
class SomeTest(legacy_test_base.DbTestCase):
FIXTURE = FakeDatabaseOpportunisticFixture
def runTest(self):
pass
st = SomeTest()
ex = self.assertRaises(
self.skipException,
st.setUp
)
self.assertEqual(
"Backend 'postgresql+nosuchdbapi' is unavailable: No such backend",
str(ex)
)
class EnginefacadeIntegrationTest(oslo_test_base.BaseTestCase):
def test_db_fixture(self):
normal_mgr = enginefacade.transaction_context()
normal_mgr.configure(
connection="sqlite://",
sqlite_fk=True,
mysql_sql_mode="FOOBAR",
max_overflow=38
)
class MyFixture(test_fixtures.OpportunisticDbFixture):
def get_enginefacade(self):
return normal_mgr
test = mock.Mock(SCHEMA_SCOPE=None)
fixture = MyFixture(test=test)
resources = fixture._get_resources()
testresources.setUpResources(test, resources, None)
self.addCleanup(
testresources.tearDownResources,
test, resources, None
)
fixture.setUp()
self.addCleanup(fixture.cleanUp)
self.assertTrue(normal_mgr._factory._started)
test.engine = normal_mgr.writer.get_engine()
self.assertEqual("sqlite://", str(test.engine.url))
self.assertIs(test.engine, normal_mgr._factory._writer_engine)
engine_args = normal_mgr._factory._engine_args_for_conf(None)
self.assertTrue(engine_args['sqlite_fk'])
self.assertEqual("FOOBAR", engine_args["mysql_sql_mode"])
self.assertEqual(38, engine_args["max_overflow"])
fixture.cleanUp()
fixture._clear_cleanups() # so the real cleanUp works
self.assertFalse(normal_mgr._factory._started)

View File

@ -24,7 +24,7 @@ import sqlalchemy
from oslo_db import exception as db_exception
from oslo_db.sqlalchemy import migration
from oslo_db.sqlalchemy import test_base
from oslo_db.tests.sqlalchemy import base as test_base
from oslo_db.tests import utils as test_utils

View File

@ -23,8 +23,8 @@ import sqlalchemy as sa
import sqlalchemy.ext.declarative as sa_decl
from oslo_db import exception as exc
from oslo_db.sqlalchemy import test_base
from oslo_db.sqlalchemy import test_migrations as migrate
from oslo_db.tests.sqlalchemy import base as test_base
class TestWalkVersions(test.BaseTestCase, migrate.WalkVersionsMixin):

View File

@ -21,7 +21,7 @@ from sqlalchemy import Integer, String
from sqlalchemy.ext.declarative import declarative_base
from oslo_db.sqlalchemy import models
from oslo_db.sqlalchemy import test_base
from oslo_db.tests.sqlalchemy import base as test_base
BASE = declarative_base()

View File

@ -11,6 +11,8 @@
# under the License.
import mock
import os
from oslotest import base as oslo_test_base
from sqlalchemy import exc as sa_exc
from sqlalchemy import inspect
@ -18,8 +20,11 @@ from sqlalchemy import schema
from sqlalchemy import types
from oslo_db import exception
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import test_base
from oslo_db.sqlalchemy import test_fixtures
from oslo_db.sqlalchemy import utils
from oslo_db.tests.sqlalchemy import base as test_base
class DropAllObjectsTest(test_base.DbTestCase):
@ -66,7 +71,8 @@ class DropAllObjectsTest(test_base.DbTestCase):
set(insp.get_table_names())
)
self.db.backend.drop_all_objects(self.engine)
self._get_default_provisioned_db().\
backend.drop_all_objects(self.engine)
insp = inspect(self.engine)
self.assertEqual(
@ -167,16 +173,18 @@ class RetainSchemaTest(oslo_test_base.BaseTestCase):
def _run_test(self):
try:
database_resource = provision.DatabaseResource(self.DRIVER)
database_resource = provision.DatabaseResource(
self.DRIVER, provision_new_database=True)
except exception.BackendNotAvailable:
self.skip("database not available")
schema_resource = provision.SchemaResource(
database_resource, self._gen_schema)
transaction_resource = provision.TransactionResource(
database_resource, schema_resource)
engine = transaction_resource.getResource()
schema = schema_resource.getResource()
conn = schema.database.engine.connect()
engine = utils.NonCommittingEngine(conn)
with engine.connect() as conn:
rows = conn.execute(self.test_table.select())
@ -202,7 +210,8 @@ class RetainSchemaTest(oslo_test_base.BaseTestCase):
rows = conn.execute(self.test_table.select())
self.assertEqual([(2, 3)], rows.fetchall())
transaction_resource.finishedWith(engine)
engine._dispose()
schema_resource.finishedWith(schema)
class MySQLRetainSchemaTest(RetainSchemaTest):
@ -211,3 +220,45 @@ class MySQLRetainSchemaTest(RetainSchemaTest):
class PostgresqlRetainSchemaTest(RetainSchemaTest):
DRIVER = "postgresql"
class AdHocURLTest(oslo_test_base.BaseTestCase):
def test_sqlite_setup_teardown(self):
fixture = test_fixtures.AdHocDbFixture("sqlite:///foo.db")
fixture.setUp()
self.assertEqual(
str(enginefacade._context_manager._factory._writer_engine.url),
"sqlite:///foo.db"
)
self.assertTrue(os.path.exists("foo.db"))
fixture.cleanUp()
self.assertFalse(os.path.exists("foo.db"))
def test_mysql_setup_teardown(self):
try:
mysql_backend = provision.Backend.backend_for_database_type(
"mysql")
except exception.BackendNotAvailable:
self.skip("mysql backend not available")
mysql_backend.create_named_database("adhoc_test")
self.addCleanup(
mysql_backend.drop_named_database, "adhoc_test"
)
url = str(mysql_backend.provisioned_database_url("adhoc_test"))
fixture = test_fixtures.AdHocDbFixture(url)
fixture.setUp()
self.assertEqual(
str(enginefacade._context_manager._factory._writer_engine.url),
url
)
fixture.cleanUp()

View File

@ -33,10 +33,11 @@ from sqlalchemy.ext.declarative import declarative_base
from oslo_db import exception
from oslo_db import options as db_options
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import engines
from oslo_db.sqlalchemy import models
from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import test_base
from oslo_db.tests.sqlalchemy import base as test_base
BASE = declarative_base()
@ -65,8 +66,8 @@ class RegexpFilterTestCase(test_base.DbTestCase):
self.addCleanup(test_table.drop)
def _test_regexp_filter(self, regexp, expected):
_session = self.sessionmaker()
with _session.begin():
with enginefacade.writer.using(test_base.context):
_session = test_base.context.session
for i in ['10', '20', u'']:
tbl = RegexpTable()
tbl.update({'bar': i})

View File

@ -18,8 +18,8 @@ from sqlalchemy.ext.declarative import declarative_base
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import models
from oslo_db.sqlalchemy import test_base
from oslo_db.sqlalchemy import types
from oslo_db.tests.sqlalchemy import base as test_base
BASE = declarative_base()

View File

@ -17,8 +17,8 @@ from sqlalchemy import schema
from sqlalchemy import sql
from sqlalchemy import types as sqltypes
from oslo_db.sqlalchemy import test_base
from oslo_db.sqlalchemy import update_match
from oslo_db.tests.sqlalchemy import base as test_base
Base = declarative.declarative_base()

View File

@ -42,8 +42,8 @@ from oslo_db.sqlalchemy.compat import utils as compat_utils
from oslo_db.sqlalchemy import models
from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import test_base as db_test_base
from oslo_db.sqlalchemy import utils
from oslo_db.tests.sqlalchemy import base as db_test_base
from oslo_db.tests import utils as test_utils

View File

@ -0,0 +1,5 @@
---
deprecations:
- base test classes from ``oslo_db.sqlalchemy.test_base`` are deprecated in
favor of new fixtures introduced in ``oslo_db.sqlalchemy.test_fixtures``
module