Add test_sriov

Add existing tests to increase plugin functionality
Also added a dependency class from base.py

Change-Id: I357bbf5ce6b38e70423e75ce0fbcfd112db387a6
This commit is contained in:
jskunda 2023-11-08 16:43:52 +01:00 committed by Roman Safronov
parent e3ff03e580
commit a940232333
3 changed files with 458 additions and 1 deletions

View File

@ -69,5 +69,13 @@ WhiteboxNeutronPluginOptions = [
cfg.BoolOpt('bgp', cfg.BoolOpt('bgp',
default=False, default=False,
help='Specifies whether the OSP setup under test has been ' help='Specifies whether the OSP setup under test has been '
'configured with BGP functionality or not') 'configured with BGP functionality or not'),
cfg.IntOpt('sriov_pfs_per_host',
default=1,
help='Number of available PF (Physical Function) ports per'
'compute node on the environment under test'),
cfg.IntOpt('sriov_vfs_per_pf',
default=5,
help='Number of available VF (Virtual Function) ports per'
'PF interface on the environment under test')
] ]

View File

@ -24,7 +24,9 @@ from neutron_tempest_plugin.common import utils as common_utils
from neutron_tempest_plugin.scenario import base from neutron_tempest_plugin.scenario import base
from oslo_log import log from oslo_log import log
from tempest.common import utils from tempest.common import utils
from tempest.common import waiters
from tempest import config from tempest import config
from tempest.lib.common import fixed_network
from tempest.lib.common.utils import data_utils from tempest.lib.common.utils import data_utils
from whitebox_neutron_tempest_plugin.common import tcpdump_capture as capture from whitebox_neutron_tempest_plugin.common import tcpdump_capture as capture
@ -511,3 +513,193 @@ def build_user_data(net_vlan):
user_data = base64.b64encode(( user_data = base64.b64encode((
user_data_cmd % (if_full_name, if_full_name)).encode("utf-8")) user_data_cmd % (if_full_name, if_full_name)).encode("utf-8"))
return user_data return user_data
class ProviderBaseTest(BaseTempestWhiteboxTestCase):
"""Base class for tests using provider networks, such as provider routed
networks or sriov scenarios
Admin user is needed to create ports on the existing provisioning network
"""
servers = []
keypairs_client = None
secgroup_client = None
servers_client = None
extra_dhcp_opts = None
@classmethod
def create_loginable_secgroup_rule(cls, secgroup_id=None,
client=None):
"""This rule is intended to permit inbound IPv4 and IPv6 ssh
"""
cls.create_security_group_rule(
security_group_id=secgroup_id,
client=client,
protocol='tcp',
direction='ingress',
ip_version=6,
port_range_min=22,
port_range_max=22)
cls.create_security_group_rule(
security_group_id=secgroup_id,
client=client,
protocol='tcp',
direction='ingress',
port_range_min=22,
port_range_max=22)
@classmethod
def create_pingable_secgroup_rule(cls, secgroup_id=None,
client=None):
"""This rule is intended to permit inbound IPv4 and IPv6 ping
"""
cls.create_security_group_rule(
security_group_id=secgroup_id,
client=client,
protocol='icmp',
direction='ingress')
cls.create_security_group_rule(
security_group_id=secgroup_id,
client=client,
protocol='icmpv6',
ip_version=6,
direction='ingress')
@classmethod
def resource_setup(cls):
super(ProviderBaseTest, cls).resource_setup()
# setup basic topology for servers we can log into it
cls.keypair = cls.create_keypair(client=cls.keypairs_client)
secgroup_name = data_utils.rand_name('secgroup')
if cls.secgroup_client:
cls.secgroup = cls.secgroup_client.create_security_group(
name=secgroup_name)['security_group']
else:
cls.secgroup = cls.client.create_security_group(
name=secgroup_name)['security_group']
cls.security_groups.append(cls.secgroup)
cls.create_loginable_secgroup_rule(
secgroup_id=cls.secgroup['id'],
client=cls.client)
cls.create_pingable_secgroup_rule(
secgroup_id=cls.secgroup['id'],
client=cls.client)
@classmethod
def resource_cleanup(cls):
client = cls.servers_client or cls.os_primary.servers_client
for server in cls.servers:
cls._try_delete_resource(client.delete_server,
server['id'])
waiters.wait_for_server_termination(client,
server['id'])
super(ProviderBaseTest, cls).resource_cleanup()
@classmethod
def create_network_with_port(cls, cidr, gateway=True, **kwargs):
cls.network = cls.create_network()
if not gateway:
# some subnets need to be created without a default gateway
# e.g.: when a server is created with two ports, one of them should
# not include a default gateway
cls.subnet = cls.create_subnet(
cls.network, cidr=cidr, gateway=None)
else:
cls.subnet = cls.create_subnet(cls.network, cidr=cidr)
cls.port_id = cls.create_port(network=cls.network, **kwargs)['id']
return {'port': cls.port_id}
def build_create_port_body_and_secgroups(self, port_type, secgroup):
"""create_port_body and security_groups are needed to create ports,
whatever their types are (normal, macvtap, direct or direct-physical)
"""
create_port_body = {}
security_groups = []
if port_type not in ('direct', 'direct-physical', 'macvtap'):
create_port_body['security_groups'] = [secgroup['id']]
security_groups = [{'name': secgroup['name']}]
create_port_body['binding:vnic_type'] = port_type
create_port_body['name'] = "_".join(['port', port_type])
if self.extra_dhcp_opts is not None:
create_port_body['extra_dhcp_opts'] = self.extra_dhcp_opts
return (create_port_body, security_groups)
def _create_server(self, **kwargs):
kwargs['client'] = \
self.servers_client or self.os_primary.servers_client
kwargs['flavor_ref'] = self.flavor_ref
kwargs['image_ref'] = self.image_ref
kwargs['key_name'] = self.keypair['name']
server = self.create_server(**kwargs)
self.servers.append(server['server'])
return server['server']
def _create_network_port(self, port_type,
reuse_port=False,
use_provider_net=True,
subnet_id=None,
reused_tenant_net=None,
cidr=None,
gateway=True):
create_port_body, security_groups = \
self.build_create_port_body_and_secgroups(
port_type, self.secgroup)
if subnet_id:
subnet_name = 'segment' + subnet_id
if use_provider_net:
self.network = fixed_network.get_network_from_name(
CONF.network.floating_network_name, self.client)
if not reuse_port:
if not subnet_id:
self.create_port(
network=self.network,
fixed_ips=[{'subnet_id': self.network['subnets'][-1]}],
**create_port_body)
else:
subnets = self.client.list_subnets(name=subnet_name)
subnet_id = {'subnet_id': subnets['subnets'][-1]['id']}
self.create_port(
network=self.network,
fixed_ips=[subnet_id],
**create_port_body)
port = {'port': self.ports[-1]['id']}
elif not reused_tenant_net:
port = self.create_network_with_port(
cidr=cidr, gateway=gateway, **create_port_body)
else:
self.network = reused_tenant_net
port = {'port': self.create_port(network=self.network,
**create_port_body)['id']}
if not subnet_id:
net_id = self.network['id']
nc = self.admin_manager.network_client
net_vlan = nc.show_network(net_id)['network'][
'provider:segmentation_id']
else:
segments = self.client.list_segments(name=subnet_name)
net_vlan = segments['segments'][-1]['segmentation_id']
user_data = ""
config_drive = False
if port_type == 'direct-physical':
user_data = build_user_data(net_vlan)
config_drive = True
return (security_groups, port, user_data, config_drive)
def check_port_status(self, port_type,
port_index=-1, server_index=-1):
# by default, use last created port (-1) and last created server (-1)
port_id = self.ports[port_index]['id']
server_id = self.servers[server_index]['id']
waiters.wait_for_interface_status(self.os_adm.interfaces_client,
server_id,
port_id,
constants.PORT_STATUS_ACTIVE)
port_details = self.client.show_port(port_id)['port']
network = fixed_network.get_network_from_name(
CONF.network.floating_network_name, self.client)
self.assertEqual(port_details['network_id'], network['id'])
self.assertEqual(port_details['admin_state_up'], True)
self.assertEqual(port_details['binding:vnic_type'], port_type)
return port_details

