Merge "Add ability to list all system role assignments"
This commit is contained in:
commit
c6cf40ba39
@ -41,6 +41,10 @@ of assignments returned in the collection. For example:
|
||||
|
||||
GET /role_assignments?scope.system=all?user.id={user_id}
|
||||
|
||||
- List system role assignments for all users and groups::
|
||||
|
||||
GET /role_assignments?scope.system=all
|
||||
|
||||
Since Identity API v3.10, you can grant role assignments to users and groups on
|
||||
an entity called the ``system``. The role assignment API also supports listing
|
||||
and filtering role assignments on the system.
|
||||
|
@ -308,9 +308,12 @@ class Assignment(base.AssignmentDriverBase):
|
||||
def list_system_grants(self, actor_id, target_id, assignment_type):
|
||||
with sql.session_for_read() as session:
|
||||
query = session.query(SystemRoleAssignment)
|
||||
query = query.filter_by(actor_id=actor_id)
|
||||
query = query.filter_by(target_id=target_id)
|
||||
query = query.filter_by(type=assignment_type)
|
||||
if actor_id:
|
||||
query = query.filter_by(actor_id=actor_id)
|
||||
if target_id:
|
||||
query = query.filter_by(target_id=target_id)
|
||||
if assignment_type:
|
||||
query = query.filter_by(type=assignment_type)
|
||||
results = query.all()
|
||||
|
||||
return [role.to_dict() for role in results]
|
||||
|
@ -880,7 +880,7 @@ class Manager(manager.Manager):
|
||||
'user_id': user_id,
|
||||
'role_id': assignment['id']}
|
||||
)
|
||||
if group_id:
|
||||
elif group_id:
|
||||
assignments = self.list_system_grants_for_group(group_id)
|
||||
for assignment in assignments:
|
||||
system_assignments.append(
|
||||
@ -888,6 +888,17 @@ class Manager(manager.Manager):
|
||||
'group_id': group_id,
|
||||
'role_id': assignment['id']}
|
||||
)
|
||||
else:
|
||||
assignments = self.list_all_system_grants()
|
||||
for assignment in assignments:
|
||||
a = {}
|
||||
if assignment['type'] == self._GROUP_SYSTEM:
|
||||
a['group_id'] = assignment['actor_id']
|
||||
elif assignment['type'] == self._USER_SYSTEM:
|
||||
a['user_id'] = assignment['actor_id']
|
||||
a['role_id'] = assignment['role_id']
|
||||
a['system'] = {'all': True}
|
||||
system_assignments.append(a)
|
||||
|
||||
assignments = []
|
||||
for assignment in itertools.chain(
|
||||
@ -1240,6 +1251,15 @@ class Manager(manager.Manager):
|
||||
role_id, group_id, target_id, inherited
|
||||
)
|
||||
|
||||
def list_all_system_grants(self):
|
||||
"""Return a list of all system grants."""
|
||||
actor_id = None
|
||||
target_id = self._SYSTEM_SCOPE_TOKEN
|
||||
assignment_type = None
|
||||
return self.driver.list_system_grants(
|
||||
actor_id, target_id, assignment_type
|
||||
)
|
||||
|
||||
|
||||
class RoleManager(manager.Manager):
|
||||
"""Default pivot point for the Role backend."""
|
||||
|
@ -28,8 +28,40 @@ from keystone.tests.unit import test_v3
|
||||
CONF = keystone.conf.CONF
|
||||
|
||||
|
||||
class SystemRoleAssignmentMixin(object):
|
||||
|
||||
def _create_new_role(self):
|
||||
"""Create a role available for use anywhere and return the ID."""
|
||||
ref = unit.new_role_ref()
|
||||
response = self.post('/roles', body={'role': ref})
|
||||
# We only really need the role ID, so omit the rest of the response and
|
||||
# return the ID of the role we just created.
|
||||
return response.json_body['role']['id']
|
||||
|
||||
def _create_group(self):
|
||||
body = {
|
||||
'group': {
|
||||
'domain_id': self.domain_id,
|
||||
'name': uuid.uuid4().hex
|
||||
}
|
||||
}
|
||||
response = self.post('/groups/', body=body)
|
||||
return response.json_body['group']
|
||||
|
||||
def _create_user(self):
|
||||
body = {
|
||||
'user': {
|
||||
'domain_id': self.domain_id,
|
||||
'name': uuid.uuid4().hex
|
||||
}
|
||||
}
|
||||
response = self.post('/users/', body=body)
|
||||
return response.json_body['user']
|
||||
|
||||
|
||||
class AssignmentTestCase(test_v3.RestfulTestCase,
|
||||
test_v3.AssignmentTestMixin):
|
||||
test_v3.AssignmentTestMixin,
|
||||
SystemRoleAssignmentMixin):
|
||||
"""Test roles and role assignments."""
|
||||
|
||||
def setUp(self):
|
||||
@ -953,6 +985,116 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
|
||||
self.assertRoleAssignmentInListResponse(r, up_entity)
|
||||
self.assertRoleAssignmentInListResponse(r, up1_entity)
|
||||
|
||||
def test_list_system_role_assignments(self):
|
||||
# create a bunch of roles
|
||||
user_system_role_id = self._create_new_role()
|
||||
user_domain_role_id = self._create_new_role()
|
||||
user_project_role_id = self._create_new_role()
|
||||
group_system_role_id = self._create_new_role()
|
||||
group_domain_role_id = self._create_new_role()
|
||||
group_project_role_id = self._create_new_role()
|
||||
|
||||
# create a user and grant the user a role on the system, domain, and
|
||||
# project
|
||||
user = self._create_user()
|
||||
url = '/system/users/%s/roles/%s' % (user['id'], user_system_role_id)
|
||||
self.put(url)
|
||||
url = '/domains/%s/users/%s/roles/%s' % (
|
||||
self.domain_id, user['id'], user_domain_role_id
|
||||
)
|
||||
self.put(url)
|
||||
url = '/projects/%s/users/%s/roles/%s' % (
|
||||
self.project_id, user['id'], user_project_role_id
|
||||
)
|
||||
self.put(url)
|
||||
|
||||
# create a group and grant the group a role on the system, domain, and
|
||||
# project
|
||||
group = self._create_group()
|
||||
url = '/system/groups/%s/roles/%s' % (
|
||||
group['id'], group_system_role_id
|
||||
)
|
||||
self.put(url)
|
||||
url = '/domains/%s/groups/%s/roles/%s' % (
|
||||
self.domain_id, group['id'], group_domain_role_id
|
||||
)
|
||||
self.put(url)
|
||||
url = '/projects/%s/groups/%s/roles/%s' % (
|
||||
self.project_id, group['id'], group_project_role_id
|
||||
)
|
||||
self.put(url)
|
||||
|
||||
# /v3/role_assignments?scope.system=all should return two assignments
|
||||
response = self.get('/role_assignments?scope.system=all')
|
||||
self.assertValidRoleAssignmentListResponse(response, expected_length=2)
|
||||
for assignment in response.json_body['role_assignments']:
|
||||
self.assertTrue(assignment['scope']['system']['all'])
|
||||
if assignment.get('user'):
|
||||
self.assertEqual(user_system_role_id, assignment['role']['id'])
|
||||
if assignment.get('group'):
|
||||
self.assertEqual(
|
||||
group_system_role_id,
|
||||
assignment['role']['id']
|
||||
)
|
||||
|
||||
# /v3/role_assignments?scope_system=all&user.id=$USER_ID should return
|
||||
# one role assignment
|
||||
url = '/role_assignments?scope.system=all&user.id=%s' % user['id']
|
||||
response = self.get(url)
|
||||
self.assertValidRoleAssignmentListResponse(response, expected_length=1)
|
||||
self.assertEqual(
|
||||
user_system_role_id,
|
||||
response.json_body['role_assignments'][0]['role']['id']
|
||||
)
|
||||
|
||||
# /v3/role_assignments?scope_system=all&group.id=$GROUP_ID should
|
||||
# return one role assignment
|
||||
url = '/role_assignments?scope.system=all&group.id=%s' % group['id']
|
||||
response = self.get(url)
|
||||
self.assertValidRoleAssignmentListResponse(response, expected_length=1)
|
||||
self.assertEqual(
|
||||
group_system_role_id,
|
||||
response.json_body['role_assignments'][0]['role']['id']
|
||||
)
|
||||
|
||||
# /v3/role_assignments?user.id=$USER_ID should return 3 assignments
|
||||
# and system should be in that list of assignments
|
||||
url = '/role_assignments?user.id=%s' % user['id']
|
||||
response = self.get(url)
|
||||
self.assertValidRoleAssignmentListResponse(response, expected_length=3)
|
||||
for assignment in response.json_body['role_assignments']:
|
||||
if 'system' in assignment['scope']:
|
||||
self.assertEqual(
|
||||
user_system_role_id, assignment['role']['id']
|
||||
)
|
||||
if 'domain' in assignment['scope']:
|
||||
self.assertEqual(
|
||||
user_domain_role_id, assignment['role']['id']
|
||||
)
|
||||
if 'project' in assignment['scope']:
|
||||
self.assertEqual(
|
||||
user_project_role_id, assignment['role']['id']
|
||||
)
|
||||
|
||||
# /v3/role_assignments?group.id=$GROUP_ID should return 3 assignments
|
||||
# and system should be in that list of assignments
|
||||
url = '/role_assignments?group.id=%s' % group['id']
|
||||
response = self.get(url)
|
||||
self.assertValidRoleAssignmentListResponse(response, expected_length=3)
|
||||
for assignment in response.json_body['role_assignments']:
|
||||
if 'system' in assignment['scope']:
|
||||
self.assertEqual(
|
||||
group_system_role_id, assignment['role']['id']
|
||||
)
|
||||
if 'domain' in assignment['scope']:
|
||||
self.assertEqual(
|
||||
group_domain_role_id, assignment['role']['id']
|
||||
)
|
||||
if 'project' in assignment['scope']:
|
||||
self.assertEqual(
|
||||
group_project_role_id, assignment['role']['id']
|
||||
)
|
||||
|
||||
|
||||
class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase,
|
||||
test_v3.AssignmentTestMixin):
|
||||
@ -3100,17 +3242,6 @@ class ListUserProjectsTestCase(test_v3.RestfulTestCase):
|
||||
self.assertEqual(self.projects[i]['id'], projects_result[0]['id'])
|
||||
|
||||
|
||||
class SystemRoleAssignmentMixin(object):
|
||||
|
||||
def _create_new_role(self):
|
||||
"""Create a role available for use anywhere and return the ID."""
|
||||
ref = unit.new_role_ref()
|
||||
response = self.post('/roles', body={'role': ref})
|
||||
# We only really need the role ID, so omit the rest of the response and
|
||||
# return the ID of the role we just created.
|
||||
return response.json_body['role']['id']
|
||||
|
||||
|
||||
# FIXME(lbragstad): These tests contain system-level API calls, which means
|
||||
# they will log a warning message if they are called with a project-scoped
|
||||
# token, regardless of the role assignment on the project. We need to fix
|
||||
@ -3406,16 +3537,6 @@ class UserSystemRoleAssignmentTestCase(test_v3.RestfulTestCase,
|
||||
class GroupSystemRoleAssignmentTestCase(test_v3.RestfulTestCase,
|
||||
SystemRoleAssignmentMixin):
|
||||
|
||||
def _create_group(self):
|
||||
body = {
|
||||
'group': {
|
||||
'domain_id': self.domain_id,
|
||||
'name': uuid.uuid4().hex
|
||||
}
|
||||
}
|
||||
response = self.post('/groups/', body=body)
|
||||
return response.json_body['group']
|
||||
|
||||
def test_assign_system_role_to_group(self):
|
||||
system_role_id = self._create_new_role()
|
||||
group = self._create_group()
|
||||
|
Loading…
x
Reference in New Issue
Block a user