Merge "Tests to check few object actions anonymously"

This commit is contained in:
Jenkins 2012-12-31 19:50:35 +00:00 committed by Gerrit Code Review
commit 56c86bfc0a
2 changed files with 115 additions and 25 deletions

View File

@ -19,6 +19,7 @@ import httplib2
import json
import re
from tempest.common.rest_client import RestClient
from tempest import exceptions
class ObjectClient(RestClient):
@ -141,12 +142,11 @@ class ObjectClientCustomizedHeader(RestClient):
req_url = "%s/%s" % (self.base_url, url)
resp, resp_body = self.http_obj.request(req_url, method,
headers=headers, body=body)
if resp.status in (401, 403):
try:
resp_body = json.loads(resp_body)
raise exceptions.Unauthorized(resp_body['error']['message'])
except ValueError:
pass
if resp.status == 401 or resp.status == 403:
self._log(req_url, body, resp, resp_body)
raise exceptions.Unauthorized()
return resp, resp_body
def get_object(self, container, object_name, metadata=None):
@ -159,3 +159,22 @@ class ObjectClientCustomizedHeader(RestClient):
url = "{0}/{1}".format(container, object_name)
resp, body = self.get(url, headers=headers)
return resp, body
def create_object(self, container, object_name, data, metadata=None):
"""Create storage object"""
headers = {}
if metadata:
for key in metadata:
headers[str(key)] = metadata[key]
url = "%s/%s" % (str(container), str(object_name))
resp, body = self.put(url, data, headers=headers)
return resp, body
def delete_object(self, container, object_name):
"""Delete storage object"""
url = "%s/%s" % (str(container), str(object_name))
resp, body = self.delete(url)
return resp, body

View File

@ -19,6 +19,7 @@ from nose.plugins.attrib import attr
from tempest.common.utils.data_utils import arbitrary_string
from tempest.common.utils.data_utils import rand_name
from tempest import exceptions
from tempest.tests.object_storage import base
@ -265,34 +266,104 @@ class ObjectTest(base.BaseObjectTest):
dst_container_name)
@attr(type='smoke')
def test_access_object_without_using_creds(self):
def test_access_public_container_object_without_using_creds(self):
"""Make container public-readable, and access the object
anonymously, e.g. without using credentials"""
# Update Container Metadata to make public readable
headers = {'X-Container-Read': '.r:*,.rlistings'}
resp, body = \
self.container_client.update_container_metadata(
self.container_name, metadata=headers, metadata_prefix='')
self.assertIn(resp['status'], '204')
try:
resp_meta = None
# Update Container Metadata to make public readable
cont_headers = {'X-Container-Read': '.r:*,.rlistings'}
resp_meta, body = \
self.container_client.update_container_metadata(
self.container_name, metadata=cont_headers,
metadata_prefix='')
self.assertEqual(resp_meta['status'], '204')
# Create Object
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name),
base_text=object_name)
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
self.assertEqual(resp['status'], '201')
# List container metadata
resp_meta, _ = \
self.container_client.list_container_metadata(
self.container_name)
self.assertEqual(resp_meta['status'], '204')
self.assertIn('x-container-read', resp_meta)
self.assertEqual(resp_meta['x-container-read'], '.r:*,.rlistings')
# Trying to Get Object with empty Headers as it is public readable
resp, body = \
self.custom_object_client.get_object(self.container_name,
object_name, metadata={})
self.assertEqual(body, data)
finally:
if resp_meta['status'] == '204':
# Delete updated container metadata, to revert back.
resp, body = \
self.container_client.delete_container_metadata(
self.container_name, metadata=cont_headers,
metadata_prefix='')
resp, _ = \
self.container_client.list_container_metadata(
self.container_name)
self.assertEqual(resp['status'], '204')
self.assertIn('x-container-read', resp)
self.assertEqual(resp['x-container-read'], 'x')
@attr(type='negative')
def test_access_object_without_using_creds(self):
"""Attempt to access the object anonymously, e.g.
not using any credentials"""
# Create Object
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name) * 1,
data = arbitrary_string(size=len(object_name),
base_text=object_name)
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
self.assertEqual(resp['status'], '201')
# List container metadata
resp, _ = \
self.container_client.list_container_metadata(self.container_name)
self.assertEqual(resp['status'], '204')
self.assertIn('x-container-read', resp)
self.assertEqual(resp['x-container-read'], '.r:*,.rlistings')
# Trying to Get Object with empty Headers
self.assertRaises(exceptions.Unauthorized,
self.custom_object_client.get_object,
self.container_name, object_name, metadata={})
# Trying to Get Object with empty Headers as it is public readable
resp, body = \
self.custom_object_client.get_object(self.container_name,
object_name, metadata={})
self.assertEqual(body, data)
@attr(type='negative')
def test_write_object_without_using_creds(self):
"""Attempt to write to the object anonymously, e.g.
not using any credentials"""
# Trying to Create Object with empty Headers
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name),
base_text=object_name)
obj_headers = {'Content-Type': 'application/json',
'Accept': 'application/json'}
self.assertRaises(exceptions.Unauthorized,
self.custom_object_client.create_object,
self.container_name, object_name, data,
metadata=obj_headers)
@attr(type='negative')
def test_delete_object_without_using_creds(self):
"""Attempt to delete the object anonymously,
e.g. not using any credentials"""
# Create Object
object_name = rand_name(name='Object')
data = arbitrary_string(size=len(object_name),
base_text=object_name)
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
# Trying to Delete Object with empty Headers
self.assertRaises(exceptions.Unauthorized,
self.custom_object_client.delete_object,
self.container_name, object_name)