Lots and lots of changes, sorry should be separated but that won't happen

This commit is contained in:
Roland Hedberg
2009-11-17 10:43:42 +01:00
parent b9a48efbfa
commit 8b3be05ddf
18 changed files with 941 additions and 610 deletions

View File

@@ -2,9 +2,9 @@
import re
import base64
from cgi import escape
from cgi import escape, parse_qs
import urllib
import urlparse
#import urlparse
from saml2 import server
from saml2.utils import make_instance, sid, decode_base64_and_inflate
@@ -76,7 +76,7 @@ def sso(environ, start_response, user, logger):
logger and logger.info("Environ keys: %s" % environ.keys())
if "QUERY_STRING" in environ:
logger and logger.info("Query string: %s" % environ["QUERY_STRING"])
query = urlparse.parse_qs(environ["QUERY_STRING"])
query = parse_qs(environ["QUERY_STRING"])
elif "s2repoze.qinfo" in environ:
query = environ["s2repoze.qinfo"]
# base 64 encoded request
@@ -125,7 +125,7 @@ def not_found(environ, start_response, logger):
def not_authn(environ, start_response, logger):
if "QUERY_STRING" in environ:
query = urlparse.parse_qs(environ["QUERY_STRING"])
query = parse_qs(environ["QUERY_STRING"])
logger and logger.info("query: %s" % query)
start_response('401 Unauthorized', [('Content-Type', 'text/plain')])
return ['Unknown user']

View File

