Merge "Check that the user is dumb moved to the common method"

This commit is contained in:
Jenkins 2014-05-30 11:45:10 +00:00 committed by Gerrit Code Review
commit 35568575bb
4 changed files with 89 additions and 12 deletions

View File

@ -450,7 +450,7 @@ class ProjectApi(common_ldap.EnabledEmuMixIn, common_ldap.BaseLdap):
if not role_dn:
# Get users who have default tenant mapping
for user_dn in tenant[1].get(self.member_attribute, []):
if self.use_dumb_member and user_dn == self.dumb_member:
if self._is_dumb_member(user_dn):
continue
res.add(user_dn)
@ -552,7 +552,7 @@ class RoleApi(common_ldap.BaseLdap):
except KeyError:
continue
for user_dn in user_dns:
if self.use_dumb_member and user_dn == self.dumb_member:
if self._is_dumb_member(user_dn):
continue
res.append(UserRoleAssociation(
user_dn=user_dn,
@ -647,15 +647,7 @@ class RoleApi(common_ldap.BaseLdap):
# object.
tenant_dn = ldap.dn.dn2str(tenant)
for user_dn in role[self.member_attribute]:
# NOTE(nkinder): Ideally, this comparison would be aware of the
# Distinguished Name LDAP syntax. Since Keystone is responsible
# for setting the dumb member DN, we are relatively sure that
# it is returned in the same form. We still need to do a case
# insensitive comparison since attribute names will be upper
# case for AD. We already do this elsewhere in the LDAP
# driver, so it's OK until we decide to become syntax aware.
if (self.use_dumb_member and
user_dn.lower() == self.dumb_member.lower()):
if self._is_dumb_member(user_dn):
continue
res.append(UserRoleAssociation(
user_dn=user_dn,

View File

@ -875,6 +875,14 @@ class BaseLdap(object):
mapping[ldap_attr] = attr_map
return mapping
def _is_dumb_member(self, member_dn):
"""Checks that member is a dumb member.
:param member_dn: DN of member to be checked.
"""
return (self.use_dumb_member
and is_dn_equal(member_dn, self.dumb_member))
def get_connection(self, user=None, password=None):
conn = _get_connection(self.LDAP_URL)

View File

@ -357,7 +357,7 @@ class GroupApi(common_ldap.BaseLdap):
for dn, member in attrs:
user_dns = member.get(self.member_attribute, [])
for user_dn in user_dns:
if self.use_dumb_member and user_dn == self.dumb_member:
if self._is_dumb_member(user_dn):
continue
users.append(user_dn)
return users

View File

@ -398,6 +398,27 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
dumb_id = common_ldap.BaseLdap._dn_to_id(CONF.ldap.dumb_member)
self.assertNotIn(dumb_id, assignment_ids)
def test_list_user_ids_for_project_dumb_member(self):
self.config_fixture.config(group='ldap', use_dumb_member=True)
self.clear_database()
self.load_backends()
self.load_fixtures(default_fixtures)
user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': test_backend.DEFAULT_DOMAIN_ID}
self.identity_api.create_user(user['id'], user)
self.assignment_api.add_user_to_project(self.tenant_baz['id'],
user['id'])
user_ids = self.assignment_api.list_user_ids_for_project(
self.tenant_baz['id'])
self.assertIn(user['id'], user_ids)
dumb_id = common_ldap.BaseLdap._dn_to_id(CONF.ldap.dumb_member)
self.assertNotIn(dumb_id, user_ids)
def test_list_role_assignments_bad_role(self):
self.skipTest('Blocked by bug 1221805')
@ -459,6 +480,32 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
# If this doesn't raise, then the test is successful.
self.identity_api.list_users_in_group(group['id'])
def test_list_group_members_dumb_member(self):
self.config_fixture.config(group='ldap', use_dumb_member=True)
self.clear_database()
self.load_backends()
self.load_fixtures(default_fixtures)
# Create a group
group_id = None
group = dict(name=uuid.uuid4().hex,
domain_id=CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group_id, group)['id']
# Create a user
user_id = None
user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex,
domain_id=CONF.identity.default_domain_id)
user_id = self.identity_api.create_user(user_id, user)['id']
# Add user to the group
self.identity_api.add_user_to_group(user_id, group_id)
user_ids = self.identity_api.list_users_in_group(group_id)
dumb_id = common_ldap.BaseLdap._dn_to_id(CONF.ldap.dumb_member)
self.assertNotIn(dumb_id, user_ids)
def test_list_domains(self):
domains = self.assignment_api.list_domains()
self.assertEqual(
@ -1031,6 +1078,36 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
'Invalid LDAP deref option: %s\.' % CONF.ldap.alias_dereferencing,
identity.backends.ldap.Identity)
def test_is_dumb_member(self):
self.config_fixture.config(group='ldap',
use_dumb_member=True)
self.load_backends()
dn = 'cn=dumb,dc=nonexistent'
self.assertTrue(self.identity_api.driver.user._is_dumb_member(dn))
def test_is_dumb_member_upper_case_keys(self):
self.config_fixture.config(group='ldap',
use_dumb_member=True)
self.load_backends()
dn = 'CN=dumb,DC=nonexistent'
self.assertTrue(self.identity_api.driver.user._is_dumb_member(dn))
def test_is_dumb_member_with_false_use_dumb_member(self):
self.config_fixture.config(group='ldap',
use_dumb_member=False)
self.load_backends()
dn = 'cn=dumb,dc=nonexistent'
self.assertFalse(self.identity_api.driver.user._is_dumb_member(dn))
def test_is_dumb_member_not_dumb(self):
self.config_fixture.config(group='ldap',
use_dumb_member=True)
self.load_backends()
dn = 'ou=some,dc=example.com'
self.assertFalse(self.identity_api.driver.user._is_dumb_member(dn))
def test_user_extra_attribute_mapping(self):
self.config_fixture.config(
group='ldap',