Control interface on tenant VMs for all QoS bandwidth tests

The single interface of a tenant VM gets overwhelmed while the bandwidth
limit is tested, so ssh connections through that same interface fail.

The issue was noticed in CI after additional QoS gateway bandwidth limit
tests were added [1]; it is possibly related to the OVN policer using a
different method of limit enforcement.

Fix adds a few things to address such issues:
1) Separate resources for control (network/subnet/router/ports/fips/sg).
   Each tenant VM (iperf server/client) has a control and a test interface.
   All control resources are unaffected by QoS bandwidth limiting methods:
   port, security group, gateway, floating ips, etc.
   The control ports are accessed through dedicated, separate control FIPs.

2) Symmetric and strict routing for control ip on tenant VMs.

3) Added iperf3 options to enforce correct ip/interface use with
   both client/server.

4) Adjustments in various tests and commonly used helper methods,
   such as: the test with 3 ports uses the ignore argument, resource
   creation order changed so the default route is on the test port, and more.

[1]
954607: Test QoS max bandwidth limit for router gateways, along with other types
https://review.opendev.org/c/x/whitebox-neutron-tempest-plugin/+/954607

Resolves: OSPRH-18910
Related: OSPCIX-1011
Signed-off-by: Maor Blaustein <mblue@redhat.com>
Change-Id: I266252a62ff6eca441df44f6321fd99452d8394f
This commit is contained in:
Maor Blaustein
2025-08-19 20:41:24 +03:00
parent 242eb566e7
commit 480c7b06d2
2 changed files with 211 additions and 55 deletions

View File

