Fixed tests

This commit is contained in:
Roland Hedberg
2013-04-15 11:05:48 +02:00
parent 68ab79f49d
commit a3a16f02c8
8 changed files with 440 additions and 158 deletions

109
tests/idp_conf_mdb.py Normal file
View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from saml2 import BINDING_SOAP, BINDING_URI
from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_ARTIFACT
from saml2.saml import NAMEID_FORMAT_PERSISTENT
from saml2.saml import NAME_FORMAT_URI
from pathutils import full_path, xmlsec_path
BASE = "http://localhost:8088"
CONFIG = {
"entityid": "%s/saml/idp" % BASE,
"name": "Rolands IdP",
"service": {
"aa": {
"endpoints": {
"attribute_service": [
("%s/aap" % BASE, BINDING_HTTP_POST),
("%s/aas" % BASE, BINDING_SOAP)
]
},
},
"aq": {
"endpoints": {
"authn_query_service": [
("%s/aqs" % BASE, BINDING_SOAP)
]
},
},
"idp": {
"endpoints": {
"single_sign_on_service": [
("%s/sso/redirect" % BASE, BINDING_HTTP_REDIRECT),
("%s/sso/post" % BASE, BINDING_HTTP_POST),
("%s/sso/art" % BASE, BINDING_HTTP_ARTIFACT),
("%s/sso/paos" % BASE, BINDING_SOAP)
],
"single_logout_service": [
("%s/slo/soap" % BASE, BINDING_SOAP),
("%s/slo/post" % BASE, BINDING_HTTP_POST)
],
"artifact_resolution_service": [
("%s/ars" % BASE, BINDING_SOAP)
],
"assertion_id_request_service": [
("%s/airs" % BASE, BINDING_URI)
],
"authn_query_service": [
("%s/aqs" % BASE, BINDING_SOAP)
],
"manage_name_id_service": [
("%s/mni/soap" % BASE, BINDING_SOAP),
("%s/mni/post" % BASE, BINDING_HTTP_POST),
("%s/mni/redirect" % BASE, BINDING_HTTP_REDIRECT),
("%s/mni/art" % BASE, BINDING_HTTP_ARTIFACT)
],
"name_id_mapping_service": [
("%s/nim/soap" % BASE, BINDING_SOAP),
("%s/nim/post" % BASE, BINDING_HTTP_POST),
("%s/nim/redirect" % BASE, BINDING_HTTP_REDIRECT),
("%s/nim/art" % BASE, BINDING_HTTP_ARTIFACT)
]
},
"policy": {
"default": {
"lifetime": {"minutes": 15},
"attribute_restrictions": None, # means all I have
"name_form": NAME_FORMAT_URI,
},
"urn:mace:example.com:saml:roland:sp": {
"lifetime": {"minutes": 5},
"nameid_format": NAMEID_FORMAT_PERSISTENT,
# "attribute_restrictions":{
# "givenName": None,
# "surName": None,
# }
}
},
"subject_data": ("mongodb", "subject"),
"session_storage": ("mongodb", "session")
},
},
"debug": 1,
"key_file": full_path("test.key"),
"cert_file": full_path("test.pem"),
"xmlsec_binary": xmlsec_path,
"metadata": {
"local": [full_path("servera.xml"),
full_path("vo_metadata.xml")],
},
"attribute_map_dir": full_path("attributemaps"),
"organization": {
"name": "Exempel AB",
"display_name": [("Exempel ÄB", "se"), ("Example Co.", "en")],
"url": "http://www.example.com/roland",
},
"contact_person": [
{
"given_name":"John",
"sur_name": "Smith",
"email_address": ["john.smith@example.com"],
"contact_type": "technical",
},
],
}

