Shrink Tempest scenario manager copy
scenario manager in tempest plugin was copied from Tempest to avoid any plugin break. This copied version of scenario manager is using lot of Tempest interfaces. Many of them are not stable and might change which can break plugin. For example - https://review.openstack.org/#/c/503875/3 This commit shrinks the scenario manager copy to keep only required methods. Change-Id: I2b0ce582775dcaee92edd84ce7adffe67edb67f5
This commit is contained in:
parent
5f27334b87
commit
a0f99c2c59
@ -18,16 +18,13 @@ import subprocess
|
||||
|
||||
import netaddr
|
||||
from oslo_log import log
|
||||
from oslo_serialization import jsonutils as json
|
||||
from oslo_utils import netutils
|
||||
|
||||
from tempest.common import compute
|
||||
from tempest.common import image as common_image
|
||||
from tempest.common.utils.linux import remote_client
|
||||
from tempest.common.utils import net_utils
|
||||
from tempest.common import waiters
|
||||
from tempest import config
|
||||
from tempest import exceptions
|
||||
from tempest.lib.common.utils import data_utils
|
||||
from tempest.lib.common.utils import test_utils
|
||||
from tempest.lib import exceptions as lib_exc
|
||||
@ -47,29 +44,8 @@ class ScenarioTest(tempest.test.BaseTestCase):
|
||||
def setup_clients(cls):
|
||||
super(ScenarioTest, cls).setup_clients()
|
||||
# Clients (in alphabetical order)
|
||||
cls.flavors_client = cls.os_primary.flavors_client
|
||||
cls.compute_floating_ips_client = (
|
||||
cls.os_primary.compute_floating_ips_client)
|
||||
if CONF.service_available.glance:
|
||||
# Check if glance v1 is available to determine which client to use.
|
||||
if CONF.image_feature_enabled.api_v1:
|
||||
cls.image_client = cls.os_primary.image_client
|
||||
elif CONF.image_feature_enabled.api_v2:
|
||||
cls.image_client = cls.os_primary.image_client_v2
|
||||
else:
|
||||
raise lib_exc.InvalidConfiguration(
|
||||
'Either api_v1 or api_v2 must be True in '
|
||||
'[image-feature-enabled].')
|
||||
# Compute image client
|
||||
cls.compute_images_client = cls.os_primary.compute_images_client
|
||||
cls.keypairs_client = cls.os_primary.keypairs_client
|
||||
# Nova security groups client
|
||||
cls.compute_security_groups_client = (
|
||||
cls.os_primary.compute_security_groups_client)
|
||||
cls.compute_security_group_rules_client = (
|
||||
cls.os_primary.compute_security_group_rules_client)
|
||||
cls.servers_client = cls.os_primary.servers_client
|
||||
cls.interface_client = cls.os_primary.interfaces_client
|
||||
# Neutron network client
|
||||
cls.networks_client = cls.os_primary.networks_client
|
||||
cls.ports_client = cls.os_primary.ports_client
|
||||
@ -80,13 +56,6 @@ class ScenarioTest(tempest.test.BaseTestCase):
|
||||
cls.security_group_rules_client = (
|
||||
cls.os_primary.security_group_rules_client)
|
||||
|
||||
if CONF.volume_feature_enabled.api_v2:
|
||||
cls.volumes_client = cls.os_primary.volumes_v2_client
|
||||
cls.snapshots_client = cls.os_primary.snapshots_v2_client
|
||||
else:
|
||||
cls.volumes_client = cls.os_primary.volumes_client
|
||||
cls.snapshots_client = cls.os_primary.snapshots_client
|
||||
|
||||
# ## Test functions library
|
||||
#
|
||||
# The create_[resource] functions only return body and discard the
|
||||
@ -214,115 +183,6 @@ class ScenarioTest(tempest.test.BaseTestCase):
|
||||
server = clients.servers_client.show_server(body['id'])['server']
|
||||
return server
|
||||
|
||||
def create_volume(self, size=None, name=None, snapshot_id=None,
|
||||
imageRef=None, volume_type=None):
|
||||
if size is None:
|
||||
size = CONF.volume.volume_size
|
||||
if imageRef:
|
||||
image = self.compute_images_client.show_image(imageRef)['image']
|
||||
min_disk = image.get('minDisk')
|
||||
size = max(size, min_disk)
|
||||
if name is None:
|
||||
name = data_utils.rand_name(self.__class__.__name__ + "-volume")
|
||||
kwargs = {'display_name': name,
|
||||
'snapshot_id': snapshot_id,
|
||||
'imageRef': imageRef,
|
||||
'volume_type': volume_type,
|
||||
'size': size}
|
||||
volume = self.volumes_client.create_volume(**kwargs)['volume']
|
||||
|
||||
self.addCleanup(self.volumes_client.wait_for_resource_deletion,
|
||||
volume['id'])
|
||||
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||
self.volumes_client.delete_volume, volume['id'])
|
||||
|
||||
# NOTE(e0ne): Cinder API v2 uses name instead of display_name
|
||||
if 'display_name' in volume:
|
||||
self.assertEqual(name, volume['display_name'])
|
||||
else:
|
||||
self.assertEqual(name, volume['name'])
|
||||
waiters.wait_for_volume_resource_status(self.volumes_client,
|
||||
volume['id'], 'available')
|
||||
# The volume retrieved on creation has a non-up-to-date status.
|
||||
# Retrieval after it becomes active ensures correct details.
|
||||
volume = self.volumes_client.show_volume(volume['id'])['volume']
|
||||
return volume
|
||||
|
||||
def create_volume_type(self, client=None, name=None, backend_name=None):
|
||||
if not client:
|
||||
client = self.admin_volume_types_client
|
||||
if not name:
|
||||
class_name = self.__class__.__name__
|
||||
name = data_utils.rand_name(class_name + '-volume-type')
|
||||
randomized_name = data_utils.rand_name('scenario-type-' + name)
|
||||
|
||||
LOG.debug("Creating a volume type: %s on backend %s",
|
||||
randomized_name, backend_name)
|
||||
extra_specs = {}
|
||||
if backend_name:
|
||||
extra_specs = {"volume_backend_name": backend_name}
|
||||
|
||||
body = client.create_volume_type(name=randomized_name,
|
||||
extra_specs=extra_specs)
|
||||
volume_type = body['volume_type']
|
||||
self.assertIn('id', volume_type)
|
||||
self.addCleanup(client.delete_volume_type, volume_type['id'])
|
||||
return volume_type
|
||||
|
||||
def _create_loginable_secgroup_rule(self, secgroup_id=None):
|
||||
_client = self.compute_security_groups_client
|
||||
_client_rules = self.compute_security_group_rules_client
|
||||
if secgroup_id is None:
|
||||
sgs = _client.list_security_groups()['security_groups']
|
||||
for sg in sgs:
|
||||
if sg['name'] == 'default':
|
||||
secgroup_id = sg['id']
|
||||
|
||||
# These rules are intended to permit inbound ssh and icmp
|
||||
# traffic from all sources, so no group_id is provided.
|
||||
# Setting a group_id would only permit traffic from ports
|
||||
# belonging to the same security group.
|
||||
rulesets = [
|
||||
{
|
||||
# ssh
|
||||
'ip_protocol': 'tcp',
|
||||
'from_port': 22,
|
||||
'to_port': 22,
|
||||
'cidr': '0.0.0.0/0',
|
||||
},
|
||||
{
|
||||
# ping
|
||||
'ip_protocol': 'icmp',
|
||||
'from_port': -1,
|
||||
'to_port': -1,
|
||||
'cidr': '0.0.0.0/0',
|
||||
}
|
||||
]
|
||||
rules = list()
|
||||
for ruleset in rulesets:
|
||||
sg_rule = _client_rules.create_security_group_rule(
|
||||
parent_group_id=secgroup_id, **ruleset)['security_group_rule']
|
||||
rules.append(sg_rule)
|
||||
return rules
|
||||
|
||||
def _create_security_group(self):
|
||||
# Create security group
|
||||
sg_name = data_utils.rand_name(self.__class__.__name__)
|
||||
sg_desc = sg_name + " description"
|
||||
secgroup = self.compute_security_groups_client.create_security_group(
|
||||
name=sg_name, description=sg_desc)['security_group']
|
||||
self.assertEqual(secgroup['name'], sg_name)
|
||||
self.assertEqual(secgroup['description'], sg_desc)
|
||||
self.addCleanup(
|
||||
test_utils.call_and_ignore_notfound_exc,
|
||||
self.compute_security_groups_client.delete_security_group,
|
||||
secgroup['id'])
|
||||
|
||||
# Add rules to the security group
|
||||
self._create_loginable_secgroup_rule(secgroup['id'])
|
||||
|
||||
return secgroup
|
||||
|
||||
def get_remote_client(self, ip_address, username=None, private_key=None):
|
||||
"""Get a SSH client to a remote server
|
||||
|
||||
@ -362,65 +222,6 @@ class ScenarioTest(tempest.test.BaseTestCase):
|
||||
|
||||
return linux_client
|
||||
|
||||
def _image_create(self, name, fmt, path,
|
||||
disk_format=None, properties=None):
|
||||
if properties is None:
|
||||
properties = {}
|
||||
name = data_utils.rand_name('%s-' % name)
|
||||
params = {
|
||||
'name': name,
|
||||
'container_format': fmt,
|
||||
'disk_format': disk_format or fmt,
|
||||
}
|
||||
if CONF.image_feature_enabled.api_v1:
|
||||
params['is_public'] = 'False'
|
||||
params['properties'] = properties
|
||||
params = {'headers': common_image.image_meta_to_headers(**params)}
|
||||
else:
|
||||
params['visibility'] = 'private'
|
||||
# Additional properties are flattened out in the v2 API.
|
||||
params.update(properties)
|
||||
body = self.image_client.create_image(**params)
|
||||
image = body['image'] if 'image' in body else body
|
||||
self.addCleanup(self.image_client.delete_image, image['id'])
|
||||
self.assertEqual("queued", image['status'])
|
||||
with open(path, 'rb') as image_file:
|
||||
if CONF.image_feature_enabled.api_v1:
|
||||
self.image_client.update_image(image['id'], data=image_file)
|
||||
else:
|
||||
self.image_client.store_image_file(image['id'], image_file)
|
||||
return image['id']
|
||||
|
||||
def glance_image_create(self):
|
||||
img_path = CONF.scenario.img_dir + "/" + CONF.scenario.img_file
|
||||
aki_img_path = CONF.scenario.img_dir + "/" + CONF.scenario.aki_img_file
|
||||
ari_img_path = CONF.scenario.img_dir + "/" + CONF.scenario.ari_img_file
|
||||
ami_img_path = CONF.scenario.img_dir + "/" + CONF.scenario.ami_img_file
|
||||
img_container_format = CONF.scenario.img_container_format
|
||||
img_disk_format = CONF.scenario.img_disk_format
|
||||
img_properties = CONF.scenario.img_properties
|
||||
LOG.debug("paths: img: %s, container_format: %s, disk_format: %s, "
|
||||
"properties: %s, ami: %s, ari: %s, aki: %s",
|
||||
img_path, img_container_format, img_disk_format,
|
||||
img_properties, ami_img_path, ari_img_path, aki_img_path)
|
||||
try:
|
||||
image = self._image_create('scenario-img',
|
||||
img_container_format,
|
||||
img_path,
|
||||
disk_format=img_disk_format,
|
||||
properties=img_properties)
|
||||
except IOError:
|
||||
LOG.debug("A qcow2 image was not found. Try to get a uec image.")
|
||||
kernel = self._image_create('scenario-aki', 'aki', aki_img_path)
|
||||
ramdisk = self._image_create('scenario-ari', 'ari', ari_img_path)
|
||||
properties = {'kernel_id': kernel, 'ramdisk_id': ramdisk}
|
||||
image = self._image_create('scenario-ami', 'ami',
|
||||
path=ami_img_path,
|
||||
properties=properties)
|
||||
LOG.debug("image:%s", image)
|
||||
|
||||
return image
|
||||
|
||||
def _log_console_output(self, servers=None):
|
||||
if not CONF.compute_feature_enabled.console_output:
|
||||
LOG.debug('Console output not supported, cannot log')
|
||||
@ -443,90 +244,6 @@ class ScenarioTest(tempest.test.BaseTestCase):
|
||||
if not isinstance(exc, lib_exc.SSHTimeout):
|
||||
LOG.debug('Network information on a devstack host')
|
||||
|
||||
def create_server_snapshot(self, server, name=None):
|
||||
# Glance client
|
||||
_image_client = self.image_client
|
||||
# Compute client
|
||||
_images_client = self.compute_images_client
|
||||
if name is None:
|
||||
name = data_utils.rand_name(self.__class__.__name__ + 'snapshot')
|
||||
LOG.debug("Creating a snapshot image for server: %s", server['name'])
|
||||
image = _images_client.create_image(server['id'], name=name)
|
||||
image_id = image.response['location'].split('images/')[1]
|
||||
waiters.wait_for_image_status(_image_client, image_id, 'active')
|
||||
|
||||
self.addCleanup(_image_client.wait_for_resource_deletion,
|
||||
image_id)
|
||||
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||
_image_client.delete_image, image_id)
|
||||
|
||||
if CONF.image_feature_enabled.api_v1:
|
||||
# In glance v1 the additional properties are stored in the headers.
|
||||
resp = _image_client.check_image(image_id)
|
||||
snapshot_image = common_image.get_image_meta_from_headers(resp)
|
||||
image_props = snapshot_image.get('properties', {})
|
||||
else:
|
||||
# In glance v2 the additional properties are flattened.
|
||||
snapshot_image = _image_client.show_image(image_id)
|
||||
image_props = snapshot_image
|
||||
|
||||
bdm = image_props.get('block_device_mapping')
|
||||
if bdm:
|
||||
bdm = json.loads(bdm)
|
||||
if bdm and 'snapshot_id' in bdm[0]:
|
||||
snapshot_id = bdm[0]['snapshot_id']
|
||||
self.addCleanup(
|
||||
self.snapshots_client.wait_for_resource_deletion,
|
||||
snapshot_id)
|
||||
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||
self.snapshots_client.delete_snapshot,
|
||||
snapshot_id)
|
||||
waiters.wait_for_volume_resource_status(self.snapshots_client,
|
||||
snapshot_id,
|
||||
'available')
|
||||
image_name = snapshot_image['name']
|
||||
self.assertEqual(name, image_name)
|
||||
LOG.debug("Created snapshot image %s for server %s",
|
||||
image_name, server['name'])
|
||||
return snapshot_image
|
||||
|
||||
def nova_volume_attach(self, server, volume_to_attach):
|
||||
volume = self.servers_client.attach_volume(
|
||||
server['id'], volumeId=volume_to_attach['id'], device='/dev/%s'
|
||||
% CONF.compute.volume_device_name)['volumeAttachment']
|
||||
self.assertEqual(volume_to_attach['id'], volume['id'])
|
||||
waiters.wait_for_volume_resource_status(self.volumes_client,
|
||||
volume['id'], 'in-use')
|
||||
|
||||
# Return the updated volume after the attachment
|
||||
return self.volumes_client.show_volume(volume['id'])['volume']
|
||||
|
||||
def nova_volume_detach(self, server, volume):
|
||||
self.servers_client.detach_volume(server['id'], volume['id'])
|
||||
waiters.wait_for_volume_resource_status(self.volumes_client,
|
||||
volume['id'], 'available')
|
||||
|
||||
volume = self.volumes_client.show_volume(volume['id'])['volume']
|
||||
self.assertEqual('available', volume['status'])
|
||||
|
||||
def rebuild_server(self, server_id, image=None,
|
||||
preserve_ephemeral=False, wait=True,
|
||||
rebuild_kwargs=None):
|
||||
if image is None:
|
||||
image = CONF.compute.image_ref
|
||||
|
||||
rebuild_kwargs = rebuild_kwargs or {}
|
||||
|
||||
LOG.debug("Rebuilding server (id: %s, image: %s, preserve eph: %s)",
|
||||
server_id, image, preserve_ephemeral)
|
||||
self.servers_client.rebuild_server(
|
||||
server_id=server_id, image_ref=image,
|
||||
preserve_ephemeral=preserve_ephemeral,
|
||||
**rebuild_kwargs)
|
||||
if wait:
|
||||
waiters.wait_for_server_status(self.servers_client,
|
||||
server_id, 'ACTIVE')
|
||||
|
||||
def ping_ip_address(self, ip_address, should_succeed=True,
|
||||
ping_timeout=None, mtu=None):
|
||||
timeout = ping_timeout or CONF.validation.ping_timeout
|
||||
@ -615,78 +332,6 @@ class ScenarioTest(tempest.test.BaseTestCase):
|
||||
self._log_console_output(servers)
|
||||
raise
|
||||
|
||||
def create_floating_ip(self, thing, pool_name=None):
|
||||
"""Create a floating IP and associates to a server on Nova"""
|
||||
|
||||
if not pool_name:
|
||||
pool_name = CONF.network.floating_network_name
|
||||
floating_ip = (self.compute_floating_ips_client.
|
||||
create_floating_ip(pool=pool_name)['floating_ip'])
|
||||
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||
self.compute_floating_ips_client.delete_floating_ip,
|
||||
floating_ip['id'])
|
||||
self.compute_floating_ips_client.associate_floating_ip_to_server(
|
||||
floating_ip['ip'], thing['id'])
|
||||
return floating_ip
|
||||
|
||||
def create_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
|
||||
private_key=None):
|
||||
ssh_client = self.get_remote_client(ip_address,
|
||||
private_key=private_key)
|
||||
if dev_name is not None:
|
||||
ssh_client.make_fs(dev_name)
|
||||
ssh_client.mount(dev_name, mount_path)
|
||||
cmd_timestamp = 'sudo sh -c "date > %s/timestamp; sync"' % mount_path
|
||||
ssh_client.exec_command(cmd_timestamp)
|
||||
timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
|
||||
% mount_path)
|
||||
if dev_name is not None:
|
||||
ssh_client.umount(mount_path)
|
||||
return timestamp
|
||||
|
||||
def get_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
|
||||
private_key=None):
|
||||
ssh_client = self.get_remote_client(ip_address,
|
||||
private_key=private_key)
|
||||
if dev_name is not None:
|
||||
ssh_client.mount(dev_name, mount_path)
|
||||
timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
|
||||
% mount_path)
|
||||
if dev_name is not None:
|
||||
ssh_client.umount(mount_path)
|
||||
return timestamp
|
||||
|
||||
def get_server_ip(self, server):
|
||||
"""Get the server fixed or floating IP.
|
||||
|
||||
Based on the configuration we're in, return a correct ip
|
||||
address for validating that a guest is up.
|
||||
"""
|
||||
if CONF.validation.connect_method == 'floating':
|
||||
# The tests calling this method don't have a floating IP
|
||||
# and can't make use of the validation resources. So the
|
||||
# method is creating the floating IP there.
|
||||
return self.create_floating_ip(server)['ip']
|
||||
elif CONF.validation.connect_method == 'fixed':
|
||||
# Determine the network name to look for based on config or creds
|
||||
# provider network resources.
|
||||
if CONF.validation.network_for_ssh:
|
||||
addresses = server['addresses'][
|
||||
CONF.validation.network_for_ssh]
|
||||
else:
|
||||
creds_provider = self._get_credentials_provider()
|
||||
net_creds = creds_provider.get_primary_creds()
|
||||
network = getattr(net_creds, 'network', None)
|
||||
addresses = (server['addresses'][network['name']]
|
||||
if network else [])
|
||||
for address in addresses:
|
||||
if (address['version'] == CONF.validation.ip_version_for_ssh
|
||||
and address['OS-EXT-IPS:type'] == 'fixed'):
|
||||
return address['addr']
|
||||
raise exceptions.ServerUnreachable(server_id=server['id'])
|
||||
else:
|
||||
raise lib_exc.InvalidConfiguration()
|
||||
|
||||
|
||||
class NetworkScenarioTest(ScenarioTest):
|
||||
"""Base class for network scenario tests.
|
||||
@ -1220,131 +865,3 @@ class NetworkScenarioTest(ScenarioTest):
|
||||
routers_client.remove_router_interface, router_id,
|
||||
subnet_id=subnet['id'])
|
||||
return network, subnet, router
|
||||
|
||||
|
||||
class EncryptionScenarioTest(ScenarioTest):
|
||||
"""Base class for encryption scenario tests"""
|
||||
|
||||
credentials = ['primary', 'admin']
|
||||
|
||||
@classmethod
|
||||
def setup_clients(cls):
|
||||
super(EncryptionScenarioTest, cls).setup_clients()
|
||||
if CONF.volume_feature_enabled.api_v2:
|
||||
cls.admin_volume_types_client = cls.os_adm.volume_types_v2_client
|
||||
cls.admin_encryption_types_client =\
|
||||
cls.os_adm.encryption_types_v2_client
|
||||
else:
|
||||
cls.admin_volume_types_client = cls.os_adm.volume_types_client
|
||||
cls.admin_encryption_types_client =\
|
||||
cls.os_adm.encryption_types_client
|
||||
|
||||
def create_encryption_type(self, client=None, type_id=None, provider=None,
|
||||
key_size=None, cipher=None,
|
||||
control_location=None):
|
||||
if not client:
|
||||
client = self.admin_encryption_types_client
|
||||
if not type_id:
|
||||
volume_type = self.create_volume_type()
|
||||
type_id = volume_type['id']
|
||||
LOG.debug("Creating an encryption type for volume type: %s", type_id)
|
||||
client.create_encryption_type(
|
||||
type_id, provider=provider, key_size=key_size, cipher=cipher,
|
||||
control_location=control_location)['encryption']
|
||||
|
||||
|
||||
class ObjectStorageScenarioTest(ScenarioTest):
|
||||
"""Provide harness to do Object Storage scenario tests.
|
||||
|
||||
Subclasses implement the tests that use the methods provided by this
|
||||
class.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def skip_checks(cls):
|
||||
super(ObjectStorageScenarioTest, cls).skip_checks()
|
||||
if not CONF.service_available.swift:
|
||||
skip_msg = ("%s skipped as swift is not available" %
|
||||
cls.__name__)
|
||||
raise cls.skipException(skip_msg)
|
||||
|
||||
@classmethod
|
||||
def setup_credentials(cls):
|
||||
cls.set_network_resources()
|
||||
super(ObjectStorageScenarioTest, cls).setup_credentials()
|
||||
operator_role = CONF.object_storage.operator_role
|
||||
cls.os_operator = cls.get_client_manager(roles=[operator_role])
|
||||
|
||||
@classmethod
|
||||
def setup_clients(cls):
|
||||
super(ObjectStorageScenarioTest, cls).setup_clients()
|
||||
# Clients for Swift
|
||||
cls.account_client = cls.os_operator.account_client
|
||||
cls.container_client = cls.os_operator.container_client
|
||||
cls.object_client = cls.os_operator.object_client
|
||||
|
||||
def get_swift_stat(self):
|
||||
"""get swift status for our user account."""
|
||||
self.account_client.list_account_containers()
|
||||
LOG.debug('Swift status information obtained successfully')
|
||||
|
||||
def create_container(self, container_name=None):
|
||||
name = container_name or data_utils.rand_name(
|
||||
'swift-scenario-container')
|
||||
self.container_client.create_container(name)
|
||||
# look for the container to assure it is created
|
||||
self.list_and_check_container_objects(name)
|
||||
LOG.debug('Container %s created', name)
|
||||
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||
self.container_client.delete_container,
|
||||
name)
|
||||
return name
|
||||
|
||||
def delete_container(self, container_name):
|
||||
self.container_client.delete_container(container_name)
|
||||
LOG.debug('Container %s deleted', container_name)
|
||||
|
||||
def upload_object_to_container(self, container_name, obj_name=None):
|
||||
obj_name = obj_name or data_utils.rand_name('swift-scenario-object')
|
||||
obj_data = data_utils.random_bytes()
|
||||
self.object_client.create_object(container_name, obj_name, obj_data)
|
||||
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||
self.object_client.delete_object,
|
||||
container_name,
|
||||
obj_name)
|
||||
return obj_name, obj_data
|
||||
|
||||
def delete_object(self, container_name, filename):
|
||||
self.object_client.delete_object(container_name, filename)
|
||||
self.list_and_check_container_objects(container_name,
|
||||
not_present_obj=[filename])
|
||||
|
||||
def list_and_check_container_objects(self, container_name,
|
||||
present_obj=None,
|
||||
not_present_obj=None):
|
||||
# List objects for a given container and assert which are present and
|
||||
# which are not.
|
||||
if present_obj is None:
|
||||
present_obj = []
|
||||
if not_present_obj is None:
|
||||
not_present_obj = []
|
||||
_, object_list = self.container_client.list_container_contents(
|
||||
container_name)
|
||||
if present_obj:
|
||||
for obj in present_obj:
|
||||
self.assertIn(obj, object_list)
|
||||
if not_present_obj:
|
||||
for obj in not_present_obj:
|
||||
self.assertNotIn(obj, object_list)
|
||||
|
||||
def change_container_acl(self, container_name, acl):
|
||||
metadata_param = {'metadata_prefix': 'x-container-',
|
||||
'metadata': {'read': acl}}
|
||||
self.container_client.update_container_metadata(container_name,
|
||||
**metadata_param)
|
||||
resp, _ = self.container_client.list_container_metadata(container_name)
|
||||
self.assertEqual(resp['x-container-read'], acl)
|
||||
|
||||
def download_and_verify(self, container_name, obj_name, expected_data):
|
||||
_, obj = self.object_client.get_object(container_name, obj_name)
|
||||
self.assertEqual(obj, expected_data)
|
||||
|
Loading…
Reference in New Issue
Block a user