From e6603a20b34bb1f807cf2e6a8424213e70bb3664 Mon Sep 17 00:00:00 2001 From: Maor Blaustein Date: Tue, 13 Feb 2024 13:29:06 +0200 Subject: [PATCH] Migrate vlan transparency tests Change-Id: I6572af33c8799947f6d630540750a92e91d292dd --- .../tests/scenario/test_vlan_transparency.py | 512 ++++++++++++++++++ 1 file changed, 512 insertions(+) create mode 100644 whitebox_neutron_tempest_plugin/tests/scenario/test_vlan_transparency.py diff --git a/whitebox_neutron_tempest_plugin/tests/scenario/test_vlan_transparency.py b/whitebox_neutron_tempest_plugin/tests/scenario/test_vlan_transparency.py new file mode 100644 index 0000000..30185bb --- /dev/null +++ b/whitebox_neutron_tempest_plugin/tests/scenario/test_vlan_transparency.py @@ -0,0 +1,512 @@ +# Copyright (c) 2020 Red Hat, Inc. +# +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import netaddr +from neutron_lib import constants +from neutron_tempest_plugin.common import ip +from neutron_tempest_plugin.common import ssh +from neutron_tempest_plugin import config +from neutron_tempest_plugin.scenario import base +from oslo_log import log as logging +from tempest.lib.common import fixed_network +from tempest.lib.common.utils import data_utils +from tempest.lib import decorators +from tempest.lib import exceptions + +LOG = logging.getLogger(__name__) +CONF = config.CONF +MIN_VLAN_ID = 1 +MAX_VLAN_ID = 4094 + + +# TODO(eolivare): split upstream class in two classes: a base class and a class +# with the tests. This is needed in order to import the base class here and to +# not duplicate code +class VlanTransparencyDowntreamBaseTest(base.BaseTempestTestCase): + credentials = ['primary', 'admin'] + force_tenant_isolation = False + + keypairs_client = None + servers_client = None + + required_extensions = ['vlan-transparent', 'allowed-address-pairs'] + + @classmethod + def resource_setup(cls): + super(VlanTransparencyDowntreamBaseTest, cls).resource_setup() + # setup basic topology for servers we can log into + cls.rand_name = data_utils.rand_name( + cls.__name__.rsplit('.', 1)[-1]) + try: + cls.network = cls.create_network(name=cls.rand_name, + vlan_transparent=True) + except exceptions.ServerFault as exc: + msg = 'Backend does not support VLAN Transparency.' + if exc.resp_body['message'] == msg: + raise cls.skipException(msg) + else: + raise exc + + cls.subnet = cls.create_subnet(network=cls.network, + name=cls.rand_name) + cls.router = cls.create_router_by_client() + cls.create_router_interface(cls.router['id'], cls.subnet['id']) + cls.keypair = cls.create_keypair(name=cls.rand_name, + client=cls.keypairs_client) + cls.vms = {} + cls.security_group = cls.create_security_group(name=cls.rand_name) + cls.create_loginable_secgroup_rule( + cls.security_group['id'], client=cls.client) + + # create security group rules for icmpv6 + rulesets = [{'protocol': constants.PROTO_NAME_IPV6_ICMP, + 'ethertype': 'IPv6', + 'direction': 'ingress'}] + cls.create_secgroup_rules( + rulesets, cls.security_group['id'], client=cls.client) + + if CONF.neutron_plugin_options.default_image_is_advanced: + cls.flavor_ref = CONF.compute.flavor_ref + cls.image_ref = CONF.compute.image_ref + else: + cls.flavor_ref = \ + CONF.neutron_plugin_options.advanced_image_flavor_ref + cls.image_ref = CONF.neutron_plugin_options.advanced_image_ref + + @classmethod + def skip_checks(cls): + super(VlanTransparencyDowntreamBaseTest, cls).skip_checks() + if not (CONF.neutron_plugin_options.advanced_image_ref or + CONF.neutron_plugin_options.default_image_is_advanced): + raise cls.skipException( + 'Advanced image is required to run these tests.') + + def _create_port_and_server(self, server_name, port_name, + port_security=True, + allowed_address_pairs=None, + second_network=None): + if port_security: + sec_groups = [self.security_group['id']] + else: + sec_groups = None + + first_port_name = 'first_' + port_name if second_network else port_name + self.vms[server_name]['port'] = self.create_port( + network=self.network, name=first_port_name, + security_groups=sec_groups, + port_security_enabled=port_security, + allowed_address_pairs=allowed_address_pairs) + + networks = [{'port': self.vms[server_name]['port']['id']}] + + if second_network: + # Some test need servers with two ports: First one is used for + # management (ssh access via FIP) and second one is used for VLAN + # Transparency testing + second_port_name = 'second_' + port_name + self.vms[server_name]['second_port'] = self.create_port( + network=second_network, + name=second_port_name, + security_groups=sec_groups, + port_security_enabled=port_security) + networks.append( + {'port': self.vms[server_name]['second_port']['id']}) + + return self.create_server(flavor_ref=self.flavor_ref, + image_ref=self.image_ref, + key_name=self.keypair['name'], + networks=networks, + name=server_name, + client=self.servers_client)['server'] + + def _configure_vlan_transparent(self, port, ssh_client, + vlan_tag, vlan_ip): + ip_command = ip.IPCommand(ssh_client=ssh_client) + addresses = ip_command.list_addresses(port=port) + port_iface = ip.get_port_device_name(addresses, port) + subport_iface = ip_command.configure_vlan_transparent( + port=port, vlan_tag=vlan_tag, ip_addresses=[vlan_ip]) + + for address in ip_command.list_addresses(ip_addresses=vlan_ip): + self.assertEqual(subport_iface, address.device.name) + self.assertEqual(port_iface, address.device.parent) + break + else: + self.fail("Sub-port fixed IP not found on server.") + + def _configure_vlan_transparent_second_port( + self, port, ssh_client, vlan_tag, mask): + ip_command = ip.IPCommand(ssh_client=ssh_client) + port_device = ip_command.get_nic_name_by_mac(port['mac_address']) + subport_device = '{!s}.{!s}'.format(port_device, vlan_tag) + ip_address = port['fixed_ips'][0]['ip_address'] + ip_address_mask = '{!s}/{!s}'.format(ip_address, mask) + + # delete address from the untagged interface in case the address has + # been assigned to it + if ip_command.list_addresses(device=port_device, + ip_addresses=[ip_address]): + ip_command.delete_address(ip_address_mask, port_device) + if netaddr.valid_ipv6(ip_address): + LOG.debug('IPv6 routes are removed from the untagged interface' + ' to avoid them to collide with the tagged one') + # TODO(eolivare): implement list_routes for IPv6 in + # neutron-tempest-plugin + routesv6 = ip_command.execute( + '-6', 'route', 'show', 'dev', port_device).splitlines() + for route in routesv6: + destv6 = route.strip().split()[0] + # TODO(eolivare): implement delete_route for IPv6 in + # neutron-tempest-plugin + ip_command.execute( + '-6', 'route', 'del', destv6, 'dev', port_device) + # Create vlan interface (e.g.: eth1.100) + ip_command.add_link(link=port_device, name=subport_device, + link_type='vlan', segmentation_id=vlan_tag) + ip_command.set_link(device=subport_device, state='up') + ip_command.add_address(address=ip_address_mask, device=subport_device) + + for address in ip_command.list_addresses( + ip_addresses=[ip_address]): + self.assertEqual(subport_device, address.device.name) + self.assertEqual(port_device, address.device.parent) + break + else: + self.fail("Sub-port fixed IP not found on server.") + + def _create_ssh_client(self, ssh_ip): + if CONF.neutron_plugin_options.default_image_is_advanced: + username = CONF.validation.image_ssh_user + else: + username = CONF.neutron_plugin_options.advanced_image_ssh_user + return ssh.Client(host=ssh_ip, + username=username, + pkey=self.keypair['private_key']) + + def _check_remote_connectivity_vlans(self, + untagged=False, num_vlans=1, + should_succeed=True): + # All tests use two VMs, so this is a list of length 2 + vms = list(self.vms.values()) + + # If untagged is True, num_vlans is ignored + if untagged: + self.check_remote_connectivity( + vms[0]['ssh_client'], + vms[1]['port']['fixed_ips'][0]['ip_address'], + servers=[vms[0]['vm'], vms[1]['vm']], + should_succeed=should_succeed) + else: + for i in range(num_vlans): + self.check_remote_connectivity( + vms[0]['ssh_client'], + vms[1]['vlan_ipmasks'][i].split('/')[0], + servers=[vms[0]['vm'], vms[1]['vm']], + should_succeed=should_succeed) + + def _check_remote_connectivity_vlan_second_port(self): + # All tests use two VMs, so this is a list of length 2 + vms = list(self.vms.values()) + + dest_ip = vms[1]['second_port']['fixed_ips'][0]['ip_address'] + + # if second_port, num_vlans and untagged are ignored + self.check_remote_connectivity( + vms[0]['ssh_client'], + dest_ip, + servers=[vms[0]['vm'], vms[1]['vm']]) + + def _check_remote_connectivity_vlans_mtu(self): + # All tests use two VMs, so this is a list of length 2 + vms = list(self.vms.values()) + + # According to BZ1973445, MTU value from transparent VLAN interfaces + # should be reduced by 4 + new_mtu = self.network['mtu'] - 4 + for vm in vms: + ip_command = ip.IPCommand(ssh_client=vm['ssh_client']) + for address in ip_command.list_addresses( + ip_addresses=vm['vlan_ipmasks'][0].split('/')[0]): + device = address.device.name + break + + command = ['set', 'mtu', new_mtu, 'dev', device] + ip_command.execute('link', *command) + + for pkt_size in range(new_mtu - 20, new_mtu + 20, 2): + self.check_remote_connectivity( + vms[0]['ssh_client'], + vms[1]['vlan_ipmasks'][0].split('/')[0], + servers=[vms[0]['vm'], vms[1]['vm']], + mtu=pkt_size) + + def _create_multivlan_vlan_transparency_resources( + self, num_vlans=1, + port_security=True, use_allowed_address_pairs=False): + vlan_tags = [] + vlan_ipmask_templates = [] + for i in range(num_vlans): + while True: + vlan_tag = data_utils.rand_int_id(start=MIN_VLAN_ID, + end=MAX_VLAN_ID) + vlan_ipmask_template = '192.168.%d.{ip_last_byte}/24' % ( + vlan_tag % 256) + if (vlan_tag not in vlan_tags and + vlan_ipmask_template not in vlan_ipmask_templates): + vlan_tags.append(vlan_tag) + vlan_ipmask_templates.append(vlan_ipmask_template) + break + + # Two servers are created with one port each one + # VLANs are created on those ports + for i in range(2): + server_name = 'server-%s-%d' % (self.rand_name, i) + port_name = 'port-%s-%d' % (self.rand_name, i) + self.vms[server_name] = {} + vlan_ipmasks_per_vm = [] + for vlan_ipmask_template in vlan_ipmask_templates: + vlan_ipmasks_per_vm.append(vlan_ipmask_template.format( + ip_last_byte=(i + 1) * 10)) + self.vms[server_name]['vlan_ipmasks'] = vlan_ipmasks_per_vm + if use_allowed_address_pairs: + allowed_address_pairs = [] + for vlan_ipmask in vlan_ipmasks_per_vm: + allowed_address_pairs.append({'ip_address': vlan_ipmask}) + else: + allowed_address_pairs = None + + self.vms[server_name]['vm'] = self._create_port_and_server( + server_name=server_name, + port_name=port_name, + port_security=port_security, + allowed_address_pairs=allowed_address_pairs) + + if self.network['router:external']: + ssh_ip = self.vms[server_name]['port'][ + 'fixed_ips'][0]['ip_address'] + else: + self.vms[server_name]['floating_ip'] = self.create_floatingip( + port=self.vms[server_name]['port']) + ssh_ip = self.vms[server_name]['floating_ip'][ + 'floating_ip_address'] + + self.vms[server_name]['ssh_client'] = self._create_ssh_client( + ssh_ip=ssh_ip) + + self.check_connectivity( + host=ssh_ip, + ssh_user=self.vms[server_name]['ssh_client'].username, + ssh_key=self.vms[server_name]['ssh_client'].pkey) + for j in range(num_vlans): + self._configure_vlan_transparent( + port=self.vms[server_name]['port'], + ssh_client=self.vms[server_name]['ssh_client'], + vlan_tag=vlan_tags[j], + vlan_ip=self.vms[server_name]['vlan_ipmasks'][j]) + + if port_security: + # Ping from vm0 to vm1 via VLAN interfaces should fail because + # we haven't allowed ICMP + self._check_remote_connectivity_vlans(num_vlans=num_vlans, + should_succeed=False) + + # allow intra-security-group traffic + sg_rule = self.create_pingable_secgroup_rule( + self.security_group['id'], client=self.client) + self.addCleanup( + self.client.delete_security_group_rule, + sg_rule['id']) + + +class MultiVlanTransparencyTest(VlanTransparencyDowntreamBaseTest): + + num_vlans = 4 + + @decorators.idempotent_id('2e2909cc-8d1d-46ae-983e-ad87376736d4') + def test_multivlan_transparent_port_sec_disabled(self): + self._create_multivlan_vlan_transparency_resources( + num_vlans=self.num_vlans, + port_security=False, use_allowed_address_pairs=False) + self._check_remote_connectivity_vlans(num_vlans=self.num_vlans) + self._check_remote_connectivity_vlans(untagged=True) + + @decorators.idempotent_id('853a00ce-ae5c-456c-be5e-54e2c28da891') + def test_multivlan_transparent_allowed_address_pairs(self): + self._create_multivlan_vlan_transparency_resources( + num_vlans=self.num_vlans, + port_security=True, use_allowed_address_pairs=True) + self._check_remote_connectivity_vlans(num_vlans=self.num_vlans) + self._check_remote_connectivity_vlans(untagged=True) + + @decorators.idempotent_id('bde09eef-310c-4353-9d37-fe479e88ee01') + def test_vlan_transparent_different_vlans_drops(self): + vlan_tag_vm0 = data_utils.rand_int_id(start=MIN_VLAN_ID, + end=MAX_VLAN_ID - 1) + vlan_tag_vm1 = vlan_tag_vm0 + 1 + vlan_tags = [vlan_tag_vm0, vlan_tag_vm1] + vlan_ipmasks = ['192.168.0.10/24', '192.168.0.20/24'] + + # Two servers are created with one port each one + # VLANs are created on those ports + for i in range(2): + server_name = 'server-%s-%d' % (self.rand_name, i) + port_name = 'port-%s-%d' % (self.rand_name, i) + self.vms[server_name] = {} + self.vms[server_name]['vlan_ipmasks'] = [vlan_ipmasks[i]] + self.vms[server_name]['vm'] = self._create_port_and_server( + server_name=server_name, + port_name=port_name, + port_security=False) + self.vms[server_name]['floating_ip'] = self.create_floatingip( + port=self.vms[server_name]['port']) + ssh_ip = self.vms[server_name]['floating_ip'][ + 'floating_ip_address'] + self.vms[server_name]['ssh_client'] = self._create_ssh_client( + ssh_ip=ssh_ip) + + self.check_connectivity( + host=self.vms[server_name]['floating_ip'][ + 'floating_ip_address'], + ssh_user=self.vms[server_name]['ssh_client'].username, + ssh_key=self.vms[server_name]['ssh_client'].pkey) + self._configure_vlan_transparent( + port=self.vms[server_name]['port'], + ssh_client=self.vms[server_name]['ssh_client'], + vlan_tag=vlan_tags[i], + vlan_ip=vlan_ipmasks[i]) + + self._check_remote_connectivity_vlans(should_succeed=False) + self._check_remote_connectivity_vlans(untagged=True) + + +class MultiPortVlanTransparencyTest(VlanTransparencyDowntreamBaseTest): + def _test_vlan_transparent_connectivity_two_ports( + self, vlan_cidr, port_security, ip_version=4): + vlan_tag = data_utils.rand_int_id(start=MIN_VLAN_ID, + end=MAX_VLAN_ID - 1) + + second_rand_name = 'second_' + self.rand_name + second_network = self.create_network( + name=second_rand_name, + vlan_transparent=True, + port_security_enabled=port_security) + self.create_subnet( + network=second_network, name=second_rand_name, + cidr=vlan_cidr, enable_dhcp=False, ip_version=ip_version) + + # Two servers are created with two ports each one + # First port is for management (ssh access via FIP) + # Second port is used for VLAN Transparent testing + for i in range(2): + server_name = 'server-%s-%d' % (self.rand_name, i) + port_name = 'port-%s-%d' % (self.rand_name, i) + self.vms[server_name] = {} + + self.vms[server_name]['vm'] = self._create_port_and_server( + server_name=server_name, + port_name=port_name, + second_network=second_network) + self.vms[server_name]['floating_ip'] = self.create_floatingip( + port=self.vms[server_name]['port']) + ssh_ip = self.vms[server_name]['floating_ip'][ + 'floating_ip_address'] + self.vms[server_name]['ssh_client'] = self._create_ssh_client( + ssh_ip=ssh_ip) + + self.check_connectivity( + host=self.vms[server_name]['floating_ip'][ + 'floating_ip_address'], + ssh_user=self.vms[server_name]['ssh_client'].username, + ssh_key=self.vms[server_name]['ssh_client'].pkey) + self._configure_vlan_transparent_second_port( + port=self.vms[server_name]['second_port'], + ssh_client=self.vms[server_name]['ssh_client'], + vlan_tag=vlan_tag, + mask=vlan_cidr.split('/')[1]) + + if port_security: + # allow icmp traffic + sg_rule = self.create_pingable_secgroup_rule( + self.security_group['id'], client=self.client) + self.addCleanup( + self.client.delete_security_group_rule, + sg_rule['id']) + + self._check_remote_connectivity_vlan_second_port() + + @decorators.idempotent_id('d7653461-46ae-4d55-9ad1-85ef66084739') + def test_vlan_transparent_connectivity_two_ports(self): + self._test_vlan_transparent_connectivity_two_ports( + vlan_cidr='192.168.100.0/24', port_security=True) + + @decorators.idempotent_id('4953921d-f94b-45cb-b569-be0eb6720df2') + def test_vlan_transparency_ipv6_port_sec_disabled(self): + self._test_vlan_transparent_connectivity_two_ports( + vlan_cidr='2001:db8::/64', port_security=False, ip_version=6) + + @decorators.idempotent_id('07a3ad80-d6b0-4644-8a33-198dabae6b2f') + def test_vlan_transparency_ipv6_port_sec_enabled(self): + self._test_vlan_transparent_connectivity_two_ports( + vlan_cidr='2001:db9::/64', port_security=True, ip_version=6) + + +class MtuVlanTransparencyTest(VlanTransparencyDowntreamBaseTest): + @decorators.idempotent_id('8e51bc66-c19e-44d5-860a-7e42a1aea722') + def test_vlan_transparent_packet_length_greater_mtu(self): + self._create_multivlan_vlan_transparency_resources( + port_security=False, use_allowed_address_pairs=False) + self._check_remote_connectivity_vlans_mtu() + + +class ProviderNetworkVlanTransparencyTest(VlanTransparencyDowntreamBaseTest): + num_vlans = 1 + + @classmethod + def setup_clients(cls): + super(ProviderNetworkVlanTransparencyTest, cls).setup_clients() + cls.client = cls.os_adm.network_client + cls.keypairs_client = cls.os_adm.keypairs_client + cls.servers_client = cls.os_adm.servers_client + + @classmethod + def resource_setup(cls): + super(ProviderNetworkVlanTransparencyTest, cls).resource_setup() + # no tenant network is used during this test + # existing external network is used instead + cls.network = cls.os_admin.network_client.list_networks(**{ + 'admin_state_up': True, 'router:external': True})['networks'][-1] + if cls.network['vlan_transparent'] is not True: + raise cls.skipException( + 'This test are only executed when VLAN Transparency is ' + 'enabled on the external network.') + if cls.network['shared'] is not True: + skip_reason = "Provider network is not shared" + cls.skipTest(skip_reason) + + @decorators.idempotent_id('8e51bc66-c19e-44d5-860a-7e42a1aea722') + def test_vlan_transparent_provider_network_port_sec_disabled(self): + self._create_multivlan_vlan_transparency_resources( + num_vlans=self.num_vlans, + port_security=False, use_allowed_address_pairs=False) + self._check_remote_connectivity_vlans(num_vlans=self.num_vlans) + self._check_remote_connectivity_vlans(untagged=True) + + @decorators.idempotent_id('ab472bfd-2a2b-4731-a018-c6adc87994d0') + def test_vlan_transparent_provider_network_port_sec_enabled(self): + self._create_multivlan_vlan_transparency_resources( + num_vlans=self.num_vlans, + port_security=True, use_allowed_address_pairs=True) + self._check_remote_connectivity_vlans(num_vlans=self.num_vlans) + self._check_remote_connectivity_vlans(untagged=True)