Refactoring spree

This commit is contained in:
Roland Hedberg
2010-03-25 17:07:01 +01:00
parent 55a86ee1bc
commit 8e23bd2b0a
15 changed files with 243 additions and 829 deletions

View File

@@ -98,7 +98,7 @@ class SAML2Plugin(FormPluginBase):
session_info["not_on_or_after"]) session_info["not_on_or_after"])
return name_id return name_id
def _pick_idp(self): def _pick_idp(self, environ):
""" """
If more than one idp and if none is selected, I have to do wayf If more than one idp and if none is selected, I have to do wayf
""" """
@@ -147,7 +147,7 @@ class SAML2Plugin(FormPluginBase):
self.log and self.log.info("VO: %s" % vorg) self.log and self.log.info("VO: %s" % vorg)
# If more than one idp and if none is selected, I have to do wayf # If more than one idp and if none is selected, I have to do wayf
idp_url = self._pick_idp() idp_url = self._pick_idp(environ)
# Do the AuthnRequest # Do the AuthnRequest
scl = Saml2Client(environ, self.conf) scl = Saml2Client(environ, self.conf)
@@ -169,7 +169,7 @@ class SAML2Plugin(FormPluginBase):
else : else :
HTTPInternalServerError(detail='Incorrect returned data') HTTPInternalServerError(detail='Incorrect returned data')
def _get_post(self): def _get_post(self, environ):
""" Get the posted information """ """ Get the posted information """
post_env = environ.copy() post_env = environ.copy()
post_env['QUERY_STRING'] = '' post_env['QUERY_STRING'] = ''
@@ -219,7 +219,7 @@ class SAML2Plugin(FormPluginBase):
if self.debug: if self.debug:
self.log and self.log.info('identify query: %s' % (query,)) self.log and self.log.info('identify query: %s' % (query,))
post = self._get_post() post = self._get_post(environ)
# Not for me, put the post back where next in line can find it # Not for me, put the post back where next in line can find it
try: try:

View File

@@ -36,6 +36,8 @@ from saml2.sigver import pre_signature_part, sign_assertion_using_xmlsec
from saml2.sigver import sign_statement_using_xmlsec from saml2.sigver import sign_statement_using_xmlsec
from saml2.soap import SOAPClient from saml2.soap import SOAPClient
from saml2.attribute_converter import to_local
DEFAULT_BINDING = saml2.BINDING_HTTP_REDIRECT DEFAULT_BINDING = saml2.BINDING_HTTP_REDIRECT
FORM_SPEC = """<form method="post" action="%s"> FORM_SPEC = """<form method="post" action="%s">
@@ -365,11 +367,12 @@ class Saml2Client(object):
lax) lax)
# The assertion can contain zero or one attributeStatements # The assertion can contain zero or one attributeStatements
assert len(assertion.attribute_statement) <= 1 if not assertion.attribute_statement:
if assertion.attribute_statement:
ava = get_attribute_values(assertion.attribute_statement[0])
else:
ava = {} ava = {}
else:
assert len(assertion.attribute_statement) == 1
ava = to_local(self.config.attribute_converters(),
assertion.attribute_statement[0])
log and log.info("AVA: %s" % (ava,)) log and log.info("AVA: %s" % (ava,))
@@ -599,28 +602,6 @@ def for_me(condition, myself ):
if audience.text.strip() == myself: if audience.text.strip() == myself:
return True return True
def get_attribute_values(attribute_statement):
""" Get the attributes and the attribute values
:param response: The AttributeStatement.
:return: A dictionary containing attributes and values
"""
result = {}
for attribute in attribute_statement.attribute:
# Check name_format ??
try:
name = attribute.friendly_name.strip()
except AttributeError:
name = attribute.name.strip()
result[name] = []
for value in attribute.attribute_value:
if not value.text:
result[name].append('')
else:
result[name].append(value.text.strip())
return result
ROW = """<tr><td>%s</td><td>%s</td></tr>""" ROW = """<tr><td>%s</td><td>%s</td></tr>"""
def _print_statement(statem): def _print_statement(statem):

View File

@@ -3,6 +3,8 @@
# #
from saml2 import metadata, utils from saml2 import metadata, utils
from saml2.assertion import Policy
from saml2.attribute_converter import ac_factory, AttributeConverter
import re import re
class MissingValue(Exception): class MissingValue(Exception):
@@ -19,41 +21,6 @@ def entity_id2url(meta, entity_id):
""" """
return meta.single_sign_on_services(entity_id)[0] return meta.single_sign_on_services(entity_id)[0]
def do_assertions(assertions):
""" This is only for IdPs or AAs, and it's about limiting what
is returned to the SP.
In the configuration file, restrictions on which values that
can be returned are specified with the help of regular expressions.
This function goes through and pre-compile the regular expressions.
:param assertions:
:return: The assertion with the string specification replaced with
a compiled regular expression.
"""
for _, spec in assertions.items():
if spec == None:
continue
try:
restr = spec["attribute_restrictions"]
except KeyError:
continue
if restr == None:
continue
for key, values in restr.items():
if not values:
spec["attribute_restrictions"][key] = None
continue
rev = []
for value in values:
rev.append(re.compile(value))
spec["attribute_restrictions"][key] = rev
return assertions
class Config(dict): class Config(dict):
def sp_check(self, config, metadata=None): def sp_check(self, config, metadata=None):
""" config["idp"] is a dictionary with entity_ids as keys and """ config["idp"] is a dictionary with entity_ids as keys and
@@ -80,15 +47,13 @@ class Config(dict):
assert "url" in config assert "url" in config
assert "name" in config assert "name" in config
def idp_check(self, config): def idp_aa_check(self, config):
assert "url" in config assert "url" in config
if "assertions" in config: if "assertions" in config:
config["assertions"] = do_assertions(config["assertions"]) config["policy"] = Policy(config["assertions"])
del config["assertions"]
def aa_check(self, config): elif "policy" in config:
assert "url" in config config["policy"] = Policy(config["policy"])
if "assertions" in config:
config["assertions"] = do_assertions(config["assertions"])
def load_metadata(self, metadata_conf, xmlsec_binary): def load_metadata(self, metadata_conf, xmlsec_binary):
""" Loads metadata into an internal structure """ """ Loads metadata into an internal structure """
@@ -125,14 +90,11 @@ class Config(dict):
config["metadata"] = self.load_metadata(config["metadata"], config["metadata"] = self.load_metadata(config["metadata"],
config["xmlsec_binary"]) config["xmlsec_binary"])
if "attribute_maps" in config: if "attribute_map_dir" in config:
(forward, backward) = utils.parse_attribute_map(config[ config["attrconverters"] = ac_factory(
"attribute_maps"]) config["attribute_map_dir"])
config["am_forward"] = forward
config["am_backward"] = backward
else: else:
config["am_forward"] = None config["attrconverters"] = [AttributeConverter()]
config["am_backward"] = None
if "sp" in config["service"]: if "sp" in config["service"]:
#print config["service"]["sp"] #print config["service"]["sp"]
@@ -141,11 +103,38 @@ class Config(dict):
else: else:
self.sp_check(config["service"]["sp"]) self.sp_check(config["service"]["sp"])
if "idp" in config["service"]: if "idp" in config["service"]:
self.idp_check(config["service"]["idp"]) self.idp_aa_check(config["service"]["idp"])
if "aa" in config["service"]: if "aa" in config["service"]:
self.aa_check(config["service"]["aa"]) self.idp_aa_check(config["service"]["aa"])
for key, val in config.items(): for key, val in config.items():
self[key] = val self[key] = val
return self return self
def services(self):
return self["service"].keys()
def idp_policy(self):
try:
return self["service"]["idp"]["policy"]
except KeyError:
return Policy()
def aa_policy(self):
try:
return self["service"]["aa"]["policy"]
except KeyError:
return Policy()
def aa_url(self):
return self["service"]["aa"]["url"]
def idp_url(self):
return self["service"]["idp"]["url"]
def vo_conf(self, name):
return self.conf["vitual_organization"][name]
def attribute_converters(self):
return self["attrconverters"]

