Add validate token for v3
There was no API to validate a token using v3. bp auth-token-use-client Change-Id: Idcd6cedde5c485b9df7a2e056d1673d3ce6cdf90
This commit is contained in:
@@ -12,12 +12,17 @@
|
|||||||
|
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
import testresources
|
||||||
|
|
||||||
from keystoneclient import access
|
from keystoneclient import access
|
||||||
|
from keystoneclient import exceptions
|
||||||
from keystoneclient.tests import client_fixtures
|
from keystoneclient.tests import client_fixtures
|
||||||
from keystoneclient.tests.v3 import utils
|
from keystoneclient.tests.v3 import utils
|
||||||
|
|
||||||
|
|
||||||
class TokenTests(utils.TestCase):
|
class TokenTests(utils.TestCase, testresources.ResourcedTestCase):
|
||||||
|
|
||||||
|
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
|
||||||
|
|
||||||
def test_revoke_token_with_token_id(self):
|
def test_revoke_token_with_token_id(self):
|
||||||
token_id = uuid.uuid4().hex
|
token_id = uuid.uuid4().hex
|
||||||
@@ -27,8 +32,8 @@ class TokenTests(utils.TestCase):
|
|||||||
|
|
||||||
def test_revoke_token_with_access_info_instance(self):
|
def test_revoke_token_with_access_info_instance(self):
|
||||||
token_id = uuid.uuid4().hex
|
token_id = uuid.uuid4().hex
|
||||||
examples = self.useFixture(client_fixtures.Examples())
|
token_ref = self.examples.TOKEN_RESPONSES[
|
||||||
token_ref = examples.TOKEN_RESPONSES[examples.v3_UUID_TOKEN_DEFAULT]
|
self.examples.v3_UUID_TOKEN_DEFAULT]
|
||||||
token = access.AccessInfoV3(token_id, token_ref['token'])
|
token = access.AccessInfoV3(token_id, token_ref['token'])
|
||||||
self.stub_url('DELETE', ['/auth/tokens'], status_code=204)
|
self.stub_url('DELETE', ['/auth/tokens'], status_code=204)
|
||||||
self.client.tokens.revoke_token(token)
|
self.client.tokens.revoke_token(token)
|
||||||
@@ -40,3 +45,66 @@ class TokenTests(utils.TestCase):
|
|||||||
json=sample_revoked_response)
|
json=sample_revoked_response)
|
||||||
resp = self.client.tokens.get_revoked()
|
resp = self.client.tokens.get_revoked()
|
||||||
self.assertEqual(sample_revoked_response, resp)
|
self.assertEqual(sample_revoked_response, resp)
|
||||||
|
|
||||||
|
def test_validate_token_with_token_id(self):
|
||||||
|
# Can validate a token passing a string token ID.
|
||||||
|
token_id = uuid.uuid4().hex
|
||||||
|
token_ref = self.examples.TOKEN_RESPONSES[
|
||||||
|
self.examples.v3_UUID_TOKEN_DEFAULT]
|
||||||
|
self.stub_url('GET', ['auth', 'tokens'],
|
||||||
|
headers={'X-Subject-Token': token_id, }, json=token_ref)
|
||||||
|
access_info = self.client.tokens.validate(token_id)
|
||||||
|
|
||||||
|
self.assertRequestHeaderEqual('X-Subject-Token', token_id)
|
||||||
|
self.assertIsInstance(access_info, access.AccessInfoV3)
|
||||||
|
self.assertEqual(token_id, access_info.auth_token)
|
||||||
|
|
||||||
|
def test_validate_token_with_access_info(self):
|
||||||
|
# Can validate a token passing an access info.
|
||||||
|
token_id = uuid.uuid4().hex
|
||||||
|
token_ref = self.examples.TOKEN_RESPONSES[
|
||||||
|
self.examples.v3_UUID_TOKEN_DEFAULT]
|
||||||
|
token = access.AccessInfoV3(token_id, token_ref['token'])
|
||||||
|
self.stub_url('GET', ['auth', 'tokens'],
|
||||||
|
headers={'X-Subject-Token': token_id, }, json=token_ref)
|
||||||
|
access_info = self.client.tokens.validate(token)
|
||||||
|
|
||||||
|
self.assertRequestHeaderEqual('X-Subject-Token', token_id)
|
||||||
|
self.assertIsInstance(access_info, access.AccessInfoV3)
|
||||||
|
self.assertEqual(token_id, access_info.auth_token)
|
||||||
|
|
||||||
|
def test_validate_token_invalid(self):
|
||||||
|
# When the token is invalid the server typically returns a 404.
|
||||||
|
token_id = uuid.uuid4().hex
|
||||||
|
self.stub_url('GET', ['auth', 'tokens'], status_code=404)
|
||||||
|
self.assertRaises(exceptions.NotFound,
|
||||||
|
self.client.tokens.validate, token_id)
|
||||||
|
|
||||||
|
def test_validate_token_catalog(self):
|
||||||
|
# Can validate a token and a catalog is requested by default.
|
||||||
|
token_id = uuid.uuid4().hex
|
||||||
|
token_ref = self.examples.TOKEN_RESPONSES[
|
||||||
|
self.examples.v3_UUID_TOKEN_DEFAULT]
|
||||||
|
self.stub_url('GET', ['auth', 'tokens'],
|
||||||
|
headers={'X-Subject-Token': token_id, }, json=token_ref)
|
||||||
|
access_info = self.client.tokens.validate(token_id)
|
||||||
|
|
||||||
|
self.assertQueryStringIs()
|
||||||
|
self.assertTrue(access_info.has_service_catalog())
|
||||||
|
|
||||||
|
def test_validate_token_nocatalog(self):
|
||||||
|
# Can validate a token and request no catalog.
|
||||||
|
token_id = uuid.uuid4().hex
|
||||||
|
token_ref = self.examples.TOKEN_RESPONSES[
|
||||||
|
self.examples.v3_UUID_TOKEN_UNSCOPED]
|
||||||
|
self.stub_url('GET', ['auth', 'tokens'],
|
||||||
|
headers={'X-Subject-Token': token_id, }, json=token_ref)
|
||||||
|
access_info = self.client.tokens.validate(token_id,
|
||||||
|
include_catalog=False)
|
||||||
|
|
||||||
|
self.assertQueryStringIs('nocatalog')
|
||||||
|
self.assertFalse(access_info.has_service_catalog())
|
||||||
|
|
||||||
|
|
||||||
|
def load_tests(loader, tests, pattern):
|
||||||
|
return testresources.OptimisingTestSuite(tests)
|
||||||
|
@@ -12,6 +12,14 @@
|
|||||||
|
|
||||||
from keystoneclient import access
|
from keystoneclient import access
|
||||||
from keystoneclient import base
|
from keystoneclient import base
|
||||||
|
from keystoneclient import utils
|
||||||
|
|
||||||
|
|
||||||
|
def _calc_id(token):
|
||||||
|
if isinstance(token, access.AccessInfo):
|
||||||
|
return token.auth_token
|
||||||
|
|
||||||
|
return base.getid(token)
|
||||||
|
|
||||||
|
|
||||||
class TokenManager(object):
|
class TokenManager(object):
|
||||||
@@ -28,10 +36,7 @@ class TokenManager(object):
|
|||||||
token_id.
|
token_id.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if isinstance(token, access.AccessInfo):
|
token_id = _calc_id(token)
|
||||||
token_id = token.auth_token
|
|
||||||
else:
|
|
||||||
token_id = base.getid(token)
|
|
||||||
headers = {'X-Subject-Token': token_id}
|
headers = {'X-Subject-Token': token_id}
|
||||||
return self._client.delete('/auth/tokens', headers=headers)
|
return self._client.delete('/auth/tokens', headers=headers)
|
||||||
|
|
||||||
@@ -45,3 +50,29 @@ class TokenManager(object):
|
|||||||
|
|
||||||
resp, body = self._client.get('/auth/tokens/OS-PKI/revoked')
|
resp, body = self._client.get('/auth/tokens/OS-PKI/revoked')
|
||||||
return body
|
return body
|
||||||
|
|
||||||
|
@utils.positional.method(1)
|
||||||
|
def validate(self, token, include_catalog=True):
|
||||||
|
"""Validate a token.
|
||||||
|
|
||||||
|
:param token: Token to be validated. This can be an instance of
|
||||||
|
:py:class:`keystoneclient.access.AccessInfo` or a string
|
||||||
|
token_id.
|
||||||
|
:param include_catalog: If False, the response is requested to not
|
||||||
|
include the catalog.
|
||||||
|
|
||||||
|
:rtype: :py:class:`keystoneclient.access.AccessInfoV3`
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
token_id = _calc_id(token)
|
||||||
|
headers = {'X-Subject-Token': token_id}
|
||||||
|
|
||||||
|
url = '/auth/tokens'
|
||||||
|
if not include_catalog:
|
||||||
|
url += '?nocatalog'
|
||||||
|
|
||||||
|
resp, body = self._client.get(url, headers=headers)
|
||||||
|
|
||||||
|
access_info = access.AccessInfo.factory(resp=resp, body=body)
|
||||||
|
return access_info
|
||||||
|
Reference in New Issue
Block a user