User shouldn't remove their own roles on project

Added a check if the user is trying to delete their
own 'admin' role on the current project. This is added
to prevent the users from locking their own
access to Dashboard. Also, if the 'admin' role on the
current project is removed, Dashboard automatically
switches to another project for which the user has 'admin'
role, but the previous project still is displayed as current.

Fixes bug 1046538

Also improved the unit test scenarios a bit.

Change-Id: I1e60a11d628d6490ad24a8149b43ac307afb4780
This commit is contained in:
Tihomir Trifonov 2012-10-10 17:29:20 +03:00
parent cb8e7c1f8f
commit 35a6708dcc
3 changed files with 124 additions and 120 deletions

View File

@ -139,10 +139,10 @@ class CreateProjectWorkflowTests(test.BaseAdminViewTests):
for role in roles:
if "role_" + role.id in workflow_data:
ulist = workflow_data["role_" + role.id]
for user in ulist:
for user_id in ulist:
api.add_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id=user,
user_id=user_id,
role_id=role.id)
api.nova.tenant_quota_update(IsA(http.HttpRequest),
@ -261,10 +261,10 @@ class CreateProjectWorkflowTests(test.BaseAdminViewTests):
for role in roles:
if "role_" + role.id in workflow_data:
ulist = workflow_data["role_" + role.id]
for user in ulist:
for user_id in ulist:
api.add_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id=user,
user_id=user_id,
role_id=role.id)
api.nova.tenant_quota_update(IsA(http.HttpRequest),
@ -320,10 +320,10 @@ class CreateProjectWorkflowTests(test.BaseAdminViewTests):
for role in roles:
if "role_" + role.id in workflow_data:
ulist = workflow_data["role_" + role.id]
for user in ulist:
for user_id in ulist:
api.add_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id=user,
user_id=user_id,
role_id=role.id) \
.AndRaise(self.exceptions.keystone)
break
@ -443,13 +443,12 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests):
'add_tenant_user_role'),
api.keystone: ('user_list',
'role_list',)})
def test_update_project_post(self):
def test_update_project_save(self):
project = self.tenants.first()
quota = self.quotas.first()
default_role = self.roles.first()
users = self.users.list()
roles = self.roles.list()
current_roles = self.roles.list()
# get/init
api.tenant_get(IsA(http.HttpRequest), self.tenant.id, admin=True) \
@ -466,10 +465,9 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests):
api.roles_for_user(IsA(http.HttpRequest),
user.id,
self.tenant.id).AndReturn(roles)
role_ids = [role.id for role in roles]
if role_ids:
workflow_data.setdefault("role_" + role_ids[0], []) \
.append(user.id)
workflow_data["role_1"] = ['3'] # admin role
workflow_data["role_2"] = ['2'] # member role
# update some fields
project._info["name"] = "updated name"
@ -494,40 +492,41 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests):
api.keystone.user_list(IsA(http.HttpRequest),
tenant_id=self.tenant.id).AndReturn(users)
for user in users:
api.roles_for_user(IsA(http.HttpRequest),
user.id,
self.tenant.id) \
.AndReturn(current_roles)
for role in roles:
if "role_" + role.id in workflow_data:
ulist = workflow_data["role_" + role.id]
if role not in current_roles:
api.add_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id=user,
role_id=role.id)
else:
current_roles.pop(current_roles.index(role))
for to_delete in current_roles:
# admin user - try to remove all roles on current project, warning
api.roles_for_user(IsA(http.HttpRequest), '1', self.tenant.id) \
.AndReturn(roles)
# member user 1 - has role 1, will remove it
api.roles_for_user(IsA(http.HttpRequest), '2', self.tenant.id) \
.AndReturn((roles[0],))
# remove role 1
api.remove_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id=user.id,
role_id=to_delete.id)
for role in roles:
if "role_" + role.id in workflow_data:
ulist = workflow_data["role_" + role.id]
for user in ulist:
if not filter(lambda x: user == x.id, users):
user_id='2',
role_id='1')
# add role 2
api.add_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id=user,
role_id=role.id)
user_id='2',
role_id='2')
# member user 3 - has role 2
api.roles_for_user(IsA(http.HttpRequest), '3', self.tenant.id) \
.AndReturn((roles[1],))
# remove role 2
api.remove_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id='3',
role_id='2')
# add role 1
api.add_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id='3',
role_id='1')
api.tenant_quota_update(IsA(http.HttpRequest),
project.id,
**updated_quota)
self.mox.ReplayAll()
# submit form data
@ -542,6 +541,7 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests):
res = self.client.post(url, workflow_data)
self.assertNoFormErrors(res)
self.assertMessageCount(error=0, warning=1)
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api: ('tenant_get',)})
@ -646,7 +646,6 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests):
default_role = self.roles.first()
users = self.users.list()
roles = self.roles.list()
current_roles = self.roles.list()
# get/init
api.tenant_get(IsA(http.HttpRequest), self.tenant.id, admin=True) \
@ -659,14 +658,14 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests):
api.keystone.role_list(IsA(http.HttpRequest)).AndReturn(roles)
workflow_data = {}
for user in users:
api.roles_for_user(IsA(http.HttpRequest),
user.id,
self.tenant.id).AndReturn(roles)
role_ids = [role.id for role in roles]
if role_ids:
workflow_data.setdefault("role_" + role_ids[0], []) \
.append(user.id)
workflow_data["role_1"] = ['1', '3'] # admin role
workflow_data["role_2"] = ['1', '2', '3'] # member role
# update some fields
project._info["name"] = "updated name"
@ -692,35 +691,22 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests):
api.keystone.user_list(IsA(http.HttpRequest),
tenant_id=self.tenant.id).AndReturn(users)
for user in users:
api.roles_for_user(IsA(http.HttpRequest),
user.id,
self.tenant.id) \
.AndReturn(current_roles)
for role in roles:
if "role_" + role.id in workflow_data:
ulist = workflow_data["role_" + role.id]
if role not in current_roles:
# admin user - try to remove all roles on current project, warning
api.roles_for_user(IsA(http.HttpRequest), '1', self.tenant.id) \
.AndReturn(roles)
# member user 1 - has role 1, will remove it
api.roles_for_user(IsA(http.HttpRequest), '2', self.tenant.id) \
.AndReturn((roles[1],))
# member user 3 - has role 2
api.roles_for_user(IsA(http.HttpRequest), '3', self.tenant.id) \
.AndReturn((roles[0],))
# add role 2
api.add_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id=user,
role_id=role.id)
else:
current_roles.pop(current_roles.index(role))
for to_delete in current_roles:
api.remove_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id=user.id,
role_id=to_delete.id)
for role in roles:
if "role_" + role.id in workflow_data:
ulist = workflow_data["role_" + role.id]
for user in ulist:
if not filter(lambda x: user == x.id, users):
api.add_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id=user,
role_id=role.id)
user_id='3',
role_id='2')
api.tenant_quota_update(IsA(http.HttpRequest),
project.id,
@ -740,6 +726,7 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests):
res = self.client.post(url, workflow_data)
self.assertNoFormErrors(res)
self.assertMessageCount(error=1, warning=0)
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api: ('tenant_get',
@ -757,7 +744,6 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests):
default_role = self.roles.first()
users = self.users.list()
roles = self.roles.list()
current_roles = self.roles.list()
# get/init
api.tenant_get(IsA(http.HttpRequest), self.tenant.id, admin=True) \
@ -774,10 +760,8 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests):
api.roles_for_user(IsA(http.HttpRequest),
user.id,
self.tenant.id).AndReturn(roles)
role_ids = [role.id for role in roles]
if role_ids:
workflow_data.setdefault("role_" + role_ids[0], []) \
.append(user.id)
workflow_data["role_1"] = ['1', '3'] # admin role
workflow_data["role_2"] = ['1', '2', '3'] # member role
# update some fields
project._info["name"] = "updated name"
@ -802,28 +786,23 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests):
api.keystone.user_list(IsA(http.HttpRequest),
tenant_id=self.tenant.id).AndReturn(users)
for user in users:
api.roles_for_user(IsA(http.HttpRequest),
user.id,
self.tenant.id) \
.AndReturn(current_roles)
for role in roles:
if "role_" + role.id in workflow_data:
if role not in current_roles:
# admin user - try to remove all roles on current project, warning
api.roles_for_user(IsA(http.HttpRequest), '1', self.tenant.id) \
.AndReturn(roles)
# member user 1 - has role 1, will remove it
api.roles_for_user(IsA(http.HttpRequest), '2', self.tenant.id) \
.AndReturn((roles[1],))
# member user 3 - has role 2
api.roles_for_user(IsA(http.HttpRequest), '3', self.tenant.id) \
.AndReturn((roles[0],))
# add role 2
api.add_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id=user,
role_id=role.id)
else:
current_roles.pop(current_roles.index(role))
for to_delete in current_roles:
api.remove_tenant_user_role(IsA(http.HttpRequest),
tenant_id=self.tenant.id,
user_id=user.id,
role_id=to_delete.id) \
user_id='3',
role_id='2')\
.AndRaise(self.exceptions.nova)
break
break
self.mox.ReplayAll()
@ -839,4 +818,5 @@ class UpdateProjectWorkflowTests(test.BaseAdminViewTests):
res = self.client.post(url, workflow_data)
self.assertNoFormErrors(res)
self.assertMessageCount(error=1, warning=0)
self.assertRedirectsNoFollow(res, INDEX_URL)

