Merge "s3_acl: Require swift_owner authz to create buckets"
This commit is contained in:
commit
cfc4f30d63
@ -51,7 +51,7 @@ Example::
|
||||
"""
|
||||
from swift.common.middleware.s3api.subresource import ACL, Owner, encode_acl
|
||||
from swift.common.middleware.s3api.s3response import MissingSecurityHeader, \
|
||||
MalformedACLError, UnexpectedContent
|
||||
MalformedACLError, UnexpectedContent, AccessDenied
|
||||
from swift.common.middleware.s3api.etree import fromstring, XMLSyntaxError, \
|
||||
DocumentInvalid
|
||||
from swift.common.middleware.s3api.utils import MULTIUPLOAD_SUFFIX, \
|
||||
@ -206,6 +206,9 @@ class BucketAclHandler(BaseAclHandler):
|
||||
req_acl = ACL.from_headers(self.req.headers,
|
||||
Owner(self.user_id, self.user_id))
|
||||
|
||||
if not self.req.environ.get('swift_owner'):
|
||||
raise AccessDenied()
|
||||
|
||||
# To avoid overwriting the existing bucket's ACL, we send PUT
|
||||
# request first before setting the ACL to make sure that the target
|
||||
# container does not exist.
|
||||
|
@ -1350,6 +1350,9 @@ class S3AclRequest(S3Request):
|
||||
# tempauth
|
||||
self.user_id = self.access_key
|
||||
|
||||
sw_req.environ.get('swift.authorize', lambda req: None)(sw_req)
|
||||
self.environ['swift_owner'] = sw_req.environ.get('swift_owner', False)
|
||||
|
||||
# Need to skip S3 authorization on subsequent requests to prevent
|
||||
# overwriting the account in PATH_INFO
|
||||
del self.headers['Authorization']
|
||||
|
@ -151,7 +151,11 @@ class TestS3ApiBucket(S3ApiBase):
|
||||
|
||||
self.conn.make_request('PUT', 'bucket')
|
||||
status, headers, body = self.conn.make_request('PUT', 'bucket')
|
||||
self.assertEqual(get_error_code(body), 'BucketAlreadyExists')
|
||||
# If the user can't create buckets, they shouldn't even know
|
||||
# whether the bucket exists. For some reason, though, when s3_acl
|
||||
# is disabled, we translate 403 -> BucketAlreadyExists??
|
||||
self.assertIn(get_error_code(body),
|
||||
('AccessDenied', 'BucketAlreadyExists'))
|
||||
|
||||
def test_put_bucket_with_LocationConstraint(self):
|
||||
bucket = 'bucket'
|
||||
|
@ -106,7 +106,8 @@ class S3ApiTestCase(unittest.TestCase):
|
||||
elem = fromstring(body, 'Error')
|
||||
return elem.find('./Message').text
|
||||
|
||||
def _test_method_error(self, method, path, response_class, headers={}):
|
||||
def _test_method_error(self, method, path, response_class, headers={},
|
||||
env={}):
|
||||
if not path.startswith('/'):
|
||||
path = '/' + path # add a missing slash before the path
|
||||
|
||||
@ -117,8 +118,8 @@ class S3ApiTestCase(unittest.TestCase):
|
||||
self.swift.register(method, uri, response_class, headers, None)
|
||||
headers.update({'Authorization': 'AWS test:tester:hmac',
|
||||
'Date': self.get_date_header()})
|
||||
req = swob.Request.blank(path, environ={'REQUEST_METHOD': method},
|
||||
headers=headers)
|
||||
env.update({'REQUEST_METHOD': method})
|
||||
req = swob.Request.blank(path, environ=env, headers=headers)
|
||||
status, headers, body = self.call_s3api(req)
|
||||
return self._get_error_code(body)
|
||||
|
||||
|
@ -53,9 +53,15 @@ class FakeSwift(object):
|
||||
env['REMOTE_USER'] = 'authorized'
|
||||
|
||||
if env['REQUEST_METHOD'] == 'TEST':
|
||||
# AccessDenied by default at s3acl authenticate
|
||||
env['swift.authorize'] = \
|
||||
lambda req: swob.HTTPForbidden(request=req)
|
||||
|
||||
def authorize_cb(req):
|
||||
# Assume swift owner, if not yet set
|
||||
req.environ.setdefault('swift_owner', True)
|
||||
# But then default to blocking authz, to ensure we've replaced
|
||||
# the default auth system
|
||||
return swob.HTTPForbidden(request=req)
|
||||
|
||||
env['swift.authorize'] = authorize_cb
|
||||
else:
|
||||
env['swift.authorize'] = lambda req: None
|
||||
|
||||
|
@ -498,6 +498,12 @@ class TestS3ApiBucket(S3ApiTestCase):
|
||||
swob.HTTPCreated)
|
||||
self.assertEqual(code, 'InvalidBucketName')
|
||||
|
||||
@s3acl(s3acl_only=True)
|
||||
def test_bucket_PUT_error_non_owner(self):
|
||||
code = self._test_method_error('PUT', '/bucket', swob.HTTPAccepted,
|
||||
env={'swift_owner': False})
|
||||
self.assertEqual(code, 'AccessDenied')
|
||||
|
||||
@s3acl
|
||||
def test_bucket_PUT(self):
|
||||
req = Request.blank('/bucket',
|
||||
|
Loading…
Reference in New Issue
Block a user