Merge pull request #192 from HaToHo/master

Encryption and signing of authentication
This commit is contained in:
Roland Hedberg
2015-03-22 19:25:54 +01:00
15 changed files with 824 additions and 98 deletions

14
.gitignore vendored
View File

@@ -178,4 +178,16 @@ example/sp-repoze/sp_conf_example.py
example/idp2/idp_conf_example.py
example/sp-wsgi/sp_conf.py
example/idp2/lidp.xml
example/idp2/old_idp.xml
example/sp-repoze/old_sp.xml
example/sp-repoze/sp_conf_2.Pygmalion
.gitignore.swp
example/sp-repoze/sp_conf_2.py
sp.xml

View File

@@ -17,7 +17,7 @@ saml_conf = sp_conf
remember_name = auth_tkt
sid_store = outstanding
idp_query_param = IdPEntityId
discovery = http://130.239.201.5/role/idp.ds
#discovery = http://130.239.201.5/role/idp.ds
[general]
request_classifier = s2repoze.plugins.challenge_decider:my_request_classifier

View File

@@ -337,7 +337,7 @@ class SAML2Plugin(object):
element_to_extension_element(spcertenc)])
if _cli.authn_requests_signed:
_sid = saml2.s_utils.sid(_cli.seed)
_sid = saml2.s_utils.sid()
req_id, msg_str = _cli.create_authn_request(
dest, vorg=vorg_name, sign=_cli.authn_requests_signed,
message_id=_sid, extensions=extensions)

View File

