add closing method to make sure to close ident db connection

This commit is contained in:
Erick Tryzelaar
2014-10-24 14:57:34 -07:00
parent bf9747cd97
commit ff5cb7d8ee
17 changed files with 533 additions and 513 deletions

View File

@@ -334,4 +334,5 @@ class IdentDB(object):
return name_id return name_id
def close(self): def close(self):
self.db.close() if hasattr(self.db, 'close'):
self.db.close()

View File

@@ -145,28 +145,32 @@ class Server(Entity):
raise Exception("Couldn't open identity database: %s" % raise Exception("Couldn't open identity database: %s" %
(dbspec,)) (dbspec,))
_domain = self.config.getattr("domain", "idp") try:
if _domain: _domain = self.config.getattr("domain", "idp")
self.ident.domain = _domain if _domain:
self.ident.domain = _domain
self.ident.name_qualifier = self.config.entityid self.ident.name_qualifier = self.config.entityid
dbspec = self.config.getattr("edu_person_targeted_id", "idp") dbspec = self.config.getattr("edu_person_targeted_id", "idp")
if not dbspec: if not dbspec:
pass pass
else:
typ = dbspec[0]
addr = dbspec[1]
secret = dbspec[2]
if typ == "shelve":
self.eptid = EptidShelve(secret, addr)
elif typ == "mongodb":
from saml2.mongo_store import EptidMDB
self.eptid = EptidMDB(secret, database=addr,
collection="eptid")
else: else:
self.eptid = Eptid(secret) typ = dbspec[0]
addr = dbspec[1]
secret = dbspec[2]
if typ == "shelve":
self.eptid = EptidShelve(secret, addr)
elif typ == "mongodb":
from saml2.mongo_store import EptidMDB
self.eptid = EptidMDB(secret, database=addr,
collection="eptid")
else:
self.eptid = Eptid(secret)
except Exception:
self.ident.close()
raise
def wants(self, sp_entity_id, index=None): def wants(self, sp_entity_id, index=None):
""" Returns what attributes the SP requires and which are optional """ Returns what attributes the SP requires and which are optional
@@ -681,3 +685,6 @@ class Server(Entity):
soap_envelope = soapenv.Envelope(header=header, body=body) soap_envelope = soapenv.Envelope(header=header, body=body)
return "%s" % soap_envelope return "%s" % soap_envelope
def close(self):
self.ident.close()

View File

@@ -1,3 +1,4 @@
from contextlib import closing
from saml2 import saml, sigver from saml2 import saml, sigver
from saml2 import md from saml2 import md
from saml2 import config from saml2 import config
@@ -150,18 +151,17 @@ def test_filter_ava5():
def test_idp_policy_filter(): def test_idp_policy_filter():
idp = Server("idp_conf_ec") with closing(Server("idp_conf_ec")) as idp:
ava = {"givenName": ["Derek"], "sn": ["Jeter"],
"mail": ["derek@nyy.mlb.com"], "c": ["USA"],
"eduPersonTargetedID": "foo!bar!xyz",
"norEduPersonNIN": "19800101134"}
ava = {"givenName": ["Derek"], "sn": ["Jeter"], policy = idp.config.getattr("policy", "idp")
"mail": ["derek@nyy.mlb.com"], "c": ["USA"], ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", idp.metadata)
"eduPersonTargetedID": "foo!bar!xyz",
"norEduPersonNIN": "19800101134"}
policy = idp.config.getattr("policy", "idp") print ava
ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", idp.metadata) assert ava.keys() == ["eduPersonTargetedID"] # because no entity category
print ava
assert ava.keys() == ["eduPersonTargetedID"] # because no entity category
if __name__ == "__main__": if __name__ == "__main__":
test_idp_policy_filter() test_idp_policy_filter()

View File

