Got everything working again

This commit is contained in:
Roland Hedberg
2009-11-27 11:07:46 +01:00
parent 475c145238
commit e9adbd609e
9 changed files with 516 additions and 232 deletions

View File

@@ -56,7 +56,7 @@ class SAML2Plugin(FormPluginBase):
def __init__(self, rememberer_name, saml_conf_file, virtual_organization,
cache, debug):
FormPluginBase.__init__(self, rememberer_name)
FormPluginBase.__init__(self)
self.rememberer_name = rememberer_name
self.debug = debug
@@ -134,7 +134,7 @@ class SAML2Plugin(FormPluginBase):
self.srv["name"],
relay_state=came_from,
log=logger,
vo=vorg)
vorg=vorg)
self.outstanding_authn[sid] = came_from
if self.debug:

View File

@@ -497,7 +497,7 @@ class Saml2Client:
session_id = sid()
request = self.create_attribute_query(session_id, subject_id,
issuer, destination, attribute, sp_name_qualifier,
name_qualifier, format=format)
name_qualifier, nameformat=format)
log and log.info("Request, created: %s" % request)

View File

@@ -331,25 +331,21 @@ class MetaData(object):
return name
def requests(self, entity_id):
def attribute_consumer(self, entity_id):
try:
ssos = self.entity[entity_id]["sp_sso"]
except KeyError:
return ([], [])
try:
requested = ssos["attribute_consuming_service"][
"requested_attribute"]
except KeyError:
return ([], [])
required = []
optional = []
for attr in requested:
if "is_required" in attr and attr["is_required"] == "true":
required.append(attr)
else:
optional.append(attr)
# What if there is more than one ? Can't be ?
for acs in ssos[0].attribute_consuming_service:
for attr in acs.requested_attribute:
if attr.is_required == "true":
required.append(attr)
else:
optional.append(attr)
return (required, optional)

View File

