Modify rules in the v3 policy sample for domain specifc roles

Currently, for global roles, cloud admin has full CRUD permissions
for roles, although a domain or project admin can read roles
(i.e. Get or List). This remains the case.

For domain specific roles, in addition to cloud admin, the domain
admin also has full CRUD permissions for the domain specific roles
of their own domain (but no permissions to see any domain specific
roles from other domains). In addition, a project admin can read
(i.e. Get or List) the domain specific roles from their domain
(but again no permissions to see any domain specific roles from
other domains).

Partially Implements: blueprint domain-specific-roles

Change-Id: I53499f164bfa4d3e65b70b9586b6fe0d71b60f41
This commit is contained in:
Henry Nash 2015-12-28 21:31:34 +00:00
parent a320eaa903
commit a16287af5b
8 changed files with 261 additions and 33 deletions

View File

@ -1356,6 +1356,7 @@ are filtered out (e.g. user passwords).
List of object attributes:
* role:
* target.role.domain_id
* target.role.id
* target.role.name

View File

@ -73,6 +73,12 @@ identity:create_role POST /v3/roles
identity:update_role PATCH /v3/roles/{role_id}
identity:delete_role DELETE /v3/roles/{role_id}
identity:get_domain_role GET /v3/roles/{role_id} where role.domain_id is not null
identity:list_domain_roles GET /v3/roles?domain_id where role.domain_id is not null
identity:create_domain_role POST /v3/roles where role.domain_id is not null
identity:update_domain_role PATCH /v3/roles/{role_id} where role.domain_id is not null
identity:delete_domain_role DELETE /v3/roles/{role_id} where role.domain_id is not null
identity:get_implied_role GET /v3/roles/{prior_role_id}/implies/{implied_role_id}
identity:list_implied_roles GET /v3/roles/{prior_role_id}/implies
identity:create_implied_role PUT /v3/roles/{prior_role_id}/implies/{implied_role_id}

View File

@ -75,6 +75,11 @@
"identity:create_role": "rule:admin_required",
"identity:update_role": "rule:admin_required",
"identity:delete_role": "rule:admin_required",
"identity:get_domain_role": "rule:admin_required",
"identity:list_domain_roles": "rule:admin_required",
"identity:create_domain_role": "rule:admin_required",
"identity:update_domain_role": "rule:admin_required",
"identity:delete_domain_role": "rule:admin_required",
"identity:get_implied_role": "rule:admin_required ",
"identity:list_implied_roles": "rule:admin_required",

View File

@ -81,6 +81,19 @@
"identity:update_role": "rule:cloud_admin",
"identity:delete_role": "rule:cloud_admin",
"identity:get_domain_role": "rule:cloud_admin or rule:get_domain_roles",
"identity:list_domain_roles": "rule:cloud_admin or rule:list_domain_roles",
"identity:create_domain_role": "rule:cloud_admin or rule:domain_admin_matches_domain_role",
"identity:update_domain_role": "rule:cloud_admin or rule:domain_admin_matches_target_domain_role",
"identity:delete_domain_role": "rule:cloud_admin or rule:domain_admin_matches_target_domain_role",
"domain_admin_matches_domain_role": "rule:admin_required and domain_id:%(role.domain_id)s",
"get_domain_roles": "rule:domain_admin_matches_target_domain_role or rule:project_admin_matches_target_domain_role",
"domain_admin_matches_target_domain_role": "rule:admin_required and domain_id:%(target.role.domain_id)s",
"project_admin_matches_target_domain_role": "rule:admin_required and project_domain_id:%(target.role.domain_id)s",
"list_domain_roles": "rule:domain_admin_matches_filter_on_list_domain_roles or rule:project_admin_matches_filter_on_list_domain_roles",
"domain_admin_matches_filter_on_list_domain_roles": "rule:admin_required and domain_id:%(domain_id)s",
"project_admin_matches_filter_on_list_domain_roles": "rule:admin_required and project_domain_id:%(domain_id)s",
"identity:get_implied_role": "rule:cloud_admin",
"identity:list_implied_roles": "rule:cloud_admin",
"identity:create_implied_role": "rule:cloud_admin",

View File

