acl: use S3 ACL for object and bucket APIs

Change-Id: I8ec79fe4cea1370cbf433a5c20f5e4cdf5b0d298
This commit is contained in:
Masaki Tsukuda
2014-11-27 17:05:58 +09:00
parent 12efe66170
commit fef3260b53
7 changed files with 696 additions and 55 deletions

View File

@@ -24,6 +24,7 @@ from swift3.etree import Element, SubElement, tostring, fromstring, \
from swift3.response import HTTPOk, S3NotImplemented, InvalidArgument, \
MalformedXML, InvalidLocationConstraint
from swift3.cfg import CONF
from swift3.subresource import ACL, Owner
from swift3.utils import LOGGER
MAX_PUT_BUCKET_BODY_SIZE = 10240
@@ -116,9 +117,6 @@ class BucketController(Controller):
"""
Handle PUT Bucket request
"""
if 'HTTP_X_AMZ_ACL' in req.environ:
handle_acl_header(req)
xml = req.xml(MAX_PUT_BUCKET_BODY_SIZE)
if xml:
# check location
@@ -135,7 +133,26 @@ class BucketController(Controller):
# Swift3 cannot support multiple reagions now.
raise InvalidLocationConstraint()
resp = req.get_response(self.app)
if CONF.s3_acl:
req_acl = ACL.from_headers(req.headers,
Owner(req.user_id, req.user_id))
# To avoid overwriting the existing bucket's ACL, we send PUT
# request first before setting the ACL to make sure that the target
# container does not exist.
resp = req.get_response(self.app)
# update metadata
req.bucket_acl = req_acl
# FIXME If this request is failed, there is a possibility that the
# bucket which has no ACL is left.
req.get_response(self.app, 'POST')
else:
if 'HTTP_X_AMZ_ACL' in req.environ:
handle_acl_header(req)
resp = req.get_response(self.app)
resp.status = HTTP_OK
resp.location = '/' + req.container_name

View File

