Merge "Remove neutron.tests.sub_base"

This commit is contained in:
Jenkins 2015-03-28 04:03:01 +00:00 committed by Gerrit Code Review
commit cc60ff21f8
3 changed files with 13 additions and 170 deletions

View File

@ -19,8 +19,8 @@ import netaddr
from neutron.agent.linux import ip_lib from neutron.agent.linux import ip_lib
from neutron.common import constants as n_const from neutron.common import constants as n_const
from neutron.openstack.common import uuidutils from neutron.openstack.common import uuidutils
from neutron.tests.common import base from neutron.tests import base as tests_base
from neutron.tests import sub_base from neutron.tests.common import base as common_base
from neutron.tests import tools from neutron.tests import tools
NS_PREFIX = 'func-' NS_PREFIX = 'func-'
@ -31,8 +31,8 @@ VETH1_PREFIX = 'test-veth1'
def get_rand_port_name(): def get_rand_port_name():
return sub_base.get_rand_name(max_length=n_const.DEVICE_NAME_MAX_LEN, return tests_base.get_rand_name(max_length=n_const.DEVICE_NAME_MAX_LEN,
prefix=PORT_PREFIX) prefix=PORT_PREFIX)
def increment_ip_cidr(ip_cidr, offset=1): def increment_ip_cidr(ip_cidr, offset=1):
@ -95,7 +95,7 @@ class VethFixture(fixtures.Fixture):
name1 = name0.replace(VETH0_PREFIX, VETH1_PREFIX) name1 = name0.replace(VETH0_PREFIX, VETH1_PREFIX)
return ip_wrapper.add_veth(name0, name1) return ip_wrapper.add_veth(name0, name1)
self.ports = base.create_resource(VETH0_PREFIX, _create_veth) self.ports = common_base.create_resource(VETH0_PREFIX, _create_veth)
self.addCleanup(self.destroy) self.addCleanup(self.destroy)
def destroy(self): def destroy(self):

View File

@ -19,12 +19,12 @@ import fixtures
import six import six
from neutron.common import constants from neutron.common import constants
from neutron.tests import base
from neutron.tests.common import helpers as c_helpers from neutron.tests.common import helpers as c_helpers
from neutron.tests.functional.agent.linux import helpers from neutron.tests.functional.agent.linux import helpers
from neutron.tests import sub_base
class ConfigDict(sub_base.AttributeDict): class ConfigDict(base.AttributeDict):
def update(self, other): def update(self, other):
self.convert_to_attr_dict(other) self.convert_to_attr_dict(other)
super(ConfigDict, self).update(other) super(ConfigDict, self).update(other)
@ -36,8 +36,8 @@ class ConfigDict(sub_base.AttributeDict):
""" """
for key, value in other.iteritems(): for key, value in other.iteritems():
if isinstance(value, dict): if isinstance(value, dict):
if not isinstance(value, sub_base.AttributeDict): if not isinstance(value, base.AttributeDict):
other[key] = sub_base.AttributeDict(value) other[key] = base.AttributeDict(value)
self.convert_to_attr_dict(value) self.convert_to_attr_dict(value)
@ -122,7 +122,7 @@ class NeutronConfigFixture(ConfigFixture):
}) })
def _generate_host(self): def _generate_host(self):
return sub_base.get_rand_name(prefix='host-') return base.get_rand_name(prefix='host-')
def _generate_state_path(self, temp_dir): def _generate_state_path(self, temp_dir):
# Assume that temp_dir will be removed by the caller # Assume that temp_dir will be removed by the caller
@ -174,10 +174,10 @@ class ML2ConfigFixture(ConfigFixture):
def _generate_bridge_mappings(self): def _generate_bridge_mappings(self):
return ('physnet1:%s' % return ('physnet1:%s' %
sub_base.get_rand_name( base.get_rand_name(
prefix='br-eth', prefix='br-eth',
max_length=constants.DEVICE_NAME_MAX_LEN)) max_length=constants.DEVICE_NAME_MAX_LEN))
def _generate_integration_bridge(self): def _generate_integration_bridge(self):
return sub_base.get_rand_name(prefix='br-int', return base.get_rand_name(prefix='br-int',
max_length=constants.DEVICE_NAME_MAX_LEN) max_length=constants.DEVICE_NAME_MAX_LEN)

