Move tests to the unit subdirectory

Move all the existing tests to the unit/ subdirectory. This gives us
some room to add a functional/ directory later with other tests.

Change-Id: I0fb8d5b628eb8ee1f35f05f42d0c0ac9f285e8c3
Implements: functional-testing
This commit is contained in:
Jamie Lennox 2015-02-11 19:03:25 +11:00
parent 5790f49dca
commit 51e8842af2
14 changed files with 3729 additions and 0 deletions

View File

@ -0,0 +1,61 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystoneclient import access
from keystoneclient import auth
from keystoneclient.auth.identity import access as access_plugin
from keystoneclient import fixture
from keystoneclient import session
from keystoneclient.tests.unit import utils
class AccessInfoPluginTests(utils.TestCase):
def setUp(self):
super(AccessInfoPluginTests, self).setUp()
self.session = session.Session()
self.auth_token = uuid.uuid4().hex
def _plugin(self, **kwargs):
token = fixture.V3Token()
s = token.add_service('identity')
s.add_standard_endpoints(public=self.TEST_ROOT_URL)
auth_ref = access.AccessInfo.factory(body=token,
auth_token=self.auth_token)
return access_plugin.AccessInfoPlugin(auth_ref, **kwargs)
def test_auth_ref(self):
plugin = self._plugin()
self.assertEqual(self.TEST_ROOT_URL,
plugin.get_endpoint(self.session,
service_type='identity',
interface='public'))
self.assertEqual(self.auth_token, plugin.get_token(session))
def test_auth_url(self):
auth_url = 'http://keystone.test.url'
plugin = self._plugin(auth_url=auth_url)
self.assertEqual(auth_url,
plugin.get_endpoint(self.session,
interface=auth.AUTH_INTERFACE))
def test_invalidate(self):
plugin = self._plugin()
auth_ref = plugin.auth_ref
self.assertIsInstance(auth_ref, access.AccessInfo)
self.assertFalse(plugin.invalidate())
self.assertIs(auth_ref, plugin.auth_ref)

View File

@ -0,0 +1,196 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import uuid
import fixtures
import mock
from oslo_config import cfg
from keystoneclient.auth import base
from keystoneclient.auth import cli
from keystoneclient.tests.unit.auth import utils
class TesterPlugin(base.BaseAuthPlugin):
def get_token(self, *args, **kwargs):
return None
@classmethod
def get_options(cls):
# NOTE(jamielennox): this is kind of horrible. If you specify this as
# a deprecated_name= value it will convert - to _ which is not what we
# want for a CLI option.
deprecated = [cfg.DeprecatedOpt('test-other')]
return [
cfg.StrOpt('test-opt', help='tester', deprecated_opts=deprecated)
]
class CliTests(utils.TestCase):
def setUp(self):
super(CliTests, self).setUp()
self.p = argparse.ArgumentParser()
def env(self, name, value=None):
if value is not None:
# environment variables are always strings
value = str(value)
return self.useFixture(fixtures.EnvironmentVariable(name, value))
def test_creating_with_no_args(self):
ret = cli.register_argparse_arguments(self.p, [])
self.assertIsNone(ret)
self.assertIn('--os-auth-plugin', self.p.format_usage())
def test_load_with_nothing(self):
cli.register_argparse_arguments(self.p, [])
opts = self.p.parse_args([])
self.assertIsNone(cli.load_from_argparse_arguments(opts))
@utils.mock_plugin
def test_basic_params_added(self, m):
name = uuid.uuid4().hex
argv = ['--os-auth-plugin', name]
ret = cli.register_argparse_arguments(self.p, argv)
self.assertIs(utils.MockPlugin, ret)
for n in ('--os-a-int', '--os-a-bool', '--os-a-float'):
self.assertIn(n, self.p.format_usage())
m.assert_called_once_with(name)
@utils.mock_plugin
def test_param_loading(self, m):
name = uuid.uuid4().hex
argv = ['--os-auth-plugin', name,
'--os-a-int', str(self.a_int),
'--os-a-float', str(self.a_float),
'--os-a-bool', str(self.a_bool)]
klass = cli.register_argparse_arguments(self.p, argv)
self.assertIs(utils.MockPlugin, klass)
opts = self.p.parse_args(argv)
self.assertEqual(name, opts.os_auth_plugin)
a = cli.load_from_argparse_arguments(opts)
self.assertTestVals(a)
self.assertEqual(name, opts.os_auth_plugin)
self.assertEqual(str(self.a_int), opts.os_a_int)
self.assertEqual(str(self.a_float), opts.os_a_float)
self.assertEqual(str(self.a_bool), opts.os_a_bool)
@utils.mock_plugin
def test_default_options(self, m):
name = uuid.uuid4().hex
argv = ['--os-auth-plugin', name,
'--os-a-float', str(self.a_float)]
klass = cli.register_argparse_arguments(self.p, argv)
self.assertIs(utils.MockPlugin, klass)
opts = self.p.parse_args(argv)
self.assertEqual(name, opts.os_auth_plugin)
a = cli.load_from_argparse_arguments(opts)
self.assertEqual(self.a_float, a['a_float'])
self.assertEqual(3, a['a_int'])
@utils.mock_plugin
def test_with_default_string_value(self, m):
name = uuid.uuid4().hex
klass = cli.register_argparse_arguments(self.p, [], default=name)
self.assertIs(utils.MockPlugin, klass)
m.assert_called_once_with(name)
@utils.mock_plugin
def test_overrides_default_string_value(self, m):
name = uuid.uuid4().hex
default = uuid.uuid4().hex
argv = ['--os-auth-plugin', name]
klass = cli.register_argparse_arguments(self.p, argv, default=default)
self.assertIs(utils.MockPlugin, klass)
m.assert_called_once_with(name)
@utils.mock_plugin
def test_with_default_type_value(self, m):
klass = cli.register_argparse_arguments(self.p, [],
default=utils.MockPlugin)
self.assertIs(utils.MockPlugin, klass)
self.assertEqual(0, m.call_count)
@utils.mock_plugin
def test_overrides_default_type_value(self, m):
# using this test plugin would fail if called because there
# is no get_options() function
class TestPlugin(object):
pass
name = uuid.uuid4().hex
argv = ['--os-auth-plugin', name]
klass = cli.register_argparse_arguments(self.p, argv,
default=TestPlugin)
self.assertIs(utils.MockPlugin, klass)
m.assert_called_once_with(name)
@utils.mock_plugin
def test_env_overrides_default_opt(self, m):
name = uuid.uuid4().hex
val = uuid.uuid4().hex
self.env('OS_A_STR', val)
klass = cli.register_argparse_arguments(self.p, [], default=name)
opts = self.p.parse_args([])
a = klass.load_from_argparse_arguments(opts)
self.assertEqual(val, a['a_str'])
def test_deprecated_cli_options(self):
TesterPlugin.register_argparse_arguments(self.p)
val = uuid.uuid4().hex
opts = self.p.parse_args(['--os-test-other', val])
self.assertEqual(val, opts.os_test_opt)
def test_deprecated_multi_cli_options(self):
TesterPlugin.register_argparse_arguments(self.p)
val1 = uuid.uuid4().hex
val2 = uuid.uuid4().hex
# argarse rules say that the last specified wins.
opts = self.p.parse_args(['--os-test-other', val2,
'--os-test-opt', val1])
self.assertEqual(val1, opts.os_test_opt)
def test_deprecated_env_options(self):
val = uuid.uuid4().hex
with mock.patch.dict('os.environ', {'OS_TEST_OTHER': val}):
TesterPlugin.register_argparse_arguments(self.p)
opts = self.p.parse_args([])
self.assertEqual(val, opts.os_test_opt)
def test_deprecated_env_multi_options(self):
val1 = uuid.uuid4().hex
val2 = uuid.uuid4().hex
with mock.patch.dict('os.environ', {'OS_TEST_OPT': val1,
'OS_TEST_OTHER': val2}):
TesterPlugin.register_argparse_arguments(self.p)
opts = self.p.parse_args([])
self.assertEqual(val1, opts.os_test_opt)

View File

@ -0,0 +1,177 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import mock
from oslo_config import cfg
from oslo_config import fixture as config
import stevedore
from keystoneclient.auth import base
from keystoneclient.auth import conf
from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient.auth.identity import v3 as v3_auth
from keystoneclient import exceptions
from keystoneclient.tests.unit.auth import utils
class ConfTests(utils.TestCase):
def setUp(self):
super(ConfTests, self).setUp()
self.conf_fixture = self.useFixture(config.Config())
# NOTE(jamielennox): we register the basic config options first because
# we need them in place before we can stub them. We will need to run
# the register again after we stub the auth section and auth plugin so
# it can load the plugin specific options.
conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP)
def test_loading_v2(self):
section = uuid.uuid4().hex
username = uuid.uuid4().hex
password = uuid.uuid4().hex
trust_id = uuid.uuid4().hex
tenant_id = uuid.uuid4().hex
self.conf_fixture.config(auth_section=section, group=self.GROUP)
conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP)
self.conf_fixture.register_opts(v2_auth.Password.get_options(),
group=section)
self.conf_fixture.config(auth_plugin=self.V2PASS,
username=username,
password=password,
trust_id=trust_id,
tenant_id=tenant_id,
group=section)
a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP)
self.assertEqual(username, a.username)
self.assertEqual(password, a.password)
self.assertEqual(trust_id, a.trust_id)
self.assertEqual(tenant_id, a.tenant_id)
def test_loading_v3(self):
section = uuid.uuid4().hex
token = uuid.uuid4().hex
trust_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex
project_domain_name = uuid.uuid4().hex
self.conf_fixture.config(auth_section=section, group=self.GROUP)
conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP)
self.conf_fixture.register_opts(v3_auth.Token.get_options(),
group=section)
self.conf_fixture.config(auth_plugin=self.V3TOKEN,
token=token,
trust_id=trust_id,
project_id=project_id,
project_domain_name=project_domain_name,
group=section)
a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP)
self.assertEqual(token, a.auth_methods[0].token)
self.assertEqual(trust_id, a.trust_id)
self.assertEqual(project_id, a.project_id)
self.assertEqual(project_domain_name, a.project_domain_name)
def test_loading_invalid_plugin(self):
auth_plugin = uuid.uuid4().hex
self.conf_fixture.config(auth_plugin=auth_plugin,
group=self.GROUP)
e = self.assertRaises(exceptions.NoMatchingPlugin,
conf.load_from_conf_options,
self.conf_fixture.conf,
self.GROUP)
self.assertEqual(auth_plugin, e.name)
def test_loading_with_no_data(self):
self.assertIsNone(conf.load_from_conf_options(self.conf_fixture.conf,
self.GROUP))
@mock.patch('stevedore.DriverManager')
def test_other_params(self, m):
m.return_value = utils.MockManager(utils.MockPlugin)
driver_name = uuid.uuid4().hex
self.conf_fixture.register_opts(utils.MockPlugin.get_options(),
group=self.GROUP)
self.conf_fixture.config(auth_plugin=driver_name,
group=self.GROUP,
**self.TEST_VALS)
a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP)
self.assertTestVals(a)
m.assert_called_once_with(namespace=base.PLUGIN_NAMESPACE,
name=driver_name,
invoke_on_load=False)
@utils.mock_plugin
def test_same_section(self, m):
self.conf_fixture.register_opts(utils.MockPlugin.get_options(),
group=self.GROUP)
conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP)
self.conf_fixture.config(auth_plugin=uuid.uuid4().hex,
group=self.GROUP,
**self.TEST_VALS)
a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP)
self.assertTestVals(a)
@utils.mock_plugin
def test_diff_section(self, m):
section = uuid.uuid4().hex
self.conf_fixture.config(auth_section=section, group=self.GROUP)
conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP)
self.conf_fixture.register_opts(utils.MockPlugin.get_options(),
group=section)
self.conf_fixture.config(group=section,
auth_plugin=uuid.uuid4().hex,
**self.TEST_VALS)
a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP)
self.assertTestVals(a)
def test_plugins_are_all_opts(self):
manager = stevedore.ExtensionManager(base.PLUGIN_NAMESPACE,
invoke_on_load=False,
propagate_map_exceptions=True)
def inner(driver):
for p in driver.plugin.get_options():
self.assertIsInstance(p, cfg.Opt)
manager.map(inner)
def test_get_common(self):
opts = conf.get_common_conf_options()
for opt in opts:
self.assertIsInstance(opt, cfg.Opt)
self.assertEqual(2, len(opts))
def test_get_named(self):
loaded_opts = conf.get_plugin_options('v2password')
plugin_opts = v2_auth.Password.get_options()
self.assertEqual(plugin_opts, loaded_opts)

View File