@@ -1,5 +1,5 @@
{
"entityid" : "urn:mace:umu.se:saml:roland:idp",
"entityid" : "urn:mace:example.com:saml:roland:idp",
"service": ["idp"],
"my_name" : "Rolands IdP",
"debug" : 1,

View File

@@ -1,7 +1,60 @@
from paste.request import construct_url
import zope.interface
from repoze.who.interfaces import IRequestClassifier
from repoze.who.interfaces import IChallengeDecider
from paste.httpheaders import REQUEST_METHOD
from paste.httpheaders import CONTENT_TYPE
from paste.httpheaders import USER_AGENT
from paste.httpheaders import WWW_AUTHENTICATE
import re
_DAV_METHODS = (
'OPTIONS',
'PROPFIND',
'PROPPATCH',
'MKCOL',
'LOCK',
'UNLOCK',
'TRACE',
'DELETE',
'COPY',
'MOVE'
)
_DAV_USERAGENTS = (
'Microsoft Data Access Internet Publishing Provider',
'WebDrive',
'Zope External Editor',
'WebDAVFS',
'Goliath',
'neon',
'davlib',
'wsAPI',
'Microsoft-WebDAV'
)
def my_request_classifier(environ):
""" Returns one of the classifiers 'dav', 'xmlpost', or 'browser',
depending on the imperative logic below"""
request_method = REQUEST_METHOD(environ)
if request_method in _DAV_METHODS:
return 'dav'
useragent = USER_AGENT(environ)
if useragent:
for agent in _DAV_USERAGENTS:
if useragent.find(agent) != -1:
return 'dav'
if request_method == 'POST':
if CONTENT_TYPE(environ) == 'text/xml':
return 'xmlpost'
elif CONTENT_TYPE(environ) == "application/soap+xml":
return 'soap'
return 'browser'
zope.interface.directlyProvides(my_request_classifier, IRequestClassifier)
class my_challenge_decider:
def __init__(self,path_login=""):
self.path_login = path_login

View File

@@ -2,39 +2,36 @@ import ConfigParser, os
from zope.interface import implements
from repoze.who.interfaces import IChallenger, IIdentifier, IAuthenticator
#from repoze.who.interfaces import IChallenger, IIdentifier, IAuthenticator
from repoze.who.interfaces import IMetadataProvider
class INIMetadataProvider(object):
implements(IMetadataProvider)
def __init__(self, ini_file):
def __init__(self, ini_file, key_attribute):
self.users = ConfigParser.ConfigParser()
self.users.readfp(open(ini_file))
self.key_attribute = key_attribute
# def authenticate(self, environ, identity):
# try:
# username = identity['login']
# password = identity['password']
# except KeyError:
# return None
#
# success = User.authenticate(username, password)
#
# return success
def add_metadata(self, environ, identity):
logger = environ.get('repoze.who.logger','')
username = identity.get('repoze.who.userid')
logger and logger.info("Identity: %s (before)" % (identity.items(),))
key = identity.get('repoze.who.userid')
#logger and logger.info("Identity: %s (before)" % (identity.items(),))
try:
identity["user"] = self.users.items(username)
logger and logger.info("Identity: %s (after)" % (identity.items(),))
if self.key_attribute:
for sec in self.users.sections():
if self.users.has_option(sec,self.key_attribute):
if key in self.users.get(sec, self.key_attribute):
identity["user"] = dict(self.users.items(sec))
break
else:
identity["user"] = dict(self.users.items(key))
#logger and logger.info("Identity: %s (after)" % (identity.items(),))
except ValueError:
pass
def make_plugin(ini_file):
return INIMetadataProvider(ini_file)
def make_plugin(ini_file, key_attribute=""):
return INIMetadataProvider(ini_file, key_attribute)

View File

@@ -23,6 +23,7 @@ import urlparse
import urllib
import cgi
import os
import time
from paste.httpheaders import CONTENT_LENGTH
from paste.httpheaders import CONTENT_TYPE
@@ -48,6 +49,7 @@ from saml2.attribute_resolver import AttributeResolver
from saml2.metadata import MetaData
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.config import Config
from saml2.cache import Cache
def construct_came_from(environ):
""" The URL that the user used when the process where interupted
@@ -65,8 +67,8 @@ class SAML2Plugin(FormPluginBase):
implements(IChallenger, IIdentifier, IAuthenticator, IMetadataProvider)
def __init__(self, rememberer_name, saml_conf_file, store,
path_logout, path_toskip, debug):
def __init__(self, rememberer_name, saml_conf_file, virtual_organization,
cache, path_logout, path_toskip, debug):
self.rememberer_name = rememberer_name
self.path_logout = path_logout
@@ -75,7 +77,17 @@ class SAML2Plugin(FormPluginBase):
self.conf = Config()
self.conf.load_file(saml_conf_file)
self.sp = self.conf["service"]["sp"]
if virtual_organization:
self.vo = virtual_organization
try:
self.vo_conf = self.conf[
"virtual_organization"][virtual_organization]
except KeyError:
self.vo = None
else:
self.vo = None
try:
self.metadata = self.conf["metadata"]
except KeyError:
@@ -83,10 +95,10 @@ class SAML2Plugin(FormPluginBase):
self.outstanding_authn = {}
self.iam = os.uname()[1]
if store==u"file":
self.store = shelve.open(store_filename)
elif store==u"mem":
self.store = {}
if cache:
self.cache = Cache(cache)
else:
self.cache = Cache()
#### IChallenger ####
def challenge(self, environ, status, app_headers, forget_headers):
@@ -104,11 +116,20 @@ class SAML2Plugin(FormPluginBase):
came_from = construct_came_from(environ)
if self.debug:
logger and logger.info("RelayState >> %s" % came_from)
try:
vo = environ["myapp.vo"]
except KeyError:
vo = self.vo
logger and logger.info("VO: %s" % vo)
# If more than one idp, I have to do wayf
(sid, result) = cl.authenticate(self.conf["entityid"],
self.conf["idp_url"],
self.conf["service_url"],
self.conf["my_name"],
relay_state=came_from, log=logger)
self.conf["idp"]["url"][0],
self.sp["url"],
self.sp["my_name"],
relay_state=came_from,
log=logger,
vo=vo)
self.outstanding_authn[sid] = came_from
if self.debug:
@@ -116,7 +137,7 @@ class SAML2Plugin(FormPluginBase):
if isinstance(result, tuple):
return HTTPTemporaryRedirect(headers=[result])
else :
# possible to normally not used
# possible though normally not used
body = "\n".join(result)
def auth_form(environ, start_response):
content_length = CONTENT_LENGTH.tuples(str(len(result)))
@@ -133,8 +154,8 @@ class SAML2Plugin(FormPluginBase):
uri = environ.get('REQUEST_URI',construct_url(environ))
if self.debug:
logger and logger.info("environ.keys(): %s" % environ.keys())
logger and logger.info("Environment: %s" % environ)
#logger and logger.info("environ.keys(): %s" % environ.keys())
#logger and logger.info("Environment: %s" % environ)
logger and logger.info('identify uri: %s' % (uri,))
query = parse_dict_querystring(environ)
@@ -166,6 +187,13 @@ class SAML2Plugin(FormPluginBase):
post_env = environ.copy()
post_env['QUERY_STRING'] = ''
if environ["CONTENT_LENGTH"]:
body = environ["wsgi.input"].read(int(environ["CONTENT_LENGTH"]))
from StringIO import StringIO
environ['wsgi.input'] = StringIO(body)
environ['s2repoze.body'] = body
post = cgi.FieldStorage(
fp=environ['wsgi.input'],
environ=post_env,
@@ -173,18 +201,26 @@ class SAML2Plugin(FormPluginBase):
)
if self.debug:
logger and logger.info('identify post keys: %s' % (post.keys(),))
logger and logger.info('identify post: %s' % (post,))
try:
if not post.has_key("SAMLResponse"):
environ["post.fieldstorage"] = post
return {}
except TypeError:
environ["post.fieldstorage"] = post
return {}
# check for SAML2 authN
cl = Saml2Client(environ, self.conf)
try:
(ava, came_from) = cl.response(post,
(ava, came_from, issuer, not_on_or_after) = cl.response(post,
self.conf["entityid"],
self.outstanding_authn,
logger)
name_id = ava["__userid"]
del ava["__userid"]
self.store[name_id] = ava
self.cache.set( name_id, issuer, ava, not_on_or_after)
if self.debug:
logger and logger.info("stored %s with key %s" % (ava, name_id))
except TypeError:
@@ -205,62 +241,94 @@ class SAML2Plugin(FormPluginBase):
identity["password"] = ""
identity['repoze.who.userid'] = name_id
identity["user"] = ava
#identity["issuer"] = issuer
if self.debug:
logger and logger.info("Identity: %s" % identity)
return identity
# IMetadataProvider
def add_metadata(self, environ, identity):
subject_id = identity['repoze.who.userid']
if self.debug:
logger = environ.get('repoze.who.logger','')
logger and logger.info(
"add_metadata for %s" % identity['repoze.who.userid'])
if logger:
logger.info(
"add_metadata for %s" % subject_id)
logger.info(
"Known subjects: %s" % self.cache.subjects())
try:
logger.info(
"Issuers: %s" % self.cache.issuers(subject_id))
except KeyError:
pass
if "user" not in identity:
identity["user"] = {}
try:
ava = self.store[identity['repoze.who.userid']]
(ava, old) = self.cache.get_all(subject_id)
now = time.gmtime()
if self.debug:
logger and logger.info("Adding %s" % ava)
identity["user"].update(ava)
self.store[identity['repoze.who.userid']] = identity
except KeyError:
pass
if "pysaml2_vo_expanded" not in identity:
# is this a Virtual Organization situation
if "virtual_organization" in self.conf:
if self.vo:
logger and logger.info("** Do VO aggregation **")
try:
subject_id = identity["user"][
self.conf["common_identifier"]][0]
except KeyError:
return
logger and logger.info("SubjectID: %s" % subject_id)
ar = AttributeResolver(environ, self.metadata,
self.conf["xmlsec_binary"],
self.conf["key_file"],
self.conf["cert_file"])
#try:
# This ought to be caseignore
#subject_id = identity["user"][
# self.vo_conf["common_identifier"]][0]
#except KeyError:
# logger and logger.error("** No common identifier **")
# return
logger and logger.info(
"SubjectID: %s, VO:%s" % (subject_id, self.vo))
vo_members = [
member for member in self.metadata.vo_members(
self.conf["virtual_organization"])\
if member != self.conf["md_idp"]]
member for member in self.metadata.vo_members(self.vo)\
if member not in self.conf["idp"]["entity_id"]]
logger and logger.info("VO members: %s" % vo_members)
vo_members = [m for m in vo_members \
if not self.cache.active(subject_id, m)]
logger and logger.info(
"VO members (not cached): %s" % vo_members)
if vo_members:
ar = AttributeResolver(environ, self.metadata, self.conf)
if "name_id_format" in self.vo_conf:
name_id_format = self.vo_conf["name_id_format"]
sp_name_qualifier=""
else:
sp_name_qualifier=self.vo
name_id_format = ""
extra = ar.extend(subject_id,
self.conf["entityid"],
vo_members,
self.conf["nameid_format"],
name_id_format=name_id_format,
sp_name_qualifier=sp_name_qualifier,
log=logger)
for attr,val in extra.items():
try:
# might lead to duplicates !
identity["user"][attr].extend(val)
except KeyError:
identity["user"][attr] = val
for issuer, tup in extra.items():
(not_on_or_after, resp) = tup
self.cache.set(subject_id, issuer, resp,
not_on_or_after)
logger.info(
">Issuers: %s" % self.cache.issuers(subject_id))
logger.info(
"AVA: %s" % (self.cache.get_all(subject_id),))
identity["user"] = self.cache.get_all(subject_id)[0]
# Only do this once
identity["pysaml2_vo_expanded"] = 1
self.store[identity['repoze.who.userid']] = identity
#self.store[identity['repoze.who.userid']] = (
# not_on_or_after, identity)
# @return
# used 2 times : one to get the ticket, the other to validate it
@@ -277,7 +345,9 @@ class SAML2Plugin(FormPluginBase):
def make_plugin(rememberer_name=None, # plugin for remember
store= "mem", # store for remember
cache= "", # cache
# Which virtual organization to support
virtual_organization="",
path_logout='', # regex url to logout
path_toskip='', # regex url to skip
saml_conf="",
@@ -294,7 +364,8 @@ def make_plugin(rememberer_name=None, # plugin for remember
path_logout = path_logout.lstrip().split('\n');
path_toskip = path_toskip.lstrip().splitlines()
plugin = SAML2Plugin(rememberer_name, saml_conf, store,
plugin = SAML2Plugin(rememberer_name, saml_conf,
virtual_organization, cache,
path_logout, path_toskip, debug)
return plugin

View File

@@ -33,16 +33,16 @@ DEFAULT_BINDING = saml2.BINDING_HTTP_REDIRECT
class AttributeResolver(object):
def __init__(self, environ, metadata=None, xmlsec_binary=None,
key_file=None, cert_file=None):
def __init__(self, environ, metadata=None, config=None, saml2client=None):
self.metadata = metadata
self.saml2client = Saml2Client(environ, metadata=metadata,
xmlsec_binary=xmlsec_binary,
key_file=key_file,
cert_file=cert_file)
if saml2client:
self.saml2client = saml2client
else:
self.saml2client = Saml2Client(environ, config)
def extend(self, subject_id, issuer, vo_members, nameid_format,
log=None):
def extend(self, subject_id, issuer, vo_members, name_id_format=None,
sp_name_qualifier=None, log=None):
"""
:param subject_id: The identifier by which the subject is know
among all the participents of the VO
@@ -61,17 +61,15 @@ class AttributeResolver(object):
for attr_serv in ass.attribute_service:
log and log.info("Send attribute request to %s" % \
attr_serv.location)
resp = self.saml2client.attribute_query(subject_id,
(resp, issuer,
not_on_or_after) = self.saml2client.attribute_query(
subject_id,
issuer,
attr_serv.location,
format=nameid_format, log=log)
sp_name_qualifier=sp_name_qualifier,
format=name_id_format, log=log)
if resp:
# unnecessary
del resp["__userid"]
for attr,val in resp.items():
try:
extended_identity[attr].extend(val)
except KeyError:
extended_identity[attr] = val
extended_identity[issuer] = (not_on_or_after, resp)
return extended_identity

View File

@@ -46,6 +46,10 @@ LAX = True
class Saml2Client:
def __init__(self, environ, config=None):
"""
:param environ:
:param config: A saml2.config.Config instance
"""
self.environ = environ
if config:
self.config = config
@@ -82,47 +86,7 @@ class Saml2Client:
def scoping_from_metadata(self, entityid, location):
name = metadata.name(entityid)
return make_instance(self.scoping([self.idp_entry(name, location)]))
def create_authn_request(self, query_id, destination, service_url,
spentityid, my_name, sp_name_qualifier=None,
scoping=None):
""" Creates an Authenication Request
:param query_id: Query identifier
:param destination: Where to send the request
:param service_url: The page to where the response MUST be sent.
:param spentityid: My official name
:param my_name: Who I am
:param sp_name_qualifier: The domain in which the name should be
valid
:param scoping: For which IdPs this query are aimed.
:return: An authentication request
"""
authn_request = self._init_request(samlp.AuthnRequest(query_id),
destination)
authn_request.assertion_consumer_service_url = service_url
authn_request.protocol_binding = saml2.BINDING_HTTP_POST
authn_request.provider_name = my_name
if scoping:
authn_request.scoping = scoping
name_id_policy = samlp.NameIDPolicy()
name_id_policy.allow_create = 'true'
if sp_name_qualifier:
name_id_policy.format = saml.NAMEID_FORMAT_PERSISTENT
name_id_policy.sp_name_qualifier = sp_name_qualifier
else:
name_id_policy.format = saml.NAMEID_FORMAT_TRANSIENT
authn_request.name_id_policy = name_id_policy
authn_request.issuer = saml.Issuer(text=spentityid)
return authn_request
def response(self, post, requestor, outstanding, log=None):
""" Deal with the AuthnResponse
@@ -141,19 +105,54 @@ class Saml2Client:
if post.has_key("SAMLResponse"):
saml_response = post['SAMLResponse'].value
if saml_response:
(identity, came_from) = self.verify_response(
(identity, came_from,
not_on_or_after, response_issuer) = self.verify_response(
saml_response, requestor,
outstanding, log,
context="AuthNReq")
#relay_state = post["RelayState"].value
return (identity, came_from)
return (identity, came_from, response_issuer, not_on_or_after)
else:
return None
def authn_request(self, query_id, destination, service_url, spentityid,
my_name, vo="", scoping=None):
res = {
"id": query_id,
"version": VERSION,
"issue_instant": instant(),
"destination": destination,
"assertion_consumer_service_url": service_url,
"protocol_binding": saml2.BINDING_HTTP_POST,
"provider_name": my_name,
}
if scoping:
res["scoping"] = scoping
name_id_policy = {
"allow_create": "true"
}
name_id_policy["format"] = saml.NAMEID_FORMAT_TRANSIENT
if vo:
try:
#if vo in self.config["virtual_organization"]:
name_id_policy["sp_name_qualifier"] = vo
name_id_policy["format"] = saml.NAMEID_FORMAT_PERSISTENT
except KeyError:
pass
res["name_id_policy"] = name_id_policy
res["issuer"] = { "text": spentityid }
return make_instance(samlp.AuthnRequest, res)
def authenticate(self, spentityid, location="", service_url="",
my_name="", relay_state="",
binding=saml2.BINDING_HTTP_REDIRECT, log=None,
scoping=None):
vo="", scoping=None):
""" Either verifies an authentication Response or if none is present
send an authentication request.
@@ -161,15 +160,16 @@ class Saml2Client:
:param binding: How the authentication request should be sent to the
IdP
:param location: Where the IdP is.
:param service_url: The service URL
:param service_url: The SP's service URL
:param my_name: The providers name
:param relay_state: To where the user should be returned after
successfull log in.
:param binding: Which binding to use for sending the request
:param log: Where to write log messages
:param vo: The entity_id of the virtual organization I'm a member of
:param scoping: For which IdPs this query are aimed.
:return: AuthnRequest reponse
:return: AuthnRequest response
"""
if log:
@@ -178,8 +178,8 @@ class Saml2Client:
log.info("service_url: %s" % service_url)
log.info("my_name: %s" % my_name)
session_id = sid()
authen_req = "%s" % self.create_authn_request(session_id, location,
service_url, spentityid, my_name, scoping)
authen_req = "%s" % self.authn_request(session_id, location,
service_url, spentityid, my_name, vo, scoping)
log and log.info("AuthNReq: %s" % authen_req)
if binding == saml2.BINDING_HTTP_POST:
@@ -237,29 +237,32 @@ class Saml2Client:
response = correctly_signed_response(decoded_xml,
self.config["xmlsec_binary"], log=log)
if not response:
log and log.error("Response was not correctly signed")
print "Response was not correctly signed"
if log:
log.error("Response was not correctly signed")
log.info(decoded_xml)
return ({}, "")
else:
log and log.error("Response was correctly signed or nor signed")
log and log.info("response: %s" % (response,))
try:
(ava, name_id, came_from) = self.do_response(response,
(ava, name_id, came_from, not_on_or_after) = self.do_response(
response,
requestor,
outstanding=outstanding,
xmlstr=xmlstr,
log=log, context=context)
issuer = response.issuer.text
except AttributeError, exc:
log and log.error("AttributeError: %s" % (exc,))
return ({}, "")
return ({}, "", 0, "")
except Exception, exc:
log and log.error("Exception: %s" % (exc,))
return ({}, "")
return ({}, "", 0, "")
# should return userid and attribute value assertions
ava["__userid"] = name_id
return (ava, came_from)
return (ava, came_from, not_on_or_after, issuer)
def _verify_condition(self, assertion, requestor, log):
# The Identity Provider MUST include a <saml:Conditions> element
@@ -283,7 +286,9 @@ class Saml2Client:
if not for_me(condition, requestor):
if not LAX:
raise Exception("Not for me!!!")
return not_on_or_after
def _websso(self, assertion, outstanding, requestor, log):
# the assertion MUST contain one AuthNStatement
assert len(assertion.authn_statement) == 1
@@ -305,7 +310,7 @@ class Saml2Client:
#print "Conditions",assertion.conditions
assert assertion.conditions
log and log.info("verify_condition")
self._verify_condition(assertion, requestor, log)
not_on_or_after = self._verify_condition(assertion, requestor, log)
# The assertion can contain zero or one attributeStatements
assert len(assertion.attribute_statement) <= 1
@@ -334,7 +339,7 @@ class Saml2Client:
assert subject.name_id
name_id = subject.name_id.text.strip()
return (ava, name_id, came_from)
return (ava, name_id, came_from, not_on_or_after)
def _encrypted_assertion(self, xmlstr, outstanding, requestor,
log=None, context=""):
@@ -388,7 +393,7 @@ class Saml2Client:
else:
log and log.info("Session id I don't recall using")
raise Exception("Session id I don't recall using")
# MUST contain *one* assertion
try:
assert len(response.assertion) == 1 or \
@@ -509,13 +514,15 @@ class Saml2Client:
log and log.info("SOAP request sent and got response: %s" % response)
if response:
log and log.info("Verifying response")
(identity, came_from) = self.verify_response(response,
(identity, came_from, not_on_or_after,
response_issuer) = self.verify_response(
response,
issuer,
outstanding={session_id:""},
log=log, decode=False,
context="AttrReq")
log and log.info("identity: %s" % identity)
return identity
return (identity, response_issuer, not_on_or_after)
else:
log and log.info("No response")
return None
@@ -609,51 +616,4 @@ def _print_statements(states):
def print_response(resp):
print _print_statement(resp)
print resp.to_string()
def d_init_request(id, destination):
return {
"id": id,
"version": VERSION,
"issue_instant": instant(),
"destination": destination,
}
def d_authn_request(query_id, destination, service_url,
spentityid, my_name, sp_name_qualifier=None,
scoping=None):
""" Creates an Authenication Request
:param query_id: Query identifier
:param destination: Where to send the request
:param service_url: The page to where the response MUST be sent.
:param spentityid: My official name
:param my_name: Who I am
:param sp_name_qualifier: The domain in which the name should be
valid
:param scoping: For which IdPs this query are aimed.
:return: An authentication request
"""
authn_request = d_init_request(query_id, destination)
authn_request["assertion_consumer_service_url"] = service_url
authn_request["protocol_binding"] = saml2.BINDING_HTTP_POST
authn_request["provider_name"] = my_name
if scoping:
authn_request["scoping"] = scoping
name_id_policy = {
"allow_create": 'true'
}
if sp_name_qualifier:
name_id_policy["format"] = saml.NAMEID_FORMAT_PERSISTENT
name_id_policy["sp_name_qualifier"] = sp_name_qualifier
else:
name_id_policy["format"] = saml.NAMEID_FORMAT_TRANSIENT
authn_request["name_id_policy"] = name_id_policy
authn_request["issuer"] = spentityid
return make_instance(samlp.AuthnRequest,authn_request)

View File

@@ -4,26 +4,39 @@
from saml2 import metadata
def entity_id2url(md, entity_id):
try:
# grab the first one
return md.single_sign_on_services(entity_id)[0]
except Exception:
print "idp_entity_id",entity_id
print ("idps in metadata",
[e for e,d in md.entity.items() if "idp_sso" in d])
print "metadata entities", md.entity.keys()
for ent, dic in md.entity.items():
print ent, dic.keys()
return None
class Config(dict):
def sp_check(self, config):
assert "idp" in config
if "metadata" in config:
md = config["metadata"]
if "idp_entity_id" in config:
try:
config["idp_url"] = md.single_sign_on_services(
config["idp_entity_id"])[0]
except Exception:
print "idp_entity_id",config["idp_entity_id"]
print ("idps in metadata",
[e for e,d in md.entity.items() if "idp_sso" in d])
print "metadata entities", md.entity.keys()
for ent, dic in md.entity.items():
print ent, dic.keys()
raise
if "entity_id" in config["idp"]:
if not "url" in config["idp"]:
config["idp"]["url"] = []
urls = config["idp"]["url"]
for eid in config["idp"]["entity_id"]:
url = entity_id2url(md, eid)
if url:
if url not in urls:
urls.append(url)
assert config["idp_url"]
assert "sp" in config["service"]
assert "url" in config["service"]["sp"]
def idp_check(self, config):
pass
@@ -50,7 +63,6 @@ class Config(dict):
assert "xmlsec_binary" in config
assert "service" in config
assert "entityid" in config
assert "service_url" in config
if "key_file" in config:
# If you have a key file you have to have a cert file

View File

@@ -897,9 +897,8 @@ class AuthnRequest(AbstractRequest):
c_children['{%s}Scoping' % NAMESPACE] = ('scoping', Scoping)
c_child_order = AbstractRequest.c_child_order[:]
c_child_order.extend(['issuer', 'signature', 'extensions', 'subject',
'name_id_policy', 'conditions', 'requested_authn_context',
'scoping'])
c_child_order.extend(['subject', 'name_id_policy', 'conditions',
'requested_authn_context', 'scoping'])
def __init__(self, id=None, version=None, issue_instant=None,
destination=None, consent=None, issuer=None, signature=None,

View File

@@ -18,6 +18,8 @@
"""Contains classes and functions that a SAML2.0 Identity provider (IdP)
or attribute authority (AA) may use to conclude its tasks.
"""
import shelve
from saml2 import saml, samlp, VERSION
from saml2.utils import sid, decode_base64_and_inflate, make_instance
from saml2.time_util import instant, in_a_while
@@ -60,95 +62,219 @@ def klassdict(klass, text=None, **kwargs):
spec[key] = val
return spec
def kd_status_from_exception(exception):
return klassdict(samlp.Status,
status_code=klassdict(samlp.StatusCode,
value=samlp.STATUS_RESPONDER,
status_code=klassdict(samlp.StatusCode,
value=EXCEPTION2STATUS[exception.__class__])
),
status_message=exception.args[0],
)
def kd_name_id(text="", **kwargs):
return klassdict(saml.NameID, text, **kwargs)
def kd_status_message(text="", **kwargs):
return klassdict(samlp.StatusMessage, text, **kwargs)
def kd_status_code(text="", **kwargs):
return klassdict(samlp.StatusCode, text, **kwargs)
def kd_status(text="", **kwargs):
return klassdict(samlp.Status, text, **kwargs)
def kd_success_status():
return kd_status(status_code=kd_status_code(value=samlp.STATUS_SUCCESS))
def kd_audience(text="", **kwargs):
return klassdict(saml.Audience, text, **kwargs)
def kd_audience_restriction(text="", **kwargs):
return klassdict(saml.AudienceRestriction, text, **kwargs)
def kd_conditions(text="", **kwargs):
return klassdict(saml.Conditions, text, **kwargs)
def kd_attribute(text="", **kwargs):
return klassdict(saml.Attribute, text, **kwargs)
def kd_attribute_value(text="", **kwargs):
return klassdict(saml.AttributeValue, text, **kwargs)
def kd_attribute_statement(text="", **kwargs):
return klassdict(saml.AttributeStatement, text, **kwargs)
def kd_subject_confirmation_data(text="", **kwargs):
return klassdict(saml.SubjectConfirmationData, text, **kwargs)
def kd_subject_confirmation(text="", **kwargs):
return klassdict(saml.SubjectConfirmation, text, **kwargs)
def kd_subject(text="", **kwargs):
return klassdict(saml.Subject, text, **kwargs)
def kd_authn_statement(text="", **kwargs):
return klassdict(saml.Subject, text, **kwargs)
def kd_assertion(text="", **kwargs):
kwargs.update({
"version": VERSION,
"id" : sid(),
"issue_instant" : instant(),
})
return klassdict(saml.Assertion, text, **kwargs)
def kd_response(signature=False, encrypt=False, **kwargs):
kwargs.update({
"id" : sid(),
"version": VERSION,
"issue_instant" : instant(),
})
if signature:
kwargs["signature"] = sigver.pre_signature_part(kwargs["id"])
return kwargs
def do_attribute_statement(identity):
"""
:param identity: A dictionary with fiendly names as keys
:return:
"""
attrs = []
for key, val in identity.items():
dic = {}
if isinstance(val,basestring):
attrval = kd_attribute_value(val)
elif isinstance(val,list):
attrval = [kd_attribute_value(v) for v in val]
else:
raise OtherError("strange value type on: %s" % val)
dic["attribute_value"] = attrval
if isinstance(key, basestring):
dic["name"] = key
elif isinstance(key, tuple): # 3-tuple
(name,format,friendly) = key
if name:
dic["name"] = name
if format:
dic["name_format"] = format
if friendly:
dic["friendly_name"] = friendly
attrs.append(kd_attribute(**dic))
return kd_attribute_statement(attribute=attrs)
def kd_issuer(text, **kwargs):
return klassdict(saml.Issuer, text, **kwargs)
def do_aa_response(consumer_url, in_response_to,
sp_entity_id, identity, name_id_policies=None,
name_id=None, ip_address="", issuer=None ):
attr_statement = do_attribute_statement(identity)
# start using now and for a hour
conds = kd_conditions(
not_before=instant(),
# an hour from now
not_on_or_after=in_a_while(hours=1),
audience_restriction=kd_audience_restriction(
audience=kd_audience(sp_entity_id)))
# temporary identifier or ??
if not name_id:
name_id = kd_name_id(sid(), format=saml.NAMEID_FORMAT_TRANSIENT)
tmp = kd_response(
issuer=issuer,
in_response_to=in_response_to,
destination=consumer_url,
status=kd_success_status(),
assertion=kd_assertion(
subject = kd_subject(
name_id=name_id,
method=saml.SUBJECT_CONFIRMATION_METHOD_BEARER,
subject_confirmation=kd_subject_confirmation(
subject_confirmation_data=kd_subject_confirmation_data(
in_response_to=in_response_to,
not_on_or_after=in_a_while(hours=1),
address=ip_address,
recipient=consumer_url))),
attribute_statement = attr_statement,
authn_statement= kd_authn_statement(
authn_instant=instant(),
session_index=sid()),
conditions=conds,
),
)
return make_instance(samlp.Response, tmp)
class Server(object):
def __init__(self, config_file, log=None):
def __init__(self, config_file="", config=None, log=None, debug=0):
if config_file:
self.conf = Config()
self.conf.load_file(config_file)
self.metadata = self.conf["metadata"]
if "subject_data" in self.conf:
self.id_map = shelve.open(self.conf["subject_data"],
writeback=True)
else:
self.id_map = None
elif config:
self.conf = config
self.metadata = self.conf["metadata"]
self.log = log
self.debug = debug
def issuer(self):
return klassdict( saml.Issuer, self.conf["entityid"],
format=saml.NAMEID_FORMAT_ENTITY)
return kd_issuer( self.conf["entityid"],
format=saml.NAMEID_FORMAT_ENTITY)
def status_from_exception(self, exception):
return klassdict(samlp.Status,
status_code=klassdict(samlp.StatusCode,
value=samlp.STATUS_RESPONDER,
status_code=klassdict(samlp.StatusCode,
value=EXCEPTION2STATUS[exception.__class__])
),
status_message=exception.args[0],
)
def name_id(self, text="", **kwargs):
return klassdict(saml.NameID, text, **kwargs)
def status_message(self, text="", **kwargs):
return klassdict(samlp.StatusMessage, text, **kwargs)
def status_code(self, text="", **kwargs):
return klassdict(samlp.StatusCode, text, **kwargs)
def status(self, text="", **kwargs):
return klassdict(samlp.Status, text, **kwargs)
def success_status(self):
return self.status(status_code=self.status_code(
value=samlp.STATUS_SUCCESS))
def audience(self, text="", **kwargs):
return klassdict(saml.Audience, text, **kwargs)
def audience_restriction(self, text="", **kwargs):
return klassdict(saml.AudienceRestriction, text, **kwargs)
def conditions(self, text="", **kwargs):
return klassdict(saml.Conditions, text, **kwargs)
def attribute(self, text="", **kwargs):
return klassdict(saml.Attribute, text, **kwargs)
def attribute_value(self, text="", **kwargs):
return klassdict(saml.AttributeValue, text, **kwargs)
def persistent_id(self, entity_id, subject_id):
"""
:param entity_id: SP entity ID or VO entity ID
:param subject_id: The local identifier of the subject
:return: A arbitrary identifier for the subject unique to the
entity_id
"""
if self.debug:
self.log and self.log.debug("Id map keys: %s" % self.id_map.keys())
def attribute_statement(self, text="", **kwargs):
return klassdict(saml.AttributeStatement, text, **kwargs)
def subject_confirmation_data(self, text="", **kwargs):
return klassdict(saml.SubjectConfirmationData, text, **kwargs)
def subject_confirmation(self, text="", **kwargs):
return klassdict(saml.SubjectConfirmation, text, **kwargs)
def subject(self, text="", **kwargs):
return klassdict(saml.Subject, text, **kwargs)
try:
map = self.id_map[entity_id]
except KeyError:
map = self.id_map[entity_id] = {"forward":{}, "backward":{}}
def authn_statement(self, text="", **kwargs):
return klassdict(saml.Subject, text, **kwargs)
try:
if self.debug:
self.log.debug("map forward keys: %s" % map["forward"].keys())
return map["forward"][subject_id]
except KeyError:
while True:
temp_id = sid()
if temp_id not in map["backward"]:
break
map["forward"][subject_id] = temp_id
map["backward"][temp_id] = subject_id
self.id_map[entity_id]= map
self.id_map.sync()
return temp_id
def assertion(self, text="", **kwargs):
kwargs.update({
"version": VERSION,
"id" : sid(),
"issue_instant" : instant(),
})
return klassdict(saml.Assertion, text, **kwargs)
def response(self, signature=False, encrypt=False, **kwargs):
kwargs.update({
"id" : sid(),
"version": VERSION,
"issue_instant" : instant(),
})
if signature:
kwargs["signature"] = sigver.pre_signature_part(kwargs["id"])
return kwargs
def parse_authn_request(self, enc_request):
"""Parse a Authentication Request
:param enc_request: The request in its transport format
:return: A tuple of
consumer_url - as gotten from the SPs entity_id and the metadata
id - the id of the request
name_id_policy - how to chose the subjects identifier
spentityid - the entity id of the SP
"""
request_xml = decode_base64_and_inflate(enc_request)
request = samlp.authn_request_from_string(request_xml)
@@ -175,92 +301,77 @@ class Server(object):
return_destination))
print "%s != %s" % (consumer_url, return_destination)
raise OtherError("ConsumerURL and return destination mismatch")
policy = request.name_id_policy
if policy.allow_create.lower() == "true" and \
policy.format == saml.NAMEID_FORMAT_TRANSIENT:
name_id_policies = policy.format
return (consumer_url, id, name_id_policies, spentityid)
self.log and self.log.info("AuthNRequest: %s" % request)
return (consumer_url, id, request.name_id_policy, spentityid)
def allowed_issuer(self, issuer):
""" """
return True
def parse_attribute_query(self, xml_string):
query = samlp.attribute_query_from_string(xml_string)
assert query.version == VERSION
assert query.destination == self.conf["service_url"]
self.log and self.log.info(
"%s ?= %s" % (query.destination,self.conf["service"]["aa"]["url"]))
assert query.destination == self.conf["service"]["aa"]["url"]
self.allowed_issuer(query.issuer)
# verify signature
return (subject, attribute)
subject = query.subject.name_id.text
if query.attribute:
attribute = query.attribute
else:
attribute = None
return (subject, attribute, query)
def find_subject(self, subject, attribute=None):
pass
def do_attribute_statement(self, identity):
"""
:param identity: A dictionary with fiendly names as keys
:return:
"""
attrs = []
for key, val in identity.items():
dic = {}
if isinstance(val,basestring):
attrval = self.attribute_value(val)
elif isinstance(val,list):
attrval = [self.attribute_value(v) for v in val]
else:
raise OtherError("strange value type on: %s" % val)
dic["attribute_value"] = attrval
if isinstance(key, basestring):
dic["name"] = key
elif isinstance(key, tuple): # 3-tuple
(name,format,friendly) = key
if name:
dic["name"] = name
if format:
dic["name_format"] = format
if friendly:
dic["friendly_name"] = friendly
attrs.append(self.attribute(**dic))
return self.attribute_statement(attribute=attrs)
def do_sso_response(self, consumer_url, in_response_to,
sp_entity_id, identity, name_id_policies=None,
subject_id=None ):
sp_entity_id, identity, name_id=None ):
attribute_statement = self.do_attribute_statement(identity)
attr_statement = do_attribute_statement(identity)
# start using now and for a hour
conditions = self.conditions(
conds = kd_conditions(
not_before=instant(),
# an hour from now
not_on_or_after=in_a_while(0,0,0,0,0,1),
audience_restriction=self.audience_restriction(
audience=self.audience(sp_entity_id)))
audience_restriction=kd_audience_restriction(
audience=kd_audience(sp_entity_id)))
# temporary identifier or ??
subject_id = sid()
tmp = self.response(
if not name_id:
name_id = kd_name_id(sid(), format=saml.NAMEID_FORMAT_TRANSIENT)
tmp = kd_response(
issuer=self.issuer(),
in_response_to=in_response_to,
destination=consumer_url,
status=self.success_status(),
assertion=self.assertion(
subject = self.subject(
name_id=self.name_id(subject_id,
format=saml.NAMEID_FORMAT_TRANSIENT),
method=saml.SUBJECT_CONFIRMATION_METHOD_BEARER,
subject_confirmation=self.subject_confirmation(
subject_confirmation_data=self.subject_confirmation_data(
in_response_to=in_response_to))),
attribute_statement = attribute_statement,
authn_statement= self.authn_statement(
status=kd_success_status(),
assertion=kd_assertion(
attribute_statement = attr_statement,
authn_statement= kd_authn_statement(
authn_instant=instant(),
session_index=sid()),
conditions=conditions,
conditions=conds,
subject=kd_subject(
name_id=name_id,
method=saml.SUBJECT_CONFIRMATION_METHOD_BEARER,
subject_confirmation=kd_subject_confirmation(
subject_confirmation_data=kd_subject_confirmation_data(
in_response_to=in_response_to))),
),
)
return make_instance(samlp.Response, tmp)
def do_aa_response(self, consumer_url, in_response_to,
sp_entity_id, identity, name_id_policies=None,
subject_id=None, ip_address=""):
return do_aa_response(consumer_url, in_response_to,
sp_entity_id, identity, name_id_policies,
subject_id, ip_address, self.issuer())

View File

@@ -36,9 +36,13 @@ NAMESPACE = "http://schemas.xmlsoap.org/soap/envelope/"
def parse_soap_enveloped_saml_response(text):
expected_tag = '{%s}Response' % SAMLP_NAMESPACE
return parse_soap_enveloped_saml_thing(text, expected_tag)
def parse_soap_enveloped_saml_thing(text, expected_tag):
return parse_soap_enveloped_saml_thingy(text, expected_tag)
def parse_soap_enveloped_saml_attribute_query(text):
expected_tag = '{%s}AttributeQuery' % SAMLP_NAMESPACE
return parse_soap_enveloped_saml_thingy(text, expected_tag)
def parse_soap_enveloped_saml_thingy(text, expected_tag):
"""Parses a SOAP enveloped SAML thing and returns the thing as
a string.
@@ -59,7 +63,7 @@ def parse_soap_enveloped_saml_thing(text, expected_tag):
else:
return ""
def make_soap_enveloped_saml_thingy(self, thingy):
def make_soap_enveloped_saml_thingy(thingy):
""" Returns a soap envelope containing a SAML request
as a text string.
@@ -86,7 +90,9 @@ class _Http(object):
self.server.add_certificate(keyfile, certfile, "")
def write(self, data):
(response, content) = self.server.request(self.path, "POST", data)
(response, content) = self.server.request(self.path,
"POST", data,
headers={"content-type": "application/soap+xml"})
if response.status == 200:
return content
else:
@@ -98,7 +104,7 @@ class SOAPClient(object):
self.server = _Http(server_url, keyfile, certfile)
def send(self, request):
soap_message = make_soap_enveloped_saml_request(request)
soap_message = make_soap_enveloped_saml_thingy(request)
response = self.server.write(soap_message)
if response:
return parse_soap_enveloped_saml_response(response)

View File

@@ -162,7 +162,7 @@ def add_duration(tid, duration):
# ---------------------------------------------------------------------------
def in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0,
def time_in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0,
minutes=0, hours=0, weeks=0):
"""
format of timedelta:
@@ -173,7 +173,17 @@ def in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0,
t = timedelta(*[days,seconds,microseconds,milliseconds,minutes,
hours,weeks])
soon = now + t
return soon.strftime(TIME_FORMAT)
return soon
def in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0,
minutes=0, hours=0, weeks=0):
"""
format of timedelta:
timedelta([days[, seconds[, microseconds[, milliseconds[,
minutes[, hours[, weeks]]]]]]])
"""
return time_in_a_while(days, seconds, microseconds, milliseconds,
minutes, hours, weeks).strftime(TIME_FORMAT)
# ---------------------------------------------------------------------------