@@ -558,6 +558,106 @@ class SamlBase(ExtensionContainer):
except ValueError:
pass
def get_ns_map_attribute(self, attributes, uri_set):
for attribute in attributes:
if attribute[0] == "{":
uri, tag = attribute[1:].split("}")
uri_set.add(uri)
return uri_set
def tag_get_uri(self, elem):
if elem.tag[0] == "{":
uri, tag = elem.tag[1:].split("}")
return uri
return None
def get_ns_map(self, elements, uri_set):
for elem in elements:
uri_set = self.get_ns_map_attribute(elem.attrib, uri_set)
uri_set = self.get_ns_map(elem._children, uri_set)
uri = self.tag_get_uri(elem)
if uri is not None:
uri_set.add(uri)
return uri_set
def get_prefix_map(self, elements):
uri_set = self.get_ns_map(elements, set())
prefix_map = {}
for uri in sorted(uri_set):
prefix_map["encas%d" % len(prefix_map)] = uri
return prefix_map
def get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(self, assertion_tag, advice_tag):
for tmp_encrypted_assertion in self.assertion.advice.encrypted_assertion:
prefix_map = self.get_prefix_map([tmp_encrypted_assertion._to_element_tree().
find(assertion_tag)])
tree = self._to_element_tree()
self.set_prefixes(tree.find(assertion_tag).find(advice_tag).find(tmp_encrypted_assertion._to_element_tree()
.tag).find(assertion_tag), prefix_map)
return ElementTree.tostring(tree, encoding="UTF-8")
def get_xml_string_with_self_contained_assertion_within_encrypted_assertion(self, assertion_tag):
prefix_map = self.get_prefix_map([self.encrypted_assertion._to_element_tree().find(assertion_tag)])
tree = self._to_element_tree()
self.set_prefixes(tree.find(self.encrypted_assertion._to_element_tree().tag).find(assertion_tag), prefix_map)
return ElementTree.tostring(tree, encoding="UTF-8")
def set_prefixes(self, elem, prefix_map):
# check if this is a tree wrapper
if not ElementTree.iselement(elem):
elem = elem.getroot()
# build uri map and add to root element
uri_map = {}
for prefix, uri in prefix_map.items():
uri_map[uri] = prefix
elem.set("xmlns:" + prefix, uri)
# fixup all elements in the tree
memo = {}
for elem in elem.getiterator():
self.fixup_element_prefixes(elem, uri_map, memo)
def fixup_element_prefixes(self, elem, uri_map, memo):
def fixup(name):
try:
return memo[name]
except KeyError:
if name[0] != "{":
return
uri, tag = name[1:].split("}")
if uri in uri_map:
new_name = uri_map[uri] + ":" + tag
memo[name] = new_name
return new_name
# fix element name
name = fixup(elem.tag)
if name:
elem.tag = name
# fix attribute names
for key, value in elem.items():
name = fixup(key)
if name:
elem.set(name, value)
del elem.attrib[key]
def to_string_force_namespace(self, nspair):
elem = self._to_element_tree()
self.set_prefixes(elem, nspair)
return ElementTree.tostring(elem, encoding="UTF-8")
def to_string(self, nspair=None):
"""Converts the Saml object to a string containing XML.

View File

@@ -666,7 +666,7 @@ class Assertion(dict):
name_id, attrconvs, policy, issuer, authn_class=None,
authn_auth=None, authn_decl=None, encrypt=None,
sec_context=None, authn_decl_ref=None, authn_instant="",
subject_locality="", authn_statem=None):
subject_locality="", authn_statem=None, add_subject=True):
""" Construct the Assertion
:param sp_entity_id: The entityid of the SP
@@ -722,6 +722,13 @@ class Assertion(dict):
else:
_authn_statement = None
if not add_subject:
_ass = assertion_factory(
issuer=issuer,
conditions=conds,
subject=None
)
else:
_ass = assertion_factory(
issuer=issuer,
conditions=conds,

View File

@@ -26,7 +26,7 @@ from saml2.soap import make_soap_enveloped_saml_thingy
from urlparse import parse_qs
from saml2.s_utils import signature, UnravelError
from saml2.s_utils import signature, UnravelError, exception_trace
from saml2.s_utils import do_attributes
from saml2 import samlp, BINDING_SOAP, SAMLError
@@ -537,7 +537,7 @@ class Base(Entity):
# ======== response handling ===========
def parse_authn_request_response(self, xmlstr, binding, outstanding=None,
outstanding_certs=None):
outstanding_certs=None, decrypt=True):
""" Deal with an AuthnResponse
:param xmlstr: The reply as a xml string
@@ -545,6 +545,8 @@ class Base(Entity):
:param outstanding: A dictionary with session IDs as keys and
the original web request from the user before redirection
as values.
:param only_identity_in_encrypted_assertion: Must exist an assertion that is not encrypted that contains all
other information like subject and authentication statement.
:return: An response.AuthnResponse or None
"""
@@ -566,6 +568,7 @@ class Base(Entity):
"attribute_converters": self.config.attribute_converters,
"allow_unknown_attributes":
self.config.allow_unknown_attributes,
"decrypt": decrypt
}
try:
resp = self._parse_response(xmlstr, AuthnResponse,
@@ -580,11 +583,10 @@ class Base(Entity):
logger.error("XML parse error: %s" % err)
raise
#logger.debug(">> %s", resp)
if resp is None:
return None
elif isinstance(resp, AuthnResponse):
if resp.assertion is not None and len(resp.response.encrypted_assertion) == 0:
self.users.add_information_about_person(resp.session_info())
logger.info("--- ADDED person info ----")
pass

View File

@@ -23,7 +23,7 @@ from saml2 import soap
from saml2 import element_to_extension_element
from saml2 import extension_elements_to_elements
from saml2.saml import NameID
from saml2.saml import NameID, EncryptedAssertion
from saml2.saml import Issuer
from saml2.saml import NAMEID_FORMAT_ENTITY
from saml2.response import LogoutResponse
@@ -63,6 +63,7 @@ from saml2.sigver import CryptoBackendXmlSec1
from saml2.sigver import make_temp
from saml2.sigver import pre_encryption_part
from saml2.sigver import pre_signature_part
from saml2.sigver import pre_encrypt_assertion
from saml2.sigver import signed_instance_factory
from saml2.virtual_org import VirtualOrg
@@ -502,8 +503,16 @@ class Entity(HTTPBase):
def _response(self, in_response_to, consumer_url=None, status=None,
issuer=None, sign=False, to_sign=None,
encrypt_assertion=False, encrypt_cert=None, **kwargs):
encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False,
encrypt_cert=None, **kwargs):
""" Create a Response.
Encryption:
encrypt_assertion must be true for encryption to be performed. If encrypted_advice_attributes also is
true, then will the function try to encrypt the assertion in the the advice element of the main
assertion. Only one assertion element is allowed in the advice element, if multiple assertions exists
in the advice element the main assertion will be encrypted instead, since it's no point to encrypt
If encrypted_advice_attributes is
false the main assertion will be encrypted. Since the same key
:param in_response_to: The session identifier of the request
:param consumer_url: The URL which should receive the response
@@ -533,22 +542,44 @@ class Entity(HTTPBase):
return signed_instance_factory(response, self.sec, to_sign)
if encrypt_assertion:
node_xpath = None
if sign:
response.signature = pre_signature_part(response.id,
self.sec.my_cert, 1)
sign_class = [(class_name(response), response.id)]
cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
if encrypted_advice_attributes and response.assertion.advice is not None \
and len(response.assertion.advice.assertion) == 1:
tmp_assertion = response.assertion.advice.assertion[0]
response.assertion.advice.encrypted_assertion = []
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
if isinstance(tmp_assertion, list):
response.assertion.advice.encrypted_assertion[0].add_extension_elements(tmp_assertion)
else:
response.assertion.advice.encrypted_assertion[0].add_extension_element(tmp_assertion)
response.assertion.advice.assertion = []
if encrypt_assertion_self_contained:
advice_tag = response.assertion.advice._to_element_tree().tag
assertion_tag = tmp_assertion._to_element_tree().tag
response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
assertion_tag, advice_tag)
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
elif encrypt_assertion_self_contained:
assertion_tag = response.assertion._to_element_tree().tag
response = pre_encrypt_assertion(response)
response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
assertion_tag)
else:
response = pre_encrypt_assertion(response)
if to_sign:
response = signed_instance_factory(response, self.sec, to_sign)
_, cert_file = make_temp("%s" % encrypt_cert, decode=False)
response = cbxs.encrypt_assertion(response, cert_file,
pre_encryption_part())
pre_encryption_part(), node_xpath=node_xpath)
# template(response.assertion.id))
if sign:
if to_sign:
signed_instance_factory(response, self.sec, to_sign)
else:
# default is to sign the whole response if anything
sign_class = [(class_name(response), response.id)]
return signed_instance_factory(response, self.sec,
sign_class)
return signed_instance_factory(response, self.sec, sign_class)
else:
return response
@@ -946,7 +977,13 @@ class Entity(HTTPBase):
decode=False)
else:
key_file = ""
response = response.verify(key_file)
only_identity_in_encrypted_assertion = False
if "only_identity_in_encrypted_assertion" in kwargs:
only_identity_in_encrypted_assertion = kwargs["only_identity_in_encrypted_assertion"]
decrypt = True
if "decrypt" in kwargs:
decrypt = kwargs["decrypt"]
response = response.verify(key_file, decrypt=decrypt)
if not response:
return None

View File

@@ -3,7 +3,7 @@ __author__ = 'rhoerbe' #2013-09-05
EGOVTOKEN = ["PVP-VERSION",
"PVP-PRINCIPALNAME",
"PVP-PRINCIPAL-NAME",
"PVP-GIVENNAME",
"PVP-BIRTHDATE",
"PVP-USERID",

View File

@@ -395,7 +395,7 @@ class StatusResponse(object):
def loads(self, xmldata, decode=True, origxml=None):
return self._loads(xmldata, decode, origxml)
def verify(self, key_file=""):
def verify(self, key_file="", decrypt=True):
try:
return self._verify()
except AssertionError:
@@ -622,24 +622,32 @@ class AuthnResponse(StatusResponse):
attrlist = enc_attr.extensions_as_elements("Attribute", saml)
attribute_statement.attribute.extend(attrlist)
def read_attribute_statement(self, attr_statem):
logger.debug("Attribute Statement: %s" % (attr_statem,))
for aconv in self.attribute_converters:
logger.debug("Converts name format: %s" % (aconv.name_format,))
self.decrypt_attributes(attr_statem)
return to_local(self.attribute_converters, attr_statem,
self.allow_unknown_attributes)
def get_identity(self):
""" The assertion can contain zero or one attributeStatements
"""
if not self.assertion.attribute_statement:
logger.error("Missing Attribute Statement")
ava = {}
else:
if self.assertion.advice:
if self.assertion.advice.assertion:
for tmp_assertion in self.assertion.advice.assertion:
if tmp_assertion.attribute_statement:
assert len(tmp_assertion.attribute_statement) == 1
ava.update(self.read_attribute_statement(tmp_assertion.attribute_statement[0]))
if self.assertion.attribute_statement:
assert len(self.assertion.attribute_statement) == 1
_attr_statem = self.assertion.attribute_statement[0]
logger.debug("Attribute Statement: %s" % (_attr_statem,))
for aconv in self.attribute_converters:
logger.debug("Converts name format: %s" % (aconv.name_format,))
self.decrypt_attributes(_attr_statem)
ava = to_local(self.attribute_converters, _attr_statem,
self.allow_unknown_attributes)
ava.update(self.read_attribute_statement(_attr_statem))
if not ava:
logger.error("Missing Attribute Statement")
return ava
def _bearer_confirmed(self, data):
@@ -749,11 +757,9 @@ class AuthnResponse(StatusResponse):
raise SignatureError("Signature missing for assertion")
else:
logger.debug("signed")
if not verified:
try:
self.sec.check_signature(assertion, class_name(assertion),
self.xmlstr)
self.sec.check_signature(assertion, class_name(assertion),self.xmlstr)
except Exception as exc:
logger.error("correctly_signed_response: %s" % exc)
raise
@@ -808,7 +814,7 @@ class AuthnResponse(StatusResponse):
res.append(assertion)
return res
def parse_assertion(self, key_file=""):
def parse_assertion(self, key_file="", decrypt=True):
if self.context == "AuthnQuery":
# can contain one or more assertions
pass
@@ -820,17 +826,35 @@ class AuthnResponse(StatusResponse):
raise Exception("No assertion part")
res = []
if self.response.encrypted_assertion:
has_encrypted_assertions = self.response.encrypted_assertion
if not has_encrypted_assertions and self.response.assertion:
for tmp_assertion in self.response.assertion:
if tmp_assertion.advice:
if tmp_assertion.advice.encrypted_assertion:
has_encrypted_assertions = True
break
if has_encrypted_assertions and decrypt:
logger.debug("***Encrypted assertion/-s***")
decr_text = self.sec.decrypt(self.xmlstr, key_file)
resp = samlp.response_from_string(decr_text)
res = self.decrypt_assertions(resp.encrypted_assertion, decr_text)
if resp.assertion:
for tmp_ass in resp.assertion:
if tmp_ass.advice and tmp_ass.advice.encrypted_assertion:
advice_res = self.decrypt_assertions(tmp_ass.advice.encrypted_assertion, decr_text)
if tmp_ass.advice.assertion:
tmp_ass.advice.assertion.extend(advice_res)
else:
tmp_ass.advice.assertion = advice_res
tmp_ass.advice.encrypted_assertion = []
self.response.assertion = resp.assertion
if self.response.assertion:
self.response.assertion.extend(res)
else:
self.response.assertion = res
self.response.encrypted_assertion = []
self.xmlstr = decr_text
self.response.encrypted_assertion = []
if self.response.assertion:
logger.debug("***Unencrypted assertion***")
@@ -840,10 +864,9 @@ class AuthnResponse(StatusResponse):
else:
self.assertions.append(assertion)
self.assertion = self.assertions[0]
return True
def verify(self, key_file=""):
def verify(self, key_file="", decrypt=True):
""" Verify that the assertion is syntactically correct and
the signature is correct if present.
:param key_file: If not the default key file should be used this is it.
@@ -861,7 +884,7 @@ class AuthnResponse(StatusResponse):
if not isinstance(self.response, samlp.Response):
return self
if self.parse_assertion(key_file):
if self.parse_assertion(key_file, decrypt=decrypt):
return self
else:
logger.error("Could not parse the assertion")
@@ -1089,7 +1112,7 @@ class AssertionIDResponse(object):
return self._postamble()
def verify(self, key_file=""):
def verify(self, key_file="", decrypt=True):
try:
valid_instance(self.response)
except NotValid as exc:

View File

@@ -280,12 +280,50 @@ class Server(Entity):
# ------------------------------------------------------------------------
def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url, name_id, policy, _issuer,
authn_statement, identity, best_effort, sign_response, add_subject=True):
ast = Assertion(identity)
ast.acs = self.config.getattr("attribute_converters", "idp")
if policy is None:
policy = Policy()
try:
ast.apply_policy(sp_entity_id, policy, self.metadata)
except MissingValue, exc:
if not best_effort:
return self.create_error_response(in_response_to, consumer_url,
exc, sign_response)
if authn: # expected to be a dictionary
# Would like to use dict comprehension but ...
authn_args = dict([
(AUTHN_DICT_MAP[k], v) for k, v in authn.items()
if k in AUTHN_DICT_MAP])
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.config.attribute_converters,
policy, issuer=_issuer, add_subject=add_subject,
**authn_args)
elif authn_statement: # Got a complete AuthnStatement
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.config.attribute_converters,
policy, issuer=_issuer,
authn_statem=authn_statement, add_subject=add_subject)
else:
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.config.attribute_converters,
policy, issuer=_issuer, add_subject=add_subject)
return assertion
def _authn_response(self, in_response_to, consumer_url,
sp_entity_id, identity=None, name_id=None,
status=None, authn=None, issuer=None, policy=None,
sign_assertion=False, sign_response=False,
best_effort=False, encrypt_assertion=False,
encrypt_cert=None, authn_statement=None):
encrypt_cert=None, authn_statement=None,
encrypt_assertion_self_contained=False, encrypted_advice_attributes=False):
""" Create a response. A layer of indirection.
:param in_response_to: The session identifier of the request
@@ -309,45 +347,43 @@ class Server(Entity):
args = {}
#if identity:
_issuer = self._issuer(issuer)
ast = Assertion(identity)
ast.acs = self.config.getattr("attribute_converters", "idp")
if policy is None:
policy = Policy()
try:
ast.apply_policy(sp_entity_id, policy, self.metadata)
except MissingValue, exc:
if not best_effort:
return self.create_error_response(in_response_to, consumer_url,
exc, sign_response)
if authn: # expected to be a dictionary
# Would like to use dict comprehension but ...
authn_args = dict([
(AUTHN_DICT_MAP[k], v) for k, v in authn.items()
if k in AUTHN_DICT_MAP])
#if encrypt_assertion and show_nameid:
# tmp_name_id = name_id
# name_id = None
# name_id = None
# tmp_authn = authn
# authn = None
# tmp_authn_statement = authn_statement
# authn_statement = None
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.config.attribute_converters,
policy, issuer=_issuer,
**authn_args)
elif authn_statement: # Got a complete AuthnStatement
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.config.attribute_converters,
policy, issuer=_issuer,
authn_statem=authn_statement)
if encrypt_assertion and encrypted_advice_attributes:
assertion_attributes = self.setup_assertion(None, sp_entity_id, None, None, None, policy,
None, None, identity, best_effort, sign_response, False)
assertion = self.setup_assertion(authn, sp_entity_id, in_response_to, consumer_url,
name_id, policy, _issuer, authn_statement, [], True,
sign_response)
assertion.advice = saml.Advice()
#assertion.advice.assertion_id_ref.append(saml.AssertionIDRef())
#assertion.advice.assertion_uri_ref.append(saml.AssertionURIRef())
assertion.advice.assertion.append(assertion_attributes)
else:
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.config.attribute_converters,
policy, issuer=_issuer)
assertion = self.setup_assertion(authn, sp_entity_id, in_response_to, consumer_url,
name_id, policy, _issuer, authn_statement, identity, True,
sign_response)
to_sign = []
if sign_assertion is not None and sign_assertion:
if assertion.advice and assertion.advice.assertion:
for tmp_assertion in assertion.advice.assertion:
tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1)
to_sign.append((class_name(tmp_assertion), tmp_assertion.id))
assertion.signature = pre_signature_part(assertion.id,
self.sec.my_cert, 1)
# Just the assertion or the response and the assertion ?
to_sign = [(class_name(assertion), assertion.id)]
to_sign.append((class_name(assertion), assertion.id))
# Store which assertion that has been sent to which SP about which
# subject.
@@ -358,12 +394,14 @@ class Server(Entity):
args["assertion"] = assertion
if self.support_AssertionIDRequest() or self.support_AuthnQuery():
if (self.support_AssertionIDRequest() or self.support_AuthnQuery()):
self.session_db.store_assertion(assertion, to_sign)
return self._response(in_response_to, consumer_url, status, issuer,
sign_response, to_sign, encrypt_assertion=encrypt_assertion,
encrypt_cert=encrypt_cert, **args)
encrypt_cert=encrypt_cert,
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
encrypted_advice_attributes=encrypted_advice_attributes, **args)
# ------------------------------------------------------------------------
@@ -438,6 +476,8 @@ class Server(Entity):
name_id=None, authn=None, issuer=None,
sign_response=None, sign_assertion=None,
encrypt_cert=None, encrypt_assertion=None,
encrypt_assertion_self_contained=False,
encrypted_advice_attributes=False,
**kwargs):
""" Constructs an AuthenticationResponse
@@ -549,6 +589,8 @@ class Server(Entity):
sign_response=sign_response,
best_effort=best_effort,
encrypt_assertion=encrypt_assertion,
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
encrypted_advice_attributes=encrypted_advice_attributes,
encrypt_cert=encrypt_cert)
return self._authn_response(in_response_to, # in_response_to
destination, # consumer_url
@@ -562,6 +604,8 @@ class Server(Entity):
sign_response=sign_response,
best_effort=best_effort,
encrypt_assertion=encrypt_assertion,
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
encrypted_advice_attributes=encrypted_advice_attributes,
encrypt_cert=encrypt_cert)
except MissingValue, exc:

View File

@@ -0,0 +1,15 @@
-----BEGIN CERTIFICATE-----
MIICSTCCAbICAQEwDQYJKoZIhvcNAQELBQAwbTELMAkGA1UEBhMCc2UxCzAJBgNV
BAgTAmFjMQ0wCwYDVQQHEwR1bWVhMRwwGgYDVQQKExNJVFMgVW1lYSBVbml2ZXJz
aXR5MQ0wCwYDVQQLEwRESVJHMRUwEwYDVQQDEwxsb2NhbGhvc3QuY2EwHhcNMTQw
MzE0MDczNDIwWhcNMjQwMzExMDczNDIwWjBtMQswCQYDVQQGEwJzZTELMAkGA1UE
CBMCYWMxDTALBgNVBAcTBHVtZWExHDAaBgNVBAoTE0lUUyBVbWVhIFVuaXZlcnNp
dHkxDTALBgNVBAsTBERJUkcxFTATBgNVBAMTDGxvY2FsaG9zdC5jYTCBnzANBgkq
hkiG9w0BAQEFAAOBjQAwgYkCgYEAzJseOYg+hKGsnGWilv9FrfH2csQ9UZGwgwHz
zR2IquQg/+nONxd4MIwjOnQrLOJuhpu55ZTSpeT901GNDLj4xPQnFrWWyET8NxZg
w7Ilra55iQNaoUWpdi0JQSXI/9CY8t+Y170+7DfBJ6zo4y6+HKaOLNxZy4IbB0SE
aZBGsrUCAwEAATANBgkqhkiG9w0BAQsFAAOBgQAEOQJkD+5fb2mTtxwaZeRyQN9c
If0Xd2E7Z6BstGCUMWa/q4FrNee324kINVsFYg0GBTBwfyYPwR7I70LGjS3gXEzd
RjGx/Z8yvHJyavJj8iRCOflQ0fUlwasFYtrJesPOfM+aJ05Jb8GelUxpKh6BtPlE
IU0FLm//i9ucLQ9zBg==
-----END CERTIFICATE-----

