Fix invalid canned acl response

When x-amz-acl in the request takes invalid value
swift3 must return InvalidArgument error response
instead of current InvalidRequest response.

This patch fixes it and adds unit tests to check
both handle_acl_headers (for normal acl) and
ACL class in swift3.subresource (for s3acl).

Closes-Bug: #1433895
Change-Id: I4f1d1aeed68546346d21a9d43c43c1fd0f415457
This commit is contained in:
Kota Tsuyuzaki
2015-04-06 01:04:16 -07:00
parent 4b6f4066e0
commit 4f94bacde9
4 changed files with 104 additions and 11 deletions

View File

@@ -497,15 +497,21 @@ class ACL(object):
Grant(Grantee.from_header(grantee), permission))
if 'x-amz-acl' in headers:
acl = headers['x-amz-acl']
if len(grants) > 0:
err_msg = 'Specifying both Canned ACLs and Header ' \
'Grants is not allowed'
raise InvalidRequest(err_msg)
grantees = canned_acl_grantees(bucket_owner, object_owner)[acl]
for permission, grantee in grantees:
grants.append(Grant(grantee, permission))
try:
acl = headers['x-amz-acl']
if len(grants) > 0:
err_msg = 'Specifying both Canned ACLs and Header ' \
'Grants is not allowed'
raise InvalidRequest(err_msg)
grantees = canned_acl_grantees(
bucket_owner, object_owner)[acl]
for permission, grantee in grantees:
grants.append(Grant(grantee, permission))
except KeyError:
# expects canned_acl_grantees()[] raises KeyError
raise InvalidArgument('x-amz-acl', headers['x-amz-acl'])
except (KeyError, ValueError):
# TODO: think about we really catch this except sequence
raise InvalidRequest()
if len(grants) == 0:

View File

@@ -21,6 +21,8 @@ from swift3.test.unit import Swift3TestCase
from swift3.etree import fromstring, tostring, Element, SubElement, XMLNS_XSI
from swift3.test.unit.test_s3_acl import s3acl
import mock
from swift3.response import InvalidArgument
from swift3.acl_utils import handle_acl_header
class TestSwift3Acl(Swift3TestCase):
@@ -121,5 +123,50 @@ class TestSwift3Acl(Swift3TestCase):
status, headers, body = self.call_swift3(req)
self.assertEquals(self._get_error_code(body), 'MalformedACLError')
def test_handle_acl_header(self):
def check_generated_acl_header(acl, targets):
req = Request.blank('/bucket',
headers={'X-Amz-Acl': acl})
handle_acl_header(req)
for target in targets:
self.assertTrue(target[0] in req.headers)
self.assertEquals(req.headers[target[0]], target[1])
check_generated_acl_header('public-read',
[('X-Container-Read', '.r:*,.rlistings')])
check_generated_acl_header('public-read-write',
[('X-Container-Read', '.r:*,.rlistings'),
('X-Container-Write', '.r:*')])
check_generated_acl_header('private',
[('X-Container-Read', '.'),
('X-Container-Write', '.')])
@s3acl(s3acl_only=True)
def test_handle_acl_header_with_s3acl(self):
def check_generated_acl_header(acl, targets):
req = Request.blank('/bucket',
headers={'X-Amz-Acl': acl})
for target in targets:
self.assertTrue(target not in req.headers)
self.assertTrue('HTTP_X_AMZ_ACL' in req.environ)
# TODO: add transration and assertion for s3acl
check_generated_acl_header('public-read',
['X-Container-Read'])
check_generated_acl_header('public-read-write',
['X-Container-Read', 'X-Container-Write'])
check_generated_acl_header('private',
['X-Container-Read', 'X-Container-Write'])
def test_handle_acl_with_invalid_header_string(self):
req = Request.blank('/bucket', headers={'X-Amz-Acl': 'invalid'})
with self.assertRaises(InvalidArgument) as cm:
handle_acl_header(req)
self.assertTrue('argument_name' in cm.exception.info)
self.assertEquals(cm.exception.info['argument_name'], 'x-amz-acl')
self.assertTrue('argument_value' in cm.exception.info)
self.assertEquals(cm.exception.info['argument_value'], 'invalid')
if __name__ == '__main__':
unittest.main()

View File

@@ -226,7 +226,7 @@ class TestSwift3S3Acl(Swift3TestCase):
headers={'Authorization': 'AWS test:tester:hmac',
'x-amz-acl': 'invalid'})
status, headers, body = self.call_swift3(req)
self.assertEquals(self._get_error_code(body), 'InvalidRequest')
self.assertEquals(self._get_error_code(body), 'InvalidArgument')
def _test_grant_header(self, permission):
req = Request.blank('/bucket/object?acl',

View File

@@ -17,11 +17,11 @@ import unittest
from swift.common.utils import json
from swift3.response import AccessDenied
from swift3.response import AccessDenied, InvalidArgument
from swift3.subresource import User, AuthenticatedUsers, AllUsers, \
ACLPrivate, ACLPublicRead, ACLPublicReadWrite, ACLAuthenticatedRead, \
ACLBucketOwnerRead, ACLBucketOwnerFullControl, Owner, ACL, encode_acl, \
decode_acl
decode_acl, canned_acl_grantees
from swift3.utils import CONF, sysmeta_header
@@ -278,6 +278,46 @@ class TestSwift3Subresource(unittest.TestCase):
self.assertEqual('test:tester', header_value['Owner'])
self.assertEqual(len(header_value['Grant']), 99)
def test_from_headers_x_amz_acl(self):
canned_acls = ['public-read', 'public-read-write',
'authenticated-read', 'bucket-owner-read',
'bucket-owner-full-control', 'log-delivery-write']
owner = Owner('test:tester', 'test:tester')
# TODO: make a test for canned_acl_grantees function
grantee_map = canned_acl_grantees(owner)
for acl_str in canned_acls:
acl = ACL.from_headers({'x-amz-acl': acl_str}, owner)
expected = grantee_map[acl_str]
self.assertEquals(len(acl.grants), len(expected)) # sanity
# parse Grant object to permission and grantee
actual_grants = [(grant.permission, grant.grantee)
for grant in acl.grants]
assertions = zip(sorted(expected), sorted(actual_grants))
for (expected_permission, expected_grantee), \
(permission, grantee) in assertions:
self.assertEquals(expected_permission, permission)
self.assertTrue(
isinstance(grantee, expected_grantee.__class__))
if isinstance(grantee, User):
self.assertEquals(expected_grantee.id, grantee.id)
self.assertEquals(expected_grantee.display_name,
grantee.display_name)
def test_from_headers_x_amz_acl_invalid(self):
with self.assertRaises(InvalidArgument) as cm:
ACL.from_headers({'x-amz-acl': 'invalid'},
Owner('test:tester', 'test:tester'))
self.assertTrue('argument_name' in cm.exception.info)
self.assertEquals(cm.exception.info['argument_name'], 'x-amz-acl')
self.assertTrue('argument_value' in cm.exception.info)
self.assertEquals(cm.exception.info['argument_value'], 'invalid')
if __name__ == '__main__':
unittest.main()