tempest/tempest/api/volume/base.py
Benny Kopilov 7beb2d0ceb Allows to skip wait for volume create
Current code does not support skipping wait for
volume, tempest plugins require test without waiting
and still use all existing flow code.

Examples:
Create multiple volume in a batch without waiting
Create multiple volumes and expecting for error state

Instead of duplicating code its better to set a flag,
wait_until default value is available, when its None we
skip waiting inside create volume

Change-Id: I7fe90e26f773b3a128e3d1bff1b89a3ef665eebb
2022-04-13 07:26:36 +03:00

347 lines
15 KiB
Python

# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import compute
from tempest.common import waiters
from tempest import config
from tempest.lib.common import api_version_utils
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
import tempest.test
CONF = config.CONF
class BaseVolumeTest(api_version_utils.BaseMicroversionTest,
tempest.test.BaseTestCase):
"""Base test case class for all Cinder API tests."""
# Set this to True in subclasses to create a default network. See
# https://bugs.launchpad.net/tempest/+bug/1844568
create_default_network = False
credentials = ['primary']
@classmethod
def skip_checks(cls):
super(BaseVolumeTest, cls).skip_checks()
if not CONF.service_available.cinder:
skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
raise cls.skipException(skip_msg)
api_version_utils.check_skip_with_microversion(
cls.volume_min_microversion, cls.volume_max_microversion,
CONF.volume.min_microversion, CONF.volume.max_microversion)
@classmethod
def setup_credentials(cls):
cls.set_network_resources(
network=cls.create_default_network,
subnet=cls.create_default_network)
super(BaseVolumeTest, cls).setup_credentials()
@classmethod
def setup_clients(cls):
super(BaseVolumeTest, cls).setup_clients()
cls.servers_client = cls.os_primary.servers_client
if CONF.service_available.glance:
cls.images_client = cls.os_primary.image_client_v2
cls.backups_client = cls.os_primary.backups_client_latest
cls.volumes_client = cls.os_primary.volumes_client_latest
cls.messages_client = cls.os_primary.volume_messages_client_latest
cls.versions_client = cls.os_primary.volume_versions_client_latest
cls.groups_client = cls.os_primary.groups_client_latest
cls.group_snapshots_client = (
cls.os_primary.group_snapshots_client_latest)
cls.snapshots_client = cls.os_primary.snapshots_client_latest
cls.volumes_extension_client =\
cls.os_primary.volumes_extension_client_latest
cls.availability_zone_client = (
cls.os_primary.volume_availability_zone_client_latest)
cls.volume_limits_client = cls.os_primary.volume_limits_client_latest
@classmethod
def resource_setup(cls):
super(BaseVolumeTest, cls).resource_setup()
cls.volume_request_microversion = (
api_version_utils.select_request_microversion(
cls.volume_min_microversion,
CONF.volume.min_microversion))
cls.compute_request_microversion = (
api_version_utils.select_request_microversion(
cls.min_microversion,
CONF.compute.min_microversion))
cls.setup_api_microversion_fixture(
compute_microversion=cls.compute_request_microversion,
volume_microversion=cls.volume_request_microversion)
cls.image_ref = CONF.compute.image_ref
cls.flavor_ref = CONF.compute.flavor_ref
cls.build_interval = CONF.volume.build_interval
cls.build_timeout = CONF.volume.build_timeout
@classmethod
def create_volume(cls, wait_until='available', **kwargs):
"""Wrapper utility that returns a test volume.
:param wait_until: wait till volume status, None means no wait.
"""
if 'size' not in kwargs:
kwargs['size'] = CONF.volume.volume_size
if 'imageRef' in kwargs:
image = cls.images_client.show_image(kwargs['imageRef'])
min_disk = image['min_disk']
kwargs['size'] = max(kwargs['size'], min_disk)
if 'name' not in kwargs:
name = data_utils.rand_name(cls.__name__ + '-Volume')
kwargs['name'] = name
if CONF.volume.volume_type and 'volume_type' not in kwargs:
# If volume_type is not provided in config then no need to
# add a volume type and
# if volume_type has already been added by child class then
# no need to override.
kwargs['volume_type'] = CONF.volume.volume_type
if CONF.compute.compute_volume_common_az:
kwargs.setdefault('availability_zone',
CONF.compute.compute_volume_common_az)
volume = cls.volumes_client.create_volume(**kwargs)['volume']
cls.addClassResourceCleanup(test_utils.call_and_ignore_notfound_exc,
cls.delete_volume, cls.volumes_client,
volume['id'])
if wait_until:
waiters.wait_for_volume_resource_status(cls.volumes_client,
volume['id'], wait_until)
return volume
@classmethod
def create_snapshot(cls, volume_id=1, **kwargs):
"""Wrapper utility that returns a test snapshot."""
if 'name' not in kwargs:
name = data_utils.rand_name(cls.__name__ + '-Snapshot')
kwargs['name'] = name
snapshot = cls.snapshots_client.create_snapshot(
volume_id=volume_id, **kwargs)['snapshot']
cls.addClassResourceCleanup(test_utils.call_and_ignore_notfound_exc,
cls.delete_snapshot, snapshot['id'])
waiters.wait_for_volume_resource_status(cls.snapshots_client,
snapshot['id'], 'available')
return snapshot
def create_backup(self, volume_id, backup_client=None, **kwargs):
"""Wrapper utility that returns a test backup."""
if backup_client is None:
backup_client = self.backups_client
if 'name' not in kwargs:
name = data_utils.rand_name(self.__class__.__name__ + '-Backup')
kwargs['name'] = name
backup = backup_client.create_backup(
volume_id=volume_id, **kwargs)['backup']
# addCleanup uses list pop to cleanup. Wait should be added before
# the backup is deleted
self.addCleanup(backup_client.wait_for_resource_deletion,
backup['id'])
self.addCleanup(backup_client.delete_backup, backup['id'])
waiters.wait_for_volume_resource_status(backup_client, backup['id'],
'available')
return backup
# NOTE(afazekas): these create_* and clean_* could be defined
# only in a single location in the source, and could be more general.
@staticmethod
def delete_volume(client, volume_id):
"""Delete volume by the given client"""
client.delete_volume(volume_id)
client.wait_for_resource_deletion(volume_id)
@classmethod
def delete_snapshot(cls, snapshot_id, snapshots_client=None):
"""Delete snapshot by the given client"""
if snapshots_client is None:
snapshots_client = cls.snapshots_client
snapshots_client.delete_snapshot(snapshot_id)
snapshots_client.wait_for_resource_deletion(snapshot_id)
def attach_volume(self, server_id, volume_id):
"""Attach a volume to a server"""
self.servers_client.attach_volume(
server_id, volumeId=volume_id,
device='/dev/%s' % CONF.compute.volume_device_name)
waiters.wait_for_volume_resource_status(self.volumes_client,
volume_id, 'in-use')
self.addCleanup(waiters.wait_for_volume_resource_status,
self.volumes_client, volume_id, 'available')
self.addCleanup(self.servers_client.detach_volume, server_id,
volume_id)
def create_server(self, wait_until='ACTIVE', **kwargs):
name = kwargs.pop(
'name',
data_utils.rand_name(self.__class__.__name__ + '-instance'))
tenant_network = self.get_tenant_network()
body, _ = compute.create_test_server(
self.os_primary,
tenant_network=tenant_network,
name=name,
wait_until=wait_until,
**kwargs)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
waiters.wait_for_server_termination,
self.servers_client, body['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.servers_client.delete_server, body['id'])
return body
def create_group(self, **kwargs):
if 'name' not in kwargs:
kwargs['name'] = data_utils.rand_name(
self.__class__.__name__ + '-Group')
group = self.groups_client.create_group(**kwargs)['group']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_group, group['id'])
waiters.wait_for_volume_resource_status(
self.groups_client, group['id'], 'available')
return group
def delete_group(self, group_id, delete_volumes=True):
group_vols = []
if delete_volumes:
vols = self.volumes_client.list_volumes(detail=True)['volumes']
for vol in vols:
if vol['group_id'] == group_id:
group_vols.append(vol['id'])
self.groups_client.delete_group(group_id, delete_volumes)
for vol in group_vols:
self.volumes_client.wait_for_resource_deletion(vol)
self.groups_client.wait_for_resource_deletion(group_id)
class BaseVolumeAdminTest(BaseVolumeTest):
"""Base test case class for all Volume Admin API tests."""
credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
super(BaseVolumeAdminTest, cls).setup_clients()
cls.admin_volume_qos_client = cls.os_admin.volume_qos_client_latest
cls.admin_volume_services_client = \
cls.os_admin.volume_services_client_latest
cls.admin_volume_types_client = cls.os_admin.volume_types_client_latest
cls.admin_volume_manage_client = (
cls.os_admin.volume_manage_client_latest)
cls.admin_volume_client = cls.os_admin.volumes_client_latest
cls.admin_groups_client = cls.os_admin.groups_client_latest
cls.admin_messages_client = cls.os_admin.volume_messages_client_latest
cls.admin_group_snapshots_client = \
cls.os_admin.group_snapshots_client_latest
cls.admin_group_types_client = cls.os_admin.group_types_client_latest
cls.admin_hosts_client = cls.os_admin.volume_hosts_client_latest
cls.admin_snapshot_manage_client = \
cls.os_admin.snapshot_manage_client_latest
cls.admin_snapshots_client = cls.os_admin.snapshots_client_latest
cls.admin_backups_client = cls.os_admin.backups_client_latest
cls.admin_encryption_types_client = \
cls.os_admin.encryption_types_client_latest
cls.admin_quota_classes_client = \
cls.os_admin.volume_quota_classes_client_latest
cls.admin_quotas_client = cls.os_admin.volume_quotas_client_latest
cls.admin_volume_limits_client = (
cls.os_admin.volume_limits_client_latest)
cls.admin_capabilities_client = \
cls.os_admin.volume_capabilities_client_latest
cls.admin_scheduler_stats_client = \
cls.os_admin.volume_scheduler_stats_client_latest
@classmethod
def create_test_qos_specs(cls, name=None, consumer=None, **kwargs):
"""create a test Qos-Specs."""
name = name or data_utils.rand_name(cls.__name__ + '-QoS')
consumer = consumer or 'front-end'
qos_specs = cls.admin_volume_qos_client.create_qos(
name=name, consumer=consumer, **kwargs)['qos_specs']
cls.addClassResourceCleanup(cls.clear_qos_spec, qos_specs['id'])
return qos_specs
@classmethod
def create_volume_type(cls, name=None, **kwargs):
"""Create a test volume-type"""
name = name or data_utils.rand_name(cls.__name__ + '-volume-type')
volume_type = cls.admin_volume_types_client.create_volume_type(
name=name, **kwargs)['volume_type']
cls.addClassResourceCleanup(cls.clear_volume_type, volume_type['id'])
return volume_type
def create_encryption_type(self, type_id=None, provider=None,
key_size=None, cipher=None,
control_location=None):
if not type_id:
volume_type = self.create_volume_type()
type_id = volume_type['id']
self.admin_encryption_types_client.create_encryption_type(
type_id, provider=provider, key_size=key_size, cipher=cipher,
control_location=control_location)
def create_encrypted_volume(self, encryption_provider, key_size=256,
cipher='aes-xts-plain64',
control_location='front-end'):
volume_type = self.create_volume_type()
self.create_encryption_type(type_id=volume_type['id'],
provider=encryption_provider,
key_size=key_size,
cipher=cipher,
control_location=control_location)
return self.create_volume(volume_type=volume_type['name'])
def create_group_type(self, name=None, **kwargs):
"""Create a test group-type"""
name = name or data_utils.rand_name(
self.__class__.__name__ + '-group-type')
group_type = self.admin_group_types_client.create_group_type(
name=name, **kwargs)['group_type']
self.addCleanup(self.admin_group_types_client.delete_group_type,
group_type['id'])
return group_type
@classmethod
def clear_qos_spec(cls, qos_id):
test_utils.call_and_ignore_notfound_exc(
cls.admin_volume_qos_client.delete_qos, qos_id)
test_utils.call_and_ignore_notfound_exc(
cls.admin_volume_qos_client.wait_for_resource_deletion, qos_id)
@classmethod
def clear_volume_type(cls, vol_type_id):
test_utils.call_and_ignore_notfound_exc(
cls.admin_volume_types_client.delete_volume_type, vol_type_id)
test_utils.call_and_ignore_notfound_exc(
cls.admin_volume_types_client.wait_for_resource_deletion,
vol_type_id)