Merge "[Fullstack] Remove unnecessary tests" into stable/zed

This commit is contained in:
Zuul 2024-01-31 13:40:51 +00:00 committed by Gerrit Code Review
commit 5102a2b10e
8 changed files with 0 additions and 553 deletions

@ -22,25 +22,9 @@ from neutron.agent.common import ovs_lib
from neutron.agent.common import polling
from neutron.agent.l2.extensions import qos as qos_extension
from neutron.common import config
from neutron.services.trunk.drivers.openvswitch.agent \
import driver as trunk_driver
from neutron.tests.common.agents import ovs_agent
def monkeypatch_init_handler():
original_handler = trunk_driver.init_handler
def new_init_handler(resource, event, trigger, payload=None):
# NOTE(slaweq): make this setup conditional based on server-side
# capabilities for fullstack tests we can assume that server-side
# and agent-side conf are in sync
if "trunk" not in cfg.CONF.service_plugins:
return
original_handler(resource, event, trigger, payload)
trunk_driver.init_handler = new_init_handler
def monkeypatch_qos():
mock.patch.object(ovs_lib.OVSBridge, 'clear_bandwidth_qos').start()
if "qos" in cfg.CONF.service_plugins:
@ -63,7 +47,6 @@ def main():
# ovs-vswitchd processes for each test will be isolated in separate
# namespace
config.register_common_config_options()
monkeypatch_init_handler()
monkeypatch_qos()
monkeypatch_event_filtering()
ovs_agent.main()

@ -83,22 +83,6 @@ class BaseConnectivitySameNetworkTest(base.BaseFullStackTestCase):
vms.ping_all()
class TestOvsConnectivitySameNetwork(BaseConnectivitySameNetworkTest):
l2_agent_type = constants.AGENT_TYPE_OVS
scenarios = [
('VXLAN', {'network_type': 'vxlan',
'l2_pop': False}),
('GRE-l2pop-arp_responder', {'network_type': 'gre',
'l2_pop': True,
'arp_responder': True}),
('VLANs', {'network_type': 'vlan',
'l2_pop': False})]
def test_connectivity(self):
self._test_connectivity()
class TestOvsConnectivitySameNetworkOnOvsBridgeControllerStop(
BaseConnectivitySameNetworkTest):

@ -91,15 +91,6 @@ class TestDhcpAgentNoHA(BaseDhcpAgentTest):
number_of_hosts = 1
agent_down_time = 60
def test_dhcp_assignment(self):
# First check if network was scheduled to one DHCP agent
dhcp_agents = self.client.list_dhcp_agent_hosting_networks(
self.network['id'])
self.assertEqual(1, len(dhcp_agents['agents']))
# And check if IP and gateway config is fine on FakeMachine
self.vm.block_until_dhcp_config_done()
def test_mtu_update(self):
# The test case needs access to devices in nested namespaces. ip_lib
# doesn't support it, and it's probably unsafe to touch the library for

