Fixed bug due to the refactoring.
This commit is contained in:
@@ -18,6 +18,7 @@ from saml2 import saml
|
||||
from saml2 import element_to_extension_element
|
||||
from saml2 import class_name
|
||||
from saml2 import BINDING_HTTP_REDIRECT
|
||||
from saml2.argtree import add_path
|
||||
|
||||
from saml2.entity import Entity
|
||||
from saml2.eptid import Eptid
|
||||
@@ -290,7 +291,7 @@ class Server(Entity):
|
||||
|
||||
def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url,
|
||||
name_id, policy, _issuer, authn_statement, identity,
|
||||
best_effort, sign_response, farg, **kwargs):
|
||||
best_effort, sign_response, farg=None, **kwargs):
|
||||
"""
|
||||
Construct and return the Assertion
|
||||
|
||||
@@ -322,14 +323,17 @@ class Server(Entity):
|
||||
return self.create_error_response(in_response_to, consumer_url,
|
||||
exc, sign_response)
|
||||
|
||||
try:
|
||||
subject_confirmation_specs = kwargs['subject_confirmation']
|
||||
except KeyError:
|
||||
subject_confirmation_data = {
|
||||
'recipient': consumer_url,
|
||||
'in_response_to': in_response_to,
|
||||
'method': saml.SCM_BEARER
|
||||
}
|
||||
if not farg:
|
||||
farg = add_path(
|
||||
{},
|
||||
['assertion', 'subject', 'subject_confirmation', 'method',
|
||||
saml.SCM_BEARER])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'in_response_to', in_response_to])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'recipient', consumer_url])
|
||||
|
||||
if authn: # expected to be a dictionary
|
||||
# Would like to use dict comprehension but ...
|
||||
@@ -427,29 +431,23 @@ class Server(Entity):
|
||||
if pefim:
|
||||
encrypted_advice_attributes = True
|
||||
encrypt_assertion_self_contained = True
|
||||
assertion_attributes = self.setup_assertion(None, sp_entity_id,
|
||||
None, None, None,
|
||||
policy,
|
||||
None, None, identity,
|
||||
best_effort,
|
||||
sign_response, False,
|
||||
**assertion_args)
|
||||
assertion = self.setup_assertion(authn, sp_entity_id,
|
||||
ass_in_response_to, consumer_url,
|
||||
name_id, policy, _issuer,
|
||||
authn_statement, [], True,
|
||||
sign_response, **assertion_args)
|
||||
assertion_attributes = self.setup_assertion(
|
||||
None, sp_entity_id, None, None, None, policy, None, None,
|
||||
identity, best_effort, sign_response, farg=assertion_args)
|
||||
assertion = self.setup_assertion(
|
||||
authn, sp_entity_id, ass_in_response_to, consumer_url, name_id,
|
||||
policy, _issuer, authn_statement, [], True, sign_response,
|
||||
farg=assertion_args)
|
||||
assertion.advice = saml.Advice()
|
||||
|
||||
# assertion.advice.assertion_id_ref.append(saml.AssertionIDRef())
|
||||
# assertion.advice.assertion_uri_ref.append(saml.AssertionURIRef())
|
||||
assertion.advice.assertion.append(assertion_attributes)
|
||||
else:
|
||||
assertion = self.setup_assertion(authn, sp_entity_id,
|
||||
ass_in_response_to, consumer_url,
|
||||
name_id, policy, _issuer,
|
||||
authn_statement, identity, True,
|
||||
sign_response, **assertion_args)
|
||||
assertion = self.setup_assertion(
|
||||
authn, sp_entity_id, ass_in_response_to, consumer_url, name_id,
|
||||
policy, _issuer, authn_statement, identity, True,
|
||||
sign_response, farg=assertion_args)
|
||||
|
||||
to_sign = []
|
||||
if not encrypt_assertion:
|
||||
@@ -484,7 +482,7 @@ class Server(Entity):
|
||||
status=None, issuer=None,
|
||||
sign_assertion=False, sign_response=False,
|
||||
attributes=None, sign_alg=None,
|
||||
digest_alg=None, **kwargs):
|
||||
digest_alg=None, farg=None, **kwargs):
|
||||
""" Create an attribute assertion response.
|
||||
|
||||
:param identity: A dictionary with attributes and values that are
|
||||
@@ -516,6 +514,19 @@ class Server(Entity):
|
||||
to_sign = []
|
||||
|
||||
if identity:
|
||||
if not farg:
|
||||
farg = add_path(
|
||||
{},
|
||||
['assertion', 'subject', 'subject_confirmation', 'method',
|
||||
saml.SCM_BEARER])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'in_response_to',
|
||||
in_response_to])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'recipient', destination])
|
||||
|
||||
_issuer = self._issuer(issuer)
|
||||
ast = Assertion(identity)
|
||||
if policy:
|
||||
@@ -527,19 +538,10 @@ class Server(Entity):
|
||||
restr = restriction_from_attribute_spec(attributes)
|
||||
ast = filter_attribute_value_assertions(ast)
|
||||
|
||||
try:
|
||||
subject_confirmation_specs = kwargs['subject_confirmation_specs']
|
||||
except KeyError:
|
||||
subject_confirmation_specs = {
|
||||
'recipient': destination,
|
||||
'in_response_to': in_response_to,
|
||||
'subject_confirmation_method': saml.SCM_BEARER
|
||||
}
|
||||
|
||||
assertion = ast.construct(
|
||||
sp_entity_id, self.config.attribute_converters, policy,
|
||||
issuer=_issuer, name_id=name_id,
|
||||
subject_confirmation_specs=subject_confirmation_specs)
|
||||
farg=farg['assertion'])
|
||||
|
||||
if sign_assertion:
|
||||
assertion.signature = pre_signature_part(assertion.id,
|
||||
@@ -712,8 +714,7 @@ class Server(Entity):
|
||||
encrypt_cert_advice=encrypt_cert_advice,
|
||||
encrypt_cert_assertion=encrypt_cert_assertion,
|
||||
encrypt_assertion=encrypt_assertion,
|
||||
encrypt_assertion_self_contained
|
||||
=encrypt_assertion_self_contained,
|
||||
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
|
||||
encrypted_advice_attributes=encrypted_advice_attributes,
|
||||
pefim=pefim, **kwargs)
|
||||
except IOError as exc:
|
||||
|
@@ -1,6 +1,7 @@
|
||||
# coding=utf-8
|
||||
import pytest
|
||||
|
||||
from saml2.argtree import add_path
|
||||
from saml2.authn_context import pword
|
||||
from saml2.mdie import to_dict
|
||||
from saml2 import md, assertion
|
||||
@@ -810,16 +811,21 @@ def test_assertion_with_noop_attribute_conv():
|
||||
})
|
||||
name_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
|
||||
issuer = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY)
|
||||
subject_confirmation_specs = {
|
||||
'recipient': 'consumer_url',
|
||||
'in_response_to': 'in_response_to',
|
||||
'subject_confirmation_method': saml.SCM_BEARER
|
||||
}
|
||||
|
||||
farg = add_path(
|
||||
{},
|
||||
['subject', 'subject_confirmation', 'method', saml.SCM_BEARER])
|
||||
add_path(
|
||||
farg['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'in_response_to', 'in_response_to'])
|
||||
add_path(
|
||||
farg['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'recipient', 'consumer_url'])
|
||||
|
||||
msg = ast.construct(
|
||||
"sp_entity_id", [AttributeConverterNOOP(NAME_FORMAT_URI)], policy,
|
||||
issuer=issuer, authn_decl=ACD, name_id=name_id,
|
||||
authn_auth="authn_authn",
|
||||
subject_confirmation_specs=subject_confirmation_specs)
|
||||
issuer=issuer, farg=farg, authn_decl=ACD, name_id=name_id,
|
||||
authn_auth="authn_authn")
|
||||
|
||||
print(msg)
|
||||
for attr in msg.attribute_statement[0].attribute:
|
||||
@@ -864,16 +870,20 @@ def test_assertion_with_zero_attributes():
|
||||
})
|
||||
name_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
|
||||
issuer = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY)
|
||||
subject_confirmation_specs = {
|
||||
'recipient': 'consumer_url',
|
||||
'in_response_to': 'in_response_to',
|
||||
'subject_confirmation_method': saml.SCM_BEARER
|
||||
}
|
||||
farg = add_path(
|
||||
{},
|
||||
['subject', 'subject_confirmation', 'method', saml.SCM_BEARER])
|
||||
add_path(
|
||||
farg['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'in_response_to', 'in_response_to'])
|
||||
add_path(
|
||||
farg['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'recipient', 'consumer_url'])
|
||||
|
||||
msg = ast.construct(
|
||||
"sp_entity_id", [AttributeConverterNOOP(NAME_FORMAT_URI)], policy,
|
||||
issuer=issuer, authn_decl=ACD, authn_auth="authn_authn",
|
||||
name_id=name_id, subject_confirmation_specs=subject_confirmation_specs)
|
||||
name_id=name_id, farg=farg)
|
||||
|
||||
print(msg)
|
||||
assert msg.attribute_statement == []
|
||||
@@ -892,17 +902,20 @@ def test_assertion_with_authn_instant():
|
||||
name_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
|
||||
issuer = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY)
|
||||
|
||||
subject_confirmation_specs = {
|
||||
'recipient': 'consumer_url',
|
||||
'in_response_to': 'in_response_to',
|
||||
'subject_confirmation_method': saml.SCM_BEARER
|
||||
}
|
||||
farg = add_path(
|
||||
{},
|
||||
['subject', 'subject_confirmation', 'method', saml.SCM_BEARER])
|
||||
add_path(
|
||||
farg['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'in_response_to', 'in_response_to'])
|
||||
add_path(
|
||||
farg['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'recipient', 'consumer_url'])
|
||||
|
||||
msg = ast.construct(
|
||||
"sp_entity_id", [AttributeConverterNOOP(NAME_FORMAT_URI)], policy,
|
||||
issuer=issuer, authn_decl=ACD, authn_auth="authn_authn",
|
||||
authn_instant=1234567890, name_id=name_id,
|
||||
subject_confirmation_specs=subject_confirmation_specs)
|
||||
authn_instant=1234567890, name_id=name_id, farg=farg)
|
||||
|
||||
print(msg)
|
||||
assert msg.authn_statement[0].authn_instant == "2009-02-13T23:31:30Z"
|
||||
|
@@ -4,7 +4,11 @@
|
||||
import base64
|
||||
import uuid
|
||||
import six
|
||||
from six.moves.urllib.parse import parse_qs, urlencode, urlparse
|
||||
from future.backports.urllib.parse import parse_qs
|
||||
from future.backports.urllib.parse import urlencode
|
||||
from future.backports.urllib.parse import urlparse
|
||||
|
||||
from saml2.argtree import add_path
|
||||
from saml2.cert import OpenSSLWrapper
|
||||
from saml2.xmldsig import SIG_RSA_SHA256
|
||||
from saml2 import BINDING_HTTP_POST
|
||||
@@ -70,8 +74,8 @@ def add_subelement(xmldoc, node_name, subelem):
|
||||
s = xmldoc.find(node_name)
|
||||
if s > 0:
|
||||
x = xmldoc.rindex("<", 0, s)
|
||||
tag = xmldoc[x+1:s-1]
|
||||
c = s+len(node_name)
|
||||
tag = xmldoc[x + 1:s - 1]
|
||||
c = s + len(node_name)
|
||||
spaces = ""
|
||||
while xmldoc[c] == " ":
|
||||
spaces += " "
|
||||
@@ -87,6 +91,7 @@ def add_subelement(xmldoc, node_name, subelem):
|
||||
|
||||
return xmldoc
|
||||
|
||||
|
||||
def for_me(condition, me):
|
||||
for restriction in condition.audience_restriction:
|
||||
audience = restriction.audience
|
||||
@@ -108,6 +113,7 @@ def ava(attribute_statement):
|
||||
def _leq(l1, l2):
|
||||
return set(l1) == set(l2)
|
||||
|
||||
|
||||
# def test_parse_3():
|
||||
# xml_response = open(XML_RESPONSE_FILE3).read()
|
||||
# response = samlp.response_from_string(xml_response)
|
||||
@@ -296,7 +302,7 @@ class TestClient:
|
||||
assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek"
|
||||
|
||||
def test_sign_auth_request_0(self):
|
||||
#print(self.client.config)
|
||||
# print(self.client.config)
|
||||
|
||||
req_id, areq = self.client.create_authn_request(
|
||||
"http://www.example.com/sso", sign=True, message_id="id1")
|
||||
@@ -308,7 +314,7 @@ class TestClient:
|
||||
assert ar.signature
|
||||
assert ar.signature.signature_value
|
||||
signed_info = ar.signature.signed_info
|
||||
#print(signed_info)
|
||||
# print(signed_info)
|
||||
assert len(signed_info.reference) == 1
|
||||
assert signed_info.reference[0].uri == "#id1"
|
||||
assert signed_info.reference[0].digest_value
|
||||
@@ -419,7 +425,7 @@ class TestClient:
|
||||
|
||||
cert_str, cert_key_str = generate_cert()
|
||||
|
||||
cert =\
|
||||
cert = \
|
||||
{
|
||||
"cert": cert_str,
|
||||
"key": cert_key_str
|
||||
@@ -536,7 +542,7 @@ class TestClient:
|
||||
|
||||
cert_str, cert_key_str = generate_cert()
|
||||
|
||||
cert =\
|
||||
cert = \
|
||||
{
|
||||
"cert": cert_str,
|
||||
"key": cert_key_str
|
||||
@@ -580,7 +586,7 @@ class TestClient:
|
||||
|
||||
cert_assertion_str, cert_key_assertion_str = generate_cert()
|
||||
|
||||
cert_assertion =\
|
||||
cert_assertion = \
|
||||
{
|
||||
"cert": cert_assertion_str,
|
||||
"key": cert_key_assertion_str
|
||||
@@ -588,7 +594,7 @@ class TestClient:
|
||||
|
||||
cert_advice_str, cert_key_advice_str = generate_cert()
|
||||
|
||||
cert_advice =\
|
||||
cert_advice = \
|
||||
{
|
||||
"cert": cert_advice_str,
|
||||
"key": cert_key_advice_str
|
||||
@@ -617,7 +623,8 @@ class TestClient:
|
||||
|
||||
authn_response = _client.parse_authn_request_response(
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"id1": "http://foo.example.com/service"}, {"id1": [cert_assertion, cert_advice]})
|
||||
{"id1": "http://foo.example.com/service"},
|
||||
{"id1": [cert_assertion, cert_advice]})
|
||||
|
||||
self.verify_authn_response(idp, authn_response, _client, ava_verify)
|
||||
|
||||
@@ -668,7 +675,7 @@ class TestClient:
|
||||
|
||||
cert_str, cert_key_str = generate_cert()
|
||||
|
||||
cert =\
|
||||
cert = \
|
||||
{
|
||||
"cert": cert_str,
|
||||
"key": cert_key_str
|
||||
@@ -701,12 +708,14 @@ class TestClient:
|
||||
|
||||
def setup_verify_authn_response(self):
|
||||
idp = "urn:mace:example.com:saml:roland:idp"
|
||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"], "mail": ["derek@nyy.mlb.com"], "title": ["The man"]}
|
||||
ava_verify = {'mail': ['derek@nyy.mlb.com'], 'givenName': ['Derek'], 'sn': ['Jeter'], 'title': ["The man"]}
|
||||
nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT)
|
||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
||||
"mail": ["derek@nyy.mlb.com"], "title": ["The man"]}
|
||||
ava_verify = {'mail': ['derek@nyy.mlb.com'], 'givenName': ['Derek'],
|
||||
'sn': ['Jeter'], 'title': ["The man"]}
|
||||
nameid_policy = samlp.NameIDPolicy(allow_create="false",
|
||||
format=saml.NAMEID_FORMAT_PERSISTENT)
|
||||
return idp, ava, ava_verify, nameid_policy
|
||||
|
||||
|
||||
def verify_authn_response(self, idp, authn_response, _client, ava_verify):
|
||||
assert authn_response is not None
|
||||
assert authn_response.issuer() == idp
|
||||
@@ -725,7 +734,6 @@ class TestClient:
|
||||
# The information I have about the subject comes from one source
|
||||
assert _client.users.issuers_of_info(subject_id) == [idp]
|
||||
|
||||
|
||||
def test_init_values(self):
|
||||
entityid = self.client.config.entityid
|
||||
print(entityid)
|
||||
@@ -773,7 +781,9 @@ class TestClient:
|
||||
assertion=_ass
|
||||
)
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.encryption_keypairs[0]["cert_file"],
|
||||
enctext = _sec.crypto.encrypt_assertion(response,
|
||||
self.client.sec.encryption_keypairs[
|
||||
0]["cert_file"],
|
||||
pre_encryption_part())
|
||||
|
||||
seresp = samlp.response_from_string(enctext)
|
||||
@@ -793,13 +803,14 @@ class TestClient:
|
||||
if ass.signature:
|
||||
if not _csec.verify_signature("%s" % ass,
|
||||
sign_cert_file,
|
||||
node_name=class_name(ass)):
|
||||
node_name=class_name(
|
||||
ass)):
|
||||
continue
|
||||
resp_ass.append(ass)
|
||||
|
||||
seresp.assertion = resp_ass
|
||||
seresp.encrypted_assertion = None
|
||||
#print(_sresp)
|
||||
# print(_sresp)
|
||||
|
||||
assert seresp.assertion
|
||||
|
||||
@@ -811,11 +822,18 @@ class TestClient:
|
||||
format=saml.NAMEID_FORMAT_PERSISTENT)
|
||||
|
||||
asser = Assertion({"givenName": "Derek", "surName": "Jeter"})
|
||||
subject_confirmation_specs = {
|
||||
'recipient': "http://lingon.catalogix.se:8087/",
|
||||
'in_response_to': "_012345",
|
||||
'subject_confirmation_method': saml.SCM_BEARER
|
||||
}
|
||||
farg = add_path(
|
||||
{},
|
||||
['assertion', 'subject', 'subject_confirmation', 'method',
|
||||
saml.SCM_BEARER])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'in_response_to',
|
||||
'_012345'])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'recipient',
|
||||
"http://lingon.catalogix.se:8087/"])
|
||||
|
||||
assertion = asser.construct(
|
||||
self.client.config.entityid,
|
||||
@@ -825,7 +843,7 @@ class TestClient:
|
||||
issuer=self.server._issuer(),
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login",
|
||||
subject_confirmation_specs=subject_confirmation_specs
|
||||
farg=farg['assertion']
|
||||
)
|
||||
|
||||
assertion.signature = sigver.pre_signature_part(
|
||||
@@ -850,10 +868,12 @@ class TestClient:
|
||||
# or as part of a bunch of tests.
|
||||
xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass)
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(xmldoc, self.client.sec.encryption_keypairs[1]["cert_file"],
|
||||
enctext = _sec.crypto.encrypt_assertion(xmldoc,
|
||||
self.client.sec.encryption_keypairs[
|
||||
1]["cert_file"],
|
||||
pre_encryption_part())
|
||||
|
||||
#seresp = samlp.response_from_string(enctext)
|
||||
# seresp = samlp.response_from_string(enctext)
|
||||
|
||||
resp_str = base64.encodestring(enctext.encode('utf-8'))
|
||||
# Now over to the client side
|
||||
@@ -861,7 +881,7 @@ class TestClient:
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"_012345": "http://foo.example.com/service"})
|
||||
|
||||
#assert resp.encrypted_assertion == []
|
||||
# assert resp.encrypted_assertion == []
|
||||
assert resp.assertion
|
||||
assert resp.ava == {'givenName': ['Derek'], 'sn': ['Jeter']}
|
||||
|
||||
@@ -875,12 +895,25 @@ class TestClient:
|
||||
asser = Assertion({"givenName": "Derek", "surName": "Jeter"})
|
||||
|
||||
subject_confirmation_specs = {
|
||||
'recipient': "http://lingon.catalogix.se:8087/",
|
||||
'in_response_to': "_012345",
|
||||
'subject_confirmation_method': saml.SCM_BEARER
|
||||
}
|
||||
'recipient': "http://lingon.catalogix.se:8087/",
|
||||
'in_response_to': "_012345",
|
||||
'subject_confirmation_method': saml.SCM_BEARER
|
||||
}
|
||||
name_id = factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT)
|
||||
|
||||
farg = add_path(
|
||||
{},
|
||||
['assertion', 'subject', 'subject_confirmation', 'method',
|
||||
saml.SCM_BEARER])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'in_response_to',
|
||||
'_012345'])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'recipient',
|
||||
"http://lingon.catalogix.se:8087/"])
|
||||
|
||||
assertion = asser.construct(
|
||||
self.client.config.entityid,
|
||||
self.server.config.attribute_converters,
|
||||
@@ -889,7 +922,7 @@ class TestClient:
|
||||
name_id=name_id,
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login",
|
||||
subject_confirmation_specs=subject_confirmation_specs)
|
||||
farg=farg['assertion'])
|
||||
|
||||
a_asser = Assertion({"uid": "test01", "email": "test.testsson@test.se"})
|
||||
a_assertion = a_asser.construct(
|
||||
@@ -900,7 +933,7 @@ class TestClient:
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login",
|
||||
name_id=name_id,
|
||||
subject_confirmation_specs=subject_confirmation_specs)
|
||||
farg=farg['assertion'])
|
||||
|
||||
a_assertion.signature = sigver.pre_signature_part(
|
||||
a_assertion.id, _sec.my_cert, 1)
|
||||
@@ -910,7 +943,8 @@ class TestClient:
|
||||
assertion.advice.encrypted_assertion = []
|
||||
assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
||||
|
||||
assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion)
|
||||
assertion.advice.encrypted_assertion[0].add_extension_element(
|
||||
a_assertion)
|
||||
|
||||
response = sigver.response_factory(
|
||||
in_response_to="_012345",
|
||||
@@ -922,21 +956,25 @@ class TestClient:
|
||||
response.assertion.append(assertion)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(a_assertion),
|
||||
key_file=self.client.sec.key_file,
|
||||
node_id=a_assertion.id)
|
||||
key_file=self.client.sec.key_file,
|
||||
node_id=a_assertion.id)
|
||||
|
||||
#xmldoc = "%s" % response
|
||||
# xmldoc = "%s" % response
|
||||
# strangely enough I get different tags if I run this test separately
|
||||
# or as part of a bunch of tests.
|
||||
#xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass)
|
||||
# xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass)
|
||||
|
||||
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
|
||||
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
|
||||
["Response", "Assertion", "Advice",
|
||||
"EncryptedAssertion", "Assertion"]])
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.encryption_keypairs[0]["cert_file"],
|
||||
pre_encryption_part(), node_xpath=node_xpath)
|
||||
enctext = _sec.crypto.encrypt_assertion(response,
|
||||
self.client.sec.encryption_keypairs[
|
||||
0]["cert_file"],
|
||||
pre_encryption_part(),
|
||||
node_xpath=node_xpath)
|
||||
|
||||
#seresp = samlp.response_from_string(enctext)
|
||||
# seresp = samlp.response_from_string(enctext)
|
||||
|
||||
resp_str = base64.encodestring(enctext.encode('utf-8'))
|
||||
# Now over to the client side
|
||||
@@ -944,12 +982,13 @@ class TestClient:
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"_012345": "http://foo.example.com/service"})
|
||||
|
||||
#assert resp.encrypted_assertion == []
|
||||
# assert resp.encrypted_assertion == []
|
||||
assert resp.assertion
|
||||
assert resp.assertion.advice
|
||||
assert resp.assertion.advice.assertion
|
||||
assert resp.ava == \
|
||||
{'sn': ['Jeter'], 'givenName': ['Derek'], 'uid': ['test01'], 'email': ['test.testsson@test.se']}
|
||||
{'sn': ['Jeter'], 'givenName': ['Derek'], 'uid': ['test01'],
|
||||
'email': ['test.testsson@test.se']}
|
||||
|
||||
def test_sign_then_encrypt_assertion_advice_2(self):
|
||||
# Begin with the IdPs side
|
||||
@@ -960,11 +999,18 @@ class TestClient:
|
||||
|
||||
asser_1 = Assertion({"givenName": "Derek"})
|
||||
|
||||
subject_confirmation_specs = {
|
||||
'recipient': "http://lingon.catalogix.se:8087/",
|
||||
'in_response_to': "_012345",
|
||||
'subject_confirmation_method': saml.SCM_BEARER
|
||||
}
|
||||
farg = add_path(
|
||||
{},
|
||||
['assertion', 'subject', 'subject_confirmation', 'method',
|
||||
saml.SCM_BEARER])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'in_response_to',
|
||||
'_012345'])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'recipient',
|
||||
"http://lingon.catalogix.se:8087/"])
|
||||
name_id = factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT)
|
||||
|
||||
assertion_1 = asser_1.construct(
|
||||
@@ -975,7 +1021,7 @@ class TestClient:
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login",
|
||||
name_id=name_id,
|
||||
subject_confirmation_specs=subject_confirmation_specs)
|
||||
farg=farg['assertion'])
|
||||
|
||||
asser_2 = Assertion({"surName": "Jeter"})
|
||||
|
||||
@@ -987,7 +1033,7 @@ class TestClient:
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login",
|
||||
name_id=name_id,
|
||||
subject_confirmation_specs=subject_confirmation_specs)
|
||||
farg=farg['assertion'])
|
||||
|
||||
a_asser_1 = Assertion({"uid": "test01"})
|
||||
a_assertion_1 = a_asser_1.construct(
|
||||
@@ -998,8 +1044,7 @@ class TestClient:
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login",
|
||||
name_id=name_id,
|
||||
subject_confirmation_specs=subject_confirmation_specs)
|
||||
|
||||
farg=farg['assertion'])
|
||||
|
||||
a_asser_2 = Assertion({"email": "test.testsson@test.se"})
|
||||
a_assertion_2 = a_asser_2.construct(
|
||||
@@ -1010,7 +1055,7 @@ class TestClient:
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login",
|
||||
name_id=name_id,
|
||||
subject_confirmation_specs=subject_confirmation_specs)
|
||||
farg=farg['assertion'])
|
||||
|
||||
a_asser_3 = Assertion({"street": "street"})
|
||||
a_assertion_3 = a_asser_3.construct(
|
||||
@@ -1021,7 +1066,7 @@ class TestClient:
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login",
|
||||
name_id=name_id,
|
||||
subject_confirmation_specs=subject_confirmation_specs)
|
||||
farg=farg['assertion'])
|
||||
|
||||
a_asser_4 = Assertion({"title": "title"})
|
||||
a_assertion_4 = a_asser_4.construct(
|
||||
@@ -1032,7 +1077,7 @@ class TestClient:
|
||||
authn_class=INTERNETPROTOCOLPASSWORD,
|
||||
authn_auth="http://www.example.com/login",
|
||||
name_id=name_id,
|
||||
subject_confirmation_specs=subject_confirmation_specs)
|
||||
farg=farg['assertion'])
|
||||
|
||||
a_assertion_1.signature = sigver.pre_signature_part(
|
||||
a_assertion_1.id, _sec.my_cert, 1)
|
||||
@@ -1046,9 +1091,11 @@ class TestClient:
|
||||
a_assertion_4.signature = sigver.pre_signature_part(
|
||||
a_assertion_4.id, _sec.my_cert, 1)
|
||||
|
||||
assertion_1.signature = sigver.pre_signature_part(assertion_1.id, _sec.my_cert, 1)
|
||||
assertion_1.signature = sigver.pre_signature_part(assertion_1.id,
|
||||
_sec.my_cert, 1)
|
||||
|
||||
assertion_2.signature = sigver.pre_signature_part(assertion_2.id, _sec.my_cert, 1)
|
||||
assertion_2.signature = sigver.pre_signature_part(assertion_2.id,
|
||||
_sec.my_cert, 1)
|
||||
|
||||
response = sigver.response_factory(
|
||||
in_response_to="_012345",
|
||||
@@ -1062,9 +1109,11 @@ class TestClient:
|
||||
response.assertion.advice = Advice()
|
||||
|
||||
response.assertion.advice.encrypted_assertion = []
|
||||
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
||||
response.assertion.advice.encrypted_assertion.append(
|
||||
EncryptedAssertion())
|
||||
|
||||
response.assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion_1)
|
||||
response.assertion.advice.encrypted_assertion[0].add_extension_element(
|
||||
a_assertion_1)
|
||||
|
||||
advice_tag = response.assertion.advice._to_element_tree().tag
|
||||
assertion_tag = a_assertion_1._to_element_tree().tag
|
||||
@@ -1072,22 +1121,29 @@ class TestClient:
|
||||
response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
|
||||
assertion_tag, advice_tag)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(a_assertion_1),
|
||||
response = _sec.sign_statement("%s" % response,
|
||||
class_name(a_assertion_1),
|
||||
key_file=self.server.sec.key_file,
|
||||
node_id=a_assertion_1.id)
|
||||
|
||||
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
|
||||
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
|
||||
["Response", "Assertion", "Advice",
|
||||
"EncryptedAssertion", "Assertion"]])
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.encryption_keypairs[1]["cert_file"],
|
||||
pre_encryption_part(), node_xpath=node_xpath)
|
||||
enctext = _sec.crypto.encrypt_assertion(response,
|
||||
self.client.sec.encryption_keypairs[
|
||||
1]["cert_file"],
|
||||
pre_encryption_part(),
|
||||
node_xpath=node_xpath)
|
||||
|
||||
response = samlp.response_from_string(enctext)
|
||||
|
||||
response.assertion = response.assertion[0]
|
||||
|
||||
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
||||
response.assertion.advice.encrypted_assertion[1].add_extension_element(a_assertion_2)
|
||||
response.assertion.advice.encrypted_assertion.append(
|
||||
EncryptedAssertion())
|
||||
response.assertion.advice.encrypted_assertion[1].add_extension_element(
|
||||
a_assertion_2)
|
||||
|
||||
advice_tag = response.assertion.advice._to_element_tree().tag
|
||||
assertion_tag = a_assertion_2._to_element_tree().tag
|
||||
@@ -1095,15 +1151,20 @@ class TestClient:
|
||||
response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
|
||||
assertion_tag, advice_tag)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(a_assertion_2),
|
||||
response = _sec.sign_statement("%s" % response,
|
||||
class_name(a_assertion_2),
|
||||
key_file=self.server.sec.key_file,
|
||||
node_id=a_assertion_2.id)
|
||||
|
||||
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
|
||||
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
|
||||
["Response", "Assertion", "Advice",
|
||||
"EncryptedAssertion", "Assertion"]])
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.encryption_keypairs[0]["cert_file"],
|
||||
pre_encryption_part(), node_xpath=node_xpath)
|
||||
enctext = _sec.crypto.encrypt_assertion(response,
|
||||
self.client.sec.encryption_keypairs[
|
||||
0]["cert_file"],
|
||||
pre_encryption_part(),
|
||||
node_xpath=node_xpath)
|
||||
|
||||
response = samlp.response_from_string(enctext)
|
||||
|
||||
@@ -1111,14 +1172,17 @@ class TestClient:
|
||||
|
||||
assertion_tag = response.assertion._to_element_tree().tag
|
||||
response = pre_encrypt_assertion(response)
|
||||
response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
|
||||
response = \
|
||||
response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
|
||||
assertion_tag)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(assertion_1),
|
||||
key_file=self.server.sec.key_file,
|
||||
node_id=assertion_1.id)
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.encryption_keypairs[1]["cert_file"],
|
||||
enctext = _sec.crypto.encrypt_assertion(response,
|
||||
self.client.sec.encryption_keypairs[
|
||||
1]["cert_file"],
|
||||
pre_encryption_part())
|
||||
|
||||
response = samlp.response_from_string(enctext)
|
||||
@@ -1128,9 +1192,11 @@ class TestClient:
|
||||
response.assertion.advice = Advice()
|
||||
|
||||
response.assertion.advice.encrypted_assertion = []
|
||||
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
||||
response.assertion.advice.encrypted_assertion.append(
|
||||
EncryptedAssertion())
|
||||
|
||||
response.assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion_3)
|
||||
response.assertion.advice.encrypted_assertion[0].add_extension_element(
|
||||
a_assertion_3)
|
||||
|
||||
advice_tag = response.assertion.advice._to_element_tree().tag
|
||||
assertion_tag = a_assertion_3._to_element_tree().tag
|
||||
@@ -1138,23 +1204,30 @@ class TestClient:
|
||||
response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
|
||||
assertion_tag, advice_tag)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(a_assertion_3),
|
||||
response = _sec.sign_statement("%s" % response,
|
||||
class_name(a_assertion_3),
|
||||
key_file=self.server.sec.key_file,
|
||||
node_id=a_assertion_3.id)
|
||||
|
||||
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
|
||||
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
|
||||
["Response", "Assertion", "Advice",
|
||||
"EncryptedAssertion", "Assertion"]])
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.encryption_keypairs[0]["cert_file"],
|
||||
pre_encryption_part(), node_xpath=node_xpath)
|
||||
enctext = _sec.crypto.encrypt_assertion(response,
|
||||
self.client.sec.encryption_keypairs[
|
||||
0]["cert_file"],
|
||||
pre_encryption_part(),
|
||||
node_xpath=node_xpath)
|
||||
|
||||
response = samlp.response_from_string(enctext)
|
||||
|
||||
response.assertion = response.assertion[0]
|
||||
|
||||
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
|
||||
response.assertion.advice.encrypted_assertion.append(
|
||||
EncryptedAssertion())
|
||||
|
||||
response.assertion.advice.encrypted_assertion[1].add_extension_element(a_assertion_4)
|
||||
response.assertion.advice.encrypted_assertion[1].add_extension_element(
|
||||
a_assertion_4)
|
||||
|
||||
advice_tag = response.assertion.advice._to_element_tree().tag
|
||||
assertion_tag = a_assertion_4._to_element_tree().tag
|
||||
@@ -1162,25 +1235,31 @@ class TestClient:
|
||||
response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
|
||||
assertion_tag, advice_tag)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(a_assertion_4),
|
||||
response = _sec.sign_statement("%s" % response,
|
||||
class_name(a_assertion_4),
|
||||
key_file=self.server.sec.key_file,
|
||||
node_id=a_assertion_4.id)
|
||||
|
||||
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
|
||||
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
|
||||
["Response", "Assertion", "Advice",
|
||||
"EncryptedAssertion", "Assertion"]])
|
||||
|
||||
enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.encryption_keypairs[1]["cert_file"],
|
||||
pre_encryption_part(), node_xpath=node_xpath)
|
||||
enctext = _sec.crypto.encrypt_assertion(response,
|
||||
self.client.sec.encryption_keypairs[
|
||||
1]["cert_file"],
|
||||
pre_encryption_part(),
|
||||
node_xpath=node_xpath)
|
||||
|
||||
response = samlp.response_from_string(enctext)
|
||||
|
||||
response = _sec.sign_statement("%s" % response, class_name(response.assertion[0]),
|
||||
response = _sec.sign_statement("%s" % response,
|
||||
class_name(response.assertion[0]),
|
||||
key_file=self.server.sec.key_file,
|
||||
node_id=response.assertion[0].id)
|
||||
|
||||
response = samlp.response_from_string(response)
|
||||
|
||||
#seresp = samlp.response_from_string(enctext)
|
||||
# seresp = samlp.response_from_string(enctext)
|
||||
|
||||
resp_str = base64.encodestring(str(response).encode('utf-8'))
|
||||
# Now over to the client side
|
||||
@@ -1188,13 +1267,14 @@ class TestClient:
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"_012345": "http://foo.example.com/service"})
|
||||
|
||||
#assert resp.encrypted_assertion == []
|
||||
# assert resp.encrypted_assertion == []
|
||||
assert resp.assertion
|
||||
assert resp.assertion.advice
|
||||
assert resp.assertion.advice.assertion
|
||||
assert resp.ava == \
|
||||
{'street': ['street'], 'uid': ['test01'], 'title': ['title'], 'givenName': ['Derek'], 'email':
|
||||
['test.testsson@test.se'], 'sn': ['Jeter']}
|
||||
{'street': ['street'], 'uid': ['test01'], 'title': ['title'],
|
||||
'givenName': ['Derek'], 'email':
|
||||
['test.testsson@test.se'], 'sn': ['Jeter']}
|
||||
|
||||
def test_signed_redirect(self):
|
||||
|
||||
@@ -1238,7 +1318,8 @@ class TestClient:
|
||||
assert entity_ids == ["urn:mace:example.com:saml:roland:idp"]
|
||||
|
||||
resp = client.do_logout(nid, entity_ids, "Tired", in_a_while(minutes=5),
|
||||
sign=True, expected_binding=BINDING_HTTP_REDIRECT)
|
||||
sign=True,
|
||||
expected_binding=BINDING_HTTP_REDIRECT)
|
||||
|
||||
assert list(resp.keys()) == entity_ids
|
||||
binding, info = resp[entity_ids[0]]
|
||||
@@ -1354,7 +1435,8 @@ class TestClientWithDummy():
|
||||
def test_do_negotiated_authn(self):
|
||||
binding = BINDING_HTTP_REDIRECT
|
||||
response_binding = BINDING_HTTP_POST
|
||||
sid, auth_binding, http_args = self.client.prepare_for_negotiated_authenticate(
|
||||
sid, auth_binding, http_args = \
|
||||
self.client.prepare_for_negotiated_authenticate(
|
||||
IDP, "http://www.example.com/relay_state",
|
||||
binding=binding, response_binding=response_binding)
|
||||
|
||||
@@ -1432,7 +1514,7 @@ class TestClientWithDummy():
|
||||
{sid: "/"})
|
||||
ac = resp.assertion.authn_statement[0].authn_context
|
||||
assert ac.authenticating_authority[0].text == \
|
||||
'http://www.example.com/login'
|
||||
'http://www.example.com/login'
|
||||
assert ac.authn_context_class_ref.text == INTERNETPROTOCOLPASSWORD
|
||||
|
||||
def test_negotiated_post_sso(self):
|
||||
@@ -1467,7 +1549,7 @@ class TestClientWithDummy():
|
||||
{sid: "/"})
|
||||
ac = resp.assertion.authn_statement[0].authn_context
|
||||
assert ac.authenticating_authority[0].text == \
|
||||
'http://www.example.com/login'
|
||||
'http://www.example.com/login'
|
||||
assert ac.authn_context_class_ref.text == INTERNETPROTOCOLPASSWORD
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user