Partial commit.
Improved signing and added more testcases.
This commit is contained in:
@@ -512,8 +512,14 @@ class Entity(HTTPBase):
|
|||||||
exception = None
|
exception = None
|
||||||
for _cert in _certs:
|
for _cert in _certs:
|
||||||
try:
|
try:
|
||||||
|
begin_cert = "-----BEGIN CERTIFICATE-----\n"
|
||||||
|
end_cert = "\n-----END CERTIFICATE-----\n"
|
||||||
|
if begin_cert not in _cert:
|
||||||
|
_cert = "%s%s" % (begin_cert, _cert)
|
||||||
|
if end_cert not in _cert:
|
||||||
|
_cert = "%s%s" % (_cert, end_cert)
|
||||||
_, cert_file = make_temp(_cert, decode=False)
|
_, cert_file = make_temp(_cert, decode=False)
|
||||||
response = cbxs.encrypt_assertion(response, self.sec.cert_file,
|
response = cbxs.encrypt_assertion(response, cert_file,
|
||||||
pre_encryption_part(), node_xpath=node_xpath)
|
pre_encryption_part(), node_xpath=node_xpath)
|
||||||
return response
|
return response
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
@@ -525,7 +531,7 @@ class Entity(HTTPBase):
|
|||||||
def _response(self, in_response_to, consumer_url=None, status=None,
|
def _response(self, in_response_to, consumer_url=None, status=None,
|
||||||
issuer=None, sign=False, to_sign=None, sp_entity_id=None,
|
issuer=None, sign=False, to_sign=None, sp_entity_id=None,
|
||||||
encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False,
|
encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False,
|
||||||
encrypt_cert=None, encrypt_cert_assertion=None,sign_assertion=None, **kwargs):
|
encrypt_cert_advice=None, encrypt_cert_assertion=None,sign_assertion=None, **kwargs):
|
||||||
""" Create a Response.
|
""" Create a Response.
|
||||||
Encryption:
|
Encryption:
|
||||||
encrypt_assertion must be true for encryption to be performed. If encrypted_advice_attributes also is
|
encrypt_assertion must be true for encryption to be performed. If encrypted_advice_attributes also is
|
||||||
@@ -598,7 +604,7 @@ class Entity(HTTPBase):
|
|||||||
|
|
||||||
if to_sign_advice:
|
if to_sign_advice:
|
||||||
response = signed_instance_factory(response, self.sec, to_sign_advice)
|
response = signed_instance_factory(response, self.sec, to_sign_advice)
|
||||||
response = self._encrypt_assertion(encrypt_cert, sp_entity_id, response, node_xpath=node_xpath)
|
response = self._encrypt_assertion(encrypt_cert_advice, sp_entity_id, response, node_xpath=node_xpath)
|
||||||
if encrypt_assertion:
|
if encrypt_assertion:
|
||||||
response = response_from_string(response)
|
response = response_from_string(response)
|
||||||
if encrypt_assertion:
|
if encrypt_assertion:
|
||||||
|
@@ -323,7 +323,7 @@ class Server(Entity):
|
|||||||
status=None, authn=None, issuer=None, policy=None,
|
status=None, authn=None, issuer=None, policy=None,
|
||||||
sign_assertion=False, sign_response=False,
|
sign_assertion=False, sign_response=False,
|
||||||
best_effort=False, encrypt_assertion=False,
|
best_effort=False, encrypt_assertion=False,
|
||||||
encrypt_cert=None, authn_statement=None,
|
encrypt_cert_advice=None, encrypt_cert_assertion=None, authn_statement=None,
|
||||||
encrypt_assertion_self_contained=False, encrypted_advice_attributes=False):
|
encrypt_assertion_self_contained=False, encrypted_advice_attributes=False):
|
||||||
""" Create a response. A layer of indirection.
|
""" Create a response. A layer of indirection.
|
||||||
|
|
||||||
@@ -375,16 +375,17 @@ class Server(Entity):
|
|||||||
sign_response)
|
sign_response)
|
||||||
|
|
||||||
to_sign = []
|
to_sign = []
|
||||||
#if sign_assertion is not None and sign_assertion:
|
if not encrypt_assertion:
|
||||||
# if assertion.advice and assertion.advice.assertion:
|
if sign_assertion:
|
||||||
# for tmp_assertion in assertion.advice.assertion:
|
assertion.signature = pre_signature_part(assertion.id, self.sec.my_cert, 1)
|
||||||
# tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1)
|
to_sign.append((class_name(assertion), assertion.id))
|
||||||
# to_sign.append((class_name(tmp_assertion), tmp_assertion.id))
|
|
||||||
# assertion.signature = pre_signature_part(assertion.id,
|
|
||||||
# self.sec.my_cert, 1)
|
|
||||||
# Just the assertion or the response and the assertion ?
|
|
||||||
# to_sign.append((class_name(assertion), assertion.id))
|
|
||||||
|
|
||||||
|
if not encrypted_advice_attributes:
|
||||||
|
if sign_assertion:
|
||||||
|
if assertion.advice and assertion.advice.assertion:
|
||||||
|
for tmp_assertion in assertion.advice.assertion:
|
||||||
|
tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1)
|
||||||
|
to_sign.append((class_name(tmp_assertion), tmp_assertion.id))
|
||||||
|
|
||||||
# Store which assertion that has been sent to which SP about which
|
# Store which assertion that has been sent to which SP about which
|
||||||
# subject.
|
# subject.
|
||||||
@@ -400,7 +401,8 @@ class Server(Entity):
|
|||||||
|
|
||||||
return self._response(in_response_to, consumer_url, status, issuer,
|
return self._response(in_response_to, consumer_url, status, issuer,
|
||||||
sign_response, to_sign,sp_entity_id=sp_entity_id, encrypt_assertion=encrypt_assertion,
|
sign_response, to_sign,sp_entity_id=sp_entity_id, encrypt_assertion=encrypt_assertion,
|
||||||
encrypt_cert=encrypt_cert,
|
encrypt_cert_advice=encrypt_cert_advice,
|
||||||
|
encrypt_cert_assertion=encrypt_cert_assertion,
|
||||||
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
|
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
|
||||||
encrypted_advice_attributes=encrypted_advice_attributes,sign_assertion=sign_assertion,
|
encrypted_advice_attributes=encrypted_advice_attributes,sign_assertion=sign_assertion,
|
||||||
**args)
|
**args)
|
||||||
@@ -477,8 +479,8 @@ class Server(Entity):
|
|||||||
sp_entity_id, name_id_policy=None, userid=None,
|
sp_entity_id, name_id_policy=None, userid=None,
|
||||||
name_id=None, authn=None, issuer=None,
|
name_id=None, authn=None, issuer=None,
|
||||||
sign_response=None, sign_assertion=None,
|
sign_response=None, sign_assertion=None,
|
||||||
encrypt_cert=None, encrypt_assertion=None,
|
encrypt_cert_advice=None, encrypt_cert_assertion=None, encrypt_assertion=None,
|
||||||
encrypt_assertion_self_contained=False,
|
encrypt_assertion_self_contained=True,
|
||||||
encrypted_advice_attributes=False,
|
encrypted_advice_attributes=False,
|
||||||
**kwargs):
|
**kwargs):
|
||||||
""" Constructs an AuthenticationResponse
|
""" Constructs an AuthenticationResponse
|
||||||
@@ -523,17 +525,35 @@ class Server(Entity):
|
|||||||
if encrypt_assertion is None:
|
if encrypt_assertion is None:
|
||||||
encrypt_assertion = False
|
encrypt_assertion = False
|
||||||
|
|
||||||
if encrypt_assertion:
|
|
||||||
if encrypt_cert is not None:
|
if encrypt_assertion_self_contained is None:
|
||||||
verify_encrypt_cert = self.config.getattr("verify_encrypt_cert", "idp")
|
encrypt_assertion_self_contained = self.config.getattr("encrypt_assertion_self_contained", "idp")
|
||||||
|
if encrypt_assertion_self_contained is None:
|
||||||
|
encrypt_assertion_self_contained = True
|
||||||
|
|
||||||
|
if encrypted_advice_attributes is None:
|
||||||
|
encrypted_advice_attributes = self.config.getattr("encrypted_advice_attributes", "idp")
|
||||||
|
if encrypted_advice_attributes is None:
|
||||||
|
encrypted_advice_attributes = False
|
||||||
|
|
||||||
|
if encrypted_advice_attributes:
|
||||||
|
verify_encrypt_cert = self.config.getattr("verify_encrypt_cert_advice", "idp")
|
||||||
if verify_encrypt_cert is not None:
|
if verify_encrypt_cert is not None:
|
||||||
if not verify_encrypt_cert(encrypt_cert):
|
if encrypt_cert_advice is None:
|
||||||
raise CertificateError("Invalid certificate for encryption!")
|
|
||||||
else:
|
|
||||||
raise CertificateError("No SPCertEncType certificate for encryption contained in authentication "
|
raise CertificateError("No SPCertEncType certificate for encryption contained in authentication "
|
||||||
"request.")
|
"request.")
|
||||||
else:
|
if not verify_encrypt_cert(encrypt_cert_advice):
|
||||||
encrypt_assertion = False
|
raise CertificateError("Invalid certificate for encryption!")
|
||||||
|
|
||||||
|
|
||||||
|
if encrypt_assertion:
|
||||||
|
verify_encrypt_cert = self.config.getattr("verify_encrypt_cert_assertion", "idp")
|
||||||
|
if verify_encrypt_cert is not None:
|
||||||
|
if encrypt_cert_assertion is None:
|
||||||
|
raise CertificateError("No SPCertEncType certificate for encryption contained in authentication "
|
||||||
|
"request.")
|
||||||
|
if not verify_encrypt_cert(encrypt_cert_assertion):
|
||||||
|
raise CertificateError("Invalid certificate for encryption!")
|
||||||
|
|
||||||
if not name_id:
|
if not name_id:
|
||||||
try:
|
try:
|
||||||
@@ -593,7 +613,8 @@ class Server(Entity):
|
|||||||
encrypt_assertion=encrypt_assertion,
|
encrypt_assertion=encrypt_assertion,
|
||||||
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
|
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
|
||||||
encrypted_advice_attributes=encrypted_advice_attributes,
|
encrypted_advice_attributes=encrypted_advice_attributes,
|
||||||
encrypt_cert=encrypt_cert)
|
encrypt_cert_advice=encrypt_cert_advice,
|
||||||
|
encrypt_cert_assertion=encrypt_cert_assertion)
|
||||||
return self._authn_response(in_response_to, # in_response_to
|
return self._authn_response(in_response_to, # in_response_to
|
||||||
destination, # consumer_url
|
destination, # consumer_url
|
||||||
sp_entity_id, # sp_entity_id
|
sp_entity_id, # sp_entity_id
|
||||||
@@ -608,7 +629,8 @@ class Server(Entity):
|
|||||||
encrypt_assertion=encrypt_assertion,
|
encrypt_assertion=encrypt_assertion,
|
||||||
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
|
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
|
||||||
encrypted_advice_attributes=encrypted_advice_attributes,
|
encrypted_advice_attributes=encrypted_advice_attributes,
|
||||||
encrypt_cert=encrypt_cert)
|
encrypt_cert_advice=encrypt_cert_advice,
|
||||||
|
encrypt_cert_assertion=encrypt_cert_assertion)
|
||||||
|
|
||||||
except MissingValue as exc:
|
except MissingValue as exc:
|
||||||
return self.create_error_response(in_response_to, destination,
|
return self.create_error_response(in_response_to, destination,
|
||||||
|
@@ -90,10 +90,41 @@ class TestServer1():
|
|||||||
conf = config.SPConfig()
|
conf = config.SPConfig()
|
||||||
conf.load_file("server_conf")
|
conf.load_file("server_conf")
|
||||||
self.client = client.Saml2Client(conf)
|
self.client = client.Saml2Client(conf)
|
||||||
|
self.name_id = self.server.ident.transient_nameid(
|
||||||
|
"urn:mace:example.com:saml:roland:sp", "id12")
|
||||||
|
self.ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
||||||
|
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
||||||
|
|
||||||
def teardown_class(self):
|
def teardown_class(self):
|
||||||
self.server.close()
|
self.server.close()
|
||||||
|
|
||||||
|
def verify_assertion(self, assertion):
|
||||||
|
assert assertion
|
||||||
|
assert assertion[0].attribute_statement
|
||||||
|
|
||||||
|
ava = ava = get_ava(assertion[0])
|
||||||
|
|
||||||
|
assert ava ==\
|
||||||
|
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
||||||
|
|
||||||
|
|
||||||
|
def verify_encrypted_assertion(self, assertion, decr_text):
|
||||||
|
self.verify_assertion(assertion)
|
||||||
|
assert assertion[0].signature is None
|
||||||
|
|
||||||
|
assert 'EncryptedAssertion><encas1:Assertion xmlns:encas0="http://www.w3.org/2001/XMLSchema-instance" ' \
|
||||||
|
'xmlns:encas1="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
|
||||||
|
|
||||||
|
def verify_advice_assertion(self, resp, decr_text):
|
||||||
|
assert resp.assertion[0].signature is None
|
||||||
|
|
||||||
|
assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
|
||||||
|
|
||||||
|
assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
|
||||||
|
[saml, samlp])
|
||||||
|
self.verify_encrypted_assertion(assertion, decr_text)
|
||||||
|
|
||||||
|
|
||||||
def test_issuer(self):
|
def test_issuer(self):
|
||||||
issuer = self.server._issuer()
|
issuer = self.server._issuer()
|
||||||
assert isinstance(issuer, saml.Issuer)
|
assert isinstance(issuer, saml.Issuer)
|
||||||
@@ -414,21 +445,71 @@ class TestServer1():
|
|||||||
# value. Just that there should be one
|
# value. Just that there should be one
|
||||||
assert assertion.signature.signature_value.text != ""
|
assert assertion.signature.signature_value.text != ""
|
||||||
|
|
||||||
|
def test_signed_response_1(self):
|
||||||
|
|
||||||
|
|
||||||
|
signed_resp = self.server.create_authn_response(
|
||||||
|
self.ava,
|
||||||
|
"id12", # in_response_to
|
||||||
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
|
name_id=self.name_id,
|
||||||
|
sign_response=True,
|
||||||
|
sign_assertion=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
sresponse = response_from_string(signed_resp)
|
||||||
|
|
||||||
|
valid = self.server.sec.verify_signature(signed_resp,
|
||||||
|
self.server.config.cert_file,
|
||||||
|
node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
|
||||||
|
node_id=sresponse.id,
|
||||||
|
id_attr="")
|
||||||
|
assert valid
|
||||||
|
|
||||||
|
valid = self.server.sec.verify_signature(signed_resp,
|
||||||
|
self.server.config.cert_file,
|
||||||
|
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
|
||||||
|
node_id=sresponse.assertion[0].id,
|
||||||
|
id_attr="")
|
||||||
|
assert valid
|
||||||
|
|
||||||
|
self.verify_assertion(sresponse.assertion)
|
||||||
|
|
||||||
|
def test_signed_response_2(self):
|
||||||
|
signed_resp = self.server.create_authn_response(
|
||||||
|
self.ava,
|
||||||
|
"id12", # in_response_to
|
||||||
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
|
name_id=self.name_id,
|
||||||
|
sign_response=True,
|
||||||
|
sign_assertion=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
sresponse = response_from_string(signed_resp)
|
||||||
|
|
||||||
|
valid = self.server.sec.verify_signature(signed_resp,
|
||||||
|
self.server.config.cert_file,
|
||||||
|
node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
|
||||||
|
node_id=sresponse.id,
|
||||||
|
id_attr="")
|
||||||
|
assert valid
|
||||||
|
|
||||||
|
assert sresponse.assertion[0].signature == None
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def test_encrypted_signed_response_1(self):
|
def test_encrypted_signed_response_1(self):
|
||||||
name_id = self.server.ident.transient_nameid(
|
|
||||||
"urn:mace:example.com:saml:roland:sp", "id12")
|
|
||||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
|
||||||
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
|
||||||
|
|
||||||
cert_str, cert_key_str = generate_cert()
|
cert_str, cert_key_str = generate_cert()
|
||||||
|
|
||||||
signed_resp = self.server.create_authn_response(
|
signed_resp = self.server.create_authn_response(
|
||||||
ava,
|
self.ava,
|
||||||
"id12", # in_response_to
|
"id12", # in_response_to
|
||||||
"http://lingon.catalogix.se:8087/", # consumer_url
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
name_id=name_id,
|
name_id=self.name_id,
|
||||||
sign_response=True,
|
sign_response=True,
|
||||||
sign_assertion=True,
|
sign_assertion=True,
|
||||||
encrypt_assertion=True,
|
encrypt_assertion=True,
|
||||||
@@ -483,19 +564,14 @@ class TestServer1():
|
|||||||
assert valid
|
assert valid
|
||||||
|
|
||||||
def test_encrypted_signed_response_2(self):
|
def test_encrypted_signed_response_2(self):
|
||||||
name_id = self.server.ident.transient_nameid(
|
|
||||||
"urn:mace:example.com:saml:roland:sp", "id12")
|
|
||||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
|
||||||
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
|
||||||
|
|
||||||
cert_str, cert_key_str = generate_cert()
|
cert_str, cert_key_str = generate_cert()
|
||||||
|
|
||||||
signed_resp = self.server.create_authn_response(
|
signed_resp = self.server.create_authn_response(
|
||||||
ava,
|
self.ava,
|
||||||
"id12", # in_response_to
|
"id12", # in_response_to
|
||||||
"http://lingon.catalogix.se:8087/", # consumer_url
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
name_id=name_id,
|
name_id=self.name_id,
|
||||||
sign_response=True,
|
sign_response=True,
|
||||||
sign_assertion=True,
|
sign_assertion=True,
|
||||||
encrypt_assertion=True,
|
encrypt_assertion=True,
|
||||||
@@ -541,19 +617,14 @@ class TestServer1():
|
|||||||
assert valid
|
assert valid
|
||||||
|
|
||||||
def test_encrypted_signed_response_3(self):
|
def test_encrypted_signed_response_3(self):
|
||||||
name_id = self.server.ident.transient_nameid(
|
|
||||||
"urn:mace:example.com:saml:roland:sp", "id12")
|
|
||||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
|
||||||
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
|
||||||
|
|
||||||
cert_str, cert_key_str = generate_cert()
|
cert_str, cert_key_str = generate_cert()
|
||||||
|
|
||||||
signed_resp = self.server.create_authn_response(
|
signed_resp = self.server.create_authn_response(
|
||||||
ava,
|
self.ava,
|
||||||
"id12", # in_response_to
|
"id12", # in_response_to
|
||||||
"http://lingon.catalogix.se:8087/", # consumer_url
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
name_id=name_id,
|
name_id=self.name_id,
|
||||||
sign_response=True,
|
sign_response=True,
|
||||||
sign_assertion=True,
|
sign_assertion=True,
|
||||||
encrypt_assertion=True,
|
encrypt_assertion=True,
|
||||||
@@ -596,19 +667,14 @@ class TestServer1():
|
|||||||
assert valid
|
assert valid
|
||||||
|
|
||||||
def test_encrypted_signed_response_4(self):
|
def test_encrypted_signed_response_4(self):
|
||||||
name_id = self.server.ident.transient_nameid(
|
|
||||||
"urn:mace:example.com:saml:roland:sp", "id12")
|
|
||||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
|
||||||
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
|
||||||
|
|
||||||
cert_str, cert_key_str = generate_cert()
|
cert_str, cert_key_str = generate_cert()
|
||||||
|
|
||||||
signed_resp = self.server.create_authn_response(
|
signed_resp = self.server.create_authn_response(
|
||||||
ava,
|
self.ava,
|
||||||
"id12", # in_response_to
|
"id12", # in_response_to
|
||||||
"http://lingon.catalogix.se:8087/", # consumer_url
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
name_id=name_id,
|
name_id=self.name_id,
|
||||||
sign_response=True,
|
sign_response=True,
|
||||||
sign_assertion=True,
|
sign_assertion=True,
|
||||||
encrypt_assertion=True,
|
encrypt_assertion=True,
|
||||||
@@ -663,104 +729,218 @@ class TestServer1():
|
|||||||
assert valid
|
assert valid
|
||||||
|
|
||||||
def test_encrypted_response_1(self):
|
def test_encrypted_response_1(self):
|
||||||
name_id = self.server.ident.transient_nameid(
|
cert_str_advice, cert_key_str_advice = generate_cert()
|
||||||
"urn:mace:example.com:saml:roland:sp", "id12")
|
|
||||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
|
||||||
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
|
||||||
|
|
||||||
cert_str, cert_key_str = generate_cert()
|
_resp = self.server.create_authn_response(
|
||||||
|
self.ava,
|
||||||
signed_resp = self.server.create_authn_response(
|
|
||||||
ava,
|
|
||||||
"id12", # in_response_to
|
"id12", # in_response_to
|
||||||
"http://lingon.catalogix.se:8087/", # consumer_url
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
name_id=name_id,
|
name_id=self.name_id,
|
||||||
sign_response=False,
|
sign_response=False,
|
||||||
sign_assertion=False,
|
sign_assertion=False,
|
||||||
encrypt_assertion=False,
|
encrypt_assertion=False,
|
||||||
encrypt_assertion_self_contained=True,
|
encrypt_assertion_self_contained=True,
|
||||||
encrypted_advice_attributes=True,
|
encrypted_advice_attributes=True,
|
||||||
encrypt_cert=cert_str,
|
encrypt_cert_advice=cert_str_advice,
|
||||||
)
|
)
|
||||||
|
|
||||||
sresponse = response_from_string(signed_resp)
|
sresponse = response_from_string(_resp)
|
||||||
|
|
||||||
assert sresponse.signature is None
|
assert sresponse.signature is None
|
||||||
|
|
||||||
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
_, key_file = make_temp("%s" % cert_key_str_advice, decode=False)
|
||||||
|
|
||||||
decr_text = self.server.sec.decrypt(signed_resp, key_file)
|
decr_text = self.server.sec.decrypt(_resp, key_file)
|
||||||
|
|
||||||
resp = samlp.response_from_string(decr_text)
|
resp = samlp.response_from_string(decr_text)
|
||||||
|
|
||||||
assert resp.assertion[0].signature is None
|
self.verify_advice_assertion(resp, decr_text)
|
||||||
|
|
||||||
assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
|
|
||||||
|
|
||||||
assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
|
|
||||||
[saml, samlp])
|
|
||||||
assert assertion
|
|
||||||
assert assertion[0].attribute_statement
|
|
||||||
|
|
||||||
ava = ava = get_ava(assertion[0])
|
|
||||||
|
|
||||||
assert ava ==\
|
|
||||||
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
|
||||||
|
|
||||||
assert 'EncryptedAssertion><encas1:Assertion xmlns:encas0="http://www.w3.org/2001/XMLSchema-instance" ' \
|
|
||||||
'xmlns:encas1="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
|
|
||||||
|
|
||||||
assert assertion[0].signature is None
|
|
||||||
|
|
||||||
def test_encrypted_response_2(self):
|
def test_encrypted_response_2(self):
|
||||||
name_id = self.server.ident.transient_nameid(
|
|
||||||
"urn:mace:example.com:saml:roland:sp", "id12")
|
|
||||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
|
||||||
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
|
|
||||||
|
|
||||||
cert_str, cert_key_str = generate_cert()
|
cert_str_advice, cert_key_str_advice = generate_cert()
|
||||||
|
|
||||||
signed_resp = self.server.create_authn_response(
|
_resp = self.server.create_authn_response(
|
||||||
ava,
|
self.ava,
|
||||||
"id12", # in_response_to
|
"id12", # in_response_to
|
||||||
"http://lingon.catalogix.se:8087/", # consumer_url
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
name_id=name_id,
|
name_id=self.name_id,
|
||||||
|
sign_response=False,
|
||||||
|
sign_assertion=False,
|
||||||
|
encrypt_assertion=True,
|
||||||
|
encrypt_assertion_self_contained=True,
|
||||||
|
encrypted_advice_attributes=True,
|
||||||
|
encrypt_cert_advice=cert_str_advice,
|
||||||
|
)
|
||||||
|
|
||||||
|
sresponse = response_from_string(_resp)
|
||||||
|
|
||||||
|
assert sresponse.signature is None
|
||||||
|
|
||||||
|
decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.key_file)
|
||||||
|
|
||||||
|
_, key_file = make_temp("%s" % cert_key_str_advice, decode=False)
|
||||||
|
|
||||||
|
decr_text_2 = self.server.sec.decrypt(decr_text_1, key_file)
|
||||||
|
|
||||||
|
resp = samlp.response_from_string(decr_text_2)
|
||||||
|
|
||||||
|
resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
||||||
|
|
||||||
|
self.verify_advice_assertion(resp, decr_text_2)
|
||||||
|
|
||||||
|
def test_encrypted_response_3(self):
|
||||||
|
cert_str_assertion, cert_key_str_assertion = generate_cert()
|
||||||
|
|
||||||
|
_resp = self.server.create_authn_response(
|
||||||
|
self.ava,
|
||||||
|
"id12", # in_response_to
|
||||||
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
|
name_id=self.name_id,
|
||||||
sign_response=False,
|
sign_response=False,
|
||||||
sign_assertion=False,
|
sign_assertion=False,
|
||||||
encrypt_assertion=True,
|
encrypt_assertion=True,
|
||||||
encrypt_assertion_self_contained=True,
|
encrypt_assertion_self_contained=True,
|
||||||
encrypted_advice_attributes=False,
|
encrypted_advice_attributes=False,
|
||||||
encrypt_cert=cert_str,
|
encrypt_cert_assertion=cert_str_assertion
|
||||||
)
|
)
|
||||||
|
|
||||||
sresponse = response_from_string(signed_resp)
|
sresponse = response_from_string(_resp)
|
||||||
|
|
||||||
assert sresponse.signature is None
|
assert sresponse.signature is None
|
||||||
|
|
||||||
_, key_file = make_temp("%s" % cert_key_str, decode=False)
|
_, key_file = make_temp("%s" % cert_key_str_assertion, decode=False)
|
||||||
|
|
||||||
decr_text = self.server.sec.decrypt(signed_resp, key_file)
|
decr_text = self.server.sec.decrypt(_resp, key_file)
|
||||||
|
|
||||||
resp = samlp.response_from_string(decr_text)
|
resp = samlp.response_from_string(decr_text)
|
||||||
|
|
||||||
assert resp.encrypted_assertion[0].extension_elements
|
assert resp.encrypted_assertion[0].extension_elements
|
||||||
|
|
||||||
assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
||||||
assert assertion
|
|
||||||
assert assertion[0].attribute_statement
|
|
||||||
|
|
||||||
ava = ava = get_ava(assertion[0])
|
self.verify_encrypted_assertion(assertion, decr_text)
|
||||||
|
|
||||||
assert ava ==\
|
def test_encrypted_response_4(self):
|
||||||
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
|
_resp = self.server.create_authn_response(
|
||||||
|
self.ava,
|
||||||
|
"id12", # in_response_to
|
||||||
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
|
name_id=self.name_id,
|
||||||
|
sign_response=False,
|
||||||
|
sign_assertion=False,
|
||||||
|
encrypt_assertion=True,
|
||||||
|
encrypt_assertion_self_contained=True,
|
||||||
|
encrypted_advice_attributes=False,
|
||||||
|
)
|
||||||
|
|
||||||
assert 'EncryptedAssertion><encas1:Assertion xmlns:encas0="http://www.w3.org/2001/XMLSchema-instance" ' \
|
sresponse = response_from_string(_resp)
|
||||||
'xmlns:encas1="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
|
|
||||||
|
|
||||||
assert assertion[0].signature is None
|
assert sresponse.signature is None
|
||||||
|
|
||||||
|
decr_text = self.server.sec.decrypt(_resp, self.client.config.key_file)
|
||||||
|
|
||||||
|
resp = samlp.response_from_string(decr_text)
|
||||||
|
|
||||||
|
assert resp.encrypted_assertion[0].extension_elements
|
||||||
|
|
||||||
|
assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
||||||
|
|
||||||
|
self.verify_encrypted_assertion(assertion, decr_text)
|
||||||
|
|
||||||
|
def test_encrypted_response_5(self):
|
||||||
|
_resp = self.server.create_authn_response(
|
||||||
|
self.ava,
|
||||||
|
"id12", # in_response_to
|
||||||
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
|
name_id=self.name_id,
|
||||||
|
sign_response=False,
|
||||||
|
sign_assertion=False,
|
||||||
|
encrypt_assertion=False,
|
||||||
|
encrypt_assertion_self_contained=True,
|
||||||
|
encrypted_advice_attributes=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
sresponse = response_from_string(_resp)
|
||||||
|
|
||||||
|
assert sresponse.signature is None
|
||||||
|
|
||||||
|
decr_text = self.server.sec.decrypt(_resp, self.client.config.key_file)
|
||||||
|
|
||||||
|
resp = samlp.response_from_string(decr_text)
|
||||||
|
|
||||||
|
self.verify_advice_assertion(resp, decr_text)
|
||||||
|
|
||||||
|
def test_encrypted_response_6(self):
|
||||||
|
cert_str_advice, cert_key_str_advice = generate_cert()
|
||||||
|
|
||||||
|
cert_str_assertion, cert_key_str_assertion = generate_cert()
|
||||||
|
|
||||||
|
_resp = self.server.create_authn_response(
|
||||||
|
self.ava,
|
||||||
|
"id12", # in_response_to
|
||||||
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
|
name_id=self.name_id,
|
||||||
|
sign_response=False,
|
||||||
|
sign_assertion=False,
|
||||||
|
encrypt_assertion=True,
|
||||||
|
encrypt_assertion_self_contained=True,
|
||||||
|
encrypted_advice_attributes=True,
|
||||||
|
encrypt_cert_advice=cert_str_advice,
|
||||||
|
encrypt_cert_assertion=cert_str_assertion
|
||||||
|
)
|
||||||
|
|
||||||
|
sresponse = response_from_string(_resp)
|
||||||
|
|
||||||
|
assert sresponse.signature is None
|
||||||
|
|
||||||
|
_, key_file = make_temp("%s" % cert_key_str_assertion, decode=False)
|
||||||
|
|
||||||
|
decr_text_1 = self.server.sec.decrypt(_resp, key_file)
|
||||||
|
|
||||||
|
_, key_file = make_temp("%s" % cert_key_str_advice, decode=False)
|
||||||
|
|
||||||
|
decr_text_2 = self.server.sec.decrypt(decr_text_1, key_file)
|
||||||
|
|
||||||
|
resp = samlp.response_from_string(decr_text_2)
|
||||||
|
|
||||||
|
resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
||||||
|
|
||||||
|
self.verify_advice_assertion(resp, decr_text_2)
|
||||||
|
|
||||||
|
def test_encrypted_response_7(self):
|
||||||
|
_resp = self.server.create_authn_response(
|
||||||
|
self.ava,
|
||||||
|
"id12", # in_response_to
|
||||||
|
"http://lingon.catalogix.se:8087/", # consumer_url
|
||||||
|
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
|
||||||
|
name_id=self.name_id,
|
||||||
|
sign_response=False,
|
||||||
|
sign_assertion=False,
|
||||||
|
encrypt_assertion=True,
|
||||||
|
encrypt_assertion_self_contained=True,
|
||||||
|
encrypted_advice_attributes=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
sresponse = response_from_string(_resp)
|
||||||
|
|
||||||
|
assert sresponse.signature is None
|
||||||
|
|
||||||
|
decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.key_file)
|
||||||
|
|
||||||
|
decr_text_2 = self.server.sec.decrypt(decr_text_1, self.client.config.key_file)
|
||||||
|
|
||||||
|
resp = samlp.response_from_string(decr_text_2)
|
||||||
|
|
||||||
|
resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
|
||||||
|
|
||||||
|
self.verify_advice_assertion(resp, decr_text_2)
|
||||||
|
|
||||||
def test_slo_http_post(self):
|
def test_slo_http_post(self):
|
||||||
soon = time_util.in_a_while(days=1)
|
soon = time_util.in_a_while(days=1)
|
||||||
@@ -899,4 +1079,4 @@ class TestServerLogout():
|
|||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
ts = TestServer1()
|
ts = TestServer1()
|
||||||
ts.setup_class()
|
ts.setup_class()
|
||||||
ts.test_encrypted_response_1()
|
ts.test_signed_response_1()
|
||||||
|
Reference in New Issue
Block a user