From 73dfdc9603859dad9a3c7fe261ba53cc20a081c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20Ho=CC=88rberg?= Date: Wed, 18 Mar 2015 08:57:21 +0100 Subject: [PATCH] Added tests for encryption and signing of the authentication response. Added tests to decrypt authentication responses in the client. --- src/saml2/entity.py | 35 +-- src/saml2/response.py | 2 +- src/saml2/server.py | 8 +- tests/root_cert/localhost.ca.crt | 15 ++ tests/root_cert/localhost.ca.key | 15 ++ tests/test_50_server.py | 406 ++++++++++++++++++++++++++++++- tests/test_51_client.py | 84 ++++++- 7 files changed, 546 insertions(+), 19 deletions(-) create mode 100644 tests/root_cert/localhost.ca.crt create mode 100644 tests/root_cert/localhost.ca.key diff --git a/src/saml2/entity.py b/src/saml2/entity.py index 15174eb..26c2fe5 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -506,6 +506,13 @@ class Entity(HTTPBase): encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False, encrypt_cert=None, **kwargs): """ Create a Response. + Encryption: + encrypt_assertion must be true for encryption to be performed. If encrypted_advice_attributes also is + true, then will the function try to encrypt the assertion in the the advice element of the main + assertion. Only one assertion element is allowed in the advice element, if multiple assertions exists + in the advice element the main assertion will be encrypted instead, since it's no point to encrypt + If encrypted_advice_attributes is + false the main assertion will be encrypted. Since the same key :param in_response_to: The session identifier of the request :param consumer_url: The URL which should receive the response @@ -535,17 +542,15 @@ class Entity(HTTPBase): return signed_instance_factory(response, self.sec, to_sign) if encrypt_assertion: + node_xpath = None if sign: response.signature = pre_signature_part(response.id, self.sec.my_cert, 1) sign_class = [(class_name(response), response.id)] cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary) - xnode_path = None - if encrypted_advice_attributes and encrypt_assertion_self_contained and \ - response.assertion.advice is not None and len(response.assertion.advice.assertion) == 1: + if encrypted_advice_attributes and response.assertion.advice is not None \ + and len(response.assertion.advice.assertion) == 1: tmp_assertion = response.assertion.advice.assertion[0] - advice_tag = response.assertion.advice._to_element_tree().tag - assertion_tag = tmp_assertion._to_element_tree().tag response.assertion.advice.encrypted_assertion = [] response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) if isinstance(tmp_assertion, list): @@ -553,26 +558,28 @@ class Entity(HTTPBase): else: response.assertion.advice.encrypted_assertion[0].add_extension_element(tmp_assertion) response.assertion.advice.assertion = [] - response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( - assertion_tag, advice_tag) + if encrypt_assertion_self_contained: + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = tmp_assertion._to_element_tree().tag + response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in - ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]]) + ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]]) elif encrypt_assertion_self_contained: assertion_tag = response.assertion._to_element_tree().tag response = pre_encrypt_assertion(response) response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion( assertion_tag) + else: + response = pre_encrypt_assertion(response) + if to_sign: + response = signed_instance_factory(response, self.sec, to_sign) _, cert_file = make_temp("%s" % encrypt_cert, decode=False) response = cbxs.encrypt_assertion(response, cert_file, pre_encryption_part(), node_xpath=node_xpath) # template(response.assertion.id)) if sign: - if to_sign: - signed_instance_factory(response, self.sec, to_sign) - else: - # default is to sign the whole response if anything - return signed_instance_factory(response, self.sec, - sign_class) + return signed_instance_factory(response, self.sec, sign_class) else: return response diff --git a/src/saml2/response.py b/src/saml2/response.py index 6f36819..bddc032 100644 --- a/src/saml2/response.py +++ b/src/saml2/response.py @@ -646,7 +646,7 @@ class AuthnResponse(StatusResponse): assert len(self.assertion.attribute_statement) == 1 _attr_statem = self.assertion.attribute_statement[0] ava.update(self.read_attribute_statement(_attr_statem)) - if not ava == 1: + if not ava: logger.error("Missing Attribute Statement") return ava diff --git a/src/saml2/server.py b/src/saml2/server.py index 818e394..24cc392 100644 --- a/src/saml2/server.py +++ b/src/saml2/server.py @@ -373,11 +373,17 @@ class Server(Entity): name_id, policy, _issuer, authn_statement, identity, True, sign_response) + to_sign = [] if sign_assertion is not None and sign_assertion: + if assertion.advice and assertion.advice.assertion: + for tmp_assertion in assertion.advice.assertion: + tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1) + to_sign.append((class_name(tmp_assertion), tmp_assertion.id)) assertion.signature = pre_signature_part(assertion.id, self.sec.my_cert, 1) # Just the assertion or the response and the assertion ? - to_sign = [(class_name(assertion), assertion.id)] + to_sign.append((class_name(assertion), assertion.id)) + # Store which assertion that has been sent to which SP about which # subject. diff --git a/tests/root_cert/localhost.ca.crt b/tests/root_cert/localhost.ca.crt new file mode 100644 index 0000000..c7faff9 --- /dev/null +++ b/tests/root_cert/localhost.ca.crt @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICSTCCAbICAQEwDQYJKoZIhvcNAQELBQAwbTELMAkGA1UEBhMCc2UxCzAJBgNV +BAgTAmFjMQ0wCwYDVQQHEwR1bWVhMRwwGgYDVQQKExNJVFMgVW1lYSBVbml2ZXJz +aXR5MQ0wCwYDVQQLEwRESVJHMRUwEwYDVQQDEwxsb2NhbGhvc3QuY2EwHhcNMTQw +MzE0MDczNDIwWhcNMjQwMzExMDczNDIwWjBtMQswCQYDVQQGEwJzZTELMAkGA1UE +CBMCYWMxDTALBgNVBAcTBHVtZWExHDAaBgNVBAoTE0lUUyBVbWVhIFVuaXZlcnNp +dHkxDTALBgNVBAsTBERJUkcxFTATBgNVBAMTDGxvY2FsaG9zdC5jYTCBnzANBgkq +hkiG9w0BAQEFAAOBjQAwgYkCgYEAzJseOYg+hKGsnGWilv9FrfH2csQ9UZGwgwHz +zR2IquQg/+nONxd4MIwjOnQrLOJuhpu55ZTSpeT901GNDLj4xPQnFrWWyET8NxZg +w7Ilra55iQNaoUWpdi0JQSXI/9CY8t+Y170+7DfBJ6zo4y6+HKaOLNxZy4IbB0SE +aZBGsrUCAwEAATANBgkqhkiG9w0BAQsFAAOBgQAEOQJkD+5fb2mTtxwaZeRyQN9c +If0Xd2E7Z6BstGCUMWa/q4FrNee324kINVsFYg0GBTBwfyYPwR7I70LGjS3gXEzd +RjGx/Z8yvHJyavJj8iRCOflQ0fUlwasFYtrJesPOfM+aJ05Jb8GelUxpKh6BtPlE +IU0FLm//i9ucLQ9zBg== +-----END CERTIFICATE----- diff --git a/tests/root_cert/localhost.ca.key b/tests/root_cert/localhost.ca.key new file mode 100644 index 0000000..a588e7a --- /dev/null +++ b/tests/root_cert/localhost.ca.key @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQDMmx45iD6EoaycZaKW/0Wt8fZyxD1RkbCDAfPNHYiq5CD/6c43 +F3gwjCM6dCss4m6Gm7nllNKl5P3TUY0MuPjE9CcWtZbIRPw3FmDDsiWtrnmJA1qh +Ral2LQlBJcj/0Jjy35jXvT7sN8EnrOjjLr4cpo4s3FnLghsHRIRpkEaytQIDAQAB +AoGAMw6aUjz/bNVzX2u1UPzOhIOWvjjeHFbAt1BraEnwasSWv4W2oeTHZ0XxHIsU +oxS2A/0kPHgQwLkN5ge5rO0TlpAI5X9ZqlJ0SXF5zjJOBtyK6TWoUbwnyzS7lbFC +q9AVrHwMX9uNCboccqzjrzHyNE+4/QT7z2G5AMzjfq+5EwECQQDvOJuUl2pbUUIK +nMCmwkARFEZzZYV2oIBDsagTG8gX7glj5stoYXuez8EnYtNHRDBConyKqruuzqJk +qSKlha7hAkEA2vT4CpAzHSCknwQKXmFwBD5hVBrv+JZSur6XpqEdwXkX2osScAaW +xj3vQEQorJC2CryvUVOTeuFoog0f+6HiVQJBAMs0dMQ2ErxbPBQzr1p4K1/Wrzmb +BVINaKcYJEOHF+Nr6kIYbLTQCeiPZe4E/p/NBomz6MMJ4L/O+xcyrSGZe0ECQQCZ +ejELpnxNpH4AAKML+Ry9vMQYYjFnfGdNAx/l6vWikjEIPYeVAulY2D0GPUCNhXo1 +GIGDbiPodGwVe0G57oVpAkBjckA1LEE1Kzkq5sR9U9t9m+3WSBvMZxLUwJYdVmOY +Y6xKVtJfe9XuQt9tzoWQW8iieWyKOw7yLYCBcjns2iZn +-----END RSA PRIVATE KEY----- diff --git a/tests/test_50_server.py b/tests/test_50_server.py index af2290b..50479ee 100644 --- a/tests/test_50_server.py +++ b/tests/test_50_server.py @@ -1,9 +1,13 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import base64 +import os from contextlib import closing from urlparse import parse_qs -from saml2.sigver import pre_encryption_part +import uuid + +from saml2.cert import OpenSSLWrapper +from saml2.sigver import pre_encryption_part, make_temp from saml2.assertion import Policy from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT @@ -41,6 +45,40 @@ def _eq(l1, l2): return set(l1) == set(l2) +BASEDIR = os.path.abspath(os.path.dirname(__file__)) + + +def get_ava(assertion): + ava = {} + for statement in assertion.attribute_statement: + for attr in statement.attribute: + value = [] + for tmp_val in attr.attribute_value: + value.append(tmp_val.text) + key = attr.friendly_name + if key is None or len(key) == 0: + key = attr.text + ava[key] = value + return ava + + +def generate_cert(): + sn = uuid.uuid4().urn + cert_info = { + "cn": "localhost", + "country_code": "se", + "state": "ac", + "city": "Umea", + "organization": "ITS", + "organization_unit": "DIRG" + } + osw = OpenSSLWrapper() + ca_cert_str = osw.read_str_from_file("/Users/haho0032/Develop/root_cert/localhost.ca.crt") + ca_key_str = osw.read_str_from_file("/Users/haho0032/Develop/root_cert/localhost.ca.key") + req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True, sn=sn, key_length=2048) + cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str) + return cert_str, req_key_str + class TestServer1(): def setup_class(self): self.server = Server("idp_conf") @@ -372,6 +410,371 @@ class TestServer1(): # value. Just that there should be one assert assertion.signature.signature_value.text != "" + + def test_encrypted_signed_response_1(self): + name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id12") + ava = {"givenName": ["Derek"], "surName": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], "title": "The man"} + + cert_str, cert_key_str = generate_cert() + + signed_resp = self.server.create_authn_response( + ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=name_id, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypted_advice_attributes=True, + encrypt_cert=cert_str, + ) + + sresponse = response_from_string(signed_resp) + + #'urn:oasis:names:tc:SAML:2.0:protocol:AuthnRequest' + + valid = self.server.sec.verify_signature(signed_resp, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response', + node_id=sresponse.id, + id_attr="") + assert valid + + _, key_file = make_temp("%s" % cert_key_str, decode=False) + + decr_text = self.server.sec.decrypt(signed_resp, key_file) + + resp = samlp.response_from_string(decr_text) + + #Do not work since the response is changed after the signature is created. + valid = self.server.sec.verify_signature(decr_text, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', + node_id=resp.assertion[0].id, + id_attr="") + assert valid + + assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements + + assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements, + [saml, samlp]) + assert assertion + assert assertion[0].attribute_statement + + ava = ava = get_ava(assertion[0]) + + assert ava ==\ + {'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']} + + #Should work, but I suspect that xmlsec manipulates the xml to much while encrypting that the signature + #is no longer working. :( + + assert 'EncryptedAssertion>