b4ecdc9945
The neutron.tests.tools module was recently made public and along with it the WarningsFixture. However, the WarningsFixture as-is doesn't support filtering warnings from other modules. This patch addresses that by accepting a constructor arg to add other module regular expressions to filter. It also moves WarningsFixture to the fixture module where it seems to make the most sense. For a sample consumption patch see: https://review.openstack.org/#/c/651370/ Change-Id: I08c713916aef407ba108ed2c67c3e99892f5883a
241 lines
8.1 KiB
Python
241 lines
8.1 KiB
Python
# Copyright 2010-2011 OpenStack Foundation
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import logging as std_logging
|
|
import os
|
|
import os.path
|
|
import random
|
|
|
|
import fixtures
|
|
import mock
|
|
from oslo_config import cfg
|
|
from oslo_db import options as db_options
|
|
from oslo_utils import strutils
|
|
import pbr.version
|
|
import testtools
|
|
|
|
from neutron_lib._i18n import _
|
|
from neutron_lib import constants
|
|
from neutron_lib import exceptions
|
|
from neutron_lib import fixture
|
|
|
|
from neutron_lib.tests import _post_mortem_debug as post_mortem_debug
|
|
|
|
|
|
# Process-wide oslo.config configuration object.
CONF = cfg.CONF

# Format string handed to the logging setup in BaseTestCase.setUp().
LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"

# Directory containing this module, and its 'etc' sub-directory.
ROOTDIR = os.path.dirname(__file__)
ETCDIR = os.path.join(ROOTDIR, 'etc')
|
|
|
|
|
|
def etcdir(*p):
    """Join the given path components onto the test ``etc`` directory.

    :param p: zero or more path components relative to ETCDIR.
    :returns: the joined path string.
    """
    joined = os.path.join(ETCDIR, *p)
    return joined
|
|
|
|
|
|
def fake_use_fatal_exceptions(*args):
    """Return True unconditionally, ignoring any positional arguments."""
    always_fatal = True
    return always_fatal
|
|
|
|
|
|
def get_rand_name(max_length=None, prefix='test'):
    """Return a random string.

    The string starts with 'prefix'. If 'max_length' is given, the result
    is exactly 'max_length' characters long and the part after the prefix
    consists of random decimal digits. If 'max_length' is None, exactly 8
    random hexadecimal characters are appended to the prefix.

    :param max_length: total length of the returned string, or None.
    :param prefix: string the result starts with.
    :returns: 'prefix' followed by the random suffix.
    :raises ValueError: if 'max_length' is not None and
        max_length <= len(prefix), i.e. there is no room for a suffix.
    """
    # Compare against None explicitly: a plain truthiness test would treat
    # max_length=0 as "no limit" and silently return a longer string
    # instead of reporting the impossible length.
    if max_length is not None:
        length = max_length - len(prefix)
        if length <= 0:
            raise ValueError("'max_length' must be bigger than 'len(prefix)'.")

        suffix = ''.join(str(random.randint(0, 9)) for _i in range(length))
    else:
        # Both bounds have 8 hex digits, so the suffix is always 8 chars
        # once the leading '0x' is stripped.
        suffix = hex(random.randint(0x10000000, 0x7fffffff))[2:]
    return prefix + suffix
|
|
|
|
|
|
def get_rand_device_name(prefix='test'):
    """Return a random name sized to constants.DEVICE_NAME_MAX_LEN.

    :param prefix: string the returned name starts with.
    :returns: the result of get_rand_name() for that length and prefix.
    """
    name_limit = constants.DEVICE_NAME_MAX_LEN
    return get_rand_name(max_length=name_limit, prefix=prefix)
|
|
|
|
|
|
def bool_from_env(key, strict=False, default=False):
    """Interpret the environment variable 'key' as a boolean.

    The raw value (None when the variable is unset) is passed to
    oslo_utils.strutils.bool_from_string() together with 'strict' and
    'default'.

    :param key: name of the environment variable to read.
    :param strict: forwarded to strutils.bool_from_string().
    :param default: forwarded to strutils.bool_from_string().
    :returns: whatever strutils.bool_from_string() returns.
    """
    return strutils.bool_from_string(
        os.environ.get(key), strict=strict, default=default)