@ -0,0 +1,422 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import datetime
import uuid
from oslo_utils import timeutils
import six
from keystoneclient import access
from keystoneclient.auth import base
from keystoneclient.auth import identity
from keystoneclient import fixture
from keystoneclient import session
from keystoneclient.tests.unit import utils
@six.add_metaclass(abc.ABCMeta)
class CommonIdentityTests(object):
TEST_ROOT_URL = 'http://127.0.0.1:5000/'
TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
TEST_COMPUTE_PUBLIC = 'http://nova/novapi/public'
TEST_COMPUTE_INTERNAL = 'http://nova/novapi/internal'
TEST_COMPUTE_ADMIN = 'http://nova/novapi/admin'
TEST_PASS = uuid.uuid4().hex
def setUp(self):
super(CommonIdentityTests, self).setUp()
self.TEST_URL = '%s%s' % (self.TEST_ROOT_URL, self.version)
self.TEST_ADMIN_URL = '%s%s' % (self.TEST_ROOT_ADMIN_URL, self.version)
self.TEST_DISCOVERY = fixture.DiscoveryList(href=self.TEST_ROOT_URL)
self.stub_auth_data()
@abc.abstractmethod
def create_auth_plugin(self, **kwargs):
"""Create an auth plugin that makes sense for the auth data.
It doesn't really matter what auth mechanism is used but it should be
appropriate to the API version.
"""
@abc.abstractmethod
def get_auth_data(self, **kwargs):
"""Return fake authentication data.
This should register a valid token response and ensure that the compute
endpoints are set to TEST_COMPUTE_PUBLIC, _INTERNAL and _ADMIN.
"""
def stub_auth_data(self, **kwargs):
token = self.get_auth_data(**kwargs)
self.user_id = token.user_id
try:
self.project_id = token.project_id
except AttributeError:
self.project_id = token.tenant_id
self.stub_auth(json=token)
@abc.abstractproperty
def version(self):
"""The API version being tested."""
def test_discovering(self):
self.stub_url('GET', [],
base_url=self.TEST_COMPUTE_ADMIN,
json=self.TEST_DISCOVERY)
body = 'SUCCESS'
# which gives our sample values
self.stub_url('GET', ['path'], text=body)
a = self.create_auth_plugin()
s = session.Session(auth=a)
resp = s.get('/path', endpoint_filter={'service_type': 'compute',
'interface': 'admin',
'version': self.version})
self.assertEqual(200, resp.status_code)
self.assertEqual(body, resp.text)
new_body = 'SC SUCCESS'
# if we don't specify a version, we use the URL from the SC
self.stub_url('GET', ['path'],
base_url=self.TEST_COMPUTE_ADMIN,
text=new_body)
resp = s.get('/path', endpoint_filter={'service_type': 'compute',
'interface': 'admin'})
self.assertEqual(200, resp.status_code)
self.assertEqual(new_body, resp.text)
def test_discovery_uses_session_cache(self):
# register responses such that if the discovery URL is hit more than
# once then the response will be invalid and not point to COMPUTE_ADMIN
resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
self.requests.get(self.TEST_COMPUTE_ADMIN, resps)
body = 'SUCCESS'
self.stub_url('GET', ['path'], text=body)
# now either of the two plugins I use, it should not cause a second
# request to the discovery url.
s = session.Session()
a = self.create_auth_plugin()
b = self.create_auth_plugin()
for auth in (a, b):
resp = s.get('/path',
auth=auth,
endpoint_filter={'service_type': 'compute',
'interface': 'admin',
'version': self.version})
self.assertEqual(200, resp.status_code)
self.assertEqual(body, resp.text)
def test_discovery_uses_plugin_cache(self):
# register responses such that if the discovery URL is hit more than
# once then the response will be invalid and not point to COMPUTE_ADMIN
resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
self.requests.get(self.TEST_COMPUTE_ADMIN, resps)
body = 'SUCCESS'
self.stub_url('GET', ['path'], text=body)
# now either of the two sessions I use, it should not cause a second
# request to the discovery url.
sa = session.Session()
sb = session.Session()
auth = self.create_auth_plugin()
for sess in (sa, sb):
resp = sess.get('/path',
auth=auth,
endpoint_filter={'service_type': 'compute',
'interface': 'admin',
'version': self.version})
self.assertEqual(200, resp.status_code)
self.assertEqual(body, resp.text)
def test_discovering_with_no_data(self):
# which returns discovery information pointing to TEST_URL but there is
# no data there.
self.stub_url('GET', [],
base_url=self.TEST_COMPUTE_ADMIN,
status_code=400)
# so the url that will be used is the same TEST_COMPUTE_ADMIN
body = 'SUCCESS'
self.stub_url('GET', ['path'], base_url=self.TEST_COMPUTE_ADMIN,
text=body, status_code=200)
a = self.create_auth_plugin()
s = session.Session(auth=a)
resp = s.get('/path', endpoint_filter={'service_type': 'compute',
'interface': 'admin',
'version': self.version})
self.assertEqual(200, resp.status_code)
self.assertEqual(body, resp.text)
def test_asking_for_auth_endpoint_ignores_checks(self):
a = self.create_auth_plugin()
s = session.Session(auth=a)
auth_url = s.get_endpoint(service_type='compute',
interface=base.AUTH_INTERFACE)
self.assertEqual(self.TEST_URL, auth_url)
def _create_expired_auth_plugin(self, **kwargs):
expires = timeutils.utcnow() - datetime.timedelta(minutes=20)
expired_token = self.get_auth_data(expires=expires)
expired_auth_ref = access.AccessInfo.factory(body=expired_token)
body = 'SUCCESS'
self.stub_url('GET', ['path'],
base_url=self.TEST_COMPUTE_ADMIN, text=body)
a = self.create_auth_plugin(**kwargs)
a.auth_ref = expired_auth_ref
return a
def test_reauthenticate(self):
a = self._create_expired_auth_plugin()
expired_auth_ref = a.auth_ref
s = session.Session(auth=a)
self.assertIsNot(expired_auth_ref, a.get_access(s))
def test_no_reauthenticate(self):
a = self._create_expired_auth_plugin(reauthenticate=False)
expired_auth_ref = a.auth_ref
s = session.Session(auth=a)
self.assertIs(expired_auth_ref, a.get_access(s))
def test_invalidate(self):
a = self.create_auth_plugin()
s = session.Session(auth=a)
# trigger token fetching
s.get_auth_headers()
self.assertTrue(a.auth_ref)
self.assertTrue(a.invalidate())
self.assertIsNone(a.auth_ref)
self.assertFalse(a.invalidate())
def test_get_auth_properties(self):
a = self.create_auth_plugin()
s = session.Session()
self.assertEqual(self.user_id, a.get_user_id(s))
self.assertEqual(self.project_id, a.get_project_id(s))
class V3(CommonIdentityTests, utils.TestCase):
@property
def version(self):
return 'v3'
def get_auth_data(self, **kwargs):
token = fixture.V3Token(**kwargs)
region = 'RegionOne'
svc = token.add_service('identity')
svc.add_standard_endpoints(admin=self.TEST_ADMIN_URL, region=region)
svc = token.add_service('compute')
svc.add_standard_endpoints(admin=self.TEST_COMPUTE_ADMIN,
public=self.TEST_COMPUTE_PUBLIC,
internal=self.TEST_COMPUTE_INTERNAL,
region=region)
return token
def stub_auth(self, subject_token=None, **kwargs):
if not subject_token:
subject_token = self.TEST_TOKEN
kwargs.setdefault('headers', {})['X-Subject-Token'] = subject_token
self.stub_url('POST', ['auth', 'tokens'], **kwargs)
def create_auth_plugin(self, **kwargs):
kwargs.setdefault('auth_url', self.TEST_URL)
kwargs.setdefault('username', self.TEST_USER)
kwargs.setdefault('password', self.TEST_PASS)
return identity.V3Password(**kwargs)
class V2(CommonIdentityTests, utils.TestCase):
@property
def version(self):
return 'v2.0'
def create_auth_plugin(self, **kwargs):
kwargs.setdefault('auth_url', self.TEST_URL)
kwargs.setdefault('username', self.TEST_USER)
kwargs.setdefault('password', self.TEST_PASS)
return identity.V2Password(**kwargs)
def get_auth_data(self, **kwargs):
token = fixture.V2Token(**kwargs)
region = 'RegionOne'
svc = token.add_service('identity')
svc.add_endpoint(self.TEST_ADMIN_URL, region=region)
svc = token.add_service('compute')
svc.add_endpoint(public=self.TEST_COMPUTE_PUBLIC,
internal=self.TEST_COMPUTE_INTERNAL,
admin=self.TEST_COMPUTE_ADMIN,
region=region)
return token
def stub_auth(self, **kwargs):
self.stub_url('POST', ['tokens'], **kwargs)
class CatalogHackTests(utils.TestCase):
TEST_URL = 'http://keystone.server:5000/v2.0'
OTHER_URL = 'http://other.server:5000/path'
IDENTITY = 'identity'
BASE_URL = 'http://keystone.server:5000/'
V2_URL = BASE_URL + 'v2.0'
V3_URL = BASE_URL + 'v3'
def test_getting_endpoints(self):
disc = fixture.DiscoveryList(href=self.BASE_URL)
self.stub_url('GET',
['/'],
base_url=self.BASE_URL,
json=disc)
token = fixture.V2Token()
service = token.add_service(self.IDENTITY)
service.add_endpoint(public=self.V2_URL,
admin=self.V2_URL,
internal=self.V2_URL)
self.stub_url('POST',
['tokens'],
base_url=self.V2_URL,
json=token)
v2_auth = identity.V2Password(self.V2_URL,
username=uuid.uuid4().hex,
password=uuid.uuid4().hex)
sess = session.Session(auth=v2_auth)
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
interface='public',
version=(3, 0))
self.assertEqual(self.V3_URL, endpoint)
def test_returns_original_when_discover_fails(self):
token = fixture.V2Token()
service = token.add_service(self.IDENTITY)
service.add_endpoint(public=self.V2_URL,
admin=self.V2_URL,
internal=self.V2_URL)
self.stub_url('POST',
['tokens'],
base_url=self.V2_URL,
json=token)
self.stub_url('GET', [], base_url=self.BASE_URL, status_code=404)
v2_auth = identity.V2Password(self.V2_URL,
username=uuid.uuid4().hex,
password=uuid.uuid4().hex)
sess = session.Session(auth=v2_auth)
endpoint = sess.get_endpoint(service_type=self.IDENTITY,
interface='public',
version=(3, 0))
self.assertEqual(self.V2_URL, endpoint)
class GenericPlugin(base.BaseAuthPlugin):
BAD_TOKEN = uuid.uuid4().hex
def __init__(self):
super(GenericPlugin, self).__init__()
self.endpoint = 'http://keystone.host:5000'
self.headers = {'headerA': 'valueA',
'headerB': 'valueB'}
def url(self, prefix):
return '%s/%s' % (self.endpoint, prefix)
def get_token(self, session, **kwargs):
# NOTE(jamielennox): by specifying get_headers this should not be used
return self.BAD_TOKEN
def get_headers(self, session, **kwargs):
return self.headers
def get_endpoint(self, session, **kwargs):
return self.endpoint
class GenericAuthPluginTests(utils.TestCase):
# filter doesn't matter to GenericPlugin, but we have to specify one
ENDPOINT_FILTER = {uuid.uuid4().hex: uuid.uuid4().hex}
def setUp(self):
super(GenericAuthPluginTests, self).setUp()
self.auth = GenericPlugin()
self.session = session.Session(auth=self.auth)
def test_setting_headers(self):
text = uuid.uuid4().hex
self.stub_url('GET', base_url=self.auth.url('prefix'), text=text)
resp = self.session.get('prefix', endpoint_filter=self.ENDPOINT_FILTER)
self.assertEqual(text, resp.text)
for k, v in six.iteritems(self.auth.headers):
self.assertRequestHeaderEqual(k, v)
self.assertIsNone(self.session.get_token())
self.assertEqual(self.auth.headers,
self.session.get_auth_headers())
self.assertNotIn('X-Auth-Token', self.requests.last_request.headers)

View File

