Adds a resource for changing a user's password

blueprint v3-user-update-own-password

Change-Id: Icdf0ae76e7dfc82c01394ebbfcad4d751a33a3ef
This commit is contained in:
David Stanek 2013-10-17 16:49:48 +00:00 committed by Dolph Mathews
parent f72f369538
commit c19844f405
5 changed files with 108 additions and 4 deletions

View File

@ -37,6 +37,7 @@
"identity:create_user": "rule:admin_required",
"identity:update_user": "rule:admin_required",
"identity:delete_user": "rule:admin_required",
"identity:change_password": "rule:admin_or_owner",
"identity:get_group": "rule:admin_required",
"identity:list_groups": "rule:admin_required",

View File

@ -80,6 +80,7 @@
"identity:update_policy": "rule:cloud_admin",
"identity:delete_policy": "rule:cloud_admin",
"identity:change_password": "rule:owner",
"identity:check_token": "rule:admin_or_owner",
"identity:validate_token": "rule:service_or_admin",
"identity:validate_token_head": "rule:service_or_admin",

View File

@ -718,12 +718,10 @@ class UserV3(controller.V3Controller):
domain_scope=self._get_domain_id_for_request(context))
return UserV3.wrap_member(context, ref)
@controller.protected()
def update_user(self, context, user_id, user):
def _update_user(self, context, user_id, user, domain_scope):
self._require_matching_id(user_id, user)
ref = self.identity_api.update_user(
user_id, user,
domain_scope=self._get_domain_id_for_request(context))
user_id, user, domain_scope=domain_scope)
if user.get('password') or not user.get('enabled', True):
# revoke all tokens owned by this user
@ -731,6 +729,11 @@ class UserV3(controller.V3Controller):
return UserV3.wrap_member(context, ref)
@controller.protected()
def update_user(self, context, user_id, user):
domain_scope = self._get_domain_id_for_request(context)
return self._update_user(context, user_id, user, domain_scope)
@controller.protected(callback=_check_user_and_group_protection)
def add_user_to_group(self, context, user_id, group_id):
self.identity_api.add_user_to_group(
@ -772,6 +775,29 @@ class UserV3(controller.V3Controller):
def delete_user(self, context, user_id):
return self._delete_user(context, user_id)
@controller.protected()
def change_password(self, context, user_id, user):
original_password = user.get('original_password')
if original_password is None:
raise exception.ValidationError(target='user',
attribute='original_password')
password = user.get('password')
if password is None:
raise exception.ValidationError(target='user',
attribute='password')
domain_scope = self._get_domain_id_for_request(context)
try:
self.identity_api.authenticate(user_id=user_id,
password=original_password,
domain_scope=domain_scope)
except AssertionError:
raise exception.Unauthorized()
update_dict = {'password': password}
self._update_user(context, user_id, update_dict, domain_scope)
class GroupV3(controller.V3Controller):
collection_name = 'groups'

View File

@ -79,6 +79,11 @@ def append_v3_routers(mapper, routers):
routers.append(
router.Router(user_controller,
'users', 'user'))
mapper.connect('/users/{user_id}/password',
controller=user_controller,
action='change_password',
conditions=dict(method=['POST']))
mapper.connect('/groups/{group_id}/users',
controller=user_controller,
action='list_users_in_group',

View File

@ -16,6 +16,8 @@
import uuid
from testtools import matchers
from keystone.common import controller
from keystone import exception
from keystone import tests
@ -1688,3 +1690,72 @@ class TestV3toV2Methods(tests.TestCase):
updated_ref = controller.V3Controller.filter_domain_id(ref)
self.assertIs(ref, updated_ref)
self.assertDictEqual(ref, expected_ref)
class UserChangingPasswordsTestCase(test_v3.RestfulTestCase):
def setUp(self):
super(UserChangingPasswordsTestCase, self).setUp()
self.user_ref = self.new_user_ref(domain_id=self.domain['id'])
self.identity_api.create_user(self.user_ref['id'], self.user_ref)
self.token = self.get_request_token(self.user_ref['password'], 201)
def get_request_token(self, password, expected_status):
auth_data = self.build_authentication_request(
user_id=self.user_ref['id'],
password=password)
r = self.post('/auth/tokens',
body=auth_data,
expected_status=expected_status)
return r.headers.get('X-Subject-Token')
def change_password(self, expected_status, **kwargs):
"""Returns a test response for a change password request."""
return self.post('/users/%s/password' % self.user_ref['id'],
body={'user': kwargs},
token=self.token,
expected_status=expected_status)
def test_changing_password(self):
# original password works
self.get_request_token(self.user_ref['password'],
expected_status=201)
# change password
new_password = uuid.uuid4().hex
self.change_password(password=new_password,
original_password=self.user_ref['password'],
expected_status=204)
# old password fails
self.get_request_token(self.user_ref['password'], expected_status=401)
# new password works
self.get_request_token(new_password, expected_status=201)
def test_changing_password_with_missing_original_password_fails(self):
r = self.change_password(password=uuid.uuid4().hex,
expected_status=400)
self.assertThat(r.result['error']['message'],
matchers.Contains('original_password'))
def test_changing_password_with_missing_password_fails(self):
r = self.change_password(original_password=self.user_ref['password'],
expected_status=400)
self.assertThat(r.result['error']['message'],
matchers.Contains('password'))
def test_changing_password_with_incorrect_password_fails(self):
self.change_password(password=uuid.uuid4().hex,
original_password=uuid.uuid4().hex,
expected_status=401)
def test_changing_password_with_disabled_user_fails(self):
# disable the user account
self.user_ref['enabled'] = False
self.patch('/users/%s' % self.user_ref['id'],
body={'user': self.user_ref})
self.change_password(password=uuid.uuid4().hex,
original_password=self.user_ref['password'],
expected_status=401)