From 027c08fdcde2ad133a0cea01cd613287ac8a4306 Mon Sep 17 00:00:00 2001 From: Roland Hedberg Date: Sun, 8 Aug 2010 15:04:16 +0200 Subject: [PATCH] Moved functions from make_metadata to metadata to make them usable in other contexts --- src/saml2/metadata.py | 382 ++++++++++++++++++++++++++++++++++---- tests/test_61_makemeta.py | 191 +++++++++++++++++++ tools/make_metadata.py | 312 +------------------------------ 3 files changed, 539 insertions(+), 346 deletions(-) create mode 100644 tests/test_61_makemeta.py diff --git a/src/saml2/metadata.py b/src/saml2/metadata.py index 8a74f4b..5b8b1be 100644 --- a/src/saml2/metadata.py +++ b/src/saml2/metadata.py @@ -22,14 +22,19 @@ Contains classes and functions to alleviate the handling of SAML metadata import httplib2 import sys from decorator import decorator +import xmldsig as ds -from saml2 import md, BINDING_HTTP_POST -from saml2 import samlp, BINDING_HTTP_REDIRECT, BINDING_SOAP -#from saml2.time_util import str_to_time +from saml2 import md, samlp, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT +from saml2 import BINDING_SOAP, class_name +from saml2.s_utils import factory +from saml2.saml import NAME_FORMAT_URI +from saml2.time_util import in_a_while +from saml2.time_util import valid +from saml2.attribute_converter import from_local_name, ac_factory +from saml2.attribute_converter import ava_fro, AttributeConverter +from saml2.sigver import pre_signature_part from saml2.sigver import make_temp, cert_from_key_info, verify_signature from saml2.sigver import pem_format -from saml2.time_util import valid -from saml2.attribute_converter import ava_fro @decorator def keep_updated(func, self, entity_id, *args, **kwargs): @@ -45,7 +50,7 @@ def keep_updated(func, self, entity_id, *args, **kwargs): class MetaData(object): """ A class to manage metadata information """ - def __init__(self, xmlsec_binary=None, attrconv=None, log=None, extras=None): + def __init__(self, xmlsec_binary=None, attrconv=None, log=None): self.log = log self.xmlsec_binary = xmlsec_binary self.attrconv = attrconv or [] @@ -58,7 +63,7 @@ class MetaData(object): self._import = {} self._wants = {} - def _vo_metadata(self, entity_descriptor, entity, tag): + def _vo_metadata(self, entity_descr, entity, tag): """ Pick out the Affiliation descriptors from an entity descriptor and store the information in a way which is easily @@ -67,7 +72,7 @@ class MetaData(object): :param entity_descriptor: A EntityDescriptor instance """ - afd = entity_descriptor.affiliation_descriptor + afd = entity_descr.affiliation_descriptor if afd: members = [member.text.strip() for member in afd.affiliate_member] @@ -75,16 +80,16 @@ class MetaData(object): if members != []: entity[tag] = members - def _sp_metadata(self, entity_descriptor, entity, tag): + def _sp_metadata(self, entity_descr, entity, tag): """ Pick out the SP SSO descriptors from an entity descriptor and store the information in a way which is easily accessible. - :param entity_descriptor: A EntityDescriptor instance + :param entity_descr: A EntityDescriptor instance """ try: - ssd = entity_descriptor.spsso_descriptor + ssd = entity_descr.spsso_descriptor except AttributeError: return @@ -120,7 +125,7 @@ class MetaData(object): if required or optional: #print "REQ",required #print "OPT",optional - self._wants[entity_descriptor.entity_id] = (ava_fro(self.attrconv, + self._wants[entity_descr.entity_id] = (ava_fro(self.attrconv, required), ava_fro(self.attrconv, optional)) @@ -128,16 +133,16 @@ class MetaData(object): if ssds: entity[tag] = ssds - def _idp_metadata(self, entity_descriptor, entity, tag): + def _idp_metadata(self, entity_descr, entity, tag): """ Pick out the IdP SSO descriptors from an entity descriptor and store the information in a way which is easily accessible. - :param entity_descriptor: A EntityDescriptor instance + :param entity_descr: A EntityDescriptor instance """ try: - isd = entity_descriptor.idpsso_descriptor + isd = entity_descr.idpsso_descriptor except AttributeError: return @@ -160,18 +165,18 @@ class MetaData(object): if idps: entity[tag] = idps - def _aad_metadata(self, entity_descriptor, entity, tag): + def _aad_metadata(self, entity_descr, entity, tag): """ Pick out the attribute authority descriptors from an entity descriptor and store the information in a way which is easily accessible. - :param entity_descriptor: A EntityDescriptor instance + :param entity_descr: A EntityDescriptor instance """ try: - attr_auth_descr = entity_descriptor.attribute_authority_descriptor + attr_auth_descr = entity_descr.attribute_authority_descriptor except AttributeError: - #print "No Attribute AD: %s" % entity_descriptor.entity_id + #print "No Attribute AD: %s" % entity_descr.entity_id return aads = [] @@ -237,46 +242,46 @@ class MetaData(object): # now = time.gmtime() - entities_descriptor = md.entities_descriptor_from_string(xml_str) + entities_descr = md.entities_descriptor_from_string(xml_str) try: - valid(entities_descriptor.valid_until) + valid(entities_descr.valid_until) except AttributeError: pass - for entity_descriptor in entities_descriptor.entity_descriptor: + for entity_descr in entities_descr.entity_descriptor: try: - if not valid(entity_descriptor.valid_until): + if not valid(entity_descr.valid_until): if self.log: self.log.info( "Entity descriptor (entity id:%s) to old" % \ - entity_descriptor.entity_id) + entity_descr.entity_id) else: print >> sys.stderr, \ "Entity descriptor (entity id:%s) to old" % \ - entity_descriptor.entity_id + entity_descr.entity_id continue except AttributeError: pass try: - self._import[source].append(entity_descriptor.entity_id) + self._import[source].append(entity_descr.entity_id) except KeyError: - self._import[source] = [entity_descriptor.entity_id] + self._import[source] = [entity_descr.entity_id] - entity = self.entity[entity_descriptor.entity_id] = {} - entity["valid_until"] = entities_descriptor.valid_until - self._idp_metadata(entity_descriptor, entity, "idp_sso") - self._sp_metadata(entity_descriptor, entity, "sp_sso") - self._aad_metadata(entity_descriptor, entity, + entity = self.entity[entity_descr.entity_id] = {} + entity["valid_until"] = entities_descr.valid_until + self._idp_metadata(entity_descr, entity, "idp_sso") + self._sp_metadata(entity_descr, entity, "sp_sso") + self._aad_metadata(entity_descr, entity, "attribute_authority") - self._vo_metadata(entity_descriptor, entity, "affiliation") + self._vo_metadata(entity_descr, entity, "affiliation") try: - entity["organization"] = entity_descriptor.organization + entity["organization"] = entity_descr.organization except AttributeError: pass try: - entity["contact_person"] = entity_descriptor.contact_person + entity["contact_person"] = entity_descr.contact_person except AttributeError: pass @@ -390,7 +395,6 @@ class MetaData(object): :param entityid: The Entity ID :return: A name """ - name = "" try: org = self.entity[entity_id]["organization"] @@ -408,7 +412,7 @@ class MetaData(object): if name: name = name.text except KeyError: - pass + name = "" return name @@ -475,4 +479,306 @@ class MetaData(object): if not name: name = self._location(edict["idp_sso"])[0] idps[entity_id] = (name, edict["idp_sso"]) - return idps \ No newline at end of file + return idps + + +DEFAULTS = { + "want_assertions_signed": "true", + "authn_requests_signed": "false", + "want_authn_requests_signed": "true", +} + +ORG_ATTR_TRANSL = { + "organization_name": ("name", md.OrganizationName), + "organization_display_name": ("display_name", md.OrganizationDisplayName), + "organization_url": ("url", md.OrganizationURL) +} + +def _localized_name(val, klass): + try: + (text, lang) = val + return klass(text=text, lang=lang) + except ValueError: + return klass(text=val) + +def do_organization_info(conf): + """ decription of an organization in the configuration is + a dictionary of keys and values, where the values might be tuples. + + "organization": { + "name": ("AB Exempel", "se"), + "display_name": ("AB Exempel", "se"), + "url": "http://www.example.org" + } + """ + try: + corg = conf["organization"] + org = md.Organization() + for dkey, (ckey, klass) in ORG_ATTR_TRANSL.items(): + if ckey not in corg: + continue + if isinstance(corg[ckey], basestring): + setattr(org, dkey, [_localized_name(corg[ckey], klass)]) + elif isinstance(corg[ckey], list): + setattr(org, dkey, + [_localized_name(n, klass) for n in corg[ckey]]) + else: + setattr(org, dkey, [_localized_name(corg[ckey], klass)]) + return org + except KeyError: + return None + +def do_contact_person_info(conf): + """ Creates a ContactPerson instance from configuration information""" + + contact_person = md.ContactPerson + cps = [] + try: + for corg in conf["contact_person"]: + cper = md.ContactPerson() + for (key, classpec) in contact_person.c_children.values(): + try: + value = corg[key] + data = [] + if isinstance(classpec, list): + # What if value is not a list ? + if isinstance(value, basestring): + data = [classpec[0](text=value)] + else: + for val in value: + data.append(classpec[0](text=val)) + else: + data = classpec(text=value) + setattr(cper, key, data) + except KeyError: + pass + for (prop, classpec, _) in contact_person.c_attributes.values(): + try: + # should do a check for valid value + setattr(cper, prop, corg[prop]) + except KeyError: + pass + cps.append(cper) + except KeyError: + pass + return cps + +def do_key_descriptor(cert): + return md.KeyDescriptor( + key_info = ds.KeyInfo( + x509_data=ds.X509Data( + x509_certificate=ds.X509Certificate(text=cert) + ) + ) + ) + +def do_requested_attribute(attributes, acs, is_required="false"): + lista = [] + for attr in attributes: + attr = from_local_name(acs, attr, NAME_FORMAT_URI) + args = {} + for key in attr.keyswv(): + args[key] = getattr(attr, key) + args["is_required"] = is_required + lista.append(md.RequestedAttribute(**args)) + return lista + +ENDPOINTS = { + "sp": { + "artifact_resolution_service": (md.ArtifactResolutionService, True), + "single_logout_service": (md.SingleLogoutService, False), + "manage_name_id_service": (md.ManageNameIDService, False), + "assertion_consumer_service": (md.AssertionConsumerService, True), + }, + "idp":{ + "artifact_resolution_service": (md.ArtifactResolutionService, True), + "single_logout_service": (md.SingleLogoutService, False), + "manage_name_id_service": (md.ManageNameIDService, False), + + "single_sign_on_service": (md.SingleSignOnService, False), + "name_id_mapping_service": (md.NameIDMappingService, False), + + "assertion_id_request_service": (md.AssertionIDRequestService, False), + }, + "aa":{ + "artifact_resolution_service": (md.ArtifactResolutionService, True), + "single_logout_service": (md.SingleLogoutService, False), + "manage_name_id_service": (md.ManageNameIDService, False), + + "assertion_id_request_service": (md.AssertionIDRequestService, False), + + "attribute_service": (md.AttributeService, False) + }, +} + +DEFAULT_BINDING = { + "assertion_consumer_service": BINDING_HTTP_POST, + "single_sign_on_service": BINDING_HTTP_POST, + "single_logout_service": BINDING_HTTP_POST, + "attribute_service": BINDING_SOAP, + "artifact_resolution_service": BINDING_SOAP +} + +def do_endpoints(conf, endpoints): + service = {} + + for endpoint, (eclass, indexed) in endpoints.items(): + try: + servs = [] + i = 1 + for args in conf[endpoint]: + if isinstance(args, basestring): # Assume it's the location + args = {"location":args, + "binding": DEFAULT_BINDING[endpoint]} + if indexed: + args["index"] = "%d" % i + servs.append(factory(eclass, **args)) + i += 1 + service[endpoint] = servs + except KeyError: + pass + return service + +def do_sp_sso_descriptor(servprov, acs, cert=None): + spsso = md.SPSSODescriptor() + spsso.protocol_support_enumeration = samlp.NAMESPACE + + if servprov["endpoints"]: + for (endpoint, instlist) in do_endpoints(servprov["endpoints"], + ENDPOINTS["sp"]).items(): + setattr(spsso, endpoint, instlist) + + if cert: + spsso.key_descriptor = do_key_descriptor(cert) + + for key in ["want_assertions_signed", "authn_requests_signed"]: + try: + setattr(spsso, key, "%s" % servprov[key]) + except KeyError: + setattr(spsso, key, DEFAULTS[key]) + + requested_attributes = [] + if "required_attributes" in servprov: + requested_attributes.extend(do_requested_attribute( + servprov["required_attributes"], + acs, + is_required="true")) + + if "optional_attributes" in servprov: + requested_attributes.extend(do_requested_attribute( + servprov["optional_attributes"], + acs, + is_required="false")) + + if requested_attributes: + spsso.attribute_consuming_service = [md.AttributeConsumingService( + requested_attribute=requested_attributes, + service_name= [md.ServiceName(lang="en",text=servprov["name"])] + )] + try: + spsso.attribute_consuming_service[0].service_description = [ + md.ServiceDescription( + text=servprov["description"])] + except KeyError: + pass + + # if "discovery_service" in sp: + # spsso.extensions= {"extension_elements":[ + # { + # "tag":"DiscoveryResponse", + # "namespace":md.IDPDISC, + # "attributes": { + # "index":"1", + # "binding": md.IDPDISC, + # "location":sp["url"] + # } + # } + # ]} + + return spsso + +def do_idp_sso_descriptor(idp, cert=None): + idpsso = md.IDPSSODescriptor() + idpsso.protocol_support_enumeration = samlp.NAMESPACE + + if idp["endpoints"]: + for (endpoint, instlist) in do_endpoints(idp["endpoints"], + ENDPOINTS["idp"]).items(): + setattr(idpsso, endpoint, instlist) + + if cert: + idpsso.key_descriptor = do_key_descriptor(cert) + + for key in ["want_authn_requests_signed"]: + try: + setattr(idpsso, key, "%s" % idp[key]) + except KeyError: + setattr(idpsso, key, DEFAULTS[key]) + + return idpsso + +def do_aa_descriptor(ata, cert): + aad = md.AttributeAuthorityDescriptor() + aad.protocol_support_enumeration = samlp.NAMESPACE + + if ata["endpoints"]: + for (endpoint, instlist) in do_endpoints(ata["endpoints"], + ENDPOINTS["aa"]).items(): + setattr(aad, endpoint, instlist) + + if cert: + aad.key_descriptor = do_key_descriptor(cert) + + return aad + +def entity_descriptor(confd, valid_for): + mycert = "".join(open(confd["cert_file"]).readlines()[1:-1]) + + if "attribute_map_dir" in confd: + attrconverters = ac_factory(confd["attribute_map_dir"]) + else: + attrconverters = [AttributeConverter()] + + #if "attribute_maps" in confd: + # (forward,backward) = parse_attribute_map(confd["attribute_maps"]) + #else: + # backward = {} + + entd = md.EntityDescriptor() + entd.entity_id = confd["entityid"] + + if valid_for: + entd.valid_until = in_a_while(hours=valid_for) + + entd.organization = do_organization_info(confd) + entd.contact_person = do_contact_person_info(confd) + + if "sp" in confd["service"]: + # The SP + entd.spsso_descriptor = do_sp_sso_descriptor(confd["service"]["sp"], + attrconverters, mycert) + if "idp" in confd["service"]: + entd.idpsso_descriptor = do_idp_sso_descriptor( + confd["service"]["idp"], mycert) + if "aa" in confd["service"]: + entd.attribute_authority_descriptor = do_aa_descriptor( + confd["service"]["aa"], mycert) + + return entd + +def entities_descriptor(eds, valid_for, name, ident, sign, secc): + entities = md.EntitiesDescriptor(entity_descriptor= eds) + if valid_for: + entities.valid_until = in_a_while(hours=valid_for) + if name: + entities.name = name + if ident: + entities.id = ident + + if sign: + entities.signature = pre_signature_part(ident) + + if sign: + entities = secc.sign_statement_using_xmlsec("%s" % entities, + class_name(entities)) + return entities diff --git a/tests/test_61_makemeta.py b/tests/test_61_makemeta.py new file mode 100644 index 0000000..1ed27f6 --- /dev/null +++ b/tests/test_61_makemeta.py @@ -0,0 +1,191 @@ +from saml2 import metadata +from saml2 import md, make_instance +from saml2 import BINDING_HTTP_POST +from saml2.attribute_converter import from_local_name, ac_factory +from saml2.saml import NAME_FORMAT_URI + +def _eq(l1,l2): + return set(l1) == set(l2) + +SP = { + "name" : "Rolands SP", + "description": "One of the best SPs in business", + "endpoints": { + "single_logout_service" : ["http://localhost:8087/logout"], + "assertion_consumer_service" : [{"location":"http://localhost:8087/", + "binding":BINDING_HTTP_POST},] + }, + "required_attributes": ["sn", "givenName", "mail"], + "optional_attributes": ["title"], + "idp": { + "" : "https://example.com/saml2/idp/SSOService.php", + }, +} + +IDP = { + "name" : "Rolands IdP", + "endpoints": { + "single_sign_on_service" : ["http://localhost:8088/sso"], + }, + "policy": { + "default": { + "lifetime": {"minutes":15}, + "attribute_restrictions": None, # means all I have + "name_form": "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", + }, + "urn:mace:example.com:saml:roland:sp": { + "lifetime": {"minutes": 5}, + "nameid_format": "urn:oasis:names:tc:SAML:2.0:nameid-format:persistent", + } + } +} + +def test_org_1(): + desc = { "organization": { + "name": [("Example Company","en"), ("Exempel AB","se"), "Example",], + "display_name": ["Example AS", ("Voorbeeld AZ", "")], + "url": [("http://example.com","en")], + }} + org = metadata.do_organization_info(desc) + print org + assert isinstance(org, md.Organization) + print org.keyswv() + assert _eq(org.keyswv(), ['organization_name', + 'organization_display_name','organization_url']) + assert len(org.organization_name) == 3 + assert len(org.organization_display_name) == 2 + assert len(org.organization_url) == 1 + +def test_org_2(): + desc = { "organization": { + "name": [("Example Company","en"), ("Exempel AB","se"), "Example",], + "display_name": "Example AS", + "url": ("http://example.com","en"), + }} + org = metadata.do_organization_info(desc) + print org + assert _eq(org.keyswv(), ['organization_name', + 'organization_display_name','organization_url']) + assert len(org.organization_name) == 3 + assert len(org.organization_display_name) == 1 + assert org.organization_display_name[0].text == 'Example AS' + assert len(org.organization_url) == 1 + assert isinstance(org.organization_url[0], md.OrganizationURL) + assert org.organization_url[0].lang == "en" + assert org.organization_url[0].text == 'http://example.com' + +def test_org_3(): + desc = {"organization": { "display_name": ["Rolands SAML"] } } + org = metadata.do_organization_info(desc) + assert _eq(org.keyswv(), ['organization_display_name']) + assert len(org.organization_display_name) == 1 + +def test_contact_0(): + conf = {"contact_person": [{ + "given_name":"Roland", + "sur_name": "Hedberg", + "telephone_number": "+46 70 100 00 00", + "email_address": ["foo@eample.com", "foo@example.org"], + "contact_type": "technical" + }]} + contact_person = metadata.do_contact_person_info(conf) + assert _eq(contact_person[0].keyswv(), ['given_name', 'sur_name', + 'contact_type', 'telephone_number', + "email_address"]) + print contact_person[0] + person = contact_person[0] + assert person.contact_type == "technical" + assert isinstance(person.given_name, md.GivenName) + assert person.given_name.text == "Roland" + assert isinstance(person.sur_name, md.SurName) + assert person.sur_name.text == "Hedberg" + assert isinstance(person.telephone_number[0], md.TelephoneNumber) + assert person.telephone_number[0].text == "+46 70 100 00 00" + assert len(person.email_address) == 2 + assert isinstance(person.email_address[0], md.EmailAddress) + assert person.email_address[0].text == "foo@eample.com" + +def test_do_endpoints(): + eps = metadata.do_endpoints(SP["endpoints"], + metadata.ENDPOINTS["sp"]) + print eps + assert _eq(eps.keys(), ["assertion_consumer_service", + "single_logout_service"]) + + assert len(eps["single_logout_service"]) == 1 + sls = eps["single_logout_service"][0] + assert sls.location == "http://localhost:8087/logout" + assert sls.binding == BINDING_HTTP_POST + + assert len(eps["assertion_consumer_service"]) == 1 + acs = eps["assertion_consumer_service"][0] + assert acs.location == "http://localhost:8087/" + assert acs.binding == BINDING_HTTP_POST + + assert "artifact_resolution_service" not in eps + assert "manage_name_id_service" not in eps + +def test_required_attributes(): + attrconverters = ac_factory("../tests/attributemaps") + ras = metadata.do_requested_attribute(SP["required_attributes"], + attrconverters, is_required="true") + assert len(ras) == len(SP["required_attributes"]) + print ras[0] + assert ras[0].name == 'urn:oid:2.5.4.4' + assert ras[0].name_format == NAME_FORMAT_URI + assert ras[0].is_required == "true" + +def test_optional_attributes(): + attrconverters = ac_factory("../tests/attributemaps") + ras = metadata.do_requested_attribute(SP["optional_attributes"], + attrconverters) + assert len(ras) == len(SP["optional_attributes"]) + print ras[0] + assert ras[0].name == 'urn:oid:2.5.4.12' + assert ras[0].name_format == NAME_FORMAT_URI + assert ras[0].is_required == "false" + +def test_do_sp_sso_descriptor(): + attrconverters = ac_factory("../tests/attributemaps") + spsso = metadata.do_sp_sso_descriptor(SP, attrconverters) + + assert isinstance(spsso, md.SPSSODescriptor) + assert _eq(spsso.keyswv(), ['authn_requests_signed', + 'attribute_consuming_service', + 'single_logout_service', + 'protocol_support_enumeration', + 'assertion_consumer_service', + 'want_assertions_signed']) + + assert spsso.authn_requests_signed == "false" + assert spsso.want_assertions_signed == "true" + len (spsso.attribute_consuming_service) == 1 + acs = spsso.attribute_consuming_service[0] + print acs.keyswv() + assert _eq(acs.keyswv(), ['requested_attribute', 'service_name', + 'service_description']) + assert acs.service_name[0].text == SP["name"] + assert acs.service_description[0].text == SP["description"] + assert len(acs.requested_attribute) == 4 + assert acs.requested_attribute[0].friendly_name == "sn" + assert acs.requested_attribute[0].name == 'urn:oid:2.5.4.4' + assert acs.requested_attribute[0].name_format == NAME_FORMAT_URI + assert acs.requested_attribute[0].is_required == "true" + +def test_entity_description(): + confd = eval(open("../tests/server.config").read()) + entd = metadata.entity_descriptor(confd, 1) + assert entd != None + print entd.keyswv() + assert _eq(entd.keyswv(), ['valid_until', 'entity_id', 'contact_person', + 'spsso_descriptor', 'organization']) + print entd + assert entd.entity_id == "urn:mace:example.com:saml:roland:sp" + +def test_do_idp_sso_descriptor(): + idpsso = metadata.do_idp_sso_descriptor(IDP) + + assert isinstance(idpsso, md.IDPSSODescriptor) + assert _eq(idpsso.keyswv(), ['protocol_support_enumeration', + 'single_sign_on_service', + 'want_authn_requests_signed']) \ No newline at end of file diff --git a/tools/make_metadata.py b/tools/make_metadata.py index 0efa7db..67ecbd0 100755 --- a/tools/make_metadata.py +++ b/tools/make_metadata.py @@ -1,15 +1,9 @@ #!/usr/bin/env python import os import getopt -import xmldsig as ds -from saml2 import utils, md, samlp, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT -from saml2 import BINDING_SOAP, class_name, make_instance -from saml2.time_util import in_a_while -from saml2.s_utils import parse_attribute_map, factory -from saml2.saml import NAME_FORMAT_URI -from saml2.sigver import pre_signature_part, SecurityContext -from saml2.attribute_converter import from_local_name, ac_factory +from saml2.metadata import entity_descriptor, entities_descriptor +from saml2.sigver import SecurityContext HELP_MESSAGE = """ Usage: make_metadata [options] 1*configurationfile @@ -29,304 +23,6 @@ class Usage(Exception): def __init__(self, msg): self.msg = msg -DEFAULTS = { - "want_assertions_signed": "true", - "authn_requests_signed": "false", - "want_authn_requests_signed": "true", -} - -ORG_ATTR_TRANSL = { - "organization_name": ("name", md.OrganizationName), - "organization_display_name": ("display_name", md.OrganizationDisplayName), - "organization_url": ("url", md.OrganizationURL) -} - -def _localized_name(val, klass): - try: - (text,lang) = val - return klass(text=text,lang=lang) - except ValueError: - return klass(text=val) - -def do_organization_info(conf): - """ decription of an organization in the configuration is - a dictionary of keys and values, where the values might be tuples. - - "organization": { - "name": ("AB Exempel", "se"), - "display_name": ("AB Exempel", "se"), - "url": "http://www.example.org" - } - """ - try: - corg = conf["organization"] - org = md.Organization() - for dkey, (ckey, klass) in ORG_ATTR_TRANSL.items(): - if ckey not in corg: - continue - if isinstance(corg[ckey], basestring): - setattr(org, dkey, [_localized_name(corg[ckey], klass)]) - elif isinstance(corg[ckey], list): - setattr(org, dkey, [_localized_name(n, klass) for n in corg[ckey]]) - else: - setattr(org, dkey, [_localized_name(corg[ckey], klass)]) - return org - except KeyError: - return None - -def do_contact_person_info(conf): - """ - """ - contact_person = md.ContactPerson - cps = [] - try: - for corg in conf["contact_person"]: - cp = md.ContactPerson() - for (key, classpec) in contact_person.c_children.values(): - try: - value = corg[key] - data = [] - if isinstance(classpec, list): - # What if value is not a list ? - if isinstance(value, basestring): - data = [classpec[0](text=value)] - else: - for val in value: - data.append(classpec[0](text=val)) - else: - data = classpec(text=value) - setattr(cp, key, data) - except KeyError: - pass - for (prop, classpec, req) in contact_person.c_attributes.values(): - try: - # should do a check for valid value - setattr(cp, prop, corg[prop]) - except KeyError: - pass - cps.append(cp) - except KeyError: - pass - return cps - -def do_key_descriptor(cert): - return md.KeyDescriptor( - key_info=ds.KeyInfo( - x509_data=ds.X509Data( - x509_certificate=ds.X509Certificate(text=cert) - ) - ) - ) - -def do_requested_attribute(attributes, acs, is_required="false"): - lista = [] - for attr in attributes: - attr = from_local_name(acs, attr, NAME_FORMAT_URI) - args = {} - for key in attr.keyswv(): - args[key] = getattr(attr,key) - args["is_required"] = is_required - lista.append(md.RequestedAttribute(**args)) - return lista - -ENDPOINTS = { - "sp": { - "artifact_resolution_service": (md.ArtifactResolutionService, True), - "single_logout_service": (md.SingleLogoutService, False), - "manage_name_id_service": (md.ManageNameIDService, False), - "assertion_consumer_service": (md.AssertionConsumerService, True), - }, - "idp":{ - "artifact_resolution_service": (md.ArtifactResolutionService, True), - "single_logout_service": (md.SingleLogoutService, False), - "manage_name_id_service": (md.ManageNameIDService, False), - - "single_sign_on_service": (md.SingleSignOnService, False), - "name_id_mapping_service": (md.NameIDMappingService, False), - - "assertion_id_request_service": (md.AssertionIDRequestService, False), - }, - "aa":{ - "artifact_resolution_service": (md.ArtifactResolutionService, True), - "single_logout_service": (md.SingleLogoutService, False), - "manage_name_id_service": (md.ManageNameIDService, False), - - "assertion_id_request_service": (md.AssertionIDRequestService, False), - - "attribute_service": (md.AttributeService, False) - }, -} - -DEFAULT_BINDING = { - "assertion_consumer_service": BINDING_HTTP_POST, - "single_sign_on_service": BINDING_HTTP_POST, - "single_logout_service": BINDING_HTTP_POST, - "attribute_service": BINDING_SOAP, - "artifact_resolution_service": BINDING_SOAP -} - -def do_endpoints(conf, endpoints): - service = {} - - for endpoint, (eclass, indexed) in endpoints.items(): - try: - servs = [] - i = 1 - for args in conf[endpoint]: - if isinstance(args, basestring): # Assume it's the location - args = {"location":args, "binding": DEFAULT_BINDING[endpoint]} - if indexed: - args["index"] = "%d" % i - servs.append(factory(eclass, **args)) - i += 1 - service[endpoint] = servs - except KeyError: - pass - return service - -def do_sp_sso_descriptor(sp, acs, cert=None): - spsso = md.SPSSODescriptor() - spsso.protocol_support_enumeration=samlp.NAMESPACE - - if sp["endpoints"]: - for (endpoint, instlist) in do_endpoints(sp["endpoints"], - ENDPOINTS["sp"]).items(): - setattr(spsso, endpoint, instlist) - - if cert: - spsso.key_descriptor=do_key_descriptor(cert) - - for key in ["want_assertions_signed", "authn_requests_signed"]: - try: - setattr(spsso, key, "%s" % sp[key]) - except KeyError: - setattr(spsso, key, DEFAULTS[key]) - - requested_attributes = [] - if "required_attributes" in sp: - requested_attributes.extend(do_requested_attribute( - sp["required_attributes"], - acs, - is_required="true")) - - if "optional_attributes" in sp: - requested_attributes.extend(do_requested_attribute( - sp["optional_attributes"], - acs, - is_required="false")) - - if requested_attributes: - spsso.attribute_consuming_service = [md.AttributeConsumingService( - requested_attribute=requested_attributes, - service_name= [md.ServiceName(lang="en",text=sp["name"])] - )] - try: - spsso.attribute_consuming_service[0].service_description = [ - md.ServiceDescription(text=sp["description"])] - except KeyError: - pass - - # if "discovery_service" in sp: - # spsso.extensions= {"extension_elements":[ - # { - # "tag":"DiscoveryResponse", - # "namespace":md.IDPDISC, - # "attributes": { - # "index":"1", - # "binding": md.IDPDISC, - # "location":sp["url"] - # } - # } - # ]} - - return spsso - -def do_idp_sso_descriptor(idp, cert=None): - idpsso = md.IDPSSODescriptor() - idpsso.protocol_support_enumeration=samlp.NAMESPACE - - if idp["endpoints"]: - for (endpoint, instlist) in do_endpoints(idp["endpoints"], - ENDPOINTS["idp"]).items(): - setattr(idpsso, endpoint, instlist) - - if cert: - idpsso.key_descriptor=do_key_descriptor(cert) - - for key in ["want_authn_requests_signed"]: - try: - setattr(idpsso, key, "%s" % idp[key]) - except KeyError: - setattr(idpsso, key, DEFAULTS[key]) - - return idpsso - -def do_aa_descriptor(aa, cert): - aa = md.AttributeAuthorityDescriptor() - aa.protocol_support_enumeration=samlp.NAMESPACE - - if idp["endpoints"]: - for (endpoint, instlist) in do_endpoints(aa["endpoints"], - ENDPOINTS["aa"]).items(): - setattr(aasso, endpoint, instlist) - - if cert: - aa.key_descriptor=do_key_descriptor(cert) - - return aa - -def entity_descriptor(confd, valid_for): - mycert = "".join(open(confd["cert_file"]).readlines()[1:-1]) - - if "attribute_map_dir" in confd: - attrconverters = ac_factory(confd["attribute_map_dir"]) - else: - attrconverters = [AttributeConverter()] - - #if "attribute_maps" in confd: - # (forward,backward) = parse_attribute_map(confd["attribute_maps"]) - #else: - # backward = {} - - ed = md.EntityDescriptor() - ed.entity_id=confd["entityid"] - - if valid_for: - ed.valid_until = in_a_while(hours=valid_for) - - ed.organization = do_organization_info(confd) - ed.contact_person = do_contact_person_info(confd) - - if "sp" in confd["service"]: - # The SP - ed.spsso_descriptor = do_sp_sso_descriptor(confd["service"]["sp"], - attrconverters, mycert) - if "idp" in confd["service"]: - ed.idpsso_descriptor = do_idp_sso_descriptor( - confd["service"]["idp"], mycert) - if "aa" in confd["service"]: - ed.attribute_authority_descriptor = do_aa_descriptor( - confd["service"]["aa"], mycert) - - return ed - -def entities_descriptor(eds, valid_for, name, id, sign, sc): - entities = md.EntitiesDescriptor(entity_descriptor= eds) - if valid_for: - entities.valid_until = in_a_while(hours=valid_for) - if name: - entities.name = name - if id: - entities.id = id - - if sign: - entities.signature = pre_signature_part(id) - - if sign: - entities = sc.sign_statement_using_xmlsec("%s" % entities, - class_name(entities)) - return entities - def main(args): try: @@ -375,8 +71,8 @@ def main(args): confd = eval(open(conf).read()) eds.append(entity_descriptor(confd, valid_for)) - sc = SecurityContext(xmlsec, keyfile) - print entities_descriptor(eds, valid_for, name, id, sign, sc) + secc = SecurityContext(xmlsec, keyfile) + print entities_descriptor(eds, valid_for, name, id, sign, secc) if __name__ == "__main__": import sys