Merge "Isolate common ldap code to the identity backend"

This commit is contained in:
Jenkins 2016-05-23 17:10:25 +00:00 committed by Gerrit Code Review
commit 723208b586
15 changed files with 2199 additions and 2125 deletions

View File

@ -12,4 +12,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from keystone.common.ldap.core import * # noqa
from oslo_log import versionutils
versionutils.deprecated(
what='keystone.common.ldap',
as_of=versionutils.deprecated.NEWTON,
remove_in=+2,
in_favor_of='keystone.identity.backends.ldap.common')
# NOTE(notmorgan): This is maintained for compatibility in case outside
# developers are relying on this location.
from keystone.identity.backends.ldap.common import * # noqa

File diff suppressed because it is too large Load Diff

View File

@ -12,59 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base model for keystone internal services.
Unless marked otherwise, all fields are strings.
"""
from oslo_log import versionutils
class Model(dict):
"""Base model class."""
versionutils.deprecated(
what='keystone.common.ldap.models',
as_of=versionutils.deprecated.NEWTON,
remove_in=+2,
in_favor_of='keystone.identity.backends.ldap.models')
def __hash__(self):
"""Define hash behavior where hash of service ID is returned."""
return self['id'].__hash__()
@property
def known_keys(cls):
return cls.required_keys + cls.optional_keys
class User(Model):
"""User object.
Required keys:
id
name
domain_id
Optional keys:
password
description
email
enabled (bool, default True)
default_project_id
"""
required_keys = ('id', 'name', 'domain_id')
optional_keys = ('password', 'description', 'email', 'enabled',
'default_project_id')
class Group(Model):
"""Group object.
Required keys:
id
name
domain_id
Optional keys:
description
"""
required_keys = ('id', 'name', 'domain_id')
optional_keys = ('description',)
# NOTE(notmorgan): This is maintained for compatibility in case outside
# developers are relying on this location.
from keystone.identity.backends.ldap.models import * # noqa

View File

@ -0,0 +1,13 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.identity.backends.ldap.core import * # noqa

File diff suppressed because it is too large Load Diff

View File

@ -21,11 +21,11 @@ from oslo_log import versionutils
import six
from keystone.common import driver_hints
from keystone.common import ldap as common_ldap
from keystone.common.ldap import models
from keystone import exception
from keystone.i18n import _
from keystone.identity.backends import base
from keystone.identity.backends.ldap import common as common_ldap
from keystone.identity.backends.ldap import models
CONF = cfg.CONF

View File

@ -0,0 +1,70 @@
# Copyright (C) 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base model for keystone internal services.
Unless marked otherwise, all fields are strings.
"""
class Model(dict):
"""Base model class."""
def __hash__(self):
"""Define hash behavior where hash of service ID is returned."""
return self['id'].__hash__()
@property
def known_keys(cls):
return cls.required_keys + cls.optional_keys
class User(Model):
"""User object.
Required keys:
id
name
domain_id
Optional keys:
password
description
email
enabled (bool, default True)
default_project_id
"""
required_keys = ('id', 'name', 'domain_id')
optional_keys = ('password', 'description', 'email', 'enabled',
'default_project_id')
class Group(Model):
"""Group object.
Required keys:
id
name
domain_id
Optional keys:
description
"""
required_keys = ('id', 'name', 'domain_id')
optional_keys = ('description',)

View File

@ -15,8 +15,7 @@ import ldap
from oslo_config import cfg
from keystone.common import cache
from keystone.common import ldap as common_ldap
from keystone.common.ldap import core as common_ldap_core
from keystone.identity.backends.ldap import common as common_ldap
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import fakeldap
@ -45,7 +44,7 @@ class BaseBackendLdapCommon(object):
self.load_backends()
self.load_fixtures(default_fixtures)
self.addCleanup(common_ldap_core._HANDLERS.clear)
self.addCleanup(common_ldap._HANDLERS.clear)
self.addCleanup(self.clear_database)
def _get_domain_fixture(self):

View File

@ -46,9 +46,9 @@ from keystone import auth
from keystone.common import config
from keystone.common import dependency
from keystone.common.kvs import core as kvs_core
from keystone.common import ldap as ks_ldap
from keystone.common import sql
from keystone import exception
from keystone.identity.backends.ldap import common as ks_ldap
from keystone import notifications
from keystone.server import common
from keystone.tests.unit import ksfixtures
@ -622,7 +622,7 @@ class TestCase(BaseTestCase):
'routes.middleware=INFO',
'stevedore.extension=INFO',
'keystone.notifications=INFO',
'keystone.common.ldap=INFO',
'keystone.identity.backends.ldap.common=INFO',
])
self.auth_plugin_config_override()

View File