109
tests/idp_conf_mdb2.py Normal file
View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from saml2 import BINDING_SOAP, BINDING_URI
from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_ARTIFACT
from saml2.saml import NAMEID_FORMAT_PERSISTENT
from saml2.saml import NAME_FORMAT_URI
from pathutils import full_path, xmlsec_path
BASE = "http://localhost:8089"
CONFIG = {
"entityid": "%s/saml/idp2" % BASE,
"name": "Rolands 2nd IdP",
"service": {
"aa": {
"endpoints": {
"attribute_service": [
("%s/aap" % BASE, BINDING_HTTP_POST),
("%s/aas" % BASE, BINDING_SOAP)
]
},
},
"aq": {
"endpoints": {
"authn_query_service": [
("%s/aqs" % BASE, BINDING_SOAP)
]
},
},
"idp": {
"endpoints": {
"single_sign_on_service": [
("%s/sso/redirect" % BASE, BINDING_HTTP_REDIRECT),
("%s/sso/post" % BASE, BINDING_HTTP_POST),
("%s/sso/art" % BASE, BINDING_HTTP_ARTIFACT),
("%s/sso/paos" % BASE, BINDING_SOAP)
],
"single_logout_service": [
("%s/slo/soap" % BASE, BINDING_SOAP),
("%s/slo/post" % BASE, BINDING_HTTP_POST)
],
"artifact_resolution_service": [
("%s/ars" % BASE, BINDING_SOAP)
],
"assertion_id_request_service": [
("%s/airs" % BASE, BINDING_URI)
],
"authn_query_service": [
("%s/aqs" % BASE, BINDING_SOAP)
],
"manage_name_id_service": [
("%s/mni/soap" % BASE, BINDING_SOAP),
("%s/mni/post" % BASE, BINDING_HTTP_POST),
("%s/mni/redirect" % BASE, BINDING_HTTP_REDIRECT),
("%s/mni/art" % BASE, BINDING_HTTP_ARTIFACT)
],
"name_id_mapping_service": [
("%s/nim/soap" % BASE, BINDING_SOAP),
("%s/nim/post" % BASE, BINDING_HTTP_POST),
("%s/nim/redirect" % BASE, BINDING_HTTP_REDIRECT),
("%s/nim/art" % BASE, BINDING_HTTP_ARTIFACT)
]
},
"policy": {
"default": {
"lifetime": {"minutes": 15},
"attribute_restrictions": None, # means all I have
"name_form": NAME_FORMAT_URI,
},
"urn:mace:example.com:saml:roland:sp": {
"lifetime": {"minutes": 5},
"nameid_format": NAMEID_FORMAT_PERSISTENT,
# "attribute_restrictions":{
# "givenName": None,
# "surName": None,
# }
}
},
"subject_data": ("mongodb", "subject"),
"session_storage": ("mongodb", "session")
},
},
"debug": 1,
"key_file": full_path("test.key"),
"cert_file": full_path("test.pem"),
"xmlsec_binary": xmlsec_path,
"metadata": {
"local": [full_path("servera.xml"),
full_path("vo_metadata.xml")],
},
"attribute_map_dir": full_path("attributemaps"),
"organization": {
"name": "Exempel AB",
"display_name": [("Exempel ÄB", "se"), ("Example Co.", "en")],
"url": "http://www.example.com/roland",
},
"contact_person": [
{
"given_name":"John",
"sur_name": "Smith",
"email_address": ["john.smith@example.com"],
"contact_type": "technical",
},
],
}

View File

