Migrate routed provider network tests

Moved downstream tests with minimal adjustments, changed uuids.
Added necessary config values.

Change-Id: I763726b2a676b6f0f8908adc775d45d2e1daace1
This commit is contained in:
Roman Safronov
2024-03-22 01:30:13 +02:00
parent 383df48782
commit 03bedfea5a
2 changed files with 692 additions and 3 deletions

View File

@@ -65,6 +65,10 @@ WhiteboxNeutronPluginOptions = [
help='Specify explicitly whether to run traffic flow tests.'
' This is needed because some ML2 plugins (e.g. ovn ) do '
'not expose api_extensions like dvr for some purposes.'),
cfg.BoolOpt('provroutednetwork_support',
default=False,
help='Boolean that specifies if Provider Routed Networks'
'are supported or not'),
cfg.IntOpt('broadcast_receivers_count',
default=2,
help='How many receivers to use in broadcast tests. Default '
@@ -91,13 +95,16 @@ WhiteboxNeutronPluginOptions = [
cfg.StrOpt('neutron_config',
default='/etc/neutron/neutron.conf',
help='Path to neutron configuration file.'),
cfg.StrOpt('ml2_plugin_config',
default='/etc/neutron/plugins/ml2/ml2_conf.ini',
help='Path to ml2 plugin config.'),
cfg.StrOpt('ext_bridge',
default='br-ex',
help="OpenvSwitch bridge dedicated for external network."),
cfg.IntOpt('ovn_max_controller_gw_ports_per_router',
default=1,
help='The number of network nodes used '
'for the OVN router HA.'),
cfg.StrOpt('ml2_plugin_config',
default='/etc/neutron/plugins/ml2/ml2_conf.ini',
help='Path to ml2 plugin config.'),
cfg.IntOpt('minbw_placement_nic_kbps_egress',
default=None,
help='BW configured per NIC for the minimum BW placement '

View File

@@ -0,0 +1,682 @@
# Copyright 2024 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils
from neutron_tempest_plugin import config
from paramiko import ssh_exception as ssh_exc
from tempest.common import waiters
from tempest.lib.common import fixed_network
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from whitebox_neutron_tempest_plugin.tests.scenario import base
CONF = config.CONF
WB_CONF = config.CONF.whitebox_neutron_plugin_options
class ProviderRoutedNetworkBaseTest(base.ProviderBaseTest):
"""Base class for tests using provisioning networks"""
required_extensions = ['segment']
@classmethod
def setup_clients(cls):
super(ProviderRoutedNetworkBaseTest, cls).setup_clients()
cls.client = cls.os_adm.network_client
cls.keypairs_client = cls.os_adm.keypairs_client
cls.servers_client = cls.os_adm.servers_client
cls.resources = cls.map_resources()
@classmethod
def skip_checks(cls):
super(ProviderRoutedNetworkBaseTest, cls).skip_checks()
if not WB_CONF.provroutednetwork_support:
raise cls.skipException("Skipped because Provider Routed Networks "
"are not supported")
@classmethod
def get_networks_with_segments(cls):
"""Collect list of networks that have segments"""
client = cls.os_adm.network_client
nets = client.list_networks()['networks']
nets_with_segments = []
for net in nets:
if net.get('segments'):
nets_with_segments.append(net['id'])
return nets_with_segments
@classmethod
def get_host_physical_network(cls, host):
"""Searches for the name of the physical network attached to the host
It is not possible to request the name of the physical network that
is attached to the specific hypervisor. In order to get it we can
create port with specified network and hypervisor so there will
be information about the subnet that the port has fixed IP address.
That subnet belongs to the segment that we can get the physical
network name from.
"""
client = cls.os_adm.network_client
network = fixed_network.get_network_from_name(
CONF.network.floating_network_name, client)
port = client.create_port(network_id=network['id'],
**{'binding:host_id': host})['port']
subnet_id = port['fixed_ips'][0]['subnet_id']
client.delete_port(port['id'])
subnet = client.show_subnet(subnet_id)['subnet']
segment_id = subnet.get('segment_id')
if not segment_id:
return ''
segment = client.show_segment(segment_id)
return segment['segment']['physical_network']
@classmethod
def map_resources(cls):
"""Collect the information about resources in the availability zone
Returnes the dictionary of the following schema:
{<az_name>: {'hosts': <list_of_hypervisors_in_az>,
'physical_network': <name_of_the_physical_network>,
'networks':
{<net_id>:
{<segment_id>: [<subnet_ids>]}
}
},
<az2_name>: {...},
<az3_name>: {...}}
"""
resources = {}
client = cls.os_adm.network_client
az_list = cls.os_admin.az_client.list_availability_zones(detail=True)
ext_nets = cls.get_networks_with_segments()
for az in az_list['availabilityZoneInfo']:
az_computes = []
physical_net = ''
for host in az.get('hosts'):
if az['hosts'][host].get('nova-compute'):
az_computes.append(host)
if not physical_net:
physical_net = cls.get_host_physical_network(host)
if not az_computes:
continue
networks = {}
for net in ext_nets:
networks[net] = {}
segments = client.list_segments(**{'network_id': net})
for segment in segments['segments']:
if segment['physical_network'] == physical_net:
networks[net][segment['id']] = []
subnets = client.list_subnets(
**{'network_id': net})
for subnet in subnets['subnets']:
if subnet.get('segment_id') == segment['id']:
networks[net][segment['id']].append(
subnet['id'])
resources[az['zoneName']] = {'hosts': az_computes,
'physical_network': physical_net,
'networks': networks}
return resources
def create_server_with_params(self, ports=(), nets=(), az='', client=None):
"""Create server with specified ports, networks, availability zone
Ports and nets are the lists of the corresponding UUIDs. Ports have to
be created in advance.
"""
srv_params = {'networks': []}
if not ports and not nets:
network = fixed_network.get_network_from_name(
CONF.network.floating_network_name, self.client)
nets = (network['id'], )
if nets:
srv_params['security_groups'] = [{'name': self.secgroup['name']}]
for network in nets:
srv_params['networks'].append({'uuid': network})
if ports:
for port in ports:
srv_params['networks'].append({'port': port})
if az:
srv_params['availability_zone'] = az
srv_params['name'] = data_utils.rand_name(self._testMethodName)
client = client or self.servers_client
server = client.create_server(flavorRef=CONF.compute.flavor_ref,
imageRef=CONF.compute.image_ref,
key_name=self.keypair['name'],
**srv_params)['server']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
waiters.wait_for_server_termination,
client,
server['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_server,
server['id'])
return server
def wait_for_server_ready(self, server, client=None):
"""Wait for server is ACTIVE and booted"""
client = client or self.os_adm.servers_client
self.wait_for_server_active(server, client)
def system_booted():
console = client.get_console_output(server['id'])['output']
for line in console.split('\n'):
if 'login:' in line.lower():
return True
return False
try:
utils.wait_until_true(system_booted, timeout=300, sleep=5)
except utils.WaitTimeout:
pass
return client.show_server(server['id'])['server']
def get_provider_network_ssh_client(self, server, key=None, timeout=300):
"""Return SSH client for the server connected to the provider network
As there might be more than one interface for the server, function
will iterate over all the available IPv4 interfaces
"""
if not key:
key = self.keypair
for addresses in server['addresses'].values():
for address in addresses:
ssh_client = ssh.Client(address['addr'],
self.username,
pkey=key['private_key'],
timeout=timeout)
try:
ssh_client.test_connection_auth()
return ssh_client
except (lib_exc.SSHTimeout,
ssh_exc.AuthenticationException,
ssh_exc.NoValidConnectionsError):
continue
else:
raise lib_exc.TimeoutException(
'Can not establish SSH '
'connection with one of the available addresses: '
'{}'.format(server['addresses']))
def verify_ips_on_interfaces(self, server):
"""Make sure that all IP addresses are configured on interfaces
For cirros images the second interface should be enabled manually
after the server is created. All other OSes (RHEL, Centos, Ubuntu)
are configured automatically still there is `nmcli` command specified
to create link with DHCP on the supported images
"""
ssh_client = self.get_provider_network_ssh_client(server)
ifs = ssh_client.exec_command("PATH=$PATH:/sbin:/usr/bin; ip -f inet"
" -o addr | awk '{{print $2}}'")
links_with_ips = ifs.strip().split('\n')
links = ssh_client.exec_command("PATH=$PATH:/sbin:/usr/bin; ip -f "
"inet -o link | awk '{{print $2}}'")
all_links = [link.strip(':') for link in links.strip().split('\n')]
links_with_no_ip = []
for link in all_links:
if link not in links_with_ips:
links_with_no_ip.append(link)
if not links_with_no_ip:
return
for link in links_with_no_ip:
cmd = 'sudo chmod 777 {net_file} && '\
'sudo echo "auto {link}" >> {net_file} && '\
'sudo echo "iface {link} inet dhcp" >> {net_file} && '\
'sudo chmod 664 {net_file} || '\
'sudo nmcli con add type ethernet con-name et ifname {link}'\
.format(net_file='/etc/network/interfaces', link=link)
ssh_client.exec_command(cmd)
self.os_adm.servers_client.reboot_server(server['id'],
**{"type": "SOFT"})
def create_network_with_segment(self):
"""Creates dummy provider network with segment
This function is helpful when you need to check the configuration
or the database entries as it will not take any parameters for
the created network and so all the connectivity tests will fail
"""
segments = self.client.list_segments()
for segment in segments['segments']:
if segment['physical_network'] and \
segment['network_type'] == 'vlan':
physical_network = segment['physical_network']
break
net = self.create_network(external=True,
shared=True,
provider_network_type='vlan',
provider_physical_network=physical_network,
provider_segmentation_id='666')
net_id = net['id']
segment = self.client.list_segments(**{'network_id': net_id})
segment_id = segment['segments'][0]['id']
return [net_id, segment_id]
def create_segment_for_network(self, network_id):
"""Creates dummy segment for the provided network
This function is helpful when you need to check the configuration
or the database entries as it will not take any valuable parameters for
the segment to be created
"""
segments = self.client.list_segments()
for segment in segments['segments']:
if segment['physical_network'] and \
segment['network_type'] == 'vlan':
physical_network = segment['physical_network']
break
segment = self.client.create_segment(
**{'network_id': network_id,
'physical_network': physical_network,
'network_type': 'vlan',
'segmentation_id': '654'})['segment']
return segment['id']
class ProviderRoutedNetworkOVNConfigTest(ProviderRoutedNetworkBaseTest,
base.BaseTempestTestCaseOvn):
"""Class contains tests for verification routed provider network OVN config
"""
def get_host_ovn_bridge_mapping(self, host):
"""Get the bridge mapping from the host
Mapping has the following format: '"leaf0:br-ex"'. Need to be careful
with quotes. We return the dictionalry with bridge name as a key and
physical network name as a value
"""
if not hasattr(self, 'nodes'):
self.discover_nodes()
cmd = 'sudo ovs-vsctl get open . external_ids:ovn-bridge-mappings'
for node in self.nodes:
if host.startswith(node['name']):
output = node['client'].exec_command(cmd).strip()
if output:
mapping = {}
bridges = output.strip('"').split(',')
for bridge in bridges:
bridge_name = bridge.split(':')[1]
physical_network_name = bridge.split(':')[0]
mapping[bridge_name] = physical_network_name
return mapping
return {}
@decorators.idempotent_id('2c2ee2b7-ab4f-4b2b-aeb8-66ddad841129')
def test_bridge_mappings(self):
"""Verifies that correct physical networks attached to computes
Each compute node should have the physical network attached according
to its availability zone.
"""
ext_bridge = WB_CONF.ext_bridge
for _, details in self.resources.items():
for host in details['hosts']:
mapping = self.get_host_ovn_bridge_mapping(host)
physical_network = mapping.get(ext_bridge, '')
self.assertEqual(details['physical_network'], physical_network)
@decorators.idempotent_id('93df90c3-de2e-4e2d-9ec3-82dc7fe4a8f3')
def test_shared_network_ovn_db_configuration(self):
"""Test that all external networks and ports are created in OVN DB
There supposed to be separate logical switch for each external
network. Each logical switch should contain ports for every segment
that is available for the external network.
"""
port_to_net = []
port_to_physical_net = []
for _, details in self.resources.items():
for net_id, net_details in details['networks'].items():
for segment_id in net_details:
port_name = 'provnet-{sid}'.format(sid=segment_id)
phys_net = details['physical_network']
net_name = 'neutron-{nid}'.format(nid=net_id)
port_to_net.append([port_name, net_name])
port_to_physical_net.append([port_name, phys_net])
for port, net in port_to_net:
self.assertEqual(net, self.get_logical_switch(port))
for port, net in port_to_physical_net:
self.assertEqual(net, self.get_physical_net(port))
@decorators.idempotent_id('0de60d28-5c63-4825-b2a9-f5c070a7e7a3')
def test_segment_removed_from_ovn_db(self):
"""Test that segment is removed from the OVN DB correctly
When we delete the network segment we should not see it in the OVN
database anymore. We should make sure that the relevant records are
created before we will verify the deletion.
"""
network_id, segment_id = self.create_network_with_segment()
new_segment_id = self.create_segment_for_network(network_id)
for sid in segment_id, new_segment_id:
port = 'provnet-{sid}'.format(sid=sid)
net = 'neutron-{nid}'.format(nid=network_id)
self.assertEqual(net, self.get_logical_switch(port))
self.client.delete_segment(new_segment_id)
self.verify_that_segment_deleted(new_segment_id)
class RoutedProviderNetworkConnectivityTests(ProviderRoutedNetworkBaseTest):
"""Class to test connectivity through the routed provider networks"""
def _get_pingable_ips(self, servers):
"""Get all the IP addresses assigned to servers from the list
All IP addresses are divided by protocol (IPv4 and IPv6)
"""
ips_to_ping = {4: [], 6: []}
for server in servers:
srv_details = self.os_adm.servers_client.show_server(server['id'])
for addresses in srv_details['server']['addresses'].values():
for address in addresses:
ips_to_ping[address['version']].append(address['addr'])
return ips_to_ping
def _get_servers_ssh_clients(self, servers, key=None):
"""Get list of ssh clients together with supported IP versions
"""
client = self.os_adm.servers_client
ssh_clients = []
for srv in servers:
srv_info = client.show_server(srv['id'])['server']
supported_ip_versions = set()
for addresses in srv_info['addresses'].values():
for address in addresses:
supported_ip_versions.add(address['version'])
ssh_client = self.get_provider_network_ssh_client(srv_info, key)
self.assertIsNotNone(ssh_client)
ssh_clients.append([ssh_client, supported_ip_versions])
return ssh_clients
def _ping_supported_ips(self, ssh_client, ips_to_ping, ip_versions):
"""Try to ping all the IP addresses of supported version using ssh
"""
for ip_ver in ip_versions:
for ip_addr in ips_to_ping.get(ip_ver, []):
self.check_remote_connectivity(ssh_client, ip_addr)
@decorators.idempotent_id('2250efcc-6ff9-493a-9884-a7164e88b0fe')
def test_cross_segment_connectivity(self):
"""Test connectivity between different segments
Connectivity is verified between different segments and different
networks too if more than one provider network is created
"""
servers = []
for az, details in self.resources.items():
for net_id in details.get('networks'):
servers.append(self.create_server_with_params(
nets=(net_id, ),
az=az))
for srv in servers:
self.wait_for_server_ready(srv)
ssh_clients = self._get_servers_ssh_clients(servers)
ips_to_ping = self._get_pingable_ips(servers)
for ssh_client, ip_versions in ssh_clients:
self._ping_supported_ips(ssh_client, ips_to_ping, ip_versions)
@decorators.idempotent_id('f6846357-ab69-40d0-86d6-1f2574adb070')
def test_cross_segment_connectivity_with_preconfigured_ports(self):
"""Test connectivity between different segments with preconf ports
Connectivity is verified between different segments and different
networks too if more than one provider network is created. Servers are
spawened with preconfigured ports
"""
servers = []
for az, details in self.resources.items():
for net_id, net_details in details.get('networks', {}).items():
for subnets in net_details.values():
fixed_subnets = []
for subnet_id in subnets:
fixed_subnets.append({'subnet_id': subnet_id})
port_id = self.create_port(
network={'id': net_id},
security_groups=[self.secgroup['id']],
fixed_ips=fixed_subnets)['id']
servers.append(self.create_server_with_params(
ports=(port_id, ),
az=az))
for srv in servers:
self.wait_for_server_ready(srv)
ssh_clients = self._get_servers_ssh_clients(servers)
ips_to_ping = self._get_pingable_ips(servers)
for ssh_client, ip_versions in ssh_clients:
self._ping_supported_ips(ssh_client, ips_to_ping, ip_versions)
@decorators.idempotent_id('4e49bf45-b928-4689-afc3-a75736f83891')
def test_create_server_by_non_admin_user(self):
"""Create server in provider network by non admin user"""
client = self.os_primary.servers_client
key = self.create_keypair(client=self.os_primary.keypairs_client)
sg_name = data_utils.rand_name('secgroup')
sg = self.os_primary.network_client.create_security_group(
name=sg_name)['security_group']
self.security_groups.append(sg)
self.create_loginable_secgroup_rule(secgroup_id=sg['id'],
client=self.client)
self.create_pingable_secgroup_rule(secgroup_id=sg['id'],
client=self.client)
servers = []
for az, az_details in self.resources.items():
for network in az_details['networks']:
network_id = network
break
params = {'name': data_utils.rand_name(self._testMethodName),
'networks': [{'uuid': network_id}],
'flavorRef': CONF.compute.flavor_ref,
'imageRef': CONF.compute.image_ref,
'key_name': key['name'],
'security_groups': [{'name': sg_name}],
'availability_zone': az}
server = client.create_server(**params)['server']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
waiters.wait_for_server_termination,
self.servers_client,
server['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.servers_client.delete_server,
server['id'])
servers.append(server)
for srv in servers:
server = self.wait_for_server_ready(srv)
ssh_clients = self._get_servers_ssh_clients(servers, key)
ips_to_ping = self._get_pingable_ips(servers)
for ssh_client, ip_versions in ssh_clients:
self._ping_supported_ips(ssh_client, ips_to_ping, ip_versions)
@decorators.idempotent_id('329342b5-9ec6-433a-96bc-82619a549343')
def test_instance_with_two_provider_networks_connectivity(self):
"""Test connectivity between servers with two interfaces
Servers are created with two interfaces in two different networks
one server per availability zone.
"""
nets = self.get_networks_with_segments()
if len(nets) < 2:
raise self.skipException('Only one provider network available')
servers = []
for az in self.resources:
servers.append(self.create_server_with_params(
nets=(nets[0], nets[1]), az=az))
for srv in servers:
server = self.wait_for_server_ready(srv)
self.verify_ips_on_interfaces(server)
for srv in servers:
server = self.wait_for_server_ready(srv)
ssh_clients = self._get_servers_ssh_clients(servers)
ips_to_ping = self._get_pingable_ips(servers)
for ssh_client, ip_versions in ssh_clients:
self._ping_supported_ips(ssh_client, ips_to_ping, ip_versions)
class RoutedProviderNetworkMigrationTests(
ProviderRoutedNetworkBaseTest, base.BaseTempestWhiteboxTestCase):
"""Class to test migration in routed provider networks
Migration should work in only availability zones with more than one
compute node available. If there is no availability zone with a single
compute node the negative tests will be skipped
"""
@classmethod
def resource_setup(cls):
super(RoutedProviderNetworkMigrationTests, cls).resource_setup()
cls.overcloudrc_path = cls.get_stack_rc_file_path()
def _migrate_servers(self, servers, live_migration=False):
"""Perform and verify migration for several VMs"""
if live_migration:
block_migration = (CONF.compute_feature_enabled.
block_migration_for_live_migration)
for srv in servers:
self.servers_client.live_migrate_server(
srv['id'],
block_migration=block_migration,
host=None,
disk_over_commit=False)
else:
for srv in servers:
self.servers_client.migrate_server(srv['id'])
for srv in servers:
waiters.wait_for_server_status(self.servers_client,
srv['id'],
'VERIFY_RESIZE')
self.servers_client.confirm_resize_server(srv['id'])
for srv in servers:
server = self.wait_for_server_ready(srv)
old_host = srv['OS-EXT-SRV-ATTR:host']
new_host = server['OS-EXT-SRV-ATTR:host']
self.assertNotEqual(old_host, new_host, 'VM did not migrate')
self.assertIsNotNone(
self.get_provider_network_ssh_client(server, timeout=60))
@testtools.skipUnless(CONF.compute_feature_enabled.live_migration,
'Live migration is not available.')
@testtools.skipUnless(
CONF.compute_feature_enabled.block_migration_for_live_migration,
'Block migration for live migration is not available.')
@decorators.idempotent_id('337c57f9-5dd5-4cc2-b68e-a14701081735')
def test_live_migration_in_multiple_compute_zone(self):
"""Test live migration in availability zone with more than one compute
"""
skip_migration = True
servers = []
servers_details = []
for az, details in self.resources.items():
if len(details.get('hosts', [])) > 1:
skip_migration = False
servers.append(self.create_server_with_params(az=az))
if skip_migration:
raise self.skipException(
'No availability zone with more than one compute')
for srv in servers:
servers_details.append(self.wait_for_server_ready(srv))
self._migrate_servers(servers_details, live_migration=True)
@testtools.skipUnless(CONF.compute_feature_enabled.live_migration,
'Live migration is not available.')
@testtools.skipUnless(
CONF.compute_feature_enabled.block_migration_for_live_migration,
'Block migration for live migration is not available.')
@decorators.idempotent_id('d9f8c7ea-8469-4be7-86b3-e89113c5b8e0')
def test_live_migration_in_single_compute_zone(self):
"""Test live migration in availability zone with one compute"""
skip_migration = True
servers = []
for az, details in self.resources.items():
if len(details.get('hosts', [])) == 1:
skip_migration = False
servers.append(self.create_server_with_params(az=az))
if skip_migration:
raise self.skipException(
'No availability zone with only one compute')
for srv in servers:
self.wait_for_server_ready(srv)
self.assertRaises(lib_exc.BadRequest,
self._migrate_servers,
[srv, ],
live_migration=True)
@testtools.skipUnless(CONF.compute_feature_enabled.cold_migration,
'Cold migration is not available.')
@decorators.idempotent_id('e9ca4aee-3b90-467a-b6c8-d9f97afbf6ae')
def test_cold_migration_in_multiple_compute_zone(self):
"""Test cold migration in availability zone with more than one compute
"""
skip_migration = True
servers = []
servers_details = []
for az, details in self.resources.items():
if len(details.get('hosts', [])) > 1:
skip_migration = False
servers.append(self.create_server_with_params(az=az))
if skip_migration:
raise self.skipException(
'No availability zone with more than one compute')
for srv in servers:
servers_details.append(self.wait_for_server_ready(srv))
self._migrate_servers(servers_details)
@testtools.skipUnless(CONF.compute_feature_enabled.cold_migration,
'Cold migration is not available.')
@decorators.attr(type='negative')
@decorators.idempotent_id('7fc70f13-5712-46d9-ba17-5d7c6b73b203')
def test_negative_cold_migration_in_single_compute_zone(self):
"""Negative test cold migration in availability zone with one compute
"""
wait_err = (
"Negative test failed, didn't respond with 400 bad request "
"status or showed error state when migration was listed.")
skip_migration = True
servers = []
for az, details in self.resources.items():
if len(details.get('hosts', [])) == 1:
skip_migration = False
servers.append(self.create_server_with_params(az=az))
if skip_migration:
raise self.skipException(
'No availability zone with only one compute')
for srv in servers:
self.wait_for_server_ready(srv)
try:
self.assertRaises(lib_exc.BadRequest,
self._migrate_servers,
[srv, ])
# Nova API on OSP>17 sets error status when listing
# migrations instead of 400 response (BadRequest) for this test
except AssertionError:
cmd = ('. {} && '
'openstack server migration list --server {} '
' -f value -c Status').format(
self.overcloudrc_path, srv['id'])
utils.wait_until_true(lambda: self.validate_command(
cmd,
pattern='error',
ret_bool_pattern=True),
timeout=90,
sleep=10,
exception=RuntimeError(wait_err))