@ -0,0 +1,295 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import uuid
from keystoneclient.auth.identity import v2
from keystoneclient import exceptions
from keystoneclient import session
from keystoneclient.tests.unit import utils
class V2IdentityPlugin(utils.TestCase):
TEST_ROOT_URL = 'http://127.0.0.1:5000/'
TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v2.0')
TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v2.0')
TEST_PASS = 'password'
TEST_SERVICE_CATALOG = [{
"endpoints": [{
"adminURL": "http://cdn.admin-nets.local:8774/v1.0",
"region": "RegionOne",
"internalURL": "http://127.0.0.1:8774/v1.0",
"publicURL": "http://cdn.admin-nets.local:8774/v1.0/"
}],
"type": "nova_compat",
"name": "nova_compat"
}, {
"endpoints": [{
"adminURL": "http://nova/novapi/admin",
"region": "RegionOne",
"internalURL": "http://nova/novapi/internal",
"publicURL": "http://nova/novapi/public"
}],
"type": "compute",
"name": "nova"
}, {
"endpoints": [{
"adminURL": "http://glance/glanceapi/admin",
"region": "RegionOne",
"internalURL": "http://glance/glanceapi/internal",
"publicURL": "http://glance/glanceapi/public"
}],
"type": "image",
"name": "glance"
}, {
"endpoints": [{
"adminURL": TEST_ADMIN_URL,
"region": "RegionOne",
"internalURL": "http://127.0.0.1:5000/v2.0",
"publicURL": "http://127.0.0.1:5000/v2.0"
}],
"type": "identity",
"name": "keystone"
}, {
"endpoints": [{
"adminURL": "http://swift/swiftapi/admin",
"region": "RegionOne",
"internalURL": "http://swift/swiftapi/internal",
"publicURL": "http://swift/swiftapi/public"
}],
"type": "object-store",
"name": "swift"
}]
def setUp(self):
super(V2IdentityPlugin, self).setUp()
self.TEST_RESPONSE_DICT = {
"access": {
"token": {
"expires": "2020-01-01T00:00:10.000123Z",
"id": self.TEST_TOKEN,
"tenant": {
"id": self.TEST_TENANT_ID
},
},
"user": {
"id": self.TEST_USER
},
"serviceCatalog": self.TEST_SERVICE_CATALOG,
},
}
def stub_auth(self, **kwargs):
self.stub_url('POST', ['tokens'], **kwargs)
def test_authenticate_with_username_password(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
self.assertIsNone(a.user_id)
s = session.Session(a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
'password': self.TEST_PASS}}}
self.assertRequestBodyIs(json=req)
self.assertRequestHeaderEqual('Content-Type', 'application/json')
self.assertRequestHeaderEqual('Accept', 'application/json')
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_authenticate_with_user_id_password(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v2.Password(self.TEST_URL, user_id=self.TEST_USER,
password=self.TEST_PASS)
self.assertIsNone(a.username)
s = session.Session(a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER,
'password': self.TEST_PASS}}}
self.assertRequestBodyIs(json=req)
self.assertRequestHeaderEqual('Content-Type', 'application/json')
self.assertRequestHeaderEqual('Accept', 'application/json')
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_authenticate_with_username_password_scoped(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID)
self.assertIsNone(a.user_id)
s = session.Session(a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
'password': self.TEST_PASS},
'tenantId': self.TEST_TENANT_ID}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_authenticate_with_user_id_password_scoped(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v2.Password(self.TEST_URL, user_id=self.TEST_USER,
password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID)
self.assertIsNone(a.username)
s = session.Session(a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER,
'password': self.TEST_PASS},
'tenantId': self.TEST_TENANT_ID}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_authenticate_with_token(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v2.Token(self.TEST_URL, 'foo')
s = session.Session(a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'token': {'id': 'foo'}}}
self.assertRequestBodyIs(json=req)
self.assertRequestHeaderEqual('x-Auth-Token', 'foo')
self.assertRequestHeaderEqual('Content-Type', 'application/json')
self.assertRequestHeaderEqual('Accept', 'application/json')
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_with_trust_id(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS, trust_id='trust')
s = session.Session(a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
'password': self.TEST_PASS},
'trust_id': 'trust'}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def _do_service_url_test(self, base_url, endpoint_filter):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
self.stub_url('GET', ['path'],
base_url=base_url,
text='SUCCESS', status_code=200)
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
resp = s.get('/path', endpoint_filter=endpoint_filter)
self.assertEqual(resp.status_code, 200)
self.assertEqual(self.requests.last_request.url, base_url + '/path')
def test_service_url(self):
endpoint_filter = {'service_type': 'compute',
'interface': 'admin',
'service_name': 'nova'}
self._do_service_url_test('http://nova/novapi/admin', endpoint_filter)
def test_service_url_defaults_to_public(self):
endpoint_filter = {'service_type': 'compute'}
self._do_service_url_test('http://nova/novapi/public', endpoint_filter)
def test_endpoint_filter_without_service_type_fails(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
self.assertRaises(exceptions.EndpointNotFound, s.get, '/path',
endpoint_filter={'interface': 'admin'})
def test_full_url_overrides_endpoint_filter(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
self.stub_url('GET', [],
base_url='http://testurl/',
text='SUCCESS', status_code=200)
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
resp = s.get('http://testurl/',
endpoint_filter={'service_type': 'compute'})
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.text, 'SUCCESS')
def test_invalid_auth_response_dict(self):
self.stub_auth(json={'hello': 'world'})
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any',
authenticated=True)
def test_invalid_auth_response_type(self):
self.stub_url('POST', ['tokens'], text='testdata')
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any',
authenticated=True)
def test_invalidate_response(self):
resp_data1 = copy.deepcopy(self.TEST_RESPONSE_DICT)
resp_data2 = copy.deepcopy(self.TEST_RESPONSE_DICT)
resp_data1['access']['token']['id'] = 'token1'
resp_data2['access']['token']['id'] = 'token2'
auth_responses = [{'json': resp_data1}, {'json': resp_data2}]
self.stub_auth(response_list=auth_responses)
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
self.assertEqual('token1', s.get_token())
self.assertEqual({'X-Auth-Token': 'token1'}, s.get_auth_headers())
a.invalidate()
self.assertEqual('token2', s.get_token())
self.assertEqual({'X-Auth-Token': 'token2'}, s.get_auth_headers())
def test_doesnt_log_password(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
password = uuid.uuid4().hex
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=password)
s = session.Session(auth=a)
self.assertEqual(self.TEST_TOKEN, s.get_token())
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
self.assertNotIn(password, self.logger.output)
def test_password_with_no_user_id_or_name(self):
self.assertRaises(TypeError,
v2.Password, self.TEST_URL, password=self.TEST_PASS)

View File

