754 lines
31 KiB
Python
754 lines
31 KiB
Python
# Copyright 2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import logging
|
|
import uuid
|
|
|
|
import fixtures
|
|
import mock
|
|
from oslo_config import cfg
|
|
from six.moves import http_client
|
|
from testtools import matchers
|
|
|
|
from keystone.common import controller
|
|
from keystone import exception
|
|
from keystone.tests import unit
|
|
from keystone.tests.unit import test_v3
|
|
|
|
|
|
CONF = cfg.CONF
|
|
|
|
|
|
# NOTE(morganfainberg): To be removed when admin_token_auth middleware is
|
|
# removed. This was moved to it's own testcase so it can setup the
|
|
# admin_token_auth pipeline without impacting other tests.
|
|
class IdentityTestCaseStaticAdminToken(test_v3.RestfulTestCase):
|
|
EXTENSION_TO_ADD = 'admin_token_auth'
|
|
|
|
def config_overrides(self):
|
|
super(IdentityTestCaseStaticAdminToken, self).config_overrides()
|
|
self.config_fixture.config(
|
|
admin_token='ADMIN')
|
|
|
|
def test_list_users_with_static_admin_token_and_multiple_backends(self):
|
|
# domain-specific operations with the bootstrap ADMIN token is
|
|
# disallowed when domain-specific drivers are enabled
|
|
self.config_fixture.config(group='identity',
|
|
domain_specific_drivers_enabled=True)
|
|
self.get('/users', token=CONF.admin_token,
|
|
expected_status=exception.Unauthorized.code)
|
|
|
|
def test_create_user_with_admin_token_and_no_domain(self):
|
|
"""Call ``POST /users`` with admin token but no domain id.
|
|
|
|
It should not be possible to use the admin token to create a user
|
|
while not explicitly passing the domain in the request body.
|
|
|
|
"""
|
|
# Passing a valid domain id to new_user_ref() since domain_id is
|
|
# not an optional parameter.
|
|
ref = unit.new_user_ref(domain_id=self.domain_id)
|
|
# Delete the domain id before sending the request.
|
|
del ref['domain_id']
|
|
self.post('/users', body={'user': ref}, token=CONF.admin_token,
|
|
expected_status=http_client.BAD_REQUEST)
|
|
|
|
|
|
class IdentityTestCase(test_v3.RestfulTestCase):
|
|
"""Test users and groups."""
|
|
|
|
def setUp(self):
|
|
super(IdentityTestCase, self).setUp()
|
|
|
|
self.group = unit.new_group_ref(domain_id=self.domain_id)
|
|
self.group = self.identity_api.create_group(self.group)
|
|
self.group_id = self.group['id']
|
|
|
|
self.credential = unit.new_credential_ref(
|
|
user_id=self.user['id'],
|
|
project_id=self.project_id)
|
|
|
|
self.credential_api.create_credential(self.credential['id'],
|
|
self.credential)
|
|
|
|
# user crud tests
|
|
|
|
def test_create_user(self):
|
|
"""Call ``POST /users``."""
|
|
ref = unit.new_user_ref(domain_id=self.domain_id)
|
|
r = self.post(
|
|
'/users',
|
|
body={'user': ref})
|
|
return self.assertValidUserResponse(r, ref)
|
|
|
|
def test_create_user_without_domain(self):
|
|
"""Call ``POST /users`` without specifying domain.
|
|
|
|
According to the identity-api specification, if you do not
|
|
explicitly specific the domain_id in the entity, it should
|
|
take the domain scope of the token as the domain_id.
|
|
|
|
"""
|
|
# Create a user with a role on the domain so we can get a
|
|
# domain scoped token
|
|
domain = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain['id'], domain)
|
|
user = unit.create_user(self.identity_api, domain_id=domain['id'])
|
|
self.assignment_api.create_grant(
|
|
role_id=self.role_id, user_id=user['id'],
|
|
domain_id=domain['id'])
|
|
|
|
ref = unit.new_user_ref(domain_id=domain['id'])
|
|
ref_nd = ref.copy()
|
|
ref_nd.pop('domain_id')
|
|
auth = self.build_authentication_request(
|
|
user_id=user['id'],
|
|
password=user['password'],
|
|
domain_id=domain['id'])
|
|
r = self.post('/users', body={'user': ref_nd}, auth=auth)
|
|
self.assertValidUserResponse(r, ref)
|
|
|
|
# Now try the same thing without a domain token - which should fail
|
|
ref = unit.new_user_ref(domain_id=domain['id'])
|
|
ref_nd = ref.copy()
|
|
ref_nd.pop('domain_id')
|
|
auth = self.build_authentication_request(
|
|
user_id=self.user['id'],
|
|
password=self.user['password'],
|
|
project_id=self.project['id'])
|
|
|
|
# TODO(henry-nash): Due to bug #1283539 we currently automatically
|
|
# use the default domain_id if a domain scoped token is not being
|
|
# used. For now we just check that a deprecation warning has been
|
|
# issued. Change the code below to expect a failure once this bug is
|
|
# fixed.
|
|
with mock.patch(
|
|
'oslo_log.versionutils.report_deprecated_feature') as mock_dep:
|
|
r = self.post('/users', body={'user': ref_nd}, auth=auth)
|
|
self.assertTrue(mock_dep.called)
|
|
|
|
ref['domain_id'] = CONF.identity.default_domain_id
|
|
return self.assertValidUserResponse(r, ref)
|
|
|
|
def test_create_user_with_admin_token_and_domain(self):
|
|
"""Call ``POST /users`` with admin token and domain id."""
|
|
ref = unit.new_user_ref(domain_id=self.domain_id)
|
|
self.post('/users', body={'user': ref}, token=self.get_admin_token(),
|
|
expected_status=http_client.CREATED)
|
|
|
|
def test_user_management_normalized_keys(self):
|
|
"""Illustrate the inconsistent handling of hyphens in keys.
|
|
|
|
To quote Morgan in bug 1526244:
|
|
|
|
the reason this is converted from "domain-id" to "domain_id" is
|
|
because of how we process/normalize data. The way we have to handle
|
|
specific data types for known columns requires avoiding "-" in the
|
|
actual python code since "-" is not valid for attributes in python
|
|
w/o significant use of "getattr" etc.
|
|
|
|
In short, historically we handle some things in conversions. The
|
|
use of "extras" has long been a poor design choice that leads to
|
|
odd/strange inconsistent behaviors because of other choices made in
|
|
handling data from within the body. (In many cases we convert from
|
|
"-" to "_" throughout openstack)
|
|
|
|
Source: https://bugs.launchpad.net/keystone/+bug/1526244/comments/9
|
|
|
|
"""
|
|
# Create two domains to work with.
|
|
domain1 = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain1['id'], domain1)
|
|
domain2 = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain2['id'], domain2)
|
|
|
|
# We can successfully create a normal user without any surprises.
|
|
user = unit.new_user_ref(domain_id=domain1['id'])
|
|
r = self.post(
|
|
'/users',
|
|
body={'user': user})
|
|
self.assertValidUserResponse(r, user)
|
|
user['id'] = r.json['user']['id']
|
|
|
|
# Query strings are not normalized: so we get all users back (like
|
|
# self.user), not just the ones in the specified domain.
|
|
r = self.get(
|
|
'/users?domain-id=%s' % domain1['id'])
|
|
self.assertValidUserListResponse(r, ref=self.user)
|
|
self.assertNotEqual(domain1['id'], self.user['domain_id'])
|
|
|
|
# When creating a new user, if we move the 'domain_id' into the
|
|
# 'domain-id' attribute, the server will normalize the request
|
|
# attribute, and effectively "move it back" for us.
|
|
user = unit.new_user_ref(domain_id=domain1['id'])
|
|
user['domain-id'] = user.pop('domain_id')
|
|
r = self.post(
|
|
'/users',
|
|
body={'user': user})
|
|
self.assertNotIn('domain-id', r.json['user'])
|
|
self.assertEqual(domain1['id'], r.json['user']['domain_id'])
|
|
# (move this attribute back so we can use assertValidUserResponse)
|
|
user['domain_id'] = user.pop('domain-id')
|
|
self.assertValidUserResponse(r, user)
|
|
user['id'] = r.json['user']['id']
|
|
|
|
# If we try updating the user's 'domain_id' by specifying a
|
|
# 'domain-id', then it'll be stored into extras rather than normalized,
|
|
# and the user's actual 'domain_id' is not affected.
|
|
r = self.patch(
|
|
'/users/%s' % user['id'],
|
|
body={'user': {'domain-id': domain2['id']}})
|
|
self.assertEqual(domain2['id'], r.json['user']['domain-id'])
|
|
self.assertEqual(user['domain_id'], r.json['user']['domain_id'])
|
|
self.assertNotEqual(domain2['id'], user['domain_id'])
|
|
self.assertValidUserResponse(r, user)
|
|
|
|
def test_create_user_bad_request(self):
|
|
"""Call ``POST /users``."""
|
|
self.post('/users', body={'user': {}},
|
|
expected_status=http_client.BAD_REQUEST)
|
|
|
|
def test_list_users(self):
|
|
"""Call ``GET /users``."""
|
|
resource_url = '/users'
|
|
r = self.get(resource_url)
|
|
self.assertValidUserListResponse(r, ref=self.user,
|
|
resource_url=resource_url)
|
|
|
|
def test_list_users_with_multiple_backends(self):
|
|
"""Call ``GET /users`` when multiple backends is enabled.
|
|
|
|
In this scenario, the controller requires a domain to be specified
|
|
either as a filter or by using a domain scoped token.
|
|
|
|
"""
|
|
self.config_fixture.config(group='identity',
|
|
domain_specific_drivers_enabled=True)
|
|
|
|
# Create a new domain with a new project and user
|
|
domain = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain['id'], domain)
|
|
|
|
project = unit.new_project_ref(domain_id=domain['id'])
|
|
self.resource_api.create_project(project['id'], project)
|
|
|
|
user = unit.create_user(self.identity_api, domain_id=domain['id'])
|
|
|
|
# Create both project and domain role grants for the user so we
|
|
# can get both project and domain scoped tokens
|
|
self.assignment_api.create_grant(
|
|
role_id=self.role_id, user_id=user['id'],
|
|
domain_id=domain['id'])
|
|
self.assignment_api.create_grant(
|
|
role_id=self.role_id, user_id=user['id'],
|
|
project_id=project['id'])
|
|
|
|
dom_auth = self.build_authentication_request(
|
|
user_id=user['id'],
|
|
password=user['password'],
|
|
domain_id=domain['id'])
|
|
project_auth = self.build_authentication_request(
|
|
user_id=user['id'],
|
|
password=user['password'],
|
|
project_id=project['id'])
|
|
|
|
# First try using a domain scoped token
|
|
resource_url = '/users'
|
|
r = self.get(resource_url, auth=dom_auth)
|
|
self.assertValidUserListResponse(r, ref=user,
|
|
resource_url=resource_url)
|
|
|
|
# Now try using a project scoped token
|
|
resource_url = '/users'
|
|
r = self.get(resource_url, auth=project_auth)
|
|
self.assertValidUserListResponse(r, ref=user,
|
|
resource_url=resource_url)
|
|
|
|
# Now try with an explicit filter
|
|
resource_url = ('/users?domain_id=%(domain_id)s' %
|
|
{'domain_id': domain['id']})
|
|
r = self.get(resource_url)
|
|
self.assertValidUserListResponse(r, ref=user,
|
|
resource_url=resource_url)
|
|
|
|
def test_list_users_no_default_project(self):
|
|
"""Call ``GET /users`` making sure no default_project_id."""
|
|
user = unit.new_user_ref(self.domain_id)
|
|
user = self.identity_api.create_user(user)
|
|
resource_url = '/users'
|
|
r = self.get(resource_url)
|
|
self.assertValidUserListResponse(r, ref=user,
|
|
resource_url=resource_url)
|
|
|
|
def test_get_user(self):
|
|
"""Call ``GET /users/{user_id}``."""
|
|
r = self.get('/users/%(user_id)s' % {
|
|
'user_id': self.user['id']})
|
|
self.assertValidUserResponse(r, self.user)
|
|
|
|
def test_get_user_with_default_project(self):
|
|
"""Call ``GET /users/{user_id}`` making sure of default_project_id."""
|
|
user = unit.new_user_ref(domain_id=self.domain_id,
|
|
project_id=self.project_id)
|
|
user = self.identity_api.create_user(user)
|
|
r = self.get('/users/%(user_id)s' % {'user_id': user['id']})
|
|
self.assertValidUserResponse(r, user)
|
|
|
|
def test_add_user_to_group(self):
|
|
"""Call ``PUT /groups/{group_id}/users/{user_id}``."""
|
|
self.put('/groups/%(group_id)s/users/%(user_id)s' % {
|
|
'group_id': self.group_id, 'user_id': self.user['id']})
|
|
|
|
def test_list_groups_for_user(self):
|
|
"""Call ``GET /users/{user_id}/groups``."""
|
|
user1 = unit.create_user(self.identity_api,
|
|
domain_id=self.domain['id'])
|
|
user2 = unit.create_user(self.identity_api,
|
|
domain_id=self.domain['id'])
|
|
|
|
self.put('/groups/%(group_id)s/users/%(user_id)s' % {
|
|
'group_id': self.group_id, 'user_id': user1['id']})
|
|
|
|
# Scenarios below are written to test the default policy configuration
|
|
|
|
# One should be allowed to list one's own groups
|
|
auth = self.build_authentication_request(
|
|
user_id=user1['id'],
|
|
password=user1['password'])
|
|
resource_url = ('/users/%(user_id)s/groups' %
|
|
{'user_id': user1['id']})
|
|
r = self.get(resource_url, auth=auth)
|
|
self.assertValidGroupListResponse(r, ref=self.group,
|
|
resource_url=resource_url)
|
|
|
|
# Administrator is allowed to list others' groups
|
|
resource_url = ('/users/%(user_id)s/groups' %
|
|
{'user_id': user1['id']})
|
|
r = self.get(resource_url)
|
|
self.assertValidGroupListResponse(r, ref=self.group,
|
|
resource_url=resource_url)
|
|
|
|
# Ordinary users should not be allowed to list other's groups
|
|
auth = self.build_authentication_request(
|
|
user_id=user2['id'],
|
|
password=user2['password'])
|
|
r = self.get('/users/%(user_id)s/groups' % {
|
|
'user_id': user1['id']}, auth=auth,
|
|
expected_status=exception.ForbiddenAction.code)
|
|
|
|
def test_check_user_in_group(self):
|
|
"""Call ``HEAD /groups/{group_id}/users/{user_id}``."""
|
|
self.put('/groups/%(group_id)s/users/%(user_id)s' % {
|
|
'group_id': self.group_id, 'user_id': self.user['id']})
|
|
self.head('/groups/%(group_id)s/users/%(user_id)s' % {
|
|
'group_id': self.group_id, 'user_id': self.user['id']})
|
|
|
|
def test_list_users_in_group(self):
|
|
"""Call ``GET /groups/{group_id}/users``."""
|
|
self.put('/groups/%(group_id)s/users/%(user_id)s' % {
|
|
'group_id': self.group_id, 'user_id': self.user['id']})
|
|
resource_url = ('/groups/%(group_id)s/users' %
|
|
{'group_id': self.group_id})
|
|
r = self.get(resource_url)
|
|
self.assertValidUserListResponse(r, ref=self.user,
|
|
resource_url=resource_url)
|
|
self.assertIn('/groups/%(group_id)s/users' % {
|
|
'group_id': self.group_id}, r.result['links']['self'])
|
|
|
|
def test_remove_user_from_group(self):
|
|
"""Call ``DELETE /groups/{group_id}/users/{user_id}``."""
|
|
self.put('/groups/%(group_id)s/users/%(user_id)s' % {
|
|
'group_id': self.group_id, 'user_id': self.user['id']})
|
|
self.delete('/groups/%(group_id)s/users/%(user_id)s' % {
|
|
'group_id': self.group_id, 'user_id': self.user['id']})
|
|
|
|
def test_update_user(self):
|
|
"""Call ``PATCH /users/{user_id}``."""
|
|
user = unit.new_user_ref(domain_id=self.domain_id)
|
|
del user['id']
|
|
r = self.patch('/users/%(user_id)s' % {
|
|
'user_id': self.user['id']},
|
|
body={'user': user})
|
|
self.assertValidUserResponse(r, user)
|
|
|
|
def test_admin_password_reset(self):
|
|
# bootstrap a user as admin
|
|
user_ref = unit.create_user(self.identity_api,
|
|
domain_id=self.domain['id'])
|
|
|
|
# auth as user should work before a password change
|
|
old_password_auth = self.build_authentication_request(
|
|
user_id=user_ref['id'],
|
|
password=user_ref['password'])
|
|
r = self.v3_create_token(old_password_auth)
|
|
old_token = r.headers.get('X-Subject-Token')
|
|
|
|
# auth as user with a token should work before a password change
|
|
old_token_auth = self.build_authentication_request(token=old_token)
|
|
self.v3_create_token(old_token_auth)
|
|
|
|
# administrative password reset
|
|
new_password = uuid.uuid4().hex
|
|
self.patch('/users/%s' % user_ref['id'],
|
|
body={'user': {'password': new_password}})
|
|
|
|
# auth as user with original password should not work after change
|
|
self.v3_create_token(old_password_auth,
|
|
expected_status=http_client.UNAUTHORIZED)
|
|
|
|
# auth as user with an old token should not work after change
|
|
self.v3_create_token(old_token_auth,
|
|
expected_status=http_client.NOT_FOUND)
|
|
|
|
# new password should work
|
|
new_password_auth = self.build_authentication_request(
|
|
user_id=user_ref['id'],
|
|
password=new_password)
|
|
self.v3_create_token(new_password_auth)
|
|
|
|
def test_update_user_domain_id(self):
|
|
"""Call ``PATCH /users/{user_id}`` with domain_id."""
|
|
user = unit.new_user_ref(domain_id=self.domain['id'])
|
|
user = self.identity_api.create_user(user)
|
|
user['domain_id'] = CONF.identity.default_domain_id
|
|
r = self.patch('/users/%(user_id)s' % {
|
|
'user_id': user['id']},
|
|
body={'user': user},
|
|
expected_status=exception.ValidationError.code)
|
|
self.config_fixture.config(domain_id_immutable=False)
|
|
user['domain_id'] = self.domain['id']
|
|
r = self.patch('/users/%(user_id)s' % {
|
|
'user_id': user['id']},
|
|
body={'user': user})
|
|
self.assertValidUserResponse(r, user)
|
|
|
|
def test_delete_user(self):
|
|
"""Call ``DELETE /users/{user_id}``.
|
|
|
|
As well as making sure the delete succeeds, we ensure
|
|
that any credentials that reference this user are
|
|
also deleted, while other credentials are unaffected.
|
|
In addition, no tokens should remain valid for this user.
|
|
|
|
"""
|
|
# First check the credential for this user is present
|
|
r = self.credential_api.get_credential(self.credential['id'])
|
|
self.assertDictEqual(self.credential, r)
|
|
# Create a second credential with a different user
|
|
|
|
user2 = unit.new_user_ref(domain_id=self.domain['id'],
|
|
project_id=self.project['id'])
|
|
user2 = self.identity_api.create_user(user2)
|
|
credential2 = unit.new_credential_ref(user_id=user2['id'],
|
|
project_id=self.project['id'])
|
|
self.credential_api.create_credential(credential2['id'], credential2)
|
|
|
|
# Create a token for this user which we can check later
|
|
# gets deleted
|
|
auth_data = self.build_authentication_request(
|
|
user_id=self.user['id'],
|
|
password=self.user['password'],
|
|
project_id=self.project['id'])
|
|
token = self.get_requested_token(auth_data)
|
|
# Confirm token is valid for now
|
|
self.head('/auth/tokens',
|
|
headers={'X-Subject-Token': token},
|
|
expected_status=http_client.OK)
|
|
|
|
# Now delete the user
|
|
self.delete('/users/%(user_id)s' % {
|
|
'user_id': self.user['id']})
|
|
|
|
# Deleting the user should have deleted any credentials
|
|
# that reference this project
|
|
self.assertRaises(exception.CredentialNotFound,
|
|
self.credential_api.get_credential,
|
|
self.credential['id'])
|
|
# And the no tokens we remain valid
|
|
tokens = self.token_provider_api._persistence._list_tokens(
|
|
self.user['id'])
|
|
self.assertEqual(0, len(tokens))
|
|
# But the credential for user2 is unaffected
|
|
r = self.credential_api.get_credential(credential2['id'])
|
|
self.assertDictEqual(credential2, r)
|
|
|
|
# group crud tests
|
|
|
|
def test_create_group(self):
|
|
"""Call ``POST /groups``."""
|
|
# Create a new group to avoid a duplicate check failure
|
|
ref = unit.new_group_ref(domain_id=self.domain_id)
|
|
r = self.post(
|
|
'/groups',
|
|
body={'group': ref})
|
|
return self.assertValidGroupResponse(r, ref)
|
|
|
|
def test_create_group_bad_request(self):
|
|
"""Call ``POST /groups``."""
|
|
self.post('/groups', body={'group': {}},
|
|
expected_status=http_client.BAD_REQUEST)
|
|
|
|
def test_list_groups(self):
|
|
"""Call ``GET /groups``."""
|
|
resource_url = '/groups'
|
|
r = self.get(resource_url)
|
|
self.assertValidGroupListResponse(r, ref=self.group,
|
|
resource_url=resource_url)
|
|
|
|
def test_get_group(self):
|
|
"""Call ``GET /groups/{group_id}``."""
|
|
r = self.get('/groups/%(group_id)s' % {
|
|
'group_id': self.group_id})
|
|
self.assertValidGroupResponse(r, self.group)
|
|
|
|
def test_update_group(self):
|
|
"""Call ``PATCH /groups/{group_id}``."""
|
|
group = unit.new_group_ref(domain_id=self.domain_id)
|
|
del group['id']
|
|
r = self.patch('/groups/%(group_id)s' % {
|
|
'group_id': self.group_id},
|
|
body={'group': group})
|
|
self.assertValidGroupResponse(r, group)
|
|
|
|
def test_update_group_domain_id(self):
|
|
"""Call ``PATCH /groups/{group_id}`` with domain_id."""
|
|
self.group['domain_id'] = CONF.identity.default_domain_id
|
|
r = self.patch('/groups/%(group_id)s' % {
|
|
'group_id': self.group['id']},
|
|
body={'group': self.group},
|
|
expected_status=exception.ValidationError.code)
|
|
self.config_fixture.config(domain_id_immutable=False)
|
|
self.group['domain_id'] = self.domain['id']
|
|
r = self.patch('/groups/%(group_id)s' % {
|
|
'group_id': self.group['id']},
|
|
body={'group': self.group})
|
|
self.assertValidGroupResponse(r, self.group)
|
|
|
|
def test_delete_group(self):
|
|
"""Call ``DELETE /groups/{group_id}``."""
|
|
self.delete('/groups/%(group_id)s' % {
|
|
'group_id': self.group_id})
|
|
|
|
def test_create_user_password_not_logged(self):
|
|
# When a user is created, the password isn't logged at any level.
|
|
|
|
log_fix = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
|
|
|
|
ref = unit.new_user_ref(domain_id=self.domain_id)
|
|
self.post(
|
|
'/users',
|
|
body={'user': ref})
|
|
|
|
self.assertNotIn(ref['password'], log_fix.output)
|
|
|
|
def test_update_password_not_logged(self):
|
|
# When admin modifies user password, the password isn't logged at any
|
|
# level.
|
|
|
|
log_fix = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
|
|
|
|
# bootstrap a user as admin
|
|
user_ref = unit.create_user(self.identity_api,
|
|
domain_id=self.domain['id'])
|
|
|
|
self.assertNotIn(user_ref['password'], log_fix.output)
|
|
|
|
# administrative password reset
|
|
new_password = uuid.uuid4().hex
|
|
self.patch('/users/%s' % user_ref['id'],
|
|
body={'user': {'password': new_password}})
|
|
|
|
self.assertNotIn(new_password, log_fix.output)
|
|
|
|
|
|
class IdentityV3toV2MethodsTestCase(unit.TestCase):
|
|
"""Test users V3 to V2 conversion methods."""
|
|
|
|
def new_user_ref(self, **kwargs):
|
|
"""Construct a bare bones user ref.
|
|
|
|
Omits all optional components.
|
|
"""
|
|
ref = unit.new_user_ref(**kwargs)
|
|
# description is already omitted
|
|
del ref['email']
|
|
del ref['enabled']
|
|
del ref['password']
|
|
return ref
|
|
|
|
def setUp(self):
|
|
super(IdentityV3toV2MethodsTestCase, self).setUp()
|
|
self.load_backends()
|
|
user_id = uuid.uuid4().hex
|
|
project_id = uuid.uuid4().hex
|
|
|
|
# User with only default_project_id in ref
|
|
self.user1 = self.new_user_ref(
|
|
id=user_id,
|
|
name=user_id,
|
|
project_id=project_id,
|
|
domain_id=CONF.identity.default_domain_id)
|
|
# User without default_project_id or tenantId in ref
|
|
self.user2 = self.new_user_ref(
|
|
id=user_id,
|
|
name=user_id,
|
|
domain_id=CONF.identity.default_domain_id)
|
|
# User with both tenantId and default_project_id in ref
|
|
self.user3 = self.new_user_ref(
|
|
id=user_id,
|
|
name=user_id,
|
|
project_id=project_id,
|
|
tenantId=project_id,
|
|
domain_id=CONF.identity.default_domain_id)
|
|
# User with only tenantId in ref
|
|
self.user4 = self.new_user_ref(
|
|
id=user_id,
|
|
name=user_id,
|
|
tenantId=project_id,
|
|
domain_id=CONF.identity.default_domain_id)
|
|
|
|
# Expected result if the user is meant to have a tenantId element
|
|
self.expected_user = {'id': user_id,
|
|
'name': user_id,
|
|
'username': user_id,
|
|
'tenantId': project_id}
|
|
|
|
# Expected result if the user is not meant to have a tenantId element
|
|
self.expected_user_no_tenant_id = {'id': user_id,
|
|
'name': user_id,
|
|
'username': user_id}
|
|
|
|
def test_v3_to_v2_user_method(self):
|
|
|
|
updated_user1 = controller.V2Controller.v3_to_v2_user(self.user1)
|
|
self.assertIs(self.user1, updated_user1)
|
|
self.assertDictEqual(self.expected_user, self.user1)
|
|
updated_user2 = controller.V2Controller.v3_to_v2_user(self.user2)
|
|
self.assertIs(self.user2, updated_user2)
|
|
self.assertDictEqual(self.expected_user_no_tenant_id, self.user2)
|
|
updated_user3 = controller.V2Controller.v3_to_v2_user(self.user3)
|
|
self.assertIs(self.user3, updated_user3)
|
|
self.assertDictEqual(self.expected_user, self.user3)
|
|
updated_user4 = controller.V2Controller.v3_to_v2_user(self.user4)
|
|
self.assertIs(self.user4, updated_user4)
|
|
self.assertDictEqual(self.expected_user_no_tenant_id, self.user4)
|
|
|
|
def test_v3_to_v2_user_method_list(self):
|
|
user_list = [self.user1, self.user2, self.user3, self.user4]
|
|
updated_list = controller.V2Controller.v3_to_v2_user(user_list)
|
|
|
|
self.assertEqual(len(user_list), len(updated_list))
|
|
|
|
for i, ref in enumerate(updated_list):
|
|
# Order should not change.
|
|
self.assertIs(ref, user_list[i])
|
|
|
|
self.assertDictEqual(self.expected_user, self.user1)
|
|
self.assertDictEqual(self.expected_user_no_tenant_id, self.user2)
|
|
self.assertDictEqual(self.expected_user, self.user3)
|
|
self.assertDictEqual(self.expected_user_no_tenant_id, self.user4)
|
|
|
|
|
|
class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase):
|
|
|
|
def setUp(self):
|
|
super(UserSelfServiceChangingPasswordsTestCase, self).setUp()
|
|
self.user_ref = unit.create_user(self.identity_api,
|
|
domain_id=self.domain['id'])
|
|
self.token = self.get_request_token(self.user_ref['password'],
|
|
http_client.CREATED)
|
|
|
|
def get_request_token(self, password, expected_status):
|
|
auth_data = self.build_authentication_request(
|
|
user_id=self.user_ref['id'],
|
|
password=password)
|
|
r = self.v3_create_token(auth_data,
|
|
expected_status=expected_status)
|
|
return r.headers.get('X-Subject-Token')
|
|
|
|
def change_password(self, expected_status, **kwargs):
|
|
"""Returns a test response for a change password request."""
|
|
return self.post('/users/%s/password' % self.user_ref['id'],
|
|
body={'user': kwargs},
|
|
token=self.token,
|
|
expected_status=expected_status)
|
|
|
|
def test_changing_password(self):
|
|
# original password works
|
|
token_id = self.get_request_token(self.user_ref['password'],
|
|
expected_status=http_client.CREATED)
|
|
# original token works
|
|
old_token_auth = self.build_authentication_request(token=token_id)
|
|
self.v3_create_token(old_token_auth)
|
|
|
|
# change password
|
|
new_password = uuid.uuid4().hex
|
|
self.change_password(password=new_password,
|
|
original_password=self.user_ref['password'],
|
|
expected_status=http_client.NO_CONTENT)
|
|
|
|
# old password fails
|
|
self.get_request_token(self.user_ref['password'],
|
|
expected_status=http_client.UNAUTHORIZED)
|
|
|
|
# old token fails
|
|
self.v3_create_token(old_token_auth,
|
|
expected_status=http_client.NOT_FOUND)
|
|
|
|
# new password works
|
|
self.get_request_token(new_password,
|
|
expected_status=http_client.CREATED)
|
|
|
|
def test_changing_password_with_missing_original_password_fails(self):
|
|
r = self.change_password(password=uuid.uuid4().hex,
|
|
expected_status=http_client.BAD_REQUEST)
|
|
self.assertThat(r.result['error']['message'],
|
|
matchers.Contains('original_password'))
|
|
|
|
def test_changing_password_with_missing_password_fails(self):
|
|
r = self.change_password(original_password=self.user_ref['password'],
|
|
expected_status=http_client.BAD_REQUEST)
|
|
self.assertThat(r.result['error']['message'],
|
|
matchers.Contains('password'))
|
|
|
|
def test_changing_password_with_incorrect_password_fails(self):
|
|
self.change_password(password=uuid.uuid4().hex,
|
|
original_password=uuid.uuid4().hex,
|
|
expected_status=http_client.UNAUTHORIZED)
|
|
|
|
def test_changing_password_with_disabled_user_fails(self):
|
|
# disable the user account
|
|
self.user_ref['enabled'] = False
|
|
self.patch('/users/%s' % self.user_ref['id'],
|
|
body={'user': self.user_ref})
|
|
|
|
self.change_password(password=uuid.uuid4().hex,
|
|
original_password=self.user_ref['password'],
|
|
expected_status=http_client.UNAUTHORIZED)
|
|
|
|
def test_changing_password_not_logged(self):
|
|
# When a user changes their password, the password isn't logged at any
|
|
# level.
|
|
|
|
log_fix = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
|
|
|
|
# change password
|
|
new_password = uuid.uuid4().hex
|
|
self.change_password(password=new_password,
|
|
original_password=self.user_ref['password'],
|
|
expected_status=http_client.NO_CONTENT)
|
|
|
|
self.assertNotIn(self.user_ref['password'], log_fix.output)
|
|
self.assertNotIn(new_password, log_fix.output)
|