Add functional tests of error for ACL
This patch adds functional tests of error for ACL's API.
Add following error tests.
PUT Bucket ACL
SignatureDoesNotMatch
NoSuchBucket
AccessDenied
GET Bucket ACL
SignatureDoesNotMatch
NoSuchBucket
AccessDenied
GET Object ACL
SignatureDoesNotMatch
NoSuchKey
AccessDenied
note: since PUT Object ACL is not supported, this patch omits the test for this.
add a method to each API.
move PUT Bucket of each test method to setUp().
remove tester2's admin permission for test of AccessDenied.
Change-Id: I5d0214a59d175719f7b895b21f83f97764bf88a2
This commit is contained in:
@@ -18,7 +18,7 @@ use = egg:swift#proxy
|
||||
use = egg:swift#tempauth
|
||||
user_test_admin = admin .admin
|
||||
user_test_tester = testing .admin
|
||||
user_test_tester2 = testing2 .admin
|
||||
user_test_tester2 = testing2
|
||||
|
||||
[filter:swift3]
|
||||
use = egg:swift3#swift3
|
||||
|
||||
@@ -119,7 +119,7 @@ def get_admin_connection():
|
||||
def get_tester2_connection():
|
||||
"""
|
||||
Return tester2 connection behaves as:
|
||||
user_test_tester2 = testing2 .admin
|
||||
user_test_tester2 = testing2
|
||||
"""
|
||||
aws_access_key = os.environ.get('TESTER2_ACCESS_KEY')
|
||||
aws_secret_key = os.environ.get('TESTER2_SECRET_KEY')
|
||||
|
||||
@@ -64,7 +64,7 @@ _create_swift_accounts()
|
||||
_add_user SERVICE service swift password admin
|
||||
_add_user ADMIN test admin admin ResellerAdmin
|
||||
_add_user TESTER test tester testing admin
|
||||
_add_user TESTER2 test tester2 testing2 admin
|
||||
_add_user TESTER2 test tester2 testing2 member
|
||||
|
||||
SERVICE=$(openstack service create swift --type=object-store | _get_id)
|
||||
openstack endpoint create $SERVICE \
|
||||
|
||||
@@ -16,25 +16,29 @@
|
||||
import unittest
|
||||
|
||||
from swift3.test.functional import Swift3FunctionalTestCase
|
||||
from swift3.test.functional.utils import assert_common_response_headers
|
||||
from swift3.test.functional.s3_test_client import Connection, \
|
||||
get_tester2_connection
|
||||
from swift3.test.functional.utils import assert_common_response_headers, \
|
||||
get_error_code
|
||||
from swift3.etree import fromstring
|
||||
|
||||
|
||||
class TestSwift3Acl(Swift3FunctionalTestCase):
|
||||
def setUp(self):
|
||||
super(TestSwift3Acl, self).setUp()
|
||||
self.bucket = 'bucket'
|
||||
self.obj = 'object'
|
||||
self.conn.make_request('PUT', self.bucket)
|
||||
self.conn2 = get_tester2_connection()
|
||||
|
||||
def test_acl(self):
|
||||
bucket = 'bucket'
|
||||
obj = 'object'
|
||||
self.conn.make_request('PUT', bucket)
|
||||
self.conn.make_request('PUT', bucket, obj)
|
||||
self.conn.make_request('PUT', self.bucket, self.obj)
|
||||
query = 'acl'
|
||||
|
||||
# PUT Bucket ACL
|
||||
headers = {'x-amz-acl': 'public-read'}
|
||||
status, headers, body = \
|
||||
self.conn.make_request('PUT', bucket, headers=headers,
|
||||
self.conn.make_request('PUT', self.bucket, headers=headers,
|
||||
query=query)
|
||||
self.assertEquals(status, 200)
|
||||
assert_common_response_headers(self, headers)
|
||||
@@ -42,7 +46,7 @@ class TestSwift3Acl(Swift3FunctionalTestCase):
|
||||
|
||||
# GET Bucket ACL
|
||||
status, headers, body = \
|
||||
self.conn.make_request('GET', bucket, query=query)
|
||||
self.conn.make_request('GET', self.bucket, query=query)
|
||||
self.assertEquals(status, 200)
|
||||
assert_common_response_headers(self, headers)
|
||||
# TODO: Fix the response that last-modified must be in the response.
|
||||
@@ -58,7 +62,7 @@ class TestSwift3Acl(Swift3FunctionalTestCase):
|
||||
|
||||
# GET Object ACL
|
||||
status, headers, body = \
|
||||
self.conn.make_request('GET', bucket, obj, query=query)
|
||||
self.conn.make_request('GET', self.bucket, self.obj, query=query)
|
||||
self.assertEquals(status, 200)
|
||||
assert_common_response_headers(self, headers)
|
||||
# TODO: Fix the response that last-modified must be in the response.
|
||||
@@ -72,5 +76,54 @@ class TestSwift3Acl(Swift3FunctionalTestCase):
|
||||
acl = elem.find('AccessControlList')
|
||||
self.assertTrue(acl.find('Grant') is not None)
|
||||
|
||||
def test_put_bucket_acl_error(self):
|
||||
req_headers = {'x-amz-acl': 'public-read'}
|
||||
aws_error_conn = Connection(aws_secret_key='invalid')
|
||||
status, headers, body = \
|
||||
aws_error_conn.make_request('PUT', self.bucket,
|
||||
headers=req_headers, query='acl')
|
||||
self.assertEquals(get_error_code(body), 'SignatureDoesNotMatch')
|
||||
|
||||
status, headers, body = \
|
||||
self.conn.make_request('PUT', 'nothing',
|
||||
headers=req_headers, query='acl')
|
||||
self.assertEquals(get_error_code(body), 'NoSuchBucket')
|
||||
|
||||
status, headers, body = \
|
||||
self.conn2.make_request('PUT', self.bucket,
|
||||
headers=req_headers, query='acl')
|
||||
self.assertEquals(get_error_code(body), 'AccessDenied')
|
||||
|
||||
def test_get_bucket_acl_error(self):
|
||||
aws_error_conn = Connection(aws_secret_key='invalid')
|
||||
status, headers, body = \
|
||||
aws_error_conn.make_request('GET', self.bucket, query='acl')
|
||||
self.assertEquals(get_error_code(body), 'SignatureDoesNotMatch')
|
||||
|
||||
status, headers, body = \
|
||||
self.conn.make_request('GET', 'nothing', query='acl')
|
||||
self.assertEquals(get_error_code(body), 'NoSuchBucket')
|
||||
|
||||
status, headers, body = \
|
||||
self.conn2.make_request('GET', self.bucket, query='acl')
|
||||
self.assertEquals(get_error_code(body), 'AccessDenied')
|
||||
|
||||
def test_get_object_acl_error(self):
|
||||
self.conn.make_request('PUT', self.bucket, self.obj)
|
||||
|
||||
aws_error_conn = Connection(aws_secret_key='invalid')
|
||||
status, headers, body = \
|
||||
aws_error_conn.make_request('GET', self.bucket, self.obj,
|
||||
query='acl')
|
||||
self.assertEquals(get_error_code(body), 'SignatureDoesNotMatch')
|
||||
|
||||
status, headers, body = \
|
||||
self.conn.make_request('GET', self.bucket, 'nothing', query='acl')
|
||||
self.assertEquals(get_error_code(body), 'NoSuchKey')
|
||||
|
||||
status, headers, body = \
|
||||
self.conn2.make_request('GET', self.bucket, self.obj, query='acl')
|
||||
self.assertEquals(get_error_code(body), 'AccessDenied')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user