@ -0,0 +1,490 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import uuid
from keystoneclient import access
from keystoneclient.auth.identity import v3
from keystoneclient import client
from keystoneclient import exceptions
from keystoneclient import fixture
from keystoneclient import session
from keystoneclient.tests.unit import utils
class V3IdentityPlugin(utils.TestCase):
TEST_ROOT_URL = 'http://127.0.0.1:5000/'
TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3')
TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3')
TEST_PASS = 'password'
TEST_SERVICE_CATALOG = [{
"endpoints": [{
"url": "http://cdn.admin-nets.local:8774/v1.0/",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://127.0.0.1:8774/v1.0",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://cdn.admin-nets.local:8774/v1.0",
"region": "RegionOne",
"interface": "admin"
}],
"type": "nova_compat"
}, {
"endpoints": [{
"url": "http://nova/novapi/public",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://nova/novapi/internal",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://nova/novapi/admin",
"region": "RegionOne",
"interface": "admin"
}],
"type": "compute",
"name": "nova",
}, {
"endpoints": [{
"url": "http://glance/glanceapi/public",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://glance/glanceapi/internal",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://glance/glanceapi/admin",
"region": "RegionOne",
"interface": "admin"
}],
"type": "image",
"name": "glance"
}, {
"endpoints": [{
"url": "http://127.0.0.1:5000/v3",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://127.0.0.1:5000/v3",
"region": "RegionOne",
"interface": "internal"
}, {
"url": TEST_ADMIN_URL,
"region": "RegionOne",
"interface": "admin"
}],
"type": "identity"
}, {
"endpoints": [{
"url": "http://swift/swiftapi/public",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://swift/swiftapi/internal",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://swift/swiftapi/admin",
"region": "RegionOne",
"interface": "admin"
}],
"type": "object-store"
}]
def setUp(self):
super(V3IdentityPlugin, self).setUp()
V3_URL = "%sv3" % self.TEST_URL
self.TEST_DISCOVERY_RESPONSE = {
'versions': {'values': [fixture.V3Discovery(V3_URL)]}}
self.TEST_RESPONSE_DICT = {
"token": {
"methods": [
"token",
"password"
],
"expires_at": "2020-01-01T00:00:10.000123Z",
"project": {
"domain": {
"id": self.TEST_DOMAIN_ID,
"name": self.TEST_DOMAIN_NAME
},
"id": self.TEST_TENANT_ID,
"name": self.TEST_TENANT_NAME
},
"user": {
"domain": {
"id": self.TEST_DOMAIN_ID,
"name": self.TEST_DOMAIN_NAME
},
"id": self.TEST_USER,
"name": self.TEST_USER
},
"issued_at": "2013-05-29T16:55:21.468960Z",
"catalog": self.TEST_SERVICE_CATALOG
},
}
self.TEST_PROJECTS_RESPONSE = {
"projects": [
{
"domain_id": "1789d1",
"enabled": "True",
"id": "263fd9",
"links": {
"self": "https://identity:5000/v3/projects/263fd9"
},
"name": "Dev Group A"
},
{
"domain_id": "1789d1",
"enabled": "True",
"id": "e56ad3",
"links": {
"self": "https://identity:5000/v3/projects/e56ad3"
},
"name": "Dev Group B"
}
],
"links": {
"self": "https://identity:5000/v3/projects",
}
}
def stub_auth(self, subject_token=None, **kwargs):
if not subject_token:
subject_token = self.TEST_TOKEN
self.stub_url('POST', ['auth', 'tokens'],
headers={'X-Subject-Token': subject_token}, **kwargs)
def test_authenticate_with_username_password(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v3.Password(self.TEST_URL,
username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'identity':
{'methods': ['password'],
'password': {'user': {'name': self.TEST_USER,
'password': self.TEST_PASS}}}}}
self.assertRequestBodyIs(json=req)
self.assertRequestHeaderEqual('Content-Type', 'application/json')
self.assertRequestHeaderEqual('Accept', 'application/json')
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_authenticate_with_username_password_unscoped(self):
del self.TEST_RESPONSE_DICT['token']['catalog']
del self.TEST_RESPONSE_DICT['token']['project']
self.stub_auth(json=self.TEST_RESPONSE_DICT)
self.stub_url(method="GET", json=self.TEST_DISCOVERY_RESPONSE)
test_user_id = self.TEST_RESPONSE_DICT['token']['user']['id']
self.stub_url(method="GET",
json=self.TEST_PROJECTS_RESPONSE,
parts=['users', test_user_id, 'projects'])
a = v3.Password(self.TEST_URL,
username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
cs = client.Client(session=s, auth_url=self.TEST_URL)
# As a sanity check on the auth_ref, make sure client has the
# proper user id, that it fetches the right project response
self.assertEqual(test_user_id, a.auth_ref.user_id)
t = cs.projects.list(user=a.auth_ref.user_id)
self.assertEqual(2, len(t))
def test_authenticate_with_username_password_domain_scoped(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS, domain_id=self.TEST_DOMAIN_ID)
s = session.Session(a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'identity':
{'methods': ['password'],
'password': {'user': {'name': self.TEST_USER,
'password': self.TEST_PASS}}},
'scope': {'domain': {'id': self.TEST_DOMAIN_ID}}}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_authenticate_with_username_password_project_scoped(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS,
project_id=self.TEST_DOMAIN_ID)
s = session.Session(a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'identity':
{'methods': ['password'],
'password': {'user': {'name': self.TEST_USER,
'password': self.TEST_PASS}}},
'scope': {'project': {'id': self.TEST_DOMAIN_ID}}}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
self.assertEqual(s.auth.auth_ref.project_id, self.TEST_DOMAIN_ID)
def test_authenticate_with_token(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v3.Token(self.TEST_URL, self.TEST_TOKEN)
s = session.Session(auth=a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'identity':
{'methods': ['token'],
'token': {'id': self.TEST_TOKEN}}}}
self.assertRequestBodyIs(json=req)
self.assertRequestHeaderEqual('Content-Type', 'application/json')
self.assertRequestHeaderEqual('Accept', 'application/json')
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_with_expired(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
d = copy.deepcopy(self.TEST_RESPONSE_DICT)
d['token']['expires_at'] = '2000-01-01T00:00:10.000123Z'
a = v3.Password(self.TEST_URL, username='username',
password='password')
a.auth_ref = access.AccessInfo.factory(body=d)
s = session.Session(auth=a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
self.assertEqual(a.auth_ref['expires_at'],
self.TEST_RESPONSE_DICT['token']['expires_at'])
def test_with_domain_and_project_scoping(self):
a = v3.Password(self.TEST_URL, username='username',
password='password', project_id='project',
domain_id='domain')
self.assertRaises(exceptions.AuthorizationFailure,
a.get_token, None)
self.assertRaises(exceptions.AuthorizationFailure,
a.get_headers, None)
def test_with_trust_id(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS, trust_id='trust')
s = session.Session(a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'identity':
{'methods': ['password'],
'password': {'user': {'name': self.TEST_USER,
'password': self.TEST_PASS}}},
'scope': {'OS-TRUST:trust': {'id': 'trust'}}}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_with_multiple_mechanisms_factory(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
p = v3.PasswordMethod(username=self.TEST_USER, password=self.TEST_PASS)
t = v3.TokenMethod(token='foo')
a = v3.Auth(self.TEST_URL, [p, t], trust_id='trust')
s = session.Session(a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'identity':
{'methods': ['password', 'token'],
'password': {'user': {'name': self.TEST_USER,
'password': self.TEST_PASS}},
'token': {'id': 'foo'}},
'scope': {'OS-TRUST:trust': {'id': 'trust'}}}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_with_multiple_mechanisms(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
p = v3.PasswordMethod(username=self.TEST_USER,
password=self.TEST_PASS)
t = v3.TokenMethod(token='foo')
a = v3.Auth(self.TEST_URL, [p, t], trust_id='trust')
s = session.Session(auth=a)
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
req = {'auth': {'identity':
{'methods': ['password', 'token'],
'password': {'user': {'name': self.TEST_USER,
'password': self.TEST_PASS}},
'token': {'id': 'foo'}},
'scope': {'OS-TRUST:trust': {'id': 'trust'}}}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_with_multiple_scopes(self):
s = session.Session()
a = v3.Password(self.TEST_URL,
username=self.TEST_USER, password=self.TEST_PASS,
domain_id='x', project_id='x')
self.assertRaises(exceptions.AuthorizationFailure, a.get_auth_ref, s)
a = v3.Password(self.TEST_URL,
username=self.TEST_USER, password=self.TEST_PASS,
domain_id='x', trust_id='x')
self.assertRaises(exceptions.AuthorizationFailure, a.get_auth_ref, s)
def _do_service_url_test(self, base_url, endpoint_filter):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
self.stub_url('GET', ['path'],
base_url=base_url,
text='SUCCESS', status_code=200)
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
resp = s.get('/path', endpoint_filter=endpoint_filter)
self.assertEqual(resp.status_code, 200)
self.assertEqual(self.requests.last_request.url, base_url + '/path')
def test_service_url(self):
endpoint_filter = {'service_type': 'compute',
'interface': 'admin',
'service_name': 'nova'}
self._do_service_url_test('http://nova/novapi/admin', endpoint_filter)
def test_service_url_defaults_to_public(self):
endpoint_filter = {'service_type': 'compute'}
self._do_service_url_test('http://nova/novapi/public', endpoint_filter)
def test_endpoint_filter_without_service_type_fails(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
self.assertRaises(exceptions.EndpointNotFound, s.get, '/path',
endpoint_filter={'interface': 'admin'})
def test_full_url_overrides_endpoint_filter(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
self.stub_url('GET', [],
base_url='http://testurl/',
text='SUCCESS', status_code=200)
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
resp = s.get('http://testurl/',
endpoint_filter={'service_type': 'compute'})
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.text, 'SUCCESS')
def test_invalid_auth_response_dict(self):
self.stub_auth(json={'hello': 'world'})
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any',
authenticated=True)
def test_invalid_auth_response_type(self):
self.stub_url('POST', ['auth', 'tokens'], text='testdata')
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any',
authenticated=True)
def test_invalidate_response(self):
auth_responses = [{'status_code': 200, 'json': self.TEST_RESPONSE_DICT,
'headers': {'X-Subject-Token': 'token1'}},
{'status_code': 200, 'json': self.TEST_RESPONSE_DICT,
'headers': {'X-Subject-Token': 'token2'}}]
self.requests.post('%s/auth/tokens' % self.TEST_URL, auth_responses)
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
self.assertEqual('token1', s.get_token())
self.assertEqual({'X-Auth-Token': 'token1'}, s.get_auth_headers())
a.invalidate()
self.assertEqual('token2', s.get_token())
self.assertEqual({'X-Auth-Token': 'token2'}, s.get_auth_headers())
def test_doesnt_log_password(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
password = uuid.uuid4().hex
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=password)
s = session.Session(a)
self.assertEqual(self.TEST_TOKEN, s.get_token())
self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
s.get_auth_headers())
self.assertNotIn(password, self.logger.output)
def test_sends_nocatalog(self):
del self.TEST_RESPONSE_DICT['token']['catalog']
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v3.Password(self.TEST_URL,
username=self.TEST_USER,
password=self.TEST_PASS,
include_catalog=False)
s = session.Session(auth=a)
s.get_token()
auth_url = self.TEST_URL + '/auth/tokens'
self.assertEqual(auth_url, a.token_url)
self.assertEqual(auth_url + '?nocatalog',
self.requests.last_request.url)

View File

@ -0,0 +1,63 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystoneclient.auth.identity.generic import password
from keystoneclient.auth.identity import v2
from keystoneclient.auth.identity import v3
from keystoneclient.tests.unit.auth import utils
class PasswordTests(utils.GenericPluginTestCase):
PLUGIN_CLASS = password.Password
V2_PLUGIN_CLASS = v2.Password
V3_PLUGIN_CLASS = v3.Password
def new_plugin(self, **kwargs):
kwargs.setdefault('username', uuid.uuid4().hex)
kwargs.setdefault('password', uuid.uuid4().hex)
return super(PasswordTests, self).new_plugin(**kwargs)
def test_with_user_domain_params(self):
self.stub_discovery()
self.assertCreateV3(domain_id=uuid.uuid4().hex,
user_domain_id=uuid.uuid4().hex)
def test_v3_user_params_v2_url(self):
self.stub_discovery(v3=False)
self.assertDiscoveryFailure(user_domain_id=uuid.uuid4().hex)
def test_options(self):
opts = [o.name for o in self.PLUGIN_CLASS.get_options()]
allowed_opts = ['user-name',
'user-domain-id',
'user-domain-name',
'user-id',
'password',
'domain-id',
'domain-name',
'tenant-id',
'tenant-name',
'project-id',
'project-name',
'project-domain-id',
'project-domain-name',
'trust-id',
'auth-url']
self.assertEqual(set(allowed_opts), set(opts))
self.assertEqual(len(allowed_opts), len(opts))

View File

@ -0,0 +1,47 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystoneclient.auth.identity.generic import token
from keystoneclient.auth.identity import v2
from keystoneclient.auth.identity import v3
from keystoneclient.tests.unit.auth import utils
class TokenTests(utils.GenericPluginTestCase):
PLUGIN_CLASS = token.Token
V2_PLUGIN_CLASS = v2.Token
V3_PLUGIN_CLASS = v3.Token
def new_plugin(self, **kwargs):
kwargs.setdefault('token', uuid.uuid4().hex)
return super(TokenTests, self).new_plugin(**kwargs)
def test_options(self):
opts = [o.name for o in self.PLUGIN_CLASS.get_options()]
allowed_opts = ['token',
'domain-id',
'domain-name',
'tenant-id',
'tenant-name',
'project-id',
'project-name',
'project-domain-id',
'project-domain-name',
'trust-id',
'auth-url']
self.assertEqual(set(allowed_opts), set(opts))
self.assertEqual(len(allowed_opts), len(opts))

View File

@ -0,0 +1,63 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testtools import matchers
from keystoneclient.auth import token_endpoint
from keystoneclient import session
from keystoneclient.tests.unit import utils
class TokenEndpointTest(utils.TestCase):
TEST_TOKEN = 'aToken'
TEST_URL = 'http://server/prefix'
def test_basic_case(self):
self.requests.get(self.TEST_URL, text='body')
a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN)
s = session.Session(auth=a)
data = s.get(self.TEST_URL, authenticated=True)
self.assertEqual(data.text, 'body')
self.assertRequestHeaderEqual('X-Auth-Token', self.TEST_TOKEN)
def test_basic_endpoint_case(self):
self.stub_url('GET', ['p'], text='body')
a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN)
s = session.Session(auth=a)
data = s.get('/p',
authenticated=True,
endpoint_filter={'service': 'identity'})
self.assertEqual(self.TEST_URL, a.get_endpoint(s))
self.assertEqual('body', data.text)
self.assertRequestHeaderEqual('X-Auth-Token', self.TEST_TOKEN)
def test_token_endpoint_options(self):
opt_names = [opt.name for opt in token_endpoint.Token.get_options()]
self.assertThat(opt_names, matchers.HasLength(2))
self.assertIn('token', opt_names)
self.assertIn('endpoint', opt_names)
def test_token_endpoint_user_id(self):
a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN)
s = session.Session()
# we can't know this information about this sort of plugin
self.assertIsNone(a.get_user_id(s))
self.assertIsNone(a.get_project_id(s))

View File

@ -0,0 +1,200 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import uuid
import mock
from oslo_config import cfg
import six
from keystoneclient import access
from keystoneclient.auth import base
from keystoneclient import exceptions
from keystoneclient import fixture
from keystoneclient import session
from keystoneclient.tests.unit import utils
class MockPlugin(base.BaseAuthPlugin):
INT_DESC = 'test int'
FLOAT_DESC = 'test float'
BOOL_DESC = 'test bool'
STR_DESC = 'test str'
STR_DEFAULT = uuid.uuid4().hex
def __init__(self, **kwargs):
self._data = kwargs
def __getitem__(self, key):
return self._data[key]
def get_token(self, *args, **kwargs):
return 'aToken'
def get_endpoint(self, *args, **kwargs):
return 'http://test'
@classmethod
def get_options(cls):
return [
cfg.IntOpt('a-int', default='3', help=cls.INT_DESC),
cfg.BoolOpt('a-bool', help=cls.BOOL_DESC),
cfg.FloatOpt('a-float', help=cls.FLOAT_DESC),
cfg.StrOpt('a-str', help=cls.STR_DESC, default=cls.STR_DEFAULT),
]
class MockManager(object):
def __init__(self, driver):
self.driver = driver
def mock_plugin(f):
@functools.wraps(f)
def inner(*args, **kwargs):
with mock.patch.object(base, 'get_plugin_class') as m:
m.return_value = MockPlugin
args = list(args) + [m]
return f(*args, **kwargs)
return inner
class TestCase(utils.TestCase):
GROUP = 'auth'
V2PASS = 'v2password'
V3TOKEN = 'v3token'
a_int = 88
a_float = 88.8
a_bool = False
TEST_VALS = {'a_int': a_int,
'a_float': a_float,
'a_bool': a_bool}
def assertTestVals(self, plugin, vals=TEST_VALS):
for k, v in six.iteritems(vals):
self.assertEqual(v, plugin[k])
class GenericPluginTestCase(utils.TestCase):
TEST_URL = 'http://keystone.host:5000/'
# OVERRIDE THESE IN SUB CLASSES
PLUGIN_CLASS = None
V2_PLUGIN_CLASS = None
V3_PLUGIN_CLASS = None
def setUp(self):
super(GenericPluginTestCase, self).setUp()
self.token_v2 = fixture.V2Token()
self.token_v3 = fixture.V3Token()
self.token_v3_id = uuid.uuid4().hex
self.session = session.Session()
self.stub_url('POST', ['v2.0', 'tokens'], json=self.token_v2)
self.stub_url('POST', ['v3', 'auth', 'tokens'],
headers={'X-Subject-Token': self.token_v3_id},
json=self.token_v3)
def new_plugin(self, **kwargs):
kwargs.setdefault('auth_url', self.TEST_URL)
return self.PLUGIN_CLASS(**kwargs)
def stub_discovery(self, base_url=None, **kwargs):
kwargs.setdefault('href', self.TEST_URL)
disc = fixture.DiscoveryList(**kwargs)
self.stub_url('GET', json=disc, base_url=base_url, status_code=300)
return disc
def assertCreateV3(self, **kwargs):
auth = self.new_plugin(**kwargs)
auth_ref = auth.get_auth_ref(self.session)
self.assertIsInstance(auth_ref, access.AccessInfoV3)
self.assertEqual(self.TEST_URL + 'v3/auth/tokens',
self.requests.last_request.url)
self.assertIsInstance(auth._plugin, self.V3_PLUGIN_CLASS)
return auth
def assertCreateV2(self, **kwargs):
auth = self.new_plugin(**kwargs)
auth_ref = auth.get_auth_ref(self.session)
self.assertIsInstance(auth_ref, access.AccessInfoV2)
self.assertEqual(self.TEST_URL + 'v2.0/tokens',
self.requests.last_request.url)
self.assertIsInstance(auth._plugin, self.V2_PLUGIN_CLASS)
return auth
def assertDiscoveryFailure(self, **kwargs):
plugin = self.new_plugin(**kwargs)
self.assertRaises(exceptions.DiscoveryFailure,
plugin.get_auth_ref,
self.session)
def test_create_v3_if_domain_params(self):
self.stub_discovery()
self.assertCreateV3(domain_id=uuid.uuid4().hex)
self.assertCreateV3(domain_name=uuid.uuid4().hex)
self.assertCreateV3(project_name=uuid.uuid4().hex,
project_domain_name=uuid.uuid4().hex)
self.assertCreateV3(project_name=uuid.uuid4().hex,
project_domain_id=uuid.uuid4().hex)
def test_create_v2_if_no_domain_params(self):
self.stub_discovery()
self.assertCreateV2()
self.assertCreateV2(project_id=uuid.uuid4().hex)
self.assertCreateV2(project_name=uuid.uuid4().hex)
self.assertCreateV2(tenant_id=uuid.uuid4().hex)
self.assertCreateV2(tenant_name=uuid.uuid4().hex)
def test_v3_params_v2_url(self):
self.stub_discovery(v3=False)
self.assertDiscoveryFailure(domain_name=uuid.uuid4().hex)
def test_v2_params_v3_url(self):
self.stub_discovery(v2=False)
self.assertCreateV3()
def test_no_urls(self):
self.stub_discovery(v2=False, v3=False)
self.assertDiscoveryFailure()
def test_path_based_url_v2(self):
self.stub_url('GET', ['v2.0'], status_code=403)
self.assertCreateV2(auth_url=self.TEST_URL + 'v2.0')
def test_path_based_url_v3(self):
self.stub_url('GET', ['v3'], status_code=403)
self.assertCreateV3(auth_url=self.TEST_URL + 'v3')
def test_disc_error_for_failure(self):
self.stub_url('GET', [], status_code=403)
self.assertDiscoveryFailure()
def test_v3_plugin_from_failure(self):
url = self.TEST_URL + 'v3'
self.stub_url('GET', [], base_url=url, status_code=403)
self.assertCreateV3(auth_url=url)
def test_unknown_discovery_version(self):
# make a v4 entry that's mostly the same as a v3
self.stub_discovery(v2=False, v3_id='v4.0')
self.assertDiscoveryFailure()

View File

@ -0,0 +1,802 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import uuid
from oslo_serialization import jsonutils
import six
from testtools import matchers
from keystoneclient import _discover
from keystoneclient.auth import token_endpoint
from keystoneclient import client
from keystoneclient import discover
from keystoneclient import exceptions
from keystoneclient import fixture
from keystoneclient import session
from keystoneclient.tests.unit import utils
from keystoneclient.v2_0 import client as v2_client
from keystoneclient.v3 import client as v3_client
BASE_HOST = 'http://keystone.example.com'
BASE_URL = "%s:5000/" % BASE_HOST
UPDATED = '2013-03-06T00:00:00Z'
TEST_SERVICE_CATALOG = [{
"endpoints": [{
"adminURL": "%s:8774/v1.0" % BASE_HOST,
"region": "RegionOne",
"internalURL": "%s://127.0.0.1:8774/v1.0" % BASE_HOST,
"publicURL": "%s:8774/v1.0/" % BASE_HOST
}],
"type": "nova_compat",
"name": "nova_compat"
}, {
"endpoints": [{
"adminURL": "http://nova/novapi/admin",
"region": "RegionOne",
"internalURL": "http://nova/novapi/internal",
"publicURL": "http://nova/novapi/public"
}],
"type": "compute",
"name": "nova"
}, {
"endpoints": [{
"adminURL": "http://glance/glanceapi/admin",
"region": "RegionOne",
"internalURL": "http://glance/glanceapi/internal",
"publicURL": "http://glance/glanceapi/public"
}],
"type": "image",
"name": "glance"
}, {
"endpoints": [{
"adminURL": "%s:35357/v2.0" % BASE_HOST,
"region": "RegionOne",
"internalURL": "%s:5000/v2.0" % BASE_HOST,
"publicURL": "%s:5000/v2.0" % BASE_HOST
}],
"type": "identity",
"name": "keystone"
}, {
"endpoints": [{
"adminURL": "http://swift/swiftapi/admin",
"region": "RegionOne",
"internalURL": "http://swift/swiftapi/internal",
"publicURL": "http://swift/swiftapi/public"
}],
"type": "object-store",
"name": "swift"
}]
V2_URL = "%sv2.0" % BASE_URL
V2_VERSION = fixture.V2Discovery(V2_URL)
V2_VERSION.updated_str = UPDATED
V2_AUTH_RESPONSE = jsonutils.dumps({
"access": {
"token": {
"expires": "2020-01-01T00:00:10.000123Z",
"id": 'fakeToken',
"tenant": {
"id": '1'
},
},
"user": {
"id": 'test'
},
"serviceCatalog": TEST_SERVICE_CATALOG,
},
})
V3_URL = "%sv3" % BASE_URL
V3_VERSION = fixture.V3Discovery(V3_URL)
V3_MEDIA_TYPES = V3_VERSION.media_types
V3_VERSION.updated_str = UPDATED
V3_TOKEN = six.u('3e2813b7ba0b4006840c3825860b86ed'),
V3_AUTH_RESPONSE = jsonutils.dumps({
"token": {
"methods": [
"token",
"password"
],
"expires_at": "2020-01-01T00:00:10.000123Z",
"project": {
"domain": {
"id": '1',
"name": 'test-domain'
},
"id": '1',
"name": 'test-project'
},
"user": {
"domain": {
"id": '1',
"name": 'test-domain'
},
"id": '1',
"name": 'test-user'
},
"issued_at": "2013-05-29T16:55:21.468960Z",
},
})
CINDER_EXAMPLES = {
"versions": [
{
"status": "CURRENT",
"updated": "2012-01-04T11:33:21Z",
"id": "v1.0",
"links": [
{
"href": "%sv1/" % BASE_URL,
"rel": "self"
}
]
},
{
"status": "CURRENT",
"updated": "2012-11-21T11:33:21Z",
"id": "v2.0",
"links": [
{
"href": "%sv2/" % BASE_URL,
"rel": "self"
}
]
}
]
}
GLANCE_EXAMPLES = {
"versions": [
{
"status": "CURRENT",
"id": "v2.2",
"links": [
{
"href": "%sv2/" % BASE_URL,
"rel": "self"
}
]
},
{
"status": "SUPPORTED",
"id": "v2.1",
"links": [
{
"href": "%sv2/" % BASE_URL,
"rel": "self"
}
]
},
{
"status": "SUPPORTED",
"id": "v2.0",
"links": [
{
"href": "%sv2/" % BASE_URL,
"rel": "self"
}
]
},
{
"status": "CURRENT",
"id": "v1.1",
"links": [
{
"href": "%sv1/" % BASE_URL,
"rel": "self"
}
]
},
{
"status": "SUPPORTED",
"id": "v1.0",
"links": [
{
"href": "%sv1/" % BASE_URL,
"rel": "self"
}
]
}
]
}
def _create_version_list(versions):
return jsonutils.dumps({'versions': {'values': versions}})
def _create_single_version(version):
return jsonutils.dumps({'version': version})
V3_VERSION_LIST = _create_version_list([V3_VERSION, V2_VERSION])
V2_VERSION_LIST = _create_version_list([V2_VERSION])
V3_VERSION_ENTRY = _create_single_version(V3_VERSION)
V2_VERSION_ENTRY = _create_single_version(V2_VERSION)
class AvailableVersionsTests(utils.TestCase):
def test_available_versions_basics(self):
examples = {'keystone': V3_VERSION_LIST,
'cinder': jsonutils.dumps(CINDER_EXAMPLES),
'glance': jsonutils.dumps(GLANCE_EXAMPLES)}
for path, text in six.iteritems(examples):
url = "%s%s" % (BASE_URL, path)
self.requests.get(url, status_code=300, text=text)
versions = discover.available_versions(url)
for v in versions:
for n in ('id', 'status', 'links'):
msg = '%s missing from %s version data' % (n, path)
self.assertThat(v, matchers.Annotate(msg,
matchers.Contains(n)))
def test_available_versions_individual(self):
self.requests.get(V3_URL, status_code=200, text=V3_VERSION_ENTRY)
versions = discover.available_versions(V3_URL)
for v in versions:
self.assertEqual(v['id'], 'v3.0')
self.assertEqual(v['status'], 'stable')
self.assertIn('media-types', v)
self.assertIn('links', v)
def test_available_keystone_data(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
versions = discover.available_versions(BASE_URL)
self.assertEqual(2, len(versions))
for v in versions:
self.assertIn(v['id'], ('v2.0', 'v3.0'))
self.assertEqual(v['updated'], UPDATED)
self.assertEqual(v['status'], 'stable')
if v['id'] == 'v3.0':
self.assertEqual(v['media-types'], V3_MEDIA_TYPES)
def test_available_cinder_data(self):
text = jsonutils.dumps(CINDER_EXAMPLES)
self.requests.get(BASE_URL, status_code=300, text=text)
versions = discover.available_versions(BASE_URL)
self.assertEqual(2, len(versions))
for v in versions:
self.assertEqual(v['status'], 'CURRENT')
if v['id'] == 'v1.0':
self.assertEqual(v['updated'], '2012-01-04T11:33:21Z')
elif v['id'] == 'v2.0':
self.assertEqual(v['updated'], '2012-11-21T11:33:21Z')
else:
self.fail("Invalid version found")
def test_available_glance_data(self):
text = jsonutils.dumps(GLANCE_EXAMPLES)
self.requests.get(BASE_URL, status_code=200, text=text)
versions = discover.available_versions(BASE_URL)
self.assertEqual(5, len(versions))
for v in versions:
if v['id'] in ('v2.2', 'v1.1'):
self.assertEqual(v['status'], 'CURRENT')
elif v['id'] in ('v2.1', 'v2.0', 'v1.0'):
self.assertEqual(v['status'], 'SUPPORTED')
else:
self.fail("Invalid version found")
class ClientDiscoveryTests(utils.TestCase):
def assertCreatesV3(self, **kwargs):
self.requests.post('%s/auth/tokens' % V3_URL,
text=V3_AUTH_RESPONSE,
headers={'X-Subject-Token': V3_TOKEN})
kwargs.setdefault('username', 'foo')
kwargs.setdefault('password', 'bar')
keystone = client.Client(**kwargs)
self.assertIsInstance(keystone, v3_client.Client)
return keystone
def assertCreatesV2(self, **kwargs):
self.requests.post("%s/tokens" % V2_URL, text=V2_AUTH_RESPONSE)
kwargs.setdefault('username', 'foo')
kwargs.setdefault('password', 'bar')
keystone = client.Client(**kwargs)
self.assertIsInstance(keystone, v2_client.Client)
return keystone
def assertVersionNotAvailable(self, **kwargs):
kwargs.setdefault('username', 'foo')
kwargs.setdefault('password', 'bar')
self.assertRaises(exceptions.VersionNotAvailable,
client.Client, **kwargs)
def assertDiscoveryFailure(self, **kwargs):
kwargs.setdefault('username', 'foo')
kwargs.setdefault('password', 'bar')
self.assertRaises(exceptions.DiscoveryFailure,
client.Client, **kwargs)
def test_discover_v3(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.assertCreatesV3(auth_url=BASE_URL)
def test_discover_v2(self):
self.requests.get(BASE_URL, status_code=300, text=V2_VERSION_LIST)
self.requests.post("%s/tokens" % V2_URL, text=V2_AUTH_RESPONSE)
self.assertCreatesV2(auth_url=BASE_URL)
def test_discover_endpoint_v2(self):
self.requests.get(BASE_URL, status_code=300, text=V2_VERSION_LIST)
self.assertCreatesV2(endpoint=BASE_URL, token='fake-token')
def test_discover_endpoint_v3(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.assertCreatesV3(endpoint=BASE_URL, token='fake-token')
def test_discover_invalid_major_version(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.assertVersionNotAvailable(auth_url=BASE_URL, version=5)
def test_discover_200_response_fails(self):
self.requests.get(BASE_URL, text='ok')
self.assertDiscoveryFailure(auth_url=BASE_URL)
def test_discover_minor_greater_than_available_fails(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.assertVersionNotAvailable(endpoint=BASE_URL, version=3.4)
def test_discover_individual_version_v2(self):
self.requests.get(V2_URL, text=V2_VERSION_ENTRY)
self.assertCreatesV2(auth_url=V2_URL)
def test_discover_individual_version_v3(self):
self.requests.get(V3_URL, text=V3_VERSION_ENTRY)
self.assertCreatesV3(auth_url=V3_URL)
def test_discover_individual_endpoint_v2(self):
self.requests.get(V2_URL, text=V2_VERSION_ENTRY)
self.assertCreatesV2(endpoint=V2_URL, token='fake-token')
def test_discover_individual_endpoint_v3(self):
self.requests.get(V3_URL, text=V3_VERSION_ENTRY)
self.assertCreatesV3(endpoint=V3_URL, token='fake-token')
def test_discover_fail_to_create_bad_individual_version(self):
self.requests.get(V2_URL, text=V2_VERSION_ENTRY)
self.requests.get(V3_URL, text=V3_VERSION_ENTRY)
self.assertVersionNotAvailable(auth_url=V2_URL, version=3)
self.assertVersionNotAvailable(auth_url=V3_URL, version=2)
def test_discover_unstable_versions(self):
version_list = fixture.DiscoveryList(BASE_URL, v3_status='beta')
self.requests.get(BASE_URL, status_code=300, json=version_list)
self.assertCreatesV2(auth_url=BASE_URL)
self.assertVersionNotAvailable(auth_url=BASE_URL, version=3)
self.assertCreatesV3(auth_url=BASE_URL, unstable=True)
def test_discover_forwards_original_ip(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
ip = '192.168.1.1'
self.assertCreatesV3(auth_url=BASE_URL, original_ip=ip)
self.assertThat(self.requests.last_request.headers['forwarded'],
matchers.Contains(ip))
def test_discover_bad_args(self):
self.assertRaises(exceptions.DiscoveryFailure,
client.Client)
def test_discover_bad_response(self):
self.requests.get(BASE_URL, status_code=300, json={'FOO': 'BAR'})
self.assertDiscoveryFailure(auth_url=BASE_URL)
def test_discovery_ignore_invalid(self):
resp = [{'id': 'v3.0',
'links': [1, 2, 3, 4], # invalid links
'media-types': V3_MEDIA_TYPES,
'status': 'stable',
'updated': UPDATED}]
self.requests.get(BASE_URL, status_code=300,
text=_create_version_list(resp))
self.assertDiscoveryFailure(auth_url=BASE_URL)
def test_ignore_entry_without_links(self):
v3 = V3_VERSION.copy()
v3['links'] = []
self.requests.get(BASE_URL, status_code=300,
text=_create_version_list([v3, V2_VERSION]))
self.assertCreatesV2(auth_url=BASE_URL)
def test_ignore_entry_without_status(self):
v3 = V3_VERSION.copy()
del v3['status']
self.requests.get(BASE_URL, status_code=300,
text=_create_version_list([v3, V2_VERSION]))
self.assertCreatesV2(auth_url=BASE_URL)
def test_greater_version_than_required(self):
versions = fixture.DiscoveryList(BASE_URL, v3_id='v3.6')
self.requests.get(BASE_URL, json=versions)
self.assertCreatesV3(auth_url=BASE_URL, version=(3, 4))
def test_lesser_version_than_required(self):
versions = fixture.DiscoveryList(BASE_URL, v3_id='v3.4')
self.requests.get(BASE_URL, json=versions)
self.assertVersionNotAvailable(auth_url=BASE_URL, version=(3, 6))
def test_bad_response(self):
self.requests.get(BASE_URL, status_code=300, text="Ugly Duckling")
self.assertDiscoveryFailure(auth_url=BASE_URL)
def test_pass_client_arguments(self):
self.requests.get(BASE_URL, status_code=300, text=V2_VERSION_LIST)
kwargs = {'original_ip': '100', 'use_keyring': False,
'stale_duration': 15}
cl = self.assertCreatesV2(auth_url=BASE_URL, **kwargs)
self.assertEqual(cl.original_ip, '100')
self.assertEqual(cl.stale_duration, 15)
self.assertFalse(cl.use_keyring)
def test_overriding_stored_kwargs(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.requests.post("%s/auth/tokens" % V3_URL,
text=V3_AUTH_RESPONSE,
headers={'X-Subject-Token': V3_TOKEN})
disc = discover.Discover(auth_url=BASE_URL, debug=False,
username='foo')
client = disc.create_client(debug=True, password='bar')
self.assertIsInstance(client, v3_client.Client)
self.assertTrue(client.debug_log)
self.assertFalse(disc._client_kwargs['debug'])
self.assertEqual(client.username, 'foo')
self.assertEqual(client.password, 'bar')
def test_available_versions(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_ENTRY)
disc = discover.Discover(auth_url=BASE_URL)
versions = disc.available_versions()
self.assertEqual(1, len(versions))
self.assertEqual(V3_VERSION, versions[0])
def test_unknown_client_version(self):
V4_VERSION = {'id': 'v4.0',
'links': [{'href': 'http://url', 'rel': 'self'}],
'media-types': V3_MEDIA_TYPES,
'status': 'stable',
'updated': UPDATED}
versions = fixture.DiscoveryList()
versions.add_version(V4_VERSION)
self.requests.get(BASE_URL, status_code=300, json=versions)
disc = discover.Discover(auth_url=BASE_URL)
self.assertRaises(exceptions.DiscoveryFailure,
disc.create_client, version=4)
def test_discovery_fail_for_missing_v3(self):
versions = fixture.DiscoveryList(v2=True, v3=False)
self.requests.get(BASE_URL, status_code=300, json=versions)
disc = discover.Discover(auth_url=BASE_URL)
self.assertRaises(exceptions.DiscoveryFailure,
disc.create_client, version=(3, 0))
def _do_discovery_call(self, token=None, **kwargs):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
if not token:
token = uuid.uuid4().hex
url = 'http://testurl'
a = token_endpoint.Token(url, token)
s = session.Session(auth=a)
# will default to true as there is a plugin on the session
discover.Discover(s, auth_url=BASE_URL, **kwargs)
self.assertEqual(BASE_URL, self.requests.last_request.url)
def test_setting_authenticated_true(self):
token = uuid.uuid4().hex
self._do_discovery_call(token)
self.assertRequestHeaderEqual('X-Auth-Token', token)
def test_setting_authenticated_false(self):
self._do_discovery_call(authenticated=False)
self.assertNotIn('X-Auth-Token', self.requests.last_request.headers)
class DiscoverQueryTests(utils.TestCase):
def test_available_keystone_data(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
disc = discover.Discover(auth_url=BASE_URL)
versions = disc.version_data()
self.assertEqual((2, 0), versions[0]['version'])
self.assertEqual('stable', versions[0]['raw_status'])
self.assertEqual(V2_URL, versions[0]['url'])
self.assertEqual((3, 0), versions[1]['version'])
self.assertEqual('stable', versions[1]['raw_status'])
self.assertEqual(V3_URL, versions[1]['url'])
version = disc.data_for('v3.0')
self.assertEqual((3, 0), version['version'])
self.assertEqual('stable', version['raw_status'])
self.assertEqual(V3_URL, version['url'])
version = disc.data_for(2)
self.assertEqual((2, 0), version['version'])
self.assertEqual('stable', version['raw_status'])
self.assertEqual(V2_URL, version['url'])
self.assertIsNone(disc.url_for('v4'))
self.assertEqual(V3_URL, disc.url_for('v3'))
self.assertEqual(V2_URL, disc.url_for('v2'))
def test_available_cinder_data(self):
text = jsonutils.dumps(CINDER_EXAMPLES)
self.requests.get(BASE_URL, status_code=300, text=text)
v1_url = "%sv1/" % BASE_URL
v2_url = "%sv2/" % BASE_URL
disc = discover.Discover(auth_url=BASE_URL)
versions = disc.version_data()
self.assertEqual((1, 0), versions[0]['version'])
self.assertEqual('CURRENT', versions[0]['raw_status'])
self.assertEqual(v1_url, versions[0]['url'])
self.assertEqual((2, 0), versions[1]['version'])
self.assertEqual('CURRENT', versions[1]['raw_status'])
self.assertEqual(v2_url, versions[1]['url'])
version = disc.data_for('v2.0')
self.assertEqual((2, 0), version['version'])
self.assertEqual('CURRENT', version['raw_status'])
self.assertEqual(v2_url, version['url'])
version = disc.data_for(1)
self.assertEqual((1, 0), version['version'])
self.assertEqual('CURRENT', version['raw_status'])
self.assertEqual(v1_url, version['url'])
self.assertIsNone(disc.url_for('v3'))
self.assertEqual(v2_url, disc.url_for('v2'))
self.assertEqual(v1_url, disc.url_for('v1'))
def test_available_glance_data(self):
text = jsonutils.dumps(GLANCE_EXAMPLES)
self.requests.get(BASE_URL, text=text)
v1_url = "%sv1/" % BASE_URL
v2_url = "%sv2/" % BASE_URL
disc = discover.Discover(auth_url=BASE_URL)
versions = disc.version_data()
self.assertEqual((1, 0), versions[0]['version'])
self.assertEqual('SUPPORTED', versions[0]['raw_status'])
self.assertEqual(v1_url, versions[0]['url'])
self.assertEqual((1, 1), versions[1]['version'])
self.assertEqual('CURRENT', versions[1]['raw_status'])
self.assertEqual(v1_url, versions[1]['url'])
self.assertEqual((2, 0), versions[2]['version'])
self.assertEqual('SUPPORTED', versions[2]['raw_status'])
self.assertEqual(v2_url, versions[2]['url'])
self.assertEqual((2, 1), versions[3]['version'])
self.assertEqual('SUPPORTED', versions[3]['raw_status'])
self.assertEqual(v2_url, versions[3]['url'])
self.assertEqual((2, 2), versions[4]['version'])
self.assertEqual('CURRENT', versions[4]['raw_status'])
self.assertEqual(v2_url, versions[4]['url'])
for ver in (2, 2.1, 2.2):
version = disc.data_for(ver)
self.assertEqual((2, 2), version['version'])
self.assertEqual('CURRENT', version['raw_status'])
self.assertEqual(v2_url, version['url'])
self.assertEqual(v2_url, disc.url_for(ver))
for ver in (1, 1.1):
version = disc.data_for(ver)
self.assertEqual((1, 1), version['version'])
self.assertEqual('CURRENT', version['raw_status'])
self.assertEqual(v1_url, version['url'])
self.assertEqual(v1_url, disc.url_for(ver))
self.assertIsNone(disc.url_for('v3'))
self.assertIsNone(disc.url_for('v2.3'))
def test_allow_deprecated(self):
status = 'deprecated'
version_list = [{'id': 'v3.0',
'links': [{'href': V3_URL, 'rel': 'self'}],
'media-types': V3_MEDIA_TYPES,
'status': status,
'updated': UPDATED}]
text = jsonutils.dumps({'versions': version_list})
self.requests.get(BASE_URL, text=text)
disc = discover.Discover(auth_url=BASE_URL)
# deprecated is allowed by default
versions = disc.version_data(allow_deprecated=False)
self.assertEqual(0, len(versions))
versions = disc.version_data(allow_deprecated=True)
self.assertEqual(1, len(versions))
self.assertEqual(status, versions[0]['raw_status'])
self.assertEqual(V3_URL, versions[0]['url'])
self.assertEqual((3, 0), versions[0]['version'])
def test_allow_experimental(self):
status = 'experimental'
version_list = [{'id': 'v3.0',
'links': [{'href': V3_URL, 'rel': 'self'}],
'media-types': V3_MEDIA_TYPES,
'status': status,
'updated': UPDATED}]
text = jsonutils.dumps({'versions': version_list})
self.requests.get(BASE_URL, text=text)
disc = discover.Discover(auth_url=BASE_URL)
versions = disc.version_data()
self.assertEqual(0, len(versions))
versions = disc.version_data(allow_experimental=True)
self.assertEqual(1, len(versions))
self.assertEqual(status, versions[0]['raw_status'])
self.assertEqual(V3_URL, versions[0]['url'])
self.assertEqual((3, 0), versions[0]['version'])
def test_allow_unknown(self):
status = 'abcdef'
version_list = fixture.DiscoveryList(BASE_URL, v2=False,
v3_status=status)
self.requests.get(BASE_URL, json=version_list)
disc = discover.Discover(auth_url=BASE_URL)
versions = disc.version_data()
self.assertEqual(0, len(versions))
versions = disc.version_data(allow_unknown=True)
self.assertEqual(1, len(versions))
self.assertEqual(status, versions[0]['raw_status'])
self.assertEqual(V3_URL, versions[0]['url'])
self.assertEqual((3, 0), versions[0]['version'])
def test_ignoring_invalid_lnks(self):
version_list = [{'id': 'v3.0',
'links': [{'href': V3_URL, 'rel': 'self'}],
'media-types': V3_MEDIA_TYPES,
'status': 'stable',
'updated': UPDATED},
{'id': 'v3.1',
'media-types': V3_MEDIA_TYPES,
'status': 'stable',
'updated': UPDATED},
{'media-types': V3_MEDIA_TYPES,
'status': 'stable',
'updated': UPDATED,
'links': [{'href': V3_URL, 'rel': 'self'}],
}]
text = jsonutils.dumps({'versions': version_list})
self.requests.get(BASE_URL, text=text)
disc = discover.Discover(auth_url=BASE_URL)
# raw_version_data will return all choices, even invalid ones
versions = disc.raw_version_data()
self.assertEqual(3, len(versions))
# only the version with both id and links will be actually returned
versions = disc.version_data()
self.assertEqual(1, len(versions))
class CatalogHackTests(utils.TestCase):
TEST_URL = 'http://keystone.server:5000/v2.0'
OTHER_URL = 'http://other.server:5000/path'
IDENTITY = 'identity'
BASE_URL = 'http://keystone.server:5000/'
V2_URL = BASE_URL + 'v2.0'
V3_URL = BASE_URL + 'v3'
def setUp(self):
super(CatalogHackTests, self).setUp()
self.hacks = _discover._VersionHacks()
self.hacks.add_discover_hack(self.IDENTITY,
re.compile('/v2.0/?$'),
'/')
def test_version_hacks(self):
self.assertEqual(self.BASE_URL,
self.hacks.get_discover_hack(self.IDENTITY,
self.V2_URL))
self.assertEqual(self.BASE_URL,
self.hacks.get_discover_hack(self.IDENTITY,
self.V2_URL + '/'))
self.assertEqual(self.OTHER_URL,
self.hacks.get_discover_hack(self.IDENTITY,
self.OTHER_URL))
def test_ignored_non_service_type(self):
self.assertEqual(self.V2_URL,
self.hacks.get_discover_hack('other', self.V2_URL))
class DiscoverUtils(utils.TestCase):
def test_version_number(self):
def assertVersion(inp, out):
self.assertEqual(out, _discover.normalize_version_number(inp))
def versionRaises(inp):
self.assertRaises(TypeError,
_discover.normalize_version_number,
inp)
assertVersion('v1.2', (1, 2))
assertVersion('v11', (11, 0))
assertVersion('1.2', (1, 2))
assertVersion('1.5.1', (1, 5, 1))
assertVersion('1', (1, 0))
assertVersion(1, (1, 0))
assertVersion(5.2, (5, 2))
assertVersion((6, 1), (6, 1))
assertVersion([1, 4], (1, 4))
versionRaises('hello')
versionRaises('1.a')
versionRaises('vacuum')

View File

@ -0,0 +1,47 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import textwrap
import mock
import pep8
import testtools
from keystoneclient.hacking import checks
from keystoneclient.tests.unit import client_fixtures
class TestCheckOsloNamespaceImports(testtools.TestCase):
# We are patching pep8 so that only the check under test is actually
# installed.
@mock.patch('pep8._checks',
{'physical_line': {}, 'logical_line': {}, 'tree': {}})
def run_check(self, code):
pep8.register_check(checks.check_oslo_namespace_imports)
lines = textwrap.dedent(code).strip().splitlines(True)
checker = pep8.Checker(lines=lines)
checker.check_all()
checker.report._deferred_print.sort()
return checker.report._deferred_print
def assert_has_errors(self, code, expected_errors=None):
actual_errors = [e[:3] for e in self.run_check(code)]
self.assertEqual(expected_errors or [], actual_errors)
def test(self):
code_ex = self.useFixture(client_fixtures.HackingCode())
code = code_ex.oslo_namespace_imports['code']
errors = code_ex.oslo_namespace_imports['expected_errors']
self.assert_has_errors(code, expected_errors=errors)

View File

@ -0,0 +1,866 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import itertools
import uuid
import mock
from oslo_config import cfg
from oslo_config import fixture as config
from oslo_serialization import jsonutils
import requests
import six
from testtools import matchers
from keystoneclient import adapter
from keystoneclient.auth import base
from keystoneclient import exceptions
from keystoneclient import session as client_session
from keystoneclient.tests.unit import utils
class SessionTests(utils.TestCase):
TEST_URL = 'http://127.0.0.1:5000/'
def test_get(self):
session = client_session.Session()
self.stub_url('GET', text='response')
resp = session.get(self.TEST_URL)
self.assertEqual('GET', self.requests.last_request.method)
self.assertEqual(resp.text, 'response')
self.assertTrue(resp.ok)
def test_post(self):
session = client_session.Session()
self.stub_url('POST', text='response')
resp = session.post(self.TEST_URL, json={'hello': 'world'})
self.assertEqual('POST', self.requests.last_request.method)
self.assertEqual(resp.text, 'response')
self.assertTrue(resp.ok)
self.assertRequestBodyIs(json={'hello': 'world'})
def test_head(self):
session = client_session.Session()
self.stub_url('HEAD')
resp = session.head(self.TEST_URL)
self.assertEqual('HEAD', self.requests.last_request.method)
self.assertTrue(resp.ok)
self.assertRequestBodyIs('')
def test_put(self):
session = client_session.Session()
self.stub_url('PUT', text='response')
resp = session.put(self.TEST_URL, json={'hello': 'world'})
self.assertEqual('PUT', self.requests.last_request.method)
self.assertEqual(resp.text, 'response')
self.assertTrue(resp.ok)
self.assertRequestBodyIs(json={'hello': 'world'})
def test_delete(self):
session = client_session.Session()
self.stub_url('DELETE', text='response')
resp = session.delete(self.TEST_URL)
self.assertEqual('DELETE', self.requests.last_request.method)
self.assertTrue(resp.ok)
self.assertEqual(resp.text, 'response')
def test_patch(self):
session = client_session.Session()
self.stub_url('PATCH', text='response')
resp = session.patch(self.TEST_URL, json={'hello': 'world'})
self.assertEqual('PATCH', self.requests.last_request.method)
self.assertTrue(resp.ok)
self.assertEqual(resp.text, 'response')
self.assertRequestBodyIs(json={'hello': 'world'})
def test_user_agent(self):
session = client_session.Session(user_agent='test-agent')
self.stub_url('GET', text='response')
resp = session.get(self.TEST_URL)
self.assertTrue(resp.ok)
self.assertRequestHeaderEqual('User-Agent', 'test-agent')
resp = session.get(self.TEST_URL, headers={'User-Agent': 'new-agent'})
self.assertTrue(resp.ok)
self.assertRequestHeaderEqual('User-Agent', 'new-agent')
resp = session.get(self.TEST_URL, headers={'User-Agent': 'new-agent'},
user_agent='overrides-agent')
self.assertTrue(resp.ok)
self.assertRequestHeaderEqual('User-Agent', 'overrides-agent')
def test_http_session_opts(self):
session = client_session.Session(cert='cert.pem', timeout=5,
verify='certs')
FAKE_RESP = utils.TestResponse({'status_code': 200, 'text': 'resp'})
RESP = mock.Mock(return_value=FAKE_RESP)
with mock.patch.object(session.session, 'request', RESP) as mocked:
session.post(self.TEST_URL, data='value')
mock_args, mock_kwargs = mocked.call_args
self.assertEqual(mock_args[0], 'POST')
self.assertEqual(mock_args[1], self.TEST_URL)
self.assertEqual(mock_kwargs['data'], 'value')
self.assertEqual(mock_kwargs['cert'], 'cert.pem')
self.assertEqual(mock_kwargs['verify'], 'certs')
self.assertEqual(mock_kwargs['timeout'], 5)
def test_not_found(self):
session = client_session.Session()
self.stub_url('GET', status_code=404)
self.assertRaises(exceptions.NotFound, session.get, self.TEST_URL)
def test_server_error(self):
session = client_session.Session()
self.stub_url('GET', status_code=500)
self.assertRaises(exceptions.InternalServerError,
session.get, self.TEST_URL)
def test_session_debug_output(self):
"""Test request and response headers in debug logs
in order to redact secure headers while debug is true.
"""
session = client_session.Session(verify=False)
headers = {'HEADERA': 'HEADERVALB'}
security_headers = {'Authorization': uuid.uuid4().hex,
'X-Auth-Token': uuid.uuid4().hex,
'X-Subject-Token': uuid.uuid4().hex, }
body = 'BODYRESPONSE'
data = 'BODYDATA'
all_headers = dict(
itertools.chain(headers.items(), security_headers.items()))
self.stub_url('POST', text=body, headers=all_headers)
resp = session.post(self.TEST_URL, headers=all_headers, data=data)
self.assertEqual(resp.status_code, 200)
self.assertIn('curl', self.logger.output)
self.assertIn('POST', self.logger.output)
self.assertIn('--insecure', self.logger.output)
self.assertIn(body, self.logger.output)
self.assertIn("'%s'" % data, self.logger.output)
for k, v in six.iteritems(headers):
self.assertIn(k, self.logger.output)
self.assertIn(v, self.logger.output)
# Assert that response headers contains actual values and
# only debug logs has been masked
for k, v in six.iteritems(security_headers):
self.assertIn('%s: {SHA1}' % k, self.logger.output)
self.assertEqual(v, resp.headers[k])
self.assertNotIn(v, self.logger.output)
def test_logging_cacerts(self):
path_to_certs = '/path/to/certs'
session = client_session.Session(verify=path_to_certs)
self.stub_url('GET', text='text')
session.get(self.TEST_URL)
self.assertIn('--cacert', self.logger.output)
self.assertIn(path_to_certs, self.logger.output)
def test_connect_retries(self):
def _timeout_error(request, context):
raise requests.exceptions.Timeout()
self.stub_url('GET', text=_timeout_error)
session = client_session.Session()
retries = 3
with mock.patch('time.sleep') as m:
self.assertRaises(exceptions.RequestTimeout,
session.get,
self.TEST_URL, connect_retries=retries)
self.assertEqual(retries, m.call_count)
# 3 retries finishing with 2.0 means 0.5, 1.0 and 2.0
m.assert_called_with(2.0)
# we count retries so there will be one initial request + 3 retries
self.assertThat(self.requests.request_history,
matchers.HasLength(retries + 1))
def test_uses_tcp_keepalive_by_default(self):
session = client_session.Session()
requests_session = session.session
self.assertIsInstance(requests_session.adapters['http://'],
client_session.TCPKeepAliveAdapter)
self.assertIsInstance(requests_session.adapters['https://'],
client_session.TCPKeepAliveAdapter)
def test_does_not_set_tcp_keepalive_on_custom_sessions(self):
mock_session = mock.Mock()
client_session.Session(session=mock_session)
self.assertFalse(mock_session.mount.called)
class RedirectTests(utils.TestCase):
REDIRECT_CHAIN = ['http://myhost:3445/',
'http://anotherhost:6555/',
'http://thirdhost/',
'http://finaldestination:55/']
DEFAULT_REDIRECT_BODY = 'Redirect'
DEFAULT_RESP_BODY = 'Found'
def setup_redirects(self, method='GET', status_code=305,
redirect_kwargs={}, final_kwargs={}):
redirect_kwargs.setdefault('text', self.DEFAULT_REDIRECT_BODY)
for s, d in zip(self.REDIRECT_CHAIN, self.REDIRECT_CHAIN[1:]):
self.requests.register_uri(method, s, status_code=status_code,
headers={'Location': d},
**redirect_kwargs)
final_kwargs.setdefault('status_code', 200)
final_kwargs.setdefault('text', self.DEFAULT_RESP_BODY)
self.requests.register_uri(method, self.REDIRECT_CHAIN[-1],
**final_kwargs)
def assertResponse(self, resp):
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.text, self.DEFAULT_RESP_BODY)
def test_basic_get(self):
session = client_session.Session()
self.setup_redirects()
resp = session.get(self.REDIRECT_CHAIN[-2])
self.assertResponse(resp)
def test_basic_post_keeps_correct_method(self):
session = client_session.Session()
self.setup_redirects(method='POST', status_code=301)
resp = session.post(self.REDIRECT_CHAIN[-2])
self.assertResponse(resp)
def test_redirect_forever(self):
session = client_session.Session(redirect=True)
self.setup_redirects()
resp = session.get(self.REDIRECT_CHAIN[0])
self.assertResponse(resp)
self.assertTrue(len(resp.history), len(self.REDIRECT_CHAIN))
def test_no_redirect(self):
session = client_session.Session(redirect=False)
self.setup_redirects()
resp = session.get(self.REDIRECT_CHAIN[0])
self.assertEqual(resp.status_code, 305)
self.assertEqual(resp.url, self.REDIRECT_CHAIN[0])
def test_redirect_limit(self):
self.setup_redirects()
for i in (1, 2):
session = client_session.Session(redirect=i)
resp = session.get(self.REDIRECT_CHAIN[0])
self.assertEqual(resp.status_code, 305)
self.assertEqual(resp.url, self.REDIRECT_CHAIN[i])
self.assertEqual(resp.text, self.DEFAULT_REDIRECT_BODY)
def test_history_matches_requests(self):
self.setup_redirects(status_code=301)
session = client_session.Session(redirect=True)
req_resp = requests.get(self.REDIRECT_CHAIN[0],
allow_redirects=True)
ses_resp = session.get(self.REDIRECT_CHAIN[0])
self.assertEqual(len(req_resp.history), len(ses_resp.history))
for r, s in zip(req_resp.history, ses_resp.history):
self.assertEqual(r.url, s.url)
self.assertEqual(r.status_code, s.status_code)
class ConstructSessionFromArgsTests(utils.TestCase):
KEY = 'keyfile'
CERT = 'certfile'
CACERT = 'cacert-path'
def _s(self, k=None, **kwargs):
k = k or kwargs
return client_session.Session.construct(k)
def test_verify(self):
self.assertFalse(self._s(insecure=True).verify)
self.assertTrue(self._s(verify=True, insecure=True).verify)
self.assertFalse(self._s(verify=False, insecure=True).verify)
self.assertEqual(self._s(cacert=self.CACERT).verify, self.CACERT)
def test_cert(self):
tup = (self.CERT, self.KEY)
self.assertEqual(self._s(cert=tup).cert, tup)
self.assertEqual(self._s(cert=self.CERT, key=self.KEY).cert, tup)
self.assertIsNone(self._s(key=self.KEY).cert)
def test_pass_through(self):
value = 42 # only a number because timeout needs to be
for key in ['timeout', 'session', 'original_ip', 'user_agent']:
args = {key: value}
self.assertEqual(getattr(self._s(args), key), value)
self.assertNotIn(key, args)
class AuthPlugin(base.BaseAuthPlugin):
"""Very simple debug authentication plugin.
Takes Parameters such that it can throw exceptions at the right times.
"""
TEST_TOKEN = 'aToken'
TEST_USER_ID = 'aUser'
TEST_PROJECT_ID = 'aProject'
SERVICE_URLS = {
'identity': {'public': 'http://identity-public:1111/v2.0',
'admin': 'http://identity-admin:1111/v2.0'},
'compute': {'public': 'http://compute-public:2222/v1.0',
'admin': 'http://compute-admin:2222/v1.0'},
'image': {'public': 'http://image-public:3333/v2.0',
'admin': 'http://image-admin:3333/v2.0'}
}
def __init__(self, token=TEST_TOKEN, invalidate=True):
self.token = token
self._invalidate = invalidate
def get_token(self, session):
return self.token
def get_endpoint(self, session, service_type=None, interface=None,
**kwargs):
try:
return self.SERVICE_URLS[service_type][interface]
except (KeyError, AttributeError):
return None
def invalidate(self):
return self._invalidate
def get_user_id(self, session):
return self.TEST_USER_ID
def get_project_id(self, session):
return self.TEST_PROJECT_ID
class CalledAuthPlugin(base.BaseAuthPlugin):
ENDPOINT = 'http://fakeendpoint/'
def __init__(self, invalidate=True):
self.get_token_called = False
self.get_endpoint_called = False
self.endpoint_arguments = {}
self.invalidate_called = False
self._invalidate = invalidate
def get_token(self, session):
self.get_token_called = True
return 'aToken'
def get_endpoint(self, session, **kwargs):
self.get_endpoint_called = True
self.endpoint_arguments = kwargs
return self.ENDPOINT
def invalidate(self):
self.invalidate_called = True
return self._invalidate
class SessionAuthTests(utils.TestCase):
TEST_URL = 'http://127.0.0.1:5000/'
TEST_JSON = {'hello': 'world'}
def stub_service_url(self, service_type, interface, path,
method='GET', **kwargs):
base_url = AuthPlugin.SERVICE_URLS[service_type][interface]
uri = "%s/%s" % (base_url.rstrip('/'), path.lstrip('/'))
self.requests.register_uri(method, uri, **kwargs)
def test_auth_plugin_default_with_plugin(self):
self.stub_url('GET', base_url=self.TEST_URL, json=self.TEST_JSON)
# if there is an auth_plugin then it should default to authenticated
auth = AuthPlugin()
sess = client_session.Session(auth=auth)
resp = sess.get(self.TEST_URL)
self.assertDictEqual(resp.json(), self.TEST_JSON)
self.assertRequestHeaderEqual('X-Auth-Token', AuthPlugin.TEST_TOKEN)
def test_auth_plugin_disable(self):
self.stub_url('GET', base_url=self.TEST_URL, json=self.TEST_JSON)
auth = AuthPlugin()
sess = client_session.Session(auth=auth)
resp = sess.get(self.TEST_URL, authenticated=False)
self.assertDictEqual(resp.json(), self.TEST_JSON)
self.assertRequestHeaderEqual('X-Auth-Token', None)
def test_service_type_urls(self):
service_type = 'compute'
interface = 'public'
path = '/instances'
status = 200
body = 'SUCCESS'
self.stub_service_url(service_type=service_type,
interface=interface,
path=path,
status_code=status,
text=body)
sess = client_session.Session(auth=AuthPlugin())
resp = sess.get(path,
endpoint_filter={'service_type': service_type,
'interface': interface})
self.assertEqual(self.requests.last_request.url,
AuthPlugin.SERVICE_URLS['compute']['public'] + path)
self.assertEqual(resp.text, body)
self.assertEqual(resp.status_code, status)
def test_service_url_raises_if_no_auth_plugin(self):
sess = client_session.Session()
self.assertRaises(exceptions.MissingAuthPlugin,
sess.get, '/path',
endpoint_filter={'service_type': 'compute',
'interface': 'public'})
def test_service_url_raises_if_no_url_returned(self):
sess = client_session.Session(auth=AuthPlugin())
self.assertRaises(exceptions.EndpointNotFound,
sess.get, '/path',
endpoint_filter={'service_type': 'unknown',
'interface': 'public'})
def test_raises_exc_only_when_asked(self):
# A request that returns a HTTP error should by default raise an
# exception by default, if you specify raise_exc=False then it will not
self.requests.get(self.TEST_URL, status_code=401)
sess = client_session.Session()
self.assertRaises(exceptions.Unauthorized, sess.get, self.TEST_URL)
resp = sess.get(self.TEST_URL, raise_exc=False)
self.assertEqual(401, resp.status_code)
def test_passed_auth_plugin(self):
passed = CalledAuthPlugin()
sess = client_session.Session()
self.requests.get(CalledAuthPlugin.ENDPOINT + 'path',
status_code=200)
endpoint_filter = {'service_type': 'identity'}
# no plugin with authenticated won't work
self.assertRaises(exceptions.MissingAuthPlugin, sess.get, 'path',
authenticated=True)
# no plugin with an endpoint filter won't work
self.assertRaises(exceptions.MissingAuthPlugin, sess.get, 'path',
authenticated=False, endpoint_filter=endpoint_filter)
resp = sess.get('path', auth=passed, endpoint_filter=endpoint_filter)
self.assertEqual(200, resp.status_code)
self.assertTrue(passed.get_endpoint_called)
self.assertTrue(passed.get_token_called)
def test_passed_auth_plugin_overrides(self):
fixed = CalledAuthPlugin()
passed = CalledAuthPlugin()
sess = client_session.Session(fixed)
self.requests.get(CalledAuthPlugin.ENDPOINT + 'path',
status_code=200)
resp = sess.get('path', auth=passed,
endpoint_filter={'service_type': 'identity'})
self.assertEqual(200, resp.status_code)
self.assertTrue(passed.get_endpoint_called)
self.assertTrue(passed.get_token_called)
self.assertFalse(fixed.get_endpoint_called)
self.assertFalse(fixed.get_token_called)
def test_requests_auth_plugin(self):
sess = client_session.Session()
requests_auth = object()
FAKE_RESP = utils.TestResponse({'status_code': 200, 'text': 'resp'})
RESP = mock.Mock(return_value=FAKE_RESP)
with mock.patch.object(sess.session, 'request', RESP) as mocked:
sess.get(self.TEST_URL, requests_auth=requests_auth)
mocked.assert_called_once_with('GET', self.TEST_URL,
headers=mock.ANY,
allow_redirects=mock.ANY,
auth=requests_auth,
verify=mock.ANY)
def test_reauth_called(self):
auth = CalledAuthPlugin(invalidate=True)
sess = client_session.Session(auth=auth)
self.requests.get(self.TEST_URL,
[{'text': 'Failed', 'status_code': 401},
{'text': 'Hello', 'status_code': 200}])
# allow_reauth=True is the default
resp = sess.get(self.TEST_URL, authenticated=True)
self.assertEqual(200, resp.status_code)
self.assertEqual('Hello', resp.text)
self.assertTrue(auth.invalidate_called)
def test_reauth_not_called(self):
auth = CalledAuthPlugin(invalidate=True)
sess = client_session.Session(auth=auth)
self.requests.get(self.TEST_URL,
[{'text': 'Failed', 'status_code': 401},
{'text': 'Hello', 'status_code': 200}])
self.assertRaises(exceptions.Unauthorized, sess.get, self.TEST_URL,
authenticated=True, allow_reauth=False)
self.assertFalse(auth.invalidate_called)
def test_endpoint_override_overrides_filter(self):
auth = CalledAuthPlugin()
sess = client_session.Session(auth=auth)
override_base = 'http://mytest/'
path = 'path'
override_url = override_base + path
resp_text = uuid.uuid4().hex
self.requests.get(override_url, text=resp_text)
resp = sess.get(path,
endpoint_override=override_base,
endpoint_filter={'service_type': 'identity'})
self.assertEqual(resp_text, resp.text)
self.assertEqual(override_url, self.requests.last_request.url)
self.assertTrue(auth.get_token_called)
self.assertFalse(auth.get_endpoint_called)
def test_endpoint_override_ignore_full_url(self):
auth = CalledAuthPlugin()
sess = client_session.Session(auth=auth)
path = 'path'
url = self.TEST_URL + path
resp_text = uuid.uuid4().hex
self.requests.get(url, text=resp_text)
resp = sess.get(url,
endpoint_override='http://someother.url',
endpoint_filter={'service_type': 'identity'})
self.assertEqual(resp_text, resp.text)
self.assertEqual(url, self.requests.last_request.url)
self.assertTrue(auth.get_token_called)
self.assertFalse(auth.get_endpoint_called)
def test_user_and_project_id(self):
auth = AuthPlugin()
sess = client_session.Session(auth=auth)
self.assertEqual(auth.TEST_USER_ID, sess.get_user_id())
self.assertEqual(auth.TEST_PROJECT_ID, sess.get_project_id())
class AdapterTest(utils.TestCase):
SERVICE_TYPE = uuid.uuid4().hex
SERVICE_NAME = uuid.uuid4().hex
INTERFACE = uuid.uuid4().hex
REGION_NAME = uuid.uuid4().hex
USER_AGENT = uuid.uuid4().hex
VERSION = uuid.uuid4().hex
TEST_URL = CalledAuthPlugin.ENDPOINT
def _create_loaded_adapter(self):
auth = CalledAuthPlugin()
sess = client_session.Session()
return adapter.Adapter(sess,
auth=auth,
service_type=self.SERVICE_TYPE,
service_name=self.SERVICE_NAME,
interface=self.INTERFACE,
region_name=self.REGION_NAME,
user_agent=self.USER_AGENT,
version=self.VERSION)
def _verify_endpoint_called(self, adpt):
self.assertEqual(self.SERVICE_TYPE,
adpt.auth.endpoint_arguments['service_type'])
self.assertEqual(self.SERVICE_NAME,
adpt.auth.endpoint_arguments['service_name'])
self.assertEqual(self.INTERFACE,
adpt.auth.endpoint_arguments['interface'])
self.assertEqual(self.REGION_NAME,
adpt.auth.endpoint_arguments['region_name'])
self.assertEqual(self.VERSION,
adpt.auth.endpoint_arguments['version'])
def test_setting_variables_on_request(self):
response = uuid.uuid4().hex
self.stub_url('GET', text=response)
adpt = self._create_loaded_adapter()
resp = adpt.get('/')
self.assertEqual(resp.text, response)
self._verify_endpoint_called(adpt)
self.assertTrue(adpt.auth.get_token_called)
self.assertRequestHeaderEqual('User-Agent', self.USER_AGENT)
def test_setting_variables_on_get_endpoint(self):
adpt = self._create_loaded_adapter()
url = adpt.get_endpoint()
self.assertEqual(self.TEST_URL, url)
self._verify_endpoint_called(adpt)
def test_legacy_binding(self):
key = uuid.uuid4().hex
val = uuid.uuid4().hex
response = jsonutils.dumps({key: val})
self.stub_url('GET', text=response)
auth = CalledAuthPlugin()
sess = client_session.Session(auth=auth)
adpt = adapter.LegacyJsonAdapter(sess,
service_type=self.SERVICE_TYPE,
user_agent=self.USER_AGENT)
resp, body = adpt.get('/')
self.assertEqual(self.SERVICE_TYPE,
auth.endpoint_arguments['service_type'])
self.assertEqual(resp.text, response)
self.assertEqual(val, body[key])
def test_legacy_binding_non_json_resp(self):
response = uuid.uuid4().hex
self.stub_url('GET', text=response,
headers={'Content-Type': 'text/html'})
auth = CalledAuthPlugin()
sess = client_session.Session(auth=auth)
adpt = adapter.LegacyJsonAdapter(sess,
service_type=self.SERVICE_TYPE,
user_agent=self.USER_AGENT)
resp, body = adpt.get('/')
self.assertEqual(self.SERVICE_TYPE,
auth.endpoint_arguments['service_type'])
self.assertEqual(resp.text, response)
self.assertIsNone(body)
def test_methods(self):
sess = client_session.Session()
adpt = adapter.Adapter(sess)
url = 'http://url'
for method in ['get', 'head', 'post', 'put', 'patch', 'delete']:
with mock.patch.object(adpt, 'request') as m:
getattr(adpt, method)(url)
m.assert_called_once_with(url, method.upper())
def test_setting_endpoint_override(self):
endpoint_override = 'http://overrideurl'
path = '/path'
endpoint_url = endpoint_override + path
auth = CalledAuthPlugin()
sess = client_session.Session(auth=auth)
adpt = adapter.Adapter(sess, endpoint_override=endpoint_override)
response = uuid.uuid4().hex
self.requests.get(endpoint_url, text=response)
resp = adpt.get(path)
self.assertEqual(response, resp.text)
self.assertEqual(endpoint_url, self.requests.last_request.url)
self.assertEqual(endpoint_override, adpt.get_endpoint())
def test_adapter_invalidate(self):
auth = CalledAuthPlugin()
sess = client_session.Session()
adpt = adapter.Adapter(sess, auth=auth)
adpt.invalidate()
self.assertTrue(auth.invalidate_called)
def test_adapter_get_token(self):
auth = CalledAuthPlugin()
sess = client_session.Session()
adpt = adapter.Adapter(sess, auth=auth)
self.assertEqual(self.TEST_TOKEN, adpt.get_token())
self.assertTrue(auth.get_token_called)
def test_adapter_connect_retries(self):
retries = 2
sess = client_session.Session()
adpt = adapter.Adapter(sess, connect_retries=retries)
def _refused_error(request, context):
raise requests.exceptions.ConnectionError()
self.stub_url('GET', text=_refused_error)
with mock.patch('time.sleep') as m:
self.assertRaises(exceptions.ConnectionRefused,
adpt.get, self.TEST_URL)
self.assertEqual(retries, m.call_count)
# we count retries so there will be one initial request + 2 retries
self.assertThat(self.requests.request_history,
matchers.HasLength(retries + 1))
def test_user_and_project_id(self):
auth = AuthPlugin()
sess = client_session.Session()
adpt = adapter.Adapter(sess, auth=auth)
self.assertEqual(auth.TEST_USER_ID, adpt.get_user_id())
self.assertEqual(auth.TEST_PROJECT_ID, adpt.get_project_id())
class ConfLoadingTests(utils.TestCase):
GROUP = 'sessiongroup'
def setUp(self):
super(ConfLoadingTests, self).setUp()
self.conf_fixture = self.useFixture(config.Config())
client_session.Session.register_conf_options(self.conf_fixture.conf,
self.GROUP)
def config(self, **kwargs):
kwargs['group'] = self.GROUP
self.conf_fixture.config(**kwargs)
def get_session(self, **kwargs):
return client_session.Session.load_from_conf_options(
self.conf_fixture.conf,
self.GROUP,
**kwargs)
def test_insecure_timeout(self):
self.config(insecure=True, timeout=5)
s = self.get_session()
self.assertFalse(s.verify)
self.assertEqual(5, s.timeout)
def test_client_certs(self):
cert = '/path/to/certfile'
key = '/path/to/keyfile'
self.config(certfile=cert, keyfile=key)
s = self.get_session()
self.assertTrue(s.verify)
self.assertEqual((cert, key), s.cert)
def test_cacert(self):
cafile = '/path/to/cacert'
self.config(cafile=cafile)
s = self.get_session()
self.assertEqual(cafile, s.verify)
def test_deprecated(self):
def new_deprecated():
return cfg.DeprecatedOpt(uuid.uuid4().hex, group=uuid.uuid4().hex)
opt_names = ['cafile', 'certfile', 'keyfile', 'insecure', 'timeout']
depr = dict([(n, [new_deprecated()]) for n in opt_names])
opts = client_session.Session.get_conf_options(deprecated_opts=depr)
self.assertThat(opt_names, matchers.HasLength(len(opts)))
for opt in opts:
self.assertIn(depr[opt.name][0], opt.deprecated_opts)
class CliLoadingTests(utils.TestCase):
def setUp(self):
super(CliLoadingTests, self).setUp()
self.parser = argparse.ArgumentParser()
client_session.Session.register_cli_options(self.parser)
def get_session(self, val, **kwargs):
args = self.parser.parse_args(val.split())
return client_session.Session.load_from_cli_options(args, **kwargs)
def test_insecure_timeout(self):
s = self.get_session('--insecure --timeout 5.5')
self.assertFalse(s.verify)
self.assertEqual(5.5, s.timeout)
def test_client_certs(self):
cert = '/path/to/certfile'
key = '/path/to/keyfile'
s = self.get_session('--os-cert %s --os-key %s' % (cert, key))
self.assertTrue(s.verify)
self.assertEqual((cert, key), s.cert)
def test_cacert(self):
cacert = '/path/to/cacert'
s = self.get_session('--os-cacert %s' % cacert)
self.assertEqual(cacert, s.verify)