Merge "Allow domain admins to list users in groups with v3 policy"

This commit is contained in:
Jenkins 2016-06-02 18:36:25 +00:00 committed by Gerrit Code Review
commit 048af33f36
3 changed files with 88 additions and 3 deletions

View File

@ -55,7 +55,7 @@
"admin_and_matching_group_domain_id": "rule:admin_required and domain_id:%(group.domain_id)s",
"identity:get_group": "rule:cloud_admin or rule:admin_and_matching_target_group_domain_id",
"identity:list_groups": "rule:cloud_admin or rule:admin_and_matching_domain_id",
"identity:list_groups_for_user": "rule:owner or rule:admin_and_matching_domain_id",
"identity:list_groups_for_user": "rule:owner or rule:admin_and_matching_target_user_domain_id",
"identity:create_group": "rule:cloud_admin or rule:admin_and_matching_group_domain_id",
"identity:update_group": "rule:cloud_admin or rule:admin_and_matching_target_group_domain_id",
"identity:delete_group": "rule:cloud_admin or rule:admin_and_matching_target_group_domain_id",

View File

@ -211,6 +211,11 @@ class UserV3(controller.V3Controller):
ref['group'] = self.identity_api.get_group(group_id)
self.check_protection(context, prep_info, ref)
def _check_group_protection(self, context, prep_info, group_id):
ref = {}
ref['group'] = self.identity_api.get_group(group_id)
self.check_protection(context, prep_info, ref)
@controller.protected()
@validation.validated(schema.user_create, 'user')
def create_user(self, context, user):
@ -229,7 +234,8 @@ class UserV3(controller.V3Controller):
hints=hints)
return UserV3.wrap_collection(context, refs, hints=hints)
@controller.filterprotected('domain_id', 'enabled', 'name')
@controller.filterprotected('domain_id', 'enabled', 'name',
callback=_check_group_protection)
def list_users_in_group(self, context, filters, group_id):
hints = UserV3.build_driver_hints(context, filters)
refs = self.identity_api.list_users_in_group(group_id, hints=hints)
@ -299,6 +305,11 @@ class GroupV3(controller.V3Controller):
super(GroupV3, self).__init__()
self.get_member_from_driver = self.identity_api.get_group
def _check_user_protection(self, context, prep_info, user_id):
ref = {}
ref['user'] = self.identity_api.get_user(user_id)
self.check_protection(context, prep_info, ref)
@controller.protected()
@validation.validated(schema.group_create, 'group')
def create_group(self, context, group):
@ -317,7 +328,7 @@ class GroupV3(controller.V3Controller):
hints=hints)
return GroupV3.wrap_collection(context, refs, hints=hints)
@controller.filterprotected('name')
@controller.filterprotected('name', callback=_check_user_protection)
def list_groups_for_user(self, context, filters, user_id):
hints = GroupV3.build_driver_hints(context, filters)
refs = self.identity_api.list_groups_for_user(user_id, hints=hints)

View File

@ -576,6 +576,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
- just_a_user has a non-admin role on both domainA and the project.
- admin_domain has admin_project, and user cloud_admin_user, with an
'admin' role on admin_project.
- domainA has two groups (group1, group2), domainB has one group
(group3)
We test various api protection rules from the cloud sample policy
file to make sure the sample is valid and that we correctly enforce it.
@ -668,6 +670,15 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
user_id=self.just_a_user['id'],
project_id=self.project['id'])
self.group1 = unit.new_group_ref(domain_id=self.domainA['id'])
self.group1 = self.identity_api.create_group(self.group1)
self.group2 = unit.new_group_ref(domain_id=self.domainA['id'])
self.group2 = self.identity_api.create_group(self.group2)
self.group3 = unit.new_group_ref(domain_id=self.domainB['id'])
self.group3 = self.identity_api.create_group(self.group3)
def _stati(self, expected_status):
# Return the expected return codes for APIs with and without data
# with any specified status overriding the normal values
@ -696,6 +707,33 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.post('/users', auth=self.auth, body={'user': user_ref},
expected_status=status_created)
def _test_group_management(self, group, expected=None):
status_OK, status_created, status_no_data = self._stati(expected)
entity_url = '/groups/%s' % group['id']
list_url = '/groups?domain_id=%s' % group['domain_id']
users_url = '/groups/%s/users' % group['id']
group_member_url = '/groups/%s/users/%s' % (group['id'],
self.just_a_user['id'])
self.get(entity_url, auth=self.auth,
expected_status=status_OK)
self.get(list_url, auth=self.auth,
expected_status=status_OK)
self.put(group_member_url, auth=self.auth,
expected_status=status_no_data)
self.get(users_url, auth=self.auth,
expected_status=status_OK)
group_ref = unit.new_group_ref(domain_id=group['domain_id'])
self.post('/groups', auth=self.auth, body={'group': group_ref},
expected_status=status_created)
self.delete(group_member_url, auth=self.auth,
expected_status=status_no_data)
group = {'description': 'Updated'}
self.patch(entity_url, auth=self.auth, body={'group': group},
expected_status=status_OK)
self.delete(entity_url, auth=self.auth,
expected_status=status_no_data)
def _test_project_management(self, domain_id, expected=None):
status_OK, status_created, status_no_data = self._stati(expected)
entity_url = '/projects/%s' % self.project['id']
@ -920,6 +958,42 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self._test_user_management(self.domainA['id'])
def test_group_management(self):
# First, authenticate with a user that does not have the domain
# admin role - shouldn't be able to do much.
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],
password=self.just_a_user['password'],
domain_id=self.domainA['id'])
self._test_group_management(
self.group1, expected=exception.ForbiddenAction.code)
# ...but should be able to list groups of which they are a member
url = '/users/%s/groups' % self.just_a_user['id']
self.get(url, auth=self.auth)
# Now, authenticate with a user that does have the domain admin role
self.auth = self.build_authentication_request(
user_id=self.domain_admin_user['id'],
password=self.domain_admin_user['password'],
domain_id=self.domainA['id'])
self._test_group_management(self.group1)
self._test_group_management(self.group3,
expected=exception.ForbiddenAction.code)
def test_group_management_by_cloud_admin(self):
# Test groups management with a cloud admin. This user should
# be able to manage groups in any domain.
self.auth = self.build_authentication_request(
user_id=self.cloud_admin_user['id'],
password=self.cloud_admin_user['password'],
project_id=self.admin_project['id'])
self._test_group_management(self.group1)
self._test_group_management(self.group3)
def test_project_management(self):
# First, authenticate with a user that does not have the project
# admin role - shouldn't be able to do much.