There are a two interrelated pieces in this patch which are around fixing up places where discovery was being re-run inappropriately. They fall out from adding tests for the functionality and couldn't be sanely shifted back further in the stack without a big dance. Switch the default for "discover_versions" on all of the calls that return an EndpointData to "True". It's a new feature and is a thing that doesn't make a ton of sense to call if you don't want discovery run. However, get_endpoint uses it, so needs to be able to pass in discover_version=False, so the option is still useful. Make sure that get_endpoint and other places where ksa calls get_endpoint_data on behalf of the user work as before without unneeded discovery. Add tests to show that we use actually use the discovery cache properly when we've previously done discovery that can satisfy the new request. This works from the microversion optimization patch, but we had to clean up a couple of things to show it fully in a test. Change-Id: I54053336edf1b3c2bd35a77dbd78f56388b8e806
1652 lines
61 KiB
Python
1652 lines
61 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import abc
|
|
import collections
|
|
import uuid
|
|
|
|
import six
|
|
from six.moves import urllib
|
|
|
|
from keystoneauth1 import _utils
|
|
from keystoneauth1 import access
|
|
from keystoneauth1 import discover
|
|
from keystoneauth1 import exceptions
|
|
from keystoneauth1 import fixture
|
|
from keystoneauth1 import identity
|
|
from keystoneauth1 import plugin
|
|
from keystoneauth1 import session
|
|
from keystoneauth1.tests.unit import utils
|
|
|
|
_Endpoints = collections.namedtuple(
|
|
'ServiceVersion',
|
|
'public, internal, admin')
|
|
|
|
_ServiceVersion = collections.namedtuple(
|
|
'ServiceVersion',
|
|
'discovery, service')
|
|
|
|
|
|
class FakeServiceEndpoints(object):
|
|
def __init__(self, base_url, versions=None, project_id=None, **kwargs):
|
|
self.base_url = base_url
|
|
self._interfaces = {}
|
|
for interface in ('public', 'internal', 'admin'):
|
|
if interface in kwargs and not kwargs[interface]:
|
|
self._interfaces[interface] = False
|
|
else:
|
|
self._interfaces[interface] = True
|
|
|
|
self.versions = {}
|
|
self.unversioned = self._make_urls()
|
|
if not versions:
|
|
self.catalog = self.unversioned
|
|
else:
|
|
self.catalog = self._make_urls(versions[0], project_id)
|
|
for version in versions:
|
|
self.versions[version] = _ServiceVersion(
|
|
self._make_urls(version),
|
|
self._make_urls(version, project_id),
|
|
)
|
|
|
|
def _make_urls(self, *parts):
|
|
return _Endpoints(
|
|
self._make_url('public', *parts),
|
|
self._make_url('internal', *parts),
|
|
self._make_url('admin', *parts),
|
|
)
|
|
|
|
def _make_url(self, interface, *parts):
|
|
if not self._interfaces[interface]:
|
|
return None
|
|
url = urllib.parse.urljoin(self.base_url + '/', interface)
|
|
for part in parts:
|
|
if part:
|
|
url = urllib.parse.urljoin(url + '/', part)
|
|
return url
|
|
|
|
|
|
@six.add_metaclass(abc.ABCMeta)
|
|
class CommonIdentityTests(object):
|
|
|
|
PROJECT_ID = uuid.uuid4().hex
|
|
TEST_ROOT_URL = 'http://127.0.0.1:5000/'
|
|
TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
|
|
|
|
TEST_COMPUTE_BASE = 'https://compute.example.com'
|
|
TEST_COMPUTE_PUBLIC = TEST_COMPUTE_BASE + '/nova/public'
|
|
TEST_COMPUTE_INTERNAL = TEST_COMPUTE_BASE + '/nova/internal'
|
|
TEST_COMPUTE_ADMIN = TEST_COMPUTE_BASE + '/nova/admin'
|
|
|
|
TEST_VOLUME = FakeServiceEndpoints(
|
|
base_url='https://block-storage.example.com',
|
|
versions=['v3', 'v2'], project_id=PROJECT_ID)
|
|
|
|
TEST_BAREMETAL_BASE = 'https://baremetal.example.com'
|
|
TEST_BAREMETAL_INTERNAL = TEST_BAREMETAL_BASE + '/internal'
|
|
|
|
TEST_PASS = uuid.uuid4().hex
|
|
|
|
def setUp(self):
|
|
super(CommonIdentityTests, self).setUp()
|
|
|
|
self.TEST_URL = '%s%s' % (self.TEST_ROOT_URL, self.version)
|
|
self.TEST_ADMIN_URL = '%s%s' % (self.TEST_ROOT_ADMIN_URL, self.version)
|
|
self.TEST_DISCOVERY = fixture.DiscoveryList(href=self.TEST_ROOT_URL)
|
|
|
|
self.stub_auth_data()
|
|
|
|
@abc.abstractmethod
|
|
def create_auth_plugin(self, **kwargs):
|
|
"""Create an auth plugin that makes sense for the auth data.
|
|
|
|
It doesn't really matter what auth mechanism is used but it should be
|
|
appropriate to the API version.
|
|
"""
|
|
|
|
@abc.abstractmethod
|
|
def get_auth_data(self, **kwargs):
|
|
"""Return fake authentication data.
|
|
|
|
This should register a valid token response and ensure that the compute
|
|
endpoints are set to TEST_COMPUTE_PUBLIC, _INTERNAL and _ADMIN.
|
|
"""
|
|
|
|
def stub_auth_data(self, **kwargs):
|
|
token = self.get_auth_data(**kwargs)
|
|
self.user_id = token.user_id
|
|
|
|
try:
|
|
self.project_id = token.project_id
|
|
except AttributeError:
|
|
self.project_id = token.tenant_id
|
|
|
|
self.stub_auth(json=token)
|
|
|
|
@abc.abstractproperty
|
|
def version(self):
|
|
"""The API version being tested."""
|
|
|
|
def test_discovering(self):
|
|
self.stub_url('GET', [],
|
|
base_url=self.TEST_COMPUTE_ADMIN,
|
|
json=self.TEST_DISCOVERY)
|
|
|
|
body = 'SUCCESS'
|
|
|
|
# which gives our sample values
|
|
self.stub_url('GET', ['path'], text=body)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
resp = s.get('/path', endpoint_filter={'service_type': 'compute',
|
|
'interface': 'admin',
|
|
'version': self.version})
|
|
|
|
self.assertEqual(200, resp.status_code)
|
|
self.assertEqual(body, resp.text)
|
|
|
|
new_body = 'SC SUCCESS'
|
|
# if we don't specify a version, we use the URL from the SC
|
|
self.stub_url('GET', ['path'],
|
|
base_url=self.TEST_COMPUTE_ADMIN,
|
|
text=new_body)
|
|
|
|
resp = s.get('/path', endpoint_filter={'service_type': 'compute',
|
|
'interface': 'admin'})
|
|
|
|
self.assertEqual(200, resp.status_code)
|
|
self.assertEqual(new_body, resp.text)
|
|
|
|
def test_discovery_uses_provided_session_cache(self):
|
|
# register responses such that if the discovery URL is hit more than
|
|
# once then the response will be invalid and not point to COMPUTE_ADMIN
|
|
resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
|
|
self.requests_mock.get(self.TEST_COMPUTE_ADMIN, resps)
|
|
|
|
body = 'SUCCESS'
|
|
self.stub_url('GET', ['path'], text=body)
|
|
|
|
cache = {}
|
|
# now either of the two plugins I use, it should not cause a second
|
|
# request to the discovery url.
|
|
s = session.Session(discovery_cache=cache)
|
|
a = self.create_auth_plugin()
|
|
b = self.create_auth_plugin()
|
|
|
|
for auth in (a, b):
|
|
resp = s.get('/path',
|
|
auth=auth,
|
|
endpoint_filter={'service_type': 'compute',
|
|
'interface': 'admin',
|
|
'version': self.version})
|
|
|
|
self.assertEqual(200, resp.status_code)
|
|
self.assertEqual(body, resp.text)
|
|
self.assertIn(self.TEST_COMPUTE_ADMIN, cache.keys())
|
|
|
|
def test_discovery_uses_session_cache(self):
|
|
# register responses such that if the discovery URL is hit more than
|
|
# once then the response will be invalid and not point to COMPUTE_ADMIN
|
|
resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
|
|
self.requests_mock.get(self.TEST_COMPUTE_ADMIN, resps)
|
|
|
|
body = 'SUCCESS'
|
|
self.stub_url('GET', ['path'], text=body)
|
|
|
|
filter = {'service_type': 'compute', 'interface': 'admin',
|
|
'version': self.version}
|
|
|
|
# create a session and call the endpoint, causing its cache to be set
|
|
sess = session.Session()
|
|
sess.get('/path', auth=self.create_auth_plugin(),
|
|
endpoint_filter=filter)
|
|
self.assertIn(self.TEST_COMPUTE_ADMIN, sess._discovery_cache.keys())
|
|
|
|
# now either of the two plugins I use, it should not cause a second
|
|
# request to the discovery url.
|
|
a = self.create_auth_plugin()
|
|
b = self.create_auth_plugin()
|
|
|
|
for auth in (a, b):
|
|
resp = sess.get('/path', auth=auth, endpoint_filter=filter)
|
|
self.assertEqual(200, resp.status_code)
|
|
self.assertEqual(body, resp.text)
|
|
|
|
def test_discovery_uses_plugin_cache(self):
|
|
# register responses such that if the discovery URL is hit more than
|
|
# once then the response will be invalid and not point to COMPUTE_ADMIN
|
|
resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
|
|
self.requests_mock.get(self.TEST_COMPUTE_ADMIN, resps)
|
|
|
|
body = 'SUCCESS'
|
|
self.stub_url('GET', ['path'], text=body)
|
|
|
|
# now either of the two sessions I use, it should not cause a second
|
|
# request to the discovery url. Calling discovery directly should also
|
|
# not cause an additional request.
|
|
sa = session.Session()
|
|
sb = session.Session()
|
|
auth = self.create_auth_plugin()
|
|
|
|
for sess in (sa, sb):
|
|
resp = sess.get('/path',
|
|
auth=auth,
|
|
endpoint_filter={'service_type': 'compute',
|
|
'interface': 'admin',
|
|
'version': self.version})
|
|
|
|
self.assertEqual(200, resp.status_code)
|
|
self.assertEqual(body, resp.text)
|
|
|
|
def test_discovery_uses_session_plugin_cache(self):
|
|
# register responses such that if the discovery URL is hit more than
|
|
# once then the response will be invalid and not point to COMPUTE_ADMIN
|
|
resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
|
|
self.requests_mock.get(self.TEST_COMPUTE_ADMIN, resps)
|
|
|
|
body = 'SUCCESS'
|
|
self.stub_url('GET', ['path'], text=body)
|
|
|
|
filter = {'service_type': 'compute', 'interface': 'admin',
|
|
'version': self.version}
|
|
|
|
# create a plugin and call the endpoint, causing its cache to be set
|
|
plugin = self.create_auth_plugin()
|
|
session.Session().get('/path', auth=plugin, endpoint_filter=filter)
|
|
self.assertIn(self.TEST_COMPUTE_ADMIN, plugin._discovery_cache.keys())
|
|
|
|
# with the plugin in the session, no more calls to the discovery URL
|
|
sess = session.Session(auth=plugin)
|
|
for auth in (plugin, self.create_auth_plugin()):
|
|
resp = sess.get('/path', auth=auth, endpoint_filter=filter)
|
|
self.assertEqual(200, resp.status_code)
|
|
self.assertEqual(body, resp.text)
|
|
|
|
def test_direct_discovery_provided_plugin_cache(self):
|
|
# register responses such that if the discovery URL is hit more than
|
|
# once then the response will be invalid and not point to COMPUTE_ADMIN
|
|
resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
|
|
self.requests_mock.get(self.TEST_COMPUTE_ADMIN, resps)
|
|
|
|
body = 'SUCCESS'
|
|
self.stub_url('GET', ['path'], text=body)
|
|
|
|
# now either of the two sessions I use, it should not cause a second
|
|
# request to the discovery url. Calling discovery directly should also
|
|
# not cause an additional request.
|
|
sa = session.Session()
|
|
sb = session.Session()
|
|
discovery_cache = {}
|
|
|
|
expected_url = urllib.parse.urljoin(self.TEST_ROOT_URL, '/v2.0')
|
|
for sess in (sa, sb):
|
|
|
|
disc = discover.get_discovery(
|
|
sess, self.TEST_COMPUTE_ADMIN, cache=discovery_cache)
|
|
url = disc.url_for(('2', '0'))
|
|
|
|
self.assertEqual(expected_url, url)
|
|
|
|
self.assertIn(self.TEST_COMPUTE_ADMIN, discovery_cache.keys())
|
|
|
|
def test_discovering_with_no_data(self):
|
|
# which returns discovery information pointing to TEST_URL but there is
|
|
# no data there.
|
|
self.stub_url('GET', [],
|
|
base_url=self.TEST_COMPUTE_ADMIN,
|
|
status_code=400)
|
|
|
|
# so the url that will be used is the same TEST_COMPUTE_ADMIN
|
|
body = 'SUCCESS'
|
|
self.stub_url('GET', ['path'], base_url=self.TEST_COMPUTE_ADMIN,
|
|
text=body, status_code=200)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
resp = s.get('/path', endpoint_filter={'service_type': 'compute',
|
|
'interface': 'admin',
|
|
'version': self.version})
|
|
|
|
self.assertEqual(200, resp.status_code)
|
|
self.assertEqual(body, resp.text)
|
|
|
|
def test_direct_discovering_with_no_data(self):
|
|
# returns discovery information pointing to TEST_URL but there is
|
|
# no data there.
|
|
self.stub_url('GET', [],
|
|
base_url=self.TEST_COMPUTE_ADMIN,
|
|
status_code=400)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
# A direct call for discovery should fail
|
|
self.assertRaises(exceptions.BadRequest,
|
|
discover.get_discovery, s, self.TEST_COMPUTE_ADMIN)
|
|
|
|
def test_discovering_with_relative_link(self):
|
|
# need to construct list this way for relative
|
|
disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
disc.add_v2('v2.0')
|
|
disc.add_v3('v3')
|
|
|
|
self.stub_url('GET', [], base_url=self.TEST_COMPUTE_ADMIN, json=disc)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
endpoint_v2 = s.get_endpoint(service_type='compute',
|
|
interface='admin',
|
|
version=(2, 0))
|
|
|
|
endpoint_v3 = s.get_endpoint(service_type='compute',
|
|
interface='admin',
|
|
version=(3, 0))
|
|
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN + '/v2.0', endpoint_v2)
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN + '/v3', endpoint_v3)
|
|
|
|
def test_direct_discovering(self):
|
|
v2_compute = self.TEST_COMPUTE_ADMIN + '/v2.0'
|
|
v3_compute = self.TEST_COMPUTE_ADMIN + '/v3'
|
|
|
|
disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
disc.add_v2(v2_compute)
|
|
disc.add_v3(v3_compute)
|
|
|
|
self.stub_url('GET', [], base_url=self.TEST_COMPUTE_ADMIN, json=disc)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
catalog_url = s.get_endpoint(
|
|
service_type='compute', interface='admin')
|
|
disc = discover.get_discovery(s, catalog_url)
|
|
|
|
url_v2 = disc.url_for(('2', '0'))
|
|
url_v3 = disc.url_for(('3', '0'))
|
|
|
|
self.assertEqual(v2_compute, url_v2)
|
|
self.assertEqual(v3_compute, url_v3)
|
|
|
|
# Verify that passing strings and not tuples works
|
|
url_v2 = disc.url_for('2.0')
|
|
url_v3 = disc.url_for('3.0')
|
|
|
|
self.assertEqual(v2_compute, url_v2)
|
|
self.assertEqual(v3_compute, url_v3)
|
|
|
|
def test_direct_discovering_with_relative_link(self):
|
|
# need to construct list this way for relative
|
|
disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
disc.add_v2('v2.0')
|
|
disc.add_v3('v3')
|
|
|
|
self.stub_url('GET', [], base_url=self.TEST_COMPUTE_ADMIN, json=disc)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
catalog_url = s.get_endpoint(
|
|
service_type='compute', interface='admin')
|
|
disc = discover.get_discovery(s, catalog_url)
|
|
|
|
url_v2 = disc.url_for(('2', '0'))
|
|
url_v3 = disc.url_for(('3', '0'))
|
|
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN + '/v2.0', url_v2)
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN + '/v3', url_v3)
|
|
|
|
# Verify that passing strings and not tuples works
|
|
url_v2 = disc.url_for('2.0')
|
|
url_v3 = disc.url_for('3.0')
|
|
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN + '/v2.0', url_v2)
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN + '/v3', url_v3)
|
|
|
|
def test_discovering_with_relative_anchored_link(self):
|
|
# need to construct list this way for relative
|
|
disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
disc.add_v2('/v2.0')
|
|
disc.add_v3('/v3')
|
|
|
|
self.stub_url('GET', [], base_url=self.TEST_COMPUTE_ADMIN, json=disc)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
endpoint_v2 = s.get_endpoint(service_type='compute',
|
|
interface='admin',
|
|
version=(2, 0))
|
|
|
|
endpoint_v3 = s.get_endpoint(service_type='compute',
|
|
interface='admin',
|
|
version=(3, 0))
|
|
|
|
# by the nature of urljoin a relative link with a /path gets joined
|
|
# back to the root.
|
|
self.assertEqual(self.TEST_COMPUTE_BASE + '/v2.0', endpoint_v2)
|
|
self.assertEqual(self.TEST_COMPUTE_BASE + '/v3', endpoint_v3)
|
|
|
|
def test_discovering_with_protocol_relative(self):
|
|
# strip up to and including the : leaving //host/path
|
|
path = self.TEST_COMPUTE_ADMIN[self.TEST_COMPUTE_ADMIN.find(':') + 1:]
|
|
|
|
disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
disc.add_v2(path + '/v2.0')
|
|
disc.add_v3(path + '/v3')
|
|
|
|
self.stub_url('GET', [], base_url=self.TEST_COMPUTE_ADMIN, json=disc)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
endpoint_v2 = s.get_endpoint(service_type='compute',
|
|
interface='admin',
|
|
version=(2, 0))
|
|
|
|
endpoint_v3 = s.get_endpoint(service_type='compute',
|
|
interface='admin',
|
|
version=(3, 0))
|
|
|
|
# ensures that the http is carried over from the lookup url
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN + '/v2.0', endpoint_v2)
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN + '/v3', endpoint_v3)
|
|
|
|
def test_discovering_when_version_missing(self):
|
|
# need to construct list this way for relative
|
|
disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
disc.add_v2('v2.0')
|
|
|
|
self.stub_url('GET', [], base_url=self.TEST_COMPUTE_ADMIN, json=disc)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
endpoint_v2 = s.get_endpoint(service_type='compute',
|
|
interface='admin',
|
|
version=(2, 0))
|
|
|
|
endpoint_v3 = s.get_endpoint(service_type='compute',
|
|
interface='admin',
|
|
version=(3, 0))
|
|
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN + '/v2.0', endpoint_v2)
|
|
self.assertIsNone(endpoint_v3)
|
|
|
|
def test_endpoint_data_no_version(self):
|
|
path = self.TEST_COMPUTE_ADMIN[self.TEST_COMPUTE_ADMIN.find(':') + 1:]
|
|
|
|
disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
disc.add_v2(path + '/v2.0')
|
|
disc.add_v3(path + '/v3')
|
|
|
|
self.stub_url('GET', [], base_url=self.TEST_COMPUTE_ADMIN, json=disc)
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
data = a.get_endpoint_data(session=s,
|
|
service_type='compute',
|
|
interface='admin')
|
|
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN + '/v3', data.url)
|
|
|
|
def test_endpoint_data_no_version_no_discovery(self):
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
data = a.get_endpoint_data(session=s,
|
|
service_type='compute',
|
|
interface='admin',
|
|
discover_versions=False)
|
|
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN, data.url)
|
|
|
|
def test_endpoint_no_version(self):
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
data = a.get_endpoint(session=s,
|
|
service_type='compute',
|
|
interface='admin')
|
|
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN, data)
|
|
|
|
def test_endpoint_data_relative_version(self):
|
|
# need to construct list this way for relative
|
|
disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
disc.add_v2('v2.0')
|
|
disc.add_v3('v3')
|
|
|
|
self.stub_url('GET', [], base_url=self.TEST_COMPUTE_ADMIN, json=disc)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
data_v2 = a.get_endpoint_data(session=s,
|
|
service_type='compute',
|
|
interface='admin',
|
|
version=(2, 0))
|
|
data_v3 = a.get_endpoint_data(session=s,
|
|
service_type='compute',
|
|
interface='admin',
|
|
version=(3, 0))
|
|
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN + '/v2.0', data_v2.url)
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN + '/v3', data_v3.url)
|
|
|
|
def test_get_versioned_data(self):
|
|
v2_compute = self.TEST_COMPUTE_ADMIN + '/v2.0'
|
|
v3_compute = self.TEST_COMPUTE_ADMIN + '/v3'
|
|
|
|
disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
disc.add_v2(v2_compute)
|
|
disc.add_v3(v3_compute)
|
|
|
|
# Make sure that we don't do more than one discovery call
|
|
# register responses such that if the discovery URL is hit more than
|
|
# once then the response will be invalid and not point to COMPUTE_ADMIN
|
|
resps = [{'json': disc}, {'status_code': 500}]
|
|
self.requests_mock.get(self.TEST_COMPUTE_ADMIN, resps)
|
|
|
|
body = 'SUCCESS'
|
|
self.stub_url('GET', ['path'], text=body)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
data = a.get_endpoint_data(session=s,
|
|
service_type='compute',
|
|
interface='admin')
|
|
self.assertEqual(v3_compute, data.url)
|
|
|
|
v2_data = data.get_versioned_data(s, version='2.0')
|
|
self.assertEqual(v2_compute, v2_data.url)
|
|
self.assertEqual(v2_compute, v2_data.service_url)
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN, v2_data.catalog_url)
|
|
|
|
v3_data = data.get_versioned_data(s, version='3.0')
|
|
self.assertEqual(v3_compute, v3_data.url)
|
|
self.assertEqual(v3_compute, v3_data.service_url)
|
|
self.assertEqual(self.TEST_COMPUTE_ADMIN, v3_data.catalog_url)
|
|
|
|
def test_interface_list(self):
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
ep = s.get_endpoint(service_type='baremetal',
|
|
interface=['internal', 'public'])
|
|
self.assertEqual(ep, self.TEST_BAREMETAL_INTERNAL)
|
|
|
|
ep = s.get_endpoint(service_type='baremetal',
|
|
interface=['public', 'internal'])
|
|
self.assertEqual(ep, self.TEST_BAREMETAL_INTERNAL)
|
|
|
|
ep = s.get_endpoint(service_type='compute',
|
|
interface=['internal', 'public'])
|
|
self.assertEqual(ep, self.TEST_COMPUTE_INTERNAL)
|
|
|
|
ep = s.get_endpoint(service_type='compute',
|
|
interface=['public', 'internal'])
|
|
self.assertEqual(ep, self.TEST_COMPUTE_PUBLIC)
|
|
|
|
def test_get_versioned_data_volume_project_id(self):
|
|
|
|
disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
|
|
# The version discovery dict will not have a project_id
|
|
disc.add_nova_microversion(
|
|
href=self.TEST_VOLUME.versions['v3'].discovery.public,
|
|
id='v3.0', status='CURRENT',
|
|
min_version='3.0', version='3.20')
|
|
|
|
# Adding a v2 version to a service named volumev3 is not
|
|
# an error. The service itself is cinder and has more than
|
|
# one major version.
|
|
disc.add_nova_microversion(
|
|
href=self.TEST_VOLUME.versions['v2'].discovery.public,
|
|
id='v2.0', status='SUPPORTED')
|
|
|
|
body = 'SUCCESS'
|
|
self.stub_url('GET', ['path'], text=body)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
# volume endpoint ends in v3, we should not make an API call
|
|
endpoint = a.get_endpoint(session=s,
|
|
service_type='volumev3',
|
|
interface='public',
|
|
version='3.0')
|
|
self.assertEqual(self.TEST_VOLUME.catalog.public, endpoint)
|
|
|
|
resps = [{'json': disc}, {'status_code': 500}]
|
|
|
|
# We should only try to fetch the versioned discovery url once
|
|
self.requests_mock.get(
|
|
self.TEST_VOLUME.versions['v3'].discovery.public, resps)
|
|
|
|
data = a.get_endpoint_data(session=s,
|
|
service_type='volumev3',
|
|
interface='public')
|
|
self.assertEqual(self.TEST_VOLUME.versions['v3'].service.public,
|
|
data.url)
|
|
|
|
v3_data = data.get_versioned_data(
|
|
s, version='3.0', project_id=self.project_id)
|
|
|
|
self.assertEqual(self.TEST_VOLUME.versions['v3'].service.public,
|
|
v3_data.url)
|
|
self.assertEqual(self.TEST_VOLUME.catalog.public, v3_data.catalog_url)
|
|
self.assertEqual((3, 0), v3_data.min_microversion)
|
|
self.assertEqual((3, 20), v3_data.max_microversion)
|
|
self.assertEqual(self.TEST_VOLUME.versions['v3'].service.public,
|
|
v3_data.service_url)
|
|
|
|
# Because of the v3 optimization before, requesting v2 should now go
|
|
# find the unversioned endpoint
|
|
self.requests_mock.get(self.TEST_VOLUME.unversioned.public, resps)
|
|
v2_data = data.get_versioned_data(
|
|
s, version='2.0', project_id=self.project_id)
|
|
|
|
# Even though we never requested volumev2 from the catalog, we should
|
|
# wind up re-constructing it via version discovery and re-appending
|
|
# the project_id to the URL
|
|
self.assertEqual(self.TEST_VOLUME.versions['v2'].service.public,
|
|
v2_data.url)
|
|
self.assertEqual(self.TEST_VOLUME.versions['v2'].service.public,
|
|
v2_data.service_url)
|
|
self.assertEqual(self.TEST_VOLUME.catalog.public, v2_data.catalog_url)
|
|
self.assertEqual(None, v2_data.min_microversion)
|
|
self.assertEqual(None, v2_data.max_microversion)
|
|
|
|
def test_get_versioned_data_volume_project_id_unversioned_first(self):
|
|
|
|
disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
|
|
# The version discovery dict will not have a project_id
|
|
disc.add_nova_microversion(
|
|
href=self.TEST_VOLUME.versions['v3'].discovery.public,
|
|
id='v3.0', status='CURRENT',
|
|
min_version='3.0', version='3.20')
|
|
|
|
# Adding a v2 version to a service named volumev3 is not
|
|
# an error. The service itself is cinder and has more than
|
|
# one major version.
|
|
disc.add_nova_microversion(
|
|
href=self.TEST_VOLUME.versions['v2'].discovery.public,
|
|
id='v2.0', status='SUPPORTED')
|
|
|
|
body = 'SUCCESS'
|
|
self.stub_url('GET', ['path'], text=body)
|
|
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
# cinder endpoint ends in v3, we should not make an API call
|
|
endpoint = a.get_endpoint(session=s,
|
|
service_type='volumev3',
|
|
interface='public',
|
|
version='3.0')
|
|
self.assertEqual(self.TEST_VOLUME.catalog.public, endpoint)
|
|
|
|
resps = [{'json': disc}, {'status_code': 500}]
|
|
|
|
# We should only try to fetch the unversioned non-project_id url once
|
|
self.requests_mock.get(self.TEST_VOLUME.unversioned.public, resps)
|
|
|
|
# Fetch v2.0 first - since that doesn't match endpoint optimization,
|
|
# it should fetch the unversioned endpoint
|
|
v2_data = s.get_endpoint_data(service_type='volumev3',
|
|
interface='public',
|
|
version='2.0',
|
|
project_id=self.project_id)
|
|
|
|
# Even though we never requested volumev2 from the catalog, we should
|
|
# wind up re-constructing it via version discovery and re-appending
|
|
# the project_id to the URL
|
|
self.assertEqual(self.TEST_VOLUME.versions['v2'].service.public,
|
|
v2_data.url)
|
|
self.assertEqual(self.TEST_VOLUME.versions['v2'].service.public,
|
|
v2_data.service_url)
|
|
self.assertEqual(self.TEST_VOLUME.catalog.public, v2_data.catalog_url)
|
|
self.assertEqual(None, v2_data.min_microversion)
|
|
self.assertEqual(None, v2_data.max_microversion)
|
|
|
|
# Since we fetched from the unversioned endpoint to satisfy the
|
|
# request for v2, we should have all the relevant data cached in the
|
|
# discovery object - and should not fetch anything new.
|
|
v3_data = v2_data.get_versioned_data(
|
|
s, version='3.0', project_id=self.project_id)
|
|
|
|
self.assertEqual(self.TEST_VOLUME.versions['v3'].service.public,
|
|
v3_data.url)
|
|
self.assertEqual(self.TEST_VOLUME.catalog.public, v3_data.catalog_url)
|
|
self.assertEqual((3, 0), v3_data.min_microversion)
|
|
self.assertEqual((3, 20), v3_data.max_microversion)
|
|
self.assertEqual(self.TEST_VOLUME.versions['v3'].service.public,
|
|
v3_data.service_url)
|
|
|
|
def test_asking_for_auth_endpoint_ignores_checks(self):
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
auth_url = s.get_endpoint(service_type='compute',
|
|
interface=plugin.AUTH_INTERFACE)
|
|
|
|
self.assertEqual(self.TEST_URL, auth_url)
|
|
|
|
def _create_expired_auth_plugin(self, **kwargs):
|
|
expires = _utils.before_utcnow(minutes=20)
|
|
expired_token = self.get_auth_data(expires=expires)
|
|
expired_auth_ref = access.create(body=expired_token)
|
|
|
|
body = 'SUCCESS'
|
|
self.stub_url('GET', ['path'],
|
|
base_url=self.TEST_COMPUTE_ADMIN, text=body)
|
|
|
|
a = self.create_auth_plugin(**kwargs)
|
|
a.auth_ref = expired_auth_ref
|
|
return a
|
|
|
|
def test_reauthenticate(self):
|
|
a = self._create_expired_auth_plugin()
|
|
expired_auth_ref = a.auth_ref
|
|
s = session.Session(auth=a)
|
|
self.assertIsNot(expired_auth_ref, a.get_access(s))
|
|
|
|
def test_no_reauthenticate(self):
|
|
a = self._create_expired_auth_plugin(reauthenticate=False)
|
|
expired_auth_ref = a.auth_ref
|
|
s = session.Session(auth=a)
|
|
self.assertIs(expired_auth_ref, a.get_access(s))
|
|
|
|
def test_invalidate(self):
|
|
a = self.create_auth_plugin()
|
|
s = session.Session(auth=a)
|
|
|
|
# trigger token fetching
|
|
s.get_auth_headers()
|
|
|
|
self.assertTrue(a.auth_ref)
|
|
self.assertTrue(a.invalidate())
|
|
self.assertIsNone(a.auth_ref)
|
|
self.assertFalse(a.invalidate())
|
|
|
|
def test_get_auth_properties(self):
|
|
a = self.create_auth_plugin()
|
|
s = session.Session()
|
|
|
|
self.assertEqual(self.user_id, a.get_user_id(s))
|
|
self.assertEqual(self.project_id, a.get_project_id(s))
|
|
|
|
def assertAccessInfoEqual(self, a, b):
|
|
self.assertEqual(a.auth_token, b.auth_token)
|
|
self.assertEqual(a._data, b._data)
|
|
|
|
def test_check_cache_id_match(self):
|
|
a = self.create_auth_plugin()
|
|
b = self.create_auth_plugin()
|
|
|
|
self.assertIsNot(a, b)
|
|
self.assertIsNone(a.get_auth_state())
|
|
self.assertIsNone(b.get_auth_state())
|
|
|
|
a_id = a.get_cache_id()
|
|
b_id = b.get_cache_id()
|
|
|
|
self.assertIsNotNone(a_id)
|
|
self.assertIsNotNone(b_id)
|
|
|
|
self.assertEqual(a_id, b_id)
|
|
|
|
def test_check_cache_id_no_match(self):
|
|
a = self.create_auth_plugin(project_id='a')
|
|
b = self.create_auth_plugin(project_id='b')
|
|
|
|
self.assertIsNot(a, b)
|
|
self.assertIsNone(a.get_auth_state())
|
|
self.assertIsNone(b.get_auth_state())
|
|
|
|
a_id = a.get_cache_id()
|
|
b_id = b.get_cache_id()
|
|
|
|
self.assertIsNotNone(a_id)
|
|
self.assertIsNotNone(b_id)
|
|
|
|
self.assertNotEqual(a_id, b_id)
|
|
|
|
def test_get_set_auth_state(self):
|
|
a = self.create_auth_plugin()
|
|
b = self.create_auth_plugin()
|
|
|
|
self.assertEqual(a.get_cache_id(), b.get_cache_id())
|
|
|
|
s = session.Session()
|
|
|
|
a_token = a.get_token(s)
|
|
|
|
self.assertEqual(1, self.requests_mock.call_count)
|
|
|
|
auth_state = a.get_auth_state()
|
|
|
|
self.assertIsNotNone(auth_state)
|
|
|
|
b.set_auth_state(auth_state)
|
|
|
|
b_token = b.get_token(s)
|
|
self.assertEqual(1, self.requests_mock.call_count)
|
|
|
|
self.assertEqual(a_token, b_token)
|
|
self.assertAccessInfoEqual(a.auth_ref, b.auth_ref)
|
|
|
|
|
|
class V3(CommonIdentityTests, utils.TestCase):
|
|
|
|
@property
|
|
def version(self):
|
|
return 'v3'
|
|
|
|
def get_auth_data(self, **kwargs):
|
|
kwargs.setdefault('project_id', self.PROJECT_ID)
|
|
token = fixture.V3Token(**kwargs)
|
|
region = 'RegionOne'
|
|
|
|
svc = token.add_service('identity')
|
|
svc.add_standard_endpoints(admin=self.TEST_ADMIN_URL, region=region)
|
|
|
|
svc = token.add_service('compute')
|
|
svc.add_standard_endpoints(admin=self.TEST_COMPUTE_ADMIN,
|
|
public=self.TEST_COMPUTE_PUBLIC,
|
|
internal=self.TEST_COMPUTE_INTERNAL,
|
|
region=region)
|
|
|
|
svc = token.add_service('volumev2')
|
|
svc.add_standard_endpoints(
|
|
admin=self.TEST_VOLUME.versions['v2'].service.admin,
|
|
public=self.TEST_VOLUME.versions['v2'].service.public,
|
|
internal=self.TEST_VOLUME.versions['v2'].service.internal,
|
|
region=region)
|
|
|
|
svc = token.add_service('volumev3')
|
|
svc.add_standard_endpoints(
|
|
admin=self.TEST_VOLUME.versions['v3'].service.admin,
|
|
public=self.TEST_VOLUME.versions['v3'].service.public,
|
|
internal=self.TEST_VOLUME.versions['v3'].service.internal,
|
|
region=region)
|
|
|
|
svc = token.add_service('baremetal')
|
|
svc.add_standard_endpoints(
|
|
internal=self.TEST_BAREMETAL_INTERNAL,
|
|
region=region)
|
|
|
|
return token
|
|
|
|
def stub_auth(self, subject_token=None, **kwargs):
|
|
if not subject_token:
|
|
subject_token = self.TEST_TOKEN
|
|
|
|
kwargs.setdefault('headers', {})['X-Subject-Token'] = subject_token
|
|
self.stub_url('POST', ['auth', 'tokens'], **kwargs)
|
|
|
|
def create_auth_plugin(self, **kwargs):
|
|
kwargs.setdefault('auth_url', self.TEST_URL)
|
|
kwargs.setdefault('username', self.TEST_USER)
|
|
kwargs.setdefault('password', self.TEST_PASS)
|
|
return identity.V3Password(**kwargs)
|
|
|
|
|
|
class V2(CommonIdentityTests, utils.TestCase):
|
|
|
|
@property
|
|
def version(self):
|
|
return 'v2.0'
|
|
|
|
def create_auth_plugin(self, **kwargs):
|
|
kwargs.setdefault('auth_url', self.TEST_URL)
|
|
kwargs.setdefault('username', self.TEST_USER)
|
|
kwargs.setdefault('password', self.TEST_PASS)
|
|
|
|
try:
|
|
kwargs.setdefault('tenant_id', kwargs.pop('project_id'))
|
|
except KeyError:
|
|
pass
|
|
|
|
try:
|
|
kwargs.setdefault('tenant_name', kwargs.pop('project_name'))
|
|
except KeyError:
|
|
pass
|
|
|
|
return identity.V2Password(**kwargs)
|
|
|
|
def get_auth_data(self, **kwargs):
|
|
kwargs.setdefault('tenant_id', self.PROJECT_ID)
|
|
token = fixture.V2Token(**kwargs)
|
|
region = 'RegionOne'
|
|
|
|
svc = token.add_service('identity')
|
|
svc.add_endpoint(self.TEST_ADMIN_URL, region=region)
|
|
|
|
svc = token.add_service('compute')
|
|
svc.add_endpoint(public=self.TEST_COMPUTE_PUBLIC,
|
|
internal=self.TEST_COMPUTE_INTERNAL,
|
|
admin=self.TEST_COMPUTE_ADMIN,
|
|
region=region)
|
|
|
|
svc = token.add_service('volumev2')
|
|
svc.add_endpoint(
|
|
admin=self.TEST_VOLUME.versions['v2'].service.admin,
|
|
public=self.TEST_VOLUME.versions['v2'].service.public,
|
|
internal=self.TEST_VOLUME.versions['v2'].service.internal,
|
|
region=region)
|
|
|
|
svc = token.add_service('volumev3')
|
|
svc.add_endpoint(
|
|
admin=self.TEST_VOLUME.versions['v3'].service.admin,
|
|
public=self.TEST_VOLUME.versions['v3'].service.public,
|
|
internal=self.TEST_VOLUME.versions['v3'].service.internal,
|
|
region=region)
|
|
|
|
svc = token.add_service('baremetal')
|
|
svc.add_endpoint(
|
|
public=None, admin=None,
|
|
internal=self.TEST_BAREMETAL_INTERNAL,
|
|
region=region)
|
|
|
|
return token
|
|
|
|
def stub_auth(self, **kwargs):
|
|
self.stub_url('POST', ['tokens'], **kwargs)
|
|
|
|
|
|
class CatalogHackTests(utils.TestCase):
|
|
|
|
TEST_URL = 'http://keystone.server:5000/v2.0'
|
|
OTHER_URL = 'http://other.server:5000/path'
|
|
|
|
IDENTITY = 'identity'
|
|
|
|
BASE_URL = 'http://keystone.server:5000/'
|
|
V2_URL = BASE_URL + 'v2.0'
|
|
V3_URL = BASE_URL + 'v3'
|
|
|
|
def test_getting_endpoints(self):
|
|
disc = fixture.DiscoveryList(href=self.BASE_URL)
|
|
self.stub_url('GET',
|
|
['/'],
|
|
base_url=self.BASE_URL,
|
|
json=disc)
|
|
|
|
token = fixture.V2Token()
|
|
service = token.add_service(self.IDENTITY)
|
|
service.add_endpoint(public=self.V2_URL,
|
|
admin=self.V2_URL,
|
|
internal=self.V2_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
interface='public',
|
|
version=(3, 0))
|
|
|
|
self.assertEqual(self.V3_URL, endpoint)
|
|
|
|
def test_returns_original_when_discover_fails(self):
|
|
token = fixture.V2Token()
|
|
service = token.add_service(self.IDENTITY)
|
|
service.add_endpoint(public=self.V2_URL,
|
|
admin=self.V2_URL,
|
|
internal=self.V2_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
self.stub_url('GET', [], base_url=self.BASE_URL, status_code=404)
|
|
self.stub_url('GET', [], base_url=self.V2_URL, status_code=404)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
interface='public',
|
|
version=(3, 0))
|
|
|
|
self.assertEqual(self.V2_URL, endpoint)
|
|
|
|
def test_returns_original_skipping_discovery(self):
|
|
token = fixture.V2Token()
|
|
service = token.add_service(self.IDENTITY)
|
|
service.add_endpoint(public=self.V2_URL,
|
|
admin=self.V2_URL,
|
|
internal=self.V2_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
interface='public',
|
|
skip_discovery=True,
|
|
version=(3, 0))
|
|
|
|
self.assertEqual(self.V2_URL, endpoint)
|
|
|
|
def test_endpoint_override_skips_discovery(self):
|
|
token = fixture.V2Token()
|
|
service = token.add_service(self.IDENTITY)
|
|
service.add_endpoint(public=self.V2_URL,
|
|
admin=self.V2_URL,
|
|
internal=self.V2_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
endpoint = sess.get_endpoint(endpoint_override=self.OTHER_URL,
|
|
service_type=self.IDENTITY,
|
|
interface='public',
|
|
version=(3, 0))
|
|
|
|
self.assertEqual(self.OTHER_URL, endpoint)
|
|
|
|
def test_endpoint_override_data_runs_discovery(self):
|
|
common_disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
common_disc.add_microversion(href=self.OTHER_URL, id='v2.1',
|
|
min_version='2.1', max_version='2.35')
|
|
|
|
common_m = self.stub_url('GET',
|
|
base_url=self.OTHER_URL,
|
|
status_code=200,
|
|
json=common_disc)
|
|
|
|
token = fixture.V2Token()
|
|
service = token.add_service(self.IDENTITY)
|
|
service.add_endpoint(public=self.V2_URL,
|
|
admin=self.V2_URL,
|
|
internal=self.V2_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
data = sess.get_endpoint_data(endpoint_override=self.OTHER_URL,
|
|
service_type=self.IDENTITY,
|
|
interface='public',
|
|
version=(2, 0))
|
|
|
|
self.assertTrue(common_m.called)
|
|
self.assertEqual(self.OTHER_URL, data.url)
|
|
self.assertEqual((2, 1), data.min_microversion)
|
|
self.assertEqual((2, 35), data.max_microversion)
|
|
|
|
def test_forcing_discovery(self):
|
|
v2_disc = fixture.V2Discovery(self.V2_URL)
|
|
common_disc = fixture.DiscoveryList(href=self.BASE_URL)
|
|
|
|
v2_m = self.stub_url('GET',
|
|
['v2.0'],
|
|
base_url=self.BASE_URL,
|
|
status_code=200,
|
|
json={'version': v2_disc})
|
|
|
|
common_m = self.stub_url('GET',
|
|
[],
|
|
base_url=self.BASE_URL,
|
|
status_code=300,
|
|
json=common_disc)
|
|
|
|
token = fixture.V2Token()
|
|
service = token.add_service(self.IDENTITY)
|
|
service.add_endpoint(public=self.V2_URL,
|
|
admin=self.V2_URL,
|
|
internal=self.V2_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
# v2 auth with v2 url doesn't make any discovery calls.
|
|
self.assertFalse(v2_m.called)
|
|
self.assertFalse(common_m.called)
|
|
|
|
data = sess.get_endpoint_data(service_type=self.IDENTITY,
|
|
discover_versions=True)
|
|
|
|
# We should get the v2 document, but not the unversioned
|
|
self.assertTrue(v2_m.called)
|
|
self.assertFalse(common_m.called)
|
|
|
|
# got v2 url
|
|
self.assertEqual(self.V2_URL, data.url)
|
|
|
|
def test_forcing_discovery_list_returns_url(self):
|
|
common_disc = fixture.DiscoveryList(href=self.BASE_URL)
|
|
|
|
# 2.0 doesn't usually return a list. This is testing that if
|
|
# the catalog url returns an endpoint that has a discovery document
|
|
# with more than one URL and that a different url would be returned
|
|
# by "return the latest" rules, that we get the info of the url from
|
|
# the catalog if we don't provide a version but do provide
|
|
# discover_versions
|
|
v2_m = self.stub_url('GET',
|
|
['v2.0'],
|
|
base_url=self.BASE_URL,
|
|
status_code=200,
|
|
json=common_disc)
|
|
|
|
token = fixture.V2Token()
|
|
service = token.add_service(self.IDENTITY)
|
|
service.add_endpoint(public=self.V2_URL,
|
|
admin=self.V2_URL,
|
|
internal=self.V2_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
# v2 auth with v2 url doesn't make any discovery calls.
|
|
self.assertFalse(v2_m.called)
|
|
|
|
data = sess.get_endpoint_data(service_type=self.IDENTITY,
|
|
discover_versions=True)
|
|
|
|
# We should make the one call
|
|
self.assertTrue(v2_m.called)
|
|
|
|
# got v2 url
|
|
self.assertEqual(self.V2_URL, data.url)
|
|
|
|
def test_latest_version_gets_latest_version(self):
|
|
common_disc = fixture.DiscoveryList(href=self.BASE_URL)
|
|
|
|
# 2.0 doesn't usually return a list. But we're testing version matching
|
|
# rules, so it's nice to ensure that we don't fallback to something
|
|
v2_m = self.stub_url('GET',
|
|
base_url=self.BASE_URL,
|
|
status_code=200,
|
|
json=common_disc)
|
|
|
|
token = fixture.V2Token()
|
|
service = token.add_service(self.IDENTITY)
|
|
service.add_endpoint(public=self.V2_URL,
|
|
admin=self.V2_URL,
|
|
internal=self.V2_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
# v2 auth with v2 url doesn't make any discovery calls.
|
|
self.assertFalse(v2_m.called)
|
|
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
version='latest')
|
|
|
|
# We should make the one call
|
|
self.assertTrue(v2_m.called)
|
|
|
|
# And get the v3 url
|
|
self.assertEqual(self.V3_URL, endpoint)
|
|
|
|
# Make sure latest logic works for min and max version
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
max_version='latest')
|
|
self.assertEqual(self.V3_URL, endpoint)
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
min_version='latest')
|
|
self.assertEqual(self.V3_URL, endpoint)
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
min_version='latest',
|
|
max_version='latest')
|
|
self.assertEqual(self.V3_URL, endpoint)
|
|
|
|
self.assertRaises(TypeError, sess.get_endpoint,
|
|
service_type=self.IDENTITY,
|
|
min_version='latest', max_version='3.0')
|
|
|
|
def test_version_range(self):
|
|
v2_disc = fixture.V2Discovery(self.V2_URL)
|
|
common_disc = fixture.DiscoveryList(href=self.BASE_URL)
|
|
|
|
def stub_urls():
|
|
v2_m = self.stub_url('GET',
|
|
['v2.0'],
|
|
base_url=self.BASE_URL,
|
|
status_code=200,
|
|
json={'version': v2_disc})
|
|
common_m = self.stub_url('GET',
|
|
base_url=self.BASE_URL,
|
|
status_code=200,
|
|
json=common_disc)
|
|
return v2_m, common_m
|
|
v2_m, common_m = stub_urls()
|
|
|
|
token = fixture.V2Token()
|
|
service = token.add_service(self.IDENTITY)
|
|
service.add_endpoint(public=self.V2_URL,
|
|
admin=self.V2_URL,
|
|
internal=self.V2_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
# v2 auth with v2 url doesn't make any discovery calls.
|
|
self.assertFalse(v2_m.called)
|
|
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
min_version='2.0', max_version='3.0')
|
|
|
|
# We should make the one call
|
|
self.assertFalse(v2_m.called)
|
|
self.assertTrue(common_m.called)
|
|
|
|
# And get the v3 url
|
|
self.assertEqual(self.V3_URL, endpoint)
|
|
|
|
v2_m, common_m = stub_urls()
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
min_version='1', max_version='2')
|
|
|
|
# We should make one more calls
|
|
# TODO(mordred) optimize this - we can peek in the cache
|
|
self.assertTrue(v2_m.called)
|
|
self.assertFalse(common_m.called)
|
|
|
|
# And get the v2 url
|
|
self.assertEqual(self.V2_URL, endpoint)
|
|
|
|
v2_m, common_m = stub_urls()
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
min_version='4')
|
|
|
|
# We should make no more calls
|
|
self.assertFalse(v2_m.called)
|
|
self.assertFalse(common_m.called)
|
|
|
|
# And get no url
|
|
self.assertEqual(None, endpoint)
|
|
|
|
v2_m, common_m = stub_urls()
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
min_version='2')
|
|
|
|
# We should make no more calls
|
|
self.assertFalse(v2_m.called)
|
|
self.assertFalse(common_m.called)
|
|
|
|
# And get the v3 url
|
|
self.assertEqual(self.V3_URL, endpoint)
|
|
|
|
v2_m, common_m = stub_urls()
|
|
self.assertRaises(TypeError, sess.get_endpoint,
|
|
service_type=self.IDENTITY, version=3,
|
|
min_version='2')
|
|
|
|
# We should make no more calls
|
|
self.assertFalse(v2_m.called)
|
|
self.assertFalse(common_m.called)
|
|
|
|
def test_get_endpoint_data(self):
|
|
common_disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
common_disc.add_microversion(href=self.OTHER_URL, id='v2.1',
|
|
min_version='2.1', max_version='2.35')
|
|
|
|
common_m = self.stub_url('GET',
|
|
base_url=self.OTHER_URL,
|
|
status_code=200,
|
|
json=common_disc)
|
|
|
|
token = fixture.V2Token()
|
|
service = token.add_service('network')
|
|
service.add_endpoint(public=self.OTHER_URL,
|
|
admin=self.OTHER_URL,
|
|
internal=self.OTHER_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
# v2 auth with v2 url doesn't make any discovery calls.
|
|
self.assertFalse(common_m.called)
|
|
|
|
data = sess.get_endpoint_data(service_type='network',
|
|
min_version='2.0', max_version='3.0')
|
|
|
|
# We should make the one call
|
|
self.assertTrue(common_m.called)
|
|
|
|
# And get the v3 url
|
|
self.assertEqual(self.OTHER_URL, data.url)
|
|
self.assertEqual((2, 1), data.min_microversion)
|
|
self.assertEqual((2, 35), data.max_microversion)
|
|
|
|
def test_get_endpoint_data_compute(self):
|
|
common_disc = fixture.DiscoveryList(v2=False, v3=False)
|
|
common_disc.add_nova_microversion(href=self.OTHER_URL, id='v2.1',
|
|
min_version='2.1', version='2.35')
|
|
|
|
common_m = self.stub_url('GET',
|
|
base_url=self.OTHER_URL,
|
|
status_code=200,
|
|
json=common_disc)
|
|
|
|
token = fixture.V2Token()
|
|
service = token.add_service('compute')
|
|
service.add_endpoint(public=self.OTHER_URL,
|
|
admin=self.OTHER_URL,
|
|
internal=self.OTHER_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
# v2 auth with v2 url doesn't make any discovery calls.
|
|
self.assertFalse(common_m.called)
|
|
|
|
data = sess.get_endpoint_data(service_type='compute',
|
|
min_version='2.0', max_version='3.0')
|
|
|
|
# We should make the one call
|
|
self.assertTrue(common_m.called)
|
|
|
|
# And get the v3 url
|
|
self.assertEqual(self.OTHER_URL, data.url)
|
|
self.assertEqual((2, 1), data.min_microversion)
|
|
self.assertEqual((2, 35), data.max_microversion)
|
|
|
|
def test_getting_endpoints_on_auth_interface(self):
|
|
disc = fixture.DiscoveryList(href=self.BASE_URL)
|
|
self.stub_url('GET',
|
|
['/'],
|
|
base_url=self.BASE_URL,
|
|
status_code=300,
|
|
json=disc)
|
|
|
|
token = fixture.V2Token()
|
|
service = token.add_service(self.IDENTITY)
|
|
service.add_endpoint(public=self.V2_URL,
|
|
admin=self.V2_URL,
|
|
internal=self.V2_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
endpoint = sess.get_endpoint(interface=plugin.AUTH_INTERFACE,
|
|
version=(3, 0))
|
|
|
|
self.assertEqual(self.V3_URL, endpoint)
|
|
|
|
def test_setting_no_discover_hack(self):
|
|
v2_disc = fixture.V2Discovery(self.V2_URL)
|
|
common_disc = fixture.DiscoveryList(href=self.BASE_URL)
|
|
|
|
v2_m = self.stub_url('GET',
|
|
['v2.0'],
|
|
base_url=self.BASE_URL,
|
|
status_code=200,
|
|
json=v2_disc)
|
|
|
|
common_m = self.stub_url('GET',
|
|
[],
|
|
base_url=self.BASE_URL,
|
|
status_code=300,
|
|
json=common_disc)
|
|
|
|
resp_text = uuid.uuid4().hex
|
|
|
|
resp_m = self.stub_url('GET',
|
|
['v3', 'path'],
|
|
base_url=self.BASE_URL,
|
|
status_code=200,
|
|
text=resp_text)
|
|
|
|
# it doesn't matter that we auth with v2 here, discovery hack is in
|
|
# base. All identity endpoints point to v2 urls.
|
|
token = fixture.V2Token()
|
|
service = token.add_service(self.IDENTITY)
|
|
service.add_endpoint(public=self.V2_URL,
|
|
admin=self.V2_URL,
|
|
internal=self.V2_URL)
|
|
|
|
self.stub_url('POST',
|
|
['tokens'],
|
|
base_url=self.V2_URL,
|
|
json=token)
|
|
|
|
v2_auth = identity.V2Password(self.V2_URL,
|
|
username=uuid.uuid4().hex,
|
|
password=uuid.uuid4().hex)
|
|
|
|
sess = session.Session(auth=v2_auth)
|
|
|
|
# v2 auth with v2 url doesn't make any discovery calls.
|
|
self.assertFalse(v2_m.called)
|
|
self.assertFalse(common_m.called)
|
|
|
|
# v3 endpoint with hack will strip v2 suffix and call root discovery
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
version=(3, 0),
|
|
allow_version_hack=True)
|
|
|
|
# got v3 url
|
|
self.assertEqual(self.V3_URL, endpoint)
|
|
|
|
# only called root discovery.
|
|
self.assertFalse(v2_m.called)
|
|
self.assertTrue(common_m.called_once)
|
|
|
|
# with hack turned off it calls v2 discovery and finds nothing
|
|
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
|
|
version=(3, 0),
|
|
allow_version_hack=False)
|
|
self.assertIsNone(endpoint)
|
|
|
|
# this one called v2
|
|
self.assertTrue(v2_m.called_once)
|
|
self.assertTrue(common_m.called_once)
|
|
|
|
# get_endpoint returning None raises EndpointNotFound when requesting
|
|
self.assertRaises(exceptions.EndpointNotFound,
|
|
sess.get,
|
|
'/path',
|
|
endpoint_filter={'service_type': 'identity',
|
|
'version': (3, 0),
|
|
'allow_version_hack': False})
|
|
|
|
self.assertFalse(resp_m.called)
|
|
|
|
# works when allow_version_hack is set
|
|
resp = sess.get('/path',
|
|
endpoint_filter={'service_type': 'identity',
|
|
'version': (3, 0),
|
|
'allow_version_hack': True})
|
|
|
|
self.assertTrue(resp_m.called_once)
|
|
self.assertEqual(resp_text, resp.text)
|
|
|
|
|
|
class GenericPlugin(plugin.BaseAuthPlugin):
|
|
|
|
BAD_TOKEN = uuid.uuid4().hex
|
|
|
|
def __init__(self):
|
|
super(GenericPlugin, self).__init__()
|
|
|
|
self.endpoint = 'http://keystone.host:5000'
|
|
|
|
self.headers = {'headerA': 'valueA',
|
|
'headerB': 'valueB'}
|
|
|
|
self.cert = '/path/to/cert'
|
|
self.connection_params = {'cert': self.cert, 'verify': False}
|
|
|
|
def url(self, prefix):
|
|
return '%s/%s' % (self.endpoint, prefix)
|
|
|
|
def get_token(self, session, **kwargs):
|
|
# NOTE(jamielennox): by specifying get_headers this should not be used
|
|
return self.BAD_TOKEN
|
|
|
|
def get_headers(self, session, **kwargs):
|
|
return self.headers
|
|
|
|
def get_endpoint(self, session, **kwargs):
|
|
return self.endpoint
|
|
|
|
def get_connection_params(self, session, **kwargs):
|
|
return self.connection_params
|
|
|
|
|
|
class GenericAuthPluginTests(utils.TestCase):
|
|
|
|
# filter doesn't matter to GenericPlugin, but we have to specify one
|
|
ENDPOINT_FILTER = {uuid.uuid4().hex: uuid.uuid4().hex}
|
|
|
|
def setUp(self):
|
|
super(GenericAuthPluginTests, self).setUp()
|
|
self.auth = GenericPlugin()
|
|
self.session = session.Session(auth=self.auth)
|
|
|
|
def test_setting_headers(self):
|
|
text = uuid.uuid4().hex
|
|
self.stub_url('GET', base_url=self.auth.url('prefix'), text=text)
|
|
|
|
resp = self.session.get('prefix', endpoint_filter=self.ENDPOINT_FILTER)
|
|
|
|
self.assertEqual(text, resp.text)
|
|
|
|
for k, v in self.auth.headers.items():
|
|
self.assertRequestHeaderEqual(k, v)
|
|
|
|
self.assertIsNone(self.session.get_token())
|
|
self.assertEqual(self.auth.headers,
|
|
self.session.get_auth_headers())
|
|
self.assertNotIn('X-Auth-Token',
|
|
self.requests_mock.last_request.headers)
|
|
|
|
def test_setting_connection_params(self):
|
|
text = uuid.uuid4().hex
|
|
self.stub_url('GET', base_url=self.auth.url('prefix'), text=text)
|
|
|
|
resp = self.session.get('prefix',
|
|
endpoint_filter=self.ENDPOINT_FILTER)
|
|
|
|
self.assertEqual(text, resp.text)
|
|
|
|
# the cert and verify values passed to request are those that were
|
|
# returned from the auth plugin as connection params.
|
|
self.assertEqual(self.auth.cert, self.requests_mock.last_request.cert)
|
|
self.assertFalse(self.requests_mock.last_request.verify)
|
|
|
|
def test_setting_bad_connection_params(self):
|
|
# The uuid name parameter here is unknown and not in the allowed params
|
|
# to be returned to the session and so an error will be raised.
|
|
name = uuid.uuid4().hex
|
|
self.auth.connection_params[name] = uuid.uuid4().hex
|
|
|
|
e = self.assertRaises(exceptions.UnsupportedParameters,
|
|
self.session.get,
|
|
'prefix',
|
|
endpoint_filter=self.ENDPOINT_FILTER)
|
|
|
|
self.assertIn(name, str(e))
|