This commit is contained in:
Roland Hedberg 2015-12-11 14:32:12 +01:00
parent 3b6cad8653
commit 03537cc7e1
13 changed files with 0 additions and 4479 deletions

View File

@ -1,426 +0,0 @@
from __future__ import print_function
from importlib import import_module
import json
import os
import pprint
import types
import argparse
import sys
import six
import logging
import imp
from saml2 import xmldsig
from saml2 import xmlenc
from saml2.client import Saml2Client
from saml2.config import SPConfig
from saml2.mdstore import MetadataStore
from saml2.mdstore import MetaData
from saml2.mdstore import ToOld
from saml2test import CheckError
from saml2test import FatalError
from saml2test import exception_trace
from saml2test import ContextFilter
from idp_test.base import Conversation
from idp_test.check import CheckSaml2IntMetaData
# Schemas supported
from saml2 import md
from saml2 import saml
from saml2 import root_logger
from saml2.extension import mdui
from saml2.extension import idpdisc
from saml2.extension import dri
from saml2.extension import mdattr
from saml2.extension import ui
from saml2.metadata import entity_descriptor
from saml2.saml import NAME_FORMAT_UNSPECIFIED
SCHEMA = [dri, idpdisc, md, mdattr, mdui, saml, ui, xmldsig, xmlenc]
__author__ = 'rolandh'
logger = logging.getLogger("")
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
#formatter = logging.Formatter("%(asctime)s %(name)s:%(levelname)s %(message)s")
formatter_2 = logging.Formatter("%(delta).6f - %(levelname)s - [%(name)s] %(message)s")
cf = ContextFilter()
cf.start()
streamhandler = logging.StreamHandler(sys.stderr)
streamhandler.setFormatter(formatter_2)
memoryhandler = logging.handlers.MemoryHandler(1024*10, logging.DEBUG)
memoryhandler.addFilter(cf)
saml2testlog = logging.getLogger("saml2test")
saml2testlog.addHandler(memoryhandler)
saml2testlog.setLevel(logging.DEBUG)
def recursive_find_module(name, path=None):
parts = name.split(".")
mod_a = None
for part in parts:
try:
(fil, pathname, desc) = imp.find_module(part, path)
except ImportError:
raise
mod_a = imp.load_module(name, fil, pathname, desc)
sys.modules[name] = mod_a
path = mod_a.__path__
return mod_a
def get_mod(name, path=None):
try:
mod_a = sys.modules[name]
if not isinstance(mod_a, types.ModuleType):
raise KeyError
except KeyError:
try:
(fil, pathname, desc) = imp.find_module(name, path)
mod_a = imp.load_module(name, fil, pathname, desc)
except ImportError:
if "." in name:
mod_a = recursive_find_module(name, path)
else:
raise
sys.modules[name] = mod_a
return mod_a
class SAML2client(object):
def __init__(self, check_factory):
self.tests = None
self.check_factory = check_factory
self._parser = argparse.ArgumentParser()
self._parser.add_argument('-d', dest='debug', action='store_true',
help="Print debug information")
self._parser.add_argument('-L', dest='log', action='store_true',
help="Print log information")
self._parser.add_argument(
'-C', dest="cacerts",
help=("CA certs to use to verify HTTPS server certificates, ",
"if HTTPS is used and no server CA certs are defined then ",
"no cert verification will be done"))
self._parser.add_argument('-J', dest="json_config_file",
help="Test target configuration")
self._parser.add_argument('-m', dest="metadata", action='store_true',
help="Return the SP metadata")
self._parser.add_argument(
"-l", dest="list", action="store_true",
help="List all the test flows as a JSON object")
self._parser.add_argument(
"-c", dest="spconfig", default="config",
help=("Configuration module for the SP Test Driver at the current"
"directory or the path specified with the -P option. Do not"
"use relative paths or filename extension."))
self._parser.add_argument(
"-P", dest="path", default=".",
help="Path to the configuration stuff")
self._parser.add_argument("-t", dest="testpackage",
help="Module describing tests")
self._parser.add_argument("-O", dest="operations",
help="Tests")
self._parser.add_argument("-Y", dest="pysamllog", action='store_true',
help="Print PySAML2 logs")
self._parser.add_argument("-H", dest="pretty", action='store_true',
help="Output summary on stdout as pretty "
"printed python dict instead of JSON")
self._parser.add_argument("-i", dest="insecure", action='store_true',
help="Do not verify SSL certificate")
self._parser.add_argument("oper", nargs="?", help="Which test to run")
self.interactions = None
self.entity_id = None
self.sp_config = None
self.constraints = {}
self.operations = None
self.args = None
def json_config_file(self):
if self.args.json_config_file == "-":
return json.loads(sys.stdin.read())
else:
return json.loads(open(self.args.json_config_file).read())
def sp_configure(self, metadata_construction=False):
"""
Need to know where 4 different things are. The config, key_file and
cert_file files and the attributemaps directory
"""
# Always first look in the present working directory
sys.path.insert(0, self.args.path)
if self.args.path != ".":
sys.path.insert(0, ".")
mod = import_module(self.args.spconfig)
if self.args.path != ".":
for param in ["attribute_map_dir", "key_file", "cert_file"]:
if mod.CONFIG[param].startswith("/"): # Absolute path
continue
for _path in [".", self.args.path]:
_obj = os.path.join(_path, mod.CONFIG[param])
_obj = os.path.normpath(_obj)
if os.path.exists(_obj):
mod.CONFIG[param] = _obj
break
self.sp_config = SPConfig().load(mod.CONFIG, metadata_construction)
if not self.args.insecure:
self.sp_config.verify_ssl_cert = False
else:
if self.args.ca_certs:
self.sp_config.ca_certs = self.args.ca_certs
else:
self.sp_config.ca_certs = "../keys/cacert.pem"
def setup(self):
self.json_config = self.json_config_file()
_jc = self.json_config
try:
self.interactions = _jc["interaction"]
except KeyError:
self.interactions = []
self.sp_configure()
metadata = MetadataStore(SCHEMA, self.sp_config.attribute_converters,
self.sp_config)
info = _jc["metadata"].encode("utf-8")
md = MetaData(SCHEMA, self.sp_config.attribute_converters, info)
md.load()
metadata[0] = md
self.sp_config.metadata = metadata
if self.args.testpackage:
self.tests = import_module("idp_test.package.%s" %
self.args.testpackage)
try:
self.entity_id = _jc["entity_id"]
# Verify its the correct metadata
assert self.entity_id in md.entity.keys(), "metadata does not contain entityId %s" % self.entity_id
except KeyError:
if len(md.entity.keys()) == 1:
self.entity_id = md.entity.keys()[0]
else:
raise Exception("Don't know which entity to talk to")
if "constraints" in _jc:
self.constraints = _jc["constraints"]
if "name_format" not in self.constraints:
self.constraints["name_format"] = NAME_FORMAT_UNSPECIFIED
def test_summation(self, sid):
status = 0
for item in self.test_log:
if item["status"] > status:
status = item["status"]
if status == 0:
status = 1
info = {
"id": sid,
"status": status,
"tests": self.test_log
}
if status == 5:
info["url"] = self.test_log[-1]["url"]
info["htmlbody"] = self.test_log[-1]["message"]
return info
def output_log(self, memhndlr, hndlr2):
"""
"""
print(80 * ":", file=sys.stderr)
hndlr2.setFormatter(formatter_2)
memhndlr.setTarget(hndlr2)
memhndlr.flush()
memhndlr.close()
# streamhandler.setFormatter(formatter_2)
# pys_memoryhandler.setTarget(streamhandler)
# pys_memoryhandler.flush()
# pys_memoryhandler.close()
def run(self):
self.args = self._parser.parse_args()
if self.args.pysamllog:
root_logger.addHandler(memoryhandler)
root_logger.setLevel(logging.DEBUG)
if self.args.operations:
path, name = os.path.split(self.args.operations)
self.operations = get_mod(name, [path])
else:
self.operations = __import__("idp_test.saml2base",
fromlist=["idp_test"])
if self.args.metadata:
return self.make_meta()
elif self.args.list:
return self.list_operations()
elif self.args.oper == "check":
return self.verify_metadata()
else:
if not self.args.oper:
raise Exception("Missing test case specification")
self.args.oper = self.args.oper.strip("'")
self.args.oper = self.args.oper.strip('"')
try:
self.setup()
except (AttributeError, ToOld) as err:
print("Configuration Error: %s" % err, file=sys.stderr)
self.client = Saml2Client(self.sp_config)
conv = None
if self.args.pretty:
pp = pprint.PrettyPrinter(indent=4)
else:
pp = None
try:
try:
oper = self.operations.OPERATIONS[self.args.oper]
except KeyError:
if self.tests:
try:
oper = self.tests.OPERATIONS[self.args.oper]
except ValueError:
logger.error("Undefined testcase")
return
else:
logger.error("Undefined testcase")
return
logger.info("Starting conversation")
conv = Conversation(self.client, self.sp_config, self.interactions,
check_factory=self.check_factory,
entity_id=self.entity_id,
constraints=self.constraints)
conv.do_sequence(oper)
#testres, trace = do_sequence(oper,
self.test_log = conv.test_output
tsum = self.test_summation(self.args.oper)
err = None
except CheckError as err:
self.test_log = conv.test_output
tsum = self.test_summation(self.args.oper)
except FatalError as err:
if conv:
self.test_log = conv.test_output
self.test_log.append(exception_trace("RUN", err))
else:
self.test_log = exception_trace("RUN", err)
tsum = self.test_summation(self.args.oper)
except Exception as err:
if conv:
self.test_log = conv.test_output
self.test_log.append(exception_trace("RUN", err))
else:
self.test_log = exception_trace("RUN", err)
tsum = self.test_summation(self.args.oper)
if pp:
pp.pprint(tsum)
else:
print(json.dumps(tsum), file=sys.stdout)
if tsum["status"] > 1 or self.args.debug or err:
self.output_log(memoryhandler, streamhandler)
def list_operations(self):
lista = []
for key, val in self.operations.OPERATIONS.items():
item = {"id": key, "name": val["name"]}
try:
_desc = val["descr"]
if isinstance(_desc, six.string_types):
item["descr"] = _desc
else:
item["descr"] = "\n".join(_desc)
except KeyError:
pass
for key in ["depend", "endpoints"]:
try:
item[key] = val[key]
except KeyError:
pass
lista.append(item)
if self.args.testpackage:
mod = import_module(self.args.testpackage, "idp_test")
for key, val in mod.OPERATIONS.items():
item = {"id": key, "name": val["name"]}
try:
_desc = val["descr"]
if isinstance(_desc, six.string_types):
item["descr"] = _desc
else:
item["descr"] = "\n".join(_desc)
except KeyError:
pass
for key in ["depends", "endpoints"]:
try:
item[key] = val[key]
except KeyError:
pass
lista.append(item)
print(json.dumps(lista))
def _get_operation(self, operation):
return self.operations.OPERATIONS[operation]
def make_meta(self):
self.sp_configure(True)
print(entity_descriptor(self.sp_config))
def list_conf_id(self):
sys.path.insert(0, ".")
mod = import_module("config")
_res = dict([(key, cnf["description"]) for key, cnf in
mod.CONFIG.items()])
print(json.dumps(_res))
def verify_metadata(self):
self.json_config = self.json_config_file()
self.sp_configure()
metadata = MetadataStore(SCHEMA, self.sp_config.attribute_converters,
self.sp_config.xmlsec_binary)
info = self.json_config["metadata"].encode("utf-8")
md = MetaData(SCHEMA, self.sp_config.attribute_converters, info)
md.load()
metadata[0] = md
env = {"metadata": metadata}
chk = CheckSaml2IntMetaData()
output = []
res = chk(env, output)
print(res, file=sys.stdout)

View File

@ -1,259 +0,0 @@
#!/usr/bin/env python
import inspect
import logging
import urllib
import cookielib
from saml2 import BINDING_HTTP_REDIRECT, BINDING_URI
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_SOAP
from saml2.mdstore import REQ2SRV
from saml2.pack import http_redirect_message, http_form_post_message
from saml2.s_utils import rndstr
from saml2test import tool
from saml2test import CheckError, FatalError
from saml2test.interaction import InteractionNeeded
try:
from xml.etree import cElementTree as ElementTree
if ElementTree.VERSION < '1.3.0':
# cElementTree has no support for register_namespace
# neither _namespace_map, thus we sacrify performance
# for correctness
from xml.etree import ElementTree
except ImportError:
import cElementTree as ElementTree
__author__ = 'rohe0002'
logger = logging.getLogger(__name__)
class HTTPError(Exception):
pass
def unpack_form(_str, ver="SAMLRequest"):
SR_STR = "name=\"%s\" value=\"" % ver
RS_STR = 'name="RelayState" value="'
i = _str.find(SR_STR)
i += len(SR_STR)
j = _str.find('"', i)
sr = _str[i:j]
k = _str.find(RS_STR, j)
k += len(RS_STR)
l = _str.find('"', k)
rs = _str[k:l]
return {ver: sr, "RelayState": rs}
def form_post(_dict):
return urllib.urlencode(_dict)
def tuple_list2dict(tl):
return dict(tl)
class Conversation(tool.Conversation):
def __init__(self, client, config, interaction,
check_factory, entity_id, msg_factory=None,
features=None, verbose=False, constraints=None):
tool.Conversation.__init__(self, client, config,
interaction, check_factory, msg_factory,
features, verbose)
self.entity_id = entity_id
self.cjar = {"browser": cookielib.CookieJar(),
"rp": cookielib.CookieJar(),
"service": cookielib.CookieJar()}
self.args = {}
self.qargs = {}
self.response_args = {}
self.saml_response = []
self.destination = ""
self.request = None
self.position = ""
self.response = None
self.oper = None
self.msg_constraints = constraints
def send(self):
srvs = getattr(self.client.metadata, REQ2SRV[self.oper.request])(
self.args["entity_id"], self.args["request_binding"], "idpsso")
response = None
for srv in srvs:
response = self._send(srv)
if response is not None:
break
return response
def _send(self, srv):
_client = self.client
loc = srv["location"]
self.qargs["destination"] = loc
self.response_args = {}
use_artifact = getattr(self.oper, "use_artifact", False)
try:
req = self.oper.args["message"]
except KeyError:
req = self.qfunc(**self.qargs)
req_id, self.request = self.oper.pre_processing(req, self.args)
str_req = "%s" % self.request
if use_artifact:
saml_art = _client.use_artifact(str_req, self.args["entity_id"])
logger.info("SAML Artifact: %s", saml_art)
info_typ = "SAMLart"
else:
logger.info("SAML Request: %s", str_req)
info_typ = "SAMLRequest"
# depending on binding send the query
if self.args["request_binding"] is BINDING_SOAP:
res = _client.send_using_soap(str_req, loc)
if res.status_code >= 400:
logger.info("Received a HTTP error (%d) '%s'",
res.status_code, res.text)
raise HTTPError(res.text)
else:
self.response_args["binding"] = BINDING_SOAP
else:
self.response_args["binding"] = self.args["response_binding"]
if self.args["request_binding"] is BINDING_HTTP_REDIRECT:
htargs = http_redirect_message(str_req, loc, self.relay_state,
info_typ)
self.response_args["outstanding"] = {self.request.id: "/"}
#
res = _client.send(htargs["headers"][0][1], "GET")
elif self.args["request_binding"] is BINDING_HTTP_POST:
htargs = http_form_post_message(str_req, loc, self.relay_state,
info_typ)
info = unpack_form(htargs["data"][3])
data = form_post(info)
self.response_args["outstanding"] = {self.request.id: "/"}
htargs["data"] = data
htargs["headers"] = [("Content-type",
'application/x-www-form-urlencoded')]
res = _client.send(loc, "POST", **htargs)
elif self.args["request_binding"] == BINDING_URI:
self.response_args["binding"] = BINDING_URI
htargs = _client.use_http_uri(str_req, "SAMLRequest", loc)
res = _client.send(htargs["url"], "GET")
else:
res = None
if res is not None and res.status_code >= 400:
logger.info("Received a HTTP error (%d) '%s'",
res.status_code, res.text)
raise HTTPError(res.text)
self.last_response = res
try:
self.last_content = res.text
except AttributeError:
self.last_content = None
return res
def init(self, phase):
self.phase = phase
_oper = phase(self)
_oper.setup()
self.args = _oper.args
#self.oper.args = _oper.args.copy()
self.args["entity_id"] = self.entity_id
self.oper = _oper
self.client.cookiejar = self.cjar["browser"]
try:
self.test_sequence(self.oper.tests["pre"])
except KeyError:
pass
def setup_request(self):
query = self.oper.request
_client = self.client
_oper = self.oper
self.response_func = getattr(_client, "parse_%s_response" % query)
qargs = self.args.copy()
self.relay_state = rndstr()
if "message" not in _oper.args:
self.qfunc = getattr(_client, "create_%s" % query)
# remove args the create function can't handle
fargs = inspect.getargspec(self.qfunc).args
if _oper._class:
fargs.extend([p for p, c, r in
_oper._class.c_attributes.values()])
fargs.extend([p for p, c in _oper._class.c_children.values()])
for arg in qargs.keys():
if arg not in fargs:
del qargs[arg]
self.qargs = qargs
def my_endpoints(self):
return [e for e, b in self.client.config.getattr("endpoints", "sp")[
"assertion_consumer_service"]]
def handle_result(self):
try:
if self.last_response.status_code in [302, 303]:
return False
except AttributeError:
pass
_resp = None
try:
response = self.oper.post_processing(self.last_content)
if isinstance(response, dict):
try:
assert self.relay_state == response["RelayState"]
except AssertionError:
assert self.relay_state == response["RelayState"][0]
except KeyError:
pass
if "SAMLResponse" in response:
response = response["SAMLResponse"]
elif "SAMLart" in response:
response = self.client.artifact2message(
response["SAMLart"][0], "idpsso")
_resp = self.response_func(response, **self.response_args)
if not _resp:
return False
self.saml_response.append(_resp)
try:
self.test_sequence(self.oper.tests["post"])
except KeyError:
pass
logger.info("SAML Response: %s", _resp)
except FatalError as ferr:
if _resp:
logger.info("Faulty response: %s", _resp)
logger.error("Exception %s", ferr)
raise
except ElementTree.ParseError:
return False
except CheckError:
raise
except Exception as err:
if _resp:
logger.info("Faulty response: %s", _resp)
logger.error("Exception %s", err)
self.err_check("exception", err)
return True

