OpenStack Identity (Keystone)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
keystone/keystone/tests/unit/test_v3_auth.py

5038 lines
211 KiB

# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
import itertools
import operator
import uuid
from keystoneclient.common import cms
import mock
from oslo_log import versionutils
from oslo_serialization import jsonutils as json
from oslo_utils import fixture
from oslo_utils import timeutils
import six
from six.moves import http_client
from six.moves import range
from testtools import matchers
from testtools import testcase
from keystone import auth
from keystone.auth.plugins import totp
from keystone.common import utils
import keystone.conf
from keystone.contrib.revoke import routers
from keystone import exception
from keystone.policy.backends import rules
from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import ksfixtures
from keystone.tests.unit import test_v3
CONF = keystone.conf.CONF
class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
def setUp(self):
super(TestAuthInfo, self).setUp()
auth.controllers.load_auth_methods()
def test_missing_auth_methods(self):
auth_data = {'identity': {}}
auth_data['identity']['token'] = {'id': uuid.uuid4().hex}
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo.create,
None,
auth_data)
def test_unsupported_auth_method(self):
auth_data = {'methods': ['abc']}
auth_data['abc'] = {'test': 'test'}
auth_data = {'identity': auth_data}
self.assertRaises(exception.AuthMethodNotSupported,
auth.controllers.AuthInfo.create,
None,
auth_data)
def test_missing_auth_method_data(self):
auth_data = {'methods': ['password']}
auth_data = {'identity': auth_data}
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo.create,
None,
auth_data)
def test_project_name_no_domain(self):
auth_data = self.build_authentication_request(
username='test',
password='test',
project_name='abc')['auth']
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo.create,
None,
auth_data)
def test_both_project_and_domain_in_scope(self):
auth_data = self.build_authentication_request(
user_id='test',
password='test',
project_name='test',
domain_name='test')['auth']
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo.create,
None,
auth_data)
def test_get_method_names_duplicates(self):
auth_data = self.build_authentication_request(
token='test',
user_id='test',
password='test')['auth']
auth_data['identity']['methods'] = ['password', 'token',
'password', 'password']
context = None
auth_info = auth.controllers.AuthInfo.create(context, auth_data)
self.assertEqual(['password', 'token'],
auth_info.get_method_names())
def test_get_method_data_invalid_method(self):
auth_data = self.build_authentication_request(
user_id='test',
password='test')['auth']
context = None
auth_info = auth.controllers.AuthInfo.create(context, auth_data)
method_name = uuid.uuid4().hex
self.assertRaises(exception.ValidationError,
auth_info.get_method_data,
method_name)
def test_empty_domain_in_scope(self):
auth_data = self.build_authentication_request(
user_id='test',
password='test',
domain_name='')['auth']
auth_data['scope']['domain'] = []
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo.create,
None,
auth_data)
def test_empty_project_in_scope(self):
auth_data = self.build_authentication_request(
user_id='test',
password='test',
project_name='')['auth']
auth_data['scope']['project'] = []
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo.create,
None,
auth_data)
class TokenAPITests(object):
# Why is this not just setUp? Because TokenAPITests is not a test class
# itself. If TokenAPITests became a subclass of the testcase, it would get
# called by the enumerate-tests-in-file code. The way the functions get
# resolved in Python for multiple inheritance means that a setUp in this
# would get skipped by the testrunner.
def doSetUp(self):
r = self.v3_create_token(self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain_id,
password=self.user['password']))
self.v3_token_data = r.result
self.v3_token = r.headers.get('X-Subject-Token')
self.headers = {'X-Subject-Token': r.headers.get('X-Subject-Token')}
def _make_auth_request(self, auth_data):
resp = self.post('/auth/tokens', body=auth_data)
token = resp.headers.get('X-Subject-Token')
return token
def _get_unscoped_token(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
return self._make_auth_request(auth_data)
def _get_domain_scoped_token(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain_id)
return self._make_auth_request(auth_data)
def _get_project_scoped_token(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project_id)
return self._make_auth_request(auth_data)
def _get_trust_scoped_token(self, trustee_user, trust):
auth_data = self.build_authentication_request(
user_id=trustee_user['id'],
password=trustee_user['password'],
trust_id=trust['id'])
return self._make_auth_request(auth_data)
def _create_trust(self, impersonation=False):
# Create a trustee user
trustee_user = unit.create_user(self.identity_api,
domain_id=self.domain_id)
ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=trustee_user['id'],
project_id=self.project_id,
impersonation=impersonation,
role_ids=[self.role_id])
# Create a trust
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r)
return (trustee_user, trust)
def _validate_token(self, token, expected_status=http_client.OK):
return self.get(
'/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=expected_status)
def _revoke_token(self, token, expected_status=http_client.NO_CONTENT):
return self.delete(
'/auth/tokens',
headers={'x-subject-token': token},
expected_status=expected_status)
def _set_user_enabled(self, user, enabled=True):
user['enabled'] = enabled
self.identity_api.update_user(user['id'], user)
def _create_project_and_set_as_default_project(self):
# create a new project
ref = unit.new_project_ref(domain_id=self.domain_id)
r = self.post('/projects', body={'project': ref})
project = self.assertValidProjectResponse(r, ref)
# grant the user a role on the project
self.put(
'/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'],
'project_id': project['id'],
'role_id': self.role['id']})
# make the new project the user's default project
body = {'user': {'default_project_id': project['id']}}
r = self.patch('/users/%(user_id)s' % {
'user_id': self.user['id']},
body=body)
self.assertValidUserResponse(r)
return project
def assertTimestampEqual(self, expected, value):
# Compare two timestamps but ignore the microseconds part
# of the expected timestamp. Keystone does not track microseconds and
# is working to eliminate microseconds from it's datetimes used.
expected = timeutils.parse_isotime(expected).replace(microsecond=0)
value = timeutils.parse_isotime(value).replace(microsecond=0)
self.assertEqual(
expected,
value,
"%s != %s" % (expected, value))
def test_auth_with_token_as_different_user_fails(self):
# get the token for a user. This is self.user which is different from
# self.default_domain_user.
token = self.get_scoped_token()
# try both password and token methods with different identities and it
# should fail
auth_data = self.build_authentication_request(
token=token,
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'])
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_create_token_for_user_without_password_fails(self):
user = unit.new_user_ref(domain_id=self.domain['id'])
del user['password'] # can't have a password for this test
user = self.identity_api.create_user(user)
auth_data = self.build_authentication_request(
user_id=user['id'],
password='password')
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_create_unscoped_token_by_authenticating_with_unscoped_token(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
token_id = r.headers.get('X-Subject-Token')
auth_data = self.build_authentication_request(token=token_id)
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_create_unscoped_token_with_user_id(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_create_unscoped_token_with_user_domain_id(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_create_unscoped_token_with_user_domain_name(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_validate_unscoped_token(self):
unscoped_token = self._get_unscoped_token()
self._validate_token(unscoped_token)
def test_revoke_unscoped_token(self):
unscoped_token = self._get_unscoped_token()
self._validate_token(unscoped_token)
self._revoke_token(unscoped_token)
self._validate_token(unscoped_token,
expected_status=http_client.NOT_FOUND)
def test_create_explicit_unscoped_token(self):
self._create_project_and_set_as_default_project()
# explicitly ask for an unscoped token
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
unscoped="unscoped")
r = self.post('/auth/tokens', body=auth_data, noauth=True)
self.assertValidUnscopedTokenResponse(r)
def test_disabled_users_default_project_result_in_unscoped_token(self):
# create a disabled project to work with
project = self.create_new_default_project_for_user(
self.user['id'], self.domain_id, enable_project=False)
# assign a role to user for the new project
self.assignment_api.add_role_to_user_and_project(self.user['id'],
project['id'],
self.role_id)
# attempt to authenticate without requesting a project
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_disabled_default_project_domain_result_in_unscoped_token(self):
domain_ref = unit.new_domain_ref()
r = self.post('/domains', body={'domain': domain_ref})
domain = self.assertValidDomainResponse(r, domain_ref)
project = self.create_new_default_project_for_user(
self.user['id'], domain['id'])
# assign a role to user for the new project
self.assignment_api.add_role_to_user_and_project(self.user['id'],
project['id'],
self.role_id)
# now disable the project domain
body = {'domain': {'enabled': False}}
r = self.patch('/domains/%(domain_id)s' % {'domain_id': domain['id']},
body=body)
self.assertValidDomainResponse(r)
# attempt to authenticate without requesting a project
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_unscoped_token_is_invalid_after_disabling_user(self):
unscoped_token = self._get_unscoped_token()
# Make sure the token is valid
self._validate_token(unscoped_token)
# Disable the user
self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
unscoped_token)
def test_unscoped_token_is_invalid_after_enabling_disabled_user(self):
unscoped_token = self._get_unscoped_token()
# Make sure the token is valid
self._validate_token(unscoped_token)
# Disable the user
self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
unscoped_token)
# Enable the user
self._set_user_enabled(self.user)
# Ensure validating a token for a re-enabled user fails
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
unscoped_token)
def test_unscoped_token_is_invalid_after_disabling_user_domain(self):
unscoped_token = self._get_unscoped_token()
# Make sure the token is valid
self._validate_token(unscoped_token)
# Disable the user's domain
self.domain['enabled'] = False
self.resource_api.update_domain(self.domain['id'], self.domain)
# Ensure validating a token for a disabled user fails
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
unscoped_token)
def test_unscoped_token_is_invalid_after_changing_user_password(self):
unscoped_token = self._get_unscoped_token()
# Make sure the token is valid
self._validate_token(unscoped_token)
# Change user's password
self.user['password'] = 'Password1'
self.identity_api.update_user(self.user['id'], self.user)
# Ensure updating user's password revokes existing user's tokens
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
unscoped_token)
def test_create_domain_token_scoped_with_domain_id_and_user_id(self):
# grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_scoped_with_domain_id_and_username(self):
# grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
domain_id=self.domain['id'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_scoped_with_domain_id(self):
# grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
domain_id=self.domain['id'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_scoped_with_domain_name(self):
# grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_name=self.domain['name'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_scoped_with_domain_name_and_username(self):
# grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
domain_name=self.domain['name'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_with_only_domain_name_and_username(self):
# grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
domain_name=self.domain['name'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_with_group_role(self):
group = unit.new_group_ref(domain_id=self.domain_id)
group = self.identity_api.create_group(group)
# add user to group
self.identity_api.add_user_to_group(self.user['id'], group['id'])
# grant the domain role to group
path = '/domains/%s/groups/%s/roles/%s' % (
self.domain['id'], group['id'], self.role['id'])
self.put(path=path)
# now get a domain-scoped token
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_fails_if_domain_name_unsafe(self):
"""Verify authenticate to a domain with unsafe name fails."""
# Start with url name restrictions off, so we can create the unsafe
# named domain
self.config_fixture.config(group='resource',
domain_name_url_safe='off')
unsafe_name = 'i am not / safe'
domain = unit.new_domain_ref(name=unsafe_name)
self.resource_api.create_domain(domain['id'], domain)
role_member = unit.new_role_ref()
self.role_api.create_role(role_member['id'], role_member)
self.assignment_api.create_grant(
role_member['id'],
user_id=self.user['id'],
domain_id=domain['id'])
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_name=domain['name'])
# Since name url restriction is off, we should be able to authenticate
self.v3_create_token(auth_data)
# Set the name url restriction to new, which should still allow us to
# authenticate
self.config_fixture.config(group='resource',
project_name_url_safe='new')
self.v3_create_token(auth_data)
# Set the name url restriction to strict and we should fail to
# authenticate
self.config_fixture.config(group='resource',
domain_name_url_safe='strict')
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_create_domain_token_without_grant_returns_unauthorized(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
# this fails because the user does not have a role on self.domain
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_validate_domain_scoped_token(self):
# Grant user access to domain
self.assignment_api.create_grant(self.role['id'],
user_id=self.user['id'],
domain_id=self.domain['id'])
domain_scoped_token = self._get_domain_scoped_token()
resp = self._validate_token(domain_scoped_token)
resp_json = json.loads(resp.body)
self.assertIsNotNone(resp_json['token']['catalog'])
self.assertIsNotNone(resp_json['token']['roles'])
self.assertIsNotNone(resp_json['token']['domain'])
def test_domain_scoped_token_is_invalid_after_disabling_user(self):
# Grant user access to domain
self.assignment_api.create_grant(self.role['id'],
user_id=self.user['id'],
domain_id=self.domain['id'])
domain_scoped_token = self._get_domain_scoped_token()
# Make sure the token is valid
self._validate_token(domain_scoped_token)
# Disable user
self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
domain_scoped_token)
def test_domain_scoped_token_is_invalid_after_deleting_grant(self):
# Grant user access to domain
self.assignment_api.create_grant(self.role['id'],
user_id=self.user['id'],
domain_id=self.domain['id'])
domain_scoped_token = self._get_domain_scoped_token()
# Make sure the token is valid
self._validate_token(domain_scoped_token)
# Delete access to domain
self.assignment_api.delete_grant(self.role['id'],
user_id=self.user['id'],
domain_id=self.domain['id'])
# Ensure validating a token for a disabled user fails
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
domain_scoped_token)
def test_domain_scoped_token_invalid_after_disabling_domain(self):
# Grant user access to domain
self.assignment_api.create_grant(self.role['id'],
user_id=self.user['id'],
domain_id=self.domain['id'])
domain_scoped_token = self._get_domain_scoped_token()
# Make sure the token is valid
self._validate_token(domain_scoped_token)
# Disable domain
self.domain['enabled'] = False
self.resource_api.update_domain(self.domain['id'], self.domain)
# Ensure validating a token for a disabled domain fails
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
domain_scoped_token)
def test_v2_validate_domain_scoped_token_returns_unauthorized(self):
# Test that validating a domain scoped token in v2.0 returns
# unauthorized.
# Grant user access to domain
self.assignment_api.create_grant(self.role['id'],
user_id=self.user['id'],
domain_id=self.domain['id'])
scoped_token = self._get_domain_scoped_token()
self.assertRaises(exception.Unauthorized,
self.token_provider_api.validate_v2_token,
scoped_token)
def test_create_project_scoped_token_with_project_id_and_user_id(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r)
def test_validate_project_scoped_token(self):
project_scoped_token = self._get_project_scoped_token()
self._validate_token(project_scoped_token)
def test_revoke_project_scoped_token(self):
project_scoped_token = self._get_project_scoped_token()
self._validate_token(project_scoped_token)
self._revoke_token(project_scoped_token)
self._validate_token(project_scoped_token,
expected_status=http_client.NOT_FOUND)
def test_project_scoped_token_is_scoped_to_default_project(self):
project = self._create_project_and_set_as_default_project()
# attempt to authenticate without requesting a project
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
# ensure the project id in the token matches the default project id
self.assertValidProjectScopedTokenResponse(r)
self.assertEqual(project['id'], r.result['token']['project']['id'])
def test_project_scoped_token_no_catalog_is_scoped_to_default_project(
self):
project = self._create_project_and_set_as_default_project()
# attempt to authenticate without requesting a project or catalog
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.post('/auth/tokens?nocatalog', body=auth_data, noauth=True)
# ensure the project id in the token matches the default project id
self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
self.assertEqual(project['id'], r.result['token']['project']['id'])
def test_implicit_project_id_scoped_token_with_user_id_no_catalog(self):
self._create_project_and_set_as_default_project()
# create a project scoped token that isn't scoped to the default
# project
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.post('/auth/tokens?nocatalog', body=auth_data, noauth=True)
# ensure the project id in the token matches the one we as for
self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
self.assertEqual(self.project['id'],
r.result['token']['project']['id'])
def test_project_scoped_token_catalog_attributes(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.v3_create_token(auth_data)
catalog = r.result['token']['catalog']
self.assertEqual(1, len(catalog))
catalog = catalog[0]
self.assertEqual(self.service['id'], catalog['id'])
self.assertEqual(self.service['name'], catalog['name'])
self.assertEqual(self.service['type'], catalog['type'])
endpoint = catalog['endpoints']
self.assertEqual(1, len(endpoint))
endpoint = endpoint[0]
self.assertEqual(self.endpoint['id'], endpoint['id'])
self.assertEqual(self.endpoint['interface'], endpoint['interface'])
self.assertEqual(self.endpoint['region_id'], endpoint['region_id'])
self.assertEqual(self.endpoint['url'], endpoint['url'])
def test_project_scoped_token_catalog_excludes_disabled_endpoint(self):
# Create a disabled endpoint
disabled_endpoint_ref = copy.copy(self.endpoint)
disabled_endpoint_id = uuid.uuid4().hex
disabled_endpoint_ref.update({
'id': disabled_endpoint_id,
'enabled': False,
'interface': 'internal'
})
self.catalog_api.create_endpoint(disabled_endpoint_id,
disabled_endpoint_ref)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
resp = self.v3_create_token(auth_data)
# make sure the disabled endpoint id isn't in the list of endpoints
endpoints = resp.result['token']['catalog'][0]['endpoints']
endpoint_ids = [endpoint['id'] for endpoint in endpoints]
self.assertNotIn(disabled_endpoint_id, endpoint_ids)
def test_project_scoped_token_catalog_excludes_disabled_service(self):
"""On authenticate, get a catalog that excludes disabled services."""
# although the endpoint associated with the service is enabled, the
# service is disabled
self.assertTrue(self.endpoint['enabled'])
self.catalog_api.update_service(
self.endpoint['service_id'], {'enabled': False})
service = self.catalog_api.get_service(self.endpoint['service_id'])
self.assertFalse(service['enabled'])
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.v3_create_token(auth_data)
self.assertEqual([], r.result['token']['catalog'])
def test_scope_to_project_without_grant_returns_unauthorized(self):
project = unit.new_project_ref(domain_id=self.domain_id)
self.resource_api.create_project(project['id'], project)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=project['id'])
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_create_project_scoped_token_with_username_and_domain_id(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r)
def test_create_project_scoped_token_with_username_and_domain_name(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
project_id=self.project['id'])
r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r)
def test_create_project_scoped_token_fails_if_project_name_unsafe(self):
"""Verify authenticate to a project with unsafe name fails."""
# Start with url name restrictions off, so we can create the unsafe
# named project
self.config_fixture.config(group='resource',
project_name_url_safe='off')
unsafe_name = 'i am not / safe'
project = unit.new_project_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID,
name=unsafe_name)
self.resource_api.create_project(project['id'], project)
role_member = unit.new_role_ref()
self.role_api.create_role(role_member['id'], role_member)
self.assignment_api.add_role_to_user_and_project(
self.user['id'], project['id'], role_member['id'])
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_name=project['name'],
project_domain_id=test_v3.DEFAULT_DOMAIN_ID)
# Since name url restriction is off, we should be able to authenticate
self.v3_create_token(auth_data)
# Set the name url restriction to new, which should still allow us to
# authenticate
self.config_fixture.config(group='resource',
project_name_url_safe='new')
self.v3_create_token(auth_data)
# Set the name url restriction to strict and we should fail to
# authenticate
self.config_fixture.config(group='resource',
project_name_url_safe='strict')
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_create_project_scoped_token_fails_if_domain_name_unsafe(self):
"""Verify authenticate to a project using unsafe domain name fails."""
# Start with url name restrictions off, so we can create the unsafe
# named domain
self.config_fixture.config(group='resource',
domain_name_url_safe='off')
unsafe_name = 'i am not / safe'
domain = unit.new_domain_ref(name=unsafe_name)
self.resource_api.create_domain(domain['id'], domain)
# Add a (safely named) project to that domain
project = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project['id'], project)
role_member = unit.new_role_ref()
self.role_api.create_role(role_member['id'], role_member)
self.assignment_api.create_grant(
role_member['id'],
user_id=self.user['id'],
project_id=project['id'])
# An auth request via project ID, but specifying domain by name
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_name=project['name'],
project_domain_name=domain['name'])
# Since name url restriction is off, we should be able to authenticate
self.v3_create_token(auth_data)
# Set the name url restriction to new, which should still allow us to
# authenticate
self.config_fixture.config(group='resource',
project_name_url_safe='new')
self.v3_create_token(auth_data)
# Set the name url restriction to strict and we should fail to
# authenticate
self.config_fixture.config(group='resource',
domain_name_url_safe='strict')
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_create_project_token_with_same_domain_and_project_name(self):
"""Authenticate to a project with the same name as its domain."""
domain = unit.new_project_ref(is_domain=True)
domain = self.resource_api.create_project(domain['id'], domain)
project = unit.new_project_ref(domain_id=domain['id'],
name=domain['name'])
self.resource_api.create_project(project['id'], project)
role_member = unit.new_role_ref()
self.role_api.create_role(role_member['id'], role_member)
self.assignment_api.add_role_to_user_and_project(
self.user['id'], project['id'], role_member['id'])
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_name=project['name'],
project_domain_name=domain['name'])
r = self.v3_create_token(auth_data)
self.assertEqual(project['id'], r.result['token']['project']['id'])
def test_create_project_token_fails_with_project_acting_as_domain(self):
domain = unit.new_project_ref(is_domain=True)
domain = self.resource_api.create_project(domain['id'], domain)
role_member = unit.new_role_ref()
self.role_api.create_role(role_member['id'], role_member)
self.assignment_api.create_grant(
role_member['id'],
user_id=self.user['id'],
domain_id=domain['id'])
# authentication will fail because the project name is incorrect
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_name=domain['name'],
project_domain_name=domain['name'])
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_create_project_token_with_disabled_project_domain_fails(self):
# create a disabled domain
domain = unit.new_domain_ref()
domain = self.resource_api.create_domain(domain['id'], domain)
# create a project in the domain
project = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project['id'], project)
# assign some role to self.user for the project in the domain
self.assignment_api.add_role_to_user_and_project(
self.user['id'],
project['id'],
self.role_id)
# Disable the domain
domain['enabled'] = False
self.resource_api.update_domain(domain['id'], domain)
# user should not be able to auth with project_id
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=project['id'])
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
# user should not be able to auth with project_name & domain
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_name=project['name'],
project_domain_id=domain['id'])
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_project_scoped_token_is_invalid_after_disabling_user(self):
project_scoped_token = self._get_project_scoped_token()
# Make sure the token is valid
self._validate_token(project_scoped_token)
# Disable the user
self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
project_scoped_token)
def test_project_scoped_token_invalid_after_changing_user_password(self):
project_scoped_token = self._get_project_scoped_token()
# Make sure the token is valid
self._validate_token(project_scoped_token)
# Update user's password
self.user['password'] = 'Password1'
self.identity_api.update_user(self.user['id'], self.user)
# Ensure updating user's password revokes existing tokens
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
project_scoped_token)
def test_project_scoped_token_invalid_after_disabling_project(self):
project_scoped_token = self._get_project_scoped_token()
# Make sure the token is valid
self._validate_token(project_scoped_token)
# Disable project
self.project['enabled'] = False
self.resource_api.update_project(self.project['id'], self.project)
# Ensure validating a token for a disabled project fails
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
project_scoped_token)
def test_project_scoped_token_is_invalid_after_deleting_grant(self):
# disable caching so that user grant deletion is not hidden
# by token caching
self.config_fixture.config(
group='cache',
enabled=False)
# Grant user access to project
self.assignment_api.create_grant(self.role['id'],
user_id=self.user['id'],
project_id=self.project['id'])
project_scoped_token = self._get_project_scoped_token()
# Make sure the token is valid
self._validate_token(project_scoped_token)
# Delete access to project
self.assignment_api.delete_grant(self.role['id'],
user_id=self.user['id'],
project_id=self.project['id'])
# Ensure the token has been revoked
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
project_scoped_token)
def test_no_access_to_default_project_result_in_unscoped_token(self):
# create a disabled project to work with
self.create_new_default_project_for_user(self.user['id'],
self.domain_id)
# attempt to authenticate without requesting a project
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_rescope_unscoped_token_with_trust(self):
trustee_user, trust = self._create_trust()
self._get_trust_scoped_token(trustee_user, trust)
def test_validate_a_trust_scoped_token(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
self._validate_token(trust_scoped_token)
def test_validate_a_trust_scoped_token_impersonated(self):
trustee_user, trust = self._create_trust(impersonation=True)
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
self._validate_token(trust_scoped_token)
def test_revoke_trust_scoped_token(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
self._validate_token(trust_scoped_token)
self._revoke_token(trust_scoped_token)
self._validate_token(trust_scoped_token,
expected_status=http_client.NOT_FOUND)
def test_trust_scoped_token_is_invalid_after_disabling_trustee(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
self._validate_token(trust_scoped_token)
# Disable trustee
trustee_update_ref = dict(enabled=False)
self.identity_api.update_user(trustee_user['id'], trustee_update_ref)
# Ensure validating a token for a disabled user fails
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
trust_scoped_token)
def test_trust_scoped_token_invalid_after_changing_trustee_password(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
self._validate_token(trust_scoped_token)
# Change trustee's password
trustee_update_ref = dict(password='Password1')
self.identity_api.update_user(trustee_user['id'], trustee_update_ref)
# Ensure updating trustee's password revokes existing tokens
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
trust_scoped_token)
def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
self._validate_token(trust_scoped_token)
# Disable the trustor
trustor_update_ref = dict(enabled=False)
self.identity_api.update_user(self.user['id'], trustor_update_ref)
# Ensure validating a token for a disabled user fails
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
trust_scoped_token)
def test_trust_scoped_token_invalid_after_changing_trustor_password(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
self._validate_token(trust_scoped_token)
# Change trustor's password
trustor_update_ref = dict(password='Password1')
self.identity_api.update_user(self.user['id'], trustor_update_ref)
# Ensure updating trustor's password revokes existing user's tokens
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
trust_scoped_token)
def test_trust_scoped_token_invalid_after_disabled_trustor_domain(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
self._validate_token(trust_scoped_token)
# Disable trustor's domain
self.domain['enabled'] = False
self.resource_api.update_domain(self.domain['id'], self.domain)
trustor_update_ref = dict(password='Password1')
self.identity_api.update_user(self.user['id'], trustor_update_ref)
# Ensure updating trustor's password revokes existing user's tokens
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token,
trust_scoped_token)
def test_v2_validate_trust_scoped_token(self):
# Test that validating an trust scoped token in v2.0 returns
# unauthorized.
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
self.assertRaises(exception.Unauthorized,
self.token_provider_api.validate_v2_token,
trust_scoped_token)
def test_default_fixture_scope_token(self):
self.assertIsNotNone(self.get_scoped_token())
def test_v3_v2_intermix_new_default_domain(self):
# If the default_domain_id config option is changed, then should be
# able to validate a v3 token with user in the new domain.
# 1) Create a new domain for the user.
new_domain = unit.new_domain_ref()
self.resource_api.create_domain(new_domain['id'], new_domain)
# 2) Create user in new domain.
new_user = unit.create_user(self.identity_api,
domain_id=new_domain['id'])
# 3) Update the default_domain_id config option to the new domain
self.config_fixture.config(
group='identity',
default_domain_id=new_domain['id'])
# 4) Get a token using v3 API.
v3_token = self.get_requested_token(self.build_authentication_request(
user_id=new_user['id'],
password=new_user['password']))
# 5) Validate token using v2 API.
self.admin_request(
path='/v2.0/tokens/%s' % v3_token,
token=self.get_admin_token(),
method='GET')
def test_v3_v2_intermix_domain_scoped_token_failed(self):
# grant the domain role to user
self.put(
path='/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id']))
# generate a domain-scoped v3 token
v3_token = self.get_requested_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id']))
# domain-scoped tokens are not supported by v2
self.admin_request(
method='GET',
path='/v2.0/tokens/%s' % v3_token,
token=self.get_admin_token(),
expected_status=http_client.UNAUTHORIZED)
def test_v3_v2_intermix_non_default_project_succeed(self):
# self.project is in a non-default domain
v3_token = self.get_requested_token(self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.project['id']))
# v2 cannot reference projects outside the default domain
self.admin_request(
method='GET',
path='/v2.0/tokens/%s' % v3_token,
token=self.get_admin_token())
def test_v3_v2_intermix_non_default_user_succeed(self):
self.assignment_api.create_grant(
self.role['id'],
user_id=self.user['id'],
project_id=self.default_domain_project['id'])
# self.user is in a non-default domain
v3_token = self.get_requested_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.default_domain_project['id']))
# v2 cannot reference projects outside the default domain
self.admin_request(
method='GET',
path='/v2.0/tokens/%s' % v3_token,
token=self.get_admin_token())
def test_v3_v2_intermix_domain_scope_failed(self):
self.assignment_api.create_grant(
self.role['id'],
user_id=self.default_domain_user['id'],
domain_id=self.domain['id'])
v3_token = self.get_requested_token(self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
domain_id=self.domain['id']))
# v2 cannot reference projects outside the default domain
self.admin_request(
path='/v2.0/tokens/%s' % v3_token,
token=self.get_admin_token(),
method='GET',
expected_status=http_client.UNAUTHORIZED)
def test_v3_v2_unscoped_token_intermix(self):
r = self.v3_create_token(self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password']))
self.assertValidUnscopedTokenResponse(r)
v3_token_data = r.result
v3_token = r.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
r = self.admin_request(
path='/v2.0/tokens/%s' % v3_token,
token=self.get_admin_token(),
method='GET')
v2_token_data = r.result
self.assertEqual(v2_token_data['access']['user']['id'],
v3_token_data['token']['user']['id'])
self.assertTimestampEqual(v2_token_data['access']['token']['expires'],
v3_token_data['token']['expires_at'])
def test_v3_v2_token_intermix(self):
# FIXME(gyee): PKI tokens are not interchangeable because token
# data is baked into the token itself.
r = self.v3_create_token(self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.default_domain_project['id']))
self.assertValidProjectScopedTokenResponse(r)
v3_token_data = r.result
v3_token = r.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
r = self.admin_request(
method='GET',
path='/v2.0/tokens/%s' % v3_token,
token=self.get_admin_token())
v2_token_data = r.result
self.assertEqual(v2_token_data['access']['user']['id'],
v3_token_data['token']['user']['id'])
self.assertTimestampEqual(v2_token_data['access']['token']['expires'],
v3_token_data['token']['expires_at'])
self.assertEqual(v2_token_data['access']['user']['roles'][0]['name'],
v3_token_data['token']['roles'][0]['name'])
def test_v2_v3_unscoped_token_intermix(self):
r = self.admin_request(
method='POST',
path='/v2.0/tokens',
body={
'auth': {
'passwordCredentials': {
'userId': self.default_domain_user['id'],
'password': self.default_domain_user['password']
}
}
})
v2_token_data = r.result
v2_token = v2_token_data['access']['token']['id']
r = self.get('/auth/tokens', headers={'X-Subject-Token': v2_token})
self.assertValidUnscopedTokenResponse(r)
v3_token_data = r.result
self.assertEqual(v2_token_data['access']['user']['id'],
v3_token_data['token']['user']['id'])
self.assertTimestampEqual(v2_token_data['access']['token']['expires'],
v3_token_data['token']['expires_at'])
def test_v2_v3_token_intermix(self):
r = self.admin_request(
path='/v2.0/tokens',
method='POST',
body={
'auth': {
'passwordCredentials': {
'userId': self.default_domain_user['id'],
'password': self.default_domain_user['password']
},
'tenantId': self.default_domain_project['id']
}
})
v2_token_data = r.result
v2_token = v2_token_data['access']['token']['id']
r = self.get('/auth/tokens', headers={'X-Subject-Token': v2_token})
self.assertValidProjectScopedTokenResponse(r)
v3_token_data = r.result
self.assertEqual(v2_token_data['access']['user']['id'],
v3_token_data['token']['user']['id'])
self.assertTimestampEqual(v2_token_data['access']['token']['expires'],
v3_token_data['token']['expires_at'])
self.assertEqual(v2_token_data['access']['user']['roles'][0]['name'],
v3_token_data['token']['roles'][0]['name'])
v2_issued_at = timeutils.parse_isotime(
v2_token_data['access']['token']['issued_at'])
v3_issued_at = timeutils.parse_isotime(
v3_token_data['token']['issued_at'])
self.assertEqual(v2_issued_at, v3_issued_at)
def test_create_v3_project_token_from_v2_project_token(self):
r = self.admin_request(
path='/v2.0/tokens',
method='POST',
body={
'auth': {
'passwordCredentials': {
'userId': self.default_domain_user['id'],
'password': self.default_domain_user['password']
},
'tenantId': self.default_domain_project['id']
}
})
v2_token_data = r.result
v2_token = v2_token_data['access']['token']['id']
auth_data = self.build_authentication_request(
token=v2_token,
project_id=self.default_domain_project['id'])
r = self.v3_create_token(auth_data)
self.assertValidScopedTokenResponse(r)
def test_v2_token_deleted_on_v3(self):
# Create a v2 token.
body = {
'auth': {
'passwordCredentials': {
'userId': self.default_domain_user['id'],
'password': self.default_domain_user['password']
},
'tenantId': self.default_domain_project['id']
}
}
r = self.admin_request(
path='/v2.0/tokens', method='POST', body=body)
v2_token = r.result['access']['token']['id']
# Delete the v2 token using v3.
self.delete(
'/auth/tokens', headers={'X-Subject-Token': v2_token})
# Attempting to use the deleted token on v2 should fail.
self.admin_request(
path='/v2.0/tenants', method='GET', token=v2_token,
expected_status=http_client.UNAUTHORIZED)
def test_rescoping_token(self):
expires = self.v3_token_data['token']['expires_at']
# rescope the token
r = self.v3_create_token(self.build_authentication_request(
token=self.v3_token,
project_id=self.project_id))
self.assertValidProjectScopedTokenResponse(r)
# ensure token expiration stayed the same
self.assertTimestampEqual(expires, r.result['token']['expires_at'])
def test_check_token(self):
self.head('/auth/tokens', headers=self.headers,
expected_status=http_client.OK)
def test_validate_token(self):
r = self.get('/auth/tokens', headers=self.headers)
self.assertValidUnscopedTokenResponse(r)
def test_validate_missing_subject_token(self):
self.get('/auth/tokens',
expected_status=http_client.NOT_FOUND)
def test_validate_missing_auth_token(self):
self.admin_request(
method='GET',
path='/v3/projects',
token=None,
expected_status=http_client.UNAUTHORIZED)
def test_validate_token_nocatalog(self):
v3_token = self.get_requested_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id']))
r = self.get(
'/auth/tokens?nocatalog',
headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
def test_is_admin_token_by_ids(self):
self.config_fixture.config(
group='resource',
admin_project_domain_name=self.domain['name'],
admin_project_name=self.project['name'])
r = self.v3_create_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id']))
self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
v3_token = r.headers.get('X-Subject-Token')
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
def test_is_admin_token_by_names(self):
self.config_fixture.config(
group='resource',
admin_project_domain_name=self.domain['name'],
admin_project_name=self.project['name'])
r = self.v3_create_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_domain_name=self.domain['name'],
project_name=self.project['name']))
self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
v3_token = r.headers.get('X-Subject-Token')
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
def test_token_for_non_admin_project_is_not_admin(self):
self.config_fixture.config(
group='resource',
admin_project_domain_name=self.domain['name'],
admin_project_name=uuid.uuid4().hex)
r = self.v3_create_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id']))
self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
v3_token = r.headers.get('X-Subject-Token')
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
def test_token_for_non_admin_domain_same_project_name_is_not_admin(self):
self.config_fixture.config(
group='resource',
admin_project_domain_name=uuid.uuid4().hex,
admin_project_name=self.project['name'])
r = self.v3_create_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id']))
self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
v3_token = r.headers.get('X-Subject-Token')
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
def test_only_admin_project_set_acts_as_non_admin(self):
self.config_fixture.config(
group='resource',
admin_project_name=self.project['name'])
r = self.v3_create_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id']))
self.assertValidProjectScopedTokenResponse(r, is_admin_project=None)
v3_token = r.headers.get('X-Subject-Token')
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, is_admin_project=None)
def _create_role(self, domain_id=None):
"""Call ``POST /roles``."""
ref = unit.new_role_ref(domain_id=domain_id)
r = self.post('/roles', body={'role': ref})
return self.assertValidRoleResponse(r, ref)
def _create_implied_role(self, prior_id):
implied = self._create_role()
url = '/roles/%s/implies/%s' % (prior_id, implied['id'])
self.put(url, expected_status=http_client.CREATED)
return implied
def _delete_implied_role(self, prior_role_id, implied_role_id):
url = '/roles/%s/implies/%s' % (prior_role_id, implied_role_id)
self.delete(url)
def _get_scoped_token_roles(self, is_domain=False):
if is_domain:
v3_token = self.get_domain_scoped_token()
else:
v3_token = self.get_scoped_token()
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
v3_token_data = r.result
token_roles = v3_token_data['token']['roles']
return token_roles
def _create_implied_role_shows_in_v3_token(self, is_domain):
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(1, len(token_roles))
prior = token_roles[0]['id']
implied1 = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(2, len(token_roles))
implied2 = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(3, len(token_roles))
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(prior, token_role_ids)
self.assertIn(implied1['id'], token_role_ids)
self.assertIn(implied2['id'], token_role_ids)
def test_create_implied_role_shows_in_v3_project_token(self):
# regardless of the default chosen, this should always
# test with the option set.
self.config_fixture.config(group='token', infer_roles=True)
self._create_implied_role_shows_in_v3_token(False)
def test_create_implied_role_shows_in_v3_domain_token(self):
self.config_fixture.config(group='token', infer_roles=True)
self.assignment_api.create_grant(self.role['id'],
user_id=self.user['id'],
domain_id=self.domain['id'])
self._create_implied_role_shows_in_v3_token(True)
def test_group_assigned_implied_role_shows_in_v3_token(self):
self.config_fixture.config(group='token', infer_roles=True)
is_domain = False
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(1, len(token_roles))
new_role = self._create_role()
prior = new_role['id']
new_group_ref = unit.new_group_ref(domain_id=self.domain['id'])
new_group = self.identity_api.create_group(new_group_ref)
self.assignment_api.create_grant(prior,
group_id=new_group['id'],
project_id=self.project['id'])
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(1, len(token_roles))
self.identity_api.add_user_to_group(self.user['id'],
new_group['id'])
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(2, len(token_roles))
implied1 = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(3, len(token_roles))
implied2 = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(4, len(token_roles))
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(prior, token_role_ids)
self.assertIn(implied1['id'], token_role_ids)
self.assertIn(implied2['id'], token_role_ids)
def test_multiple_implied_roles_show_in_v3_token(self):
self.config_fixture.config(group='token', infer_roles=True)
token_roles = self._get_scoped_token_roles()
self.assertEqual(1, len(token_roles))
prior = token_roles[0]['id']
implied1 = self._create_implied_role(prior)
implied2 = self._create_implied_role(prior)
implied3 = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles()
self.assertEqual(4, len(token_roles))
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(prior, token_role_ids)
self.assertIn(implied1['id'], token_role_ids)
self.assertIn(implied2['id'], token_role_ids)
self.assertIn(implied3['id'], token_role_ids)
def test_chained_implied_role_shows_in_v3_token(self):
self.config_fixture.config(group='token', infer_roles=True)
token_roles = self._get_scoped_token_roles()
self.assertEqual(1, len(token_roles))
prior = token_roles[0]['id']
implied1 = self._create_implied_role(prior)
implied2 = self._create_implied_role(implied1['id'])
implied3 = self._create_implied_role(implied2['id'])
token_roles = self._get_scoped_token_roles()
self.assertEqual(4, len(token_roles))
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(prior, token_role_ids)
self.assertIn(implied1['id'], token_role_ids)
self.assertIn(implied2['id'], token_role_ids)
self.assertIn(implied3['id'], token_role_ids)
def test_implied_role_disabled_by_config(self):
self.config_fixture.config(group='token', infer_roles=False)
token_roles = self._get_scoped_token_roles()
self.assertEqual(1, len(token_roles))
prior = token_roles[0]['id']
implied1 = self._create_implied_role(prior)
implied2 = self._create_implied_role(implied1['id'])
self._create_implied_role(implied2['id'])
token_roles = self._get_scoped_token_roles()
self.assertEqual(1, len(token_roles))
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(prior, token_role_ids)
def test_delete_implied_role_do_not_show_in_v3_token(self):
self.config_fixture.config(group='token', infer_roles=True)
token_roles = self._get_scoped_token_roles()
prior = token_roles[0]['id']
implied = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles()
self.assertEqual(2, len(token_roles))
self._delete_implied_role(prior, implied['id'])
token_roles = self._get_scoped_token_roles()
self.assertEqual(1, len(token_roles))
def test_unrelated_implied_roles_do_not_change_v3_token(self):
self.config_fixture.config(group='token', infer_roles=True)
token_roles = self._get_scoped_token_roles()
prior = token_roles[0]['id']
implied = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles()
self.assertEqual(2, len(token_roles))
unrelated = self._create_role()
url = '/roles/%s/implies/%s' % (unrelated['id'], implied['id'])
self.put(url, expected_status=http_client.CREATED)
token_roles = self._get_scoped_token_roles()
self.assertEqual(2, len(token_roles))
self._delete_implied_role(unrelated['id'], implied['id'])
token_roles = self._get_scoped_token_roles()
self.assertEqual(2, len(token_roles))
def test_domain_scpecific_roles_do_not_show_v3_token(self):
self.config_fixture.config(group='token', infer_roles=True)
initial_token_roles = self._get_scoped_token_roles()
new_role = self._create_role(domain_id=self.domain_id)
self.assignment_api.create_grant(new_role['id'],
user_id=self.user['id'],
project_id=self.project['id'])
implied = self._create_implied_role(new_role['id'])
token_roles = self._get_scoped_token_roles()
self.assertEqual(len(initial_token_roles) + 1, len(token_roles))
# The implied role from the domain specific role should be in the
# token, but not the domain specific role itself.
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(implied['id'], token_role_ids)
self.assertNotIn(new_role['id'], token_role_ids)
def test_remove_all_roles_from_scope_result_in_404(self):
# create a new user
new_user = unit.create_user(self.identity_api,
domain_id=self.domain['id'])
# give the new user a role on a project
path = '/projects/%s/users/%s/roles/%s' % (
self.project['id'], new_user['id'], self.role['id'])
self.put(path=path)
# authenticate as the new user and get a project-scoped token
auth_data = self.build_authentication_request(
user_id=new_user['id'],
password=new_user['password'],
project_id=self.project['id'])
subject_token_id = self.v3_create_token(auth_data).headers.get(
'X-Subject-Token')
# make sure the project-scoped token is valid
headers = {'X-Subject-Token': subject_token_id}
r = self.get('/auth/tokens', headers=headers)
self.assertValidProjectScopedTokenResponse(r)
# remove the roles from the user for the given scope
path = '/projects/%s/users/%s/roles/%s' % (
self.project['id'], new_user['id'], self.role['id'])
self.delete(path=path)
# token validation should now result in 404
self.get('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
def test_create_token_with_nonexistant_user_id_fails(self):
auth_data = self.build_authentication_request(
user_id=uuid.uuid4().hex,
password=self.user['password'])
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_create_token_with_nonexistant_username_fails(self):
auth_data = self.build_authentication_request(
username=uuid.uuid4().hex,
user_domain_id=self.domain['id'],
password=self.user['password'])
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_create_token_with_nonexistant_domain_id_fails(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=uuid.uuid4().hex,
password=self.user['password'])
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_create_token_with_nonexistant_domain_name_fails(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=uuid.uuid4().hex,
password=self.user['password'])
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_create_token_with_wrong_password_fails(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=uuid.uuid4().hex)
self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_user_and_group_roles_scoped_token(self):
"""Test correct roles are returned in scoped token.
Test Plan:
- Create a domain, with 1 project, 2 users (user1 and user2)
and 2 groups (group1 and group2)
- Make user1 a member of group1, user2 a member of group2
- Create 8 roles, assigning them to each of the 8 combinations
of users/groups on domain/project
- Get a project scoped token for user1, checking that the right
two roles are returned (one directly assigned, one by virtue
of group membership)
- Repeat this for a domain scoped token
- Make user1 also a member of group2
- Get another scoped token making sure the additional role
shows up
- User2 is just here as a spoiler, to make sure we don't get
any roles uniquely assigned to it returned in any of our
tokens
"""
domainA = unit.new_domain_ref()
self.resource_api.create_domain(domainA['id'], domainA)
projectA = unit.new_project_ref(domain_id=domainA['id'])
self.resource_api.create_project(projectA['id'], projectA)
user1 = unit.create_user(self.identity_api, domain_id=domainA['id'])
user2 = unit.create_user(self.identity_api, domain_id=domainA['id'])
group1 = unit.new_group_ref(domain_id=domainA['id'])
group1 = self.identity_api.create_group(group1)
group2 = unit.new_group_ref(domain_id=domainA['id'])
group2 = self.identity_api.create_group(group2)
self.identity_api.add_user_to_group(user1['id'],
group1['id'])
self.identity_api.add_user_to_group(user2['id'],
group2['id'])
# Now create all the roles and assign them
role_list = []
for _ in range(8):
role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
role_list.append(role)
self.assignment_api.create_grant(role_list[0]['id'],
user_id=user1['id'],
domain_id=domainA['id'])
self.assignment_api.create_grant(role_list[1]['id'],
user_id=user1['id'],
project_id=projectA['id'])
self.assignment_api.create_grant(role_list[2]['id'],
user_id=user2['id'],
domain_id=domainA['id'])
self.assignment_api.create_grant(role_list[3]['id'],
user_id=user2['id'],
project_id=projectA['id'])
self.assignment_api.create_grant(role_list[4]['id'],
group_id=group1['id'],
domain_id=domainA['id'])
self.assignment_api.create_grant(role_list[5]['id'],
group_id=group1['id'],
project_id=projectA['id'])
self.assignment_api.create_grant(role_list[6]['id'],
group_id=group2['id'],
domain_id=domainA['id'])
self.assignment_api.create_grant(role_list[7]['id'],
group_id=group2['id'],
project_id=projectA['id'])
# First, get a project scoped token - which should
# contain the direct user role and the one by virtue
# of group membership
auth_data = self.build_authentication_request(
user_id=user1['id'],
password=user1['password'],
project_id=projectA['id'])
r = self.v3_create_token(auth_data)
token = self.assertValidScopedTokenResponse(r)
roles_ids = []
for ref in token['roles']:
roles_ids.append(ref['id'])
self.assertEqual(2, len(token['roles']))
self.assertIn(role_list[1]['id'], roles_ids)
self.assertIn(role_list[5]['id'], roles_ids)
# Now the same thing for a domain scoped token
auth_data = self.build_authentication_request(
user_id=user1['id'],
password=user1['password'],
domain_id=domainA['id'])
r = self.v3_create_token(auth_data)
token = self.assertValidScopedTokenResponse(r)
roles_ids = []
for ref in token['roles']:
roles_ids.append(ref['id'])
self.assertEqual(2, len(token['roles']))
self.assertIn(role_list[0]['id'], roles_ids)
self.assertIn(role_list[4]['id'], roles_ids)
# Finally, add user1 to the 2nd group, and get a new
# scoped token - the extra role should now be included
# by virtue of the 2nd group
self.identity_api.add_user_to_group(user1['id'],
group2['id'])
auth_data = self.build_authentication_request(
user_id=user1['id'],
password=user1['password'],
project_id=projectA['id'])
r = self.v3_create_token(auth_data)
token = self.assertValidScopedTokenResponse(r)
roles_ids = []
for ref in token['roles']:
roles_ids.append(ref['id'])
self.assertEqual(3, len(token['roles']))
self.assertIn(role_list[1]['id'], roles_ids)
self.assertIn(role_list[5]['id'], roles_ids)
self.assertIn(role_list[7]['id'], roles_ids)
def test_auth_token_cross_domain_group_and_project(self):
"""Verify getting a token in cross domain group/project roles."""
# create domain, project and group and grant roles to user
domain1 = unit.new_domain_ref()
self.resource_api.create_domain(domain1['id'], domain1)
project1 = unit.new_project_ref(domain_id=domain1['id'])
self.resource_api.create_project(project1['id'], project1)
user_foo = unit.create_user(self.identity_api,
domain_id=test_v3.DEFAULT_DOMAIN_ID)
role_member = unit.new_role_ref()
self.role_api.create_role(role_member['id'], role_member)
role_admin = unit.new_role_ref()
self.role_api.create_role(role_admin['id'], role_admin)
role_foo_domain1 = unit.new_role_ref()
self.role_api.create_role(role_foo_domain1['id'], role_foo_domain1)
role_group_domain1 = unit.new_role_ref()
self.role_api.create_role(role_group_domain1['id'], role_group_domain1)
self.assignment_api.add_user_to_project(project1['id'],
user_foo['id'])
new_group = unit.new_group_ref(domain_id=domain1['id'])
new_group = self.identity_api.create_group(new_group)
self.identity_api.add_user_to_group(user_foo['id'],
new_group['id'])
self.assignment_api.create_grant(
user_id=user_foo['id'],
project_id=project1['id'],
role_id=role_member['id'])
self.assignment_api.create_grant(
group_id=new_group['id'],
project_id=project1['id'],
role_id=role_admin['id'])
self.assignment_api.create_grant(
user_id=user_foo['id'],
domain_id=domain1['id'],
role_id=role_foo_domain1['id'])
self.assignment_api.create_grant(
group_id=new_group['id'],
domain_id=domain1['id'],
role_id=role_group_domain1['id'])
# Get a scoped token for the project
auth_data = self.build_authentication_request(
username=user_foo['name'],
user_domain_id=test_v3.DEFAULT_DOMAIN_ID,
password=user_foo['password'],
project_name=project1['name'],
project_domain_id=domain1['id'])
r = self.v3_create_token(auth_data)
scoped_token = self.assertValidScopedTokenResponse(r)
project = scoped_token["project"]
roles_ids = []
for ref in scoped_token['roles']:
roles_ids.append(ref['id'])
self.assertEqual(project1['id'], project["id"])
self.assertIn(role_member['id'], roles_ids)
self.assertIn(role_admin['id'], roles_ids)
self.assertNotIn(role_foo_domain1['id'], roles_ids)
self.assertNotIn(role_group_domain1['id'], roles_ids)
def test_remote_user_no_realm(self):
api = auth.controllers.Auth()
context, auth_info, auth_context = self.build_external_auth_request(
self.default_domain_user['name'])
api.authenticate(context, auth_info, auth_context)
self.assertEqual(self.default_domain_user['id'],
auth_context['user_id'])
# Now test to make sure the user name can, itself, contain the
# '@' character.
user = {'name': 'myname@mydivision'}
self.identity_api.update_user(self.default_domain_user['id'], user)
context, auth_info, auth_context = self.build_external_auth_request(
user["name"])
api.authenticate(context, auth_info, auth_context)
self.assertEqual(self.default_domain_user['id'],
auth_context['user_id'])
def test_remote_user_no_domain(self):
api = auth.controllers.Auth()
context, auth_info, auth_context = self.build_external_auth_request(
self.user['name'])
self.assertRaises(exception.Unauthorized,
api.authenticate,
context,
auth_info,
auth_context)
def test_remote_user_and_password(self):
# both REMOTE_USER and password methods must pass.
# note that they do not have to match
api = auth.controllers.Auth()
auth_data = self.build_authentication_request(
user_domain_id=self.default_domain_user['domain_id'],
username=self.default_domain_user['name'],
password=self.default_domain_user['password'])['auth']
context, auth_info, auth_context = self.build_external_auth_request(
self.default_domain_user['name'], auth_data=auth_data)
api.authenticate(context, auth_info, auth_context)
def test_remote_user_and_explicit_external(self):
# both REMOTE_USER and password methods must pass.
# note that they do not have to match
auth_data = self.build_authentication_request(
user_domain_id=self.domain['id'],
username=self.user['name'],
password=self.user['password'])['auth']
auth_data['identity']['methods'] = ["password", "external"]
auth_data['identity']['external'] = {}
api = auth.controllers.Auth()
auth_info = auth.controllers.AuthInfo(None, auth_data)
auth_context = {'extras': {}, 'method_names': []}
self.assertRaises(exception.Unauthorized,
api.authenticate,
self.make_request(),
auth_info,
auth_context)
def test_remote_user_bad_password(self):
# both REMOTE_USER and password methods must pass.
api = auth.controllers.Auth()
auth_data = self.build_authentication_request(
user_domain_id=self.domain['id'],
username=self.user['name'],
password='badpassword')['auth']
context, auth_info, auth_context = self.build_external_auth_request(
self.default_domain_user['name'], auth_data=auth_data)
self.assertRaises(exception.Unauthorized,
api.authenticate,
context,
auth_info,
auth_context)
def test_bind_not_set_with_remote_user(self):
self.config_fixture.config(group='token', bind=[])
auth_data = self.build_authentication_request()
remote_user = self.default_domain_user['name']
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
r = self.v3_create_token(auth_data)
token = self.assertValidUnscopedTokenResponse(r)
self.assertNotIn('bind', token)
def test_verify_with_bound_token(self):
self.config_fixture.config(group='token', bind='kerberos')
auth_data = self.build_authentication_request(
project_id=self.project['id'])
remote_user = self.default_domain_user['name']
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
token = self.get_requested_token(auth_data)
headers = {'X-Subject-Token': token}
r = self.get('/auth/tokens', headers=headers, token=token)
token = self.assertValidProjectScopedTokenResponse(r)
self.assertEqual(self.default_domain_user['name'],
token['bind']['kerberos'])
def test_auth_with_bind_token(self):
self.config_fixture.config(group='token', bind=['kerberos'])
auth_data = self.build_authentication_request()
remote_user = self.default_domain_user['name']
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
r = self.v3_create_token(auth_data)
# the unscoped token should have bind information in it
token = self.assertValidUnscopedTokenResponse(r)
self.assertEqual(remote_user, token['bind']['kerberos'])
token = r.headers.get('X-Subject-Token')
# using unscoped token with remote user succeeds
auth_params = {'token': token, 'project_id': self.project_id}
auth_data = self.build_authentication_request(**auth_params)
r = self.v3_create_token(auth_data)
token = self.assertValidProjectScopedTokenResponse(r)
# the bind information should be carried over from the original token
self.assertEqual(remote_user, token['bind']['kerberos'])
def test_v2_v3_bind_token_intermix(self):
self.config_fixture.config(group='token', bind='kerberos')
# we need our own user registered to the default domain because of
# the way external auth works.
remote_user = self.default_domain_user['name']
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
body = {'auth': {}}
resp = self.admin_request(path='/v2.0/tokens',
method='POST',
body=body)
v2_token_data = resp.result
bind = v2_token_data['access']['token']['bind']
self.assertEqual(self.default_domain_user['name'], bind['kerberos'])
v2_token_id = v2_token_data['access']['token']['id']
# NOTE(gyee): self.get() will try to obtain an auth token if one
# is not provided. When REMOTE_USER is present in the request
# environment, the external user auth plugin is used in conjunction
# with the password auth for the admin user. Therefore, we need to
# cleanup the REMOTE_USER information from the previous call.
del self.admin_app.extra_environ['REMOTE_USER']
headers = {'X-Subject-Token': v2_token_id}
resp = self.get('/auth/tokens', headers=headers)
token_data = resp.result
self.assertDictEqual(v2_token_data['access']['token']['bind'],
token_data['token']['bind'])
class TokenDataTests(object):
"""Test the data in specific token types."""
def test_unscoped_token_format(self):
# ensure the unscoped token response contains the appropriate data
r = self.get('/auth/tokens', headers=self.headers)
self.assertValidUnscopedTokenResponse(r)
def test_domain_scoped_token_format(self):
# ensure the domain scoped token response contains the appropriate data
self.assignment_api.create_grant(
self.role['id'],
user_id=self.default_domain_user['id'],
domain_id=self.domain['id'])
domain_scoped_token = self.get_requested_token(
self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
domain_id=self.domain['id'])
)
self.headers['X-Subject-Token'] = domain_scoped_token
r = self.get('/auth/tokens', headers=self.headers)
self.assertValidDomainScopedTokenResponse(r)
def test_project_scoped_token_format(self):
# ensure project scoped token responses contains the appropriate data
project_scoped_token = self.get_requested_token(
self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.default_domain_project['id'])
)
self.headers['X-Subject-Token'] = project_scoped_token
r = self.get('/auth/tokens', headers=self.headers)
self.assertValidProjectScopedTokenResponse(r)
def test_extra_data_in_unscoped_token_fails_validation(self):
# ensure unscoped token response contains the appropriate data
r = self.get('/auth/tokens', headers=self.headers)
# populate the response result with some extra data
r.result['token'][u'extra'] = six.text_type(uuid.uuid4().hex)
self.assertRaises(exception.SchemaValidationError,
self.assertValidUnscopedTokenResponse,
r)
def test_extra_data_in_domain_scoped_token_fails_validation(self):
# ensure domain scoped token response contains the appropriate data
self.assignment_api.create_grant(
self.role['id'],
user_id=self.default_domain_user['id'],
domain_id=self.domain['id'])
domain_scoped_token = self.get_requested_token(
self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
domain_id=self.domain['id'])
)
self.headers['X-Subject-Token'] = domain_scoped_token
r = self.get('/auth/tokens', headers=self.headers)
# populate the response result with some extra data
r.result['token'][u'extra'] = six.text_type(uuid.uuid4().hex)
self.assertRaises(exception.SchemaValidationError,
self.assertValidDomainScopedTokenResponse,
r)
def test_extra_data_in_project_scoped_token_fails_validation(self):
# ensure project scoped token responses contains the appropriate data
project_scoped_token = self.get_requested_token(
self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.default_domain_project['id'])
)
self.headers['X-Subject-Token'] = project_scoped_token
resp = self.get('/auth/tokens', headers=self.headers)
# populate the response result with some extra data
resp.result['token'][u'extra'] = six.text_type(uuid.uuid4().hex)
self.assertRaises(exception.SchemaValidationError,
self.assertValidProjectScopedTokenResponse,
resp)
class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
def config_overrides(self):
super(AllowRescopeScopedTokenDisabledTests, self).config_overrides()
self.config_fixture.config(
group='token',
allow_rescope_scoped_token=False)
def test_rescoping_v3_to_v3_disabled(self):
self.v3_create_token(
self.build_authentication_request(
token=self.get_scoped_token(),
project_id=self.project_id),
expected_status=http_client.FORBIDDEN)
def _v2_token(self):
body = {
'auth': {
"tenantId": self.default_domain_project['id'],
'passwordCredentials': {
'userId': self.default_domain_user['id'],
'password': self.default_domain_user['password']
}
}}
resp = self.admin_request(path='/v2.0/tokens',
method='POST',
body=body)
v2_token_data = resp.result
return v2_token_data
def _v2_token_from_token(self, token):
body = {
'auth': {
"tenantId": self.project['id'],
"token": token
}}
self.admin_request(path='/v2.0/tokens',
method='POST',
body=body,
expected_status=http_client.FORBIDDEN)
def test_rescoping_v2_to_v3_disabled(self):
token = self._v2_token()
self.v3_create_token(
self.build_authentication_request(
token=token['access']['token']['id'],
project_id=self.project_id),
expected_status=http_client.FORBIDDEN)
def test_rescoping_v3_to_v2_disabled(self):
token = {'id': self.get_scoped_token()}
self._v2_token_from_token(token)
def test_rescoping_v2_to_v2_disabled(self):
token = self._v2_token()
self._v2_token_from_token(token['access']['token'])
def test_rescoped_domain_token_disabled(self):
self.domainA = unit.new_domain_ref()
self.resource_api.create_domain(self.domainA['id'], self.domainA)
self.assignment_api.create_grant(self.role['id'],
user_id=self.user['id'],
domain_id=self.domainA['id'])
unscoped_token = self.get_requested_token(
self.