Merge "Define identity interface - easy cases"
This commit is contained in:
commit
389fcea47c
|
@ -48,7 +48,85 @@ def filter_user(user_ref):
|
|||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class IdentityDriverV8(object):
|
||||
"""Interface description for an Identity driver."""
|
||||
"""Interface description for an Identity driver.
|
||||
|
||||
The schema for users and groups is different depending on whether the
|
||||
driver is domain aware or not (as returned by self.is_domain_aware()).
|
||||
|
||||
If the driver is not domain aware:
|
||||
|
||||
* domain_id will be not be included in the user / group passed in to
|
||||
create_user / create_group
|
||||
* the domain_id should not be returned in user / group refs. They'll be
|
||||
overwritten.
|
||||
|
||||
User schema (if driver is domain aware)::
|
||||
|
||||
type: object
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
name:
|
||||
type: string
|
||||
domain_id:
|
||||
type: string
|
||||
password:
|
||||
type: string
|
||||
enabled:
|
||||
type: boolean
|
||||
default_project_id:
|
||||
type: string
|
||||
required: [id, name, domain_id, enabled]
|
||||
additionalProperties: True
|
||||
|
||||
User schema (if driver is not domain aware)::
|
||||
|
||||
type: object
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
name:
|
||||
type: string
|
||||
password:
|
||||
type: string
|
||||
enabled:
|
||||
type: boolean
|
||||
default_project_id:
|
||||
type: string
|
||||
required: [id, name, enabled]
|
||||
additionalProperties: True
|
||||
# Note that domain_id is not allowed as a property
|
||||
|
||||
Group schema (if driver is domain aware)::
|
||||
|
||||
type: object
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
name:
|
||||
type: string
|
||||
domain_id:
|
||||
type: string
|
||||
description:
|
||||
type: string
|
||||
required: [id, name, domain_id]
|
||||
additionalProperties: True
|
||||
|
||||
Group schema (if driver is not domain aware)::
|
||||
|
||||
type: object
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
name:
|
||||
type: string
|
||||
description:
|
||||
type: string
|
||||
required: [id, name]
|
||||
additionalProperties: True
|
||||
# Note that domain_id is not allowed as a property
|
||||
|
||||
"""
|
||||
|
||||
def _get_conf(self):
|
||||
try:
|
||||
|
@ -64,7 +142,7 @@ class IdentityDriverV8(object):
|
|||
CONF.identity.list_limit or CONF.list_limit)
|
||||
|
||||
def is_domain_aware(self):
|
||||
"""Indicate if Driver supports domains."""
|
||||
"""Indicate if the driver supports domains."""
|
||||
return True
|
||||
|
||||
def default_assignment_driver(self):
|
||||
|
@ -90,7 +168,12 @@ class IdentityDriverV8(object):
|
|||
def authenticate(self, user_id, password):
|
||||
"""Authenticate a given user and password.
|
||||
|
||||
:returns: user_ref
|
||||
:param str user_id: User ID
|
||||
:param str password: Password
|
||||
|
||||
:returns: user. See user schema in :class:`~.IdentityDriverV8`.
|
||||
:rtype: dict
|
||||
|
||||
:raises AssertionError: If user or password is invalid.
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
@ -101,6 +184,14 @@ class IdentityDriverV8(object):
|
|||
def create_user(self, user_id, user):
|
||||
"""Create a new user.
|
||||
|
||||
:param str user_id: user ID. The driver can ignore this value.
|
||||
:param dict user: user info. See user schema in
|
||||
:class:`~.IdentityDriverV8`.
|
||||
|
||||
:returns: user, matching the user schema. The driver should not return
|
||||
the password.
|
||||
:rtype: dict
|
||||
|
||||
:raises keystone.exception.Conflict: If a duplicate user exists.
|
||||
|
||||
"""
|
||||
|
@ -112,8 +203,11 @@ class IdentityDriverV8(object):
|
|||
|
||||
:param hints: filter hints which the driver should
|
||||
implement if at all possible.
|
||||
:type hints: keystone.common.driver_hints.Hints
|
||||
|
||||
:returns: a list of user_refs or an empty list.
|
||||
:returns: a list of users or an empty list. See user schema in
|
||||
:class:`~.IdentityDriverV8`.
|
||||
:rtype: list of dict
|
||||
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
@ -122,11 +216,16 @@ class IdentityDriverV8(object):
|
|||
def list_users_in_group(self, group_id, hints):
|
||||
"""List users in a group.
|
||||
|
||||
:param group_id: the group in question
|
||||
:param str group_id: the group in question
|
||||
:param hints: filter hints which the driver should
|
||||
implement if at all possible.
|
||||
:type hints: keystone.common.driver_hints.Hints
|
||||
|
||||
:returns: a list of user_refs or an empty list.
|
||||
:returns: a list of users or an empty list. See user schema in
|
||||
:class:`~.IdentityDriverV8`.
|
||||
:rtype: list of dict
|
||||
|
||||
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
|
||||
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
@ -135,7 +234,11 @@ class IdentityDriverV8(object):
|
|||
def get_user(self, user_id):
|
||||
"""Get a user by ID.
|
||||
|
||||
:returns: user_ref
|
||||
:param str user_id: User ID.
|
||||
|
||||
:returns: user. See user schema in :class:`~.IdentityDriverV8`.
|
||||
:rtype: dict
|
||||
|
||||
:raises keystone.exception.UserNotFound: If the user doesn't exist.
|
||||
|
||||
"""
|
||||
|
@ -145,8 +248,16 @@ class IdentityDriverV8(object):
|
|||
def update_user(self, user_id, user):
|
||||
"""Update an existing user.
|
||||
|
||||
:param str user_id: User ID.
|
||||
:param dict user: User modification. See user schema in
|
||||
:class:`~.IdentityDriverV8`. Properties set to None will be
|
||||
removed. Required properties cannot be removed.
|
||||
|
||||
:returns: user. See user schema in :class:`~.IdentityDriverV8`.
|
||||
|
||||
:raises keystone.exception.UserNotFound: If the user doesn't exist.
|
||||
:raises keystone.exception.Conflict: If a duplicate user exists.
|
||||
:raises keystone.exception.Conflict: If a duplicate user exists in the
|
||||
same domain.
|
||||
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
@ -155,6 +266,9 @@ class IdentityDriverV8(object):
|
|||
def add_user_to_group(self, user_id, group_id):
|
||||
"""Add a user to a group.
|
||||
|
||||
:param str user_id: User ID.
|
||||
:param str group_id: Group ID.
|
||||
|
||||
:raises keystone.exception.UserNotFound: If the user doesn't exist.
|
||||
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
|
||||
|
||||
|
@ -165,6 +279,11 @@ class IdentityDriverV8(object):
|
|||
def check_user_in_group(self, user_id, group_id):
|
||||
"""Check if a user is a member of a group.
|
||||
|
||||
:param str user_id: User ID.
|
||||
:param str group_id: Group ID.
|
||||
|
||||
:raises keystone.exception.NotFound: If the user is not a member of the
|
||||
group.
|
||||
:raises keystone.exception.UserNotFound: If the user doesn't exist.
|
||||
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
|
||||
|
||||
|
@ -175,7 +294,10 @@ class IdentityDriverV8(object):
|
|||
def remove_user_from_group(self, user_id, group_id):
|
||||
"""Remove a user from a group.
|
||||
|
||||
:raises keystone.exception.NotFound: If the entity not found.
|
||||
:param str user_id: User ID.
|
||||
:param str group_id: Group ID.
|
||||
|
||||
:raises keystone.exception.NotFound: If the user is not in the group.
|
||||
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
@ -205,6 +327,13 @@ class IdentityDriverV8(object):
|
|||
def create_group(self, group_id, group):
|
||||
"""Create a new group.
|
||||
|
||||
:param str group_id: group ID. The driver can ignore this value.
|
||||
:param dict group: group info. See group schema in
|
||||
:class:`~.IdentityDriverV8`.
|
||||
|
||||
:returns: group, matching the group schema.
|
||||
:rtype: dict
|
||||
|
||||
:raises keystone.exception.Conflict: If a duplicate group exists.
|
||||
|
||||
"""
|
||||
|
@ -216,8 +345,10 @@ class IdentityDriverV8(object):
|
|||
|
||||
:param hints: filter hints which the driver should
|
||||
implement if at all possible.
|
||||
:type hints: keystone.common.driver_hints.Hints
|
||||
|
||||
:returns: a list of group_refs or an empty list.
|
||||
:returns: a list of group_refs or an empty list. See group schema in
|
||||
:class:`~.IdentityDriverV8`.
|
||||
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
@ -226,11 +357,15 @@ class IdentityDriverV8(object):
|
|||
def list_groups_for_user(self, user_id, hints):
|
||||
"""List groups a user is in.
|
||||
|
||||
:param user_id: the user in question
|
||||
:param str user_id: the user in question
|
||||
:param hints: filter hints which the driver should
|
||||
implement if at all possible.
|
||||
:type hints: keystone.common.driver_hints.Hints
|
||||
|
||||
:returns: a list of group_refs or an empty list.
|
||||
:returns: a list of group_refs or an empty list. See group schema in
|
||||
:class:`~.IdentityDriverV8`.
|
||||
|
||||
:raises keystone.exception.UserNotFound: If the user doesn't exist.
|
||||
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
@ -239,7 +374,10 @@ class IdentityDriverV8(object):
|
|||
def get_group(self, group_id):
|
||||
"""Get a group by ID.
|
||||
|
||||
:returns: group_ref
|
||||
:param str group_id: group ID.
|
||||
|
||||
:returns: group info. See group schema in :class:`~.IdentityDriverV8`.
|
||||
:rtype: dict
|
||||
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
|
||||
|
||||
"""
|
||||
|
@ -249,7 +387,11 @@ class IdentityDriverV8(object):
|
|||
def get_group_by_name(self, group_name, domain_id):
|
||||
"""Get a group by name.
|
||||
|
||||
:returns: group_ref
|
||||
:param str group_name: group name.
|
||||
:param str domain_id: domain ID.
|
||||
|
||||
:returns: group info. See group schema in :class:`~.IdentityDriverV8`.
|
||||
:rtype: dict
|
||||
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
|
||||
|
||||
"""
|
||||
|
@ -259,6 +401,13 @@ class IdentityDriverV8(object):
|
|||
def update_group(self, group_id, group):
|
||||
"""Update an existing group.
|
||||
|
||||
:param str group_id: Group ID.
|
||||
:param dict group: Group modification. See group schema in
|
||||
:class:`~.IdentityDriverV8`. Required properties cannot be removed.
|
||||
|
||||
:returns: group, matching the group schema.
|
||||
:rtype: dict
|
||||
|
||||
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
|
||||
:raises keystone.exception.Conflict: If a duplicate group exists.
|
||||
|
||||
|
@ -269,6 +418,8 @@ class IdentityDriverV8(object):
|
|||
def delete_group(self, group_id):
|
||||
"""Delete an existing group.
|
||||
|
||||
:param str group_id: Group ID.
|
||||
|
||||
:raises keystone.exception.GroupNotFound: If the group doesn't exist.
|
||||
|
||||
"""
|
||||
|
|
|
@ -0,0 +1,535 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
from keystone.common import driver_hints
|
||||
from keystone import exception
|
||||
|
||||
|
||||
class IdentityDriverV8Tests(object):
|
||||
driver = None # subclasses must override driver to the actual driver.
|
||||
|
||||
# subclasses that don't allow name updates must set this to False.
|
||||
allows_name_update = True
|
||||
|
||||
# Subclasses must override this to indicate whether it's domain-aware or
|
||||
# not.
|
||||
expected_is_domain_aware = True
|
||||
|
||||
# Subclasses must override this to the expected default assignment driver.
|
||||
expected_default_assignment_driver = 'sql'
|
||||
|
||||
# Subclasses must override this to the expected is_sql value.
|
||||
expected_is_sql = False
|
||||
|
||||
# Subclasses must override this to the expected expected_generates_uuids
|
||||
# value.
|
||||
expected_generates_uuids = True
|
||||
|
||||
def create_user(self, domain_id=None, **kwargs):
|
||||
"""Get a user for the test.
|
||||
|
||||
Subclasses can override this to provide their own way to provide a user
|
||||
for the test. By default, driver.create_user is used. For drivers that
|
||||
don't support create_user, this may go directly to the backend, or
|
||||
maybe it gets a user from a set of pre-created users.
|
||||
"""
|
||||
user_id = uuid.uuid4().hex
|
||||
user = {
|
||||
'id': user_id,
|
||||
'name': uuid.uuid4().hex,
|
||||
'enabled': True,
|
||||
}
|
||||
if self.driver.is_domain_aware():
|
||||
user['domain_id'] = domain_id or uuid.uuid4().hex
|
||||
user.update(kwargs)
|
||||
return self.driver.create_user(user_id, user)
|
||||
|
||||
def create_group(self, domain_id=None):
|
||||
"""Get a group for the test.
|
||||
|
||||
Similar to :meth:`~.create_user`, subclasses can override this to
|
||||
provide their own way to provide a group for the test.
|
||||
"""
|
||||
group_id = uuid.uuid4().hex
|
||||
group = {
|
||||
'id': group_id,
|
||||
'name': uuid.uuid4().hex,
|
||||
}
|
||||
if self.driver.is_domain_aware():
|
||||
group['domain_id'] = domain_id or uuid.uuid4().hex
|
||||
return self.driver.create_group(group_id, group)
|
||||
|
||||
def test_is_domain_aware(self):
|
||||
self.assertIs(self.expected_is_domain_aware,
|
||||
self.driver.is_domain_aware())
|
||||
|
||||
def test_default_assignment_driver(self):
|
||||
self.assertEqual(self.expected_default_assignment_driver,
|
||||
self.driver.default_assignment_driver())
|
||||
|
||||
def test_is_sql(self):
|
||||
self.assertIs(self.expected_is_sql, self.driver.is_sql)
|
||||
|
||||
def test_generates_uuids(self):
|
||||
self.assertIs(self.expected_generates_uuids,
|
||||
self.driver.generates_uuids())
|
||||
|
||||
def test_create_user(self):
|
||||
# Don't use self.create_user since this needs to test the driver
|
||||
# interface and create_user might not use the driver.
|
||||
user_id = uuid.uuid4().hex
|
||||
user = {
|
||||
'id': user_id,
|
||||
'name': uuid.uuid4().hex,
|
||||
'enabled': True
|
||||
}
|
||||
if self.driver.is_domain_aware():
|
||||
user['domain_id'] = uuid.uuid4().hex
|
||||
ret = self.driver.create_user(user_id, user)
|
||||
self.assertEqual(user_id, ret['id'])
|
||||
|
||||
def test_create_user_all_attributes(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
user = {
|
||||
'id': user_id,
|
||||
'name': uuid.uuid4().hex,
|
||||
'password': uuid.uuid4().hex,
|
||||
'enabled': True,
|
||||
'default_project_id': uuid.uuid4().hex,
|
||||
}
|
||||
if self.driver.is_domain_aware():
|
||||
user['domain_id'] = uuid.uuid4().hex
|
||||
ret = self.driver.create_user(user_id, user)
|
||||
exp_user = user.copy()
|
||||
del exp_user['password']
|
||||
self.assertEqual(exp_user, ret)
|
||||
|
||||
def test_create_user_same_id_exc(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
user = {
|
||||
'id': user_id,
|
||||
'name': uuid.uuid4().hex,
|
||||
'enabled': True,
|
||||
}
|
||||
if self.driver.is_domain_aware():
|
||||
user['domain_id'] = uuid.uuid4().hex
|
||||
self.driver.create_user(user_id, user)
|
||||
self.assertRaises(exception.Conflict,
|
||||
self.driver.create_user, user_id, user)
|
||||
|
||||
def test_create_user_same_name_and_domain_exc(self):
|
||||
user1_id = uuid.uuid4().hex
|
||||
name = uuid.uuid4().hex
|
||||
domain_id = uuid.uuid4().hex
|
||||
user = {
|
||||
'id': user1_id,
|
||||
'name': name,
|
||||
'enabled': True,
|
||||
}
|
||||
if self.driver.is_domain_aware():
|
||||
user['domain_id'] = domain_id
|
||||
self.driver.create_user(user1_id, user)
|
||||
|
||||
user2_id = uuid.uuid4().hex
|
||||
user = {
|
||||
'id': user2_id,
|
||||
'name': name,
|
||||
'enabled': True,
|
||||
}
|
||||
if self.driver.is_domain_aware():
|
||||
user['domain_id'] = domain_id
|
||||
self.assertRaises(exception.Conflict,
|
||||
self.driver.create_user, user2_id, user)
|
||||
|
||||
def test_list_users_no_users(self):
|
||||
hints = driver_hints.Hints()
|
||||
self.assertEqual([], self.driver.list_users(hints))
|
||||
|
||||
def test_list_users_when_users(self):
|
||||
user = self.create_user()
|
||||
|
||||
hints = driver_hints.Hints()
|
||||
users = self.driver.list_users(hints)
|
||||
self.assertEqual([user['id']], [u['id'] for u in users])
|
||||
|
||||
def test_get_user(self):
|
||||
user = self.create_user()
|
||||
|
||||
actual_user = self.driver.get_user(user['id'])
|
||||
self.assertEqual(user['id'], actual_user['id'])
|
||||
|
||||
def test_get_user_no_user_exc(self):
|
||||
self.assertRaises(exception.UserNotFound,
|
||||
self.driver.get_user, uuid.uuid4().hex)
|
||||
|
||||
def test_get_user_by_name(self):
|
||||
domain_id = uuid.uuid4().hex
|
||||
user = self.create_user(domain_id=domain_id)
|
||||
|
||||
actual_user = self.driver.get_user_by_name(user['name'], domain_id)
|
||||
self.assertEqual(user['id'], actual_user['id'])
|
||||
|
||||
def test_get_user_by_name_no_user_exc(self):
|
||||
# When the user doesn't exist, UserNotFound is raised.
|
||||
self.assertRaises(
|
||||
exception.UserNotFound, self.driver.get_user_by_name,
|
||||
user_name=uuid.uuid4().hex, domain_id=uuid.uuid4().hex)
|
||||
|
||||
def test_update_user(self):
|
||||
user = self.create_user()
|
||||
|
||||
user_mod = {'enabled': False}
|
||||
actual_user = self.driver.update_user(user['id'], user_mod)
|
||||
self.assertEqual(user['id'], actual_user['id'])
|
||||
self.assertIs(False, actual_user['enabled'])
|
||||
|
||||
def test_update_user_remove_optional_attribute(self):
|
||||
# When the attribute has a value of None it's supposed to be removed.
|
||||
user = self.create_user(default_project_id=uuid.uuid4().hex)
|
||||
self.assertIn('default_project_id', user)
|
||||
|
||||
user_mod = {'default_project_id': None}
|
||||
actual_user = self.driver.update_user(user['id'], user_mod)
|
||||
self.assertNotIn('default_project_id', actual_user)
|
||||
|
||||
def test_update_user_same_name_exc(self):
|
||||
# For drivers that allow name update, if the name of a user is changed
|
||||
# to the same as another user in the same domain, Conflict is raised.
|
||||
|
||||
if not self.allows_name_update:
|
||||
self.skipTest("Backend doesn't allow name update.")
|
||||
|
||||
domain_id = uuid.uuid4().hex
|
||||
user1 = self.create_user(domain_id=domain_id)
|
||||
user2 = self.create_user(domain_id=domain_id)
|
||||
|
||||
user_mod = {'name': user2['name']}
|
||||
self.assertRaises(exception.Conflict, self.driver.update_user,
|
||||
user1['id'], user_mod)
|
||||
|
||||
def test_update_user_no_user_exc(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
user_mod = {'enabled': False}
|
||||
self.assertRaises(exception.UserNotFound,
|
||||
self.driver.update_user, user_id, user_mod)
|
||||
|
||||
def test_update_user_name_not_allowed_exc(self):
|
||||
# For drivers that do not allow name update, attempting to change the
|
||||
# name causes an exception.
|
||||
|
||||
if self.allows_name_update:
|
||||
self.skipTest("Backend allows name update.")
|
||||
|
||||
user = self.create_user()
|
||||
user_mod = {'name': uuid.uuid4().hex}
|
||||
self.assertRaises(exception.Conflict, self.driver.update_user,
|
||||
user['id'], user_mod)
|
||||
|
||||
def test_delete_user(self):
|
||||
user = self.create_user()
|
||||
|
||||
self.driver.delete_user(user['id'])
|
||||
self.assertRaises(exception.UserNotFound, self.driver.get_user,
|
||||
user['id'])
|
||||
|
||||
def test_delete_user_no_user_exc(self):
|
||||
# When the user doesn't exist, UserNotFound is raised.
|
||||
self.assertRaises(exception.UserNotFound, self.driver.delete_user,
|
||||
user_id=uuid.uuid4().hex)
|
||||
|
||||
def test_create_group(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
group = {
|
||||
'id': group_id,
|
||||
'name': uuid.uuid4().hex,
|
||||
}
|
||||
if self.driver.is_domain_aware():
|
||||
group['domain_id'] = uuid.uuid4().hex
|
||||
new_group = self.driver.create_group(group_id, group)
|
||||
self.assertEqual(group_id, new_group['id'])
|
||||
|
||||
def test_create_group_all_attrs(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
group = {
|
||||
'id': group_id,
|
||||
'name': uuid.uuid4().hex,
|
||||
'description': uuid.uuid4().hex,
|
||||
}
|
||||
if self.driver.is_domain_aware():
|
||||
group['domain_id'] = uuid.uuid4().hex
|
||||
new_group = self.driver.create_group(group_id, group)
|
||||
self.assertEqual(group, new_group)
|
||||
|
||||
def test_create_group_duplicate_exc(self):
|
||||
group1_id = uuid.uuid4().hex
|
||||
name = uuid.uuid4().hex
|
||||
domain = uuid.uuid4().hex
|
||||
group1 = {
|
||||
'id': group1_id,
|
||||
'name': name,
|
||||
}
|
||||
if self.driver.is_domain_aware():
|
||||
group1['domain_id'] = domain
|
||||
self.driver.create_group(group1_id, group1)
|
||||
|
||||
group2_id = uuid.uuid4().hex
|
||||
group2 = {
|
||||
'id': group2_id,
|
||||
'name': name,
|
||||
}
|
||||
if self.driver.is_domain_aware():
|
||||
group2['domain_id'] = domain
|
||||
self.assertRaises(exception.Conflict, self.driver.create_group,
|
||||
group2_id, group2)
|
||||
|
||||
def test_get_group(self):
|
||||
group = self.create_group()
|
||||
|
||||
actual_group = self.driver.get_group(group['id'])
|
||||
self.assertEqual(group['id'], actual_group['id'])
|
||||
|
||||
def test_get_group_no_group_exc(self):
|
||||
# When the group doesn't exist, get_group raises GroupNotFound.
|
||||
self.assertRaises(exception.GroupNotFound, self.driver.get_group,
|
||||
group_id=uuid.uuid4().hex)
|
||||
|
||||
def test_get_group_by_name(self):
|
||||
domain_id = uuid.uuid4().hex
|
||||
group = self.create_group(domain_id=domain_id)
|
||||
|
||||
actual_group = self.driver.get_group_by_name(group['name'], domain_id)
|
||||
self.assertEqual(group['id'], actual_group['id'])
|
||||
|
||||
def test_get_group_by_name_no_user_exc(self):
|
||||
# When the group doesn't exist, get_group raises GroupNotFound.
|
||||
self.assertRaises(
|
||||
exception.GroupNotFound, self.driver.get_group_by_name,
|
||||
group_name=uuid.uuid4().hex, domain_id=uuid.uuid4().hex)
|
||||
|
||||
def test_update_group(self):
|
||||
group = self.create_group()
|
||||
|
||||
new_description = uuid.uuid4().hex
|
||||
group_mod = {'description': new_description}
|
||||
actual_group = self.driver.update_group(group['id'], group_mod)
|
||||
self.assertEqual(new_description, actual_group['description'])
|
||||
|
||||
def test_update_group_no_group(self):
|
||||
# When the group doesn't exist, GroupNotFound is raised.
|
||||
group_mod = {'description': uuid.uuid4().hex}
|
||||
self.assertRaises(exception.GroupNotFound, self.driver.update_group,
|
||||
group_id=uuid.uuid4().hex, group=group_mod)
|
||||
|
||||
def test_update_group_name_already_exists(self):
|
||||
# For drivers that support renaming, when the group is renamed to a
|
||||
# name that already exists, Conflict is raised.
|
||||
|
||||
if not self.allows_name_update:
|
||||
self.skipTest("driver doesn't allow name update")
|
||||
|
||||
domain_id = uuid.uuid4().hex
|
||||
group1 = self.create_group(domain_id=domain_id)
|
||||
group2 = self.create_group(domain_id=domain_id)
|
||||
|
||||
group_mod = {'name': group1['name']}
|
||||
self.assertRaises(exception.Conflict, self.driver.update_group,
|
||||
group2['id'], group_mod)
|
||||
|
||||
def test_update_group_name_not_allowed(self):
|
||||
# For drivers that do not support renaming, when the group is attempted
|
||||
# to be renamed ValidationError is raised.
|
||||
|
||||
if self.allows_name_update:
|
||||
self.skipTest("driver allows name update")
|
||||
|
||||
group = self.create_group()
|
||||
|
||||
group_mod = {'name': uuid.uuid4().hex}
|
||||
self.assertRaises(exception.ValidationError, self.driver.update_group,
|
||||
group['id'], group_mod)
|
||||
|
||||
def test_delete_group(self):
|
||||
group = self.create_group()
|
||||
self.driver.delete_group(group['id'])
|
||||
self.assertRaises(exception.GroupNotFound, self.driver.get_group,
|
||||
group['id'])
|
||||
|
||||
def test_delete_group_doesnt_exist_exc(self):
|
||||
self.assertRaises(exception.GroupNotFound, self.driver.delete_group,
|
||||
group_id=uuid.uuid4().hex)
|
||||
|
||||
def test_list_groups_no_groups(self):
|
||||
groups = self.driver.list_groups(driver_hints.Hints())
|
||||
self.assertEqual([], groups)
|
||||
|
||||
def test_list_groups_one_group(self):
|
||||
group = self.create_group()
|
||||
groups = self.driver.list_groups(driver_hints.Hints())
|
||||
self.assertEqual(group['id'], groups[0]['id'])
|
||||
|
||||
def test_add_user_to_group(self):
|
||||
user = self.create_user()
|
||||
group = self.create_group()
|
||||
|
||||
self.driver.add_user_to_group(user['id'], group['id'])
|
||||
|
||||
# No assert since if doesn't raise, then successful.
|
||||
self.driver.check_user_in_group(user['id'], group['id'])
|
||||
|
||||
def test_add_user_to_group_no_user_exc(self):
|
||||
group = self.create_group()
|
||||
|
||||
user_id = uuid.uuid4().hex
|
||||
self.assertRaises(exception.UserNotFound,
|
||||
self.driver.add_user_to_group, user_id, group['id'])
|
||||
|
||||
def test_add_user_to_group_no_group_exc(self):
|
||||
user = self.create_user()
|
||||
|
||||
group_id = uuid.uuid4().hex
|
||||
self.assertRaises(exception.GroupNotFound,
|
||||
self.driver.add_user_to_group, user['id'], group_id)
|
||||
|
||||
def test_check_user_in_group(self):
|
||||
user = self.create_user()
|
||||
group = self.create_group()
|
||||
self.driver.add_user_to_group(user['id'], group['id'])
|
||||
|
||||
# No assert since if doesn't raise, then successful.
|
||||
self.driver.check_user_in_group(user['id'], group['id'])
|
||||
|
||||
def test_check_user_in_group_user_not_in_group_exc(self):
|
||||
user = self.create_user()
|
||||
group = self.create_group()
|
||||
|
||||
self.assertRaises(exception.NotFound, self.driver.check_user_in_group,
|
||||
user['id'], group['id'])
|
||||
|
||||
def test_check_user_in_group_user_doesnt_exist_exc(self):
|
||||
# When the user doesn't exist, UserNotFound is raised.
|
||||
group = self.create_group()
|
||||
|
||||
user_id = uuid.uuid4().hex
|
||||
self.assertRaises(
|
||||
exception.UserNotFound, self.driver.check_user_in_group, user_id,
|
||||
group['id'])
|
||||
|
||||
def test_check_user_in_group_group_doesnt_exist_exc(self):
|
||||
# When the group doesn't exist, UserNotFound is raised.
|
||||
user = self.create_user()
|
||||
|
||||
group_id = uuid.uuid4().hex
|
||||
self.assertRaises(
|
||||
exception.GroupNotFound, self.driver.check_user_in_group,
|
||||
user['id'], group_id)
|
||||
|
||||
def test_list_users_in_group_no_users(self):
|
||||
group = self.create_group()
|
||||
|
||||
users = self.driver.list_users_in_group(group['id'],
|
||||
driver_hints.Hints())
|
||||
self.assertEqual([], users)
|
||||
|
||||
def test_list_users_in_group_user(self):
|
||||
group = self.create_group()
|
||||
user = self.create_user()
|
||||
self.driver.add_user_to_group(user['id'], group['id'])
|
||||
|
||||
users = self.driver.list_users_in_group(group['id'],
|
||||
driver_hints.Hints())
|
||||
self.assertEqual([user['id']], [u['id'] for u in users])
|
||||
|
||||
def test_list_users_in_group_no_group(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
self.assertRaises(
|
||||
exception.GroupNotFound, self.driver.list_users_in_group, group_id,
|
||||
driver_hints.Hints())
|
||||
|
||||
def test_list_groups_for_user_no_groups(self):
|
||||
user = self.create_user()
|
||||
|
||||
groups = self.driver.list_groups_for_user(user['id'],
|
||||
driver_hints.Hints())
|
||||
self.assertEqual([], groups)
|
||||
|
||||
def test_list_groups_for_user_group(self):
|
||||
user = self.create_user()
|
||||
group = self.create_group()
|
||||
self.driver.add_user_to_group(user['id'], group['id'])
|
||||
|
||||
groups = self.driver.list_groups_for_user(user['id'],
|
||||
driver_hints.Hints())
|
||||
self.assertEqual([group['id']], [g['id'] for g in groups])
|
||||
|
||||
def test_list_groups_for_user_no_user(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
self.assertRaises(
|
||||
exception.UserNotFound, self.driver.list_groups_for_user,
|
||||
user_id, driver_hints.Hints())
|
||||
|
||||
def test_remove_user_from_group(self):
|
||||
user = self.create_user()
|
||||
group = self.create_group()
|
||||
self.driver.add_user_to_group(user['id'], group['id'])
|
||||
|
||||
self.driver.remove_user_from_group(user['id'], group['id'])
|
||||
|
||||
self.assertRaises(exception.NotFound, self.driver.check_user_in_group,
|
||||
user['id'], group['id'])
|
||||
|
||||
def test_remove_user_from_group_not_in_group(self):
|
||||
user = self.create_user()
|
||||
group = self.create_group()
|
||||
|
||||
# FIXME(blk-u): ldap is returning UserNotFound rather than NotFound,
|
||||
# fix this.
|
||||
self.assertRaises(
|
||||
exception.NotFound, self.driver.remove_user_from_group, user['id'],
|
||||
group['id'])
|
||||
|
||||
def test_remove_user_from_group_no_user(self):
|
||||
group = self.create_group()
|
||||
|
||||
user_id = uuid.uuid4().hex
|
||||
self.assertRaises(
|
||||
exception.UserNotFound, self.driver.remove_user_from_group,
|
||||
user_id, group['id'])
|
||||
|
||||
def test_remove_user_from_group_no_group(self):
|
||||
user = self.create_user()
|
||||
|
||||
group_id = uuid.uuid4().hex
|
||||
self.assertRaises(
|
||||
exception.GroupNotFound, self.driver.remove_user_from_group,
|
||||
user['id'], group_id)
|
||||
|
||||
def test_authenticate(self):
|
||||
password = uuid.uuid4().hex
|
||||
user = self.create_user(password=password)
|
||||
|
||||
actual_user = self.driver.authenticate(user['id'], password)
|
||||
self.assertEqual(user['id'], actual_user['id'])
|
||||
|
||||
def test_authenticate_wrong_password(self):
|
||||
user = self.create_user(password=uuid.uuid4().hex)
|
||||
|
||||
password = uuid.uuid4().hex
|
||||
self.assertRaises(AssertionError, self.driver.authenticate, user['id'],
|
||||
password)
|
||||
|
||||
def test_authenticate_no_user(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
password = uuid.uuid4().hex
|
||||
self.assertRaises(AssertionError, self.driver.authenticate, user_id,
|
||||
password)
|
|
@ -0,0 +1,43 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import fixture as config_fixture
|
||||
|
||||
from keystone.identity.backends import ldap
|
||||
from keystone.tests.unit import core
|
||||
from keystone.tests.unit.identity.backends import test_base
|
||||
from keystone.tests.unit.ksfixtures import ldapdb
|
||||
|
||||
|
||||
class TestIdentityDriver(core.BaseTestCase,
|
||||
test_base.IdentityDriverV8Tests):
|
||||
|
||||
allows_name_update = False
|
||||
expected_is_domain_aware = False
|
||||
expected_default_assignment_driver = 'sql'
|
||||
expected_is_sql = False
|
||||
expected_generates_uuids = False
|
||||
|
||||
def setUp(self):
|
||||
super(TestIdentityDriver, self).setUp()
|
||||
|
||||
config_fixture_ = self.useFixture(config_fixture.Config())
|
||||
config_fixture_.config(
|
||||
group='ldap',
|
||||
url='fake://memory',
|
||||
user='cn=Admin',
|
||||
password='password',
|
||||
suffix='cn=example,cn=com')
|
||||
|
||||
self.useFixture(ldapdb.LDAPDatabase())
|
||||
|
||||
self.driver = ldap.Identity()
|
|
@ -0,0 +1,55 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from oslo_db.sqlalchemy import test_base as db_test
|
||||
|
||||
from keystone.common import sql
|
||||
from keystone.identity.backends import sql as sql_backend
|
||||
from keystone.tests.unit.identity.backends import test_base
|
||||
from keystone.tests.unit.ksfixtures import database
|
||||
|
||||
|
||||
class TestIdentityDriver(db_test.DbTestCase,
|
||||
test_base.IdentityDriverV8Tests):
|
||||
|
||||
expected_is_domain_aware = True
|
||||
expected_default_assignment_driver = 'sql'
|
||||
expected_is_sql = True
|
||||
expected_generates_uuids = True
|
||||
|
||||
def setUp(self):
|
||||
super(TestIdentityDriver, self).setUp()
|
||||
|
||||
# Set keystone's connection URL to be the test engine's url.
|
||||
database.initialize_sql_session(self.engine.url)
|
||||
|
||||
# Override keystone's context manager to be oslo.db's global context
|
||||
# manager.
|
||||
sql.core._TESTING_USE_GLOBAL_CONTEXT_MANAGER = True
|
||||
self.addCleanup(setattr,
|
||||
sql.core, '_TESTING_USE_GLOBAL_CONTEXT_MANAGER', False)
|
||||
self.addCleanup(sql.cleanup)
|
||||
|
||||
version_specifiers = {}
|
||||
database._load_sqlalchemy_models(version_specifiers)
|
||||
sql.ModelBase.metadata.create_all(bind=self.engine)
|
||||
|
||||
self.driver = sql_backend.Identity()
|
||||
|
||||
|
||||
class MySQLOpportunisticIdentityDriverTestCase(TestIdentityDriver):
|
||||
FIXTURE = db_test.MySQLOpportunisticFixture
|
||||
|
||||
|
||||
class PostgreSQLOpportunisticIdentityDriverTestCase(TestIdentityDriver):
|
||||
FIXTURE = db_test.PostgreSQLOpportunisticFixture
|
|
@ -1,4 +1,5 @@
|
|||
keystone.tests.unit.common.test_ldap
|
||||
keystone.tests.unit.identity.backends.test_ldap
|
||||
keystone.tests.unit.test_backend_ldap
|
||||
keystone.tests.unit.test_backend_ldap_pool
|
||||
keystone.tests.unit.test_v2
|
||||
|
|
Loading…
Reference in New Issue