Updated pysaml2 to support PEFIM.

Added encrypted assertions with self contained namespaces.

Added possibility to add two assertions. One encrypted with all the attributes and a second one that is not encrypted and contains nameid and authentication statement.
This commit is contained in:
Hans Hörberg
2015-03-16 08:38:49 +01:00
parent 6bd9acb4fe
commit 6e0acc8997
8 changed files with 169 additions and 43 deletions

View File

@@ -17,7 +17,7 @@ saml_conf = sp_conf
remember_name = auth_tkt
sid_store = outstanding
idp_query_param = IdPEntityId
discovery = http://130.239.201.5/role/idp.ds
#discovery = http://130.239.201.5/role/idp.ds
[general]
request_classifier = s2repoze.plugins.challenge_decider:my_request_classifier

View File

@@ -337,7 +337,7 @@ class SAML2Plugin(object):
element_to_extension_element(spcertenc)])
if _cli.authn_requests_signed:
_sid = saml2.s_utils.sid(_cli.seed)
_sid = saml2.s_utils.sid()
req_id, msg_str = _cli.create_authn_request(
dest, vorg=vorg_name, sign=_cli.authn_requests_signed,
message_id=_sid, extensions=extensions)

View File

@@ -558,6 +558,94 @@ class SamlBase(ExtensionContainer):
except ValueError:
pass
def get_ns_map_attribute(self, attributes, uri_set):
for attribute in attributes:
if attribute[0] == "{":
uri, tag = attribute[1:].split("}")
uri_set.add(uri)
return uri_set
def tag_get_uri(self, elem):
if elem.tag[0] == "{":
uri, tag = elem.tag[1:].split("}")
return uri
return None
def get_ns_map(self, elements, uri_set):
for elem in elements:
uri_set = self.get_ns_map_attribute(elem.attrib, uri_set)
uri_set = self.get_ns_map(elem._children, uri_set)
uri = self.tag_get_uri(elem)
if uri is not None:
uri_set.add(uri)
return uri_set
def get_prefix_map(self, elements):
uri_set = self.get_ns_map(elements, set())
prefix_map = {}
for uri in sorted(uri_set):
prefix_map["encas%d" % len(prefix_map)] = uri
return prefix_map
def get_xml_string_with_self_contained__assertion_within_encrypted_assertion(self, assertion_tag):
prefix_map = self.get_prefix_map([self.encrypted_assertion._to_element_tree().find(assertion_tag)])
tree = self._to_element_tree()
self.set_prefixes(tree.find(self.encrypted_assertion._to_element_tree().tag).find(assertion_tag), prefix_map)
return ElementTree.tostring(tree, encoding="UTF-8")
def set_prefixes(self, elem, prefix_map):
# check if this is a tree wrapper
if not ElementTree.iselement(elem):
elem = elem.getroot()
# build uri map and add to root element
uri_map = {}
for prefix, uri in prefix_map.items():
uri_map[uri] = prefix
elem.set("xmlns:" + prefix, uri)
# fixup all elements in the tree
memo = {}
for elem in elem.getiterator():
self.fixup_element_prefixes(elem, uri_map, memo)
def fixup_element_prefixes(self, elem, uri_map, memo):
def fixup(name):
try:
return memo[name]
except KeyError:
if name[0] != "{":
return
uri, tag = name[1:].split("}")
if uri in uri_map:
new_name = uri_map[uri] + ":" + tag
memo[name] = new_name
return new_name
# fix element name
name = fixup(elem.tag)
if name:
elem.tag = name
# fix attribute names
for key, value in elem.items():
name = fixup(key)
if name:
elem.set(name, value)
del elem.attrib[key]
def to_string_force_namespace(self, nspair):
elem = self._to_element_tree()
self.set_prefixes(elem, nspair)
return ElementTree.tostring(elem, encoding="UTF-8")
def to_string(self, nspair=None):
"""Converts the Saml object to a string containing XML.

View File

@@ -26,7 +26,7 @@ from saml2.soap import make_soap_enveloped_saml_thingy
from urlparse import parse_qs
from saml2.s_utils import signature, UnravelError
from saml2.s_utils import signature, UnravelError, exception_trace
from saml2.s_utils import do_attributes
from saml2 import samlp, BINDING_SOAP, SAMLError
@@ -580,13 +580,12 @@ class Base(Entity):
logger.error("XML parse error: %s" % err)
raise
#logger.debug(">> %s", resp)
if resp is None:
return None
elif isinstance(resp, AuthnResponse):
self.users.add_information_about_person(resp.session_info())
logger.info("--- ADDED person info ----")
if resp.assertion is not None and len(resp.response.encrypted_assertion) == 0:
self.users.add_information_about_person(resp.session_info())
logger.info("--- ADDED person info ----")
pass
else:
logger.error("Response type not supported: %s" % (

View File

@@ -63,6 +63,7 @@ from saml2.sigver import CryptoBackendXmlSec1
from saml2.sigver import make_temp
from saml2.sigver import pre_encryption_part
from saml2.sigver import pre_signature_part
from saml2.sigver import pre_encrypt_assertion
from saml2.sigver import signed_instance_factory
from saml2.virtual_org import VirtualOrg
@@ -438,7 +439,7 @@ class Entity(HTTPBase):
request_cls
"""
if not message_id:
message_id = sid(self.seed)
message_id = sid()
for key, val in self.message_args(message_id).items():
if key not in kwargs:
@@ -503,7 +504,7 @@ class Entity(HTTPBase):
def _response(self, in_response_to, consumer_url=None, status=None,
issuer=None, sign=False, to_sign=None,
encrypt_assertion=False, encrypt_cert=None, **kwargs):
encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypt_cert=None, **kwargs):
""" Create a Response.
:param in_response_to: The session identifier of the request
@@ -537,7 +538,13 @@ class Entity(HTTPBase):
if sign:
response.signature = pre_signature_part(response.id,
self.sec.my_cert, 1)
sign_class = [(class_name(response), response.id)]
cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
if encrypt_assertion_self_contained:
assertion_tag = response.assertion._to_element_tree().tag
response = pre_encrypt_assertion(response)
response = response.get_xml_string_with_self_contained__assertion_within_encrypted_assertion(
assertion_tag)
_, cert_file = make_temp("%s" % encrypt_cert, decode=False)
response = cbxs.encrypt_assertion(response, cert_file,
pre_encryption_part())
@@ -547,7 +554,6 @@ class Entity(HTTPBase):
signed_instance_factory(response, self.sec, to_sign)
else:
# default is to sign the whole response if anything
sign_class = [(class_name(response), response.id)]
return signed_instance_factory(response, self.sec,
sign_class)
else:

View File

@@ -3,7 +3,7 @@ __author__ = 'rhoerbe' #2013-09-05
EGOVTOKEN = ["PVP-VERSION",
"PVP-PRINCIPALNAME",
"PVP-PRINCIPAL-NAME",
"PVP-GIVENNAME",
"PVP-BIRTHDATE",
"PVP-USERID",

View File

@@ -820,7 +820,7 @@ class AuthnResponse(StatusResponse):
raise Exception("No assertion part")
res = []
if self.response.encrypted_assertion:
if self.response.encrypted_assertion and key_file is not None and len(key_file) > 0:
logger.debug("***Encrypted assertion/-s***")
decr_text = self.sec.decrypt(self.xmlstr, key_file)
resp = samlp.response_from_string(decr_text)

View File

@@ -280,35 +280,8 @@ class Server(Entity):
# ------------------------------------------------------------------------
def _authn_response(self, in_response_to, consumer_url,
sp_entity_id, identity=None, name_id=None,
status=None, authn=None, issuer=None, policy=None,
sign_assertion=False, sign_response=False,
best_effort=False, encrypt_assertion=False,
encrypt_cert=None, authn_statement=None):
""" Create a response. A layer of indirection.
:param in_response_to: The session identifier of the request
:param consumer_url: The URL which should receive the response
:param sp_entity_id: The entity identifier of the SP
:param identity: A dictionary with attributes and values that are
expected to be the bases for the assertion in the response.
:param name_id: The identifier of the subject
:param status: The status of the response
:param authn: A dictionary containing information about the
authn context.
:param issuer: The issuer of the response
:param sign_assertion: Whether the assertion should be signed or not
:param sign_response: Whether the response should be signed or not
:param best_effort: Even if not the SPs demands can be met send a
response.
:return: A response instance
"""
to_sign = []
args = {}
#if identity:
_issuer = self._issuer(issuer)
def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url, name_id, policy, _issuer,
authn_statement, identity, best_effort, sign_response):
ast = Assertion(identity)
ast.acs = self.config.getattr("attribute_converters", "idp")
if policy is None:
@@ -342,6 +315,56 @@ class Server(Entity):
consumer_url, name_id,
self.config.attribute_converters,
policy, issuer=_issuer)
return assertion
def _authn_response(self, in_response_to, consumer_url,
sp_entity_id, identity=None, name_id=None,
status=None, authn=None, issuer=None, policy=None,
sign_assertion=False, sign_response=False,
best_effort=False, encrypt_assertion=False,
encrypt_cert=None, authn_statement=None,
encrypt_assertion_self_contained=False, show_nameid=False):
""" Create a response. A layer of indirection.
:param in_response_to: The session identifier of the request
:param consumer_url: The URL which should receive the response
:param sp_entity_id: The entity identifier of the SP
:param identity: A dictionary with attributes and values that are
expected to be the bases for the assertion in the response.
:param name_id: The identifier of the subject
:param status: The status of the response
:param authn: A dictionary containing information about the
authn context.
:param issuer: The issuer of the response
:param sign_assertion: Whether the assertion should be signed or not
:param sign_response: Whether the response should be signed or not
:param best_effort: Even if not the SPs demands can be met send a
response.
:return: A response instance
"""
to_sign = []
args = {}
#if identity:
_issuer = self._issuer(issuer)
if encrypt_assertion and show_nameid:
tmp_name_id = name_id
name_id = None
name_id = None
tmp_authn = authn
authn = None
tmp_authn_statement = authn_statement
authn_statement = None
assertion = self.setup_assertion(authn, sp_entity_id, in_response_to, consumer_url, name_id, policy,
_issuer, authn_statement, identity, best_effort, sign_response)
assertion_only_nameid = None
if encrypt_assertion and show_nameid:
assertion_only_nameid = self.setup_assertion(tmp_authn, sp_entity_id, in_response_to, consumer_url,
tmp_name_id, policy, _issuer, tmp_authn_statement, [], True,
sign_response)
if sign_assertion is not None and sign_assertion:
assertion.signature = pre_signature_part(assertion.id,
@@ -358,12 +381,16 @@ class Server(Entity):
args["assertion"] = assertion
if self.support_AssertionIDRequest() or self.support_AuthnQuery():
if assertion_only_nameid is not None:
args["assertion_only_nameid"] = assertion_only_nameid
if (self.support_AssertionIDRequest() or self.support_AuthnQuery()) and assertion_only_nameid is None:
self.session_db.store_assertion(assertion, to_sign)
return self._response(in_response_to, consumer_url, status, issuer,
sign_response, to_sign, encrypt_assertion=encrypt_assertion,
encrypt_cert=encrypt_cert, **args)
encrypt_cert=encrypt_cert,
encrypt_assertion_self_contained=encrypt_assertion_self_contained, **args)
# ------------------------------------------------------------------------
@@ -438,6 +465,8 @@ class Server(Entity):
name_id=None, authn=None, issuer=None,
sign_response=None, sign_assertion=None,
encrypt_cert=None, encrypt_assertion=None,
encrypt_assertion_self_contained=False,
show_nameid=False,
**kwargs):
""" Constructs an AuthenticationResponse
@@ -549,6 +578,8 @@ class Server(Entity):
sign_response=sign_response,
best_effort=best_effort,
encrypt_assertion=encrypt_assertion,
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
show_nameid=show_nameid,
encrypt_cert=encrypt_cert)
return self._authn_response(in_response_to, # in_response_to
destination, # consumer_url
@@ -562,6 +593,8 @@ class Server(Entity):
sign_response=sign_response,
best_effort=best_effort,
encrypt_assertion=encrypt_assertion,
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
show_nameid=show_nameid,
encrypt_cert=encrypt_cert)
except MissingValue, exc: