Validate token calls return 404 on invalid tokens

Invalid X-Subject-Token was resulting in HTTP 401 rather than 404
This is causing the auth_token middleware to re-authenticate
It expects a 404 on an invalid token.

Change-Id: Ieb4cbd4f69b4f3c5944eebc262e694e831d1ca6d
Fixed-Bug: #1221889
Fixed-Bug: #1186059
This commit is contained in:
Arvind Tiwari 2013-09-11 12:28:14 -06:00 committed by Morgan Fainberg
parent af68f75978
commit f1f0bbc4b4
10 changed files with 175 additions and 47 deletions

View File

@ -3,7 +3,7 @@
"cloud_admin": [["rule:admin_required", "domain_id:admin_domain_id"]],
"service_role": [["role:service"]],
"service_or_admin": [["rule:admin_required"], ["rule:service_role"]],
"owner" : [["user_id:%(user_id)s"]],
"owner" : [["user_id:%(user_id)s"], ["user_id:%(target.entity.user_id)s"]],
"admin_or_owner": [["rule:admin_required"], ["rule:owner"]],
"admin_or_cloud_admin": [["rule:admin_required"], ["rule:cloud_admin"]],
@ -85,7 +85,7 @@
"identity:update_policy": [["rule:cloud_admin"]],
"identity:delete_policy": [["rule:cloud_admin"]],
"identity:check_token": [["rule:admin_required"]],
"identity:check_token": [["rule:admin_or_owner"]],
"identity:validate_token": [["rule:service_or_admin"]],
"identity:validate_token_head": [["rule:service_or_admin"]],
"identity:revocation_list": [["rule:service_or_admin"]],

View File

@ -128,18 +128,27 @@ def protected(callback=None):
action = 'identity:%s' % f.__name__
creds = _build_policy_check_credentials(self, action,
context, kwargs)
policy_dict = {}
# Check to see if we need to include the target entity in our
# policy checks. We deduce this by seeing if the class has
# specified a get_member() method and that kwargs contains the
# appropriate entity id.
policy_dict = {}
if (hasattr(self, 'get_member_from_driver') and
self.get_member_from_driver is not None):
key = '%s_id' % self.member_name
if key in kwargs:
ref = self.get_member_from_driver(kwargs[key])
policy_dict = {'target':
{self.member_name: ref}}
key = '%s_id' % self.member_name
if key in kwargs:
ref = self.get_member_from_driver(kwargs[key])
policy_dict['target'] = {self.member_name: ref}
if context.get('subject_token_id') is not None:
token_ref = self.token_api.get_token(
context['subject_token_id'])
policy_dict.setdefault('target', {})
policy_dict['target'].setdefault(self.member_name, {})
policy_dict['target'][self.member_name]['user_id'] = (
token_ref['user_id'])
# Add in the kwargs, which means that any entity provided as a
# parameter for calls like create and update will be included.

View File

@ -432,7 +432,7 @@ class AuthWithToken(AuthTest):
# Check the token is now invalid
self.assertRaises(
exception.Unauthorized,
exception.TokenNotFound,
self.controller.validate_token,
dict(is_admin=True, query_string={}),
token_id=token_id)

View File

@ -2834,30 +2834,30 @@ class TokenCacheInvalidation(object):
def _check_unscoped_tokens_are_invalid(self):
self.assertRaises(
exception.Unauthorized,
exception.TokenNotFound,
self.token_provider_api.validate_token,
self.unscoped_token_id)
self.assertRaises(
exception.Unauthorized,
exception.TokenNotFound,
self.token_provider_api.validate_v2_token,
self.unscoped_token_id)
def _check_scoped_tokens_are_invalid(self):
self.assertRaises(
exception.Unauthorized,
exception.TokenNotFound,
self.token_provider_api.validate_token,
self.scoped_token_id)
self.assertRaises(
exception.Unauthorized,
exception.TokenNotFound,
self.token_provider_api.validate_token,
self.scoped_token_id,
self.tenant['id'])
self.assertRaises(
exception.Unauthorized,
exception.TokenNotFound,
self.token_provider_api.validate_v2_token,
self.scoped_token_id)
self.assertRaises(
exception.Unauthorized,
exception.TokenNotFound,
self.token_provider_api.validate_v2_token,
self.scoped_token_id,
self.tenant['id'])