View File

@ -1,707 +0,0 @@
import inspect
import logging
import sys
from time import mktime
from saml2.response import AttributeResponse
from saml2test import check
from saml2test.check import Check
from saml2.mdstore import REQ2SRV
from saml2.s_utils import UnknownPrincipal
from saml2.s_utils import UnsupportedBinding
from saml2.saml import NAMEID_FORMAT_PERSISTENT
from saml2.saml import NAMEID_FORMAT_UNSPECIFIED
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.saml import NAME_FORMAT_UNSPECIFIED
from saml2.saml import NAME_FORMAT_URI
from saml2.samlp import STATUS_SUCCESS
from saml2.samlp import Response
from saml2.sigver import cert_from_key_info_dict
from saml2.sigver import key_from_key_value_dict
from saml2.time_util import str_to_time
__author__ = 'rolandh'
INFORMATION = 0
OK = 1
WARNING = 2
ERROR = 3
CRITICAL = 4
INTERACTION = 5
STATUSCODE = ["INFORMATION", "OK", "WARNING", "ERROR", "CRITICAL",
"INTERACTION"]
PREFIX = "-----BEGIN CERTIFICATE-----"
POSTFIX = "-----END CERTIFICATE-----"
M2_TIME_FORMAT = "%b %d %H:%M:%S %Y"
logger = logging.getLogger(__name__)
def to_time(_time):
assert _time.endswith(" GMT")
_time = _time[:-4]
return mktime(str_to_time(_time, M2_TIME_FORMAT))
class CheckSaml2IntMetaData(Check):
"""
Checks that the Metadata follows the Saml2Int profile
"""
cid = "check-saml2int-metadata"
msg = "Metadata error"
def _func(self, conv):
mds = conv.client.metadata.metadata[0]
# Should only be one
ed = mds.entity.values()[0]
res = {}
assert len(ed["idpsso_descriptor"])
idpsso = ed["idpsso_descriptor"][0]
# contact person
if "contact_person" not in idpsso and "contact_person" not in ed:
self._message = "Metadata should contain contact person information"
self._status = WARNING
return res
else:
item = {"support": False, "technical": False}
if "contact_person" in idpsso:
for contact in idpsso["contact_person"]:
try:
item[contact["contact_type"]] = True
except KeyError:
pass
if "contact_person" in ed:
for contact in ed["contact_person"]:
try:
item[contact["contact_type"]] = True
except KeyError:
pass
if "support" in item and "technical" in item:
pass
elif "support" not in item and "technical" not in item:
self._message = \
"Missing technical and support contact information"
self._status = WARNING
elif "technical" not in item:
self._message = "Missing technical contact information"
self._status = WARNING
elif "support" not in item:
self._message = "Missing support contact information"
self._status = WARNING
if self._message:
return res
# NameID format
if "name_id_format" not in idpsso:
self._message = "Metadata should specify NameID format support"
self._status = WARNING
return res
else:
# should support Transient
item = {NAMEID_FORMAT_TRANSIENT: False}
for nformat in idpsso["name_id_format"]:
try:
item[nformat["text"]] = True
except KeyError:
pass
if not item[NAMEID_FORMAT_TRANSIENT]:
self._message = "IdP should support Transient NameID Format"
self._status = WARNING
return res
return res
class CheckATMetaData(CheckSaml2IntMetaData):
cid = "check-at-metadata"
def verify_key_info(self, ki):
# key_info
# one or more key_value and/or x509_data.X509Certificate
verified_x509_keys = []
xkeys = cert_from_key_info_dict(ki)
vkeys = key_from_key_value_dict(ki)
if xkeys or vkeys:
if xkeys:
# time validity is checked in cert_from_key_info_dict
verified_x509_keys.append(xkeys)
if vkeys: # don't expect this to happen
pass
if not verified_x509_keys:
if cert_from_key_info_dict(ki, ignore_age=True):
self._message = "Keys too old"
else:
self._message = "Missing KeyValue or X509Data.X509Certificate"
self._status = CRITICAL
return False
if xkeys and vkeys:
# verify that it's the same keys TODO
pass
return True
def verify_key_descriptor(self, kd):
# key_info
if not self.verify_key_info(kd["key_info"]):
return False
# use
if "use" in kd:
try:
assert kd["use"] in ["encryption", "signing"]
except AssertionError:
self._message = "Unknown use specification: '%s'" % kd.use.text
self._status = CRITICAL
return False
return True
def _func(self, conv):
mds = conv.client.metadata.metadata[0]
# Should only be one
ed = mds.entity.values()[0]
res = {}
assert len(ed["idpsso_descriptor"])
idpsso = ed["idpsso_descriptor"][0]
for kd in idpsso["key_descriptor"]:
if not self.verify_key_descriptor(kd):
return res
return CheckSaml2IntMetaData._func(self, conv)
class CheckSaml2IntAttributes(Check):
"""
Any <saml2:Attribute> elements exchanged via any SAML 2.0 messages,
assertions, or metadata MUST contain a NameFormat of
urn:oasis:names:tc:SAML:2.0:attrname-format:uri.
"""
cid = "check-saml2int-attributes"
msg = "Attribute error"
def _func(self, conv):
response = conv.saml_response[-1]
try:
opaque_identifier = conv.opaque_identifier
except AttributeError:
opaque_identifier = False
try:
name_format_not_specified = conv.name_format_not_specified
except AttributeError:
name_format_not_specified = False
res = {}
# should be a list but isn't
#assert len(response.assertion) == 1
assertion = response.assertion
if isinstance(response, AttributeResponse):
pass
else:
assert len(assertion.authn_statement) == 1
assert len(assertion.attribute_statement) < 2
if assertion.attribute_statement:
atrstat = assertion.attribute_statement[0]
for attr in atrstat.attribute:
try:
assert attr.name_format == NAME_FORMAT_URI
except AssertionError:
self._message = "Attribute name format error"
self._status = CRITICAL
return res
try:
assert attr.name.startswith("urn:oid")
except AssertionError:
self._message = "Attribute name should be an OID"
self._status = CRITICAL
return res
assert not assertion.subject.encrypted_id
assert not assertion.subject.base_id
if opaque_identifier:
try:
assert assertion.subject.name_id.format == \
NAMEID_FORMAT_PERSISTENT
except AssertionError:
self._message = "NameID format should be PERSISTENT"
self._status = WARNING
if name_format_not_specified:
try:
assert assertion.subject.name_id.format == \
NAMEID_FORMAT_TRANSIENT
except AssertionError:
self._message = "NameID format should be TRANSIENT"
self._status = WARNING
return res
class CheckSubjectNameIDFormat(Check):
"""
The <NameIDPolicy> element tailors the name identifier in the subjects of
assertions resulting from an <AuthnRequest>.
When this element is used, if the content is not understood by or acceptable
to the identity provider, then a <Response> message element MUST be
returned with an error <Status>, and MAY contain a second-level
<StatusCode> of urn:oasis:names:tc:SAML:2.0:status:InvalidNameIDPolicy.
If the Format value is omitted or set to urn:oasis:names:tc:SAML:2.0:nameid-
format:unspecified, then the identity provider is free to return any kind
of identifier, subject to any additional constraints due to the content of
this element or the policies of the identity provider or principal.
"""
cid = "check-saml2int-nameid-format"
msg = "Attribute error"
def _func(self, conv):
response = conv.saml_response[-1].response
request = conv.request
res = {}
if request.name_id_policy:
nformat = request.name_id_policy.format
sp_name_qualifier = request.name_id_policy.sp_name_qualifier
subj = response.assertion.subject
try:
assert subj.name_id.format == nformat
if sp_name_qualifier:
assert subj.name_id.sp_name_qualifier == sp_name_qualifier
except AssertionError:
self._message = "The IdP returns wrong NameID format"
self._status = CRITICAL
return res
class CheckLogoutSupport(Check):
"""
Verifies that the tested entity supports single log out
"""
cid = "check-logout-support"
msg = "Does not support logout"
def _func(self, conv):
mds = conv.client.metadata.metadata[0]
# Should only be one
ed = mds.entity.values()[0]
assert len(ed["idpsso_descriptor"])
idpsso = ed["idpsso_descriptor"][0]
try:
assert idpsso["single_logout_service"]
except AssertionError:
self._message = self.msg
self._status = CRITICAL
return {}
class VerifyLogout(Check):
cid = "verify-logout"
msg = "Logout failed"
def _func(self, conv):
# Check that the logout response says it was a success
resp = conv.saml_response[-1]
status = resp.response.status
try:
assert status.status_code.value == STATUS_SUCCESS
except AssertionError:
self._message = self.msg
self._status = CRITICAL
# Check that there are no valid cookies
# should only result in a warning
httpc = conv.client
try:
assert httpc.cookies(conv.destination) == {}
except AssertionError:
self._message = "Remaining cookie ?"
self._status = WARNING
return {}
class VerifyContent(Check):
""" Basic content verification class, does required and max/min checks
"""
cid = "verify-content"
def _func(self, conv):
try:
conv.saml_response[-1].response.verify()
except ValueError:
self._status = CRITICAL
return {}
class VerifySuccessStatus(Check):
""" Verifies that the response was a success response """
cid = "verify-success-status"
def _func(self, conv):
response = conv.saml_response[-1].response
try:
assert response.status.status_code.value == STATUS_SUCCESS
except AssertionError:
self._message = self.msg
self._status = CRITICAL
return {}
class VerifyNameIDPolicyUsage(Check):
"""
Verify the nameID in the response is according to the provided
NameIDPolicy
"""
cid = "verify-name-id-policy-usage"
def _func(self, conv):
response = conv.saml_response[-1].response
nip = conv.oper.args["name_id_policy"]
for assertion in response.assertion:
nid = assertion.subject.name_id
if nip.format:
try:
assert nid.format == nip.format
except AssertionError:
self._message = "Wrong NameID Format"
self._status = WARNING
if nip.sp_name_qualifier:
try:
assert nid.sp_name_qualifier == nip.sp_name_qualifier
except AssertionError:
self._message = "Wrong SPNameQualifier"
self._status = WARNING
return {}
class VerifyNameIDMapping(Check):
"""
Verify that a new NameID is issued and that it follows the
given policy.
"""
cid = "verify-name-id-mapping"
def _func(self, conv):
response = conv.saml_response[-1].response
nip = conv.oper.args["name_id_policy"]
nid = response.name_id
if nip.format:
try:
assert nid.format == nip.format
except AssertionError:
self._message = "Wrong NameID Format"
self._status = WARNING
if nip.sp_name_qualifier:
try:
assert nid.sp_name_qualifier == nip.sp_name_qualifier
except AssertionError:
self._message = "Wrong SPNameQualifier"
self._status = WARNING
return {}
class VerifySPProvidedID(Check):
"""
Verify that the IdP allows the SP so set a SP provided ID
"""
cid = "verify-sp-provided-id"
def _func(self, conv):
response = conv.saml_response[-1].response
nip = conv.oper.args["new_id"]
nid = response.name_id
try:
assert nid.sp_provided_id == nip.new_id
except AssertionError:
self._message = "SP provided id not properly set"
self._status = WARNING
return {}
class VerifyFunctionality(Check):
"""
Verifies that the IdP supports the needed functionality
"""
def _nameid_format_support(self, conv, nameid_format):
md = conv.client.metadata
entity = md[conv.entity_id]
for idp in entity["idpsso_descriptor"]:
for nformat in idp["name_id_format"]:
if nameid_format == nformat["text"]:
return {}
self._message = "No support for NameIDFormat '%s'" % nameid_format
self._status = CRITICAL
return {}
def _srv_support(self, conv, service):
md = conv.client.metadata
entity = md[conv.entity_id]
for desc in ["idpsso_descriptor", "attribute_authority_descriptor",
"auth_authority_descriptor"]:
try:
srvgrps = entity[desc]
except KeyError:
pass
else:
for srvgrp in srvgrps:
if service in srvgrp:
return {}
self._message = "No support for '%s'" % service
self._status = CRITICAL
return {}
def _binding_support(self, conv, request, binding, typ):
service = REQ2SRV[request]
md = conv.client.metadata
entity_id = conv.entity_id
func = getattr(md, service, None)
try:
func(entity_id, binding, typ)
except UnknownPrincipal:
self._message = "Unknown principal: %s" % entity_id
self._status = CRITICAL
except UnsupportedBinding:
self._message = "Unsupported binding at the IdP: %s" % binding
self._status = CRITICAL
return {}
def _func(self, conv):
oper = conv.oper
args = conv.oper.args
res = self._srv_support(conv, REQ2SRV[oper.request])
if self._status != OK:
return res
res = self._binding_support(conv, oper.request, args["request_binding"],
"idpsso")
if self._status != OK:
return res
if "nameid_format" in args and args["nameid_format"]:
if args["nameid_format"] == NAMEID_FORMAT_UNSPECIFIED:
pass
else:
res = self._nameid_format_support(conv, args["nameid_format"])
if "name_id_policy" in args and args["name_id_policy"]:
if args["name_id_policy"].format == NAMEID_FORMAT_UNSPECIFIED:
pass
else:
res = self._nameid_format_support(conv,
args["name_id_policy"].format)
return res
class VerifyAttributeNameFormat(Check):
"""
Verify that the correct attribute name format is used.
"""
cid = "verify-attribute-name-format"
def _func(self, conv):
if "name_format" not in conv.msg_constraints:
return {}
# Should be a AuthnResponse or Response instance
response = conv.saml_response[-1]
assert isinstance(response.response, Response)
assertion = response.assertion
if assertion:
if assertion.attribute_statement:
atrstat = assertion.attribute_statement[0]
for attr in atrstat.attribute:
try:
assert attr.name_format == conv.msg_constraints[
"name_format"]
logger.debug("Attribute name format valid: %s",
attr.name_format)
except AssertionError:
if NAME_FORMAT_UNSPECIFIED != conv.msg_constraints[
"name_format"]:
self._message = \
"Wrong name format: '%s', should be %s" % \
(attr.name_format, \
conv.msg_constraints["name_format"])
self._status = CRITICAL
break
else:
logger.debug("Accepting any attribute name format")
return {}
class VerifyDigestAlgorithm(Check):
"""
verify that the used digest algorithm was one from the approved set.
"""
def _digest_algo(self, signature, allowed):
try:
assert signature.signed_info.reference[0].digest_method.algorithm in allowed
except AssertionError:
self._message = "signature digest algorithm not allowed: '%s'" % \
signature.signed_info.reference[0].digest_method.algorithm
self._status = CRITICAL
return False
return True
def _func(self, conv):
if "digest_algorithm" not in conv.msg_constraints:
logger.info("Not verifying digest_algorithm (not configured)")
return {}
else:
try:
assert len(conv.msg_constraints["digest_algorithm"]) > 0
except AssertionError:
self._message = "List of allowed digest algorithm must not be empty"
self._status = CRITICAL
return {}
_algs = conv.msg_constraints["digest_algorithm"]
response = conv.saml_response[-1].response
if response.signature:
if not self._digest_algo(response.signature, _algs):
return {}
for assertion in response.assertion:
if not self._digest_algo(assertion.signature, _algs):
return {}
return {}
class VerifySignatureAlgorithm(Check):
"""
verify that the used signature algorithm was one from an approved set.
"""
def _sig_algo(self, signature, allowed):
try:
assert signature.signed_info.signature_method.algorithm in allowed
except AssertionError:
self._message = "Wrong algorithm used for signing: '%s'" % \
signature.signed_info.signature_method.algorithm
self._status = CRITICAL
return False
return True
def _func(self, conv):
if "signature_algorithm" not in conv.msg_constraints:
logger.info("Not verifying signature_algorithm (not configured)")
return {}
else:
try:
assert len(conv.msg_constraints["signature_algorithm"]) > 0
except AssertionError:
self._message = "List of allowed signature algorithm must not be empty"
self._status = CRITICAL
return {}
_algs = conv.msg_constraints["signature_algorithm"]
response = conv.saml_response[-1].response
if response.signature:
if not self._sig_algo(response.signature, _algs):
return {}
for assertion in response.assertion:
if not self._sig_algo(assertion.signature, _algs):
return {}
return {}
class VerifySignedPart(Check):
"""
verify that the correct part was signed.
"""
def _func(self, conv):
if "signed_part" not in conv.msg_constraints:
return {}
response = conv.saml_response[-1].response
if "response" in conv.msg_constraints["signed_part"]:
if response.signature:
pass
else:
self._message = "Response not signed"
self._status = CRITICAL
if self._status == OK:
if "assertion" in conv.msg_constraints["signed_part"]:
for assertion in response.assertion:
if assertion.signature:
pass
else:
self._message = "Assertion not signed"
self._status = CRITICAL
break
return {}
class VerifyEndpoint(Check):
def _func(self, conv):
_ = conv.last_response
return {}
# =============================================================================
CLASS_CACHE = {}
def factory(cid, classes=CLASS_CACHE):
if len(classes) == 0:
check.factory(cid, classes)
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj):
try:
classes[obj.cid] = obj
except AttributeError:
pass
if cid in classes:
return classes[cid]
else:
return None