@@ -14,10 +14,13 @@
# limitations under the License.
from swift.common.http import HTTP_OK
from swift.common.utils import split_path
from swift3.controllers.base import Controller
from swift3.response import AccessDenied, HTTPOk
from swift3.response import AccessDenied, HTTPOk, NoSuchKey
from swift3.etree import Element, SubElement, tostring
from swift3.subresource import ACL, Owner
from swift3.cfg import CONF
class ObjectController(Controller):
@@ -26,6 +29,7 @@ class ObjectController(Controller):
"""
def GETorHEAD(self, req):
resp = req.get_response(self.app)
if req.method == 'HEAD':
resp.app_iter = None
@@ -53,6 +57,30 @@ class ObjectController(Controller):
"""
Handle PUT Object and PUT Object (Copy) request
"""
if CONF.s3_acl:
if 'HTTP_X_AMZ_COPY_SOURCE' in req.environ:
src_path = req.environ['HTTP_X_AMZ_COPY_SOURCE']
src_path = src_path if src_path[0] == '/' else ('/' + src_path)
src_bucket, src_obj = split_path(src_path, 0, 2, True)
req.get_response(self.app, 'HEAD', src_bucket, src_obj,
permission='READ')
b_resp = req.get_response(self.app, 'HEAD', obj='')
# To avoid overwriting the existing object by unauthorized user,
# we send HEAD request first before writing the object to make
# sure that the target object does not exist or the user that sent
# the PUT request have write permission.
try:
req.get_response(self.app, 'HEAD')
except NoSuchKey:
pass
req_acl = ACL.from_headers(req.headers,
b_resp.bucket_acl.owner,
Owner(req.user_id, req.user_id))
req.object_acl = req_acl
resp = req.get_response(self.app)
if 'HTTP_X_COPY_FROM' in req.environ:

View File

@@ -27,7 +27,8 @@ def get_acl(headers, body, bucket_owner, object_owner=None):
"""
Get ACL instance from S3 (e.g. x-amz-grant) headers or S3 acl xml body.
"""
acl = ACL.from_headers(headers, bucket_owner, object_owner)
acl = ACL.from_headers(headers, bucket_owner, object_owner,
as_private=False)
if acl is None:
# Get acl from request body if possible.
@@ -65,13 +66,9 @@ class AclController(Controller):
"""
Handles GET Bucket acl and GET Object acl.
"""
resp = req.get_response(self.app, 'HEAD')
if req.is_object_request:
acl = resp.object_acl
else:
acl = resp.bucket_acl
acl.check_permission(req.user_id, 'READ_ACP')
resp = req.get_response(self.app, 'HEAD', permission='READ_ACP')
acl = getattr(resp, '%s_acl' %
('object' if req.is_object_request else 'bucket'))
resp = HTTPOk()
resp.body = tostring(acl.elem())
@@ -83,16 +80,15 @@ class AclController(Controller):
Handles PUT Bucket acl and PUT Object acl.
"""
if req.is_object_request:
b_resp = req.get_response(self.app, 'HEAD', obj='')
o_resp = req.get_response(self.app, 'HEAD')
b_resp = req.get_response(self.app, 'HEAD', obj='',
skip_check=True)
o_resp = req.get_response(self.app, 'HEAD', permission='WRITE_ACP')
req_acl = get_acl(req.headers, req.xml(ACL.max_xml_length),
b_resp.bucket_acl.owner,
o_resp.object_acl.owner)
# Don't change the owner of the resource by PUT acl request.
o_resp.object_acl.check_owner(req_acl.owner.id)
o_resp.object_acl.check_permission(req.user_id, 'WRITE_ACP')
for g in req_acl.grants:
LOGGER.debug('Grant %s %s permission on the object /%s/%s' %
@@ -107,22 +103,22 @@ class AclController(Controller):
# So headers['X-Copy-From'] for copy request is added here.
headers['X-Copy-From'] = quote(src_path)
headers['Content-Length'] = 0
req.get_response(self.app, 'PUT', headers=headers)
req.get_response(self.app, 'PUT', headers=headers,
skip_check=True)
else:
resp = req.get_response(self.app, 'HEAD')
resp = req.get_response(self.app, 'HEAD', permission='WRITE_ACP')
req_acl = get_acl(req.headers, req.xml(ACL.max_xml_length),
resp.bucket_acl.owner)
# Don't change the owner of the resource by PUT acl request.
resp.bucket_acl.check_owner(req_acl.owner.id)
resp.bucket_acl.check_permission(req.user_id, 'WRITE_ACP')
for g in req_acl.grants:
LOGGER.debug('Grant %s %s permission on the bucket /%s' %
(g.grantee, g.permission, req.container_name))
req.bucket_acl = req_acl
req.get_response(self.app, 'POST')
req.get_response(self.app, 'POST', skip_check=True)
return HTTPOk()

View File

@@ -571,18 +571,12 @@ class Request(swob.Request):
return code_map[method]
def get_response(self, app, method=None, container=None, obj=None,
body=None, query=None, headers=None):
def _get_response(self, app, method, container, obj,
headers=None, body=None, query=None):
"""
Calls the application with this request's environment. Returns a
Response object that wraps up the application's result.
"""
method = method or self.environ['REQUEST_METHOD']
if container is None:
container = self.container_name
if obj is None:
obj = self.object_name
sw_req = self.to_swift_req(method, container, obj, headers=headers,
body=body, query=query)
@@ -634,3 +628,94 @@ class Request(swob.Request):
raise AccessDenied()
raise InternalError('unexpected status code %d' % status)
def get_response(self, app, method=None, container=None, obj=None,
headers=None, body=None, query=None, permission=None,
skip_check=False):
"""
Calls the application with this request's environment. Returns a
Response object that wraps up the application's result.
"""
sw_method = method or self.environ['REQUEST_METHOD']
if container is None:
container = self.container_name
if obj is None:
obj = self.object_name
if CONF.s3_acl and not skip_check:
resource = 'object' if obj else 'container'
s3_method = self.environ['REQUEST_METHOD']
if not permission and (s3_method, sw_method, resource) in ACL_MAP:
acl_check = ACL_MAP[(s3_method, sw_method, resource)]
resource = acl_check.get('Resource') or resource
permission = acl_check['Permission']
if permission:
match_resource = True
if resource == 'object':
resp = self._get_response(app, 'HEAD', container, obj)
acl = resp.object_acl
elif resource == 'container':
resp = self._get_response(app, 'HEAD', container, None)
acl = resp.bucket_acl
if obj:
match_resource = False
acl.check_permission(self.user_id, permission)
if sw_method == 'HEAD' and match_resource:
# If the request to swift is HEAD and the resource is
# consistent with the confirmation subject of ACL, not
# request again. This is because that contains the
# information required in the HEAD response at the time of
# ACL acquisition.
return resp
return self._get_response(app, sw_method, container, obj,
headers, body, query)
"""
ACL_MAP =
{
('<s3_method>', '<swift_method>', '<swift_resource>'):
{'Resource': '<check_resource>',
'Permission': '<check_permission>'},
...
}
s3_method: Method of S3 Request from user to swift3
swift_method: Method of Swift Request from swift3 to swift
swift_resource: Resource of Swift Request from swift3 to swift
check_resource: <container/object>
check_permission: <OWNER/READ/WRITE/READ_ACP/WRITE_ACP>
"""
ACL_MAP = {
# HEAD Bucket
('HEAD', 'HEAD', 'container'):
{'Permission': 'READ'},
# GET Service
('GET', 'HEAD', 'container'):
{'Permission': 'OWNER'},
# GET Bucket
('GET', 'GET', 'container'):
{'Permission': 'READ'},
# PUT Object, PUT Object Copy
('PUT', 'HEAD', 'container'):
{'Permission': 'WRITE'},
# DELETE Bucket
('DELETE', 'DELETE', 'container'):
{'Permission': 'OWNER'},
# HEAD Object
('HEAD', 'HEAD', 'object'):
{'Permission': 'READ'},
# GET Object
('GET', 'GET', 'object'):
{'Permission': 'READ'},
# PUT Object, PUT Object Copy
('PUT', 'HEAD', 'object'):
{'Permission': 'WRITE'},
# Delete Object
('DELETE', 'DELETE', 'object'):
{'Resource': 'container',
'Permission': 'WRITE'},
}

View File

@@ -437,6 +437,10 @@ class ACL(object):
"""
Check that the user is an owner.
"""
if not CONF.s3_acl:
# Ignore Swift3 ACL.
return
if not self.owner.id:
if CONF.allow_no_owner:
# No owner means public.
@@ -450,6 +454,10 @@ class ACL(object):
"""
Check that the user has a permission.
"""
if not CONF.s3_acl:
# Ignore Swift3 ACL.
return
try:
# owners have full control permission
self.check_owner(user_id)
@@ -457,15 +465,17 @@ class ACL(object):
except AccessDenied:
pass
for g in self.grants:
if g.allow(user_id, 'FULL_CONTROL') or \
g.allow(user_id, permission):
return
if permission in PERMISSIONS:
for g in self.grants:
if g.allow(user_id, 'FULL_CONTROL') or \
g.allow(user_id, permission):
return
raise AccessDenied()
@classmethod
def from_headers(cls, headers, bucket_owner, object_owner=None):
def from_headers(cls, headers, bucket_owner, object_owner=None,
as_private=True):
"""
Convert HTTP headers to an ACL instance.
"""
@@ -495,7 +505,10 @@ class ACL(object):
if len(grants) == 0:
# No ACL headers
return None
if as_private:
return ACLPrivate(bucket_owner, object_owner)
else:
return None
return cls(object_owner or bucket_owner, grants)

View File

@@ -0,0 +1,180 @@
# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import nested
from mock import patch
import unittest
from swift.common.swob import Request
from swift3.subresource import ACL, User, Owner, Grant
from swift3.test.unit.test_middleware import Swift3TestCase
from swift3.cfg import CONF
from swift3.request import Request as S3_Request
Fake_ACL_MAP = {
# HEAD Bucket
('HEAD', 'HEAD', 'container'):
{'Resource': 'container',
'Permission': 'READ'},
# GET Bucket
('GET', 'GET', 'container'):
{'Resource': 'container',
'Permission': 'READ'},
# HEAD Object
('HEAD', 'HEAD', 'object'):
{'Resource': 'object',
'Permission': 'READ'},
# GET Object
('GET', 'GET', 'object'):
{'Resource': 'object',
'Permission': 'READ'},
}
def _gen_test_acl(owner, permission=None, grantee=None):
if permission is None:
return ACL(owner, [])
if grantee is None:
grantee = User('test:tester')
return ACL(owner, [Grant(grantee, permission)])
class FakeResponse(object):
def __init__(self, s3_acl):
self.bucket_acl = None
self.object_acl = None
if s3_acl:
owner = Owner(id='test:tester', name='test:tester')
self.bucket_acl = _gen_test_acl(owner, 'FULL_CONTROL')
self.object_acl = _gen_test_acl(owner, 'FULL_CONTROL')
class TestRequest(Swift3TestCase):
def setUp(self):
super(TestRequest, self).setUp()
CONF.s3_acl = True
def tearDown(self):
CONF.s3_acl = False
@patch('swift3.request.ACL_MAP', Fake_ACL_MAP)
def _test_get_response(self, method, container='bucket', obj=None,
permission=None, skip_check=False):
path = '/' + container + ('/' + obj if obj else '')
req = Request.blank(path,
environ={'REQUEST_METHOD': method},
headers={'Authorization': 'AWS test:tester:hmac'})
s3_req = S3_Request(req.environ)
with nested(patch('swift3.request.Request._get_response'),
patch('swift3.subresource.ACL.check_permission')) \
as (mock_get_resp, m_check_permission):
mock_get_resp.return_value = FakeResponse(CONF.s3_acl)
return mock_get_resp, m_check_permission,\
s3_req.get_response(self.swift3, permission=permission,
skip_check=skip_check)
def test_get_response_without_s3_acl(self):
CONF.s3_acl = False
mock_get_resp, m_check_permission, s3_resp = \
self._test_get_response('HEAD')
CONF.s3_acl = True
self.assertTrue(s3_resp.bucket_acl is None)
self.assertTrue(s3_resp.object_acl is None)
self.assertEqual(mock_get_resp.call_count, 1)
self.assertEqual(m_check_permission.call_count, 0)
def test_get_response_without_check_permission(self):
mock_get_resp, m_check_permission, s3_resp = \
self._test_get_response('HEAD', skip_check=True)
self.assertTrue(s3_resp.bucket_acl is not None)
self.assertTrue(s3_resp.object_acl is not None)
self.assertEqual(mock_get_resp.call_count, 1)
self.assertEqual(m_check_permission.call_count, 0)
def test_get_response_with_permission_specified(self):
obj = 'object'
mock_get_resp, m_check_permission, s3_resp = \
self._test_get_response('GET', obj=obj,
permission='READ_ACP')
self.assertTrue(s3_resp.bucket_acl is not None)
self.assertTrue(s3_resp.object_acl is not None)
self.assertEqual(mock_get_resp.call_count, 2)
args, kargs = mock_get_resp.call_args_list[0]
get_resp_obj = args[3]
self.assertEqual(get_resp_obj, obj)
self.assertEqual(m_check_permission.call_count, 1)
args, kargs = m_check_permission.call_args
permission = args[1]
self.assertEqual(permission, 'READ_ACP')
def test_get_response_without_match_ACL_MAP(self):
mock_get_resp, m_check_permission, s3_resp = \
self._test_get_response('POST')
self.assertTrue(s3_resp.bucket_acl is not None)
self.assertTrue(s3_resp.object_acl is not None)
self.assertEqual(mock_get_resp.call_count, 1)
self.assertEqual(m_check_permission.call_count, 0)
def test_get_response_without_duplication_HEAD_request(self):
obj = 'object'
mock_get_resp, m_check_permission, s3_resp = \
self._test_get_response('HEAD', obj=obj)
self.assertTrue(s3_resp.bucket_acl is not None)
self.assertTrue(s3_resp.object_acl is not None)
self.assertEqual(mock_get_resp.call_count, 1)
args, kargs = mock_get_resp.call_args_list[0]
get_resp_obj = args[3]
self.assertEqual(get_resp_obj, obj)
self.assertEqual(m_check_permission.call_count, 1)
args, kargs = m_check_permission.call_args
permission = args[1]
self.assertEqual(permission, 'READ')
def test_get_response_with_check_object_permission(self):
obj = 'object'
mock_get_resp, m_check_permission, s3_resp = \
self._test_get_response('GET', obj=obj)
self.assertTrue(s3_resp.bucket_acl is not None)
self.assertTrue(s3_resp.object_acl is not None)
self.assertEqual(mock_get_resp.call_count, 2)
args, kargs = mock_get_resp.call_args_list[0]
get_resp_obj = args[3]
self.assertEqual(get_resp_obj, obj)
self.assertEqual(m_check_permission.call_count, 1)
args, kargs = m_check_permission.call_args
permission = args[1]
self.assertEqual(permission, 'READ')
def test_get_response_with_check_container_permission(self):
mock_get_resp, m_check_permission, s3_resp = \
self._test_get_response('GET')
self.assertTrue(s3_resp.bucket_acl is not None)
self.assertTrue(s3_resp.object_acl is not None)
self.assertEqual(mock_get_resp.call_count, 2)
args, kargs = mock_get_resp.call_args_list[0]
get_resp_obj = args[3]
self.assertTrue(get_resp_obj is None)
self.assertEqual(m_check_permission.call_count, 1)
args, kargs = m_check_permission.call_args
permission = args[1]
self.assertEqual(permission, 'READ')
if __name__ == '__main__':
unittest.main()

View File

@@ -14,26 +14,28 @@
# limitations under the License.
import unittest
import simplejson as json
from swift.common import swob
from swift.common.swob import Request
from swift3.etree import tostring, Element, SubElement
from swift3.subresource import ACL, ACLPrivate, User, encode_acl, \
Owner, Grant
decode_acl, AuthenticatedUsers, AllUsers, Owner, Grant
from swift3.test.unit.test_middleware import Swift3TestCase
from swift3.cfg import CONF
XMLNS_XSI = 'http://www.w3.org/2001/XMLSchema-instance'
def _gen_test_acl(owner, permission=None, grantee=None):
def _gen_test_headers(owner, permission=None, grantee=None,
resource='container'):
if permission is None:
return ACL(owner, [])
return encode_acl(resource, ACL(owner, []))
if grantee is None:
grantee = User('test:tester')
return ACL(owner, [Grant(grantee, permission)])
return encode_acl(resource, ACL(owner, [Grant(grantee, permission)]))
def _make_xml(grantee):
@@ -51,6 +53,12 @@ def _make_xml(grantee):
class TestSwift3S3Acl(Swift3TestCase):
"""
This class has been tested in the following Controller.
[S3AclController]
[BucketController] Case: Conf.s3_acl == True
[Object Controller] Case: Conf.s3_acl == True
"""
def setUp(self):
super(TestSwift3S3Acl, self).setUp()
@@ -85,6 +93,9 @@ class TestSwift3S3Acl(Swift3TestCase):
def tearDown(self):
CONF.s3_acl = False
"""
[S3AclController]
"""
def test_bucket_acl_PUT_with_other_owner(self):
req = Request.blank('/bucket?acl',
environ={'REQUEST_METHOD': 'PUT'},
@@ -298,11 +309,10 @@ class TestSwift3S3Acl(Swift3TestCase):
def _test_bucket_acl_GET(self, owner, permission):
owner = Owner(id=owner, name=owner)
acl = _gen_test_acl(owner, permission)
headers = _gen_test_headers(owner, permission)
self.swift.register('HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
encode_acl('container', acl),
None)
headers, None)
req = Request.blank('/bucket?acl',
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac'})
@@ -313,14 +323,28 @@ class TestSwift3S3Acl(Swift3TestCase):
status, headers, body = self._test_bucket_acl_GET('test:other', None)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_bucket_GET_with_owner_permission(self):
def test_bucket_acl_GET_with_read_acp_permission(self):
status, headers, body = self._test_bucket_acl_GET('test:other',
'READ_ACP')
self.assertEquals(status.split()[0], '200')
def test_bucket_acl_GET_with_fullcontrol_permission(self):
status, headers, body = self._test_bucket_acl_GET('test:other',
'FULL_CONTROL')
self.assertEquals(status.split()[0], '200')
def test_bucket_acl_GET_with_owner_permission(self):
status, headers, body = self._test_bucket_acl_GET('test:tester', None)
self.assertEquals(status.split()[0], '200')
def _test_bucket_acl_PUT(self, owner, permission):
def _test_bucket_acl_PUT(self, owner, permission, grantee='test:tester'):
owner = Owner(id=owner, name=owner)
acl = _gen_test_acl(owner, permission)
grantee = User(grantee)
headers = _gen_test_headers(owner, permission, grantee)
acl = decode_acl('container', headers)
self.swift.register('HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
headers, None)
req = Request.blank('/bucket?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac'},
@@ -329,21 +353,34 @@ class TestSwift3S3Acl(Swift3TestCase):
return self.call_swift3(req)
def test_bucket_acl_PUT_without_permission(self):
status, headers, body = self._test_bucket_acl_PUT('test:other', None)
status, headers, body = self._test_bucket_acl_PUT('test:other', None,
'test:other')
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_bucket_acl_PUT_with_write_acp_permission(self):
status, headers, body = self._test_bucket_acl_PUT('test:other',
'WRITE_ACP')
self.assertEquals(status.split()[0], '200')
def test_bucket_acl_PUT_with_fullcontrol_permission(self):
status, headers, body = self._test_bucket_acl_PUT('test:other',
'FULL_CONTROL')
self.assertEquals(status.split()[0], '200')
def test_bucket_acl_PUT_with_owner_permission(self):
status, headers, body = self._test_bucket_acl_PUT('test:tester', None)
status, headers, body = self._test_bucket_acl_PUT('test:tester',
'FULL_CONTROL')
self.assertEquals(status.split()[0], '200')
def _test_object_acl_GET(self, owner, permission):
owner = Owner(id=owner, name=owner)
acl = _gen_test_acl(owner, permission)
headers = _gen_test_headers(owner, permission, resource='object')
self.swift.register('HEAD', '/v1/AUTH_test/bucket/object',
swob.HTTPOk,
encode_acl('object', acl),
None)
swob.HTTPOk, headers, None)
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac'})
@@ -354,18 +391,27 @@ class TestSwift3S3Acl(Swift3TestCase):
status, headers, body = self._test_object_acl_GET('test:other', None)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_object_acl_GET_with_read_acp_permission(self):
status, headers, body = self._test_object_acl_GET('test:other',
'READ_ACP')
self.assertEquals(status.split()[0], '200')
def test_object_acl_GET_with_fullcontrol_permission(self):
status, headers, body = self._test_object_acl_GET('test:other',
'FULL_CONTROL')
self.assertEquals(status.split()[0], '200')
def test_object_acl_GET_with_owner_permission(self):
status, headers, body = self._test_object_acl_GET('test:tester', None)
self.assertEquals(status.split()[0], '200')
def _test_object_acl_PUT(self, owner, permission):
owner = Owner(id=owner, name=owner)
acl = _gen_test_acl(owner, permission)
headers = _gen_test_headers(owner, permission, resource='object')
acl = decode_acl('object', headers)
self.swift.register('HEAD', '/v1/AUTH_test/bucket/object',
swob.HTTPOk,
encode_acl('object', acl),
None)
swob.HTTPOk, headers, None)
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac'},
@@ -377,9 +423,285 @@ class TestSwift3S3Acl(Swift3TestCase):
status, headers, body = self._test_object_acl_PUT('test:other', None)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_object_acl_PUT_with_write_acp_permission(self):
status, headers, body = self._test_object_acl_PUT('test:other',
'WRITE_ACP')
self.assertEquals(status.split()[0], '200')
def test_object_acl_PUT_with_fullcontrol_permission(self):
status, headers, body = self._test_object_acl_PUT('test:other',
'FULL_CONTROL')
self.assertEquals(status.split()[0], '200')
def test_object_acl_PUT_with_owner_permission(self):
status, headers, body = self._test_object_acl_PUT('test:tester', None)
self.assertEquals(status.split()[0], '200')
"""
[BucketController] Case: Conf.s3_acl == True
"""
def _test_bucket(self, method, owner, permission):
owner = Owner(id=owner, name=owner)
headers = _gen_test_headers(owner, permission)
self.swift.register('HEAD', '/v1/AUTH_test/acltest',
swob.HTTPNoContent, headers, None)
self.swift.register('GET', '/v1/AUTH_test/acltest', swob.HTTPNoContent,
headers, json.dumps([]))
self.swift.register('POST', '/v1/AUTH_test/acltest',
swob.HTTPNoContent, {}, None)
self.swift.register('DELETE', '/v1/AUTH_test/acltest',
swob.HTTPNoContent, {}, None)
req = Request.blank('/acltest',
environ={'REQUEST_METHOD': method},
headers={'Authorization': 'AWS test:tester:hmac'})
return self.call_swift3(req)
def test_bucket_GET_without_permission(self):
status, headers, body = self._test_bucket('GET', 'test:other', None)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_bucket_GET_with_read_permission(self):
status, headers, body = self._test_bucket('GET', 'test:other', 'READ')
self.assertEquals(status.split()[0], '200')
def test_bucket_GET_with_fullcontrol_permission(self):
status, headers, body = self._test_bucket('GET', 'test:other',
'FULL_CONTROL')
self.assertEquals(status.split()[0], '200')
def test_bucket_GET_with_owner_permission(self):
status, headers, body = self._test_bucket('GET', 'test:tester', None)
self.assertEquals(status.split()[0], '200')
def _test_bucket_GET_canned_acl(self, group):
owner = Owner(id='test:other', name='test:other')
headers = _gen_test_headers(owner, 'READ', group)
self.swift.register('GET', '/v1/AUTH_test/acltest', swob.HTTPNoContent,
headers, json.dumps([]))
req = Request.blank('/acltest',
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac'})
return self.call_swift3(req)
def test_bucket_GET_authenticated_users(self):
status, headers, body = \
self._test_bucket_GET_canned_acl(AuthenticatedUsers())
self.assertEquals(status.split()[0], '200')
def test_bucket_GET_all_users(self):
status, headers, body = self._test_bucket_GET_canned_acl(AllUsers())
self.assertEquals(status.split()[0], '200')
def test_bucket_PUT_with_already_exist(self):
self.swift.register('PUT', '/v1/AUTH_test/acltest',
swob.HTTPAccepted, {}, None)
status, headers, body = self._test_bucket('PUT', 'test:other', None)
self.assertEquals(self._get_error_code(body), 'BucketAlreadyExists')
def test_bucket_PUT(self):
self.swift.register('PUT', '/v1/AUTH_test/acltest',
swob.HTTPCreated, {}, None)
status, headers, body = self._test_bucket('PUT', 'test:other', 'WRITE')
self.assertEquals(status.split()[0], '200')
def test_bucket_DELETE_without_permission(self):
status, headers, body = self._test_bucket('DELETE', 'test:other', None)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_bucket_DELETE_with_write_permission(self):
status, headers, body = self._test_bucket('DELETE', 'test:other',
'WRITE')
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_bucket_DELETE_with_fullcontrol_permission(self):
status, headers, body = self._test_bucket('DELETE', 'test:other',
'FULL_CONTROL')
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_bucket_DELETE_with_owner_permission(self):
status, headers, body = self._test_bucket('DELETE', 'test:tester',
None)
self.assertEquals(status.split()[0], '204')
"""
[Object Controller] Case: Conf.s3_acl == True
"""
def _test_object(self, method, o_owner, o_permission,
grantee='test:tester', c_owner='test:tester',
c_permission='FULL_CONTROL', existObject=True):
c_owner = Owner(id=c_owner, name=c_owner)
o_owner = Owner(id=o_owner, name=o_owner)
grantee = User(grantee)
c_headers = _gen_test_headers(c_owner, c_permission, grantee)
o_headers = _gen_test_headers(o_owner, o_permission, grantee, 'object')
self.swift.register('HEAD', '/v1/AUTH_test/bucket',
swob.HTTPNoContent, c_headers, None)
if existObject:
self.swift.register('HEAD', '/v1/AUTH_test/bucket/object',
swob.HTTPOk, o_headers, None)
else:
self.swift.register('HEAD', '/v1/AUTH_test/bucket/object',
swob.HTTPNotFound, {}, None)
self.swift.register('GET', '/v1/AUTH_test/bucket/object',
swob.HTTPOk, o_headers, '')
self.swift.register('PUT', '/v1/AUTH_test/bucket/object',
swob.HTTPCreated, {}, None)
self.swift.register('DELETE', '/v1/AUTH_test/bucket/object',
swob.HTTPNoContent, {}, None)
req = Request.blank('/bucket/object',
environ={'REQUEST_METHOD': method},
headers={'Authorization': 'AWS test:tester:hmac'})
return self.call_swift3(req)
def test_object_GET_without_permission(self):
status, headers, body = self._test_object('GET', 'test:other', None)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_object_GET_with_read_permission(self):
status, headers, body = self._test_object('GET', 'test:other', 'READ')
self.assertEquals(status.split()[0], '200')
def test_object_GET_with_fullcontrol_permission(self):
status, headers, body = self._test_object('GET', 'test:other',
'FULL_CONTROL')
self.assertEquals(status.split()[0], '200')
def test_object_GET_with_owner_permission(self):
status, headers, body = self._test_object('GET', 'test:tester', None)
self.assertEquals(status.split()[0], '200')
def test_object_PUT_without_permission(self):
status, headers, body = self._test_object('PUT', 'test:other', None)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_object_PUT_with_write_permission(self):
status, headers, body = self._test_object('PUT', 'test:other', 'WRITE',
existObject=False)
self.assertEquals(status.split()[0], '200')
def test_object_PUT_with_fullcontrol_permission(self):
status, headers, body = self._test_object('PUT', 'test:other',
'FULL_CONTROL',
existObject=False)
self.assertEquals(status.split()[0], '200')
def test_object_PUT_with_owner_permission(self):
status, headers, body = self._test_object('PUT', 'test:tester', None,
existObject=False)
self.assertEquals(status.split()[0], '200')
def test_object_PUT_without_overwriting_permission(self):
status, headers, body = self._test_object('PUT', 'test:other', None)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_object_PUT_with_overwriting_permission(self):
status, headers, body = self._test_object('PUT', 'test:other',
'WRITE')
self.assertEquals(status.split()[0], '200')
def test_object_DELETE_without_permission(self):
status, headers, body = self._test_object('DELETE', 'test:other', None,
c_owner='test:other',
c_permission='READ')
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_object_DELETE_with_write_permission(self):
status, headers, body = self._test_object('DELETE', 'test:tester',
None,
c_owner='test:other',
c_permission='WRITE')
self.assertEquals(status.split()[0], '204')
def test_object_DELETE_with_fullcontrol_permission(self):
status, headers, body = self._test_object('DELETE', 'test:tester',
None,
c_owner='test:other')
self.assertEquals(status.split()[0], '204')
def test_object_DELETE_with_owner_permission(self):
status, headers, body = self._test_object('DELETE', 'test:tester',
None)
self.assertEquals(status.split()[0], '204')
def _test_object_copy(self, src_owner='test:tester', src_permission=None,
dst_c_owner='test:tester', dst_c_permission=None,
dst_c_grantee=None, dst_o_owner='test:tester',
dst_o_permission=None, dst_o_grantee=None):
src_o_headers = _gen_test_headers(Owner(id=src_owner, name=src_owner),
src_permission, None, 'object')
self.swift.register('HEAD', '/v1/AUTH_test/src_bucket/src_obj',
swob.HTTPOk, src_o_headers, None)
dst_c_headers = _gen_test_headers(Owner(id=dst_c_owner,
name=dst_c_owner),
dst_c_permission, dst_c_grantee)
self.swift.register('HEAD', '/v1/AUTH_test/dst_bucket',
swob.HTTPNoContent, dst_c_headers, None)
dst_o_headers = _gen_test_headers(Owner(id=dst_o_owner,
name=dst_o_owner),
dst_o_permission, dst_o_grantee,
'object')
self.swift.register('HEAD', '/v1/AUTH_test/dst_bucket/dst_obj',
swob.HTTPOk, dst_o_headers, None)
self.swift.register('PUT', '/v1/AUTH_test/dst_bucket/dst_obj',
swob.HTTPCreated, {}, None)
req = Request.blank(
'/dst_bucket/dst_obj',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'X-Amz-Copy-Source': '/src_bucket/src_obj'})
return self.call_swift3(req)
def test_object_PUT_copy_with_owner_permission(self):
status, headers, body = \
self._test_object_copy()
self.assertEquals(status.split()[0], '200')
def test_object_PUT_copy_with_fullcontrol_permission(self):
status, headers, body = \
self._test_object_copy(src_owner='test:other',
src_permission='FULL_CONTROL',
dst_c_owner='test:other',
dst_c_permission='FULL_CONTROL',
dst_o_owner='test:other',
dst_o_permission='FULL_CONTROL')
self.assertEquals(status.split()[0], '200')
def test_object_PUT_copy_with_grantee_permission(self):
status, headers, body = \
self._test_object_copy(src_owner='test:other',
src_permission='READ',
dst_c_owner='test:other',
dst_c_permission='WRITE',
dst_o_owner='test:other',
dst_o_permission='WRITE')
self.assertEquals(status.split()[0], '200')
def test_object_PUT_copy_without_src_obj_permission(self):
status, headers, body = \
self._test_object_copy(src_owner='test:other')
self.assertEquals(status.split()[0], '403')
def test_object_PUT_copy_without_dst_container_permission(self):
status, headers, body = \
self._test_object_copy(dst_c_owner='test:other')
self.assertEquals(status.split()[0], '403')
def test_object_PUT_copy_without_dst_obj_permission(self):
status, headers, body = \
self._test_object_copy(dst_o_owner='test:other')
self.assertEquals(status.split()[0], '403')
if __name__ == '__main__':
unittest.main()