@ -16,7 +16,6 @@ import functools
import os
import time
import netaddr
from neutron_lib import constants
from neutronclient.common import exceptions
from oslo_utils import uuidutils
@ -323,18 +322,6 @@ class TestLegacyL3Agent(TestL3Agent):
host_descriptions)
super(TestLegacyL3Agent, self).setUp(env)
def test_namespace_exists(self):
tenant_id = uuidutils.generate_uuid()
router = self.safe_client.create_router(tenant_id)
network = self.safe_client.create_network(tenant_id)
subnet = self.safe_client.create_subnet(
tenant_id, network['id'], '20.0.0.0/24', gateway_ip='20.0.0.1')
self.safe_client.add_router_interface(router['id'], subnet['id'])
namespace = self._get_namespace(router['id'])
self.assert_namespace_exists(namespace)
def test_mtu_update(self):
tenant_id = uuidutils.generate_uuid()
@ -361,92 +348,6 @@ class TestLegacyL3Agent(TestL3Agent):
network = self.safe_client.update_network(network['id'], mtu=mtu)
common_utils.wait_until_true(lambda: ri_dev.link.mtu == mtu)
def test_east_west_traffic(self):
tenant_id = uuidutils.generate_uuid()
router = self.safe_client.create_router(tenant_id)
vm1 = self._create_net_subnet_and_vm(
tenant_id, ['20.0.0.0/24', '2001:db8:aaaa::/64'],
self.environment.hosts[0], router)
vm2 = self._create_net_subnet_and_vm(
tenant_id, ['21.0.0.0/24', '2001:db8:bbbb::/64'],
self.environment.hosts[1], router)
vm1.block_until_ping(vm2.ip)
# Verify ping6 from vm2 to vm1 IPv6 Address
vm2.block_until_ping(vm1.ipv6)
def test_north_south_traffic(self):
# This function creates an external network which is connected to
# central_bridge and spawns an external_vm on it.
# The external_vm is configured with the gateway_ip (both v4 & v6
# addresses) of external subnet. Later, it creates a tenant router,
# a tenant network and two tenant subnets (v4 and v6). The tenant
# router is associated with tenant network and external network to
# provide north-south connectivity to the VMs.
# We validate the following in this testcase.
# 1. SNAT support: using ping from tenant VM to external_vm
# 2. Floating IP support: using ping from external_vm to VM floating ip
# 3. IPv6 ext connectivity: using ping6 from tenant vm to external_vm.
tenant_id = uuidutils.generate_uuid()
ext_net, ext_sub = self._create_external_network_and_subnet(tenant_id)
external_vm = self._create_external_vm(ext_net, ext_sub)
# Create an IPv6 subnet in the external network
v6network = self.useFixture(
ip_network.ExclusiveIPNetwork(
"2001:db8:1234::1", "2001:db8:1234::10", "64")).network
# NOTE(ykarel): gateway_ip is explicitly added as iputils package
# requires fix for https://github.com/iputils/iputils/issues/371
# is not available in CentOS 9-Stream
ext_v6sub = self.safe_client.create_subnet(
tenant_id, ext_net['id'], v6network, gateway_ip='2001:db8:1234::1')
router = self.safe_client.create_router(tenant_id,
external_network=ext_net['id'])
# Configure the gateway_ip of external v6subnet on the external_vm.
external_vm.ipv6_cidr = common_utils.ip_to_cidr(
ext_v6sub['gateway_ip'], 64)
# Configure an IPv6 downstream route to the v6Address of router gw port
for fixed_ip in router['external_gateway_info']['external_fixed_ips']:
if netaddr.IPNetwork(fixed_ip['ip_address']).version == 6:
external_vm.set_default_gateway(fixed_ip['ip_address'])
vm = self._create_net_subnet_and_vm(
tenant_id, ['20.0.0.0/24', '2001:db8:aaaa::/64'],
self.environment.hosts[1], router)
# ping external vm to test snat
vm.block_until_ping(external_vm.ip)
fip = self.safe_client.create_floatingip(
tenant_id, ext_net['id'], vm.ip, vm.neutron_port['id'])
# ping floating ip from external vm
external_vm.block_until_ping(fip['floating_ip_address'])
# Verify VM is able to reach the router interface.
vm.block_until_ping(vm.gateway_ipv6)
# Verify north-south connectivity using ping6 to external_vm.
vm.block_until_ping(external_vm.ipv6)
# Now let's remove and create again phys bridge and check connectivity
# once again
br_phys = self.environment.hosts[0].br_phys
br_phys.destroy()
br_phys.create()
self.environment.hosts[0].connect_to_central_network_via_vlans(
br_phys)
# ping floating ip from external vm
external_vm.block_until_ping(fip['floating_ip_address'])
# Verify VM is able to reach the router interface.
vm.block_until_ping(vm.gateway_ipv6)
# Verify north-south connectivity using ping6 to external_vm.
vm.block_until_ping(external_vm.ipv6)
def test_gateway_ip_changed(self):
self._test_gateway_ip_changed()

