diff --git a/keystone/identity/backends/base.py b/keystone/identity/backends/base.py index f1c27a3556..76d5037733 100644 --- a/keystone/identity/backends/base.py +++ b/keystone/identity/backends/base.py @@ -48,7 +48,85 @@ def filter_user(user_ref): @six.add_metaclass(abc.ABCMeta) class IdentityDriverV8(object): - """Interface description for an Identity driver.""" + """Interface description for an Identity driver. + + The schema for users and groups is different depending on whether the + driver is domain aware or not (as returned by self.is_domain_aware()). + + If the driver is not domain aware: + + * domain_id will be not be included in the user / group passed in to + create_user / create_group + * the domain_id should not be returned in user / group refs. They'll be + overwritten. + + User schema (if driver is domain aware):: + + type: object + properties: + id: + type: string + name: + type: string + domain_id: + type: string + password: + type: string + enabled: + type: boolean + default_project_id: + type: string + required: [id, name, domain_id, enabled] + additionalProperties: True + + User schema (if driver is not domain aware):: + + type: object + properties: + id: + type: string + name: + type: string + password: + type: string + enabled: + type: boolean + default_project_id: + type: string + required: [id, name, enabled] + additionalProperties: True + # Note that domain_id is not allowed as a property + + Group schema (if driver is domain aware):: + + type: object + properties: + id: + type: string + name: + type: string + domain_id: + type: string + description: + type: string + required: [id, name, domain_id] + additionalProperties: True + + Group schema (if driver is not domain aware):: + + type: object + properties: + id: + type: string + name: + type: string + description: + type: string + required: [id, name] + additionalProperties: True + # Note that domain_id is not allowed as a property + + """ def _get_conf(self): try: @@ -64,7 +142,7 @@ class IdentityDriverV8(object): CONF.identity.list_limit or CONF.list_limit) def is_domain_aware(self): - """Indicate if Driver supports domains.""" + """Indicate if the driver supports domains.""" return True def default_assignment_driver(self): @@ -90,7 +168,12 @@ class IdentityDriverV8(object): def authenticate(self, user_id, password): """Authenticate a given user and password. - :returns: user_ref + :param str user_id: User ID + :param str password: Password + + :returns: user. See user schema in :class:`~.IdentityDriverV8`. + :rtype: dict + :raises AssertionError: If user or password is invalid. """ raise exception.NotImplemented() # pragma: no cover @@ -101,6 +184,14 @@ class IdentityDriverV8(object): def create_user(self, user_id, user): """Create a new user. + :param str user_id: user ID. The driver can ignore this value. + :param dict user: user info. See user schema in + :class:`~.IdentityDriverV8`. + + :returns: user, matching the user schema. The driver should not return + the password. + :rtype: dict + :raises keystone.exception.Conflict: If a duplicate user exists. """ @@ -112,8 +203,11 @@ class IdentityDriverV8(object): :param hints: filter hints which the driver should implement if at all possible. + :type hints: keystone.common.driver_hints.Hints - :returns: a list of user_refs or an empty list. + :returns: a list of users or an empty list. See user schema in + :class:`~.IdentityDriverV8`. + :rtype: list of dict """ raise exception.NotImplemented() # pragma: no cover @@ -122,11 +216,16 @@ class IdentityDriverV8(object): def list_users_in_group(self, group_id, hints): """List users in a group. - :param group_id: the group in question + :param str group_id: the group in question :param hints: filter hints which the driver should implement if at all possible. + :type hints: keystone.common.driver_hints.Hints - :returns: a list of user_refs or an empty list. + :returns: a list of users or an empty list. See user schema in + :class:`~.IdentityDriverV8`. + :rtype: list of dict + + :raises keystone.exception.GroupNotFound: If the group doesn't exist. """ raise exception.NotImplemented() # pragma: no cover @@ -135,7 +234,11 @@ class IdentityDriverV8(object): def get_user(self, user_id): """Get a user by ID. - :returns: user_ref + :param str user_id: User ID. + + :returns: user. See user schema in :class:`~.IdentityDriverV8`. + :rtype: dict + :raises keystone.exception.UserNotFound: If the user doesn't exist. """ @@ -145,8 +248,16 @@ class IdentityDriverV8(object): def update_user(self, user_id, user): """Update an existing user. + :param str user_id: User ID. + :param dict user: User modification. See user schema in + :class:`~.IdentityDriverV8`. Properties set to None will be + removed. Required properties cannot be removed. + + :returns: user. See user schema in :class:`~.IdentityDriverV8`. + :raises keystone.exception.UserNotFound: If the user doesn't exist. - :raises keystone.exception.Conflict: If a duplicate user exists. + :raises keystone.exception.Conflict: If a duplicate user exists in the + same domain. """ raise exception.NotImplemented() # pragma: no cover @@ -155,6 +266,9 @@ class IdentityDriverV8(object): def add_user_to_group(self, user_id, group_id): """Add a user to a group. + :param str user_id: User ID. + :param str group_id: Group ID. + :raises keystone.exception.UserNotFound: If the user doesn't exist. :raises keystone.exception.GroupNotFound: If the group doesn't exist. @@ -165,6 +279,11 @@ class IdentityDriverV8(object): def check_user_in_group(self, user_id, group_id): """Check if a user is a member of a group. + :param str user_id: User ID. + :param str group_id: Group ID. + + :raises keystone.exception.NotFound: If the user is not a member of the + group. :raises keystone.exception.UserNotFound: If the user doesn't exist. :raises keystone.exception.GroupNotFound: If the group doesn't exist. @@ -175,7 +294,10 @@ class IdentityDriverV8(object): def remove_user_from_group(self, user_id, group_id): """Remove a user from a group. - :raises keystone.exception.NotFound: If the entity not found. + :param str user_id: User ID. + :param str group_id: Group ID. + + :raises keystone.exception.NotFound: If the user is not in the group. """ raise exception.NotImplemented() # pragma: no cover @@ -205,6 +327,13 @@ class IdentityDriverV8(object): def create_group(self, group_id, group): """Create a new group. + :param str group_id: group ID. The driver can ignore this value. + :param dict group: group info. See group schema in + :class:`~.IdentityDriverV8`. + + :returns: group, matching the group schema. + :rtype: dict + :raises keystone.exception.Conflict: If a duplicate group exists. """ @@ -216,8 +345,10 @@ class IdentityDriverV8(object): :param hints: filter hints which the driver should implement if at all possible. + :type hints: keystone.common.driver_hints.Hints - :returns: a list of group_refs or an empty list. + :returns: a list of group_refs or an empty list. See group schema in + :class:`~.IdentityDriverV8`. """ raise exception.NotImplemented() # pragma: no cover @@ -226,11 +357,15 @@ class IdentityDriverV8(object): def list_groups_for_user(self, user_id, hints): """List groups a user is in - :param user_id: the user in question + :param str user_id: the user in question :param hints: filter hints which the driver should implement if at all possible. + :type hints: keystone.common.driver_hints.Hints - :returns: a list of group_refs or an empty list. + :returns: a list of group_refs or an empty list. See group schema in + :class:`~.IdentityDriverV8`. + + :raises keystone.exception.UserNotFound: If the user doesn't exist. """ raise exception.NotImplemented() # pragma: no cover @@ -239,7 +374,10 @@ class IdentityDriverV8(object): def get_group(self, group_id): """Get a group by ID. - :returns: group_ref + :param str group_id: group ID. + + :returns: group info. See group schema in :class:`~.IdentityDriverV8`. + :rtype: dict :raises keystone.exception.GroupNotFound: If the group doesn't exist. """ @@ -249,7 +387,11 @@ class IdentityDriverV8(object): def get_group_by_name(self, group_name, domain_id): """Get a group by name. - :returns: group_ref + :param str group_name: group name. + :param str domain_id: domain ID. + + :returns: group info. See group schema in :class:`~.IdentityDriverV8`. + :rtype: dict :raises keystone.exception.GroupNotFound: If the group doesn't exist. """ @@ -259,6 +401,13 @@ class IdentityDriverV8(object): def update_group(self, group_id, group): """Update an existing group. + :param str group_id: Group ID. + :param dict group: Group modification. See group schema in + :class:`~.IdentityDriverV8`. Required properties cannot be removed. + + :returns: group, matching the group schema. + :rtype: dict + :raises keystone.exception.GroupNotFound: If the group doesn't exist. :raises keystone.exception.Conflict: If a duplicate group exists. @@ -269,6 +418,8 @@ class IdentityDriverV8(object): def delete_group(self, group_id): """Delete an existing group. + :param str group_id: Group ID. + :raises keystone.exception.GroupNotFound: If the group doesn't exist. """ diff --git a/keystone/tests/unit/identity/backends/__init__.py b/keystone/tests/unit/identity/backends/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/keystone/tests/unit/identity/backends/test_base.py b/keystone/tests/unit/identity/backends/test_base.py new file mode 100644 index 0000000000..45e4c07249 --- /dev/null +++ b/keystone/tests/unit/identity/backends/test_base.py @@ -0,0 +1,535 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystone.common import driver_hints +from keystone import exception + + +class IdentityDriverV8Tests(object): + driver = None # subclasses must override driver to the actual driver. + + # subclasses that don't allow name updates must set this to False. + allows_name_update = True + + # Subclasses must override this to indicate whether it's domain-aware or + # not. + expected_is_domain_aware = True + + # Subclasses must override this to the expected default assignment driver. + expected_default_assignment_driver = 'sql' + + # Subclasses must override this to the expected is_sql value. + expected_is_sql = False + + # Subclasses must override this to the expected expected_generates_uuids + # value. + expected_generates_uuids = True + + def create_user(self, domain_id=None, **kwargs): + """Get a user for the test. + + Subclasses can override this to provide their own way to provide a user + for the test. By default, driver.create_user is used. For drivers that + don't support create_user, this may go directly to the backend, or + maybe it gets a user from a set of pre-created users. + """ + user_id = uuid.uuid4().hex + user = { + 'id': user_id, + 'name': uuid.uuid4().hex, + 'enabled': True, + } + if self.driver.is_domain_aware(): + user['domain_id'] = domain_id or uuid.uuid4().hex + user.update(kwargs) + return self.driver.create_user(user_id, user) + + def create_group(self, domain_id=None): + """Get a group for the test. + + Similar to :meth:`~.create_user`, subclasses can override this to + provide their own way to provide a group for the test. + """ + group_id = uuid.uuid4().hex + group = { + 'id': group_id, + 'name': uuid.uuid4().hex, + } + if self.driver.is_domain_aware(): + group['domain_id'] = domain_id or uuid.uuid4().hex + return self.driver.create_group(group_id, group) + + def test_is_domain_aware(self): + self.assertIs(self.expected_is_domain_aware, + self.driver.is_domain_aware()) + + def test_default_assignment_driver(self): + self.assertEqual(self.expected_default_assignment_driver, + self.driver.default_assignment_driver()) + + def test_is_sql(self): + self.assertIs(self.expected_is_sql, self.driver.is_sql) + + def test_generates_uuids(self): + self.assertIs(self.expected_generates_uuids, + self.driver.generates_uuids()) + + def test_create_user(self): + # Don't use self.create_user since this needs to test the driver + # interface and create_user might not use the driver. + user_id = uuid.uuid4().hex + user = { + 'id': user_id, + 'name': uuid.uuid4().hex, + 'enabled': True + } + if self.driver.is_domain_aware(): + user['domain_id'] = uuid.uuid4().hex + ret = self.driver.create_user(user_id, user) + self.assertEqual(user_id, ret['id']) + + def test_create_user_all_attributes(self): + user_id = uuid.uuid4().hex + user = { + 'id': user_id, + 'name': uuid.uuid4().hex, + 'password': uuid.uuid4().hex, + 'enabled': True, + 'default_project_id': uuid.uuid4().hex, + } + if self.driver.is_domain_aware(): + user['domain_id'] = uuid.uuid4().hex + ret = self.driver.create_user(user_id, user) + exp_user = user.copy() + del exp_user['password'] + self.assertEqual(exp_user, ret) + + def test_create_user_same_id_exc(self): + user_id = uuid.uuid4().hex + user = { + 'id': user_id, + 'name': uuid.uuid4().hex, + 'enabled': True, + } + if self.driver.is_domain_aware(): + user['domain_id'] = uuid.uuid4().hex + self.driver.create_user(user_id, user) + self.assertRaises(exception.Conflict, + self.driver.create_user, user_id, user) + + def test_create_user_same_name_and_domain_exc(self): + user1_id = uuid.uuid4().hex + name = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + user = { + 'id': user1_id, + 'name': name, + 'enabled': True, + } + if self.driver.is_domain_aware(): + user['domain_id'] = domain_id + self.driver.create_user(user1_id, user) + + user2_id = uuid.uuid4().hex + user = { + 'id': user2_id, + 'name': name, + 'enabled': True, + } + if self.driver.is_domain_aware(): + user['domain_id'] = domain_id + self.assertRaises(exception.Conflict, + self.driver.create_user, user2_id, user) + + def test_list_users_no_users(self): + hints = driver_hints.Hints() + self.assertEqual([], self.driver.list_users(hints)) + + def test_list_users_when_users(self): + user = self.create_user() + + hints = driver_hints.Hints() + users = self.driver.list_users(hints) + self.assertEqual([user['id']], [u['id'] for u in users]) + + def test_get_user(self): + user = self.create_user() + + actual_user = self.driver.get_user(user['id']) + self.assertEqual(user['id'], actual_user['id']) + + def test_get_user_no_user_exc(self): + self.assertRaises(exception.UserNotFound, + self.driver.get_user, uuid.uuid4().hex) + + def test_get_user_by_name(self): + domain_id = uuid.uuid4().hex + user = self.create_user(domain_id=domain_id) + + actual_user = self.driver.get_user_by_name(user['name'], domain_id) + self.assertEqual(user['id'], actual_user['id']) + + def test_get_user_by_name_no_user_exc(self): + # When the user doesn't exist, UserNotFound is raised. + self.assertRaises( + exception.UserNotFound, self.driver.get_user_by_name, + user_name=uuid.uuid4().hex, domain_id=uuid.uuid4().hex) + + def test_update_user(self): + user = self.create_user() + + user_mod = {'enabled': False} + actual_user = self.driver.update_user(user['id'], user_mod) + self.assertEqual(user['id'], actual_user['id']) + self.assertIs(False, actual_user['enabled']) + + def test_update_user_remove_optional_attribute(self): + # When the attribute has a value of None it's supposed to be removed. + user = self.create_user(default_project_id=uuid.uuid4().hex) + self.assertIn('default_project_id', user) + + user_mod = {'default_project_id': None} + actual_user = self.driver.update_user(user['id'], user_mod) + self.assertNotIn('default_project_id', actual_user) + + def test_update_user_same_name_exc(self): + # For drivers that allow name update, if the name of a user is changed + # to the same as another user in the same domain, Conflict is raised. + + if not self.allows_name_update: + self.skipTest("Backend doesn't allow name update.") + + domain_id = uuid.uuid4().hex + user1 = self.create_user(domain_id=domain_id) + user2 = self.create_user(domain_id=domain_id) + + user_mod = {'name': user2['name']} + self.assertRaises(exception.Conflict, self.driver.update_user, + user1['id'], user_mod) + + def test_update_user_no_user_exc(self): + user_id = uuid.uuid4().hex + user_mod = {'enabled': False} + self.assertRaises(exception.UserNotFound, + self.driver.update_user, user_id, user_mod) + + def test_update_user_name_not_allowed_exc(self): + # For drivers that do not allow name update, attempting to change the + # name causes an exception. + + if self.allows_name_update: + self.skipTest("Backend allows name update.") + + user = self.create_user() + user_mod = {'name': uuid.uuid4().hex} + self.assertRaises(exception.Conflict, self.driver.update_user, + user['id'], user_mod) + + def test_delete_user(self): + user = self.create_user() + + self.driver.delete_user(user['id']) + self.assertRaises(exception.UserNotFound, self.driver.get_user, + user['id']) + + def test_delete_user_no_user_exc(self): + # When the user doesn't exist, UserNotFound is raised. + self.assertRaises(exception.UserNotFound, self.driver.delete_user, + user_id=uuid.uuid4().hex) + + def test_create_group(self): + group_id = uuid.uuid4().hex + group = { + 'id': group_id, + 'name': uuid.uuid4().hex, + } + if self.driver.is_domain_aware(): + group['domain_id'] = uuid.uuid4().hex + new_group = self.driver.create_group(group_id, group) + self.assertEqual(group_id, new_group['id']) + + def test_create_group_all_attrs(self): + group_id = uuid.uuid4().hex + group = { + 'id': group_id, + 'name': uuid.uuid4().hex, + 'description': uuid.uuid4().hex, + } + if self.driver.is_domain_aware(): + group['domain_id'] = uuid.uuid4().hex + new_group = self.driver.create_group(group_id, group) + self.assertEqual(group, new_group) + + def test_create_group_duplicate_exc(self): + group1_id = uuid.uuid4().hex + name = uuid.uuid4().hex + domain = uuid.uuid4().hex + group1 = { + 'id': group1_id, + 'name': name, + } + if self.driver.is_domain_aware(): + group1['domain_id'] = domain + self.driver.create_group(group1_id, group1) + + group2_id = uuid.uuid4().hex + group2 = { + 'id': group2_id, + 'name': name, + } + if self.driver.is_domain_aware(): + group2['domain_id'] = domain + self.assertRaises(exception.Conflict, self.driver.create_group, + group2_id, group2) + + def test_get_group(self): + group = self.create_group() + + actual_group = self.driver.get_group(group['id']) + self.assertEqual(group['id'], actual_group['id']) + + def test_get_group_no_group_exc(self): + # When the group doesn't exist, get_group raises GroupNotFound. + self.assertRaises(exception.GroupNotFound, self.driver.get_group, + group_id=uuid.uuid4().hex) + + def test_get_group_by_name(self): + domain_id = uuid.uuid4().hex + group = self.create_group(domain_id=domain_id) + + actual_group = self.driver.get_group_by_name(group['name'], domain_id) + self.assertEqual(group['id'], actual_group['id']) + + def test_get_group_by_name_no_user_exc(self): + # When the group doesn't exist, get_group raises GroupNotFound. + self.assertRaises( + exception.GroupNotFound, self.driver.get_group_by_name, + group_name=uuid.uuid4().hex, domain_id=uuid.uuid4().hex) + + def test_update_group(self): + group = self.create_group() + + new_description = uuid.uuid4().hex + group_mod = {'description': new_description} + actual_group = self.driver.update_group(group['id'], group_mod) + self.assertEqual(new_description, actual_group['description']) + + def test_update_group_no_group(self): + # When the group doesn't exist, GroupNotFound is raised. + group_mod = {'description': uuid.uuid4().hex} + self.assertRaises(exception.GroupNotFound, self.driver.update_group, + group_id=uuid.uuid4().hex, group=group_mod) + + def test_update_group_name_already_exists(self): + # For drivers that support renaming, when the group is renamed to a + # name that already exists, Conflict is raised. + + if not self.allows_name_update: + self.skipTest("driver doesn't allow name update") + + domain_id = uuid.uuid4().hex + group1 = self.create_group(domain_id=domain_id) + group2 = self.create_group(domain_id=domain_id) + + group_mod = {'name': group1['name']} + self.assertRaises(exception.Conflict, self.driver.update_group, + group2['id'], group_mod) + + def test_update_group_name_not_allowed(self): + # For drivers that do not support renaming, when the group is attempted + # to be renamed ValidationError is raised. + + if self.allows_name_update: + self.skipTest("driver allows name update") + + group = self.create_group() + + group_mod = {'name': uuid.uuid4().hex} + self.assertRaises(exception.ValidationError, self.driver.update_group, + group['id'], group_mod) + + def test_delete_group(self): + group = self.create_group() + self.driver.delete_group(group['id']) + self.assertRaises(exception.GroupNotFound, self.driver.get_group, + group['id']) + + def test_delete_group_doesnt_exist_exc(self): + self.assertRaises(exception.GroupNotFound, self.driver.delete_group, + group_id=uuid.uuid4().hex) + + def test_list_groups_no_groups(self): + groups = self.driver.list_groups(driver_hints.Hints()) + self.assertEqual([], groups) + + def test_list_groups_one_group(self): + group = self.create_group() + groups = self.driver.list_groups(driver_hints.Hints()) + self.assertEqual(group['id'], groups[0]['id']) + + def test_add_user_to_group(self): + user = self.create_user() + group = self.create_group() + + self.driver.add_user_to_group(user['id'], group['id']) + + # No assert since if doesn't raise, then successful. + self.driver.check_user_in_group(user['id'], group['id']) + + def test_add_user_to_group_no_user_exc(self): + group = self.create_group() + + user_id = uuid.uuid4().hex + self.assertRaises(exception.UserNotFound, + self.driver.add_user_to_group, user_id, group['id']) + + def test_add_user_to_group_no_group_exc(self): + user = self.create_user() + + group_id = uuid.uuid4().hex + self.assertRaises(exception.GroupNotFound, + self.driver.add_user_to_group, user['id'], group_id) + + def test_check_user_in_group(self): + user = self.create_user() + group = self.create_group() + self.driver.add_user_to_group(user['id'], group['id']) + + # No assert since if doesn't raise, then successful. + self.driver.check_user_in_group(user['id'], group['id']) + + def test_check_user_in_group_user_not_in_group_exc(self): + user = self.create_user() + group = self.create_group() + + self.assertRaises(exception.NotFound, self.driver.check_user_in_group, + user['id'], group['id']) + + def test_check_user_in_group_user_doesnt_exist_exc(self): + # When the user doesn't exist, UserNotFound is raised. + group = self.create_group() + + user_id = uuid.uuid4().hex + self.assertRaises( + exception.UserNotFound, self.driver.check_user_in_group, user_id, + group['id']) + + def test_check_user_in_group_group_doesnt_exist_exc(self): + # When the group doesn't exist, UserNotFound is raised. + user = self.create_user() + + group_id = uuid.uuid4().hex + self.assertRaises( + exception.GroupNotFound, self.driver.check_user_in_group, + user['id'], group_id) + + def test_list_users_in_group_no_users(self): + group = self.create_group() + + users = self.driver.list_users_in_group(group['id'], + driver_hints.Hints()) + self.assertEqual([], users) + + def test_list_users_in_group_user(self): + group = self.create_group() + user = self.create_user() + self.driver.add_user_to_group(user['id'], group['id']) + + users = self.driver.list_users_in_group(group['id'], + driver_hints.Hints()) + self.assertEqual([user['id']], [u['id'] for u in users]) + + def test_list_users_in_group_no_group(self): + group_id = uuid.uuid4().hex + self.assertRaises( + exception.GroupNotFound, self.driver.list_users_in_group, group_id, + driver_hints.Hints()) + + def test_list_groups_for_user_no_groups(self): + user = self.create_user() + + groups = self.driver.list_groups_for_user(user['id'], + driver_hints.Hints()) + self.assertEqual([], groups) + + def test_list_groups_for_user_group(self): + user = self.create_user() + group = self.create_group() + self.driver.add_user_to_group(user['id'], group['id']) + + groups = self.driver.list_groups_for_user(user['id'], + driver_hints.Hints()) + self.assertEqual([group['id']], [g['id'] for g in groups]) + + def test_list_groups_for_user_no_user(self): + user_id = uuid.uuid4().hex + self.assertRaises( + exception.UserNotFound, self.driver.list_groups_for_user, + user_id, driver_hints.Hints()) + + def test_remove_user_from_group(self): + user = self.create_user() + group = self.create_group() + self.driver.add_user_to_group(user['id'], group['id']) + + self.driver.remove_user_from_group(user['id'], group['id']) + + self.assertRaises(exception.NotFound, self.driver.check_user_in_group, + user['id'], group['id']) + + def test_remove_user_from_group_not_in_group(self): + user = self.create_user() + group = self.create_group() + + # FIXME(blk-u): ldap is returning UserNotFound rather than NotFound, + # fix this. + self.assertRaises( + exception.NotFound, self.driver.remove_user_from_group, user['id'], + group['id']) + + def test_remove_user_from_group_no_user(self): + group = self.create_group() + + user_id = uuid.uuid4().hex + self.assertRaises( + exception.UserNotFound, self.driver.remove_user_from_group, + user_id, group['id']) + + def test_remove_user_from_group_no_group(self): + user = self.create_user() + + group_id = uuid.uuid4().hex + self.assertRaises( + exception.GroupNotFound, self.driver.remove_user_from_group, + user['id'], group_id) + + def test_authenticate(self): + password = uuid.uuid4().hex + user = self.create_user(password=password) + + actual_user = self.driver.authenticate(user['id'], password) + self.assertEqual(user['id'], actual_user['id']) + + def test_authenticate_wrong_password(self): + user = self.create_user(password=uuid.uuid4().hex) + + password = uuid.uuid4().hex + self.assertRaises(AssertionError, self.driver.authenticate, user['id'], + password) + + def test_authenticate_no_user(self): + user_id = uuid.uuid4().hex + password = uuid.uuid4().hex + self.assertRaises(AssertionError, self.driver.authenticate, user_id, + password) diff --git a/keystone/tests/unit/identity/backends/test_ldap.py b/keystone/tests/unit/identity/backends/test_ldap.py new file mode 100644 index 0000000000..58972642a7 --- /dev/null +++ b/keystone/tests/unit/identity/backends/test_ldap.py @@ -0,0 +1,43 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import fixture as config_fixture + +from keystone.identity.backends import ldap +from keystone.tests.unit import core +from keystone.tests.unit.identity.backends import test_base +from keystone.tests.unit.ksfixtures import ldapdb + + +class TestIdentityDriver(core.BaseTestCase, + test_base.IdentityDriverV8Tests): + + allows_name_update = False + expected_is_domain_aware = False + expected_default_assignment_driver = 'sql' + expected_is_sql = False + expected_generates_uuids = False + + def setUp(self): + super(TestIdentityDriver, self).setUp() + + config_fixture_ = self.useFixture(config_fixture.Config()) + config_fixture_.config( + group='ldap', + url='fake://memory', + user='cn=Admin', + password='password', + suffix='cn=example,cn=com') + + self.useFixture(ldapdb.LDAPDatabase()) + + self.driver = ldap.Identity() diff --git a/keystone/tests/unit/identity/backends/test_sql.py b/keystone/tests/unit/identity/backends/test_sql.py new file mode 100644 index 0000000000..c82c142c58 --- /dev/null +++ b/keystone/tests/unit/identity/backends/test_sql.py @@ -0,0 +1,55 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo_db.sqlalchemy import test_base as db_test + +from keystone.common import sql +from keystone.identity.backends import sql as sql_backend +from keystone.tests.unit.identity.backends import test_base +from keystone.tests.unit.ksfixtures import database + + +class TestIdentityDriver(db_test.DbTestCase, + test_base.IdentityDriverV8Tests): + + expected_is_domain_aware = True + expected_default_assignment_driver = 'sql' + expected_is_sql = True + expected_generates_uuids = True + + def setUp(self): + super(TestIdentityDriver, self).setUp() + + # Set keystone's connection URL to be the test engine's url. + database.initialize_sql_session(self.engine.url) + + # Override keystone's context manager to be oslo.db's global context + # manager. + sql.core._TESTING_USE_GLOBAL_CONTEXT_MANAGER = True + self.addCleanup(setattr, + sql.core, '_TESTING_USE_GLOBAL_CONTEXT_MANAGER', False) + self.addCleanup(sql.cleanup) + + version_specifiers = {} + database._load_sqlalchemy_models(version_specifiers) + sql.ModelBase.metadata.create_all(bind=self.engine) + + self.driver = sql_backend.Identity() + + +class MySQLOpportunisticIdentityDriverTestCase(TestIdentityDriver): + FIXTURE = db_test.MySQLOpportunisticFixture + + +class PostgreSQLOpportunisticIdentityDriverTestCase(TestIdentityDriver): + FIXTURE = db_test.PostgreSQLOpportunisticFixture diff --git a/tests-py3-blacklist.txt b/tests-py3-blacklist.txt index 3f7a6fcc1b..be4163c926 100644 --- a/tests-py3-blacklist.txt +++ b/tests-py3-blacklist.txt @@ -1,4 +1,5 @@ keystone.tests.unit.common.test_ldap +keystone.tests.unit.identity.backends.test_ldap keystone.tests.unit.test_backend_ldap keystone.tests.unit.test_backend_ldap_pool keystone.tests.unit.test_v2