|
|
|
|
|
|
def get_test_timeout(default=0):
    """Return the OS_TEST_TIMEOUT environment variable as an integer.

    :param default: value converted and returned when the variable is unset.
    :returns: int(value) of the variable, or int(default) if it is unset.
    """
    raw_timeout = os.environ.get('OS_TEST_TIMEOUT', default)
    return int(raw_timeout)
|
|
|
|
|
|
def sanitize_log_path(path):
    """Return 'path' with shell-unfriendly characters substituted.

    Spaces become '-' and both parentheses become '_'.

    :param path: the string to sanitize.
    :returns: the sanitized string.
    """
    # Applied in order; none of the replacement characters is itself a
    # search character, so the substitutions cannot interfere.
    substitutions = ((' ', '-'), ('(', '_'), (')', '_'))
    for unsafe, safe in substitutions:
        path = path.replace(unsafe, safe)
    return path
|
|
|
|
|
|
class AttributeDict(dict):
    """A dict whose keys can also be read as attributes (``d.key``)."""

    def __getattr__(self, name):
        """Return the value stored under 'name'.

        Python only calls this hook when normal attribute lookup fails.

        :raises AttributeError: if 'name' is not a key of the dict.
        """
        if name not in self:
            raise AttributeError(_("Unknown attribute '%s'.") % name)
        return self[name]
