Added tests for encryption and signing of the authentication response.
Added tests to decrypt authentication responses in the client.
This commit is contained in:
parent
c4c5c224d4
commit
73dfdc9603
|
@ -506,6 +506,13 @@ class Entity(HTTPBase):
|
|||
encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False,
|
||||
encrypt_cert=None, **kwargs):
|
||||
""" Create a Response.
|
||||
Encryption:
|
||||
encrypt_assertion must be true for encryption to be performed. If encrypted_advice_attributes also is
|
||||
true, then will the function try to encrypt the assertion in the the advice element of the main
|
||||
assertion. Only one assertion element is allowed in the advice element, if multiple assertions exists
|
||||
in the advice element the main assertion will be encrypted instead, since it's no point to encrypt
|
||||
If encrypted_advice_attributes is
|
||||
false the main assertion will be encrypted. Since the same key
|
||||
|
||||
:param in_response_to: The session identifier of the request
|
||||
:param consumer_url: The URL which should receive the response
|
||||
|
@ -535,17 +542,15 @@ class Entity(HTTPBase):
|
|||
return signed_instance_factory(response, self.sec, to_sign)
|
||||
|
||||
if encrypt_assertion:
|
||||
node_xpath = None
|
||||
if sign:
|
||||
response.signature = pre_signature_part(response.id,
|
||||
self.sec.my_cert, 1)
|
||||
sign_class = [(class_name(response), response.id)]
|
||||
cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
|
||||
xnode_path = None
|
||||
if encrypted_advice_attributes and encrypt_assertion_self_contained and \
|
||||
response.assertion.advice is not None and len(response.assertion.advice.assertion) == 1:
|
||||
if encrypted_advice_attributes and response.assertion.advice is not None \
|
||||
and len(response.assertion.advice.assertion) == 1:
|
||||
tmp_assertion = response.assertion.advice.assertion[0]
|
||||
advice_tag = response.assertion.advice._to_element_tree().tag
|
||||
assertion_tag = tmp_assertion._to_element_tree().tag
|
||||
response.assertion.advice.encrypted_assertion = []
|
||||
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
||||
if isinstance(tmp_assertion, list):
|
||||
|
@ -553,26 +558,28 @@ class Entity(HTTPBase):
|
|||
else:
|
||||
response.assertion.advice.encrypted_assertion[0].add_extension_element(tmp_assertion)
|
||||
response.assertion.advice.assertion = []
|
||||
response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
|
||||
assertion_tag, advice_tag)
|
||||
if encrypt_assertion_self_contained:
|
||||
advice_tag = response.assertion.advice._to_element_tree().tag
|
||||
assertion_tag = tmp_assertion._to_element_tree().tag
|
||||
response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
|
||||
assertion_tag, advice_tag)
|
||||
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
|
||||
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
|
||||
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
|
||||
elif encrypt_assertion_self_contained:
|
||||
assertion_tag = response.assertion._to_element_tree().tag
|
||||
response = pre_encrypt_assertion(response)
|
||||
response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
|
||||
assertion_tag)
|
||||
else:
|
||||
response = pre_encrypt_assertion(response)
|
||||
if to_sign:
|
||||
response = signed_instance_factory(response, self.sec, to_sign)
|
||||
_, cert_file = make_temp("%s" % encrypt_cert, decode=False)
|
||||
response = cbxs.encrypt_assertion(response, cert_file,
|
||||
pre_encryption_part(), node_xpath=node_xpath)
|
||||
# template(response.assertion.id))
|
||||
if sign:
|
||||
if to_sign:
|
||||
signed_instance_factory(response, self.sec, to_sign)
|
||||
else:
|
||||
# default is to sign the whole response if anything
|
||||
return signed_instance_factory(response, self.sec,
|
||||
sign_class)
|
||||
return signed_instance_factory(response, self.sec, sign_class)
|
||||
else:
|
||||
return response
|
||||
|
||||
|
|
|
@ -646,7 +646,7 @@ class AuthnResponse(StatusResponse):
|
|||
assert len(self.assertion.attribute_statement) == 1
|
||||
_attr_statem = self.assertion.attribute_statement[0]
|
||||
ava.update(self.read_attribute_statement(_attr_statem))
|
||||
if not ava == 1:
|
||||
if not ava:
|
||||
logger.error("Missing Attribute Statement")
|
||||
return ava
|
||||
|
||||
|
|
|
@ -373,11 +373,17 @@ class Server(Entity):
|
|||
name_id, policy, _issuer, authn_statement, identity, True,
|
||||
sign_response)
|
||||
|
||||
to_sign = []
|
||||
if sign_assertion is not None and sign_assertion:
|
||||
if assertion.advice and assertion.advice.assertion:
|
||||
for tmp_assertion in assertion.advice.assertion:
|
||||
tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1)
|
||||
to_sign.append((class_name(tmp_assertion), tmp_assertion.id))
|
||||
assertion.signature = pre_signature_part(assertion.id,
|
||||
self.sec.my_cert, 1)
|
||||
# Just the assertion or the response and the assertion ?
|
||||
to_sign = [(class_name(assertion), assertion.id)]
|
||||
to_sign.append((class_name(assertion), assertion.id))
|
||||
|
||||
|
||||
# Store which assertion that has been sent to which SP about which
|
||||
# subject.
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICSTCCAbICAQEwDQYJKoZIhvcNAQELBQAwbTELMAkGA1UEBhMCc2UxCzAJBgNV
|
||||
BAgTAmFjMQ0wCwYDVQQHEwR1bWVhMRwwGgYDVQQKExNJVFMgVW1lYSBVbml2ZXJz
|
||||
aXR5MQ0wCwYDVQQLEwRESVJHMRUwEwYDVQQDEwxsb2NhbGhvc3QuY2EwHhcNMTQw
|
||||
MzE0MDczNDIwWhcNMjQwMzExMDczNDIwWjBtMQswCQYDVQQGEwJzZTELMAkGA1UE
|
||||
CBMCYWMxDTALBgNVBAcTBHVtZWExHDAaBgNVBAoTE0lUUyBVbWVhIFVuaXZlcnNp
|
||||
dHkxDTALBgNVBAsTBERJUkcxFTATBgNVBAMTDGxvY2FsaG9zdC5jYTCBnzANBgkq
|
||||
hkiG9w0BAQEFAAOBjQAwgYkCgYEAzJseOYg+hKGsnGWilv9FrfH2csQ9UZGwgwHz
|
||||
zR2IquQg/+nONxd4MIwjOnQrLOJuhpu55ZTSpeT901GNDLj4xPQnFrWWyET8NxZg
|
||||
w7Ilra55iQNaoUWpdi0JQSXI/9CY8t+Y170+7DfBJ6zo4y6+HKaOLNxZy4IbB0SE
|
||||
aZBGsrUCAwEAATANBgkqhkiG9w0BAQsFAAOBgQAEOQJkD+5fb2mTtxwaZeRyQN9c
|
||||
If0Xd2E7Z6BstGCUMWa/q4FrNee324kINVsFYg0GBTBwfyYPwR7I70LGjS3gXEzd
|
||||
RjGx/Z8yvHJyavJj8iRCOflQ0fUlwasFYtrJesPOfM+aJ05Jb8GelUxpKh6BtPlE
|
||||
IU0FLm//i9ucLQ9zBg==
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,15 @@
|
|||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIICXQIBAAKBgQDMmx45iD6EoaycZaKW/0Wt8fZyxD1RkbCDAfPNHYiq5CD/6c43
|
||||
F3gwjCM6dCss4m6Gm7nllNKl5P3TUY0MuPjE9CcWtZbIRPw3FmDDsiWtrnmJA1qh
|
||||
Ral2LQlBJcj/0Jjy35jXvT7sN8EnrOjjLr4cpo4s3FnLghsHRIRpkEaytQIDAQAB
|
||||
AoGAMw6aUjz/bNVzX2u1UPzOhIOWvjjeHFbAt1BraEnwasSWv4W2oeTHZ0XxHIsU
|
||||
oxS2A/0kPHgQwLkN5ge5rO0TlpAI5X9ZqlJ0SXF5zjJOBtyK6TWoUbwnyzS7lbFC
|
||||
q9AVrHwMX9uNCboccqzjrzHyNE+4/QT7z2G5AMzjfq+5EwECQQDvOJuUl2pbUUIK
|
||||
nMCmwkARFEZzZYV2oIBDsagTG8gX7glj5stoYXuez8EnYtNHRDBConyKqruuzqJk
|
||||
qSKlha7hAkEA2vT4CpAzHSCknwQKXmFwBD5hVBrv+JZSur6XpqEdwXkX2osScAaW
|
||||
xj3vQEQorJC2CryvUVOTeuFoog0f+6HiVQJBAMs0dMQ2ErxbPBQzr1p4K1/Wrzmb
|
||||
BVINaKcYJEOHF+Nr6kIYbLTQCeiPZe4E/p/NBomz6MMJ4L/O+xcyrSGZe0ECQQCZ
|
||||
ejELpnxNpH4AAKML+Ry9vMQYYjFnfGdNAx/l6vWikjEIPYeVAulY2D0GPUCNhXo1
|
||||
GIGDbiPodGwVe0G57oVpAkBjckA1LEE1Kzkq5sR9U9t9m+3WSBvMZxLUwJYdVmOY
|
||||
Y6xKVtJfe9XuQt9tzoWQW8iieWyKOw7yLYCBcjns2iZn
|
||||
-----END RSA PRIVATE KEY-----
|
|
@ -1,9 +1,13 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
import base64
|
||||
import os
|
||||
from contextlib import closing
|
||||
from urlparse import parse_qs
|
||||
from saml2.sigver import pre_encryption_part
|
||||
import uuid
|
||||
|
||||
from saml2.cert import OpenSSLWrapper
|
||||
from saml2.sigver import pre_encryption_part, make_temp
|
||||
from saml2.assertion import Policy
|
||||
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
|
||||
from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT
|
||||
|
@ -41,6 +45,40 @@ def _eq(l1, l2):
|
|||
return set(l1) == set(l2)
|
||||
|
||||
|
||||
BASEDIR = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
||||
def get_ava(assertion):
|
||||
ava = {}
|
||||
for statement in assertion.attribute_statement:
|
||||
for attr in statement.attribute:
|
||||
value = []
|
||||
for tmp_val in attr.attribute_value:
|
||||
value.append(tmp_val.text)
|
||||
key = attr.friendly_name
|
||||
if key is None or len(key) == 0:
|
||||
key = attr.text
|
||||
ava[key] = value
|
||||
return ava
|
||||
|
||||
|
||||
def generate_cert():
|
||||
sn = uuid.uuid4().urn
|
||||
cert_info = {
|
||||
"cn": "localhost",
|
||||
"country_code": "se",
|
||||
"state": "ac",
|
||||
"city": "Umea",
|
||||
"organization": "ITS",
|
||||
"organization_unit": "DIRG"
|
||||
}
|
||||
osw = OpenSSLWrapper()
|
||||
ca_cert_str = osw.read_str_from_file("/Users/haho0032/Develop/root_cert/localhost.ca.crt")
|
||||
ca_key_str = osw.read_str_from_file("/Users/haho0032/Develop/root_cert/localhost.ca.key")
|
||||
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True, sn=sn, key_length=2048)
|
||||
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
|
||||
return cert_str, req_key_str
|
||||
|
||||
class TestServer1():
|
||||
def setup_class(self):
|
||||
self.server = Server("idp_conf")
|
||||
|
@ -372,6 +410,371 @@ class TestServer1():
|
|||
# value. Just that there should be one
|
||||
assert assertion.signature.signature_value.text != ""
|
||||
|
||||
|
||||
def test_encrypted_signed_response_1(self):
|
||||
name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id12")
|
||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
||||
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
||||
|
||||
cert_str, cert_key_str = generate_cert()
|
||||
|
||||
signed_resp = self.server.create_authn_response(
|
||||
ava,
|
||||
"id12", # in_response_to
|
||||
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||
name_id=name_id,
|
||||
sign_response=True,
|
||||
sign_assertion=True,
|
||||
encrypt_assertion=True,
|
||||
encrypt_assertion_self_contained=True,
|
||||
encrypted_advice_attributes=True,
|
||||
encrypt_cert=cert_str,
|
||||
)
|
||||
|
||||
sresponse = response_from_string(signed_resp)
|
||||
|
||||
#'urn:oasis:names:tc:SAML:2.0:protocol:AuthnRequest'
|
||||
|
||||
valid = self.server.sec.verify_signature(signed_resp,
|
||||
self.server.config.cert_file,
|
||||
node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
|
||||
node_id=sresponse.id,
|
||||
id_attr="")
|
||||
assert valid
|
||||
|
||||
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
||||
|
||||
decr_text = self.server.sec.decrypt(signed_resp, key_file)
|
||||
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
|
||||
#Do not work since the response is changed after the signature is created.
|
||||
valid = self.server.sec.verify_signature(decr_text,
|
||||
self.server.config.cert_file,
|
||||
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
||||
node_id=resp.assertion[0].id,
|
||||
id_attr="")
|
||||
assert valid
|
||||
|
||||
assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
|
||||
|
||||
assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
|
||||
[saml, samlp])
|
||||
assert assertion
|
||||
assert assertion[0].attribute_statement
|
||||
|
||||
ava = ava = get_ava(assertion[0])
|
||||
|
||||
assert ava ==\
|
||||
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
||||
|
||||
#Should work, but I suspect that xmlsec manipulates the xml to much while encrypting that the signature
|
||||
#is no longer working. :(
|
||||
|
||||
assert 'EncryptedAssertion><encas2:Assertion xmlns:encas0="http://www.w3.org/2000/09/xmldsig#" ' \
|
||||
'xmlns:encas1="http://www.w3.org/2001/XMLSchema-instance" ' \
|
||||
'xmlns:encas2="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
|
||||
|
||||
valid = self.server.sec.verify_signature(decr_text,
|
||||
self.server.config.cert_file,
|
||||
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
||||
node_id=assertion[0].id,
|
||||
id_attr="")
|
||||
assert valid
|
||||
|
||||
def test_encrypted_signed_response_2(self):
|
||||
name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id12")
|
||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
||||
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
||||
|
||||
cert_str, cert_key_str = generate_cert()
|
||||
|
||||
signed_resp = self.server.create_authn_response(
|
||||
ava,
|
||||
"id12", # in_response_to
|
||||
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||
name_id=name_id,
|
||||
sign_response=True,
|
||||
sign_assertion=True,
|
||||
encrypt_assertion=True,
|
||||
encrypt_assertion_self_contained=True,
|
||||
encrypt_cert=cert_str,
|
||||
)
|
||||
|
||||
sresponse = response_from_string(signed_resp)
|
||||
|
||||
#'urn:oasis:names:tc:SAML:2.0:protocol:AuthnRequest'
|
||||
|
||||
valid = self.server.sec.verify_signature(signed_resp,
|
||||
self.server.config.cert_file,
|
||||
node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
|
||||
node_id=sresponse.id,
|
||||
id_attr="")
|
||||
assert valid
|
||||
|
||||
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
||||
|
||||
decr_text = self.server.sec.decrypt(signed_resp, key_file)
|
||||
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
|
||||
assert resp.encrypted_assertion[0].extension_elements
|
||||
|
||||
assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
||||
assert assertion
|
||||
assert assertion[0].attribute_statement
|
||||
|
||||
ava = get_ava(assertion[0])
|
||||
|
||||
assert ava ==\
|
||||
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
||||
|
||||
assert 'EncryptedAssertion><encas2:Assertion xmlns:encas0="http://www.w3.org/2000/09/xmldsig#" ' \
|
||||
'xmlns:encas1="http://www.w3.org/2001/XMLSchema-instance" ' \
|
||||
'xmlns:encas2="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
|
||||
|
||||
valid = self.server.sec.verify_signature(decr_text,
|
||||
self.server.config.cert_file,
|
||||
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
||||
node_id=assertion[0].id,
|
||||
id_attr="")
|
||||
assert valid
|
||||
|
||||
def test_encrypted_signed_response_3(self):
|
||||
name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id12")
|
||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
||||
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
||||
|
||||
cert_str, cert_key_str = generate_cert()
|
||||
|
||||
signed_resp = self.server.create_authn_response(
|
||||
ava,
|
||||
"id12", # in_response_to
|
||||
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||
name_id=name_id,
|
||||
sign_response=True,
|
||||
sign_assertion=True,
|
||||
encrypt_assertion=True,
|
||||
encrypt_cert=cert_str,
|
||||
)
|
||||
|
||||
sresponse = response_from_string(signed_resp)
|
||||
|
||||
#'urn:oasis:names:tc:SAML:2.0:protocol:AuthnRequest'
|
||||
|
||||
valid = self.server.sec.verify_signature(signed_resp,
|
||||
self.server.config.cert_file,
|
||||
node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
|
||||
node_id=sresponse.id,
|
||||
id_attr="")
|
||||
assert valid
|
||||
|
||||
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
||||
|
||||
decr_text = self.server.sec.decrypt(signed_resp, key_file)
|
||||
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
|
||||
assert resp.encrypted_assertion[0].extension_elements
|
||||
|
||||
assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
||||
assert assertion
|
||||
assert assertion[0].attribute_statement
|
||||
|
||||
ava = get_ava(assertion[0])
|
||||
|
||||
assert ava ==\
|
||||
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
||||
|
||||
assert 'xmlns:encas' not in decr_text
|
||||
|
||||
valid = self.server.sec.verify_signature(decr_text,
|
||||
self.server.config.cert_file,
|
||||
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
||||
node_id=assertion[0].id,
|
||||
id_attr="")
|
||||
assert valid
|
||||
|
||||
def test_encrypted_signed_response_4(self):
|
||||
name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id12")
|
||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
||||
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
||||
|
||||
cert_str, cert_key_str = generate_cert()
|
||||
|
||||
signed_resp = self.server.create_authn_response(
|
||||
ava,
|
||||
"id12", # in_response_to
|
||||
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||
name_id=name_id,
|
||||
sign_response=True,
|
||||
sign_assertion=True,
|
||||
encrypt_assertion=True,
|
||||
encrypted_advice_attributes=True,
|
||||
encrypt_cert=cert_str,
|
||||
)
|
||||
|
||||
sresponse = response_from_string(signed_resp)
|
||||
|
||||
#'urn:oasis:names:tc:SAML:2.0:protocol:AuthnRequest'
|
||||
|
||||
valid = self.server.sec.verify_signature(signed_resp,
|
||||
self.server.config.cert_file,
|
||||
node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
|
||||
node_id=sresponse.id,
|
||||
id_attr="")
|
||||
assert valid
|
||||
|
||||
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
||||
|
||||
decr_text = self.server.sec.decrypt(signed_resp, key_file)
|
||||
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
|
||||
#Do not work since the response is changed after the signature is created.
|
||||
valid = self.server.sec.verify_signature(decr_text,
|
||||
self.server.config.cert_file,
|
||||
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
||||
node_id=resp.assertion[0].id,
|
||||
id_attr="")
|
||||
assert valid
|
||||
|
||||
assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
|
||||
|
||||
assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
|
||||
[saml, samlp])
|
||||
assert assertion
|
||||
assert assertion[0].attribute_statement
|
||||
|
||||
ava = ava = get_ava(assertion[0])
|
||||
|
||||
assert ava ==\
|
||||
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
||||
|
||||
#Should work, but I suspect that xmlsec manipulates the xml to much while encrypting that the signature
|
||||
#is no longer working. :(
|
||||
|
||||
assert 'xmlns:encas' not in decr_text
|
||||
|
||||
valid = self.server.sec.verify_signature(decr_text,
|
||||
self.server.config.cert_file,
|
||||
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
||||
node_id=assertion[0].id,
|
||||
id_attr="")
|
||||
assert valid
|
||||
|
||||
def test_encrypted_response_1(self):
|
||||
name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id12")
|
||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
||||
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
||||
|
||||
cert_str, cert_key_str = generate_cert()
|
||||
|
||||
signed_resp = self.server.create_authn_response(
|
||||
ava,
|
||||
"id12", # in_response_to
|
||||
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||
name_id=name_id,
|
||||
sign_response=False,
|
||||
sign_assertion=False,
|
||||
encrypt_assertion=True,
|
||||
encrypt_assertion_self_contained=True,
|
||||
encrypted_advice_attributes=True,
|
||||
encrypt_cert=cert_str,
|
||||
)
|
||||
|
||||
sresponse = response_from_string(signed_resp)
|
||||
|
||||
#'urn:oasis:names:tc:SAML:2.0:protocol:AuthnRequest'
|
||||
|
||||
assert sresponse.signature is None
|
||||
|
||||
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
||||
|
||||
decr_text = self.server.sec.decrypt(signed_resp, key_file)
|
||||
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
|
||||
assert resp.assertion[0].signature is None
|
||||
|
||||
assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
|
||||
|
||||
assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
|
||||
[saml, samlp])
|
||||
assert assertion
|
||||
assert assertion[0].attribute_statement
|
||||
|
||||
ava = ava = get_ava(assertion[0])
|
||||
|
||||
assert ava ==\
|
||||
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
||||
|
||||
assert 'EncryptedAssertion><encas1:Assertion xmlns:encas0="http://www.w3.org/2001/XMLSchema-instance" ' \
|
||||
'xmlns:encas1="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
|
||||
|
||||
assert assertion[0].signature is None
|
||||
|
||||
def test_encrypted_response_2(self):
|
||||
name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id12")
|
||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
||||
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
||||
|
||||
cert_str, cert_key_str = generate_cert()
|
||||
|
||||
signed_resp = self.server.create_authn_response(
|
||||
ava,
|
||||
"id12", # in_response_to
|
||||
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||
name_id=name_id,
|
||||
sign_response=False,
|
||||
sign_assertion=False,
|
||||
encrypt_assertion=True,
|
||||
encrypt_assertion_self_contained=True,
|
||||
encrypted_advice_attributes=False,
|
||||
encrypt_cert=cert_str,
|
||||
)
|
||||
|
||||
sresponse = response_from_string(signed_resp)
|
||||
|
||||
#'urn:oasis:names:tc:SAML:2.0:protocol:AuthnRequest'
|
||||
|
||||
assert sresponse.signature is None
|
||||
|
||||
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
||||
|
||||
decr_text = self.server.sec.decrypt(signed_resp, key_file)
|
||||
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
|
||||
assert resp.encrypted_assertion[0].extension_elements
|
||||
|
||||
assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
||||
assert assertion
|
||||
assert assertion[0].attribute_statement
|
||||
|
||||
ava = ava = get_ava(assertion[0])
|
||||
|
||||
assert ava ==\
|
||||
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
||||
|
||||
assert 'EncryptedAssertion><encas1:Assertion xmlns:encas0="http://www.w3.org/2001/XMLSchema-instance" ' \
|
||||
'xmlns:encas1="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
|
||||
|
||||
assert assertion[0].signature is None
|
||||
|
||||
|
||||
def test_slo_http_post(self):
|
||||
soon = time_util.in_a_while(days=1)
|
||||
sinfo = {
|
||||
|
@ -509,3 +912,4 @@ class TestServerLogout():
|
|||
if __name__ == "__main__":
|
||||
ts = TestServer1()
|
||||
ts.setup_class()
|
||||
ts.test_encrypted_response_1()
|
||||
|
|
|
@ -20,7 +20,7 @@ from saml2.authn_context import INTERNETPROTOCOLPASSWORD
|
|||
from saml2.client import Saml2Client
|
||||
from saml2.config import SPConfig
|
||||
from saml2.response import LogoutResponse
|
||||
from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion
|
||||
from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion, Advice
|
||||
from saml2.saml import NAMEID_FORMAT_TRANSIENT
|
||||
from saml2.saml import NameID
|
||||
from saml2.server import Server
|
||||
|
@ -499,6 +499,86 @@ class TestClient:
|
|||
assert resp.assertion
|
||||
assert resp.ava == {'givenName': ['Derek'], 'sn': ['Jeter']}
|
||||
|
||||
def test_sign_then_encrypt_assertion_advice(self):
|
||||
# Begin with the IdPs side
|
||||
_sec = self.server.sec
|
||||
|
||||
nameid_policy = samlp.NameIDPolicy(allow_create="false",
|
||||
format=saml.NAMEID_FORMAT_PERSISTENT)
|
||||
|
||||
asser = Assertion({"givenName": "Derek", "surName": "Jeter"})
|
||||
assertion = asser.construct(
|
||||
self.client.config.entityid, "_012345",
|
||||
"http://lingon.catalogix.se:8087/",
|
||||
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
|
||||
policy=self.server.config.getattr("policy", "idp"),
|
||||
issuer=self.server._issuer(),
|
||||
attrconvs=self.server.config.attribute_converters,
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login")
|
||||
|
||||
a_asser = Assertion({"uid": "test01", "email": "test.testsson@test.se"})
|
||||
a_assertion = a_asser.construct(
|
||||
self.client.config.entityid, "_012345",
|
||||
"http://lingon.catalogix.se:8087/",
|
||||
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
|
||||
policy=self.server.config.getattr("policy", "idp"),
|
||||
issuer=self.server._issuer(),
|
||||
attrconvs=self.server.config.attribute_converters,
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login")
|
||||
|
||||
a_assertion.signature = sigver.pre_signature_part(
|
||||
a_assertion.id, _sec.my_cert, 1)
|
||||
|
||||
assertion.advice = Advice()
|
||||
|
||||
assertion.advice.encrypted_assertion = []
|
||||
assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
||||
|
||||
assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion)
|
||||
|
||||
response = sigver.response_factory(
|
||||
in_response_to="_012345",
|
||||
destination="http://lingon.catalogix.se:8087/",
|
||||
status=s_utils.success_status_factory(),
|
||||
issuer=self.server._issuer()
|
||||
)
|
||||
|
||||
response.assertion.append(assertion)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(a_assertion),
|
||||
key_file=self.client.sec.key_file,
|
||||
node_id=a_assertion.id)
|
||||
|
||||
#xmldoc = "%s" % response
|
||||
# strangely enough I get different tags if I run this test separately
|
||||
# or as part of a bunch of tests.
|
||||
#xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass)
|
||||
|
||||
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
|
||||
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, _sec.cert_file,
|
||||
pre_encryption_part(), node_xpath=node_xpath)
|
||||
|
||||
#seresp = samlp.response_from_string(enctext)
|
||||
|
||||
resp_str = base64.encodestring(enctext)
|
||||
# Now over to the client side
|
||||
resp = self.client.parse_authn_request_response(
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"_012345": "http://foo.example.com/service"})
|
||||
|
||||
#assert resp.encrypted_assertion == []
|
||||
assert resp.assertion
|
||||
assert resp.assertion.advice
|
||||
assert resp.assertion.advice.assertion
|
||||
assert resp.ava == \
|
||||
{'sn': ['Jeter'], 'givenName': ['Derek'], 'uid': ['test01'], 'email': ['test.testsson@test.se']}
|
||||
|
||||
|
||||
|
||||
def test_signed_redirect(self):
|
||||
|
||||
msg_str = "%s" % self.client.create_authn_request(
|
||||
|
@ -628,4 +708,4 @@ class TestClientWithDummy():
|
|||
if __name__ == "__main__":
|
||||
tc = TestClient()
|
||||
tc.setup_class()
|
||||
tc.test_sign_then_encrypt_assertion2()
|
||||
tc.test_sign_then_encrypt_assertion_advice()
|
Loading…
Reference in New Issue