Make auth_token return a V2 Catalog

As there is no way to distinguish a v2 or v3 catalog from the headers
provided to an application we will for the meantime always return a v2
catalog. This should not cause any issues as the full token data is not
provided to the service so there is no-one that will get caught out by a
v2/v3 mix, and anyone that is already supporting the v3 catalog format
will have to support the v2 catalog format as well so it will continue
to work.

Change-Id: Ic9b38e0ba4682b47ae295bd3f89bac59ef7437cf
Closes-Bug: #1302970
This commit is contained in:
Jamie Lennox
2014-04-22 12:17:42 +10:00
parent 570a9dc22d
commit c1c5669d0c
2 changed files with 120 additions and 0 deletions

View File

@@ -108,6 +108,8 @@ HTTP_X_ROLES
HTTP_X_SERVICE_CATALOG
json encoded keystone service catalog (optional).
For compatibility reasons this catalog will always be in the V2 catalog
format even if it is a v3 token.
HTTP_X_TENANT_ID
*Deprecated* in favor of HTTP_X_PROJECT_ID
@@ -371,6 +373,42 @@ def confirm_token_not_expired(data):
return timeutils.isotime(at=expires, subsecond=True)
def _v3_to_v2_catalog(catalog):
"""Convert a catalog to v2 format.
X_SERVICE_CATALOG must be specified in v2 format. If you get a token
that is in v3 convert it.
"""
v2_services = []
for v3_service in catalog:
# first copy over the entries we allow for the service
v2_service = {'type': v3_service['type']}
try:
v2_service['name'] = v3_service['name']
except KeyError:
pass
# now convert the endpoints. Because in v3 we specify region per
# URL not per group we have to collect all the entries of the same
# region together before adding it to the new service.
regions = {}
for v3_endpoint in v3_service.get('endpoints', []):
region_name = v3_endpoint.get('region')
try:
region = regions[region_name]
except KeyError:
region = {'region': region_name} if region_name else {}
regions[region_name] = region
interface_name = v3_endpoint['interface'].lower() + 'URL'
region[interface_name] = v3_endpoint['url']
v2_service['endpoints'] = list(regions.values())
v2_services.append(v2_service)
return v2_services
def safe_quote(s):
"""URL-encode strings that are not already URL-encoded."""
return urllib.parse.quote(s) if s == urllib.parse.unquote(s) else s
@@ -939,6 +977,8 @@ class AuthProtocol(object):
if self.include_service_catalog and auth_ref.has_service_catalog():
catalog = auth_ref.service_catalog.get_data()
if _token_is_v3(token_info):
catalog = _v3_to_v2_catalog(catalog)
rval['X-Service-Catalog'] = jsonutils.dumps(catalog)
return rval

View File

@@ -30,8 +30,10 @@ import testresources
import testtools
import webob
from keystoneclient import access
from keystoneclient.common import cms
from keystoneclient import exceptions
from keystoneclient import fixture
from keystoneclient.middleware import auth_token
from keystoneclient.openstack.common import jsonutils
from keystoneclient.openstack.common import memorycache
@@ -487,6 +489,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertNotIn('X-Service-Catalog', req.headers)
self.assertEqual(body, [FakeApp.SUCCESS])
self.assertIn('keystone.token_info', req.environ)
return req
def test_valid_uuid_request(self):
for _ in range(2): # Do it twice because first result was cached.
@@ -1566,6 +1569,20 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.v3_UUID_TOKEN_DOMAIN_SCOPED)
self.assertLastPath('/testadmin/v3/auth/tokens')
def test_gives_v2_catalog(self):
self.set_middleware()
req = self.assert_valid_request_200(
self.examples.SIGNED_v3_TOKEN_SCOPED)
catalog = jsonutils.loads(req.headers['X-Service-Catalog'])
for service in catalog:
for endpoint in service['endpoints']:
# no point checking everything, just that it's in v2 format
self.assertIn('adminURL', endpoint)
self.assertIn('publicURL', endpoint)
self.assertIn('adminURL', endpoint)
class TokenEncodingTest(testtools.TestCase):
def test_unquoted_token(self):
@@ -1783,5 +1800,68 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
self.assertIsNone(self.middleware._cache_get(token))
class CatalogConversionTests(BaseAuthTokenMiddlewareTest):
PUBLIC_URL = 'http://server:5000/v2.0'
ADMIN_URL = 'http://admin:35357/v2.0'
INTERNAL_URL = 'http://internal:5000/v2.0'
REGION_ONE = 'RegionOne'
REGION_TWO = 'RegionTwo'
REGION_THREE = 'RegionThree'
def test_basic_convert(self):
token = fixture.V3Token()
s = token.add_service(type='identity')
s.add_standard_endpoints(public=self.PUBLIC_URL,
admin=self.ADMIN_URL,
internal=self.INTERNAL_URL,
region=self.REGION_ONE)
auth_ref = access.AccessInfo.factory(body=token)
catalog_data = auth_ref.service_catalog.get_data()
catalog = auth_token._v3_to_v2_catalog(catalog_data)
self.assertEqual(1, len(catalog))
service = catalog[0]
self.assertEqual(1, len(service['endpoints']))
endpoints = service['endpoints'][0]
self.assertEqual('identity', service['type'])
self.assertEqual(4, len(endpoints))
self.assertEqual(self.PUBLIC_URL, endpoints['publicURL'])
self.assertEqual(self.ADMIN_URL, endpoints['adminURL'])
self.assertEqual(self.INTERNAL_URL, endpoints['internalURL'])
self.assertEqual(self.REGION_ONE, endpoints['region'])
def test_multi_region(self):
token = fixture.V3Token()
s = token.add_service(type='identity')
s.add_endpoint('internal', self.INTERNAL_URL, region=self.REGION_ONE)
s.add_endpoint('public', self.PUBLIC_URL, region=self.REGION_TWO)
s.add_endpoint('admin', self.ADMIN_URL, region=self.REGION_THREE)
auth_ref = access.AccessInfo.factory(body=token)
catalog_data = auth_ref.service_catalog.get_data()
catalog = auth_token._v3_to_v2_catalog(catalog_data)
self.assertEqual(1, len(catalog))
service = catalog[0]
# the 3 regions will come through as 3 separate endpoints
expected = [{'internalURL': self.INTERNAL_URL,
'region': self.REGION_ONE},
{'publicURL': self.PUBLIC_URL,
'region': self.REGION_TWO},
{'adminURL': self.ADMIN_URL,
'region': self.REGION_THREE}]
self.assertEqual('identity', service['type'])
self.assertEqual(3, len(service['endpoints']))
for e in expected:
self.assertIn(e, expected)
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)