View File

@@ -0,0 +1,15 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQDMmx45iD6EoaycZaKW/0Wt8fZyxD1RkbCDAfPNHYiq5CD/6c43
F3gwjCM6dCss4m6Gm7nllNKl5P3TUY0MuPjE9CcWtZbIRPw3FmDDsiWtrnmJA1qh
Ral2LQlBJcj/0Jjy35jXvT7sN8EnrOjjLr4cpo4s3FnLghsHRIRpkEaytQIDAQAB
AoGAMw6aUjz/bNVzX2u1UPzOhIOWvjjeHFbAt1BraEnwasSWv4W2oeTHZ0XxHIsU
oxS2A/0kPHgQwLkN5ge5rO0TlpAI5X9ZqlJ0SXF5zjJOBtyK6TWoUbwnyzS7lbFC
q9AVrHwMX9uNCboccqzjrzHyNE+4/QT7z2G5AMzjfq+5EwECQQDvOJuUl2pbUUIK
nMCmwkARFEZzZYV2oIBDsagTG8gX7glj5stoYXuez8EnYtNHRDBConyKqruuzqJk
qSKlha7hAkEA2vT4CpAzHSCknwQKXmFwBD5hVBrv+JZSur6XpqEdwXkX2osScAaW
xj3vQEQorJC2CryvUVOTeuFoog0f+6HiVQJBAMs0dMQ2ErxbPBQzr1p4K1/Wrzmb
BVINaKcYJEOHF+Nr6kIYbLTQCeiPZe4E/p/NBomz6MMJ4L/O+xcyrSGZe0ECQQCZ
ejELpnxNpH4AAKML+Ry9vMQYYjFnfGdNAx/l6vWikjEIPYeVAulY2D0GPUCNhXo1
GIGDbiPodGwVe0G57oVpAkBjckA1LEE1Kzkq5sR9U9t9m+3WSBvMZxLUwJYdVmOY
Y6xKVtJfe9XuQt9tzoWQW8iieWyKOw7yLYCBcjns2iZn
-----END RSA PRIVATE KEY-----

