Partial commit.
Improved signing and added more testcases.
This commit is contained in:
@@ -579,6 +579,12 @@ class Entity(HTTPBase):
|
|||||||
encrypt_advice = False
|
encrypt_advice = False
|
||||||
if encrypted_advice_attributes and response.assertion.advice is not None \
|
if encrypted_advice_attributes and response.assertion.advice is not None \
|
||||||
and len(response.assertion.advice.assertion) == 1:
|
and len(response.assertion.advice.assertion) == 1:
|
||||||
|
to_sign_advice = []
|
||||||
|
if sign_assertion is not None and sign_assertion:
|
||||||
|
if response.assertion.advice and response.assertion.advice.assertion:
|
||||||
|
for tmp_assertion in response.assertion.advice.assertion:
|
||||||
|
tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1)
|
||||||
|
to_sign_advice.append((class_name(tmp_assertion), tmp_assertion.id))
|
||||||
tmp_assertion = response.assertion.advice.assertion[0]
|
tmp_assertion = response.assertion.advice.assertion[0]
|
||||||
response.assertion.advice.encrypted_assertion = []
|
response.assertion.advice.encrypted_assertion = []
|
||||||
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
||||||
@@ -587,12 +593,6 @@ class Entity(HTTPBase):
|
|||||||
else:
|
else:
|
||||||
response.assertion.advice.encrypted_assertion[0].add_extension_element(tmp_assertion)
|
response.assertion.advice.encrypted_assertion[0].add_extension_element(tmp_assertion)
|
||||||
response.assertion.advice.assertion = []
|
response.assertion.advice.assertion = []
|
||||||
to_sign_advice = []
|
|
||||||
if sign_assertion is not None and sign_assertion:
|
|
||||||
if response.assertion.advice and response.assertion.advice.assertion:
|
|
||||||
for tmp_assertion in response.assertion.advice.assertion:
|
|
||||||
tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1)
|
|
||||||
to_sign_advice.append((class_name(tmp_assertion), tmp_assertion.id))
|
|
||||||
if encrypt_assertion_self_contained:
|
if encrypt_assertion_self_contained:
|
||||||
advice_tag = response.assertion.advice._to_element_tree().tag
|
advice_tag = response.assertion.advice._to_element_tree().tag
|
||||||
assertion_tag = tmp_assertion._to_element_tree().tag
|
assertion_tag = tmp_assertion._to_element_tree().tag
|
||||||
@@ -608,6 +608,14 @@ class Entity(HTTPBase):
|
|||||||
if encrypt_assertion:
|
if encrypt_assertion:
|
||||||
response = response_from_string(response)
|
response = response_from_string(response)
|
||||||
if encrypt_assertion:
|
if encrypt_assertion:
|
||||||
|
to_sign_assertion = []
|
||||||
|
if sign_assertion is not None and sign_assertion:
|
||||||
|
_assertions = response.assertion
|
||||||
|
if not isinstance(response.assertion, list):
|
||||||
|
_assertions = [response.assertion]
|
||||||
|
for _assertion in _assertions:
|
||||||
|
_assertion.signature = pre_signature_part(_assertion.id, self.sec.my_cert, 1)
|
||||||
|
to_sign_assertion.append((class_name(_assertion), _assertion.id))
|
||||||
if encrypt_assertion_self_contained:
|
if encrypt_assertion_self_contained:
|
||||||
try:
|
try:
|
||||||
assertion_tag = response.assertion._to_element_tree().tag
|
assertion_tag = response.assertion._to_element_tree().tag
|
||||||
@@ -618,13 +626,12 @@ class Entity(HTTPBase):
|
|||||||
assertion_tag)
|
assertion_tag)
|
||||||
else:
|
else:
|
||||||
response = pre_encrypt_assertion(response)
|
response = pre_encrypt_assertion(response)
|
||||||
to_sign_assertion = []
|
|
||||||
if sign_assertion is not None and sign_assertion:
|
|
||||||
response.assertion.signature = pre_signature_part(response.assertion.id, self.sec.my_cert, 1)
|
|
||||||
to_sign_assertion.append((class_name(response.assertion), response.assertion.id))
|
|
||||||
if to_sign_assertion:
|
if to_sign_assertion:
|
||||||
response = signed_instance_factory(response, self.sec, to_sign_assertion)
|
response = signed_instance_factory(response, self.sec, to_sign_assertion)
|
||||||
response = self._encrypt_assertion(encrypt_cert_assertion, sp_entity_id, response)
|
response = self._encrypt_assertion(encrypt_cert_assertion, sp_entity_id, response)
|
||||||
|
else:
|
||||||
|
if to_sign:
|
||||||
|
response = signed_instance_factory(response, self.sec, to_sign)
|
||||||
if sign:
|
if sign:
|
||||||
return signed_instance_factory(response, self.sec, sign_class)
|
return signed_instance_factory(response, self.sec, sign_class)
|
||||||
else:
|
else:
|
||||||
|
@@ -380,12 +380,12 @@ class Server(Entity):
|
|||||||
assertion.signature = pre_signature_part(assertion.id, self.sec.my_cert, 1)
|
assertion.signature = pre_signature_part(assertion.id, self.sec.my_cert, 1)
|
||||||
to_sign.append((class_name(assertion), assertion.id))
|
to_sign.append((class_name(assertion), assertion.id))
|
||||||
|
|
||||||
if not encrypted_advice_attributes:
|
#if not encrypted_advice_attributes:
|
||||||
if sign_assertion:
|
# if sign_assertion:
|
||||||
if assertion.advice and assertion.advice.assertion:
|
# if assertion.advice and assertion.advice.assertion:
|
||||||
for tmp_assertion in assertion.advice.assertion:
|
# for tmp_assertion in assertion.advice.assertion:
|
||||||
tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1)
|
# tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1)
|
||||||
to_sign.append((class_name(tmp_assertion), tmp_assertion.id))
|
# to_sign.append((class_name(tmp_assertion), tmp_assertion.id))
|
||||||
|
|
||||||
# Store which assertion that has been sent to which SP about which
|
# Store which assertion that has been sent to which SP about which
|
||||||
# subject.
|
# subject.
|
||||||
|
@@ -498,8 +498,32 @@ class TestServer1():
|
|||||||
|
|
||||||
assert sresponse.assertion[0].signature == None
|
assert sresponse.assertion[0].signature == None
|
||||||
|
|
||||||
|
def test_signed_response_3(self):
|
||||||
|
|
||||||
|
|
||||||
|
signed_resp = self.server.create_authn_response(
|
||||||
|
self.ava,
|
||||||
|
"id12", # in_response_to
|
||||||
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
|
name_id=self.name_id,
|
||||||
|
sign_response=False,
|
||||||
|
sign_assertion=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
sresponse = response_from_string(signed_resp)
|
||||||
|
|
||||||
|
assert sresponse.signature == None
|
||||||
|
|
||||||
|
valid = self.server.sec.verify_signature(signed_resp,
|
||||||
|
self.server.config.cert_file,
|
||||||
|
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
||||||
|
node_id=sresponse.assertion[0].id,
|
||||||
|
id_attr="")
|
||||||
|
assert valid
|
||||||
|
|
||||||
|
self.verify_assertion(sresponse.assertion)
|
||||||
|
|
||||||
def test_encrypted_signed_response_1(self):
|
def test_encrypted_signed_response_1(self):
|
||||||
|
|
||||||
cert_str, cert_key_str = generate_cert()
|
cert_str, cert_key_str = generate_cert()
|
||||||
@@ -512,10 +536,10 @@ class TestServer1():
|
|||||||
name_id=self.name_id,
|
name_id=self.name_id,
|
||||||
sign_response=True,
|
sign_response=True,
|
||||||
sign_assertion=True,
|
sign_assertion=True,
|
||||||
encrypt_assertion=True,
|
encrypt_assertion=False,
|
||||||
encrypt_assertion_self_contained=True,
|
encrypt_assertion_self_contained=True,
|
||||||
encrypted_advice_attributes=True,
|
encrypted_advice_attributes=True,
|
||||||
encrypt_cert=cert_str,
|
encrypt_cert_advice=cert_str,
|
||||||
)
|
)
|
||||||
|
|
||||||
sresponse = response_from_string(signed_resp)
|
sresponse = response_from_string(signed_resp)
|
||||||
@@ -527,34 +551,26 @@ class TestServer1():
|
|||||||
id_attr="")
|
id_attr="")
|
||||||
assert valid
|
assert valid
|
||||||
|
|
||||||
|
valid = self.server.sec.verify_signature(signed_resp,
|
||||||
|
self.server.config.cert_file,
|
||||||
|
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
||||||
|
node_id=sresponse.assertion[0].id,
|
||||||
|
id_attr="")
|
||||||
|
|
||||||
|
assert valid
|
||||||
|
|
||||||
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
||||||
|
|
||||||
decr_text = self.server.sec.decrypt(signed_resp, key_file)
|
decr_text = self.server.sec.decrypt(signed_resp, key_file)
|
||||||
|
|
||||||
resp = samlp.response_from_string(decr_text)
|
resp = samlp.response_from_string(decr_text)
|
||||||
|
|
||||||
valid = self.server.sec.verify_signature(decr_text,
|
|
||||||
self.server.config.cert_file,
|
|
||||||
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
|
||||||
node_id=resp.assertion[0].id,
|
|
||||||
id_attr="")
|
|
||||||
assert valid
|
|
||||||
|
|
||||||
assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
|
assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
|
||||||
|
|
||||||
assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
|
assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
|
||||||
[saml, samlp])
|
[saml, samlp])
|
||||||
assert assertion
|
|
||||||
assert assertion[0].attribute_statement
|
|
||||||
|
|
||||||
ava = ava = get_ava(assertion[0])
|
self.verify_assertion(assertion)
|
||||||
|
|
||||||
assert ava ==\
|
|
||||||
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
|
||||||
|
|
||||||
assert 'EncryptedAssertion><encas2:Assertion xmlns:encas0="http://www.w3.org/2000/09/xmldsig#" ' \
|
|
||||||
'xmlns:encas1="http://www.w3.org/2001/XMLSchema-instance" ' \
|
|
||||||
'xmlns:encas2="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
|
|
||||||
|
|
||||||
valid = self.server.sec.verify_signature(decr_text,
|
valid = self.server.sec.verify_signature(decr_text,
|
||||||
self.server.config.cert_file,
|
self.server.config.cert_file,
|
||||||
@@ -573,7 +589,7 @@ class TestServer1():
|
|||||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
name_id=self.name_id,
|
name_id=self.name_id,
|
||||||
sign_response=True,
|
sign_response=True,
|
||||||
sign_assertion=True,
|
sign_assertion=False,
|
||||||
encrypt_assertion=True,
|
encrypt_assertion=True,
|
||||||
encrypt_assertion_self_contained=True,
|
encrypt_assertion_self_contained=True,
|
||||||
encrypt_cert=cert_str,
|
encrypt_cert=cert_str,
|
||||||
@@ -588,33 +604,16 @@ class TestServer1():
|
|||||||
id_attr="")
|
id_attr="")
|
||||||
assert valid
|
assert valid
|
||||||
|
|
||||||
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
decr_text = self.server.sec.decrypt(signed_resp, self.client.config.key_file)
|
||||||
|
|
||||||
decr_text = self.server.sec.decrypt(signed_resp, key_file)
|
|
||||||
|
|
||||||
resp = samlp.response_from_string(decr_text)
|
resp = samlp.response_from_string(decr_text)
|
||||||
|
|
||||||
assert resp.encrypted_assertion[0].extension_elements
|
resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
||||||
|
|
||||||
assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
assert resp.assertion[0].signature == None
|
||||||
assert assertion
|
|
||||||
assert assertion[0].attribute_statement
|
|
||||||
|
|
||||||
ava = get_ava(assertion[0])
|
self.verify_assertion(resp.assertion)
|
||||||
|
|
||||||
assert ava ==\
|
|
||||||
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
|
||||||
|
|
||||||
assert 'EncryptedAssertion><encas2:Assertion xmlns:encas0="http://www.w3.org/2000/09/xmldsig#" ' \
|
|
||||||
'xmlns:encas1="http://www.w3.org/2001/XMLSchema-instance" ' \
|
|
||||||
'xmlns:encas2="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
|
|
||||||
|
|
||||||
valid = self.server.sec.verify_signature(decr_text,
|
|
||||||
self.server.config.cert_file,
|
|
||||||
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
|
||||||
node_id=assertion[0].id,
|
|
||||||
id_attr="")
|
|
||||||
assert valid
|
|
||||||
|
|
||||||
def test_encrypted_signed_response_3(self):
|
def test_encrypted_signed_response_3(self):
|
||||||
cert_str, cert_key_str = generate_cert()
|
cert_str, cert_key_str = generate_cert()
|
||||||
@@ -628,7 +627,8 @@ class TestServer1():
|
|||||||
sign_response=True,
|
sign_response=True,
|
||||||
sign_assertion=True,
|
sign_assertion=True,
|
||||||
encrypt_assertion=True,
|
encrypt_assertion=True,
|
||||||
encrypt_cert=cert_str,
|
encrypt_assertion_self_contained=False,
|
||||||
|
encrypt_cert_assertion=cert_str,
|
||||||
)
|
)
|
||||||
|
|
||||||
sresponse = response_from_string(signed_resp)
|
sresponse = response_from_string(signed_resp)
|
||||||
@@ -646,27 +646,24 @@ class TestServer1():
|
|||||||
|
|
||||||
resp = samlp.response_from_string(decr_text)
|
resp = samlp.response_from_string(decr_text)
|
||||||
|
|
||||||
assert resp.encrypted_assertion[0].extension_elements
|
resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
||||||
|
|
||||||
assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
|
||||||
assert assertion
|
|
||||||
assert assertion[0].attribute_statement
|
|
||||||
|
|
||||||
ava = get_ava(assertion[0])
|
|
||||||
|
|
||||||
assert ava ==\
|
|
||||||
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
|
||||||
|
|
||||||
assert 'xmlns:encas' not in decr_text
|
|
||||||
|
|
||||||
valid = self.server.sec.verify_signature(decr_text,
|
valid = self.server.sec.verify_signature(decr_text,
|
||||||
self.server.config.cert_file,
|
self.server.config.cert_file,
|
||||||
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
||||||
node_id=assertion[0].id,
|
node_id=resp.assertion[0].id,
|
||||||
id_attr="")
|
id_attr="")
|
||||||
|
|
||||||
assert valid
|
assert valid
|
||||||
|
|
||||||
|
self.verify_assertion(resp.assertion)
|
||||||
|
|
||||||
|
assert 'xmlns:encas' not in decr_text
|
||||||
|
|
||||||
|
|
||||||
def test_encrypted_signed_response_4(self):
|
def test_encrypted_signed_response_4(self):
|
||||||
|
|
||||||
cert_str, cert_key_str = generate_cert()
|
cert_str, cert_key_str = generate_cert()
|
||||||
|
|
||||||
signed_resp = self.server.create_authn_response(
|
signed_resp = self.server.create_authn_response(
|
||||||
@@ -678,8 +675,9 @@ class TestServer1():
|
|||||||
sign_response=True,
|
sign_response=True,
|
||||||
sign_assertion=True,
|
sign_assertion=True,
|
||||||
encrypt_assertion=True,
|
encrypt_assertion=True,
|
||||||
|
encrypt_assertion_self_contained=True,
|
||||||
encrypted_advice_attributes=True,
|
encrypted_advice_attributes=True,
|
||||||
encrypt_cert=cert_str,
|
encrypt_cert_advice=cert_str,
|
||||||
)
|
)
|
||||||
|
|
||||||
sresponse = response_from_string(signed_resp)
|
sresponse = response_from_string(signed_resp)
|
||||||
@@ -691,35 +689,30 @@ class TestServer1():
|
|||||||
id_attr="")
|
id_attr="")
|
||||||
assert valid
|
assert valid
|
||||||
|
|
||||||
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
decr_text = self.server.sec.decrypt(signed_resp, self.client.config.key_file)
|
||||||
|
|
||||||
decr_text = self.server.sec.decrypt(signed_resp, key_file)
|
|
||||||
|
|
||||||
resp = samlp.response_from_string(decr_text)
|
resp = samlp.response_from_string(decr_text)
|
||||||
|
|
||||||
|
resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
||||||
|
|
||||||
valid = self.server.sec.verify_signature(decr_text,
|
valid = self.server.sec.verify_signature(decr_text,
|
||||||
self.server.config.cert_file,
|
self.server.config.cert_file,
|
||||||
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
||||||
node_id=resp.assertion[0].id,
|
node_id=resp.assertion[0].id,
|
||||||
id_attr="")
|
id_attr="")
|
||||||
|
|
||||||
assert valid
|
assert valid
|
||||||
|
|
||||||
assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
|
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
||||||
|
|
||||||
assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
|
decr_text = self.server.sec.decrypt(decr_text, key_file)
|
||||||
[saml, samlp])
|
|
||||||
assert assertion
|
|
||||||
assert assertion[0].attribute_statement
|
|
||||||
|
|
||||||
ava = ava = get_ava(assertion[0])
|
resp = samlp.response_from_string(decr_text)
|
||||||
|
|
||||||
assert ava ==\
|
assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
||||||
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
assertion = \
|
||||||
|
extension_elements_to_elements(assertion[0].advice.encrypted_assertion[0].extension_elements,[saml, samlp])
|
||||||
#Should work, but I suspect that xmlsec manipulates the xml to much while encrypting that the signature
|
self.verify_assertion(assertion)
|
||||||
#is no longer working. :(
|
|
||||||
|
|
||||||
assert 'xmlns:encas' not in decr_text
|
|
||||||
|
|
||||||
valid = self.server.sec.verify_signature(decr_text,
|
valid = self.server.sec.verify_signature(decr_text,
|
||||||
self.server.config.cert_file,
|
self.server.config.cert_file,
|
||||||
@@ -1079,4 +1072,4 @@ class TestServerLogout():
|
|||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
ts = TestServer1()
|
ts = TestServer1()
|
||||||
ts.setup_class()
|
ts.setup_class()
|
||||||
ts.test_signed_response_1()
|
ts.test_encrypted_signed_response_4()
|
||||||
|
Reference in New Issue
Block a user