View File

@@ -1,5 +1,5 @@
<?xml version='1.0' encoding='UTF-8'?>
<ns0:EntitiesDescriptor name="urn:mace:umu.se:saml:test" validUntil="2009-12-04T17:31:07Z" xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata"><ns0:EntityDescriptor entityID="urn:mace:umu.se:saml:roland:sp"><ns0:SPSSODescriptor AuthnRequestsSigned="False" WantAssertionsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
<ns0:EntitiesDescriptor name="urn:mace:example.com:saml:test" validUntil="2009-12-04T17:31:07Z" xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata"><ns0:EntityDescriptor entityID="urn:mace:example.com:saml:roland:sp"><ns0:SPSSODescriptor AuthnRequestsSigned="False" WantAssertionsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
@@ -15,7 +15,7 @@ AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
+vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:AssertionConsumerService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://localhost:8087/" index="0" /></ns0:SPSSODescriptor><ns0:Organization><ns0:OrganizationURL>http://www.umu.se/</ns0:OrganizationURL><ns0:OrganizationName>Umea University</ns0:OrganizationName></ns0:Organization><ns0:ContactPerson><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@adm.umu.se</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor><ns0:EntityDescriptor entityID="urn:mace:umu.se:saml:roland:idp"><ns0:IDPSSODescriptor WantAuthnRequestsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:AssertionConsumerService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://localhost:8087/" index="0" /></ns0:SPSSODescriptor><ns0:Organization><ns0:OrganizationURL>http://www.example.com/</ns0:OrganizationURL><ns0:OrganizationName>Example Co</ns0:OrganizationName></ns0:Organization><ns0:ContactPerson><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@example.com</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor><ns0:EntityDescriptor entityID="urn:mace:example.com:saml:roland:idp"><ns0:IDPSSODescriptor WantAuthnRequestsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
@@ -31,4 +31,4 @@ AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
+vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://localhost:8088/sso/" /></ns0:IDPSSODescriptor><ns0:Organization><ns0:OrganizationURL>http://www.umu.se/</ns0:OrganizationURL><ns0:OrganizationName>Umea University</ns0:OrganizationName></ns0:Organization><ns0:ContactPerson><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@adm.umu.se</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor></ns0:EntitiesDescriptor>
</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://localhost:8088/sso/" /></ns0:IDPSSODescriptor><ns0:Organization><ns0:OrganizationURL>http://www.example.com/</ns0:OrganizationURL><ns0:OrganizationName>Example Co</ns0:OrganizationName></ns0:Organization><ns0:ContactPerson><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@example.com</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor></ns0:EntitiesDescriptor>

