Pysaml can now decrypt multiple encrypted assertions with multiple advice elements with multiple encrypted assertions.
This commit is contained in:
@@ -591,13 +591,14 @@ class SamlBase(ExtensionContainer):
|
||||
|
||||
def get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(self, assertion_tag, advice_tag):
|
||||
for tmp_encrypted_assertion in self.assertion.advice.encrypted_assertion:
|
||||
prefix_map = self.get_prefix_map([tmp_encrypted_assertion._to_element_tree().
|
||||
find(assertion_tag)])
|
||||
|
||||
tree = self._to_element_tree()
|
||||
|
||||
self.set_prefixes(tree.find(assertion_tag).find(advice_tag).find(tmp_encrypted_assertion._to_element_tree()
|
||||
.tag).find(assertion_tag), prefix_map)
|
||||
if tmp_encrypted_assertion.encrypted_data is None:
|
||||
prefix_map = self.get_prefix_map([tmp_encrypted_assertion._to_element_tree().find(assertion_tag)])
|
||||
tree = self._to_element_tree()
|
||||
encs = tree.find(assertion_tag).find(advice_tag).findall(tmp_encrypted_assertion._to_element_tree().tag)
|
||||
for enc in encs:
|
||||
assertion = enc.find(assertion_tag)
|
||||
if assertion is not None:
|
||||
self.set_prefixes(assertion, prefix_map)
|
||||
|
||||
return ElementTree.tostring(tree, encoding="UTF-8")
|
||||
|
||||
|
@@ -542,7 +542,7 @@ class Base(Entity):
|
||||
# ======== response handling ===========
|
||||
|
||||
def parse_authn_request_response(self, xmlstr, binding, outstanding=None,
|
||||
outstanding_certs=None, decrypt=True, pefim=False):
|
||||
outstanding_certs=None):
|
||||
""" Deal with an AuthnResponse
|
||||
|
||||
:param xmlstr: The reply as a xml string
|
||||
@@ -573,12 +573,11 @@ class Base(Entity):
|
||||
"attribute_converters": self.config.attribute_converters,
|
||||
"allow_unknown_attributes":
|
||||
self.config.allow_unknown_attributes,
|
||||
"decrypt": decrypt
|
||||
}
|
||||
try:
|
||||
resp = self._parse_response(xmlstr, AuthnResponse,
|
||||
"assertion_consumer_service",
|
||||
binding, pefim=pefim, **kwargs)
|
||||
binding, **kwargs)
|
||||
except StatusError as err:
|
||||
logger.error("SAML status error: %s" % err)
|
||||
raise
|
||||
|
@@ -978,7 +978,7 @@ class Entity(HTTPBase):
|
||||
# ------------------------------------------------------------------------
|
||||
|
||||
def _parse_response(self, xmlstr, response_cls, service, binding,
|
||||
outstanding_certs=None, pefim=False, **kwargs):
|
||||
outstanding_certs=None, **kwargs):
|
||||
""" Deal with a Response
|
||||
|
||||
:param xmlstr: The response as a xml string
|
||||
@@ -1040,23 +1040,23 @@ class Entity(HTTPBase):
|
||||
logger.debug("XMLSTR: %s" % xmlstr)
|
||||
|
||||
if response:
|
||||
keys = None
|
||||
if outstanding_certs:
|
||||
try:
|
||||
cert = outstanding_certs[response.in_response_to]
|
||||
except KeyError:
|
||||
key_file = ""
|
||||
keys = None
|
||||
else:
|
||||
_, key_file = make_temp("%s" % cert["key"],
|
||||
decode=False)
|
||||
else:
|
||||
key_file = ""
|
||||
if not isinstance(cert, list):
|
||||
cert = [cert]
|
||||
keys = []
|
||||
for _cert in cert:
|
||||
keys.append(_cert["key"])
|
||||
only_identity_in_encrypted_assertion = False
|
||||
if "only_identity_in_encrypted_assertion" in kwargs:
|
||||
only_identity_in_encrypted_assertion = kwargs["only_identity_in_encrypted_assertion"]
|
||||
decrypt = True
|
||||
if "decrypt" in kwargs:
|
||||
decrypt = kwargs["decrypt"]
|
||||
response = response.verify(key_file, decrypt=decrypt, pefim=pefim)
|
||||
|
||||
response = response.verify(keys)
|
||||
|
||||
if not response:
|
||||
return None
|
||||
|
@@ -395,7 +395,7 @@ class StatusResponse(object):
|
||||
def loads(self, xmldata, decode=True, origxml=None):
|
||||
return self._loads(xmldata, decode, origxml)
|
||||
|
||||
def verify(self, key_file="", decrypt=True, pefim=False):
|
||||
def verify(self, keys=None):
|
||||
try:
|
||||
return self._verify()
|
||||
except AssertionError:
|
||||
@@ -636,18 +636,19 @@ class AuthnResponse(StatusResponse):
|
||||
|
||||
"""
|
||||
ava = {}
|
||||
if self.assertion.advice:
|
||||
if self.assertion.advice.assertion:
|
||||
for tmp_assertion in self.assertion.advice.assertion:
|
||||
if tmp_assertion.attribute_statement:
|
||||
assert len(tmp_assertion.attribute_statement) == 1
|
||||
ava.update(self.read_attribute_statement(tmp_assertion.attribute_statement[0]))
|
||||
if self.assertion.attribute_statement:
|
||||
assert len(self.assertion.attribute_statement) == 1
|
||||
_attr_statem = self.assertion.attribute_statement[0]
|
||||
ava.update(self.read_attribute_statement(_attr_statem))
|
||||
if not ava:
|
||||
logger.error("Missing Attribute Statement")
|
||||
for _assertion in self.assertions:
|
||||
if _assertion.advice:
|
||||
if _assertion.advice.assertion:
|
||||
for tmp_assertion in _assertion.advice.assertion:
|
||||
if tmp_assertion.attribute_statement:
|
||||
assert len(tmp_assertion.attribute_statement) == 1
|
||||
ava.update(self.read_attribute_statement(tmp_assertion.attribute_statement[0]))
|
||||
if _assertion.attribute_statement:
|
||||
assert len(_assertion.attribute_statement) == 1
|
||||
_attr_statem = _assertion.attribute_statement[0]
|
||||
ava.update(self.read_attribute_statement(_attr_statem))
|
||||
if not ava:
|
||||
logger.error("Missing Attribute Statement")
|
||||
return ava
|
||||
|
||||
def _bearer_confirmed(self, data):
|
||||
@@ -796,14 +797,14 @@ class AuthnResponse(StatusResponse):
|
||||
logger.exception("get subject")
|
||||
raise
|
||||
|
||||
def decrypt_assertions(self, encrypted_assertions, decr_txt, issuer=None):
|
||||
def decrypt_assertions(self, encrypted_assertions, decr_txt, issuer=None, verified=False):
|
||||
res = []
|
||||
for encrypted_assertion in encrypted_assertions:
|
||||
if encrypted_assertion.extension_elements:
|
||||
assertions = extension_elements_to_elements(
|
||||
encrypted_assertion.extension_elements, [saml, samlp])
|
||||
for assertion in assertions:
|
||||
if assertion.signature:
|
||||
if assertion.signature and not verified:
|
||||
if not self.sec.check_signature(
|
||||
assertion, origdoc=decr_txt,
|
||||
node_name=class_name(assertion), issuer=issuer):
|
||||
@@ -812,7 +813,35 @@ class AuthnResponse(StatusResponse):
|
||||
res.append(assertion)
|
||||
return res
|
||||
|
||||
def parse_assertion(self, key_file="", decrypt=True, pefim=False):
|
||||
def find_encrypt_data_assertion(self, enc_assertions):
|
||||
for _assertion in enc_assertions:
|
||||
if _assertion.encrypted_data is not None:
|
||||
return True
|
||||
|
||||
def find_encrypt_data_assertion_list(self, _assertions):
|
||||
for _assertion in _assertions:
|
||||
if _assertion.advice:
|
||||
if _assertion.advice.encrypted_assertion:
|
||||
res = self.find_encrypt_data_assertion(_assertion.advice.encrypted_assertion)
|
||||
if res:
|
||||
return True
|
||||
|
||||
def find_encrypt_data(self, resp):
|
||||
_has_encrypt_data = False
|
||||
if resp.encrypted_assertion:
|
||||
res = self.find_encrypt_data_assertion(resp.encrypted_assertion)
|
||||
if res:
|
||||
return True
|
||||
if resp.assertion:
|
||||
for tmp_assertion in resp.assertion:
|
||||
if tmp_assertion.advice:
|
||||
if tmp_assertion.advice.encrypted_assertion:
|
||||
res = self.find_encrypt_data_assertion(tmp_assertion.advice.encrypted_assertion)
|
||||
if res:
|
||||
return True
|
||||
return False
|
||||
|
||||
def parse_assertion(self, keys=None):
|
||||
if self.context == "AuthnQuery":
|
||||
# can contain one or more assertions
|
||||
pass
|
||||
@@ -823,13 +852,13 @@ class AuthnResponse(StatusResponse):
|
||||
except AssertionError:
|
||||
raise Exception("No assertion part")
|
||||
|
||||
has_encrypted_assertions = self.response.encrypted_assertion
|
||||
if not has_encrypted_assertions and self.response.assertion:
|
||||
for tmp_assertion in self.response.assertion:
|
||||
if tmp_assertion.advice:
|
||||
if tmp_assertion.advice.encrypted_assertion:
|
||||
has_encrypted_assertions = True
|
||||
break
|
||||
has_encrypted_assertions = self.find_encrypt_data(self.response) #self.response.encrypted_assertion
|
||||
#if not has_encrypted_assertions and self.response.assertion:
|
||||
# for tmp_assertion in self.response.assertion:
|
||||
# if tmp_assertion.advice:
|
||||
# if tmp_assertion.advice.encrypted_assertion:
|
||||
# has_encrypted_assertions = True
|
||||
# break
|
||||
|
||||
if self.response.assertion:
|
||||
logger.debug("***Unencrypted assertion***")
|
||||
@@ -837,16 +866,25 @@ class AuthnResponse(StatusResponse):
|
||||
if not self._assertion(assertion, False):
|
||||
return False
|
||||
|
||||
if has_encrypted_assertions and decrypt:
|
||||
if has_encrypted_assertions:
|
||||
_enc_assertions = []
|
||||
logger.debug("***Encrypted assertion/-s***")
|
||||
decr_text = self.sec.decrypt(self.xmlstr, key_file)
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
decr_text = "%s" % self.response
|
||||
resp = self.response
|
||||
while self.find_encrypt_data(resp):
|
||||
decr_text = self.sec.decrypt_keys(decr_text, keys)
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
_enc_assertions = self.decrypt_assertions(resp.encrypted_assertion, decr_text)
|
||||
decr_text = self.sec.decrypt(decr_text, key_file)
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
while self.find_encrypt_data(resp) or self.find_encrypt_data_assertion_list(_enc_assertions):
|
||||
decr_text = self.sec.decrypt_keys(decr_text, keys)
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
_enc_assertions = self.decrypt_assertions(resp.encrypted_assertion, decr_text, verified=True)
|
||||
#_enc_assertions = self.decrypt_assertions(resp.encrypted_assertion, decr_text, verified=True)
|
||||
all_assertions = _enc_assertions
|
||||
if resp.assertion:
|
||||
for tmp_ass in resp.assertion:
|
||||
all_assertions = all_assertions + resp.assertion
|
||||
if len(all_assertions) > 0:
|
||||
for tmp_ass in all_assertions:
|
||||
if tmp_ass.advice and tmp_ass.advice.encrypted_assertion:
|
||||
advice_res = self.decrypt_assertions(tmp_ass.advice.encrypted_assertion,
|
||||
decr_text,
|
||||
@@ -855,21 +893,20 @@ class AuthnResponse(StatusResponse):
|
||||
tmp_ass.advice.assertion.extend(advice_res)
|
||||
else:
|
||||
tmp_ass.advice.assertion = advice_res
|
||||
if not pefim:
|
||||
_enc_assertions.extend(advice_res)
|
||||
tmp_ass.advice.encrypted_assertion = []
|
||||
self.response.assertion = resp.assertion
|
||||
for assertion in _enc_assertions:
|
||||
if not self._assertion(assertion, True):
|
||||
return False
|
||||
else:
|
||||
self.assertions.append(assertion)
|
||||
|
||||
self.xmlstr = decr_text
|
||||
self.response.encrypted_assertion = []
|
||||
|
||||
if self.response.assertion:
|
||||
for assertion in self.response.assertion:
|
||||
if assertion.advice and assertion.advice.assertion:
|
||||
for advice_assertion in assertion.advice.assertion:
|
||||
self.assertions.append(assertion)
|
||||
self.assertions.append(assertion)
|
||||
|
||||
if self.assertions and len(self.assertions) > 0:
|
||||
self.assertion = self.assertions[0]
|
||||
@@ -880,7 +917,7 @@ class AuthnResponse(StatusResponse):
|
||||
|
||||
return True
|
||||
|
||||
def verify(self, key_file="", decrypt=True, pefim=False):
|
||||
def verify(self, keys=None):
|
||||
""" Verify that the assertion is syntactically correct and
|
||||
the signature is correct if present.
|
||||
:param key_file: If not the default key file should be used this is it.
|
||||
@@ -898,7 +935,7 @@ class AuthnResponse(StatusResponse):
|
||||
if not isinstance(self.response, samlp.Response):
|
||||
return self
|
||||
|
||||
if self.parse_assertion(key_file, decrypt=decrypt, pefim=pefim):
|
||||
if self.parse_assertion(keys):
|
||||
return self
|
||||
else:
|
||||
logger.error("Could not parse the assertion")
|
||||
@@ -1126,7 +1163,7 @@ class AssertionIDResponse(object):
|
||||
|
||||
return self._postamble()
|
||||
|
||||
def verify(self, key_file="", decrypt=True, pefim=False):
|
||||
def verify(self, keys=None):
|
||||
try:
|
||||
valid_instance(self.response)
|
||||
except NotValid as exc:
|
||||
|
@@ -769,7 +769,7 @@ class CryptoBackendXmlSec1(CryptoBackend):
|
||||
return output
|
||||
|
||||
def encrypt_assertion(self, statement, enc_key, template,
|
||||
key_type="des-192", node_xpath=None):
|
||||
key_type="des-192", node_xpath=None, node_id=None):
|
||||
"""
|
||||
Will encrypt an assertion
|
||||
|
||||
@@ -792,6 +792,8 @@ class CryptoBackendXmlSec1(CryptoBackend):
|
||||
com_list = [self.xmlsec, "encrypt", "--pubkey-cert-pem", enc_key,
|
||||
"--session-key", key_type, "--xml-data", fil,
|
||||
"--node-xpath", node_xpath]
|
||||
if node_id:
|
||||
com_list.extend(["--node-id", node_id])
|
||||
|
||||
(_stdout, _stderr, output) = self._run_xmlsec(
|
||||
com_list, [tmpl], exception=EncryptError, validate_output=False)
|
||||
@@ -1300,6 +1302,25 @@ class SecurityContext(object):
|
||||
"""
|
||||
raise NotImplemented()
|
||||
|
||||
def decrypt_keys(self, enctext, keys=None):
|
||||
""" Decrypting an encrypted text by the use of a private key.
|
||||
|
||||
:param enctext: The encrypted text as a string
|
||||
:return: The decrypted text
|
||||
"""
|
||||
if not isinstance(keys, list):
|
||||
keys = [keys]
|
||||
_enctext = self.crypto.decrypt(enctext, self.key_file)
|
||||
if _enctext is not None and len(_enctext) > 0:
|
||||
return _enctext
|
||||
for _key in keys:
|
||||
if _key is not None and len(_key.strip()) > 0:
|
||||
_, key_file = make_temp("%s" % _key, decode=False)
|
||||
_enctext = self.crypto.decrypt(enctext, key_file)
|
||||
if _enctext is not None and len(_enctext) > 0:
|
||||
return _enctext
|
||||
return enctext
|
||||
|
||||
def decrypt(self, enctext, key_file=None):
|
||||
""" Decrypting an encrypted text by the use of a private key.
|
||||
|
||||
@@ -1310,12 +1331,9 @@ class SecurityContext(object):
|
||||
if _enctext is not None and len(_enctext) > 0:
|
||||
return _enctext
|
||||
if key_file is not None and len(key_file.strip()) > 0:
|
||||
_enctext = self.crypto.decrypt(enctext, key_file)
|
||||
if _enctext is not None and len(_enctext) > 0:
|
||||
return _enctext
|
||||
_enctext = self.crypto.decrypt(enctext, self.key_file)
|
||||
if _enctext is not None and len(_enctext) > 0:
|
||||
return _enctext
|
||||
_enctext = self.crypto.decrypt(enctext, key_file)
|
||||
if _enctext is not None and len(_enctext) > 0:
|
||||
return _enctext
|
||||
return enctext
|
||||
|
||||
def verify_signature(self, signedtext, cert_file=None, cert_type="pem",
|
||||
|
@@ -27,7 +27,7 @@ from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion, Advice
|
||||
from saml2.saml import NAMEID_FORMAT_TRANSIENT
|
||||
from saml2.saml import NameID
|
||||
from saml2.server import Server
|
||||
from saml2.sigver import pre_encryption_part, make_temp
|
||||
from saml2.sigver import pre_encryption_part, make_temp, pre_encrypt_assertion
|
||||
from saml2.sigver import rm_xmltag
|
||||
from saml2.sigver import verify_redirect_signature
|
||||
from saml2.s_utils import do_attribute_statement
|
||||
@@ -434,7 +434,7 @@ class TestClient:
|
||||
|
||||
authn_response = _client.parse_authn_request_response(
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"id1": "http://foo.example.com/service"}, {"id1": cert}, pefim=True)
|
||||
{"id1": "http://foo.example.com/service"}, {"id1": cert})
|
||||
|
||||
self.verify_authn_response(idp, authn_response, _client, ava_verify)
|
||||
|
||||
@@ -471,7 +471,225 @@ class TestClient:
|
||||
|
||||
authn_response = _client.parse_authn_request_response(
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"id1": "http://foo.example.com/service"}, pefim=True)
|
||||
{"id1": "http://foo.example.com/service"})
|
||||
|
||||
self.verify_authn_response(idp, authn_response, _client, ava_verify)
|
||||
|
||||
def test_response_4(self):
|
||||
conf = config.SPConfig()
|
||||
conf.load_file("server_conf")
|
||||
_client = Saml2Client(conf)
|
||||
|
||||
idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
|
||||
|
||||
self.name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id1")
|
||||
|
||||
resp = self.server.create_authn_response(
|
||||
identity=ava,
|
||||
in_response_to="id1",
|
||||
destination="http://lingon.catalogix.se:8087/",
|
||||
sp_entity_id="urn:mace:example.com:saml:roland:sp",
|
||||
#name_id_policy=nameid_policy,
|
||||
name_id=self.name_id,
|
||||
userid="foba0001@example.com",
|
||||
authn=AUTHN,
|
||||
sign_response=True,
|
||||
sign_assertion=True,
|
||||
encrypt_assertion=True,
|
||||
encrypt_assertion_self_contained=True,
|
||||
#encrypted_advice_attributes=True,
|
||||
pefim=True,
|
||||
)
|
||||
|
||||
resp_str = "%s" % resp
|
||||
|
||||
resp_str = base64.encodestring(resp_str)
|
||||
|
||||
authn_response = _client.parse_authn_request_response(
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"id1": "http://foo.example.com/service"})
|
||||
|
||||
self.verify_authn_response(idp, authn_response, _client, ava_verify)
|
||||
|
||||
def test_response_5(self):
|
||||
conf = config.SPConfig()
|
||||
conf.load_file("server_conf")
|
||||
_client = Saml2Client(conf)
|
||||
|
||||
idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
|
||||
|
||||
self.name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id1")
|
||||
|
||||
cert_str, cert_key_str = generate_cert()
|
||||
|
||||
cert =\
|
||||
{
|
||||
"cert": cert_str,
|
||||
"key": cert_key_str
|
||||
}
|
||||
|
||||
resp = self.server.create_authn_response(
|
||||
identity=ava,
|
||||
in_response_to="id1",
|
||||
destination="http://lingon.catalogix.se:8087/",
|
||||
sp_entity_id="urn:mace:example.com:saml:roland:sp",
|
||||
#name_id_policy=nameid_policy,
|
||||
name_id=self.name_id,
|
||||
userid="foba0001@example.com",
|
||||
authn=AUTHN,
|
||||
sign_response=True,
|
||||
sign_assertion=True,
|
||||
encrypt_assertion=True,
|
||||
encrypt_assertion_self_contained=True,
|
||||
#encrypted_advice_attributes=True,
|
||||
pefim=True,
|
||||
encrypt_cert_assertion=cert_str
|
||||
)
|
||||
|
||||
resp_str = "%s" % resp
|
||||
|
||||
resp_str = base64.encodestring(resp_str)
|
||||
|
||||
authn_response = _client.parse_authn_request_response(
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"id1": "http://foo.example.com/service"}, {"id1": cert})
|
||||
|
||||
self.verify_authn_response(idp, authn_response, _client, ava_verify)
|
||||
|
||||
def test_response_6(self):
|
||||
conf = config.SPConfig()
|
||||
conf.load_file("server_conf")
|
||||
_client = Saml2Client(conf)
|
||||
|
||||
idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
|
||||
|
||||
self.name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id1")
|
||||
|
||||
cert_assertion_str, cert_key_assertion_str = generate_cert()
|
||||
|
||||
cert_assertion =\
|
||||
{
|
||||
"cert": cert_assertion_str,
|
||||
"key": cert_key_assertion_str
|
||||
}
|
||||
|
||||
cert_advice_str, cert_key_advice_str = generate_cert()
|
||||
|
||||
cert_advice =\
|
||||
{
|
||||
"cert": cert_advice_str,
|
||||
"key": cert_key_advice_str
|
||||
}
|
||||
|
||||
resp = self.server.create_authn_response(
|
||||
identity=ava,
|
||||
in_response_to="id1",
|
||||
destination="http://lingon.catalogix.se:8087/",
|
||||
sp_entity_id="urn:mace:example.com:saml:roland:sp",
|
||||
#name_id_policy=nameid_policy,
|
||||
name_id=self.name_id,
|
||||
userid="foba0001@example.com",
|
||||
authn=AUTHN,
|
||||
sign_response=True,
|
||||
sign_assertion=True,
|
||||
encrypt_assertion=True,
|
||||
encrypt_assertion_self_contained=True,
|
||||
#encrypted_advice_attributes=True,
|
||||
pefim=True,
|
||||
encrypt_cert_assertion=cert_assertion_str,
|
||||
encrypt_cert_advice=cert_advice_str
|
||||
)
|
||||
|
||||
resp_str = "%s" % resp
|
||||
|
||||
resp_str = base64.encodestring(resp_str)
|
||||
|
||||
authn_response = _client.parse_authn_request_response(
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"id1": "http://foo.example.com/service"}, {"id1": [cert_assertion, cert_advice]})
|
||||
|
||||
self.verify_authn_response(idp, authn_response, _client, ava_verify)
|
||||
|
||||
def test_response_7(self):
|
||||
conf = config.SPConfig()
|
||||
conf.load_file("server_conf")
|
||||
_client = Saml2Client(conf)
|
||||
|
||||
idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
|
||||
|
||||
self.name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id1")
|
||||
|
||||
resp = self.server.create_authn_response(
|
||||
identity=ava,
|
||||
in_response_to="id1",
|
||||
destination="http://lingon.catalogix.se:8087/",
|
||||
sp_entity_id="urn:mace:example.com:saml:roland:sp",
|
||||
#name_id_policy=nameid_policy,
|
||||
name_id=self.name_id,
|
||||
userid="foba0001@example.com",
|
||||
authn=AUTHN,
|
||||
sign_response=True,
|
||||
sign_assertion=True,
|
||||
encrypt_assertion=True,
|
||||
encrypt_assertion_self_contained=True,
|
||||
encrypted_advice_attributes=True,
|
||||
)
|
||||
|
||||
resp_str = "%s" % resp
|
||||
|
||||
resp_str = base64.encodestring(resp_str)
|
||||
|
||||
authn_response = _client.parse_authn_request_response(
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"id1": "http://foo.example.com/service"})
|
||||
|
||||
self.verify_authn_response(idp, authn_response, _client, ava_verify)
|
||||
|
||||
def test_response_8(self):
|
||||
conf = config.SPConfig()
|
||||
conf.load_file("server_conf")
|
||||
_client = Saml2Client(conf)
|
||||
|
||||
idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
|
||||
|
||||
self.name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id1")
|
||||
|
||||
cert_str, cert_key_str = generate_cert()
|
||||
|
||||
cert =\
|
||||
{
|
||||
"cert": cert_str,
|
||||
"key": cert_key_str
|
||||
}
|
||||
|
||||
resp = self.server.create_authn_response(
|
||||
identity=ava,
|
||||
in_response_to="id1",
|
||||
destination="http://lingon.catalogix.se:8087/",
|
||||
sp_entity_id="urn:mace:example.com:saml:roland:sp",
|
||||
#name_id_policy=nameid_policy,
|
||||
name_id=self.name_id,
|
||||
userid="foba0001@example.com",
|
||||
authn=AUTHN,
|
||||
sign_response=True,
|
||||
sign_assertion=True,
|
||||
encrypt_assertion=True,
|
||||
encrypt_assertion_self_contained=True,
|
||||
encrypt_cert_assertion=cert_str
|
||||
)
|
||||
|
||||
resp_str = "%s" % resp
|
||||
|
||||
resp_str = base64.encodestring(resp_str)
|
||||
|
||||
authn_response = _client.parse_authn_request_response(
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"id1": "http://foo.example.com/service"}, {"id1": cert})
|
||||
|
||||
self.verify_authn_response(idp, authn_response, _client, ava_verify)
|
||||
|
||||
@@ -486,7 +704,7 @@ class TestClient:
|
||||
def verify_authn_response(self, idp, authn_response, _client, ava_verify):
|
||||
assert authn_response is not None
|
||||
assert authn_response.issuer() == idp
|
||||
assert authn_response.response.assertion[0].issuer.text == idp
|
||||
assert authn_response.assertion.issuer.text == idp
|
||||
session_info = authn_response.session_info()
|
||||
|
||||
assert session_info["ava"] == ava_verify
|
||||
@@ -634,7 +852,7 @@ class TestClient:
|
||||
assert resp.assertion
|
||||
assert resp.ava == {'givenName': ['Derek'], 'sn': ['Jeter']}
|
||||
|
||||
def test_sign_then_encrypt_assertion_advice(self):
|
||||
def test_sign_then_encrypt_assertion_advice_1(self):
|
||||
# Begin with the IdPs side
|
||||
_sec = self.server.sec
|
||||
|
||||
@@ -712,7 +930,240 @@ class TestClient:
|
||||
assert resp.ava == \
|
||||
{'sn': ['Jeter'], 'givenName': ['Derek'], 'uid': ['test01'], 'email': ['test.testsson@test.se']}
|
||||
|
||||
def test_sign_then_encrypt_assertion_advice_2(self):
|
||||
# Begin with the IdPs side
|
||||
_sec = self.server.sec
|
||||
|
||||
nameid_policy = samlp.NameIDPolicy(allow_create="false",
|
||||
format=saml.NAMEID_FORMAT_PERSISTENT)
|
||||
|
||||
asser_1 = Assertion({"givenName": "Derek"})
|
||||
assertion_1 = asser_1.construct(
|
||||
self.client.config.entityid, "_012345",
|
||||
"http://lingon.catalogix.se:8087/",
|
||||
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
|
||||
policy=self.server.config.getattr("policy", "idp"),
|
||||
issuer=self.server._issuer(),
|
||||
attrconvs=self.server.config.attribute_converters,
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login")
|
||||
|
||||
asser_2 = Assertion({"surName": "Jeter"})
|
||||
assertion_2 = asser_2.construct(
|
||||
self.client.config.entityid, "_012345",
|
||||
"http://lingon.catalogix.se:8087/",
|
||||
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
|
||||
policy=self.server.config.getattr("policy", "idp"),
|
||||
issuer=self.server._issuer(),
|
||||
attrconvs=self.server.config.attribute_converters,
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login")
|
||||
|
||||
a_asser_1 = Assertion({"uid": "test01"})
|
||||
a_assertion_1 = a_asser_1.construct(
|
||||
self.client.config.entityid, "_012345",
|
||||
"http://lingon.catalogix.se:8087/",
|
||||
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
|
||||
policy=self.server.config.getattr("policy", "idp"),
|
||||
issuer=self.server._issuer(),
|
||||
attrconvs=self.server.config.attribute_converters,
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login")
|
||||
|
||||
a_asser_2 = Assertion({"email": "test.testsson@test.se"})
|
||||
a_assertion_2 = a_asser_2.construct(
|
||||
self.client.config.entityid, "_012345",
|
||||
"http://lingon.catalogix.se:8087/",
|
||||
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
|
||||
policy=self.server.config.getattr("policy", "idp"),
|
||||
issuer=self.server._issuer(),
|
||||
attrconvs=self.server.config.attribute_converters,
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login")
|
||||
|
||||
a_asser_3 = Assertion({"street": "street"})
|
||||
a_assertion_3 = a_asser_3.construct(
|
||||
self.client.config.entityid, "_012345",
|
||||
"http://lingon.catalogix.se:8087/",
|
||||
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
|
||||
policy=self.server.config.getattr("policy", "idp"),
|
||||
issuer=self.server._issuer(),
|
||||
attrconvs=self.server.config.attribute_converters,
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login")
|
||||
|
||||
a_asser_4 = Assertion({"title": "title"})
|
||||
a_assertion_4 = a_asser_4.construct(
|
||||
self.client.config.entityid, "_012345",
|
||||
"http://lingon.catalogix.se:8087/",
|
||||
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
|
||||
policy=self.server.config.getattr("policy", "idp"),
|
||||
issuer=self.server._issuer(),
|
||||
attrconvs=self.server.config.attribute_converters,
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login")
|
||||
|
||||
a_assertion_1.signature = sigver.pre_signature_part(
|
||||
a_assertion_1.id, _sec.my_cert, 1)
|
||||
|
||||
a_assertion_2.signature = sigver.pre_signature_part(
|
||||
a_assertion_2.id, _sec.my_cert, 1)
|
||||
|
||||
a_assertion_3.signature = sigver.pre_signature_part(
|
||||
a_assertion_3.id, _sec.my_cert, 1)
|
||||
|
||||
a_assertion_4.signature = sigver.pre_signature_part(
|
||||
a_assertion_4.id, _sec.my_cert, 1)
|
||||
|
||||
assertion_1.signature = sigver.pre_signature_part(assertion_1.id, _sec.my_cert, 1)
|
||||
|
||||
assertion_2.signature = sigver.pre_signature_part(assertion_2.id, _sec.my_cert, 1)
|
||||
|
||||
response = sigver.response_factory(
|
||||
in_response_to="_012345",
|
||||
destination="http://lingon.catalogix.se:8087/",
|
||||
status=s_utils.success_status_factory(),
|
||||
issuer=self.server._issuer()
|
||||
)
|
||||
|
||||
response.assertion = assertion_1
|
||||
|
||||
response.assertion.advice = Advice()
|
||||
|
||||
response.assertion.advice.encrypted_assertion = []
|
||||
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
||||
|
||||
response.assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion_1)
|
||||
|
||||
advice_tag = response.assertion.advice._to_element_tree().tag
|
||||
assertion_tag = a_assertion_1._to_element_tree().tag
|
||||
response = \
|
||||
response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
|
||||
assertion_tag, advice_tag)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(a_assertion_1),
|
||||
key_file=self.server.sec.key_file,
|
||||
node_id=a_assertion_1.id)
|
||||
|
||||
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
|
||||
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.cert_file,
|
||||
pre_encryption_part(), node_xpath=node_xpath)
|
||||
|
||||
response = samlp.response_from_string(enctext)
|
||||
|
||||
response.assertion = response.assertion[0]
|
||||
|
||||
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
||||
response.assertion.advice.encrypted_assertion[1].add_extension_element(a_assertion_2)
|
||||
|
||||
advice_tag = response.assertion.advice._to_element_tree().tag
|
||||
assertion_tag = a_assertion_2._to_element_tree().tag
|
||||
response = \
|
||||
response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
|
||||
assertion_tag, advice_tag)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(a_assertion_2),
|
||||
key_file=self.server.sec.key_file,
|
||||
node_id=a_assertion_2.id)
|
||||
|
||||
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
|
||||
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.cert_file,
|
||||
pre_encryption_part(), node_xpath=node_xpath)
|
||||
|
||||
response = samlp.response_from_string(enctext)
|
||||
|
||||
response.assertion = response.assertion[0]
|
||||
|
||||
assertion_tag = response.assertion._to_element_tree().tag
|
||||
response = pre_encrypt_assertion(response)
|
||||
response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
|
||||
assertion_tag)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(assertion_1),
|
||||
key_file=self.server.sec.key_file,
|
||||
node_id=assertion_1.id)
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.cert_file, pre_encryption_part())
|
||||
|
||||
response = samlp.response_from_string(enctext)
|
||||
|
||||
response.assertion = assertion_2
|
||||
|
||||
response.assertion.advice = Advice()
|
||||
|
||||
response.assertion.advice.encrypted_assertion = []
|
||||
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
||||
|
||||
|
||||
response.assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion_3)
|
||||
|
||||
advice_tag = response.assertion.advice._to_element_tree().tag
|
||||
assertion_tag = a_assertion_3._to_element_tree().tag
|
||||
response = \
|
||||
response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
|
||||
assertion_tag, advice_tag)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(a_assertion_3),
|
||||
key_file=self.server.sec.key_file,
|
||||
node_id=a_assertion_3.id)
|
||||
|
||||
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
|
||||
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.cert_file,
|
||||
pre_encryption_part(), node_xpath=node_xpath)
|
||||
|
||||
response = samlp.response_from_string(enctext)
|
||||
|
||||
response.assertion = response.assertion[0]
|
||||
|
||||
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
||||
|
||||
response.assertion.advice.encrypted_assertion[1].add_extension_element(a_assertion_4)
|
||||
|
||||
advice_tag = response.assertion.advice._to_element_tree().tag
|
||||
assertion_tag = a_assertion_4._to_element_tree().tag
|
||||
response = \
|
||||
response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
|
||||
assertion_tag, advice_tag)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(a_assertion_4),
|
||||
key_file=self.server.sec.key_file,
|
||||
node_id=a_assertion_4.id)
|
||||
|
||||
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
|
||||
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.cert_file,
|
||||
pre_encryption_part(), node_xpath=node_xpath)
|
||||
|
||||
response = samlp.response_from_string(enctext)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(response.assertion[0]),
|
||||
key_file=self.server.sec.key_file,
|
||||
node_id=response.assertion[0].id)
|
||||
|
||||
response = samlp.response_from_string(response)
|
||||
|
||||
#seresp = samlp.response_from_string(enctext)
|
||||
|
||||
resp_str = base64.encodestring("%s" % response)
|
||||
# Now over to the client side
|
||||
resp = self.client.parse_authn_request_response(
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"_012345": "http://foo.example.com/service"})
|
||||
|
||||
#assert resp.encrypted_assertion == []
|
||||
assert resp.assertion
|
||||
assert resp.assertion.advice
|
||||
assert resp.assertion.advice.assertion
|
||||
assert resp.ava == \
|
||||
{'street': ['street'], 'uid': ['test01'], 'title': ['title'], 'givenName': ['Derek'], 'email':
|
||||
['test.testsson@test.se'], 'sn': ['Jeter']}
|
||||
|
||||
def test_signed_redirect(self):
|
||||
|
||||
@@ -898,4 +1349,4 @@ class TestClientWithDummy():
|
||||
if __name__ == "__main__":
|
||||
tc = TestClient()
|
||||
tc.setup_class()
|
||||
tc.test_response_3()
|
||||
tc.test_response_8()
|
||||
|
Reference in New Issue
Block a user