Better support for signing

This commit is contained in:
Roland Hedberg
2010-04-06 12:38:45 +02:00
parent e14ef914b7
commit c409104131
2 changed files with 172 additions and 64 deletions

View File

@@ -300,8 +300,10 @@ class Server(object):
if not status:
status = success_status_factory()
_issuer = self.issuer()
response = response_factory(
issuer=self.issuer(),
issuer=_issuer,
in_response_to = in_response_to,
destination = consumer_url,
status = status,
@@ -318,7 +320,7 @@ class Server(object):
assertion = ast.construct(sp_entity_id, in_response_to, name_id,
self.conf.attribute_converters(),
policy)
policy, issuer=_issuer)
if sign:
assertion["signature"] = pre_signature_part(assertion["id"])
@@ -379,7 +381,7 @@ class Server(object):
# ------------------------------------------------------------------------
def authn_response(self, identity, in_response_to, destination,
sp_entity_id, name_id_policy, userid):
sp_entity_id, name_id_policy, userid, sign=False):
""" Constructs an AuthenticationResponse
:param identity: Information about an user
@@ -407,11 +409,21 @@ class Server(object):
in_response_to, # in_response_to
sp_entity_id, # sp_entity_id
identity, # identity as dictionary
name_id
name_id,
sign=sign
)
except MissingValue, exc:
response = self.error_response(destination, in_response_to,
sp_entity_id, exc, name_id)
return ("%s" % response).split("\n")
if sign:
try:
return self.sc.sign_statement_using_xmlsec(response,
class_name(response))
except Exception, exc:
response = self.error_response(destination, in_response_to,
sp_entity_id, exc, name_id)
return ("%s" % response).split("\n")
else:
return ("%s" % response).split("\n")

View File

@@ -19,13 +19,15 @@
Based on the use of xmlsec1 binaries and not the python xmlsec module.
"""
from saml2 import samlp, class_name, saml
from saml2 import samlp, class_name, saml, make_instance
from saml2 import create_class_from_xml_string
import xmldsig as ds
from tempfile import NamedTemporaryFile
from subprocess import Popen, PIPE
import base64
import random
import os
import copy
def get_xmlsec_binary():
for path in os.environ["PATH"].split(":"):
@@ -45,6 +47,103 @@ _TEST_ = True
class SignatureError(Exception):
pass
# --------------------------------------------------------------------------
def make_signed_instance(klass, spec, seccont, base64encode=False):
instance = make_instance(klass, spec, base64encode)
if "signature" in spec:
signed_xml = seccont.sign_statement_using_xmlsec("%s" % instance,
class_name(instance))
return create_class_from_xml_string(instance.__class__, signed_xml)
else:
return instance
def _make_vals(val, klass, seccont, klass_inst=None, prop=None, part=False,
base64encode=False):
"""
Creates a class instance with a specified value, the specified
class instance may be a value on a property in a defined class instance.
:param val: The value
:param klass: The value class
:param klass_inst: The class instance which has a property on which
what this function returns is a value.
:param prop: The property which the value should be assigned to.
:param part: If the value is one of a possible list of values it should be
handled slightly different compared to if it isn't.
:return: Value class instance
"""
cinst = None
#print "make_vals(%s, %s)" % (val, klass)
if isinstance(val, dict):
cinst = signed_instance_factory(klass, val, seccont,
base64encode=base64encode)
else:
try:
cinst = klass().set_text(val)
except ValueError, excp:
if not part:
cis = [_make_vals(sval, klass, seccont, klass_inst, prop,
True, base64encode) for sval in val]
setattr(klass_inst, prop, cis)
else:
raise
if part:
return cinst
else:
if cinst:
cis = [cinst]
setattr(klass_inst, prop, cis)
def signed_instance_factory(klass, ava, seccont, base64encode=False):
instance = klass()
for prop in instance.c_attributes.values():
#print "# %s" % (prop)
if prop in ava:
if isinstance(ava[prop], bool):
setattr(instance, prop, "%s" % ava[prop])
elif isinstance(ava[prop], int):
setattr(instance, prop, "%d" % ava[prop])
else:
setattr(instance, prop, ava[prop])
if "text" in ava:
instance.set_text(ava["text"], base64encode)
for prop, klassdef in instance.c_children.values():
#print "## %s, %s" % (prop, klassdef)
if prop in ava:
#print "### %s" % ava[prop]
if isinstance(klassdef, list): # means there can be a list of values
_make_vals(ava[prop], klassdef[0], seccont, instance, prop,
base64encode=base64encode)
else:
cis = _make_vals(ava[prop], klassdef, seccont, instance, prop,
True, base64encode)
setattr(instance, prop, cis)
if "extension_elements" in ava:
for item in ava["extension_elements"]:
instance.extension_elements.append(
ExtensionElement(item["tag"]).loadd(item))
if "extension_attributes" in ava:
for key, val in ava["extension_attributes"].items():
instance.extension_attributes[key] = val
if "signature" in ava:
signed_xml = seccont.sign_statement_using_xmlsec("%s" % instance,
class_name(instance))
return create_class_from_xml_string(instance.__class__, signed_xml)
else:
return instance
# --------------------------------------------------------------------------
def create_id():
""" Create a string of 40 random characters from the set [a-p],
can be used as a unique identifier of objects.
@@ -119,6 +218,8 @@ def _parse_xmlsec_output(output):
return False
return False
__DEBUG = 1
def verify_signature(enctext, xmlsec_binary, cert_file=None, cert_type="",
node_name=NODE_NAME, debug=False):
""" Verifies the signature of a XML document.
@@ -129,14 +230,14 @@ def verify_signature(enctext, xmlsec_binary, cert_file=None, cert_type="",
:return: Boolean True if the signature was correct otherwise False.
"""
_, fil = make_temp("%s" % enctext, decode=False)
_, fil = make_temp(enctext, decode=False)
com_list = [xmlsec_binary, "--verify",
"--pubkey-cert-%s" % cert_type, cert_file,
"--id-attr:%s" % ID_ATTR,
node_name, fil]
if debug:
if __DEBUG:
try:
print " ".join(com_list)
except TypeError:
@@ -151,7 +252,7 @@ def verify_signature(enctext, xmlsec_binary, cert_file=None, cert_type="",
output = Popen(com_list, stderr=PIPE).communicate()[1]
verified = _parse_xmlsec_output(output)
if debug:
if __DEBUG:
print output
print os.stat(cert_file)
print "Verify result: '%s'" % (verified,)
@@ -228,7 +329,9 @@ class SecurityContext(object):
""" Verifies the signature of a XML document.
:param enctext: The XML document as a string
:param der_file: The public key that was used to sign the document
:param cert_file: The public key that was used to sign the document
:param cert_type: The file type of the certificate
:param node_name: The name of the class that is signed
:return: Boolean True if the signature was correct otherwise False.
"""
if not cert_file:
@@ -236,7 +339,40 @@ class SecurityContext(object):
cert_type = self.cert_type
return verify_signature(enctext, self.xmlsec, cert_file, cert_type,
node_name, True)
node_name, self.debug)
def _check_signature(self, decoded_xml, item, node_name=NODE_NAME):
#print item
try:
issuer = item.issuer.text.strip()
except AttributeError:
issuer = None
if self.metadata:
certs = self.metadata.certs(issuer)
else:
certs = []
if not certs:
print "==== Certs from instance ===="
certs = [make_temp("%s" % cert, ".der") \
for cert in cert_from_instance(item)]
else:
print "==== Certs from metadata ==== %s: %s ====" % (issuer,certs)
if not certs:
raise SignatureError("Missing signing certificate")
verified = False
for _, der_file in certs:
if self.verify_signature(decoded_xml, der_file, "der", node_name):
verified = True
break
if not verified:
raise SignatureError("Failed to verify signature")
return item
def correctly_signed_authn_request(self, decoded_xml, must=False):
""" Check if a request is correctly signed, if we have metadata for
@@ -256,29 +392,7 @@ class SecurityContext(object):
else:
return request
issuer = request.issuer.text.strip()
if self.metadata:
certs = self.metadata.certs(issuer)
else:
certs = []
if not certs:
certs = [make_temp("%s" % cert, ".der") \
for cert in cert_from_instance(request)]
if not certs:
raise SignatureError("Missing signing certificate")
verified = False
for _, der_file in certs:
if verify_signature(self.xmlsec, decoded_xml, der_file):
verified = True
break
if not verified:
raise SignatureError("Failed to verify signature")
return request
return self._check_signature( decoded_xml, request )
def correctly_signed_response(self, decoded_xml, must=False):
""" Check if a instance is correctly signed, if we have metadata for
@@ -304,33 +418,7 @@ class SecurityContext(object):
if self.debug:
self.log.debug("signed")
issuer = assertion.issuer.text.strip()
if self.debug:
self.log.debug("issuer: %s" % issuer)
if self.metadata:
certs = self.metadata.certs(issuer)
else:
certs = []
if self.debug:
self.log.debug("metadata certs: %s" % certs)
if not certs:
certs = [make_temp("%s" % cert, ".der") \
for cert in cert_from_instance(assertion)]
if not certs:
raise SignatureError("Missing certificate")
verified = False
for _, der_file in certs:
if self.verify_signature(decoded_xml, der_file, "der"):
verified = True
break
if not verified:
raise SignatureError("Could not verify")
self._check_signature( decoded_xml, assertion )
return response
@@ -395,7 +483,7 @@ PRE_SIGNATURE = {
},
"reference": {
# must be replace by a uriref based on the assertion ID
"uri": "#%s",
"uri": None,
"transforms": {
"transform": [{
"algorithm": ds.TRANSFORM_ENVELOPED,
@@ -417,7 +505,7 @@ PRE_SIGNATURE = {
"signature_value": None,
}
def pre_signature_part(ident):
def pre_signature_part(ident, public_key=None):
"""
If an assertion is to be signed the signature part has to be preset
with which algorithms to be used, this function returns such a
@@ -425,9 +513,17 @@ def pre_signature_part(ident):
:param ident: The identifier of the assertion, so you know which assertion
was signed
:param public_key: The base64 part of a PEM file
:return: A preset signature part
"""
presig = PRE_SIGNATURE
presig = copy.deepcopy(PRE_SIGNATURE)
presig["signed_info"]["reference"]["uri"] = "#%s" % ident
if public_key:
presig["key_info"] = {
"x509_data": {
"x509_certificate": public_key,
}
}
return presig