View File

@@ -1,14 +1,26 @@
{
"entityid" : "urn:mace:umu.se:saml:roland:sp",
"my_name" : "urn:mace:umu.se:saml:roland:sp",
"service_url" : "http://lingon.catalogix.se:8087/",
"service": ["sp"],
"entityid" : "urn:mace:example.com:saml:roland:sp",
"service": {
"sp":{
"my_name" : "urn:mace:example.com:saml:roland:sp",
"url": "http://lingon.catalogix.se:8087/",
}
},
"debug" : 1,
"my_key" : "./mykey.pem",
"my_cert" : "./mycert.pem",
"xmlsec_binary" : "/opt/local/bin/xmlsec1",
"metadata": {
"local": ["/Users/rolandh/code/pysaml2/tests/metadata.xml"],
"local": ["./tests/metadata.xml", "./tests/vo_metadata.xml"],
},
"idp_entity_id": "urn:mace:umu.se:saml:roland:idp",
"idp":{
"entity_id": ["urn:mace:example.com:saml:roland:idp"],
},
"virtual_organization" : {
"urn:mace:example.com:it:tek":{
"nameid_format" : "urn:oid:1.3.6.1.4.1.1466.115.121.1.15-NameID",
"common_identifier": "umuselin",
}
},
"subject_data": "subject_data.db"
}

