Merge "Code cleanup of object storage tests"

This commit is contained in:
Jenkins 2013-05-16 23:11:48 +00:00 committed by Gerrit Code Review
commit f0b10c451d
6 changed files with 150 additions and 299 deletions

View File

@ -22,12 +22,9 @@ from tempest.tests.object_storage import base
class AccountTest(base.BaseObjectTest):
@classmethod
def setUpClass(cls):
super(AccountTest, cls).setUpClass()
#Create a container
cls.container_name = rand_name(name='TestContainer')
cls.container_client.create_container(cls.container_name)
@ -37,8 +34,7 @@ class AccountTest(base.BaseObjectTest):
@attr(type='smoke')
def test_list_containers(self):
# List of all containers should not be empty
# list of all containers should not be empty
params = {'format': 'json'}
resp, container_list = \
self.account_client.list_account_containers(params=params)
@ -49,8 +45,7 @@ class AccountTest(base.BaseObjectTest):
@attr(type='smoke')
def test_list_account_metadata(self):
# List all account metadata
# list all account metadata
resp, metadata = self.account_client.list_account_metadata()
self.assertEqual(resp['status'], '204')
self.assertIn('x-account-object-count', resp)
@ -59,8 +54,7 @@ class AccountTest(base.BaseObjectTest):
@attr(type='smoke')
def test_create_account_metadata(self):
# Add metadata to account
# add metadata to account
metadata = {'test-account-meta': 'Meta!'}
resp, _ = \
self.account_client.create_account_metadata(metadata=metadata)
@ -72,8 +66,7 @@ class AccountTest(base.BaseObjectTest):
@attr(type='smoke')
def test_delete_account_metadata(self):
# Delete metadata from account
# delete metadata from account
metadata = ['test-account-meta']
resp, _ = \
self.account_client.delete_account_metadata(metadata=metadata)
@ -84,11 +77,10 @@ class AccountTest(base.BaseObjectTest):
@attr(type='negative')
def test_list_containers_with_non_authorized_user(self):
#Listing containers with using non authorized user
# list containers using non-authorized user
# Randomly creating user
# create user
self.data.setup_test_user()
resp, body = \
self.token_client.auth(self.data.test_user,
self.data.test_password,
@ -97,14 +89,11 @@ class AccountTest(base.BaseObjectTest):
self.token_client.get_token(self.data.test_user,
self.data.test_password,
self.data.test_tenant)
custom_headers = {'X-Auth-Token': new_token}
params = {'format': 'json'}
# Trying to list containers with non authorized user token
# list containers with non-authorized user token
self.assertRaises(exceptions.Unauthorized,
self.custom_account_client.list_account_containers,
params=params, metadata=custom_headers)
#Attempt to the delete the user setup created
# delete the user which was created
self.data.teardown_all()

View File

@ -22,7 +22,6 @@ from tempest.tests.object_storage import base
class ContainerTest(base.BaseObjectTest):
@classmethod
def setUpClass(cls):
super(ContainerTest, cls).setUpClass()
@ -31,73 +30,58 @@ class ContainerTest(base.BaseObjectTest):
@classmethod
def tearDownClass(cls):
for container in cls.containers:
#Get list of all object in the container
objlist = \
cls.container_client.list_all_container_objects(container)
#Attempt to delete every object in the container
# delete every object in the container
for obj in objlist:
resp, _ = \
cls.object_client.delete_object(container, obj['name'])
#Attempt to delete the container
# delete the container
resp, _ = cls.container_client.delete_container(container)
@attr(type='smoke')
def test_create_container(self):
# Create a container, test responses
#Create a container
container_name = rand_name(name='TestContainer')
resp, body = self.container_client.create_container(container_name)
self.containers.append(container_name)
self.assertTrue(resp['status'] in ('202', '201'))
@attr(type='smoke')
def test_delete_container(self):
# Create and Delete a container, test responses
#Create a container
# create a container
container_name = rand_name(name='TestContainer')
resp, _ = self.container_client.create_container(container_name)
self.containers.append(container_name)
#Delete Container
# delete container
resp, _ = self.container_client.delete_container(container_name)
self.assertEqual(resp['status'], '204')
self.containers.remove(container_name)
@attr(type='smoke')
def test_list_container_contents_json(self):
# Add metadata to object
# add metadata to an object
#Create a container
# create a container
container_name = rand_name(name='TestContainer')
resp, _ = self.container_client.create_container(container_name)
self.containers.append(container_name)
#Create Object
# create object
object_name = rand_name(name='TestObject')
data = arbitrary_string()
resp, _ = self.object_client.create_object(container_name,
object_name, data)
#Set Object Metadata
# set object metadata
meta_key = rand_name(name='Meta-Test-')
meta_value = rand_name(name='MetaValue-')
orig_metadata = {meta_key: meta_value}
resp, _ = self.object_client.update_object_metadata(container_name,
object_name,
orig_metadata)
#Get Container contents list json format
# get container contents list
params = {'format': 'json'}
resp, object_list = \
self.container_client.\
list_container_contents(container_name, params=params)
self.assertEqual(resp['status'], '200')
self.assertIsNotNone(object_list)
@ -106,14 +90,13 @@ class ContainerTest(base.BaseObjectTest):
@attr(type='smoke')
def test_container_metadata(self):
# Update/Retrieve/Delete Container Metadata
# update/retrieve/delete container metadata
# Create a container
# create a container
container_name = rand_name(name='TestContainer')
resp, _ = self.container_client.create_container(container_name)
self.containers.append(container_name)
# Update container metadata
# update container metadata
metadata = {'name': 'Pictures',
'description': 'Travel'
}
@ -122,7 +105,7 @@ class ContainerTest(base.BaseObjectTest):
metadata=metadata)
self.assertEqual(resp['status'], '204')
# List container metadata
# list container metadata
resp, _ = self.container_client.list_container_metadata(
container_name)
self.assertEqual(resp['status'], '204')
@ -131,18 +114,19 @@ class ContainerTest(base.BaseObjectTest):
self.assertEqual(resp['x-container-meta-name'], 'Pictures')
self.assertEqual(resp['x-container-meta-description'], 'Travel')
# Delete container metadata
# delete container metadata
resp, _ = self.container_client.delete_container_metadata(
container_name,
metadata=metadata.keys())
self.assertEqual(resp['status'], '204')
# check if the metadata are no longer there
resp, _ = self.container_client.list_container_metadata(container_name)
self.assertEqual(resp['status'], '204')
self.assertNotIn('x-container-meta-name', resp)
self.assertNotIn('x-container-meta-description', resp)
# Delete Container
# delete container
resp, _ = self.container_client.delete_container(container_name)
self.assertEqual(resp['status'], '204')
self.containers.remove(container_name)