@@ -214,7 +214,6 @@ class Server(object):
"""
attr_statement = do_attribute_statement(identity)
# start using now and for a hour
conds = kd_conditions(
not_before=instant(),
@@ -307,29 +306,34 @@ class Server(object):
return make_instance(samlp.Response, tmp)
def filter_ava(self, ava, sp_entity_id, request=None, role=""):
def filter_ava(self, ava, sp_entity_id, required, optional, role=""):
""" What attribute and attribute values returns depends on what
the SP has said it wants in the request or in the metadata file and
what the IdP/AA wants to release. An assumption is that what the SP
asks for overrides whatever is in the metadata.
"""
#print self.conf["service"][role]
#print self.conf["service"][role]["assertions"][sp_entity_id]
if not request:
#any rules for this SP ?
try:
restrictions = self.conf["service"][role][
"assertions"][sp_entity_id][
"attribute_restrictions"]
except KeyError:
try:
restrictions = self.conf["service"][role][sp_entity_id][
"attribute_restrictions"]
restrictions = self.conf["service"][role]["assertions"][
"default"]["attribute_restrictions"]
except KeyError:
try:
restrictions = self.conf["service"][role]["default"][
"attribute_restrictions"]
except KeyError:
restrictions = None
if restrictions:
ava = filter_attribute_value_assertions(ava,
restrictions)
restrictions = None
#print restrictions
if restrictions:
ava = filter_attribute_value_assertions(ava, restrictions)
if required:
pass
return ava
def authn_response(self, identity, in_response_to, destination, spid,
@@ -364,7 +368,8 @@ class Server(object):
sp_name_qualifier=name_id_policy.sp_name_qualifier)
# Do attribute filtering
identity = self.filter_ava( identity, spid, None,"idp")
(required,optional) = self.conf["metadata"].attribute_consumer(spid)
identity = self.filter_ava( identity, spid, required, optional, "idp")
resp = self.do_sso_response(
destination, # consumer_url

View File

@@ -23,6 +23,9 @@ class UnsupportedBinding(Exception):
class OtherError(Exception):
pass
class MissingValue(Exception):
pass
EXCEPTION2STATUS = {
VersionMismatch: samlp.STATUS_VERSION_MISMATCH,
UnknownPrincipal: samlp.STATUS_UNKNOWN_PRINCIPAL,
@@ -72,7 +75,7 @@ def make_vals(val, klass, klass_inst=None, prop=None, part=False):
:return: Value class instance
"""
cinst = None
#print "_make_val: %s %s (%s)" % (prop,val,klass)
#print "_make_val: %s %s (%s) [%s]" % (prop,val,klass,part)
if isinstance(val, bool):
cinst = klass(text="%s" % val)
elif isinstance(val, int):
@@ -92,9 +95,9 @@ def make_vals(val, klass, klass_inst=None, prop=None, part=False):
if part:
return cinst
else:
if cis:
if cinst:
cis = [cinst]
setattr(klass_inst, prop, cis)
setattr(klass_inst, prop, cis)
def make_instance(klass, spec):
"""
@@ -104,8 +107,11 @@ def make_instance(klass, spec):
:param spec: Information to be placed in the instance
:return: The instance
"""
#print "----- %s -----" % klass
#print "..... %s ....." % spec
klass_inst = klass()
for prop in klass.c_attributes.values():
#print "# %s" % (prop)
if prop in spec:
if isinstance(spec[prop], bool):
setattr(klass_inst, prop,"%s" % spec[prop])
@@ -117,12 +123,15 @@ def make_instance(klass, spec):
setattr(klass_inst, "text", spec["text"])
for prop, klass in klass.c_children.values():
#print "## %s, %s" % (prop, klass)
if prop in spec:
#print "%s" % spec[prop]
if isinstance(klass, list): # means there can be a list of values
make_vals(spec[prop], klass[0], klass_inst, prop)
else:
cis = make_vals(spec[prop], klass, klass_inst, prop, True)
setattr(klass_inst, prop, cis)
#+print ">>> %s <<<" % klass_inst
return klass_inst
def parse_attribute_map(filenames):
@@ -186,6 +195,68 @@ def identity_attribute(form, attribute, forward_map=None):
# default is name
return attribute.name
def filter_values(vals, attributes, required=True):
reqval = []
for rval in attributes:
for val in vals:
if rval.text == val:
reqval.append(val)
break
if required:
if len(reqval) == len(attributes):
return reqval
else:
raise MissingValue("Required attribute value missing")
else:
return reqval
def filter_required(ava, required):
"""
:param required: list of RequestedAttribute instances
"""
res = {}
for attr in required:
if attr.name in ava:
if required.attribute_value:
res[attr.name] = filter_values(ava[attr.name],
required.attribute_value)
else:
res[attr.name] = ava[attr.name]
elif attr.friendly_name in ava:
if attr.attribute_value:
res[attr.friendly_name] = filter_values(
ava[attr.friendly_name],
attr.attribute_value)
else:
res[attr.friendly_name] = ava[attr.friendly_name]
else:
raise MissingValue("Required attribute missing")
return res
def filter_optional(ava, optional):
"""
:param optional: list of RequestedAttribute instances
"""
res = {}
for attr in optional:
if attr.name in ava:
if optional.attribute_value:
res[attr.name] = filter_values(ava[attr.name],
optional.attribute_value, False)
else:
res[attr.name] = ava[attr.name]
elif attr.friendly_name in ava:
if attr.attribute_value:
res[attr.friendly_name] = filter_values(
ava[attr.friendly_name],
attr.attribute_value, False)
else:
res[attr.friendly_name] = ava[attr.friendly_name]
return res
#----------------------------------------------------------------------------
def properties(klass):
@@ -285,7 +356,7 @@ def do_attributes(identity):
for key, val in identity.items():
dic = {}
if isinstance(val, basestring):
attrval = kd_attribute_value(val)
attrval = [kd_attribute_value(val)]
elif isinstance(val, list):
attrval = [kd_attribute_value(v) for v in val]
elif val == None:

View File

