Implement API protection on target entities

API policy protection is currently limited to using the parameters
passed into the call. However, there are many cases where you want
to also check attributes of the entities an API is operating upon.  The
classic example is ensuring a domain administrator cannot get, update or
delete users, groups or projects outside of their domain.

This patch enables lines in the policy file to also refer to any field
in the target object of the API call. In addition, it includes a separate
sample policy file that shows how to use domains and the new protection
ability to provide domain segregation and administration delegation.
This sample file is also tested to ensure that such protection works
correctly.

DocImpact

Implements bp policy-on-api-target

Change-Id: Ie1a4e14a86d27e8b60e6c17e33dd6b9fa889660c
This commit is contained in:
Henry Nash 2013-07-23 16:07:12 +01:00
parent 5dc50bbf0f
commit c7a5c6cf27
12 changed files with 730 additions and 90 deletions

View File

@ -814,6 +814,71 @@ to be passed as arguments each time::
$ export OS_PASSWORD=my_password
$ export OS_TENANT_NAME=my_tenant
Keystone API protection with Role Based Access Control (RBAC)
-------------------------------------------------------------
Like most OpenStack projects, Keystone supports the protection of its APIs
by defining policy rules based on an RBAC approach. These are stored in a
JSON policy file, the name and location of which is set in the main Keystone
configuration file.
Each keystone v3 API has a line in the policy file which dictates what level
of protection is applied to it, where each line is of the form:
<api name>: <rule statement> or <match statement>
where
<rule statement> can be contain <rule statement> or <match statement>
<match statement> is a set of identifiers that must match between the token
provided by the caller of the API and the parameters or target entities of
the API call in question. For example:
"identity:create_user": [["role:admin", "domain_id:%(user.domain_id)s"]]
indicates that to create a user you must have the admin role in your token and
in addition the domain_id in your token (which implies this must be a domain
scoped token) must match the domain_id in the user object you are trying to
create. In other words, you must have the admin role on the domain in which
you are creating the user, and the token you are using must be scoped to that
domain.
Each component of a match statement is of the form:
<attribute from token>:<constant> or <attribute related to API call>
The following attributes are available
* Attributes from token: user_id, the domain_id or project_id depending on
the scope, and the list of roles you have within that scope
* Attributes related to API call: Any parameters that are passed into the
API call are available, along with any filters specified in the query
string. Attributes of objects passed can be refererenced using an
object.attribute syntax (e.g. user.domain_id). The target objects of an
API are also available using a target.object.attribute syntax. For instance:
"identity:delete_user": [["role:admin", "domain_id:%(target.user.domain_id)s"]]
would ensure that the user object that is being deleted is in the same
domain as the token provided.
The default policy.json file supplied provides a somewhat basic example of
API protection, and does not assume any particular use of domains. For
multi-domain configuration installations where, for example, a cloud
provider wishes to allow adminsistration of the contents of a domain to
be delegated, it is recommended that the supplied policy.v3cloudsample.json
is used as a basis for creating a suitable production policy file. This
example policy file also shows the use of an admin_domain to allow a cloud
provider to enable cloud adminstrators to have wider access across the APIs.
A clean installation would need to perhaps start with the standard policy
file, to allow creation of the admin_domain with the first users within
it. The domain_id of the admin domain would then be obtained and could be
pasted into a modifed version of policy.v3cloudsample.json which could then
be enabled as the main policy file.
Example usage
-------------

View File

