Tests will not fail if you don't have a MondoDB running.
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
from contextlib import closing
|
||||
from pymongo.errors import ConnectionFailure
|
||||
from saml2 import BINDING_HTTP_POST
|
||||
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
|
||||
from saml2.client import Saml2Client
|
||||
@@ -20,62 +21,71 @@ def _eq(l1, l2):
|
||||
|
||||
def test_flow():
|
||||
sp = Saml2Client(config_file="servera_conf")
|
||||
with closing(Server(config_file="idp_conf_mdb")) as idp1:
|
||||
with closing(Server(config_file="idp_conf_mdb")) as idp2:
|
||||
# clean out database
|
||||
idp1.ident.mdb.db.drop()
|
||||
try:
|
||||
with closing(Server(config_file="idp_conf_mdb")) as idp1:
|
||||
with closing(Server(config_file="idp_conf_mdb")) as idp2:
|
||||
# clean out database
|
||||
idp1.ident.mdb.db.drop()
|
||||
|
||||
# -- dummy request ---
|
||||
req_id, orig_req = sp.create_authn_request(idp1.config.entityid)
|
||||
# -- dummy request ---
|
||||
req_id, orig_req = sp.create_authn_request(idp1.config.entityid)
|
||||
|
||||
# == Create an AuthnRequest response
|
||||
# == Create an AuthnRequest response
|
||||
|
||||
rinfo = idp1.response_args(orig_req, [BINDING_HTTP_POST])
|
||||
rinfo = idp1.response_args(orig_req, [BINDING_HTTP_POST])
|
||||
|
||||
#name_id = idp1.ident.transient_nameid("id12", rinfo["sp_entity_id"])
|
||||
resp = idp1.create_authn_response({"eduPersonEntitlement": "Short stop",
|
||||
"surName": "Jeter",
|
||||
"givenName": "Derek",
|
||||
"mail": "derek.jeter@nyy.mlb.com",
|
||||
"title": "The man"},
|
||||
userid="jeter",
|
||||
authn=AUTHN,
|
||||
**rinfo)
|
||||
#name_id = idp1.ident.transient_nameid("id12", rinfo["sp_entity_id"])
|
||||
resp = idp1.create_authn_response(
|
||||
{
|
||||
"eduPersonEntitlement": "Short stop",
|
||||
"surName": "Jeter",
|
||||
"givenName": "Derek",
|
||||
"mail": "derek.jeter@nyy.mlb.com",
|
||||
"title": "The man"},
|
||||
userid="jeter",
|
||||
authn=AUTHN,
|
||||
**rinfo)
|
||||
|
||||
# What's stored away is the assertion
|
||||
a_info = idp2.session_db.get_assertion(resp.assertion.id)
|
||||
# Make sure what I got back from MongoDB is the same as I put in
|
||||
assert a_info["assertion"] == resp.assertion
|
||||
# What's stored away is the assertion
|
||||
a_info = idp2.session_db.get_assertion(resp.assertion.id)
|
||||
# Make sure what I got back from MongoDB is the same as I put in
|
||||
assert a_info["assertion"] == resp.assertion
|
||||
|
||||
# By subject
|
||||
nid = resp.assertion.subject.name_id
|
||||
_assertion = idp2.session_db.get_assertions_by_subject(nid)
|
||||
assert len(_assertion) == 1
|
||||
assert _assertion[0] == resp.assertion
|
||||
# By subject
|
||||
nid = resp.assertion.subject.name_id
|
||||
_assertion = idp2.session_db.get_assertions_by_subject(nid)
|
||||
assert len(_assertion) == 1
|
||||
assert _assertion[0] == resp.assertion
|
||||
|
||||
nids = idp2.ident.find_nameid("jeter")
|
||||
assert len(nids) == 1
|
||||
nids = idp2.ident.find_nameid("jeter")
|
||||
assert len(nids) == 1
|
||||
except ConnectionFailure:
|
||||
pass
|
||||
|
||||
|
||||
def test_eptid_mongo_db():
|
||||
edb = EptidMDB("secret", "idp")
|
||||
e1 = edb.get("idp_entity_id", "sp_entity_id", "user_id",
|
||||
"some other data")
|
||||
print e1
|
||||
assert e1.startswith("idp_entity_id!sp_entity_id!")
|
||||
e2 = edb.get("idp_entity_id", "sp_entity_id", "user_id",
|
||||
"some other data")
|
||||
assert e1 == e2
|
||||
try:
|
||||
edb = EptidMDB("secret", "idp")
|
||||
except ConnectionFailure:
|
||||
pass
|
||||
else:
|
||||
e1 = edb.get("idp_entity_id", "sp_entity_id", "user_id",
|
||||
"some other data")
|
||||
print e1
|
||||
assert e1.startswith("idp_entity_id!sp_entity_id!")
|
||||
e2 = edb.get("idp_entity_id", "sp_entity_id", "user_id",
|
||||
"some other data")
|
||||
assert e1 == e2
|
||||
|
||||
e3 = edb.get("idp_entity_id", "sp_entity_id", "user_2",
|
||||
"some other data")
|
||||
print e3
|
||||
assert e1 != e3
|
||||
e3 = edb.get("idp_entity_id", "sp_entity_id", "user_2",
|
||||
"some other data")
|
||||
print e3
|
||||
assert e1 != e3
|
||||
|
||||
e4 = edb.get("idp_entity_id", "sp_entity_id2", "user_id",
|
||||
"some other data")
|
||||
assert e4 != e1
|
||||
assert e4 != e3
|
||||
e4 = edb.get("idp_entity_id", "sp_entity_id2", "user_id",
|
||||
"some other data")
|
||||
assert e4 != e1
|
||||
assert e4 != e3
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from pymongo.errors import ConnectionFailure
|
||||
|
||||
__author__ = 'rolandh'
|
||||
|
||||
@@ -55,43 +56,46 @@ def test_metadata():
|
||||
mds.imp({"local": [full_path("swamid-2.0.xml")]})
|
||||
assert len(mds) == 1 # One source
|
||||
|
||||
export_mdstore_to_mongo_db(mds, "metadata", "test")
|
||||
try:
|
||||
export_mdstore_to_mongo_db(mds, "metadata", "test")
|
||||
except ConnectionFailure:
|
||||
pass
|
||||
else:
|
||||
mdmdb = MetadataMDB(ONTS, ATTRCONV, "metadata", "test")
|
||||
# replace all metadata instances with this one
|
||||
mds.metadata = {"mongo_db": mdmdb}
|
||||
|
||||
mdmdb = MetadataMDB(ONTS, ATTRCONV, "metadata", "test")
|
||||
# replace all metadata instances with this one
|
||||
mds.metadata = {"mongo_db": mdmdb}
|
||||
idps = mds.with_descriptor("idpsso")
|
||||
assert idps.keys()
|
||||
idpsso = mds.single_sign_on_service(UMU_IDP)
|
||||
assert len(idpsso) == 1
|
||||
assert destinations(idpsso) == [
|
||||
'https://idp.umu.se/saml2/idp/SSOService.php']
|
||||
|
||||
idps = mds.with_descriptor("idpsso")
|
||||
assert idps.keys()
|
||||
idpsso = mds.single_sign_on_service(UMU_IDP)
|
||||
assert len(idpsso) == 1
|
||||
assert destinations(idpsso) == [
|
||||
'https://idp.umu.se/saml2/idp/SSOService.php']
|
||||
_name = name(mds[UMU_IDP])
|
||||
assert _name == u'Ume\xe5 University'
|
||||
certs = mds.certs(UMU_IDP, "idpsso", "signing")
|
||||
assert len(certs) == 1
|
||||
|
||||
_name = name(mds[UMU_IDP])
|
||||
assert _name == u'Ume\xe5 University'
|
||||
certs = mds.certs(UMU_IDP, "idpsso", "signing")
|
||||
assert len(certs) == 1
|
||||
sps = mds.with_descriptor("spsso")
|
||||
assert len(sps) == 417
|
||||
|
||||
sps = mds.with_descriptor("spsso")
|
||||
assert len(sps) == 417
|
||||
wants = mds.attribute_requirement('https://connect.sunet.se/shibboleth')
|
||||
assert wants["optional"] == []
|
||||
lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["required"]]
|
||||
assert _eq(lnamn, ['eduPersonPrincipalName', 'mail', 'givenName', 'sn',
|
||||
'eduPersonScopedAffiliation', 'eduPersonAffiliation'])
|
||||
|
||||
wants = mds.attribute_requirement('https://connect.sunet.se/shibboleth')
|
||||
assert wants["optional"] == []
|
||||
lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["required"]]
|
||||
assert _eq(lnamn, ['eduPersonPrincipalName', 'mail', 'givenName', 'sn',
|
||||
'eduPersonScopedAffiliation', 'eduPersonAffiliation'])
|
||||
|
||||
wants = mds.attribute_requirement(
|
||||
"https://gidp.geant.net/sp/module.php/saml/sp/metadata.php/default-sp")
|
||||
# Optional
|
||||
lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["optional"]]
|
||||
assert _eq(lnamn, ['displayName', 'commonName', 'schacHomeOrganization',
|
||||
'eduPersonAffiliation', 'schacHomeOrganizationType'])
|
||||
# Required
|
||||
lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["required"]]
|
||||
assert _eq(lnamn, ['eduPersonTargetedID', 'mail',
|
||||
'eduPersonScopedAffiliation'])
|
||||
wants = mds.attribute_requirement(
|
||||
"https://gidp.geant.net/sp/module.php/saml/sp/metadata.php/default-sp")
|
||||
# Optional
|
||||
lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["optional"]]
|
||||
assert _eq(lnamn, ['displayName', 'commonName', 'schacHomeOrganization',
|
||||
'eduPersonAffiliation', 'schacHomeOrganizationType'])
|
||||
# Required
|
||||
lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["required"]]
|
||||
assert _eq(lnamn, ['eduPersonTargetedID', 'mail',
|
||||
'eduPersonScopedAffiliation'])
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_metadata()
|
||||
|
||||
Reference in New Issue
Block a user