Remove LDAP delete logic and associated tests

[0][1] made it so that every write operation to the LDAP backend by
users returns a 403 Forbidden. However, since all our unit tests
depend on being able to write to the LDAP backend, the code to
perform writes is still in the codebase and it's toggled by the
unit tests.

This patch cleans up delete operations from unit tests. And since
those unit tests don't depend anymore on being able to do deletes
to LDAP, the associated delete logic in the driver is removed.
None of this is user facing or changes anything in the Keystone
behavior.

[0] I13eada3d5c3a166223c3e3ce70b7054eaed1003a
[1] I225daf0e00742c54d5d009e456d4a3ad864356a0

Change-Id: I9f183492fd318c755ba026e9e402615fe66f100c
This commit is contained in:
Kristi Nikolla 2017-01-23 16:49:43 -05:00
parent cbf179b6d5
commit 2fb3fd267c
8 changed files with 187 additions and 331 deletions

View File

@ -36,7 +36,6 @@ from keystone.i18n import _
LOG = log.getLogger(__name__)
LDAP_VALUES = {'TRUE': True, 'FALSE': False}
CONTROL_TREEDELETE = '1.2.840.113556.1.4.805'
LDAP_SCOPES = {'one': ldap.SCOPE_ONELEVEL,
'sub': ldap.SCOPE_SUBTREE}
LDAP_DEREF = {'always': ldap.DEREF_ALWAYS,
@ -479,14 +478,6 @@ class LDAPHandler(object):
def modify_s(self, dn, modlist):
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def delete_s(self, dn):
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def delete_ext_s(self, dn, serverctrls=None, clientctrls=None):
raise exception.NotImplemented() # pragma: no cover
class PythonLDAPHandler(LDAPHandler):
"""LDAPHandler implementation which calls the python-ldap API.
@ -565,12 +556,6 @@ class PythonLDAPHandler(LDAPHandler):
def modify_s(self, dn, modlist):
return self.conn.modify_s(dn, modlist)
def delete_s(self, dn):
return self.conn.delete_s(dn)
def delete_ext_s(self, dn, serverctrls=None, clientctrls=None):
return self.conn.delete_ext_s(dn, serverctrls, clientctrls)
def _common_ldap_initialization(url, use_tls=False, tls_cacertfile=None,
tls_cacertdir=None, tls_req_cert=None,
@ -832,14 +817,6 @@ class PooledLDAPHandler(LDAPHandler):
def modify_s(self, conn, dn, modlist):
return conn.modify_s(dn, modlist)
@use_conn_pool
def delete_s(self, conn, dn):
return conn.delete_s(dn)
@use_conn_pool
def delete_ext_s(self, conn, dn, serverctrls=None, clientctrls=None):
return conn.delete_ext_s(dn, serverctrls, clientctrls)
class KeystoneLDAPHandler(LDAPHandler):
"""Convert data types and perform logging.
@ -1078,17 +1055,6 @@ class KeystoneLDAPHandler(LDAPHandler):
for op, kind, values in ldap_modlist]
return self.conn.modify_s(dn_utf8, ldap_modlist_utf8)
def delete_s(self, dn):
LOG.debug("LDAP delete: dn=%s", dn)
dn_utf8 = utf8_encode(dn)
return self.conn.delete_s(dn_utf8)
def delete_ext_s(self, dn, serverctrls=None, clientctrls=None):
LOG.debug('LDAP delete_ext: dn=%s serverctrls=%s clientctrls=%s',
dn, serverctrls, clientctrls)
dn_utf8 = utf8_encode(dn)
return self.conn.delete_ext_s(dn_utf8, serverctrls, clientctrls)
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit runtime context, unbind LDAP."""
self.unbind_s()
@ -1569,13 +1535,6 @@ class BaseLdap(object):
return self.get(object_id)
def delete(self, object_id):
with self.get_connection() as conn:
try:
conn.delete_s(self._id_to_dn(object_id))
except ldap.NO_SUCH_OBJECT:
raise self._not_found(object_id)
def add_member(self, member_dn, member_list_dn):
"""Add member to the member list.
@ -1599,55 +1558,6 @@ class BaseLdap(object):
except ldap.NO_SUCH_OBJECT:
raise self._not_found(member_list_dn)
def remove_member(self, member_dn, member_list_dn):
"""Remove member from the member list.
:param member_dn: DN of member to be removed.
:param member_list_dn: DN of group from which the
member will be removed.
:raises self.NotFound: If the group entry didn't exist.
:raises ldap.NO_SUCH_ATTRIBUTE: If the user wasn't a member.
"""
with self.get_connection() as conn:
try:
mod = (ldap.MOD_DELETE, self.member_attribute, member_dn)
conn.modify_s(member_list_dn, [mod])
except ldap.NO_SUCH_OBJECT:
raise self._not_found(member_list_dn)
def _delete_tree_nodes(self, search_base, scope, query_params=None):
query = u'(objectClass=%s)' % self.object_class
if query_params:
query = (u'(&%s%s)' %
(query, ''.join(['(%s=%s)'
% (k, ldap.filter.escape_filter_chars(v))
for k, v in
query_params.items()])))
not_deleted_nodes = []
with self.get_connection() as conn:
try:
nodes = conn.search_s(search_base, scope, query,
attrlist=DN_ONLY)
except ldap.NO_SUCH_OBJECT:
LOG.debug('Could not find entry with dn=%s', search_base)
raise self._not_found(self._dn_to_id(search_base))
else:
for node_dn, _t in nodes:
try:
conn.delete_s(node_dn)
except ldap.NO_SUCH_OBJECT:
not_deleted_nodes.append(node_dn)
if not_deleted_nodes:
msg = ('When deleting entries for %(search_base)s, '
'could not delete nonexistent entries '
'%(entries)s%(dots)s')
LOG.warning(msg,
{'search_base': search_base,
'entries': not_deleted_nodes[:3],
'dots': '...' if len(not_deleted_nodes) > 3 else ''})
def filter_query(self, hints, query=None):
"""Apply filtering to a query.
@ -1885,8 +1795,3 @@ class EnabledEmuMixIn(BaseLdap):
else:
return super(EnabledEmuMixIn, self).update(
object_id, values, old_obj)
def delete(self, object_id):
if self.enabled_emulation:
self._remove_enabled(object_id)
super(EnabledEmuMixIn, self).delete(object_id)

View File

@ -165,8 +165,7 @@ class Identity(base.IdentityDriverBase):
return self._update_user(user_id, user)
def delete_user(self, user_id):
self._disallow_write()
self._delete_user(user_id)
raise exception.Forbidden(READ_ONLY_LDAP_ERROR_MESSAGE)
def change_password(self, user_id, new_password):
raise exception.Forbidden(READ_ONLY_LDAP_ERROR_MESSAGE)
@ -176,8 +175,7 @@ class Identity(base.IdentityDriverBase):
self._add_user_to_group(user_id, group_id)
def remove_user_from_group(self, user_id, group_id):
self._disallow_write()
self._remove_user_from_group(user_id, group_id)
raise exception.Forbidden(READ_ONLY_LDAP_ERROR_MESSAGE)
def create_group(self, group_id, group):
self._disallow_write()
@ -188,8 +186,7 @@ class Identity(base.IdentityDriverBase):
return self._update_group(group_id, group)
def delete_group(self, group_id):
self._disallow_write()
return self._delete_group(group_id)
raise exception.Forbidden(READ_ONLY_LDAP_ERROR_MESSAGE)
# Test implementations
def _create_user(self, user_id, user):
@ -217,26 +214,6 @@ class Identity(base.IdentityDriverBase):
self.user.update(user_id, user, old_obj)
return self.user.get_filtered(user_id)
def _delete_user(self, user_id):
msg = _DEPRECATION_MSG % "delete_user"
versionutils.report_deprecated_feature(LOG, msg)
user = self.user.get(user_id)
user_dn = user['dn']
groups = self.group.list_user_groups(user_dn)
for group in groups:
group_ref = self.group.get(group['id'], '*') # unfiltered
group_dn = group_ref['dn']
try:
super(GroupApi, self.group).remove_member(user_dn, group_dn)
except ldap.NO_SUCH_ATTRIBUTE:
msg = ('User %(user)s was not removed from group %(group)s '
'because the relationship was not found')
LOG.warning(msg, {'user': user_id, 'group': group['id']})
if hasattr(user, 'tenant_id'):
self.project.remove_user(user.tenant_id, user_dn)
self.user.delete(user_id)
def _create_group(self, group_id, group):
msg = _DEPRECATION_MSG % "create_group"
versionutils.report_deprecated_feature(LOG, msg)
@ -247,11 +224,6 @@ class Identity(base.IdentityDriverBase):
versionutils.report_deprecated_feature(LOG, msg)
return common_ldap.filter_entity(self.group.update(group_id, group))
def _delete_group(self, group_id):
msg = _DEPRECATION_MSG % "delete_group"
versionutils.report_deprecated_feature(LOG, msg)
return self.group.delete(group_id)
def _add_user_to_group(self, user_id, group_id):
msg = _DEPRECATION_MSG % "add_user_to_group"
versionutils.report_deprecated_feature(LOG, msg)
@ -259,13 +231,6 @@ class Identity(base.IdentityDriverBase):
user_dn = user_ref['dn']
self.group.add_user(user_dn, group_id, user_id)
def _remove_user_from_group(self, user_id, group_id):
msg = _DEPRECATION_MSG % "remove_user_from_group"
versionutils.report_deprecated_feature(LOG, msg)
user_ref = self._get_user(user_id)
user_dn = user_ref['dn']
self.group.remove_user(user_dn, group_id, user_id)
# TODO(termie): turn this into a data object and move logic to driver
class UserApi(common_ldap.EnabledEmuMixIn, common_ldap.BaseLdap):
@ -418,16 +383,6 @@ class GroupApi(common_ldap.BaseLdap):
data.pop('description')
return super(GroupApi, self).create(data)
def delete(self, group_id):
# TODO(spzala): this is only placeholder for group and domain
# role support which will be added under bug 1101287
group_ref = self.get(group_id)
group_dn = group_ref['dn']
if group_dn:
self._delete_tree_nodes(group_dn, ldap.SCOPE_ONELEVEL)
super(GroupApi, self).delete(group_id)
def update(self, group_id, values):
old_obj = self.get(group_id)
return super(GroupApi, self).update(group_id, values, old_obj)
@ -442,14 +397,6 @@ class GroupApi(common_ldap.BaseLdap):
'User %(user_id)s is already a member of group %(group_id)s') %
{'user_id': user_id, 'group_id': group_id})
def remove_user(self, user_dn, group_id, user_id):
group_ref = self.get(group_id)
group_dn = group_ref['dn']
try:
super(GroupApi, self).remove_member(user_dn, group_dn)
except ldap.NO_SUCH_ATTRIBUTE:
raise exception.UserNotFound(user_id=user_id)
def list_user_groups(self, user_dn):
"""Return a list of groups for which the user is a member."""
user_dn_esc = ldap.filter.escape_filter_chars(user_dn)

View File

@ -41,9 +41,6 @@ SCOPE_NAMES = {
ldap.SCOPE_SUBTREE: 'SCOPE_SUBTREE',
}
# http://msdn.microsoft.com/en-us/library/windows/desktop/aa366991(v=vs.85).aspx # noqa
CONTROL_TREEDELETE = '1.2.840.113556.1.4.805'
LOG = log.getLogger(__name__)
CONF = keystone.conf.CONF

View File

@ -17,6 +17,7 @@ import uuid
from six.moves import range
import keystone.conf
from keystone import exception
CONF = keystone.conf.CONF
@ -122,4 +123,9 @@ class FilterTests(object):
def _delete_test_data(self, entity_type, entity_list):
for entity in entity_list:
self._delete_entity(entity_type)(entity['id'])
try:
self._delete_entity(entity_type)(entity['id'])
except exception.Forbidden:
# Note(knikolla): Some identity backends such as LDAP are
# read only
break

View File

@ -42,3 +42,27 @@ class TestIdentityDriver(core.BaseTestCase,
self.useFixture(ldapdb.LDAPDatabase())
self.driver = ldap.Identity()
def test_delete_user(self):
self.skip_test_overrides('N/A: LDAP has no write support')
def test_delete_user_no_user_exc(self):
self.skip_test_overrides('N/A: LDAP has no write support')
def test_delete_group(self):
self.skip_test_overrides('N/A: LDAP has no write support')
def test_delete_group_doesnt_exist_exc(self):
self.skip_test_overrides('N/A: LDAP has no write support')
def test_remove_user_from_group(self):
self.skip_test_overrides('N/A: LDAP has no write support')
def test_remove_user_from_group_not_in_group(self):
self.skip_test_overrides('N/A: LDAP has no write support')
def test_remove_user_from_group_no_user(self):
self.skip_test_overrides('N/A: LDAP has no write support')
def test_remove_user_from_group_no_group(self):
self.skip_test_overrides('N/A: LDAP has no write support')

View File

@ -760,26 +760,6 @@ class IdentityTests(object):
negative_user['id'])
self.assertEqual(0, len(group_refs))
# remove the user from each group and ensure that
# the group count reduces by one for each
for x in range(0, 3):
before_count = GROUP_COUNT - x
after_count = GROUP_COUNT - x - 1
group_refs = self.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(before_count, len(group_refs))
self.identity_api.remove_user_from_group(
positive_user['id'],
test_groups[x]['id'])
group_refs = self.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(after_count, len(group_refs))
# Make sure the group count for the unrelated user
# did not change
group_refs = self.identity_api.list_groups_for_user(
negative_user['id'])
self.assertEqual(0, len(group_refs))
def test_remove_user_from_group(self):
domain = self._get_domain_fixture()
new_group = unit.new_group_ref(domain_id=domain['id'])

View File

@ -142,6 +142,15 @@ class IdentityTests(identity_tests.IdentityTests):
self.skip_test_overrides(
"Using arbitrary attributes doesn't work under LDAP")
def test_remove_user_from_group(self):
self.skip_test_overrides('N/A: LDAP does not support write')
def test_remove_user_from_group_returns_not_found(self):
self.skip_test_overrides('N/A: LDAP does not support write')
def test_delete_user_returns_not_found(self):
self.skip_test_overrides('N/A: LDAP does not support write')
class AssignmentTests(assignment_tests.AssignmentTests):
@ -202,6 +211,21 @@ class AssignmentTests(assignment_tests.AssignmentTests):
def test_multi_group_grants_on_project_domain(self):
self.skip_test_overrides('N/A: LDAP does not support multiple domains')
def test_delete_user_grant_no_user(self):
self.skip_test_overrides('N/A: LDAP has no write support')
def test_delete_group_grant_no_group(self):
self.skip_test_overrides('N/A: LDAP has no write support')
def test_delete_user_with_project_roles(self):
self.skip_test_overrides('N/A: LDAP has no write support')
def test_delete_user_with_project_association(self):
self.skip_test_overrides('N/A: LDAP has no write support')
def test_delete_group_removes_role_assignments(self):
self.skip_test_overrides('N/A: LDAP has no write support')
class ResourceTests(resource_tests.ResourceTests):
@ -355,9 +379,8 @@ class BaseLDAPIdentity(LDAPTestSetup, IdentityTests, AssignmentTests,
user['password'] = u'fäképass2'
self.identity_api.update_user(user['id'], user)
self.identity_api.delete_user(user['id'])
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
self.assertRaises(exception.Forbidden,
self.identity_api.delete_user,
user['id'])
def test_user_filter(self):
@ -663,43 +686,6 @@ class BaseLDAPIdentity(LDAPTestSetup, IdentityTests, AssignmentTests,
after_assignments = len(self.assignment_api.list_role_assignments())
self.assertEqual(existing_assignments + 2, after_assignments)
def test_list_group_members_missing_entry(self):
"""List group members with deleted user.
If a group has a deleted entry for a member, the non-deleted members
are returned.
"""
# Create a group
group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group)['id']
# Create a couple of users and add them to the group.
user = dict(name=uuid.uuid4().hex,
domain_id=CONF.identity.default_domain_id)
user_1_id = self.identity_api.create_user(user)['id']
self.identity_api.add_user_to_group(user_1_id, group_id)
user = dict(name=uuid.uuid4().hex,
domain_id=CONF.identity.default_domain_id)
user_2_id = self.identity_api.create_user(user)['id']
self.identity_api.add_user_to_group(user_2_id, group_id)
# Delete user 2
# NOTE(blk-u): need to go directly to user interface to keep from
# updating the group.
unused, driver, entity_id = (
self.identity_api._get_domain_driver_and_entity_id(user_2_id))
driver.user.delete(entity_id)
# List group users and verify only user 1.
res = self.identity_api.list_users_in_group(group_id)
self.assertEqual(1, len(res), "Expected 1 entry (user_1)")
self.assertEqual(user_1_id, res[0]['id'], "Expected user 1 id")
def test_list_group_members_when_no_members(self):
# List group members when there is no member in the group.
# No exception should be raised.
@ -741,6 +727,31 @@ class BaseLDAPIdentity(LDAPTestSetup, IdentityTests, AssignmentTests,
user_id=user['id'],
password=None)
@mock.patch.object(versionutils, 'report_deprecated_feature')
def test_user_crud(self, mock_deprecator):
# NOTE(stevemar): As of the Mitaka release, we now check for calls that
# the LDAP write functionality has been deprecated.
user_dict = self.new_user_ref(
domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user_dict)
args, _kwargs = mock_deprecator.call_args
self.assertIn("create_user for the LDAP identity backend", args[1])
del user_dict['password']
user_ref = self.identity_api.get_user(user['id'])
user_ref_dict = {x: user_ref[x] for x in user_ref}
self.assertDictContainsSubset(user_dict, user_ref_dict)
user_dict['password'] = uuid.uuid4().hex
self.identity_api.update_user(user['id'], user_dict)
args, _kwargs = mock_deprecator.call_args
self.assertIn("update_user for the LDAP identity backend", args[1])
del user_dict['password']
user_ref = self.identity_api.get_user(user['id'])
user_ref_dict = {x: user_ref[x] for x in user_ref}
self.assertDictContainsSubset(user_dict, user_ref_dict)
# The group and domain CRUD tests below override the standard ones in
# unit.identity.test_backends.py so that we can exclude the update name
# test, since we do not (and will not) support the update of either group
@ -765,15 +776,8 @@ class BaseLDAPIdentity(LDAPTestSetup, IdentityTests, AssignmentTests,
group_ref = self.identity_api.get_group(group['id'])
self.assertDictEqual(group, group_ref)
self.identity_api.delete_group(group['id'])
args, _kwargs = mock_deprecator.call_args
self.assertIn("delete_group for the LDAP identity backend", args[1])
self.assertRaises(exception.GroupNotFound,
self.identity_api.get_group,
group['id'])
@mock.patch.object(versionutils, 'report_deprecated_feature')
def test_add_remove_user_group_deprecated(self, mock_deprecator):
def test_add_user_group_deprecated(self, mock_deprecator):
group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group = self.identity_api.create_group(group)
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
@ -782,27 +786,10 @@ class BaseLDAPIdentity(LDAPTestSetup, IdentityTests, AssignmentTests,
args, _kwargs = mock_deprecator.call_args
self.assertIn("add_user_to_group for the LDAP identity", args[1])
self.identity_api.remove_user_from_group(user['id'], group['id'])
args, _kwargs = mock_deprecator.call_args
self.assertIn("remove_user_from_group for the LDAP identity", args[1])
@unit.skip_if_cache_disabled('identity')
def test_cache_layer_group_crud(self):
group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group = self.identity_api.create_group(group)
# cache the result
group_ref = self.identity_api.get_group(group['id'])
# delete the group bypassing identity api.
domain_id, driver, entity_id = (
self.identity_api._get_domain_driver_and_entity_id(group['id']))
driver.delete_group(entity_id)
self.assertEqual(group_ref,
self.identity_api.get_group(group['id']))
self.identity_api.get_group.invalidate(self.identity_api, group['id'])
self.assertRaises(exception.GroupNotFound,
self.identity_api.get_group, group['id'])
# Note(knikolla): Since delete logic has been deleted from LDAP,
# this doesn't test caching on delete.
group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group = self.identity_api.create_group(group)
# cache the result
@ -812,6 +799,41 @@ class BaseLDAPIdentity(LDAPTestSetup, IdentityTests, AssignmentTests,
self.assertDictContainsSubset(self.identity_api.get_group(group['id']),
group_ref)
@unit.skip_if_cache_disabled('identity')
def test_cache_layer_get_user(self):
# Note(knikolla): Since delete logic has been deleted from LDAP,
# this doesn't test caching on delete.
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
ref = self.identity_api.get_user_by_name(user['name'],
user['domain_id'])
user['description'] = uuid.uuid4().hex
# cache the result.
self.identity_api.get_user(ref['id'])
# update using identity api and get back updated user.
user_updated = self.identity_api.update_user(ref['id'], user)
self.assertDictContainsSubset(self.identity_api.get_user(ref['id']),
user_updated)
self.assertDictContainsSubset(
self.identity_api.get_user_by_name(ref['name'], ref['domain_id']),
user_updated)
@unit.skip_if_cache_disabled('identity')
def test_cache_layer_get_user_by_name(self):
# Note(knikolla): Since delete logic has been deleted from LDAP,
# this doesn't test caching on delete.
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
ref = self.identity_api.get_user_by_name(user['name'],
user['domain_id'])
user['description'] = uuid.uuid4().hex
user_updated = self.identity_api.update_user(ref['id'], user)
self.assertDictContainsSubset(self.identity_api.get_user(ref['id']),
user_updated)
self.assertDictContainsSubset(
self.identity_api.get_user_by_name(ref['name'], ref['domain_id']),
user_updated)
def test_create_user_none_mapping(self):
# When create a user where an attribute maps to None, the entry is
# created without that attribute and it doesn't fail with a TypeError.
@ -825,24 +847,6 @@ class BaseLDAPIdentity(LDAPTestSetup, IdentityTests, AssignmentTests,
# If this doesn't raise, then the test is successful.
user = self.identity_api.create_user(user)
def test_create_user_with_boolean_string_names(self):
# Ensure that any attribute that is equal to the string 'TRUE'
# or 'FALSE' will not be converted to a boolean value, it
# should be returned as is.
boolean_strings = ['TRUE', 'FALSE', 'true', 'false', 'True', 'False',
'TrUe' 'FaLse']
for name in boolean_strings:
user = self.new_user_ref(name=name,
domain_id=CONF.identity.default_domain_id)
user_ref = self.identity_api.create_user(user)
user_info = self.identity_api.get_user(user_ref['id'])
self.assertEqual(name, user_info['name'])
# Delete the user to ensure that the Keystone uniqueness
# requirements combined with the case-insensitive nature of a
# typical LDAP schema does not cause subsequent names in
# boolean_strings to clash.
self.identity_api.delete_user(user_ref['id'])
def test_unignored_user_none_mapping(self):
# Ensure that an attribute that maps to None that is not explicitly
# ignored in configuration is implicitly ignored without triggering
@ -1787,68 +1791,6 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
self.resource_api.get_project,
project['id'])
@mock.patch.object(versionutils, 'report_deprecated_feature')
def test_user_crud(self, mock_deprecator):
# NOTE(stevemar): As of the Mitaka release, we now check for calls that
# the LDAP write functionality has been deprecated.
user_dict = self.new_user_ref(
domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user_dict)
args, _kwargs = mock_deprecator.call_args
self.assertIn("create_user for the LDAP identity backend", args[1])
del user_dict['password']
user_ref = self.identity_api.get_user(user['id'])
user_ref_dict = {x: user_ref[x] for x in user_ref}
self.assertDictContainsSubset(user_dict, user_ref_dict)
user_dict['password'] = uuid.uuid4().hex
self.identity_api.update_user(user['id'], user_dict)
args, _kwargs = mock_deprecator.call_args
self.assertIn("update_user for the LDAP identity backend", args[1])
del user_dict['password']
user_ref = self.identity_api.get_user(user['id'])
user_ref_dict = {x: user_ref[x] for x in user_ref}
self.assertDictContainsSubset(user_dict, user_ref_dict)
self.identity_api.delete_user(user['id'])
args, _kwargs = mock_deprecator.call_args
self.assertIn("delete_user for the LDAP identity backend", args[1])
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
user['id'])
def test_delete_user_group_cleanup(self):
domain = self._get_domain_fixture()
# setup: create user
user_dict = self.new_user_ref(domain_id=domain['id'])
user = self.identity_api.create_user(user_dict)
# setup: add user to 3 groups
group_names = []
numgroups = 3
for _ in range(numgroups):
group_dict = unit.new_group_ref(domain_id=domain['id'])
group = self.identity_api.create_group(group_dict)
group_names.append(group['name'])
self.identity_api.add_user_to_group(user['id'], group['id'])
# configure a group filter
driver = self.identity_api._select_identity_driver(domain['id'])
driver.group.ldap_filter = ('(|(ou=%s)(ou=%s))' %
tuple(group_names[:2]))
# confirm that user is a member of all 3 groups
group_api = self.identity_api.driver.group
user_dn_esc = self.identity_api.driver.user.get(user['id'])['dn']
groups = group_api.get_all('(%s=%s)' %
(group_api.member_attribute, user_dn_esc))
self.assertEqual(numgroups, len(groups))
# confirm that deleting user removes from all groups
self.identity_api.delete_user(user['id'])
groups = group_api.get_all('(%s=%s)' %
(group_api.member_attribute, user_dn_esc))
self.assertEqual(0, len(groups))
def test_user_auth_emulated(self):
driver = self.identity_api._select_identity_driver(
CONF.identity.default_domain_id)
@ -3080,8 +3022,7 @@ class LDAPMatchingRuleInChainTests(LDAPTestSetup, unit.TestCase):
self.assertDictEqual(self.group, group_ref)
def test_list_user_groups(self):
# tests indirectly by calling delete user
self.identity_api.delete_user(self.user['id'])
self.identity_api.list_groups_for_user(self.user['id'])
def test_list_groups_for_user(self):
groups_ref = self.identity_api.list_groups_for_user(self.user['id'])
@ -3091,10 +3032,3 @@ class LDAPMatchingRuleInChainTests(LDAPTestSetup, unit.TestCase):
groups_refs = self.identity_api.list_groups()
self.assertEqual(1, len(groups_refs))
self.assertEqual(self.group['id'], groups_refs[0]['id'])
self.identity_api.delete_group(self.group['id'])
self.assertRaises(exception.GroupNotFound,
self.identity_api.get_group,
self.group['id'])
groups_refs = self.identity_api.list_groups()
self.assertEqual([], groups_refs)

View File

@ -476,6 +476,69 @@ class SqlIdentity(SqlTests,
# roles assignments.
self.assertThat(user_domains, matchers.HasLength(0))
def test_list_groups_for_user(self):
domain = self._get_domain_fixture()
test_groups = []
test_users = []
GROUP_COUNT = 3
USER_COUNT = 2
for x in range(0, USER_COUNT):
new_user = unit.new_user_ref(domain_id=domain['id'])
new_user = self.identity_api.create_user(new_user)
test_users.append(new_user)
positive_user = test_users[0]
negative_user = test_users[1]
for x in range(0, USER_COUNT):
group_refs = self.identity_api.list_groups_for_user(
test_users[x]['id'])
self.assertEqual(0, len(group_refs))
for x in range(0, GROUP_COUNT):
before_count = x
after_count = x + 1
new_group = unit.new_group_ref(domain_id=domain['id'])
new_group = self.identity_api.create_group(new_group)
test_groups.append(new_group)
# add the user to the group and ensure that the
# group count increases by one for each
group_refs = self.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(before_count, len(group_refs))
self.identity_api.add_user_to_group(
positive_user['id'],
new_group['id'])
group_refs = self.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(after_count, len(group_refs))
# Make sure the group count for the unrelated user did not change
group_refs = self.identity_api.list_groups_for_user(
negative_user['id'])
self.assertEqual(0, len(group_refs))
# remove the user from each group and ensure that
# the group count reduces by one for each
for x in range(0, 3):
before_count = GROUP_COUNT - x
after_count = GROUP_COUNT - x - 1
group_refs = self.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(before_count, len(group_refs))
self.identity_api.remove_user_from_group(
positive_user['id'],
test_groups[x]['id'])
group_refs = self.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(after_count, len(group_refs))
# Make sure the group count for the unrelated user
# did not change
group_refs = self.identity_api.list_groups_for_user(
negative_user['id'])
self.assertEqual(0, len(group_refs))
def test_storing_null_domain_id_in_project_ref(self):
"""Test the special storage of domain_id=None in sql resource driver.