Define identity interface - easy cases

The identity interface isn't adequately specified. Nobody could
implement an identity driver using the interface definition as it
is since they wouldn't know what the inputs are or the outputs.
We can't write a Manager using only the interface definition since
the definition is incomplete (For example it didn't even mention
that create_user returns the user).

Several tests are added specifically for the driver interface.
The driver was only tested through the manager, so we didn't have
tests that showed that the driver actually followed its interface,
just however the manager called it.

You might notice that the tests are missing some obvious cases
that should be tested. In some cases this is because the existing
drivers that we have (sql and ldap) don't work the same way. For
example, create_group on the sql driver always returns description
even when it's None while the ldap driver doesn't. I wanted to
keep this change simple and not modify the drivers. The interface
definition can be enhanced along with the drivers and these tests
can be added later.

There might be more requirements that the manager puts on the
drivers than what's documented/tested here. These can be added
later.

Change-Id: I9a9c660f2d98b6ac6cb032b40b32089cb5cf1844
This commit is contained in:
Brant Knudson 2016-03-11 16:24:58 -06:00
parent 23bb657369
commit 3956163569
6 changed files with 799 additions and 14 deletions

View File

@ -48,7 +48,85 @@ def filter_user(user_ref):
@six.add_metaclass(abc.ABCMeta)
class IdentityDriverV8(object):
"""Interface description for an Identity driver."""
"""Interface description for an Identity driver.
The schema for users and groups is different depending on whether the
driver is domain aware or not (as returned by self.is_domain_aware()).
If the driver is not domain aware:
* domain_id will be not be included in the user / group passed in to
create_user / create_group
* the domain_id should not be returned in user / group refs. They'll be
overwritten.
User schema (if driver is domain aware)::
type: object
properties:
id:
type: string
name:
type: string
domain_id:
type: string
password:
type: string
enabled:
type: boolean
default_project_id:
type: string
required: [id, name, domain_id, enabled]
additionalProperties: True
User schema (if driver is not domain aware)::
type: object
properties:
id:
type: string
name:
type: string
password:
type: string
enabled:
type: boolean
default_project_id:
type: string
required: [id, name, enabled]
additionalProperties: True
# Note that domain_id is not allowed as a property
Group schema (if driver is domain aware)::
type: object
properties:
id:
type: string
name:
type: string
domain_id:
type: string
description:
type: string
required: [id, name, domain_id]
additionalProperties: True
Group schema (if driver is not domain aware)::
type: object
properties:
id:
type: string
name:
type: string
description:
type: string
required: [id, name]
additionalProperties: True
# Note that domain_id is not allowed as a property
"""
def _get_conf(self):
try:
@ -64,7 +142,7 @@ class IdentityDriverV8(object):
CONF.identity.list_limit or CONF.list_limit)
def is_domain_aware(self):
"""Indicate if Driver supports domains."""
"""Indicate if the driver supports domains."""
return True
def default_assignment_driver(self):
@ -90,7 +168,12 @@ class IdentityDriverV8(object):
def authenticate(self, user_id, password):
"""Authenticate a given user and password.
:returns: user_ref
:param str user_id: User ID
:param str password: Password
:returns: user. See user schema in :class:`~.IdentityDriverV8`.
:rtype: dict
:raises AssertionError: If user or password is invalid.
"""
raise exception.NotImplemented() # pragma: no cover
@ -101,6 +184,14 @@ class IdentityDriverV8(object):
def create_user(self, user_id, user):
"""Create a new user.
:param str user_id: user ID. The driver can ignore this value.
:param dict user: user info. See user schema in
:class:`~.IdentityDriverV8`.
:returns: user, matching the user schema. The driver should not return
the password.
:rtype: dict
:raises keystone.exception.Conflict: If a duplicate user exists.
"""
@ -112,8 +203,11 @@ class IdentityDriverV8(object):
:param hints: filter hints which the driver should
implement if at all possible.
:type hints: keystone.common.driver_hints.Hints
:returns: a list of user_refs or an empty list.
:returns: a list of users or an empty list. See user schema in
:class:`~.IdentityDriverV8`.
:rtype: list of dict
"""
raise exception.NotImplemented() # pragma: no cover
@ -122,11 +216,16 @@ class IdentityDriverV8(object):
def list_users_in_group(self, group_id, hints):
"""List users in a group.
:param group_id: the group in question
:param str group_id: the group in question
:param hints: filter hints which the driver should
implement if at all possible.
:type hints: keystone.common.driver_hints.Hints
:returns: a list of user_refs or an empty list.
:returns: a list of users or an empty list. See user schema in
:class:`~.IdentityDriverV8`.
:rtype: list of dict
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
"""
raise exception.NotImplemented() # pragma: no cover
@ -135,7 +234,11 @@ class IdentityDriverV8(object):
def get_user(self, user_id):
"""Get a user by ID.
:returns: user_ref
:param str user_id: User ID.
:returns: user. See user schema in :class:`~.IdentityDriverV8`.
:rtype: dict
:raises keystone.exception.UserNotFound: If the user doesn't exist.
"""
@ -145,8 +248,16 @@ class IdentityDriverV8(object):
def update_user(self, user_id, user):
"""Update an existing user.
:param str user_id: User ID.
:param dict user: User modification. See user schema in
:class:`~.IdentityDriverV8`. Properties set to None will be
removed. Required properties cannot be removed.
:returns: user. See user schema in :class:`~.IdentityDriverV8`.
:raises keystone.exception.UserNotFound: If the user doesn't exist.
:raises keystone.exception.Conflict: If a duplicate user exists.
:raises keystone.exception.Conflict: If a duplicate user exists in the
same domain.
"""
raise exception.NotImplemented() # pragma: no cover
@ -155,6 +266,9 @@ class IdentityDriverV8(object):
def add_user_to_group(self, user_id, group_id):
"""Add a user to a group.
:param str user_id: User ID.
:param str group_id: Group ID.
:raises keystone.exception.UserNotFound: If the user doesn't exist.
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
@ -165,6 +279,11 @@ class IdentityDriverV8(object):
def check_user_in_group(self, user_id, group_id):
"""Check if a user is a member of a group.
:param str user_id: User ID.
:param str group_id: Group ID.
:raises keystone.exception.NotFound: If the user is not a member of the
group.
:raises keystone.exception.UserNotFound: If the user doesn't exist.
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
@ -175,7 +294,10 @@ class IdentityDriverV8(object):
def remove_user_from_group(self, user_id, group_id):
"""Remove a user from a group.
:raises keystone.exception.NotFound: If the entity not found.
:param str user_id: User ID.
:param str group_id: Group ID.
:raises keystone.exception.NotFound: If the user is not in the group.
"""
raise exception.NotImplemented() # pragma: no cover
@ -205,6 +327,13 @@ class IdentityDriverV8(object):
def create_group(self, group_id, group):
"""Create a new group.
:param str group_id: group ID. The driver can ignore this value.
:param dict group: group info. See group schema in
:class:`~.IdentityDriverV8`.
:returns: group, matching the group schema.
:rtype: dict
:raises keystone.exception.Conflict: If a duplicate group exists.
"""
@ -216,8 +345,10 @@ class IdentityDriverV8(object):
:param hints: filter hints which the driver should
implement if at all possible.
:type hints: keystone.common.driver_hints.Hints
:returns: a list of group_refs or an empty list.
:returns: a list of group_refs or an empty list. See group schema in
:class:`~.IdentityDriverV8`.
"""
raise exception.NotImplemented() # pragma: no cover
@ -226,11 +357,15 @@ class IdentityDriverV8(object):
def list_groups_for_user(self, user_id, hints):
"""List groups a user is in
:param user_id: the user in question
:param str user_id: the user in question
:param hints: filter hints which the driver should
implement if at all possible.
:type hints: keystone.common.driver_hints.Hints
:returns: a list of group_refs or an empty list.
:returns: a list of group_refs or an empty list. See group schema in
:class:`~.IdentityDriverV8`.
:raises keystone.exception.UserNotFound: If the user doesn't exist.
"""
raise exception.NotImplemented() # pragma: no cover
@ -239,7 +374,10 @@ class IdentityDriverV8(object):
def get_group(self, group_id):
"""Get a group by ID.
:returns: group_ref
:param str group_id: group ID.
:returns: group info. See group schema in :class:`~.IdentityDriverV8`.
:rtype: dict
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
"""
@ -249,7 +387,11 @@ class IdentityDriverV8(object):
def get_group_by_name(self, group_name, domain_id):
"""Get a group by name.
:returns: group_ref
:param str group_name: group name.
:param str domain_id: domain ID.
:returns: group info. See group schema in :class:`~.IdentityDriverV8`.
:rtype: dict
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
"""
@ -259,6 +401,13 @@ class IdentityDriverV8(object):
def update_group(self, group_id, group):
"""Update an existing group.
:param str group_id: Group ID.
:param dict group: Group modification. See group schema in
:class:`~.IdentityDriverV8`. Required properties cannot be removed.
:returns: group, matching the group schema.
:rtype: dict
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
:raises keystone.exception.Conflict: If a duplicate group exists.
@ -269,6 +418,8 @@ class IdentityDriverV8(object):
def delete_group(self, group_id):
"""Delete an existing group.
:param str group_id: Group ID.
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
"""

View File

@ -0,0 +1,535 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystone.common import driver_hints
from keystone import exception
class IdentityDriverV8Tests(object):
driver = None # subclasses must override driver to the actual driver.
# subclasses that don't allow name updates must set this to False.
allows_name_update = True
# Subclasses must override this to indicate whether it's domain-aware or
# not.
expected_is_domain_aware = True
# Subclasses must override this to the expected default assignment driver.
expected_default_assignment_driver = 'sql'
# Subclasses must override this to the expected is_sql value.
expected_is_sql = False
# Subclasses must override this to the expected expected_generates_uuids
# value.
expected_generates_uuids = True
def create_user(self, domain_id=None, **kwargs):
"""Get a user for the test.
Subclasses can override this to provide their own way to provide a user
for the test. By default, driver.create_user is used. For drivers that
don't support create_user, this may go directly to the backend, or
maybe it gets a user from a set of pre-created users.
"""
user_id = uuid.uuid4().hex
user = {
'id': user_id,
'name': uuid.uuid4().hex,
'enabled': True,
}
if self.driver.is_domain_aware():
user['domain_id'] = domain_id or uuid.uuid4().hex
user.update(kwargs)
return self.driver.create_user(user_id, user)
def create_group(self, domain_id=None):
"""Get a group for the test.
Similar to :meth:`~.create_user`, subclasses can override this to
provide their own way to provide a group for the test.
"""
group_id = uuid.uuid4().hex
group = {
'id': group_id,
'name': uuid.uuid4().hex,
}
if self.driver.is_domain_aware():
group['domain_id'] = domain_id or uuid.uuid4().hex
return self.driver.create_group(group_id, group)
def test_is_domain_aware(self):
self.assertIs(self.expected_is_domain_aware,
self.driver.is_domain_aware())
def test_default_assignment_driver(self):
self.assertEqual(self.expected_default_assignment_driver,
self.driver.default_assignment_driver())
def test_is_sql(self):
self.assertIs(self.expected_is_sql, self.driver.is_sql)
def test_generates_uuids(self):
self.assertIs(self.expected_generates_uuids,
self.driver.generates_uuids())
def test_create_user(self):
# Don't use self.create_user since this needs to test the driver
# interface and create_user might not use the driver.
user_id = uuid.uuid4().hex
user = {
'id': user_id,
'name': uuid.uuid4().hex,
'enabled': True
}
if self.driver.is_domain_aware():
user['domain_id'] = uuid.uuid4().hex
ret = self.driver.create_user(user_id, user)
self.assertEqual(user_id, ret['id'])
def test_create_user_all_attributes(self):
user_id = uuid.uuid4().hex
user = {
'id': user_id,
'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex,
'enabled': True,
'default_project_id': uuid.uuid4().hex,
}
if self.driver.is_domain_aware():
user['domain_id'] = uuid.uuid4().hex
ret = self.driver.create_user(user_id, user)
exp_user = user.copy()
del exp_user['password']
self.assertEqual(exp_user, ret)
def test_create_user_same_id_exc(self):
user_id = uuid.uuid4().hex
user = {
'id': user_id,
'name': uuid.uuid4().hex,
'enabled': True,
}
if self.driver.is_domain_aware():
user['domain_id'] = uuid.uuid4().hex
self.driver.create_user(user_id, user)
self.assertRaises(exception.Conflict,
self.driver.create_user, user_id, user)
def test_create_user_same_name_and_domain_exc(self):
user1_id = uuid.uuid4().hex
name = uuid.uuid4().hex
domain_id = uuid.uuid4().hex
user = {
'id': user1_id,
'name': name,
'enabled': True,
}
if self.driver.is_domain_aware():
user['domain_id'] = domain_id
self.driver.create_user(user1_id, user)
user2_id = uuid.uuid4().hex
user = {
'id': user2_id,
'name': name,
'enabled': True,
}
if self.driver.is_domain_aware():
user['domain_id'] = domain_id
self.assertRaises(exception.Conflict,
self.driver.create_user, user2_id, user)
def test_list_users_no_users(self):
hints = driver_hints.Hints()
self.assertEqual([], self.driver.list_users(hints))
def test_list_users_when_users(self):
user = self.create_user()
hints = driver_hints.Hints()
users = self.driver.list_users(hints)
self.assertEqual([user['id']], [u['id'] for u in users])
def test_get_user(self):
user = self.create_user()
actual_user = self.driver.get_user(user['id'])
self.assertEqual(user['id'], actual_user['id'])
def test_get_user_no_user_exc(self):
self.assertRaises(exception.UserNotFound,
self.driver.get_user, uuid.uuid4().hex)
def test_get_user_by_name(self):
domain_id = uuid.uuid4().hex
user = self.create_user(domain_id=domain_id)
actual_user = self.driver.get_user_by_name(user['name'], domain_id)
self.assertEqual(user['id'], actual_user['id'])
def test_get_user_by_name_no_user_exc(self):
# When the user doesn't exist, UserNotFound is raised.
self.assertRaises(
exception.UserNotFound, self.driver.get_user_by_name,
user_name=uuid.uuid4().hex, domain_id=uuid.uuid4().hex)
def test_update_user(self):
user = self.create_user()
user_mod = {'enabled': False}
actual_user = self.driver.update_user(user['id'], user_mod)
self.assertEqual(user['id'], actual_user['id'])
self.assertIs(False, actual_user['enabled'])
def test_update_user_remove_optional_attribute(self):
# When the attribute has a value of None it's supposed to be removed.
user = self.create_user(default_project_id=uuid.uuid4().hex)
self.assertIn('default_project_id', user)
user_mod = {'default_project_id': None}
actual_user = self.driver.update_user(user['id'], user_mod)
self.assertNotIn('default_project_id', actual_user)
def test_update_user_same_name_exc(self):
# For drivers that allow name update, if the name of a user is changed
# to the same as another user in the same domain, Conflict is raised.
if not self.allows_name_update:
self.skipTest("Backend doesn't allow name update.")
domain_id = uuid.uuid4().hex
user1 = self.create_user(domain_id=domain_id)
user2 = self.create_user(domain_id=domain_id)
user_mod = {'name': user2['name']}
self.assertRaises(exception.Conflict, self.driver.update_user,
user1['id'], user_mod)
def test_update_user_no_user_exc(self):
user_id = uuid.uuid4().hex
user_mod = {'enabled': False}
self.assertRaises(exception.UserNotFound,
self.driver.update_user, user_id, user_mod)
def test_update_user_name_not_allowed_exc(self):
# For drivers that do not allow name update, attempting to change the
# name causes an exception.
if self.allows_name_update:
self.skipTest("Backend allows name update.")
user = self.create_user()
user_mod = {'name': uuid.uuid4().hex}
self.assertRaises(exception.Conflict, self.driver.update_user,
user['id'], user_mod)
def test_delete_user(self):
user = self.create_user()
self.driver.delete_user(user['id'])
self.assertRaises(exception.UserNotFound, self.driver.get_user,
user['id'])
def test_delete_user_no_user_exc(self):
# When the user doesn't exist, UserNotFound is raised.
self.assertRaises(exception.UserNotFound, self.driver.delete_user,
user_id=uuid.uuid4().hex)
def test_create_group(self):
group_id = uuid.uuid4().hex
group = {
'id': group_id,
'name': uuid.uuid4().hex,
}
if self.driver.is_domain_aware():
group['domain_id'] = uuid.uuid4().hex
new_group = self.driver.create_group(group_id, group)
self.assertEqual(group_id, new_group['id'])
def test_create_group_all_attrs(self):
group_id = uuid.uuid4().hex
group = {
'id': group_id,
'name': uuid.uuid4().hex,
'description': uuid.uuid4().hex,
}
if self.driver.is_domain_aware():
group['domain_id'] = uuid.uuid4().hex
new_group = self.driver.create_group(group_id, group)
self.assertEqual(group, new_group)
def test_create_group_duplicate_exc(self):
group1_id = uuid.uuid4().hex
name = uuid.uuid4().hex
domain = uuid.uuid4().hex
group1 = {
'id': group1_id,
'name': name,
}
if self.driver.is_domain_aware():
group1['domain_id'] = domain
self.driver.create_group(group1_id, group1)
group2_id = uuid.uuid4().hex
group2 = {
'id': group2_id,
'name': name,
}
if self.driver.is_domain_aware():
group2['domain_id'] = domain
self.assertRaises(exception.Conflict, self.driver.create_group,
group2_id, group2)
def test_get_group(self):
group = self.create_group()
actual_group = self.driver.get_group(group['id'])
self.assertEqual(group['id'], actual_group['id'])
def test_get_group_no_group_exc(self):
# When the group doesn't exist, get_group raises GroupNotFound.
self.assertRaises(exception.GroupNotFound, self.driver.get_group,
group_id=uuid.uuid4().hex)
def test_get_group_by_name(self):
domain_id = uuid.uuid4().hex
group = self.create_group(domain_id=domain_id)
actual_group = self.driver.get_group_by_name(group['name'], domain_id)
self.assertEqual(group['id'], actual_group['id'])
def test_get_group_by_name_no_user_exc(self):
# When the group doesn't exist, get_group raises GroupNotFound.
self.assertRaises(
exception.GroupNotFound, self.driver.get_group_by_name,
group_name=uuid.uuid4().hex, domain_id=uuid.uuid4().hex)
def test_update_group(self):
group = self.create_group()
new_description = uuid.uuid4().hex
group_mod = {'description': new_description}
actual_group = self.driver.update_group(group['id'], group_mod)
self.assertEqual(new_description, actual_group['description'])
def test_update_group_no_group(self):
# When the group doesn't exist, GroupNotFound is raised.
group_mod = {'description': uuid.uuid4().hex}
self.assertRaises(exception.GroupNotFound, self.driver.update_group,
group_id=uuid.uuid4().hex, group=group_mod)
def test_update_group_name_already_exists(self):
# For drivers that support renaming, when the group is renamed to a
# name that already exists, Conflict is raised.
if not self.allows_name_update:
self.skipTest("driver doesn't allow name update")
domain_id = uuid.uuid4().hex
group1 = self.create_group(domain_id=domain_id)
group2 = self.create_group(domain_id=domain_id)
group_mod = {'name': group1['name']}
self.assertRaises(exception.Conflict, self.driver.update_group,
group2['id'], group_mod)
def test_update_group_name_not_allowed(self):
# For drivers that do not support renaming, when the group is attempted
# to be renamed ValidationError is raised.
if self.allows_name_update:
self.skipTest("driver allows name update")
group = self.create_group()
group_mod = {'name': uuid.uuid4().hex}
self.assertRaises(exception.ValidationError, self.driver.update_group,
group['id'], group_mod)
def test_delete_group(self):
group = self.create_group()
self.driver.delete_group(group['id'])
self.assertRaises(exception.GroupNotFound, self.driver.get_group,
group['id'])
def test_delete_group_doesnt_exist_exc(self):
self.assertRaises(exception.GroupNotFound, self.driver.delete_group,
group_id=uuid.uuid4().hex)
def test_list_groups_no_groups(self):
groups = self.driver.list_groups(driver_hints.Hints())
self.assertEqual([], groups)
def test_list_groups_one_group(self):
group = self.create_group()
groups = self.driver.list_groups(driver_hints.Hints())
self.assertEqual(group['id'], groups[0]['id'])
def test_add_user_to_group(self):
user = self.create_user()
group = self.create_group()
self.driver.add_user_to_group(user['id'], group['id'])
# No assert since if doesn't raise, then successful.
self.driver.check_user_in_group(user['id'], group['id'])
def test_add_user_to_group_no_user_exc(self):
group = self.create_group()
user_id = uuid.uuid4().hex
self.assertRaises(exception.UserNotFound,
self.driver.add_user_to_group, user_id, group['id'])
def test_add_user_to_group_no_group_exc(self):
user = self.create_user()
group_id = uuid.uuid4().hex
self.assertRaises(exception.GroupNotFound,
self.driver.add_user_to_group, user['id'], group_id)
def test_check_user_in_group(self):
user = self.create_user()
group = self.create_group()
self.driver.add_user_to_group(user['id'], group['id'])
# No assert since if doesn't raise, then successful.
self.driver.check_user_in_group(user['id'], group['id'])
def test_check_user_in_group_user_not_in_group_exc(self):
user = self.create_user()
group = self.create_group()
self.assertRaises(exception.NotFound, self.driver.check_user_in_group,
user['id'], group['id'])
def test_check_user_in_group_user_doesnt_exist_exc(self):
# When the user doesn't exist, UserNotFound is raised.
group = self.create_group()
user_id = uuid.uuid4().hex
self.assertRaises(
exception.UserNotFound, self.driver.check_user_in_group, user_id,
group['id'])
def test_check_user_in_group_group_doesnt_exist_exc(self):
# When the group doesn't exist, UserNotFound is raised.
user = self.create_user()
group_id = uuid.uuid4().hex
self.assertRaises(
exception.GroupNotFound, self.driver.check_user_in_group,
user['id'], group_id)
def test_list_users_in_group_no_users(self):
group = self.create_group()
users = self.driver.list_users_in_group(group['id'],
driver_hints.Hints())
self.assertEqual([], users)
def test_list_users_in_group_user(self):
group = self.create_group()
user = self.create_user()
self.driver.add_user_to_group(user['id'], group['id'])
users = self.driver.list_users_in_group(group['id'],
driver_hints.Hints())
self.assertEqual([user['id']], [u['id'] for u in users])
def test_list_users_in_group_no_group(self):
group_id = uuid.uuid4().hex
self.assertRaises(
exception.GroupNotFound, self.driver.list_users_in_group, group_id,
driver_hints.Hints())
def test_list_groups_for_user_no_groups(self):
user = self.create_user()
groups = self.driver.list_groups_for_user(user['id'],
driver_hints.Hints())
self.assertEqual([], groups)
def test_list_groups_for_user_group(self):
user = self.create_user()
group = self.create_group()
self.driver.add_user_to_group(user['id'], group['id'])
groups = self.driver.list_groups_for_user(user['id'],
driver_hints.Hints())
self.assertEqual([group['id']], [g['id'] for g in groups])
def test_list_groups_for_user_no_user(self):
user_id = uuid.uuid4().hex
self.assertRaises(
exception.UserNotFound, self.driver.list_groups_for_user,
user_id, driver_hints.Hints())
def test_remove_user_from_group(self):
user = self.create_user()
group = self.create_group()
self.driver.add_user_to_group(user['id'], group['id'])
self.driver.remove_user_from_group(user['id'], group['id'])
self.assertRaises(exception.NotFound, self.driver.check_user_in_group,
user['id'], group['id'])
def test_remove_user_from_group_not_in_group(self):
user = self.create_user()
group = self.create_group()
# FIXME(blk-u): ldap is returning UserNotFound rather than NotFound,
# fix this.
self.assertRaises(
exception.NotFound, self.driver.remove_user_from_group, user['id'],
group['id'])
def test_remove_user_from_group_no_user(self):
group = self.create_group()
user_id = uuid.uuid4().hex
self.assertRaises(
exception.UserNotFound, self.driver.remove_user_from_group,
user_id, group['id'])
def test_remove_user_from_group_no_group(self):
user = self.create_user()
group_id = uuid.uuid4().hex
self.assertRaises(
exception.GroupNotFound, self.driver.remove_user_from_group,
user['id'], group_id)
def test_authenticate(self):
password = uuid.uuid4().hex
user = self.create_user(password=password)
actual_user = self.driver.authenticate(user['id'], password)
self.assertEqual(user['id'], actual_user['id'])
def test_authenticate_wrong_password(self):
user = self.create_user(password=uuid.uuid4().hex)
password = uuid.uuid4().hex
self.assertRaises(AssertionError, self.driver.authenticate, user['id'],
password)
def test_authenticate_no_user(self):
user_id = uuid.uuid4().hex
password = uuid.uuid4().hex
self.assertRaises(AssertionError, self.driver.authenticate, user_id,
password)

View File

@ -0,0 +1,43 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import fixture as config_fixture
from keystone.identity.backends import ldap
from keystone.tests.unit import core
from keystone.tests.unit.identity.backends import test_base
from keystone.tests.unit.ksfixtures import ldapdb
class TestIdentityDriver(core.BaseTestCase,
test_base.IdentityDriverV8Tests):
allows_name_update = False
expected_is_domain_aware = False
expected_default_assignment_driver = 'sql'
expected_is_sql = False
expected_generates_uuids = False
def setUp(self):
super(TestIdentityDriver, self).setUp()
config_fixture_ = self.useFixture(config_fixture.Config())
config_fixture_.config(
group='ldap',
url='fake://memory',
user='cn=Admin',
password='password',
suffix='cn=example,cn=com')
self.useFixture(ldapdb.LDAPDatabase())
self.driver = ldap.Identity()

View File

@ -0,0 +1,55 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_db.sqlalchemy import test_base as db_test
from keystone.common import sql
from keystone.identity.backends import sql as sql_backend
from keystone.tests.unit.identity.backends import test_base
from keystone.tests.unit.ksfixtures import database
class TestIdentityDriver(db_test.DbTestCase,
test_base.IdentityDriverV8Tests):
expected_is_domain_aware = True
expected_default_assignment_driver = 'sql'
expected_is_sql = True
expected_generates_uuids = True
def setUp(self):
super(TestIdentityDriver, self).setUp()
# Set keystone's connection URL to be the test engine's url.
database.initialize_sql_session(self.engine.url)
# Override keystone's context manager to be oslo.db's global context
# manager.
sql.core._TESTING_USE_GLOBAL_CONTEXT_MANAGER = True
self.addCleanup(setattr,
sql.core, '_TESTING_USE_GLOBAL_CONTEXT_MANAGER', False)
self.addCleanup(sql.cleanup)
version_specifiers = {}
database._load_sqlalchemy_models(version_specifiers)
sql.ModelBase.metadata.create_all(bind=self.engine)
self.driver = sql_backend.Identity()
class MySQLOpportunisticIdentityDriverTestCase(TestIdentityDriver):
FIXTURE = db_test.MySQLOpportunisticFixture
class PostgreSQLOpportunisticIdentityDriverTestCase(TestIdentityDriver):
FIXTURE = db_test.PostgreSQLOpportunisticFixture

View File

@ -1,4 +1,5 @@
keystone.tests.unit.common.test_ldap
keystone.tests.unit.identity.backends.test_ldap
keystone.tests.unit.test_backend_ldap
keystone.tests.unit.test_backend_ldap_pool
keystone.tests.unit.test_v2