@ -0,0 +1,101 @@
{
"admin_required": [["role:admin"]],
"cloud_admin": [["rule:admin_required", "domain_id:admin_domain_id"]],
"service_role": [["role:service"]],
"service_or_admin": [["rule:admin_required"], ["rule:service_role"]],
"owner" : [["user_id:%(user_id)s"]],
"admin_or_owner": [["rule:admin_required"], ["rule:owner"]],
"admin_or_cloud_admin": [["rule:admin_required"], ["rule:cloud_admin"]],
"default": [["rule:admin_required"]],
"identity:get_service": [["rule:admin_or_cloud_admin"]],
"identity:list_services": [["rule:admin_or_cloud_admin"]],
"identity:create_service": [["rule:cloud_admin"]],
"identity:update_service": [["rule:cloud_admin"]],
"identity:delete_service": [["rule:cloud_admin"]],
"identity:get_endpoint": [["rule:admin_or_cloud_admin"]],
"identity:list_endpoints": [["rule:admin_or_cloud_admin"]],
"identity:create_endpoint": [["rule:cloud_admin"]],
"identity:update_endpoint": [["rule:cloud_admin"]],
"identity:delete_endpoint": [["rule:cloud_admin"]],
"identity:get_domain": [["rule:cloud_admin"]],
"identity:list_domains": [["rule:cloud_admin"]],
"identity:create_domain": [["rule:cloud_admin"]],
"identity:update_domain": [["rule:cloud_admin"]],
"identity:delete_domain": [["rule:cloud_admin"]],
"identity:get_project": [["rule:admin_required", "domain_id:%(target.project.domain_id)s"]],
"identity:list_projects": [["rule:admin_required", "domain_id:%(domain_id)s"]],
"identity:list_user_projects": [["rule:owner"], ["rule:admin_required", "domain_id:%(domain_id)s"]],
"identity:create_project": [["rule:admin_required", "domain_id:%(project.domain_id)s"]],
"identity:update_project": [["rule:admin_required", "domain_id:%(target.project.domain_id)s"]],
"identity:delete_project": [["rule:admin_required", "domain_id:%(target.project.domain_id)s"]],
"identity:get_user": [["rule:admin_required", "domain_id:%(target.user.domain_id)s"]],
"identity:list_users": [["rule:admin_required", "domain_id:%(domain_id)s"]],
"identity:create_user": [["rule:admin_required", "domain_id:%(user.domain_id)s"]],
"identity:update_user": [["rule:admin_required", "domain_id:%(target.user.domain_id)s"]],
"identity:delete_user": [["rule:admin_required", "domain_id:%(target.user.domain_id)s"]],
"identity:get_group": [["rule:admin_required", "domain_id:%(target.group.domain_id)s"]],
"identity:list_groups": [["rule:admin_required", "domain_id:%(domain_id)s"]],
"identity:list_groups_for_user": [["rule:owner"], ["rule:admin_required", "domain_id:%(domain_id)s"]],
"identity:create_group": [["rule:admin_required", "domain_id:%(group.domain_id)s"]],
"identity:update_group": [["rule:admin_required", "domain_id:%(target.group.domain_id)s"]],
"identity:delete_group": [["rule:admin_required", "domain_id:%(target.group.domain_id)s"]],
"identity:list_users_in_group": [["rule:admin_required", "domain_id:%(target.group.domain_id)s"]],
"identity:remove_user_from_group": [["rule:admin_required", "domain_id:%(target.group.domain_id)s"]],
"identity:check_user_in_group": [["rule:admin_required", "domain_id:%(target.group.domain_id)s"]],
"identity:add_user_to_group": [["rule:admin_required", "domain_id:%(target.group.domain_id)s"]],
"identity:get_credential": [["rule:admin_required"]],
"identity:list_credentials": [["rule:admin_required"]],
"identity:create_credential": [["rule:admin_required"]],
"identity:update_credential": [["rule:admin_required"]],
"identity:delete_credential": [["rule:admin_required"]],
"identity:get_role": [["rule:admin_or_cloud_admin"]],
"identity:list_roles": [["rule:admin_or_cloud_admin"]],
"identity:create_role": [["rule:cloud_admin"]],
"identity:update_role": [["rule:cloud_admin"]],
"identity:delete_role": [["rule:cloud_admin"]],
"admin_on_domain_target" : [["rule:admin_required", "domain_id:%(target.domain.id)s"]],
"admin_on_project_target" : [["rule:admin_required", "project_id:%(target.project.id)s"]],
"identity:check_grant": [["rule:admin_on_project_target"],
["rule:admin_on_domain_target"]],
"identity:list_grants": [["rule:admin_on_project_target"],
["rule:admin_on_domain_target"]],
"identity:create_grant": [["rule:admin_on_project_target"],
["rule:admin_on_domain_target"]],
"identity:revoke_grant": [["rule:admin_on_project_target"],
["rule:admin_on_domain_target"]],
"admin_on_domain_filter" : [["rule:admin_required", "domain_id:%(scope.domain.id)s"]],
"admin_on_project_filter" : [["rule:admin_required", "project_id:%(scope.project.id)s"]],
"identity:list_role_assignments": [["admin_on_domain_filter"],
["admin_on_project_filter"]],
"identity:get_policy": [["rule:cloud_admin"]],
"identity:list_policies": [["rule:cloud_admin"]],
"identity:create_policy": [["rule:cloud_admin"]],
"identity:update_policy": [["rule:cloud_admin"]],
"identity:delete_policy": [["rule:cloud_admin"]],
"identity:check_token": [["rule:admin_required"]],
"identity:validate_token": [["rule:service_or_admin"]],
"identity:validate_token_head": [["rule:service_or_admin"]],
"identity:revocation_list": [["rule:service_or_admin"]],
"identity:revoke_token": [["rule:admin_or_owner"]],
"identity:create_trust": [["user_id:%(trust.trustor_user_id)s"]],
"identity:get_trust": [["rule:admin_or_owner"]],
"identity:list_trusts": [["@"]],
"identity:list_roles_for_trust": [["@"]],
"identity:check_role_for_trust": [["@"]],
"identity:get_role_for_trust": [["@"]],
"identity:delete_trust": [["@"]]
}

View File

@ -354,23 +354,23 @@ class Auth(controller.V3Controller):
msg = _('User not found')
raise exception.Unauthorized(msg)
@controller.protected
@controller.protected()
def check_token(self, context):
token_id = context.get('subject_token_id')
self.token_provider_api.check_v3_token(token_id)
@controller.protected
@controller.protected()
def revoke_token(self, context):
token_id = context.get('subject_token_id')
return self.token_provider_api.revoke_token(token_id)
@controller.protected
@controller.protected()
def validate_token(self, context):
token_id = context.get('subject_token_id')
token_data = self.token_provider_api.validate_v3_token(token_id)
return render_token_data_response(token_id, token_data)
@controller.protected
@controller.protected()
def revocation_list(self, context, auth=None):
return self.token_controllers_ref.revocation_list(context, auth)

View File

