diff --git a/src/saml2/client.py b/src/saml2/client.py index 0fa9d3d..0d361e7 100644 --- a/src/saml2/client.py +++ b/src/saml2/client.py @@ -31,12 +31,12 @@ from saml2.utils import do_attributes, args2dict from saml2 import samlp, saml, extension_element_to_element from saml2 import VERSION, class_name, make_instance -from saml2.sigver import correctly_signed_response, decrypt -from saml2.sigver import pre_signature_part, sign_assertion_using_xmlsec -from saml2.sigver import sign_statement_using_xmlsec +from saml2.sigver import pre_signature_part +from saml2.sigver import security_context from saml2.soap import SOAPClient from saml2.attribute_converter import to_local +from saml2.authnresponse import authn_response DEFAULT_BINDING = saml2.BINDING_HTTP_REDIRECT @@ -48,24 +48,6 @@ FORM_SPEC = """
LAX = False -def _use_on_or_after(condition, slack): - now = time.mktime(time.gmtime()) - not_on_or_after = time.mktime(str_to_time(condition.not_on_or_after)) - if not_on_or_after < now + slack: - # To old ignore - raise Exception("To old can't use it") - return not_on_or_after - -def _use_before(condition, slack): - not_before = time.mktime(str_to_time(condition.not_before)) - now = time.mktime(time.gmtime()) - - if not_before > now + slack: - # Can't use it yet - raise Exception("Can't use it yet %s <= %s" % (not_before, now)) - - return True - class Saml2Client(object): """ The basic pySAML2 service provider class """ @@ -79,7 +61,8 @@ class Saml2Client(object): self.config = config if "metadata" in config: self.metadata = config["metadata"] - + self.sc = security_context(config) + def _init_request(self, request, destination): #request.id = sid() request.version = VERSION @@ -129,9 +112,10 @@ class Saml2Client(object): if post.has_key("SAMLResponse"): saml_response = post['SAMLResponse'].value if saml_response: - return self.verify_response(saml_response, requestor, - outstanding, log, - context="AuthNReq") + ar = authn_response(self.conf, requestor, outstanding, log) + ar.loads(saml_response) + return ar.verify() + return None def authn_request(self, query_id, destination, service_url, spentityid, @@ -184,9 +168,8 @@ class Saml2Client(object): request = make_instance(samlp.AuthnRequest, prel) if sign: - return sign_statement_using_xmlsec("%s" % request, class_name(request), - self.config["xmlsec_binary"], - key_file=self.config["key_file"]) + return self.sc.sign_statement_using_xmlsec("%s" % request, + class_name(request)) #return samlp.authn_request_from_string(sreq) else: return "%s" % request @@ -195,8 +178,7 @@ class Saml2Client(object): my_name="", relay_state="", binding=saml2.BINDING_HTTP_REDIRECT, log=None, vorg="", scoping=None): - """ Either verifies an authentication Response or if none is present - send an authentication request. + """ Sends an authentication request. :param spentityid: The SP EntityID :param binding: How the authentication request should be sent to the @@ -253,211 +235,6 @@ class Saml2Client(object): raise Exception("Unkown binding type: %s" % binding) return (session_id, response) - def verify_response(self, xml_response, requestor, outstanding=None, - log=None, decode=True, context="", lax=False): - """ Verify a response - - :param xml_response: The response as a XML string - :param requestor: The hostname of the machine - :param outstanding: A collection of outstanding authentication requests - :param log: Where logging information should be sent - :param decode: There for testing purposes - :param lax: Accept things you normally shouldn't - :return: A 2-tuple consisting of an identity description and the - real relay-state - """ - - if not outstanding: - outstanding = {} - - if decode: - decoded_xml = base64.b64decode(xml_response) - else: - decoded_xml = xml_response - - # own copy - xmlstr = decoded_xml[:] - log and log.info("verify correct signature") - response = correctly_signed_response(decoded_xml, - self.config["xmlsec_binary"], log=log) - if not response: - if log: - log.error("Response was not correctly signed") - log.info(decoded_xml) - return None - else: - log and log.error("Response was correctly signed or nor signed") - - log and log.info("response: %s" % (response,)) - try: - session_info = self.do_response(response, - requestor, - outstanding=outstanding, - xmlstr=xmlstr, - log=log, context=context, - lax=lax) - session_info["issuer"] = response.issuer.text - session_info["session_id"] = response.in_response_to - except AttributeError, exc: - if log: - log.error("AttributeError: %s" % (exc,)) - else: - print >> sys.stderr, "AttributeError: %s" % (exc,) - return None - except Exception, exc: - if log: - log.error("Exception: %s" % (exc,)) - else: - print >> sys.stderr, "Exception: %s" % (exc,) - return None - - session_info["ava"]["__userid"] = session_info["name_id"] - return session_info - - def _verify_condition(self, assertion, requestor, log, lax=False, - slack=0): - # The Identity Provider MUST include a element - #print "Conditions",assertion.conditions - assert assertion.conditions - condition = assertion.conditions - log and log.info("condition: %s" % condition) - - try: - slack = self.config["accept_time_diff"] - except KeyError: - slack = 0 - - try: - not_on_or_after = _use_on_or_after(condition, slack) - _use_before(condition, slack) - except Exception: - if not lax: - raise - else: - not_on_or_after = 0 - - if not for_me(condition, requestor): - if not LAX and not lax: - raise Exception("Not for me!!!") - - return not_on_or_after - - def _websso(self, assertion, _outstanding, _requestor, _log=None): - # the assertion MUST contain one AuthNStatement - assert len(assertion.authn_statement) == 1 - # authn_statement = assertion.authn_statement[0] - # check authn_statement.session_index - - - def _assertion(self, assertion, outstanding, requestor, log, context, - lax): - if log: - log.info("assertion context: %s" % (context,)) - log.info("assertion keys: %s" % (assertion.keyswv())) - log.info("outstanding: %s" % (outstanding)) - - if context == "AuthNReq": - self._websso(assertion, outstanding, requestor, log) - - # The Identity Provider MUST include a element - #print "Conditions",assertion.conditions - assert assertion.conditions - log and log.info("verify_condition") - not_on_or_after = self._verify_condition(assertion, requestor, log, - lax) - - # The assertion can contain zero or one attributeStatements - if not assertion.attribute_statement: - ava = {} - else: - assert len(assertion.attribute_statement) == 1 - ava = to_local(self.config.attribute_converters(), - assertion.attribute_statement[0]) - - log and log.info("AVA: %s" % (ava,)) - - # The assertion must contain a Subject - assert assertion.subject - subject = assertion.subject - for subject_confirmation in subject.subject_confirmation: - data = subject_confirmation.subject_confirmation_data - if data.in_response_to in outstanding: - came_from = outstanding[data.in_response_to] - del outstanding[data.in_response_to] - elif LAX: - came_from = "" - else: - print data.in_response_to, outstanding.keys() - raise Exception( - "Combination of session id and requestURI I don't recall") - - # The subject must contain a name_id - assert subject.name_id - name_id = subject.name_id.text.strip() - - return {"ava":ava, "name_id":name_id, "came_from":came_from, - "not_on_or_after":not_on_or_after} - - def _encrypted_assertion(self, xmlstr, outstanding, requestor, - log=None, context=""): - log and log.debug("Decrypt message") - decrypt_xml = decrypt(xmlstr, self.config["key_file"], - self.config["xmlsec_binary"], log=log) - log and log.debug("Decryption successfull") - - response = samlp.response_from_string(decrypt_xml) - log and log.debug("Parsed decrypted assertion successfull") - - enc = response.encrypted_assertion[0].extension_elements[0] - assertion = extension_element_to_element( - enc, - saml.ELEMENT_FROM_STRING, - namespace=saml.NAMESPACE) - log and log.info("Decrypted Assertion: %s" % assertion) - return self._assertion(assertion, outstanding, requestor, log, - context) - - def do_response(self, response, requestor, outstanding=None, - xmlstr="", log=None, context="", lax=False): - """ - Parse a response, verify that it is a response for me and - expected by me and that it is correct. - - :param response: The response as a structure - :param requestor: The host (me) that asked for a AuthN response - :param outstanding: A dictionary with session ids as keys and request - URIs as values. - :param lax: Accept things you normally shouldn't - :result: A 2-tuple with attribute value assertions as a dictionary - as one part and the NameID as the other. - """ - - if not outstanding: - outstanding = {} - - if response.status: - status = response.status - if status.status_code.value != samlp.STATUS_SUCCESS: - log and log.info("Not successfull operation: %s" % status) - raise Exception( - "Not successfull according to: %s" % \ - status.status_code.value) - - # MUST contain *one* assertion - try: - assert len(response.assertion) == 1 or \ - len(response.encrypted_assertion) == 1 - except AssertionError: - raise Exception("No assertion part") - - if response.assertion: - log and log.info("***Unencrypted response***") - return self._assertion(response.assertion[0], outstanding, - requestor, log, context, lax) - else: - log and log.info("***Encrypted response***") - return self._encrypted_assertion(xmlstr, outstanding, - requestor, log, context) def create_attribute_query(self, session_id, subject_id, issuer, destination, attribute=None, sp_name_qualifier=None, @@ -504,9 +281,7 @@ class Saml2Client(object): request = make_instance(samlp.AttributeQuery, prequery) if sign: - signed_req = sign_assertion_using_xmlsec("%s" % request, - self.config["xmlsec_binary"], - key_file=self.config["key_file"]) + signed_req = self.sc.sign_assertion_using_xmlsec("%s" % request) return samlp.attribute_query_from_string(signed_req) else: @@ -596,12 +371,6 @@ class Saml2Client(object): # ---------------------------------------------------------------------- -def for_me(condition, myself ): - for restriction in condition.audience_restriction: - audience = restriction.audience - if audience.text.strip() == myself: - return True - ROW = """%s%s""" def _print_statement(statem): @@ -637,4 +406,3 @@ def _print_statements(states): def print_response(resp): print _print_statement(resp) print resp.to_string() - \ No newline at end of file diff --git a/src/saml2/config.py b/src/saml2/config.py index 4e14182..e711ff8 100644 --- a/src/saml2/config.py +++ b/src/saml2/config.py @@ -112,6 +112,9 @@ class Config(dict): return self + def xmlsec(self): + return self["xmlsec_binary"] + def services(self): return self["service"].keys() diff --git a/src/saml2/saml.py b/src/saml2/saml.py index 03b04e4..dc57387 100644 --- a/src/saml2/saml.py +++ b/src/saml2/saml.py @@ -372,6 +372,7 @@ class SubjectConfirmation(SamlBase): extension_elements=None, extension_attributes=None): """Constructor for SubjectConfirmation + :param method: Subject confirmation method :param base_id: Method attribute :param name_id: NameID element :param subject_confirmation_data: SubjectConfirmationData element diff --git a/src/saml2/server.py b/src/saml2/server.py index d5e6d4d..d699184 100644 --- a/src/saml2/server.py +++ b/src/saml2/server.py @@ -31,7 +31,7 @@ from saml2.utils import OtherError, do_attribute_statement from saml2.utils import VersionMismatch, UnknownPrincipal, UnsupportedBinding from saml2.utils import status_from_exception_factory -from saml2.sigver import correctly_signed_authn_request +from saml2.sigver import security_context from saml2.sigver import pre_signature_part from saml2.time_util import instant, in_a_while from saml2.config import Config @@ -42,21 +42,26 @@ class UnknownVO(Exception): pass class Identifier(object): + """ A class that handles identifiers of objects """ def __init__(self, dbname, entityid, voconf=None, debug=0, log=None): self.map = shelve.open(dbname,writeback=True) self.entityid = entityid self.voconf = voconf self.debug = debug self.log = log - + def persistent(self, entity_id, subject_id): """ Keeps the link between a permanent identifier and a temporary/pseudo-temporary identifier for a subject + The store supports look-up both ways: from a permanent local + identifier to a identifier used talking to a SP and from an + identifier given back by an SP to the local permanent. + :param entity_id: SP entity ID or VO entity ID - :param subject_id: The local identifier of the subject + :param subject_id: The local permanent identifier of the subject :return: An arbitrary identifier for the subject unique to the - entity_id + service/group of services/VO with a given entity_id """ if self.debug: self.log and self.log.debug("Id map keys: %s" % self.map.keys()) @@ -104,12 +109,30 @@ class Identifier(object): sp_name_qualifier=sp_name_qualifier) def persistent_nameid(self, sp_name_qualifier, userid): + """ Get or create a persistent identifier for this object to be used + when communicating with servers using a specific SPNameQualifier + + :param sp_name_qualifier: An identifier for a 'context' + :param userid: The local permanent identifier of the object + :return: A persistent random identifier. + """ subj_id = self.persistent(sp_name_qualifier, userid) return args2dict(subj_id, format=saml.NAMEID_FORMAT_PERSISTENT, sp_name_qualifier=sp_name_qualifier) def construct_nameid(self, local_policy, userid, sp_entity_id, identity=None, name_id_policy=None): + """ Returns a name_id for the object. How the name_id is + constructed depends on the context. + + :param local_policy: The policy the server is configured to follow + :param user: The local permanent identifier of the object + :param sp_entity_id: The 'user' of the name_id + :param identity: Attribute/value pairs describing the object + :param name_id_policy: The policy the server on the other side wants + us to follow. + :return: NameID instance precursor + """ if name_id_policy and name_id_policy.sp_name_qualifier: return self._get_vo_identifier(name_id_policy.sp_name_qualifier, userid, identity) @@ -121,6 +144,7 @@ class Identifier(object): return self.temporary_nameid() def temporary_nameid(self): + """ Returns a random one-time identifier """ return args2dict(sid(), format=saml.NAMEID_FORMAT_TRANSIENT) @@ -137,12 +161,14 @@ class Server(object): self.conf = config self.metadata = self.conf["metadata"] + self.sc = security_context(self.conf, log) if cache: self.cache = Cache(cache) else: self.cache = Cache() def load_config(self, config_file): + self.conf = Config() self.conf.load_file(config_file) if "subject_data" in self.conf: @@ -153,8 +179,9 @@ class Server(object): self.id = None def issuer(self): + """ Return an Issuer precursor """ return args2dict( self.conf["entityid"], - format=saml.NAMEID_FORMAT_ENTITY) + format=saml.NAMEID_FORMAT_ENTITY) def parse_authn_request(self, enc_request): @@ -164,15 +191,14 @@ class Server(object): :return: A dictionary with keys: consumer_url - as gotten from the SPs entity_id and the metadata id - the id of the request - name_id_policy - how to chose the subjects identifier - sp_entityid - the entity id of the SP + sp_entity_id - the entity id of the SP + request - The verified request """ response = {} request_xml = decode_base64_and_inflate(enc_request) try: - request = correctly_signed_authn_request(request_xml, - self.conf["xmlsec_binary"], log=self.log) + request = self.sc.correctly_signed_authn_request(request_xml) if self.log and self.debug: self.log.error("Request was correctly signed") except Exception: @@ -187,18 +213,20 @@ class Server(object): if request.version != VERSION: raise VersionMismatch( "can't work with version %s" % request.version) - sp_entityid = request.issuer.text + + sp_entity_id = request.issuer.text # used to find return address in metadata try: - consumer_url = self.metadata.consumer_url(sp_entityid) + consumer_url = self.metadata.consumer_url(sp_entity_id) except KeyError: self.log and self.log.info( "entities: %s" % self.metadata.entity.keys()) - raise UnknownPrincipal(sp_entityid) + raise UnknownPrincipal(sp_entity_id) + if not consumer_url: # what to do ? - raise UnsupportedBinding(sp_entityid) + raise UnsupportedBinding(sp_entity_id) - response["sp_entityid"] = sp_entityid + response["sp_entity_id"] = sp_entity_id if consumer_url != return_destination: # serious error on someones behalf @@ -214,17 +242,30 @@ class Server(object): return response def wants(self, sp_entity_id): - """ + """ Returns what attributes this SP requiers and which are optional + if any such demands are registered in the Metadata. + :param sp_entity_id: The entity id of the SP :return: 2-tuple, list of required and list of optional attributes """ return self.metadata.requests(sp_entity_id) def parse_attribute_query(self, xml_string): + """ Parse an attribute query + + :param xml_string: The Attribute Query as an XML string + :return: 3-Tuple containing: + subject - identifier of the subject + attribute - which attributes that the requestor wants back + query - the whole query + """ query = samlp.attribute_query_from_string(xml_string) + # Check that it's assert query.version == VERSION + self.log and self.log.info( "%s ?= %s" % (query.destination, self.conf.aa_url)) + # Is it for me ? assert query.destination == self.conf.aa_url # verify signature @@ -235,10 +276,7 @@ class Server(object): else: attribute = None return (subject, attribute, query) - - def find_subject(self, subject, attribute=None): - pass - + # ------------------------------------------------------------------------ def _response(self, consumer_url, in_response_to, sp_entity_id, diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 320ab90..11f6c18 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -19,7 +19,7 @@ Based on the use of xmlsec1 binaries and not the python xmlsec module. """ -from saml2 import samlp +from saml2 import samlp, class_name, saml import xmldsig as ds from tempfile import NamedTemporaryFile from subprocess import Popen, PIPE @@ -44,33 +44,6 @@ _TEST_ = True class SignatureError(Exception): pass - -def decrypt( enctext, key_file, xmlsec_binary, log=None): - """ Decrypting an encrypted text by the use of a private key. - - :param input: The encrypted text as a string - :param key_file: The name of the key file - :param xmlsec_binary: Where on the computer the xmlsec binary is. - :param log: A reference to a logging instance. - :return: The decrypted text - """ - log and log.info("input len: %d" % len(enctext)) - _, fil = make_temp("%s" % enctext, decode=False) - ntf = NamedTemporaryFile() - - log and log.info("xmlsec binary: %s" % xmlsec_binary) - com_list = [xmlsec_binary, "--decrypt", - "--privkey-pem", key_file, - "--output", ntf.name, - "--id-attr:%s" % ID_ATTR, - ENC_NODE_NAME, fil] - - log and log.info("Decrypt command: %s" % " ".join(com_list)) - result = Popen(com_list, stderr=PIPE).communicate() - log and log.info("Decrypt result: %s" % (result,)) - - ntf.seek(0) - return ntf.read() def create_id(): """ Create a string of 40 random characters from the set [a-p], @@ -145,14 +118,9 @@ def _parse_xmlsec_output(output): elif line == "FALSE": return False return False - -def verify_signature_assertion(xmlsec_binary, enctext, cert_file): - return verify_signature(xmlsec_binary, enctext, cert_file, - "der", - "urn:oasis:names:tc:SAML:2.0:assertion:Assertion") - -def verify_signature(xmlsec_binary, enctext, cert_file, - cert_type="der", node_name=NODE_NAME): + +def verify_signature(enctext, xmlsec_binary, cert_file=None, cert_type="", + node_name=NODE_NAME, debug=False): """ Verifies the signature of a XML document. :param xmlsec_binary: The xmlsec1 binaries to be used @@ -160,6 +128,7 @@ def verify_signature(xmlsec_binary, enctext, cert_file, :param der_file: The public key that was used to sign the document :return: Boolean True if the signature was correct otherwise False. """ + _, fil = make_temp("%s" % enctext, decode=False) com_list = [xmlsec_binary, "--verify", @@ -167,7 +136,7 @@ def verify_signature(xmlsec_binary, enctext, cert_file, "--id-attr:%s" % ID_ATTR, node_name, fil] - if _TEST_: + if debug: try: print " ".join(com_list) except TypeError: @@ -182,7 +151,7 @@ def verify_signature(xmlsec_binary, enctext, cert_file, output = Popen(com_list, stderr=PIPE).communicate()[1] verified = _parse_xmlsec_output(output) - if _TEST_: + if debug: print output print os.stat(cert_file) print "Verify result: '%s'" % (verified,) @@ -191,176 +160,229 @@ def verify_signature(xmlsec_binary, enctext, cert_file, return verified -def correctly_signed_authn_request(decoded_xml, xmlsec_binary=XMLSEC_BINARY, - metadata=None, log=None, must=False): - """ Check if a request is correctly signed, if we have metadata for - the SP that sent the info use that, if not use the key that are in - the message if any. - - :param decode_xml: The SAML message as a XML string - :param xmlsec_binary: Where the xmlsec1 binary can be found on this - system. - :param metadata: Metadata information - :return: None if the signature can not be verified otherwise - request as a samlp.Request instance - """ - request = samlp.authn_request_from_string(decoded_xml) +# --------------------------------------------------------------------------- - if not request.signature: - if must: - raise SignatureError("Missing must signature") - else: - return request +def security_context(conf, log=None): + if not conf: + return None - issuer = request.issuer.text.strip() - - if metadata: - certs = metadata.certs(issuer) - else: - certs = [] + try: + debug = conf["debug"] + except KeyError: + debug = 0 - if not certs: - certs = [make_temp("%s" % cert, ".der") \ - for cert in cert_from_instance(request)] - if not certs: - raise SignatureError("Missing signing certificate") + return SecurityContext(conf.xmlsec(), conf["key_file"], "pem", + conf["cert_file"], "pem", conf["metadata"], + log=log, debug=debug) - verified = False - for _, der_file in certs: - if verify_signature(xmlsec_binary, decoded_xml, der_file): - verified = True - break - - if not verified: - raise SignatureError("Failed to verify signature") +class SecurityContext(object): + def __init__(self, xmlsec_binary, key_file="", key_type= "", cert_file="", + cert_type="", metadata=None, log=None, debug=False): + self.xmlsec = xmlsec_binary + self.key_file = key_file + self.cert_file = cert_file + self.cert_type = cert_type + self.metadata = metadata + self.log = log + self.debug = debug - return request + if self.debug and not self.log: + self.debug = 0 + + def correctly_signed(self, xml, must=False): + self.log and self.log.info("verify correct signature") + return self.correctly_signed_response(xml, must) + + def decrypt(self, enctext): + """ Decrypting an encrypted text by the use of a private key. + + :param enctext: The encrypted text as a string + :return: The decrypted text + """ + self.log and self.log.info("input len: %d" % len(enctext)) + _, fil = make_temp("%s" % enctext, decode=False) + ntf = NamedTemporaryFile() + + com_list = [self.xmlsec, "--decrypt", + "--privkey-pem", key_file, + "--output", ntf.name, + "--id-attr:%s" % ID_ATTR, + ENC_NODE_NAME, fil] + + if self.debug: + self.log.debug("Decrypt command: %s" % " ".join(com_list)) + + result = Popen(com_list, stderr=PIPE).communicate() + + if self.debug: + self.log.debug("Decrypt result: %s" % (result,)) + + ntf.seek(0) + return ntf.read() -def correctly_signed_response(decoded_xml, - xmlsec_binary=XMLSEC_BINARY, metadata=None, log=None, must=False): - """ Check if a instance is correctly signed, if we have metadata for - the IdP that sent the info use that, if not use the key that are in - the message if any. - :param decode_xml: The SAML message as a XML string - :param xmlsec_binary: Where the xmlsec1 binary can be found on this - system. - :param metadata: Metadata information - :return: None if the signature can not be verified otherwise an instance - """ - - response = samlp.response_from_string(decoded_xml) + + def verify_signature(self, enctext, cert_file=None, cert_type="pem", + node_name=NODE_NAME): + """ Verifies the signature of a XML document. + + :param enctext: The XML document as a string + :param der_file: The public key that was used to sign the document + :return: Boolean True if the signature was correct otherwise False. + """ + if not cert_file: + cert_file = self.cert_file + cert_type = self.cert_type + + return verify_signature(enctext, self.xmlsec, cert_file, cert_type, + node_name, True) + + def correctly_signed_authn_request(self, decoded_xml, must=False): + """ Check if a request is correctly signed, if we have metadata for + the SP that sent the info use that, if not use the key that are in + the message if any. + + :param decode_xml: The SAML message as a XML string + :param must: Whether there must be a signature + :return: None if the signature can not be verified otherwise + request as a samlp.Request instance + """ + request = samlp.authn_request_from_string(decoded_xml) - if not xmlsec_binary: - xmlsec_binary = XMLSEC_BINARY - - # Try to find the signing cert in the assertion - for assertion in response.assertion: - if not assertion.signature: - if _TEST_: - log and log.info("unsigned") + if not request.signature: if must: - raise SignatureError("Signature missing") - continue - else: - if _TEST_: - log and log.info("signed") - - issuer = assertion.issuer.text.strip() - if _TEST_: - print "issuer: %s" % issuer - if metadata: - certs = metadata.certs(issuer) + raise SignatureError("Missing must signature") + else: + return request + + issuer = request.issuer.text.strip() + + if self.metadata: + certs = self.metadata.certs(issuer) else: certs = [] - - if _TEST_: - print "metadata certs: %s" % certs - + if not certs: certs = [make_temp("%s" % cert, ".der") \ - for cert in cert_from_instance(assertion)] + for cert in cert_from_instance(request)] if not certs: - raise SignatureError("Missing certificate") + raise SignatureError("Missing signing certificate") verified = False for _, der_file in certs: - if verify_signature(xmlsec_binary, decoded_xml, der_file): + if verify_signature(self.xmlsec, decoded_xml, der_file): verified = True break if not verified: - raise SignatureError("Could not verify") + raise SignatureError("Failed to verify signature") - return response + return request -#---------------------------------------------------------------------------- -# SIGNATURE PART -#---------------------------------------------------------------------------- + def correctly_signed_response(self, decoded_xml, must=False): + """ Check if a instance is correctly signed, if we have metadata for + the IdP that sent the info use that, if not use the key that are in + the message if any. -def sign_statement_using_xmlsec(statement, xtype, xmlsec_binary, key=None, + :param decode_xml: The SAML message as a XML string + :param must: Whether there must be a signature + :return: None if the signature can not be verified otherwise an instance + """ + + response = samlp.response_from_string(decoded_xml) + + # Try to find the signing cert in the assertion + for assertion in response.assertion: + if not assertion.signature: + if self.debug: + self.log.debug("unsigned") + if must: + raise SignatureError("Signature missing") + continue + else: + if self.debug: + self.log.debug("signed") + + issuer = assertion.issuer.text.strip() + + if self.debug: + self.log.debug("issuer: %s" % issuer) + + if self.metadata: + certs = self.metadata.certs(issuer) + else: + certs = [] + + if self.debug: + self.log.debug("metadata certs: %s" % certs) + + if not certs: + certs = [make_temp("%s" % cert, ".der") \ + for cert in cert_from_instance(assertion)] + if not certs: + raise SignatureError("Missing certificate") + + verified = False + for _, der_file in certs: + if self.verify_signature(decoded_xml, der_file, "der"): + verified = True + break + + if not verified: + raise SignatureError("Could not verify") + + return response + + #---------------------------------------------------------------------------- + # SIGNATURE PART + #---------------------------------------------------------------------------- + + def sign_statement_using_xmlsec(self, statement, class_name, key=None, key_file=None): - """Sign a SAML statement using xmlsec. - - :param statement: The statement to be signed - :param key: The key to be used for the signing, either this or - :param key_File: The file where the key can be found - :param xmlsec_binary: The xmlsec1 binaries used to do the signing. - :return: The signed statement - """ + """Sign a SAML statement using xmlsec. - _, fil = make_temp("%s" % statement, decode=False) - - if key: - _, key_file = make_temp("%s" % key, ".pem") - ntf = NamedTemporaryFile() - - com_list = [xmlsec_binary, "--sign", - "--output", ntf.name, - "--privkey-pem", key_file, - "--id-attr:%s" % ID_ATTR, - xtype, - fil] - - #print " ".join(com_list) - - if Popen(com_list, stdout=PIPE).communicate()[0] == "": - ntf.seek(0) - return ntf.read() - else: - raise Exception("Signing failed") - -def sign_assertion_using_xmlsec(statement, xmlsec_binary, key=None, - key_file=None): - """Sign a SAML statement using xmlsec. - - :param statement: The statement to be signed - :param key: The key to be used for the signing, either this or - :param key_File: The file where the key can be found - :param xmlsec_binary: The xmlsec1 binaries used to do the signing. - :return: The signed statement - """ + :param statement: The statement to be signed + :param key: The key to be used for the signing, either this or + :param key_File: The file where the key can be found + :return: The signed statement + """ - _, fil = make_temp("%s" % statement, decode=False) + if not key and not key_file: + key_file = self.key_file + + _, fil = make_temp("%s" % statement, decode=False) - if key: - _, key_file = make_temp("%s" % key, ".pem") - ntf = NamedTemporaryFile() - - com_list = [xmlsec_binary, "--sign", - "--output", ntf.name, - "--privkey-pem", key_file, - "--id-attr:%s" % ID_ATTR, - "urn:oasis:names:tc:SAML:2.0:assertion:Assertion", - fil] + if key: + _, key_file = make_temp("%s" % key, ".pem") + + ntf = NamedTemporaryFile() + + com_list = [self.xmlsec, "--sign", + "--output", ntf.name, + "--privkey-pem", key_file, + "--id-attr:%s" % ID_ATTR, + class_name, + fil] - #print " ".join(com_list) + if Popen(com_list, stdout=PIPE).communicate()[0] == "": + ntf.seek(0) + return ntf.read() + else: + raise Exception("Signing failed") - if Popen(com_list, stdout=PIPE).communicate()[0] == "": - ntf.seek(0) - return ntf.read() - else: - raise Exception("Signing failed") + def sign_assertion_using_xmlsec(self, statement, key=None, key_file=None): + """Sign a SAML assertion using xmlsec. + + :param statement: The statement to be signed + :param key: The key to be used for the signing, either this or + :param key_File: The file where the key can be found + :return: The signed statement + """ + + return self.sign_statement_using_xmlsec( statement, + class_name(saml.Assertion()), key=None, key_file=None) + +# =========================================================================== PRE_SIGNATURE = { "signed_info": { diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py index 8e2512d..ed124d4 100644 --- a/tests/test_40_sigver.py +++ b/tests/test_40_sigver.py @@ -1,5 +1,7 @@ #!/usr/bin/env python +import os + from saml2 import sigver, make_instance from saml2 import utils from saml2 import time_util @@ -16,60 +18,71 @@ PRIV_KEY = "test.key" def _eq(l1,l2): return set(l1) == set(l2) -def test_verify_1(xmlsec): - xml_response = open(SIGNED).read() - response = sigver.correctly_signed_response(xml_response, xmlsec) - assert response - -def test_non_verify_1(xmlsec): - """ unsigned is OK if not good """ - xml_response = open(UNSIGNED).read() - response = sigver.correctly_signed_response(xml_response, xmlsec) - assert response - -def test_non_verify_2(xmlsec): - xml_response = open(FALSE_SIGNED).read() - raises(sigver.SignatureError,sigver.correctly_signed_response, - xml_response, xmlsec) - SIGNED_VALUE= """AS1kHHtA4eTOU2XLTWhLMSJQ6V+TSDymRoTF78CqjrYURNLk9wjdPjAReNn9eykv ryFiHNk0p9wMBknha5pH8aeCI/LmcVhLa5xteGZrtE/Udh5vv8z4kRQX51Uz/5x8 ToiobGw83MEW6A0dRUn0O20NBMMTaFZZPXye7RvVlHY=""" DIGEST_VALUE = "WFRXmImfoO3M6JOLE6BGGpU9Ud0=" -def test_sign(xmlsec): - ass = make_instance(saml.Assertion, { - "version": "2.0", - "id": "11111", - "issue_instant": "2009-10-30T13:20:28Z", - "signature": sigver.pre_signature_part("11111"), - "attribute_statement": { - "attribute": [{ - "friendly_name": "surName", - "attribute_value": "Foo", - }, - { - "friendly_name": "givenName", - "attribute_value": "Bar", - } - ] - } - }) +def get_xmlsec(): + for path in os.environ["PATH"].split(":"): + fil = os.path.join(path, "xmlsec1") + if os.access(fil,os.X_OK): + return fil + + raise Exception("Can't find xmlsec1") + +class TestSecurity(): + def setup_class(self): + self.sec = sigver.SecurityContext(get_xmlsec(), PRIV_KEY, "pem") + + def test_verify_1(self): + xml_response = open(SIGNED).read() + response = self.sec.correctly_signed_response(xml_response) + assert response + + def test_non_verify_1(self): + """ unsigned is OK if not good """ + xml_response = open(UNSIGNED).read() + response = self.sec.correctly_signed_response(xml_response) + assert response + + def test_non_verify_2(self): + xml_response = open(FALSE_SIGNED).read() + raises(sigver.SignatureError,self.sec.correctly_signed_response, + xml_response) + + def test_sign(self): + ass = make_instance(saml.Assertion, { + "version": "2.0", + "id": "11111", + "issue_instant": "2009-10-30T13:20:28Z", + "signature": sigver.pre_signature_part("11111"), + "attribute_statement": { + "attribute": [{ + "friendly_name": "surName", + "attribute_value": "Foo", + }, + { + "friendly_name": "givenName", + "attribute_value": "Bar", + } + ] + } + }) + + sign_ass = self.sec.sign_assertion_using_xmlsec("%s" % ass) + + sass = saml.assertion_from_string(sign_ass) + print sass + assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', + 'version', 'signature', 'id']) + assert sass.version == "2.0" + assert sass.id == "11111" + assert time_util.str_to_time(sass.issue_instant) + sig = sass.signature + assert sig.signature_value.text == SIGNED_VALUE + assert len(sig.signed_info.reference) == 1 + assert len(sig.signed_info.reference[0].digest_value) == 1 + assert sig.signed_info.reference[0].digest_value[0].text == DIGEST_VALUE - print ass - sign_ass = sigver.sign_assertion_using_xmlsec("%s" % ass, xmlsec, - key_file=PRIV_KEY) - sass = saml.assertion_from_string(sign_ass) - print sass - assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', - 'version', 'signature', 'id']) - assert sass.version == "2.0" - assert sass.id == "11111" - assert time_util.str_to_time(sass.issue_instant) - sig = sass.signature - assert sig.signature_value.text == SIGNED_VALUE - assert len(sig.signed_info.reference) == 1 - assert len(sig.signed_info.reference[0].digest_value) == 1 - assert sig.signed_info.reference[0].digest_value[0].text == DIGEST_VALUE - diff --git a/tests/test_50_server.py b/tests/test_50_server.py index 0238a42..8ebdbfe 100644 --- a/tests/test_50_server.py +++ b/tests/test_50_server.py @@ -157,13 +157,13 @@ class TestServer1(): print authn_request intermed = utils.deflate_and_base64_encode(authn_request) response = self.server.parse_authn_request(intermed) - + print response assert response["consumer_url"] == "http://localhost:8087/" assert response["id"] == "1" name_id_policy = response["request"].name_id_policy assert _eq(name_id_policy.keyswv(), ["format", "allow_create"]) assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT - assert response["sp_entityid"] == "urn:mace:example.com:saml:roland:sp" + assert response["sp_entity_id"] == "urn:mace:example.com:saml:roland:sp" def test_sso_response_with_identity(self): name_id = self.server.id.temporary_nameid() diff --git a/tests/test_51_client.py b/tests/test_51_client.py index 33c11f7..a296ed1 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -4,9 +4,8 @@ from saml2.client import Saml2Client from saml2 import samlp, client, BINDING_HTTP_POST from saml2 import saml, utils, config, class_name, make_instance -from saml2.sigver import correctly_signed_authn_request, verify_signature -from saml2.sigver import correctly_signed_response -from saml2.server import Server +#from saml2.sigver import correctly_signed_authn_request, verify_signature +#from saml2.server import Server XML_RESPONSE_FILE = "saml_signed.xml" XML_RESPONSE_FILE2 = "saml2_response.xml" @@ -48,17 +47,6 @@ REQ1 = """ class TestClient: def setup_class(self): - server = Server("idp.config") - name_id = server.id.temporary_nameid() - - self._resp_ = server.do_response( - "http://lingon.catalogix.se:8087/", # consumer_url - "12", # in_response_to - "urn:mace:example.com:saml:roland:sp", # sp_entity_id - {"eduPersonEntitlement":"Jeter"}, - name_id = name_id - ) - conf = config.Config() try: conf.load_file("tests/server.config") @@ -66,58 +54,6 @@ class TestClient: conf.load_file("server.config") self.client = Saml2Client({},conf) - def test_verify_1(self): - xml_response = ("%s" % (self._resp_,)).split("\n")[1] - outstanding = {"12": "http://localhost:8088/sso"} - session_info = self.client.verify_response(xml_response, - "urn:mace:example.com:saml:roland:sp", - outstanding=outstanding, - decode=False) - del session_info["name_id"] - del session_info["not_on_or_after"] - del session_info["ava"]["__userid"] - print session_info - assert session_info == {'session_id': '12', - 'came_from': 'http://localhost:8088/sso', - 'ava': {'eduPersonEntitlement': ['Jeter'] }, - 'issuer': 'urn:mace:example.com:saml:roland:idp'} - - def test_parse_1(self): - xml_response = ("%s" % (self._resp_,)).split("\n")[1] - response = correctly_signed_response(xml_response, - self.client.config["xmlsec_binary"]) - outstanding = {"12": "http://localhost:8088/sso"} - session_info = self.client.do_response(response, - "urn:mace:example.com:saml:roland:sp", - outstanding=outstanding) - assert session_info["ava"]["eduPersonEntitlement"] == ["Jeter"] - - def test_parse_2(self): - xml_response = open(XML_RESPONSE_FILE2).read() - response = samlp.response_from_string(xml_response) - outstanding = {"_ae0216740b5baa4b13c79ffdb2baa82572788fd9a3": - "http://localhost:8088/sso"} - session_info = self.client.do_response(response, - "xenosmilus.umdc.umu.se", - outstanding=outstanding, - lax=True) - - assert session_info["ava"] == {'uid': ['andreas'], - 'mobile': ['+4741107700'], - 'edupersonnickname': ['erlang'], - 'o': ['Feide RnD'], - 'edupersonentitlement': [ - 'urn:mace:feide.no:entitlement:test'], - 'edupersonaffiliation': ['employee'], - 'eduPersonPrincipalName': ['andreas@rnd.feide.no'], - 'sn': ['Solberg'], - 'mail': ['andreas@uninett.no'], - 'ou': ['Guests'], - 'cn': ['Andreas Solberg']} - assert session_info["name_id"] == "_242f88493449e639aab95dd9b92b1d04234ab84fd8" - assert session_info.keys() == ['came_from', 'name_id', 'ava', - 'not_on_or_after'] - def test_create_attribute_query1(self): req = self.client.create_attribute_query("1", "E8042FB4-4D5B-48C3-8E14-8EDD852790DD", @@ -294,8 +230,4 @@ class TestClient: self.client.config["xmlsec_binary"], self.client.config["metadata"]) except Exception: # missing certificate - verify_signature(self.client.config["xmlsec_binary"], - ar_str, - self.client.config["cert_file"], - cert_type="pem", - node_name=class_name(ar)) + self.client.sc.verify_signature(ar_str, node_name=class_name(ar))