View File

@ -23,11 +23,9 @@ import time
class ContainerSyncTest(base.BaseObjectTest):
@classmethod
def setUpClass(cls):
super(ContainerSyncTest, cls).setUpClass()
cls.containers = []
cls.objects = []
container_sync_timeout = \
@ -36,8 +34,7 @@ class ContainerSyncTest(base.BaseObjectTest):
int(cls.config.object_storage.container_sync_interval)
cls.attempts = \
int(container_sync_timeout / cls.container_sync_interval)
# Define container and object clients
# define container and object clients
cls.clients = {}
cls.clients[rand_name(name='TestContainerSync')] = \
(cls.container_client, cls.object_client)
@ -50,29 +47,25 @@ class ContainerSyncTest(base.BaseObjectTest):
@classmethod
def tearDownClass(cls):
for cont_name, client in cls.clients.items():
#Get list of all object in the container
objlist = client[0].list_all_container_objects(cont_name)
#Attempt to delete every object in the container
# delete every object in the container
if objlist:
for obj in objlist:
resp, _ = client[1].delete_object(cont_name, obj['name'])
#Attempt to delete the container
# delete the container
resp, _ = client[0].delete_container(cont_name)
@testtools.skip('Until Bug #1093743 is resolved.')
@attr(type='positive')
def test_container_synchronization(self):
#Container to container synchronization
#To allow/accept sync requests to/from other accounts
# container to container synchronization
# to allow/accept sync requests to/from other accounts
#Switch container synchronization on and create objects in a containers
# turn container synchronization on and create object in container
for cont in (self.containers, self.containers[::-1]):
cont_client = [self.clients[c][0] for c in cont]
obj_client = [self.clients[c][1] for c in cont]
#tell first container to syncronize to a second
# tell first container to synchronize to a second
headers = {'X-Container-Sync-Key': 'sync_key',
'X-Container-Sync-To': "%s/%s" %
(cont_client[1].base_url, str(cont[1]))}
@ -81,8 +74,7 @@ class ContainerSyncTest(base.BaseObjectTest):
self.assertTrue(resp['status'] in ('202', '201'),
'Error installing X-Container-Sync-To '
'for the container "%s"' % (cont[0]))
#Create Object in container
# create object in container
object_name = rand_name(name='TestSyncObject')
data = object_name[::-1] # arbitrary_string()
resp, _ = obj_client[0].create_object(cont[0], object_name, data)
@ -92,7 +84,7 @@ class ContainerSyncTest(base.BaseObjectTest):
% (object_name, cont[0]))
self.objects.append(object_name)
#Wait for Container contents list json format will be not empty
# wait until container contents list is not empty
cont_client = [self.clients[c][0] for c in self.containers]
params = {'format': 'json'}
while self.attempts > 0:
@ -112,15 +104,13 @@ class ContainerSyncTest(base.BaseObjectTest):
'Error listing the destination container`s'
' "%s" contents' % (self.containers[1]))
object_list_1 = dict((obj['name'], obj) for obj in object_list_1)
# check that containers is not empty and has equal keys()
# or wait for next attepmt
# check that containers are not empty and have equal keys()
# or wait for next attempt
if not object_list_0 or not object_list_1 or \
set(object_list_0.keys()) != set(object_list_1.keys()):
time.sleep(self.container_sync_interval)
self.attempts -= 1
else:
break
# Check for synchronization
self.assertEqual(object_list_0, object_list_1,
'Different object lists in containers.')

