Merge branch 'roland_master' into example_updates

This commit is contained in:
Mathias Hedström
2015-10-06 13:33:06 +02:00
22 changed files with 1446 additions and 1022 deletions

View File

@@ -1,14 +1,18 @@
language: python
sudo: false
env:
- TOX_ENV=py27
- TOX_ENV=py34
addons:
apt:
packages:
- xmlsec1
services:
- mongodb
install:
- sudo apt-get install xmlsec1
script:
- ./setup.py test

View File

@@ -4,12 +4,14 @@ import logging
import re
import argparse
import os
from future.backports.http.cookies import SimpleCookie
import six
from saml2.extension.pefim import SPCertEnc
from saml2.metadata import create_metadata_string
import service_conf
from Cookie import SimpleCookie
from urlparse import parse_qs
from six.moves.urllib.parse import parse_qs
import sys
from saml2 import BINDING_HTTP_REDIRECT, element_to_extension_element
@@ -59,7 +61,7 @@ def dict_to_table(ava, lev=0, width=1):
txt = ['<table border=%s bordercolor="black">\n' % width]
for prop, valarr in ava.items():
txt.append("<tr>\n")
if isinstance(valarr, basestring):
if isinstance(valarr, six.string_types):
txt.append("<th>%s</th>\n" % str(prop))
try:
txt.append("<td>%s</td>\n" % valarr.encode("utf8"))

View File

@@ -1,7 +1,4 @@
import logging
from urllib import urlencode
from urlparse import parse_qs
from urlparse import urlsplit
import six
import time
import ldap
@@ -13,6 +10,8 @@ from saml2.httputil import Redirect
from saml2.httputil import Unauthorized
from saml2.httputil import parse_cookie
from six.moves.urllib.parse import urlencode, parse_qs, urlsplit
__author__ = 'rolandh'
logger = logging.getLogger(__name__)

View File

@@ -14,6 +14,8 @@ from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_SOAP
import saml2.xmldsig as ds
from saml2.ident import decode, code
from saml2.httpbase import HTTPError
from saml2.s_utils import sid
@@ -161,7 +163,7 @@ class Saml2Client(Base):
return self.do_logout(name_id, entity_ids, reason, expire, sign)
def do_logout(self, name_id, entity_ids, reason, expire, sign=None,
expected_binding=None):
expected_binding=None, **kwargs):
"""
:param name_id: Identifier of the Subject (a NameID instance)
@@ -172,6 +174,7 @@ class Saml2Client(Base):
:param sign: Whether to sign the request or not
:param expected_binding: Specify the expected binding then not try it
all
:param kwargs: Extra key word arguments.
:return:
"""
# check time
@@ -203,9 +206,14 @@ class Saml2Client(Base):
destination = destinations(srvs)[0]
logger.info("destination to provider: %s" % destination)
try:
session_info = self.users.get_info_from(name_id, entity_id)
session_indexes = [session_info['session_index']]
except KeyError:
session_indexes = None
req_id, request = self.create_logout_request(
destination, entity_id, name_id=name_id, reason=reason,
expire=expire)
expire=expire, session_indexes=session_indexes)
# to_sign = []
if binding.startswith("http://"):
@@ -214,15 +222,23 @@ class Saml2Client(Base):
if sign is None:
sign = self.logout_requests_signed
sigalg = None
key = None
if sign:
if binding == BINDING_HTTP_REDIRECT:
sigalg = kwargs.get("sigalg", ds.sig_default)
key = kwargs.get("key", self.signkey)
srequest = str(request)
else:
srequest = self.sign(request)
else:
srequest = "%s" % request
srequest = str(request)
relay_state = self._relay_state(req_id)
http_info = self.apply_binding(binding, srequest, destination,
relay_state)
relay_state, sigalg=sigalg,
key=key)
if binding == BINDING_SOAP:
response = self.send(**http_info)

View File