@ -128,7 +128,11 @@ class ServiceV3(controller.V3Controller):
collection_name = 'services'
member_name = 'service'
@controller.protected
def __init__(self):
super(ServiceV3, self).__init__()
self.get_member_from_driver = self.catalog_api.get_service
@controller.protected()
def create_service(self, context, service):
ref = self._assign_unique_id(self._normalize_dict(service))
self._require_attribute(ref, 'type')
@ -141,19 +145,19 @@ class ServiceV3(controller.V3Controller):
refs = self.catalog_api.list_services()
return ServiceV3.wrap_collection(context, refs, filters)
@controller.protected
@controller.protected()
def get_service(self, context, service_id):
ref = self.catalog_api.get_service(service_id)
return ServiceV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def update_service(self, context, service_id, service):
self._require_matching_id(service_id, service)
ref = self.catalog_api.update_service(service_id, service)
return ServiceV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def delete_service(self, context, service_id):
return self.catalog_api.delete_service(service_id)
@ -163,6 +167,10 @@ class EndpointV3(controller.V3Controller):
collection_name = 'endpoints'
member_name = 'endpoint'
def __init__(self):
super(EndpointV3, self).__init__()
self.get_member_from_driver = self.catalog_api.get_endpoint
@classmethod
def filter_endpoint(cls, ref):
if 'legacy_endpoint_id' in ref:
@ -174,7 +182,7 @@ class EndpointV3(controller.V3Controller):
ref = cls.filter_endpoint(ref)
return super(EndpointV3, cls).wrap_member(context, ref)
@controller.protected
@controller.protected()
def create_endpoint(self, context, endpoint):
ref = self._assign_unique_id(self._normalize_dict(endpoint))
self._require_attribute(ref, 'service_id')
@ -189,12 +197,12 @@ class EndpointV3(controller.V3Controller):
refs = self.catalog_api.list_endpoints()
return EndpointV3.wrap_collection(context, refs, filters)
@controller.protected
@controller.protected()
def get_endpoint(self, context, endpoint_id):
ref = self.catalog_api.get_endpoint(endpoint_id)
return EndpointV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def update_endpoint(self, context, endpoint_id, endpoint):
self._require_matching_id(endpoint_id, endpoint)
@ -204,6 +212,6 @@ class EndpointV3(controller.V3Controller):
ref = self.catalog_api.update_endpoint(endpoint_id, endpoint)
return EndpointV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def delete_endpoint(self, context, endpoint_id):
return self.catalog_api.delete_endpoint(endpoint_id)

View File

@ -86,23 +86,52 @@ def flatten(d, parent_key=''):
return dict(items)
def protected(f):
"""Wraps API calls with role based access controls (RBAC)."""
@functools.wraps(f)
def wrapper(self, context, *args, **kwargs):
if 'is_admin' in context and context['is_admin']:
LOG.warning(_('RBAC: Bypassing authorization'))
else:
action = 'identity:%s' % f.__name__
creds = _build_policy_check_credentials(self, action,
context, kwargs)
# Simply use the passed kwargs as the target dict, which
# would typically include the prime key of a get/update/delete
# call.
self.policy_api.enforce(creds, action, flatten(kwargs))
LOG.debug(_('RBAC: Authorization granted'))
def protected(callback=None):
"""Wraps API calls with role based access controls (RBAC).
return f(self, context, *args, **kwargs)
This handles both the protection of the API parameters as well as any
target entities for single-entity API calls.
More complex API calls (for example that deal with several different
entities) should pass in a callback function, that will be subsequently
called to check protection for these multiple entities. This callback
function should gather the appropriate entities needed and then call
check_proetction() in the V3Controller class.
"""
def wrapper(f):
@functools.wraps(f)
def inner(self, context, *args, **kwargs):
if 'is_admin' in context and context['is_admin']:
LOG.warning(_('RBAC: Bypassing authorization'))
elif callback is not None:
prep_info = {'f_name': f.__name__,
'input_attr': kwargs}
callback(self, context, prep_info, *args, **kwargs)
else:
action = 'identity:%s' % f.__name__
creds = _build_policy_check_credentials(self, action,
context, kwargs)
# Check to see if we need to include the target entity in our
# policy checks. We deduce this by seeing if the class has
# specified a get_member() method and that kwargs contains the
# appropriate entity id.
policy_dict = {}
if (hasattr(self, 'get_member_from_driver') and
self.get_member_from_driver is not None):
key = '%s_id' % self.member_name
if key in kwargs:
ref = self.get_member_from_driver(kwargs[key])
policy_dict = {'target':
{self.member_name: ref}}
# Add in the kwargs, which means that any entity provided as a
# parameter for calls like create and update will be included.
policy_dict.update(kwargs)
self.policy_api.enforce(creds, action, flatten(policy_dict))
LOG.debug(_('RBAC: Authorization granted'))
return f(self, context, *args, **kwargs)
return inner
return wrapper
@ -206,6 +235,7 @@ class V3Controller(V2Controller):
collection_name = 'entities'
member_name = 'entity'
get_member_from_driver = None
def _delete_tokens_for_group(self, group_id):
user_refs = self.identity_api.list_users_in_group(group_id)
@ -336,3 +366,31 @@ class V3Controller(V2Controller):
def _filter_domain_id(self, ref):
"""Override v2 filter to let domain_id out for v3 calls."""
return ref
def check_protection(self, context, prep_info, target_attr=None):
"""Provide call protection for complex target attributes.
As well as including the standard parameters from the original API
call (which is passed in prep_info), this call will add in any
additional entities or attributes (passed in target_attr), so that
they can be referenced by policy rules.
"""
if 'is_admin' in context and context['is_admin']:
LOG.warning(_('RBAC: Bypassing authorization'))
else:
action = 'identity:%s' % prep_info['f_name']
# TODO(henry-nash) need to log the target attributes as well
creds = _build_policy_check_credentials(self, action,
context,
prep_info['input_attr'])
# Build the dict the policy engine will check against from both the
# parameters passed into the call we are protecting (which was
# stored in the prep_info by protected()), plus the target
# attributes provided.
policy_dict = {}
if target_attr:
policy_dict = {'target': target_attr}
policy_dict.update(prep_info['input_attr'])
self.policy_api.enforce(creds, action, flatten(policy_dict))
LOG.debug(_('RBAC: Authorization granted'))

View File

