whitebox-neutron-tempest-pl.../whitebox_neutron_tempest_plugin/common/utils.py
Roman Safronov bf09f02aca Add L3HA OVN tests
Moved tests from the downstream plugin with minimal changes.
Setup function changed significantly in order to support
environments where routers can be scheduled on compute nodes.
Base _create_server function was changed to support spawning
a VM on a specific host or any host excluding list of hosts.
The tests are supported currently only on environments where ovn
routers are located on compute or standalone networker nodes.
The test that shuts down a node requires this
patch [1].
Also:
Since _create_server return value was changed (simplified),
adjusted some qos tests that could be affected.
Removed redundant 'return None' statements from some base functions.

[1] https://review.opendev.org/c/x/whitebox-neutron-tempest-plugin/+/913851

Change-Id: Ic7d46a9a432089f1cd1a30af7639e9824e464c6d
2024-04-01 19:12:30 +03:00

230 lines
8.1 KiB
Python

# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import subprocess
import time
from neutron_tempest_plugin.common import shell
from neutron_tempest_plugin.common import utils as common_utils
from oslo_log import log
from tempest import config
from tempest.lib import exceptions
from whitebox_neutron_tempest_plugin.common import constants
CONF = config.CONF
LOG = log.getLogger(__name__)
def create_payload_file(ssh_client, size):
    """Create a zero-filled file on the remote host, named after its size.

    Runs ``head -c <size> /dev/zero > <size>`` through ``ssh_client``; the
    redirect target is a relative path, so the file lands in the remote
    shell's working directory and its name is the string form of ``size``.

    Parameters:
        ssh_client: object exposing ``exec_command(cmd)``.
        size: number of bytes to write; also used as the file name.
    """
    command = "head -c {0} /dev/zero > {0}".format(size)
    ssh_client.exec_command(command)
def get_temp_file(ssh_client):
    """Create a temporary file with ``mktemp`` on the remote host.

    Parameters:
        ssh_client: object exposing ``exec_command(cmd)`` returning a string.

    Returns:
        The output of ``mktemp`` with trailing whitespace stripped.
    """
    return ssh_client.exec_command('mktemp').rstrip()
def cat_remote_file(ssh_client, path):
    """Return the content of a remote file, without trailing whitespace.

    Parameters:
        ssh_client: object exposing ``exec_command(cmd)`` returning a string.
        path: path of the file to ``cat`` on the remote host.
    """
    content = ssh_client.exec_command('cat {}'.format(path))
    return content.rstrip()
def get_default_interface(ssh_client):
    """Return the fifth field of the route lookup for ``constants.GLOBAL_IP``.

    Runs ``ip route get`` for ``constants.GLOBAL_IP`` on the remote host,
    keeps the first output line and cuts its fifth space-separated field
    (presumably the outgoing device name - verify against the ``ip`` output
    format of the target image).

    Parameters:
        ssh_client: object exposing ``exec_command(cmd)`` returning a string.

    Returns:
        The selected field with trailing whitespace stripped.
    """
    command = (
        "PATH=$PATH:/usr/sbin ip route get default %s | head -1 | "
        "cut -d ' ' -f 5" % constants.GLOBAL_IP)
    field = ssh_client.exec_command(command)
    return field.rstrip()
def get_route_interface(ssh_client, dst_ip):
    """Return the device used to reach ``dst_ip`` from the remote host.

    Runs ``ip route get`` for ``dst_ip`` and returns the token that follows
    the ``dev`` keyword in the output.

    Parameters:
        ssh_client: object exposing ``exec_command(cmd)`` returning a string.
        dst_ip: destination address passed to ``ip route get``.

    Returns:
        The device name, or None when the command produced no output or no
        line carries a ``dev <name>`` pair.
    """
    output = ssh_client.exec_command(
        "PATH=$PATH:/usr/sbin ip route get default %s | head -1" % dst_ip)
    for line in (output or '').splitlines():
        fields = line.split()
        try:
            device_index = fields.index('dev') + 1
        except ValueError:
            # Blank line or error text without a "dev" token: keep looking
            # instead of failing with an unrelated ValueError.
            continue
        if device_index < len(fields):
            return fields[device_index]
    return None
