Remove v2.0 token APIs
This commit removes all the v2.0 token APIs with the exception of the v2.0 authenticate for token API. POST /v2.0/tokens affects so much stuff that we can do it in a separate patch and hopefully make it easier for reviewers. bp removed-as-of-queens Change-Id: I508e7350c2a2d25c8fb413ea3523633f8939d80f
This commit is contained in:
parent
75f24c628b
commit
139aa015d2
@ -5,30 +5,6 @@ Tokens
|
||||
======
|
||||
|
||||
|
||||
List endoints for token
|
||||
=======================
|
||||
|
||||
.. rest_method:: GET /v2.0/tokens/{tokenId}/endpoints
|
||||
|
||||
Lists the endpoints associated with a token.
|
||||
|
||||
Normal response codes: 200,203
|
||||
Error response codes: 413,405,404,403,401,400,503
|
||||
|
||||
Request
|
||||
-------
|
||||
|
||||
.. rest_parameters:: parameters.yaml
|
||||
|
||||
- tokenId: token_id_path
|
||||
|
||||
Response Example
|
||||
----------------
|
||||
|
||||
.. literalinclude:: samples/admin/token-endpoints-list-response.json
|
||||
:language: javascript
|
||||
|
||||
|
||||
Authenticate for admin API
|
||||
==========================
|
||||
|
||||
@ -98,76 +74,3 @@ Response Example
|
||||
|
||||
.. literalinclude:: ../v2/samples/admin/authenticate-response.json
|
||||
:language: javascript
|
||||
|
||||
|
||||
Validate token
|
||||
==============
|
||||
|
||||
.. rest_method:: GET /v2.0/tokens/{tokenId}
|
||||
|
||||
Validates a token and confirms that it belongs to a tenant.
|
||||
|
||||
Returns the permissions relevant to a particular client. Valid
|
||||
tokens are in the ``/tokens/{tokenId}`` path. If the token is not
|
||||
valid, this call returns the ``itemNotFound (404)`` response code.
|
||||
This method supports an optional parameter ``belongsTo`` to check
|
||||
the token scope against the ID of a project. If the token does
|
||||
not belong to the project specified in the parameter a
|
||||
``unauthorized (401)`` response code will be returned.
|
||||
|
||||
Normal response codes: 200,203
|
||||
Error response codes: 413,405,404,403,401,400,503
|
||||
|
||||
Request
|
||||
-------
|
||||
|
||||
.. rest_parameters:: parameters.yaml
|
||||
|
||||
- tokenId: token_id_path
|
||||
- belongsTo: belongsTo
|
||||
|
||||
Response Example
|
||||
----------------
|
||||
|
||||
.. literalinclude:: samples/admin/token-validate-response.json
|
||||
:language: javascript
|
||||
|
||||
|
||||
Validate token (admin)
|
||||
======================
|
||||
|
||||
.. rest_method:: HEAD /v2.0/tokens/{tokenId}
|
||||
|
||||
Validates a token and confirms that it belongs to a tenant, for performance.
|
||||
This method supports an optional parameter ``belongsTo`` to check
|
||||
the token scope against the ID of a project. If the token does
|
||||
not belong to the project specified in the parameter a
|
||||
``unauthorized (401)`` response code will be returned.
|
||||
|
||||
Normal response codes: 200,203,204
|
||||
Error response codes: 413,405,404,403,401,400,503
|
||||
|
||||
Request
|
||||
-------
|
||||
|
||||
.. rest_parameters:: parameters.yaml
|
||||
|
||||
- tokenId: token_id_path
|
||||
- belongsTo: belongsTo
|
||||
|
||||
Delete token
|
||||
============
|
||||
|
||||
.. rest_method:: DELETE /v2.0/tokens/{tokenId}
|
||||
|
||||
Deletes a token.
|
||||
|
||||
Normal response codes: 204
|
||||
Error response codes: 413,405,404,403,401,400,503
|
||||
|
||||
Request
|
||||
-------
|
||||
|
||||
.. rest_parameters:: parameters.yaml
|
||||
|
||||
- tokenId: token_id_path
|
||||
|
@ -405,63 +405,6 @@ class AuthWithToken(object):
|
||||
self.assertIn(self.role_member['id'], roles)
|
||||
self.assertIn(self.role_admin['id'], roles)
|
||||
|
||||
def test_belongs_to_no_tenant(self):
|
||||
r = self.controller.authenticate(
|
||||
self.make_request(),
|
||||
auth={
|
||||
'passwordCredentials': {
|
||||
'username': self.user_foo['name'],
|
||||
'password': self.user_foo['password']
|
||||
}
|
||||
})
|
||||
unscoped_token_id = r['access']['token']['id']
|
||||
query_string = 'belongsTo=%s' % self.tenant_bar['id']
|
||||
self.assertRaises(
|
||||
exception.Unauthorized,
|
||||
self.controller.validate_token,
|
||||
self.make_request(is_admin=True, query_string=query_string),
|
||||
token_id=unscoped_token_id)
|
||||
|
||||
self.assertRaises(
|
||||
exception.Unauthorized,
|
||||
self.controller.validate_token_head,
|
||||
self.make_request(is_admin=True, query_string=query_string),
|
||||
token_id=unscoped_token_id)
|
||||
|
||||
def test_belongs_to(self):
|
||||
body_dict = _build_user_auth(
|
||||
username='foo',
|
||||
password='foo2',
|
||||
tenant_name=self.tenant_bar['name'])
|
||||
|
||||
scoped_token = self.controller.authenticate(self.make_request(),
|
||||
body_dict)
|
||||
scoped_token_id = scoped_token['access']['token']['id']
|
||||
|
||||
query_string = 'belongsTo=%s' % uuid.uuid4().hex
|
||||
self.assertRaises(
|
||||
exception.Unauthorized,
|
||||
self.controller.validate_token,
|
||||
self.make_request(is_admin=True, query_string=query_string),
|
||||
token_id=scoped_token_id)
|
||||
|
||||
self.assertRaises(
|
||||
exception.Unauthorized,
|
||||
self.controller.validate_token_head,
|
||||
self.make_request(is_admin=True, query_string=query_string),
|
||||
token_id=scoped_token_id)
|
||||
|
||||
query_string = 'belongsTo=%s' % self.tenant_bar['id']
|
||||
self.controller.validate_token(
|
||||
self.make_request(is_admin=True, query_string=query_string),
|
||||
token_id=scoped_token_id
|
||||
)
|
||||
|
||||
self.controller.validate_token_head(
|
||||
self.make_request(is_admin=True, query_string=query_string),
|
||||
token_id=scoped_token_id
|
||||
)
|
||||
|
||||
def test_token_auth_with_binding(self):
|
||||
self.config_fixture.config(group='token', bind=['kerberos'])
|
||||
body_dict = _build_user_auth()
|
||||
@ -490,35 +433,6 @@ class AuthWithToken(object):
|
||||
bind = scoped_token['access']['token']['bind']
|
||||
self.assertEqual('foo', bind['kerberos'])
|
||||
|
||||
def test_deleting_role_assignment_does_not_revoke_unscoped_token(self):
|
||||
admin_request = self.make_request(is_admin=True)
|
||||
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id)
|
||||
self.resource_api.create_project(project['id'], project)
|
||||
role = unit.new_role_ref()
|
||||
self.role_api.create_role(role['id'], role)
|
||||
self.assignment_api.add_role_to_user_and_project(
|
||||
self.user_foo['id'], project['id'], role['id'])
|
||||
|
||||
# Get an unscoped token.
|
||||
token = self.controller.authenticate(
|
||||
self.make_request(),
|
||||
_build_user_auth(username=self.user_foo['name'],
|
||||
password=self.user_foo['password']))
|
||||
token_id = token['access']['token']['id']
|
||||
|
||||
# Ensure it is valid
|
||||
self.controller.validate_token(admin_request, token_id=token_id)
|
||||
|
||||
# Delete the role assignment, which should not invalidate the token,
|
||||
# because we're not consuming it with just an unscoped token.
|
||||
self.assignment_api.remove_role_from_user_and_project(
|
||||
self.user_foo['id'], project['id'], role['id'])
|
||||
|
||||
# Ensure it is still valid
|
||||
self.controller.validate_token(admin_request, token_id=token_id)
|
||||
|
||||
def test_only_original_audit_id_is_kept(self):
|
||||
def get_audit_ids(token):
|
||||
return token['access']['token']['audit_ids']
|
||||
@ -1089,13 +1003,6 @@ class AuthWithTrust(object):
|
||||
self.make_request(), v3_req_with_trust)
|
||||
return token_auth_response
|
||||
|
||||
def test_validate_v3_trust_scoped_token_against_v2_succeeds(self):
|
||||
new_trust = self.create_trust(self.sample_data, self.trustor['name'])
|
||||
auth_response = self.fetch_v3_token_from_trust(new_trust, self.trustee)
|
||||
trust_token = auth_response.headers['X-Subject-Token']
|
||||
self.controller.validate_token(self.make_request(is_admin=True),
|
||||
trust_token)
|
||||
|
||||
def test_create_v3_token_from_trust(self):
|
||||
new_trust = self.create_trust(self.sample_data, self.trustor['name'])
|
||||
auth_response = self.fetch_v3_token_from_trust(new_trust, self.trustee)
|
||||
@ -1161,27 +1068,6 @@ class AuthWithTrust(object):
|
||||
exception.Forbidden,
|
||||
self.controller.authenticate, self.make_request(), request_body)
|
||||
|
||||
def test_delete_trust_revokes_token(self):
|
||||
unscoped_token = self.get_unscoped_token(self.trustor['name'])
|
||||
new_trust = self.create_trust(self.sample_data, self.trustor['name'])
|
||||
request = self._create_auth_request(
|
||||
unscoped_token['access']['token']['id'])
|
||||
trust_token_resp = self.fetch_v2_token_from_trust(new_trust)
|
||||
trust_scoped_token_id = trust_token_resp['access']['token']['id']
|
||||
self.controller.validate_token(
|
||||
self.make_request(is_admin=True),
|
||||
token_id=trust_scoped_token_id)
|
||||
trust_id = new_trust['id']
|
||||
|
||||
self.time_fixture.advance_time_seconds(1)
|
||||
|
||||
self.trust_controller.delete_trust(request, trust_id=trust_id)
|
||||
self.assertRaises(
|
||||
exception.TokenNotFound,
|
||||
self.controller.validate_token,
|
||||
self.make_request(is_admin=True),
|
||||
token_id=trust_scoped_token_id)
|
||||
|
||||
def test_token_from_trust_with_no_role_fails(self):
|
||||
new_trust = self.create_trust(self.sample_data, self.trustor['name'])
|
||||
for assigned_role in self.assigned_roles:
|
||||
@ -1270,13 +1156,6 @@ class AuthWithTrust(object):
|
||||
exception.Unauthorized,
|
||||
self.controller.authenticate, self.make_request(), request_body)
|
||||
|
||||
def test_validate_trust_scoped_token_against_v2(self):
|
||||
new_trust = self.create_trust(self.sample_data, self.trustor['name'])
|
||||
trust_token_resp = self.fetch_v2_token_from_trust(new_trust)
|
||||
trust_scoped_token_id = trust_token_resp['access']['token']['id']
|
||||
self.controller.validate_token(self.make_request(is_admin=True),
|
||||
token_id=trust_scoped_token_id)
|
||||
|
||||
def test_trust_get_token_fails_with_future_token_if_trustee_disabled(self):
|
||||
"""Test disabling trustee and using an unrevoked token.
|
||||
|
||||
@ -1341,14 +1220,6 @@ class FernetAuthWithTrust(AuthWithTrust, AuthTest):
|
||||
msg = 'The Fernet token provider does not support token persistence'
|
||||
self.skipTest(msg)
|
||||
|
||||
def test_delete_trust_revokes_token(self):
|
||||
# NOTE(amakarov): have to override this for Fernet as TokenNotFound
|
||||
# can't be raised for non-persistent token, but deleted trust will
|
||||
# cause TrustNotFound exception.
|
||||
self.assertRaises(
|
||||
exception.TrustNotFound,
|
||||
super(FernetAuthWithTrust, self).test_delete_trust_revokes_token)
|
||||
|
||||
def test_trust_get_token_fails_with_future_token_if_trustee_disabled(self):
|
||||
"""Test disabling trustee and using an unrevoked token.
|
||||
|
||||
@ -1426,10 +1297,6 @@ class TokenExpirationTest(AuthTest):
|
||||
timeutils.parse_isotime(r['access']['token']['expires'])
|
||||
)
|
||||
|
||||
def test_maintain_uuid_token_expiration(self):
|
||||
self.config_fixture.config(group='token', provider='uuid')
|
||||
self._maintain_token_expiration()
|
||||
|
||||
|
||||
class AuthCatalog(unit.SQLDriverOverrides, AuthTest):
|
||||
"""Test for the catalog provided in the auth response."""
|
||||
@ -1502,38 +1369,6 @@ class AuthCatalog(unit.SQLDriverOverrides, AuthTest):
|
||||
|
||||
self.assertEqual(exp_endpoint, endpoint)
|
||||
|
||||
def test_validate_catalog_disabled_endpoint(self):
|
||||
"""On validate, get back a catalog that excludes disabled endpoints."""
|
||||
endpoint_ref = self._create_endpoints()
|
||||
|
||||
# Authenticate
|
||||
body_dict = _build_user_auth(
|
||||
username='foo',
|
||||
password='foo2',
|
||||
tenant_name="BAR")
|
||||
|
||||
token = self.controller.authenticate(self.make_request(), body_dict)
|
||||
|
||||
# Validate
|
||||
token_id = token['access']['token']['id']
|
||||
validate_ref = self.controller.validate_token(
|
||||
self.make_request(is_admin=True),
|
||||
token_id=token_id)
|
||||
|
||||
# Check the catalog
|
||||
self.assertEqual(1, len(token['access']['serviceCatalog']))
|
||||
endpoint = validate_ref['access']['serviceCatalog'][0]['endpoints'][0]
|
||||
self.assertEqual(
|
||||
1, len(token['access']['serviceCatalog'][0]['endpoints']))
|
||||
|
||||
exp_endpoint = {
|
||||
'id': endpoint_ref['id'],
|
||||
'publicURL': endpoint_ref['url'],
|
||||
'region': endpoint_ref['region_id'],
|
||||
}
|
||||
|
||||
self.assertEqual(exp_endpoint, endpoint)
|
||||
|
||||
|
||||
class NonDefaultAuthTest(unit.TestCase):
|
||||
|
||||
|
@ -20,7 +20,6 @@ from six.moves import http_client
|
||||
from keystone.common import extension as keystone_extension
|
||||
import keystone.conf
|
||||
from keystone.tests import unit
|
||||
from keystone.tests.unit import default_fixtures
|
||||
from keystone.tests.unit import ksfixtures
|
||||
from keystone.tests.unit import rest
|
||||
from keystone.tests.unit.schema import v2
|
||||
@ -148,98 +147,6 @@ class CoreApiTests(object):
|
||||
expected_status=http_client.OK)
|
||||
self.assertValidAuthenticationResponse(r)
|
||||
|
||||
def test_validate_token(self):
|
||||
token = self.get_scoped_token()
|
||||
r = self.admin_request(
|
||||
path='/v2.0/tokens/%(token_id)s' % {
|
||||
'token_id': token,
|
||||
},
|
||||
token=token)
|
||||
self.assertValidAuthenticationResponse(r)
|
||||
|
||||
def test_invalid_token_returns_not_found(self):
|
||||
token = self.get_scoped_token()
|
||||
self.admin_request(
|
||||
path='/v2.0/tokens/%(token_id)s' % {
|
||||
'token_id': 'invalid',
|
||||
},
|
||||
token=token,
|
||||
expected_status=http_client.NOT_FOUND)
|
||||
|
||||
def test_validate_token_service_role(self):
|
||||
self.md_foobar = self.assignment_api.add_role_to_user_and_project(
|
||||
self.user_foo['id'],
|
||||
self.tenant_service['id'],
|
||||
self.role_service['id'])
|
||||
|
||||
token = self.get_scoped_token(
|
||||
tenant_id=default_fixtures.SERVICE_TENANT_ID)
|
||||
r = self.admin_request(
|
||||
path='/v2.0/tokens/%s' % token,
|
||||
token=token)
|
||||
self.assertValidAuthenticationResponse(r)
|
||||
|
||||
def test_remove_role_revokes_token(self):
|
||||
self.md_foobar = self.assignment_api.add_role_to_user_and_project(
|
||||
self.user_foo['id'],
|
||||
self.tenant_service['id'],
|
||||
self.role_service['id'])
|
||||
|
||||
token = self.get_scoped_token(
|
||||
tenant_id=default_fixtures.SERVICE_TENANT_ID)
|
||||
r = self.admin_request(
|
||||
path='/v2.0/tokens/%s' % token,
|
||||
token=token)
|
||||
self.assertValidAuthenticationResponse(r)
|
||||
|
||||
self.assignment_api.remove_role_from_user_and_project(
|
||||
self.user_foo['id'],
|
||||
self.tenant_service['id'],
|
||||
self.role_service['id'])
|
||||
|
||||
r = self.admin_request(
|
||||
path='/v2.0/tokens/%s' % token,
|
||||
token=token,
|
||||
expected_status=http_client.UNAUTHORIZED)
|
||||
|
||||
def test_validate_token_belongs_to(self):
|
||||
token = self.get_scoped_token()
|
||||
path = ('/v2.0/tokens/%s?belongsTo=%s' % (token,
|
||||
self.tenant_bar['id']))
|
||||
r = self.admin_request(path=path, token=token)
|
||||
self.assertValidAuthenticationResponse(r, require_service_catalog=True)
|
||||
|
||||
def test_validate_token_no_belongs_to_still_returns_catalog(self):
|
||||
token = self.get_scoped_token()
|
||||
path = ('/v2.0/tokens/%s' % token)
|
||||
r = self.admin_request(path=path, token=token)
|
||||
self.assertValidAuthenticationResponse(r, require_service_catalog=True)
|
||||
|
||||
def test_validate_token_head(self):
|
||||
"""The same call as above, except using HEAD.
|
||||
|
||||
There's no response to validate here, but this is included for the
|
||||
sake of completely covering the core API.
|
||||
|
||||
"""
|
||||
token = self.get_scoped_token()
|
||||
self.admin_request(
|
||||
method='HEAD',
|
||||
path='/v2.0/tokens/%(token_id)s' % {
|
||||
'token_id': token,
|
||||
},
|
||||
token=token,
|
||||
expected_status=http_client.OK)
|
||||
|
||||
def test_endpoints(self):
|
||||
token = self.get_scoped_token()
|
||||
r = self.admin_request(
|
||||
path='/v2.0/tokens/%(token_id)s/endpoints' % {
|
||||
'token_id': token,
|
||||
},
|
||||
token=token)
|
||||
self.assertValidEndpointListResponse(r)
|
||||
|
||||
def test_error_response(self):
|
||||
"""Trigger assertValidErrorResponse by convention."""
|
||||
self.public_request(path='/v2.0/tenants',
|
||||
@ -524,21 +431,6 @@ class V2TestCase(object):
|
||||
def get_user_attribute_from_response(self, r, attribute_name):
|
||||
return r.result['user'][attribute_name]
|
||||
|
||||
def test_fetch_revocation_list_nonadmin_fails(self):
|
||||
self.admin_request(
|
||||
method='GET',
|
||||
path='/v2.0/tokens/revoked',
|
||||
expected_status=http_client.UNAUTHORIZED)
|
||||
|
||||
def test_fetch_revocation_list_admin_200(self):
|
||||
token = self.get_scoped_token()
|
||||
r = self.admin_request(
|
||||
method='GET',
|
||||
path='/v2.0/tokens/revoked',
|
||||
token=token,
|
||||
expected_status=http_client.OK)
|
||||
self.assertValidRevocationListResponse(r)
|
||||
|
||||
def assertValidRevocationListResponse(self, response):
|
||||
self.assertIsNotNone(response.result['signed'])
|
||||
|
||||
@ -563,12 +455,6 @@ class V2TestCaseFernet(V2TestCase, RestfulTestCase, CoreApiTests):
|
||||
)
|
||||
)
|
||||
|
||||
def test_fetch_revocation_list_md5(self):
|
||||
self.skipTest('Revocation lists do not support Fernet')
|
||||
|
||||
def test_fetch_revocation_list_sha256(self):
|
||||
self.skipTest('Revocation lists do not support Fernet')
|
||||
|
||||
|
||||
class TestFernetTokenProviderV2(RestfulTestCase):
|
||||
|
||||
@ -622,23 +508,6 @@ class TestFernetTokenProviderV2(RestfulTestCase):
|
||||
# Fernet token must be of length 255 per usability requirements
|
||||
self.assertLess(len(unscoped_token), 255)
|
||||
|
||||
def test_validate_unscoped_token(self):
|
||||
# Grab an admin token to validate with
|
||||
project_ref = self.new_project_ref()
|
||||
self.resource_api.create_project(project_ref['id'], project_ref)
|
||||
self.assignment_api.add_role_to_user_and_project(self.user_foo['id'],
|
||||
project_ref['id'],
|
||||
self.role_admin['id'])
|
||||
admin_token = self.get_scoped_token(tenant_id=project_ref['id'])
|
||||
unscoped_token = self.get_unscoped_token()
|
||||
path = ('/v2.0/tokens/%s' % unscoped_token)
|
||||
resp = self.admin_request(
|
||||
method='GET',
|
||||
path=path,
|
||||
token=admin_token,
|
||||
expected_status=http_client.OK)
|
||||
self.assertValidUnscopedTokenResponse(resp)
|
||||
|
||||
def test_authenticate_scoped_token(self):
|
||||
project_ref = self.new_project_ref()
|
||||
self.resource_api.create_project(project_ref['id'], project_ref)
|
||||
@ -648,28 +517,6 @@ class TestFernetTokenProviderV2(RestfulTestCase):
|
||||
# Fernet token must be of length 255 per usability requirements
|
||||
self.assertLess(len(token), 255)
|
||||
|
||||
def test_validate_scoped_token(self):
|
||||
project_ref = self.new_project_ref()
|
||||
self.resource_api.create_project(project_ref['id'], project_ref)
|
||||
self.assignment_api.add_role_to_user_and_project(self.user_foo['id'],
|
||||
project_ref['id'],
|
||||
self.role_admin['id'])
|
||||
project2_ref = self.new_project_ref()
|
||||
self.resource_api.create_project(project2_ref['id'], project2_ref)
|
||||
self.assignment_api.add_role_to_user_and_project(
|
||||
self.user_foo['id'], project2_ref['id'], self.role_member['id'])
|
||||
admin_token = self.get_scoped_token(tenant_id=project_ref['id'])
|
||||
member_token = self.get_scoped_token(tenant_id=project2_ref['id'])
|
||||
path = ('/v2.0/tokens/%s?belongsTo=%s' % (member_token,
|
||||
project2_ref['id']))
|
||||
# Validate token belongs to project
|
||||
resp = self.admin_request(
|
||||
method='GET',
|
||||
path=path,
|
||||
token=admin_token,
|
||||
expected_status=http_client.OK)
|
||||
self.assertValidScopedTokenResponse(resp)
|
||||
|
||||
def test_token_authentication_and_validation(self):
|
||||
"""Test token authentication for Fernet token provider.
|
||||
|
||||
@ -685,7 +532,7 @@ class TestFernetTokenProviderV2(RestfulTestCase):
|
||||
token_id = unscoped_token
|
||||
if six.PY2:
|
||||
token_id = token_id.encode('ascii')
|
||||
r = self.public_request(
|
||||
resp = self.public_request(
|
||||
method='POST',
|
||||
path='/v2.0/tokens',
|
||||
body={
|
||||
@ -697,15 +544,6 @@ class TestFernetTokenProviderV2(RestfulTestCase):
|
||||
}
|
||||
},
|
||||
expected_status=http_client.OK)
|
||||
|
||||
token_id = self._get_token_id(r)
|
||||
path = ('/v2.0/tokens/%s?belongsTo=%s' % (token_id, project_ref['id']))
|
||||
# Validate token belongs to project
|
||||
resp = self.admin_request(
|
||||
method='GET',
|
||||
path=path,
|
||||
token=self.get_admin_token(),
|
||||
expected_status=http_client.OK)
|
||||
self.assertValidScopedTokenResponse(resp)
|
||||
|
||||
def test_rescoped_tokens_maintain_original_expiration(self):
|
||||
|
@ -863,20 +863,6 @@ class TokenAPITests(object):
|
||||
expected_status=http_client.NOT_FOUND
|
||||
)
|
||||
|
||||
def test_v2_validate_domain_scoped_token_returns_unauthorized(self):
|
||||
# Test that validating a domain scoped token in v2.0 returns
|
||||
# unauthorized.
|
||||
# Grant user access to domain
|
||||
self.assignment_api.create_grant(self.role['id'],
|
||||
user_id=self.user['id'],
|
||||
domain_id=self.domain['id'])
|
||||
|
||||
scoped_token = self._get_domain_scoped_token()
|
||||
self._validate_token_v2(
|
||||
scoped_token,
|
||||
expected_status=http_client.UNAUTHORIZED
|
||||
)
|
||||
|
||||
def test_create_project_scoped_token_with_project_id_and_user_id(self):
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
@ -1456,158 +1442,9 @@ class TokenAPITests(object):
|
||||
expected_status=http_client.NOT_FOUND
|
||||
)
|
||||
|
||||
def test_validate_trust_token_on_v2_fails_outside_default_domain(self):
|
||||
# NOTE(lbragstad): This fails validation against the v2.0 API because
|
||||
# the actors of the trust are not within the default domain.
|
||||
trustee_user, trust = self._create_trust()
|
||||
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
|
||||
self._validate_token_v2(
|
||||
trust_scoped_token,
|
||||
expected_status=http_client.UNAUTHORIZED
|
||||
)
|
||||
|
||||
def test_default_fixture_scope_token(self):
|
||||
self.assertIsNotNone(self.get_scoped_token())
|
||||
|
||||
def test_v3_v2_intermix_new_default_domain(self):
|
||||
# If the default_domain_id config option is changed, then should be
|
||||
# able to validate a v3 token with user in the new domain.
|
||||
|
||||
# 1) Create a new domain for the user.
|
||||
new_domain = unit.new_domain_ref()
|
||||
self.resource_api.create_domain(new_domain['id'], new_domain)
|
||||
|
||||
# 2) Create user in new domain.
|
||||
new_user = unit.create_user(self.identity_api,
|
||||
domain_id=new_domain['id'])
|
||||
|
||||
# 3) Update the default_domain_id config option to the new domain
|
||||
self.config_fixture.config(
|
||||
group='identity',
|
||||
default_domain_id=new_domain['id'])
|
||||
|
||||
# 4) Get a token using v3 API.
|
||||
v3_token = self.get_requested_token(self.build_authentication_request(
|
||||
user_id=new_user['id'],
|
||||
password=new_user['password']))
|
||||
|
||||
# 5) Validate token using v2 API.
|
||||
self.admin_request(
|
||||
path='/v2.0/tokens/%s' % v3_token,
|
||||
token=self.get_admin_token(),
|
||||
method='GET')
|
||||
|
||||
def test_v3_v2_intermix_domain_scoped_token_failed(self):
|
||||
# grant the domain role to user
|
||||
self.put(
|
||||
path='/domains/%s/users/%s/roles/%s' % (
|
||||
self.domain['id'], self.user['id'], self.role['id']))
|
||||
|
||||
# generate a domain-scoped v3 token
|
||||
v3_token = self.get_requested_token(self.build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'],
|
||||
domain_id=self.domain['id']))
|
||||
|
||||
# domain-scoped tokens are not supported by v2
|
||||
self.admin_request(
|
||||
method='GET',
|
||||
path='/v2.0/tokens/%s' % v3_token,
|
||||
token=self.get_admin_token(),
|
||||
expected_status=http_client.UNAUTHORIZED)
|
||||
|
||||
def test_v3_v2_intermix_non_default_project_succeed(self):
|
||||
# self.project is in a non-default domain
|
||||
v3_token = self.get_requested_token(self.build_authentication_request(
|
||||
user_id=self.default_domain_user['id'],
|
||||
password=self.default_domain_user['password'],
|
||||
project_id=self.project['id']))
|
||||
|
||||
# v2 cannot reference projects outside the default domain
|
||||
self.admin_request(
|
||||
method='GET',
|
||||
path='/v2.0/tokens/%s' % v3_token,
|
||||
token=self.get_admin_token())
|
||||
|
||||
def test_v3_v2_intermix_non_default_user_succeed(self):
|
||||
self.assignment_api.create_grant(
|
||||
self.role['id'],
|
||||
user_id=self.user['id'],
|
||||
project_id=self.default_domain_project['id'])
|
||||
|
||||
# self.user is in a non-default domain
|
||||
v3_token = self.get_requested_token(self.build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'],
|
||||
project_id=self.default_domain_project['id']))
|
||||
|
||||
# v2 cannot reference projects outside the default domain
|
||||
self.admin_request(
|
||||
method='GET',
|
||||
path='/v2.0/tokens/%s' % v3_token,
|
||||
token=self.get_admin_token())
|
||||
|
||||
def test_v3_v2_intermix_domain_scope_failed(self):
|
||||
self.assignment_api.create_grant(
|
||||
self.role['id'],
|
||||
user_id=self.default_domain_user['id'],
|
||||
domain_id=self.domain['id'])
|
||||
|
||||
v3_token = self.get_requested_token(self.build_authentication_request(
|
||||
user_id=self.default_domain_user['id'],
|
||||
password=self.default_domain_user['password'],
|
||||
domain_id=self.domain['id']))
|
||||
|
||||
# v2 cannot reference projects outside the default domain
|
||||
self.admin_request(
|
||||
path='/v2.0/tokens/%s' % v3_token,
|
||||
token=self.get_admin_token(),
|
||||
method='GET',
|
||||
expected_status=http_client.UNAUTHORIZED)
|
||||
|
||||
def test_v3_v2_unscoped_token_intermix(self):
|
||||
r = self.v3_create_token(self.build_authentication_request(
|
||||
user_id=self.default_domain_user['id'],
|
||||
password=self.default_domain_user['password']))
|
||||
self.assertValidUnscopedTokenResponse(r)
|
||||
v3_token_data = r.result
|
||||
v3_token = r.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
r = self.admin_request(
|
||||
path='/v2.0/tokens/%s' % v3_token,
|
||||
token=self.get_admin_token(),
|
||||
method='GET')
|
||||
v2_token_data = r.result
|
||||
|
||||
self.assertEqual(v2_token_data['access']['user']['id'],
|
||||
v3_token_data['token']['user']['id'])
|
||||
self.assertTimestampEqual(v2_token_data['access']['token']['expires'],
|
||||
v3_token_data['token']['expires_at'])
|
||||
|
||||
def test_v3_v2_token_intermix(self):
|
||||
r = self.v3_create_token(self.build_authentication_request(
|
||||
user_id=self.default_domain_user['id'],
|
||||
password=self.default_domain_user['password'],
|
||||
project_id=self.default_domain_project['id']))
|
||||
self.assertValidProjectScopedTokenResponse(r)
|
||||
v3_token_data = r.result
|
||||
v3_token = r.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
r = self.admin_request(
|
||||
method='GET',
|
||||
path='/v2.0/tokens/%s' % v3_token,
|
||||
token=self.get_admin_token())
|
||||
v2_token_data = r.result
|
||||
|
||||
self.assertEqual(v2_token_data['access']['user']['id'],
|
||||
v3_token_data['token']['user']['id'])
|
||||
self.assertTimestampEqual(v2_token_data['access']['token']['expires'],
|
||||
v3_token_data['token']['expires_at'])
|
||||
self.assertEqual(v2_token_data['access']['user']['roles'][0]['name'],
|
||||
v3_token_data['token']['roles'][0]['name'])
|
||||
|
||||
def test_v2_v3_unscoped_token_intermix(self):
|
||||
r = self.admin_request(
|
||||
method='POST',
|
||||
|
@ -44,7 +44,6 @@ from keystone.tests.unit import ksfixtures
|
||||
from keystone.tests.unit import mapping_fixtures
|
||||
from keystone.tests.unit import test_v3
|
||||
from keystone.tests.unit import utils
|
||||
from keystone.token import controllers as token_controller
|
||||
from keystone.token.providers import common as token_common
|
||||
|
||||
|
||||
@ -2728,22 +2727,6 @@ class FederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin):
|
||||
self.assertRaises(exception.Unauthorized,
|
||||
self._issue_unscoped_token)
|
||||
|
||||
def test_v2_auth_with_federation_token_fails(self):
|
||||
"""Test that using a federation token with v2 auth fails.
|
||||
|
||||
If an admin sets up a federated Keystone environment, and a user
|
||||
incorrectly configures a service (like Nova) to only use v2 auth, the
|
||||
returned message should be informative.
|
||||
|
||||
"""
|
||||
r = self._issue_unscoped_token()
|
||||
token_id = r.headers.get('X-Subject-Token')
|
||||
v2_token_controller = token_controller.Auth()
|
||||
self.assertRaises(exception.Unauthorized,
|
||||
v2_token_controller.validate_token,
|
||||
self.make_request(is_admin=True),
|
||||
token_id)
|
||||
|
||||
def test_unscoped_token_has_user_domain(self):
|
||||
r = self._issue_unscoped_token()
|
||||
self._check_domains_are_valid(r.json_body['token'])
|
||||
|
@ -275,121 +275,6 @@ class TestTrustOperations(test_v3.RestfulTestCase):
|
||||
self.post('/OS-TRUST/trusts', body={'trust': ref},
|
||||
expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
def test_validate_trust_scoped_token_against_v2(self):
|
||||
# get a project-scoped token
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=self.default_domain_user['id'],
|
||||
password=self.default_domain_user['password'],
|
||||
project_id=self.default_domain_project_id)
|
||||
token = self.get_requested_token(auth_data)
|
||||
|
||||
user = unit.new_user_ref(CONF.identity.default_domain_id)
|
||||
trustee = self.identity_api.create_user(user)
|
||||
|
||||
# create a new trust
|
||||
ref = unit.new_trust_ref(
|
||||
trustor_user_id=self.default_domain_user['id'],
|
||||
trustee_user_id=trustee['id'],
|
||||
project_id=self.default_domain_project_id,
|
||||
impersonation=False,
|
||||
expires=dict(minutes=1),
|
||||
role_ids=[self.role_id])
|
||||
r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
|
||||
trust = self.assertValidTrustResponse(r)
|
||||
|
||||
# get a v3 trust-scoped token as the trustee
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=trustee['id'],
|
||||
password=user['password'],
|
||||
trust_id=trust['id'])
|
||||
r = self.v3_create_token(auth_data)
|
||||
self.assertValidProjectScopedTokenResponse(
|
||||
r, trustee)
|
||||
token = r.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
self.admin_request(
|
||||
path=path,
|
||||
token=self.get_admin_token(),
|
||||
method='GET'
|
||||
)
|
||||
|
||||
def test_v3_v2_intermix_trustee_not_in_default_domain_failed(self):
|
||||
# get a project-scoped token
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=self.default_domain_user['id'],
|
||||
password=self.default_domain_user['password'],
|
||||
project_id=self.default_domain_project_id)
|
||||
token = self.get_requested_token(auth_data)
|
||||
|
||||
# create a new trust
|
||||
ref = unit.new_trust_ref(
|
||||
trustor_user_id=self.default_domain_user_id,
|
||||
trustee_user_id=self.trustee_user_id,
|
||||
project_id=self.default_domain_project_id,
|
||||
impersonation=False,
|
||||
expires=dict(minutes=1),
|
||||
role_ids=[self.role_id])
|
||||
r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
|
||||
trust = self.assertValidTrustResponse(r)
|
||||
|
||||
# get a trust-scoped token as the trustee
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=self.trustee_user['id'],
|
||||
password=self.trustee_user['password'],
|
||||
trust_id=trust['id'])
|
||||
r = self.v3_create_token(auth_data)
|
||||
self.assertValidProjectScopedTokenResponse(
|
||||
r, self.trustee_user)
|
||||
token = r.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
self.admin_request(
|
||||
path=path, token=self.get_admin_token(),
|
||||
method='GET', expected_status=http_client.UNAUTHORIZED)
|
||||
|
||||
def test_v3_v2_intermix_project_not_in_default_domain_failed(self):
|
||||
# create a trustee in default domain to delegate stuff to
|
||||
trustee_user = unit.create_user(self.identity_api,
|
||||
domain_id=test_v3.DEFAULT_DOMAIN_ID)
|
||||
trustee_user_id = trustee_user['id']
|
||||
|
||||
# create a new trust
|
||||
ref = unit.new_trust_ref(
|
||||
trustor_user_id=self.default_domain_user_id,
|
||||
trustee_user_id=trustee_user_id,
|
||||
project_id=self.project_id,
|
||||
impersonation=False,
|
||||
expires=dict(minutes=1),
|
||||
role_ids=[self.role_id])
|
||||
|
||||
# get a project-scoped token as the default_domain_user
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=self.default_domain_user['id'],
|
||||
password=self.default_domain_user['password'],
|
||||
project_id=self.default_domain_project_id)
|
||||
token = self.get_requested_token(auth_data)
|
||||
|
||||
r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
|
||||
trust = self.assertValidTrustResponse(r)
|
||||
|
||||
# get a trust-scoped token as the trustee
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=trustee_user['id'],
|
||||
password=trustee_user['password'],
|
||||
trust_id=trust['id'])
|
||||
r = self.v3_create_token(auth_data)
|
||||
self.assertValidProjectScopedTokenResponse(r, trustee_user)
|
||||
token = r.headers.get('X-Subject-Token')
|
||||
|
||||
# ensure the token is invalid against v2
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
self.admin_request(
|
||||
path=path, token=self.get_admin_token(),
|
||||
method='GET', expected_status=http_client.UNAUTHORIZED)
|
||||
|
||||
def test_exercise_trust_scoped_token_without_impersonation(self):
|
||||
# create a new trust
|
||||
ref = unit.new_trust_ref(
|
||||
|
@ -12,11 +12,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import sys
|
||||
|
||||
from keystoneclient.common import cms
|
||||
from oslo_serialization import jsonutils
|
||||
import six
|
||||
|
||||
from keystone.common import controller
|
||||
@ -69,18 +66,6 @@ class ExternalAuthNotApplicable(Exception):
|
||||
'token_provider_api', 'trust_api')
|
||||
class Auth(controller.V2Controller):
|
||||
|
||||
@controller.v2_deprecated
|
||||
def ca_cert(self, request):
|
||||
with open(CONF.signing.ca_certs, 'r') as ca_file:
|
||||
data = ca_file.read()
|
||||
return data
|
||||
|
||||
@controller.v2_deprecated
|
||||
def signing_cert(self, request):
|
||||
with open(CONF.signing.certfile, 'r') as cert_file:
|
||||
data = cert_file.read()
|
||||
return data
|
||||
|
||||
@controller.v2_auth_deprecated
|
||||
def authenticate(self, request, auth=None):
|
||||
"""Authenticate credentials and return a token.
|
||||
@ -154,145 +139,6 @@ class Auth(controller.V2Controller):
|
||||
|
||||
return token_data
|
||||
|
||||
def _get_auth_token_data(self, user, tenant, metadata, expiry, audit_id):
|
||||
return dict(user=user,
|
||||
tenant=tenant,
|
||||
metadata=metadata,
|
||||
expires=expiry,
|
||||
parent_audit_id=audit_id)
|
||||
|
||||
def _token_belongs_to(self, token, belongs_to):
|
||||
"""Check if the token belongs to the right project.
|
||||
|
||||
:param token: token reference
|
||||
:param belongs_to: project ID that the token belongs to
|
||||
|
||||
"""
|
||||
token_data = token['access']['token']
|
||||
if ('tenant' not in token_data or
|
||||
token_data['tenant']['id'] != belongs_to):
|
||||
raise exception.Unauthorized()
|
||||
|
||||
@controller.v2_deprecated
|
||||
@controller.protected()
|
||||
def validate_token_head(self, request, token_id):
|
||||
"""Check that a token is valid.
|
||||
|
||||
Optionally, also ensure that it is owned by a specific tenant.
|
||||
|
||||
Identical to ``validate_token``, except does not return a response.
|
||||
|
||||
The code in ``keystone.common.wsgi.render_response`` will remove
|
||||
the content body.
|
||||
|
||||
"""
|
||||
v3_token_response = self.token_provider_api.validate_token(token_id)
|
||||
v2_helper = V2TokenDataHelper()
|
||||
token = v2_helper.v3_to_v2_token(v3_token_response, token_id)
|
||||
belongs_to = request.params.get('belongsTo')
|
||||
if belongs_to:
|
||||
self._token_belongs_to(token, belongs_to)
|
||||
return token
|
||||
|
||||
@controller.v2_deprecated
|
||||
@controller.protected()
|
||||
def validate_token(self, request, token_id):
|
||||
"""Check that a token is valid.
|
||||
|
||||
Optionally, also ensure that it is owned by a specific tenant.
|
||||
|
||||
Returns metadata about the token along any associated roles.
|
||||
|
||||
"""
|
||||
# TODO(ayoung) validate against revocation API
|
||||
v3_token_response = self.token_provider_api.validate_token(token_id)
|
||||
v2_helper = V2TokenDataHelper()
|
||||
token = v2_helper.v3_to_v2_token(v3_token_response, token_id)
|
||||
belongs_to = request.params.get('belongsTo')
|
||||
if belongs_to:
|
||||
self._token_belongs_to(token, belongs_to)
|
||||
return token
|
||||
|
||||
@controller.v2_deprecated
|
||||
def delete_token(self, request, token_id):
|
||||
"""Delete a token, effectively invalidating it for authz."""
|
||||
# TODO(termie): this stuff should probably be moved to middleware
|
||||
self.assert_admin(request)
|
||||
self.token_provider_api.revoke_token(token_id)
|
||||
|
||||
@controller.v2_deprecated
|
||||
@controller.protected()
|
||||
def revocation_list(self, request):
|
||||
if not CONF.token.revoke_by_id:
|
||||
raise exception.Gone()
|
||||
tokens = self.token_provider_api.list_revoked_tokens()
|
||||
|
||||
for t in tokens:
|
||||
expires = t['expires']
|
||||
if expires and isinstance(expires, datetime.datetime):
|
||||
t['expires'] = utils.isotime(expires)
|
||||
data = {'revoked': tokens}
|
||||
json_data = jsonutils.dumps(data)
|
||||
signed_text = cms.cms_sign_text(json_data,
|
||||
CONF.signing.certfile,
|
||||
CONF.signing.keyfile)
|
||||
|
||||
return {'signed': signed_text}
|
||||
|
||||
@controller.v2_deprecated
|
||||
def endpoints(self, request, token_id):
|
||||
"""Return a list of endpoints available to the token."""
|
||||
self.assert_admin(request)
|
||||
|
||||
token_data = self.token_provider_api.validate_token(token_id)
|
||||
token_ref = token_model.KeystoneToken(token_id, token_data)
|
||||
|
||||
catalog_ref = None
|
||||
if token_ref.project_id:
|
||||
catalog_ref = self.catalog_api.get_catalog(
|
||||
token_ref.user_id,
|
||||
token_ref.project_id)
|
||||
|
||||
return Auth.format_endpoint_list(catalog_ref)
|
||||
|
||||
@classmethod
|
||||
def format_endpoint_list(cls, catalog_ref):
|
||||
"""Format a list of endpoints according to Identity API v2.
|
||||
|
||||
The v2.0 API wants an endpoint list to look like::
|
||||
|
||||
{
|
||||
'endpoints': [
|
||||
{
|
||||
'id': $endpoint_id,
|
||||
'name': $SERVICE[name],
|
||||
'type': $SERVICE,
|
||||
'tenantId': $tenant_id,
|
||||
'region': $REGION,
|
||||
}
|
||||
],
|
||||
'endpoints_links': [],
|
||||
}
|
||||
|
||||
"""
|
||||
if not catalog_ref:
|
||||
return {}
|
||||
|
||||
endpoints = []
|
||||
for region_name, region_ref in catalog_ref.items():
|
||||
for service_type, service_ref in region_ref.items():
|
||||
endpoints.append({
|
||||
'id': service_ref.get('id'),
|
||||
'name': service_ref.get('name'),
|
||||
'type': service_type,
|
||||
'region': region_name,
|
||||
'publicURL': service_ref.get('publicURL'),
|
||||
'internalURL': service_ref.get('internalURL'),
|
||||
'adminURL': service_ref.get('adminURL'),
|
||||
})
|
||||
|
||||
return {'endpoints': endpoints, 'endpoints_links': []}
|
||||
|
||||
|
||||
@dependency.requires('resource_api', 'identity_api')
|
||||
class BaseAuthenticationMethod(object):
|
||||
|
@ -22,38 +22,3 @@ class Router(wsgi.ComposableRouter):
|
||||
controller=token_controller,
|
||||
action='authenticate',
|
||||
conditions=dict(method=['POST']))
|
||||
mapper.connect('/tokens/revoked',
|
||||
controller=token_controller,
|
||||
action='revocation_list',
|
||||
conditions=dict(method=['GET']))
|
||||
mapper.connect('/tokens/{token_id}',
|
||||
controller=token_controller,
|
||||
action='validate_token',
|
||||
conditions=dict(method=['GET']))
|
||||
# NOTE(morganfainberg): For policy enforcement reasons, the
|
||||
# ``validate_token_head`` method is still used for HEAD requests.
|
||||
# The controller method makes the same call as the validate_token
|
||||
# call and lets wsgi.render_response remove the body data.
|
||||
mapper.connect('/tokens/{token_id}',
|
||||
controller=token_controller,
|
||||
action='validate_token_head',
|
||||
conditions=dict(method=['HEAD']))
|
||||
mapper.connect('/tokens/{token_id}',
|
||||
controller=token_controller,
|
||||
action='delete_token',
|
||||
conditions=dict(method=['DELETE']))
|
||||
mapper.connect('/tokens/{token_id}/endpoints',
|
||||
controller=token_controller,
|
||||
action='endpoints',
|
||||
conditions=dict(method=['GET']))
|
||||
|
||||
# Certificates used to verify auth tokens
|
||||
mapper.connect('/certificates/ca',
|
||||
controller=token_controller,
|
||||
action='ca_cert',
|
||||
conditions=dict(method=['GET']))
|
||||
|
||||
mapper.connect('/certificates/signing',
|
||||
controller=token_controller,
|
||||
action='signing_cert',
|
||||
conditions=dict(method=['GET']))
|
||||
|
Loading…
x
Reference in New Issue
Block a user