View File

@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from saml2.client import Saml2Client
from saml2 import samlp, client
from saml2 import samlp, client, BINDING_HTTP_POST
from saml2 import saml, utils, config
XML_RESPONSE_FILE = "tests/saml_signed.xml"
@@ -190,3 +190,45 @@ class TestClient:
idp_entry = scope.idp_list.idp_entry[0]
assert idp_entry.name == "Umeå Universitet"
assert idp_entry.loc == "https://idp.umu.se/"
def test_create_auth_request_0(self):
ar = self.client.authn_request("1",
"http://www.example.com/sso",
"http://www.example.org/service",
"urn:mace:example.org:saml:sp",
"My Name")
print ar
assert ar.assertion_consumer_service_url == "http://www.example.org/service"
assert ar.destination == "http://www.example.com/sso"
assert ar.protocol_binding == BINDING_HTTP_POST
assert ar.version == "2.0"
assert ar.provider_name == "My Name"
assert ar.issuer.text == "urn:mace:example.org:saml:sp"
nid_policy = ar.name_id_policy
assert nid_policy.allow_create == "true"
assert nid_policy.format == saml.NAMEID_FORMAT_TRANSIENT
def test_create_auth_request_vo(self):
assert self.client.config["virtual_organization"].keys() == [
"urn:mace:example.com:it:tek"]
ar = self.client.authn_request("1",
"http://www.example.com/sso",
"http://www.example.org/service",
"urn:mace:example.org:saml:sp",
"My Name",
vo="urn:mace:example.com:it:tek")
print ar
assert ar.assertion_consumer_service_url == "http://www.example.org/service"
assert ar.destination == "http://www.example.com/sso"
assert ar.protocol_binding == BINDING_HTTP_POST
assert ar.version == "2.0"
assert ar.provider_name == "My Name"
assert ar.issuer.text == "urn:mace:example.org:saml:sp"
nid_policy = ar.name_id_policy
assert nid_policy.allow_create == "true"
assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT
assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek"

