Concrete role assignments for federated users

"Shadow users: unified identity" implementation:
Allow concrete role assignments for federated users. Currently,
federated users get roles from mapped group assignments. However, with
the shadow users implementation, federated users are mapped to
identities in the backend; thus, can be assigned roles.

This patch returns locally assigned roles with the mapped group roles
for federated users; allowing for authorization for those roles.

bp shadow-users-newton

Change-Id: I9a150ded6c4b556627147d2671be15d6a3794ba5
This commit is contained in:
Ronald De Rose 2016-02-25 21:39:15 +00:00
parent 3a62069f43
commit eed233cac8
10 changed files with 294 additions and 20 deletions

View File

@ -172,8 +172,8 @@ identity:get_auth_catalog GET /v3/auth/catalog
identity:get_auth_projects GET /v3/auth/projects
identity:get_auth_domains GET /v3/auth/domains
identity:list_projects_for_groups GET /v3/OS-FEDERATION/projects
identity:list_domains_for_groups GET /v3/OS-FEDERATION/domains
identity:list_projects_for_user GET /v3/OS-FEDERATION/projects
identity:list_domains_for_user GET /v3/OS-FEDERATION/domains
identity:list_revoke_events GET /v3/OS-REVOKE/events

View File

@ -173,8 +173,8 @@
"identity:get_auth_projects": "",
"identity:get_auth_domains": "",
"identity:list_projects_for_groups": "",
"identity:list_domains_for_groups": "",
"identity:list_projects_for_user": "",
"identity:list_domains_for_user": "",
"identity:list_revoke_events": "",

View File

@ -198,8 +198,8 @@
"identity:get_auth_projects": "",
"identity:get_auth_domains": "",
"identity:list_projects_for_groups": "",
"identity:list_domains_for_groups": "",
"identity:list_projects_for_user": "",
"identity:list_domains_for_user": "",
"identity:list_revoke_events": "",

View File

@ -429,8 +429,8 @@ class DomainV3(controller.V3Controller):
self.get_member_from_driver = self.resource_api.get_domain
@controller.protected()
def list_domains_for_groups(self, request):
"""List all domains available to an authenticated user's groups.
def list_domains_for_user(self, request):
"""List all domains available to an authenticated user.
:param context: request context
:returns: list of accessible domains
@ -440,6 +440,10 @@ class DomainV3(controller.V3Controller):
auth_context = env[authorization.AUTH_CONTEXT_ENV]
domains = self.assignment_api.list_domains_for_groups(
auth_context['group_ids'])
domains = domains + self.assignment_api.list_domains_for_user(
auth_context['user_id'])
# remove duplicates
domains = [dict(t) for t in set([tuple(d.items()) for d in domains])]
return DomainV3.wrap_collection(request.context_dict, domains)
@ -453,8 +457,8 @@ class ProjectAssignmentV3(controller.V3Controller):
self.get_member_from_driver = self.resource_api.get_project
@controller.protected()
def list_projects_for_groups(self, request):
"""List all projects available to an authenticated user's groups.
def list_projects_for_user(self, request):
"""List all projects available to an authenticated user.
:param context: request context
:returns: list of accessible projects
@ -464,6 +468,10 @@ class ProjectAssignmentV3(controller.V3Controller):
auth_context = env[authorization.AUTH_CONTEXT_ENV]
projects = self.assignment_api.list_projects_for_groups(
auth_context['group_ids'])
projects = projects + self.assignment_api.list_projects_for_user(
auth_context['user_id'])
# remove duplicates
projects = [dict(t) for t in set([tuple(d.items()) for d in projects])]
return ProjectAssignmentV3.wrap_collection(request.context_dict,
projects)

View File

@ -194,13 +194,13 @@ class Routers(wsgi.RoutersBase):
mapper, domain_controller,
path=self._construct_url('domains'),
new_path='/auth/domains',
get_action='list_domains_for_groups',
get_action='list_domains_for_user',
rel=build_resource_relation(resource_name='domains'))
self._add_resource(
mapper, project_controller,
path=self._construct_url('projects'),
new_path='/auth/projects',
get_action='list_projects_for_groups',
get_action='list_projects_for_user',
rel=build_resource_relation(resource_name='projects'))
# Auth operations

View File

