Need to have stable tempest scenario manager

This patch addresses following facts:
1. common manager methods among plugins should be
   defined in Tempest.
2. methods should be consistent with names and
   parameters.
3. Add docstring about method usage and parameter.

Implements: blueprint tempest-scenario-manager-stable
Signed-off by: Soniya Vyas<svyas@redhat.com>
Change-Id: I191c8861c9ec29b78c9c8ad12ff2ddf3ffbf4db1
This commit is contained in:
Soniya Vyas 2020-07-15 15:20:59 +05:30
parent e866da9a11
commit 0c84f3ee2e
3 changed files with 163 additions and 36 deletions

View File

@ -33,7 +33,7 @@ class Manager(clients.ServiceClients):
def __init__(self, credentials, scope='project'): def __init__(self, credentials, scope='project'):
msg = ("tempest.manager.Manager is not a stable interface and as such " msg = ("tempest.manager.Manager is not a stable interface and as such "
"it should not imported directly. It will be removed as " "it should not be imported directly. It will be removed as "
"soon as the client manager becomes available in tempest.lib.") "soon as the client manager becomes available in tempest.lib.")
LOG.warning(msg) LOG.warning(msg)
dscv = CONF.identity.disable_ssl_certificate_validation dscv = CONF.identity.disable_ssl_certificate_validation

View File

