135 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			135 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| """
 | |
| ldap.controls.psearch - classes for Persistent Search Control
 | |
| (see http://tools.ietf.org/html/draft-ietf-ldapext-psearch)
 | |
| 
 | |
| See http://www.python-ldap.org/ for project details.
 | |
| 
 | |
| $Id: psearch.py,v 1.4 2011/07/22 13:47:47 stroeder Exp $
 | |
| """
 | |
| 
 | |
| __all__ = [
 | |
|   'PersistentSearchControl',
 | |
|   'EntryChangeNotificationControl',
 | |
|   'CHANGE_TYPES_INT',
 | |
|   'CHANGE_TYPES_STR',
 | |
| ]
 | |
| 
 | |
| # Imports from python-ldap 2.4+
 | |
| import ldap.controls
 | |
| from ldap.controls import RequestControl,ResponseControl,KNOWN_RESPONSE_CONTROLS
 | |
| 
 | |
| # Imports from pyasn1
 | |
| from pyasn1.type import namedtype,namedval,univ,constraint
 | |
| from pyasn1.codec.ber import encoder,decoder
 | |
| from pyasn1_modules.rfc2251 import LDAPDN
 | |
| 
 | |
| #---------------------------------------------------------------------------
 | |
| # Constants and classes for Persistent Search Control
 | |
| #---------------------------------------------------------------------------
 | |
| 
 | |
# Change type name -> bit flag as used in the changeTypes bit mask of the
# persistent search request control
CHANGE_TYPES_INT = {
  'add': 1,
  'delete': 2,
  'modify': 4,
  'modDN': 8,
}

# Reverse mapping: bit flag -> change type name
CHANGE_TYPES_STR = dict(
  (flag, name) for name, flag in CHANGE_TYPES_INT.items()
)
 | |
| 
 | |
| 
 | |
class PersistentSearchControl(RequestControl):
  """
  Implements the request control for persistent search.

  criticality
    Stored unchanged as attribute criticality (default True).
  changeTypes
    Specifies the types of changes returned by the server. Either an
    iterable whose items are change type names found in CHANGE_TYPES_INT
    ('add', 'delete', 'modify', 'modDN') and/or the corresponding integer
    flags, or a single integer holding the already OR-ed bit mask.
    Setting to None (or any other false value) requests all changes.
  changesOnly
    Boolean which indicates whether only changes are returned by the server.
  returnECs
    Boolean which indicates whether the server should return an
    Entry Change Notification response control
  """

  class PersistentSearchControlValue(univ.Sequence):
    """ASN.1 SEQUENCE of changeTypes, changesOnly and returnECs."""
    componentType = namedtype.NamedTypes(
      namedtype.NamedType('changeTypes',univ.Integer()),
      namedtype.NamedType('changesOnly',univ.Boolean()),
      namedtype.NamedType('returnECs',univ.Boolean()),
    )

  controlType = "2.16.840.1.113730.3.4.3"

  def __init__(self,criticality=True,changeTypes=None,changesOnly=False,returnECs=True):
    self.criticality,self.changesOnly,self.returnECs = \
      criticality,changesOnly,returnECs
    # A false value (None, 0, empty sequence) selects every known change type
    self.changeTypes = changeTypes or CHANGE_TYPES_INT.values()

  def encodeControlValue(self):
    """
    Return the control value as encoded by pyasn1's BER encoder.

    If changeTypes is not an integer yet, its items are OR-ed into a single
    bit mask first and the mask is stored back into self.changeTypes.
    """
    # isinstance() instead of an exact type comparison so that int
    # subclasses are treated as a ready-made bit mask and not iterated.
    if not isinstance(self.changeTypes,int):
      # Assume an iterable of names and/or integer flags to be OR-ed;
      # names are translated via CHANGE_TYPES_INT, anything else is
      # used as is.
      changeTypes_int = 0
      for ct in self.changeTypes:
        changeTypes_int = changeTypes_int|CHANGE_TYPES_INT.get(ct,ct)
      self.changeTypes = changeTypes_int
    p = self.PersistentSearchControlValue()
    p.setComponentByName('changeTypes',univ.Integer(self.changeTypes))
    p.setComponentByName('changesOnly',univ.Boolean(self.changesOnly))
    p.setComponentByName('returnECs',univ.Boolean(self.returnECs))
    return encoder.encode(p)
 | |