@@ -15,6 +15,7 @@
import base64
import collections
from functools import partial
import ipaddress
from multiprocessing import Process
import os
import random
@@ -638,12 +639,16 @@ class BaseTempestWhiteboxTestCase(base.BaseTempestTestCase):
def _create_server(
self, create_floating_ip=True, exclude_hosts=None,
network=None, use_admin_client=False, **kwargs):
network = network or self.network
network=None, use_admin_client=False, port=None, **kwargs):
kwargs.setdefault('name', data_utils.rand_name('server-test'))
kwargs['flavorRef'] = self.flavor_ref
kwargs['imageRef'] = self.image_ref
kwargs['networks'] = [{'uuid': network['id']}]
if not network and not port:
networks = [{'uuid': self.network['id']}]
else:
networks = [
{'port': port['id']}] if port else [{'uuid': network['id']}]
kwargs['networks'] = networks
if not kwargs.get('key_name'):
kwargs['key_name'] = self.keypair['name']
if not kwargs.get('security_groups'):
@@ -687,19 +692,23 @@ class BaseTempestWhiteboxTestCase(base.BaseTempestTestCase):
self.fail("Failed to spawn a server on a host other than in "
"this list: '{}'. Can not proceed.".format(
' '.join(exclude_hosts)))
port = network_client.list_ports(
network_id=network['id'],
device_id=server['id'])['ports'][0]
if port:
_port = network_client.show_port(port['id'])['port']
else:
_port = network_client.list_ports(
network_id=networks[0]['uuid'],
device_id=server['id']
)['ports'][0]
if create_floating_ip:
fip = network_client.create_floatingip(
floating_network_id=CONF.network.public_network_id,
port_id=port['id'])['floatingip']
port_id=_port['id'])['floatingip']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
network_client.delete_floatingip,
fip['id'])
else:
fip = None
return {'port': port, 'fip': fip, 'server': server}
return {'port': _port, 'fip': fip, 'server': server}
def _create_server_for_topology(
self, network_id=None, port_type=None,
@@ -1098,6 +1107,59 @@ class BaseTempestWhiteboxTestCase(base.BaseTempestTestCase):
param='enable_distributed_floating_ip',
skip_if_fails=skip_if_fails)
@classmethod
def tenant_sym_path_for_port(
        cls, ssh_client, port, rtr_ip=None, ip_ver=None):
    """Enforce symmetric path for port/interface related ip, on tenant vm

    (Should work for both advanced/cirros images).

    Traffic sourced from the chosen port IP is looked up in a dedicated
    routing table ('100 ctl') whose default route goes through the
    interface holding that IP, and rp_filter is set to 1 on the tenant vm.

    Parameters:
        ssh_client (ssh.Client): ssh client to tenant vm.
        port (dict): port whose first fixed IP (first one matching
            ``ip_ver`` when given) is used for the symmetric path.
        rtr_ip (str): optional next hop IP address for the return path,
            defaults to the gateway of the subnet of the chosen fixed IP.
        ip_ver (int): optionally filter by IP version (values 6 or 4).

    Raises:
        AssertionError: the port has no suitable fixed IP.
        ValueError: the next hop is not a valid IP address.
    """
    fixed_ips = port['fixed_ips']
    # use local ip from given openstack port, optionally ignore ipv4/6
    if ip_ver:
        ip_str = 'v' + str(ip_ver)
        addr_validator = (
            netaddr.valid_ipv4 if ip_ver == 4 else netaddr.valid_ipv6)
        # first matching fixed ip, None when the port has no such address
        ctl_fixed_ip = next(
            (fixed_ip for fixed_ip in fixed_ips
             if addr_validator(fixed_ip['ip_address'])),
            None)
    else:
        ip_str = ''
        ctl_fixed_ip = fixed_ips[0] if fixed_ips else None
        if len(fixed_ips) > 1:
            LOG.warning(
                'defaulted to port first fixed ip, many fixed ips found')
    err_msg = f"port {port['id']} didn't have legit ip{ip_str} address"
    assert ctl_fixed_ip is not None, err_msg
    ctl_ip = ctl_fixed_ip['ip_address']
    assert netaddr.valid_ipv4(ctl_ip) or netaddr.valid_ipv6(ctl_ip), \
        err_msg
    # subnet of the chosen fixed ip (not necessarily the port first one),
    # so prefix length and default gateway match the control ip
    subnet = cls.os_admin.network_client.show_subnet(
        ctl_fixed_ip['subnet_id'])['subnet']
    prefix_len = subnet['cidr'].partition('/')[-1]
    # enforce argument given next hop, or default to ctl_ip related router
    if not rtr_ip:
        rtr_ip = subnet['gateway_ip']
    # validation only, raises ValueError for a malformed next hop
    ipaddress.ip_address(rtr_ip)
    # interface name according to tenant os
    # NOTE(review): plain grep may also match longer addresses sharing
    # the same prefix (ex: x.x.x.1 and x.x.x.10) - confirm if relevant
    iface = ssh_client.exec_command(
        f"sudo ip -br a | grep {ctl_ip} | cut -d' ' -f1").strip()
    # sets symmetric route
    cmd = (
        "sysctl -w net.ipv4.conf.all.rp_filter=1 && "
        "sysctl -w net.ipv4.conf.default.rp_filter=1 && "
        f"sysctl -w net.ipv4.conf.{iface}.rp_filter=1 && "
        r"{ grep '100 ctl' /etc/iproute2/rt_tables || "
        r"echo '100 ctl' >> /etc/iproute2/rt_tables; } && "
        f"ip route replace default via {rtr_ip} dev {iface} table ctl && "
        f"ip rule add from {ctl_ip}/{prefix_len} lookup ctl "
        "priority 1000 && "
        "echo 'sym path done'")
    LOG.debug(
        'Executing script for symmetric path:\n%s', cmd)
    ssh_client.execute_script(cmd, become_root=True)
