You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
313 lines
9.9 KiB
Python
313 lines
9.9 KiB
Python
# Copyright 2012 OpenStack Foundation
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import six
|
|
import testresources
|
|
import testscenarios
|
|
import testtools
|
|
|
|
from neutron_lib.db import api as db_api
|
|
from neutron_lib import fixture as lib_fixtures
|
|
from oslo_config import cfg
|
|
from oslo_db import exception as oslodb_exception
|
|
from oslo_db.sqlalchemy import provision
|
|
|
|
from neutron.db.migration import cli as migration
|
|
# Import all data models
|
|
from neutron.db.migration.models import head # noqa
|
|
from neutron.tests import base
|
|
from neutron import wsgi
|
|
|
|
|
|
class ExpectedException(testtools.ExpectedException):
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
if super(ExpectedException, self).__exit__(exc_type,
|
|
exc_value,
|
|
traceback):
|
|
self.exception = exc_value
|
|
return True
|
|
return False
|
|
|
|
|
|
def create_request(path, body, content_type, method='GET',
|
|
query_string=None, context=None, headers=None):
|
|
headers = headers or {}
|
|
if query_string:
|
|
url = "%s?%s" % (path, query_string)
|
|
else:
|
|
url = path
|
|
req = wsgi.Request.blank(url)
|
|
req.method = method
|
|
req.headers = {}
|
|
req.headers['Accept'] = content_type
|
|
req.headers.update(headers)
|
|
if isinstance(body, six.text_type):
|
|
req.body = body.encode()
|
|
else:
|
|
req.body = body
|
|
if context:
|
|
req.environ['neutron.context'] = context
|
|
return req
|
|
|
|
|
|
class StaticSqlFixtureNoSchema(lib_fixtures.SqlFixture):
|
|
"""Fixture which keeps a single sqlite memory database at the global
|
|
scope
|
|
|
|
"""
|
|
|
|
_GLOBAL_RESOURCES = False
|
|
|
|
@classmethod
|
|
def _init_resources(cls):
|
|
if cls._GLOBAL_RESOURCES:
|
|
return
|
|
else:
|
|
cls._GLOBAL_RESOURCES = True
|
|
cls.database_resource = provision.DatabaseResource(
|
|
"sqlite", db_api.get_context_manager())
|
|
dependency_resources = {}
|
|
for name, resource in cls.database_resource.resources:
|
|
dependency_resources[name] = resource.getResource()
|
|
cls.engine = dependency_resources['backend'].engine
|
|
|
|
def _delete_from_schema(self, engine):
|
|
pass
|
|
|
|
|
|
class OpportunisticSqlFixture(lib_fixtures.SqlFixture):
|
|
"""Fixture which uses testresources with oslo_db provisioning to
|
|
check for available backends and optimize test runs.
|
|
|
|
Requires that the test itself implement the resources attribute.
|
|
|
|
"""
|
|
|
|
DRIVER = 'sqlite'
|
|
|
|
def __init__(self, test):
|
|
super(OpportunisticSqlFixture, self).__init__()
|
|
self.test = test
|
|
|
|
@classmethod
|
|
def _generate_schema_w_migrations(cls, engine):
|
|
alembic_configs = migration.get_alembic_configs()
|
|
with engine.connect() as conn:
|
|
for alembic_config in alembic_configs:
|
|
alembic_config.attributes['connection'] = conn
|
|
alembic_config.neutron_config = cfg.CONF
|
|
alembic_config.neutron_config.set_override(
|
|
'connection', str(engine.url), group='database')
|
|
migration.do_alembic_command(
|
|
alembic_config, 'upgrade', 'heads')
|
|
|
|
def _delete_from_schema(self, engine):
|
|
if self.test.BUILD_SCHEMA:
|
|
super(OpportunisticSqlFixture, self)._delete_from_schema(engine)
|
|
|
|
def _init_resources(self):
|
|
testresources.setUpResources(
|
|
self.test, self.test.resources, testresources._get_result())
|
|
self.addCleanup(
|
|
testresources.tearDownResources,
|
|
self.test, self.test.resources, testresources._get_result()
|
|
)
|
|
|
|
# unfortunately, fixtures won't let us call a skip() from
|
|
# here. So the test has to check this also.
|
|
# see https://github.com/testing-cabal/fixtures/issues/31
|
|
if hasattr(self.test, 'db'):
|
|
self.engine = self.test.engine = self.test.db.engine
|
|
|
|
@classmethod
|
|
def resources_collection(cls, test):
|
|
# reimplement current oslo.db code.
|
|
# FIXME(zzzeek) The patterns here are up in the air enough
|
|
# that I think keeping this totally separate will give us the
|
|
# most leverage in being able to fix oslo.db in an upcoming
|
|
# release, then port neutron back to the working version.
|
|
|
|
driver = test.DRIVER
|
|
|
|
if driver not in test._database_resources:
|
|
try:
|
|
test._database_resources[driver] = \
|
|
provision.DatabaseResource(driver)
|
|
except oslodb_exception.BackendNotAvailable:
|
|
test._database_resources[driver] = None
|
|
|
|
database_resource = test._database_resources[driver]
|
|
if database_resource is None:
|
|
return []
|
|
|
|
key = (driver, None)
|
|
if test.BUILD_SCHEMA:
|
|
if key not in test._schema_resources:
|
|
test._schema_resources[key] = provision.SchemaResource(
|
|
database_resource,
|
|
cls._generate_schema_w_migrations
|
|
if test.BUILD_WITH_MIGRATIONS
|
|
else cls._generate_schema, teardown=False)
|
|
|
|
schema_resource = test._schema_resources[key]
|
|
return [
|
|
('schema', schema_resource),
|
|
('db', database_resource)
|
|
]
|
|
else:
|
|
return [
|
|
('db', database_resource)
|
|
]
|
|
|
|
|
|
class BaseSqlTestCase(object):
|
|
BUILD_SCHEMA = True
|
|
|
|
def setUp(self):
|
|
super(BaseSqlTestCase, self).setUp()
|
|
|
|
self._setup_database_fixtures()
|
|
|
|
def _setup_database_fixtures(self):
|
|
if self.BUILD_SCHEMA:
|
|
fixture = lib_fixtures.StaticSqlFixture()
|
|
else:
|
|
fixture = StaticSqlFixtureNoSchema()
|
|
self.useFixture(fixture)
|
|
self.engine = fixture.engine
|
|
|
|
|
|
class SqlTestCaseLight(BaseSqlTestCase, base.DietTestCase):
|
|
"""All SQL taste, zero plugin/rpc sugar"""
|
|
|
|
|
|
class SqlTestCase(BaseSqlTestCase, base.BaseTestCase):
|
|
"""regular sql test"""
|
|
|
|
|
|
class OpportunisticDBTestMixin(object):
|
|
"""Mixin that converts a BaseSqlTestCase to use the
|
|
OpportunisticSqlFixture.
|
|
"""
|
|
|
|
SKIP_ON_UNAVAILABLE_DB = not base.bool_from_env('OS_FAIL_ON_MISSING_DEPS')
|
|
|
|
FIXTURE = OpportunisticSqlFixture
|
|
|
|
BUILD_WITH_MIGRATIONS = False
|
|
|
|
def _setup_database_fixtures(self):
|
|
self.useFixture(self.FIXTURE(self))
|
|
|
|
if not hasattr(self, 'db'):
|
|
msg = "backend '%s' unavailable" % self.DRIVER
|
|
if self.SKIP_ON_UNAVAILABLE_DB:
|
|
self.skipTest(msg)
|
|
else:
|
|
self.fail(msg)
|
|
|
|
_schema_resources = {}
|
|
_database_resources = {}
|
|
|
|
@property
|
|
def resources(self):
|
|
"""this attribute is used by testresources for optimized
|
|
sorting of tests.
|
|
|
|
This is the big requirement that allows testresources to sort
|
|
tests such that database "resources" can be kept open for
|
|
many tests at once.
|
|
|
|
IMO(zzzeek) "sorting" should not be needed; only that necessary
|
|
resources stay open as long as they are needed (or long enough to
|
|
reduce overhead). testresources would be improved to not depend on
|
|
custom, incompatible-with-pytest "suite classes", fixture information
|
|
leaking out of the Fixture classes themselves, and exotic sorting
|
|
schemes for something that can nearly always be handled "good enough"
|
|
with unittest-standard setupclass/setupmodule schemes.
|
|
|
|
"""
|
|
|
|
return self.FIXTURE.resources_collection(self)
|
|
|
|
|
|
class MySQLTestCaseMixin(OpportunisticDBTestMixin):
|
|
"""Mixin that turns any BaseSqlTestCase into a MySQL test suite.
|
|
|
|
If the MySQL db is unavailable then this test is skipped, unless
|
|
OS_FAIL_ON_MISSING_DEPS is enabled.
|
|
"""
|
|
DRIVER = "mysql"
|
|
|
|
|
|
class PostgreSQLTestCaseMixin(OpportunisticDBTestMixin):
|
|
"""Mixin that turns any BaseSqlTestCase into a PostgresSQL test suite.
|
|
|
|
If the PostgreSQL db is unavailable then this test is skipped, unless
|
|
OS_FAIL_ON_MISSING_DEPS is enabled.
|
|
"""
|
|
DRIVER = "postgresql"
|
|
|
|
|
|
def module_load_tests(loader, found_tests, pattern):
|
|
"""Apply OptimisingTestSuite on a per-module basis.
|
|
|
|
FIXME(zzzeek): oslo.db provides this but the contract that
|
|
"pattern" should be None no longer seems to behave as it used
|
|
to at the module level, so this function needs to be added in this
|
|
form.
|
|
|
|
"""
|
|
|
|
result = testresources.OptimisingTestSuite()
|
|
found_tests = testscenarios.load_tests_apply_scenarios(
|
|
loader, found_tests, pattern)
|
|
result.addTest(found_tests)
|
|
return result
|
|
|
|
|
|
class WebTestCase(SqlTestCase):
|
|
fmt = 'json'
|
|
|
|
def setUp(self):
|
|
super(WebTestCase, self).setUp()
|
|
json_deserializer = wsgi.JSONDeserializer()
|
|
self._deserializers = {
|
|
'application/json': json_deserializer,
|
|
}
|
|
|
|
def deserialize(self, response):
|
|
ctype = 'application/%s' % self.fmt
|
|
data = self._deserializers[ctype].deserialize(response.body)['body']
|
|
return data
|
|
|
|
def serialize(self, data):
|
|
ctype = 'application/%s' % self.fmt
|
|
result = wsgi.Serializer().serialize(data, ctype)
|
|
return result
|
|
|
|
|
|
class SubDictMatch(object):
|
|
|
|
def __init__(self, sub_dict):
|
|
self.sub_dict = sub_dict
|
|
|
|
def __eq__(self, super_dict):
|
|
return all(item in super_dict.items()
|
|
for item in self.sub_dict.items())
|