@@ -180,6 +180,16 @@ class TestClient:
assert nameid.format == saml.NAMEID_FORMAT_TRANSIENT
assert nameid.text == "_e7b68a04488f715cda642fbdd90099f5"
def test_attribute_query(self):
req = self.client.attribute_query(
"_e7b68a04488f715cda642fbdd90099f5",
"urn:mace:umu.se:saml/rolandsp",
"https://aai-demo-idp.switch.ch/idp/shibboleth",
format=saml.NAMEID_FORMAT_TRANSIENT)
# since no one is answering on the other end
assert req == None
def test_idp_entry(self):
idp_entry = utils.make_instance( samlp.IDPEntry,
self.client.idp_entry(name="Umeå Universitet",

View File

@@ -10,6 +10,7 @@ SWAMI_METADATA = "tests/urn-mace-swami.se-swamid-test-1.0-metadata.xml"
INCOMMON_METADATA = "tests/InCommon-metadata.xml"
EXAMPLE_METADATA = "tests/metadata_example.xml"
SWITCH_METADATA = "tests/metadata.aaitest.xml"
SP_METADATA = "tests/metasp.xml"
def _eq(l1,l2):
return set(l1) == set(l2)
@@ -92,6 +93,26 @@ def test_switch_1():
print len(dual)
assert len(dual) == 0
def test_sp_metadata():
md = metadata.MetaData()
md.import_metadata(open(SP_METADATA).read())
print md.entity
assert len(md.entity) == 1
assert md.entity.keys() == ['urn:mace:umu.se:saml:roland:sp']
assert md.entity['urn:mace:umu.se:saml:roland:sp'].keys() == [
"organization","sp_sso"]
print md.entity['urn:mace:umu.se:saml:roland:sp']["sp_sso"][0].keyswv()
(req,opt) = md.attribute_consumer('urn:mace:umu.se:saml:roland:sp')
print req
assert len(req) == 3
assert len(opt) == 1
assert opt[0].name == 'urn:oid:2.5.4.12'
assert opt[0].friendly_name == 'title'
assert _eq([n.name for n in req],['urn:oid:2.5.4.4', 'urn:oid:2.5.4.42',
'urn:oid:0.9.2342.19200300.100.1.3'])
assert _eq([n.friendly_name for n in req],['surName', 'givenName', 'mail'])
# ------------ Constructing metaval ----------------------------------------
def test_construct_organisation_name():

View File

@@ -4,207 +4,18 @@
from saml2.server import Server
from saml2 import server
from saml2 import samlp, saml, client, utils
from saml2.utils import make_instance, OtherError, UnknownPricipal
from saml2.utils import make_instance, OtherError
from saml2.utils import do_attribute_statement
from py.test import raises
import shelve
SUCCESS_STATUS = """<?xml version=\'1.0\' encoding=\'UTF-8\'?>
<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success" /></ns0:Status>"""
ERROR_STATUS = """<?xml version='1.0' encoding='UTF-8'?>
<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Responder"><ns0:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:UnknownPrincipal" /></ns0:StatusCode><ns0:StatusMessage>Error resolving principal</ns0:StatusMessage></ns0:Status>"""
import re
def _eq(l1,l2):
return set(l1) == set(l2)
def test_status_success():
stat = utils.kd_status(
status_code=utils.kd_status_code(
value=samlp.STATUS_SUCCESS))
status = make_instance( samlp.Status, stat)
status_text = "%s" % status
assert status_text == SUCCESS_STATUS
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_success_status():
stat = utils.kd_success_status()
status = make_instance(samlp.Status, stat)
status_text = "%s" % status
assert status_text == SUCCESS_STATUS
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_error_status():
stat = utils.kd_status(
status_message=utils.kd_status_message(
"Error resolving principal"),
status_code=utils.kd_status_code(
value=samlp.STATUS_RESPONDER,
status_code=utils.kd_status_code(
value=samlp.STATUS_UNKNOWN_PRINCIPAL)))
status_text = "%s" % make_instance( samlp.Status, stat )
print status_text
assert status_text == ERROR_STATUS
def test_status_from_exception():
e = UnknownPricipal("Error resolving principal")
stat = utils.kd_status_from_exception(e)
status_text = "%s" % make_instance( samlp.Status, stat )
assert status_text == ERROR_STATUS
def test_attribute_statement():
astat = do_attribute_statement({"surName":"Jeter",
"givenName":"Derek"})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute) == 2
attr0 = statement.attribute[0]
assert _eq(attr0.keyswv(), ["name","attribute_value"])
assert len(attr0.attribute_value) == 1
attr1 = statement.attribute[1]
assert _eq(attr1.keyswv(), ["name","attribute_value"])
assert len(attr1.attribute_value) == 1
if attr0.name == "givenName":
assert attr0.attribute_value[0].text == "Derek"
assert attr1.name == "surName"
assert attr1.attribute_value[0].text == "Jeter"
else:
assert attr0.name == "surName"
assert attr0.attribute_value[0].text == "Jeter"
assert attr1.name == "givenName"
assert attr1.attribute_value[0].text == "Derek"
def test_audience():
aud_restr = make_instance( saml.AudienceRestriction,
utils.kd_audience_restriction(
audience=utils.kd_audience("urn:foo:bar")))
assert aud_restr.keyswv() == ["audience"]
assert aud_restr.audience.text == "urn:foo:bar"
def test_conditions():
conds_dict = utils.kd_conditions(
not_before="2009-10-30T07:58:10.852Z",
not_on_or_after="2009-10-30T08:03:10.852Z",
audience_restriction=utils.kd_audience_restriction(
audience=utils.kd_audience("urn:foo:bar")))
conditions = make_instance(saml.Conditions, conds_dict)
assert _eq(conditions.keyswv(), ["not_before", "not_on_or_after",
"audience_restriction"])
assert conditions.not_before == "2009-10-30T07:58:10.852Z"
assert conditions.not_on_or_after == "2009-10-30T08:03:10.852Z"
assert conditions.audience_restriction[0].audience.text == "urn:foo:bar"
def test_value_1():
#FriendlyName="givenName" Name="urn:oid:2.5.4.42"
# NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
adict = utils.kd_attribute(name="urn:oid:2.5.4.42",
name_format=saml.NAME_FORMAT_URI)
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name","name_format"])
assert attribute.name == "urn:oid:2.5.4.42"
assert attribute.name_format == saml.NAME_FORMAT_URI
def test_value_2():
adict = utils.kd_attribute(name="urn:oid:2.5.4.42",
name_format=saml.NAME_FORMAT_URI,
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name","name_format","friendly_name"])
assert attribute.name == "urn:oid:2.5.4.42"
assert attribute.name_format == saml.NAME_FORMAT_URI
assert attribute.friendly_name == "givenName"
def test_value_3():
adict = utils.kd_attribute(attribute_value="Derek",
name="urn:oid:2.5.4.42",
name_format=saml.NAME_FORMAT_URI,
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name", "name_format",
"friendly_name", "attribute_value"])
assert attribute.name == "urn:oid:2.5.4.42"
assert attribute.name_format == saml.NAME_FORMAT_URI
assert attribute.friendly_name == "givenName"
assert len(attribute.attribute_value) == 1
assert attribute.attribute_value[0].text == "Derek"
def test_value_4():
adict = utils.kd_attribute(attribute_value="Derek",
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["friendly_name", "attribute_value"])
assert attribute.friendly_name == "givenName"
assert len(attribute.attribute_value) == 1
assert attribute.attribute_value[0].text == "Derek"
def test_do_attribute_statement_0():
astat = do_attribute_statement({"vo_attr":"foobar"})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute) == 1
attr0 = statement.attribute[0]
assert _eq(attr0.keyswv(), ["name","attribute_value"])
assert attr0.name == "vo_attr"
assert len(attr0.attribute_value) == 1
assert attr0.attribute_value[0].text == "foobar"
def test_do_attribute_statement():
astat = do_attribute_statement({"surName":"Jeter",
"givenName":["Derek","Sanderson"]})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute) == 2
attr0 = statement.attribute[0]
assert _eq(attr0.keyswv(), ["name","attribute_value"])
attr1 = statement.attribute[1]
assert _eq(attr1.keyswv(), ["name","attribute_value"])
if attr0.name == "givenName":
assert len(attr0.attribute_value) == 2
assert _eq([av.text for av in attr0.attribute_value],
["Derek","Sanderson"])
assert attr1.name == "surName"
assert attr1.attribute_value[0].text == "Jeter"
assert len(attr1.attribute_value) == 1
else:
assert attr0.name == "surName"
assert attr0.attribute_value[0].text == "Jeter"
assert len(attr0.attribute_value) == 1
assert attr1.name == "givenName"
assert len(attr1.attribute_value) == 2
assert _eq([av.text for av in attr1.attribute_value],
["Derek","Sanderson"])
def test_do_attribute_statement_multi():
astat = do_attribute_statement(
{( "urn:oid:1.3.6.1.4.1.5923.1.1.1.7",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"eduPersonEntitlement"):"Jeter"})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute)
assert _eq(statement.attribute[0].keyswv(),
["name","name_format","friendly_name","attribute_value"])
attribute = statement.attribute[0]
assert attribute.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7"
assert attribute.name_format == (
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri")
assert attribute.friendly_name == "eduPersonEntitlement"
def test_subject():
adict = utils.kd_subject("_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT)
subject = make_instance(saml.Subject, adict)
assert _eq(subject.keyswv(),["text", "name_id"])
assert subject.text == "_aaa"
assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT
class TestServer():
def setup_class(self):
self.server = Server("tests/server.config")
self.server = Server("tests/idp.config")
def test_issuer(self):
issuer = make_instance( saml.Issuer, self.server.issuer())
@@ -232,7 +43,7 @@ class TestServer():
assert _eq(assertion.keyswv(),['attribute_statement', 'issuer', 'id',
'subject', 'issue_instant', 'version'])
assert assertion.version == "2.0"
assert assertion.issuer.text == "urn:mace:example.com:saml:roland:sp"
assert assertion.issuer.text == "urn:mace:example.com:saml:roland:idp"
#
assert len(assertion.attribute_statement) == 1
attribute_statement = assertion.attribute_statement[0]
@@ -279,7 +90,7 @@ class TestServer():
'in_response_to', 'issue_instant',
'version', 'issuer', 'id'])
assert response.version == "2.0"
assert response.issuer.text == "urn:mace:example.com:saml:roland:sp"
assert response.issuer.text == "urn:mace:example.com:saml:roland:idp"
assert response.destination == "https:#www.example.com"
assert response.in_response_to == "_012345"
#
@@ -392,3 +203,89 @@ class TestServer():
print pid1, pid2
assert pid1 == pid2
def test_filter_ava_0(self):
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [], "idp")
assert _eq(ava.keys(), ["givenName", "surName", "mail"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
assert ava["mail"] == ["derek@nyy.mlb.com"]
def test_filter_ava_1(self):
""" No mail address returned """
self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"] = {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"givenName": None,
"surName": None,
}
}
print self.server.conf["service"]["idp"]["assertions"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [], "idp")
assert _eq(ava.keys(), ["givenName", "surName"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
def test_filter_ava_2(self):
""" Only mail returned """
self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"] = {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"mail": None,
}
}
print self.server.conf["service"]["idp"]["assertions"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [], "idp")
assert _eq(ava.keys(), ["mail"])
assert ava["mail"] == ["derek@nyy.mlb.com"]
def test_filter_ava_3(self):
""" Only example.com mail addresses returned """
self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"] = {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"mail": [re.compile(".*@example\.com$")],
}
}
print self.server.conf["service"]["idp"]["assertions"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com", "dj@example.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [], "idp")
assert _eq(ava.keys(), ["mail"])
assert ava["mail"] == ["dj@example.com"]

