Fix test_stamp_pattern to remove dependency of user supplied volume name

This patch removes CONF.compute.volume_device_name dependency and
use modified list_disks method to get the newly attached
device name and use that for mkfs rather than the user supplied
device name which is based on assumtion.

Also modfied test_attach_detach testcase and unit test
for remote_client accordingly.

Change-Id: I862d1ba55f45b09385057a51b0b52a851c19287e
Closes-Bug: #1852807
This commit is contained in:
Paras Babbar 2019-12-11 16:51:57 -05:00
parent d3c82c7922
commit 4b45f9e54c
5 changed files with 39 additions and 31 deletions

View File

@ -79,7 +79,7 @@ class AttachVolumeTestJSON(BaseAttachVolumeTest):
# NOTE(andreaf) We need to ensure the ssh key has been
# injected in the guest before we power cycle
linux_client.validate_authentication()
disks_before_attach = linux_client.count_disks()
disks_before_attach = linux_client.list_disks()
volume = self.create_volume()
@ -101,8 +101,10 @@ class AttachVolumeTestJSON(BaseAttachVolumeTest):
'ACTIVE')
if CONF.validation.run_validation:
disks_after_attach = linux_client.count_disks()
self.assertGreater(disks_after_attach, disks_before_attach)
disks_after_attach = linux_client.list_disks()
self.assertGreater(
len(disks_after_attach),
len(disks_before_attach))
self.servers_client.detach_volume(server['id'], attachment['volumeId'])
waiters.wait_for_volume_resource_status(
@ -117,8 +119,8 @@ class AttachVolumeTestJSON(BaseAttachVolumeTest):
'ACTIVE')
if CONF.validation.run_validation:
disks_after_detach = linux_client.count_disks()
self.assertEqual(disks_before_attach, disks_after_detach)
disks_after_detach = linux_client.list_disks()
self.assertEqual(len(disks_before_attach), len(disks_after_detach))
@decorators.idempotent_id('7fa563fe-f0f7-43eb-9e22-a1ece036b513')
def test_list_get_volume_attachments(self):

View File

@ -73,12 +73,12 @@ class RemoteClient(remote_client.RemoteClient):
msg = "'TYPE' column is required but the output doesn't have it: "
raise tempest.lib.exceptions.TempestException(msg + output)
def count_disks(self):
def list_disks(self):
disks_list = self.get_disks()
disks_list = [line[0] for line in
[device_name.split()
for device_name in disks_list.splitlines()][1:]]
return len(disks_list)
return disks_list
def get_boot_time(self):
cmd = 'cut -f1 -d. /proc/uptime'

View File

@ -634,8 +634,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
def nova_volume_attach(self, server, volume_to_attach):
volume = self.servers_client.attach_volume(
server['id'], volumeId=volume_to_attach['id'], device='/dev/%s'
% CONF.compute.volume_device_name)['volumeAttachment']
server['id'], volumeId=volume_to_attach['id'])['volumeAttachment']
self.assertEqual(volume_to_attach['id'], volume['id'])
waiters.wait_for_volume_resource_status(self.volumes_client,
volume['id'], 'in-use')

View File

@ -55,20 +55,24 @@ class TestStampPattern(manager.ScenarioTest):
if not CONF.volume_feature_enabled.snapshot:
raise cls.skipException("Cinder volume snapshots are disabled")
def _wait_for_volume_available_on_the_system(self, ip_address,
private_key):
def _attached_volume_name(
self, disks_list_before_attach, ip_address, private_key):
ssh = self.get_remote_client(ip_address, private_key=private_key)
def _func():
disks = ssh.get_disks()
LOG.debug("Disks: %s", disks)
return CONF.compute.volume_device_name in disks
def _wait_for_volume_available_on_system():
disks_list_after_attach = ssh.list_disks()
return len(disks_list_after_attach) > len(disks_list_before_attach)
if not test_utils.call_until_true(_func,
if not test_utils.call_until_true(_wait_for_volume_available_on_system,
CONF.compute.build_timeout,
CONF.compute.build_interval):
raise lib_exc.TimeoutException
disks_list_after_attach = ssh.list_disks()
volume_name = [item for item in disks_list_after_attach
if item not in disks_list_before_attach][0]
return volume_name
@decorators.attr(type='slow')
@decorators.idempotent_id('10fd234a-515c-41e5-b092-8323060598c5')
@testtools.skipUnless(CONF.compute_feature_enabled.snapshot,
@ -91,15 +95,16 @@ class TestStampPattern(manager.ScenarioTest):
ip_for_server = self.get_server_ip(server)
# Make sure the machine ssh-able before attaching the volume
self.get_remote_client(ip_for_server,
private_key=keypair['private_key'],
server=server)
linux_client = self.get_remote_client(
ip_for_server, private_key=keypair['private_key'],
server=server)
disks_list_before_attach = linux_client.list_disks()
self.nova_volume_attach(server, volume)
self._wait_for_volume_available_on_the_system(ip_for_server,
keypair['private_key'])
volume_device_name = self._attached_volume_name(
disks_list_before_attach, ip_for_server, keypair['private_key'])
timestamp = self.create_timestamp(ip_for_server,
CONF.compute.volume_device_name,
volume_device_name,
private_key=keypair['private_key'],
server=server)
self.nova_volume_detach(server, volume)
@ -126,18 +131,19 @@ class TestStampPattern(manager.ScenarioTest):
# Make sure the machine ssh-able before attaching the volume
# Just a live machine is responding
# for device attache/detach as expected
self.get_remote_client(ip_for_snapshot,
private_key=keypair['private_key'],
server=server_from_snapshot)
linux_client = self.get_remote_client(
ip_for_snapshot, private_key=keypair['private_key'],
server=server_from_snapshot)
disks_list_before_attach = linux_client.list_disks()
# attach volume2 to instance2
self.nova_volume_attach(server_from_snapshot, volume_from_snapshot)
self._wait_for_volume_available_on_the_system(ip_for_snapshot,
keypair['private_key'])
volume_device_name = self._attached_volume_name(
disks_list_before_attach, ip_for_snapshot, keypair['private_key'])
# check the existence of the timestamp file in the volume2
timestamp2 = self.get_timestamp(ip_for_snapshot,
CONF.compute.volume_device_name,
volume_device_name,
private_key=keypair['private_key'],
server=server_from_snapshot)
self.assertEqual(timestamp, timestamp2)

View File

@ -106,14 +106,15 @@ sdb 8:16 0 1000204886016 0 disk"""
self.assertEqual(self.conn.get_disks(), result)
self._assert_exec_called_with('lsblk -lb --nodeps')
def test_count_disk(self):
def test_list_disks(self):
output_lsblk = """\
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
sda 8:0 0 128035676160 0 disk
sdb 8:16 0 1000204886016 0 disk
sr0 11:0 1 1073741312 0 rom"""
disk_list = ['sda', 'sdb']
self.ssh_mock.mock.exec_command.return_value = output_lsblk
self.assertEqual(self.conn.count_disks(), 2)
self.assertEqual(self.conn.list_disks(), disk_list)
def test_get_boot_time(self):
booted_at = 10000