@ -91,10 +91,32 @@ class ScenarioTest(tempest.test.BaseTestCase):
volume_microversion=self.volume_request_microversion, volume_microversion=self.volume_request_microversion,
placement_microversion=self.placement_request_microversion)) placement_microversion=self.placement_request_microversion))
def setup_compute_client(cls):
"""Compute and Compute security groups client"""
cls.compute_images_client = cls.os_primary.compute_images_client
cls.keypairs_client = cls.os_primary.keypairs_client
cls.compute_security_groups_client = (
cls.os_primary.compute_security_groups_client)
cls.compute_security_group_rules_client = (
cls.os_primary.compute_security_group_rules_client)
cls.servers_client = cls.os_primary.servers_client
cls.interface_client = cls.os_primary.interfaces_client
def setup_network_client(cls):
"""Neutron network client"""
cls.networks_client = cls.os_primary.networks_client
cls.ports_client = cls.os_primary.ports_client
cls.routers_client = cls.os_primary.routers_client
cls.subnets_client = cls.os_primary.subnets_client
cls.floating_ips_client = cls.os_primary.floating_ips_client
cls.security_groups_client = cls.os_primary.security_groups_client
cls.security_group_rules_client = (
cls.os_primary.security_group_rules_client)
@classmethod @classmethod
def setup_clients(cls): def setup_clients(cls):
"""This setup the service clients for the tests"""
super(ScenarioTest, cls).setup_clients() super(ScenarioTest, cls).setup_clients()
# Clients (in alphabetical order)
cls.flavors_client = cls.os_primary.flavors_client cls.flavors_client = cls.os_primary.flavors_client
cls.compute_floating_ips_client = ( cls.compute_floating_ips_client = (
cls.os_primary.compute_floating_ips_client) cls.os_primary.compute_floating_ips_client)
@ -108,37 +130,20 @@ class ScenarioTest(tempest.test.BaseTestCase):
raise lib_exc.InvalidConfiguration( raise lib_exc.InvalidConfiguration(
'Either api_v1 or api_v2 must be True in ' 'Either api_v1 or api_v2 must be True in '
'[image-feature-enabled].') '[image-feature-enabled].')
# Compute image client
cls.compute_images_client = cls.os_primary.compute_images_client cls.setup_compute_client(cls)
cls.keypairs_client = cls.os_primary.keypairs_client cls.setup_network_client(cls)
# Nova security groups client
cls.compute_security_groups_client = (
cls.os_primary.compute_security_groups_client)
cls.compute_security_group_rules_client = (
cls.os_primary.compute_security_group_rules_client)
cls.servers_client = cls.os_primary.servers_client
cls.interface_client = cls.os_primary.interfaces_client
# Neutron network client
cls.networks_client = cls.os_primary.networks_client
cls.ports_client = cls.os_primary.ports_client
cls.routers_client = cls.os_primary.routers_client
cls.subnets_client = cls.os_primary.subnets_client
cls.floating_ips_client = cls.os_primary.floating_ips_client
cls.security_groups_client = cls.os_primary.security_groups_client
cls.security_group_rules_client = (
cls.os_primary.security_group_rules_client)
# Use the latest available volume clients
if CONF.service_available.cinder: if CONF.service_available.cinder:
cls.volumes_client = cls.os_primary.volumes_client_latest cls.volumes_client = cls.os_primary.volumes_client_latest
cls.snapshots_client = cls.os_primary.snapshots_client_latest cls.snapshots_client = cls.os_primary.snapshots_client_latest
cls.backups_client = cls.os_primary.backups_client_latest cls.backups_client = cls.os_primary.backups_client_latest
# ## Test functions library # ## Test functions library
#
# The create_[resource] functions only return body and discard the # The create_[resource] functions only return body and discard the
# resp part which is not used in scenario tests # resp part which is not used in scenario tests
def create_port(self, network_id, client=None, **kwargs): def create_port(self, network_id, client=None, **kwargs):
"""Creates port"""
if not client: if not client:
client = self.ports_client client = self.ports_client
name = data_utils.rand_name(self.__class__.__name__) name = data_utils.rand_name(self.__class__.__name__)
@ -156,6 +161,13 @@ class ScenarioTest(tempest.test.BaseTestCase):
return port return port
def create_keypair(self, client=None): def create_keypair(self, client=None):
"""Creates keypair
Keypair is a public key of OpenSSH key pair used for accessing
and create servers
Keypair can also be created by a private key for the same purpose
Here, the keys are randomly generated[public/private]
"""
if not client: if not client:
client = self.keypairs_client client = self.keypairs_client
name = data_utils.rand_name(self.__class__.__name__) name = data_utils.rand_name(self.__class__.__name__)
@ -295,6 +307,13 @@ class ScenarioTest(tempest.test.BaseTestCase):
def create_volume(self, size=None, name=None, snapshot_id=None, def create_volume(self, size=None, name=None, snapshot_id=None,
imageRef=None, volume_type=None): imageRef=None, volume_type=None):
"""Creates volume
This wrapper utility creates volume and waits for volume to be
in 'available' state.
This method returns the volume's full representation by GET request.
"""
if size is None: if size is None:
size = CONF.volume.volume_size size = CONF.volume.volume_size
if imageRef: if imageRef:
@ -334,6 +353,11 @@ class ScenarioTest(tempest.test.BaseTestCase):
def create_backup(self, volume_id, name=None, description=None, def create_backup(self, volume_id, name=None, description=None,
force=False, snapshot_id=None, incremental=False, force=False, snapshot_id=None, incremental=False,
container=None): container=None):
"""Creates backup
This wrapper utility creates backup and waits for backup to be
in 'available' state.
"""
name = name or data_utils.rand_name( name = name or data_utils.rand_name(
self.__class__.__name__ + "-backup") self.__class__.__name__ + "-backup")
@ -351,6 +375,12 @@ class ScenarioTest(tempest.test.BaseTestCase):
return backup return backup
def restore_backup(self, backup_id): def restore_backup(self, backup_id):
"""Restore backup
This wrapper utility restores backup and waits for backup to be
in 'available' state.
"""
restore = self.backups_client.restore_backup(backup_id)['restore'] restore = self.backups_client.restore_backup(backup_id)['restore']
self.addCleanup(self.volumes_client.delete_volume, self.addCleanup(self.volumes_client.delete_volume,
restore['volume_id']) restore['volume_id'])
@ -362,8 +392,31 @@ class ScenarioTest(tempest.test.BaseTestCase):
self.assertEqual(backup_id, restore['backup_id']) self.assertEqual(backup_id, restore['backup_id'])
return restore return restore
def rebuild_server(self, server_id, image=None,
preserve_ephemeral=False, wait=True,
rebuild_kwargs=None):
if image is None:
image = CONF.compute.image_ref
rebuild_kwargs = rebuild_kwargs or {}
LOG.debug("Rebuilding server (id: %s, image: %s, preserve eph: %s)",
server_id, image, preserve_ephemeral)
self.servers_client.rebuild_server(
server_id=server_id,
image_ref=image,
preserve_ephemeral=preserve_ephemeral,
**rebuild_kwargs)
if wait:
waiters.wait_for_server_status(self.servers_client,
server_id, 'ACTIVE')
def create_volume_snapshot(self, volume_id, name=None, description=None, def create_volume_snapshot(self, volume_id, name=None, description=None,
metadata=None, force=False): metadata=None, force=False):
"""Creates volume
This wrapper utility creates volume snapshot and waits for backup
to be in 'available' state.
"""
name = name or data_utils.rand_name( name = name or data_utils.rand_name(
self.__class__.__name__ + '-snapshot') self.__class__.__name__ + '-snapshot')
snapshot = self.snapshots_client.create_snapshot( snapshot = self.snapshots_client.create_snapshot(
@ -372,6 +425,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
display_name=name, display_name=name,
description=description, description=description,
metadata=metadata)['snapshot'] metadata=metadata)['snapshot']
self.addCleanup(self.snapshots_client.wait_for_resource_deletion, self.addCleanup(self.snapshots_client.wait_for_resource_deletion,
snapshot['id']) snapshot['id'])
self.addCleanup(self.snapshots_client.delete_snapshot, snapshot['id']) self.addCleanup(self.snapshots_client.delete_snapshot, snapshot['id'])
@ -401,6 +455,23 @@ class ScenarioTest(tempest.test.BaseTestCase):
admin_volume_type_client.delete_volume_type(volume_type['id']) admin_volume_type_client.delete_volume_type(volume_type['id'])
def create_volume_type(self, client=None, name=None, backend_name=None): def create_volume_type(self, client=None, name=None, backend_name=None):
"""Creates volume type
In a multiple-storage back-end configuration,
each back end has a name (volume_backend_name).
The name of the back end is declared as an extra-specification
of a volume type (such as, volume_backend_name=LVM).
When a volume is created, the scheduler chooses an
appropriate back end to handle the request, according
to the volume type specified by the user.
The scheduler uses volume types to explicitly create volumes on
specific back ends.
Before using volume type, a volume type has to be declared
to Block Storage. In addition to that, an extra-specification
has to be created to link the volume type to a back end name.
"""
if not client: if not client:
client = self.os_admin.volume_types_client_latest client = self.os_admin.volume_types_client_latest
if not name: if not name:
@ -416,6 +487,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
volume_type = client.create_volume_type( volume_type = client.create_volume_type(
name=randomized_name, extra_specs=extra_specs)['volume_type'] name=randomized_name, extra_specs=extra_specs)['volume_type']
self.assertIn('id', volume_type)
self.addCleanup(self._cleanup_volume_type, volume_type) self.addCleanup(self._cleanup_volume_type, volume_type)
return volume_type return volume_type
@ -456,7 +528,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
return rules return rules
def _create_security_group(self): def _create_security_group(self):
# Create security group """Create security group and add rules to security group"""
sg_name = data_utils.rand_name(self.__class__.__name__) sg_name = data_utils.rand_name(self.__class__.__name__)
sg_desc = sg_name + " description" sg_desc = sg_name + " description"
secgroup = self.compute_security_groups_client.create_security_group( secgroup = self.compute_security_groups_client.create_security_group(
@ -470,7 +542,6 @@ class ScenarioTest(tempest.test.BaseTestCase):
# Add rules to the security group # Add rules to the security group
self._create_loginable_secgroup_rule(secgroup['id']) self._create_loginable_secgroup_rule(secgroup['id'])
return secgroup return secgroup
def get_remote_client(self, ip_address, username=None, private_key=None, def get_remote_client(self, ip_address, username=None, private_key=None,
@ -555,6 +626,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
return image['id'] return image['id']
def _log_console_output(self, servers=None, client=None): def _log_console_output(self, servers=None, client=None):
"""Console log output"""
if not CONF.compute_feature_enabled.console_output: if not CONF.compute_feature_enabled.console_output:
LOG.debug('Console output not supported, cannot log') LOG.debug('Console output not supported, cannot log')
return return
@ -573,11 +645,12 @@ class ScenarioTest(tempest.test.BaseTestCase):
"for the console log", server['id']) "for the console log", server['id'])
def _log_net_info(self, exc): def _log_net_info(self, exc):
# network debug is called as part of ssh init """network debug is called as part of ssh init"""
if not isinstance(exc, lib_exc.SSHTimeout): if not isinstance(exc, lib_exc.SSHTimeout):
LOG.debug('Network information on a devstack host') LOG.debug('Network information on a devstack host')
def create_server_snapshot(self, server, name=None): def create_server_snapshot(self, server, name=None):
"""Creates server snapshot"""
# Glance client # Glance client
_image_client = self.image_client _image_client = self.image_client
# Compute client # Compute client
@ -595,7 +668,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
_image_client.delete_image, image_id) _image_client.delete_image, image_id)
if CONF.image_feature_enabled.api_v1: if CONF.image_feature_enabled.api_v1:
# In glance v1 the additional properties are stored in the headers. # In glance v1 the additional properties are stored in the headers
resp = _image_client.check_image(image_id) resp = _image_client.check_image(image_id)
snapshot_image = common_image.get_image_meta_from_headers(resp) snapshot_image = common_image.get_image_meta_from_headers(resp)
image_props = snapshot_image.get('properties', {}) image_props = snapshot_image.get('properties', {})
@ -625,6 +698,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
return snapshot_image return snapshot_image
def nova_volume_attach(self, server, volume_to_attach): def nova_volume_attach(self, server, volume_to_attach):
"""Attach nova volume"""
volume = self.servers_client.attach_volume( volume = self.servers_client.attach_volume(
server['id'], volumeId=volume_to_attach['id'])['volumeAttachment'] server['id'], volumeId=volume_to_attach['id'])['volumeAttachment']
self.assertEqual(volume_to_attach['id'], volume['id']) self.assertEqual(volume_to_attach['id'], volume['id'])
@ -635,12 +709,14 @@ class ScenarioTest(tempest.test.BaseTestCase):
return self.volumes_client.show_volume(volume['id'])['volume'] return self.volumes_client.show_volume(volume['id'])['volume']
def nova_volume_detach(self, server, volume): def nova_volume_detach(self, server, volume):
"""Detach nova volume"""
self.servers_client.detach_volume(server['id'], volume['id']) self.servers_client.detach_volume(server['id'], volume['id'])
waiters.wait_for_volume_resource_status(self.volumes_client, waiters.wait_for_volume_resource_status(self.volumes_client,
volume['id'], 'available') volume['id'], 'available')
def ping_ip_address(self, ip_address, should_succeed=True, def ping_ip_address(self, ip_address, should_succeed=True,
ping_timeout=None, mtu=None, server=None): ping_timeout=None, mtu=None, server=None):
"""ping ip address"""
timeout = ping_timeout or CONF.validation.ping_timeout timeout = ping_timeout or CONF.validation.ping_timeout
cmd = ['ping', '-c1', '-w1'] cmd = ['ping', '-c1', '-w1']
@ -701,6 +777,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
:raises: AssertError if the result of the connectivity check does :raises: AssertError if the result of the connectivity check does
not match the value of the should_connect param not match the value of the should_connect param
""" """
LOG.debug('checking network connections to IP %s with user: %s', LOG.debug('checking network connections to IP %s with user: %s',
ip_address, username) ip_address, username)
if should_connect: if should_connect:
@ -724,7 +801,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
LOG.exception(extra_msg) LOG.exception(extra_msg)
raise raise
def create_floating_ip(self, thing, pool_name=None): def create_floating_ip(self, server, pool_name=None):
"""Create a floating IP and associates to a server on Nova""" """Create a floating IP and associates to a server on Nova"""
if not pool_name: if not pool_name:
@ -735,11 +812,17 @@ class ScenarioTest(tempest.test.BaseTestCase):
self.compute_floating_ips_client.delete_floating_ip, self.compute_floating_ips_client.delete_floating_ip,
floating_ip['id']) floating_ip['id'])
self.compute_floating_ips_client.associate_floating_ip_to_server( self.compute_floating_ips_client.associate_floating_ip_to_server(
floating_ip['ip'], thing['id']) floating_ip['ip'], server['id'])
return floating_ip return floating_ip
def create_timestamp(self, ip_address, dev_name=None, mount_path='/mnt', def create_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
private_key=None, server=None): private_key=None, server=None):
"""Creates timestamp
This wrapper utility does ssh, creates timestamp and returns the
created timestamp.
"""
ssh_client = self.get_remote_client(ip_address, ssh_client = self.get_remote_client(ip_address,
private_key=private_key, private_key=private_key,
server=server) server=server)
@ -757,6 +840,11 @@ class ScenarioTest(tempest.test.BaseTestCase):
def get_timestamp(self, ip_address, dev_name=None, mount_path='/mnt', def get_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
private_key=None, server=None): private_key=None, server=None):
"""Returns timestamp
This wrapper utility does ssh and returns the timestamp.
"""
ssh_client = self.get_remote_client(ip_address, ssh_client = self.get_remote_client(ip_address,
private_key=private_key, private_key=private_key,
server=server) server=server)
@ -774,6 +862,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
Based on the configuration we're in, return a correct ip Based on the configuration we're in, return a correct ip
address for validating that a guest is up. address for validating that a guest is up.
""" """
if CONF.validation.connect_method == 'floating': if CONF.validation.connect_method == 'floating':
# The tests calling this method don't have a floating IP # The tests calling this method don't have a floating IP
# and can't make use of the validation resources. So the # and can't make use of the validation resources. So the
@ -799,6 +888,8 @@ class ScenarioTest(tempest.test.BaseTestCase):
@classmethod @classmethod
def get_host_for_server(cls, server_id): def get_host_for_server(cls, server_id):
"""Gets host of server"""
server_details = cls.os_admin.servers_client.show_server(server_id) server_details = cls.os_admin.servers_client.show_server(server_id)
return server_details['server']['OS-EXT-SRV-ATTR:host'] return server_details['server']['OS-EXT-SRV-ATTR:host']
@ -817,6 +908,12 @@ class ScenarioTest(tempest.test.BaseTestCase):
security_group=None, security_group=None,
delete_on_termination=False, delete_on_termination=False,
name=None): name=None):
"""Boot instance from resource
This wrapper utility boots instance from resource with block device
mapping with source info passed in arguments
"""
create_kwargs = dict() create_kwargs = dict()
if keypair: if keypair:
create_kwargs['key_name'] = keypair['name'] create_kwargs['key_name'] = keypair['name']
@ -833,6 +930,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
return self.create_server(image_id='', **create_kwargs) return self.create_server(image_id='', **create_kwargs)
def create_volume_from_image(self): def create_volume_from_image(self):
"""Create volume from image"""
img_uuid = CONF.compute.image_ref img_uuid = CONF.compute.image_ref
vol_name = data_utils.rand_name( vol_name = data_utils.rand_name(
self.__class__.__name__ + '-volume-origin') self.__class__.__name__ + '-volume-origin')
@ -888,8 +986,17 @@ class NetworkScenarioTest(ScenarioTest):
namestart='subnet-smoke', **kwargs): namestart='subnet-smoke', **kwargs):
"""Create a subnet for the given network """Create a subnet for the given network
This utility creates subnet for the given network
within the cidr block configured for tenant networks. within the cidr block configured for tenant networks.
:param **kwargs:
See extra parameters below
:Keyword Arguments:
* *ip_version = ip version of the given network,
""" """
if not subnets_client: if not subnets_client:
subnets_client = self.subnets_client subnets_client = self.subnets_client
@ -992,22 +1099,23 @@ class NetworkScenarioTest(ScenarioTest):
"Unable to get network by name: %s" % network_name) "Unable to get network by name: %s" % network_name)
return net[0] return net[0]
def create_floating_ip(self, thing, external_network_id=None, def create_floating_ip(self, server, external_network_id=None,
port_id=None, client=None): port_id=None, client=None):
"""Create a floating IP and associates to a resource/port on Neutron""" """Create a floating IP and associates to a resource/port on Neutron"""
if not external_network_id: if not external_network_id:
external_network_id = CONF.network.public_network_id external_network_id = CONF.network.public_network_id
if not client: if not client:
client = self.floating_ips_client client = self.floating_ips_client
if not port_id: if not port_id:
port_id, ip4 = self._get_server_port_id_and_ip4(thing) port_id, ip4 = self._get_server_port_id_and_ip4(server)
else: else:
ip4 = None ip4 = None
kwargs = { kwargs = {
'floating_network_id': external_network_id, 'floating_network_id': external_network_id,
'port_id': port_id, 'port_id': port_id,
'tenant_id': thing.get('project_id') or thing['tenant_id'], 'tenant_id': server.get('project_id') or server['tenant_id'],
'fixed_ip_address': ip4, 'fixed_ip_address': ip4,
} }
if CONF.network.subnet_id: if CONF.network.subnet_id:
@ -1027,6 +1135,7 @@ class NetworkScenarioTest(ScenarioTest):
:param status: target status :param status: target status
:raises: AssertionError if status doesn't match :raises: AssertionError if status doesn't match
""" """
floatingip_id = floating_ip['id'] floatingip_id = floating_ip['id']
def refresh(): def refresh():
@ -1053,6 +1162,7 @@ class NetworkScenarioTest(ScenarioTest):
private_key, private_key,
should_connect=True, should_connect=True,
servers_for_debug=None): servers_for_debug=None):
"""Checks tenant network connectivity"""
if not CONF.network.project_networks_reachable: if not CONF.network.project_networks_reachable:
msg = 'Tenant networks not configured to be reachable.' msg = 'Tenant networks not configured to be reachable.'
LOG.info(msg) LOG.info(msg)
@ -1085,6 +1195,7 @@ class NetworkScenarioTest(ScenarioTest):
:returns: True, if the connection succeeded and it was expected to :returns: True, if the connection succeeded and it was expected to
succeed. False otherwise. succeed. False otherwise.
""" """
method_name = '%s_check' % protocol method_name = '%s_check' % protocol
connectivity_checker = getattr(source, method_name) connectivity_checker = getattr(source, method_name)
@ -1148,6 +1259,7 @@ class NetworkScenarioTest(ScenarioTest):
:param project_id: secgroup will be created in this project :param project_id: secgroup will be created in this project
:returns: the created security group :returns: the created security group
""" """
if client is None: if client is None:
client = self.security_groups_client client = self.security_groups_client
if not project_id: if not project_id:
@ -1189,6 +1301,7 @@ class NetworkScenarioTest(ScenarioTest):
port_range_max: 22 port_range_max: 22
} }
""" """
if sec_group_rules_client is None: if sec_group_rules_client is None:
sec_group_rules_client = self.security_group_rules_client sec_group_rules_client = self.security_group_rules_client
if security_groups_client is None: if security_groups_client is None:
@ -1279,6 +1392,7 @@ class NetworkScenarioTest(ScenarioTest):
network has, a tenant router will be created and returned that network has, a tenant router will be created and returned that
routes traffic to the public network. routes traffic to the public network.
""" """
if not client: if not client:
client = self.routers_client client = self.routers_client
if not project_id: if not project_id:
@ -1319,6 +1433,7 @@ class NetworkScenarioTest(ScenarioTest):
'provider:segmentation_id': '42'} 'provider:segmentation_id': '42'}
:returns: network, subnet, router :returns: network, subnet, router
""" """
if CONF.network.shared_physical_network: if CONF.network.shared_physical_network:
# NOTE(Shrews): This exception is for environments where tenant # NOTE(Shrews): This exception is for environments where tenant
# credential isolation is available, but network separation is # credential isolation is available, but network separation is
@ -1375,6 +1490,7 @@ class EncryptionScenarioTest(ScenarioTest):
def create_encryption_type(self, client=None, type_id=None, provider=None, def create_encryption_type(self, client=None, type_id=None, provider=None,
key_size=None, cipher=None, key_size=None, cipher=None,
control_location=None): control_location=None):
"""Creates an encryption type for volume"""
if not client: if not client:
client = self.admin_encryption_types_client client = self.admin_encryption_types_client
if not type_id: if not type_id:
@ -1388,6 +1504,7 @@ class EncryptionScenarioTest(ScenarioTest):
def create_encrypted_volume(self, encryption_provider, volume_type, def create_encrypted_volume(self, encryption_provider, volume_type,
key_size=256, cipher='aes-xts-plain64', key_size=256, cipher='aes-xts-plain64',
control_location='front-end'): control_location='front-end'):
"""Creates an encrypted volume"""
volume_type = self.create_volume_type(name=volume_type) volume_type = self.create_volume_type(name=volume_type)
self.create_encryption_type(type_id=volume_type['id'], self.create_encryption_type(type_id=volume_type['id'],
provider=encryption_provider, provider=encryption_provider,
@ -1428,11 +1545,12 @@ class ObjectStorageScenarioTest(ScenarioTest):
cls.object_client = cls.os_operator.object_client cls.object_client = cls.os_operator.object_client
def get_swift_stat(self): def get_swift_stat(self):
"""get swift status for our user account.""" """Get swift status for our user account."""
self.account_client.list_account_containers() self.account_client.list_account_containers()
LOG.debug('Swift status information obtained successfully') LOG.debug('Swift status information obtained successfully')
def create_container(self, container_name=None): def create_container(self, container_name=None):
"""Creates container"""
name = container_name or data_utils.rand_name( name = container_name or data_utils.rand_name(
'swift-scenario-container') 'swift-scenario-container')
self.container_client.update_container(name) self.container_client.update_container(name)
@ -1445,10 +1563,12 @@ class ObjectStorageScenarioTest(ScenarioTest):
return name return name
def delete_container(self, container_name): def delete_container(self, container_name):
"""Deletes container"""
self.container_client.delete_container(container_name) self.container_client.delete_container(container_name)
LOG.debug('Container %s deleted', container_name) LOG.debug('Container %s deleted', container_name)
def upload_object_to_container(self, container_name, obj_name=None): def upload_object_to_container(self, container_name, obj_name=None):
"""Uploads object to container"""
obj_name = obj_name or data_utils.rand_name('swift-scenario-object') obj_name = obj_name or data_utils.rand_name('swift-scenario-object')
obj_data = data_utils.random_bytes() obj_data = data_utils.random_bytes()
self.object_client.create_object(container_name, obj_name, obj_data) self.object_client.create_object(container_name, obj_name, obj_data)
@ -1459,6 +1579,7 @@ class ObjectStorageScenarioTest(ScenarioTest):
return obj_name, obj_data return obj_name, obj_data
def delete_object(self, container_name, filename): def delete_object(self, container_name, filename):
"""Deletes object"""
self.object_client.delete_object(container_name, filename) self.object_client.delete_object(container_name, filename)
self.list_and_check_container_objects(container_name, self.list_and_check_container_objects(container_name,
not_present_obj=[filename]) not_present_obj=[filename])
@ -1466,8 +1587,13 @@ class ObjectStorageScenarioTest(ScenarioTest):
def list_and_check_container_objects(self, container_name, def list_and_check_container_objects(self, container_name,
present_obj=None, present_obj=None,
not_present_obj=None): not_present_obj=None):
# List objects for a given container and assert which are present and """List and verify objects for a given container
# which are not.
This utility lists objects for a given container
and asserts which are present and
which are not
"""
if present_obj is None: if present_obj is None:
present_obj = [] present_obj = []
if not_present_obj is None: if not_present_obj is None:
@ -1482,5 +1608,6 @@ class ObjectStorageScenarioTest(ScenarioTest):
self.assertNotIn(obj, object_list) self.assertNotIn(obj, object_list)
def download_and_verify(self, container_name, obj_name, expected_data): def download_and_verify(self, container_name, obj_name, expected_data):
"""Asserts the object and expected data to verify if they are same"""
_, obj = self.object_client.get_object(container_name, obj_name) _, obj = self.object_client.get_object(container_name, obj_name)
self.assertEqual(obj, expected_data) self.assertEqual(obj, expected_data)

View File

@ -130,7 +130,7 @@ class TestGettingAddress(manager.NetworkScenarioTest):
key_name=self.keypair['name'], key_name=self.keypair['name'],
security_groups=[{'name': self.sec_grp['name']}], security_groups=[{'name': self.sec_grp['name']}],
networks=[{'uuid': n['id']} for n in networks]) networks=[{'uuid': n['id']} for n in networks])
fip = self.create_floating_ip(thing=srv) fip = self.create_floating_ip(server=srv)
ips = self.define_server_ips(srv=srv) ips = self.define_server_ips(srv=srv)
ssh = self.get_remote_client( ssh = self.get_remote_client(
ip_address=fip['floating_ip_address'], ip_address=fip['floating_ip_address'],