
Today neutron-lib has a SqlFixture that's public but not used by anyone else. However, this fixture doesn't contain the same functionality as the fixture in neutron, so neutron breaks when using it as-is today. This patch preps for consumption of the SqlFixture(s) by: - Moving a few more of the SqlFixtures from neutron into lib; these are used by consumers today. - Updating the existing SqlFixture to include some functionality that neutron requires. - Making all these SqlFixtures private. This is temporary so that we can have a little time to test them in neutron's and others' gates while making sure no one uses them from lib. Once we are done testing we will update them again in lib to be public and consume them. At that point some unit tests (UTs) can also be added with a release note. Neutron sample consumption patch: https://review.openstack.org/#/c/651907/ Change-Id: Iaefc83d852a8dca48569e543f535f7c49f5ad0db
414 lines
14 KiB
Python
414 lines
14 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import copy
|
|
import warnings
|
|
|
|
import fixtures
|
|
import mock
|
|
from oslo_config import cfg
|
|
from oslo_db.sqlalchemy import enginefacade
|
|
from oslo_db.sqlalchemy import provision
|
|
from oslo_db.sqlalchemy import session
|
|
from oslo_messaging import conffixture
|
|
|
|
from neutron_lib.api import attributes
|
|
from neutron_lib.api import definitions
|
|
from neutron_lib.callbacks import manager
|
|
from neutron_lib.callbacks import registry
|
|
from neutron_lib.db import api as db_api
|
|
from neutron_lib.db import model_base
|
|
from neutron_lib.db import model_query
|
|
from neutron_lib.db import resource_extend
|
|
from neutron_lib.plugins import directory
|
|
from neutron_lib import rpc
|
|
from neutron_lib.tests.unit import fake_notifier
|
|
|
|
|
|
# Module-level alias for the global oslo.config object; RPCFixture hands it
# to the oslo.messaging ConfFixture and to rpc.init().
CONF = cfg.CONF
|
|
|
|
|
|
class PluginDirectoryFixture(fixtures.Fixture):
    """Swap the global plugin directory for the lifetime of the fixture.

    While the fixture is set up, ``directory._PLUGIN_DIRECTORY`` points at
    the directory held by this fixture. The value that was installed
    beforehand is put back during clean up.
    """

    def __init__(self, plugin_directory=None):
        """Create a new PluginDirectoryFixture.

        :param plugin_directory: The plugin directory to install while the
            fixture is active. When omitted (or falsy) a new
            ``directory._PluginDirectory`` instance is created and used.
        """
        super(PluginDirectoryFixture, self).__init__()
        if not plugin_directory:
            plugin_directory = directory._PluginDirectory()
        self.plugin_directory = plugin_directory

    def _setUp(self):
        # Remember the currently installed directory so that _restore()
        # can reinstate it.
        self._orig_directory = directory._PLUGIN_DIRECTORY
        self.addCleanup(self._restore)
        directory._PLUGIN_DIRECTORY = self.plugin_directory

    def _restore(self):
        """Reinstall the plugin directory saved by _setUp()."""
        directory._PLUGIN_DIRECTORY = self._orig_directory
|
|
|
|
|
|
class CallbackRegistryFixture(fixtures.Fixture):
    """Callback registry fixture.

    Intended for use from unit tests: consumers register it with
    useFixture() in their test class. While active,
    ``registry._get_callback_manager()`` is patched to return the manager
    held by this fixture, which consumers may optionally supply themselves.
    """

    def __init__(self, callback_manager=None):
        """Creates a new RegistryFixture.

        :param callback_manager: If specified, the return value to use for
            _get_callback_manager(). Otherwise a new instance of
            CallbacksManager is used.
        """
        super(CallbackRegistryFixture, self).__init__()
        if not callback_manager:
            callback_manager = manager.CallbacksManager()
        self.callback_manager = callback_manager
        self.patcher = None

    def _setUp(self):
        # Capture the real manager before the lookup function is patched;
        # once the patch is active the same call returns our replacement.
        self._orig_manager = registry._get_callback_manager()
        patch_kwargs = {'return_value': self.callback_manager}
        self.patcher = mock.patch.object(
            registry, '_get_callback_manager', **patch_kwargs)
        self.patcher.start()
        self.addCleanup(self._restore)

    def _restore(self):
        """Put the saved manager back and remove the patch."""
        registry._CALLBACK_MANAGER = self._orig_manager
        self.patcher.stop()