View File

@ -25,6 +25,7 @@ from django.core.urlresolvers import reverse
from horizon import exceptions
from horizon import workflows
from horizon import forms
from horizon import messages
from openstack_dashboard import api
@ -299,12 +300,15 @@ class UpdateProject(workflows.Workflow):
tenant_id=project_id)
users_to_modify = len(project_members)
for user in project_members:
current_roles = api.roles_for_user(self.request,
current_roles = [role for role in
api.roles_for_user(self.request,
user.id,
project_id)
project_id)]
effective_roles = []
for role in available_roles:
role_list = data["role_" + role.id]
if user.id in role_list:
effective_roles.append(role)
if role not in current_roles:
# user role has changed
api.add_tenant_user_role(request,
@ -314,6 +318,16 @@ class UpdateProject(workflows.Workflow):
else:
# user role is unchanged
current_roles.pop(current_roles.index(role))
if user.id == request.user.id and \
project_id == request.user.tenant_id and \
any(x.name == 'admin' for x in current_roles):
# Cannot remove "admin" role on current(admin) project
msg = _('You cannot remove the "admin" role from the '
'project you are currently logged into. Please '
'switch to another project with admin permissions '
'or remove the role manually via the CLI')
messages.warning(request, msg)
else:
# delete user's removed roles
for to_delete in current_roles:
api.remove_tenant_user_role(request,
@ -330,11 +344,11 @@ class UpdateProject(workflows.Workflow):
for role in available_roles:
role_list = data["role_" + role.id]
users_added = 0
for user in role_list:
if not filter(lambda x: user == x.id, project_members):
for user_id in role_list:
if not filter(lambda x: user_id == x.id, project_members):
api.add_tenant_user_role(request,
tenant_id=project_id,
user_id=user,
user_id=user_id,
role_id=role.id)
users_added += 1
users_to_modify -= users_added

View File

@ -99,7 +99,7 @@ def data(TEST):
member_role_dict = {'id': "2",
'name': settings.OPENSTACK_KEYSTONE_DEFAULT_ROLE}
member_role = roles.Role(roles.RoleManager, member_role_dict)
TEST.roles.add(member_role, admin_role)
TEST.roles.add(admin_role, member_role)
TEST.roles.admin = admin_role
TEST.roles.member = member_role
@ -110,11 +110,21 @@ def data(TEST):
'token': 'test_token',
'enabled': True}
user = users.User(users.UserManager(None), user_dict)
user_dict.update({'id': "2",
user_dict = {'id': "2",
'name': 'user_two',
'email': 'two@example.com'})
'email': 'two@example.com',
'password': 'password',
'token': 'test_token',
'enabled': True}
user2 = users.User(users.UserManager(None), user_dict)
TEST.users.add(user, user2)
user_dict = {'id': "3",
'name': 'user_three',
'email': 'three@example.com',
'password': 'password',
'token': 'test_token',
'enabled': True}
user3 = users.User(users.UserManager(None), user_dict)
TEST.users.add(user, user2, user3)
TEST.user = user # Your "current" user
TEST.user.service_catalog = SERVICE_CATALOG