View File

@ -1,157 +0,0 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base test case for all tests.
To change behavoir only for tests that do not rely on Tempest, please
target the neutron.tests.base module instead.
There should be no non-test Neutron imports in this module to ensure
that the functional API tests can import Tempest without triggering
errors due to duplicate configuration definitions.
"""
import contextlib
import logging as std_logging
import os
import os.path
import random
import traceback
import eventlet.timeout
import fixtures
import mock
from oslo_utils import strutils
import testtools
from neutron.tests import post_mortem_debug
LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
def get_rand_name(max_length=None, prefix='test'):
name = prefix + str(random.randint(1, 0x7fffffff))
return name[:max_length] if max_length is not None else name
def bool_from_env(key, strict=False, default=False):
value = os.environ.get(key)
return strutils.bool_from_string(value, strict=strict, default=default)
class AttributeDict(dict):
"""
Provide attribute access (dict.key) to dictionary values.
"""
def __getattr__(self, name):
"""Allow attribute access for all keys in the dict."""
if name in self:
return self[name]
raise AttributeError(_("Unknown attribute '%s'.") % name)
class SubBaseTestCase(testtools.TestCase):
def setUp(self):
super(SubBaseTestCase, self).setUp()
# Configure this first to ensure pm debugging support for setUp()
debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER')
if debugger:
self.addOnException(post_mortem_debug.get_exception_handler(
debugger))
if bool_from_env('OS_DEBUG'):
_level = std_logging.DEBUG
else:
_level = std_logging.INFO
capture_logs = bool_from_env('OS_LOG_CAPTURE')
if not capture_logs:
std_logging.basicConfig(format=LOG_FORMAT, level=_level)
self.log_fixture = self.useFixture(
fixtures.FakeLogger(
format=LOG_FORMAT,
level=_level,
nuke_handlers=capture_logs,
))
test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
if test_timeout == -1:
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
# If someone does use tempfile directly, ensure that it's cleaned up
self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
self.addCleanup(mock.patch.stopall)
if bool_from_env('OS_STDOUT_CAPTURE'):
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if bool_from_env('OS_STDERR_CAPTURE'):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.addOnException(self.check_for_systemexit)
def check_for_systemexit(self, exc_info):
if isinstance(exc_info[1], SystemExit):
self.fail("A SystemExit was raised during the test. %s"
% traceback.format_exception(*exc_info))
@contextlib.contextmanager
def assert_max_execution_time(self, max_execution_time=5):
with eventlet.timeout.Timeout(max_execution_time, False):
yield
return
self.fail('Execution of this test timed out')
def assertOrderedEqual(self, expected, actual):
expect_val = self.sort_dict_lists(expected)
actual_val = self.sort_dict_lists(actual)
self.assertEqual(expect_val, actual_val)
def sort_dict_lists(self, dic):
for key, value in dic.iteritems():
if isinstance(value, list):
dic[key] = sorted(value)
elif isinstance(value, dict):
dic[key] = self.sort_dict_lists(value)
return dic
def assertDictSupersetOf(self, expected_subset, actual_superset):
"""Checks that actual dict contains the expected dict.
After checking that the arguments are of the right type, this checks
that each item in expected_subset is in, and matches, what is in
actual_superset. Separate tests are done, so that detailed info can
be reported upon failure.
"""
if not isinstance(expected_subset, dict):
self.fail("expected_subset (%s) is not an instance of dict" %
type(expected_subset))
if not isinstance(actual_superset, dict):
self.fail("actual_superset (%s) is not an instance of dict" %
type(actual_superset))
for k, v in expected_subset.items():
self.assertIn(k, actual_superset)
self.assertEqual(v, actual_superset[k],
"Key %(key)s expected: %(exp)r, actual %(act)r" %
{'key': k, 'exp': v, 'act': actual_superset[k]})