tempest/tempest/api/volume/base.py

419 lines
18 KiB
Python

# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import compute
from tempest.common import object_storage
from tempest.common import waiters
from tempest import config
from tempest.lib.common import api_version_utils
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib.decorators import cleanup_order
from tempest.lib import exceptions as lib_exc
import tempest.test
CONF = config.CONF
class BaseVolumeTest(api_version_utils.BaseMicroversionTest,
tempest.test.BaseTestCase):
"""Base test case class for all Cinder API tests."""
# Set this to True in subclasses to create a default network. See
# https://bugs.launchpad.net/tempest/+bug/1844568
create_default_network = False
credentials = ['primary']
@classmethod
def skip_checks(cls):
super(BaseVolumeTest, cls).skip_checks()
if not CONF.service_available.cinder:
skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
raise cls.skipException(skip_msg)
if cls.create_default_network and not CONF.service_available.neutron:
skip_msg = (
"%s skipped as Neutron is not available" % cls.__name__)
raise cls.skipException(skip_msg)
api_version_utils.check_skip_with_microversion(
cls.volume_min_microversion, cls.volume_max_microversion,
CONF.volume.min_microversion, CONF.volume.max_microversion)
@classmethod
def setup_credentials(cls):
cls.set_network_resources(
network=cls.create_default_network,
router=cls.create_default_network,
dhcp=cls.create_default_network,
subnet=cls.create_default_network)
super(BaseVolumeTest, cls).setup_credentials()
@classmethod
def setup_clients(cls):
super(BaseVolumeTest, cls).setup_clients()
cls.servers_client = cls.os_primary.servers_client
if CONF.service_available.glance:
cls.images_client = cls.os_primary.image_client_v2
cls.container_client = cls.os_primary.container_client
cls.object_client = cls.os_primary.object_client
cls.backups_client = cls.os_primary.backups_client_latest
cls.volumes_client = cls.os_primary.volumes_client_latest
cls.messages_client = cls.os_primary.volume_messages_client_latest
cls.versions_client = cls.os_primary.volume_versions_client_latest
cls.groups_client = cls.os_primary.groups_client_latest
cls.group_snapshots_client = (
cls.os_primary.group_snapshots_client_latest)
cls.snapshots_client = cls.os_primary.snapshots_client_latest
cls.volumes_extension_client =\
cls.os_primary.volumes_extension_client_latest
cls.availability_zone_client = (
cls.os_primary.volume_availability_zone_client_latest)
cls.volume_limits_client = cls.os_primary.volume_limits_client_latest
@classmethod
def resource_setup(cls):
super(BaseVolumeTest, cls).resource_setup()
cls.volume_request_microversion = (
api_version_utils.select_request_microversion(
cls.volume_min_microversion,
CONF.volume.min_microversion))
cls.compute_request_microversion = (
api_version_utils.select_request_microversion(
cls.min_microversion,
CONF.compute.min_microversion))
cls.setup_api_microversion_fixture(
compute_microversion=cls.compute_request_microversion,
volume_microversion=cls.volume_request_microversion)
cls.image_ref = CONF.compute.image_ref
cls.flavor_ref = CONF.compute.flavor_ref
cls.build_interval = CONF.volume.build_interval
cls.build_timeout = CONF.volume.build_timeout
@cleanup_order
def create_volume(self, wait_until='available', **kwargs):
"""Wrapper utility that returns a test volume.
:param wait_until: wait till volume status, None means no wait.
"""
if 'size' not in kwargs:
kwargs['size'] = CONF.volume.volume_size
if 'imageRef' in kwargs:
image = self.images_client.show_image(kwargs['imageRef'])
min_disk = image['min_disk']
kwargs['size'] = max(kwargs['size'], min_disk)
if 'name' not in kwargs:
name = data_utils.rand_name(
prefix=CONF.resource_name_prefix,
name=self.__name__ + '-Volume')
kwargs['name'] = name
if CONF.volume.volume_type and 'volume_type' not in kwargs:
# If volume_type is not provided in config then no need to
# add a volume type and
# if volume_type has already been added by child class then
# no need to override.
kwargs['volume_type'] = CONF.volume.volume_type
if CONF.compute.compute_volume_common_az:
kwargs.setdefault('availability_zone',
CONF.compute.compute_volume_common_az)
volume = self.volumes_client.create_volume(**kwargs)['volume']
self.cleanup(test_utils.call_and_ignore_notfound_exc,
self._delete_volume_for_cleanup,
self.volumes_client, volume['id'])
if wait_until:
waiters.wait_for_volume_resource_status(self.volumes_client,
volume['id'], wait_until)
return volume
@staticmethod
def _delete_volume_for_cleanup(volumes_client, volume_id):
"""Delete a volume (only) for cleanup.
If it is attached to a server, wait for it to become available,
assuming we have already deleted the server and just need nova to
complete the delete operation before it is available to be deleted.
Otherwise proceed to the regular delete_volume().
"""
try:
vol = volumes_client.show_volume(volume_id)['volume']
if vol['status'] == 'in-use':
waiters.wait_for_volume_resource_status(volumes_client,
volume_id,
'available')
except lib_exc.NotFound:
pass
BaseVolumeTest.delete_volume(volumes_client, volume_id)
@cleanup_order
def create_snapshot(self, volume_id=1, **kwargs):
"""Wrapper utility that returns a test snapshot."""
if 'name' not in kwargs:
name = data_utils.rand_name(
prefix=CONF.resource_name_prefix,
name=self.__name__ + '-Snapshot')
kwargs['name'] = name
snapshot = self.snapshots_client.create_snapshot(
volume_id=volume_id, **kwargs)['snapshot']
self.cleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_snapshot, snapshot['id'])
waiters.wait_for_volume_resource_status(self.snapshots_client,
snapshot['id'], 'available')
return snapshot
def create_backup(self, volume_id, backup_client=None, object_client=None,
container_client=None, **kwargs):
"""Wrapper utility that returns a test backup."""
if backup_client is None:
backup_client = self.backups_client
if container_client is None:
container_client = self.container_client
if object_client is None:
object_client = self.object_client
if 'name' not in kwargs:
name = data_utils.rand_name(
prefix=CONF.resource_name_prefix,
name=self.__class__.__name__ + '-Backup')
kwargs['name'] = name
if CONF.volume.backup_driver == "swift":
if 'container' not in kwargs:
cont_name = self.__class__.__name__ + '-backup-container'
cont = data_utils.rand_name(
prefix=CONF.resource_name_prefix,
name=cont_name)
kwargs['container'] = cont.lower()
self.addCleanup(object_storage.delete_containers,
kwargs['container'], container_client,
object_client)
backup = backup_client.create_backup(
volume_id=volume_id, **kwargs)['backup']
# addCleanup uses list pop to cleanup. Wait should be added before
# the backup is deleted
self.addCleanup(backup_client.wait_for_resource_deletion,
backup['id'])
self.addCleanup(backup_client.delete_backup, backup['id'])
waiters.wait_for_volume_resource_status(backup_client, backup['id'],
'available')
return backup
# NOTE(afazekas): these create_* and clean_* could be defined
# only in a single location in the source, and could be more general.
@staticmethod
def delete_volume(client, volume_id):
"""Delete volume by the given client"""
client.delete_volume(volume_id)
client.wait_for_resource_deletion(volume_id)
@cleanup_order
def delete_snapshot(self, snapshot_id, snapshots_client=None):
"""Delete snapshot by the given client"""
if snapshots_client is None:
snapshots_client = self.snapshots_client
snapshots_client.delete_snapshot(snapshot_id)
snapshots_client.wait_for_resource_deletion(snapshot_id)
def attach_volume(self, server_id, volume_id, wait_for_detach=True):
"""Attach a volume to a server"""
self.servers_client.attach_volume(
server_id, volumeId=volume_id,
device='/dev/%s' % CONF.compute.volume_device_name)
waiters.wait_for_volume_resource_status(self.volumes_client,
volume_id, 'in-use')
if wait_for_detach:
self.addCleanup(waiters.wait_for_volume_resource_status,
self.volumes_client, volume_id, 'available',
server_id, self.servers_client)
self.addCleanup(self.servers_client.detach_volume, server_id,
volume_id)
def create_server(self, wait_until='ACTIVE', **kwargs):
name = kwargs.pop(
'name',
data_utils.rand_name(
prefix=CONF.resource_name_prefix,
name=self.__class__.__name__ + '-instance'))
if wait_until == 'SSHABLE' and not kwargs.get('validation_resources'):
# If we were asked for SSHABLE but were not provided with the
# required validation_resources and validatable flag, ensure we
# pass them to create_test_server() so that it will actually wait.
kwargs['validation_resources'] = (
self.get_test_validation_resources(self.os_primary))
kwargs['validatable'] = True
tenant_network = self.get_tenant_network()
body, _ = compute.create_test_server(
self.os_primary,
tenant_network=tenant_network,
name=name,
wait_until=wait_until,
**kwargs)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
waiters.wait_for_server_termination,
self.servers_client, body['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.servers_client.delete_server, body['id'])
return body
def create_group(self, **kwargs):
if 'name' not in kwargs:
kwargs['name'] = data_utils.rand_name(
prefix=CONF.resource_name_prefix,
name=self.__class__.__name__ + '-Group')
group = self.groups_client.create_group(**kwargs)['group']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_group, group['id'])
waiters.wait_for_volume_resource_status(
self.groups_client, group['id'], 'available')
return group
def delete_group(self, group_id, delete_volumes=True):
group_vols = []
if delete_volumes:
vols = self.volumes_client.list_volumes(detail=True)['volumes']
for vol in vols:
if vol['group_id'] == group_id:
group_vols.append(vol['id'])
self.groups_client.delete_group(group_id, delete_volumes)
for vol in group_vols:
self.volumes_client.wait_for_resource_deletion(vol)
self.groups_client.wait_for_resource_deletion(group_id)
class BaseVolumeAdminTest(BaseVolumeTest):
"""Base test case class for all Volume Admin API tests."""
credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
super(BaseVolumeAdminTest, cls).setup_clients()
cls.admin_volume_qos_client = cls.os_admin.volume_qos_client_latest
cls.admin_volume_services_client = \
cls.os_admin.volume_services_client_latest
cls.admin_volume_types_client = cls.os_admin.volume_types_client_latest
cls.admin_volume_manage_client = (
cls.os_admin.volume_manage_client_latest)
cls.admin_volume_client = cls.os_admin.volumes_client_latest
cls.admin_groups_client = cls.os_admin.groups_client_latest
cls.admin_messages_client = cls.os_admin.volume_messages_client_latest
cls.admin_group_snapshots_client = \
cls.os_admin.group_snapshots_client_latest
cls.admin_group_types_client = cls.os_admin.group_types_client_latest
cls.admin_hosts_client = cls.os_admin.volume_hosts_client_latest
cls.admin_snapshot_manage_client = \
cls.os_admin.snapshot_manage_client_latest
cls.admin_snapshots_client = cls.os_admin.snapshots_client_latest
cls.admin_backups_client = cls.os_admin.backups_client_latest
cls.admin_encryption_types_client = \
cls.os_admin.encryption_types_client_latest
cls.admin_quota_classes_client = \
cls.os_admin.volume_quota_classes_client_latest
cls.admin_quotas_client = cls.os_admin.volume_quotas_client_latest
cls.admin_volume_limits_client = (
cls.os_admin.volume_limits_client_latest)
cls.admin_capabilities_client = \
cls.os_admin.volume_capabilities_client_latest
cls.admin_scheduler_stats_client = \
cls.os_admin.volume_scheduler_stats_client_latest
@cleanup_order
def create_test_qos_specs(self, name=None, consumer=None, **kwargs):
"""create a test Qos-Specs."""
name = name or data_utils.rand_name(
prefix=CONF.resource_name_prefix,
name=self.__name__ + '-QoS')
consumer = consumer or 'front-end'
qos_specs = self.admin_volume_qos_client.create_qos(
name=name, consumer=consumer, **kwargs)['qos_specs']
self.cleanup(self.clear_qos_spec, qos_specs['id'])
return qos_specs
@cleanup_order
def create_volume_type(self, name=None, **kwargs):
"""Create a test volume-type"""
name = name or data_utils.rand_name(
prefix=CONF.resource_name_prefix,
name=self.__name__ + '-volume-type')
volume_type = self.admin_volume_types_client.create_volume_type(
name=name, **kwargs)['volume_type']
self.cleanup(self.clear_volume_type, volume_type['id'])
return volume_type
def create_encryption_type(self, type_id=None, provider=None,
key_size=None, cipher=None,
control_location=None):
if not type_id:
volume_type = self.create_volume_type()
type_id = volume_type['id']
self.admin_encryption_types_client.create_encryption_type(
type_id, provider=provider, key_size=key_size, cipher=cipher,
control_location=control_location)
def create_encrypted_volume(self, encryption_provider, key_size=256,
cipher='aes-xts-plain64',
control_location='front-end'):
volume_type = self.create_volume_type()
self.create_encryption_type(type_id=volume_type['id'],
provider=encryption_provider,
key_size=key_size,
cipher=cipher,
control_location=control_location)
return self.create_volume(volume_type=volume_type['name'])
def create_group_type(self, name=None, **kwargs):
"""Create a test group-type"""
name = name or data_utils.rand_name(
prefix=CONF.resource_name_prefix,
name=self.__class__.__name__ + '-group-type')
group_type = self.admin_group_types_client.create_group_type(
name=name, **kwargs)['group_type']
self.addCleanup(self.admin_group_types_client.delete_group_type,
group_type['id'])
return group_type
@cleanup_order
def clear_qos_spec(self, qos_id):
test_utils.call_and_ignore_notfound_exc(
self.admin_volume_qos_client.delete_qos, qos_id)
test_utils.call_and_ignore_notfound_exc(
self.admin_volume_qos_client.wait_for_resource_deletion, qos_id)
@cleanup_order
def clear_volume_type(self, vol_type_id):
test_utils.call_and_ignore_notfound_exc(
self.admin_volume_types_client.delete_volume_type, vol_type_id)
test_utils.call_and_ignore_notfound_exc(
self.admin_volume_types_client.wait_for_resource_deletion,
vol_type_id)