OpenStack Identity (Keystone)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
keystone/keystone/tests/unit/test_v3_auth.py

5723 lines
238 KiB

# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
import fixtures
import itertools
import operator
import re
from unittest import mock
import uuid
import freezegun
import http.client
from oslo_serialization import jsonutils as json
from oslo_utils import fixture
from oslo_utils import timeutils
from testtools import matchers
from testtools import testcase
from keystone import auth
from keystone.auth.plugins import totp
from keystone.common import authorization
from keystone.common import provider_api
from keystone.common.rbac_enforcer import policy
from keystone.common import utils
import keystone.conf
from keystone.credential.providers import fernet as credential_fernet
from keystone import exception
from keystone.identity.backends import resource_options as ro
from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import ksfixtures
from keystone.tests.unit import test_v3
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class TestMFARules(test_v3.RestfulTestCase):
def config_overrides(self):
super(TestMFARules, self).config_overrides()
self.useFixture(
ksfixtures.KeyRepository(
self.config_fixture,
'fernet_tokens',
CONF.fernet_tokens.max_active_keys
)
)
self.useFixture(
ksfixtures.KeyRepository(
self.config_fixture,
'credential',
credential_fernet.MAX_ACTIVE_KEYS
)
)
def assertValidErrorResponse(self, r):
resp = r.result
if r.headers.get(authorization.AUTH_RECEIPT_HEADER):
self.assertIsNotNone(resp.get('receipt'))
self.assertIsNotNone(resp.get('receipt').get('methods'))
else:
self.assertIsNotNone(resp.get('error'))
self.assertIsNotNone(resp['error'].get('code'))
self.assertIsNotNone(resp['error'].get('title'))
self.assertIsNotNone(resp['error'].get('message'))
self.assertEqual(int(resp['error']['code']), r.status_code)
def _create_totp_cred(self):
totp_cred = unit.new_totp_credential(self.user_id, self.project_id)
PROVIDERS.credential_api.create_credential(uuid.uuid4().hex, totp_cred)
def cleanup(testcase):
totp_creds = testcase.credential_api.list_credentials_for_user(
testcase.user['id'], type='totp')
for cred in totp_creds:
testcase.credential_api.delete_credential(cred['id'])
self.addCleanup(cleanup, testcase=self)
return totp_cred
def auth_plugin_config_override(self, methods=None, **method_classes):
methods = ['totp', 'token', 'password']
super(TestMFARules, self).auth_plugin_config_override(methods)
def _update_user_with_MFA_rules(self, rule_list, rules_enabled=True):
user = self.user.copy()
# Do not update password
user.pop('password')
user['options'][ro.MFA_RULES_OPT.option_name] = rule_list
user['options'][ro.MFA_ENABLED_OPT.option_name] = rules_enabled
PROVIDERS.identity_api.update_user(user['id'], user)
def test_MFA_single_method_rules_requirements_met_succeeds(self):
# ensure that a simple password works if a password-only rules exists
rule_list = [['password'], ['password', 'totp']]
self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
self.v3_create_token(
self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
user_domain_id=self.domain_id,
project_id=self.project_id))
def test_MFA_multi_method_rules_requirements_met_succeeds(self):
# validate that multiple auth-methods function if all are specified
# and the rules requires it
rule_list = [['password', 'totp']]
totp_cred = self._create_totp_cred()
self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
auth_req = self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
user_domain_id=self.domain_id,
passcode=totp._generate_totp_passcodes(totp_cred['blob'])[0])
self.v3_create_token(auth_req)
def test_MFA_single_method_rules_requirements_not_met_fails(self):
# if a rule matching a single auth type is specified and is not matched
# the result should be unauthorized
rule_list = [['totp']]
self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
self.v3_create_token(
self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
user_domain_id=self.domain_id,
project_id=self.project_id),
expected_status=http.client.UNAUTHORIZED)
def test_MFA_multi_method_rules_requirements_not_met_fails(self):
# if multiple rules are specified and only one is passed,
# unauthorized is expected
rule_list = [['password', 'totp']]
self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
self.v3_create_token(
self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
user_domain_id=self.domain_id,
project_id=self.project_id),
expected_status=http.client.UNAUTHORIZED)
def test_MFA_rules_bogus_non_existing_auth_method_succeeds(self):
# Bogus auth methods are thrown out from rules.
rule_list = [['password'], ['BoGusAuThMeTh0dHandl3r']]
self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
self.v3_create_token(
self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
user_domain_id=self.domain_id,
project_id=self.project_id))
def test_MFA_rules_disabled_MFA_succeeeds(self):
# ensure that if MFA is "disableD" authentication succeeds, even if
# not enough auth methods are specified
rule_list = [['password', 'totp']]
self._update_user_with_MFA_rules(rule_list=rule_list,
rules_enabled=False)
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
with freezegun.freeze_time(time):
self.v3_create_token(
self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
user_domain_id=self.domain_id,
project_id=self.project_id))
def test_MFA_rules_all_bogus_rules_results_in_default_behavior(self):
# if all the rules are bogus, the result is the same as the default
# behavior, any single password method is sufficient
rule_list = [[uuid.uuid4().hex, uuid.uuid4().hex],
['BoGus'],
['NonExistantMethod']]
self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
self.v3_create_token(
self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
user_domain_id=self.domain_id,
project_id=self.project_id))
def test_MFA_rules_rescope_works_without_token_method_in_rules(self):
rule_list = [['password', 'totp']]
totp_cred = self._create_totp_cred()
self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
auth_data = self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
user_domain_id=self.domain_id,
passcode=totp._generate_totp_passcodes(totp_cred['blob'])[0])
r = self.v3_create_token(auth_data)
auth_data = self.build_authentication_request(
token=r.headers.get('X-Subject-Token'),
project_id=self.project_id)
self.v3_create_token(auth_data)
def test_MFA_requirements_makes_correct_receipt_for_password(self):
# if multiple rules are specified and only one is passed,
# unauthorized is expected
rule_list = [['password', 'totp']]
self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
response = self.admin_request(
method='POST',
path='/v3/auth/tokens',
body=self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
user_domain_id=self.domain_id,
project_id=self.project_id),
expected_status=http.client.UNAUTHORIZED)
self.assertIsNotNone(
response.headers.get(authorization.AUTH_RECEIPT_HEADER))
resp_data = response.result
# NOTE(adriant): We convert to sets to avoid any potential sorting
# related failures since order isn't important, just content.
self.assertEqual(
{'password'}, set(resp_data.get('receipt').get('methods')))
self.assertEqual(
set(frozenset(r) for r in rule_list),
set(frozenset(r) for r in resp_data.get('required_auth_methods')))
def test_MFA_requirements_makes_correct_receipt_for_totp(self):
# if multiple rules are specified and only one is passed,
# unauthorized is expected
totp_cred = self._create_totp_cred()
rule_list = [['password', 'totp']]
self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
response = self.admin_request(
method='POST',
path='/v3/auth/tokens',
body=self.build_authentication_request(
user_id=self.user_id,
user_domain_id=self.domain_id,
project_id=self.project_id,
passcode=totp._generate_totp_passcodes(
totp_cred['blob'])[0]),
expected_status=http.client.UNAUTHORIZED)
self.assertIsNotNone(
response.headers.get(authorization.AUTH_RECEIPT_HEADER))
resp_data = response.result
# NOTE(adriant): We convert to sets to avoid any potential sorting
# related failures since order isn't important, just content.
self.assertEqual(
{'totp'}, set(resp_data.get('receipt').get('methods')))
self.assertEqual(
set(frozenset(r) for r in rule_list),
set(frozenset(r) for r in resp_data.get('required_auth_methods')))
def test_MFA_requirements_makes_correct_receipt_for_pass_and_totp(self):
# if multiple rules are specified and only one is passed,
# unauthorized is expected
totp_cred = self._create_totp_cred()
rule_list = [['password', 'totp', 'token']]
self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
response = self.admin_request(
method='POST',
path='/v3/auth/tokens',
body=self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
user_domain_id=self.domain_id,
project_id=self.project_id,
passcode=totp._generate_totp_passcodes(
totp_cred['blob'])[0]),
expected_status=http.client.UNAUTHORIZED)
self.assertIsNotNone(
response.headers.get(authorization.AUTH_RECEIPT_HEADER))
resp_data = response.result
# NOTE(adriant): We convert to sets to avoid any potential sorting
# related failures since order isn't important, just content.
self.assertEqual(
{'password', 'totp'}, set(resp_data.get('receipt').get('methods')))
self.assertEqual(
set(frozenset(r) for r in rule_list),
set(frozenset(r) for r in resp_data.get('required_auth_methods')))
def test_MFA_requirements_returns_correct_required_auth_methods(self):
# if multiple rules are specified and only one is passed,
# unauthorized is expected
rule_list = [
['password', 'totp', 'token'],
['password', 'totp'],
['token', 'totp'],
['BoGusAuThMeTh0dHandl3r']
]
expect_rule_list = rule_list = [
['password', 'totp', 'token'],
['password', 'totp'],
]
self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
response = self.admin_request(
method='POST',
path='/v3/auth/tokens',
body=self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
user_domain_id=self.domain_id,
project_id=self.project_id),
expected_status=http.client.UNAUTHORIZED)
self.assertIsNotNone(
response.headers.get(authorization.AUTH_RECEIPT_HEADER))
resp_data = response.result
# NOTE(adriant): We convert to sets to avoid any potential sorting
# related failures since order isn't important, just content.
self.assertEqual(
{'password'}, set(resp_data.get('receipt').get('methods')))
self.assertEqual(
set(frozenset(r) for r in expect_rule_list),
set(frozenset(r) for r in resp_data.get('required_auth_methods')))
def test_MFA_consuming_receipt_with_totp(self):
# if multiple rules are specified and only one is passed,
# unauthorized is expected
totp_cred = self._create_totp_cred()
rule_list = [['password', 'totp']]
self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that
# tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
response = self.admin_request(
method='POST',
path='/v3/auth/tokens',
body=self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
user_domain_id=self.domain_id,
project_id=self.project_id),
expected_status=http.client.UNAUTHORIZED)
self.assertIsNotNone(
response.headers.get(authorization.AUTH_RECEIPT_HEADER))
receipt = response.headers.get(authorization.AUTH_RECEIPT_HEADER)
resp_data = response.result
# NOTE(adriant): We convert to sets to avoid any potential sorting
# related failures since order isn't important, just content.
self.assertEqual(
{'password'}, set(resp_data.get('receipt').get('methods')))
self.assertEqual(
set(frozenset(r) for r in rule_list),
set(frozenset(r) for r in resp_data.get('required_auth_methods')))
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
response = self.admin_request(
method='POST',
path='/v3/auth/tokens',
headers={authorization.AUTH_RECEIPT_HEADER: receipt},
body=self.build_authentication_request(
user_id=self.user_id,
user_domain_id=self.domain_id,
project_id=self.project_id,
passcode=totp._generate_totp_passcodes(
totp_cred['blob'])[0]))
def test_MFA_consuming_receipt_not_found(self):
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time):
response = self.admin_request(
method='POST',
path='/v3/auth/tokens',
headers={authorization.AUTH_RECEIPT_HEADER: "bogus-receipt"},
body=self.build_authentication_request(
user_id=self.user_id,
user_domain_id=self.domain_id,
project_id=self.project_id),
expected_status=http.client.UNAUTHORIZED)
self.assertEqual(401, response.result['error']['code'])
class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
def setUp(self):
super(TestAuthInfo, self).setUp()
auth.core.load_auth_methods()
def test_unsupported_auth_method(self):
auth_data = {'methods': ['abc']}
auth_data['abc'] = {'test': 'test'}
auth_data = {'identity': auth_data}
self.assertRaises(exception.AuthMethodNotSupported,
auth.core.AuthInfo.create,
auth_data)
def test_missing_auth_method_data(self):
auth_data = {'methods': ['password']}
auth_data = {'identity': auth_data}
self.assertRaises(exception.ValidationError,
auth.core.AuthInfo.create,
auth_data)
def test_project_name_no_domain(self):
auth_data = self.build_authentication_request(
username='test',
password='test',
project_name='abc')['auth']
self.assertRaises(exception.ValidationError,
auth.core.AuthInfo.create,
auth_data)
def test_both_project_and_domain_in_scope(self):
auth_data = self.build_authentication_request(
user_id='test',
password='test',
project_name='test',
domain_name='test')['auth']
self.assertRaises(exception.ValidationError,
auth.core.AuthInfo.create,
auth_data)
def test_get_method_names_duplicates(self):
auth_data = self.build_authentication_request(
token='test',
user_id='test',
password='test')['auth']
auth_data['identity']['methods'] = ['password', 'token',
'password', 'password']
auth_info = auth.core.AuthInfo.create(auth_data)
self.assertEqual(['password', 'token'],
auth_info.get_method_names())
def test_get_method_data_invalid_method(self):
auth_data = self.build_authentication_request(
user_id='test',
password='test')['auth']
auth_info = auth.core.AuthInfo.create(auth_data)
method_name = uuid.uuid4().hex
self.assertRaises(exception.ValidationError,
auth_info.get_method_data,
method_name)
class TokenAPITests(object):
# Why is this not just setUp? Because TokenAPITests is not a test class
# itself. If TokenAPITests became a subclass of the testcase, it would get
# called by the enumerate-tests-in-file code. The way the functions get
# resolved in Python for multiple inheritance means that a setUp in this
# would get skipped by the testrunner.
def doSetUp(self):
r = self.v3_create_token(self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain_id,
password=self.user['password']))
self.v3_token_data = r.result
self.v3_token = r.headers.get('X-Subject-Token')
self.headers = {'X-Subject-Token': r.headers.get('X-Subject-Token')}
def _get_unscoped_token(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
return r.headers.get('X-Subject-Token')
def _get_domain_scoped_token(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain_id)
r = self.post('/auth/tokens', body=auth_data)
self.assertValidDomainScopedTokenResponse(r)
return r.headers.get('X-Subject-Token')
def _get_project_scoped_token(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project_id)
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectScopedTokenResponse(r)
return r.headers.get('X-Subject-Token')
def _get_trust_scoped_token(self, trustee_user, trust):
auth_data = self.build_authentication_request(
user_id=trustee_user['id'],
password=trustee_user['password'],
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectScopedTokenResponse(r)
return r.headers.get('X-Subject-Token')
def _create_trust(self, impersonation=False):
# Create a trustee user
trustee_user = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domain_id)
ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=trustee_user['id'],
project_id=self.project_id,
impersonation=impersonation,
role_ids=[self.role_id])
# Create a trust
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r)
return (trustee_user, trust)
def _validate_token(self, token,
expected_status=http.client.OK, allow_expired=False):
path = '/v3/auth/tokens'
if allow_expired:
path += '?allow_expired=1'
return self.admin_request(
path=path,
headers={'X-Auth-Token': self.get_admin_token(),
'X-Subject-Token': token},
method='GET',
expected_status=expected_status
)
def _revoke_token(self, token, expected_status=http.client.NO_CONTENT):
return self.delete(
'/auth/tokens',
headers={'x-subject-token': token},
expected_status=expected_status)
def _set_user_enabled(self, user, enabled=True):
user['enabled'] = enabled
PROVIDERS.identity_api.update_user(user['id'], user)
def _create_project_and_set_as_default_project(self):
# create a new project
ref = unit.new_project_ref(domain_id=self.domain_id)
r = self.post('/projects', body={'project': ref})
project = self.assertValidProjectResponse(r, ref)
# grant the user a role on the project
self.put(
'/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'],
'project_id': project['id'],
'role_id': self.role['id']})
# make the new project the user's default project
body = {'user': {'default_project_id': project['id']}}
r = self.patch('/users/%(user_id)s' % {
'user_id': self.user['id']},
body=body)
self.assertValidUserResponse(r)
return project
def test_auth_with_token_as_different_user_fails(self):
# get the token for a user. This is self.user which is different from
# self.default_domain_user.
token = self.get_scoped_token()
# try both password and token methods with different identities and it
# should fail
auth_data = self.build_authentication_request(
token=token,
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'])
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_create_token_for_user_without_password_fails(self):
user = unit.new_user_ref(domain_id=self.domain['id'])
del user['password'] # can't have a password for this test
user = PROVIDERS.identity_api.create_user(user)
auth_data = self.build_authentication_request(
user_id=user['id'],
password='password')
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_create_unscoped_token_by_authenticating_with_unscoped_token(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
token_id = r.headers.get('X-Subject-Token')
auth_data = self.build_authentication_request(token=token_id)
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_create_unscoped_token_with_user_id(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_create_unscoped_token_with_user_domain_id(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_create_unscoped_token_with_user_domain_name(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_validate_unscoped_token(self):
unscoped_token = self._get_unscoped_token()
r = self._validate_token(unscoped_token)
self.assertValidUnscopedTokenResponse(r)
def test_validate_expired_unscoped_token_returns_not_found(self):
# NOTE(lbragstad): We set token expiration to 10 seconds so that we can
# use the context manager of freezegun without sqlite issues.
self.config_fixture.config(group='token',
expiration=10)
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
unscoped_token = self._get_unscoped_token()
frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
self._validate_token(
unscoped_token,
expected_status=http.client.NOT_FOUND
)
def test_revoke_unscoped_token(self):
unscoped_token = self._get_unscoped_token()
r = self._validate_token(unscoped_token)
self.assertValidUnscopedTokenResponse(r)
self._revoke_token(unscoped_token)
self._validate_token(unscoped_token,
expected_status=http.client.NOT_FOUND)
def test_create_explicit_unscoped_token(self):
self._create_project_and_set_as_default_project()
# explicitly ask for an unscoped token
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
unscoped="unscoped")
r = self.post('/auth/tokens', body=auth_data, noauth=True)
self.assertValidUnscopedTokenResponse(r)
def test_disabled_users_default_project_result_in_unscoped_token(self):
# create a disabled project to work with
project = self.create_new_default_project_for_user(
self.user['id'], self.domain_id, enable_project=False)
# assign a role to user for the new project
PROVIDERS.assignment_api.add_role_to_user_and_project(
self.user['id'], project['id'], self.role_id
)
# attempt to authenticate without requesting a project
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_disabled_default_project_domain_result_in_unscoped_token(self):
domain_ref = unit.new_domain_ref()
r = self.post('/domains', body={'domain': domain_ref})
domain = self.assertValidDomainResponse(r, domain_ref)
project = self.create_new_default_project_for_user(
self.user['id'], domain['id'])
# assign a role to user for the new project
PROVIDERS.assignment_api.add_role_to_user_and_project(
self.user['id'], project['id'], self.role_id
)
# now disable the project domain
body = {'domain': {'enabled': False}}
r = self.patch('/domains/%(domain_id)s' % {'domain_id': domain['id']},
body=body)
self.assertValidDomainResponse(r)
# attempt to authenticate without requesting a project
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_unscoped_token_is_invalid_after_disabling_user(self):
unscoped_token = self._get_unscoped_token()
# Make sure the token is valid
r = self._validate_token(unscoped_token)
self.assertValidUnscopedTokenResponse(r)
# Disable the user
self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails
self._validate_token(
unscoped_token,
expected_status=http.client.NOT_FOUND
)
def test_unscoped_token_is_invalid_after_enabling_disabled_user(self):
unscoped_token = self._get_unscoped_token()
# Make sure the token is valid
r = self._validate_token(unscoped_token)
self.assertValidUnscopedTokenResponse(r)
# Disable the user
self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails
self._validate_token(
unscoped_token,
expected_status=http.client.NOT_FOUND
)
# Enable the user
self._set_user_enabled(self.user)
# Ensure validating a token for a re-enabled user fails
self._validate_token(
unscoped_token,
expected_status=http.client.NOT_FOUND
)
def test_unscoped_token_is_invalid_after_disabling_user_domain(self):
unscoped_token = self._get_unscoped_token()
# Make sure the token is valid
r = self._validate_token(unscoped_token)
self.assertValidUnscopedTokenResponse(r)
# Disable the user's domain
self.domain['enabled'] = False
PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
# Ensure validating a token for a disabled user fails
self._validate_token(
unscoped_token,
expected_status=http.client.NOT_FOUND
)
def test_unscoped_token_is_invalid_after_changing_user_password(self):
unscoped_token = self._get_unscoped_token()
# Make sure the token is valid
r = self._validate_token(unscoped_token)
self.assertValidUnscopedTokenResponse(r)
# Change user's password
self.user['password'] = 'Password1'
PROVIDERS.identity_api.update_user(self.user['id'], self.user)
# Ensure updating user's password revokes existing user's tokens
self._validate_token(
unscoped_token,
expected_status=http.client.NOT_FOUND
)
def test_create_system_token_with_user_id(self):
path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'],
'role_id': self.role_id
}
self.put(path=path)
auth_request_body = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
system=True
)
response = self.v3_create_token(auth_request_body)
self.assertValidSystemScopedTokenResponse(response)
def test_create_system_token_with_username(self):
path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'],
'role_id': self.role_id
}
self.put(path=path)
auth_request_body = self.build_authentication_request(
username=self.user['name'],
password=self.user['password'],
user_domain_id=self.domain['id'],
system=True
)
response = self.v3_create_token(auth_request_body)
self.assertValidSystemScopedTokenResponse(response)
def test_create_system_token_fails_without_system_assignment(self):
auth_request_body = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
system=True
)
self.v3_create_token(
auth_request_body,
expected_status=http.client.UNAUTHORIZED
)
def test_system_token_is_invalid_after_disabling_user(self):
path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'],
'role_id': self.role_id
}
self.put(path=path)
auth_request_body = self.build_authentication_request(
username=self.user['name'],
password=self.user['password'],
user_domain_id=self.domain['id'],
system=True
)
response = self.v3_create_token(auth_request_body)
self.assertValidSystemScopedTokenResponse(response)
token = response.headers.get('X-Subject-Token')
self._validate_token(token)
# NOTE(lbragstad): This would make a good test for groups, but
# apparently it's not possible to disable a group.
user_ref = {
'user': {
'enabled': False
}
}
self.patch(
'/users/%(user_id)s' % {'user_id': self.user['id']},
body=user_ref
)
self.admin_request(
path='/v3/auth/tokens',
headers={'X-Auth-Token': token,
'X-Subject-Token': token},
method='GET',
expected_status=http.client.UNAUTHORIZED
)
self.admin_request(
path='/v3/auth/tokens',
headers={'X-Auth-Token': token,
'X-Subject-Token': token},
method='HEAD',
expected_status=http.client.UNAUTHORIZED
)
def test_create_system_token_via_system_group_assignment(self):
ref = {
'group': unit.new_group_ref(
domain_id=CONF.identity.default_domain_id
)
}
group = self.post('/groups', body=ref).json_body['group']
path = '/system/groups/%(group_id)s/roles/%(role_id)s' % {
'group_id': group['id'],
'role_id': self.role_id
}
self.put(path=path)
path = '/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': group['id'],
'user_id': self.user['id']
}
self.put(path=path)
auth_request_body = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
system=True
)
response = self.v3_create_token(auth_request_body)
self.assertValidSystemScopedTokenResponse(response)
token = response.headers.get('X-Subject-Token')
self._validate_token(token)
def test_revoke_system_token(self):
path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'],
'role_id': self.role_id
}
self.put(path=path)
auth_request_body = self.build_authentication_request(
username=self.user['name'],
password=self.user['password'],
user_domain_id=self.domain['id'],
system=True
)
response = self.v3_create_token(auth_request_body)
self.assertValidSystemScopedTokenResponse(response)
token = response.headers.get('X-Subject-Token')
self._validate_token(token)
self._revoke_token(token)
self._validate_token(token, expected_status=http.client.NOT_FOUND)
def test_system_token_is_invalid_after_deleting_system_role(self):
ref = {'role': unit.new_role_ref()}
system_role = self.post('/roles', body=ref).json_body['role']
path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'],
'role_id': system_role['id']
}
self.put(path=path)
auth_request_body = self.build_authentication_request(
username=self.user['name'],
password=self.user['password'],
user_domain_id=self.domain['id'],
system=True
)
response = self.v3_create_token(auth_request_body)
self.assertValidSystemScopedTokenResponse(response)
token = response.headers.get('X-Subject-Token')
self._validate_token(token)
self.delete('/roles/%(role_id)s' % {'role_id': system_role['id']})
self._validate_token(token, expected_status=http.client.NOT_FOUND)
def test_rescoping_a_system_token_for_a_project_token_fails(self):
ref = {'role': unit.new_role_ref()}
system_role = self.post('/roles', body=ref).json_body['role']
path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'],
'role_id': system_role['id']
}
self.put(path=path)
auth_request_body = self.build_authentication_request(
username=self.user['name'],
password=self.user['password'],
user_domain_id=self.domain['id'],
system=True
)
response = self.v3_create_token(auth_request_body)
self.assertValidSystemScopedTokenResponse(response)
system_token = response.headers.get('X-Subject-Token')
auth_request_body = self.build_authentication_request(
token=system_token, project_id=self.project_id
)
self.v3_create_token(
auth_request_body, expected_status=http.client.FORBIDDEN
)
def test_rescoping_a_system_token_for_a_domain_token_fails(self):
ref = {'role': unit.new_role_ref()}
system_role = self.post('/roles', body=ref).json_body['role']
path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'],
'role_id': system_role['id']
}
self.put(path=path)
auth_request_body = self.build_authentication_request(
username=self.user['name'],
password=self.user['password'],
user_domain_id=self.domain['id'],
system=True
)
response = self.v3_create_token(auth_request_body)
self.assertValidSystemScopedTokenResponse(response)
system_token = response.headers.get('X-Subject-Token')
auth_request_body = self.build_authentication_request(
token=system_token, domain_id=CONF.identity.default_domain_id
)
self.v3_create_token(
auth_request_body, expected_status=http.client.FORBIDDEN
)
def test_create_domain_token_scoped_with_domain_id_and_user_id(self):
# grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_scoped_with_domain_id_and_username(self):
# grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
domain_id=self.domain['id'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_scoped_with_domain_id(self):
# grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
domain_id=self.domain['id'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_scoped_with_domain_name(self):
# grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_name=self.domain['name'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_scoped_with_domain_name_and_username(self):
# grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
domain_name=self.domain['name'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_with_only_domain_name_and_username(self):
# grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
domain_name=self.domain['name'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_with_group_role(self):
group = unit.new_group_ref(domain_id=self.domain_id)
group = PROVIDERS.identity_api.create_group(group)
# add user to group
PROVIDERS.identity_api.add_user_to_group(self.user['id'], group['id'])
# grant the domain role to group
path = '/domains/%s/groups/%s/roles/%s' % (
self.domain['id'], group['id'], self.role['id'])
self.put(path=path)
# now get a domain-scoped token
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_create_domain_token_fails_if_domain_name_unsafe(self):
"""Verify authenticate to a domain with unsafe name fails."""
# Start with url name restrictions off, so we can create the unsafe
# named domain
self.config_fixture.config(group='resource',
domain_name_url_safe='off')
unsafe_name = 'i am not / safe'
domain = unit.new_domain_ref(name=unsafe_name)
PROVIDERS.resource_api.create_domain(domain['id'], domain)
role_member = unit.new_role_ref()
PROVIDERS.role_api.create_role(role_member['id'], role_member)
PROVIDERS.assignment_api.create_grant(
role_member['id'],
user_id=self.user['id'],
domain_id=domain['id'])
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_name=domain['name'])
# Since name url restriction is off, we should be able to authenticate
self.v3_create_token(auth_data)
# Set the name url restriction to new, which should still allow us to
# authenticate
self.config_fixture.config(group='resource',
project_name_url_safe='new')
self.v3_create_token(auth_data)
# Set the name url restriction to strict and we should fail to
# authenticate
self.config_fixture.config(group='resource',
domain_name_url_safe='strict')
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_create_domain_token_without_grant_returns_unauthorized(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
# this fails because the user does not have a role on self.domain
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_validate_domain_scoped_token(self):
# Grant user access to domain
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user['id'],
domain_id=self.domain['id']
)
domain_scoped_token = self._get_domain_scoped_token()
r = self._validate_token(domain_scoped_token)
self.assertValidDomainScopedTokenResponse(r)
resp_json = json.loads(r.body)
self.assertIsNotNone(resp_json['token']['catalog'])
self.assertIsNotNone(resp_json['token']['roles'])
self.assertIsNotNone(resp_json['token']['domain'])
def test_validate_expired_domain_scoped_token_returns_not_found(self):
# Grant user access to domain
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user['id'],
domain_id=self.domain['id']
)
# NOTE(lbragstad): We set token expiration to 10 seconds so that we can
# use the context manager of freezegun without sqlite issues.
self.config_fixture.config(group='token',
expiration=10)
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
domain_scoped_token = self._get_domain_scoped_token()
frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
self._validate_token(
domain_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_domain_scoped_token_is_invalid_after_disabling_user(self):
# Grant user access to domain
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user['id'],
domain_id=self.domain['id']
)
domain_scoped_token = self._get_domain_scoped_token()
# Make sure the token is valid
r = self._validate_token(domain_scoped_token)
self.assertValidDomainScopedTokenResponse(r)
# Disable user
self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails
self._validate_token(
domain_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_domain_scoped_token_is_invalid_after_deleting_grant(self):
# Grant user access to domain
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user['id'],
domain_id=self.domain['id']
)
domain_scoped_token = self._get_domain_scoped_token()
# Make sure the token is valid
r = self._validate_token(domain_scoped_token)
self.assertValidDomainScopedTokenResponse(r)
# Delete access to domain
PROVIDERS.assignment_api.delete_grant(
self.role['id'], user_id=self.user['id'],
domain_id=self.domain['id']
)
# Ensure validating a token for a disabled user fails
self._validate_token(
domain_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_domain_scoped_token_invalid_after_disabling_domain(self):
# Grant user access to domain
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user['id'],
domain_id=self.domain['id']
)
domain_scoped_token = self._get_domain_scoped_token()
# Make sure the token is valid
r = self._validate_token(domain_scoped_token)
self.assertValidDomainScopedTokenResponse(r)
# Disable domain
self.domain['enabled'] = False
PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
# Ensure validating a token for a disabled domain fails
self._validate_token(
domain_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_create_project_scoped_token_with_project_id_and_user_id(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r)
def test_validate_project_scoped_token(self):
project_scoped_token = self._get_project_scoped_token()
r = self._validate_token(project_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
def test_validate_expired_project_scoped_token_returns_not_found(self):
# NOTE(lbragstad): We set token expiration to 10 seconds so that we can
# use the context manager of freezegun without sqlite issues.
self.config_fixture.config(group='token',
expiration=10)
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
project_scoped_token = self._get_project_scoped_token()
frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
self._validate_token(
project_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_revoke_project_scoped_token(self):
project_scoped_token = self._get_project_scoped_token()
r = self._validate_token(project_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
self._revoke_token(project_scoped_token)
self._validate_token(project_scoped_token,
expected_status=http.client.NOT_FOUND)
def test_project_scoped_token_is_scoped_to_default_project(self):
project = self._create_project_and_set_as_default_project()
# attempt to authenticate without requesting a project
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
# ensure the project id in the token matches the default project id
self.assertValidProjectScopedTokenResponse(r)
self.assertEqual(project['id'], r.result['token']['project']['id'])
def test_project_scoped_token_no_catalog_is_scoped_to_default_project(
self):
project = self._create_project_and_set_as_default_project()
# attempt to authenticate without requesting a project or catalog
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.post('/auth/tokens?nocatalog', body=auth_data, noauth=True)
# ensure the project id in the token matches the default project id
self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
self.assertEqual(project['id'], r.result['token']['project']['id'])
def test_implicit_project_id_scoped_token_with_user_id_no_catalog(self):
self._create_project_and_set_as_default_project()
# create a project scoped token that isn't scoped to the default
# project
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.post('/auth/tokens?nocatalog', body=auth_data, noauth=True)
# ensure the project id in the token matches the one we as for
self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
self.assertEqual(self.project['id'],
r.result['token']['project']['id'])
def test_project_scoped_token_catalog_attributes(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.v3_create_token(auth_data)
catalog = r.result['token']['catalog']
self.assertEqual(1, len(catalog))
catalog = catalog[0]
self.assertEqual(self.service['id'], catalog['id'])
self.assertEqual(self.service['name'], catalog['name'])
self.assertEqual(self.service['type'], catalog['type'])
endpoint = catalog['endpoints']
self.assertEqual(1, len(endpoint))
endpoint = endpoint[0]
self.assertEqual(self.endpoint['id'], endpoint['id'])
self.assertEqual(self.endpoint['interface'], endpoint['interface'])
self.assertEqual(self.endpoint['region_id'], endpoint['region_id'])
self.assertEqual(self.endpoint['url'], endpoint['url'])
def test_project_scoped_token_catalog_excludes_disabled_endpoint(self):
# Create a disabled endpoint
disabled_endpoint_ref = copy.copy(self.endpoint)
disabled_endpoint_id = uuid.uuid4().hex
disabled_endpoint_ref.update({
'id': disabled_endpoint_id,
'enabled': False,
'interface': 'internal'
})
PROVIDERS.catalog_api.create_endpoint(
disabled_endpoint_id, disabled_endpoint_ref
)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
resp = self.v3_create_token(auth_data)
# make sure the disabled endpoint id isn't in the list of endpoints
endpoints = resp.result['token']['catalog'][0]['endpoints']
endpoint_ids = [endpoint['id'] for endpoint in endpoints]
self.assertNotIn(disabled_endpoint_id, endpoint_ids)
def test_project_scoped_token_catalog_excludes_disabled_service(self):
"""On authenticate, get a catalog that excludes disabled services."""
# although the endpoint associated with the service is enabled, the
# service is disabled
self.assertTrue(self.endpoint['enabled'])
PROVIDERS.catalog_api.update_service(
self.endpoint['service_id'], {'enabled': False})
service = PROVIDERS.catalog_api.get_service(
self.endpoint['service_id']
)
self.assertFalse(service['enabled'])
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.v3_create_token(auth_data)
self.assertEqual([], r.result['token']['catalog'])
def test_scope_to_project_without_grant_returns_unauthorized(self):
project = unit.new_project_ref(domain_id=self.domain_id)
PROVIDERS.resource_api.create_project(project['id'], project)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=project['id'])
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_create_project_scoped_token_with_username_and_domain_id(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r)
def test_create_project_scoped_token_with_username_and_domain_name(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
project_id=self.project['id'])
r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r)
def test_create_project_scoped_token_fails_if_project_name_unsafe(self):
"""Verify authenticate to a project with unsafe name fails."""
# Start with url name restrictions off, so we can create the unsafe
# named project
self.config_fixture.config(group='resource',
project_name_url_safe='off')
unsafe_name = 'i am not / safe'
project = unit.new_project_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID,
name=unsafe_name)
PROVIDERS.resource_api.create_project(project['id'], project)
role_member = unit.new_role_ref()
PROVIDERS.role_api.create_role(role_member['id'], role_member)
PROVIDERS.assignment_api.add_role_to_user_and_project(
self.user['id'], project['id'], role_member['id'])
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_name=project['name'],
project_domain_id=test_v3.DEFAULT_DOMAIN_ID)
# Since name url restriction is off, we should be able to authenticate
self.v3_create_token(auth_data)
# Set the name url restriction to new, which should still allow us to
# authenticate
self.config_fixture.config(group='resource',
project_name_url_safe='new')
self.v3_create_token(auth_data)
# Set the name url restriction to strict and we should fail to
# authenticate
self.config_fixture.config(group='resource',
project_name_url_safe='strict')
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_create_project_scoped_token_fails_if_domain_name_unsafe(self):
"""Verify authenticate to a project using unsafe domain name fails."""
# Start with url name restrictions off, so we can create the unsafe
# named domain
self.config_fixture.config(group='resource',
domain_name_url_safe='off')
unsafe_name = 'i am not / safe'
domain = unit.new_domain_ref(name=unsafe_name)
PROVIDERS.resource_api.create_domain(domain['id'], domain)
# Add a (safely named) project to that domain
project = unit.new_project_ref(domain_id=domain['id'])
PROVIDERS.resource_api.create_project(project['id'], project)
role_member = unit.new_role_ref()
PROVIDERS.role_api.create_role(role_member['id'], role_member)
PROVIDERS.assignment_api.create_grant(
role_member['id'],
user_id=self.user['id'],
project_id=project['id'])
# An auth request via project ID, but specifying domain by name
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_name=project['name'],
project_domain_name=domain['name'])
# Since name url restriction is off, we should be able to authenticate
self.v3_create_token(auth_data)
# Set the name url restriction to new, which should still allow us to
# authenticate
self.config_fixture.config(group='resource',
project_name_url_safe='new')
self.v3_create_token(auth_data)
# Set the name url restriction to strict and we should fail to
# authenticate
self.config_fixture.config(group='resource',
domain_name_url_safe='strict')
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_create_project_token_with_same_domain_and_project_name(self):
"""Authenticate to a project with the same name as its domain."""
domain = unit.new_project_ref(is_domain=True)
domain = PROVIDERS.resource_api.create_project(domain['id'], domain)
project = unit.new_project_ref(domain_id=domain['id'],
name=domain['name'])
PROVIDERS.resource_api.create_project(project['id'], project)
role_member = unit.new_role_ref()
PROVIDERS.role_api.create_role(role_member['id'], role_member)
PROVIDERS.assignment_api.add_role_to_user_and_project(
self.user['id'], project['id'], role_member['id'])
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_name=project['name'],
project_domain_name=domain['name'])
r = self.v3_create_token(auth_data)
self.assertEqual(project['id'], r.result['token']['project']['id'])
def test_create_project_token_fails_with_project_acting_as_domain(self):
domain = unit.new_project_ref(is_domain=True)
domain = PROVIDERS.resource_api.create_project(domain['id'], domain)
role_member = unit.new_role_ref()
PROVIDERS.role_api.create_role(role_member['id'], role_member)
PROVIDERS.assignment_api.create_grant(
role_member['id'],
user_id=self.user['id'],
domain_id=domain['id'])
# authentication will fail because the project name is incorrect
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_name=domain['name'],
project_domain_name=domain['name'])
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_create_project_token_with_disabled_project_domain_fails(self):
# create a disabled domain
domain = unit.new_domain_ref()
domain = PROVIDERS.resource_api.create_domain(domain['id'], domain)
# create a project in the domain
project = unit.new_project_ref(domain_id=domain['id'])
PROVIDERS.resource_api.create_project(project['id'], project)
# assign some role to self.user for the project in the domain
PROVIDERS.assignment_api.add_role_to_user_and_project(
self.user['id'],
project['id'],
self.role_id)
# Disable the domain
domain['enabled'] = False
PROVIDERS.resource_api.update_domain(domain['id'], domain)
# user should not be able to auth with project_id
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=project['id'])
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
# user should not be able to auth with project_name & domain
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_name=project['name'],
project_domain_id=domain['id'])
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_create_project_token_with_default_domain_as_project(self):
# Authenticate to a project with the default domain as project
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=test_v3.DEFAULT_DOMAIN_ID)
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_project_scoped_token_is_invalid_after_disabling_user(self):
project_scoped_token = self._get_project_scoped_token()
# Make sure the token is valid
r = self._validate_token(project_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
# Disable the user
self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails
self._validate_token(
project_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_project_scoped_token_invalid_after_changing_user_password(self):
project_scoped_token = self._get_project_scoped_token()
# Make sure the token is valid
r = self._validate_token(project_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
# Update user's password
self.user['password'] = 'Password1'
PROVIDERS.identity_api.update_user(self.user['id'], self.user)
# Ensure updating user's password revokes existing tokens
self._validate_token(
project_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_project_scoped_token_invalid_after_disabling_project(self):
project_scoped_token = self._get_project_scoped_token()
# Make sure the token is valid
r = self._validate_token(project_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
# Disable project
self.project['enabled'] = False
PROVIDERS.resource_api.update_project(self.project['id'], self.project)
# Ensure validating a token for a disabled project fails
self._validate_token(
project_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_project_scoped_token_is_invalid_after_deleting_grant(self):
# disable caching so that user grant deletion is not hidden
# by token caching
self.config_fixture.config(
group='cache',
enabled=False)
# Grant user access to project
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user['id'],
project_id=self.project['id']
)
project_scoped_token = self._get_project_scoped_token()
# Make sure the token is valid
r = self._validate_token(project_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
# Delete access to project
PROVIDERS.assignment_api.delete_grant(
self.role['id'], user_id=self.user['id'],
project_id=self.project['id']
)
# Ensure the token has been revoked
self._validate_token(
project_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_no_access_to_default_project_result_in_unscoped_token(self):
# create a disabled project to work with
self.create_new_default_project_for_user(self.user['id'],
self.domain_id)
# attempt to authenticate without requesting a project
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_rescope_unscoped_token_with_trust(self):
trustee_user, trust = self._create_trust()
self._get_trust_scoped_token(trustee_user, trust)
def test_validate_a_trust_scoped_token(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
def test_validate_expired_trust_scoped_token_returns_not_found(self):
# NOTE(lbragstad): We set token expiration to 10 seconds so that we can
# use the context manager of freezegun without sqlite issues.
self.config_fixture.config(group='token',
expiration=10)
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(
trustee_user, trust
)
frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
self._validate_token(
trust_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_validate_a_trust_scoped_token_impersonated(self):
trustee_user, trust = self._create_trust(impersonation=True)
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
def test_revoke_trust_scoped_token(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
self._revoke_token(trust_scoped_token)
self._validate_token(trust_scoped_token,
expected_status=http.client.NOT_FOUND)
def test_trust_scoped_token_is_invalid_after_disabling_trustee(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
# Disable trustee
trustee_update_ref = dict(enabled=False)
PROVIDERS.identity_api.update_user(
trustee_user['id'], trustee_update_ref
)
# Ensure validating a token for a disabled user fails
self._validate_token(
trust_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_trust_token_is_invalid_when_trustee_domain_disabled(self):
# create a new domain with new user in that domain
new_domain_ref = unit.new_domain_ref()
PROVIDERS.resource_api.create_domain(
new_domain_ref['id'], new_domain_ref
)
trustee_ref = unit.create_user(PROVIDERS.identity_api,
domain_id=new_domain_ref['id'])
new_project_ref = unit.new_project_ref(domain_id=self.domain_id)
PROVIDERS.resource_api.create_project(
new_project_ref['id'], new_project_ref
)
# grant the trustor access to the new project
PROVIDERS.assignment_api.create_grant(
self.role['id'],
user_id=self.user_id,
project_id=new_project_ref['id'])
trust_ref = unit.new_trust_ref(trustor_user_id=self.user_id,
trustee_user_id=trustee_ref['id'],
expires=dict(minutes=1),
project_id=new_project_ref['id'],
impersonation=True,
role_ids=[self.role['id']])
resp = self.post('/OS-TRUST/trusts', body={'trust': trust_ref})
self.assertValidTrustResponse(resp, trust_ref)
trust_id = resp.json_body['trust']['id']
# get a project-scoped token using the trust
trust_auth_data = self.build_authentication_request(
user_id=trustee_ref['id'],
password=trustee_ref['password'],
trust_id=trust_id)
trust_scoped_token = self.get_requested_token(trust_auth_data)
# ensure the project-scoped token from the trust is valid
self._validate_token(trust_scoped_token)
disable_body = {'domain': {'enabled': False}}
self.patch(
'/domains/%(domain_id)s' % {'domain_id': new_domain_ref['id']},
body=disable_body)
# ensure the project-scoped token from the trust is invalid
self._validate_token(trust_scoped_token,
expected_status=http.client.NOT_FOUND)
def test_trust_scoped_token_invalid_after_changing_trustee_password(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
# Change trustee's password
trustee_update_ref = dict(password='Password1')
PROVIDERS.identity_api.update_user(
trustee_user['id'], trustee_update_ref
)
# Ensure updating trustee's password revokes existing tokens
self._validate_token(
trust_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
# Disable the trustor
trustor_update_ref = dict(enabled=False)
PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
# Ensure validating a token for a disabled user fails
self._validate_token(
trust_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_trust_scoped_token_invalid_after_changing_trustor_password(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
# Change trustor's password
trustor_update_ref = dict(password='Password1')
PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
# Ensure updating trustor's password revokes existing user's tokens
self._validate_token(
trust_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_trust_scoped_token_invalid_after_disabled_trustor_domain(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
# Disable trustor's domain
self.domain['enabled'] = False
PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
trustor_update_ref = dict(password='Password1')
PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
# Ensure updating trustor's password revokes existing user's tokens
self._validate_token(
trust_scoped_token,
expected_status=http.client.NOT_FOUND
)
def test_default_fixture_scope_token(self):
self.assertIsNotNone(self.get_scoped_token())
def test_rescoping_token(self):
expires = self.v3_token_data['token']['expires_at']
# rescope the token
r = self.v3_create_token(self.build_authentication_request(
token=self.v3_token,
project_id=self.project_id))
self.assertValidProjectScopedTokenResponse(r)
# ensure token expiration stayed the same
self.assertTimestampEqual(expires, r.result['token']['expires_at'])
def test_check_token(self):
self.head('/auth/tokens', headers=self.headers,
expected_status=http.client.OK)
def test_validate_token(self):
r = self.get('/auth/tokens', headers=self.headers)
self.assertValidUnscopedTokenResponse(r)
def test_validate_missing_subject_token(self):
self.get('/auth/tokens',
expected_status=http.client.NOT_FOUND)
def test_validate_missing_auth_token(self):
self.admin_request(
method='GET',
path='/v3/projects',
token=None,
expected_status=http.client.UNAUTHORIZED)
def test_validate_token_nocatalog(self):
v3_token = self.get_requested_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id']))
r = self.get(
'/auth/tokens?nocatalog',
headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
def test_is_admin_token_by_ids(self):
self.config_fixture.config(
group='resource',
admin_project_domain_name=self.domain['name'],
admin_project_name=self.project['name'])
r = self.v3_create_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id']))
self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
v3_token = r.headers.get('X-Subject-Token')
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
def test_is_admin_token_by_names(self):
self.config_fixture.config(
group='resource',
admin_project_domain_name=self.domain['name'],
admin_project_name=self.project['name'])
r = self.v3_create_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_domain_name=self.domain['name'],
project_name=self.project['name']))
self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
v3_token = r.headers.get('X-Subject-Token')
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
def test_token_for_non_admin_project_is_not_admin(self):
self.config_fixture.config(
group='resource',
admin_project_domain_name=self.domain['name'],
admin_project_name=uuid.uuid4().hex)
r = self.v3_create_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id']))
self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
v3_token = r.headers.get('X-Subject-Token')
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
def test_token_for_non_admin_domain_same_project_name_is_not_admin(self):
self.config_fixture.config(
group='resource',
admin_project_domain_name=uuid.uuid4().hex,
admin_project_name=self.project['name'])
r = self.v3_create_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id']))
self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
v3_token = r.headers.get('X-Subject-Token')
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
def test_only_admin_project_set_acts_as_non_admin(self):
self.config_fixture.config(
group='resource',
admin_project_name=self.project['name'])
r = self.v3_create_token(self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id']))
self.assertValidProjectScopedTokenResponse(r, is_admin_project=None)
v3_token = r.headers.get('X-Subject-Token')
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, is_admin_project=None)
def _create_role(self, domain_id=None):
"""Call ``POST /roles``."""
ref = unit.new_role_ref(domain_id=domain_id)
r = self.post('/roles', body={'role': ref})
return self.assertValidRoleResponse(r, ref)
def _create_implied_role(self, prior_id):
implied = self._create_role()
url = '/roles/%s/implies/%s' % (prior_id, implied['id'])
self.put(url, expected_status=http.client.CREATED)
return implied
def _delete_implied_role(self, prior_role_id, implied_role_id):
url = '/roles/%s/implies/%s' % (prior_role_id, implied_role_id)
self.delete(url)
def _get_scoped_token_roles(self, is_domain=False):
if is_domain:
v3_token = self.get_domain_scoped_token()
else:
v3_token = self.get_scoped_token()
r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
v3_token_data = r.result
token_roles = v3_token_data['token']['roles']
return token_roles
def _create_implied_role_shows_in_v3_token(self, is_domain):
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(1, len(token_roles))
prior = token_roles[0]['id']
implied1 = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(2, len(token_roles))
implied2 = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(3, len(token_roles))
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(prior, token_role_ids)
self.assertIn(implied1['id'], token_role_ids)
self.assertIn(implied2['id'], token_role_ids)
def test_create_implied_role_shows_in_v3_project_token(self):
# regardless of the default chosen, this should always
# test with the option set.
self.config_fixture.config(group='token')
self._create_implied_role_shows_in_v3_token(False)
def test_create_implied_role_shows_in_v3_domain_token(self):
self.config_fixture.config(group='token')
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user['id'],
domain_id=self.domain['id']
)
self._create_implied_role_shows_in_v3_token(True)
def test_create_implied_role_shows_in_v3_system_token(self):
self.config_fixture.config(group='token')
PROVIDERS.assignment_api.create_system_grant_for_user(
self.user['id'], self.role['id']
)
token_id = self.get_system_scoped_token()
r = self.get('/auth/tokens', headers={'X-Subject-Token': token_id})
token_roles = r.result['token']['roles']
prior = token_roles[0]['id']
self._create_implied_role(prior)
r = self.get('/auth/tokens', headers={'X-Subject-Token': token_id})
token_roles = r.result['token']['roles']
self.assertEqual(2, len(token_roles))
def test_group_assigned_implied_role_shows_in_v3_token(self):
self.config_fixture.config(group='token')
is_domain = False
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(1, len(token_roles))
new_role = self._create_role()
prior = new_role['id']
new_group_ref = unit.new_group_ref(domain_id=self.domain['id'])
new_group = PROVIDERS.identity_api.create_group(new_group_ref)
PROVIDERS.assignment_api.create_grant(
prior, group_id=new_group['id'], project_id=self.project['id']
)
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(1, len(token_roles))
PROVIDERS.identity_api.add_user_to_group(
self.user['id'], new_group['id']
)
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(2, len(token_roles))
implied1 = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(3, len(token_roles))
implied2 = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles(is_domain)
self.assertEqual(4, len(token_roles))
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(prior, token_role_ids)
self.assertIn(implied1['id'], token_role_ids)
self.assertIn(implied2['id'], token_role_ids)
def test_multiple_implied_roles_show_in_v3_token(self):
self.config_fixture.config(group='token')
token_roles = self._get_scoped_token_roles()
self.assertEqual(1, len(token_roles))
prior = token_roles[0]['id']
implied1 = self._create_implied_role(prior)
implied2 = self._create_implied_role(prior)
implied3 = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles()
self.assertEqual(4, len(token_roles))
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(prior, token_role_ids)
self.assertIn(implied1['id'], token_role_ids)
self.assertIn(implied2['id'], token_role_ids)
self.assertIn(implied3['id'], token_role_ids)
def test_chained_implied_role_shows_in_v3_token(self):
self.config_fixture.config(group='token')
token_roles = self._get_scoped_token_roles()
self.assertEqual(1, len(token_roles))
prior = token_roles[0]['id']
implied1 = self._create_implied_role(prior)
implied2 = self._create_implied_role(implied1['id'])
implied3 = self._create_implied_role(implied2['id'])
token_roles = self._get_scoped_token_roles()
self.assertEqual(4, len(token_roles))
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(prior, token_role_ids)
self.assertIn(implied1['id'], token_role_ids)
self.assertIn(implied2['id'], token_role_ids)
self.assertIn(implied3['id'], token_role_ids)
def test_implied_role_disabled_by_config(self):
self.config_fixture.config(group='token')
token_roles = self._get_scoped_token_roles()
self.assertEqual(1, len(token_roles))
prior = token_roles[0]['id']
implied1 = self._create_implied_role(prior)
implied2 = self._create_implied_role(implied1['id'])
self._create_implied_role(implied2['id'])
token_roles = self._get_scoped_token_roles()
self.assertEqual(4, len(token_roles))
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(prior, token_role_ids)
def test_delete_implied_role_do_not_show_in_v3_token(self):
self.config_fixture.config(group='token')
token_roles = self._get_scoped_token_roles()
prior = token_roles[0]['id']
implied = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles()
self.assertEqual(2, len(token_roles))
self._delete_implied_role(prior, implied['id'])
token_roles = self._get_scoped_token_roles()
self.assertEqual(1, len(token_roles))
def test_unrelated_implied_roles_do_not_change_v3_token(self):
self.config_fixture.config(group='token')
token_roles = self._get_scoped_token_roles()
prior = token_roles[0]['id']
implied = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles()
self.assertEqual(2, len(token_roles))
unrelated = self._create_role()
url = '/roles/%s/implies/%s' % (unrelated['id'], implied['id'])
self.put(url, expected_status=http.client.CREATED)
token_roles = self._get_scoped_token_roles()
self.assertEqual(2, len(token_roles))
self._delete_implied_role(unrelated['id'], implied['id'])
token_roles = self._get_scoped_token_roles()
self.assertEqual(2, len(token_roles))
def test_domain_specific_roles_do_not_show_v3_token(self):
self.config_fixture.config(group='token')
initial_token_roles = self._get_scoped_token_roles()
new_role = self._create_role(domain_id=self.domain_id)
PROVIDERS.assignment_api.create_grant(
new_role['id'], user_id=self.user['id'],
project_id=self.project['id']
)
implied = self._create_implied_role(new_role['id'])
token_roles = self._get_scoped_token_roles()
self.assertEqual(len(initial_token_roles) + 1, len(token_roles))
# The implied role from the domain specific role should be in the
# token, but not the domain specific role itself.
token_role_ids = [role['id'] for role in token_roles]
self.assertIn(implied['id'], token_role_ids)
self.assertNotIn(new_role['id'], token_role_ids)
def test_remove_all_roles_from_scope_result_in_404(self):
# create a new user
new_user = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domain['id'])
# give the new user a role on a project
path = '/projects/%s/users/%s/roles/%s' % (
self.project['id'], new_user['id'], self.role['id'])
self.put(path=path)
# authenticate as the new user and get a project-scoped token
auth_data = self.build_authentication_request(
user_id=new_user['id'],
password=new_user['password'],
project_id=self.project['id'])
subject_token_id = self.v3_create_token(auth_data).headers.get(
'X-Subject-Token')
# make sure the project-scoped token is valid
headers = {'X-Subject-Token': subject_token_id}
r = self.get('/auth/tokens', headers=headers)
self.assertValidProjectScopedTokenResponse(r)
# remove the roles from the user for the given scope
path = '/projects/%s/users/%s/roles/%s' % (
self.project['id'], new_user['id'], self.role['id'])
self.delete(path=path)
# token validation should now result in 404
self.get('/auth/tokens', headers=headers,
expected_status=http.client.NOT_FOUND)
def test_create_token_with_nonexistant_user_id_fails(self):
auth_data = self.build_authentication_request(
user_id=uuid.uuid4().hex,
password=self.user['password'])
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_create_token_with_nonexistant_username_fails(self):
auth_data = self.build_authentication_request(
username=uuid.uuid4().hex,
user_domain_id=self.domain['id'],
password=self.user['password'])
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_create_token_with_nonexistant_domain_id_fails(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=uuid.uuid4().hex,
password=self.user['password'])
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_create_token_with_nonexistant_domain_name_fails(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=uuid.uuid4().hex,
password=self.user['password'])
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_create_token_with_wrong_password_fails(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=uuid.uuid4().hex)
self.v3_create_token(auth_data,
expected_status=http.client.UNAUTHORIZED)
def test_user_and_group_roles_scoped_token(self):
"""Test correct roles are returned in scoped token.
Test Plan:
- Create a domain, with 1 project, 2 users (user1 and user2)
and 2 groups (group1 and group2)
- Make user1 a member of group1, user2 a member of group2
- Create 8 roles, assigning them to each of the 8 combinations
of users/groups on domain/project
- Get a project scoped token for user1, checking that the right
two roles are returned (one directly assigned, one by virtue
of group membership)
- Repeat this for a domain scoped token
- Make user1 also a member of group2
- Get another scoped token making sure the additional role
shows up
- User2 is just here as a spoiler, to make sure we don't get
any roles uniquely assigned to it returned in any of our
tokens
"""
domainA = unit.new_domain_ref()
PROVIDERS.resource_api.create_domain(domainA['id'], domainA)
projectA = unit.new_project_ref(domain_id=domainA['id'])
PROVIDERS.resource_api.create_project(projectA['id'], projectA)