@ -32,9 +32,8 @@ from oslo_log import log
import six
from six import moves
from keystone.common.ldap import core
from keystone import exception
from keystone.identity.backends.ldap import common
SCOPE_NAMES = {
ldap.SCOPE_BASE: 'SCOPE_BASE',
@ -51,7 +50,7 @@ CONF = cfg.CONF
def _internal_attr(attr_name, value_or_values):
def normalize_value(value):
return core.utf8_decode(value)
return common.utf8_decode(value)
def normalize_dn(dn):
# Capitalize the attribute names as an LDAP server might.
@ -69,7 +68,7 @@ def _internal_attr(attr_name, value_or_values):
return 'CN=Doe\\2C John,OU=Users,CN=example,CN=com'
try:
dn = ldap.dn.str2dn(core.utf8_encode(dn))
dn = ldap.dn.str2dn(common.utf8_encode(dn))
except ldap.DECODING_ERROR:
# NOTE(amakarov): In case of IDs instead of DNs in group members
# they must be handled as regular values.
@ -78,10 +77,10 @@ def _internal_attr(attr_name, value_or_values):
norm = []
for part in dn:
name, val, i = part[0]
name = core.utf8_decode(name)
name = common.utf8_decode(name)
name = name.upper()
norm.append([(name, val, i)])
return core.utf8_decode(ldap.dn.dn2str(norm))
return common.utf8_decode(ldap.dn.dn2str(norm))
if attr_name in ('member', 'roleOccupant'):
attr_fn = normalize_dn
@ -217,7 +216,7 @@ FakeShelves = {}
PendingRequests = {}
class FakeLdap(core.LDAPHandler):
class FakeLdap(common.LDAPHandler):
"""Emulate the python-ldap API.
The python-ldap API requires all strings to be UTF-8 encoded. This
@ -263,7 +262,7 @@ class FakeLdap(core.LDAPHandler):
ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, tls_cacertfile)
elif tls_cacertdir:
ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, tls_cacertdir)
if tls_req_cert in list(core.LDAP_TLS_CERTS.values()):
if tls_req_cert in list(common.LDAP_TLS_CERTS.values()):
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, tls_req_cert)
else:
raise ValueError("invalid TLS_REQUIRE_CERT tls_req_cert=%s",
@ -281,13 +280,15 @@ class FakeLdap(core.LDAPHandler):
self.pool_conn_lifetime = pool_conn_lifetime
def dn(self, dn):
return core.utf8_decode(dn)
return common.utf8_decode(dn)
def _dn_to_id_attr(self, dn):
return core.utf8_decode(ldap.dn.str2dn(core.utf8_encode(dn))[0][0][0])
return common.utf8_decode(
ldap.dn.str2dn(common.utf8_encode(dn))[0][0][0])
def _dn_to_id_value(self, dn):
return core.utf8_decode(ldap.dn.str2dn(core.utf8_encode(dn))[0][0][1])
return common.utf8_decode(
ldap.dn.str2dn(common.utf8_encode(dn))[0][0][1])
def key(self, dn):
return '%s%s' % (self.__prefix, self.dn(dn))
@ -298,14 +299,14 @@ class FakeLdap(core.LDAPHandler):
if server_fail:
raise ldap.SERVER_DOWN
whos = ['cn=Admin', CONF.ldap.user]
if (core.utf8_decode(who) in whos and
core.utf8_decode(cred) in ['password', CONF.ldap.password]):
if (common.utf8_decode(who) in whos and
common.utf8_decode(cred) in ['password', CONF.ldap.password]):
return
try:
attrs = self.db[self.key(who)]
except KeyError:
LOG.debug('bind fail: who=%s not found', core.utf8_decode(who))
LOG.debug('bind fail: who=%s not found', common.utf8_decode(who))
raise ldap.NO_SUCH_OBJECT
db_password = None
@ -313,12 +314,12 @@ class FakeLdap(core.LDAPHandler):
db_password = attrs['userPassword'][0]
except (KeyError, IndexError):
LOG.debug('bind fail: password for who=%s not found',
core.utf8_decode(who))
common.utf8_decode(who))
raise ldap.INAPPROPRIATE_AUTH
if cred != core.utf8_encode(db_password):
if cred != common.utf8_encode(db_password):
LOG.debug('bind fail: password for who=%s does not match',
core.utf8_decode(who))
common.utf8_decode(who))
raise ldap.INVALID_CREDENTIALS
def unbind_s(self):
@ -343,7 +344,7 @@ class FakeLdap(core.LDAPHandler):
if k == id_attr:
for val in dummy_v:
if core.utf8_decode(val) == id_value:
if common.utf8_decode(val) == id_value:
id_attr_in_modlist = True
if not id_attr_in_modlist:
@ -352,10 +353,10 @@ class FakeLdap(core.LDAPHandler):
raise ldap.NAMING_VIOLATION
key = self.key(dn)
LOG.debug('add item: dn=%(dn)s, attrs=%(attrs)s', {
'dn': core.utf8_decode(dn), 'attrs': modlist})
'dn': common.utf8_decode(dn), 'attrs': modlist})
if key in self.db:
LOG.debug('add item failed: dn=%s is already in store.',
core.utf8_decode(dn))
common.utf8_decode(dn))
raise ldap.ALREADY_EXISTS(dn)
self.db[key] = {k: _internal_attr(k, v) for k, v in modlist}
@ -379,17 +380,17 @@ class FakeLdap(core.LDAPHandler):
try:
if CONTROL_TREEDELETE in [c.controlType for c in serverctrls]:
LOG.debug('FakeLdap subtree_delete item: dn=%s',
core.utf8_decode(dn))
common.utf8_decode(dn))
children = self._getChildren(dn)
for c in children:
del self.db[c]
key = self.key(dn)
LOG.debug('FakeLdap delete item: dn=%s', core.utf8_decode(dn))
LOG.debug('FakeLdap delete item: dn=%s', common.utf8_decode(dn))
del self.db[key]
except KeyError:
LOG.debug('delete item failed: dn=%s not found.',
core.utf8_decode(dn))
common.utf8_decode(dn))
raise ldap.NO_SUCH_OBJECT
self.db.sync()
@ -405,12 +406,12 @@ class FakeLdap(core.LDAPHandler):
key = self.key(dn)
LOG.debug('modify item: dn=%(dn)s attrs=%(attrs)s', {
'dn': core.utf8_decode(dn), 'attrs': modlist})
'dn': common.utf8_decode(dn), 'attrs': modlist})
try:
entry = self.db[key]
except KeyError:
LOG.debug('modify item failed: dn=%s not found.',
core.utf8_decode(dn))
common.utf8_decode(dn))
raise ldap.NO_SUCH_OBJECT
for cmd, k, v in modlist:
@ -495,14 +496,14 @@ class FakeLdap(core.LDAPHandler):
elif scope == ldap.SCOPE_ONELEVEL:
def get_entries():
base_dn = ldap.dn.str2dn(core.utf8_encode(base))
base_dn = ldap.dn.str2dn(common.utf8_encode(base))
base_len = len(base_dn)
for k, v in self.db.items():
if not k.startswith(self.__prefix):
continue
k_dn_str = k[len(self.__prefix):]
k_dn = ldap.dn.str2dn(core.utf8_encode(k_dn_str))
k_dn = ldap.dn.str2dn(common.utf8_encode(k_dn_str))
if len(k_dn) != base_len + 1:
continue
if k_dn[-base_len:] != base_dn:
@ -518,13 +519,13 @@ class FakeLdap(core.LDAPHandler):
objects = []
for dn, attrs in results:
# filter the objects by filterstr
id_attr, id_val, _ = ldap.dn.str2dn(core.utf8_encode(dn))[0][0]
id_attr = core.utf8_decode(id_attr)
id_val = core.utf8_decode(id_val)
id_attr, id_val, _ = ldap.dn.str2dn(common.utf8_encode(dn))[0][0]
id_attr = common.utf8_decode(id_attr)
id_val = common.utf8_decode(id_val)
match_attrs = attrs.copy()
match_attrs[id_attr] = [id_val]
attrs_checked = set()
if not filterstr or _match_query(core.utf8_decode(filterstr),
if not filterstr or _match_query(common.utf8_decode(filterstr),
match_attrs,
attrs_checked):
if (filterstr and
@ -533,7 +534,7 @@ class FakeLdap(core.LDAPHandler):
raise AssertionError('No objectClass in search filter')
# filter the attributes by attrlist
attrs = {k: v for k, v in attrs.items()
if not attrlist or k in core.utf8_decode(attrlist)}
if not attrlist or k in common.utf8_decode(attrlist)}
objects.append((dn, attrs))
return objects
@ -658,7 +659,7 @@ class FakeLdapNoSubtreeDelete(FakeLdap):
except KeyError:
LOG.debug('delete item failed: dn=%s not found.',
core.utf8_decode(dn))
common.utf8_decode(dn))
raise ldap.NO_SUCH_OBJECT
super(FakeLdapNoSubtreeDelete, self).delete_ext_s(dn,
serverctrls,

View File

@ -22,8 +22,7 @@ from oslo_config import cfg
from testtools import matchers
from keystone.common import driver_hints
from keystone.common import ldap as ks_ldap
from keystone.common.ldap import core as common_ldap_core
from keystone.identity.backends.ldap import common as common_ldap
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import fakeldap
@ -40,82 +39,82 @@ class DnCompareTest(unit.BaseTestCase):
# prep_case_insensitive returns the string with spaces at the front and
# end if it's already lowercase and no insignificant characters.
value = 'lowercase value'
self.assertEqual(value, ks_ldap.prep_case_insensitive(value))
self.assertEqual(value, common_ldap.prep_case_insensitive(value))
def test_prep_lowercase(self):
# prep_case_insensitive returns the string with spaces at the front and
# end and lowercases the value.
value = 'UPPERCASE VALUE'
exp_value = value.lower()
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
self.assertEqual(exp_value, common_ldap.prep_case_insensitive(value))
def test_prep_insignificant(self):
# prep_case_insensitive remove insignificant spaces.
value = 'before after'
exp_value = 'before after'
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
self.assertEqual(exp_value, common_ldap.prep_case_insensitive(value))
def test_prep_insignificant_pre_post(self):
# prep_case_insensitive remove insignificant spaces.
value = ' value '
exp_value = 'value'
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
self.assertEqual(exp_value, common_ldap.prep_case_insensitive(value))
def test_ava_equal_same(self):
# is_ava_value_equal returns True if the two values are the same.
value = 'val1'
self.assertTrue(ks_ldap.is_ava_value_equal('cn', value, value))
self.assertTrue(common_ldap.is_ava_value_equal('cn', value, value))
def test_ava_equal_complex(self):
# is_ava_value_equal returns True if the two values are the same using
# a value that's got different capitalization and insignificant chars.
val1 = 'before after'
val2 = ' BEFORE afTer '
self.assertTrue(ks_ldap.is_ava_value_equal('cn', val1, val2))
self.assertTrue(common_ldap.is_ava_value_equal('cn', val1, val2))
def test_ava_different(self):
# is_ava_value_equal returns False if the values aren't the same.
self.assertFalse(ks_ldap.is_ava_value_equal('cn', 'val1', 'val2'))
self.assertFalse(common_ldap.is_ava_value_equal('cn', 'val1', 'val2'))
def test_rdn_same(self):
# is_rdn_equal returns True if the two values are the same.
rdn = ldap.dn.str2dn('cn=val1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn, rdn))
self.assertTrue(common_ldap.is_rdn_equal(rdn, rdn))
def test_rdn_diff_length(self):
# is_rdn_equal returns False if the RDNs have a different number of
# AVAs.
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
rdn2 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
self.assertFalse(common_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_multi_ava_same_order(self):
# is_rdn_equal returns True if the RDNs have the same number of AVAs
# and the values are the same.
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
rdn2 = ldap.dn.str2dn('cn=CN1+ou=OU1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
self.assertTrue(common_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_multi_ava_diff_order(self):
# is_rdn_equal returns True if the RDNs have the same number of AVAs
# and the values are the same, even if in a different order
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
rdn2 = ldap.dn.str2dn('ou=OU1+cn=CN1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
self.assertTrue(common_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_multi_ava_diff_type(self):
# is_rdn_equal returns False if the RDNs have the same number of AVAs
# and the attribute types are different.
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
rdn2 = ldap.dn.str2dn('cn=cn1+sn=sn1')[0]
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
self.assertFalse(common_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_attr_type_case_diff(self):
# is_rdn_equal returns True for same RDNs even when attr type case is
# different.
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
rdn2 = ldap.dn.str2dn('CN=cn1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
self.assertTrue(common_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_attr_type_alias(self):
# is_rdn_equal returns False for same RDNs even when attr type alias is
@ -123,82 +122,82 @@ class DnCompareTest(unit.BaseTestCase):
# consider them equal.
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
rdn2 = ldap.dn.str2dn('2.5.4.3=cn1')[0]
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
self.assertFalse(common_ldap.is_rdn_equal(rdn1, rdn2))
def test_dn_same(self):
# is_dn_equal returns True if the DNs are the same.
dn = 'cn=Babs Jansen,ou=OpenStack'
self.assertTrue(ks_ldap.is_dn_equal(dn, dn))
self.assertTrue(common_ldap.is_dn_equal(dn, dn))
def test_dn_equal_unicode(self):
# is_dn_equal can accept unicode
dn = u'cn=fäké,ou=OpenStack'
self.assertTrue(ks_ldap.is_dn_equal(dn, dn))
self.assertTrue(common_ldap.is_dn_equal(dn, dn))
def test_dn_diff_length(self):
# is_dn_equal returns False if the DNs don't have the same number of
# RDNs
dn1 = 'cn=Babs Jansen,ou=OpenStack'
dn2 = 'cn=Babs Jansen,ou=OpenStack,dc=example.com'
self.assertFalse(ks_ldap.is_dn_equal(dn1, dn2))
self.assertFalse(common_ldap.is_dn_equal(dn1, dn2))
def test_dn_equal_rdns(self):
# is_dn_equal returns True if the DNs have the same number of RDNs
# and each RDN is the same.
dn1 = 'cn=Babs Jansen,ou=OpenStack+cn=OpenSource'
dn2 = 'CN=Babs Jansen,cn=OpenSource+ou=OpenStack'
self.assertTrue(ks_ldap.is_dn_equal(dn1, dn2))
self.assertTrue(common_ldap.is_dn_equal(dn1, dn2))
def test_dn_parsed_dns(self):
# is_dn_equal can also accept parsed DNs.
dn_str1 = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack+cn=OpenSource')
dn_str2 = ldap.dn.str2dn('CN=Babs Jansen,cn=OpenSource+ou=OpenStack')
self.assertTrue(ks_ldap.is_dn_equal(dn_str1, dn_str2))
self.assertTrue(common_ldap.is_dn_equal(dn_str1, dn_str2))
def test_startswith_under_child(self):
# dn_startswith returns True if descendant_dn is a child of dn.
child = 'cn=Babs Jansen,ou=OpenStack'
parent = 'ou=OpenStack'
self.assertTrue(ks_ldap.dn_startswith(child, parent))
self.assertTrue(common_ldap.dn_startswith(child, parent))
def test_startswith_parent(self):
# dn_startswith returns False if descendant_dn is a parent of dn.
child = 'cn=Babs Jansen,ou=OpenStack'
parent = 'ou=OpenStack'
self.assertFalse(ks_ldap.dn_startswith(parent, child))
self.assertFalse(common_ldap.dn_startswith(parent, child))
def test_startswith_same(self):
# dn_startswith returns False if DNs are the same.
dn = 'cn=Babs Jansen,ou=OpenStack'
self.assertFalse(ks_ldap.dn_startswith(dn, dn))
self.assertFalse(common_ldap.dn_startswith(dn, dn))
def test_startswith_not_parent(self):
# dn_startswith returns False if descendant_dn is not under the dn
child = 'cn=Babs Jansen,ou=OpenStack'
parent = 'dc=example.com'
self.assertFalse(ks_ldap.dn_startswith(child, parent))
self.assertFalse(common_ldap.dn_startswith(child, parent))
def test_startswith_descendant(self):
# dn_startswith returns True if descendant_dn is a descendant of dn.
descendant = 'cn=Babs Jansen,ou=Keystone,ou=OpenStack,dc=example.com'
dn = 'ou=OpenStack,dc=example.com'
self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
self.assertTrue(common_ldap.dn_startswith(descendant, dn))
descendant = 'uid=12345,ou=Users,dc=example,dc=com'
dn = 'ou=Users,dc=example,dc=com'
self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
self.assertTrue(common_ldap.dn_startswith(descendant, dn))
def test_startswith_parsed_dns(self):
# dn_startswith also accepts parsed DNs.
descendant = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack')
dn = ldap.dn.str2dn('ou=OpenStack')
self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
self.assertTrue(common_ldap.dn_startswith(descendant, dn))
def test_startswith_unicode(self):
# dn_startswith accepts unicode.
child = u'cn=fäké,ou=OpenStäck'
parent = u'ou=OpenStäck'
self.assertTrue(ks_ldap.dn_startswith(child, parent))
self.assertTrue(common_ldap.dn_startswith(child, parent))
class LDAPDeleteTreeTest(unit.TestCase):
@ -206,15 +205,15 @@ class LDAPDeleteTreeTest(unit.TestCase):
def setUp(self):
super(LDAPDeleteTreeTest, self).setUp()
ks_ldap.register_handler('fake://',
fakeldap.FakeLdapNoSubtreeDelete)
common_ldap.register_handler('fake://',
fakeldap.FakeLdapNoSubtreeDelete)
self.useFixture(database.Database(self.sql_driver_version_overrides))
self.load_backends()
self.load_fixtures(default_fixtures)
self.addCleanup(self.clear_database)
self.addCleanup(common_ldap_core._HANDLERS.clear)
self.addCleanup(common_ldap._HANDLERS.clear)
def clear_database(self):
for shelf in fakeldap.FakeShelves:
@ -264,7 +263,7 @@ class LDAPDeleteTreeTest(unit.TestCase):
scope = ldap.SCOPE_SUBTREE
filt = '(|(objectclass=*)(objectclass=ldapsubentry))'
entries = conn.search_s(base_dn, scope, filt,
attrlist=common_ldap_core.DN_ONLY)
attrlist=common_ldap.DN_ONLY)
self.assertThat(entries, matchers.HasLength(3))
sort_ents = sorted([e[0] for e in entries], key=len, reverse=True)
self.assertEqual([grandchild_dn, child_dn, base_dn], sort_ents)
@ -292,14 +291,14 @@ class MultiURLTests(unit.TestCase):
def test_multiple_urls_with_comma_no_conn_pool(self):
urls = 'ldap://localhost,ldap://backup.localhost'
self.config_fixture.config(group='ldap', url=urls, use_pool=False)
base_ldap = ks_ldap.BaseLdap(CONF)
base_ldap = common_ldap.BaseLdap(CONF)
ldap_connection = base_ldap.get_connection()
self.assertEqual(urls, ldap_connection.conn.conn._uri)
def test_multiple_urls_with_comma_with_conn_pool(self):
urls = 'ldap://localhost,ldap://backup.localhost'
self.config_fixture.config(group='ldap', url=urls, use_pool=True)
base_ldap = ks_ldap.BaseLdap(CONF)
base_ldap = common_ldap.BaseLdap(CONF)
ldap_connection = base_ldap.get_connection()
self.assertEqual(urls, ldap_connection.conn.conn_pool.uri)
@ -307,11 +306,11 @@ class MultiURLTests(unit.TestCase):
class SslTlsTest(unit.TestCase):
"""Test for the SSL/TLS functionality in keystone.common.ldap.core."""
@mock.patch.object(ks_ldap.core.KeystoneLDAPHandler, 'simple_bind_s')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
@mock.patch.object(ldap.ldapobject.LDAPObject, 'start_tls_s')
def _init_ldap_connection(self, config, mock_ldap_one, mock_ldap_two):
# Attempt to connect to initialize python-ldap.
base_ldap = ks_ldap.BaseLdap(config)
base_ldap = common_ldap.BaseLdap(config)
base_ldap.get_connection()
def test_certfile_trust_tls(self):
@ -378,8 +377,8 @@ class LDAPPagedResultsTest(unit.TestCase):
super(LDAPPagedResultsTest, self).setUp()
self.clear_database()
ks_ldap.register_handler('fake://', fakeldap.FakeLdap)
self.addCleanup(common_ldap_core._HANDLERS.clear)
common_ldap.register_handler('fake://', fakeldap.FakeLdap)
self.addCleanup(common_ldap._HANDLERS.clear)
self.useFixture(database.Database(self.sql_driver_version_overrides))
self.load_backends()
@ -425,7 +424,7 @@ class CommonLdapTestCase(unit.BaseTestCase):
'binary_attr': [b'\x00\xFF\x00\xFF']
}
), ]
py_result = ks_ldap.convert_ldap_result(result)
py_result = common_ldap.convert_ldap_result(result)
# The attribute containing the binary value should
# not be present in the converted result.
self.assertNotIn('binary_attr', py_result[0][1])
@ -434,23 +433,23 @@ class CommonLdapTestCase(unit.BaseTestCase):
value_unicode = u'fäké1'
value_utf8 = value_unicode.encode('utf-8')
result_utf8 = ks_ldap.utf8_encode(value_unicode)
result_utf8 = common_ldap.utf8_encode(value_unicode)
self.assertEqual(value_utf8, result_utf8)
result_utf8 = ks_ldap.utf8_encode(value_utf8)
result_utf8 = common_ldap.utf8_encode(value_utf8)
self.assertEqual(value_utf8, result_utf8)
result_unicode = ks_ldap.utf8_decode(value_utf8)
result_unicode = common_ldap.utf8_decode(value_utf8)
self.assertEqual(value_unicode, result_unicode)
result_unicode = ks_ldap.utf8_decode(value_unicode)
result_unicode = common_ldap.utf8_decode(value_unicode)
self.assertEqual(value_unicode, result_unicode)
self.assertRaises(TypeError,
ks_ldap.utf8_encode,
common_ldap.utf8_encode,
100)
result_unicode = ks_ldap.utf8_decode(100)
result_unicode = common_ldap.utf8_decode(100)
self.assertEqual(u'100', result_unicode)
def test_user_id_begins_with_0(self):
@ -462,7 +461,7 @@ class CommonLdapTestCase(unit.BaseTestCase):
'enabled': ['TRUE']
}
), ]
py_result = ks_ldap.convert_ldap_result(result)
py_result = common_ldap.convert_ldap_result(result)
# The user id should be 0123456, and the enabled
# flag should be True
self.assertIs(py_result[0][1]['enabled'][0], True)
@ -479,7 +478,7 @@ class CommonLdapTestCase(unit.BaseTestCase):
'enabled': [bitmask]
}
), ]
py_result = ks_ldap.convert_ldap_result(result)
py_result = common_ldap.convert_ldap_result(result)
# The user id should be 0123456, and the enabled
# flag should be 225
self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0])
@ -496,7 +495,7 @@ class CommonLdapTestCase(unit.BaseTestCase):
'enabled': [bitmask]
}
), ]
py_result = ks_ldap.convert_ldap_result(result)
py_result = common_ldap.convert_ldap_result(result)
# The user id should be 0123456, and the enabled
# flag should be 225, the 0 is dropped.
self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0])
@ -514,7 +513,7 @@ class CommonLdapTestCase(unit.BaseTestCase):
'user_name': [user_name]
}
), ]
py_result = ks_ldap.convert_ldap_result(result)
py_result = common_ldap.convert_ldap_result(result)
# The user name should still be a string value.
self.assertEqual(user_name, py_result[0][1]['user_name'][0])
@ -525,7 +524,7 @@ class LDAPFilterQueryCompositionTest(unit.TestCase):
def setUp(self):
super(LDAPFilterQueryCompositionTest, self).setUp()
self.base_ldap = ks_ldap.BaseLdap(self.config_fixture.conf)
self.base_ldap = common_ldap.BaseLdap(self.config_fixture.conf)
# The tests need an attribute mapping to use.
self.attribute_name = uuid.uuid4().hex

View File

@ -13,8 +13,7 @@
import fixtures
from keystone.common import ldap as common_ldap
from keystone.common.ldap import core as common_ldap_core
from keystone.identity.backends.ldap import common as common_ldap
from keystone.tests.unit import fakeldap
@ -24,11 +23,11 @@ class LDAPDatabase(fixtures.Fixture):
def setUp(self):
super(LDAPDatabase, self).setUp()
self.clear()
common_ldap_core._HANDLERS.clear()
common_ldap._HANDLERS.clear()
common_ldap.register_handler('fake://', fakeldap.FakeLdap)
# TODO(dstanek): switch the flow here
self.addCleanup(self.clear)
self.addCleanup(common_ldap_core._HANDLERS.clear)
self.addCleanup(common_ldap._HANDLERS.clear)
def clear(self):
for shelf in fakeldap.FakeShelves:

View File

@ -29,10 +29,9 @@ from testtools import matchers
from keystone.common import cache
from keystone.common import driver_hints
from keystone.common import ldap as common_ldap
from keystone.common.ldap import core as common_ldap_core
from keystone import exception
from keystone import identity
from keystone.identity.backends.ldap import common as common_ldap
from keystone.identity.mapping_backends import mapping as map
from keystone.tests import unit
from keystone.tests.unit.assignment import test_backends as assignment_tests
@ -1213,7 +1212,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
user_ref = self.identity_api.get_user(user_ref['id'])
self.assertIs(True, user_ref['enabled'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_enabled_invert_no_enabled_value(self, mock_ldap_get):
self.config_fixture.config(group='ldap', user_enabled_invert=True,
user_enabled_default=False)
@ -1234,7 +1233,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# from the resource default.
self.assertIs(not CONF.ldap.user_enabled_default, user_ref['enabled'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_enabled_invert_default_str_value(self, mock_ldap_get):
self.config_fixture.config(group='ldap', user_enabled_invert=True,
user_enabled_default='False')
@ -1255,7 +1254,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# from the resource default.
self.assertIs(True, user_ref['enabled'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_enabled_attribute_handles_expired(self, mock_ldap_get):
# If using 'passwordisexpired' as enabled attribute, and inverting it,
# Then an unauthorized user (expired password) should not be enabled.
@ -1275,7 +1274,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
user_ref = user_api.get('123456789')
self.assertIs(False, user_ref['enabled'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_enabled_attribute_handles_utf8(self, mock_ldap_get):
# If using 'passwordisexpired' as enabled attribute, and inverting it,
# and the result is utf8 encoded, then the an authorized user should
@ -1296,7 +1295,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
user_ref = user_api.get('123456789')
self.assertIs(True, user_ref['enabled'])
@mock.patch.object(common_ldap_core.KeystoneLDAPHandler, 'simple_bind_s')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
def test_user_api_get_connection_no_user_password(self, mocked_method):
"""Don't bind in case the user and password are blank."""
# Ensure the username/password are in-fact blank
@ -1306,7 +1305,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
self.assertFalse(mocked_method.called,
msg='`simple_bind_s` method was unexpectedly called')
@mock.patch.object(common_ldap_core.KeystoneLDAPHandler, 'connect')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
def test_chase_referrals_off(self, mocked_fakeldap):
self.config_fixture.config(
group='ldap',
@ -1320,7 +1319,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# is as expected.
self.assertFalse(mocked_fakeldap.call_args[-1]['chase_referrals'])
@mock.patch.object(common_ldap_core.KeystoneLDAPHandler, 'connect')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
def test_chase_referrals_on(self, mocked_fakeldap):
self.config_fixture.config(
group='ldap',
@ -1334,7 +1333,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# is as expected.
self.assertTrue(mocked_fakeldap.call_args[-1]['chase_referrals'])
@mock.patch.object(common_ldap_core.KeystoneLDAPHandler, 'connect')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
def test_debug_level_set(self, mocked_fakeldap):
level = 12345
self.config_fixture.config(
@ -1459,7 +1458,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
users = self.identity_api.driver.user.get_all()
self.assertThat(users, matchers.HasLength(len(default_fixtures.USERS)))
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_mixed_case_attribute(self, mock_ldap_get):
# Mock the search results to return attribute names
# with unexpected case.
@ -1827,7 +1826,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# user_ref['id'] should contains the email attribute
self.assertEqual(self.user_foo['email'], user_ref['id'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_get_id_from_dn_for_multivalued_attribute_id(self, mock_ldap_get):
driver = self.identity_api._select_identity_driver(
CONF.identity.default_domain_id)
@ -1850,7 +1849,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# has multiple values
self.assertEqual('nobodycares', user_ref['id'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_id_attribute_not_found(self, mock_ldap_get):
mock_ldap_get.return_value = (
'cn=nobodycares,dc=example,dc=com',
@ -1864,7 +1863,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
user_api.get,
'nobodycares')
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_id_not_in_dn(self, mock_ldap_get):
driver = self.identity_api._select_identity_driver(
CONF.identity.default_domain_id)
@ -1884,7 +1883,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
self.assertEqual('crap', user_ref['id'])
self.assertEqual('junk', user_ref['name'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_name_in_dn(self, mock_ldap_get):
driver = self.identity_api._select_identity_driver(
CONF.identity.default_domain_id)
@ -2109,7 +2108,7 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
self.skipTest(
"N/A: Covered by test_user_enabled_invert")
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
def test_user_enabled_attribute_handles_utf8(self, mock_ldap_get):
# Since user_enabled_emulation is enabled in this test, this test will
# fail since it's using user_enabled_invert.
@ -2507,7 +2506,7 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
self.assertNotIn('password', user_ref)
self.assertEqual(expected_user_ids, user_ids)
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get_all')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get_all')
def test_list_limit_domain_specific_inheritance(self, ldap_get_all):
# passiging hints is important, because if it's not passed, limiting
# is considered be disabled
@ -2522,7 +2521,7 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
hints = args[0]
self.assertEqual(1000, hints.limit['limit'])
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get_all')
@mock.patch.object(common_ldap.BaseLdap, '_ldap_get_all')
def test_list_limit_domain_specific_override(self, ldap_get_all):
# passiging hints is important, because if it's not passed, limiting
# is considered to be disabled

View File

@ -19,8 +19,8 @@ import ldappool
import mock
from oslo_config import cfg
from keystone.common.ldap import core as ldap_core
from keystone.identity.backends import ldap
from keystone.identity.backends.ldap import common as common_ldap
from keystone.tests import unit
from keystone.tests.unit import fakeldap
from keystone.tests.unit import test_backend_ldap
@ -32,7 +32,7 @@ class LdapPoolCommonTestMixin(object):
"""LDAP pool specific common tests used here and in live tests."""
def cleanup_pools(self):
ldap_core.PooledLDAPHandler.connection_pools.clear()
common_ldap.PooledLDAPHandler.connection_pools.clear()
def test_handler_with_use_pool_enabled(self):
# by default use_pool and use_auth_pool is enabled in test pool config
@ -40,11 +40,11 @@ class LdapPoolCommonTestMixin(object):
self.user_foo.pop('password')
self.assertDictEqual(self.user_foo, user_ref)
handler = ldap_core._get_connection(CONF.ldap.url, use_pool=True)
self.assertIsInstance(handler, ldap_core.PooledLDAPHandler)
handler = common_ldap._get_connection(CONF.ldap.url, use_pool=True)
self.assertIsInstance(handler, common_ldap.PooledLDAPHandler)
@mock.patch.object(ldap_core.KeystoneLDAPHandler, 'connect')
@mock.patch.object(ldap_core.KeystoneLDAPHandler, 'simple_bind_s')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
def test_handler_with_use_pool_not_enabled(self, bind_method,
connect_method):
self.config_fixture.config(group='ldap', use_pool=False)
@ -56,10 +56,10 @@ class LdapPoolCommonTestMixin(object):
end_user_auth=True)
# use_auth_pool flag does not matter when use_pool is False
# still handler is non pool version
self.assertIsInstance(handler.conn, ldap_core.PythonLDAPHandler)
self.assertIsInstance(handler.conn, common_ldap.PythonLDAPHandler)
@mock.patch.object(ldap_core.KeystoneLDAPHandler, 'connect')
@mock.patch.object(ldap_core.KeystoneLDAPHandler, 'simple_bind_s')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
@mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
def test_handler_with_end_user_auth_use_pool_not_enabled(self, bind_method,
connect_method):
# by default use_pool is enabled in test pool config
@ -70,13 +70,13 @@ class LdapPoolCommonTestMixin(object):
user_api = ldap.UserApi(CONF)
handler = user_api.get_connection(user=None, password=None,
end_user_auth=True)
self.assertIsInstance(handler.conn, ldap_core.PythonLDAPHandler)
self.assertIsInstance(handler.conn, common_ldap.PythonLDAPHandler)
# For end_user_auth case, flag should not be false otherwise
# it will use, admin connections ldap pool
handler = user_api.get_connection(user=None, password=None,
end_user_auth=False)
self.assertIsInstance(handler.conn, ldap_core.PooledLDAPHandler)
self.assertIsInstance(handler.conn, common_ldap.PooledLDAPHandler)
def test_pool_size_set(self):
# get related connection manager instance
@ -214,12 +214,12 @@ class LDAPIdentity(LdapPoolCommonTestMixin,
def setUp(self):
self.useFixture(fixtures.MockPatchObject(
ldap_core.PooledLDAPHandler, 'Connector', fakeldap.FakeLdapPool))
common_ldap.PooledLDAPHandler, 'Connector', fakeldap.FakeLdapPool))
super(LDAPIdentity, self).setUp()
self.addCleanup(self.cleanup_pools)
# storing to local variable to avoid long references
self.conn_pools = ldap_core.PooledLDAPHandler.connection_pools
self.conn_pools = common_ldap.PooledLDAPHandler.connection_pools
# super class loads db fixtures which establishes ldap connection
# so adding dummy call to highlight connection pool initialization
# as its not that obvious though its not needed here
@ -230,7 +230,7 @@ class LDAPIdentity(LdapPoolCommonTestMixin,
config_files.append(unit.dirs.tests_conf('backend_ldap_pool.conf'))
return config_files
@mock.patch.object(ldap_core, 'utf8_encode')
@mock.patch.object(common_ldap, 'utf8_encode')
def test_utf8_encoded_is_used_in_pool(self, mocked_method):
def side_effect(arg):
return arg

View File

@ -17,8 +17,8 @@ import uuid
import ldappool
from oslo_config import cfg
from keystone.common.ldap import core as ldap_core
from keystone.identity.backends import ldap
from keystone.identity.backends.ldap import common as ldap_common
from keystone.tests import unit
from keystone.tests.unit import fakeldap
from keystone.tests.unit import test_backend_ldap_pool
@ -40,7 +40,7 @@ class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin,
super(LiveLDAPPoolIdentity, self).setUp()
self.addCleanup(self.cleanup_pools)
# storing to local variable to avoid long references
self.conn_pools = ldap_core.PooledLDAPHandler.connection_pools
self.conn_pools = ldap_common.PooledLDAPHandler.connection_pools
def config_files(self):
config_files = super(LiveLDAPPoolIdentity, self).config_files()
@ -48,7 +48,7 @@ class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin,
return config_files
def test_assert_connector_used_not_fake_ldap_pool(self):
handler = ldap_core._get_connection(CONF.ldap.url, use_pool=True)
handler = ldap_common._get_connection(CONF.ldap.url, use_pool=True)
self.assertNotEqual(type(handler.Connector),
type(fakeldap.FakeLdapPool))
self.assertEqual(type(ldappool.StateConnector),
@ -120,7 +120,8 @@ class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin,
return self.identity_api.get_user(user['id'])
def _get_auth_conn_pool_cm(self):
pool_url = ldap_core.PooledLDAPHandler.auth_pool_prefix + CONF.ldap.url
pool_url = (
ldap_common.PooledLDAPHandler.auth_pool_prefix + CONF.ldap.url)
return self.conn_pools[pool_url]
def _do_password_change_for_one_user(self, password, new_password):