| 
 | |
| 
 | |
class ChangeType(univ.Enumerated):
  """
  ASN.1 ENUMERATED holding a single change type code:
  add (1), delete (2), modify (4) or modDN (8).
  """

  namedValues = namedval.NamedValues(
    ('add', 1),
    ('delete', 2),
    ('modify', 4),
    ('modDN', 8),
  )

  # Only the four codes listed above are accepted as values
  subtypeSpec = (
    univ.Enumerated.subtypeSpec
    + constraint.SingleValueConstraint(1, 2, 4, 8)
  )
 | |
| 
 | |
| 
 | |
class EntryChangeNotificationValue(univ.Sequence):
  """
  ASN.1 SEQUENCE carried in the Entry Change Notification control value:
  a mandatory changeType followed by the optional components previousDN
  and changeNumber.
  """

  componentType = namedtype.NamedTypes(
    namedtype.NamedType(
      'changeType', ChangeType()
    ),
    namedtype.OptionalNamedType(
      'previousDN', LDAPDN()
    ),
    namedtype.OptionalNamedType(
      'changeNumber', univ.Integer()
    ),
  )
 | |
| 
 | |
| 
 | |
class EntryChangeNotificationControl(ResponseControl):
  """
  Implements the response control for persistent search.

  Instance attributes set by decodeControlValue() with values extracted
  from the response control:

  changeType
    Integer code indicating the type of change causing this result to be
    returned by the server (CHANGE_TYPES_STR maps the code to its name)
  previousDN
    Old DN of the entry in case of a modrdn change,
    None if not present in the control value
  changeNumber
    A change serial number returned by the server (optional),
    None if not present in the control value
  """

  controlType = "2.16.840.1.113730.3.4.7"

  @staticmethod
  def _optionalComponent(asn1Sequence,name):
    """
    Return the component called name from asn1Sequence or None if the
    component carries no value.
    """
    component = asn1Sequence.getComponentByName(name)
    if component is None:
      return None
    # NOTE(review): depending on the pyasn1 release an absent optional
    # component is reported either as None or as a value-less schema
    # object whose isValue attribute is false -- confirm against the
    # pyasn1 versions to be supported. Objects without an isValue
    # attribute are taken to hold a value.
    if not getattr(component,'isValue',True):
      return None
    return component

  def decodeControlValue(self,encodedControlValue):
    """
    Decode encodedControlValue, store the result in the attributes
    changeType, previousDN and changeNumber and return them as a tuple
    (changeType,previousDN,changeNumber).
    """
    ecncValue,_ = decoder.decode(encodedControlValue,asn1Spec=EntryChangeNotificationValue())
    self.changeType = int(ecncValue.getComponentByName('changeType'))
    # Both trailing components are optional and independent of each other,
    # so each one is probed on its own instead of deriving their presence
    # from the sequence length and the change type.
    previousDN = self._optionalComponent(ecncValue,'previousDN')
    if previousDN is None:
      self.previousDN = None
    else:
      self.previousDN = str(previousDN)
    changeNumber = self._optionalComponent(ecncValue,'changeNumber')
    if changeNumber is None:
      self.changeNumber = None
    else:
      self.changeNumber = int(changeNumber)
    return (self.changeType,self.previousDN,self.changeNumber)
 | |
| 
 | |
# Register the response control class under its OID (controlType).
# NOTE(review): presumably ldap.controls uses this mapping to pick the class
# for decoding received response controls -- confirm in ldap.controls.
KNOWN_RESPONSE_CONTROLS[EntryChangeNotificationControl.controlType] = EntryChangeNotificationControl
 | 