class BaseTempestTestCaseAdvanced(BaseTempestWhiteboxTestCase):
"""Base class skips test suites unless advanced image is available,

View File

@@ -164,6 +164,54 @@ class QosBaseTest(test_qos.QoSTestMixin, base.TrafficFlowTest):
cls.create_secgroup_rules(
rulesets, cls.secgroup['security_group']['id'])
# Use separate resources not affected by QoS bw limits, in order to
# control tenant VMs such as the ones used for client/server iperf
ctl_ptn = data_utils.rand_name('tenant-ctl-%s')
cls.ctl_secgroup = cls.create_security_group(
name=ctl_ptn % 'secgroup')
cls.security_groups.append(cls.ctl_secgroup)
cls.create_loginable_secgroup_rule(
secgroup_id=cls.ctl_secgroup['id'])
cls.create_pingable_secgroup_rule(
secgroup_id=cls.ctl_secgroup['id'])
cls.ctl_network = cls.create_network(name=ctl_ptn % 'network')
cls.ctl_subnet = cls.create_subnet(
network=cls.ctl_network, name=ctl_ptn % 'subnet')
cls.ctl_router = cls.create_router_by_client()
cls.create_router_interface(
cls.ctl_router['id'], cls.ctl_subnet['id'])
def _ensure_ctl_fip_for_vm(self, vm):
    """Make sure the tenant VM has a control FIP and ssh client.

    Does nothing when ``vm`` already holds a ``ctl_fip`` entry. Otherwise
    an additional port on the control network (no QoS policy, control
    security group) is attached to the VM, a floating IP is created for
    it, and both the FIP and an ssh client using it are stored in ``vm``
    under ``ctl_fip`` and ``ctl_ssh``.
    Mostly needed for bandwidth limit tests.
    """
    if 'ctl_fip' in vm:
        return

    # additional port/interface dedicated to control traffic
    ctl_port = self.create_port(
        self.ctl_network,
        qos_policy_id=None,
        security_groups=[self.ctl_secgroup['id']])
    self.create_interface(vm['id'], ctl_port['id'])
    waiters.wait_for_interface_status(
        self.os_primary.interfaces_client, vm['id'],
        ctl_port['id'], constants.PORT_STATUS_ACTIVE)
    utils.configure_interface_up(vm['ssh_client'], ctl_port)

    # control FIP and the ssh client going through it
    ctl_fip = self.create_floatingip(port=ctl_port)
    vm['ctl_fip'] = ctl_fip
    vm['ctl_ssh'] = ssh.Client(
        ctl_fip['floating_ip_address'],
        self.username,
        pkey=self.keypair['private_key'])

    # NOTE(mblue): consider explicit symmetric and strict (1) setting
    # for the test nic as well; custom image defaults to off (0), so
    # `max(all, per-iface)` ends up 1, and the first nic holds the
    # default route.
    # Force tenant symmetric route for the control interface.
    self.tenant_sym_path_for_port(vm['ssh_client'], ctl_port)

    # TODO(mblue): fix when new NTP changes available for rdo gate,
    # since NTP commit 2d663554 missing.
    self.check_connectivity(host=None, ssh_client=vm['ctl_ssh'])
@staticmethod
def _get_iperf_proto_param(protocol):
if protocol == constants.PROTO_NAME_TCP:
@@ -177,28 +225,24 @@ class QosBaseTest(test_qos.QoSTestMixin, base.TrafficFlowTest):
raise ValueError('Unsupported protocol %s' % protocol)
def _test_egress_bw(
self, ssh_client, ssh_server, server_ip, protocol, maxbitrate,
timeout=6):
self, ssh_client, ssh_server, client_ip, server_ip, protocol,
maxbitrate, timeout=6):
utils.kill_iperf_process(ssh_server)
utils.kill_iperf_process(ssh_client)
iperf_server_filename = utils.get_temp_file(ssh_server)
server_cmd = ('iperf3 -s -p {port} -J --logfile {output_file} '
'-D'.format(port=self.IPERF_PORT,
output_file=iperf_server_filename))
server_cmd = (
f'iperf3 -s -B {server_ip} -p {self.IPERF_PORT} -J --logfile '
f'{iperf_server_filename} -D')
LOG.debug('Run iperf3 command on server: %s', server_cmd)
ssh_server.exec_command(server_cmd)
time.sleep(0.1)
protocol_param = self._get_iperf_proto_param(protocol)
client_cmd = ('iperf3 -c {server_ip} -p {port} {protocol_param} '
'-t {timeout} -b {maxbitrate}'.format(
server_ip=server_ip,
port=self.IPERF_PORT,
protocol_param=protocol_param,
timeout=timeout,
maxbitrate=maxbitrate))
client_cmd = (
f'iperf3 -c {server_ip} -B {client_ip} -p {self.IPERF_PORT} '
f'{protocol_param} -t {timeout} -b {maxbitrate}')
LOG.debug('Run iperf3 command on client: %s', client_cmd)
ssh_client.exec_command(client_cmd)
time.sleep(0.1)
@@ -206,25 +250,22 @@ class QosBaseTest(test_qos.QoSTestMixin, base.TrafficFlowTest):
ssh_server, iperf_server_filename))
def _test_ingress_bw(
self, ssh_client, ssh_server, server_ip, protocol, maxbitrate,
timeout=6):
self, ssh_client, ssh_server, client_ip, server_ip, protocol,
maxbitrate, timeout=6):
utils.kill_iperf_process(ssh_server)
utils.kill_iperf_process(ssh_client)
server_cmd = 'iperf3 -s -p {port} -D'.format(port=self.IPERF_PORT)
server_cmd = f'iperf3 -s -B {server_ip} -p {self.IPERF_PORT} -D'
LOG.debug('Run iperf3 command on server: %s', server_cmd)
ssh_server.exec_command(server_cmd)
time.sleep(0.1)
iperf_client_filename = utils.get_temp_file(ssh_client)
protocol_param = self._get_iperf_proto_param(protocol)
client_cmd = ('iperf3 -c {server_ip} -p {port} {protocol_param} '
'-t {timeout} -b {maxbitrate} '
'-R -J > {output_file}'.format(
server_ip=server_ip, port=self.IPERF_PORT,
protocol_param=protocol_param, timeout=timeout,
maxbitrate=maxbitrate,
output_file=iperf_client_filename))
client_cmd = (
f'iperf3 -c {server_ip} -B {client_ip} -p {self.IPERF_PORT} '
f'{protocol_param} -t {timeout} -b {maxbitrate} '
f'-R -J > {iperf_client_filename}')
LOG.debug('Run iperf3 command on client: %s', client_cmd)
ssh_client.exec_command(client_cmd)
time.sleep(0.1)
@@ -266,20 +307,53 @@ class QosBaseTest(test_qos.QoSTestMixin, base.TrafficFlowTest):
raise self.skipException(
"iperf3 is not available on VM instance")
def _get_tenant_port_by_type(self, vm_id, _type, ignore_port_ids=None):
    """Return the first 'control' or 'test' port of a tenant VM.

    A port counts as 'control' when one of its fixed IPs belongs to the
    control subnet, and as 'test' when one of its fixed IPs belongs to
    any other subnet. Ports whose id is in ``ignore_port_ids`` are
    skipped. Returns None (after logging a warning) when no port fits.
    """
    # bw tests: iperf3 tenant VMs assumed to only have control/test ports,
    # special cases may use ignore_port_ids.
    assert _type in ('control', 'test')
    want_control = _type == 'control'
    for candidate in self.admin_client.list_ports(
            device_id=vm_id)['ports']:
        if ignore_port_ids and candidate['id'] in ignore_port_ids:
            LOG.debug('Ignored port %s (found in %s ignore list)',
                      candidate['id'], ignore_port_ids)
            continue
        for ip_entry in candidate['fixed_ips']:
            on_ctl_subnet = (
                ip_entry['subnet_id'] == self.ctl_subnet['id'])
            if on_ctl_subnet != want_control:
                continue
            LOG.debug(
                "%s port %s (related ip %s) found for tenant vm %s",
                _type, candidate['id'], ip_entry['ip_address'], vm_id)
            return candidate
    LOG.warning("%s port not found for tenant vm %s (ignore list %s)",
                _type, vm_id, ignore_port_ids)
    return None
# retry only when noticed measuring issue, as reported in BZ#2274465
@utils.retry_on_assert_fail(
max_retries=2,
assert_regex='not .* than')
def _validate_bw_limit(self, client, server, egress=True, ipv6=False,
bw_limit=None, protocol=constants.PROTO_NAME_TCP):
server_port = self.client.list_ports(
device_id=server['id'])['ports'][0]
bw_limit=None, protocol=constants.PROTO_NAME_TCP,
ignore_port_ids=None):
# control port/fip, per server/client tenant vms of iperf
self._ensure_ctl_fip_for_vm(client)
self._ensure_ctl_fip_for_vm(server)
# NOTE(mblue): iperf3 needs the soon to be overwhelmed test port,
# while control port remains for ssh (and any non bw limited traffic)
# ignore list currently for a single OVN test using 3 ports on vm
server_port = self._get_tenant_port_by_type(
server['id'], 'test', ignore_port_ids=ignore_port_ids)
server_ips = []
for fixed_ip in server_port['fixed_ips']:
if netaddr.valid_ipv6(fixed_ip['ip_address']) and not ipv6:
continue
server_ips.append(fixed_ip['ip_address'])
self.assertGreater(len(server_ips), 0)
client_ip = self._get_tenant_port_by_type(
client['id'], 'test', ignore_port_ids=ignore_port_ids
)['fixed_ips'][0]['ip_address']
if egress:
test_bw_method = self._test_egress_bw
@@ -296,20 +370,24 @@ class QosBaseTest(test_qos.QoSTestMixin, base.TrafficFlowTest):
maxbitrate = (bw_limit * 1.5 if bw_limit is not None
else self.MIN_KBPS_NO_BWLIMIT * 2000)
# TODO(mblue): all server ips tested, but not all client ips, nested
# loop to test all server/client ip possibilities would add test
# time, consider if needed (ex: ipv6 to ipv6 not tested, etc).
# egress: send payload from client to server
# ingress: download payload from server to client
for server_ip in server_ips:
perf_measures = test_bw_method(
client['ssh_client'], server['ssh_client'],
server_ip, protocol, maxbitrate=maxbitrate)
client['ctl_ssh'], server['ctl_ssh'],
client_ip, server_ip, protocol, maxbitrate=maxbitrate)
LOG.debug('perf_measures = %s', perf_measures)
# verify bw limit
measured_bw = self._calculate_bw(perf_measures)
LOG.debug(
'%s %s / server_ip = %s / measured_bw = %f',
direction, protocol, server_ip, measured_bw)
'%s %s / server_ip = %s , client_ip = %s / measured_bw = %f',
direction, protocol, server_ip, client_ip, measured_bw)
if bw_limit is None:
LOG.debug('no %s bw_limit configured', direction)
@@ -377,9 +455,9 @@ class QosBaseTest(test_qos.QoSTestMixin, base.TrafficFlowTest):
protocol=constants.PROTO_NAME_UDP)
direction = 'egress' if egress else 'ingress'
# Create new QoS policy and attach to the src network
# Create new QoS policy and attach to the src test network
net_bwlimit_policy_id = self._create_qos_policy()
src_port = self.client.list_ports(device_id=client['id'])['ports'][0]
src_port = self._get_tenant_port_by_type(client['id'], 'test')
self.admin_client.update_network(
src_port['network_id'], qos_policy_id=net_bwlimit_policy_id)
self.addCleanup(self.admin_client.update_network,
@@ -711,8 +789,7 @@ class QosBaseTest(test_qos.QoSTestMixin, base.TrafficFlowTest):
def _validate_traffic_marked(
self, mark, src_server, dst_server, ipv6=False, outer=False):
dst_port = self.client.list_ports(
device_id=dst_server['id'])['ports'][0]
dst_port = self._get_tenant_port_by_type(dst_server['id'], 'test')
dst_ips = []
for fixed_ip in dst_port['fixed_ips']:
if netaddr.valid_ipv6(fixed_ip['ip_address']) and not ipv6:
@@ -720,9 +797,7 @@ class QosBaseTest(test_qos.QoSTestMixin, base.TrafficFlowTest):
dst_ips.append(fixed_ip['ip_address'])
self.assertGreater(len(dst_ips), 0)
src_port = self.client.list_ports(
device_id=src_server['id'])['ports'][0]
src_port = self._get_tenant_port_by_type(src_server['id'], 'test')
if outer:
interface = WB_CONF.node_tenant_interface
dst_server['host'] = self.get_host_for_server(
@@ -950,8 +1025,8 @@ class QosBaseTest(test_qos.QoSTestMixin, base.TrafficFlowTest):
self, src_server, dst_server, migration_method):
vms = {'sender': src_server, 'receiver': dst_server}
port = self.client.list_ports(
device_id=vms['sender']['id'])['ports'][0]
port = self._get_tenant_port_by_type(
vms['sender']['id'], 'test')
max_kbps = (self.bwlimit_kbps_net
if port['binding:vnic_type'] == 'normal'
else self.bwlimit_kbps_net * 1000)
@@ -1000,6 +1075,16 @@ class QosBaseTest(test_qos.QoSTestMixin, base.TrafficFlowTest):
# Make sure that bw limit still works after migration, but only if
# sender VM and receiver VM are in different hosts.
if vm_host['sender'] != vm_host['receiver']:
# NOTE(mblue): enable and re-add symmetric and strict routing
# for control ip address/nic after cold migration
if migration_method == 'cold-migration':
for _vm in (vms['sender'], vms['receiver']):
ctl_port = self._get_tenant_port_by_type(
_vm['id'], 'control')
utils.configure_interface_up(
_vm['ssh_client'], ctl_port)
self.tenant_sym_path_for_port(
_vm['ssh_client'], ctl_port)
self._validate_bw_limit(
bw_limit=max_kbps * 1000, client=vms['sender'],
server=vms['receiver'])
@@ -1519,18 +1604,26 @@ class QosTestOvn(base.BaseTempestTestCaseOvn, QosBaseTest):
network_qos,
qos_policy_id=policy_id,
security_groups=[secgroup['id']])
port_no_qos = self.create_port(
network_no_qos,
qos_policy_id=None,
security_groups=[secgroup['id']])
# launch server with non policy port, then attach also to policy port
port_no_qos, fip_no_qos, server = self._create_server(
network=network_no_qos).values()
# launch server with policy port, then attach to non policy port
# (so default route on tenant vm test port first, as other bw tests)
port_qos, fip_qos, server = self._create_server(
port=port_qos).values()
# other server to validate QoS policy port later
scheduler_hints = {'different_host': server['id']}
other_fip, other_server = tuple(self._create_server(
network=network_qos, scheduler_hints=scheduler_hints).values())[1:]
network=network_qos,
security_groups=[{'name': secgroup['name']}],
scheduler_hints=scheduler_hints
).values())[1:]
server['ssh_client'] = ssh.Client(
fip_no_qos['floating_ip_address'],
fip_qos['floating_ip_address'],
self.username,
pkey=self.keypair['private_key'])
@@ -1541,13 +1634,13 @@ class QosTestOvn(base.BaseTempestTestCaseOvn, QosBaseTest):
server['ssh_client'].test_connection_auth()
self.create_interface(server['id'], port_qos['id'])
self.create_interface(server['id'], port_no_qos['id'])
waiters.wait_for_interface_status(
self.os_primary.interfaces_client, server['id'],
port_qos['id'], constants.PORT_STATUS_ACTIVE)
port_no_qos['id'], constants.PORT_STATUS_ACTIVE)
# configure ip and activate QoS port interface from server CLI
utils.configure_interface_up(server['ssh_client'], port_qos)
utils.configure_interface_up(server['ssh_client'], port_no_qos)
# validate connectivity with QoS port using another VM on QoS subnet
self.check_remote_connectivity(
@@ -1578,7 +1671,8 @@ class QosTestOvn(base.BaseTempestTestCaseOvn, QosBaseTest):
if is_iperf_installed:
self._validate_bw_limit(
client=server, server=other_server,
bw_limit=self.MAX_KBPS * 1000)
bw_limit=self.MAX_KBPS * 1000,
ignore_port_ids=(port_no_qos['id'],))
@decorators.idempotent_id('ba85bd87-f4f6-45a8-a2bd-97acb804b6f9')
def test_create_network_qos_policy_before_creating_vm(self):