View File

@@ -32,8 +32,11 @@ from saml2.time_util import valid
@decorator @decorator
def keep_updated(f, self, entity_id, *args, **kwargs): def keep_updated(f, self, entity_id, *args, **kwargs):
#print "In keep_updated" #print "In keep_updated"
try:
if not valid(self.entity[entity_id]["valid_until"]): if not valid(self.entity[entity_id]["valid_until"]):
self.reload_entity(entity_id) self.reload_entity(entity_id)
except KeyError:
pass
return f(self, entity_id, *args, **kwargs) return f(self, entity_id, *args, **kwargs)
@@ -195,6 +198,8 @@ class MetaData(object):
def reload_entity(self, entity_id): def reload_entity(self, entity_id):
for source, eids in self._import.items(): for source, eids in self._import.items():
if entity_id in eids: if entity_id in eids:
if source == "-":
return
self.clear_from_source(source) self.clear_from_source(source)
if isinstance(source, basestring): if isinstance(source, basestring):
f = open(source) f = open(source)

View File

@@ -24,54 +24,29 @@ import shelve
from saml2 import saml, samlp, VERSION, make_instance from saml2 import saml, samlp, VERSION, make_instance
from saml2.utils import sid, decode_base64_and_inflate from saml2.utils import sid, decode_base64_and_inflate
from saml2.utils import response_factory, do_ava_statement from saml2.utils import response_factory
from saml2.utils import MissingValue, args2dict from saml2.utils import MissingValue, args2dict
from saml2.utils import success_status_factory, assertion_factory from saml2.utils import success_status_factory, assertion_factory
from saml2.utils import filter_attribute_value_assertions
from saml2.utils import OtherError, do_attribute_statement from saml2.utils import OtherError, do_attribute_statement
from saml2.utils import VersionMismatch, UnknownPrincipal, UnsupportedBinding from saml2.utils import VersionMismatch, UnknownPrincipal, UnsupportedBinding
from saml2.utils import filter_on_attributes, status_from_exception_factory from saml2.utils import status_from_exception_factory
from saml2.sigver import correctly_signed_authn_request from saml2.sigver import correctly_signed_authn_request
from saml2.sigver import pre_signature_part from saml2.sigver import pre_signature_part
from saml2.time_util import instant, in_a_while from saml2.time_util import instant, in_a_while
from saml2.config import Config from saml2.config import Config
from saml2.cache import Cache from saml2.cache import Cache
from saml2.assertion import Assertion, Policy
class IdentifierMap(object):
class Server(object): def __init__(self, dbname, debug=0, log=None):
""" A class that does things that IdPs or AAs do """ self.map = shelve.open(dbname,writeback=True)
def __init__(self, config_file="", config=None, cache="",
log=None, debug=0):
if config_file:
self.load_config(config_file)
elif config:
self.conf = config
self.metadata = self.conf["metadata"]
if cache:
self.cache = Cache(cache)
else:
self.cache = Cache()
self.log = log
self.debug = debug self.debug = debug
self.log = log
def load_config(self, config_file): def persistent(self, entity_id, subject_id):
self.conf = Config()
self.conf.load_file(config_file)
if "subject_data" in self.conf:
self.id_map = shelve.open(self.conf["subject_data"],
writeback=True)
else:
self.id_map = None
def issuer(self):
return args2dict( self.conf["entityid"],
format=saml.NAMEID_FORMAT_ENTITY)
def persistent_id(self, entity_id, subject_id):
""" Keeps the link between a permanent identifier and a """ Keeps the link between a permanent identifier and a
temporary/pseudotemporary identifier for a subject temporary/pseudo-temporary identifier for a subject
:param entity_id: SP entity ID or VO entity ID :param entity_id: SP entity ID or VO entity ID
:param subject_id: The local identifier of the subject :param subject_id: The local identifier of the subject
@@ -79,12 +54,12 @@ class Server(object):
entity_id entity_id
""" """
if self.debug: if self.debug:
self.log and self.log.debug("Id map keys: %s" % self.id_map.keys()) self.log and self.log.debug("Id map keys: %s" % self.map.keys())
try: try:
emap = self.id_map[entity_id] emap = self.map[entity_id]
except KeyError: except KeyError:
emap = self.id_map[entity_id] = {"forward":{}, "backward":{}} emap = self.map[entity_id] = {"forward":{}, "backward":{}}
try: try:
if self.debug: if self.debug:
@@ -97,11 +72,43 @@ class Server(object):
break break
emap["forward"][subject_id] = temp_id emap["forward"][subject_id] = temp_id
emap["backward"][temp_id] = subject_id emap["backward"][temp_id] = subject_id
self.id_map[entity_id] = emap self.map[entity_id] = emap
self.id_map.sync() self.map.sync()
return temp_id return temp_id
class Server(object):
""" A class that does things that IdPs or AAs do """
def __init__(self, config_file="", config=None, cache="",
log=None, debug=0):
self.log = log
self.debug = debug
if config_file:
self.load_config(config_file)
elif config:
self.conf = config
self.metadata = self.conf["metadata"]
if cache:
self.cache = Cache(cache)
else:
self.cache = Cache()
def load_config(self, config_file):
self.conf = Config()
self.conf.load_file(config_file)
if "subject_data" in self.conf:
self.id = IdentifierMap(self.conf["subject_data"],
self.debug, self.log)
else:
self.id = None
def issuer(self):
return args2dict( self.conf["entityid"],
format=saml.NAMEID_FORMAT_ENTITY)
def parse_authn_request(self, enc_request): def parse_authn_request(self, enc_request):
"""Parse a Authentication Request """Parse a Authentication Request
@@ -169,9 +176,8 @@ class Server(object):
query = samlp.attribute_query_from_string(xml_string) query = samlp.attribute_query_from_string(xml_string)
assert query.version == VERSION assert query.version == VERSION
self.log and self.log.info( self.log and self.log.info(
"%s ?= %s" % (query.destination, "%s ?= %s" % (query.destination, self.conf.aa_url))
self.conf["service"]["aa"]["url"])) assert query.destination == self.conf.aa_url
assert query.destination == self.conf["service"]["aa"]["url"]
# verify signature # verify signature
@@ -185,93 +191,11 @@ class Server(object):
def find_subject(self, subject, attribute=None): def find_subject(self, subject, attribute=None):
pass pass
def _not_on_or_after(self, sp_entity_id): # ------------------------------------------------------------------------
""" When the assertion stops being valid, should not be
used after this time.
:return: String representation of the time def _response(self, consumer_url, in_response_to, sp_entity_id,
""" identity=None, name_id=None, status=None, sign=False,
if "assertions" in self.conf: policy=Policy()):
try:
spec = self.conf["assertions"][sp_entity_id]["lifetime"]
return in_a_while(**spec)
except KeyError:
try:
spec = self.conf["assertions"]["default"]["lifetime"]
return in_a_while(**spec)
except KeyError:
pass
# default is a hour
return in_a_while(**{"hours":1})
def filter_ava(self, ava, sp_entity_id, required=None, optional=None,
typ="idp"):
""" What attribute and attribute values returns depends on what
the SP has said it wants in the request or in the metadata file and
what the IdP/AA wants to release. An assumption is that what the SP
asks for overrides whatever is in the metadata. But of course the
IdP never releases anything it doesn't want to.
:param ava: All the attributes and values that are available
:param sp_entity_id: The entity ID of the SP
:param required: Attributes that the SP requires in the assertion
:param optional: Attributes that the SP thinks is optional
:param typ: IdPs and AAs does this, and they have different parts
of the configuration.
:return: A possibly modified AVA
"""
try:
assertions = self.conf["service"][typ]["assertions"]
try:
restrictions = assertions[sp_entity_id][
"attribute_restrictions"]
except KeyError:
try:
restrictions = assertions["default"][
"attribute_restrictions"]
except KeyError:
restrictions = None
except KeyError:
restrictions = None
if restrictions:
#print restrictions
ava = filter_attribute_value_assertions(ava, restrictions)
if required or optional:
ava = filter_on_attributes(ava, required, optional)
return ava
def restrict_ava(self, identity, spid):
""" Identity attribute names are expected to be expressed in
the local lingo (== friendlyName)
:param identity: A dictionary with attributes and values
:return: A filtered ava according to the IdPs/AAs rules and
the list of required/optional attributes according to the SP.
If the requirements can't be met an exception is raised.
"""
(required, optional) = self.conf["metadata"].attribute_consumer(spid)
return self.filter_ava(identity, spid, required, optional)
def _conditions(self, sp_entity_id):
return args2dict(
not_before=instant(),
# How long might depend on who's getting it
not_on_or_after=self._not_on_or_after(sp_entity_id),
audience_restriction=args2dict(
audience=args2dict(sp_entity_id)))
def _authn_statement(self):
return args2dict(authn_instant = instant(),
session_index = sid()),
def do_response(self, consumer_url, in_response_to,
sp_entity_id, identity=None, name_id=None,
status=None ):
""" Create a Response that adhers to the ??? profile. """ Create a Response that adhers to the ??? profile.
:param consumer_url: The URL which should receive the response :param consumer_url: The URL which should receive the response
@@ -280,15 +204,17 @@ class Server(object):
:param identity: A dictionary with attributes and values that are :param identity: A dictionary with attributes and values that are
expected to be the bases for the assertion in the response. expected to be the bases for the assertion in the response.
:param name_id: The identifier of the subject :param name_id: The identifier of the subject
:param status: The status of the response
:param sign: Whether the assertion should be signed or not
:param policy: The attribute release policy for this instance
:return: A Response instance :return: A Response instance
""" """
if not status: if not status:
status = success_status_factory() status = success_status_factory()
tmp = response_factory( response = response_factory(
issuer=self.issuer(), issuer=self.issuer(),
in_response_to = in_response_to, in_response_to = in_response_to,
destination = consumer_url, destination = consumer_url,
@@ -296,100 +222,68 @@ class Server(object):
) )
if identity: if identity:
attr_statement = do_ava_statement(identity, ast = Assertion(identity)
self.conf["am_backward"])
# temporary identifier or ??
if not name_id:
name_id = args2dict(sid(),format=saml.NAMEID_FORMAT_TRANSIENT)
# start using now and for a hour
conds = self._conditions(sp_entity_id)
assertion = assertion_factory(
attribute_statement = attr_statement,
authn_statement = self._authn_statement(),
conditions = conds,
subject=args2dict(
name_id=name_id,
method=saml.SUBJECT_CONFIRMATION_METHOD_BEARER,
subject_confirmation=args2dict(
subject_confirmation_data = \
args2dict(in_response_to=in_response_to))),
),
# Store which assertion that has been sent to which SP about which
# subject.
self.cache.set(name_id["text"], sp_entity_id, assertion,
conds["not_on_or_after"])
tmp.update({"assertion":assertion})
return make_instance(samlp.Response, tmp)
# ----------------------------
def error_response(self, destination, in_response_to, spid, exc,
name_id = None):
return self.do_response(
destination, # consumer_url
in_response_to, # in_response_to
spid, # sp_entity_id
name_id,
status = status_from_exception_factory(exc)
)
def do_aa_response(self, consumer_url, in_response_to,
sp_entity_id, identity,
name_id=None, ip_address="", issuer=None, sign=False):
try: try:
identity = self.restrict_ava(identity, sp_entity_id) ast.apply_policy(sp_entity_id, policy, self.metadata)
except MissingValue, exc: except MissingValue, exc:
tmp = self.error_response( consumer_url, in_response_to, response = self.error_response(consumer_url, in_response_to,
sp_entity_id, exc, name_id) sp_entity_id, exc, name_id)
return make_instance(samlp.Response, tmp) return make_instance(samlp.Response, response)
attr_statement = do_ava_statement(identity, self.conf["am_backward"]) assertion = ast.construct(sp_entity_id, in_response_to, name_id,
self.conf.attribute_converters(),
# temporary identifier or ?? policy)
if not name_id:
name_id = args2dict(sid(), format=saml.NAMEID_FORMAT_TRANSIENT)
conds = self._conditions(sp_entity_id)
assertion = assertion_factory(
subject = args2dict(
name_id = name_id,
method = saml.SUBJECT_CONFIRMATION_METHOD_BEARER,
subject_confirmation = args2dict(
subject_confirmation_data = \
args2dict(
in_response_to = in_response_to,
not_on_or_after = self._not_on_or_after(sp_entity_id),
address = ip_address,
recipient = consumer_url))),
attribute_statement = attr_statement,
authn_statement = self._authn_statement(),
conditions = conds
)
if sign: if sign:
assertion["signature"] = pre_signature_part(assertion["id"]) assertion["signature"] = pre_signature_part(assertion["id"])
#print name_id # Store which assertion that has been sent to which SP about which
#print conds # subject.
self.cache.set(name_id["text"], sp_entity_id, assertion, print assertion
conds["not_on_or_after"])
tmp = response_factory( self.cache.set(assertion["subject"]["name_id"]["text"],
issuer=issuer, sp_entity_id, assertion,
in_response_to=in_response_to, assertion["conditions"]["not_on_or_after"])
destination=consumer_url,
status=success_status_factory(), response.update({"assertion":assertion})
assertion=assertion,
return make_instance(samlp.Response, response)
# ------------------------------------------------------------------------
def do_response(self, consumer_url, in_response_to,
sp_entity_id, identity=None, name_id=None,
status=None, sign=False ):
return self._response(consumer_url, in_response_to,
sp_entity_id, identity, name_id,
status, sign, self.conf.idp_policy())
# ------------------------------------------------------------------------
def error_response(self, destination, in_response_to, spid, exc,
name_id=None):
return self._response(
destination, # consumer_url
in_response_to, # in_response_to
spid, # sp_entity_id
None, # identity
name_id,
status = status_from_exception_factory(exc)
) )
return make_instance(samlp.Response, tmp) # ------------------------------------------------------------------------
def do_aa_response(self, consumer_url, in_response_to, sp_entity_id,
identity=None, name_id=None, ip_address="",
issuer=None, status=None, sign=False):
return self._response(consumer_url, in_response_to,
sp_entity_id, identity, name_id,
status, sign, policy=self.conf.aa_policy())
# ------------------------------------------------------------------------
def authn_response(self, identity, in_response_to, destination, spid, def authn_response(self, identity, in_response_to, destination, spid,
name_id_policy, userid): name_id_policy, userid):
@@ -408,14 +302,13 @@ class Server(object):
name_id = None name_id = None
if name_id_policy.sp_name_qualifier: if name_id_policy.sp_name_qualifier:
try: try:
vo_conf = self.conf["virtual_organization"][ vo_conf = self.conf.vo_conf(name_id_policy.sp_name_qualifier)
name_id_policy.sp_name_qualifier]
subj_id = identity[vo_conf["common_identifier"]] subj_id = identity[vo_conf["common_identifier"]]
except KeyError: except KeyError:
self.log.info( self.log.info(
"Get persistent ID (%s,%s)" % ( "Get persistent ID (%s,%s)" % (
name_id_policy.sp_name_qualifier,userid)) name_id_policy.sp_name_qualifier,userid))
subj_id = self.persistent_id(name_id_policy.sp_name_qualifier, subj_id = self.id.persistent(name_id_policy.sp_name_qualifier,
userid) userid)
self.log.info("=> %s" % subj_id) self.log.info("=> %s" % subj_id)
@@ -423,9 +316,7 @@ class Server(object):
format=saml.NAMEID_FORMAT_PERSISTENT, format=saml.NAMEID_FORMAT_PERSISTENT,
sp_name_qualifier=name_id_policy.sp_name_qualifier) sp_name_qualifier=name_id_policy.sp_name_qualifier)
# Do attribute-value filtering
try: try:
identity = self.restrict_ava(identity, spid)
resp = self.do_response( resp = self.do_response(
destination, # consumer_url destination, # consumer_url
in_response_to, # in_response_to in_response_to, # in_response_to

View File

@@ -27,6 +27,7 @@ class OtherError(Exception):
class MissingValue(Exception): class MissingValue(Exception):
pass pass
EXCEPTION2STATUS = { EXCEPTION2STATUS = {
VersionMismatch: samlp.STATUS_VERSION_MISMATCH, VersionMismatch: samlp.STATUS_VERSION_MISMATCH,
UnknownPrincipal: samlp.STATUS_UNKNOWN_PRINCIPAL, UnknownPrincipal: samlp.STATUS_UNKNOWN_PRINCIPAL,
@@ -118,36 +119,6 @@ def parse_attribute_map(filenames):
return (forward, backward) return (forward, backward)
def filter_attribute_value_assertions(ava, attribute_restrictions=None):
""" Will weed out attribute values and values according to the
rules defined in the attribute restrictions. If filtering results in
an attribute without values, then the attribute is removed from the
assertion.
:param ava: The incoming attribute value assertion
:param attribute_restrictions: The rules that govern which attributes
and values that are allowed.
:return: A attribute value assertion
"""
if not attribute_restrictions:
return ava
resava = {}
for attr, vals in ava.items():
if attr in attribute_restrictions:
if attribute_restrictions[attr] == None:
resava[attr] = vals
else:
rvals = []
for restr in attribute_restrictions[attr]:
for val in vals:
if restr.match(val):
rvals.append(val)
if rvals:
resava[attr] = list(set(rvals))
return resava
def identity_attribute(form, attribute, forward_map=None): def identity_attribute(form, attribute, forward_map=None):
if form == "friendly": if form == "friendly":
if attribute.friendly_name: if attribute.friendly_name:
@@ -160,84 +131,6 @@ def identity_attribute(form, attribute, forward_map=None):
# default is name # default is name
return attribute.name return attribute.name
def _filter_values(vals, required=None, optional=None):
""" Removes values from *val* that does not appear in *attributes*.
:param val: The values that are to be filtered
:param required: The required values
:param optional: The optional values
:return: The set of values after filtering
"""
if not required and not optional:
return vals
valr = []
valo = []
if required:
rvals = [v.text for v in required]
else:
rvals = []
if optional:
ovals = [v.text for v in optional]
else:
ovals = []
for val in vals:
if val in rvals:
valr.append(val)
elif val in ovals:
valo.append(val)
valo.extend(valr)
if rvals:
if len(rvals) == len(valr):
return valo
else:
raise MissingValue("Required attribute value missing")
else:
return valo
def _combine(required=None, optional=None):
res = {}
if not required:
required = []
if not optional:
optional = []
for attr in required:
part = None
for oat in optional:
if attr.name == oat.name:
part = (attr.attribute_value, oat.attribute_value)
break
if part:
res[(attr.name, attr.friendly_name)] = part
else:
res[(attr.name, attr.friendly_name)] = (attr.attribute_value, [])
for oat in optional:
tag = (oat.name, oat.friendly_name)
if tag not in res:
res[tag] = ([], oat.attribute_value)
return res
def filter_on_attributes(ava, required=None, optional=None):
""" Filter
:param required: list of RequestedAttribute instances
"""
res = {}
comb = _combine(required, optional)
for attr, vals in comb.items():
if attr[0] in ava:
res[attr[0]] = _filter_values(ava[attr[0]], vals[0], vals[1])
elif attr[1] in ava:
res[attr[1]] = _filter_values(ava[attr[1]], vals[0], vals[1])
else:
raise MissingValue("Required attribute missing")
return res
#---------------------------------------------------------------------------- #----------------------------------------------------------------------------
def _properties(klass): def _properties(klass):
@@ -326,42 +219,6 @@ def _basic_val(val):
return attrval return attrval
def basic_attribute(attribute, value):
"""<saml:Attribute NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:basic" Name="cn">
<saml:AttributeValue xsi:type="xs:string">Andreas Solberg</saml:AttributeValue>
</saml:Attribute>
"""
attr = {}
attr["name_format"] = NAME_FORMAT_BASIC
attr["name"] = attribute
return attr
def ava_to_attributes(ava, bmap=None):
attrs = []
for friendly_name, val in ava.items():
dic = {}
attrval = _attrval(val)
if attrval:
dic["attribute_value"] = attrval
dic["friendly_name"] = friendly_name
if bmap:
try:
(dic["name"], dic["name_format"]) = bmap[friendly_name]
except KeyError:
pass
attrs.append(args2dict(**dic))
return attrs
def do_ava_statement(identity, bmap):
"""
:param identity: A dictionary with fiendly names as keys
:return:
"""
return args2dict(attribute=ava_to_attributes(identity, bmap))
def do_attributes(identity): def do_attributes(identity):
attrs = [] attrs = []
if not identity: if not identity:

View File

@@ -7,3 +7,23 @@ def pytest_funcarg__xmlsec(request):
return fil return fil
raise Exception("Can't find xmlsec1") raise Exception("Can't find xmlsec1")
def pytest_funcarg__AVA(request):
return [
{
"surName": ["Jeter"],
"givenName": ["Derek"],
},
{
"surName": ["Howard"],
"givenName": ["Ryan"],
},
{
"surName": ["Suzuki"],
"givenName": ["Ischiro"],
},
{
"surName": ["Hedberg"],
"givenName": ["Roland"],
},
]

View File

@@ -4,10 +4,11 @@
"idp": { "idp": {
"name" : "Rolands IdP", "name" : "Rolands IdP",
"url": "http://localhost:8088/sso", "url": "http://localhost:8088/sso",
"assertions": { "policy": {
"default": { "default": {
"lifetime": {"minutes":15}, "lifetime": {"minutes":15},
"attribute_restrictions": None # means all I have "attribute_restrictions": None, # means all I have
"name_form": "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
}, },
"urn:mace:example.com:saml:roland:sp": { "urn:mace:example.com:saml:roland:sp": {
"lifetime": {"minutes": 5}, "lifetime": {"minutes": 5},
@@ -27,5 +28,5 @@
"local": ["metadata.xml", "vo_metadata.xml"], "local": ["metadata.xml", "vo_metadata.xml"],
}, },
"subject_data": "subject_data.db", "subject_data": "subject_data.db",
"attribute_maps" : ["attribute.map"] "attribute_map_dir" : "attributemaps",
} }

View File

@@ -7,6 +7,26 @@
"assertions": { "assertions": {
"default": { "default": {
"lifetime": {"minutes":15}, "lifetime": {"minutes":15},
"name_form": "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
},
"urn:mace:example.com:saml:roland:sp": {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"givenName": None,
"surName": None,
"mail": [".*@example.com"],
"eduPersonAffiliation": ["(employee|staff|faculty)"],
}
}
}
},
"aa": {
"name" : "Rolands restrictied AA",
"url": "http://localhost:8089/sso",
"assertions": {
"default": {
"lifetime": {"minutes":15},
"name_form": "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
}, },
"urn:mace:example.com:saml:roland:sp": { "urn:mace:example.com:saml:roland:sp": {
"lifetime": {"minutes": 5}, "lifetime": {"minutes": 5},
@@ -27,5 +47,5 @@
"local": ["sp_0.metadata"], "local": ["sp_0.metadata"],
}, },
"subject_data": "subject_data.db", "subject_data": "subject_data.db",
"attribute_maps" : ["attribute.map"] "attribute_map_dir" : "attributemaps",
} }

View File

@@ -24,4 +24,5 @@
}, },
"subject_data": "subject_data.db", "subject_data": "subject_data.db",
"accept_time_diff": 60, "accept_time_diff": 60,
"attribute_map_dir" : "attributemaps",
} }

View File

@@ -8,7 +8,6 @@ import gzip
from saml2 import utils, saml, samlp, md, make_instance from saml2 import utils, saml, samlp, md, make_instance
from saml2.utils import do_attribute_statement from saml2.utils import do_attribute_statement
from saml2.sigver import make_temp from saml2.sigver import make_temp
from saml2.config import do_assertions
from saml2.saml import Attribute, NAME_FORMAT_URI, AttributeValue from saml2.saml import Attribute, NAME_FORMAT_URI, AttributeValue
from py.test import raises from py.test import raises
@@ -278,104 +277,6 @@ def test_subject():
AVA = [
{
"surName": ["Jeter"],
"givenName": ["Derek"],
},
{
"surName": ["Howard"],
"givenName": ["Ryan"],
},
{
"surName": ["Suzuki"],
"givenName": ["Ischiro"],
},
{
"surName": ["Hedberg"],
"givenName": ["Roland"],
},
]
def test_filter_attribute_value_assertions_0():
assertion = {
"default": {
"attribute_restrictions": {
"surName": [".*berg"],
}
}
}
ass = do_assertions(assertion)
print ass
ava = utils.filter_attribute_value_assertions(AVA[3],
ass["default"]["attribute_restrictions"])
print ava
assert ava.keys() == ["surName"]
assert ava["surName"] == ["Hedberg"]
def test_filter_attribute_value_assertions_1():
assertion = {
"default": {
"attribute_restrictions": {
"surName": None,
"givenName": [".*er.*"],
}
}
}
ass = do_assertions(assertion)
print ass
ava = utils.filter_attribute_value_assertions(AVA[0],
ass["default"]["attribute_restrictions"])
print ava
assert _eq(ava.keys(), ["givenName","surName"])
assert ava["surName"] == ["Jeter"]
assert ava["givenName"] == ["Derek"]
ava = utils.filter_attribute_value_assertions(AVA[1],
ass["default"]["attribute_restrictions"])
print ava
assert _eq(ava.keys(), ["surName"])
assert ava["surName"] == ["Howard"]
def test_filter_attribute_value_assertions_2():
assertion = {
"default": {
"attribute_restrictions": {
"givenName": ["^R.*"],
}
}
}
ass = do_assertions(assertion)
print ass
ava = utils.filter_attribute_value_assertions(AVA[0],
ass["default"]["attribute_restrictions"])
print ava
assert _eq(ava.keys(), [])
ava = utils.filter_attribute_value_assertions(AVA[1],
ass["default"]["attribute_restrictions"])
print ava
assert _eq(ava.keys(), ["givenName"])
assert ava["givenName"] == ["Ryan"]
ava = utils.filter_attribute_value_assertions(AVA[3],
ass["default"]["attribute_restrictions"])
print ava
assert _eq(ava.keys(), ["givenName"])
assert ava["givenName"] == ["Roland"]
def test_parse_attribute_map(): def test_parse_attribute_map():
(forward, backward) = utils.parse_attribute_map(["attribute.map"]) (forward, backward) = utils.parse_attribute_map(["attribute.map"])
@@ -442,128 +343,6 @@ def test_identity_attribute_4():
# if there would be a map it would be serialNumber # if there would be a map it would be serialNumber
assert utils.identity_attribute("friendly",a) == "serialNumber" assert utils.identity_attribute("friendly",a) == "serialNumber"
def test_combine_0():
r = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber")
o = Attribute(name="urn:oid:2.5.4.4", name_format=NAME_FORMAT_URI,
friendly_name="surName")
comb = utils._combine([r],[o])
print comb
assert _eq(comb.keys(), [('urn:oid:2.5.4.5', 'serialNumber'),
('urn:oid:2.5.4.4', 'surName')])
assert comb[('urn:oid:2.5.4.5', 'serialNumber')] == ([], [])
assert comb[('urn:oid:2.5.4.4', 'surName')] == ([], [])
def test_filter_on_attributes_0():
a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber")
required = [a]
ava = { "serialNumber": ["12345"]}
ava = utils.filter_on_attributes(ava, required)
assert ava.keys() == ["serialNumber"]
assert ava["serialNumber"] == ["12345"]
def test_filter_on_attributes_1():
a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber")
required = [a]
ava = { "serialNumber": ["12345"], "givenName":["Lars"]}
ava = utils.filter_on_attributes(ava, required)
assert ava.keys() == ["serialNumber"]
assert ava["serialNumber"] == ["12345"]
def test_filter_values_req_2():
a1 = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber")
a2 = Attribute(name="urn:oid:2.5.4.4", name_format=NAME_FORMAT_URI,
friendly_name="surName")
required = [a1,a2]
ava = { "serialNumber": ["12345"], "givenName":["Lars"]}
raises(utils.MissingValue, utils.filter_on_attributes, ava, required)
def test_filter_values_req_3():
a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber", attribute_value=[
AttributeValue(text="12345")])
required = [a]
ava = { "serialNumber": ["12345"]}
ava = utils.filter_on_attributes(ava, required)
assert ava.keys() == ["serialNumber"]
assert ava["serialNumber"] == ["12345"]
def test_filter_values_req_4():
a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber", attribute_value=[
AttributeValue(text="54321")])
required = [a]
ava = { "serialNumber": ["12345"]}
raises(utils.MissingValue, utils.filter_on_attributes, ava, required)
def test_filter_values_req_5():
a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber", attribute_value=[
AttributeValue(text="12345")])
required = [a]
ava = { "serialNumber": ["12345", "54321"]}
ava = utils.filter_on_attributes(ava, required)
assert ava.keys() == ["serialNumber"]
assert ava["serialNumber"] == ["12345"]
def test_filter_values_req_6():
a = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber", attribute_value=[
AttributeValue(text="54321")])
required = [a]
ava = { "serialNumber": ["12345", "54321"]}
ava = utils.filter_on_attributes(ava, required)
assert ava.keys() == ["serialNumber"]
assert ava["serialNumber"] == ["54321"]
def test_filter_values_req_opt_0():
r = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber", attribute_value=[
AttributeValue(text="54321")])
o = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber", attribute_value=[
AttributeValue(text="12345")])
ava = { "serialNumber": ["12345", "54321"]}
ava = utils.filter_on_attributes(ava, [r], [o])
assert ava.keys() == ["serialNumber"]
assert _eq(ava["serialNumber"], ["12345","54321"])
def test_filter_values_req_opt_1():
r = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber", attribute_value=[
AttributeValue(text="54321")])
o = Attribute(name="urn:oid:2.5.4.5", name_format=NAME_FORMAT_URI,
friendly_name="serialNumber", attribute_value=[
AttributeValue(text="12345"),
AttributeValue(text="abcd0")])
ava = { "serialNumber": ["12345", "54321"]}
ava = utils.filter_on_attributes(ava, [r], [o])
assert ava.keys() == ["serialNumber"]
assert _eq(ava["serialNumber"], ["12345","54321"])
def _givenName(a): def _givenName(a):
assert a["name"] == "urn:oid:2.5.4.42" assert a["name"] == "urn:oid:2.5.4.42"
assert a["friendly_name"] == "givenName" assert a["friendly_name"] == "givenName"
@@ -576,22 +355,6 @@ def _surName(a):
assert len(a["attribute_value"]) == 1 assert len(a["attribute_value"]) == 1
assert a["attribute_value"] == [{"text":"Jeter"}] assert a["attribute_value"] == [{"text":"Jeter"}]
def test_ava_to_attributes():
(forward, backward) = utils.parse_attribute_map(["attribute.map"])
attrs = utils.ava_to_attributes(AVA[0], backward)
assert len(attrs) == 2
a = attrs[0]
if a["name"] == "urn:oid:2.5.4.42":
_givenName(a)
_surName(attrs[1])
elif a["name"] == "urn:oid:2.5.4.4":
_surName(a)
_givenName(attrs[1])
else:
print a
assert False
def test_nameformat_email(): def test_nameformat_email():
assert utils.valid_email("foo@example.com") assert utils.valid_email("foo@example.com")
assert utils.valid_email("a@b.com") assert utils.valid_email("a@b.com")

