Merge "Avoid wrong deletion of domain assignments"
This commit is contained in:
@@ -257,7 +257,10 @@ class Assignment(keystone_assignment.AssignmentDriverV9):
|
||||
def delete_project_assignments(self, project_id):
|
||||
with sql.transaction() as session:
|
||||
q = session.query(RoleAssignment)
|
||||
q = q.filter_by(target_id=project_id)
|
||||
q = q.filter_by(target_id=project_id).filter(
|
||||
RoleAssignment.type.in_((AssignmentType.USER_PROJECT,
|
||||
AssignmentType.GROUP_PROJECT))
|
||||
)
|
||||
q.delete(False)
|
||||
|
||||
def delete_role_assignments(self, role_id):
|
||||
|
||||
@@ -28,3 +28,6 @@ class SqlIdentityV8(test_backend_sql.SqlIdentity):
|
||||
driver='keystone.assignment.V8_backends.sql.Assignment')
|
||||
self.use_specific_sql_driver_version(
|
||||
'keystone.assignment', 'backends', 'V8_')
|
||||
|
||||
def test_delete_project_assignments_same_id_as_domain(self):
|
||||
self.skipTest("V8 doesn't support project acting as a domain.")
|
||||
|
||||
@@ -4397,6 +4397,78 @@ class IdentityTests(AssignmentTestHelperMixin):
|
||||
include_names=False)
|
||||
assert_does_not_contain_names(role_assign_without_names)
|
||||
|
||||
def test_delete_project_assignments_same_id_as_domain(self):
|
||||
"""Test deleting project assignments in a scenario that
|
||||
project and domain have the same ID. Only project assignments must
|
||||
be deleted (i.e USER_PROJECT or GROUP_PROJECT).
|
||||
|
||||
Test plan:
|
||||
* Create a project and a domain with the same ID;
|
||||
* Create a user, a group and four roles;
|
||||
* Grant one role to user and one to group on project;
|
||||
* Grant one role to user and one to group on domain;
|
||||
* Delete all project assignments;
|
||||
* Domain roles must stay intact.
|
||||
"""
|
||||
# Created a common ID
|
||||
common_id = uuid.uuid4().hex
|
||||
# Create a domain
|
||||
domain = unit.new_domain_ref(id=common_id)
|
||||
domain = self.resource_api.driver.create_domain(common_id, domain)
|
||||
self.assertEqual(common_id, domain['id'])
|
||||
# Create a project
|
||||
project = unit.new_project_ref(id=common_id,
|
||||
domain_id=DEFAULT_DOMAIN_ID)
|
||||
project = self.resource_api.driver.create_project(common_id, project)
|
||||
self.assertEqual(common_id, project['id'])
|
||||
# Create a user
|
||||
user = unit.new_user_ref(domain_id=domain['id'])
|
||||
user = self.identity_api.driver.create_user(user['id'], user)
|
||||
# Create a group
|
||||
group = unit.new_group_ref(domain_id=domain['id'])
|
||||
group = self.identity_api.driver.create_group(group['id'], group)
|
||||
# Create four roles
|
||||
roles = []
|
||||
for _ in range(4):
|
||||
role = unit.new_role_ref()
|
||||
roles.append(self.role_api.create_role(role['id'], role))
|
||||
# Assign roles on domain
|
||||
self.assignment_api.driver.create_grant(user_id=user['id'],
|
||||
domain_id=domain['id'],
|
||||
role_id=roles[0]['id'])
|
||||
self.assignment_api.driver.create_grant(group_id=group['id'],
|
||||
domain_id=domain['id'],
|
||||
role_id=roles[1]['id'])
|
||||
# Assign roles on project
|
||||
self.assignment_api.driver.create_grant(user_id=user['id'],
|
||||
project_id=project['id'],
|
||||
role_id=roles[2]['id'])
|
||||
self.assignment_api.driver.create_grant(group_id=group['id'],
|
||||
project_id=project['id'],
|
||||
role_id=roles[3]['id'])
|
||||
# Make sure they were assigned
|
||||
domain_roles = self.assignment_api.list_role_assignments(
|
||||
domain_id=domain['id'])
|
||||
self.assertThat(domain_roles, matchers.HasLength(2))
|
||||
project_roles = self.assignment_api.list_role_assignments(
|
||||
project_id=project['id']
|
||||
)
|
||||
self.assertThat(project_roles, matchers.HasLength(2))
|
||||
# Delete project assignments
|
||||
self.assignment_api.delete_project_assignments(
|
||||
project_id=project['id'])
|
||||
# Assert only project assignments were deleted
|
||||
project_roles = self.assignment_api.list_role_assignments(
|
||||
project_id=project['id']
|
||||
)
|
||||
self.assertThat(project_roles, matchers.HasLength(0))
|
||||
domain_roles = self.assignment_api.list_role_assignments(
|
||||
domain_id=domain['id'])
|
||||
self.assertThat(domain_roles, matchers.HasLength(2))
|
||||
# Make sure these remaining roles are domain-related
|
||||
for role in domain_roles:
|
||||
self.assertThat(role.keys(), matchers.Contains('domain_id'))
|
||||
|
||||
|
||||
class TokenTests(object):
|
||||
def _create_token_id(self):
|
||||
|
||||
Reference in New Issue
Block a user