@ -277,7 +277,19 @@ class ProjectAssignmentV3(controller.V3Controller):
@dependency.requires('role_api')
class RoleV3(controller.V3Controller):
"""The V3 Role CRUD APIs."""
"""The V3 Role CRUD APIs.
To ease complexity (and hence risk) in writing the policy rules for the
role APIs, we create separate policy actions for roles that are domain
specific, as opposed to those that are global. In order to achieve this
each of the role API methods has a wrapper method that checks to see if the
role is global or domain specific.
NOTE (henry-nash): If this separate global vs scoped policy action pattern
becomes repeated for other entities, we should consider encapsulating this
into a specialized router class.
"""
collection_name = 'roles'
member_name = 'role'
@ -286,9 +298,104 @@ class RoleV3(controller.V3Controller):
super(RoleV3, self).__init__()
self.get_member_from_driver = self.role_api.get_role
def _is_domain_role(self, role):
return role.get('domain_id') is not None
def _is_domain_role_target(self, role_id):
try:
role = self.role_api.get_role(role_id)
except exception.RoleNotFound:
# We hide this error since we have not yet carried out a policy
# check - and it maybe that the caller isn't authorized to make
# this call. If so, we want that error to be raised instead.
return False
return self._is_domain_role(role)
def create_role_wrapper(self, context, role):
if self._is_domain_role(role):
return self.create_domain_role(context, role=role)
else:
return self.create_role(context, role=role)
@controller.protected()
@validation.validated(schema.role_create, 'role')
def create_role(self, context, role):
return self._create_role(context, role)
@controller.protected()
@validation.validated(schema.role_create, 'role')
def create_domain_role(self, context, role):
return self._create_role(context, role)
def list_roles_wrapper(self, context):
# If there is no domain_id filter defined, then we only want to return
# global roles, so we set the domain_id filter to None.
params = context['query_string']
if 'domain_id' not in params:
context['query_string']['domain_id'] = None
if context['query_string']['domain_id'] is not None:
return self.list_domain_roles(context)
else:
return self.list_roles(context)
@controller.filterprotected('name', 'domain_id')
def list_roles(self, context, filters):
return self._list_roles(context, filters)
@controller.filterprotected('name', 'domain_id')
def list_domain_roles(self, context, filters):
return self._list_roles(context, filters)
def get_role_wrapper(self, context, role_id):
if self._is_domain_role_target(role_id):
return self.get_domain_role(context, role_id=role_id)
else:
return self.get_role(context, role_id=role_id)
@controller.protected()
def get_role(self, context, role_id):
return self._get_role(context, role_id)
@controller.protected()
def get_domain_role(self, context, role_id):
return self._get_role(context, role_id)
def update_role_wrapper(self, context, role_id, role):
# Since we don't allow you change whether a role is global or domain
# specific, we can ignore the new update attributes and just look at
# the existing role.
if self._is_domain_role_target(role_id):
return self.update_domain_role(
context, role_id=role_id, role=role)
else:
return self.update_role(context, role_id=role_id, role=role)
@controller.protected()
@validation.validated(schema.role_update, 'role')
def update_role(self, context, role_id, role):
return self._update_role(context, role_id, role)
@controller.protected()
@validation.validated(schema.role_update, 'role')
def update_domain_role(self, context, role_id, role):
return self._update_role(context, role_id, role)
def delete_role_wrapper(self, context, role_id):
if self._is_domain_role_target(role_id):
return self.delete_domain_role(context, role_id=role_id)
else:
return self.delete_role(context, role_id=role_id)
@controller.protected()
def delete_role(self, context, role_id):
return self._delete_role(context, role_id)
@controller.protected()
def delete_domain_role(self, context, role_id):
return self._delete_role(context, role_id)
def _create_role(self, context, role):
if role['name'] == CONF.member_role_name:
# Use the configured member role ID when creating the configured
# member role name. This avoids the potential of creating a
@ -303,36 +410,23 @@ class RoleV3(controller.V3Controller):
ref = self.role_api.create_role(ref['id'], ref, initiator)
return RoleV3.wrap_member(context, ref)
@controller.filterprotected('name', 'domain_id')
def list_roles(self, context, filters):
def _list_roles(self, context, filters):
hints = RoleV3.build_driver_hints(context, filters)
refs = self.role_api.list_roles(
hints=hints)
return RoleV3.wrap_collection(context, refs, hints=hints)
def list_roles_wrapper(self, context):
# If there is no domain_id filter defined, then we only want to return
# global roles, so we set the domain_id filter to None.
params = context['query_string']
if 'domain_id' not in params:
context['query_string']['domain_id'] = None
return self.list_roles(context)
@controller.protected()
def get_role(self, context, role_id):
def _get_role(self, context, role_id):
ref = self.role_api.get_role(role_id)
return RoleV3.wrap_member(context, ref)
@controller.protected()
@validation.validated(schema.role_update, 'role')
def update_role(self, context, role_id, role):
def _update_role(self, context, role_id, role):
self._require_matching_id(role_id, role)
initiator = notifications._get_request_audit_info(context)
ref = self.role_api.update_role(role_id, role, initiator)
return RoleV3.wrap_member(context, ref)
@controller.protected()
def delete_role(self, context, role_id):
def _delete_role(self, context, role_id):
initiator = notifications._get_request_audit_info(context)
self.role_api.delete_role(role_id, initiator)

