s3api: add get acl object and bucket support
A simple response that gives the owner to FULL_CONTROL always returns because Swift doesn't support a fine acl. This also adds get acl unit tests.
This commit is contained in:
@@ -60,7 +60,7 @@ import base64
|
||||
import errno
|
||||
import boto.utils
|
||||
from xml.sax.saxutils import escape as xml_escape
|
||||
import cgi
|
||||
import urlparse
|
||||
|
||||
from webob import Request, Response
|
||||
from webob.exc import HTTPNotFound
|
||||
@@ -109,6 +109,25 @@ def get_err_response(code):
|
||||
return resp
|
||||
|
||||
|
||||
def get_acl(account_name):
|
||||
body = ('<AccessControlPolicy>'
|
||||
'<Owner>'
|
||||
'<ID>%s</ID>'
|
||||
'</Owner>'
|
||||
'<AccessControlList>'
|
||||
'<Grant>'
|
||||
'<Grantee xmlns:xsi="http://www.w3.org/2001/'\
|
||||
'XMLSchema-instance" xsi:type="CanonicalUser">'
|
||||
'<ID>%s</ID>'
|
||||
'</Grantee>'
|
||||
'<Permission>FULL_CONTROL</Permission>'
|
||||
'</Grant>'
|
||||
'</AccessControlList>'
|
||||
'</AccessControlPolicy>' %
|
||||
(account_name, account_name))
|
||||
return Response(body=body, content_type="text/plain")
|
||||
|
||||
|
||||
class Controller(object):
|
||||
def __init__(self, app):
|
||||
self.app = app
|
||||
@@ -165,6 +184,7 @@ class BucketController(Controller):
|
||||
**kwargs):
|
||||
Controller.__init__(self, app)
|
||||
self.container_name = unquote(container_name)
|
||||
self.account_name = unquote(account_name)
|
||||
env['HTTP_X_AUTH_TOKEN'] = token
|
||||
env['PATH_INFO'] = '/v1/%s/%s' % (account_name, container_name)
|
||||
|
||||
@@ -173,7 +193,7 @@ class BucketController(Controller):
|
||||
Handle GET Bucket (List Objects) request
|
||||
"""
|
||||
if 'QUERY_STRING' in env:
|
||||
args = dict(cgi.parse_qsl(env['QUERY_STRING']))
|
||||
args = dict(urlparse.parse_qsl(env['QUERY_STRING'], 1))
|
||||
else:
|
||||
args = {}
|
||||
max_keys = min(int(args.get('max-keys', MAX_BUCKET_LISTING)),
|
||||
@@ -197,6 +217,9 @@ class BucketController(Controller):
|
||||
else:
|
||||
return get_err_response('InvalidURI')
|
||||
|
||||
if 'acl' in args:
|
||||
return get_acl(self.account_name)
|
||||
|
||||
objects = loads(''.join(list(body_iter)))
|
||||
body = ('<?xml version="1.0" encoding="UTF-8"?>'
|
||||
'<ListBucketResult '
|
||||
@@ -279,6 +302,7 @@ class ObjectController(Controller):
|
||||
def __init__(self, env, app, account_name, token, container_name,
|
||||
object_name, **kwargs):
|
||||
Controller.__init__(self, app)
|
||||
self.account_name = unquote(account_name)
|
||||
self.container_name = unquote(container_name)
|
||||
env['HTTP_X_AUTH_TOKEN'] = token
|
||||
env['PATH_INFO'] = '/v1/%s/%s/%s' % (account_name, container_name,
|
||||
@@ -290,6 +314,13 @@ class ObjectController(Controller):
|
||||
headers = dict(self.response_args[1])
|
||||
|
||||
if 200 <= status < 300:
|
||||
if 'QUERY_STRING' in env:
|
||||
args = dict(urlparse.parse_qsl(env['QUERY_STRING'], 1))
|
||||
else:
|
||||
args = {}
|
||||
if 'acl' in args:
|
||||
return get_acl(self.account_name)
|
||||
|
||||
new_hdrs = {}
|
||||
for key, val in headers.iteritems():
|
||||
_key = key.lower()
|
||||
|
||||
@@ -421,6 +421,23 @@ class TestSwift3(unittest.TestCase):
|
||||
resp = local_app(req.environ, local_app.app.do_start_response)
|
||||
self.assertEquals(local_app.app.response_args[0].split()[0], '204')
|
||||
|
||||
def _check_acl(self, owner, resp):
|
||||
dom = xml.dom.minidom.parseString("".join(resp))
|
||||
self.assertEquals(dom.firstChild.nodeName, 'AccessControlPolicy')
|
||||
name = dom.getElementsByTagName('Permission')[0].childNodes[0].nodeValue
|
||||
self.assertEquals(name, 'FULL_CONTROL')
|
||||
name = dom.getElementsByTagName('ID')[0].childNodes[0].nodeValue
|
||||
self.assertEquals(name, owner)
|
||||
|
||||
def test_bucket_acl_GET(self):
|
||||
local_app = swift3.filter_factory({})(FakeAppBucket())
|
||||
bucket_name = 'junk'
|
||||
req = Request.blank('/%s?acl' % bucket_name,
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac'})
|
||||
resp = local_app(req.environ, local_app.app.do_start_response)
|
||||
self._check_acl('test:tester', resp)
|
||||
|
||||
def _test_object_GETorHEAD(self, method):
|
||||
local_app = swift3.filter_factory({})(FakeAppObject())
|
||||
req = Request.blank('/bucket/object',
|
||||
@@ -529,6 +546,13 @@ class TestSwift3(unittest.TestCase):
|
||||
resp = local_app(req.environ, local_app.app.do_start_response)
|
||||
self.assertEquals(local_app.app.response_args[0].split()[0], '204')
|
||||
|
||||
def test_object_acl_GET(self):
|
||||
local_app = swift3.filter_factory({})(FakeAppObject())
|
||||
req = Request.blank('/bucket/object?acl',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac'})
|
||||
resp = local_app(req.environ, local_app.app.do_start_response)
|
||||
self._check_acl('test:tester', resp)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user