diff --git a/src/saml2/ident.py b/src/saml2/ident.py index 162f3fa..709c304 100644 --- a/src/saml2/ident.py +++ b/src/saml2/ident.py @@ -334,4 +334,5 @@ class IdentDB(object): return name_id def close(self): - self.db.close() + if hasattr(self.db, 'close'): + self.db.close() diff --git a/src/saml2/server.py b/src/saml2/server.py index 34fb4aa..bc078a2 100644 --- a/src/saml2/server.py +++ b/src/saml2/server.py @@ -145,28 +145,32 @@ class Server(Entity): raise Exception("Couldn't open identity database: %s" % (dbspec,)) - _domain = self.config.getattr("domain", "idp") - if _domain: - self.ident.domain = _domain + try: + _domain = self.config.getattr("domain", "idp") + if _domain: + self.ident.domain = _domain - self.ident.name_qualifier = self.config.entityid + self.ident.name_qualifier = self.config.entityid - dbspec = self.config.getattr("edu_person_targeted_id", "idp") - if not dbspec: - pass - else: - typ = dbspec[0] - addr = dbspec[1] - secret = dbspec[2] - if typ == "shelve": - self.eptid = EptidShelve(secret, addr) - elif typ == "mongodb": - from saml2.mongo_store import EptidMDB - - self.eptid = EptidMDB(secret, database=addr, - collection="eptid") + dbspec = self.config.getattr("edu_person_targeted_id", "idp") + if not dbspec: + pass else: - self.eptid = Eptid(secret) + typ = dbspec[0] + addr = dbspec[1] + secret = dbspec[2] + if typ == "shelve": + self.eptid = EptidShelve(secret, addr) + elif typ == "mongodb": + from saml2.mongo_store import EptidMDB + + self.eptid = EptidMDB(secret, database=addr, + collection="eptid") + else: + self.eptid = Eptid(secret) + except Exception: + self.ident.close() + raise def wants(self, sp_entity_id, index=None): """ Returns what attributes the SP requires and which are optional @@ -681,3 +685,6 @@ class Server(Entity): soap_envelope = soapenv.Envelope(header=header, body=body) return "%s" % soap_envelope + + def close(self): + self.ident.close() diff --git a/tests/test_37_entity_categories.py b/tests/test_37_entity_categories.py index ab6fa53..63fdefa 100644 --- a/tests/test_37_entity_categories.py +++ b/tests/test_37_entity_categories.py @@ -1,3 +1,4 @@ +from contextlib import closing from saml2 import saml, sigver from saml2 import md from saml2 import config @@ -150,18 +151,17 @@ def test_filter_ava5(): def test_idp_policy_filter(): - idp = Server("idp_conf_ec") + with closing(Server("idp_conf_ec")) as idp: + ava = {"givenName": ["Derek"], "sn": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], "c": ["USA"], + "eduPersonTargetedID": "foo!bar!xyz", + "norEduPersonNIN": "19800101134"} - ava = {"givenName": ["Derek"], "sn": ["Jeter"], - "mail": ["derek@nyy.mlb.com"], "c": ["USA"], - "eduPersonTargetedID": "foo!bar!xyz", - "norEduPersonNIN": "19800101134"} + policy = idp.config.getattr("policy", "idp") + ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", idp.metadata) - policy = idp.config.getattr("policy", "idp") - ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", idp.metadata) - - print ava - assert ava.keys() == ["eduPersonTargetedID"] # because no entity category + print ava + assert ava.keys() == ["eduPersonTargetedID"] # because no entity category if __name__ == "__main__": test_idp_policy_filter() diff --git a/tests/test_41_response.py b/tests/test_41_response.py index 14aed60..d9ad37c 100644 --- a/tests/test_41_response.py +++ b/tests/test_41_response.py @@ -1,6 +1,8 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +from contextlib import closing + from saml2 import config from saml2.authn_context import INTERNETPROTOCOLPASSWORD @@ -34,38 +36,38 @@ AUTHN = { class TestResponse: def setup_class(self): - server = Server("idp_conf") - name_id = server.ident.transient_nameid( - "urn:mace:example.com:saml:roland:sp", "id12") + with closing(Server("idp_conf")) as server: + name_id = server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id12") - self._resp_ = server.create_authn_response( - IDENTITY, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", - # consumer_url - "urn:mace:example.com:saml:roland:sp", - # sp_entity_id - name_id=name_id) + self._resp_ = server.create_authn_response( + IDENTITY, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", + # consumer_url + "urn:mace:example.com:saml:roland:sp", + # sp_entity_id + name_id=name_id) - self._sign_resp_ = server.create_authn_response( - IDENTITY, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id - name_id=name_id, - sign_assertion=True) + self._sign_resp_ = server.create_authn_response( + IDENTITY, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=name_id, + sign_assertion=True) - self._resp_authn = server.create_authn_response( - IDENTITY, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id - name_id=name_id, - authn=AUTHN) + self._resp_authn = server.create_authn_response( + IDENTITY, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=name_id, + authn=AUTHN) - conf = config.SPConfig() - conf.load_file("server_conf") - self.conf = conf + conf = config.SPConfig() + conf.load_file("server_conf") + self.conf = conf def test_1(self): xml_response = ("%s" % (self._resp_,)) diff --git a/tests/test_42_enc.py b/tests/test_42_enc.py index 8158a05..71934d0 100644 --- a/tests/test_42_enc.py +++ b/tests/test_42_enc.py @@ -1,3 +1,4 @@ +from contextlib import closing from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.server import Server from saml2.sigver import pre_encryption_part, ASSERT_XPATH, EncryptError @@ -30,13 +31,13 @@ def test_pre_enc(): def test_reshuffle_response(): - server = Server("idp_conf") - name_id = server.ident.transient_nameid( - "urn:mace:example.com:saml:roland:sp", "id12") + with closing(Server("idp_conf")) as server: + name_id = server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id12") - resp_ = server.create_authn_response( - IDENTITY, "id12", "http://lingon.catalogix.se:8087/", - "urn:mace:example.com:saml:roland:sp", name_id=name_id) + resp_ = server.create_authn_response( + IDENTITY, "id12", "http://lingon.catalogix.se:8087/", + "urn:mace:example.com:saml:roland:sp", name_id=name_id) resp2 = pre_encrypt_assertion(resp_) @@ -45,13 +46,13 @@ def test_reshuffle_response(): def test_enc1(): - server = Server("idp_conf") - name_id = server.ident.transient_nameid( - "urn:mace:example.com:saml:roland:sp", "id12") + with closing(Server("idp_conf")) as server: + name_id = server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id12") - resp_ = server.create_authn_response( - IDENTITY, "id12", "http://lingon.catalogix.se:8087/", - "urn:mace:example.com:saml:roland:sp", name_id=name_id) + resp_ = server.create_authn_response( + IDENTITY, "id12", "http://lingon.catalogix.se:8087/", + "urn:mace:example.com:saml:roland:sp", name_id=name_id) statement = pre_encrypt_assertion(resp_) @@ -82,13 +83,13 @@ def test_enc1(): def test_enc2(): crypto = CryptoBackendXmlSec1(xmlsec_path) - server = Server("idp_conf") - name_id = server.ident.transient_nameid( - "urn:mace:example.com:saml:roland:sp", "id12") + with closing(Server("idp_conf")) as server: + name_id = server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id12") - resp_ = server.create_authn_response( - IDENTITY, "id12", "http://lingon.catalogix.se:8087/", - "urn:mace:example.com:saml:roland:sp", name_id=name_id) + resp_ = server.create_authn_response( + IDENTITY, "id12", "http://lingon.catalogix.se:8087/", + "urn:mace:example.com:saml:roland:sp", name_id=name_id) enc_resp = crypto.encrypt_assertion(resp_, full_path("pubkey.pem"), pre_encryption_part()) diff --git a/tests/test_44_authnresp.py b/tests/test_44_authnresp.py index c773d34..10f18f7 100644 --- a/tests/test_44_authnresp.py +++ b/tests/test_44_authnresp.py @@ -1,5 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +from contextlib import closing from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.server import Server @@ -28,37 +29,37 @@ AUTHN = { class TestAuthnResponse: def setup_class(self): - server = Server(dotname("idp_conf")) - name_id = server.ident.transient_nameid( - "urn:mace:example.com:saml:roland:sp","id12") + with closing(Server(dotname("idp_conf"))) as server: + name_id = server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp","id12") - self._resp_ = server.create_authn_response( - IDENTITY, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id - name_id=name_id, - authn=AUTHN) - - self._sign_resp_ = server.create_authn_response( - IDENTITY, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id - name_id=name_id, sign_assertion=True, - authn=AUTHN) + self._resp_ = server.create_authn_response( + IDENTITY, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=name_id, + authn=AUTHN) - self._resp_authn = server.create_authn_response( - IDENTITY, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id - name_id=name_id, - authn=AUTHN) + self._sign_resp_ = server.create_authn_response( + IDENTITY, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=name_id, sign_assertion=True, + authn=AUTHN) - self.conf = config_factory("sp", dotname("server_conf")) - self.conf.only_use_keys_in_metadata = False - self.ar = authn_response(self.conf, "http://lingon.catalogix.se:8087/") + self._resp_authn = server.create_authn_response( + IDENTITY, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=name_id, + authn=AUTHN) + + self.conf = config_factory("sp", dotname("server_conf")) + self.conf.only_use_keys_in_metadata = False + self.ar = authn_response(self.conf, "http://lingon.catalogix.se:8087/") def test_verify_1(self): xml_response = "%s" % (self._resp_,) @@ -128,4 +129,4 @@ class TestAuthnResponse: if __name__ == "__main__": t = TestAuthnResponse() t.setup_class() - t.test_verify_1() \ No newline at end of file + t.test_verify_1() diff --git a/tests/test_50_server.py b/tests/test_50_server.py index 50f9f67..af2290b 100644 --- a/tests/test_50_server.py +++ b/tests/test_50_server.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import base64 +from contextlib import closing from urlparse import parse_qs from saml2.sigver import pre_encryption_part from saml2.assertion import Policy @@ -49,7 +50,7 @@ class TestServer1(): self.client = client.Saml2Client(conf) def teardown_class(self): - self.server.ident.close() + self.server.close() def test_issuer(self): issuer = self.server._issuer() @@ -419,10 +420,11 @@ class TestServer1(): saml_soap = make_soap_enveloped_saml_thingy(logout_request) self.server.ident.close() - idp = Server("idp_soap_conf") - request = idp.parse_logout_request(saml_soap) - idp.ident.close() - assert request + + with closing(Server("idp_soap_conf")) as idp: + request = idp.parse_logout_request(saml_soap) + idp.ident.close() + assert request #------------------------------------------------------------------------ @@ -436,7 +438,7 @@ class TestServer2(): self.server = Server("restrictive_idp_conf") def teardown_class(self): - self.server.ident.close() + self.server.close() def test_do_attribute_reponse(self): aa_policy = self.server.config.getattr("policy", "idp") @@ -487,21 +489,21 @@ def _logout_request(conf_file): class TestServerLogout(): def test_1(self): - server = Server("idp_slo_redirect_conf") - req_id, request = _logout_request("sp_slo_redirect_conf") - print request - bindings = [BINDING_HTTP_REDIRECT] - response = server.create_logout_response(request, bindings) - binding, destination = server.pick_binding("single_logout_service", - bindings, "spsso", - request) + with closing(Server("idp_slo_redirect_conf")) as server: + req_id, request = _logout_request("sp_slo_redirect_conf") + print request + bindings = [BINDING_HTTP_REDIRECT] + response = server.create_logout_response(request, bindings) + binding, destination = server.pick_binding("single_logout_service", + bindings, "spsso", + request) - http_args = server.apply_binding(binding, "%s" % response, destination, - "relay_state", response=True) + http_args = server.apply_binding(binding, "%s" % response, destination, + "relay_state", response=True) - assert len(http_args) == 4 - assert http_args["headers"][0][0] == "Location" - assert http_args["data"] == [] + assert len(http_args) == 4 + assert http_args["headers"][0][0] == "Location" + assert http_args["data"] == [] if __name__ == "__main__": diff --git a/tests/test_51_client.py b/tests/test_51_client.py index 2efbfdd..762832b 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -120,6 +120,9 @@ class TestClient: conf.load_file("server_conf") self.client = Saml2Client(conf) + def teardown_class(self): + self.server.close() + def test_create_attribute_query1(self): req_id, req = self.client.create_attribute_query( "https://idp.example.com/idp/", diff --git a/tests/test_60_sp.py b/tests/test_60_sp.py index 39cd8aa..dcf72a6 100644 --- a/tests/test_60_sp.py +++ b/tests/test_60_sp.py @@ -48,6 +48,9 @@ class TestSP(): self.sp = make_plugin("rem", saml_conf="server_conf") self.server = Server(config_file="idp_conf") + def teardown_class(self): + self.server.close() + def test_setup(self): assert self.sp diff --git a/tests/test_63_ecp.py b/tests/test_63_ecp.py index 41f2e93..46a2625 100644 --- a/tests/test_63_ecp.py +++ b/tests/test_63_ecp.py @@ -1,3 +1,4 @@ +from contextlib import closing from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.httpbase import set_list2dict from saml2.profile.ecp import RelayState @@ -41,104 +42,104 @@ def test_complete_flow(): metadata_file=full_path("idp_all.xml")) sp = Saml2Client(config_file=dotname("servera_conf")) - idp = Server(config_file=dotname("idp_all_conf")) - IDP_ENTITY_ID = idp.config.entityid - #SP_ENTITY_ID = sp.config.entityid + with closing(Server(config_file=dotname("idp_all_conf"))) as idp: + IDP_ENTITY_ID = idp.config.entityid + #SP_ENTITY_ID = sp.config.entityid - # ------------ @Client ----------------------------- + # ------------ @Client ----------------------------- - headers = client.add_paos_headers([]) + headers = client.add_paos_headers([]) - assert len(headers) == 2 + assert len(headers) == 2 - # ------------ @SP ----------------------------- + # ------------ @SP ----------------------------- - response = DummyResponse(set_list2dict(headers)) + response = DummyResponse(set_list2dict(headers)) - assert sp.can_handle_ecp_response(response) + assert sp.can_handle_ecp_response(response) - sid, message = sp.create_ecp_authn_request(IDP_ENTITY_ID, relay_state="XYZ") + sid, message = sp.create_ecp_authn_request(IDP_ENTITY_ID, relay_state="XYZ") - # ------------ @Client ----------------------------- + # ------------ @Client ----------------------------- - respdict = client.parse_soap_message(message) + respdict = client.parse_soap_message(message) - cargs = client.parse_sp_ecp_response(respdict) + cargs = client.parse_sp_ecp_response(respdict) - assert isinstance(respdict["body"], AuthnRequest) - assert len(respdict["header"]) == 2 - item0 = respdict["header"][0] - assert isinstance(item0, Request) or isinstance(item0, RelayState) + assert isinstance(respdict["body"], AuthnRequest) + assert len(respdict["header"]) == 2 + item0 = respdict["header"][0] + assert isinstance(item0, Request) or isinstance(item0, RelayState) - destination = respdict["body"].destination + destination = respdict["body"].destination - ht_args = client.apply_binding(BINDING_SOAP, respdict["body"], destination) + ht_args = client.apply_binding(BINDING_SOAP, respdict["body"], destination) - # Time to send to the IDP - # ----------- @IDP ------------------------------- + # Time to send to the IDP + # ----------- @IDP ------------------------------- - req = idp.parse_authn_request(ht_args["data"], BINDING_SOAP) + req = idp.parse_authn_request(ht_args["data"], BINDING_SOAP) - assert isinstance(req.message, AuthnRequest) + assert isinstance(req.message, AuthnRequest) - # create Response and return in the SOAP response - sp_entity_id = req.sender() + # create Response and return in the SOAP response + sp_entity_id = req.sender() - name_id = idp.ident.transient_nameid( "id12", sp.config.entityid) - binding, destination = idp.pick_binding("assertion_consumer_service", - [BINDING_PAOS], - entity_id=sp_entity_id) + name_id = idp.ident.transient_nameid( "id12", sp.config.entityid) + binding, destination = idp.pick_binding("assertion_consumer_service", + [BINDING_PAOS], + entity_id=sp_entity_id) - resp = idp.create_ecp_authn_request_response( - destination, {"eduPersonEntitlement": "Short stop", - "surName": "Jeter", - "givenName": "Derek", - "mail": "derek.jeter@nyy.mlb.com", - "title": "The man" - }, - req.message.id, destination, sp_entity_id, - name_id=name_id, authn=AUTHN) + resp = idp.create_ecp_authn_request_response( + destination, {"eduPersonEntitlement": "Short stop", + "surName": "Jeter", + "givenName": "Derek", + "mail": "derek.jeter@nyy.mlb.com", + "title": "The man" + }, + req.message.id, destination, sp_entity_id, + name_id=name_id, authn=AUTHN) - # ------------ @Client ----------------------------- - # The client got the response from the IDP repackage and send it to the SP + # ------------ @Client ----------------------------- + # The client got the response from the IDP repackage and send it to the SP - respdict = client.parse_soap_message(resp) - idp_response = respdict["body"] + respdict = client.parse_soap_message(resp) + idp_response = respdict["body"] - assert isinstance(idp_response, Response) - assert len(respdict["header"]) == 1 + assert isinstance(idp_response, Response) + assert len(respdict["header"]) == 1 - _ecp_response = None - for item in respdict["header"]: - if item.c_tag == "Response" and item.c_namespace == ecp_prof.NAMESPACE: - _ecp_response = item + _ecp_response = None + for item in respdict["header"]: + if item.c_tag == "Response" and item.c_namespace == ecp_prof.NAMESPACE: + _ecp_response = item - #_acs_url = _ecp_response.assertion_consumer_service_url + #_acs_url = _ecp_response.assertion_consumer_service_url - # done phase2 at the client + # done phase2 at the client - ht_args = client.use_soap(idp_response, cargs["rc_url"], - [cargs["relay_state"]]) + ht_args = client.use_soap(idp_response, cargs["rc_url"], + [cargs["relay_state"]]) - print ht_args + print ht_args - # ------------ @SP ----------------------------- + # ------------ @SP ----------------------------- - respdict = sp.unpack_soap_message(ht_args["data"]) + respdict = sp.unpack_soap_message(ht_args["data"]) - # verify the relay_state + # verify the relay_state - for header in respdict["header"]: - inst = create_class_from_xml_string(RelayState, header) - if isinstance(inst, RelayState): - assert inst.text == "XYZ" + for header in respdict["header"]: + inst = create_class_from_xml_string(RelayState, header) + if isinstance(inst, RelayState): + assert inst.text == "XYZ" - # parse the response + # parse the response - resp = sp.parse_authn_request_response(respdict["body"], None, {sid: "/"}) + resp = sp.parse_authn_request_response(respdict["body"], None, {sid: "/"}) - print resp.response + print resp.response - assert resp.response.destination == "http://lingon.catalogix.se:8087/paos" - assert resp.response.status.status_code.value == STATUS_SUCCESS + assert resp.response.destination == "http://lingon.catalogix.se:8087/paos" + assert resp.response.status.status_code.value == STATUS_SUCCESS diff --git a/tests/test_64_artifact.py b/tests/test_64_artifact.py index d7ca33f..bb5014e 100644 --- a/tests/test_64_artifact.py +++ b/tests/test_64_artifact.py @@ -1,4 +1,5 @@ import base64 +from contextlib import closing from hashlib import sha1 from urlparse import urlparse from urlparse import parse_qs @@ -76,157 +77,156 @@ def test_create_artifact_resolve(): s = sha1(SP) assert artifact[4:24] == s.digest() - idp = Server(config_file="idp_all_conf") + with closing(Server(config_file="idp_all_conf")) as idp: + typecode = artifact[:2] + assert typecode == ARTIFACT_TYPECODE - typecode = artifact[:2] - assert typecode == ARTIFACT_TYPECODE + destination = idp.artifact2destination(b64art, "spsso") - destination = idp.artifact2destination(b64art, "spsso") + msg_id, msg = idp.create_artifact_resolve(b64art, destination, sid()) - msg_id, msg = idp.create_artifact_resolve(b64art, destination, sid()) + print msg - print msg + args = idp.use_soap(msg, destination, None, False) - args = idp.use_soap(msg, destination, None, False) + sp = Saml2Client(config_file="servera_conf") - sp = Saml2Client(config_file="servera_conf") + ar = sp.parse_artifact_resolve(args["data"]) - ar = sp.parse_artifact_resolve(args["data"]) + print ar - print ar - - assert ar.artifact.text == b64art + assert ar.artifact.text == b64art def test_artifact_flow(): #SP = 'urn:mace:example.com:saml:roland:sp' sp = Saml2Client(config_file="servera_conf") - idp = Server(config_file="idp_all_conf") - # original request + with closing(Server(config_file="idp_all_conf")) as idp: + # original request - binding, destination = sp.pick_binding("single_sign_on_service", - entity_id=idp.config.entityid) - relay_state = "RS0" - req_id, req = sp.create_authn_request(destination, id="id1") + binding, destination = sp.pick_binding("single_sign_on_service", + entity_id=idp.config.entityid) + relay_state = "RS0" + req_id, req = sp.create_authn_request(destination, id="id1") - artifact = sp.use_artifact(req, 1) + artifact = sp.use_artifact(req, 1) - binding, destination = sp.pick_binding("single_sign_on_service", - [BINDING_HTTP_ARTIFACT], - entity_id=idp.config.entityid) + binding, destination = sp.pick_binding("single_sign_on_service", + [BINDING_HTTP_ARTIFACT], + entity_id=idp.config.entityid) - hinfo = sp.apply_binding(binding, "%s" % artifact, destination, relay_state) + hinfo = sp.apply_binding(binding, "%s" % artifact, destination, relay_state) - # ========== @IDP ============ + # ========== @IDP ============ - artifact2 = get_msg(hinfo, binding) + artifact2 = get_msg(hinfo, binding) - assert artifact == artifact2 + assert artifact == artifact2 - # The IDP now wants to replace the artifact with the real request + # The IDP now wants to replace the artifact with the real request - destination = idp.artifact2destination(artifact2, "spsso") + destination = idp.artifact2destination(artifact2, "spsso") - msg_id, msg = idp.create_artifact_resolve(artifact2, destination, sid()) + msg_id, msg = idp.create_artifact_resolve(artifact2, destination, sid()) - hinfo = idp.use_soap(msg, destination, None, False) + hinfo = idp.use_soap(msg, destination, None, False) - # ======== @SP ========== + # ======== @SP ========== - msg = get_msg(hinfo, BINDING_SOAP) + msg = get_msg(hinfo, BINDING_SOAP) - ar = sp.parse_artifact_resolve(msg) + ar = sp.parse_artifact_resolve(msg) - assert ar.artifact.text == artifact + assert ar.artifact.text == artifact - # The SP picks the request out of the repository with the artifact as the key - oreq = sp.artifact[ar.artifact.text] - # Should be the same as req above + # The SP picks the request out of the repository with the artifact as the key + oreq = sp.artifact[ar.artifact.text] + # Should be the same as req above - # Returns the information over the existing SOAP connection so - # no transport information needed + # Returns the information over the existing SOAP connection so + # no transport information needed - msg = sp.create_artifact_response(ar, ar.artifact.text) - hinfo = sp.use_soap(msg, destination) + msg = sp.create_artifact_response(ar, ar.artifact.text) + hinfo = sp.use_soap(msg, destination) - # ========== @IDP ============ + # ========== @IDP ============ - msg = get_msg(hinfo, BINDING_SOAP) + msg = get_msg(hinfo, BINDING_SOAP) - # The IDP untangles the request from the artifact resolve response - spreq = idp.parse_artifact_resolve_response(msg) + # The IDP untangles the request from the artifact resolve response + spreq = idp.parse_artifact_resolve_response(msg) - # should be the same as req above + # should be the same as req above - assert spreq.id == req.id + assert spreq.id == req.id - # That was one way, the Request from the SP - # ---------------------------------------------# - # Now for the other, the response from the IDP + # That was one way, the Request from the SP + # ---------------------------------------------# + # Now for the other, the response from the IDP - name_id = idp.ident.transient_nameid(sp.config.entityid, "derek") + name_id = idp.ident.transient_nameid(sp.config.entityid, "derek") - resp_args = idp.response_args(spreq, [BINDING_HTTP_POST]) + resp_args = idp.response_args(spreq, [BINDING_HTTP_POST]) - response = idp.create_authn_response({"eduPersonEntitlement": "Short stop", - "surName": "Jeter", "givenName": "Derek", - "mail": "derek.jeter@nyy.mlb.com", - "title": "The man"}, - name_id=name_id, - authn=AUTHN, - **resp_args) + response = idp.create_authn_response({"eduPersonEntitlement": "Short stop", + "surName": "Jeter", "givenName": "Derek", + "mail": "derek.jeter@nyy.mlb.com", + "title": "The man"}, + name_id=name_id, + authn=AUTHN, + **resp_args) - print response + print response - # with the response in hand create an artifact + # with the response in hand create an artifact - artifact = idp.use_artifact(response, 1) + artifact = idp.use_artifact(response, 1) - binding, destination = sp.pick_binding("single_sign_on_service", - [BINDING_HTTP_ARTIFACT], - entity_id=idp.config.entityid) + binding, destination = sp.pick_binding("single_sign_on_service", + [BINDING_HTTP_ARTIFACT], + entity_id=idp.config.entityid) - hinfo = sp.apply_binding(binding, "%s" % artifact, destination, relay_state, - response=True) + hinfo = sp.apply_binding(binding, "%s" % artifact, destination, relay_state, + response=True) - # ========== SP ========= + # ========== SP ========= - artifact3 = get_msg(hinfo, binding) + artifact3 = get_msg(hinfo, binding) - assert artifact == artifact3 + assert artifact == artifact3 - destination = sp.artifact2destination(artifact3, "idpsso") + destination = sp.artifact2destination(artifact3, "idpsso") - # Got an artifact want to replace it with the real message - msg_id, msg = sp.create_artifact_resolve(artifact3, destination, sid()) + # Got an artifact want to replace it with the real message + msg_id, msg = sp.create_artifact_resolve(artifact3, destination, sid()) - print msg + print msg - hinfo = sp.use_soap(msg, destination, None, False) + hinfo = sp.use_soap(msg, destination, None, False) - # ======== IDP ========== + # ======== IDP ========== - msg = get_msg(hinfo, BINDING_SOAP) + msg = get_msg(hinfo, BINDING_SOAP) - ar = idp.parse_artifact_resolve(msg) + ar = idp.parse_artifact_resolve(msg) - print ar + print ar - assert ar.artifact.text == artifact3 + assert ar.artifact.text == artifact3 - # The IDP retrieves the response from the database using the artifact as the key - #oreq = idp.artifact[ar.artifact.text] + # The IDP retrieves the response from the database using the artifact as the key + #oreq = idp.artifact[ar.artifact.text] - binding, destination = idp.pick_binding("artifact_resolution_service", - entity_id=sp.config.entityid) + binding, destination = idp.pick_binding("artifact_resolution_service", + entity_id=sp.config.entityid) - resp = idp.create_artifact_response(ar, ar.artifact.text) - hinfo = idp.use_soap(resp, destination) + resp = idp.create_artifact_response(ar, ar.artifact.text) + hinfo = idp.use_soap(resp, destination) - # ========== SP ============ + # ========== SP ============ - msg = get_msg(hinfo, BINDING_SOAP) - sp_resp = sp.parse_artifact_resolve_response(msg) + msg = get_msg(hinfo, BINDING_SOAP) + sp_resp = sp.parse_artifact_resolve_response(msg) - assert sp_resp.id == response.id + assert sp_resp.id == response.id diff --git a/tests/test_65_authn_query.py b/tests/test_65_authn_query.py index 94f3b3e..22152f1 100644 --- a/tests/test_65_authn_query.py +++ b/tests/test_65_authn_query.py @@ -1,3 +1,4 @@ +from contextlib import closing from urlparse import urlparse, parse_qs from saml2 import BINDING_SOAP, BINDING_HTTP_POST @@ -43,100 +44,99 @@ def get_msg(hinfo, binding): def test_basic(): sp = Saml2Client(config_file="servera_conf") - idp = Server(config_file="idp_all_conf") + with closing(Server(config_file="idp_all_conf")) as idp: + srvs = sp.metadata.authn_query_service(idp.config.entityid) - srvs = sp.metadata.authn_query_service(idp.config.entityid) + destination = srvs[0]["location"] + authn_context = requested_authn_context(INTERNETPROTOCOLPASSWORD) - destination = srvs[0]["location"] - authn_context = requested_authn_context(INTERNETPROTOCOLPASSWORD) + subject = Subject(text="abc", + name_id=NameID(format=NAMEID_FORMAT_TRANSIENT)) - subject = Subject(text="abc", - name_id=NameID(format=NAMEID_FORMAT_TRANSIENT)) + _id, aq = sp.create_authn_query(subject, destination, authn_context) - _id, aq = sp.create_authn_query(subject, destination, authn_context) + print aq - print aq - - assert isinstance(aq, AuthnQuery) + assert isinstance(aq, AuthnQuery) def test_flow(): sp = Saml2Client(config_file="servera_conf") - idp = Server(config_file="idp_all_conf") - relay_state = "FOO" - # -- dummy request --- - orig_req = AuthnRequest( - issuer=sp._issuer(), - name_id_policy=NameIDPolicy(allow_create="true", - format=NAMEID_FORMAT_TRANSIENT)) + with closing(Server(config_file="idp_all_conf")) as idp: + relay_state = "FOO" + # -- dummy request --- + orig_req = AuthnRequest( + issuer=sp._issuer(), + name_id_policy=NameIDPolicy(allow_create="true", + format=NAMEID_FORMAT_TRANSIENT)) - # == Create an AuthnRequest response + # == Create an AuthnRequest response - name_id = idp.ident.transient_nameid(sp.config.entityid, "id12") - binding, destination = idp.pick_binding("assertion_consumer_service", - entity_id=sp.config.entityid) - resp = idp.create_authn_response({"eduPersonEntitlement": "Short stop", - "surName": "Jeter", - "givenName": "Derek", - "mail": "derek.jeter@nyy.mlb.com", - "title": "The man"}, - "id-123456789", - destination, - sp.config.entityid, - name_id=name_id, - authn=AUTHN) + name_id = idp.ident.transient_nameid(sp.config.entityid, "id12") + binding, destination = idp.pick_binding("assertion_consumer_service", + entity_id=sp.config.entityid) + resp = idp.create_authn_response({"eduPersonEntitlement": "Short stop", + "surName": "Jeter", + "givenName": "Derek", + "mail": "derek.jeter@nyy.mlb.com", + "title": "The man"}, + "id-123456789", + destination, + sp.config.entityid, + name_id=name_id, + authn=AUTHN) - hinfo = idp.apply_binding(binding, "%s" % resp, destination, relay_state) + hinfo = idp.apply_binding(binding, "%s" % resp, destination, relay_state) - # ------- @SP ---------- + # ------- @SP ---------- - xmlstr = get_msg(hinfo, binding) - aresp = sp.parse_authn_request_response(xmlstr, binding, - {resp.in_response_to: "/"}) + xmlstr = get_msg(hinfo, binding) + aresp = sp.parse_authn_request_response(xmlstr, binding, + {resp.in_response_to: "/"}) - binding, destination = sp.pick_binding("authn_query_service", - entity_id=idp.config.entityid) + binding, destination = sp.pick_binding("authn_query_service", + entity_id=idp.config.entityid) - authn_context = requested_authn_context(INTERNETPROTOCOLPASSWORD) + authn_context = requested_authn_context(INTERNETPROTOCOLPASSWORD) - subject = aresp.assertion.subject + subject = aresp.assertion.subject - aq_id, aq = sp.create_authn_query(subject, destination, authn_context) + aq_id, aq = sp.create_authn_query(subject, destination, authn_context) - print aq + print aq - assert isinstance(aq, AuthnQuery) - binding = BINDING_SOAP + assert isinstance(aq, AuthnQuery) + binding = BINDING_SOAP - hinfo = sp.apply_binding(binding, "%s" % aq, destination, "state2") + hinfo = sp.apply_binding(binding, "%s" % aq, destination, "state2") - # -------- @IDP ---------- + # -------- @IDP ---------- - xmlstr = get_msg(hinfo, binding) + xmlstr = get_msg(hinfo, binding) - pm = idp.parse_authn_query(xmlstr, binding) + pm = idp.parse_authn_query(xmlstr, binding) - msg = pm.message - assert msg.id == aq.id + msg = pm.message + assert msg.id == aq.id - p_res = idp.create_authn_query_response(msg.subject, msg.session_index, - msg.requested_authn_context) + p_res = idp.create_authn_query_response(msg.subject, msg.session_index, + msg.requested_authn_context) - print p_res + print p_res - hinfo = idp.apply_binding(binding, "%s" % p_res, "", "state2", - response=True) + hinfo = idp.apply_binding(binding, "%s" % p_res, "", "state2", + response=True) - # ------- @SP ---------- + # ------- @SP ---------- - xmlstr = get_msg(hinfo, binding) + xmlstr = get_msg(hinfo, binding) - final = sp.parse_authn_query_response(xmlstr, binding) + final = sp.parse_authn_query_response(xmlstr, binding) - print final + print final - assert final.response.id == p_res.id + assert final.response.id == p_res.id if __name__ == "__main__": test_flow() diff --git a/tests/test_66_name_id_mapping.py b/tests/test_66_name_id_mapping.py index 3e1da83..7a41d89 100644 --- a/tests/test_66_name_id_mapping.py +++ b/tests/test_66_name_id_mapping.py @@ -1,5 +1,6 @@ __author__ = 'rolandh' +from contextlib import closing from saml2.client import Saml2Client from saml2.saml import NameID, NAMEID_FORMAT_PERSISTENT from saml2.saml import NAMEID_FORMAT_TRANSIENT @@ -10,64 +11,64 @@ from saml2.samlp import NameIDMappingRequest def test_base_request(): sp = Saml2Client(config_file="servera_conf") - idp = Server(config_file="idp_all_conf") - binding, destination = sp.pick_binding("name_id_mapping_service", - entity_id=idp.config.entityid) + with closing(Server(config_file="idp_all_conf")) as idp: + binding, destination = sp.pick_binding("name_id_mapping_service", + entity_id=idp.config.entityid) - policy = NameIDPolicy(format=NAMEID_FORMAT_TRANSIENT, - sp_name_qualifier="urn:mace:swamid:junk", - allow_create="true") + policy = NameIDPolicy(format=NAMEID_FORMAT_TRANSIENT, + sp_name_qualifier="urn:mace:swamid:junk", + allow_create="true") - nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") + nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") - mid, nmr = sp.create_name_id_mapping_request(policy, nameid, destination) + mid, nmr = sp.create_name_id_mapping_request(policy, nameid, destination) - print nmr + print nmr - assert isinstance(nmr, NameIDMappingRequest) + assert isinstance(nmr, NameIDMappingRequest) def test_request_response(): sp = Saml2Client(config_file="servera_conf") - idp = Server(config_file="idp_all_conf") - binding, destination = sp.pick_binding("name_id_mapping_service", - entity_id=idp.config.entityid) + with closing(Server(config_file="idp_all_conf")) as idp: + binding, destination = sp.pick_binding("name_id_mapping_service", + entity_id=idp.config.entityid) - policy = NameIDPolicy(format=NAMEID_FORMAT_TRANSIENT, - sp_name_qualifier="urn:mace:swamid:junk", - allow_create="true") + policy = NameIDPolicy(format=NAMEID_FORMAT_TRANSIENT, + sp_name_qualifier="urn:mace:swamid:junk", + allow_create="true") - nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") + nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") - mid, nmr = sp.create_name_id_mapping_request(policy, nameid, destination) + mid, nmr = sp.create_name_id_mapping_request(policy, nameid, destination) - print nmr + print nmr - args = sp.use_soap(nmr, destination) + args = sp.use_soap(nmr, destination) - # ------- IDP ------------ + # ------- IDP ------------ - req = idp.parse_name_id_mapping_request(args["data"], binding) + req = idp.parse_name_id_mapping_request(args["data"], binding) - in_response_to = req.message.id - name_id = NameID(format=NAMEID_FORMAT_PERSISTENT, text="foobar") + in_response_to = req.message.id + name_id = NameID(format=NAMEID_FORMAT_PERSISTENT, text="foobar") - idp_response = idp.create_name_id_mapping_response( - name_id, in_response_to=in_response_to) + idp_response = idp.create_name_id_mapping_response( + name_id, in_response_to=in_response_to) - print idp_response + print idp_response - ht_args = sp.use_soap(idp_response) + ht_args = sp.use_soap(idp_response) - # ------- SP ------------ + # ------- SP ------------ - _resp = sp.parse_name_id_mapping_request_response(ht_args["data"], binding) + _resp = sp.parse_name_id_mapping_request_response(ht_args["data"], binding) - print _resp.response + print _resp.response - r_name_id = _resp.response.name_id + r_name_id = _resp.response.name_id - assert r_name_id.format == NAMEID_FORMAT_PERSISTENT - assert r_name_id.text == "foobar" + assert r_name_id.format == NAMEID_FORMAT_PERSISTENT + assert r_name_id.text == "foobar" diff --git a/tests/test_67_manage_name_id.py b/tests/test_67_manage_name_id.py index f0e41fb..6176a24 100644 --- a/tests/test_67_manage_name_id.py +++ b/tests/test_67_manage_name_id.py @@ -1,3 +1,4 @@ +from contextlib import closing from saml2 import BINDING_SOAP from saml2.samlp import NewID from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT @@ -9,73 +10,71 @@ __author__ = 'rolandh' def test_basic(): sp = Saml2Client(config_file="servera_conf") - idp = Server(config_file="idp_all_conf") + with closing(Server(config_file="idp_all_conf")) as idp: + # -------- @SP ------------ + binding, destination = sp.pick_binding("manage_name_id_service", + entity_id=idp.config.entityid) - # -------- @SP ------------ - binding, destination = sp.pick_binding("manage_name_id_service", - entity_id=idp.config.entityid) + nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") + newid = NewID(text="Barfoo") - nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") - newid = NewID(text="Barfoo") + mid, mreq = sp.create_manage_name_id_request(destination, name_id=nameid, + new_id=newid) - mid, mreq = sp.create_manage_name_id_request(destination, name_id=nameid, - new_id=newid) + print mreq + rargs = sp.apply_binding(binding, "%s" % mreq, destination, "") - print mreq - rargs = sp.apply_binding(binding, "%s" % mreq, destination, "") + # --------- @IDP -------------- - # --------- @IDP -------------- + _req = idp.parse_manage_name_id_request(rargs["data"], binding) - _req = idp.parse_manage_name_id_request(rargs["data"], binding) + print _req.message - print _req.message - - assert mid == _req.message.id + assert mid == _req.message.id def test_flow(): sp = Saml2Client(config_file="servera_conf") - idp = Server(config_file="idp_all_conf") + with closing(Server(config_file="idp_all_conf")) as idp: + binding, destination = sp.pick_binding("manage_name_id_service", + entity_id=idp.config.entityid) - binding, destination = sp.pick_binding("manage_name_id_service", - entity_id=idp.config.entityid) + nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") + newid = NewID(text="Barfoo") - nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") - newid = NewID(text="Barfoo") + mid, midq = sp.create_manage_name_id_request(destination, name_id=nameid, + new_id=newid) - mid, midq = sp.create_manage_name_id_request(destination, name_id=nameid, - new_id=newid) + print midq + rargs = sp.apply_binding(binding, "%s" % midq, destination, "") - print midq - rargs = sp.apply_binding(binding, "%s" % midq, destination, "") + # --------- @IDP -------------- - # --------- @IDP -------------- + _req = idp.parse_manage_name_id_request(rargs["data"], binding) - _req = idp.parse_manage_name_id_request(rargs["data"], binding) + print _req.message - print _req.message + mnir = idp.create_manage_name_id_response(_req.message, [binding]) - mnir = idp.create_manage_name_id_response(_req.message, [binding]) + if binding != BINDING_SOAP: + binding, destination = idp.pick_binding("manage_name_id_service", + entity_id=sp.config.entityid) + else: + destination = "" - if binding != BINDING_SOAP: - binding, destination = idp.pick_binding("manage_name_id_service", - entity_id=sp.config.entityid) - else: - destination = "" + respargs = idp.apply_binding(binding, "%s" % mnir, destination, "") - respargs = idp.apply_binding(binding, "%s" % mnir, destination, "") + print respargs - print respargs + # ---------- @SP --------------- - # ---------- @SP --------------- + _response = sp.parse_manage_name_id_request_response(respargs["data"], + binding) - _response = sp.parse_manage_name_id_request_response(respargs["data"], - binding) + print _response.response - print _response.response - - assert _response.response.id == mnir.id + assert _response.response.id == mnir.id if __name__ == "__main__": - test_flow() \ No newline at end of file + test_flow() diff --git a/tests/test_68_assertion_id.py b/tests/test_68_assertion_id.py index ec3a55f..0c61fed 100644 --- a/tests/test_68_assertion_id.py +++ b/tests/test_68_assertion_id.py @@ -1,3 +1,4 @@ +from contextlib import closing from urlparse import parse_qs from urlparse import urlparse from saml2.authn_context import INTERNETPROTOCOLPASSWORD @@ -46,66 +47,65 @@ def get_msg(hinfo, binding, response=False): def test_basic_flow(): sp = Saml2Client(config_file="servera_conf") - idp = Server(config_file="idp_all_conf") + with closing(Server(config_file="idp_all_conf")) as idp: + # -------- @IDP ------------- - # -------- @IDP ------------- + relay_state = "FOO" + # -- dummy request --- + orig_req = AuthnRequest( + issuer=sp._issuer(), name_id_policy=NameIDPolicy( + allow_create="true", format=NAMEID_FORMAT_TRANSIENT)) - relay_state = "FOO" - # -- dummy request --- - orig_req = AuthnRequest( - issuer=sp._issuer(), name_id_policy=NameIDPolicy( - allow_create="true", format=NAMEID_FORMAT_TRANSIENT)) + # == Create an AuthnRequest response - # == Create an AuthnRequest response + name_id = idp.ident.transient_nameid("id12", sp.config.entityid) - name_id = idp.ident.transient_nameid("id12", sp.config.entityid) + binding, destination = idp.pick_binding("assertion_consumer_service", + entity_id=sp.config.entityid) + resp = idp.create_authn_response({"eduPersonEntitlement": "Short stop", + "surName": "Jeter", + "givenName": "Derek", + "mail": "derek.jeter@nyy.mlb.com", + "title": "The man"}, + "id-123456789", + destination, + sp.config.entityid, + name_id=name_id, + authn=AUTHN) - binding, destination = idp.pick_binding("assertion_consumer_service", - entity_id=sp.config.entityid) - resp = idp.create_authn_response({"eduPersonEntitlement": "Short stop", - "surName": "Jeter", - "givenName": "Derek", - "mail": "derek.jeter@nyy.mlb.com", - "title": "The man"}, - "id-123456789", - destination, - sp.config.entityid, - name_id=name_id, - authn=AUTHN) + hinfo = idp.apply_binding(binding, "%s" % resp, destination, relay_state) - hinfo = idp.apply_binding(binding, "%s" % resp, destination, relay_state) + # --------- @SP ------------- - # --------- @SP ------------- + xmlstr = get_msg(hinfo, binding) - xmlstr = get_msg(hinfo, binding) + aresp = sp.parse_authn_request_response(xmlstr, binding, + {resp.in_response_to: "/"}) - aresp = sp.parse_authn_request_response(xmlstr, binding, - {resp.in_response_to: "/"}) + # == Look for assertion X - # == Look for assertion X + asid = aresp.assertion.id - asid = aresp.assertion.id + binding, destination = sp.pick_binding("assertion_id_request_service", + entity_id=idp.config.entityid) - binding, destination = sp.pick_binding("assertion_id_request_service", - entity_id=idp.config.entityid) + hinfo = sp.apply_binding(binding, asid, destination) - hinfo = sp.apply_binding(binding, asid, destination) + # ---------- @IDP ------------ - # ---------- @IDP ------------ + aid = get_msg(hinfo, binding, response=False) - aid = get_msg(hinfo, binding, response=False) + # == construct response - # == construct response + resp = idp.create_assertion_id_request_response(aid) - resp = idp.create_assertion_id_request_response(aid) + hinfo = idp.apply_binding(binding, "%s" % resp, None, "", response=True) - hinfo = idp.apply_binding(binding, "%s" % resp, None, "", response=True) + # ----------- @SP ------------- - # ----------- @SP ------------- + xmlstr = get_msg(hinfo, binding, response=True) - xmlstr = get_msg(hinfo, binding, response=True) + final = sp.parse_assertion_id_request_response(xmlstr, binding) - final = sp.parse_assertion_id_request_response(xmlstr, binding) - - print final.response - assert isinstance(final.response, Assertion) + print final.response + assert isinstance(final.response, Assertion) diff --git a/tests/test_70_redirect_signing.py b/tests/test_70_redirect_signing.py index fae2ff8..d54f70a 100644 --- a/tests/test_70_redirect_signing.py +++ b/tests/test_70_redirect_signing.py @@ -1,3 +1,4 @@ +from contextlib import closing from saml2.pack import http_redirect_message from saml2.sigver import verify_redirect_signature from saml2.sigver import import_rsa_key_from_file @@ -12,37 +13,35 @@ from pathutils import dotname __author__ = 'rolandh' -idp = Server(config_file=dotname("idp_all_conf")) - -conf = SPConfig() -conf.load_file(dotname("servera_conf")) -sp = Saml2Client(conf) - - def test(): - srvs = sp.metadata.single_sign_on_service(idp.config.entityid, - BINDING_HTTP_REDIRECT) + with closing(Server(config_file=dotname("idp_all_conf"))) as idp: + conf = SPConfig() + conf.load_file(dotname("servera_conf")) + sp = Saml2Client(conf) - destination = srvs[0]["location"] - req_id, req = sp.create_authn_request(destination, id="id1") + srvs = sp.metadata.single_sign_on_service(idp.config.entityid, + BINDING_HTTP_REDIRECT) - try: - key = sp.sec.key - except AttributeError: - key = import_rsa_key_from_file(sp.sec.key_file) + destination = srvs[0]["location"] + req_id, req = sp.create_authn_request(destination, id="id1") - info = http_redirect_message(req, destination, relay_state="RS", - typ="SAMLRequest", sigalg=SIG_RSA_SHA1, - key=key) + try: + key = sp.sec.key + except AttributeError: + key = import_rsa_key_from_file(sp.sec.key_file) - verified_ok = False + info = http_redirect_message(req, destination, relay_state="RS", + typ="SAMLRequest", sigalg=SIG_RSA_SHA1, + key=key) - for param, val in info["headers"]: - if param == "Location": - _dict = parse_qs(val.split("?")[1]) - _certs = idp.metadata.certs(sp.config.entityid, "any", "signing") - for cert in _certs: - if verify_redirect_signature(_dict, cert): - verified_ok = True + verified_ok = False - assert verified_ok + for param, val in info["headers"]: + if param == "Location": + _dict = parse_qs(val.split("?")[1]) + _certs = idp.metadata.certs(sp.config.entityid, "any", "signing") + for cert in _certs: + if verify_redirect_signature(_dict, cert): + verified_ok = True + + assert verified_ok diff --git a/tests/test_75_mongodb.py b/tests/test_75_mongodb.py index 6e64e47..d61fef1 100644 --- a/tests/test_75_mongodb.py +++ b/tests/test_75_mongodb.py @@ -1,3 +1,4 @@ +from contextlib import closing from saml2 import BINDING_HTTP_POST from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.client import Saml2Client @@ -19,42 +20,41 @@ def _eq(l1, l2): def test_flow(): sp = Saml2Client(config_file="servera_conf") - idp1 = Server(config_file="idp_conf_mdb") - idp2 = Server(config_file="idp_conf_mdb") + with closing(Server(config_file="idp_conf_mdb")) as idp1: + with closing(Server(config_file="idp_conf_mdb")) as idp2: + # clean out database + idp1.ident.mdb.db.drop() - # clean out database - idp1.ident.mdb.db.drop() + # -- dummy request --- + req_id, orig_req = sp.create_authn_request(idp1.config.entityid) - # -- dummy request --- - req_id, orig_req = sp.create_authn_request(idp1.config.entityid) + # == Create an AuthnRequest response - # == Create an AuthnRequest response + rinfo = idp1.response_args(orig_req, [BINDING_HTTP_POST]) - rinfo = idp1.response_args(orig_req, [BINDING_HTTP_POST]) + #name_id = idp1.ident.transient_nameid("id12", rinfo["sp_entity_id"]) + resp = idp1.create_authn_response({"eduPersonEntitlement": "Short stop", + "surName": "Jeter", + "givenName": "Derek", + "mail": "derek.jeter@nyy.mlb.com", + "title": "The man"}, + userid="jeter", + authn=AUTHN, + **rinfo) - #name_id = idp1.ident.transient_nameid("id12", rinfo["sp_entity_id"]) - resp = idp1.create_authn_response({"eduPersonEntitlement": "Short stop", - "surName": "Jeter", - "givenName": "Derek", - "mail": "derek.jeter@nyy.mlb.com", - "title": "The man"}, - userid="jeter", - authn=AUTHN, - **rinfo) + # What's stored away is the assertion + a_info = idp2.session_db.get_assertion(resp.assertion.id) + # Make sure what I got back from MongoDB is the same as I put in + assert a_info["assertion"] == resp.assertion - # What's stored away is the assertion - a_info = idp2.session_db.get_assertion(resp.assertion.id) - # Make sure what I got back from MongoDB is the same as I put in - assert a_info["assertion"] == resp.assertion + # By subject + nid = resp.assertion.subject.name_id + _assertion = idp2.session_db.get_assertions_by_subject(nid) + assert len(_assertion) == 1 + assert _assertion[0] == resp.assertion - # By subject - nid = resp.assertion.subject.name_id - _assertion = idp2.session_db.get_assertions_by_subject(nid) - assert len(_assertion) == 1 - assert _assertion[0] == resp.assertion - - nids = idp2.ident.find_nameid("jeter") - assert len(nids) == 1 + nids = idp2.ident.find_nameid("jeter") + assert len(nids) == 1 def test_eptid_mongo_db():