@ -29,7 +29,6 @@ from neutron.tests.fullstack.resources import machine
from neutron.tests.unit import testlib_api
from neutron.agent.common import ovs_lib
from neutron.services.qos.drivers.openvswitch import driver as ovs_drv
load_tests = testlib_api.module_load_tests
@ -201,47 +200,6 @@ class _TestBwLimitQoS(BaseQoSRuleTestCase):
for direction in list(all_directions):
self._wait_for_bw_rule_applied(vm, None, None, direction)
def test_bw_limit_qos_policy_rule_lifecycle(self):
new_limit = BANDWIDTH_LIMIT + 100
# Create port with qos policy attached
vm, qos_policy = self._prepare_vm_with_qos_policy(
[functools.partial(
self._add_bw_limit_rule,
BANDWIDTH_LIMIT, BANDWIDTH_BURST, self.direction)])
bw_rule = qos_policy['rules'][0]
self._wait_for_bw_rule_applied(
vm, BANDWIDTH_LIMIT, BANDWIDTH_BURST, self.direction)
qos_policy_id = qos_policy['id']
self.client.delete_bandwidth_limit_rule(bw_rule['id'], qos_policy_id)
self._wait_for_bw_rule_removed(vm, self.direction)
# Create new rule with no given burst value, in such case ovs and lb
# agent should apply burst value as
# bandwidth_limit * qos_consts.DEFAULT_BURST_RATE
new_expected_burst = self._get_expected_burst_value(new_limit,
self.direction)
new_rule = self.safe_client.create_bandwidth_limit_rule(
self.tenant_id, qos_policy_id, new_limit, direction=self.direction)
self._wait_for_bw_rule_applied(
vm, new_limit, new_expected_burst, self.direction)
# Update qos policy rule id
self.client.update_bandwidth_limit_rule(
new_rule['id'], qos_policy_id,
body={'bandwidth_limit_rule': {'max_kbps': BANDWIDTH_LIMIT,
'max_burst_kbps': BANDWIDTH_BURST}})
self._wait_for_bw_rule_applied(
vm, BANDWIDTH_LIMIT, BANDWIDTH_BURST, self.direction)
# Remove qos policy from port
self.client.update_port(
vm.neutron_port['id'],
body={'port': {'qos_policy_id': None}})
self._wait_for_bw_rule_removed(vm, self.direction)
def test_bw_limit_direction_change(self):
# Create port with qos policy attached, with rule self.direction
vm, qos_policy = self._prepare_vm_with_qos_policy(
@ -584,27 +542,6 @@ class TestPacketRateLimitQoSOvs(_TestPacketRateLimitQoS,
]
class TestQoSWithL2Population(base.BaseFullStackTestCase):
scenarios = [
(constants.AGENT_TYPE_OVS,
{'mech_drivers': 'openvswitch',
'supported_rules': ovs_drv.SUPPORTED_RULES}),
]
def setUp(self):
host_desc = [] # No need to register agents for this test case
env_desc = environment.EnvironmentDescription(
qos=True, l2_pop=True, mech_drivers=self.mech_drivers)
env = environment.Environment(env_desc, host_desc)
super(TestQoSWithL2Population, self).setUp(env)
def test_supported_qos_rule_types(self):
res = self.client.list_qos_rule_types()
rule_types = {t['type'] for t in res['rule_types']}
expected_rules = set(self.supported_rules)
self.assertEqual(expected_rules, rule_types)
class TestQoSPolicyIsDefault(base.BaseFullStackTestCase):
NAME = 'fs_policy'

@ -669,21 +669,6 @@ class SecurityGroupRulesTest(base.BaseFullStackTestCase):
host_descriptions)
super(SecurityGroupRulesTest, self).setUp(env)
def test_security_group_rule_quota(self):
project_id = uuidutils.generate_uuid()
quota = self.client.show_quota_details(project_id)
sg_rules_used = quota['quota']['security_group_rule']['used']
self.assertEqual(0, sg_rules_used)
self.safe_client.create_security_group(project_id)
quota = self.client.show_quota_details(project_id)
sg_rules_used = quota['quota']['security_group_rule']['used']
self.safe_client.update_quota(project_id, 'security_group_rule',
sg_rules_used)
self.assertRaises(nc_exc.OverQuotaClient,
self.safe_client.create_security_group, project_id)
def test_normalized_cidr_in_rule(self):
project_id = uuidutils.generate_uuid()
sg = self.safe_client.create_security_group(project_id)

