From e9adbd609ee428dfac685ed582e7dd59e74956f9 Mon Sep 17 00:00:00 2001 From: Roland Hedberg Date: Fri, 27 Nov 2009 11:07:46 +0100 Subject: [PATCH] Got everything working again --- src/s2repoze/plugins/sp.py | 4 +- src/saml2/client.py | 2 +- src/saml2/metadata.py | 20 ++- src/saml2/server.py | 39 ++--- src/saml2/utils.py | 79 +++++++++- tests/test_client.py | 10 ++ tests/test_metadata.py | 21 +++ tests/test_server.py | 285 ++++++++++++------------------------ tests/test_utils.py | 288 ++++++++++++++++++++++++++++++++++++- 9 files changed, 516 insertions(+), 232 deletions(-) diff --git a/src/s2repoze/plugins/sp.py b/src/s2repoze/plugins/sp.py index 449f2d6..306fded 100644 --- a/src/s2repoze/plugins/sp.py +++ b/src/s2repoze/plugins/sp.py @@ -56,7 +56,7 @@ class SAML2Plugin(FormPluginBase): def __init__(self, rememberer_name, saml_conf_file, virtual_organization, cache, debug): - FormPluginBase.__init__(self, rememberer_name) + FormPluginBase.__init__(self) self.rememberer_name = rememberer_name self.debug = debug @@ -134,7 +134,7 @@ class SAML2Plugin(FormPluginBase): self.srv["name"], relay_state=came_from, log=logger, - vo=vorg) + vorg=vorg) self.outstanding_authn[sid] = came_from if self.debug: diff --git a/src/saml2/client.py b/src/saml2/client.py index eb4f1ae..4471f6f 100644 --- a/src/saml2/client.py +++ b/src/saml2/client.py @@ -497,7 +497,7 @@ class Saml2Client: session_id = sid() request = self.create_attribute_query(session_id, subject_id, issuer, destination, attribute, sp_name_qualifier, - name_qualifier, format=format) + name_qualifier, nameformat=format) log and log.info("Request, created: %s" % request) diff --git a/src/saml2/metadata.py b/src/saml2/metadata.py index d5fd3e8..900027a 100644 --- a/src/saml2/metadata.py +++ b/src/saml2/metadata.py @@ -331,25 +331,21 @@ class MetaData(object): return name - def requests(self, entity_id): + def attribute_consumer(self, entity_id): try: ssos = self.entity[entity_id]["sp_sso"] except KeyError: return ([], []) - try: - requested = ssos["attribute_consuming_service"][ - "requested_attribute"] - except KeyError: - return ([], []) - required = [] optional = [] - for attr in requested: - if "is_required" in attr and attr["is_required"] == "true": - required.append(attr) - else: - optional.append(attr) + # What if there is more than one ? Can't be ? + for acs in ssos[0].attribute_consuming_service: + for attr in acs.requested_attribute: + if attr.is_required == "true": + required.append(attr) + else: + optional.append(attr) return (required, optional) \ No newline at end of file diff --git a/src/saml2/server.py b/src/saml2/server.py index 4202c2c..6f106ca 100644 --- a/src/saml2/server.py +++ b/src/saml2/server.py @@ -214,7 +214,6 @@ class Server(object): """ attr_statement = do_attribute_statement(identity) - # start using now and for a hour conds = kd_conditions( not_before=instant(), @@ -307,29 +306,34 @@ class Server(object): return make_instance(samlp.Response, tmp) - def filter_ava(self, ava, sp_entity_id, request=None, role=""): + def filter_ava(self, ava, sp_entity_id, required, optional, role=""): """ What attribute and attribute values returns depends on what the SP has said it wants in the request or in the metadata file and what the IdP/AA wants to release. An assumption is that what the SP asks for overrides whatever is in the metadata. """ + + #print self.conf["service"][role] + #print self.conf["service"][role]["assertions"][sp_entity_id] - if not request: - #any rules for this SP ? + try: + restrictions = self.conf["service"][role][ + "assertions"][sp_entity_id][ + "attribute_restrictions"] + except KeyError: try: - restrictions = self.conf["service"][role][sp_entity_id][ - "attribute_restrictions"] + restrictions = self.conf["service"][role]["assertions"][ + "default"]["attribute_restrictions"] except KeyError: - try: - restrictions = self.conf["service"][role]["default"][ - "attribute_restrictions"] - except KeyError: - restrictions = None - - if restrictions: - ava = filter_attribute_value_assertions(ava, - restrictions) - + restrictions = None + + #print restrictions + if restrictions: + ava = filter_attribute_value_assertions(ava, restrictions) + + if required: + pass + return ava def authn_response(self, identity, in_response_to, destination, spid, @@ -364,7 +368,8 @@ class Server(object): sp_name_qualifier=name_id_policy.sp_name_qualifier) # Do attribute filtering - identity = self.filter_ava( identity, spid, None,"idp") + (required,optional) = self.conf["metadata"].attribute_consumer(spid) + identity = self.filter_ava( identity, spid, required, optional, "idp") resp = self.do_sso_response( destination, # consumer_url diff --git a/src/saml2/utils.py b/src/saml2/utils.py index 8e317ad..fc24a37 100644 --- a/src/saml2/utils.py +++ b/src/saml2/utils.py @@ -23,6 +23,9 @@ class UnsupportedBinding(Exception): class OtherError(Exception): pass +class MissingValue(Exception): + pass + EXCEPTION2STATUS = { VersionMismatch: samlp.STATUS_VERSION_MISMATCH, UnknownPrincipal: samlp.STATUS_UNKNOWN_PRINCIPAL, @@ -72,7 +75,7 @@ def make_vals(val, klass, klass_inst=None, prop=None, part=False): :return: Value class instance """ cinst = None - #print "_make_val: %s %s (%s)" % (prop,val,klass) + #print "_make_val: %s %s (%s) [%s]" % (prop,val,klass,part) if isinstance(val, bool): cinst = klass(text="%s" % val) elif isinstance(val, int): @@ -92,9 +95,9 @@ def make_vals(val, klass, klass_inst=None, prop=None, part=False): if part: return cinst else: - if cis: + if cinst: cis = [cinst] - setattr(klass_inst, prop, cis) + setattr(klass_inst, prop, cis) def make_instance(klass, spec): """ @@ -104,8 +107,11 @@ def make_instance(klass, spec): :param spec: Information to be placed in the instance :return: The instance """ + #print "----- %s -----" % klass + #print "..... %s ....." % spec klass_inst = klass() for prop in klass.c_attributes.values(): + #print "# %s" % (prop) if prop in spec: if isinstance(spec[prop], bool): setattr(klass_inst, prop,"%s" % spec[prop]) @@ -117,12 +123,15 @@ def make_instance(klass, spec): setattr(klass_inst, "text", spec["text"]) for prop, klass in klass.c_children.values(): + #print "## %s, %s" % (prop, klass) if prop in spec: + #print "%s" % spec[prop] if isinstance(klass, list): # means there can be a list of values make_vals(spec[prop], klass[0], klass_inst, prop) else: cis = make_vals(spec[prop], klass, klass_inst, prop, True) setattr(klass_inst, prop, cis) + #+print ">>> %s <<<" % klass_inst return klass_inst def parse_attribute_map(filenames): @@ -186,6 +195,68 @@ def identity_attribute(form, attribute, forward_map=None): # default is name return attribute.name +def filter_values(vals, attributes, required=True): + reqval = [] + for rval in attributes: + for val in vals: + if rval.text == val: + reqval.append(val) + break + + if required: + if len(reqval) == len(attributes): + return reqval + else: + raise MissingValue("Required attribute value missing") + else: + return reqval + +def filter_required(ava, required): + """ + :param required: list of RequestedAttribute instances + """ + res = {} + for attr in required: + if attr.name in ava: + if required.attribute_value: + res[attr.name] = filter_values(ava[attr.name], + required.attribute_value) + else: + res[attr.name] = ava[attr.name] + elif attr.friendly_name in ava: + if attr.attribute_value: + res[attr.friendly_name] = filter_values( + ava[attr.friendly_name], + attr.attribute_value) + else: + res[attr.friendly_name] = ava[attr.friendly_name] + else: + raise MissingValue("Required attribute missing") + + return res + +def filter_optional(ava, optional): + """ + :param optional: list of RequestedAttribute instances + """ + res = {} + for attr in optional: + if attr.name in ava: + if optional.attribute_value: + res[attr.name] = filter_values(ava[attr.name], + optional.attribute_value, False) + else: + res[attr.name] = ava[attr.name] + elif attr.friendly_name in ava: + if attr.attribute_value: + res[attr.friendly_name] = filter_values( + ava[attr.friendly_name], + attr.attribute_value, False) + else: + res[attr.friendly_name] = ava[attr.friendly_name] + + return res + #---------------------------------------------------------------------------- def properties(klass): @@ -285,7 +356,7 @@ def do_attributes(identity): for key, val in identity.items(): dic = {} if isinstance(val, basestring): - attrval = kd_attribute_value(val) + attrval = [kd_attribute_value(val)] elif isinstance(val, list): attrval = [kd_attribute_value(v) for v in val] elif val == None: diff --git a/tests/test_client.py b/tests/test_client.py index 7184156..dff00d3 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -180,6 +180,16 @@ class TestClient: assert nameid.format == saml.NAMEID_FORMAT_TRANSIENT assert nameid.text == "_e7b68a04488f715cda642fbdd90099f5" + def test_attribute_query(self): + req = self.client.attribute_query( + "_e7b68a04488f715cda642fbdd90099f5", + "urn:mace:umu.se:saml/rolandsp", + "https://aai-demo-idp.switch.ch/idp/shibboleth", + format=saml.NAMEID_FORMAT_TRANSIENT) + + # since no one is answering on the other end + assert req == None + def test_idp_entry(self): idp_entry = utils.make_instance( samlp.IDPEntry, self.client.idp_entry(name="Umeå Universitet", diff --git a/tests/test_metadata.py b/tests/test_metadata.py index ce5b433..a0544f7 100644 --- a/tests/test_metadata.py +++ b/tests/test_metadata.py @@ -10,6 +10,7 @@ SWAMI_METADATA = "tests/urn-mace-swami.se-swamid-test-1.0-metadata.xml" INCOMMON_METADATA = "tests/InCommon-metadata.xml" EXAMPLE_METADATA = "tests/metadata_example.xml" SWITCH_METADATA = "tests/metadata.aaitest.xml" +SP_METADATA = "tests/metasp.xml" def _eq(l1,l2): return set(l1) == set(l2) @@ -92,6 +93,26 @@ def test_switch_1(): print len(dual) assert len(dual) == 0 +def test_sp_metadata(): + md = metadata.MetaData() + md.import_metadata(open(SP_METADATA).read()) + + print md.entity + assert len(md.entity) == 1 + assert md.entity.keys() == ['urn:mace:umu.se:saml:roland:sp'] + assert md.entity['urn:mace:umu.se:saml:roland:sp'].keys() == [ + "organization","sp_sso"] + print md.entity['urn:mace:umu.se:saml:roland:sp']["sp_sso"][0].keyswv() + (req,opt) = md.attribute_consumer('urn:mace:umu.se:saml:roland:sp') + print req + assert len(req) == 3 + assert len(opt) == 1 + assert opt[0].name == 'urn:oid:2.5.4.12' + assert opt[0].friendly_name == 'title' + assert _eq([n.name for n in req],['urn:oid:2.5.4.4', 'urn:oid:2.5.4.42', + 'urn:oid:0.9.2342.19200300.100.1.3']) + assert _eq([n.friendly_name for n in req],['surName', 'givenName', 'mail']) + # ------------ Constructing metaval ---------------------------------------- def test_construct_organisation_name(): diff --git a/tests/test_server.py b/tests/test_server.py index 6f10d73..1254116 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -4,207 +4,18 @@ from saml2.server import Server from saml2 import server from saml2 import samlp, saml, client, utils -from saml2.utils import make_instance, OtherError, UnknownPricipal +from saml2.utils import make_instance, OtherError from saml2.utils import do_attribute_statement from py.test import raises import shelve - -SUCCESS_STATUS = """ -""" - -ERROR_STATUS = """ -Error resolving principal""" +import re def _eq(l1,l2): return set(l1) == set(l2) -def test_status_success(): - stat = utils.kd_status( - status_code=utils.kd_status_code( - value=samlp.STATUS_SUCCESS)) - status = make_instance( samlp.Status, stat) - status_text = "%s" % status - assert status_text == SUCCESS_STATUS - assert status.status_code.value == samlp.STATUS_SUCCESS - -def test_success_status(): - stat = utils.kd_success_status() - status = make_instance(samlp.Status, stat) - status_text = "%s" % status - assert status_text == SUCCESS_STATUS - assert status.status_code.value == samlp.STATUS_SUCCESS - -def test_error_status(): - stat = utils.kd_status( - status_message=utils.kd_status_message( - "Error resolving principal"), - status_code=utils.kd_status_code( - value=samlp.STATUS_RESPONDER, - status_code=utils.kd_status_code( - value=samlp.STATUS_UNKNOWN_PRINCIPAL))) - - status_text = "%s" % make_instance( samlp.Status, stat ) - print status_text - assert status_text == ERROR_STATUS - -def test_status_from_exception(): - e = UnknownPricipal("Error resolving principal") - stat = utils.kd_status_from_exception(e) - status_text = "%s" % make_instance( samlp.Status, stat ) - - assert status_text == ERROR_STATUS - -def test_attribute_statement(): - astat = do_attribute_statement({"surName":"Jeter", - "givenName":"Derek"}) - statement = make_instance(saml.AttributeStatement,astat) - assert statement.keyswv() == ["attribute"] - assert len(statement.attribute) == 2 - attr0 = statement.attribute[0] - assert _eq(attr0.keyswv(), ["name","attribute_value"]) - assert len(attr0.attribute_value) == 1 - attr1 = statement.attribute[1] - assert _eq(attr1.keyswv(), ["name","attribute_value"]) - assert len(attr1.attribute_value) == 1 - if attr0.name == "givenName": - assert attr0.attribute_value[0].text == "Derek" - assert attr1.name == "surName" - assert attr1.attribute_value[0].text == "Jeter" - else: - assert attr0.name == "surName" - assert attr0.attribute_value[0].text == "Jeter" - assert attr1.name == "givenName" - assert attr1.attribute_value[0].text == "Derek" - -def test_audience(): - aud_restr = make_instance( saml.AudienceRestriction, - utils.kd_audience_restriction( - audience=utils.kd_audience("urn:foo:bar"))) - - assert aud_restr.keyswv() == ["audience"] - assert aud_restr.audience.text == "urn:foo:bar" - -def test_conditions(): - conds_dict = utils.kd_conditions( - not_before="2009-10-30T07:58:10.852Z", - not_on_or_after="2009-10-30T08:03:10.852Z", - audience_restriction=utils.kd_audience_restriction( - audience=utils.kd_audience("urn:foo:bar"))) - - conditions = make_instance(saml.Conditions, conds_dict) - assert _eq(conditions.keyswv(), ["not_before", "not_on_or_after", - "audience_restriction"]) - assert conditions.not_before == "2009-10-30T07:58:10.852Z" - assert conditions.not_on_or_after == "2009-10-30T08:03:10.852Z" - assert conditions.audience_restriction[0].audience.text == "urn:foo:bar" - -def test_value_1(): - #FriendlyName="givenName" Name="urn:oid:2.5.4.42" - # NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri" - adict = utils.kd_attribute(name="urn:oid:2.5.4.42", - name_format=saml.NAME_FORMAT_URI) - attribute = make_instance(saml.Attribute, adict) - assert _eq(attribute.keyswv(),["name","name_format"]) - assert attribute.name == "urn:oid:2.5.4.42" - assert attribute.name_format == saml.NAME_FORMAT_URI - -def test_value_2(): - adict = utils.kd_attribute(name="urn:oid:2.5.4.42", - name_format=saml.NAME_FORMAT_URI, - friendly_name="givenName") - attribute = make_instance(saml.Attribute, adict) - assert _eq(attribute.keyswv(),["name","name_format","friendly_name"]) - assert attribute.name == "urn:oid:2.5.4.42" - assert attribute.name_format == saml.NAME_FORMAT_URI - assert attribute.friendly_name == "givenName" - -def test_value_3(): - adict = utils.kd_attribute(attribute_value="Derek", - name="urn:oid:2.5.4.42", - name_format=saml.NAME_FORMAT_URI, - friendly_name="givenName") - attribute = make_instance(saml.Attribute, adict) - assert _eq(attribute.keyswv(),["name", "name_format", - "friendly_name", "attribute_value"]) - assert attribute.name == "urn:oid:2.5.4.42" - assert attribute.name_format == saml.NAME_FORMAT_URI - assert attribute.friendly_name == "givenName" - assert len(attribute.attribute_value) == 1 - assert attribute.attribute_value[0].text == "Derek" - -def test_value_4(): - adict = utils.kd_attribute(attribute_value="Derek", - friendly_name="givenName") - attribute = make_instance(saml.Attribute, adict) - assert _eq(attribute.keyswv(),["friendly_name", "attribute_value"]) - assert attribute.friendly_name == "givenName" - assert len(attribute.attribute_value) == 1 - assert attribute.attribute_value[0].text == "Derek" - -def test_do_attribute_statement_0(): - astat = do_attribute_statement({"vo_attr":"foobar"}) - statement = make_instance(saml.AttributeStatement,astat) - assert statement.keyswv() == ["attribute"] - assert len(statement.attribute) == 1 - attr0 = statement.attribute[0] - assert _eq(attr0.keyswv(), ["name","attribute_value"]) - assert attr0.name == "vo_attr" - assert len(attr0.attribute_value) == 1 - assert attr0.attribute_value[0].text == "foobar" - -def test_do_attribute_statement(): - astat = do_attribute_statement({"surName":"Jeter", - "givenName":["Derek","Sanderson"]}) - statement = make_instance(saml.AttributeStatement,astat) - assert statement.keyswv() == ["attribute"] - assert len(statement.attribute) == 2 - attr0 = statement.attribute[0] - assert _eq(attr0.keyswv(), ["name","attribute_value"]) - attr1 = statement.attribute[1] - assert _eq(attr1.keyswv(), ["name","attribute_value"]) - if attr0.name == "givenName": - assert len(attr0.attribute_value) == 2 - assert _eq([av.text for av in attr0.attribute_value], - ["Derek","Sanderson"]) - assert attr1.name == "surName" - assert attr1.attribute_value[0].text == "Jeter" - assert len(attr1.attribute_value) == 1 - else: - assert attr0.name == "surName" - assert attr0.attribute_value[0].text == "Jeter" - assert len(attr0.attribute_value) == 1 - assert attr1.name == "givenName" - assert len(attr1.attribute_value) == 2 - assert _eq([av.text for av in attr1.attribute_value], - ["Derek","Sanderson"]) - -def test_do_attribute_statement_multi(): - astat = do_attribute_statement( - {( "urn:oid:1.3.6.1.4.1.5923.1.1.1.7", - "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", - "eduPersonEntitlement"):"Jeter"}) - statement = make_instance(saml.AttributeStatement,astat) - assert statement.keyswv() == ["attribute"] - assert len(statement.attribute) - assert _eq(statement.attribute[0].keyswv(), - ["name","name_format","friendly_name","attribute_value"]) - attribute = statement.attribute[0] - assert attribute.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7" - assert attribute.name_format == ( - "urn:oasis:names:tc:SAML:2.0:attrname-format:uri") - assert attribute.friendly_name == "eduPersonEntitlement" - -def test_subject(): - adict = utils.kd_subject("_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT) - subject = make_instance(saml.Subject, adict) - assert _eq(subject.keyswv(),["text", "name_id"]) - assert subject.text == "_aaa" - assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT - - class TestServer(): def setup_class(self): - self.server = Server("tests/server.config") + self.server = Server("tests/idp.config") def test_issuer(self): issuer = make_instance( saml.Issuer, self.server.issuer()) @@ -232,7 +43,7 @@ class TestServer(): assert _eq(assertion.keyswv(),['attribute_statement', 'issuer', 'id', 'subject', 'issue_instant', 'version']) assert assertion.version == "2.0" - assert assertion.issuer.text == "urn:mace:example.com:saml:roland:sp" + assert assertion.issuer.text == "urn:mace:example.com:saml:roland:idp" # assert len(assertion.attribute_statement) == 1 attribute_statement = assertion.attribute_statement[0] @@ -279,7 +90,7 @@ class TestServer(): 'in_response_to', 'issue_instant', 'version', 'issuer', 'id']) assert response.version == "2.0" - assert response.issuer.text == "urn:mace:example.com:saml:roland:sp" + assert response.issuer.text == "urn:mace:example.com:saml:roland:idp" assert response.destination == "https:#www.example.com" assert response.in_response_to == "_012345" # @@ -392,3 +203,89 @@ class TestServer(): print pid1, pid2 assert pid1 == pid2 + + def test_filter_ava_0(self): + ava = { "givenName": ["Derek"], "surName": ["Jeter"], + "mail": ["derek@nyy.mlb.com"]} + + # No restrictions apply + ava = self.server.filter_ava(ava, + "urn:mace:example.com:saml:roland:sp", + [], [], "idp") + + assert _eq(ava.keys(), ["givenName", "surName", "mail"]) + assert ava["givenName"] == ["Derek"] + assert ava["surName"] == ["Jeter"] + assert ava["mail"] == ["derek@nyy.mlb.com"] + + + def test_filter_ava_1(self): + """ No mail address returned """ + self.server.conf["service"]["idp"]["assertions"][ + "urn:mace:example.com:saml:roland:sp"] = { + "lifetime": {"minutes": 5}, + "attribute_restrictions":{ + "givenName": None, + "surName": None, + } + } + + print self.server.conf["service"]["idp"]["assertions"] + + ava = { "givenName": ["Derek"], "surName": ["Jeter"], + "mail": ["derek@nyy.mlb.com"]} + + # No restrictions apply + ava = self.server.filter_ava(ava, + "urn:mace:example.com:saml:roland:sp", + [], [], "idp") + + assert _eq(ava.keys(), ["givenName", "surName"]) + assert ava["givenName"] == ["Derek"] + assert ava["surName"] == ["Jeter"] + + def test_filter_ava_2(self): + """ Only mail returned """ + self.server.conf["service"]["idp"]["assertions"][ + "urn:mace:example.com:saml:roland:sp"] = { + "lifetime": {"minutes": 5}, + "attribute_restrictions":{ + "mail": None, + } + } + + print self.server.conf["service"]["idp"]["assertions"] + + ava = { "givenName": ["Derek"], "surName": ["Jeter"], + "mail": ["derek@nyy.mlb.com"]} + + # No restrictions apply + ava = self.server.filter_ava(ava, + "urn:mace:example.com:saml:roland:sp", + [], [], "idp") + + assert _eq(ava.keys(), ["mail"]) + assert ava["mail"] == ["derek@nyy.mlb.com"] + + def test_filter_ava_3(self): + """ Only example.com mail addresses returned """ + self.server.conf["service"]["idp"]["assertions"][ + "urn:mace:example.com:saml:roland:sp"] = { + "lifetime": {"minutes": 5}, + "attribute_restrictions":{ + "mail": [re.compile(".*@example\.com$")], + } + } + + print self.server.conf["service"]["idp"]["assertions"] + + ava = { "givenName": ["Derek"], "surName": ["Jeter"], + "mail": ["derek@nyy.mlb.com", "dj@example.com"]} + + # No restrictions apply + ava = self.server.filter_ava(ava, + "urn:mace:example.com:saml:roland:sp", + [], [], "idp") + + assert _eq(ava.keys(), ["mail"]) + assert ava["mail"] == ["dj@example.com"] diff --git a/tests/test_utils.py b/tests/test_utils.py index 3cdf62f..b71554a 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,17 +1,222 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from saml2 import utils +from saml2 import utils, saml, samlp +from saml2.utils import do_attribute_statement, make_instance import zlib import base64 import gzip from saml2.sigver import make_temp from saml2.config import do_assertions -from saml2.saml import Attribute, NAME_FORMAT_URI +from saml2.saml import Attribute, NAME_FORMAT_URI, AttributeValue +from py.test import raises + +SUCCESS_STATUS = """ +""" + +ERROR_STATUS = """ +Error resolving principal""" + def _eq(l1,l2): return set(l1) == set(l2) +def test_status_success(): + stat = utils.kd_status( + status_code=utils.kd_status_code( + value=samlp.STATUS_SUCCESS)) + status = make_instance( samlp.Status, stat) + status_text = "%s" % status + assert status_text == SUCCESS_STATUS + assert status.status_code.value == samlp.STATUS_SUCCESS + +def test_success_status(): + stat = utils.kd_success_status() + status = make_instance(samlp.Status, stat) + status_text = "%s" % status + assert status_text == SUCCESS_STATUS + assert status.status_code.value == samlp.STATUS_SUCCESS + +def test_error_status(): + stat = utils.kd_status( + status_message=utils.kd_status_message( + "Error resolving principal"), + status_code=utils.kd_status_code( + value=samlp.STATUS_RESPONDER, + status_code=utils.kd_status_code( + value=samlp.STATUS_UNKNOWN_PRINCIPAL))) + + status_text = "%s" % make_instance( samlp.Status, stat ) + print status_text + assert status_text == ERROR_STATUS + +def test_status_from_exception(): + e = utils.UnknownPrincipal("Error resolving principal") + stat = utils.kd_status_from_exception(e) + status_text = "%s" % make_instance( samlp.Status, stat ) + + assert status_text == ERROR_STATUS + +def test_attribute(): + attr = utils.do_attributes({"surName":"Jeter"}) + + assert len(attr) == 1 + inst = make_instance(saml.Attribute, attr[0]) + print inst + assert inst.name == "surName" + assert len(inst.attribute_value) == 1 + assert inst.attribute_value[0].text == "Jeter" + +def test_attribute_statement(): + astat = do_attribute_statement({"surName":"Jeter", + "givenName":"Derek"}) + print astat + statement = make_instance(saml.AttributeStatement,astat) + print statement + assert statement.keyswv() == ["attribute"] + assert len(statement.attribute) == 2 + attr0 = statement.attribute[0] + assert _eq(attr0.keyswv(), ["name","attribute_value"]) + assert len(attr0.attribute_value) == 1 + attr1 = statement.attribute[1] + assert _eq(attr1.keyswv(), ["name","attribute_value"]) + assert len(attr1.attribute_value) == 1 + if attr0.name == "givenName": + assert attr0.attribute_value[0].text == "Derek" + assert attr1.name == "surName" + assert attr1.attribute_value[0].text == "Jeter" + else: + assert attr0.name == "surName" + assert attr0.attribute_value[0].text == "Jeter" + assert attr1.name == "givenName" + assert attr1.attribute_value[0].text == "Derek" + +def test_audience(): + aud_restr = make_instance( saml.AudienceRestriction, + utils.kd_audience_restriction( + audience=utils.kd_audience("urn:foo:bar"))) + + assert aud_restr.keyswv() == ["audience"] + assert aud_restr.audience.text == "urn:foo:bar" + +def test_conditions(): + conds_dict = utils.kd_conditions( + not_before="2009-10-30T07:58:10.852Z", + not_on_or_after="2009-10-30T08:03:10.852Z", + audience_restriction=utils.kd_audience_restriction( + audience=utils.kd_audience("urn:foo:bar"))) + + conditions = make_instance(saml.Conditions, conds_dict) + assert _eq(conditions.keyswv(), ["not_before", "not_on_or_after", + "audience_restriction"]) + assert conditions.not_before == "2009-10-30T07:58:10.852Z" + assert conditions.not_on_or_after == "2009-10-30T08:03:10.852Z" + assert conditions.audience_restriction[0].audience.text == "urn:foo:bar" + +def test_value_1(): + #FriendlyName="givenName" Name="urn:oid:2.5.4.42" + # NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri" + adict = utils.kd_attribute(name="urn:oid:2.5.4.42", + name_format=NAME_FORMAT_URI) + attribute = make_instance(saml.Attribute, adict) + assert _eq(attribute.keyswv(),["name","name_format"]) + assert attribute.name == "urn:oid:2.5.4.42" + assert attribute.name_format == saml.NAME_FORMAT_URI + +def test_value_2(): + adict = utils.kd_attribute(name="urn:oid:2.5.4.42", + name_format=NAME_FORMAT_URI, + friendly_name="givenName") + attribute = make_instance(saml.Attribute, adict) + assert _eq(attribute.keyswv(),["name","name_format","friendly_name"]) + assert attribute.name == "urn:oid:2.5.4.42" + assert attribute.name_format == NAME_FORMAT_URI + assert attribute.friendly_name == "givenName" + +def test_value_3(): + adict = utils.kd_attribute(attribute_value="Derek", + name="urn:oid:2.5.4.42", + name_format=NAME_FORMAT_URI, + friendly_name="givenName") + attribute = make_instance(saml.Attribute, adict) + assert _eq(attribute.keyswv(),["name", "name_format", + "friendly_name", "attribute_value"]) + assert attribute.name == "urn:oid:2.5.4.42" + assert attribute.name_format == NAME_FORMAT_URI + assert attribute.friendly_name == "givenName" + assert len(attribute.attribute_value) == 1 + assert attribute.attribute_value[0].text == "Derek" + +def test_value_4(): + adict = utils.kd_attribute(attribute_value="Derek", + friendly_name="givenName") + attribute = make_instance(saml.Attribute, adict) + assert _eq(attribute.keyswv(),["friendly_name", "attribute_value"]) + assert attribute.friendly_name == "givenName" + assert len(attribute.attribute_value) == 1 + assert attribute.attribute_value[0].text == "Derek" + +def test_do_attribute_statement_0(): + astat = do_attribute_statement({"vo_attr":"foobar"}) + statement = make_instance(saml.AttributeStatement,astat) + assert statement.keyswv() == ["attribute"] + assert len(statement.attribute) == 1 + attr0 = statement.attribute[0] + assert _eq(attr0.keyswv(), ["name","attribute_value"]) + assert attr0.name == "vo_attr" + assert len(attr0.attribute_value) == 1 + assert attr0.attribute_value[0].text == "foobar" + +def test_do_attribute_statement(): + astat = do_attribute_statement({"surName":"Jeter", + "givenName":["Derek","Sanderson"]}) + statement = make_instance(saml.AttributeStatement,astat) + assert statement.keyswv() == ["attribute"] + assert len(statement.attribute) == 2 + attr0 = statement.attribute[0] + assert _eq(attr0.keyswv(), ["name","attribute_value"]) + attr1 = statement.attribute[1] + assert _eq(attr1.keyswv(), ["name","attribute_value"]) + if attr0.name == "givenName": + assert len(attr0.attribute_value) == 2 + assert _eq([av.text for av in attr0.attribute_value], + ["Derek","Sanderson"]) + assert attr1.name == "surName" + assert attr1.attribute_value[0].text == "Jeter" + assert len(attr1.attribute_value) == 1 + else: + assert attr0.name == "surName" + assert attr0.attribute_value[0].text == "Jeter" + assert len(attr0.attribute_value) == 1 + assert attr1.name == "givenName" + assert len(attr1.attribute_value) == 2 + assert _eq([av.text for av in attr1.attribute_value], + ["Derek","Sanderson"]) + +def test_do_attribute_statement_multi(): + astat = do_attribute_statement( + {( "urn:oid:1.3.6.1.4.1.5923.1.1.1.7", + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", + "eduPersonEntitlement"):"Jeter"}) + statement = make_instance(saml.AttributeStatement,astat) + assert statement.keyswv() == ["attribute"] + assert len(statement.attribute) + assert _eq(statement.attribute[0].keyswv(), + ["name","name_format","friendly_name","attribute_value"]) + attribute = statement.attribute[0] + assert attribute.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7" + assert attribute.name_format == ( + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri") + assert attribute.friendly_name == "eduPersonEntitlement" + +def test_subject(): + adict = utils.kd_subject("_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT) + subject = make_instance(saml.Subject, adict) + assert _eq(subject.keyswv(),["text", "name_id"]) + assert subject.text == "_aaa" + assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT + + def test_encode_decode(): package = "1234567890abcdefghijklmnopqrstuvxyzåäö" @@ -171,3 +376,82 @@ def test_identity_attribute_4(): assert utils.identity_attribute("name",a) == "urn:oid:2.5.4.5" # if there would be a map it would be serialNumber assert utils.identity_attribute("friendly",a) == "serialNumber" + +def test_filter_values_req_0(): + a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI, + friendly_name="serialNumber") + + required = [a] + ava = { "serialNumber": ["12345"]} + + ava = utils.filter_required(ava, required) + assert ava.keys() == ["serialNumber"] + assert ava["serialNumber"] == ["12345"] + +def test_filter_values_req_1(): + a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI, + friendly_name="serialNumber") + + required = [a] + ava = { "serialNumber": ["12345"], "givenName":["Lars"]} + + ava = utils.filter_required(ava, required) + assert ava.keys() == ["serialNumber"] + assert ava["serialNumber"] == ["12345"] + +def test_filter_values_req_2(): + a1 = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI, + friendly_name="serialNumber") + a2 = Attribute(name="urn:oid:2.5.4.4", name_format=NAME_FORMAT_URI, + friendly_name="surName") + + required = [a1,a2] + ava = { "serialNumber": ["12345"], "givenName":["Lars"]} + + raises(utils.MissingValue, utils.filter_required, ava, required) + +def test_filter_values_req_3(): + a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI, + friendly_name="serialNumber", attribute_value=[ + AttributeValue(text="12345")]) + + required = [a] + ava = { "serialNumber": ["12345"]} + + ava = utils.filter_required(ava, required) + assert ava.keys() == ["serialNumber"] + assert ava["serialNumber"] == ["12345"] + +def test_filter_values_req_4(): + a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI, + friendly_name="serialNumber", attribute_value=[ + AttributeValue(text="54321")]) + + required = [a] + ava = { "serialNumber": ["12345"]} + + raises(utils.MissingValue, utils.filter_required, ava, required) + +def test_filter_values_req_5(): + a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI, + friendly_name="serialNumber", attribute_value=[ + AttributeValue(text="12345")]) + + required = [a] + ava = { "serialNumber": ["12345", "54321"]} + + ava = utils.filter_required(ava, required) + assert ava.keys() == ["serialNumber"] + assert ava["serialNumber"] == ["12345"] + +def test_filter_values_req_6(): + a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI, + friendly_name="serialNumber", attribute_value=[ + AttributeValue(text="54321")]) + + required = [a] + ava = { "serialNumber": ["12345", "54321"]} + + ava = utils.filter_required(ava, required) + assert ava.keys() == ["serialNumber"] + assert ava["serialNumber"] == ["54321"]