Merge "Add is_admin_project to policy dict"

This commit is contained in:
Jenkins 2016-10-14 15:59:07 +00:00 committed by Gerrit Code Review
commit dd9145e127
6 changed files with 74 additions and 0 deletions

View File

@ -109,4 +109,5 @@ def token_to_auth_context(token):
if token.is_federated_user:
auth_context['group_ids'] = token.federation_group_ids
auth_context['is_admin_project'] = token.is_admin_project
return auth_context

View File

@ -66,6 +66,9 @@ class Request(webob.Request):
# set is not yet known.
context['environment'] = self.environ
if self.context:
context['is_admin_project'] = self.context.is_admin_project
context.setdefault('is_admin', False)
return context

View File

@ -204,6 +204,9 @@ class AuthContextMiddleware(auth_token.BaseAuthProtocol):
request_context.user_domain_id = auth_context.get('user_domain_id')
request_context.roles = auth_context.get('roles')
is_admin_project = auth_context.get('is_admin_project', True)
request_context.is_admin_project = is_admin_project
project_domain_id = auth_context.get('project_domain_id')
request_context.project_domain_id = project_domain_id

View File

@ -194,6 +194,11 @@ class KeystoneToken(dict):
def scoped(self):
return self.project_scoped or self.domain_scoped
@property
def is_admin_project(self):
# True gets returned by default for compatibility with older versions
return self.get('is_admin_project', True)
@property
def trust_id(self):
return self.get('OS-TRUST:trust', {}).get('id')

View File

@ -64,6 +64,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
def load_sample_data(self):
self._populate_default_domain()
# Start by creating a couple of domains
self.domainA = unit.new_domain_ref()
self.resource_api.create_domain(self.domainA['id'], self.domainA)
@ -72,6 +73,12 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
self.domainC = unit.new_domain_ref(enabled=False)
self.resource_api.create_domain(self.domainC['id'], self.domainC)
# Some projects in the domains
self.projectA = unit.new_project_ref(domain_id=self.domainA['id'])
self.resource_api.create_project(self.projectA['id'], self.projectA)
self.projectB = unit.new_project_ref(domain_id=self.domainB['id'])
self.resource_api.create_project(self.projectB['id'], self.projectB)
# Now create some users, one in domainA and two of them in domainB
self.user1 = unit.create_user(self.identity_api,
domain_id=self.domainA['id'])
@ -93,6 +100,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
self.role_api.create_role(self.role['id'], self.role)
self.role1 = unit.new_role_ref()
self.role_api.create_role(self.role1['id'], self.role1)
self.assignment_api.create_grant(self.role['id'],
user_id=self.user1['id'],
domain_id=self.domainA['id'])
@ -102,6 +110,12 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
self.assignment_api.create_grant(self.role1['id'],
user_id=self.user1['id'],
domain_id=self.domainA['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.user1['id'],
project_id=self.projectA['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.user2['id'],
project_id=self.projectB['id'])
def _get_id_list_from_ref_list(self, ref_list):
result_list = []
@ -130,6 +144,44 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
self.assertIn(self.user2['id'], id_list)
self.assertIn(self.user3['id'], id_list)
def test_list_users_admin_project(self):
self.config_fixture.config(
admin_project_name=self.projectA['name'],
admin_project_domain_name=self.domainA['name'],
group='resource')
self.auth = self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'],
project_id=self.projectA['id'])
rule = 'role:%s and is_admin_project:True' % self.role['name']
self._set_policy({"identity:list_users": rule})
r = self.get('/users', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.result.get('users'))
self.assertIn(self.user1['id'], id_list)
self.assertIn(self.user2['id'], id_list)
self.assertIn(self.user3['id'], id_list)
def test_list_users_not_in_admin_project(self):
self.config_fixture.config(
admin_project_name=self.projectA['name'],
admin_project_domain_name=self.domainA['name'],
group='resource')
self.auth = self.build_authentication_request(
user_id=self.user2['id'],
password=self.user2['password'],
project_id=self.projectB['id'])
rule = 'role:%s and is_admin_project:True' % self.role['name']
self._set_policy({"identity:list_users": rule})
self.get('/users',
auth=self.auth,
expected_status=exception.ForbiddenAction.code)
def test_list_users_filtered_by_domain(self):
"""GET /users?domain_id=mydomain (filtered).

View File

@ -119,6 +119,9 @@ class TestKeystoneTokenModel(core.TestCase):
self.assertIsNone(token_data.audit_id)
self.assertIsNone(token_data.audit_chain_id)
# by default admin project is True
self.assertTrue(token_data.is_admin_project)
def test_token_model_v3_federated_user(self):
token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
token_data=self.v3_sample_token)
@ -158,3 +161,10 @@ class TestKeystoneTokenModel(core.TestCase):
token_model.KeystoneToken,
token_id=uuid.uuid4().hex,
token_data=self.v3_sample_token)
def test_token_model_is_admin_project(self):
token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
token_data=self.v3_sample_token)
token_data['is_admin_project'] = False
self.assertFalse(token_data.is_admin_project)