use provider to validate tokens

Validate V2 token when passed to V3 api
Push the validation logic to the provider
token auth plugin works on data from v2 or v3

Bug 1212778

Change-Id: Ib2f77839c907d5013d33660920fa902613be4fb5
This commit is contained in:
Adam Young 2013-08-16 15:22:28 -04:00
parent dc0cfc6d50
commit 9bee61c57c
3 changed files with 116 additions and 44 deletions

View File

@ -18,7 +18,8 @@ from keystone import auth
from keystone.common import wsgi
from keystone import exception
from keystone.openstack.common import log as logging
from keystone import token
from keystone.openstack.common import timeutils
from keystone.token import provider
METHOD_NAME = 'token'
@ -28,7 +29,7 @@ LOG = logging.getLogger(__name__)
class Token(auth.AuthMethodHandler):
def __init__(self):
self.token_api = token.Manager()
self.provider = provider.Manager()
def authenticate(self, context, auth_payload, user_context):
try:
@ -36,24 +37,42 @@ class Token(auth.AuthMethodHandler):
raise exception.ValidationError(attribute='id',
target=METHOD_NAME)
token_id = auth_payload['id']
token_ref = self.token_api.get_token(token_id)
if ('OS-TRUST:trust' in token_ref['token_data']['token'] or
'trust' in token_ref['token_data']['token']):
response = self.provider.validate_token(token_id)
#for V3 tokens, the esential data is under the 'token' value.
#For V2, the comparable data was nested under 'access'
token_ref = response.get('token', response.get('access'))
#Do not allow tokens used for delegation to
#create another token, or perform any changes of
#state in Keystone. TO do so is to invite elevation of
#priviledge attacks
if 'OS-TRUST:trust' in token_ref:
raise exception.Forbidden()
if 'OS-OAUTH1' in token_ref['token_data']['token']:
if 'trust' in token_ref:
raise exception.Forbidden()
if 'trust_id' in token_ref.get('metadata', {}):
raise exception.Forbidden()
if 'OS-OAUTH1' in token_ref:
raise exception.Forbidden()
wsgi.validate_token_bind(context, token_ref)
user_context.setdefault(
'user_id', token_ref['token_data']['token']['user']['id'])
# to support Grizzly-3 to Grizzly-RC1 transition
expires_at = token_ref['token_data']['token'].get(
'expires_at', token_ref['token_data']['token'].get('expires'))
#new tokens are not allowed to extend the expiration
#time of an old token, otherwise, they could be extened
#forever. The expiration value was stored at different
#locations in v2 and v3 tokens.
expires_at = token_ref.get('expires_at')
if not expires_at:
expires_at = token_ref.get('expires')
if not expires_at:
expires_at = timeutils.normalize_time(
timeutils.parse_isotime(token_ref['token']['expires']))
user_context.setdefault('expires_at', expires_at)
user_context['extras'].update(
token_ref['token_data']['token']['extras'])
user_context['method_names'].extend(
token_ref['token_data']['token']['methods'])
user_context.setdefault('user_id', token_ref['user']['id'])
user_context['extras'].update(token_ref.get('extras', {}))
user_context['method_names'].extend(token_ref.get('methods', []))
except AssertionError as e:
LOG.error(e)
raise exception.Unauthorized(e)

View File

