Not lists but strings as values in dictionary input to verify_redirect_signature.
This commit is contained in:
@@ -611,18 +611,18 @@ RESP_ORDER = ["SAMLResponse", "RelayState", "SigAlg"]
|
|||||||
def verify_redirect_signature(saml_msg, cert=None, sigkey=None):
|
def verify_redirect_signature(saml_msg, cert=None, sigkey=None):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
:param saml_msg: A dictionary as produced by parse_qs, means all values are
|
:param saml_msg: A dictionary with strings as values, *NOT* lists as
|
||||||
lists.
|
produced by parse_qs.
|
||||||
:param cert: A certificate to use when verifying the signature
|
:param cert: A certificate to use when verifying the signature
|
||||||
:return: True, if signature verified
|
:return: True, if signature verified
|
||||||
"""
|
"""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
signer = SIGNER_ALGS[saml_msg["SigAlg"][0]]
|
signer = SIGNER_ALGS[saml_msg["SigAlg"]]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise Unsupported("Signature algorithm: %s" % saml_msg["SigAlg"])
|
raise Unsupported("Signature algorithm: %s" % saml_msg["SigAlg"])
|
||||||
else:
|
else:
|
||||||
if saml_msg["SigAlg"][0] in SIGNER_ALGS:
|
if saml_msg["SigAlg"] in SIGNER_ALGS:
|
||||||
if "SAMLRequest" in saml_msg:
|
if "SAMLRequest" in saml_msg:
|
||||||
_order = REQ_ORDER
|
_order = REQ_ORDER
|
||||||
elif "SAMLResponse" in saml_msg:
|
elif "SAMLResponse" in saml_msg:
|
||||||
@@ -631,15 +631,15 @@ def verify_redirect_signature(saml_msg, cert=None, sigkey=None):
|
|||||||
raise Unsupported(
|
raise Unsupported(
|
||||||
"Verifying signature on something that should not be "
|
"Verifying signature on something that should not be "
|
||||||
"signed")
|
"signed")
|
||||||
args = saml_msg.copy()
|
_args = saml_msg.copy()
|
||||||
del args["Signature"] # everything but the signature
|
del _args["Signature"] # everything but the signature
|
||||||
string = "&".join(
|
string = "&".join(
|
||||||
[urllib.urlencode({k: args[k][0]}) for k in _order if k in args])
|
[urllib.urlencode({k: _args[k]}) for k in _order if k in _args])
|
||||||
if cert:
|
if cert:
|
||||||
_key = extract_rsa_key_from_x509_cert(pem_format(cert))
|
_key = extract_rsa_key_from_x509_cert(pem_format(cert))
|
||||||
else:
|
else:
|
||||||
_key = sigkey
|
_key = sigkey
|
||||||
_sign = base64.b64decode(saml_msg["Signature"][0])
|
_sign = base64.b64decode(saml_msg["Signature"])
|
||||||
|
|
||||||
return bool(signer.verify(string, _sign, _key))
|
return bool(signer.verify(string, _sign, _key))
|
||||||
|
|
||||||
|
@@ -24,8 +24,9 @@ from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion
|
|||||||
from saml2.saml import NAMEID_FORMAT_TRANSIENT
|
from saml2.saml import NAMEID_FORMAT_TRANSIENT
|
||||||
from saml2.saml import NameID
|
from saml2.saml import NameID
|
||||||
from saml2.server import Server
|
from saml2.server import Server
|
||||||
from saml2.sigver import pre_encryption_part, rm_xmltag, \
|
from saml2.sigver import pre_encryption_part
|
||||||
verify_redirect_signature
|
from saml2.sigver import rm_xmltag
|
||||||
|
from saml2.sigver import verify_redirect_signature
|
||||||
from saml2.s_utils import do_attribute_statement
|
from saml2.s_utils import do_attribute_statement
|
||||||
from saml2.s_utils import factory
|
from saml2.s_utils import factory
|
||||||
from saml2.time_util import in_a_while
|
from saml2.time_util import in_a_while
|
||||||
@@ -114,6 +115,10 @@ nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
|
|||||||
text="123456")
|
text="123456")
|
||||||
|
|
||||||
|
|
||||||
|
def list_values2simpletons(_dict):
|
||||||
|
return dict([(k, v[0]) for k, v in _dict.items()])
|
||||||
|
|
||||||
|
|
||||||
class TestClient:
|
class TestClient:
|
||||||
def setup_class(self):
|
def setup_class(self):
|
||||||
self.server = Server("idp_conf")
|
self.server = Server("idp_conf")
|
||||||
@@ -510,7 +515,8 @@ class TestClient:
|
|||||||
assert _leq(qs.keys(),
|
assert _leq(qs.keys(),
|
||||||
['SigAlg', 'SAMLRequest', 'RelayState', 'Signature'])
|
['SigAlg', 'SAMLRequest', 'RelayState', 'Signature'])
|
||||||
|
|
||||||
assert verify_redirect_signature(qs, sigkey=key)
|
assert verify_redirect_signature(list_values2simpletons(qs),
|
||||||
|
sigkey=key)
|
||||||
|
|
||||||
res = self.server.parse_authn_request(qs["SAMLRequest"][0],
|
res = self.server.parse_authn_request(qs["SAMLRequest"][0],
|
||||||
BINDING_HTTP_REDIRECT)
|
BINDING_HTTP_REDIRECT)
|
||||||
|
@@ -13,6 +13,11 @@ from pathutils import dotname
|
|||||||
|
|
||||||
__author__ = 'rolandh'
|
__author__ = 'rolandh'
|
||||||
|
|
||||||
|
|
||||||
|
def list_values2simpletons(_dict):
|
||||||
|
return dict([(k, v[0]) for k, v in _dict.items()])
|
||||||
|
|
||||||
|
|
||||||
def test():
|
def test():
|
||||||
with closing(Server(config_file=dotname("idp_all_conf"))) as idp:
|
with closing(Server(config_file=dotname("idp_all_conf"))) as idp:
|
||||||
conf = SPConfig()
|
conf = SPConfig()
|
||||||
@@ -41,7 +46,12 @@ def test():
|
|||||||
_dict = parse_qs(val.split("?")[1])
|
_dict = parse_qs(val.split("?")[1])
|
||||||
_certs = idp.metadata.certs(sp.config.entityid, "any", "signing")
|
_certs = idp.metadata.certs(sp.config.entityid, "any", "signing")
|
||||||
for cert in _certs:
|
for cert in _certs:
|
||||||
if verify_redirect_signature(_dict, cert):
|
if verify_redirect_signature(
|
||||||
|
list_values2simpletons(_dict), cert):
|
||||||
verified_ok = True
|
verified_ok = True
|
||||||
|
|
||||||
assert verified_ok
|
assert verified_ok
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
test()
|
Reference in New Issue
Block a user