Modify implied roles to honor domain specific roles

The logic for processing domain specific roles is the same as
regular implied roles, except for the fact that domain specifc
roles themselves should not be returned by the manager level
list_role_assignments() in effective mode, hence ensuring that
the won't be placed in the token.

This patch makes the above changes, and adds tests to ensure this.

A follow-on patch will update the policy rules for domain specific
role assignment as well as provide release notes.

Partially implements: blueprint domain-specific-roles

Change-Id: I8c5d0dfd329a84a0525d625dc3b0c2c9325ab6bb
This commit is contained in:
Henry Nash 2016-01-03 21:45:51 +00:00
parent a16287af5b
commit a270766eb9
6 changed files with 170 additions and 24 deletions

View File

@ -663,9 +663,28 @@ class Manager(manager.Manager):
filter_results.append(ref)
return filter_results
def _strip_domain_roles(self, role_refs):
"""Post process assignment list for domain roles.
Domain roles are only designed to do the job of inferring other roles
and since that has been done before this method is called, we need to
remove any assignments that include a domain role.
"""
def _role_is_global(role_id):
ref = self.role_api.get_role(role_id)
return (ref['domain_id'] is None)
filter_results = []
for ref in role_refs:
if _role_is_global(ref['role_id']):
filter_results.append(ref)
return filter_results
def _list_effective_role_assignments(self, role_id, user_id, group_id,
domain_id, project_id, subtree_ids,
inherited, source_from_group_ids):
inherited, source_from_group_ids,
strip_domain_roles):
"""List role assignments in effective mode.
When using effective mode, besides the direct assignments, the indirect
@ -822,6 +841,8 @@ class Manager(manager.Manager):
ref, user_id, project_id, subtree_ids, expand_groups)
refs = self.add_implied_roles(refs)
if strip_domain_roles:
refs = self._strip_domain_roles(refs)
if role_id:
refs = self._filter_by_role_id(role_id, refs)
@ -854,7 +875,8 @@ class Manager(manager.Manager):
domain_id=None, project_id=None,
include_subtree=False, inherited=None,
effective=None, include_names=False,
source_from_group_ids=None):
source_from_group_ids=None,
strip_domain_roles=True):
"""List role assignments, honoring effective mode and provided filters.
Returns a list of role assignments, where their attributes match the
@ -886,6 +908,12 @@ class Manager(manager.Manager):
difference between this and a group filter, other than it is a list of
groups).
In effective mode, any domain specific roles are usually stripped from
the returned assignments (since such roles are not placed in tokens).
This stripping can be disabled by specifying strip_domain_roles=False,
which is useful for internal calls like trusts which need to examine
the full set of roles.
If OS-INHERIT extension is disabled or the used driver does not support
inherited roles retrieval, inherited role assignments will be ignored.
@ -904,7 +932,8 @@ class Manager(manager.Manager):
if effective:
role_assignments = self._list_effective_role_assignments(
role_id, user_id, group_id, domain_id, project_id,
subtree_ids, inherited, source_from_group_ids)
subtree_ids, inherited, source_from_group_ids,
strip_domain_roles)
else:
role_assignments = self._list_direct_role_assignments(
role_id, user_id, group_id, domain_id, project_id,

View File

@ -101,6 +101,13 @@ class AssignmentTestHelperMixin(object):
{'project': {'project': 3}},
{'project': {'project': 3}}]
# If the 'roles' entity count is defined as top level key in 'entities'
# dict then these are global roles. If it is placed within the
# 'domain' dict, then they will be domain specific roles. A mix of
# domain specific and global roles are allowed, with the role index
# being calculated in the order they are defined in the 'entities'
# dict.
# A set of implied role specifications. In this case, prior role
# index 0 implies role index 1, and role 1 implies roles 2 and 3.
@ -179,6 +186,10 @@ class AssignmentTestHelperMixin(object):
test_data['projects'].append(
_create_project(domain_id, parent_id))
def _create_role(self, domain_id=None):
new_role = unit.new_role_ref(domain_id=domain_id)
return self.role_api.create_role(new_role['id'], new_role)
def _handle_domain_spec(self, test_data, domain_spec):
"""Handle the creation of domains and their contents.
@ -210,6 +221,8 @@ class AssignmentTestHelperMixin(object):
elif entity_type == 'groups':
new_entity = unit.new_group_ref(domain_id=domain_id)
new_entity = self.identity_api.create_group(new_entity)
elif entity_type == 'roles':
new_entity = self._create_role(domain_id=domain_id)
else:
# Must be a bad test plan
raise exception.NotImplemented()
@ -252,10 +265,6 @@ class AssignmentTestHelperMixin(object):
test_data['users'] = [user[0], user[1]....]
"""
def _create_role():
new_role = unit.new_role_ref()
return self.role_api.create_role(new_role['id'], new_role)
test_data = {}
for entity in ['users', 'groups', 'domains', 'projects', 'roles']:
test_data[entity] = []
@ -268,7 +277,7 @@ class AssignmentTestHelperMixin(object):
# Create any roles requested
if 'roles' in entity_pattern:
for _ in range(entity_pattern['roles']):
test_data['roles'].append(_create_role())
test_data['roles'].append(self._create_role())
return test_data
@ -7111,6 +7120,35 @@ class ImpliedRoleTests(AssignmentTestHelperMixin):
self.config_fixture.config(group='os_inherit', enabled=True)
self.execute_assignment_plan(test_plan)
def test_role_assignments_domain_specific_with_implied_roles(self):
test_plan = {
'entities': {'domains': {'users': 1, 'projects': 1, 'roles': 2},
'roles': 2},
# Two level tree of implied roles, with the top and 1st level being
# domain specific roles, and the bottom level being infered global
# roles.
'implied_roles': [{'role': 0, 'implied_roles': [1]},
{'role': 1, 'implied_roles': [2, 3]}],
'assignments': [{'user': 0, 'role': 0, 'project': 0}],
'tests': [
# List all direct assignments for user[0], this should just
# show the one top level role assignment, even though this is a
# domain specific role (since we are in non-effective mode and
# we show any direct role assignment in that mode).
{'params': {'user': 0},
'results': [{'user': 0, 'role': 0, 'project': 0}]},
# Now the effective ones - so the implied roles should be
# expanded out, as well as any domain specific roles should be
# removed.
{'params': {'user': 0, 'effective': True},
'results': [{'user': 0, 'role': 2, 'project': 0,
'indirect': {'role': 1}},
{'user': 0, 'role': 3, 'project': 0,
'indirect': {'role': 1}}]},
]
}
self.execute_assignment_plan(test_plan)
class FilterTests(filtering.FilterTests):
def test_list_entities_filtered(self):

