diff --git a/tests/idp_conf_mdb.py b/tests/idp_conf_mdb.py new file mode 100644 index 0000000..f997263 --- /dev/null +++ b/tests/idp_conf_mdb.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from saml2 import BINDING_SOAP, BINDING_URI +from saml2 import BINDING_HTTP_REDIRECT +from saml2 import BINDING_HTTP_POST +from saml2 import BINDING_HTTP_ARTIFACT +from saml2.saml import NAMEID_FORMAT_PERSISTENT +from saml2.saml import NAME_FORMAT_URI + +from pathutils import full_path, xmlsec_path + +BASE = "http://localhost:8088" + +CONFIG = { + "entityid": "%s/saml/idp" % BASE, + "name": "Rolands IdP", + "service": { + "aa": { + "endpoints": { + "attribute_service": [ + ("%s/aap" % BASE, BINDING_HTTP_POST), + ("%s/aas" % BASE, BINDING_SOAP) + ] + }, + }, + "aq": { + "endpoints": { + "authn_query_service": [ + ("%s/aqs" % BASE, BINDING_SOAP) + ] + }, + }, + "idp": { + "endpoints": { + "single_sign_on_service": [ + ("%s/sso/redirect" % BASE, BINDING_HTTP_REDIRECT), + ("%s/sso/post" % BASE, BINDING_HTTP_POST), + ("%s/sso/art" % BASE, BINDING_HTTP_ARTIFACT), + ("%s/sso/paos" % BASE, BINDING_SOAP) + ], + "single_logout_service": [ + ("%s/slo/soap" % BASE, BINDING_SOAP), + ("%s/slo/post" % BASE, BINDING_HTTP_POST) + ], + "artifact_resolution_service": [ + ("%s/ars" % BASE, BINDING_SOAP) + ], + "assertion_id_request_service": [ + ("%s/airs" % BASE, BINDING_URI) + ], + "authn_query_service": [ + ("%s/aqs" % BASE, BINDING_SOAP) + ], + "manage_name_id_service": [ + ("%s/mni/soap" % BASE, BINDING_SOAP), + ("%s/mni/post" % BASE, BINDING_HTTP_POST), + ("%s/mni/redirect" % BASE, BINDING_HTTP_REDIRECT), + ("%s/mni/art" % BASE, BINDING_HTTP_ARTIFACT) + ], + "name_id_mapping_service": [ + ("%s/nim/soap" % BASE, BINDING_SOAP), + ("%s/nim/post" % BASE, BINDING_HTTP_POST), + ("%s/nim/redirect" % BASE, BINDING_HTTP_REDIRECT), + ("%s/nim/art" % BASE, BINDING_HTTP_ARTIFACT) + ] + }, + "policy": { + "default": { + "lifetime": {"minutes": 15}, + "attribute_restrictions": None, # means all I have + "name_form": NAME_FORMAT_URI, + }, + "urn:mace:example.com:saml:roland:sp": { + "lifetime": {"minutes": 5}, + "nameid_format": NAMEID_FORMAT_PERSISTENT, + # "attribute_restrictions":{ + # "givenName": None, + # "surName": None, + # } + } + }, + "subject_data": ("mongodb", "subject"), + "session_storage": ("mongodb", "session") + }, + }, + "debug": 1, + "key_file": full_path("test.key"), + "cert_file": full_path("test.pem"), + "xmlsec_binary": xmlsec_path, + "metadata": { + "local": [full_path("servera.xml"), + full_path("vo_metadata.xml")], + }, + "attribute_map_dir": full_path("attributemaps"), + "organization": { + "name": "Exempel AB", + "display_name": [("Exempel ÄB", "se"), ("Example Co.", "en")], + "url": "http://www.example.com/roland", + }, + "contact_person": [ + { + "given_name":"John", + "sur_name": "Smith", + "email_address": ["john.smith@example.com"], + "contact_type": "technical", + }, + ], +} + diff --git a/tests/idp_conf_mdb2.py b/tests/idp_conf_mdb2.py new file mode 100644 index 0000000..edd86c5 --- /dev/null +++ b/tests/idp_conf_mdb2.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from saml2 import BINDING_SOAP, BINDING_URI +from saml2 import BINDING_HTTP_REDIRECT +from saml2 import BINDING_HTTP_POST +from saml2 import BINDING_HTTP_ARTIFACT +from saml2.saml import NAMEID_FORMAT_PERSISTENT +from saml2.saml import NAME_FORMAT_URI + +from pathutils import full_path, xmlsec_path + +BASE = "http://localhost:8089" + +CONFIG = { + "entityid": "%s/saml/idp2" % BASE, + "name": "Rolands 2nd IdP", + "service": { + "aa": { + "endpoints": { + "attribute_service": [ + ("%s/aap" % BASE, BINDING_HTTP_POST), + ("%s/aas" % BASE, BINDING_SOAP) + ] + }, + }, + "aq": { + "endpoints": { + "authn_query_service": [ + ("%s/aqs" % BASE, BINDING_SOAP) + ] + }, + }, + "idp": { + "endpoints": { + "single_sign_on_service": [ + ("%s/sso/redirect" % BASE, BINDING_HTTP_REDIRECT), + ("%s/sso/post" % BASE, BINDING_HTTP_POST), + ("%s/sso/art" % BASE, BINDING_HTTP_ARTIFACT), + ("%s/sso/paos" % BASE, BINDING_SOAP) + ], + "single_logout_service": [ + ("%s/slo/soap" % BASE, BINDING_SOAP), + ("%s/slo/post" % BASE, BINDING_HTTP_POST) + ], + "artifact_resolution_service": [ + ("%s/ars" % BASE, BINDING_SOAP) + ], + "assertion_id_request_service": [ + ("%s/airs" % BASE, BINDING_URI) + ], + "authn_query_service": [ + ("%s/aqs" % BASE, BINDING_SOAP) + ], + "manage_name_id_service": [ + ("%s/mni/soap" % BASE, BINDING_SOAP), + ("%s/mni/post" % BASE, BINDING_HTTP_POST), + ("%s/mni/redirect" % BASE, BINDING_HTTP_REDIRECT), + ("%s/mni/art" % BASE, BINDING_HTTP_ARTIFACT) + ], + "name_id_mapping_service": [ + ("%s/nim/soap" % BASE, BINDING_SOAP), + ("%s/nim/post" % BASE, BINDING_HTTP_POST), + ("%s/nim/redirect" % BASE, BINDING_HTTP_REDIRECT), + ("%s/nim/art" % BASE, BINDING_HTTP_ARTIFACT) + ] + }, + "policy": { + "default": { + "lifetime": {"minutes": 15}, + "attribute_restrictions": None, # means all I have + "name_form": NAME_FORMAT_URI, + }, + "urn:mace:example.com:saml:roland:sp": { + "lifetime": {"minutes": 5}, + "nameid_format": NAMEID_FORMAT_PERSISTENT, + # "attribute_restrictions":{ + # "givenName": None, + # "surName": None, + # } + } + }, + "subject_data": ("mongodb", "subject"), + "session_storage": ("mongodb", "session") + }, + }, + "debug": 1, + "key_file": full_path("test.key"), + "cert_file": full_path("test.pem"), + "xmlsec_binary": xmlsec_path, + "metadata": { + "local": [full_path("servera.xml"), + full_path("vo_metadata.xml")], + }, + "attribute_map_dir": full_path("attributemaps"), + "organization": { + "name": "Exempel AB", + "display_name": [("Exempel ÄB", "se"), ("Example Co.", "en")], + "url": "http://www.example.com/roland", + }, + "contact_person": [ + { + "given_name":"John", + "sur_name": "Smith", + "email_address": ["john.smith@example.com"], + "contact_type": "technical", + }, + ], +} + diff --git a/tests/server_conf.py b/tests/server_conf.py index b7b6ad2..c571a1d 100644 --- a/tests/server_conf.py +++ b/tests/server_conf.py @@ -6,25 +6,26 @@ CONFIG={ "description": "My own SP", "service": { "sp": { - "endpoints":{ - "assertion_consumer_service": ["http://lingon.catalogix.se:8087/"], + "endpoints": { + "assertion_consumer_service": [ + "http://lingon.catalogix.se:8087/"], }, "required_attributes": ["surName", "givenName", "mail"], "optional_attributes": ["title"], "idp": ["urn:mace:example.com:saml:roland:idp"], } }, - "debug" : 1, - "key_file" : full_path("test.key"), - "cert_file" : full_path("test.pem"), + "debug": 1, + "key_file": full_path("test.key"), + "cert_file": full_path("test.pem"), "ca_certs": full_path("cacerts.txt"), - "xmlsec_binary" : xmlsec_path, + "xmlsec_binary": xmlsec_path, "metadata": { "local": [full_path("idp.xml"), full_path("vo_metadata.xml")], }, - "virtual_organization" : { + "virtual_organization": { "urn:mace:example.com:it:tek":{ - "nameid_format" : "urn:oid:1.3.6.1.4.1.1466.115.121.1.15-NameID", + "nameid_format": "urn:oid:1.3.6.1.4.1.1466.115.121.1.15-NameID", "common_identifier": "umuselin", } }, diff --git a/tests/test_22_mdie.py b/tests/test_22_mdie.py index 337b833..6814a29 100644 --- a/tests/test_22_mdie.py +++ b/tests/test_22_mdie.py @@ -25,17 +25,20 @@ ONTS = { xmlenc.NAMESPACE: xmlenc } -def _eq(l1,l2): + +def _eq(l1, l2): return set(l1) == set(l2) + def _class(cls): return "%s&%s" % (cls.c_namespace, cls.c_tag) + def test_construct_contact(): c = from_dict({ "__class__": _class(md.ContactPerson), - "given_name":{"text":"Roland", "__class__": _class(md.GivenName)}, - "sur_name": {"text":"Hedberg", "__class__": _class(md.SurName)}, + "given_name": {"text": "Roland", "__class__": _class(md.GivenName)}, + "sur_name": {"text": "Hedberg", "__class__": _class(md.SurName)}, "email_address": [{"text":"roland@catalogix.se", "__class__": _class(md.EmailAddress)}], }, ONTS) @@ -44,4 +47,4 @@ def test_construct_contact(): assert c.given_name.text == "Roland" assert c.sur_name.text == "Hedberg" assert c.email_address[0].text == "roland@catalogix.se" - assert _eq(c.keyswv(), ["given_name","sur_name","email_address"]) + assert _eq(c.keyswv(), ["given_name", "sur_name", "email_address"]) diff --git a/tests/test_50_server.py b/tests/test_50_server.py index 757f798..59cc804 100644 --- a/tests/test_50_server.py +++ b/tests/test_50_server.py @@ -10,18 +10,19 @@ from saml2 import samlp, saml, client, config from saml2 import s_utils from saml2 import sigver from saml2 import time_util -from saml2.s_utils import OtherError, UnsupportedBinding +from saml2.s_utils import OtherError from saml2.s_utils import do_attribute_statement, factory from saml2.soap import make_soap_enveloped_saml_thingy -from saml2 import BINDING_HTTP_POST, BINDING_HTTP_REDIRECT +from saml2 import BINDING_HTTP_POST +from saml2 import BINDING_HTTP_REDIRECT from py.test import raises -import os nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, text="123456") -def _eq(l1,l2): + +def _eq(l1, l2): return set(l1) == set(l2) @@ -39,25 +40,27 @@ class TestServer1(): def test_issuer(self): issuer = self.server._issuer() assert isinstance(issuer, saml.Issuer) - assert _eq(issuer.keyswv(), ["text","format"]) + assert _eq(issuer.keyswv(), ["text", "format"]) assert issuer.format == saml.NAMEID_FORMAT_ENTITY assert issuer.text == self.server.config.entityid - def test_assertion(self): assertion = s_utils.assertion_factory( - subject= factory(saml.Subject, text="_aaa", - name_id=factory(saml.NameID, - format=saml.NAMEID_FORMAT_TRANSIENT)), - attribute_statement = do_attribute_statement({ - ("","","surName"): ("Jeter",""), - ("","","givenName") :("Derek",""), - }), + subject=factory( + saml.Subject, text="_aaa", + name_id=factory(saml.NameID, + format=saml.NAMEID_FORMAT_TRANSIENT)), + attribute_statement=do_attribute_statement( + { + ("", "", "surName"): ("Jeter", ""), + ("", "", "givenName"): ("Derek", ""), + } + ), issuer=self.server._issuer(), - ) + ) - assert _eq(assertion.keyswv(),['attribute_statement', 'issuer', 'id', - 'subject', 'issue_instant', 'version']) + assert _eq(assertion.keyswv(), ['attribute_statement', 'issuer', 'id', + 'subject', 'issue_instant', 'version']) assert assertion.version == "2.0" assert assertion.issuer.text == "urn:mace:example.com:saml:roland:idp" # @@ -77,31 +80,33 @@ class TestServer1(): assert attr0.attribute_value[0].text == "Jeter" # subject = assertion.subject - assert _eq(subject.keyswv(),["text", "name_id"]) + assert _eq(subject.keyswv(), ["text", "name_id"]) assert subject.text == "_aaa" assert subject.name_id.format == saml.NAMEID_FORMAT_TRANSIENT def test_response(self): response = sigver.response_factory( - in_response_to="_012345", - destination="https:#www.example.com", - status=s_utils.success_status_factory(), - assertion=s_utils.assertion_factory( - subject = factory( saml.Subject, text="_aaa", - name_id=saml.NAMEID_FORMAT_TRANSIENT), - attribute_statement = do_attribute_statement({ - ("","","surName"): ("Jeter",""), - ("","","givenName") :("Derek",""), - }), - issuer=self.server._issuer(), + in_response_to="_012345", + destination="https:#www.example.com", + status=s_utils.success_status_factory(), + assertion=s_utils.assertion_factory( + subject=factory(saml.Subject, text="_aaa", + name_id=saml.NAMEID_FORMAT_TRANSIENT), + attribute_statement=do_attribute_statement( + { + ("", "", "surName"): ("Jeter", ""), + ("", "", "givenName"): ("Derek", ""), + } ), issuer=self.server._issuer(), - ) + ), + issuer=self.server._issuer(), + ) print response.keyswv() - assert _eq(response.keyswv(),['destination', 'assertion','status', - 'in_response_to', 'issue_instant', - 'version', 'issuer', 'id']) + assert _eq(response.keyswv(), ['destination', 'assertion', 'status', + 'in_response_to', 'issue_instant', + 'version', 'issuer', 'id']) assert response.version == "2.0" assert response.issuer.text == "urn:mace:example.com:saml:roland:idp" assert response.destination == "https:#www.example.com" @@ -113,25 +118,24 @@ class TestServer1(): def test_parse_faulty_request(self): authn_request = self.client.create_authn_request( - destination = "http://www.example.com", - id = "id1") + destination="http://www.example.com", id="id1") # should raise an error because faulty spentityid binding = BINDING_HTTP_REDIRECT - htargs = self.client.apply_binding(binding, "%s" % authn_request, - "http://www.example.com", "abcd") + htargs = self.client.apply_binding( + binding, "%s" % authn_request, "http://www.example.com", "abcd") _dict = parse_qs(htargs["headers"][0][1].split('?')[1]) print _dict raises(OtherError, self.server.parse_authn_request, - _dict["SAMLRequest"][0], binding) + _dict["SAMLRequest"][0], binding) def test_parse_faulty_request_to_err_status(self): authn_request = self.client.create_authn_request( - destination = "http://www.example.com") + destination="http://www.example.com") binding = BINDING_HTTP_REDIRECT htargs = self.client.apply_binding(binding, "%s" % authn_request, - "http://www.example.com", "abcd") + "http://www.example.com", "abcd") _dict = parse_qs(htargs["headers"][0][1].split('?')[1]) print _dict @@ -147,7 +151,7 @@ class TestServer1(): assert _eq(status.keyswv(), ["status_code", "status_message"]) assert status.status_message.text == 'Not destined for me!' status_code = status.status_code - assert _eq(status_code.keyswv(), ["status_code","value"]) + assert _eq(status_code.keyswv(), ["status_code", "value"]) assert status_code.value == samlp.STATUS_RESPONDER assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL @@ -175,25 +179,26 @@ class TestServer1(): def test_sso_response_with_identity(self): name_id = self.server.ident.transient_nameid( - "urn:mace:example.com:saml:roland:sp", - "id12") + "urn:mace:example.com:saml:roland:sp", "id12") resp = self.server.create_authn_response( - {"eduPersonEntitlement": "Short stop", - "surName": "Jeter", - "givenName": "Derek", - "mail": "derek.jeter@nyy.mlb.com", - "title": "The man"}, + { + "eduPersonEntitlement": "Short stop", + "surName": "Jeter", + "givenName": "Derek", + "mail": "derek.jeter@nyy.mlb.com", + "title": "The man" + }, "id12", # in_response_to "http://localhost:8087/", # destination - "urn:mace:example.com:saml:roland:sp", # sp_entity_id + "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=name_id, authn=(AUTHN_PASSWORD, "http://www.example.com/login") ) print resp.keyswv() - assert _eq(resp.keyswv(),['status', 'destination', 'assertion', - 'in_response_to', 'issue_instant', - 'version', 'id', 'issuer']) + assert _eq(resp.keyswv(), ['status', 'destination', 'assertion', + 'in_response_to', 'issue_instant', + 'version', 'id', 'issuer']) assert resp.destination == "http://localhost:8087/" assert resp.in_response_to == "id12" assert resp.status @@ -229,17 +234,17 @@ class TestServer1(): def test_sso_response_without_identity(self): resp = self.server.create_authn_response( - {}, - "id12", # in_response_to - "http://localhost:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id - userid="USER1", - authn=(AUTHN_PASSWORD, "http://www.example.com/login") - ) + {}, + "id12", # in_response_to + "http://localhost:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + userid="USER1", + authn=(AUTHN_PASSWORD, "http://www.example.com/login") + ) print resp.keyswv() - assert _eq(resp.keyswv(),['status', 'destination', 'in_response_to', - 'issue_instant', 'version', 'id', 'issuer']) + assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to', + 'issue_instant', 'version', 'id', 'issuer']) assert resp.destination == "http://localhost:8087/" assert resp.in_response_to == "id12" assert resp.status @@ -249,22 +254,21 @@ class TestServer1(): def test_sso_failure_response(self): exc = s_utils.MissingValue("eduPersonAffiliation missing") - resp = self.server.create_error_response("id12", - "http://localhost:8087/", - exc ) + resp = self.server.create_error_response( + "id12", "http://localhost:8087/", exc) print resp.keyswv() - assert _eq(resp.keyswv(),['status', 'destination', 'in_response_to', - 'issue_instant', 'version', 'id', 'issuer']) + assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to', + 'issue_instant', 'version', 'id', 'issuer']) assert resp.destination == "http://localhost:8087/" assert resp.in_response_to == "id12" assert resp.status print resp.status assert resp.status.status_code.value == samlp.STATUS_RESPONDER assert resp.status.status_code.status_code.value == \ - samlp.STATUS_REQUEST_UNSUPPORTED + samlp.STATUS_REQUEST_UNSUPPORTED assert resp.status.status_message.text == \ - "eduPersonAffiliation missing" + "eduPersonAffiliation missing" assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp" assert not resp.assertion @@ -275,24 +279,22 @@ class TestServer1(): conf.load_file("server_conf") self.client = client.Saml2Client(conf) - ava = { "givenName": ["Derek"], "surName": ["Jeter"], - "mail": ["derek@nyy.mlb.com"], "title": "The man"} + ava = {"givenName": ["Derek"], "surName": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], "title": "The man"} npolicy = samlp.NameIDPolicy(format=saml.NAMEID_FORMAT_TRANSIENT, allow_create="true") resp_str = "%s" % self.server.create_authn_response( - ava, "id1", "http://local:8087/", - "urn:mace:example.com:saml:roland:sp", - npolicy, - "foba0001@example.com", - authn=(AUTHN_PASSWORD, + ava, "id1", "http://local:8087/", + "urn:mace:example.com:saml:roland:sp", npolicy, + "foba0001@example.com", authn=(AUTHN_PASSWORD, "http://www.example.com/login")) response = samlp.response_from_string(resp_str) print response.keyswv() - assert _eq(response.keyswv(),['status', 'destination', 'assertion', - 'in_response_to', 'issue_instant', 'version', - 'issuer', 'id']) + assert _eq(response.keyswv(), ['status', 'destination', 'assertion', + 'in_response_to', 'issue_instant', + 'version', 'issuer', 'id']) print response.assertion[0].keyswv() assert len(response.assertion) == 1 assert _eq(response.assertion[0].keyswv(), ['attribute_statement', @@ -308,19 +310,18 @@ class TestServer1(): def test_signed_response(self): name_id = self.server.ident.transient_nameid( - "urn:mace:example.com:saml:roland:sp", - "id12") - ava = { "givenName": ["Derek"], "surName": ["Jeter"], - "mail": ["derek@nyy.mlb.com"], "title": "The man"} + "urn:mace:example.com:saml:roland:sp", "id12") + ava = {"givenName": ["Derek"], "surName": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], "title": "The man"} signed_resp = self.server.create_authn_response( - ava, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id - name_id = name_id, - sign_assertion=True - ) + ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=name_id, + sign_assertion=True + ) print signed_resp assert signed_resp @@ -339,7 +340,7 @@ class TestServer1(): sinfo = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", - "not_on_or_after" : soon, + "not_on_or_after": soon, "user": { "givenName": "Leo", "surName": "Laport", @@ -393,6 +394,7 @@ IDENTITY = {"eduPersonAffiliation": ["staff", "member"], "surName": ["Jeter"], "givenName": ["Derek"], "mail": ["foo@gmail.com"], "title": "The man"} + class TestServer2(): def setup_class(self): self.server = Server("restrictive_idp_conf") @@ -403,9 +405,9 @@ class TestServer2(): def test_do_attribute_reponse(self): aa_policy = self.server.config.getattr("policy", "idp") print aa_policy.__dict__ - response = self.server.create_attribute_response(IDENTITY.copy(), "aaa", - "http://example.com/sp/", - "urn:mace:example.com:sp:1") + response = self.server.create_attribute_response( + IDENTITY.copy(), "aaa", "http://example.com/sp/", + "urn:mace:example.com:sp:1") assert response is not None assert response.destination == "http://example.com/sp/" @@ -422,6 +424,7 @@ class TestServer2(): subject_confirmation = subject.subject_confirmation assert subject_confirmation.subject_confirmation_data.in_response_to == "aaa" + def _logout_request(conf_file): conf = config.SPConfig() conf.load_file(conf_file) @@ -431,7 +434,7 @@ def _logout_request(conf_file): sinfo = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", - "not_on_or_after" : soon, + "not_on_or_after": soon, "user": { "givenName": "Leo", "surName": "Laport", @@ -440,10 +443,11 @@ def _logout_request(conf_file): sp.users.add_information_about_person(sinfo) return sp.create_logout_request( - name_id = nid, - destination = "http://localhost:8088/slo", - issuer_entity_id = "urn:mace:example.com:saml:roland:idp", - reason = "I'm tired of this") + name_id=nid, + destination="http://localhost:8088/slo", + issuer_entity_id="urn:mace:example.com:saml:roland:idp", + reason="I'm tired of this") + class TestServerLogout(): @@ -463,3 +467,9 @@ class TestServerLogout(): assert len(http_args) == 4 assert http_args["headers"][0][0] == "Location" assert http_args["data"] == [] + + +if __name__ == "__main__": + ts = TestServer1() + ts.setup_class() + ts.test_authn_response_0() \ No newline at end of file diff --git a/tests/test_51_client.py b/tests/test_51_client.py index 0812739..54445d4 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -19,12 +19,14 @@ from saml2.time_util import in_a_while from py.test import raises from fakeIDP import FakeIDP, unpack_form + def for_me(condition, me ): for restriction in condition.audience_restriction: audience = restriction.audience if audience.text.strip() == me: return True + def ava(attribute_statement): result = {} for attribute in attribute_statement.attribute: @@ -35,6 +37,7 @@ def ava(attribute_statement): result[name].append(value.text.strip()) return result + def _leq(l1, l2): return set(l1) == set(l2) @@ -107,14 +110,14 @@ class TestClient: "E8042FB4-4D5B-48C3-8E14-8EDD852790DD", attribute={ ("urn:oid:2.5.4.42", - "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", - "givenName"): None, + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", + "givenName"): None, ("urn:oid:2.5.4.4", - "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", - "surname"): None, + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", + "surname"): None, ("urn:oid:1.2.840.113549.1.9.1", - "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"): None, - }, + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"): None, + }, format=saml.NAMEID_FORMAT_PERSISTENT, message_id="id1") @@ -140,10 +143,10 @@ class TestClient: seen.append("surname") elif attribute.name == "urn:oid:1.2.840.113549.1.9.1": assert attribute.name_format == saml.NAME_FORMAT_URI - if getattr(attribute,"friendly_name"): + if getattr(attribute, "friendly_name"): assert False seen.append("email") - assert _leq(seen,["givenName", "surname", "email"]) + assert _leq(seen, ["givenName", "surname", "email"]) def test_create_attribute_query_3(self): req = self.client.create_attribute_query( @@ -162,25 +165,6 @@ class TestClient: assert nameid.format == saml.NAMEID_FORMAT_TRANSIENT assert nameid.text == "_e7b68a04488f715cda642fbdd90099f5" - - # def test_idp_entry(self): - # idp_entry = self.client.idp_entry(name="Umeå Universitet", - # location="https://idp.umu.se/") - # - # assert idp_entry.name == "Umeå Universitet" - # assert idp_entry.loc == "https://idp.umu.se/" - # - # def test_scope(self): - # entity_id = "urn:mace:example.com:saml:roland:idp" - # locs = self.client.metadata.single_sign_on_services(entity_id) - # scope = self.client.scoping_from_metadata(entity_id, locs) - # - # assert scope.idp_list - # assert len(scope.idp_list.idp_entry) == 1 - # idp_entry = scope.idp_list.idp_entry[0] - # assert idp_entry.name == 'Exempel AB' - # assert idp_entry.loc == ['http://localhost:8088/sso'] - def test_create_auth_request_0(self): ar_str = "%s" % self.client.create_authn_request( "http://www.example.com/sso", message_id="id1") @@ -198,7 +182,7 @@ class TestClient: def test_create_auth_request_vo(self): assert self.client.config.vorg.keys() == [ - "urn:mace:example.com:it:tek"] + "urn:mace:example.com:it:tek"] ar_str = "%s" % self.client.create_authn_request( "http://www.example.com/sso", @@ -294,8 +278,8 @@ class TestClient: # --- authenticate another person - ava = { "givenName": ["Alfonson"], "surName": ["Soriano"], - "mail": ["alfonson@chc.mlb.com"], "title": ["outfielder"]} + ava = {"givenName": ["Alfonson"], "surName": ["Soriano"], + "mail": ["alfonson@chc.mlb.com"], "title": ["outfielder"]} resp_str = "%s" % self.server.create_authn_response( identity=ava, @@ -308,8 +292,9 @@ class TestClient: resp_str = base64.encodestring(resp_str) - self.client.parse_authn_request_response(resp_str, BINDING_HTTP_POST, - {"id2":"http://foo.example.com/service"}) + self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id2": "http://foo.example.com/service"}) # Two persons in the cache assert len(self.client.users.subjects()) == 2 @@ -336,6 +321,8 @@ class TestClient: # Below can only be done with dummy Server IDP = "urn:mace:example.com:saml:roland:idp" + + class TestClientWithDummy(): def setup_class(self): self.server = FakeIDP("idp_all_conf") @@ -347,8 +334,8 @@ class TestClientWithDummy(): self.client.send = self.server.receive def test_do_authn(self): - sid, http_args = self.client.prepare_for_authenticate(IDP, - "http://www.example.com/relay_state") + sid, http_args = self.client.prepare_for_authenticate( + IDP, "http://www.example.com/relay_state") assert isinstance(sid, basestring) assert len(http_args) == 4 @@ -356,11 +343,10 @@ class TestClientWithDummy(): assert http_args["data"] == [] def test_do_attribute_query(self): - response = self.client.do_attribute_query(IDP, - "_e7b68a04488f715cda642fbdd90099f5", - attribute={"eduPersonAffiliation":None}, - nameid_format=NAMEID_FORMAT_TRANSIENT) - + response = self.client.do_attribute_query( + IDP, "_e7b68a04488f715cda642fbdd90099f5", + attribute={"eduPersonAffiliation":None}, + nameid_format=NAMEID_FORMAT_TRANSIENT) def test_logout_1(self): """ one IdP/AA logout from""" @@ -389,9 +375,8 @@ class TestClientWithDummy(): def test_post_sso(self): sid, http_args = self.client.prepare_for_authenticate( - "urn:mace:example.com:saml:roland:idp", - relay_state="really", - binding=BINDING_HTTP_POST) + "urn:mace:example.com:saml:roland:idp", relay_state="really", + binding=BINDING_HTTP_POST) # Normally a response would now be sent back to the users web client # Here I fake what the client will do @@ -415,3 +400,8 @@ class TestClientWithDummy(): 'http://www.example.com/login' assert ac.authn_context_class_ref.text == AUTHN_PASSWORD + +# if __name__ == "__main__": +# tc = TestClient() +# tc.setup_class() +# tc.test_response() \ No newline at end of file diff --git a/tests/test_65_authn_query.py b/tests/test_65_authn_query.py index d3c3191..1693000 100644 --- a/tests/test_65_authn_query.py +++ b/tests/test_65_authn_query.py @@ -15,6 +15,7 @@ from saml2.server import Server TAG1 = "name=\"SAMLRequest\" value=" + def get_msg(hinfo, binding): if binding == BINDING_SOAP: xmlstr = hinfo["data"] @@ -24,7 +25,7 @@ def get_msg(hinfo, binding): i += len(TAG1) + 1 j = _inp.find('"', i) xmlstr = _inp[i:j] - else: # BINDING_HTTP_REDIRECT + else: # BINDING_HTTP_REDIRECT parts = urlparse(hinfo["headers"][0][1]) xmlstr = parse_qs(parts.query)["SAMLRequest"][0] @@ -32,6 +33,7 @@ def get_msg(hinfo, binding): # ------------------------------------------------------------------------ + def test_basic(): sp = Saml2Client(config_file="servera_conf") idp = Server(config_file="idp_all_conf") @@ -43,7 +45,8 @@ def test_basic(): authn_context_class_ref=AuthnContextClassRef( text=AUTHN_PASSWORD))] - subject = Subject(text="abc", name_id=NameID(format=NAMEID_FORMAT_TRANSIENT)) + subject = Subject(text="abc", + name_id=NameID(format=NAMEID_FORMAT_TRANSIENT)) aq = sp.create_authn_query(subject, destination, authn_context) @@ -51,15 +54,17 @@ def test_basic(): assert isinstance(aq, AuthnQuery) + def test_flow(): sp = Saml2Client(config_file="servera_conf") idp = Server(config_file="idp_all_conf") relay_state = "FOO" # -- dummy request --- - orig_req = AuthnRequest(issuer=sp._issuer(), - name_id_policy=NameIDPolicy(allow_create="true", - format=NAMEID_FORMAT_TRANSIENT)) + orig_req = AuthnRequest( + issuer=sp._issuer(), + name_id_policy=NameIDPolicy(allow_create="true", + format=NAMEID_FORMAT_TRANSIENT)) # == Create an AuthnRequest response @@ -84,7 +89,7 @@ def test_flow(): xmlstr = get_msg(hinfo, binding) aresp = sp.parse_authn_request_response(xmlstr, binding, - {resp.in_response_to :"/"}) + {resp.in_response_to: "/"}) binding, destination = sp.pick_binding("authn_query_service", entity_id=idp.config.entityid) @@ -113,7 +118,6 @@ def test_flow(): msg = pm.message assert msg.id == aq.id - p_res = idp.create_authn_query_response(msg.subject, msg.session_index, msg.requested_authn_context) @@ -131,3 +135,6 @@ def test_flow(): print final assert final.response.id == p_res.id + +if __name__ == "__main__": + test_flow() \ No newline at end of file diff --git a/tests/test_71_mongodb.py b/tests/test_71_mongodb.py index 3b031d2..d8b7f05 100644 --- a/tests/test_71_mongodb.py +++ b/tests/test_71_mongodb.py @@ -1 +1,54 @@ +from saml2 import BINDING_HTTP_POST +from saml2.saml import AUTHN_PASSWORD +from saml2.client import Saml2Client +from saml2.server import Server + __author__ = 'rolandh' + + +def _eq(l1, l2): + return set(l1) == set(l2) + + +def test_flow(): + sp = Saml2Client(config_file="servera_conf") + idp1 = Server(config_file="idp_conf_mdb") + idp2 = Server(config_file="idp_conf_mdb") + + # clean out database + idp1.ident.mdb.db.drop() + + # -- dummy request --- + orig_req = sp.create_authn_request(idp1.config.entityid) + + # == Create an AuthnRequest response + + rinfo = idp1.response_args(orig_req, [BINDING_HTTP_POST]) + + #name_id = idp1.ident.transient_nameid("id12", rinfo["sp_entity_id"]) + resp = idp1.create_authn_response({"eduPersonEntitlement": "Short stop", + "surName": "Jeter", + "givenName": "Derek", + "mail": "derek.jeter@nyy.mlb.com", + "title": "The man"}, + userid="jeter", + authn=(AUTHN_PASSWORD, + "http://www.example.com/login"), + **rinfo) + + # What's stored away is the assertion + a_info = idp2.session_db.get_assertion(resp.assertion.id) + # Make sure what I got back from MongoDB is the same as I put in + assert a_info["assertion"] == resp.assertion + + # By subject + nid = resp.assertion.subject.name_id + _assertion = idp2.session_db.get_assertions_by_subject(nid) + assert len(_assertion) == 1 + assert _assertion[0] == resp.assertion + + nids = idp2.ident.find_nameid("jeter") + assert len(nids) == 1 + +if __name__ == "__main__": + test_flow()