neutron-tempest-plugin/neutron_tempest_plugin/scenario/test_mtu.py
Takashi Kajinami 435ff6ffc5 Bump hacking
hacking 3.0.x is too old. Bump it to the version currently used in
tempest repo.

Depends-on: https://review.opendev.org/c/openstack/tempest/+/906634
Change-Id: I01f9496e0fb66397916f8f8ce7543e3786f5d1dc
2024-11-22 10:37:03 +00:00

264 lines
12 KiB
Python

# Copyright 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import provider_net
from oslo_log import log
from oslo_serialization import jsonutils
from tempest.common import utils
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
import testtools
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin import config
from neutron_tempest_plugin.scenario import base
from neutron_tempest_plugin.scenario import constants
CONF = config.CONF
LOG = log.getLogger(__name__)
class NetworkMtuBaseTest(base.BaseTempestTestCase):
@classmethod
def resource_setup(cls):
super(NetworkMtuBaseTest, cls).resource_setup()
# setup basic topology for servers we can log into it
cls.router = cls.create_router_by_client()
cls.keypair = cls.create_keypair()
cls.secgroup = cls.os_primary.network_client.create_security_group(
name='secgroup_mtu')
cls.security_groups.append(cls.secgroup['security_group'])
cls.create_loginable_secgroup_rule(
secgroup_id=cls.secgroup['security_group']['id'])
cls.create_pingable_secgroup_rule(
secgroup_id=cls.secgroup['security_group']['id'])
if CONF.neutron_plugin_options.default_image_is_advanced:
cls.use_advanced_image = False
cls.username = CONF.validation.image_ssh_user
else:
cls.use_advanced_image = True
cls.username = CONF.neutron_plugin_options.advanced_image_ssh_user
def create_pingable_vm(self, net, keypair, secgroup):
if self.use_advanced_image:
flavor_ref = CONF.neutron_plugin_options.advanced_image_flavor_ref
image_ref = CONF.neutron_plugin_options.advanced_image_ref
else:
flavor_ref = CONF.compute.flavor_ref
image_ref = CONF.compute.image_ref
server = self.create_server(
flavor_ref=flavor_ref,
image_ref=image_ref,
key_name=keypair['name'],
networks=[{'uuid': net['id']}],
security_groups=[{'name': secgroup[
'security_group']['name']}])
waiters.wait_for_server_status(
self.os_primary.servers_client, server['server']['id'],
constants.SERVER_STATUS_ACTIVE)
port = self.client.list_ports(
network_id=net['id'], device_id=server['server']['id'])['ports'][0]
fip = self.create_floatingip(port=port)
return server, fip
def _get_network_params(self):
return jsonutils.loads(CONF.neutron_plugin_options.test_mtu_networks)
class NetworkMtuTest(NetworkMtuBaseTest):
credentials = ['primary', 'admin']
servers = []
networks = []
@classmethod
def skip_checks(cls):
super(NetworkMtuTest, cls).skip_checks()
if ("vxlan" not in
config.CONF.neutron_plugin_options.available_type_drivers or
"gre" not in
config.CONF.neutron_plugin_options.available_type_drivers):
raise cls.skipException("GRE or VXLAN type_driver is not enabled")
@classmethod
@utils.requires_ext(extension=provider_net.ALIAS, service="network")
def resource_setup(cls):
super(NetworkMtuTest, cls).resource_setup()
def _create_setup(self):
self.admin_client = self.os_admin.network_client
net_kwargs = {'tenant_id': self.client.project_id}
for net_type in ['vxlan', 'gre']:
net_kwargs['name'] = '-'.join([net_type, 'net'])
net_kwargs['provider:network_type'] = net_type
network = self.admin_client.create_network(**net_kwargs)[
'network']
self.networks.append(network)
self.addCleanup(self.admin_client.delete_network, network['id'])
subnet = self.create_subnet(network)
self.create_router_interface(self.router['id'], subnet['id'])
self.addCleanup(self.client.remove_router_interface_with_subnet_id,
self.router['id'], subnet['id'])
# check that MTUs are different for 2 networks
self.assertNotEqual(self.networks[0]['mtu'], self.networks[1]['mtu'])
self.networks.sort(key=lambda net: net['mtu'])
server1, fip1 = self.create_pingable_vm(self.networks[0],
self.keypair, self.secgroup)
server_ssh_client1 = ssh.Client(
self.floating_ips[0]['floating_ip_address'],
self.username, pkey=self.keypair['private_key'])
server2, fip2 = self.create_pingable_vm(self.networks[1],
self.keypair, self.secgroup)
server_ssh_client2 = ssh.Client(
self.floating_ips[0]['floating_ip_address'],
self.username, pkey=self.keypair['private_key'])
for fip in (fip1, fip2):
self.check_connectivity(
fip['floating_ip_address'],
self.username, self.keypair['private_key'],
servers=[server1, server2])
return server_ssh_client1, fip1, server_ssh_client2, fip2
@testtools.skipUnless(
(CONF.neutron_plugin_options.advanced_image_ref or
CONF.neutron_plugin_options.default_image_is_advanced),
"Advanced image is required to run this test.")
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a273d9d344')
def test_connectivity_min_max_mtu(self):
server_ssh_client, _, _, fip2 = self._create_setup()
# ping with min mtu of 2 networks succeeds even when
# fragmentation is disabled
self.check_remote_connectivity(
server_ssh_client, fip2['fixed_ip_address'],
mtu=self.networks[0]['mtu'], fragmentation=False)
# ping with the size above min mtu of 2 networks
# fails when fragmentation is disabled
self.check_remote_connectivity(
server_ssh_client, fip2['fixed_ip_address'], should_succeed=False,
mtu=self.networks[0]['mtu'] + 1, fragmentation=False)
# ping with max mtu of 2 networks succeeds when
# fragmentation is enabled
self.check_remote_connectivity(
server_ssh_client, fip2['fixed_ip_address'],
mtu=self.networks[1]['mtu'])
# ping with max mtu of 2 networks fails when fragmentation is disabled
self.check_remote_connectivity(
server_ssh_client, fip2['fixed_ip_address'], should_succeed=False,
mtu=self.networks[1]['mtu'], fragmentation=False)
class NetworkWritableMtuTest(NetworkMtuBaseTest):
credentials = ['primary', 'admin']
servers = []
networks = []
@classmethod
@utils.requires_ext(extension="net-mtu-writable", service="network")
def resource_setup(cls):
super(NetworkWritableMtuTest, cls).resource_setup()
if cls.is_driver_ovn:
raise cls.skipException("East/west icmp fragmentation is not "
"supported with ML2/OVN")
def _create_setup(self):
self.admin_client = self.os_admin.network_client
for test_net in self._get_network_params():
test_net['tenant_id'] = self.client.project_id
test_net['name'] = data_utils.rand_name('net')
cidr = None if 'cidr' not in test_net else test_net.pop('cidr')
network = self.admin_client.create_network(**test_net)[
'network']
self.networks.append(network)
self.addCleanup(self.admin_client.delete_network, network['id'])
subnet = self.create_subnet(network, cidr=cidr)
self.create_router_interface(self.router['id'], subnet['id'])
self.addCleanup(self.client.remove_router_interface_with_subnet_id,
self.router['id'], subnet['id'])
# update network mtu
net_mtu = self.admin_client.show_network(
self.networks[0]['id'])['network']['mtu']
self.admin_client.update_network(self.networks[0]['id'],
mtu=(net_mtu - 1))
self.networks[0]['mtu'] = (
self.admin_client.show_network(
self.networks[0]['id'])['network']['mtu'])
# check that MTUs are different for 2 networks
self.assertNotEqual(self.networks[0]['mtu'], self.networks[1]['mtu'])
self.networks.sort(key=lambda net: net['mtu'])
server1, fip1 = self.create_pingable_vm(self.networks[0],
self.keypair, self.secgroup)
server_ssh_client1 = ssh.Client(
self.floating_ips[0]['floating_ip_address'],
self.username, pkey=self.keypair['private_key'])
server2, fip2 = self.create_pingable_vm(self.networks[1],
self.keypair, self.secgroup)
server_ssh_client2 = ssh.Client(
self.floating_ips[0]['floating_ip_address'],
self.username, pkey=self.keypair['private_key'])
for fip in (fip1, fip2):
self.check_connectivity(
fip['floating_ip_address'],
self.username, self.keypair['private_key'])
return server_ssh_client1, fip1, server_ssh_client2, fip2
@testtools.skipUnless(
(CONF.neutron_plugin_options.advanced_image_ref or
CONF.neutron_plugin_options.default_image_is_advanced),
"Advanced image is required to run this test.")
@decorators.idempotent_id('bc470200-d8f4-4f07-b294-1b4cbaaa35b9')
def test_connectivity_min_max_mtu(self):
server_ssh_client, _, _, fip2 = self._create_setup()
log_msg = ("Ping with %(mtu_size)s MTU of 2 networks. "
"Fragmentation is %(fragmentation_state)s. "
"Expected result: ping %(ping_status)s")
# ping with min mtu of 2 networks succeeds even when
# fragmentation is disabled
LOG.debug(log_msg, mtu_size='minimal',
fragmentation_state='disabled', ping_status='succeeded')
self.check_remote_connectivity(
server_ssh_client, fip2['fixed_ip_address'],
mtu=self.networks[0]['mtu'], fragmentation=False)
# ping with the size above min mtu of 2 networks
# fails when fragmentation is disabled
LOG.debug(log_msg, mtu_size='size above minimal',
fragmentation_state='disabled', ping_status='failed')
self.check_remote_connectivity(
server_ssh_client, fip2['fixed_ip_address'], should_succeed=False,
mtu=self.networks[0]['mtu'] + 2, fragmentation=False)
# ping with max mtu of 2 networks succeeds when
# fragmentation is enabled
LOG.debug(log_msg, mtu_size='maximal',
fragmentation_state='enabled', ping_status='succeeded')
self.check_remote_connectivity(
server_ssh_client, fip2['fixed_ip_address'],
mtu=self.networks[1]['mtu'])
# ping with max mtu of 2 networks fails when fragmentation is disabled
LOG.debug(log_msg, mtu_size='maximal',
fragmentation_state='disabled', ping_status='failed')
self.check_remote_connectivity(
server_ssh_client, fip2['fixed_ip_address'], should_succeed=False,
mtu=self.networks[1]['mtu'], fragmentation=False)