Even more refactoring and the FakeIDP for test purposes.

This commit is contained in:
Roland Hedberg 2012-12-31 16:14:49 +01:00
parent 7127aa55da
commit 0bf0f89af6
8 changed files with 337 additions and 114 deletions

View File

@ -5,8 +5,10 @@ import logging
#from cgi import parse_qs #from cgi import parse_qs
from urlparse import parse_qs from urlparse import parse_qs
from saml2.httputil import Unauthorized, NotFound, BadRequest
from saml2.httputil import ServiceError
from saml2.httputil import Response
from saml2.pack import http_form_post_message from saml2.pack import http_form_post_message
from saml2.s_utils import OtherError
from saml2.saml import AUTHN_PASSWORD from saml2.saml import AUTHN_PASSWORD
from saml2 import server from saml2 import server
from saml2 import BINDING_HTTP_REDIRECT, BINDING_HTTP_POST from saml2 import BINDING_HTTP_REDIRECT, BINDING_HTTP_POST
@ -71,6 +73,7 @@ FORM_SPEC = """<form name="myform" method="post" action="%s">
<input type="hidden" name="RelayState" value="%s" /> <input type="hidden" name="RelayState" value="%s" />
</form>""" </form>"""
def sso(environ, start_response, user): def sso(environ, start_response, user):
""" Supposed to return a self issuing Form POST """ """ Supposed to return a self issuing Form POST """
#edict = dict_to_table(environ) #edict = dict_to_table(environ)
@ -85,16 +88,16 @@ def sso(environ, start_response, user):
query = environ["s2repoze.qinfo"] query = environ["s2repoze.qinfo"]
if not query: if not query:
start_response('401 Unauthorized', [('Content-Type', 'text/plain')]) resp = Unauthorized('Unknown user')
return ['Unknown user'] return resp(environ, start_response)
# base 64 encoded request # base 64 encoded request
# Assume default binding, that is HTTP-redirect # Assume default binding, that is HTTP-redirect
req = IDP.parse_authn_request(query["SAMLRequest"][0]) req = IDP.parse_authn_request(query["SAMLRequest"][0])
if req is None: if req is None:
start_response("500", [('Content-Type', 'text/plain')]) resp = ServiceError("Failed to parse the SAML request")
return ["Failed to parse the SAML request"] return resp(environ, start_response)
logger.info("parsed OK") logger.info("parsed OK")
logger.info("%s" % req) logger.info("%s" % req)
@ -112,7 +115,7 @@ def sso(environ, start_response, user):
_binding = req.message.protocol_binding _binding = req.message.protocol_binding
try: try:
resp_args = IDP.response_args(req.message, [_binding]) resp_args = IDP.response_args(req.message, [_binding], "spsso")
except Exception: except Exception:
raise raise
@ -121,7 +124,8 @@ def sso(environ, start_response, user):
# serious error on someones behalf # serious error on someones behalf
logger.error("%s != %s" % (req.message.assertion_consumer_service_url, logger.error("%s != %s" % (req.message.assertion_consumer_service_url,
resp_args["destination"])) resp_args["destination"]))
raise OtherError("ConsumerURL and return destination mismatch") resp = BadRequest("ConsumerURL and return destination mismatch")
raise resp(environ, start_response)
try: try:
authn_resp = IDP.create_authn_response(identity, userid, authn=AUTHN, authn_resp = IDP.create_authn_response(identity, userid, authn=AUTHN,
@ -134,43 +138,43 @@ def sso(environ, start_response, user):
http_args = http_form_post_message(authn_resp, resp_args["destination"], http_args = http_form_post_message(authn_resp, resp_args["destination"],
relay_state=query["RelayState"]) relay_state=query["RelayState"])
start_response('200 OK', http_args["headers"])
return http_args["data"] resp = Response(http_args["data"], headers=http_args["headers"])
return resp(environ, start_response)
def whoami(environ, start_response, user): def whoami(environ, start_response, user):
start_response('200 OK', [('Content-Type', 'text/html')])
identity = environ["repoze.who.identity"].copy() identity = environ["repoze.who.identity"].copy()
for prop in ["login", "password"]: for prop in ["login", "password"]:
try: try:
del identity[prop] del identity[prop]
except KeyError: except KeyError:
continue continue
response = dict_to_table(identity) response = Response(dict_to_table(identity))
return response[:] return response(environ, start_response)
def not_found(environ, start_response): def not_found(environ, start_response):
"""Called if no URL matches.""" """Called if no URL matches."""
start_response('404 NOT FOUND', [('Content-Type', 'text/plain')]) resp = NotFound('Not Found')
return ['Not Found'] return resp(environ, start_response)
def not_authn(environ, start_response): def not_authn(environ, start_response):
if "QUERY_STRING" in environ: if "QUERY_STRING" in environ:
query = parse_qs(environ["QUERY_STRING"]) query = parse_qs(environ["QUERY_STRING"])
logger.info("query: %s" % query) logger.info("query: %s" % query)
start_response('401 Unauthorized', [('Content-Type', 'text/plain')]) resp = Unauthorized('Unknown user')
return ['Unknown user'] return resp(environ, start_response)
def slo(environ, start_response, user): def slo(environ, start_response, user):
""" Expects a HTTP-redirect logout request """ """ Expects a HTTP-redirect logout request """
query = None query = None
if "QUERY_STRING" in environ: if "QUERY_STRING" in environ:
if logger: logger.info("Query string: %s" % environ["QUERY_STRING"]) logger.info("Query string: %s" % environ["QUERY_STRING"])
query = parse_qs(environ["QUERY_STRING"]) query = parse_qs(environ["QUERY_STRING"])
if not query: if not query:
start_response('401 Unauthorized', [('Content-Type', 'text/plain')]) resp = Unauthorized('Unknown user')
return ['Unknown user'] return resp(environ, start_response)
try: try:
req_info = IDP.parse_logout_request(query["SAMLRequest"][0], req_info = IDP.parse_logout_request(query["SAMLRequest"][0],
@ -178,9 +182,9 @@ def slo(environ, start_response, user):
logger.info("LOGOUT request parsed OK") logger.info("LOGOUT request parsed OK")
logger.info("REQ_INFO: %s" % req_info.message) logger.info("REQ_INFO: %s" % req_info.message)
except KeyError, exc: except KeyError, exc:
if logger: logger.info("logout request error: %s" % (exc,)) logger.info("logout request error: %s" % (exc,))
start_response('400 Bad request', [('Content-Type', 'text/plain')]) resp = BadRequest('Request parse error')
return ['Request parse error'] return resp(environ, start_response)
# look for the subject # look for the subject
subject = req_info.subject_id() subject = req_info.subject_id()
@ -203,18 +207,18 @@ def slo(environ, start_response, user):
query["RelayState"], "SAMLResponse") query["RelayState"], "SAMLResponse")
except Exception, exc: except Exception, exc:
start_response('400 Bad request', [('Content-Type', 'text/plain')]) resp = BadRequest('%s' % exc)
return ['%s' % exc] return resp(environ, start_response)
delco = delete_cookie(environ, "pysaml2idp") delco = delete_cookie(environ, "pysaml2idp")
if delco: if delco:
http_args["headers"].append(delco) http_args["headers"].append(delco)
if binding == BINDING_HTTP_POST: if binding == BINDING_HTTP_POST:
start_response("200 OK", http_args["headers"]) resp = Response(http_args["data"], headers=http_args["headers"])
else: else:
start_response("302 Found", http_args["headers"]) resp = NotFound(http_args["data"], headers=http_args["headers"])
return http_args["data"] return resp(environ, start_response)
def delete_cookie(environ, name): def delete_cookie(environ, name):
kaka = environ.get("HTTP_COOKIE", '') kaka = environ.get("HTTP_COOKIE", '')

View File

@ -1,9 +1,16 @@
#!/usr/bin/env python #!/usr/bin/env python
from Cookie import SimpleCookie
import logging import logging
import re import re
from urlparse import parse_qs from urlparse import parse_qs
from saml2 import BINDING_HTTP_REDIRECT from example.idp.idp import delete_cookie
from saml2 import BINDING_HTTP_REDIRECT, time_util
from saml2.httputil import Response
from saml2.httputil import Unauthorized
from saml2.httputil import NotFound
from saml2.httputil import Redirect
from saml2.httputil import ServiceError
logger = logging.getLogger("saml2.SP") logger = logging.getLogger("saml2.SP")
@ -48,6 +55,26 @@ def dict_to_table(ava, lev=0, width=1):
txt.append('</table>\n') txt.append('</table>\n')
return txt return txt
def _expiration(timeout, format=None):
if timeout == "now":
return time_util.instant(format)
else:
# validity time should match lifetime of assertions
return time_util.in_a_while(minutes=timeout, format=format)
def delete_cookie(environ, name):
kaka = environ.get("HTTP_COOKIE", '')
if kaka:
cookie_obj = SimpleCookie(kaka)
morsel = cookie_obj.get(name, None)
cookie = SimpleCookie()
cookie[name] = morsel
cookie[name]["expires"] =\
_expiration("now", "%a, %d-%b-%Y %H:%M:%S CET")
return tuple(cookie.output().split(": ", 1))
return None
# ----------------------------------------------------------------------------
#noinspection PyUnusedLocal #noinspection PyUnusedLocal
def whoami(environ, start_response, user): def whoami(environ, start_response, user):
@ -57,50 +84,56 @@ def whoami(environ, start_response, user):
response = ["<h2>Your identity are supposed to be</h2>"] response = ["<h2>Your identity are supposed to be</h2>"]
response.extend(dict_to_table(identity)) response.extend(dict_to_table(identity))
response.extend("<a href='logout'>Logout</a>") response.extend("<a href='logout'>Logout</a>")
start_response('200 OK', [('Content-Type', 'text/html')]) resp = Response(response)
return response[:] return resp(environ, start_response)
#noinspection PyUnusedLocal #noinspection PyUnusedLocal
def not_found(environ, start_response): def not_found(environ, start_response):
"""Called if no URL matches.""" """Called if no URL matches."""
start_response('404 NOT FOUND', [('Content-Type', 'text/plain')]) resp = NotFound('Not Found')
return ['Not Found'] return resp(environ, start_response)
#noinspection PyUnusedLocal #noinspection PyUnusedLocal
def not_authn(environ, start_response): def not_authn(environ, start_response):
start_response('401 Unauthorized', [('Content-Type', 'text/plain')]) resp = Unauthorized('Unknown user')
return ['Unknown user'] return resp(environ, start_response)
#noinspection PyUnusedLocal #noinspection PyUnusedLocal
def slo(environ, start_response, user): def slo(environ, start_response, user):
# so here I might get either a LogoutResponse or a LogoutRequest # so here I might get either a LogoutResponse or a LogoutRequest
client = environ['repoze.who.plugins']["saml2auth"] client = environ['repoze.who.plugins']["saml2auth"]
sc = client.saml_client sc = client.saml_client
sids = None
if "QUERY_STRING" in environ: if "QUERY_STRING" in environ:
query = parse_qs(environ["QUERY_STRING"]) query = parse_qs(environ["QUERY_STRING"])
logger.info("query: %s" % query) logger.info("query: %s" % query)
try: try:
response = sc.logout_request_response(query["SAMLResponse"][0], response = sc.parse_logout_request_response(query["SAMLResponse"][0],
binding=BINDING_HTTP_REDIRECT) binding=BINDING_HTTP_REDIRECT)
if response: if response:
logger.info("LOGOUT response parsed OK") logger.info("LOGOUT response parsed OK")
except KeyError: except KeyError:
# return error reply # return error reply
pass response = None
if response is None: if response is None:
request = sc.lo request = sc.lo
if not sids:
start_response("302 Found", [("Location", "/done")]) headers = [("Location", "/done")]
return ["Successfull Logout"] delco = delete_cookie(environ, "pysaml2")
if delco:
headers.append(delco)
resp = Redirect("Successful Logout", headers=headers)
return resp(environ, start_response)
#noinspection PyUnusedLocal #noinspection PyUnusedLocal
def logout(environ, start_response, user): def logout(environ, start_response, user):
# This is where it starts when a user wants to log out
client = environ['repoze.who.plugins']["saml2auth"] client = environ['repoze.who.plugins']["saml2auth"]
subject_id = environ["repoze.who.identity"]['repoze.who.userid'] subject_id = environ["repoze.who.identity"]['repoze.who.userid']
logger.info("[logout] subject_id: '%s'" % (subject_id,)) logger.info("[logout] subject_id: '%s'" % (subject_id,))
target = "/done" target = "/done"
# What if more than one # What if more than one
tmp = client.saml_client.global_logout(subject_id) tmp = client.saml_client.global_logout(subject_id)
logger.info("[logout] global_logout > %s" % (tmp,)) logger.info("[logout] global_logout > %s" % (tmp,))
@ -110,12 +143,13 @@ def logout(environ, start_response, user):
start_response(code, header) start_response(code, header)
return result return result
else: # All was done using SOAP else: # All was done using SOAP
if result: if result:
start_response("302 Found", [("Location", target)]) resp = Redirect("Successful Logout", headers=[("Location", target)])
return ["Successful Logout"] return resp(environ, start_response)
else: else:
resp = ServiceError("Failed to logout from identity services")
start_response("500 Internal Server Error") start_response("500 Internal Server Error")
return ["Failed to logout from identity services"] return []
#noinspection PyUnusedLocal #noinspection PyUnusedLocal
def done(environ, start_response, user): def done(environ, start_response, user):

View File

@ -38,6 +38,7 @@ from repoze.who.interfaces import IMetadataProvider
from repoze.who.plugins.form import FormPluginBase from repoze.who.plugins.form import FormPluginBase
from saml2 import ecp from saml2 import ecp
from saml2 import BINDING_HTTP_POST
from saml2.client import Saml2Client from saml2.client import Saml2Client
from saml2.discovery import discovery_service_response from saml2.discovery import discovery_service_response
@ -339,8 +340,10 @@ class SAML2Plugin(FormPluginBase):
try: try:
# Evaluate the response, returns a AuthnResponse instance # Evaluate the response, returns a AuthnResponse instance
try: try:
authresp = self.saml_client.authn_request_response(post, authresp = self.saml_client.parse_authn_request_response(
self.outstanding_queries) post["SAMLResponse"],
BINDING_HTTP_POST,
self.outstanding_queries)
except Exception, excp: except Exception, excp:
logger.exception("Exception: %s" % (excp,)) logger.exception("Exception: %s" % (excp,))
raise raise

View File

@ -239,7 +239,7 @@ class Saml2Client(Base):
def _use_soap(self, destination, query_type, **kwargs): def _use_soap(self, destination, query_type, **kwargs):
_create_func = getattr(self, "create_%s" % query_type) _create_func = getattr(self, "create_%s" % query_type)
_response_func = getattr(self, "%s_response" % query_type) _response_func = getattr(self, "parse_%s_response" % query_type)
try: try:
response_args = kwargs["response_args"] response_args = kwargs["response_args"]
del kwargs["response_args"] del kwargs["response_args"]

View File

@ -22,10 +22,13 @@ from saml2.entity import Entity
from saml2.mdstore import destinations from saml2.mdstore import destinations
from saml2.saml import AssertionIDRef, NAMEID_FORMAT_TRANSIENT from saml2.saml import AssertionIDRef, NAMEID_FORMAT_TRANSIENT
from saml2.samlp import AuthnQuery, ArtifactResponse, StatusCode, Status from saml2.samlp import AuthnQuery
from saml2.samlp import ArtifactResponse
from saml2.samlp import StatusCode
from saml2.samlp import Status
from saml2.samlp import Response
from saml2.samlp import ArtifactResolve from saml2.samlp import ArtifactResolve
from saml2.samlp import artifact_resolve_from_string from saml2.samlp import artifact_resolve_from_string
from saml2.samlp import LogoutRequest
from saml2.samlp import AssertionIDRequest from saml2.samlp import AssertionIDRequest
from saml2.samlp import NameIDMappingRequest from saml2.samlp import NameIDMappingRequest
from saml2.samlp import AttributeQuery from saml2.samlp import AttributeQuery
@ -34,7 +37,6 @@ from saml2.samlp import AuthnRequest
import saml2 import saml2
import time import time
import base64
from saml2.soap import parse_soap_enveloped_saml_artifact_resolve from saml2.soap import parse_soap_enveloped_saml_artifact_resolve
try: try:
@ -43,24 +45,17 @@ except ImportError:
# Compatibility with Python <= 2.5 # Compatibility with Python <= 2.5
from cgi import parse_qs from cgi import parse_qs
from saml2.time_util import instant from saml2.s_utils import signature
from saml2.s_utils import signature, rndstr
from saml2.s_utils import sid
from saml2.s_utils import do_attributes from saml2.s_utils import do_attributes
from saml2.s_utils import decode_base64_and_inflate
from saml2 import samlp, saml, class_name from saml2 import samlp, BINDING_SOAP
from saml2 import VERSION from saml2 import saml
from saml2.sigver import pre_signature_part
from saml2.sigver import signed_instance_factory
from saml2.population import Population from saml2.population import Population
from saml2.response import response_factory, attribute_response from saml2.response import AttributeResponse
from saml2.response import LogoutResponse
from saml2.response import AuthnResponse from saml2.response import AuthnResponse
from saml2 import BINDING_HTTP_REDIRECT from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_SOAP
from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_PAOS, element_to_extension_element from saml2 import BINDING_PAOS, element_to_extension_element
import logging import logging
@ -490,45 +485,39 @@ class Base(Entity):
# ======== response handling =========== # ======== response handling ===========
def parse_authn_request_response(self, post, outstanding, decode=True, def parse_authn_request_response(self, xmlstr, binding, outstanding):
asynchop=True):
""" Deal with an AuthnResponse """ Deal with an AuthnResponse
:param post: The reply as a dictionary :param xmlstr: The reply as a xml string
:param binding: Which binding that was used for the transport
:param outstanding: A dictionary with session IDs as keys and :param outstanding: A dictionary with session IDs as keys and
the original web request from the user before redirection the original web request from the user before redirection
as values. as values.
:param decode: Whether the response is Base64 encoded or not :return: An response.AuthnResponse
:param asynchop: Whether the response was return over a asynchronous
connection. SOAP for instance is synchronous
:return: An response.AuthnResponse or response.LogoutResponse instance
""" """
# If the request contains a samlResponse, try to validate it
try:
saml_response = post['SAMLResponse']
except KeyError:
return None
try: try:
_ = self.config.entityid _ = self.config.entityid
except KeyError: except KeyError:
raise Exception("Missing entity_id specification") raise Exception("Missing entity_id specification")
reply_addr = self.service_url()
resp = None resp = None
if saml_response: if xmlstr:
kwargs = {"outstanding_queries": outstanding,
"allow_unsolicited": self.allow_unsolicited,
"return_addr": self.service_url(),
"entity_id": self.config.entityid,
"attribute_converters": self.config.attribute_converters}
try: try:
resp = response_factory(saml_response, self.config, resp = self._parse_response(xmlstr, AuthnResponse,
reply_addr, outstanding, decode=decode, "assertion_consumer_service",
asynchop=asynchop, binding, **kwargs)
allow_unsolicited=self.allow_unsolicited)
except Exception, exc: except Exception, exc:
logger.error("%s" % exc) logger.error("%s" % exc)
return None return None
logger.debug(">> %s", resp) logger.debug(">> %s", resp)
resp = resp.verify()
if isinstance(resp, AuthnResponse): if isinstance(resp, AuthnResponse):
self.users.add_information_about_person(resp.session_info()) self.users.add_information_about_person(resp.session_info())
logger.info("--- ADDED person info ----") logger.info("--- ADDED person info ----")
@ -537,47 +526,44 @@ class Base(Entity):
saml2.class_name(resp),)) saml2.class_name(resp),))
return resp return resp
#noinspection PyUnusedLocal # ------------------------------------------------------------------------
def parse_authz_decision_query_response(self, response): # SubjectQuery, AuthnQuery, RequestedAuthnContext, AttributeQuery,
# AuthzDecisionQuery all get Response as response
def parse_authz_decision_query_response(self, response,
binding=BINDING_SOAP):
""" Verify that the response is OK """ Verify that the response is OK
""" """
resp = samlp.response_from_string(response)
return resp
def parse_assertion_id_request_response(self, response): return self._parse_response(response, Response, "", binding)
def parse_authn_query_response(self, response, binding=BINDING_SOAP):
""" Verify that the response is OK """ Verify that the response is OK
""" """
resp = samlp.response_from_string(response) return self._parse_response(response, Response, "", binding)
return resp
def parse_authn_query_response(self, response): def parse_assertion_id_request_response(self, response, binding):
""" Verify that the response is OK """ Verify that the response is OK
""" """
resp = samlp.response_from_string(response) return self._parse_response(response, Response, "", binding)
return resp
def parse_attribute_query_response(self, response, **kwargs): # ------------------------------------------------------------------------
try:
# synchronous operation
aresp = attribute_response(self.config, self.config.entityid)
except Exception, exc:
logger.error("%s", (exc,))
return None
_resp = aresp.loads(response, False, response).verify() def parse_attribute_query_response(self, response, binding):
if _resp is None:
logger.error("Didn't like the response")
return None
session_info = _resp.session_info() response = self._parse_response(response, AttributeResponse,
"attribute_consuming_service", binding)
if session_info: # session_info = response.session_info()
if "real_id" in kwargs: #
session_info["name_id"] = kwargs["real_id"] # if session_info:
self.users.add_information_about_person(session_info) # if "real_id" in kwargs:
# session_info["name_id"] = kwargs["real_id"]
# self.users.add_information_about_person(session_info)
#
# logger.info("session: %s" % session_info)
# return session_info
logger.info("session: %s" % session_info)
return session_info
def parse_artifact_resolve_response(self, txt, **kwargs): def parse_artifact_resolve_response(self, txt, **kwargs):
""" """

View File

@ -39,7 +39,10 @@ class Response(object):
mte = self.mako_lookup.get_template(self.mako_template) mte = self.mako_lookup.get_template(self.mako_template)
return [mte.render(**argv)] return [mte.render(**argv)]
else: else:
return [message] if isinstance(message, basestring):
return [message]
else:
return message
class Created(Response): class Created(Response):
_status = "201 Created" _status = "201 Created"
@ -130,3 +133,26 @@ def getpath(environ):
"""Builds a path.""" """Builds a path."""
return ''.join([quote(environ.get('SCRIPT_NAME', '')), return ''.join([quote(environ.get('SCRIPT_NAME', '')),
quote(environ.get('PATH_INFO', ''))]) quote(environ.get('PATH_INFO', ''))])
def get_post(environ):
# the environment variable CONTENT_LENGTH may be empty or missing
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except ValueError:
request_body_size = 0
# When the method is POST the query string will be sent
# in the HTTP request body which is passed by the WSGI server
# in the file like wsgi.input environment variable.
return environ['wsgi.input'].read(request_body_size)
def get_response(environ, start_response):
if environ.get("REQUEST_METHOD") == "GET":
query = environ.get("QUERY_STRING")
elif environ.get("REQUEST_METHOD") == "POST":
query = get_post(environ)
else:
resp = BadRequest("Unsupported method")
return resp(environ, start_response)
return query

167
tests/fakeIDP.py Normal file
View File

@ -0,0 +1,167 @@
from urlparse import parse_qs
from saml2.saml import AUTHN_PASSWORD
from saml2.samlp import attribute_query_from_string, logout_request_from_string
from saml2 import BINDING_HTTP_REDIRECT, pack
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_SOAP
from saml2.server import Server
from saml2.soap import parse_soap_enveloped_saml_attribute_query
from saml2.soap import parse_soap_enveloped_saml_logout_request
from saml2.soap import make_soap_enveloped_saml_thingy
__author__ = 'rolandh'
TYP = {
"GET": [BINDING_HTTP_REDIRECT],
"POST": [BINDING_HTTP_POST, BINDING_SOAP]
}
def unpack_form(_str, ver="SAMLRequest"):
SR_STR = "name=\"%s\" value=\"" % ver
RS_STR = 'name="RelayState" value="'
i = _str.find(SR_STR)
i += len(SR_STR)
j = _str.find('"', i)
sr = _str[i:j]
k = _str.find(RS_STR, j)
k += len(RS_STR)
l = _str.find('"', k)
rs = _str[k:l]
return {ver:sr, "RelayState":rs}
class FakeIDP(Server):
def __init__(self, config_file=""):
Server.__init__(self, config_file)
#self.sign = False
def receive(self, url, method="GET", **kwargs):
"""
Interface to receive HTTP calls on
:param url:
:param method:
:param kwargs:
:return:
"""
if method == "GET":
path, query = url.split("?")
qs_dict = parse_qs(kwargs["data"])
else:
path = url
qs_dict = parse_qs(kwargs["data"])
req = qs_dict["SAMLRequest"][0]
rstate = qs_dict["RelayState"][0]
response = ""
# Get service from path
for key, vals in self.config.getattr("endpoints", "idp").items():
for endp, binding in vals:
if path == endp:
assert binding in TYP[method]
if key == "single_sign_on_service":
return self.authn_request_endpoint(req, binding,
rstate)
elif key == "single_logout_service":
return self.logout_endpoint(req, binding)
for key, vals in self.config.getattr("endpoints", "aa").items():
for endp, binding in vals:
if path == endp:
assert binding in TYP[method]
if key == "attribute_service":
return self.attribute_query_endpoint(req, binding)
return response
def authn_request_endpoint(self, req, binding, relay_state):
req = self.parse_authn_request(req, binding)
if req.message.protocol_binding == BINDING_HTTP_REDIRECT:
_binding = BINDING_HTTP_POST
else:
_binding = req.message.protocol_binding
try:
resp_args = self.response_args(req.message, [_binding], "spsso")
except Exception:
raise
identity = { "surName":"Hedberg", "givenName": "Roland",
"title": "supertramp", "mail": "roland@example.com"}
userid = "Pavill"
authn_resp = self.create_authn_response(identity,
userid=userid,
authn=(AUTHN_PASSWORD,
"http://www.example.com/login"),
**resp_args)
response = "%s" % authn_resp
return pack.factory(_binding, response,
resp_args["destination"], relay_state,
"SAMLResponse")
def attribute_query_endpoint(self, xml_str, binding):
if binding == BINDING_SOAP:
_str = parse_soap_enveloped_saml_attribute_query(xml_str)
else:
_str = xml_str
aquery = attribute_query_from_string(_str)
extra = {"eduPersonAffiliation": "faculty"}
userid = "Pavill"
name_id = aquery.subject.name_id
attr_resp = self.create_aa_response(aquery.id,
None,
sp_entity_id=aquery.issuer.text,
identity=extra, name_id=name_id,
attributes=aquery.attribute)
if binding == BINDING_SOAP:
# SOAP packing
#headers = {"content-type": "application/soap+xml"}
soap_message = make_soap_enveloped_saml_thingy(attr_resp)
# if self.sign and self.sec:
# _signed = self.sec.sign_statement_using_xmlsec(soap_message,
# class_name(attr_resp),
# nodeid=attr_resp.id)
# soap_message = _signed
response = "%s" % soap_message
else: # Just POST
response = "%s" % attr_resp
return response
def logout_endpoint(self, xml_str, binding):
if binding == BINDING_SOAP:
_str = parse_soap_enveloped_saml_logout_request(xml_str)
else:
_str = xml_str
req = logout_request_from_string(_str)
_resp = self.create_logout_response(req, binding)
if binding == BINDING_SOAP:
# SOAP packing
#headers = {"content-type": "application/soap+xml"}
soap_message = make_soap_enveloped_saml_thingy(_resp)
# if self.sign and self.sec:
# _signed = self.sec.sign_statement_using_xmlsec(soap_message,
# class_name(attr_resp),
# nodeid=attr_resp.id)
# soap_message = _signed
response = "%s" % soap_message
else: # Just POST
response = "%s" % _resp
return response

View File

@ -262,8 +262,9 @@ class TestClient:
resp_str = base64.encodestring(resp_str) resp_str = base64.encodestring(resp_str)
authn_response = self.client.authn_request_response({"SAMLResponse":resp_str}, authn_response = self.client.parse_authn_request_response(
{"id1":"http://foo.example.com/service"}) resp_str, BINDING_HTTP_POST,
{"id1":"http://foo.example.com/service"})
assert authn_response is not None assert authn_response is not None
assert authn_response.issuer() == IDP assert authn_response.issuer() == IDP
@ -303,7 +304,7 @@ class TestClient:
resp_str = base64.encodestring(resp_str) resp_str = base64.encodestring(resp_str)
self.client.authn_request_response({"SAMLResponse":resp_str}, self.client.parse_authn_request_response(resp_str, BINDING_HTTP_POST,
{"id2":"http://foo.example.com/service"}) {"id2":"http://foo.example.com/service"})
# Two persons in the cache # Two persons in the cache
@ -407,7 +408,9 @@ class TestClientWithDummy():
response = self.client.send(**http_args) response = self.client.send(**http_args)
_dic = unpack_form(response["data"][3], "SAMLResponse") _dic = unpack_form(response["data"][3], "SAMLResponse")
resp = self.client.authn_request_response(_dic, {id: "/"}) resp = self.client.parse_authn_request_response(_dic["SAMLResponse"],
BINDING_HTTP_POST,
{id: "/"})
ac = resp.assertion.authn_statement[0].authn_context ac = resp.assertion.authn_statement[0].authn_context
assert ac.authenticating_authority[0].text == 'http://www.example.com/login' assert ac.authenticating_authority[0].text == 'http://www.example.com/login'
assert ac.authn_context_class_ref.text == AUTHN_PASSWORD assert ac.authn_context_class_ref.text == AUTHN_PASSWORD