Support for Keystone password_lock option

This feature was added in Keystone V3 API. Proposed patch adds support
to Horizon

Co-Authored-By: Ivan Kolodyazhny <e0ne@e0ne.info>
Closes-bug: #1766485
Change-Id: Ic20a58c76826d703b43fa6a2d77ae5f77dcda1f4
This commit is contained in:
vmarkov 2018-05-15 17:04:47 +03:00 committed by Ivan Kolodyazhny
parent 754804667a
commit 7f849239ea
9 changed files with 235 additions and 29 deletions

View File

@ -133,6 +133,9 @@ class CreateUserForm(PasswordMixin, BaseUserForm, AddExtraColumnMixIn):
enabled = forms.BooleanField(label=_("Enabled"), enabled = forms.BooleanField(label=_("Enabled"),
required=False, required=False,
initial=True) initial=True)
lock_password = forms.BooleanField(label=_("Lock password"),
required=False,
initial=False)
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
roles = kwargs.pop('roles') roles = kwargs.pop('roles')
@ -141,7 +144,7 @@ class CreateUserForm(PasswordMixin, BaseUserForm, AddExtraColumnMixIn):
ordering = ["domain_id", "domain_name", "name", ordering = ["domain_id", "domain_name", "name",
"description", "email", "password", "description", "email", "password",
"confirm_password", "project", "role_id", "confirm_password", "project", "role_id",
"enabled"] "enabled", "lock_password"]
self.add_extra_fields(ordering) self.add_extra_fields(ordering)
self.fields = collections.OrderedDict( self.fields = collections.OrderedDict(
(key, self.fields[key]) for key in ordering) (key, self.fields[key]) for key in ordering)
@ -178,6 +181,10 @@ class CreateUserForm(PasswordMixin, BaseUserForm, AddExtraColumnMixIn):
else: else:
kwargs = {} kwargs = {}
if "lock_password" in data:
kwargs.update({'options':
{'lock_password': data['lock_password']}})
new_user = \ new_user = \
api.keystone.user_create(request, api.keystone.user_create(request,
name=data['name'], name=data['name'],
@ -235,6 +242,10 @@ class UpdateUserForm(BaseUserForm, AddExtraColumnMixIn):
project = forms.ThemableChoiceField(label=_("Primary Project"), project = forms.ThemableChoiceField(label=_("Primary Project"),
required=PROJECT_REQUIRED) required=PROJECT_REQUIRED)
lock_password = forms.BooleanField(label=_("Lock password"),
required=False,
initial=False)
def __init__(self, request, *args, **kwargs): def __init__(self, request, *args, **kwargs):
super(UpdateUserForm, self).__init__(request, *args, **kwargs) super(UpdateUserForm, self).__init__(request, *args, **kwargs)
self.add_extra_fields() self.add_extra_fields()
@ -261,6 +272,10 @@ class UpdateUserForm(BaseUserForm, AddExtraColumnMixIn):
if 'description' not in self.changed_data: if 'description' not in self.changed_data:
data.pop('description') data.pop('description')
if 'lock_password' in data:
data.update({'options': {'lock_password': data['lock_password']}})
data.pop('lock_password')
try: try:
if "email" in data: if "email" in data:
data['email'] = data['email'] data['email'] = data['email']

View File

@ -64,7 +64,9 @@ class ChangePasswordLink(policy.PolicyTargetMixin, tables.LinkAction):
("target.user.domain_id", "domain_id")) ("target.user.domain_id", "domain_id"))
def allowed(self, request, user): def allowed(self, request, user):
return api.keystone.keystone_can_edit_user() options = getattr(user, "options", {})
lock_password = options.get("lock_password", False)
return not lock_password and api.keystone.keystone_can_edit_user()
class ToggleEnabled(policy.PolicyTargetMixin, tables.BatchAction): class ToggleEnabled(policy.PolicyTargetMixin, tables.BatchAction):

View File

@ -20,6 +20,8 @@
<dd>{{ user.enabled|yesno|capfirst }}</dd> <dd>{{ user.enabled|yesno|capfirst }}</dd>
<dt>{% trans "Password Expires At" %}</dt> <dt>{% trans "Password Expires At" %}</dt>
<dd>{{ user.password_expires_at }}</dd> <dd>{{ user.password_expires_at }}</dd>
<dt>{% trans "Lock password" %}</dt>
<dd>{{ lock_password|yesno:"yes,no"|capfirst }}</dd>
<dt>{% trans "Primary Project" %}</dt> <dt>{% trans "Primary Project" %}</dt>
{% if user.project_id %} {% if user.project_id %}
<dd class="word-wrap"> <dd class="word-wrap">

