Added support for signing/verifying messages when using the HTTP-Redirect binding.
This commit is contained in:
@@ -23,15 +23,18 @@ Bindings normally consists of three parts:
|
||||
- how to package the information
|
||||
- which protocol to use
|
||||
"""
|
||||
import hashlib
|
||||
import urlparse
|
||||
import saml2
|
||||
import base64
|
||||
import urllib
|
||||
from saml2.s_utils import deflate_and_base64_encode, Unsupported
|
||||
from saml2.s_utils import deflate_and_base64_encode
|
||||
from saml2.s_utils import Unsupported
|
||||
import logging
|
||||
import M2Crypto
|
||||
from saml2.sigver import RSA_SHA1, rsa_load, x509_rsa_loads, pem_format
|
||||
from saml2.sigver import RSA_SHA1
|
||||
from saml2.sigver import REQ_ORDER
|
||||
from saml2.sigver import RESP_ORDER
|
||||
from saml2.sigver import RSASigner
|
||||
from saml2.sigver import sha1_digest
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -86,56 +89,6 @@ def http_form_post_message(message, location, relay_state="",
|
||||
|
||||
return {"headers": [("Content-type", "text/html")], "data": response}
|
||||
|
||||
##noinspection PyUnresolvedReferences
|
||||
#def http_post_message(message, location, relay_state="", typ="SAMLRequest"):
|
||||
# """
|
||||
#
|
||||
# :param message:
|
||||
# :param location:
|
||||
# :param relay_state:
|
||||
# :param typ:
|
||||
# :return:
|
||||
# """
|
||||
# return {"headers": [("Content-type", "text/xml")], "data": message}
|
||||
|
||||
|
||||
class BadSignature(Exception):
|
||||
"""The signature is invalid."""
|
||||
pass
|
||||
|
||||
|
||||
def sha1_digest(msg):
|
||||
return hashlib.sha1(msg).digest()
|
||||
|
||||
|
||||
class Signer(object):
|
||||
"""Abstract base class for signing algorithms."""
|
||||
def sign(self, msg, key):
|
||||
"""Sign ``msg`` with ``key`` and return the signature."""
|
||||
raise NotImplementedError
|
||||
|
||||
def verify(self, msg, sig, key):
|
||||
"""Return True if ``sig`` is a valid signature for ``msg``."""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class RSASigner(Signer):
|
||||
def __init__(self, digest, algo):
|
||||
self.digest = digest
|
||||
self.algo = algo
|
||||
|
||||
def sign(self, msg, key):
|
||||
return key.sign(self.digest(msg), self.algo)
|
||||
|
||||
def verify(self, msg, sig, key):
|
||||
try:
|
||||
return key.verify(self.digest(msg), sig, self.algo)
|
||||
except M2Crypto.RSA.RSAError, e:
|
||||
raise BadSignature(e)
|
||||
|
||||
|
||||
REQ_ORDER = ["SAMLRequest", "RelayState", "SigAlg"]
|
||||
RESP_ORDER = ["SAMLResponse", "RelayState", "SigAlg"]
|
||||
|
||||
def http_redirect_message(message, location, relay_state="", typ="SAMLRequest",
|
||||
sigalg=None, key=None):
|
||||
@@ -198,38 +151,6 @@ def http_redirect_message(message, location, relay_state="", typ="SAMLRequest",
|
||||
return {"headers": headers, "data": body}
|
||||
|
||||
|
||||
def verify_redirect_signature(info, cert):
|
||||
"""
|
||||
|
||||
:param info: A dictionary as produced by parse_qs, means all values are
|
||||
lists.
|
||||
:param cert: A certificate to use when verifying the signature
|
||||
:return: True, if signature verified
|
||||
"""
|
||||
|
||||
if info["SigAlg"][0] == RSA_SHA1:
|
||||
if "SAMLRequest" in info:
|
||||
_order = REQ_ORDER
|
||||
elif "SAMLResponse" in info:
|
||||
_order = RESP_ORDER
|
||||
else:
|
||||
raise Unsupported(
|
||||
"Verifying signature on something that should not be signed")
|
||||
signer = RSASigner(sha1_digest, "sha1")
|
||||
args = info.copy()
|
||||
del args["Signature"] # everything but the signature
|
||||
string = "&".join([urllib.urlencode({k: args[k][0]}) for k in _order])
|
||||
_key = x509_rsa_loads(pem_format(cert))
|
||||
_sign = base64.b64decode(info["Signature"][0])
|
||||
try:
|
||||
signer.verify(string, _sign, _key)
|
||||
return True
|
||||
except BadSignature:
|
||||
return False
|
||||
else:
|
||||
raise Unsupported("Signature algorithm: %s" % info["SigAlg"])
|
||||
|
||||
|
||||
DUMMY_NAMESPACE = "http://example.org/"
|
||||
PREFIX = '<?xml version="1.0" encoding="UTF-8"?>'
|
||||
|
||||
|
@@ -21,11 +21,13 @@ Based on the use of xmlsec1 binaries and not the python xmlsec module.
|
||||
|
||||
import base64
|
||||
from binascii import hexlify
|
||||
import hashlib
|
||||
import logging
|
||||
import random
|
||||
import os
|
||||
import sys
|
||||
from time import mktime
|
||||
import urllib
|
||||
import M2Crypto
|
||||
from M2Crypto.X509 import load_cert_string
|
||||
from saml2.samlp import Response
|
||||
@@ -39,8 +41,11 @@ from saml2 import ExtensionElement
|
||||
from saml2 import VERSION
|
||||
|
||||
from saml2.s_utils import sid
|
||||
from saml2.s_utils import Unsupported
|
||||
|
||||
from saml2.time_util import instant, utc_now, str_to_time
|
||||
from saml2.time_util import instant
|
||||
from saml2.time_util import utc_now
|
||||
from saml2.time_util import str_to_time
|
||||
|
||||
from tempfile import NamedTemporaryFile
|
||||
from subprocess import Popen, PIPE
|
||||
@@ -315,7 +320,7 @@ def active_cert(key):
|
||||
return False
|
||||
|
||||
|
||||
def cert_from_key_info(key_info):
|
||||
def cert_from_key_info(key_info, ignore_age=False):
|
||||
""" Get all X509 certs from a KeyInfo instance. Care is taken to make sure
|
||||
that the certs are continues sequences of bytes.
|
||||
|
||||
@@ -333,14 +338,14 @@ def cert_from_key_info(key_info):
|
||||
cert = x509_certificate.text.strip()
|
||||
cert = "\n".join(split_len("".join([s.strip() for s in
|
||||
cert.split()]), 64))
|
||||
if active_cert(cert):
|
||||
if ignore_age or active_cert(cert):
|
||||
res.append(cert)
|
||||
else:
|
||||
logger.info("Inactive cert")
|
||||
return res
|
||||
|
||||
|
||||
def cert_from_key_info_dict(key_info):
|
||||
def cert_from_key_info_dict(key_info, ignore_age=False):
|
||||
""" Get all X509 certs from a KeyInfo dictionary. Care is taken to make sure
|
||||
that the certs are continues sequences of bytes.
|
||||
|
||||
@@ -360,7 +365,7 @@ def cert_from_key_info_dict(key_info):
|
||||
cert = x509_certificate["text"].strip()
|
||||
cert = "\n".join(split_len("".join([s.strip() for s in
|
||||
cert.split()]), 64))
|
||||
if active_cert(cert):
|
||||
if ignore_age or active_cert(cert):
|
||||
res.append(cert)
|
||||
else:
|
||||
logger.info("Inactive cert")
|
||||
@@ -375,7 +380,8 @@ def cert_from_instance(instance):
|
||||
"""
|
||||
if instance.signature:
|
||||
if instance.signature.key_info:
|
||||
return cert_from_key_info(instance.signature.key_info)
|
||||
return cert_from_key_info(instance.signature.key_info,
|
||||
ignore_age=True)
|
||||
return []
|
||||
|
||||
# =============================================================================
|
||||
@@ -472,6 +478,77 @@ def parse_xmlsec_output(output):
|
||||
|
||||
__DEBUG = 0
|
||||
|
||||
|
||||
class BadSignature(Exception):
|
||||
"""The signature is invalid."""
|
||||
pass
|
||||
|
||||
|
||||
def sha1_digest(msg):
|
||||
return hashlib.sha1(msg).digest()
|
||||
|
||||
|
||||
class Signer(object):
|
||||
"""Abstract base class for signing algorithms."""
|
||||
def sign(self, msg, key):
|
||||
"""Sign ``msg`` with ``key`` and return the signature."""
|
||||
raise NotImplementedError
|
||||
|
||||
def verify(self, msg, sig, key):
|
||||
"""Return True if ``sig`` is a valid signature for ``msg``."""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class RSASigner(Signer):
|
||||
def __init__(self, digest, algo):
|
||||
self.digest = digest
|
||||
self.algo = algo
|
||||
|
||||
def sign(self, msg, key):
|
||||
return key.sign(self.digest(msg), self.algo)
|
||||
|
||||
def verify(self, msg, sig, key):
|
||||
try:
|
||||
return key.verify(self.digest(msg), sig, self.algo)
|
||||
except M2Crypto.RSA.RSAError, e:
|
||||
raise BadSignature(e)
|
||||
|
||||
|
||||
REQ_ORDER = ["SAMLRequest", "RelayState", "SigAlg"]
|
||||
RESP_ORDER = ["SAMLResponse", "RelayState", "SigAlg"]
|
||||
|
||||
|
||||
def verify_redirect_signature(info, cert):
|
||||
"""
|
||||
|
||||
:param info: A dictionary as produced by parse_qs, means all values are
|
||||
lists.
|
||||
:param cert: A certificate to use when verifying the signature
|
||||
:return: True, if signature verified
|
||||
"""
|
||||
|
||||
if info["SigAlg"][0] == RSA_SHA1:
|
||||
if "SAMLRequest" in info:
|
||||
_order = REQ_ORDER
|
||||
elif "SAMLResponse" in info:
|
||||
_order = RESP_ORDER
|
||||
else:
|
||||
raise Unsupported(
|
||||
"Verifying signature on something that should not be signed")
|
||||
signer = RSASigner(sha1_digest, "sha1")
|
||||
args = info.copy()
|
||||
del args["Signature"] # everything but the signature
|
||||
string = "&".join([urllib.urlencode({k: args[k][0]}) for k in _order])
|
||||
_key = x509_rsa_loads(pem_format(cert))
|
||||
_sign = base64.b64decode(info["Signature"][0])
|
||||
try:
|
||||
signer.verify(string, _sign, _key)
|
||||
return True
|
||||
except BadSignature:
|
||||
return False
|
||||
else:
|
||||
raise Unsupported("Signature algorithm: %s" % info["SigAlg"])
|
||||
|
||||
LOG_LINE = 60 * "=" + "\n%s\n" + 60 * "-" + "\n%s" + 60 * "="
|
||||
LOG_LINE_2 = 60 * "=" + "\n%s\n%s\n" + 60 * "-" + "\n%s" + 60 * "="
|
||||
|
||||
|
44
tests/test_70_redirect_signing.py
Normal file
44
tests/test_70_redirect_signing.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from saml2.pack import http_redirect_message
|
||||
from saml2.sigver import verify_redirect_signature
|
||||
from saml2.sigver import RSA_SHA1
|
||||
from saml2.server import Server
|
||||
from saml2 import BINDING_HTTP_REDIRECT
|
||||
from saml2.client import Saml2Client
|
||||
from saml2.config import SPConfig
|
||||
from saml2.sigver import rsa_load
|
||||
from urlparse import parse_qs
|
||||
|
||||
__author__ = 'rolandh'
|
||||
|
||||
idp = Server(config_file="idp_all_conf")
|
||||
|
||||
conf = SPConfig()
|
||||
conf.load_file("servera_conf")
|
||||
sp = Saml2Client(conf)
|
||||
|
||||
def test():
|
||||
srvs = sp.metadata.single_sign_on_service(idp.config.entityid,
|
||||
BINDING_HTTP_REDIRECT)
|
||||
|
||||
destination = srvs[0]["location"]
|
||||
req = sp.create_authn_request(destination, id="id1")
|
||||
|
||||
try:
|
||||
key = sp.sec.key
|
||||
except AttributeError:
|
||||
key = rsa_load(sp.sec.key_file)
|
||||
|
||||
info = http_redirect_message(req, destination, relay_state="RS",
|
||||
typ="SAMLRequest", sigalg=RSA_SHA1, key=key)
|
||||
|
||||
verified_ok = False
|
||||
|
||||
for param, val in info["headers"]:
|
||||
if param == "Location":
|
||||
_dict = parse_qs(val.split("?")[1])
|
||||
_certs = idp.metadata.certs(sp.config.entityid, "any", "signing")
|
||||
for cert in _certs:
|
||||
if verify_redirect_signature(_dict, cert):
|
||||
verified_ok = True
|
||||
|
||||
assert verified_ok
|
Reference in New Issue
Block a user