# Copyright (c) 2014 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. import unittest from swift.common.swob import Request from swift.common.middleware.s3api.etree import tostring, Element, SubElement from swift.common.middleware.s3api.subresource import ACL, ACLPrivate, User, \ Owner, Grant from test.unit.common.middleware.s3api import S3ApiTestCaseAcl XMLNS_XSI = 'http://www.w3.org/2001/XMLSchema-instance' def _make_xml(grantee): owner = 'test:tester' permission = 'READ' elem = Element('AccessControlPolicy') elem_owner = SubElement(elem, 'Owner') SubElement(elem_owner, 'ID').text = owner SubElement(elem_owner, 'DisplayName').text = owner acl_list_elem = SubElement(elem, 'AccessControlList') elem_grant = SubElement(acl_list_elem, 'Grant') elem_grant.append(grantee) SubElement(elem_grant, 'Permission').text = permission return tostring(elem) class TestS3ApiS3Acl(S3ApiTestCaseAcl): def test_bucket_acl_PUT_with_other_owner(self): req = Request.blank('/bucket?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header()}, body=tostring( ACLPrivate( Owner(id='test:other', name='test:other')).elem())) status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'AccessDenied') def test_object_acl_PUT_xml_error(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header()}, body="invalid xml") status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'MalformedACLError') def test_canned_acl_private(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-acl': 'private'}) status, headers, body = self.call_s3api(req) self.assertEqual(status.split()[0], '200') self.assertIn('REMOTE_USER', req.environ) def test_canned_acl_public_read(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-acl': 'public-read'}) status, headers, body = self.call_s3api(req) self.assertEqual(status.split()[0], '200') def test_canned_acl_public_read_write(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-acl': 'public-read-write'}) status, headers, body = self.call_s3api(req) self.assertEqual(status.split()[0], '200') def test_canned_acl_authenticated_read(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-acl': 'authenticated-read'}) status, headers, body = self.call_s3api(req) self.assertEqual(status.split()[0], '200') def test_canned_acl_bucket_owner_read(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-acl': 'bucket-owner-read'}) status, headers, body = self.call_s3api(req) self.assertEqual(status.split()[0], '200') def test_canned_acl_bucket_owner_full_control(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-acl': 'bucket-owner-full-control'}) status, headers, body = self.call_s3api(req) self.assertEqual(status.split()[0], '200') def test_invalid_canned_acl(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-acl': 'invalid'}) status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'InvalidArgument') def _test_grant_header(self, permission): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-grant-' + permission: 'id=test:tester'}) return self.call_s3api(req) def test_grant_read(self): status, headers, body = self._test_grant_header('read') self.assertEqual(status.split()[0], '200') def test_grant_write(self): status, headers, body = self._test_grant_header('write') self.assertEqual(status.split()[0], '200') def test_grant_read_acp(self): status, headers, body = self._test_grant_header('read-acp') self.assertEqual(status.split()[0], '200') def test_grant_write_acp(self): status, headers, body = self._test_grant_header('write-acp') self.assertEqual(status.split()[0], '200') def test_grant_full_control(self): status, headers, body = self._test_grant_header('full-control') self.assertEqual(status.split()[0], '200') def test_grant_invalid_permission(self): status, headers, body = self._test_grant_header('invalid') self.assertEqual(self._get_error_code(body), 'MissingSecurityHeader') def test_grant_with_both_header_and_xml(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-grant-full-control': 'id=test:tester'}, body=tostring( ACLPrivate( Owner(id='test:tester', name='test:tester')).elem())) status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'UnexpectedContent') def test_grant_with_both_header_and_canned_acl(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-grant-full-control': 'id=test:tester', 'x-amz-acl': 'public-read'}) status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'InvalidRequest') def test_grant_email(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-grant-read': 'emailAddress=a@b.c'}) status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'NotImplemented') def test_grant_email_xml(self): grantee = Element('Grantee', nsmap={'xsi': XMLNS_XSI}) grantee.set('{%s}type' % XMLNS_XSI, 'AmazonCustomerByEmail') SubElement(grantee, 'EmailAddress').text = 'Grantees@email.com' xml = _make_xml(grantee=grantee) req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header()}, body=xml) status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'NotImplemented') def test_grant_invalid_group_xml(self): grantee = Element('Grantee', nsmap={'xsi': XMLNS_XSI}) grantee.set('{%s}type' % XMLNS_XSI, 'Invalid') xml = _make_xml(grantee=grantee) req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header()}, body=xml) status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'MalformedACLError') def test_grant_authenticated_users(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-grant-read': 'uri="http://acs.amazonaws.com/groups/' 'global/AuthenticatedUsers"'}) status, headers, body = self.call_s3api(req) self.assertEqual(status.split()[0], '200') def test_grant_all_users(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-grant-read': 'uri="http://acs.amazonaws.com/groups/' 'global/AllUsers"'}) status, headers, body = self.call_s3api(req) self.assertEqual(status.split()[0], '200') def test_grant_all_users_with_uppercase_type(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-grant-read': 'URI="http://acs.amazonaws.com/groups/' 'global/AllUsers"'}) status, headers, body = self.call_s3api(req) self.assertEqual(status.split()[0], '200') def test_grant_invalid_uri(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-grant-read': 'uri="http://localhost/"'}) status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'InvalidArgument') def test_grant_invalid_uri_xml(self): grantee = Element('Grantee', nsmap={'xsi': XMLNS_XSI}) grantee.set('{%s}type' % XMLNS_XSI, 'Group') SubElement(grantee, 'URI').text = 'invalid' xml = _make_xml(grantee) req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header()}, body=xml) status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'InvalidArgument') def test_grant_invalid_target(self): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac', 'Date': self.get_date_header(), 'x-amz-grant-read': 'key=value'}) status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'InvalidArgument') def _test_bucket_acl_GET(self, account): req = Request.blank('/bucket?acl', environ={'REQUEST_METHOD': 'GET'}, headers={'Authorization': 'AWS %s:hmac' % account, 'Date': self.get_date_header()}) return self.call_s3api(req) def test_bucket_acl_GET_without_permission(self): status, headers, body = self._test_bucket_acl_GET('test:other') self.assertEqual(self._get_error_code(body), 'AccessDenied') def test_bucket_acl_GET_with_read_acp_permission(self): status, headers, body = self._test_bucket_acl_GET('test:read_acp') self.assertEqual(status.split()[0], '200') def test_bucket_acl_GET_with_fullcontrol_permission(self): status, headers, body = self._test_bucket_acl_GET('test:full_control') self.assertEqual(status.split()[0], '200') def test_bucket_acl_GET_with_owner_permission(self): status, headers, body = self._test_bucket_acl_GET('test:tester') self.assertEqual(status.split()[0], '200') def _test_bucket_acl_PUT(self, account, permission='FULL_CONTROL'): acl = ACL(self.default_owner, [Grant(User(account), permission)]) req = Request.blank('/bucket?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS %s:hmac' % account, 'Date': self.get_date_header()}, body=tostring(acl.elem())) return self.call_s3api(req) def test_bucket_acl_PUT_without_permission(self): status, headers, body = self._test_bucket_acl_PUT('test:other') self.assertEqual(self._get_error_code(body), 'AccessDenied') def test_bucket_acl_PUT_with_write_acp_permission(self): status, headers, body = self._test_bucket_acl_PUT('test:write_acp') self.assertEqual(status.split()[0], '200') def test_bucket_acl_PUT_with_fullcontrol_permission(self): status, headers, body = self._test_bucket_acl_PUT('test:full_control') self.assertEqual(status.split()[0], '200') def test_bucket_acl_PUT_with_owner_permission(self): status, headers, body = self._test_bucket_acl_PUT('test:tester') self.assertEqual(status.split()[0], '200') def _test_object_acl_GET(self, account): req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'GET'}, headers={'Authorization': 'AWS %s:hmac' % account, 'Date': self.get_date_header()}) return self.call_s3api(req) def test_object_acl_GET_without_permission(self): status, headers, body = self._test_object_acl_GET('test:other') self.assertEqual(self._get_error_code(body), 'AccessDenied') def test_object_acl_GET_with_read_acp_permission(self): status, headers, body = self._test_object_acl_GET('test:read_acp') self.assertEqual(status.split()[0], '200') def test_object_acl_GET_with_fullcontrol_permission(self): status, headers, body = self._test_object_acl_GET('test:full_control') self.assertEqual(status.split()[0], '200') def test_object_acl_GET_with_owner_permission(self): status, headers, body = self._test_object_acl_GET('test:tester') self.assertEqual(status.split()[0], '200') def _test_object_acl_PUT(self, account, permission='FULL_CONTROL'): acl = ACL(self.default_owner, [Grant(User(account), permission)]) req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS %s:hmac' % account, 'Date': self.get_date_header()}, body=tostring(acl.elem())) return self.call_s3api(req) def test_object_acl_PUT_without_permission(self): status, headers, body = self._test_object_acl_PUT('test:other') self.assertEqual(self._get_error_code(body), 'AccessDenied') def test_object_acl_PUT_with_write_acp_permission(self): status, headers, body = self._test_object_acl_PUT('test:write_acp') self.assertEqual(status.split()[0], '200') def test_object_acl_PUT_with_fullcontrol_permission(self): status, headers, body = self._test_object_acl_PUT('test:full_control') self.assertEqual(status.split()[0], '200') def test_object_acl_PUT_with_owner_permission(self): status, headers, body = self._test_object_acl_PUT('test:tester') self.assertEqual(status.split()[0], '200') if __name__ == '__main__': unittest.main()