From 2949fba0d23a265a07cccdcefbbf9f3abea95c02 Mon Sep 17 00:00:00 2001 From: Hans Date: Mon, 18 May 2015 21:26:39 +0200 Subject: [PATCH] Partial commit. Improved signing and added more testcases. --- src/saml2/entity.py | 12 +- src/saml2/server.py | 72 ++++++--- tests/test_50_server.py | 338 ++++++++++++++++++++++++++++++---------- 3 files changed, 315 insertions(+), 107 deletions(-) diff --git a/src/saml2/entity.py b/src/saml2/entity.py index c563789..9a92e84 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -512,8 +512,14 @@ class Entity(HTTPBase): exception = None for _cert in _certs: try: + begin_cert = "-----BEGIN CERTIFICATE-----\n" + end_cert = "\n-----END CERTIFICATE-----\n" + if begin_cert not in _cert: + _cert = "%s%s" % (begin_cert, _cert) + if end_cert not in _cert: + _cert = "%s%s" % (_cert, end_cert) _, cert_file = make_temp(_cert, decode=False) - response = cbxs.encrypt_assertion(response, self.sec.cert_file, + response = cbxs.encrypt_assertion(response, cert_file, pre_encryption_part(), node_xpath=node_xpath) return response except Exception as ex: @@ -525,7 +531,7 @@ class Entity(HTTPBase): def _response(self, in_response_to, consumer_url=None, status=None, issuer=None, sign=False, to_sign=None, sp_entity_id=None, encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False, - encrypt_cert=None, encrypt_cert_assertion=None,sign_assertion=None, **kwargs): + encrypt_cert_advice=None, encrypt_cert_assertion=None,sign_assertion=None, **kwargs): """ Create a Response. Encryption: encrypt_assertion must be true for encryption to be performed. If encrypted_advice_attributes also is @@ -598,7 +604,7 @@ class Entity(HTTPBase): if to_sign_advice: response = signed_instance_factory(response, self.sec, to_sign_advice) - response = self._encrypt_assertion(encrypt_cert, sp_entity_id, response, node_xpath=node_xpath) + response = self._encrypt_assertion(encrypt_cert_advice, sp_entity_id, response, node_xpath=node_xpath) if encrypt_assertion: response = response_from_string(response) if encrypt_assertion: diff --git a/src/saml2/server.py b/src/saml2/server.py index 934fa6c..e33d012 100644 --- a/src/saml2/server.py +++ b/src/saml2/server.py @@ -323,7 +323,7 @@ class Server(Entity): status=None, authn=None, issuer=None, policy=None, sign_assertion=False, sign_response=False, best_effort=False, encrypt_assertion=False, - encrypt_cert=None, authn_statement=None, + encrypt_cert_advice=None, encrypt_cert_assertion=None, authn_statement=None, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False): """ Create a response. A layer of indirection. @@ -375,16 +375,17 @@ class Server(Entity): sign_response) to_sign = [] - #if sign_assertion is not None and sign_assertion: - # if assertion.advice and assertion.advice.assertion: - # for tmp_assertion in assertion.advice.assertion: - # tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1) - # to_sign.append((class_name(tmp_assertion), tmp_assertion.id)) - # assertion.signature = pre_signature_part(assertion.id, - # self.sec.my_cert, 1) - # Just the assertion or the response and the assertion ? - # to_sign.append((class_name(assertion), assertion.id)) + if not encrypt_assertion: + if sign_assertion: + assertion.signature = pre_signature_part(assertion.id, self.sec.my_cert, 1) + to_sign.append((class_name(assertion), assertion.id)) + if not encrypted_advice_attributes: + if sign_assertion: + if assertion.advice and assertion.advice.assertion: + for tmp_assertion in assertion.advice.assertion: + tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1) + to_sign.append((class_name(tmp_assertion), tmp_assertion.id)) # Store which assertion that has been sent to which SP about which # subject. @@ -400,7 +401,8 @@ class Server(Entity): return self._response(in_response_to, consumer_url, status, issuer, sign_response, to_sign,sp_entity_id=sp_entity_id, encrypt_assertion=encrypt_assertion, - encrypt_cert=encrypt_cert, + encrypt_cert_advice=encrypt_cert_advice, + encrypt_cert_assertion=encrypt_cert_assertion, encrypt_assertion_self_contained=encrypt_assertion_self_contained, encrypted_advice_attributes=encrypted_advice_attributes,sign_assertion=sign_assertion, **args) @@ -477,8 +479,8 @@ class Server(Entity): sp_entity_id, name_id_policy=None, userid=None, name_id=None, authn=None, issuer=None, sign_response=None, sign_assertion=None, - encrypt_cert=None, encrypt_assertion=None, - encrypt_assertion_self_contained=False, + encrypt_cert_advice=None, encrypt_cert_assertion=None, encrypt_assertion=None, + encrypt_assertion_self_contained=True, encrypted_advice_attributes=False, **kwargs): """ Constructs an AuthenticationResponse @@ -523,17 +525,35 @@ class Server(Entity): if encrypt_assertion is None: encrypt_assertion = False + + if encrypt_assertion_self_contained is None: + encrypt_assertion_self_contained = self.config.getattr("encrypt_assertion_self_contained", "idp") + if encrypt_assertion_self_contained is None: + encrypt_assertion_self_contained = True + + if encrypted_advice_attributes is None: + encrypted_advice_attributes = self.config.getattr("encrypted_advice_attributes", "idp") + if encrypted_advice_attributes is None: + encrypted_advice_attributes = False + + if encrypted_advice_attributes: + verify_encrypt_cert = self.config.getattr("verify_encrypt_cert_advice", "idp") + if verify_encrypt_cert is not None: + if encrypt_cert_advice is None: + raise CertificateError("No SPCertEncType certificate for encryption contained in authentication " + "request.") + if not verify_encrypt_cert(encrypt_cert_advice): + raise CertificateError("Invalid certificate for encryption!") + + if encrypt_assertion: - if encrypt_cert is not None: - verify_encrypt_cert = self.config.getattr("verify_encrypt_cert", "idp") - if verify_encrypt_cert is not None: - if not verify_encrypt_cert(encrypt_cert): - raise CertificateError("Invalid certificate for encryption!") - else: - raise CertificateError("No SPCertEncType certificate for encryption contained in authentication " - "request.") - else: - encrypt_assertion = False + verify_encrypt_cert = self.config.getattr("verify_encrypt_cert_assertion", "idp") + if verify_encrypt_cert is not None: + if encrypt_cert_assertion is None: + raise CertificateError("No SPCertEncType certificate for encryption contained in authentication " + "request.") + if not verify_encrypt_cert(encrypt_cert_assertion): + raise CertificateError("Invalid certificate for encryption!") if not name_id: try: @@ -593,7 +613,8 @@ class Server(Entity): encrypt_assertion=encrypt_assertion, encrypt_assertion_self_contained=encrypt_assertion_self_contained, encrypted_advice_attributes=encrypted_advice_attributes, - encrypt_cert=encrypt_cert) + encrypt_cert_advice=encrypt_cert_advice, + encrypt_cert_assertion=encrypt_cert_assertion) return self._authn_response(in_response_to, # in_response_to destination, # consumer_url sp_entity_id, # sp_entity_id @@ -608,7 +629,8 @@ class Server(Entity): encrypt_assertion=encrypt_assertion, encrypt_assertion_self_contained=encrypt_assertion_self_contained, encrypted_advice_attributes=encrypted_advice_attributes, - encrypt_cert=encrypt_cert) + encrypt_cert_advice=encrypt_cert_advice, + encrypt_cert_assertion=encrypt_cert_assertion) except MissingValue as exc: return self.create_error_response(in_response_to, destination, diff --git a/tests/test_50_server.py b/tests/test_50_server.py index ba19e9b..f294cdb 100644 --- a/tests/test_50_server.py +++ b/tests/test_50_server.py @@ -90,10 +90,41 @@ class TestServer1(): conf = config.SPConfig() conf.load_file("server_conf") self.client = client.Saml2Client(conf) + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id12") + self.ava = {"givenName": ["Derek"], "surName": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], "title": "The man"} def teardown_class(self): self.server.close() + def verify_assertion(self, assertion): + assert assertion + assert assertion[0].attribute_statement + + ava = ava = get_ava(assertion[0]) + + assert ava ==\ + {'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']} + + + def verify_encrypted_assertion(self, assertion, decr_text): + self.verify_assertion(assertion) + assert assertion[0].signature is None + + assert 'EncryptedAssertion>