Updated pysaml2 to support PEFIM.
Added a decrypt flag so a proxy can choose not to decrypt an encrypted assertion. Fix so an signature on a response is always validated. Moved back to original solution. The only use case where the signature should be validated is if the proxy is transparent and the signature is designated for the Service Provider. This use case is no longer valid and if it is to be used a new flag must be created, like never_validate_signature. The default value of never_validate_signature is False.
This commit is contained in:
@@ -537,7 +537,7 @@ class Base(Entity):
|
||||
# ======== response handling ===========
|
||||
|
||||
def parse_authn_request_response(self, xmlstr, binding, outstanding=None,
|
||||
outstanding_certs=None):
|
||||
outstanding_certs=None, decrypt=True):
|
||||
""" Deal with an AuthnResponse
|
||||
|
||||
:param xmlstr: The reply as a xml string
|
||||
@@ -567,7 +567,8 @@ class Base(Entity):
|
||||
"entity_id": self.config.entityid,
|
||||
"attribute_converters": self.config.attribute_converters,
|
||||
"allow_unknown_attributes":
|
||||
self.config.allow_unknown_attributes
|
||||
self.config.allow_unknown_attributes,
|
||||
"decrypt": decrypt
|
||||
}
|
||||
try:
|
||||
resp = self._parse_response(xmlstr, AuthnResponse,
|
||||
|
@@ -973,7 +973,10 @@ class Entity(HTTPBase):
|
||||
only_identity_in_encrypted_assertion = False
|
||||
if "only_identity_in_encrypted_assertion" in kwargs:
|
||||
only_identity_in_encrypted_assertion = kwargs["only_identity_in_encrypted_assertion"]
|
||||
response = response.verify(key_file)
|
||||
decrypt = True
|
||||
if "decrypt" in kwargs:
|
||||
decrypt = kwargs["decrypt"]
|
||||
response = response.verify(key_file, decrypt=decrypt)
|
||||
|
||||
if not response:
|
||||
return None
|
||||
|
@@ -395,7 +395,7 @@ class StatusResponse(object):
|
||||
def loads(self, xmldata, decode=True, origxml=None):
|
||||
return self._loads(xmldata, decode, origxml)
|
||||
|
||||
def verify(self, key_file=""):
|
||||
def verify(self, key_file="", decrypt=True):
|
||||
try:
|
||||
return self._verify()
|
||||
except AssertionError:
|
||||
@@ -759,9 +759,7 @@ class AuthnResponse(StatusResponse):
|
||||
logger.debug("signed")
|
||||
if not verified:
|
||||
try:
|
||||
if self.require_signature:
|
||||
self.sec.check_signature(assertion, class_name(assertion),
|
||||
self.xmlstr)
|
||||
self.sec.check_signature(assertion, class_name(assertion),self.xmlstr)
|
||||
except Exception as exc:
|
||||
logger.error("correctly_signed_response: %s" % exc)
|
||||
raise
|
||||
@@ -816,7 +814,7 @@ class AuthnResponse(StatusResponse):
|
||||
res.append(assertion)
|
||||
return res
|
||||
|
||||
def parse_assertion(self, key_file=""):
|
||||
def parse_assertion(self, key_file="", decrypt=True):
|
||||
if self.context == "AuthnQuery":
|
||||
# can contain one or more assertions
|
||||
pass
|
||||
@@ -836,7 +834,7 @@ class AuthnResponse(StatusResponse):
|
||||
has_encrypted_assertions = True
|
||||
break
|
||||
|
||||
if has_encrypted_assertions and key_file is not None and len(key_file) > 0:
|
||||
if has_encrypted_assertions and decrypt:
|
||||
logger.debug("***Encrypted assertion/-s***")
|
||||
decr_text = self.sec.decrypt(self.xmlstr, key_file)
|
||||
resp = samlp.response_from_string(decr_text)
|
||||
@@ -868,7 +866,7 @@ class AuthnResponse(StatusResponse):
|
||||
self.assertion = self.assertions[0]
|
||||
return True
|
||||
|
||||
def verify(self, key_file=""):
|
||||
def verify(self, key_file="", decrypt=True):
|
||||
""" Verify that the assertion is syntactically correct and
|
||||
the signature is correct if present.
|
||||
:param key_file: If not the default key file should be used this is it.
|
||||
@@ -886,7 +884,7 @@ class AuthnResponse(StatusResponse):
|
||||
if not isinstance(self.response, samlp.Response):
|
||||
return self
|
||||
|
||||
if self.parse_assertion(key_file):
|
||||
if self.parse_assertion(key_file, decrypt=decrypt):
|
||||
return self
|
||||
else:
|
||||
logger.error("Could not parse the assertion")
|
||||
@@ -1114,7 +1112,7 @@ class AssertionIDResponse(object):
|
||||
|
||||
return self._postamble()
|
||||
|
||||
def verify(self, key_file=""):
|
||||
def verify(self, key_file="", decrypt=True):
|
||||
try:
|
||||
valid_instance(self.response)
|
||||
except NotValid as exc:
|
||||
|
@@ -55,6 +55,10 @@ NAME_ID_POLICY_2 = """<?xml version="1.0" encoding="utf-8"?>
|
||||
|
||||
class TestIdentifier():
|
||||
def setup_class(self):
|
||||
try:
|
||||
os.remove("subject.db.db")
|
||||
except:
|
||||
pass
|
||||
self.id = IdentDB("subject.db", "example.com", "example")
|
||||
|
||||
def test_persistent_1(self):
|
||||
|
Reference in New Issue
Block a user