
This module mockpatch of oslotest[1] is deprecated since version 1.13 and may be removed in version 2.0. Use fixtures.Mock* classes instead[2] [1]OpenStack Testing Framework and Utilities [2]https://docs.openstack.org/developer/oslotest/api/oslotest.mockpatch.html#module-oslotest.mockpatch Change-Id: Ia0ae1fd162b048e431d202633244c14706436e31
916 lines
34 KiB
Python
916 lines
34 KiB
Python
# Copyright 2014 IBM Corp.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import copy
|
|
import datetime
|
|
|
|
import fixtures
|
|
import testtools
|
|
|
|
from tempest.lib import auth
|
|
from tempest.lib import exceptions
|
|
from tempest.lib.services.identity.v2 import token_client as v2_client
|
|
from tempest.lib.services.identity.v3 import token_client as v3_client
|
|
from tempest.tests import base
|
|
from tempest.tests.lib import fake_credentials
|
|
from tempest.tests.lib import fake_identity
|
|
|
|
|
|
def fake_get_credentials(fill_in=True, identity_version='v2', **kwargs):
|
|
return fake_credentials.FakeCredentials()
|
|
|
|
|
|
class BaseAuthTestsSetUp(base.TestCase):
|
|
_auth_provider_class = None
|
|
credentials = fake_credentials.FakeCredentials()
|
|
|
|
def _auth(self, credentials, auth_url, **params):
|
|
"""returns auth method according to keystone"""
|
|
return self._auth_provider_class(credentials, auth_url, **params)
|
|
|
|
def setUp(self):
|
|
super(BaseAuthTestsSetUp, self).setUp()
|
|
self.patchobject(auth, 'get_credentials', fake_get_credentials)
|
|
self.auth_provider = self._auth(self.credentials,
|
|
fake_identity.FAKE_AUTH_URL)
|
|
|
|
|
|
class TestBaseAuthProvider(BaseAuthTestsSetUp):
|
|
"""Tests for base AuthProvider
|
|
|
|
This tests auth.AuthProvider class which is base for the other so we
|
|
obviously don't test not implemented method or the ones which strongly
|
|
depends on them.
|
|
"""
|
|
|
|
class FakeAuthProviderImpl(auth.AuthProvider):
|
|
def _decorate_request(self):
|
|
pass
|
|
|
|
def _fill_credentials(self):
|
|
pass
|
|
|
|
def _get_auth(self):
|
|
pass
|
|
|
|
def base_url(self):
|
|
pass
|
|
|
|
def is_expired(self):
|
|
pass
|
|
|
|
_auth_provider_class = FakeAuthProviderImpl
|
|
|
|
def _auth(self, credentials, auth_url, **params):
|
|
"""returns auth method according to keystone"""
|
|
return self._auth_provider_class(credentials, **params)
|
|
|
|
def test_check_credentials_bad_type(self):
|
|
self.assertFalse(self.auth_provider.check_credentials([]))
|
|
|
|
def test_auth_data_property_when_cache_exists(self):
|
|
self.auth_provider.cache = 'foo'
|
|
self.useFixture(fixtures.MockPatchObject(self.auth_provider,
|
|
'is_expired',
|
|
return_value=False))
|
|
self.assertEqual('foo', getattr(self.auth_provider, 'auth_data'))
|
|
|
|
def test_delete_auth_data_property_through_deleter(self):
|
|
self.auth_provider.cache = 'foo'
|
|
del self.auth_provider.auth_data
|
|
self.assertIsNone(self.auth_provider.cache)
|
|
|
|
def test_delete_auth_data_property_through_clear_auth(self):
|
|
self.auth_provider.cache = 'foo'
|
|
self.auth_provider.clear_auth()
|
|
self.assertIsNone(self.auth_provider.cache)
|
|
|
|
def test_set_and_reset_alt_auth_data(self):
|
|
self.auth_provider.set_alt_auth_data('foo', 'bar')
|
|
self.assertEqual(self.auth_provider.alt_part, 'foo')
|
|
self.assertEqual(self.auth_provider.alt_auth_data, 'bar')
|
|
|
|
self.auth_provider.reset_alt_auth_data()
|
|
self.assertIsNone(self.auth_provider.alt_part)
|
|
self.assertIsNone(self.auth_provider.alt_auth_data)
|
|
|
|
def test_auth_class(self):
|
|
self.assertRaises(TypeError,
|
|
auth.AuthProvider,
|
|
fake_credentials.FakeCredentials)
|
|
|
|
|
|
class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
|
|
_endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
|
|
_auth_provider_class = auth.KeystoneV2AuthProvider
|
|
credentials = fake_credentials.FakeKeystoneV2Credentials()
|
|
|
|
def setUp(self):
|
|
super(TestKeystoneV2AuthProvider, self).setUp()
|
|
self.patchobject(v2_client.TokenClient, 'raw_request',
|
|
fake_identity._fake_v2_response)
|
|
self.target_url = 'test_api'
|
|
|
|
def _get_fake_identity(self):
|
|
return fake_identity.IDENTITY_V2_RESPONSE['access']
|
|
|
|
def _get_fake_alt_identity(self):
|
|
return fake_identity.ALT_IDENTITY_V2_RESPONSE['access']
|
|
|
|
def _get_result_url_from_endpoint(self, ep, endpoint_type='publicURL',
|
|
replacement=None):
|
|
if replacement:
|
|
return ep[endpoint_type].replace('v2', replacement)
|
|
return ep[endpoint_type]
|
|
|
|
def _get_token_from_fake_identity(self):
|
|
return fake_identity.TOKEN
|
|
|
|
def _get_from_fake_identity(self, attr):
|
|
access = fake_identity.IDENTITY_V2_RESPONSE['access']
|
|
if attr == 'user_id':
|
|
return access['user']['id']
|
|
elif attr == 'tenant_id':
|
|
return access['token']['tenant']['id']
|
|
|
|
def _test_request_helper(self, filters, expected):
|
|
url, headers, body = self.auth_provider.auth_request('GET',
|
|
self.target_url,
|
|
filters=filters)
|
|
|
|
self.assertEqual(expected['url'], url)
|
|
self.assertEqual(expected['token'], headers['X-Auth-Token'])
|
|
self.assertEqual(expected['body'], body)
|
|
|
|
def _auth_data_with_expiry(self, date_as_string):
|
|
token, access = self.auth_provider.auth_data
|
|
access['token']['expires'] = date_as_string
|
|
return token, access
|
|
|
|
def test_request(self):
|
|
filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion'
|
|
}
|
|
|
|
url = self._get_result_url_from_endpoint(
|
|
self._endpoints[0]['endpoints'][1]) + '/' + self.target_url
|
|
|
|
expected = {
|
|
'body': None,
|
|
'url': url,
|
|
'token': self._get_token_from_fake_identity(),
|
|
}
|
|
self._test_request_helper(filters, expected)
|
|
|
|
def test_request_with_alt_auth_cleans_alt(self):
|
|
"""Test alternate auth data for headers
|
|
|
|
Assert that when the alt data is provided for headers, after an
|
|
auth_request the data alt_data is cleaned-up.
|
|
"""
|
|
self.auth_provider.set_alt_auth_data(
|
|
'headers',
|
|
(fake_identity.ALT_TOKEN, self._get_fake_alt_identity()))
|
|
filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'fakeRegion'
|
|
}
|
|
self.auth_provider.auth_request('GET', self.target_url,
|
|
filters=filters)
|
|
|
|
# Assert alt auth data is clear after it
|
|
self.assertIsNone(self.auth_provider.alt_part)
|
|
self.assertIsNone(self.auth_provider.alt_auth_data)
|
|
|
|
def _test_request_with_identical_alt_auth(self, part):
|
|
"""Test alternate but identical auth data for headers
|
|
|
|
Assert that when the alt data is provided, but it's actually
|
|
identical, an exception is raised.
|
|
"""
|
|
self.auth_provider.set_alt_auth_data(
|
|
part,
|
|
(fake_identity.TOKEN, self._get_fake_identity()))
|
|
filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'fakeRegion'
|
|
}
|
|
|
|
self.assertRaises(exceptions.BadAltAuth,
|
|
self.auth_provider.auth_request,
|
|
'GET', self.target_url, filters=filters)
|
|
|
|
def test_request_with_identical_alt_auth_headers(self):
|
|
self._test_request_with_identical_alt_auth('headers')
|
|
|
|
def test_request_with_identical_alt_auth_url(self):
|
|
self._test_request_with_identical_alt_auth('url')
|
|
|
|
def test_request_with_identical_alt_auth_body(self):
|
|
self._test_request_with_identical_alt_auth('body')
|
|
|
|
def test_request_with_alt_part_without_alt_data(self):
|
|
"""Test empty alternate auth data
|
|
|
|
Assert that when alt_part is defined, the corresponding original
|
|
request element is kept the same.
|
|
"""
|
|
filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'fakeRegion'
|
|
}
|
|
self.auth_provider.set_alt_auth_data('headers', None)
|
|
|
|
url, headers, body = self.auth_provider.auth_request('GET',
|
|
self.target_url,
|
|
filters=filters)
|
|
# The original headers where empty
|
|
self.assertNotEqual(url, self.target_url)
|
|
self.assertIsNone(headers)
|
|
self.assertIsNone(body)
|
|
|
|
def _test_request_with_alt_part_without_alt_data_no_change(self, body):
|
|
"""Test empty alternate auth data with no effect
|
|
|
|
Assert that when alt_part is defined, no auth_data is provided,
|
|
and the corresponding original request element was not going to
|
|
be changed anyways, and exception is raised
|
|
"""
|
|
filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'fakeRegion'
|
|
}
|
|
self.auth_provider.set_alt_auth_data('body', None)
|
|
|
|
self.assertRaises(exceptions.BadAltAuth,
|
|
self.auth_provider.auth_request,
|
|
'GET', self.target_url, filters=filters)
|
|
|
|
def test_request_with_alt_part_without_alt_data_no_change_headers(self):
|
|
self._test_request_with_alt_part_without_alt_data_no_change('headers')
|
|
|
|
def test_request_with_alt_part_without_alt_data_no_change_url(self):
|
|
self._test_request_with_alt_part_without_alt_data_no_change('url')
|
|
|
|
def test_request_with_alt_part_without_alt_data_no_change_body(self):
|
|
self._test_request_with_alt_part_without_alt_data_no_change('body')
|
|
|
|
def test_request_with_bad_service(self):
|
|
filters = {
|
|
'service': 'BAD_SERVICE',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'fakeRegion'
|
|
}
|
|
self.assertRaises(exceptions.EndpointNotFound,
|
|
self.auth_provider.auth_request, 'GET',
|
|
self.target_url, filters=filters)
|
|
|
|
def test_request_without_service(self):
|
|
filters = {
|
|
'service': None,
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'fakeRegion'
|
|
}
|
|
self.assertRaises(exceptions.EndpointNotFound,
|
|
self.auth_provider.auth_request, 'GET',
|
|
self.target_url, filters=filters)
|
|
|
|
def test_check_credentials_missing_attribute(self):
|
|
for attr in ['username', 'password']:
|
|
cred = copy.copy(self.credentials)
|
|
del cred[attr]
|
|
self.assertFalse(self.auth_provider.check_credentials(cred))
|
|
|
|
def test_fill_credentials(self):
|
|
self.auth_provider.fill_credentials()
|
|
creds = self.auth_provider.credentials
|
|
for attr in ['user_id', 'tenant_id']:
|
|
self.assertEqual(self._get_from_fake_identity(attr),
|
|
getattr(creds, attr))
|
|
|
|
def _test_base_url_helper(self, expected_url, filters,
|
|
auth_data=None):
|
|
|
|
url = self.auth_provider.base_url(filters, auth_data)
|
|
self.assertEqual(url, expected_url)
|
|
|
|
def test_base_url(self):
|
|
self.filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion'
|
|
}
|
|
expected = self._get_result_url_from_endpoint(
|
|
self._endpoints[0]['endpoints'][1])
|
|
self._test_base_url_helper(expected, self.filters)
|
|
|
|
def test_base_url_to_get_admin_endpoint(self):
|
|
self.filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'adminURL',
|
|
'region': 'FakeRegion'
|
|
}
|
|
expected = self._get_result_url_from_endpoint(
|
|
self._endpoints[0]['endpoints'][1], endpoint_type='adminURL')
|
|
self._test_base_url_helper(expected, self.filters)
|
|
|
|
def test_base_url_unknown_region(self):
|
|
"""If the region is unknown, the first endpoint is returned."""
|
|
self.filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'AintNoBodyKnowThisRegion'
|
|
}
|
|
expected = self._get_result_url_from_endpoint(
|
|
self._endpoints[0]['endpoints'][0])
|
|
self._test_base_url_helper(expected, self.filters)
|
|
|
|
def test_base_url_with_non_existent_service(self):
|
|
self.filters = {
|
|
'service': 'BAD_SERVICE',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion'
|
|
}
|
|
self.assertRaises(exceptions.EndpointNotFound,
|
|
self._test_base_url_helper, None, self.filters)
|
|
|
|
def test_base_url_without_service(self):
|
|
self.filters = {
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion'
|
|
}
|
|
self.assertRaises(exceptions.EndpointNotFound,
|
|
self._test_base_url_helper, None, self.filters)
|
|
|
|
def test_base_url_with_known_name(self):
|
|
"""If name and service is known, return the endpoint."""
|
|
self.filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion',
|
|
'name': 'nova'
|
|
}
|
|
expected = self._get_result_url_from_endpoint(
|
|
self._endpoints[0]['endpoints'][1])
|
|
self._test_base_url_helper(expected, self.filters)
|
|
|
|
def test_base_url_with_known_name_and_unknown_servce(self):
|
|
"""Test with Known Name and Unknown service
|
|
|
|
If the name is known but the service is unknown, raise an exception.
|
|
"""
|
|
self.filters = {
|
|
'service': 'AintNoBodyKnowThatService',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion',
|
|
'name': 'AintNoBodyKnowThatName'
|
|
}
|
|
self.assertRaises(exceptions.EndpointNotFound,
|
|
self._test_base_url_helper, None, self.filters)
|
|
|
|
def test_base_url_with_unknown_name_and_known_service(self):
|
|
"""Test with Unknown Name and Known Service
|
|
|
|
If the name is unknown, raise an exception. Note that filtering by
|
|
name is only successful service exists.
|
|
"""
|
|
|
|
self.filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion',
|
|
'name': 'AintNoBodyKnowThatName'
|
|
}
|
|
self.assertRaises(exceptions.EndpointNotFound,
|
|
self._test_base_url_helper, None, self.filters)
|
|
|
|
def test_base_url_without_name(self):
|
|
self.filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion',
|
|
}
|
|
expected = self._get_result_url_from_endpoint(
|
|
self._endpoints[0]['endpoints'][1])
|
|
self._test_base_url_helper(expected, self.filters)
|
|
|
|
def test_base_url_with_api_version_filter(self):
|
|
self.filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion',
|
|
'api_version': 'v12'
|
|
}
|
|
expected = self._get_result_url_from_endpoint(
|
|
self._endpoints[0]['endpoints'][1], replacement='v12')
|
|
self._test_base_url_helper(expected, self.filters)
|
|
|
|
def test_base_url_with_skip_path_filter(self):
|
|
self.filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion',
|
|
'skip_path': True
|
|
}
|
|
expected = 'http://fake_url/'
|
|
self._test_base_url_helper(expected, self.filters)
|
|
|
|
def test_base_url_with_unversioned_endpoint(self):
|
|
auth_data = {
|
|
'serviceCatalog': [
|
|
{
|
|
'type': 'identity',
|
|
'endpoints': [
|
|
{
|
|
'region': 'FakeRegion',
|
|
'publicURL': 'http://fake_url'
|
|
}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
|
|
filters = {
|
|
'service': 'identity',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion',
|
|
'api_version': 'v2.0'
|
|
}
|
|
|
|
expected = 'http://fake_url/v2.0'
|
|
self._test_base_url_helper(expected, filters, ('token', auth_data))
|
|
|
|
def test_base_url_with_extra_path_endpoint(self):
|
|
auth_data = {
|
|
'serviceCatalog': [
|
|
{
|
|
'type': 'compute',
|
|
'endpoints': [
|
|
{
|
|
'region': 'FakeRegion',
|
|
'publicURL': 'http://fake_url/some_path/v2.0'
|
|
}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
|
|
filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion',
|
|
'api_version': 'v2.0'
|
|
}
|
|
|
|
expected = 'http://fake_url/some_path/v2.0'
|
|
self._test_base_url_helper(expected, filters, ('token', auth_data))
|
|
|
|
def test_base_url_with_unversioned_extra_path_endpoint(self):
|
|
auth_data = {
|
|
'serviceCatalog': [
|
|
{
|
|
'type': 'compute',
|
|
'endpoints': [
|
|
{
|
|
'region': 'FakeRegion',
|
|
'publicURL': 'http://fake_url/some_path'
|
|
}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
|
|
filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion',
|
|
'api_version': 'v2.0'
|
|
}
|
|
|
|
expected = 'http://fake_url/some_path/v2.0'
|
|
self._test_base_url_helper(expected, filters, ('token', auth_data))
|
|
|
|
def test_token_not_expired(self):
|
|
expiry_data = datetime.datetime.utcnow() + datetime.timedelta(days=1)
|
|
self._verify_expiry(expiry_data=expiry_data, should_be_expired=False)
|
|
|
|
def test_token_expired(self):
|
|
expiry_data = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
|
|
self._verify_expiry(expiry_data=expiry_data, should_be_expired=True)
|
|
|
|
def test_token_not_expired_to_be_renewed(self):
|
|
expiry_data = (datetime.datetime.utcnow() +
|
|
self.auth_provider.token_expiry_threshold / 2)
|
|
self._verify_expiry(expiry_data=expiry_data, should_be_expired=True)
|
|
|
|
def _verify_expiry(self, expiry_data, should_be_expired):
|
|
for expiry_format in self.auth_provider.EXPIRY_DATE_FORMATS:
|
|
auth_data = self._auth_data_with_expiry(
|
|
expiry_data.strftime(expiry_format))
|
|
self.assertEqual(self.auth_provider.is_expired(auth_data),
|
|
should_be_expired)
|
|
|
|
def test_set_scope_all_valid(self):
|
|
for scope in self.auth_provider.SCOPES:
|
|
self.auth_provider.scope = scope
|
|
self.assertEqual(scope, self.auth_provider.scope)
|
|
|
|
def test_set_scope_invalid(self):
|
|
with testtools.ExpectedException(exceptions.InvalidScope,
|
|
'.* invalid_scope .*'):
|
|
self.auth_provider.scope = 'invalid_scope'
|
|
|
|
|
|
class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
|
|
_endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
|
|
_auth_provider_class = auth.KeystoneV3AuthProvider
|
|
credentials = fake_credentials.FakeKeystoneV3Credentials()
|
|
|
|
def setUp(self):
|
|
super(TestKeystoneV3AuthProvider, self).setUp()
|
|
self.patchobject(v3_client.V3TokenClient, 'raw_request',
|
|
fake_identity._fake_v3_response)
|
|
|
|
def _get_fake_identity(self):
|
|
return fake_identity.IDENTITY_V3_RESPONSE['token']
|
|
|
|
def _get_fake_alt_identity(self):
|
|
return fake_identity.ALT_IDENTITY_V3['token']
|
|
|
|
def _get_result_url_from_endpoint(self, ep, replacement=None):
|
|
if replacement:
|
|
return ep['url'].replace('v3', replacement)
|
|
return ep['url']
|
|
|
|
def _auth_data_with_expiry(self, date_as_string):
|
|
token, access = self.auth_provider.auth_data
|
|
access['expires_at'] = date_as_string
|
|
return token, access
|
|
|
|
def _get_from_fake_identity(self, attr):
|
|
token = fake_identity.IDENTITY_V3_RESPONSE['token']
|
|
if attr == 'user_id':
|
|
return token['user']['id']
|
|
elif attr == 'project_id':
|
|
return token['project']['id']
|
|
elif attr == 'user_domain_id':
|
|
return token['user']['domain']['id']
|
|
elif attr == 'project_domain_id':
|
|
return token['project']['domain']['id']
|
|
|
|
def test_check_credentials_missing_attribute(self):
|
|
# reset credentials to fresh ones
|
|
self.credentials.reset()
|
|
for attr in ['username', 'password', 'user_domain_name',
|
|
'project_domain_name']:
|
|
cred = copy.copy(self.credentials)
|
|
del cred[attr]
|
|
self.assertFalse(self.auth_provider.check_credentials(cred),
|
|
"Credentials should be invalid without %s" % attr)
|
|
|
|
def test_check_domain_credentials_missing_attribute(self):
|
|
# reset credentials to fresh ones
|
|
self.credentials.reset()
|
|
domain_creds = fake_credentials.FakeKeystoneV3DomainCredentials()
|
|
for attr in ['username', 'password', 'user_domain_name']:
|
|
cred = copy.copy(domain_creds)
|
|
del cred[attr]
|
|
self.assertFalse(self.auth_provider.check_credentials(cred),
|
|
"Credentials should be invalid without %s" % attr)
|
|
|
|
def test_fill_credentials(self):
|
|
self.auth_provider.fill_credentials()
|
|
creds = self.auth_provider.credentials
|
|
for attr in ['user_id', 'project_id', 'user_domain_id',
|
|
'project_domain_id']:
|
|
self.assertEqual(self._get_from_fake_identity(attr),
|
|
getattr(creds, attr))
|
|
|
|
# Overwrites v2 test
|
|
def test_base_url_to_get_admin_endpoint(self):
|
|
self.filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'admin',
|
|
'region': 'MiddleEarthRegion'
|
|
}
|
|
expected = self._get_result_url_from_endpoint(
|
|
self._endpoints[0]['endpoints'][2])
|
|
self._test_base_url_helper(expected, self.filters)
|
|
|
|
# Overwrites v2 test
|
|
def test_base_url_with_unversioned_endpoint(self):
|
|
auth_data = {
|
|
'catalog': [
|
|
{
|
|
'type': 'identity',
|
|
'endpoints': [
|
|
{
|
|
'region': 'FakeRegion',
|
|
'url': 'http://fake_url',
|
|
'interface': 'public'
|
|
}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
|
|
filters = {
|
|
'service': 'identity',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion',
|
|
'api_version': 'v3'
|
|
}
|
|
|
|
expected = 'http://fake_url/v3'
|
|
self._test_base_url_helper(expected, filters, ('token', auth_data))
|
|
|
|
def test_base_url_with_extra_path_endpoint(self):
|
|
auth_data = {
|
|
'catalog': [
|
|
{
|
|
'type': 'compute',
|
|
'endpoints': [
|
|
{
|
|
'region': 'FakeRegion',
|
|
'url': 'http://fake_url/some_path/v2.0',
|
|
'interface': 'public'
|
|
}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
|
|
filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion',
|
|
'api_version': 'v2.0'
|
|
}
|
|
|
|
expected = 'http://fake_url/some_path/v2.0'
|
|
self._test_base_url_helper(expected, filters, ('token', auth_data))
|
|
|
|
def test_base_url_with_unversioned_extra_path_endpoint(self):
|
|
auth_data = {
|
|
'catalog': [
|
|
{
|
|
'type': 'compute',
|
|
'endpoints': [
|
|
{
|
|
'region': 'FakeRegion',
|
|
'url': 'http://fake_url/some_path',
|
|
'interface': 'public'
|
|
}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
|
|
filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion',
|
|
'api_version': 'v2.0'
|
|
}
|
|
|
|
expected = 'http://fake_url/some_path/v2.0'
|
|
self._test_base_url_helper(expected, filters, ('token', auth_data))
|
|
|
|
# Base URL test with scope only for V3
|
|
def test_base_url_scope_project(self):
|
|
self.auth_provider.scope = 'project'
|
|
self.filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion'
|
|
}
|
|
expected = self._get_result_url_from_endpoint(
|
|
self._endpoints[0]['endpoints'][1])
|
|
self._test_base_url_helper(expected, self.filters)
|
|
|
|
# Base URL test with scope only for V3
|
|
def test_base_url_unscoped_identity(self):
|
|
self.auth_provider.scope = 'unscoped'
|
|
self.patchobject(v3_client.V3TokenClient, 'raw_request',
|
|
fake_identity._fake_v3_response_no_scope)
|
|
self.filters = {
|
|
'service': 'identity',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion'
|
|
}
|
|
expected = fake_identity.FAKE_AUTH_URL
|
|
self._test_base_url_helper(expected, self.filters)
|
|
|
|
# Base URL test with scope only for V3
|
|
def test_base_url_unscoped_other(self):
|
|
self.auth_provider.scope = 'unscoped'
|
|
self.patchobject(v3_client.V3TokenClient, 'raw_request',
|
|
fake_identity._fake_v3_response_no_scope)
|
|
self.filters = {
|
|
'service': 'compute',
|
|
'endpoint_type': 'publicURL',
|
|
'region': 'FakeRegion'
|
|
}
|
|
self.assertRaises(exceptions.EndpointNotFound,
|
|
self.auth_provider.base_url,
|
|
auth_data=self.auth_provider.auth_data,
|
|
filters=self.filters)
|
|
|
|
def test_auth_parameters_with_scope_unset(self):
|
|
# No scope defaults to 'project'
|
|
all_creds = fake_credentials.FakeKeystoneV3AllCredentials()
|
|
self.auth_provider.credentials = all_creds
|
|
auth_params = self.auth_provider._auth_params()
|
|
self.assertNotIn('scope', auth_params.keys())
|
|
for attr in all_creds.get_init_attributes():
|
|
if attr.startswith('domain_'):
|
|
self.assertNotIn(attr, auth_params.keys())
|
|
else:
|
|
self.assertIn(attr, auth_params.keys())
|
|
self.assertEqual(getattr(all_creds, attr), auth_params[attr])
|
|
|
|
def test_auth_parameters_with_project_scope(self):
|
|
all_creds = fake_credentials.FakeKeystoneV3AllCredentials()
|
|
self.auth_provider.credentials = all_creds
|
|
self.auth_provider.scope = 'project'
|
|
auth_params = self.auth_provider._auth_params()
|
|
self.assertNotIn('scope', auth_params.keys())
|
|
for attr in all_creds.get_init_attributes():
|
|
if attr.startswith('domain_'):
|
|
self.assertNotIn(attr, auth_params.keys())
|
|
else:
|
|
self.assertIn(attr, auth_params.keys())
|
|
self.assertEqual(getattr(all_creds, attr), auth_params[attr])
|
|
|
|
def test_auth_parameters_with_domain_scope(self):
|
|
all_creds = fake_credentials.FakeKeystoneV3AllCredentials()
|
|
self.auth_provider.credentials = all_creds
|
|
self.auth_provider.scope = 'domain'
|
|
auth_params = self.auth_provider._auth_params()
|
|
self.assertNotIn('scope', auth_params.keys())
|
|
for attr in all_creds.get_init_attributes():
|
|
if attr.startswith('project_'):
|
|
self.assertNotIn(attr, auth_params.keys())
|
|
else:
|
|
self.assertIn(attr, auth_params.keys())
|
|
self.assertEqual(getattr(all_creds, attr), auth_params[attr])
|
|
|
|
def test_auth_parameters_unscoped(self):
|
|
all_creds = fake_credentials.FakeKeystoneV3AllCredentials()
|
|
self.auth_provider.credentials = all_creds
|
|
self.auth_provider.scope = 'unscoped'
|
|
auth_params = self.auth_provider._auth_params()
|
|
self.assertNotIn('scope', auth_params.keys())
|
|
for attr in all_creds.get_init_attributes():
|
|
if attr.startswith('project_') or attr.startswith('domain_'):
|
|
self.assertNotIn(attr, auth_params.keys())
|
|
else:
|
|
self.assertIn(attr, auth_params.keys())
|
|
self.assertEqual(getattr(all_creds, attr), auth_params[attr])
|
|
|
|
|
|
class TestKeystoneV3Credentials(base.TestCase):
|
|
def testSetAttrUserDomain(self):
|
|
creds = auth.KeystoneV3Credentials()
|
|
creds.user_domain_name = 'user_domain'
|
|
creds.domain_name = 'domain'
|
|
self.assertEqual('user_domain', creds.user_domain_name)
|
|
creds = auth.KeystoneV3Credentials()
|
|
creds.domain_name = 'domain'
|
|
creds.user_domain_name = 'user_domain'
|
|
self.assertEqual('user_domain', creds.user_domain_name)
|
|
|
|
def testSetAttrProjectDomain(self):
|
|
creds = auth.KeystoneV3Credentials()
|
|
creds.project_domain_name = 'project_domain'
|
|
creds.domain_name = 'domain'
|
|
self.assertEqual('project_domain', creds.user_domain_name)
|
|
creds = auth.KeystoneV3Credentials()
|
|
creds.domain_name = 'domain'
|
|
creds.project_domain_name = 'project_domain'
|
|
self.assertEqual('project_domain', creds.project_domain_name)
|
|
|
|
def testProjectTenantNoCollision(self):
|
|
creds = auth.KeystoneV3Credentials(tenant_id='tenant')
|
|
self.assertEqual('tenant', creds.project_id)
|
|
creds = auth.KeystoneV3Credentials(project_id='project')
|
|
self.assertEqual('project', creds.tenant_id)
|
|
creds = auth.KeystoneV3Credentials(tenant_name='tenant')
|
|
self.assertEqual('tenant', creds.project_name)
|
|
creds = auth.KeystoneV3Credentials(project_name='project')
|
|
self.assertEqual('project', creds.tenant_name)
|
|
|
|
def testProjectTenantCollision(self):
|
|
attrs = {'tenant_id': 'tenant', 'project_id': 'project'}
|
|
self.assertRaises(
|
|
exceptions.InvalidCredentials, auth.KeystoneV3Credentials, **attrs)
|
|
attrs = {'tenant_name': 'tenant', 'project_name': 'project'}
|
|
self.assertRaises(
|
|
exceptions.InvalidCredentials, auth.KeystoneV3Credentials, **attrs)
|
|
|
|
|
|
class TestReplaceVersion(base.TestCase):
|
|
def test_version_no_trailing_path(self):
|
|
self.assertEqual(
|
|
'http://localhost:35357/v2.0',
|
|
auth.replace_version('http://localhost:35357/v3', 'v2.0'))
|
|
|
|
def test_version_no_trailing_path_solidus(self):
|
|
self.assertEqual(
|
|
'http://localhost:35357/v2.0/',
|
|
auth.replace_version('http://localhost:35357/v3/', 'v2.0'))
|
|
|
|
def test_version_trailing_path(self):
|
|
self.assertEqual(
|
|
'http://localhost:35357/v2.0/uuid',
|
|
auth.replace_version('http://localhost:35357/v3/uuid', 'v2.0'))
|
|
|
|
def test_version_trailing_path_solidus(self):
|
|
self.assertEqual(
|
|
'http://localhost:35357/v2.0/uuid/',
|
|
auth.replace_version('http://localhost:35357/v3/uuid/', 'v2.0'))
|
|
|
|
def test_no_version_base(self):
|
|
self.assertEqual(
|
|
'http://localhost:35357/v2.0',
|
|
auth.replace_version('http://localhost:35357', 'v2.0'))
|
|
|
|
def test_no_version_base_solidus(self):
|
|
self.assertEqual(
|
|
'http://localhost:35357/v2.0',
|
|
auth.replace_version('http://localhost:35357/', 'v2.0'))
|
|
|
|
def test_no_version_path(self):
|
|
self.assertEqual(
|
|
'http://localhost/identity/v2.0',
|
|
auth.replace_version('http://localhost/identity', 'v2.0'))
|
|
|
|
def test_no_version_path_solidus(self):
|
|
self.assertEqual(
|
|
'http://localhost/identity/v2.0',
|
|
auth.replace_version('http://localhost/identity/', 'v2.0'))
|
|
|
|
def test_path_version(self):
|
|
self.assertEqual(
|
|
'http://localhost/identity/v2.0',
|
|
auth.replace_version('http://localhost/identity/v3', 'v2.0'))
|
|
|
|
def test_path_version_solidus(self):
|
|
self.assertEqual(
|
|
'http://localhost/identity/v2.0/',
|
|
auth.replace_version('http://localhost/identity/v3/', 'v2.0'))
|
|
|
|
def test_path_version_trailing_path(self):
|
|
self.assertEqual(
|
|
'http://localhost/identity/v2.0/uuid',
|
|
auth.replace_version('http://localhost/identity/v3/uuid', 'v2.0'))
|
|
|
|
def test_path_version_trailing_path_solidus(self):
|
|
self.assertEqual(
|
|
'http://localhost/identity/v2.0/uuid/',
|
|
auth.replace_version('http://localhost/identity/v3/uuid/', 'v2.0'))
|
|
|
|
|
|
class TestKeystoneV3AuthProvider_DomainScope(BaseAuthTestsSetUp):
|
|
_endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
|
|
_auth_provider_class = auth.KeystoneV3AuthProvider
|
|
credentials = fake_credentials.FakeKeystoneV3Credentials()
|
|
|
|
def setUp(self):
|
|
super(TestKeystoneV3AuthProvider_DomainScope, self).setUp()
|
|
self.patchobject(v3_client.V3TokenClient, 'raw_request',
|
|
fake_identity._fake_v3_response_domain_scope)
|
|
|
|
def test_get_auth_with_domain_scope(self):
|
|
self.auth_provider.scope = 'domain'
|
|
_, auth_data = self.auth_provider.get_auth()
|
|
self.assertIn('domain', auth_data)
|
|
self.assertNotIn('project', auth_data)
|
|
|
|
|
|
class TestGetCredentials(base.TestCase):
|
|
|
|
def test_invalid_identity_version(self):
|
|
with testtools.ExpectedException(exceptions.InvalidIdentityVersion,
|
|
'.* v1 .*'):
|
|
auth.get_credentials('http://localhost/identity/v3',
|
|
identity_version='v1')
|