Implement controller logic for system group assignments

This commit wires up the remaining bits to expose system role
assignments for groups via the assignment API.

bp system-scope

Change-Id: I5051aa97dbecb88ee706749b26a4140f9798e084
This commit is contained in:
Lance Bragstad 2017-11-29 22:58:37 +00:00 committed by ayoung
parent 410a8f691f
commit 086dd2738b
7 changed files with 393 additions and 12 deletions

View File

@ -439,6 +439,8 @@ class GrantAssignmentV3(controller.V3Controller):
else:
ref['group'] = PROVIDERS.identity_api.get_group(group_id)
# NOTE(lbragstad): This if/else check will need to be expanded in the
# future to handle system hierarchies if that is implemented.
if domain_id:
ref['domain'] = PROVIDERS.resource_api.get_domain(domain_id)
elif project_id:
@ -549,6 +551,57 @@ class GrantAssignmentV3(controller.V3Controller):
"""
PROVIDERS.assignment_api.delete_system_grant_for_user(user_id, role_id)
@controller.protected(callback=_check_grant_protection)
def list_system_grants_for_group(self, request, group_id):
"""List all system grants for a specific group.
:param request: the request object
:param group_id: ID of the group
:returns: a list of grants the group has on the system
"""
refs = PROVIDERS.assignment_api.list_system_grants_for_group(group_id)
return GrantAssignmentV3.wrap_collection(request.context_dict, refs)
@controller.protected(callback=_check_grant_protection)
def check_system_grant_for_group(self, request, role_id, group_id):
"""Check if a group has a specific role on the system.
:param request: the request object
:param role_id: the ID of the role to check
:param group_id: the ID of the group to check
"""
PROVIDERS.assignment_api.check_system_grant_for_group(
group_id, role_id
)
@controller.protected(callback=_check_grant_protection)
def create_system_grant_for_group(self, request, role_id, group_id):
"""Grant a role to a group on the system.
:param request: the request object
:param role_id: the ID of the role to grant to the group
:param group_id: the ID of the group
"""
PROVIDERS.assignment_api.create_system_grant_for_group(
group_id, role_id
)
@controller.protected(callback=functools.partial(_check_grant_protection))
def revoke_system_grant_for_group(self, request, role_id, group_id):
"""Revoke a role from the group on the system.
:param request: the request object
:param role_id: the ID of the role to remove
:param user_id: the ID of the user
"""
PROVIDERS.assignment_api.delete_system_grant_for_group(
group_id, role_id
)
class RoleAssignmentV3(controller.V3Controller):
"""The V3 Role Assignment APIs, really just list_role_assignment()."""

View File

@ -880,6 +880,14 @@ class Manager(manager.Manager):
'user_id': user_id,
'role_id': assignment['id']}
)
if group_id:
assignments = self.list_system_grants_for_group(group_id)
for assignment in assignments:
system_assignments.append(
{'system': {'all': True},
'group_id': group_id,
'role_id': assignment['id']}
)
assignments = []
for assignment in itertools.chain(
@ -1184,9 +1192,14 @@ class Manager(manager.Manager):
"""
target_id = self._SYSTEM_SCOPE_TOKEN
assignment_type = self._GROUP_SYSTEM
return self.driver.list_system_grants(
grants = self.driver.list_system_grants(
group_id, target_id, assignment_type
)
grant_ids = []
for grant in grants:
grant_ids.append(grant['role_id'])
return PROVIDERS.role_api.list_roles_from_ids(grant_ids)
def create_system_grant_for_group(self, group_id, role_id):
"""Grant a group a role on the system.

View File

@ -195,6 +195,25 @@ class Routers(wsgi.RoutersBase):
'role_id': json_home.Parameters.ROLE_ID,
'user_id': json_home.Parameters.USER_ID
})
self._add_resource(
mapper, grant_controller,
path='/system/groups/{group_id}/roles',
get_head_action='list_system_grants_for_group',
rel=json_home.build_v3_resource_relation('system_group_roles'),
path_vars={
'group_id': json_home.Parameters.GROUP_ID
})
self._add_resource(
mapper, grant_controller,
path='/system/groups/{group_id}/roles/{role_id}',
get_head_action='check_system_grant_for_group',
put_action='create_system_grant_for_group',
delete_action='revoke_system_grant_for_group',
rel=json_home.build_v3_resource_relation('system_group_role'),
path_vars={
'role_id': json_home.Parameters.ROLE_ID,
'group_id': json_home.Parameters.GROUP_ID
})
self._add_resource(
mapper, controllers.RoleAssignmentV3(),

View File

@ -3851,10 +3851,9 @@ class SystemAssignmentTests(AssignmentTestHelperMixin):
group_id
)
self.assertEqual(len(system_roles), 1)
self.assertEqual(system_roles[0]['type'], 'GroupSystem')
self.assertEqual(system_roles[0]['target_id'], 'system')
self.assertEqual(system_roles[0]['actor_id'], group_id)
self.assertFalse(system_roles[0]['inherited'])
self.assertIsNone(system_roles[0]['domain_id'])
self.assertEqual(system_roles[0]['id'], role_ref['id'])
self.assertEqual(system_roles[0]['name'], role_ref['name'])
def test_list_system_grants_for_group(self):
group_ref = unit.new_group_ref(CONF.identity.default_domain_id)

View File

@ -1017,7 +1017,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
resource_url=None):
entities = resp.result.get('role_assignments')
if expected_length:
if expected_length or expected_length == 0:
self.assertEqual(expected_length, len(entities))
# Collections should have relational links

View File

@ -3100,7 +3100,7 @@ class ListUserProjectsTestCase(test_v3.RestfulTestCase):
self.assertEqual(self.projects[i]['id'], projects_result[0]['id'])
class UserSystemRoleAssignmentTestCase(test_v3.RestfulTestCase):
class SystemRoleAssignmentMixin(object):
def _create_new_role(self):
"""Create a role available for use anywhere and return the ID."""
@ -3110,11 +3110,15 @@ class UserSystemRoleAssignmentTestCase(test_v3.RestfulTestCase):
# return the ID of the role we just created.
return response.json_body['role']['id']
# FIXME(lbragstad): These tests contain system-level API calls, which means
# they will log a warning message if they are called with a project-scoped
# token, regardless of the role assignment on the project. We need to fix
# them by using a proper system-scoped admin token to make the call instead
# of a project scoped token.
# FIXME(lbragstad): These tests contain system-level API calls, which means
# they will log a warning message if they are called with a project-scoped
# token, regardless of the role assignment on the project. We need to fix
# them by using a proper system-scoped admin token to make the call instead
# of a project scoped token.
class UserSystemRoleAssignmentTestCase(test_v3.RestfulTestCase,
SystemRoleAssignmentMixin):
def test_assign_system_role_to_user(self):
system_role_id = self._create_new_role()
@ -3392,3 +3396,283 @@ class UserSystemRoleAssignmentTestCase(test_v3.RestfulTestCase):
'&scope.project.id=%(project_id)s'
) % {'project_id': self.project_id}
self.get(path, expected_status=http_client.BAD_REQUEST)
# FIXME(lbragstad): These tests contain system-level API calls, which means
# they will log a warning message if they are called with a project-scoped
# token, regardless of the role assignment on the project. We need to fix
# them by using a proper system-scoped admin token to make the call instead
# of a project scoped token.
class GroupSystemRoleAssignmentTestCase(test_v3.RestfulTestCase,
SystemRoleAssignmentMixin):
def _create_group(self):
body = {
'group': {
'domain_id': self.domain_id,
'name': uuid.uuid4().hex
}
}
response = self.post('/groups/', body=body)
return response.json_body['group']
def test_assign_system_role_to_group(self):
system_role_id = self._create_new_role()
group = self._create_group()
# assign the role to the group globally
member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % {
'group_id': group['id'],
'role_id': system_role_id
}
self.put(member_url)
# validate the role assignment
self.head(member_url)
# list global roles
collection_url = '/system/groups/%(group_id)s/roles' % {
'group_id': group['id']
}
roles = self.get(collection_url).json_body['roles']
self.assertEqual(len(roles), 1)
self.assertEqual(roles[0]['id'], system_role_id)
self.head(collection_url, expected_status=http_client.OK)
response = self.get(
'/role_assignments?scope.system=all&group.id=%(group_id)s' % {
'group_id': group['id']
}
)
self.assertValidRoleAssignmentListResponse(response, expected_length=1)
self.assertEqual(
response.json_body['role_assignments'][0]['role']['id'],
system_role_id
)
def test_assign_system_role_to_non_existant_group_fails(self):
system_role_id = self._create_new_role()
group_id = uuid.uuid4().hex
# assign the role to the group globally
member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % {
'group_id': group_id,
'role_id': system_role_id
}
self.put(member_url, expected_status=http_client.NOT_FOUND)
def test_list_role_assignments_for_group_returns_all_assignments(self):
system_role_id = self._create_new_role()
group = self._create_group()
# assign the role to the group globally and on a single project
member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % {
'group_id': group['id'],
'role_id': system_role_id
}
self.put(member_url)
member_url = (
'/projects/%(project_id)s/groups/%(group_id)s/'
'roles/%(role_id)s'
) % {
'project_id': self.project_id,
'group_id': group['id'],
'role_id': system_role_id
}
self.put(member_url)
# make sure both assignments exist in the response, there should be two
response = self.get(
'/role_assignments?group.id=%(group_id)s' % {
'group_id': group['id']
}
)
self.assertValidRoleAssignmentListResponse(response, expected_length=2)
def test_list_system_roles_for_group_returns_none_without_assignment(self):
group = self._create_group()
# list global roles for group
collection_url = '/system/groups/%(group_id)s/roles' % {
'group_id': group['id']
}
response = self.get(collection_url)
# assert that the group doesn't have any system role assignments, which
# is denoted by an empty list
self.assertEqual(response.json_body['roles'], [])
response = self.get(
'/role_assignments?scope.system=all&group.id=%(group_id)s' % {
'group_id': group['id']
}
)
self.assertValidRoleAssignmentListResponse(response, expected_length=0)
def test_list_system_roles_for_group_does_not_return_project_roles(self):
system_role_id = self._create_new_role()
project_role_id = self._create_new_role()
group = self._create_group()
# assign the group a role on the system and a role on a project
member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % {
'group_id': group['id'], 'role_id': system_role_id
}
self.put(member_url)
member_url = (
'/projects/%(project_id)s/groups/%(group_id)s/'
'roles/%(role_id)s'
) % {
'project_id': self.project_id,
'group_id': group['id'],
'role_id': project_role_id
}
self.put(member_url)
# list system role assignments
collection_url = '/system/groups/%(group_id)s/roles' % {
'group_id': group['id']
}
response = self.get(collection_url)
# assert the project role assignment is not in the system role
# assignments
for role in response.json_body['roles']:
self.assertNotEqual(role['id'], project_role_id)
response = self.get(
'/role_assignments?scope.system=all&group.id=%(group_id)s' % {
'group_id': group['id']
}
)
self.assertValidRoleAssignmentListResponse(response, expected_length=1)
def test_list_system_roles_for_group_does_not_return_domain_roles(self):
system_role_id = self._create_new_role()
domain_role_id = self._create_new_role()
group = self._create_group()
# assign a role to the group on a domain
domain_member_url = (
'/domains/%(domain_id)s/groups/%(group_id)s/'
'roles/%(role_id)s' % {
'domain_id': group['domain_id'],
'group_id': group['id'],
'role_id': domain_role_id
}
)
self.put(domain_member_url)
# assign the group a role on the system
member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % {
'group_id': group['id'],
'role_id': system_role_id
}
self.put(member_url)
# list domain role assignments
response = self.get(
'/domains/%(domain_id)s/groups/%(group_id)s/roles' % {
'domain_id': group['domain_id'], 'group_id': group['id']
}
)
self.assertEqual(len(response.json_body['roles']), 1)
# list system role assignments
collection_url = '/system/groups/%(group_id)s/roles' % {
'group_id': group['id']
}
response = self.get(collection_url)
# assert the domain role assignment is not in the system role
# assignments
for role in response.json_body['roles']:
self.assertNotEqual(role['id'], domain_role_id)
response = self.get(
'/role_assignments?scope.system=all&group.id=%(group_id)s' % {
'group_id': group['id']
}
)
self.assertValidRoleAssignmentListResponse(response, expected_length=1)
def test_check_group_has_system_role_when_assignment_exists(self):
system_role_id = self._create_new_role()
group = self._create_group()
# assign the group a role on the system
member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % {
'group_id': group['id'],
'role_id': system_role_id
}
self.put(member_url)
# check the group has the system role assignment
self.head(member_url)
response = self.get(
'/role_assignments?scope.system=all&group.id=%(group_id)s' % {
'group_id': group['id']
}
)
self.assertValidRoleAssignmentListResponse(response, expected_length=1)
self.assertEqual(
response.json_body['role_assignments'][0]['role']['id'],
system_role_id
)
def test_check_group_does_not_have_system_role_without_assignment(self):
system_role_id = self._create_new_role()
group = self._create_group()
# check the group does't have the system role assignment
member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % {
'group_id': group['id'],
'role_id': system_role_id
}
self.head(member_url, expected_status=http_client.NOT_FOUND)
response = self.get(
'/role_assignments?scope.system=all&group.id=%(group_id)s' % {
'group_id': group['id']
}
)
self.assertValidRoleAssignmentListResponse(response, expected_length=0)
def test_unassign_system_role_from_group(self):
system_role_id = self._create_new_role()
group = self._create_group()
# assign the group a role on the system
member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % {
'group_id': group['id'],
'role_id': system_role_id
}
self.put(member_url)
# ensure the group has the role assignment
self.head(member_url)
response = self.get(
'/role_assignments?scope.system=all&group.id=%(group_id)s' % {
'group_id': group['id']
}
)
self.assertEqual(len(response.json_body['role_assignments']), 1)
self.assertValidRoleAssignmentListResponse(response)
# remove the system role assignment from the group
self.delete(member_url)
# ensure the group doesn't have any system role assignments
collection_url = '/system/groups/%(group_id)s/roles' % {
'group_id': group['id']
}
response = self.get(collection_url)
self.assertEqual(len(response.json_body['roles']), 0)
response = self.get(
'/role_assignments?scope.system=all&group.id=%(group_id)s' % {
'group_id': group['id']
}
)
self.assertValidRoleAssignmentListResponse(response, expected_length=0)

View File

@ -204,6 +204,19 @@ V3_JSON_HOME_RESOURCES = {
'user_id': json_home.Parameters.USER_ID
}
},
json_home.build_v3_resource_relation('system_group_role'): {
'href-template': '/system/groups/{group_id}/roles/{role_id}',
'href-vars': {
'group_id': json_home.Parameters.GROUP_ID,
'role_id': json_home.Parameters.ROLE_ID
}
},
json_home.build_v3_resource_relation('system_group_roles'): {
'href-template': '/system/groups/{group_id}/roles',
'href-vars': {
'group_id': json_home.Parameters.GROUP_ID
}
},
json_home.build_v3_resource_relation('domain'): {
'href-template': '/domains/{domain_id}',
'href-vars': {'domain_id': json_home.Parameters.DOMAIN_ID, }},