From 746a45b5fe445ab044909a7a1d9620ef16636912 Mon Sep 17 00:00:00 2001 From: Roland Hedberg Date: Thu, 21 Feb 2013 13:08:20 +0100 Subject: [PATCH] Added support for signing/verifying messages when using the HTTP-Redirect binding. --- src/saml2/pack.py | 93 +++---------------------------- src/saml2/sigver.py | 89 +++++++++++++++++++++++++++-- tests/test_70_redirect_signing.py | 44 +++++++++++++++ 3 files changed, 134 insertions(+), 92 deletions(-) create mode 100644 tests/test_70_redirect_signing.py diff --git a/src/saml2/pack.py b/src/saml2/pack.py index 4cf5996..eed060c 100644 --- a/src/saml2/pack.py +++ b/src/saml2/pack.py @@ -23,15 +23,18 @@ Bindings normally consists of three parts: - how to package the information - which protocol to use """ -import hashlib import urlparse import saml2 import base64 import urllib -from saml2.s_utils import deflate_and_base64_encode, Unsupported +from saml2.s_utils import deflate_and_base64_encode +from saml2.s_utils import Unsupported import logging -import M2Crypto -from saml2.sigver import RSA_SHA1, rsa_load, x509_rsa_loads, pem_format +from saml2.sigver import RSA_SHA1 +from saml2.sigver import REQ_ORDER +from saml2.sigver import RESP_ORDER +from saml2.sigver import RSASigner +from saml2.sigver import sha1_digest logger = logging.getLogger(__name__) @@ -86,56 +89,6 @@ def http_form_post_message(message, location, relay_state="", return {"headers": [("Content-type", "text/html")], "data": response} -##noinspection PyUnresolvedReferences -#def http_post_message(message, location, relay_state="", typ="SAMLRequest"): -# """ -# -# :param message: -# :param location: -# :param relay_state: -# :param typ: -# :return: -# """ -# return {"headers": [("Content-type", "text/xml")], "data": message} - - -class BadSignature(Exception): - """The signature is invalid.""" - pass - - -def sha1_digest(msg): - return hashlib.sha1(msg).digest() - - -class Signer(object): - """Abstract base class for signing algorithms.""" - def sign(self, msg, key): - """Sign ``msg`` with ``key`` and return the signature.""" - raise NotImplementedError - - def verify(self, msg, sig, key): - """Return True if ``sig`` is a valid signature for ``msg``.""" - raise NotImplementedError - - -class RSASigner(Signer): - def __init__(self, digest, algo): - self.digest = digest - self.algo = algo - - def sign(self, msg, key): - return key.sign(self.digest(msg), self.algo) - - def verify(self, msg, sig, key): - try: - return key.verify(self.digest(msg), sig, self.algo) - except M2Crypto.RSA.RSAError, e: - raise BadSignature(e) - - -REQ_ORDER = ["SAMLRequest", "RelayState", "SigAlg"] -RESP_ORDER = ["SAMLResponse", "RelayState", "SigAlg"] def http_redirect_message(message, location, relay_state="", typ="SAMLRequest", sigalg=None, key=None): @@ -198,38 +151,6 @@ def http_redirect_message(message, location, relay_state="", typ="SAMLRequest", return {"headers": headers, "data": body} -def verify_redirect_signature(info, cert): - """ - - :param info: A dictionary as produced by parse_qs, means all values are - lists. - :param cert: A certificate to use when verifying the signature - :return: True, if signature verified - """ - - if info["SigAlg"][0] == RSA_SHA1: - if "SAMLRequest" in info: - _order = REQ_ORDER - elif "SAMLResponse" in info: - _order = RESP_ORDER - else: - raise Unsupported( - "Verifying signature on something that should not be signed") - signer = RSASigner(sha1_digest, "sha1") - args = info.copy() - del args["Signature"] # everything but the signature - string = "&".join([urllib.urlencode({k: args[k][0]}) for k in _order]) - _key = x509_rsa_loads(pem_format(cert)) - _sign = base64.b64decode(info["Signature"][0]) - try: - signer.verify(string, _sign, _key) - return True - except BadSignature: - return False - else: - raise Unsupported("Signature algorithm: %s" % info["SigAlg"]) - - DUMMY_NAMESPACE = "http://example.org/" PREFIX = '' diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index f4e58fe..2422a37 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -21,11 +21,13 @@ Based on the use of xmlsec1 binaries and not the python xmlsec module. import base64 from binascii import hexlify +import hashlib import logging import random import os import sys from time import mktime +import urllib import M2Crypto from M2Crypto.X509 import load_cert_string from saml2.samlp import Response @@ -39,8 +41,11 @@ from saml2 import ExtensionElement from saml2 import VERSION from saml2.s_utils import sid +from saml2.s_utils import Unsupported -from saml2.time_util import instant, utc_now, str_to_time +from saml2.time_util import instant +from saml2.time_util import utc_now +from saml2.time_util import str_to_time from tempfile import NamedTemporaryFile from subprocess import Popen, PIPE @@ -315,7 +320,7 @@ def active_cert(key): return False -def cert_from_key_info(key_info): +def cert_from_key_info(key_info, ignore_age=False): """ Get all X509 certs from a KeyInfo instance. Care is taken to make sure that the certs are continues sequences of bytes. @@ -333,14 +338,14 @@ def cert_from_key_info(key_info): cert = x509_certificate.text.strip() cert = "\n".join(split_len("".join([s.strip() for s in cert.split()]), 64)) - if active_cert(cert): + if ignore_age or active_cert(cert): res.append(cert) else: logger.info("Inactive cert") return res -def cert_from_key_info_dict(key_info): +def cert_from_key_info_dict(key_info, ignore_age=False): """ Get all X509 certs from a KeyInfo dictionary. Care is taken to make sure that the certs are continues sequences of bytes. @@ -360,7 +365,7 @@ def cert_from_key_info_dict(key_info): cert = x509_certificate["text"].strip() cert = "\n".join(split_len("".join([s.strip() for s in cert.split()]), 64)) - if active_cert(cert): + if ignore_age or active_cert(cert): res.append(cert) else: logger.info("Inactive cert") @@ -375,7 +380,8 @@ def cert_from_instance(instance): """ if instance.signature: if instance.signature.key_info: - return cert_from_key_info(instance.signature.key_info) + return cert_from_key_info(instance.signature.key_info, + ignore_age=True) return [] # ============================================================================= @@ -472,6 +478,77 @@ def parse_xmlsec_output(output): __DEBUG = 0 + +class BadSignature(Exception): + """The signature is invalid.""" + pass + + +def sha1_digest(msg): + return hashlib.sha1(msg).digest() + + +class Signer(object): + """Abstract base class for signing algorithms.""" + def sign(self, msg, key): + """Sign ``msg`` with ``key`` and return the signature.""" + raise NotImplementedError + + def verify(self, msg, sig, key): + """Return True if ``sig`` is a valid signature for ``msg``.""" + raise NotImplementedError + + +class RSASigner(Signer): + def __init__(self, digest, algo): + self.digest = digest + self.algo = algo + + def sign(self, msg, key): + return key.sign(self.digest(msg), self.algo) + + def verify(self, msg, sig, key): + try: + return key.verify(self.digest(msg), sig, self.algo) + except M2Crypto.RSA.RSAError, e: + raise BadSignature(e) + + +REQ_ORDER = ["SAMLRequest", "RelayState", "SigAlg"] +RESP_ORDER = ["SAMLResponse", "RelayState", "SigAlg"] + + +def verify_redirect_signature(info, cert): + """ + + :param info: A dictionary as produced by parse_qs, means all values are + lists. + :param cert: A certificate to use when verifying the signature + :return: True, if signature verified + """ + + if info["SigAlg"][0] == RSA_SHA1: + if "SAMLRequest" in info: + _order = REQ_ORDER + elif "SAMLResponse" in info: + _order = RESP_ORDER + else: + raise Unsupported( + "Verifying signature on something that should not be signed") + signer = RSASigner(sha1_digest, "sha1") + args = info.copy() + del args["Signature"] # everything but the signature + string = "&".join([urllib.urlencode({k: args[k][0]}) for k in _order]) + _key = x509_rsa_loads(pem_format(cert)) + _sign = base64.b64decode(info["Signature"][0]) + try: + signer.verify(string, _sign, _key) + return True + except BadSignature: + return False + else: + raise Unsupported("Signature algorithm: %s" % info["SigAlg"]) + LOG_LINE = 60 * "=" + "\n%s\n" + 60 * "-" + "\n%s" + 60 * "=" LOG_LINE_2 = 60 * "=" + "\n%s\n%s\n" + 60 * "-" + "\n%s" + 60 * "=" diff --git a/tests/test_70_redirect_signing.py b/tests/test_70_redirect_signing.py new file mode 100644 index 0000000..6dd9fc6 --- /dev/null +++ b/tests/test_70_redirect_signing.py @@ -0,0 +1,44 @@ +from saml2.pack import http_redirect_message +from saml2.sigver import verify_redirect_signature +from saml2.sigver import RSA_SHA1 +from saml2.server import Server +from saml2 import BINDING_HTTP_REDIRECT +from saml2.client import Saml2Client +from saml2.config import SPConfig +from saml2.sigver import rsa_load +from urlparse import parse_qs + +__author__ = 'rolandh' + +idp = Server(config_file="idp_all_conf") + +conf = SPConfig() +conf.load_file("servera_conf") +sp = Saml2Client(conf) + +def test(): + srvs = sp.metadata.single_sign_on_service(idp.config.entityid, + BINDING_HTTP_REDIRECT) + + destination = srvs[0]["location"] + req = sp.create_authn_request(destination, id="id1") + + try: + key = sp.sec.key + except AttributeError: + key = rsa_load(sp.sec.key_file) + + info = http_redirect_message(req, destination, relay_state="RS", + typ="SAMLRequest", sigalg=RSA_SHA1, key=key) + + verified_ok = False + + for param, val in info["headers"]: + if param == "Location": + _dict = parse_qs(val.split("?")[1]) + _certs = idp.metadata.certs(sp.config.entityid, "any", "signing") + for cert in _certs: + if verify_redirect_signature(_dict, cert): + verified_ok = True + + assert verified_ok \ No newline at end of file