|
|
|
|
|
|
class _EnableSQLiteFKsFixture(fixtures.Fixture):
    """Turn SQLite PRAGMA foreign keys on and off for tests.

    Foreign keys are switched on at set up and off again at clean up. The
    fixture does nothing for engines whose name is not 'sqlite'.

    FIXME(zzzeek): figure out some way to get oslo.db test_base to honor
    oslo_db.engines.create_engine() arguments like sqlite_fks as well
    as handling that it needs to be turned off during drops.

    """

    def __init__(self, engine):
        """Create the fixture.

        :param engine: The engine on which the PRAGMA statements are run.
        """
        self.engine = engine

    def _setUp(self):
        if self.engine.name != 'sqlite':
            return

        self.engine.execute("PRAGMA foreign_keys=ON")
        self.addCleanup(self._disable_fks)

    def _disable_fks(self):
        """Switch foreign key enforcement back off on a new connection."""
        with self.engine.connect() as connection:
            # Roll back on the underlying connection before issuing the
            # PRAGMA -- presumably so that no transaction is open when it
            # runs; confirm against the SQLite PRAGMA documentation.
            connection.connection.rollback()
            connection.execute("PRAGMA foreign_keys=OFF")
|
|
|
|
|
|
class _SqlFixture(fixtures.Fixture):
    """Fixture that prepares the database schema and a test enginefacade.

    Set up creates the model tables (once per process), swaps the root
    factory of the db_api context manager for a
    ``enginefacade._TestTransactionFactory`` bound to the writer engine and
    enables SQLite foreign keys. Clean up deletes all rows from the model
    tables and reinstates the original root factory.
    """

    # flag to indicate that the models have been loaded
    _TABLES_ESTABLISHED = False

    def _init_resources(self):
        """Hook called from _setUp() after schema generation.

        The base implementation does nothing.
        """
        pass

    @classmethod
    def generate_schema(cls, engine):
        """Create all BASEV2 tables on ``engine`` the first time through.

        The flag is stored on ``_SqlFixture`` itself, so it is shared by
        all subclasses.

        :param engine: The engine the tables are created on.
        """
        # Register all data models
        if _SqlFixture._TABLES_ESTABLISHED:
            return
        model_base.BASEV2.metadata.create_all(engine)
        _SqlFixture._TABLES_ESTABLISHED = True

    def delete_from_schema(self, engine):
        """Delete every row from every BASEV2 table in one transaction.

        Tables are visited in reverse of ``sorted_tables`` order.

        :param engine: The engine the DELETE statements are run on.
        """
        with engine.begin() as conn:
            ordered_tables = model_base.BASEV2.metadata.sorted_tables
            for table in reversed(ordered_tables):
                conn.execute(table.delete())

    def _setUp(self):
        self.engine = db_api.CONTEXT_WRITER.get_engine()
        self.generate_schema(self.engine)
        self._init_resources()

        self.sessionmaker = session.get_maker(self.engine)

        # Keep the factory that was installed so it can be reinstated.
        original_factory = db_api.get_context_manager()._root_factory
        self.enginefacade_factory = enginefacade._TestTransactionFactory(
            self.engine, self.sessionmaker,
            from_factory=original_factory, apply_global=False)
        db_api.get_context_manager()._root_factory = self.enginefacade_factory

        def _purge_rows():
            self.delete_from_schema(self.engine)

        def _reinstate_factory():
            db_api.get_context_manager()._root_factory = original_factory

        # Registration order is significant: do not reorder these cleanups
        # or the useFixture() call below.
        self.addCleanup(_purge_rows)
        self.addCleanup(_reinstate_factory)

        self.useFixture(_EnableSQLiteFKsFixture(self.engine))
|
|
|
|
|
|
class _StaticSqlFixture(_SqlFixture):
    """Fixture which keeps a single sqlite memory database at global scope."""

    # Set to True once the class-level resources have been built.
    _GLOBAL_RESOURCES = False

    @classmethod
    def _init_resources(cls):
        """Build the class-level schema resource and engine exactly once."""
        # this is a classlevel version of what testresources
        # does w/ the resources attribute as well as the
        # setUpResources() step (which requires a test instance, that
        # SqlFixture does not have).  Because this is a SQLite memory
        # database, we don't actually tear it down, so we can keep
        # it running throughout all tests.
        if cls._GLOBAL_RESOURCES:
            return

        cls._GLOBAL_RESOURCES = True
        database = provision.DatabaseResource(
            "sqlite", db_api.get_context_manager())
        cls.schema_resource = provision.SchemaResource(
            database, cls.generate_schema, teardown=False)
        # Resolve every dependency of the schema resource by name before
        # asking the resource to build itself from them.
        dependency_resources = {
            name: resource.getResource()
            for name, resource in cls.schema_resource.resources}
        cls.schema_resource.make(dependency_resources)
        cls.engine = dependency_resources['database'].engine
