From 39561635690deb103ba7cd5b04892263d0444e31 Mon Sep 17 00:00:00 2001 From: Brant Knudson Date: Fri, 11 Mar 2016 16:24:58 -0600 Subject: [PATCH] Define identity interface - easy cases The identity interface isn't adequately specified. Nobody could implement an identity driver using the interface definition as it is since they wouldn't know what the inputs are or the outputs. We can't write a Manager using only the interface definition since the definition is incomplete (For example it didn't even mention that create_user returns the user). Several tests are added specifically for the driver interface. The driver was only tested through the manager, so we didn't have tests that showed that the driver actually followed its interface, just however the manager called it. You might notice that the tests are missing some obvious cases that should be tested. In some cases this is because the existing drivers that we have (sql and ldap) don't work the same way. For example, create_group on the sql driver always returns description even when it's None while the ldap driver doesn't. I wanted to keep this change simple and not modify the drivers. The interface definition can be enhanced along with the drivers and these tests can be added later. There might be more requirements that the manager puts on the drivers than what's documented/tested here. These can be added later. Change-Id: I9a9c660f2d98b6ac6cb032b40b32089cb5cf1844 --- keystone/identity/backends/base.py | 179 +++++- .../tests/unit/identity/backends/__init__.py | 0 .../tests/unit/identity/backends/test_base.py | 535 ++++++++++++++++++ .../tests/unit/identity/backends/test_ldap.py | 43 ++ .../tests/unit/identity/backends/test_sql.py | 55 ++ tests-py3-blacklist.txt | 1 + 6 files changed, 799 insertions(+), 14 deletions(-) create mode 100644 keystone/tests/unit/identity/backends/__init__.py create mode 100644 keystone/tests/unit/identity/backends/test_base.py create mode 100644 keystone/tests/unit/identity/backends/test_ldap.py create mode 100644 keystone/tests/unit/identity/backends/test_sql.py diff --git a/keystone/identity/backends/base.py b/keystone/identity/backends/base.py index f1c27a3556..76d5037733 100644 --- a/keystone/identity/backends/base.py +++ b/keystone/identity/backends/base.py @@ -48,7 +48,85 @@ def filter_user(user_ref): @six.add_metaclass(abc.ABCMeta) class IdentityDriverV8(object): - """Interface description for an Identity driver.""" + """Interface description for an Identity driver. + + The schema for users and groups is different depending on whether the + driver is domain aware or not (as returned by self.is_domain_aware()). + + If the driver is not domain aware: + + * domain_id will be not be included in the user / group passed in to + create_user / create_group + * the domain_id should not be returned in user / group refs. They'll be + overwritten. + + User schema (if driver is domain aware):: + + type: object + properties: + id: + type: string + name: + type: string + domain_id: + type: string + password: + type: string + enabled: + type: boolean + default_project_id: + type: string + required: [id, name, domain_id, enabled] + additionalProperties: True + + User schema (if driver is not domain aware):: + + type: object + properties: + id: + type: string + name: + type: string + password: + type: string + enabled: + type: boolean + default_project_id: + type: string + required: [id, name, enabled] + additionalProperties: True + # Note that domain_id is not allowed as a property + + Group schema (if driver is domain aware):: + + type: object + properties: + id: + type: string + name: + type: string + domain_id: + type: string + description: + type: string + required: [id, name, domain_id] + additionalProperties: True + + Group schema (if driver is not domain aware):: + + type: object + properties: + id: + type: string + name: + type: string + description: + type: string + required: [id, name] + additionalProperties: True + # Note that domain_id is not allowed as a property + + """ def _get_conf(self): try: @@ -64,7 +142,7 @@ class IdentityDriverV8(object): CONF.identity.list_limit or CONF.list_limit) def is_domain_aware(self): - """Indicate if Driver supports domains.""" + """Indicate if the driver supports domains.""" return True def default_assignment_driver(self): @@ -90,7 +168,12 @@ class IdentityDriverV8(object): def authenticate(self, user_id, password): """Authenticate a given user and password. - :returns: user_ref + :param str user_id: User ID + :param str password: Password + + :returns: user. See user schema in :class:`~.IdentityDriverV8`. + :rtype: dict + :raises AssertionError: If user or password is invalid. """ raise exception.NotImplemented() # pragma: no cover @@ -101,6 +184,14 @@ class IdentityDriverV8(object): def create_user(self, user_id, user): """Create a new user. + :param str user_id: user ID. The driver can ignore this value. + :param dict user: user info. See user schema in + :class:`~.IdentityDriverV8`. + + :returns: user, matching the user schema. The driver should not return + the password. + :rtype: dict + :raises keystone.exception.Conflict: If a duplicate user exists. """ @@ -112,8 +203,11 @@ class IdentityDriverV8(object): :param hints: filter hints which the driver should implement if at all possible. + :type hints: keystone.common.driver_hints.Hints - :returns: a list of user_refs or an empty list. + :returns: a list of users or an empty list. See user schema in + :class:`~.IdentityDriverV8`. + :rtype: list of dict """ raise exception.NotImplemented() # pragma: no cover @@ -122,11 +216,16 @@ class IdentityDriverV8(object): def list_users_in_group(self, group_id, hints): """List users in a group. - :param group_id: the group in question + :param str group_id: the group in question :param hints: filter hints which the driver should implement if at all possible. + :type hints: keystone.common.driver_hints.Hints - :returns: a list of user_refs or an empty list. + :returns: a list of users or an empty list. See user schema in + :class:`~.IdentityDriverV8`. + :rtype: list of dict + + :raises keystone.exception.GroupNotFound: If the group doesn't exist. """ raise exception.NotImplemented() # pragma: no cover @@ -135,7 +234,11 @@ class IdentityDriverV8(object): def get_user(self, user_id): """Get a user by ID. - :returns: user_ref + :param str user_id: User ID. + + :returns: user. See user schema in :class:`~.IdentityDriverV8`. + :rtype: dict + :raises keystone.exception.UserNotFound: If the user doesn't exist. """ @@ -145,8 +248,16 @@ class IdentityDriverV8(object): def update_user(self, user_id, user): """Update an existing user. + :param str user_id: User ID. + :param dict user: User modification. See user schema in + :class:`~.IdentityDriverV8`. Properties set to None will be + removed. Required properties cannot be removed. + + :returns: user. See user schema in :class:`~.IdentityDriverV8`. + :raises keystone.exception.UserNotFound: If the user doesn't exist. - :raises keystone.exception.Conflict: If a duplicate user exists. + :raises keystone.exception.Conflict: If a duplicate user exists in the + same domain. """ raise exception.NotImplemented() # pragma: no cover @@ -155,6 +266,9 @@ class IdentityDriverV8(object): def add_user_to_group(self, user_id, group_id): """Add a user to a group. + :param str user_id: User ID. + :param str group_id: Group ID. + :raises keystone.exception.UserNotFound: If the user doesn't exist. :raises keystone.exception.GroupNotFound: If the group doesn't exist. @@ -165,6 +279,11 @@ class IdentityDriverV8(object): def check_user_in_group(self, user_id, group_id): """Check if a user is a member of a group. + :param str user_id: User ID. + :param str group_id: Group ID. + + :raises keystone.exception.NotFound: If the user is not a member of the + group. :raises keystone.exception.UserNotFound: If the user doesn't exist. :raises keystone.exception.GroupNotFound: If the group doesn't exist. @@ -175,7 +294,10 @@ class IdentityDriverV8(object): def remove_user_from_group(self, user_id, group_id): """Remove a user from a group. - :raises keystone.exception.NotFound: If the entity not found. + :param str user_id: User ID. + :param str group_id: Group ID. + + :raises keystone.exception.NotFound: If the user is not in the group. """ raise exception.NotImplemented() # pragma: no cover @@ -205,6 +327,13 @@ class IdentityDriverV8(object): def create_group(self, group_id, group): """Create a new group. + :param str group_id: group ID. The driver can ignore this value. + :param dict group: group info. See group schema in + :class:`~.IdentityDriverV8`. + + :returns: group, matching the group schema. + :rtype: dict + :raises keystone.exception.Conflict: If a duplicate group exists. """ @@ -216,8 +345,10 @@ class IdentityDriverV8(object): :param hints: filter hints which the driver should implement if at all possible. + :type hints: keystone.common.driver_hints.Hints - :returns: a list of group_refs or an empty list. + :returns: a list of group_refs or an empty list. See group schema in + :class:`~.IdentityDriverV8`. """ raise exception.NotImplemented() # pragma: no cover @@ -226,11 +357,15 @@ class IdentityDriverV8(object): def list_groups_for_user(self, user_id, hints): """List groups a user is in - :param user_id: the user in question + :param str user_id: the user in question :param hints: filter hints which the driver should implement if at all possible. + :type hints: keystone.common.driver_hints.Hints - :returns: a list of group_refs or an empty list. + :returns: a list of group_refs or an empty list. See group schema in + :class:`~.IdentityDriverV8`. + + :raises keystone.exception.UserNotFound: If the user doesn't exist. """ raise exception.NotImplemented() # pragma: no cover @@ -239,7 +374,10 @@ class IdentityDriverV8(object): def get_group(self, group_id): """Get a group by ID. - :returns: group_ref + :param str group_id: group ID. + + :returns: group info. See group schema in :class:`~.IdentityDriverV8`. + :rtype: dict :raises keystone.exception.GroupNotFound: If the group doesn't exist. """ @@ -249,7 +387,11 @@ class IdentityDriverV8(object): def get_group_by_name(self, group_name, domain_id): """Get a group by name. - :returns: group_ref + :param str group_name: group name. + :param str domain_id: domain ID. + + :returns: group info. See group schema in :class:`~.IdentityDriverV8`. + :rtype: dict :raises keystone.exception.GroupNotFound: If the group doesn't exist. """ @@ -259,6 +401,13 @@ class IdentityDriverV8(object): def update_group(self, group_id, group): """Update an existing group. + :param str group_id: Group ID. + :param dict group: Group modification. See group schema in + :class:`~.IdentityDriverV8`. Required properties cannot be removed. + + :returns: group, matching the group schema. + :rtype: dict + :raises keystone.exception.GroupNotFound: If the group doesn't exist. :raises keystone.exception.Conflict: If a duplicate group exists. @@ -269,6 +418,8 @@ class IdentityDriverV8(object): def delete_group(self, group_id): """Delete an existing group. + :param str group_id: Group ID. + :raises keystone.exception.GroupNotFound: If the group doesn't exist. """ diff --git a/keystone/tests/unit/identity/backends/__init__.py b/keystone/tests/unit/identity/backends/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/keystone/tests/unit/identity/backends/test_base.py b/keystone/tests/unit/identity/backends/test_base.py new file mode 100644 index 0000000000..45e4c07249 --- /dev/null +++ b/keystone/tests/unit/identity/backends/test_base.py @@ -0,0 +1,535 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystone.common import driver_hints +from keystone import exception + + +class IdentityDriverV8Tests(object): + driver = None # subclasses must override driver to the actual driver. + + # subclasses that don't allow name updates must set this to False. + allows_name_update = True + + # Subclasses must override this to indicate whether it's domain-aware or + # not. + expected_is_domain_aware = True + + # Subclasses must override this to the expected default assignment driver. + expected_default_assignment_driver = 'sql' + + # Subclasses must override this to the expected is_sql value. + expected_is_sql = False + + # Subclasses must override this to the expected expected_generates_uuids + # value. + expected_generates_uuids = True + + def create_user(self, domain_id=None, **kwargs): + """Get a user for the test. + + Subclasses can override this to provide their own way to provide a user + for the test. By default, driver.create_user is used. For drivers that + don't support create_user, this may go directly to the backend, or + maybe it gets a user from a set of pre-created users. + """ + user_id = uuid.uuid4().hex + user = { + 'id': user_id, + 'name': uuid.uuid4().hex, + 'enabled': True, + } + if self.driver.is_domain_aware(): + user['domain_id'] = domain_id or uuid.uuid4().hex + user.update(kwargs) + return self.driver.create_user(user_id, user) + + def create_group(self, domain_id=None): + """Get a group for the test. + + Similar to :meth:`~.create_user`, subclasses can override this to + provide their own way to provide a group for the test. + """ + group_id = uuid.uuid4().hex + group = { + 'id': group_id, + 'name': uuid.uuid4().hex, + } + if self.driver.is_domain_aware(): + group['domain_id'] = domain_id or uuid.uuid4().hex + return self.driver.create_group(group_id, group) + + def test_is_domain_aware(self): + self.assertIs(self.expected_is_domain_aware, + self.driver.is_domain_aware()) + + def test_default_assignment_driver(self): + self.assertEqual(self.expected_default_assignment_driver, + self.driver.default_assignment_driver()) + + def test_is_sql(self): + self.assertIs(self.expected_is_sql, self.driver.is_sql) + + def test_generates_uuids(self): + self.assertIs(self.expected_generates_uuids, + self.driver.generates_uuids()) + + def test_create_user(self): + # Don't use self.create_user since this needs to test the driver + # interface and create_user might not use the driver. + user_id = uuid.uuid4().hex + user = { + 'id': user_id, + 'name': uuid.uuid4().hex, + 'enabled': True + } + if self.driver.is_domain_aware(): + user['domain_id'] = uuid.uuid4().hex + ret = self.driver.create_user(user_id, user) + self.assertEqual(user_id, ret['id']) + + def test_create_user_all_attributes(self): + user_id = uuid.uuid4().hex + user = { + 'id': user_id, + 'name': uuid.uuid4().hex, + 'password': uuid.uuid4().hex, + 'enabled': True, + 'default_project_id': uuid.uuid4().hex, + } + if self.driver.is_domain_aware(): + user['domain_id'] = uuid.uuid4().hex + ret = self.driver.create_user(user_id, user) + exp_user = user.copy() + del exp_user['password'] + self.assertEqual(exp_user, ret) + + def test_create_user_same_id_exc(self): + user_id = uuid.uuid4().hex + user = { + 'id': user_id, + 'name': uuid.uuid4().hex, + 'enabled': True, + } + if self.driver.is_domain_aware(): + user['domain_id'] = uuid.uuid4().hex + self.driver.create_user(user_id, user) + self.assertRaises(exception.Conflict, + self.driver.create_user, user_id, user) + + def test_create_user_same_name_and_domain_exc(self): + user1_id = uuid.uuid4().hex + name = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + user = { + 'id': user1_id, + 'name': name, + 'enabled': True, + } + if self.driver.is_domain_aware(): + user['domain_id'] = domain_id + self.driver.create_user(user1_id, user) + + user2_id = uuid.uuid4().hex + user = { + 'id': user2_id, + 'name': name, + 'enabled': True, + } + if self.driver.is_domain_aware(): + user['domain_id'] = domain_id + self.assertRaises(exception.Conflict, + self.driver.create_user, user2_id, user) + + def test_list_users_no_users(self): + hints = driver_hints.Hints() + self.assertEqual([], self.driver.list_users(hints)) + + def test_list_users_when_users(self): + user = self.create_user() + + hints = driver_hints.Hints() + users = self.driver.list_users(hints) + self.assertEqual([user['id']], [u['id'] for u in users]) + + def test_get_user(self): + user = self.create_user() + + actual_user = self.driver.get_user(user['id']) + self.assertEqual(user['id'], actual_user['id']) + + def test_get_user_no_user_exc(self): + self.assertRaises(exception.UserNotFound, + self.driver.get_user, uuid.uuid4().hex) + + def test_get_user_by_name(self): + domain_id = uuid.uuid4().hex + user = self.create_user(domain_id=domain_id) + + actual_user = self.driver.get_user_by_name(user['name'], domain_id) + self.assertEqual(user['id'], actual_user['id']) + + def test_get_user_by_name_no_user_exc(self): + # When the user doesn't exist, UserNotFound is raised. + self.assertRaises( + exception.UserNotFound, self.driver.get_user_by_name, + user_name=uuid.uuid4().hex, domain_id=uuid.uuid4().hex) + + def test_update_user(self): + user = self.create_user() + + user_mod = {'enabled': False} + actual_user = self.driver.update_user(user['id'], user_mod) + self.assertEqual(user['id'], actual_user['id']) + self.assertIs(False, actual_user['enabled']) + + def test_update_user_remove_optional_attribute(self): + # When the attribute has a value of None it's supposed to be removed. + user = self.create_user(default_project_id=uuid.uuid4().hex) + self.assertIn('default_project_id', user) + + user_mod = {'default_project_id': None} + actual_user = self.driver.update_user(user['id'], user_mod) + self.assertNotIn('default_project_id', actual_user) + + def test_update_user_same_name_exc(self): + # For drivers that allow name update, if the name of a user is changed + # to the same as another user in the same domain, Conflict is raised. + + if not self.allows_name_update: + self.skipTest("Backend doesn't allow name update.") + + domain_id = uuid.uuid4().hex + user1 = self.create_user(domain_id=domain_id) + user2 = self.create_user(domain_id=domain_id) + + user_mod = {'name': user2['name']} + self.assertRaises(exception.Conflict, self.driver.update_user, + user1['id'], user_mod) + + def test_update_user_no_user_exc(self): + user_id = uuid.uuid4().hex + user_mod = {'enabled': False} + self.assertRaises(exception.UserNotFound, + self.driver.update_user, user_id, user_mod) + + def test_update_user_name_not_allowed_exc(self): + # For drivers that do not allow name update, attempting to change the + # name causes an exception. + + if self.allows_name_update: + self.skipTest("Backend allows name update.") + + user = self.create_user() + user_mod = {'name': uuid.uuid4().hex} + self.assertRaises(exception.Conflict, self.driver.update_user, + user['id'], user_mod) + + def test_delete_user(self): + user = self.create_user() + + self.driver.delete_user(user['id']) + self.assertRaises(exception.UserNotFound, self.driver.get_user, + user['id']) + + def test_delete_user_no_user_exc(self): + # When the user doesn't exist, UserNotFound is raised. + self.assertRaises(exception.UserNotFound, self.driver.delete_user, + user_id=uuid.uuid4().hex) + + def test_create_group(self): + group_id = uuid.uuid4().hex + group = { + 'id': group_id, + 'name': uuid.uuid4().hex, + } + if self.driver.is_domain_aware(): + group['domain_id'] = uuid.uuid4().hex + new_group = self.driver.create_group(group_id, group) + self.assertEqual(group_id, new_group['id']) + + def test_create_group_all_attrs(self): + group_id = uuid.uuid4().hex + group = { + 'id': group_id, + 'name': uuid.uuid4().hex, + 'description': uuid.uuid4().hex, + } + if self.driver.is_domain_aware(): + group['domain_id'] = uuid.uuid4().hex + new_group = self.driver.create_group(group_id, group) + self.assertEqual(group, new_group) + + def test_create_group_duplicate_exc(self): + group1_id = uuid.uuid4().hex + name = uuid.uuid4().hex + domain = uuid.uuid4().hex + group1 = { + 'id': group1_id, + 'name': name, + } + if self.driver.is_domain_aware(): + group1['domain_id'] = domain + self.driver.create_group(group1_id, group1) + + group2_id = uuid.uuid4().hex + group2 = { + 'id': group2_id, + 'name': name, + } + if self.driver.is_domain_aware(): + group2['domain_id'] = domain + self.assertRaises(exception.Conflict, self.driver.create_group, + group2_id, group2) + + def test_get_group(self): + group = self.create_group() + + actual_group = self.driver.get_group(group['id']) + self.assertEqual(group['id'], actual_group['id']) + + def test_get_group_no_group_exc(self): + # When the group doesn't exist, get_group raises GroupNotFound. + self.assertRaises(exception.GroupNotFound, self.driver.get_group, + group_id=uuid.uuid4().hex) + + def test_get_group_by_name(self): + domain_id = uuid.uuid4().hex + group = self.create_group(domain_id=domain_id) + + actual_group = self.driver.get_group_by_name(group['name'], domain_id) + self.assertEqual(group['id'], actual_group['id']) + + def test_get_group_by_name_no_user_exc(self): + # When the group doesn't exist, get_group raises GroupNotFound. + self.assertRaises( + exception.GroupNotFound, self.driver.get_group_by_name, + group_name=uuid.uuid4().hex, domain_id=uuid.uuid4().hex) + + def test_update_group(self): + group = self.create_group() + + new_description = uuid.uuid4().hex + group_mod = {'description': new_description} + actual_group = self.driver.update_group(group['id'], group_mod) + self.assertEqual(new_description, actual_group['description']) + + def test_update_group_no_group(self): + # When the group doesn't exist, GroupNotFound is raised. + group_mod = {'description': uuid.uuid4().hex} + self.assertRaises(exception.GroupNotFound, self.driver.update_group, + group_id=uuid.uuid4().hex, group=group_mod) + + def test_update_group_name_already_exists(self): + # For drivers that support renaming, when the group is renamed to a + # name that already exists, Conflict is raised. + + if not self.allows_name_update: + self.skipTest("driver doesn't allow name update") + + domain_id = uuid.uuid4().hex + group1 = self.create_group(domain_id=domain_id) + group2 = self.create_group(domain_id=domain_id) + + group_mod = {'name': group1['name']} + self.assertRaises(exception.Conflict, self.driver.update_group, + group2['id'], group_mod) + + def test_update_group_name_not_allowed(self): + # For drivers that do not support renaming, when the group is attempted + # to be renamed ValidationError is raised. + + if self.allows_name_update: + self.skipTest("driver allows name update") + + group = self.create_group() + + group_mod = {'name': uuid.uuid4().hex} + self.assertRaises(exception.ValidationError, self.driver.update_group, + group['id'], group_mod) + + def test_delete_group(self): + group = self.create_group() + self.driver.delete_group(group['id']) + self.assertRaises(exception.GroupNotFound, self.driver.get_group, + group['id']) + + def test_delete_group_doesnt_exist_exc(self): + self.assertRaises(exception.GroupNotFound, self.driver.delete_group, + group_id=uuid.uuid4().hex) + + def test_list_groups_no_groups(self): + groups = self.driver.list_groups(driver_hints.Hints()) + self.assertEqual([], groups) + + def test_list_groups_one_group(self): + group = self.create_group() + groups = self.driver.list_groups(driver_hints.Hints()) + self.assertEqual(group['id'], groups[0]['id']) + + def test_add_user_to_group(self): + user = self.create_user() + group = self.create_group() + + self.driver.add_user_to_group(user['id'], group['id']) + + # No assert since if doesn't raise, then successful. + self.driver.check_user_in_group(user['id'], group['id']) + + def test_add_user_to_group_no_user_exc(self): + group = self.create_group() + + user_id = uuid.uuid4().hex + self.assertRaises(exception.UserNotFound, + self.driver.add_user_to_group, user_id, group['id']) + + def test_add_user_to_group_no_group_exc(self): + user = self.create_user() + + group_id = uuid.uuid4().hex + self.assertRaises(exception.GroupNotFound, + self.driver.add_user_to_group, user['id'], group_id) + + def test_check_user_in_group(self): + user = self.create_user() + group = self.create_group() + self.driver.add_user_to_group(user['id'], group['id']) + + # No assert since if doesn't raise, then successful. + self.driver.check_user_in_group(user['id'], group['id']) + + def test_check_user_in_group_user_not_in_group_exc(self): + user = self.create_user() + group = self.create_group() + + self.assertRaises(exception.NotFound, self.driver.check_user_in_group, + user['id'], group['id']) + + def test_check_user_in_group_user_doesnt_exist_exc(self): + # When the user doesn't exist, UserNotFound is raised. + group = self.create_group() + + user_id = uuid.uuid4().hex + self.assertRaises( + exception.UserNotFound, self.driver.check_user_in_group, user_id, + group['id']) + + def test_check_user_in_group_group_doesnt_exist_exc(self): + # When the group doesn't exist, UserNotFound is raised. + user = self.create_user() + + group_id = uuid.uuid4().hex + self.assertRaises( + exception.GroupNotFound, self.driver.check_user_in_group, + user['id'], group_id) + + def test_list_users_in_group_no_users(self): + group = self.create_group() + + users = self.driver.list_users_in_group(group['id'], + driver_hints.Hints()) + self.assertEqual([], users) + + def test_list_users_in_group_user(self): + group = self.create_group() + user = self.create_user() + self.driver.add_user_to_group(user['id'], group['id']) + + users = self.driver.list_users_in_group(group['id'], + driver_hints.Hints()) + self.assertEqual([user['id']], [u['id'] for u in users]) + + def test_list_users_in_group_no_group(self): + group_id = uuid.uuid4().hex + self.assertRaises( + exception.GroupNotFound, self.driver.list_users_in_group, group_id, + driver_hints.Hints()) + + def test_list_groups_for_user_no_groups(self): + user = self.create_user() + + groups = self.driver.list_groups_for_user(user['id'], + driver_hints.Hints()) + self.assertEqual([], groups) + + def test_list_groups_for_user_group(self): + user = self.create_user() + group = self.create_group() + self.driver.add_user_to_group(user['id'], group['id']) + + groups = self.driver.list_groups_for_user(user['id'], + driver_hints.Hints()) + self.assertEqual([group['id']], [g['id'] for g in groups]) + + def test_list_groups_for_user_no_user(self): + user_id = uuid.uuid4().hex + self.assertRaises( + exception.UserNotFound, self.driver.list_groups_for_user, + user_id, driver_hints.Hints()) + + def test_remove_user_from_group(self): + user = self.create_user() + group = self.create_group() + self.driver.add_user_to_group(user['id'], group['id']) + + self.driver.remove_user_from_group(user['id'], group['id']) + + self.assertRaises(exception.NotFound, self.driver.check_user_in_group, + user['id'], group['id']) + + def test_remove_user_from_group_not_in_group(self): + user = self.create_user() + group = self.create_group() + + # FIXME(blk-u): ldap is returning UserNotFound rather than NotFound, + # fix this. + self.assertRaises( + exception.NotFound, self.driver.remove_user_from_group, user['id'], + group['id']) + + def test_remove_user_from_group_no_user(self): + group = self.create_group() + + user_id = uuid.uuid4().hex + self.assertRaises( + exception.UserNotFound, self.driver.remove_user_from_group, + user_id, group['id']) + + def test_remove_user_from_group_no_group(self): + user = self.create_user() + + group_id = uuid.uuid4().hex + self.assertRaises( + exception.GroupNotFound, self.driver.remove_user_from_group, + user['id'], group_id) + + def test_authenticate(self): + password = uuid.uuid4().hex + user = self.create_user(password=password) + + actual_user = self.driver.authenticate(user['id'], password) + self.assertEqual(user['id'], actual_user['id']) + + def test_authenticate_wrong_password(self): + user = self.create_user(password=uuid.uuid4().hex) + + password = uuid.uuid4().hex + self.assertRaises(AssertionError, self.driver.authenticate, user['id'], + password) + + def test_authenticate_no_user(self): + user_id = uuid.uuid4().hex + password = uuid.uuid4().hex + self.assertRaises(AssertionError, self.driver.authenticate, user_id, + password) diff --git a/keystone/tests/unit/identity/backends/test_ldap.py b/keystone/tests/unit/identity/backends/test_ldap.py new file mode 100644 index 0000000000..58972642a7 --- /dev/null +++ b/keystone/tests/unit/identity/backends/test_ldap.py @@ -0,0 +1,43 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import fixture as config_fixture + +from keystone.identity.backends import ldap +from keystone.tests.unit import core +from keystone.tests.unit.identity.backends import test_base +from keystone.tests.unit.ksfixtures import ldapdb + + +class TestIdentityDriver(core.BaseTestCase, + test_base.IdentityDriverV8Tests): + + allows_name_update = False + expected_is_domain_aware = False + expected_default_assignment_driver = 'sql' + expected_is_sql = False + expected_generates_uuids = False + + def setUp(self): + super(TestIdentityDriver, self).setUp() + + config_fixture_ = self.useFixture(config_fixture.Config()) + config_fixture_.config( + group='ldap', + url='fake://memory', + user='cn=Admin', + password='password', + suffix='cn=example,cn=com') + + self.useFixture(ldapdb.LDAPDatabase()) + + self.driver = ldap.Identity() diff --git a/keystone/tests/unit/identity/backends/test_sql.py b/keystone/tests/unit/identity/backends/test_sql.py new file mode 100644 index 0000000000..c82c142c58 --- /dev/null +++ b/keystone/tests/unit/identity/backends/test_sql.py @@ -0,0 +1,55 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo_db.sqlalchemy import test_base as db_test + +from keystone.common import sql +from keystone.identity.backends import sql as sql_backend +from keystone.tests.unit.identity.backends import test_base +from keystone.tests.unit.ksfixtures import database + + +class TestIdentityDriver(db_test.DbTestCase, + test_base.IdentityDriverV8Tests): + + expected_is_domain_aware = True + expected_default_assignment_driver = 'sql' + expected_is_sql = True + expected_generates_uuids = True + + def setUp(self): + super(TestIdentityDriver, self).setUp() + + # Set keystone's connection URL to be the test engine's url. + database.initialize_sql_session(self.engine.url) + + # Override keystone's context manager to be oslo.db's global context + # manager. + sql.core._TESTING_USE_GLOBAL_CONTEXT_MANAGER = True + self.addCleanup(setattr, + sql.core, '_TESTING_USE_GLOBAL_CONTEXT_MANAGER', False) + self.addCleanup(sql.cleanup) + + version_specifiers = {} + database._load_sqlalchemy_models(version_specifiers) + sql.ModelBase.metadata.create_all(bind=self.engine) + + self.driver = sql_backend.Identity() + + +class MySQLOpportunisticIdentityDriverTestCase(TestIdentityDriver): + FIXTURE = db_test.MySQLOpportunisticFixture + + +class PostgreSQLOpportunisticIdentityDriverTestCase(TestIdentityDriver): + FIXTURE = db_test.PostgreSQLOpportunisticFixture diff --git a/tests-py3-blacklist.txt b/tests-py3-blacklist.txt index 3f7a6fcc1b..be4163c926 100644 --- a/tests-py3-blacklist.txt +++ b/tests-py3-blacklist.txt @@ -1,4 +1,5 @@ keystone.tests.unit.common.test_ldap +keystone.tests.unit.identity.backends.test_ldap keystone.tests.unit.test_backend_ldap keystone.tests.unit.test_backend_ldap_pool keystone.tests.unit.test_v2