@ -1281,6 +1281,35 @@ class TestAuthJSON(test_v3.RestfulTestCase):
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
def get_v2_token(self, tenant_id=None):
body = {
'auth': {
'passwordCredentials': {
'username': self.default_domain_user['name'],
'password': self.default_domain_user['password'],
},
},
}
r = self.admin_request(method='POST', path='/v2.0/tokens', body=body)
return r
def test_validate_v2_unscoped_token_with_v3_api(self):
v2_token = self.get_v2_token().result['access']['token']['id']
auth_data = self.build_authentication_request(token=v2_token)
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_validate_v2_scoped_token_with_v3_api(self):
v2_response = self.get_v2_token(
tenant_id=self.default_domain_project['id'])
result = v2_response.result
v2_token = result['access']['token']['id']
auth_data = self.build_authentication_request(
token=v2_token,
project_id=self.default_domain_project['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidScopedTokenResponse(r)
def test_invalid_user_id(self):
auth_data = self.build_authentication_request(
user_id=uuid.uuid4().hex,

View File

@ -448,14 +448,21 @@ class Provider(token.provider.Provider):
def _verify_token(self, token_id, belongs_to=None):
"""Verify the given token and return the token_ref."""
token_ref = self.token_api.get_token(token_id)
try:
token_ref = self.token_api.get_token(token_id)
return self._verify_token_ref(token_ref, belongs_to)
except exception.TokenNotFound:
raise exception.Unauthorized()
def _verify_token_ref(self, token_ref, belongs_to=None):
"""Verify and return the given token_ref."""
if not token_ref:
raise exception.ValidationError(_('Bad Token Reference'))
raise exception.Unauthorized()
if belongs_to:
if not (token_ref['tenant'] and
token_ref['tenant']['id'] == belongs_to):
msg = _('id does not match belongs_to')
raise exception.ValidationError(msg)
raise exception.Unauthorized()
return token_ref
def revoke_token(self, token_id):
@ -503,8 +510,11 @@ class Provider(token.provider.Provider):
raise exception.Unauthorized(msg)
def validate_v2_token(self, token_id, belongs_to=None):
token_ref = self._verify_token(token_id, belongs_to)
return self._validate_v2_token_ref(token_ref)
def _validate_v2_token_ref(self, token_ref):
try:
token_ref = self._verify_token(token_id, belongs_to)
self._assert_default_domain(token_ref)
# FIXME(gyee): performance or correctness? Should we return the
# cached token or reconstruct it? Obviously if we are going with
@ -542,32 +552,46 @@ class Provider(token.provider.Provider):
def validate_v3_token(self, token_id):
try:
token_ref = self._verify_token(token_id)
# FIXME(gyee): performance or correctness? Should we return the
# cached token or reconstruct it? Obviously if we are going with
# the cached token, any role, project, or domain name changes
# will not be reflected. One may argue that with PKI tokens,
# we are essentially doing cached token validation anyway.
# Lets go with the cached token strategy. Since token
# management layer is now pluggable, one can always provide
# their own implementation to suit their needs.
token_data = token_ref.get('token_data')
if not token_data or 'token' not in token_data:
# token ref is created by V2 API
project_id = None
project_ref = token_ref.get('tenant')
if project_ref:
project_id = project_ref['id']
token_data = self.v3_token_data_helper.get_token_data(
token_ref['user']['id'],
['password', 'token'],
{},
project_id=project_id,
bind=token_ref.get('bind'),
expires=token_ref['expires'])
token_data = self._validate_v3_token_ref(token_ref)
return token_data
except (exception.ValidationError, exception.TokenNotFound) as e:
except (exception.ValidationError,
exception.TokenNotFound,
exception.UserNotFound):
LOG.exception(_('Failed to validate token'))
raise exception.Unauthorized(e)
def _validate_v3_token_ref(self, token_ref):
# FIXME(gyee): performance or correctness? Should we return the
# cached token or reconstruct it? Obviously if we are going with
# the cached token, any role, project, or domain name changes
# will not be reflected. One may argue that with PKI tokens,
# we are essentially doing cached token validation anyway.
# Lets go with the cached token strategy. Since token
# management layer is now pluggable, one can always provide
# their own implementation to suit their needs.
token_data = token_ref.get('token_data')
if not token_data or 'token' not in token_data:
# token ref is created by V2 API
project_id = None
project_ref = token_ref.get('tenant')
if project_ref:
project_id = project_ref['id']
token_data = self.v3_token_data_helper.get_token_data(
token_ref['user']['id'],
['password', 'token'],
{},
project_id=project_id,
bind=token_ref.get('bind'),
expires=token_ref['expires'])
return token_data
def validate_token(self, token_id, belongs_to=None):
token_ref = self._verify_token(token_id, belongs_to=belongs_to)
version = self.get_token_version(token_ref)
if version == token.provider.V3:
return self._validate_v3_token_ref(token_ref)
elif version == token.provider.V2:
return self._validate_v2_token_ref(token_ref)
raise token.provider.UnsupportedTokenVersionException()
def check_v2_token(self, token_id, belongs_to=None):
try: