diff --git a/keystone/assignment/core.py b/keystone/assignment/core.py index 58c4cf4a38..caab808115 100644 --- a/keystone/assignment/core.py +++ b/keystone/assignment/core.py @@ -23,6 +23,7 @@ from keystone.common import cache from keystone.common import driver_hints from keystone.common import manager from keystone.common import provider_api +from keystone.common.resource_options import options as ro_opt import keystone.conf from keystone import exception from keystone.i18n import _ @@ -1302,8 +1303,19 @@ class RoleManager(manager.Manager): def list_roles(self, hints=None): return self.driver.list_roles(hints or driver_hints.Hints()) + def _is_immutable(self, role): + return role['options'].get(ro_opt.IMMUTABLE_OPT.option_name, False) + def update_role(self, role_id, role, initiator=None): original_role = self.driver.get_role(role_id) + # Prevent the update of immutable set roles unless the update is + # exclusively used for + ro_opt.check_immutable_update( + original_resource_ref=original_role, + new_resource_ref=role, + type='role', + resource_id=role_id) + if ('domain_id' in role and role['domain_id'] != original_role['domain_id']): raise exception.ValidationError( @@ -1315,6 +1327,11 @@ class RoleManager(manager.Manager): return ret def delete_role(self, role_id, initiator=None): + role = self.driver.get_role(role_id) + # Prevent deletion of immutable roles. + ro_opt.check_immutable_delete(resource_ref=role, + resource_type='role', + resource_id=role_id) PROVIDERS.assignment_api.delete_role_assignments(role_id) PROVIDERS.assignment_api._send_app_cred_notification_for_role_removal( role_id diff --git a/keystone/assignment/role_backends/resource_options.py b/keystone/assignment/role_backends/resource_options.py index 3bd74453be..52ca4ba18c 100644 --- a/keystone/assignment/role_backends/resource_options.py +++ b/keystone/assignment/role_backends/resource_options.py @@ -11,6 +11,7 @@ # under the License. from keystone.common import resource_options +from keystone.common.resource_options import options as ro_opt ROLE_OPTIONS_REGISTRY = resource_options.ResourceOptionRegistry('ROLE') @@ -20,7 +21,7 @@ ROLE_OPTIONS_REGISTRY = resource_options.ResourceOptionRegistry('ROLE') # This is called on import by design. def register_role_options(): for opt in [ - # PLACEHOLDER for future options + ro_opt.IMMUTABLE_OPT, ]: ROLE_OPTIONS_REGISTRY.register_option(opt) diff --git a/keystone/common/resource_options/core.py b/keystone/common/resource_options/core.py index 56dd205080..2113a2342b 100644 --- a/keystone/common/resource_options/core.py +++ b/keystone/common/resource_options/core.py @@ -47,6 +47,13 @@ def ref_mapper_to_dict_options(ref): return options +def get_resource_option(model, option_id): + """Get the resource option information from the model's mapper.""" + if option_id in model._resource_option_mapper.keys(): + return model._resource_option_mapper[option_id] + return None + + def resource_options_ref_to_mapper(ref, option_class): """Convert the _resource_options property-dict to options attr map. diff --git a/keystone/common/resource_options/options/__init__.py b/keystone/common/resource_options/options/__init__.py index 398186aa69..bb1a874199 100644 --- a/keystone/common/resource_options/options/__init__.py +++ b/keystone/common/resource_options/options/__init__.py @@ -15,6 +15,17 @@ # in their individual registry. Each entry is imported from it's own # module directly to allow for custom implementation details as needed. +from keystone.common.resource_options.options import immutable __all__ = ( + 'IMMUTABLE_OPT', + 'check_resource_immutable', + 'check_immutable_update', + 'check_immutable_delete', ) + +# Immutable Option and helper functions +IMMUTABLE_OPT = immutable.IMMUTABLE_OPT +check_resource_immutable = immutable.check_resource_immutable +check_immutable_update = immutable.check_immutable_update +check_immutable_delete = immutable.check_immutable_delete diff --git a/keystone/common/resource_options/options/immutable.py b/keystone/common/resource_options/options/immutable.py new file mode 100644 index 0000000000..f8fd02655b --- /dev/null +++ b/keystone/common/resource_options/options/immutable.py @@ -0,0 +1,73 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# Implement the "Immutable" resource option +from keystone.common.resource_options import core as ro_core +from keystone.common.validation import parameter_types +from keystone import exception + +IMMUTABLE_OPT = ( + ro_core.ResourceOption( + option_id='IMMU', + option_name='immutable', + validator=ro_core.boolean_validator, + json_schema_validation=parameter_types.boolean + )) + + +def check_resource_immutable(resource_ref): + """Check to see if a resource is immutable. + + :param resource_ref: a dict reference of a resource to inspect + """ + return resource_ref.get('options', {}).get( + IMMUTABLE_OPT.option_name, False) + + +def check_immutable_update(original_resource_ref, new_resource_ref, type, + resource_id): + """Check if an update is allowed to an immutable resource. + + Valid cases where an update is allowed: + + * Resource is not immutable + * Resource is immutable, and update to set immutable to False or None + + :param original_resource_ref: a dict resource reference representing + the current resource + :param new_resource_ref: a dict reference of the updates to perform + :param type: the resource type, e.g. 'project' + :param resource_id: the id of the resource (e.g. project['id']), + usually a UUID + :raises: ResourceUpdateForbidden + """ + immutable = check_resource_immutable(original_resource_ref) + if immutable: + new_options = new_resource_ref.get('options', {}) + if ((len(new_resource_ref.keys()) > 1) or + (IMMUTABLE_OPT.option_name not in new_options) or + (new_options[IMMUTABLE_OPT.option_name] not in (False, None))): + raise exception.ResourceUpdateForbidden( + type=type, resource_id=resource_id) + + +def check_immutable_delete(resource_ref, resource_type, resource_id): + """Check if a delete is allowed on a resource. + + :param resource_ref: dict reference of the resource + :param resource_type: resource type (str) e.g. 'project' + :param resource_id: id of the resource (str) e.g. project['id'] + :raises: ResourceDeleteForbidden + """ + if check_resource_immutable(resource_ref): + raise exception.ResourceDeleteForbidden( + type=resource_type, resource_id=resource_id) diff --git a/keystone/exception.py b/keystone/exception.py index d42db593ad..a4d3f80ed6 100644 --- a/keystone/exception.py +++ b/keystone/exception.py @@ -703,3 +703,15 @@ class CacheDeserializationError(Exception): 'obj': obj, 'data': data } ) + + +class ResourceUpdateForbidden(ForbiddenNotSecurity): + message_format = _('Unable to update immutable %(type)s resource: ' + '`%(resource_id)s. Set resource option "immutable" ' + 'to false first.') + + +class ResourceDeleteForbidden(ForbiddenNotSecurity): + message_format = _('Unable to delete immutable %(type)s resource: ' + '`%(resource_id)s. Set resource option "immutable" ' + 'to false first.') diff --git a/keystone/resource/backends/resource_options.py b/keystone/resource/backends/resource_options.py index a2d4594a40..bc9d4116e5 100644 --- a/keystone/resource/backends/resource_options.py +++ b/keystone/resource/backends/resource_options.py @@ -11,6 +11,7 @@ # under the License. from keystone.common import resource_options +from keystone.common.resource_options import options as ro_opt PROJECT_OPTIONS_REGISTRY = resource_options.ResourceOptionRegistry('PROJECT') @@ -20,7 +21,7 @@ PROJECT_OPTIONS_REGISTRY = resource_options.ResourceOptionRegistry('PROJECT') # This is called on import by design. def register_role_options(): for opt in [ - # PLACEHOLDER for future options + ro_opt.IMMUTABLE_OPT, ]: PROJECT_OPTIONS_REGISTRY.register_option(opt) diff --git a/keystone/resource/core.py b/keystone/resource/core.py index 04eb78f3cb..27f1ec6b9e 100644 --- a/keystone/resource/core.py +++ b/keystone/resource/core.py @@ -20,6 +20,7 @@ from keystone.common import cache from keystone.common import driver_hints from keystone.common import manager from keystone.common import provider_api +from keystone.common.resource_options import options as ro_opt from keystone.common import utils import keystone.conf from keystone import exception @@ -292,6 +293,10 @@ class Manager(manager.Manager): _('Cannot enable project %s since it has disabled ' 'parents') % project_id) + def _is_immutable(self, project_ref): + return project_ref['options'].get( + ro_opt.IMMUTABLE_OPT.option_name, False) + def _check_whole_subtree_is_disabled(self, project_id, subtree_list=None): if not subtree_list: subtree_list = self.list_projects_in_subtree(project_id) @@ -306,11 +311,23 @@ class Manager(manager.Manager): self._require_matching_domain_id(project, original_project) if original_project['is_domain']: + # prevent updates to immutable domains + ro_opt.check_immutable_update( + original_resource_ref=original_project, + new_resource_ref=project, + type='domain', + resource_id=project_id) domain = self._get_domain_from_project(original_project) self.assert_domain_not_federated(project_id, domain) url_safe_option = CONF.resource.domain_name_url_safe exception_entity = 'Domain' else: + # prevent updates to immutable projects + ro_opt.check_immutable_update( + original_resource_ref=original_project, + new_resource_ref=project, + type='project', + resource_id=project_id) url_safe_option = CONF.resource.project_name_url_safe exception_entity = 'Project' @@ -473,6 +490,11 @@ class Manager(manager.Manager): self._delete_project(project, initiator, cascade) def _delete_project(self, project, initiator=None, cascade=False): + # Prevent deletion of immutable projects + ro_opt.check_immutable_delete( + resource_ref=project, + resource_type='project', + resource_id=project['id']) project_id = project['id'] if project['is_domain'] and project['enabled']: raise exception.ValidationError( @@ -788,6 +810,11 @@ class Manager(manager.Manager): self._delete_domain(domain, initiator) def _delete_domain(self, domain, initiator=None): + # Disallow deletion of immutable domains + ro_opt.check_immutable_delete( + resource_ref=domain, + resource_type='domain', + resource_id=domain['id']) # To help avoid inadvertent deletes, we insist that the domain # has been previously disabled. This also prevents a user deleting # their own domain since, once it is disabled, they won't be able @@ -912,6 +939,12 @@ class Manager(manager.Manager): :returns: The value of the created tag """ project = self.driver.get_project(project_id) + if ro_opt.check_resource_immutable(resource_ref=project): + raise exception.ResourceUpdateForbidden( + message=_( + 'Cannot create project tags for %(project_id)s, project ' + 'is immutable. Set "immutable" option to false before ' + 'creating project tags.') % {'project_id': project_id}) tag_name = tag.strip() project['tags'].append(tag_name) self.update_project(project_id, {'tags': project['tags']}) @@ -953,7 +986,13 @@ class Manager(manager.Manager): :returns: A list of tags """ - self.driver.get_project(project_id) + project = self.driver.get_project(project_id) + if ro_opt.check_resource_immutable(resource_ref=project): + raise exception.ResourceUpdateForbidden( + message=_( + 'Cannot update project tags for %(project_id)s, project ' + 'is immutable. Set "immutable" option to false before ' + 'creating project tags.') % {'project_id': project_id}) tag_list = [t.strip() for t in tags] project = {'tags': tag_list} self.update_project(project_id, project) @@ -969,6 +1008,12 @@ class Manager(manager.Manager): does not exist on the project """ project = self.driver.get_project(project_id) + if ro_opt.check_resource_immutable(resource_ref=project): + raise exception.ResourceUpdateForbidden( + message=_( + 'Cannot delete project tags for %(project_id)s, project ' + 'is immutable. Set "immutable" option to false before ' + 'creating project tags.') % {'project_id': project_id}) try: project['tags'].remove(tag) except ValueError: diff --git a/keystone/tests/unit/assignment/test_core.py b/keystone/tests/unit/assignment/test_core.py index c0174a80f8..7aee3e42ac 100644 --- a/keystone/tests/unit/assignment/test_core.py +++ b/keystone/tests/unit/assignment/test_core.py @@ -16,6 +16,7 @@ import copy import uuid from keystone.common import provider_api +from keystone.common.resource_options import options as ro_opt from keystone import exception from keystone.tests import unit from keystone.tests.unit import default_fixtures @@ -163,3 +164,138 @@ class RoleTests(object): self.assertRaises(exception.RoleNotFound, PROVIDERS.role_api.get_role, role_id) + + def test_create_role_immutable(self): + role = unit.new_role_ref() + role_id = role['id'] + role['options'][ro_opt.IMMUTABLE_OPT.option_name] = True + role_created = PROVIDERS.role_api.create_role(role_id, role) + role_via_manager = PROVIDERS.role_api.get_role(role_id) + self.assertTrue('options' in role_created) + self.assertTrue('options' in role_via_manager) + self.assertTrue( + role_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name]) + self.assertTrue( + role_created['options'][ro_opt.IMMUTABLE_OPT.option_name]) + + def test_cannot_update_immutable_role(self): + role = unit.new_role_ref() + role_id = role['id'] + role['options'][ro_opt.IMMUTABLE_OPT.option_name] = True + PROVIDERS.role_api.create_role(role_id, role) + update_role = {'name': uuid.uuid4().hex} + self.assertRaises(exception.ResourceUpdateForbidden, + PROVIDERS.role_api.update_role, + role_id, + update_role) + + def test_cannot_update_immutable_role_while_unsetting_immutable(self): + role = unit.new_role_ref() + role_id = role['id'] + role['options'][ro_opt.IMMUTABLE_OPT.option_name] = True + PROVIDERS.role_api.create_role(role_id, role) + update_role = { + 'name': uuid.uuid4().hex, + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: True + } + } + self.assertRaises(exception.ResourceUpdateForbidden, + PROVIDERS.role_api.update_role, + role_id, + update_role) + + def test_cannot_delete_immutable_role(self): + role = unit.new_role_ref() + role_id = role['id'] + role['options'][ro_opt.IMMUTABLE_OPT.option_name] = True + PROVIDERS.role_api.create_role(role_id, role) + self.assertRaises(exception.ResourceDeleteForbidden, + PROVIDERS.role_api.delete_role, + role_id) + + def test_update_role_set_immutable(self): + role = unit.new_role_ref() + role_id = role['id'] + PROVIDERS.role_api.create_role(role_id, role) + update_role = { + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: True + } + } + role_via_manager = PROVIDERS.role_api.get_role(role_id) + self.assertTrue('options' in role_via_manager) + self.assertFalse( + ro_opt.IMMUTABLE_OPT.option_name in role_via_manager['options']) + role_update = PROVIDERS.role_api.update_role(role_id, update_role) + role_via_manager = PROVIDERS.role_api.get_role(role_id) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in role_update['options']) + self.assertTrue( + role_update['options'][ro_opt.IMMUTABLE_OPT.option_name]) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in role_via_manager['options']) + self.assertTrue( + role_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name]) + + def test_update_role_set_immutable_with_additional_updates(self): + role = unit.new_role_ref() + role_id = role['id'] + PROVIDERS.role_api.create_role(role_id, role) + update_role = { + 'name': uuid.uuid4().hex, + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: True + } + } + role_via_manager = PROVIDERS.role_api.get_role(role_id) + self.assertTrue('options' in role_via_manager) + self.assertFalse( + ro_opt.IMMUTABLE_OPT.option_name in role_via_manager['options']) + role_update = PROVIDERS.role_api.update_role(role_id, update_role) + role_via_manager = PROVIDERS.role_api.get_role(role_id) + self.assertEqual(role_update['name'], update_role['name']) + self.assertEqual(role_via_manager['name'], update_role['name']) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in role_update['options']) + self.assertTrue( + role_update['options'][ro_opt.IMMUTABLE_OPT.option_name]) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in role_via_manager['options']) + self.assertTrue( + role_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name]) + + def test_update_role_unset_immutable(self): + role = unit.new_role_ref() + role_id = role['id'] + role['options'][ro_opt.IMMUTABLE_OPT.option_name] = True + PROVIDERS.role_api.create_role(role_id, role) + role_via_manager = PROVIDERS.role_api.get_role(role_id) + self.assertTrue('options' in role_via_manager) + self.assertTrue( + role_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name]) + update_role = { + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: False + } + } + PROVIDERS.role_api.update_role(role_id, update_role) + role_via_manager = PROVIDERS.role_api.get_role(role_id) + self.assertTrue('options' in role_via_manager) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in role_via_manager['options']) + self.assertFalse( + role_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name]) + update_role = { + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: None + } + } + role_updated = PROVIDERS.role_api.update_role(role_id, update_role) + role_via_manager = PROVIDERS.role_api.get_role(role_id) + self.assertTrue('options' in role_updated) + self.assertTrue('options' in role_via_manager) + self.assertFalse( + ro_opt.IMMUTABLE_OPT.option_name in role_updated['options']) + self.assertFalse( + ro_opt.IMMUTABLE_OPT.option_name in role_via_manager['options']) diff --git a/keystone/tests/unit/resource/test_backends.py b/keystone/tests/unit/resource/test_backends.py index 98863e5927..3368f74ed0 100644 --- a/keystone/tests/unit/resource/test_backends.py +++ b/keystone/tests/unit/resource/test_backends.py @@ -19,6 +19,7 @@ from testtools import matchers from keystone.common import driver_hints from keystone.common import provider_api +from keystone.common.resource_options import options as ro_opt import keystone.conf from keystone import exception from keystone.resource.backends import sql as resource_sql @@ -1682,6 +1683,336 @@ class ResourceTests(object): ) self.assertEqual(project_tag_ref, []) + def test_create_project_immutable(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project['options'][ro_opt.IMMUTABLE_OPT.option_name] = True + + p_created = PROVIDERS.resource_api.create_project( + project['id'], project) + project_via_manager = PROVIDERS.resource_api.get_project(project['id']) + self.assertTrue('options' in p_created) + self.assertTrue('options' in project_via_manager) + self.assertTrue( + project_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name]) + self.assertTrue( + p_created['options'][ro_opt.IMMUTABLE_OPT.option_name]) + + def test_cannot_update_immutable_project(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project['options'][ro_opt.IMMUTABLE_OPT.option_name] = True + PROVIDERS.resource_api.create_project(project['id'], project) + + update_project = {'name': uuid.uuid4().hex} + self.assertRaises(exception.ResourceUpdateForbidden, + PROVIDERS.resource_api.update_project, + project['id'], + update_project) + + def test_cannot_update_immutable_project_while_unsetting_immutable(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project['options'][ro_opt.IMMUTABLE_OPT.option_name] = True + PROVIDERS.resource_api.create_project(project['id'], project) + + update_project = { + 'name': uuid.uuid4().hex, + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: True + }} + self.assertRaises(exception.ResourceUpdateForbidden, + PROVIDERS.resource_api.update_project, + project['id'], + update_project) + + def test_cannot_delete_immutable_project(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project['options'][ro_opt.IMMUTABLE_OPT.option_name] = True + PROVIDERS.resource_api.create_project(project['id'], project) + self.assertRaises(exception.ResourceDeleteForbidden, + PROVIDERS.resource_api.delete_project, + project['id']) + + def test_update_project_set_immutable(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + PROVIDERS.resource_api.create_project(project['id'], project) + update_project = { + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: True + }} + project_via_manager = PROVIDERS.resource_api.get_project(project['id']) + self.assertTrue('options' in project_via_manager) + self.assertFalse( + ro_opt.IMMUTABLE_OPT.option_name in project_via_manager['options']) + p_update = PROVIDERS.resource_api.update_project( + project['id'], update_project) + project_via_manager = PROVIDERS.resource_api.get_project(project['id']) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in p_update['options']) + self.assertTrue( + p_update['options'][ro_opt.IMMUTABLE_OPT.option_name]) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in project_via_manager['options']) + self.assertTrue( + project_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name]) + + def test_update_project_set_immutable_with_additional_updates(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + PROVIDERS.resource_api.create_project(project['id'], project) + update_project = { + 'name': uuid.uuid4().hex, + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: True + }} + project_via_manager = PROVIDERS.resource_api.get_project(project['id']) + self.assertTrue('options' in project_via_manager) + self.assertFalse( + ro_opt.IMMUTABLE_OPT.option_name in project_via_manager['options']) + p_update = PROVIDERS.resource_api.update_project( + project['id'], update_project) + project_via_manager = PROVIDERS.resource_api.get_project(project['id']) + self.assertEqual(p_update['name'], update_project['name']) + self.assertEqual(project_via_manager['name'], update_project['name']) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in p_update['options']) + self.assertTrue( + p_update['options'][ro_opt.IMMUTABLE_OPT.option_name]) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in project_via_manager['options']) + self.assertTrue( + project_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name]) + + def test_update_project_unset_immutable(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project['options'][ro_opt.IMMUTABLE_OPT.option_name] = True + PROVIDERS.resource_api.create_project(project['id'], project) + project_via_manager = PROVIDERS.resource_api.get_project(project['id']) + self.assertTrue('options' in project_via_manager) + self.assertTrue( + project_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name]) + + update_project = { + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: False + }} + PROVIDERS.resource_api.update_project(project['id'], update_project) + project_via_manager = PROVIDERS.resource_api.get_project(project['id']) + self.assertTrue('options' in project_via_manager) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in project_via_manager['options']) + self.assertFalse( + project_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name]) + + update_project = {'name': uuid.uuid4().hex} + p_updated = PROVIDERS.resource_api.update_project( + project['id'], update_project) + self.assertEqual(p_updated['name'], update_project['name']) + + update_project = { + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: None + }} + p_updated = PROVIDERS.resource_api.update_project( + project['id'], update_project) + project_via_manager = PROVIDERS.resource_api.get_project(project['id']) + self.assertTrue('options' in p_updated) + self.assertTrue('options' in project_via_manager) + self.assertFalse( + ro_opt.IMMUTABLE_OPT.option_name in p_updated['options']) + self.assertFalse( + ro_opt.IMMUTABLE_OPT.option_name in project_via_manager['options']) + + def test_cannot_delete_project_tags_immutable_project(self): + project, tags = self._create_project_and_tags(num_of_tags=2) + update_project = { + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: True + } + } + PROVIDERS.resource_api.update_project(project['id'], update_project) + self.assertRaises(exception.ResourceUpdateForbidden, + PROVIDERS.resource_api.delete_project_tag, + project['id'], + tags[0]) + + def test_cannot_update_project_tags_immutable_project(self): + # Update and Add tag use the same API + project, tags = self._create_project_and_tags(num_of_tags=2) + update_project = { + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: True + } + } + PROVIDERS.resource_api.update_project(project['id'], update_project) + tags.append(uuid.uuid4().hex) + self.assertRaises(exception.ResourceUpdateForbidden, + PROVIDERS.resource_api.update_project_tags, + project['id'], + tags) + + @unit.skip_if_no_multiple_domains_support + def test_create_domain_immutable(self): + domain_id = uuid.uuid4().hex + + domain = { + 'name': uuid.uuid4().hex, + 'id': domain_id, + 'is_domain': True, + 'options': {'immutable': True} + } + + PROVIDERS.resource_api.create_domain(domain_id, domain) + domain_via_manager = PROVIDERS.resource_api.get_domain(domain_id) + self.assertTrue('options' in domain_via_manager) + self.assertTrue(domain_via_manager['options']['immutable']) + + @unit.skip_if_no_multiple_domains_support + def test_cannot_update_immutable_domain(self): + domain_id = uuid.uuid4().hex + + domain = { + 'name': uuid.uuid4().hex, + 'id': domain_id, + 'is_domain': True, + 'options': {'immutable': True} + } + + PROVIDERS.resource_api.create_domain(domain_id, domain) + update_domain = {'name': uuid.uuid4().hex} + self.assertRaises(exception.ResourceUpdateForbidden, + PROVIDERS.resource_api.update_domain, + domain_id, + update_domain) + + @unit.skip_if_no_multiple_domains_support + def test_cannot_delete_immutable_domain(self): + domain_id = uuid.uuid4().hex + + domain = { + 'name': uuid.uuid4().hex, + 'id': domain_id, + 'is_domain': True, + 'options': {'immutable': True} + } + + PROVIDERS.resource_api.create_domain(domain_id, domain) + self.assertRaises(exception.ResourceDeleteForbidden, + PROVIDERS.resource_api.delete_domain, + domain_id,) + + @unit.skip_if_no_multiple_domains_support + def test_cannot_delete_disabled_domain_with_immutable_project(self): + domain_id = uuid.uuid4().hex + + domain = { + 'name': uuid.uuid4().hex, + 'id': domain_id, + 'is_domain': True, + } + + PROVIDERS.resource_api.create_domain(domain_id, domain) + project = unit.new_project_ref(domain_id) + project['options'][ro_opt.IMMUTABLE_OPT.option_name] = True + PROVIDERS.resource_api.create_project(project['id'], project) + # Disable the domain + PROVIDERS.resource_api.update_domain(domain_id, {'enabled': False}) + # attempt to delete the domain, should error when the immutable + # project is reached + self.assertRaises(exception.ResourceDeleteForbidden, + PROVIDERS.resource_api.delete_domain, + domain_id) + + @unit.skip_if_no_multiple_domains_support + def test_update_domain_set_immutable(self): + # domains are projects, this should be the same as the project version + domain_id = uuid.uuid4().hex + + domain = { + 'name': uuid.uuid4().hex, + 'id': domain_id, + 'is_domain': True, + } + + PROVIDERS.resource_api.create_domain(domain_id, domain) + domain_via_manager = PROVIDERS.resource_api.get_domain(domain_id) + self.assertTrue('options' in domain_via_manager) + self.assertFalse( + ro_opt.IMMUTABLE_OPT.option_name in domain_via_manager['options']) + + domain_update = { + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: True + }} + d_update = PROVIDERS.resource_api.update_domain( + domain_id, domain_update) + domain_via_manager = PROVIDERS.resource_api.get_domain(domain_id) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in d_update['options']) + self.assertTrue( + d_update['options'][ro_opt.IMMUTABLE_OPT.option_name]) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in domain_via_manager['options']) + self.assertTrue( + domain_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name]) + + def test_update_domain_unset_immutable(self): + # domains are projects, this should be the same as the project version + domain_id = uuid.uuid4().hex + + domain = { + 'name': uuid.uuid4().hex, + 'id': domain_id, + 'is_domain': True, + } + + PROVIDERS.resource_api.create_domain(domain_id, domain) + domain_via_manager = PROVIDERS.resource_api.get_domain(domain_id) + self.assertTrue('options' in domain_via_manager) + self.assertFalse( + ro_opt.IMMUTABLE_OPT.option_name in domain_via_manager['options']) + + update_domain = { + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: False + }} + d_updated = PROVIDERS.resource_api.update_domain( + domain_id, update_domain) + domain_via_manager = PROVIDERS.resource_api.get_domain(domain_id) + self.assertTrue('options' in domain_via_manager) + self.assertTrue('options' in d_updated) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in domain_via_manager['options']) + self.assertTrue( + ro_opt.IMMUTABLE_OPT.option_name in d_updated['options']) + self.assertFalse( + d_updated['options'][ro_opt.IMMUTABLE_OPT.option_name]) + self.assertFalse( + domain_via_manager['options'][ro_opt.IMMUTABLE_OPT.option_name]) + + update_domain = {'name': uuid.uuid4().hex} + d_updated = PROVIDERS.resource_api.update_domain( + domain_id, update_domain) + self.assertEqual(d_updated['name'], update_domain['name']) + + update_domain = { + 'options': { + ro_opt.IMMUTABLE_OPT.option_name: None + }} + d_updated = PROVIDERS.resource_api.update_domain( + domain_id, update_domain) + domain_via_manager = PROVIDERS.resource_api.get_domain(domain_id) + self.assertTrue('options' in d_updated) + self.assertTrue('options' in domain_via_manager) + self.assertFalse( + ro_opt.IMMUTABLE_OPT.option_name in d_updated['options']) + self.assertFalse( + ro_opt.IMMUTABLE_OPT.option_name in domain_via_manager['options']) + class ResourceDriverTests(object): """Test for the resource driver. diff --git a/keystone/tests/unit/test_backend_ldap.py b/keystone/tests/unit/test_backend_ldap.py index aa7a50747f..8328beec5a 100644 --- a/keystone/tests/unit/test_backend_ldap.py +++ b/keystone/tests/unit/test_backend_ldap.py @@ -116,6 +116,24 @@ def _assert_backends(testcase, **kwargs): class IdentityTests(identity_tests.IdentityTests): + def test_update_domain_set_immutable(self): + self.skip_test_overrides('N/A: LDAP does not support multiple domains') + + def test_cannot_delete_disabled_domain_with_immutable(self): + self.skip_test_overrides('N/A: LDAP does not support multiple domains') + + def test_delete_immutable_domain(self): + self.skip_test_overrides('N/A: LDAP does not support multiple domains') + + def test_create_domain_immutable(self): + self.skip_test_overrides('N/A: LDAP does not support multiple domains') + + def test_update_domain_unset_immutable(self): + self.skip_test_overrides('N/A: LDAP does not support multiple domains') + + def test_cannot_update_immutable_domain(self): + self.skip_test_overrides('N/A: LDAP does not support multiple domains') + def test_delete_user_with_group_project_domain_links(self): self.skip_test_overrides('N/A: LDAP does not support multiple domains')