View File

@@ -1,17 +1,222 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from saml2 import utils
from saml2 import utils, saml, samlp
from saml2.utils import do_attribute_statement, make_instance
import zlib
import base64
import gzip
from saml2.sigver import make_temp
from saml2.config import do_assertions
from saml2.saml import Attribute, NAME_FORMAT_URI
from saml2.saml import Attribute, NAME_FORMAT_URI, AttributeValue
from py.test import raises
SUCCESS_STATUS = """<?xml version=\'1.0\' encoding=\'UTF-8\'?>
<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success" /></ns0:Status>"""
ERROR_STATUS = """<?xml version='1.0' encoding='UTF-8'?>
<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Responder"><ns0:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:UnknownPrincipal" /></ns0:StatusCode><ns0:StatusMessage>Error resolving principal</ns0:StatusMessage></ns0:Status>"""
def _eq(l1,l2):
return set(l1) == set(l2)
def test_status_success():
stat = utils.kd_status(
status_code=utils.kd_status_code(
value=samlp.STATUS_SUCCESS))
status = make_instance( samlp.Status, stat)
status_text = "%s" % status
assert status_text == SUCCESS_STATUS
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_success_status():
stat = utils.kd_success_status()
status = make_instance(samlp.Status, stat)
status_text = "%s" % status
assert status_text == SUCCESS_STATUS
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_error_status():
stat = utils.kd_status(
status_message=utils.kd_status_message(
"Error resolving principal"),
status_code=utils.kd_status_code(
value=samlp.STATUS_RESPONDER,
status_code=utils.kd_status_code(
value=samlp.STATUS_UNKNOWN_PRINCIPAL)))
status_text = "%s" % make_instance( samlp.Status, stat )
print status_text
assert status_text == ERROR_STATUS
def test_status_from_exception():
e = utils.UnknownPrincipal("Error resolving principal")
stat = utils.kd_status_from_exception(e)
status_text = "%s" % make_instance( samlp.Status, stat )
assert status_text == ERROR_STATUS
def test_attribute():
attr = utils.do_attributes({"surName":"Jeter"})
assert len(attr) == 1
inst = make_instance(saml.Attribute, attr[0])
print inst
assert inst.name == "surName"
assert len(inst.attribute_value) == 1
assert inst.attribute_value[0].text == "Jeter"
def test_attribute_statement():
astat = do_attribute_statement({"surName":"Jeter",
"givenName":"Derek"})
print astat
statement = make_instance(saml.AttributeStatement,astat)
print statement
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute) == 2
attr0 = statement.attribute[0]
assert _eq(attr0.keyswv(), ["name","attribute_value"])
assert len(attr0.attribute_value) == 1
attr1 = statement.attribute[1]
assert _eq(attr1.keyswv(), ["name","attribute_value"])
assert len(attr1.attribute_value) == 1
if attr0.name == "givenName":
assert attr0.attribute_value[0].text == "Derek"
assert attr1.name == "surName"
assert attr1.attribute_value[0].text == "Jeter"
else:
assert attr0.name == "surName"
assert attr0.attribute_value[0].text == "Jeter"
assert attr1.name == "givenName"
assert attr1.attribute_value[0].text == "Derek"
def test_audience():
aud_restr = make_instance( saml.AudienceRestriction,
utils.kd_audience_restriction(
audience=utils.kd_audience("urn:foo:bar")))
assert aud_restr.keyswv() == ["audience"]
assert aud_restr.audience.text == "urn:foo:bar"
def test_conditions():
conds_dict = utils.kd_conditions(
not_before="2009-10-30T07:58:10.852Z",
not_on_or_after="2009-10-30T08:03:10.852Z",
audience_restriction=utils.kd_audience_restriction(
audience=utils.kd_audience("urn:foo:bar")))
conditions = make_instance(saml.Conditions, conds_dict)
assert _eq(conditions.keyswv(), ["not_before", "not_on_or_after",
"audience_restriction"])
assert conditions.not_before == "2009-10-30T07:58:10.852Z"
assert conditions.not_on_or_after == "2009-10-30T08:03:10.852Z"
assert conditions.audience_restriction[0].audience.text == "urn:foo:bar"
def test_value_1():
#FriendlyName="givenName" Name="urn:oid:2.5.4.42"
# NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
adict = utils.kd_attribute(name="urn:oid:2.5.4.42",
name_format=NAME_FORMAT_URI)
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name","name_format"])
assert attribute.name == "urn:oid:2.5.4.42"
assert attribute.name_format == saml.NAME_FORMAT_URI
def test_value_2():
adict = utils.kd_attribute(name="urn:oid:2.5.4.42",
name_format=NAME_FORMAT_URI,
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name","name_format","friendly_name"])
assert attribute.name == "urn:oid:2.5.4.42"
assert attribute.name_format == NAME_FORMAT_URI
assert attribute.friendly_name == "givenName"
def test_value_3():
adict = utils.kd_attribute(attribute_value="Derek",
name="urn:oid:2.5.4.42",
name_format=NAME_FORMAT_URI,
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name", "name_format",
"friendly_name", "attribute_value"])
assert attribute.name == "urn:oid:2.5.4.42"
assert attribute.name_format == NAME_FORMAT_URI
assert attribute.friendly_name == "givenName"
assert len(attribute.attribute_value) == 1
assert attribute.attribute_value[0].text == "Derek"
def test_value_4():
adict = utils.kd_attribute(attribute_value="Derek",
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["friendly_name", "attribute_value"])
assert attribute.friendly_name == "givenName"
assert len(attribute.attribute_value) == 1
assert attribute.attribute_value[0].text == "Derek"
def test_do_attribute_statement_0():
astat = do_attribute_statement({"vo_attr":"foobar"})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute) == 1
attr0 = statement.attribute[0]
assert _eq(attr0.keyswv(), ["name","attribute_value"])
assert attr0.name == "vo_attr"
assert len(attr0.attribute_value) == 1
assert attr0.attribute_value[0].text == "foobar"
def test_do_attribute_statement():
astat = do_attribute_statement({"surName":"Jeter",
"givenName":["Derek","Sanderson"]})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute) == 2
attr0 = statement.attribute[0]
assert _eq(attr0.keyswv(), ["name","attribute_value"])
attr1 = statement.attribute[1]
assert _eq(attr1.keyswv(), ["name","attribute_value"])
if attr0.name == "givenName":
assert len(attr0.attribute_value) == 2
assert _eq([av.text for av in attr0.attribute_value],
["Derek","Sanderson"])
assert attr1.name == "surName"
assert attr1.attribute_value[0].text == "Jeter"
assert len(attr1.attribute_value) == 1
else:
assert attr0.name == "surName"
assert attr0.attribute_value[0].text == "Jeter"
assert len(attr0.attribute_value) == 1
assert attr1.name == "givenName"
assert len(attr1.attribute_value) == 2
assert _eq([av.text for av in attr1.attribute_value],
["Derek","Sanderson"])
def test_do_attribute_statement_multi():
astat = do_attribute_statement(
{( "urn:oid:1.3.6.1.4.1.5923.1.1.1.7",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"eduPersonEntitlement"):"Jeter"})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute)
assert _eq(statement.attribute[0].keyswv(),
["name","name_format","friendly_name","attribute_value"])
attribute = statement.attribute[0]
assert attribute.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7"
assert attribute.name_format == (
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri")
assert attribute.friendly_name == "eduPersonEntitlement"
def test_subject():
adict = utils.kd_subject("_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT)
subject = make_instance(saml.Subject, adict)
assert _eq(subject.keyswv(),["text", "name_id"])
assert subject.text == "_aaa"
assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT
def test_encode_decode():
package = "1234567890abcdefghijklmnopqrstuvxyzåäö"
@@ -171,3 +376,82 @@ def test_identity_attribute_4():
assert utils.identity_attribute("name",a) == "urn:oid:2.5.4.5"
# if there would be a map it would be serialNumber
assert utils.identity_attribute("friendly",a) == "serialNumber"
def test_filter_values_req_0():
a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber")
required = [a]
ava = { "serialNumber": ["12345"]}
ava = utils.filter_required(ava, required)
assert ava.keys() == ["serialNumber"]
assert ava["serialNumber"] == ["12345"]
def test_filter_values_req_1():
a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber")
required = [a]
ava = { "serialNumber": ["12345"], "givenName":["Lars"]}
ava = utils.filter_required(ava, required)
assert ava.keys() == ["serialNumber"]
assert ava["serialNumber"] == ["12345"]
def test_filter_values_req_2():
a1 = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber")
a2 = Attribute(name="urn:oid:2.5.4.4", name_format=NAME_FORMAT_URI,
friendly_name="surName")
required = [a1,a2]
ava = { "serialNumber": ["12345"], "givenName":["Lars"]}
raises(utils.MissingValue, utils.filter_required, ava, required)
def test_filter_values_req_3():
a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber", attribute_value=[
AttributeValue(text="12345")])
required = [a]
ava = { "serialNumber": ["12345"]}
ava = utils.filter_required(ava, required)
assert ava.keys() == ["serialNumber"]
assert ava["serialNumber"] == ["12345"]
def test_filter_values_req_4():
a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber", attribute_value=[
AttributeValue(text="54321")])
required = [a]
ava = { "serialNumber": ["12345"]}
raises(utils.MissingValue, utils.filter_required, ava, required)
def test_filter_values_req_5():
a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber", attribute_value=[
AttributeValue(text="12345")])
required = [a]
ava = { "serialNumber": ["12345", "54321"]}
ava = utils.filter_required(ava, required)
assert ava.keys() == ["serialNumber"]
assert ava["serialNumber"] == ["12345"]
def test_filter_values_req_6():
a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber", attribute_value=[
AttributeValue(text="54321")])
required = [a]
ava = { "serialNumber": ["12345", "54321"]}
ava = utils.filter_required(ava, required)
assert ava.keys() == ["serialNumber"]
assert ava["serialNumber"] == ["54321"]