|
|
|
|
|
|
class APIDefinitionFixture(fixtures.Fixture):
    """Test fixture for testing neutron-lib API definitions.

    Extension API definition RESOURCE_ATTRIBUTE_MAP dicts get updated as
    part of standard extension processing/handling. While this behavior is
    fine for service runtime, it can impact testing scenarios whereby test1
    updates the attribute map (globally) and test2 doesn't expect the
    test1 updates.

    This fixture saves and restores one or more neutron-lib API definition
    attribute maps. It should be used anywhere multiple tests can be run
    that might update an extension attribute map.

    In addition the fixture backs up and restores the global attribute
    RESOURCES based on the boolean value of its backup_global_resources
    attribute.
    """

    def __init__(self, *api_definitions):
        """Create a new instance.

        Consumers can also control if the fixture should handle the global
        attribute RESOURCES map using the backup_global_resources attribute
        of the fixture instance. If True (the default) the fixture will
        also handle neutron_lib.api.attributes.RESOURCES.

        :param api_definitions: Zero or more API definitions the fixture
            should handle. If no api_definitions are passed, the default is
            to handle all neutron_lib API definitions as well as the global
            RESOURCES attribute map.
        """
        self.definitions = api_definitions or definitions._ALL_API_DEFINITIONS
        # alias -> (api definition, deep copy of its attribute map)
        self._orig_attr_maps = {}
        # resource name -> deep copy of its entry in attributes.RESOURCES
        self._orig_resources = {}
        self.backup_global_resources = True

    def _setUp(self):
        for api_def in self.definitions:
            saved_map = {}
            for name, attrs in api_def.RESOURCE_ATTRIBUTE_MAP.items():
                saved_map[name] = copy.deepcopy(attrs)
            self._orig_attr_maps[api_def.ALIAS] = (api_def, saved_map)
        if self.backup_global_resources:
            self._orig_resources.update(
                (resource, copy.deepcopy(attrs))
                for resource, attrs in attributes.RESOURCES.items())
        self.addCleanup(self._restore)

    def _restore(self):
        """Repopulate the saved maps in place from their deep copies."""
        # clear + repopulate so consumer refs don't change
        for api_def, attr_map in self._orig_attr_maps.values():
            live_map = api_def.RESOURCE_ATTRIBUTE_MAP
            live_map.clear()
            live_map.update(attr_map)
        if self.backup_global_resources:
            attributes.RESOURCES.clear()
            attributes.RESOURCES.update(self._orig_resources)

    @classmethod
    def all_api_definitions_fixture(cls):
        """Return a fixture that handles all neutron-lib api-defs."""
        every_definition = tuple(definitions._ALL_API_DEFINITIONS)
        return APIDefinitionFixture(*every_definition)
|
|
|
|
|
|
class PlacementAPIClientFixture(fixtures.Fixture):
    """Placement API client fixture.

    Intended for use from unit tests: consumers register it with
    useFixture() in their test class.

    While active, the client's ``_create_client``, ``_get``, ``_post``,
    ``_put`` and ``_delete`` attributes are patched. The patchers are kept
    as ``_mock_create_client`` and ``_mock_<verb>``; the mocks returned by
    starting the verb patchers are exposed as ``mock_get``, ``mock_post``,
    ``mock_put`` and ``mock_delete``.
    """

    def __init__(self, placement_api_client):
        """Creates a new PlacementAPIClientFixture.

        :param placement_api_client: Placement API client object.
        """
        super(PlacementAPIClientFixture, self).__init__()
        self.placement_api_client = placement_api_client

    def _setUp(self):
        self.addCleanup(self._restore)

        def mock_create_client():
            # Stand-in for client creation: just attach a Mock.
            self.placement_api_client.client = mock.Mock()

        client = self.placement_api_client
        verbs = ('get', 'post', 'put', 'delete')

        # Build every patcher first ...
        self._mock_create_client = mock.patch.object(
            client, '_create_client', side_effect=mock_create_client)
        for verb in verbs:
            setattr(self, '_mock_' + verb,
                    mock.patch.object(client, '_' + verb))

        # ... then start them, keeping the mocks for the HTTP verbs.
        self._mock_create_client.start()
        for verb in verbs:
            patcher = getattr(self, '_mock_' + verb)
            setattr(self, 'mock_' + verb, patcher.start())

    def _restore(self):
        """Stop all patchers started by _setUp()."""
        self._mock_create_client.stop()
        for verb in ('get', 'post', 'put', 'delete'):
            getattr(self, '_mock_' + verb).stop()
