617 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			617 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
 | |
| import unittest, slapd
 | |
| import _ldap
 | |
| import logging
 | |
| 
 | |
# Lazily created slapd instance shared by every test that does not ask
# for a private server.
reusable_server = None


def get_reusable_server():
    """Return the shared slapd.Slapd instance, creating it on first use."""
    global reusable_server
    if reusable_server is not None:
        return reusable_server
    reusable_server = slapd.Slapd()
    return reusable_server
 | |
| 
 | |
class TestLdapCExtension(unittest.TestCase):
    """Tests the LDAP C Extension module, _ldap.

    These tests apply only to the _ldap module and bypass the
    LDAPObject wrapper completely.
    """

    # Timeout handed to every result3() call that waits for the server.
    timeout = 3

    def _init_server(self, reuse_existing=True):
        """Sets self.server to a test LDAP server and self.base to its base.

        When reuse_existing is true the shared server returned by
        get_reusable_server() is used; otherwise a private slapd.Slapd()
        is created.  The server is started and returned.
        """
        if reuse_existing:
            server = get_reusable_server()
        else:
            server = slapd.Slapd()  # private server
        #server.set_debug()  # enables verbose messages
        server.start()  # no effect if already started
        self.server = server
        self.base = server.get_dn_suffix()
        return server

    def _init(self, reuse_existing=True, bind=True):
        """Starts a server, and returns an _ldap connection object for it.

        When bind is true the connection is switched to protocol version 3
        and a simple bind is performed with the server's root DN and
        password before the connection is returned.
        """
        server = self._init_server(reuse_existing)
        conn = _ldap.initialize(server.get_url())
        if bind:
            # Perform a simple bind
            conn.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION3)
            m = conn.simple_bind(server.get_root_dn(),
                                 server.get_root_password())
            result, pmsg, msgid, ctrls = conn.result3(
                m, _ldap.MSG_ONE, self.timeout)
            # assertEqual, not assertTrue: the second argument of
            # assertTrue is only the failure message.
            self.assertEqual(result, _ldap.RES_BIND)
        return conn

    def assertNotNone(self, expr, msg=None):
        """Fail if expr is None; msg defaults to repr(expr)."""
        self.assertFalse(expr is None, msg or repr(expr))

    def assertNone(self, expr, msg=None):
        """Fail unless expr is None; msg defaults to repr(expr)."""
        self.assertFalse(expr is not None, msg or repr(expr))

    # Test for the existence of a whole bunch of constants
    # that the C module is supposed to export
    def test_constants(self):
        self.assertEqual(_ldap.PORT, 389)
        self.assertEqual(_ldap.VERSION1, 1)
        self.assertEqual(_ldap.VERSION2, 2)
        self.assertEqual(_ldap.VERSION3, 3)

        # constants for result3()
        self.assertEqual(_ldap.RES_BIND, 0x61)
        self.assertEqual(_ldap.RES_SEARCH_ENTRY, 0x64)
        self.assertEqual(_ldap.RES_SEARCH_RESULT, 0x65)
        self.assertEqual(_ldap.RES_MODIFY, 0x67)
        self.assertEqual(_ldap.RES_ADD, 0x69)
        self.assertEqual(_ldap.RES_DELETE, 0x6b)
        self.assertEqual(_ldap.RES_MODRDN, 0x6d)
        self.assertEqual(_ldap.RES_COMPARE, 0x6f)
        self.assertEqual(_ldap.RES_SEARCH_REFERENCE, 0x73)  # v3
        self.assertEqual(_ldap.RES_EXTENDED, 0x78)          # v3
        #self.assertEqual(_ldap.RES_INTERMEDIATE, 0x79)     # v3
        self.assertNotNone(_ldap.RES_ANY)
        self.assertNotNone(_ldap.RES_UNSOLICITED)

        self.assertNotNone(_ldap.AUTH_NONE)
        self.assertNotNone(_ldap.AUTH_SIMPLE)

        self.assertNotNone(_ldap.SCOPE_BASE)
        self.assertNotNone(_ldap.SCOPE_ONELEVEL)
        self.assertNotNone(_ldap.SCOPE_SUBTREE)

        self.assertNotNone(_ldap.MOD_ADD)
        self.assertNotNone(_ldap.MOD_DELETE)
        self.assertNotNone(_ldap.MOD_REPLACE)
        self.assertNotNone(_ldap.MOD_INCREMENT)
        self.assertNotNone(_ldap.MOD_BVALUES)

        # for result3()
        self.assertNotNone(_ldap.MSG_ONE)
        self.assertNotNone(_ldap.MSG_ALL)
        self.assertNotNone(_ldap.MSG_RECEIVED)

        # for OPT_DEREF
        self.assertNotNone(_ldap.DEREF_NEVER)
        self.assertNotNone(_ldap.DEREF_SEARCHING)
        self.assertNotNone(_ldap.DEREF_FINDING)
        self.assertNotNone(_ldap.DEREF_ALWAYS)

        # for OPT_SIZELIMIT, OPT_TIMELIMIT
        self.assertNotNone(_ldap.NO_LIMIT)

        # standard options
        self.assertNotNone(_ldap.OPT_API_INFO)
        self.assertNotNone(_ldap.OPT_DEREF)
        self.assertNotNone(_ldap.OPT_SIZELIMIT)
        self.assertNotNone(_ldap.OPT_TIMELIMIT)
        self.assertNotNone(_ldap.OPT_REFERRALS)
        self.assertNotNone(_ldap.OPT_RESTART)
        self.assertNotNone(_ldap.OPT_PROTOCOL_VERSION)
        self.assertNotNone(_ldap.OPT_SERVER_CONTROLS)
        self.assertNotNone(_ldap.OPT_CLIENT_CONTROLS)
        self.assertNotNone(_ldap.OPT_API_FEATURE_INFO)
        self.assertNotNone(_ldap.OPT_HOST_NAME)
        self.assertNotNone(_ldap.OPT_ERROR_NUMBER)   # = OPT_RESULT_CODE
        self.assertNotNone(_ldap.OPT_ERROR_STRING)   # = OPT_DIAGNOSTIC_MESSAGE
        self.assertNotNone(_ldap.OPT_MATCHED_DN)

        # OpenLDAP specific
        self.assertNotNone(_ldap.OPT_DEBUG_LEVEL)
        self.assertNotNone(_ldap.OPT_TIMEOUT)
        self.assertNotNone(_ldap.OPT_REFHOPLIMIT)
        self.assertNotNone(_ldap.OPT_NETWORK_TIMEOUT)
        self.assertNotNone(_ldap.OPT_URI)
        #self.assertNotNone(_ldap.OPT_REFERRAL_URLS)
        #self.assertNotNone(_ldap.OPT_SOCKBUF)
        #self.assertNotNone(_ldap.OPT_DEFBASE)
        #self.assertNotNone(_ldap.OPT_CONNECT_ASYNC)

        # str2dn()
        self.assertNotNone(_ldap.DN_FORMAT_LDAP)
        self.assertNotNone(_ldap.DN_FORMAT_LDAPV3)
        self.assertNotNone(_ldap.DN_FORMAT_LDAPV2)
        self.assertNotNone(_ldap.DN_FORMAT_DCE)
        self.assertNotNone(_ldap.DN_FORMAT_UFN)
        self.assertNotNone(_ldap.DN_FORMAT_AD_CANONICAL)
        self.assertNotNone(_ldap.DN_FORMAT_MASK)
        self.assertNotNone(_ldap.DN_PRETTY)
        self.assertNotNone(_ldap.DN_SKIP)
        self.assertNotNone(_ldap.DN_P_NOLEADTRAILSPACES)
        self.assertNotNone(_ldap.DN_P_NOSPACEAFTERRDN)
        self.assertNotNone(_ldap.DN_PEDANTIC)
        self.assertNotNone(_ldap.AVA_NULL)
        self.assertNotNone(_ldap.AVA_STRING)
        self.assertNotNone(_ldap.AVA_BINARY)
        self.assertNotNone(_ldap.AVA_NONPRINTABLE)

        # these two constants are pointless? XXX
        self.assertEqual(_ldap.LDAP_OPT_ON, 1)
        self.assertEqual(_ldap.LDAP_OPT_OFF, 0)

        # these constants useless after ldap_url_parse() was dropped XXX
        self.assertNotNone(_ldap.URL_ERR_BADSCOPE)
        self.assertNotNone(_ldap.URL_ERR_MEM)

    # The bind itself is performed (and checked) inside _init()
    def test_simple_bind(self):
        self._init()

    def test_simple_anonymous_bind(self):
        conn = self._init(bind=False)
        m = conn.simple_bind("", "")
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_BIND)
        self.assertEqual(msgid, m)
        self.assertEqual(pmsg, [])
        self.assertEqual(ctrls, [])

        # see if we can get the rootdse while we're here
        m = conn.search_ext("", _ldap.SCOPE_BASE, '(objectClass=*)')
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_SEARCH_RESULT)
        self.assertEqual(pmsg[0][0], "")  # rootDSE has no dn
        self.assertEqual(msgid, m)
        self.assertTrue('objectClass' in pmsg[0][1])

    def test_unbind(self):
        conn = self._init()
        m = conn.unbind_ext()
        self.assertNone(m)

        # Second attempt to unbind should yield an exception; the test
        # deliberately tolerates both outcomes.
        try:
            conn.unbind_ext()
        except _ldap.error:
            pass

    def test_search_ext_individual(self):
        conn = self._init()

        m = conn.search_ext(self.base, _ldap.SCOPE_SUBTREE,
                            '(objectClass=dcObject)')
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ONE, self.timeout)

        # Expect to get just one object
        self.assertEqual(result, _ldap.RES_SEARCH_ENTRY)
        self.assertEqual(len(pmsg), 1)
        self.assertEqual(len(pmsg[0]), 2)
        self.assertEqual(pmsg[0][0], self.base)
        self.assertTrue('dcObject' in pmsg[0][1]['objectClass'])
        self.assertTrue('organization' in pmsg[0][1]['objectClass'])
        self.assertEqual(msgid, m)
        self.assertEqual(ctrls, [])

        # The next message for the same request is the final search result
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ONE, self.timeout)
        self.assertEqual(result, _ldap.RES_SEARCH_RESULT)
        self.assertEqual(pmsg, [])
        self.assertEqual(msgid, m)
        self.assertEqual(ctrls, [])

    def test_abandon(self):
        conn = self._init()

        m = conn.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(objectClass=*)')

        ret = conn.abandon_ext(m)
        self.assertNone(ret)

        # Waiting on the abandoned request must time out
        # (timeout /could/ be longer)
        self.assertRaises(_ldap.TIMEOUT,
                          conn.result3, m, _ldap.MSG_ALL, 0.3)

    def test_search_ext_all(self):
        conn = self._init()

        m = conn.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(objectClass=*)')
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)

        # Expect to get some objects
        self.assertEqual(result, _ldap.RES_SEARCH_RESULT)
        self.assertTrue(len(pmsg) >= 2)
        self.assertEqual(msgid, m)
        self.assertEqual(ctrls, [])

    def test_add(self):
        conn = self._init()

        m = conn.add_ext("cn=Foo," + self.base, [
            ('objectClass', 'organizationalRole'),
            ('cn', 'Foo'),
            ('description', 'testing'),
        ])

        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_ADD)
        self.assertEqual(pmsg, [])
        self.assertEqual(msgid, m)
        self.assertEqual(ctrls, [])

        # search for it back
        m = conn.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(cn=Foo)')
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)

        # Expect to get the objects
        self.assertEqual(result, _ldap.RES_SEARCH_RESULT)
        self.assertEqual(len(pmsg), 1)
        self.assertEqual(msgid, m)
        self.assertEqual(ctrls, [])

        self.assertEqual(pmsg[0], ('cn=Foo,' + self.base, {
            'objectClass': ['organizationalRole'],
            'cn': ['Foo'],
            'description': ['testing'],
        }))

    def test_compare(self):
        conn = self._init()

        # first, add an object with a field we can compare on
        dn = "cn=CompareTest," + self.base
        m = conn.add_ext(dn, [
            ('objectClass', 'person'),
            ('sn', 'CompareTest'),
            ('cn', 'CompareTest'),
            ('userPassword', 'the_password'),
        ])
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_ADD)

        # try a false compare
        m = conn.compare_ext(dn, "userPassword", "bad_string")
        try:
            r = conn.result3(m, _ldap.MSG_ALL, self.timeout)
        except _ldap.COMPARE_FALSE:
            pass
        else:
            self.fail(repr(r))

        # try a true compare
        m = conn.compare_ext(dn, "userPassword", "the_password")
        try:
            r = conn.result3(m, _ldap.MSG_ALL, self.timeout)
        except _ldap.COMPARE_TRUE:
            pass
        else:
            self.fail(repr(r))

        # comparing on an unknown attribute must raise some _ldap.error
        m = conn.compare_ext(dn, "badAttribute", "ignoreme")
        try:
            r = conn.result3(m, _ldap.MSG_ALL, self.timeout)
        except _ldap.error:
            pass
        else:
            self.fail(repr(r))

    def test_delete_no_such_object(self):
        conn = self._init()

        # try deleting an object that doesn't exist
        m = conn.delete_ext("cn=DoesNotExist," + self.base)
        try:
            r = conn.result3(m, _ldap.MSG_ALL, self.timeout)
        except _ldap.NO_SUCH_OBJECT:
            pass
        else:
            self.fail(r)

    def test_delete(self):
        conn = self._init()
        # first, add an object we will delete
        dn = "cn=Deleteme," + self.base
        m = conn.add_ext(dn, [
            ('objectClass', 'organizationalRole'),
            ('cn', 'Deleteme'),
        ])
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_ADD)

        m = conn.delete_ext(dn)
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_DELETE)
        self.assertEqual(msgid, m)
        self.assertEqual(pmsg, [])
        self.assertEqual(ctrls, [])

    def test_modify_no_such_object(self):
        conn = self._init()

        # try modifying an object that doesn't exist
        m = conn.modify_ext("cn=DoesNotExist," + self.base, [
            (_ldap.MOD_ADD, 'description', ['blah']),
        ])
        try:
            r = conn.result3(m, _ldap.MSG_ALL, self.timeout)
        except _ldap.NO_SUCH_OBJECT:
            pass
        else:
            self.fail(r)

    # Not collected by unittest because of the DISABLED_ prefix.
    def DISABLED_test_modify_no_such_object_empty_attrs(self):
        # XXX ldif-backend for slapd appears broken???

        conn = self._init()

        # try modifying an object that doesn't exist
        m = conn.modify_ext("cn=DoesNotExist," + self.base, [
            (_ldap.MOD_ADD, 'description', []),
        ])
        self.assertTrue(isinstance(m, int))
        r = conn.result3(m, _ldap.MSG_ALL, self.timeout)  # what should happen??
        self.fail(r)

    def test_modify(self):
        conn = self._init()
        # first, add an object we will modify
        dn = "cn=AddToMe," + self.base
        m = conn.add_ext(dn, [
            ('objectClass', 'person'),
            ('cn', 'AddToMe'),
            ('sn', 'Modify'),
            ('description', 'a description'),
        ])
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_ADD)

        m = conn.modify_ext(dn, [
            (_ldap.MOD_ADD, 'description', ['b desc', 'c desc']),
        ])
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_MODIFY)
        self.assertEqual(pmsg, [])
        self.assertEqual(msgid, m)
        self.assertEqual(ctrls, [])

        # search for it back
        m = conn.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(cn=AddToMe)')
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)

        # Expect to get the objects
        self.assertEqual(result, _ldap.RES_SEARCH_RESULT)
        self.assertEqual(len(pmsg), 1)
        self.assertEqual(msgid, m)
        self.assertEqual(ctrls, [])

        self.assertEqual(pmsg[0][0], dn)
        # compare sorted, so the order of the returned values does not matter
        d = sorted(pmsg[0][1]['description'])
        self.assertEqual(d, ['a description', 'b desc', 'c desc'])

    def test_rename(self):
        conn = self._init()
        dn = "cn=RenameMe," + self.base
        m = conn.add_ext(dn, [
            ('objectClass', 'organizationalRole'),
            ('cn', 'RenameMe'),
        ])
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_ADD)

        # do the rename with same parent
        m = conn.rename(dn, "cn=IAmRenamed")
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_MODRDN)
        self.assertEqual(msgid, m)
        self.assertEqual(pmsg, [])
        self.assertEqual(ctrls, [])

        # make sure the old one is gone
        m = conn.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(cn=RenameMe)')
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_SEARCH_RESULT)
        self.assertEqual(len(pmsg), 0)  # expect no results
        self.assertEqual(msgid, m)
        self.assertEqual(ctrls, [])

        # check that the new one looks right
        dn2 = "cn=IAmRenamed," + self.base
        m = conn.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(cn=IAmRenamed)')
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_SEARCH_RESULT)
        self.assertEqual(msgid, m)
        self.assertEqual(ctrls, [])
        self.assertEqual(len(pmsg), 1)
        self.assertEqual(pmsg[0][0], dn2)
        self.assertEqual(pmsg[0][1]['cn'], ['IAmRenamed'])

        # create the container
        containerDn = "ou=RenameContainer," + self.base
        m = conn.add_ext(containerDn, [
            ('objectClass', 'organizationalUnit'),
            ('ou', 'RenameContainer'),
        ])
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_ADD)

        # WORKAROUND bug in slapd. (Without an existing child,
        # renames into a container object do not work for the ldif backend,
        # the renamed object appears to be deleted, not moved.)
        # see http://www.openldap.org/its/index.cgi/Software%20Bugs?id=5408
        m = conn.add_ext("cn=Bogus," + containerDn, [
            ('objectClass', 'organizationalRole'),
            ('cn', 'Bogus'),
        ])
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_ADD)

        # now rename from dn2 to the container
        dn3 = "cn=IAmRenamedAgain," + containerDn

        # Now try renaming dn2 across container (simultaneous name change)
        m = conn.rename(dn2, "cn=IAmRenamedAgain", containerDn)
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_MODRDN)
        self.assertEqual(msgid, m)
        self.assertEqual(pmsg, [])
        self.assertEqual(ctrls, [])

        # make sure dn2 is gone
        m = conn.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(cn=IAmRenamed)')
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_SEARCH_RESULT)
        self.assertEqual(len(pmsg), 0)  # expect no results
        self.assertEqual(msgid, m)
        self.assertEqual(ctrls, [])

        # full subtree search; its result is not inspected
        m = conn.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(objectClass=*)')
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)

        # make sure dn3 is there
        m = conn.search_ext(self.base, _ldap.SCOPE_SUBTREE,
                            '(cn=IAmRenamedAgain)')
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_SEARCH_RESULT)
        self.assertEqual(msgid, m)
        self.assertEqual(ctrls, [])
        self.assertEqual(len(pmsg), 1)
        self.assertEqual(pmsg[0][0], dn3)
        self.assertEqual(pmsg[0][1]['cn'], ['IAmRenamedAgain'])

    def test_whoami(self):
        conn = self._init()
        r = conn.whoami_s()
        self.assertEqual("dn:" + self.server.get_root_dn(), r)

    def test_whoami_unbound(self):
        conn = self._init(bind=False)
        conn.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION3)
        r = conn.whoami_s()
        self.assertEqual("", r)

    def test_whoami_anonymous(self):
        conn = self._init(bind=False)
        conn.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION3)

        # Anonymous bind
        m = conn.simple_bind("", "")
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_BIND)

        r = conn.whoami_s()
        self.assertEqual("", r)

    def test_passwd(self):
        conn = self._init()

        # first, create a user to change password on
        dn = "cn=PasswordTest," + self.base
        m = conn.add_ext(dn, [
            ('objectClass', 'person'),
            ('sn', 'PasswordTest'),
            ('cn', 'PasswordTest'),
            ('userPassword', 'initial'),
        ])
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(result, _ldap.RES_ADD)

        # try changing password with a wrong old-pw
        m = conn.passwd(dn, "bogus", "ignored")
        try:
            conn.result3(m, _ldap.MSG_ALL, self.timeout)
        except _ldap.UNWILLING_TO_PERFORM:
            pass
        else:
            self.fail("expected UNWILLING_TO_PERFORM")

        # try changing password with a correct old-pw
        m = conn.passwd(dn, "initial", "changed")
        result, pmsg, msgid, ctrls = conn.result3(
            m, _ldap.MSG_ALL, self.timeout)
        self.assertEqual(msgid, m)
        self.assertEqual(pmsg, [])
        self.assertEqual(result, _ldap.RES_EXTENDED)
        self.assertEqual(ctrls, [])

    def test_options(self):
        # Module-level options first; the original value is always restored
        oldval = _ldap.get_option(_ldap.OPT_PROTOCOL_VERSION)
        try:
            try:
                _ldap.set_option(_ldap.OPT_PROTOCOL_VERSION, "3")
            except TypeError:
                pass
            else:
                self.fail("expected string value to raise a type error")

            _ldap.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION2)
            v = _ldap.get_option(_ldap.OPT_PROTOCOL_VERSION)
            self.assertEqual(v, _ldap.VERSION2)
            _ldap.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION3)
            v = _ldap.get_option(_ldap.OPT_PROTOCOL_VERSION)
            self.assertEqual(v, _ldap.VERSION3)
        finally:
            _ldap.set_option(_ldap.OPT_PROTOCOL_VERSION, oldval)

        conn = self._init()

        # Try changing some basic options and checking that they took effect

        conn.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION2)
        v = conn.get_option(_ldap.OPT_PROTOCOL_VERSION)
        self.assertEqual(v, _ldap.VERSION2)

        conn.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION3)
        v = conn.get_option(_ldap.OPT_PROTOCOL_VERSION)
        self.assertEqual(v, _ldap.VERSION3)

        # Try getting an option that is expected to yield a known error.
        try:
            _ldap.get_option(_ldap.OPT_MATCHED_DN)
        except ValueError:
            pass
        else:
            self.fail("expected ValueError")

    def _require_attr(self, obj, attrname):
        """Returns true if the attribute exists on the object.

        This is to allow some tests to be optional, because
        _ldap is compiled with different properties depending
        on the underlying C library.
        This could be made to throw an exception if you want the
        tests to be strict.
        """
        if hasattr(obj, attrname):
            return True
        #self.fail("required attribute '%s' missing" % attrname)
        return False

    def test_sasl(self):
        conn = self._init()
        if not self._require_attr(conn, 'sasl_interactive_bind_s'):  # HAVE_SASL
            return
        # TODO

    def test_tls(self):
        conn = self._init()
        if not self._require_attr(conn, 'start_tls_s'):    # HAVE_TLS
            return
        # TODO

    def test_cancel(self):
        conn = self._init()
        if not self._require_attr(conn, 'cancel'):         # FEATURE_CANCEL
            return

    # Placeholder: nothing is exercised for str2dn() yet
    def test_str2dn(self):
        pass
 | |
| 
 | |
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
 | 
