Merge "Add immutable option for roles and projects"
This commit is contained in:
commit
ad6be0cf39
|
@ -23,6 +23,7 @@ from keystone.common import cache
|
|||
from keystone.common import driver_hints
|
||||
from keystone.common import manager
|
||||
from keystone.common import provider_api
|
||||
from keystone.common.resource_options import options as ro_opt
|
||||
import keystone.conf
|
||||
from keystone import exception
|
||||
from keystone.i18n import _
|
||||
|
@ -1302,8 +1303,19 @@ class RoleManager(manager.Manager):
|
|||
def list_roles(self, hints=None):
|
||||
return self.driver.list_roles(hints or driver_hints.Hints())
|
||||
|
||||
def _is_immutable(self, role):
|
||||
return role['options'].get(ro_opt.IMMUTABLE_OPT.option_name, False)
|
||||
|
||||
def update_role(self, role_id, role, initiator=None):
|
||||
original_role = self.driver.get_role(role_id)
|
||||
# Prevent the update of immutable set roles unless the update is
|
||||
# exclusively used for
|
||||
ro_opt.check_immutable_update(
|
||||
original_resource_ref=original_role,
|
||||
new_resource_ref=role,
|
||||
type='role',
|
||||
resource_id=role_id)
|
||||
|
||||
if ('domain_id' in role and
|
||||
role['domain_id'] != original_role['domain_id']):
|
||||
raise exception.ValidationError(
|
||||
|
@ -1315,6 +1327,11 @@ class RoleManager(manager.Manager):
|
|||
return ret
|
||||
|
||||
def delete_role(self, role_id, initiator=None):
|
||||
role = self.driver.get_role(role_id)
|
||||
# Prevent deletion of immutable roles.
|
||||
ro_opt.check_immutable_delete(resource_ref=role,
|
||||
resource_type='role',
|
||||
resource_id=role_id)
|
||||
PROVIDERS.assignment_api.delete_role_assignments(role_id)
|
||||
PROVIDERS.assignment_api._send_app_cred_notification_for_role_removal(
|
||||
role_id
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
# under the License.
|
||||
|
||||
from keystone.common import resource_options
|
||||
from keystone.common.resource_options import options as ro_opt
|
||||
|
||||
|
||||
ROLE_OPTIONS_REGISTRY = resource_options.ResourceOptionRegistry('ROLE')
|
||||
|
@ -20,7 +21,7 @@ ROLE_OPTIONS_REGISTRY = resource_options.ResourceOptionRegistry('ROLE')
|
|||
# This is called on import by design.
|
||||
def register_role_options():
|
||||
for opt in [
|
||||
# PLACEHOLDER for future options
|
||||
ro_opt.IMMUTABLE_OPT,
|
||||
]:
|
||||
ROLE_OPTIONS_REGISTRY.register_option(opt)
|
||||
|
||||
|
|
|
@ -47,6 +47,13 @@ def ref_mapper_to_dict_options(ref):
|
|||
return options
|
||||
|
||||
|
||||
def get_resource_option(model, option_id):
|
||||
"""Get the resource option information from the model's mapper."""
|
||||
if option_id in model._resource_option_mapper.keys():
|
||||
return model._resource_option_mapper[option_id]
|
||||
return None
|
||||
|
||||
|
||||
def resource_options_ref_to_mapper(ref, option_class):
|
||||
"""Convert the _resource_options property-dict to options attr map.
|
||||
|
||||
|
|
|
@ -15,6 +15,17 @@
|
|||
# in their individual registry. Each entry is imported from it's own
|
||||
# module directly to allow for custom implementation details as needed.
|
||||
|
||||
from keystone.common.resource_options.options import immutable
|
||||
|
||||
__all__ = (
|
||||
'IMMUTABLE_OPT',
|
||||
'check_resource_immutable',
|
||||
'check_immutable_update',
|
||||
'check_immutable_delete',
|
||||
)
|
||||
|
||||
# Immutable Option and helper functions
|
||||
IMMUTABLE_OPT = immutable.IMMUTABLE_OPT
|
||||
check_resource_immutable = immutable.check_resource_immutable
|
||||
check_immutable_update = immutable.check_immutable_update
|
||||
check_immutable_delete = immutable.check_immutable_delete
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# Implement the "Immutable" resource option
|
||||
from keystone.common.resource_options import core as ro_core
|
||||
from keystone.common.validation import parameter_types
|
||||
from keystone import exception
|
||||
|
||||
IMMUTABLE_OPT = (
|
||||
ro_core.ResourceOption(
|
||||
option_id='IMMU',
|
||||
option_name='immutable',
|
||||
validator=ro_core.boolean_validator,
|
||||
json_schema_validation=parameter_types.boolean
|
||||
))
|
||||
|
||||
|
||||
def check_resource_immutable(resource_ref):
|
||||
"""Check to see if a resource is immutable.
|
||||
|
||||
:param resource_ref: a dict reference of a resource to inspect
|
||||
"""
|
||||
return resource_ref.get('options', {}).get(
|
||||
IMMUTABLE_OPT.option_name, False)
|
||||
|
||||
|
||||
def check_immutable_update(original_resource_ref, new_resource_ref, type,
|
||||
resource_id):
|
||||
"""Check if an update is allowed to an immutable resource.
|
||||
|
||||
Valid cases where an update is allowed:
|
||||
|
||||
* Resource is not immutable
|
||||
* Resource is immutable, and update to set immutable to False or None
|
||||
|
||||
:param original_resource_ref: a dict resource reference representing
|
||||
the current resource
|
||||
:param new_resource_ref: a dict reference of the updates to perform
|
||||
:param type: the resource type, e.g. 'project'
|
||||
:param resource_id: the id of the resource (e.g. project['id']),
|
||||
usually a UUID
|
||||
:raises: ResourceUpdateForbidden
|
||||
"""
|
||||
immutable = check_resource_immutable(original_resource_ref)
|
||||
if immutable:
|
||||
new_options = new_resource_ref.get('options', {})
|
||||
if ((len(new_resource_ref.keys()) > 1) or
|
||||
(IMMUTABLE_OPT.option_name not in new_options) or
|
||||
(new_options[IMMUTABLE_OPT.option_name] not in (False, None))):
|
||||
raise exception.ResourceUpdateForbidden(
|
||||
type=type, resource_id=resource_id)
|
||||
|
||||
|
||||
def check_immutable_delete(resource_ref, resource_type, resource_id):
|
||||
"""Check if a delete is allowed on a resource.
|
||||
|
||||
:param resource_ref: dict reference of the resource
|
||||
:param resource_type: resource type (str) e.g. 'project'
|
||||
:param resource_id: id of the resource (str) e.g. project['id']
|
||||
:raises: ResourceDeleteForbidden
|
||||
"""
|
||||
if check_resource_immutable(resource_ref):
|
||||
raise exception.ResourceDeleteForbidden(
|
||||
type=resource_type, resource_id=resource_id)
|
|
@ -703,3 +703,15 @@ class CacheDeserializationError(Exception):
|
|||
'obj': obj, 'data': data
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class ResourceUpdateForbidden(ForbiddenNotSecurity):
|
||||
message_format = _('Unable to update immutable %(type)s resource: '
|
||||
'`%(resource_id)s. Set resource option "immutable" '
|
||||
'to false first.')
|
||||
|
||||
|
||||
class ResourceDeleteForbidden(ForbiddenNotSecurity):
|
||||
message_format = _('Unable to delete immutable %(type)s resource: '
|
||||
'`%(resource_id)s. Set resource option "immutable" '
|
||||
'to false first.')
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
# under the License.
|
||||
|
||||
from keystone.common import resource_options
|
||||
from keystone.common.resource_options import options as ro_opt
|
||||
|
||||
|
||||
PROJECT_OPTIONS_REGISTRY = resource_options.ResourceOptionRegistry('PROJECT')
|
||||
|
@ -20,7 +21,7 @@ PROJECT_OPTIONS_REGISTRY = resource_options.ResourceOptionRegistry('PROJECT')
|
|||
# This is called on import by design.
|
||||
def register_role_options():
|
||||
for opt in [
|
||||
# PLACEHOLDER for future options
|
||||
ro_opt.IMMUTABLE_OPT,
|
||||
]:
|
||||
PROJECT_OPTIONS_REGISTRY.register_option(opt)
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ from keystone.common import cache
|
|||
from keystone.common import driver_hints
|
||||
from keystone.common import manager
|
||||
from keystone.common import provider_api
|
||||
from keystone.common.resource_options import options as ro_opt
|
||||
from keystone.common import utils
|
||||
import keystone.conf
|
||||
from keystone import exception
|
||||
|
@ -292,6 +293,10 @@ class Manager(manager.Manager):
|
|||
_('Cannot enable project %s since it has disabled '
|
||||
'parents') % project_id)
|
||||
|
||||
def _is_immutable(self, project_ref):
|
||||
return project_ref['options'].get(
|
||||
ro_opt.IMMUTABLE_OPT.option_name, False)
|
||||
|
||||
def _check_whole_subtree_is_disabled(self, project_id, subtree_list=None):
|
||||
if not subtree_list:
|
||||
subtree_list = self.list_projects_in_subtree(project_id)
|
||||
|
@ -306,11 +311,23 @@ class Manager(manager.Manager):
|
|||
self._require_matching_domain_id(project, original_project)
|
||||
|
||||
if original_project['is_domain']:
|
||||
# prevent updates to immutable domains
|
||||
ro_opt.check_immutable_update(
|
||||
original_resource_ref=original_project,
|
||||
new_resource_ref=project,
|
||||
type='domain',
|
||||
resource_id=project_id)
|
||||
domain = self._get_domain_from_project(original_project)
|
||||
self.assert_domain_not_federated(project_id, domain)
|
||||
url_safe_option = CONF.resource.domain_name_url_safe
|
||||
exception_entity = 'Domain'
|
||||
else:
|
||||
# prevent updates to immutable projects
|
||||
ro_opt.check_immutable_update(
|
||||
original_resource_ref=original_project,
|
||||
new_resource_ref=project,
|
||||
type='project',
|
||||
resource_id=project_id)
|
||||
url_safe_option = CONF.resource.project_name_url_safe
|
||||
exception_entity = 'Project'
|
||||
|
||||
|
@ -473,6 +490,11 @@ class Manager(manager.Manager):
|
|||
self._delete_project(project, initiator, cascade)
|
||||
|
||||
def _delete_project(self, project, initiator=None, cascade=False):
|
||||
# Prevent deletion of immutable projects
|
||||
ro_opt.check_immutable_delete(
|
||||
resource_ref=project,
|
||||
resource_type='project',
|
||||
resource_id=project['id'])
|
||||
project_id = project['id']
|
||||
if project['is_domain'] and project['enabled']:
|
||||
raise exception.ValidationError(
|
||||
|
@ -788,6 +810,11 @@ class Manager(manager.Manager):
|
|||
self._delete_domain(domain, initiator)
|
||||
|
||||
def _delete_domain(self, domain, initiator=None):
|
||||
# Disallow deletion of immutable domains
|
||||
ro_opt.check_immutable_delete(
|
||||
resource_ref=domain,
|
||||
resource_type='domain',
|
||||
resource_id=domain['id'])
|
||||
# To help avoid inadvertent deletes, we insist that the domain
|
||||
# has been previously disabled. This also prevents a user deleting
|
||||
# their own domain since, once it is disabled, they won't be able
|
||||
|
@ -912,6 +939,12 @@ class Manager(manager.Manager):
|
|||
:returns: The value of the created tag
|
||||
"""
|
||||
project = self.driver.get_project(project_id)
|
||||
if ro_opt.check_resource_immutable(resource_ref=project):
|
||||
raise exception.ResourceUpdateForbidden(
|
||||
message=_(
|
||||
'Cannot create project tags for %(project_id)s, project '
|
||||
'is immutable. Set "immutable" option to false before '
|
||||
'creating project tags.') % {'project_id': project_id})
|
||||
tag_name = tag.strip()
|
||||
project['tags'].append(tag_name)
|
||||
self.update_project(project_id, {'tags': project['tags']})
|
||||
|
@ -953,7 +986,13 @@ class Manager(manager.Manager):
|
|||
|
||||
:returns: A list of tags
|
||||
"""
|
||||
self.driver.get_project(project_id)
|
||||
project = self.driver.get_project(project_id)
|
||||
if ro_opt.check_resource_immutable(resource_ref=project):
|
||||
raise exception.ResourceUpdateForbidden(
|
||||
message=_(
|
||||
'Cannot update project tags for %(project_id)s, project '
|
||||
'is immutable. Set "immutable" option to false before '
|
||||
'creating project tags.') % {'project_id': project_id})
|
||||
tag_list = [t.strip() for t in tags]
|
||||
project = {'tags': tag_list}
|
||||
self.update_project(project_id, project)
|
||||
|
@ -969,6 +1008,12 @@ class Manager(manager.Manager):
|
|||
does not exist on the project
|
||||
"""
|
||||
project = self.driver.get_project(project_id)
|
||||
if ro_opt.check_resource_immutable(resource_ref=project):
|
||||
raise exception.ResourceUpdateForbidden(
|
||||
message=_(
|
||||
'Cannot delete project tags for %(project_id)s, project '
|
||||
'is immutable. Set "immutable" option to false before '
|
||||
'creating project tags.') % {'project_id': project_id})
|
||||
try:
|
||||
project['tags'].remove(tag)
|
||||
except ValueError:
|
||||
|
|
|
@ -16,6 +16,7 @@ import copy
|
|||
import uuid
|
||||
|
||||
from keystone.common import provider_api
|
||||
from keystone.common.resource_options import options as ro_opt
|
||||
from keystone import exception
|
||||
from keystone.tests import unit
|
||||
from keystone.tests.unit import default_fixtures
|
||||
|
@ -163,3 +164,138 @@ class RoleTests(object):
|
|||
self.assertRaises(exception.RoleNotFound,
|
||||
PROVIDERS.role_api.get_role,
|
||||
role_id)
|
||||
|
||||
def test_create_role_immutable(self):
|
||||
role = unit.new_role_ref()
|
||||
role_id = role['id']
|
||||
role['options'][ro_opt.IMMUTABLE_OPT.option_name] = True
|
||||
role_created = PROVIDERS.role_api.create_role(role_id, role)
|
||||
role_via_manager = PROVIDERS.role_api.get_role(role_id)
|
||||
self.assertTrue('options' in role_created)
|
||||
self.assertTrue('options' in role_via_manager)
|
||||
self.assertTrue(
|
||||
role_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
self.assertTrue(
|
||||
role_created['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
|
||||
def test_cannot_update_immutable_role(self):
|
||||
role = unit.new_role_ref()
|
||||
role_id = role['id']
|
||||
role['options'][ro_opt.IMMUTABLE_OPT.option_name] = True
|
||||
PROVIDERS.role_api.create_role(role_id, role)
|
||||
update_role = {'name': uuid.uuid4().hex}
|
||||
self.assertRaises(exception.ResourceUpdateForbidden,
|
||||
PROVIDERS.role_api.update_role,
|
||||
role_id,
|
||||
update_role)
|
||||
|
||||
def test_cannot_update_immutable_role_while_unsetting_immutable(self):
|
||||
role = unit.new_role_ref()
|
||||
role_id = role['id']
|
||||
role['options'][ro_opt.IMMUTABLE_OPT.option_name] = True
|
||||
PROVIDERS.role_api.create_role(role_id, role)
|
||||
update_role = {
|
||||
'name': uuid.uuid4().hex,
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: True
|
||||
}
|
||||
}
|
||||
self.assertRaises(exception.ResourceUpdateForbidden,
|
||||
PROVIDERS.role_api.update_role,
|
||||
role_id,
|
||||
update_role)
|
||||
|
||||
def test_cannot_delete_immutable_role(self):
|
||||
role = unit.new_role_ref()
|
||||
role_id = role['id']
|
||||
role['options'][ro_opt.IMMUTABLE_OPT.option_name] = True
|
||||
PROVIDERS.role_api.create_role(role_id, role)
|
||||
self.assertRaises(exception.ResourceDeleteForbidden,
|
||||
PROVIDERS.role_api.delete_role,
|
||||
role_id)
|
||||
|
||||
def test_update_role_set_immutable(self):
|
||||
role = unit.new_role_ref()
|
||||
role_id = role['id']
|
||||
PROVIDERS.role_api.create_role(role_id, role)
|
||||
update_role = {
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: True
|
||||
}
|
||||
}
|
||||
role_via_manager = PROVIDERS.role_api.get_role(role_id)
|
||||
self.assertTrue('options' in role_via_manager)
|
||||
self.assertFalse(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in role_via_manager['options'])
|
||||
role_update = PROVIDERS.role_api.update_role(role_id, update_role)
|
||||
role_via_manager = PROVIDERS.role_api.get_role(role_id)
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in role_update['options'])
|
||||
self.assertTrue(
|
||||
role_update['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in role_via_manager['options'])
|
||||
self.assertTrue(
|
||||
role_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
|
||||
def test_update_role_set_immutable_with_additional_updates(self):
|
||||
role = unit.new_role_ref()
|
||||
role_id = role['id']
|
||||
PROVIDERS.role_api.create_role(role_id, role)
|
||||
update_role = {
|
||||
'name': uuid.uuid4().hex,
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: True
|
||||
}
|
||||
}
|
||||
role_via_manager = PROVIDERS.role_api.get_role(role_id)
|
||||
self.assertTrue('options' in role_via_manager)
|
||||
self.assertFalse(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in role_via_manager['options'])
|
||||
role_update = PROVIDERS.role_api.update_role(role_id, update_role)
|
||||
role_via_manager = PROVIDERS.role_api.get_role(role_id)
|
||||
self.assertEqual(role_update['name'], update_role['name'])
|
||||
self.assertEqual(role_via_manager['name'], update_role['name'])
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in role_update['options'])
|
||||
self.assertTrue(
|
||||
role_update['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in role_via_manager['options'])
|
||||
self.assertTrue(
|
||||
role_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
|
||||
def test_update_role_unset_immutable(self):
|
||||
role = unit.new_role_ref()
|
||||
role_id = role['id']
|
||||
role['options'][ro_opt.IMMUTABLE_OPT.option_name] = True
|
||||
PROVIDERS.role_api.create_role(role_id, role)
|
||||
role_via_manager = PROVIDERS.role_api.get_role(role_id)
|
||||
self.assertTrue('options' in role_via_manager)
|
||||
self.assertTrue(
|
||||
role_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
update_role = {
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: False
|
||||
}
|
||||
}
|
||||
PROVIDERS.role_api.update_role(role_id, update_role)
|
||||
role_via_manager = PROVIDERS.role_api.get_role(role_id)
|
||||
self.assertTrue('options' in role_via_manager)
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in role_via_manager['options'])
|
||||
self.assertFalse(
|
||||
role_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
update_role = {
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: None
|
||||
}
|
||||
}
|
||||
role_updated = PROVIDERS.role_api.update_role(role_id, update_role)
|
||||
role_via_manager = PROVIDERS.role_api.get_role(role_id)
|
||||
self.assertTrue('options' in role_updated)
|
||||
self.assertTrue('options' in role_via_manager)
|
||||
self.assertFalse(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in role_updated['options'])
|
||||
self.assertFalse(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in role_via_manager['options'])
|
||||
|
|
|
@ -19,6 +19,7 @@ from testtools import matchers
|
|||
|
||||
from keystone.common import driver_hints
|
||||
from keystone.common import provider_api
|
||||
from keystone.common.resource_options import options as ro_opt
|
||||
import keystone.conf
|
||||
from keystone import exception
|
||||
from keystone.resource.backends import sql as resource_sql
|
||||
|
@ -1682,6 +1683,336 @@ class ResourceTests(object):
|
|||
)
|
||||
self.assertEqual(project_tag_ref, [])
|
||||
|
||||
def test_create_project_immutable(self):
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id)
|
||||
project['options'][ro_opt.IMMUTABLE_OPT.option_name] = True
|
||||
|
||||
p_created = PROVIDERS.resource_api.create_project(
|
||||
project['id'], project)
|
||||
project_via_manager = PROVIDERS.resource_api.get_project(project['id'])
|
||||
self.assertTrue('options' in p_created)
|
||||
self.assertTrue('options' in project_via_manager)
|
||||
self.assertTrue(
|
||||
project_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
self.assertTrue(
|
||||
p_created['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
|
||||
def test_cannot_update_immutable_project(self):
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id)
|
||||
project['options'][ro_opt.IMMUTABLE_OPT.option_name] = True
|
||||
PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
|
||||
update_project = {'name': uuid.uuid4().hex}
|
||||
self.assertRaises(exception.ResourceUpdateForbidden,
|
||||
PROVIDERS.resource_api.update_project,
|
||||
project['id'],
|
||||
update_project)
|
||||
|
||||
def test_cannot_update_immutable_project_while_unsetting_immutable(self):
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id)
|
||||
project['options'][ro_opt.IMMUTABLE_OPT.option_name] = True
|
||||
PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
|
||||
update_project = {
|
||||
'name': uuid.uuid4().hex,
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: True
|
||||
}}
|
||||
self.assertRaises(exception.ResourceUpdateForbidden,
|
||||
PROVIDERS.resource_api.update_project,
|
||||
project['id'],
|
||||
update_project)
|
||||
|
||||
def test_cannot_delete_immutable_project(self):
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id)
|
||||
project['options'][ro_opt.IMMUTABLE_OPT.option_name] = True
|
||||
PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
self.assertRaises(exception.ResourceDeleteForbidden,
|
||||
PROVIDERS.resource_api.delete_project,
|
||||
project['id'])
|
||||
|
||||
def test_update_project_set_immutable(self):
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id)
|
||||
PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
update_project = {
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: True
|
||||
}}
|
||||
project_via_manager = PROVIDERS.resource_api.get_project(project['id'])
|
||||
self.assertTrue('options' in project_via_manager)
|
||||
self.assertFalse(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in project_via_manager['options'])
|
||||
p_update = PROVIDERS.resource_api.update_project(
|
||||
project['id'], update_project)
|
||||
project_via_manager = PROVIDERS.resource_api.get_project(project['id'])
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in p_update['options'])
|
||||
self.assertTrue(
|
||||
p_update['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in project_via_manager['options'])
|
||||
self.assertTrue(
|
||||
project_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
|
||||
def test_update_project_set_immutable_with_additional_updates(self):
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id)
|
||||
PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
update_project = {
|
||||
'name': uuid.uuid4().hex,
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: True
|
||||
}}
|
||||
project_via_manager = PROVIDERS.resource_api.get_project(project['id'])
|
||||
self.assertTrue('options' in project_via_manager)
|
||||
self.assertFalse(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in project_via_manager['options'])
|
||||
p_update = PROVIDERS.resource_api.update_project(
|
||||
project['id'], update_project)
|
||||
project_via_manager = PROVIDERS.resource_api.get_project(project['id'])
|
||||
self.assertEqual(p_update['name'], update_project['name'])
|
||||
self.assertEqual(project_via_manager['name'], update_project['name'])
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in p_update['options'])
|
||||
self.assertTrue(
|
||||
p_update['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in project_via_manager['options'])
|
||||
self.assertTrue(
|
||||
project_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
|
||||
def test_update_project_unset_immutable(self):
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id)
|
||||
project['options'][ro_opt.IMMUTABLE_OPT.option_name] = True
|
||||
PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
project_via_manager = PROVIDERS.resource_api.get_project(project['id'])
|
||||
self.assertTrue('options' in project_via_manager)
|
||||
self.assertTrue(
|
||||
project_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
|
||||
update_project = {
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: False
|
||||
}}
|
||||
PROVIDERS.resource_api.update_project(project['id'], update_project)
|
||||
project_via_manager = PROVIDERS.resource_api.get_project(project['id'])
|
||||
self.assertTrue('options' in project_via_manager)
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in project_via_manager['options'])
|
||||
self.assertFalse(
|
||||
project_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
|
||||
update_project = {'name': uuid.uuid4().hex}
|
||||
p_updated = PROVIDERS.resource_api.update_project(
|
||||
project['id'], update_project)
|
||||
self.assertEqual(p_updated['name'], update_project['name'])
|
||||
|
||||
update_project = {
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: None
|
||||
}}
|
||||
p_updated = PROVIDERS.resource_api.update_project(
|
||||
project['id'], update_project)
|
||||
project_via_manager = PROVIDERS.resource_api.get_project(project['id'])
|
||||
self.assertTrue('options' in p_updated)
|
||||
self.assertTrue('options' in project_via_manager)
|
||||
self.assertFalse(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in p_updated['options'])
|
||||
self.assertFalse(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in project_via_manager['options'])
|
||||
|
||||
def test_cannot_delete_project_tags_immutable_project(self):
|
||||
project, tags = self._create_project_and_tags(num_of_tags=2)
|
||||
update_project = {
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: True
|
||||
}
|
||||
}
|
||||
PROVIDERS.resource_api.update_project(project['id'], update_project)
|
||||
self.assertRaises(exception.ResourceUpdateForbidden,
|
||||
PROVIDERS.resource_api.delete_project_tag,
|
||||
project['id'],
|
||||
tags[0])
|
||||
|
||||
def test_cannot_update_project_tags_immutable_project(self):
|
||||
# Update and Add tag use the same API
|
||||
project, tags = self._create_project_and_tags(num_of_tags=2)
|
||||
update_project = {
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: True
|
||||
}
|
||||
}
|
||||
PROVIDERS.resource_api.update_project(project['id'], update_project)
|
||||
tags.append(uuid.uuid4().hex)
|
||||
self.assertRaises(exception.ResourceUpdateForbidden,
|
||||
PROVIDERS.resource_api.update_project_tags,
|
||||
project['id'],
|
||||
tags)
|
||||
|
||||
@unit.skip_if_no_multiple_domains_support
|
||||
def test_create_domain_immutable(self):
|
||||
domain_id = uuid.uuid4().hex
|
||||
|
||||
domain = {
|
||||
'name': uuid.uuid4().hex,
|
||||
'id': domain_id,
|
||||
'is_domain': True,
|
||||
'options': {'immutable': True}
|
||||
}
|
||||
|
||||
PROVIDERS.resource_api.create_domain(domain_id, domain)
|
||||
domain_via_manager = PROVIDERS.resource_api.get_domain(domain_id)
|
||||
self.assertTrue('options' in domain_via_manager)
|
||||
self.assertTrue(domain_via_manager['options']['immutable'])
|
||||
|
||||
@unit.skip_if_no_multiple_domains_support
|
||||
def test_cannot_update_immutable_domain(self):
|
||||
domain_id = uuid.uuid4().hex
|
||||
|
||||
domain = {
|
||||
'name': uuid.uuid4().hex,
|
||||
'id': domain_id,
|
||||
'is_domain': True,
|
||||
'options': {'immutable': True}
|
||||
}
|
||||
|
||||
PROVIDERS.resource_api.create_domain(domain_id, domain)
|
||||
update_domain = {'name': uuid.uuid4().hex}
|
||||
self.assertRaises(exception.ResourceUpdateForbidden,
|
||||
PROVIDERS.resource_api.update_domain,
|
||||
domain_id,
|
||||
update_domain)
|
||||
|
||||
@unit.skip_if_no_multiple_domains_support
|
||||
def test_cannot_delete_immutable_domain(self):
|
||||
domain_id = uuid.uuid4().hex
|
||||
|
||||
domain = {
|
||||
'name': uuid.uuid4().hex,
|
||||
'id': domain_id,
|
||||
'is_domain': True,
|
||||
'options': {'immutable': True}
|
||||
}
|
||||
|
||||
PROVIDERS.resource_api.create_domain(domain_id, domain)
|
||||
self.assertRaises(exception.ResourceDeleteForbidden,
|
||||
PROVIDERS.resource_api.delete_domain,
|
||||
domain_id,)
|
||||
|
||||
@unit.skip_if_no_multiple_domains_support
|
||||
def test_cannot_delete_disabled_domain_with_immutable_project(self):
|
||||
domain_id = uuid.uuid4().hex
|
||||
|
||||
domain = {
|
||||
'name': uuid.uuid4().hex,
|
||||
'id': domain_id,
|
||||
'is_domain': True,
|
||||
}
|
||||
|
||||
PROVIDERS.resource_api.create_domain(domain_id, domain)
|
||||
project = unit.new_project_ref(domain_id)
|
||||
project['options'][ro_opt.IMMUTABLE_OPT.option_name] = True
|
||||
PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
# Disable the domain
|
||||
PROVIDERS.resource_api.update_domain(domain_id, {'enabled': False})
|
||||
# attempt to delete the domain, should error when the immutable
|
||||
# project is reached
|
||||
self.assertRaises(exception.ResourceDeleteForbidden,
|
||||
PROVIDERS.resource_api.delete_domain,
|
||||
domain_id)
|
||||
|
||||
@unit.skip_if_no_multiple_domains_support
|
||||
def test_update_domain_set_immutable(self):
|
||||
# domains are projects, this should be the same as the project version
|
||||
domain_id = uuid.uuid4().hex
|
||||
|
||||
domain = {
|
||||
'name': uuid.uuid4().hex,
|
||||
'id': domain_id,
|
||||
'is_domain': True,
|
||||
}
|
||||
|
||||
PROVIDERS.resource_api.create_domain(domain_id, domain)
|
||||
domain_via_manager = PROVIDERS.resource_api.get_domain(domain_id)
|
||||
self.assertTrue('options' in domain_via_manager)
|
||||
self.assertFalse(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in domain_via_manager['options'])
|
||||
|
||||
domain_update = {
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: True
|
||||
}}
|
||||
d_update = PROVIDERS.resource_api.update_domain(
|
||||
domain_id, domain_update)
|
||||
domain_via_manager = PROVIDERS.resource_api.get_domain(domain_id)
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in d_update['options'])
|
||||
self.assertTrue(
|
||||
d_update['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in domain_via_manager['options'])
|
||||
self.assertTrue(
|
||||
domain_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
|
||||
def test_update_domain_unset_immutable(self):
|
||||
# domains are projects, this should be the same as the project version
|
||||
domain_id = uuid.uuid4().hex
|
||||
|
||||
domain = {
|
||||
'name': uuid.uuid4().hex,
|
||||
'id': domain_id,
|
||||
'is_domain': True,
|
||||
}
|
||||
|
||||
PROVIDERS.resource_api.create_domain(domain_id, domain)
|
||||
domain_via_manager = PROVIDERS.resource_api.get_domain(domain_id)
|
||||
self.assertTrue('options' in domain_via_manager)
|
||||
self.assertFalse(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in domain_via_manager['options'])
|
||||
|
||||
update_domain = {
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: False
|
||||
}}
|
||||
d_updated = PROVIDERS.resource_api.update_domain(
|
||||
domain_id, update_domain)
|
||||
domain_via_manager = PROVIDERS.resource_api.get_domain(domain_id)
|
||||
self.assertTrue('options' in domain_via_manager)
|
||||
self.assertTrue('options' in d_updated)
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in domain_via_manager['options'])
|
||||
self.assertTrue(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in d_updated['options'])
|
||||
self.assertFalse(
|
||||
d_updated['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
self.assertFalse(
|
||||
domain_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name])
|
||||
|
||||
update_domain = {'name': uuid.uuid4().hex}
|
||||
d_updated = PROVIDERS.resource_api.update_domain(
|
||||
domain_id, update_domain)
|
||||
self.assertEqual(d_updated['name'], update_domain['name'])
|
||||
|
||||
update_domain = {
|
||||
'options': {
|
||||
ro_opt.IMMUTABLE_OPT.option_name: None
|
||||
}}
|
||||
d_updated = PROVIDERS.resource_api.update_domain(
|
||||
domain_id, update_domain)
|
||||
domain_via_manager = PROVIDERS.resource_api.get_domain(domain_id)
|
||||
self.assertTrue('options' in d_updated)
|
||||
self.assertTrue('options' in domain_via_manager)
|
||||
self.assertFalse(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in d_updated['options'])
|
||||
self.assertFalse(
|
||||
ro_opt.IMMUTABLE_OPT.option_name in domain_via_manager['options'])
|
||||
|
||||
|
||||
class ResourceDriverTests(object):
|
||||
"""Test for the resource driver.
|
||||
|
|
|
@ -116,6 +116,24 @@ def _assert_backends(testcase, **kwargs):
|
|||
|
||||
class IdentityTests(identity_tests.IdentityTests):
|
||||
|
||||
def test_update_domain_set_immutable(self):
|
||||
self.skip_test_overrides('N/A: LDAP does not support multiple domains')
|
||||
|
||||
def test_cannot_delete_disabled_domain_with_immutable(self):
|
||||
self.skip_test_overrides('N/A: LDAP does not support multiple domains')
|
||||
|
||||
def test_delete_immutable_domain(self):
|
||||
self.skip_test_overrides('N/A: LDAP does not support multiple domains')
|
||||
|
||||
def test_create_domain_immutable(self):
|
||||
self.skip_test_overrides('N/A: LDAP does not support multiple domains')
|
||||
|
||||
def test_update_domain_unset_immutable(self):
|
||||
self.skip_test_overrides('N/A: LDAP does not support multiple domains')
|
||||
|
||||
def test_cannot_update_immutable_domain(self):
|
||||
self.skip_test_overrides('N/A: LDAP does not support multiple domains')
|
||||
|
||||
def test_delete_user_with_group_project_domain_links(self):
|
||||
self.skip_test_overrides('N/A: LDAP does not support multiple domains')
|
||||
|
||||
|
|
Loading…
Reference in New Issue