Moved functions from make_metadata to metadata to make them usable in other contexts

This commit is contained in:
Roland Hedberg
2010-08-08 15:04:16 +02:00
parent f835337fb7
commit 027c08fdcd
3 changed files with 539 additions and 346 deletions

View File

@@ -22,14 +22,19 @@ Contains classes and functions to alleviate the handling of SAML metadata
import httplib2
import sys
from decorator import decorator
import xmldsig as ds
from saml2 import md, BINDING_HTTP_POST
from saml2 import samlp, BINDING_HTTP_REDIRECT, BINDING_SOAP
#from saml2.time_util import str_to_time
from saml2 import md, samlp, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT
from saml2 import BINDING_SOAP, class_name
from saml2.s_utils import factory
from saml2.saml import NAME_FORMAT_URI
from saml2.time_util import in_a_while
from saml2.time_util import valid
from saml2.attribute_converter import from_local_name, ac_factory
from saml2.attribute_converter import ava_fro, AttributeConverter
from saml2.sigver import pre_signature_part
from saml2.sigver import make_temp, cert_from_key_info, verify_signature
from saml2.sigver import pem_format
from saml2.time_util import valid
from saml2.attribute_converter import ava_fro
@decorator
def keep_updated(func, self, entity_id, *args, **kwargs):
@@ -45,7 +50,7 @@ def keep_updated(func, self, entity_id, *args, **kwargs):
class MetaData(object):
""" A class to manage metadata information """
def __init__(self, xmlsec_binary=None, attrconv=None, log=None, extras=None):
def __init__(self, xmlsec_binary=None, attrconv=None, log=None):
self.log = log
self.xmlsec_binary = xmlsec_binary
self.attrconv = attrconv or []
@@ -58,7 +63,7 @@ class MetaData(object):
self._import = {}
self._wants = {}
def _vo_metadata(self, entity_descriptor, entity, tag):
def _vo_metadata(self, entity_descr, entity, tag):
"""
Pick out the Affiliation descriptors from an entity
descriptor and store the information in a way which is easily
@@ -67,7 +72,7 @@ class MetaData(object):
:param entity_descriptor: A EntityDescriptor instance
"""
afd = entity_descriptor.affiliation_descriptor
afd = entity_descr.affiliation_descriptor
if afd:
members = [member.text.strip() for member in afd.affiliate_member]
@@ -75,16 +80,16 @@ class MetaData(object):
if members != []:
entity[tag] = members
def _sp_metadata(self, entity_descriptor, entity, tag):
def _sp_metadata(self, entity_descr, entity, tag):
"""
Pick out the SP SSO descriptors from an entity
descriptor and store the information in a way which is easily
accessible.
:param entity_descriptor: A EntityDescriptor instance
:param entity_descr: A EntityDescriptor instance
"""
try:
ssd = entity_descriptor.spsso_descriptor
ssd = entity_descr.spsso_descriptor
except AttributeError:
return
@@ -120,7 +125,7 @@ class MetaData(object):
if required or optional:
#print "REQ",required
#print "OPT",optional
self._wants[entity_descriptor.entity_id] = (ava_fro(self.attrconv,
self._wants[entity_descr.entity_id] = (ava_fro(self.attrconv,
required),
ava_fro(self.attrconv,
optional))
@@ -128,16 +133,16 @@ class MetaData(object):
if ssds:
entity[tag] = ssds
def _idp_metadata(self, entity_descriptor, entity, tag):
def _idp_metadata(self, entity_descr, entity, tag):
"""
Pick out the IdP SSO descriptors from an entity
descriptor and store the information in a way which is easily
accessible.
:param entity_descriptor: A EntityDescriptor instance
:param entity_descr: A EntityDescriptor instance
"""
try:
isd = entity_descriptor.idpsso_descriptor
isd = entity_descr.idpsso_descriptor
except AttributeError:
return
@@ -160,18 +165,18 @@ class MetaData(object):
if idps:
entity[tag] = idps
def _aad_metadata(self, entity_descriptor, entity, tag):
def _aad_metadata(self, entity_descr, entity, tag):
"""
Pick out the attribute authority descriptors from an entity
descriptor and store the information in a way which is easily
accessible.
:param entity_descriptor: A EntityDescriptor instance
:param entity_descr: A EntityDescriptor instance
"""
try:
attr_auth_descr = entity_descriptor.attribute_authority_descriptor
attr_auth_descr = entity_descr.attribute_authority_descriptor
except AttributeError:
#print "No Attribute AD: %s" % entity_descriptor.entity_id
#print "No Attribute AD: %s" % entity_descr.entity_id
return
aads = []
@@ -237,46 +242,46 @@ class MetaData(object):
# now = time.gmtime()
entities_descriptor = md.entities_descriptor_from_string(xml_str)
entities_descr = md.entities_descriptor_from_string(xml_str)
try:
valid(entities_descriptor.valid_until)
valid(entities_descr.valid_until)
except AttributeError:
pass
for entity_descriptor in entities_descriptor.entity_descriptor:
for entity_descr in entities_descr.entity_descriptor:
try:
if not valid(entity_descriptor.valid_until):
if not valid(entity_descr.valid_until):
if self.log:
self.log.info(
"Entity descriptor (entity id:%s) to old" % \
entity_descriptor.entity_id)
entity_descr.entity_id)
else:
print >> sys.stderr, \
"Entity descriptor (entity id:%s) to old" % \
entity_descriptor.entity_id
entity_descr.entity_id
continue
except AttributeError:
pass
try:
self._import[source].append(entity_descriptor.entity_id)
self._import[source].append(entity_descr.entity_id)
except KeyError:
self._import[source] = [entity_descriptor.entity_id]
self._import[source] = [entity_descr.entity_id]
entity = self.entity[entity_descriptor.entity_id] = {}
entity["valid_until"] = entities_descriptor.valid_until
self._idp_metadata(entity_descriptor, entity, "idp_sso")
self._sp_metadata(entity_descriptor, entity, "sp_sso")
self._aad_metadata(entity_descriptor, entity,
entity = self.entity[entity_descr.entity_id] = {}
entity["valid_until"] = entities_descr.valid_until
self._idp_metadata(entity_descr, entity, "idp_sso")
self._sp_metadata(entity_descr, entity, "sp_sso")
self._aad_metadata(entity_descr, entity,
"attribute_authority")
self._vo_metadata(entity_descriptor, entity, "affiliation")
self._vo_metadata(entity_descr, entity, "affiliation")
try:
entity["organization"] = entity_descriptor.organization
entity["organization"] = entity_descr.organization
except AttributeError:
pass
try:
entity["contact_person"] = entity_descriptor.contact_person
entity["contact_person"] = entity_descr.contact_person
except AttributeError:
pass
@@ -390,7 +395,6 @@ class MetaData(object):
:param entityid: The Entity ID
:return: A name
"""
name = ""
try:
org = self.entity[entity_id]["organization"]
@@ -408,7 +412,7 @@ class MetaData(object):
if name:
name = name.text
except KeyError:
pass
name = ""
return name
@@ -475,4 +479,306 @@ class MetaData(object):
if not name:
name = self._location(edict["idp_sso"])[0]
idps[entity_id] = (name, edict["idp_sso"])
return idps
return idps
DEFAULTS = {
"want_assertions_signed": "true",
"authn_requests_signed": "false",
"want_authn_requests_signed": "true",
}
ORG_ATTR_TRANSL = {
"organization_name": ("name", md.OrganizationName),
"organization_display_name": ("display_name", md.OrganizationDisplayName),
"organization_url": ("url", md.OrganizationURL)
}
def _localized_name(val, klass):
try:
(text, lang) = val
return klass(text=text, lang=lang)
except ValueError:
return klass(text=val)
def do_organization_info(conf):
""" decription of an organization in the configuration is
a dictionary of keys and values, where the values might be tuples.
"organization": {
"name": ("AB Exempel", "se"),
"display_name": ("AB Exempel", "se"),
"url": "http://www.example.org"
}
"""
try:
corg = conf["organization"]
org = md.Organization()
for dkey, (ckey, klass) in ORG_ATTR_TRANSL.items():
if ckey not in corg:
continue
if isinstance(corg[ckey], basestring):
setattr(org, dkey, [_localized_name(corg[ckey], klass)])
elif isinstance(corg[ckey], list):
setattr(org, dkey,
[_localized_name(n, klass) for n in corg[ckey]])
else:
setattr(org, dkey, [_localized_name(corg[ckey], klass)])
return org
except KeyError:
return None
def do_contact_person_info(conf):
""" Creates a ContactPerson instance from configuration information"""
contact_person = md.ContactPerson
cps = []
try:
for corg in conf["contact_person"]:
cper = md.ContactPerson()
for (key, classpec) in contact_person.c_children.values():
try:
value = corg[key]
data = []
if isinstance(classpec, list):
# What if value is not a list ?
if isinstance(value, basestring):
data = [classpec[0](text=value)]
else:
for val in value:
data.append(classpec[0](text=val))
else:
data = classpec(text=value)
setattr(cper, key, data)
except KeyError:
pass
for (prop, classpec, _) in contact_person.c_attributes.values():
try:
# should do a check for valid value
setattr(cper, prop, corg[prop])
except KeyError:
pass
cps.append(cper)
except KeyError:
pass
return cps
def do_key_descriptor(cert):
return md.KeyDescriptor(
key_info = ds.KeyInfo(
x509_data=ds.X509Data(
x509_certificate=ds.X509Certificate(text=cert)
)
)
)
def do_requested_attribute(attributes, acs, is_required="false"):
lista = []
for attr in attributes:
attr = from_local_name(acs, attr, NAME_FORMAT_URI)
args = {}
for key in attr.keyswv():
args[key] = getattr(attr, key)
args["is_required"] = is_required
lista.append(md.RequestedAttribute(**args))
return lista
ENDPOINTS = {
"sp": {
"artifact_resolution_service": (md.ArtifactResolutionService, True),
"single_logout_service": (md.SingleLogoutService, False),
"manage_name_id_service": (md.ManageNameIDService, False),
"assertion_consumer_service": (md.AssertionConsumerService, True),
},
"idp":{
"artifact_resolution_service": (md.ArtifactResolutionService, True),
"single_logout_service": (md.SingleLogoutService, False),
"manage_name_id_service": (md.ManageNameIDService, False),
"single_sign_on_service": (md.SingleSignOnService, False),
"name_id_mapping_service": (md.NameIDMappingService, False),
"assertion_id_request_service": (md.AssertionIDRequestService, False),
},
"aa":{
"artifact_resolution_service": (md.ArtifactResolutionService, True),
"single_logout_service": (md.SingleLogoutService, False),
"manage_name_id_service": (md.ManageNameIDService, False),
"assertion_id_request_service": (md.AssertionIDRequestService, False),
"attribute_service": (md.AttributeService, False)
},
}
DEFAULT_BINDING = {
"assertion_consumer_service": BINDING_HTTP_POST,
"single_sign_on_service": BINDING_HTTP_POST,
"single_logout_service": BINDING_HTTP_POST,
"attribute_service": BINDING_SOAP,
"artifact_resolution_service": BINDING_SOAP
}
def do_endpoints(conf, endpoints):
service = {}
for endpoint, (eclass, indexed) in endpoints.items():
try:
servs = []
i = 1
for args in conf[endpoint]:
if isinstance(args, basestring): # Assume it's the location
args = {"location":args,
"binding": DEFAULT_BINDING[endpoint]}
if indexed:
args["index"] = "%d" % i
servs.append(factory(eclass, **args))
i += 1
service[endpoint] = servs
except KeyError:
pass
return service
def do_sp_sso_descriptor(servprov, acs, cert=None):
spsso = md.SPSSODescriptor()
spsso.protocol_support_enumeration = samlp.NAMESPACE
if servprov["endpoints"]:
for (endpoint, instlist) in do_endpoints(servprov["endpoints"],
ENDPOINTS["sp"]).items():
setattr(spsso, endpoint, instlist)
if cert:
spsso.key_descriptor = do_key_descriptor(cert)
for key in ["want_assertions_signed", "authn_requests_signed"]:
try:
setattr(spsso, key, "%s" % servprov[key])
except KeyError:
setattr(spsso, key, DEFAULTS[key])
requested_attributes = []
if "required_attributes" in servprov:
requested_attributes.extend(do_requested_attribute(
servprov["required_attributes"],
acs,
is_required="true"))
if "optional_attributes" in servprov:
requested_attributes.extend(do_requested_attribute(
servprov["optional_attributes"],
acs,
is_required="false"))
if requested_attributes:
spsso.attribute_consuming_service = [md.AttributeConsumingService(
requested_attribute=requested_attributes,
service_name= [md.ServiceName(lang="en",text=servprov["name"])]
)]
try:
spsso.attribute_consuming_service[0].service_description = [
md.ServiceDescription(
text=servprov["description"])]
except KeyError:
pass
# if "discovery_service" in sp:
# spsso.extensions= {"extension_elements":[
# {
# "tag":"DiscoveryResponse",
# "namespace":md.IDPDISC,
# "attributes": {
# "index":"1",
# "binding": md.IDPDISC,
# "location":sp["url"]
# }
# }
# ]}
return spsso
def do_idp_sso_descriptor(idp, cert=None):
idpsso = md.IDPSSODescriptor()
idpsso.protocol_support_enumeration = samlp.NAMESPACE
if idp["endpoints"]:
for (endpoint, instlist) in do_endpoints(idp["endpoints"],
ENDPOINTS["idp"]).items():
setattr(idpsso, endpoint, instlist)
if cert:
idpsso.key_descriptor = do_key_descriptor(cert)
for key in ["want_authn_requests_signed"]:
try:
setattr(idpsso, key, "%s" % idp[key])
except KeyError:
setattr(idpsso, key, DEFAULTS[key])
return idpsso
def do_aa_descriptor(ata, cert):
aad = md.AttributeAuthorityDescriptor()
aad.protocol_support_enumeration = samlp.NAMESPACE
if ata["endpoints"]:
for (endpoint, instlist) in do_endpoints(ata["endpoints"],
ENDPOINTS["aa"]).items():
setattr(aad, endpoint, instlist)
if cert:
aad.key_descriptor = do_key_descriptor(cert)
return aad
def entity_descriptor(confd, valid_for):
mycert = "".join(open(confd["cert_file"]).readlines()[1:-1])
if "attribute_map_dir" in confd:
attrconverters = ac_factory(confd["attribute_map_dir"])
else:
attrconverters = [AttributeConverter()]
#if "attribute_maps" in confd:
# (forward,backward) = parse_attribute_map(confd["attribute_maps"])
#else:
# backward = {}
entd = md.EntityDescriptor()
entd.entity_id = confd["entityid"]
if valid_for:
entd.valid_until = in_a_while(hours=valid_for)
entd.organization = do_organization_info(confd)
entd.contact_person = do_contact_person_info(confd)
if "sp" in confd["service"]:
# The SP
entd.spsso_descriptor = do_sp_sso_descriptor(confd["service"]["sp"],
attrconverters, mycert)
if "idp" in confd["service"]:
entd.idpsso_descriptor = do_idp_sso_descriptor(
confd["service"]["idp"], mycert)
if "aa" in confd["service"]:
entd.attribute_authority_descriptor = do_aa_descriptor(
confd["service"]["aa"], mycert)
return entd
def entities_descriptor(eds, valid_for, name, ident, sign, secc):
entities = md.EntitiesDescriptor(entity_descriptor= eds)
if valid_for:
entities.valid_until = in_a_while(hours=valid_for)
if name:
entities.name = name
if ident:
entities.id = ident
if sign:
entities.signature = pre_signature_part(ident)
if sign:
entities = secc.sign_statement_using_xmlsec("%s" % entities,
class_name(entities))
return entities

191
tests/test_61_makemeta.py Normal file
View File

@@ -0,0 +1,191 @@
from saml2 import metadata
from saml2 import md, make_instance
from saml2 import BINDING_HTTP_POST
from saml2.attribute_converter import from_local_name, ac_factory
from saml2.saml import NAME_FORMAT_URI
def _eq(l1,l2):
return set(l1) == set(l2)
SP = {
"name" : "Rolands SP",
"description": "One of the best SPs in business",
"endpoints": {
"single_logout_service" : ["http://localhost:8087/logout"],
"assertion_consumer_service" : [{"location":"http://localhost:8087/",
"binding":BINDING_HTTP_POST},]
},
"required_attributes": ["sn", "givenName", "mail"],
"optional_attributes": ["title"],
"idp": {
"" : "https://example.com/saml2/idp/SSOService.php",
},
}
IDP = {
"name" : "Rolands IdP",
"endpoints": {
"single_sign_on_service" : ["http://localhost:8088/sso"],
},
"policy": {
"default": {
"lifetime": {"minutes":15},
"attribute_restrictions": None, # means all I have
"name_form": "urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
},
"urn:mace:example.com:saml:roland:sp": {
"lifetime": {"minutes": 5},
"nameid_format": "urn:oasis:names:tc:SAML:2.0:nameid-format:persistent",
}
}
}
def test_org_1():
desc = { "organization": {
"name": [("Example Company","en"), ("Exempel AB","se"), "Example",],
"display_name": ["Example AS", ("Voorbeeld AZ", "")],
"url": [("http://example.com","en")],
}}
org = metadata.do_organization_info(desc)
print org
assert isinstance(org, md.Organization)
print org.keyswv()
assert _eq(org.keyswv(), ['organization_name',
'organization_display_name','organization_url'])
assert len(org.organization_name) == 3
assert len(org.organization_display_name) == 2
assert len(org.organization_url) == 1
def test_org_2():
desc = { "organization": {
"name": [("Example Company","en"), ("Exempel AB","se"), "Example",],
"display_name": "Example AS",
"url": ("http://example.com","en"),
}}
org = metadata.do_organization_info(desc)
print org
assert _eq(org.keyswv(), ['organization_name',
'organization_display_name','organization_url'])
assert len(org.organization_name) == 3
assert len(org.organization_display_name) == 1
assert org.organization_display_name[0].text == 'Example AS'
assert len(org.organization_url) == 1
assert isinstance(org.organization_url[0], md.OrganizationURL)
assert org.organization_url[0].lang == "en"
assert org.organization_url[0].text == 'http://example.com'
def test_org_3():
desc = {"organization": { "display_name": ["Rolands SAML"] } }
org = metadata.do_organization_info(desc)
assert _eq(org.keyswv(), ['organization_display_name'])
assert len(org.organization_display_name) == 1
def test_contact_0():
conf = {"contact_person": [{
"given_name":"Roland",
"sur_name": "Hedberg",
"telephone_number": "+46 70 100 00 00",
"email_address": ["foo@eample.com", "foo@example.org"],
"contact_type": "technical"
}]}
contact_person = metadata.do_contact_person_info(conf)
assert _eq(contact_person[0].keyswv(), ['given_name', 'sur_name',
'contact_type', 'telephone_number',
"email_address"])
print contact_person[0]
person = contact_person[0]
assert person.contact_type == "technical"
assert isinstance(person.given_name, md.GivenName)
assert person.given_name.text == "Roland"
assert isinstance(person.sur_name, md.SurName)
assert person.sur_name.text == "Hedberg"
assert isinstance(person.telephone_number[0], md.TelephoneNumber)
assert person.telephone_number[0].text == "+46 70 100 00 00"
assert len(person.email_address) == 2
assert isinstance(person.email_address[0], md.EmailAddress)
assert person.email_address[0].text == "foo@eample.com"
def test_do_endpoints():
eps = metadata.do_endpoints(SP["endpoints"],
metadata.ENDPOINTS["sp"])
print eps
assert _eq(eps.keys(), ["assertion_consumer_service",
"single_logout_service"])
assert len(eps["single_logout_service"]) == 1
sls = eps["single_logout_service"][0]
assert sls.location == "http://localhost:8087/logout"
assert sls.binding == BINDING_HTTP_POST
assert len(eps["assertion_consumer_service"]) == 1
acs = eps["assertion_consumer_service"][0]
assert acs.location == "http://localhost:8087/"
assert acs.binding == BINDING_HTTP_POST
assert "artifact_resolution_service" not in eps
assert "manage_name_id_service" not in eps
def test_required_attributes():
attrconverters = ac_factory("../tests/attributemaps")
ras = metadata.do_requested_attribute(SP["required_attributes"],
attrconverters, is_required="true")
assert len(ras) == len(SP["required_attributes"])
print ras[0]
assert ras[0].name == 'urn:oid:2.5.4.4'
assert ras[0].name_format == NAME_FORMAT_URI
assert ras[0].is_required == "true"
def test_optional_attributes():
attrconverters = ac_factory("../tests/attributemaps")
ras = metadata.do_requested_attribute(SP["optional_attributes"],
attrconverters)
assert len(ras) == len(SP["optional_attributes"])
print ras[0]
assert ras[0].name == 'urn:oid:2.5.4.12'
assert ras[0].name_format == NAME_FORMAT_URI
assert ras[0].is_required == "false"
def test_do_sp_sso_descriptor():
attrconverters = ac_factory("../tests/attributemaps")
spsso = metadata.do_sp_sso_descriptor(SP, attrconverters)
assert isinstance(spsso, md.SPSSODescriptor)
assert _eq(spsso.keyswv(), ['authn_requests_signed',
'attribute_consuming_service',
'single_logout_service',
'protocol_support_enumeration',
'assertion_consumer_service',
'want_assertions_signed'])
assert spsso.authn_requests_signed == "false"
assert spsso.want_assertions_signed == "true"
len (spsso.attribute_consuming_service) == 1
acs = spsso.attribute_consuming_service[0]
print acs.keyswv()
assert _eq(acs.keyswv(), ['requested_attribute', 'service_name',
'service_description'])
assert acs.service_name[0].text == SP["name"]
assert acs.service_description[0].text == SP["description"]
assert len(acs.requested_attribute) == 4
assert acs.requested_attribute[0].friendly_name == "sn"
assert acs.requested_attribute[0].name == 'urn:oid:2.5.4.4'
assert acs.requested_attribute[0].name_format == NAME_FORMAT_URI
assert acs.requested_attribute[0].is_required == "true"
def test_entity_description():
confd = eval(open("../tests/server.config").read())
entd = metadata.entity_descriptor(confd, 1)
assert entd != None
print entd.keyswv()
assert _eq(entd.keyswv(), ['valid_until', 'entity_id', 'contact_person',
'spsso_descriptor', 'organization'])
print entd
assert entd.entity_id == "urn:mace:example.com:saml:roland:sp"
def test_do_idp_sso_descriptor():
idpsso = metadata.do_idp_sso_descriptor(IDP)
assert isinstance(idpsso, md.IDPSSODescriptor)
assert _eq(idpsso.keyswv(), ['protocol_support_enumeration',
'single_sign_on_service',
'want_authn_requests_signed'])

View File

@@ -1,15 +1,9 @@
#!/usr/bin/env python
import os
import getopt
import xmldsig as ds
from saml2 import utils, md, samlp, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT
from saml2 import BINDING_SOAP, class_name, make_instance
from saml2.time_util import in_a_while
from saml2.s_utils import parse_attribute_map, factory
from saml2.saml import NAME_FORMAT_URI
from saml2.sigver import pre_signature_part, SecurityContext
from saml2.attribute_converter import from_local_name, ac_factory
from saml2.metadata import entity_descriptor, entities_descriptor
from saml2.sigver import SecurityContext
HELP_MESSAGE = """
Usage: make_metadata [options] 1*configurationfile
@@ -29,304 +23,6 @@ class Usage(Exception):
def __init__(self, msg):
self.msg = msg
DEFAULTS = {
"want_assertions_signed": "true",
"authn_requests_signed": "false",
"want_authn_requests_signed": "true",
}
ORG_ATTR_TRANSL = {
"organization_name": ("name", md.OrganizationName),
"organization_display_name": ("display_name", md.OrganizationDisplayName),
"organization_url": ("url", md.OrganizationURL)
}
def _localized_name(val, klass):
try:
(text,lang) = val
return klass(text=text,lang=lang)
except ValueError:
return klass(text=val)
def do_organization_info(conf):
""" decription of an organization in the configuration is
a dictionary of keys and values, where the values might be tuples.
"organization": {
"name": ("AB Exempel", "se"),
"display_name": ("AB Exempel", "se"),
"url": "http://www.example.org"
}
"""
try:
corg = conf["organization"]
org = md.Organization()
for dkey, (ckey, klass) in ORG_ATTR_TRANSL.items():
if ckey not in corg:
continue
if isinstance(corg[ckey], basestring):
setattr(org, dkey, [_localized_name(corg[ckey], klass)])
elif isinstance(corg[ckey], list):
setattr(org, dkey, [_localized_name(n, klass) for n in corg[ckey]])
else:
setattr(org, dkey, [_localized_name(corg[ckey], klass)])
return org
except KeyError:
return None
def do_contact_person_info(conf):
"""
"""
contact_person = md.ContactPerson
cps = []
try:
for corg in conf["contact_person"]:
cp = md.ContactPerson()
for (key, classpec) in contact_person.c_children.values():
try:
value = corg[key]
data = []
if isinstance(classpec, list):
# What if value is not a list ?
if isinstance(value, basestring):
data = [classpec[0](text=value)]
else:
for val in value:
data.append(classpec[0](text=val))
else:
data = classpec(text=value)
setattr(cp, key, data)
except KeyError:
pass
for (prop, classpec, req) in contact_person.c_attributes.values():
try:
# should do a check for valid value
setattr(cp, prop, corg[prop])
except KeyError:
pass
cps.append(cp)
except KeyError:
pass
return cps
def do_key_descriptor(cert):
return md.KeyDescriptor(
key_info=ds.KeyInfo(
x509_data=ds.X509Data(
x509_certificate=ds.X509Certificate(text=cert)
)
)
)
def do_requested_attribute(attributes, acs, is_required="false"):
lista = []
for attr in attributes:
attr = from_local_name(acs, attr, NAME_FORMAT_URI)
args = {}
for key in attr.keyswv():
args[key] = getattr(attr,key)
args["is_required"] = is_required
lista.append(md.RequestedAttribute(**args))
return lista
ENDPOINTS = {
"sp": {
"artifact_resolution_service": (md.ArtifactResolutionService, True),
"single_logout_service": (md.SingleLogoutService, False),
"manage_name_id_service": (md.ManageNameIDService, False),
"assertion_consumer_service": (md.AssertionConsumerService, True),
},
"idp":{
"artifact_resolution_service": (md.ArtifactResolutionService, True),
"single_logout_service": (md.SingleLogoutService, False),
"manage_name_id_service": (md.ManageNameIDService, False),
"single_sign_on_service": (md.SingleSignOnService, False),
"name_id_mapping_service": (md.NameIDMappingService, False),
"assertion_id_request_service": (md.AssertionIDRequestService, False),
},
"aa":{
"artifact_resolution_service": (md.ArtifactResolutionService, True),
"single_logout_service": (md.SingleLogoutService, False),
"manage_name_id_service": (md.ManageNameIDService, False),
"assertion_id_request_service": (md.AssertionIDRequestService, False),
"attribute_service": (md.AttributeService, False)
},
}
DEFAULT_BINDING = {
"assertion_consumer_service": BINDING_HTTP_POST,
"single_sign_on_service": BINDING_HTTP_POST,
"single_logout_service": BINDING_HTTP_POST,
"attribute_service": BINDING_SOAP,
"artifact_resolution_service": BINDING_SOAP
}
def do_endpoints(conf, endpoints):
service = {}
for endpoint, (eclass, indexed) in endpoints.items():
try:
servs = []
i = 1
for args in conf[endpoint]:
if isinstance(args, basestring): # Assume it's the location
args = {"location":args, "binding": DEFAULT_BINDING[endpoint]}
if indexed:
args["index"] = "%d" % i
servs.append(factory(eclass, **args))
i += 1
service[endpoint] = servs
except KeyError:
pass
return service
def do_sp_sso_descriptor(sp, acs, cert=None):
spsso = md.SPSSODescriptor()
spsso.protocol_support_enumeration=samlp.NAMESPACE
if sp["endpoints"]:
for (endpoint, instlist) in do_endpoints(sp["endpoints"],
ENDPOINTS["sp"]).items():
setattr(spsso, endpoint, instlist)
if cert:
spsso.key_descriptor=do_key_descriptor(cert)
for key in ["want_assertions_signed", "authn_requests_signed"]:
try:
setattr(spsso, key, "%s" % sp[key])
except KeyError:
setattr(spsso, key, DEFAULTS[key])
requested_attributes = []
if "required_attributes" in sp:
requested_attributes.extend(do_requested_attribute(
sp["required_attributes"],
acs,
is_required="true"))
if "optional_attributes" in sp:
requested_attributes.extend(do_requested_attribute(
sp["optional_attributes"],
acs,
is_required="false"))
if requested_attributes:
spsso.attribute_consuming_service = [md.AttributeConsumingService(
requested_attribute=requested_attributes,
service_name= [md.ServiceName(lang="en",text=sp["name"])]
)]
try:
spsso.attribute_consuming_service[0].service_description = [
md.ServiceDescription(text=sp["description"])]
except KeyError:
pass
# if "discovery_service" in sp:
# spsso.extensions= {"extension_elements":[
# {
# "tag":"DiscoveryResponse",
# "namespace":md.IDPDISC,
# "attributes": {
# "index":"1",
# "binding": md.IDPDISC,
# "location":sp["url"]
# }
# }
# ]}
return spsso
def do_idp_sso_descriptor(idp, cert=None):
idpsso = md.IDPSSODescriptor()
idpsso.protocol_support_enumeration=samlp.NAMESPACE
if idp["endpoints"]:
for (endpoint, instlist) in do_endpoints(idp["endpoints"],
ENDPOINTS["idp"]).items():
setattr(idpsso, endpoint, instlist)
if cert:
idpsso.key_descriptor=do_key_descriptor(cert)
for key in ["want_authn_requests_signed"]:
try:
setattr(idpsso, key, "%s" % idp[key])
except KeyError:
setattr(idpsso, key, DEFAULTS[key])
return idpsso
def do_aa_descriptor(aa, cert):
aa = md.AttributeAuthorityDescriptor()
aa.protocol_support_enumeration=samlp.NAMESPACE
if idp["endpoints"]:
for (endpoint, instlist) in do_endpoints(aa["endpoints"],
ENDPOINTS["aa"]).items():
setattr(aasso, endpoint, instlist)
if cert:
aa.key_descriptor=do_key_descriptor(cert)
return aa
def entity_descriptor(confd, valid_for):
mycert = "".join(open(confd["cert_file"]).readlines()[1:-1])
if "attribute_map_dir" in confd:
attrconverters = ac_factory(confd["attribute_map_dir"])
else:
attrconverters = [AttributeConverter()]
#if "attribute_maps" in confd:
# (forward,backward) = parse_attribute_map(confd["attribute_maps"])
#else:
# backward = {}
ed = md.EntityDescriptor()
ed.entity_id=confd["entityid"]
if valid_for:
ed.valid_until = in_a_while(hours=valid_for)
ed.organization = do_organization_info(confd)
ed.contact_person = do_contact_person_info(confd)
if "sp" in confd["service"]:
# The SP
ed.spsso_descriptor = do_sp_sso_descriptor(confd["service"]["sp"],
attrconverters, mycert)
if "idp" in confd["service"]:
ed.idpsso_descriptor = do_idp_sso_descriptor(
confd["service"]["idp"], mycert)
if "aa" in confd["service"]:
ed.attribute_authority_descriptor = do_aa_descriptor(
confd["service"]["aa"], mycert)
return ed
def entities_descriptor(eds, valid_for, name, id, sign, sc):
entities = md.EntitiesDescriptor(entity_descriptor= eds)
if valid_for:
entities.valid_until = in_a_while(hours=valid_for)
if name:
entities.name = name
if id:
entities.id = id
if sign:
entities.signature = pre_signature_part(id)
if sign:
entities = sc.sign_statement_using_xmlsec("%s" % entities,
class_name(entities))
return entities
def main(args):
try:
@@ -375,8 +71,8 @@ def main(args):
confd = eval(open(conf).read())
eds.append(entity_descriptor(confd, valid_for))
sc = SecurityContext(xmlsec, keyfile)
print entities_descriptor(eds, valid_for, name, id, sign, sc)
secc = SecurityContext(xmlsec, keyfile)
print entities_descriptor(eds, valid_for, name, id, sign, secc)
if __name__ == "__main__":
import sys