diff --git a/keystoneclient/tests/unit/auth/__init__.py b/keystoneclient/tests/unit/auth/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/keystoneclient/tests/unit/auth/test_access.py b/keystoneclient/tests/unit/auth/test_access.py new file mode 100644 index 0000000..405fb8b --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_access.py @@ -0,0 +1,61 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystoneclient import access +from keystoneclient import auth +from keystoneclient.auth.identity import access as access_plugin +from keystoneclient import fixture +from keystoneclient import session +from keystoneclient.tests.unit import utils + + +class AccessInfoPluginTests(utils.TestCase): + + def setUp(self): + super(AccessInfoPluginTests, self).setUp() + self.session = session.Session() + self.auth_token = uuid.uuid4().hex + + def _plugin(self, **kwargs): + token = fixture.V3Token() + s = token.add_service('identity') + s.add_standard_endpoints(public=self.TEST_ROOT_URL) + + auth_ref = access.AccessInfo.factory(body=token, + auth_token=self.auth_token) + return access_plugin.AccessInfoPlugin(auth_ref, **kwargs) + + def test_auth_ref(self): + plugin = self._plugin() + self.assertEqual(self.TEST_ROOT_URL, + plugin.get_endpoint(self.session, + service_type='identity', + interface='public')) + self.assertEqual(self.auth_token, plugin.get_token(session)) + + def test_auth_url(self): + auth_url = 'http://keystone.test.url' + plugin = self._plugin(auth_url=auth_url) + + self.assertEqual(auth_url, + plugin.get_endpoint(self.session, + interface=auth.AUTH_INTERFACE)) + + def test_invalidate(self): + plugin = self._plugin() + auth_ref = plugin.auth_ref + + self.assertIsInstance(auth_ref, access.AccessInfo) + self.assertFalse(plugin.invalidate()) + self.assertIs(auth_ref, plugin.auth_ref) diff --git a/keystoneclient/tests/unit/auth/test_cli.py b/keystoneclient/tests/unit/auth/test_cli.py new file mode 100644 index 0000000..d65de73 --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_cli.py @@ -0,0 +1,196 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import uuid + +import fixtures +import mock +from oslo_config import cfg + +from keystoneclient.auth import base +from keystoneclient.auth import cli +from keystoneclient.tests.unit.auth import utils + + +class TesterPlugin(base.BaseAuthPlugin): + + def get_token(self, *args, **kwargs): + return None + + @classmethod + def get_options(cls): + # NOTE(jamielennox): this is kind of horrible. If you specify this as + # a deprecated_name= value it will convert - to _ which is not what we + # want for a CLI option. + deprecated = [cfg.DeprecatedOpt('test-other')] + return [ + cfg.StrOpt('test-opt', help='tester', deprecated_opts=deprecated) + ] + + +class CliTests(utils.TestCase): + + def setUp(self): + super(CliTests, self).setUp() + self.p = argparse.ArgumentParser() + + def env(self, name, value=None): + if value is not None: + # environment variables are always strings + value = str(value) + + return self.useFixture(fixtures.EnvironmentVariable(name, value)) + + def test_creating_with_no_args(self): + ret = cli.register_argparse_arguments(self.p, []) + self.assertIsNone(ret) + self.assertIn('--os-auth-plugin', self.p.format_usage()) + + def test_load_with_nothing(self): + cli.register_argparse_arguments(self.p, []) + opts = self.p.parse_args([]) + self.assertIsNone(cli.load_from_argparse_arguments(opts)) + + @utils.mock_plugin + def test_basic_params_added(self, m): + name = uuid.uuid4().hex + argv = ['--os-auth-plugin', name] + ret = cli.register_argparse_arguments(self.p, argv) + self.assertIs(utils.MockPlugin, ret) + + for n in ('--os-a-int', '--os-a-bool', '--os-a-float'): + self.assertIn(n, self.p.format_usage()) + + m.assert_called_once_with(name) + + @utils.mock_plugin + def test_param_loading(self, m): + name = uuid.uuid4().hex + argv = ['--os-auth-plugin', name, + '--os-a-int', str(self.a_int), + '--os-a-float', str(self.a_float), + '--os-a-bool', str(self.a_bool)] + + klass = cli.register_argparse_arguments(self.p, argv) + self.assertIs(utils.MockPlugin, klass) + + opts = self.p.parse_args(argv) + self.assertEqual(name, opts.os_auth_plugin) + + a = cli.load_from_argparse_arguments(opts) + self.assertTestVals(a) + + self.assertEqual(name, opts.os_auth_plugin) + self.assertEqual(str(self.a_int), opts.os_a_int) + self.assertEqual(str(self.a_float), opts.os_a_float) + self.assertEqual(str(self.a_bool), opts.os_a_bool) + + @utils.mock_plugin + def test_default_options(self, m): + name = uuid.uuid4().hex + argv = ['--os-auth-plugin', name, + '--os-a-float', str(self.a_float)] + + klass = cli.register_argparse_arguments(self.p, argv) + self.assertIs(utils.MockPlugin, klass) + + opts = self.p.parse_args(argv) + self.assertEqual(name, opts.os_auth_plugin) + + a = cli.load_from_argparse_arguments(opts) + + self.assertEqual(self.a_float, a['a_float']) + self.assertEqual(3, a['a_int']) + + @utils.mock_plugin + def test_with_default_string_value(self, m): + name = uuid.uuid4().hex + klass = cli.register_argparse_arguments(self.p, [], default=name) + self.assertIs(utils.MockPlugin, klass) + m.assert_called_once_with(name) + + @utils.mock_plugin + def test_overrides_default_string_value(self, m): + name = uuid.uuid4().hex + default = uuid.uuid4().hex + argv = ['--os-auth-plugin', name] + klass = cli.register_argparse_arguments(self.p, argv, default=default) + self.assertIs(utils.MockPlugin, klass) + m.assert_called_once_with(name) + + @utils.mock_plugin + def test_with_default_type_value(self, m): + klass = cli.register_argparse_arguments(self.p, [], + default=utils.MockPlugin) + self.assertIs(utils.MockPlugin, klass) + self.assertEqual(0, m.call_count) + + @utils.mock_plugin + def test_overrides_default_type_value(self, m): + # using this test plugin would fail if called because there + # is no get_options() function + class TestPlugin(object): + pass + name = uuid.uuid4().hex + argv = ['--os-auth-plugin', name] + klass = cli.register_argparse_arguments(self.p, argv, + default=TestPlugin) + self.assertIs(utils.MockPlugin, klass) + m.assert_called_once_with(name) + + @utils.mock_plugin + def test_env_overrides_default_opt(self, m): + name = uuid.uuid4().hex + val = uuid.uuid4().hex + self.env('OS_A_STR', val) + + klass = cli.register_argparse_arguments(self.p, [], default=name) + opts = self.p.parse_args([]) + a = klass.load_from_argparse_arguments(opts) + + self.assertEqual(val, a['a_str']) + + def test_deprecated_cli_options(self): + TesterPlugin.register_argparse_arguments(self.p) + val = uuid.uuid4().hex + opts = self.p.parse_args(['--os-test-other', val]) + self.assertEqual(val, opts.os_test_opt) + + def test_deprecated_multi_cli_options(self): + TesterPlugin.register_argparse_arguments(self.p) + val1 = uuid.uuid4().hex + val2 = uuid.uuid4().hex + # argarse rules say that the last specified wins. + opts = self.p.parse_args(['--os-test-other', val2, + '--os-test-opt', val1]) + self.assertEqual(val1, opts.os_test_opt) + + def test_deprecated_env_options(self): + val = uuid.uuid4().hex + + with mock.patch.dict('os.environ', {'OS_TEST_OTHER': val}): + TesterPlugin.register_argparse_arguments(self.p) + + opts = self.p.parse_args([]) + self.assertEqual(val, opts.os_test_opt) + + def test_deprecated_env_multi_options(self): + val1 = uuid.uuid4().hex + val2 = uuid.uuid4().hex + + with mock.patch.dict('os.environ', {'OS_TEST_OPT': val1, + 'OS_TEST_OTHER': val2}): + TesterPlugin.register_argparse_arguments(self.p) + + opts = self.p.parse_args([]) + self.assertEqual(val1, opts.os_test_opt) diff --git a/keystoneclient/tests/unit/auth/test_conf.py b/keystoneclient/tests/unit/auth/test_conf.py new file mode 100644 index 0000000..c3ce8eb --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_conf.py @@ -0,0 +1,177 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +import mock +from oslo_config import cfg +from oslo_config import fixture as config +import stevedore + +from keystoneclient.auth import base +from keystoneclient.auth import conf +from keystoneclient.auth.identity import v2 as v2_auth +from keystoneclient.auth.identity import v3 as v3_auth +from keystoneclient import exceptions +from keystoneclient.tests.unit.auth import utils + + +class ConfTests(utils.TestCase): + + def setUp(self): + super(ConfTests, self).setUp() + self.conf_fixture = self.useFixture(config.Config()) + + # NOTE(jamielennox): we register the basic config options first because + # we need them in place before we can stub them. We will need to run + # the register again after we stub the auth section and auth plugin so + # it can load the plugin specific options. + conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP) + + def test_loading_v2(self): + section = uuid.uuid4().hex + username = uuid.uuid4().hex + password = uuid.uuid4().hex + trust_id = uuid.uuid4().hex + tenant_id = uuid.uuid4().hex + + self.conf_fixture.config(auth_section=section, group=self.GROUP) + conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP) + + self.conf_fixture.register_opts(v2_auth.Password.get_options(), + group=section) + + self.conf_fixture.config(auth_plugin=self.V2PASS, + username=username, + password=password, + trust_id=trust_id, + tenant_id=tenant_id, + group=section) + + a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP) + + self.assertEqual(username, a.username) + self.assertEqual(password, a.password) + self.assertEqual(trust_id, a.trust_id) + self.assertEqual(tenant_id, a.tenant_id) + + def test_loading_v3(self): + section = uuid.uuid4().hex + token = uuid.uuid4().hex + trust_id = uuid.uuid4().hex + project_id = uuid.uuid4().hex + project_domain_name = uuid.uuid4().hex + + self.conf_fixture.config(auth_section=section, group=self.GROUP) + conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP) + + self.conf_fixture.register_opts(v3_auth.Token.get_options(), + group=section) + + self.conf_fixture.config(auth_plugin=self.V3TOKEN, + token=token, + trust_id=trust_id, + project_id=project_id, + project_domain_name=project_domain_name, + group=section) + + a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP) + + self.assertEqual(token, a.auth_methods[0].token) + self.assertEqual(trust_id, a.trust_id) + self.assertEqual(project_id, a.project_id) + self.assertEqual(project_domain_name, a.project_domain_name) + + def test_loading_invalid_plugin(self): + auth_plugin = uuid.uuid4().hex + self.conf_fixture.config(auth_plugin=auth_plugin, + group=self.GROUP) + + e = self.assertRaises(exceptions.NoMatchingPlugin, + conf.load_from_conf_options, + self.conf_fixture.conf, + self.GROUP) + + self.assertEqual(auth_plugin, e.name) + + def test_loading_with_no_data(self): + self.assertIsNone(conf.load_from_conf_options(self.conf_fixture.conf, + self.GROUP)) + + @mock.patch('stevedore.DriverManager') + def test_other_params(self, m): + m.return_value = utils.MockManager(utils.MockPlugin) + driver_name = uuid.uuid4().hex + + self.conf_fixture.register_opts(utils.MockPlugin.get_options(), + group=self.GROUP) + self.conf_fixture.config(auth_plugin=driver_name, + group=self.GROUP, + **self.TEST_VALS) + + a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP) + self.assertTestVals(a) + + m.assert_called_once_with(namespace=base.PLUGIN_NAMESPACE, + name=driver_name, + invoke_on_load=False) + + @utils.mock_plugin + def test_same_section(self, m): + self.conf_fixture.register_opts(utils.MockPlugin.get_options(), + group=self.GROUP) + conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP) + self.conf_fixture.config(auth_plugin=uuid.uuid4().hex, + group=self.GROUP, + **self.TEST_VALS) + + a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP) + self.assertTestVals(a) + + @utils.mock_plugin + def test_diff_section(self, m): + section = uuid.uuid4().hex + + self.conf_fixture.config(auth_section=section, group=self.GROUP) + conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP) + + self.conf_fixture.register_opts(utils.MockPlugin.get_options(), + group=section) + self.conf_fixture.config(group=section, + auth_plugin=uuid.uuid4().hex, + **self.TEST_VALS) + + a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP) + self.assertTestVals(a) + + def test_plugins_are_all_opts(self): + manager = stevedore.ExtensionManager(base.PLUGIN_NAMESPACE, + invoke_on_load=False, + propagate_map_exceptions=True) + + def inner(driver): + for p in driver.plugin.get_options(): + self.assertIsInstance(p, cfg.Opt) + + manager.map(inner) + + def test_get_common(self): + opts = conf.get_common_conf_options() + for opt in opts: + self.assertIsInstance(opt, cfg.Opt) + self.assertEqual(2, len(opts)) + + def test_get_named(self): + loaded_opts = conf.get_plugin_options('v2password') + plugin_opts = v2_auth.Password.get_options() + + self.assertEqual(plugin_opts, loaded_opts) diff --git a/keystoneclient/tests/unit/auth/test_identity_common.py b/keystoneclient/tests/unit/auth/test_identity_common.py new file mode 100644 index 0000000..db30bea --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_identity_common.py @@ -0,0 +1,422 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import datetime +import uuid + +from oslo_utils import timeutils +import six + +from keystoneclient import access +from keystoneclient.auth import base +from keystoneclient.auth import identity +from keystoneclient import fixture +from keystoneclient import session +from keystoneclient.tests.unit import utils + + +@six.add_metaclass(abc.ABCMeta) +class CommonIdentityTests(object): + + TEST_ROOT_URL = 'http://127.0.0.1:5000/' + TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' + + TEST_COMPUTE_PUBLIC = 'http://nova/novapi/public' + TEST_COMPUTE_INTERNAL = 'http://nova/novapi/internal' + TEST_COMPUTE_ADMIN = 'http://nova/novapi/admin' + + TEST_PASS = uuid.uuid4().hex + + def setUp(self): + super(CommonIdentityTests, self).setUp() + + self.TEST_URL = '%s%s' % (self.TEST_ROOT_URL, self.version) + self.TEST_ADMIN_URL = '%s%s' % (self.TEST_ROOT_ADMIN_URL, self.version) + self.TEST_DISCOVERY = fixture.DiscoveryList(href=self.TEST_ROOT_URL) + + self.stub_auth_data() + + @abc.abstractmethod + def create_auth_plugin(self, **kwargs): + """Create an auth plugin that makes sense for the auth data. + + It doesn't really matter what auth mechanism is used but it should be + appropriate to the API version. + """ + + @abc.abstractmethod + def get_auth_data(self, **kwargs): + """Return fake authentication data. + + This should register a valid token response and ensure that the compute + endpoints are set to TEST_COMPUTE_PUBLIC, _INTERNAL and _ADMIN. + """ + + def stub_auth_data(self, **kwargs): + token = self.get_auth_data(**kwargs) + self.user_id = token.user_id + + try: + self.project_id = token.project_id + except AttributeError: + self.project_id = token.tenant_id + + self.stub_auth(json=token) + + @abc.abstractproperty + def version(self): + """The API version being tested.""" + + def test_discovering(self): + self.stub_url('GET', [], + base_url=self.TEST_COMPUTE_ADMIN, + json=self.TEST_DISCOVERY) + + body = 'SUCCESS' + + # which gives our sample values + self.stub_url('GET', ['path'], text=body) + + a = self.create_auth_plugin() + s = session.Session(auth=a) + + resp = s.get('/path', endpoint_filter={'service_type': 'compute', + 'interface': 'admin', + 'version': self.version}) + + self.assertEqual(200, resp.status_code) + self.assertEqual(body, resp.text) + + new_body = 'SC SUCCESS' + # if we don't specify a version, we use the URL from the SC + self.stub_url('GET', ['path'], + base_url=self.TEST_COMPUTE_ADMIN, + text=new_body) + + resp = s.get('/path', endpoint_filter={'service_type': 'compute', + 'interface': 'admin'}) + + self.assertEqual(200, resp.status_code) + self.assertEqual(new_body, resp.text) + + def test_discovery_uses_session_cache(self): + # register responses such that if the discovery URL is hit more than + # once then the response will be invalid and not point to COMPUTE_ADMIN + resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}] + self.requests.get(self.TEST_COMPUTE_ADMIN, resps) + + body = 'SUCCESS' + self.stub_url('GET', ['path'], text=body) + + # now either of the two plugins I use, it should not cause a second + # request to the discovery url. + s = session.Session() + a = self.create_auth_plugin() + b = self.create_auth_plugin() + + for auth in (a, b): + resp = s.get('/path', + auth=auth, + endpoint_filter={'service_type': 'compute', + 'interface': 'admin', + 'version': self.version}) + + self.assertEqual(200, resp.status_code) + self.assertEqual(body, resp.text) + + def test_discovery_uses_plugin_cache(self): + # register responses such that if the discovery URL is hit more than + # once then the response will be invalid and not point to COMPUTE_ADMIN + resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}] + self.requests.get(self.TEST_COMPUTE_ADMIN, resps) + + body = 'SUCCESS' + self.stub_url('GET', ['path'], text=body) + + # now either of the two sessions I use, it should not cause a second + # request to the discovery url. + sa = session.Session() + sb = session.Session() + auth = self.create_auth_plugin() + + for sess in (sa, sb): + resp = sess.get('/path', + auth=auth, + endpoint_filter={'service_type': 'compute', + 'interface': 'admin', + 'version': self.version}) + + self.assertEqual(200, resp.status_code) + self.assertEqual(body, resp.text) + + def test_discovering_with_no_data(self): + # which returns discovery information pointing to TEST_URL but there is + # no data there. + self.stub_url('GET', [], + base_url=self.TEST_COMPUTE_ADMIN, + status_code=400) + + # so the url that will be used is the same TEST_COMPUTE_ADMIN + body = 'SUCCESS' + self.stub_url('GET', ['path'], base_url=self.TEST_COMPUTE_ADMIN, + text=body, status_code=200) + + a = self.create_auth_plugin() + s = session.Session(auth=a) + + resp = s.get('/path', endpoint_filter={'service_type': 'compute', + 'interface': 'admin', + 'version': self.version}) + + self.assertEqual(200, resp.status_code) + self.assertEqual(body, resp.text) + + def test_asking_for_auth_endpoint_ignores_checks(self): + a = self.create_auth_plugin() + s = session.Session(auth=a) + + auth_url = s.get_endpoint(service_type='compute', + interface=base.AUTH_INTERFACE) + + self.assertEqual(self.TEST_URL, auth_url) + + def _create_expired_auth_plugin(self, **kwargs): + expires = timeutils.utcnow() - datetime.timedelta(minutes=20) + expired_token = self.get_auth_data(expires=expires) + expired_auth_ref = access.AccessInfo.factory(body=expired_token) + + body = 'SUCCESS' + self.stub_url('GET', ['path'], + base_url=self.TEST_COMPUTE_ADMIN, text=body) + + a = self.create_auth_plugin(**kwargs) + a.auth_ref = expired_auth_ref + return a + + def test_reauthenticate(self): + a = self._create_expired_auth_plugin() + expired_auth_ref = a.auth_ref + s = session.Session(auth=a) + self.assertIsNot(expired_auth_ref, a.get_access(s)) + + def test_no_reauthenticate(self): + a = self._create_expired_auth_plugin(reauthenticate=False) + expired_auth_ref = a.auth_ref + s = session.Session(auth=a) + self.assertIs(expired_auth_ref, a.get_access(s)) + + def test_invalidate(self): + a = self.create_auth_plugin() + s = session.Session(auth=a) + + # trigger token fetching + s.get_auth_headers() + + self.assertTrue(a.auth_ref) + self.assertTrue(a.invalidate()) + self.assertIsNone(a.auth_ref) + self.assertFalse(a.invalidate()) + + def test_get_auth_properties(self): + a = self.create_auth_plugin() + s = session.Session() + + self.assertEqual(self.user_id, a.get_user_id(s)) + self.assertEqual(self.project_id, a.get_project_id(s)) + + +class V3(CommonIdentityTests, utils.TestCase): + + @property + def version(self): + return 'v3' + + def get_auth_data(self, **kwargs): + token = fixture.V3Token(**kwargs) + region = 'RegionOne' + + svc = token.add_service('identity') + svc.add_standard_endpoints(admin=self.TEST_ADMIN_URL, region=region) + + svc = token.add_service('compute') + svc.add_standard_endpoints(admin=self.TEST_COMPUTE_ADMIN, + public=self.TEST_COMPUTE_PUBLIC, + internal=self.TEST_COMPUTE_INTERNAL, + region=region) + + return token + + def stub_auth(self, subject_token=None, **kwargs): + if not subject_token: + subject_token = self.TEST_TOKEN + + kwargs.setdefault('headers', {})['X-Subject-Token'] = subject_token + self.stub_url('POST', ['auth', 'tokens'], **kwargs) + + def create_auth_plugin(self, **kwargs): + kwargs.setdefault('auth_url', self.TEST_URL) + kwargs.setdefault('username', self.TEST_USER) + kwargs.setdefault('password', self.TEST_PASS) + return identity.V3Password(**kwargs) + + +class V2(CommonIdentityTests, utils.TestCase): + + @property + def version(self): + return 'v2.0' + + def create_auth_plugin(self, **kwargs): + kwargs.setdefault('auth_url', self.TEST_URL) + kwargs.setdefault('username', self.TEST_USER) + kwargs.setdefault('password', self.TEST_PASS) + return identity.V2Password(**kwargs) + + def get_auth_data(self, **kwargs): + token = fixture.V2Token(**kwargs) + region = 'RegionOne' + + svc = token.add_service('identity') + svc.add_endpoint(self.TEST_ADMIN_URL, region=region) + + svc = token.add_service('compute') + svc.add_endpoint(public=self.TEST_COMPUTE_PUBLIC, + internal=self.TEST_COMPUTE_INTERNAL, + admin=self.TEST_COMPUTE_ADMIN, + region=region) + + return token + + def stub_auth(self, **kwargs): + self.stub_url('POST', ['tokens'], **kwargs) + + +class CatalogHackTests(utils.TestCase): + + TEST_URL = 'http://keystone.server:5000/v2.0' + OTHER_URL = 'http://other.server:5000/path' + + IDENTITY = 'identity' + + BASE_URL = 'http://keystone.server:5000/' + V2_URL = BASE_URL + 'v2.0' + V3_URL = BASE_URL + 'v3' + + def test_getting_endpoints(self): + disc = fixture.DiscoveryList(href=self.BASE_URL) + self.stub_url('GET', + ['/'], + base_url=self.BASE_URL, + json=disc) + + token = fixture.V2Token() + service = token.add_service(self.IDENTITY) + service.add_endpoint(public=self.V2_URL, + admin=self.V2_URL, + internal=self.V2_URL) + + self.stub_url('POST', + ['tokens'], + base_url=self.V2_URL, + json=token) + + v2_auth = identity.V2Password(self.V2_URL, + username=uuid.uuid4().hex, + password=uuid.uuid4().hex) + + sess = session.Session(auth=v2_auth) + + endpoint = sess.get_endpoint(service_type=self.IDENTITY, + interface='public', + version=(3, 0)) + + self.assertEqual(self.V3_URL, endpoint) + + def test_returns_original_when_discover_fails(self): + token = fixture.V2Token() + service = token.add_service(self.IDENTITY) + service.add_endpoint(public=self.V2_URL, + admin=self.V2_URL, + internal=self.V2_URL) + + self.stub_url('POST', + ['tokens'], + base_url=self.V2_URL, + json=token) + + self.stub_url('GET', [], base_url=self.BASE_URL, status_code=404) + + v2_auth = identity.V2Password(self.V2_URL, + username=uuid.uuid4().hex, + password=uuid.uuid4().hex) + + sess = session.Session(auth=v2_auth) + + endpoint = sess.get_endpoint(service_type=self.IDENTITY, + interface='public', + version=(3, 0)) + + self.assertEqual(self.V2_URL, endpoint) + + +class GenericPlugin(base.BaseAuthPlugin): + + BAD_TOKEN = uuid.uuid4().hex + + def __init__(self): + super(GenericPlugin, self).__init__() + + self.endpoint = 'http://keystone.host:5000' + + self.headers = {'headerA': 'valueA', + 'headerB': 'valueB'} + + def url(self, prefix): + return '%s/%s' % (self.endpoint, prefix) + + def get_token(self, session, **kwargs): + # NOTE(jamielennox): by specifying get_headers this should not be used + return self.BAD_TOKEN + + def get_headers(self, session, **kwargs): + return self.headers + + def get_endpoint(self, session, **kwargs): + return self.endpoint + + +class GenericAuthPluginTests(utils.TestCase): + + # filter doesn't matter to GenericPlugin, but we have to specify one + ENDPOINT_FILTER = {uuid.uuid4().hex: uuid.uuid4().hex} + + def setUp(self): + super(GenericAuthPluginTests, self).setUp() + self.auth = GenericPlugin() + self.session = session.Session(auth=self.auth) + + def test_setting_headers(self): + text = uuid.uuid4().hex + self.stub_url('GET', base_url=self.auth.url('prefix'), text=text) + + resp = self.session.get('prefix', endpoint_filter=self.ENDPOINT_FILTER) + + self.assertEqual(text, resp.text) + + for k, v in six.iteritems(self.auth.headers): + self.assertRequestHeaderEqual(k, v) + + self.assertIsNone(self.session.get_token()) + self.assertEqual(self.auth.headers, + self.session.get_auth_headers()) + self.assertNotIn('X-Auth-Token', self.requests.last_request.headers) diff --git a/keystoneclient/tests/unit/auth/test_identity_v2.py b/keystoneclient/tests/unit/auth/test_identity_v2.py new file mode 100644 index 0000000..6d432a7 --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_identity_v2.py @@ -0,0 +1,295 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import uuid + +from keystoneclient.auth.identity import v2 +from keystoneclient import exceptions +from keystoneclient import session +from keystoneclient.tests.unit import utils + + +class V2IdentityPlugin(utils.TestCase): + + TEST_ROOT_URL = 'http://127.0.0.1:5000/' + TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v2.0') + TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' + TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v2.0') + + TEST_PASS = 'password' + + TEST_SERVICE_CATALOG = [{ + "endpoints": [{ + "adminURL": "http://cdn.admin-nets.local:8774/v1.0", + "region": "RegionOne", + "internalURL": "http://127.0.0.1:8774/v1.0", + "publicURL": "http://cdn.admin-nets.local:8774/v1.0/" + }], + "type": "nova_compat", + "name": "nova_compat" + }, { + "endpoints": [{ + "adminURL": "http://nova/novapi/admin", + "region": "RegionOne", + "internalURL": "http://nova/novapi/internal", + "publicURL": "http://nova/novapi/public" + }], + "type": "compute", + "name": "nova" + }, { + "endpoints": [{ + "adminURL": "http://glance/glanceapi/admin", + "region": "RegionOne", + "internalURL": "http://glance/glanceapi/internal", + "publicURL": "http://glance/glanceapi/public" + }], + "type": "image", + "name": "glance" + }, { + "endpoints": [{ + "adminURL": TEST_ADMIN_URL, + "region": "RegionOne", + "internalURL": "http://127.0.0.1:5000/v2.0", + "publicURL": "http://127.0.0.1:5000/v2.0" + }], + "type": "identity", + "name": "keystone" + }, { + "endpoints": [{ + "adminURL": "http://swift/swiftapi/admin", + "region": "RegionOne", + "internalURL": "http://swift/swiftapi/internal", + "publicURL": "http://swift/swiftapi/public" + }], + "type": "object-store", + "name": "swift" + }] + + def setUp(self): + super(V2IdentityPlugin, self).setUp() + self.TEST_RESPONSE_DICT = { + "access": { + "token": { + "expires": "2020-01-01T00:00:10.000123Z", + "id": self.TEST_TOKEN, + "tenant": { + "id": self.TEST_TENANT_ID + }, + }, + "user": { + "id": self.TEST_USER + }, + "serviceCatalog": self.TEST_SERVICE_CATALOG, + }, + } + + def stub_auth(self, **kwargs): + self.stub_url('POST', ['tokens'], **kwargs) + + def test_authenticate_with_username_password(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + self.assertIsNone(a.user_id) + s = session.Session(a) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'passwordCredentials': {'username': self.TEST_USER, + 'password': self.TEST_PASS}}} + self.assertRequestBodyIs(json=req) + self.assertRequestHeaderEqual('Content-Type', 'application/json') + self.assertRequestHeaderEqual('Accept', 'application/json') + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_authenticate_with_user_id_password(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v2.Password(self.TEST_URL, user_id=self.TEST_USER, + password=self.TEST_PASS) + self.assertIsNone(a.username) + s = session.Session(a) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER, + 'password': self.TEST_PASS}}} + self.assertRequestBodyIs(json=req) + self.assertRequestHeaderEqual('Content-Type', 'application/json') + self.assertRequestHeaderEqual('Accept', 'application/json') + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_authenticate_with_username_password_scoped(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID) + self.assertIsNone(a.user_id) + s = session.Session(a) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'passwordCredentials': {'username': self.TEST_USER, + 'password': self.TEST_PASS}, + 'tenantId': self.TEST_TENANT_ID}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_authenticate_with_user_id_password_scoped(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v2.Password(self.TEST_URL, user_id=self.TEST_USER, + password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID) + self.assertIsNone(a.username) + s = session.Session(a) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER, + 'password': self.TEST_PASS}, + 'tenantId': self.TEST_TENANT_ID}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_authenticate_with_token(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v2.Token(self.TEST_URL, 'foo') + s = session.Session(a) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'token': {'id': 'foo'}}} + self.assertRequestBodyIs(json=req) + self.assertRequestHeaderEqual('x-Auth-Token', 'foo') + self.assertRequestHeaderEqual('Content-Type', 'application/json') + self.assertRequestHeaderEqual('Accept', 'application/json') + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_with_trust_id(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS, trust_id='trust') + s = session.Session(a) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'passwordCredentials': {'username': self.TEST_USER, + 'password': self.TEST_PASS}, + 'trust_id': 'trust'}} + + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def _do_service_url_test(self, base_url, endpoint_filter): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + self.stub_url('GET', ['path'], + base_url=base_url, + text='SUCCESS', status_code=200) + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + resp = s.get('/path', endpoint_filter=endpoint_filter) + + self.assertEqual(resp.status_code, 200) + self.assertEqual(self.requests.last_request.url, base_url + '/path') + + def test_service_url(self): + endpoint_filter = {'service_type': 'compute', + 'interface': 'admin', + 'service_name': 'nova'} + self._do_service_url_test('http://nova/novapi/admin', endpoint_filter) + + def test_service_url_defaults_to_public(self): + endpoint_filter = {'service_type': 'compute'} + self._do_service_url_test('http://nova/novapi/public', endpoint_filter) + + def test_endpoint_filter_without_service_type_fails(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertRaises(exceptions.EndpointNotFound, s.get, '/path', + endpoint_filter={'interface': 'admin'}) + + def test_full_url_overrides_endpoint_filter(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + self.stub_url('GET', [], + base_url='http://testurl/', + text='SUCCESS', status_code=200) + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + resp = s.get('http://testurl/', + endpoint_filter={'service_type': 'compute'}) + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.text, 'SUCCESS') + + def test_invalid_auth_response_dict(self): + self.stub_auth(json={'hello': 'world'}) + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any', + authenticated=True) + + def test_invalid_auth_response_type(self): + self.stub_url('POST', ['tokens'], text='testdata') + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any', + authenticated=True) + + def test_invalidate_response(self): + resp_data1 = copy.deepcopy(self.TEST_RESPONSE_DICT) + resp_data2 = copy.deepcopy(self.TEST_RESPONSE_DICT) + + resp_data1['access']['token']['id'] = 'token1' + resp_data2['access']['token']['id'] = 'token2' + + auth_responses = [{'json': resp_data1}, {'json': resp_data2}] + self.stub_auth(response_list=auth_responses) + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertEqual('token1', s.get_token()) + self.assertEqual({'X-Auth-Token': 'token1'}, s.get_auth_headers()) + + a.invalidate() + self.assertEqual('token2', s.get_token()) + self.assertEqual({'X-Auth-Token': 'token2'}, s.get_auth_headers()) + + def test_doesnt_log_password(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + password = uuid.uuid4().hex + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=password) + s = session.Session(auth=a) + self.assertEqual(self.TEST_TOKEN, s.get_token()) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + self.assertNotIn(password, self.logger.output) + + def test_password_with_no_user_id_or_name(self): + self.assertRaises(TypeError, + v2.Password, self.TEST_URL, password=self.TEST_PASS) diff --git a/keystoneclient/tests/unit/auth/test_identity_v3.py b/keystoneclient/tests/unit/auth/test_identity_v3.py new file mode 100644 index 0000000..29cbb0e --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_identity_v3.py @@ -0,0 +1,490 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import uuid + +from keystoneclient import access +from keystoneclient.auth.identity import v3 +from keystoneclient import client +from keystoneclient import exceptions +from keystoneclient import fixture +from keystoneclient import session +from keystoneclient.tests.unit import utils + + +class V3IdentityPlugin(utils.TestCase): + + TEST_ROOT_URL = 'http://127.0.0.1:5000/' + TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3') + TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' + TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3') + + TEST_PASS = 'password' + + TEST_SERVICE_CATALOG = [{ + "endpoints": [{ + "url": "http://cdn.admin-nets.local:8774/v1.0/", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://127.0.0.1:8774/v1.0", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://cdn.admin-nets.local:8774/v1.0", + "region": "RegionOne", + "interface": "admin" + }], + "type": "nova_compat" + }, { + "endpoints": [{ + "url": "http://nova/novapi/public", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://nova/novapi/internal", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://nova/novapi/admin", + "region": "RegionOne", + "interface": "admin" + }], + "type": "compute", + "name": "nova", + }, { + "endpoints": [{ + "url": "http://glance/glanceapi/public", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://glance/glanceapi/internal", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://glance/glanceapi/admin", + "region": "RegionOne", + "interface": "admin" + }], + "type": "image", + "name": "glance" + }, { + "endpoints": [{ + "url": "http://127.0.0.1:5000/v3", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://127.0.0.1:5000/v3", + "region": "RegionOne", + "interface": "internal" + }, { + "url": TEST_ADMIN_URL, + "region": "RegionOne", + "interface": "admin" + }], + "type": "identity" + }, { + "endpoints": [{ + "url": "http://swift/swiftapi/public", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://swift/swiftapi/internal", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://swift/swiftapi/admin", + "region": "RegionOne", + "interface": "admin" + }], + "type": "object-store" + }] + + def setUp(self): + super(V3IdentityPlugin, self).setUp() + + V3_URL = "%sv3" % self.TEST_URL + self.TEST_DISCOVERY_RESPONSE = { + 'versions': {'values': [fixture.V3Discovery(V3_URL)]}} + + self.TEST_RESPONSE_DICT = { + "token": { + "methods": [ + "token", + "password" + ], + + "expires_at": "2020-01-01T00:00:10.000123Z", + "project": { + "domain": { + "id": self.TEST_DOMAIN_ID, + "name": self.TEST_DOMAIN_NAME + }, + "id": self.TEST_TENANT_ID, + "name": self.TEST_TENANT_NAME + }, + "user": { + "domain": { + "id": self.TEST_DOMAIN_ID, + "name": self.TEST_DOMAIN_NAME + }, + "id": self.TEST_USER, + "name": self.TEST_USER + }, + "issued_at": "2013-05-29T16:55:21.468960Z", + "catalog": self.TEST_SERVICE_CATALOG + }, + } + self.TEST_PROJECTS_RESPONSE = { + "projects": [ + { + "domain_id": "1789d1", + "enabled": "True", + "id": "263fd9", + "links": { + "self": "https://identity:5000/v3/projects/263fd9" + }, + "name": "Dev Group A" + }, + { + "domain_id": "1789d1", + "enabled": "True", + "id": "e56ad3", + "links": { + "self": "https://identity:5000/v3/projects/e56ad3" + }, + "name": "Dev Group B" + } + ], + "links": { + "self": "https://identity:5000/v3/projects", + } + } + + def stub_auth(self, subject_token=None, **kwargs): + if not subject_token: + subject_token = self.TEST_TOKEN + + self.stub_url('POST', ['auth', 'tokens'], + headers={'X-Subject-Token': subject_token}, **kwargs) + + def test_authenticate_with_username_password(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v3.Password(self.TEST_URL, + username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['password'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}}}} + + self.assertRequestBodyIs(json=req) + self.assertRequestHeaderEqual('Content-Type', 'application/json') + self.assertRequestHeaderEqual('Accept', 'application/json') + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_authenticate_with_username_password_unscoped(self): + del self.TEST_RESPONSE_DICT['token']['catalog'] + del self.TEST_RESPONSE_DICT['token']['project'] + + self.stub_auth(json=self.TEST_RESPONSE_DICT) + self.stub_url(method="GET", json=self.TEST_DISCOVERY_RESPONSE) + test_user_id = self.TEST_RESPONSE_DICT['token']['user']['id'] + self.stub_url(method="GET", + json=self.TEST_PROJECTS_RESPONSE, + parts=['users', test_user_id, 'projects']) + + a = v3.Password(self.TEST_URL, + username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + cs = client.Client(session=s, auth_url=self.TEST_URL) + + # As a sanity check on the auth_ref, make sure client has the + # proper user id, that it fetches the right project response + self.assertEqual(test_user_id, a.auth_ref.user_id) + t = cs.projects.list(user=a.auth_ref.user_id) + self.assertEqual(2, len(t)) + + def test_authenticate_with_username_password_domain_scoped(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS, domain_id=self.TEST_DOMAIN_ID) + s = session.Session(a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['password'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}}, + 'scope': {'domain': {'id': self.TEST_DOMAIN_ID}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_authenticate_with_username_password_project_scoped(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS, + project_id=self.TEST_DOMAIN_ID) + s = session.Session(a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['password'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}}, + 'scope': {'project': {'id': self.TEST_DOMAIN_ID}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + self.assertEqual(s.auth.auth_ref.project_id, self.TEST_DOMAIN_ID) + + def test_authenticate_with_token(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v3.Token(self.TEST_URL, self.TEST_TOKEN) + s = session.Session(auth=a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['token'], + 'token': {'id': self.TEST_TOKEN}}}} + + self.assertRequestBodyIs(json=req) + + self.assertRequestHeaderEqual('Content-Type', 'application/json') + self.assertRequestHeaderEqual('Accept', 'application/json') + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_with_expired(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + + d = copy.deepcopy(self.TEST_RESPONSE_DICT) + d['token']['expires_at'] = '2000-01-01T00:00:10.000123Z' + + a = v3.Password(self.TEST_URL, username='username', + password='password') + a.auth_ref = access.AccessInfo.factory(body=d) + s = session.Session(auth=a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + self.assertEqual(a.auth_ref['expires_at'], + self.TEST_RESPONSE_DICT['token']['expires_at']) + + def test_with_domain_and_project_scoping(self): + a = v3.Password(self.TEST_URL, username='username', + password='password', project_id='project', + domain_id='domain') + + self.assertRaises(exceptions.AuthorizationFailure, + a.get_token, None) + self.assertRaises(exceptions.AuthorizationFailure, + a.get_headers, None) + + def test_with_trust_id(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS, trust_id='trust') + s = session.Session(a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['password'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}}, + 'scope': {'OS-TRUST:trust': {'id': 'trust'}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_with_multiple_mechanisms_factory(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + p = v3.PasswordMethod(username=self.TEST_USER, password=self.TEST_PASS) + t = v3.TokenMethod(token='foo') + a = v3.Auth(self.TEST_URL, [p, t], trust_id='trust') + s = session.Session(a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['password', 'token'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}, + 'token': {'id': 'foo'}}, + 'scope': {'OS-TRUST:trust': {'id': 'trust'}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_with_multiple_mechanisms(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + p = v3.PasswordMethod(username=self.TEST_USER, + password=self.TEST_PASS) + t = v3.TokenMethod(token='foo') + a = v3.Auth(self.TEST_URL, [p, t], trust_id='trust') + s = session.Session(auth=a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['password', 'token'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}, + 'token': {'id': 'foo'}}, + 'scope': {'OS-TRUST:trust': {'id': 'trust'}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_with_multiple_scopes(self): + s = session.Session() + + a = v3.Password(self.TEST_URL, + username=self.TEST_USER, password=self.TEST_PASS, + domain_id='x', project_id='x') + self.assertRaises(exceptions.AuthorizationFailure, a.get_auth_ref, s) + + a = v3.Password(self.TEST_URL, + username=self.TEST_USER, password=self.TEST_PASS, + domain_id='x', trust_id='x') + self.assertRaises(exceptions.AuthorizationFailure, a.get_auth_ref, s) + + def _do_service_url_test(self, base_url, endpoint_filter): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + self.stub_url('GET', ['path'], + base_url=base_url, + text='SUCCESS', status_code=200) + + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + resp = s.get('/path', endpoint_filter=endpoint_filter) + + self.assertEqual(resp.status_code, 200) + self.assertEqual(self.requests.last_request.url, base_url + '/path') + + def test_service_url(self): + endpoint_filter = {'service_type': 'compute', + 'interface': 'admin', + 'service_name': 'nova'} + self._do_service_url_test('http://nova/novapi/admin', endpoint_filter) + + def test_service_url_defaults_to_public(self): + endpoint_filter = {'service_type': 'compute'} + self._do_service_url_test('http://nova/novapi/public', endpoint_filter) + + def test_endpoint_filter_without_service_type_fails(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertRaises(exceptions.EndpointNotFound, s.get, '/path', + endpoint_filter={'interface': 'admin'}) + + def test_full_url_overrides_endpoint_filter(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + self.stub_url('GET', [], + base_url='http://testurl/', + text='SUCCESS', status_code=200) + + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + resp = s.get('http://testurl/', + endpoint_filter={'service_type': 'compute'}) + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.text, 'SUCCESS') + + def test_invalid_auth_response_dict(self): + self.stub_auth(json={'hello': 'world'}) + + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any', + authenticated=True) + + def test_invalid_auth_response_type(self): + self.stub_url('POST', ['auth', 'tokens'], text='testdata') + + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any', + authenticated=True) + + def test_invalidate_response(self): + auth_responses = [{'status_code': 200, 'json': self.TEST_RESPONSE_DICT, + 'headers': {'X-Subject-Token': 'token1'}}, + {'status_code': 200, 'json': self.TEST_RESPONSE_DICT, + 'headers': {'X-Subject-Token': 'token2'}}] + + self.requests.post('%s/auth/tokens' % self.TEST_URL, auth_responses) + + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertEqual('token1', s.get_token()) + self.assertEqual({'X-Auth-Token': 'token1'}, s.get_auth_headers()) + a.invalidate() + self.assertEqual('token2', s.get_token()) + self.assertEqual({'X-Auth-Token': 'token2'}, s.get_auth_headers()) + + def test_doesnt_log_password(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + + password = uuid.uuid4().hex + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=password) + s = session.Session(a) + self.assertEqual(self.TEST_TOKEN, s.get_token()) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + self.assertNotIn(password, self.logger.output) + + def test_sends_nocatalog(self): + del self.TEST_RESPONSE_DICT['token']['catalog'] + self.stub_auth(json=self.TEST_RESPONSE_DICT) + + a = v3.Password(self.TEST_URL, + username=self.TEST_USER, + password=self.TEST_PASS, + include_catalog=False) + s = session.Session(auth=a) + + s.get_token() + + auth_url = self.TEST_URL + '/auth/tokens' + self.assertEqual(auth_url, a.token_url) + self.assertEqual(auth_url + '?nocatalog', + self.requests.last_request.url) diff --git a/keystoneclient/tests/unit/auth/test_password.py b/keystoneclient/tests/unit/auth/test_password.py new file mode 100644 index 0000000..c5067c0 --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_password.py @@ -0,0 +1,63 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystoneclient.auth.identity.generic import password +from keystoneclient.auth.identity import v2 +from keystoneclient.auth.identity import v3 +from keystoneclient.tests.unit.auth import utils + + +class PasswordTests(utils.GenericPluginTestCase): + + PLUGIN_CLASS = password.Password + V2_PLUGIN_CLASS = v2.Password + V3_PLUGIN_CLASS = v3.Password + + def new_plugin(self, **kwargs): + kwargs.setdefault('username', uuid.uuid4().hex) + kwargs.setdefault('password', uuid.uuid4().hex) + return super(PasswordTests, self).new_plugin(**kwargs) + + def test_with_user_domain_params(self): + self.stub_discovery() + + self.assertCreateV3(domain_id=uuid.uuid4().hex, + user_domain_id=uuid.uuid4().hex) + + def test_v3_user_params_v2_url(self): + self.stub_discovery(v3=False) + self.assertDiscoveryFailure(user_domain_id=uuid.uuid4().hex) + + def test_options(self): + opts = [o.name for o in self.PLUGIN_CLASS.get_options()] + + allowed_opts = ['user-name', + 'user-domain-id', + 'user-domain-name', + 'user-id', + 'password', + + 'domain-id', + 'domain-name', + 'tenant-id', + 'tenant-name', + 'project-id', + 'project-name', + 'project-domain-id', + 'project-domain-name', + 'trust-id', + 'auth-url'] + + self.assertEqual(set(allowed_opts), set(opts)) + self.assertEqual(len(allowed_opts), len(opts)) diff --git a/keystoneclient/tests/unit/auth/test_token.py b/keystoneclient/tests/unit/auth/test_token.py new file mode 100644 index 0000000..928e2b2 --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_token.py @@ -0,0 +1,47 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystoneclient.auth.identity.generic import token +from keystoneclient.auth.identity import v2 +from keystoneclient.auth.identity import v3 +from keystoneclient.tests.unit.auth import utils + + +class TokenTests(utils.GenericPluginTestCase): + + PLUGIN_CLASS = token.Token + V2_PLUGIN_CLASS = v2.Token + V3_PLUGIN_CLASS = v3.Token + + def new_plugin(self, **kwargs): + kwargs.setdefault('token', uuid.uuid4().hex) + return super(TokenTests, self).new_plugin(**kwargs) + + def test_options(self): + opts = [o.name for o in self.PLUGIN_CLASS.get_options()] + + allowed_opts = ['token', + 'domain-id', + 'domain-name', + 'tenant-id', + 'tenant-name', + 'project-id', + 'project-name', + 'project-domain-id', + 'project-domain-name', + 'trust-id', + 'auth-url'] + + self.assertEqual(set(allowed_opts), set(opts)) + self.assertEqual(len(allowed_opts), len(opts)) diff --git a/keystoneclient/tests/unit/auth/test_token_endpoint.py b/keystoneclient/tests/unit/auth/test_token_endpoint.py new file mode 100644 index 0000000..4b5f82c --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_token_endpoint.py @@ -0,0 +1,63 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from testtools import matchers + +from keystoneclient.auth import token_endpoint +from keystoneclient import session +from keystoneclient.tests.unit import utils + + +class TokenEndpointTest(utils.TestCase): + + TEST_TOKEN = 'aToken' + TEST_URL = 'http://server/prefix' + + def test_basic_case(self): + self.requests.get(self.TEST_URL, text='body') + + a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN) + s = session.Session(auth=a) + + data = s.get(self.TEST_URL, authenticated=True) + + self.assertEqual(data.text, 'body') + self.assertRequestHeaderEqual('X-Auth-Token', self.TEST_TOKEN) + + def test_basic_endpoint_case(self): + self.stub_url('GET', ['p'], text='body') + a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN) + s = session.Session(auth=a) + + data = s.get('/p', + authenticated=True, + endpoint_filter={'service': 'identity'}) + + self.assertEqual(self.TEST_URL, a.get_endpoint(s)) + self.assertEqual('body', data.text) + self.assertRequestHeaderEqual('X-Auth-Token', self.TEST_TOKEN) + + def test_token_endpoint_options(self): + opt_names = [opt.name for opt in token_endpoint.Token.get_options()] + + self.assertThat(opt_names, matchers.HasLength(2)) + + self.assertIn('token', opt_names) + self.assertIn('endpoint', opt_names) + + def test_token_endpoint_user_id(self): + a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN) + s = session.Session() + + # we can't know this information about this sort of plugin + self.assertIsNone(a.get_user_id(s)) + self.assertIsNone(a.get_project_id(s)) diff --git a/keystoneclient/tests/unit/auth/utils.py b/keystoneclient/tests/unit/auth/utils.py new file mode 100644 index 0000000..6580c73 --- /dev/null +++ b/keystoneclient/tests/unit/auth/utils.py @@ -0,0 +1,200 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import functools +import uuid + +import mock +from oslo_config import cfg +import six + +from keystoneclient import access +from keystoneclient.auth import base +from keystoneclient import exceptions +from keystoneclient import fixture +from keystoneclient import session +from keystoneclient.tests.unit import utils + + +class MockPlugin(base.BaseAuthPlugin): + + INT_DESC = 'test int' + FLOAT_DESC = 'test float' + BOOL_DESC = 'test bool' + STR_DESC = 'test str' + STR_DEFAULT = uuid.uuid4().hex + + def __init__(self, **kwargs): + self._data = kwargs + + def __getitem__(self, key): + return self._data[key] + + def get_token(self, *args, **kwargs): + return 'aToken' + + def get_endpoint(self, *args, **kwargs): + return 'http://test' + + @classmethod + def get_options(cls): + return [ + cfg.IntOpt('a-int', default='3', help=cls.INT_DESC), + cfg.BoolOpt('a-bool', help=cls.BOOL_DESC), + cfg.FloatOpt('a-float', help=cls.FLOAT_DESC), + cfg.StrOpt('a-str', help=cls.STR_DESC, default=cls.STR_DEFAULT), + ] + + +class MockManager(object): + + def __init__(self, driver): + self.driver = driver + + +def mock_plugin(f): + @functools.wraps(f) + def inner(*args, **kwargs): + with mock.patch.object(base, 'get_plugin_class') as m: + m.return_value = MockPlugin + args = list(args) + [m] + return f(*args, **kwargs) + + return inner + + +class TestCase(utils.TestCase): + + GROUP = 'auth' + V2PASS = 'v2password' + V3TOKEN = 'v3token' + + a_int = 88 + a_float = 88.8 + a_bool = False + + TEST_VALS = {'a_int': a_int, + 'a_float': a_float, + 'a_bool': a_bool} + + def assertTestVals(self, plugin, vals=TEST_VALS): + for k, v in six.iteritems(vals): + self.assertEqual(v, plugin[k]) + + +class GenericPluginTestCase(utils.TestCase): + + TEST_URL = 'http://keystone.host:5000/' + + # OVERRIDE THESE IN SUB CLASSES + PLUGIN_CLASS = None + V2_PLUGIN_CLASS = None + V3_PLUGIN_CLASS = None + + def setUp(self): + super(GenericPluginTestCase, self).setUp() + + self.token_v2 = fixture.V2Token() + self.token_v3 = fixture.V3Token() + self.token_v3_id = uuid.uuid4().hex + self.session = session.Session() + + self.stub_url('POST', ['v2.0', 'tokens'], json=self.token_v2) + self.stub_url('POST', ['v3', 'auth', 'tokens'], + headers={'X-Subject-Token': self.token_v3_id}, + json=self.token_v3) + + def new_plugin(self, **kwargs): + kwargs.setdefault('auth_url', self.TEST_URL) + return self.PLUGIN_CLASS(**kwargs) + + def stub_discovery(self, base_url=None, **kwargs): + kwargs.setdefault('href', self.TEST_URL) + disc = fixture.DiscoveryList(**kwargs) + self.stub_url('GET', json=disc, base_url=base_url, status_code=300) + return disc + + def assertCreateV3(self, **kwargs): + auth = self.new_plugin(**kwargs) + auth_ref = auth.get_auth_ref(self.session) + self.assertIsInstance(auth_ref, access.AccessInfoV3) + self.assertEqual(self.TEST_URL + 'v3/auth/tokens', + self.requests.last_request.url) + self.assertIsInstance(auth._plugin, self.V3_PLUGIN_CLASS) + return auth + + def assertCreateV2(self, **kwargs): + auth = self.new_plugin(**kwargs) + auth_ref = auth.get_auth_ref(self.session) + self.assertIsInstance(auth_ref, access.AccessInfoV2) + self.assertEqual(self.TEST_URL + 'v2.0/tokens', + self.requests.last_request.url) + self.assertIsInstance(auth._plugin, self.V2_PLUGIN_CLASS) + return auth + + def assertDiscoveryFailure(self, **kwargs): + plugin = self.new_plugin(**kwargs) + self.assertRaises(exceptions.DiscoveryFailure, + plugin.get_auth_ref, + self.session) + + def test_create_v3_if_domain_params(self): + self.stub_discovery() + + self.assertCreateV3(domain_id=uuid.uuid4().hex) + self.assertCreateV3(domain_name=uuid.uuid4().hex) + self.assertCreateV3(project_name=uuid.uuid4().hex, + project_domain_name=uuid.uuid4().hex) + self.assertCreateV3(project_name=uuid.uuid4().hex, + project_domain_id=uuid.uuid4().hex) + + def test_create_v2_if_no_domain_params(self): + self.stub_discovery() + self.assertCreateV2() + self.assertCreateV2(project_id=uuid.uuid4().hex) + self.assertCreateV2(project_name=uuid.uuid4().hex) + self.assertCreateV2(tenant_id=uuid.uuid4().hex) + self.assertCreateV2(tenant_name=uuid.uuid4().hex) + + def test_v3_params_v2_url(self): + self.stub_discovery(v3=False) + self.assertDiscoveryFailure(domain_name=uuid.uuid4().hex) + + def test_v2_params_v3_url(self): + self.stub_discovery(v2=False) + self.assertCreateV3() + + def test_no_urls(self): + self.stub_discovery(v2=False, v3=False) + self.assertDiscoveryFailure() + + def test_path_based_url_v2(self): + self.stub_url('GET', ['v2.0'], status_code=403) + self.assertCreateV2(auth_url=self.TEST_URL + 'v2.0') + + def test_path_based_url_v3(self): + self.stub_url('GET', ['v3'], status_code=403) + self.assertCreateV3(auth_url=self.TEST_URL + 'v3') + + def test_disc_error_for_failure(self): + self.stub_url('GET', [], status_code=403) + self.assertDiscoveryFailure() + + def test_v3_plugin_from_failure(self): + url = self.TEST_URL + 'v3' + self.stub_url('GET', [], base_url=url, status_code=403) + self.assertCreateV3(auth_url=url) + + def test_unknown_discovery_version(self): + # make a v4 entry that's mostly the same as a v3 + self.stub_discovery(v2=False, v3_id='v4.0') + self.assertDiscoveryFailure() diff --git a/keystoneclient/tests/unit/test_discovery.py b/keystoneclient/tests/unit/test_discovery.py new file mode 100644 index 0000000..6c208a3 --- /dev/null +++ b/keystoneclient/tests/unit/test_discovery.py @@ -0,0 +1,802 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re +import uuid + +from oslo_serialization import jsonutils +import six +from testtools import matchers + +from keystoneclient import _discover +from keystoneclient.auth import token_endpoint +from keystoneclient import client +from keystoneclient import discover +from keystoneclient import exceptions +from keystoneclient import fixture +from keystoneclient import session +from keystoneclient.tests.unit import utils +from keystoneclient.v2_0 import client as v2_client +from keystoneclient.v3 import client as v3_client + + +BASE_HOST = 'http://keystone.example.com' +BASE_URL = "%s:5000/" % BASE_HOST +UPDATED = '2013-03-06T00:00:00Z' + +TEST_SERVICE_CATALOG = [{ + "endpoints": [{ + "adminURL": "%s:8774/v1.0" % BASE_HOST, + "region": "RegionOne", + "internalURL": "%s://127.0.0.1:8774/v1.0" % BASE_HOST, + "publicURL": "%s:8774/v1.0/" % BASE_HOST + }], + "type": "nova_compat", + "name": "nova_compat" +}, { + "endpoints": [{ + "adminURL": "http://nova/novapi/admin", + "region": "RegionOne", + "internalURL": "http://nova/novapi/internal", + "publicURL": "http://nova/novapi/public" + }], + "type": "compute", + "name": "nova" +}, { + "endpoints": [{ + "adminURL": "http://glance/glanceapi/admin", + "region": "RegionOne", + "internalURL": "http://glance/glanceapi/internal", + "publicURL": "http://glance/glanceapi/public" + }], + "type": "image", + "name": "glance" +}, { + "endpoints": [{ + "adminURL": "%s:35357/v2.0" % BASE_HOST, + "region": "RegionOne", + "internalURL": "%s:5000/v2.0" % BASE_HOST, + "publicURL": "%s:5000/v2.0" % BASE_HOST + }], + "type": "identity", + "name": "keystone" +}, { + "endpoints": [{ + "adminURL": "http://swift/swiftapi/admin", + "region": "RegionOne", + "internalURL": "http://swift/swiftapi/internal", + "publicURL": "http://swift/swiftapi/public" + }], + "type": "object-store", + "name": "swift" +}] + +V2_URL = "%sv2.0" % BASE_URL +V2_VERSION = fixture.V2Discovery(V2_URL) +V2_VERSION.updated_str = UPDATED + +V2_AUTH_RESPONSE = jsonutils.dumps({ + "access": { + "token": { + "expires": "2020-01-01T00:00:10.000123Z", + "id": 'fakeToken', + "tenant": { + "id": '1' + }, + }, + "user": { + "id": 'test' + }, + "serviceCatalog": TEST_SERVICE_CATALOG, + }, +}) + +V3_URL = "%sv3" % BASE_URL +V3_VERSION = fixture.V3Discovery(V3_URL) +V3_MEDIA_TYPES = V3_VERSION.media_types +V3_VERSION.updated_str = UPDATED + +V3_TOKEN = six.u('3e2813b7ba0b4006840c3825860b86ed'), +V3_AUTH_RESPONSE = jsonutils.dumps({ + "token": { + "methods": [ + "token", + "password" + ], + + "expires_at": "2020-01-01T00:00:10.000123Z", + "project": { + "domain": { + "id": '1', + "name": 'test-domain' + }, + "id": '1', + "name": 'test-project' + }, + "user": { + "domain": { + "id": '1', + "name": 'test-domain' + }, + "id": '1', + "name": 'test-user' + }, + "issued_at": "2013-05-29T16:55:21.468960Z", + }, +}) + +CINDER_EXAMPLES = { + "versions": [ + { + "status": "CURRENT", + "updated": "2012-01-04T11:33:21Z", + "id": "v1.0", + "links": [ + { + "href": "%sv1/" % BASE_URL, + "rel": "self" + } + ] + }, + { + "status": "CURRENT", + "updated": "2012-11-21T11:33:21Z", + "id": "v2.0", + "links": [ + { + "href": "%sv2/" % BASE_URL, + "rel": "self" + } + ] + } + ] +} + +GLANCE_EXAMPLES = { + "versions": [ + { + "status": "CURRENT", + "id": "v2.2", + "links": [ + { + "href": "%sv2/" % BASE_URL, + "rel": "self" + } + ] + }, + { + "status": "SUPPORTED", + "id": "v2.1", + "links": [ + { + "href": "%sv2/" % BASE_URL, + "rel": "self" + } + ] + }, + { + "status": "SUPPORTED", + "id": "v2.0", + "links": [ + { + "href": "%sv2/" % BASE_URL, + "rel": "self" + } + ] + }, + { + "status": "CURRENT", + "id": "v1.1", + "links": [ + { + "href": "%sv1/" % BASE_URL, + "rel": "self" + } + ] + }, + { + "status": "SUPPORTED", + "id": "v1.0", + "links": [ + { + "href": "%sv1/" % BASE_URL, + "rel": "self" + } + ] + } + ] +} + + +def _create_version_list(versions): + return jsonutils.dumps({'versions': {'values': versions}}) + + +def _create_single_version(version): + return jsonutils.dumps({'version': version}) + + +V3_VERSION_LIST = _create_version_list([V3_VERSION, V2_VERSION]) +V2_VERSION_LIST = _create_version_list([V2_VERSION]) + +V3_VERSION_ENTRY = _create_single_version(V3_VERSION) +V2_VERSION_ENTRY = _create_single_version(V2_VERSION) + + +class AvailableVersionsTests(utils.TestCase): + + def test_available_versions_basics(self): + examples = {'keystone': V3_VERSION_LIST, + 'cinder': jsonutils.dumps(CINDER_EXAMPLES), + 'glance': jsonutils.dumps(GLANCE_EXAMPLES)} + + for path, text in six.iteritems(examples): + url = "%s%s" % (BASE_URL, path) + + self.requests.get(url, status_code=300, text=text) + versions = discover.available_versions(url) + + for v in versions: + for n in ('id', 'status', 'links'): + msg = '%s missing from %s version data' % (n, path) + self.assertThat(v, matchers.Annotate(msg, + matchers.Contains(n))) + + def test_available_versions_individual(self): + self.requests.get(V3_URL, status_code=200, text=V3_VERSION_ENTRY) + + versions = discover.available_versions(V3_URL) + + for v in versions: + self.assertEqual(v['id'], 'v3.0') + self.assertEqual(v['status'], 'stable') + self.assertIn('media-types', v) + self.assertIn('links', v) + + def test_available_keystone_data(self): + self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) + + versions = discover.available_versions(BASE_URL) + self.assertEqual(2, len(versions)) + + for v in versions: + self.assertIn(v['id'], ('v2.0', 'v3.0')) + self.assertEqual(v['updated'], UPDATED) + self.assertEqual(v['status'], 'stable') + + if v['id'] == 'v3.0': + self.assertEqual(v['media-types'], V3_MEDIA_TYPES) + + def test_available_cinder_data(self): + text = jsonutils.dumps(CINDER_EXAMPLES) + self.requests.get(BASE_URL, status_code=300, text=text) + + versions = discover.available_versions(BASE_URL) + self.assertEqual(2, len(versions)) + + for v in versions: + self.assertEqual(v['status'], 'CURRENT') + if v['id'] == 'v1.0': + self.assertEqual(v['updated'], '2012-01-04T11:33:21Z') + elif v['id'] == 'v2.0': + self.assertEqual(v['updated'], '2012-11-21T11:33:21Z') + else: + self.fail("Invalid version found") + + def test_available_glance_data(self): + text = jsonutils.dumps(GLANCE_EXAMPLES) + self.requests.get(BASE_URL, status_code=200, text=text) + + versions = discover.available_versions(BASE_URL) + self.assertEqual(5, len(versions)) + + for v in versions: + if v['id'] in ('v2.2', 'v1.1'): + self.assertEqual(v['status'], 'CURRENT') + elif v['id'] in ('v2.1', 'v2.0', 'v1.0'): + self.assertEqual(v['status'], 'SUPPORTED') + else: + self.fail("Invalid version found") + + +class ClientDiscoveryTests(utils.TestCase): + + def assertCreatesV3(self, **kwargs): + self.requests.post('%s/auth/tokens' % V3_URL, + text=V3_AUTH_RESPONSE, + headers={'X-Subject-Token': V3_TOKEN}) + + kwargs.setdefault('username', 'foo') + kwargs.setdefault('password', 'bar') + keystone = client.Client(**kwargs) + self.assertIsInstance(keystone, v3_client.Client) + return keystone + + def assertCreatesV2(self, **kwargs): + self.requests.post("%s/tokens" % V2_URL, text=V2_AUTH_RESPONSE) + + kwargs.setdefault('username', 'foo') + kwargs.setdefault('password', 'bar') + keystone = client.Client(**kwargs) + self.assertIsInstance(keystone, v2_client.Client) + return keystone + + def assertVersionNotAvailable(self, **kwargs): + kwargs.setdefault('username', 'foo') + kwargs.setdefault('password', 'bar') + + self.assertRaises(exceptions.VersionNotAvailable, + client.Client, **kwargs) + + def assertDiscoveryFailure(self, **kwargs): + kwargs.setdefault('username', 'foo') + kwargs.setdefault('password', 'bar') + + self.assertRaises(exceptions.DiscoveryFailure, + client.Client, **kwargs) + + def test_discover_v3(self): + self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) + + self.assertCreatesV3(auth_url=BASE_URL) + + def test_discover_v2(self): + self.requests.get(BASE_URL, status_code=300, text=V2_VERSION_LIST) + self.requests.post("%s/tokens" % V2_URL, text=V2_AUTH_RESPONSE) + + self.assertCreatesV2(auth_url=BASE_URL) + + def test_discover_endpoint_v2(self): + self.requests.get(BASE_URL, status_code=300, text=V2_VERSION_LIST) + self.assertCreatesV2(endpoint=BASE_URL, token='fake-token') + + def test_discover_endpoint_v3(self): + self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) + self.assertCreatesV3(endpoint=BASE_URL, token='fake-token') + + def test_discover_invalid_major_version(self): + self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) + + self.assertVersionNotAvailable(auth_url=BASE_URL, version=5) + + def test_discover_200_response_fails(self): + self.requests.get(BASE_URL, text='ok') + self.assertDiscoveryFailure(auth_url=BASE_URL) + + def test_discover_minor_greater_than_available_fails(self): + self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) + + self.assertVersionNotAvailable(endpoint=BASE_URL, version=3.4) + + def test_discover_individual_version_v2(self): + self.requests.get(V2_URL, text=V2_VERSION_ENTRY) + + self.assertCreatesV2(auth_url=V2_URL) + + def test_discover_individual_version_v3(self): + self.requests.get(V3_URL, text=V3_VERSION_ENTRY) + + self.assertCreatesV3(auth_url=V3_URL) + + def test_discover_individual_endpoint_v2(self): + self.requests.get(V2_URL, text=V2_VERSION_ENTRY) + self.assertCreatesV2(endpoint=V2_URL, token='fake-token') + + def test_discover_individual_endpoint_v3(self): + self.requests.get(V3_URL, text=V3_VERSION_ENTRY) + self.assertCreatesV3(endpoint=V3_URL, token='fake-token') + + def test_discover_fail_to_create_bad_individual_version(self): + self.requests.get(V2_URL, text=V2_VERSION_ENTRY) + self.requests.get(V3_URL, text=V3_VERSION_ENTRY) + + self.assertVersionNotAvailable(auth_url=V2_URL, version=3) + self.assertVersionNotAvailable(auth_url=V3_URL, version=2) + + def test_discover_unstable_versions(self): + version_list = fixture.DiscoveryList(BASE_URL, v3_status='beta') + self.requests.get(BASE_URL, status_code=300, json=version_list) + + self.assertCreatesV2(auth_url=BASE_URL) + self.assertVersionNotAvailable(auth_url=BASE_URL, version=3) + self.assertCreatesV3(auth_url=BASE_URL, unstable=True) + + def test_discover_forwards_original_ip(self): + self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) + + ip = '192.168.1.1' + self.assertCreatesV3(auth_url=BASE_URL, original_ip=ip) + + self.assertThat(self.requests.last_request.headers['forwarded'], + matchers.Contains(ip)) + + def test_discover_bad_args(self): + self.assertRaises(exceptions.DiscoveryFailure, + client.Client) + + def test_discover_bad_response(self): + self.requests.get(BASE_URL, status_code=300, json={'FOO': 'BAR'}) + self.assertDiscoveryFailure(auth_url=BASE_URL) + + def test_discovery_ignore_invalid(self): + resp = [{'id': 'v3.0', + 'links': [1, 2, 3, 4], # invalid links + 'media-types': V3_MEDIA_TYPES, + 'status': 'stable', + 'updated': UPDATED}] + self.requests.get(BASE_URL, status_code=300, + text=_create_version_list(resp)) + self.assertDiscoveryFailure(auth_url=BASE_URL) + + def test_ignore_entry_without_links(self): + v3 = V3_VERSION.copy() + v3['links'] = [] + self.requests.get(BASE_URL, status_code=300, + text=_create_version_list([v3, V2_VERSION])) + self.assertCreatesV2(auth_url=BASE_URL) + + def test_ignore_entry_without_status(self): + v3 = V3_VERSION.copy() + del v3['status'] + self.requests.get(BASE_URL, status_code=300, + text=_create_version_list([v3, V2_VERSION])) + self.assertCreatesV2(auth_url=BASE_URL) + + def test_greater_version_than_required(self): + versions = fixture.DiscoveryList(BASE_URL, v3_id='v3.6') + self.requests.get(BASE_URL, json=versions) + self.assertCreatesV3(auth_url=BASE_URL, version=(3, 4)) + + def test_lesser_version_than_required(self): + versions = fixture.DiscoveryList(BASE_URL, v3_id='v3.4') + self.requests.get(BASE_URL, json=versions) + self.assertVersionNotAvailable(auth_url=BASE_URL, version=(3, 6)) + + def test_bad_response(self): + self.requests.get(BASE_URL, status_code=300, text="Ugly Duckling") + self.assertDiscoveryFailure(auth_url=BASE_URL) + + def test_pass_client_arguments(self): + self.requests.get(BASE_URL, status_code=300, text=V2_VERSION_LIST) + kwargs = {'original_ip': '100', 'use_keyring': False, + 'stale_duration': 15} + + cl = self.assertCreatesV2(auth_url=BASE_URL, **kwargs) + + self.assertEqual(cl.original_ip, '100') + self.assertEqual(cl.stale_duration, 15) + self.assertFalse(cl.use_keyring) + + def test_overriding_stored_kwargs(self): + self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) + + self.requests.post("%s/auth/tokens" % V3_URL, + text=V3_AUTH_RESPONSE, + headers={'X-Subject-Token': V3_TOKEN}) + + disc = discover.Discover(auth_url=BASE_URL, debug=False, + username='foo') + client = disc.create_client(debug=True, password='bar') + + self.assertIsInstance(client, v3_client.Client) + self.assertTrue(client.debug_log) + self.assertFalse(disc._client_kwargs['debug']) + self.assertEqual(client.username, 'foo') + self.assertEqual(client.password, 'bar') + + def test_available_versions(self): + self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_ENTRY) + disc = discover.Discover(auth_url=BASE_URL) + + versions = disc.available_versions() + self.assertEqual(1, len(versions)) + self.assertEqual(V3_VERSION, versions[0]) + + def test_unknown_client_version(self): + V4_VERSION = {'id': 'v4.0', + 'links': [{'href': 'http://url', 'rel': 'self'}], + 'media-types': V3_MEDIA_TYPES, + 'status': 'stable', + 'updated': UPDATED} + versions = fixture.DiscoveryList() + versions.add_version(V4_VERSION) + self.requests.get(BASE_URL, status_code=300, json=versions) + + disc = discover.Discover(auth_url=BASE_URL) + self.assertRaises(exceptions.DiscoveryFailure, + disc.create_client, version=4) + + def test_discovery_fail_for_missing_v3(self): + versions = fixture.DiscoveryList(v2=True, v3=False) + self.requests.get(BASE_URL, status_code=300, json=versions) + + disc = discover.Discover(auth_url=BASE_URL) + self.assertRaises(exceptions.DiscoveryFailure, + disc.create_client, version=(3, 0)) + + def _do_discovery_call(self, token=None, **kwargs): + self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) + + if not token: + token = uuid.uuid4().hex + + url = 'http://testurl' + a = token_endpoint.Token(url, token) + s = session.Session(auth=a) + + # will default to true as there is a plugin on the session + discover.Discover(s, auth_url=BASE_URL, **kwargs) + + self.assertEqual(BASE_URL, self.requests.last_request.url) + + def test_setting_authenticated_true(self): + token = uuid.uuid4().hex + self._do_discovery_call(token) + self.assertRequestHeaderEqual('X-Auth-Token', token) + + def test_setting_authenticated_false(self): + self._do_discovery_call(authenticated=False) + self.assertNotIn('X-Auth-Token', self.requests.last_request.headers) + + +class DiscoverQueryTests(utils.TestCase): + + def test_available_keystone_data(self): + self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) + + disc = discover.Discover(auth_url=BASE_URL) + versions = disc.version_data() + + self.assertEqual((2, 0), versions[0]['version']) + self.assertEqual('stable', versions[0]['raw_status']) + self.assertEqual(V2_URL, versions[0]['url']) + self.assertEqual((3, 0), versions[1]['version']) + self.assertEqual('stable', versions[1]['raw_status']) + self.assertEqual(V3_URL, versions[1]['url']) + + version = disc.data_for('v3.0') + self.assertEqual((3, 0), version['version']) + self.assertEqual('stable', version['raw_status']) + self.assertEqual(V3_URL, version['url']) + + version = disc.data_for(2) + self.assertEqual((2, 0), version['version']) + self.assertEqual('stable', version['raw_status']) + self.assertEqual(V2_URL, version['url']) + + self.assertIsNone(disc.url_for('v4')) + self.assertEqual(V3_URL, disc.url_for('v3')) + self.assertEqual(V2_URL, disc.url_for('v2')) + + def test_available_cinder_data(self): + text = jsonutils.dumps(CINDER_EXAMPLES) + self.requests.get(BASE_URL, status_code=300, text=text) + + v1_url = "%sv1/" % BASE_URL + v2_url = "%sv2/" % BASE_URL + + disc = discover.Discover(auth_url=BASE_URL) + versions = disc.version_data() + + self.assertEqual((1, 0), versions[0]['version']) + self.assertEqual('CURRENT', versions[0]['raw_status']) + self.assertEqual(v1_url, versions[0]['url']) + self.assertEqual((2, 0), versions[1]['version']) + self.assertEqual('CURRENT', versions[1]['raw_status']) + self.assertEqual(v2_url, versions[1]['url']) + + version = disc.data_for('v2.0') + self.assertEqual((2, 0), version['version']) + self.assertEqual('CURRENT', version['raw_status']) + self.assertEqual(v2_url, version['url']) + + version = disc.data_for(1) + self.assertEqual((1, 0), version['version']) + self.assertEqual('CURRENT', version['raw_status']) + self.assertEqual(v1_url, version['url']) + + self.assertIsNone(disc.url_for('v3')) + self.assertEqual(v2_url, disc.url_for('v2')) + self.assertEqual(v1_url, disc.url_for('v1')) + + def test_available_glance_data(self): + text = jsonutils.dumps(GLANCE_EXAMPLES) + self.requests.get(BASE_URL, text=text) + + v1_url = "%sv1/" % BASE_URL + v2_url = "%sv2/" % BASE_URL + + disc = discover.Discover(auth_url=BASE_URL) + versions = disc.version_data() + + self.assertEqual((1, 0), versions[0]['version']) + self.assertEqual('SUPPORTED', versions[0]['raw_status']) + self.assertEqual(v1_url, versions[0]['url']) + self.assertEqual((1, 1), versions[1]['version']) + self.assertEqual('CURRENT', versions[1]['raw_status']) + self.assertEqual(v1_url, versions[1]['url']) + self.assertEqual((2, 0), versions[2]['version']) + self.assertEqual('SUPPORTED', versions[2]['raw_status']) + self.assertEqual(v2_url, versions[2]['url']) + self.assertEqual((2, 1), versions[3]['version']) + self.assertEqual('SUPPORTED', versions[3]['raw_status']) + self.assertEqual(v2_url, versions[3]['url']) + self.assertEqual((2, 2), versions[4]['version']) + self.assertEqual('CURRENT', versions[4]['raw_status']) + self.assertEqual(v2_url, versions[4]['url']) + + for ver in (2, 2.1, 2.2): + version = disc.data_for(ver) + self.assertEqual((2, 2), version['version']) + self.assertEqual('CURRENT', version['raw_status']) + self.assertEqual(v2_url, version['url']) + self.assertEqual(v2_url, disc.url_for(ver)) + + for ver in (1, 1.1): + version = disc.data_for(ver) + self.assertEqual((1, 1), version['version']) + self.assertEqual('CURRENT', version['raw_status']) + self.assertEqual(v1_url, version['url']) + self.assertEqual(v1_url, disc.url_for(ver)) + + self.assertIsNone(disc.url_for('v3')) + self.assertIsNone(disc.url_for('v2.3')) + + def test_allow_deprecated(self): + status = 'deprecated' + version_list = [{'id': 'v3.0', + 'links': [{'href': V3_URL, 'rel': 'self'}], + 'media-types': V3_MEDIA_TYPES, + 'status': status, + 'updated': UPDATED}] + text = jsonutils.dumps({'versions': version_list}) + self.requests.get(BASE_URL, text=text) + + disc = discover.Discover(auth_url=BASE_URL) + + # deprecated is allowed by default + versions = disc.version_data(allow_deprecated=False) + self.assertEqual(0, len(versions)) + + versions = disc.version_data(allow_deprecated=True) + self.assertEqual(1, len(versions)) + self.assertEqual(status, versions[0]['raw_status']) + self.assertEqual(V3_URL, versions[0]['url']) + self.assertEqual((3, 0), versions[0]['version']) + + def test_allow_experimental(self): + status = 'experimental' + version_list = [{'id': 'v3.0', + 'links': [{'href': V3_URL, 'rel': 'self'}], + 'media-types': V3_MEDIA_TYPES, + 'status': status, + 'updated': UPDATED}] + text = jsonutils.dumps({'versions': version_list}) + self.requests.get(BASE_URL, text=text) + + disc = discover.Discover(auth_url=BASE_URL) + + versions = disc.version_data() + self.assertEqual(0, len(versions)) + + versions = disc.version_data(allow_experimental=True) + self.assertEqual(1, len(versions)) + self.assertEqual(status, versions[0]['raw_status']) + self.assertEqual(V3_URL, versions[0]['url']) + self.assertEqual((3, 0), versions[0]['version']) + + def test_allow_unknown(self): + status = 'abcdef' + version_list = fixture.DiscoveryList(BASE_URL, v2=False, + v3_status=status) + self.requests.get(BASE_URL, json=version_list) + disc = discover.Discover(auth_url=BASE_URL) + + versions = disc.version_data() + self.assertEqual(0, len(versions)) + + versions = disc.version_data(allow_unknown=True) + self.assertEqual(1, len(versions)) + self.assertEqual(status, versions[0]['raw_status']) + self.assertEqual(V3_URL, versions[0]['url']) + self.assertEqual((3, 0), versions[0]['version']) + + def test_ignoring_invalid_lnks(self): + version_list = [{'id': 'v3.0', + 'links': [{'href': V3_URL, 'rel': 'self'}], + 'media-types': V3_MEDIA_TYPES, + 'status': 'stable', + 'updated': UPDATED}, + {'id': 'v3.1', + 'media-types': V3_MEDIA_TYPES, + 'status': 'stable', + 'updated': UPDATED}, + {'media-types': V3_MEDIA_TYPES, + 'status': 'stable', + 'updated': UPDATED, + 'links': [{'href': V3_URL, 'rel': 'self'}], + }] + + text = jsonutils.dumps({'versions': version_list}) + self.requests.get(BASE_URL, text=text) + + disc = discover.Discover(auth_url=BASE_URL) + + # raw_version_data will return all choices, even invalid ones + versions = disc.raw_version_data() + self.assertEqual(3, len(versions)) + + # only the version with both id and links will be actually returned + versions = disc.version_data() + self.assertEqual(1, len(versions)) + + +class CatalogHackTests(utils.TestCase): + + TEST_URL = 'http://keystone.server:5000/v2.0' + OTHER_URL = 'http://other.server:5000/path' + + IDENTITY = 'identity' + + BASE_URL = 'http://keystone.server:5000/' + V2_URL = BASE_URL + 'v2.0' + V3_URL = BASE_URL + 'v3' + + def setUp(self): + super(CatalogHackTests, self).setUp() + self.hacks = _discover._VersionHacks() + self.hacks.add_discover_hack(self.IDENTITY, + re.compile('/v2.0/?$'), + '/') + + def test_version_hacks(self): + self.assertEqual(self.BASE_URL, + self.hacks.get_discover_hack(self.IDENTITY, + self.V2_URL)) + + self.assertEqual(self.BASE_URL, + self.hacks.get_discover_hack(self.IDENTITY, + self.V2_URL + '/')) + + self.assertEqual(self.OTHER_URL, + self.hacks.get_discover_hack(self.IDENTITY, + self.OTHER_URL)) + + def test_ignored_non_service_type(self): + self.assertEqual(self.V2_URL, + self.hacks.get_discover_hack('other', self.V2_URL)) + + +class DiscoverUtils(utils.TestCase): + + def test_version_number(self): + def assertVersion(inp, out): + self.assertEqual(out, _discover.normalize_version_number(inp)) + + def versionRaises(inp): + self.assertRaises(TypeError, + _discover.normalize_version_number, + inp) + + assertVersion('v1.2', (1, 2)) + assertVersion('v11', (11, 0)) + assertVersion('1.2', (1, 2)) + assertVersion('1.5.1', (1, 5, 1)) + assertVersion('1', (1, 0)) + assertVersion(1, (1, 0)) + assertVersion(5.2, (5, 2)) + assertVersion((6, 1), (6, 1)) + assertVersion([1, 4], (1, 4)) + + versionRaises('hello') + versionRaises('1.a') + versionRaises('vacuum') diff --git a/keystoneclient/tests/unit/test_hacking_checks.py b/keystoneclient/tests/unit/test_hacking_checks.py new file mode 100644 index 0000000..220d258 --- /dev/null +++ b/keystoneclient/tests/unit/test_hacking_checks.py @@ -0,0 +1,47 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import textwrap + +import mock +import pep8 +import testtools + +from keystoneclient.hacking import checks +from keystoneclient.tests.unit import client_fixtures + + +class TestCheckOsloNamespaceImports(testtools.TestCase): + + # We are patching pep8 so that only the check under test is actually + # installed. + @mock.patch('pep8._checks', + {'physical_line': {}, 'logical_line': {}, 'tree': {}}) + def run_check(self, code): + pep8.register_check(checks.check_oslo_namespace_imports) + + lines = textwrap.dedent(code).strip().splitlines(True) + + checker = pep8.Checker(lines=lines) + checker.check_all() + checker.report._deferred_print.sort() + return checker.report._deferred_print + + def assert_has_errors(self, code, expected_errors=None): + actual_errors = [e[:3] for e in self.run_check(code)] + self.assertEqual(expected_errors or [], actual_errors) + + def test(self): + code_ex = self.useFixture(client_fixtures.HackingCode()) + code = code_ex.oslo_namespace_imports['code'] + errors = code_ex.oslo_namespace_imports['expected_errors'] + self.assert_has_errors(code, expected_errors=errors) diff --git a/keystoneclient/tests/unit/test_session.py b/keystoneclient/tests/unit/test_session.py new file mode 100644 index 0000000..1d01c3a --- /dev/null +++ b/keystoneclient/tests/unit/test_session.py @@ -0,0 +1,866 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import itertools +import uuid + +import mock +from oslo_config import cfg +from oslo_config import fixture as config +from oslo_serialization import jsonutils +import requests +import six +from testtools import matchers + +from keystoneclient import adapter +from keystoneclient.auth import base +from keystoneclient import exceptions +from keystoneclient import session as client_session +from keystoneclient.tests.unit import utils + + +class SessionTests(utils.TestCase): + + TEST_URL = 'http://127.0.0.1:5000/' + + def test_get(self): + session = client_session.Session() + self.stub_url('GET', text='response') + resp = session.get(self.TEST_URL) + + self.assertEqual('GET', self.requests.last_request.method) + self.assertEqual(resp.text, 'response') + self.assertTrue(resp.ok) + + def test_post(self): + session = client_session.Session() + self.stub_url('POST', text='response') + resp = session.post(self.TEST_URL, json={'hello': 'world'}) + + self.assertEqual('POST', self.requests.last_request.method) + self.assertEqual(resp.text, 'response') + self.assertTrue(resp.ok) + self.assertRequestBodyIs(json={'hello': 'world'}) + + def test_head(self): + session = client_session.Session() + self.stub_url('HEAD') + resp = session.head(self.TEST_URL) + + self.assertEqual('HEAD', self.requests.last_request.method) + self.assertTrue(resp.ok) + self.assertRequestBodyIs('') + + def test_put(self): + session = client_session.Session() + self.stub_url('PUT', text='response') + resp = session.put(self.TEST_URL, json={'hello': 'world'}) + + self.assertEqual('PUT', self.requests.last_request.method) + self.assertEqual(resp.text, 'response') + self.assertTrue(resp.ok) + self.assertRequestBodyIs(json={'hello': 'world'}) + + def test_delete(self): + session = client_session.Session() + self.stub_url('DELETE', text='response') + resp = session.delete(self.TEST_URL) + + self.assertEqual('DELETE', self.requests.last_request.method) + self.assertTrue(resp.ok) + self.assertEqual(resp.text, 'response') + + def test_patch(self): + session = client_session.Session() + self.stub_url('PATCH', text='response') + resp = session.patch(self.TEST_URL, json={'hello': 'world'}) + + self.assertEqual('PATCH', self.requests.last_request.method) + self.assertTrue(resp.ok) + self.assertEqual(resp.text, 'response') + self.assertRequestBodyIs(json={'hello': 'world'}) + + def test_user_agent(self): + session = client_session.Session(user_agent='test-agent') + self.stub_url('GET', text='response') + resp = session.get(self.TEST_URL) + + self.assertTrue(resp.ok) + self.assertRequestHeaderEqual('User-Agent', 'test-agent') + + resp = session.get(self.TEST_URL, headers={'User-Agent': 'new-agent'}) + self.assertTrue(resp.ok) + self.assertRequestHeaderEqual('User-Agent', 'new-agent') + + resp = session.get(self.TEST_URL, headers={'User-Agent': 'new-agent'}, + user_agent='overrides-agent') + self.assertTrue(resp.ok) + self.assertRequestHeaderEqual('User-Agent', 'overrides-agent') + + def test_http_session_opts(self): + session = client_session.Session(cert='cert.pem', timeout=5, + verify='certs') + + FAKE_RESP = utils.TestResponse({'status_code': 200, 'text': 'resp'}) + RESP = mock.Mock(return_value=FAKE_RESP) + + with mock.patch.object(session.session, 'request', RESP) as mocked: + session.post(self.TEST_URL, data='value') + + mock_args, mock_kwargs = mocked.call_args + + self.assertEqual(mock_args[0], 'POST') + self.assertEqual(mock_args[1], self.TEST_URL) + self.assertEqual(mock_kwargs['data'], 'value') + self.assertEqual(mock_kwargs['cert'], 'cert.pem') + self.assertEqual(mock_kwargs['verify'], 'certs') + self.assertEqual(mock_kwargs['timeout'], 5) + + def test_not_found(self): + session = client_session.Session() + self.stub_url('GET', status_code=404) + self.assertRaises(exceptions.NotFound, session.get, self.TEST_URL) + + def test_server_error(self): + session = client_session.Session() + self.stub_url('GET', status_code=500) + self.assertRaises(exceptions.InternalServerError, + session.get, self.TEST_URL) + + def test_session_debug_output(self): + """Test request and response headers in debug logs + + in order to redact secure headers while debug is true. + """ + session = client_session.Session(verify=False) + headers = {'HEADERA': 'HEADERVALB'} + security_headers = {'Authorization': uuid.uuid4().hex, + 'X-Auth-Token': uuid.uuid4().hex, + 'X-Subject-Token': uuid.uuid4().hex, } + body = 'BODYRESPONSE' + data = 'BODYDATA' + all_headers = dict( + itertools.chain(headers.items(), security_headers.items())) + self.stub_url('POST', text=body, headers=all_headers) + resp = session.post(self.TEST_URL, headers=all_headers, data=data) + self.assertEqual(resp.status_code, 200) + + self.assertIn('curl', self.logger.output) + self.assertIn('POST', self.logger.output) + self.assertIn('--insecure', self.logger.output) + self.assertIn(body, self.logger.output) + self.assertIn("'%s'" % data, self.logger.output) + + for k, v in six.iteritems(headers): + self.assertIn(k, self.logger.output) + self.assertIn(v, self.logger.output) + + # Assert that response headers contains actual values and + # only debug logs has been masked + for k, v in six.iteritems(security_headers): + self.assertIn('%s: {SHA1}' % k, self.logger.output) + self.assertEqual(v, resp.headers[k]) + self.assertNotIn(v, self.logger.output) + + def test_logging_cacerts(self): + path_to_certs = '/path/to/certs' + session = client_session.Session(verify=path_to_certs) + + self.stub_url('GET', text='text') + session.get(self.TEST_URL) + + self.assertIn('--cacert', self.logger.output) + self.assertIn(path_to_certs, self.logger.output) + + def test_connect_retries(self): + + def _timeout_error(request, context): + raise requests.exceptions.Timeout() + + self.stub_url('GET', text=_timeout_error) + + session = client_session.Session() + retries = 3 + + with mock.patch('time.sleep') as m: + self.assertRaises(exceptions.RequestTimeout, + session.get, + self.TEST_URL, connect_retries=retries) + + self.assertEqual(retries, m.call_count) + # 3 retries finishing with 2.0 means 0.5, 1.0 and 2.0 + m.assert_called_with(2.0) + + # we count retries so there will be one initial request + 3 retries + self.assertThat(self.requests.request_history, + matchers.HasLength(retries + 1)) + + def test_uses_tcp_keepalive_by_default(self): + session = client_session.Session() + requests_session = session.session + self.assertIsInstance(requests_session.adapters['http://'], + client_session.TCPKeepAliveAdapter) + self.assertIsInstance(requests_session.adapters['https://'], + client_session.TCPKeepAliveAdapter) + + def test_does_not_set_tcp_keepalive_on_custom_sessions(self): + mock_session = mock.Mock() + client_session.Session(session=mock_session) + self.assertFalse(mock_session.mount.called) + + +class RedirectTests(utils.TestCase): + + REDIRECT_CHAIN = ['http://myhost:3445/', + 'http://anotherhost:6555/', + 'http://thirdhost/', + 'http://finaldestination:55/'] + + DEFAULT_REDIRECT_BODY = 'Redirect' + DEFAULT_RESP_BODY = 'Found' + + def setup_redirects(self, method='GET', status_code=305, + redirect_kwargs={}, final_kwargs={}): + redirect_kwargs.setdefault('text', self.DEFAULT_REDIRECT_BODY) + + for s, d in zip(self.REDIRECT_CHAIN, self.REDIRECT_CHAIN[1:]): + self.requests.register_uri(method, s, status_code=status_code, + headers={'Location': d}, + **redirect_kwargs) + + final_kwargs.setdefault('status_code', 200) + final_kwargs.setdefault('text', self.DEFAULT_RESP_BODY) + self.requests.register_uri(method, self.REDIRECT_CHAIN[-1], + **final_kwargs) + + def assertResponse(self, resp): + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.text, self.DEFAULT_RESP_BODY) + + def test_basic_get(self): + session = client_session.Session() + self.setup_redirects() + resp = session.get(self.REDIRECT_CHAIN[-2]) + self.assertResponse(resp) + + def test_basic_post_keeps_correct_method(self): + session = client_session.Session() + self.setup_redirects(method='POST', status_code=301) + resp = session.post(self.REDIRECT_CHAIN[-2]) + self.assertResponse(resp) + + def test_redirect_forever(self): + session = client_session.Session(redirect=True) + self.setup_redirects() + resp = session.get(self.REDIRECT_CHAIN[0]) + self.assertResponse(resp) + self.assertTrue(len(resp.history), len(self.REDIRECT_CHAIN)) + + def test_no_redirect(self): + session = client_session.Session(redirect=False) + self.setup_redirects() + resp = session.get(self.REDIRECT_CHAIN[0]) + self.assertEqual(resp.status_code, 305) + self.assertEqual(resp.url, self.REDIRECT_CHAIN[0]) + + def test_redirect_limit(self): + self.setup_redirects() + for i in (1, 2): + session = client_session.Session(redirect=i) + resp = session.get(self.REDIRECT_CHAIN[0]) + self.assertEqual(resp.status_code, 305) + self.assertEqual(resp.url, self.REDIRECT_CHAIN[i]) + self.assertEqual(resp.text, self.DEFAULT_REDIRECT_BODY) + + def test_history_matches_requests(self): + self.setup_redirects(status_code=301) + session = client_session.Session(redirect=True) + req_resp = requests.get(self.REDIRECT_CHAIN[0], + allow_redirects=True) + + ses_resp = session.get(self.REDIRECT_CHAIN[0]) + + self.assertEqual(len(req_resp.history), len(ses_resp.history)) + + for r, s in zip(req_resp.history, ses_resp.history): + self.assertEqual(r.url, s.url) + self.assertEqual(r.status_code, s.status_code) + + +class ConstructSessionFromArgsTests(utils.TestCase): + + KEY = 'keyfile' + CERT = 'certfile' + CACERT = 'cacert-path' + + def _s(self, k=None, **kwargs): + k = k or kwargs + return client_session.Session.construct(k) + + def test_verify(self): + self.assertFalse(self._s(insecure=True).verify) + self.assertTrue(self._s(verify=True, insecure=True).verify) + self.assertFalse(self._s(verify=False, insecure=True).verify) + self.assertEqual(self._s(cacert=self.CACERT).verify, self.CACERT) + + def test_cert(self): + tup = (self.CERT, self.KEY) + self.assertEqual(self._s(cert=tup).cert, tup) + self.assertEqual(self._s(cert=self.CERT, key=self.KEY).cert, tup) + self.assertIsNone(self._s(key=self.KEY).cert) + + def test_pass_through(self): + value = 42 # only a number because timeout needs to be + for key in ['timeout', 'session', 'original_ip', 'user_agent']: + args = {key: value} + self.assertEqual(getattr(self._s(args), key), value) + self.assertNotIn(key, args) + + +class AuthPlugin(base.BaseAuthPlugin): + """Very simple debug authentication plugin. + + Takes Parameters such that it can throw exceptions at the right times. + """ + + TEST_TOKEN = 'aToken' + TEST_USER_ID = 'aUser' + TEST_PROJECT_ID = 'aProject' + + SERVICE_URLS = { + 'identity': {'public': 'http://identity-public:1111/v2.0', + 'admin': 'http://identity-admin:1111/v2.0'}, + 'compute': {'public': 'http://compute-public:2222/v1.0', + 'admin': 'http://compute-admin:2222/v1.0'}, + 'image': {'public': 'http://image-public:3333/v2.0', + 'admin': 'http://image-admin:3333/v2.0'} + } + + def __init__(self, token=TEST_TOKEN, invalidate=True): + self.token = token + self._invalidate = invalidate + + def get_token(self, session): + return self.token + + def get_endpoint(self, session, service_type=None, interface=None, + **kwargs): + try: + return self.SERVICE_URLS[service_type][interface] + except (KeyError, AttributeError): + return None + + def invalidate(self): + return self._invalidate + + def get_user_id(self, session): + return self.TEST_USER_ID + + def get_project_id(self, session): + return self.TEST_PROJECT_ID + + +class CalledAuthPlugin(base.BaseAuthPlugin): + + ENDPOINT = 'http://fakeendpoint/' + + def __init__(self, invalidate=True): + self.get_token_called = False + self.get_endpoint_called = False + self.endpoint_arguments = {} + self.invalidate_called = False + self._invalidate = invalidate + + def get_token(self, session): + self.get_token_called = True + return 'aToken' + + def get_endpoint(self, session, **kwargs): + self.get_endpoint_called = True + self.endpoint_arguments = kwargs + return self.ENDPOINT + + def invalidate(self): + self.invalidate_called = True + return self._invalidate + + +class SessionAuthTests(utils.TestCase): + + TEST_URL = 'http://127.0.0.1:5000/' + TEST_JSON = {'hello': 'world'} + + def stub_service_url(self, service_type, interface, path, + method='GET', **kwargs): + base_url = AuthPlugin.SERVICE_URLS[service_type][interface] + uri = "%s/%s" % (base_url.rstrip('/'), path.lstrip('/')) + + self.requests.register_uri(method, uri, **kwargs) + + def test_auth_plugin_default_with_plugin(self): + self.stub_url('GET', base_url=self.TEST_URL, json=self.TEST_JSON) + + # if there is an auth_plugin then it should default to authenticated + auth = AuthPlugin() + sess = client_session.Session(auth=auth) + resp = sess.get(self.TEST_URL) + self.assertDictEqual(resp.json(), self.TEST_JSON) + + self.assertRequestHeaderEqual('X-Auth-Token', AuthPlugin.TEST_TOKEN) + + def test_auth_plugin_disable(self): + self.stub_url('GET', base_url=self.TEST_URL, json=self.TEST_JSON) + + auth = AuthPlugin() + sess = client_session.Session(auth=auth) + resp = sess.get(self.TEST_URL, authenticated=False) + self.assertDictEqual(resp.json(), self.TEST_JSON) + + self.assertRequestHeaderEqual('X-Auth-Token', None) + + def test_service_type_urls(self): + service_type = 'compute' + interface = 'public' + path = '/instances' + status = 200 + body = 'SUCCESS' + + self.stub_service_url(service_type=service_type, + interface=interface, + path=path, + status_code=status, + text=body) + + sess = client_session.Session(auth=AuthPlugin()) + resp = sess.get(path, + endpoint_filter={'service_type': service_type, + 'interface': interface}) + + self.assertEqual(self.requests.last_request.url, + AuthPlugin.SERVICE_URLS['compute']['public'] + path) + self.assertEqual(resp.text, body) + self.assertEqual(resp.status_code, status) + + def test_service_url_raises_if_no_auth_plugin(self): + sess = client_session.Session() + self.assertRaises(exceptions.MissingAuthPlugin, + sess.get, '/path', + endpoint_filter={'service_type': 'compute', + 'interface': 'public'}) + + def test_service_url_raises_if_no_url_returned(self): + sess = client_session.Session(auth=AuthPlugin()) + self.assertRaises(exceptions.EndpointNotFound, + sess.get, '/path', + endpoint_filter={'service_type': 'unknown', + 'interface': 'public'}) + + def test_raises_exc_only_when_asked(self): + # A request that returns a HTTP error should by default raise an + # exception by default, if you specify raise_exc=False then it will not + self.requests.get(self.TEST_URL, status_code=401) + + sess = client_session.Session() + self.assertRaises(exceptions.Unauthorized, sess.get, self.TEST_URL) + + resp = sess.get(self.TEST_URL, raise_exc=False) + self.assertEqual(401, resp.status_code) + + def test_passed_auth_plugin(self): + passed = CalledAuthPlugin() + sess = client_session.Session() + + self.requests.get(CalledAuthPlugin.ENDPOINT + 'path', + status_code=200) + endpoint_filter = {'service_type': 'identity'} + + # no plugin with authenticated won't work + self.assertRaises(exceptions.MissingAuthPlugin, sess.get, 'path', + authenticated=True) + + # no plugin with an endpoint filter won't work + self.assertRaises(exceptions.MissingAuthPlugin, sess.get, 'path', + authenticated=False, endpoint_filter=endpoint_filter) + + resp = sess.get('path', auth=passed, endpoint_filter=endpoint_filter) + + self.assertEqual(200, resp.status_code) + self.assertTrue(passed.get_endpoint_called) + self.assertTrue(passed.get_token_called) + + def test_passed_auth_plugin_overrides(self): + fixed = CalledAuthPlugin() + passed = CalledAuthPlugin() + + sess = client_session.Session(fixed) + + self.requests.get(CalledAuthPlugin.ENDPOINT + 'path', + status_code=200) + + resp = sess.get('path', auth=passed, + endpoint_filter={'service_type': 'identity'}) + + self.assertEqual(200, resp.status_code) + self.assertTrue(passed.get_endpoint_called) + self.assertTrue(passed.get_token_called) + self.assertFalse(fixed.get_endpoint_called) + self.assertFalse(fixed.get_token_called) + + def test_requests_auth_plugin(self): + sess = client_session.Session() + + requests_auth = object() + + FAKE_RESP = utils.TestResponse({'status_code': 200, 'text': 'resp'}) + RESP = mock.Mock(return_value=FAKE_RESP) + + with mock.patch.object(sess.session, 'request', RESP) as mocked: + sess.get(self.TEST_URL, requests_auth=requests_auth) + + mocked.assert_called_once_with('GET', self.TEST_URL, + headers=mock.ANY, + allow_redirects=mock.ANY, + auth=requests_auth, + verify=mock.ANY) + + def test_reauth_called(self): + auth = CalledAuthPlugin(invalidate=True) + sess = client_session.Session(auth=auth) + + self.requests.get(self.TEST_URL, + [{'text': 'Failed', 'status_code': 401}, + {'text': 'Hello', 'status_code': 200}]) + + # allow_reauth=True is the default + resp = sess.get(self.TEST_URL, authenticated=True) + + self.assertEqual(200, resp.status_code) + self.assertEqual('Hello', resp.text) + self.assertTrue(auth.invalidate_called) + + def test_reauth_not_called(self): + auth = CalledAuthPlugin(invalidate=True) + sess = client_session.Session(auth=auth) + + self.requests.get(self.TEST_URL, + [{'text': 'Failed', 'status_code': 401}, + {'text': 'Hello', 'status_code': 200}]) + + self.assertRaises(exceptions.Unauthorized, sess.get, self.TEST_URL, + authenticated=True, allow_reauth=False) + self.assertFalse(auth.invalidate_called) + + def test_endpoint_override_overrides_filter(self): + auth = CalledAuthPlugin() + sess = client_session.Session(auth=auth) + + override_base = 'http://mytest/' + path = 'path' + override_url = override_base + path + resp_text = uuid.uuid4().hex + + self.requests.get(override_url, text=resp_text) + + resp = sess.get(path, + endpoint_override=override_base, + endpoint_filter={'service_type': 'identity'}) + + self.assertEqual(resp_text, resp.text) + self.assertEqual(override_url, self.requests.last_request.url) + + self.assertTrue(auth.get_token_called) + self.assertFalse(auth.get_endpoint_called) + + def test_endpoint_override_ignore_full_url(self): + auth = CalledAuthPlugin() + sess = client_session.Session(auth=auth) + + path = 'path' + url = self.TEST_URL + path + + resp_text = uuid.uuid4().hex + self.requests.get(url, text=resp_text) + + resp = sess.get(url, + endpoint_override='http://someother.url', + endpoint_filter={'service_type': 'identity'}) + + self.assertEqual(resp_text, resp.text) + self.assertEqual(url, self.requests.last_request.url) + + self.assertTrue(auth.get_token_called) + self.assertFalse(auth.get_endpoint_called) + + def test_user_and_project_id(self): + auth = AuthPlugin() + sess = client_session.Session(auth=auth) + + self.assertEqual(auth.TEST_USER_ID, sess.get_user_id()) + self.assertEqual(auth.TEST_PROJECT_ID, sess.get_project_id()) + + +class AdapterTest(utils.TestCase): + + SERVICE_TYPE = uuid.uuid4().hex + SERVICE_NAME = uuid.uuid4().hex + INTERFACE = uuid.uuid4().hex + REGION_NAME = uuid.uuid4().hex + USER_AGENT = uuid.uuid4().hex + VERSION = uuid.uuid4().hex + + TEST_URL = CalledAuthPlugin.ENDPOINT + + def _create_loaded_adapter(self): + auth = CalledAuthPlugin() + sess = client_session.Session() + return adapter.Adapter(sess, + auth=auth, + service_type=self.SERVICE_TYPE, + service_name=self.SERVICE_NAME, + interface=self.INTERFACE, + region_name=self.REGION_NAME, + user_agent=self.USER_AGENT, + version=self.VERSION) + + def _verify_endpoint_called(self, adpt): + self.assertEqual(self.SERVICE_TYPE, + adpt.auth.endpoint_arguments['service_type']) + self.assertEqual(self.SERVICE_NAME, + adpt.auth.endpoint_arguments['service_name']) + self.assertEqual(self.INTERFACE, + adpt.auth.endpoint_arguments['interface']) + self.assertEqual(self.REGION_NAME, + adpt.auth.endpoint_arguments['region_name']) + self.assertEqual(self.VERSION, + adpt.auth.endpoint_arguments['version']) + + def test_setting_variables_on_request(self): + response = uuid.uuid4().hex + self.stub_url('GET', text=response) + adpt = self._create_loaded_adapter() + resp = adpt.get('/') + self.assertEqual(resp.text, response) + + self._verify_endpoint_called(adpt) + self.assertTrue(adpt.auth.get_token_called) + self.assertRequestHeaderEqual('User-Agent', self.USER_AGENT) + + def test_setting_variables_on_get_endpoint(self): + adpt = self._create_loaded_adapter() + url = adpt.get_endpoint() + + self.assertEqual(self.TEST_URL, url) + self._verify_endpoint_called(adpt) + + def test_legacy_binding(self): + key = uuid.uuid4().hex + val = uuid.uuid4().hex + response = jsonutils.dumps({key: val}) + + self.stub_url('GET', text=response) + + auth = CalledAuthPlugin() + sess = client_session.Session(auth=auth) + adpt = adapter.LegacyJsonAdapter(sess, + service_type=self.SERVICE_TYPE, + user_agent=self.USER_AGENT) + + resp, body = adpt.get('/') + self.assertEqual(self.SERVICE_TYPE, + auth.endpoint_arguments['service_type']) + self.assertEqual(resp.text, response) + self.assertEqual(val, body[key]) + + def test_legacy_binding_non_json_resp(self): + response = uuid.uuid4().hex + self.stub_url('GET', text=response, + headers={'Content-Type': 'text/html'}) + + auth = CalledAuthPlugin() + sess = client_session.Session(auth=auth) + adpt = adapter.LegacyJsonAdapter(sess, + service_type=self.SERVICE_TYPE, + user_agent=self.USER_AGENT) + + resp, body = adpt.get('/') + self.assertEqual(self.SERVICE_TYPE, + auth.endpoint_arguments['service_type']) + self.assertEqual(resp.text, response) + self.assertIsNone(body) + + def test_methods(self): + sess = client_session.Session() + adpt = adapter.Adapter(sess) + url = 'http://url' + + for method in ['get', 'head', 'post', 'put', 'patch', 'delete']: + with mock.patch.object(adpt, 'request') as m: + getattr(adpt, method)(url) + m.assert_called_once_with(url, method.upper()) + + def test_setting_endpoint_override(self): + endpoint_override = 'http://overrideurl' + path = '/path' + endpoint_url = endpoint_override + path + + auth = CalledAuthPlugin() + sess = client_session.Session(auth=auth) + adpt = adapter.Adapter(sess, endpoint_override=endpoint_override) + + response = uuid.uuid4().hex + self.requests.get(endpoint_url, text=response) + + resp = adpt.get(path) + + self.assertEqual(response, resp.text) + self.assertEqual(endpoint_url, self.requests.last_request.url) + + self.assertEqual(endpoint_override, adpt.get_endpoint()) + + def test_adapter_invalidate(self): + auth = CalledAuthPlugin() + sess = client_session.Session() + adpt = adapter.Adapter(sess, auth=auth) + + adpt.invalidate() + + self.assertTrue(auth.invalidate_called) + + def test_adapter_get_token(self): + auth = CalledAuthPlugin() + sess = client_session.Session() + adpt = adapter.Adapter(sess, auth=auth) + + self.assertEqual(self.TEST_TOKEN, adpt.get_token()) + self.assertTrue(auth.get_token_called) + + def test_adapter_connect_retries(self): + retries = 2 + sess = client_session.Session() + adpt = adapter.Adapter(sess, connect_retries=retries) + + def _refused_error(request, context): + raise requests.exceptions.ConnectionError() + + self.stub_url('GET', text=_refused_error) + + with mock.patch('time.sleep') as m: + self.assertRaises(exceptions.ConnectionRefused, + adpt.get, self.TEST_URL) + self.assertEqual(retries, m.call_count) + + # we count retries so there will be one initial request + 2 retries + self.assertThat(self.requests.request_history, + matchers.HasLength(retries + 1)) + + def test_user_and_project_id(self): + auth = AuthPlugin() + sess = client_session.Session() + adpt = adapter.Adapter(sess, auth=auth) + + self.assertEqual(auth.TEST_USER_ID, adpt.get_user_id()) + self.assertEqual(auth.TEST_PROJECT_ID, adpt.get_project_id()) + + +class ConfLoadingTests(utils.TestCase): + + GROUP = 'sessiongroup' + + def setUp(self): + super(ConfLoadingTests, self).setUp() + + self.conf_fixture = self.useFixture(config.Config()) + client_session.Session.register_conf_options(self.conf_fixture.conf, + self.GROUP) + + def config(self, **kwargs): + kwargs['group'] = self.GROUP + self.conf_fixture.config(**kwargs) + + def get_session(self, **kwargs): + return client_session.Session.load_from_conf_options( + self.conf_fixture.conf, + self.GROUP, + **kwargs) + + def test_insecure_timeout(self): + self.config(insecure=True, timeout=5) + s = self.get_session() + + self.assertFalse(s.verify) + self.assertEqual(5, s.timeout) + + def test_client_certs(self): + cert = '/path/to/certfile' + key = '/path/to/keyfile' + + self.config(certfile=cert, keyfile=key) + s = self.get_session() + + self.assertTrue(s.verify) + self.assertEqual((cert, key), s.cert) + + def test_cacert(self): + cafile = '/path/to/cacert' + + self.config(cafile=cafile) + s = self.get_session() + + self.assertEqual(cafile, s.verify) + + def test_deprecated(self): + def new_deprecated(): + return cfg.DeprecatedOpt(uuid.uuid4().hex, group=uuid.uuid4().hex) + + opt_names = ['cafile', 'certfile', 'keyfile', 'insecure', 'timeout'] + depr = dict([(n, [new_deprecated()]) for n in opt_names]) + opts = client_session.Session.get_conf_options(deprecated_opts=depr) + + self.assertThat(opt_names, matchers.HasLength(len(opts))) + for opt in opts: + self.assertIn(depr[opt.name][0], opt.deprecated_opts) + + +class CliLoadingTests(utils.TestCase): + + def setUp(self): + super(CliLoadingTests, self).setUp() + + self.parser = argparse.ArgumentParser() + client_session.Session.register_cli_options(self.parser) + + def get_session(self, val, **kwargs): + args = self.parser.parse_args(val.split()) + return client_session.Session.load_from_cli_options(args, **kwargs) + + def test_insecure_timeout(self): + s = self.get_session('--insecure --timeout 5.5') + + self.assertFalse(s.verify) + self.assertEqual(5.5, s.timeout) + + def test_client_certs(self): + cert = '/path/to/certfile' + key = '/path/to/keyfile' + + s = self.get_session('--os-cert %s --os-key %s' % (cert, key)) + + self.assertTrue(s.verify) + self.assertEqual((cert, key), s.cert) + + def test_cacert(self): + cacert = '/path/to/cacert' + + s = self.get_session('--os-cacert %s' % cacert) + + self.assertEqual(cacert, s.verify)