residual grants after delete action (bug1125637)

remove all applicable grants when role is deleted
(sql/kvs solution only)

Fixes: bug #1125637
Change-Id: I3a958c6d56739e37a95f6c713fab154827e9ceca
This commit is contained in:
Gordon Chung 2013-02-14 19:55:00 -05:00
parent 2505662bba
commit e16742bdf2
3 changed files with 63 additions and 37 deletions

View File

@ -402,21 +402,33 @@ class Identity(kvs.Base, identity.Driver):
return role return role
def delete_role(self, role_id): def delete_role(self, role_id):
try: self.get_role(role_id)
self.db.delete('role-%s' % role_id)
metadata_keys = filter(lambda x: x.startswith("metadata-"), metadata_keys = filter(lambda x: x.startswith("metadata-"),
self.db.keys()) self.db.keys())
for key in metadata_keys: for key in metadata_keys:
tenant_id = key.split('-')[1] meta_id1 = key.split('-')[1]
user_id = key.split('-')[2] meta_id2 = key.split('-')[2]
try: try:
self.remove_role_from_user_and_project(user_id, self.delete_grant(role_id, project_id=meta_id1,
tenant_id, user_id=meta_id2)
role_id)
except exception.RoleNotFound:
pass
except exception.NotFound: except exception.NotFound:
raise exception.RoleNotFound(role_id=role_id) pass
try:
self.delete_grant(role_id, project_id=meta_id1,
group_id=meta_id2)
except exception.NotFound:
pass
try:
self.delete_grant(role_id, domain_id=meta_id1,
user_id=meta_id2)
except exception.NotFound:
pass
try:
self.delete_grant(role_id, domain_id=meta_id1,
group_id=meta_id2)
except exception.NotFound:
pass
self.db.delete('role-%s' % role_id)
role_list = set(self.db.get('role_list', [])) role_list = set(self.db.get('role_list', []))
role_list.remove(role_id) role_list.remove(role_id)
self.db.set('role_list', list(role_list)) self.db.set('role_list', list(role_list))

View File

@ -1003,14 +1003,29 @@ class Identity(sql.Base, identity.Driver):
with session.begin(): with session.begin():
for metadata_ref in session.query(UserProjectGrant): for metadata_ref in session.query(UserProjectGrant):
metadata = metadata_ref.to_dict()
try: try:
self.remove_role_from_user_and_project( self.delete_grant(role_id, user_id=metadata_ref.user_id,
metadata['user_id'], metadata['project_id'], role_id) project_id=metadata_ref.project_id)
except exception.RoleNotFound:
pass
for metadata_ref in session.query(UserDomainGrant):
try:
self.delete_grant(role_id, user_id=metadata_ref.user_id,
domain_id=metadata_ref.domain_id)
except exception.RoleNotFound:
pass
for metadata_ref in session.query(GroupProjectGrant):
try:
self.delete_grant(role_id, group_id=metadata_ref.group_id,
project_id=metadata_ref.project_id)
except exception.RoleNotFound:
pass
for metadata_ref in session.query(GroupDomainGrant):
try:
self.delete_grant(role_id, group_id=metadata_ref.group_id,
domain_id=metadata_ref.domain_id)
except exception.RoleNotFound: except exception.RoleNotFound:
pass pass
# FIXME(dolph): user-domain metadata needs to be updated
if not session.query(Role).filter_by(id=role_id).delete(): if not session.query(Role).filter_by(id=role_id).delete():
raise exception.RoleNotFound(role_id=role_id) raise exception.RoleNotFound(role_id=role_id)

View File

@ -1055,7 +1055,6 @@ class IdentityTests(object):
self.assertIn(role_list[7], roles_ref) self.assertIn(role_list[7], roles_ref)
def test_delete_role_with_user_and_group_grants(self): def test_delete_role_with_user_and_group_grants(self):
raise nose.exc.SkipTest('Blocked by bug 1097472')
role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_role(role1['id'], role1) self.identity_api.create_role(role1['id'], role1)
domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
@ -1099,22 +1098,22 @@ class IdentityTests(object):
domain_id=domain1['id']) domain_id=domain1['id'])
self.assertEquals(len(roles_ref), 1) self.assertEquals(len(roles_ref), 1)
self.identity_api.delete_role(role1['id']) self.identity_api.delete_role(role1['id'])
self.assertRaises(exception.RoleNotFound, roles_ref = self.identity_api.list_grants(
self.identity_api.list_grants,
user_id=user1['id'], user_id=user1['id'],
project_id=project1['id']) project_id=project1['id'])
self.assertRaises(exception.RoleNotFound, self.assertEquals(len(roles_ref), 0)
self.identity_api.list_grants, roles_ref = self.identity_api.list_grants(
group_id=group1['id'], group_id=group1['id'],
project_id=project1['id']) project_id=project1['id'])
self.assertRaises(exception.RoleNotFound, self.assertEquals(len(roles_ref), 0)
self.identity_api.list_grants, roles_ref = self.identity_api.list_grants(
user_id=user1['id'], user_id=user1['id'],
domain_id=domain1['id']) domain_id=domain1['id'])
self.assertRaises(exception.RoleNotFound, self.assertEquals(len(roles_ref), 0)
self.identity_api.list_grants, roles_ref = self.identity_api.list_grants(
group_id=group1['id'], group_id=group1['id'],
domain_id=domain1['id']) domain_id=domain1['id'])
self.assertEquals(len(roles_ref), 0)
def test_delete_user_with_group_project_domain_links(self): def test_delete_user_with_group_project_domain_links(self):
role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}