|
|
|
|
|
|
class DBRetryErrorsFixture(fixtures.Fixture):
    """Override attributes of ``db_api._retry_db_errors`` during a test.

    Each keyword argument names an attribute of ``db_api._retry_db_errors``
    and gives the value it is patched to while the fixture is active.
    """

    def __init__(self, **retry_kwargs):
        """Create the fixture.

        :param retry_kwargs: Attribute name / replacement value pairs to
            patch onto ``db_api._retry_db_errors``.
        """
        self._retry_kwargs = retry_kwargs
        self._patchers = []

    def _setUp(self):
        for attr_name, replacement in self._retry_kwargs.items():
            attr_patch = mock.patch.object(
                db_api._retry_db_errors, attr_name, new=replacement)
            attr_patch.start()
            self._patchers.append(attr_patch)
        self.addCleanup(self._restore)

    def _restore(self):
        """Stop every patcher recorded by _setUp(), in start order."""
        for attr_patch in self._patchers:
            attr_patch.stop()
|
|
|
|
|
|
class DBAPIContextManagerFixture(fixtures.Fixture):
    """Replace ``db_api._CTX_MANAGER`` for the lifetime of the fixture.

    The previous context manager is saved at set up and reinstated at
    clean up.
    """

    def __init__(self, mock_context_manager=mock.ANY):
        """Create the fixture.

        :param mock_context_manager: The object to install as
            ``db_api._CTX_MANAGER``. When the argument is omitted a new
            ``mock.Mock()`` is used. Any explicitly passed value, including
            None, is installed as-is.
        """
        # mock.ANY only serves as the "argument omitted" sentinel here and
        # must be tested by identity: ANY compares equal to every object,
        # so an equality test would treat any supplied manager as omitted
        # and silently replace it with a Mock.
        if mock_context_manager is mock.ANY:
            self.cxt_manager = mock.Mock()
        else:
            self.cxt_manager = mock_context_manager
        self._backup_mgr = None

    def _setUp(self):
        self._backup_mgr = db_api._CTX_MANAGER
        db_api._CTX_MANAGER = self.cxt_manager
        self.addCleanup(self._restore)

    def _restore(self):
        """Reinstall the context manager saved by _setUp()."""
        db_api._CTX_MANAGER = self._backup_mgr
|
|
|
|
|
|
class DBQueryHooksFixture(fixtures.Fixture):
    """Replace ``model_query._model_query_hooks`` during a test.

    The previous hooks object is saved at set up and reinstated at
    clean up.
    """

    # Class-level default so subclasses that override __init__ without
    # calling it still get the "no hooks supplied" behaviour.
    query_hooks = None

    def __init__(self, query_hooks=None):
        """Create the fixture.

        :param query_hooks: The hooks to install as
            ``model_query._model_query_hooks`` while the fixture is active.
            When omitted (or falsy) an empty dict is installed.
        """
        super(DBQueryHooksFixture, self).__init__()
        self.query_hooks = query_hooks

    def _setUp(self, query_hooks=None):
        """Install the query hooks.

        :param query_hooks: Kept for backward compatibility with direct
            callers. The fixture machinery calls _setUp() without
            arguments, so hooks are normally supplied to the constructor.
            A value passed here takes precedence over the constructor's.
        """
        if query_hooks is None:
            query_hooks = self.query_hooks
        self._backup = model_query._model_query_hooks
        model_query._model_query_hooks = query_hooks or {}
        self.addCleanup(self._restore)

    def _restore(self):
        """Reinstall the hooks object saved by _setUp()."""
        model_query._model_query_hooks = self._backup
