Add domain scope support for group policies

This commit adds support for the domain scope type for the group API
policies. It defines appropriate policies for the reader, member, and
admin role and adds tests for each case.

Change-Id: Iaff3c0e45423ef427ef1458250c402c44be4b1d6
Closes-bug: #1808859
Partial-Bug: #968696
This commit is contained in:
Colleen Murphy 2019-03-18 14:20:15 +01:00
parent 947e0a2e39
commit be452fee80
5 changed files with 757 additions and 51 deletions

View File

@ -21,12 +21,14 @@ from keystone.common import json_home
from keystone.common import provider_api
from keystone.common import rbac_enforcer
from keystone.common import validation
import keystone.conf
from keystone import exception
from keystone.identity import schema
from keystone import notifications
from keystone.server import flask as ks_flask
CONF = keystone.conf.CONF
ENFORCER = rbac_enforcer.RBACEnforcer
PROVIDERS = provider_api.ProviderAPIs
@ -73,11 +75,21 @@ class GroupsResource(ks_flask.ResourceBase):
GET/HEAD /groups
"""
filters = ['domain_id', 'name']
ENFORCER.enforce_call(action='identity:list_groups', filters=filters)
target = None
if self.oslo_context.domain_id:
target = {'group': {'domain_id': self.oslo_context.domain_id}}
ENFORCER.enforce_call(action='identity:list_groups', filters=filters,
target_attr=target)
hints = self.build_driver_hints(filters)
domain = self._get_domain_id_for_list_request()
refs = PROVIDERS.identity_api.list_groups(domain_scope=domain,
hints=hints)
if self.oslo_context.domain_id:
filtered_refs = []
for ref in refs:
if ref['domain_id'] == target['group']['domain_id']:
filtered_refs.append(ref)
refs = filtered_refs
return self.wrap_collection(refs, hints=hints)
def post(self):
@ -85,8 +97,11 @@ class GroupsResource(ks_flask.ResourceBase):
POST /groups
"""
ENFORCER.enforce_call(action='identity:create_group')
group = self.request_body_json.get('group', {})
target = {'group': group}
ENFORCER.enforce_call(
action='identity:create_group', target_attr=target
)
validation.lazy_validate(schema.group_create, group)
group = self._normalize_dict(group)
group = self._normalize_domain_id(group)
@ -99,7 +114,10 @@ class GroupsResource(ks_flask.ResourceBase):
PATCH /groups/{group_id}
"""
ENFORCER.enforce_call(action='identity:update_group')
ENFORCER.enforce_call(
action='identity:update_group',
build_target=_build_group_target_enforcement
)
group = self.request_body_json.get('group', {})
validation.lazy_validate(schema.group_update, group)
self._require_matching_id(group)
@ -118,16 +136,16 @@ class GroupsResource(ks_flask.ResourceBase):
return None, http_client.NO_CONTENT
class GroupUsersResource(flask_restful.Resource):
class GroupUsersResource(ks_flask.ResourceBase):
def get(self, group_id):
"""Get list of users in group.
GET/HEAD /groups/{group_id}/users
"""
filters = ['domain_id', 'enabled', 'name', 'password_expires_at']
target = {}
target = None
try:
target['group'] = PROVIDERS.identity_api.get_group(group_id)
target = {'group': PROVIDERS.identity_api.get_group(group_id)}
except exception.GroupNotFound:
# NOTE(morgan): If we have an issue populating the group
# data, leage target empty. This is the safest route and does not
@ -138,6 +156,12 @@ class GroupUsersResource(flask_restful.Resource):
hints = ks_flask.ResourceBase.build_driver_hints(filters)
refs = PROVIDERS.identity_api.list_users_in_group(
group_id, hints=hints)
if (self.oslo_context.domain_id):
filtered_refs = []
for ref in refs:
if ref['domain_id'] == self.oslo_context.domain_id:
filtered_refs.append(ref)
refs = filtered_refs
return ks_flask.ResourceBase.wrap_collection(
refs, hints=hints, collection_name='users')

View File

@ -98,6 +98,10 @@ def _build_user_target_enforcement():
target['user'] = PROVIDERS.identity_api.get_user(
flask.request.view_args.get('user_id')
)
if flask.request.view_args.get('group_id'):
target['group'] = PROVIDERS.identity_api.get_group(
flask.request.view_args.get('group_id')
)
except ks_exception.NotFound: # nosec
# Defer existence in the event the user doesn't exist, we'll
# check this later anyway.
@ -285,10 +289,15 @@ class UserGroupsResource(ks_flask.ResourceBase):
@staticmethod
def _built_target_attr_enforcement():
ref = {}
ref = None
if flask.request.view_args:
ref['user'] = PROVIDERS.identity_api.get_user(
flask.request.view_args.get('user_id'))
try:
ref = {'user': PROVIDERS.identity_api.get_user(
flask.request.view_args.get('user_id'))}
except ks_exception.NotFound: # nosec
# Defer existence in the event the user doesn't exist, we'll
# check this later anyway.
pass
return ref
def get(self, user_id):
@ -303,6 +312,12 @@ class UserGroupsResource(ks_flask.ResourceBase):
filters=filters)
refs = PROVIDERS.identity_api.list_groups_for_user(user_id=user_id,
hints=hints)
if (self.oslo_context.domain_id):
filtered_refs = []
for ref in refs:
if ref['domain_id'] == self.oslo_context.domain_id:
filtered_refs.append(ref)
refs = filtered_refs
return self.wrap_collection(refs, hints=hints)

View File

@ -15,8 +15,34 @@ from oslo_policy import policy
from keystone.common.policies import base
SYSTEM_READER_OR_OWNER = (
'(role:reader and system_scope:all) or user_id:%(user_id)s'
SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_USER_OR_OWNER = (
'(role:reader and system_scope:all) or '
'(role:reader and domain_id:%(target.user.domain_id)s) or '
'user_id:%(user_id)s'
)
SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_GROUP_USER = (
'(role:reader and system_scope:all) or '
'(role:reader and '
'domain_id:%(target.group.domain_id)s and '
'domain_id:%(target.user.domain_id)s)'
)
SYSTEM_ADMIN_OR_DOMAIN_ADMIN_FOR_TARGET_GROUP_USER = (
'(role:admin and system_scope:all) or '
'(role:admin and '
'domain_id:%(target.group.domain_id)s and '
'domain_id:%(target.user.domain_id)s)'
)
SYSTEM_READER_OR_DOMAIN_READER = (
'(role:reader and system_scope:all) or '
'(role:reader and domain_id:%(target.group.domain_id)s)'
)
SYSTEM_ADMIN_OR_DOMAIN_ADMIN = (
'(role:admin and system_scope:all) or '
'(role:admin and domain_id:%(target.group.domain_id)s)'
)
DEPRECATED_REASON = """
@ -70,14 +96,8 @@ deprecated_add_user_to_group = policy.DeprecatedRule(
group_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'get_group',
check_str=base.SYSTEM_READER,
# FIXME(lbragstad): Groups have traditionally been a resource managed
# by system or cloud administrators. If, or when, keystone supports the
# ability for groups to be created or managed by project
# administrators, scope_types should also include 'project'. Until
# then, let's make sure these APIs are only accessible to system
# administrators.
scope_types=['system'],
check_str=SYSTEM_READER_OR_DOMAIN_READER,
scope_types=['system', 'domain'],
description='Show group details.',
operations=[{'path': '/v3/groups/{group_id}',
'method': 'GET'},
@ -88,8 +108,8 @@ group_policies = [
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_groups',
check_str=base.SYSTEM_READER,
scope_types=['system'],
check_str=SYSTEM_READER_OR_DOMAIN_READER,
scope_types=['system', 'domain'],
description='List groups.',
operations=[{'path': '/v3/groups',
'method': 'GET'},
@ -100,8 +120,8 @@ group_policies = [
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_groups_for_user',
check_str=SYSTEM_READER_OR_OWNER,
scope_types=['system', 'project'],
check_str=SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_USER_OR_OWNER,
scope_types=['system', 'domain', 'project'],
description='List groups to which a user belongs.',
operations=[{'path': '/v3/users/{user_id}/groups',
'method': 'GET'},
@ -112,8 +132,8 @@ group_policies = [
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'create_group',
check_str=base.SYSTEM_ADMIN,
scope_types=['system'],
check_str=SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
scope_types=['system', 'domain'],
description='Create group.',
operations=[{'path': '/v3/groups',
'method': 'POST'}],
@ -122,8 +142,8 @@ group_policies = [
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'update_group',
check_str=base.SYSTEM_ADMIN,
scope_types=['system'],
check_str=SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
scope_types=['system', 'domain'],
description='Update group.',
operations=[{'path': '/v3/groups/{group_id}',
'method': 'PATCH'}],
@ -132,8 +152,8 @@ group_policies = [
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'delete_group',
check_str=base.SYSTEM_ADMIN,
scope_types=['system'],
check_str=SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
scope_types=['system', 'domain'],
description='Delete group.',
operations=[{'path': '/v3/groups/{group_id}',
'method': 'DELETE'}],
@ -142,8 +162,8 @@ group_policies = [
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_users_in_group',
check_str=base.SYSTEM_READER,
scope_types=['system'],
check_str=SYSTEM_READER_OR_DOMAIN_READER,
scope_types=['system', 'domain'],
description='List members of a specific group.',
operations=[{'path': '/v3/groups/{group_id}/users',
'method': 'GET'},
@ -154,8 +174,8 @@ group_policies = [
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'remove_user_from_group',
check_str=base.SYSTEM_ADMIN,
scope_types=['system'],
check_str=SYSTEM_ADMIN_OR_DOMAIN_ADMIN_FOR_TARGET_GROUP_USER,
scope_types=['system', 'domain'],
description='Remove user from group.',
operations=[{'path': '/v3/groups/{group_id}/users/{user_id}',
'method': 'DELETE'}],
@ -164,8 +184,8 @@ group_policies = [
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'check_user_in_group',
check_str=base.SYSTEM_READER,
scope_types=['system'],
check_str=SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_GROUP_USER,
scope_types=['system', 'domain'],
description='Check whether a user is a member of a group.',
operations=[{'path': '/v3/groups/{group_id}/users/{user_id}',
'method': 'HEAD'},
@ -176,8 +196,8 @@ group_policies = [
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'add_user_to_group',
check_str=base.SYSTEM_ADMIN,
scope_types=['system'],
check_str=SYSTEM_ADMIN_OR_DOMAIN_ADMIN_FOR_TARGET_GROUP_USER,
scope_types=['system', 'domain'],
description='Add user to group.',
operations=[{'path': '/v3/groups/{group_id}/users/{user_id}',
'method': 'PUT'}],

View File

@ -12,14 +12,17 @@
import uuid
from oslo_serialization import jsonutils
from six.moves import http_client
from keystone.common.policies import group as gp
from keystone.common import provider_api
import keystone.conf
from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import base_classes
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import temporaryfile
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
@ -121,7 +124,7 @@ class _SystemUserGroupTests(object):
)
class _SystemMemberAndReaderGroupTests(object):
class _SystemAndDomainMemberAndReaderGroupTests(object):
"""Common default functionality for system readers and system members."""
def test_user_cannot_create_group(self):
@ -215,7 +218,7 @@ class _SystemMemberAndReaderGroupTests(object):
class SystemReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_SystemUserGroupTests,
_SystemMemberAndReaderGroupTests):
_SystemAndDomainMemberAndReaderGroupTests):
def setUp(self):
super(SystemReaderTests, self).setUp()
@ -249,7 +252,7 @@ class SystemReaderTests(base_classes.TestCaseWithBootstrap,
class SystemMemberTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_SystemUserGroupTests,
_SystemMemberAndReaderGroupTests):
_SystemAndDomainMemberAndReaderGroupTests):
def setUp(self):
super(SystemMemberTests, self).setUp()
@ -385,6 +388,639 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap,
)
class _DomainUserGroupTests(object):
def test_user_can_list_groups_in_domain(self):
# second domain
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
# one group in new domain
group1 = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
# one group in user's domain
group2 = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
# user should only see one group
with self.test_client() as c:
r = c.get('/v3/groups', headers=self.headers)
self.assertEqual(1, len(r.json['groups']))
self.assertNotIn(group1['id'], [g['id'] for g in r.json['groups']])
self.assertEqual(group2['id'], r.json['groups'][0]['id'])
def test_user_cannot_list_groups_in_other_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
with self.test_client() as c:
r = c.get('/v3/groups?domain_id=%s' % domain['id'],
headers=self.headers)
self.assertEqual(0, len(r.json['groups']))
def test_user_can_get_group_in_domain(self):
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
with self.test_client() as c:
r = c.get('/v3/groups/%s' % group['id'],
headers=self.headers)
self.assertEqual(group['id'], r.json['group']['id'])
def test_user_cannot_get_group_in_other_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
with self.test_client() as c:
c.get('/v3/groups/%s' % group['id'],
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_get_non_existent_group_forbidden(self):
with self.test_client() as c:
c.get(
'/v3/groups/%s' % uuid.uuid4().hex, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_can_list_groups_in_domain_for_user_in_domain(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=self.domain_id)
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
r = c.get('/v3/users/%s/groups' % user['id'],
headers=self.headers)
self.assertEqual(1, len(r.json['groups']))
self.assertEqual(group['id'], r.json['groups'][0]['id'])
def test_user_cannot_list_groups_in_own_domain_user_in_other_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
c.get('/v3/users/%s/groups' % user['id'],
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_list_groups_for_non_existent_user_forbidden(self):
with self.test_client() as c:
c.get('/v3/users/%s/groups' % uuid.uuid4().hex,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_list_groups_in_other_domain_user_in_own_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=self.domain_id)
)
# one group in other domain
group1 = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
# one group in own domain
group2 = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group1['id'])
PROVIDERS.identity_api.add_user_to_group(user['id'], group2['id'])
with self.test_client() as c:
r = c.get('/v3/users/%s/groups' % user['id'],
headers=self.headers)
# only one group should be visible
self.assertEqual(1, len(r.json['groups']))
self.assertEqual(group2['id'], r.json['groups'][0]['id'])
def test_user_can_list_users_in_own_domain_for_group_in_own_domain(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=self.domain_id)
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
r = c.get('/v3/groups/%s/users' % group['id'],
headers=self.headers)
self.assertEqual(1, len(r.json['users']))
self.assertEqual(user['id'], r.json['users'][0]['id'])
def test_user_cannot_list_users_in_other_domain_group_in_own_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
# one user in other domain
user1 = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
# one user in own domain
user2 = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=self.domain_id)
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
PROVIDERS.identity_api.add_user_to_group(user1['id'], group['id'])
PROVIDERS.identity_api.add_user_to_group(user2['id'], group['id'])
with self.test_client() as c:
r = c.get('/v3/groups/%s/users' % group['id'],
headers=self.headers)
# only one user should be visible
self.assertEqual(1, len(r.json['users']))
self.assertEqual(user2['id'], r.json['users'][0]['id'])
def test_user_cannot_list_users_in_own_domain_group_in_other_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=self.domain_id)
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
c.get('/v3/groups/%s/users' % group['id'],
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_list_users_in_non_existent_group_forbidden(self):
with self.test_client() as c:
c.get('/v3/groups/%s/users' % uuid.uuid4().hex,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_can_check_user_in_own_domain_group_in_own_domain(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=self.domain_id)
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
c.head('/v3/groups/%(group)s/users/%(user)s' % {
'group': group['id'], 'user': user['id']},
headers=self.headers,
expected_status_code=http_client.NO_CONTENT)
c.get('/v3/groups/%(group)s/users/%(user)s' % {
'group': group['id'], 'user': user['id']},
headers=self.headers,
expected_status_code=http_client.NO_CONTENT)
def test_user_cannot_check_user_in_other_domain_group_in_own_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
c.head('/v3/groups/%(group)s/users/%(user)s' % {
'group': group['id'], 'user': user['id']},
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
c.get('/v3/groups/%(group)s/users/%(user)s' % {
'group': group['id'], 'user': user['id']},
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
class DomainReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_DomainUserGroupTests,
_SystemAndDomainMemberAndReaderGroupTests):
def setUp(self):
super(DomainReaderTests, self).setUp()
self.loadapp()
self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
self.policy_file_name = self.policy_file.file_name
self.useFixture(
ksfixtures.Policy(
self.config_fixture, policy_file=self.policy_file_name
)
)
self._override_policy()
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
self.domain_id = domain['id']
domain_admin = unit.new_user_ref(domain_id=self.domain_id)
self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=self.user_id,
domain_id=self.domain_id
)
auth = self.build_authentication_request(
user_id=self.user_id,
password=domain_admin['password'],
domain_id=self.domain_id
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
def _override_policy(self):
# TODO(cmurphy): Remove this once the deprecated policies in
# keystone.common.policies.group have been removed. This is only
# here to make sure we test the new policies instead of the deprecated
# ones. Oslo.policy will OR deprecated policies with new policies to
# maintain compatibility and give operators a chance to update
# permissions or update policies without breaking users. This will
# cause these specific tests to fail since we're trying to correct this
# broken behavior with better scope checking.
with open(self.policy_file_name, 'w') as f:
overridden_policies = {
'identity:get_group': gp.SYSTEM_READER_OR_DOMAIN_READER,
'identity:list_groups': gp.SYSTEM_READER_OR_DOMAIN_READER,
'identity:list_groups_for_user':
gp.SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_USER_OR_OWNER,
'identity:list_users_in_group':
gp.SYSTEM_READER_OR_DOMAIN_READER,
'identity:check_user_in_group':
gp.SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_GROUP_USER
}
f.write(jsonutils.dumps(overridden_policies))
class DomainMemberTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_DomainUserGroupTests,
_SystemAndDomainMemberAndReaderGroupTests):
def setUp(self):
super(DomainMemberTests, self).setUp()
self.loadapp()
self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
self.policy_file_name = self.policy_file.file_name
self.useFixture(
ksfixtures.Policy(
self.config_fixture, policy_file=self.policy_file_name
)
)
self._override_policy()
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
self.domain_id = domain['id']
domain_admin = unit.new_user_ref(domain_id=self.domain_id)
self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.member_role_id, user_id=self.user_id,
domain_id=self.domain_id
)
auth = self.build_authentication_request(
user_id=self.user_id,
password=domain_admin['password'],
domain_id=self.domain_id
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
def _override_policy(self):
# TODO(cmurphy): Remove this once the deprecated policies in
# keystone.common.policies.group have been removed. This is only
# here to make sure we test the new policies instead of the deprecated
# ones. Oslo.policy will OR deprecated policies with new policies to
# maintain compatibility and give operators a chance to update
# permissions or update policies without breaking users. This will
# cause these specific tests to fail since we're trying to correct this
# broken behavior with better scope checking.
with open(self.policy_file_name, 'w') as f:
overridden_policies = {
'identity:get_group': gp.SYSTEM_READER_OR_DOMAIN_READER,
'identity:list_groups': gp.SYSTEM_READER_OR_DOMAIN_READER,
'identity:list_groups_for_user':
gp.SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_USER_OR_OWNER,
'identity:list_users_in_group':
gp.SYSTEM_READER_OR_DOMAIN_READER,
'identity:check_user_in_group':
gp.SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_GROUP_USER
}
f.write(jsonutils.dumps(overridden_policies))
class DomainAdminTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_DomainUserGroupTests):
def setUp(self):
super(DomainAdminTests, self).setUp()
self.loadapp()
self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
self.policy_file_name = self.policy_file.file_name
self.useFixture(
ksfixtures.Policy(
self.config_fixture, policy_file=self.policy_file_name
)
)
self._override_policy()
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
self.domain_id = domain['id']
domain_admin = unit.new_user_ref(domain_id=self.domain_id)
self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.admin_role_id, user_id=self.user_id,
domain_id=self.domain_id
)
auth = self.build_authentication_request(
user_id=self.user_id,
password=domain_admin['password'],
domain_id=self.domain_id
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
def _override_policy(self):
# TODO(cmurphy): Remove this once the deprecated policies in
# keystone.common.policies.group have been removed. This is only
# here to make sure we test the new policies instead of the deprecated
# ones. Oslo.policy will OR deprecated policies with new policies to
# maintain compatibility and give operators a chance to update
# permissions or update policies without breaking users. This will
# cause these specific tests to fail since we're trying to correct this
# broken behavior with better scope checking.
with open(self.policy_file_name, 'w') as f:
overridden_policies = {
'identity:get_group': gp.SYSTEM_READER_OR_DOMAIN_READER,
'identity:list_groups': gp.SYSTEM_READER_OR_DOMAIN_READER,
'identity:list_groups_for_user':
gp.SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_USER_OR_OWNER,
'identity:create_group': gp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
'identity:update_group': gp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
'identity:delete_group': gp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
'identity:list_users_in_group':
gp.SYSTEM_READER_OR_DOMAIN_READER,
'identity:remove_user_from_group':
gp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN_FOR_TARGET_GROUP_USER,
'identity:check_user_in_group':
gp.SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_GROUP_USER,
'identity:add_user_to_group':
gp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN_FOR_TARGET_GROUP_USER
}
f.write(jsonutils.dumps(overridden_policies))
def test_user_can_create_group_for_own_domain(self):
create = {
'group': {
'name': uuid.uuid4().hex,
'domain_id': self.domain_id
}
}
with self.test_client() as c:
c.post('/v3/groups', json=create, headers=self.headers)
def test_user_cannot_create_group_for_other_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
create = {
'group': {
'name': uuid.uuid4().hex,
'domain_id': domain['id']
}
}
with self.test_client() as c:
c.post('/v3/groups', json=create, headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_can_update_group_in_own_domain(self):
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
update = {'group': {'description': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/groups/%s' % group['id'], json=update,
headers=self.headers)
def test_user_cannot_update_group_in_other_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
update = {'group': {'description': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/groups/%s' % group['id'], json=update,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_can_delete_group_in_own_domain(self):
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
with self.test_client() as c:
c.delete(
'/v3/groups/%s' % group['id'],
headers=self.headers
)
def test_user_cannot_delete_group_in_other_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
with self.test_client() as c:
c.delete(
'/v3/groups/%s' % group['id'],
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_can_remove_user_in_own_domain_from_group_in_own_domain(self):
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=self.domain_id)
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
c.delete('/v3/groups/%(group)s/users/%(user)s' % {
'group': group['id'], 'user': user['id']},
headers=self.headers)
def test_user_cannot_remove_user_other_domain_from_group_own_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
c.delete('/v3/groups/%(group)s/users/%(user)s' % {
'group': group['id'], 'user': user['id']},
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_remove_user_own_domain_from_group_other_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=self.domain_id)
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
c.delete('/v3/groups/%(group)s/users/%(user)s' % {
'group': group['id'], 'user': user['id']},
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_remove_non_existent_user_from_group_forbidden(self):
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
with self.test_client() as c:
c.delete('/v3/groups/%(group)s/users/%(user)s' % {
'group': group['id'], 'user': uuid.uuid4().hex},
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_remove_user_from_non_existent_group_forbidden(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=self.domain_id)
)
with self.test_client() as c:
c.delete('/v3/groups/%(group)s/users/%(user)s' % {
'group': uuid.uuid4().hex, 'user': user['id']},
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_can_add_user_in_own_domain_to_group_in_own_domain(self):
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=self.domain_id)
)
with self.test_client() as c:
c.put('/v3/groups/%(group)s/users/%(user)s' % {
'group': group['id'], 'user': user['id']},
headers=self.headers)
def test_user_cannot_add_user_other_domain_to_group_own_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
with self.test_client() as c:
c.put('/v3/groups/%(group)s/users/%(user)s' % {
'group': group['id'], 'user': user['id']},
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_add_user_own_domain_to_group_other_domain(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=self.domain_id)
)
with self.test_client() as c:
c.put('/v3/groups/%(group)s/users/%(user)s' % {
'group': group['id'], 'user': user['id']},
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_add_non_existent_user_to_group_forbidden(self):
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
with self.test_client() as c:
c.put('/v3/groups/%(group)s/users/%(user)s' % {
'group': group['id'], 'user': uuid.uuid4().hex},
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_add_user_from_non_existent_group_forbidden(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=self.domain_id)
)
with self.test_client() as c:
c.put('/v3/groups/%(group)s/users/%(user)s' % {
'group': uuid.uuid4().hex, 'user': user['id']},
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
class ProjectUserTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin):

View File

@ -4,6 +4,10 @@ features:
[`bug 1805369 <https://bugs.launchpad.net/keystone/+bug/1805369>`_]
The group API now supports the ``admin``, ``member``, and
``reader`` default roles.
- |
[`bug 1808859 <https://bugs.launchpad.net/keystone/+bug/1808859>`_]
The group API now supports using the ``domain`` scope type for performing
domain-specific actions on groups and group membership.
upgrade:
- |
[`bug 1805369 <https://bugs.launchpad.net/keystone/+bug/1805369>`_]
@ -17,20 +21,27 @@ deprecations:
The group policies have been deprecated. The ``identity:get_group``,
``identity:list_groups``, ``identity:list_users_in_group``, and
``identity:check_user_in_group`` policies now use ``role:reader and
system_scope:all`` instead of ``rule:admin_required``. The
``identity:list_groups_for_user`` policy now uses ``(role:reader and
system_scope:all) or user_id:%(user_id)s`` instead of
``rule:admin_or_owner``. The ``identity:create_group``,
``identity:update_group``, ``identity:delete_group``,
``identity:remove_user_from_group``, and
system_scope:all or (role:reader and
domain_id:%(target.group.domain_id)s)`` instead of ``rule:admin_required``.
The ``identity:list_groups_for_user`` policy now uses ``(role:reader and
system_scope:all) or (role:reader and domain_id:%(target.user.domain_id)s)
or or user_id:%(user_id)s`` instead of ``rule:admin_or_owner``. The
``identity:create_group``, ``identity:update_group``,
``identity:delete_group``, ``identity:remove_user_from_group``, and
``identity:add_user_to_group`` policies now use ``role:admin and
system_scope:all`` instead of ``rule:admin_required``. These new defaults
automatically account for system-scope and support a read-only role, making
it easier for system administrators to delegate subsets of responsibility
without compromising security. Please consider these new defaults if your
deployment overrides group policies.
system_scope:all or (role:admin and domain_id:%(target.group.domain_id)s)``
instead of ``rule:admin_required``. These new defaults automatically
account for system-scope and domain-scope and support a read-only role,
making it easier for system administrators to delegate subsets of
responsibility without compromising security. Please consider these new
defaults if your deployment overrides group policies.
security:
- |
[`bug 1805369 <https://bugs.launchpad.net/keystone/+bug/1805369>`_]
The group API now uses system-scope and default roles to
provide better accessibility to users in a secure way.
- |
[`bug 1808859 <https://bugs.launchpad.net/keystone/+bug/1808859>`_]
The group API now supports using the ``domain`` scope for the reader,
member, and admin role to provide better accessibility to users in a
secure way.