Add Functional Tests of input parameter test of normal for Bucket

This patch adds input parameter test of normal status for Bucket.
Add following tests.

And remove the extra reset process in test_head_bucket_error.

note: add a method to each API's parameter.
      combination of parameters is not tested.

PUT Bucket
 [body(xml)] LocationConstraint

GET Bucket
 [query] delimiter
 [query] encoding-type
 [query] marker
 [query] max-keys
 [query] prefix

blueprint functional-tests

Change-Id: I87094388796df1f291ae93455cc80cc7de9c09fb
This commit is contained in:
Naoto Nishizono
2015-03-11 15:39:05 +09:00
parent 07f2681876
commit 5db4e013db

View File

@@ -18,7 +18,7 @@ import unittest
from swift3.test.functional.s3_test_client import Connection
from swift3.test.functional.utils import get_error_code,\
assert_common_response_headers
from swift3.etree import fromstring
from swift3.etree import fromstring, tostring, Element, SubElement
from swift3.cfg import CONF
from swift3.test.functional import Swift3FunctionalTestCase
@@ -27,6 +27,11 @@ class TestSwift3Bucket(Swift3FunctionalTestCase):
def setUp(self):
super(TestSwift3Bucket, self).setUp()
def _gen_location_xml(self, location):
elem = Element('CreateBucketConfiguration')
SubElement(elem, 'LocationConstraint').text = location
return tostring(elem)
def test_bucket(self):
bucket = 'bucket'
@@ -114,7 +119,13 @@ class TestSwift3Bucket(Swift3FunctionalTestCase):
self.conn.make_request('PUT', 'bucket')
status, headers, body = self.conn.make_request('PUT', 'bucket')
self.assertEquals(get_error_code(body), 'BucketAlreadyExists')
self.conn.make_request('DELETE', 'bucket')
def test_put_bucket_with_LocationConstraint(self):
bucket = 'bucket'
xml = self._gen_location_xml('US')
status, headers, body = \
self.conn.make_request('PUT', bucket, body=xml)
self.assertEquals(status, 200)
def test_get_bucket_error(self):
self.conn.make_request('PUT', 'bucket')
@@ -130,7 +141,132 @@ class TestSwift3Bucket(Swift3FunctionalTestCase):
status, headers, body = self.conn.make_request('GET', 'nothing')
self.assertEquals(get_error_code(body), 'NoSuchBucket')
self.conn.make_request('DELETE', 'bucket')
def _prepare_test_get_bucket(self, bucket, objects):
self.conn.make_request('PUT', bucket)
for obj in objects:
self.conn.make_request('PUT', bucket, obj)
def test_get_bucket_with_delimiter(self):
bucket = 'bucket'
put_objects = ('object', 'object2', 'subdir/object', 'subdir2/object',
'dir/subdir/object')
self._prepare_test_get_bucket(bucket, put_objects)
delimiter = '/'
query = 'delimiter=%s' % delimiter
expect_objects = ('object', 'object2')
expect_prefixes = ('dir/', 'subdir/', 'subdir2/')
status, headers, body = \
self.conn.make_request('GET', bucket, query=query)
self.assertEquals(status, 200)
elem = fromstring(body, 'ListBucketResult')
self.assertEquals(elem.find('Delimiter').text, delimiter)
resp_objects = elem.findall('./Contents')
self.assertEquals(len(list(resp_objects)), len(expect_objects))
for i, o in enumerate(resp_objects):
self.assertEquals(o.find('Key').text, expect_objects[i])
self.assertTrue(o.find('LastModified').text is not None)
self.assertTrue(o.find('ETag').text is not None)
self.assertTrue(o.find('Size').text is not None)
self.assertEquals(o.find('StorageClass').text, 'STANDARD')
self.assertTrue(o.find('Owner/ID').text, self.conn.user_id)
self.assertTrue(o.find('Owner/DisplayName').text,
self.conn.user_id)
resp_prefixes = elem.findall('CommonPrefixes')
self.assertEquals(len(resp_prefixes), len(expect_prefixes))
for i, p in enumerate(resp_prefixes):
self.assertEquals(p.find('./Prefix').text, expect_prefixes[i])
def test_get_bucket_with_encoding_type(self):
bucket = 'bucket'
put_objects = ('object', 'object2')
self._prepare_test_get_bucket(bucket, put_objects)
encoding_type = 'url'
query = 'encoding-type=%s' % encoding_type
status, headers, body = \
self.conn.make_request('GET', bucket, query=query)
self.assertEquals(status, 200)
elem = fromstring(body, 'ListBucketResult')
self.assertEquals(elem.find('EncodingType').text, encoding_type)
def test_get_bucket_with_marker(self):
bucket = 'bucket'
put_objects = ('object', 'object2', 'subdir/object', 'subdir2/object',
'dir/subdir/object')
self._prepare_test_get_bucket(bucket, put_objects)
marker = 'object'
query = 'marker=%s' % marker
expect_objects = ('object2', 'subdir/object', 'subdir2/object')
status, headers, body = \
self.conn.make_request('GET', bucket, query=query)
self.assertEquals(status, 200)
elem = fromstring(body, 'ListBucketResult')
self.assertEquals(elem.find('Marker').text, marker)
resp_objects = elem.findall('./Contents')
self.assertEquals(len(list(resp_objects)), len(expect_objects))
for i, o in enumerate(resp_objects):
self.assertEquals(o.find('Key').text, expect_objects[i])
self.assertTrue(o.find('LastModified').text is not None)
self.assertTrue(o.find('ETag').text is not None)
self.assertTrue(o.find('Size').text is not None)
self.assertEquals(o.find('StorageClass').text, 'STANDARD')
self.assertTrue(o.find('Owner/ID').text, self.conn.user_id)
self.assertTrue(o.find('Owner/DisplayName').text,
self.conn.user_id)
def test_get_bucket_with_max_keys(self):
bucket = 'bucket'
put_objects = ('object', 'object2', 'subdir/object', 'subdir2/object',
'dir/subdir/object')
self._prepare_test_get_bucket(bucket, put_objects)
max_keys = '2'
query = 'max-keys=%s' % max_keys
expect_objects = ('dir/subdir/object', 'object')
status, headers, body = \
self.conn.make_request('GET', bucket, query=query)
self.assertEquals(status, 200)
elem = fromstring(body, 'ListBucketResult')
self.assertEquals(elem.find('MaxKeys').text, max_keys)
resp_objects = elem.findall('./Contents')
self.assertEquals(len(list(resp_objects)), len(expect_objects))
for i, o in enumerate(resp_objects):
self.assertEquals(o.find('Key').text, expect_objects[i])
self.assertTrue(o.find('LastModified').text is not None)
self.assertTrue(o.find('ETag').text is not None)
self.assertTrue(o.find('Size').text is not None)
self.assertEquals(o.find('StorageClass').text, 'STANDARD')
self.assertTrue(o.find('Owner/ID').text, self.conn.user_id)
self.assertTrue(o.find('Owner/DisplayName').text,
self.conn.user_id)
def test_get_bucket_with_prefix(self):
bucket = 'bucket'
req_objects = ('object', 'object2', 'subdir/object', 'subdir2/object',
'dir/subdir/object')
self._prepare_test_get_bucket(bucket, req_objects)
prefix = 'object'
query = 'prefix=%s' % prefix
expect_objects = ('object', 'object2')
status, headers, body = \
self.conn.make_request('GET', bucket, query=query)
self.assertEquals(status, 200)
elem = fromstring(body, 'ListBucketResult')
self.assertEquals(elem.find('Prefix').text, prefix)
resp_objects = elem.findall('./Contents')
self.assertEquals(len(list(resp_objects)), len(expect_objects))
for i, o in enumerate(resp_objects):
self.assertEquals(o.find('Key').text, expect_objects[i])
self.assertTrue(o.find('LastModified').text is not None)
self.assertTrue(o.find('ETag').text is not None)
self.assertTrue(o.find('Size').text is not None)
self.assertEquals(o.find('StorageClass').text, 'STANDARD')
self.assertTrue(o.find('Owner/ID').text, self.conn.user_id)
self.assertTrue(o.find('Owner/DisplayName').text,
self.conn.user_id)
def test_head_bucket_error(self):
self.conn.make_request('PUT', 'bucket')
@@ -147,8 +283,6 @@ class TestSwift3Bucket(Swift3FunctionalTestCase):
status, headers, body = self.conn.make_request('HEAD', 'nothing')
self.assertEquals(status, 404)
self.conn.make_request('DELETE', 'bucket')
def test_delete_bucket_error(self):
status, headers, body = \
self.conn.make_request('DELETE', 'bucket+invalid')