def make_sure_local_port_is_open(protocol, port):
    """Ensure a local iptables INPUT rule accepts ``protocol``/``port``.

    Greps the ``iptables-save`` output for a matching ACCEPT rule and, only
    when none is found, inserts one at position 1 of the INPUT chain. The
    whole check-or-insert is executed as a single local shell command.

    Parameters:
        protocol: protocol name as understood by ``iptables -p``.
        port: destination port to accept.
    """
    rule_check = (
        "sudo iptables-save | "
        r"grep 'INPUT.*{protocol}.*\-\-dport {port} \-j ACCEPT'")
    rule_insert = (
        "sudo iptables -I INPUT 1 -p {protocol} --dport {port} -j ACCEPT")
    # "<check> && true || <insert>": the insert runs only if grep found
    # nothing.
    command = " && true || ".join((rule_check, rule_insert)).format(
        protocol=protocol, port=port)
    shell.execute_local_command(command)
# Unlike ncat server function from the upstream plugin this ncat server
# turns itself off automatically after timeout
def run_ncat_server(ssh_client, udp):
    """Start an ``nc`` listener on the remote host and return its output file.

    The listener is wrapped in ``timeout constants.NCAT_TIMEOUT`` so it stops
    by itself, listens on ``constants.NCAT_PORT`` and redirects whatever it
    receives into a freshly created temporary file. It is started through a
    separate session obtained from ``ssh_client.open_session()``.

    Parameters:
        ssh_client: client exposing ``exec_command`` and ``open_session``.
        udp: text inserted verbatim after ``nc -l`` (presumably ``-u`` for
            UDP or an empty string for TCP - confirm against callers).

    Returns:
        Path of the remote file receiving the listener's output.
    """
    capture_path = get_temp_file(ssh_client)
    listen_cmd = "sudo timeout {0} nc -l {1} -p {2} > {3}".format(
        constants.NCAT_TIMEOUT, udp, constants.NCAT_PORT, capture_path)
    LOG.debug("Starting nc server: '%s'", listen_cmd)
    session = ssh_client.open_session()
    session.exec_command(listen_cmd)
    return capture_path
# Unlike ncat client function from the upstream plugin this ncat client
# is able to run from any host, not only locally
def run_ncat_client(ssh_client, host, udp, payload_size):
    """Send a payload file to ``host`` with ``nc`` from the remote machine.

    The payload is read from a file whose name is ``payload_size``; the
    target port is ``constants.NCAT_PORT``.

    Parameters:
        ssh_client: object exposing ``exec_command(cmd)``.
        host: destination address given to ``nc``.
        udp: text inserted verbatim between host and port (presumably ``-u``
            for UDP or an empty string for TCP - confirm against callers).
        payload_size: name of the file fed to ``nc`` on stdin.
    """
    send_cmd = "nc -w 1 {0} {1} {2} < {3}".format(
        host, udp, constants.NCAT_PORT, payload_size)
    LOG.debug("Starting nc client: '%s'", send_cmd)
    ssh_client.exec_command(send_cmd)
def flush_routing_cache(ssh_client):
    """Run ``sudo ip route flush cache`` on the remote host.

    Parameters:
        ssh_client: object exposing ``exec_command(cmd)``.
    """
    command = "sudo ip route flush cache"
    ssh_client.exec_command(command)
def kill_iperf_process(ssh_client):
    """Kill iperf3 processes on the remote host, ignoring command failure.

    Parameters:
        ssh_client: object exposing ``exec_command(cmd)``.
    """
    try:
        ssh_client.exec_command("PATH=$PATH:/usr/sbin pkill iperf3")
    except exceptions.SSHExecCommandFailed:
        # Best effort: a failing pkill (presumably because no iperf3 process
        # was running) is not an error for the caller.
        return