|
|
|
|
|
|
class RPCFixture(fixtures.Fixture):
    """Set up neutron-lib RPC for tests without real listeners or notifiers.

    Set up patches ``rpc.Connection.consume_in_threads`` to return an empty
    list, replaces ``oslo_messaging.Notifier`` with the fake notifier,
    applies an oslo.messaging ConfFixture with a fake transport URL and a
    zero response timeout, and calls ``rpc.init(CONF)``. Clean up calls
    ``rpc.cleanup`` and removes the patches.
    """

    def _setUp(self):
        # don't actually start RPC listeners when testing
        consume_patch = mock.patch.object(
            rpc.Connection, 'consume_in_threads', return_value=[])
        consume_patch.start()
        # Stop the patch with the fixture so it cannot leak into code that
        # runs after the fixture is cleaned up.
        self.addCleanup(consume_patch.stop)
        self.useFixture(fixtures.MonkeyPatch(
            'oslo_messaging.Notifier', fake_notifier.FakeNotifier))

        self.messaging_conf = conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_url = 'fake:/'
        # NOTE(russellb) We want all calls to return immediately.
        self.messaging_conf.response_timeout = 0
        self.useFixture(self.messaging_conf)

        # Register the cleanup before init so it is in place even if
        # rpc.init() fails part way through.
        self.addCleanup(rpc.cleanup)
        rpc.init(CONF)
|
|
|
|
|
|
class DBResourceExtendFixture(fixtures.Fixture):
    """Replace ``resource_extend._resource_extend_functions`` during a test.

    A deep copy of the current value is taken at set up; that copy is what
    gets installed again at clean up.
    """

    def __init__(self, extended_functions=None):
        """Create the fixture.

        :param extended_functions: The object to install as
            ``resource_extend._resource_extend_functions``. When omitted
            (or falsy) an empty dict is used.
        """
        if not extended_functions:
            extended_functions = {}
        self.extended_functions = extended_functions

    def _setUp(self):
        current = resource_extend._resource_extend_functions
        self._backup = copy.deepcopy(current)
        resource_extend._resource_extend_functions = self.extended_functions
        self.addCleanup(self._restore)

    def _restore(self):
        """Install the deep copy taken by _setUp()."""
        resource_extend._resource_extend_functions = self._backup
|
|
|
|
|
|
class OpenFixture(fixtures.Fixture):
    """Mock access to a specific file while preserving open for others."""

    def __init__(self, filepath, contents=''):
        """Create the fixture.

        :param filepath: The path whose open() calls are served by the
            mock. It is compared with ``==`` against the first argument
            given to open().
        :param contents: The data used as ``read_data`` for the mock.
        """
        self.path = filepath
        self.contents = contents

    def _setUp(self):
        self.mock_open = mock.mock_open(read_data=self.contents)
        # Hold on to the open() visible at set up so that every other
        # path can still be opened through it.
        self._orig_open = open

        def replacement_open(name, *args, **kwargs):
            opener = self.mock_open if name == self.path else self._orig_open
            return opener(name, *args, **kwargs)

        self._patch = mock.patch('six.moves.builtins.open',
                                 new=replacement_open)
        self._patch.start()
        self.addCleanup(self._patch.stop)
|
|
|
|
|
|
class WarningsFixture(fixtures.Fixture):
    """Installs warning filters for selected modules during test runs.

    For every category in ``warning_types`` an "always" filter is added,
    restricted to the configured module regular expressions. Clean up
    calls ``warnings.resetwarnings``.
    """

    # Warning categories a filter is installed for.
    warning_types = (
        DeprecationWarning, PendingDeprecationWarning, ImportWarning
    )

    def __init__(self, module_re=None):
        """Create a new WarningsFixture.

        :param module_re: A list of regular expression strings that will be
            used with filterwarnings, in addition to the built-in
            neutron_lib expression. Multiple expressions are or'd together.
        """
        self._modules = ['^neutron_lib\\.']
        if module_re:
            self._modules.extend(module_re)

    def _setUp(self):
        # NOTE: resetwarnings() is called as-is at clean up; see the
        # warnings module documentation for what it discards.
        self.addCleanup(warnings.resetwarnings)
        for category in self.warning_types:
            filter_kwargs = {
                'category': category,
                'module': '|'.join(self._modules),
            }
            warnings.filterwarnings("always", **filter_kwargs)
|