Isolate common ldap code to the identity backend

Since the LDAP code is now isolated to identity only move the common
LDAP code to be part of the ldap backend. This consolidates the code
to a specific location with a 2 cycle deprecation stub in
``keystone.common.ldap``. This lays the ground work for simplifying
the once extremely complex LDAP code that was originally written
to handle Identity and Assignment contructs.

Change-Id: I897bf24ba806c413bd17b76ed62a6cf446dfb0b6
This commit is contained in:
Morgan Fainberg 2016-05-21 20:10:10 -07:00
parent 94391a3240
commit d5cca09bc1
15 changed files with 2199 additions and 2125 deletions

View File

@ -12,4 +12,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from keystone.common.ldap.core import * # noqa
from oslo_log import versionutils
versionutils.deprecated(
what='keystone.common.ldap',
as_of=versionutils.deprecated.NEWTON,
remove_in=+2,
in_favor_of='keystone.identity.backends.ldap.common')
# NOTE(notmorgan): This is maintained for compatibility in case outside
# developers are relying on this location.
from keystone.identity.backends.ldap.common import * # noqa

File diff suppressed because it is too large Load Diff

View File

@ -12,59 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base model for keystone internal services.
Unless marked otherwise, all fields are strings.
"""
from oslo_log import versionutils
class Model(dict):
"""Base model class."""
versionutils.deprecated(
what='keystone.common.ldap.models',
as_of=versionutils.deprecated.NEWTON,
remove_in=+2,
in_favor_of='keystone.identity.backends.ldap.models')
def __hash__(self):
"""Define hash behavior where hash of service ID is returned."""
return self['id'].__hash__()
@property
def known_keys(cls):
return cls.required_keys + cls.optional_keys
class User(Model):
"""User object.
Required keys:
id
name
domain_id
Optional keys:
password
description
email
enabled (bool, default True)
default_project_id
"""
required_keys = ('id', 'name', 'domain_id')
optional_keys = ('password', 'description', 'email', 'enabled',
'default_project_id')
class Group(Model):
"""Group object.
Required keys:
id
name
domain_id
Optional keys:
description
"""
required_keys = ('id', 'name', 'domain_id')
optional_keys = ('description',)
# NOTE(notmorgan): This is maintained for compatibility in case outside
# developers are relying on this location.
from keystone.identity.backends.ldap.models import * # noqa

View File

@ -0,0 +1,13 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.identity.backends.ldap.core import * # noqa

File diff suppressed because it is too large Load Diff

View File

@ -21,11 +21,11 @@ from oslo_log import versionutils
import six
from keystone.common import driver_hints
from keystone.common import ldap as common_ldap
from keystone.common.ldap import models
from keystone import exception
from keystone.i18n import _
from keystone.identity.backends import base
from keystone.identity.backends.ldap import common as common_ldap
from keystone.identity.backends.ldap import models
CONF = cfg.CONF

View File

@ -0,0 +1,70 @@
# Copyright (C) 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base model for keystone internal services.
Unless marked otherwise, all fields are strings.
"""
class Model(dict):
"""Base model class."""
def __hash__(self):
"""Define hash behavior where hash of service ID is returned."""
return self['id'].__hash__()
@property
def known_keys(cls):
return cls.required_keys + cls.optional_keys
class User(Model):
"""User object.
Required keys:
id
name
domain_id
Optional keys:
password
description
email
enabled (bool, default True)
default_project_id
"""
required_keys = ('id', 'name', 'domain_id')
optional_keys = ('password', 'description', 'email', 'enabled',
'default_project_id')
class Group(Model):
"""Group object.
Required keys:
id
name
domain_id
Optional keys:
description
"""
required_keys = ('id', 'name', 'domain_id')
optional_keys = ('description',)

View File

@ -15,8 +15,7 @@ import ldap
from oslo_config import cfg
from keystone.common import cache
from keystone.common import ldap as common_ldap
from keystone.common.ldap import core as common_ldap_core
from keystone.identity.backends.ldap import common as common_ldap
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import fakeldap
@ -45,7 +44,7 @@ class BaseBackendLdapCommon(object):
self.load_backends()
self.load_fixtures(default_fixtures)
self.addCleanup(common_ldap_core._HANDLERS.clear)
self.addCleanup(common_ldap._HANDLERS.clear)
self.addCleanup(self.clear_database)
def _get_domain_fixture(self):

View File

@ -46,9 +46,9 @@ from keystone import auth
from keystone.common import config
from keystone.common import dependency
from keystone.common.kvs import core as kvs_core
from keystone.common import ldap as ks_ldap
from keystone.common import sql
from keystone import exception
from keystone.identity.backends.ldap import common as ks_ldap
from keystone import notifications
from keystone.server import common
from keystone.tests.unit import ksfixtures
@ -622,7 +622,7 @@ class TestCase(BaseTestCase):
'routes.middleware=INFO',
'stevedore.extension=INFO',
'keystone.notifications=INFO',
'keystone.common.ldap=INFO',
'keystone.identity.backends.ldap.common=INFO',
])
self.auth_plugin_config_override()

View File