@@ -6,25 +6,26 @@ CONFIG={
"description": "My own SP",
"service": {
"sp": {
"endpoints":{
"assertion_consumer_service": ["http://lingon.catalogix.se:8087/"],
"endpoints": {
"assertion_consumer_service": [
"http://lingon.catalogix.se:8087/"],
},
"required_attributes": ["surName", "givenName", "mail"],
"optional_attributes": ["title"],
"idp": ["urn:mace:example.com:saml:roland:idp"],
}
},
"debug" : 1,
"key_file" : full_path("test.key"),
"cert_file" : full_path("test.pem"),
"debug": 1,
"key_file": full_path("test.key"),
"cert_file": full_path("test.pem"),
"ca_certs": full_path("cacerts.txt"),
"xmlsec_binary" : xmlsec_path,
"xmlsec_binary": xmlsec_path,
"metadata": {
"local": [full_path("idp.xml"), full_path("vo_metadata.xml")],
},
"virtual_organization" : {
"virtual_organization": {
"urn:mace:example.com:it:tek":{
"nameid_format" : "urn:oid:1.3.6.1.4.1.1466.115.121.1.15-NameID",
"nameid_format": "urn:oid:1.3.6.1.4.1.1466.115.121.1.15-NameID",
"common_identifier": "umuselin",
}
},

View File

@@ -25,17 +25,20 @@ ONTS = {
xmlenc.NAMESPACE: xmlenc
}
def _eq(l1,l2):
def _eq(l1, l2):
return set(l1) == set(l2)
def _class(cls):
return "%s&%s" % (cls.c_namespace, cls.c_tag)
def test_construct_contact():
c = from_dict({
"__class__": _class(md.ContactPerson),
"given_name":{"text":"Roland", "__class__": _class(md.GivenName)},
"sur_name": {"text":"Hedberg", "__class__": _class(md.SurName)},
"given_name": {"text": "Roland", "__class__": _class(md.GivenName)},
"sur_name": {"text": "Hedberg", "__class__": _class(md.SurName)},
"email_address": [{"text":"roland@catalogix.se",
"__class__": _class(md.EmailAddress)}],
}, ONTS)
@@ -44,4 +47,4 @@ def test_construct_contact():
assert c.given_name.text == "Roland"
assert c.sur_name.text == "Hedberg"
assert c.email_address[0].text == "roland@catalogix.se"
assert _eq(c.keyswv(), ["given_name","sur_name","email_address"])
assert _eq(c.keyswv(), ["given_name", "sur_name", "email_address"])

View File

@@ -10,18 +10,19 @@ from saml2 import samlp, saml, client, config
from saml2 import s_utils
from saml2 import sigver
from saml2 import time_util
from saml2.s_utils import OtherError, UnsupportedBinding
from saml2.s_utils import OtherError
from saml2.s_utils import do_attribute_statement, factory
from saml2.soap import make_soap_enveloped_saml_thingy
from saml2 import BINDING_HTTP_POST, BINDING_HTTP_REDIRECT
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_REDIRECT
from py.test import raises
import os
nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
text="123456")
def _eq(l1,l2):
def _eq(l1, l2):
return set(l1) == set(l2)
@@ -39,25 +40,27 @@ class TestServer1():
def test_issuer(self):
issuer = self.server._issuer()
assert isinstance(issuer, saml.Issuer)
assert _eq(issuer.keyswv(), ["text","format"])
assert _eq(issuer.keyswv(), ["text", "format"])
assert issuer.format == saml.NAMEID_FORMAT_ENTITY
assert issuer.text == self.server.config.entityid
def test_assertion(self):
assertion = s_utils.assertion_factory(
subject= factory(saml.Subject, text="_aaa",
name_id=factory(saml.NameID,
format=saml.NAMEID_FORMAT_TRANSIENT)),
attribute_statement = do_attribute_statement({
("","","surName"): ("Jeter",""),
("","","givenName") :("Derek",""),
}),
subject=factory(
saml.Subject, text="_aaa",
name_id=factory(saml.NameID,
format=saml.NAMEID_FORMAT_TRANSIENT)),
attribute_statement=do_attribute_statement(
{
("", "", "surName"): ("Jeter", ""),
("", "", "givenName"): ("Derek", ""),
}
),
issuer=self.server._issuer(),
)
)
assert _eq(assertion.keyswv(),['attribute_statement', 'issuer', 'id',
'subject', 'issue_instant', 'version'])
assert _eq(assertion.keyswv(), ['attribute_statement', 'issuer', 'id',
'subject', 'issue_instant', 'version'])
assert assertion.version == "2.0"
assert assertion.issuer.text == "urn:mace:example.com:saml:roland:idp"
#
@@ -77,31 +80,33 @@ class TestServer1():
assert attr0.attribute_value[0].text == "Jeter"
#
subject = assertion.subject
assert _eq(subject.keyswv(),["text", "name_id"])
assert _eq(subject.keyswv(), ["text", "name_id"])
assert subject.text == "_aaa"
assert subject.name_id.format == saml.NAMEID_FORMAT_TRANSIENT
def test_response(self):
response = sigver.response_factory(
in_response_to="_012345",
destination="https:#www.example.com",
status=s_utils.success_status_factory(),
assertion=s_utils.assertion_factory(
subject = factory( saml.Subject, text="_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
attribute_statement = do_attribute_statement({
("","","surName"): ("Jeter",""),
("","","givenName") :("Derek",""),
}),
issuer=self.server._issuer(),
in_response_to="_012345",
destination="https:#www.example.com",
status=s_utils.success_status_factory(),
assertion=s_utils.assertion_factory(
subject=factory(saml.Subject, text="_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
attribute_statement=do_attribute_statement(
{
("", "", "surName"): ("Jeter", ""),
("", "", "givenName"): ("Derek", ""),
}
),
issuer=self.server._issuer(),
)
),
issuer=self.server._issuer(),
)
print response.keyswv()
assert _eq(response.keyswv(),['destination', 'assertion','status',
'in_response_to', 'issue_instant',
'version', 'issuer', 'id'])
assert _eq(response.keyswv(), ['destination', 'assertion', 'status',
'in_response_to', 'issue_instant',
'version', 'issuer', 'id'])
assert response.version == "2.0"
assert response.issuer.text == "urn:mace:example.com:saml:roland:idp"
assert response.destination == "https:#www.example.com"
@@ -113,25 +118,24 @@ class TestServer1():
def test_parse_faulty_request(self):
authn_request = self.client.create_authn_request(
destination = "http://www.example.com",
id = "id1")
destination="http://www.example.com", id="id1")
# should raise an error because faulty spentityid
binding = BINDING_HTTP_REDIRECT
htargs = self.client.apply_binding(binding, "%s" % authn_request,
"http://www.example.com", "abcd")
htargs = self.client.apply_binding(
binding, "%s" % authn_request, "http://www.example.com", "abcd")
_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
print _dict
raises(OtherError, self.server.parse_authn_request,
_dict["SAMLRequest"][0], binding)
_dict["SAMLRequest"][0], binding)
def test_parse_faulty_request_to_err_status(self):
authn_request = self.client.create_authn_request(
destination = "http://www.example.com")
destination="http://www.example.com")
binding = BINDING_HTTP_REDIRECT
htargs = self.client.apply_binding(binding, "%s" % authn_request,
"http://www.example.com", "abcd")
"http://www.example.com", "abcd")
_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
print _dict
@@ -147,7 +151,7 @@ class TestServer1():
assert _eq(status.keyswv(), ["status_code", "status_message"])
assert status.status_message.text == 'Not destined for me!'
status_code = status.status_code
assert _eq(status_code.keyswv(), ["status_code","value"])
assert _eq(status_code.keyswv(), ["status_code", "value"])
assert status_code.value == samlp.STATUS_RESPONDER
assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL
@@ -175,25 +179,26 @@ class TestServer1():
def test_sso_response_with_identity(self):
name_id = self.server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp",
"id12")
"urn:mace:example.com:saml:roland:sp", "id12")
resp = self.server.create_authn_response(
{"eduPersonEntitlement": "Short stop",
"surName": "Jeter",
"givenName": "Derek",
"mail": "derek.jeter@nyy.mlb.com",
"title": "The man"},
{
"eduPersonEntitlement": "Short stop",
"surName": "Jeter",
"givenName": "Derek",
"mail": "derek.jeter@nyy.mlb.com",
"title": "The man"
},
"id12", # in_response_to
"http://localhost:8087/", # destination
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
authn=(AUTHN_PASSWORD, "http://www.example.com/login")
)
print resp.keyswv()
assert _eq(resp.keyswv(),['status', 'destination', 'assertion',
'in_response_to', 'issue_instant',
'version', 'id', 'issuer'])
assert _eq(resp.keyswv(), ['status', 'destination', 'assertion',
'in_response_to', 'issue_instant',
'version', 'id', 'issuer'])
assert resp.destination == "http://localhost:8087/"
assert resp.in_response_to == "id12"
assert resp.status
@@ -229,17 +234,17 @@ class TestServer1():
def test_sso_response_without_identity(self):
resp = self.server.create_authn_response(
{},
"id12", # in_response_to
"http://localhost:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
userid="USER1",
authn=(AUTHN_PASSWORD, "http://www.example.com/login")
)
{},
"id12", # in_response_to
"http://localhost:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
userid="USER1",
authn=(AUTHN_PASSWORD, "http://www.example.com/login")
)
print resp.keyswv()
assert _eq(resp.keyswv(),['status', 'destination', 'in_response_to',
'issue_instant', 'version', 'id', 'issuer'])
assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to',
'issue_instant', 'version', 'id', 'issuer'])
assert resp.destination == "http://localhost:8087/"
assert resp.in_response_to == "id12"
assert resp.status
@@ -249,22 +254,21 @@ class TestServer1():
def test_sso_failure_response(self):
exc = s_utils.MissingValue("eduPersonAffiliation missing")
resp = self.server.create_error_response("id12",
"http://localhost:8087/",
exc )
resp = self.server.create_error_response(
"id12", "http://localhost:8087/", exc)
print resp.keyswv()
assert _eq(resp.keyswv(),['status', 'destination', 'in_response_to',
'issue_instant', 'version', 'id', 'issuer'])
assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to',
'issue_instant', 'version', 'id', 'issuer'])
assert resp.destination == "http://localhost:8087/"
assert resp.in_response_to == "id12"
assert resp.status
print resp.status
assert resp.status.status_code.value == samlp.STATUS_RESPONDER
assert resp.status.status_code.status_code.value == \
samlp.STATUS_REQUEST_UNSUPPORTED
samlp.STATUS_REQUEST_UNSUPPORTED
assert resp.status.status_message.text == \
"eduPersonAffiliation missing"
"eduPersonAffiliation missing"
assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp"
assert not resp.assertion
@@ -275,24 +279,22 @@ class TestServer1():
conf.load_file("server_conf")
self.client = client.Saml2Client(conf)
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
npolicy = samlp.NameIDPolicy(format=saml.NAMEID_FORMAT_TRANSIENT,
allow_create="true")
resp_str = "%s" % self.server.create_authn_response(
ava, "id1", "http://local:8087/",
"urn:mace:example.com:saml:roland:sp",
npolicy,
"foba0001@example.com",
authn=(AUTHN_PASSWORD,
ava, "id1", "http://local:8087/",
"urn:mace:example.com:saml:roland:sp", npolicy,
"foba0001@example.com", authn=(AUTHN_PASSWORD,
"http://www.example.com/login"))
response = samlp.response_from_string(resp_str)
print response.keyswv()
assert _eq(response.keyswv(),['status', 'destination', 'assertion',
'in_response_to', 'issue_instant', 'version',
'issuer', 'id'])
assert _eq(response.keyswv(), ['status', 'destination', 'assertion',
'in_response_to', 'issue_instant',
'version', 'issuer', 'id'])
print response.assertion[0].keyswv()
assert len(response.assertion) == 1
assert _eq(response.assertion[0].keyswv(), ['attribute_statement',
@@ -308,19 +310,18 @@ class TestServer1():
def test_signed_response(self):
name_id = self.server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp",
"id12")
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
"urn:mace:example.com:saml:roland:sp", "id12")
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
signed_resp = self.server.create_authn_response(
ava,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id = name_id,
sign_assertion=True
)
ava,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
sign_assertion=True
)
print signed_resp
assert signed_resp
@@ -339,7 +340,7 @@ class TestServer1():
sinfo = {
"name_id": nid,
"issuer": "urn:mace:example.com:saml:roland:idp",
"not_on_or_after" : soon,
"not_on_or_after": soon,
"user": {
"givenName": "Leo",
"surName": "Laport",
@@ -393,6 +394,7 @@ IDENTITY = {"eduPersonAffiliation": ["staff", "member"],
"surName": ["Jeter"], "givenName": ["Derek"],
"mail": ["foo@gmail.com"], "title": "The man"}
class TestServer2():
def setup_class(self):
self.server = Server("restrictive_idp_conf")
@@ -403,9 +405,9 @@ class TestServer2():
def test_do_attribute_reponse(self):
aa_policy = self.server.config.getattr("policy", "idp")
print aa_policy.__dict__
response = self.server.create_attribute_response(IDENTITY.copy(), "aaa",
"http://example.com/sp/",
"urn:mace:example.com:sp:1")
response = self.server.create_attribute_response(
IDENTITY.copy(), "aaa", "http://example.com/sp/",
"urn:mace:example.com:sp:1")
assert response is not None
assert response.destination == "http://example.com/sp/"
@@ -422,6 +424,7 @@ class TestServer2():
subject_confirmation = subject.subject_confirmation
assert subject_confirmation.subject_confirmation_data.in_response_to == "aaa"
def _logout_request(conf_file):
conf = config.SPConfig()
conf.load_file(conf_file)
@@ -431,7 +434,7 @@ def _logout_request(conf_file):
sinfo = {
"name_id": nid,
"issuer": "urn:mace:example.com:saml:roland:idp",
"not_on_or_after" : soon,
"not_on_or_after": soon,
"user": {
"givenName": "Leo",
"surName": "Laport",
@@ -440,10 +443,11 @@ def _logout_request(conf_file):
sp.users.add_information_about_person(sinfo)
return sp.create_logout_request(
name_id = nid,
destination = "http://localhost:8088/slo",
issuer_entity_id = "urn:mace:example.com:saml:roland:idp",
reason = "I'm tired of this")
name_id=nid,
destination="http://localhost:8088/slo",
issuer_entity_id="urn:mace:example.com:saml:roland:idp",
reason="I'm tired of this")
class TestServerLogout():
@@ -463,3 +467,9 @@ class TestServerLogout():
assert len(http_args) == 4
assert http_args["headers"][0][0] == "Location"
assert http_args["data"] == []
if __name__ == "__main__":
ts = TestServer1()
ts.setup_class()
ts.test_authn_response_0()

View File

@@ -19,12 +19,14 @@ from saml2.time_util import in_a_while
from py.test import raises
from fakeIDP import FakeIDP, unpack_form
def for_me(condition, me ):
for restriction in condition.audience_restriction:
audience = restriction.audience
if audience.text.strip() == me:
return True
def ava(attribute_statement):
result = {}
for attribute in attribute_statement.attribute:
@@ -35,6 +37,7 @@ def ava(attribute_statement):
result[name].append(value.text.strip())
return result
def _leq(l1, l2):
return set(l1) == set(l2)
@@ -107,14 +110,14 @@ class TestClient:
"E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
attribute={
("urn:oid:2.5.4.42",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"givenName"): None,
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"givenName"): None,
("urn:oid:2.5.4.4",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"surname"): None,
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"surname"): None,
("urn:oid:1.2.840.113549.1.9.1",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri"): None,
},
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri"): None,
},
format=saml.NAMEID_FORMAT_PERSISTENT,
message_id="id1")
@@ -140,10 +143,10 @@ class TestClient:
seen.append("surname")
elif attribute.name == "urn:oid:1.2.840.113549.1.9.1":
assert attribute.name_format == saml.NAME_FORMAT_URI
if getattr(attribute,"friendly_name"):
if getattr(attribute, "friendly_name"):
assert False
seen.append("email")
assert _leq(seen,["givenName", "surname", "email"])
assert _leq(seen, ["givenName", "surname", "email"])
def test_create_attribute_query_3(self):
req = self.client.create_attribute_query(
@@ -162,25 +165,6 @@ class TestClient:
assert nameid.format == saml.NAMEID_FORMAT_TRANSIENT
assert nameid.text == "_e7b68a04488f715cda642fbdd90099f5"
# def test_idp_entry(self):
# idp_entry = self.client.idp_entry(name="Umeå Universitet",
# location="https://idp.umu.se/")
#
# assert idp_entry.name == "Umeå Universitet"
# assert idp_entry.loc == "https://idp.umu.se/"
#
# def test_scope(self):
# entity_id = "urn:mace:example.com:saml:roland:idp"
# locs = self.client.metadata.single_sign_on_services(entity_id)
# scope = self.client.scoping_from_metadata(entity_id, locs)
#
# assert scope.idp_list
# assert len(scope.idp_list.idp_entry) == 1
# idp_entry = scope.idp_list.idp_entry[0]
# assert idp_entry.name == 'Exempel AB'
# assert idp_entry.loc == ['http://localhost:8088/sso']
def test_create_auth_request_0(self):
ar_str = "%s" % self.client.create_authn_request(
"http://www.example.com/sso", message_id="id1")
@@ -198,7 +182,7 @@ class TestClient:
def test_create_auth_request_vo(self):
assert self.client.config.vorg.keys() == [
"urn:mace:example.com:it:tek"]
"urn:mace:example.com:it:tek"]
ar_str = "%s" % self.client.create_authn_request(
"http://www.example.com/sso",
@@ -294,8 +278,8 @@ class TestClient:
# --- authenticate another person
ava = { "givenName": ["Alfonson"], "surName": ["Soriano"],
"mail": ["alfonson@chc.mlb.com"], "title": ["outfielder"]}
ava = {"givenName": ["Alfonson"], "surName": ["Soriano"],
"mail": ["alfonson@chc.mlb.com"], "title": ["outfielder"]}
resp_str = "%s" % self.server.create_authn_response(
identity=ava,
@@ -308,8 +292,9 @@ class TestClient:
resp_str = base64.encodestring(resp_str)
self.client.parse_authn_request_response(resp_str, BINDING_HTTP_POST,
{"id2":"http://foo.example.com/service"})
self.client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
{"id2": "http://foo.example.com/service"})
# Two persons in the cache
assert len(self.client.users.subjects()) == 2
@@ -336,6 +321,8 @@ class TestClient:
# Below can only be done with dummy Server
IDP = "urn:mace:example.com:saml:roland:idp"
class TestClientWithDummy():
def setup_class(self):
self.server = FakeIDP("idp_all_conf")
@@ -347,8 +334,8 @@ class TestClientWithDummy():
self.client.send = self.server.receive
def test_do_authn(self):
sid, http_args = self.client.prepare_for_authenticate(IDP,
"http://www.example.com/relay_state")
sid, http_args = self.client.prepare_for_authenticate(
IDP, "http://www.example.com/relay_state")
assert isinstance(sid, basestring)
assert len(http_args) == 4
@@ -356,11 +343,10 @@ class TestClientWithDummy():
assert http_args["data"] == []
def test_do_attribute_query(self):
response = self.client.do_attribute_query(IDP,
"_e7b68a04488f715cda642fbdd90099f5",
attribute={"eduPersonAffiliation":None},
nameid_format=NAMEID_FORMAT_TRANSIENT)
response = self.client.do_attribute_query(
IDP, "_e7b68a04488f715cda642fbdd90099f5",
attribute={"eduPersonAffiliation":None},
nameid_format=NAMEID_FORMAT_TRANSIENT)
def test_logout_1(self):
""" one IdP/AA logout from"""
@@ -389,9 +375,8 @@ class TestClientWithDummy():
def test_post_sso(self):
sid, http_args = self.client.prepare_for_authenticate(
"urn:mace:example.com:saml:roland:idp",
relay_state="really",
binding=BINDING_HTTP_POST)
"urn:mace:example.com:saml:roland:idp", relay_state="really",
binding=BINDING_HTTP_POST)
# Normally a response would now be sent back to the users web client
# Here I fake what the client will do
@@ -415,3 +400,8 @@ class TestClientWithDummy():
'http://www.example.com/login'
assert ac.authn_context_class_ref.text == AUTHN_PASSWORD
# if __name__ == "__main__":
# tc = TestClient()
# tc.setup_class()
# tc.test_response()

