Merge "Remove the rest of v2.0 legacy"
This commit is contained in:
commit
ee4fbf619b
@ -33,30 +33,6 @@ LOG = log.getLogger(__name__)
|
||||
CONF = keystone.conf.CONF
|
||||
|
||||
|
||||
def v2_ec2_deprecated(f):
|
||||
@six.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
deprecated = versionutils.deprecated(
|
||||
what=f.__name__ + ' of the v2 EC2 APIs',
|
||||
as_of=versionutils.deprecated.MITAKA,
|
||||
in_favor_of=('a similar function in the v3 Credential APIs'),
|
||||
remove_in=+7)
|
||||
return deprecated(f)
|
||||
return wrapper()
|
||||
|
||||
|
||||
def v2_auth_deprecated(f):
|
||||
@six.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
deprecated = versionutils.deprecated(
|
||||
what=f.__name__ + ' of the v2 Authentication APIs',
|
||||
as_of=versionutils.deprecated.MITAKA,
|
||||
in_favor_of=('a similar function in the v3 Authentication APIs'),
|
||||
remove_in=+7)
|
||||
return deprecated(f)
|
||||
return wrapper()
|
||||
|
||||
|
||||
def protected(callback=None):
|
||||
"""Wrap API calls with role based access controls (RBAC).
|
||||
|
||||
@ -146,77 +122,6 @@ def protected_wrapper(self, f, check_function, request, filter_attr,
|
||||
check_function(self, request, prep_info, *args, **kwargs)
|
||||
|
||||
|
||||
class V2Controller(provider_api.ProviderAPIMixin, wsgi.Application):
|
||||
"""Base controller class for Identity API v2."""
|
||||
|
||||
@staticmethod
|
||||
def v3_to_v2_user(ref):
|
||||
"""Convert a user_ref from v3 to v2 compatible.
|
||||
|
||||
* v2.0 users are not domain aware, and should have domain_id removed
|
||||
* v2.0 users expect the use of tenantId instead of default_project_id
|
||||
* v2.0 users have a username attribute
|
||||
* v2.0 remove password_expires_at
|
||||
|
||||
If ref is a list type, we will iterate through each element and do the
|
||||
conversion.
|
||||
"""
|
||||
def _format_default_project_id(ref):
|
||||
"""Convert default_project_id to tenantId for v2 calls."""
|
||||
default_project_id = ref.pop('default_project_id', None)
|
||||
if default_project_id is not None:
|
||||
ref['tenantId'] = default_project_id
|
||||
elif 'tenantId' in ref:
|
||||
# NOTE(morganfainberg): To avoid v2.0 confusion if somehow a
|
||||
# tenantId property sneaks its way into the extra blob on the
|
||||
# user, we remove it here. If default_project_id is set, we
|
||||
# would override it in either case.
|
||||
del ref['tenantId']
|
||||
|
||||
def _normalize_and_filter_user_properties(ref):
|
||||
_format_default_project_id(ref)
|
||||
ref.pop('password_expires_at', None)
|
||||
ref.pop('domain', None)
|
||||
ref.pop('domain_id', None)
|
||||
if 'username' not in ref and 'name' in ref:
|
||||
ref['username'] = ref['name']
|
||||
return ref
|
||||
|
||||
if isinstance(ref, dict):
|
||||
return _normalize_and_filter_user_properties(ref)
|
||||
elif isinstance(ref, list):
|
||||
return [_normalize_and_filter_user_properties(x) for x in ref]
|
||||
else:
|
||||
raise ValueError(_('Expected dict or list: %s') % type(ref))
|
||||
|
||||
@staticmethod
|
||||
def v3_to_v2_project(ref):
|
||||
"""Convert a project_ref from v3 to v2.
|
||||
|
||||
* v2.0 projects are not domain aware, and should have domain_id removed
|
||||
* v2.0 projects are not hierarchy aware, and should have parent_id
|
||||
removed
|
||||
|
||||
This method should only be applied to project_refs being returned from
|
||||
the v2.0 controller(s).
|
||||
|
||||
If ref is a list type, we will iterate through each element and do the
|
||||
conversion.
|
||||
"""
|
||||
def _filter_project_properties(ref):
|
||||
ref.pop('domain_id', None)
|
||||
ref.pop('parent_id', None)
|
||||
ref.pop('is_domain', None)
|
||||
return ref
|
||||
|
||||
if isinstance(ref, dict):
|
||||
return _filter_project_properties(ref)
|
||||
elif isinstance(ref, list):
|
||||
return [_filter_project_properties(x) for x in ref]
|
||||
else:
|
||||
raise ValueError(_('Expected dict or list: %s') % type(ref))
|
||||
|
||||
|
||||
class V3Controller(provider_api.ProviderAPIMixin, wsgi.Application):
|
||||
"""Base controller class for Identity API v3.
|
||||
|
||||
|
@ -15,5 +15,4 @@
|
||||
from keystone.contrib.ec2 import controllers # noqa
|
||||
from keystone.contrib.ec2.core import * # noqa
|
||||
from keystone.contrib.ec2 import routers # noqa
|
||||
from keystone.contrib.ec2.routers import Ec2Extension # noqa
|
||||
from keystone.contrib.ec2.routers import Routers as Ec2ExtensionV3 # noqa
|
||||
|
@ -41,7 +41,6 @@ from oslo_serialization import jsonutils
|
||||
import six
|
||||
from six.moves import http_client
|
||||
|
||||
from keystone.common import authorization
|
||||
from keystone.common import controller
|
||||
from keystone.common import provider_api
|
||||
from keystone.common import utils
|
||||
@ -55,177 +54,6 @@ CONF = keystone.conf.CONF
|
||||
PROVIDERS = provider_api.ProviderAPIs
|
||||
|
||||
|
||||
class V2TokenDataHelper(provider_api.ProviderAPIMixin, object):
|
||||
"""Create V2 token data."""
|
||||
|
||||
def v3_to_v2_token(self, v3_token_data, token_id):
|
||||
"""Convert v3 token data into v2.0 token data.
|
||||
|
||||
This method expects a dictionary generated from
|
||||
V3TokenDataHelper.get_token_data() and converts it to look like a v2.0
|
||||
token dictionary.
|
||||
|
||||
:param v3_token_data: dictionary formatted for v3 tokens
|
||||
:param token_id: ID of the token being converted
|
||||
:returns: dictionary formatted for v2 tokens
|
||||
:raises keystone.exception.Unauthorized: If a specific token type is
|
||||
not supported in v2.
|
||||
|
||||
"""
|
||||
token_data = {}
|
||||
# Build v2 token
|
||||
v3_token = v3_token_data['token']
|
||||
|
||||
# NOTE(lbragstad): Version 2.0 tokens don't know about any domain other
|
||||
# than the default domain specified in the configuration.
|
||||
domain_id = v3_token.get('domain', {}).get('id')
|
||||
if domain_id and CONF.identity.default_domain_id != domain_id:
|
||||
msg = ('Unable to validate domain-scoped tokens outside of the '
|
||||
'default domain')
|
||||
raise exception.Unauthorized(msg)
|
||||
|
||||
token = {}
|
||||
token['expires'] = v3_token.get('expires_at')
|
||||
token['issued_at'] = v3_token.get('issued_at')
|
||||
token['audit_ids'] = v3_token.get('audit_ids')
|
||||
if v3_token.get('bind'):
|
||||
token['bind'] = v3_token['bind']
|
||||
token['id'] = token_id
|
||||
|
||||
if 'project' in v3_token:
|
||||
# v3 token_data does not contain all tenant attributes
|
||||
tenant = PROVIDERS.resource_api.get_project(
|
||||
v3_token['project']['id'])
|
||||
# Drop domain specific fields since v2 calls are not domain-aware.
|
||||
token['tenant'] = controller.V2Controller.v3_to_v2_project(
|
||||
tenant)
|
||||
token_data['token'] = token
|
||||
|
||||
# Build v2 user
|
||||
v3_user = v3_token['user']
|
||||
|
||||
user = controller.V2Controller.v3_to_v2_user(v3_user)
|
||||
|
||||
if 'OS-TRUST:trust' in v3_token:
|
||||
v3_trust = v3_token['OS-TRUST:trust']
|
||||
# if token is scoped to trust, both trustor and trustee must
|
||||
# be in the default domain. Furthermore, the delegated project
|
||||
# must also be in the default domain
|
||||
msg = _('Non-default domain is not supported')
|
||||
if CONF.trust.enabled:
|
||||
try:
|
||||
trust_ref = PROVIDERS.trust_api.get_trust(v3_trust['id'])
|
||||
except exception.TrustNotFound:
|
||||
raise exception.TokenNotFound(token_id=token_id)
|
||||
trustee_user_ref = PROVIDERS.identity_api.get_user(
|
||||
trust_ref['trustee_user_id'])
|
||||
if (trustee_user_ref['domain_id'] !=
|
||||
CONF.identity.default_domain_id):
|
||||
raise exception.Unauthorized(msg)
|
||||
trustor_user_ref = PROVIDERS.identity_api.get_user(
|
||||
trust_ref['trustor_user_id'])
|
||||
if (trustor_user_ref['domain_id'] !=
|
||||
CONF.identity.default_domain_id):
|
||||
raise exception.Unauthorized(msg)
|
||||
if trust_ref.get('project_id'):
|
||||
project_ref = PROVIDERS.resource_api.get_project(
|
||||
trust_ref['project_id'])
|
||||
if (project_ref['domain_id'] !=
|
||||
CONF.identity.default_domain_id):
|
||||
raise exception.Unauthorized(msg)
|
||||
|
||||
token_data['trust'] = {
|
||||
'impersonation': v3_trust['impersonation'],
|
||||
'id': v3_trust['id'],
|
||||
'trustee_user_id': v3_trust['trustee_user']['id'],
|
||||
'trustor_user_id': v3_trust['trustor_user']['id']
|
||||
}
|
||||
|
||||
if 'OS-OAUTH1' in v3_token:
|
||||
msg = ('Unable to validate Oauth tokens using the version v2.0 '
|
||||
'API.')
|
||||
raise exception.Unauthorized(msg)
|
||||
|
||||
if 'OS-FEDERATION' in v3_token['user']:
|
||||
msg = _('Unable to validate Federation tokens using the version '
|
||||
'v2.0 API.')
|
||||
raise exception.Unauthorized(msg)
|
||||
|
||||
# Set user roles
|
||||
user['roles'] = []
|
||||
role_ids = []
|
||||
for role in v3_token.get('roles', []):
|
||||
role_ids.append(role.pop('id'))
|
||||
user['roles'].append(role)
|
||||
user['roles_links'] = []
|
||||
|
||||
token_data['user'] = user
|
||||
|
||||
# Get and build v2 service catalog
|
||||
token_data['serviceCatalog'] = []
|
||||
if 'tenant' in token:
|
||||
catalog_ref = PROVIDERS.catalog_api.get_catalog(
|
||||
user['id'], token['tenant']['id'])
|
||||
if catalog_ref:
|
||||
token_data['serviceCatalog'] = self.format_catalog(catalog_ref)
|
||||
|
||||
# Build v2 metadata
|
||||
metadata = {}
|
||||
metadata['roles'] = role_ids
|
||||
# Setting is_admin to keep consistency in v2 response
|
||||
metadata['is_admin'] = 0
|
||||
token_data['metadata'] = metadata
|
||||
|
||||
return {'access': token_data}
|
||||
|
||||
@classmethod
|
||||
def format_catalog(cls, catalog_ref):
|
||||
"""Munge catalogs from internal to output format.
|
||||
|
||||
Internal catalogs look like::
|
||||
|
||||
{$REGION: {
|
||||
{$SERVICE: {
|
||||
$key1: $value1,
|
||||
...
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
The legacy api wants them to look like::
|
||||
|
||||
[{'name': $SERVICE[name],
|
||||
'type': $SERVICE,
|
||||
'endpoints': [{
|
||||
'tenantId': $tenant_id,
|
||||
...
|
||||
'region': $REGION,
|
||||
}],
|
||||
'endpoints_links': [],
|
||||
}]
|
||||
|
||||
"""
|
||||
if not catalog_ref:
|
||||
return []
|
||||
|
||||
services = {}
|
||||
for region, region_ref in catalog_ref.items():
|
||||
for service, service_ref in region_ref.items():
|
||||
new_service_ref = services.get(service, {})
|
||||
new_service_ref['name'] = service_ref.pop('name')
|
||||
new_service_ref['type'] = service
|
||||
new_service_ref['endpoints_links'] = []
|
||||
service_ref['region'] = region
|
||||
|
||||
endpoints_ref = new_service_ref.get('endpoints', [])
|
||||
endpoints_ref.append(service_ref)
|
||||
|
||||
new_service_ref['endpoints'] = endpoints_ref
|
||||
services[service] = new_service_ref
|
||||
|
||||
return list(services.values())
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Ec2ControllerCommon(provider_api.ProviderAPIMixin, object):
|
||||
def check_signature(self, creds_ref, credentials):
|
||||
@ -440,93 +268,6 @@ class Ec2ControllerCommon(provider_api.ProviderAPIMixin, object):
|
||||
headers=headers)
|
||||
|
||||
|
||||
class Ec2Controller(Ec2ControllerCommon, controller.V2Controller):
|
||||
|
||||
@controller.v2_ec2_deprecated
|
||||
def authenticate(self, request, credentials=None, ec2Credentials=None):
|
||||
(user_ref, project_ref, roles_ref, catalog_ref) = self._authenticate(
|
||||
credentials=credentials, ec2credentials=ec2Credentials
|
||||
)
|
||||
|
||||
method_names = ['ec2credential']
|
||||
|
||||
token_id, token_data = self.token_provider_api.issue_token(
|
||||
user_ref['id'], method_names, project_id=project_ref['id'])
|
||||
|
||||
v2_helper = V2TokenDataHelper()
|
||||
token_data = v2_helper.v3_to_v2_token(token_data, token_id)
|
||||
return token_data
|
||||
|
||||
@controller.v2_ec2_deprecated
|
||||
def get_credential(self, request, user_id, credential_id):
|
||||
if not self._is_admin(request):
|
||||
self._assert_identity(request.context_dict, user_id)
|
||||
return super(Ec2Controller, self).get_credential(user_id,
|
||||
credential_id)
|
||||
|
||||
@controller.v2_ec2_deprecated
|
||||
def get_credentials(self, request, user_id):
|
||||
if not self._is_admin(request):
|
||||
self._assert_identity(request.context_dict, user_id)
|
||||
return super(Ec2Controller, self).get_credentials(user_id)
|
||||
|
||||
@controller.v2_ec2_deprecated
|
||||
def create_credential(self, request, user_id, tenant_id):
|
||||
if not self._is_admin(request):
|
||||
self._assert_identity(request.context_dict, user_id)
|
||||
return super(Ec2Controller, self).create_credential(
|
||||
request, user_id, tenant_id)
|
||||
|
||||
@controller.v2_ec2_deprecated
|
||||
def delete_credential(self, request, user_id, credential_id):
|
||||
if not self._is_admin(request):
|
||||
self._assert_identity(request.context_dict, user_id)
|
||||
self._assert_owner(user_id, credential_id)
|
||||
return super(Ec2Controller, self).delete_credential(user_id,
|
||||
credential_id)
|
||||
|
||||
def _assert_identity(self, context, user_id):
|
||||
"""Check that the provided token belongs to the user.
|
||||
|
||||
:param context: standard context
|
||||
:param user_id: id of user
|
||||
:raises keystone.exception.Forbidden: when token is invalid
|
||||
|
||||
"""
|
||||
token_ref = authorization.get_token_ref(context)
|
||||
|
||||
if token_ref.user_id != user_id:
|
||||
raise exception.Forbidden(_('Token belongs to another user'))
|
||||
|
||||
def _is_admin(self, request):
|
||||
"""Wrap admin assertion error return statement.
|
||||
|
||||
:param context: standard context
|
||||
:returns: bool: success
|
||||
|
||||
"""
|
||||
try:
|
||||
# NOTE(morganfainberg): policy_api is required for assert_admin
|
||||
# to properly perform policy enforcement.
|
||||
self.assert_admin(request)
|
||||
return True
|
||||
except (exception.Forbidden, exception.Unauthorized):
|
||||
return False
|
||||
|
||||
def _assert_owner(self, user_id, credential_id):
|
||||
"""Ensure the provided user owns the credential.
|
||||
|
||||
:param user_id: expected credential owner
|
||||
:param credential_id: id of credential object
|
||||
:raises keystone.exception.Forbidden: on failure
|
||||
|
||||
"""
|
||||
ec2_credential_id = utils.hash_access_key(credential_id)
|
||||
cred_ref = self.credential_api.get_credential(ec2_credential_id)
|
||||
if user_id != cred_ref['user_id']:
|
||||
raise exception.Forbidden(_('Credential belongs to another user'))
|
||||
|
||||
|
||||
class Ec2ControllerV3(Ec2ControllerCommon, controller.V3Controller):
|
||||
|
||||
collection_name = 'credentials'
|
||||
|
@ -24,39 +24,6 @@ build_resource_relation = functools.partial(
|
||||
extension_version='1.0')
|
||||
|
||||
|
||||
class Ec2Extension(wsgi.ExtensionRouter):
|
||||
def add_routes(self, mapper):
|
||||
ec2_controller = controllers.Ec2Controller()
|
||||
# validation
|
||||
mapper.connect(
|
||||
'/ec2tokens',
|
||||
controller=ec2_controller,
|
||||
action='authenticate',
|
||||
conditions=dict(method=['POST']))
|
||||
|
||||
# crud
|
||||
mapper.connect(
|
||||
'/users/{user_id}/credentials/OS-EC2',
|
||||
controller=ec2_controller,
|
||||
action='create_credential',
|
||||
conditions=dict(method=['POST']))
|
||||
mapper.connect(
|
||||
'/users/{user_id}/credentials/OS-EC2',
|
||||
controller=ec2_controller,
|
||||
action='get_credentials',
|
||||
conditions=dict(method=['GET']))
|
||||
mapper.connect(
|
||||
'/users/{user_id}/credentials/OS-EC2/{credential_id}',
|
||||
controller=ec2_controller,
|
||||
action='get_credential',
|
||||
conditions=dict(method=['GET']))
|
||||
mapper.connect(
|
||||
'/users/{user_id}/credentials/OS-EC2/{credential_id}',
|
||||
controller=ec2_controller,
|
||||
action='delete_credential',
|
||||
conditions=dict(method=['DELETE']))
|
||||
|
||||
|
||||
class Routers(wsgi.RoutersBase):
|
||||
|
||||
_path_prefixes = ('ec2tokens', 'users')
|
||||
|
@ -215,47 +215,6 @@ class RestfulTestCase(unit.TestCase):
|
||||
def admin_request(self, **kwargs):
|
||||
return self._request(app=self.admin_app, **kwargs)
|
||||
|
||||
def _get_token(self, body):
|
||||
"""Convenience method so that we can test authenticated requests."""
|
||||
r = self.public_request(method='POST', path='/v2.0/tokens', body=body)
|
||||
return self._get_token_id(r)
|
||||
|
||||
def get_admin_token(self):
|
||||
return self._get_token({
|
||||
'auth': {
|
||||
'passwordCredentials': {
|
||||
'username': self.user_req_admin['name'],
|
||||
'password': self.user_req_admin['password']
|
||||
},
|
||||
'tenantId': default_fixtures.SERVICE_TENANT_ID
|
||||
}
|
||||
})
|
||||
|
||||
def get_unscoped_token(self):
|
||||
"""Convenience method so that we can test authenticated requests."""
|
||||
return self._get_token({
|
||||
'auth': {
|
||||
'passwordCredentials': {
|
||||
'username': self.user_foo['name'],
|
||||
'password': self.user_foo['password'],
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
def get_scoped_token(self, tenant_id=None):
|
||||
"""Convenience method so that we can test authenticated requests."""
|
||||
if not tenant_id:
|
||||
tenant_id = self.tenant_bar['id']
|
||||
return self._get_token({
|
||||
'auth': {
|
||||
'passwordCredentials': {
|
||||
'username': self.user_foo['name'],
|
||||
'password': self.user_foo['password'],
|
||||
},
|
||||
'tenantId': tenant_id,
|
||||
},
|
||||
})
|
||||
|
||||
def _get_token_id(self, r):
|
||||
"""Helper method to return a token ID from a response.
|
||||
|
||||
|
@ -18,123 +18,12 @@ from six.moves import http_client
|
||||
from keystone.common import provider_api
|
||||
from keystone.contrib.ec2 import controllers
|
||||
from keystone.tests import unit
|
||||
from keystone.tests.unit import rest
|
||||
from keystone.tests.unit import test_v3
|
||||
|
||||
|
||||
PROVIDERS = provider_api.ProviderAPIs
|
||||
|
||||
|
||||
class EC2ContribCoreV2(rest.RestfulTestCase):
|
||||
def setUp(self):
|
||||
super(EC2ContribCoreV2, self).setUp()
|
||||
# TODO(morgan): remove test class, v2.0 has been deleted
|
||||
self.skipTest('V2.0 has been deleted, test is nolonger valid')
|
||||
|
||||
def config_overrides(self):
|
||||
super(EC2ContribCoreV2, self).config_overrides()
|
||||
|
||||
def assertValidAuthenticationResponse(self, r):
|
||||
self.assertIsNotNone(r.result.get('access'))
|
||||
self.assertIsNotNone(r.result['access'].get('token'))
|
||||
self.assertIsNotNone(r.result['access'].get('user'))
|
||||
|
||||
# validate token
|
||||
self.assertIsNotNone(r.result['access']['token'].get('id'))
|
||||
self.assertIsNotNone(r.result['access']['token'].get('expires'))
|
||||
tenant = r.result['access']['token'].get('tenant')
|
||||
if tenant is not None:
|
||||
# validate tenant
|
||||
self.assertIsNotNone(tenant.get('id'))
|
||||
self.assertIsNotNone(tenant.get('name'))
|
||||
|
||||
# validate user
|
||||
self.assertIsNotNone(r.result['access']['user'].get('id'))
|
||||
self.assertIsNotNone(r.result['access']['user'].get('name'))
|
||||
|
||||
def assertValidErrorResponse(self, r):
|
||||
resp = r.result
|
||||
self.assertIsNotNone(resp.get('error'))
|
||||
self.assertIsNotNone(resp['error'].get('code'))
|
||||
self.assertIsNotNone(resp['error'].get('title'))
|
||||
self.assertIsNotNone(resp['error'].get('message'))
|
||||
self.assertEqual(int(resp['error']['code']), r.status_code)
|
||||
|
||||
def test_valid_authentication_response_with_proper_secret(self):
|
||||
cred_blob, credential = unit.new_ec2_credential(
|
||||
self.user_foo['id'], self.tenant_bar['id'])
|
||||
|
||||
PROVIDERS.credential_api.create_credential(
|
||||
credential['id'], credential)
|
||||
|
||||
signer = ec2_utils.Ec2Signer(cred_blob['secret'])
|
||||
credentials = {
|
||||
'access': cred_blob['access'],
|
||||
'secret': cred_blob['secret'],
|
||||
'host': 'localhost',
|
||||
'verb': 'GET',
|
||||
'path': '/',
|
||||
'params': {
|
||||
'SignatureVersion': '2',
|
||||
'Action': 'Test',
|
||||
'Timestamp': '2007-01-31T23:59:59Z'
|
||||
},
|
||||
}
|
||||
credentials['signature'] = signer.generate(credentials)
|
||||
resp = self.public_request(
|
||||
method='POST',
|
||||
path='/v2.0/ec2tokens',
|
||||
body={'credentials': credentials},
|
||||
expected_status=http_client.OK)
|
||||
self.assertValidAuthenticationResponse(resp)
|
||||
|
||||
def test_authenticate_with_empty_body_returns_bad_request(self):
|
||||
self.public_request(
|
||||
method='POST',
|
||||
path='/v2.0/ec2tokens',
|
||||
body={},
|
||||
expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
def test_authenticate_without_json_request_returns_bad_request(self):
|
||||
self.public_request(
|
||||
method='POST',
|
||||
path='/v2.0/ec2tokens',
|
||||
body='not json',
|
||||
expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
def test_authenticate_without_request_body_returns_bad_request(self):
|
||||
self.public_request(
|
||||
method='POST',
|
||||
path='/v2.0/ec2tokens',
|
||||
expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
def test_authenticate_without_proper_secret_returns_unauthorized(self):
|
||||
cred_blob, credential = unit.new_ec2_credential(
|
||||
self.user_foo['id'], self.tenant_bar['id'])
|
||||
|
||||
PROVIDERS.credential_api.create_credential(
|
||||
credential['id'], credential)
|
||||
|
||||
signer = ec2_utils.Ec2Signer('totally not the secret')
|
||||
credentials = {
|
||||
'access': cred_blob['access'],
|
||||
'secret': 'totally not the secret',
|
||||
'host': 'localhost',
|
||||
'verb': 'GET',
|
||||
'path': '/',
|
||||
'params': {
|
||||
'SignatureVersion': '2',
|
||||
'Action': 'Test',
|
||||
'Timestamp': '2007-01-31T23:59:59Z'
|
||||
},
|
||||
}
|
||||
credentials['signature'] = signer.generate(credentials)
|
||||
self.public_request(
|
||||
method='POST',
|
||||
path='/v2.0/ec2tokens',
|
||||
body={'credentials': credentials},
|
||||
expected_status=http_client.UNAUTHORIZED)
|
||||
|
||||
|
||||
class EC2ContribCoreV3(test_v3.RestfulTestCase):
|
||||
def setUp(self):
|
||||
super(EC2ContribCoreV3, self).setUp()
|
||||
|
@ -1,283 +0,0 @@
|
||||
# Copyright 2015 UnitedStack, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
from keystoneclient.contrib.ec2 import utils as ec2_utils
|
||||
from six.moves import http_client
|
||||
|
||||
from keystone.common import context
|
||||
from keystone.common import provider_api
|
||||
from keystone.common import request
|
||||
from keystone.common import utils
|
||||
from keystone.contrib.ec2 import controllers
|
||||
from keystone.credential.providers import fernet as credential_fernet
|
||||
from keystone import exception
|
||||
from keystone.tests import unit
|
||||
from keystone.tests.unit import default_fixtures
|
||||
from keystone.tests.unit import ksfixtures
|
||||
from keystone.tests.unit.ksfixtures import database
|
||||
from keystone.tests.unit import test_v3 as rest
|
||||
|
||||
CRED_TYPE_EC2 = controllers.CRED_TYPE_EC2
|
||||
PROVIDERS = provider_api.ProviderAPIs
|
||||
|
||||
|
||||
class V2CredentialEc2TestCase(rest.RestfulTestCase):
|
||||
def _get_token_id(self, r):
|
||||
return r.headers.get('X-Subject-Token')
|
||||
|
||||
def _get_ec2_cred(self):
|
||||
uri = self._get_ec2_cred_uri()
|
||||
r = self.public_request(method='POST', token=self.get_scoped_token(),
|
||||
path=uri, body={'tenant_id': self.project_id})
|
||||
return r.result['credential']
|
||||
|
||||
def _get_ec2_cred_uri(self):
|
||||
return '/v2.0/users/%s/credentials/OS-EC2' % self.user_id
|
||||
|
||||
def test_ec2_cannot_get_non_ec2_credential(self):
|
||||
# TODO(morgan): remove this test class, V2 has been removed.
|
||||
self.skipTest('V2.0 has been removed, remove the whole test class.')
|
||||
access_key = uuid.uuid4().hex
|
||||
cred_id = utils.hash_access_key(access_key)
|
||||
non_ec2_cred = unit.new_credential_ref(
|
||||
user_id=self.user_id,
|
||||
project_id=self.project_id)
|
||||
non_ec2_cred['id'] = cred_id
|
||||
PROVIDERS.credential_api.create_credential(cred_id, non_ec2_cred)
|
||||
|
||||
# if access_key is not found, ec2 controller raises Unauthorized
|
||||
# exception
|
||||
path = '/'.join([self._get_ec2_cred_uri(), access_key])
|
||||
self.public_request(method='GET', token=self.get_scoped_token(),
|
||||
path=path,
|
||||
expected_status=http_client.UNAUTHORIZED)
|
||||
|
||||
def assertValidErrorResponse(self, r):
|
||||
# FIXME(wwwjfy): it's copied from test_v3.py. The logic of this method
|
||||
# in test_v2.py and test_v3.py (both are inherited from rest.py) has no
|
||||
# difference, so they should be refactored into one place. Also, the
|
||||
# function signatures in both files don't match the one in the parent
|
||||
# class in rest.py.
|
||||
resp = r.result
|
||||
self.assertIsNotNone(resp.get('error'))
|
||||
self.assertIsNotNone(resp['error'].get('code'))
|
||||
self.assertIsNotNone(resp['error'].get('title'))
|
||||
self.assertIsNotNone(resp['error'].get('message'))
|
||||
self.assertEqual(int(resp['error']['code']), r.status_code)
|
||||
|
||||
def test_ec2_list_credentials(self):
|
||||
# TODO(morgan): remove this test class, V2 has been removed.
|
||||
self.skipTest('V2.0 has been removed, remove the whole test class.')
|
||||
self._get_ec2_cred()
|
||||
uri = self._get_ec2_cred_uri()
|
||||
r = self.public_request(method='GET', token=self.get_scoped_token(),
|
||||
path=uri)
|
||||
cred_list = r.result['credentials']
|
||||
self.assertEqual(1, len(cred_list))
|
||||
|
||||
# non-EC2 credentials won't be fetched
|
||||
non_ec2_cred = unit.new_credential_ref(
|
||||
user_id=self.user_id,
|
||||
project_id=self.project_id)
|
||||
non_ec2_cred['type'] = uuid.uuid4().hex
|
||||
PROVIDERS.credential_api.create_credential(
|
||||
non_ec2_cred['id'], non_ec2_cred
|
||||
)
|
||||
r = self.public_request(method='GET', token=self.get_scoped_token(),
|
||||
path=uri)
|
||||
cred_list_2 = r.result['credentials']
|
||||
# still one element because non-EC2 credentials are not returned.
|
||||
self.assertEqual(1, len(cred_list_2))
|
||||
self.assertEqual(cred_list[0], cred_list_2[0])
|
||||
|
||||
|
||||
class V2CredentialEc2Controller(unit.TestCase):
|
||||
def setUp(self):
|
||||
super(V2CredentialEc2Controller, self).setUp()
|
||||
# TODO(morgan): remove the whole test case/class, v2.0 is dead.
|
||||
self.skipTest('V2.0 has been removed, test case is not valid.')
|
||||
self.useFixture(database.Database())
|
||||
self.useFixture(
|
||||
ksfixtures.KeyRepository(
|
||||
self.config_fixture,
|
||||
'credential',
|
||||
credential_fernet.MAX_ACTIVE_KEYS
|
||||
)
|
||||
)
|
||||
self.load_backends()
|
||||
self.load_fixtures(default_fixtures)
|
||||
self.user_id = self.user_foo['id']
|
||||
self.project_id = self.tenant_bar['id']
|
||||
self.controller = controllers.Ec2Controller()
|
||||
self.blob, tmp_ref = unit.new_ec2_credential(
|
||||
user_id=self.user_id,
|
||||
project_id=self.project_id)
|
||||
|
||||
self.creds_ref = (controllers.Ec2Controller
|
||||
._convert_v3_to_ec2_credential(tmp_ref))
|
||||
|
||||
def test_signature_validate_no_host_port(self):
|
||||
"""Test signature validation with the access/secret provided."""
|
||||
access = self.blob['access']
|
||||
secret = self.blob['secret']
|
||||
signer = ec2_utils.Ec2Signer(secret)
|
||||
params = {'SignatureMethod': 'HmacSHA256',
|
||||
'SignatureVersion': '2',
|
||||
'AWSAccessKeyId': access}
|
||||
request = {'host': 'foo',
|
||||
'verb': 'GET',
|
||||
'path': '/bar',
|
||||
'params': params}
|
||||
signature = signer.generate(request)
|
||||
|
||||
sig_ref = {'access': access,
|
||||
'signature': signature,
|
||||
'host': 'foo',
|
||||
'verb': 'GET',
|
||||
'path': '/bar',
|
||||
'params': params}
|
||||
|
||||
# Now validate the signature based on the dummy request
|
||||
self.assertTrue(self.controller.check_signature(self.creds_ref,
|
||||
sig_ref))
|
||||
|
||||
def test_signature_validate_with_host_port(self):
|
||||
"""Test signature validation when host is bound with port.
|
||||
|
||||
Host is bound with a port, generally, the port here is not the
|
||||
standard port for the protocol, like '80' for HTTP and port 443
|
||||
for HTTPS, the port is not omitted by the client library.
|
||||
"""
|
||||
access = self.blob['access']
|
||||
secret = self.blob['secret']
|
||||
signer = ec2_utils.Ec2Signer(secret)
|
||||
params = {'SignatureMethod': 'HmacSHA256',
|
||||
'SignatureVersion': '2',
|
||||
'AWSAccessKeyId': access}
|
||||
request = {'host': 'foo:8181',
|
||||
'verb': 'GET',
|
||||
'path': '/bar',
|
||||
'params': params}
|
||||
signature = signer.generate(request)
|
||||
|
||||
sig_ref = {'access': access,
|
||||
'signature': signature,
|
||||
'host': 'foo:8181',
|
||||
'verb': 'GET',
|
||||
'path': '/bar',
|
||||
'params': params}
|
||||
|
||||
# Now validate the signature based on the dummy request
|
||||
self.assertTrue(self.controller.check_signature(self.creds_ref,
|
||||
sig_ref))
|
||||
|
||||
def test_signature_validate_with_missed_host_port(self):
|
||||
"""Test signature validation when host is bound with well-known port.
|
||||
|
||||
Host is bound with a port, but the port is well-know port like '80'
|
||||
for HTTP and port 443 for HTTPS, sometimes, client library omit
|
||||
the port but then make the request with the port.
|
||||
see (How to create the string to sign): 'http://docs.aws.amazon.com/
|
||||
general/latest/gr/signature-version-2.html'.
|
||||
|
||||
Since "credentials['host']" is not set by client library but is
|
||||
taken from "req.host", so caused the differences.
|
||||
"""
|
||||
access = self.blob['access']
|
||||
secret = self.blob['secret']
|
||||
signer = ec2_utils.Ec2Signer(secret)
|
||||
params = {'SignatureMethod': 'HmacSHA256',
|
||||
'SignatureVersion': '2',
|
||||
'AWSAccessKeyId': access}
|
||||
# Omit the port to generate the signature.
|
||||
cnt_req = {'host': 'foo',
|
||||
'verb': 'GET',
|
||||
'path': '/bar',
|
||||
'params': params}
|
||||
signature = signer.generate(cnt_req)
|
||||
|
||||
sig_ref = {'access': access,
|
||||
'signature': signature,
|
||||
'host': 'foo:8080',
|
||||
'verb': 'GET',
|
||||
'path': '/bar',
|
||||
'params': params}
|
||||
|
||||
# Now validate the signature based on the dummy request
|
||||
# Check the signature again after omitting the port.
|
||||
self.assertTrue(self.controller.check_signature(self.creds_ref,
|
||||
sig_ref))
|
||||
|
||||
def test_signature_validate_no_signature(self):
|
||||
"""Signature is not presented in signature reference data."""
|
||||
access = self.blob['access']
|
||||
params = {'SignatureMethod': 'HmacSHA256',
|
||||
'SignatureVersion': '2',
|
||||
'AWSAccessKeyId': access}
|
||||
|
||||
sig_ref = {'access': access,
|
||||
'signature': None,
|
||||
'host': 'foo:8080',
|
||||
'verb': 'GET',
|
||||
'path': '/bar',
|
||||
'params': params}
|
||||
|
||||
# Now validate the signature based on the dummy request
|
||||
self.assertRaises(exception.Unauthorized,
|
||||
self.controller.check_signature,
|
||||
self.creds_ref, sig_ref)
|
||||
|
||||
def test_signature_validate_invalid_signature(self):
|
||||
"""Signature is not signed on the correct data."""
|
||||
access = self.blob['access']
|
||||
secret = self.blob['secret']
|
||||
signer = ec2_utils.Ec2Signer(secret)
|
||||
params = {'SignatureMethod': 'HmacSHA256',
|
||||
'SignatureVersion': '2',
|
||||
'AWSAccessKeyId': access}
|
||||
request = {'host': 'bar',
|
||||
'verb': 'GET',
|
||||
'path': '/bar',
|
||||
'params': params}
|
||||
signature = signer.generate(request)
|
||||
|
||||
sig_ref = {'access': access,
|
||||
'signature': signature,
|
||||
'host': 'foo:8080',
|
||||
'verb': 'GET',
|
||||
'path': '/bar',
|
||||
'params': params}
|
||||
|
||||
# Now validate the signature based on the dummy request
|
||||
self.assertRaises(exception.Unauthorized,
|
||||
self.controller.check_signature,
|
||||
self.creds_ref, sig_ref)
|
||||
|
||||
def test_check_non_admin_user(self):
|
||||
"""Checking if user is admin causes uncaught error.
|
||||
|
||||
When checking if a user is an admin, keystone.exception.Unauthorized
|
||||
is raised but not caught if the user is not an admin.
|
||||
"""
|
||||
# make a non-admin user
|
||||
req = request.Request.blank('/')
|
||||
req.context = context.RequestContext(is_admin=False)
|
||||
req.context_dict['is_admin'] = False
|
||||
req.context_dict['token_id'] = uuid.uuid4().hex
|
||||
|
||||
# check if user is admin
|
||||
# no exceptions should be raised
|
||||
self.controller._is_admin(req)
|
@ -41,19 +41,19 @@ SAMPLE_V3_TOKEN = {
|
||||
"id": "02850c5d1d094887bdc46e81e1e15dc7",
|
||||
"interface": "admin",
|
||||
"region": "RegionOne",
|
||||
"url": "http://localhost:35357/v2.0"
|
||||
"url": "http://localhost:35357/v3"
|
||||
},
|
||||
{
|
||||
"id": "446e244b75034a9ab4b0811e82d0b7c8",
|
||||
"interface": "internal",
|
||||
"region": "RegionOne",
|
||||
"url": "http://localhost:5000/v2.0"
|
||||
"url": "http://localhost:5000/v3"
|
||||
},
|
||||
{
|
||||
"id": "47fa3d9f499240abb5dfcf2668f168cd",
|
||||
"interface": "public",
|
||||
"region": "RegionOne",
|
||||
"url": "http://localhost:5000/v2.0"
|
||||
"url": "http://localhost:5000/v3"
|
||||
}
|
||||
],
|
||||
"id": "26d7541715a44a4d9adad96f9872b633",
|
||||
@ -231,19 +231,19 @@ SAMPLE_V3_TOKEN_WITH_EMBEDED_VERSION = {
|
||||
"id": "02850c5d1d094887bdc46e81e1e15dc7",
|
||||
"interface": "admin",
|
||||
"region": "RegionOne",
|
||||
"url": "http://localhost:35357/v2.0"
|
||||
"url": "http://localhost:35357/v3"
|
||||
},
|
||||
{
|
||||
"id": "446e244b75034a9ab4b0811e82d0b7c8",
|
||||
"interface": "internal",
|
||||
"region": "RegionOne",
|
||||
"url": "http://localhost:5000/v2.0"
|
||||
"url": "http://localhost:5000/v3"
|
||||
},
|
||||
{
|
||||
"id": "47fa3d9f499240abb5dfcf2668f168cd",
|
||||
"interface": "public",
|
||||
"region": "RegionOne",
|
||||
"url": "http://localhost:5000/v2.0"
|
||||
"url": "http://localhost:5000/v3"
|
||||
}
|
||||
],
|
||||
"id": "26d7541715a44a4d9adad96f9872b633",
|
||||
|
@ -29,39 +29,6 @@ from keystone.tests.unit import utils
|
||||
from keystone.version import controllers
|
||||
|
||||
|
||||
v2_MEDIA_TYPES = [
|
||||
{
|
||||
"base": "application/json",
|
||||
"type": "application/"
|
||||
"vnd.openstack.identity-v2.0+json"
|
||||
}
|
||||
]
|
||||
|
||||
v2_HTML_DESCRIPTION = {
|
||||
"rel": "describedby",
|
||||
"type": "text/html",
|
||||
"href": "https://docs.openstack.org/"
|
||||
}
|
||||
|
||||
|
||||
v2_EXPECTED_RESPONSE = {
|
||||
"id": "v2.0",
|
||||
"status": "deprecated",
|
||||
"updated": "2016-08-04T00:00:00Z",
|
||||
"links": [
|
||||
{
|
||||
"rel": "self",
|
||||
"href": "", # Will get filled in after initialization
|
||||
},
|
||||
v2_HTML_DESCRIPTION
|
||||
],
|
||||
"media-types": v2_MEDIA_TYPES
|
||||
}
|
||||
|
||||
v2_VERSION_RESPONSE = {
|
||||
"version": v2_EXPECTED_RESPONSE
|
||||
}
|
||||
|
||||
v3_MEDIA_TYPES = [
|
||||
{
|
||||
"base": "application/json",
|
||||
@ -745,11 +712,6 @@ class VersionTestCase(unit.TestCase):
|
||||
if version['id'].startswith('v3'):
|
||||
self._paste_in_port(
|
||||
version, 'http://localhost:%s/v3/' % self.public_port)
|
||||
elif version['id'] == 'v2.0':
|
||||
# TODO(morgan): remove this if block in future patch,
|
||||
# v2.0 has been removed.
|
||||
self._paste_in_port(
|
||||
version, 'http://localhost:%s/v2.0/' % self.public_port)
|
||||
self.assertThat(data, _VersionsEqual(expected))
|
||||
|
||||
def test_admin_versions(self):
|
||||
@ -762,11 +724,6 @@ class VersionTestCase(unit.TestCase):
|
||||
if version['id'].startswith('v3'):
|
||||
self._paste_in_port(
|
||||
version, 'http://localhost:%s/v3/' % self.admin_port)
|
||||
elif version['id'] == 'v2.0':
|
||||
# TODO(morgan): remove this if block in future patch,
|
||||
# v2.0 has been removed.
|
||||
self._paste_in_port(
|
||||
version, 'http://localhost:%s/v2.0/' % self.admin_port)
|
||||
self.assertThat(data, _VersionsEqual(expected))
|
||||
|
||||
def test_use_site_url_if_endpoint_unset(self):
|
||||
@ -783,50 +740,8 @@ class VersionTestCase(unit.TestCase):
|
||||
if version['id'].startswith('v3'):
|
||||
self._paste_in_port(
|
||||
version, 'http://localhost/v3/')
|
||||
elif version['id'] == 'v2.0':
|
||||
# TODO(morgan): remove this if block in future patch,
|
||||
# v2.0 has been removed.
|
||||
self._paste_in_port(
|
||||
version, 'http://localhost/v2.0/')
|
||||
self.assertThat(data, _VersionsEqual(expected))
|
||||
|
||||
def test_public_version_v2(self):
|
||||
# TODO(morgan): Remove this test in a future patch.
|
||||
self.skipTest('Test is not Valid, v2.0 has been removed.')
|
||||
client = TestClient(self.public_app)
|
||||
resp = client.get('/v2.0/')
|
||||
self.assertEqual(http_client.OK, resp.status_int)
|
||||
data = jsonutils.loads(resp.body)
|
||||
expected = v2_VERSION_RESPONSE
|
||||
self._paste_in_port(expected['version'],
|
||||
'http://localhost:%s/v2.0/' % self.public_port)
|
||||
self.assertEqual(expected, data)
|
||||
|
||||
def test_admin_version_v2(self):
|
||||
# TODO(morgan): Remove this test in a future patch.
|
||||
self.skipTest('Test is not Valid, v2.0 has been removed.')
|
||||
client = TestClient(self.admin_app)
|
||||
resp = client.get('/v2.0/')
|
||||
self.assertEqual(http_client.OK, resp.status_int)
|
||||
data = jsonutils.loads(resp.body)
|
||||
expected = v2_VERSION_RESPONSE
|
||||
self._paste_in_port(expected['version'],
|
||||
'http://localhost:%s/v2.0/' % self.admin_port)
|
||||
self.assertEqual(expected, data)
|
||||
|
||||
def test_use_site_url_if_endpoint_unset_v2(self):
|
||||
# TODO(morgan): Remove this test in a future patch.
|
||||
self.skipTest('Test is not Valid, v2.0 has been removed.')
|
||||
self.config_fixture.config(public_endpoint=None, admin_endpoint=None)
|
||||
for app in (self.public_app, self.admin_app):
|
||||
client = TestClient(app)
|
||||
resp = client.get('/v2.0/')
|
||||
self.assertEqual(http_client.OK, resp.status_int)
|
||||
data = jsonutils.loads(resp.body)
|
||||
expected = v2_VERSION_RESPONSE
|
||||
self._paste_in_port(expected['version'], 'http://localhost/v2.0/')
|
||||
self.assertEqual(data, expected)
|
||||
|
||||
def test_public_version_v3(self):
|
||||
client = TestClient(self.public_app)
|
||||
resp = client.get('/v3/')
|
||||
@ -893,38 +808,6 @@ class VersionTestCase(unit.TestCase):
|
||||
data = jsonutils.loads(resp.body)
|
||||
self.assertEqual(v3_only_response, data)
|
||||
|
||||
def test_v3_disabled(self):
|
||||
# TODO(morgan): Remove this test in a future patch.
|
||||
self.skipTest('Test is not Valid, v2.0 has been removed.')
|
||||
client = TestClient(self.public_app)
|
||||
# request to /v3 should fail
|
||||
resp = client.get('/v3/')
|
||||
self.assertEqual(http_client.NOT_FOUND, resp.status_int)
|
||||
|
||||
# request to /v2.0 should pass
|
||||
resp = client.get('/v2.0/')
|
||||
self.assertEqual(http_client.OK, resp.status_int)
|
||||
data = jsonutils.loads(resp.body)
|
||||
expected = v2_VERSION_RESPONSE
|
||||
self._paste_in_port(expected['version'],
|
||||
'http://localhost:%s/v2.0/' % self.public_port)
|
||||
self.assertEqual(expected, data)
|
||||
|
||||
# only v2 information should be displayed by requests to /
|
||||
v2_only_response = {
|
||||
"versions": {
|
||||
"values": [
|
||||
v2_EXPECTED_RESPONSE
|
||||
]
|
||||
}
|
||||
}
|
||||
self._paste_in_port(v2_only_response['versions']['values'][0],
|
||||
'http://localhost:%s/v2.0/' % self.public_port)
|
||||
resp = client.get('/')
|
||||
self.assertEqual(300, resp.status_int)
|
||||
data = jsonutils.loads(resp.body)
|
||||
self.assertEqual(v2_only_response, data)
|
||||
|
||||
def _test_json_home(self, path, exp_json_home_data):
|
||||
client = TestClient(self.public_app)
|
||||
resp = client.get(path, headers={'Accept': 'application/json-home'})
|
||||
@ -1052,11 +935,6 @@ class VersionSingleAppTestCase(unit.TestCase):
|
||||
if version['id'].startswith('v3'):
|
||||
self._paste_in_port(
|
||||
version, 'http://localhost:%s/v3/' % app_port())
|
||||
elif version['id'] == 'v2.0':
|
||||
# TODO(morgan): remove this if block in future patch,
|
||||
# v2.0 has been removed.
|
||||
self._paste_in_port(
|
||||
version, 'http://localhost:%s/v2.0/' % app_port())
|
||||
self.assertThat(data, _VersionsEqual(expected))
|
||||
|
||||
def test_public(self):
|
||||
@ -1087,8 +965,6 @@ class VersionBehindSslTestCase(unit.TestCase):
|
||||
for version in expected['versions']['values']:
|
||||
if version['id'].startswith('v3'):
|
||||
self._paste_in_port(version, host + 'v3/')
|
||||
elif version['id'] == 'v2.0':
|
||||
self._paste_in_port(version, host + 'v2.0/')
|
||||
return expected
|
||||
|
||||
def test_versions_without_headers(self):
|
||||
|
@ -282,14 +282,6 @@ class ApplicationTest(BaseWSGITest):
|
||||
resp = req.get_response(FakeApp())
|
||||
self.assertEqual(b"http://foo:1234/identity", resp.body)
|
||||
|
||||
# make sure version portion of the SCRIPT_NAME, '/v2.0', is stripped
|
||||
# from base url
|
||||
req = self._make_request(url='/')
|
||||
req.environ.update({'HTTP_HOST': 'foo:80',
|
||||
'SCRIPT_NAME': '/bar/identity/v2.0'})
|
||||
resp = req.get_response(FakeApp())
|
||||
self.assertEqual(b"http://foo/bar/identity", resp.body)
|
||||
|
||||
# make sure version portion of the SCRIPT_NAME, '/v3' is stripped from
|
||||
# base url
|
||||
req = self._make_request(url='/')
|
||||
|
@ -56,6 +56,8 @@ class TestFernetTokenProvider(unit.TestCase):
|
||||
self.assertIn(token_id, u'%s' % e)
|
||||
|
||||
def test_invalid_v2_token_raises_token_not_found(self):
|
||||
# NOTE(morgan): Kept for regression testing, should not be deleted
|
||||
# even though it references V2.0
|
||||
token_id = uuid.uuid4().hex
|
||||
e = self.assertRaises(
|
||||
exception.TokenNotFound,
|
||||
|
@ -545,12 +545,6 @@ class BaseProvider(provider_api.ProviderAPIMixin, base.Provider):
|
||||
if 'token_version' in token_data:
|
||||
if token_data['token_version'] in token_model.VERSIONS:
|
||||
return token_data['token_version']
|
||||
# FIXME(morganfainberg): deprecate the following logic in future
|
||||
# revisions. It is better to just specify the token_version in
|
||||
# the token_data itself. This way we can support future versions
|
||||
# that might have the same fields.
|
||||
if 'access' in token_data:
|
||||
return token_model.V2
|
||||
if 'token' in token_data and 'methods' in token_data['token']:
|
||||
return token_model.V3
|
||||
raise exception.UnsupportedTokenVersionException()
|
||||
|
@ -179,15 +179,6 @@ class Version(wsgi.Application):
|
||||
}
|
||||
})
|
||||
|
||||
def get_version_v2(self, request):
|
||||
versions = self._get_versions_list(request.context_dict)
|
||||
if 'v2.0' in _VERSIONS:
|
||||
return wsgi.render_response(body={
|
||||
'version': versions['v2.0']
|
||||
})
|
||||
else:
|
||||
raise exception.VersionNotFound(version='v2.0')
|
||||
|
||||
def _get_json_home_v3(self):
|
||||
|
||||
def all_resources():
|
||||
|
Loading…
Reference in New Issue
Block a user