Merge pull request #20 from fredrikt/master

test multiple_signatures
This commit is contained in:
Roland Hedberg
2013-05-03 01:53:40 -07:00
10 changed files with 170 additions and 79 deletions

View File

@@ -323,11 +323,8 @@ class Config(object):
def load_metadata(self, metadata_conf):
""" Loads metadata into an internal structure """
xmlsec_binary = self.xmlsec_binary
acs = self.attribute_converters
if xmlsec_binary is None:
raise Exception("Missing xmlsec1 specification")
if acs is None:
raise Exception("Missing attribute converter specification")
@@ -341,7 +338,7 @@ class Config(object):
disable_validation = False
mds = MetadataStore(
ONTS.values(), acs, xmlsec_binary, ca_certs,
ONTS.values(), acs, self, ca_certs,
disable_ssl_certificate_validation=disable_validation)
mds.imp(metadata_conf)

View File

@@ -79,7 +79,7 @@ class Client(Entity):
self._verbose = verbose
if metadata_file:
self._metadata = MetadataStore([saml, samlp], None, xmlsec_binary)
self._metadata = MetadataStore([saml, samlp], None, config)
self._metadata.load("local", metadata_file)
logger.debug("Loaded metadata from '%s'" % metadata_file)
else:

View File

@@ -303,8 +303,9 @@ class HTTPBase(object):
logger.debug("SOAP message: %s" % soap_message)
if sign and self.sec:
_signed = self.sec.sign_statement_using_xmlsec(
soap_message, class_name=class_name(request), node_id=request.id)
_signed = self.sec.sign_statement(soap_message,
class_name=class_name(request),
nodeid=request.id)
soap_message = _signed
return {"url": destination, "method": "POST",

View File

@@ -14,10 +14,11 @@ from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_SOAP
from saml2.s_utils import UnsupportedBinding, UnknownPrincipal
from saml2.sigver import verify_signature, split_len
from saml2.sigver import split_len
from saml2.validate import valid_instance
from saml2.time_util import valid
from saml2.validate import NotValid
from saml2.sigver import security_context
__author__ = 'rolandh'
@@ -340,11 +341,19 @@ class MetaDataExtern(MetaData):
Accessible but HTTP GET.
"""
def __init__(self, onts, attrc, url, xmlsec_binary, cert, http):
def __init__(self, onts, attrc, url, security, cert, http):
"""
:params onts:
:params attrc:
:params url:
:params security: SecurityContext()
:params cert:
:params http:
"""
MetaData.__init__(self, onts, attrc)
self.url = url
self.security = security
self.cert = cert
self.xmlsec_binary = xmlsec_binary
self.http = http
def load(self):
@@ -354,10 +363,12 @@ class MetaDataExtern(MetaData):
"""
response = self.http.send(self.url)
if response.status == 200:
if verify_signature(
response.text, self.xmlsec_binary, self.cert,
node_name="%s:%s" % (md.EntitiesDescriptor.c_namespace,
md.EntitiesDescriptor.c_tag)):
node_name="%s:%s" % (md.EntitiesDescriptor.c_namespace,
md.EntitiesDescriptor.c_tag)
if self.security.verify_signature(response.text,
node_name=node_name,
cert_file=self.cert,
):
self.parse(response.text)
return True
else:
@@ -379,13 +390,20 @@ class MetaDataMD(MetaData):
class MetadataStore(object):
def __init__(self, onts, attrc, xmlsec_binary=None, ca_certs=None,
def __init__(self, onts, attrc, config, ca_certs=None,
disable_ssl_certificate_validation=False):
"""
:params onts:
:params attrc:
:params config: Config()
:params ca_certs:
:params disable_ssl_certificate_validation:
"""
self.onts = onts
self.attrc = attrc
self.http = HTTPBase(verify=disable_ssl_certificate_validation,
ca_bundle=ca_certs)
self.xmlsec_binary = xmlsec_binary
self.security = security_context(config)
self.ii = 0
self.metadata = {}
@@ -400,7 +418,7 @@ class MetadataStore(object):
elif typ == "remote":
key = kwargs["url"]
md = MetaDataExtern(self.onts, self.attrc,
kwargs["url"], self.xmlsec_binary,
kwargs["url"], self.security,
kwargs["cert"], self.http)
elif typ == "mdfile":
key = args[0]

View File

@@ -596,8 +596,7 @@ def entities_descriptor(eds, valid_for, name, ident, sign, secc):
entities.signature = pre_signature_part(ident, secc.my_cert, 1)
entities.id = ident
xmldoc = secc.sign_statement_using_xmlsec("%s" % entities,
class_name(entities))
xmldoc = secc.sign_statement("%s" % entities, class_name(entities))
entities = md.entities_descriptor_from_string(xmldoc)
return entities
@@ -608,5 +607,5 @@ def sign_entity_descriptor(edesc, ident, secc):
edesc.signature = pre_signature_part(ident, secc.my_cert, 1)
edesc.id = ident
xmldoc = secc.sign_statement_using_xmlsec("%s" % edesc, class_name(edesc))
xmldoc = secc.sign_statement("%s" % edesc, class_name(edesc))
return md.entity_descriptor_from_string(xmldoc)

View File

@@ -251,7 +251,7 @@ def signed_instance_factory(instance, seccont, elements_to_sign=None):
if elements_to_sign:
signed_xml = "%s" % instance
for (node_name, nodeid) in elements_to_sign:
signed_xml = seccont.sign_statement_using_xmlsec(
signed_xml = seccont.sign_statement(
signed_xml, class_name=node_name, node_id=nodeid)
#print "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
@@ -557,35 +557,6 @@ LOG_LINE = 60 * "=" + "\n%s\n" + 60 * "-" + "\n%s" + 60 * "="
LOG_LINE_2 = 60 * "=" + "\n%s\n%s\n" + 60 * "-" + "\n%s" + 60 * "="
def verify_signature(enctext, xmlsec_binary, cert_file=None, cert_type="pem",
node_name=NODE_NAME, debug=False, node_id=None,
id_attr=""):
""" Verifies the signature of a XML document.
:param enctext: The signed XML document
:param xmlsec_binary: The xmlsec1 binaries to be used (or CryptoBackend())
:param cert_file: The public key used to decrypt the signature
:param cert_type: The cert format
:param node_name: The SAML class of the root node in the signed document
:param debug: To debug or not
:param node_id: The identifier of the root node if any
:return: The signed document if all was OK otherwise will raise an
exception.
"""
if not id_attr:
id_attr = ID_ATTR
crypto = xmlsec_binary
if not isinstance(crypto, CryptoBackend):
# backwards compatibility
crypto = CryptoBackendXmlSec1(xmlsec_binary, debug=debug)
return crypto.validate_signature(enctext, cert_file=cert_file,
cert_type=cert_type, node_name=node_name,
node_id=node_id, id_attr=id_attr,
)
# ---------------------------------------------------------------------------
@@ -636,7 +607,7 @@ class CryptoBackend():
def decrypt(self, enctext, key_file):
raise NotImplementedError()
def sign_statement(self, statement, class_name, key, key_file, nodeid,
def sign_statement(self, statement, class_name, key_file, nodeid,
id_attr):
raise NotImplementedError()
@@ -646,6 +617,10 @@ class CryptoBackend():
class CryptoBackendXmlSec1(CryptoBackend):
"""
CryptoBackend implementation using external binary xmlsec1 to sign
and verify XML documents.
"""
__DEBUG = 0
@@ -680,8 +655,19 @@ class CryptoBackendXmlSec1(CryptoBackend):
validate_output=False)
return output
def sign_statement(self, statement, class_name, key, key_file, node_id,
def sign_statement(self, statement, class_name, key_file, node_id,
id_attr):
"""
Sign an XML statement.
:param statement: The statement to be signed
:param class_name: string like 'urn:oasis:names:...:Assertion'
:param key_file: The file where the key can be found
:param node_id:
:param id_attr: The attribute name for the identifier, normally one of
'id','Id' or 'ID'
:return: The signed statement
"""
_, fil = make_temp("%s" % statement, decode=False)
@@ -707,9 +693,20 @@ class CryptoBackendXmlSec1(CryptoBackend):
except DecryptError:
raise Exception("Signing failed")
def validate_signature(self, enctext, cert_file, cert_type, node_name,
def validate_signature(self, signedtext, cert_file, cert_type, node_name,
node_id, id_attr):
_, fil = make_temp(enctext, decode=False)
"""
Validate signature on XML document.
:param signedtext: The XML document as a string
:param cert_file: The public key that was used to sign the document
:param cert_type: The file type of the certificate
:param node_name: The name of the class that is signed
:param node_id: The identifier of the node
:param id_attr: Should normally be one of "id", "Id" or "ID"
:return: Boolean True if the signature was correct otherwise False.
"""
_, fil = make_temp(signedtext, decode=False)
com_list = [self.xmlsec, "--verify",
"--pubkey-cert-%s" % cert_type, cert_file,
@@ -768,6 +765,64 @@ class CryptoBackendXmlSec1(CryptoBackend):
ntf.seek(0)
return p_out, p_err, ntf.read()
class CryptoBackendXMLSecurity(CryptoBackend):
"""
CryptoBackend implementation using pyXMLSecurity to sign and verify
XML documents.
Encrypt and decrypt is currently unsupported by pyXMLSecurity.
pyXMLSecurity uses lxml (libxml2) to parse XML data, but otherwise
try to get by with native Python code. It does native Python RSA
signatures, or alternatively PyKCS11 to offload cryptographic work
to an external PKCS#11 module.
"""
def __init__(self, debug=False):
CryptoBackend.__init__(self)
self.debug = debug
def sign_statement(self, statement, _class_name, key_file, _nodeid,
_id_attr):
"""
Sign an XML statement.
The parameters actually used in this CryptoBackend
implementation are :
:param statement: XML as string
:param key_file: xmlsec key_spec string(), filename,
"pkcs11://" URI or PEM data
:returns: Signed XML as string
"""
import xmlsec
import lxml.etree
xml = xmlsec.parse_xml(statement)
signed = xmlsec.sign(xml, key_file)
return lxml.etree.tostring(signed, xml_declaration=True)
def validate_signature(self, signedtext, cert_file, cert_type, _node_name,
_node_id, _id_attr):
"""
Validate signature on XML document.
The parameters actually used in this CryptoBackend
implementation are :
:param signedtext: The signed XML data as string
:param cert_file: xmlsec key_spec string(), filename,
"pkcs11://" URI or PEM data
:param cert_type: string, must be 'pem' for now
:returns: True on successful validation, False otherwise
"""
if cert_type != "pem":
raise Unsupported("Only PEM certs supported here")
import xmlsec
xml = xmlsec.parse_xml(signedtext)
try:
return xmlsec.verify(xml, cert_file)
except xmlsec.XMLSigException:
return False
def security_context(conf, debug=None):
""" Creates a security context based on the configuration
@@ -788,6 +843,9 @@ def security_context(conf, debug=None):
_only_md = False
crypto = get_xmlsec_cryptobackend(conf.xmlsec_binary, debug=debug)
# Uncomment this to enable the new and somewhat untested pyXMLSecurity
# crypto backend.
#crypto = CryptoBackendXMLSecurity(debug=debug)
return SecurityContext(crypto, conf.key_file,
cert_file=conf.cert_file, metadata=metadata,
@@ -822,7 +880,7 @@ class SecurityContext(object):
else:
self.template = template
self.key_type = encrypt_key_type
self.encrypt_key_type = encrypt_key_type
def correctly_signed(self, xml, must=False):
logger.info("verify correct signature")
@@ -841,7 +899,7 @@ class SecurityContext(object):
:result: An encrypted XML text
"""
if not key_type:
key_type = self.key_type
key_type = self.encrypt_key_type
if not template:
template = self.template
@@ -855,11 +913,11 @@ class SecurityContext(object):
"""
return self.crypto.decrypt(enctext, self.key_file)
def verify_signature(self, enctext, cert_file=None, cert_type="pem",
def verify_signature(self, signedtext, cert_file=None, cert_type="pem",
node_name=NODE_NAME, node_id=None, id_attr=""):
""" Verifies the signature of a XML document.
:param enctext: The XML document as a string
:param signedtext: The XML document as a string
:param cert_file: The public key that was used to sign the document
:param cert_type: The file type of the certificate
:param node_name: The name of the class that is signed
@@ -873,8 +931,14 @@ class SecurityContext(object):
cert_file = self.cert_file
cert_type = self.cert_type
return verify_signature(enctext, self.crypto, cert_file, cert_type,
node_name, self.debug, node_id, id_attr)
if not id_attr:
id_attr = ID_ATTR
return self.crypto.validate_signature(signedtext, cert_file=cert_file,
cert_type=cert_type,
node_name=node_name,
node_id=node_id, id_attr=id_attr,
)
def _check_signature(self, decoded_xml, item, node_name=NODE_NAME,
origdoc=None, id_attr="", must=False):
@@ -950,9 +1014,9 @@ class SecurityContext(object):
the entity that sent the info use that, if not use the key that are in
the message if any.
:param decoded_xml:
:param decoded_xml: The SAML message as a XML string
:param msgtype:
:param must:
:param must: Whether there must be a signature
:param origdoc:
:return:
"""
@@ -1066,6 +1130,7 @@ class SecurityContext(object):
:param decoded_xml: The SAML message as a XML string
:param must: Whether there must be a signature
:param origdoc:
:return: None if the signature can not be verified otherwise an instance
"""
@@ -1109,8 +1174,10 @@ class SecurityContext(object):
"""Sign a SAML statement.
:param statement: The statement to be signed
:param class_name: string like 'urn:oasis:names:...:Assertion'
:param key: The key to be used for the signing, either this or
:param key_file: The file where the key can be found
:param node_id:
:param id_attr: The attribute name for the identifier, normally one of
'id','Id' or 'ID'
:return: The signed statement
@@ -1124,7 +1191,7 @@ class SecurityContext(object):
if not key and not key_file:
key_file = self.key_file
return self.crypto.sign_statement(statement, class_name, key, key_file,
return self.crypto.sign_statement(statement, class_name, key_file,
node_id, id_attr)
def sign_assertion_using_xmlsec(self, statement, **kwargs):
@@ -1197,6 +1264,7 @@ def pre_signature_part(ident, public_key=None, identifier=None):
:param ident: The identifier of the assertion, so you know which assertion
was signed
:param public_key: The base64 part of a PEM file
:param identifier:
:return: A preset signature part
"""

View File

@@ -12,6 +12,8 @@ from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_ARTIFACT
from saml2 import saml
from saml2 import sigver
from saml2 import config
from saml2.attribute_converter import ac_factory
from saml2.attribute_converter import d_to_local_name
@@ -24,8 +26,10 @@ from saml2.s_utils import UnknownPrincipal
import xmldsig
import xmlenc
from pathutils import full_path, xmlsec_path
from pathutils import full_path
sec_config = config.Config()
#sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
ONTS = {
saml.NAMESPACE: saml,
@@ -83,7 +87,7 @@ def _fix_valid_until(xmlstring):
def test_swami_1():
UMU_IDP = 'https://idp.umu.se/saml2/idp/metadata.php'
mds = MetadataStore(ONTS.values(), ATTRCONV, xmlsec_path,
mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["1"])
@@ -116,7 +120,7 @@ def test_swami_1():
def test_incommon_1():
mds = MetadataStore(ONTS.values(), ATTRCONV, xmlsec_path,
mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["2"])
@@ -154,7 +158,7 @@ def test_incommon_1():
def test_ext_2():
mds = MetadataStore(ONTS.values(), ATTRCONV, xmlsec_path,
mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["3"])
@@ -167,7 +171,7 @@ def test_ext_2():
def test_example():
mds = MetadataStore(ONTS.values(), ATTRCONV, xmlsec_path,
mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["4"])
@@ -183,7 +187,7 @@ def test_example():
def test_switch_1():
mds = MetadataStore(ONTS.values(), ATTRCONV, xmlsec_path,
mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["5"])
@@ -211,7 +215,7 @@ def test_switch_1():
def test_sp_metadata():
mds = MetadataStore(ONTS.values(), ATTRCONV, xmlsec_path,
mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["6"])

View File

@@ -28,9 +28,8 @@ sp1 = {
}
}
},
"key_file" : full_path("mykey.pem"),
"cert_file" : full_path("mycert.pem"),
#"xmlsec_binary" : "/opt/local/bin/xmlsec1",
"key_file" : full_path("test.key"),
"cert_file" : full_path("test.pem"),
"metadata": {
"local": [full_path("metadata.xml"),
full_path("urn-mace-swami.se-swamid-test-1.0-metadata.xml")],

View File

@@ -9,6 +9,7 @@ from saml2 import sigver
from saml2 import class_name
from saml2 import time_util
from saml2 import saml, samlp
from saml2 import config
from saml2.s_utils import factory, do_attribute_statement
from saml2.sigver import xmlsec_version, get_xmlsec_cryptobackend, get_xmlsec_binary
@@ -360,8 +361,9 @@ class TestSecurity():
class TestSecurityMetadata():
def setup_class(self):
xmlexec = get_xmlsec_binary()
md = MetadataStore([saml, samlp], None, xmlexec)
conf = config.SPConfig()
conf.load_file("server_conf")
md = MetadataStore([saml, samlp], None, conf)
md.load("local", full_path("metadata_cert.xml"))
crypto = get_xmlsec_cryptobackend()

View File

@@ -8,6 +8,7 @@ from saml2.attribute_converter import d_to_local_name, ac_factory
from saml2 import saml
from saml2 import md
from saml2 import config
from saml2.extension import mdui
from saml2.extension import idpdisc
@@ -17,7 +18,7 @@ from saml2.extension import ui
import xmldsig
import xmlenc
from pathutils import full_path, xmlsec_path
from pathutils import full_path
ONTS = {
saml.NAMESPACE: saml,
@@ -39,8 +40,10 @@ def _eq(l1, l2):
def test_metadata():
conf = config.Config()
conf.load_file("idp_conf_mdb")
UMU_IDP = 'https://idp.umu.se/saml2/idp/metadata.php'
mds = MetadataStore(ONTS.values(), ATTRCONV, xmlsec_path,
mds = MetadataStore(ONTS.values(), ATTRCONV, conf,
disable_ssl_certificate_validation=True)
mds.imp({"local": [full_path("swamid-1.0.xml")]})