def configure_interface_up(client, port, interface=None, path=None):
    """Activate an interface on a VM and give it the port's first fixed IP.

    When ``interface`` is not given, the interface name is looked up on the
    VM from the port's MAC address. The configuration command depends on
    ``CONF.neutron_plugin_options.default_image_is_advanced``:

    * advanced image: run ``dhclient`` on the interface unless it already
      shows the port's IP address;
    * otherwise: unless the interface operstate is already something other
      than "down", set the link up and add the IP with a /24 prefix.

    The command is retried through ``common_utils.wait_until_true`` with
    ``timeout=30`` and ``sleep=5`` until it succeeds.

    Parameters:
        client (ssh.Client): ssh client of the VM owning the interface.
        port (port): port object of the interface; ``mac_address`` and
            ``fixed_ips[0]['ip_address']`` are read from it.
        interface (str): optional interface name on the VM.
        path (str): optional shell PATH assignment; defaults to
            ``PATH=$PATH:/sbin``.
    """
    shell_path = path or "PATH=$PATH:/sbin"
    if interface:
        test_interface = interface
    else:
        # The line preceding the MAC address in "ip addr" holds the
        # interface name as its second ':'-separated field.
        lookup_cmd = (
            "{};ip addr | grep {} -B 1 | head -1 | "
            r"cut -d ':' -f 2 | sed 's/\ //g'").format(
                shell_path, port['mac_address'])
        test_interface = client.exec_command(lookup_cmd).rstrip()

    if CONF.neutron_plugin_options.default_image_is_advanced:
        template = ("ip addr show {interface} | grep {ip} || "
                    "sudo dhclient {interface}")
    else:
        template = ("cat /sys/class/net/{interface}/operstate | "
                    "grep -q -v down && true || "
                    "({path}; sudo ip link set {interface} up && "
                    "sudo ip addr add {ip}/24 dev {interface})")
    # str.format ignores the unused "path" key for the dhclient template.
    cmd = template.format(
        path=shell_path,
        ip=port['fixed_ips'][0]['ip_address'],
        interface=test_interface)

    def _configured():
        return execute_command_safely(client, cmd)

    common_utils.wait_until_true(_configured, timeout=30, sleep=5)
def parse_dhcp_options_from_nmcli(
        ssh_client, ip_version,
        timeout=20.0, interval=5.0, expected_empty=False, vlan=None):
    """Read the DHCP options of the VM's active connection through nmcli.

    The nmcli command is retried every ``interval`` seconds until it
    succeeds with a non-empty result (or with any result when
    ``expected_empty`` is set), or until ``timeout`` seconds have elapsed.

    Parameters:
        ssh_client: client exposing ``test_connection_auth`` and
            ``exec_command``.
        ip_version: value appended to ``DHCP`` to select the nmcli field
            (``DHCP4`` / ``DHCP6``).
        timeout (float): maximum time in seconds to keep retrying nmcli.
        interval (float): pause in seconds between attempts.
        expected_empty (bool): accept an empty nmcli result without
            retrying.
        vlan: optional text used to ``grep`` the active connection name.

    Returns:
        dict mapping option names to values, with names translated through
        ``constants.DHCP_OPTIONS_NMCLI_TO_NEUTRON`` when listed there, or
        None when nmcli returned nothing.

    Raises:
        exceptions.TimeoutException: nmcli kept failing, or kept returning
            an empty result that was not expected, for longer than
            ``timeout``.
    """
    # First of all, test that the ssh connection is available - the time it
    # takes to establish it is not considered part of the nmcli timeout.
    ssh_client.test_connection_auth()
    # grep -v excludes the loopback connection: the RHEL 9.2 image has
    # NetworkManager managing the loopback interface, earlier images did not.
    cmd_find_connection = 'nmcli -g NAME con show --active | grep -v "^lo"'
    if vlan is not None:
        cmd_find_connection += ' | grep {}'.format(vlan)
    cmd_show_dhcp = ('sudo nmcli -f DHCP{} con show '
                     '"$({})"').format(ip_version, cmd_find_connection)

    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    start_time = time.monotonic()
    while True:
        try:
            output = ssh_client.exec_command(cmd_show_dhcp)
        except exceptions.SSHExecCommandFailed:
            LOG.warning('Failed to run nmcli on VM - retrying...')
        else:
            if output or expected_empty:
                break
            LOG.warning('nmcli result on VM is empty - retrying...')
        if time.monotonic() - start_time > timeout:
            message = ('Failed to run nmcli on VM after {} '
                       'seconds'.format(timeout))
            raise exceptions.TimeoutException(message)
        time.sleep(interval)

    if not output:
        LOG.warning('Failed to obtain DHCP opts')
        return None

    # Strips the "DHCP<version>.OPTION[<n>]:" prefix nmcli puts on each line.
    prefix_re = re.compile(
        r'^DHCP{}\.OPTION\[[0-9]+\]:\s+'.format(ip_version))
    obtained_dhcp_opts = {}
    for line in output.splitlines():
        entry = prefix_re.sub('', line.strip())
        # Split on the first '=' only, so values that contain '=' are kept
        # whole; lines without '=' are not options and are skipped.
        option, separator, value = entry.partition('=')
        if not separator:
            continue
        option = option.strip()
        if option in constants.DHCP_OPTIONS_NMCLI_TO_NEUTRON:
            option = constants.DHCP_OPTIONS_NMCLI_TO_NEUTRON[option]
        obtained_dhcp_opts[option] = value.strip()
    return obtained_dhcp_opts
def execute_command_safely(ssh_client, command):
    """Run a remote command and report success as a boolean.

    Parameters:
        ssh_client: object exposing ``exec_command(cmd)``.
        command: shell command to run.

    Returns:
        True when the command ran without ``SSHExecCommandFailed``; False
        otherwise (the failure is logged, not raised).
    """
    succeeded = True
    try:
        result = ssh_client.exec_command(command)
    except exceptions.SSHExecCommandFailed as failure:
        LOG.warning('command failed: %s', command)
        LOG.exception(failure)
        succeeded = False
    else:
        LOG.debug('command executed successfully: %s\n'
                  'command output:\n%s',
                  command, result)
    return succeeded
def host_responds_to_ping(ip, count=3):
    """Ping ``ip`` from the local machine and report whether ping succeeded.

    Parameters:
        ip: address handed to ``ping``.
        count: number of echo requests (``ping -c``).

    Returns:
        True when the ``ping`` command exits with status 0, False otherwise.
    """
    ping_cmd = "ping -c{} {}".format(count, ip)
    # stdout is captured (and discarded) so ping's report is not printed.
    completed = subprocess.run(['bash', '-c', ping_cmd],
                               stdout=subprocess.PIPE)
    return completed.returncode == 0
def run_local_cmd(cmd, timeout=10):
    """Run a shell command locally under the ``timeout`` utility.

    Parameters:
        cmd (str): shell command line to execute.
        timeout: duration passed to ``timeout``; 10 by default.

    Returns:
        Tuple ``(output, errors)`` with the captured stdout and stderr of
        the process (bytes, as the pipes are opened in binary mode).
    """
    command = " ".join(("timeout", str(timeout), cmd))
    LOG.debug("Running local command '{}'".format(command))
    process = subprocess.Popen(
        command, shell=True,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, errors = process.communicate()
    return output, errors
def interface_state_set(client, interface, state):
    """Set the link state of an interface on a remote host.

    Parameters:
        client: ssh client exposing ``exec_command`` and a ``host``
            attribute (used for logging only).
        interface: interface name given to ``ip link set``.
        state: state keyword appended to the command (presumably ``up`` or
            ``down`` - confirm against callers).
    """
    LOG.debug('Setting interface {} {} on {}'.format(
        interface, state, client.host))
    command = "{path}; sudo ip link set {interface} {state}".format(
        path='PATH=$PATH:/sbin', interface=interface, state=state)
    client.exec_command(command)
def remote_service_action(client, service, action):
    """Run ``sudo systemctl <action> <service>`` on a remote host.

    Parameters:
        client: ssh client exposing ``exec_command`` and a ``host``
            attribute (used for logging only).
        service: systemd unit name.
        action: systemctl verb to apply to the unit.
    """
    systemctl_cmd = "sudo systemctl {action} {service}".format(
        service=service, action=action)
    LOG.debug("Running '{}' on {}".format(systemctl_cmd, client.host))
    client.exec_command(systemctl_cmd)