diff --git a/tests/test_75_mongodb.py b/tests/test_75_mongodb.py index d61fef1..1dd02d8 100644 --- a/tests/test_75_mongodb.py +++ b/tests/test_75_mongodb.py @@ -1,4 +1,5 @@ from contextlib import closing +from pymongo.errors import ConnectionFailure from saml2 import BINDING_HTTP_POST from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.client import Saml2Client @@ -20,62 +21,71 @@ def _eq(l1, l2): def test_flow(): sp = Saml2Client(config_file="servera_conf") - with closing(Server(config_file="idp_conf_mdb")) as idp1: - with closing(Server(config_file="idp_conf_mdb")) as idp2: - # clean out database - idp1.ident.mdb.db.drop() + try: + with closing(Server(config_file="idp_conf_mdb")) as idp1: + with closing(Server(config_file="idp_conf_mdb")) as idp2: + # clean out database + idp1.ident.mdb.db.drop() - # -- dummy request --- - req_id, orig_req = sp.create_authn_request(idp1.config.entityid) + # -- dummy request --- + req_id, orig_req = sp.create_authn_request(idp1.config.entityid) - # == Create an AuthnRequest response + # == Create an AuthnRequest response - rinfo = idp1.response_args(orig_req, [BINDING_HTTP_POST]) + rinfo = idp1.response_args(orig_req, [BINDING_HTTP_POST]) - #name_id = idp1.ident.transient_nameid("id12", rinfo["sp_entity_id"]) - resp = idp1.create_authn_response({"eduPersonEntitlement": "Short stop", - "surName": "Jeter", - "givenName": "Derek", - "mail": "derek.jeter@nyy.mlb.com", - "title": "The man"}, - userid="jeter", - authn=AUTHN, - **rinfo) + #name_id = idp1.ident.transient_nameid("id12", rinfo["sp_entity_id"]) + resp = idp1.create_authn_response( + { + "eduPersonEntitlement": "Short stop", + "surName": "Jeter", + "givenName": "Derek", + "mail": "derek.jeter@nyy.mlb.com", + "title": "The man"}, + userid="jeter", + authn=AUTHN, + **rinfo) - # What's stored away is the assertion - a_info = idp2.session_db.get_assertion(resp.assertion.id) - # Make sure what I got back from MongoDB is the same as I put in - assert a_info["assertion"] == resp.assertion + # What's stored away is the assertion + a_info = idp2.session_db.get_assertion(resp.assertion.id) + # Make sure what I got back from MongoDB is the same as I put in + assert a_info["assertion"] == resp.assertion - # By subject - nid = resp.assertion.subject.name_id - _assertion = idp2.session_db.get_assertions_by_subject(nid) - assert len(_assertion) == 1 - assert _assertion[0] == resp.assertion + # By subject + nid = resp.assertion.subject.name_id + _assertion = idp2.session_db.get_assertions_by_subject(nid) + assert len(_assertion) == 1 + assert _assertion[0] == resp.assertion - nids = idp2.ident.find_nameid("jeter") - assert len(nids) == 1 + nids = idp2.ident.find_nameid("jeter") + assert len(nids) == 1 + except ConnectionFailure: + pass def test_eptid_mongo_db(): - edb = EptidMDB("secret", "idp") - e1 = edb.get("idp_entity_id", "sp_entity_id", "user_id", - "some other data") - print e1 - assert e1.startswith("idp_entity_id!sp_entity_id!") - e2 = edb.get("idp_entity_id", "sp_entity_id", "user_id", - "some other data") - assert e1 == e2 + try: + edb = EptidMDB("secret", "idp") + except ConnectionFailure: + pass + else: + e1 = edb.get("idp_entity_id", "sp_entity_id", "user_id", + "some other data") + print e1 + assert e1.startswith("idp_entity_id!sp_entity_id!") + e2 = edb.get("idp_entity_id", "sp_entity_id", "user_id", + "some other data") + assert e1 == e2 - e3 = edb.get("idp_entity_id", "sp_entity_id", "user_2", - "some other data") - print e3 - assert e1 != e3 + e3 = edb.get("idp_entity_id", "sp_entity_id", "user_2", + "some other data") + print e3 + assert e1 != e3 - e4 = edb.get("idp_entity_id", "sp_entity_id2", "user_id", - "some other data") - assert e4 != e1 - assert e4 != e3 + e4 = edb.get("idp_entity_id", "sp_entity_id2", "user_id", + "some other data") + assert e4 != e1 + assert e4 != e3 diff --git a/tests/test_76_metadata_in_mdb.py b/tests/test_76_metadata_in_mdb.py index 32369e5..e6b6f0b 100644 --- a/tests/test_76_metadata_in_mdb.py +++ b/tests/test_76_metadata_in_mdb.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from pymongo.errors import ConnectionFailure __author__ = 'rolandh' @@ -55,43 +56,46 @@ def test_metadata(): mds.imp({"local": [full_path("swamid-2.0.xml")]}) assert len(mds) == 1 # One source - export_mdstore_to_mongo_db(mds, "metadata", "test") + try: + export_mdstore_to_mongo_db(mds, "metadata", "test") + except ConnectionFailure: + pass + else: + mdmdb = MetadataMDB(ONTS, ATTRCONV, "metadata", "test") + # replace all metadata instances with this one + mds.metadata = {"mongo_db": mdmdb} - mdmdb = MetadataMDB(ONTS, ATTRCONV, "metadata", "test") - # replace all metadata instances with this one - mds.metadata = {"mongo_db": mdmdb} + idps = mds.with_descriptor("idpsso") + assert idps.keys() + idpsso = mds.single_sign_on_service(UMU_IDP) + assert len(idpsso) == 1 + assert destinations(idpsso) == [ + 'https://idp.umu.se/saml2/idp/SSOService.php'] - idps = mds.with_descriptor("idpsso") - assert idps.keys() - idpsso = mds.single_sign_on_service(UMU_IDP) - assert len(idpsso) == 1 - assert destinations(idpsso) == [ - 'https://idp.umu.se/saml2/idp/SSOService.php'] + _name = name(mds[UMU_IDP]) + assert _name == u'Ume\xe5 University' + certs = mds.certs(UMU_IDP, "idpsso", "signing") + assert len(certs) == 1 - _name = name(mds[UMU_IDP]) - assert _name == u'Ume\xe5 University' - certs = mds.certs(UMU_IDP, "idpsso", "signing") - assert len(certs) == 1 + sps = mds.with_descriptor("spsso") + assert len(sps) == 417 - sps = mds.with_descriptor("spsso") - assert len(sps) == 417 + wants = mds.attribute_requirement('https://connect.sunet.se/shibboleth') + assert wants["optional"] == [] + lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["required"]] + assert _eq(lnamn, ['eduPersonPrincipalName', 'mail', 'givenName', 'sn', + 'eduPersonScopedAffiliation', 'eduPersonAffiliation']) - wants = mds.attribute_requirement('https://connect.sunet.se/shibboleth') - assert wants["optional"] == [] - lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["required"]] - assert _eq(lnamn, ['eduPersonPrincipalName', 'mail', 'givenName', 'sn', - 'eduPersonScopedAffiliation', 'eduPersonAffiliation']) - - wants = mds.attribute_requirement( - "https://gidp.geant.net/sp/module.php/saml/sp/metadata.php/default-sp") - # Optional - lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["optional"]] - assert _eq(lnamn, ['displayName', 'commonName', 'schacHomeOrganization', - 'eduPersonAffiliation', 'schacHomeOrganizationType']) - # Required - lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["required"]] - assert _eq(lnamn, ['eduPersonTargetedID', 'mail', - 'eduPersonScopedAffiliation']) + wants = mds.attribute_requirement( + "https://gidp.geant.net/sp/module.php/saml/sp/metadata.php/default-sp") + # Optional + lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["optional"]] + assert _eq(lnamn, ['displayName', 'commonName', 'schacHomeOrganization', + 'eduPersonAffiliation', 'schacHomeOrganizationType']) + # Required + lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["required"]] + assert _eq(lnamn, ['eduPersonTargetedID', 'mail', + 'eduPersonScopedAffiliation']) if __name__ == "__main__": test_metadata()