Fixed so all tests works

This commit is contained in:
Roland Hedberg
2010-04-17 22:12:33 +02:00
parent 8df6bc3ef1
commit 5911db64a0
9 changed files with 151 additions and 25 deletions

View File

@@ -42,7 +42,7 @@ def ava_fro(acs, statement):
if statement == []:
return {}
acsdic = dict([(ac.format, ac) for ac in acs])
acsdic = dict([(ac.name_format, ac) for ac in acs])
acsdic[None] = acsdic[NAME_FORMAT_URI]
return dict([acsdic[a.name_format].ava_fro(a) for a in statement])
@@ -62,7 +62,7 @@ def to_local(acs, statement):
def from_local(acs, ava, name_format):
for aconv in acs:
#print ac.format, name_format
if aconv.format == name_format:
if aconv.name_format == name_format:
#print "Found a name_form converter"
return aconv.to(ava)
@@ -77,7 +77,7 @@ def from_local_name(acs, attr, name_format):
"""
for aconv in acs:
#print ac.format, name_format
if aconv.format == name_format:
if aconv.name_format == name_format:
#print "Found a name_form converter"
return aconv.to_format(attr)
return attr

View File

@@ -27,6 +27,7 @@ from saml2 import md, BINDING_HTTP_POST
from saml2 import samlp, BINDING_HTTP_REDIRECT, BINDING_SOAP
#from saml2.time_util import str_to_time
from saml2.sigver import make_temp, cert_from_key_info, verify_signature
from saml2.sigver import pem_format
from saml2.time_util import valid
from saml2.attribute_converter import ava_fro
@@ -107,8 +108,7 @@ class MetaData(object):
for key_desc in tssd.key_descriptor:
certs.extend(cert_from_key_info(key_desc.key_info))
certs = [make_temp(pem_format(cert), ".pem",
False) for c in certs]
certs = [make_temp(pem_format(c), ".pem", False) for c in certs]
for acs in tssd.attribute_consuming_service:
for attr in acs.requested_attribute:

View File

@@ -4,6 +4,8 @@
"sp":{
"name" : "urn:mace:example.com:saml:roland:sp",
"url": "http://lingon.catalogix.se:8087/",
"required_attributes": ["surName", "givenName", "mail"],
"optional_attributes": ["title"],
"idp":{
"entity_id": ["urn:mace:example.com:saml:roland:idp"],
},

View File

@@ -3,6 +3,7 @@ from saml2.saml import Attribute, NAME_FORMAT_URI, AttributeValue
from saml2.assertion import Policy, Assertion, filter_on_attributes
from saml2.assertion import filter_attribute_value_assertions
from saml2.utils import MissingValue
from saml2 import attribute_converter
from py.test import raises
@@ -357,6 +358,52 @@ def test_filter_values_req_opt_1():
ava = filter_on_attributes(ava, [r], [o])
assert ava.keys() == ["serialNumber"]
assert _eq(ava["serialNumber"], ["12345","54321"])
def test_filter_values_req_opt_2():
r = [Attribute(friendly_name="surName",
name="urn:oid:2.5.4.4",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"),
Attribute(friendly_name="givenName",
name="urn:oid:2.5.4.42",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"),
Attribute(friendly_name="mail",
name="urn:oid:0.9.2342.19200300.100.1.3",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")]
o = [Attribute(friendly_name="title",
name="urn:oid:2.5.4.12",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")]
ava = { "surname":["Hedberg"], "givenName":["Roland"],
"eduPersonAffiliation":["staff"],"uid":["rohe0002"]}
raises(MissingValue, "filter_on_attributes(ava, r, o)")
# ---------------------------------------------------------------------------
def test_filter_values_req_opt_4():
r = [Attribute(friendly_name="surName",
name="urn:oid:2.5.4.4",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"),
Attribute(friendly_name="givenName",
name="urn:oid:2.5.4.42",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")]
o = [Attribute(friendly_name="title",
name="urn:oid:2.5.4.12",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")]
acs = attribute_converter.ac_factory("attributemaps")
rava = attribute_converter.ava_fro(acs, r)
oava = attribute_converter.ava_fro(acs, o)
ava = { "sn":["Hedberg"], "givenName":["Roland"],
"eduPersonAffiliation":["staff"],"uid":["rohe0002"]}
ava = assertion.filter_on_demands(ava, rava, oava)
print ava
assert _eq(ava.keys(), ['givenName', 'sn'])
assert ava == {'givenName': ['Roland'], 'sn': ['Hedberg']}
# ---------------------------------------------------------------------------
@@ -455,3 +502,26 @@ def test_filter_ava_3():
assert _eq(ava.keys(), ["mail"])
assert ava["mail"] == ["dj@example.com"]
def test_filter_ava_4():
""" Return everything as default policy is used """
policy = Policy({
"default": {
"lifetime": {"minutes":15},
"attribute_restrictions": None # means all I have
},
"urn:mace:example.com:saml:roland:sp": {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"mail": [".*@example\.com$"],
}
}})
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com", "dj@example.com"]}
# No restrictions apply
ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp", [], [])
assert _eq(ava.keys(), ['mail', 'givenName', 'surName'])
assert _eq(ava["mail"], ["derek@nyy.mlb.com", "dj@example.com"])

View File

@@ -16,7 +16,7 @@ class TestAC():
def test_setup(self):
assert len(self.acs) == 2
assert _eq([a.format for a in self.acs],[BASIC_NF, URI_NF] )
assert _eq([a.name_format for a in self.acs],[BASIC_NF, URI_NF] )
def test_ava_fro_1(self):
@@ -97,4 +97,36 @@ class TestAC():
assert a1["name_format"] == URI_NF
else:
assert False
def test_to_local_name(self):
attr = [saml.Attribute(friendly_name="surName",
name="urn:oid:2.5.4.4",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"),
saml.Attribute(friendly_name="efternamn",
name="urn:oid:2.5.4.42",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"),
saml.Attribute(friendly_name="titel",
name="urn:oid:2.5.4.12",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")]
lan = [attribute_converter.to_local_name(self.acs, a) for a in attr]
assert _eq(lan, ['sn', 'givenName', 'title'])
def test_ava_fro_1(self):
attr = [saml.Attribute(friendly_name="surName",
name="urn:oid:2.5.4.4",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"),
saml.Attribute(friendly_name="efternamn",
name="urn:oid:2.5.4.42",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"),
saml.Attribute(friendly_name="titel",
name="urn:oid:2.5.4.12",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")]
result = attribute_converter.ava_fro(self.acs, attr)
print result
assert result == {'givenName': [], 'sn': [], 'title': []}

View File

@@ -6,6 +6,8 @@ from saml2 import BINDING_SOAP
from saml2 import md, saml, samlp
from saml2 import time_util
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.attribute_converter import ac_factory
from py.test import raises
SWAMI_METADATA = "swamid-kalmar-1.0.xml"
@@ -31,8 +33,10 @@ def _read_lines(name):
name = "tests/"+name
return open(name).readlines()
ATTRCONV = ac_factory("attributemaps")
def test_swami_1():
md = metadata.MetaData()
md = metadata.MetaData(attrconv=ATTRCONV)
md.import_metadata(_read_file(SWAMI_METADATA),"-")
print len(md.entity)
assert len(md.entity)
@@ -49,9 +53,19 @@ def test_swami_1():
ssocerts = md.certs('https://idp.umu.se/saml2/idp/SSOService.php')
print ssocerts
assert len(ssocerts) == 1
print md.wants
assert _eq(md._wants.keys(),['https://connect.sunet.se/shibboleth',
'https://sp.swamid.se/shibboleth'])
assert _eq(md.wants('https://sp.swamid.se/shibboleth')[1].keys(),
["eduPersonPrincipalName"])
assert md.wants('https://sp.swamid.se/shibboleth')[0] == {}
assert _eq(md.wants('https://connect.sunet.se/shibboleth')[1].keys(),
['mail', 'givenName', 'eduPersonPrincipalName', 'sn',
'eduPersonScopedAffiliation'])
assert md.wants('https://connect.sunet.se/shibboleth')[0] == {}
def test_incommon_1():
md = metadata.MetaData()
md = metadata.MetaData(attrconv=ATTRCONV)
md.import_metadata(_read_file(INCOMMON_METADATA),"-")
print len(md.entity)
assert len(md.entity) == 442
@@ -63,10 +77,11 @@ def test_incommon_1():
idp_sso = md.single_sign_on_services('urn:mace:incommon:alaska.edu')
assert len(idp_sso) == 1
print idp_sso
print md.wants
assert idp_sso == ['https://idp.alaska.edu/idp/profile/SAML2/Redirect/SSO']
def test_example():
md = metadata.MetaData()
md = metadata.MetaData(attrconv=ATTRCONV)
md.import_metadata(_read_file(EXAMPLE_METADATA), "-")
print len(md.entity)
assert len(md.entity) == 1
@@ -81,7 +96,7 @@ def test_example():
assert len(certs[0]) == 2
def test_switch_1():
md = metadata.MetaData()
md = metadata.MetaData(attrconv=ATTRCONV)
md.import_metadata(_read_file(SWITCH_METADATA), "-")
print len(md.entity)
assert len(md.entity) == 90
@@ -109,7 +124,7 @@ def test_switch_1():
assert len(dual) == 0
def test_sp_metadata():
md = metadata.MetaData()
md = metadata.MetaData(attrconv=ATTRCONV)
md.import_metadata(_read_file(SP_METADATA), "-")
print md.entity
@@ -127,12 +142,20 @@ def test_sp_metadata():
assert _eq([n.name for n in req],['urn:oid:2.5.4.4', 'urn:oid:2.5.4.42',
'urn:oid:0.9.2342.19200300.100.1.3'])
assert _eq([n.friendly_name for n in req],['surName', 'givenName', 'mail'])
print md.wants
assert md._wants.keys() == ['urn:mace:umu.se:saml:roland:sp']
assert _eq(md.wants('urn:mace:umu.se:saml:roland:sp')[0].keys(),
["mail", "givenName", "sn"])
assert _eq(md.wants('urn:mace:umu.se:saml:roland:sp')[1].keys(),
["title"])
KALMAR2_URL = "https://kalmar2.org/simplesaml/module.php/aggregator/?id=kalmarcentral2&set=saml2"
KALMAR2_CERT = "kalmar2.pem"
def test_import_external_metadata(xmlsec):
md = metadata.MetaData(xmlsec)
md = metadata.MetaData(xmlsec,attrconv=ATTRCONV)
md.import_external_metadata(KALMAR2_URL, KALMAR2_CERT)
print len(md.entity)

View File

@@ -17,7 +17,7 @@ def _eq(l1,l2):
class TestAuthnResponse:
def setup_class(self):
server = Server("idp.config")
name_id = server.id.temporary_nameid()
name_id = server.ident.temporary_nameid()
self._resp_ = server.do_response(
"http://lingon.catalogix.se:8087/", # consumer_url

View File

@@ -167,7 +167,7 @@ class TestServer1():
assert response["sp_entity_id"] == "urn:mace:example.com:saml:roland:sp"
def test_sso_response_with_identity(self):
name_id = self.server.id.temporary_nameid()
name_id = self.server.ident.temporary_nameid()
resp = self.server.do_response(
"http://localhost:8087/", # consumer_url
"12", # in_response_to
@@ -276,7 +276,7 @@ class TestServer1():
assert len(astate.attribute) == 3
def test_signed_response(self):
name_id = self.server.id.temporary_nameid()
name_id = self.server.ident.temporary_nameid()
signed_resp = self.server.do_response(
"http://lingon.catalogix.se:8087/", # consumer_url
@@ -316,8 +316,7 @@ class TestServer2():
print aa_policy.__dict__
print self.server.conf["service"]
response = self.server.do_aa_response( "http://example.com/sp/", "aaa",
"urn:mace:example.com:sp:1", IDENTITY.copy(),
issuer = self.server.conf["entityid"])
"urn:mace:example.com:sp:1", IDENTITY.copy())
assert response != None
assert response.destination == "http://example.com/sp/"

View File

@@ -60,7 +60,7 @@ class TestClient:
"E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
"http://vo.example.com/sp1",
"https://idp.example.com/idp/",
nameformat=saml.NAMEID_FORMAT_PERSISTENT)
nameid_format=saml.NAMEID_FORMAT_PERSISTENT)
str = "%s" % req.to_string()
print str
assert str == REQ1 % req.issue_instant
@@ -89,7 +89,7 @@ class TestClient:
("urn:oid:1.2.840.113549.1.9.1",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri"):None,
},
nameformat=saml.NAMEID_FORMAT_PERSISTENT)
nameid_format=saml.NAMEID_FORMAT_PERSISTENT)
print req.to_string()
assert req.destination == "https://idp.example.com/idp/"
@@ -123,7 +123,7 @@ class TestClient:
"_e7b68a04488f715cda642fbdd90099f5",
"urn:mace:umu.se:saml/rolandsp",
"https://aai-demo-idp.switch.ch/idp/shibboleth",
nameformat=saml.NAMEID_FORMAT_TRANSIENT )
nameid_format=saml.NAMEID_FORMAT_TRANSIENT )
assert isinstance(req, samlp.AttributeQuery)
assert req.destination == "https://aai-demo-idp.switch.ch/idp/shibboleth"
@@ -140,7 +140,7 @@ class TestClient:
"_e7b68a04488f715cda642fbdd90099f5",
"urn:mace:umu.se:saml/rolandsp",
"https://aai-demo-idp.switch.ch/idp/shibboleth",
format=saml.NAMEID_FORMAT_TRANSIENT)
nameid_format=saml.NAMEID_FORMAT_TRANSIENT)
# since no one is answering on the other end
assert req == None
@@ -231,7 +231,7 @@ class TestClient:
self.client.config["xmlsec_binary"],
self.client.config["metadata"])
except Exception: # missing certificate
self.client.sc.verify_signature(ar_str, node_name=class_name(ar))
self.client.sec.verify_signature(ar_str, node_name=class_name(ar))
def test_response(self):
ava = { "givenName": ["Derek"], "surname": ["Jeter"],