|
|
|
|
|
|
class BaseTestCase(testtools.TestCase):
    """Base class for tests: sets up config, logging and common fixtures."""

    @staticmethod
    def config_parse(conf=None, args=None):
        """Create the default configurations.

        :param conf: optional callable invoked as ``conf(args)``. When None,
            the global ``cfg.CONF`` is called instead with the 'neutron_lib'
            project name and a version string.
        :param args: optional sequence of command line style arguments. It
            is copied, so the caller's object is never modified.
        """
        # Work on a copy: extending the caller's list in place would leak
        # the '--config-file' option back to the caller and accumulate it
        # on repeated calls.
        args = list(args) if args is not None else []
        args += ['--config-file', etcdir('neutron_lib.conf')]
        if conf is None:
            version_info = pbr.version.VersionInfo('neutron-lib')
            cfg.CONF(args=args, project='neutron_lib',
                     version='%%(prog)s %s' % version_info.release_string())
        else:
            conf(args)

    def setUp(self):
        """Install the fixtures, patches and handlers shared by all tests.

        Behaviour is tuned through environment variables read here:
        OS_POST_MORTEM_DEBUGGER, OS_DEBUG, OS_LOG_CAPTURE, OS_TEST_TIMEOUT,
        OS_STDOUT_CAPTURE and OS_STDERR_CAPTURE.
        """
        super(BaseTestCase, self).setUp()
        self.useFixture(fixture.PluginDirectoryFixture())

        # Enabling 'use_fatal_exceptions' allows us to catch string
        # substitution format errors in exception messages.
        mock.patch.object(exceptions.NeutronException, 'use_fatal_exceptions',
                          return_value=True).start()

        db_options.set_defaults(cfg.CONF, connection='sqlite://')

        # Replace the config file lookup with one that finds nothing.
        self.useFixture(fixtures.MonkeyPatch(
            'oslo_config.cfg.find_config_files',
            lambda project=None, prog=None, extension=None: []))

        self.setup_config()

        # Configure this first to ensure pm debugging support for setUp()
        debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER')
        if debugger:
            self.addOnException(post_mortem_debug.get_exception_handler(
                debugger))

        # Make sure we see all relevant deprecation warnings when running tests
        self.useFixture(fixture.WarningsFixture())

        if bool_from_env('OS_DEBUG'):
            _level = std_logging.DEBUG
        else:
            _level = std_logging.INFO
        capture_logs = bool_from_env('OS_LOG_CAPTURE')
        if not capture_logs:
            std_logging.basicConfig(format=LOG_FORMAT, level=_level)
        self.log_fixture = self.useFixture(
            fixtures.FakeLogger(
                format=LOG_FORMAT,
                level=_level,
                nuke_handlers=capture_logs,
            ))

        # A timeout of -1 is treated the same as 0: no timeout fixture.
        test_timeout = get_test_timeout()
        if test_timeout == -1:
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=True))

        # If someone does use tempfile directly, ensure that it's cleaned up
        self.useFixture(fixtures.NestedTempfile())
        self.useFixture(fixtures.TempHomeDir())

        self.addCleanup(mock.patch.stopall)

        if bool_from_env('OS_STDOUT_CAPTURE'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if bool_from_env('OS_STDERR_CAPTURE'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))

        self.addOnException(self.check_for_systemexit)
        # Remembered so check_for_systemexit() can tell whether it is
        # running in the process that executed setUp().
        self.orig_pid = os.getpid()

    def get_new_temp_dir(self):
        """Create a new temporary directory.

        :returns: fixtures.TempDir
        """
        return self.useFixture(fixtures.TempDir())

    def get_default_temp_dir(self):
        """Create a default temporary directory.

        Returns the same directory during the whole test case.

        :returns: fixtures.TempDir
        """
        # Created lazily on first use and cached on the instance.
        if not hasattr(self, '_temp_dir'):
            self._temp_dir = self.get_new_temp_dir()
        return self._temp_dir

    def check_for_systemexit(self, exc_info):
        """Exception handler that turns SystemExit into a test failure.

        Registered through addOnException() in setUp().

        :param exc_info: exception info sequence; element 1 is the
            exception instance.
        """
        if isinstance(exc_info[1], SystemExit):
            if os.getpid() != self.orig_pid:
                # Subprocess - let it just exit
                # NOTE(review): the bare raise relies on this handler being
                # invoked while the exception is still being handled.
                raise
            # This makes sys.exit(0) still a failure
            self.force_failure = True

    def assertOrderedEqual(self, expected, actual):
        """Assert two dicts are equal once their list values are sorted.

        Both arguments are normalised with sort_dict_lists(), which
        modifies them in place.
        """
        expect_val = self.sort_dict_lists(expected)
        actual_val = self.sort_dict_lists(actual)
        self.assertEqual(expect_val, actual_val)

    def sort_dict_lists(self, dic):
        """Sort every list value of 'dic', recursing into nested dicts.

        The dict is modified in place (list values are replaced by sorted
        copies) and also returned.

        :param dic: the dict to normalise.
        :returns: the same dict object.
        """
        # Only values of existing keys are reassigned, so the set of keys
        # does not change while iterating.
        for key, value in dic.items():
            if isinstance(value, list):
                dic[key] = sorted(value)
            elif isinstance(value, dict):
                dic[key] = self.sort_dict_lists(value)
        return dic

    def assertDictSupersetOf(self, expected_subset, actual_superset):
        """Checks that actual dict contains the expected dict.

        After checking that the arguments are of the right type, this checks
        that each item in expected_subset is in, and matches, what is in
        actual_superset. Separate tests are done, so that detailed info can
        be reported upon failure.
        """
        if not isinstance(expected_subset, dict):
            self.fail("expected_subset (%s) is not an instance of dict" %
                      type(expected_subset))
        if not isinstance(actual_superset, dict):
            self.fail("actual_superset (%s) is not an instance of dict" %
                      type(actual_superset))
        for k, v in expected_subset.items():
            self.assertIn(k, actual_superset)
            self.assertEqual(v, actual_superset[k],
                             "Key %(key)s expected: %(exp)r, actual %(act)r" %
                             {'key': k, 'exp': v, 'act': actual_superset[k]})

    def setup_config(self, args=None):
        """Tests that need a non-default config can override this method."""
        self.config_parse(args=args)
|