@@ -1,6 +1,8 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from contextlib import closing
from saml2 import config from saml2 import config
from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.authn_context import INTERNETPROTOCOLPASSWORD
@@ -34,38 +36,38 @@ AUTHN = {
class TestResponse: class TestResponse:
def setup_class(self): def setup_class(self):
server = Server("idp_conf") with closing(Server("idp_conf")) as server:
name_id = server.ident.transient_nameid( name_id = server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12") "urn:mace:example.com:saml:roland:sp", "id12")
self._resp_ = server.create_authn_response( self._resp_ = server.create_authn_response(
IDENTITY, IDENTITY,
"id12", # in_response_to "id12", # in_response_to
"http://lingon.catalogix.se:8087/", "http://lingon.catalogix.se:8087/",
# consumer_url # consumer_url
"urn:mace:example.com:saml:roland:sp", "urn:mace:example.com:saml:roland:sp",
# sp_entity_id # sp_entity_id
name_id=name_id) name_id=name_id)
self._sign_resp_ = server.create_authn_response( self._sign_resp_ = server.create_authn_response(
IDENTITY, IDENTITY,
"id12", # in_response_to "id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url "http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id "urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id, name_id=name_id,
sign_assertion=True) sign_assertion=True)
self._resp_authn = server.create_authn_response( self._resp_authn = server.create_authn_response(
IDENTITY, IDENTITY,
"id12", # in_response_to "id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url "http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id "urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id, name_id=name_id,
authn=AUTHN) authn=AUTHN)
conf = config.SPConfig() conf = config.SPConfig()
conf.load_file("server_conf") conf.load_file("server_conf")
self.conf = conf self.conf = conf
def test_1(self): def test_1(self):
xml_response = ("%s" % (self._resp_,)) xml_response = ("%s" % (self._resp_,))

View File

@@ -1,3 +1,4 @@
from contextlib import closing
from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.server import Server from saml2.server import Server
from saml2.sigver import pre_encryption_part, ASSERT_XPATH, EncryptError from saml2.sigver import pre_encryption_part, ASSERT_XPATH, EncryptError
@@ -30,13 +31,13 @@ def test_pre_enc():
def test_reshuffle_response(): def test_reshuffle_response():
server = Server("idp_conf") with closing(Server("idp_conf")) as server:
name_id = server.ident.transient_nameid( name_id = server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12") "urn:mace:example.com:saml:roland:sp", "id12")
resp_ = server.create_authn_response( resp_ = server.create_authn_response(
IDENTITY, "id12", "http://lingon.catalogix.se:8087/", IDENTITY, "id12", "http://lingon.catalogix.se:8087/",
"urn:mace:example.com:saml:roland:sp", name_id=name_id) "urn:mace:example.com:saml:roland:sp", name_id=name_id)
resp2 = pre_encrypt_assertion(resp_) resp2 = pre_encrypt_assertion(resp_)
@@ -45,13 +46,13 @@ def test_reshuffle_response():
def test_enc1(): def test_enc1():
server = Server("idp_conf") with closing(Server("idp_conf")) as server:
name_id = server.ident.transient_nameid( name_id = server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12") "urn:mace:example.com:saml:roland:sp", "id12")
resp_ = server.create_authn_response( resp_ = server.create_authn_response(
IDENTITY, "id12", "http://lingon.catalogix.se:8087/", IDENTITY, "id12", "http://lingon.catalogix.se:8087/",
"urn:mace:example.com:saml:roland:sp", name_id=name_id) "urn:mace:example.com:saml:roland:sp", name_id=name_id)
statement = pre_encrypt_assertion(resp_) statement = pre_encrypt_assertion(resp_)
@@ -82,13 +83,13 @@ def test_enc1():
def test_enc2(): def test_enc2():
crypto = CryptoBackendXmlSec1(xmlsec_path) crypto = CryptoBackendXmlSec1(xmlsec_path)
server = Server("idp_conf") with closing(Server("idp_conf")) as server:
name_id = server.ident.transient_nameid( name_id = server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12") "urn:mace:example.com:saml:roland:sp", "id12")
resp_ = server.create_authn_response( resp_ = server.create_authn_response(
IDENTITY, "id12", "http://lingon.catalogix.se:8087/", IDENTITY, "id12", "http://lingon.catalogix.se:8087/",
"urn:mace:example.com:saml:roland:sp", name_id=name_id) "urn:mace:example.com:saml:roland:sp", name_id=name_id)
enc_resp = crypto.encrypt_assertion(resp_, full_path("pubkey.pem"), enc_resp = crypto.encrypt_assertion(resp_, full_path("pubkey.pem"),
pre_encryption_part()) pre_encryption_part())

View File

@@ -1,5 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from contextlib import closing
from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.server import Server from saml2.server import Server
@@ -28,37 +29,37 @@ AUTHN = {
class TestAuthnResponse: class TestAuthnResponse:
def setup_class(self): def setup_class(self):
server = Server(dotname("idp_conf")) with closing(Server(dotname("idp_conf"))) as server:
name_id = server.ident.transient_nameid( name_id = server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp","id12") "urn:mace:example.com:saml:roland:sp","id12")
self._resp_ = server.create_authn_response( self._resp_ = server.create_authn_response(
IDENTITY, IDENTITY,
"id12", # in_response_to "id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url "http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id "urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id, name_id=name_id,
authn=AUTHN) authn=AUTHN)
self._sign_resp_ = server.create_authn_response(
IDENTITY,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id, sign_assertion=True,
authn=AUTHN)
self._resp_authn = server.create_authn_response( self._sign_resp_ = server.create_authn_response(
IDENTITY, IDENTITY,
"id12", # in_response_to "id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url "http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id "urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id, name_id=name_id, sign_assertion=True,
authn=AUTHN) authn=AUTHN)
self.conf = config_factory("sp", dotname("server_conf")) self._resp_authn = server.create_authn_response(
self.conf.only_use_keys_in_metadata = False IDENTITY,
self.ar = authn_response(self.conf, "http://lingon.catalogix.se:8087/") "id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
authn=AUTHN)
self.conf = config_factory("sp", dotname("server_conf"))
self.conf.only_use_keys_in_metadata = False
self.ar = authn_response(self.conf, "http://lingon.catalogix.se:8087/")
def test_verify_1(self): def test_verify_1(self):
xml_response = "%s" % (self._resp_,) xml_response = "%s" % (self._resp_,)
@@ -128,4 +129,4 @@ class TestAuthnResponse:
if __name__ == "__main__": if __name__ == "__main__":
t = TestAuthnResponse() t = TestAuthnResponse()
t.setup_class() t.setup_class()
t.test_verify_1() t.test_verify_1()

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import base64 import base64
from contextlib import closing
from urlparse import parse_qs from urlparse import parse_qs
from saml2.sigver import pre_encryption_part from saml2.sigver import pre_encryption_part
from saml2.assertion import Policy from saml2.assertion import Policy
@@ -49,7 +50,7 @@ class TestServer1():
self.client = client.Saml2Client(conf) self.client = client.Saml2Client(conf)
def teardown_class(self): def teardown_class(self):
self.server.ident.close() self.server.close()
def test_issuer(self): def test_issuer(self):
issuer = self.server._issuer() issuer = self.server._issuer()
@@ -419,10 +420,11 @@ class TestServer1():
saml_soap = make_soap_enveloped_saml_thingy(logout_request) saml_soap = make_soap_enveloped_saml_thingy(logout_request)
self.server.ident.close() self.server.ident.close()
idp = Server("idp_soap_conf")
request = idp.parse_logout_request(saml_soap) with closing(Server("idp_soap_conf")) as idp:
idp.ident.close() request = idp.parse_logout_request(saml_soap)
assert request idp.ident.close()
assert request
#------------------------------------------------------------------------ #------------------------------------------------------------------------
@@ -436,7 +438,7 @@ class TestServer2():
self.server = Server("restrictive_idp_conf") self.server = Server("restrictive_idp_conf")
def teardown_class(self): def teardown_class(self):
self.server.ident.close() self.server.close()
def test_do_attribute_reponse(self): def test_do_attribute_reponse(self):
aa_policy = self.server.config.getattr("policy", "idp") aa_policy = self.server.config.getattr("policy", "idp")
@@ -487,21 +489,21 @@ def _logout_request(conf_file):
class TestServerLogout(): class TestServerLogout():
def test_1(self): def test_1(self):
server = Server("idp_slo_redirect_conf") with closing(Server("idp_slo_redirect_conf")) as server:
req_id, request = _logout_request("sp_slo_redirect_conf") req_id, request = _logout_request("sp_slo_redirect_conf")
print request print request
bindings = [BINDING_HTTP_REDIRECT] bindings = [BINDING_HTTP_REDIRECT]
response = server.create_logout_response(request, bindings) response = server.create_logout_response(request, bindings)
binding, destination = server.pick_binding("single_logout_service", binding, destination = server.pick_binding("single_logout_service",
bindings, "spsso", bindings, "spsso",
request) request)
http_args = server.apply_binding(binding, "%s" % response, destination, http_args = server.apply_binding(binding, "%s" % response, destination,
"relay_state", response=True) "relay_state", response=True)
assert len(http_args) == 4 assert len(http_args) == 4
assert http_args["headers"][0][0] == "Location" assert http_args["headers"][0][0] == "Location"
assert http_args["data"] == [] assert http_args["data"] == []
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -120,6 +120,9 @@ class TestClient:
conf.load_file("server_conf") conf.load_file("server_conf")
self.client = Saml2Client(conf) self.client = Saml2Client(conf)
def teardown_class(self):
self.server.close()
def test_create_attribute_query1(self): def test_create_attribute_query1(self):
req_id, req = self.client.create_attribute_query( req_id, req = self.client.create_attribute_query(
"https://idp.example.com/idp/", "https://idp.example.com/idp/",

View File

@@ -48,6 +48,9 @@ class TestSP():
self.sp = make_plugin("rem", saml_conf="server_conf") self.sp = make_plugin("rem", saml_conf="server_conf")
self.server = Server(config_file="idp_conf") self.server = Server(config_file="idp_conf")
def teardown_class(self):
self.server.close()
def test_setup(self): def test_setup(self):
assert self.sp assert self.sp

View File

@@ -1,3 +1,4 @@
from contextlib import closing
from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.httpbase import set_list2dict from saml2.httpbase import set_list2dict
from saml2.profile.ecp import RelayState from saml2.profile.ecp import RelayState
@@ -41,104 +42,104 @@ def test_complete_flow():
metadata_file=full_path("idp_all.xml")) metadata_file=full_path("idp_all.xml"))
sp = Saml2Client(config_file=dotname("servera_conf")) sp = Saml2Client(config_file=dotname("servera_conf"))
idp = Server(config_file=dotname("idp_all_conf"))
IDP_ENTITY_ID = idp.config.entityid with closing(Server(config_file=dotname("idp_all_conf"))) as idp:
#SP_ENTITY_ID = sp.config.entityid IDP_ENTITY_ID = idp.config.entityid
#SP_ENTITY_ID = sp.config.entityid
# ------------ @Client ----------------------------- # ------------ @Client -----------------------------
headers = client.add_paos_headers([]) headers = client.add_paos_headers([])
assert len(headers) == 2 assert len(headers) == 2
# ------------ @SP ----------------------------- # ------------ @SP -----------------------------
response = DummyResponse(set_list2dict(headers)) response = DummyResponse(set_list2dict(headers))
assert sp.can_handle_ecp_response(response) assert sp.can_handle_ecp_response(response)
sid, message = sp.create_ecp_authn_request(IDP_ENTITY_ID, relay_state="XYZ") sid, message = sp.create_ecp_authn_request(IDP_ENTITY_ID, relay_state="XYZ")
# ------------ @Client ----------------------------- # ------------ @Client -----------------------------
respdict = client.parse_soap_message(message) respdict = client.parse_soap_message(message)
cargs = client.parse_sp_ecp_response(respdict) cargs = client.parse_sp_ecp_response(respdict)
assert isinstance(respdict["body"], AuthnRequest) assert isinstance(respdict["body"], AuthnRequest)
assert len(respdict["header"]) == 2 assert len(respdict["header"]) == 2
item0 = respdict["header"][0] item0 = respdict["header"][0]
assert isinstance(item0, Request) or isinstance(item0, RelayState) assert isinstance(item0, Request) or isinstance(item0, RelayState)
destination = respdict["body"].destination destination = respdict["body"].destination
ht_args = client.apply_binding(BINDING_SOAP, respdict["body"], destination) ht_args = client.apply_binding(BINDING_SOAP, respdict["body"], destination)
# Time to send to the IDP # Time to send to the IDP
# ----------- @IDP ------------------------------- # ----------- @IDP -------------------------------
req = idp.parse_authn_request(ht_args["data"], BINDING_SOAP) req = idp.parse_authn_request(ht_args["data"], BINDING_SOAP)
assert isinstance(req.message, AuthnRequest) assert isinstance(req.message, AuthnRequest)
# create Response and return in the SOAP response # create Response and return in the SOAP response
sp_entity_id = req.sender() sp_entity_id = req.sender()
name_id = idp.ident.transient_nameid( "id12", sp.config.entityid) name_id = idp.ident.transient_nameid( "id12", sp.config.entityid)
binding, destination = idp.pick_binding("assertion_consumer_service", binding, destination = idp.pick_binding("assertion_consumer_service",
[BINDING_PAOS], [BINDING_PAOS],
entity_id=sp_entity_id) entity_id=sp_entity_id)
resp = idp.create_ecp_authn_request_response( resp = idp.create_ecp_authn_request_response(
destination, {"eduPersonEntitlement": "Short stop", destination, {"eduPersonEntitlement": "Short stop",
"surName": "Jeter", "surName": "Jeter",
"givenName": "Derek", "givenName": "Derek",
"mail": "derek.jeter@nyy.mlb.com", "mail": "derek.jeter@nyy.mlb.com",
"title": "The man" "title": "The man"
}, },
req.message.id, destination, sp_entity_id, req.message.id, destination, sp_entity_id,
name_id=name_id, authn=AUTHN) name_id=name_id, authn=AUTHN)
# ------------ @Client ----------------------------- # ------------ @Client -----------------------------
# The client got the response from the IDP repackage and send it to the SP # The client got the response from the IDP repackage and send it to the SP
respdict = client.parse_soap_message(resp) respdict = client.parse_soap_message(resp)
idp_response = respdict["body"] idp_response = respdict["body"]
assert isinstance(idp_response, Response) assert isinstance(idp_response, Response)
assert len(respdict["header"]) == 1 assert len(respdict["header"]) == 1
_ecp_response = None _ecp_response = None
for item in respdict["header"]: for item in respdict["header"]:
if item.c_tag == "Response" and item.c_namespace == ecp_prof.NAMESPACE: if item.c_tag == "Response" and item.c_namespace == ecp_prof.NAMESPACE:
_ecp_response = item _ecp_response = item
#_acs_url = _ecp_response.assertion_consumer_service_url #_acs_url = _ecp_response.assertion_consumer_service_url
# done phase2 at the client # done phase2 at the client
ht_args = client.use_soap(idp_response, cargs["rc_url"], ht_args = client.use_soap(idp_response, cargs["rc_url"],
[cargs["relay_state"]]) [cargs["relay_state"]])
print ht_args print ht_args
# ------------ @SP ----------------------------- # ------------ @SP -----------------------------
respdict = sp.unpack_soap_message(ht_args["data"]) respdict = sp.unpack_soap_message(ht_args["data"])
# verify the relay_state # verify the relay_state
for header in respdict["header"]: for header in respdict["header"]:
inst = create_class_from_xml_string(RelayState, header) inst = create_class_from_xml_string(RelayState, header)
if isinstance(inst, RelayState): if isinstance(inst, RelayState):
assert inst.text == "XYZ" assert inst.text == "XYZ"
# parse the response # parse the response
resp = sp.parse_authn_request_response(respdict["body"], None, {sid: "/"}) resp = sp.parse_authn_request_response(respdict["body"], None, {sid: "/"})
print resp.response print resp.response
assert resp.response.destination == "http://lingon.catalogix.se:8087/paos" assert resp.response.destination == "http://lingon.catalogix.se:8087/paos"
assert resp.response.status.status_code.value == STATUS_SUCCESS assert resp.response.status.status_code.value == STATUS_SUCCESS

View File

@@ -1,4 +1,5 @@
import base64 import base64
from contextlib import closing
from hashlib import sha1 from hashlib import sha1
from urlparse import urlparse from urlparse import urlparse
from urlparse import parse_qs from urlparse import parse_qs
@@ -76,157 +77,156 @@ def test_create_artifact_resolve():
s = sha1(SP) s = sha1(SP)
assert artifact[4:24] == s.digest() assert artifact[4:24] == s.digest()
idp = Server(config_file="idp_all_conf") with closing(Server(config_file="idp_all_conf")) as idp:
typecode = artifact[:2]
assert typecode == ARTIFACT_TYPECODE
typecode = artifact[:2] destination = idp.artifact2destination(b64art, "spsso")
assert typecode == ARTIFACT_TYPECODE
destination = idp.artifact2destination(b64art, "spsso") msg_id, msg = idp.create_artifact_resolve(b64art, destination, sid())
msg_id, msg = idp.create_artifact_resolve(b64art, destination, sid()) print msg
print msg args = idp.use_soap(msg, destination, None, False)
args = idp.use_soap(msg, destination, None, False) sp = Saml2Client(config_file="servera_conf")
sp = Saml2Client(config_file="servera_conf") ar = sp.parse_artifact_resolve(args["data"])
ar = sp.parse_artifact_resolve(args["data"]) print ar
print ar assert ar.artifact.text == b64art
assert ar.artifact.text == b64art
def test_artifact_flow(): def test_artifact_flow():
#SP = 'urn:mace:example.com:saml:roland:sp' #SP = 'urn:mace:example.com:saml:roland:sp'
sp = Saml2Client(config_file="servera_conf") sp = Saml2Client(config_file="servera_conf")
idp = Server(config_file="idp_all_conf")
# original request with closing(Server(config_file="idp_all_conf")) as idp:
# original request
binding, destination = sp.pick_binding("single_sign_on_service", binding, destination = sp.pick_binding("single_sign_on_service",
entity_id=idp.config.entityid) entity_id=idp.config.entityid)
relay_state = "RS0" relay_state = "RS0"
req_id, req = sp.create_authn_request(destination, id="id1") req_id, req = sp.create_authn_request(destination, id="id1")
artifact = sp.use_artifact(req, 1) artifact = sp.use_artifact(req, 1)
binding, destination = sp.pick_binding("single_sign_on_service", binding, destination = sp.pick_binding("single_sign_on_service",
[BINDING_HTTP_ARTIFACT], [BINDING_HTTP_ARTIFACT],
entity_id=idp.config.entityid) entity_id=idp.config.entityid)
hinfo = sp.apply_binding(binding, "%s" % artifact, destination, relay_state) hinfo = sp.apply_binding(binding, "%s" % artifact, destination, relay_state)
# ========== @IDP ============ # ========== @IDP ============
artifact2 = get_msg(hinfo, binding) artifact2 = get_msg(hinfo, binding)
assert artifact == artifact2 assert artifact == artifact2
# The IDP now wants to replace the artifact with the real request # The IDP now wants to replace the artifact with the real request
destination = idp.artifact2destination(artifact2, "spsso") destination = idp.artifact2destination(artifact2, "spsso")
msg_id, msg = idp.create_artifact_resolve(artifact2, destination, sid()) msg_id, msg = idp.create_artifact_resolve(artifact2, destination, sid())
hinfo = idp.use_soap(msg, destination, None, False) hinfo = idp.use_soap(msg, destination, None, False)
# ======== @SP ========== # ======== @SP ==========
msg = get_msg(hinfo, BINDING_SOAP) msg = get_msg(hinfo, BINDING_SOAP)
ar = sp.parse_artifact_resolve(msg) ar = sp.parse_artifact_resolve(msg)
assert ar.artifact.text == artifact assert ar.artifact.text == artifact
# The SP picks the request out of the repository with the artifact as the key # The SP picks the request out of the repository with the artifact as the key
oreq = sp.artifact[ar.artifact.text] oreq = sp.artifact[ar.artifact.text]
# Should be the same as req above # Should be the same as req above
# Returns the information over the existing SOAP connection so # Returns the information over the existing SOAP connection so
# no transport information needed # no transport information needed
msg = sp.create_artifact_response(ar, ar.artifact.text) msg = sp.create_artifact_response(ar, ar.artifact.text)
hinfo = sp.use_soap(msg, destination) hinfo = sp.use_soap(msg, destination)
# ========== @IDP ============ # ========== @IDP ============
msg = get_msg(hinfo, BINDING_SOAP) msg = get_msg(hinfo, BINDING_SOAP)
# The IDP untangles the request from the artifact resolve response # The IDP untangles the request from the artifact resolve response
spreq = idp.parse_artifact_resolve_response(msg) spreq = idp.parse_artifact_resolve_response(msg)
# should be the same as req above # should be the same as req above
assert spreq.id == req.id assert spreq.id == req.id
# That was one way, the Request from the SP # That was one way, the Request from the SP
# ---------------------------------------------# # ---------------------------------------------#
# Now for the other, the response from the IDP # Now for the other, the response from the IDP
name_id = idp.ident.transient_nameid(sp.config.entityid, "derek") name_id = idp.ident.transient_nameid(sp.config.entityid, "derek")
resp_args = idp.response_args(spreq, [BINDING_HTTP_POST]) resp_args = idp.response_args(spreq, [BINDING_HTTP_POST])
response = idp.create_authn_response({"eduPersonEntitlement": "Short stop", response = idp.create_authn_response({"eduPersonEntitlement": "Short stop",
"surName": "Jeter", "givenName": "Derek", "surName": "Jeter", "givenName": "Derek",
"mail": "derek.jeter@nyy.mlb.com", "mail": "derek.jeter@nyy.mlb.com",
"title": "The man"}, "title": "The man"},
name_id=name_id, name_id=name_id,
authn=AUTHN, authn=AUTHN,
**resp_args) **resp_args)
print response print response
# with the response in hand create an artifact # with the response in hand create an artifact
artifact = idp.use_artifact(response, 1) artifact = idp.use_artifact(response, 1)
binding, destination = sp.pick_binding("single_sign_on_service", binding, destination = sp.pick_binding("single_sign_on_service",
[BINDING_HTTP_ARTIFACT], [BINDING_HTTP_ARTIFACT],
entity_id=idp.config.entityid) entity_id=idp.config.entityid)
hinfo = sp.apply_binding(binding, "%s" % artifact, destination, relay_state, hinfo = sp.apply_binding(binding, "%s" % artifact, destination, relay_state,
response=True) response=True)
# ========== SP ========= # ========== SP =========
artifact3 = get_msg(hinfo, binding) artifact3 = get_msg(hinfo, binding)
assert artifact == artifact3 assert artifact == artifact3
destination = sp.artifact2destination(artifact3, "idpsso") destination = sp.artifact2destination(artifact3, "idpsso")
# Got an artifact want to replace it with the real message # Got an artifact want to replace it with the real message
msg_id, msg = sp.create_artifact_resolve(artifact3, destination, sid()) msg_id, msg = sp.create_artifact_resolve(artifact3, destination, sid())
print msg print msg
hinfo = sp.use_soap(msg, destination, None, False) hinfo = sp.use_soap(msg, destination, None, False)
# ======== IDP ========== # ======== IDP ==========
msg = get_msg(hinfo, BINDING_SOAP) msg = get_msg(hinfo, BINDING_SOAP)
ar = idp.parse_artifact_resolve(msg) ar = idp.parse_artifact_resolve(msg)
print ar print ar
assert ar.artifact.text == artifact3 assert ar.artifact.text == artifact3
# The IDP retrieves the response from the database using the artifact as the key # The IDP retrieves the response from the database using the artifact as the key
#oreq = idp.artifact[ar.artifact.text] #oreq = idp.artifact[ar.artifact.text]
binding, destination = idp.pick_binding("artifact_resolution_service", binding, destination = idp.pick_binding("artifact_resolution_service",
entity_id=sp.config.entityid) entity_id=sp.config.entityid)
resp = idp.create_artifact_response(ar, ar.artifact.text) resp = idp.create_artifact_response(ar, ar.artifact.text)
hinfo = idp.use_soap(resp, destination) hinfo = idp.use_soap(resp, destination)
# ========== SP ============ # ========== SP ============
msg = get_msg(hinfo, BINDING_SOAP) msg = get_msg(hinfo, BINDING_SOAP)
sp_resp = sp.parse_artifact_resolve_response(msg) sp_resp = sp.parse_artifact_resolve_response(msg)
assert sp_resp.id == response.id assert sp_resp.id == response.id

View File

@@ -1,3 +1,4 @@
from contextlib import closing
from urlparse import urlparse, parse_qs from urlparse import urlparse, parse_qs
from saml2 import BINDING_SOAP, BINDING_HTTP_POST from saml2 import BINDING_SOAP, BINDING_HTTP_POST
@@ -43,100 +44,99 @@ def get_msg(hinfo, binding):
def test_basic(): def test_basic():
sp = Saml2Client(config_file="servera_conf") sp = Saml2Client(config_file="servera_conf")
idp = Server(config_file="idp_all_conf") with closing(Server(config_file="idp_all_conf")) as idp:
srvs = sp.metadata.authn_query_service(idp.config.entityid)
srvs = sp.metadata.authn_query_service(idp.config.entityid) destination = srvs[0]["location"]
authn_context = requested_authn_context(INTERNETPROTOCOLPASSWORD)
destination = srvs[0]["location"] subject = Subject(text="abc",
authn_context = requested_authn_context(INTERNETPROTOCOLPASSWORD) name_id=NameID(format=NAMEID_FORMAT_TRANSIENT))
subject = Subject(text="abc", _id, aq = sp.create_authn_query(subject, destination, authn_context)
name_id=NameID(format=NAMEID_FORMAT_TRANSIENT))
_id, aq = sp.create_authn_query(subject, destination, authn_context) print aq
print aq assert isinstance(aq, AuthnQuery)
assert isinstance(aq, AuthnQuery)
def test_flow(): def test_flow():
sp = Saml2Client(config_file="servera_conf") sp = Saml2Client(config_file="servera_conf")
idp = Server(config_file="idp_all_conf")
relay_state = "FOO" with closing(Server(config_file="idp_all_conf")) as idp:
# -- dummy request --- relay_state = "FOO"
orig_req = AuthnRequest( # -- dummy request ---
issuer=sp._issuer(), orig_req = AuthnRequest(
name_id_policy=NameIDPolicy(allow_create="true", issuer=sp._issuer(),
format=NAMEID_FORMAT_TRANSIENT)) name_id_policy=NameIDPolicy(allow_create="true",
format=NAMEID_FORMAT_TRANSIENT))
# == Create an AuthnRequest response # == Create an AuthnRequest response
name_id = idp.ident.transient_nameid(sp.config.entityid, "id12") name_id = idp.ident.transient_nameid(sp.config.entityid, "id12")
binding, destination = idp.pick_binding("assertion_consumer_service", binding, destination = idp.pick_binding("assertion_consumer_service",
entity_id=sp.config.entityid) entity_id=sp.config.entityid)
resp = idp.create_authn_response({"eduPersonEntitlement": "Short stop", resp = idp.create_authn_response({"eduPersonEntitlement": "Short stop",
"surName": "Jeter", "surName": "Jeter",
"givenName": "Derek", "givenName": "Derek",
"mail": "derek.jeter@nyy.mlb.com", "mail": "derek.jeter@nyy.mlb.com",
"title": "The man"}, "title": "The man"},
"id-123456789", "id-123456789",
destination, destination,
sp.config.entityid, sp.config.entityid,
name_id=name_id, name_id=name_id,
authn=AUTHN) authn=AUTHN)
hinfo = idp.apply_binding(binding, "%s" % resp, destination, relay_state) hinfo = idp.apply_binding(binding, "%s" % resp, destination, relay_state)
# ------- @SP ---------- # ------- @SP ----------
xmlstr = get_msg(hinfo, binding) xmlstr = get_msg(hinfo, binding)
aresp = sp.parse_authn_request_response(xmlstr, binding, aresp = sp.parse_authn_request_response(xmlstr, binding,
{resp.in_response_to: "/"}) {resp.in_response_to: "/"})
binding, destination = sp.pick_binding("authn_query_service", binding, destination = sp.pick_binding("authn_query_service",
entity_id=idp.config.entityid) entity_id=idp.config.entityid)
authn_context = requested_authn_context(INTERNETPROTOCOLPASSWORD) authn_context = requested_authn_context(INTERNETPROTOCOLPASSWORD)
subject = aresp.assertion.subject subject = aresp.assertion.subject
aq_id, aq = sp.create_authn_query(subject, destination, authn_context) aq_id, aq = sp.create_authn_query(subject, destination, authn_context)
print aq print aq
assert isinstance(aq, AuthnQuery) assert isinstance(aq, AuthnQuery)
binding = BINDING_SOAP binding = BINDING_SOAP
hinfo = sp.apply_binding(binding, "%s" % aq, destination, "state2") hinfo = sp.apply_binding(binding, "%s" % aq, destination, "state2")
# -------- @IDP ---------- # -------- @IDP ----------
xmlstr = get_msg(hinfo, binding) xmlstr = get_msg(hinfo, binding)
pm = idp.parse_authn_query(xmlstr, binding) pm = idp.parse_authn_query(xmlstr, binding)
msg = pm.message msg = pm.message
assert msg.id == aq.id assert msg.id == aq.id
p_res = idp.create_authn_query_response(msg.subject, msg.session_index, p_res = idp.create_authn_query_response(msg.subject, msg.session_index,
msg.requested_authn_context) msg.requested_authn_context)
print p_res print p_res
hinfo = idp.apply_binding(binding, "%s" % p_res, "", "state2", hinfo = idp.apply_binding(binding, "%s" % p_res, "", "state2",
response=True) response=True)
# ------- @SP ---------- # ------- @SP ----------
xmlstr = get_msg(hinfo, binding) xmlstr = get_msg(hinfo, binding)
final = sp.parse_authn_query_response(xmlstr, binding) final = sp.parse_authn_query_response(xmlstr, binding)
print final print final
assert final.response.id == p_res.id assert final.response.id == p_res.id
if __name__ == "__main__": if __name__ == "__main__":
test_flow() test_flow()

View File

@@ -1,5 +1,6 @@
__author__ = 'rolandh' __author__ = 'rolandh'
from contextlib import closing
from saml2.client import Saml2Client from saml2.client import Saml2Client
from saml2.saml import NameID, NAMEID_FORMAT_PERSISTENT from saml2.saml import NameID, NAMEID_FORMAT_PERSISTENT
from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.saml import NAMEID_FORMAT_TRANSIENT
@@ -10,64 +11,64 @@ from saml2.samlp import NameIDMappingRequest
def test_base_request(): def test_base_request():
sp = Saml2Client(config_file="servera_conf") sp = Saml2Client(config_file="servera_conf")
idp = Server(config_file="idp_all_conf")
binding, destination = sp.pick_binding("name_id_mapping_service", with closing(Server(config_file="idp_all_conf")) as idp:
entity_id=idp.config.entityid) binding, destination = sp.pick_binding("name_id_mapping_service",
entity_id=idp.config.entityid)
policy = NameIDPolicy(format=NAMEID_FORMAT_TRANSIENT, policy = NameIDPolicy(format=NAMEID_FORMAT_TRANSIENT,
sp_name_qualifier="urn:mace:swamid:junk", sp_name_qualifier="urn:mace:swamid:junk",
allow_create="true") allow_create="true")
nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
mid, nmr = sp.create_name_id_mapping_request(policy, nameid, destination) mid, nmr = sp.create_name_id_mapping_request(policy, nameid, destination)
print nmr print nmr
assert isinstance(nmr, NameIDMappingRequest) assert isinstance(nmr, NameIDMappingRequest)
def test_request_response(): def test_request_response():
sp = Saml2Client(config_file="servera_conf") sp = Saml2Client(config_file="servera_conf")
idp = Server(config_file="idp_all_conf")
binding, destination = sp.pick_binding("name_id_mapping_service", with closing(Server(config_file="idp_all_conf")) as idp:
entity_id=idp.config.entityid) binding, destination = sp.pick_binding("name_id_mapping_service",
entity_id=idp.config.entityid)
policy = NameIDPolicy(format=NAMEID_FORMAT_TRANSIENT, policy = NameIDPolicy(format=NAMEID_FORMAT_TRANSIENT,
sp_name_qualifier="urn:mace:swamid:junk", sp_name_qualifier="urn:mace:swamid:junk",
allow_create="true") allow_create="true")
nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
mid, nmr = sp.create_name_id_mapping_request(policy, nameid, destination) mid, nmr = sp.create_name_id_mapping_request(policy, nameid, destination)
print nmr print nmr
args = sp.use_soap(nmr, destination) args = sp.use_soap(nmr, destination)
# ------- IDP ------------ # ------- IDP ------------
req = idp.parse_name_id_mapping_request(args["data"], binding) req = idp.parse_name_id_mapping_request(args["data"], binding)
in_response_to = req.message.id in_response_to = req.message.id
name_id = NameID(format=NAMEID_FORMAT_PERSISTENT, text="foobar") name_id = NameID(format=NAMEID_FORMAT_PERSISTENT, text="foobar")
idp_response = idp.create_name_id_mapping_response( idp_response = idp.create_name_id_mapping_response(
name_id, in_response_to=in_response_to) name_id, in_response_to=in_response_to)
print idp_response print idp_response
ht_args = sp.use_soap(idp_response) ht_args = sp.use_soap(idp_response)
# ------- SP ------------ # ------- SP ------------
_resp = sp.parse_name_id_mapping_request_response(ht_args["data"], binding) _resp = sp.parse_name_id_mapping_request_response(ht_args["data"], binding)
print _resp.response print _resp.response
r_name_id = _resp.response.name_id r_name_id = _resp.response.name_id
assert r_name_id.format == NAMEID_FORMAT_PERSISTENT assert r_name_id.format == NAMEID_FORMAT_PERSISTENT
assert r_name_id.text == "foobar" assert r_name_id.text == "foobar"

View File

@@ -1,3 +1,4 @@
from contextlib import closing
from saml2 import BINDING_SOAP from saml2 import BINDING_SOAP
from saml2.samlp import NewID from saml2.samlp import NewID
from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT
@@ -9,73 +10,71 @@ __author__ = 'rolandh'
def test_basic(): def test_basic():
sp = Saml2Client(config_file="servera_conf") sp = Saml2Client(config_file="servera_conf")
idp = Server(config_file="idp_all_conf") with closing(Server(config_file="idp_all_conf")) as idp:
# -------- @SP ------------
binding, destination = sp.pick_binding("manage_name_id_service",
entity_id=idp.config.entityid)
# -------- @SP ------------ nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
binding, destination = sp.pick_binding("manage_name_id_service", newid = NewID(text="Barfoo")
entity_id=idp.config.entityid)
nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") mid, mreq = sp.create_manage_name_id_request(destination, name_id=nameid,
newid = NewID(text="Barfoo") new_id=newid)
mid, mreq = sp.create_manage_name_id_request(destination, name_id=nameid, print mreq
new_id=newid) rargs = sp.apply_binding(binding, "%s" % mreq, destination, "")
print mreq # --------- @IDP --------------
rargs = sp.apply_binding(binding, "%s" % mreq, destination, "")
# --------- @IDP -------------- _req = idp.parse_manage_name_id_request(rargs["data"], binding)
_req = idp.parse_manage_name_id_request(rargs["data"], binding) print _req.message
print _req.message assert mid == _req.message.id
assert mid == _req.message.id
def test_flow(): def test_flow():
sp = Saml2Client(config_file="servera_conf") sp = Saml2Client(config_file="servera_conf")
idp = Server(config_file="idp_all_conf") with closing(Server(config_file="idp_all_conf")) as idp:
binding, destination = sp.pick_binding("manage_name_id_service",
entity_id=idp.config.entityid)
binding, destination = sp.pick_binding("manage_name_id_service", nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
entity_id=idp.config.entityid) newid = NewID(text="Barfoo")
nameid = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") mid, midq = sp.create_manage_name_id_request(destination, name_id=nameid,
newid = NewID(text="Barfoo") new_id=newid)
mid, midq = sp.create_manage_name_id_request(destination, name_id=nameid, print midq
new_id=newid) rargs = sp.apply_binding(binding, "%s" % midq, destination, "")
print midq # --------- @IDP --------------
rargs = sp.apply_binding(binding, "%s" % midq, destination, "")
# --------- @IDP -------------- _req = idp.parse_manage_name_id_request(rargs["data"], binding)
_req = idp.parse_manage_name_id_request(rargs["data"], binding) print _req.message
print _req.message mnir = idp.create_manage_name_id_response(_req.message, [binding])
mnir = idp.create_manage_name_id_response(_req.message, [binding]) if binding != BINDING_SOAP:
binding, destination = idp.pick_binding("manage_name_id_service",
entity_id=sp.config.entityid)
else:
destination = ""
if binding != BINDING_SOAP: respargs = idp.apply_binding(binding, "%s" % mnir, destination, "")
binding, destination = idp.pick_binding("manage_name_id_service",
entity_id=sp.config.entityid)
else:
destination = ""
respargs = idp.apply_binding(binding, "%s" % mnir, destination, "") print respargs
print respargs # ---------- @SP ---------------
# ---------- @SP --------------- _response = sp.parse_manage_name_id_request_response(respargs["data"],
binding)
_response = sp.parse_manage_name_id_request_response(respargs["data"], print _response.response
binding)
print _response.response assert _response.response.id == mnir.id
assert _response.response.id == mnir.id
if __name__ == "__main__": if __name__ == "__main__":
test_flow() test_flow()

View File

@@ -1,3 +1,4 @@
from contextlib import closing
from urlparse import parse_qs from urlparse import parse_qs
from urlparse import urlparse from urlparse import urlparse
from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.authn_context import INTERNETPROTOCOLPASSWORD
@@ -46,66 +47,65 @@ def get_msg(hinfo, binding, response=False):
def test_basic_flow(): def test_basic_flow():
sp = Saml2Client(config_file="servera_conf") sp = Saml2Client(config_file="servera_conf")
idp = Server(config_file="idp_all_conf") with closing(Server(config_file="idp_all_conf")) as idp:
# -------- @IDP -------------
# -------- @IDP ------------- relay_state = "FOO"
# -- dummy request ---
orig_req = AuthnRequest(
issuer=sp._issuer(), name_id_policy=NameIDPolicy(
allow_create="true", format=NAMEID_FORMAT_TRANSIENT))
relay_state = "FOO" # == Create an AuthnRequest response
# -- dummy request ---
orig_req = AuthnRequest(
issuer=sp._issuer(), name_id_policy=NameIDPolicy(
allow_create="true", format=NAMEID_FORMAT_TRANSIENT))
# == Create an AuthnRequest response name_id = idp.ident.transient_nameid("id12", sp.config.entityid)
name_id = idp.ident.transient_nameid("id12", sp.config.entityid) binding, destination = idp.pick_binding("assertion_consumer_service",
entity_id=sp.config.entityid)
resp = idp.create_authn_response({"eduPersonEntitlement": "Short stop",
"surName": "Jeter",
"givenName": "Derek",
"mail": "derek.jeter@nyy.mlb.com",
"title": "The man"},
"id-123456789",
destination,
sp.config.entityid,
name_id=name_id,
authn=AUTHN)
binding, destination = idp.pick_binding("assertion_consumer_service", hinfo = idp.apply_binding(binding, "%s" % resp, destination, relay_state)
entity_id=sp.config.entityid)
resp = idp.create_authn_response({"eduPersonEntitlement": "Short stop",
"surName": "Jeter",
"givenName": "Derek",
"mail": "derek.jeter@nyy.mlb.com",
"title": "The man"},
"id-123456789",
destination,
sp.config.entityid,
name_id=name_id,
authn=AUTHN)
hinfo = idp.apply_binding(binding, "%s" % resp, destination, relay_state) # --------- @SP -------------
# --------- @SP ------------- xmlstr = get_msg(hinfo, binding)
xmlstr = get_msg(hinfo, binding) aresp = sp.parse_authn_request_response(xmlstr, binding,
{resp.in_response_to: "/"})
aresp = sp.parse_authn_request_response(xmlstr, binding, # == Look for assertion X
{resp.in_response_to: "/"})
# == Look for assertion X asid = aresp.assertion.id
asid = aresp.assertion.id binding, destination = sp.pick_binding("assertion_id_request_service",
entity_id=idp.config.entityid)
binding, destination = sp.pick_binding("assertion_id_request_service", hinfo = sp.apply_binding(binding, asid, destination)
entity_id=idp.config.entityid)
hinfo = sp.apply_binding(binding, asid, destination) # ---------- @IDP ------------
# ---------- @IDP ------------ aid = get_msg(hinfo, binding, response=False)
aid = get_msg(hinfo, binding, response=False) # == construct response
# == construct response resp = idp.create_assertion_id_request_response(aid)
resp = idp.create_assertion_id_request_response(aid) hinfo = idp.apply_binding(binding, "%s" % resp, None, "", response=True)
hinfo = idp.apply_binding(binding, "%s" % resp, None, "", response=True) # ----------- @SP -------------
# ----------- @SP ------------- xmlstr = get_msg(hinfo, binding, response=True)
xmlstr = get_msg(hinfo, binding, response=True) final = sp.parse_assertion_id_request_response(xmlstr, binding)
final = sp.parse_assertion_id_request_response(xmlstr, binding) print final.response
assert isinstance(final.response, Assertion)
print final.response
assert isinstance(final.response, Assertion)

View File

@@ -1,3 +1,4 @@
from contextlib import closing
from saml2.pack import http_redirect_message from saml2.pack import http_redirect_message
from saml2.sigver import verify_redirect_signature from saml2.sigver import verify_redirect_signature
from saml2.sigver import import_rsa_key_from_file from saml2.sigver import import_rsa_key_from_file
@@ -12,37 +13,35 @@ from pathutils import dotname
__author__ = 'rolandh' __author__ = 'rolandh'
idp = Server(config_file=dotname("idp_all_conf"))
conf = SPConfig()
conf.load_file(dotname("servera_conf"))
sp = Saml2Client(conf)
def test(): def test():
srvs = sp.metadata.single_sign_on_service(idp.config.entityid, with closing(Server(config_file=dotname("idp_all_conf"))) as idp:
BINDING_HTTP_REDIRECT) conf = SPConfig()
conf.load_file(dotname("servera_conf"))
sp = Saml2Client(conf)
destination = srvs[0]["location"] srvs = sp.metadata.single_sign_on_service(idp.config.entityid,
req_id, req = sp.create_authn_request(destination, id="id1") BINDING_HTTP_REDIRECT)
try: destination = srvs[0]["location"]
key = sp.sec.key req_id, req = sp.create_authn_request(destination, id="id1")
except AttributeError:
key = import_rsa_key_from_file(sp.sec.key_file)
info = http_redirect_message(req, destination, relay_state="RS", try:
typ="SAMLRequest", sigalg=SIG_RSA_SHA1, key = sp.sec.key
key=key) except AttributeError:
key = import_rsa_key_from_file(sp.sec.key_file)
verified_ok = False info = http_redirect_message(req, destination, relay_state="RS",
typ="SAMLRequest", sigalg=SIG_RSA_SHA1,
key=key)
for param, val in info["headers"]: verified_ok = False
if param == "Location":
_dict = parse_qs(val.split("?")[1])
_certs = idp.metadata.certs(sp.config.entityid, "any", "signing")
for cert in _certs:
if verify_redirect_signature(_dict, cert):
verified_ok = True
assert verified_ok for param, val in info["headers"]:
if param == "Location":
_dict = parse_qs(val.split("?")[1])
_certs = idp.metadata.certs(sp.config.entityid, "any", "signing")
for cert in _certs:
if verify_redirect_signature(_dict, cert):
verified_ok = True
assert verified_ok

View File

@@ -1,3 +1,4 @@
from contextlib import closing
from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_POST
from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.client import Saml2Client from saml2.client import Saml2Client
@@ -19,42 +20,41 @@ def _eq(l1, l2):
def test_flow(): def test_flow():
sp = Saml2Client(config_file="servera_conf") sp = Saml2Client(config_file="servera_conf")
idp1 = Server(config_file="idp_conf_mdb") with closing(Server(config_file="idp_conf_mdb")) as idp1:
idp2 = Server(config_file="idp_conf_mdb") with closing(Server(config_file="idp_conf_mdb")) as idp2:
# clean out database
idp1.ident.mdb.db.drop()
# clean out database # -- dummy request ---
idp1.ident.mdb.db.drop() req_id, orig_req = sp.create_authn_request(idp1.config.entityid)
# -- dummy request --- # == Create an AuthnRequest response
req_id, orig_req = sp.create_authn_request(idp1.config.entityid)
# == Create an AuthnRequest response rinfo = idp1.response_args(orig_req, [BINDING_HTTP_POST])
rinfo = idp1.response_args(orig_req, [BINDING_HTTP_POST]) #name_id = idp1.ident.transient_nameid("id12", rinfo["sp_entity_id"])
resp = idp1.create_authn_response({"eduPersonEntitlement": "Short stop",
"surName": "Jeter",
"givenName": "Derek",
"mail": "derek.jeter@nyy.mlb.com",
"title": "The man"},
userid="jeter",
authn=AUTHN,
**rinfo)
#name_id = idp1.ident.transient_nameid("id12", rinfo["sp_entity_id"]) # What's stored away is the assertion
resp = idp1.create_authn_response({"eduPersonEntitlement": "Short stop", a_info = idp2.session_db.get_assertion(resp.assertion.id)
"surName": "Jeter", # Make sure what I got back from MongoDB is the same as I put in
"givenName": "Derek", assert a_info["assertion"] == resp.assertion
"mail": "derek.jeter@nyy.mlb.com",
"title": "The man"},
userid="jeter",
authn=AUTHN,
**rinfo)
# What's stored away is the assertion # By subject
a_info = idp2.session_db.get_assertion(resp.assertion.id) nid = resp.assertion.subject.name_id
# Make sure what I got back from MongoDB is the same as I put in _assertion = idp2.session_db.get_assertions_by_subject(nid)
assert a_info["assertion"] == resp.assertion assert len(_assertion) == 1
assert _assertion[0] == resp.assertion
# By subject nids = idp2.ident.find_nameid("jeter")
nid = resp.assertion.subject.name_id assert len(nids) == 1
_assertion = idp2.session_db.get_assertions_by_subject(nid)
assert len(_assertion) == 1
assert _assertion[0] == resp.assertion
nids = idp2.ident.find_nameid("jeter")
assert len(nids) == 1
def test_eptid_mongo_db(): def test_eptid_mongo_db():