From 5911db64a0e232dce2d4a8cafa0163df085a1fae Mon Sep 17 00:00:00 2001 From: Roland Hedberg Date: Sat, 17 Apr 2010 22:12:33 +0200 Subject: [PATCH] Fixed so all tests works --- src/saml2/attribute_converter.py | 6 +-- src/saml2/metadata.py | 4 +- tests/server.config | 2 + tests/test_20_assertion.py | 70 ++++++++++++++++++++++++++++ tests/test_21_attribute_converter.py | 36 +++++++++++++- tests/test_30_metadata.py | 39 ++++++++++++---- tests/test_44_authnresp.py | 2 +- tests/test_50_server.py | 7 ++- tests/test_51_client.py | 10 ++-- 9 files changed, 151 insertions(+), 25 deletions(-) diff --git a/src/saml2/attribute_converter.py b/src/saml2/attribute_converter.py index aece875..418e872 100644 --- a/src/saml2/attribute_converter.py +++ b/src/saml2/attribute_converter.py @@ -42,7 +42,7 @@ def ava_fro(acs, statement): if statement == []: return {} - acsdic = dict([(ac.format, ac) for ac in acs]) + acsdic = dict([(ac.name_format, ac) for ac in acs]) acsdic[None] = acsdic[NAME_FORMAT_URI] return dict([acsdic[a.name_format].ava_fro(a) for a in statement]) @@ -62,7 +62,7 @@ def to_local(acs, statement): def from_local(acs, ava, name_format): for aconv in acs: #print ac.format, name_format - if aconv.format == name_format: + if aconv.name_format == name_format: #print "Found a name_form converter" return aconv.to(ava) @@ -77,7 +77,7 @@ def from_local_name(acs, attr, name_format): """ for aconv in acs: #print ac.format, name_format - if aconv.format == name_format: + if aconv.name_format == name_format: #print "Found a name_form converter" return aconv.to_format(attr) return attr diff --git a/src/saml2/metadata.py b/src/saml2/metadata.py index 2b8286f..d930904 100644 --- a/src/saml2/metadata.py +++ b/src/saml2/metadata.py @@ -27,6 +27,7 @@ from saml2 import md, BINDING_HTTP_POST from saml2 import samlp, BINDING_HTTP_REDIRECT, BINDING_SOAP #from saml2.time_util import str_to_time from saml2.sigver import make_temp, cert_from_key_info, verify_signature +from saml2.sigver import pem_format from saml2.time_util import valid from saml2.attribute_converter import ava_fro @@ -107,8 +108,7 @@ class MetaData(object): for key_desc in tssd.key_descriptor: certs.extend(cert_from_key_info(key_desc.key_info)) - certs = [make_temp(pem_format(cert), ".pem", - False) for c in certs] + certs = [make_temp(pem_format(c), ".pem", False) for c in certs] for acs in tssd.attribute_consuming_service: for attr in acs.requested_attribute: diff --git a/tests/server.config b/tests/server.config index f4e0103..6fdd5cc 100644 --- a/tests/server.config +++ b/tests/server.config @@ -4,6 +4,8 @@ "sp":{ "name" : "urn:mace:example.com:saml:roland:sp", "url": "http://lingon.catalogix.se:8087/", + "required_attributes": ["surName", "givenName", "mail"], + "optional_attributes": ["title"], "idp":{ "entity_id": ["urn:mace:example.com:saml:roland:idp"], }, diff --git a/tests/test_20_assertion.py b/tests/test_20_assertion.py index 4816429..b897474 100644 --- a/tests/test_20_assertion.py +++ b/tests/test_20_assertion.py @@ -3,6 +3,7 @@ from saml2.saml import Attribute, NAME_FORMAT_URI, AttributeValue from saml2.assertion import Policy, Assertion, filter_on_attributes from saml2.assertion import filter_attribute_value_assertions from saml2.utils import MissingValue +from saml2 import attribute_converter from py.test import raises @@ -357,6 +358,52 @@ def test_filter_values_req_opt_1(): ava = filter_on_attributes(ava, [r], [o]) assert ava.keys() == ["serialNumber"] assert _eq(ava["serialNumber"], ["12345","54321"]) + +def test_filter_values_req_opt_2(): + r = [Attribute(friendly_name="surName", + name="urn:oid:2.5.4.4", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"), + Attribute(friendly_name="givenName", + name="urn:oid:2.5.4.42", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"), + Attribute(friendly_name="mail", + name="urn:oid:0.9.2342.19200300.100.1.3", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")] + o = [Attribute(friendly_name="title", + name="urn:oid:2.5.4.12", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")] + + + ava = { "surname":["Hedberg"], "givenName":["Roland"], + "eduPersonAffiliation":["staff"],"uid":["rohe0002"]} + + raises(MissingValue, "filter_on_attributes(ava, r, o)") + +# --------------------------------------------------------------------------- + +def test_filter_values_req_opt_4(): + r = [Attribute(friendly_name="surName", + name="urn:oid:2.5.4.4", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"), + Attribute(friendly_name="givenName", + name="urn:oid:2.5.4.42", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")] + o = [Attribute(friendly_name="title", + name="urn:oid:2.5.4.12", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")] + + acs = attribute_converter.ac_factory("attributemaps") + + rava = attribute_converter.ava_fro(acs, r) + oava = attribute_converter.ava_fro(acs, o) + + ava = { "sn":["Hedberg"], "givenName":["Roland"], + "eduPersonAffiliation":["staff"],"uid":["rohe0002"]} + + ava = assertion.filter_on_demands(ava, rava, oava) + print ava + assert _eq(ava.keys(), ['givenName', 'sn']) + assert ava == {'givenName': ['Roland'], 'sn': ['Hedberg']} # --------------------------------------------------------------------------- @@ -455,3 +502,26 @@ def test_filter_ava_3(): assert _eq(ava.keys(), ["mail"]) assert ava["mail"] == ["dj@example.com"] + +def test_filter_ava_4(): + """ Return everything as default policy is used """ + policy = Policy({ + "default": { + "lifetime": {"minutes":15}, + "attribute_restrictions": None # means all I have + }, + "urn:mace:example.com:saml:roland:sp": { + "lifetime": {"minutes": 5}, + "attribute_restrictions":{ + "mail": [".*@example\.com$"], + } + }}) + + ava = { "givenName": ["Derek"], "surName": ["Jeter"], + "mail": ["derek@nyy.mlb.com", "dj@example.com"]} + + # No restrictions apply + ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp", [], []) + + assert _eq(ava.keys(), ['mail', 'givenName', 'surName']) + assert _eq(ava["mail"], ["derek@nyy.mlb.com", "dj@example.com"]) diff --git a/tests/test_21_attribute_converter.py b/tests/test_21_attribute_converter.py index 985b365..17a51e8 100644 --- a/tests/test_21_attribute_converter.py +++ b/tests/test_21_attribute_converter.py @@ -16,7 +16,7 @@ class TestAC(): def test_setup(self): assert len(self.acs) == 2 - assert _eq([a.format for a in self.acs],[BASIC_NF, URI_NF] ) + assert _eq([a.name_format for a in self.acs],[BASIC_NF, URI_NF] ) def test_ava_fro_1(self): @@ -97,4 +97,36 @@ class TestAC(): assert a1["name_format"] == URI_NF else: assert False - \ No newline at end of file + + def test_to_local_name(self): + + attr = [saml.Attribute(friendly_name="surName", + name="urn:oid:2.5.4.4", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"), + saml.Attribute(friendly_name="efternamn", + name="urn:oid:2.5.4.42", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"), + saml.Attribute(friendly_name="titel", + name="urn:oid:2.5.4.12", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")] + + lan = [attribute_converter.to_local_name(self.acs, a) for a in attr] + + assert _eq(lan, ['sn', 'givenName', 'title']) + + def test_ava_fro_1(self): + + attr = [saml.Attribute(friendly_name="surName", + name="urn:oid:2.5.4.4", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"), + saml.Attribute(friendly_name="efternamn", + name="urn:oid:2.5.4.42", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"), + saml.Attribute(friendly_name="titel", + name="urn:oid:2.5.4.12", + name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")] + + result = attribute_converter.ava_fro(self.acs, attr) + + print result + assert result == {'givenName': [], 'sn': [], 'title': []} \ No newline at end of file diff --git a/tests/test_30_metadata.py b/tests/test_30_metadata.py index ab798e1..4345813 100644 --- a/tests/test_30_metadata.py +++ b/tests/test_30_metadata.py @@ -6,6 +6,8 @@ from saml2 import BINDING_SOAP from saml2 import md, saml, samlp from saml2 import time_util from saml2.saml import NAMEID_FORMAT_TRANSIENT +from saml2.attribute_converter import ac_factory + from py.test import raises SWAMI_METADATA = "swamid-kalmar-1.0.xml" @@ -31,8 +33,10 @@ def _read_lines(name): name = "tests/"+name return open(name).readlines() +ATTRCONV = ac_factory("attributemaps") + def test_swami_1(): - md = metadata.MetaData() + md = metadata.MetaData(attrconv=ATTRCONV) md.import_metadata(_read_file(SWAMI_METADATA),"-") print len(md.entity) assert len(md.entity) @@ -49,9 +53,19 @@ def test_swami_1(): ssocerts = md.certs('https://idp.umu.se/saml2/idp/SSOService.php') print ssocerts assert len(ssocerts) == 1 - + print md.wants + assert _eq(md._wants.keys(),['https://connect.sunet.se/shibboleth', + 'https://sp.swamid.se/shibboleth']) + assert _eq(md.wants('https://sp.swamid.se/shibboleth')[1].keys(), + ["eduPersonPrincipalName"]) + assert md.wants('https://sp.swamid.se/shibboleth')[0] == {} + assert _eq(md.wants('https://connect.sunet.se/shibboleth')[1].keys(), + ['mail', 'givenName', 'eduPersonPrincipalName', 'sn', + 'eduPersonScopedAffiliation']) + assert md.wants('https://connect.sunet.se/shibboleth')[0] == {} + def test_incommon_1(): - md = metadata.MetaData() + md = metadata.MetaData(attrconv=ATTRCONV) md.import_metadata(_read_file(INCOMMON_METADATA),"-") print len(md.entity) assert len(md.entity) == 442 @@ -63,10 +77,11 @@ def test_incommon_1(): idp_sso = md.single_sign_on_services('urn:mace:incommon:alaska.edu') assert len(idp_sso) == 1 print idp_sso + print md.wants assert idp_sso == ['https://idp.alaska.edu/idp/profile/SAML2/Redirect/SSO'] - + def test_example(): - md = metadata.MetaData() + md = metadata.MetaData(attrconv=ATTRCONV) md.import_metadata(_read_file(EXAMPLE_METADATA), "-") print len(md.entity) assert len(md.entity) == 1 @@ -81,7 +96,7 @@ def test_example(): assert len(certs[0]) == 2 def test_switch_1(): - md = metadata.MetaData() + md = metadata.MetaData(attrconv=ATTRCONV) md.import_metadata(_read_file(SWITCH_METADATA), "-") print len(md.entity) assert len(md.entity) == 90 @@ -109,7 +124,7 @@ def test_switch_1(): assert len(dual) == 0 def test_sp_metadata(): - md = metadata.MetaData() + md = metadata.MetaData(attrconv=ATTRCONV) md.import_metadata(_read_file(SP_METADATA), "-") print md.entity @@ -127,12 +142,20 @@ def test_sp_metadata(): assert _eq([n.name for n in req],['urn:oid:2.5.4.4', 'urn:oid:2.5.4.42', 'urn:oid:0.9.2342.19200300.100.1.3']) assert _eq([n.friendly_name for n in req],['surName', 'givenName', 'mail']) + print md.wants + + assert md._wants.keys() == ['urn:mace:umu.se:saml:roland:sp'] + assert _eq(md.wants('urn:mace:umu.se:saml:roland:sp')[0].keys(), + ["mail", "givenName", "sn"]) + assert _eq(md.wants('urn:mace:umu.se:saml:roland:sp')[1].keys(), + ["title"]) + KALMAR2_URL = "https://kalmar2.org/simplesaml/module.php/aggregator/?id=kalmarcentral2&set=saml2" KALMAR2_CERT = "kalmar2.pem" def test_import_external_metadata(xmlsec): - md = metadata.MetaData(xmlsec) + md = metadata.MetaData(xmlsec,attrconv=ATTRCONV) md.import_external_metadata(KALMAR2_URL, KALMAR2_CERT) print len(md.entity) diff --git a/tests/test_44_authnresp.py b/tests/test_44_authnresp.py index d1f8ffa..f9e831e 100644 --- a/tests/test_44_authnresp.py +++ b/tests/test_44_authnresp.py @@ -17,7 +17,7 @@ def _eq(l1,l2): class TestAuthnResponse: def setup_class(self): server = Server("idp.config") - name_id = server.id.temporary_nameid() + name_id = server.ident.temporary_nameid() self._resp_ = server.do_response( "http://lingon.catalogix.se:8087/", # consumer_url diff --git a/tests/test_50_server.py b/tests/test_50_server.py index 79ecb69..8f42440 100644 --- a/tests/test_50_server.py +++ b/tests/test_50_server.py @@ -167,7 +167,7 @@ class TestServer1(): assert response["sp_entity_id"] == "urn:mace:example.com:saml:roland:sp" def test_sso_response_with_identity(self): - name_id = self.server.id.temporary_nameid() + name_id = self.server.ident.temporary_nameid() resp = self.server.do_response( "http://localhost:8087/", # consumer_url "12", # in_response_to @@ -276,7 +276,7 @@ class TestServer1(): assert len(astate.attribute) == 3 def test_signed_response(self): - name_id = self.server.id.temporary_nameid() + name_id = self.server.ident.temporary_nameid() signed_resp = self.server.do_response( "http://lingon.catalogix.se:8087/", # consumer_url @@ -316,8 +316,7 @@ class TestServer2(): print aa_policy.__dict__ print self.server.conf["service"] response = self.server.do_aa_response( "http://example.com/sp/", "aaa", - "urn:mace:example.com:sp:1", IDENTITY.copy(), - issuer = self.server.conf["entityid"]) + "urn:mace:example.com:sp:1", IDENTITY.copy()) assert response != None assert response.destination == "http://example.com/sp/" diff --git a/tests/test_51_client.py b/tests/test_51_client.py index 991ca5d..1bfa1cf 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -60,7 +60,7 @@ class TestClient: "E8042FB4-4D5B-48C3-8E14-8EDD852790DD", "http://vo.example.com/sp1", "https://idp.example.com/idp/", - nameformat=saml.NAMEID_FORMAT_PERSISTENT) + nameid_format=saml.NAMEID_FORMAT_PERSISTENT) str = "%s" % req.to_string() print str assert str == REQ1 % req.issue_instant @@ -89,7 +89,7 @@ class TestClient: ("urn:oid:1.2.840.113549.1.9.1", "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"):None, }, - nameformat=saml.NAMEID_FORMAT_PERSISTENT) + nameid_format=saml.NAMEID_FORMAT_PERSISTENT) print req.to_string() assert req.destination == "https://idp.example.com/idp/" @@ -123,7 +123,7 @@ class TestClient: "_e7b68a04488f715cda642fbdd90099f5", "urn:mace:umu.se:saml/rolandsp", "https://aai-demo-idp.switch.ch/idp/shibboleth", - nameformat=saml.NAMEID_FORMAT_TRANSIENT ) + nameid_format=saml.NAMEID_FORMAT_TRANSIENT ) assert isinstance(req, samlp.AttributeQuery) assert req.destination == "https://aai-demo-idp.switch.ch/idp/shibboleth" @@ -140,7 +140,7 @@ class TestClient: "_e7b68a04488f715cda642fbdd90099f5", "urn:mace:umu.se:saml/rolandsp", "https://aai-demo-idp.switch.ch/idp/shibboleth", - format=saml.NAMEID_FORMAT_TRANSIENT) + nameid_format=saml.NAMEID_FORMAT_TRANSIENT) # since no one is answering on the other end assert req == None @@ -231,7 +231,7 @@ class TestClient: self.client.config["xmlsec_binary"], self.client.config["metadata"]) except Exception: # missing certificate - self.client.sc.verify_signature(ar_str, node_name=class_name(ar)) + self.client.sec.verify_signature(ar_str, node_name=class_name(ar)) def test_response(self): ava = { "givenName": ["Derek"], "surname": ["Jeter"],