Merge "Handle NotFound when listing role assignments for deleted users"

This commit is contained in:
Jenkins 2017-05-16 22:46:28 +00:00 committed by Gerrit Code Review
commit ab2928f2cd
2 changed files with 82 additions and 12 deletions

View File

@ -943,19 +943,43 @@ class Manager(manager.Manager):
_domain = self.resource_api.get_domain(value)
new_assign['domain_name'] = _domain['name']
elif key == 'user_id':
_user = self.identity_api.get_user(value)
new_assign['user_name'] = _user['name']
new_assign['user_domain_id'] = _user['domain_id']
new_assign['user_domain_name'] = (
self.resource_api.get_domain(_user['domain_id'])
['name'])
try:
# Note(knikolla): Try to get the user, otherwise
# if the user wasn't found in the backend
# use empty values.
_user = self.identity_api.get_user(value)
except exception.UserNotFound:
msg = ('User %(user)s not found in the'
' backend but still has role assignments.')
LOG.warning(msg, {'user': value})
new_assign['user_name'] = ''
new_assign['user_domain_id'] = ''
new_assign['user_domain_name'] = ''
else:
new_assign['user_name'] = _user['name']
new_assign['user_domain_id'] = _user['domain_id']
new_assign['user_domain_name'] = (
self.resource_api.get_domain(_user['domain_id'])
['name'])
elif key == 'group_id':
_group = self.identity_api.get_group(value)
new_assign['group_name'] = _group['name']
new_assign['group_domain_id'] = _group['domain_id']
new_assign['group_domain_name'] = (
self.resource_api.get_domain(_group['domain_id'])
['name'])
try:
# Note(knikolla): Try to get the group, otherwise
# if the group wasn't found in the backend
# use empty values.
_group = self.identity_api.get_group(value)
except exception.GroupNotFound:
msg = ('Group %(group)s not found in the'
' backend but still has role assignments.')
LOG.warning(msg, {'group': value})
new_assign['group_name'] = ''
new_assign['group_domain_id'] = ''
new_assign['group_domain_name'] = ''
else:
new_assign['group_name'] = _group['name']
new_assign['group_domain_id'] = _group['domain_id']
new_assign['group_domain_name'] = (
self.resource_api.get_domain(_group['domain_id'])
['name'])
elif key == 'project_id':
_project = self.resource_api.get_project(value)
new_assign['project_name'] = _project['name']

View File

@ -579,6 +579,52 @@ class AssignmentTests(AssignmentTestHelperMixin):
role_id=uuid.uuid4().hex)
self.assertEqual([], assignment_list)
def test_list_role_assignments_user_not_found(self):
def _user_not_found(value):
raise exception.UserNotFound(user_id=value)
# Note(knikolla): Patch get_user to return UserNotFound
# this simulates the possibility of a user being deleted
# directly in the backend and still having lingering role
# assignments.
with mock.patch.object(self.identity_api, 'get_user',
_user_not_found):
assignment_list = self.assignment_api.list_role_assignments(
include_names=True
)
self.assertNotEqual([], assignment_list)
for assignment in assignment_list:
if 'user_name' in assignment:
# Note(knikolla): In the case of a not found user we
# populate the values with empty strings.
self.assertEqual('', assignment['user_name'])
self.assertEqual('', assignment['user_domain_id'])
self.assertEqual('', assignment['user_domain_name'])
def test_list_role_assignments_group_not_found(self):
def _group_not_found(value):
raise exception.GroupNotFound(group_id=value)
# Note(knikolla): Patch get_group to return GroupNotFound
# this simulates the case of a group being deleted
# directly in the backend and still having lingering role
# assignments.
with mock.patch.object(self.identity_api, 'get_group',
_group_not_found):
assignment_list = self.assignment_api.list_role_assignments(
include_names=True
)
self.assertNotEqual([], assignment_list)
for assignment in assignment_list:
if 'group_name' in assignment:
# Note(knikolla): In the case of a not found group we
# populate the values with empty strings.
self.assertEqual('', assignment['group_name'])
self.assertEqual('', assignment['group_domain_id'])
self.assertEqual('', assignment['group_domain_name'])
def test_add_duplicate_role_grant(self):
roles_ref = self.assignment_api.get_roles_for_user_and_project(
self.user_foo['id'], self.tenant_bar['id'])