Partial commit for decrpyting and verifying signatures at the client. All tests works.
This commit is contained in:
@@ -542,7 +542,7 @@ class Base(Entity):
|
||||
# ======== response handling ===========
|
||||
|
||||
def parse_authn_request_response(self, xmlstr, binding, outstanding=None,
|
||||
outstanding_certs=None, decrypt=True):
|
||||
outstanding_certs=None, decrypt=True, pefim=False):
|
||||
""" Deal with an AuthnResponse
|
||||
|
||||
:param xmlstr: The reply as a xml string
|
||||
@@ -578,7 +578,7 @@ class Base(Entity):
|
||||
try:
|
||||
resp = self._parse_response(xmlstr, AuthnResponse,
|
||||
"assertion_consumer_service",
|
||||
binding, **kwargs)
|
||||
binding, pefim=pefim, **kwargs)
|
||||
except StatusError as err:
|
||||
logger.error("SAML status error: %s" % err)
|
||||
raise
|
||||
|
@@ -978,7 +978,7 @@ class Entity(HTTPBase):
|
||||
# ------------------------------------------------------------------------
|
||||
|
||||
def _parse_response(self, xmlstr, response_cls, service, binding,
|
||||
outstanding_certs=None, **kwargs):
|
||||
outstanding_certs=None, pefim=False, **kwargs):
|
||||
""" Deal with a Response
|
||||
|
||||
:param xmlstr: The response as a xml string
|
||||
@@ -1056,7 +1056,7 @@ class Entity(HTTPBase):
|
||||
decrypt = True
|
||||
if "decrypt" in kwargs:
|
||||
decrypt = kwargs["decrypt"]
|
||||
response = response.verify(key_file, decrypt=decrypt)
|
||||
response = response.verify(key_file, decrypt=decrypt, pefim=pefim)
|
||||
|
||||
if not response:
|
||||
return None
|
||||
|
@@ -395,7 +395,7 @@ class StatusResponse(object):
|
||||
def loads(self, xmldata, decode=True, origxml=None):
|
||||
return self._loads(xmldata, decode, origxml)
|
||||
|
||||
def verify(self, key_file="", decrypt=True):
|
||||
def verify(self, key_file="", decrypt=True, pefim=False):
|
||||
try:
|
||||
return self._verify()
|
||||
except AssertionError:
|
||||
@@ -780,10 +780,9 @@ class AuthnResponse(StatusResponse):
|
||||
|
||||
logger.debug("--- Getting Identity ---")
|
||||
|
||||
if self.context == "AuthnReq" or self.context == "AttrQuery":
|
||||
self.ava = self.get_identity()
|
||||
|
||||
logger.debug("--- AVA: %s" % (self.ava,))
|
||||
#if self.context == "AuthnReq" or self.context == "AttrQuery":
|
||||
# self.ava = self.get_identity()
|
||||
# logger.debug("--- AVA: %s" % (self.ava,))
|
||||
|
||||
try:
|
||||
self.get_subject()
|
||||
@@ -808,13 +807,12 @@ class AuthnResponse(StatusResponse):
|
||||
if not self.sec.check_signature(
|
||||
assertion, origdoc=decr_txt,
|
||||
node_name=class_name(assertion), issuer=issuer):
|
||||
logger.error(
|
||||
"Failed to verify signature on '%s'" % assertion)
|
||||
logger.error("Failed to verify signature on '%s'" % assertion)
|
||||
raise SignatureError()
|
||||
res.append(assertion)
|
||||
return res
|
||||
|
||||
def parse_assertion(self, key_file="", decrypt=True):
|
||||
def parse_assertion(self, key_file="", decrypt=True, pefim=False):
|
||||
if self.context == "AuthnQuery":
|
||||
# can contain one or more assertions
|
||||
pass
|
||||
@@ -825,20 +823,28 @@ class AuthnResponse(StatusResponse):
|
||||
except AssertionError:
|
||||
raise Exception("No assertion part")
|
||||
|
||||
res = []
|
||||
has_encrypted_assertions = self.response.encrypted_assertion
|
||||
if not has_encrypted_assertions and self.response.assertion:
|
||||
for tmp_assertion in self.response.assertion:
|
||||
if tmp_assertion.advice:
|
||||
if tmp_assertion.advice.encrypted_assertion:
|
||||
if tmp_assertion.advice.encrypted_assertion:
|
||||
has_encrypted_assertions = True
|
||||
break
|
||||
|
||||
if self.response.assertion:
|
||||
logger.debug("***Unencrypted assertion***")
|
||||
for assertion in self.response.assertion:
|
||||
if not self._assertion(assertion, False):
|
||||
return False
|
||||
|
||||
if has_encrypted_assertions and decrypt:
|
||||
_enc_assertions = []
|
||||
logger.debug("***Encrypted assertion/-s***")
|
||||
decr_text = self.sec.decrypt(self.xmlstr, key_file)
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
res = self.decrypt_assertions(resp.encrypted_assertion, decr_text)
|
||||
_enc_assertions = self.decrypt_assertions(resp.encrypted_assertion, decr_text)
|
||||
decr_text = self.sec.decrypt(decr_text, key_file)
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
if resp.assertion:
|
||||
for tmp_ass in resp.assertion:
|
||||
if tmp_ass.advice and tmp_ass.advice.encrypted_assertion:
|
||||
@@ -849,26 +855,32 @@ class AuthnResponse(StatusResponse):
|
||||
tmp_ass.advice.assertion.extend(advice_res)
|
||||
else:
|
||||
tmp_ass.advice.assertion = advice_res
|
||||
if not pefim:
|
||||
_enc_assertions.extend(advice_res)
|
||||
tmp_ass.advice.encrypted_assertion = []
|
||||
self.response.assertion = resp.assertion
|
||||
if self.response.assertion:
|
||||
self.response.assertion.extend(res)
|
||||
else:
|
||||
self.response.assertion = res
|
||||
self.response.assertion = resp.assertion
|
||||
for assertion in _enc_assertions:
|
||||
if not self._assertion(assertion, True):
|
||||
return False
|
||||
self.xmlstr = decr_text
|
||||
self.response.encrypted_assertion = []
|
||||
|
||||
if self.response.assertion:
|
||||
logger.debug("***Unencrypted assertion***")
|
||||
for assertion in self.response.assertion:
|
||||
if not self._assertion(assertion, assertion in res):
|
||||
return False
|
||||
else:
|
||||
self.assertions.append(assertion)
|
||||
if assertion.advice and assertion.advice.assertion:
|
||||
for advice_assertion in assertion.advice.assertion:
|
||||
self.assertions.append(assertion)
|
||||
|
||||
if self.assertions and len(self.assertions) > 0:
|
||||
self.assertion = self.assertions[0]
|
||||
|
||||
if self.context == "AuthnReq" or self.context == "AttrQuery":
|
||||
self.ava = self.get_identity()
|
||||
logger.debug("--- AVA: %s" % (self.ava,))
|
||||
|
||||
return True
|
||||
|
||||
def verify(self, key_file="", decrypt=True):
|
||||
def verify(self, key_file="", decrypt=True, pefim=False):
|
||||
""" Verify that the assertion is syntactically correct and
|
||||
the signature is correct if present.
|
||||
:param key_file: If not the default key file should be used this is it.
|
||||
@@ -886,7 +898,7 @@ class AuthnResponse(StatusResponse):
|
||||
if not isinstance(self.response, samlp.Response):
|
||||
return self
|
||||
|
||||
if self.parse_assertion(key_file, decrypt=decrypt):
|
||||
if self.parse_assertion(key_file, decrypt=decrypt, pefim=pefim):
|
||||
return self
|
||||
else:
|
||||
logger.error("Could not parse the assertion")
|
||||
@@ -1114,7 +1126,7 @@ class AssertionIDResponse(object):
|
||||
|
||||
return self._postamble()
|
||||
|
||||
def verify(self, key_file="", decrypt=True):
|
||||
def verify(self, key_file="", decrypt=True, pefim=False):
|
||||
try:
|
||||
valid_instance(self.response)
|
||||
except NotValid as exc:
|
||||
|
@@ -1306,9 +1306,17 @@ class SecurityContext(object):
|
||||
:param enctext: The encrypted text as a string
|
||||
:return: The decrypted text
|
||||
"""
|
||||
_enctext = self.crypto.decrypt(enctext, self.key_file)
|
||||
if _enctext is not None and len(_enctext) > 0:
|
||||
return _enctext
|
||||
if key_file is not None and len(key_file.strip()) > 0:
|
||||
return self.crypto.decrypt(enctext, key_file)
|
||||
return self.crypto.decrypt(enctext, self.key_file)
|
||||
_enctext = self.crypto.decrypt(enctext, key_file)
|
||||
if _enctext is not None and len(_enctext) > 0:
|
||||
return _enctext
|
||||
_enctext = self.crypto.decrypt(enctext, self.key_file)
|
||||
if _enctext is not None and len(_enctext) > 0:
|
||||
return _enctext
|
||||
return enctext
|
||||
|
||||
def verify_signature(self, signedtext, cert_file=None, cert_type="pem",
|
||||
node_name=NODE_NAME, node_id=None, id_attr=""):
|
||||
|
@@ -2,9 +2,11 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import base64
|
||||
import uuid
|
||||
import six
|
||||
import urllib
|
||||
import urlparse
|
||||
from saml2.cert import OpenSSLWrapper
|
||||
from saml2.xmldsig import SIG_RSA_SHA256
|
||||
from saml2 import BINDING_HTTP_POST
|
||||
from saml2 import BINDING_HTTP_REDIRECT
|
||||
@@ -25,7 +27,7 @@ from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion, Advice
|
||||
from saml2.saml import NAMEID_FORMAT_TRANSIENT
|
||||
from saml2.saml import NameID
|
||||
from saml2.server import Server
|
||||
from saml2.sigver import pre_encryption_part
|
||||
from saml2.sigver import pre_encryption_part, make_temp
|
||||
from saml2.sigver import rm_xmltag
|
||||
from saml2.sigver import verify_redirect_signature
|
||||
from saml2.s_utils import do_attribute_statement
|
||||
@@ -42,6 +44,28 @@ AUTHN = {
|
||||
}
|
||||
|
||||
|
||||
def generate_cert():
|
||||
sn = uuid.uuid4().urn
|
||||
cert_info = {
|
||||
"cn": "localhost",
|
||||
"country_code": "se",
|
||||
"state": "ac",
|
||||
"city": "Umea",
|
||||
"organization": "ITS",
|
||||
"organization_unit": "DIRG"
|
||||
}
|
||||
osw = OpenSSLWrapper()
|
||||
ca_cert_str = osw.read_str_from_file(
|
||||
full_path("root_cert/localhost.ca.crt"))
|
||||
ca_key_str = osw.read_str_from_file(
|
||||
full_path("root_cert/localhost.ca.key"))
|
||||
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True,
|
||||
sn=sn, key_length=2048)
|
||||
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
|
||||
req_cert_str)
|
||||
return cert_str, req_key_str
|
||||
|
||||
|
||||
def add_subelement(xmldoc, node_name, subelem):
|
||||
s = xmldoc.find(node_name)
|
||||
if s > 0:
|
||||
@@ -292,7 +316,7 @@ class TestClient:
|
||||
except Exception: # missing certificate
|
||||
self.client.sec.verify_signature(ar_str, node_name=class_name(ar))
|
||||
|
||||
def test_response(self):
|
||||
def test_response_1(self):
|
||||
IDP = "urn:mace:example.com:saml:roland:idp"
|
||||
|
||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"],
|
||||
@@ -368,6 +392,116 @@ class TestClient:
|
||||
print(issuers)
|
||||
assert issuers == [[IDP], [IDP]]
|
||||
|
||||
def test_response_2(self):
|
||||
conf = config.SPConfig()
|
||||
conf.load_file("server_conf")
|
||||
_client = Saml2Client(conf)
|
||||
|
||||
idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
|
||||
|
||||
cert_str, cert_key_str = generate_cert()
|
||||
|
||||
cert =\
|
||||
{
|
||||
"cert": cert_str,
|
||||
"key": cert_key_str
|
||||
}
|
||||
|
||||
self.name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id1")
|
||||
|
||||
resp = self.server.create_authn_response(
|
||||
identity=ava,
|
||||
in_response_to="id1",
|
||||
destination="http://lingon.catalogix.se:8087/",
|
||||
sp_entity_id="urn:mace:example.com:saml:roland:sp",
|
||||
#name_id_policy=nameid_policy,
|
||||
name_id=self.name_id,
|
||||
userid="foba0001@example.com",
|
||||
authn=AUTHN,
|
||||
sign_response=True,
|
||||
sign_assertion=True,
|
||||
encrypt_assertion=False,
|
||||
encrypt_assertion_self_contained=True,
|
||||
#encrypted_advice_attributes=True,
|
||||
pefim=True,
|
||||
encrypt_cert_advice=cert_str
|
||||
)
|
||||
|
||||
resp_str = "%s" % resp
|
||||
|
||||
resp_str = base64.encodestring(resp_str)
|
||||
|
||||
authn_response = _client.parse_authn_request_response(
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"id1": "http://foo.example.com/service"}, {"id1": cert}, pefim=True)
|
||||
|
||||
self.verify_authn_response(idp, authn_response, _client, ava_verify)
|
||||
|
||||
def test_response_3(self):
|
||||
conf = config.SPConfig()
|
||||
conf.load_file("server_conf")
|
||||
_client = Saml2Client(conf)
|
||||
|
||||
idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
|
||||
|
||||
self.name_id = self.server.ident.transient_nameid(
|
||||
"urn:mace:example.com:saml:roland:sp", "id1")
|
||||
|
||||
resp = self.server.create_authn_response(
|
||||
identity=ava,
|
||||
in_response_to="id1",
|
||||
destination="http://lingon.catalogix.se:8087/",
|
||||
sp_entity_id="urn:mace:example.com:saml:roland:sp",
|
||||
#name_id_policy=nameid_policy,
|
||||
name_id=self.name_id,
|
||||
userid="foba0001@example.com",
|
||||
authn=AUTHN,
|
||||
sign_response=True,
|
||||
sign_assertion=True,
|
||||
encrypt_assertion=False,
|
||||
encrypt_assertion_self_contained=True,
|
||||
#encrypted_advice_attributes=True,
|
||||
pefim=True,
|
||||
)
|
||||
|
||||
resp_str = "%s" % resp
|
||||
|
||||
resp_str = base64.encodestring(resp_str)
|
||||
|
||||
authn_response = _client.parse_authn_request_response(
|
||||
resp_str, BINDING_HTTP_POST,
|
||||
{"id1": "http://foo.example.com/service"}, pefim=True)
|
||||
|
||||
self.verify_authn_response(idp, authn_response, _client, ava_verify)
|
||||
|
||||
def setup_verify_authn_response(self):
|
||||
idp = "urn:mace:example.com:saml:roland:idp"
|
||||
ava = {"givenName": ["Derek"], "surName": ["Jeter"], "mail": ["derek@nyy.mlb.com"], "title": ["The man"]}
|
||||
ava_verify = {'mail': ['derek@nyy.mlb.com'], 'givenName': ['Derek'], 'sn': ['Jeter'], 'title': ["The man"]}
|
||||
nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT)
|
||||
return idp, ava, ava_verify, nameid_policy
|
||||
|
||||
|
||||
def verify_authn_response(self, idp, authn_response, _client, ava_verify):
|
||||
assert authn_response is not None
|
||||
assert authn_response.issuer() == idp
|
||||
assert authn_response.response.assertion[0].issuer.text == idp
|
||||
session_info = authn_response.session_info()
|
||||
|
||||
assert session_info["ava"] == ava_verify
|
||||
assert session_info["issuer"] == idp
|
||||
assert session_info["came_from"] == "http://foo.example.com/service"
|
||||
response = samlp.response_from_string(authn_response.xmlstr)
|
||||
assert response.destination == "http://lingon.catalogix.se:8087/"
|
||||
|
||||
# One person in the cache
|
||||
assert len(_client.users.subjects()) == 1
|
||||
subject_id = _client.users.subjects()[0]
|
||||
# The information I have about the subject comes from one source
|
||||
assert _client.users.issuers_of_info(subject_id) == [idp]
|
||||
|
||||
|
||||
def test_init_values(self):
|
||||
entityid = self.client.config.entityid
|
||||
print(entityid)
|
||||
@@ -764,4 +898,4 @@ class TestClientWithDummy():
|
||||
if __name__ == "__main__":
|
||||
tc = TestClient()
|
||||
tc.setup_class()
|
||||
tc.test_sign_then_encrypt_assertion_advice()
|
||||
tc.test_response_3()
|
||||
|
Reference in New Issue
Block a user