From b7fd668554c9b896c2faef712c8daf9cb344dbdc Mon Sep 17 00:00:00 2001 From: Matthieu Huin Date: Tue, 27 May 2014 15:43:38 +0200 Subject: [PATCH] Uses nacim's version instead --- tests/test_swiftpolicy.py | 142 ++++++++++++++++++++------------------ 1 file changed, 75 insertions(+), 67 deletions(-) diff --git a/tests/test_swiftpolicy.py b/tests/test_swiftpolicy.py index be28417..23ef355 100644 --- a/tests/test_swiftpolicy.py +++ b/tests/test_swiftpolicy.py @@ -18,9 +18,10 @@ import time import unittest from collections import defaultdict -from swiftpolicy import swiftpolicy +from swift.common.middleware import keystoneauth from swift.common.swob import Request, Response from swift.common.http import HTTP_FORBIDDEN +from swiftpolicy.enforcer import AclCheck class UnmockTimeModule(object): @@ -164,7 +165,7 @@ class FakeApp(object): class SwiftAuth(unittest.TestCase): def setUp(self): - self.test_auth = swiftpolicy.filter_factory({})(FakeApp()) + self.test_auth = keystoneauth.filter_factory({})(FakeApp()) self.test_auth.logger = FakeLogger() def _make_request(self, path=None, headers=None, **kwargs): @@ -182,7 +183,7 @@ class SwiftAuth(unittest.TestCase): def _get_successful_middleware(self): response_iter = iter([('200 OK', {}, '')]) - return swiftpolicy.filter_factory({})(FakeApp(response_iter)) + return keystoneauth.filter_factory({})(FakeApp(response_iter)) def test_invalid_request_authorized(self): role = self.test_auth.reseller_admin_role @@ -231,28 +232,30 @@ class SwiftAuth(unittest.TestCase): def test_anonymous_is_not_authorized_for_unknown_reseller_prefix(self): req = self._make_request(path='/v1/BLAH_foo/c/o', headers={'X_IDENTITY_STATUS': 'Invalid'}) + # check user is not authorized even object is "public" + req.acl = '.r:*' resp = req.get_response(self.test_auth) self.assertEqual(resp.status_int, 401) -# def test_blank_reseller_prefix(self): -# conf = {'reseller_prefix': ''} -# test_auth = swiftpolicy.filter_factory(conf)(FakeApp()) -# account = tenant_id = 'foo' -# self.assertTrue(test_auth._reseller_check(account, tenant_id)) + def test_blank_reseller_prefix(self): + conf = {'reseller_prefix': ''} + test_auth = keystoneauth.filter_factory(conf)(FakeApp()) + account = tenant_id = 'foo' + self.assertEqual(account, test_auth._get_account_for_tenant(tenant_id)) def test_reseller_prefix_added_underscore(self): conf = {'reseller_prefix': 'AUTH'} - test_auth = swiftpolicy.filter_factory(conf)(FakeApp()) + test_auth = keystoneauth.filter_factory(conf)(FakeApp()) self.assertEqual(test_auth.reseller_prefix, "AUTH_") def test_reseller_prefix_not_added_double_underscores(self): conf = {'reseller_prefix': 'AUTH_'} - test_auth = swiftpolicy.filter_factory(conf)(FakeApp()) + test_auth = keystoneauth.filter_factory(conf)(FakeApp()) self.assertEqual(test_auth.reseller_prefix, "AUTH_") def test_override_asked_for_but_not_allowed(self): conf = {'allow_overrides': 'false'} - self.test_auth = swiftpolicy.filter_factory(conf)(FakeApp()) + self.test_auth = keystoneauth.filter_factory(conf)(FakeApp()) req = self._make_request('/v1/AUTH_account', environ={'swift.authorize_override': True}) resp = req.get_response(self.test_auth) @@ -260,7 +263,7 @@ class SwiftAuth(unittest.TestCase): def test_override_asked_for_and_allowed(self): conf = {'allow_overrides': 'true'} - self.test_auth = swiftpolicy.filter_factory(conf)(FakeApp()) + self.test_auth = keystoneauth.filter_factory(conf)(FakeApp()) req = self._make_request('/v1/AUTH_account', environ={'swift.authorize_override': True}) resp = req.get_response(self.test_auth) @@ -297,7 +300,7 @@ class SwiftAuth(unittest.TestCase): class TestAuthorize(unittest.TestCase): def setUp(self): - self.test_auth = swiftpolicy.filter_factory({})(FakeApp()) + self.test_auth = keystoneauth.filter_factory({})(FakeApp()) self.test_auth.logger = FakeLogger() def _make_request(self, path, **kwargs): @@ -310,9 +313,7 @@ class TestAuthorize(unittest.TestCase): identity['HTTP_X_TENANT_ID']) def _get_identity(self, tenant_id='tenant_id', tenant_name='tenant_name', - user_id='user_id', user_name='user_name', roles=None): - if roles is None: - roles = [] + user_id='user_id', user_name='user_name', roles=[]): if isinstance(roles, list): roles = ','.join(roles) return {'HTTP_X_USER_ID': user_id, @@ -373,6 +374,7 @@ class TestAuthorize(unittest.TestCase): self.assertTrue(req.environ.get('swift_owner')) def test_authorize_succeeds_as_owner_for_insensitive_operator_role(self): + #import pdb; pdb.set_trace() roles = [r.upper() for r in self.test_auth.operator_roles.split(',')] identity = self._get_identity(roles=roles) req = self._check_authenticate(identity=identity) @@ -449,57 +451,6 @@ class TestAuthorize(unittest.TestCase): acl = '*:%s' % user self._check_authenticate(identity=identity, acl=acl) -# def test_cross_tenant_authorization_success(self): -# self.assertEqual( -# self.test_auth._authorize_cross_tenant( -# 'userID', 'userA', 'tenantID', 'tenantNAME', -# ['tenantID:userA']), -# 'tenantID:userA') -# self.assertEqual( -# self.test_auth._authorize_cross_tenant( -# 'userID', 'userA', 'tenantID', 'tenantNAME', -# ['tenantNAME:userA']), -# 'tenantNAME:userA') -# self.assertEqual( -# self.test_auth._authorize_cross_tenant( -# 'userID', 'userA', 'tenantID', 'tenantNAME', ['*:userA']), -# '*:userA') - -# self.assertEqual( -# self.test_auth._authorize_cross_tenant( -# 'userID', 'userA', 'tenantID', 'tenantNAME', -# ['tenantID:userID']), -# 'tenantID:userID') -# self.assertEqual( -# self.test_auth._authorize_cross_tenant( -# 'userID', 'userA', 'tenantID', 'tenantNAME', -# ['tenantNAME:userID']), -# 'tenantNAME:userID') -# self.assertEqual( -# self.test_auth._authorize_cross_tenant( -# 'userID', 'userA', 'tenantID', 'tenantNAME', ['*:userID']), -# '*:userID') - -# self.assertEqual( -# self.test_auth._authorize_cross_tenant( -# 'userID', 'userA', 'tenantID', 'tenantNAME', ['tenantID:*']), -# 'tenantID:*') -# self.assertEqual( -# self.test_auth._authorize_cross_tenant( -# 'userID', 'userA', 'tenantID', 'tenantNAME', ['tenantNAME:*']), -# 'tenantNAME:*') -# self.assertEqual( -# self.test_auth._authorize_cross_tenant( -# 'userID', 'userA', 'tenantID', 'tenantNAME', ['*:*']), -# '*:*') - -# def test_cross_tenant_authorization_failure(self): -# self.assertEqual( -# self.test_auth._authorize_cross_tenant( -# 'userID', 'userA', 'tenantID', 'tenantNAME', -# ['tenantXYZ:userA']), -# None) - def test_delete_own_account_not_allowed(self): roles = self.test_auth.operator_roles.split(',') identity = self._get_identity(roles=roles) @@ -520,5 +471,62 @@ class TestAuthorize(unittest.TestCase): env={'REQUEST_METHOD': 'DELETE'}) self.assertEqual(bool(req.environ.get('swift_owner')), True) + +class TestAclCheckCrossTenant(unittest.TestCase): + def setUp(self): + self.cross_tenant_check = AclCheck._authorize_cross_tenant + + def test_cross_tenant_authorization_success(self): + self.assertEqual( + self.cross_tenant_check( + 'userID', 'userA', 'tenantID', 'tenantNAME', + ['tenantID:userA']), + True) + self.assertEqual( + self.cross_tenant_check( + 'userID', 'userA', 'tenantID', 'tenantNAME', + ['tenantNAME:userA']), + True) + self.assertEqual( + self.cross_tenant_check( + 'userID', 'userA', 'tenantID', 'tenantNAME', ['*:userA']), + True) + + self.assertEqual( + self.cross_tenant_check( + 'userID', 'userA', 'tenantID', 'tenantNAME', + ['tenantID:userID']), + True) + self.assertEqual( + self.cross_tenant_check( + 'userID', 'userA', 'tenantID', 'tenantNAME', + ['tenantNAME:userID']), + True) + self.assertEqual( + self.cross_tenant_check( + 'userID', 'userA', 'tenantID', 'tenantNAME', ['*:userID']), + True) + + self.assertEqual( + self.cross_tenant_check( + 'userID', 'userA', 'tenantID', 'tenantNAME', ['tenantID:*']), + True) + self.assertEqual( + self.cross_tenant_check( + 'userID', 'userA', 'tenantID', 'tenantNAME', ['tenantNAME:*']), + True) + self.assertEqual( + self.cross_tenant_check( + 'userID', 'userA', 'tenantID', 'tenantNAME', ['*:*']), + True) + + def test_cross_tenant_authorization_failure(self): + self.assertEqual( + self.cross_tenant_check( + 'userID', 'userA', 'tenantID', 'tenantNAME', + ['tenantXYZ:userA']), + False) + + if __name__ == '__main__': unittest.main()