Create, update and delete hierarchical projects

Changes at basic operations related to projects
were modified to handle projects hierarchy.

This changes includes:
- Create and update projects with parent_project_id
  field
- Avoid deleting projects that are not leaf in the
  hierarchy
- Deny the update of a project's parent
- Limit the max depth of a tree branch (in the
  hierarchy)

Co-Authored-By: Henrique Truta <henrique@lsd.ufcg.edu.br>
Co-Authored-By: Rodrigo Duarte <rodrigods@lsd.ufcg.edu.br>
Co-Authored-By: Raildo Mascena <raildo@lsd.ufcg.edu.br>
Co-Authored-By: Samuel de Medeiros Queiroz <samuel@lsd.ufcg.edu.br>
Co-Authored-By: Thiago Paiva Brito <thiagop@lsd.ufcg.edu.br>

Change-Id: Ia8ae4c1d8d1e7e591a272928af0dfd17da4047c1
Implements: blueprint hierarchical-multitenancy
This commit is contained in:
Rodrigo Duarte Sousa 2014-08-29 17:18:59 -03:00
parent d8ed7c160f
commit 11cb0d3033
9 changed files with 620 additions and 105 deletions

View File

@ -59,6 +59,11 @@
# server. (string value)
#admin_endpoint=<None>
# Maximum depth of the project hierarchy. (integer value)
# WARNING: setting it to a large value may adversely impact
# performance.
#max_project_tree_depth=5
# The number of worker processes to serve the public WSGI
# application. Defaults to number of CPUs (minimum of 2).
# (integer value)

View File