View File

@ -0,0 +1,257 @@
# Copyright 2024 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutron_tempest_plugin import config
from tempest.common.utils.linux import remote_client
from tempest import exceptions
from tempest.lib import decorators
from whitebox_neutron_tempest_plugin.tests.scenario import base
CONF = config.CONF
WB_CONF = CONF.whitebox_neutron_plugin_options
class NetworkSriovBaseTest(base.ProviderBaseTest):
@classmethod
def setup_clients(cls):
super(NetworkSriovBaseTest, cls).setup_clients()
cls.secgroup_client = cls.manager.network_client
@classmethod
def skip_checks(cls):
super(NetworkSriovBaseTest, cls).skip_checks()
if not (CONF.neutron_plugin_options.advanced_image_ref or
CONF.neutron_plugin_options.default_image_is_advanced):
raise cls.skipException(
'Advanced image is required to run these tests.')
@classmethod
def resource_setup(cls):
super(NetworkSriovBaseTest, cls).resource_setup()
if not cls.has_sriov_support:
raise cls.skipException("Skipped because SRIOV is not supported")
cls.router = cls.create_router_by_client()
# set image and flavor in case default image is not advanced
if not CONF.neutron_plugin_options.default_image_is_advanced:
cls.flavor_ref = \
CONF.neutron_plugin_options.advanced_image_flavor_ref
cls.image_ref = CONF.neutron_plugin_options.advanced_image_ref
cls.username = CONF.neutron_plugin_options.advanced_image_ssh_user
def _create_network_port(self, port_type, cidr, reused_net=None):
return super(NetworkSriovBaseTest, self)._create_network_port(
port_type, reuse_port=False, use_provider_net=False,
reused_tenant_net=reused_net, cidr=cidr)
def _test_create_instance_with_network_port(self, port_type, cidr=None):
security_groups, networks, user_data, config_drive = \
self._create_network_port(port_type, cidr)
self.create_router_interface(self.router['id'], self.subnet['id'])
return self._create_server_with_fip(
security_groups, [networks],
user_data=user_data, config_drive=config_drive,
vm_name='vm_' + port_type)
def _test_create_instance_with_two_tenant_networks_two_ports(
self, port_types, cidrs=[None, None], reused_nets=[None, None]):
_, networks, user_data, config_drive = \
self._create_network_port(port_types[0], cidrs[0], reused_nets[0])
self.create_router_interface(self.router['id'], self.subnet['id'])
# create secondary network and port
_, networks_alt, user_data_alt, config_drive_alt = \
self._create_network_port(port_types[1], cidrs[1], reused_nets[1])
return self._create_server_with_several_networks(
[networks, networks_alt],
user_data=user_data, config_drive=config_drive,
vm_name="vm-" + self._testMethodName)
def _create_server_with_fip(
self, security_groups, networks, user_data, config_drive, vm_name):
server = self._create_server(
security_groups=security_groups, networks=networks,
user_data=user_data, config_drive=config_drive,
name=vm_name)
port = self.client.list_ports(
network_id=self.network['id'],
device_id=server['id'])['ports'][0]
self.fip = self.create_and_associate_floatingip(port['id'])
return server
def _create_server_with_several_networks(
self, networks, user_data, config_drive, vm_name):
server = self._create_server(
networks=networks,
user_data=user_data, config_drive=config_drive,
name=vm_name)
port0_id = networks[0]['port']
self.fips.append(self.create_and_associate_floatingip(port0_id))
return server
def remove_server_and_network(self, server):
self.manager.servers_client.delete_server(server['id'])
self.client.delete_port(self.port_id)
self.client.delete_network(self.network['id'])
def _test_create_instances_with_ports(self, port1, port2):
cidr = netaddr.IPNetwork('10.100.0.0/16')
server1 = self._test_create_instance_with_network_port(
port1, cidr=cidr)
server_ssh_client = remote_client.RemoteClient(
self.floating_ips[0]['floating_ip_address'],
self.username,
pkey=self.keypair['private_key'], server=server1)
fip1 = self.fip
cidr = netaddr.IPNetwork('10.200.0.0/16')
self._test_create_instance_with_network_port(
port2, cidr=cidr)
fip2 = self.fip
for fip in (fip1, fip2):
self.check_connectivity(fip['floating_ip_address'],
self.username,
self.keypair['private_key'])
server_ssh_client.ping_host(self.fip['fixed_ip_address'])
class NetworkSriov01AdvancedTest(NetworkSriovBaseTest):
@decorators.idempotent_id('71a73ba9-646f-4d79-b5c9-7832c814d09d')
def test_create_instances_with_normal_normal_ports(self):
self._test_create_instances_with_ports('normal', 'normal')
class NetworkSriov02AdvancedTest(NetworkSriovBaseTest):
@decorators.idempotent_id('e90efadc-54af-4a17-8fc5-22f15629b297')
def test_create_instances_with_normal_vf_ports(self):
self._test_create_instances_with_ports('direct', 'normal')
class NetworkSriov03AdvancedTest(NetworkSriovBaseTest):
@decorators.idempotent_id('1757205f-a479-49df-a0f6-411ee54f400c')
def test_create_instances_with_vf_vf_ports(self):
self._test_create_instances_with_ports('direct', 'direct')
class NetworkSriov04Test(NetworkSriovBaseTest):
@decorators.idempotent_id('26dba72c-1754-4fca-8c70-1feda1915327')
def test_create_instance_with_normal_port(self):
self._test_create_instance_with_network_port('normal')
self.check_connectivity(self.fip['floating_ip_address'],
self.username,
self.keypair['private_key'])
class NetworkSriov05Test(NetworkSriovBaseTest):
@decorators.idempotent_id('39a69007-845f-43aa-adae-4efc808674f4')
def test_create_instance_with_direct_port(self):
self._test_create_instance_with_network_port('direct')
self.check_connectivity(self.fip['floating_ip_address'],
self.username,
self.keypair['private_key'])
class NetworkSriov06Test(NetworkSriovBaseTest):
@decorators.idempotent_id('010c1bbb-7b6a-4b55-9eac-5147b69e42f8')
def test_create_instance_with_direct_phy_port(self):
self._test_create_instance_with_network_port('direct-physical')
self.check_connectivity(self.fip['floating_ip_address'],
self.username,
self.keypair['private_key'])
class NetworkSriov07AdvancedTest(NetworkSriovBaseTest):
@decorators.idempotent_id('e72b0b01-50b9-4d80-b8c4-9f59447122af')
def test_create_instances_with_vf_pf_ports(self):
self._test_create_instances_with_ports('direct-physical', 'direct')
class NetworkSriov08AdvancedTest(NetworkSriovBaseTest):
@decorators.idempotent_id('f6c9a9f1-9e20-4e12-94b8-cf1f45437a40')
def test_create_instances_with_normal_pf_ports(self):
self._test_create_instances_with_ports('direct-physical', 'normal')
class NetworkSriov09AdvancedTest(NetworkSriovBaseTest):
@decorators.idempotent_id('31dcaaa3-35d0-4188-8a78-5400b397d7fc')
def test_create_instances_with_pf_pf_ports(self):
self._test_create_instances_with_ports('direct-physical',
'direct-physical')
class NetworkSriov10PFVFNegativeTest(NetworkSriovBaseTest):
"""Assuming we have host with one SRIOV port and it's already occupied,
then creation of VM with VF or PF port should fail
We create a VM with PF port in order to occupy SRIOV port.
Creating VM with PF or VF should fail as there is no available port
"""
@decorators.idempotent_id('f2707194-a91d-4e47-8a14-934efa5a6422')
def test_try_create_instance_with_pf_port(self):
cidr_template = '10.{second_byte}.0.0/16'
for i in range(CONF.compute.min_compute_nodes *
WB_CONF.sriov_pfs_per_host):
cidr_value = cidr_template.format(second_byte=10 + i)
self._test_create_instance_with_network_port(
port_type='direct-physical',
cidr=netaddr.IPNetwork(cidr_value))
self.assertRaises(
exceptions.BuildErrorException,
self._test_create_instance_with_network_port,
port_type='direct-physical',
cidr=netaddr.IPNetwork('11.11.1.0/24'))
self.assertRaises(
exceptions.BuildErrorException,
self._test_create_instance_with_network_port,
port_type='direct', cidr=netaddr.IPNetwork('21.21.2.0/24'))
class NetworkSriov11Test(NetworkSriovBaseTest):
@decorators.idempotent_id('7f4dae87-e04a-4e91-80ff-b0baf0ba9c95')
def test_create_instance_with_macvtap_port(self):
self._test_create_instance_with_network_port('macvtap')
self.check_connectivity(self.fip['floating_ip_address'],
self.username,
self.keypair['private_key'])
class NetworkSriov12MultiplePortsTest(NetworkSriovBaseTest):
@decorators.idempotent_id('250c2b7b-c4eb-4c3e-813a-6189581ba62e')
def test_vms_with_two_ports_different_networks(self):
self.fips = []
port_types = ['normal', 'direct']
cidrs = [netaddr.IPNetwork('10.100.0.0/16'),
netaddr.IPNetwork('10.101.0.0/16')]
self._test_create_instance_with_two_tenant_networks_two_ports(
port_types=port_types, cidrs=cidrs)
reused_nets = self.networks[0:2]
self._test_create_instance_with_two_tenant_networks_two_ports(
port_types=port_types, cidrs=cidrs, reused_nets=reused_nets)
# check connectivity with FIPs
for fip in self.fips:
self.check_connectivity(fip['floating_ip_address'],
self.username,
self.keypair['private_key'])
# check connectivity from both VMs to all 4 ports
for fip in self.fips:
server_ssh_client = remote_client.RemoteClient(
fip['floating_ip_address'],
self.username,
pkey=self.keypair['private_key'])
for port in self.ports:
fixed_ip = port['fixed_ips'][0]['ip_address']
server_ssh_client.ping_host(fixed_ip)