diff --git a/.gitignore b/.gitignore index a8ddb96..f6d1058 100644 --- a/.gitignore +++ b/.gitignore @@ -178,4 +178,16 @@ example/sp-repoze/sp_conf_example.py example/idp2/idp_conf_example.py -example/sp-wsgi/sp_conf.py +example/idp2/lidp.xml + +example/idp2/old_idp.xml + +example/sp-repoze/old_sp.xml + +example/sp-repoze/sp_conf_2.Pygmalion + +.gitignore.swp + +example/sp-repoze/sp_conf_2.py + +sp.xml diff --git a/example/sp-repoze/who.ini b/example/sp-repoze/who.ini index 1ed329f..b116b99 100644 --- a/example/sp-repoze/who.ini +++ b/example/sp-repoze/who.ini @@ -17,7 +17,7 @@ saml_conf = sp_conf remember_name = auth_tkt sid_store = outstanding idp_query_param = IdPEntityId -discovery = http://130.239.201.5/role/idp.ds +#discovery = http://130.239.201.5/role/idp.ds [general] request_classifier = s2repoze.plugins.challenge_decider:my_request_classifier diff --git a/src/s2repoze/plugins/sp.py b/src/s2repoze/plugins/sp.py index a80dece..8c93b2a 100644 --- a/src/s2repoze/plugins/sp.py +++ b/src/s2repoze/plugins/sp.py @@ -337,7 +337,7 @@ class SAML2Plugin(object): element_to_extension_element(spcertenc)]) if _cli.authn_requests_signed: - _sid = saml2.s_utils.sid(_cli.seed) + _sid = saml2.s_utils.sid() req_id, msg_str = _cli.create_authn_request( dest, vorg=vorg_name, sign=_cli.authn_requests_signed, message_id=_sid, extensions=extensions) diff --git a/src/saml2/__init__.py b/src/saml2/__init__.py index c391a51..7a7079e 100644 --- a/src/saml2/__init__.py +++ b/src/saml2/__init__.py @@ -558,6 +558,106 @@ class SamlBase(ExtensionContainer): except ValueError: pass + def get_ns_map_attribute(self, attributes, uri_set): + for attribute in attributes: + if attribute[0] == "{": + uri, tag = attribute[1:].split("}") + uri_set.add(uri) + return uri_set + + def tag_get_uri(self, elem): + if elem.tag[0] == "{": + uri, tag = elem.tag[1:].split("}") + return uri + return None + def get_ns_map(self, elements, uri_set): + + for elem in elements: + uri_set = self.get_ns_map_attribute(elem.attrib, uri_set) + uri_set = self.get_ns_map(elem._children, uri_set) + uri = self.tag_get_uri(elem) + if uri is not None: + uri_set.add(uri) + return uri_set + + def get_prefix_map(self, elements): + uri_set = self.get_ns_map(elements, set()) + prefix_map = {} + for uri in sorted(uri_set): + prefix_map["encas%d" % len(prefix_map)] = uri + return prefix_map + + def get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(self, assertion_tag, advice_tag): + for tmp_encrypted_assertion in self.assertion.advice.encrypted_assertion: + prefix_map = self.get_prefix_map([tmp_encrypted_assertion._to_element_tree(). + find(assertion_tag)]) + + tree = self._to_element_tree() + + self.set_prefixes(tree.find(assertion_tag).find(advice_tag).find(tmp_encrypted_assertion._to_element_tree() + .tag).find(assertion_tag), prefix_map) + + return ElementTree.tostring(tree, encoding="UTF-8") + + def get_xml_string_with_self_contained_assertion_within_encrypted_assertion(self, assertion_tag): + prefix_map = self.get_prefix_map([self.encrypted_assertion._to_element_tree().find(assertion_tag)]) + + tree = self._to_element_tree() + + self.set_prefixes(tree.find(self.encrypted_assertion._to_element_tree().tag).find(assertion_tag), prefix_map) + + return ElementTree.tostring(tree, encoding="UTF-8") + + + def set_prefixes(self, elem, prefix_map): + + # check if this is a tree wrapper + if not ElementTree.iselement(elem): + elem = elem.getroot() + + # build uri map and add to root element + uri_map = {} + for prefix, uri in prefix_map.items(): + uri_map[uri] = prefix + elem.set("xmlns:" + prefix, uri) + + # fixup all elements in the tree + memo = {} + for elem in elem.getiterator(): + self.fixup_element_prefixes(elem, uri_map, memo) + + + def fixup_element_prefixes(self, elem, uri_map, memo): + def fixup(name): + try: + return memo[name] + except KeyError: + if name[0] != "{": + return + uri, tag = name[1:].split("}") + if uri in uri_map: + new_name = uri_map[uri] + ":" + tag + memo[name] = new_name + return new_name + # fix element name + name = fixup(elem.tag) + if name: + elem.tag = name + # fix attribute names + for key, value in elem.items(): + name = fixup(key) + if name: + elem.set(name, value) + del elem.attrib[key] + + def to_string_force_namespace(self, nspair): + + elem = self._to_element_tree() + + self.set_prefixes(elem, nspair) + + return ElementTree.tostring(elem, encoding="UTF-8") + def to_string(self, nspair=None): """Converts the Saml object to a string containing XML. diff --git a/src/saml2/assertion.py b/src/saml2/assertion.py index 0e66eb2..9a35e11 100644 --- a/src/saml2/assertion.py +++ b/src/saml2/assertion.py @@ -666,7 +666,7 @@ class Assertion(dict): name_id, attrconvs, policy, issuer, authn_class=None, authn_auth=None, authn_decl=None, encrypt=None, sec_context=None, authn_decl_ref=None, authn_instant="", - subject_locality="", authn_statem=None): + subject_locality="", authn_statem=None, add_subject=True): """ Construct the Assertion :param sp_entity_id: The entityid of the SP @@ -722,22 +722,29 @@ class Assertion(dict): else: _authn_statement = None - _ass = assertion_factory( - issuer=issuer, - conditions=conds, - subject=factory( - saml.Subject, - name_id=name_id, - subject_confirmation=[factory( - saml.SubjectConfirmation, - method=saml.SCM_BEARER, - subject_confirmation_data=factory( - saml.SubjectConfirmationData, - in_response_to=in_response_to, - recipient=consumer_url, - not_on_or_after=policy.not_on_or_after(sp_entity_id)))] - ), - ) + if not add_subject: + _ass = assertion_factory( + issuer=issuer, + conditions=conds, + subject=None + ) + else: + _ass = assertion_factory( + issuer=issuer, + conditions=conds, + subject=factory( + saml.Subject, + name_id=name_id, + subject_confirmation=[factory( + saml.SubjectConfirmation, + method=saml.SCM_BEARER, + subject_confirmation_data=factory( + saml.SubjectConfirmationData, + in_response_to=in_response_to, + recipient=consumer_url, + not_on_or_after=policy.not_on_or_after(sp_entity_id)))] + ), + ) if _authn_statement: _ass.authn_statement = [_authn_statement] diff --git a/src/saml2/client_base.py b/src/saml2/client_base.py index a0e5e10..de43387 100644 --- a/src/saml2/client_base.py +++ b/src/saml2/client_base.py @@ -26,7 +26,7 @@ from saml2.soap import make_soap_enveloped_saml_thingy from urlparse import parse_qs -from saml2.s_utils import signature, UnravelError +from saml2.s_utils import signature, UnravelError, exception_trace from saml2.s_utils import do_attributes from saml2 import samlp, BINDING_SOAP, SAMLError @@ -537,7 +537,7 @@ class Base(Entity): # ======== response handling =========== def parse_authn_request_response(self, xmlstr, binding, outstanding=None, - outstanding_certs=None): + outstanding_certs=None, decrypt=True): """ Deal with an AuthnResponse :param xmlstr: The reply as a xml string @@ -545,6 +545,8 @@ class Base(Entity): :param outstanding: A dictionary with session IDs as keys and the original web request from the user before redirection as values. + :param only_identity_in_encrypted_assertion: Must exist an assertion that is not encrypted that contains all + other information like subject and authentication statement. :return: An response.AuthnResponse or None """ @@ -566,6 +568,7 @@ class Base(Entity): "attribute_converters": self.config.attribute_converters, "allow_unknown_attributes": self.config.allow_unknown_attributes, + "decrypt": decrypt } try: resp = self._parse_response(xmlstr, AuthnResponse, @@ -580,13 +583,12 @@ class Base(Entity): logger.error("XML parse error: %s" % err) raise - #logger.debug(">> %s", resp) - if resp is None: return None elif isinstance(resp, AuthnResponse): - self.users.add_information_about_person(resp.session_info()) - logger.info("--- ADDED person info ----") + if resp.assertion is not None and len(resp.response.encrypted_assertion) == 0: + self.users.add_information_about_person(resp.session_info()) + logger.info("--- ADDED person info ----") pass else: logger.error("Response type not supported: %s" % ( diff --git a/src/saml2/entity.py b/src/saml2/entity.py index eadeb10..26c2fe5 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -23,7 +23,7 @@ from saml2 import soap from saml2 import element_to_extension_element from saml2 import extension_elements_to_elements -from saml2.saml import NameID +from saml2.saml import NameID, EncryptedAssertion from saml2.saml import Issuer from saml2.saml import NAMEID_FORMAT_ENTITY from saml2.response import LogoutResponse @@ -63,6 +63,7 @@ from saml2.sigver import CryptoBackendXmlSec1 from saml2.sigver import make_temp from saml2.sigver import pre_encryption_part from saml2.sigver import pre_signature_part +from saml2.sigver import pre_encrypt_assertion from saml2.sigver import signed_instance_factory from saml2.virtual_org import VirtualOrg @@ -502,8 +503,16 @@ class Entity(HTTPBase): def _response(self, in_response_to, consumer_url=None, status=None, issuer=None, sign=False, to_sign=None, - encrypt_assertion=False, encrypt_cert=None, **kwargs): + encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False, + encrypt_cert=None, **kwargs): """ Create a Response. + Encryption: + encrypt_assertion must be true for encryption to be performed. If encrypted_advice_attributes also is + true, then will the function try to encrypt the assertion in the the advice element of the main + assertion. Only one assertion element is allowed in the advice element, if multiple assertions exists + in the advice element the main assertion will be encrypted instead, since it's no point to encrypt + If encrypted_advice_attributes is + false the main assertion will be encrypted. Since the same key :param in_response_to: The session identifier of the request :param consumer_url: The URL which should receive the response @@ -533,22 +542,44 @@ class Entity(HTTPBase): return signed_instance_factory(response, self.sec, to_sign) if encrypt_assertion: + node_xpath = None if sign: response.signature = pre_signature_part(response.id, self.sec.my_cert, 1) + sign_class = [(class_name(response), response.id)] cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary) + if encrypted_advice_attributes and response.assertion.advice is not None \ + and len(response.assertion.advice.assertion) == 1: + tmp_assertion = response.assertion.advice.assertion[0] + response.assertion.advice.encrypted_assertion = [] + response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) + if isinstance(tmp_assertion, list): + response.assertion.advice.encrypted_assertion[0].add_extension_elements(tmp_assertion) + else: + response.assertion.advice.encrypted_assertion[0].add_extension_element(tmp_assertion) + response.assertion.advice.assertion = [] + if encrypt_assertion_self_contained: + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = tmp_assertion._to_element_tree().tag + response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]]) + elif encrypt_assertion_self_contained: + assertion_tag = response.assertion._to_element_tree().tag + response = pre_encrypt_assertion(response) + response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion( + assertion_tag) + else: + response = pre_encrypt_assertion(response) + if to_sign: + response = signed_instance_factory(response, self.sec, to_sign) _, cert_file = make_temp("%s" % encrypt_cert, decode=False) response = cbxs.encrypt_assertion(response, cert_file, - pre_encryption_part()) + pre_encryption_part(), node_xpath=node_xpath) # template(response.assertion.id)) if sign: - if to_sign: - signed_instance_factory(response, self.sec, to_sign) - else: - # default is to sign the whole response if anything - sign_class = [(class_name(response), response.id)] - return signed_instance_factory(response, self.sec, - sign_class) + return signed_instance_factory(response, self.sec, sign_class) else: return response @@ -946,7 +977,13 @@ class Entity(HTTPBase): decode=False) else: key_file = "" - response = response.verify(key_file) + only_identity_in_encrypted_assertion = False + if "only_identity_in_encrypted_assertion" in kwargs: + only_identity_in_encrypted_assertion = kwargs["only_identity_in_encrypted_assertion"] + decrypt = True + if "decrypt" in kwargs: + decrypt = kwargs["decrypt"] + response = response.verify(key_file, decrypt=decrypt) if not response: return None diff --git a/src/saml2/entity_category/at_egov_pvp2.py b/src/saml2/entity_category/at_egov_pvp2.py index ca1a23b..4a041c2 100644 --- a/src/saml2/entity_category/at_egov_pvp2.py +++ b/src/saml2/entity_category/at_egov_pvp2.py @@ -3,7 +3,7 @@ __author__ = 'rhoerbe' #2013-09-05 EGOVTOKEN = ["PVP-VERSION", - "PVP-PRINCIPALNAME", + "PVP-PRINCIPAL-NAME", "PVP-GIVENNAME", "PVP-BIRTHDATE", "PVP-USERID", diff --git a/src/saml2/response.py b/src/saml2/response.py index 8c6332c..bddc032 100644 --- a/src/saml2/response.py +++ b/src/saml2/response.py @@ -395,7 +395,7 @@ class StatusResponse(object): def loads(self, xmldata, decode=True, origxml=None): return self._loads(xmldata, decode, origxml) - def verify(self, key_file=""): + def verify(self, key_file="", decrypt=True): try: return self._verify() except AssertionError: @@ -622,24 +622,32 @@ class AuthnResponse(StatusResponse): attrlist = enc_attr.extensions_as_elements("Attribute", saml) attribute_statement.attribute.extend(attrlist) + def read_attribute_statement(self, attr_statem): + logger.debug("Attribute Statement: %s" % (attr_statem,)) + for aconv in self.attribute_converters: + logger.debug("Converts name format: %s" % (aconv.name_format,)) + + self.decrypt_attributes(attr_statem) + return to_local(self.attribute_converters, attr_statem, + self.allow_unknown_attributes) + def get_identity(self): """ The assertion can contain zero or one attributeStatements """ - if not self.assertion.attribute_statement: - logger.error("Missing Attribute Statement") - ava = {} - else: + ava = {} + if self.assertion.advice: + if self.assertion.advice.assertion: + for tmp_assertion in self.assertion.advice.assertion: + if tmp_assertion.attribute_statement: + assert len(tmp_assertion.attribute_statement) == 1 + ava.update(self.read_attribute_statement(tmp_assertion.attribute_statement[0])) + if self.assertion.attribute_statement: assert len(self.assertion.attribute_statement) == 1 _attr_statem = self.assertion.attribute_statement[0] - - logger.debug("Attribute Statement: %s" % (_attr_statem,)) - for aconv in self.attribute_converters: - logger.debug("Converts name format: %s" % (aconv.name_format,)) - - self.decrypt_attributes(_attr_statem) - ava = to_local(self.attribute_converters, _attr_statem, - self.allow_unknown_attributes) + ava.update(self.read_attribute_statement(_attr_statem)) + if not ava: + logger.error("Missing Attribute Statement") return ava def _bearer_confirmed(self, data): @@ -749,11 +757,9 @@ class AuthnResponse(StatusResponse): raise SignatureError("Signature missing for assertion") else: logger.debug("signed") - if not verified: try: - self.sec.check_signature(assertion, class_name(assertion), - self.xmlstr) + self.sec.check_signature(assertion, class_name(assertion),self.xmlstr) except Exception as exc: logger.error("correctly_signed_response: %s" % exc) raise @@ -808,7 +814,7 @@ class AuthnResponse(StatusResponse): res.append(assertion) return res - def parse_assertion(self, key_file=""): + def parse_assertion(self, key_file="", decrypt=True): if self.context == "AuthnQuery": # can contain one or more assertions pass @@ -820,17 +826,35 @@ class AuthnResponse(StatusResponse): raise Exception("No assertion part") res = [] - if self.response.encrypted_assertion: + has_encrypted_assertions = self.response.encrypted_assertion + if not has_encrypted_assertions and self.response.assertion: + for tmp_assertion in self.response.assertion: + if tmp_assertion.advice: + if tmp_assertion.advice.encrypted_assertion: + has_encrypted_assertions = True + break + + if has_encrypted_assertions and decrypt: logger.debug("***Encrypted assertion/-s***") decr_text = self.sec.decrypt(self.xmlstr, key_file) resp = samlp.response_from_string(decr_text) res = self.decrypt_assertions(resp.encrypted_assertion, decr_text) + if resp.assertion: + for tmp_ass in resp.assertion: + if tmp_ass.advice and tmp_ass.advice.encrypted_assertion: + advice_res = self.decrypt_assertions(tmp_ass.advice.encrypted_assertion, decr_text) + if tmp_ass.advice.assertion: + tmp_ass.advice.assertion.extend(advice_res) + else: + tmp_ass.advice.assertion = advice_res + tmp_ass.advice.encrypted_assertion = [] + self.response.assertion = resp.assertion if self.response.assertion: self.response.assertion.extend(res) else: self.response.assertion = res - self.response.encrypted_assertion = [] self.xmlstr = decr_text + self.response.encrypted_assertion = [] if self.response.assertion: logger.debug("***Unencrypted assertion***") @@ -840,10 +864,9 @@ class AuthnResponse(StatusResponse): else: self.assertions.append(assertion) self.assertion = self.assertions[0] - return True - def verify(self, key_file=""): + def verify(self, key_file="", decrypt=True): """ Verify that the assertion is syntactically correct and the signature is correct if present. :param key_file: If not the default key file should be used this is it. @@ -861,7 +884,7 @@ class AuthnResponse(StatusResponse): if not isinstance(self.response, samlp.Response): return self - if self.parse_assertion(key_file): + if self.parse_assertion(key_file, decrypt=decrypt): return self else: logger.error("Could not parse the assertion") @@ -1089,7 +1112,7 @@ class AssertionIDResponse(object): return self._postamble() - def verify(self, key_file=""): + def verify(self, key_file="", decrypt=True): try: valid_instance(self.response) except NotValid as exc: diff --git a/src/saml2/server.py b/src/saml2/server.py index 9ae6bff..c6d3169 100644 --- a/src/saml2/server.py +++ b/src/saml2/server.py @@ -280,12 +280,50 @@ class Server(Entity): # ------------------------------------------------------------------------ + def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url, name_id, policy, _issuer, + authn_statement, identity, best_effort, sign_response, add_subject=True): + ast = Assertion(identity) + ast.acs = self.config.getattr("attribute_converters", "idp") + if policy is None: + policy = Policy() + try: + ast.apply_policy(sp_entity_id, policy, self.metadata) + except MissingValue, exc: + if not best_effort: + return self.create_error_response(in_response_to, consumer_url, + exc, sign_response) + + if authn: # expected to be a dictionary + # Would like to use dict comprehension but ... + authn_args = dict([ + (AUTHN_DICT_MAP[k], v) for k, v in authn.items() + if k in AUTHN_DICT_MAP]) + + assertion = ast.construct(sp_entity_id, in_response_to, + consumer_url, name_id, + self.config.attribute_converters, + policy, issuer=_issuer, add_subject=add_subject, + **authn_args) + elif authn_statement: # Got a complete AuthnStatement + assertion = ast.construct(sp_entity_id, in_response_to, + consumer_url, name_id, + self.config.attribute_converters, + policy, issuer=_issuer, + authn_statem=authn_statement, add_subject=add_subject) + else: + assertion = ast.construct(sp_entity_id, in_response_to, + consumer_url, name_id, + self.config.attribute_converters, + policy, issuer=_issuer, add_subject=add_subject) + return assertion + def _authn_response(self, in_response_to, consumer_url, sp_entity_id, identity=None, name_id=None, status=None, authn=None, issuer=None, policy=None, sign_assertion=False, sign_response=False, best_effort=False, encrypt_assertion=False, - encrypt_cert=None, authn_statement=None): + encrypt_cert=None, authn_statement=None, + encrypt_assertion_self_contained=False, encrypted_advice_attributes=False): """ Create a response. A layer of indirection. :param in_response_to: The session identifier of the request @@ -309,45 +347,43 @@ class Server(Entity): args = {} #if identity: _issuer = self._issuer(issuer) - ast = Assertion(identity) - ast.acs = self.config.getattr("attribute_converters", "idp") - if policy is None: - policy = Policy() - try: - ast.apply_policy(sp_entity_id, policy, self.metadata) - except MissingValue, exc: - if not best_effort: - return self.create_error_response(in_response_to, consumer_url, - exc, sign_response) - if authn: # expected to be a dictionary - # Would like to use dict comprehension but ... - authn_args = dict([ - (AUTHN_DICT_MAP[k], v) for k, v in authn.items() - if k in AUTHN_DICT_MAP]) + #if encrypt_assertion and show_nameid: + # tmp_name_id = name_id + # name_id = None + # name_id = None + # tmp_authn = authn + # authn = None + # tmp_authn_statement = authn_statement + # authn_statement = None - assertion = ast.construct(sp_entity_id, in_response_to, - consumer_url, name_id, - self.config.attribute_converters, - policy, issuer=_issuer, - **authn_args) - elif authn_statement: # Got a complete AuthnStatement - assertion = ast.construct(sp_entity_id, in_response_to, - consumer_url, name_id, - self.config.attribute_converters, - policy, issuer=_issuer, - authn_statem=authn_statement) + if encrypt_assertion and encrypted_advice_attributes: + assertion_attributes = self.setup_assertion(None, sp_entity_id, None, None, None, policy, + None, None, identity, best_effort, sign_response, False) + assertion = self.setup_assertion(authn, sp_entity_id, in_response_to, consumer_url, + name_id, policy, _issuer, authn_statement, [], True, + sign_response) + assertion.advice = saml.Advice() + + #assertion.advice.assertion_id_ref.append(saml.AssertionIDRef()) + #assertion.advice.assertion_uri_ref.append(saml.AssertionURIRef()) + assertion.advice.assertion.append(assertion_attributes) else: - assertion = ast.construct(sp_entity_id, in_response_to, - consumer_url, name_id, - self.config.attribute_converters, - policy, issuer=_issuer) + assertion = self.setup_assertion(authn, sp_entity_id, in_response_to, consumer_url, + name_id, policy, _issuer, authn_statement, identity, True, + sign_response) + to_sign = [] if sign_assertion is not None and sign_assertion: + if assertion.advice and assertion.advice.assertion: + for tmp_assertion in assertion.advice.assertion: + tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1) + to_sign.append((class_name(tmp_assertion), tmp_assertion.id)) assertion.signature = pre_signature_part(assertion.id, self.sec.my_cert, 1) # Just the assertion or the response and the assertion ? - to_sign = [(class_name(assertion), assertion.id)] + to_sign.append((class_name(assertion), assertion.id)) + # Store which assertion that has been sent to which SP about which # subject. @@ -358,12 +394,14 @@ class Server(Entity): args["assertion"] = assertion - if self.support_AssertionIDRequest() or self.support_AuthnQuery(): + if (self.support_AssertionIDRequest() or self.support_AuthnQuery()): self.session_db.store_assertion(assertion, to_sign) return self._response(in_response_to, consumer_url, status, issuer, sign_response, to_sign, encrypt_assertion=encrypt_assertion, - encrypt_cert=encrypt_cert, **args) + encrypt_cert=encrypt_cert, + encrypt_assertion_self_contained=encrypt_assertion_self_contained, + encrypted_advice_attributes=encrypted_advice_attributes, **args) # ------------------------------------------------------------------------ @@ -438,6 +476,8 @@ class Server(Entity): name_id=None, authn=None, issuer=None, sign_response=None, sign_assertion=None, encrypt_cert=None, encrypt_assertion=None, + encrypt_assertion_self_contained=False, + encrypted_advice_attributes=False, **kwargs): """ Constructs an AuthenticationResponse @@ -549,6 +589,8 @@ class Server(Entity): sign_response=sign_response, best_effort=best_effort, encrypt_assertion=encrypt_assertion, + encrypt_assertion_self_contained=encrypt_assertion_self_contained, + encrypted_advice_attributes=encrypted_advice_attributes, encrypt_cert=encrypt_cert) return self._authn_response(in_response_to, # in_response_to destination, # consumer_url @@ -562,6 +604,8 @@ class Server(Entity): sign_response=sign_response, best_effort=best_effort, encrypt_assertion=encrypt_assertion, + encrypt_assertion_self_contained=encrypt_assertion_self_contained, + encrypted_advice_attributes=encrypted_advice_attributes, encrypt_cert=encrypt_cert) except MissingValue, exc: diff --git a/tests/root_cert/localhost.ca.crt b/tests/root_cert/localhost.ca.crt new file mode 100644 index 0000000..c7faff9 --- /dev/null +++ b/tests/root_cert/localhost.ca.crt @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICSTCCAbICAQEwDQYJKoZIhvcNAQELBQAwbTELMAkGA1UEBhMCc2UxCzAJBgNV +BAgTAmFjMQ0wCwYDVQQHEwR1bWVhMRwwGgYDVQQKExNJVFMgVW1lYSBVbml2ZXJz +aXR5MQ0wCwYDVQQLEwRESVJHMRUwEwYDVQQDEwxsb2NhbGhvc3QuY2EwHhcNMTQw +MzE0MDczNDIwWhcNMjQwMzExMDczNDIwWjBtMQswCQYDVQQGEwJzZTELMAkGA1UE +CBMCYWMxDTALBgNVBAcTBHVtZWExHDAaBgNVBAoTE0lUUyBVbWVhIFVuaXZlcnNp +dHkxDTALBgNVBAsTBERJUkcxFTATBgNVBAMTDGxvY2FsaG9zdC5jYTCBnzANBgkq +hkiG9w0BAQEFAAOBjQAwgYkCgYEAzJseOYg+hKGsnGWilv9FrfH2csQ9UZGwgwHz +zR2IquQg/+nONxd4MIwjOnQrLOJuhpu55ZTSpeT901GNDLj4xPQnFrWWyET8NxZg +w7Ilra55iQNaoUWpdi0JQSXI/9CY8t+Y170+7DfBJ6zo4y6+HKaOLNxZy4IbB0SE +aZBGsrUCAwEAATANBgkqhkiG9w0BAQsFAAOBgQAEOQJkD+5fb2mTtxwaZeRyQN9c +If0Xd2E7Z6BstGCUMWa/q4FrNee324kINVsFYg0GBTBwfyYPwR7I70LGjS3gXEzd +RjGx/Z8yvHJyavJj8iRCOflQ0fUlwasFYtrJesPOfM+aJ05Jb8GelUxpKh6BtPlE +IU0FLm//i9ucLQ9zBg== +-----END CERTIFICATE----- diff --git a/tests/root_cert/localhost.ca.key b/tests/root_cert/localhost.ca.key new file mode 100644 index 0000000..a588e7a --- /dev/null +++ b/tests/root_cert/localhost.ca.key @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQDMmx45iD6EoaycZaKW/0Wt8fZyxD1RkbCDAfPNHYiq5CD/6c43 +F3gwjCM6dCss4m6Gm7nllNKl5P3TUY0MuPjE9CcWtZbIRPw3FmDDsiWtrnmJA1qh +Ral2LQlBJcj/0Jjy35jXvT7sN8EnrOjjLr4cpo4s3FnLghsHRIRpkEaytQIDAQAB +AoGAMw6aUjz/bNVzX2u1UPzOhIOWvjjeHFbAt1BraEnwasSWv4W2oeTHZ0XxHIsU +oxS2A/0kPHgQwLkN5ge5rO0TlpAI5X9ZqlJ0SXF5zjJOBtyK6TWoUbwnyzS7lbFC +q9AVrHwMX9uNCboccqzjrzHyNE+4/QT7z2G5AMzjfq+5EwECQQDvOJuUl2pbUUIK +nMCmwkARFEZzZYV2oIBDsagTG8gX7glj5stoYXuez8EnYtNHRDBConyKqruuzqJk +qSKlha7hAkEA2vT4CpAzHSCknwQKXmFwBD5hVBrv+JZSur6XpqEdwXkX2osScAaW +xj3vQEQorJC2CryvUVOTeuFoog0f+6HiVQJBAMs0dMQ2ErxbPBQzr1p4K1/Wrzmb +BVINaKcYJEOHF+Nr6kIYbLTQCeiPZe4E/p/NBomz6MMJ4L/O+xcyrSGZe0ECQQCZ +ejELpnxNpH4AAKML+Ry9vMQYYjFnfGdNAx/l6vWikjEIPYeVAulY2D0GPUCNhXo1 +GIGDbiPodGwVe0G57oVpAkBjckA1LEE1Kzkq5sR9U9t9m+3WSBvMZxLUwJYdVmOY +Y6xKVtJfe9XuQt9tzoWQW8iieWyKOw7yLYCBcjns2iZn +-----END RSA PRIVATE KEY----- diff --git a/tests/test_33_identifier.py b/tests/test_33_identifier.py index 14036b5..6db5a41 100644 --- a/tests/test_33_identifier.py +++ b/tests/test_33_identifier.py @@ -55,6 +55,10 @@ NAME_ID_POLICY_2 = """ class TestIdentifier(): def setup_class(self): + try: + os.remove("subject.db.db") + except: + pass self.id = IdentDB("subject.db", "example.com", "example") def test_persistent_1(self): diff --git a/tests/test_50_server.py b/tests/test_50_server.py index af2290b..4f3c414 100644 --- a/tests/test_50_server.py +++ b/tests/test_50_server.py @@ -1,9 +1,13 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import base64 +import os from contextlib import closing from urlparse import parse_qs -from saml2.sigver import pre_encryption_part +import uuid + +from saml2.cert import OpenSSLWrapper +from saml2.sigver import pre_encryption_part, make_temp from saml2.assertion import Policy from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT @@ -41,6 +45,40 @@ def _eq(l1, l2): return set(l1) == set(l2) +BASEDIR = os.path.abspath(os.path.dirname(__file__)) + + +def get_ava(assertion): + ava = {} + for statement in assertion.attribute_statement: + for attr in statement.attribute: + value = [] + for tmp_val in attr.attribute_value: + value.append(tmp_val.text) + key = attr.friendly_name + if key is None or len(key) == 0: + key = attr.text + ava[key] = value + return ava + + +def generate_cert(): + sn = uuid.uuid4().urn + cert_info = { + "cn": "localhost", + "country_code": "se", + "state": "ac", + "city": "Umea", + "organization": "ITS", + "organization_unit": "DIRG" + } + osw = OpenSSLWrapper() + ca_cert_str = osw.read_str_from_file("/Users/haho0032/Develop/root_cert/localhost.ca.crt") + ca_key_str = osw.read_str_from_file("/Users/haho0032/Develop/root_cert/localhost.ca.key") + req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True, sn=sn, key_length=2048) + cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str) + return cert_str, req_key_str + class TestServer1(): def setup_class(self): self.server = Server("idp_conf") @@ -372,6 +410,354 @@ class TestServer1(): # value. Just that there should be one assert assertion.signature.signature_value.text != "" + + def test_encrypted_signed_response_1(self): + name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id12") + ava = {"givenName": ["Derek"], "surName": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], "title": "The man"} + + cert_str, cert_key_str = generate_cert() + + signed_resp = self.server.create_authn_response( + ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=name_id, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypted_advice_attributes=True, + encrypt_cert=cert_str, + ) + + sresponse = response_from_string(signed_resp) + + valid = self.server.sec.verify_signature(signed_resp, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response', + node_id=sresponse.id, + id_attr="") + assert valid + + _, key_file = make_temp("%s" % cert_key_str, decode=False) + + decr_text = self.server.sec.decrypt(signed_resp, key_file) + + resp = samlp.response_from_string(decr_text) + + valid = self.server.sec.verify_signature(decr_text, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', + node_id=resp.assertion[0].id, + id_attr="") + assert valid + + assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements + + assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements, + [saml, samlp]) + assert assertion + assert assertion[0].attribute_statement + + ava = ava = get_ava(assertion[0]) + + assert ava ==\ + {'mail': ['derek@nyy.mlb.com'], 'givenname': ['Derek'], 'surname': ['Jeter'], 'title': ['The man']} + + assert 'EncryptedAssertion>