# Copyright 2024 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import time

from neutron_lib import constants as lib_constants
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin import config
from oslo_log import log
from tempest.lib import decorators

from whitebox_neutron_tempest_plugin.common import constants
from whitebox_neutron_tempest_plugin.common import tcpdump_capture as capture
from whitebox_neutron_tempest_plugin.common import utils
from whitebox_neutron_tempest_plugin.tests.scenario import base

CONF = config.CONF
WB_CONF = config.CONF.whitebox_neutron_plugin_options
LOG = log.getLogger(__name__)


class GatewayMtuTest(base.TrafficFlowTest, base.BaseTempestTestCaseOvn):
    """Common helpers for path MTU discovery tests through an OVN gateway.

    Subclasses provide the actual test methods; this class builds the
    topology (router, internal network, three servers) and implements the
    capture/verification logic for ICMP 'fragmentation needed' messages.
    """

    credentials = ['primary', 'admin']

    # MTU of the external network before this test modified it. Stays None
    # until the first modification, which also registers the restore cleanup.
    original_mtu = None

    @classmethod
    def skip_checks(cls):
        """Skip the tests when no public network is configured."""
        super(GatewayMtuTest, cls).skip_checks()
        if not CONF.network.public_network_id:
            raise cls.skipException(
                "The public_network_id option must be specified. ")

    @classmethod
    def resource_setup(cls):
        """Select image/flavor and create the keypair and security group."""
        super(GatewayMtuTest, cls).resource_setup()
        if CONF.neutron_plugin_options.default_image_is_advanced:
            cls.flavor_ref = CONF.compute.flavor_ref
            cls.image_ref = CONF.compute.image_ref
            cls.username = CONF.validation.image_ssh_user
        else:
            cls.flavor_ref = \
                CONF.neutron_plugin_options.advanced_image_flavor_ref
            cls.image_ref = CONF.neutron_plugin_options.advanced_image_ref
            cls.username = CONF.neutron_plugin_options.advanced_image_ssh_user
        if (not cls.flavor_ref) or (not cls.image_ref):
            raise cls.skipException(
                'No advanced image/flavor available for these tests')
        cls.keypair = cls.create_keypair()
        cls.secgroup = cls.create_security_group()
        cls.create_loginable_secgroup_rule(secgroup_id=cls.secgroup['id'])
        cls.create_pingable_secgroup_rule(secgroup_id=cls.secgroup['id'])
        # Allow the ncat traffic used by the tests, over both transports.
        for protocol in ['udp', 'tcp']:
            cls.os_primary.network_client.create_security_group_rule(
                security_group_id=cls.secgroup['id'],
                protocol=protocol,
                direction=lib_constants.INGRESS_DIRECTION,
                port_range_min=constants.NCAT_PORT,
                port_range_max=constants.NCAT_PORT)

    def _verify_capture_for_icmp_unreacheable(
            self, protocol, payload_size, src_client, dst_client,
            must_contain=False):
        """Check that ICMP 'fragmentation needed' message is sent when needed

        :param protocol(str): Allowed values 'udp', 'tcp'.
        :param payload_size(int): Size in bytes.
        :param src_client(Client): SSH client to the origin of the traffic.
        :param dst_client(Client): SSH client to the destination host.
        :param must_contain(bool): Specifies if we expect to get
               ICMP 'fragmentation needed' message in the traffic capture.
        :return: MTU size of next hop or None if the capture must not
                 contain ICMP message.
        """
        udp = '-u' if protocol == 'udp' else ''
        utils.create_payload_file(src_client, payload_size)
        interface = utils.get_default_interface(src_client)
        # ICMP type 3 (destination unreachable), code 4 (fragmentation
        # needed) is the only traffic we want to capture.
        filters = 'icmp[icmptype] == 3 and icmp[icmpcode] == 4'
        src_server_capture = capture.TcpdumpCapture(
            src_client, interface, filters)
        self.useFixture(src_server_capture)
        time.sleep(10)
        # The server side is only needed when the payload is expected to
        # be delivered, i.e. when no ICMP error is expected.
        if not must_contain:
            output_file = utils.run_ncat_server(dst_client, udp)
        utils.run_ncat_client(src_client, dst_client.host, udp, payload_size)
        time.sleep(5)
        src_server_capture.stop()
        if must_contain:
            self.assertFalse(
                src_server_capture.is_empty(),
                "No ICMP 'fragmentation needed' message")
            return src_server_capture.get_next_hop_mtu()
        else:
            self.assertTrue(
                src_server_capture.is_empty(),
                "Unexpected ICMP 'fragmentation needed' message found")
            output_file_size = int(dst_client.exec_command(
                'stat -c "%s" ' + output_file).rstrip())
            self.assertEqual(
                output_file_size, payload_size,
                "Delivered data has size {} bytes while expected {}.".format(
                    output_file_size, payload_size))

    def _validate_environment_config(self):
        """Check that 'ovn_emit_need_to_frag' is configured as required."""
        msg = "ovn_emit_need_to_frag is not set to 'true' in config file"
        for node in self.nodes:
            if self.is_devstack:
                if node['is_controller'] is False:
                    continue
                else:
                    self.check_service_setting(
                        node, service='',
                        config_files=[WB_CONF.ml2_plugin_config],
                        section='ovn', param='ovn_emit_need_to_frag',
                        msg=msg)
        if not self.is_devstack:
            config_files = self.get_configs_of_service()
            self.check_service_setting(
                {'client': self.proxy_host_client},
                config_files=config_files, section='ovn',
                param='ovn_emit_need_to_frag', msg=msg)

    def _set_external_network_mtu(self, new_mtu):
        """Change the external network MTU and make sure it gets restored.

        The first call remembers the current MTU in ``self.original_mtu``
        and registers the cleanup that restores it; later calls only update
        the MTU, so the value restored is always the pre-test one.

        :param new_mtu(int): MTU to set on the external network.
        """
        if self.original_mtu is None:
            self.original_mtu = self.external_mtu
            # Registered before the update so that the original value is
            # restored even if something fails right after the change.
            self.addCleanup(self._restore_external_network_mtu)
        self.external_mtu = self.os_admin.network_client.update_network(
            CONF.network.public_network_id, mtu=new_mtu)['network']['mtu']

    def setup(self):
        """Build the test topology and the SSH clients used by the tests.

        Creates an internal network attached to a new router, lowers the
        external network MTU when it is not already smaller than the
        internal one, and boots three servers: one on the external
        network, one with a floating IP used as a proxy, and the test
        server which initially has no floating IP.
        """
        self._validate_environment_config()
        self.network = self.create_network()
        subnet = self.create_subnet(self.network)
        self.router = self.create_router_by_client()
        self.create_router_interface(self.router['id'], subnet['id'])
        self.ensure_external_network_is_shared()
        self.external_mtu = self.external_network['mtu']
        self.internal_mtu = self.network['mtu']
        # The tests need the external MTU to be smaller than the internal.
        if int(self.external_mtu) >= int(self.internal_mtu):
            self._set_external_network_mtu(int(self.internal_mtu) - 30)
        self._validate_ovn_db_mtu_settings()
        ext_vm = self._create_server(
            network=self.external_network, create_floating_ip=False)
        ext_vm_ssh_client = ssh.Client(
            ext_vm['port']['fixed_ips'][0]['ip_address'],
            self.username, pkey=self.keypair['private_key'])
        self.local_client = ext_vm_ssh_client
        # We'll use the default self.server as a proxy for no-FIP scenario
        self.server = self._create_server()
        server_ssh_client = ssh.Client(
            self.server['fip']['floating_ip_address'],
            self.username, pkey=self.keypair['private_key'])
        self.test_server = self._create_server(create_floating_ip=False)
        test_server_ip = self.test_server['port']['fixed_ips'][0]['ip_address']
        self.test_server_client = ssh.Client(
            test_server_ip, self.username,
            pkey=self.keypair['private_key'],
            proxy_client=server_ssh_client)

    def _restore_external_network_mtu(self):
        """Set the external network MTU back to ``self.original_mtu``."""
        self.os_admin.network_client.update_network(
            CONF.network.public_network_id, mtu=self.original_mtu)

    def validate_next_hop_mtu(self, starting_payload_size):
        """Verify PMTUD for UDP starting from the given payload size.

        First sends a payload that must trigger ICMP 'fragmentation
        needed', then sends a payload sized from the reported next hop MTU
        and expects it to be delivered without any ICMP error.

        :param starting_payload_size(int): Payload size in bytes expected
               to exceed the path MTU.
        """
        next_hop_mtu = self._verify_capture_for_icmp_unreacheable(
            protocol='udp', must_contain=True,
            src_client=self.test_server_client,
            dst_client=self.local_client,
            payload_size=starting_payload_size)
        payload_size = (int(next_hop_mtu) - constants.IP_HEADER_LENGTH -
                        constants.UDP_HEADER_LENGTH)
        self.assertLess(
            payload_size, starting_payload_size,
            "Received next hop MTU %s is bigger than expected" % next_hop_mtu)
        self._verify_capture_for_icmp_unreacheable(
            protocol='udp', must_contain=False,
            src_client=self.test_server_client,
            dst_client=self.local_client,
            payload_size=payload_size)

    def check_pmtud_basic(self):
        """Run the PMTUD validation without and then with a floating IP."""
        self.setup()
        maximal_payload_for_internal_net = (self.internal_mtu -
                                            constants.IP_HEADER_LENGTH -
                                            constants.UDP_HEADER_LENGTH)
        self.validate_next_hop_mtu(
            starting_payload_size=maximal_payload_for_internal_net)
        self.create_floatingip(port=self.test_server['port'])
        self.validate_next_hop_mtu(
            starting_payload_size=maximal_payload_for_internal_net)

    def _validate_ovn_db_mtu_settings(self):
        """Assert the router gateway port MTU in OVN matches external MTU."""
        router_port = self.os_admin.network_client.list_ports(
            device_id=self.router['id'],
            device_owner=lib_constants.DEVICE_OWNER_ROUTER_GW)['ports'][0]
        ovn_gateway_mtu = self.get_router_port_gateway_mtu(router_port['id'])
        self.assertEqual(
            self.external_mtu, ovn_gateway_mtu,
            "MTU is not set properly in OVN DB")