View File

@@ -33,7 +33,7 @@ def _read_lines(name):
def test_swami_1(): def test_swami_1():
md = metadata.MetaData() md = metadata.MetaData()
md.import_metadata(_read_file(SWAMI_METADATA)) md.import_metadata(_read_file(SWAMI_METADATA),"-")
print len(md.entity) print len(md.entity)
assert len(md.entity) assert len(md.entity)
idps = dict([(id,ent["idp_sso"]) for id,ent in md.entity.items() \ idps = dict([(id,ent["idp_sso"]) for id,ent in md.entity.items() \
@@ -52,7 +52,7 @@ def test_swami_1():
def test_incommon_1(): def test_incommon_1():
md = metadata.MetaData() md = metadata.MetaData()
md.import_metadata(_read_file(INCOMMON_METADATA)) md.import_metadata(_read_file(INCOMMON_METADATA),"-")
print len(md.entity) print len(md.entity)
assert len(md.entity) == 442 assert len(md.entity) == 442
idps = dict([ idps = dict([
@@ -67,7 +67,7 @@ def test_incommon_1():
def test_example(): def test_example():
md = metadata.MetaData() md = metadata.MetaData()
md.import_metadata(_read_file(EXAMPLE_METADATA)) md.import_metadata(_read_file(EXAMPLE_METADATA), "-")
print len(md.entity) print len(md.entity)
assert len(md.entity) == 1 assert len(md.entity) == 1
idps = dict([(id,ent["idp_sso"]) for id,ent in md.entity.items() \ idps = dict([(id,ent["idp_sso"]) for id,ent in md.entity.items() \
@@ -82,7 +82,7 @@ def test_example():
def test_switch_1(): def test_switch_1():
md = metadata.MetaData() md = metadata.MetaData()
md.import_metadata(_read_file(SWITCH_METADATA)) md.import_metadata(_read_file(SWITCH_METADATA), "-")
print len(md.entity) print len(md.entity)
assert len(md.entity) == 90 assert len(md.entity) == 90
idps = dict([(id,ent["idp_sso"]) for id,ent in md.entity.items() \ idps = dict([(id,ent["idp_sso"]) for id,ent in md.entity.items() \
@@ -110,13 +110,13 @@ def test_switch_1():
def test_sp_metadata(): def test_sp_metadata():
md = metadata.MetaData() md = metadata.MetaData()
md.import_metadata(_read_file(SP_METADATA)) md.import_metadata(_read_file(SP_METADATA), "-")
print md.entity print md.entity
assert len(md.entity) == 1 assert len(md.entity) == 1
assert md.entity.keys() == ['urn:mace:umu.se:saml:roland:sp'] assert md.entity.keys() == ['urn:mace:umu.se:saml:roland:sp']
assert md.entity['urn:mace:umu.se:saml:roland:sp'].keys() == [ assert md.entity['urn:mace:umu.se:saml:roland:sp'].keys() == [
"organization","sp_sso"] 'valid_until',"organization","sp_sso"]
print md.entity['urn:mace:umu.se:saml:roland:sp']["sp_sso"][0].keyswv() print md.entity['urn:mace:umu.se:saml:roland:sp']["sp_sso"][0].keyswv()
(req,opt) = md.attribute_consumer('urn:mace:umu.se:saml:roland:sp') (req,opt) = md.attribute_consumer('urn:mace:umu.se:saml:roland:sp')
print req print req

View File

@@ -156,17 +156,9 @@ def test_idp():
c = Config().load(IDP1) c = Config().load(IDP1)
print c print c
service = c["service"] assert c.services() == ["idp"]
assert c.idp_url() == "http://localhost:8088/"
assert service.keys() == ["idp"] attribute_restrictions = c.idp_policy().get_attribute_restriction("")
idp = service["idp"] assert attribute_restrictions["eduPersonAffiliation"][0].match("staff")
assert _eq(idp.keys(),['url', 'name', 'assertions'])
assert idp["url"] == "http://localhost:8088/"
default_attribute_restrictions = idp["assertions"]["default"][
"attribute_restrictions"]
assert default_attribute_restrictions[
"eduPersonAffiliation"][0].match("staff")

View File

@@ -161,8 +161,8 @@ def test_6():
assert dr.uri == "#invoice34" assert dr.uri == "#invoice34"
assert len(dr.extension_elements) == 1 assert len(dr.extension_elements) == 1
ee = dr.extension_elements[0] ee = dr.extension_elements[0]
assert ee.c_tag == "Transforms" assert ee.tag == "Transforms"
assert ee.c_namespace == "http://www.w3.org/2000/09/xmldsig#" assert ee.namespace == "http://www.w3.org/2000/09/xmldsig#"
trs = saml2.extension_element_to_element(ee, xmldsig.ELEMENT_FROM_STRING, trs = saml2.extension_element_to_element(ee, xmldsig.ELEMENT_FROM_STRING,
namespace=xmldsig.NAMESPACE) namespace=xmldsig.NAMESPACE)

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from saml2.server import Server from saml2.server import Server, IdentifierMap
from saml2 import server, make_instance from saml2 import server, make_instance
from saml2 import samlp, saml, client, utils from saml2 import samlp, saml, client, utils
from saml2.utils import OtherError from saml2.utils import OtherError
@@ -13,6 +13,14 @@ import re
def _eq(l1,l2): def _eq(l1,l2):
return set(l1) == set(l2) return set(l1) == set(l2)
def test_persistence_0():
id = IdentifierMap("subject_data.db")
pid1 = id.persistent("urn:mace:example.com:saml:roland:sp", "jeter")
pid2 = id.persistent("urn:mace:example.com:saml:roland:sp", "jeter")
print pid1, pid2
assert pid1 == pid2
class TestServer1(): class TestServer1():
def setup_class(self): def setup_class(self):
@@ -171,7 +179,7 @@ class TestServer1():
"http://localhost:8087/", # consumer_url "http://localhost:8087/", # consumer_url
"12", # in_response_to "12", # in_response_to
"urn:mace:example.com:saml:roland:sp", # sp_entity_id "urn:mace:example.com:saml:roland:sp", # sp_entity_id
{ "eduPersonEntitlement": "Bat"} { "eduPersonEntitlement": "Short stop"}
) )
print resp.keyswv() print resp.keyswv()
@@ -197,7 +205,7 @@ class TestServer1():
assert attribute.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7" assert attribute.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7"
assert attribute.name_format == "urn:oasis:names:tc:SAML:2.0:attrname-format:uri" assert attribute.name_format == "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
value = attribute.attribute_value[0] value = attribute.attribute_value[0]
assert value.text.strip() == "Bat" assert value.text.strip() == "Short stop"
assert value.type == "xs:string" assert value.type == "xs:string"
assert assertion.subject assert assertion.subject
assert assertion.subject.name_id assert assertion.subject.name_id
@@ -244,107 +252,7 @@ class TestServer1():
assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp" assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp"
assert not resp.assertion assert not resp.assertion
def test_persistence_0(self):
pid1 = self.server.persistent_id(
"urn:mace:example.com:saml:roland:sp", "jeter")
pid2 = self.server.persistent_id(
"urn:mace:example.com:saml:roland:sp", "jeter")
print pid1, pid2
assert pid1 == pid2
def test_filter_ava_0(self):
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [])
assert _eq(ava.keys(), ["givenName", "surName", "mail"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
assert ava["mail"] == ["derek@nyy.mlb.com"]
def test_filter_ava_1(self):
""" No mail address returned """
self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"] = {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"givenName": None,
"surName": None,
}
}
print self.server.conf["service"]["idp"]["assertions"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [])
assert _eq(ava.keys(), ["givenName", "surName"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
def test_filter_ava_2(self):
""" Only mail returned """
self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"] = {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"mail": None,
}
}
print self.server.conf["service"]["idp"]["assertions"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [])
assert _eq(ava.keys(), ["mail"])
assert ava["mail"] == ["derek@nyy.mlb.com"]
def test_filter_ava_3(self):
""" Only example.com mail addresses returned """
self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"] = {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"mail": [re.compile(".*@example\.com$")],
}
}
print self.server.conf["service"]["idp"]["assertions"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com", "dj@example.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [])
assert _eq(ava.keys(), ["mail"])
assert ava["mail"] == ["dj@example.com"]
def test_authn_response_0(self): def test_authn_response_0(self):
# reset
del self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"], ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]} "mail": ["derek@nyy.mlb.com"]}
@@ -385,24 +293,10 @@ class TestServer2():
except IOError, e: except IOError, e:
self.server = Server("tests/restrictive_idp.config") self.server = Server("tests/restrictive_idp.config")
def test_0(self):
ident = self.server.restrict_ava(IDENTITY.copy(),
"urn:mace:example.com:saml:roland:sp")
assert len(ident) == 3
assert ident == {'eduPersonAffiliation': ['staff'],
'givenName': ['Derek'], 'surName': ['Jeter']}
print self.server.conf.keys()
attr = utils.ava_to_attributes(ident, self.server.conf["am_backward"])
assert len(attr) == 3
assert {'attribute_value': [{'text': 'staff'}],
'friendly_name': 'eduPersonAffiliation',
'name': 'urn:oid:1.3.6.1.4.1.5923.1.1.1.1',
'name_format': 'urn:oasis:names:tc:SAML:2.0:attrname-format:uri'} in attr
def test_do_aa_reponse(self): def test_do_aa_reponse(self):
aa_policy = self.server.conf.aa_policy()
print aa_policy.__dict__
print self.server.conf["service"]
response = self.server.do_aa_response( "http://example.com/sp/", "aaa", response = self.server.do_aa_response( "http://example.com/sp/", "aaa",
"urn:mace:example.com:sp:1", IDENTITY.copy(), "urn:mace:example.com:sp:1", IDENTITY.copy(),
issuer = self.server.conf["entityid"]) issuer = self.server.conf["entityid"])