@ -24,7 +24,7 @@ from keystone.identity import controllers as identity_controllers
@dependency.requires('catalog_api', 'identity_api', 'endpoint_filter_api')
class EndpointFilterV3Controller(controller.V3Controller):
@controller.protected
@controller.protected()
def add_endpoint_to_project(self, context, project_id, endpoint_id):
"""Establishes an association between an endpoint and a project."""
# NOTE(gyee): we just need to make sure endpoint and project exist
@ -38,7 +38,7 @@ class EndpointFilterV3Controller(controller.V3Controller):
self.endpoint_filter_api.add_endpoint_to_project(endpoint_id,
project_id)
@controller.protected
@controller.protected()
def check_endpoint_in_project(self, context, project_id, endpoint_id):
"""Verifies endpoint is currently associated with given project."""
self.catalog_api.get_endpoint(endpoint_id)
@ -48,7 +48,7 @@ class EndpointFilterV3Controller(controller.V3Controller):
self.endpoint_filter_api.check_endpoint_in_project(endpoint_id,
project_id)
@controller.protected
@controller.protected()
def list_endpoints_for_project(self, context, project_id):
"""Lists all endpoints currently associated with a given project."""
self.identity_api.get_project(project_id)
@ -59,13 +59,13 @@ class EndpointFilterV3Controller(controller.V3Controller):
return catalog_controllers.EndpointV3.wrap_collection(context,
endpoints)
@controller.protected
@controller.protected()
def remove_endpoint_from_project(self, context, project_id, endpoint_id):
"""Remove the endpoint from the association with given project."""
self.endpoint_filter_api.remove_endpoint_from_project(endpoint_id,
project_id)
@controller.protected
@controller.protected()
def list_projects_for_endpoint(self, context, endpoint_id):
"""Return a list of projects associated with the endpoint."""
refs = self.endpoint_filter_api.list_project_endpoints(endpoint_id)

View File

@ -25,6 +25,10 @@ class CredentialV3(controller.V3Controller):
collection_name = 'credentials'
member_name = 'credential'
def __init__(self):
super(CredentialV3, self).__init__()
self.get_member_from_driver = self.credential_api.get_credential
def _assign_unique_id(self, ref):
# Generates and assigns a unique identifer to
# a credential reference.
@ -46,29 +50,29 @@ class CredentialV3(controller.V3Controller):
else:
return super(CredentialV3, self)._assign_unique_id(ref)
@controller.protected
@controller.protected()
def create_credential(self, context, credential):
ref = self._assign_unique_id(self._normalize_dict(credential))
ref = self.credential_api.create_credential(ref['id'], ref)
return CredentialV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def list_credentials(self, context):
refs = self.credential_api.list_credentials()
return CredentialV3.wrap_collection(context, refs)
@controller.protected
@controller.protected()
def get_credential(self, context, credential_id):
ref = self.credential_api.get_credential(credential_id)
return CredentialV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def update_credential(self, context, credential_id, credential):
self._require_matching_id(credential_id, credential)
ref = self.credential_api.update_credential(credential_id, credential)
return CredentialV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def delete_credential(self, context, credential_id):
return self.credential_api.delete_credential(credential_id)

View File

