From a270766eb9c3f2074af550a72661a6a825e9975b Mon Sep 17 00:00:00 2001 From: Henry Nash Date: Sun, 3 Jan 2016 21:45:51 +0000 Subject: [PATCH] Modify implied roles to honor domain specific roles The logic for processing domain specific roles is the same as regular implied roles, except for the fact that domain specifc roles themselves should not be returned by the manager level list_role_assignments() in effective mode, hence ensuring that the won't be placed in the token. This patch makes the above changes, and adds tests to ensure this. A follow-on patch will update the policy rules for domain specific role assignment as well as provide release notes. Partially implements: blueprint domain-specific-roles Change-Id: I8c5d0dfd329a84a0525d625dc3b0c2c9325ab6bb --- keystone/assignment/core.py | 35 +++++++++++++++-- keystone/tests/unit/test_backend.py | 48 ++++++++++++++++++++--- keystone/tests/unit/test_v3_assignment.py | 41 +++++++++++++++++++ keystone/tests/unit/test_v3_auth.py | 23 ++++++++++- keystone/token/providers/common.py | 33 +++++++++++----- keystone/trust/controllers.py | 14 ++++--- 6 files changed, 170 insertions(+), 24 deletions(-) diff --git a/keystone/assignment/core.py b/keystone/assignment/core.py index 74ddf57616..a64c5d3738 100644 --- a/keystone/assignment/core.py +++ b/keystone/assignment/core.py @@ -663,9 +663,28 @@ class Manager(manager.Manager): filter_results.append(ref) return filter_results + def _strip_domain_roles(self, role_refs): + """Post process assignment list for domain roles. + + Domain roles are only designed to do the job of inferring other roles + and since that has been done before this method is called, we need to + remove any assignments that include a domain role. + + """ + def _role_is_global(role_id): + ref = self.role_api.get_role(role_id) + return (ref['domain_id'] is None) + + filter_results = [] + for ref in role_refs: + if _role_is_global(ref['role_id']): + filter_results.append(ref) + return filter_results + def _list_effective_role_assignments(self, role_id, user_id, group_id, domain_id, project_id, subtree_ids, - inherited, source_from_group_ids): + inherited, source_from_group_ids, + strip_domain_roles): """List role assignments in effective mode. When using effective mode, besides the direct assignments, the indirect @@ -822,6 +841,8 @@ class Manager(manager.Manager): ref, user_id, project_id, subtree_ids, expand_groups) refs = self.add_implied_roles(refs) + if strip_domain_roles: + refs = self._strip_domain_roles(refs) if role_id: refs = self._filter_by_role_id(role_id, refs) @@ -854,7 +875,8 @@ class Manager(manager.Manager): domain_id=None, project_id=None, include_subtree=False, inherited=None, effective=None, include_names=False, - source_from_group_ids=None): + source_from_group_ids=None, + strip_domain_roles=True): """List role assignments, honoring effective mode and provided filters. Returns a list of role assignments, where their attributes match the @@ -886,6 +908,12 @@ class Manager(manager.Manager): difference between this and a group filter, other than it is a list of groups). + In effective mode, any domain specific roles are usually stripped from + the returned assignments (since such roles are not placed in tokens). + This stripping can be disabled by specifying strip_domain_roles=False, + which is useful for internal calls like trusts which need to examine + the full set of roles. + If OS-INHERIT extension is disabled or the used driver does not support inherited roles retrieval, inherited role assignments will be ignored. @@ -904,7 +932,8 @@ class Manager(manager.Manager): if effective: role_assignments = self._list_effective_role_assignments( role_id, user_id, group_id, domain_id, project_id, - subtree_ids, inherited, source_from_group_ids) + subtree_ids, inherited, source_from_group_ids, + strip_domain_roles) else: role_assignments = self._list_direct_role_assignments( role_id, user_id, group_id, domain_id, project_id, diff --git a/keystone/tests/unit/test_backend.py b/keystone/tests/unit/test_backend.py index c5d348553c..3b42d01b15 100644 --- a/keystone/tests/unit/test_backend.py +++ b/keystone/tests/unit/test_backend.py @@ -101,6 +101,13 @@ class AssignmentTestHelperMixin(object): {'project': {'project': 3}}, {'project': {'project': 3}}] + # If the 'roles' entity count is defined as top level key in 'entities' + # dict then these are global roles. If it is placed within the + # 'domain' dict, then they will be domain specific roles. A mix of + # domain specific and global roles are allowed, with the role index + # being calculated in the order they are defined in the 'entities' + # dict. + # A set of implied role specifications. In this case, prior role # index 0 implies role index 1, and role 1 implies roles 2 and 3. @@ -179,6 +186,10 @@ class AssignmentTestHelperMixin(object): test_data['projects'].append( _create_project(domain_id, parent_id)) + def _create_role(self, domain_id=None): + new_role = unit.new_role_ref(domain_id=domain_id) + return self.role_api.create_role(new_role['id'], new_role) + def _handle_domain_spec(self, test_data, domain_spec): """Handle the creation of domains and their contents. @@ -210,6 +221,8 @@ class AssignmentTestHelperMixin(object): elif entity_type == 'groups': new_entity = unit.new_group_ref(domain_id=domain_id) new_entity = self.identity_api.create_group(new_entity) + elif entity_type == 'roles': + new_entity = self._create_role(domain_id=domain_id) else: # Must be a bad test plan raise exception.NotImplemented() @@ -252,10 +265,6 @@ class AssignmentTestHelperMixin(object): test_data['users'] = [user[0], user[1]....] """ - def _create_role(): - new_role = unit.new_role_ref() - return self.role_api.create_role(new_role['id'], new_role) - test_data = {} for entity in ['users', 'groups', 'domains', 'projects', 'roles']: test_data[entity] = [] @@ -268,7 +277,7 @@ class AssignmentTestHelperMixin(object): # Create any roles requested if 'roles' in entity_pattern: for _ in range(entity_pattern['roles']): - test_data['roles'].append(_create_role()) + test_data['roles'].append(self._create_role()) return test_data @@ -7111,6 +7120,35 @@ class ImpliedRoleTests(AssignmentTestHelperMixin): self.config_fixture.config(group='os_inherit', enabled=True) self.execute_assignment_plan(test_plan) + def test_role_assignments_domain_specific_with_implied_roles(self): + test_plan = { + 'entities': {'domains': {'users': 1, 'projects': 1, 'roles': 2}, + 'roles': 2}, + # Two level tree of implied roles, with the top and 1st level being + # domain specific roles, and the bottom level being infered global + # roles. + 'implied_roles': [{'role': 0, 'implied_roles': [1]}, + {'role': 1, 'implied_roles': [2, 3]}], + 'assignments': [{'user': 0, 'role': 0, 'project': 0}], + 'tests': [ + # List all direct assignments for user[0], this should just + # show the one top level role assignment, even though this is a + # domain specific role (since we are in non-effective mode and + # we show any direct role assignment in that mode). + {'params': {'user': 0}, + 'results': [{'user': 0, 'role': 0, 'project': 0}]}, + # Now the effective ones - so the implied roles should be + # expanded out, as well as any domain specific roles should be + # removed. + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 2, 'project': 0, + 'indirect': {'role': 1}}, + {'user': 0, 'role': 3, 'project': 0, + 'indirect': {'role': 1}}]}, + ] + } + self.execute_assignment_plan(test_plan) + class FilterTests(filtering.FilterTests): def test_list_entities_filtered(self): diff --git a/keystone/tests/unit/test_v3_assignment.py b/keystone/tests/unit/test_v3_assignment.py index 882a8f6f26..a6dc08138f 100644 --- a/keystone/tests/unit/test_v3_assignment.py +++ b/keystone/tests/unit/test_v3_assignment.py @@ -2644,6 +2644,47 @@ class ImpliedRolesTests(test_v3.RestfulTestCase, test_v3.AssignmentTestMixin, for role in self.role_list: self.assertIn(role, token['roles']) + def test_trusts_from_domain_specific_implied_role(self): + self._create_three_roles() + # Overwrite the first role with a domain specific role + role = unit.new_role_ref(domain_id=self.domain_id) + self.role_list[0] = self.role_api.create_role(role['id'], role) + self._create_implied_role(self.role_list[0], self.role_list[1]) + self._create_implied_role(self.role_list[1], self.role_list[2]) + self._assign_top_role_to_user_on_project(self.user, self.project) + + # Create a trustee and assign the prior role to her + trustee = unit.create_user(self.identity_api, domain_id=self.domain_id) + ref = unit.new_trust_ref( + trustor_user_id=self.user['id'], + trustee_user_id=trustee['id'], + project_id=self.project['id'], + role_ids=[self.role_list[0]['id']]) + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = r.result['trust'] + + # Only the role that was specified is in the trust, NOT implied roles + self.assertEqual(self.role_list[0]['id'], trust['roles'][0]['id']) + self.assertThat(trust['roles'], matchers.HasLength(1)) + + # Authenticate as the trustee + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password'], + trust_id=trust['id']) + r = self.v3_create_token(auth_data) + token = r.result['token'] + + # The token should have the roles implies by the domain specific role, + # but not the domain specific role itself. + self.assertThat(token['roles'], + matchers.HasLength(len(self.role_list) - 1)) + for role in token['roles']: + self.assertIn(role, self.role_list) + for role in [self.role_list[1], self.role_list[2]]: + self.assertIn(role, token['roles']) + self.assertNotIn(self.role_list[0], token['roles']) + class DomainSpecificRoleTests(test_v3.RestfulTestCase, unit.TestCase): def setUp(self): diff --git a/keystone/tests/unit/test_v3_auth.py b/keystone/tests/unit/test_v3_auth.py index 704b0aac88..5a45525443 100644 --- a/keystone/tests/unit/test_v3_auth.py +++ b/keystone/tests/unit/test_v3_auth.py @@ -481,9 +481,9 @@ class TokenAPITests(object): r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token}) self.assertValidProjectScopedTokenResponse(r, is_admin_project=False) - def _create_role(self): + def _create_role(self, domain_id=None): """Call ``POST /roles``.""" - ref = unit.new_role_ref() + ref = unit.new_role_ref(domain_id=domain_id) r = self.post('/roles', body={'role': ref}) return self.assertValidRoleResponse(r, ref) @@ -666,6 +666,25 @@ class TokenAPITests(object): token_roles = self._get_scoped_token_roles() self.assertEqual(2, len(token_roles)) + def test_domain_scpecific_roles_do_not_show_v3_token(self): + self.config_fixture.config(group='token', infer_roles=True) + initial_token_roles = self._get_scoped_token_roles() + + new_role = self._create_role(domain_id=self.domain_id) + self.assignment_api.create_grant(new_role['id'], + user_id=self.user['id'], + project_id=self.project['id']) + implied = self._create_implied_role(new_role['id']) + + token_roles = self._get_scoped_token_roles() + self.assertEqual(len(initial_token_roles) + 1, len(token_roles)) + + # The implied role from the domain specific role should be in the + # token, but not the domain specific role itself. + token_role_ids = [role['id'] for role in token_roles] + self.assertIn(implied['id'], token_role_ids) + self.assertNotIn(new_role['id'], token_role_ids) + class TokenDataTests(object): """Test the data in specific token types.""" diff --git a/keystone/token/providers/common.py b/keystone/token/providers/common.py index 4243cc4c2a..2d125f3944 100644 --- a/keystone/token/providers/common.py +++ b/keystone/token/providers/common.py @@ -396,24 +396,39 @@ class V3TokenDataHelper(object): token_domain_id = domain_id if token_domain_id or token_project_id: - roles = self._get_roles_for_user(token_user_id, - token_domain_id, - token_project_id) filtered_roles = [] if CONF.trust.enabled and trust: + # First expand out any roles that were in the trust to include + # any implied roles, whether global or domain specific refs = [{'role_id': role['id']} for role in trust['roles']] - effective_roles = self.assignment_api.add_implied_roles(refs) - for trust_role in effective_roles: + effective_trust_roles = ( + self.assignment_api.add_implied_roles(refs)) + # Now get the current role assignments for the trustor, + # including any domain specific roles. + assignment_list = self.assignment_api.list_role_assignments( + user_id=token_user_id, + project_id=token_project_id, + effective=True, strip_domain_roles=False) + current_effective_trustor_roles = ( + list(set([x['role_id'] for x in assignment_list]))) + # Go through each of the effective trust roles, making sure the + # trustor still has them, if any have been removed, then we + # will treat the trust as invalid + for trust_role in effective_trust_roles: - match_roles = [x for x in roles - if x['id'] == trust_role['role_id']] + match_roles = [x for x in current_effective_trustor_roles + if x == trust_role['role_id']] if match_roles: - filtered_roles.append(match_roles[0]) + role = self.role_api.get_role(match_roles[0]) + if role['domain_id'] is None: + filtered_roles.append(role) else: raise exception.Forbidden( _('Trustee has no delegated roles.')) else: - for role in roles: + for role in self._get_roles_for_user(token_user_id, + token_domain_id, + token_project_id): filtered_roles.append({'id': role['id'], 'name': role['name']}) diff --git a/keystone/trust/controllers.py b/keystone/trust/controllers.py index 748a2f7c34..7f9cace759 100644 --- a/keystone/trust/controllers.py +++ b/keystone/trust/controllers.py @@ -43,8 +43,8 @@ def _admin_trustor_only(context, trust, user_id): raise exception.Forbidden() -@dependency.requires('assignment_api', 'identity_api', 'role_api', - 'token_provider_api', 'trust_api') +@dependency.requires('assignment_api', 'identity_api', 'resource_api', + 'role_api', 'token_provider_api', 'trust_api') class TrustV3(controller.V3Controller): collection_name = "trusts" member_name = "trust" @@ -179,9 +179,13 @@ class TrustV3(controller.V3Controller): original_trust['redelegated_trust_id']) if not self._attribute_is_empty(trust, 'project_id'): - return self.assignment_api.get_roles_for_user_and_project( - original_trust['trustor_user_id'], - original_trust['project_id']) + self.resource_api.get_project(original_trust['project_id']) + # Get a list of roles including any domain specific roles + assignment_list = self.assignment_api.list_role_assignments( + user_id=original_trust['trustor_user_id'], + project_id=original_trust['project_id'], + effective=True, strip_domain_roles=False) + return list(set([x['role_id'] for x in assignment_list])) else: return []