From a16287af5b7761c8453b2a8e278d78652497377c Mon Sep 17 00:00:00 2001 From: Henry Nash Date: Mon, 28 Dec 2015 21:31:34 +0000 Subject: [PATCH] Modify rules in the v3 policy sample for domain specifc roles Currently, for global roles, cloud admin has full CRUD permissions for roles, although a domain or project admin can read roles (i.e. Get or List). This remains the case. For domain specific roles, in addition to cloud admin, the domain admin also has full CRUD permissions for the domain specific roles of their own domain (but no permissions to see any domain specific roles from other domains). In addition, a project admin can read (i.e. Get or List) the domain specific roles from their domain (but again no permissions to see any domain specific roles from other domains). Partially Implements: blueprint domain-specific-roles Change-Id: I53499f164bfa4d3e65b70b9586b6fe0d71b60f41 --- doc/source/configuration.rst | 1 + doc/source/policy_mapping.rst | 6 + etc/policy.json | 5 + etc/policy.v3cloudsample.json | 13 +++ keystone/assignment/controllers.py | 130 +++++++++++++++++++--- keystone/assignment/routers.py | 2 +- keystone/common/router.py | 20 ++-- keystone/tests/unit/test_v3_protection.py | 117 ++++++++++++++++++- 8 files changed, 261 insertions(+), 33 deletions(-) diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index 90e1bbdf53..0620ea89d7 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -1356,6 +1356,7 @@ are filtered out (e.g. user passwords). List of object attributes: * role: + * target.role.domain_id * target.role.id * target.role.name diff --git a/doc/source/policy_mapping.rst b/doc/source/policy_mapping.rst index c67c322234..2d3cd60a3f 100644 --- a/doc/source/policy_mapping.rst +++ b/doc/source/policy_mapping.rst @@ -73,6 +73,12 @@ identity:create_role POST /v3/roles identity:update_role PATCH /v3/roles/{role_id} identity:delete_role DELETE /v3/roles/{role_id} +identity:get_domain_role GET /v3/roles/{role_id} where role.domain_id is not null +identity:list_domain_roles GET /v3/roles?domain_id where role.domain_id is not null +identity:create_domain_role POST /v3/roles where role.domain_id is not null +identity:update_domain_role PATCH /v3/roles/{role_id} where role.domain_id is not null +identity:delete_domain_role DELETE /v3/roles/{role_id} where role.domain_id is not null + identity:get_implied_role GET /v3/roles/{prior_role_id}/implies/{implied_role_id} identity:list_implied_roles GET /v3/roles/{prior_role_id}/implies identity:create_implied_role PUT /v3/roles/{prior_role_id}/implies/{implied_role_id} diff --git a/etc/policy.json b/etc/policy.json index 79df5e7c3a..797af24d61 100644 --- a/etc/policy.json +++ b/etc/policy.json @@ -75,6 +75,11 @@ "identity:create_role": "rule:admin_required", "identity:update_role": "rule:admin_required", "identity:delete_role": "rule:admin_required", + "identity:get_domain_role": "rule:admin_required", + "identity:list_domain_roles": "rule:admin_required", + "identity:create_domain_role": "rule:admin_required", + "identity:update_domain_role": "rule:admin_required", + "identity:delete_domain_role": "rule:admin_required", "identity:get_implied_role": "rule:admin_required ", "identity:list_implied_roles": "rule:admin_required", diff --git a/etc/policy.v3cloudsample.json b/etc/policy.v3cloudsample.json index bd44fd5c83..dc606809f1 100644 --- a/etc/policy.v3cloudsample.json +++ b/etc/policy.v3cloudsample.json @@ -81,6 +81,19 @@ "identity:update_role": "rule:cloud_admin", "identity:delete_role": "rule:cloud_admin", + "identity:get_domain_role": "rule:cloud_admin or rule:get_domain_roles", + "identity:list_domain_roles": "rule:cloud_admin or rule:list_domain_roles", + "identity:create_domain_role": "rule:cloud_admin or rule:domain_admin_matches_domain_role", + "identity:update_domain_role": "rule:cloud_admin or rule:domain_admin_matches_target_domain_role", + "identity:delete_domain_role": "rule:cloud_admin or rule:domain_admin_matches_target_domain_role", + "domain_admin_matches_domain_role": "rule:admin_required and domain_id:%(role.domain_id)s", + "get_domain_roles": "rule:domain_admin_matches_target_domain_role or rule:project_admin_matches_target_domain_role", + "domain_admin_matches_target_domain_role": "rule:admin_required and domain_id:%(target.role.domain_id)s", + "project_admin_matches_target_domain_role": "rule:admin_required and project_domain_id:%(target.role.domain_id)s", + "list_domain_roles": "rule:domain_admin_matches_filter_on_list_domain_roles or rule:project_admin_matches_filter_on_list_domain_roles", + "domain_admin_matches_filter_on_list_domain_roles": "rule:admin_required and domain_id:%(domain_id)s", + "project_admin_matches_filter_on_list_domain_roles": "rule:admin_required and project_domain_id:%(domain_id)s", + "identity:get_implied_role": "rule:cloud_admin", "identity:list_implied_roles": "rule:cloud_admin", "identity:create_implied_role": "rule:cloud_admin", diff --git a/keystone/assignment/controllers.py b/keystone/assignment/controllers.py index ebdba485e8..b161b6496d 100644 --- a/keystone/assignment/controllers.py +++ b/keystone/assignment/controllers.py @@ -277,7 +277,19 @@ class ProjectAssignmentV3(controller.V3Controller): @dependency.requires('role_api') class RoleV3(controller.V3Controller): - """The V3 Role CRUD APIs.""" + """The V3 Role CRUD APIs. + + To ease complexity (and hence risk) in writing the policy rules for the + role APIs, we create separate policy actions for roles that are domain + specific, as opposed to those that are global. In order to achieve this + each of the role API methods has a wrapper method that checks to see if the + role is global or domain specific. + + NOTE (henry-nash): If this separate global vs scoped policy action pattern + becomes repeated for other entities, we should consider encapsulating this + into a specialized router class. + + """ collection_name = 'roles' member_name = 'role' @@ -286,9 +298,104 @@ class RoleV3(controller.V3Controller): super(RoleV3, self).__init__() self.get_member_from_driver = self.role_api.get_role + def _is_domain_role(self, role): + return role.get('domain_id') is not None + + def _is_domain_role_target(self, role_id): + try: + role = self.role_api.get_role(role_id) + except exception.RoleNotFound: + # We hide this error since we have not yet carried out a policy + # check - and it maybe that the caller isn't authorized to make + # this call. If so, we want that error to be raised instead. + return False + return self._is_domain_role(role) + + def create_role_wrapper(self, context, role): + if self._is_domain_role(role): + return self.create_domain_role(context, role=role) + else: + return self.create_role(context, role=role) + @controller.protected() @validation.validated(schema.role_create, 'role') def create_role(self, context, role): + return self._create_role(context, role) + + @controller.protected() + @validation.validated(schema.role_create, 'role') + def create_domain_role(self, context, role): + return self._create_role(context, role) + + def list_roles_wrapper(self, context): + # If there is no domain_id filter defined, then we only want to return + # global roles, so we set the domain_id filter to None. + params = context['query_string'] + if 'domain_id' not in params: + context['query_string']['domain_id'] = None + + if context['query_string']['domain_id'] is not None: + return self.list_domain_roles(context) + else: + return self.list_roles(context) + + @controller.filterprotected('name', 'domain_id') + def list_roles(self, context, filters): + return self._list_roles(context, filters) + + @controller.filterprotected('name', 'domain_id') + def list_domain_roles(self, context, filters): + return self._list_roles(context, filters) + + def get_role_wrapper(self, context, role_id): + if self._is_domain_role_target(role_id): + return self.get_domain_role(context, role_id=role_id) + else: + return self.get_role(context, role_id=role_id) + + @controller.protected() + def get_role(self, context, role_id): + return self._get_role(context, role_id) + + @controller.protected() + def get_domain_role(self, context, role_id): + return self._get_role(context, role_id) + + def update_role_wrapper(self, context, role_id, role): + # Since we don't allow you change whether a role is global or domain + # specific, we can ignore the new update attributes and just look at + # the existing role. + if self._is_domain_role_target(role_id): + return self.update_domain_role( + context, role_id=role_id, role=role) + else: + return self.update_role(context, role_id=role_id, role=role) + + @controller.protected() + @validation.validated(schema.role_update, 'role') + def update_role(self, context, role_id, role): + return self._update_role(context, role_id, role) + + @controller.protected() + @validation.validated(schema.role_update, 'role') + def update_domain_role(self, context, role_id, role): + return self._update_role(context, role_id, role) + + def delete_role_wrapper(self, context, role_id): + if self._is_domain_role_target(role_id): + return self.delete_domain_role(context, role_id=role_id) + else: + return self.delete_role(context, role_id=role_id) + + @controller.protected() + def delete_role(self, context, role_id): + return self._delete_role(context, role_id) + + @controller.protected() + def delete_domain_role(self, context, role_id): + return self._delete_role(context, role_id) + + def _create_role(self, context, role): if role['name'] == CONF.member_role_name: # Use the configured member role ID when creating the configured # member role name. This avoids the potential of creating a @@ -303,36 +410,23 @@ class RoleV3(controller.V3Controller): ref = self.role_api.create_role(ref['id'], ref, initiator) return RoleV3.wrap_member(context, ref) - @controller.filterprotected('name', 'domain_id') - def list_roles(self, context, filters): + def _list_roles(self, context, filters): hints = RoleV3.build_driver_hints(context, filters) refs = self.role_api.list_roles( hints=hints) return RoleV3.wrap_collection(context, refs, hints=hints) - def list_roles_wrapper(self, context): - # If there is no domain_id filter defined, then we only want to return - # global roles, so we set the domain_id filter to None. - params = context['query_string'] - if 'domain_id' not in params: - context['query_string']['domain_id'] = None - return self.list_roles(context) - - @controller.protected() - def get_role(self, context, role_id): + def _get_role(self, context, role_id): ref = self.role_api.get_role(role_id) return RoleV3.wrap_member(context, ref) - @controller.protected() - @validation.validated(schema.role_update, 'role') - def update_role(self, context, role_id, role): + def _update_role(self, context, role_id, role): self._require_matching_id(role_id, role) initiator = notifications._get_request_audit_info(context) ref = self.role_api.update_role(role_id, role, initiator) return RoleV3.wrap_member(context, ref) - @controller.protected() - def delete_role(self, context, role_id): + def _delete_role(self, context, role_id): initiator = notifications._get_request_audit_info(context) self.role_api.delete_role(role_id, initiator) diff --git a/keystone/assignment/routers.py b/keystone/assignment/routers.py index d1cdd5ee4e..9bef401eb2 100644 --- a/keystone/assignment/routers.py +++ b/keystone/assignment/routers.py @@ -72,7 +72,7 @@ class Routers(wsgi.RoutersBase): routers.append( router.Router(controllers.RoleV3(), 'roles', 'role', resource_descriptions=self.v3_resources, - overrides={'list': 'list_roles_wrapper'})) + method_template='%s_wrapper')) implied_roles_controller = controllers.ImpliedRolesV3() self._add_resource( diff --git a/keystone/common/router.py b/keystone/common/router.py index 7e77a86f4b..74e03ad26e 100644 --- a/keystone/common/router.py +++ b/keystone/common/router.py @@ -20,21 +20,15 @@ class Router(wsgi.ComposableRouter): def __init__(self, controller, collection_key, key, resource_descriptions=None, is_entity_implemented=True, - overrides=None): + method_template=None): self.controller = controller self.key = key self.collection_key = collection_key self._resource_descriptions = resource_descriptions self._is_entity_implemented = is_entity_implemented - self.overrides = overrides + self.method_template = method_template or '%s' def add_routes(self, mapper): - def _assign_action(action, key, overrides): - if overrides is not None and action in overrides: - return overrides[action] - else: - return '%(action)s_%(key)s' % {'action': action, 'key': key} - collection_path = '/%(collection_key)s' % { 'collection_key': self.collection_key} entity_path = '/%(collection_key)s/{%(key)s_id}' % { @@ -44,27 +38,27 @@ class Router(wsgi.ComposableRouter): mapper.connect( collection_path, controller=self.controller, - action=_assign_action('create', self.key, self.overrides), + action=self.method_template % 'create_%s' % self.key, conditions=dict(method=['POST'])) mapper.connect( collection_path, controller=self.controller, - action=_assign_action('list', self.collection_key, self.overrides), + action=self.method_template % 'list_%s' % self.collection_key, conditions=dict(method=['GET'])) mapper.connect( entity_path, controller=self.controller, - action=_assign_action('get', self.key, self.overrides), + action=self.method_template % 'get_%s' % self.key, conditions=dict(method=['GET'])) mapper.connect( entity_path, controller=self.controller, - action=_assign_action('update', self.key, self.overrides), + action=self.method_template % 'update_%s' % self.key, conditions=dict(method=['PATCH'])) mapper.connect( entity_path, controller=self.controller, - action=_assign_action('delete', self.key, self.overrides), + action=self.method_template % 'delete_%s' % self.key, conditions=dict(method=['DELETE'])) # Add the collection resource and entity resource to the resource diff --git a/keystone/tests/unit/test_v3_protection.py b/keystone/tests/unit/test_v3_protection.py index 5e6654d00d..aa325d27ad 100644 --- a/keystone/tests/unit/test_v3_protection.py +++ b/keystone/tests/unit/test_v3_protection.py @@ -620,13 +620,21 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.domain_admin_user = unit.create_user( self.identity_api, domain_id=self.domainA['id']) + self.domainB_admin_user = unit.create_user( + self.identity_api, + domain_id=self.domainB['id']) self.project_admin_user = unit.create_user( self.identity_api, domain_id=self.domainA['id']) + self.project_adminB_user = unit.create_user( + self.identity_api, + domain_id=self.domainB['id']) - # The admin role and another plain role + # The admin role, a domain specific role and another plain role self.admin_role = unit.new_role_ref(name='admin') self.role_api.create_role(self.admin_role['id'], self.admin_role) + self.roleA = unit.new_role_ref(domain_id=self.domainA['id']) + self.role_api.create_role(self.roleA['id'], self.roleA) self.role = unit.new_role_ref() self.role_api.create_role(self.role['id'], self.role) @@ -642,13 +650,21 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.assignment_api.create_grant(self.role['id'], user_id=self.just_a_user['id'], domain_id=self.domainA['id']) + self.assignment_api.create_grant(self.admin_role['id'], + user_id=self.domainB_admin_user['id'], + domain_id=self.domainB['id']) # Create and assign roles to the project self.project = unit.new_project_ref(domain_id=self.domainA['id']) self.resource_api.create_project(self.project['id'], self.project) + self.projectB = unit.new_project_ref(domain_id=self.domainB['id']) + self.resource_api.create_project(self.projectB['id'], self.projectB) self.assignment_api.create_grant(self.admin_role['id'], user_id=self.project_admin_user['id'], project_id=self.project['id']) + self.assignment_api.create_grant( + self.admin_role['id'], user_id=self.project_adminB_user['id'], + project_id=self.projectB['id']) self.assignment_api.create_grant(self.role['id'], user_id=self.just_a_user['id'], project_id=self.project['id']) @@ -768,6 +784,33 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.post('/roles', auth=self.auth, body={'role': role_ref}, expected_status=status_created) + def _domain_role_management_cases(self, domain_id, read_status_OK=False, + expected=None): + # Set the different status values for different types of call depending + # on whether we expect the calls to fail or not. + status_OK, status_created, status_no_data = self._stati(expected) + entity_url = '/roles/%s' % self.roleA['id'] + list_url = '/roles?domain_id=%s' % domain_id + + if read_status_OK: + self.get(entity_url, auth=self.auth) + self.get(list_url, auth=self.auth) + else: + self.get(entity_url, auth=self.auth, + expected_status=status_OK) + self.get(list_url, auth=self.auth, + expected_status=status_OK) + + role = {'name': 'Updated'} + self.patch(entity_url, auth=self.auth, body={'role': role}, + expected_status=status_OK) + self.delete(entity_url, auth=self.auth, + expected_status=status_no_data) + + role_ref = unit.new_role_ref(domain_id=domain_id) + self.post('/roles', auth=self.auth, body={'role': role_ref}, + expected_status=status_created) + def test_user_management(self): # First, authenticate with a user that does not have the domain # admin role - shouldn't be able to do much. @@ -1555,3 +1598,75 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, project_id=self.admin_project['id']) self._role_management_cases() + + def test_domain_role_management_no_admin_no_rights(self): + # A non-admin domain user shouldn't be able to manipulate domain roles + self.auth = self.build_authentication_request( + user_id=self.just_a_user['id'], + password=self.just_a_user['password'], + domain_id=self.domainA['id']) + + self._domain_role_management_cases( + self.domainA['id'], expected=exception.ForbiddenAction.code) + + # ...and nor should non-admin project user + self.auth = self.build_authentication_request( + user_id=self.just_a_user['id'], + password=self.just_a_user['password'], + project_id=self.project['id']) + + self._domain_role_management_cases( + self.domainA['id'], expected=exception.ForbiddenAction.code) + + def test_domain_role_management_with_cloud_admin(self): + # A cloud admin user should have rights to manipulate domain roles + self.auth = self.build_authentication_request( + user_id=self.cloud_admin_user['id'], + password=self.cloud_admin_user['password'], + project_id=self.admin_project['id']) + + self._domain_role_management_cases(self.domainA['id']) + + def test_domain_role_management_with_domain_admin(self): + # A domain admin user should only be able to manipulate the domain + # specific roles in their own domain + self.auth = self.build_authentication_request( + user_id=self.domainB_admin_user['id'], + password=self.domainB_admin_user['password'], + domain_id=self.domainB['id']) + + # Try to access the domain specific roles in another domain + self._domain_role_management_cases( + self.domainA['id'], expected=exception.ForbiddenAction.code) + + # ...but they should be able to work with those in their own domain + self.auth = self.build_authentication_request( + user_id=self.domain_admin_user['id'], + password=self.domain_admin_user['password'], + domain_id=self.domainA['id']) + + self._domain_role_management_cases(self.domainA['id']) + + def test_domain_role_management_with_project_admin(self): + # A project admin user should have not access to domain specific roles + # in another domain. They should be able to get and list domain + # specific roles from their own domain, but not be able to create, + # update or delete them, + self.auth = self.build_authentication_request( + user_id=self.project_adminB_user['id'], + password=self.project_adminB_user['password'], + project_id=self.projectB['id']) + + # Try access the domain specific roless in another domain + self._domain_role_management_cases( + self.domainA['id'], expected=exception.ForbiddenAction.code) + + # ...but they should be ablet to work with those in their own domain + self.auth = self.build_authentication_request( + user_id=self.project_admin_user['id'], + password=self.project_admin_user['password'], + project_id=self.project['id']) + + self._domain_role_management_cases( + self.domainA['id'], read_status_OK=True, + expected=exception.ForbiddenAction.code)