@ -61,27 +61,6 @@ class TestSubnet(base.BaseFullStackTestCase):
def _show_subnet(self, subnet_id):
return self.client.show_subnet(subnet_id)
def test_create_subnet_ipv4(self):
cidr = self.useFixture(
ip_network.ExclusiveIPNetwork(
'240.0.0.0', '240.255.255.255', '24')).network
subnet = self._create_subnet(self._project_id, self._network['id'],
cidr)
subnet = self._show_subnet(subnet['id'])
self.assertEqual(subnet['subnet']['gateway_ip'],
str(netaddr.IPNetwork(cidr).network + 1))
def test_create_subnet_ipv6_slaac(self):
cidr = self.useFixture(
ip_network.ExclusiveIPNetwork(
'2001:db8::', '2001:db8::ffff', '64')).network
subnet = self._create_subnet(self._project_id, self._network['id'],
cidr, ipv6_address_mode='slaac',
ipv6_ra_mode='slaac')
subnet = self._show_subnet(subnet['id'])
self.assertEqual(subnet['subnet']['gateway_ip'],
str(netaddr.IPNetwork(cidr).network))
def test_create_subnet_ipv6_prefix_delegation(self):
subnet = self._create_subnet(self._project_id, self._network['id'],
None, ipv6_address_mode='slaac',

@ -1,313 +0,0 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import netaddr
from neutron_lib import constants
from oslo_utils import uuidutils
from neutron.common import utils
from neutron.services.trunk.drivers.openvswitch.agent import ovsdb_handler
from neutron.services.trunk.drivers.openvswitch.agent import trunk_manager
from neutron.services.trunk.drivers.openvswitch import utils as trunk_ovs_utils
from neutron.tests.fullstack import base
from neutron.tests.fullstack.resources import environment
from neutron.tests.fullstack.resources import machine
def trunk_bridge_does_not_exist(trunk_id):
"""Return true if trunk bridge for given ID does not exists."""
bridge = trunk_manager.TrunkBridge(trunk_id)
return not bridge.exists()
def make_ip_network(port, network):
"""Make an IPNetwork object from port and network.
Function returns IPNetwork object containing fixed IP address from port
dictionary with prefixlen from network object.
:param port: Port dictionary returned by Neutron API
:param network: IPNetwork object in which the port's IP will be assigned.
"""
ip_address = netaddr.IPAddress(
port['fixed_ips'][0]['ip_address'])
return netaddr.IPNetwork(
(ip_address.value, network.prefixlen))
class TrunkTestException(Exception):
pass
class Network(object):
"""A helper class to keep persistent info about assigned addresses."""
def __init__(self, prefix, network_cidr, tag=None):
self.prefix = prefix
self.network = netaddr.IPNetwork(network_cidr)
self.neutron_network = None
self.neutron_subnet = None
self.tag = tag
# Currently, only vlan is supported. Pass via __init__ once more are
# supported.
self.segmentation_type = 'vlan'
@property
def cidr(self):
return str(self.network.cidr)
@property
def gateway(self):
"""Return lowest possible IP in the given subnet."""
return str(netaddr.IPAddress(self.network.first + 1))
@property
def id(self):
return self.neutron_network['id']
@property
def name(self):
return "%s-network" % self.prefix
@property
def subnet_name(self):
return "%s-subnet" % self.prefix
class TestTrunkPlugin(base.BaseFullStackTestCase):
def setUp(self):
host_desc = [environment.HostDescription(
l3_agent=False,
l2_agent_type=constants.AGENT_TYPE_OVS)]
env_desc = environment.EnvironmentDescription(service_plugins='trunk')
env = environment.Environment(env_desc, host_desc)
super(TestTrunkPlugin, self).setUp(env)
self.tenant_id = uuidutils.generate_uuid()
self.trunk_network = Network('trunk', '10.0.0.0/24')
self.vlan1_network = Network('vlan1', '192.168.0.0/24', tag=10)
self.vlan2_network = Network('vlan2', '192.168.1.0/24', tag=20)
self.host = self.environment.hosts[0]
for network in (
self.trunk_network, self.vlan1_network, self.vlan2_network):
self.create_network_and_subnet(network)
def create_network_and_subnet(self, network):
"""Create network and subnet resources in Neutron based on network
object.
The resource names will be <prefix>-network and <prefix>-subnet, where
prefix is taken from network object.
:param network: Network object from this module.
"""
network.neutron_network = self.safe_client.create_network(
self.tenant_id, network.name)
network.neutron_subnet = self.safe_client.create_subnet(
self.tenant_id,
network.id,
cidr=network.cidr,
gateway_ip=network.gateway,
name=network.subnet_name,
enable_dhcp=False)
def create_vlan_aware_vm(self, trunk_network, vlan_networks):
"""Create a fake machine with one untagged port and subports
according vlan_networks parameter.
:param trunk_network: Instance of Network where trunk port should be
created.
:param vlan_networks: List of Network instances where subports should
be created.
"""
trunk_parent_port = self.safe_client.create_port(
self.tenant_id, trunk_network.id)
vlan_subports = [
self.safe_client.create_port(self.tenant_id, vlan_network.id,
mac_address=trunk_parent_port['mac_address'])
for vlan_network in vlan_networks]
trunk = self.safe_client.create_trunk(
self.tenant_id,
name='mytrunk',
port_id=trunk_parent_port['id'],
sub_ports=[
{'port_id': vlan_subport['id'],
'segmentation_type': 'vlan',
'segmentation_id': vlan_network.tag}
for vlan_subport, vlan_network in zip(vlan_subports,
vlan_networks)
],
)
vm = self.useFixture(
machine.FakeFullstackTrunkMachine(
trunk,
self.host,
trunk_network.id,
self.tenant_id,
self.safe_client,
neutron_port=trunk_parent_port,
bridge_name=trunk_ovs_utils.gen_trunk_br_name(trunk['id'])))
for port, vlan_network in zip(vlan_subports, vlan_networks):
ip_network = make_ip_network(port, vlan_network.network)
vm.add_vlan_interface(
port['mac_address'], ip_network, vlan_network.tag)
vm.block_until_boot()
return vm
def create_vm_in_network(self, network):
"""Create a fake machine in given network."""
return self.useFixture(
machine.FakeFullstackMachine(
self.host,
network.id,
self.tenant_id,
self.safe_client
)
)
def add_subport_to_vm(self, vm, subport_network):
"""Add subport from subport_network to given vm.
:param vm: FakeFullstackMachine instance to with subport should be
added.
:param subport_network: Network object representing network containing
port for subport.
"""
subport = self.safe_client.create_port(
self.tenant_id, subport_network.id,
mac_address=vm.neutron_port['mac_address'])
subport_spec = {
'port_id': subport['id'],
'segmentation_type': subport_network.segmentation_type,
'segmentation_id': subport_network.tag
}
self.safe_client.trunk_add_subports(
self.tenant_id, vm.trunk['id'], [subport_spec])
ip_network = make_ip_network(subport, subport_network.network)
vm.add_vlan_interface(
subport['mac_address'], ip_network, subport_network.tag)
# NOTE(slaweq): As is described in bug
# https://bugs.launchpad.net/neutron/+bug/1687709 when more than one
# different ovs-agent with enabled trunk driver is running at a time it
# might lead to race conditions between them.
# Because of that ovs_agent used for fullstack tests is monkeypatched and
# loads trunk driver only if trunk service plugin is enabled.
# That makes restriction that only a single set of tests with trunk-enabled
# services will run at the same time.
def test_trunk_lifecycle(self):
"""Test life-cycle of a fake VM with trunk port.
This test uses 4 fake machines:
- vlan_aware_vm (A) that is at the beginning connected to a trunk
network and a vlan1 network.
- trunk_network_vm (B) that is connected to the trunk network.
- vlan1_network_vm (C) that is connected to the vlan1 network.
- vlan2_network_vm (D) that is connected to a vlan2 network.
Scenario steps:
- all the vms from above are created
- A can talk with B (over the trunk network)
- A can talk with C (over the vlan1 network)
- A can not talk with D (no leg on the vlan2 network)
- subport from the vlan2 network is added to A
- A can now talk with D (over the vlan2 network)
- subport from the vlan1 network is removed from A
- A can talk with B (over the trunk network)
- A can not talk with C (no leg on the vlan1 network)
- A can talk with D (over the vlan2 network)
- A is deleted which leads to removal of trunk bridge
- no leftovers like patch ports to the trunk bridge should remain on
an integration bridge
"""
vlan_aware_vm = self.create_vlan_aware_vm(
self.trunk_network,
[self.vlan1_network]
)
trunk_id = vlan_aware_vm.trunk['id']
# Create helper vms with different networks
trunk_network_vm = self.create_vm_in_network(self.trunk_network)
vlan1_network_vm = self.create_vm_in_network(self.vlan1_network)
vlan2_network_vm = self.create_vm_in_network(self.vlan2_network)
for vm in trunk_network_vm, vlan1_network_vm, vlan2_network_vm:
vm.block_until_boot()
# Test connectivity to trunk and subport
vlan_aware_vm.block_until_ping(trunk_network_vm.ip)
vlan_aware_vm.block_until_ping(vlan1_network_vm.ip)
# Subport for vlan2 hasn't been added yet
vlan_aware_vm.block_until_no_ping(vlan2_network_vm.ip)
# Add another subport and test
self.add_subport_to_vm(vlan_aware_vm, self.vlan2_network)
vlan_aware_vm.block_until_ping(vlan2_network_vm.ip)
# Remove the first subport
self.safe_client.trunk_remove_subports(
self.tenant_id,
trunk_id,
[vlan_aware_vm.trunk['sub_ports'][0]])
# vlan1_network_vm now shouldn't be able to talk to vlan_aware_vm
vlan_aware_vm.block_until_no_ping(vlan1_network_vm.ip)
# but trunk and vlan2 should be able to ping
vlan_aware_vm.block_until_ping(trunk_network_vm.ip)
vlan_aware_vm.block_until_ping(vlan2_network_vm.ip)
# Delete vm and check that patch ports and trunk bridge are gone
vlan_aware_vm.destroy()
bridge_doesnt_exist_predicate = functools.partial(
trunk_bridge_does_not_exist, trunk_id)
utils.wait_until_true(
bridge_doesnt_exist_predicate,
exception=TrunkTestException(
'Trunk bridge with ID %s has not been removed' %
trunk_id)
)
integration_bridge = self.host.get_bridge(None)
no_patch_ports_predicate = functools.partial(
lambda bridge: not ovsdb_handler.bridge_has_service_port(bridge),
integration_bridge,
)
try:
utils.wait_until_true(no_patch_ports_predicate)
except utils.WaitTimeout:
# Create exception object after timeout to provide up-to-date list
# of interfaces
raise TrunkTestException(
"Integration bridge %s still has following ports while some of"
" them are patch ports for trunk that were supposed to be "
"removed: %s" % (
integration_bridge.br_name,
integration_bridge.get_iface_name_list()
)
)