diff --git a/openstack_auth/tests/run_tests.py b/openstack_auth/tests/run_tests.py deleted file mode 100644 index c254491011..0000000000 --- a/openstack_auth/tests/run_tests.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys - -import django -from django.test.runner import DiscoverRunner as test_runner - - -os.environ['DJANGO_SETTINGS_MODULE'] = 'openstack_auth.tests.settings' - -if hasattr(django, 'setup'): - django.setup() - - -def run(*test_args): - if not test_args: - test_args = ['tests'] - parent = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "..", - "..", - ) - sys.path.insert(0, parent) - failures = test_runner().run_tests(test_args) - sys.exit(failures) - - -if __name__ == '__main__': - run(*sys.argv[1:]) diff --git a/openstack_auth/tests/models.py b/openstack_auth/tests/unit/__init__.py similarity index 100% rename from openstack_auth/tests/models.py rename to openstack_auth/tests/unit/__init__.py diff --git a/openstack_auth/tests/tests.py b/openstack_auth/tests/unit/test_auth.py similarity index 82% rename from openstack_auth/tests/tests.py rename to openstack_auth/tests/unit/test_auth.py index 78edc8fab9..a087e54748 100644 --- a/openstack_auth/tests/tests.py +++ b/openstack_auth/tests/unit/test_auth.py @@ -17,9 +17,7 @@ import django from django.conf import settings from django.contrib import auth from django.core.urlresolvers import reverse -from django import http from django import test -from django.test.utils import override_settings from keystoneauth1 import exceptions as keystone_exceptions from keystoneauth1.identity import v2 as v2_auth from keystoneauth1.identity import v3 as v3_auth @@ -27,14 +25,11 @@ from keystoneauth1 import session from keystoneauth1 import token_endpoint from keystoneclient.v2_0 import client as client_v2 from keystoneclient.v3 import client as client_v3 -import mock from mox3 import mox from testscenarios import load_tests_apply_scenarios -from openstack_auth import policy from openstack_auth.tests import data_v2 from openstack_auth.tests import data_v3 -from openstack_auth import user from openstack_auth import utils @@ -1256,267 +1251,3 @@ class OpenStackAuthTestsWebSSO(OpenStackAuthTestsMixin, self.assertRedirects(response, settings.LOGIN_REDIRECT_URL) load_tests = load_tests_apply_scenarios - - -class PolicyLoaderTestCase(test.TestCase): - def test_policy_file_load(self): - policy.reset() - enforcer = policy._get_enforcer() - self.assertEqual(2, len(enforcer)) - self.assertIn('identity', enforcer) - self.assertIn('compute', enforcer) - - def test_policy_reset(self): - policy._get_enforcer() - self.assertEqual(2, len(policy._ENFORCER)) - policy.reset() - self.assertIsNone(policy._ENFORCER) - - -class PermTestCase(test.TestCase): - def test_has_perms(self): - testuser = user.User(id=1, roles=[]) - - def has_perm(perm, obj=None): - return perm in ('perm1', 'perm3') - - with mock.patch.object(testuser, 'has_perm', side_effect=has_perm): - self.assertFalse(testuser.has_perms(['perm2'])) - - # perm1 AND perm3 - self.assertFalse(testuser.has_perms(['perm1', 'perm2'])) - - # perm1 AND perm3 - self.assertTrue(testuser.has_perms(['perm1', 'perm3'])) - - # perm1 AND (perm2 OR perm3) - perm_list = ['perm1', ('perm2', 'perm3')] - self.assertTrue(testuser.has_perms(perm_list)) - - -class PolicyTestCase(test.TestCase): - _roles = [] - - def setUp(self): - mock_user = user.User(id=1, roles=self._roles) - patcher = mock.patch('openstack_auth.utils.get_user', - return_value=mock_user) - self.MockClass = patcher.start() - self.addCleanup(patcher.stop) - self.request = http.HttpRequest() - - -class PolicyTestCaseNonAdmin(PolicyTestCase): - _roles = [{'id': '1', 'name': 'member'}] - - def test_check_admin_required_false(self): - policy.reset() - value = policy.check((("identity", "admin_required"),), - request=self.request) - self.assertFalse(value) - - def test_check_identity_rule_not_found_false(self): - policy.reset() - value = policy.check((("identity", "i_dont_exist"),), - request=self.request) - # this should fail because the default check for - # identity is admin_required - self.assertFalse(value) - - def test_check_nova_context_is_admin_false(self): - policy.reset() - value = policy.check((("compute", "context_is_admin"),), - request=self.request) - self.assertFalse(value) - - def test_compound_check_false(self): - policy.reset() - value = policy.check((("identity", "admin_required"), - ("identity", "identity:default"),), - request=self.request) - self.assertFalse(value) - - def test_scope_not_found(self): - policy.reset() - value = policy.check((("dummy", "default"),), - request=self.request) - self.assertTrue(value) - - -class PolicyTestCaseAdmin(PolicyTestCase): - _roles = [{'id': '1', 'name': 'admin'}] - - def test_check_admin_required_true(self): - policy.reset() - value = policy.check((("identity", "admin_required"),), - request=self.request) - self.assertTrue(value) - - def test_check_identity_rule_not_found_true(self): - policy.reset() - value = policy.check((("identity", "i_dont_exist"),), - request=self.request) - # this should succeed because the default check for - # identity is admin_required - self.assertTrue(value) - - def test_compound_check_true(self): - policy.reset() - value = policy.check((("identity", "admin_required"), - ("identity", "identity:default"),), - request=self.request) - self.assertTrue(value) - - def test_check_nova_context_is_admin_true(self): - policy.reset() - value = policy.check((("compute", "context_is_admin"),), - request=self.request) - self.assertTrue(value) - - -class PolicyTestCaseV3Admin(PolicyTestCase): - _roles = [{'id': '1', 'name': 'admin'}] - - def setUp(self): - policy_files = { - 'identity': 'policy.v3cloudsample.json', - 'compute': 'nova_policy.json'} - - override = self.settings(POLICY_FILES=policy_files) - override.enable() - self.addCleanup(override.disable) - - mock_user = user.User(id=1, roles=self._roles, - user_domain_id='admin_domain_id') - patcher = mock.patch('openstack_auth.utils.get_user', - return_value=mock_user) - self.MockClass = patcher.start() - self.addCleanup(patcher.stop) - self.request = http.HttpRequest() - - def test_check_cloud_admin_required_true(self): - policy.reset() - value = policy.check((("identity", "cloud_admin"),), - request=self.request) - self.assertTrue(value) - - def test_check_domain_admin_required_true(self): - policy.reset() - value = policy.check(( - ("identity", "admin_and_matching_domain_id"),), - request=self.request) - self.assertTrue(value) - - def test_check_any_admin_required_true(self): - policy.reset() - value = policy.check((("identity", "admin_or_cloud_admin"),), - request=self.request) - self.assertTrue(value) - - -class RoleTestCaseAdmin(test.TestCase): - - def test_get_admin_roles_with_default_value(self): - admin_roles = utils.get_admin_roles() - self.assertSetEqual({'admin'}, admin_roles) - - @override_settings(OPENSTACK_KEYSTONE_ADMIN_ROLES=['foO', 'BAR', 'admin']) - def test_get_admin_roles(self): - admin_roles = utils.get_admin_roles() - self.assertSetEqual({'foo', 'bar', 'admin'}, admin_roles) - - @override_settings(OPENSTACK_KEYSTONE_ADMIN_ROLES=['foO', 'BAR', 'admin']) - def test_get_admin_permissions(self): - admin_permissions = utils.get_admin_permissions() - self.assertSetEqual({'openstack.roles.foo', - 'openstack.roles.bar', - 'openstack.roles.admin'}, admin_permissions) - - -class UtilsTestCase(test.TestCase): - - def test_fix_auth_url_version_v20(self): - settings.OPENSTACK_API_VERSIONS['identity'] = 2.0 - test_urls = [ - ("http://a/", ("http://a/v2.0", False)), - ("http://a", ("http://a/v2.0", False)), - ("http://a:8080/", ("http://a:8080/v2.0", False)), - ("http://a/v2.0", ("http://a/v2.0", False)), - ("http://a/v2.0/", ("http://a/v2.0/", False)), - ("http://a/identity", ("http://a/identity/v2.0", False)), - ("http://a/identity/", ("http://a/identity/v2.0", False)), - ("http://a:5000/identity/v2.0", - ("http://a:5000/identity/v2.0", False)), - ("http://a/identity/v2.0/", ("http://a/identity/v2.0/", False)) - ] - for src, expected in test_urls: - self.assertEqual(expected, utils.fix_auth_url_version_prefix(src)) - - def test_fix_auth_url_version_v3(self): - settings.OPENSTACK_API_VERSIONS['identity'] = 3 - test_urls = [ - ("http://a/", ("http://a/v3", False)), - ("http://a", ("http://a/v3", False)), - ("http://a:8080/", ("http://a:8080/v3", False)), - ("http://a/v3", ("http://a/v3", False)), - ("http://a/v3/", ("http://a/v3/", False)), - ("http://a/v2.0/", ("http://a/v3/", True)), - ("http://a/v2.0", ("http://a/v3", True)), - ("http://a/identity", ("http://a/identity/v3", False)), - ("http://a:5000/identity/", ("http://a:5000/identity/v3", False)), - ("http://a/identity/v3", ("http://a/identity/v3", False)), - ("http://a/identity/v3/", ("http://a/identity/v3/", False)) - ] - for src, expected in test_urls: - self.assertEqual(expected, utils.fix_auth_url_version_prefix(src)) - - -class UserTestCase(test.TestCase): - - def setUp(self): - self.data = data_v3.generate_test_data(pki=True) - - def test_unscoped_token_is_none(self): - created_token = user.Token(self.data.domain_scoped_access_info, - unscoped_token=None) - self.assertTrue(created_token._is_pki_token( - self.data.domain_scoped_access_info.auth_token)) - self.assertFalse(created_token._is_pki_token(None)) - - -class BehindProxyTestCase(test.TestCase): - - def setUp(self): - self.request = http.HttpRequest() - - def test_without_proxy(self): - self.request.META['REMOTE_ADDR'] = '10.111.111.2' - from openstack_auth.utils import get_client_ip - self.assertEqual('10.111.111.2', get_client_ip(self.request)) - - def test_with_proxy_no_settings(self): - from openstack_auth.utils import get_client_ip - self.request.META['REMOTE_ADDR'] = '10.111.111.2' - self.request.META['HTTP_X_REAL_IP'] = '192.168.15.33' - self.request.META['HTTP_X_FORWARDED_FOR'] = '172.18.0.2' - self.assertEqual('10.111.111.2', get_client_ip(self.request)) - - def test_with_settings_without_proxy(self): - from openstack_auth.utils import get_client_ip - self.request.META['REMOTE_ADDR'] = '10.111.111.2' - self.assertEqual('10.111.111.2', get_client_ip(self.request)) - - @override_settings(SECURE_PROXY_ADDR_HEADER='HTTP_X_FORWARDED_FOR') - def test_with_settings_with_proxy_forwardfor(self): - from openstack_auth.utils import get_client_ip - self.request.META['REMOTE_ADDR'] = '10.111.111.2' - self.request.META['HTTP_X_FORWARDED_FOR'] = '172.18.0.2' - self.assertEqual('172.18.0.2', get_client_ip(self.request)) - - @override_settings(SECURE_PROXY_ADDR_HEADER='HTTP_X_REAL_IP') - def test_with_settings_with_proxy_real_ip(self): - from openstack_auth.utils import get_client_ip - self.request.META['REMOTE_ADDR'] = '10.111.111.2' - self.request.META['HTTP_X_REAL_IP'] = '192.168.15.33' - self.request.META['HTTP_X_FORWARDED_FOR'] = '172.18.0.2' - self.assertEqual('192.168.15.33', get_client_ip(self.request)) diff --git a/openstack_auth/tests/unit/test_models.py b/openstack_auth/tests/unit/test_models.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openstack_auth/tests/unit/test_policy.py b/openstack_auth/tests/unit/test_policy.py new file mode 100644 index 0000000000..96e686586f --- /dev/null +++ b/openstack_auth/tests/unit/test_policy.py @@ -0,0 +1,154 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from django import http +from django import test +import mock + +from openstack_auth import policy +from openstack_auth import user + + +class PolicyLoaderTestCase(test.TestCase): + def test_policy_file_load(self): + policy.reset() + enforcer = policy._get_enforcer() + self.assertEqual(2, len(enforcer)) + self.assertIn('identity', enforcer) + self.assertIn('compute', enforcer) + + def test_policy_reset(self): + policy._get_enforcer() + self.assertEqual(2, len(policy._ENFORCER)) + policy.reset() + self.assertIsNone(policy._ENFORCER) + + +class PolicyTestCase(test.TestCase): + _roles = [] + + def setUp(self): + mock_user = user.User(id=1, roles=self._roles) + patcher = mock.patch('openstack_auth.utils.get_user', + return_value=mock_user) + self.MockClass = patcher.start() + self.addCleanup(patcher.stop) + self.request = http.HttpRequest() + + +class PolicyTestCaseNonAdmin(PolicyTestCase): + _roles = [{'id': '1', 'name': 'member'}] + + def test_check_admin_required_false(self): + policy.reset() + value = policy.check((("identity", "admin_required"),), + request=self.request) + self.assertFalse(value) + + def test_check_identity_rule_not_found_false(self): + policy.reset() + value = policy.check((("identity", "i_dont_exist"),), + request=self.request) + # this should fail because the default check for + # identity is admin_required + self.assertFalse(value) + + def test_check_nova_context_is_admin_false(self): + policy.reset() + value = policy.check((("compute", "context_is_admin"),), + request=self.request) + self.assertFalse(value) + + def test_compound_check_false(self): + policy.reset() + value = policy.check((("identity", "admin_required"), + ("identity", "identity:default"),), + request=self.request) + self.assertFalse(value) + + def test_scope_not_found(self): + policy.reset() + value = policy.check((("dummy", "default"),), + request=self.request) + self.assertTrue(value) + + +class PolicyTestCaseAdmin(PolicyTestCase): + _roles = [{'id': '1', 'name': 'admin'}] + + def test_check_admin_required_true(self): + policy.reset() + value = policy.check((("identity", "admin_required"),), + request=self.request) + self.assertTrue(value) + + def test_check_identity_rule_not_found_true(self): + policy.reset() + value = policy.check((("identity", "i_dont_exist"),), + request=self.request) + # this should succeed because the default check for + # identity is admin_required + self.assertTrue(value) + + def test_compound_check_true(self): + policy.reset() + value = policy.check((("identity", "admin_required"), + ("identity", "identity:default"),), + request=self.request) + self.assertTrue(value) + + def test_check_nova_context_is_admin_true(self): + policy.reset() + value = policy.check((("compute", "context_is_admin"),), + request=self.request) + self.assertTrue(value) + + +class PolicyTestCaseV3Admin(PolicyTestCase): + _roles = [{'id': '1', 'name': 'admin'}] + + def setUp(self): + policy_files = { + 'identity': 'policy.v3cloudsample.json', + 'compute': 'nova_policy.json'} + + override = self.settings(POLICY_FILES=policy_files) + override.enable() + self.addCleanup(override.disable) + + mock_user = user.User(id=1, roles=self._roles, + user_domain_id='admin_domain_id') + patcher = mock.patch('openstack_auth.utils.get_user', + return_value=mock_user) + self.MockClass = patcher.start() + self.addCleanup(patcher.stop) + self.request = http.HttpRequest() + + def test_check_cloud_admin_required_true(self): + policy.reset() + value = policy.check((("identity", "cloud_admin"),), + request=self.request) + self.assertTrue(value) + + def test_check_domain_admin_required_true(self): + policy.reset() + value = policy.check(( + ("identity", "admin_and_matching_domain_id"),), + request=self.request) + self.assertTrue(value) + + def test_check_any_admin_required_true(self): + policy.reset() + value = policy.check((("identity", "admin_or_cloud_admin"),), + request=self.request) + self.assertTrue(value) diff --git a/openstack_auth/tests/unit/test_user.py b/openstack_auth/tests/unit/test_user.py new file mode 100644 index 0000000000..d088fe5893 --- /dev/null +++ b/openstack_auth/tests/unit/test_user.py @@ -0,0 +1,53 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from django import test +import mock + +from openstack_auth.tests import data_v3 +from openstack_auth import user + + +class PermTestCase(test.TestCase): + def test_has_perms(self): + testuser = user.User(id=1, roles=[]) + + def has_perm(perm, obj=None): + return perm in ('perm1', 'perm3') + + with mock.patch.object(testuser, 'has_perm', side_effect=has_perm): + self.assertFalse(testuser.has_perms(['perm2'])) + + # perm1 AND perm3 + self.assertFalse(testuser.has_perms(['perm1', 'perm2'])) + + # perm1 AND perm3 + self.assertTrue(testuser.has_perms(['perm1', 'perm3'])) + + # perm1 AND (perm2 OR perm3) + perm_list = ['perm1', ('perm2', 'perm3')] + self.assertTrue(testuser.has_perms(perm_list)) + + +class UserTestCase(test.TestCase): + + def setUp(self): + super(UserTestCase, self).setUp() + self.data = data_v3.generate_test_data(pki=True) + + def test_unscoped_token_is_none(self): + created_token = user.Token(self.data.domain_scoped_access_info, + unscoped_token=None) + self.assertTrue(created_token._is_pki_token( + self.data.domain_scoped_access_info.auth_token)) + self.assertFalse(created_token._is_pki_token(None)) diff --git a/openstack_auth/tests/unit/test_utils.py b/openstack_auth/tests/unit/test_utils.py new file mode 100644 index 0000000000..ada5902bdc --- /dev/null +++ b/openstack_auth/tests/unit/test_utils.py @@ -0,0 +1,115 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from django.conf import settings +from django import http +from django import test +from django.test.utils import override_settings + +from openstack_auth import utils + + +class RoleTestCaseAdmin(test.TestCase): + + def test_get_admin_roles_with_default_value(self): + admin_roles = utils.get_admin_roles() + self.assertSetEqual({'admin'}, admin_roles) + + @override_settings(OPENSTACK_KEYSTONE_ADMIN_ROLES=['foO', 'BAR', 'admin']) + def test_get_admin_roles(self): + admin_roles = utils.get_admin_roles() + self.assertSetEqual({'foo', 'bar', 'admin'}, admin_roles) + + @override_settings(OPENSTACK_KEYSTONE_ADMIN_ROLES=['foO', 'BAR', 'admin']) + def test_get_admin_permissions(self): + admin_permissions = utils.get_admin_permissions() + self.assertSetEqual({'openstack.roles.foo', + 'openstack.roles.bar', + 'openstack.roles.admin'}, admin_permissions) + + +class UtilsTestCase(test.TestCase): + + def test_fix_auth_url_version_v20(self): + settings.OPENSTACK_API_VERSIONS['identity'] = 2.0 + test_urls = [ + ("http://a/", ("http://a/v2.0", False)), + ("http://a", ("http://a/v2.0", False)), + ("http://a:8080/", ("http://a:8080/v2.0", False)), + ("http://a/v2.0", ("http://a/v2.0", False)), + ("http://a/v2.0/", ("http://a/v2.0/", False)), + ("http://a/identity", ("http://a/identity/v2.0", False)), + ("http://a/identity/", ("http://a/identity/v2.0", False)), + ("http://a:5000/identity/v2.0", + ("http://a:5000/identity/v2.0", False)), + ("http://a/identity/v2.0/", ("http://a/identity/v2.0/", False)) + ] + for src, expected in test_urls: + self.assertEqual(expected, utils.fix_auth_url_version_prefix(src)) + + def test_fix_auth_url_version_v3(self): + settings.OPENSTACK_API_VERSIONS['identity'] = 3 + test_urls = [ + ("http://a/", ("http://a/v3", False)), + ("http://a", ("http://a/v3", False)), + ("http://a:8080/", ("http://a:8080/v3", False)), + ("http://a/v3", ("http://a/v3", False)), + ("http://a/v3/", ("http://a/v3/", False)), + ("http://a/v2.0/", ("http://a/v3/", True)), + ("http://a/v2.0", ("http://a/v3", True)), + ("http://a/identity", ("http://a/identity/v3", False)), + ("http://a:5000/identity/", ("http://a:5000/identity/v3", False)), + ("http://a/identity/v3", ("http://a/identity/v3", False)), + ("http://a/identity/v3/", ("http://a/identity/v3/", False)) + ] + for src, expected in test_urls: + self.assertEqual(expected, utils.fix_auth_url_version_prefix(src)) + + +class BehindProxyTestCase(test.TestCase): + + def setUp(self): + super(BehindProxyTestCase, self).setUp() + self.request = http.HttpRequest() + + def test_without_proxy(self): + self.request.META['REMOTE_ADDR'] = '10.111.111.2' + from openstack_auth.utils import get_client_ip + self.assertEqual('10.111.111.2', get_client_ip(self.request)) + + def test_with_proxy_no_settings(self): + from openstack_auth.utils import get_client_ip + self.request.META['REMOTE_ADDR'] = '10.111.111.2' + self.request.META['HTTP_X_REAL_IP'] = '192.168.15.33' + self.request.META['HTTP_X_FORWARDED_FOR'] = '172.18.0.2' + self.assertEqual('10.111.111.2', get_client_ip(self.request)) + + def test_with_settings_without_proxy(self): + from openstack_auth.utils import get_client_ip + self.request.META['REMOTE_ADDR'] = '10.111.111.2' + self.assertEqual('10.111.111.2', get_client_ip(self.request)) + + @override_settings(SECURE_PROXY_ADDR_HEADER='HTTP_X_FORWARDED_FOR') + def test_with_settings_with_proxy_forwardfor(self): + from openstack_auth.utils import get_client_ip + self.request.META['REMOTE_ADDR'] = '10.111.111.2' + self.request.META['HTTP_X_FORWARDED_FOR'] = '172.18.0.2' + self.assertEqual('172.18.0.2', get_client_ip(self.request)) + + @override_settings(SECURE_PROXY_ADDR_HEADER='HTTP_X_REAL_IP') + def test_with_settings_with_proxy_real_ip(self): + from openstack_auth.utils import get_client_ip + self.request.META['REMOTE_ADDR'] = '10.111.111.2' + self.request.META['HTTP_X_REAL_IP'] = '192.168.15.33' + self.request.META['HTTP_X_FORWARDED_FOR'] = '172.18.0.2' + self.assertEqual('192.168.15.33', get_client_ip(self.request))