From fef3260b53c82cc3d2e16cbbfefb31f29f230fea Mon Sep 17 00:00:00 2001 From: Masaki Tsukuda Date: Thu, 27 Nov 2014 17:05:58 +0900 Subject: [PATCH] acl: use S3 ACL for object and bucket APIs Change-Id: I8ec79fe4cea1370cbf433a5c20f5e4cdf5b0d298 --- swift3/controllers/bucket.py | 25 ++- swift3/controllers/obj.py | 30 ++- swift3/controllers/s3_acl.py | 28 +-- swift3/request.py | 101 ++++++++- swift3/subresource.py | 25 ++- swift3/test/unit/test_request.py | 180 +++++++++++++++ swift3/test/unit/test_s3_acl.py | 362 +++++++++++++++++++++++++++++-- 7 files changed, 696 insertions(+), 55 deletions(-) create mode 100644 swift3/test/unit/test_request.py diff --git a/swift3/controllers/bucket.py b/swift3/controllers/bucket.py index ff35f526..b5bbc328 100644 --- a/swift3/controllers/bucket.py +++ b/swift3/controllers/bucket.py @@ -24,6 +24,7 @@ from swift3.etree import Element, SubElement, tostring, fromstring, \ from swift3.response import HTTPOk, S3NotImplemented, InvalidArgument, \ MalformedXML, InvalidLocationConstraint from swift3.cfg import CONF +from swift3.subresource import ACL, Owner from swift3.utils import LOGGER MAX_PUT_BUCKET_BODY_SIZE = 10240 @@ -116,9 +117,6 @@ class BucketController(Controller): """ Handle PUT Bucket request """ - if 'HTTP_X_AMZ_ACL' in req.environ: - handle_acl_header(req) - xml = req.xml(MAX_PUT_BUCKET_BODY_SIZE) if xml: # check location @@ -135,7 +133,26 @@ class BucketController(Controller): # Swift3 cannot support multiple reagions now. raise InvalidLocationConstraint() - resp = req.get_response(self.app) + if CONF.s3_acl: + req_acl = ACL.from_headers(req.headers, + Owner(req.user_id, req.user_id)) + + # To avoid overwriting the existing bucket's ACL, we send PUT + # request first before setting the ACL to make sure that the target + # container does not exist. + resp = req.get_response(self.app) + + # update metadata + req.bucket_acl = req_acl + # FIXME If this request is failed, there is a possibility that the + # bucket which has no ACL is left. + req.get_response(self.app, 'POST') + else: + if 'HTTP_X_AMZ_ACL' in req.environ: + handle_acl_header(req) + + resp = req.get_response(self.app) + resp.status = HTTP_OK resp.location = '/' + req.container_name diff --git a/swift3/controllers/obj.py b/swift3/controllers/obj.py index f23682b8..5cee8ccc 100644 --- a/swift3/controllers/obj.py +++ b/swift3/controllers/obj.py @@ -14,10 +14,13 @@ # limitations under the License. from swift.common.http import HTTP_OK +from swift.common.utils import split_path from swift3.controllers.base import Controller -from swift3.response import AccessDenied, HTTPOk +from swift3.response import AccessDenied, HTTPOk, NoSuchKey from swift3.etree import Element, SubElement, tostring +from swift3.subresource import ACL, Owner +from swift3.cfg import CONF class ObjectController(Controller): @@ -26,6 +29,7 @@ class ObjectController(Controller): """ def GETorHEAD(self, req): resp = req.get_response(self.app) + if req.method == 'HEAD': resp.app_iter = None @@ -53,6 +57,30 @@ class ObjectController(Controller): """ Handle PUT Object and PUT Object (Copy) request """ + if CONF.s3_acl: + if 'HTTP_X_AMZ_COPY_SOURCE' in req.environ: + src_path = req.environ['HTTP_X_AMZ_COPY_SOURCE'] + src_path = src_path if src_path[0] == '/' else ('/' + src_path) + src_bucket, src_obj = split_path(src_path, 0, 2, True) + + req.get_response(self.app, 'HEAD', src_bucket, src_obj, + permission='READ') + + b_resp = req.get_response(self.app, 'HEAD', obj='') + # To avoid overwriting the existing object by unauthorized user, + # we send HEAD request first before writing the object to make + # sure that the target object does not exist or the user that sent + # the PUT request have write permission. + try: + req.get_response(self.app, 'HEAD') + except NoSuchKey: + pass + req_acl = ACL.from_headers(req.headers, + b_resp.bucket_acl.owner, + Owner(req.user_id, req.user_id)) + + req.object_acl = req_acl + resp = req.get_response(self.app) if 'HTTP_X_COPY_FROM' in req.environ: diff --git a/swift3/controllers/s3_acl.py b/swift3/controllers/s3_acl.py index 2d4d3c51..bd8c300c 100644 --- a/swift3/controllers/s3_acl.py +++ b/swift3/controllers/s3_acl.py @@ -27,7 +27,8 @@ def get_acl(headers, body, bucket_owner, object_owner=None): """ Get ACL instance from S3 (e.g. x-amz-grant) headers or S3 acl xml body. """ - acl = ACL.from_headers(headers, bucket_owner, object_owner) + acl = ACL.from_headers(headers, bucket_owner, object_owner, + as_private=False) if acl is None: # Get acl from request body if possible. @@ -65,13 +66,9 @@ class AclController(Controller): """ Handles GET Bucket acl and GET Object acl. """ - resp = req.get_response(self.app, 'HEAD') - if req.is_object_request: - acl = resp.object_acl - else: - acl = resp.bucket_acl - - acl.check_permission(req.user_id, 'READ_ACP') + resp = req.get_response(self.app, 'HEAD', permission='READ_ACP') + acl = getattr(resp, '%s_acl' % + ('object' if req.is_object_request else 'bucket')) resp = HTTPOk() resp.body = tostring(acl.elem()) @@ -83,16 +80,15 @@ class AclController(Controller): Handles PUT Bucket acl and PUT Object acl. """ if req.is_object_request: - b_resp = req.get_response(self.app, 'HEAD', obj='') - o_resp = req.get_response(self.app, 'HEAD') - + b_resp = req.get_response(self.app, 'HEAD', obj='', + skip_check=True) + o_resp = req.get_response(self.app, 'HEAD', permission='WRITE_ACP') req_acl = get_acl(req.headers, req.xml(ACL.max_xml_length), b_resp.bucket_acl.owner, o_resp.object_acl.owner) # Don't change the owner of the resource by PUT acl request. o_resp.object_acl.check_owner(req_acl.owner.id) - o_resp.object_acl.check_permission(req.user_id, 'WRITE_ACP') for g in req_acl.grants: LOGGER.debug('Grant %s %s permission on the object /%s/%s' % @@ -107,22 +103,22 @@ class AclController(Controller): # So headers['X-Copy-From'] for copy request is added here. headers['X-Copy-From'] = quote(src_path) headers['Content-Length'] = 0 - req.get_response(self.app, 'PUT', headers=headers) + req.get_response(self.app, 'PUT', headers=headers, + skip_check=True) else: - resp = req.get_response(self.app, 'HEAD') + resp = req.get_response(self.app, 'HEAD', permission='WRITE_ACP') req_acl = get_acl(req.headers, req.xml(ACL.max_xml_length), resp.bucket_acl.owner) # Don't change the owner of the resource by PUT acl request. resp.bucket_acl.check_owner(req_acl.owner.id) - resp.bucket_acl.check_permission(req.user_id, 'WRITE_ACP') for g in req_acl.grants: LOGGER.debug('Grant %s %s permission on the bucket /%s' % (g.grantee, g.permission, req.container_name)) req.bucket_acl = req_acl - req.get_response(self.app, 'POST') + req.get_response(self.app, 'POST', skip_check=True) return HTTPOk() diff --git a/swift3/request.py b/swift3/request.py index 35b99a62..510dc185 100644 --- a/swift3/request.py +++ b/swift3/request.py @@ -571,18 +571,12 @@ class Request(swob.Request): return code_map[method] - def get_response(self, app, method=None, container=None, obj=None, - body=None, query=None, headers=None): + def _get_response(self, app, method, container, obj, + headers=None, body=None, query=None): """ Calls the application with this request's environment. Returns a Response object that wraps up the application's result. """ - method = method or self.environ['REQUEST_METHOD'] - if container is None: - container = self.container_name - if obj is None: - obj = self.object_name - sw_req = self.to_swift_req(method, container, obj, headers=headers, body=body, query=query) @@ -634,3 +628,94 @@ class Request(swob.Request): raise AccessDenied() raise InternalError('unexpected status code %d' % status) + + def get_response(self, app, method=None, container=None, obj=None, + headers=None, body=None, query=None, permission=None, + skip_check=False): + """ + Calls the application with this request's environment. Returns a + Response object that wraps up the application's result. + """ + sw_method = method or self.environ['REQUEST_METHOD'] + if container is None: + container = self.container_name + if obj is None: + obj = self.object_name + + if CONF.s3_acl and not skip_check: + resource = 'object' if obj else 'container' + s3_method = self.environ['REQUEST_METHOD'] + if not permission and (s3_method, sw_method, resource) in ACL_MAP: + acl_check = ACL_MAP[(s3_method, sw_method, resource)] + resource = acl_check.get('Resource') or resource + permission = acl_check['Permission'] + + if permission: + match_resource = True + if resource == 'object': + resp = self._get_response(app, 'HEAD', container, obj) + acl = resp.object_acl + elif resource == 'container': + resp = self._get_response(app, 'HEAD', container, None) + acl = resp.bucket_acl + if obj: + match_resource = False + + acl.check_permission(self.user_id, permission) + + if sw_method == 'HEAD' and match_resource: + # If the request to swift is HEAD and the resource is + # consistent with the confirmation subject of ACL, not + # request again. This is because that contains the + # information required in the HEAD response at the time of + # ACL acquisition. + return resp + + return self._get_response(app, sw_method, container, obj, + headers, body, query) + +""" +ACL_MAP = + { + ('', '', ''): + {'Resource': '', + 'Permission': ''}, + ... + } + +s3_method: Method of S3 Request from user to swift3 +swift_method: Method of Swift Request from swift3 to swift +swift_resource: Resource of Swift Request from swift3 to swift +check_resource: +check_permission: +""" +ACL_MAP = { + # HEAD Bucket + ('HEAD', 'HEAD', 'container'): + {'Permission': 'READ'}, + # GET Service + ('GET', 'HEAD', 'container'): + {'Permission': 'OWNER'}, + # GET Bucket + ('GET', 'GET', 'container'): + {'Permission': 'READ'}, + # PUT Object, PUT Object Copy + ('PUT', 'HEAD', 'container'): + {'Permission': 'WRITE'}, + # DELETE Bucket + ('DELETE', 'DELETE', 'container'): + {'Permission': 'OWNER'}, + # HEAD Object + ('HEAD', 'HEAD', 'object'): + {'Permission': 'READ'}, + # GET Object + ('GET', 'GET', 'object'): + {'Permission': 'READ'}, + # PUT Object, PUT Object Copy + ('PUT', 'HEAD', 'object'): + {'Permission': 'WRITE'}, + # Delete Object + ('DELETE', 'DELETE', 'object'): + {'Resource': 'container', + 'Permission': 'WRITE'}, +} diff --git a/swift3/subresource.py b/swift3/subresource.py index ef60b89d..3f8eeb75 100644 --- a/swift3/subresource.py +++ b/swift3/subresource.py @@ -437,6 +437,10 @@ class ACL(object): """ Check that the user is an owner. """ + if not CONF.s3_acl: + # Ignore Swift3 ACL. + return + if not self.owner.id: if CONF.allow_no_owner: # No owner means public. @@ -450,6 +454,10 @@ class ACL(object): """ Check that the user has a permission. """ + if not CONF.s3_acl: + # Ignore Swift3 ACL. + return + try: # owners have full control permission self.check_owner(user_id) @@ -457,15 +465,17 @@ class ACL(object): except AccessDenied: pass - for g in self.grants: - if g.allow(user_id, 'FULL_CONTROL') or \ - g.allow(user_id, permission): - return + if permission in PERMISSIONS: + for g in self.grants: + if g.allow(user_id, 'FULL_CONTROL') or \ + g.allow(user_id, permission): + return raise AccessDenied() @classmethod - def from_headers(cls, headers, bucket_owner, object_owner=None): + def from_headers(cls, headers, bucket_owner, object_owner=None, + as_private=True): """ Convert HTTP headers to an ACL instance. """ @@ -495,7 +505,10 @@ class ACL(object): if len(grants) == 0: # No ACL headers - return None + if as_private: + return ACLPrivate(bucket_owner, object_owner) + else: + return None return cls(object_owner or bucket_owner, grants) diff --git a/swift3/test/unit/test_request.py b/swift3/test/unit/test_request.py new file mode 100644 index 00000000..90ebde5c --- /dev/null +++ b/swift3/test/unit/test_request.py @@ -0,0 +1,180 @@ +# Copyright (c) 2014 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from contextlib import nested +from mock import patch +import unittest + +from swift.common.swob import Request + +from swift3.subresource import ACL, User, Owner, Grant +from swift3.test.unit.test_middleware import Swift3TestCase +from swift3.cfg import CONF +from swift3.request import Request as S3_Request + + +Fake_ACL_MAP = { + # HEAD Bucket + ('HEAD', 'HEAD', 'container'): + {'Resource': 'container', + 'Permission': 'READ'}, + # GET Bucket + ('GET', 'GET', 'container'): + {'Resource': 'container', + 'Permission': 'READ'}, + # HEAD Object + ('HEAD', 'HEAD', 'object'): + {'Resource': 'object', + 'Permission': 'READ'}, + # GET Object + ('GET', 'GET', 'object'): + {'Resource': 'object', + 'Permission': 'READ'}, +} + + +def _gen_test_acl(owner, permission=None, grantee=None): + if permission is None: + return ACL(owner, []) + + if grantee is None: + grantee = User('test:tester') + return ACL(owner, [Grant(grantee, permission)]) + + +class FakeResponse(object): + def __init__(self, s3_acl): + self.bucket_acl = None + self.object_acl = None + if s3_acl: + owner = Owner(id='test:tester', name='test:tester') + self.bucket_acl = _gen_test_acl(owner, 'FULL_CONTROL') + self.object_acl = _gen_test_acl(owner, 'FULL_CONTROL') + + +class TestRequest(Swift3TestCase): + + def setUp(self): + super(TestRequest, self).setUp() + CONF.s3_acl = True + + def tearDown(self): + CONF.s3_acl = False + + @patch('swift3.request.ACL_MAP', Fake_ACL_MAP) + def _test_get_response(self, method, container='bucket', obj=None, + permission=None, skip_check=False): + path = '/' + container + ('/' + obj if obj else '') + req = Request.blank(path, + environ={'REQUEST_METHOD': method}, + headers={'Authorization': 'AWS test:tester:hmac'}) + s3_req = S3_Request(req.environ) + + with nested(patch('swift3.request.Request._get_response'), + patch('swift3.subresource.ACL.check_permission')) \ + as (mock_get_resp, m_check_permission): + mock_get_resp.return_value = FakeResponse(CONF.s3_acl) + return mock_get_resp, m_check_permission,\ + s3_req.get_response(self.swift3, permission=permission, + skip_check=skip_check) + + def test_get_response_without_s3_acl(self): + CONF.s3_acl = False + mock_get_resp, m_check_permission, s3_resp = \ + self._test_get_response('HEAD') + CONF.s3_acl = True + self.assertTrue(s3_resp.bucket_acl is None) + self.assertTrue(s3_resp.object_acl is None) + self.assertEqual(mock_get_resp.call_count, 1) + self.assertEqual(m_check_permission.call_count, 0) + + def test_get_response_without_check_permission(self): + mock_get_resp, m_check_permission, s3_resp = \ + self._test_get_response('HEAD', skip_check=True) + self.assertTrue(s3_resp.bucket_acl is not None) + self.assertTrue(s3_resp.object_acl is not None) + self.assertEqual(mock_get_resp.call_count, 1) + self.assertEqual(m_check_permission.call_count, 0) + + def test_get_response_with_permission_specified(self): + obj = 'object' + mock_get_resp, m_check_permission, s3_resp = \ + self._test_get_response('GET', obj=obj, + permission='READ_ACP') + self.assertTrue(s3_resp.bucket_acl is not None) + self.assertTrue(s3_resp.object_acl is not None) + self.assertEqual(mock_get_resp.call_count, 2) + args, kargs = mock_get_resp.call_args_list[0] + get_resp_obj = args[3] + self.assertEqual(get_resp_obj, obj) + self.assertEqual(m_check_permission.call_count, 1) + args, kargs = m_check_permission.call_args + permission = args[1] + self.assertEqual(permission, 'READ_ACP') + + def test_get_response_without_match_ACL_MAP(self): + mock_get_resp, m_check_permission, s3_resp = \ + self._test_get_response('POST') + self.assertTrue(s3_resp.bucket_acl is not None) + self.assertTrue(s3_resp.object_acl is not None) + self.assertEqual(mock_get_resp.call_count, 1) + self.assertEqual(m_check_permission.call_count, 0) + + def test_get_response_without_duplication_HEAD_request(self): + obj = 'object' + mock_get_resp, m_check_permission, s3_resp = \ + self._test_get_response('HEAD', obj=obj) + self.assertTrue(s3_resp.bucket_acl is not None) + self.assertTrue(s3_resp.object_acl is not None) + self.assertEqual(mock_get_resp.call_count, 1) + args, kargs = mock_get_resp.call_args_list[0] + get_resp_obj = args[3] + self.assertEqual(get_resp_obj, obj) + self.assertEqual(m_check_permission.call_count, 1) + args, kargs = m_check_permission.call_args + permission = args[1] + self.assertEqual(permission, 'READ') + + def test_get_response_with_check_object_permission(self): + obj = 'object' + mock_get_resp, m_check_permission, s3_resp = \ + self._test_get_response('GET', obj=obj) + self.assertTrue(s3_resp.bucket_acl is not None) + self.assertTrue(s3_resp.object_acl is not None) + self.assertEqual(mock_get_resp.call_count, 2) + args, kargs = mock_get_resp.call_args_list[0] + get_resp_obj = args[3] + self.assertEqual(get_resp_obj, obj) + self.assertEqual(m_check_permission.call_count, 1) + args, kargs = m_check_permission.call_args + permission = args[1] + self.assertEqual(permission, 'READ') + + def test_get_response_with_check_container_permission(self): + mock_get_resp, m_check_permission, s3_resp = \ + self._test_get_response('GET') + self.assertTrue(s3_resp.bucket_acl is not None) + self.assertTrue(s3_resp.object_acl is not None) + self.assertEqual(mock_get_resp.call_count, 2) + args, kargs = mock_get_resp.call_args_list[0] + get_resp_obj = args[3] + self.assertTrue(get_resp_obj is None) + self.assertEqual(m_check_permission.call_count, 1) + args, kargs = m_check_permission.call_args + permission = args[1] + self.assertEqual(permission, 'READ') + +if __name__ == '__main__': + unittest.main() diff --git a/swift3/test/unit/test_s3_acl.py b/swift3/test/unit/test_s3_acl.py index b815393d..8d6581ce 100644 --- a/swift3/test/unit/test_s3_acl.py +++ b/swift3/test/unit/test_s3_acl.py @@ -14,26 +14,28 @@ # limitations under the License. import unittest +import simplejson as json from swift.common import swob from swift.common.swob import Request from swift3.etree import tostring, Element, SubElement from swift3.subresource import ACL, ACLPrivate, User, encode_acl, \ - Owner, Grant + decode_acl, AuthenticatedUsers, AllUsers, Owner, Grant from swift3.test.unit.test_middleware import Swift3TestCase from swift3.cfg import CONF XMLNS_XSI = 'http://www.w3.org/2001/XMLSchema-instance' -def _gen_test_acl(owner, permission=None, grantee=None): +def _gen_test_headers(owner, permission=None, grantee=None, + resource='container'): if permission is None: - return ACL(owner, []) + return encode_acl(resource, ACL(owner, [])) if grantee is None: grantee = User('test:tester') - return ACL(owner, [Grant(grantee, permission)]) + return encode_acl(resource, ACL(owner, [Grant(grantee, permission)])) def _make_xml(grantee): @@ -51,6 +53,12 @@ def _make_xml(grantee): class TestSwift3S3Acl(Swift3TestCase): + """ + This class has been tested in the following Controller. + [S3AclController] + [BucketController] Case: Conf.s3_acl == True + [Object Controller] Case: Conf.s3_acl == True + """ def setUp(self): super(TestSwift3S3Acl, self).setUp() @@ -85,6 +93,9 @@ class TestSwift3S3Acl(Swift3TestCase): def tearDown(self): CONF.s3_acl = False + """ + [S3AclController] + """ def test_bucket_acl_PUT_with_other_owner(self): req = Request.blank('/bucket?acl', environ={'REQUEST_METHOD': 'PUT'}, @@ -298,11 +309,10 @@ class TestSwift3S3Acl(Swift3TestCase): def _test_bucket_acl_GET(self, owner, permission): owner = Owner(id=owner, name=owner) - acl = _gen_test_acl(owner, permission) + headers = _gen_test_headers(owner, permission) self.swift.register('HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent, - encode_acl('container', acl), - None) + headers, None) req = Request.blank('/bucket?acl', environ={'REQUEST_METHOD': 'GET'}, headers={'Authorization': 'AWS test:tester:hmac'}) @@ -313,14 +323,28 @@ class TestSwift3S3Acl(Swift3TestCase): status, headers, body = self._test_bucket_acl_GET('test:other', None) self.assertEquals(self._get_error_code(body), 'AccessDenied') - def test_bucket_GET_with_owner_permission(self): + def test_bucket_acl_GET_with_read_acp_permission(self): + status, headers, body = self._test_bucket_acl_GET('test:other', + 'READ_ACP') + self.assertEquals(status.split()[0], '200') + + def test_bucket_acl_GET_with_fullcontrol_permission(self): + status, headers, body = self._test_bucket_acl_GET('test:other', + 'FULL_CONTROL') + self.assertEquals(status.split()[0], '200') + + def test_bucket_acl_GET_with_owner_permission(self): status, headers, body = self._test_bucket_acl_GET('test:tester', None) self.assertEquals(status.split()[0], '200') - def _test_bucket_acl_PUT(self, owner, permission): + def _test_bucket_acl_PUT(self, owner, permission, grantee='test:tester'): owner = Owner(id=owner, name=owner) - acl = _gen_test_acl(owner, permission) + grantee = User(grantee) + headers = _gen_test_headers(owner, permission, grantee) + acl = decode_acl('container', headers) + self.swift.register('HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent, + headers, None) req = Request.blank('/bucket?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac'}, @@ -329,21 +353,34 @@ class TestSwift3S3Acl(Swift3TestCase): return self.call_swift3(req) def test_bucket_acl_PUT_without_permission(self): - status, headers, body = self._test_bucket_acl_PUT('test:other', None) + status, headers, body = self._test_bucket_acl_PUT('test:other', None, + 'test:other') self.assertEquals(self._get_error_code(body), 'AccessDenied') + def test_bucket_acl_PUT_with_write_acp_permission(self): + + status, headers, body = self._test_bucket_acl_PUT('test:other', + 'WRITE_ACP') + self.assertEquals(status.split()[0], '200') + + def test_bucket_acl_PUT_with_fullcontrol_permission(self): + + status, headers, body = self._test_bucket_acl_PUT('test:other', + 'FULL_CONTROL') + self.assertEquals(status.split()[0], '200') + def test_bucket_acl_PUT_with_owner_permission(self): - status, headers, body = self._test_bucket_acl_PUT('test:tester', None) + + status, headers, body = self._test_bucket_acl_PUT('test:tester', + 'FULL_CONTROL') self.assertEquals(status.split()[0], '200') def _test_object_acl_GET(self, owner, permission): owner = Owner(id=owner, name=owner) - acl = _gen_test_acl(owner, permission) + headers = _gen_test_headers(owner, permission, resource='object') self.swift.register('HEAD', '/v1/AUTH_test/bucket/object', - swob.HTTPOk, - encode_acl('object', acl), - None) + swob.HTTPOk, headers, None) req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'GET'}, headers={'Authorization': 'AWS test:tester:hmac'}) @@ -354,18 +391,27 @@ class TestSwift3S3Acl(Swift3TestCase): status, headers, body = self._test_object_acl_GET('test:other', None) self.assertEquals(self._get_error_code(body), 'AccessDenied') + def test_object_acl_GET_with_read_acp_permission(self): + status, headers, body = self._test_object_acl_GET('test:other', + 'READ_ACP') + self.assertEquals(status.split()[0], '200') + + def test_object_acl_GET_with_fullcontrol_permission(self): + status, headers, body = self._test_object_acl_GET('test:other', + 'FULL_CONTROL') + self.assertEquals(status.split()[0], '200') + def test_object_acl_GET_with_owner_permission(self): status, headers, body = self._test_object_acl_GET('test:tester', None) self.assertEquals(status.split()[0], '200') def _test_object_acl_PUT(self, owner, permission): owner = Owner(id=owner, name=owner) - acl = _gen_test_acl(owner, permission) + headers = _gen_test_headers(owner, permission, resource='object') + acl = decode_acl('object', headers) self.swift.register('HEAD', '/v1/AUTH_test/bucket/object', - swob.HTTPOk, - encode_acl('object', acl), - None) + swob.HTTPOk, headers, None) req = Request.blank('/bucket/object?acl', environ={'REQUEST_METHOD': 'PUT'}, headers={'Authorization': 'AWS test:tester:hmac'}, @@ -377,9 +423,285 @@ class TestSwift3S3Acl(Swift3TestCase): status, headers, body = self._test_object_acl_PUT('test:other', None) self.assertEquals(self._get_error_code(body), 'AccessDenied') + def test_object_acl_PUT_with_write_acp_permission(self): + status, headers, body = self._test_object_acl_PUT('test:other', + 'WRITE_ACP') + self.assertEquals(status.split()[0], '200') + + def test_object_acl_PUT_with_fullcontrol_permission(self): + status, headers, body = self._test_object_acl_PUT('test:other', + 'FULL_CONTROL') + self.assertEquals(status.split()[0], '200') + def test_object_acl_PUT_with_owner_permission(self): status, headers, body = self._test_object_acl_PUT('test:tester', None) self.assertEquals(status.split()[0], '200') + """ + [BucketController] Case: Conf.s3_acl == True + """ + def _test_bucket(self, method, owner, permission): + owner = Owner(id=owner, name=owner) + headers = _gen_test_headers(owner, permission) + + self.swift.register('HEAD', '/v1/AUTH_test/acltest', + swob.HTTPNoContent, headers, None) + self.swift.register('GET', '/v1/AUTH_test/acltest', swob.HTTPNoContent, + headers, json.dumps([])) + self.swift.register('POST', '/v1/AUTH_test/acltest', + swob.HTTPNoContent, {}, None) + self.swift.register('DELETE', '/v1/AUTH_test/acltest', + swob.HTTPNoContent, {}, None) + + req = Request.blank('/acltest', + environ={'REQUEST_METHOD': method}, + headers={'Authorization': 'AWS test:tester:hmac'}) + + return self.call_swift3(req) + + def test_bucket_GET_without_permission(self): + status, headers, body = self._test_bucket('GET', 'test:other', None) + self.assertEquals(self._get_error_code(body), 'AccessDenied') + + def test_bucket_GET_with_read_permission(self): + status, headers, body = self._test_bucket('GET', 'test:other', 'READ') + self.assertEquals(status.split()[0], '200') + + def test_bucket_GET_with_fullcontrol_permission(self): + status, headers, body = self._test_bucket('GET', 'test:other', + 'FULL_CONTROL') + self.assertEquals(status.split()[0], '200') + + def test_bucket_GET_with_owner_permission(self): + status, headers, body = self._test_bucket('GET', 'test:tester', None) + self.assertEquals(status.split()[0], '200') + + def _test_bucket_GET_canned_acl(self, group): + owner = Owner(id='test:other', name='test:other') + headers = _gen_test_headers(owner, 'READ', group) + self.swift.register('GET', '/v1/AUTH_test/acltest', swob.HTTPNoContent, + headers, json.dumps([])) + req = Request.blank('/acltest', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Authorization': 'AWS test:tester:hmac'}) + + return self.call_swift3(req) + + def test_bucket_GET_authenticated_users(self): + status, headers, body = \ + self._test_bucket_GET_canned_acl(AuthenticatedUsers()) + self.assertEquals(status.split()[0], '200') + + def test_bucket_GET_all_users(self): + status, headers, body = self._test_bucket_GET_canned_acl(AllUsers()) + self.assertEquals(status.split()[0], '200') + + def test_bucket_PUT_with_already_exist(self): + self.swift.register('PUT', '/v1/AUTH_test/acltest', + swob.HTTPAccepted, {}, None) + status, headers, body = self._test_bucket('PUT', 'test:other', None) + self.assertEquals(self._get_error_code(body), 'BucketAlreadyExists') + + def test_bucket_PUT(self): + self.swift.register('PUT', '/v1/AUTH_test/acltest', + swob.HTTPCreated, {}, None) + status, headers, body = self._test_bucket('PUT', 'test:other', 'WRITE') + self.assertEquals(status.split()[0], '200') + + def test_bucket_DELETE_without_permission(self): + status, headers, body = self._test_bucket('DELETE', 'test:other', None) + self.assertEquals(self._get_error_code(body), 'AccessDenied') + + def test_bucket_DELETE_with_write_permission(self): + status, headers, body = self._test_bucket('DELETE', 'test:other', + 'WRITE') + self.assertEquals(self._get_error_code(body), 'AccessDenied') + + def test_bucket_DELETE_with_fullcontrol_permission(self): + status, headers, body = self._test_bucket('DELETE', 'test:other', + 'FULL_CONTROL') + self.assertEquals(self._get_error_code(body), 'AccessDenied') + + def test_bucket_DELETE_with_owner_permission(self): + status, headers, body = self._test_bucket('DELETE', 'test:tester', + None) + self.assertEquals(status.split()[0], '204') + + """ + [Object Controller] Case: Conf.s3_acl == True + """ + def _test_object(self, method, o_owner, o_permission, + grantee='test:tester', c_owner='test:tester', + c_permission='FULL_CONTROL', existObject=True): + c_owner = Owner(id=c_owner, name=c_owner) + o_owner = Owner(id=o_owner, name=o_owner) + grantee = User(grantee) + c_headers = _gen_test_headers(c_owner, c_permission, grantee) + o_headers = _gen_test_headers(o_owner, o_permission, grantee, 'object') + + self.swift.register('HEAD', '/v1/AUTH_test/bucket', + swob.HTTPNoContent, c_headers, None) + if existObject: + self.swift.register('HEAD', '/v1/AUTH_test/bucket/object', + swob.HTTPOk, o_headers, None) + else: + self.swift.register('HEAD', '/v1/AUTH_test/bucket/object', + swob.HTTPNotFound, {}, None) + self.swift.register('GET', '/v1/AUTH_test/bucket/object', + swob.HTTPOk, o_headers, '') + self.swift.register('PUT', '/v1/AUTH_test/bucket/object', + swob.HTTPCreated, {}, None) + self.swift.register('DELETE', '/v1/AUTH_test/bucket/object', + swob.HTTPNoContent, {}, None) + + req = Request.blank('/bucket/object', + environ={'REQUEST_METHOD': method}, + headers={'Authorization': 'AWS test:tester:hmac'}) + return self.call_swift3(req) + + def test_object_GET_without_permission(self): + status, headers, body = self._test_object('GET', 'test:other', None) + self.assertEquals(self._get_error_code(body), 'AccessDenied') + + def test_object_GET_with_read_permission(self): + status, headers, body = self._test_object('GET', 'test:other', 'READ') + self.assertEquals(status.split()[0], '200') + + def test_object_GET_with_fullcontrol_permission(self): + status, headers, body = self._test_object('GET', 'test:other', + 'FULL_CONTROL') + self.assertEquals(status.split()[0], '200') + + def test_object_GET_with_owner_permission(self): + status, headers, body = self._test_object('GET', 'test:tester', None) + self.assertEquals(status.split()[0], '200') + + def test_object_PUT_without_permission(self): + status, headers, body = self._test_object('PUT', 'test:other', None) + self.assertEquals(self._get_error_code(body), 'AccessDenied') + + def test_object_PUT_with_write_permission(self): + status, headers, body = self._test_object('PUT', 'test:other', 'WRITE', + existObject=False) + self.assertEquals(status.split()[0], '200') + + def test_object_PUT_with_fullcontrol_permission(self): + status, headers, body = self._test_object('PUT', 'test:other', + 'FULL_CONTROL', + existObject=False) + self.assertEquals(status.split()[0], '200') + + def test_object_PUT_with_owner_permission(self): + status, headers, body = self._test_object('PUT', 'test:tester', None, + existObject=False) + self.assertEquals(status.split()[0], '200') + + def test_object_PUT_without_overwriting_permission(self): + status, headers, body = self._test_object('PUT', 'test:other', None) + self.assertEquals(self._get_error_code(body), 'AccessDenied') + + def test_object_PUT_with_overwriting_permission(self): + status, headers, body = self._test_object('PUT', 'test:other', + 'WRITE') + self.assertEquals(status.split()[0], '200') + + def test_object_DELETE_without_permission(self): + status, headers, body = self._test_object('DELETE', 'test:other', None, + c_owner='test:other', + c_permission='READ') + self.assertEquals(self._get_error_code(body), 'AccessDenied') + + def test_object_DELETE_with_write_permission(self): + status, headers, body = self._test_object('DELETE', 'test:tester', + None, + c_owner='test:other', + c_permission='WRITE') + self.assertEquals(status.split()[0], '204') + + def test_object_DELETE_with_fullcontrol_permission(self): + status, headers, body = self._test_object('DELETE', 'test:tester', + None, + c_owner='test:other') + self.assertEquals(status.split()[0], '204') + + def test_object_DELETE_with_owner_permission(self): + status, headers, body = self._test_object('DELETE', 'test:tester', + None) + self.assertEquals(status.split()[0], '204') + + def _test_object_copy(self, src_owner='test:tester', src_permission=None, + dst_c_owner='test:tester', dst_c_permission=None, + dst_c_grantee=None, dst_o_owner='test:tester', + dst_o_permission=None, dst_o_grantee=None): + + src_o_headers = _gen_test_headers(Owner(id=src_owner, name=src_owner), + src_permission, None, 'object') + self.swift.register('HEAD', '/v1/AUTH_test/src_bucket/src_obj', + swob.HTTPOk, src_o_headers, None) + + dst_c_headers = _gen_test_headers(Owner(id=dst_c_owner, + name=dst_c_owner), + dst_c_permission, dst_c_grantee) + self.swift.register('HEAD', '/v1/AUTH_test/dst_bucket', + swob.HTTPNoContent, dst_c_headers, None) + + dst_o_headers = _gen_test_headers(Owner(id=dst_o_owner, + name=dst_o_owner), + dst_o_permission, dst_o_grantee, + 'object') + self.swift.register('HEAD', '/v1/AUTH_test/dst_bucket/dst_obj', + swob.HTTPOk, dst_o_headers, None) + + self.swift.register('PUT', '/v1/AUTH_test/dst_bucket/dst_obj', + swob.HTTPCreated, {}, None) + + req = Request.blank( + '/dst_bucket/dst_obj', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'X-Amz-Copy-Source': '/src_bucket/src_obj'}) + + return self.call_swift3(req) + + def test_object_PUT_copy_with_owner_permission(self): + status, headers, body = \ + self._test_object_copy() + self.assertEquals(status.split()[0], '200') + + def test_object_PUT_copy_with_fullcontrol_permission(self): + status, headers, body = \ + self._test_object_copy(src_owner='test:other', + src_permission='FULL_CONTROL', + dst_c_owner='test:other', + dst_c_permission='FULL_CONTROL', + dst_o_owner='test:other', + dst_o_permission='FULL_CONTROL') + self.assertEquals(status.split()[0], '200') + + def test_object_PUT_copy_with_grantee_permission(self): + status, headers, body = \ + self._test_object_copy(src_owner='test:other', + src_permission='READ', + dst_c_owner='test:other', + dst_c_permission='WRITE', + dst_o_owner='test:other', + dst_o_permission='WRITE') + self.assertEquals(status.split()[0], '200') + + def test_object_PUT_copy_without_src_obj_permission(self): + status, headers, body = \ + self._test_object_copy(src_owner='test:other') + self.assertEquals(status.split()[0], '403') + + def test_object_PUT_copy_without_dst_container_permission(self): + status, headers, body = \ + self._test_object_copy(dst_c_owner='test:other') + self.assertEquals(status.split()[0], '403') + + def test_object_PUT_copy_without_dst_obj_permission(self): + status, headers, body = \ + self._test_object_copy(dst_o_owner='test:other') + self.assertEquals(status.split()[0], '403') + if __name__ == '__main__': unittest.main()