View File

@ -72,7 +72,7 @@ class Routers(wsgi.RoutersBase):
routers.append(
router.Router(controllers.RoleV3(), 'roles', 'role',
resource_descriptions=self.v3_resources,
overrides={'list': 'list_roles_wrapper'}))
method_template='%s_wrapper'))
implied_roles_controller = controllers.ImpliedRolesV3()
self._add_resource(

View File

@ -20,21 +20,15 @@ class Router(wsgi.ComposableRouter):
def __init__(self, controller, collection_key, key,
resource_descriptions=None,
is_entity_implemented=True,
overrides=None):
method_template=None):
self.controller = controller
self.key = key
self.collection_key = collection_key
self._resource_descriptions = resource_descriptions
self._is_entity_implemented = is_entity_implemented
self.overrides = overrides
self.method_template = method_template or '%s'
def add_routes(self, mapper):
def _assign_action(action, key, overrides):
if overrides is not None and action in overrides:
return overrides[action]
else:
return '%(action)s_%(key)s' % {'action': action, 'key': key}
collection_path = '/%(collection_key)s' % {
'collection_key': self.collection_key}
entity_path = '/%(collection_key)s/{%(key)s_id}' % {
@ -44,27 +38,27 @@ class Router(wsgi.ComposableRouter):
mapper.connect(
collection_path,
controller=self.controller,
action=_assign_action('create', self.key, self.overrides),
action=self.method_template % 'create_%s' % self.key,
conditions=dict(method=['POST']))
mapper.connect(
collection_path,
controller=self.controller,
action=_assign_action('list', self.collection_key, self.overrides),
action=self.method_template % 'list_%s' % self.collection_key,
conditions=dict(method=['GET']))
mapper.connect(
entity_path,
controller=self.controller,
action=_assign_action('get', self.key, self.overrides),
action=self.method_template % 'get_%s' % self.key,
conditions=dict(method=['GET']))
mapper.connect(
entity_path,
controller=self.controller,
action=_assign_action('update', self.key, self.overrides),
action=self.method_template % 'update_%s' % self.key,
conditions=dict(method=['PATCH']))
mapper.connect(
entity_path,
controller=self.controller,
action=_assign_action('delete', self.key, self.overrides),
action=self.method_template % 'delete_%s' % self.key,
conditions=dict(method=['DELETE']))
# Add the collection resource and entity resource to the resource

View File

@ -620,13 +620,21 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.domain_admin_user = unit.create_user(
self.identity_api,
domain_id=self.domainA['id'])
self.domainB_admin_user = unit.create_user(
self.identity_api,
domain_id=self.domainB['id'])
self.project_admin_user = unit.create_user(
self.identity_api,
domain_id=self.domainA['id'])
self.project_adminB_user = unit.create_user(
self.identity_api,
domain_id=self.domainB['id'])
# The admin role and another plain role
# The admin role, a domain specific role and another plain role
self.admin_role = unit.new_role_ref(name='admin')
self.role_api.create_role(self.admin_role['id'], self.admin_role)
self.roleA = unit.new_role_ref(domain_id=self.domainA['id'])
self.role_api.create_role(self.roleA['id'], self.roleA)
self.role = unit.new_role_ref()
self.role_api.create_role(self.role['id'], self.role)
@ -642,13 +650,21 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.assignment_api.create_grant(self.role['id'],
user_id=self.just_a_user['id'],
domain_id=self.domainA['id'])
self.assignment_api.create_grant(self.admin_role['id'],
user_id=self.domainB_admin_user['id'],
domain_id=self.domainB['id'])
# Create and assign roles to the project
self.project = unit.new_project_ref(domain_id=self.domainA['id'])
self.resource_api.create_project(self.project['id'], self.project)
self.projectB = unit.new_project_ref(domain_id=self.domainB['id'])
self.resource_api.create_project(self.projectB['id'], self.projectB)
self.assignment_api.create_grant(self.admin_role['id'],
user_id=self.project_admin_user['id'],
project_id=self.project['id'])
self.assignment_api.create_grant(
self.admin_role['id'], user_id=self.project_adminB_user['id'],
project_id=self.projectB['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.just_a_user['id'],
project_id=self.project['id'])
@ -768,6 +784,33 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.post('/roles', auth=self.auth, body={'role': role_ref},
expected_status=status_created)
def _domain_role_management_cases(self, domain_id, read_status_OK=False,
expected=None):
# Set the different status values for different types of call depending
# on whether we expect the calls to fail or not.
status_OK, status_created, status_no_data = self._stati(expected)
entity_url = '/roles/%s' % self.roleA['id']
list_url = '/roles?domain_id=%s' % domain_id
if read_status_OK:
self.get(entity_url, auth=self.auth)
self.get(list_url, auth=self.auth)
else:
self.get(entity_url, auth=self.auth,
expected_status=status_OK)
self.get(list_url, auth=self.auth,
expected_status=status_OK)
role = {'name': 'Updated'}
self.patch(entity_url, auth=self.auth, body={'role': role},
expected_status=status_OK)
self.delete(entity_url, auth=self.auth,
expected_status=status_no_data)
role_ref = unit.new_role_ref(domain_id=domain_id)
self.post('/roles', auth=self.auth, body={'role': role_ref},
expected_status=status_created)
def test_user_management(self):
# First, authenticate with a user that does not have the domain
# admin role - shouldn't be able to do much.
@ -1555,3 +1598,75 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
project_id=self.admin_project['id'])
self._role_management_cases()
def test_domain_role_management_no_admin_no_rights(self):
# A non-admin domain user shouldn't be able to manipulate domain roles
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],
password=self.just_a_user['password'],
domain_id=self.domainA['id'])
self._domain_role_management_cases(
self.domainA['id'], expected=exception.ForbiddenAction.code)
# ...and nor should non-admin project user
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],
password=self.just_a_user['password'],
project_id=self.project['id'])
self._domain_role_management_cases(
self.domainA['id'], expected=exception.ForbiddenAction.code)
def test_domain_role_management_with_cloud_admin(self):
# A cloud admin user should have rights to manipulate domain roles
self.auth = self.build_authentication_request(
user_id=self.cloud_admin_user['id'],
password=self.cloud_admin_user['password'],
project_id=self.admin_project['id'])
self._domain_role_management_cases(self.domainA['id'])
def test_domain_role_management_with_domain_admin(self):
# A domain admin user should only be able to manipulate the domain
# specific roles in their own domain
self.auth = self.build_authentication_request(
user_id=self.domainB_admin_user['id'],
password=self.domainB_admin_user['password'],
domain_id=self.domainB['id'])
# Try to access the domain specific roles in another domain
self._domain_role_management_cases(
self.domainA['id'], expected=exception.ForbiddenAction.code)
# ...but they should be able to work with those in their own domain
self.auth = self.build_authentication_request(
user_id=self.domain_admin_user['id'],
password=self.domain_admin_user['password'],
domain_id=self.domainA['id'])
self._domain_role_management_cases(self.domainA['id'])
def test_domain_role_management_with_project_admin(self):
# A project admin user should have not access to domain specific roles
# in another domain. They should be able to get and list domain
# specific roles from their own domain, but not be able to create,
# update or delete them,
self.auth = self.build_authentication_request(
user_id=self.project_adminB_user['id'],
password=self.project_adminB_user['password'],
project_id=self.projectB['id'])
# Try access the domain specific roless in another domain
self._domain_role_management_cases(
self.domainA['id'], expected=exception.ForbiddenAction.code)
# ...but they should be ablet to work with those in their own domain
self.auth = self.build_authentication_request(
user_id=self.project_admin_user['id'],
password=self.project_admin_user['password'],
project_id=self.project['id'])
self._domain_role_management_cases(
self.domainA['id'], read_status_OK=True,
expected=exception.ForbiddenAction.code)