View File

@ -1,388 +0,0 @@
__author__ = 'rohe0002'
import json
import logging
import six
from urlparse import urlparse
from bs4 import BeautifulSoup
from mechanize import ParseResponseEx
from mechanize._form import ControlNotFoundError, AmbiguityError
from mechanize._form import ListControl
logger = logging.getLogger(__name__)
def pick_interaction(interactions, _base="", content="", req=None):
unic = content
if content:
_bs = BeautifulSoup(content)
else:
_bs = None
for interaction in interactions:
_match = 0
for attr, val in interaction["matches"].items():
if attr == "url":
if val == _base:
_match += 1
elif attr == "title":
if _bs is None:
break
if _bs.title is None:
break
if val in _bs.title.contents:
_match += 1
else:
_c = _bs.title.contents
if isinstance(_c, list) and not isinstance(
_c, six.string_types):
for _line in _c:
if val in _line:
_match += 1
continue
elif attr == "content":
if unic and val in unic:
_match += 1
elif attr == "class":
if req and val == req:
_match += 1
if _match == len(interaction["matches"]):
return interaction
raise KeyError("No interaction matched")
class FlowException(Exception):
def __init__(self, function="", content="", url=""):
Exception.__init__(self)
self.function = function
self.content = content
self.url = url
def __str__(self):
return json.dumps(self.__dict__)
class RResponse():
"""
A Response class that behaves in the way that mechanize expects it.
Links to a requests.Response
"""
def __init__(self, resp):
self._resp = resp
self.index = 0
self.text = resp.text
if isinstance(self.text, unicode):
if resp.encoding == "UTF-8":
self.text = self.text.encode("utf-8")
else:
self.text = self.text.encode("latin-1")
self._len = len(self.text)
self.url = str(resp.url)
self.statuscode = resp.status_code
def geturl(self):
return self._resp.url
def __getitem__(self, item):
try:
return getattr(self._resp, item)
except AttributeError:
return getattr(self._resp.headers, item)
def __getattribute__(self, item):
try:
return getattr(self._resp, item)
except AttributeError:
return getattr(self._resp.headers, item)
def read(self, size=0):
"""
Read from the content of the response. The class remembers what has
been read so it's possible to read small consecutive parts of the
content.
:param size: The number of bytes to read
:return: Somewhere between zero and 'size' number of bytes depending
on how much it left in the content buffer to read.
"""
if size:
if self._len < size:
return self.text
else:
if self._len == self.index:
part = None
elif self._len - self.index < size:
part = self.text[self.index:]
self.index = self._len
else:
part = self.text[self.index: self.index + size]
self.index += size
return part
else:
return self.text
def pick_form(response, url=None, **kwargs):
"""
Picks which form in a web-page that should be used
:param response: A HTTP request response. A DResponse instance
:param content: The HTTP response content
:param url: The url the request was sent to
:return: The picked form or None of no form matched the criteria.
"""
#_txt = response.text
forms = ParseResponseEx(response)
if not forms:
raise FlowException(content=response.text, url=url)
#if len(forms) == 1:
# return forms[0]
#else:
_form = None
# ignore the first form, because I use ParseResponseEx which adds
# one form at the top of the list
forms = forms[1:]
if len(forms) == 1:
_form = forms[0]
else:
if "pick" in kwargs:
_dict = kwargs["pick"]
for form in forms:
if _form:
break
for key, _ava in _dict.items():
if key == "form":
_keys = form.attrs.keys()
for attr, val in _ava.items():
if attr in _keys and val == form.attrs[attr]:
_form = form
elif key == "control":
prop = _ava["id"]
_default = _ava["value"]
try:
orig_val = form[prop]
if isinstance(orig_val, six.string_types):
if orig_val == _default:
_form = form
elif _default in orig_val:
_form = form
except KeyError:
pass
except ControlNotFoundError:
pass
elif key == "method":
if form.method == _ava:
_form = form
else:
_form = None
if not _form:
break
elif "index" in kwargs:
_form = forms[int(kwargs["index"])]
return _form
def do_click(httpc, form, **kwargs):
"""
Emulates the user clicking submit on a form.
:param httpc: The Client instance
:param form: The form that should be submitted
:return: What do_request() returns
"""
if "click" in kwargs:
request = None
_name = kwargs["click"]
try:
_ = form.find_control(name=_name)
request = form.click(name=_name)
except AmbiguityError:
# more than one control with that name
_val = kwargs["set"][_name]
_nr = 0
while True:
try:
cntrl = form.find_control(name=_name, nr=_nr)
if cntrl.value == _val:
request = form.click(name=_name, nr=_nr)
break
else:
_nr += 1
except ControlNotFoundError:
raise Exception("No submit control with the name='%s' and "
"value='%s' could be found" % (_name,
_val))
else:
request = form.click()
headers = {}
for key, val in request.unredirected_hdrs.items():
headers[key] = val
url = request._Request__original
if form.method == "POST":
return httpc.send(url, "POST", data=request.data, headers=headers)
else:
return httpc.send(url, "GET", headers=headers)
def select_form(httpc, orig_response, **kwargs):
"""
Pick a form on a web page, possibly enter some information and submit
the form.
:param httpc: A HTTP client instance
:param orig_response: The original response (as returned by requests)
:return: The response do_click() returns
"""
response = RResponse(orig_response)
try:
_url = response.url
except KeyError:
_url = kwargs["location"]
form = pick_form(response, _url, **kwargs)
#form.backwards_compatible = False
if not form:
raise Exception("Can't pick a form !!")
if "set" in kwargs:
for key, val in kwargs["set"].items():
if key.startswith("_"):
continue
if "click" in kwargs and kwargs["click"] == key:
continue
try:
form[key] = val
except ControlNotFoundError:
pass
except TypeError:
cntrl = form.find_control(key)
if isinstance(cntrl, ListControl):
form[key] = [val]
else:
raise
return do_click(httpc, form, **kwargs)
#noinspection PyUnusedLocal
def chose(httpc, orig_response, path, **kwargs):
"""
Sends a HTTP GET to a url given by the present url and the given
relative path.
:param orig_response: The original response
:param content: The content of the response
:param path: The relative path to add to the base URL
:return: The response do_click() returns
"""
if not path.startswith("http"):
try:
_url = orig_response.url
except KeyError:
_url = kwargs["location"]
part = urlparse(_url)
url = "%s://%s%s" % (part[0], part[1], path)
else:
url = path
return httpc.send(url, "GET")
#return resp, ""
def post_form(httpc, orig_response, **kwargs):
"""
The same as select_form but with no possibility of change the content
of the form.
:param httpc: A HTTP Client instance
:param orig_response: The original response (as returned by requests)
:param content: The content of the response
:return: The response do_click() returns
"""
response = RResponse(orig_response)
form = pick_form(response, **kwargs)
return do_click(httpc, form, **kwargs)
def NoneFunc():
return None
#noinspection PyUnusedLocal
def parse(httpc, orig_response, **kwargs):
# content is a form from which I get the SAMLResponse
response = RResponse(orig_response)
form = pick_form(response, **kwargs)
#form.backwards_compatible = False
if not form:
raise Exception("Can't pick a form !!")
return {"SAMLResponse": form["SAMLResponse"],
"RelayState": form["RelayState"]}
#noinspection PyUnusedLocal
def interaction(args):
_type = args["type"]
if _type == "form":
return select_form
elif _type == "link":
return chose
elif _type == "response":
return parse
else:
return NoneFunc
# ========================================================================
class Operation(object):
def __init__(self, args=None):
if args:
self.function = interaction(args)
self.args = args or {}
self.request = None
def update(self, dic):
self.args.update(dic)
#noinspection PyUnusedLocal
def post_op(self, result, conv, args):
pass
def __call__(self, httpc, conv, location, response, content,
features):
try:
_args = self.args.copy()
except (KeyError, AttributeError):
_args = {}
_args["location"] = location
_args["features"] = features
logger.info("--> FUNCTION: %s", self.function.__name__)
logger.info("--> ARGS: %s", _args)
result = self.function(httpc, response, **_args)
self.post_op(result, conv, _args)
return result

View File

@ -1 +0,0 @@
__author__ = 'rolandh'

View File

@ -1,49 +0,0 @@
from idp_test import CheckSaml2IntMetaData
from idp_test.check import CheckSaml2IntAttributes
from saml2 import SamlBase, ExtensionContainer
__author__ = 'rolandh'
from idp_test.saml2base import AuthnRequest
class DummyExtension(SamlBase):
"""The urn:mace:umu.se:SAML:2.0:extension:foo element """
c_tag = 'DummyExtension'
c_namespace = "urn:mace:umu.se:SAML:2.0:extension:foo"
c_value_type = {'base': 'NCName'}
c_children = SamlBase.c_children.copy()
c_attributes = SamlBase.c_attributes.copy()
c_child_order = SamlBase.c_child_order[:]
c_cardinality = SamlBase.c_cardinality.copy()
class AuthnRequest_UnknownIssuer(AuthnRequest):
def pre_processing(self, message, args):
_issuer = message.issuer
_issuer.text = "https://www.example.com/foobar.xml"
return message
class AuthnRequest_UnknownExtension(AuthnRequest):
def pre_processing(self, message, args):
message.extension = ExtensionContainer()
message.extension.add_extension_element(DummyExtension(text="foo"))
return message
OPERATIONS = {
'authn_unknown-issuer': {
"name": 'AuthnRequest with unknown issuer',
"descr": 'AuthnRequest with unknown issuer',
"sequence": [AuthnRequest_UnknownIssuer],
"depends": ['authn'],
"tests": {"pre": [CheckSaml2IntMetaData],
"post": [CheckSaml2IntAttributes]}
},
'authn_unknown-extension': {
"name": 'AuthnRequest with unknown extension',
"descr": 'AuthnRequest with unknown extension',
"sequence": [AuthnRequest_UnknownExtension],
"depends": ['authn'],
"tests": {"pre": [CheckSaml2IntMetaData],
"post": [CheckSaml2IntAttributes]}
},
}

View File

