swift/test/unit/common/middleware/s3api/test_subresource.py
Kota Tsuyuzaki 636b922f3b Import swift3 into swift repo as s3api middleware
This attempts to import the openstack/swift3 package into the upstream swift
repository and namespace. This is almost a simple port, except for the following items.

1. Rename swift3 namespace to swift.common.middleware.s3api
1.1 Rename also some conflicted class names (e.g. Request/Response)

2. Port unittests to test/unit/s3api dir to be able to run on the gate.

3. Port functests to test/functional/s3api and setup in-process testing

4. Port docs to doc dir, then address the namespace change.

5. Use get_logger() instead of global logger instance

6. Avoid global conf instance

Also fix various minor issues encountered during those steps (e.g. packages,
  dependencies, deprecated things)

The details and patch references in the work on feature/s3api are listed
at https://trello.com/b/ZloaZ23t/s3api (completed board)

Note that, because this is just a port, no new features have been developed since
the last swift3 release. In future work, Swift upstream may continue to work on the
remaining items for further improvements and better compatibility with Amazon S3.
Please read the new docs for your deployment and keep track of what changes in
future releases.

Change-Id: Ib803ea89cfee9a53c429606149159dd136c036fd
Co-Authored-By: Thiago da Silva <thiago@redhat.com>
Co-Authored-By: Tim Burke <tim.burke@gmail.com>
2018-04-27 15:53:57 +09:00

368 lines
17 KiB
Python

# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from swift.common.utils import json
from swift.common.middleware.s3api.s3response import AccessDenied, \
InvalidArgument, S3NotImplemented
from swift.common.middleware.s3api.subresource import User, \
AuthenticatedUsers, AllUsers, \
ACLPrivate, ACLPublicRead, ACLPublicReadWrite, ACLAuthenticatedRead, \
ACLBucketOwnerRead, ACLBucketOwnerFullControl, Owner, ACL, encode_acl, \
decode_acl, canned_acl_grantees, Grantee
from swift.common.middleware.s3api.utils import sysmeta_header
from swift.common.middleware.s3api.exception import InvalidSubresource
class TestS3ApiSubresource(unittest.TestCase):
    """Unit tests for the s3api ``subresource`` grantee and ACL helpers."""

    # Permissions probed by the canned-ACL tests below.
    _PERMISSIONS = ('READ', 'WRITE', 'READ_ACP', 'WRITE_ACP')

    def setUp(self):
        """Set the flags that are passed to every ACL built by the tests."""
        self.s3_acl = True
        self.allow_no_owner = False

    def check_permission(self, acl, user_id, permission):
        """Return True if ``acl`` grants ``permission`` to ``user_id``.

        ``acl.check_permission`` signals a refusal by raising
        ``AccessDenied``; that is translated to ``False`` here so callers
        can assert on a plain boolean.
        """
        try:
            acl.check_permission(user_id, permission)
            return True
        except AccessDenied:
            return False

    def _assert_permissions(self, acl, user_id, granted):
        """Assert that ``user_id`` holds exactly ``granted`` on ``acl``.

        Every entry of ``_PERMISSIONS`` is probed: the ones listed in
        ``granted`` must be allowed and all the others must be denied.
        The failure message names the permission and user involved.
        """
        for permission in self._PERMISSIONS:
            self.assertEqual(
                permission in granted,
                self.check_permission(acl, user_id, permission),
                'unexpected %s result for %s' % (permission, user_id))

    def test_acl_canonical_user(self):
        """A ``User`` grantee contains only its own ID."""
        grantee = User('test:tester')

        self.assertIn('test:tester', grantee)
        self.assertNotIn('test:tester2', grantee)
        self.assertEqual(str(grantee), 'test:tester')
        self.assertEqual(grantee.elem().find('./ID').text, 'test:tester')

    def test_acl_authenticated_users(self):
        """``AuthenticatedUsers`` contains both sample user IDs."""
        grantee = AuthenticatedUsers()

        self.assertIn('test:tester', grantee)
        self.assertIn('test:tester2', grantee)
        uri = 'http://acs.amazonaws.com/groups/global/AuthenticatedUsers'
        self.assertEqual(grantee.elem().find('./URI').text, uri)

    def test_acl_all_users(self):
        """``AllUsers`` contains both sample user IDs."""
        grantee = AllUsers()

        self.assertIn('test:tester', grantee)
        self.assertIn('test:tester2', grantee)
        uri = 'http://acs.amazonaws.com/groups/global/AllUsers'
        self.assertEqual(grantee.elem().find('./URI').text, uri)

    def test_acl_private(self):
        """Private ACL: the owner gets everything, another user nothing."""
        acl = ACLPrivate(Owner(id='test:tester',
                               name='test:tester'),
                         s3_acl=self.s3_acl,
                         allow_no_owner=self.allow_no_owner)

        self._assert_permissions(acl, 'test:tester', self._PERMISSIONS)
        self._assert_permissions(acl, 'test:tester2', ())

    def test_acl_public_read(self):
        """Public-read ACL: another user may READ but nothing else."""
        acl = ACLPublicRead(Owner(id='test:tester',
                                  name='test:tester'),
                            s3_acl=self.s3_acl,
                            allow_no_owner=self.allow_no_owner)

        self._assert_permissions(acl, 'test:tester', self._PERMISSIONS)
        self._assert_permissions(acl, 'test:tester2', ('READ',))

    def test_acl_public_read_write(self):
        """Public-read-write ACL: another user may READ and WRITE only."""
        acl = ACLPublicReadWrite(Owner(id='test:tester',
                                       name='test:tester'),
                                 s3_acl=self.s3_acl,
                                 allow_no_owner=self.allow_no_owner)

        self._assert_permissions(acl, 'test:tester', self._PERMISSIONS)
        self._assert_permissions(acl, 'test:tester2', ('READ', 'WRITE'))

    def test_acl_authenticated_read(self):
        """Authenticated-read ACL: another user may READ but nothing else."""
        acl = ACLAuthenticatedRead(Owner(id='test:tester',
                                         name='test:tester'),
                                   s3_acl=self.s3_acl,
                                   allow_no_owner=self.allow_no_owner)

        self._assert_permissions(acl, 'test:tester', self._PERMISSIONS)
        self._assert_permissions(acl, 'test:tester2', ('READ',))

    def test_acl_bucket_owner_read(self):
        """Bucket-owner-read ACL: the bucket owner may only READ."""
        acl = ACLBucketOwnerRead(
            bucket_owner=Owner('test:tester2', 'test:tester2'),
            object_owner=Owner('test:tester', 'test:tester'),
            s3_acl=self.s3_acl,
            allow_no_owner=self.allow_no_owner)

        self._assert_permissions(acl, 'test:tester', self._PERMISSIONS)
        self._assert_permissions(acl, 'test:tester2', ('READ',))

    def test_acl_bucket_owner_full_control(self):
        """Bucket-owner-full-control ACL: both owners get everything."""
        acl = ACLBucketOwnerFullControl(
            bucket_owner=Owner('test:tester2', 'test:tester2'),
            object_owner=Owner('test:tester', 'test:tester'),
            s3_acl=self.s3_acl,
            allow_no_owner=self.allow_no_owner)

        self._assert_permissions(acl, 'test:tester', self._PERMISSIONS)
        self._assert_permissions(acl, 'test:tester2', self._PERMISSIONS)

    def test_acl_elem(self):
        """``ACL.elem()`` emits the owner plus a single owner grant."""
        acl = ACLPrivate(Owner(id='test:tester',
                               name='test:tester'),
                         s3_acl=self.s3_acl,
                         allow_no_owner=self.allow_no_owner)
        elem = acl.elem()
        self.assertIsNotNone(elem.find('./Owner'))
        self.assertIsNotNone(elem.find('./AccessControlList'))
        grants = list(elem.findall('./AccessControlList/Grant'))
        self.assertEqual(len(grants), 1)
        self.assertEqual(grants[0].find('./Grantee/ID').text, 'test:tester')
        self.assertEqual(
            grants[0].find('./Grantee/DisplayName').text, 'test:tester')

    def test_acl_from_elem(self):
        """An ACL rebuilt from its own element keeps the same permissions."""
        # check translation from element
        acl = ACLPrivate(Owner(id='test:tester',
                               name='test:tester'),
                         s3_acl=self.s3_acl,
                         allow_no_owner=self.allow_no_owner)
        elem = acl.elem()
        acl = ACL.from_elem(elem, self.s3_acl, self.allow_no_owner)

        self._assert_permissions(acl, 'test:tester', self._PERMISSIONS)
        self._assert_permissions(acl, 'test:tester2', ())

    def test_acl_from_elem_by_id_only(self):
        """``ACL.from_elem`` works when the owner has no DisplayName."""
        elem = ACLPrivate(Owner(id='test:tester',
                                name='test:tester'),
                          s3_acl=self.s3_acl,
                          allow_no_owner=self.allow_no_owner).elem()
        # Drop the display name so only the owner ID is left to parse.
        elem.find('./Owner').remove(elem.find('./Owner/DisplayName'))
        acl = ACL.from_elem(elem, self.s3_acl, self.allow_no_owner)

        self._assert_permissions(acl, 'test:tester', self._PERMISSIONS)
        self._assert_permissions(acl, 'test:tester2', ())

    def test_decode_acl_container(self):
        """``decode_acl`` parses the container ACL sysmeta header."""
        access_control_policy = \
            {'Owner': 'test:tester',
             'Grant': [{'Permission': 'FULL_CONTROL',
                        'Grantee': 'test:tester'}]}
        headers = {sysmeta_header('container', 'acl'):
                   json.dumps(access_control_policy)}
        acl = decode_acl('container', headers, self.allow_no_owner)

        self.assertIs(type(acl), ACL)
        self.assertEqual(acl.owner.id, 'test:tester')
        self.assertEqual(len(acl.grants), 1)
        self.assertEqual(str(acl.grants[0].grantee), 'test:tester')
        self.assertEqual(acl.grants[0].permission, 'FULL_CONTROL')

    def test_decode_acl_object(self):
        """``decode_acl`` parses the object ACL sysmeta header."""
        access_control_policy = \
            {'Owner': 'test:tester',
             'Grant': [{'Permission': 'FULL_CONTROL',
                        'Grantee': 'test:tester'}]}
        headers = {sysmeta_header('object', 'acl'):
                   json.dumps(access_control_policy)}
        acl = decode_acl('object', headers, self.allow_no_owner)

        self.assertIs(type(acl), ACL)
        self.assertEqual(acl.owner.id, 'test:tester')
        self.assertEqual(len(acl.grants), 1)
        self.assertEqual(str(acl.grants[0].grantee), 'test:tester')
        self.assertEqual(acl.grants[0].permission, 'FULL_CONTROL')

    def test_decode_acl_undefined(self):
        """With no ACL header, the ACL has no owner ID and no grants."""
        headers = {}
        acl = decode_acl('container', headers, self.allow_no_owner)

        self.assertIs(type(acl), ACL)
        self.assertIsNone(acl.owner.id)
        self.assertEqual(len(acl.grants), 0)

    def test_decode_acl_empty_list(self):
        """A ``'[]'`` ACL header gives no owner ID and no grants."""
        headers = {sysmeta_header('container', 'acl'): '[]'}
        acl = decode_acl('container', headers, self.allow_no_owner)

        self.assertIs(type(acl), ACL)
        self.assertIsNone(acl.owner.id)
        self.assertEqual(len(acl.grants), 0)

    def test_decode_acl_with_invalid_json(self):
        """A malformed JSON ACL header raises ``InvalidSubresource``."""
        headers = {sysmeta_header('container', 'acl'): '['}
        with self.assertRaises(InvalidSubresource):
            decode_acl('container', headers, self.allow_no_owner)

    def test_encode_acl_container(self):
        """``encode_acl`` writes owner and grants to the container header."""
        acl = ACLPrivate(Owner(id='test:tester',
                               name='test:tester'))
        acp = encode_acl('container', acl)
        header_value = json.loads(acp[sysmeta_header('container', 'acl')])

        self.assertIn('Owner', header_value)
        self.assertIn('Grant', header_value)
        self.assertEqual('test:tester', header_value['Owner'])
        self.assertEqual(len(header_value['Grant']), 1)

    def test_encode_acl_object(self):
        """``encode_acl`` writes owner and grants to the object header."""
        acl = ACLPrivate(Owner(id='test:tester',
                               name='test:tester'))
        acp = encode_acl('object', acl)
        header_value = json.loads(acp[sysmeta_header('object', 'acl')])

        self.assertIn('Owner', header_value)
        self.assertIn('Grant', header_value)
        self.assertEqual('test:tester', header_value['Owner'])
        self.assertEqual(len(header_value['Grant']), 1)

    def test_encode_acl_many_grant(self):
        """An ACL with 99 read grantees encodes to 99 ``Grant`` entries."""
        users = ','.join('id=test:tester%s' % i for i in range(99))
        headers = {'x-amz-grant-read': users}
        acl = ACL.from_headers(headers, Owner('test:tester', 'test:tester'))
        acp = encode_acl('container', acl)

        header_value = json.loads(acp[sysmeta_header('container', 'acl')])

        self.assertIn('Owner', header_value)
        self.assertIn('Grant', header_value)
        self.assertEqual('test:tester', header_value['Owner'])
        self.assertEqual(len(header_value['Grant']), 99)

    def test_from_headers_x_amz_acl(self):
        """Each canned ``x-amz-acl`` value yields its mapped grants."""
        canned_acls = ['public-read', 'public-read-write',
                       'authenticated-read', 'bucket-owner-read',
                       'bucket-owner-full-control', 'log-delivery-write']

        owner = Owner('test:tester', 'test:tester')
        grantee_map = canned_acl_grantees(owner)

        for acl_str in canned_acls:
            acl = ACL.from_headers({'x-amz-acl': acl_str}, owner)
            expected = grantee_map[acl_str]

            self.assertEqual(len(acl.grants), len(expected))  # sanity

            # Reduce each Grant object to a (permission, grantee) pair so
            # it can be lined up against the expected pairs.
            actual_grants = [(grant.permission, grant.grantee)
                             for grant in acl.grants]

            # Sort both sides so pairs are compared independent of order.
            assertions = zip(sorted(expected), sorted(actual_grants))

            for (expected_permission, expected_grantee), \
                    (permission, grantee) in assertions:
                self.assertEqual(expected_permission, permission)
                self.assertIsInstance(grantee, expected_grantee.__class__)
                # Only User grantees carry an ID and display name to match.
                if isinstance(grantee, User):
                    self.assertEqual(expected_grantee.id, grantee.id)
                    self.assertEqual(expected_grantee.display_name,
                                     grantee.display_name)

    def test_from_headers_x_amz_acl_invalid(self):
        """An unknown ``x-amz-acl`` value raises ``InvalidArgument``."""
        with self.assertRaises(InvalidArgument) as cm:
            ACL.from_headers({'x-amz-acl': 'invalid'},
                             Owner('test:tester', 'test:tester'))
        self.assertIn('argument_name', cm.exception.info)
        self.assertEqual(cm.exception.info['argument_name'], 'x-amz-acl')
        self.assertIn('argument_value', cm.exception.info)
        self.assertEqual(cm.exception.info['argument_value'], 'invalid')

    def test_canned_acl_grantees(self):
        """``canned_acl_grantees`` has exactly the listed canned ACLs."""
        grantee_map = canned_acl_grantees(Owner('test:tester', 'test:tester'))
        canned_acls = ['private', 'public-read', 'public-read-write',
                       'authenticated-read', 'bucket-owner-read',
                       'bucket-owner-full-control', 'log-delivery-write']

        for canned_acl in canned_acls:
            self.assertIn(canned_acl, grantee_map)
        self.assertEqual(len(canned_acls), len(grantee_map))  # sanity

    def test_base_grantee(self):
        """Membership tests on the base ``Grantee`` are not implemented."""
        grantee = Grantee()
        self.assertRaises(S3NotImplemented, lambda: '' in grantee)
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()