View File

@ -152,9 +152,11 @@ class UsersViewTests(test.BaseAdminViewTests):
kwargs = {'phone_num': phone_number} kwargs = {'phone_num': phone_number}
self.mock_user_create.assert_called_once_with( self.mock_user_create.assert_called_once_with(
test.IsHttpRequest(), name=user.name, description=user.description, test.IsHttpRequest(), name=user.name,
email=user.email, password=user.password, project=self.tenant.id, description=user.description, email=user.email,
enabled=True, domain=domain_id, **kwargs) password=user.password, project=self.tenant.id,
enabled=True, domain=domain_id, options={'lock_password': False},
**kwargs)
self.mock_role_list.assert_called_once_with(test.IsHttpRequest()) self.mock_role_list.assert_called_once_with(test.IsHttpRequest())
self.mock_get_default_role.assert_called_once_with( self.mock_get_default_role.assert_called_once_with(
test.IsHttpRequest()) test.IsHttpRequest())
@ -229,7 +231,8 @@ class UsersViewTests(test.BaseAdminViewTests):
password=user.password, password=user.password,
project=self.tenant.id, project=self.tenant.id,
enabled=True, enabled=True,
domain=domain_id) domain=domain_id,
options={'lock_password': False})
self.mock_role_list.assert_called_once_with(test.IsHttpRequest()) self.mock_role_list.assert_called_once_with(test.IsHttpRequest())
self.mock_get_default_role.assert_called_once_with( self.mock_get_default_role.assert_called_once_with(
test.IsHttpRequest()) test.IsHttpRequest())
@ -394,6 +397,71 @@ class UsersViewTests(test.BaseAdminViewTests):
self.mock_get_default_role, 2, self.mock_get_default_role, 2,
mock.call(test.IsHttpRequest())) mock.call(test.IsHttpRequest()))
@test.create_mocks({api.keystone: ('user_create',
'get_default_domain',
'tenant_list',
'add_tenant_user_role',
'get_default_role',
'roles_for_user',
'role_list')})
def test_create_lock_pass(self):
user = self.users.get(id="1")
domain = self._get_default_domain()
domain_id = user.domain_id
role = self.roles.first()
self.mock_get_default_domain.return_value = domain
self.mock_tenant_list.return_value = [self.tenants.list(), False]
self.mock_user_create.return_value = user
self.mock_role_list.return_value = self.roles.list()
self.mock_get_default_role.return_value = role
self.mock_roles_for_user.return_value = []
self.mock_add_tenant_user_role.return_value = None
formData = {'method': 'CreateUserForm',
'domain_id': domain_id,
'name': user.name,
'description': user.description,
'email': user.email,
'password': user.password,
'project': self.tenant.id,
'role_id': self.roles.first().id,
'enabled': True,
'confirm_password': user.password,
'lock_password': True}
res = self.client.post(USER_CREATE_URL, formData)
self.assertNoFormErrors(res)
self.assertMessageCount(success=1)
self.mock_get_default_domain.assert_has_calls([
mock.call(test.IsHttpRequest()),
mock.call(test.IsHttpRequest(), False),
])
self.assertEqual(2, self.mock_get_default_domain.call_count)
if api.keystone.VERSIONS.active >= 3:
self.mock_tenant_list.assert_called_once_with(
test.IsHttpRequest(), domain=domain.id)
else:
self.mock_tenant_list.assert_called_once_with(
test.IsHttpRequest(), user=None)
self.mock_user_create.assert_called_once_with(
test.IsHttpRequest(), name=user.name,
options={'lock_password': True}, description=user.description,
email=user.email, password=user.password, project=self.tenant.id,
enabled=True, domain=domain.id)
self.mock_role_list.assert_called_once_with(test.IsHttpRequest())
self.mock_get_default_role.assert_called_once_with(
test.IsHttpRequest())
self.mock_roles_for_user.assert_called_once_with(
test.IsHttpRequest(), user.id, self.tenant.id)
self.mock_add_tenant_user_role.assert_called_once_with(
test.IsHttpRequest(), self.tenant.id, user.id, role.id)
@override_settings(USER_TABLE_EXTRA_INFO={'phone_num': 'Phone Number'}) @override_settings(USER_TABLE_EXTRA_INFO={'phone_num': 'Phone Number'})
@test.create_mocks({api.keystone: ('user_get', @test.create_mocks({api.keystone: ('user_get',
'domain_get', 'domain_get',
@ -435,6 +503,8 @@ class UsersViewTests(test.BaseAdminViewTests):
self.mock_user_update.assert_called_once_with(test.IsHttpRequest(), self.mock_user_update.assert_called_once_with(test.IsHttpRequest(),
user.id, user.id,
email=user.email, email=user.email,
options={'lock_password':
False},
name=user.name, name=user.name,
**kwargs) **kwargs)
@ -477,6 +547,8 @@ class UsersViewTests(test.BaseAdminViewTests):
self.mock_user_update.assert_called_once_with(test.IsHttpRequest(), self.mock_user_update.assert_called_once_with(test.IsHttpRequest(),
user.id, user.id,
email=user.email, email=user.email,
options={'lock_password':
False},
name=user.name, name=user.name,
project=new_project_id) project=new_project_id)
@ -518,6 +590,8 @@ class UsersViewTests(test.BaseAdminViewTests):
self.mock_user_update.assert_called_once_with(test.IsHttpRequest(), self.mock_user_update.assert_called_once_with(test.IsHttpRequest(),
user.id, user.id,
email=user.email or "", email=user.email or "",
options={'lock_password':
False},
name=user.name, name=user.name,
project=self.tenant.id) project=self.tenant.id)
@ -545,6 +619,40 @@ class UsersViewTests(test.BaseAdminViewTests):
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertMessageCount(error=1) self.assertMessageCount(error=1)
self.mock_user_get.assert_called_once_with(test.IsHttpRequest(), '1',
admin=True)
self.mock_domain_get.assert_called_once_with(test.IsHttpRequest(),
domain_id)
self.mock_tenant_list.assert_called_once_with(
test.IsHttpRequest(), domain=domain.id)
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_keystone_can_edit_user, 2,
mock.call())
@test.create_mocks({api.keystone: ('user_get',
'domain_get',
'tenant_list',
'keystone_can_edit_user', )})
def test_update_with_locked_password(self):
user = self.users.get(id="6")
domain_id = user.domain_id
domain = self.domains.get(id=domain_id)
self.mock_user_get.return_value = user
self.mock_domain_get.return_value = domain
self.mock_tenant_list.return_value = [self.tenants.list(), False]
self.mock_keystone_can_edit_user.return_value = False
formData = {'method': 'UpdateUserForm',
'id': user.id,
'name': user.name,
'project': self.tenant.id, }
res = self.client.post(USER_UPDATE_URL, formData)
self.assertNoFormErrors(res)
self.assertMessageCount(error=1)
self.mock_user_get.assert_called_once_with(test.IsHttpRequest(), '1', self.mock_user_get.assert_called_once_with(test.IsHttpRequest(), '1',
admin=True) admin=True)
self.mock_domain_get.assert_called_once_with(test.IsHttpRequest(), self.mock_domain_get.assert_called_once_with(test.IsHttpRequest(),
@ -578,8 +686,33 @@ class UsersViewTests(test.BaseAdminViewTests):
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.mock_user_get.assert_called_once_with(test.IsHttpRequest(), '1', self.mock_user_get.assert_called_with(test.IsHttpRequest(), '1',
admin=False) admin=False)
self.assertEqual(self.mock_user_get.call_count, 1)
self.mock_user_update_password.assert_called_once_with(
test.IsHttpRequest(), user.id, test_password, admin=False)
@test.create_mocks({api.keystone: ('user_get',
'user_update_password')})
def test_change_locked_password(self):
user = self.users.get(id="6")
test_password = 'normalpwd'
self.mock_user_get.return_value = user
self.mock_user_update_password.return_value = None
formData = {'method': 'ChangePasswordForm',
'id': user.id,
'name': user.name,
'password': test_password,
'confirm_password': test_password}
res = self.client.post(USER_CHANGE_PASSWORD_URL, formData)
self.assertRedirectsNoFollow(res, USERS_INDEX_URL)
self.mock_user_get.assert_called_with(test.IsHttpRequest(), '1',
admin=False)
self.assertEqual(self.mock_user_get.call_count, 1)
self.mock_user_update_password.assert_called_once_with( self.mock_user_update_password.assert_called_once_with(
test.IsHttpRequest(), user.id, test_password, admin=False) test.IsHttpRequest(), user.id, test_password, admin=False)
@ -605,8 +738,9 @@ class UsersViewTests(test.BaseAdminViewTests):
self.assertFormError(res, "form", None, self.assertFormError(res, "form", None,
['The admin password is incorrect.']) ['The admin password is incorrect.'])
self.mock_user_get.assert_called_once_with(test.IsHttpRequest(), '1', self.mock_user_get.assert_called_with(test.IsHttpRequest(), '1',
admin=False) admin=False)
self.assertEqual(self.mock_user_get.call_count, 1)
self.mock_user_verify_admin_password.assert_called_once_with( self.mock_user_verify_admin_password.assert_called_once_with(
test.IsHttpRequest(), admin_password) test.IsHttpRequest(), admin_password)
@ -1214,6 +1348,8 @@ class UsersViewTests(test.BaseAdminViewTests):
self.mock_user_update.assert_called_once_with(test.IsHttpRequest(), self.mock_user_update.assert_called_once_with(test.IsHttpRequest(),
user.id, user.id,
email=user.email, email=user.email,
options={'lock_password':
False},
name=user.name, name=user.name,
description='changed') description='changed')

View File

@ -130,6 +130,7 @@ class UpdateView(forms.ModalFormView):
def get_initial(self): def get_initial(self):
user = self.get_object() user = self.get_object()
options = getattr(user, "options", {})
domain_id = getattr(user, "domain_id", None) domain_id = getattr(user, "domain_id", None)
domain_name = '' domain_name = ''
# Retrieve the domain name where the project belongs # Retrieve the domain name where the project belongs
@ -154,7 +155,8 @@ class UpdateView(forms.ModalFormView):
'name': user.name, 'name': user.name,
'project': user.project_id, 'project': user.project_id,
'email': getattr(user, 'email', None), 'email': getattr(user, 'email', None),
'description': getattr(user, 'description', None)} 'description': getattr(user, 'description', None),
'lock_password': options.get("lock_password", False)}
if api.keystone.VERSIONS.active >= 3: if api.keystone.VERSIONS.active >= 3:
for key in settings.USER_TABLE_EXTRA_INFO: for key in settings.USER_TABLE_EXTRA_INFO:
data[key] = getattr(user, key, None) data[key] = getattr(user, key, None)
@ -209,6 +211,8 @@ class DetailView(tabs.TabView):
context["user"] = user context["user"] = user
context["url"] = self.get_redirect_url() context["url"] = self.get_redirect_url()
context["actions"] = table.render_row_actions(user) context["actions"] = table.render_row_actions(user)
options = getattr(user, "options", {})
context["lock_password"] = options.get("lock_password", False)
return context return context
@memoized.memoized_method @memoized.memoized_method

View File

@ -59,6 +59,13 @@ class PasswordForm(forms.SelfHandlingForm):
@sensitive_variables('data') @sensitive_variables('data')
def handle(self, request, data): def handle(self, request, data):
user_is_editable = api.keystone.keystone_can_edit_user() user_is_editable = api.keystone.keystone_can_edit_user()
user_id = request.user.id
user = api.keystone.user_get(self.request, user_id, admin=False)
options = getattr(user, "options", {})
lock_password = options.get("lock_password", False)
if lock_password:
messages.error(request, _('Password is locked.'))
return False
if user_is_editable: if user_is_editable:
try: try:

View File

@ -29,9 +29,11 @@ INDEX_URL = reverse('horizon:settings:password:index')
class ChangePasswordTests(test.TestCase): class ChangePasswordTests(test.TestCase):
@mock.patch.object(api.keystone, 'user_update_own_password') @test.create_mocks({
def test_change_password(self, mock_user_update_own_password): api.keystone: ['user_update_own_password', 'user_get']})
mock_user_update_own_password.return_value = None def test_change_password(self):
self.mock_user_update_own_password.return_value = None
self.mock_user_get.return_value = mock.Mock(options={})
formData = {'method': 'PasswordForm', formData = {'method': 'PasswordForm',
'current_password': 'oldpwd', 'current_password': 'oldpwd',
@ -40,22 +42,17 @@ class ChangePasswordTests(test.TestCase):
res = self.client.post(INDEX_URL, formData) res = self.client.post(INDEX_URL, formData)
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
mock_user_update_own_password.assert_called_once_with( self.mock_user_get.assert_called_once_with(test.IsHttpRequest(), '1',
admin=False)
self.mock_user_update_own_password.assert_called_once_with(
test.IsHttpRequest(), 'oldpwd', 'normalpwd') test.IsHttpRequest(), 'oldpwd', 'normalpwd')
def test_change_validation_passwords_not_matching(self): @test.create_mocks({
formData = {'method': 'PasswordForm', api.keystone: ['user_update_own_password', 'user_get']})
'current_password': 'currpasswd', def test_change_password_sets_logout_reason(self):
'new_password': 'testpassword',
'confirm_password': 'doesnotmatch'}
res = self.client.post(INDEX_URL, formData)
self.assertFormError(res, "form", None, ['Passwords do not match.']) self.mock_user_update_own_password.return_value = None
self.mock_user_get.return_value = mock.Mock(options={})
@mock.patch.object(api.keystone, 'user_update_own_password')
def test_change_password_sets_logout_reason(self,
mock_user_update_own_password):
mock_user_update_own_password.return_value = None
formData = {'method': 'PasswordForm', formData = {'method': 'PasswordForm',
'current_password': 'oldpwd', 'current_password': 'oldpwd',
@ -73,5 +70,30 @@ class ChangePasswordTests(test.TestCase):
redirect_response = res.client.get(path, http.QueryDict(query)) redirect_response = res.client.get(path, http.QueryDict(query))
self.assertRedirectsNoFollow(redirect_response, settings.LOGIN_URL) self.assertRedirectsNoFollow(redirect_response, settings.LOGIN_URL)
mock_user_update_own_password.assert_called_once_with( self.mock_user_get.assert_called_once_with(test.IsHttpRequest(), '1',
admin=False)
self.mock_user_update_own_password.assert_called_once_with(
test.IsHttpRequest(), 'oldpwd', 'normalpwd') test.IsHttpRequest(), 'oldpwd', 'normalpwd')
def test_change_validation_passwords_not_matching(self):
formData = {'method': 'PasswordForm',
'current_password': 'currpasswd',
'new_password': 'testpassword',
'confirm_password': 'doesnotmatch'}
res = self.client.post(INDEX_URL, formData)
self.assertFormError(res, "form", None, ['Passwords do not match.'])
@test.create_mocks({
api.keystone: ['user_get']})
def test_lock_password(self):
self.mock_user_get.return_value = mock.Mock(
options={'lock_password': True})
formData = {'method': 'PasswordForm',
'current_password': 'oldpwd',
'new_password': 'normalpwd',
'confirm_password': 'normalpwd'}
res = self.client.post(INDEX_URL, formData)
self.assertNoFormErrors(res)
self.mock_user_get.assert_called_once_with(test.IsHttpRequest(), '1',
admin=False)

View File

@ -217,7 +217,18 @@ def data(TEST):
'enabled': True, 'enabled': True,
'domain_id': "1"} 'domain_id': "1"}
user5 = users.User(None, user_dict) user5 = users.User(None, user_dict)
TEST.users.add(user, user2, user3, user4, user5) user_dict = {'id': "6",
'name': 'user_six',
'description': 'test_description',
'email': 'test@example.com',
'password': 'password',
'token': 'test_token',
'project_id': '1',
'enabled': True,
'domain_id': "1",
'lock_password': True}
user6 = users.User(None, user_dict)
TEST.users.add(user, user2, user3, user4, user5, user6)
TEST.user = user # Your "current" user TEST.user = user # Your "current" user
TEST.user.service_catalog = copy.deepcopy(SERVICE_CATALOG) TEST.user.service_catalog = copy.deepcopy(SERVICE_CATALOG)

View File

@ -0,0 +1,7 @@
---
features:
- |
Added support for Keystone locking user option. Locked user can't change
own password using the self-service password change API. By default, users
are unlocked and can change their own passwords. In case of older Keystone
not supporting this feature, all users treated as unlocked.