@ -1,554 +0,0 @@
from saml2 import samlp
from saml2 import BINDING_HTTP_ARTIFACT
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_PAOS
from saml2 import BINDING_SOAP
from saml2 import BINDING_URI
from saml2.saml import NAMEID_FORMAT_PERSISTENT
from saml2.saml import NAMEID_FORMAT_UNSPECIFIED
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.saml import NAMEID_FORMAT_EMAILADDRESS
from idp_test.check import CheckLogoutSupport
from idp_test.check import CheckSaml2IntAttributes
from idp_test.check import CheckSaml2IntMetaData
from idp_test.check import VerifyAttributeNameFormat
from idp_test.check import VerifyFunctionality
from idp_test.check import VerifyContent
from idp_test.check import VerifyNameIDMapping
from idp_test.check import VerifyNameIDPolicyUsage
from idp_test.check import VerifySuccessStatus
from idp_test.check import VerifyDigestAlgorithm
from idp_test.check import VerifySignatureAlgorithm
from idp_test.check import VerifySignedPart
from idp_test.check import VerifyEndpoint
from saml2.samlp import NameIDPolicy
__author__ = 'rolandh'
class Request(object):
_args = {}
_class = None
tests = {"post": [VerifyContent], "pre": []}
def __init__(self, conv):
self.args = self._args.copy()
self.conv = conv
def setup(self):
pass
def pre_processing(self, message, args):
return message
def post_processing(self, message):
return message
#class Saml2IntRequest(Request):
# tests = {"pre": [],
# "post": [CheckSaml2IntAttributes, VerifyContent
# # CheckSubjectNameIDFormat,
# ]}
class AuthnRequest(Request):
_class = samlp.AuthnRequest
request = "authn_request"
_args = {"response_binding": BINDING_HTTP_POST,
"request_binding": BINDING_HTTP_REDIRECT,
"nameid_format": NAMEID_FORMAT_PERSISTENT,
"allow_create": True}
tests = {"pre": [VerifyFunctionality],
"post": [CheckSaml2IntAttributes,
VerifyAttributeNameFormat,
VerifySignedPart,
VerifyDigestAlgorithm,
VerifySignatureAlgorithm]}
class AuthnRequestNID_Transient(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["nameid_format"] = NAMEID_FORMAT_TRANSIENT
def setup(self):
cnf = self.conv.client.config
endps = cnf.getattr("endpoints", "sp")
url = ""
for url, binding in endps["assertion_consumer_service"]:
if binding == BINDING_HTTP_POST:
self.args["assertion_consumer_service_url"] = url
break
self.tests["post"].append((VerifyEndpoint, {"endpoint": url}))
class AuthnRequestNID_Email(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["nameid_format"] = NAMEID_FORMAT_EMAILADDRESS
def setup(self):
cnf = self.conv.client.config
endps = cnf.getattr("endpoints", "sp")
url = ""
for url, binding in endps["assertion_consumer_service"]:
if binding == BINDING_HTTP_POST:
self.args["assertion_consumer_service_url"] = url
break
self.tests["post"].append((VerifyEndpoint, {"endpoint": url}))
class AuthnRequestNID_Unspecified(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["nameid_format"] = NAMEID_FORMAT_UNSPECIFIED
def setup(self):
cnf = self.conv.client.config
endps = cnf.getattr("endpoints", "sp")
url = ""
for url, binding in endps["assertion_consumer_service"]:
if binding == BINDING_HTTP_POST:
self.args["assertion_consumer_service_url"] = url
break
self.tests["post"].append((VerifyEndpoint, {"endpoint": url}))
class AuthnRequestNID_no(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["nameid_format"] = ""
def setup(self):
cnf = self.conv.client.config
endps = cnf.getattr("endpoints", "sp")
url = ""
for url, binding in endps["assertion_consumer_service"]:
if binding == BINDING_HTTP_POST:
self.args["assertion_consumer_service_url"] = url
break
self.tests["post"].append((VerifyEndpoint, {"endpoint": url}))
class AuthnRequestEndpointIndex(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["attribute_consuming_service_index"] = 3
def setup(self):
cnf = self.conv.client.config
endps = cnf.getattr("endpoints", "sp")
acs3 = endps["assertion_consumer_service"][3]
self.tests["post"].append((VerifyEndpoint, {"endpoint": acs3[0]}))
class AuthnRequestEndpointIndexNIDTransient(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["attribute_consuming_service_index"] = 3
self.args["nameid_format"] = NAMEID_FORMAT_TRANSIENT
def setup(self):
cnf = self.conv.client.config
endps = cnf.getattr("endpoints", "sp")
acs3 = endps["assertion_consumer_service"][3]
self.tests["post"].append((VerifyEndpoint, {"endpoint": acs3[0]}))
class AuthnRequestSpecEndpoint(AuthnRequest):
def setup(self):
cnf = self.conv.client.config
endps = cnf.getattr("endpoints", "sp")
acs3 = endps["assertion_consumer_service"][3]
self.args["assertion_consumer_service_url"] = acs3[0]
self.tests["post"].append((VerifyEndpoint, {"endpoint": acs3[0]}))
class DynAuthnRequest(Request):
_class = samlp.AuthnRequest
request = "authn_request"
_args = {"response_binding": BINDING_HTTP_POST}
tests = {}
name_id_formats = [NAMEID_FORMAT_TRANSIENT, NAMEID_FORMAT_PERSISTENT]
bindings = [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST]
def setup(self):
metadata = self.conv.client.metadata
entity = metadata[self.conv.entity_id]
self.args.update({"nameid_format": "", "request_binding": ""})
for idp in entity["idpsso_descriptor"]:
for nformat in self.name_id_formats:
if self.args["nameid_format"]:
break
for nif in idp["name_id_format"]:
if nif["text"] == nformat:
self.args["nameid_format"] = nformat
break
for bind in self.bindings:
if self.args["request_binding"]:
break
for sso in idp["single_sign_on_service"]:
if sso["binding"] == bind:
self.args["request_binding"] = bind
break
class AuthnRequestPost(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["request_binding"] = BINDING_HTTP_POST
class AuthnRequest_using_Artifact(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["response_binding"] = BINDING_HTTP_ARTIFACT
self.args["binding"] = BINDING_HTTP_ARTIFACT
class AuthnRequest_using_ArtifactNID_Transient(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["nameid_format"] = NAMEID_FORMAT_TRANSIENT
self.args["response_binding"] = BINDING_HTTP_ARTIFACT
self.args["binding"] = BINDING_HTTP_ARTIFACT
class AuthnRequestPostNID_Transient(AuthnRequestPost):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["nameid_format"] = NAMEID_FORMAT_TRANSIENT
class LogOutRequest(Request):
request = "logout_request"
_args = {"request_binding": BINDING_SOAP}
tests = {"pre": [VerifyFunctionality], "post": []}
def __init__(self, conv):
Request.__init__(self, conv)
self.tests["pre"].append(CheckLogoutSupport)
#self.tests["post"].append(VerifyLogout)
def setup(self):
resp = self.conv.saml_response[-1].response
assertion = resp.assertion[0]
subj = assertion.subject
self.args["name_id"] = subj.name_id
self.args["issuer_entity_id"] = assertion.issuer.text
class AssertionIDRequest(Request):
request = "assertion_id_request"
_args = {"request_binding": BINDING_URI,
"response_binding": None}
tests = {"pre": [VerifyFunctionality]}
def setup(self):
assertion = self.conv.saml_response[-1].assertion
self.args["assertion_id_refs"] = [assertion.id]
class AuthnQuery(Request):
request = "authn_query"
_args = {"request_binding": BINDING_SOAP}
tests = {"pre": [VerifyFunctionality], "post": []}
def __init__(self, conv):
Request.__init__(self, conv)
self.tests["post"].append(VerifySuccessStatus)
def setup(self):
assertion = self.conv.saml_response[-1].assertion
self.args["subject"] = assertion.subject
class NameIDMappingRequest(Request):
request = "name_id_mapping_request"
_args = {"request_binding": BINDING_SOAP,
"name_id_policy": NameIDPolicy(format=NAMEID_FORMAT_PERSISTENT,
sp_name_qualifier="GroupOn",
allow_create="true")}
def __init__(self, conv):
Request.__init__(self, conv)
self.tests["post"].append(VerifyNameIDMapping)
def setup(self):
assertion = self.conv.saml_response[-1].assertion
self.args["name_id"] = assertion.subject.name_id
class AuthnRequest_NameIDPolicy1(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["name_id_policy"] = NameIDPolicy(
format=NAMEID_FORMAT_PERSISTENT, sp_name_qualifier="Group1",
allow_create="true")
self.tests["post"].append(VerifyNameIDPolicyUsage)
class AuthnRequest_NameIDPolicy1Transient(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["name_id_policy"] = NameIDPolicy(
format=NAMEID_FORMAT_TRANSIENT, sp_name_qualifier="Group1",
allow_create="true")
self.args["nameid_format"] = NAMEID_FORMAT_TRANSIENT
self.tests["post"].append(VerifyNameIDPolicyUsage)
class AuthnRequest_TransientNameID(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["name_id_policy"] = NameIDPolicy(
format=NAMEID_FORMAT_TRANSIENT, sp_name_qualifier="Group",
allow_create="true")
self.tests["post"].append(VerifyNameIDPolicyUsage)
class ECP_AuthnRequest(AuthnRequest):
def __init__(self, conv):
AuthnRequest.__init__(self, conv)
self.args["request_binding"] = BINDING_SOAP
self.args["service_url_binding"] = BINDING_PAOS
def setup(self):
_client = self.conv.client
_client.user = "babs"
_client.passwd = "howes"
# def post_processing(self, message):
# # Unpacking SOAP message
# return parse_soap_enveloped_saml_response(message)
class ManageNameIDRequest(Request):
request = "manage_name_id_request"
_args = {"request_binding": BINDING_SOAP,
"new_id": samlp.NewID("New identifier")}
def __init__(self, conv):
Request.__init__(self, conv)
self.tests["post"].append(VerifySuccessStatus)
def setup(self):
assertion = self.conv.saml_response[-1].assertion
self.args["name_id"] = assertion.subject.name_id
class AttributeQuery(Request):
request = "attribute_query"
_args = {"request_binding": BINDING_SOAP}
tests = {"pre": [VerifyFunctionality],
"post": [CheckSaml2IntAttributes, VerifyAttributeNameFormat]}
def setup(self):
assertion = self.conv.saml_response[-1].assertion
self.args["name_id"] = assertion.subject.name_id
# -----------------------------------------------------------------------------
OPERATIONS = {
'verify': {
'tc_id': "S2c-16",
"name": 'Verify SAML connectivity',
"descr": 'Uses AuthnRequest to check connectivity',
"sequence": [DynAuthnRequest],
"tests": {"pre": [CheckSaml2IntMetaData],
"post": []}
},
'authn': {
"tc_id": "S2c-02",
"name": 'Absolute basic AuthnRequest',
"descr": 'AuthnRequest using HTTP-Redirect',
"sequence": [AuthnRequest],
"tests": {"pre": [CheckSaml2IntMetaData],
"post": []},
"depend":["verify"]
},
'authn-nid_transient': {
"tc_id": "S2c-10",
"name": 'AuthnRequest, NameID-trans',
"descr": 'Basic SAML2 AuthnRequest, HTTP-Redirect, '
'transient name ID',
"sequence": [AuthnRequestNID_Transient],
"tests": {"pre": [CheckSaml2IntMetaData],
"post": []},
"depend":["authn"]
},
'authn-nid_email': {
"tc_id": "S2c-20",
"name": 'AuthnRequest email nameID',
"descr": 'Basic SAML2 AuthnRequest, HTTP-Redirect, NameID-email'
'specified',
"sequence": [AuthnRequestNID_Email],
"tests": {"pre": [CheckSaml2IntMetaData],
"post": []},
"depend":["authn"]
},
'authn-nid_no': {
"tc_id": "S2c-21",
"name": 'AuthnRequest no NameID format',
"descr": 'Basic SAML2 AuthnRequest, HTTP-Redirect, no NameID format '
'specified',
"sequence": [AuthnRequestNID_no],
"tests": {"pre": [CheckSaml2IntMetaData],
"post": []},
"depend":["authn"]
},
'authn-nid_unspecified': {
"tc_id": "S2c-21",
"name": 'AuthnRequest using unspecified NameID format',
"descr": 'Basic SAML2 AuthnRequest, HTTP-Redirect, NameID-unspec',
"sequence": [AuthnRequestNID_Unspecified],
"tests": {"pre": [CheckSaml2IntMetaData],
"post": []},
"depend":["authn"]
},
'authn-post': {
"tc_id": "S2c-08",
"name": 'Basic SAML2 AuthnRequest using HTTP POST',
"descr": 'AuthnRequest using HTTP-POST',
"sequence": [AuthnRequestPost],
"tests": {"pre": [CheckSaml2IntMetaData],
"post": []},
"depend":["authn"]
},
'authn-post-transient': {
"tc_id": "S2c-09",
"name": 'AuthnRequest HTTP-POST, transient NameID fmt',
"descr": 'AuthnRequest using HTTP-POST expecting transient NameID',
"sequence": [AuthnRequestPostNID_Transient],
"tests": {"pre": [CheckSaml2IntMetaData],
"post": []},
"depend":["authn-post"]
},
'attribute-query':{
"tc_id": "S2c-01",
"name": "Attribute query",
"sequence":[AuthnRequest, AttributeQuery],
"depend":["authn"]
},
'attribute-query-transient':{
"tc_id": "S2c-20",
"name": "Attribute query, NameID transient",
"sequence":[AuthnRequestNID_Transient, AttributeQuery],
"depend":["authn"]
},
'authn_endpoint_index': {
"tc_id": "S2c-03",
"name": 'AuthnRequest, endpoint index',
"descr": '',
"sequence": [AuthnRequestEndpointIndex],
"depend":["authn"]
},
'authn_endpoint_index-transient': {
"tc_id": "S2c-03",
"name": 'AuthnRequest, endpoint index, NameID-trans',
"descr": '',
"sequence": [AuthnRequestEndpointIndexNIDTransient],
"depend":["authn"]
},
'authn_specified_endpoint': {
"tc_id": "S2c-04",
"name": 'AuthnRequest, specified endpoint',
"descr": '',
"sequence": [AuthnRequestSpecEndpoint],
"depend":["authn"]
},
'authn-artifact':{
'tc_id': "S2c-05",
"name": "SAML2 AuthnRequest using an artifact",
"descr": ('AuthnRequest using HTTP-Redirect and artifact'),
"sequence": [AuthnRequest_using_Artifact]
},
'authn-artifact_nid-transient':{
'tc_id': "S2c-05",
"name": "SAML2 AuthnRequest expecting artifact response",
"descr": ('AuthnRequest using HTTP-Redirect and artifact'),
"sequence": [AuthnRequest_using_ArtifactNID_Transient]
},
'authn-assertion_id_request': {
"tc_id": "S2c-06",
"name": 'AuthnRequest then AssertionIDRequest',
"descr": 'AuthnRequest followed by an AssertionIDRequest',
"sequence": [AuthnRequest, AssertionIDRequest],
"tests": {"pre": [CheckSaml2IntMetaData], "post": []},
"depend":["authn"]
},
'authn-nid_transient-assertion_id_request': {
"tc_id": "S2c-26",
"name": 'AuthnRequest then AssertionIDRequest, NameID-trans',
"descr": 'AuthnRequest followed by an AssertionIDRequest',
"sequence": [AuthnRequestNID_Transient, AssertionIDRequest],
"tests": {"pre": [CheckSaml2IntMetaData], "post": []},
"depend":["authn"]
},
'authn-with-name_id_policy': {
"tc_id": "S2c-11",
"name": 'SAML2 AuthnRequest with specific NameIDPolicy',
"descr": 'AuthnRequest with specific NameIDPolicy',
"sequence": [AuthnRequest_NameIDPolicy1],
"tests": {"pre": [CheckSaml2IntMetaData], "post": []},
"depend":["authn"]
},
'authn-with-name_id_policy_nid-transient': {
"tc_id": "S2c-31",
"name": 'AuthnRequest NameIDPolicy transient',
"descr": 'AuthnRequest with specific NameIDPolicy',
"sequence": [AuthnRequest_NameIDPolicy1Transient],
"tests": {"pre": [CheckSaml2IntMetaData], "post": []},
"depend":["authn"]
},
'ecp_authn': {
'tc_id': "S2c-12",
"name": "AuthnRequest using ECP and PAOS",
"descr": "SAML2 AuthnRequest using ECP and PAOS",
"sequence":[ECP_AuthnRequest]
},
'log-in-out': {
"tc_id": "S2c-13",
"name": 'Basic SAML2 log in and out',
"descr": 'AuthnRequest using HTTP-Redirect followed by a logout',
"sequence": [AuthnRequest, LogOutRequest],
"tests": {"pre": [CheckSaml2IntMetaData], "post": []},
"depend":["authn"]
},
'manage_nameid':{
"tc_id": "S2c-14",
"name": "ManageNameID; set NameID",
"descr": "Setting the SP provided ID by using ManageNameID",
"sequence":[AuthnRequest, ManageNameIDRequest],
"depend":["authn"]
},
'nameid-mapping':{
"tc_id": "S2c-15",
"name": "Simple NameIDMapping request",
"sequence":[AuthnRequest, NameIDMappingRequest],
"depend":["authn"]
},
'manage_nameid_nid-transient':{
"tc_id": "S2c-16",
"name": "ManageNameID; set NameID; AuthRequ/NameID-trans",
"descr": "Setting the SP provided ID by using ManageNameID",
"sequence":[AuthnRequestNID_Transient, ManageNameIDRequest],
"depend":["authn"]
},
'authn-authn_query': {
"tc_id": "S2c-17",
"name": 'AuthnRequest then AuthnQuery',
"descr": 'AuthnRequest followed by an AuthnQuery',
"sequence": [AuthnRequest, AuthnQuery],
"tests": {"pre": [CheckSaml2IntMetaData], "post": []},
"depend":["authn"]
},
}

View File

@ -1,226 +0,0 @@
from saml2 import config, NAMEID_FORMAT_EMAILADDRESS
from saml2 import samlp
from saml2 import BINDING_HTTP_POST
from saml2 import VERSION
from saml2.client import Saml2Client
from saml2.s_utils import rndstr
from saml2.time_util import instant
__author__ = 'rolandh'
try:
from xmlsec_location import xmlsec_path
except ImportError:
xmlsec_path = '/opt/local/bin/xmlsec1'
cnf_dict = {
"entityid" : "urn:mace:example.com:saml:roland:sp",
"name" : "urn:mace:example.com:saml:roland:sp",
"description": "Test SP",
"service": {
"sp": {
"endpoints":{
"assertion_consumer_service": ["http://lingon.catalogix.se:8087/"],
},
"required_attributes": ["surName", "givenName", "mail"],
"optional_attributes": ["title"],
"idp": ["urn:mace:example.com:saml:roland:idp"],
}
},
"key_file" : "test.key",
"cert_file" : "test.pem",
"ca_certs": "cacerts.txt",
"xmlsec_binary" : xmlsec_path,
"metadata": {
"local": ["idp.xml"],
},
"subject_data": "subject_data.db",
"accepted_time_diff": 60,
"attribute_map_dir" : "attributemaps",
}
conf = config.SPConfig()
conf.load(cnf_dict)
client = Saml2Client(conf)
binding= BINDING_HTTP_POST
query_id = rndstr()
service_url = "https://example.com"
authn_request = {
#===== AuthRequest =====
"subject":{
"base_id":{
"name_qualifier":None,
"sp_name_qualifier":None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"name_id":{
"name_qualifier":None,
"sp_name_qualifier":None,
"format":None,
"sp_provided_id": None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"encrypted_id":{
"encrypted_data":None,
"encrypted_key":None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"subject_confirmation":[{
"base_id":{
"name_qualifier":None,
"sp_name_qualifier":None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"name_id":{
"name_qualifier":None,
"sp_name_qualifier":None,
"format":None,
"sp_provided_id": None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"encrypted_id":{
"encrypted_data":None,
"encrypted_key":None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"subject_confirmation_data":{
"not_before":None,
"not_on_or_after":None,
"recipient":None,
"in_response_to":None,
"address":None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"text":None,
"extension_elements":None,
"extension_attributes":None,
}],
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
#NameIDPolicy
"name_id_policy":{
"format":NAMEID_FORMAT_EMAILADDRESS,
# NAMEID_FORMAT_EMAILADDRESS = (
# "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress")
# NAMEID_FORMAT_UNSPECIFIED = (
# "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified")
# NAMEID_FORMAT_ENCRYPTED = (
# "urn:oasis:names:tc:SAML:2.0:nameid-format:encrypted")
# NAMEID_FORMAT_PERSISTENT = (
# "urn:oasis:names:tc:SAML:2.0:nameid-format:persistent")
# NAMEID_FORMAT_TRANSIENT = (
# "urn:oasis:names:tc:SAML:2.0:nameid-format:transient")
# NAMEID_FORMAT_ENTITY = (
# "urn:oasis:names:tc:SAML:2.0:nameid-format:entity")
"sp_name_qualifier":None,
"allow_create":None,
#text=None,
#extension_elements=None,
#extension_attributes=None,
},
#saml.Conditions
"conditions":{
#Condition
"condition":[{}],
#AudienceRestriction
"audience_restriction":[{}],
#OneTimeUse
"one_time_use":[{}],
#ProxyRestriction
"proxy_restriction":[{}],
#not_before=None,
#not_on_or_after=None,
#text=None,
#extension_elements=None,
#extension_attributes=None,
},
#RequestedAuthnContext
"requested_authn_context":{
#saml.AuthnContextClassRef
"authn_context_class_ref":None,
#saml.AuthnContextDeclRef
"authn_context_decl_ref":None,
#AuthnContextComparisonType_
"comparison":None,
#text=None,
#extension_elements=None,
#extension_attributes=None,
},
#Scoping
"scoping":{
#IDPList
"idp_list":{
#IDPEntry
"idp_entry":{
"provider_id":None,
"name":None,
"loc":None,
#text=None,
#extension_elements=None,
#extension_attributes=None,
},
#GetComplete
"get_complete":{},
#text=None,
#extension_elements=None,
#extension_attributes=None,
},
#RequesterID
"requester_id":{},
#proxy_count=None,
#text=None,
#extension_elements=None,
#extension_attributes=None,
},
"force_authn":None,
"is_passive":None,
"protocol_binding":None,
"assertion_consumer_service_index":None,
"assertion_consumer_service_url":None,
"attribute_consuming_service_index":None,
"provider_name":None,
#saml.Issuer
"issuer":{},
#ds.Signature
"signature":{},
#Extensions
"extensions":{},
"id":None,
"version":None,
"issue_instant":None,
"destination":None,
"consent":None,
#text=None,
#extension_elements=None,
#extension_attributes=None,
}
request = samlp.AuthnRequest(
id= query_id,
version= VERSION,
issue_instant= instant(),
assertion_consumer_service_url= service_url,
protocol_binding= binding
)

View File

@ -123,7 +123,6 @@ def http_redirect_message(message, location, relay_state="", typ="SAMLRequest",
:param sigalg: Which algorithm the signature function will use to sign
the message
:param signer: A signature function that can be used to sign the message
:param key: Key to use for signing
:return: A tuple containing header information and a HTML message.
"""

View File

@ -1,298 +0,0 @@
from __future__ import print_function
import json
import pprint
import argparse
import os.path
import sys
import traceback
from importlib import import_module
from idp_test import SCHEMA
from saml2 import root_logger
from saml2.mdstore import MetadataStore, MetaData
from saml2.saml import NAME_FORMAT_UNSPECIFIED
from saml2.server import Server
from saml2.config import IdPConfig
from saml2.config import logging
from sp_test.base import Conversation
from saml2test import FatalError
from saml2test import CheckError
from saml2test import ContextFilter
from saml2test import exception_trace
from saml2test.check import CRITICAL
__author__ = 'rolandh'
#formatter = logging.Formatter("%(asctime)s %(name)s:%(levelname)s %(message)s")
formatter_2 = logging.Formatter(
"%(delta).6f - %(levelname)s - [%(name)s] %(message)s")
cf = ContextFilter()
cf.start()
streamhandler = logging.StreamHandler(sys.stderr)
streamhandler.setFormatter(formatter_2)
memoryhandler = logging.handlers.MemoryHandler(1024 * 10, logging.DEBUG)
memoryhandler.addFilter(cf)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(memoryhandler)
logger.setLevel(logging.DEBUG)
class Client(object):
def __init__(self, operations, check_factory):
self.operations = operations
self.tests = None
self.check_factory = check_factory
self._parser = argparse.ArgumentParser()
self._parser.add_argument("-c", dest="config", default="config",
help="Configuration file for the IdP")
self._parser.add_argument(
'-C', dest="ca_certs",
help=("CA certs to use to verify HTTPS server certificates, ",
"if HTTPS is used and no server CA certs are defined then ",
"no cert verification will be done"))
self._parser.add_argument('-d', dest='debug', action='store_true',
help="Print debug information")
self._parser.add_argument("-H", dest="pretty", action='store_true',
help="Output summary on stdout as pretty "
"printed python dict instead of JSON")
self._parser.add_argument("-i", dest="insecure", action='store_true',
help="Do not verify SSL certificate")
self._parser.add_argument("-I", dest="keysdir", default="keys",
help="Directory for invalid IDP keys")
self._parser.add_argument('-J', dest="json_config_file",
help="Test target configuration")
self._parser.add_argument(
'-k', dest='content_log', action='store_true',
help="Log HTTP content in spearate files in directory "
"<operation>/, which defaults to the path in -L")
self._parser.add_argument(
"-l", dest="list", action="store_true",
help="List all the test flows as a JSON object")
self._parser.add_argument("-L", dest="logpath", default=".",
help="Path to the logfile directory")
self._parser.add_argument('-m', dest="metadata", action='store_true',
help="Return the IdP metadata")
self._parser.add_argument(
"-P", dest="configpath", default=".",
help="Path to the configuration file for the IdP")
self._parser.add_argument("-t", dest="testpackage",
help="Module describing tests")
#self._parser.add_argument('-v', dest='verbose', action='store_true',
# help="Print runtime information") # unsused
self._parser.add_argument("-Y", dest="pysamllog", action='store_true',
help="Print PySAML2 logs")
self._parser.add_argument("oper", nargs="?", help="Which test to run")
self.interactions = None
self.entity_id = None
self.constraints = {}
self.args = None
self.idp = None
self.idp_config = None
def json_config_file(self):
if self.args.json_config_file == "-":
return json.loads(sys.stdin.read())
else:
return json.loads(open(self.args.json_config_file).read())
def idp_configure(self, metadata_construction=False):
sys.path.insert(0, self.args.configpath)
mod = import_module(self.args.config)
self.idp_config = IdPConfig().load(mod.CONFIG, metadata_construction)
if not self.args.insecure:
self.idp_config.verify_ssl_cert = False
else:
if self.args.ca_certs:
self.idp_config.ca_certs = self.args.ca_certs
else:
self.idp_config.ca_certs = "../keys/cacert.pem"
# hack to change idp cert without config change. TODO: find interface to
# change IDP cert after __init__
if self.args.oper == 'sp-04':
self.idp_config.cert_file = os.path.join(self.args.keysdir, "non_md_cert.pem")
self.idp_config.key_file = os.path.join(self.args.keysdir, "non_md_key.pem")
for f in [self.idp_config.cert_file, self.idp_config.key_file]:
if not os.path.isfile(f):
print("File not found: %s" % os.path.abspath(f))
raise
self.idp = Server(config=self.idp_config)
def test_summation(self, sid):
status = 0
for item in self.test_log:
if item["status"] > status:
status = item["status"]
if status == 0:
status = 1
info = {
"id": sid,
"status": status,
"tests": self.test_log
}
if status == 5:
info["url"] = self.test_log[-1]["url"]
info["htmlbody"] = self.test_log[-1]["message"]
return info
def output_log(self, memhndlr, hndlr2):
"""
"""
print(80 * ":", file=sys.stderr)
hndlr2.setFormatter(formatter_2)
memhndlr.setTarget(hndlr2)
memhndlr.flush()
memhndlr.close()
def run(self):
self.args = self._parser.parse_args()
if self.args.pysamllog:
root_logger.addHandler(memoryhandler)
root_logger.setLevel(logging.DEBUG)
if self.args.metadata:
return self.make_meta()
elif self.args.list:
return self.list_operations()
elif self.args.oper == "check":
return self.verify_metadata()
else:
if not self.args.oper:
raise Exception("Missing test case specification")
self.args.oper = self.args.oper.strip("'")
self.args.oper = self.args.oper.strip('"')
self.setup()
try:
oper = self.operations.OPERATIONS[self.args.oper]
except KeyError:
if self.tests:
try:
oper = self.tests.OPERATIONS[self.args.oper]
except ValueError:
print("Undefined testcase " + self.args.oper,
file=sys.stderr)
return
else:
print("Undefined testcase " + self.args.oper, file=sys.stderr)
return
if self.args.pretty:
pp = pprint.PrettyPrinter(indent=4)
else:
pp = None
logger.info("Starting conversation")
conv = Conversation(self.idp, self.idp_config,
self.interactions, self.json_config,
check_factory=self.check_factory,
entity_id=self.entity_id,
constraints=self.constraints,
commandlineargs = self.args)
try:
conv.do_sequence_and_tests(oper["sequence"], oper["tests"])
self.test_log = conv.test_output
tsum = self.test_summation(self.args.oper)
err = None
except CheckError as err:
self.test_log = conv.test_output
tsum = self.test_summation(self.args.oper)
except FatalError as err:
if conv:
self.test_log = conv.test_output
self.test_log.append(exception_trace("RUN", err))
else:
self.test_log = exception_trace("RUN", err)
tsum = self.test_summation(self.args.oper)
except Exception as err:
if conv:
conv.test_output.append({"status": CRITICAL,
"name": "test driver error",
"id": "critial exception"})
self.test_log = conv.test_output
self.test_log.append(exception_trace("RUN", err))
else:
self.test_log = exception_trace("RUN", err)
tsum = self.test_summation(self.args.oper)
logger.error("Unexpected exception in test driver %s",
traceback.format_exception(*sys.exc_info()))
if pp:
pp.pprint(tsum)
else:
print(json.dumps(tsum), file=sys.stdout)
if tsum["status"] > 1 or self.args.debug or err:
self.output_log(memoryhandler, streamhandler)
def setup(self):
self.json_config = self.json_config_file()
_jc = self.json_config
try:
self.interactions = _jc["interaction"]
except KeyError:
self.interactions = []
self.idp_configure()
metadata = MetadataStore(SCHEMA, self.idp_config.attribute_converters,
self.idp_config)
info = _jc["metadata"].encode("utf-8")
md = MetaData(SCHEMA, self.idp_config.attribute_converters, info)
md.load()
metadata[0] = md
self.idp.metadata = metadata
#self.idp_config.metadata = metadata
if self.args.testpackage:
self.tests = import_module("sp_test.package.%s" %
self.args.testpackage)
try:
self.entity_id = _jc["entity_id"]
# Verify its the correct metadata
assert self.entity_id in md.entity.keys()
except KeyError:
if len(md.entity.keys()) == 1:
self.entity_id = md.entity.keys()[0]
else:
raise Exception("Don't know which entity to talk to")
if "constraints" in _jc:
self.constraints = _jc["constraints"]
if "name_format" not in self.constraints:
self.constraints["name_format"] = NAME_FORMAT_UNSPECIFIED
def make_meta(self):
pass
def list_operations(self):
res = []
for key, val in self.operations.OPERATIONS.items():
res.append({"id": key, "name": val["name"]})
print(json.dumps(res))
def verify_metadata(self):
pass

View File

@ -1,538 +0,0 @@
import base64
import cookielib
import re
import os
import traceback
import urllib
import sys
import six
from urlparse import parse_qs
from saml2 import BINDING_HTTP_REDIRECT, class_name
from saml2 import BINDING_HTTP_POST
from saml2.request import SERVICE2REQUEST
from saml2.sigver import signed_instance_factory, pre_signature_part
from saml2.httputil import HttpParameters
from saml2test import CheckError, FatalError
from saml2test.check import Check
from saml2test.check import ExpectedError
from saml2test.check import INTERACTION
from saml2test.check import STATUSCODE
from saml2test.interaction import Action
from saml2test.interaction import Interaction
from saml2test.interaction import InteractionNeeded
from sp_test.tests import ErrorResponse
__author__ = 'rolandh'
import logging
logger = logging.getLogger(__name__)
FILE_EXT = {"text/html": "html", "test/plain": "txt", "application/json": "json",
"text/xml": "xml", "application/xml": "xml", }
camel2underscore = re.compile('((?<=[a-z0-9])[A-Z]|(?!^)[A-Z](?=[a-z]))')
class Conversation():
def __init__(self, instance, config, interaction, json_config,
check_factory, entity_id, msg_factory=None,
features=None, constraints=None, # verbose=False,
expect_exception=None, commandlineargs=None):
self.instance = instance
self._config = config
self.test_output = []
self.features = features
#self.verbose = verbose # removed (not used)
self.check_factory = check_factory
self.msg_factory = msg_factory
self.expect_exception = expect_exception
self.commandlineargs = commandlineargs
self.cjar = {"browser": cookielib.CookieJar(),
"rp": cookielib.CookieJar(),
"service": cookielib.CookieJar()}
self.protocol_response = []
self.last_response = None
self.last_content = None
self.response = None
self.interaction = Interaction(self.instance, interaction)
self.exception = None
self.entity_id = entity_id
self.cjar = {"rp": cookielib.CookieJar()}
self.args = {}
self.qargs = {}
self.response_args = {}
self.saml_response = []
self.destination = ""
self.request = None
self.position = ""
self.response = None
self.oper = None
self.msg_constraints = constraints
self.json_config = json_config
self.start_page = json_config["start_page"]
def check_severity(self, stat):
if stat["status"] >= 3:
logger.error("WHERE: %s", stat["id"])
logger.error("STATUS:%s", STATUSCODE[stat["status"]])
try:
logger.error("HTTP STATUS: %s", stat["http_status"])
except KeyError:
pass
try:
logger.error("INFO: %s", stat["message"])
except KeyError:
pass
raise CheckError
def do_check(self, test, **kwargs):
if isinstance(test, six.string_types):
chk = self.check_factory(test)(**kwargs)
else:
chk = test(**kwargs)
if not chk.call_on_redirect() and \
300 < self.last_response.status_code <= 303:
pass
else:
stat = chk(self, self.test_output)
self.check_severity(stat)
def err_check(self, test, err=None, bryt=True):
if err:
self.exception = err
chk = self.check_factory(test)()
chk(self, self.test_output)
if bryt:
e = FatalError("%s" % err)
e.trace = "".join(traceback.format_exception(*sys.exc_info()))
raise e
def test_sequence(self, sequence):
if sequence is None:
return True
for test in sequence:
if isinstance(test, tuple):
test, kwargs = test
else:
kwargs = {}
self.do_check(test, **kwargs)
if test == ExpectedError:
return False # TODO: return value is unused
return True
def my_endpoints(self):
for serv in ["aa", "aq", "idp"]:
endpoints = self._config.getattr("endpoints", serv)
if endpoints:
for typ, spec in endpoints.items():
for url, binding in spec:
yield url
def which_endpoint(self, url):
for serv in ["aa", "aq", "idp"]:
endpoints = self._config.getattr("endpoints", serv)
if endpoints:
for typ, spec in endpoints.items():
for endp, binding in spec:
if url.startswith(endp):
return typ, binding
return None
def _log_response(self, response):
"""Depending on -k argument write content to either logger or extra file
Create the <operation> directory; delete all possibly existing files
Write response content into response_x.<ext> (with x incrementing from 0)
"""
logger.info("<-- Status: %s", response.status_code)
if response.status_code in [302, 301, 303]:
logger.info("<-- location: %s",
response.headers._store['location'][1])
else:
if self.commandlineargs.content_log:
self._content_log_fileno = getattr(self, '_content_log_fileno', 0) + 1
if not getattr(self, 'logcontentpath', None):
try:
content_type_hdr = response.headers._store['content-type'][1]
l = content_type_hdr.split(';') + ['charset=ISO-8859-1',]
content_type = l[0]
encoding = l[1].split("=")
ext = "." + FILE_EXT[content_type]
except Exception as e:
ext = ""
self._logcontentpath = os.path.join(
self.commandlineargs.logpath,
self.commandlineargs.oper)
if not os.path.exists(self._logcontentpath):
os.makedirs(self._logcontentpath)
for fn in os.listdir(self._logcontentpath):
old_file = os.path.join(self._logcontentpath, fn)
if os.path.isfile(old_file):
os.unlink(old_file)
fn = os.path.join(self._logcontentpath, "response_%d%s"
% (self._content_log_fileno, ext ))
f = open(fn, "w")
f.write(response.content)
f.close()
logger.info("<-- Response content (encoding=%s) in file %s",
encoding, fn)
pass
else:
logger.info("<-- Content: %s", response.content)
def wb_send_GET_startpage(self):
"""
The action that starts the whole sequence, a HTTP GET on a web page
"""
self.last_response = self.instance.send(self.start_page)
self._log_response(self.last_response)
def handle_result(self, check_response=None):
if check_response:
if isinstance(check_response(), Check):
if 300 < self.last_response.status_code <= 303:
self._redirect(self.last_response)
self.do_check(check_response)
else:
# A HTTP redirect or HTTP Post
if 300 < self.last_response.status_code <= 303:
self._redirect(self.last_response)
if self.last_response.status_code >= 400:
raise FatalError(self.last_response.reason)
_txt = self.last_response.content
assert _txt.startswith("<h2>")
else:
if 300 < self.last_response.status_code <= 303:
self._redirect(self.last_response)
_txt = self.last_response.content
if self.last_response.status_code >= 400:
raise FatalError("Did not expected error")
def parse_saml_message(self):
try:
url, query = self.last_response.headers["location"].split("?")
except KeyError:
return
_dict = parse_qs(query)
try:
self.relay_state = _dict["RelayState"][0]
except KeyError:
self.relay_state = ""
_str = _dict["SAMLRequest"][0]
self.saml_request = self.instance._parse_request(
_str, SERVICE2REQUEST[self._endpoint], self._endpoint,
self._binding)
if self._binding == BINDING_HTTP_REDIRECT:
self.http_parameters = HttpParameters(_dict)
def _redirect(self, _response):
rdseq = []
url = None
while _response.status_code in [302, 301, 303]:
url = _response.headers["location"]
if url in rdseq:
raise FatalError("Loop detected in redirects")
else:
rdseq.append(url)
if len(rdseq) > 8:
raise FatalError(
"Too long sequence of redirects: %s" % rdseq)
logger.info("--> REDIRECT TO: %s", url)
# If back to me
for_me = False
try:
self._endpoint, self._binding = self.which_endpoint(url)
for_me = True
except TypeError:
pass
if for_me:
break
else:
try:
_response = self.instance.send(url, "GET")
except Exception as err:
raise FatalError("%s" % err)
self._log_response(_response)
self.last_response = _response
if _response.status_code >= 400:
break
return url
def send_idp_response(self, req_flow, resp_flow):
"""
:param req_flow: The flow to check the request
:param resp_flow: The flow to prepare the response
:return: The SP's HTTP response on receiving the SAML response
"""
# Pick information from the request that should be in the response
args = self.instance.response_args(self.saml_request.message,
[resp_flow._binding])
_mods = list(resp_flow.__mro__[:])
_mods.reverse()
for m in _mods:
try:
args.update(self.json_config["args"][m.__name__])
except KeyError:
pass
args.update(resp_flow._response_args)
for param in ["identity", "userid"]:
if param in self.json_config:
args[param] = self.json_config[param]
if resp_flow == ErrorResponse:
func = getattr(self.instance, "create_error_response")
else:
_op = camel2underscore.sub(r'_\1', req_flow._class.c_tag).lower()
func = getattr(self.instance, "create_%s_response" % _op)
# get from config which parts shall be signed
sign = []
for styp in ["sign_assertion", "sign_response"]:
if styp in args:
try:
if args[styp].lower() == "always":
sign.append(styp)
del args[styp]
except (AttributeError, TypeError):
raise AssertionError('config parameters "sign_assertion", '
'"sign_response" must be of type string')
response = func(**args)
response = resp_flow(self).pre_processing(response)
# and now for signing
if sign:
to_sign = []
try:
_digest_alg=args["sign_digest_alg"]
except KeyError:
_digest_alg=None
try:
_sign_alg=args["sign_signature_alg"]
except KeyError:
_sign_alg=None
# Order is important, first assertion and then response if both
if "sign_assertion" in sign:
to_sign = [(class_name(response.assertion),
response.assertion.id)]
response.assertion.signature = pre_signature_part(
response.assertion.id, self.instance.sec.my_cert, 1,
digest_alg=_digest_alg, sign_alg=_sign_alg)
if "sign_response" in sign:
to_sign = [(class_name(response), response.id)]
response.signature = pre_signature_part(
response.id, self.instance.sec.my_cert, 1,
digest_alg=_digest_alg, sign_alg=_sign_alg)
response = signed_instance_factory(response, self.instance.sec,
to_sign)
info = self.instance.apply_binding(resp_flow._binding, response,
args["destination"],
self.relay_state,
"SAMLResponse", resp_flow._sign)
if resp_flow._binding == BINDING_HTTP_REDIRECT:
url = None
for param, value in info["headers"]:
if param == "Location":
url = value
break
self.last_response = self.instance.send(url)
elif resp_flow._binding == BINDING_HTTP_POST:
resp_flow = base64.b64encode("%s" % response)
info["data"] = urllib.urlencode({"SAMLResponse": resp_flow,
"RelayState": self.relay_state})
info["method"] = "POST"
info["headers"] = {
'Content-type': 'application/x-www-form-urlencoded'}
self.last_response = self.instance.send(**info)
try:
self.last_content = self.last_response.content
except AttributeError:
self.last_content = None
self._log_response(self.last_response)
def do_flow(self, flow, mid_tests):
"""
Solicited or 'un-solicited' flows.
Solicited always starts with the Web client accessing a page.
Un-solicited starts with the IDP sending a SAMl Response.
"""
if len(flow) >= 3:
self.wb_send_GET_startpage()
self.intermit(flow[0]._interaction)
self.parse_saml_message()
# make sure I got the request I expected
assert isinstance(self.saml_request.message, flow[1]._class)
try:
self.test_sequence(mid_tests)
except KeyError:
pass
self.send_idp_response(flow[1], flow[2])
if len(flow) == 4:
self.handle_result(flow[3])
else:
self.handle_result()
def do_sequence_and_tests(self, oper, tests=None):
self.current_oper = oper
try:
self.test_sequence(tests["pre"])
except KeyError:
pass
for flow in oper:
try:
self.do_flow(flow, tests["mid"])
except InteractionNeeded:
self.test_output.append({"status": INTERACTION,
"message": "see detail log for response content",
"id": "exception",
"name": "interaction needed",
"url": self.position})
break
except FatalError:
raise
except Exception as err:
#self.err_check("exception", err)
raise
try:
self.test_sequence(tests["post"])
except KeyError:
pass
def intermit(self, page_types):
"""
Currently handles only SP-issued redirects
:param page_types: not used (could be used to implement wayf, disco)
"""
_response = self.last_response
_last_action = None
_same_actions = 0
if _response.status_code >= 400:
try:
self.last_content = _response.text
except AttributeError:
self.last_content = None
raise FatalError(
"HTTP response status code: %d" % _response.status_code)
url = _response.url
content = _response.text
done = False
while not done:
rdseq = []
while _response.status_code in [302, 301, 303]:
url = _response.headers["location"]
if url in rdseq:
raise FatalError("Loop detected in redirects")
else:
rdseq.append(url)
if len(rdseq) > 8:
raise FatalError(
"Too long sequence of redirects: %s" % rdseq)
# If back to me
for_me = False
try:
self._endpoint, self._binding = self.which_endpoint(url)
for_me = True
except TypeError:
pass
if for_me:
done = True
break
else:
try:
_response = self.instance.send(url, "GET")
except Exception as err:
raise FatalError("%s" % err)
self._log_response(_response)
content = _response.text
self.position = url
self.last_content = content
self.response = _response
if _response.status_code >= 400:
done = True
break
if done or url is None:
break
_base = url.split("?")[0]
try:
_spec = self.interaction.pick_interaction(_base, content)
except InteractionNeeded:
self.position = url
logger.error("Page Content: %s", content)
raise
except KeyError:
self.position = url
logger.error("Page Content: %s", content)
self.err_check("interaction-needed")
if _spec == _last_action:
_same_actions += 1
if _same_actions >= 3:
raise InteractionNeeded("Interaction loop detection")
else:
_last_action = _spec
if len(_spec) > 2:
logger.info(">> %s <<", _spec["page-type"])
if _spec["page-type"] == "login":
self.login_page = content
_op = Action(_spec["control"])
try:
_response = _op(self.instance, self, logger, url,
_response, content, self.features)
if isinstance(_response, dict):
self.last_response = _response
self.last_content = _response
return _response
content = _response.text
self.position = url
self.last_content = content
self.response = _response
if _response.status_code >= 400:
break
except (FatalError, InteractionNeeded):
raise
except Exception as err:
self.err_check("exception", err, False)
self.last_response = _response
try:
self.last_content = _response.text
except AttributeError:
self.last_content = None

View File

@ -1,258 +0,0 @@
import inspect
import logging
import re
import sys
from saml2 import BINDING_HTTP_POST, BINDING_HTTP_REDIRECT
from saml2test.check import Check
from saml2test.check import ERROR, INFORMATION, WARNING
from saml2test import check
from saml2test.interaction import Interaction
__author__ = 'rolandh'
logger = logging.getLogger(__name__)
class VerifyAuthnRequest(Check):
""" Basic AuthnRequest verification as provided by pysaml2
"""
cid = "verify-authnrequest"
def _func(self, conv):
try:
conv.saml_request.message.verify()
except ValueError:
self._status = ERROR
return {}
class MatchResult(Check):
cid = "match-result"
def _func(self, conv):
interaction = Interaction(conv.instance, [conv.json_config["result"]])
_int = interaction.pick_interaction(content=conv.last_response.content)
return {}
class ErrorResponse(Check):
cid = "saml-error"
msg = "Expected error message, but test target returned OK"
def _func(self, conv):
try:
assert conv.last_response.status_code >= 400
except AssertionError:
self._message = self.msg
self._status = ERROR
return {}
class VerifyDigestAlgorithm(Check):
"""
verify that the used digest algorithm was one from the approved set.
"""
cid = "verify-digest-algorithm"
def _digest_algo(self, signature, allowed):
_alg = signature.signed_info.reference[0].digest_method.algorithm
try:
assert alg in allowed
except AssertionError:
self._message = "signature digest algorithm not allowed: " + alg
self._status = ERROR
return False
return True
def _func(self, conv):
if "digest_algorithm" not in conv.msg_constraints:
logger.info("Not verifying digest_algorithm (not configured)")
return {}
else:
try:
assert len(conv.msg_constraints["digest_algorithm"]) > 0
except AssertionError:
self._message = "List of allowed digest algorithm must not be empty"
self._status = ERROR
return {}
_algs = conv.msg_constraints["digest_algorithm"]
request = conv.saml_request.message
if request.signature:
if not self._digest_algo(request.signature, _algs):
return {}
elif conv._binding == BINDING_HTTP_REDIRECT:
self._message = "no digest with redirect binding"
self._status = INFORMATION
return {}
elif conv._binding == BINDING_HTTP_POST:
self._message = "cannot verify digest algorithm: request not signed"
self._status = WARNING
return {}
return {}
class VerifyIfRequestIsSigned(Check):
"""
verify that the request has been signed
"""
cid = "verify-if-request-is-signed"
def _func(self, conv):
try:
check_sig = conv.msg_constraints["authnRequest_signature_required"]
except KeyError:
check_sig = False
if check_sig:
if conv._binding == BINDING_HTTP_REDIRECT:
try:
assert conv.http_parameters.signature is not None
except AssertionError:
self._message = "No AuthnRequest simple signature found"
self._status = ERROR
return {}
else:
try:
assert conv.saml_request.message.signature is not None
except AssertionError:
self._message = "No AuthnRequest XML signature found"
self._status = ERROR
return {}
else:
logger.debug("AuthnRequest signature is optional")
return {}
return {}
class VerifySignatureAlgorithm(Check):
"""
verify that the used signature algorithm was one from an approved set.
"""
cid = "verify-signature-algorithm"
def _func(self, conv):
if "signature_algorithm" not in conv.msg_constraints:
logger.info("Not verifying signature_algorithm (not configured)")
return {}
else:
try:
assert len(conv.msg_constraints["signature_algorithm"]) > 0
except AssertionError:
self._message = "List of allowed signature algorithm must " \
"not be empty"
self._status = ERROR
return {}
allowed_algs = [a[1] for a in conv.msg_constraints["signature_algorithm"]]
if conv._binding == BINDING_HTTP_REDIRECT:
if getattr(conv.http_parameters, "signature", None):
_alg = conv.http_parameters.sigalg
try:
assert _alg in allowed_algs
except AssertionError:
self._message = "Algorithm not in white list for " \
"redirect signing: " + _alg
self._status = ERROR
else:
signature = getattr(conv.saml_request.message, "signature", None)
if signature:
try:
assert signature.signed_info.signature_method.algorithm in \
allowed_algs
except AssertionError:
self._message = "Wrong algorithm used for signing: '%s'" % \
signature.signed_info.signature_method.algorithm
self._status = ERROR
else:
self._message = "cannot verify signature algorithm: request not signed"
self._status = WARNING
return {}
return {}
class VerifyEchopageContents(Check):
""" Verify that the last success response (HTTP code 200) from the SP
contains static text and SAML response values
"""
cid = "verify-echopage-contents"
msg = "Cannot match expected contents on SP echo page"
def _func(self, conv):
if conv.last_response.status_code < 300:
try:
pattern = conv.json_config["echopageIdPattern"]
m = re.search(pattern, conv.last_response.content)
try:
assert m is not None
except AssertionError:
self._message = "Cannot match expected static contents " \
"in SP echo page"
self._status = ERROR
for pattern in conv.json_config["echopageContentPattern"]:
m = re.search(pattern, conv.last_response.content)
try:
assert m is not None
except AssertionError:
self._message = 'Cannot match expected response value' \
', pattern="' + pattern + '"'
self._status = ERROR
except KeyError:
self._message = 'Configuration error: missing key ' \
'"echopageIdString" in test target config'
self._status = ERROR
return {}
def call_on_redirect(self):
return False
class SetResponseAndAssertionSignaturesFalse(Check):
""" Prepare config to suppress signatures of both response and assertion"""
cid = "set-response-and-assertion-signature-false"
msg = "Prepare config to suppress signatures of both response and assertion"
def _func(self, conv):
conv.json_config['args']['AuthnResponse']['sign_assertion'] = 'never'
conv.json_config['args']['AuthnResponse']['sign_response'] = 'never'
self._status = INFORMATION
return {}
#class SetInvalidIdpKey(Check):
# """ Prepare config to set IDP signing key to some useless key"""
# cid = "set-idp-key-invalid"
# msg = "Prepare config to set IDP signing key invalid"
#
# def _func(self, conv):
# conv.instance.sec.cert_file = conv.instance.config.invalid_idp_cert_file
# conv.instance.sec.key_file = conv.instance.config.invalid_idp_key_file
# return {}
# =============================================================================
CLASS_CACHE = {}
def factory(cid, classes=CLASS_CACHE):
if len(classes) == 0:
check.factory(cid, classes)
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj):
try:
classes[obj.cid] = obj
except AttributeError:
pass
if cid in classes:
return classes[cid]
else:
return None

View File

@ -1,774 +0,0 @@
import copy
from saml2 import samlp, SamlBase
from saml2 import NAMEID_FORMAT_EMAILADDRESS
from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_HTTP_POST
from saml2.s_utils import rndstr
from saml2.saml import SCM_BEARER, Condition, XSI_TYPE, Audience
from saml2.saml import NAMEID_FORMAT_PERSISTENT
from saml2.saml import SCM_SENDER_VOUCHES
from saml2.saml import ConditionAbstractType_
from saml2.samlp import STATUS_AUTHN_FAILED
from saml2.time_util import in_a_while, a_while_ago
from sp_test import check
from sp_test.check import VerifyAuthnRequest, VerifyDigestAlgorithm
from sp_test.check import VerifySignatureAlgorithm, VerifyIfRequestIsSigned
from sp_test.check import SetResponseAndAssertionSignaturesFalse
from saml2test.check import CheckSpHttpResponseOK, CheckSpHttpResponse500
from saml2test import ip_addresses
__author__ = 'rolandh'
USER = {
"adam": {
"given_name": "Adam",
"sn": "Andersson"
},
"eva": {
"given_name": "Eva",
"sn": "Svensson"
}
}
# Extension class - extra condition
class TimeRestriction(ConditionAbstractType_):
""" """
c_tag = 'TimeRestriction'
c_namespace = "urn:mace:umu.se:sso"
c_children = ConditionAbstractType_.c_children.copy()
c_attributes = ConditionAbstractType_.c_attributes.copy()
c_child_order = ConditionAbstractType_.c_child_order[:]
c_cardinality = ConditionAbstractType_.c_cardinality.copy()
c_attributes['StartTime'] = ('start_time', 'time', False)
c_attributes['EndTime'] = ('end_time', 'time', False)
def __init__(self,
start_time=None,
end_time=None,
text=None,
extension_elements=None,
extension_attributes=None):
ConditionAbstractType_.__init__(
self, text=text, extension_elements=extension_elements,
extension_attributes=extension_attributes)
self.start_time = start_time
self.end_time = end_time
# =============================================================================
class Response(object):
_args = {}
_class = samlp.Response
_sign = False
tests = {"pre": [], "post": []}
def __init__(self, conv):
self.args = self._args.copy()
self.conv = conv
def setup(self):
pass
def pre_processing(self, message, *kwargs):
return message
def post_processing(self, message, *kwargs):
return message
class Request(object):
response = ""
_class = None
tests = {"pre": [],
"mid": [VerifyAuthnRequest],
"post": []}
def __init__(self):
pass
def __call__(self, conv, response):
pass
class Operation(object):
pass
class AuthnResponse(Response):
_response_args = {
"identity": USER["adam"],
"userid": "adam",
}
_binding = BINDING_HTTP_POST
class AuthnResponse_redirect(AuthnResponse):
_binding = BINDING_HTTP_REDIRECT
class ErrorResponse(Response):
_response_args = {
"info": (STATUS_AUTHN_FAILED, "Unknown user")
}
_binding = BINDING_HTTP_POST
class LogoutResponse(Response):
_class = samlp.LogoutRequest
pass
class Login(Operation):
_interaction = ["wayf"]
class AuthnRequest(Request):
_class = samlp.AuthnRequest
class AuthnResponse_NameIDformat_persistent(AuthnResponse):
def pre_processing(self, message, **kwargs):
name_id = message.assertion.subject.name_id
name_id.name_format = NAMEID_FORMAT_PERSISTENT
return message
class AuthnResponse_NameIDformat_email(AuthnResponse):
def pre_processing(self, message, **kwargs):
name_id = message.assertion.subject.name_id
name_id.name_format = NAMEID_FORMAT_EMAILADDRESS
name_id.text = "adam@example.com"
return message
class AuthnResponse_NameIDformat_foo(AuthnResponse):
def pre_processing(self, message, **kwargs):
name_id = message.assertion.subject.name_id
name_id.name_format = "foo"
name_id.text = "fruit basket"
return message
class AuthnResponse_without_SubjectConfirmationData_1(AuthnResponse):
def pre_processing(self, message, **kwargs):
_confirmation = message.assertion.subject.subject_confirmation
_confirmation.subject_confirmation_data = None
_confirmation.method = SCM_SENDER_VOUCHES
return message
class AuthnResponse_without_SubjectConfirmationData_2(AuthnResponse):
def pre_processing(self, message, **kwargs):
_confirmation = message.assertion.subject.subject_confirmation
_confirmation.subject_confirmation_data = None
_confirmation.method = SCM_BEARER
return message
class AuthnResponse_rnd_Response_inresponseto(AuthnResponse):
def pre_processing(self, message, **kwargs):
message.in_response_to = "invalid_rand_" + rndstr(6)
return message
class AuthnResponse_rnd_Response_assertion_inresponseto(AuthnResponse):
def pre_processing(self, message, **kwargs):
message.assertion.in_response_to = "invalid_rand_" + rndstr(6)
return message
class AuthnResponse_Response_no_inresponse(AuthnResponse):
def pre_processing(self, message, **kwargs):
message.in_response_to = None
return message
class AuthnResponse_SubjectConfirmationData_no_inresponse(AuthnResponse):
def pre_processing(self, message, **kwargs):
_confirmation = message.assertion.subject.subject_confirmation
_confirmation[0].subject_confirmation_data.in_response_to = None
return message
class AuthnResponse_wrong_Recipient(AuthnResponse):
def pre_processing(self, message, **kwargs):
_confirmation = message.assertion.subject.subject_confirmation
_confirmation[0].subject_confirmation_data.recipient = rndstr(16)
return message
class AuthnResponse_missing_Recipient(AuthnResponse):
def pre_processing(self, message, **kwargs):
_confirmation = message.assertion.subject.subject_confirmation
_confirmation[0].subject_confirmation_data.recipient = None
return message
class AuthnResponse_missing_Recipient(AuthnResponse):
def pre_processing(self, message, **kwargs):
_confirmation = message.assertion.subject.subject_confirmation
_confirmation[0].subject_confirmation_data.recipient = None
return message
class AuthnResponse_broken_destination(AuthnResponse):
def pre_processing(self, message, **kwargs):
message.destination = "NotAUrl"
return message
class AuthnResponse_correct_recipient_address(AuthnResponse):
def pre_processing(self, message, **kwargs):
_confirmation = message.assertion.subject.subject_confirmation
if "localhost" in self.conv.entity_id:
addr = "127.0.0.1"
else:
addr = ip_addresses()[0]
_confirmation[0].subject_confirmation_data.address = addr
return message
class AuthnResponse_incorrect_recipient_address(AuthnResponse):
def pre_processing(self, message, **kwargs):
_confirmation = message.assertion.subject.subject_confirmation
_confirmation[0].subject_confirmation_data.address = "10.0.0.1"
return message
class AuthnResponse_2_recipients_me_last(AuthnResponse):
def pre_processing(self, message, **kwargs):
_confirmation = message.assertion.subject.subject_confirmation
sc = copy.copy(_confirmation[0])
if "localhost" in self.conv.entity_id:
addr = "127.0.0.1"
else:
addr = ip_addresses()[0]
sc.subject_confirmation_data.address = addr
_confirmation.insert(0, sc)
return message
class AuthnResponse_2_recipients_me_first(AuthnResponse):
def pre_processing(self, message, **kwargs):
_confirmation = message.assertion.subject.subject_confirmation
sc = copy.copy(_confirmation[0])
if "localhost" in self.conv.entity_id:
addr = "127.0.0.1"
else:
addr = ip_addresses()[0]
sc.subject_confirmation_data.address = addr
_confirmation.append(sc)
return message
class AuthnResponse_unknown_condition(AuthnResponse):
def pre_processing(self, message, **kwargs):
conditions = message.assertion.conditions
conditions.condition = [Condition(
extension_elements=[TimeRestriction(start_time="08:00:00",
end_time="17:00:00")],
extension_attributes={XSI_TYPE: "foo:bas"})]
return message
class AuthnResponse_future_NotBefore(AuthnResponse):
def pre_processing(self, message, **kwargs):
conditions = message.assertion.conditions
# Valid starting five hours from now
conditions.not_before = in_a_while(hours=5)
return message
class AuthnResponse_past_NotOnOrAfter(AuthnResponse):
def pre_processing(self, message, **kwargs):
conditions = message.assertion.conditions
# Valid up until five hours ago
conditions.not_on_or_after = a_while_ago(hours=5)
return message
class AuthnResponse_past_SubjectConfirmationData_NotOnOrAfter(AuthnResponse):
def pre_processing(self, message, **kwargs):
_confirmation = message.assertion.subject.subject_confirmation[0]
_confirmation.subject_confirmation_data.not_on_or_after = a_while_ago(
hours=5)
return message
class AuthnResponse_future_SubjectConfirmationData_NotBefore(AuthnResponse):
def pre_processing(self, message, **kwargs):
_confirmation = message.assertion.subject.subject_confirmation[0]
_confirmation.subject_confirmation_data.not_before = in_a_while(
hours=5)
return message
class AuthnResponse_past_AuthnStatement_SessionNotOnOrAfter(AuthnResponse):
def pre_processing(self, message, **kwargs):
_statement = message.assertion.authn_statement[0]
_statement.session_not_on_or_after = a_while_ago(hours=5)
return message
class AuthnResponse_missing_AuthnStatement(AuthnResponse):
def pre_processing(self, message, **kwargs):
message.assertion.authn_statement = []
return message
class AuthnResponse_future_24h_IssueInstant(AuthnResponse):
def pre_processing(self, message, **kwargs):
message.assertion.issue_instant = in_a_while(hours=24)
return message
class AuthnResponse_past_24h_IssueInstant(AuthnResponse):
def pre_processing(self, message, **kwargs):
message.assertion.issue_instant = a_while_ago(hours=24)
return message
class AuthnResponse_datetime_millisecond(AuthnResponse):
def pre_processing(self, message, **kwargs):
message.assertion.issue_instant = in_a_while(milliseconds=123)
return message
class AuthnResponse_AudienceRestriction_no_audience(AuthnResponse):
def pre_processing(self, message, **kwargs):
conditions = message.assertion.conditions
conditions.audience_restriction[0].audience = None
return message
class AuthnResponse_AudienceRestriction_wrong_audience(AuthnResponse):
def pre_processing(self, message, **kwargs):
conditions = message.assertion.conditions
conditions.audience_restriction[0].audience = [
Audience("http://saml.example.com")]
return message
class AuthnResponse_AudienceRestriction_prepended_audience(AuthnResponse):
def pre_processing(self, message, **kwargs):
conditions = message.assertion.conditions
extra = Audience("http://saml.example.com")
conditions.audience_restriction[0].audience.insert(0, extra)
return message
class AuthnResponse_AudienceRestriction_appended_audience(AuthnResponse):
def pre_processing(self, message, **kwargs):
conditions = message.assertion.conditions
extra = Audience("http://saml.example.com")
conditions.audience_restriction[0].audience.append(extra)
return message
PHASES = {
"login_redirect": (Login, AuthnRequest, AuthnResponse_redirect),
}
# Each operation defines 4 flows and 3 sets of tests, in chronological order:
# test "pre": executes before anything is sent to the SP
# flow 0: Start conversation flow
# flow 1: SAML request flow
# test "mid": executes after receiving the SAML request
# flow 2: SAML response flow
# flow 3: check SP response after authentication
# test "post": executes after finals response has been received from SP
OPERATIONS = {
'sp-00': {
"name": 'Basic Login test expect HTTP 200 result',
"descr": 'WebSSO verify authentication request, verify '
'HTTP-Response after sending the SAML response',
"sequence": [(Login, AuthnRequest, AuthnResponse, CheckSpHttpResponseOK)],
"tests": {"pre": [], "mid": [], "post": []}
},
'sp-01': {
"name": 'Login OK & echo page verification test',
"descr": 'Same as SP-00, then check if result page is displayed',
"sequence": [(Login, AuthnRequest, AuthnResponse, check.VerifyEchopageContents)],
"tests": {"pre": [], "mid": [], "post": []}
},
'sp-02': {
"name": 'Require AuthnRequest to be signed',
"descr": 'Same as SP-00, and check if a request signature can be found',
"sequence": [(Login, AuthnRequest, AuthnResponse, None)],
"tests": {"pre": [], "mid": [VerifyIfRequestIsSigned], "post": []}
},
'sp-03': {
"name": 'Reject unsigned reponse/assertion',
"descr": 'Check if SP flags missing signature with HTTP 500',
"sequence": [(Login, AuthnRequest, AuthnResponse, CheckSpHttpResponse500)],
"tests": {"pre": [SetResponseAndAssertionSignaturesFalse], "mid": [], "post": []}
},
'sp-04': { # test-case specific code in sp_test/__init__
"name": 'Reject siganture with invalid IDP key',
"descr": 'IDP-key for otherwise valid signature not in metadata - expect HTTP 500 result',
"sequence": [(Login, AuthnRequest, AuthnResponse, CheckSpHttpResponse500)],
"tests": {"pre": [], "mid": [], "post": []}
},
'sp-05': {
"name": 'Verify digest algorithm',
"descr": 'Trigger WebSSO AuthnRequest and verify that the used '
'digest algorithm was one from the approved set.',
"sequence": [(Login, AuthnRequest, AuthnResponse, None)],
"tests": {"pre": [], "mid": [VerifyDigestAlgorithm], "post": []}
},
'sp-06': {
"name": 'Verify signature algorithm',
"descr": 'Trigger WebSSO AuthnRequest and verify that the used '
'signature algorithm was one from the approved set.',
"sequence": [(Login, AuthnRequest, AuthnResponse, None)],
"tests": {"pre": [], "mid": [VerifySignatureAlgorithm], "post": []}
},
'sp-08': {
"name": "SP should accept a Response without a "
"SubjectConfirmationData element. If confirmation method"
"is SCM_SENDER_VOUCHES",
"sequence": [(Login, AuthnRequest,
AuthnResponse_without_SubjectConfirmationData_2,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL02': {
"name": 'Verify various aspects of the generated AuthnRequest message',
"descr": 'Basic Login test',
"sequence": [],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL03': {
"name": "SP should not accept a Response as valid, when the StatusCode"
" is not success",
"sequence": [(Login, AuthnRequest, ErrorResponse, check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL04': {
"name": "SP should accept a NameID with Format: persistent",
"sequence": [(Login, AuthnRequest,
AuthnResponse_NameIDformat_persistent, None)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL05': {
"name": "SP should accept a NameID with Format: e-mail",
"sequence": [(Login, AuthnRequest, AuthnResponse_NameIDformat_email,
None)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL06': {
"name": "Do SP work with unknown NameID Format, such as : foo",
"sequence": [(Login, AuthnRequest, AuthnResponse_NameIDformat_foo,
None)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL07': {
"name": "SP should accept a Response without a "
"SubjectConfirmationData element. If confirmation method "
"is SCM_SENDER_VOUCHES",
"sequence": [(Login, AuthnRequest,
AuthnResponse_without_SubjectConfirmationData_1, None)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL09': {
"name": "SP should not accept a response InResponseTo "
"which is chosen randomly",
"sequence": [(Login, AuthnRequest,
AuthnResponse_rnd_Response_inresponseto,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL10': {
"name": "SP should not accept an assertion InResponseTo "
"which is chosen randomly",
"sequence": [(Login, AuthnRequest,
AuthnResponse_rnd_Response_assertion_inresponseto,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL11': {
"name": "Does the SP allow the InResponseTo attribute to be missing"
"from the Response element?",
"sequence": [(Login, AuthnRequest,
AuthnResponse_Response_no_inresponse,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL12': {
"name": "Does the SP allow the InResponseTo attribute to be missing"
"from the SubjectConfirmationData element?"
"(Test is questionable - review)", # TODO
"sequence": [(Login, AuthnRequest,
AuthnResponse_SubjectConfirmationData_no_inresponse,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL13': {
"name": "SP should not accept a broken DestinationURL attribute",
"sequence": [(Login, AuthnRequest,
AuthnResponse_broken_destination,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
# New untested
'FL14a': {
"name": "SP should not accept wrong Recipient attribute",
"sequence": [(Login, AuthnRequest,
AuthnResponse_broken_destination,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL14b': {
"name": "SP should not accept missing Recipient attribute",
"sequence": [(Login, AuthnRequest,
AuthnResponse_missing_Recipient,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL20': {
"name": "Accept a Response with a SubjectConfirmationData elements "
"with a correct @Address attribute",
"sequence": [(Login, AuthnRequest,
AuthnResponse_correct_recipient_address,
None)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL21': {
"name": "Accept a Response with a SubjectConfirmationData elements "
"with a incorrect @Address attribute",
"sequence": [(Login, AuthnRequest,
AuthnResponse_incorrect_recipient_address,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL22': {
"name": "Accept a Response with two SubjectConfirmationData elements"
"representing two recipients (test 1 of 2, correct one last)",
"sequence": [(Login, AuthnRequest,
AuthnResponse_2_recipients_me_last,
None)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL23': {
"name": "Accept a Response with two SubjectConfirmationData elements"
"representing two recipients (test 1 of 2, correct one last)",
"sequence": [(Login, AuthnRequest,
AuthnResponse_2_recipients_me_first,
None)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL26': {
"name": "Reject an assertion containing an unknown Condition.",
"sequence": [(Login, AuthnRequest,
AuthnResponse_unknown_condition,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL27': {
"name": "Reject a Response with a Condition with a NotBefore in the "
"future.",
"sequence": [(Login, AuthnRequest,
AuthnResponse_future_NotBefore,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL28': {
"name": "Reject a Response with a Condition with a NotOnOrAfter in "
"the past.",
"sequence": [(Login, AuthnRequest,
AuthnResponse_future_NotBefore,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL29': {
"name": "Reject a Response with a SubjectConfirmationData@NotOnOrAfter "
"in the past",
"sequence": [(Login, AuthnRequest,
AuthnResponse_past_SubjectConfirmationData_NotOnOrAfter,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL24': {
"name": "Reject a Response with a SubjectConfirmationData@NotBefore "
"in the future",
"sequence": [(Login, AuthnRequest,
AuthnResponse_future_SubjectConfirmationData_NotBefore,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL30': {
"name": "Reject a Response with an AuthnStatement where "
"SessionNotOnOrAfter is set in the past.",
"sequence": [(Login, AuthnRequest,
AuthnResponse_past_AuthnStatement_SessionNotOnOrAfter,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL31': {
"name": "Reject a Response with an AuthnStatement missing",
"sequence": [(Login, AuthnRequest,
AuthnResponse_missing_AuthnStatement,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL32': {
"name": "Reject an IssueInstant far (24 hours) into the future",
"sequence": [(Login, AuthnRequest,
AuthnResponse_future_24h_IssueInstant,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL33': {
"name": "Reject an IssueInstant far (24 hours) into the past",
"sequence": [(Login, AuthnRequest,
AuthnResponse_past_24h_IssueInstant,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL34': {
"name": "Accept xs:datetime with millisecond precision "
"http://www.w3.org/TR/xmlschema-2/#dateTime",
"sequence": [(Login, AuthnRequest,
AuthnResponse_datetime_millisecond,
None)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL36': {
"name": "Reject a Response with a Condition with a empty set of "
"Audience.",
"sequence": [(Login, AuthnRequest,
AuthnResponse_AudienceRestriction_no_audience,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL37': {
"name": "Reject a Response with a Condition with a wrong Audience.",
"sequence": [(Login, AuthnRequest,
AuthnResponse_AudienceRestriction_wrong_audience,
check.ErrorResponse)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL38': {
"name": "Accept a Response with a Condition with an additional "
"Audience prepended",
"sequence": [(Login, AuthnRequest,
AuthnResponse_AudienceRestriction_prepended_audience,
None)],
"tests": {"pre": [], "mid": [], "post": []}
},
'FL39': {
"name": "Accept a Response with a Condition with an additional "
"Audience appended",
"sequence": [(Login, AuthnRequest,
AuthnResponse_AudienceRestriction_appended_audience,
None)],
"tests": {"pre": [], "mid": [], "post": []}
},
}
#
# SP should not accept a broken Recipient attribute in assertion
# SubjectConfirmationData/@Recipient
# SP should not accept a broken DestinationURL attribute in response
# SP should accept a Response with two SubjectConfirmationData elements
# representing two recipients (test 1 of 2, correct one last)
# SP should accept a Response with two SubjectConfirmationData elements
# representing two recipients (test 1 of 2, correct one first)
# SP should accept a Response with two SubjectConfirmation elements
# representing two recipients (test 1 of 2, correct one last)
# SP should accept a Response with two SubjectConfirmation elements
# representing two recipients (test 1 of 2, correct one first)
# SP should accept a Response with a SubjectConfirmationData elements with a
# correct @Address attribute
# SP should nnot accept a Response with a SubjectConfirmationData elements
# with a incorrect @Address attribute
# SP should accept a Response with multiple SubjectConfirmation elements
# with /SubjectConfirmationData/@Address-es, where one is correct (test 1 of
# 2, correct o last)
# SP should accept a Response with multiple SubjectConfirmationData elements
# with /SubjectConfirmationData/@Address-es, where one is correct (test 1 of
# 2, corr one last)
# SP should accept a Response with multiple SubjectConfirmationData elements
# with /SubjectConfirmationData/@Address-es, where one is correct (test 1 of
# 2, corr one first)
# SP Should not accept an assertion containing an uknown Condition
# SP should not accept a Response with a Condition with a NotBefore in the
# future.
# SP should not accept a Response with a Condition with a NotOnOrAfter in
# the past.
# SP should not accept a Response with a
# SubjectConfirmationData@NotOnOrAfter in the past
# SP should not accept a Response with a AuthnStatement where
# SessionNotOnOrAfter is set in the past
# SP should not accept a Response with a AuthnStatement missing
# SP should not accept an IssueInstant far (24 hours) into the future
# SP should not accept an IssueInstant far (24 hours) into the past
# SP should accept xs:datetime with millisecond precision http://www.w3
# .org/TR/xmlschema-2/#dateTime
# SP should accept xs:datetime with microsecond precision http://www.w3
# .org/TR/xmlschema-2/#dateTime
# SP should not accept a Response with a Condition with a empty set of
# Audience.
# SP should not accept a Response with a Condition with a wrong Audience.
# SP should accept a Response with a Condition with an addition Audience
# prepended.
# SP should accept a Response with a Condition with an addition Audience
# appended.
# SP should not accept multiple AudienceRestrictions where the intersection
# is zero. (test 1 of 2)
# SP should not accept multiple AudienceRestrictions where the intersection
# is zero. (test 2 of 2)
# SP should accept multiple AudienceRestrictions where the intersection
# includes the correct audience.
# SP should accept that only the Assertion is signed instead of the Response.
# SP should accept that both the Response and the Assertion is signed.
# Do SP work when RelayState information is lost?
# Do SP accept an unknown Extensions element in the Response?
# SP MUST not accept response when the saml-namespace is invalid
# SP MUST NOT re-use the same ID in subsequent requests.
# SP MUST NOT accept a replayed Response. An identical Response/Assertion
# used a second time. [Profiles]: 4.1.4.5 POST-Specific Processing Rules (
# test 1 of 2: s inresponseto)
# SP MUST NOT accept a replayed Response. An identical Response/Assertion
# used a second time. [Profiles]: 4.1.4.5 POST-Specific Processing Rules (
# test 2 of 2: unsolicited response)
# SP SHOULD find attributes in a second AttributeStatement, not only in the
# first.
# SP SHOULD NOT accept an signed assertion embedded in an AttributeValue
# inside an unsigned assertion.
# SP SHOULD NOT accept an signed assertion embedded in an AttributeValue
# inside an unsigned assertion. (Signature moved out...)
# SP SHOULD NOT accept an signed assertion, where the signature is referring
# to another assertion.
# SP SHOULD find attributes in a second Assertion/AttributeStatement,
# not only in one of them (test 1 of 2 - attributes in first).
# SP SHOULD find attributes in a second Assertion/AttributeStatement,
# not only in one of them (test 2 of 2 - attributes in last).
# SP SHOULD NOT accept attributes in unsigned 2nd assertion. (test 1 of 2)
# SP SHOULD NOT accept attributes in unsigned 2nd assertion. (test 2 of 2)
# SP SHOULD NOT accept authnstatement in unsigned 2nd assertion. (test 1 of 2)
# SP SHOULD NOT accept authnstatement in unsigned 2nd assertion. (test 2 of 2)
# Basic SP-initated Logout Test
# Basic IdP-initated Logout Test
# SP MUST NOT accept LogoutRequest when NameID content is wrong
# SP MUST NOT accept LogoutRequest when NameID@Format is wrong
# SP MUST NOT accept LogoutRequest when NameID@SPNameQualifier is wrong
# SP MUST NOT logout user when invalid SessionIndex is sent
# SP MUST NOT accept LogoutRequest when Issuer is wrong
# SP MUST NOT accept LogoutRequest when Destination is wrong
# SP MUST NOT accept unsigned LogoutRequest
# SP MUST accept LogoutRequest with sessionindex in a separate session,
# ot relying on the session-cookie.
# SP MUST accept an LogoutRequest with no sessionindex (sent in separate
# session, no session-cookies)
# SP MUST accept an LogoutRequest with two sesionindexes (first valid) (sent
# in separate session, no session-cookies)
# SP MUST accept an LogoutRequest with two sesionindexes (second valid) (
# sent in separate session, no session-cookies)
# Session fixtation check