View File

@@ -15,6 +15,7 @@ from saml2.server import Server
TAG1 = "name=\"SAMLRequest\" value="
def get_msg(hinfo, binding):
if binding == BINDING_SOAP:
xmlstr = hinfo["data"]
@@ -24,7 +25,7 @@ def get_msg(hinfo, binding):
i += len(TAG1) + 1
j = _inp.find('"', i)
xmlstr = _inp[i:j]
else: # BINDING_HTTP_REDIRECT
else: # BINDING_HTTP_REDIRECT
parts = urlparse(hinfo["headers"][0][1])
xmlstr = parse_qs(parts.query)["SAMLRequest"][0]
@@ -32,6 +33,7 @@ def get_msg(hinfo, binding):
# ------------------------------------------------------------------------
def test_basic():
sp = Saml2Client(config_file="servera_conf")
idp = Server(config_file="idp_all_conf")
@@ -43,7 +45,8 @@ def test_basic():
authn_context_class_ref=AuthnContextClassRef(
text=AUTHN_PASSWORD))]
subject = Subject(text="abc", name_id=NameID(format=NAMEID_FORMAT_TRANSIENT))
subject = Subject(text="abc",
name_id=NameID(format=NAMEID_FORMAT_TRANSIENT))
aq = sp.create_authn_query(subject, destination, authn_context)
@@ -51,15 +54,17 @@ def test_basic():
assert isinstance(aq, AuthnQuery)
def test_flow():
sp = Saml2Client(config_file="servera_conf")
idp = Server(config_file="idp_all_conf")
relay_state = "FOO"
# -- dummy request ---
orig_req = AuthnRequest(issuer=sp._issuer(),
name_id_policy=NameIDPolicy(allow_create="true",
format=NAMEID_FORMAT_TRANSIENT))
orig_req = AuthnRequest(
issuer=sp._issuer(),
name_id_policy=NameIDPolicy(allow_create="true",
format=NAMEID_FORMAT_TRANSIENT))
# == Create an AuthnRequest response
@@ -84,7 +89,7 @@ def test_flow():
xmlstr = get_msg(hinfo, binding)
aresp = sp.parse_authn_request_response(xmlstr, binding,
{resp.in_response_to :"/"})
{resp.in_response_to: "/"})
binding, destination = sp.pick_binding("authn_query_service",
entity_id=idp.config.entityid)
@@ -113,7 +118,6 @@ def test_flow():
msg = pm.message
assert msg.id == aq.id
p_res = idp.create_authn_query_response(msg.subject, msg.session_index,
msg.requested_authn_context)
@@ -131,3 +135,6 @@ def test_flow():
print final
assert final.response.id == p_res.id
if __name__ == "__main__":
test_flow()

