Remove validate_v2_token() method

Instead of using validate_v2_token, we can effectively use the
validate_v3_token method and translate the v3 response to a v2 one.

This is a step towards simplifying the token provider API.

Change-Id: Iccb8349e0710288adb107d55437a4ff50d074b1c
This commit is contained in:
Lance Bragstad 2016-09-30 21:03:20 +00:00
parent 9aec18b0f2
commit 52bde3cf08
9 changed files with 71 additions and 171 deletions

View File

@ -594,10 +594,10 @@ class AuthWithToken(object):
self.token_provider_api.revoke_token(token_id, revoke_chain=True)
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_v2_token,
self.token_provider_api.validate_v3_token,
token_id=token_id)
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_v2_token,
self.token_provider_api.validate_v3_token,
token_id=token_2_id)
def test_revoke_by_audit_chain_id_chained_token(self):
@ -620,10 +620,10 @@ class AuthWithToken(object):
self.token_provider_api.revoke_token(token_2_id, revoke_chain=True)
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_v2_token,
self.token_provider_api.validate_v3_token,
token_id=token_id)
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_v2_token,
self.token_provider_api.validate_v3_token,
token_id=token_2_id)
def _mock_audit_info(self, parent_audit_id):

View File

@ -797,12 +797,6 @@ class TestTokenProvider(unit.TestCase):
self.token_provider_api.validate_v3_token,
None)
def test_validate_v2_token_with_no_token_raises_token_not_found(self):
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api.validate_v2_token,
None)
# NOTE(ayoung): renamed to avoid automatic test detection
class PKIProviderTests(object):

View File

@ -46,6 +46,7 @@ from keystone.tests.unit import ksfixtures
from keystone.tests.unit import mapping_fixtures
from keystone.tests.unit import test_v3
from keystone.tests.unit import utils
from keystone.token import controllers as token_controller
from keystone.token.providers import common as token_common
@ -2500,9 +2501,11 @@ class FederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin):
"""
r = self._issue_unscoped_token()
token_id = r.headers.get('X-Subject-Token')
v2_token_controller = token_controller.Auth()
self.assertRaises(exception.Unauthorized,
self.token_provider_api.validate_v2_token,
token_id=token_id)
v2_token_controller.validate_token,
self.make_request(is_admin=True),
token_id)
def test_unscoped_token_has_user_domain(self):
r = self._issue_unscoped_token()

View File

@ -15,9 +15,12 @@ import uuid
from six.moves import http_client
import keystone.conf
from keystone.tests import unit
from keystone.tests.unit import test_v3
CONF = keystone.conf.CONF
class TestTrustOperations(test_v3.RestfulTestCase):
"""Test module for create, read, update and delete operations on trusts.
@ -241,25 +244,35 @@ class TestTrustOperations(test_v3.RestfulTestCase):
expected_status=http_client.NOT_FOUND)
def test_validate_trust_scoped_token_against_v2(self):
# get a project-scoped token
auth_data = self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.default_domain_project_id)
token = self.get_requested_token(auth_data)
user = unit.new_user_ref(CONF.identity.default_domain_id)
trustee = self.identity_api.create_user(user)
# create a new trust
ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.default_domain_user_id,
project_id=self.project_id,
trustor_user_id=self.default_domain_user['id'],
trustee_user_id=trustee['id'],
project_id=self.default_domain_project_id,
impersonation=False,
expires=dict(minutes=1),
role_ids=[self.role_id])
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
trust = self.assertValidTrustResponse(r)
# get a v3 trust-scoped token as the trustee
auth_data = self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
user_id=trustee['id'],
password=user['password'],
trust_id=trust['id'])
r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(
r, self.default_domain_user)
r, trustee)
token = r.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
@ -270,7 +283,7 @@ class TestTrustOperations(test_v3.RestfulTestCase):
method='GET'
)
def test_v3_v2_intermix_trustor_not_in_default_domain_failed(self):
def test_v3_v2_intermix_trustee_not_in_default_domain_failed(self):
# get a project-scoped token
auth_data = self.build_authentication_request(
user_id=self.default_domain_user['id'],

View File

@ -491,28 +491,18 @@ class TokenCacheInvalidation(object):
exception.TokenNotFound,
self.token_provider_api.validate_v3_token,
self.unscoped_token_id)
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api.validate_v2_token,
self.unscoped_token_id)
def _check_scoped_tokens_are_invalid(self):
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api.validate_v3_token,
self.scoped_token_id)
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api.validate_v2_token,
self.scoped_token_id)
def _check_scoped_tokens_are_valid(self):
self.token_provider_api.validate_v3_token(self.scoped_token_id)
self.token_provider_api.validate_v2_token(self.scoped_token_id)
def _check_unscoped_tokens_are_valid(self):
self.token_provider_api.validate_v3_token(self.unscoped_token_id)
self.token_provider_api.validate_v2_token(self.unscoped_token_id)
def test_delete_unscoped_token(self):
self.token_provider_api._persistence.delete_token(

View File

@ -30,6 +30,7 @@ from keystone import exception
from keystone.i18n import _
from keystone.models import token_model
from keystone.token import provider
from keystone.token import providers
CONF = keystone.conf.CONF
@ -175,9 +176,14 @@ class Auth(controller.V2Controller):
size=CONF.max_token_size)
try:
v3_token_data = self.token_provider_api.validate_v3_token(
old_token
)
v2_helper = providers.common.V2TokenDataHelper()
v2_token = v2_helper.v3_to_v2_token(v3_token_data, old_token)
token_model_ref = token_model.KeystoneToken(
token_id=old_token,
token_data=self.token_provider_api.validate_v2_token(old_token)
token_data=v2_token
)
except exception.NotFound as e:
raise exception.Unauthorized(e)
@ -423,7 +429,9 @@ class Auth(controller.V2Controller):
the content body.
"""
token = self.token_provider_api.validate_v2_token(token_id)
v3_token_response = self.token_provider_api.validate_v3_token(token_id)
v2_helper = providers.common.V2TokenDataHelper()
token = v2_helper.v3_to_v2_token(v3_token_response, token_id)
belongs_to = request.params.get('belongsTo')
if belongs_to:
self._token_belongs_to(token, belongs_to)
@ -440,7 +448,9 @@ class Auth(controller.V2Controller):
"""
# TODO(ayoung) validate against revocation API
token = self.token_provider_api.validate_v2_token(token_id)
v3_token_response = self.token_provider_api.validate_v3_token(token_id)
v2_helper = providers.common.V2TokenDataHelper()
token = v2_helper.v3_to_v2_token(v3_token_response, token_id)
belongs_to = request.params.get('belongsTo')
if belongs_to:
self._token_belongs_to(token, belongs_to)