@ -19,7 +19,6 @@ import copy
import functools
import uuid
import six
from six.moves import urllib
from keystone.assignment import schema
@ -416,9 +415,28 @@ class ProjectV3(controller.V3Controller):
hints=hints)
return ProjectV3.wrap_collection(context, refs, hints=hints)
def _expand_project_ref(self, context, ref):
user_id = self.get_auth_context(context).get('user_id')
if ('parents_as_list' in context['query_string'] and
self.query_filter_is_true(
context['query_string']['parents_as_list'])):
parents = self.assignment_api.list_project_parents(
ref['id'], user_id)
ref['parents'] = [ProjectV3.wrap_member(context, p)
for p in parents]
if ('subtree_as_list' in context['query_string'] and
self.query_filter_is_true(
context['query_string']['subtree_as_list'])):
subtree = self.assignment_api.list_projects_in_subtree(
ref['id'], user_id)
ref['subtree'] = [ProjectV3.wrap_member(context, p)
for p in subtree]
@controller.protected()
def get_project(self, context, project_id):
ref = self.assignment_api.get_project(project_id)
self._expand_project_ref(context, ref)
return ProjectV3.wrap_member(context, ref)
@controller.protected()
@ -843,26 +861,6 @@ class RoleAssignmentV3(controller.V3Controller):
return new_refs
def _query_filter_is_true(self, filter_value):
"""Determine if bool query param is 'True'.
We treat this the same way as we do for policy
enforcement:
{bool_param}=0 is treated as False
Any other value is considered to be equivalent to
True, including the absence of a value
"""
if (isinstance(filter_value, six.string_types) and
filter_value == '0'):
val = False
else:
val = True
return val
def _filter_inherited(self, entry):
if ('inherited_to_projects' in entry and
not CONF.os_inherit.enabled):
@ -888,7 +886,7 @@ class RoleAssignmentV3(controller.V3Controller):
if self._filter_inherited(x)])
if ('effective' in context['query_string'] and
self._query_filter_is_true(
self.query_filter_is_true(
context['query_string']['effective'])):
formatted_refs = self._expand_indirect_assignments(context,

View File

@ -26,7 +26,7 @@ from keystone.common import manager
from keystone import config
from keystone import exception
from keystone.i18n import _
from keystone.i18n import _LI
from keystone.i18n import _LE, _LI
from keystone import notifications
from keystone.openstack.common import log
@ -79,12 +79,43 @@ class Manager(manager.Manager):
return [x['id'] for
x in self.identity_api.list_groups_for_user(user_id)]
def _get_hierarchy_depth(self, parents_list):
return len(parents_list) + 1
def _assert_max_hierarchy_depth(self, project_id, parents_list=None):
if parents_list is None:
parents_list = self.list_project_parents(project_id)
max_depth = CONF.max_project_tree_depth
if self._get_hierarchy_depth(parents_list) > max_depth:
raise exception.ForbiddenAction(
action=_('max hierarchy depth reached for '
'%s branch.') % project_id)
@notifications.created(_PROJECT)
def create_project(self, tenant_id, tenant):
tenant = tenant.copy()
tenant.setdefault('enabled', True)
tenant['enabled'] = clean.project_enabled(tenant['enabled'])
tenant.setdefault('description', '')
tenant.setdefault('parent_id', None)
if tenant.get('parent_id') is not None:
parent_ref = self.get_project(tenant.get('parent_id'))
parents_list = self.list_project_parents(parent_ref['id'])
parents_list.append(parent_ref)
for ref in parents_list:
if ref.get('domain_id') != tenant.get('domain_id'):
raise exception.ForbiddenAction(
action=_('cannot create a project within a different '
'domain than its parents.'))
if not ref.get('enabled', True):
raise exception.ForbiddenAction(
action=_('cannot create a project in a '
'branch containing a disabled '
'project: %s') % ref['id'])
self._assert_max_hierarchy_depth(tenant.get('parent_id'),
parents_list)
ret = self.driver.create_project(tenant_id, tenant)
if SHOULD_CACHE(ret):
self.get_project.set(ret, self, tenant_id)
@ -126,15 +157,48 @@ class Manager(manager.Manager):
"""
pass
def _assert_all_parents_are_enabled(self, project_id):
parents_list = self.list_project_parents(project_id)
for project in parents_list:
if not project.get('enabled', True):
raise exception.ForbiddenAction(
action=_('cannot enable project %s since it has '
'disabled parents') % project_id)
def _assert_whole_subtree_is_disabled(self, project_id):
subtree_list = self.driver.list_projects_in_subtree(project_id)
for ref in subtree_list:
if ref.get('enabled', True):
raise exception.ForbiddenAction(
action=_('cannot disable project %s since '
'its subtree contains enabled '
'projects') % project_id)
@notifications.updated(_PROJECT)
def update_project(self, tenant_id, tenant):
original_tenant = self.driver.get_project(tenant_id)
tenant = tenant.copy()
parent_id = original_tenant.get('parent_id')
if 'parent_id' in tenant and tenant.get('parent_id') != parent_id:
raise exception.ForbiddenAction(
action=_('Update of `parent_id` is not allowed.'))
if 'enabled' in tenant:
tenant['enabled'] = clean.project_enabled(tenant['enabled'])
if (original_tenant.get('enabled', True) and
not tenant.get('enabled', True)):
# NOTE(rodrigods): for the current implementation we only allow to
# disable a project if all projects below it in the hierarchy are
# already disabled. This also means that we can not enable a
# project that has disabled parents.
original_tenant_enabled = original_tenant.get('enabled', True)
tenant_enabled = tenant.get('enabled', True)
if not original_tenant_enabled and tenant_enabled:
self._assert_all_parents_are_enabled(tenant_id)
if original_tenant_enabled and not tenant_enabled:
self._assert_whole_subtree_is_disabled(tenant_id)
self._disable_project(tenant_id)
ret = self.driver.update_project(tenant_id, tenant)
self.get_project.invalidate(self, tenant_id)
self.get_project_by_name.invalidate(self, original_tenant['name'],
@ -143,6 +207,11 @@ class Manager(manager.Manager):
@notifications.deleted(_PROJECT)
def delete_project(self, tenant_id):
if not self.driver.is_leaf_project(tenant_id):
raise exception.ForbiddenAction(
action=_('cannot delete the project %s since it is not '
'a leaf in the hierarchy.') % tenant_id)
project = self.driver.get_project(tenant_id)
project_user_ids = self.list_user_ids_for_project(tenant_id)
for user_id in project_user_ids:
@ -308,6 +377,29 @@ class Manager(manager.Manager):
return self.driver.list_projects_for_user(
user_id, group_ids, hints or driver_hints.Hints())
def _filter_projects_list(self, projects_list, user_id):
user_projects = self.list_projects_for_user(user_id)
user_projects_ids = set([proj['id'] for proj in user_projects])
# Keep only the projects present in user_projects
projects_list = [proj for proj in projects_list
if proj['id'] in user_projects_ids]
def list_project_parents(self, project_id, user_id=None):
parents = self.driver.list_project_parents(project_id)
# If a user_id was provided, the returned list should be filtered
# against the projects this user has access to.
if user_id:
self._filter_projects_list(parents, user_id)
return parents
def list_projects_in_subtree(self, project_id, user_id=None):
subtree = self.driver.list_projects_in_subtree(project_id)
# If a user_id was provided, the returned list should be filtered
# against the projects this user has access to.
if user_id:
self._filter_projects_list(subtree, user_id)
return subtree
@cache.on_arguments(should_cache_fn=SHOULD_CACHE,
expiration_time=EXPIRATION_TIME)
def get_domain(self, domain_id):
@ -420,21 +512,38 @@ class Manager(manager.Manager):
Users: Reference domains for grants
"""
def _delete_projects(project, projects, examined):
if project['id'] in examined:
msg = _LE('Circular reference or a repeated entry found '
'projects hierarchy - %(project_id)s.')
LOG.error(msg, {'project_id': project['id']})
return
examined.add(project['id'])
children = [proj for proj in projects
if proj.get('parent_id') == project['id']]
for proj in children:
_delete_projects(proj, projects, examined)
try:
self.delete_project(project['id'])
except exception.ProjectNotFound:
LOG.debug(('Project %(projectid)s not found when '
'deleting domain contents for %(domainid)s, '
'continuing with cleanup.'),
{'projectid': project['id'],
'domainid': domain_id})
user_refs = self.identity_api.list_users(domain_scope=domain_id)
proj_refs = self.list_projects()
proj_refs = self.list_projects_in_domain(domain_id)
group_refs = self.identity_api.list_groups(domain_scope=domain_id)
# First delete the projects themselves
for project in proj_refs:
if project['domain_id'] == domain_id:
try:
self.delete_project(project['id'])
except exception.ProjectNotFound:
LOG.debug(('Project %(projectid)s not found when '
'deleting domain contents for %(domainid)s, '
'continuing with cleanup.'),
{'projectid': project['id'],
'domainid': domain_id})
# Deleting projects recursively
roots = [x for x in proj_refs if x.get('parent_id') is None]
examined = set()
for project in roots:
_delete_projects(project, proj_refs, examined)
for group in group_refs:
# Cleanup any existing groups.

View File

@ -21,7 +21,6 @@ from oslo.utils import timeutils
import six
from keystone.assignment import controllers as assignment_controllers
from keystone.common import authorization
from keystone.common import controller
from keystone.common import dependency
from keystone.common import wsgi
@ -547,12 +546,6 @@ class Auth(controller.V3Controller):
return {'signed': signed_text}
def get_auth_context(self, context):
# TODO(dolphm): this method of accessing the auth context is terrible,
# but context needs to be refactored to always have reasonable values.
env_context = context.get('environment', {})
return env_context.get(authorization.AUTH_CONTEXT_ENV, {})
def _combine_lists_uniquely(self, a, b):
# it's most likely that only one of these will be filled so avoid
# the combination if possible.

View File

@ -77,6 +77,9 @@ FILE_OPTIONS = {
'to set this value if the base URL contains a path '
'(e.g. /prefix/v2.0) or the endpoint should be found '
'on a different server.'),
cfg.IntOpt('max_project_tree_depth', default=5,
help='The maximum depth of projects hierarchy is limited '
'for performance reasons'),
cfg.IntOpt('public_workers',
help='The number of worker processes to serve the public '
'WSGI application. Defaults to number of CPUs '

View File

@ -15,6 +15,8 @@
import functools
import uuid
import six
from keystone.common import authorization
from keystone.common import dependency
from keystone.common import driver_hints
@ -316,6 +318,12 @@ class V3Controller(wsgi.Application):
return '%s/%s/%s' % (endpoint, 'v3', path.lstrip('/'))
def get_auth_context(cls, context):
# TODO(dolphm): this method of accessing the auth context is terrible,
# but context needs to be refactored to always have reasonable values.
env_context = context.get('environment', {})
return env_context.get(authorization.AUTH_CONTEXT_ENV, {})
@classmethod
def full_url(cls, context, path=None):
url = cls.base_url(context, path)
@ -324,6 +332,27 @@ class V3Controller(wsgi.Application):
return url
@classmethod
def query_filter_is_true(cls, filter_value):
"""Determine if bool query param is 'True'.
We treat this the same way as we do for policy
enforcement:
{bool_param}=0 is treated as False
Any other value is considered to be equivalent to
True, including the absence of a value
"""
if (isinstance(filter_value, six.string_types) and
filter_value == '0'):
val = False
else:
val = True
return val
@classmethod
def _add_self_referential_link(cls, context, ref):
ref.setdefault('links', {})

View File

@ -2031,18 +2031,44 @@ class IdentityTests(object):
self.assertIn(project1['id'], project_ids)
self.assertIn(project2['id'], project_ids)
def test_check_leaf_projects(self):
root_project = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': DEFAULT_DOMAIN_ID,
'parent_id': None}
self.assignment_api.create_project(root_project['id'], root_project)
def _create_projects_hierarchy(self, hierarchy_size=2,
domain_id=DEFAULT_DOMAIN_ID):
"""Creates a project hierarchy with specified size.
leaf_project = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': DEFAULT_DOMAIN_ID,
'parent_id': root_project['id']}
self.assignment_api.create_project(leaf_project['id'], leaf_project)
:param hierarchy_size: the desired hierarchy size, default is 2 -
a project with one child.
:param domain_id: domain where the projects hierarchy will be created.
:returns projects: a list of the projects in the created hierarchy.
"""
project_id = uuid.uuid4().hex
project = {'id': project_id,
'description': '',
'domain_id': domain_id,
'enabled': True,
'name': uuid.uuid4().hex,
'parent_id': None}
self.assignment_api.create_project(project_id, project)
projects = [project]
for i in range(1, hierarchy_size):
new_project = {'id': uuid.uuid4().hex,
'description': '',
'domain_id': domain_id,
'enabled': True,
'name': uuid.uuid4().hex,
'parent_id': project_id}
self.assignment_api.create_project(new_project['id'], new_project)
projects.append(new_project)
project_id = new_project['id']
return projects
def test_check_leaf_projects(self):
projects_hierarchy = self._create_projects_hierarchy()
root_project = projects_hierarchy[0]
leaf_project = projects_hierarchy[1]
self.assertFalse(self.assignment_api.is_leaf_project(
root_project['id']))
@ -2057,35 +2083,15 @@ class IdentityTests(object):
root_project['id']))
def test_list_projects_in_subtree(self):
project1 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': '',
'domain_id': DEFAULT_DOMAIN_ID,
'enabled': True,
'parent_id': None}
self.assignment_api.create_project(project1['id'], project1)
project2 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': '',
'domain_id': DEFAULT_DOMAIN_ID,
'enabled': True,
'parent_id': project1['id']}
self.assignment_api.create_project(project2['id'], project2)
project3 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': '',
'domain_id': DEFAULT_DOMAIN_ID,
'enabled': True,
'parent_id': project2['id']}
self.assignment_api.create_project(project3['id'], project3)
projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
project1 = projects_hierarchy[0]
project2 = projects_hierarchy[1]
project3 = projects_hierarchy[2]
project4 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': '',
'domain_id': DEFAULT_DOMAIN_ID,
'enabled': True,
'name': uuid.uuid4().hex,
'parent_id': project2['id']}
self.assignment_api.create_project(project4['id'], project4)
@ -2104,35 +2110,15 @@ class IdentityTests(object):
self.assertEqual(0, len(subtree))
def test_list_project_parents(self):
project1 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': '',
'domain_id': DEFAULT_DOMAIN_ID,
'enabled': True,
'parent_id': None}
self.assignment_api.create_project(project1['id'], project1)
project2 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': '',
'domain_id': DEFAULT_DOMAIN_ID,
'enabled': True,
'parent_id': project1['id']}
self.assignment_api.create_project(project2['id'], project2)
project3 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': '',
'domain_id': DEFAULT_DOMAIN_ID,
'enabled': True,
'parent_id': project2['id']}
self.assignment_api.create_project(project3['id'], project3)
projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
project1 = projects_hierarchy[0]
project2 = projects_hierarchy[1]
project3 = projects_hierarchy[2]
project4 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': '',
'domain_id': DEFAULT_DOMAIN_ID,
'enabled': True,
'name': uuid.uuid4().hex,
'parent_id': project2['id']}
self.assignment_api.create_project(project4['id'], project4)
@ -2618,6 +2604,228 @@ class IdentityTests(object):
self.assignment_api.get_project,
project['id'])
def test_domain_delete_hierarchy(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'enabled': True}
self.assignment_api.create_domain(domain['id'], domain)
# Creating a root and a leaf project inside the domain
projects_hierarchy = self._create_projects_hierarchy(
domain_id=domain['id'])
root_project = projects_hierarchy[0]
leaf_project = projects_hierarchy[0]
# Disable the domain
domain['enabled'] = False
self.assignment_api.update_domain(domain['id'], domain)
# Delete the domain
self.assignment_api.delete_domain(domain['id'])
# Make sure the domain no longer exists
self.assertRaises(exception.DomainNotFound,
self.assignment_api.get_domain,
domain['id'])
# Make sure the root project no longer exists
self.assertRaises(exception.ProjectNotFound,
self.assignment_api.get_project,
root_project['id'])
# Make sure the leaf project no longer exists
self.assertRaises(exception.ProjectNotFound,
self.assignment_api.get_project,
leaf_project['id'])
def test_hierarchical_projects_crud(self):
# create a hierarchy with just a root project (which is a leaf as well)
projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=1)
root_project1 = projects_hierarchy[0]
# create a hierarchy with one root project and one leaf project
projects_hierarchy = self._create_projects_hierarchy()
root_project2 = projects_hierarchy[0]
leaf_project = projects_hierarchy[1]
# update description from leaf_project
leaf_project['description'] = 'new description'
self.assignment_api.update_project(leaf_project['id'], leaf_project)
proj_ref = self.assignment_api.get_project(leaf_project['id'])
self.assertDictEqual(proj_ref, leaf_project)
# update the parent_id is not allowed
leaf_project['parent_id'] = root_project1['id']
self.assertRaises(exception.ForbiddenAction,
self.assignment_api.update_project,
leaf_project['id'],
leaf_project)
# delete root_project1
self.assignment_api.delete_project(root_project1['id'])
self.assertRaises(exception.ProjectNotFound,
self.assignment_api.get_project,
root_project1['id'])
# delete root_project2 is not allowed since it is not a leaf project
self.assertRaises(exception.ForbiddenAction,
self.assignment_api.delete_project,
root_project2['id'])
def test_create_project_with_invalid_parent(self):
project = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': '',
'domain_id': DEFAULT_DOMAIN_ID,
'enabled': True,
'parent_id': 'fake'}
self.assertRaises(exception.ProjectNotFound,
self.assignment_api.create_project,
project['id'],
project)
def test_create_leaf_project_with_invalid_domain(self):
root_project = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': '',
'domain_id': DEFAULT_DOMAIN_ID,
'enabled': True,
'parent_id': None}
self.assignment_api.create_project(root_project['id'], root_project)
leaf_project = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': '',
'domain_id': 'fake',
'enabled': True,
'parent_id': root_project['id']}
self.assertRaises(exception.ForbiddenAction,
self.assignment_api.create_project,
leaf_project['id'],
leaf_project)
def test_delete_hierarchical_leaf_project(self):
projects_hierarchy = self._create_projects_hierarchy()
root_project = projects_hierarchy[0]
leaf_project = projects_hierarchy[1]
self.assignment_api.delete_project(leaf_project['id'])
self.assertRaises(exception.ProjectNotFound,
self.assignment_api.get_project,
leaf_project['id'])
self.assignment_api.delete_project(root_project['id'])
self.assertRaises(exception.ProjectNotFound,
self.assignment_api.get_project,
root_project['id'])
def test_delete_hierarchical_not_leaf_project(self):
projects_hierarchy = self._create_projects_hierarchy()
root_project = projects_hierarchy[0]
self.assertRaises(exception.ForbiddenAction,
self.assignment_api.delete_project,
root_project['id'])
def test_update_project_parent(self):
projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
project1 = projects_hierarchy[0]
project2 = projects_hierarchy[1]
project3 = projects_hierarchy[2]
# project2 is the parent from project3
self.assertEqual(project3.get('parent_id'), project2['id'])
# try to update project3 parent to parent1
project3['parent_id'] = project1['id']
self.assertRaises(exception.ForbiddenAction,
self.assignment_api.update_project,
project3['id'],
project3)
def test_create_project_under_disabled_one(self):
project1 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': DEFAULT_DOMAIN_ID,
'enabled': False,
'parent_id': None}
self.assignment_api.create_project(project1['id'], project1)
project2 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': DEFAULT_DOMAIN_ID,
'parent_id': project1['id']}
# It's not possible to create a project under a disabled one in the
# hierarchy
self.assertRaises(exception.ForbiddenAction,
self.assignment_api.create_project,
project2['id'],
project2)
def test_disable_hierarchical_leaf_project(self):
projects_hierarchy = self._create_projects_hierarchy()
leaf_project = projects_hierarchy[1]
leaf_project['enabled'] = False
self.assignment_api.update_project(leaf_project['id'], leaf_project)
project_ref = self.assignment_api.get_project(leaf_project['id'])
self.assertEqual(project_ref['enabled'], leaf_project['enabled'])
def test_disable_hierarchical_not_leaf_project(self):
projects_hierarchy = self._create_projects_hierarchy()
root_project = projects_hierarchy[0]
root_project['enabled'] = False
self.assertRaises(exception.ForbiddenAction,
self.assignment_api.update_project,
root_project['id'],
root_project)
def test_enable_project_with_disabled_parent(self):
projects_hierarchy = self._create_projects_hierarchy()
root_project = projects_hierarchy[0]
leaf_project = projects_hierarchy[1]
# Disable leaf and root
leaf_project['enabled'] = False
self.assignment_api.update_project(leaf_project['id'], leaf_project)
root_project['enabled'] = False
self.assignment_api.update_project(root_project['id'], root_project)
# Try to enable the leaf project, it's not possible since it has
# a disabled parent
leaf_project['enabled'] = True
self.assertRaises(exception.ForbiddenAction,
self.assignment_api.update_project,
leaf_project['id'],
leaf_project)
def _get_hierarchy_depth(self, project_id):
return len(self.assignment_api.list_project_parents(project_id)) + 1
def test_check_hierarchy_depth(self):
# First create a hierarchy with the max allowed depth
projects_hierarchy = self._create_projects_hierarchy(
CONF.max_project_tree_depth)
leaf_project = projects_hierarchy[CONF.max_project_tree_depth - 1]
depth = self._get_hierarchy_depth(leaf_project['id'])
self.assertEqual(CONF.max_project_tree_depth, depth)
# Creating another project in the hierarchy shouldn't be allowed
project_id = uuid.uuid4().hex
project = {
'id': project_id,
'name': uuid.uuid4().hex,
'domain_id': DEFAULT_DOMAIN_ID,
'parent_id': leaf_project['id']}
self.assertRaises(exception.ForbiddenAction,
self.assignment_api.create_project,
project_id,
project)
def test_project_update_missing_attrs_with_a_value(self):
# Creating a project with no description attribute.
project = {'id': uuid.uuid4().hex,

View File

@ -421,6 +421,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_list_projects_for_groups(self):
self.skipTest('Blocked by bug: 1390125')
def test_domain_delete_hierarchy(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_list_role_assignments_unfiltered(self):
new_domain = self._get_domain_fixture()
new_user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex,
@ -1529,6 +1532,39 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
def test_list_project_parents(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_hierarchical_projects_crud(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_create_project_under_disabled_one(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_create_project_with_invalid_parent(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_create_leaf_project_with_invalid_domain(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_update_project_parent(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_enable_project_with_disabled_parent(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_disable_hierarchical_leaf_project(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_disable_hierarchical_not_leaf_project(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_delete_hierarchical_leaf_project(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_delete_hierarchical_not_leaf_project(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_check_hierarchy_depth(self):
self.skipTest('N/A: LDAP does not support hierarchical projects')
def test_multi_role_grant_by_user_group_on_project_domain(self):
# This is a partial implementation of the standard test that
# is defined in test_backend.py. It omits both domain and

View File

@ -426,6 +426,41 @@ class AssignmentTestCase(test_v3.RestfulTestCase):
headers={'x-subject-token': subject_token},
expected_status=404)
def test_delete_domain_hierarchy(self):
"""Call ``DELETE /domains/{domain_id}``."""
domain = self.new_domain_ref()
self.assignment_api.create_domain(domain['id'], domain)
root_project = self.new_project_ref(
domain_id=domain['id'])
self.assignment_api.create_project(root_project['id'], root_project)
leaf_project = self.new_project_ref(
domain_id=domain['id'],
parent_id=root_project['id'])
self.assignment_api.create_project(leaf_project['id'], leaf_project)
# Need to disable it first.
self.patch('/domains/%(domain_id)s' % {
'domain_id': domain['id']},
body={'domain': {'enabled': False}})
self.delete(
'/domains/%(domain_id)s' % {
'domain_id': domain['id']})
self.assertRaises(exception.DomainNotFound,
self.assignment_api.get_domain,
domain['id'])
self.assertRaises(exception.ProjectNotFound,
self.assignment_api.get_project,
root_project['id'])
self.assertRaises(exception.ProjectNotFound,
self.assignment_api.get_project,
leaf_project['id'])
# Project CRUD tests
def test_list_projects(self):
@ -454,6 +489,37 @@ class AssignmentTestCase(test_v3.RestfulTestCase):
"""Call ``POST /projects``."""
self.post('/projects', body={'project': {}}, expected_status=400)
def _create_projects_hierarchy(self, hierarchy_size=1):
"""Creates a project hierarchy with specified size.
:param hierarchy_size: the desired hierarchy size, default is 1 -
a project with one child.
:returns projects: a list of the projects in the created hierarchy.
"""
resp = self.get(
'/projects/%(project_id)s' % {
'project_id': self.project_id})
projects = [resp.result]
for i in range(hierarchy_size):
new_ref = self.new_project_ref(
domain_id=self.domain_id,
parent_id=projects[i]['project']['id'])
resp = self.post('/projects',
body={'project': new_ref})
self.assertValidProjectResponse(resp, new_ref)
projects.append(resp.result)
return projects
def test_create_hierarchical_project(self):
"""Call ``POST /projects``."""
self._create_projects_hierarchy()
def test_get_project(self):
"""Call ``GET /projects/{project_id}``."""
r = self.get(
@ -461,6 +527,32 @@ class AssignmentTestCase(test_v3.RestfulTestCase):
'project_id': self.project_id})
self.assertValidProjectResponse(r, self.project)
def test_get_project_with_parents_list(self):
"""Call ``GET /projects/{project_id}?parents_as_list``."""
projects = self._create_projects_hierarchy(hierarchy_size=2)
r = self.get(
'/projects/%(project_id)s?parents_as_list' % {
'project_id': projects[1]['project']['id']})
self.assertEqual(1, len(r.result['project']['parents']))
self.assertValidProjectResponse(r, projects[1]['project'])
self.assertIn(projects[0], r.result['project']['parents'])
self.assertNotIn(projects[2], r.result['project']['parents'])
def test_get_project_with_subtree_list(self):
"""Call ``GET /projects/{project_id}?subtree_as_list``."""
projects = self._create_projects_hierarchy(hierarchy_size=2)
r = self.get(
'/projects/%(project_id)s?subtree_as_list' % {
'project_id': projects[1]['project']['id']})
self.assertEqual(1, len(r.result['project']['subtree']))
self.assertValidProjectResponse(r, projects[1]['project'])
self.assertNotIn(projects[0], r.result['project']['subtree'])
self.assertIn(projects[2], r.result['project']['subtree'])
def test_update_project(self):
"""Call ``PATCH /projects/{project_id}``."""
ref = self.new_project_ref(domain_id=self.domain_id)
@ -487,6 +579,40 @@ class AssignmentTestCase(test_v3.RestfulTestCase):
body={'project': project})
self.assertValidProjectResponse(r, project)
def test_update_project_parent_id(self):
"""Call ``PATCH /projects/{project_id}``."""
projects = self._create_projects_hierarchy()
leaf_project = projects[1]['project']
leaf_project['parent_id'] = None
self.patch(
'/projects/%(project_id)s' % {
'project_id': leaf_project['id']},
body={'project': leaf_project},
expected_status=403)
def test_disable_leaf_project(self):
"""Call ``PATCH /projects/{project_id}``."""
projects = self._create_projects_hierarchy()
leaf_project = projects[1]['project']
leaf_project['enabled'] = False
r = self.patch(
'/projects/%(project_id)s' % {
'project_id': leaf_project['id']},
body={'project': leaf_project})
self.assertEqual(
leaf_project['enabled'], r.result['project']['enabled'])
def test_disable_not_leaf_project(self):
"""Call ``PATCH /projects/{project_id}``."""
projects = self._create_projects_hierarchy()
root_project = projects[0]['project']
root_project['enabled'] = False
self.patch(
'/projects/%(project_id)s' % {
'project_id': root_project['id']},
body={'project': root_project},
expected_status=403)
def test_delete_project(self):
"""Call ``DELETE /projects/{project_id}``
@ -523,6 +649,14 @@ class AssignmentTestCase(test_v3.RestfulTestCase):
r = self.credential_api.get_credential(self.credential2['id'])
self.assertDictEqual(r, self.credential2)
def test_delete_not_leaf_project(self):
"""Call ``DELETE /projects/{project_id}``."""
self._create_projects_hierarchy()
self.delete(
'/projects/%(project_id)s' % {
'project_id': self.project_id},
expected_status=403)
# Role CRUD tests
def test_create_role(self):