Added license info, refactored and added documentation
This commit is contained in:
@@ -1,4 +1,23 @@
|
||||
#!/iusr/bin/env python
|
||||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (C) 2009 Umeå University
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Contains classes and functions to alleviate the handling of SAML metadata
|
||||
"""
|
||||
|
||||
import base64
|
||||
import time
|
||||
@@ -6,45 +25,7 @@ from tempfile import NamedTemporaryFile
|
||||
from saml2 import md
|
||||
from saml2 import samlp, BINDING_HTTP_REDIRECT, BINDING_SOAP
|
||||
from saml2.time_util import str_to_time
|
||||
|
||||
def make_temp(string, suffix="", decode=True):
|
||||
""" xmlsec needs files in some cases and I have string hence the
|
||||
need for this function, that creates as temporary file with the
|
||||
string as only content.
|
||||
|
||||
:param string: The information to be placed in the file
|
||||
:param suffix: The temporary file might have to have a specific
|
||||
suffix in certain circumstances.
|
||||
:param decode: The input string might be base64 coded. If so it
|
||||
must be decoded before placed in the file.
|
||||
:return: 2-tuple with file pointer ( so the calling function can
|
||||
close the file) and filename (which is then needed by the
|
||||
xmlsec function).
|
||||
"""
|
||||
ntf = NamedTemporaryFile(suffix=suffix)
|
||||
if decode:
|
||||
ntf.write(base64.b64decode(string))
|
||||
else:
|
||||
ntf.write(string)
|
||||
ntf.seek(0)
|
||||
return ntf, ntf.name
|
||||
|
||||
|
||||
def cert_from_key_info(key_info):
|
||||
""" Get all X509 certs from a KeyInfo instance. Care is taken to make sure
|
||||
that the certs are continues sequences of bytes.
|
||||
|
||||
:param key_info: The KeyInfo instance
|
||||
:return: A possibly empty list of certs
|
||||
"""
|
||||
keys = []
|
||||
for x509_data in key_info.x509_data:
|
||||
#print "X509Data",x509_data
|
||||
for x509_certificate in x509_data.x509_certificate:
|
||||
cert = x509_certificate.text.strip()
|
||||
cert = "".join([s.strip() for s in cert.split("\n")])
|
||||
keys.append(cert)
|
||||
return keys
|
||||
from saml2.sigver import make_temp, cert_from_key_info
|
||||
|
||||
class MetaData(object):
|
||||
""" A class to manage metadata information """
|
||||
@@ -59,7 +40,13 @@ class MetaData(object):
|
||||
self.cache_until = None
|
||||
|
||||
def _vo_metadata(self, entity_descriptor):
|
||||
""" gather the members of VOs """
|
||||
"""
|
||||
Pick out the Affiliation descriptors from an entity
|
||||
descriptor and store the information in a way which is easily
|
||||
accessible.
|
||||
|
||||
:param entity_descriptor: A EntityDescriptor instance
|
||||
"""
|
||||
try:
|
||||
afd = entity_descriptor.affiliation_descriptor
|
||||
except AttributeError:
|
||||
@@ -74,7 +61,13 @@ class MetaData(object):
|
||||
self.vo[entity_descriptor.entity_id] = members
|
||||
|
||||
def _idp_metadata(self, entity_descriptor):
|
||||
|
||||
"""
|
||||
Pick out the IdP SSO descriptors from an entity
|
||||
descriptor and store the information in a way which is easily
|
||||
accessible.
|
||||
|
||||
:param entity_descriptor: A EntityDescriptor instance
|
||||
"""
|
||||
try:
|
||||
isd = entity_descriptor.idp_sso_descriptor
|
||||
except AttributeError:
|
||||
@@ -100,7 +93,13 @@ class MetaData(object):
|
||||
self.idp[entity_descriptor.entity_id] = idps
|
||||
|
||||
def _aad_metadata(self,entity_descriptor):
|
||||
#print entity_descriptor.__dict__.keys()
|
||||
"""
|
||||
Pick out the attribute authority descriptors from an entity
|
||||
descriptor and store the information in a way which is easily
|
||||
accessible.
|
||||
|
||||
:param entity_descriptor: A EntityDescriptor instance
|
||||
"""
|
||||
try:
|
||||
attr_auth_descr = entity_descriptor.attribute_authority_descriptor
|
||||
except AttributeError:
|
||||
@@ -218,22 +217,21 @@ class MetaData(object):
|
||||
return self.vo[entity_id]
|
||||
except KeyError:
|
||||
return []
|
||||
|
||||
def cert_from_assertion(assertion):
|
||||
""" Find certificates that are part of an assertion
|
||||
|
||||
:param assertion: A saml.Assertion instance
|
||||
:return: possible empty list of certificates
|
||||
|
||||
def make_vals(val, klass, klass_inst=None, prop=None, part=False):
|
||||
"""
|
||||
if assertion.signature:
|
||||
if assertion.signature.key_info:
|
||||
return cert_from_key_info(assertion.signature.key_info)
|
||||
return []
|
||||
Creates a class instance with a specified value, the specified
|
||||
class instance are a value on a property in a defined class instance.
|
||||
|
||||
def make_contact_person(spec):
|
||||
return make_xyz(md.ContactPerson, spec)
|
||||
|
||||
def _make_vals(xyz_inst, val, prop, klass, part=False):
|
||||
:param klass_inst: The class instance which has a property on which
|
||||
what this function returns is a value.
|
||||
:param val: The value
|
||||
:param prop: The property which the value should be assigned to.
|
||||
:param klass: The value class
|
||||
:param part: If the value is one of a possible list of values it should be
|
||||
handled slightly different compared to if it isn't.
|
||||
:return: Value class instance
|
||||
"""
|
||||
ci = None
|
||||
#print "_make_val: %s %s (%s)" % (prop,val,klass)
|
||||
if isinstance(val, bool):
|
||||
@@ -242,11 +240,13 @@ def _make_vals(xyz_inst, val, prop, klass, part=False):
|
||||
ci = klass(text="%d" % val)
|
||||
elif isinstance(val, basestring):
|
||||
ci = klass(text=val)
|
||||
elif val == None:
|
||||
ci = klass()
|
||||
elif isinstance(val, dict):
|
||||
ci = make_xyz(klass, val)
|
||||
ci = make_instance(klass, val)
|
||||
elif not part:
|
||||
cis = [_make_vals(xyz_inst, sval, prop, klass, True) for sval in val]
|
||||
setattr(xyz_inst, prop, cis)
|
||||
cis = [make_vals(sval, klass, klass_inst, prop, True) for sval in val]
|
||||
setattr(klass_inst, prop, cis)
|
||||
else:
|
||||
raise ValueError()
|
||||
|
||||
@@ -255,105 +255,33 @@ def _make_vals(xyz_inst, val, prop, klass, part=False):
|
||||
else:
|
||||
if ci:
|
||||
cis = [ci]
|
||||
setattr(xyz_inst, prop, cis)
|
||||
setattr(klass_inst, prop, cis)
|
||||
|
||||
def make_xyz(xyz, spec):
|
||||
#print "Make", xyz
|
||||
#print "Spec", spec
|
||||
xyz_inst = xyz()
|
||||
for prop in xyz.c_attributes.values():
|
||||
def make_instance(klass, spec):
|
||||
"""
|
||||
Constructs a class instance containing the specified information
|
||||
|
||||
:param klass: The class
|
||||
:param spec: Information to be placed in the instance
|
||||
:return: The instance
|
||||
"""
|
||||
klass_inst = klass()
|
||||
for prop in klass.c_attributes.values():
|
||||
if prop in spec:
|
||||
if isinstance(spec[prop],bool):
|
||||
setattr(xyz_inst,prop,"%s" % spec[prop])
|
||||
setattr(klass_inst,prop,"%s" % spec[prop])
|
||||
elif isinstance(spec[prop], int):
|
||||
setattr(xyz_inst,prop,"%d" % spec[prop])
|
||||
setattr(klass_inst,prop,"%d" % spec[prop])
|
||||
else:
|
||||
setattr(xyz_inst,prop,spec[prop])
|
||||
setattr(klass_inst,prop,spec[prop])
|
||||
if "text" in spec:
|
||||
setattr(xyz_inst,"text",spec["text"])
|
||||
setattr(klass_inst,"text",spec["text"])
|
||||
|
||||
for prop, klass in xyz.c_children.values():
|
||||
#print "Prop",prop
|
||||
#print "Class",klass
|
||||
for prop, klass in klass.c_children.values():
|
||||
if prop in spec:
|
||||
if isinstance(klass, list): # means there can be a list of values
|
||||
#print "Possible list of values"
|
||||
_make_vals(xyz_inst, spec[prop], prop, klass[0])
|
||||
make_vals(spec[prop], klass[0], klass_inst, prop)
|
||||
else:
|
||||
#print "Single value"
|
||||
ci = _make_vals(xyz_inst, spec[prop], prop, klass, True)
|
||||
setattr(xyz_inst, prop, ci)
|
||||
return xyz_inst
|
||||
|
||||
def make_spsso_descriptor(spec):
|
||||
spsso = md.SPSSODescriptor(
|
||||
protocolSupportEnumeration = samlp.NAMESPACE,
|
||||
want_assertions_signed = True,
|
||||
authn_requests_signed = False
|
||||
)
|
||||
|
||||
if "key" in spec:
|
||||
arr=[]
|
||||
for key in spec["key"]:
|
||||
x509_certificate = ds.X509Certificate()
|
||||
if key["filetype"] == "pem":
|
||||
x509_certificate.text = "".join(
|
||||
open(key["file"]).readlines()[1:-1])
|
||||
elif key["filetype"] == "der":
|
||||
x509_certificate.text = open(key["file"]).read()
|
||||
x509_data = ds.X509Data(
|
||||
x509_certificate=x509_certificate)
|
||||
key_info = ds.KeyInfo(x509_data=x509_data)
|
||||
if "key_name" in key:
|
||||
key_info.key_name = ds.KeyName(key_name=key["key_name"])
|
||||
key_desc = md.KeyDescriptor(key_info=key_info)
|
||||
if "use" in key:
|
||||
key_desc.use = key["use"]
|
||||
arr.append(key_desc)
|
||||
spsso.key_descriptor = arr
|
||||
|
||||
if "name_id_format" in spec:
|
||||
arr = []
|
||||
for nif in spec["name_id_format"]:
|
||||
format = md.NameIDFormat()
|
||||
format.text = nif
|
||||
arr.append(format)
|
||||
spsso.name_id_format = arr
|
||||
|
||||
if "assertion_consumer_service" in spec:
|
||||
arr = []
|
||||
for acs in spec["assertion_consumer_service"]:
|
||||
service = md.AssertionConsumerService()
|
||||
service.binding = acs["binding"]
|
||||
service.location = acs["location"]
|
||||
service.index = acs["index"]
|
||||
arr.append(service)
|
||||
spsso.assertion_consumer_service = arr
|
||||
|
||||
if "contact":
|
||||
pass
|
||||
|
||||
return spsso
|
||||
|
||||
def make_entity_description(spec):
|
||||
"""
|
||||
:param spec: dictionary with necessary information
|
||||
:return md.EntityDescriptor instans
|
||||
"""
|
||||
|
||||
ed = md.EntityDescriptor(
|
||||
entity_id = spec["entity_id"],
|
||||
)
|
||||
|
||||
if "organisation" in spec:
|
||||
ed.organization = md.Organization(
|
||||
organization_name = [md.Organization(
|
||||
text=spec["organisation"]["name"])],
|
||||
organization_url = [md.OrganizationURL(
|
||||
text=spec["organisation"]["url"])])
|
||||
|
||||
if "spsso" in spec:
|
||||
ed.sp_sso_descriptor = [
|
||||
make_spsso_descriptor(sp) for sp in spec["spsso"]]
|
||||
|
||||
return ed
|
||||
ci = make_vals(spec[prop], klass, klass_inst, prop, True)
|
||||
setattr(klass_inst, prop, ci)
|
||||
return klass_inst
|
||||
|
||||
Reference in New Issue
Block a user