@ -350,6 +350,8 @@ def validate_groups(group_ids, mapping_id, identity_api):
is 0.
"""
# TODO(rderose): remove cardinality check, as federated users can now
# receive direct role assignments
validate_groups_cardinality(group_ids, mapping_id)
validate_groups_in_backend(group_ids, mapping_id, identity_api)

View File

@ -2637,6 +2637,258 @@ class FederatedUserTests(test_v3.RestfulTestCase, FederatedSetupMixin):
user_id2 = r.json_body['token']['user']['id']
self.assertEqual(user_id, user_id2)
def test_user_role_assignment(self):
# create project and role
project_ref = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id)
self.resource_api.create_project(project_ref['id'], project_ref)
role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
# authenticate via saml get back a user id
user_id, unscoped_token = self._authenticate_via_saml()
# exchange an unscoped token for a scoped token; resulting in
# unauthorized because the user doesn't have any role assignments
v3_scope_request = self._scope_request(unscoped_token, 'project',
project_ref['id'])
r = self.v3_create_token(v3_scope_request,
expected_status=http_client.UNAUTHORIZED)
# assign project role to federated user
self.assignment_api.add_role_to_user_and_project(
user_id, project_ref['id'], role_ref['id'])
# exchange an unscoped token for a scoped token
r = self.v3_create_token(v3_scope_request,
expected_status=http_client.CREATED)
scoped_token = r.headers['X-Subject-Token']
# ensure user can access resource based on role assignment
path = '/projects/%(project_id)s' % {'project_id': project_ref['id']}
r = self.v3_request(path=path, method='GET',
expected_status=http_client.OK,
token=scoped_token)
self.assertValidProjectResponse(r, project_ref)
# create a 2nd project
project_ref2 = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id)
self.resource_api.create_project(project_ref2['id'], project_ref2)
# ensure the user cannot access the 2nd resource (forbidden)
path = '/projects/%(project_id)s' % {'project_id': project_ref2['id']}
r = self.v3_request(path=path, method='GET',
expected_status=http_client.FORBIDDEN,
token=scoped_token)
def test_domain_scoped_user_role_assignment(self):
# create domain and role
domain_ref = unit.new_domain_ref()
self.resource_api.create_domain(domain_ref['id'], domain_ref)
role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
# authenticate via saml get back a user id
user_id, unscoped_token = self._authenticate_via_saml()
# exchange an unscoped token for a scoped token; resulting in
# unauthorized because the user doesn't have any role assignments
v3_scope_request = self._scope_request(unscoped_token, 'domain',
domain_ref['id'])
r = self.v3_create_token(v3_scope_request,
expected_status=http_client.UNAUTHORIZED)
# assign domain role to user
self.assignment_api.create_grant(role_ref['id'],
user_id=user_id,
domain_id=domain_ref['id'])
# exchange an unscoped token for domain scoped token and test
r = self.v3_create_token(v3_scope_request,
expected_status=http_client.CREATED)
self.assertIsNotNone(r.headers.get('X-Subject-Token'))
token_resp = r.result['token']
self.assertIn('domain', token_resp)
def test_auth_projects_matches_federation_projects(self):
# create project and role
project_ref = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id)
self.resource_api.create_project(project_ref['id'], project_ref)
role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
# authenticate via saml get back a user id
user_id, unscoped_token = self._authenticate_via_saml()
# assign project role to federated user
self.assignment_api.add_role_to_user_and_project(
user_id, project_ref['id'], role_ref['id'])
# get auth projects
r = self.get('/auth/projects', token=unscoped_token)
auth_projects = r.result['projects']
# get federation projects
r = self.get('/OS-FEDERATION/projects', token=unscoped_token)
fed_projects = r.result['projects']
# compare
self.assertItemsEqual(auth_projects, fed_projects)
def test_auth_projects_matches_federation_projects_with_group_assign(self):
# create project, role, group
domain_id = CONF.identity.default_domain_id
project_ref = unit.new_project_ref(domain_id=domain_id)
self.resource_api.create_project(project_ref['id'], project_ref)
role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
group_ref = unit.new_group_ref(domain_id=domain_id)
group_ref = self.identity_api.create_group(group_ref)
# authenticate via saml get back a user id
user_id, unscoped_token = self._authenticate_via_saml()
# assign role to group at project
self.assignment_api.create_grant(role_ref['id'],
group_id=group_ref['id'],
project_id=project_ref['id'],
domain_id=domain_id)
# add user to group
self.identity_api.add_user_to_group(user_id=user_id,
group_id=group_ref['id'])
# get auth projects
r = self.get('/auth/projects', token=unscoped_token)
auth_projects = r.result['projects']
# get federation projects
r = self.get('/OS-FEDERATION/projects', token=unscoped_token)
fed_projects = r.result['projects']
# compare
self.assertItemsEqual(auth_projects, fed_projects)
def test_auth_domains_matches_federation_domains(self):
# create domain and role
domain_ref = unit.new_domain_ref()
self.resource_api.create_domain(domain_ref['id'], domain_ref)
role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
# authenticate via saml get back a user id and token
user_id, unscoped_token = self._authenticate_via_saml()
# assign domain role to user
self.assignment_api.create_grant(role_ref['id'],
user_id=user_id,
domain_id=domain_ref['id'])
# get auth domains
r = self.get('/auth/domains', token=unscoped_token)
auth_domains = r.result['domains']
# get federation domains
r = self.get('/OS-FEDERATION/domains', token=unscoped_token)
fed_domains = r.result['domains']
# compare
self.assertItemsEqual(auth_domains, fed_domains)
def test_auth_domains_matches_federation_domains_with_group_assign(self):
# create role, group, and domain
domain_ref = unit.new_domain_ref()
self.resource_api.create_domain(domain_ref['id'], domain_ref)
role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
group_ref = unit.new_group_ref(domain_id=domain_ref['id'])
group_ref = self.identity_api.create_group(group_ref)
# authenticate via saml get back a user id and token
user_id, unscoped_token = self._authenticate_via_saml()
# assign domain role to group
self.assignment_api.create_grant(role_ref['id'],
group_id=group_ref['id'],
domain_id=domain_ref['id'])
# add user to group
self.identity_api.add_user_to_group(user_id=user_id,
group_id=group_ref['id'])
# get auth domains
r = self.get('/auth/domains', token=unscoped_token)
auth_domains = r.result['domains']
# get federation domains
r = self.get('/OS-FEDERATION/domains', token=unscoped_token)
fed_domains = r.result['domains']
# compare
self.assertItemsEqual(auth_domains, fed_domains)
def test_list_domains_for_user_duplicates(self):
# create role
role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
# authenticate via saml get back a user id and token
user_id, unscoped_token = self._authenticate_via_saml()
# get federation group domains
r = self.get('/OS-FEDERATION/domains', token=unscoped_token)
group_domains = r.result['domains']
domain_from_group = group_domains[0]
# assign group domain and role to user, this should create a
# duplicate domain
self.assignment_api.create_grant(role_ref['id'],
user_id=user_id,
domain_id=domain_from_group['id'])
# get user domains and test for duplicates
r = self.get('/OS-FEDERATION/domains', token=unscoped_token)
user_domains = r.result['domains']
user_domain_ids = []
for domain in user_domains:
self.assertNotIn(domain['id'], user_domain_ids)
user_domain_ids.append(domain['id'])
def test_list_projects_for_user_duplicates(self):
# create role
role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
# authenticate via saml get back a user id and token
user_id, unscoped_token = self._authenticate_via_saml()
# get federation group projects
r = self.get('/OS-FEDERATION/projects', token=unscoped_token)
group_projects = r.result['projects']
project_from_group = group_projects[0]
# assign group project and role to user, this should create a
# duplicate project
self.assignment_api.add_role_to_user_and_project(
user_id, project_from_group['id'], role_ref['id'])
# get user projects and test for duplicates
r = self.get('/OS-FEDERATION/projects', token=unscoped_token)
user_projects = r.result['projects']
user_project_ids = []
for project in user_projects:
self.assertNotIn(project['id'], user_project_ids)
user_project_ids.append(project['id'])
def _authenticate_via_saml(self):
r = self._issue_unscoped_token()
unscoped_token = r.headers['X-Subject-Token']
token_resp = r.json_body['token']
self.assertValidMappedUser(token_resp)
return token_resp['user']['id'], unscoped_token
class JsonHomeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
JSON_HOME_DATA = {

View File

@ -294,12 +294,12 @@ class V3TokenDataHelper(object):
user_id, project_id)
return [self.role_api.get_role(role_id) for role_id in roles]
def populate_roles_for_groups(self, token_data, group_ids,
project_id=None, domain_id=None,
user_id=None):
def populate_roles_for_federated_user(self, token_data, group_ids,
project_id=None, domain_id=None,
user_id=None):
"""Populate roles basing on provided groups and project/domain.
Used for ephemeral users with dynamically assigned groups.
Used for federated users with dynamically assigned groups.
This method does not return anything, yet it modifies token_data in
place.
@ -309,8 +309,7 @@ class V3TokenDataHelper(object):
:param domain_id: domain ID to scope to
:param user_id: user ID
:raises keystone.exception.Unauthorized: when no roles were found for a
(group_ids, project_id) or (group_ids, domain_id) pairs.
:raises keystone.exception.Unauthorized: when no roles were found
"""
def check_roles(roles, user_id, project_id, domain_id):
@ -335,6 +334,12 @@ class V3TokenDataHelper(object):
roles = self.assignment_api.get_roles_for_groups(group_ids,
project_id,
domain_id)
roles = roles + self._get_roles_for_user(user_id, domain_id,
project_id)
# remove duplicates
roles = [dict(t) for t in set([tuple(d.items()) for d in roles])]
check_roles(roles, user_id, project_id, domain_id)
token_data['roles'] = roles
@ -653,7 +658,7 @@ class BaseProvider(provider.Provider):
}
if project_id or domain_id:
self.v3_token_data_helper.populate_roles_for_groups(
self.v3_token_data_helper.populate_roles_for_federated_user(
token_data, group_ids, project_id, domain_id, user_id)
return token_data

View File

@ -153,7 +153,7 @@ class Provider(common.BaseProvider):
"""
group_ids = [x['id'] for x in federated_dict['group_ids']]
self.v3_token_data_helper.populate_roles_for_groups(
self.v3_token_data_helper.populate_roles_for_federated_user(
token_dict, group_ids, project_id, domain_id, user_id)
def _extract_v2_token_data(self, token_data):

View File

@ -0,0 +1,7 @@
---
upgrade:
- In the policy.json file, we changed `identity:list_projects_for_groups`
to `identity:list_projects_for_user`. Likewise, we changed
`identity:list_domains_for_groups` to `identity:list_domains_for_user`. If
you have customized the policy.json file, you will need to make these
changes. This was done to better support new features around federation.