Merge pull request #284 from rebeckag/optional_keydescriptor

KeyDescriptor is optional in metadata
This commit is contained in:
Roland Hedberg
2015-11-17 09:16:23 +01:00
2 changed files with 109 additions and 105 deletions

View File

@@ -314,7 +314,43 @@ class MetaData(object):
'''
Returns certificates for the given Entity
'''
raise NotImplementedError
ent = self[entity_id]
def extract_certs(srvs):
res = []
for srv in srvs:
if "key_descriptor" in srv:
for key in srv["key_descriptor"]:
if "use" in key and key["use"] == use:
for dat in key["key_info"]["x509_data"]:
cert = repack_cert(
dat["x509_certificate"]["text"])
if cert not in res:
res.append(cert)
elif not "use" in key:
for dat in key["key_info"]["x509_data"]:
cert = repack_cert(
dat["x509_certificate"]["text"])
if cert not in res:
res.append(cert)
return res
if descriptor == "any":
res = []
for descr in ["spsso", "idpsso", "role", "authn_authority",
"attribute_authority", "pdp"]:
try:
srvs = ent["%s_descriptor" % descr]
except KeyError:
continue
res.extend(extract_certs(srvs))
else:
srvs = ent["%s_descriptor" % descriptor]
res = extract_certs(srvs)
return res
class InMemoryMetaData(MetaData):
@@ -511,45 +547,6 @@ class InMemoryMetaData(MetaData):
return res
def certs(self, entity_id, descriptor, use="signing"):
ent = self.__getitem__(entity_id)
if descriptor == "any":
res = []
for descr in ["spsso", "idpsso", "role", "authn_authority",
"attribute_authority", "pdp"]:
try:
srvs = ent["%s_descriptor" % descr]
except KeyError:
continue
for srv in srvs:
for key in srv["key_descriptor"]:
if "use" in key and key["use"] == use:
for dat in key["key_info"]["x509_data"]:
cert = repack_cert(
dat["x509_certificate"]["text"])
if cert not in res:
res.append(cert)
elif not "use" in key:
for dat in key["key_info"]["x509_data"]:
cert = repack_cert(
dat["x509_certificate"]["text"])
if cert not in res:
res.append(cert)
else:
srvs = ent["%s_descriptor" % descriptor]
res = []
for srv in srvs:
for key in srv["key_descriptor"]:
if "use" in key and key["use"] == use:
for dat in key["key_info"]["x509_data"]:
res.append(dat["x509_certificate"]["text"])
elif not "use" in key:
for dat in key["key_info"]["x509_data"]:
res.append(dat["x509_certificate"]["text"])
return res
def signed(self):
if self.entities_descr and self.entities_descr.signature:
return True
@@ -752,7 +749,7 @@ class MetaDataMDX(InMemoryMetaData):
raise KeyError
class MetadataStore(object):
class MetadataStore(MetaData):
def __init__(self, onts, attrc, config, ca_certs=None,
check_validity=True,
disable_ssl_certificate_validation=False,
@@ -1062,45 +1059,6 @@ class MetadataStore(object):
return name(_md[entity_id], langpref)
return None
def certs(self, entity_id, descriptor, use="signing"):
ent = self.__getitem__(entity_id)
if descriptor == "any":
res = []
for descr in ["spsso", "idpsso", "role", "authn_authority",
"attribute_authority", "pdp"]:
try:
srvs = ent["%s_descriptor" % descr]
except KeyError:
continue
for srv in srvs:
for key in srv["key_descriptor"]:
if "use" in key and key["use"] == use:
for dat in key["key_info"]["x509_data"]:
cert = repack_cert(
dat["x509_certificate"]["text"])
if cert not in res:
res.append(cert)
elif not "use" in key:
for dat in key["key_info"]["x509_data"]:
cert = repack_cert(
dat["x509_certificate"]["text"])
if cert not in res:
res.append(cert)
else:
srvs = ent["%s_descriptor" % descriptor]
res = []
for srv in srvs:
for key in srv["key_descriptor"]:
if "use" in key and key["use"] == use:
for dat in key["key_info"]["x509_data"]:
res.append(dat["x509_certificate"]["text"])
elif not "use" in key:
for dat in key["key_info"]["x509_data"]:
res.append(dat["x509_certificate"]["text"])
return res
def vo_members(self, entity_id):
ad = self.__getitem__(entity_id)["affiliation_descriptor"]
return [m["text"] for m in ad["affiliate_member"]]

View File

@@ -5,11 +5,9 @@ import re
from six.moves.urllib.parse import quote_plus
from saml2.config import Config
from saml2.httpbase import HTTPBase
from saml2.mdstore import MetadataStore, MetaDataMDX
from saml2.mdstore import destinations
from saml2.mdstore import name
from saml2 import md
from saml2 import sigver
from saml2 import BINDING_SOAP
@@ -20,7 +18,6 @@ from saml2 import saml
from saml2 import config
from saml2.attribute_converter import ac_factory
from saml2.attribute_converter import d_to_local_name
from saml2.extension import mdui
from saml2.extension import idpdisc
from saml2.extension import dri
@@ -29,12 +26,27 @@ from saml2.extension import ui
from saml2.s_utils import UnknownPrincipal
from saml2 import xmldsig
from saml2 import xmlenc
from pathutils import full_path
sec_config = config.Config()
# sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
TEST_CERT = """MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy
3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN
efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G
A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs
iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw
mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6
h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5
U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6
mrPzGzk3ECbupFnqyREH3+ZPSdk="""
TEST_METADATA_STRING = """
<EntitiesDescriptor
xmlns="urn:oasis:names:tc:SAML:2.0:metadata"
@@ -51,21 +63,8 @@ TEST_METADATA_STRING = """
<ds:KeyInfo>
<ds:X509Data>
<ds:X509Certificate>
MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy
3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN
efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G
A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs
iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw
mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6
h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5
U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6
mrPzGzk3ECbupFnqyREH3+ZPSdk=</ds:X509Certificate>
{cert_data}
</ds:X509Certificate>
</ds:X509Data>
</ds:KeyInfo>
</KeyDescriptor>
@@ -85,7 +84,7 @@ TEST_METADATA_STRING = """
</ContactPerson>
</EntityDescriptor>
</EntitiesDescriptor>
"""
""".format(cert_data=TEST_CERT)
ONTS = {
saml.NAMESPACE: saml,
@@ -149,7 +148,7 @@ METADATACONF = {
}],
"11": [{
"class": "saml2.mdstore.InMemoryMetaData",
"metadata": [(TEST_METADATA_STRING, )]
"metadata": [(TEST_METADATA_STRING,)]
}],
}
@@ -372,7 +371,7 @@ def test_load_string():
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["11"])
#print(mds)
# print(mds)
assert len(mds.keys()) == 1
idps = mds.with_descriptor("idpsso")
@@ -384,5 +383,52 @@ def test_load_string():
assert len(certs) == 1
def test_get_certs_from_metadata():
mds = MetadataStore(ONTS.values(), ATTRCONV, None)
mds.imp(METADATACONF["11"])
certs1 = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "any")
certs2 = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "idpsso")
assert certs1[0] == certs2[0] == TEST_CERT
def test_get_certs_from_metadata_without_keydescriptor():
mds = MetadataStore(ONTS.values(), ATTRCONV, None)
mds.imp([{
"class": "saml2.mdstore.InMemoryMetaData",
"metadata": [("""
<EntitiesDescriptor
xmlns="urn:oasis:names:tc:SAML:2.0:metadata"
xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata"
xmlns:shibmeta="urn:mace:shibboleth:metadata:1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ds="http://www.w3.org/2000/09/xmldsig#"
Name="urn:mace:example.com:test-1.0">
<EntityDescriptor
entityID="http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"
xml:base="swamid-1.0/idp.umu.se-saml2.xml">
<IDPSSODescriptor protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<NameIDFormat>urn:oasis:names:tc:SAML:2.0:nameid-format:transient</NameIDFormat>
<SingleSignOnService
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
Location="http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"/>
</IDPSSODescriptor>
<Organization>
<OrganizationName xml:lang="en">Catalogix</OrganizationName>
<OrganizationDisplayName xml:lang="en">Catalogix</OrganizationDisplayName>
<OrganizationURL xml:lang="en">http://www.catalogix.se</OrganizationURL>
</Organization>
<ContactPerson contactType="technical">
<SurName>Hedberg</SurName>
<EmailAddress>datordrift@catalogix.se</EmailAddress>
</ContactPerson>
</EntityDescriptor>
</EntitiesDescriptor>""",)]
}])
certs = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "idpsso")
assert len(certs) == 0
if __name__ == "__main__":
test_load_local()
test_get_certs_from_metadata()