Move V2TokenDataHelper to the v2.0 controller

This was only really used by the controller layer - so this commit
makes it so that it lives there.

Change-Id: I5670e3bb33792c48539f7c861a51ba2b52c6ea9d
This commit is contained in:
Lance Bragstad 2016-10-20 22:44:52 +00:00
parent 8307f2c155
commit fd54718181
3 changed files with 179 additions and 181 deletions

View File

@ -147,7 +147,7 @@ class Auth(controller.V2Controller):
user_ref['id'], ['password'], expires_at=expiry,
project_id=project_id, trust=trust_ref, parent_audit_id=audit_id,
auth_context=auth_context)
v2_helper = common.V2TokenDataHelper()
v2_helper = V2TokenDataHelper()
token_data = v2_helper.v3_to_v2_token(token_data, token_id)
# NOTE(wanghong): We consume a trust use only when we are using trusts
@ -190,7 +190,7 @@ class Auth(controller.V2Controller):
"""
v3_token_response = self.token_provider_api.validate_token(token_id)
v2_helper = common.V2TokenDataHelper()
v2_helper = V2TokenDataHelper()
token = v2_helper.v3_to_v2_token(v3_token_response, token_id)
belongs_to = request.params.get('belongsTo')
if belongs_to:
@ -209,7 +209,7 @@ class Auth(controller.V2Controller):
"""
# TODO(ayoung) validate against revocation API
v3_token_response = self.token_provider_api.validate_token(token_id)
v2_helper = common.V2TokenDataHelper()
v2_helper = V2TokenDataHelper()
token = v2_helper.v3_to_v2_token(v3_token_response, token_id)
belongs_to = request.params.get('belongsTo')
if belongs_to:
@ -390,7 +390,7 @@ class TokenAuthenticationMethod(BaseAuthenticationMethod):
# v3_to_v2_token, because federated tokens aren't supported by
# v2.0 (the same applies to OAuth tokens, domain-scoped tokens,
# etc..).
v2_helper = common.V2TokenDataHelper()
v2_helper = V2TokenDataHelper()
v2_helper.v3_to_v2_token(v3_token_data, old_token)
token_model_ref = token_model.KeystoneToken(
token_id=old_token,
@ -517,3 +517,176 @@ class ExternalAuthenticationMethod(BaseAuthenticationMethod):
bind = {'kerberos': username}
audit_id = None
return (user_ref, tenant_id, expiry, bind, audit_id)
@dependency.requires('catalog_api', 'resource_api', 'assignment_api',
'trust_api', 'identity_api')
class V2TokenDataHelper(object):
"""Create V2 token data."""
def v3_to_v2_token(self, v3_token_data, token_id):
"""Convert v3 token data into v2.0 token data.
This method expects a dictionary generated from
V3TokenDataHelper.get_token_data() and converts it to look like a v2.0
token dictionary.
:param v3_token_data: dictionary formatted for v3 tokens
:param token_id: ID of the token being converted
:returns: dictionary formatted for v2 tokens
:raises keystone.exception.Unauthorized: If a specific token type is
not supported in v2.
"""
token_data = {}
# Build v2 token
v3_token = v3_token_data['token']
# NOTE(lbragstad): Version 2.0 tokens don't know about any domain other
# than the default domain specified in the configuration.
domain_id = v3_token.get('domain', {}).get('id')
if domain_id and CONF.identity.default_domain_id != domain_id:
msg = ('Unable to validate domain-scoped tokens outside of the '
'default domain')
raise exception.Unauthorized(msg)
token = {}
token['expires'] = v3_token.get('expires_at')
token['issued_at'] = v3_token.get('issued_at')
token['audit_ids'] = v3_token.get('audit_ids')
if v3_token.get('bind'):
token['bind'] = v3_token['bind']
token['id'] = token_id
if 'project' in v3_token:
# v3 token_data does not contain all tenant attributes
tenant = self.resource_api.get_project(
v3_token['project']['id'])
# Drop domain specific fields since v2 calls are not domain-aware.
token['tenant'] = controller.V2Controller.v3_to_v2_project(
tenant)
token_data['token'] = token
# Build v2 user
v3_user = v3_token['user']
user = controller.V2Controller.v3_to_v2_user(v3_user)
if 'OS-TRUST:trust' in v3_token:
v3_trust = v3_token['OS-TRUST:trust']
# if token is scoped to trust, both trustor and trustee must
# be in the default domain. Furthermore, the delegated project
# must also be in the default domain
msg = _('Non-default domain is not supported')
if CONF.trust.enabled:
try:
trust_ref = self.trust_api.get_trust(v3_trust['id'])
except exception.TrustNotFound:
raise exception.TokenNotFound(token_id=token_id)
trustee_user_ref = self.identity_api.get_user(
trust_ref['trustee_user_id'])
if (trustee_user_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
trustor_user_ref = self.identity_api.get_user(
trust_ref['trustor_user_id'])
if (trustor_user_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
if trust_ref.get('project_id'):
project_ref = self.resource_api.get_project(
trust_ref['project_id'])
if (project_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
token_data['trust'] = {
'impersonation': v3_trust['impersonation'],
'id': v3_trust['id'],
'trustee_user_id': v3_trust['trustee_user']['id'],
'trustor_user_id': v3_trust['trustor_user']['id']
}
if 'OS-OAUTH1' in v3_token:
msg = ('Unable to validate Oauth tokens using the version v2.0 '
'API.')
raise exception.Unauthorized(msg)
if 'OS-FEDERATION' in v3_token['user']:
msg = _('Unable to validate Federation tokens using the version '
'v2.0 API.')
raise exception.Unauthorized(msg)
# Set user roles
user['roles'] = []
role_ids = []
for role in v3_token.get('roles', []):
role_ids.append(role.pop('id'))
user['roles'].append(role)
user['roles_links'] = []
token_data['user'] = user
# Get and build v2 service catalog
token_data['serviceCatalog'] = []
if 'tenant' in token:
catalog_ref = self.catalog_api.get_catalog(
user['id'], token['tenant']['id'])
if catalog_ref:
token_data['serviceCatalog'] = self.format_catalog(catalog_ref)
# Build v2 metadata
metadata = {}
metadata['roles'] = role_ids
# Setting is_admin to keep consistency in v2 response
metadata['is_admin'] = 0
token_data['metadata'] = metadata
return {'access': token_data}
@classmethod
def format_catalog(cls, catalog_ref):
"""Munge catalogs from internal to output format.
Internal catalogs look like::
{$REGION: {
{$SERVICE: {
$key1: $value1,
...
}
}
}
The legacy api wants them to look like::
[{'name': $SERVICE[name],
'type': $SERVICE,
'endpoints': [{
'tenantId': $tenant_id,
...
'region': $REGION,
}],
'endpoints_links': [],
}]
"""
if not catalog_ref:
return []
services = {}
for region, region_ref in catalog_ref.items():
for service, service_ref in region_ref.items():
new_service_ref = services.get(service, {})
new_service_ref['name'] = service_ref.pop('name')
new_service_ref['type'] = service
new_service_ref['endpoints_links'] = []
service_ref['region'] = region
endpoints_ref = new_service_ref.get('endpoints', [])
endpoints_ref.append(service_ref)
new_service_ref['endpoints'] = endpoints_ref
services[service] = new_service_ref
return list(services.values())

View File

@ -24,7 +24,6 @@ from oslo_utils import timeutils
import six
from six.moves.urllib import parse
from keystone.common import controller as common_controller
from keystone.common import dependency
from keystone.common import utils
import keystone.conf
@ -83,179 +82,6 @@ def build_audit_info(parent_audit_id=None):
return [audit_id]
@dependency.requires('catalog_api', 'resource_api', 'assignment_api',
'trust_api', 'identity_api')
class V2TokenDataHelper(object):
"""Create V2 token data."""
def v3_to_v2_token(self, v3_token_data, token_id):
"""Convert v3 token data into v2.0 token data.
This method expects a dictionary generated from
V3TokenDataHelper.get_token_data() and converts it to look like a v2.0
token dictionary.
:param v3_token_data: dictionary formatted for v3 tokens
:param token_id: ID of the token being converted
:returns: dictionary formatted for v2 tokens
:raises keystone.exception.Unauthorized: If a specific token type is
not supported in v2.
"""
token_data = {}
# Build v2 token
v3_token = v3_token_data['token']
# NOTE(lbragstad): Version 2.0 tokens don't know about any domain other
# than the default domain specified in the configuration.
domain_id = v3_token.get('domain', {}).get('id')
if domain_id and CONF.identity.default_domain_id != domain_id:
msg = ('Unable to validate domain-scoped tokens outside of the '
'default domain')
raise exception.Unauthorized(msg)
token = {}
token['expires'] = v3_token.get('expires_at')
token['issued_at'] = v3_token.get('issued_at')
token['audit_ids'] = v3_token.get('audit_ids')
if v3_token.get('bind'):
token['bind'] = v3_token['bind']
token['id'] = token_id
if 'project' in v3_token:
# v3 token_data does not contain all tenant attributes
tenant = self.resource_api.get_project(
v3_token['project']['id'])
# Drop domain specific fields since v2 calls are not domain-aware.
token['tenant'] = common_controller.V2Controller.v3_to_v2_project(
tenant)
token_data['token'] = token
# Build v2 user
v3_user = v3_token['user']
user = common_controller.V2Controller.v3_to_v2_user(v3_user)
if 'OS-TRUST:trust' in v3_token:
v3_trust = v3_token['OS-TRUST:trust']
# if token is scoped to trust, both trustor and trustee must
# be in the default domain. Furthermore, the delegated project
# must also be in the default domain
msg = _('Non-default domain is not supported')
if CONF.trust.enabled:
try:
trust_ref = self.trust_api.get_trust(v3_trust['id'])
except exception.TrustNotFound:
raise exception.TokenNotFound(token_id=token_id)
trustee_user_ref = self.identity_api.get_user(
trust_ref['trustee_user_id'])
if (trustee_user_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
trustor_user_ref = self.identity_api.get_user(
trust_ref['trustor_user_id'])
if (trustor_user_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
if trust_ref.get('project_id'):
project_ref = self.resource_api.get_project(
trust_ref['project_id'])
if (project_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
token_data['trust'] = {
'impersonation': v3_trust['impersonation'],
'id': v3_trust['id'],
'trustee_user_id': v3_trust['trustee_user']['id'],
'trustor_user_id': v3_trust['trustor_user']['id']
}
if 'OS-OAUTH1' in v3_token:
msg = ('Unable to validate Oauth tokens using the version v2.0 '
'API.')
raise exception.Unauthorized(msg)
if 'OS-FEDERATION' in v3_token['user']:
msg = _('Unable to validate Federation tokens using the version '
'v2.0 API.')
raise exception.Unauthorized(msg)
# Set user roles
user['roles'] = []
role_ids = []
for role in v3_token.get('roles', []):
role_ids.append(role.pop('id'))
user['roles'].append(role)
user['roles_links'] = []
token_data['user'] = user
# Get and build v2 service catalog
token_data['serviceCatalog'] = []
if 'tenant' in token:
catalog_ref = self.catalog_api.get_catalog(
user['id'], token['tenant']['id'])
if catalog_ref:
token_data['serviceCatalog'] = self.format_catalog(catalog_ref)
# Build v2 metadata
metadata = {}
metadata['roles'] = role_ids
# Setting is_admin to keep consistency in v2 response
metadata['is_admin'] = 0
token_data['metadata'] = metadata
return {'access': token_data}
@classmethod
def format_catalog(cls, catalog_ref):
"""Munge catalogs from internal to output format.
Internal catalogs look like::
{$REGION: {
{$SERVICE: {
$key1: $value1,
...
}
}
}
The legacy api wants them to look like::
[{'name': $SERVICE[name],
'type': $SERVICE,
'endpoints': [{
'tenantId': $tenant_id,
...
'region': $REGION,
}],
'endpoints_links': [],
}]
"""
if not catalog_ref:
return []
services = {}
for region, region_ref in catalog_ref.items():
for service, service_ref in region_ref.items():
new_service_ref = services.get(service, {})
new_service_ref['name'] = service_ref.pop('name')
new_service_ref['type'] = service
new_service_ref['endpoints_links'] = []
service_ref['region'] = region
endpoints_ref = new_service_ref.get('endpoints', [])
endpoints_ref.append(service_ref)
new_service_ref['endpoints'] = endpoints_ref
services[service] = new_service_ref
return list(services.values())
@dependency.requires('assignment_api', 'catalog_api', 'federation_api',
'identity_api', 'resource_api', 'role_api', 'trust_api',
'oauth_api')
@ -595,7 +421,6 @@ class BaseProvider(base.Provider):
def __init__(self, *args, **kwargs):
super(BaseProvider, self).__init__(*args, **kwargs)
self.v3_token_data_helper = V3TokenDataHelper()
self.v2_token_data_helper = V2TokenDataHelper()
def get_token_version(self, token_data):
if token_data and isinstance(token_data, dict):

View File

@ -21,7 +21,7 @@ from keystone import exception
from keystone.i18n import _
from keystone import identity
from keystone.models import token_model
from keystone.token.providers import common
from keystone.token import controllers as token_controllers
LOG = log.getLogger(__name__)
@ -101,7 +101,7 @@ class UserController(identity.controllers.User):
token_ref.user_id, token_ref.methods,
project_id=token_ref.project_id,
parent_audit_id=token_ref.audit_chain_id)
v2_helper = common.V2TokenDataHelper()
v2_helper = token_controllers.V2TokenDataHelper()
v2_token_data = v2_helper.v3_to_v2_token(new_token_data, new_token_id)
LOG.debug('TOKEN_REF %s', new_token_data)
return v2_token_data