The method ``get_ml2_conf_file`` now uses the configuration option "ml2_plugin_config" instead of the hardcoded default value "/etc/neutron/plugins/ml2/ml2_conf.ini" Change-Id: Ib749d3b35ebaf030375160ebdc63b2ec4f73b959
317 lines
11 KiB
317 lines
11 KiB
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import re
import subprocess
import time
from neutron_tempest_plugin.common import shell
from neutron_tempest_plugin.common import utils as common_utils
from oslo_log import log
from tempest import config
from tempest.lib import exceptions
from whitebox_neutron_tempest_plugin.common import constants
CONF = config.CONF
LOG = log.getLogger(__name__)
WB_CONF = CONF.whitebox_neutron_plugin_options
def create_payload_file(ssh_client, size):
"head -c {0} /dev/zero > {0}".format(size))
def get_temp_file(ssh_client):
output_file = ssh_client.exec_command(
return output_file
def cat_remote_file(ssh_client, path):
return ssh_client.exec_command(
'cat {}'.format(path)).rstrip()
def get_default_interface(ssh_client):
return ssh_client.exec_command(
"PATH=$PATH:/usr/sbin ip route get default %s | head -1 | "
"cut -d ' ' -f 5" % constants.GLOBAL_IP).rstrip()
def get_route_interface(ssh_client, dst_ip):
output = ssh_client.exec_command(
"PATH=$PATH:/usr/sbin ip route get default %s | head -1" % dst_ip)
if output:
for line in output.splitlines():
fields = line.strip().split()
device_index = fields.index('dev') + 1
return fields[device_index]
def make_sure_local_port_is_open(protocol, port):
"sudo iptables-save | "
r"grep 'INPUT.*{protocol}.*\-\-dport {port} \-j ACCEPT' "
"&& true || "
"sudo iptables -I INPUT 1 -p {protocol} --dport {port} -j ACCEPT"
"".format(protocol=protocol, port=port))
# Unlike ncat server function from the upstream plugin this ncat server
# turns itself off automatically after timeout
def run_ncat_server(ssh_client, udp):
output_file = get_temp_file(ssh_client)
cmd = "sudo timeout {0} nc -l {1} -p {2} > {3}".format(
constants.NCAT_TIMEOUT, udp, constants.NCAT_PORT, output_file)
LOG.debug("Starting nc server: '%s'", cmd)
return output_file
# Unlike ncat client function from the upstream plugin this ncat client
# is able to run from any host, not only locally
def run_ncat_client(ssh_client, host, udp, payload_size):
cmd = "nc -w 1 {0} {1} {2} < {3}".format(
host, udp, constants.NCAT_PORT, payload_size)
LOG.debug("Starting nc client: '%s'", cmd)
def flush_routing_cache(ssh_client):
ssh_client.exec_command("sudo ip route flush cache")
def kill_iperf_process(ssh_client):
cmd = "PATH=$PATH:/usr/sbin pkill iperf3"
except exceptions.SSHExecCommandFailed:
def configure_interface_up(client, port, interface=None, path=None):
"""configures down interface with ip and activates it
client (ssh.Client):ssh client which has interface to configure.
port (port):port object of interface.
interface (str):optional interface name on vm.
path (str):optional shell PATH variable.
shell_path = path or "PATH=$PATH:/sbin"
test_interface = interface or client.exec_command(
"{};ip addr | grep {} -B 1 | head -1 | "
r"cut -d ':' -f 2 | sed 's/\ //g'".format(
shell_path, port['mac_address'])).rstrip()
if CONF.neutron_plugin_options.default_image_is_advanced:
cmd = ("ip addr show {interface} | grep {ip} || "
"sudo dhclient {interface}").format(
cmd = ("cat /sys/class/net/{interface}/operstate | "
"grep -q -v down && true || "
"({path}; sudo ip link set {interface} up && "
"sudo ip addr add {ip}/24 dev {interface})").format(
lambda: execute_command_safely(client, cmd), timeout=30, sleep=5)
def parse_dhcp_options_from_nmcli(
ssh_client, ip_version,
timeout=20.0, interval=5.0, expected_empty=False, vlan=None):
# first of all, test ssh connection is available - the time it takes until
# ssh connection can be established is not cosidered for the nmcli timeout
# Add grep -v to exclude loopback interface because
# Managing the lookback interface using NetworkManager is included in
# RHEL9.2 image. Previous version is not included.
cmd_find_connection = 'nmcli -g NAME con show --active | grep -v "^lo"'
if vlan is not None:
cmd_find_connection += ' | grep {}'.format(vlan)
cmd_show_dhcp = ('sudo nmcli -f DHCP{} con show '
'"$({})"').format(ip_version, cmd_find_connection)
start_time = time.time()
while True:
output = ssh_client.exec_command(cmd_show_dhcp)
except exceptions.SSHExecCommandFailed:
LOG.warning('Failed to run nmcli on VM - retrying...')
if not output and not expected_empty:
LOG.warning('nmcli result on VM is empty - retrying...')
if time.time() - start_time > timeout:
message = ('Failed to run nmcli on VM after {} '
raise exceptions.TimeoutException(message)
if not output:
LOG.warning('Failed to obtain DHCP opts')
return None
obtained_dhcp_opts = {}
for line in output.splitlines():
newline = re.sub(r'^DHCP{}.OPTION\[[0-9]+\]:\s+'.format(ip_version),
'', line.strip())
option = newline.split('=')[0].strip()
value = newline.split('=')[1].strip()
if option in constants.DHCP_OPTIONS_NMCLI_TO_NEUTRON:
option = constants.DHCP_OPTIONS_NMCLI_TO_NEUTRON[option]
obtained_dhcp_opts[option] = value
return obtained_dhcp_opts
def execute_command_safely(ssh_client, command):
output = ssh_client.exec_command(command)
except exceptions.SSHExecCommandFailed as err:
LOG.warning('command failed: %s', command)
return False
LOG.debug('command executed successfully: %s\n'
'command output:\n%s',
command, output)
return True
def host_responds_to_ping(ip, count=3):
cmd = "ping -c{} {}".format(count, ip)
subprocess.check_output(['bash', '-c', cmd])
except subprocess.CalledProcessError:
return False
return True
def run_local_cmd(cmd, timeout=10):
command = "timeout " + str(timeout) + " " + cmd
LOG.debug("Running local command '{}'".format(command))
output, errors = subprocess.Popen(
command, shell=True, stdout=subprocess.PIPE,
return output, errors
def interface_state_set(client, interface, state):
shell_path = 'PATH=$PATH:/sbin'
LOG.debug('Setting interface {} {} on {}'.format(
interface, state, client.host))
"{path}; sudo ip link set {interface} {state}".format(
path=shell_path, interface=interface, state=state))
def remote_service_action(client, service, action, target_state):
cmd = "sudo systemctl {action} {service}".format(
action=action, service=service)
LOG.debug("Running '{}' on {}".format(cmd, client.host))
lambda: remote_service_check_state(client, service, target_state),
timeout=30, sleep=5,
exception=RuntimeError("Service failed to reach the required "
"state '{}'".format(target_state)))
def remote_service_check_state(client, service, state):
cmd = ("sudo systemctl is-active {service} "
"| grep -w {state} || true".format(service=service, state=state))
output = client.exec_command(cmd).strip()
return (state in output)
# NOTE(mblue): Please use specific regex to avoid dismissing various issues
def retry_on_assert_fail(max_retries,
"""Decorator that retries a function up to max_retries times on asser fail
In order to avoid dismissing exceptions which lead to bugs,
obligatory regex checked in caught exception message,
also optional specific exception type can be passed.
:param max_retries: Obligatory maximum number of retries before failing.
:param assert_regex: Obligatory regex should be in exception message.
:param exception_type: Optional specific exception related to failure.
def decor(f):
def inner(*args, **kwargs):
retries = 0
while retries < max_retries:
return f(*args, **kwargs)
except exception_type as e:
if not (re.search(assert_regex, str(e)) or
re.search(assert_regex, repr(e))):
f"Assertion failed: {e}. Retrying ({retries + 1}/"
retries += 1
raise AssertionError(f"Assert failed after {max_retries} retries.")
return inner
return decor
def wait_for_neutron_api(neutron_client, timeout=100):
"""Waits until the Neutron API replies
:param neutron_client: a Neutron client; it could have or not admin
:param timeout: maximum time (in seconds) to wait for the Neutron API.
def _list_agents():
return True
except exceptions.RestClientException:
return False
common_utils.wait_until_true(_list_agents, timeout=timeout, sleep=1)
def get_neutron_api_service_name():
"""Return the Neutron API service name based on the test configuration"""
if WB_CONF.openstack_type == 'devstack':
# NOTE: in OSP18+, the Neutron API will use WSGI by default (not the
# eventlet server) and the name will be "neutron api"
return 'q svc'
return 'neutron api'
def get_ml2_conf_file():
"""Neutron ML2 config file name depending on the installation type
The default value of WB_CONF.ml2_plugin_config is
if WB_CONF.openstack_type in ('podified', 'devstack'):
return WB_CONF.ml2_plugin_config
return ('/var/lib/config-data/puppet-generated/neutron' +