Mapping between protocol request and metadata service description

This commit is contained in:
Roland Hedberg
2012-11-19 12:45:40 +01:00
parent 411fd23050
commit ce15badb7d

View File

@@ -59,6 +59,28 @@ from saml2.country_codes import D_COUNTRIES
logger = logging.getLogger(__name__)
REQ2SRV = {
# IDP
"authn_request": "single_sign_on_service",
"nameid_mapping_request": "name_id_mapping_service",
# AuthnAuthority
"authn_query": "authn_query_service",
# AttributeAuthority
"attribute_query": "attribute_service",
# PDP
"authz_decision_query": "authz_service",
# AuthnAuthority + IDP + PDP + AttributeAuthority
"assertion_id_request": "assertion_id_request_service",
# IDP + SP
"logout_query": "single_logout_service",
"manage_nameid_query": "manage_name_id_service",
"artifact_query": "artifact_resolution_service",
# SP
"assertion_response": "assertion_consumer_service",
"attribute_response": "attribute_consuming_service",
}
def metadata_extension_modules():
_pre = "saml2.extension"
res = []
@@ -106,6 +128,7 @@ class MetaData(object):
self._keys = {}
self._extension_modules = metadata_extension_modules()
self.post_load_process = post_load_process
self.entities_descr = {}
def _extensions(self, entity):
if entity.extensions:
@@ -401,6 +424,7 @@ class MetaData(object):
#print >> sys.stderr, "Loading %s" % (source,)
entities_descr = md.entities_descriptor_from_string(xml_str)
self.entities_descr[source] = entities_descr
if not entities_descr:
entity_descr = md.entity_descriptor_from_string(xml_str)
if entity_descr:
@@ -466,8 +490,55 @@ class MetaData(object):
loc[sso.binding] = sso.location
return loc
def _loc(self, desc, service, binding=None):
loc = []
for des in desc:
endps = getattr(des, service, None)
if endps:
for endp in endps:
if binding:
if binding == endp.binding:
loc.append(endp.location)
else:
loc.append((endp.binding, endp.location))
return loc
@keep_updated
def single_sign_on_services(self, entity_id,
def _service(self, entity_id, service, binding=None, typ=None):
""" Get me all single-sign-on services with a specified
entity ID that supports the specified version of binding.
:param entity_id: The EntityId
:param binding: A binding identifier
:param service: which service that is sought for
:param typ: Type of service (idp, attribute_authority, ...)
:return: list of single-sign-on service location run by the entity
with the specified EntityId. Or if no binding was specified
a list of 2-tuples (binding, location)
"""
loc = []
if typ is None:
try:
srv = self.entity[entity_id]
except KeyError:
return loc
for item in ["idpsso", "attribute_authority", "authn_authority",
"pdp", "role", "spsso"]:
if item in srv:
loc.extend(self._loc(srv[item], service, binding))
else:
try:
desc = self.entity[entity_id][typ]
except KeyError:
return loc
loc.extend(self._loc(desc, service, binding))
return loc
def single_sign_on_service(self, entity_id,
binding = BINDING_HTTP_REDIRECT):
""" Get me all single-sign-on services with a specified
entity ID that supports the specified version of binding.
@@ -478,24 +549,25 @@ class MetaData(object):
with the specified EntityId.
"""
loc = []
try:
idps = self.entity[entity_id]["idpsso"]
except KeyError:
return loc
#print idps
for idp in idps:
#print "==",idp.keyswv()
for sso in idp.single_sign_on_service:
#print "SSO",sso
if binding == sso.binding:
loc.append(sso.location)
return loc
return self._service(entity_id, "single_sign_on_service", binding,
"idpsso")
def name_id_mapping_service(self, entity_id, binding=BINDING_HTTP_REDIRECT):
""" Get me all nameID mapping services with a specified
entity ID that supports the specified version of binding.
:param entity_id: The EntityId
:param binding: A binding identifier
:return: list of single-sign-on service location run by the entity
with the specified EntityId.
"""
return self._service(entity_id, "name_id_mapping_service", binding,
"idpsso")
@keep_updated
def single_sign_on_services_with_uiinfo(self, entity_id,
binding = BINDING_HTTP_REDIRECT):
binding=BINDING_HTTP_REDIRECT):
""" Get me all single-sign-on services with a specified
entity ID that supports the specified version of binding.
@@ -524,35 +596,125 @@ class MetaData(object):
loc.append((sso.location, uiinfo))
return loc
@keep_updated
def single_logout_services(self, entity_id, typ,
binding = BINDING_HTTP_REDIRECT):
def single_logout_service(self, entity_id, binding=BINDING_HTTP_REDIRECT,
typ=None):
""" Get me all single-logout services that supports the specified
binding version.
:param entity_id: The EntityId
:param typ: "sp", "idp" or "aa"
:param binding: A binding identifier
:param typ: If a specific service is wanted
:return: list of single-logout service location run by the entity
with the specified EntityId.
"""
# May raise KeyError
#print >> sys.stderr, "%s" % self.entity[entity_id]
return self._service(entity_id, "single_logout_service", binding, typ)
loc = []
try:
sss = self.entity[entity_id]["%ssso" % typ]
except KeyError:
return loc
def authn_query_service(self, entity_id, binding=BINDING_SOAP):
""" Get me all authn_query services that supports the specified
binding version.
:param entity_id: The EntityId
:param binding: A binding identifier
:return: list service locations.
"""
return self._service(entity_id, "single_logout_service", binding,
typ="authn_authority")
def attribute_service(self, entity_id, binding=BINDING_HTTP_REDIRECT):
""" Get me all authn_query services that supports the specified
binding version.
:param entity_id: The EntityId
:param binding: A binding identifier
:return: list service locations.
"""
return self._service(entity_id, "attribute_service", binding,
typ="attribute_authority")
def authz_service(self, entity_id, binding=BINDING_SOAP, typ="pdp"):
""" Get me all authz services that supports the specified
binding version.
:param entity_id: The EntityId
:param binding: A binding identifier
:return: list service locations.
"""
return self._service(entity_id, "authz_service", binding, typ)
def assertion_id_request_service(self, entity_id,
binding=BINDING_SOAP, typ=None):
""" Get me all assertion_id_request services that supports the specified
binding version.
:param entity_id: The EntityId
:param binding: A binding identifier
:param typ: Type of service container (idpsso, .. )
:return: list service locations.
"""
return self._service(entity_id, "attribute_service", binding,
typ)
def manage_name_id_service(self, entity_id,
binding=BINDING_HTTP_REDIRECT, typ=None):
""" Get me all manage_name_id services that supports the specified
binding version.
:param entity_id: The EntityId
:param binding: A binding identifier
:param typ: Type of service container (idpsso, .. )
:return: list service locations.
"""
return self._service(entity_id, "manage_name_id_service", binding,
typ)
def artifact_resolution_service(self, entity_id,
binding=BINDING_HTTP_REDIRECT, typ=None):
""" Get me all artifact_resolution services that supports the specified
binding version.
:param entity_id: The EntityId
:param binding: A binding identifier
:param typ: Type of service container (idpsso, .. )
:return: list service locations.
"""
return self._service(entity_id, "artifact_resolution_service", binding,
typ)
def assertion_consumer_service(self, entity_id, binding=BINDING_HTTP_POST,
typ="spsso"):
""" Get me all artifact_resolution services that supports the specified
binding version.
:param entity_id: The EntityId
:param binding: A binding identifier
:param typ: Type of service container (idpsso, .. )
:return: list service locations.
"""
return self._service(entity_id, "assertion_consumer_service", binding,
typ)
def attribute_consuming_service(self, entity_id,
binding=BINDING_HTTP_REDIRECT, typ="spsso"):
""" Get me all artifact_resolution services that supports the specified
binding version.
:param entity_id: The EntityId
:param binding: A binding identifier
:param typ: Type of service container (idpsso, .. )
:return: list service locations.
"""
return self._service(entity_id, "attribute_consuming_service", binding,
typ)
for entity in sss:
for slo in entity.single_logout_service:
if binding == slo.binding:
loc.append(slo.location)
return loc
@keep_updated
def attribute_authority(self, entity_id):
try:
@@ -567,17 +729,6 @@ class MetaData(object):
except KeyError:
return []
def authz_service_endpoints(self, entity_id, binding=BINDING_SOAP):
try:
result = []
for pdp in self.entity[entity_id]["pdp"]:
for aserv in pdp.authz_service:
if aserv.binding == binding:
result.append(aserv.location)
return result
except KeyError:
return []
def locations(self):
""" Returns all the locations that are know using this metadata file.