View File

@ -25,12 +25,9 @@ import time
class ObjectExpiryTest(base.BaseObjectTest):
@classmethod
def setUpClass(cls):
super(ObjectExpiryTest, cls).setUpClass()
#Create a container
cls.container_name = rand_name(name='TestContainer')
cls.container_client.create_container(cls.container_name)
@ -41,54 +38,45 @@ class ObjectExpiryTest(base.BaseObjectTest):
But delete action for the expired object is raising
NotFound exception and also non empty container cannot be deleted.
"""
#Get list of all object in the container
objlist = \
cls.container_client.list_all_container_objects(cls.container_name)
#Attempt to delete every object in the container
# delete every object in the container
if objlist:
for obj in objlist:
resp, _ = cls.object_client.delete_object(cls.container_name,
obj['name'])
#Attempt to delete the container
# delete the container
resp, _ = cls.container_client.delete_container(cls.container_name)
@testtools.skip('Until Bug #1069849 is resolved.')
@attr(type='regression')
def test_get_object_after_expiry_time(self):
# GET object after expiry time
#TODO(harika-vakadi): Similar test case has to be created for
#TODO(harika-vakadi): similar test case has to be created for
# "X-Delete-At", after this test case works.
#Create Object
# create object
object_name = rand_name(name='TestObject')
data = arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
#Update object metadata with expiry time of 3 seconds
# update object metadata with expiry time of 3 seconds
metadata = {'X-Delete-After': '3'}
resp, _ = \
self.object_client.update_object_metadata(self.container_name,
object_name, metadata,
metadata_prefix='')
resp, _ = \
self.object_client.list_object_metadata(self.container_name,
object_name)
self.assertEqual(resp['status'], '200')
self.assertIn('x-delete-at', resp)
resp, body = self.object_client.get_object(self.container_name,
object_name)
self.assertEqual(resp['status'], '200')
# Check data
# check data
self.assertEqual(body, data)
# Sleep for over 5 seconds, so that object is expired
# sleep for over 5 seconds, so that object expires
time.sleep(5)
# Verification of raised exception after object gets expired
# object should not be there anymore
self.assertRaises(exceptions.NotFound, self.object_client.get_object,
self.container_name, object_name)

View File

@ -25,55 +25,42 @@ import time
class ObjectTest(base.BaseObjectTest):
@classmethod
def setUpClass(cls):
super(ObjectTest, cls).setUpClass()
#Create a container
cls.container_name = rand_name(name='TestContainer')
cls.container_client.create_container(cls.container_name)
# Randomly creating user
cls.data.setup_test_user()
resp, body = cls.token_client.auth(cls.data.test_user,
cls.data.test_password,
cls.data.test_tenant)
cls.new_token = cls.token_client.get_token(cls.data.test_user,
cls.data.test_password,
cls.data.test_tenant)
cls.custom_headers = {'X-Auth-Token': cls.new_token}
@classmethod
def tearDownClass(cls):
#Get list of all object in the container
objlist = cls.container_client.list_all_container_objects(
cls.container_name)
#Attempt to delete every object in the container
# delete every object in the container
for obj in objlist:
resp, _ = cls.object_client.delete_object(cls.container_name,
obj['name'])
#Attempt to delete the container
# delete the container
resp, _ = cls.container_client.delete_container(cls.container_name)
#Attempt to the delete the user setup created
# delete the user setup created
cls.data.teardown_all()
@attr(type='smoke')
def test_create_object(self):
# Create storage object, test response
#Create Object
# create object
object_name = rand_name(name='TestObject')
data = arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
#Create another Object
# create another object
object_name = rand_name(name='TestObject')
data = arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name,
@ -82,42 +69,36 @@ class ObjectTest(base.BaseObjectTest):
@attr(type='smoke')
def test_delete_object(self):
# Create and delete a storage object, test responses
#Create Object
# create object
object_name = rand_name(name='TestObject')
data = arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
# delete object
resp, _ = self.object_client.delete_object(self.container_name,
object_name)
self.assertEqual(resp['status'], '204')
@attr(type='smoke')
def test_object_metadata(self):
# Add metadata to storage object, test if metadata is retrievable
# add metadata to storage object, test if metadata is retrievable
#Create Object
# create Object
object_name = rand_name(name='TestObject')
data = arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
#Set Object Metadata
# set object metadata
meta_key = rand_name(name='test-')
meta_value = rand_name(name='MetaValue-')
orig_metadata = {meta_key: meta_value}
resp, _ = self.object_client.update_object_metadata(
self.container_name, object_name,
orig_metadata)
self.container_name, object_name, orig_metadata)
self.assertEqual(resp['status'], '202')
#Get Object Metadata
# get object metadata
resp, resp_metadata = self.object_client.list_object_metadata(
self.container_name, object_name)
self.assertEqual(resp['status'], '200')
actual_meta_key = 'x-object-meta-' + meta_key
self.assertTrue(actual_meta_key in resp)
@ -125,138 +106,121 @@ class ObjectTest(base.BaseObjectTest):
@attr(type='smoke')
def test_get_object(self):
# Retrieve object's data(in response body)
# retrieve object's data (in response body)
#Create Object
# create object
object_name = rand_name(name='TestObject')
data = arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
# get object
resp, body = self.object_client.get_object(self.container_name,
object_name)
self.assertEqual(resp['status'], '200')
# Check data
self.assertEqual(body, data)
@attr(type='smoke')
def test_copy_object_in_same_container(self):
# Copy storage object
# Create source Object
# create source object
src_object_name = rand_name(name='SrcObject')
src_data = arbitrary_string(size=len(src_object_name) * 2,
base_text=src_object_name)
resp, _ = self.object_client.create_object(self.container_name,
src_object_name, src_data)
# Create destination Object
src_object_name,
src_data)
# create destination object
dst_object_name = rand_name(name='DstObject')
dst_data = arbitrary_string(size=len(dst_object_name) * 3,
base_text=dst_object_name)
resp, _ = self.object_client.create_object(self.container_name,
dst_object_name, dst_data)
# Copy source object to destination
dst_object_name,
dst_data)
# copy source object to destination
resp, _ = self.object_client.copy_object_in_same_container(
self.container_name, src_object_name, dst_object_name)
self.assertEqual(resp['status'], '201')
# Check data
# check data
resp, body = self.object_client.get_object(self.container_name,
dst_object_name)
self.assertEqual(body, src_data)
@attr(type='smoke')
def test_copy_object_to_itself(self):
# Change the content type of an existing object
# change the content type of an existing object
# Create Object
# create object
object_name = rand_name(name='TestObject')
data = arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
# Get the old content type
self.object_client.create_object(self.container_name,
object_name, data)
# get the old content type
resp_tmp, _ = self.object_client.list_object_metadata(
self.container_name,
object_name)
# Change the content type of the object
self.container_name, object_name)
# change the content type of the object
metadata = {'content-type': 'text/plain; charset=UTF-8'}
self.assertNotEqual(resp_tmp['content-type'], metadata['content-type'])
resp, _ = self.object_client.copy_object_in_same_container(
self.container_name, object_name, object_name, metadata)
self.assertEqual(resp['status'], '201')
# Check the content type
# check the content type
resp, _ = self.object_client.list_object_metadata(self.container_name,
object_name)
self.assertEqual(resp['content-type'], metadata['content-type'])
@attr(type='smoke')
def test_copy_object_2d_way(self):
# Copy storage object
# Create source Object
# create source object
src_object_name = rand_name(name='SrcObject')
src_data = arbitrary_string(size=len(src_object_name) * 2,
base_text=src_object_name)
resp, _ = self.object_client.create_object(self.container_name,
src_object_name, src_data)
# Create destination Object
# create destination object
dst_object_name = rand_name(name='DstObject')
dst_data = arbitrary_string(size=len(dst_object_name) * 3,
base_text=dst_object_name)
resp, _ = self.object_client.create_object(self.container_name,
dst_object_name, dst_data)
# Copy source object to destination
# copy source object to destination
resp, _ = self.object_client.copy_object_2d_way(self.container_name,
src_object_name,
dst_object_name)
self.assertEqual(resp['status'], '201')
# Check data
# check data
resp, body = self.object_client.get_object(self.container_name,
dst_object_name)
self.assertEqual(body, src_data)
@attr(type='smoke')
def test_copy_object_across_containers(self):
# Copy storage object across containers
#Create a container so as to use as source container
# create a container to use as asource container
src_container_name = rand_name(name='TestSourceContainer')
self.container_client.create_container(src_container_name)
#Create a container so as to use as destination container
# create a container to use as a destination container
dst_container_name = rand_name(name='TestDestinationContainer')
self.container_client.create_container(dst_container_name)
# Create Object in source container
# create object in source container
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name) * 2,
base_text=object_name)
resp, _ = self.object_client.create_object(src_container_name,
object_name, data)
#Set Object Metadata
# set object metadata
meta_key = rand_name(name='test-')
meta_value = rand_name(name='MetaValue-')
orig_metadata = {meta_key: meta_value}
resp, _ = self.object_client.update_object_metadata(src_container_name,
object_name,
orig_metadata)
self.assertEqual(resp['status'], '202')
try:
# Copy object from source container to destination container
# copy object from source container to destination container
resp, _ = self.object_client.copy_object_across_containers(
src_container_name, object_name, dst_container_name,
object_name)
self.assertEqual(resp['status'], '201')
# Check if object is present in destination container
# check if object is present in destination container
resp, body = self.object_client.get_object(dst_container_name,
object_name)
self.assertEqual(body, data)
@ -268,12 +232,12 @@ class ObjectTest(base.BaseObjectTest):
self.fail("Got exception :%s ; while copying"
" object across containers" % e)
finally:
#Delete objects from respective containers
# delete objects from respective containers
resp, _ = self.object_client.delete_object(dst_container_name,
object_name)
resp, _ = self.object_client.delete_object(src_container_name,
object_name)
#Delete containers created in this method
# delete containers created in this method
resp, _ = self.container_client.delete_container(
src_container_name)
resp, _ = self.container_client.delete_container(
@ -281,19 +245,16 @@ class ObjectTest(base.BaseObjectTest):
@attr(type='smoke')
def test_access_public_container_object_without_using_creds(self):
# Make container public-readable, and access the object
# anonymously, e.g. without using credentials
# make container public-readable and access an object in it object
# anonymously, without using credentials
try:
resp_meta = None
# Update Container Metadata to make public readable
# update container metadata to make it publicly readable
cont_headers = {'X-Container-Read': '.r:*,.rlistings'}
resp_meta, body = self.container_client.update_container_metadata(
self.container_name, metadata=cont_headers,
metadata_prefix='')
self.container_name, metadata=cont_headers, metadata_prefix='')
self.assertEqual(resp_meta['status'], '204')
# Create Object
# create object
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name),
base_text=object_name)
@ -301,25 +262,23 @@ class ObjectTest(base.BaseObjectTest):
object_name, data)
self.assertEqual(resp['status'], '201')
# List container metadata
# list container metadata
resp_meta, _ = self.container_client.list_container_metadata(
self.container_name)
self.assertEqual(resp_meta['status'], '204')
self.assertIn('x-container-read', resp_meta)
self.assertEqual(resp_meta['x-container-read'], '.r:*,.rlistings')
# Trying to Get Object with empty Headers as it is public readable
# trying to get object with empty headers as it is public readable
resp, body = self.custom_object_client.get_object(
self.container_name, object_name,
metadata={})
self.container_name, object_name, metadata={})
self.assertEqual(body, data)
finally:
if resp_meta['status'] == '204':
# Delete updated container metadata, to revert back.
# delete updated container metadata, to revert back.
resp, body = self.container_client.delete_container_metadata(
self.container_name, metadata=cont_headers,
metadata_prefix='')
resp, _ = self.container_client.list_container_metadata(
self.container_name)
self.assertEqual(resp['status'], '204')
@ -328,9 +287,8 @@ class ObjectTest(base.BaseObjectTest):
@attr(type='smoke')
def test_access_public_object_with_another_user_creds(self):
#Make container public-readable, and access the object
#anonymously, e.g. using another user credentials
# make container public-readable and access an object in it using
# another user's credentials
try:
resp_meta = None
cont_headers = {'X-Container-Read': '.r:*,.rlistings'}
@ -338,7 +296,7 @@ class ObjectTest(base.BaseObjectTest):
self.container_name, metadata=cont_headers,
metadata_prefix='')
self.assertEqual(resp_meta['status'], '204')
# Create Object
# create object
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name) * 1,
base_text=object_name)
@ -346,18 +304,17 @@ class ObjectTest(base.BaseObjectTest):
object_name, data)
self.assertEqual(resp['status'], '201')
# List container metadata
# list container metadata
resp, _ = self.container_client.list_container_metadata(
self.container_name)
self.assertEqual(resp['status'], '204')
self.assertIn('x-container-read', resp)
self.assertEqual(resp['x-container-read'], '.r:*,.rlistings')
# Trying to GET Auth Token of Alternate user
# get auth token of alternative user
token = self.identity_client_alt.get_auth()
headers = {'X-Auth-Token': token}
# Trying to create object with Alternate user creds
# access object using alternate user creds
resp, body = self.custom_object_client.get_object(
self.container_name, object_name,
metadata=headers)
@ -369,11 +326,10 @@ class ObjectTest(base.BaseObjectTest):
finally:
if resp_meta['status'] == '204':
# Delete updated container metadata, to revert back.
# delete updated container metadata, to revert back.
resp, body = self.container_client.delete_container_metadata(
self.container_name, metadata=cont_headers,
metadata_prefix='')
resp, _ = self.container_client.list_container_metadata(
self.container_name)
self.assertEqual(resp['status'], '204')
@ -383,20 +339,18 @@ class ObjectTest(base.BaseObjectTest):
@testtools.skip('Until Bug #1020722 is resolved.')
@attr(type='smoke')
def test_write_public_object_without_using_creds(self):
#Make container public-writable, and create object
#anonymously, e.g. without using credentials
# make container public-writable, and create object anonymously, e.g.
# without using credentials
try:
resp_meta = None
# Update Container Metadata to make public readable
# update container metadata to make publily writable
cont_headers = {'X-Container-Write': '-*'}
resp_meta, body = self.container_client.update_container_metadata(
self.container_name, metadata=cont_headers,
metadata_prefix='')
self.container_name, metadata=cont_headers, metadata_prefix='')
self.assertEqual(resp_meta['status'], '204')
# List container metadata
# list container metadata
resp, _ = self.container_client.list_container_metadata(
self.container_name)
self.assertEqual(resp['status'], '204')
self.assertIn('x-container-write', resp)
self.assertEqual(resp['x-container-write'], '-*')
@ -404,14 +358,11 @@ class ObjectTest(base.BaseObjectTest):
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name),
base_text=object_name)
headers = {'Content-Type': 'application/json',
'Accept': 'application/json'}
#Trying to Create object without using creds
# create object as anonymous user
resp, body = self.custom_object_client.create_object(
self.container_name, object_name,
data, metadata=headers)
self.container_name, object_name, data, metadata=headers)
self.assertEqual(resp['status'], '201')
except Exception as e:
@ -420,11 +371,10 @@ class ObjectTest(base.BaseObjectTest):
finally:
if resp_meta['status'] == '204':
# Delete updated container metadata, to revert back.
# delete updated container metadata, to revert back.
resp, body = self.container_client.delete_container_metadata(
self.container_name, metadata=cont_headers,
metadata_prefix='')
resp, _ = self.container_client.list_container_metadata(
self.container_name)
self.assertEqual(resp['status'], '204')
@ -434,52 +384,45 @@ class ObjectTest(base.BaseObjectTest):
@testtools.skip('Until Bug #1020722 is resolved.')
@attr(type='smoke')
def test_write_public_with_another_user_creds(self):
#Make container public-writable, and create object
#anonymously, e.g. with another user credentials
# make container public-writable, and create object with another user's
# credentials
try:
resp_meta = None
# Update Container Metadata to make public readable
# update container metadata to make it publicly writable
cont_headers = {'X-Container-Write': '-*'}
resp_meta, body = self.container_client.update_container_metadata(
self.container_name, metadata=cont_headers,
metadata_prefix='')
self.assertEqual(resp_meta['status'], '204')
# List container metadata
# list container metadata
resp, _ = self.container_client.list_container_metadata(
self.container_name)
self.assertEqual(resp['status'], '204')
self.assertIn('x-container-write', resp)
self.assertEqual(resp['x-container-write'], '-*')
#Trying to GET auth token of Alternate user
# trying to get auth token of alternative user
token = self.identity_client_alt.get_auth()
headers = {'Content-Type': 'application/json',
'Accept': 'application/json',
'X-Auth-Token': token}
#Trying to Create an object with another user creds
# trying to create an object with another user's creds
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name),
base_text=object_name)
resp, body = self.custom_object_client.create_object(
self.container_name, object_name,
data, metadata=headers)
self.container_name, object_name, data, metadata=headers)
self.assertEqual(resp['status'], '201')
except Exception as e:
self.fail("Failed to create public writable object with another"
" user creds raised exception is %s" % e)
finally:
if resp_meta['status'] == '204':
# Delete updated container metadata, to revert back.
# delete updated container metadata, to revert back.
resp, body = self.container_client.delete_container_metadata(
self.container_name, metadata=cont_headers,
metadata_prefix='')
resp, _ = self.container_client.list_container_metadata(
self.container_name)
self.assertEqual(resp['status'], '204')
@ -488,34 +431,26 @@ class ObjectTest(base.BaseObjectTest):
@attr(type='negative')
def test_access_object_without_using_creds(self):
# Attempt to access the object anonymously, e.g.
# not using any credentials
# Create Object
# create object
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name),
base_text=object_name)
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
self.assertEqual(resp['status'], '201')
# Trying to Get Object with empty Headers
# trying to get object with empty headers
self.assertRaises(exceptions.Unauthorized,
self.custom_object_client.get_object,
self.container_name, object_name, metadata={})
@attr(type='negative')
def test_write_object_without_using_creds(self):
# Attempt to write to the object anonymously, e.g.
# not using any credentials
# Trying to Create Object with empty Headers
# trying to create object with empty headers
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name),
base_text=object_name)
obj_headers = {'Content-Type': 'application/json',
'Accept': 'application/json'}
self.assertRaises(exceptions.Unauthorized,
self.custom_object_client.create_object,
self.container_name, object_name, data,
@ -523,30 +458,25 @@ class ObjectTest(base.BaseObjectTest):
@attr(type='negative')
def test_delete_object_without_using_creds(self):
# Attempt to delete the object anonymously,
# e.g. not using any credentials
# Create Object
# create object
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name),
base_text=object_name)
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
# Trying to Delete Object with empty Headers
# trying to delete object with empty headers
self.assertRaises(exceptions.Unauthorized,
self.custom_object_client.delete_object,
self.container_name, object_name)
@attr(type='negative')
def test_write_object_with_non_authorized_user(self):
#Attempt to upload another file using non authorized user
# attempt to upload another file using non-authorized user
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name) * 5,
base_text=object_name)
# Trying to Create Object with non authorized user token
# trying to create object with non-authorized user
self.assertRaises(exceptions.Unauthorized,
self.custom_object_client.create_object,
self.container_name, object_name, data,
@ -554,18 +484,14 @@ class ObjectTest(base.BaseObjectTest):
@attr(type='negative')
def test_read_object_with_non_authorized_user(self):
#Attempt to download the file using non authorized user
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name) * 5,
base_text=object_name)
resp, body = self.object_client.create_object(
self.container_name, object_name,
data)
self.container_name, object_name, data)
self.assertEqual(resp['status'], '201')
# Trying to Get Object with non authorized user token
# trying to get object with non authorized user token
self.assertRaises(exceptions.Unauthorized,
self.custom_object_client.get_object,
self.container_name, object_name,
@ -573,18 +499,13 @@ class ObjectTest(base.BaseObjectTest):
@attr(type='negative')
def test_delete_object_with_non_authorized_user(self):
#Attempt to delete container using non authorized user
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name) * 5,
base_text=object_name)
resp, body = self.object_client.create_object(
self.container_name, object_name,
data)
self.container_name, object_name, data)
self.assertEqual(resp['status'], '201')
# Trying to Delete Object with non authorized user token
# trying to delete object with non-authorized user token
self.assertRaises(exceptions.Unauthorized,
self.custom_object_client.delete_object,
self.container_name, object_name,
@ -593,11 +514,11 @@ class ObjectTest(base.BaseObjectTest):
@testtools.skip('Until Bug #1097137 is resolved.')
@attr(type='positive')
def test_get_object_using_temp_url(self):
#Access object using temp url within expiry time
# access object using temporary URL within expiration time
try:
#Update Account Metadata
# Flag to check if account metadata got updated
# update account metadata
# flag to check if account metadata got updated
flag = False
key = 'Meta'
metadata = {'Temp-URL-Key': key}
@ -605,27 +526,23 @@ class ObjectTest(base.BaseObjectTest):
metadata=metadata)
self.assertEqual(resp['status'], '204')
flag = True
resp, _ = self.account_client.list_account_metadata()
self.assertIn('x-account-meta-temp-url-key', resp)
self.assertEqual(resp['x-account-meta-temp-url-key'], key)
# Create Object
# create object
object_name = rand_name(name='ObjectTemp')
data = arbitrary_string(size=len(object_name),
base_text=object_name)
self.object_client.create_object(self.container_name,
object_name, data)
expires = int(time.time() + 10)
#Trying to GET object using temp URL with in expiry time
# trying to get object using temp url with in expiry time
_, body = self.object_client.get_object_using_temp_url(
self.container_name, object_name,
expires, key)
self.assertEqual(body, data)
finally:
if flag:
resp, _ = self.account_client.delete_account_metadata(
@ -635,35 +552,29 @@ class ObjectTest(base.BaseObjectTest):
@attr(type='positive')
def test_object_upload_in_segments(self):
#Attempt to upload object in segments
#Create Object
# create object
object_name = rand_name(name='LObject')
data = arbitrary_string(size=len(object_name),
base_text=object_name)
segments = 10
self.object_client.create_object(self.container_name,
object_name, data)
#Uploading 10 segments
# uploading 10 segments
for i in range(segments):
resp, _ = self.object_client.create_object_segments(
self.container_name, object_name,
i, data)
# Creating a Manifest File (Metadata Update)
self.container_name, object_name, i, data)
# creating a manifest file (metadata update)
metadata = {'X-Object-Manifest': '%s/%s/'
% (self.container_name, object_name)}
resp, _ = self.object_client.update_object_metadata(
self.container_name, object_name,
metadata, metadata_prefix='')
self.container_name, object_name, metadata, metadata_prefix='')
resp, _ = self.object_client.list_object_metadata(
self.container_name, object_name)
self.assertIn('x-object-manifest', resp)
self.assertEqual(resp['x-object-manifest'],
'%s/%s/' % (self.container_name, object_name))
#Downloading the object
# downloading the object
resp, body = self.object_client.get_object(
self.container_name, object_name)
self.assertEqual(data * segments, body)

View File

@ -21,7 +21,6 @@ from tempest.tests.object_storage import base
class ContainerTest(base.BaseObjectTest):
@classmethod
def setUpClass(cls):
super(ContainerTest, cls).setUpClass()
@ -30,16 +29,13 @@ class ContainerTest(base.BaseObjectTest):
@classmethod
def tearDownClass(cls):
for container in cls.containers:
#Get list of all object in the container
objlist = \
cls.container_client.list_all_container_objects(container)
#Attempt to delete every object in the container
# delete every object in the container
for obj in objlist:
resp, _ = \
cls.object_client.delete_object(container, obj['name'])
#Attempt to delete the container
# delete the container
resp, _ = cls.container_client.delete_container(container)
def assertContainer(self, container, count, byte, versioned):
@ -54,16 +50,13 @@ class ContainerTest(base.BaseObjectTest):
@attr(type='smoke')
def test_versioned_container(self):
# Versioned container responses tests
# Create a containers
# create container
vers_container_name = rand_name(name='TestVersionContainer')
resp, body = self.container_client.create_container(
vers_container_name)
self.containers.append(vers_container_name)
self.assertIn(resp['status'], ('202', '201'))
self.assertContainer(vers_container_name, '0', '0',
'Missing Header')
self.assertContainer(vers_container_name, '0', '0', 'Missing Header')
base_container_name = rand_name(name='TestBaseContainer')
headers = {'X-versions-Location': vers_container_name}
@ -75,18 +68,17 @@ class ContainerTest(base.BaseObjectTest):
self.assertIn(resp['status'], ('202', '201'))
self.assertContainer(base_container_name, '0', '0',
vers_container_name)
# Create Object
object_name = rand_name(name='TestObject')
# create object
resp, _ = self.object_client.create_object(base_container_name,
object_name, '1')
# create 2nd version of object
resp, _ = self.object_client.create_object(base_container_name,
object_name, '2')
resp, body = self.object_client.get_object(base_container_name,
object_name)
self.assertEqual(body, '2')
# Delete Object version 2
# delete object version 2
resp, _ = self.object_client.delete_object(base_container_name,
object_name)
self.assertContainer(base_container_name, '1', '1',
@ -94,21 +86,18 @@ class ContainerTest(base.BaseObjectTest):
resp, body = self.object_client.get_object(base_container_name,
object_name)
self.assertEqual(body, '1')
# Delete Object version 1
# delete object version 1
resp, _ = self.object_client.delete_object(base_container_name,
object_name)
# Containers are Empty
# containers should be empty
self.assertContainer(base_container_name, '0', '0',
vers_container_name)
self.assertContainer(vers_container_name, '0', '0',
'Missing Header')
# Delete Containers
# delete containers
resp, _ = self.container_client.delete_container(base_container_name)
self.assertEqual(resp['status'], '204')
self.containers.remove(base_container_name)
resp, _ = self.container_client.delete_container(vers_container_name)
self.assertEqual(resp['status'], '204')
self.containers.remove(vers_container_name)