View File

@ -2644,6 +2644,47 @@ class ImpliedRolesTests(test_v3.RestfulTestCase, test_v3.AssignmentTestMixin,
for role in self.role_list:
self.assertIn(role, token['roles'])
def test_trusts_from_domain_specific_implied_role(self):
self._create_three_roles()
# Overwrite the first role with a domain specific role
role = unit.new_role_ref(domain_id=self.domain_id)
self.role_list[0] = self.role_api.create_role(role['id'], role)
self._create_implied_role(self.role_list[0], self.role_list[1])
self._create_implied_role(self.role_list[1], self.role_list[2])
self._assign_top_role_to_user_on_project(self.user, self.project)
# Create a trustee and assign the prior role to her
trustee = unit.create_user(self.identity_api, domain_id=self.domain_id)
ref = unit.new_trust_ref(
trustor_user_id=self.user['id'],
trustee_user_id=trustee['id'],
project_id=self.project['id'],
role_ids=[self.role_list[0]['id']])
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
trust = r.result['trust']
# Only the role that was specified is in the trust, NOT implied roles
self.assertEqual(self.role_list[0]['id'], trust['roles'][0]['id'])
self.assertThat(trust['roles'], matchers.HasLength(1))
# Authenticate as the trustee
auth_data = self.build_authentication_request(
user_id=trustee['id'],
password=trustee['password'],
trust_id=trust['id'])
r = self.v3_create_token(auth_data)
token = r.result['token']
# The token should have the roles implies by the domain specific role,
# but not the domain specific role itself.
self.assertThat(token['roles'],
matchers.HasLength(len(self.role_list) - 1))
for role in token['roles']:
self.assertIn(role, self.role_list)
for role in [self.role_list[1], self.role_list[2]]:
self.assertIn(role, token['roles'])
self.assertNotIn(self.role_list[0], token['roles'])
class DomainSpecificRoleTests(test_v3.RestfulTestCase, unit.TestCase):
def setUp(self):

View File

@ -481,9 +481,9 @@ class TokenAPITests(object):
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
def _create_role(self):
def _create_role(self, domain_id=None):
"""Call ``POST /roles``."""
ref = unit.new_role_ref()
ref = unit.new_role_ref(domain_id=domain_id)
r = self.post('/roles', body={'role': ref})
return self.assertValidRoleResponse(r, ref)
@ -666,6 +666,25 @@ class TokenAPITests(object):
token_roles = self._get_scoped_token_roles()
self.assertEqual(2, len(token_roles))
def test_domain_scpecific_roles_do_not_show_v3_token(self):
self.config_fixture.config(group='token', infer_roles=True)
initial_token_roles = self._get_scoped_token_roles()
new_role = self._create_role(domain_id=self.domain_id)
self.assignment_api.create_grant(new_role['id'],
user_id=self.user['id'],
project_id=self.project['id'])
implied = self._create_implied_role(new_role['id'])
token_roles = self._get_scoped_token_roles()
self.assertEqual(len(initial_token_roles) + 1, len(token_roles))
# The implied role from the domain specific role should be in the
# token, but not the domain specific role itself.
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(implied['id'], token_role_ids)
self.assertNotIn(new_role['id'], token_role_ids)
class TokenDataTests(object):
"""Test the data in specific token types."""

View File

@ -396,24 +396,39 @@ class V3TokenDataHelper(object):
token_domain_id = domain_id
if token_domain_id or token_project_id:
roles = self._get_roles_for_user(token_user_id,
token_domain_id,
token_project_id)
filtered_roles = []
if CONF.trust.enabled and trust:
# First expand out any roles that were in the trust to include
# any implied roles, whether global or domain specific
refs = [{'role_id': role['id']} for role in trust['roles']]
effective_roles = self.assignment_api.add_implied_roles(refs)
for trust_role in effective_roles:
effective_trust_roles = (
self.assignment_api.add_implied_roles(refs))
# Now get the current role assignments for the trustor,
# including any domain specific roles.
assignment_list = self.assignment_api.list_role_assignments(
user_id=token_user_id,
project_id=token_project_id,
effective=True, strip_domain_roles=False)
current_effective_trustor_roles = (
list(set([x['role_id'] for x in assignment_list])))
# Go through each of the effective trust roles, making sure the
# trustor still has them, if any have been removed, then we
# will treat the trust as invalid
for trust_role in effective_trust_roles:
match_roles = [x for x in roles
if x['id'] == trust_role['role_id']]
match_roles = [x for x in current_effective_trustor_roles
if x == trust_role['role_id']]
if match_roles:
filtered_roles.append(match_roles[0])
role = self.role_api.get_role(match_roles[0])
if role['domain_id'] is None:
filtered_roles.append(role)
else:
raise exception.Forbidden(
_('Trustee has no delegated roles.'))
else:
for role in roles:
for role in self._get_roles_for_user(token_user_id,
token_domain_id,
token_project_id):
filtered_roles.append({'id': role['id'],
'name': role['name']})

View File

@ -43,8 +43,8 @@ def _admin_trustor_only(context, trust, user_id):
raise exception.Forbidden()
@dependency.requires('assignment_api', 'identity_api', 'role_api',
'token_provider_api', 'trust_api')
@dependency.requires('assignment_api', 'identity_api', 'resource_api',
'role_api', 'token_provider_api', 'trust_api')
class TrustV3(controller.V3Controller):
collection_name = "trusts"
member_name = "trust"
@ -179,9 +179,13 @@ class TrustV3(controller.V3Controller):
original_trust['redelegated_trust_id'])
if not self._attribute_is_empty(trust, 'project_id'):
return self.assignment_api.get_roles_for_user_and_project(
original_trust['trustor_user_id'],
original_trust['project_id'])
self.resource_api.get_project(original_trust['project_id'])
# Get a list of roles including any domain specific roles
assignment_list = self.assignment_api.list_role_assignments(
user_id=original_trust['trustor_user_id'],
project_id=original_trust['project_id'],
effective=True, strip_domain_roles=False)
return list(set([x['role_id'] for x in assignment_list]))
else:
return []