Policy based domain isolation can't be defined.

Policy based domain isolation is not possible on token
APIs due to lack of domain_id in policy_dict for API target

Closes-Bug: 1233874
Closes-Bug: 1251048

Change-Id: I855ec8ff4899ba3797a2e2bb23945ab4b23d2bea
This commit is contained in:
Arvind Tiwari 2013-10-08 15:16:11 -06:00
parent 576f5d99ea
commit 19620076f5
4 changed files with 64 additions and 6 deletions

View File

@ -3,8 +3,8 @@
"cloud_admin": "rule:admin_required and domain_id:admin_domain_id",
"service_role": "role:service",
"service_or_admin": "rule:admin_required or rule:service_role",
"owner" : "user_id:%(user_id)s or user_id:%(target.entity.user_id)s",
"admin_or_owner": "rule:admin_required or rule:owner",
"owner" : "user_id:%(user_id)s or user_id:%(target.token.user_id)s",
"admin_or_owner": "(rule:admin_required and domain_id:%(target.token.user.domain.id)s) or rule:owner",
"admin_or_cloud_admin": "rule:admin_required or rule:cloud_admin",
"default": "rule:admin_required",

View File

@ -277,6 +277,20 @@ class AuthInfo(object):
@dependency.requires('identity_api', 'token_provider_api')
class Auth(controller.V3Controller):
# Note(atiwari): From V3 auth controller code we are
# calling protection() wrappers, so we need to setup
# the member_name and collection_name attributes of
# auth controller code.
# In the absence of these attributes, default 'entity'
# string will be used to represent the target which is
# generic. Policy can be defined using 'entity' but it
# would not reflect the exact entity that is in context.
# We are defining collection_name = 'tokens' and
# member_name = 'token' to facilitate policy decisions.
collection_name = 'tokens'
member_name = 'token'
def __init__(self, *args, **kw):
super(Auth, self).__init__(*args, **kw)
config.setup_authentication()

View File

@ -147,6 +147,8 @@ def protected(callback=None):
ref = self.get_member_from_driver(kwargs[key])
policy_dict['target'] = {self.member_name: ref}
# TODO(henry-nash): Move this entire code to a member
# method inside v3 Auth
if context.get('subject_token_id') is not None:
token_ref = self.token_api.get_token(
context['subject_token_id'])
@ -154,6 +156,14 @@ def protected(callback=None):
policy_dict['target'].setdefault(self.member_name, {})
policy_dict['target'][self.member_name]['user_id'] = (
token_ref['user_id'])
if 'domain' in token_ref['user']:
policy_dict['target'][self.member_name].setdefault(
'user', {})
policy_dict['target'][self.member_name][
'user'].setdefault('domain', {})
policy_dict['target'][self.member_name]['user'][
'domain']['id'] = (
token_ref['user']['domain']['id'])
# Add in the kwargs, which means that any entity provided as a
# parameter for calls like create and update will be included.

View File

@ -363,12 +363,13 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
"""Test token revoke using v3 Identity API by token owner and admin."""
def setUp(self):
"""Setup for Test Cases.
One domain A
Two users userNormalA and userAdminA
Two domains, domainA and domainB
Two users in domainA, userNormalA and userAdminA
One user in domainB, userAdminB
"""
super(TestTokenRevokeSelfAndAdmin, self).setUp()
# DomainA setup
self.domainA = self.new_domain_ref()
self.assignment_api.create_domain(self.domainA['id'], self.domainA)
@ -434,7 +435,7 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
self.head('/auth/tokens', headers=headers, expected_status=404,
token=adminA_token)
def test_admin_revokes_user_token(self):
def test_adminA_revokes_userA_token(self):
r = self.post(
'/auth/tokens',
body=self.build_authentication_request(
@ -470,6 +471,39 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
self.head('/auth/tokens', headers=headers, expected_status=404,
token=adminA_token)
def test_adminB_fails_revoking_userA_token(self):
# DomainB setup
self.domainB = self.new_domain_ref()
self.assignment_api.create_domain(self.domainB['id'], self.domainB)
self.userAdminB = self.new_user_ref(domain_id=self.domainB['id'])
self.userAdminB['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.userAdminB['id'], self.userAdminB)
self.assignment_api.create_grant(self.role1['id'],
user_id=self.userAdminB['id'],
domain_id=self.domainB['id'])
r = self.post(
'/auth/tokens',
body=self.build_authentication_request(
user_id=self.userNormalA['id'],
password=self.userNormalA['password'],
user_domain_id=self.domainA['id']))
user_token = r.headers.get('X-Subject-Token')
headers = {'X-Subject-Token': user_token}
r = self.post(
'/auth/tokens',
body=self.build_authentication_request(
user_id=self.userAdminB['id'],
password=self.userAdminB['password'],
domain_name=self.domainB['name']))
adminB_token = r.headers.get('X-Subject-Token')
self.head('/auth/tokens', headers=headers, expected_status=403,
token=adminB_token)
self.delete('/auth/tokens', headers=headers, expected_status=403,
token=adminB_token)
class TestTokenRevoking(test_v3.RestfulTestCase):
"""Test token revocation on the v3 Identity API."""