499 lines
18 KiB
Python
499 lines
18 KiB
Python
import datetime
|
|
import re
|
|
#import os
|
|
|
|
from saml2 import metadata
|
|
from saml2 import make_vals
|
|
from saml2 import make_instance
|
|
from saml2 import BINDING_HTTP_REDIRECT
|
|
from saml2 import BINDING_HTTP_POST
|
|
from saml2 import BINDING_SOAP
|
|
from saml2 import BINDING_HTTP_ARTIFACT
|
|
from saml2 import md, saml, samlp
|
|
from saml2 import time_util
|
|
from saml2 import NAMESPACE as SAML2_NAMESPACE
|
|
from saml2.saml import NAMEID_FORMAT_TRANSIENT, NAME_FORMAT_URI
|
|
from saml2.attribute_converter import ac_factory, to_local_name
|
|
|
|
#from py.test import raises
|
|
|
|
SWAMI_METADATA = "swamid-1.0.xml"
|
|
INCOMMON_METADATA = "InCommon-metadata.xml"
|
|
EXAMPLE_METADATA = "metadata_example.xml"
|
|
SWITCH_METADATA = "metadata.aaitest.xml"
|
|
SP_METADATA = "metasp.xml"
|
|
|
|
def _eq(l1,l2):
|
|
return set(l1) == set(l2)
|
|
|
|
def _read_file(name):
|
|
try:
|
|
return open(name).read()
|
|
except IOError:
|
|
name = "tests/"+name
|
|
return open(name).read()
|
|
|
|
def _read_lines(name):
|
|
try:
|
|
return open(name).readlines()
|
|
except IOError:
|
|
name = "tests/"+name
|
|
return open(name).readlines()
|
|
|
|
def _fix_valid_until(xmlstring):
|
|
new_date = datetime.datetime.now() + datetime.timedelta(days=1)
|
|
new_date = new_date.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
return re.sub(r' validUntil=".*?"', ' validUntil="%s"' % new_date,
|
|
xmlstring)
|
|
|
|
ATTRCONV = ac_factory("attributemaps")
|
|
|
|
def test_swami_1():
|
|
md = metadata.MetaData(attrconv=ATTRCONV)
|
|
md.import_metadata(_read_file(SWAMI_METADATA),"-")
|
|
print len(md.entity)
|
|
assert len(md.entity)
|
|
idps = dict([(id,ent["idpsso"]) for id,ent in md.entity.items() \
|
|
if "idpsso" in ent])
|
|
print idps
|
|
assert idps.keys()
|
|
idpsso = md.single_sign_on_service(
|
|
'https://idp.umu.se/saml2/idp/metadata.php')
|
|
assert md.name('https://idp.umu.se/saml2/idp/metadata.php') == (
|
|
u'Ume\xe5 University (SAML2)')
|
|
assert len(idpsso) == 1
|
|
assert idpsso == ['https://idp.umu.se/saml2/idp/SSOService.php']
|
|
print md._loc_key['https://idp.umu.se/saml2/idp/SSOService.php']
|
|
ssocerts = md.certs('https://idp.umu.se/saml2/idp/SSOService.php', "signing")
|
|
print ssocerts
|
|
assert len(ssocerts) == 1
|
|
sps = dict([(id,ent["spsso"]) for id,ent in md.entity.items()\
|
|
if "spsso" in ent])
|
|
|
|
acs_sp = []
|
|
for nam, desc in sps.items():
|
|
if desc[0].attribute_consuming_service:
|
|
acs_sp.append(nam)
|
|
|
|
#print md.wants('https://www.diva-portal.org/shibboleth')
|
|
wants = md.attribute_requirement('https://connect8.sunet.se/shibboleth')
|
|
lnamn = [to_local_name(md.attrconv, attr) for attr in wants[1]]
|
|
assert _eq(lnamn,
|
|
['mail', 'givenName', 'eduPersonPrincipalName', 'sn',
|
|
'eduPersonScopedAffiliation'])
|
|
|
|
wants = md.attribute_requirement('https://beta.lobber.se/shibboleth')
|
|
assert wants[0] == []
|
|
lnamn = [to_local_name(md.attrconv, attr) for attr in wants[1]]
|
|
assert _eq(lnamn,
|
|
['eduPersonScopedAffiliation', 'eduPersonEntitlement',
|
|
'eduPersonPrincipalName', 'sn', 'mail', 'givenName'])
|
|
|
|
def test_incommon_1():
|
|
md = metadata.MetaData(attrconv=ATTRCONV)
|
|
md.import_metadata(_read_file(INCOMMON_METADATA),"-")
|
|
print len(md.entity)
|
|
assert len(md.entity) == 442
|
|
idps = dict([
|
|
(id,ent["idpsso"]) for id,ent in md.entity.items() if "idpsso" in ent])
|
|
print idps.keys()
|
|
assert len(idps) == 53 # !!!!???? < 10%
|
|
assert md.single_sign_on_service('urn:mace:incommon:uiuc.edu') == []
|
|
idpsso = md.single_sign_on_service('urn:mace:incommon:alaska.edu')
|
|
assert len(idpsso) == 1
|
|
print idpsso
|
|
assert idpsso == ['https://idp.alaska.edu/idp/profile/SAML2/Redirect/SSO']
|
|
|
|
sps = dict([(id,ent["spsso"]) for id,ent in md.entity.items()\
|
|
if "spsso" in ent])
|
|
|
|
acs_sp = []
|
|
for nam, desc in sps.items():
|
|
if desc[0].attribute_consuming_service:
|
|
acs_sp.append(nam)
|
|
|
|
assert len(acs_sp) == 0
|
|
|
|
# Look for attribute authorities
|
|
aas = dict([(id,ent["attribute_authority"]) for id,ent in md.entity.items()\
|
|
if "attribute_authority" in ent])
|
|
|
|
print aas.keys()
|
|
assert len(aas) == 53
|
|
|
|
def test_ext_2():
|
|
md = metadata.MetaData(attrconv=ATTRCONV)
|
|
md.import_metadata(_read_file("extended.xml"),"-")
|
|
# No specific binding defined
|
|
|
|
eid = [id for id,ent in md.entity.items() if "spsso" in ent]
|
|
|
|
endps = md.single_logout_service(eid[0], None)
|
|
assert len(endps) == 4
|
|
assert _eq([b for b, e in endps], [BINDING_SOAP, BINDING_HTTP_REDIRECT,
|
|
BINDING_HTTP_POST, BINDING_HTTP_ARTIFACT])
|
|
|
|
def test_example():
|
|
md = metadata.MetaData(attrconv=ATTRCONV)
|
|
md.import_metadata(_read_file(EXAMPLE_METADATA), "-")
|
|
print len(md.entity)
|
|
assert len(md.entity) == 1
|
|
idps = dict([(id,ent["idpsso"]) for id,ent in md.entity.items() \
|
|
if "idpsso" in ent])
|
|
assert idps.keys() == [
|
|
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php']
|
|
print md._loc_key['http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php']
|
|
certs = md.certs(
|
|
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php',
|
|
"signing")
|
|
assert len(certs) == 1
|
|
assert isinstance(certs[0], tuple)
|
|
assert len(certs[0]) == 2
|
|
|
|
def test_switch_1():
|
|
md = metadata.MetaData(attrconv=ATTRCONV)
|
|
md.import_metadata(_read_file(SWITCH_METADATA), "-")
|
|
print len(md.entity)
|
|
assert len(md.entity) == 90
|
|
idps = dict([(id,ent["idpsso"]) for id,ent in md.entity.items() \
|
|
if "idpsso" in ent])
|
|
print idps.keys()
|
|
idpsso = md.single_sign_on_service(
|
|
'https://aai-demo-idp.switch.ch/idp/shibboleth')
|
|
assert len(idpsso) == 1
|
|
print idpsso
|
|
assert idpsso == [
|
|
'https://aai-demo-idp.switch.ch/idp/profile/SAML2/Redirect/SSO']
|
|
assert len(idps) == 16
|
|
aas = dict([(id,ent["attribute_authority"]) for id,ent in md.entity.items() \
|
|
if "attribute_authority" in ent])
|
|
print aas.keys()
|
|
aads = aas['https://aai-demo-idp.switch.ch/idp/shibboleth']
|
|
assert len(aads) == 1
|
|
aad = aads[0]
|
|
assert len(aad.attribute_service) == 1
|
|
assert len(aad.name_id_format) == 2
|
|
dual = dict([(id,ent) for id,ent in md.entity.items() \
|
|
if "idpsso" in ent and "spsso" in ent])
|
|
print len(dual)
|
|
assert len(dual) == 0
|
|
|
|
def test_sp_metadata():
|
|
md = metadata.MetaData(attrconv=ATTRCONV)
|
|
md.import_metadata(_fix_valid_until(_read_file(SP_METADATA)), "-")
|
|
|
|
print md.entity
|
|
assert len(md.entity) == 1
|
|
assert md.entity.keys() == ['urn:mace:umu.se:saml:roland:sp']
|
|
assert _eq(md.entity['urn:mace:umu.se:saml:roland:sp'].keys(), [
|
|
'valid_until',"organization","spsso",
|
|
'contact_person'])
|
|
print md.entity['urn:mace:umu.se:saml:roland:sp']["spsso"][0].keyswv()
|
|
(req,opt) = md.attribute_requirement('urn:mace:umu.se:saml:roland:sp')
|
|
print req
|
|
assert len(req) == 3
|
|
assert len(opt) == 1
|
|
assert opt[0].name == 'urn:oid:2.5.4.12'
|
|
assert opt[0].friendly_name == 'title'
|
|
assert _eq([n.name for n in req],['urn:oid:2.5.4.4', 'urn:oid:2.5.4.42',
|
|
'urn:oid:0.9.2342.19200300.100.1.3'])
|
|
assert _eq([n.friendly_name for n in req],['surName', 'givenName', 'mail'])
|
|
|
|
KALMAR2_URL = "https://kalmar2.org/simplesaml/module.php/aggregator/?id=kalmarcentral2&set=saml2"
|
|
KALMAR2_CERT = "kalmar2.pem"
|
|
|
|
#def test_import_external_metadata(xmlsec):
|
|
# md = metadata.MetaData(xmlsec,attrconv=ATTRCONV)
|
|
# md.import_external_metadata(KALMAR2_URL, KALMAR2_CERT)
|
|
#
|
|
# print len(md.entity)
|
|
# assert len(md.entity) > 20
|
|
# idps = dict([
|
|
# (id,ent["idpsso"]) for id,ent in md.entity.items() if "idpsso" in ent])
|
|
# print idps.keys()
|
|
# assert len(idps) > 1
|
|
# assert "https://idp.umu.se/saml2/idp/metadata.php" in idps
|
|
|
|
# ------------ Constructing metaval ----------------------------------------
|
|
|
|
def test_construct_organisation_name():
|
|
o = md.Organization()
|
|
make_vals({"text":"Exempel AB", "lang":"se"},
|
|
md.OrganizationName, o, "organization_name")
|
|
print o
|
|
assert str(o) == """<?xml version='1.0' encoding='UTF-8'?>
|
|
<ns0:Organization xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata"><ns0:OrganizationName xml:lang="se">Exempel AB</ns0:OrganizationName></ns0:Organization>"""
|
|
|
|
def test_make_int_value():
|
|
val = make_vals( 1, saml.AttributeValue, part=True)
|
|
assert isinstance(val, saml.AttributeValue)
|
|
assert val.text == "1"
|
|
|
|
def test_make_true_value():
|
|
val = make_vals( True, saml.AttributeValue, part=True )
|
|
assert isinstance(val, saml.AttributeValue)
|
|
assert val.text == "true"
|
|
|
|
def test_make_false_value():
|
|
val = make_vals( False, saml.AttributeValue, part=True )
|
|
assert isinstance(val, saml.AttributeValue)
|
|
assert val.text == "false"
|
|
|
|
NO_VALUE = """<?xml version='1.0' encoding='UTF-8'?>
|
|
<saml:AttributeValue xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion" />"""
|
|
|
|
def test_make_no_value():
|
|
val = make_vals( None, saml.AttributeValue, part=True )
|
|
assert isinstance(val, saml.AttributeValue)
|
|
assert val.text == ""
|
|
print val
|
|
assert val.to_string({'saml': saml.NAMESPACE}) == NO_VALUE
|
|
|
|
def test_make_string():
|
|
val = make_vals( "example", saml.AttributeValue, part=True )
|
|
assert isinstance(val, saml.AttributeValue)
|
|
assert val.text == "example"
|
|
|
|
def test_make_list_of_strings():
|
|
attr = saml.Attribute()
|
|
vals = ["foo", "bar"]
|
|
make_vals(vals, saml.AttributeValue, attr, "attribute_value")
|
|
assert attr.keyswv() == ["attribute_value"]
|
|
print attr.attribute_value
|
|
assert _eq([val.text for val in attr.attribute_value], vals)
|
|
|
|
def test_make_dict():
|
|
vals = ["foo", "bar"]
|
|
attrval = { "attribute_value": vals}
|
|
attr = make_vals(attrval, saml.Attribute, part=True)
|
|
assert attr.keyswv() == ["attribute_value"]
|
|
assert _eq([val.text for val in attr.attribute_value], vals)
|
|
|
|
# ------------ Constructing metadata ----------------------------------------
|
|
|
|
def test_construct_contact():
|
|
c = make_instance(md.ContactPerson, {
|
|
"given_name":"Roland",
|
|
"sur_name": "Hedberg",
|
|
"email_address": "roland@catalogix.se",
|
|
})
|
|
print c
|
|
assert c.given_name.text == "Roland"
|
|
assert c.sur_name.text == "Hedberg"
|
|
assert c.email_address[0].text == "roland@catalogix.se"
|
|
assert _eq(c.keyswv(), ["given_name","sur_name","email_address"])
|
|
|
|
|
|
def test_construct_organisation():
|
|
c = make_instance( md.Organization, {
|
|
"organization_name": ["Example Co.",
|
|
{"text":"Exempel AB", "lang":"se"}],
|
|
"organization_url": "http://www.example.com/"
|
|
})
|
|
|
|
assert _eq(c.keyswv(), ["organization_name","organization_url"])
|
|
assert len(c.organization_name) == 2
|
|
org_names = [on.text for on in c.organization_name]
|
|
assert _eq(org_names,["Exempel AB","Example Co."])
|
|
assert len(c.organization_url) == 1
|
|
|
|
def test_construct_entity_descr_1():
|
|
ed = make_instance(md.EntityDescriptor,
|
|
{"organization": {
|
|
"organization_name":"Catalogix",
|
|
"organization_url": "http://www.catalogix.se/"},
|
|
"entity_id": "urn:mace:catalogix.se:sp1",
|
|
})
|
|
|
|
assert ed.entity_id == "urn:mace:catalogix.se:sp1"
|
|
org = ed.organization
|
|
assert org
|
|
assert _eq(org.keyswv(), ["organization_name","organization_url"])
|
|
assert len(org.organization_name) == 1
|
|
assert org.organization_name[0].text == "Catalogix"
|
|
assert org.organization_url[0].text == "http://www.catalogix.se/"
|
|
|
|
def test_construct_entity_descr_2():
|
|
ed = make_instance(md.EntityDescriptor,
|
|
{"organization": {
|
|
"organization_name":"Catalogix",
|
|
"organization_url": "http://www.catalogix.se/"},
|
|
"entity_id": "urn:mace:catalogix.se:sp1",
|
|
"contact_person": {
|
|
"given_name":"Roland",
|
|
"sur_name": "Hedberg",
|
|
"email_address": "roland@catalogix.se",
|
|
}
|
|
})
|
|
|
|
assert _eq(ed.keyswv(), ["entity_id", "contact_person", "organization"])
|
|
assert ed.entity_id == "urn:mace:catalogix.se:sp1"
|
|
org = ed.organization
|
|
assert org
|
|
assert _eq(org.keyswv(), ["organization_name", "organization_url"])
|
|
assert len(org.organization_name) == 1
|
|
assert org.organization_name[0].text == "Catalogix"
|
|
assert org.organization_url[0].text == "http://www.catalogix.se/"
|
|
assert len(ed.contact_person) == 1
|
|
c = ed.contact_person[0]
|
|
assert c.given_name.text == "Roland"
|
|
assert c.sur_name.text == "Hedberg"
|
|
assert c.email_address[0].text == "roland@catalogix.se"
|
|
assert _eq(c.keyswv(), ["given_name","sur_name","email_address"])
|
|
|
|
def test_construct_key_descriptor():
|
|
cert = "".join(_read_lines("test.pem")[1:-1]).strip()
|
|
spec = {
|
|
"use": "signing",
|
|
"key_info" : {
|
|
"x509_data": {
|
|
"x509_certificate": cert
|
|
}
|
|
}
|
|
}
|
|
kd = make_instance(md.KeyDescriptor, spec)
|
|
assert _eq(kd.keyswv(), ["use", "key_info"])
|
|
assert kd.use == "signing"
|
|
ki = kd.key_info
|
|
assert _eq(ki.keyswv(), ["x509_data"])
|
|
assert len(ki.x509_data) == 1
|
|
data = ki.x509_data[0]
|
|
assert _eq(data.keyswv(), ["x509_certificate"])
|
|
assert data.x509_certificate
|
|
assert len(data.x509_certificate.text.strip()) == len(cert)
|
|
|
|
def test_construct_key_descriptor_with_key_name():
|
|
cert = "".join(_read_lines("test.pem")[1:-1]).strip()
|
|
spec = {
|
|
"use": "signing",
|
|
"key_info" : {
|
|
"key_name": "example.com",
|
|
"x509_data": {
|
|
"x509_certificate": cert
|
|
}
|
|
}
|
|
}
|
|
kd = make_instance(md.KeyDescriptor, spec)
|
|
assert _eq(kd.keyswv(), ["use", "key_info"])
|
|
assert kd.use == "signing"
|
|
ki = kd.key_info
|
|
assert _eq(ki.keyswv(), ["x509_data", "key_name"])
|
|
assert len(ki.key_name) == 1
|
|
assert ki.key_name[0].text.strip() == "example.com"
|
|
assert len(ki.x509_data) == 1
|
|
data = ki.x509_data[0]
|
|
assert _eq(data.keyswv(), ["x509_certificate"])
|
|
assert data.x509_certificate
|
|
assert len(data.x509_certificate.text.strip()) == len(cert)
|
|
|
|
def test_construct_AttributeAuthorityDescriptor():
|
|
aad = make_instance(
|
|
md.AttributeAuthorityDescriptor, {
|
|
"valid_until": time_util.in_a_while(30), # 30 days from now
|
|
"id": "aad.example.com",
|
|
"protocol_support_enumeration": SAML2_NAMESPACE,
|
|
"attribute_service": {
|
|
"binding": BINDING_SOAP,
|
|
"location": "http://example.com:6543/saml2/aad",
|
|
},
|
|
"name_id_format":[
|
|
NAMEID_FORMAT_TRANSIENT,
|
|
],
|
|
"key_descriptor": {
|
|
"use": "signing",
|
|
"key_info" : {
|
|
"key_name": "example.com",
|
|
}
|
|
}
|
|
})
|
|
|
|
print aad
|
|
assert _eq(aad.keyswv(),["valid_until", "id", "attribute_service",
|
|
"name_id_format", "key_descriptor",
|
|
"protocol_support_enumeration"])
|
|
assert time_util.str_to_time(aad.valid_until)
|
|
assert aad.id == "aad.example.com"
|
|
assert aad.protocol_support_enumeration == SAML2_NAMESPACE
|
|
assert len(aad.attribute_service) == 1
|
|
atsr = aad.attribute_service[0]
|
|
assert _eq(atsr.keyswv(),["binding", "location"])
|
|
assert atsr.binding == BINDING_SOAP
|
|
assert atsr.location == "http://example.com:6543/saml2/aad"
|
|
assert len(aad.name_id_format) == 1
|
|
nif = aad.name_id_format[0]
|
|
assert nif.text.strip() == NAMEID_FORMAT_TRANSIENT
|
|
assert len(aad.key_descriptor) == 1
|
|
kdesc = aad.key_descriptor[0]
|
|
assert kdesc.use == "signing"
|
|
assert kdesc.key_info.key_name[0].text.strip() == "example.com"
|
|
|
|
STATUS_RESULT = """<?xml version='1.0' encoding='UTF-8'?>
|
|
<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Responder"><ns0:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:UnknownPrincipal" /></ns0:StatusCode><ns0:StatusMessage>Error resolving principal</ns0:StatusMessage></ns0:Status>"""
|
|
|
|
def test_status():
|
|
input = {
|
|
"status_code": {
|
|
"value": samlp.STATUS_RESPONDER,
|
|
"status_code":
|
|
{
|
|
"value": samlp.STATUS_UNKNOWN_PRINCIPAL,
|
|
},
|
|
},
|
|
"status_message": "Error resolving principal",
|
|
}
|
|
status_text = "%s" % make_instance( samlp.Status, input)
|
|
assert status_text == STATUS_RESULT
|
|
|
|
def test_attributes():
|
|
required = ["surname", "givenname", "edupersonaffiliation"]
|
|
ra = metadata.do_requested_attribute(required, ATTRCONV, "True")
|
|
print ra
|
|
assert ra
|
|
assert len(ra) == 3
|
|
for i in range(3):
|
|
assert isinstance(ra[i], md.RequestedAttribute)
|
|
assert ra[i].name_format == NAME_FORMAT_URI
|
|
assert ra[i].attribute_value == []
|
|
assert ra[i].is_required == "True"
|
|
assert ra[0].friendly_name == "surname"
|
|
assert ra[0].name == 'urn:oid:2.5.4.4'
|
|
|
|
def test_extend():
|
|
md = metadata.MetaData(attrconv=ATTRCONV)
|
|
md.import_metadata(_fix_valid_until(_read_file("extended.xml")), "-")
|
|
|
|
signcerts = md.certs("https://coip-test.sunet.se/shibboleth", "signing")
|
|
assert len(signcerts) == 1
|
|
enccerts = md.certs("https://coip-test.sunet.se/shibboleth", "encryption")
|
|
assert len(enccerts) == 1
|
|
assert signcerts[0] == enccerts[0]
|
|
|
|
def test_ui_info():
|
|
md = metadata.MetaData(attrconv=ATTRCONV)
|
|
md.import_metadata(_fix_valid_until(_read_file("idp_uiinfo.xml")), "-")
|
|
loc = md.single_sign_on_services_with_uiinfo(
|
|
"http://example.com/saml2/idp.xml")
|
|
assert len(loc) == 1
|
|
assert loc[0][0] == "http://example.com/saml2/"
|
|
assert len(loc[0][1]) == 1
|
|
ui_info = loc[0][1][0]
|
|
print ui_info
|
|
assert ui_info.description[0].text == "Exempel bolag"
|
|
|
|
def test_pdp():
|
|
md = metadata.MetaData(attrconv=ATTRCONV)
|
|
md.import_metadata(_fix_valid_until(_read_file("pdp_meta.xml")), "-")
|
|
|
|
assert md
|
|
|
|
pdps = md.pdp_services("http://www.example.org/pysaml2/")
|
|
|
|
assert len(pdps) == 1
|
|
pdp = pdps[0]
|
|
assert len(pdp.authz_service) == 1
|
|
assert pdp.authz_service[0].location == "http://www.example.org/pysaml2/authz"
|
|
assert pdp.authz_service[0].binding == BINDING_SOAP
|
|
endpoints = md.authz_service("http://www.example.org/pysaml2/")
|
|
assert len(endpoints) == 1
|
|
assert endpoints[0] == "http://www.example.org/pysaml2/authz"
|