Remove foreign assignments when deleting a domain

When we delete a domain, we remove all the users/groups/projects
within that domain (along with any role assignments any of those
users/groups have on any projects/domains). However, we do not
remove any role assignments on the domain being deleted for
users/groups from other domains.

Co-Author: Ajaya Agrawal <ajku.agr@gmail.com>
Co-Author: Samuel de Medeiros Queiroz <samueldmq@gmail.com>
Co-Author: David Stanek <dstanek@dstanek.com>

Closes-Bug: #1277847
Change-Id: I94cd3c242633cf013df0fd7ec32978c131c235ee
This commit is contained in:
wanghong 2014-10-10 14:36:57 +08:00 committed by David Stanek
parent a67b1c70a1
commit ab565d1eb9
4 changed files with 91 additions and 2 deletions

View File

@ -264,6 +264,14 @@ class Assignment(keystone_assignment.AssignmentDriverV9):
q = q.filter_by(role_id=role_id)
q.delete(False)
def delete_domain_assignments(self, domain_id):
with sql.session_for_write() as session:
q = session.query(RoleAssignment)
q = q.filter(RoleAssignment.target_id == domain_id).filter(
(RoleAssignment.type == AssignmentType.USER_DOMAIN) |
(RoleAssignment.type == AssignmentType.GROUP_DOMAIN))
q.delete(False)
def delete_user_assignments(self, user_id):
with sql.session_for_write() as session:
q = session.query(RoleAssignment)

View File

@ -29,7 +29,7 @@ from keystone.common import driver_hints
from keystone.common import manager
from keystone import exception
from keystone.i18n import _
from keystone.i18n import _LI, _LE
from keystone.i18n import _LI, _LE, _LW
from keystone import notifications
@ -49,6 +49,7 @@ MEMOIZE_COMPUTED_ASSIGNMENTS = cache.get_memoization_decorator(
region=COMPUTED_ASSIGNMENTS_REGION)
@notifications.listener
@dependency.provider('assignment_api')
@dependency.requires('credential_api', 'identity_api', 'resource_api',
'revoke_api', 'role_api')
@ -99,6 +100,17 @@ class Manager(manager.Manager):
elif not isinstance(self.driver, AssignmentDriverV9):
raise exception.UnsupportedDriverVersion(driver=assignment_driver)
self.event_callbacks = {
notifications.ACTIONS.deleted: {
'domain': [self._delete_domain_assignments],
},
}
def _delete_domain_assignments(self, service, resource_type, operations,
payload):
domain_id = payload['resource_info']
self.driver.delete_domain_assignments(domain_id)
def _get_group_ids_for_user_id(self, user_id):
# TODO(morganfainberg): Implement a way to get only group_ids
# instead of the more expensive to_dict() call for each record.
@ -1340,7 +1352,10 @@ class AssignmentDriverV9(AssignmentDriverBase):
"""
pass
@abc.abstractmethod
def delete_domain_assignments(self, domain_id):
"""Deletes all assignments for a domain."""
raise exception.NotImplemented()
class V9AssignmentWrapperForV8Driver(AssignmentDriverV9):
@ -1373,6 +1388,14 @@ class V9AssignmentWrapperForV8Driver(AssignmentDriverV9):
def __init__(self, wrapped_driver):
self.driver = wrapped_driver
def delete_domain_assignments(self, domain_id):
"""Deletes all assignments for a domain."""
msg = _LW('delete_domain_assignments method not found in custom '
'assignment driver. Domain assignments for domain (%s) to '
'users from other domains will not be removed. This was '
'added in V9 of the assignment driver.')
LOG.warning(msg, domain_id)
def default_role_driver(self):
return self.driver.default_role_driver()

View File

@ -2419,6 +2419,51 @@ class AssignmentTests(AssignmentTestHelperMixin):
for assignment in group_assignments:
self.assertThat(assignment.keys(), matchers.Contains('user_id'))
def test_remove_foreign_assignments_when_deleting_a_domain(self):
# A user and a group are in default domain and have assigned a role on
# two new domains. This test makes sure that when one of the new
# domains is deleted, the role assignments for the user and the group
# from the default domain are deleted only on that domain.
group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group = self.identity_api.create_group(group)
role = unit.new_role_ref()
role = self.role_api.create_role(role['id'], role)
new_domains = [unit.new_domain_ref(), unit.new_domain_ref()]
for new_domain in new_domains:
self.resource_api.create_domain(new_domain['id'], new_domain)
self.assignment_api.create_grant(group_id=group['id'],
domain_id=new_domain['id'],
role_id=role['id'])
self.assignment_api.create_grant(user_id=self.user_two['id'],
domain_id=new_domain['id'],
role_id=role['id'])
# Check there are 4 role assignments for that role
role_assignments = self.assignment_api.list_role_assignments(
role_id=role['id'])
self.assertThat(role_assignments, matchers.HasLength(4))
# Delete first new domain and check only 2 assignments were left
self.resource_api.update_domain(new_domains[0]['id'],
{'enabled': False})
self.resource_api.delete_domain(new_domains[0]['id'])
role_assignments = self.assignment_api.list_role_assignments(
role_id=role['id'])
self.assertThat(role_assignments, matchers.HasLength(2))
# Delete second new domain and check no assignments were left
self.resource_api.update_domain(new_domains[1]['id'],
{'enabled': False})
self.resource_api.delete_domain(new_domains[1]['id'])
role_assignments = self.assignment_api.list_role_assignments(
role_id=role['id'])
self.assertEqual([], role_assignments)
class InheritanceTests(AssignmentTestHelperMixin):

View File

@ -1010,6 +1010,13 @@ class BaseLDAPIdentity(identity_tests.IdentityTests,
super(BaseLDAPIdentity, self).
test_create_project_with_domain_id_mismatch_to_parent_domain)
def test_remove_foreign_assignments_when_deleting_a_domain(self):
"""Multiple domains are not supported."""
self.assertRaises(
(exception.ValidationError, exception.DomainNotFound),
super(BaseLDAPIdentity,
self).test_remove_foreign_assignments_when_deleting_a_domain)
class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
@ -2715,6 +2722,12 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
super(BaseLDAPIdentity, self).\
test_create_project_with_domain_id_mismatch_to_parent_domain
def test_remove_foreign_assignments_when_deleting_a_domain(self):
# With multi LDAP this method should work, so override the override
# from BaseLDAPIdentity
base = super(BaseLDAPIdentity, self)
base.test_remove_foreign_assignments_when_deleting_a_domain()
class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
"""Class to test the use of domain configs stored in the database.