class GatewayMtuTestUdp(GatewayMtuTest):
    """South->north path MTU discovery tests using UDP traffic."""

    credentials = ['primary', 'admin']

    @decorators.idempotent_id('7f7470ff-31b4-4ad8-bfa7-82dcca174744')
    def test_south_to_north_pmtud_udp_basic(self):
        """Verify that south->north path MTU discovery is working for UDP

        Setup:
            Environment with public network MTU smaller than tenant networks.

        Scenario:
            1. Create router connected to pre-existed external network.
               It is expected that the external network MTU is lower than
               internal network MTU (will be created in the next step)
            2. Create internal network, connect it to the router and spawn
               2 instances, one will be used as a proxy in order to be able
               to access the test server even during no-FIP scenario.
            3. Launch traffic capture on the test server and send a UDP
               packet matching the internal network MTU to the host
               connected to the external network (local address from
               external network is used).
            4. Validate that the router returned ICMP 'fragmentation
               needed' message and send a new UDP packet matching MTU in
               this message.
            5. Validate that the smaller file was delivered to the external
               address and the router did not send any ICMP message.
            6. Add a FIP to the test server and repeat steps 3-5.
        """
        self.check_pmtud_basic()

    @decorators.idempotent_id('cab28be4-7d11-485b-b99e-ea511fe2a675')
    def test_south_to_north_pmtud_udp_change_mtu(self):
        """Verify that south->north path MTU discovery is working for UDP

        Setup:
            Environment with public network MTU smaller than tenant networks.

        Scenario:
            1. Create router connected to pre-existed external network.
               It is expected that the external network MTU is lower than
               internal network MTU (will be created in the next step)
            2. Create internal network, connect it to the router and spawn
               2 instances, one will be used as a proxy in order to be able
               to access the test server even during no-FIP scenario.
            3. Verify that OVN DB MTU entries are configured correctly.
            4. Change MTU of the external network and verify that OVN DB
               MTU settings updated accordingly.
            5. Verify that south->north path MTU discovery is working for
               UDP using the new MTU (see details in
               test_south_to_north_pmtud_udp description, steps 3-6)
            6. After the test the original MTU of the external network is
               restored.
        """
        self.setup()
        new_mtu = int(self.external_mtu) - 20
        # Goes through the helper so the restore cleanup is registered even
        # when setup() did not have to touch the external network MTU.
        self._set_external_network_mtu(new_mtu)
        self._validate_ovn_db_mtu_settings()
        minimal_payload_causing_fragmentation = (
            self.external_mtu - constants.ETHERNET_HEADER_LENGTH + 1)
        self.validate_next_hop_mtu(
            starting_payload_size=minimal_payload_causing_fragmentation)
        self.create_floatingip(port=self.test_server['port'])
        self.validate_next_hop_mtu(
            starting_payload_size=minimal_payload_causing_fragmentation)