View File

@@ -1 +1,54 @@
from saml2 import BINDING_HTTP_POST
from saml2.saml import AUTHN_PASSWORD
from saml2.client import Saml2Client
from saml2.server import Server
__author__ = 'rolandh'
def _eq(l1, l2):
return set(l1) == set(l2)
def test_flow():
sp = Saml2Client(config_file="servera_conf")
idp1 = Server(config_file="idp_conf_mdb")
idp2 = Server(config_file="idp_conf_mdb")
# clean out database
idp1.ident.mdb.db.drop()
# -- dummy request ---
orig_req = sp.create_authn_request(idp1.config.entityid)
# == Create an AuthnRequest response
rinfo = idp1.response_args(orig_req, [BINDING_HTTP_POST])
#name_id = idp1.ident.transient_nameid("id12", rinfo["sp_entity_id"])
resp = idp1.create_authn_response({"eduPersonEntitlement": "Short stop",
"surName": "Jeter",
"givenName": "Derek",
"mail": "derek.jeter@nyy.mlb.com",
"title": "The man"},
userid="jeter",
authn=(AUTHN_PASSWORD,
"http://www.example.com/login"),
**rinfo)
# What's stored away is the assertion
a_info = idp2.session_db.get_assertion(resp.assertion.id)
# Make sure what I got back from MongoDB is the same as I put in
assert a_info["assertion"] == resp.assertion
# By subject
nid = resp.assertion.subject.name_id
_assertion = idp2.session_db.get_assertions_by_subject(nid)
assert len(_assertion) == 1
assert _assertion[0] == resp.assertion
nids = idp2.ident.find_nameid("jeter")
assert len(nids) == 1
if __name__ == "__main__":
test_flow()