ldappool/ldappool/tests/test_ldappool.py

268 lines
7.8 KiB
Python

# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is Sync Server
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2010
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Tarek Ziade (tarek@mozilla.com)
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****
import threading
import time
import unittest
import ldap
import ldappool
# patching StateConnector
ldappool.StateConnector.users = {
'uid=tarek,ou=users,dc=mozilla':
{'uidNumber': ['1'],
'account-enabled': ['Yes'],
'mail': ['tarek@mozilla.com'],
'cn': ['tarek']},
'cn=admin,dc=mozilla': {'cn': ['admin'],
'mail': ['admin'],
'uidNumber': ['100']}}
def _simple_bind(self, who='', cred='', *args):
self.connected = True
self.who = who
self.cred = cred
ldappool.StateConnector.simple_bind_s = _simple_bind
def _search(self, dn, *args, **kw):
if dn in self.users:
return [(dn, self.users[dn])]
elif dn == 'ou=users,dc=mozilla':
uid = kw['filterstr'].split('=')[-1][:-1]
for dn_, value in self.users.items():
if value['uidNumber'][0] != uid:
continue
return [(dn_, value)]
raise ldap.NO_SUCH_OBJECT
ldappool.StateConnector.search_s = _search
def _add(self, dn, user):
self.users[dn] = {}
for key, value in user:
if not isinstance(value, list):
value = [value]
self.users[dn][key] = value
return ldap.RES_ADD, ''
ldappool.StateConnector.add_s = _add
def _modify(self, dn, user):
if dn in self.users:
for type_, key, value in user:
if not isinstance(value, list):
value = [value]
self.users[dn][key] = value
return ldap.RES_MODIFY, ''
ldappool.StateConnector.modify_s = _modify
def _delete(self, dn):
if dn in self.users:
del self.users[dn]
return ldap.RES_DELETE, ''
ldappool.StateConnector.delete_s = _delete
class LDAPWorker(threading.Thread):
def __init__(self, pool):
threading.Thread.__init__(self)
self.pool = pool
self.results = []
def run(self):
dn = 'cn=admin,dc=mozilla'
for i in range(10):
with self.pool.connection() as conn:
res = conn.search_s(dn, ldap.SCOPE_BASE,
attrlist=['cn'])
self.results.append(res)
class TestLDAPSQLAuth(unittest.TestCase):
def test_ctor_args(self):
pool = ldappool.ConnectionManager('ldap://localhost', use_tls=True)
self.assertEqual(pool.use_tls, True)
def test_pool(self):
dn = 'uid=adminuser,ou=logins,dc=mozilla'
passwd = 'adminuser'
pool = ldappool.ConnectionManager('ldap://localhost', dn, passwd)
workers = [LDAPWorker(pool) for i in range(10)]
for worker in workers:
worker.start()
for worker in workers:
worker.join()
self.assertEqual(len(worker.results), 10)
cn = worker.results[0][0][1]['cn']
self.assertEqual(cn, ['admin'])
@unittest.skip("race conditions make this fail the gate")
def test_pool_full(self):
dn = 'uid=adminuser,ou=logins,dc=mozilla'
passwd = 'adminuser'
pool = ldappool.ConnectionManager(
'ldap://localhost', dn, passwd, size=1, retry_delay=1.,
retry_max=5, use_pool=True)
class Worker(threading.Thread):
def __init__(self, pool, duration):
threading.Thread.__init__(self)
self.pool = pool
self.duration = duration
def run(self):
with self.pool.connection() as conn: # NOQA
time.sleep(self.duration)
def tryit():
with pool.connection() as conn: # NOQA
pass
# an attempt on a full pool should eventually work
# because the connector is reused
for i in range(10):
tryit()
# we have 1 non-active connector now
self.assertEqual(len(pool), 1)
# an attempt with a full pool should succeed if a
# slot gets freed in less than one second.
worker1 = Worker(pool, .4)
worker1.start()
try:
tryit()
finally:
worker1.join()
# an attempt with a full pool should fail
# if no slot gets freed in less than one second.
worker1 = Worker(pool, 1.1)
worker1.start()
try:
self.assertRaises(ldappool.MaxConnectionReachedError, tryit)
finally:
worker1.join()
# we still have one active connector
self.assertEqual(len(pool), 1)
def test_pool_cleanup(self):
dn = 'uid=adminuser,ou=logins,dc=mozilla'
passwd = 'adminuser'
pool = ldappool.ConnectionManager('ldap://localhost', dn, passwd,
size=1, use_pool=True)
with pool.connection('bind1') as conn: # NOQA
pass
with pool.connection('bind2') as conn: # NOQA
pass
# the second call should have removed the first conn
self.assertEqual(len(pool), 1)
def test_pool_reuse(self):
dn = 'uid=adminuser,ou=logins,dc=mozilla'
passwd = 'adminuser'
pool = ldappool.ConnectionManager('ldap://localhost', dn, passwd,
use_pool=True)
with pool.connection() as conn:
self.assertTrue(conn.active)
self.assertFalse(conn.active)
self.assertTrue(conn.connected)
with pool.connection() as conn2:
pass
self.assertTrue(conn is conn2)
with pool.connection() as conn:
conn.connected = False
with pool.connection() as conn2:
pass
self.assertTrue(conn is not conn2)
# same bind and password: reuse
with pool.connection('bind', 'passwd') as conn:
self.assertTrue(conn.active)
self.assertFalse(conn.active)
self.assertTrue(conn.connected)
with pool.connection('bind', 'passwd') as conn2:
pass
self.assertTrue(conn is conn2)
# same bind different password: rebind !
with pool.connection('bind', 'passwd') as conn:
self.assertTrue(conn.active)
self.assertFalse(conn.active)
self.assertTrue(conn.connected)
with pool.connection('bind', 'passwd2') as conn2:
pass
self.assertTrue(conn is conn2)