Fix namespace issues and remove tests for client-specific code
Client specific code has been removed, this cleans up the related lingering tests and fixes further keystoneclient namespace issues. Change-Id: I620865a71c216c4040956fa5dce9949654877526
This commit is contained in:
parent
3768b0537e
commit
faf52ea373
@ -23,7 +23,7 @@ from keystoneauth import exceptions
|
|||||||
# 'interface' it should return the initial URL that was passed to the plugin.
|
# 'interface' it should return the initial URL that was passed to the plugin.
|
||||||
AUTH_INTERFACE = object()
|
AUTH_INTERFACE = object()
|
||||||
|
|
||||||
PLUGIN_NAMESPACE = 'keystonauth.auth.plugin'
|
PLUGIN_NAMESPACE = 'keystoneauth.auth.plugin'
|
||||||
IDENTITY_AUTH_HEADER_NAME = 'X-Auth-Token'
|
IDENTITY_AUTH_HEADER_NAME = 'X-Auth-Token'
|
||||||
|
|
||||||
|
|
||||||
|
@ -16,7 +16,6 @@ import uuid
|
|||||||
from keystoneauth import access
|
from keystoneauth import access
|
||||||
from keystoneauth.auth.identity import v3
|
from keystoneauth.auth.identity import v3
|
||||||
from keystoneauth.auth.identity.v3 import base as v3_base
|
from keystoneauth.auth.identity.v3 import base as v3_base
|
||||||
from keystoneauth import client
|
|
||||||
from keystoneauth import exceptions
|
from keystoneauth import exceptions
|
||||||
from keystoneauth import fixture
|
from keystoneauth import fixture
|
||||||
from keystoneauth import session
|
from keystoneauth import session
|
||||||
@ -199,29 +198,6 @@ class V3IdentityPlugin(utils.TestCase):
|
|||||||
self.assertRequestHeaderEqual('Accept', 'application/json')
|
self.assertRequestHeaderEqual('Accept', 'application/json')
|
||||||
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
|
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
|
||||||
|
|
||||||
def test_authenticate_with_username_password_unscoped(self):
|
|
||||||
del self.TEST_RESPONSE_DICT['token']['catalog']
|
|
||||||
del self.TEST_RESPONSE_DICT['token']['project']
|
|
||||||
|
|
||||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
|
||||||
self.stub_url(method="GET", json=self.TEST_DISCOVERY_RESPONSE)
|
|
||||||
test_user_id = self.TEST_RESPONSE_DICT['token']['user']['id']
|
|
||||||
self.stub_url(method="GET",
|
|
||||||
json=self.TEST_PROJECTS_RESPONSE,
|
|
||||||
parts=['users', test_user_id, 'projects'])
|
|
||||||
|
|
||||||
a = v3.Password(self.TEST_URL,
|
|
||||||
username=self.TEST_USER,
|
|
||||||
password=self.TEST_PASS)
|
|
||||||
s = session.Session(auth=a)
|
|
||||||
cs = client.Client(session=s, auth_url=self.TEST_URL)
|
|
||||||
|
|
||||||
# As a sanity check on the auth_ref, make sure client has the
|
|
||||||
# proper user id, that it fetches the right project response
|
|
||||||
self.assertEqual(test_user_id, a.auth_ref.user_id)
|
|
||||||
t = cs.projects.list(user=a.auth_ref.user_id)
|
|
||||||
self.assertEqual(2, len(t))
|
|
||||||
|
|
||||||
def test_authenticate_with_username_password_domain_scoped(self):
|
def test_authenticate_with_username_password_domain_scoped(self):
|
||||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||||
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
|
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
|
||||||
|
@ -11,22 +11,13 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import re
|
import re
|
||||||
import uuid
|
|
||||||
|
|
||||||
from oslo_serialization import jsonutils
|
from oslo_serialization import jsonutils
|
||||||
import six
|
import six
|
||||||
from testtools import matchers
|
|
||||||
|
|
||||||
from keystoneauth import _discover
|
from keystoneauth import _discover
|
||||||
from keystoneauth.auth import token_endpoint
|
|
||||||
from keystoneauth import client
|
|
||||||
from keystoneauth import discover
|
|
||||||
from keystoneauth import exceptions
|
|
||||||
from keystoneauth import fixture
|
from keystoneauth import fixture
|
||||||
from keystoneauth import session
|
|
||||||
from keystoneauth.tests.unit import utils
|
from keystoneauth.tests.unit import utils
|
||||||
from keystoneauth.v2_0 import client as v2_client
|
|
||||||
from keystoneauth.v3 import client as v3_client
|
|
||||||
|
|
||||||
|
|
||||||
BASE_HOST = 'http://keystone.example.com'
|
BASE_HOST = 'http://keystone.example.com'
|
||||||
@ -232,517 +223,6 @@ V3_VERSION_ENTRY = _create_single_version(V3_VERSION)
|
|||||||
V2_VERSION_ENTRY = _create_single_version(V2_VERSION)
|
V2_VERSION_ENTRY = _create_single_version(V2_VERSION)
|
||||||
|
|
||||||
|
|
||||||
class AvailableVersionsTests(utils.TestCase):
|
|
||||||
|
|
||||||
def test_available_versions_basics(self):
|
|
||||||
examples = {'keystone': V3_VERSION_LIST,
|
|
||||||
'cinder': jsonutils.dumps(CINDER_EXAMPLES),
|
|
||||||
'glance': jsonutils.dumps(GLANCE_EXAMPLES)}
|
|
||||||
|
|
||||||
for path, text in six.iteritems(examples):
|
|
||||||
url = "%s%s" % (BASE_URL, path)
|
|
||||||
|
|
||||||
self.requests_mock.get(url, status_code=300, text=text)
|
|
||||||
versions = discover.available_versions(url)
|
|
||||||
|
|
||||||
for v in versions:
|
|
||||||
for n in ('id', 'status', 'links'):
|
|
||||||
msg = '%s missing from %s version data' % (n, path)
|
|
||||||
self.assertThat(v, matchers.Annotate(msg,
|
|
||||||
matchers.Contains(n)))
|
|
||||||
|
|
||||||
def test_available_versions_individual(self):
|
|
||||||
self.requests_mock.get(V3_URL, status_code=200, text=V3_VERSION_ENTRY)
|
|
||||||
|
|
||||||
versions = discover.available_versions(V3_URL)
|
|
||||||
|
|
||||||
for v in versions:
|
|
||||||
self.assertEqual(v['id'], 'v3.0')
|
|
||||||
self.assertEqual(v['status'], 'stable')
|
|
||||||
self.assertIn('media-types', v)
|
|
||||||
self.assertIn('links', v)
|
|
||||||
|
|
||||||
def test_available_keystone_data(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
|
|
||||||
|
|
||||||
versions = discover.available_versions(BASE_URL)
|
|
||||||
self.assertEqual(2, len(versions))
|
|
||||||
|
|
||||||
for v in versions:
|
|
||||||
self.assertIn(v['id'], ('v2.0', 'v3.0'))
|
|
||||||
self.assertEqual(v['updated'], UPDATED)
|
|
||||||
self.assertEqual(v['status'], 'stable')
|
|
||||||
|
|
||||||
if v['id'] == 'v3.0':
|
|
||||||
self.assertEqual(v['media-types'], V3_MEDIA_TYPES)
|
|
||||||
|
|
||||||
def test_available_cinder_data(self):
|
|
||||||
text = jsonutils.dumps(CINDER_EXAMPLES)
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=text)
|
|
||||||
|
|
||||||
versions = discover.available_versions(BASE_URL)
|
|
||||||
self.assertEqual(2, len(versions))
|
|
||||||
|
|
||||||
for v in versions:
|
|
||||||
self.assertEqual(v['status'], 'CURRENT')
|
|
||||||
if v['id'] == 'v1.0':
|
|
||||||
self.assertEqual(v['updated'], '2012-01-04T11:33:21Z')
|
|
||||||
elif v['id'] == 'v2.0':
|
|
||||||
self.assertEqual(v['updated'], '2012-11-21T11:33:21Z')
|
|
||||||
else:
|
|
||||||
self.fail("Invalid version found")
|
|
||||||
|
|
||||||
def test_available_glance_data(self):
|
|
||||||
text = jsonutils.dumps(GLANCE_EXAMPLES)
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=200, text=text)
|
|
||||||
|
|
||||||
versions = discover.available_versions(BASE_URL)
|
|
||||||
self.assertEqual(5, len(versions))
|
|
||||||
|
|
||||||
for v in versions:
|
|
||||||
if v['id'] in ('v2.2', 'v1.1'):
|
|
||||||
self.assertEqual(v['status'], 'CURRENT')
|
|
||||||
elif v['id'] in ('v2.1', 'v2.0', 'v1.0'):
|
|
||||||
self.assertEqual(v['status'], 'SUPPORTED')
|
|
||||||
else:
|
|
||||||
self.fail("Invalid version found")
|
|
||||||
|
|
||||||
|
|
||||||
class ClientDiscoveryTests(utils.TestCase):
|
|
||||||
|
|
||||||
def assertCreatesV3(self, **kwargs):
|
|
||||||
self.requests_mock.post('%s/auth/tokens' % V3_URL,
|
|
||||||
text=V3_AUTH_RESPONSE,
|
|
||||||
headers={'X-Subject-Token': V3_TOKEN})
|
|
||||||
|
|
||||||
kwargs.setdefault('username', 'foo')
|
|
||||||
kwargs.setdefault('password', 'bar')
|
|
||||||
keystone = client.Client(**kwargs)
|
|
||||||
self.assertIsInstance(keystone, v3_client.Client)
|
|
||||||
return keystone
|
|
||||||
|
|
||||||
def assertCreatesV2(self, **kwargs):
|
|
||||||
self.requests_mock.post("%s/tokens" % V2_URL, text=V2_AUTH_RESPONSE)
|
|
||||||
|
|
||||||
kwargs.setdefault('username', 'foo')
|
|
||||||
kwargs.setdefault('password', 'bar')
|
|
||||||
keystone = client.Client(**kwargs)
|
|
||||||
self.assertIsInstance(keystone, v2_client.Client)
|
|
||||||
return keystone
|
|
||||||
|
|
||||||
def assertVersionNotAvailable(self, **kwargs):
|
|
||||||
kwargs.setdefault('username', 'foo')
|
|
||||||
kwargs.setdefault('password', 'bar')
|
|
||||||
|
|
||||||
self.assertRaises(exceptions.VersionNotAvailable,
|
|
||||||
client.Client, **kwargs)
|
|
||||||
|
|
||||||
def assertDiscoveryFailure(self, **kwargs):
|
|
||||||
kwargs.setdefault('username', 'foo')
|
|
||||||
kwargs.setdefault('password', 'bar')
|
|
||||||
|
|
||||||
self.assertRaises(exceptions.DiscoveryFailure,
|
|
||||||
client.Client, **kwargs)
|
|
||||||
|
|
||||||
def test_discover_v3(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
|
|
||||||
|
|
||||||
self.assertCreatesV3(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
def test_discover_v2(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=V2_VERSION_LIST)
|
|
||||||
self.requests_mock.post("%s/tokens" % V2_URL, text=V2_AUTH_RESPONSE)
|
|
||||||
|
|
||||||
self.assertCreatesV2(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
def test_discover_endpoint_v2(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=V2_VERSION_LIST)
|
|
||||||
self.assertCreatesV2(endpoint=BASE_URL, token='fake-token')
|
|
||||||
|
|
||||||
def test_discover_endpoint_v3(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
|
|
||||||
self.assertCreatesV3(endpoint=BASE_URL, token='fake-token')
|
|
||||||
|
|
||||||
def test_discover_invalid_major_version(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
|
|
||||||
|
|
||||||
self.assertVersionNotAvailable(auth_url=BASE_URL, version=5)
|
|
||||||
|
|
||||||
def test_discover_200_response_fails(self):
|
|
||||||
self.requests_mock.get(BASE_URL, text='ok')
|
|
||||||
self.assertDiscoveryFailure(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
def test_discover_minor_greater_than_available_fails(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
|
|
||||||
|
|
||||||
self.assertVersionNotAvailable(endpoint=BASE_URL, version=3.4)
|
|
||||||
|
|
||||||
def test_discover_individual_version_v2(self):
|
|
||||||
self.requests_mock.get(V2_URL, text=V2_VERSION_ENTRY)
|
|
||||||
|
|
||||||
self.assertCreatesV2(auth_url=V2_URL)
|
|
||||||
|
|
||||||
def test_discover_individual_version_v3(self):
|
|
||||||
self.requests_mock.get(V3_URL, text=V3_VERSION_ENTRY)
|
|
||||||
|
|
||||||
self.assertCreatesV3(auth_url=V3_URL)
|
|
||||||
|
|
||||||
def test_discover_individual_endpoint_v2(self):
|
|
||||||
self.requests_mock.get(V2_URL, text=V2_VERSION_ENTRY)
|
|
||||||
self.assertCreatesV2(endpoint=V2_URL, token='fake-token')
|
|
||||||
|
|
||||||
def test_discover_individual_endpoint_v3(self):
|
|
||||||
self.requests_mock.get(V3_URL, text=V3_VERSION_ENTRY)
|
|
||||||
self.assertCreatesV3(endpoint=V3_URL, token='fake-token')
|
|
||||||
|
|
||||||
def test_discover_fail_to_create_bad_individual_version(self):
|
|
||||||
self.requests_mock.get(V2_URL, text=V2_VERSION_ENTRY)
|
|
||||||
self.requests_mock.get(V3_URL, text=V3_VERSION_ENTRY)
|
|
||||||
|
|
||||||
self.assertVersionNotAvailable(auth_url=V2_URL, version=3)
|
|
||||||
self.assertVersionNotAvailable(auth_url=V3_URL, version=2)
|
|
||||||
|
|
||||||
def test_discover_unstable_versions(self):
|
|
||||||
version_list = fixture.DiscoveryList(BASE_URL, v3_status='beta')
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, json=version_list)
|
|
||||||
|
|
||||||
self.assertCreatesV2(auth_url=BASE_URL)
|
|
||||||
self.assertVersionNotAvailable(auth_url=BASE_URL, version=3)
|
|
||||||
self.assertCreatesV3(auth_url=BASE_URL, unstable=True)
|
|
||||||
|
|
||||||
def test_discover_forwards_original_ip(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
|
|
||||||
|
|
||||||
ip = '192.168.1.1'
|
|
||||||
self.assertCreatesV3(auth_url=BASE_URL, original_ip=ip)
|
|
||||||
|
|
||||||
self.assertThat(self.requests_mock.last_request.headers['forwarded'],
|
|
||||||
matchers.Contains(ip))
|
|
||||||
|
|
||||||
def test_discover_bad_args(self):
|
|
||||||
self.assertRaises(exceptions.DiscoveryFailure,
|
|
||||||
client.Client)
|
|
||||||
|
|
||||||
def test_discover_bad_response(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, json={'FOO': 'BAR'})
|
|
||||||
self.assertDiscoveryFailure(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
def test_discovery_ignore_invalid(self):
|
|
||||||
resp = [{'id': 'v3.0',
|
|
||||||
'links': [1, 2, 3, 4], # invalid links
|
|
||||||
'media-types': V3_MEDIA_TYPES,
|
|
||||||
'status': 'stable',
|
|
||||||
'updated': UPDATED}]
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300,
|
|
||||||
text=_create_version_list(resp))
|
|
||||||
self.assertDiscoveryFailure(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
def test_ignore_entry_without_links(self):
|
|
||||||
v3 = V3_VERSION.copy()
|
|
||||||
v3['links'] = []
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300,
|
|
||||||
text=_create_version_list([v3, V2_VERSION]))
|
|
||||||
self.assertCreatesV2(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
def test_ignore_entry_without_status(self):
|
|
||||||
v3 = V3_VERSION.copy()
|
|
||||||
del v3['status']
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300,
|
|
||||||
text=_create_version_list([v3, V2_VERSION]))
|
|
||||||
self.assertCreatesV2(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
def test_greater_version_than_required(self):
|
|
||||||
versions = fixture.DiscoveryList(BASE_URL, v3_id='v3.6')
|
|
||||||
self.requests_mock.get(BASE_URL, json=versions)
|
|
||||||
self.assertCreatesV3(auth_url=BASE_URL, version=(3, 4))
|
|
||||||
|
|
||||||
def test_lesser_version_than_required(self):
|
|
||||||
versions = fixture.DiscoveryList(BASE_URL, v3_id='v3.4')
|
|
||||||
self.requests_mock.get(BASE_URL, json=versions)
|
|
||||||
self.assertVersionNotAvailable(auth_url=BASE_URL, version=(3, 6))
|
|
||||||
|
|
||||||
def test_bad_response(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text="Ugly Duckling")
|
|
||||||
self.assertDiscoveryFailure(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
def test_pass_client_arguments(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=V2_VERSION_LIST)
|
|
||||||
kwargs = {'original_ip': '100', 'use_keyring': False,
|
|
||||||
'stale_duration': 15}
|
|
||||||
|
|
||||||
cl = self.assertCreatesV2(auth_url=BASE_URL, **kwargs)
|
|
||||||
|
|
||||||
self.assertEqual(cl.original_ip, '100')
|
|
||||||
self.assertEqual(cl.stale_duration, 15)
|
|
||||||
self.assertFalse(cl.use_keyring)
|
|
||||||
|
|
||||||
def test_overriding_stored_kwargs(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
|
|
||||||
|
|
||||||
self.requests_mock.post("%s/auth/tokens" % V3_URL,
|
|
||||||
text=V3_AUTH_RESPONSE,
|
|
||||||
headers={'X-Subject-Token': V3_TOKEN})
|
|
||||||
|
|
||||||
disc = discover.Discover(auth_url=BASE_URL, debug=False,
|
|
||||||
username='foo')
|
|
||||||
client = disc.create_client(debug=True, password='bar')
|
|
||||||
|
|
||||||
self.assertIsInstance(client, v3_client.Client)
|
|
||||||
self.assertTrue(client.debug_log)
|
|
||||||
self.assertFalse(disc._client_kwargs['debug'])
|
|
||||||
self.assertEqual(client.username, 'foo')
|
|
||||||
self.assertEqual(client.password, 'bar')
|
|
||||||
|
|
||||||
def test_available_versions(self):
|
|
||||||
self.requests_mock.get(BASE_URL,
|
|
||||||
status_code=300,
|
|
||||||
text=V3_VERSION_ENTRY)
|
|
||||||
disc = discover.Discover(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
versions = disc.available_versions()
|
|
||||||
self.assertEqual(1, len(versions))
|
|
||||||
self.assertEqual(V3_VERSION, versions[0])
|
|
||||||
|
|
||||||
def test_unknown_client_version(self):
|
|
||||||
V4_VERSION = {'id': 'v4.0',
|
|
||||||
'links': [{'href': 'http://url', 'rel': 'self'}],
|
|
||||||
'media-types': V3_MEDIA_TYPES,
|
|
||||||
'status': 'stable',
|
|
||||||
'updated': UPDATED}
|
|
||||||
versions = fixture.DiscoveryList()
|
|
||||||
versions.add_version(V4_VERSION)
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, json=versions)
|
|
||||||
|
|
||||||
disc = discover.Discover(auth_url=BASE_URL)
|
|
||||||
self.assertRaises(exceptions.DiscoveryFailure,
|
|
||||||
disc.create_client, version=4)
|
|
||||||
|
|
||||||
def test_discovery_fail_for_missing_v3(self):
|
|
||||||
versions = fixture.DiscoveryList(v2=True, v3=False)
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, json=versions)
|
|
||||||
|
|
||||||
disc = discover.Discover(auth_url=BASE_URL)
|
|
||||||
self.assertRaises(exceptions.DiscoveryFailure,
|
|
||||||
disc.create_client, version=(3, 0))
|
|
||||||
|
|
||||||
def _do_discovery_call(self, token=None, **kwargs):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
|
|
||||||
|
|
||||||
if not token:
|
|
||||||
token = uuid.uuid4().hex
|
|
||||||
|
|
||||||
url = 'http://testurl'
|
|
||||||
a = token_endpoint.Token(url, token)
|
|
||||||
s = session.Session(auth=a)
|
|
||||||
|
|
||||||
# will default to true as there is a plugin on the session
|
|
||||||
discover.Discover(s, auth_url=BASE_URL, **kwargs)
|
|
||||||
|
|
||||||
self.assertEqual(BASE_URL, self.requests_mock.last_request.url)
|
|
||||||
|
|
||||||
def test_setting_authenticated_true(self):
|
|
||||||
token = uuid.uuid4().hex
|
|
||||||
self._do_discovery_call(token)
|
|
||||||
self.assertRequestHeaderEqual('X-Auth-Token', token)
|
|
||||||
|
|
||||||
def test_setting_authenticated_false(self):
|
|
||||||
self._do_discovery_call(authenticated=False)
|
|
||||||
self.assertNotIn('X-Auth-Token',
|
|
||||||
self.requests_mock.last_request.headers)
|
|
||||||
|
|
||||||
|
|
||||||
class DiscoverQueryTests(utils.TestCase):
|
|
||||||
|
|
||||||
def test_available_keystone_data(self):
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
|
|
||||||
|
|
||||||
disc = discover.Discover(auth_url=BASE_URL)
|
|
||||||
versions = disc.version_data()
|
|
||||||
|
|
||||||
self.assertEqual((2, 0), versions[0]['version'])
|
|
||||||
self.assertEqual('stable', versions[0]['raw_status'])
|
|
||||||
self.assertEqual(V2_URL, versions[0]['url'])
|
|
||||||
self.assertEqual((3, 0), versions[1]['version'])
|
|
||||||
self.assertEqual('stable', versions[1]['raw_status'])
|
|
||||||
self.assertEqual(V3_URL, versions[1]['url'])
|
|
||||||
|
|
||||||
version = disc.data_for('v3.0')
|
|
||||||
self.assertEqual((3, 0), version['version'])
|
|
||||||
self.assertEqual('stable', version['raw_status'])
|
|
||||||
self.assertEqual(V3_URL, version['url'])
|
|
||||||
|
|
||||||
version = disc.data_for(2)
|
|
||||||
self.assertEqual((2, 0), version['version'])
|
|
||||||
self.assertEqual('stable', version['raw_status'])
|
|
||||||
self.assertEqual(V2_URL, version['url'])
|
|
||||||
|
|
||||||
self.assertIsNone(disc.url_for('v4'))
|
|
||||||
self.assertEqual(V3_URL, disc.url_for('v3'))
|
|
||||||
self.assertEqual(V2_URL, disc.url_for('v2'))
|
|
||||||
|
|
||||||
def test_available_cinder_data(self):
|
|
||||||
text = jsonutils.dumps(CINDER_EXAMPLES)
|
|
||||||
self.requests_mock.get(BASE_URL, status_code=300, text=text)
|
|
||||||
|
|
||||||
v1_url = "%sv1/" % BASE_URL
|
|
||||||
v2_url = "%sv2/" % BASE_URL
|
|
||||||
|
|
||||||
disc = discover.Discover(auth_url=BASE_URL)
|
|
||||||
versions = disc.version_data()
|
|
||||||
|
|
||||||
self.assertEqual((1, 0), versions[0]['version'])
|
|
||||||
self.assertEqual('CURRENT', versions[0]['raw_status'])
|
|
||||||
self.assertEqual(v1_url, versions[0]['url'])
|
|
||||||
self.assertEqual((2, 0), versions[1]['version'])
|
|
||||||
self.assertEqual('CURRENT', versions[1]['raw_status'])
|
|
||||||
self.assertEqual(v2_url, versions[1]['url'])
|
|
||||||
|
|
||||||
version = disc.data_for('v2.0')
|
|
||||||
self.assertEqual((2, 0), version['version'])
|
|
||||||
self.assertEqual('CURRENT', version['raw_status'])
|
|
||||||
self.assertEqual(v2_url, version['url'])
|
|
||||||
|
|
||||||
version = disc.data_for(1)
|
|
||||||
self.assertEqual((1, 0), version['version'])
|
|
||||||
self.assertEqual('CURRENT', version['raw_status'])
|
|
||||||
self.assertEqual(v1_url, version['url'])
|
|
||||||
|
|
||||||
self.assertIsNone(disc.url_for('v3'))
|
|
||||||
self.assertEqual(v2_url, disc.url_for('v2'))
|
|
||||||
self.assertEqual(v1_url, disc.url_for('v1'))
|
|
||||||
|
|
||||||
def test_available_glance_data(self):
|
|
||||||
text = jsonutils.dumps(GLANCE_EXAMPLES)
|
|
||||||
self.requests_mock.get(BASE_URL, text=text)
|
|
||||||
|
|
||||||
v1_url = "%sv1/" % BASE_URL
|
|
||||||
v2_url = "%sv2/" % BASE_URL
|
|
||||||
|
|
||||||
disc = discover.Discover(auth_url=BASE_URL)
|
|
||||||
versions = disc.version_data()
|
|
||||||
|
|
||||||
self.assertEqual((1, 0), versions[0]['version'])
|
|
||||||
self.assertEqual('SUPPORTED', versions[0]['raw_status'])
|
|
||||||
self.assertEqual(v1_url, versions[0]['url'])
|
|
||||||
self.assertEqual((1, 1), versions[1]['version'])
|
|
||||||
self.assertEqual('CURRENT', versions[1]['raw_status'])
|
|
||||||
self.assertEqual(v1_url, versions[1]['url'])
|
|
||||||
self.assertEqual((2, 0), versions[2]['version'])
|
|
||||||
self.assertEqual('SUPPORTED', versions[2]['raw_status'])
|
|
||||||
self.assertEqual(v2_url, versions[2]['url'])
|
|
||||||
self.assertEqual((2, 1), versions[3]['version'])
|
|
||||||
self.assertEqual('SUPPORTED', versions[3]['raw_status'])
|
|
||||||
self.assertEqual(v2_url, versions[3]['url'])
|
|
||||||
self.assertEqual((2, 2), versions[4]['version'])
|
|
||||||
self.assertEqual('CURRENT', versions[4]['raw_status'])
|
|
||||||
self.assertEqual(v2_url, versions[4]['url'])
|
|
||||||
|
|
||||||
for ver in (2, 2.1, 2.2):
|
|
||||||
version = disc.data_for(ver)
|
|
||||||
self.assertEqual((2, 2), version['version'])
|
|
||||||
self.assertEqual('CURRENT', version['raw_status'])
|
|
||||||
self.assertEqual(v2_url, version['url'])
|
|
||||||
self.assertEqual(v2_url, disc.url_for(ver))
|
|
||||||
|
|
||||||
for ver in (1, 1.1):
|
|
||||||
version = disc.data_for(ver)
|
|
||||||
self.assertEqual((1, 1), version['version'])
|
|
||||||
self.assertEqual('CURRENT', version['raw_status'])
|
|
||||||
self.assertEqual(v1_url, version['url'])
|
|
||||||
self.assertEqual(v1_url, disc.url_for(ver))
|
|
||||||
|
|
||||||
self.assertIsNone(disc.url_for('v3'))
|
|
||||||
self.assertIsNone(disc.url_for('v2.3'))
|
|
||||||
|
|
||||||
def test_allow_deprecated(self):
|
|
||||||
status = 'deprecated'
|
|
||||||
version_list = [{'id': 'v3.0',
|
|
||||||
'links': [{'href': V3_URL, 'rel': 'self'}],
|
|
||||||
'media-types': V3_MEDIA_TYPES,
|
|
||||||
'status': status,
|
|
||||||
'updated': UPDATED}]
|
|
||||||
text = jsonutils.dumps({'versions': version_list})
|
|
||||||
self.requests_mock.get(BASE_URL, text=text)
|
|
||||||
|
|
||||||
disc = discover.Discover(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
# deprecated is allowed by default
|
|
||||||
versions = disc.version_data(allow_deprecated=False)
|
|
||||||
self.assertEqual(0, len(versions))
|
|
||||||
|
|
||||||
versions = disc.version_data(allow_deprecated=True)
|
|
||||||
self.assertEqual(1, len(versions))
|
|
||||||
self.assertEqual(status, versions[0]['raw_status'])
|
|
||||||
self.assertEqual(V3_URL, versions[0]['url'])
|
|
||||||
self.assertEqual((3, 0), versions[0]['version'])
|
|
||||||
|
|
||||||
def test_allow_experimental(self):
|
|
||||||
status = 'experimental'
|
|
||||||
version_list = [{'id': 'v3.0',
|
|
||||||
'links': [{'href': V3_URL, 'rel': 'self'}],
|
|
||||||
'media-types': V3_MEDIA_TYPES,
|
|
||||||
'status': status,
|
|
||||||
'updated': UPDATED}]
|
|
||||||
text = jsonutils.dumps({'versions': version_list})
|
|
||||||
self.requests_mock.get(BASE_URL, text=text)
|
|
||||||
|
|
||||||
disc = discover.Discover(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
versions = disc.version_data()
|
|
||||||
self.assertEqual(0, len(versions))
|
|
||||||
|
|
||||||
versions = disc.version_data(allow_experimental=True)
|
|
||||||
self.assertEqual(1, len(versions))
|
|
||||||
self.assertEqual(status, versions[0]['raw_status'])
|
|
||||||
self.assertEqual(V3_URL, versions[0]['url'])
|
|
||||||
self.assertEqual((3, 0), versions[0]['version'])
|
|
||||||
|
|
||||||
def test_allow_unknown(self):
|
|
||||||
status = 'abcdef'
|
|
||||||
version_list = fixture.DiscoveryList(BASE_URL, v2=False,
|
|
||||||
v3_status=status)
|
|
||||||
self.requests_mock.get(BASE_URL, json=version_list)
|
|
||||||
disc = discover.Discover(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
versions = disc.version_data()
|
|
||||||
self.assertEqual(0, len(versions))
|
|
||||||
|
|
||||||
versions = disc.version_data(allow_unknown=True)
|
|
||||||
self.assertEqual(1, len(versions))
|
|
||||||
self.assertEqual(status, versions[0]['raw_status'])
|
|
||||||
self.assertEqual(V3_URL, versions[0]['url'])
|
|
||||||
self.assertEqual((3, 0), versions[0]['version'])
|
|
||||||
|
|
||||||
def test_ignoring_invalid_lnks(self):
|
|
||||||
version_list = [{'id': 'v3.0',
|
|
||||||
'links': [{'href': V3_URL, 'rel': 'self'}],
|
|
||||||
'media-types': V3_MEDIA_TYPES,
|
|
||||||
'status': 'stable',
|
|
||||||
'updated': UPDATED},
|
|
||||||
{'id': 'v3.1',
|
|
||||||
'media-types': V3_MEDIA_TYPES,
|
|
||||||
'status': 'stable',
|
|
||||||
'updated': UPDATED},
|
|
||||||
{'media-types': V3_MEDIA_TYPES,
|
|
||||||
'status': 'stable',
|
|
||||||
'updated': UPDATED,
|
|
||||||
'links': [{'href': V3_URL, 'rel': 'self'}],
|
|
||||||
}]
|
|
||||||
|
|
||||||
text = jsonutils.dumps({'versions': version_list})
|
|
||||||
self.requests_mock.get(BASE_URL, text=text)
|
|
||||||
|
|
||||||
disc = discover.Discover(auth_url=BASE_URL)
|
|
||||||
|
|
||||||
# raw_version_data will return all choices, even invalid ones
|
|
||||||
versions = disc.raw_version_data()
|
|
||||||
self.assertEqual(3, len(versions))
|
|
||||||
|
|
||||||
# only the version with both id and links will be actually returned
|
|
||||||
versions = disc.version_data()
|
|
||||||
self.assertEqual(1, len(versions))
|
|
||||||
|
|
||||||
|
|
||||||
class CatalogHackTests(utils.TestCase):
|
class CatalogHackTests(utils.TestCase):
|
||||||
|
|
||||||
TEST_URL = 'http://keystone.server:5000/v2.0'
|
TEST_URL = 'http://keystone.server:5000/v2.0'
|
||||||
|
@ -17,7 +17,7 @@ import pep8
|
|||||||
import testtools
|
import testtools
|
||||||
|
|
||||||
from keystoneauth.hacking import checks
|
from keystoneauth.hacking import checks
|
||||||
from keystoneauth.tests.unit import client_fixtures
|
from keystoneauth.tests.unit import keystoneauth_fixtures
|
||||||
|
|
||||||
|
|
||||||
class TestCheckOsloNamespaceImports(testtools.TestCase):
|
class TestCheckOsloNamespaceImports(testtools.TestCase):
|
||||||
@ -41,7 +41,7 @@ class TestCheckOsloNamespaceImports(testtools.TestCase):
|
|||||||
self.assertEqual(expected_errors or [], actual_errors)
|
self.assertEqual(expected_errors or [], actual_errors)
|
||||||
|
|
||||||
def test(self):
|
def test(self):
|
||||||
code_ex = self.useFixture(client_fixtures.HackingCode())
|
code_ex = self.useFixture(keystoneauth_fixtures.HackingCode())
|
||||||
code = code_ex.oslo_namespace_imports['code']
|
code = code_ex.oslo_namespace_imports['code']
|
||||||
errors = code_ex.oslo_namespace_imports['expected_errors']
|
errors = code_ex.oslo_namespace_imports['expected_errors']
|
||||||
self.assert_has_errors(code, expected_errors=errors)
|
self.assert_has_errors(code, expected_errors=errors)
|
||||||
|
Loading…
Reference in New Issue
Block a user