View File

@ -16,7 +16,6 @@
import abc
import base64
import copy
import datetime
import sys
import uuid
@ -34,7 +33,6 @@ from keystone.i18n import _, _LE
from keystone.models import token_model
from keystone import notifications
from keystone.token import persistence
from keystone.token import providers
from keystone.token import utils
@ -217,32 +215,6 @@ class Manager(manager.Manager):
token_data, CONF.identity.default_domain_id)
self.revoke_api.check_token(token_values)
def validate_v2_token(self, token_id):
# NOTE(lbragstad): Only go to the persistence backend if the token
# provider requires it.
if self._needs_persistence:
# NOTE(morganfainberg): Ensure we never use the long-form token_id
# (PKI) as part of the cache_key.
unique_id = utils.generate_unique_id(token_id)
token_ref = self._persistence.get_token(unique_id)
token = self._validate_v2_token(token_ref)
else:
# NOTE(lbragstad): If the token doesn't require persistence, then
# it is a fernet token. The fernet token provider doesn't care if
# it's creating version 2.0 tokens or v3 tokens, so we use the same
# validate_non_persistent_token() method to validate both. Then we
# can leverage a separate method to make version 3 token data look
# like version 2.0 token data. The pattern we want to move towards
# is one where the token providers just handle data and the
# controller layers handle interpreting the token data in a format
# that makes sense for the request.
v3_token_ref = self.validate_non_persistent_token(token_id)
v2_token_data_helper = providers.common.V2TokenDataHelper()
token = v2_token_data_helper.v3_to_v2_token(v3_token_ref, token_id)
self._is_valid_token(token)
return token
def check_revocation_v3(self, token):
try:
token_data = token['token']
@ -285,10 +257,6 @@ class Manager(manager.Manager):
def validate_non_persistent_token(self, token_id):
return self.driver.validate_non_persistent_token(token_id)
@MEMOIZE_TOKENS
def _validate_v2_token(self, token_id):
return self.driver.validate_v2_token(token_id)
@MEMOIZE_TOKENS
def _validate_v3_token(self, token_id):
return self.driver.validate_v3_token(token_id)
@ -344,7 +312,17 @@ class Manager(manager.Manager):
# isn't reflexive: there is no way to populate v3 validation cache
# on issuing a token using v2.0 API.
if CONF.token.cache_on_issue:
self._validate_v2_token.set(token_data, TOKENS_REGION, token_id)
if self._needs_persistence:
validate_response = self.driver.validate_v3_token(token_ref)
else:
validate_response = self.driver.validate_non_persistent_token(
token_id
)
self._validate_v3_token.set(
validate_response,
TOKENS_REGION,
token_id
)
return token_id, token_data
@ -395,17 +373,6 @@ class Manager(manager.Manager):
self.validate_non_persistent_token.set(
token_data, TOKENS_REGION, token_id)
try:
v2_helper = providers.common.V2TokenDataHelper()
v2_token_data = v2_helper.v3_to_v2_token(
copy.deepcopy(token_data), token_id)
except exception.Unauthorized:
# Ignore trust and oauth tokens
pass
else:
self._validate_v2_token.set(
v2_token_data, TOKENS_REGION, token_id)
return token_id, token_data
def invalidate_individual_token_cache(self, token_id):
@ -418,7 +385,6 @@ class Manager(manager.Manager):
# consulted before accepting a token as valid. For now we will
# do the explicit individual token invalidation.
self._validate_v2_token.invalidate(self, token_id)
self._validate_v3_token.invalidate(self, token_id)
# This method isn't actually called in the case of non-persistent
# tokens, but we include the invalidation in case this ever changes
@ -584,20 +550,6 @@ class Provider(object):
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def validate_v2_token(self, token_ref):
"""Validate the given V2 token and return the token data.
Must raise Unauthorized exception if unable to validate token.
:param token_ref: the token reference
:type token_ref: dict
:returns: token data
:raises keystone.exception.TokenNotFound: If the token doesn't exist.
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def validate_non_persistent_token(self, token_id):
"""Validate a given non-persistent token id and return the token_data.