@@ -6,8 +6,6 @@
to conclude its tasks.
"""
import threading
from six.moves.urllib.parse import urlencode
from six.moves.urllib.parse import urlparse
import six
from saml2.entity import Entity
@@ -26,8 +24,11 @@ import time
from saml2.soap import make_soap_enveloped_saml_thingy
from six.moves.urllib.parse import parse_qs
from six.moves.urllib.parse import urlencode
from six.moves.urllib.parse import urlparse
from saml2.s_utils import signature, UnravelError, exception_trace
from saml2.s_utils import signature
from saml2.s_utils import UnravelError
from saml2.s_utils import do_attributes
from saml2 import samlp, BINDING_SOAP, SAMLError

View File

@@ -1,4 +1,5 @@
#!/usr/bin/env python
from saml2.saml import NAME_FORMAT_URI
__author__ = 'rolandh'
@@ -93,6 +94,7 @@ SP_ARGS = [
"ecp",
"name_id_format",
"logout_requests_signed",
"requested_attribute_name_format"
]
AA_IDP_ARGS = [
@@ -236,6 +238,7 @@ class Config(object):
self.extensions = {}
self.attribute = []
self.attribute_profile = []
self.requested_attribute_name_format = NAME_FORMAT_URI
def setattr(self, context, attr, val):
if context == "":

View File

@@ -795,7 +795,7 @@ class MetadataStore(object):
self.ii += 1
key = self.ii
kwargs.update(_args)
_md = MetaData(self.onts, self.attrc, args[0], **kwargs)
_md = InMemoryMetaData(self.onts, self.attrc, args[0])
elif typ == "remote":
key = kwargs["url"]
for _key in ["node_name", "check_validity"]:

View File

@@ -59,6 +59,7 @@ bMDNS = b'"urn:oasis:names:tc:SAML:2.0:metadata"'
XMLNSXS = " xmlns:xs=\"http://www.w3.org/2001/XMLSchema\""
bXMLNSXS = b" xmlns:xs=\"http://www.w3.org/2001/XMLSchema\""
def metadata_tostring_fix(desc, nspair, xmlstring=""):
if not xmlstring:
xmlstring = desc.to_string(nspair)
@@ -73,8 +74,8 @@ def metadata_tostring_fix(desc, nspair, xmlstring=""):
return xmlstring
def create_metadata_string(configfile, config, valid, cert, keyfile, mid, name,
sign):
def create_metadata_string(configfile, config=None, valid=None, cert=None,
keyfile=None, mid=None, name=None, sign=None):
valid_for = 0
nspair = {"xs": "http://www.w3.org/2001/XMLSchema"}
# paths = [".", "/opt/local/bin"]
@@ -83,27 +84,22 @@ def create_metadata_string(configfile, config, valid, cert, keyfile, mid, name,
valid_for = int(valid) # Hours
eds = []
if config is not None:
eds.append(entity_descriptor(config))
else:
if config is None:
if configfile.endswith(".py"):
configfile = configfile[:-3]
config = Config().load_file(configfile, metadata_construction=True)
eds.append(entity_descriptor(config))
conf = Config()
conf.key_file = keyfile
conf.cert_file = cert
conf.key_file = config.key_file or keyfile
conf.cert_file = config.cert_file or cert
conf.debug = 1
conf.xmlsec_binary = config.xmlsec_binary
secc = security_context(conf)
if mid:
desc = entities_descriptor(eds, valid_for, name, mid,
eid, xmldoc = entities_descriptor(eds, valid_for, name, mid,
sign, secc)
valid_instance(desc)
return metadata_tostring_fix(desc, nspair)
else:
eid = eds[0]
if sign:
@@ -112,8 +108,7 @@ def create_metadata_string(configfile, config, valid, cert, keyfile, mid, name,
xmldoc = None
valid_instance(eid)
xmldoc = metadata_tostring_fix(eid, nspair, xmldoc)
return xmldoc
return metadata_tostring_fix(eid, nspair, xmldoc)
def _localized_name(val, klass):
@@ -239,15 +234,19 @@ def do_key_descriptor(cert=None, enc_cert=None, use="both"):
return kd_list
def do_requested_attribute(attributes, acs, is_required="false"):
def do_requested_attribute(attributes, acs, is_required="false",
name_format=NAME_FORMAT_URI):
lista = []
for attr in attributes:
attr = from_local_name(acs, attr, NAME_FORMAT_URI)
attr = from_local_name(acs, attr, name_format)
args = {}
if isinstance(attr, six.string_types):
args["name"] = attr
else:
for key in attr.keyswv():
args[key] = getattr(attr, key)
args["is_required"] = is_required
args["name_format"] = NAME_FORMAT_URI
args["name_format"] = name_format
lista.append(md.RequestedAttribute(**args))
return lista
@@ -344,6 +343,7 @@ def do_idpdisc(discovery_response):
return idpdisc.DiscoveryResponse(index="0", location=discovery_response,
binding=idpdisc.NAMESPACE)
ENDPOINTS = {
"sp": {
"artifact_resolution_service": (md.ArtifactResolutionService, True),
@@ -423,7 +423,8 @@ def do_endpoints(conf, endpoints):
servs = []
i = 1
for args in conf[endpoint]:
if isinstance(args, six.string_types): # Assume it's the location
if isinstance(args,
six.string_types): # Assume it's the location
args = {"location": args,
"binding": DEFAULT_BINDING[endpoint]}
elif isinstance(args, tuple) or isinstance(args, list):
@@ -451,6 +452,7 @@ def do_endpoints(conf, endpoints):
pass
return service
DEFAULT = {
"want_assertions_signed": "true",
"authn_requests_signed": "false",
@@ -460,19 +462,25 @@ DEFAULT = {
def do_attribute_consuming_service(conf, spsso):
service_description = service_name = None
requested_attributes = []
acs = conf.attribute_converters
req = conf.getattr("required_attributes", "sp")
req_attr_name_format = conf.getattr("requested_attribute_name_format", "sp")
if req_attr_name_format is None:
req_attr_name_format = conf.requested_attribute_name_format
if req:
requested_attributes.extend(do_requested_attribute(req, acs,
is_required="true"))
requested_attributes.extend(
do_requested_attribute(req, acs, is_required="true",
name_format=req_attr_name_format))
opt = conf.getattr("optional_attributes", "sp")
if opt:
requested_attributes.extend(do_requested_attribute(opt, acs))
requested_attributes.extend(
do_requested_attribute(opt, acs, name_format=req_attr_name_format))
try:
if conf.description:
@@ -548,7 +556,8 @@ def do_spsso_descriptor(conf, cert=None, enc_cert=None):
if cert or enc_cert:
metadata_key_usage = conf.metadata_key_usage
spsso.key_descriptor = do_key_descriptor(cert=cert, enc_cert=enc_cert, use=metadata_key_usage)
spsso.key_descriptor = do_key_descriptor(cert=cert, enc_cert=enc_cert,
use=metadata_key_usage)
for key in ["want_assertions_signed", "authn_requests_signed"]:
try:
@@ -596,7 +605,8 @@ def do_idpsso_descriptor(conf, cert=None, enc_cert=None):
idpsso.extensions.add_extension_element(do_uiinfo(ui_info))
if cert or enc_cert:
idpsso.key_descriptor = do_key_descriptor(cert, enc_cert, use=conf.metadata_key_usage)
idpsso.key_descriptor = do_key_descriptor(cert, enc_cert,
use=conf.metadata_key_usage)
for key in ["want_authn_requests_signed"]:
# "want_authn_requests_only_with_valid_cert"]:
@@ -626,7 +636,8 @@ def do_aa_descriptor(conf, cert=None, enc_cert=None):
_do_nameid_format(aad, conf, "aa")
if cert or enc_cert:
aad.key_descriptor = do_key_descriptor(cert, enc_cert, use=conf.metadata_key_usage)
aad.key_descriptor = do_key_descriptor(cert, enc_cert,
use=conf.metadata_key_usage)
attributes = conf.getattr("attribute", "aa")
if attributes:
@@ -655,7 +666,8 @@ def do_aq_descriptor(conf, cert=None, enc_cert=None):
_do_nameid_format(aqs, conf, "aq")
if cert or enc_cert:
aqs.key_descriptor = do_key_descriptor(cert, enc_cert, use=conf.metadata_key_usage)
aqs.key_descriptor = do_key_descriptor(cert, enc_cert,
use=conf.metadata_key_usage)
return aqs
@@ -676,7 +688,8 @@ def do_pdp_descriptor(conf, cert=None, enc_cert=None):
_do_nameid_format(pdp, conf, "pdp")
if cert:
pdp.key_descriptor = do_key_descriptor(cert, enc_cert, use=conf.metadata_key_usage)
pdp.key_descriptor = do_key_descriptor(cert, enc_cert,
use=conf.metadata_key_usage)
return pdp
@@ -693,7 +706,8 @@ def entity_descriptor(confd):
if confd.encryption_keypairs is not None:
enc_cert = []
for _encryption in confd.encryption_keypairs:
enc_cert.append("".join(open(_encryption["cert_file"]).readlines()[1:-1]))
enc_cert.append(
"".join(open(_encryption["cert_file"]).readlines()[1:-1]))
entd = md.EntityDescriptor()
entd.entity_id = confd.entityid
@@ -727,13 +741,15 @@ def entity_descriptor(confd):
entd.idpsso_descriptor = do_idpsso_descriptor(confd, mycert, enc_cert)
if "aa" in serves:
confd.context = "aa"
entd.attribute_authority_descriptor = do_aa_descriptor(confd, mycert, enc_cert)
entd.attribute_authority_descriptor = do_aa_descriptor(confd, mycert,
enc_cert)
if "pdp" in serves:
confd.context = "pdp"
entd.pdp_descriptor = do_pdp_descriptor(confd, mycert, enc_cert)
if "aq" in serves:
confd.context = "aq"
entd.authn_authority_descriptor = do_aq_descriptor(confd, mycert, enc_cert)
entd.authn_authority_descriptor = do_aq_descriptor(confd, mycert,
enc_cert)
return entd

View File

@@ -265,6 +265,7 @@ class StatusResponse(object):
self.require_response_signature = False
self.not_signed = False
self.asynchop = asynchop
self.do_not_verify = False
def _clear(self):
self.xmlstr = ""
@@ -316,10 +317,16 @@ class StatusResponse(object):
else:
self.origxml = self.xmlstr
if self.do_not_verify:
args = {"do_not_verify": True}
else:
args = {}
try:
self.response = self.signature_check(
xmldata, origdoc=origxml, must=self.require_signature,
require_response_signature=self.require_response_signature)
require_response_signature=self.require_response_signature,
**args)
except TypeError:
raise
@@ -759,7 +766,7 @@ class AuthnResponse(StatusResponse):
raise SignatureError("Signature missing for assertion")
else:
logger.debug("signed")
if not verified:
if not verified and self.do_not_verify is False:
try:
self.sec.check_signature(assertion, class_name(assertion),self.xmlstr)
except Exception as exc:
@@ -990,6 +997,10 @@ class AuthnResponse(StatusResponse):
res = []
for astat in self.assertion.authn_statement:
context = astat.authn_context
try:
authn_instant = astat.authn_instant
except AttributeError:
authn_instant = ""
if context:
try:
aclass = context.authn_context_class_ref.text
@@ -1000,7 +1011,7 @@ class AuthnResponse(StatusResponse):
context.authenticating_authority]
except AttributeError:
authn_auth = []
res.append((aclass, authn_auth))
res.append((aclass, authn_auth, authn_instant))
return res
def authz_decision_info(self):
@@ -1025,9 +1036,11 @@ class AuthnResponse(StatusResponse):
"issuer": self.issuer(), "not_on_or_after": nooa,
"authz_decision_info": self.authz_decision_info()}
else:
authn_statement = self.assertion.authn_statement[0]
return {"ava": self.ava, "name_id": self.name_id,
"came_from": self.came_from, "issuer": self.issuer(),
"not_on_or_after": nooa, "authn_info": self.authn_info()}
"not_on_or_after": nooa, "authn_info": self.authn_info(),
"session_index": authn_statement.session_index}
def __str__(self):
if not isinstance(self.xmlstr, six.string_types):

View File

@@ -644,7 +644,7 @@ class Server(Entity):
encrypted_advice_attributes=encrypted_advice_attributes,
encrypt_cert_advice=encrypt_cert_advice,
encrypt_cert_assertion=encrypt_cert_assertion,
pefim=peifm)
pefim=pefim)
return self._authn_response(in_response_to, # in_response_to
destination, # consumer_url
sp_entity_id, # sp_entity_id

View File

@@ -353,7 +353,10 @@ def make_temp(string, suffix="", decode=True, delete=True):
xmlsec function).
"""
ntf = NamedTemporaryFile(suffix=suffix, delete=delete)
assert isinstance(string, six.binary_type)
# Python3 tempfile requires byte-like object
if not isinstance(string, six.binary_type):
string = string.encode("utf8")
if decode:
ntf.write(base64.b64decode(string))
else:
@@ -657,6 +660,12 @@ LOG_LINE = 60 * "=" + "\n%s\n" + 60 * "-" + "\n%s" + 60 * "="
LOG_LINE_2 = 60 * "=" + "\n%s\n%s\n" + 60 * "-" + "\n%s" + 60 * "="
def make_str(txt):
if isinstance(txt, six.string_types):
return txt
else:
return txt.decode("utf8")
# ---------------------------------------------------------------------------
@@ -674,29 +683,32 @@ def read_cert_from_file(cert_file, cert_type):
return ""
if cert_type == "pem":
line = open(cert_file).read().replace("\r\n", "\n").split("\n")
_a = read_file(cert_file, 'rb').decode("utf8")
_b = _a.replace("\r\n", "\n")
lines = _b.split("\n")
if line[0] == "-----BEGIN CERTIFICATE-----":
line = line[1:]
elif line[0] == "-----BEGIN PUBLIC KEY-----":
line = line[1:]
for pattern in ("-----BEGIN CERTIFICATE-----",
"-----BEGIN PUBLIC KEY-----"):
if pattern in lines:
lines = lines[lines.index(pattern)+1:]
break
else:
raise CertificateError("Strange beginning of PEM file")
while line[-1] == "":
line = line[:-1]
if line[-1] == "-----END CERTIFICATE-----":
line = line[:-1]
elif line[-1] == "-----END PUBLIC KEY-----":
line = line[:-1]
for pattern in ("-----END CERTIFICATE-----",
"-----END PUBLIC KEY-----"):
if pattern in lines:
lines = lines[:lines.index(pattern)]
break
else:
raise CertificateError("Strange end of PEM file")
return "".join(line)
return make_str("".join(lines).encode("utf8"))
if cert_type in ["der", "cer", "crt"]:
data = read_file(cert_file)
return base64.b64encode(str(data))
data = read_file(cert_file, 'rb')
_cert = base64.b64encode(data)
return make_str(_cert)
class CryptoBackend():
@@ -850,8 +862,8 @@ class CryptoBackendXmlSec1(CryptoBackend):
'id','Id' or 'ID'
:return: The signed statement
"""
if not isinstance(statement, six.binary_type):
statement = str(statement).encode('utf-8')
if isinstance(statement, SamlBase):
statement = str(statement)
_, fil = make_temp(statement, suffix=".xml",
decode=False, delete=self._xmlsec_delete_tmpfiles)
@@ -1284,8 +1296,6 @@ class SecurityContext(object):
self.encryption_keypairs = encryption_keypairs
self.enc_cert_type = enc_cert_type
self.my_cert = read_cert_from_file(cert_file, cert_type)
self.cert_handler = CertHandler(self, cert_file, cert_type, key_file,
@@ -1678,29 +1688,14 @@ class SecurityContext(object):
raise TypeError("Not a Response")
if response.signature:
self._check_signature(decoded_xml, response, class_name(response),
origdoc)
if "do_not_verify" in kwargs:
pass
else:
self._check_signature(decoded_xml, response,
class_name(response), origdoc)
elif require_response_signature:
raise SignatureError("Signature missing for response")
# if isinstance(response, Response) and response.assertion:
# # Try to find the signing cert in the assertion
# for assertion in response.assertion:
# if not hasattr(assertion, 'signature') or not assertion.signature:
# logger.debug("unsigned")
# if must:
# raise SignatureError("Signature missing for assertion")
# continue
# else:
# logger.debug("signed")
#
# try:
# self._check_signature(decoded_xml, assertion,
# class_name(assertion), origdoc)
# except Exception as exc:
# logger.error("correctly_signed_response: %s" % exc)
# raise
return response
#--------------------------------------------------------------------------

View File

@@ -280,7 +280,7 @@ def before(point):
elif isinstance(point, int):
point = time.gmtime(point)
return time.gmtime() < point
return time.gmtime() <= point
def after(point):

View File

@@ -1,11 +1,11 @@
import calendar
from six.moves.urllib.parse import urlparse
import re
from saml2 import time_util
import struct
import base64
# Also defined in saml2.saml but can't import from there
from saml2 import time_util
XSI_NAMESPACE = 'http://www.w3.org/2001/XMLSchema-instance'
XSI_NIL = '{%s}nil' % XSI_NAMESPACE
# ---------------------------------------------------------
@@ -26,6 +26,14 @@ class MustValueError(ValueError):
class ShouldValueError(ValueError):
pass
class ResponseLifetimeExceed(Exception):
pass
class ToEarly(Exception):
pass
# --------------------- validators -------------------------------------
#
@@ -82,8 +90,8 @@ def validate_on_or_after(not_on_or_after, slack):
now = time_util.utc_now()
nooa = calendar.timegm(time_util.str_to_time(not_on_or_after))
if now > nooa + slack:
raise Exception("Can't use it, it's too old %d > %d" %
(nooa, now))
raise ResponseLifetimeExceed(
"Can't use it, it's too old %d > %d".format(nooa, now))
return nooa
else:
return False
@@ -94,7 +102,8 @@ def validate_before(not_before, slack):
now = time_util.utc_now()
nbefore = calendar.timegm(time_util.str_to_time(not_before))
if nbefore > now + slack:
raise Exception("Can't use it yet %d <= %d" % (nbefore, now))
raise ToEarly("Can't use it yet %d <= %d" % (nbefore,
now))
return True
@@ -447,6 +456,6 @@ def valid_instance(instance):
def valid_domain_name(dns_name):
m = re.match(
"^[a-z0-9]+([-.]{ 1 }[a-z0-9]+).[a-z]{2,5}(:[0-9]{1,5})?(\/.)?$",
dns_name, "ix")
dns_name, re.I)
if not m:
raise ValueError("Not a proper domain name")

View File

@@ -7,6 +7,7 @@
__author__ = 'roland.hedberg@umu.se (Roland Hedberg)'
import unittest
try:
from xml.etree import ElementTree
except ImportError:
@@ -23,8 +24,8 @@ from saml2.extension import shibmd
from saml2 import extension_element_to_element
import md_data, ds_data
class TestEndpointType:
class TestEndpointType:
def setup_class(self):
self.endpoint = md.EndpointType_()
@@ -37,18 +38,19 @@ class TestEndpointType:
new_endpoint = md.endpoint_type__from_string(self.endpoint.to_string())
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
assert new_endpoint.response_location == "http://www.example.com/response"
assert new_endpoint.response_location == \
"http://www.example.com/response"
def testUsingTestData(self):
"""Test for endpoint_type_from_string() using test data."""
new_endpoint = md.endpoint_type__from_string(md_data.TEST_ENDPOINT)
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
assert new_endpoint.response_location == "http://www.example.com/response"
assert new_endpoint.response_location == \
"http://www.example.com/response"
class TestIndexedEndpointType:
def setup_class(self):
self.i_e = md.IndexedEndpointType_()
@@ -68,7 +70,8 @@ class TestIndexedEndpointType:
def testUsingTestData(self):
"""Test for indexed_endpoint_type_from_string() using test data."""
new_i_e = md.indexed_endpoint_type__from_string(md_data.TEST_INDEXED_ENDPOINT)
new_i_e = md.indexed_endpoint_type__from_string(
md_data.TEST_INDEXED_ENDPOINT)
assert new_i_e.binding == saml2.BINDING_HTTP_POST
assert new_i_e.location == "http://www.example.com/endpoint"
assert new_i_e.response_location == "http://www.example.com/response"
@@ -77,7 +80,6 @@ class TestIndexedEndpointType:
class TestExtensions:
def setup_class(self):
self.extensions = md.Extensions()
@@ -94,7 +96,6 @@ class TestExtensions:
class TestOrganizationName:
def setup_class(self):
self.organization_name = md.OrganizationName()
@@ -117,7 +118,6 @@ class TestOrganizationName:
class TestOrganizationDisplayName:
def setup_class(self):
self.od_name = md.OrganizationDisplayName()
@@ -139,7 +139,6 @@ class TestOrganizationDisplayName:
class TestOrganizationURL:
def setup_class(self):
self.organization_url = md.OrganizationURL()
@@ -162,7 +161,6 @@ class TestOrganizationURL:
class TestOrganization:
def setup_class(self):
self.organization = md.Organization()
@@ -176,7 +174,8 @@ class TestOrganization:
md_data.TEST_ORGANIZATION_DISPLAY_NAME))
self.organization.organization_url.append(
md.organization_url_from_string(md_data.TEST_ORGANIZATION_URL))
new_organization = md.organization_from_string(self.organization.to_string())
new_organization = md.organization_from_string(
self.organization.to_string())
assert isinstance(new_organization.extensions, md.Extensions)
assert isinstance(new_organization.organization_name[0],
md.OrganizationName)
@@ -186,12 +185,13 @@ class TestOrganization:
md.OrganizationURL)
assert new_organization.organization_name[0].text.strip() == "Catalogix"
assert new_organization.organization_name[0].lang == "se"
assert new_organization.organization_display_name[0].text.strip() == "Catalogix"
assert new_organization.organization_display_name[
0].text.strip() == "Catalogix"
assert new_organization.organization_display_name[0].lang == "se"
assert new_organization.organization_url[0].text.strip() == "http://www.example.com/"
assert new_organization.organization_url[
0].text.strip() == "http://www.example.com/"
assert new_organization.organization_url[0].lang == "no"
def testUsingTestData(self):
"""Test for organization_from_string() using test data."""
new_organization = md.organization_from_string(
@@ -203,16 +203,18 @@ class TestOrganization:
md.OrganizationDisplayName)
assert isinstance(new_organization.organization_url[0],
md.OrganizationURL)
assert new_organization.organization_name[0].text.strip() == "Catalogix AB"
assert new_organization.organization_name[
0].text.strip() == "Catalogix AB"
assert new_organization.organization_name[0].lang == "se"
assert new_organization.organization_display_name[0].text.strip() == "Catalogix AS"
assert new_organization.organization_display_name[
0].text.strip() == "Catalogix AS"
assert new_organization.organization_display_name[0].lang == "no"
assert new_organization.organization_url[0].text.strip() == "http://www.example.com/"
assert new_organization.organization_url[
0].text.strip() == "http://www.example.com/"
assert new_organization.organization_url[0].lang == "en"
class TestContactPerson:
def setup_class(self):
self.contact_person = md.ContactPerson()
@@ -233,12 +235,16 @@ class TestContactPerson:
self.contact_person.to_string())
assert new_contact_person.contact_type == "technical"
assert isinstance(new_contact_person.extensions, md.Extensions)
assert new_contact_person.company.text.strip() == "SIOS Technology, Inc."
assert new_contact_person.company.text.strip() == "SIOS Technology, " \
"Inc."
assert new_contact_person.given_name.text.strip() == "Takashi"
assert new_contact_person.sur_name.text.strip() == "Matsuo"
assert new_contact_person.email_address[0].text.strip() == "tmatsuo@example.com"
assert new_contact_person.email_address[1].text.strip() == "tmatsuo@shehas.net"
assert new_contact_person.telephone_number[0].text.strip() == "00-0000-0000"
assert new_contact_person.email_address[
0].text.strip() == "tmatsuo@example.com"
assert new_contact_person.email_address[
1].text.strip() == "tmatsuo@shehas.net"
assert new_contact_person.telephone_number[
0].text.strip() == "00-0000-0000"
def testUsingTestData(self):
"""Test for contact_person_from_string() using test data."""
@@ -246,15 +252,19 @@ class TestContactPerson:
md_data.TEST_CONTACT_PERSON)
assert new_contact_person.contact_type == "technical"
assert isinstance(new_contact_person.extensions, md.Extensions)
assert new_contact_person.company.text.strip() == "SIOS Technology, Inc."
assert new_contact_person.company.text.strip() == "SIOS Technology, " \
"Inc."
assert new_contact_person.given_name.text.strip() == "Takashi"
assert new_contact_person.sur_name.text.strip() == "Matsuo"
assert new_contact_person.email_address[0].text.strip() == "tmatsuo@example.com"
assert new_contact_person.email_address[1].text.strip() == "tmatsuo@shehas.net"
assert new_contact_person.telephone_number[0].text.strip() == "00-0000-0000"
assert new_contact_person.email_address[
0].text.strip() == "tmatsuo@example.com"
assert new_contact_person.email_address[
1].text.strip() == "tmatsuo@shehas.net"
assert new_contact_person.telephone_number[
0].text.strip() == "00-0000-0000"
class TestAdditionalMetadataLocation:
def setup_class(self):
self.additional_metadata_location = md.AdditionalMetadataLocation()
@@ -264,17 +274,25 @@ class TestAdditionalMetadataLocation:
"http://www.example.com/namespace")
self.additional_metadata_location.text = (
"http://www.example.com/AdditionalMetadataLocation")
new_additional_metadata_location = md.additional_metadata_location_from_string(
new_additional_metadata_location = \
md.additional_metadata_location_from_string(
self.additional_metadata_location.to_string())
assert new_additional_metadata_location.namespace == "http://www.example.com/namespace"
assert new_additional_metadata_location.text.strip() == "http://www.example.com/AdditionalMetadataLocation"
assert new_additional_metadata_location.namespace == \
"http://www.example.com/namespace"
assert new_additional_metadata_location.text.strip() == \
"http://www.example.com/AdditionalMetadataLocation"
def testUsingTestData(self):
"""Test for additional_metadata_location_from_string() using test data."""
new_additional_metadata_location = md.additional_metadata_location_from_string(
"""Test for additional_metadata_location_from_string() using test
data."""
new_additional_metadata_location = \
md.additional_metadata_location_from_string(
md_data.TEST_ADDITIONAL_METADATA_LOCATION)
assert new_additional_metadata_location.namespace == "http://www.example.com/namespace"
assert new_additional_metadata_location.text.strip() == "http://www.example.com/AdditionalMetadataLocation"
assert new_additional_metadata_location.namespace == \
"http://www.example.com/namespace"
assert new_additional_metadata_location.text.strip() == \
"http://www.example.com/AdditionalMetadataLocation"
# class TestKeySize:
#
@@ -311,7 +329,6 @@ class TestAdditionalMetadataLocation:
class TestEncryptionMethod:
def setup_class(self):
self.encryption_method = md.EncryptionMethod()
@@ -321,18 +338,19 @@ class TestEncryptionMethod:
"http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p")
new_encryption_method = md.encryption_method_from_string(
self.encryption_method.to_string())
assert new_encryption_method.algorithm == "http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p"
assert new_encryption_method.algorithm == \
"http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p"
def testUsingTestData(self):
"""Test for encryption_method_from_string() using test data."""
new_encryption_method = md.encryption_method_from_string(
md_data.TEST_ENCRYPTION_METHOD)
assert new_encryption_method.algorithm == "http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p"
assert new_encryption_method.algorithm == \
"http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p"
assert new_encryption_method.oae_pparams.text.strip() == "9lWu3Q=="
class TestKeyDescriptor:
def setup_class(self):
self.key_descriptor = md.KeyDescriptor()
@@ -342,7 +360,8 @@ class TestKeyDescriptor:
self.key_descriptor.use = "signing"
self.key_descriptor.key_info = ds.key_info_from_string(
ds_data.TEST_KEY_INFO)
self.key_descriptor.encryption_method.append(md.encryption_method_from_string(
self.key_descriptor.encryption_method.append(
md.encryption_method_from_string(
md_data.TEST_ENCRYPTION_METHOD))
new_key_descriptor = md.key_descriptor_from_string(
self.key_descriptor.to_string())
@@ -374,7 +393,8 @@ class TestRoleDescriptor:
self.role_descriptor.error_url = "http://www.example.com/errorURL"
self.role_descriptor.signature = ds.Signature()
self.role_descriptor.extensions = md.Extensions()
self.role_descriptor.key_descriptor.append(md.key_descriptor_from_string(
self.role_descriptor.key_descriptor.append(
md.key_descriptor_from_string(
md_data.TEST_KEY_DESCRIPTOR))
self.role_descriptor.organization = md.Organization()
self.role_descriptor.contact_person.append(md.ContactPerson())
@@ -384,8 +404,10 @@ class TestRoleDescriptor:
assert new_role_descriptor.id == "ID"
assert new_role_descriptor.valid_until == "2008-09-14T01:05:02Z"
assert new_role_descriptor.cache_duration == "10:00:00:00"
assert new_role_descriptor.protocol_support_enumeration == samlp.NAMESPACE
assert new_role_descriptor.error_url == "http://www.example.com/errorURL"
assert new_role_descriptor.protocol_support_enumeration == \
samlp.NAMESPACE
assert new_role_descriptor.error_url == \
"http://www.example.com/errorURL"
assert isinstance(new_role_descriptor.signature, ds.Signature)
assert isinstance(new_role_descriptor.extensions, md.Extensions)
assert isinstance(new_role_descriptor.key_descriptor[0],
@@ -401,8 +423,10 @@ class TestRoleDescriptor:
assert new_role_descriptor.id == "ID"
assert new_role_descriptor.valid_until == "2008-09-14T01:05:02Z"
assert new_role_descriptor.cache_duration == "10:00:00:00"
assert new_role_descriptor.protocol_support_enumeration == samlp.NAMESPACE
assert new_role_descriptor.error_url == "http://www.example.com/errorURL"
assert new_role_descriptor.protocol_support_enumeration == \
samlp.NAMESPACE
assert new_role_descriptor.error_url == \
"http://www.example.com/errorURL"
assert isinstance(new_role_descriptor.signature, ds.Signature)
assert isinstance(new_role_descriptor.extensions, md.Extensions)
assert isinstance(new_role_descriptor.key_descriptor[0],
@@ -411,6 +435,7 @@ class TestRoleDescriptor:
assert isinstance(new_role_descriptor.contact_person[0],
md.ContactPerson)
# class TestSSODescriptor:
# def setup_class(self):
# self.sso_descriptor = md.SSODescriptorType_()
@@ -462,7 +487,6 @@ class TestRoleDescriptor:
#
class TestArtifactResolutionService:
def setup_class(self):
self.i_e = md.ArtifactResolutionService()
@@ -473,7 +497,8 @@ class TestArtifactResolutionService:
self.i_e.response_location = "http://www.example.com/response"
self.i_e.index = "1"
self.i_e.is_default = "false"
new_i_e = md.artifact_resolution_service_from_string(self.i_e.to_string())
new_i_e = md.artifact_resolution_service_from_string(
self.i_e.to_string())
assert new_i_e.binding == saml2.BINDING_HTTP_POST
assert new_i_e.location == "http://www.example.com/endpoint"
assert new_i_e.response_location == "http://www.example.com/response"
@@ -481,7 +506,8 @@ class TestArtifactResolutionService:
assert new_i_e.is_default == "false"
def testUsingTestData(self):
"""Test for artifact_resolution_service_from_string() using test data."""
"""Test for artifact_resolution_service_from_string() using test
data."""
new_i_e = md.artifact_resolution_service_from_string(
md_data.TEST_ARTIFACT_RESOLUTION_SERVICE)
assert new_i_e.binding == saml2.BINDING_HTTP_POST
@@ -492,7 +518,6 @@ class TestArtifactResolutionService:
class TestSingleLogout:
def setup_class(self):
self.endpoint = md.SingleLogoutService()
@@ -501,10 +526,12 @@ class TestSingleLogout:
self.endpoint.binding = saml2.BINDING_HTTP_POST
self.endpoint.location = "http://www.example.com/endpoint"
self.endpoint.response_location = "http://www.example.com/response"
new_endpoint = md.single_logout_service_from_string(self.endpoint.to_string())
new_endpoint = md.single_logout_service_from_string(
self.endpoint.to_string())
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
assert new_endpoint.response_location == "http://www.example.com/response"
assert new_endpoint.response_location == \
"http://www.example.com/response"
def testUsingTestData(self):
"""Test for single_logout_service_from_string() using test data."""
@@ -512,11 +539,11 @@ class TestSingleLogout:
md_data.TEST_SINGLE_LOGOUT_SERVICE)
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
assert new_endpoint.response_location == "http://www.example.com/response"
assert new_endpoint.response_location == \
"http://www.example.com/response"
class TestManageNameIDService:
def setup_class(self):
self.endpoint = md.ManageNameIDService()
@@ -525,10 +552,12 @@ class TestManageNameIDService:
self.endpoint.binding = saml2.BINDING_HTTP_POST
self.endpoint.location = "http://www.example.com/endpoint"
self.endpoint.response_location = "http://www.example.com/response"
new_endpoint = md.manage_name_id_service_from_string(self.endpoint.to_string())
new_endpoint = md.manage_name_id_service_from_string(
self.endpoint.to_string())
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
assert new_endpoint.response_location == "http://www.example.com/response"
assert new_endpoint.response_location == \
"http://www.example.com/response"
def testUsingTestData(self):
"""Test for manage_name_id_service_from_string() using test data."""
@@ -536,11 +565,11 @@ class TestManageNameIDService:
md_data.TEST_MANAGE_NAMEID_SERVICE)
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
assert new_endpoint.response_location == "http://www.example.com/response"
assert new_endpoint.response_location == \
"http://www.example.com/response"
class TestNameIDFormat:
def setup_class(self):
self.name_id_format = md.NameIDFormat()
@@ -549,17 +578,18 @@ class TestNameIDFormat:
self.name_id_format.text = saml.NAMEID_FORMAT_EMAILADDRESS
new_name_id_format = md.name_id_format_from_string(
self.name_id_format.to_string())
assert new_name_id_format.text.strip() == saml.NAMEID_FORMAT_EMAILADDRESS
assert new_name_id_format.text.strip() == \
saml.NAMEID_FORMAT_EMAILADDRESS
def testUsingTestData(self):
"""Test for name_id_format_from_string() using test data."""
new_name_id_format = md.name_id_format_from_string(
md_data.TEST_NAME_ID_FORMAT)
assert new_name_id_format.text.strip() == saml.NAMEID_FORMAT_EMAILADDRESS
assert new_name_id_format.text.strip() == \
saml.NAMEID_FORMAT_EMAILADDRESS
class TestSingleSignOnService:
def setup_class(self):
self.endpoint = md.SingleSignOnService()
@@ -568,10 +598,12 @@ class TestSingleSignOnService:
self.endpoint.binding = saml2.BINDING_HTTP_POST
self.endpoint.location = "http://www.example.com/endpoint"
self.endpoint.response_location = "http://www.example.com/response"
new_endpoint = md.single_sign_on_service_from_string(self.endpoint.to_string())
new_endpoint = md.single_sign_on_service_from_string(
self.endpoint.to_string())
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
assert new_endpoint.response_location == "http://www.example.com/response"
assert new_endpoint.response_location == \
"http://www.example.com/response"
def testUsingTestData(self):
"""Test for SingelSignOn_service_from_string() using test data."""
@@ -579,10 +611,11 @@ class TestSingleSignOnService:
md_data.TEST_SINGLE_SIGN_ON_SERVICE)
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
assert new_endpoint.response_location == "http://www.example.com/response"
assert new_endpoint.response_location == \
"http://www.example.com/response"
class TestNameIDMappingService:
def setup_class(self):
self.endpoint = md.NameIDMappingService()
@@ -591,10 +624,12 @@ class TestNameIDMappingService:
self.endpoint.binding = saml2.BINDING_HTTP_POST
self.endpoint.location = "http://www.example.com/endpoint"
self.endpoint.response_location = "http://www.example.com/response"
new_endpoint = md.name_id_mapping_service_from_string(self.endpoint.to_string())
new_endpoint = md.name_id_mapping_service_from_string(
self.endpoint.to_string())
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
assert new_endpoint.response_location == "http://www.example.com/response"
assert new_endpoint.response_location == \
"http://www.example.com/response"
def testUsingTestData(self):
"""Test for name_id_mapping_service_from_string() using test data."""
@@ -602,10 +637,11 @@ class TestNameIDMappingService:
md_data.TEST_NAME_ID_MAPPING_SERVICE)
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
assert new_endpoint.response_location == "http://www.example.com/response"
assert new_endpoint.response_location == \
"http://www.example.com/response"
class TestAssertionIDRequestService:
def setup_class(self):
self.endpoint = md.AssertionIDRequestService()
@@ -618,18 +654,21 @@ class TestAssertionIDRequestService:
self.endpoint.to_string())
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
assert new_endpoint.response_location == "http://www.example.com/response"
assert new_endpoint.response_location == \
"http://www.example.com/response"
def testUsingTestData(self):
"""Test for assertion_id_request_service_from_string() using test data."""
"""Test for assertion_id_request_service_from_string() using test
data."""
new_endpoint = md.assertion_id_request_service_from_string(
md_data.TEST_ASSERTION_ID_REQUEST_SERVICE)
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
assert new_endpoint.response_location == "http://www.example.com/response"
assert new_endpoint.response_location == \
"http://www.example.com/response"
class TestAttributeProfile:
def setup_class(self):
self.attribute_profile = md.AttributeProfile()
@@ -638,13 +677,15 @@ class TestAttributeProfile:
self.attribute_profile.text = saml.PROFILE_ATTRIBUTE_BASIC
new_attribute_profile = md.attribute_profile_from_string(
self.attribute_profile.to_string())
assert new_attribute_profile.text.strip() == saml.PROFILE_ATTRIBUTE_BASIC
assert new_attribute_profile.text.strip() == \
saml.PROFILE_ATTRIBUTE_BASIC
def testUsingTestData(self):
"""Test for name_id_format_from_string() using test data."""
new_attribute_profile = md.attribute_profile_from_string(
md_data.TEST_ATTRIBUTE_PROFILE)
assert new_attribute_profile.text.strip() == saml.PROFILE_ATTRIBUTE_BASIC
assert new_attribute_profile.text.strip() == \
saml.PROFILE_ATTRIBUTE_BASIC
class TestIDPSSODescriptor:
@@ -661,7 +702,8 @@ class TestIDPSSODescriptor:
self.idp_sso_descriptor.error_url = "http://www.example.com/errorURL"
self.idp_sso_descriptor.signature = ds.Signature()
self.idp_sso_descriptor.extensions = md.Extensions()
self.idp_sso_descriptor.key_descriptor.append(md.key_descriptor_from_string(
self.idp_sso_descriptor.key_descriptor.append(
md.key_descriptor_from_string(
md_data.TEST_KEY_DESCRIPTOR))
self.idp_sso_descriptor.organization = md.Organization()
self.idp_sso_descriptor.contact_person.append(md.ContactPerson())
@@ -689,8 +731,10 @@ class TestIDPSSODescriptor:
assert new_idp_sso_descriptor.id == "ID"
assert new_idp_sso_descriptor.valid_until == "2008-09-14T01:05:02Z"
assert new_idp_sso_descriptor.cache_duration == "10:00:00:00"
assert new_idp_sso_descriptor.protocol_support_enumeration == samlp.NAMESPACE
assert new_idp_sso_descriptor.error_url == "http://www.example.com/errorURL"
assert new_idp_sso_descriptor.protocol_support_enumeration == \
samlp.NAMESPACE
assert new_idp_sso_descriptor.error_url == \
"http://www.example.com/errorURL"
assert isinstance(new_idp_sso_descriptor.signature, ds.Signature)
assert isinstance(new_idp_sso_descriptor.extensions, md.Extensions)
assert isinstance(new_idp_sso_descriptor.key_descriptor[0],
@@ -728,8 +772,10 @@ class TestIDPSSODescriptor:
assert new_idp_sso_descriptor.id == "ID"
assert new_idp_sso_descriptor.valid_until == "2008-09-14T01:05:02Z"
assert new_idp_sso_descriptor.cache_duration == "10:00:00:00"
assert new_idp_sso_descriptor.protocol_support_enumeration == samlp.NAMESPACE
assert new_idp_sso_descriptor.error_url == "http://www.example.com/errorURL"
assert new_idp_sso_descriptor.protocol_support_enumeration == \
samlp.NAMESPACE
assert new_idp_sso_descriptor.error_url == \
"http://www.example.com/errorURL"
assert isinstance(new_idp_sso_descriptor.signature, ds.Signature)
assert isinstance(new_idp_sso_descriptor.extensions, md.Extensions)
assert isinstance(new_idp_sso_descriptor.key_descriptor[0],
@@ -778,8 +824,8 @@ class TestIDPSSODescriptor:
assert inst.text == "example.org"
assert inst.regexp == "false"
class TestAssertionConsumerService:
class TestAssertionConsumerService:
def setup_class(self):
self.i_e = md.AssertionConsumerService()
@@ -790,7 +836,8 @@ class TestAssertionConsumerService:
self.i_e.response_location = "http://www.example.com/response"
self.i_e.index = "1"
self.i_e.is_default = "false"
new_i_e = md.assertion_consumer_service_from_string(self.i_e.to_string())
new_i_e = md.assertion_consumer_service_from_string(
self.i_e.to_string())
assert new_i_e.binding == saml2.BINDING_HTTP_POST
assert new_i_e.location == "http://www.example.com/endpoint"
assert new_i_e.response_location == "http://www.example.com/response"
@@ -809,7 +856,6 @@ class TestAssertionConsumerService:
class TestRequestedAttribute:
def setup_class(self):
self.requested_attribute = md.RequestedAttribute()
@@ -835,7 +881,6 @@ class TestRequestedAttribute:
class TestServiceName:
def setup_class(self):
self.service_name = md.ServiceName()
@@ -843,19 +888,20 @@ class TestServiceName:
"""Test for ServiceName accessors"""
self.service_name.lang = "en"
self.service_name.text = "SIOS mail"
new_service_name = md.service_name_from_string(self.service_name.to_string())
new_service_name = md.service_name_from_string(
self.service_name.to_string())
assert new_service_name.lang == "en"
assert new_service_name.text.strip() == "SIOS mail"
def testUsingTestData(self):
"""Test for organization_name_from_string() using test data."""
new_service_name = md.service_name_from_string(md_data.TEST_SERVICE_NAME)
new_service_name = md.service_name_from_string(
md_data.TEST_SERVICE_NAME)
assert new_service_name.lang == "en"
assert new_service_name.text.strip() == "Catalogix Whois"
class TestServiceDescription:
def setup_class(self):
self.service_description = md.ServiceDescription()
@@ -877,7 +923,6 @@ class TestServiceDescription:
class TestAttributeConsumingService:
def setup_class(self):
self.attribute_consuming_service = md.AttributeConsumingService()
@@ -891,7 +936,8 @@ class TestAttributeConsumingService:
self.attribute_consuming_service.index = "1"
self.attribute_consuming_service.is_default = "true"
new_attribute_consuming_service = md.attribute_consuming_service_from_string(
new_attribute_consuming_service = \
md.attribute_consuming_service_from_string(
self.attribute_consuming_service.to_string())
assert new_attribute_consuming_service.index == "1"
assert new_attribute_consuming_service.is_default == "true"
@@ -905,8 +951,10 @@ class TestAttributeConsumingService:
md.RequestedAttribute)
def testUsingTestData(self):
"""Test for attribute_consuming_service_from_string() using test data."""
new_attribute_consuming_service = md.attribute_consuming_service_from_string(
"""Test for attribute_consuming_service_from_string() using test
data."""
new_attribute_consuming_service = \
md.attribute_consuming_service_from_string(
md_data.TEST_ATTRIBUTE_CONSUMING_SERVICE)
assert new_attribute_consuming_service.index == "1"
assert new_attribute_consuming_service.is_default == "true"
@@ -934,7 +982,8 @@ class TestSPSSODescriptor:
self.sp_sso_descriptor.error_url = "http://www.example.com/errorURL"
self.sp_sso_descriptor.signature = ds.Signature()
self.sp_sso_descriptor.extensions = md.Extensions()
self.sp_sso_descriptor.key_descriptor.append(md.key_descriptor_from_string(
self.sp_sso_descriptor.key_descriptor.append(
md.key_descriptor_from_string(
md_data.TEST_KEY_DESCRIPTOR))
self.sp_sso_descriptor.organization = md.Organization()
self.sp_sso_descriptor.contact_person.append(md.ContactPerson())
@@ -960,8 +1009,10 @@ class TestSPSSODescriptor:
assert new_sp_sso_descriptor.id == "ID"
assert new_sp_sso_descriptor.valid_until == "2008-09-14T01:05:02Z"
assert new_sp_sso_descriptor.cache_duration == "10:00:00:00"
assert new_sp_sso_descriptor.protocol_support_enumeration == samlp.NAMESPACE
assert new_sp_sso_descriptor.error_url == "http://www.example.com/errorURL"
assert new_sp_sso_descriptor.protocol_support_enumeration == \
samlp.NAMESPACE
assert new_sp_sso_descriptor.error_url == \
"http://www.example.com/errorURL"
assert isinstance(new_sp_sso_descriptor.signature, ds.Signature)
assert isinstance(new_sp_sso_descriptor.extensions, md.Extensions)
assert isinstance(new_sp_sso_descriptor.key_descriptor[0],
@@ -995,15 +1046,18 @@ class TestSPSSODescriptor:
assert new_sp_sso_descriptor.id == "ID"
assert new_sp_sso_descriptor.valid_until == "2008-09-14T01:05:02Z"
assert new_sp_sso_descriptor.cache_duration == "10:00:00:00"
assert new_sp_sso_descriptor.protocol_support_enumeration == samlp.NAMESPACE
assert new_sp_sso_descriptor.error_url == "http://www.example.com/errorURL"
assert new_sp_sso_descriptor.protocol_support_enumeration == \
samlp.NAMESPACE
assert new_sp_sso_descriptor.error_url == \
"http://www.example.com/errorURL"
assert isinstance(new_sp_sso_descriptor.signature, ds.Signature)
assert isinstance(new_sp_sso_descriptor.extensions, md.Extensions)
print(new_sp_sso_descriptor.extensions.__dict__)
assert len(new_sp_sso_descriptor.extensions.extension_elements) == 2
for eelem in new_sp_sso_descriptor.extensions.extension_elements:
print("EE", eelem.__dict__)
dp = extension_element_to_element(eelem, idpdisc.ELEMENT_FROM_STRING,
dp = extension_element_to_element(eelem,
idpdisc.ELEMENT_FROM_STRING,
idpdisc.NAMESPACE)
print("DP", dp.c_tag, dp.c_namespace, dp.__dict__)
assert isinstance(dp, idpdisc.DiscoveryResponse)
@@ -1145,3 +1199,7 @@ class TestEntitiesDescriptor:
md.EntitiesDescriptor)
if __name__ == "__main__":
c = TestIDPSSODescriptor()
c.setup_class()
c.testAccessors()

View File

@@ -3,6 +3,7 @@
import datetime
import re
from six.moves.urllib.parse import quote_plus
from saml2.config import Config
from saml2.httpbase import HTTPBase
from saml2.mdstore import MetadataStore, MetaDataMDX
@@ -34,6 +35,58 @@ from pathutils import full_path
sec_config = config.Config()
# sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
TEST_METADATA_STRING = """
<EntitiesDescriptor
xmlns="urn:oasis:names:tc:SAML:2.0:metadata"
xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata"
xmlns:shibmeta="urn:mace:shibboleth:metadata:1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ds="http://www.w3.org/2000/09/xmldsig#"
Name="urn:mace:example.com:test-1.0">
<EntityDescriptor
entityID="http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"
xml:base="swamid-1.0/idp.umu.se-saml2.xml">
<IDPSSODescriptor protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<KeyDescriptor>
<ds:KeyInfo>
<ds:X509Data>
<ds:X509Certificate>
MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy
3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN
efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G
A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs
iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw
mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6
h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5
U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6
mrPzGzk3ECbupFnqyREH3+ZPSdk=</ds:X509Certificate>
</ds:X509Data>
</ds:KeyInfo>
</KeyDescriptor>
<NameIDFormat>urn:oasis:names:tc:SAML:2.0:nameid-format:transient</NameIDFormat>
<SingleSignOnService
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
Location="http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"/>
</IDPSSODescriptor>
<Organization>
<OrganizationName xml:lang="en">Catalogix</OrganizationName>
<OrganizationDisplayName xml:lang="en">Catalogix</OrganizationDisplayName>
<OrganizationURL xml:lang="en">http://www.catalogix.se</OrganizationURL>
</Organization>
<ContactPerson contactType="technical">
<SurName>Hedberg</SurName>
<EmailAddress>datordrift@catalogix.se</EmailAddress>
</ContactPerson>
</EntityDescriptor>
</EntitiesDescriptor>
"""
ONTS = {
saml.NAMESPACE: saml,
mdui.NAMESPACE: mdui,
@@ -68,7 +121,8 @@ METADATACONF = {
# {
# "class": "saml2.mdstore.MetaDataExtern",
# "metadata": [
# ("https://kalmar2.org/simplesaml/module.php/aggregator/?id=kalmarcentral2&set=saml2",
# ("https://kalmar2.org/simplesaml/module.php/aggregator/?id
# =kalmarcentral2&set=saml2",
# full_path("kalmar2.pem")), ],
# }],
"4": [{
@@ -92,8 +146,11 @@ METADATACONF = {
"metadata": [
("http://md.incommon.org/InCommon/InCommon-metadata-export.xml",
full_path("inc-md-cert.pem"))]
}
]
}],
"11": [{
"class": "saml2.mdstore.InMemoryMetaData",
"metadata": [(TEST_METADATA_STRING, )]
}],
}
@@ -295,5 +352,37 @@ def test_load_extern_incommon():
assert mds
assert len(mds.keys())
def test_load_local():
# string representation of XML idp definition
idp_metadata = open(full_path("metadata.xml")).read()
saml_config = Config()
config_dict = {
"metadata": {"inline": [idp_metadata]}
}
cfg = saml_config.load(config_dict)
assert cfg
def test_load_string():
sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["11"])
#print(mds)
assert len(mds.keys()) == 1
idps = mds.with_descriptor("idpsso")
assert list(idps.keys()) == [
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php']
certs = mds.certs(
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php',
"idpsso", "signing")
assert len(certs) == 1
if __name__ == "__main__":
test_load_extern_incommon()
test_load_local()

View File

@@ -32,6 +32,58 @@ from pathutils import full_path
sec_config = config.Config()
#sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
TEST_METADATA_STRING = """
<EntitiesDescriptor
xmlns="urn:oasis:names:tc:SAML:2.0:metadata"
xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata"
xmlns:shibmeta="urn:mace:shibboleth:metadata:1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ds="http://www.w3.org/2000/09/xmldsig#"
Name="urn:mace:example.com:test-1.0">
<EntityDescriptor
entityID="http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"
xml:base="swamid-1.0/idp.umu.se-saml2.xml">
<IDPSSODescriptor protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<KeyDescriptor>
<ds:KeyInfo>
<ds:X509Data>
<ds:X509Certificate>
MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy
3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN
efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G
A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs
iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw
mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6
h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5
U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6
mrPzGzk3ECbupFnqyREH3+ZPSdk=</ds:X509Certificate>
</ds:X509Data>
</ds:KeyInfo>
</KeyDescriptor>
<NameIDFormat>urn:oasis:names:tc:SAML:2.0:nameid-format:transient</NameIDFormat>
<SingleSignOnService
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
Location="http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"/>
</IDPSSODescriptor>
<Organization>
<OrganizationName xml:lang="en">Catalogix</OrganizationName>
<OrganizationDisplayName xml:lang="en">Catalogix</OrganizationDisplayName>
<OrganizationURL xml:lang="en">http://www.catalogix.se</OrganizationURL>
</Organization>
<ContactPerson contactType="technical">
<SurName>Hedberg</SurName>
<EmailAddress>datordrift@catalogix.se</EmailAddress>
</ContactPerson>
</EntityDescriptor>
</EntitiesDescriptor>
"""
ONTS = {
saml.NAMESPACE: saml,
mdui.NAMESPACE: mdui,
@@ -79,6 +131,9 @@ METADATACONF = {
"remote": [
{"url": "http://md.incommon.org/InCommon/InCommon-metadata-export.xml",
"cert": full_path("inc-md-cert.pem")}]
},
"11": {
"inline": [TEST_METADATA_STRING]
}
}
@@ -280,5 +335,23 @@ def test_load_external():
assert len(mds) == 1 # One source
assert len(mds.keys()) > 1 # number of idps
def test_load_string():
sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["11"])
print(mds)
assert len(mds.keys()) == 1
idps = mds.with_descriptor("idpsso")
assert list(idps.keys()) == [
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php']
certs = mds.certs(
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php',
"idpsso", "signing")
assert len(certs) == 1
if __name__ == "__main__":
test_load_external()

51
tests/test_39_metadata.py Normal file
View File

@@ -0,0 +1,51 @@
import copy
from saml2.config import SPConfig
from saml2.metadata import entity_descriptor
from saml2.saml import NAME_FORMAT_URI, NAME_FORMAT_BASIC
__author__ = 'roland'
sp_conf = {
"entityid": "urn:mace:umu.se:saml:roland:sp",
"name": "Rolands SP",
"service": {
"sp": {
"endpoints": {
"assertion_consumer_service": [
"http://lingon.catalogix.se:8087/"],
},
"required_attributes": ["surName", "givenName", "mail"],
"optional_attributes": ["title"],
"idp": {
"": "https://example.com/saml2/idp/SSOService.php",
},
"authn_requests_signed": True,
"logout_requests_signed": True,
}
},
}
def test_requested_attribute_name_format():
cnf = SPConfig().load(sp_conf, metadata_construction=True)
ed = entity_descriptor(cnf)
assert len(ed.spsso_descriptor.attribute_consuming_service) == 1
acs = ed.spsso_descriptor.attribute_consuming_service[0]
assert len(acs.requested_attribute) == 4
for req_attr in acs.requested_attribute:
assert req_attr.name_format == NAME_FORMAT_URI
sp2 = copy.copy(sp_conf)
sp2["service"]["sp"]["requested_attribute_name_format"] = NAME_FORMAT_BASIC
cnf2 = SPConfig().load(sp2, metadata_construction=True)
ed = entity_descriptor(cnf2)
acs = ed.spsso_descriptor.attribute_consuming_service[0]
assert len(acs.requested_attribute) == 4
for req_attr in acs.requested_attribute:
assert req_attr.name_format == NAME_FORMAT_BASIC
if __name__ == '__main__':
test_requested_attribute_name_format()

View File

@@ -1,17 +1,20 @@
#!/usr/bin/env python
import base64
from saml2.sigver import pre_encryption_part, make_temp, XmlsecError, \
SigverError
from saml2.mdstore import MetadataStore
from saml2.saml import assertion_from_string, EncryptedAssertion
from saml2.samlp import response_from_string
from saml2 import sigver, extension_elements_to_elements
from saml2 import sigver
from saml2 import extension_elements_to_elements
from saml2 import class_name
from saml2 import time_util
from saml2 import saml, samlp
from saml2 import config
from saml2.sigver import pre_encryption_part
from saml2.sigver import make_temp
from saml2.sigver import XmlsecError
from saml2.sigver import SigverError
from saml2.mdstore import MetadataStore
from saml2.saml import assertion_from_string
from saml2.saml import EncryptedAssertion
from saml2.samlp import response_from_string
from saml2.s_utils import factory, do_attribute_statement
from py.test import raises
@@ -510,6 +513,6 @@ def test_xmlsec_err():
if __name__ == "__main__":
t = TestSecurity()
t.setup_class()
t.test_verify_1()
t.test_sign_assertion()
#test_xmlsec_err()

View File

@@ -1,6 +1,8 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from contextlib import closing
from datetime import datetime
from dateutil import parser
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.server import Server
@@ -123,6 +125,9 @@ class TestAuthnResponse:
assert len(authn_info) == 1
assert authn_info[0][0] == INTERNETPROTOCOLPASSWORD
assert authn_info[0][1] == ["http://www.example.com/login"]
now = datetime.now()
dt = parser.parse(authn_info[0][2])
assert now.year == dt.year and now.month == dt.month and now.day == dt.day
session_info = self.ar.session_info()
assert session_info["authn_info"] == authn_info

View File

@@ -25,6 +25,7 @@ from saml2.response import LogoutResponse
from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion, Advice
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.saml import NameID
from saml2.samlp import SessionIndex
from saml2.server import Server
from saml2.sigver import pre_encryption_part, make_temp, pre_encrypt_assertion
from saml2.sigver import rm_xmltag
@@ -319,6 +320,19 @@ class TestClient:
except Exception: # missing certificate
self.client.sec.verify_signature(ar_str, node_name=class_name(ar))
def test_create_logout_request(self):
req_id, req = self.client.create_logout_request(
"http://localhost:8088/slo", "urn:mace:example.com:saml:roland:idp",
name_id=nid, reason="Tired", expire=in_a_while(minutes=15),
session_indexes=["_foo"])
assert req.destination == "http://localhost:8088/slo"
assert req.reason == "Tired"
assert req.version == "2.0"
assert req.name_id == nid
assert req.issuer.text == "urn:mace:example.com:saml:roland:sp"
assert req.session_index == [SessionIndex("_foo")]
def test_response_1(self):
IDP = "urn:mace:example.com:saml:roland:idp"
@@ -359,6 +373,7 @@ class TestClient:
assert session_info["came_from"] == "http://foo.example.com/service"
response = samlp.response_from_string(authn_response.xmlstr)
assert response.destination == "http://lingon.catalogix.se:8087/"
assert "session_index" in session_info
# One person in the cache
assert len(self.client.users.subjects()) == 1
@@ -1179,6 +1194,78 @@ class TestClient:
BINDING_HTTP_REDIRECT)
print(res)
def test_do_logout_signed_redirect(self):
conf = config.SPConfig()
conf.load_file("sp_slo_redirect_conf")
client = Saml2Client(conf)
key = client.signkey
# information about the user from an IdP
session_info = {
"name_id": nid,
"issuer": "urn:mace:example.com:saml:roland:idp",
"not_on_or_after": in_a_while(minutes=15),
"ava": {
"givenName": "Anders",
"surName": "Andersson",
"mail": "anders.andersson@example.com"
}
}
client.users.add_information_about_person(session_info)
entity_ids = client.users.issuers_of_info(nid)
assert entity_ids == ["urn:mace:example.com:saml:roland:idp"]
resp = client.do_logout(nid, entity_ids, "Tired", in_a_while(minutes=5),
sign=True, expected_binding=BINDING_HTTP_REDIRECT)
assert list(resp.keys()) == entity_ids
binding, info = resp[entity_ids[0]]
assert binding == BINDING_HTTP_REDIRECT
loc = info["headers"][0][1]
_, _, _, _, qs, _ = urlparse(loc)
qs = parse_qs(qs)
assert _leq(qs.keys(),
['SigAlg', 'SAMLRequest', 'RelayState', 'Signature'])
assert verify_redirect_signature(list_values2simpletons(qs),
sigkey=key)
res = self.server.parse_logout_request(qs["SAMLRequest"][0],
BINDING_HTTP_REDIRECT)
print(res)
def test_do_logout_post(self):
# information about the user from an IdP
session_info = {
"name_id": nid,
"issuer": "urn:mace:example.com:saml:roland:idp",
"not_on_or_after": in_a_while(minutes=15),
"ava": {
"givenName": "Anders",
"surName": "Andersson",
"mail": "anders.andersson@example.com"
},
"session_index": SessionIndex("_foo")
}
self.client.users.add_information_about_person(session_info)
entity_ids = self.client.users.issuers_of_info(nid)
assert entity_ids == ["urn:mace:example.com:saml:roland:idp"]
resp = self.client.do_logout(nid, entity_ids, "Tired",
in_a_while(minutes=5), sign=True,
expected_binding=BINDING_HTTP_POST)
assert resp
assert len(resp) == 1
assert list(resp.keys()) == entity_ids
binding, info = resp[entity_ids[0]]
assert binding == BINDING_HTTP_POST
_dic = unpack_form(info["data"][3])
res = self.server.parse_logout_request(_dic["SAMLRequest"],
BINDING_HTTP_POST)
assert b'<ns0:SessionIndex>_foo</ns0:SessionIndex>' in res.xmlstr
# Below can only be done with dummy Server
IDP = "urn:mace:example.com:saml:roland:idp"

View File

@@ -81,4 +81,4 @@ else:
valid_instance(eid)
xmldoc = metadata_tostring_fix(eid, nspair, xmldoc)
print(xmldoc)
print(xmldoc.decode("utf-8"))

View File

@@ -90,7 +90,7 @@ for line in open(args.conf).readlines():
metad = MetaDataExtern(ONTS.values(), ATTRCONV, spec[1],
sc, cert=spec[2], http=httpc, **kwargs)
if metad:
if metad is not None:
try:
metad.load()
except: