Run iperf3 client (and server) in background

This patch implements possibility to run iperf3 client and server as
external processes in background.
Client can be run on the local machine where Tobiko is run or, if
ssh_client is given it will run it on the remote machine.

For the iperf3 server, this one will be run on the remote machine if
the ``iperf3_server_ssh_client`` is given. If not, iperf3 server is
expected to be run on the destination to perform tests correctly.

Additionally 2 new config options for the RHOSO topologies are added:

* max_traffic_break_allowed - to specify longest allowed single break in
  the traffic tested with iperf3,
* max_total_breaks_allowed - to specify total allowed breaks time in the
  traffic tested with iperf3.

Related: #TOBIKO-128
Change-Id: I459e8505e8ccc75caa1c9e3e0955e94ea38c4352
This commit is contained in:
Slawek Kaplonski
2025-02-28 16:24:26 +01:00
parent 9ac2809868
commit 9a0cd1d012
5 changed files with 328 additions and 11 deletions

View File

@@ -70,6 +70,23 @@ RHOSP_OPTIONS = [
default=False,
deprecated_group=TRIPLEO_GROUP_NAME,
help="whether Ceph RGW is deployed"),
# Background connectivity related settings:
cfg.IntOpt('max_traffic_break_allowed',
default=0,
help="longest allowed single break time during the background "
"connectivity tests like e.g. those using iperf3 "
"(in seconds)"),
cfg.IntOpt('max_total_breaks_allowed',
default=0,
help="longest allowed total break time during the background "
"connectivity tests like e.g. those using iperf3. "
"This option represents total time when connetion "
"was not working. "
"For example it could be: not working for 3 seconds, "
"then working for 60 seconds and then again not working "
"for another 10 seconds. In such case this total break "
"time would be 13 seconds."),
]
TRIPLEO_OPTIONS = [

View File

@@ -16,6 +16,12 @@
from __future__ import absolute_import
from tobiko.shell.iperf3 import _assert
from tobiko.shell.iperf3 import _execute
assert_has_bandwith_limits = _assert.assert_has_bandwith_limits
execute_iperf3_client_in_background = \
_execute.execute_iperf3_client_in_background
check_iperf3_client_results = _execute.check_iperf3_client_results
iperf3_client_alive = _execute.iperf3_client_alive
stop_iperf3_client = _execute.stop_iperf3_client

View File

@@ -17,21 +17,79 @@ from __future__ import absolute_import
from __future__ import division
import json
import os
import typing
import netaddr
from oslo_log import log
import tobiko
from tobiko import config
from tobiko.shell.iperf3 import _interface
from tobiko.shell.iperf3 import _parameters
from tobiko.shell import sh
from tobiko.shell import ssh
CONF = config.CONF
LOG = log.getLogger(__name__)
def _get_filepath(address: typing.Union[str, netaddr.IPAddress],
path: str,
ssh_client: ssh.SSHClientType = None) -> str:
if ssh_client:
final_dir = _get_remote_filepath(path, ssh_client)
else:
final_dir = _get_local_filepath(path)
filename = f'iperf_{address}.log'
return os.path.join(final_dir, filename)
def _get_local_filepath(path: str) -> str:
final_dir_path = f'{sh.get_user_home_dir()}/{path}'
if not os.path.exists(final_dir_path):
os.makedirs(final_dir_path)
return final_dir_path
def _get_remote_filepath(path: str,
ssh_client: ssh.SSHClientType) -> str:
homedir = sh.execute('echo ~', ssh_client=ssh_client).stdout.rstrip()
final_dir_path = f'{homedir}/{path}'
sh.execute(f'/usr/bin/mkdir -p {final_dir_path}',
ssh_client=ssh_client)
return final_dir_path
def _truncate_iperf3_client_logfile(
logfile: str,
ssh_client: ssh.SSHClientType = None) -> None:
if ssh_client:
_truncate_remote_logfile(logfile, ssh_client)
else:
tobiko.truncate_logfile(logfile)
def _truncate_remote_logfile(logfile: str,
ssh_client: ssh.SSHClientType) -> None:
truncated_logfile = tobiko.get_truncated_filename(logfile)
sh.execute(f'/usr/bin/mv {logfile} {truncated_logfile}',
ssh_client=ssh_client)
def _remove_old_logfile(logfile: str,
ssh_client: ssh.SSHClientType = None):
if ssh_client:
sh.execute(f'/usr/bin/rm -f {logfile}',
ssh_client=ssh_client)
else:
try:
os.remove(logfile)
except FileNotFoundError:
pass
def get_bandwidth(address: typing.Union[str, netaddr.IPAddress],
bitrate: int = None,
download: bool = None,
@@ -68,21 +126,202 @@ def execute_iperf3_client(address: typing.Union[str, netaddr.IPAddress],
port: int = None,
protocol: str = None,
ssh_client: ssh.SSHClientType = None,
timeout: tobiko.Seconds = None) \
timeout: tobiko.Seconds = None,
logfile: str = None,
run_in_background: bool = False) \
-> typing.Dict:
params_timeout: typing.Optional[int] = None
if timeout is not None:
if run_in_background:
params_timeout = 0
elif timeout is not None:
params_timeout = int(timeout - 0.5)
parameters = _parameters.iperf3_client_parameters(address=address,
bitrate=bitrate,
download=download,
port=port,
protocol=protocol,
timeout=params_timeout)
parameters = _parameters.iperf3_client_parameters(
address=address, bitrate=bitrate,
download=download, port=port, protocol=protocol,
timeout=params_timeout, logfile=logfile)
command = _interface.get_iperf3_client_command(parameters)
# output is a dictionary
if run_in_background:
process = sh.process(command, ssh_client=ssh_client)
process.execute()
return {}
output = sh.execute(command,
ssh_client=ssh_client,
timeout=timeout).stdout
return json.loads(output)
def execute_iperf3_client_in_background(
address: typing.Union[str, netaddr.IPAddress], # noqa; pylint: disable=W0613
bitrate: int = None,
download: bool = None,
port: int = None,
protocol: str = None,
ssh_client: ssh.SSHClientType = None,
iperf3_server_ssh_client: ssh.SSHClientType = None,
output_dir: str = 'tobiko_iperf_results',
**kwargs) -> None:
output_path = _get_filepath(address, output_dir, ssh_client)
LOG.info(f'starting iperf3 client process to > {address} , '
f'output file is : {output_path}')
# just in case there is some leftover file from previous run,
# it needs to be removed, otherwise iperf will append new log
# to the end of the existing file and this will make json output
# file to be malformed
_remove_old_logfile(output_path, ssh_client=ssh_client)
# If there is ssh client for the server where iperf3 server is going
# to run, lets make sure it is started fresh as e.g. in case of
# failure in the previous run, it may report that is still "busy" thus
# iperf3 client will not start properly
if iperf3_server_ssh_client:
_stop_iperf3_server(
port=port, protocol=protocol,
ssh_client=iperf3_server_ssh_client)
_start_iperf3_server(
port=port, protocol=protocol,
ssh_client=iperf3_server_ssh_client)
if not _iperf3_server_alive(
port=port, protocol=protocol,
ssh_client=iperf3_server_ssh_client):
testcase = tobiko.get_test_case()
testcase.fail('iperf3 server did not start properly '
f'on the server {iperf3_server_ssh_client}')
# Now, finally iperf3 client should be ready to start
execute_iperf3_client(
address=address,
bitrate=bitrate,
download=download,
port=port,
protocol=protocol,
ssh_client=ssh_client,
logfile=output_path,
run_in_background=True)
def _get_iperf3_pid(
address: typing.Union[str, netaddr.IPAddress, None] = None,
port: int = None,
protocol: str = None,
ssh_client: ssh.SSHClientType = None) -> typing.Union[int, None]:
try:
iperf_pids = sh.execute(
'pidof iperf3', ssh_client=ssh_client).stdout.rstrip().split(" ")
except sh.ShellCommandFailed:
return None
for iperf_pid in iperf_pids:
proc_cmdline = sh.get_command_line(
iperf_pid,
ssh_client=ssh_client)
if address and str(address) in proc_cmdline:
# This is looking for the iperf client instance
return int(iperf_pid)
elif port and protocol:
# By looking for port and protocol we are looking
# for the iperf3 server's PID
if "-s" in proc_cmdline and f"-p {port}" in proc_cmdline:
if ((protocol.lower() == 'udp' and "-u" in proc_cmdline) or
(protocol.lower() == 'tcp' and
'-u' not in proc_cmdline)):
return int(iperf_pid)
return None
def check_iperf3_client_results(address: typing.Union[str, netaddr.IPAddress],
output_dir: str = 'tobiko_iperf_results',
ssh_client: ssh.SSHClientType = None,
**kwargs): # noqa; pylint: disable=W0613
# This function expects that the result file is available locally already
#
logfile = _get_filepath(address, output_dir, ssh_client)
try:
iperf_log_raw = sh.execute(
f"cat {logfile}", ssh_client=ssh_client).stdout
except sh.ShellCommandFailed as err:
if config.is_prevent_create():
# Tobiko is not expected to create resources in this run
# so iperf should be already running and log file should
# be already there, if it is not, it should fail
tobiko.fail('Failed to read iperf log from the file. '
f'Server IP address: {address}; Logfile: {logfile}')
else:
# Tobiko is creating resources so it is normal that file was not
# there yet
LOG.debug(f'Failed to read iperf log from the file. '
f'Error: {err}')
return
iperf_log = json.loads(iperf_log_raw)
longest_break = 0 # seconds
breaks_total = 0 # seconds
current_break = 0 # seconds
intervals = iperf_log.get("intervals")
if not intervals:
tobiko.fail(f"No intervals data found in {logfile}")
for interval in intervals:
if interval["sum"]["bytes"] == 0:
interval_duration = (
interval["sum"]["end"] - interval["sum"]["start"])
current_break += interval_duration
if current_break > longest_break:
longest_break = current_break
breaks_total += interval_duration
else:
current_break = 0
_truncate_iperf3_client_logfile(logfile, ssh_client)
testcase = tobiko.get_test_case()
testcase.assertLessEqual(longest_break,
CONF.tobiko.rhosp.max_traffic_break_allowed)
testcase.assertLessEqual(breaks_total,
CONF.tobiko.rhosp.max_total_breaks_allowed)
def iperf3_client_alive(address: typing.Union[str, netaddr.IPAddress], # noqa; pylint: disable=W0613
ssh_client: ssh.SSHClientType = None,
**kwargs) -> bool:
return bool(_get_iperf3_pid(address=address, ssh_client=ssh_client))
def stop_iperf3_client(address: typing.Union[str, netaddr.IPAddress],
ssh_client: ssh.SSHClientType = None,
**kwargs): # noqa; pylint: disable=W0613
pid = _get_iperf3_pid(address=address, ssh_client=ssh_client)
if pid:
LOG.info(f'iperf3 client process to > {address} already running '
f'with PID: {pid}')
sh.execute(f'sudo kill {pid}', ssh_client=ssh_client)
def _start_iperf3_server(
port: typing.Union[int, None],
protocol: typing.Union[str, None],
ssh_client: ssh.SSHClientType):
parameters = _parameters.iperf3_server_parameters(
port=port, protocol=protocol)
command = _interface.get_iperf3_server_command(parameters)
process = sh.process(command, ssh_client=ssh_client)
process.execute()
def _iperf3_server_alive(
port: typing.Union[int, None],
protocol: typing.Union[str, None],
ssh_client: ssh.SSHClientType = None) -> bool:
return bool(
_get_iperf3_pid(port=port, protocol=protocol,
ssh_client=ssh_client))
def _stop_iperf3_server(
port: typing.Union[int, None],
protocol: typing.Union[str, None],
ssh_client: ssh.SSHClientType = None):
pid = _get_iperf3_pid(port=port, protocol=protocol, ssh_client=ssh_client)
if pid:
LOG.info(f'iperf3 server listening on the {protocol} port: {port} '
f'is already running with PID: {pid}')
sh.execute(f'sudo kill {pid}', ssh_client=ssh_client)

View File

@@ -29,6 +29,11 @@ def get_iperf3_client_command(parameters: _parameters.Iperf3ClientParameters):
return interface.get_iperf3_client_command(parameters)
def get_iperf3_server_command(parameters: _parameters.Iperf3ServerParameters):
interface = Iperf3Interface()
return interface.get_iperf3_server_command(parameters)
class Iperf3Interface:
def get_iperf3_client_command(
@@ -38,6 +43,13 @@ class Iperf3Interface:
options = self.get_iperf3_client_options(parameters=parameters)
return sh.shell_command('iperf3') + options
def get_iperf3_server_command(
self,
parameters: _parameters.Iperf3ServerParameters) \
-> sh.ShellCommand:
options = self.get_iperf3_server_options(parameters=parameters)
return sh.shell_command('iperf3') + options
def get_iperf3_client_options(
self,
parameters: _parameters.Iperf3ClientParameters) \
@@ -54,6 +66,20 @@ class Iperf3Interface:
options += self.get_download_option(parameters.download)
if parameters.protocol is not None:
options += self.get_protocol_option(parameters.protocol)
if parameters.logfile is not None:
options += self.get_logfile_option(parameters.logfile)
return options
def get_iperf3_server_options(
self,
parameters: _parameters.Iperf3ServerParameters) \
-> sh.ShellCommand:
options = sh.ShellCommand(['-J'])
options += self.get_server_mode_option()
if parameters.port is not None:
options += self.get_port_option(parameters.port)
if parameters.protocol is not None:
options += self.get_protocol_option(parameters.protocol)
return options
@staticmethod
@@ -64,6 +90,10 @@ class Iperf3Interface:
def get_client_mode_option(server_address: str):
return ['-c', server_address]
@staticmethod
def get_server_mode_option():
return ["-s"]
@staticmethod
def get_download_option(download: bool):
if download:
@@ -82,7 +112,7 @@ class Iperf3Interface:
@staticmethod
def get_timeout_option(timeout: int):
if timeout > 0:
if timeout >= 0:
return ['-t', timeout]
else:
return []
@@ -90,3 +120,7 @@ class Iperf3Interface:
@staticmethod
def get_port_option(port):
return ['-p', port]
@staticmethod
def get_logfile_option(logfile):
return ['--logfile', logfile]

View File

@@ -29,6 +29,12 @@ class Iperf3ClientParameters(typing.NamedTuple):
port: typing.Optional[int] = None
protocol: typing.Optional[str] = None
timeout: typing.Optional[int] = None
logfile: typing.Optional[str] = None
class Iperf3ServerParameters(typing.NamedTuple):
port: typing.Optional[int] = None
protocol: typing.Optional[str] = None
def iperf3_client_parameters(
@@ -37,7 +43,8 @@ def iperf3_client_parameters(
download: bool = None,
port: int = None,
protocol: str = None,
timeout: int = None):
timeout: int = None,
logfile: str = None):
"""Get iperf3 client parameters
mode allowed values: client or server
ip is only needed for client mode
@@ -60,4 +67,18 @@ def iperf3_client_parameters(
download=download,
port=port,
protocol=protocol,
timeout=timeout)
timeout=timeout,
logfile=logfile)
def iperf3_server_parameters(
port: int = None, protocol: str = None) -> Iperf3ServerParameters:
"""Get iperf3 server parameters
"""
config = tobiko.tobiko_config().iperf3
if port is None:
port = config.port
if protocol is None:
protocol = config.protocol
return Iperf3ServerParameters(port=port,
protocol=protocol)