View File

@ -728,85 +728,6 @@ class BaseProvider(provider.Provider):
raise exception.Unauthorized()
return token_ref
def validate_v2_token(self, token_ref):
user_id = token_ref['user_id']
token_id = token_ref['id']
methods = None # list of methods used to obtain a token
bind = None # dictionary of bind methods
issued_at = None # time at which the token was issued
expires_at = None # time at which the token will expire
audit_ids = None # list of audit ids specific to the token
domain_id = None # domain scope of the token
project_id = None # project scope of the token
access_token = None # dictionary containing OAUTH1 information
token_dict = None # existing token information
trust_ref = None # dictionary containing trust scope
trust_id = token_ref.get('trust_id')
if trust_id:
trust_ref = self.trust_api.get_trust(trust_id)
token_data = token_ref.get('token_data')
if (self.get_token_version(token_data) == token.provider.V2):
methods = ['password', 'token']
bind = token_ref.get('bind')
# I have no idea why issued_at and expires_at come from two
# different places...
issued_at = token_ref['token_data']['access']['token']['issued_at']
expires_at = token_ref['expires']
audit_ids = token_ref['token_data']['access']['token'].get(
'audit_ids'
)
project_id = None
project_ref = token_ref.get('tenant')
if project_ref:
project_id = project_ref['id']
else:
methods = token_data['token']['methods']
bind = token_data['token'].get('bind')
issued_at = token_data['token']['issued_at']
expires_at = token_data['token']['expires_at']
audit_ids = token_data['token'].get('audit_ids')
domain_id = token_data['token'].get('domain', {}).get('id')
project_id = token_data['token'].get('project', {}).get('id')
access_token = None
if token_data['token'].get('OS-OAUTH1'):
access_token = {
'id': token_data['token'].get('OS-OAUTH1', {}).get(
'access_token_id'
),
'consumer_id': token_data['token'].get(
'OS-OAUTH1', {}
).get('consumer_id')
}
trust_ref = None
token_dict = None
if token_data['token']['user'].get(
federation_constants.FEDERATION):
token_dict = {'user': token_ref['user']}
# NOTE(lbragstad): Let's use the get_token_data() to rebuild the
# information about the token. Even though the token reference we get
# back formatted like a v3 token, we can use the v3_to_v2_token()
# method to enforce the v2.0 contracts and make the reference look like
# a v2 token. This beats having two different methods to build the same
# information for each format. Let's just use one and translate it when
# needed.
v3_token_data = self.v3_token_data_helper.get_token_data(
user_id,
method_names=methods,
domain_id=domain_id,
project_id=project_id,
issued_at=issued_at,
expires=expires_at,
trust=trust_ref,
token=token_dict,
bind=bind,
access_token=access_token,
audit_info=audit_ids)
return self.v2_token_data_helper.v3_to_v2_token(
v3_token_data, token_id
)
def validate_non_persistent_token(self, token_id):
try:
(user_id, methods, audit_ids, domain_id, project_id, trust_id,
@ -880,6 +801,9 @@ class BaseProvider(provider.Provider):
project_ref = token_ref.get('tenant')
if project_ref:
project_id = project_ref['id']
trust_id = token_ref.get('trust_id')
if trust_id:
trust_ref = self.trust_api.get_trust(trust_id)
else:
# NOTE(lbragstad): Otherwise assume we are validating a token that
# was created using the v3 token API.

View File

@ -0,0 +1,14 @@
---
upgrade:
- The ``validate_v2_token()`` method has been removed
from the token provider interface. The token provider
API now uses other validation methods and translates
v3 token responses to v2 format when needed. Having
``validate_v2_token()`` defined with the Ocata codebase
will fail since the interface no longer includes that
method. Please take this into consideration and plan
accordingly if you're maintaining a custom token provider.
critical:
- If writing a custom token provider, see the upgrade
section about the removal of the ``validate_v2_token()``
method.