class GatewayMtuTestIcmp(GatewayMtuTest):
    """Path MTU discovery tests using ICMP echo (ping) traffic."""

    def make_sure_routing_cache_is_clear(self, ssh_client, dst):
        """Flush the routing cache and skip if an MTU entry still exists.

        :param ssh_client(Client): SSH client to the host to be checked.
        :param dst(str): Destination address passed to 'ip route get'.
        """
        utils.flush_routing_cache(ssh_client)
        result = ssh_client.exec_command("ip route get %s" % dst)
        if 'mtu' in result:
            raise self.skipException(
                'Routing cache already contains mtu records')

    def validate_next_hop_mtu(self, starting_payload_size):
        """Verify PMTUD for ICMP starting from the given payload size.

        :param starting_payload_size(int): Size passed as 'mtu' to the
               connectivity check; expected to exceed the path MTU.
        """
        next_hop_mtu = self._verify_capture_for_icmp_unreacheable(
            must_contain=True,
            src_client=self.test_server_client,
            dst_client=self.local_client,
            payload_size=starting_payload_size)
        payload_size = (int(next_hop_mtu) - constants.IP_HEADER_LENGTH -
                        constants.UDP_HEADER_LENGTH)
        self.assertLess(
            payload_size, starting_payload_size,
            "Received next hop MTU %s is bigger than expected" % next_hop_mtu)
        # NOTE(review): the second probe reuses starting_payload_size rather
        # than payload_size - presumably it relies on the sender having
        # learned the path MTU from the first probe; confirm.
        self._verify_capture_for_icmp_unreacheable(
            must_contain=False,
            src_client=self.test_server_client,
            dst_client=self.local_client,
            payload_size=starting_payload_size)

    def _verify_capture_for_icmp_unreacheable(
            self, payload_size, src_client, dst_client, must_contain=False):
        """Check that ICMP 'fragmentation needed' message is sent when needed

        :param payload_size(int): Size in bytes.
        :param src_client(Client): SSH client to the origin of the traffic.
        :param dst_client(Client): SSH client to the destination
        :param must_contain(bool): Specifies if we expect to get
               ICMP 'fragmentation needed' message in the traffic capture.
        :return: MTU size of next hop or None if the capture must not
                 contain ICMP message.
        """
        interface = utils.get_default_interface(src_client)
        filters = 'icmp[icmptype] == 3 and icmp[icmpcode] == 4'
        # The ping is expected to fail exactly when the ICMP error is
        # expected.
        should_succeed = not must_contain
        if must_contain:
            self.make_sure_routing_cache_is_clear(src_client, dst_client.host)
            time.sleep(2)
            ping_count = 1
        else:
            ping_count = 10
        src_server_capture = capture.TcpdumpCapture(
            src_client, interface, filters)
        self.useFixture(src_server_capture)
        time.sleep(10)
        self.check_remote_connectivity(
            source=src_client, dest=dst_client.host, mtu=payload_size,
            should_succeed=should_succeed, ping_count=ping_count,
            timeout=20, forbid_packet_loss=True)
        time.sleep(5)
        src_server_capture.stop()
        if must_contain:
            self.assertFalse(
                src_server_capture.is_empty(),
                "No ICMP 'fragmentation needed' message")
            return src_server_capture.get_next_hop_mtu()
        else:
            self.assertTrue(
                src_server_capture.is_empty(),
                "Unexpected ICMP 'fragmentation needed' message found")
            return None

    @decorators.idempotent_id('10d73cf0-7506-4899-864c-cebe43099be3')
    def test_northbound_pmtud_icmp(self):
        """Verify path MTU discovery using ICMP echo traffic.

        Runs the same flow as the basic UDP test (without and then with a
        floating IP on the test server), using the ping based checks
        implemented by this class.
        """
        self.check_pmtud_basic()