From e70835bb25b59ffdfd1fd3dfe392de2df8e69941 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20Ho=CC=88rberg?= Date: Wed, 20 May 2015 11:21:09 +0200 Subject: [PATCH] Partial commit for decrpyting and verifying signatures at the client. All tests works. --- src/saml2/client_base.py | 4 +- src/saml2/entity.py | 4 +- src/saml2/response.py | 60 ++++++++++------- src/saml2/sigver.py | 12 +++- tests/test_51_client.py | 140 ++++++++++++++++++++++++++++++++++++++- 5 files changed, 187 insertions(+), 33 deletions(-) diff --git a/src/saml2/client_base.py b/src/saml2/client_base.py index 009d438..3f7a227 100644 --- a/src/saml2/client_base.py +++ b/src/saml2/client_base.py @@ -542,7 +542,7 @@ class Base(Entity): # ======== response handling =========== def parse_authn_request_response(self, xmlstr, binding, outstanding=None, - outstanding_certs=None, decrypt=True): + outstanding_certs=None, decrypt=True, pefim=False): """ Deal with an AuthnResponse :param xmlstr: The reply as a xml string @@ -578,7 +578,7 @@ class Base(Entity): try: resp = self._parse_response(xmlstr, AuthnResponse, "assertion_consumer_service", - binding, **kwargs) + binding, pefim=pefim, **kwargs) except StatusError as err: logger.error("SAML status error: %s" % err) raise diff --git a/src/saml2/entity.py b/src/saml2/entity.py index a7a51fe..e7a75a5 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -978,7 +978,7 @@ class Entity(HTTPBase): # ------------------------------------------------------------------------ def _parse_response(self, xmlstr, response_cls, service, binding, - outstanding_certs=None, **kwargs): + outstanding_certs=None, pefim=False, **kwargs): """ Deal with a Response :param xmlstr: The response as a xml string @@ -1056,7 +1056,7 @@ class Entity(HTTPBase): decrypt = True if "decrypt" in kwargs: decrypt = kwargs["decrypt"] - response = response.verify(key_file, decrypt=decrypt) + response = response.verify(key_file, decrypt=decrypt, pefim=pefim) if not response: return None diff --git a/src/saml2/response.py b/src/saml2/response.py index b262732..d46a0b0 100644 --- a/src/saml2/response.py +++ b/src/saml2/response.py @@ -395,7 +395,7 @@ class StatusResponse(object): def loads(self, xmldata, decode=True, origxml=None): return self._loads(xmldata, decode, origxml) - def verify(self, key_file="", decrypt=True): + def verify(self, key_file="", decrypt=True, pefim=False): try: return self._verify() except AssertionError: @@ -780,10 +780,9 @@ class AuthnResponse(StatusResponse): logger.debug("--- Getting Identity ---") - if self.context == "AuthnReq" or self.context == "AttrQuery": - self.ava = self.get_identity() - - logger.debug("--- AVA: %s" % (self.ava,)) + #if self.context == "AuthnReq" or self.context == "AttrQuery": + # self.ava = self.get_identity() + # logger.debug("--- AVA: %s" % (self.ava,)) try: self.get_subject() @@ -808,13 +807,12 @@ class AuthnResponse(StatusResponse): if not self.sec.check_signature( assertion, origdoc=decr_txt, node_name=class_name(assertion), issuer=issuer): - logger.error( - "Failed to verify signature on '%s'" % assertion) + logger.error("Failed to verify signature on '%s'" % assertion) raise SignatureError() res.append(assertion) return res - def parse_assertion(self, key_file="", decrypt=True): + def parse_assertion(self, key_file="", decrypt=True, pefim=False): if self.context == "AuthnQuery": # can contain one or more assertions pass @@ -825,20 +823,28 @@ class AuthnResponse(StatusResponse): except AssertionError: raise Exception("No assertion part") - res = [] has_encrypted_assertions = self.response.encrypted_assertion if not has_encrypted_assertions and self.response.assertion: for tmp_assertion in self.response.assertion: if tmp_assertion.advice: - if tmp_assertion.advice.encrypted_assertion: + if tmp_assertion.advice.encrypted_assertion: has_encrypted_assertions = True break + if self.response.assertion: + logger.debug("***Unencrypted assertion***") + for assertion in self.response.assertion: + if not self._assertion(assertion, False): + return False + if has_encrypted_assertions and decrypt: + _enc_assertions = [] logger.debug("***Encrypted assertion/-s***") decr_text = self.sec.decrypt(self.xmlstr, key_file) resp = samlp.response_from_string(decr_text) - res = self.decrypt_assertions(resp.encrypted_assertion, decr_text) + _enc_assertions = self.decrypt_assertions(resp.encrypted_assertion, decr_text) + decr_text = self.sec.decrypt(decr_text, key_file) + resp = samlp.response_from_string(decr_text) if resp.assertion: for tmp_ass in resp.assertion: if tmp_ass.advice and tmp_ass.advice.encrypted_assertion: @@ -849,26 +855,32 @@ class AuthnResponse(StatusResponse): tmp_ass.advice.assertion.extend(advice_res) else: tmp_ass.advice.assertion = advice_res + if not pefim: + _enc_assertions.extend(advice_res) tmp_ass.advice.encrypted_assertion = [] - self.response.assertion = resp.assertion - if self.response.assertion: - self.response.assertion.extend(res) - else: - self.response.assertion = res + self.response.assertion = resp.assertion + for assertion in _enc_assertions: + if not self._assertion(assertion, True): + return False self.xmlstr = decr_text self.response.encrypted_assertion = [] if self.response.assertion: - logger.debug("***Unencrypted assertion***") for assertion in self.response.assertion: - if not self._assertion(assertion, assertion in res): - return False - else: - self.assertions.append(assertion) + if assertion.advice and assertion.advice.assertion: + for advice_assertion in assertion.advice.assertion: + self.assertions.append(assertion) + + if self.assertions and len(self.assertions) > 0: self.assertion = self.assertions[0] + + if self.context == "AuthnReq" or self.context == "AttrQuery": + self.ava = self.get_identity() + logger.debug("--- AVA: %s" % (self.ava,)) + return True - def verify(self, key_file="", decrypt=True): + def verify(self, key_file="", decrypt=True, pefim=False): """ Verify that the assertion is syntactically correct and the signature is correct if present. :param key_file: If not the default key file should be used this is it. @@ -886,7 +898,7 @@ class AuthnResponse(StatusResponse): if not isinstance(self.response, samlp.Response): return self - if self.parse_assertion(key_file, decrypt=decrypt): + if self.parse_assertion(key_file, decrypt=decrypt, pefim=pefim): return self else: logger.error("Could not parse the assertion") @@ -1114,7 +1126,7 @@ class AssertionIDResponse(object): return self._postamble() - def verify(self, key_file="", decrypt=True): + def verify(self, key_file="", decrypt=True, pefim=False): try: valid_instance(self.response) except NotValid as exc: diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 40f0a5d..096134e 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -1306,9 +1306,17 @@ class SecurityContext(object): :param enctext: The encrypted text as a string :return: The decrypted text """ + _enctext = self.crypto.decrypt(enctext, self.key_file) + if _enctext is not None and len(_enctext) > 0: + return _enctext if key_file is not None and len(key_file.strip()) > 0: - return self.crypto.decrypt(enctext, key_file) - return self.crypto.decrypt(enctext, self.key_file) + _enctext = self.crypto.decrypt(enctext, key_file) + if _enctext is not None and len(_enctext) > 0: + return _enctext + _enctext = self.crypto.decrypt(enctext, self.key_file) + if _enctext is not None and len(_enctext) > 0: + return _enctext + return enctext def verify_signature(self, signedtext, cert_file=None, cert_type="pem", node_name=NODE_NAME, node_id=None, id_attr=""): diff --git a/tests/test_51_client.py b/tests/test_51_client.py index 576171c..7affc3e 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -2,9 +2,11 @@ # -*- coding: utf-8 -*- import base64 +import uuid import six import urllib import urlparse +from saml2.cert import OpenSSLWrapper from saml2.xmldsig import SIG_RSA_SHA256 from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_REDIRECT @@ -25,7 +27,7 @@ from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion, Advice from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.saml import NameID from saml2.server import Server -from saml2.sigver import pre_encryption_part +from saml2.sigver import pre_encryption_part, make_temp from saml2.sigver import rm_xmltag from saml2.sigver import verify_redirect_signature from saml2.s_utils import do_attribute_statement @@ -42,6 +44,28 @@ AUTHN = { } +def generate_cert(): + sn = uuid.uuid4().urn + cert_info = { + "cn": "localhost", + "country_code": "se", + "state": "ac", + "city": "Umea", + "organization": "ITS", + "organization_unit": "DIRG" + } + osw = OpenSSLWrapper() + ca_cert_str = osw.read_str_from_file( + full_path("root_cert/localhost.ca.crt")) + ca_key_str = osw.read_str_from_file( + full_path("root_cert/localhost.ca.key")) + req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True, + sn=sn, key_length=2048) + cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, + req_cert_str) + return cert_str, req_key_str + + def add_subelement(xmldoc, node_name, subelem): s = xmldoc.find(node_name) if s > 0: @@ -292,7 +316,7 @@ class TestClient: except Exception: # missing certificate self.client.sec.verify_signature(ar_str, node_name=class_name(ar)) - def test_response(self): + def test_response_1(self): IDP = "urn:mace:example.com:saml:roland:idp" ava = {"givenName": ["Derek"], "surName": ["Jeter"], @@ -368,6 +392,116 @@ class TestClient: print(issuers) assert issuers == [[IDP], [IDP]] + def test_response_2(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + cert_str, cert_key_str = generate_cert() + + cert =\ + { + "cert": cert_str, + "key": cert_key_str + } + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + #name_id_policy=nameid_policy, + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=False, + encrypt_assertion_self_contained=True, + #encrypted_advice_attributes=True, + pefim=True, + encrypt_cert_advice=cert_str + ) + + resp_str = "%s" % resp + + resp_str = base64.encodestring(resp_str) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, {"id1": cert}, pefim=True) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_3(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + #name_id_policy=nameid_policy, + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=False, + encrypt_assertion_self_contained=True, + #encrypted_advice_attributes=True, + pefim=True, + ) + + resp_str = "%s" % resp + + resp_str = base64.encodestring(resp_str) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, pefim=True) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def setup_verify_authn_response(self): + idp = "urn:mace:example.com:saml:roland:idp" + ava = {"givenName": ["Derek"], "surName": ["Jeter"], "mail": ["derek@nyy.mlb.com"], "title": ["The man"]} + ava_verify = {'mail': ['derek@nyy.mlb.com'], 'givenName': ['Derek'], 'sn': ['Jeter'], 'title': ["The man"]} + nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) + return idp, ava, ava_verify, nameid_policy + + + def verify_authn_response(self, idp, authn_response, _client, ava_verify): + assert authn_response is not None + assert authn_response.issuer() == idp + assert authn_response.response.assertion[0].issuer.text == idp + session_info = authn_response.session_info() + + assert session_info["ava"] == ava_verify + assert session_info["issuer"] == idp + assert session_info["came_from"] == "http://foo.example.com/service" + response = samlp.response_from_string(authn_response.xmlstr) + assert response.destination == "http://lingon.catalogix.se:8087/" + + # One person in the cache + assert len(_client.users.subjects()) == 1 + subject_id = _client.users.subjects()[0] + # The information I have about the subject comes from one source + assert _client.users.issuers_of_info(subject_id) == [idp] + + def test_init_values(self): entityid = self.client.config.entityid print(entityid) @@ -764,4 +898,4 @@ class TestClientWithDummy(): if __name__ == "__main__": tc = TestClient() tc.setup_class() - tc.test_sign_then_encrypt_assertion_advice() + tc.test_response_3()