Added support for encrypted assertions

This commit is contained in:
Roland Hedberg
2009-10-27 17:01:41 +01:00
parent 724c0a0d2b
commit 9a39c0e745

View File

@@ -3,17 +3,23 @@ import urllib
import saml2
import base64
import time
import re
try:
from hashlib import md5
except ImportError:
from md5 import md5
import zlib
from saml2 import samlp, saml
from saml2.sigver import correctly_signed_response
from saml2 import samlp, saml, extension_element_to_element
from saml2.sigver import correctly_signed_response, decrypt
from saml2.soap import SOAPClient
DEFAULT_BINDING = saml2.BINDING_HTTP_REDIRECT
# 2009-07-05T15:35:29Z
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# 2009-10-27T09:00:27.604Z
TIME_FORMAT_WITH_FRAGMENT = re.compile(
"^(\d{4,4}-\d{2,2}-\d{2,2}T\d{2,2}:\d{2,2}:\d{2,2})\.\d*Z$")
FORM_SPEC = """<form method="post" action="%s">
<input type="hidden" name="SAMLRequest" value="%s" />
@@ -130,8 +136,8 @@ class Saml2Client:
return None
def authenticate(self, spentityid, location="", service_url="",
requestor="", my_name="", relay_state="",
binding=saml2.BINDING_HTTP_REDIRECT):
my_name="", relay_state="",
binding=saml2.BINDING_HTTP_REDIRECT, log=None):
""" Either verifies an authentication Response or if none is present
send an authentication request.
@@ -140,16 +146,22 @@ class Saml2Client:
IdP
:param location: Where the IdP is.
:param service_url: The service URL
:param requestor: Issuer of the AuthN request
:param my_name: The providers name
:param relay_state: To where the user should be returned after
successfull log in.
:return: AuthnRequest reponse
"""
if log:
log.info("spentityid: %s" % spentityid)
log.info("location: %s" % location)
log.info("service_url: %s" % service_url)
log.info("my_name: %s" % my_name)
sid = _sid()
authen_req = "%s" % self.create_authn_request(sid, location,
service_url, spentityid, my_name)
log and log.info("AuthNReq: %s" % authen_req)
if binding == saml2.BINDING_HTTP_POST:
# No valid ticket; Send a form to the client
# THIS IS NOT TO BE USED RIGHT NOW
@@ -210,7 +222,8 @@ class Saml2Client:
log and log.info("response: %s" % (response,))
try:
(ava, name_id, came_from) = self.do_response(response,
requestor, outstanding, log)
requestor, outstanding,
decoded_xml, log)
except AttributeError, exc:
log and log.error("AttributeError: %s" % (exc,))
return ({}, "")
@@ -222,44 +235,9 @@ class Saml2Client:
ava["__userid"] = name_id
return (ava, came_from)
def do_response(self, response, requestor, outstanding=None,
log=None):
"""
Parse a response, verify that it is a response for me and
expected by me and that it is correct.
def _assertion(self, assertion, outstanding, requestor, log):
""" """
:param response: The response as a structure
:param requestor: The host (me) that asked for a AuthN response
:param outstanding: A dictionary with session ids as keys and request
URIs as values.
:result: A 2-tuple with attribute value assertions as a dictionary and
the NameID
"""
if not outstanding:
outstanding = {}
if response.status:
status = response.status
if status.status_code.value != samlp.STATUS_SUCCESS:
log and log.info("Not successfull operation: %s" % status)
raise Exception(
"Not successfull according to: %s" % \
status.status_code.value)
# MUST contain *one* assertion
assert len(response.assertion) == 1
assertion = response.assertion[0]
if response.in_response_to:
if response.in_response_to in outstanding:
came_from = outstanding[response.in_response_to]
elif LAX:
came_from = ""
else:
log and log.info("Session id I don't recall using")
raise Exception("Session id I don't recall using")
# the assertion MUST contain one AuthNStatement
assert len(assertion.authn_statement) == 1
# authn_statement = assertion.authn_statement[0]
@@ -297,7 +275,15 @@ class Saml2Client:
assert assertion.conditions
condition = assertion.conditions
now = time.gmtime()
if time.strptime(condition.not_on_or_after, TIME_FORMAT) < now:
try:
not_on_or_after = time.strptime(condition.not_on_or_after,
TIME_FORMAT)
except Exception: # assume it's a format problem
m = TIME_FORMAT_WITH_FRAGMENT.match(condition.not_on_or_after)
not_on_or_after = time.strptime(m.groups()[0]+"Z", TIME_FORMAT)
if not_on_or_after < now:
# To old ignore
if not LAX:
log and log.info("To old: %s" % condition.not_on_or_after)
@@ -306,9 +292,66 @@ class Saml2Client:
if not LAX:
log and log.info("Not for me!!!")
return None # # verify signature
return (ava, name_id, came_from)
def _encrypted_assertion(self, xmlstr, outstanding, requestor, log):
decrypt_xml = decrypt(xmlstr, self.key_file, self.xmlsec_binary)
response = samlp.response_from_string(decrypt_xml)
ee = response.encrypted_assertion[0].extension_elements[0]
assertion = extension_element_to_element(
ee,
saml.ELEMENT_FROM_STRING,
namespace=saml.NAMESPACE)
log and log.info("Decrypted Assertion: %s" % assertion)
return self._assertion(assertion, outstanding, requestor, log)
def do_response(self, response, requestor, outstanding=None,
xmlstr="", log=None):
"""
Parse a response, verify that it is a response for me and
expected by me and that it is correct.
:param response: The response as a structure
:param requestor: The host (me) that asked for a AuthN response
:param outstanding: A dictionary with session ids as keys and request
URIs as values.
:result: A 2-tuple with attribute value assertions as a dictionary and
the NameID
"""
if not outstanding:
outstanding = {}
if response.status:
status = response.status
if status.status_code.value != samlp.STATUS_SUCCESS:
log and log.info("Not successfull operation: %s" % status)
raise Exception(
"Not successfull according to: %s" % \
status.status_code.value)
if response.in_response_to:
if response.in_response_to in outstanding:
came_from = outstanding[response.in_response_to]
elif LAX:
came_from = ""
else:
log and log.info("Session id I don't recall using")
raise Exception("Session id I don't recall using")
# MUST contain *one* assertion
assert len(response.assertion) == 1 or \
len(response.encrypted_assertion) == 1
if response.assertion:
return self._assertion(response.assertion[0], outstanding,
requestor, log)
else:
return self._encrypted_assertion(xmlstr, outstanding,
requestor, log)
def create_attribute_request(self, sid, subject_id, issuer, destination,
attribute=None, sp_name_qualifier=None, name_qualifier=None,
format=None):
@@ -451,9 +494,6 @@ class Saml2Client:
# ----------------------------------------------------------------------
#2009-07-05T15:35:29Z
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
def for_me(condition, myself ):
for restriction in condition.audience_restriction:
audience = restriction.audience
@@ -470,7 +510,10 @@ def get_attribute_values(attribute_statement):
result = {}
for attribute in attribute_statement.attribute:
# Check name_format ??
name = attribute.name.strip()
try:
name = attribute.friendly_name.strip()
except AttributeError:
name = attribute.name.strip()
result[name] = []
for value in attribute.attribute_value:
result[name].append(value.text.strip())