From 30622a4a3e59fa7c5fba6bd4f5939c0a91550ad5 Mon Sep 17 00:00:00 2001 From: stroeder Date: Tue, 13 Aug 2013 12:33:08 +0000 Subject: [PATCH] updated reference to RFC 4516 in comment --- Lib/ldapurl.py | 433 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 433 insertions(+) create mode 100644 Lib/ldapurl.py diff --git a/Lib/ldapurl.py b/Lib/ldapurl.py new file mode 100644 index 0000000..af501a4 --- /dev/null +++ b/Lib/ldapurl.py @@ -0,0 +1,433 @@ +""" +ldapurl - handling of LDAP URLs as described in RFC 4516 + +See http://www.python-ldap.org/ for details. + +\$Id: ldapurl.py,v 1.63 2013/08/13 12:33:08 stroeder Exp $ + +Python compability note: +This module only works with Python 2.0+ since +1. string methods are used instead of module string and +2. list comprehensions are used. +""" + +__version__ = '2.4.14' + +__all__ = [ + # constants + 'SEARCH_SCOPE','SEARCH_SCOPE_STR', + 'LDAP_SCOPE_BASE','LDAP_SCOPE_ONELEVEL','LDAP_SCOPE_SUBTREE', + # functions + 'isLDAPUrl', + # classes + 'LDAPUrlExtension','LDAPUrlExtensions','LDAPUrl' +] + +import UserDict + +from urllib import quote,unquote + +LDAP_SCOPE_BASE = 0 +LDAP_SCOPE_ONELEVEL = 1 +LDAP_SCOPE_SUBTREE = 2 + +SEARCH_SCOPE_STR = {None:'',0:'base',1:'one',2:'sub'} + +SEARCH_SCOPE = { + '':None, + # the search scope strings defined in RFC 4516 + 'base':LDAP_SCOPE_BASE, + 'one':LDAP_SCOPE_ONELEVEL, + 'sub':LDAP_SCOPE_SUBTREE, +} + +# Some widely used types +StringType = type('') +TupleType=type(()) + + +def isLDAPUrl(s): + """ + Returns 1 if s is a LDAP URL, 0 else + """ + s_lower = s.lower() + return \ + s_lower.startswith('ldap://') or \ + s_lower.startswith('ldaps://') or \ + s_lower.startswith('ldapi://') + + +def ldapUrlEscape(s): + """Returns URL encoding of string s""" + return quote(s).replace(',','%2C').replace('/','%2F') + + +class LDAPUrlExtension: + """ + Class for parsing and unparsing LDAP URL extensions + as described in RFC 4516. + + Usable class attributes: + critical + Boolean integer marking the extension as critical + extype + Type of extension + exvalue + Value of extension + """ + + def __init__(self,extensionStr=None,critical=0,extype=None,exvalue=None): + self.critical = critical + self.extype = extype + self.exvalue = exvalue + if extensionStr: + self._parse(extensionStr) + + def _parse(self,extension): + extension = extension.strip() + if not extension: + # Don't parse empty strings + self.extype,self.exvalue = None,None + return + self.critical = extension[0]=='!' + if extension[0]=='!': + extension = extension[1:].strip() + try: + self.extype,self.exvalue = extension.split('=',1) + except ValueError: + # No value, just the extype + self.extype,self.exvalue = extension,None + else: + self.exvalue = unquote(self.exvalue.strip()) + self.extype = self.extype.strip() + + def unparse(self): + if self.exvalue is None: + return '%s%s' % ('!'*(self.critical>0),self.extype) + else: + return '%s%s=%s' % ( + '!'*(self.critical>0), + self.extype,quote(self.exvalue or '') + ) + + def __str__(self): + return self.unparse() + + def __repr__(self): + return '<%s.%s instance at %s: %s>' % ( + self.__class__.__module__, + self.__class__.__name__, + hex(id(self)), + self.__dict__ + ) + + def __eq__(self,other): + return \ + (self.critical==other.critical) and \ + (self.extype==other.extype) and \ + (self.exvalue==other.exvalue) + + def __ne__(self,other): + return not self.__eq__(other) + + +class LDAPUrlExtensions(UserDict.UserDict): + """ + Models a collection of LDAP URL extensions as + dictionary type + """ + + def __init__(self,default=None): + UserDict.UserDict.__init__(self) + for k,v in (default or {}).items(): + self[k]=v + + def __setitem__(self,name,value): + """ + value + Either LDAPUrlExtension instance, (critical,exvalue) + or string'ed exvalue + """ + assert isinstance(value,LDAPUrlExtension) + assert name==value.extype + self.data[name] = value + + def values(self): + return [ + self[k] + for k in self.keys() + ] + + def __str__(self): + return ','.join(map(str,self.values())) + + def __repr__(self): + return '<%s.%s instance at %s: %s>' % ( + self.__class__.__module__, + self.__class__.__name__, + hex(id(self)), + self.data + ) + + def __eq__(self,other): + assert isinstance(other,self.__class__),TypeError( + "other has to be instance of %s" % (self.__class__) + ) + return self.data==other.data + + def parse(self,extListStr): + for extension_str in extListStr.strip().split(','): + if extension_str: + e = LDAPUrlExtension(extension_str) + self[e.extype] = e + + def unparse(self): + return ','.join([ v.unparse() for v in self.values() ]) + + +class LDAPUrl: + """ + Class for parsing and unparsing LDAP URLs + as described in RFC 4516. + + Usable class attributes: + urlscheme + URL scheme (either ldap, ldaps or ldapi) + hostport + LDAP host (default '') + dn + String holding distinguished name (default '') + attrs + list of attribute types (default None) + scope + integer search scope for ldap-module + filterstr + String representation of LDAP Search Filters + (see RFC 4515) + extensions + Dictionary used as extensions store + who + Maps automagically to bindname LDAP URL extension + cred + Maps automagically to X-BINDPW LDAP URL extension + """ + + attr2extype = {'who':'bindname','cred':'X-BINDPW'} + + def __init__( + self, + ldapUrl=None, + urlscheme='ldap', + hostport='',dn='',attrs=None,scope=None,filterstr=None, + extensions=None, + who=None,cred=None + ): + self.urlscheme=urlscheme + self.hostport=hostport + self.dn=dn + self.attrs=attrs + self.scope=scope + self.filterstr=filterstr + self.extensions=(extensions or LDAPUrlExtensions({})) + if ldapUrl!=None: + self._parse(ldapUrl) + if who!=None: + self.who = who + if cred!=None: + self.cred = cred + + def __eq__(self,other): + return \ + self.urlscheme==other.urlscheme and \ + self.hostport==other.hostport and \ + self.dn==other.dn and \ + self.attrs==other.attrs and \ + self.scope==other.scope and \ + self.filterstr==other.filterstr and \ + self.extensions==other.extensions + + def __ne__(self,other): + return not self.__eq__(other) + + def _parse(self,ldap_url): + """ + parse a LDAP URL and set the class attributes + urlscheme,host,dn,attrs,scope,filterstr,extensions + """ + if not isLDAPUrl(ldap_url): + raise ValueError,'Parameter ldap_url does not seem to be a LDAP URL.' + scheme,rest = ldap_url.split('://',1) + self.urlscheme = scheme.strip() + if not self.urlscheme in ['ldap','ldaps','ldapi']: + raise ValueError,'LDAP URL contains unsupported URL scheme %s.' % (self.urlscheme) + slash_pos = rest.find('/') + qemark_pos = rest.find('?') + if (slash_pos==-1) and (qemark_pos==-1): + # No / and ? found at all + self.hostport = unquote(rest) + self.dn = '' + return + else: + if slash_pos!=-1 and (qemark_pos==-1 or (slash_posqemark_pos)): + # Question mark separates hostport from rest, DN is assumed to be empty + self.hostport = unquote(rest[:qemark_pos]) + # Do not eat question mark + rest = rest[qemark_pos:] + else: + raise ValueError,'Something completely weird happened!' + paramlist=rest.split('?',4) + paramlist_len = len(paramlist) + if paramlist_len>=1: + self.dn = unquote(paramlist[0]).strip() + if (paramlist_len>=2) and (paramlist[1]): + self.attrs = unquote(paramlist[1].strip()).split(',') + if paramlist_len>=3: + scope = paramlist[2].strip() + try: + self.scope = SEARCH_SCOPE[scope] + except KeyError: + raise ValueError,"Search scope must be either one of base, one or sub. LDAP URL contained %s" % (repr(scope)) + if paramlist_len>=4: + filterstr = paramlist[3].strip() + if not filterstr: + self.filterstr = None + else: + self.filterstr = unquote(filterstr) + if paramlist_len>=5: + if paramlist[4]: + self.extensions = LDAPUrlExtensions() + self.extensions.parse(paramlist[4]) + else: + self.extensions = None + return + + def applyDefaults(self,defaults): + """ + Apply defaults to all class attributes which are None. + + defaults + Dictionary containing a mapping from class attributes + to default values + """ + for k in defaults.keys(): + if getattr(self,k) is None: + setattr(self,k,defaults[k]) + + def initializeUrl(self): + """ + Returns LDAP URL suitable to be passed to ldap.initialize() + """ + if self.urlscheme=='ldapi': + # hostport part might contain slashes when ldapi:// is used + hostport = ldapUrlEscape(self.hostport) + else: + hostport = self.hostport + return '%s://%s' % (self.urlscheme,hostport) + + def unparse(self): + """ + Returns LDAP URL depending on class attributes set. + """ + if self.attrs is None: + attrs_str = '' + else: + attrs_str = ','.join(self.attrs) + scope_str = SEARCH_SCOPE_STR[self.scope] + if self.filterstr is None: + filterstr = '' + else: + filterstr = ldapUrlEscape(self.filterstr) + dn = ldapUrlEscape(self.dn) + if self.urlscheme=='ldapi': + # hostport part might contain slashes when ldapi:// is used + hostport = ldapUrlEscape(self.hostport) + else: + hostport = self.hostport + ldap_url = '%s://%s/%s?%s?%s?%s' % ( + self.urlscheme, + hostport,dn,attrs_str,scope_str,filterstr + ) + if self.extensions: + ldap_url = ldap_url+'?'+self.extensions.unparse() + return ldap_url + + def htmlHREF(self,urlPrefix='',hrefText=None,hrefTarget=None): + """ + Returns a string with HTML link for this LDAP URL. + + urlPrefix + Prefix before LDAP URL (e.g. for addressing another web-based client) + hrefText + link text/description + hrefTarget + string added as link target attribute + """ + assert type(urlPrefix)==StringType, "urlPrefix must be StringType" + if hrefText is None: + hrefText = self.unparse() + assert type(hrefText)==StringType, "hrefText must be StringType" + if hrefTarget is None: + target = '' + else: + assert type(hrefTarget)==StringType, "hrefTarget must be StringType" + target = ' target="%s"' % hrefTarget + return '%s' % ( + target,urlPrefix,self.unparse(),hrefText + ) + + def __str__(self): + return self.unparse() + + def __repr__(self): + return '<%s.%s instance at %s: %s>' % ( + self.__class__.__module__, + self.__class__.__name__, + hex(id(self)), + self.__dict__ + ) + + def __getattr__(self,name): + if self.attr2extype.has_key(name): + extype = self.attr2extype[name] + if self.extensions and \ + self.extensions.has_key(extype) and \ + not self.extensions[extype].exvalue is None: + result = unquote(self.extensions[extype].exvalue) + else: + return None + else: + raise AttributeError,"%s has no attribute %s" % ( + self.__class__.__name__,name + ) + return result # __getattr__() + + def __setattr__(self,name,value): + if self.attr2extype.has_key(name): + extype = self.attr2extype[name] + if value is None: + # A value of None means that extension is deleted + delattr(self,name) + elif value!=None: + # Add appropriate extension + self.extensions[extype] = LDAPUrlExtension( + extype=extype,exvalue=unquote(value) + ) + else: + self.__dict__[name] = value + + def __delattr__(self,name): + if self.attr2extype.has_key(name): + extype = self.attr2extype[name] + if self.extensions: + try: + del self.extensions[extype] + except KeyError: + pass + else: + del self.__dict__[name] +