# Copyright 2010-2011 OpenStack Foundation # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import logging as std_logging import os import os.path import random import fixtures import mock from oslo_config import cfg from oslo_db import options as db_options from oslo_utils import strutils import pbr.version import testtools from neutron_lib._i18n import _ from neutron_lib import constants from neutron_lib import exceptions from neutron_lib import fixture from neutron_lib.tests import _post_mortem_debug as post_mortem_debug from neutron_lib.tests import _tools as tools CONF = cfg.CONF LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s" ROOTDIR = os.path.dirname(__file__) ETCDIR = os.path.join(ROOTDIR, 'etc') def etcdir(*p): return os.path.join(ETCDIR, *p) def fake_use_fatal_exceptions(*args): return True def get_rand_name(max_length=None, prefix='test'): """Return a random string. The string will start with 'prefix' and will be exactly 'max_length'. If 'max_length' is None, then exactly 8 random characters, each hexadecimal, will be added. In case len(prefix) <= len(max_length), ValueError will be raised to indicate the problem. """ if max_length: length = max_length - len(prefix) if length <= 0: raise ValueError("'max_length' must be bigger than 'len(prefix)'.") suffix = ''.join(str(random.randint(0, 9)) for i in range(length)) else: suffix = hex(random.randint(0x10000000, 0x7fffffff))[2:] return prefix + suffix def get_rand_device_name(prefix='test'): return get_rand_name( max_length=constants.DEVICE_NAME_MAX_LEN, prefix=prefix) def bool_from_env(key, strict=False, default=False): value = os.environ.get(key) return strutils.bool_from_string(value, strict=strict, default=default) def get_test_timeout(default=0): return int(os.environ.get('OS_TEST_TIMEOUT', default)) def sanitize_log_path(path): # Sanitize the string so that its log path is shell friendly return path.replace(' ', '-').replace('(', '_').replace(')', '_') class AttributeDict(dict): """Provide attribute access (dict.key) to dictionary values.""" def __getattr__(self, name): """Allow attribute access for all keys in the dict.""" if name in self: return self[name] raise AttributeError(_("Unknown attribute '%s'.") % name) class BaseTestCase(testtools.TestCase): @staticmethod def config_parse(conf=None, args=None): """Create the default configurations.""" if args is None: args = [] args += ['--config-file', etcdir('neutron_lib.conf')] if conf is None: version_info = pbr.version.VersionInfo('neutron-lib') cfg.CONF(args=args, project='neutron_lib', version='%%(prog)s %s' % version_info.release_string()) else: conf(args) def setUp(self): super(BaseTestCase, self).setUp() self.useFixture(fixture.PluginDirectoryFixture()) # Enabling 'use_fatal_exceptions' allows us to catch string # substitution format errors in exception messages. mock.patch.object(exceptions.NeutronException, 'use_fatal_exceptions', return_value=True).start() # Update the default QueuePool parameters. These can be tweaked by the # conf variables - max_pool_size, max_overflow and pool_timeout db_options.set_defaults( cfg.CONF, connection='sqlite://', max_pool_size=10, max_overflow=20, pool_timeout=10) self.useFixture(fixtures.MonkeyPatch( 'oslo_config.cfg.find_config_files', lambda project=None, prog=None, extension=None: [])) self.setup_config() # Configure this first to ensure pm debugging support for setUp() debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER') if debugger: self.addOnException(post_mortem_debug.get_exception_handler( debugger)) # Make sure we see all relevant deprecation warnings when running tests self.useFixture(tools.WarningsFixture()) if bool_from_env('OS_DEBUG'): _level = std_logging.DEBUG else: _level = std_logging.INFO capture_logs = bool_from_env('OS_LOG_CAPTURE') if not capture_logs: std_logging.basicConfig(format=LOG_FORMAT, level=_level) self.log_fixture = self.useFixture( fixtures.FakeLogger( format=LOG_FORMAT, level=_level, nuke_handlers=capture_logs, )) test_timeout = get_test_timeout() if test_timeout == -1: test_timeout = 0 if test_timeout > 0: self.useFixture(fixtures.Timeout(test_timeout, gentle=True)) # If someone does use tempfile directly, ensure that it's cleaned up self.useFixture(fixtures.NestedTempfile()) self.useFixture(fixtures.TempHomeDir()) self.addCleanup(mock.patch.stopall) if bool_from_env('OS_STDOUT_CAPTURE'): stdout = self.useFixture(fixtures.StringStream('stdout')).stream self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout)) if bool_from_env('OS_STDERR_CAPTURE'): stderr = self.useFixture(fixtures.StringStream('stderr')).stream self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) self.addOnException(self.check_for_systemexit) self.orig_pid = os.getpid() def get_new_temp_dir(self): """Create a new temporary directory. :returns fixtures.TempDir """ return self.useFixture(fixtures.TempDir()) def get_default_temp_dir(self): """Create a default temporary directory. Returns the same directory during the whole test case. :returns fixtures.TempDir """ if not hasattr(self, '_temp_dir'): self._temp_dir = self.get_new_temp_dir() return self._temp_dir def check_for_systemexit(self, exc_info): if isinstance(exc_info[1], SystemExit): if os.getpid() != self.orig_pid: # Subprocess - let it just exit raise # This makes sys.exit(0) still a failure self.force_failure = True def assertOrderedEqual(self, expected, actual): expect_val = self.sort_dict_lists(expected) actual_val = self.sort_dict_lists(actual) self.assertEqual(expect_val, actual_val) def sort_dict_lists(self, dic): for key, value in dic.items(): if isinstance(value, list): dic[key] = sorted(value) elif isinstance(value, dict): dic[key] = self.sort_dict_lists(value) return dic def assertDictSupersetOf(self, expected_subset, actual_superset): """Checks that actual dict contains the expected dict. After checking that the arguments are of the right type, this checks that each item in expected_subset is in, and matches, what is in actual_superset. Separate tests are done, so that detailed info can be reported upon failure. """ if not isinstance(expected_subset, dict): self.fail("expected_subset (%s) is not an instance of dict" % type(expected_subset)) if not isinstance(actual_superset, dict): self.fail("actual_superset (%s) is not an instance of dict" % type(actual_superset)) for k, v in expected_subset.items(): self.assertIn(k, actual_superset) self.assertEqual(v, actual_superset[k], "Key %(key)s expected: %(exp)r, actual %(act)r" % {'key': k, 'exp': v, 'act': actual_superset[k]}) def setup_config(self, args=None): """Tests that need a non-default config can override this method.""" self.config_parse(args=args)