View File

@@ -4,26 +4,30 @@
from saml2.config import Config
c1 = {
"service": ["sp"],
sp1 = {
"entityid" : "urn:mace:umu.se:saml:roland:sp",
# "my_name" : "urn:mace:umu.se:saml:roland:sp",
"service_url" : "http://lingon.catalogix.se:8087/",
# "debug" : 1,
"service": {
"sp": {
"url" : "http://lingon.catalogix.se:8087/",
"name": "test",
}
},
"key_file" : "tests/mykey.pem",
"cert_file" : "tests/mycert.pem",
"xmlsec_binary" : "/opt/local/bin/xmlsec1",
"metadata": {
"local": ["tests/metadata.xml",
"tests/urn-mace-swami.se-swamid-test-1.0-metadata.xml"],
# "remote":{
# "edugain":{
# "url": "https://www.example.com/?id=edugain&set=saml2",
# "cert": "./edugain.pem",
# }
# }
# "remote":{
# "edugain":{
# "url": "https://www.example.com/?id=edugain&set=saml2",
# "cert": "./edugain.pem",
# }
# }
},
"idp" : {
"entity_id": ["urn:mace:umu.se:saml:roland:idp"],
},
"idp_entity_id": "urn:mace:umu.se:saml:roland:idp",
"virtual_organization" : {
"http://vo.example.org/biomed":{
"nameid_format" : "urn:oid:2.16.756.1.2.5.1.1.1-NameID",
@@ -34,4 +38,4 @@ c1 = {
def test_1():
c = Config()
c.load(c1)
c.load(sp1)

View File

@@ -2,9 +2,11 @@
# -*- coding: utf-8 -*-
from saml2.server import Server, OtherError, UnknownPricipal
from saml2 import server
from saml2 import samlp, saml, client, utils
from saml2.utils import make_instance
from py.test import raises
import shelve
SUCCESS_STATUS = """<?xml version=\'1.0\' encoding=\'UTF-8\'?>
<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success" /></ns0:Status>"""
@@ -15,67 +17,193 @@ ERROR_STATUS = """<?xml version='1.0' encoding='UTF-8'?>
def _eq(l1,l2):
return set(l1) == set(l2)
def test_status_success():
stat = server.kd_status(
status_code=server.kd_status_code(
value=samlp.STATUS_SUCCESS))
status = make_instance( samlp.Status, stat)
status_text = "%s" % status
assert status_text == SUCCESS_STATUS
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_success_status():
stat = server.kd_success_status()
status = make_instance(samlp.Status, stat)
status_text = "%s" % status
assert status_text == SUCCESS_STATUS
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_error_status():
stat = server.kd_status(
status_message=server.kd_status_message(
"Error resolving principal"),
status_code=server.kd_status_code(
value=samlp.STATUS_RESPONDER,
status_code=server.kd_status_code(
value=samlp.STATUS_UNKNOWN_PRINCIPAL)))
status_text = "%s" % make_instance( samlp.Status, stat )
print status_text
assert status_text == ERROR_STATUS
def test_status_from_exception():
e = UnknownPricipal("Error resolving principal")
stat = server.kd_status_from_exception(e)
status_text = "%s" % make_instance( samlp.Status, stat )
assert status_text == ERROR_STATUS
def test_attribute_statement():
astat = server.do_attribute_statement({"surName":"Jeter",
"givenName":"Derek"})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute) == 2
attr0 = statement.attribute[0]
assert _eq(attr0.keyswv(), ["name","attribute_value"])
assert len(attr0.attribute_value) == 1
attr1 = statement.attribute[1]
assert _eq(attr1.keyswv(), ["name","attribute_value"])
assert len(attr1.attribute_value) == 1
if attr0.name == "givenName":
assert attr0.attribute_value[0].text == "Derek"
assert attr1.name == "surName"
assert attr1.attribute_value[0].text == "Jeter"
else:
assert attr0.name == "surName"
assert attr0.attribute_value[0].text == "Jeter"
assert attr1.name == "givenName"
assert attr1.attribute_value[0].text == "Derek"
def test_audience():
aud_restr = make_instance( saml.AudienceRestriction,
server.kd_audience_restriction(
audience=server.kd_audience("urn:foo:bar")))
assert aud_restr.keyswv() == ["audience"]
assert aud_restr.audience.text == "urn:foo:bar"
def test_conditions():
conds_dict = server.kd_conditions(
not_before="2009-10-30T07:58:10.852Z",
not_on_or_after="2009-10-30T08:03:10.852Z",
audience_restriction=server.kd_audience_restriction(
audience=server.kd_audience("urn:foo:bar")))
conditions = make_instance(saml.Conditions, conds_dict)
assert _eq(conditions.keyswv(), ["not_before", "not_on_or_after",
"audience_restriction"])
assert conditions.not_before == "2009-10-30T07:58:10.852Z"
assert conditions.not_on_or_after == "2009-10-30T08:03:10.852Z"
assert conditions.audience_restriction[0].audience.text == "urn:foo:bar"
def test_value_1():
#FriendlyName="givenName" Name="urn:oid:2.5.4.42"
# NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
adict = server.kd_attribute(name="urn:oid:2.5.4.42",
name_format=saml.NAME_FORMAT_URI)
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name","name_format"])
assert attribute.name == "urn:oid:2.5.4.42"
assert attribute.name_format == saml.NAME_FORMAT_URI
def test_value_2():
adict = server.kd_attribute(name="urn:oid:2.5.4.42",
name_format=saml.NAME_FORMAT_URI,
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name","name_format","friendly_name"])
assert attribute.name == "urn:oid:2.5.4.42"
assert attribute.name_format == saml.NAME_FORMAT_URI
assert attribute.friendly_name == "givenName"
def test_value_3():
adict = server.kd_attribute(attribute_value="Derek",
name="urn:oid:2.5.4.42",
name_format=saml.NAME_FORMAT_URI,
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name", "name_format",
"friendly_name", "attribute_value"])
assert attribute.name == "urn:oid:2.5.4.42"
assert attribute.name_format == saml.NAME_FORMAT_URI
assert attribute.friendly_name == "givenName"
assert len(attribute.attribute_value) == 1
assert attribute.attribute_value[0].text == "Derek"
def test_value_4():
adict = server.kd_attribute(attribute_value="Derek",
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["friendly_name", "attribute_value"])
assert attribute.friendly_name == "givenName"
assert len(attribute.attribute_value) == 1
assert attribute.attribute_value[0].text == "Derek"
def test_do_attribute_statement_0():
astat = server.do_attribute_statement({"vo_attr":"foobar"})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute) == 1
attr0 = statement.attribute[0]
assert _eq(attr0.keyswv(), ["name","attribute_value"])
assert attr0.name == "vo_attr"
assert len(attr0.attribute_value) == 1
assert attr0.attribute_value[0].text == "foobar"
def test_do_attribute_statement():
astat = server.do_attribute_statement({"surName":"Jeter",
"givenName":["Derek","Sanderson"]})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute) == 2
attr0 = statement.attribute[0]
assert _eq(attr0.keyswv(), ["name","attribute_value"])
attr1 = statement.attribute[1]
assert _eq(attr1.keyswv(), ["name","attribute_value"])
if attr0.name == "givenName":
assert len(attr0.attribute_value) == 2
assert _eq([av.text for av in attr0.attribute_value],
["Derek","Sanderson"])
assert attr1.name == "surName"
assert attr1.attribute_value[0].text == "Jeter"
assert len(attr1.attribute_value) == 1
else:
assert attr0.name == "surName"
assert attr0.attribute_value[0].text == "Jeter"
assert len(attr0.attribute_value) == 1
assert attr1.name == "givenName"
assert len(attr1.attribute_value) == 2
assert _eq([av.text for av in attr1.attribute_value],
["Derek","Sanderson"])
def test_do_attribute_statement_multi():
astat = server.do_attribute_statement(
{("urn:oid:1.3.6.1.4.1.5923.1.1.1.7",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"eduPersonEntitlement"):"Jeter"})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute)
assert _eq(statement.attribute[0].keyswv(),
["name","name_format","friendly_name","attribute_value"])
attribute = statement.attribute[0]
assert attribute.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7"
assert attribute.name_format == (
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri")
assert attribute.friendly_name == "eduPersonEntitlement"
def test_subject():
adict = server.kd_subject("_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT)
subject = make_instance(saml.Subject, adict)
assert _eq(subject.keyswv(),["text", "name_id"])
assert subject.text == "_aaa"
assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT
class TestServer():
def setup_class(self):
self.server = Server("tests/server.config")
def test_status_success(self):
stat = self.server.status(
status_code=self.server.status_code(
value=samlp.STATUS_SUCCESS))
status = make_instance( samlp.Status, stat)
status_text = "%s" % status
assert status_text == SUCCESS_STATUS
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_success_status(self):
stat = self.server.success_status()
status = make_instance(samlp.Status, stat)
status_text = "%s" % status
assert status_text == SUCCESS_STATUS
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_error_status(self):
stat = self.server.status(
status_message=self.server.status_message(
"Error resolving principal"),
status_code=self.server.status_code(
value=samlp.STATUS_RESPONDER,
status_code=self.server.status_code(
value=samlp.STATUS_UNKNOWN_PRINCIPAL)))
status_text = "%s" % make_instance( samlp.Status, stat )
print status_text
assert status_text == ERROR_STATUS
def test_status_from_exception(self):
e = UnknownPricipal("Error resolving principal")
stat = self.server.status_from_exception(e)
status_text = "%s" % make_instance( samlp.Status, stat )
assert status_text == ERROR_STATUS
def test_attribute_statement(self):
astat = self.server.do_attribute_statement({"surName":"Jeter",
"givenName":"Derek"})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute) == 2
attr0 = statement.attribute[0]
assert _eq(attr0.keyswv(), ["name","attribute_value"])
assert len(attr0.attribute_value) == 1
attr1 = statement.attribute[1]
assert _eq(attr1.keyswv(), ["name","attribute_value"])
assert len(attr1.attribute_value) == 1
if attr0.name == "givenName":
assert attr0.attribute_value[0].text == "Derek"
assert attr1.name == "surName"
assert attr1.attribute_value[0].text == "Jeter"
else:
assert attr0.name == "surName"
assert attr0.attribute_value[0].text == "Jeter"
assert attr1.name == "givenName"
assert attr1.attribute_value[0].text == "Derek"
def test_issuer(self):
issuer = make_instance( saml.Issuer, self.server.issuer())
@@ -84,130 +212,16 @@ class TestServer():
assert issuer.format == saml.NAMEID_FORMAT_ENTITY
assert issuer.text == self.server.conf["entityid"]
def test_audience(self):
aud_restr = make_instance( saml.AudienceRestriction,
self.server.audience_restriction(
audience=self.server.audience("urn:foo:bar")))
assert aud_restr.keyswv() == ["audience"]
assert aud_restr.audience.text == "urn:foo:bar"
def test_conditions(self):
conds_dict = self.server.conditions(
not_before="2009-10-30T07:58:10.852Z",
not_on_or_after="2009-10-30T08:03:10.852Z",
audience_restriction=self.server.audience_restriction(
audience=self.server.audience("urn:foo:bar")))
conditions = make_instance(saml.Conditions, conds_dict)
assert _eq(conditions.keyswv(), ["not_before", "not_on_or_after",
"audience_restriction"])
assert conditions.not_before == "2009-10-30T07:58:10.852Z"
assert conditions.not_on_or_after == "2009-10-30T08:03:10.852Z"
assert conditions.audience_restriction[0].audience.text == "urn:foo:bar"
def test_value_1(self):
#FriendlyName="givenName" Name="urn:oid:2.5.4.42"
# NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
adict = self.server.attribute(name="urn:oid:2.5.4.42",
name_format=saml.NAME_FORMAT_URI)
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name","name_format"])
assert attribute.name == "urn:oid:2.5.4.42"
assert attribute.name_format == saml.NAME_FORMAT_URI
def test_value_2(self):
adict = self.server.attribute(name="urn:oid:2.5.4.42",
name_format=saml.NAME_FORMAT_URI,
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name","name_format","friendly_name"])
assert attribute.name == "urn:oid:2.5.4.42"
assert attribute.name_format == saml.NAME_FORMAT_URI
assert attribute.friendly_name == "givenName"
def test_value_3(self):
adict = self.server.attribute(attribute_value="Derek",
name="urn:oid:2.5.4.42",
name_format=saml.NAME_FORMAT_URI,
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name", "name_format",
"friendly_name", "attribute_value"])
assert attribute.name == "urn:oid:2.5.4.42"
assert attribute.name_format == saml.NAME_FORMAT_URI
assert attribute.friendly_name == "givenName"
assert len(attribute.attribute_value) == 1
assert attribute.attribute_value[0].text == "Derek"
def test_value_4(self):
adict = self.server.attribute(attribute_value="Derek",
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["friendly_name", "attribute_value"])
assert attribute.friendly_name == "givenName"
assert len(attribute.attribute_value) == 1
assert attribute.attribute_value[0].text == "Derek"
def test_do_attribute_statement(self):
astat = self.server.do_attribute_statement({"surName":"Jeter",
"givenName":["Derek","Sanderson"]})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute) == 2
attr0 = statement.attribute[0]
assert _eq(attr0.keyswv(), ["name","attribute_value"])
attr1 = statement.attribute[1]
assert _eq(attr1.keyswv(), ["name","attribute_value"])
if attr0.name == "givenName":
assert len(attr0.attribute_value) == 2
assert _eq([av.text for av in attr0.attribute_value],
["Derek","Sanderson"])
assert attr1.name == "surName"
assert attr1.attribute_value[0].text == "Jeter"
assert len(attr1.attribute_value) == 1
else:
assert attr0.name == "surName"
assert attr0.attribute_value[0].text == "Jeter"
assert len(attr0.attribute_value) == 1
assert attr1.name == "givenName"
assert len(attr1.attribute_value) == 2
assert _eq([av.text for av in attr1.attribute_value],
["Derek","Sanderson"])
def test_do_attribute_statement_multi(self):
astat = self.server.do_attribute_statement(
{("urn:oid:1.3.6.1.4.1.5923.1.1.1.7",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"eduPersonEntitlement"):"Jeter"})
statement = make_instance(saml.AttributeStatement,astat)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute)
assert _eq(statement.attribute[0].keyswv(),
["name","name_format","friendly_name","attribute_value"])
attribute = statement.attribute[0]
assert attribute.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7"
assert attribute.name_format == (
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri")
assert attribute.friendly_name == "eduPersonEntitlement"
def test_subject(self):
adict = self.server.subject("_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT)
subject = make_instance(saml.Subject, adict)
assert _eq(subject.keyswv(),["text", "name_id"])
assert subject.text == "_aaa"
assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT
def test_assertion(self):
tmp = self.server.assertion(
subject= self.server.subject("_aaa",
tmp = server.kd_assertion(
subject= server.kd_subject("_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
attribute_statement = self.server.attribute_statement(
attribute_statement = server.kd_attribute_statement(
attribute=[
self.server.attribute(attribute_value="Derek",
server.kd_attribute(attribute_value="Derek",
friendly_name="givenName"),
self.server.attribute(attribute_value="Jeter",
server.kd_attribute(attribute_value="Jeter",
friendly_name="surName"),
]),
issuer=self.server.issuer(),
@@ -217,7 +231,7 @@ class TestServer():
assert _eq(assertion.keyswv(),['attribute_statement', 'issuer', 'id',
'subject', 'issue_instant', 'version'])
assert assertion.version == "2.0"
assert assertion.issuer.text == "urn:mace:umu.se:saml:roland:sp"
assert assertion.issuer.text == "urn:mace:example.com:saml:roland:sp"
#
assert len(assertion.attribute_statement) == 1
attribute_statement = assertion.attribute_statement[0]
@@ -240,17 +254,17 @@ class TestServer():
assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT
def test_response(self):
tmp = self.server.response(
tmp = server.kd_response(
in_response_to="_012345",
destination="https:#www.example.com",
status=self.server.success_status(),
assertion=self.server.assertion(
subject = self.server.subject("_aaa",
status=server.kd_success_status(),
assertion=server.kd_assertion(
subject = server.kd_subject("_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
attribute_statement = self.server.attribute_statement([
self.server.attribute(attribute_value="Derek",
attribute_statement = server.kd_attribute_statement([
server.kd_attribute(attribute_value="Derek",
friendly_name="givenName"),
self.server.attribute(attribute_value="Jeter",
server.kd_attribute(attribute_value="Jeter",
friendly_name="surName"),
]),
issuer=self.server.issuer(),
@@ -264,7 +278,7 @@ class TestServer():
'in_response_to', 'issue_instant',
'version', 'issuer', 'id'])
assert response.version == "2.0"
assert response.issuer.text == "urn:mace:umu.se:saml:roland:sp"
assert response.issuer.text == "urn:mace:example.com:saml:roland:sp"
assert response.destination == "https:#www.example.com"
assert response.in_response_to == "_012345"
#
@@ -273,11 +287,12 @@ class TestServer():
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_parse_faulty_request(self):
authn_request = client.d_authn_request(
sc = client.Saml2Client({},None)
authn_request = sc.authn_request(
query_id = "1",
destination = "http://www.example.com",
service_url = "http://www.example.org",
spentityid = "urn:mace:umu.se:saml:roland:sp",
spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
@@ -286,11 +301,12 @@ class TestServer():
raises(OtherError,self.server.parse_authn_request,intermed)
def test_parse_faulty_request_to_err_status(self):
authn_request = client.d_authn_request(
sc = client.Saml2Client({},None)
authn_request = sc.authn_request(
query_id = "1",
destination = "http://www.example.com",
service_url = "http://www.example.org",
spentityid = "urn:mace:umu.se:saml:roland:sp",
spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
@@ -301,7 +317,7 @@ class TestServer():
except OtherError, oe:
print oe.args
status = utils.make_instance(samlp.Status,
self.server.status_from_exception(oe))
server.kd_status_from_exception(oe))
assert status
print status
@@ -314,28 +330,31 @@ class TestServer():
assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL
def test_parse_ok_request(self):
authn_request = client.d_authn_request(
sc = client.Saml2Client({},None)
authn_request = sc.authn_request(
query_id = "1",
destination = "http://www.example.com",
service_url = "http://localhost:8087/",
spentityid = "urn:mace:umu.se:saml:roland:sp",
spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
print authn_request
intermed = utils.deflate_and_base64_encode("%s" % authn_request)
(consumer_url, id, name_id_policies,
(consumer_url, id, name_id_policy,
sp) = self.server.parse_authn_request(intermed)
assert consumer_url == "http://localhost:8087/"
assert id == "1"
assert name_id_policies == saml.NAMEID_FORMAT_TRANSIENT
assert sp == "urn:mace:umu.se:saml:roland:sp"
assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT
assert sp == "urn:mace:example.com:saml:roland:sp"
def test_sso_response(self):
resp = self.server.do_sso_response(
"http://localhost:8087/", # consumer_url
"12", # in_response_to
"urn:mace:umu.se:saml:roland:sp", # sp_entity_id
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{("urn:oid:1.3.6.1.4.1.5923.1.1.1.7",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"eduPersonEntitlement"):"Jeter"}
@@ -344,7 +363,7 @@ class TestServer():
print resp.keyswv()
assert _eq(resp.keyswv(),['status', 'destination', 'assertion',
'in_response_to', 'issue_instant',
'version', 'id'])
'version', 'id', 'issuer'])
assert resp.destination == "http://localhost:8087/"
assert resp.in_response_to == "12"
assert resp.status
@@ -363,3 +382,12 @@ class TestServer():
print confirmation.subject_confirmation_data
assert confirmation.subject_confirmation_data.in_response_to == "12"
def test_persistence_0(self):
pid1 = self.server.persistent_id(
"urn:mace:example.com:saml:roland:sp", "jeter")
pid2 = self.server.persistent_id(
"urn:mace:example.com:saml:roland:sp", "jeter")
print pid1, pid2
assert pid1 == pid2

View File

@@ -1,15 +1,68 @@
#!/usr/bin/env python
import os
from saml2 import utils, md, samlp, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT
from saml2 import BINDING_SOAP
from saml2.time_util import in_a_while
def do_sp_sso_descriptor(sp, cert):
return {
"protocol_support_enumeration": samlp.NAMESPACE,
"want_assertions_signed": True,
"authn_requests_signed": False,
"assertion_consumer_service": {
"binding": BINDING_HTTP_POST ,
"location": sp["url"],
"index": 0,
},
"key_descriptor":{
"key_info": {
"x509_data": {
"x509_certificate": cert
}
}
},
}
def do_idp_sso_descriptor(idp, cert):
return {
"protocol_support_enumeration": samlp.NAMESPACE,
"want_authn_requests_signed": True,
"single_sign_on_service": {
"binding": BINDING_HTTP_REDIRECT ,
"location": idp["url"],
},
"key_descriptor":{
"key_info": {
"x509_data": {
"x509_certificate": cert
}
}
},
}
def do_aa_descriptor(aa, cert):
return {
"protocol_support_enumeration": samlp.NAMESPACE,
"attribute_service": {
"binding": BINDING_SOAP ,
"location": aa["url"],
},
"key_descriptor":{
"key_info": {
"x509_data": {
"x509_certificate": cert
}
}
},
}
def entity_descriptor(confd):
mycert = "".join(open(confd["cert_file"]).readlines()[1:-1])
ed = {
"name": "http://%s/saml/test" % os.uname()[1],
"valid_until": in_a_while(days=30),
"valid_until": in_a_while(hours=96),
"entity_id": confd["entityid"],
}
@@ -33,46 +86,21 @@ def entity_descriptor(confd):
if "sp" in confd["service"]:
# The SP
ed["sp_sso_descriptor"] = {
"protocol_support_enumeration": samlp.NAMESPACE,
"want_assertions_signed": True,
"authn_requests_signed": False,
"assertion_consumer_service": {
"binding": BINDING_HTTP_POST ,
"location": confd["service_url"],
"index": 0,
},
"key_descriptor":{
"key_info": {
"x509_data": {
"x509_certificate": mycert
}
}
},
}
elif "idp" in confd["service"]:
ed["idp_sso_descriptor"] = {
"protocol_support_enumeration": samlp.NAMESPACE,
"want_authn_requests_signed": True,
"single_sign_on_service": {
"binding": BINDING_HTTP_REDIRECT ,
"location": confd["service_url"],
},
"key_descriptor":{
"key_info": {
"x509_data": {
"x509_certificate": mycert
}
}
},
}
ed["sp_sso_descriptor"] = do_sp_sso_descriptor(confd["service"]["sp"],
mycert)
if "idp" in confd["service"]:
ed["idp_sso_descriptor"] = do_idp_sso_descriptor(
confd["service"]["idp"],mycert)
if "aa" in confd["service"]:
ed["attribute_authority_descriptor"] = do_aa_descriptor(
confd["service"]["aa"],mycert)
return ed
def entities_descriptor(eds):
return utils.make_instance(md.EntitiesDescriptor,{
"name": "urn:mace:umu.se:saml:test",
"valid_until": in_a_while(30),
"valid_until": in_a_while(hours=96),
"entity_descriptor": eds})
if __name__ == "__main__":