Merge "Modify implied roles to honor domain specific roles"
This commit is contained in:
commit
ddc439a278
@ -663,9 +663,28 @@ class Manager(manager.Manager):
|
||||
filter_results.append(ref)
|
||||
return filter_results
|
||||
|
||||
def _strip_domain_roles(self, role_refs):
|
||||
"""Post process assignment list for domain roles.
|
||||
|
||||
Domain roles are only designed to do the job of inferring other roles
|
||||
and since that has been done before this method is called, we need to
|
||||
remove any assignments that include a domain role.
|
||||
|
||||
"""
|
||||
def _role_is_global(role_id):
|
||||
ref = self.role_api.get_role(role_id)
|
||||
return (ref['domain_id'] is None)
|
||||
|
||||
filter_results = []
|
||||
for ref in role_refs:
|
||||
if _role_is_global(ref['role_id']):
|
||||
filter_results.append(ref)
|
||||
return filter_results
|
||||
|
||||
def _list_effective_role_assignments(self, role_id, user_id, group_id,
|
||||
domain_id, project_id, subtree_ids,
|
||||
inherited, source_from_group_ids):
|
||||
inherited, source_from_group_ids,
|
||||
strip_domain_roles):
|
||||
"""List role assignments in effective mode.
|
||||
|
||||
When using effective mode, besides the direct assignments, the indirect
|
||||
@ -822,6 +841,8 @@ class Manager(manager.Manager):
|
||||
ref, user_id, project_id, subtree_ids, expand_groups)
|
||||
|
||||
refs = self.add_implied_roles(refs)
|
||||
if strip_domain_roles:
|
||||
refs = self._strip_domain_roles(refs)
|
||||
if role_id:
|
||||
refs = self._filter_by_role_id(role_id, refs)
|
||||
|
||||
@ -854,7 +875,8 @@ class Manager(manager.Manager):
|
||||
domain_id=None, project_id=None,
|
||||
include_subtree=False, inherited=None,
|
||||
effective=None, include_names=False,
|
||||
source_from_group_ids=None):
|
||||
source_from_group_ids=None,
|
||||
strip_domain_roles=True):
|
||||
"""List role assignments, honoring effective mode and provided filters.
|
||||
|
||||
Returns a list of role assignments, where their attributes match the
|
||||
@ -886,6 +908,12 @@ class Manager(manager.Manager):
|
||||
difference between this and a group filter, other than it is a list of
|
||||
groups).
|
||||
|
||||
In effective mode, any domain specific roles are usually stripped from
|
||||
the returned assignments (since such roles are not placed in tokens).
|
||||
This stripping can be disabled by specifying strip_domain_roles=False,
|
||||
which is useful for internal calls like trusts which need to examine
|
||||
the full set of roles.
|
||||
|
||||
If OS-INHERIT extension is disabled or the used driver does not support
|
||||
inherited roles retrieval, inherited role assignments will be ignored.
|
||||
|
||||
@ -904,7 +932,8 @@ class Manager(manager.Manager):
|
||||
if effective:
|
||||
role_assignments = self._list_effective_role_assignments(
|
||||
role_id, user_id, group_id, domain_id, project_id,
|
||||
subtree_ids, inherited, source_from_group_ids)
|
||||
subtree_ids, inherited, source_from_group_ids,
|
||||
strip_domain_roles)
|
||||
else:
|
||||
role_assignments = self._list_direct_role_assignments(
|
||||
role_id, user_id, group_id, domain_id, project_id,
|
||||
|
@ -101,6 +101,13 @@ class AssignmentTestHelperMixin(object):
|
||||
{'project': {'project': 3}},
|
||||
{'project': {'project': 3}}]
|
||||
|
||||
# If the 'roles' entity count is defined as top level key in 'entities'
|
||||
# dict then these are global roles. If it is placed within the
|
||||
# 'domain' dict, then they will be domain specific roles. A mix of
|
||||
# domain specific and global roles are allowed, with the role index
|
||||
# being calculated in the order they are defined in the 'entities'
|
||||
# dict.
|
||||
|
||||
# A set of implied role specifications. In this case, prior role
|
||||
# index 0 implies role index 1, and role 1 implies roles 2 and 3.
|
||||
|
||||
@ -179,6 +186,10 @@ class AssignmentTestHelperMixin(object):
|
||||
test_data['projects'].append(
|
||||
_create_project(domain_id, parent_id))
|
||||
|
||||
def _create_role(self, domain_id=None):
|
||||
new_role = unit.new_role_ref(domain_id=domain_id)
|
||||
return self.role_api.create_role(new_role['id'], new_role)
|
||||
|
||||
def _handle_domain_spec(self, test_data, domain_spec):
|
||||
"""Handle the creation of domains and their contents.
|
||||
|
||||
@ -210,6 +221,8 @@ class AssignmentTestHelperMixin(object):
|
||||
elif entity_type == 'groups':
|
||||
new_entity = unit.new_group_ref(domain_id=domain_id)
|
||||
new_entity = self.identity_api.create_group(new_entity)
|
||||
elif entity_type == 'roles':
|
||||
new_entity = self._create_role(domain_id=domain_id)
|
||||
else:
|
||||
# Must be a bad test plan
|
||||
raise exception.NotImplemented()
|
||||
@ -252,10 +265,6 @@ class AssignmentTestHelperMixin(object):
|
||||
test_data['users'] = [user[0], user[1]....]
|
||||
|
||||
"""
|
||||
def _create_role():
|
||||
new_role = unit.new_role_ref()
|
||||
return self.role_api.create_role(new_role['id'], new_role)
|
||||
|
||||
test_data = {}
|
||||
for entity in ['users', 'groups', 'domains', 'projects', 'roles']:
|
||||
test_data[entity] = []
|
||||
@ -268,7 +277,7 @@ class AssignmentTestHelperMixin(object):
|
||||
# Create any roles requested
|
||||
if 'roles' in entity_pattern:
|
||||
for _ in range(entity_pattern['roles']):
|
||||
test_data['roles'].append(_create_role())
|
||||
test_data['roles'].append(self._create_role())
|
||||
|
||||
return test_data
|
||||
|
||||
@ -7192,6 +7201,35 @@ class ImpliedRoleTests(AssignmentTestHelperMixin):
|
||||
self.config_fixture.config(group='os_inherit', enabled=True)
|
||||
self.execute_assignment_plan(test_plan)
|
||||
|
||||
def test_role_assignments_domain_specific_with_implied_roles(self):
|
||||
test_plan = {
|
||||
'entities': {'domains': {'users': 1, 'projects': 1, 'roles': 2},
|
||||
'roles': 2},
|
||||
# Two level tree of implied roles, with the top and 1st level being
|
||||
# domain specific roles, and the bottom level being infered global
|
||||
# roles.
|
||||
'implied_roles': [{'role': 0, 'implied_roles': [1]},
|
||||
{'role': 1, 'implied_roles': [2, 3]}],
|
||||
'assignments': [{'user': 0, 'role': 0, 'project': 0}],
|
||||
'tests': [
|
||||
# List all direct assignments for user[0], this should just
|
||||
# show the one top level role assignment, even though this is a
|
||||
# domain specific role (since we are in non-effective mode and
|
||||
# we show any direct role assignment in that mode).
|
||||
{'params': {'user': 0},
|
||||
'results': [{'user': 0, 'role': 0, 'project': 0}]},
|
||||
# Now the effective ones - so the implied roles should be
|
||||
# expanded out, as well as any domain specific roles should be
|
||||
# removed.
|
||||
{'params': {'user': 0, 'effective': True},
|
||||
'results': [{'user': 0, 'role': 2, 'project': 0,
|
||||
'indirect': {'role': 1}},
|
||||
{'user': 0, 'role': 3, 'project': 0,
|
||||
'indirect': {'role': 1}}]},
|
||||
]
|
||||
}
|
||||
self.execute_assignment_plan(test_plan)
|
||||
|
||||
|
||||
class FilterTests(filtering.FilterTests):
|
||||
def test_list_entities_filtered(self):
|
||||
|
@ -2644,6 +2644,47 @@ class ImpliedRolesTests(test_v3.RestfulTestCase, test_v3.AssignmentTestMixin,
|
||||
for role in self.role_list:
|
||||
self.assertIn(role, token['roles'])
|
||||
|
||||
def test_trusts_from_domain_specific_implied_role(self):
|
||||
self._create_three_roles()
|
||||
# Overwrite the first role with a domain specific role
|
||||
role = unit.new_role_ref(domain_id=self.domain_id)
|
||||
self.role_list[0] = self.role_api.create_role(role['id'], role)
|
||||
self._create_implied_role(self.role_list[0], self.role_list[1])
|
||||
self._create_implied_role(self.role_list[1], self.role_list[2])
|
||||
self._assign_top_role_to_user_on_project(self.user, self.project)
|
||||
|
||||
# Create a trustee and assign the prior role to her
|
||||
trustee = unit.create_user(self.identity_api, domain_id=self.domain_id)
|
||||
ref = unit.new_trust_ref(
|
||||
trustor_user_id=self.user['id'],
|
||||
trustee_user_id=trustee['id'],
|
||||
project_id=self.project['id'],
|
||||
role_ids=[self.role_list[0]['id']])
|
||||
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
|
||||
trust = r.result['trust']
|
||||
|
||||
# Only the role that was specified is in the trust, NOT implied roles
|
||||
self.assertEqual(self.role_list[0]['id'], trust['roles'][0]['id'])
|
||||
self.assertThat(trust['roles'], matchers.HasLength(1))
|
||||
|
||||
# Authenticate as the trustee
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=trustee['id'],
|
||||
password=trustee['password'],
|
||||
trust_id=trust['id'])
|
||||
r = self.v3_create_token(auth_data)
|
||||
token = r.result['token']
|
||||
|
||||
# The token should have the roles implies by the domain specific role,
|
||||
# but not the domain specific role itself.
|
||||
self.assertThat(token['roles'],
|
||||
matchers.HasLength(len(self.role_list) - 1))
|
||||
for role in token['roles']:
|
||||
self.assertIn(role, self.role_list)
|
||||
for role in [self.role_list[1], self.role_list[2]]:
|
||||
self.assertIn(role, token['roles'])
|
||||
self.assertNotIn(self.role_list[0], token['roles'])
|
||||
|
||||
|
||||
class DomainSpecificRoleTests(test_v3.RestfulTestCase, unit.TestCase):
|
||||
def setUp(self):
|
||||
|
@ -481,9 +481,9 @@ class TokenAPITests(object):
|
||||
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
|
||||
self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
|
||||
|
||||
def _create_role(self):
|
||||
def _create_role(self, domain_id=None):
|
||||
"""Call ``POST /roles``."""
|
||||
ref = unit.new_role_ref()
|
||||
ref = unit.new_role_ref(domain_id=domain_id)
|
||||
r = self.post('/roles', body={'role': ref})
|
||||
return self.assertValidRoleResponse(r, ref)
|
||||
|
||||
@ -666,6 +666,25 @@ class TokenAPITests(object):
|
||||
token_roles = self._get_scoped_token_roles()
|
||||
self.assertEqual(2, len(token_roles))
|
||||
|
||||
def test_domain_scpecific_roles_do_not_show_v3_token(self):
|
||||
self.config_fixture.config(group='token', infer_roles=True)
|
||||
initial_token_roles = self._get_scoped_token_roles()
|
||||
|
||||
new_role = self._create_role(domain_id=self.domain_id)
|
||||
self.assignment_api.create_grant(new_role['id'],
|
||||
user_id=self.user['id'],
|
||||
project_id=self.project['id'])
|
||||
implied = self._create_implied_role(new_role['id'])
|
||||
|
||||
token_roles = self._get_scoped_token_roles()
|
||||
self.assertEqual(len(initial_token_roles) + 1, len(token_roles))
|
||||
|
||||
# The implied role from the domain specific role should be in the
|
||||
# token, but not the domain specific role itself.
|
||||
token_role_ids = [role['id'] for role in token_roles]
|
||||
self.assertIn(implied['id'], token_role_ids)
|
||||
self.assertNotIn(new_role['id'], token_role_ids)
|
||||
|
||||
|
||||
class TokenDataTests(object):
|
||||
"""Test the data in specific token types."""
|
||||
|
@ -396,24 +396,39 @@ class V3TokenDataHelper(object):
|
||||
token_domain_id = domain_id
|
||||
|
||||
if token_domain_id or token_project_id:
|
||||
roles = self._get_roles_for_user(token_user_id,
|
||||
token_domain_id,
|
||||
token_project_id)
|
||||
filtered_roles = []
|
||||
if CONF.trust.enabled and trust:
|
||||
# First expand out any roles that were in the trust to include
|
||||
# any implied roles, whether global or domain specific
|
||||
refs = [{'role_id': role['id']} for role in trust['roles']]
|
||||
effective_roles = self.assignment_api.add_implied_roles(refs)
|
||||
for trust_role in effective_roles:
|
||||
effective_trust_roles = (
|
||||
self.assignment_api.add_implied_roles(refs))
|
||||
# Now get the current role assignments for the trustor,
|
||||
# including any domain specific roles.
|
||||
assignment_list = self.assignment_api.list_role_assignments(
|
||||
user_id=token_user_id,
|
||||
project_id=token_project_id,
|
||||
effective=True, strip_domain_roles=False)
|
||||
current_effective_trustor_roles = (
|
||||
list(set([x['role_id'] for x in assignment_list])))
|
||||
# Go through each of the effective trust roles, making sure the
|
||||
# trustor still has them, if any have been removed, then we
|
||||
# will treat the trust as invalid
|
||||
for trust_role in effective_trust_roles:
|
||||
|
||||
match_roles = [x for x in roles
|
||||
if x['id'] == trust_role['role_id']]
|
||||
match_roles = [x for x in current_effective_trustor_roles
|
||||
if x == trust_role['role_id']]
|
||||
if match_roles:
|
||||
filtered_roles.append(match_roles[0])
|
||||
role = self.role_api.get_role(match_roles[0])
|
||||
if role['domain_id'] is None:
|
||||
filtered_roles.append(role)
|
||||
else:
|
||||
raise exception.Forbidden(
|
||||
_('Trustee has no delegated roles.'))
|
||||
else:
|
||||
for role in roles:
|
||||
for role in self._get_roles_for_user(token_user_id,
|
||||
token_domain_id,
|
||||
token_project_id):
|
||||
filtered_roles.append({'id': role['id'],
|
||||
'name': role['name']})
|
||||
|
||||
|
@ -39,8 +39,8 @@ def _admin_trustor_only(context, trust, user_id):
|
||||
raise exception.Forbidden()
|
||||
|
||||
|
||||
@dependency.requires('assignment_api', 'identity_api', 'role_api',
|
||||
'token_provider_api', 'trust_api')
|
||||
@dependency.requires('assignment_api', 'identity_api', 'resource_api',
|
||||
'role_api', 'token_provider_api', 'trust_api')
|
||||
class TrustV3(controller.V3Controller):
|
||||
collection_name = "trusts"
|
||||
member_name = "trust"
|
||||
@ -175,9 +175,13 @@ class TrustV3(controller.V3Controller):
|
||||
original_trust['redelegated_trust_id'])
|
||||
|
||||
if not self._attribute_is_empty(trust, 'project_id'):
|
||||
return self.assignment_api.get_roles_for_user_and_project(
|
||||
original_trust['trustor_user_id'],
|
||||
original_trust['project_id'])
|
||||
self.resource_api.get_project(original_trust['project_id'])
|
||||
# Get a list of roles including any domain specific roles
|
||||
assignment_list = self.assignment_api.list_role_assignments(
|
||||
user_id=original_trust['trustor_user_id'],
|
||||
project_id=original_trust['project_id'],
|
||||
effective=True, strip_domain_roles=False)
|
||||
return list(set([x['role_id'] for x in assignment_list]))
|
||||
else:
|
||||
return []
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user