@ -32,9 +32,8 @@ from oslo_log import log
import six
from six import moves
from keystone.common.ldap import core
from keystone import exception
from keystone.identity.backends.ldap import common
SCOPE_NAMES = {
ldap.SCOPE_BASE: 'SCOPE_BASE',
@ -51,7 +50,7 @@ CONF = cfg.CONF
def _internal_attr(attr_name, value_or_values):
def normalize_value(value):
return core.utf8_decode(value)
return common.utf8_decode(value)
def normalize_dn(dn):
# Capitalize the attribute names as an LDAP server might.
@ -69,7 +68,7 @@ def _internal_attr(attr_name, value_or_values):
return 'CN=Doe\\2C John,OU=Users,CN=example,CN=com'
try:
dn = ldap.dn.str2dn(core.utf8_encode(dn))
dn = ldap.dn.str2dn(common.utf8_encode(dn))
except ldap.DECODING_ERROR:
# NOTE(amakarov): In case of IDs instead of DNs in group members
# they must be handled as regular values.
@ -78,10 +77,10 @@ def _internal_attr(attr_name, value_or_values):
norm = []
for part in dn:
name, val, i = part[0]
name = core.utf8_decode(name)
name = common.utf8_decode(name)
name = name.upper()
norm.append([(name, val, i)])
return core.utf8_decode(ldap.dn.dn2str(norm))
return common.utf8_decode(ldap.dn.dn2str(norm))
if attr_name in ('member', 'roleOccupant'):
attr_fn = normalize_dn
@ -217,7 +216,7 @@ FakeShelves = {}
PendingRequests = {}
class FakeLdap(core.LDAPHandler):
class FakeLdap(common.LDAPHandler):
"""Emulate the python-ldap API.
The python-ldap API requires all strings to be UTF-8 encoded. This
@ -263,7 +262,7 @@ class FakeLdap(core.LDAPHandler):
ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, tls_cacertfile)
elif tls_cacertdir:
ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, tls_cacertdir)
if tls_req_cert in list(core.LDAP_TLS_CERTS.values()):
if tls_req_cert in list(common.LDAP_TLS_CERTS.values()):
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, tls_req_cert)
else:
raise ValueError("invalid TLS_REQUIRE_CERT tls_req_cert=%s",
@ -281,13 +280,15 @@ class FakeLdap(core.LDAPHandler):
self.pool_conn_lifetime = pool_conn_lifetime
def dn(self, dn):
return core.utf8_decode(dn)
return common.utf8_decode(dn)
def _dn_to_id_attr(self, dn):
return core.utf8_decode(ldap.dn.str2dn(core.utf8_encode(dn))[0][0][0])
return common.utf8_decode(
ldap.dn.str2dn(common.utf8_encode(dn))[0][0][0])
def _dn_to_id_value(self, dn):
return core.utf8_decode(ldap.dn.str2dn(core.utf8_encode(dn))[0][0][1])
return common.utf8_decode(
ldap.dn.str2dn(common.utf8_encode(dn))[0][0][1])
def key(self, dn):
return '%s%s' % (self.__prefix, self.dn(dn))
@ -298,14 +299,14 @@ class FakeLdap(core.LDAPHandler):
if server_fail:
raise ldap.SERVER_DOWN
whos = ['cn=Admin', CONF.ldap.user]
if (core.utf8_decode(who) in whos and
core.utf8_decode(cred) in ['password', CONF.ldap.password]):
if (common.utf8_decode(who) in whos and
common.utf8_decode(cred) in ['password', CONF.ldap.password]):
return
try:
attrs = self.db[self.key(who)]
except KeyError:
LOG.debug('bind fail: who=%s not found', core.utf8_decode(who))
LOG.debug('bind fail: who=%s not found', common.utf8_decode(who))
raise ldap.NO_SUCH_OBJECT
db_password = None
@ -313,12 +314,12 @@ class FakeLdap(core.LDAPHandler):
db_password = attrs['userPassword'][0]
except (KeyError, IndexError):
LOG.debug('bind fail: password for who=%s not found',
core.utf8_decode(who))
common.utf8_decode(who))
raise ldap.INAPPROPRIATE_AUTH
if cred != core.utf8_encode(db_password):
if cred != common.utf8_encode(db_password):
LOG.debug('bind fail: password for who=%s does not match',
core.utf8_decode(who))
common.utf8_decode(who))
raise ldap.INVALID_CREDENTIALS
def unbind_s(self):
@ -343,7 +344,7 @@ class FakeLdap(core.LDAPHandler):
if k == id_attr:
for val in dummy_v:
if core.utf8_decode(val) == id_value:
if common.utf8_decode(val) == id_value:
id_attr_in_modlist = True
if not id_attr_in_modlist:
@ -352,10 +353,10 @@ class FakeLdap(core.LDAPHandler):
raise ldap.NAMING_VIOLATION
key = self.key(dn)
LOG.debug('add item: dn=%(dn)s, attrs=%(attrs)s', {
'dn': core.utf8_decode(dn), 'attrs': modlist})
'dn': common.utf8_decode(dn), 'attrs': modlist})
if key in self.db:
LOG.debug('add item failed: dn=%s is already in store.',
core.utf8_decode(dn))
common.utf8_decode(dn))
raise ldap.ALREADY_EXISTS(dn)
self.db[key] = {k: _internal_attr(k, v) for k, v in modlist}
@ -379,17 +380,17 @@ class FakeLdap(core.LDAPHandler):
try:
if CONTROL_TREEDELETE in [c.controlType for c in serverctrls]:
LOG.debug('FakeLdap subtree_delete item: dn=%s',
core.utf8_decode(dn))
common.utf8_decode(dn))
children = self._getChildren(dn)
for c in children:
del self.db[c]
key = self.key(dn)
LOG.debug('FakeLdap delete item: dn=%s', core.utf8_decode(dn))
LOG.debug('FakeLdap delete item: dn=%s', common.utf8_decode(dn))
del self.db[key]
except KeyError:
LOG.debug('delete item failed: dn=%s not found.',
core.utf8_decode(dn))
common.utf8_decode(dn))
raise ldap.NO_SUCH_OBJECT
self.db.sync()
@ -405,12 +406,12 @@ class FakeLdap(core.LDAPHandler):
key = self.key(dn)
LOG.debug('modify item: dn=%(dn)s attrs=%(attrs)s', {
'dn': core.utf8_decode(dn), 'attrs': modlist})
'dn': common.utf8_decode(dn), 'attrs': modlist})
try:
entry = self.db[key]
except KeyError:
LOG.debug('modify item failed: dn=%s not found.',
core.utf8_decode(dn))
common.utf8_decode(dn))
raise ldap.NO_SUCH_OBJECT
for cmd, k, v in modlist:
@ -495,14 +496,14 @@ class FakeLdap(core.LDAPHandler):
elif scope == ldap.SCOPE_ONELEVEL:
def get_entries():
base_dn = ldap.dn.str2dn(core.utf8_encode(base))
base_dn = ldap.dn.str2dn(common.utf8_encode(base))
base_len = len(base_dn)
for k, v in self.db.items():
if not k.startswith(self.__prefix):
continue
k_dn_str = k[len(self.__prefix):]
k_dn = ldap.dn.str2dn(core.utf8_encode(k_dn_str))
k_dn = ldap.dn.str2dn(common.utf8_encode(k_dn_str))
if len(k_dn) != base_len + 1:
continue
if k_dn[-base_len:] != base_dn:
@ -518,13 +519,13 @@ class FakeLdap(core.LDAPHandler):
objects = []
for dn, attrs in results:
# filter the objects by filterstr
id_attr, id_val, _ = ldap.dn.str2dn(core.utf8_encode(dn))[0][0]
id_attr = core.utf8_decode(id_attr)
id_val = core.utf8_decode(id_val)
id_attr, id_val, _ = ldap.dn.str2dn(common.utf8_encode(dn))[0][0]
id_attr = common.utf8_decode(id_attr)
id_val = common.utf8_decode(id_val)
match_attrs = attrs.copy()
match_attrs[id_attr] = [id_val]
attrs_checked = set()
if not filterstr or _match_query(core.utf8_decode(filterstr),
if not filterstr or _match_query(common.utf8_decode(filterstr),
match_attrs,
attrs_checked):
if (filterstr and
@ -533,7 +534,7 @@ class FakeLdap(core.LDAPHandler):
raise AssertionError('No objectClass in search filter')
# filter the attributes by attrlist
attrs = {k: v for k, v in attrs.items()
if not attrlist or k in core.utf8_decode(attrlist)}
if not attrlist or k in common.utf8_decode(attrlist)}
objects.append((dn, attrs))
return objects
@ -658,7 +659,7 @@ class FakeLdapNoSubtreeDelete(FakeLdap):
except KeyError:
LOG.debug('delete item failed: dn=%s not found.',
core.utf8_decode(dn))
common.utf8_decode(dn))
raise ldap.NO_SUCH_OBJECT
super(FakeLdapNoSubtreeDelete, self).delete_ext_s(dn,
serverctrls,

View File

@ -22,8 +22,7 @@ from oslo_config import cfg
from testtools import matchers
from keystone.common import driver_hints
from keystone.common import ldap as ks_ldap
from keystone.common.ldap import core as common_ldap_core
from keystone.identity.backends.ldap import common as common_ldap
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import fakeldap
@ -40,82 +39,82 @@ class DnCompareTest(unit.BaseTestCase):
# prep_case_insensitive returns the string with spaces at the front and
# end if it's already lowercase and no insignificant characters.
value = 'lowercase value'
self.assertEqual(value, ks_ldap.prep_case_insensitive(value))
self.assertEqual(value, common_ldap.prep_case_insensitive(value))
def test_prep_lowercase(self):
# prep_case_insensitive returns the string with spaces at the front and
# end and lowercases the value.
value = 'UPPERCASE VALUE'
exp_value = value.lower()
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
self.assertEqual(exp_value, common_ldap.prep_case_insensitive(value))
def test_prep_insignificant(self):
# prep_case_insensitive remove insignificant spaces.
value = 'before after'
exp_value = 'before after'
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
self.assertEqual(exp_value, common_ldap.prep_case_insensitive(value))
def test_prep_insignificant_pre_post(self):
# prep_case_insensitive remove insignificant spaces.
value = ' value '
exp_value = 'value'
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
self.assertEqual(exp_value, common_ldap.prep_case_insensitive(value))
def test_ava_equal_same(self):
# is_ava_value_equal returns True if the two values are the same.
value = 'val1'
self.assertTrue(ks_ldap.is_ava_value_equal('cn', value, value))
self.assertTrue(common_ldap.is_ava_value_equal('cn', value, value))
def test_ava_equal_complex(self):
# is_ava_value_equal returns True if the two values are the same using
# a value that's got different capitalization and insignificant chars.
val1 = 'before after'
val2 = ' BEFORE afTer '
self.assertTrue(ks_ldap.is_ava_value_equal('cn', val1, val2))
self.assertTrue(common_ldap.is_ava_value_equal('cn', val1, val2))
def test_ava_different(self):
# is_ava_value_equal returns False if the values aren't the same.
self.assertFalse(ks_ldap.is_ava_value_equal('cn', 'val1', 'val2'))
self.assertFalse(common_ldap.is_ava_value_equal('cn', 'val1', 'val2'))
def test_rdn_same(self):
# is_rdn_equal returns True if the two values are the same.
rdn = ldap.dn.str2dn('cn=val1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn, rdn))
self.assertTrue(common_ldap.is_rdn_equal(rdn, rdn))
def test_rdn_diff_length(self):
# is_rdn_equal returns False if the RDNs have a different number of
# AVAs.
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
rdn2 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
self.assertFalse(common_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_multi_ava_same_order(self):
# is_rdn_equal returns True if the RDNs have the same number of AVAs
# and the values are the same.
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
rdn2 = ldap.dn.str2dn('cn=CN1+ou=OU1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
self.assertTrue(common_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_multi_ava_diff_order(self):
# is_rdn_equal returns True if the RDNs have the same number of AVAs
# and the values are the same, even if in a different order
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
rdn2 = ldap.dn.str2dn('ou=OU1+cn=CN1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
self.assertTrue(common_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_multi_ava_diff_type(self):
# is_rdn_equal returns False if the RDNs have the same number of AVAs
# and the attribute types are different.
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
rdn2 = ldap.dn.str2dn('cn=cn1+sn=sn1')[0]
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
self.assertFalse(common_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_attr_type_case_diff(self):
# is_rdn_equal returns True for same RDNs even when attr type case is
# different.
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
rdn2 = ldap.dn.str2dn('CN=cn1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
self.assertTrue(common_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_attr_type_alias(self):
# is_rdn_equal returns False for same RDNs even when attr type alias is
@ -123,82 +122,82 @@ class DnCompareTest(unit.BaseTestCase):
# consider them equal.
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
rdn2 = ldap.dn.str2dn('2.5.4.3=cn1')[0]
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
self.assertFalse(common_ldap.is_rdn_equal(rdn1, rdn2))
def test_dn_same(self):
# is_dn_equal returns True if the DNs are the same.
dn = 'cn=Babs Jansen,ou=OpenStack'
self.assertTrue(ks_ldap.is_dn_equal(dn, dn))
self.assertTrue(common_ldap.is_dn_equal(dn, dn))
def test_dn_equal_unicode(self):
# is_dn_equal can accept unicode
dn = u'cn=fäké,ou=OpenStack'
self.assertTrue(ks_ldap.is_dn_equal(dn, dn))
self.assertTrue(common_ldap.is_dn_equal(dn, dn))
def test_dn_diff_length(self):
# is_dn_equal returns False if the DNs don't have the same number of
# RDNs
dn1 = 'cn=Babs Jansen,ou=OpenStack'
dn2 = 'cn=Babs Jansen,ou=OpenStack,dc=example.com'
self.assertFalse(ks_ldap.is_dn_equal(dn1, dn2))
self.assertFalse(common_ldap.is_dn_equal(dn1, dn2))
def test_dn_equal_rdns(self):
# is_dn_equal returns True if the DNs have the same number of RDNs
# and each RDN is the same.
dn1 = 'cn=Babs Jansen,ou=OpenStack+cn=OpenSource'
dn2 = 'CN=Babs Jansen,cn=OpenSource+ou=OpenStack'
self.assertTrue(ks_ldap.is_dn_equal(dn1, dn2))
self.assertTrue(common_ldap.is_dn_equal(dn1, dn2))
def test_dn_parsed_dns(self):
# is_dn_equal can also accept parsed DNs.
dn_str1 = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack+cn=OpenSource')
dn_str2 = ldap.dn.str2dn('CN=Babs Jansen,cn=OpenSource+ou=OpenStack')
self.assertTrue(ks_ldap.is_dn_equal(dn_str1, dn_str2))
self.assertTrue(common_ldap.is_dn_equal(dn_str1, dn_str2))
def test_startswith_under_child(self):
# dn_startswith returns True if descendant_dn is a child of dn.
child = 'cn=Babs Jansen,ou=OpenStack'
parent = 'ou=OpenStack'
self.assertTrue(ks_ldap.dn_startswith(child, parent))
self.assertTrue(common_ldap.dn_startswith(child, parent))
def test_startswith_parent(self):
# dn_startswith returns False if descendant_dn is a parent of dn.
child = 'cn=Babs Jansen,ou=OpenStack'
parent = 'ou=OpenStack'
self.assertFalse(ks_ldap.dn_startswith(parent, child))
self.assertFalse(common_ldap.dn_startswith(parent, child))
def test_startswith_same(self):
# dn_startswith returns False if DNs are the same.
dn = 'cn=Babs Jansen,ou=OpenStack'
self.assertFalse(ks_ldap.dn_startswith(dn, dn))
self.assertFalse(common_ldap.dn_startswith(dn, dn))
def test_startswith_not_parent(self):
# dn_startswith returns False if descendant_dn is not under the dn
child = 'cn=Babs Jansen,ou=OpenStack'
parent = 'dc=example.com'
self.assertFalse(ks_ldap.dn_startswith(child, parent))
self.assertFalse(common_ldap.dn_startswith(child, parent))
def test_startswith_descendant(self):
# dn_startswith returns True if descendant_dn is a descendant of dn.
descendant = 'cn=Babs Jansen,ou=Keystone,ou=OpenStack,dc=example.com'
dn = 'ou=OpenStack,dc=example.com'
self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
self.assertTrue(common_ldap.dn_startswith(descendant, dn))
descendant = 'uid=12345,ou=Users,dc=example,dc=com'
dn = 'ou=Users,dc=example,dc=com'
self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
self.assertTrue(common_ldap.dn_startswith(descendant, dn))
def test_startswith_parsed_dns(self):
# dn_startswith also accepts parsed DNs.
descendant = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack')
dn = ldap.dn.str2dn('ou=OpenStack')
self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
self.assertTrue(common_ldap.dn_startswith(descendant, dn))
def test_startswith_unicode(self):
# dn_startswith accepts unicode.
child = u'cn=fäké,ou=OpenStäck'
parent = u'ou=OpenStäck'
self.assertTrue(ks_ldap.dn_startswith(child, parent))
self.assertTrue(common_ldap.dn_startswith(child, parent))
class LDAPDeleteTreeTest(unit.TestCase):
@ -206,15 +205,15 @@ class LDAPDeleteTreeTest(unit.TestCase):
def setUp(self):
super(LDAPDeleteTreeTest, self).setUp()
ks_ldap.register_handler('fake://',
fakeldap.FakeLdapNoSubtreeDelete)
common_ldap.register_handler('fake://',
fakeldap.FakeLdapNoSubtreeDelete)
self.useFixture(database.Database(self.sql_driver_version_overrides))
self.load_backends()
self.load_fixtures(default_fixtures)
self.addCleanup(self.clear_database)
self.addCleanup(common_ldap_core._HANDLERS.clear)
self.addCleanup(common_ldap._HANDLERS.clear)
def clear_database(self):
for shelf in fakeldap.FakeShelves:
@ -264,7 +263,7 @@ class LDAPDeleteTreeTest(unit.TestCase):
scope = ldap.SCOPE_SUBTREE
filt = '(|(objectclass=*)(objectclass=ldapsubentry))'
entries = conn.search_s(base_dn, scope, filt,
attrlist=common_ldap_core.DN_ONLY)
attrlist=common_ldap.DN_ONLY)
self.assertThat(entries, matchers.HasLength(3))
sort_ents = sorted([e[0] for e in entries], key=len, reverse=True)
self.assertEqual([grandchild_dn, child_dn, base_dn], sort_ents)
@ -292,14 +291,14 @@ class MultiURLTests(unit.TestCase):
def test_multiple_urls_with_comma_no_conn_pool(self):
urls = 'ldap://localhost,ldap://backup.localhost'
self.config_fixture.config(group='ldap', url=urls, use_pool=False)
base_ldap = ks_ldap.BaseLdap(CONF)
base_ldap = common_ldap.BaseLdap(CONF)
ldap_connection = base_ldap.get_connection()
self.assertEqual(urls, ldap_connection.conn.conn._uri)
def test_multiple_urls_with_comma_with_conn_pool(self):
urls = 'ldap://localhost,ldap://backup.localhost'
self.config_fixture.config(group='ldap', url=urls, use_pool=True)
base_ldap = ks_ldap.BaseLdap(CONF)
base_ldap = common_ldap.BaseLdap(CONF)
ldap_connection = base_ldap.get_connection()
self.assertEqual(urls, ldap_connection.conn.conn_pool.uri)
@ -307,11 +306,11 @@ class MultiURLTests(unit.TestCase):
class SslTlsTest(unit.TestCase):
"""Test for the SSL/TLS functionality in keystone.common.ldap.core."""
@mock.patch.object(ks_ldap.core.KeystoneLDAPHandler, 'simple_bind_s')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
@mock.patch.object(ldap.ldapobject.LDAPObject, 'start_tls_s')
def _init_ldap_connection(self, config, mock_ldap_one, mock_ldap_two):
# Attempt to connect to initialize python-ldap.
base_ldap = ks_ldap.BaseLdap(config)
base_ldap = common_ldap.BaseLdap(config)
base_ldap.get_connection()
def test_certfile_trust_tls(self):
@ -378,8 +377,8 @@ class LDAPPagedResultsTest(unit.TestCase):
super(LDAPPagedResultsTest, self).setUp()
self.clear_database()
ks_ldap.register_handler('fake://', fakeldap.FakeLdap)
self.addCleanup(common_ldap_core._HANDLERS.clear)
common_ldap.register_handler('fake://', fakeldap.FakeLdap)
self.addCleanup(common_ldap._HANDLERS.clear)
self.useFixture(database.Database(self.sql_driver_version_overrides))
self.load_backends()
@ -425,7 +424,7 @@ class CommonLdapTestCase(unit.BaseTestCase):
'binary_attr': [b'\x00\xFF\x00\xFF']
}
), ]
py_result = ks_ldap.convert_ldap_result(result)
py_result = common_ldap.convert_ldap_result(result)
# The attribute containing the binary value should
# not be present in the converted result.
self.assertNotIn('binary_attr', py_result[0][1])
@ -434,23 +433,23 @@ class CommonLdapTestCase(unit.BaseTestCase):
value_unicode = u'fäké1'
value_utf8 = value_unicode.encode('utf-8')
result_utf8 = ks_ldap.utf8_encode(value_unicode)
result_utf8 = common_ldap.utf8_encode(value_unicode)
self.assertEqual(value_utf8, result_utf8)
result_utf8 = ks_ldap.utf8_encode(value_utf8)
result_utf8 = common_ldap.utf8_encode(value_utf8)
self.assertEqual(value_utf8, result_utf8)
result_unicode = ks_ldap.utf8_decode(value_utf8)
result_unicode = common_ldap.utf8_decode(value_utf8)
self.assertEqual(value_unicode, result_unicode)
result_unicode = ks_ldap.utf8_decode(value_unicode)
result_unicode = common_ldap.utf8_decode(value_unicode)
self.assertEqual(value_unicode, result_unicode)
self.assertRaises(TypeError,
ks_ldap.utf8_encode,
common_ldap.utf8_encode,
100)
result_unicode = ks_ldap.utf8_decode(100)
result_unicode = common_ldap.utf8_decode(100)
self.assertEqual(u'100', result_unicode)
def test_user_id_begins_with_0(self):
@ -462,7 +461,7 @@ class CommonLdapTestCase(unit.BaseTestCase):
'enabled': ['TRUE']
}
), ]
py_result = ks_ldap.convert_ldap_result(result)
py_result = common_ldap.convert_ldap_result(result)
# The user id should be 0123456, and the enabled
# flag should be True
self.assertIs(py_result[0][1]['enabled'][0], True)
@ -479,7 +478,7 @@ class CommonLdapTestCase(unit.BaseTestCase):
'enabled': [bitmask]
}
), ]
py_result = ks_ldap.convert_ldap_result(result)
py_result = common_ldap.convert_ldap_result(result)
# The user id should be 0123456, and the enabled
# flag should be 225
self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0])
@ -496,7 +495,7 @@ class CommonLdapTestCase(unit.BaseTestCase):
'enabled': [bitmask]
}
), ]
py_result = ks_ldap.convert_ldap_result(result)
py_result = common_ldap.convert_ldap_result(result)
# The user id should be 0123456, and the enabled
# flag should be 225, the 0 is dropped.
self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0])
@ -514,7 +513,7 @@ class CommonLdapTestCase(unit.BaseTestCase):
'user_name': [user_name]
}
), ]
py_result = ks_ldap.convert_ldap_result(result)
py_result = common_ldap.convert_ldap_result(result)
# The user name should still be a string value.
self.assertEqual(user_name, py_result[0][1]['user_name'][0])
@ -525,7 +524,7 @@ class LDAPFilterQueryCompositionTest(unit.TestCase):
def setUp(self):
super(LDAPFilterQueryCompositionTest, self).setUp()
self.base_ldap = ks_ldap.BaseLdap(self.config_fixture.conf)
self.base_ldap = common_ldap.BaseLdap(self.config_fixture.conf)
# The tests need an attribute mapping to use.
self.attribute_name = uuid.uuid4().hex

View File

@ -13,8 +13,7 @@
import fixtures
from keystone.common import ldap as common_ldap
from keystone.common.ldap import core as common_ldap_core
from keystone.identity.backends.ldap import common as common_ldap
from keystone.tests.unit import fakeldap
@ -24,11 +23,11 @@ class LDAPDatabase(fixtures.Fixture):
def setUp(self):
super(LDAPDatabase, self).setUp()
self.clear()
common_ldap_core._HANDLERS.clear()
common_ldap._HANDLERS.clear()
common_ldap.register_handler('fake://', fakeldap.FakeLdap)
# TODO(dstanek): switch the flow here
self.addCleanup(self.clear)
self.addCleanup(common_ldap_core._HANDLERS.clear)
self.addCleanup(common_ldap._HANDLERS.clear)
def clear(self):
for shelf in fakeldap.FakeShelves:

View File

@ -29,10 +29,9 @@ from testtools import matchers
from keystone.common import cache
from keystone.common import driver_hints
from keystone.common import ldap as common_ldap
from keystone.common.ldap import core as common_ldap_core
from keystone import exception
from keystone import identity
from keystone.identity.backends.ldap import common as common_ldap
from keystone.identity.mapping_backends import mapping as map
from keystone.tests import unit
from keystone.tests.unit.assignment import test_backends as assignment_tests
@ -1213,7 +1212,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
user_ref = self.identity_api.get_user(user_ref['id'])
self.assertIs(True, user_ref['enabled'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_enabled_invert_no_enabled_value(self, mock_ldap_get):
self.config_fixture.config(group='ldap', user_enabled_invert=True,
user_enabled_default=False)
@ -1234,7 +1233,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# from the resource default.
self.assertIs(not CONF.ldap.user_enabled_default, user_ref['enabled'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_enabled_invert_default_str_value(self, mock_ldap_get):
self.config_fixture.config(group='ldap', user_enabled_invert=True,
user_enabled_default='False')
@ -1255,7 +1254,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# from the resource default.
self.assertIs(True, user_ref['enabled'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_enabled_attribute_handles_expired(self, mock_ldap_get):
# If using 'passwordisexpired' as enabled attribute, and inverting it,
# Then an unauthorized user (expired password) should not be enabled.
@ -1275,7 +1274,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
user_ref = user_api.get('123456789')
self.assertIs(False, user_ref['enabled'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_enabled_attribute_handles_utf8(self, mock_ldap_get):
# If using 'passwordisexpired' as enabled attribute, and inverting it,
# and the result is utf8 encoded, then the an authorized user should
@ -1296,7 +1295,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
user_ref = user_api.get('123456789')
self.assertIs(True, user_ref['enabled'])
@mock.patch.object(common_ldap_core.KeystoneLDAPHandler, 'simple_bind_s')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
def test_user_api_get_connection_no_user_password(self, mocked_method):
"""Don't bind in case the user and password are blank."""
# Ensure the username/password are in-fact blank
@ -1306,7 +1305,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
self.assertFalse(mocked_method.called,
msg='`simple_bind_s` method was unexpectedly called')
@mock.patch.object(common_ldap_core.KeystoneLDAPHandler, 'connect')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
def test_chase_referrals_off(self, mocked_fakeldap):
self.config_fixture.config(
group='ldap',
@ -1320,7 +1319,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# is as expected.
self.assertFalse(mocked_fakeldap.call_args[-1]['chase_referrals'])
@mock.patch.object(common_ldap_core.KeystoneLDAPHandler, 'connect')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
def test_chase_referrals_on(self, mocked_fakeldap):
self.config_fixture.config(
group='ldap',
@ -1334,7 +1333,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# is as expected.
self.assertTrue(mocked_fakeldap.call_args[-1]['chase_referrals'])
@mock.patch.object(common_ldap_core.KeystoneLDAPHandler, 'connect')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
def test_debug_level_set(self, mocked_fakeldap):
level = 12345
self.config_fixture.config(
@ -1459,7 +1458,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
users = self.identity_api.driver.user.get_all()
self.assertThat(users, matchers.HasLength(len(default_fixtures.USERS)))
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_mixed_case_attribute(self, mock_ldap_get):
# Mock the search results to return attribute names
# with unexpected case.
@ -1827,7 +1826,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# user_ref['id'] should contains the email attribute
self.assertEqual(self.user_foo['email'], user_ref['id'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_get_id_from_dn_for_multivalued_attribute_id(self, mock_ldap_get):
driver = self.identity_api._select_identity_driver(
CONF.identity.default_domain_id)
@ -1850,7 +1849,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# has multiple values
self.assertEqual('nobodycares', user_ref['id'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_id_attribute_not_found(self, mock_ldap_get):
mock_ldap_get.return_value = (
'cn=nobodycares,dc=example,dc=com',
@ -1864,7 +1863,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
user_api.get,
'nobodycares')
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_id_not_in_dn(self, mock_ldap_get):
driver = self.identity_api._select_identity_driver(
CONF.identity.default_domain_id)
@ -1884,7 +1883,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
self.assertEqual('crap', user_ref['id'])
self.assertEqual('junk', user_ref['name'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_name_in_dn(self, mock_ldap_get):
driver = self.identity_api._select_identity_driver(
CONF.identity.default_domain_id)
@ -2109,7 +2108,7 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
self.skipTest(
"N/A: Covered by test_user_enabled_invert")
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_enabled_attribute_handles_utf8(self, mock_ldap_get):
# Since user_enabled_emulation is enabled in this test, this test will
# fail since it's using user_enabled_invert.
@ -2507,7 +2506,7 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
self.assertNotIn('password', user_ref)
self.assertEqual(expected_user_ids, user_ids)
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get_all')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get_all')
def test_list_limit_domain_specific_inheritance(self, ldap_get_all):
# passiging hints is important, because if it's not passed, limiting
# is considered be disabled
@ -2522,7 +2521,7 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
hints = args[0]
self.assertEqual(1000, hints.limit['limit'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get_all')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get_all')
def test_list_limit_domain_specific_override(self, ldap_get_all):
# passiging hints is important, because if it's not passed, limiting
# is considered to be disabled

View File

@ -19,8 +19,8 @@ import ldappool
import mock
from oslo_config import cfg
from keystone.common.ldap import core as ldap_core
from keystone.identity.backends import ldap
from keystone.identity.backends.ldap import common as common_ldap
from keystone.tests import unit
from keystone.tests.unit import fakeldap
from keystone.tests.unit import test_backend_ldap
@ -32,7 +32,7 @@ class LdapPoolCommonTestMixin(object):
"""LDAP pool specific common tests used here and in live tests."""
def cleanup_pools(self):
ldap_core.PooledLDAPHandler.connection_pools.clear()
common_ldap.PooledLDAPHandler.connection_pools.clear()
def test_handler_with_use_pool_enabled(self):
# by default use_pool and use_auth_pool is enabled in test pool config
@ -40,11 +40,11 @@ class LdapPoolCommonTestMixin(object):
self.user_foo.pop('password')
self.assertDictEqual(self.user_foo, user_ref)
handler = ldap_core._get_connection(CONF.ldap.url, use_pool=True)
self.assertIsInstance(handler, ldap_core.PooledLDAPHandler)
handler = common_ldap._get_connection(CONF.ldap.url, use_pool=True)
self.assertIsInstance(handler, common_ldap.PooledLDAPHandler)
@mock.patch.object(ldap_core.KeystoneLDAPHandler, 'connect')
@mock.patch.object(ldap_core.KeystoneLDAPHandler, 'simple_bind_s')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
def test_handler_with_use_pool_not_enabled(self, bind_method,
connect_method):
self.config_fixture.config(group='ldap', use_pool=False)
@ -56,10 +56,10 @@ class LdapPoolCommonTestMixin(object):
end_user_auth=True)
# use_auth_pool flag does not matter when use_pool is False
# still handler is non pool version
self.assertIsInstance(handler.conn, ldap_core.PythonLDAPHandler)
self.assertIsInstance(handler.conn, common_ldap.PythonLDAPHandler)
@mock.patch.object(ldap_core.KeystoneLDAPHandler, 'connect')
@mock.patch.object(ldap_core.KeystoneLDAPHandler, 'simple_bind_s')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
def test_handler_with_end_user_auth_use_pool_not_enabled(self, bind_method,
connect_method):
# by default use_pool is enabled in test pool config
@ -70,13 +70,13 @@ class LdapPoolCommonTestMixin(object):
user_api = ldap.UserApi(CONF)
handler = user_api.get_connection(user=None, password=None,
end_user_auth=True)
self.assertIsInstance(handler.conn, ldap_core.PythonLDAPHandler)
self.assertIsInstance(handler.conn, common_ldap.PythonLDAPHandler)
# For end_user_auth case, flag should not be false otherwise
# it will use, admin connections ldap pool
handler = user_api.get_connection(user=None, password=None,
end_user_auth=False)
self.assertIsInstance(handler.conn, ldap_core.PooledLDAPHandler)
self.assertIsInstance(handler.conn, common_ldap.PooledLDAPHandler)
def test_pool_size_set(self):
# get related connection manager instance
@ -214,12 +214,12 @@ class LDAPIdentity(LdapPoolCommonTestMixin,
def setUp(self):
self.useFixture(fixtures.MockPatchObject(
ldap_core.PooledLDAPHandler, 'Connector', fakeldap.FakeLdapPool))
common_ldap.PooledLDAPHandler, 'Connector', fakeldap.FakeLdapPool))
super(LDAPIdentity, self).setUp()
self.addCleanup(self.cleanup_pools)
# storing to local variable to avoid long references
self.conn_pools = ldap_core.PooledLDAPHandler.connection_pools
self.conn_pools = common_ldap.PooledLDAPHandler.connection_pools
# super class loads db fixtures which establishes ldap connection
# so adding dummy call to highlight connection pool initialization
# as its not that obvious though its not needed here
@ -230,7 +230,7 @@ class LDAPIdentity(LdapPoolCommonTestMixin,
config_files.append(unit.dirs.tests_conf('backend_ldap_pool.conf'))
return config_files
@mock.patch.object(ldap_core, 'utf8_encode')
@mock.patch.object(common_ldap, 'utf8_encode')
def test_utf8_encoded_is_used_in_pool(self, mocked_method):
def side_effect(arg):
return arg

View File

@ -17,8 +17,8 @@ import uuid
import ldappool
from oslo_config import cfg
from keystone.common.ldap import core as ldap_core
from keystone.identity.backends import ldap
from keystone.identity.backends.ldap import common as ldap_common
from keystone.tests import unit
from keystone.tests.unit import fakeldap
from keystone.tests.unit import test_backend_ldap_pool
@ -40,7 +40,7 @@ class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin,
super(LiveLDAPPoolIdentity, self).setUp()
self.addCleanup(self.cleanup_pools)
# storing to local variable to avoid long references
self.conn_pools = ldap_core.PooledLDAPHandler.connection_pools
self.conn_pools = ldap_common.PooledLDAPHandler.connection_pools
def config_files(self):
config_files = super(LiveLDAPPoolIdentity, self).config_files()
@ -48,7 +48,7 @@ class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin,
return config_files
def test_assert_connector_used_not_fake_ldap_pool(self):
handler = ldap_core._get_connection(CONF.ldap.url, use_pool=True)
handler = ldap_common._get_connection(CONF.ldap.url, use_pool=True)
self.assertNotEqual(type(handler.Connector),
type(fakeldap.FakeLdapPool))
self.assertEqual(type(ldappool.StateConnector),
@ -120,7 +120,8 @@ class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin,
return self.identity_api.get_user(user['id'])
def _get_auth_conn_pool_cm(self):
pool_url = ldap_core.PooledLDAPHandler.auth_pool_prefix + CONF.ldap.url
pool_url = (
ldap_common.PooledLDAPHandler.auth_pool_prefix + CONF.ldap.url)
return self.conn_pools[pool_url]
def _do_password_change_for_one_user(self, password, new_password):