209 lines
8.3 KiB
Python
209 lines
8.3 KiB
Python
# Copyright 2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import uuid
|
|
|
|
import ldappool
|
|
|
|
from keystone.common.ldap import core as ldap_core
|
|
from keystone import config
|
|
from keystone.identity.backends import ldap
|
|
from keystone import tests
|
|
from keystone.tests import fakeldap
|
|
from keystone.tests import test_backend_ldap_pool
|
|
from keystone.tests import test_ldap_livetest
|
|
|
|
|
|
# Short module-level alias for ``keystone.config.CONF``; the tests below read
# the ``ldap`` and ``identity`` option groups through it.
CONF = config.CONF
|
|
|
|
|
|
class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin,
                           test_ldap_livetest.LiveLDAPIdentity):
    """Executes existing LDAP live test with pooled LDAP handler to make
    sure it works without any error.

    Also executes common pool specific tests via Mixin class.
    """

    def setUp(self):
        """Run the parent setup and register pool cleanup for teardown."""
        super(LiveLDAPPoolIdentity, self).setUp()
        self.addCleanup(self.cleanup_pools)
        # storing to local variable to avoid long references
        self.conn_pools = ldap_core.PooledLDAPHandler.connection_pools

    def config_files(self):
        """Return the parent's config files plus the pooled live-LDAP one."""
        config_files = super(LiveLDAPPoolIdentity, self).config_files()
        config_files.append(
            tests.dirs.tests_conf('backend_pool_liveldap.conf'))
        return config_files

    def config_overrides(self):
        """Apply parent overrides, then select the LDAP identity driver."""
        super(LiveLDAPPoolIdentity, self).config_overrides()
        self.config_fixture.config(
            group='identity',
            driver='keystone.identity.backends.ldap.Identity')

    def test_assert_connector_used_not_fake_ldap_pool(self):
        """The pooled handler must use the real ldappool connector class."""
        handler = ldap_core._get_connection(CONF.ldap.url, use_pool=True)
        # Compare the classes themselves. Comparing ``type(cls)`` would only
        # compare their metaclasses, which says nothing about which
        # connector is actually configured.
        self.assertIsNot(handler.Connector, fakeldap.FakeLdapPool)
        self.assertIs(handler.Connector, ldappool.StateConnector)

    def test_async_search_and_result3(self):
        """Re-run the enable-attribute-mask test with ``page_size`` of 1."""
        self.config_fixture.config(group='ldap', page_size=1)
        self.test_user_enable_attribute_mask()

    def test_pool_size_expands_correctly(self):
        """Check pool growth and reuse as connections are nested/released."""
        who = CONF.ldap.user
        cred = CONF.ldap.password
        # get related connection manager instance
        ldappool_cm = self.conn_pools[CONF.ldap.url]

        def _get_conn():
            return ldappool_cm.connection(who, cred)

        with _get_conn() as c1:  # conn1
            self.assertEqual(len(ldappool_cm), 1)
            self.assertTrue(c1.connected)
            self.assertTrue(c1.active)

            with _get_conn() as c2:  # conn2
                self.assertEqual(len(ldappool_cm), 2)
                self.assertTrue(c2.connected)
                self.assertTrue(c2.active)

            self.assertEqual(len(ldappool_cm), 2)
            # c2 went out of context, it is connected but not active
            self.assertTrue(c2.connected)
            self.assertFalse(c2.active)

            with _get_conn() as c3:  # conn3
                self.assertEqual(len(ldappool_cm), 2)
                self.assertTrue(c3.connected)
                self.assertTrue(c3.active)
                self.assertIs(c3, c2)  # same connection is reused
                self.assertTrue(c2.active)

                with _get_conn() as c4:  # conn4
                    self.assertEqual(len(ldappool_cm), 3)
                    self.assertTrue(c4.connected)
                    self.assertTrue(c4.active)

    def test_password_change_with_auth_pool_disabled(self):
        """With the auth pool off, the old password must stop working."""
        self.config_fixture.config(group='ldap', use_auth_pool=False)
        old_password = self.user_sna['password']

        self.test_password_change_with_pool()

        self.assertRaises(AssertionError,
                          self.identity_api.authenticate,
                          context={},
                          user_id=self.user_sna['id'],
                          password=old_password)

    def _create_user_and_authenticate(self, password):
        """Create a randomly named user, authenticate it, and return it.

        :param password: password to assign to the new user and to use for
                         the authentication call.
        :returns: the user as returned by ``identity_api.get_user``.
        """
        user_dict = {
            'domain_id': CONF.identity.default_domain_id,
            'name': uuid.uuid4().hex,
            'password': password}
        user = self.identity_api.create_user(user_dict)

        self.identity_api.authenticate(
            context={},
            user_id=user['id'],
            password=password)

        return self.identity_api.get_user(user['id'])

    def _get_auth_conn_pool_cm(self):
        """Return the connection manager registered for the auth pool."""
        pool_url = ldap_core.PooledLDAPHandler.auth_pool_prefix + CONF.ldap.url
        return self.conn_pools[pool_url]

    def _do_password_change_for_one_user(self, password, new_password):
        """Change one user's password while five auth connections are held.

        Five users are created and authenticated with ``password``; auth pool
        connections for all five are then opened at once, and the third
        user's password is changed to ``new_password``.

        :param password: initial password shared by all created users.
        :param new_password: password the third user is updated to.
        :returns: the third user's dict, including the new ``password`` key.
        """
        self.config_fixture.config(group='ldap', use_auth_pool=True)
        self.cleanup_pools()
        self.load_backends()

        users = []
        for _ in range(5):
            users.append(self._create_user_and_authenticate(password))
            # connection pool size remains 1 even for different user ldap
            # bind as there is only one active connection at a time
            self.assertEqual(len(self._get_auth_conn_pool_cm()), 1)

        user_api = ldap.UserApi(CONF)
        user_dns = [user_api._id_to_dn_string(user['id']) for user in users]

        # now create multiple active connections for end user auth case which
        # will force to keep them in pool. After that, modify one of user
        # password. Need to make sure that user connection is in middle
        # of pool list.
        auth_cm = self._get_auth_conn_pool_cm()
        with auth_cm.connection(user_dns[0], password):
            with auth_cm.connection(user_dns[1], password):
                with auth_cm.connection(user_dns[2], password):
                    with auth_cm.connection(user_dns[3], password):
                        with auth_cm.connection(user_dns[4],
                                                password) as last_conn:
                            self.assertEqual(len(auth_cm), 5)
                            # NOTE(review): the innermost connection is
                            # unbound before it is released; presumably to
                            # exercise the pool with a dead connection --
                            # confirm the intent.
                            last_conn.unbind_s()

        changed_user = users[2]
        changed_user['password'] = new_password
        self.identity_api.update_user(changed_user['id'], changed_user)

        return changed_user

    def test_password_change_with_auth_pool_enabled_long_lifetime(self):
        """With a 600s connection lifetime the old password still binds."""
        self.config_fixture.config(group='ldap',
                                   auth_pool_connection_lifetime=600)
        old_password = 'my_password'
        new_password = 'new_password'
        user = self._do_password_change_for_one_user(old_password,
                                                     new_password)
        user.pop('password')

        # with long connection lifetime auth_pool can bind to old password
        # successfully which is not desired if password change is frequent
        # use case in a deployment.
        # This can happen in multiple concurrent connections case only.
        user_ref = self.identity_api.authenticate(
            context={}, user_id=user['id'], password=old_password)

        self.assertDictEqual(user_ref, user)

    def test_password_change_with_auth_pool_enabled_no_lifetime(self):
        """With a zero connection lifetime the old password is rejected."""
        self.config_fixture.config(group='ldap',
                                   auth_pool_connection_lifetime=0)

        old_password = 'my_password'
        new_password = 'new_password'
        user = self._do_password_change_for_one_user(old_password,
                                                     new_password)
        # now as connection lifetime is zero, so authentication
        # with old password will always fail.
        self.assertRaises(AssertionError,
                          self.identity_api.authenticate,
                          context={}, user_id=user['id'],
                          password=old_password)
|