@ -408,7 +408,11 @@ class DomainV3(controller.V3Controller):
collection_name = 'domains'
member_name = 'domain'
@controller.protected
def __init__(self):
super(DomainV3, self).__init__()
self.get_member_from_driver = self.assignment_api.get_domain
@controller.protected()
def create_domain(self, context, domain):
self._require_attribute(domain, 'name')
@ -421,12 +425,12 @@ class DomainV3(controller.V3Controller):
refs = self.identity_api.list_domains()
return DomainV3.wrap_collection(context, refs, filters)
@controller.protected
@controller.protected()
def get_domain(self, context, domain_id):
ref = self.identity_api.get_domain(domain_id)
return DomainV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def update_domain(self, context, domain_id, domain):
self._require_matching_id(domain_id, domain)
@ -515,7 +519,7 @@ class DomainV3(controller.V3Controller):
for user in user_ids:
user_cntl._delete_user(context, user)
@controller.protected
@controller.protected()
def delete_domain(self, context, domain_id):
# explicitly forbid deleting the default domain (this should be a
# carefully orchestrated manual process involving configuration
@ -551,7 +555,11 @@ class ProjectV3(controller.V3Controller):
collection_name = 'projects'
member_name = 'project'
@controller.protected
def __init__(self):
super(ProjectV3, self).__init__()
self.get_member_from_driver = self.assignment_api.get_project
@controller.protected()
def create_project(self, context, project):
self._require_attribute(project, 'name')
@ -570,12 +578,12 @@ class ProjectV3(controller.V3Controller):
refs = self.identity_api.list_user_projects(user_id)
return ProjectV3.wrap_collection(context, refs, filters)
@controller.protected
@controller.protected()
def get_project(self, context, project_id):
ref = self.identity_api.get_project(project_id)
return ProjectV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def update_project(self, context, project_id, project):
self._require_matching_id(project_id, project)
@ -600,7 +608,7 @@ class ProjectV3(controller.V3Controller):
# to this project
return self.identity_api.delete_project(project_id)
@controller.protected
@controller.protected()
def delete_project(self, context, project_id):
return self._delete_project(context, project_id)
@ -609,7 +617,18 @@ class UserV3(controller.V3Controller):
collection_name = 'users'
member_name = 'user'
@controller.protected
def __init__(self):
super(UserV3, self).__init__()
self.get_member_from_driver = self.identity_api.get_user
def _check_user_and_group_protection(self, context, prep_info,
user_id, group_id):
ref = {}
ref['user'] = self.identity_api.get_user(user_id)
ref['group'] = self.identity_api.get_group(group_id)
self.check_protection(context, prep_info, ref)
@controller.protected()
def create_user(self, context, user):
self._require_attribute(user, 'name')
@ -631,14 +650,14 @@ class UserV3(controller.V3Controller):
domain_scope=self._get_domain_id_for_request(context))
return UserV3.wrap_collection(context, refs, filters)
@controller.protected
@controller.protected()
def get_user(self, context, user_id):
ref = self.identity_api.get_user(
user_id,
domain_scope=self._get_domain_id_for_request(context))
return UserV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def update_user(self, context, user_id, user):
self._require_matching_id(user_id, user)
ref = self.identity_api.update_user(
@ -651,7 +670,7 @@ class UserV3(controller.V3Controller):
return UserV3.wrap_member(context, ref)
@controller.protected
@controller.protected(callback=_check_user_and_group_protection)
def add_user_to_group(self, context, user_id, group_id):
self.identity_api.add_user_to_group(
user_id, group_id,
@ -660,13 +679,13 @@ class UserV3(controller.V3Controller):
# immediate effect
self._delete_tokens_for_user(user_id)
@controller.protected
@controller.protected(callback=_check_user_and_group_protection)
def check_user_in_group(self, context, user_id, group_id):
return self.identity_api.check_user_in_group(
user_id, group_id,
domain_scope=self._get_domain_id_for_request(context))
@controller.protected
@controller.protected(callback=_check_user_and_group_protection)
def remove_user_from_group(self, context, user_id, group_id):
self.identity_api.remove_user_from_group(
user_id, group_id,
@ -688,7 +707,7 @@ class UserV3(controller.V3Controller):
return self.identity_api.delete_user(
user_id, domain_scope=domain_id)
@controller.protected
@controller.protected()
def delete_user(self, context, user_id):
return self._delete_user(context, user_id)
@ -697,7 +716,11 @@ class GroupV3(controller.V3Controller):
collection_name = 'groups'
member_name = 'group'
@controller.protected
def __init__(self):
super(GroupV3, self).__init__()
self.get_member_from_driver = self.identity_api.get_group
@controller.protected()
def create_group(self, context, group):
self._require_attribute(group, 'name')
@ -719,14 +742,14 @@ class GroupV3(controller.V3Controller):
domain_scope=self._get_domain_id_for_request(context))
return GroupV3.wrap_collection(context, refs, filters)
@controller.protected
@controller.protected()
def get_group(self, context, group_id):
ref = self.identity_api.get_group(
group_id,
domain_scope=self._get_domain_id_for_request(context))
return GroupV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def update_group(self, context, group_id, group):
self._require_matching_id(group_id, group)
@ -749,7 +772,7 @@ class GroupV3(controller.V3Controller):
for user in user_refs:
self._delete_tokens_for_user(user['id'])
@controller.protected
@controller.protected()
def delete_group(self, context, group_id):
return self._delete_group(context, group_id)
@ -758,7 +781,11 @@ class RoleV3(controller.V3Controller):
collection_name = 'roles'
member_name = 'role'
@controller.protected
def __init__(self):
super(RoleV3, self).__init__()
self.get_member_from_driver = self.assignment_api.get_role
@controller.protected()
def create_role(self, context, role):
self._require_attribute(role, 'name')
@ -771,19 +798,19 @@ class RoleV3(controller.V3Controller):
refs = self.identity_api.list_roles()
return RoleV3.wrap_collection(context, refs, filters)
@controller.protected
@controller.protected()
def get_role(self, context, role_id):
ref = self.identity_api.get_role(role_id)
return RoleV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def update_role(self, context, role_id, role):
self._require_matching_id(role_id, role)
ref = self.identity_api.update_role(role_id, role)
return RoleV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def delete_role(self, context, role_id):
return self.identity_api.delete_role(role_id)
@ -802,9 +829,34 @@ class RoleV3(controller.V3Controller):
context['path'].startswith('/OS-INHERIT') and
context['path'].endswith('/inherited_to_projects'))
@controller.protected
def create_grant(self, context, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
def _check_grant_protection(self, context, protection, role_id=None,
user_id=None, group_id=None,
domain_id=None, project_id=None):
"""Check protection for role grant APIs.
The policy rule might want to inspect attributes of any of the entities
involved in the grant. So we get these and pass them to the
check_protection() handler in the controller.
"""
ref = {}
if role_id:
ref['role'] = self.identity_api.get_role(role_id)
if user_id:
ref['user'] = self.identity_api.get_user(user_id)
else:
ref['group'] = self.identity_api.get_group(group_id)
if domain_id:
ref['domain'] = self.assignment_api.get_domain(domain_id)
else:
ref['project'] = self.assignment_api.get_project(project_id)
self.check_protection(context, protection, ref)
@controller.protected(callback=_check_grant_protection)
def create_grant(self, context, role_id, user_id=None,
group_id=None, domain_id=None, project_id=None):
"""Grants a role to a user or group on either a domain or project."""
self._require_domain_xor_project(domain_id, project_id)
self._require_user_xor_group(user_id, group_id)
@ -818,9 +870,9 @@ class RoleV3(controller.V3Controller):
role_id, user_id, group_id, domain_id, project_id,
self._check_if_inherited(context))
@controller.protected
def list_grants(self, context, user_id=None, group_id=None,
domain_id=None, project_id=None):
@controller.protected(callback=_check_grant_protection)
def list_grants(self, context, user_id=None,
group_id=None, domain_id=None, project_id=None):
"""Lists roles granted to user/group on either a domain or project."""
self._require_domain_xor_project(domain_id, project_id)
self._require_user_xor_group(user_id, group_id)
@ -830,9 +882,9 @@ class RoleV3(controller.V3Controller):
self._check_if_inherited(context))
return RoleV3.wrap_collection(context, refs)
@controller.protected
def check_grant(self, context, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
@controller.protected(callback=_check_grant_protection)
def check_grant(self, context, role_id, user_id=None,
group_id=None, domain_id=None, project_id=None):
"""Checks if a role has been granted on either a domain or project."""
self._require_domain_xor_project(domain_id, project_id)
self._require_user_xor_group(user_id, group_id)
@ -846,9 +898,9 @@ class RoleV3(controller.V3Controller):
role_id, user_id, group_id, domain_id, project_id,
self._check_if_inherited(context))
@controller.protected
def revoke_grant(self, context, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
@controller.protected(callback=_check_grant_protection)
def revoke_grant(self, context, role_id, user_id=None,
group_id=None, domain_id=None, project_id=None):
"""Revokes a role from user/group on either a domain or project."""
self._require_domain_xor_project(domain_id, project_id)
self._require_user_xor_group(user_id, group_id)
@ -1179,14 +1231,14 @@ class RoleAssignmentV3(controller.V3Controller):
return self.wrap_collection(context, formatted_refs, filters)
@controller.protected
@controller.protected()
def get_role_assignment(self, context):
raise exception.NotImplemented()
@controller.protected
@controller.protected()
def update_role_assignment(self, context):
raise exception.NotImplemented()
@controller.protected
@controller.protected()
def delete_role_assignment(self, context):
raise exception.NotImplemented()

View File

@ -21,7 +21,7 @@ class PolicyV3(controller.V3Controller):
collection_name = 'policies'
member_name = 'policy'
@controller.protected
@controller.protected()
def create_policy(self, context, policy):
ref = self._assign_unique_id(self._normalize_dict(policy))
self._require_attribute(ref, 'blob')
@ -35,16 +35,16 @@ class PolicyV3(controller.V3Controller):
refs = self.policy_api.list_policies()
return PolicyV3.wrap_collection(context, refs, filters)
@controller.protected
@controller.protected()
def get_policy(self, context, policy_id):
ref = self.policy_api.get_policy(policy_id)
return PolicyV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def update_policy(self, context, policy_id, policy):
ref = self.policy_api.update_policy(policy_id, policy)
return PolicyV3.wrap_member(context, ref)
@controller.protected
@controller.protected()
def delete_policy(self, context, policy_id):
return self.policy_api.delete_policy(policy_id)

View File

@ -22,6 +22,7 @@ from keystone import config
from keystone import exception
from keystone.openstack.common import jsonutils
from keystone.policy.backends import rules
from keystone.tests import core as test
import test_v3
@ -42,7 +43,8 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
- Three domains: A,B & C. C is disabled.
- DomainA has user1, DomainB has user2 and user3
- DomainA has group1 and group2, DomainB has group3
- User1 has a role on DomainA
- User1 has two roles on DomainA
- User2 has one role on DomainA
Remember that there will also be a fourth domain in existence,
the default domain.
@ -85,9 +87,17 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
self.role = self.new_role_ref()
self.identity_api.create_role(self.role['id'], self.role)
self.role1 = self.new_role_ref()
self.identity_api.create_role(self.role1['id'], self.role1)
self.identity_api.create_grant(self.role['id'],
user_id=self.user1['id'],
domain_id=self.domainA['id'])
self.identity_api.create_grant(self.role['id'],
user_id=self.user2['id'],
domain_id=self.domainA['id'])
self.identity_api.create_grant(self.role1['id'],
user_id=self.user1['id'],
domain_id=self.domainA['id'])
# Initialize the policy engine and allow us to write to a temp
# file in each test to create the policies
@ -168,6 +178,75 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
r = self.get(url_by_name, auth=self.auth)
self.assertEquals(self.user1['id'], r.result['user']['id'])
def test_get_user_protected_match_target(self):
"""GET /users/{id} (match target)
Test Plan:
- Update policy to protect api by domain_id
- Try and read a user who is in DomainB with a token scoped
to Domain A - this should fail
- Retry this for a user who is in Domain A, which should succeed.
- Finally, try getting a user that does not exist, which should
still return UserNotFound
"""
new_policy = {'identity:get_user':
[["domain_id:%(target.user.domain_id)s"]]}
self._set_policy(new_policy)
self.auth = self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'],
domain_id=self.domainA['id'])
url_by_name = '/users/%s' % self.user2['id']
r = self.get(url_by_name, auth=self.auth,
expected_status=exception.ForbiddenAction.code)
url_by_name = '/users/%s' % self.user1['id']
r = self.get(url_by_name, auth=self.auth)
self.assertEquals(self.user1['id'], r.result['user']['id'])
url_by_name = '/users/%s' % uuid.uuid4().hex
r = self.get(url_by_name, auth=self.auth,
expected_status=exception.UserNotFound.code)
def test_revoke_grant_protected_match_target(self):
"""DELETE /domains/{id}/users/{id}/roles/{id} (match target)
Test Plan:
- Update policy to protect api by domain_id of entities in
the grant
- Try and delete the existing grant that has a user who is
from a different domain - this should fail.
- Retry this for a user who is in Domain A, which should succeed.
"""
new_policy = {'identity:revoke_grant':
[["domain_id:%(target.user.domain_id)s"]]}
self._set_policy(new_policy)
collection_url = (
'/domains/%(domain_id)s/users/%(user_id)s/roles' % {
'domain_id': self.domainA['id'],
'user_id': self.user2['id']})
member_url = '%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': self.role['id']}
self.auth = self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'],
domain_id=self.domainA['id'])
self.delete(member_url, auth=self.auth,
expected_status=exception.ForbiddenAction.code)
collection_url = (
'/domains/%(domain_id)s/users/%(user_id)s/roles' % {
'domain_id': self.domainA['id'],
'user_id': self.user1['id']})
member_url = '%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': self.role1['id']}
self.delete(member_url, auth=self.auth)
def test_list_users_protected_by_domain(self):
"""GET /users?domain_id=mydomain (protected)
@ -306,3 +385,276 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.domainA['id'], id_list)
class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase):
"""Test policy enforcement of the sample v3 cloud policy file."""
def setUp(self):
"""Setup for v3 Cloud Policy Sample Test Cases.
The following data is created:
- Three domains: A, B and admin_domain, and one project
- DomainA has users: domain_admin and just_a_user. domain_admin has
role 'admin', just_a_user does not
- admin_domain has user cloud_admin, with a plain role
- domain_admin and just_a_user gave the same roles on the project
We test various api protection rules from the cloud sample policy
file to make sure the sample is valid and that we correctly enforce it.
"""
# Ensure that test_v3.RestfulTestCase doesn't load its own
# sample data, which would make checking the results of our
# tests harder
super(IdentityTestv3CloudPolicySample, self).setUp(
load_sample_data=False)
# Start by creating a couple of domains
self.domainA = self.new_domain_ref()
self.identity_api.create_domain(self.domainA['id'], self.domainA)
self.domainB = self.new_domain_ref()
self.identity_api.create_domain(self.domainB['id'], self.domainB)
self.admin_domain = {'id': 'admin_domain_id', 'name': 'Admin_domain'}
self.assignment_api.create_domain(self.admin_domain['id'],
self.admin_domain)
# And our users
self.cloud_admin_user = self.new_user_ref(
domain_id=self.admin_domain['id'])
self.cloud_admin_user['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.cloud_admin_user['id'],
self.cloud_admin_user)
self.just_a_user = self.new_user_ref(domain_id=self.domainA['id'])
self.just_a_user['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.just_a_user['id'], self.just_a_user)
self.domain_admin_user = self.new_user_ref(
domain_id=self.domainA['id'])
self.domain_admin_user['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.domain_admin_user['id'],
self.domain_admin_user)
self.project_admin_user = self.new_user_ref(
domain_id=self.domainA['id'])
self.project_admin_user['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.project_admin_user['id'],
self.project_admin_user)
# The admin role and another plain role
self.admin_role = {'id': uuid.uuid4().hex, 'name': 'admin'}
self.assignment_api.create_role(self.admin_role['id'], self.admin_role)
self.role = self.new_role_ref()
self.identity_api.create_role(self.role['id'], self.role)
# The cloud admin just gets the admin role
self.assignment_api.create_grant(self.admin_role['id'],
user_id=self.cloud_admin_user['id'],
domain_id=self.admin_domain['id'])
# Assign roles to the domain
self.assignment_api.create_grant(self.admin_role['id'],
user_id=self.domain_admin_user['id'],
domain_id=self.domainA['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.just_a_user['id'],
domain_id=self.domainA['id'])
# Create a assign roles to the project
self.project = self.new_project_ref(domain_id=self.domainA['id'])
self.assignment_api.create_project(self.project['id'], self.project)
self.assignment_api.create_grant(self.admin_role['id'],
user_id=self.project_admin_user['id'],
project_id=self.project['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.just_a_user['id'],
project_id=self.project['id'])
# Finally, switch to the v3 sample policy file
self.orig_policy_file = CONF.policy_file
rules.reset()
self.opt(policy_file=test.etcdir('policy.v3cloudsample.json'))
def tearDown(self):
super(IdentityTestv3CloudPolicySample, self).tearDown()
rules.reset()
self.opt(policy_file=self.orig_policy_file)
def _stati(self, expected_status):
# Return the expected return codes for APIs with and without data
# with any specified status overriding the normal values
if expected_status is None:
return (200, 201, 204)
else:
return (expected_status, expected_status, expected_status)
def _test_user_management(self, domain_id, expected=None):
status_OK, status_created, status_no_data = self._stati(expected)
entity_url = '/users/%s' % self.just_a_user['id']
list_url = '/users?domain_id=%s' % domain_id
self.get(entity_url, auth=self.auth,
expected_status=status_OK)
self.get(list_url, auth=self.auth,
expected_status=status_OK)
user = {'description': 'Updated'}
self.patch(entity_url, auth=self.auth, body={'user': user},
expected_status=status_OK)
self.delete(entity_url, auth=self.auth,
expected_status=status_no_data)
user_ref = self.new_user_ref(domain_id=domain_id)
self.post('/users', auth=self.auth, body={'user': user_ref},
expected_status=status_created)
def _test_project_management(self, domain_id, expected=None):
status_OK, status_created, status_no_data = self._stati(expected)
entity_url = '/projects/%s' % self.project['id']
list_url = '/projects?domain_id=%s' % domain_id
self.get(entity_url, auth=self.auth,
expected_status=status_OK)
self.get(list_url, auth=self.auth,
expected_status=status_OK)
project = {'description': 'Updated'}
self.patch(entity_url, auth=self.auth, body={'project': project},
expected_status=status_OK)
self.delete(entity_url, auth=self.auth,
expected_status=status_no_data)
proj_ref = self.new_project_ref(domain_id=domain_id)
self.post('/projects', auth=self.auth, body={'project': proj_ref},
expected_status=status_created)
def _test_domain_management(self, expected=None):
status_OK, status_created, status_no_data = self._stati(expected)
entity_url = '/domains/%s' % self.domainB['id']
list_url = '/domains'
self.get(entity_url, auth=self.auth,
expected_status=status_OK)
self.get(list_url, auth=self.auth,
expected_status=status_OK)
domain = {'description': 'Updated', 'enabled': False}
self.patch(entity_url, auth=self.auth, body={'domain': domain},
expected_status=status_OK)
self.delete(entity_url, auth=self.auth,
expected_status=status_no_data)
domain_ref = self.new_domain_ref()
self.post('/domains', auth=self.auth, body={'domain': domain_ref},
expected_status=status_created)
def _test_grants(self, target, entity_id, expected=None):
status_OK, status_created, status_no_data = self._stati(expected)
a_role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.assignment_api.create_role(a_role['id'], a_role)
collection_url = (
'/%(target)s/%(target_id)s/users/%(user_id)s/roles' % {
'target': target,
'target_id': entity_id,
'user_id': self.just_a_user['id']})
member_url = '%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': a_role['id']}
self.put(member_url, auth=self.auth,
expected_status=status_no_data)
self.head(member_url, auth=self.auth,
expected_status=status_no_data)
self.get(collection_url, auth=self.auth,
expected_status=status_OK)
self.delete(member_url, auth=self.auth,
expected_status=status_no_data)
def test_user_management(self):
# First, authentication with a user that does not have the domain
# admin role - houldn't be able to do much.
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],
password=self.just_a_user['password'],
domain_id=self.domainA['id'])
self._test_user_management(
self.domainA['id'], expected=exception.ForbiddenAction.code)
# Now, authentication with a user that does have the domain admin role
self.auth = self.build_authentication_request(
user_id=self.domain_admin_user['id'],
password=self.domain_admin_user['password'],
domain_id=self.domainA['id'])
self._test_user_management(self.domainA['id'])
def test_project_management(self):
# First, authentication with a user that does not have the project
# admin role - houldn't be able to do much.
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],
password=self.just_a_user['password'],
domain_id=self.domainA['id'])
self._test_project_management(
self.domainA['id'], expected=exception.ForbiddenAction.code)
# ...but should still be able to list projects of which they are
# a member
url = '/users/%s/projects' % self.just_a_user['id']
self.get(url, auth=self.auth)
# Now, authentication with a user that does have the domain admin role
self.auth = self.build_authentication_request(
user_id=self.domain_admin_user['id'],
password=self.domain_admin_user['password'],
domain_id=self.domainA['id'])
self._test_project_management(self.domainA['id'])
def test_domain_grants(self):
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],
password=self.just_a_user['password'],
domain_id=self.domainA['id'])
self._test_grants('domains', self.domainA['id'],
expected=exception.ForbiddenAction.code)
# Now, authentication with a user that does have the domain admin role
self.auth = self.build_authentication_request(
user_id=self.domain_admin_user['id'],
password=self.domain_admin_user['password'],
domain_id=self.domainA['id'])
self._test_grants('domains', self.domainA['id'])
def test_project_grants(self):
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],
password=self.just_a_user['password'],
project_id=self.project['id'])
self._test_grants('projects', self.project['id'],
expected=exception.ForbiddenAction.code)
# Now, authentication with a user that does have the domain admin role
self.auth = self.build_authentication_request(
user_id=self.project_admin_user['id'],
password=self.project_admin_user['password'],
project_id=self.project['id'])
self._test_grants('projects', self.project['id'])
def test_cloud_admin(self):
self.auth = self.build_authentication_request(
user_id=self.domain_admin_user['id'],
password=self.domain_admin_user['password'],
domain_id=self.domainA['id'])
self._test_domain_management(
expected=exception.ForbiddenAction.code)
self.auth = self.build_authentication_request(
user_id=self.cloud_admin_user['id'],
password=self.cloud_admin_user['password'],
domain_id=self.admin_domain['id'])
self._test_domain_management()

View File

@ -390,7 +390,7 @@ class Auth(controller.V2Controller):
_('Token does not belong to specified tenant.'))
return data
@controller.protected
@controller.protected()
def validate_token_head(self, context, token_id):
"""Check that a token is valid.
@ -402,7 +402,7 @@ class Auth(controller.V2Controller):
belongs_to = context['query_string'].get('belongsTo')
self.token_provider_api.check_v2_token(token_id, belongs_to)
@controller.protected
@controller.protected()
def validate_token(self, context, token_id):
"""Check that a token is valid.
@ -420,7 +420,7 @@ class Auth(controller.V2Controller):
self.assert_admin(context)
self.token_api.delete_token(token_id)
@controller.protected
@controller.protected()
def revocation_list(self, context, auth=None):
tokens = self.token_api.list_revoked_tokens()

View File

@ -118,7 +118,7 @@ class TrustV3(controller.V3Controller):
target='roles')
return trust_roles
@controller.protected
@controller.protected()
def create_trust(self, context, trust=None):
"""Create a new trust.
@ -167,7 +167,7 @@ class TrustV3(controller.V3Controller):
raise exception.ValidationError(attribute=e.args[0],
target='trust')
@controller.protected
@controller.protected()
def list_trusts(self, context):
query = context['query_string']
trusts = []
@ -192,7 +192,7 @@ class TrustV3(controller.V3Controller):
self._fill_in_roles(context, trust, global_roles)
return TrustV3.wrap_collection(context, trusts)
@controller.protected
@controller.protected()
def delete_trust(self, context, trust_id):
trust = self.trust_api.get_trust(trust_id)
if not trust:
@ -204,7 +204,7 @@ class TrustV3(controller.V3Controller):
userid = trust['trustor_user_id']
self.token_api.delete_tokens(userid, trust_id=trust_id)
@controller.protected
@controller.protected()
def list_roles_for_trust(self, context, trust_id):
trust = self.get_trust(context, trust_id)['trust']
if not trust:
@ -214,7 +214,7 @@ class TrustV3(controller.V3Controller):
return {'roles': trust['roles'],
'links': trust['roles_links']}
@controller.protected
@controller.protected()
def check_role_for_trust(self, context, trust_id, role_id):
"""Checks if a role has been assigned to a trust."""
trust = self.trust_api.get_trust(trust_id)
@ -227,7 +227,7 @@ class TrustV3(controller.V3Controller):
if not matching_roles:
raise exception.RoleNotFound(role_id=role_id)
@controller.protected
@controller.protected()
def get_role_for_trust(self, context, trust_id, role_id):
"""Checks if a role has been assigned to a trust."""
trust = self.trust_api.get_trust(trust_id)