Merge "Forbid importing neutron.tests.* from outside tests subtree"
This commit is contained in:
commit
be29217d82
@ -29,6 +29,7 @@ Neutron Specific Commandments
|
||||
- [N340] Check usage of <module>.i18n (and neutron.i18n)
|
||||
- [N341] Check usage of _ from python builtins
|
||||
- [N342] String interpolation should be delayed at logging calls.
|
||||
- [N343] Production code must not import from neutron.tests.*
|
||||
- [N344] Python 3: Do not use filter(lambda obj: test(obj), data). Replace it
|
||||
with [obj for obj in data if test(obj)].
|
||||
|
||||
|
@ -400,6 +400,15 @@ Where appropriate, exceptions can be added to the above script. If
|
||||
code is not part of the Neutron namespace, for example, it's probably
|
||||
reasonable to exclude their unit tests from the check.
|
||||
|
||||
|
||||
.. note ::
|
||||
|
||||
At no time should the production code import anything from testing subtree
|
||||
(neutron.tests). There are distributions that split out neutron.tests
|
||||
modules in a separate package that is not installed by default, making any
|
||||
code that relies on presence of the modules to fail. For example, RDO is one
|
||||
of those distributions.
|
||||
|
||||
Running Tests
|
||||
-------------
|
||||
|
||||
|
@ -37,7 +37,6 @@ from neutron.common import utils as common_utils
|
||||
from neutron.plugins.common import constants as const
|
||||
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
|
||||
import constants as ovs_const
|
||||
from neutron.tests import base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -48,14 +47,14 @@ MINIMUM_DIBBLER_VERSION = '1.0.1'
|
||||
|
||||
|
||||
def ovs_vxlan_supported(from_ip='192.0.2.1', to_ip='192.0.2.2'):
|
||||
name = base.get_rand_device_name(prefix='vxlantest-')
|
||||
name = common_utils.get_rand_device_name(prefix='vxlantest-')
|
||||
with ovs_lib.OVSBridge(name) as br:
|
||||
port = br.add_tunnel_port(from_ip, to_ip, const.TYPE_VXLAN)
|
||||
return port != ovs_lib.INVALID_OFPORT
|
||||
|
||||
|
||||
def ovs_geneve_supported(from_ip='192.0.2.3', to_ip='192.0.2.4'):
|
||||
name = base.get_rand_device_name(prefix='genevetest-')
|
||||
name = common_utils.get_rand_device_name(prefix='genevetest-')
|
||||
with ovs_lib.OVSBridge(name) as br:
|
||||
port = br.add_tunnel_port(from_ip, to_ip, const.TYPE_GENEVE)
|
||||
return port != ovs_lib.INVALID_OFPORT
|
||||
@ -63,14 +62,14 @@ def ovs_geneve_supported(from_ip='192.0.2.3', to_ip='192.0.2.4'):
|
||||
|
||||
def iproute2_vxlan_supported():
|
||||
ip = ip_lib.IPWrapper()
|
||||
name = base.get_rand_device_name(prefix='vxlantest-')
|
||||
name = common_utils.get_rand_device_name(prefix='vxlantest-')
|
||||
port = ip.add_vxlan(name, 3000)
|
||||
ip.del_veth(name)
|
||||
return name == port.name
|
||||
|
||||
|
||||
def patch_supported():
|
||||
name, peer_name, patch_name = base.get_related_rand_device_names(
|
||||
name, peer_name, patch_name = common_utils.get_related_rand_device_names(
|
||||
['patchtest-', 'peertest0-', 'peertest1-'])
|
||||
with ovs_lib.OVSBridge(name) as br:
|
||||
port = br.add_patch_port(patch_name, peer_name)
|
||||
@ -92,7 +91,7 @@ def ofctl_arg_supported(cmd, **kwargs):
|
||||
:param **kwargs: arguments to test with the command.
|
||||
:returns: a boolean if the supplied arguments are supported.
|
||||
"""
|
||||
br_name = base.get_rand_device_name(prefix='br-test-')
|
||||
br_name = common_utils.get_rand_device_name(prefix='br-test-')
|
||||
with ovs_lib.OVSBridge(br_name) as test_br:
|
||||
full_args = ["ovs-ofctl", cmd, test_br.br_name,
|
||||
ovs_lib._build_flow_expr_str(kwargs, cmd.split('-')[0])]
|
||||
@ -312,7 +311,7 @@ def keepalived_ipv6_supported():
|
||||
6. Verify if IPv6 default route is configured by keepalived.
|
||||
"""
|
||||
|
||||
br_name, ha_port, gw_port = base.get_related_rand_device_names(
|
||||
br_name, ha_port, gw_port = common_utils.get_related_rand_device_names(
|
||||
['ka-test-', ha_router.HA_DEV_PREFIX, namespaces.INTERNAL_DEV_PREFIX])
|
||||
gw_vip = 'fdf8:f53b:82e4::10/64'
|
||||
expected_default_gw = 'fe80:f816::1'
|
||||
@ -362,7 +361,7 @@ def ovsdb_native_supported():
|
||||
|
||||
|
||||
def ovs_conntrack_supported():
|
||||
br_name = base.get_rand_device_name(prefix="ovs-test-")
|
||||
br_name = common_utils.get_rand_device_name(prefix="ovs-test-")
|
||||
|
||||
with ovs_lib.OVSBridge(br_name) as br:
|
||||
try:
|
||||
|
@ -726,3 +726,42 @@ def import_modules_recursively(topdir):
|
||||
for dir_ in dirs:
|
||||
modules.extend(import_modules_recursively(dir_))
|
||||
return modules
|
||||
|
||||
|
||||
def get_rand_name(max_length=None, prefix='test'):
|
||||
"""Return a random string.
|
||||
|
||||
The string will start with 'prefix' and will be exactly 'max_length'.
|
||||
If 'max_length' is None, then exactly 8 random characters, each
|
||||
hexadecimal, will be added. In case len(prefix) <= len(max_length),
|
||||
ValueError will be raised to indicate the problem.
|
||||
"""
|
||||
return get_related_rand_names([prefix], max_length)[0]
|
||||
|
||||
|
||||
def get_rand_device_name(prefix='test'):
|
||||
return get_rand_name(
|
||||
max_length=n_const.DEVICE_NAME_MAX_LEN, prefix=prefix)
|
||||
|
||||
|
||||
def get_related_rand_names(prefixes, max_length=None):
|
||||
"""Returns a list of the prefixes with the same random characters appended
|
||||
|
||||
:param prefixes: A list of prefix strings
|
||||
:param max_length: The maximum length of each returned string
|
||||
:returns: A list with each prefix appended with the same random characters
|
||||
"""
|
||||
|
||||
if max_length:
|
||||
length = max_length - max(len(p) for p in prefixes)
|
||||
if length <= 0:
|
||||
raise ValueError("'max_length' must be longer than all prefixes")
|
||||
else:
|
||||
length = 8
|
||||
rndchrs = get_random_string(length)
|
||||
return [p + rndchrs for p in prefixes]
|
||||
|
||||
|
||||
def get_related_rand_device_names(prefixes):
|
||||
return get_related_rand_names(prefixes,
|
||||
max_length=n_const.DEVICE_NAME_MAX_LEN)
|
||||
|
@ -77,6 +77,10 @@ unittest_imports_dot = re.compile(r"\bimport[\s]+unittest\b")
|
||||
unittest_imports_from = re.compile(r"\bfrom[\s]+unittest\b")
|
||||
filter_match = re.compile(r".*filter\(lambda ")
|
||||
|
||||
tests_imports_dot = re.compile(r"\bimport[\s]+neutron.tests\b")
|
||||
tests_imports_from1 = re.compile(r"\bfrom[\s]+neutron.tests\b")
|
||||
tests_imports_from2 = re.compile(r"\bfrom[\s]+neutron[\s]+import[\s]+tests\b")
|
||||
|
||||
|
||||
@flake8ext
|
||||
def validate_log_translations(logical_line, physical_line, filename):
|
||||
@ -382,6 +386,23 @@ def check_delayed_string_interpolation(logical_line, filename, noqa):
|
||||
yield(0, msg)
|
||||
|
||||
|
||||
@flake8ext
|
||||
def check_no_imports_from_tests(logical_line, filename, noqa):
|
||||
"""N343 Production code must not import from neutron.tests.*
|
||||
"""
|
||||
msg = ("N343 Production code must not import from neutron.tests.*")
|
||||
|
||||
if noqa:
|
||||
return
|
||||
|
||||
if 'neutron/tests/' in filename:
|
||||
return
|
||||
|
||||
for regex in tests_imports_dot, tests_imports_from1, tests_imports_from2:
|
||||
if re.match(regex, logical_line):
|
||||
yield(0, msg)
|
||||
|
||||
|
||||
@flake8ext
|
||||
def check_python3_no_filter(logical_line):
|
||||
"""N344 - Use list comprehension instead of filter(lambda)."""
|
||||
@ -412,4 +433,5 @@ def factory(register):
|
||||
register(check_builtins_gettext)
|
||||
register(check_unittest_imports)
|
||||
register(check_delayed_string_interpolation)
|
||||
register(check_no_imports_from_tests)
|
||||
register(check_python3_no_filter)
|
||||
|
@ -20,12 +20,13 @@ import contextlib
|
||||
import gc
|
||||
import os
|
||||
import os.path
|
||||
import sys
|
||||
import weakref
|
||||
|
||||
from debtcollector import moves
|
||||
import eventlet.timeout
|
||||
import fixtures
|
||||
import mock
|
||||
from neutron_lib import constants
|
||||
from oslo_concurrency.fixture import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_messaging import conffixture as messaging_conffixture
|
||||
@ -67,43 +68,14 @@ def fake_use_fatal_exceptions(*args):
|
||||
return True
|
||||
|
||||
|
||||
def get_related_rand_names(prefixes, max_length=None):
|
||||
"""Returns a list of the prefixes with the same random characters appended
|
||||
|
||||
:param prefixes: A list of prefix strings
|
||||
:param max_length: The maximum length of each returned string
|
||||
:returns: A list with each prefix appended with the same random characters
|
||||
"""
|
||||
|
||||
if max_length:
|
||||
length = max_length - max(len(p) for p in prefixes)
|
||||
if length <= 0:
|
||||
raise ValueError("'max_length' must be longer than all prefixes")
|
||||
else:
|
||||
length = 8
|
||||
rndchrs = utils.get_random_string(length)
|
||||
return [p + rndchrs for p in prefixes]
|
||||
|
||||
|
||||
def get_rand_name(max_length=None, prefix='test'):
|
||||
"""Return a random string.
|
||||
|
||||
The string will start with 'prefix' and will be exactly 'max_length'.
|
||||
If 'max_length' is None, then exactly 8 random characters, each
|
||||
hexadecimal, will be added. In case len(prefix) <= len(max_length),
|
||||
ValueError will be raised to indicate the problem.
|
||||
"""
|
||||
return get_related_rand_names([prefix], max_length)[0]
|
||||
|
||||
|
||||
def get_rand_device_name(prefix='test'):
|
||||
return get_rand_name(
|
||||
max_length=constants.DEVICE_NAME_MAX_LEN, prefix=prefix)
|
||||
|
||||
|
||||
def get_related_rand_device_names(prefixes):
|
||||
return get_related_rand_names(prefixes,
|
||||
max_length=constants.DEVICE_NAME_MAX_LEN)
|
||||
for _name in ('get_related_rand_names',
|
||||
'get_rand_name',
|
||||
'get_rand_device_name',
|
||||
'get_related_rand_device_names'):
|
||||
setattr(sys.modules[__name__], _name, moves.moved_function(
|
||||
getattr(utils, _name), _name, __name__,
|
||||
message='use "neutron.common.utils.%s" instead' % _name,
|
||||
version='Newton', removal_version='Ocata'))
|
||||
|
||||
|
||||
def bool_from_env(key, strict=False, default=False):
|
||||
|
@ -17,6 +17,7 @@ from neutron_lib import constants as n_const
|
||||
import testtools.testcase
|
||||
import unittest2.case
|
||||
|
||||
from neutron.common import utils
|
||||
from neutron.tests import base
|
||||
from neutron.tests import tools
|
||||
|
||||
@ -39,7 +40,7 @@ def create_resource(prefix, creation_func, *args, **kwargs):
|
||||
return creation_func(prefix, *args, **kwargs)
|
||||
|
||||
while True:
|
||||
name = base.get_rand_name(
|
||||
name = utils.get_rand_name(
|
||||
max_length=n_const.DEVICE_NAME_MAX_LEN,
|
||||
prefix=prefix)
|
||||
try:
|
||||
|
@ -44,7 +44,6 @@ from neutron.common import utils as common_utils
|
||||
from neutron.db import db_base_plugin_common
|
||||
from neutron.plugins.ml2.drivers.linuxbridge.agent import \
|
||||
linuxbridge_neutron_agent as linuxbridge_agent
|
||||
from neutron.tests import base as tests_base
|
||||
from neutron.tests.common import base as common_base
|
||||
from neutron.tests import tools
|
||||
|
||||
@ -226,11 +225,11 @@ def create_patch_ports(source, destination):
|
||||
:param source: Instance of OVSBridge
|
||||
:param destination: Instance of OVSBridge
|
||||
"""
|
||||
common = tests_base.get_rand_name(max_length=4, prefix='')
|
||||
common = common_utils.get_rand_name(max_length=4, prefix='')
|
||||
prefix = '%s-%s-' % (PATCH_PREFIX, common)
|
||||
|
||||
source_name = tests_base.get_rand_device_name(prefix=prefix)
|
||||
destination_name = tests_base.get_rand_device_name(prefix=prefix)
|
||||
source_name = common_utils.get_rand_device_name(prefix=prefix)
|
||||
destination_name = common_utils.get_rand_device_name(prefix=prefix)
|
||||
|
||||
source.add_patch_port(source_name, destination_name)
|
||||
destination.add_patch_port(destination_name, source_name)
|
||||
@ -586,9 +585,9 @@ class NamedVethFixture(VethFixture):
|
||||
@staticmethod
|
||||
def get_veth_name(name):
|
||||
if name.startswith(VETH0_PREFIX):
|
||||
return tests_base.get_rand_device_name(VETH0_PREFIX)
|
||||
return common_utils.get_rand_device_name(VETH0_PREFIX)
|
||||
if name.startswith(VETH1_PREFIX):
|
||||
return tests_base.get_rand_device_name(VETH1_PREFIX)
|
||||
return common_utils.get_rand_device_name(VETH1_PREFIX)
|
||||
return name
|
||||
|
||||
|
||||
@ -710,7 +709,7 @@ class OVSPortFixture(PortFixture):
|
||||
# because in some tests this port can be used to providing connection
|
||||
# between linuxbridge agents and vlan_id can be also added to this
|
||||
# device name it has to be max LB_DEVICE_NAME_MAX_LEN long
|
||||
port_name = tests_base.get_rand_name(
|
||||
port_name = common_utils.get_rand_name(
|
||||
LB_DEVICE_NAME_MAX_LEN,
|
||||
PORT_PREFIX
|
||||
)
|
||||
|
@ -19,8 +19,8 @@ import fixtures
|
||||
from neutron_lib import constants
|
||||
from neutronclient.common import exceptions
|
||||
|
||||
from neutron.common import utils
|
||||
from neutron.extensions import portbindings
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
def _safe_method(f):
|
||||
@ -54,7 +54,7 @@ class ClientFixture(fixtures.Fixture):
|
||||
external_network=None):
|
||||
resource_type = 'router'
|
||||
|
||||
name = name or base.get_rand_name(prefix=resource_type)
|
||||
name = name or utils.get_rand_name(prefix=resource_type)
|
||||
spec = {'tenant_id': tenant_id, 'name': name, 'ha': ha}
|
||||
if external_network:
|
||||
spec['external_gateway_info'] = {"network_id": external_network}
|
||||
@ -64,7 +64,7 @@ class ClientFixture(fixtures.Fixture):
|
||||
def create_network(self, tenant_id, name=None, external=False):
|
||||
resource_type = 'network'
|
||||
|
||||
name = name or base.get_rand_name(prefix=resource_type)
|
||||
name = name or utils.get_rand_name(prefix=resource_type)
|
||||
spec = {'tenant_id': tenant_id, 'name': name}
|
||||
spec['router:external'] = external
|
||||
return self._create_resource(resource_type, spec)
|
||||
@ -74,7 +74,7 @@ class ClientFixture(fixtures.Fixture):
|
||||
ipv6_address_mode='slaac', ipv6_ra_mode='slaac'):
|
||||
resource_type = 'subnet'
|
||||
|
||||
name = name or base.get_rand_name(prefix=resource_type)
|
||||
name = name or utils.get_rand_name(prefix=resource_type)
|
||||
ip_version = netaddr.IPNetwork(cidr).version
|
||||
spec = {'tenant_id': tenant_id, 'network_id': network_id, 'name': name,
|
||||
'cidr': cidr, 'enable_dhcp': enable_dhcp,
|
||||
|
@ -17,8 +17,8 @@ import tempfile
|
||||
import fixtures
|
||||
from neutron_lib import constants
|
||||
|
||||
from neutron.common import utils
|
||||
from neutron.plugins.ml2.extensions import qos as qos_ext
|
||||
from neutron.tests import base
|
||||
from neutron.tests.common import config_fixtures
|
||||
from neutron.tests.common.exclusive_resources import port
|
||||
from neutron.tests.common import helpers as c_helpers
|
||||
@ -93,7 +93,7 @@ class NeutronConfigFixture(ConfigFixture):
|
||||
super(NeutronConfigFixture, self)._setUp()
|
||||
|
||||
def _generate_host(self):
|
||||
return base.get_rand_name(prefix='host-')
|
||||
return utils.get_rand_name(prefix='host-')
|
||||
|
||||
def _generate_state_path(self, temp_dir):
|
||||
# Assume that temp_dir will be removed by the caller
|
||||
@ -185,19 +185,19 @@ class OVSConfigFixture(ConfigFixture):
|
||||
super(OVSConfigFixture, self)._setUp()
|
||||
|
||||
def _generate_bridge_mappings(self):
|
||||
return 'physnet1:%s' % base.get_rand_device_name(prefix='br-eth')
|
||||
return 'physnet1:%s' % utils.get_rand_device_name(prefix='br-eth')
|
||||
|
||||
def _generate_integration_bridge(self):
|
||||
return base.get_rand_device_name(prefix='br-int')
|
||||
return utils.get_rand_device_name(prefix='br-int')
|
||||
|
||||
def _generate_tunnel_bridge(self):
|
||||
return base.get_rand_device_name(prefix='br-tun')
|
||||
return utils.get_rand_device_name(prefix='br-tun')
|
||||
|
||||
def _generate_int_peer(self):
|
||||
return base.get_rand_device_name(prefix='patch-tun')
|
||||
return utils.get_rand_device_name(prefix='patch-tun')
|
||||
|
||||
def _generate_tun_peer(self):
|
||||
return base.get_rand_device_name(prefix='patch-int')
|
||||
return utils.get_rand_device_name(prefix='patch-int')
|
||||
|
||||
def get_br_int_name(self):
|
||||
return self.config.ovs.integration_bridge
|
||||
@ -285,10 +285,10 @@ class L3ConfigFixture(ConfigFixture):
|
||||
})
|
||||
|
||||
def _generate_external_bridge(self):
|
||||
return base.get_rand_device_name(prefix='br-ex')
|
||||
return utils.get_rand_device_name(prefix='br-ex')
|
||||
|
||||
def get_external_bridge(self):
|
||||
return self.config.DEFAULT.external_network_bridge
|
||||
|
||||
def _generate_namespace_suffix(self):
|
||||
return base.get_rand_name(prefix='test')
|
||||
return utils.get_rand_name(prefix='test')
|
||||
|
@ -75,9 +75,9 @@ class RabbitmqEnvironmentFixture(fixtures.Fixture):
|
||||
self.host = host
|
||||
|
||||
def _setUp(self):
|
||||
self.user = base.get_rand_name(prefix='user')
|
||||
self.password = base.get_rand_name(prefix='pass')
|
||||
self.vhost = base.get_rand_name(prefix='vhost')
|
||||
self.user = common_utils.get_rand_name(prefix='user')
|
||||
self.password = common_utils.get_rand_name(prefix='pass')
|
||||
self.vhost = common_utils.get_rand_name(prefix='vhost')
|
||||
|
||||
self._execute('add_user', self.user, self.password)
|
||||
self.addCleanup(self._execute, 'delete_user', self.user)
|
||||
|
@ -53,12 +53,12 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
||||
'ovs_neutron_agent.OVSPluginApi')
|
||||
mock.patch(agent_rpc).start()
|
||||
mock.patch('neutron.agent.rpc.PluginReportStateAPI').start()
|
||||
self.br_int = base.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
|
||||
prefix='br-int')
|
||||
self.br_tun = base.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
|
||||
prefix='br-tun')
|
||||
self.br_phys = base.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
|
||||
prefix='br-phys')
|
||||
self.br_int = utils.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
|
||||
prefix='br-int')
|
||||
self.br_tun = utils.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
|
||||
prefix='br-tun')
|
||||
self.br_phys = utils.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
|
||||
prefix='br-phys')
|
||||
patch_name_len = n_const.DEVICE_NAME_MAX_LEN - len("-patch-tun")
|
||||
self.patch_tun = "%s-patch-tun" % self.br_int[patch_name_len:]
|
||||
self.patch_int = "%s-patch-int" % self.br_tun[patch_name_len:]
|
||||
@ -177,7 +177,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
||||
random.randint(3, 254),
|
||||
random.randint(3, 254),
|
||||
random.randint(3, 254))}],
|
||||
'vif_name': base.get_rand_name(
|
||||
'vif_name': utils.get_rand_name(
|
||||
self.driver.DEV_NAME_LEN, self.driver.DEV_NAME_PREFIX)}
|
||||
|
||||
def _create_test_network_dict(self):
|
||||
|
@ -24,7 +24,7 @@ from neutron.agent.linux import ip_lib
|
||||
from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.callbacks import resources
|
||||
from neutron.tests import base as tests_base
|
||||
from neutron.common import utils
|
||||
from neutron.tests.common import machine_fixtures
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional.agent.l3 import framework
|
||||
@ -98,7 +98,7 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
|
||||
self.assertIsNone(device.route.get_gateway())
|
||||
|
||||
def _make_bridge(self):
|
||||
bridge = framework.get_ovs_bridge(tests_base.get_rand_name())
|
||||
bridge = framework.get_ovs_bridge(utils.get_rand_name())
|
||||
bridge.create()
|
||||
self.addCleanup(bridge.destroy)
|
||||
return bridge
|
||||
|
@ -12,9 +12,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from debtcollector import moves
|
||||
import testscenarios
|
||||
|
||||
from neutron.tests import base as tests_base
|
||||
from neutron.common import utils
|
||||
from neutron.tests.functional import base
|
||||
|
||||
|
||||
@ -26,8 +27,10 @@ MARKED_BLOCK_RULE = '-m mark --mark %s -j DROP' % MARK_VALUE
|
||||
ICMP_BLOCK_RULE = '-p icmp -j DROP'
|
||||
|
||||
|
||||
#TODO(jschwarz): Move these two functions to neutron/tests/common/
|
||||
get_rand_name = tests_base.get_rand_name
|
||||
get_rand_name = moves.moved_function(
|
||||
utils.get_rand_name, 'get_rand_name', __name__,
|
||||
message='use "neutron.common.utils.get_rand_name" instead',
|
||||
version='Newton', removal_version='Ocata')
|
||||
|
||||
|
||||
# Regarding MRO, it goes BaseOVSLinuxTestCase, WithScenarios,
|
||||
|
@ -20,7 +20,6 @@ from neutron.agent.linux import interface
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.common import exceptions
|
||||
from neutron.common import utils
|
||||
from neutron.tests import base as tests_base
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional.agent.linux import base
|
||||
|
||||
@ -42,7 +41,7 @@ class OVSInterfaceDriverTestCase(base.BaseOVSLinuxTestCase):
|
||||
namespace='not_a_namespace')
|
||||
|
||||
def test_plug_succeeds(self):
|
||||
device_name = tests_base.get_rand_name()
|
||||
device_name = utils.get_rand_name()
|
||||
mac_address = utils.get_random_mac('fa:16:3e:00:00:00'.split(':'))
|
||||
namespace = self.useFixture(net_helpers.NamespaceFixture()).name
|
||||
bridge = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
|
||||
@ -75,7 +74,7 @@ class OVSInterfaceDriverTestCase(base.BaseOVSLinuxTestCase):
|
||||
# Now plug a device with intended MTU that is higher than for the port
|
||||
# above and validate that its MTU is not reduced to the least MTU on
|
||||
# the bridge
|
||||
device_name = tests_base.get_rand_name()
|
||||
device_name = utils.get_rand_name()
|
||||
mac_address = utils.get_random_mac('fa:16:3e:00:00:00'.split(':'))
|
||||
namespace = self.useFixture(net_helpers.NamespaceFixture()).name
|
||||
self.interface.plug(network_id=uuidutils.generate_uuid(),
|
||||
|
@ -25,7 +25,6 @@ from neutron.agent.linux import interface
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.common import utils
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional.agent.linux import base
|
||||
from neutron.tests.functional import base as functional_base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -52,11 +51,11 @@ class IpLibTestFramework(functional_base.BaseSudoTestCase):
|
||||
|
||||
def generate_device_details(self, name=None, ip_cidrs=None,
|
||||
mac_address=None, namespace=None):
|
||||
return Device(name or base.get_rand_name(),
|
||||
return Device(name or utils.get_rand_name(),
|
||||
ip_cidrs or ["%s/24" % TEST_IP],
|
||||
mac_address or
|
||||
utils.get_random_mac('fa:16:3e:00:00:00'.split(':')),
|
||||
namespace or base.get_rand_name())
|
||||
namespace or utils.get_rand_name())
|
||||
|
||||
def _safe_delete_device(self, device):
|
||||
try:
|
||||
@ -185,7 +184,7 @@ class IpLibTestCase(IpLibTestFramework):
|
||||
|
||||
def test_dummy_exists(self):
|
||||
namespace = self.useFixture(net_helpers.NamespaceFixture())
|
||||
dev_name = base.get_rand_name()
|
||||
dev_name = utils.get_rand_name()
|
||||
device = namespace.ip_wrapper.add_dummy(dev_name)
|
||||
self.addCleanup(self._safe_delete_device, device)
|
||||
self._check_for_device_name(namespace.ip_wrapper, dev_name, True)
|
||||
|
@ -16,6 +16,7 @@
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import ipset_manager
|
||||
from neutron.agent.linux import iptables_manager
|
||||
from neutron.common import utils
|
||||
from neutron.tests.common import machine_fixtures
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional.agent.linux import base
|
||||
@ -35,7 +36,7 @@ class IpsetBase(functional_base.BaseSudoTestCase):
|
||||
self.source, self.destination = self.useFixture(
|
||||
machine_fixtures.PeerMachines(bridge)).machines
|
||||
|
||||
self.ipset_name = base.get_rand_name(MAX_IPSET_NAME_LENGTH, 'set-')
|
||||
self.ipset_name = utils.get_rand_name(MAX_IPSET_NAME_LENGTH, 'set-')
|
||||
self.icmp_accept_rule = ('-p icmp -m set --match-set %s src -j ACCEPT'
|
||||
% self.ipset_name)
|
||||
self.ipset = self._create_ipset_manager_and_set(
|
||||
|
@ -18,7 +18,7 @@ import mock
|
||||
from neutron.agent.common import ovs_lib
|
||||
from neutron.agent.ovsdb import api
|
||||
from neutron.agent.ovsdb import impl_idl
|
||||
from neutron.tests import base as test_base
|
||||
from neutron.common import utils
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional import base
|
||||
|
||||
@ -36,7 +36,7 @@ class ImplIdlTestCase(base.BaseSudoTestCase):
|
||||
super(ImplIdlTestCase, self).setUp()
|
||||
self.config(group='OVS', ovsdb_interface='native')
|
||||
self.ovs = ovs_lib.BaseOVS()
|
||||
self.brname = test_base.get_rand_device_name(net_helpers.BR_PREFIX)
|
||||
self.brname = utils.get_rand_device_name(net_helpers.BR_PREFIX)
|
||||
# Make sure exceptions pass through by calling do_post_commit directly
|
||||
mock.patch.object(
|
||||
impl_idl.NeutronOVSDBTransaction, "post_commit",
|
||||
|
@ -27,10 +27,10 @@ from testtools.content import text_content
|
||||
from neutron.agent.common import utils
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.cmd.sanity import checks
|
||||
from neutron.common import utils as common_utils
|
||||
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
|
||||
from neutron.plugins.ml2.drivers.openvswitch.agent \
|
||||
import ovs_neutron_agent as ovsagt
|
||||
from neutron.tests import base as tests_base
|
||||
from neutron.tests.common import base as common_base
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional.agent import test_ovs_lib
|
||||
@ -391,7 +391,7 @@ class OVSFlowTestCase(OVSAgentTestBase):
|
||||
'local_ip': '198.51.100.1', # RFC 5737 TEST-NET-2
|
||||
}
|
||||
kwargs = {'vlan': 777, 'tun_id': 888}
|
||||
port_name = tests_base.get_rand_device_name(net_helpers.PORT_PREFIX)
|
||||
port_name = common_utils.get_rand_device_name(net_helpers.PORT_PREFIX)
|
||||
ofport = self.br_tun.add_tunnel_port(port_name, attrs['remote_ip'],
|
||||
attrs['local_ip'])
|
||||
self.br_tun.install_flood_to_tun(ports=[ofport], **kwargs)
|
||||
|
@ -20,7 +20,7 @@ import mock
|
||||
|
||||
from neutron.agent.common import ovs_lib
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.tests import base as tests_base
|
||||
from neutron.common import utils
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional.agent.linux import base
|
||||
|
||||
@ -37,15 +37,15 @@ class OVSBridgeTestBase(base.BaseOVSLinuxTestCase):
|
||||
# Convert ((a, b), (c, d)) to {a: b, c: d} and add 'type' by default
|
||||
attrs = collections.OrderedDict(interface_attrs)
|
||||
attrs.setdefault('type', 'internal')
|
||||
port_name = tests_base.get_rand_device_name(net_helpers.PORT_PREFIX)
|
||||
port_name = utils.get_rand_device_name(net_helpers.PORT_PREFIX)
|
||||
return (port_name, self.br.add_port(port_name, *attrs.items()))
|
||||
|
||||
def create_ovs_vif_port(self, iface_id=None, mac=None,
|
||||
iface_field='iface-id'):
|
||||
if iface_id is None:
|
||||
iface_id = base.get_rand_name()
|
||||
iface_id = utils.get_rand_name()
|
||||
if mac is None:
|
||||
mac = base.get_rand_name()
|
||||
mac = utils.get_rand_name()
|
||||
attrs = ('external_ids', {iface_field: iface_id, 'attached-mac': mac})
|
||||
port_name, ofport = self.create_ovs_port(attrs)
|
||||
return ovs_lib.VifPort(port_name, ofport, iface_id, mac, self.br)
|
||||
@ -75,7 +75,7 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
|
||||
self.assertRaises(RuntimeError, cmd.execute, check_error=True)
|
||||
|
||||
def test_replace_port(self):
|
||||
port_name = tests_base.get_rand_device_name(net_helpers.PORT_PREFIX)
|
||||
port_name = utils.get_rand_device_name(net_helpers.PORT_PREFIX)
|
||||
self.br.replace_port(port_name, ('type', 'internal'))
|
||||
self.assertTrue(self.br.port_exists(port_name))
|
||||
self.assertEqual('internal',
|
||||
@ -148,7 +148,7 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
|
||||
self.assertIn(dpid, self.br.get_datapath_id())
|
||||
|
||||
def _test_add_tunnel_port(self, attrs):
|
||||
port_name = tests_base.get_rand_device_name(net_helpers.PORT_PREFIX)
|
||||
port_name = utils.get_rand_device_name(net_helpers.PORT_PREFIX)
|
||||
self.br.add_tunnel_port(port_name, attrs['remote_ip'],
|
||||
attrs['local_ip'])
|
||||
self.assertEqual('gre',
|
||||
@ -172,7 +172,7 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
|
||||
self._test_add_tunnel_port(attrs)
|
||||
|
||||
def test_add_patch_port(self):
|
||||
local = tests_base.get_rand_device_name(net_helpers.PORT_PREFIX)
|
||||
local = utils.get_rand_device_name(net_helpers.PORT_PREFIX)
|
||||
peer = 'remotepeer'
|
||||
self.br.add_patch_port(local, peer)
|
||||
self.assertEqual(self.ovs.db_get_val('Interface', local, 'type'),
|
||||
@ -342,7 +342,7 @@ class OVSLibTestCase(base.BaseOVSLinuxTestCase):
|
||||
self.ovs = ovs_lib.BaseOVS()
|
||||
|
||||
def test_bridge_lifecycle_baseovs(self):
|
||||
name = base.get_rand_name(prefix=net_helpers.BR_PREFIX)
|
||||
name = utils.get_rand_name(prefix=net_helpers.BR_PREFIX)
|
||||
self.addCleanup(self.ovs.delete_bridge, name)
|
||||
br = self.ovs.add_bridge(name)
|
||||
self.assertEqual(br.br_name, name)
|
||||
@ -357,7 +357,7 @@ class OVSLibTestCase(base.BaseOVSLinuxTestCase):
|
||||
self.assertTrue(set(self.ovs.get_bridges()).issuperset(bridges))
|
||||
|
||||
def test_bridge_lifecycle_ovsbridge(self):
|
||||
name = base.get_rand_name(prefix=net_helpers.BR_PREFIX)
|
||||
name = utils.get_rand_name(prefix=net_helpers.BR_PREFIX)
|
||||
br = ovs_lib.OVSBridge(name)
|
||||
self.assertEqual(br.br_name, name)
|
||||
# Make sure that instantiating an OVSBridge does not actually create
|
||||
@ -374,10 +374,10 @@ class OVSLibTestCase(base.BaseOVSLinuxTestCase):
|
||||
Makes sure that db_find search queries give the same result for both
|
||||
implementations.
|
||||
"""
|
||||
bridge_name = base.get_rand_name(prefix=net_helpers.BR_PREFIX)
|
||||
bridge_name = utils.get_rand_name(prefix=net_helpers.BR_PREFIX)
|
||||
self.addCleanup(self.ovs.delete_bridge, bridge_name)
|
||||
br = self.ovs.add_bridge(bridge_name)
|
||||
port_name = base.get_rand_name(prefix=net_helpers.PORT_PREFIX)
|
||||
port_name = utils.get_rand_name(prefix=net_helpers.PORT_PREFIX)
|
||||
br.add_port(port_name)
|
||||
self.ovs.set_db_attribute('Port', port_name, 'tag', 42)
|
||||
|
||||
|
@ -12,7 +12,7 @@
|
||||
|
||||
import testtools
|
||||
|
||||
from neutron.tests import base as tests_base
|
||||
from neutron.common import utils
|
||||
from neutron.tests.retargetable import base
|
||||
|
||||
|
||||
@ -24,7 +24,7 @@ class TestExample(base.RetargetableApiTest):
|
||||
"""
|
||||
|
||||
def test_network_lifecycle(self):
|
||||
net = self.client.create_network(name=tests_base.get_rand_name())
|
||||
net = self.client.create_network(name=utils.get_rand_name())
|
||||
listed_networks = {x.id: x.name for x in self.client.get_networks()}
|
||||
self.assertIn(net.id, listed_networks)
|
||||
self.assertEqual(listed_networks[net.id], net.name,
|
||||
|
@ -339,6 +339,20 @@ class HackingTestCase(base.BaseTestCase):
|
||||
self.assertEqual(
|
||||
1, len(list(checks.check_log_warn_deprecated(bad, 'f'))))
|
||||
|
||||
def test_check_no_imports_from_tests(self):
|
||||
fail_codes = ('from neutron import tests',
|
||||
'from neutron.tests import base',
|
||||
'import neutron.tests.base')
|
||||
for fail_code in fail_codes:
|
||||
self.assertEqual(
|
||||
1, len(list(
|
||||
checks.check_no_imports_from_tests(
|
||||
fail_code, "neutron/common/rpc.py", None))))
|
||||
self.assertEqual(
|
||||
0, len(list(
|
||||
checks.check_no_imports_from_tests(
|
||||
fail_code, "neutron/tests/test_fake.py", None))))
|
||||
|
||||
def test_check_python3_filter(self):
|
||||
f = checks.check_python3_no_filter
|
||||
self.assertLineFails(f, "filter(lambda obj: test(obj), data)")
|
||||
@ -346,6 +360,7 @@ class HackingTestCase(base.BaseTestCase):
|
||||
self.assertLinePasses(f, "filter(function, range(0,10))")
|
||||
self.assertLinePasses(f, "lambda x, y: x+y")
|
||||
|
||||
|
||||
# The following is borrowed from hacking/tests/test_doctest.py.
|
||||
# Tests defined in docstring is easier to understand
|
||||
# in some cases, for example, hacking rules which take tokens as argument.
|
||||
|
Loading…
Reference in New Issue
Block a user