From 35a6708dccdc71d69a147daf0b0c2fde17489d4b Mon Sep 17 00:00:00 2001 From: Tihomir Trifonov Date: Wed, 10 Oct 2012 17:29:20 +0300 Subject: [PATCH] User shouldn't remove their own roles on project Added a check if the user is trying to delete their own 'admin' role on the current project. This is added to prevent the users from locking their own access to Dashboard. Also, if the 'admin' role on the current project is removed, Dashboard automatically switches to another project for which the user has 'admin' role, but the previous project still is displayed as current. Fixes bug 1046538 Also improved the unit test scenarios a bit. Change-Id: I1e60a11d628d6490ad24a8149b43ac307afb4780 --- .../dashboards/admin/projects/tests.py | 186 ++++++++---------- .../dashboards/admin/projects/workflows.py | 38 ++-- .../test/test_data/keystone_data.py | 20 +- 3 files changed, 124 insertions(+), 120 deletions(-) diff --git a/openstack_dashboard/dashboards/admin/projects/tests.py b/openstack_dashboard/dashboards/admin/projects/tests.py index 2ea5d02831..a7d264125c 100644 --- a/openstack_dashboard/dashboards/admin/projects/tests.py +++ b/openstack_dashboard/dashboards/admin/projects/tests.py @@ -139,10 +139,10 @@ class CreateProjectWorkflowTests(test.BaseAdminViewTests): for role in roles: if "role_" + role.id in workflow_data: ulist = workflow_data["role_" + role.id] - for user in ulist: + for user_id in ulist: api.add_tenant_user_role(IsA(http.HttpRequest), tenant_id=self.tenant.id, - user_id=user, + user_id=user_id, role_id=role.id) api.nova.tenant_quota_update(IsA(http.HttpRequest), @@ -261,10 +261,10 @@ class CreateProjectWorkflowTests(test.BaseAdminViewTests): for role in roles: if "role_" + role.id in workflow_data: ulist = workflow_data["role_" + role.id] - for user in ulist: + for user_id in ulist: api.add_tenant_user_role(IsA(http.HttpRequest), tenant_id=self.tenant.id, - user_id=user, + user_id=user_id, role_id=role.id) api.nova.tenant_quota_update(IsA(http.HttpRequest), @@ -320,10 +320,10 @@ class CreateProjectWorkflowTests(test.BaseAdminViewTests): for role in roles: if "role_" + role.id in workflow_data: ulist = workflow_data["role_" + role.id] - for user in ulist: + for user_id in ulist: api.add_tenant_user_role(IsA(http.HttpRequest), tenant_id=self.tenant.id, - user_id=user, + user_id=user_id, role_id=role.id) \ .AndRaise(self.exceptions.keystone) break @@ -443,13 +443,12 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests): 'add_tenant_user_role'), api.keystone: ('user_list', 'role_list',)}) - def test_update_project_post(self): + def test_update_project_save(self): project = self.tenants.first() quota = self.quotas.first() default_role = self.roles.first() users = self.users.list() roles = self.roles.list() - current_roles = self.roles.list() # get/init api.tenant_get(IsA(http.HttpRequest), self.tenant.id, admin=True) \ @@ -466,10 +465,9 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests): api.roles_for_user(IsA(http.HttpRequest), user.id, self.tenant.id).AndReturn(roles) - role_ids = [role.id for role in roles] - if role_ids: - workflow_data.setdefault("role_" + role_ids[0], []) \ - .append(user.id) + + workflow_data["role_1"] = ['3'] # admin role + workflow_data["role_2"] = ['2'] # member role # update some fields project._info["name"] = "updated name" @@ -494,40 +492,41 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests): api.keystone.user_list(IsA(http.HttpRequest), tenant_id=self.tenant.id).AndReturn(users) - for user in users: - api.roles_for_user(IsA(http.HttpRequest), - user.id, - self.tenant.id) \ - .AndReturn(current_roles) - for role in roles: - if "role_" + role.id in workflow_data: - ulist = workflow_data["role_" + role.id] - if role not in current_roles: - api.add_tenant_user_role(IsA(http.HttpRequest), - tenant_id=self.tenant.id, - user_id=user, - role_id=role.id) - else: - current_roles.pop(current_roles.index(role)) - for to_delete in current_roles: - api.remove_tenant_user_role(IsA(http.HttpRequest), - tenant_id=self.tenant.id, - user_id=user.id, - role_id=to_delete.id) - for role in roles: - if "role_" + role.id in workflow_data: - ulist = workflow_data["role_" + role.id] - for user in ulist: - if not filter(lambda x: user == x.id, users): - api.add_tenant_user_role(IsA(http.HttpRequest), - tenant_id=self.tenant.id, - user_id=user, - role_id=role.id) + # admin user - try to remove all roles on current project, warning + api.roles_for_user(IsA(http.HttpRequest), '1', self.tenant.id) \ + .AndReturn(roles) + + # member user 1 - has role 1, will remove it + api.roles_for_user(IsA(http.HttpRequest), '2', self.tenant.id) \ + .AndReturn((roles[0],)) + # remove role 1 + api.remove_tenant_user_role(IsA(http.HttpRequest), + tenant_id=self.tenant.id, + user_id='2', + role_id='1') + # add role 2 + api.add_tenant_user_role(IsA(http.HttpRequest), + tenant_id=self.tenant.id, + user_id='2', + role_id='2') + + # member user 3 - has role 2 + api.roles_for_user(IsA(http.HttpRequest), '3', self.tenant.id) \ + .AndReturn((roles[1],)) + # remove role 2 + api.remove_tenant_user_role(IsA(http.HttpRequest), + tenant_id=self.tenant.id, + user_id='3', + role_id='2') + # add role 1 + api.add_tenant_user_role(IsA(http.HttpRequest), + tenant_id=self.tenant.id, + user_id='3', + role_id='1') api.tenant_quota_update(IsA(http.HttpRequest), project.id, **updated_quota) - self.mox.ReplayAll() # submit form data @@ -542,6 +541,7 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests): res = self.client.post(url, workflow_data) self.assertNoFormErrors(res) + self.assertMessageCount(error=0, warning=1) self.assertRedirectsNoFollow(res, INDEX_URL) @test.create_stubs({api: ('tenant_get',)}) @@ -646,7 +646,6 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests): default_role = self.roles.first() users = self.users.list() roles = self.roles.list() - current_roles = self.roles.list() # get/init api.tenant_get(IsA(http.HttpRequest), self.tenant.id, admin=True) \ @@ -659,14 +658,14 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests): api.keystone.role_list(IsA(http.HttpRequest)).AndReturn(roles) workflow_data = {} + for user in users: api.roles_for_user(IsA(http.HttpRequest), user.id, self.tenant.id).AndReturn(roles) - role_ids = [role.id for role in roles] - if role_ids: - workflow_data.setdefault("role_" + role_ids[0], []) \ - .append(user.id) + + workflow_data["role_1"] = ['1', '3'] # admin role + workflow_data["role_2"] = ['1', '2', '3'] # member role # update some fields project._info["name"] = "updated name" @@ -692,35 +691,22 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests): api.keystone.user_list(IsA(http.HttpRequest), tenant_id=self.tenant.id).AndReturn(users) - for user in users: - api.roles_for_user(IsA(http.HttpRequest), - user.id, - self.tenant.id) \ - .AndReturn(current_roles) - for role in roles: - if "role_" + role.id in workflow_data: - ulist = workflow_data["role_" + role.id] - if role not in current_roles: - api.add_tenant_user_role(IsA(http.HttpRequest), - tenant_id=self.tenant.id, - user_id=user, - role_id=role.id) - else: - current_roles.pop(current_roles.index(role)) - for to_delete in current_roles: - api.remove_tenant_user_role(IsA(http.HttpRequest), - tenant_id=self.tenant.id, - user_id=user.id, - role_id=to_delete.id) - for role in roles: - if "role_" + role.id in workflow_data: - ulist = workflow_data["role_" + role.id] - for user in ulist: - if not filter(lambda x: user == x.id, users): - api.add_tenant_user_role(IsA(http.HttpRequest), - tenant_id=self.tenant.id, - user_id=user, - role_id=role.id) + # admin user - try to remove all roles on current project, warning + api.roles_for_user(IsA(http.HttpRequest), '1', self.tenant.id) \ + .AndReturn(roles) + + # member user 1 - has role 1, will remove it + api.roles_for_user(IsA(http.HttpRequest), '2', self.tenant.id) \ + .AndReturn((roles[1],)) + + # member user 3 - has role 2 + api.roles_for_user(IsA(http.HttpRequest), '3', self.tenant.id) \ + .AndReturn((roles[0],)) + # add role 2 + api.add_tenant_user_role(IsA(http.HttpRequest), + tenant_id=self.tenant.id, + user_id='3', + role_id='2') api.tenant_quota_update(IsA(http.HttpRequest), project.id, @@ -740,6 +726,7 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests): res = self.client.post(url, workflow_data) self.assertNoFormErrors(res) + self.assertMessageCount(error=1, warning=0) self.assertRedirectsNoFollow(res, INDEX_URL) @test.create_stubs({api: ('tenant_get', @@ -757,7 +744,6 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests): default_role = self.roles.first() users = self.users.list() roles = self.roles.list() - current_roles = self.roles.list() # get/init api.tenant_get(IsA(http.HttpRequest), self.tenant.id, admin=True) \ @@ -774,10 +760,8 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests): api.roles_for_user(IsA(http.HttpRequest), user.id, self.tenant.id).AndReturn(roles) - role_ids = [role.id for role in roles] - if role_ids: - workflow_data.setdefault("role_" + role_ids[0], []) \ - .append(user.id) + workflow_data["role_1"] = ['1', '3'] # admin role + workflow_data["role_2"] = ['1', '2', '3'] # member role # update some fields project._info["name"] = "updated name" @@ -802,28 +786,23 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests): api.keystone.user_list(IsA(http.HttpRequest), tenant_id=self.tenant.id).AndReturn(users) - for user in users: - api.roles_for_user(IsA(http.HttpRequest), - user.id, - self.tenant.id) \ - .AndReturn(current_roles) - for role in roles: - if "role_" + role.id in workflow_data: - if role not in current_roles: - api.add_tenant_user_role(IsA(http.HttpRequest), - tenant_id=self.tenant.id, - user_id=user, - role_id=role.id) - else: - current_roles.pop(current_roles.index(role)) - for to_delete in current_roles: - api.remove_tenant_user_role(IsA(http.HttpRequest), - tenant_id=self.tenant.id, - user_id=user.id, - role_id=to_delete.id) \ - .AndRaise(self.exceptions.nova) - break - break + # admin user - try to remove all roles on current project, warning + api.roles_for_user(IsA(http.HttpRequest), '1', self.tenant.id) \ + .AndReturn(roles) + + # member user 1 - has role 1, will remove it + api.roles_for_user(IsA(http.HttpRequest), '2', self.tenant.id) \ + .AndReturn((roles[1],)) + + # member user 3 - has role 2 + api.roles_for_user(IsA(http.HttpRequest), '3', self.tenant.id) \ + .AndReturn((roles[0],)) + # add role 2 + api.add_tenant_user_role(IsA(http.HttpRequest), + tenant_id=self.tenant.id, + user_id='3', + role_id='2')\ + .AndRaise(self.exceptions.nova) self.mox.ReplayAll() @@ -839,4 +818,5 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests): res = self.client.post(url, workflow_data) self.assertNoFormErrors(res) + self.assertMessageCount(error=1, warning=0) self.assertRedirectsNoFollow(res, INDEX_URL) diff --git a/openstack_dashboard/dashboards/admin/projects/workflows.py b/openstack_dashboard/dashboards/admin/projects/workflows.py index e6c72f7a2a..3b98ad4d62 100644 --- a/openstack_dashboard/dashboards/admin/projects/workflows.py +++ b/openstack_dashboard/dashboards/admin/projects/workflows.py @@ -25,6 +25,7 @@ from django.core.urlresolvers import reverse from horizon import exceptions from horizon import workflows from horizon import forms +from horizon import messages from openstack_dashboard import api @@ -299,12 +300,15 @@ class UpdateProject(workflows.Workflow): tenant_id=project_id) users_to_modify = len(project_members) for user in project_members: - current_roles = api.roles_for_user(self.request, - user.id, - project_id) + current_roles = [role for role in + api.roles_for_user(self.request, + user.id, + project_id)] + effective_roles = [] for role in available_roles: role_list = data["role_" + role.id] if user.id in role_list: + effective_roles.append(role) if role not in current_roles: # user role has changed api.add_tenant_user_role(request, @@ -314,12 +318,22 @@ class UpdateProject(workflows.Workflow): else: # user role is unchanged current_roles.pop(current_roles.index(role)) - # delete user's removed roles - for to_delete in current_roles: - api.remove_tenant_user_role(request, - tenant_id=project_id, - user_id=user.id, - role_id=to_delete.id) + if user.id == request.user.id and \ + project_id == request.user.tenant_id and \ + any(x.name == 'admin' for x in current_roles): + # Cannot remove "admin" role on current(admin) project + msg = _('You cannot remove the "admin" role from the ' + 'project you are currently logged into. Please ' + 'switch to another project with admin permissions ' + 'or remove the role manually via the CLI') + messages.warning(request, msg) + else: + # delete user's removed roles + for to_delete in current_roles: + api.remove_tenant_user_role(request, + tenant_id=project_id, + user_id=user.id, + role_id=to_delete.id) users_to_modify -= 1 # add new roles to project @@ -330,11 +344,11 @@ class UpdateProject(workflows.Workflow): for role in available_roles: role_list = data["role_" + role.id] users_added = 0 - for user in role_list: - if not filter(lambda x: user == x.id, project_members): + for user_id in role_list: + if not filter(lambda x: user_id == x.id, project_members): api.add_tenant_user_role(request, tenant_id=project_id, - user_id=user, + user_id=user_id, role_id=role.id) users_added += 1 users_to_modify -= users_added diff --git a/openstack_dashboard/test/test_data/keystone_data.py b/openstack_dashboard/test/test_data/keystone_data.py index ae48b21491..03d89ba3f4 100644 --- a/openstack_dashboard/test/test_data/keystone_data.py +++ b/openstack_dashboard/test/test_data/keystone_data.py @@ -99,7 +99,7 @@ def data(TEST): member_role_dict = {'id': "2", 'name': settings.OPENSTACK_KEYSTONE_DEFAULT_ROLE} member_role = roles.Role(roles.RoleManager, member_role_dict) - TEST.roles.add(member_role, admin_role) + TEST.roles.add(admin_role, member_role) TEST.roles.admin = admin_role TEST.roles.member = member_role @@ -110,11 +110,21 @@ def data(TEST): 'token': 'test_token', 'enabled': True} user = users.User(users.UserManager(None), user_dict) - user_dict.update({'id': "2", - 'name': 'user_two', - 'email': 'two@example.com'}) + user_dict = {'id': "2", + 'name': 'user_two', + 'email': 'two@example.com', + 'password': 'password', + 'token': 'test_token', + 'enabled': True} user2 = users.User(users.UserManager(None), user_dict) - TEST.users.add(user, user2) + user_dict = {'id': "3", + 'name': 'user_three', + 'email': 'three@example.com', + 'password': 'password', + 'token': 'test_token', + 'enabled': True} + user3 = users.User(users.UserManager(None), user_dict) + TEST.users.add(user, user2, user3) TEST.user = user # Your "current" user TEST.user.service_catalog = SERVICE_CATALOG