View File

@ -402,6 +402,15 @@ class CoreApiTests(object):
token=token)
self.assertValidAuthenticationResponse(r)
def test_invalid_token_404(self):
token = self.get_scoped_token()
self.admin_request(
path='/v2.0/tokens/%(token_id)s' % {
'token_id': 'invalid',
},
token=token,
expected_status=404)
def test_validate_token_service_role(self):
self.metadata_foobar = self.identity_api.add_role_to_user_and_project(
self.user_foo['id'],

View File

@ -801,13 +801,13 @@ class TestTokenProvider(tests.TestCase):
'my.package.MyProvider')
def test_provider_token_expiration_validation(self):
self.assertRaises(exception.Unauthorized,
self.assertRaises(exception.TokenNotFound,
self.token_provider_api._is_valid_token,
SAMPLE_V2_TOKEN_EXPIRED)
self.assertRaises(exception.Unauthorized,
self.assertRaises(exception.TokenNotFound,
self.token_provider_api._is_valid_token,
SAMPLE_V3_TOKEN_EXPIRED)
self.assertRaises(exception.Unauthorized,
self.assertRaises(exception.TokenNotFound,
self.token_provider_api._is_valid_token,
SAMPLE_MALFORMED_TOKEN)
self.assertEqual(

View File

@ -323,8 +323,7 @@ class TestPKITokenAPIs(test_v3.RestfulTestCase):
def test_revoke_token(self):
headers = {'X-Subject-Token': self.get_scoped_token()}
self.delete('/auth/tokens', headers=headers, expected_status=204)
self.head('/auth/tokens', headers=headers, expected_status=401)
self.head('/auth/tokens', headers=headers, expected_status=404)
# make sure we have a CRL
r = self.get('/auth/tokens/OS-PKI/revoked')
self.assertIn('signed', r.result)
@ -352,6 +351,118 @@ class TestUUIDTokenAPIs(TestPKITokenAPIs):
pass
class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
"""Test token revoke using v3 Identity API by token owner and admin."""
def setUp(self):
"""Setup for Test Cases.
One domain A
Two users userNormalA and userAdminA
"""
super(TestTokenRevokeSelfAndAdmin, self).setUp()
self.domainA = self.new_domain_ref()
self.identity_api.create_domain(self.domainA['id'], self.domainA)
self.userAdminA = self.new_user_ref(domain_id=self.domainA['id'])
self.userAdminA['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.userAdminA['id'], self.userAdminA)
self.userNormalA = self.new_user_ref(
domain_id=self.domainA['id'])
self.userNormalA['password'] = uuid.uuid4().hex
self.identity_api.create_user(self.userNormalA['id'], self.userNormalA)
self.role1 = self.new_role_ref()
self.role1['name'] = 'admin'
self.identity_api.create_role(self.role1['id'], self.role1)
self.identity_api.create_grant(self.role1['id'],
user_id=self.userAdminA['id'],
domain_id=self.domainA['id'])
# Finally, switch to the v3 sample policy file
self.orig_policy_file = CONF.policy_file
from keystone.policy.backends import rules
rules.reset()
self.opt(policy_file=tests.etcdir('policy.v3cloudsample.json'))
def test_user_revokes_own_token(self):
r = self.post(
'/auth/tokens',
body=self.build_authentication_request(
user_id=self.userNormalA['id'],
password=self.userNormalA['password'],
user_domain_id=self.domainA['id']))
user_token = r.headers.get('X-Subject-Token')
self.assertNotEmpty(user_token)
headers = {'X-Subject-Token': user_token}
r = self.post(
'/auth/tokens',
body=self.build_authentication_request(
user_id=self.userAdminA['id'],
password=self.userAdminA['password'],
domain_name=self.domainA['name']))
adminA_token = r.headers.get('X-Subject-Token')
self.head('/auth/tokens', headers=headers, expected_status=204,
token=adminA_token)
self.head('/auth/tokens', headers=headers, expected_status=204,
token=user_token)
self.delete('/auth/tokens', headers=headers, expected_status=204,
token=user_token)
# invalid X-Auth-Token and invalid X-Subject-Token (401)
self.head('/auth/tokens', headers=headers, expected_status=401,
token=user_token)
# invalid X-Auth-Token and invalid X-Subject-Token (401)
self.delete('/auth/tokens', headers=headers, expected_status=401,
token=user_token)
# valid X-Auth-Token and invalid X-Subject-Token (404)
self.delete('/auth/tokens', headers=headers, expected_status=404,
token=adminA_token)
# valid X-Auth-Token and invalid X-Subject-Token (404)
self.head('/auth/tokens', headers=headers, expected_status=404,
token=adminA_token)
def test_admin_revokes_user_token(self):
r = self.post(
'/auth/tokens',
body=self.build_authentication_request(
user_id=self.userNormalA['id'],
password=self.userNormalA['password'],
user_domain_id=self.domainA['id']))
user_token = r.headers.get('X-Subject-Token')
self.assertNotEmpty(user_token)
headers = {'X-Subject-Token': user_token}
r = self.post(
'/auth/tokens',
body=self.build_authentication_request(
user_id=self.userAdminA['id'],
password=self.userAdminA['password'],
domain_name=self.domainA['name']))
adminA_token = r.headers.get('X-Subject-Token')
self.head('/auth/tokens', headers=headers, expected_status=204,
token=adminA_token)
self.head('/auth/tokens', headers=headers, expected_status=204,
token=user_token)
self.delete('/auth/tokens', headers=headers, expected_status=204,
token=adminA_token)
# invalid X-Auth-Token and invalid X-Subject-Token (401)
self.head('/auth/tokens', headers=headers, expected_status=401,
token=user_token)
# valid X-Auth-Token and invalid X-Subject-Token (404)
self.delete('/auth/tokens', headers=headers, expected_status=404,
token=adminA_token)
# valid X-Auth-Token and invalid X-Subject-Token (404)
self.head('/auth/tokens', headers=headers, expected_status=404,
token=adminA_token)
class TestTokenRevoking(test_v3.RestfulTestCase):
"""Test token revocation on the v3 Identity API."""
@ -517,7 +628,7 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
self.delete(grant_url)
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=401)
expected_status=404)
def test_deleting_role_revokes_token(self):
"""Test deleting a role revokes token.
@ -632,16 +743,16 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
# Check the tokens that used role1 is invalid
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenA},
expected_status=401)
expected_status=404)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenB},
expected_status=401)
expected_status=404)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenD},
expected_status=401)
expected_status=404)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenE},
expected_status=401)
expected_status=404)
# ...but the one using role2 is still valid
self.head('/auth/tokens',
@ -701,7 +812,7 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
# user should no longer have access to the project
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=401)
expected_status=404)
resp = self.post(
'/auth/tokens',
body=self.build_authentication_request(
@ -731,7 +842,7 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
# user should no longer have access to the project
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=401)
expected_status=404)
resp = self.post(
'/auth/tokens',
body=self.build_authentication_request(
@ -792,10 +903,10 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
self.delete(grant_url)
self.head('/auth/tokens',
headers={'X-Subject-Token': token1},
expected_status=401)
expected_status=404)
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
expected_status=401)
expected_status=404)
# But user3's token should still be valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token3},
@ -872,7 +983,7 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
'user_id': self.user1['id']})
self.head('/auth/tokens',
headers={'X-Subject-Token': token1},
expected_status=401)
expected_status=404)
# But user2's token should still be valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
@ -883,7 +994,7 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
'user_id': self.user2['id']})
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
expected_status=401)
expected_status=404)
def test_removing_role_assignment_does_not_affect_other_users(self):
"""Revoking a role from one user should not affect other users."""
@ -918,7 +1029,7 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
# authorization for the first user should now fail
self.head('/auth/tokens',
headers={'X-Subject-Token': user1_token},
expected_status=401)
expected_status=404)
self.post(
'/auth/tokens',
body=self.build_authentication_request(

View File

@ -365,7 +365,7 @@ class AuthTokenTests(OAuthFlowTests):
headers = {'X-Subject-Token': self.keystone_token_id,
'X-Auth-Token': self.keystone_token_id}
self.get('/auth/tokens', headers=headers,
expected_status=401)
expected_status=404)
def test_deleting_consumer_also_deletes_tokens(self):
self.test_oauth_flow()
@ -386,7 +386,7 @@ class AuthTokenTests(OAuthFlowTests):
headers = {'X-Subject-Token': self.keystone_token_id,
'X-Auth-Token': self.keystone_token_id}
self.head('/auth/tokens', headers=headers,
expected_status=401)
expected_status=404)
def test_change_user_password_also_deletes_tokens(self):
self.test_oauth_flow()

View File

@ -190,8 +190,11 @@ class Manager(manager.Manager):
LOG.exception(_('Unexpected error or malformed token determining '
'token expiry: %s') % token)
# Token is expired, we have a malformed token, or something went wrong.
raise exception.Unauthorized(_('Failed to validate token'))
# FIXME(morganfainberg): This error message needs to be updated to
# reflect the token couldn't be found, but this change needs to wait
# until Icehouse due to string freeze in Havana. This should be:
# "Failed to find valid token" or something similar.
raise exception.TokenNotFound(_('Failed to validate token'))
def _token_belongs_to(self, token, belongs_to):
"""Check if the token belongs to the right tenant.
@ -292,7 +295,7 @@ class Provider(object):
:param token_id: identity of the token
:type token_id: string
:returns: token_data
:raises: keystone.exception.Unauthorized
:raises: keystone.exception.TokenNotFound
"""
raise exception.NotImplemented()
@ -304,7 +307,7 @@ class Provider(object):
:param token_id: identity of the token
:type token_id: string
:returns: token data
:raises: keystone.exception.Unauthorized
:raises: keystone.exception.TokenNotFound
"""
raise exception.NotImplemented()
@ -317,6 +320,6 @@ class Provider(object):
:param belongs_to: project_id token belongs to
:type belongs_to: string
:returns: token data
:raises: keystone.exception.Unauthorized
:raises: keystone.exception.TokenNotFound
"""
raise exception.NotImplemented()

View File

@ -459,11 +459,8 @@ class Provider(token.provider.Provider):
def _verify_token(self, token_id):
"""Verify the given token and return the token_ref."""
try:
token_ref = self.token_api.get_token(token_id)
return self._verify_token_ref(token_ref)
except exception.TokenNotFound:
raise exception.Unauthorized()
token_ref = self.token_api.get_token(token_id)
return self._verify_token_ref(token_ref)
def _verify_token_ref(self, token_ref):
"""Verify and return the given token_ref."""
@ -551,9 +548,9 @@ class Provider(token.provider.Provider):
token_data = self.v2_token_data_helper.format_token(
token_ref, roles_ref, catalog_ref)
return token_data
except (exception.ValidationError, exception.TokenNotFound) as e:
except exception.ValidationError as e:
LOG.exception(_('Failed to validate token'))
raise exception.Unauthorized(e)
raise exception.TokenNotFound(e)
def validate_v3_token(self, token_id):
try:
@ -561,7 +558,6 @@ class Provider(token.provider.Provider):
token_data = self._validate_v3_token_ref(token_ref)
return token_data
except (exception.ValidationError,
exception.TokenNotFound,
exception.UserNotFound):
LOG.exception(_('Failed to validate token'))