View File

@@ -55,6 +55,10 @@ NAME_ID_POLICY_2 = """<?xml version="1.0" encoding="utf-8"?>
class TestIdentifier():
def setup_class(self):
try:
os.remove("subject.db.db")
except:
pass
self.id = IdentDB("subject.db", "example.com", "example")
def test_persistent_1(self):

View File

@@ -1,9 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import base64
import os
from contextlib import closing
from urlparse import parse_qs
from saml2.sigver import pre_encryption_part
import uuid
from saml2.cert import OpenSSLWrapper
from saml2.sigver import pre_encryption_part, make_temp
from saml2.assertion import Policy
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT
@@ -41,6 +45,40 @@ def _eq(l1, l2):
return set(l1) == set(l2)
BASEDIR = os.path.abspath(os.path.dirname(__file__))
def get_ava(assertion):
ava = {}
for statement in assertion.attribute_statement:
for attr in statement.attribute:
value = []
for tmp_val in attr.attribute_value:
value.append(tmp_val.text)
key = attr.friendly_name
if key is None or len(key) == 0:
key = attr.text
ava[key] = value
return ava
def generate_cert():
sn = uuid.uuid4().urn
cert_info = {
"cn": "localhost",
"country_code": "se",
"state": "ac",
"city": "Umea",
"organization": "ITS",
"organization_unit": "DIRG"
}
osw = OpenSSLWrapper()
ca_cert_str = osw.read_str_from_file("/Users/haho0032/Develop/root_cert/localhost.ca.crt")
ca_key_str = osw.read_str_from_file("/Users/haho0032/Develop/root_cert/localhost.ca.key")
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True, sn=sn, key_length=2048)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
return cert_str, req_key_str
class TestServer1():
def setup_class(self):
self.server = Server("idp_conf")
@@ -372,6 +410,354 @@ class TestServer1():
# value. Just that there should be one
assert assertion.signature.signature_value.text != ""
def test_encrypted_signed_response_1(self):
name_id = self.server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12")
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
cert_str, cert_key_str = generate_cert()
signed_resp = self.server.create_authn_response(
ava,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
sign_response=True,
sign_assertion=True,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
encrypted_advice_attributes=True,
encrypt_cert=cert_str,
)
sresponse = response_from_string(signed_resp)
valid = self.server.sec.verify_signature(signed_resp,
self.server.config.cert_file,
node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
node_id=sresponse.id,
id_attr="")
assert valid
_, key_file = make_temp("%s" % cert_key_str, decode=False)
decr_text = self.server.sec.decrypt(signed_resp, key_file)
resp = samlp.response_from_string(decr_text)
valid = self.server.sec.verify_signature(decr_text,
self.server.config.cert_file,
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
node_id=resp.assertion[0].id,
id_attr="")
assert valid
assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
[saml, samlp])
assert assertion
assert assertion[0].attribute_statement
ava = ava = get_ava(assertion[0])
assert ava ==\
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
assert 'EncryptedAssertion><encas2:Assertion xmlns:encas0="http://www.w3.org/2000/09/xmldsig#" ' \
'xmlns:encas1="http://www.w3.org/2001/XMLSchema-instance" ' \
'xmlns:encas2="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
valid = self.server.sec.verify_signature(decr_text,
self.server.config.cert_file,
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
node_id=assertion[0].id,
id_attr="")
assert valid
def test_encrypted_signed_response_2(self):
name_id = self.server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12")
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
cert_str, cert_key_str = generate_cert()
signed_resp = self.server.create_authn_response(
ava,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
sign_response=True,
sign_assertion=True,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
encrypt_cert=cert_str,
)
sresponse = response_from_string(signed_resp)
valid = self.server.sec.verify_signature(signed_resp,
self.server.config.cert_file,
node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
node_id=sresponse.id,
id_attr="")
assert valid
_, key_file = make_temp("%s" % cert_key_str, decode=False)
decr_text = self.server.sec.decrypt(signed_resp, key_file)
resp = samlp.response_from_string(decr_text)
assert resp.encrypted_assertion[0].extension_elements
assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
assert assertion
assert assertion[0].attribute_statement
ava = get_ava(assertion[0])
assert ava ==\
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
assert 'EncryptedAssertion><encas2:Assertion xmlns:encas0="http://www.w3.org/2000/09/xmldsig#" ' \
'xmlns:encas1="http://www.w3.org/2001/XMLSchema-instance" ' \
'xmlns:encas2="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
valid = self.server.sec.verify_signature(decr_text,
self.server.config.cert_file,
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
node_id=assertion[0].id,
id_attr="")
assert valid
def test_encrypted_signed_response_3(self):
name_id = self.server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12")
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
cert_str, cert_key_str = generate_cert()
signed_resp = self.server.create_authn_response(
ava,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
sign_response=True,
sign_assertion=True,
encrypt_assertion=True,
encrypt_cert=cert_str,
)
sresponse = response_from_string(signed_resp)
valid = self.server.sec.verify_signature(signed_resp,
self.server.config.cert_file,
node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
node_id=sresponse.id,
id_attr="")
assert valid
_, key_file = make_temp("%s" % cert_key_str, decode=False)
decr_text = self.server.sec.decrypt(signed_resp, key_file)
resp = samlp.response_from_string(decr_text)
assert resp.encrypted_assertion[0].extension_elements
assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
assert assertion
assert assertion[0].attribute_statement
ava = get_ava(assertion[0])
assert ava ==\
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
assert 'xmlns:encas' not in decr_text
valid = self.server.sec.verify_signature(decr_text,
self.server.config.cert_file,
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
node_id=assertion[0].id,
id_attr="")
assert valid
def test_encrypted_signed_response_4(self):
name_id = self.server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12")
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
cert_str, cert_key_str = generate_cert()
signed_resp = self.server.create_authn_response(
ava,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
sign_response=True,
sign_assertion=True,
encrypt_assertion=True,
encrypted_advice_attributes=True,
encrypt_cert=cert_str,
)
sresponse = response_from_string(signed_resp)
valid = self.server.sec.verify_signature(signed_resp,
self.server.config.cert_file,
node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
node_id=sresponse.id,
id_attr="")
assert valid
_, key_file = make_temp("%s" % cert_key_str, decode=False)
decr_text = self.server.sec.decrypt(signed_resp, key_file)
resp = samlp.response_from_string(decr_text)
valid = self.server.sec.verify_signature(decr_text,
self.server.config.cert_file,
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
node_id=resp.assertion[0].id,
id_attr="")
assert valid
assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
[saml, samlp])
assert assertion
assert assertion[0].attribute_statement
ava = ava = get_ava(assertion[0])
assert ava ==\
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
#Should work, but I suspect that xmlsec manipulates the xml to much while encrypting that the signature
#is no longer working. :(
assert 'xmlns:encas' not in decr_text
valid = self.server.sec.verify_signature(decr_text,
self.server.config.cert_file,
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
node_id=assertion[0].id,
id_attr="")
assert valid
def test_encrypted_response_1(self):
name_id = self.server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12")
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
cert_str, cert_key_str = generate_cert()
signed_resp = self.server.create_authn_response(
ava,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
sign_response=False,
sign_assertion=False,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
encrypted_advice_attributes=True,
encrypt_cert=cert_str,
)
sresponse = response_from_string(signed_resp)
assert sresponse.signature is None
_, key_file = make_temp("%s" % cert_key_str, decode=False)
decr_text = self.server.sec.decrypt(signed_resp, key_file)
resp = samlp.response_from_string(decr_text)
assert resp.assertion[0].signature is None
assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
[saml, samlp])
assert assertion
assert assertion[0].attribute_statement
ava = ava = get_ava(assertion[0])
assert ava ==\
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
assert 'EncryptedAssertion><encas1:Assertion xmlns:encas0="http://www.w3.org/2001/XMLSchema-instance" ' \
'xmlns:encas1="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
assert assertion[0].signature is None
def test_encrypted_response_2(self):
name_id = self.server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12")
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
cert_str, cert_key_str = generate_cert()
signed_resp = self.server.create_authn_response(
ava,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
sign_response=False,
sign_assertion=False,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
encrypted_advice_attributes=False,
encrypt_cert=cert_str,
)
sresponse = response_from_string(signed_resp)
assert sresponse.signature is None
_, key_file = make_temp("%s" % cert_key_str, decode=False)
decr_text = self.server.sec.decrypt(signed_resp, key_file)
resp = samlp.response_from_string(decr_text)
assert resp.encrypted_assertion[0].extension_elements
assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
assert assertion
assert assertion[0].attribute_statement
ava = ava = get_ava(assertion[0])
assert ava ==\
{'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']}
assert 'EncryptedAssertion><encas1:Assertion xmlns:encas0="http://www.w3.org/2001/XMLSchema-instance" ' \
'xmlns:encas1="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
assert assertion[0].signature is None
def test_slo_http_post(self):
soon = time_util.in_a_while(days=1)
sinfo = {
@@ -509,3 +895,4 @@ class TestServerLogout():
if __name__ == "__main__":
ts = TestServer1()
ts.setup_class()
ts.test_encrypted_response_1()

View File

@@ -20,7 +20,7 @@ from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.client import Saml2Client
from saml2.config import SPConfig
from saml2.response import LogoutResponse
from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion
from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion, Advice
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.saml import NameID
from saml2.server import Server
@@ -499,6 +499,86 @@ class TestClient:
assert resp.assertion
assert resp.ava == {'givenName': ['Derek'], 'sn': ['Jeter']}
def test_sign_then_encrypt_assertion_advice(self):
# Begin with the IdPs side
_sec = self.server.sec
nameid_policy = samlp.NameIDPolicy(allow_create="false",
format=saml.NAMEID_FORMAT_PERSISTENT)
asser = Assertion({"givenName": "Derek", "surName": "Jeter"})
assertion = asser.construct(
self.client.config.entityid, "_012345",
"http://lingon.catalogix.se:8087/",
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
policy=self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
authn_auth="http://www.example.com/login")
a_asser = Assertion({"uid": "test01", "email": "test.testsson@test.se"})
a_assertion = a_asser.construct(
self.client.config.entityid, "_012345",
"http://lingon.catalogix.se:8087/",
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
policy=self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
authn_auth="http://www.example.com/login")
a_assertion.signature = sigver.pre_signature_part(
a_assertion.id, _sec.my_cert, 1)
assertion.advice = Advice()
assertion.advice.encrypted_assertion = []
assertion.advice.encrypted_assertion.append(EncryptedAssertion())
assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion)
response = sigver.response_factory(
in_response_to="_012345",
destination="http://lingon.catalogix.se:8087/",
status=s_utils.success_status_factory(),
issuer=self.server._issuer()
)
response.assertion.append(assertion)
response = _sec.sign_statement("%s" % response, class_name(a_assertion),
key_file=self.client.sec.key_file,
node_id=a_assertion.id)
#xmldoc = "%s" % response
# strangely enough I get different tags if I run this test separately
# or as part of a bunch of tests.
#xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass)
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
enctext = _sec.crypto.encrypt_assertion(response, _sec.cert_file,
pre_encryption_part(), node_xpath=node_xpath)
#seresp = samlp.response_from_string(enctext)
resp_str = base64.encodestring(enctext)
# Now over to the client side
resp = self.client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
{"_012345": "http://foo.example.com/service"})
#assert resp.encrypted_assertion == []
assert resp.assertion
assert resp.assertion.advice
assert resp.assertion.advice.assertion
assert resp.ava == \
{'sn': ['Jeter'], 'givenName': ['Derek'], 'uid': ['test01'], 'email': ['test.testsson@test.se']}
def test_signed_redirect(self):
msg_str = "%s" % self.client.create_authn_request(
@@ -628,4 +708,4 @@ class TestClientWithDummy():
if __name__ == "__main__":
tc = TestClient()
tc.setup_class()
tc.test_sign_then_encrypt_assertion2()
tc.test_sign_then_encrypt_assertion_advice()