commit 96abdd254b9b7d240ee51908a7454cc4c4bb0e92 Author: Roland Hedberg Date: Fri Nov 16 08:55:49 2012 +0100 Still prelim work diff --git a/script/__init__.py b/script/__init__.py new file mode 100644 index 0000000..3b031d2 --- /dev/null +++ b/script/__init__.py @@ -0,0 +1 @@ +__author__ = 'rolandh' diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..6cab7b3 --- /dev/null +++ b/setup.py @@ -0,0 +1,40 @@ +#!/usr/bin/python +# +# Copyright (C) 2013 Umea Universitet, Sweden +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from setuptools import setup + +__author__ = 'rohe0002' + +setup( + name="oic", + version="0.3.0", + description="SAML2 test tool", + author = "Roland Hedberg", + author_email = "roland.hedberg@adm.umu.se", + license="Apache 2.0", + packages=["idp_test"], + package_dir = {"": "src"}, + classifiers = ["Development Status :: 4 - Beta", + "License :: OSI Approved :: Apache Software License", + "Topic :: Software Development :: Libraries :: Python Modules"], + install_requires = ["pysaml2", + "mechanize", + "argparse", + "beautifulsoup4"], + + zip_safe=False, + ) \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..3b031d2 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1 @@ +__author__ = 'rolandh' diff --git a/src/idp_test/__init__.py b/src/idp_test/__init__.py new file mode 100644 index 0000000..83efde0 --- /dev/null +++ b/src/idp_test/__init__.py @@ -0,0 +1,289 @@ +import json +import argparse +import sys + +__author__ = 'rolandh' + +import traceback + +def exception_trace(tag, exc, log=None): + message = traceback.format_exception(*sys.exc_info()) + if log: + log.error("[%s] ExcList: %s" % (tag, "".join(message),)) + log.error("[%s] Exception: %s" % (tag, exc)) + else: + print >> sys.stderr, "[%s] ExcList: %s" % (tag, "".join(message),) + print >> sys.stderr, "[%s] Exception: %s" % (tag, exc) + +class SAML2(object): + client_args = ["client_id", "redirect_uris", "password"] + + def __init__(self, operations_mod, client_class, msgfactory): + self.operations_mod = operations_mod + self.client_class = client_class + self.client = None + #self.trace = Trace() + self.msgfactory = msgfactory + + self._parser = argparse.ArgumentParser() + self._parser.add_argument('-d', dest='debug', action='store_true', + help="Print debug information") + self._parser.add_argument('-v', dest='verbose', action='store_true', + help="Print runtime information") + self._parser.add_argument('-C', dest="ca_certs", + help="CA certs to use to verify HTTPS server certificates, if HTTPS is used and no server CA certs are defined then no cert verification is done") + self._parser.add_argument('-J', dest="json_config_file", + help="Script configuration") + self._parser.add_argument("-l", dest="list", action="store_true", + help="List all the test flows as a JSON object") + self._parser.add_argument("-H", dest="host", default="example.com", + help="Which host the script is running on, used to construct the key export URL") + self._parser.add_argument("flow", nargs="?", help="Which test flow to run") + + self.args = None + self.pinfo = None + self.sequences = [] + self.function_args = {} + self.signing_key = None + self.encryption_key = None + self.test_log = [] + self.environ = {} + self._pop = None + + def parse_args(self): + self.json_config= self.json_config_file() + + try: + self.features = self.json_config["features"] + except KeyError: + self.features = {} + + self.pinfo = self.provider_info() + self.client_conf(self.client_args) + + def json_config_file(self): + if self.args.json_config_file == "-": + return json.loads(sys.stdin.read()) + else: + return json.loads(open(self.args.json_config_file).read()) + + def test_summation(self, id): + status = 0 + for item in self.test_log: + if item["status"] > status: + status = item["status"] + + if status == 0: + status = 1 + + sum = { + "id": id, + "status": status, + "tests": self.test_log + } + + if status == 5: + sum["url"] = self.test_log[-1]["url"] + sum["htmlbody"] = self.test_log[-1]["message"] + + return sum + + def run(self): + self.args = self._parser.parse_args() + + if self.args.list: + return self.operations() + else: + if not self.args.flow: + raise Exception("Missing flow specification") + self.args.flow = self.args.flow.strip("'") + self.args.flow = self.args.flow.strip('"') + + flow_spec = self.operations_mod.FLOWS[self.args.flow] + try: + block = flow_spec["block"] + except KeyError: + block = {} + + self.parse_args() + _seq = self.make_sequence() + interact = self.get_interactions() + + try: + self.do_features(interact, _seq, block) + except Exception,exc: + exception_trace("do_features", exc) + _output = {"status": 4, + "tests": [{"status": 4, + "message":"Couldn't run testflow: %s" % exc, + "id": "verify_features", + "name": "Make sure you don't do things you shouldn't"}]} + #print >> sys.stdout, json.dumps(_output) + return + + tests = self.get_test() + self.client.state = "STATE0" + + self.environ.update({"provider_info": self.pinfo, + "client": self.client}) + + try: + except_exception = flow_spec["except_exception"] + except KeyError: + except_exception = False + + try: + if self.args.verbose: + print >> sys.stderr, "Set up done, running sequence" + testres, trace = run_sequence(self.client, _seq, self.trace, + interact, self.msgfactory, + self.environ, tests, + self.json_config["features"], + self.args.verbose, self.cconf, + except_exception) + self.test_log.extend(testres) + sum = self.test_summation(self.args.flow) + print >>sys.stdout, json.dumps(sum) + if sum["status"] > 1 or self.args.debug: + print >>sys.stderr, trace + except Exception, err: + #print >> sys.stderr, self.trace + print err + exception_trace("RUN", err) + + #if self._pop is not None: + # self._pop.terminate() + if "keyprovider" in self.environ and self.environ["keyprovider"]: + # os.kill(self.environ["keyprovider"].pid, signal.SIGTERM) + self.environ["keyprovider"].terminate() + + def operations(self): + lista = [] + for key,val in self.operations_mod.FLOWS.items(): + item = {"id": key, + "name": val["name"],} + try: + _desc = val["descr"] + if isinstance(_desc, basestring): + item["descr"] = _desc + else: + item["descr"] = "\n".join(_desc) + except KeyError: + pass + + for key in ["depends", "endpoints"]: + try: + item[key] = val[key] + except KeyError: + pass + + lista.append(item) + + print json.dumps(lista) + + def provider_info(self): + # Should provide a Metadata class + res = {} + _jc = self.json_config["provider"] + + # Backward compatible + if "endpoints" in _jc: + try: + for endp, url in _jc["endpoints"].items(): + res[endp] = url + except KeyError: + pass + + for key in ProviderConfigurationResponse.c_param.keys(): + try: + res[key] = _jc[key] + except KeyError: + pass + + return res + + def do_features(self, *args): + pass + + def export(self): + pass + + def client_conf(self, cprop): + if self.args.ca_certs: + self.client = self.client_class(ca_certs=self.args.ca_certs) + else: + try: + self.client = self.client_class( + ca_certs=self.json_config["ca_certs"]) + except (KeyError, TypeError): + self.client = self.client_class() + + #self.client.http_request = self.client.http.crequest + + # set the endpoints in the Client from the provider information + # If they are statically configured, if dynamic it happens elsewhere + for key, val in self.pinfo.items(): + if key.endswith("_endpoint"): + setattr(self.client, key, val) + + # Client configuration + self.cconf = self.json_config["client"] + # replace pattern with real value + _h = self.args.host + self.cconf["redirect_uris"] = [p % _h for p in self.cconf["redirect_uris"]] + + try: + self.client.client_prefs = self.cconf["preferences"] + except KeyError: + pass + + # set necessary information in the Client + for prop in cprop: + try: + setattr(self.client, prop, self.cconf[prop]) + except KeyError: + pass + + def make_sequence(self): + # Whatever is specified on the command line takes precedences + if self.args.flow: + sequence = flow2sequence(self.operations_mod, self.args.flow) + elif self.json_config and "flow" in self.json_config: + sequence = flow2sequence(self.operations_mod, + self.json_config["flow"]) + else: + sequence = None + + return sequence + + def get_interactions(self): + interactions = [] + + if self.json_config: + try: + interactions = self.json_config["interaction"] + except KeyError: + pass + + if self.args.interactions: + _int = self.args.interactions.replace("\'", '"') + if interactions: + interactions.update(json.loads(_int)) + else: + interactions = json.loads(_int) + + return interactions + + def get_test(self): + if self.args.flow: + flow = self.operations_mod.FLOWS[self.args.flow] + elif self.json_config and "flow" in self.json_config: + flow = self.operations_mod.FLOWS[self.json_config["flow"]] + else: + flow = None + + try: + return flow["tests"] + except KeyError: + return [] + diff --git a/src/idp_test/base.py b/src/idp_test/base.py new file mode 100644 index 0000000..0487cab --- /dev/null +++ b/src/idp_test/base.py @@ -0,0 +1,392 @@ +#!/usr/bin/env python +from check import ExpectedError +from check import factory + +__author__ = 'rohe0002' + +import time +import cookielib + +from bs4 import BeautifulSoup + +class FatalError(Exception): + pass + +class Trace(object): + def __init__(self): + self.trace = [] + self.start = time.time() + + def request(self, msg): + delta = time.time() - self.start + self.trace.append("%f --> %s" % (delta, msg)) + + def reply(self, msg): + delta = time.time() - self.start + self.trace.append("%f <-- %s" % (delta, msg)) + + def info(self, msg): + delta = time.time() - self.start + self.trace.append("%f %s" % (delta, msg)) + + def error(self, msg): + delta = time.time() - self.start + self.trace.append("%f [ERROR] %s" % (delta, msg)) + + def warning(self, msg): + delta = time.time() - self.start + self.trace.append("%f [WARNING] %s" % (delta, msg)) + + def __str__(self): + return "\n". join([t.encode("utf-8") for t in self.trace]) + + def clear(self): + self.trace = [] + + def __getitem__(self, item): + return self.trace[item] + + def next(self): + for line in self.trace: + yield line + +def flow2sequence(operations, item): + flow = operations.FLOWS[item] + return [operations.PHASES[phase] for phase in flow["sequence"]] + +def endpoint(client, base): + for _endp in client._endpoints: + if getattr(client, _endp) == base: + return True + + return False + +def check_severity(stat): + if stat["status"] >= 4: + raise FatalError + + +def pick_interaction(interactions, _base="", content="", req=None): + unic = content + if content: + _bs = BeautifulSoup(content) + else: + _bs = None + + for interaction in interactions: + _match = 0 + for attr, val in interaction["matches"].items(): + if attr == "url": + if val == _base: + _match += 1 + elif attr == "title": + if _bs is None: + break + if _bs.title is None: + break + if val in _bs.title.contents: + _match += 1 + elif attr == "content": + if unic and val in unic: + _match += 1 + elif attr == "class": + if req and val == req: + _match += 1 + + if _match == len(interaction["matches"]): + return interaction + + raise KeyError("No interaction matched") + +ORDER = ["url", "response", "content"] + +def run_sequence(client, sequence, trace, interaction, msgfactory, + environ=None, tests=None, features=None, verbose=False, + cconf=None, except_exception=None): + item = [] + response = None + content = None + url = "" + test_output = [] + _keystore = client.keystore + features = features or {} + + cjar = {"browser": cookielib.CookieJar(), + "rp": cookielib.CookieJar(), + "service": cookielib.CookieJar()} + + environ["sequence"] = sequence + environ["cis"] = [] + environ["trace"] = trace + environ["responses"] = [] + + try: + for creq in sequence: + req = creq() + cfunc = getattr(client, "create_%s" % req.request) + if trace: + trace.info(70*"=") + + try: + _pretests = req.tests["pre"] + for test in _pretests: + chk = test() + stat = chk(environ, test_output) + check_severity(stat) + except KeyError: + pass + + try: + response = cfunc(**req.args) + + try: + for test in req.tests["post"]: + if isinstance(test, tuple): + test, kwargs = test + else: + kwargs = {} + chk = test(**kwargs) + stat = chk(environ, test_output) + check_severity(stat) + if isinstance(chk, ExpectedError): + item.append(stat["temp"]) + del stat["temp"] + url = None + break + except KeyError: + pass + + except FatalError: + raise + except Exception, err: + environ["exception"] = err + chk = factory("exception")() + chk(environ, test_output) + raise FatalError() + + if not response: + continue + + if response.status_code >= 400: + done = True + elif url: + done = False + else: + done = True + + while not done: + while response.status_code in [302, 301, 303]: + url = response.headers["location"] + + trace.reply("REDIRECT TO: %s" % url) + # If back to me + for_me = False + for redirect_uri in client.redirect_uris: + if url.startswith(redirect_uri): + # Back at the RP + environ["client"].cookiejar = cjar["rp"] + for_me=True + + if for_me: + done = True + break + else: + try: + part = do_request(client, url, "GET", trace=trace) + except Exception, err: + raise FatalError("%s" % err) + environ.update(dict(zip(ORDER, part))) + (url, response, content) = part + + check = factory("check-http-response")() + stat = check(environ, test_output) + check_severity(stat) + + if done: + break + + _base = url.split("?")[0] + + try: + _spec = pick_interaction(interaction, _base, content) + except KeyError: + if creq.method == "POST": + break + elif not req.request in ["AuthorizationRequest", + "OpenIDRequest"]: + break + else: + try: + _check = getattr(req, "interaction_check") + except AttributeError: + _check = None + + if _check: + chk = factory("interaction-check")() + chk(environ, test_output) + raise FatalError() + else: + chk = factory("interaction-needed")() + chk(environ, test_output) + raise FatalError() + + if len(_spec) > 2: + trace.info(">> %s <<" % _spec["page-type"]) + if _spec["page-type"] == "login": + environ["login"] = content + + _op = Operation(_spec["control"]) + + try: + part = _op(environ, trace, url, response, content, features) + environ.update(dict(zip(ORDER, part))) + (url, response, content) = part + + check = factory("check-http-response")() + stat = check(environ, test_output) + check_severity(stat) + except FatalError: + raise + except Exception, err: + environ["exception"] = err + chk = factory("exception")() + chk(environ, test_output) + raise FatalError + + # if done: + # break + + info = None + qresp = None + resp_type = resp.type + if response: + try: + ctype = response.headers["content-type"] + if ctype == "application/jwt": + resp_type = "jwt" + except (AttributeError, TypeError): + pass + + if response.status_code >= 400: + pass + elif not url: + if isinstance(content, Message): + qresp = content + elif response.status_code == 200: + info = content + elif resp.where == "url" or response.status_code == 302: + try: + info = response.headers["location"] + resp_type = "urlencoded" + except KeyError: + try: + _check = getattr(req, "interaction_check", None) + except AttributeError: + _check = None + + if _check: + chk = factory("interaction-check")() + chk(environ, test_output) + raise FatalError() + else: + chk = factory("missing-redirect")() + stat = chk(environ, test_output) + check_severity(stat) + else: + check = factory("check_content_type_header")() + stat = check(environ, test_output) + check_severity(stat) + info = content + + if info and resp.response: + if isinstance(resp.response, basestring): + response = msgfactory(resp.response) + else: + response = resp.response + + chk = factory("response-parse")() + environ["response_type"] = response.__name__ + environ["responses"].append((response, info)) + try: + qresp = client.parse_response(response, info, resp_type, + client.state, + keystore=_keystore, + client_id=client.client_id, + scope="openid") + if trace and qresp: + trace.info("[%s]: %s" % (qresp.type(), + qresp.to_dict())) + item.append(qresp) + environ["response_message"] = qresp + err = None + except Exception, err: + environ["exception"] = "%s" % err + qresp = None + if err and except_exception: + if isinstance(err, except_exception): + trace.info("Got expected exception: %s [%s]" % (err, + err.__class__.__name__)) + else: + raise + else: + stat = chk(environ, test_output) + check_severity(stat) + + if qresp: + try: + for test in resp.tests["post"]: + if isinstance(test, tuple): + test, kwargs = test + else: + kwargs = {} + chk = test(**kwargs) + stat = chk(environ, test_output) + check_severity(stat) + except KeyError: + pass + + resp(environ, qresp) + + if tests is not None: + environ["item"] = item + for test, args in tests: + if isinstance(test, basestring): + chk = factory(test)(**args) + else: + chk = test(**args) + try: + check_severity(chk(environ, test_output)) + except Exception, err: + raise FatalError("%s" % err) + + except FatalError: + pass + except Exception, err: + environ["exception"] = err + chk = factory("exception")() + chk(environ, test_output) + + return test_output, "%s" % trace + + +def run_sequences(client, sequences, trace, interaction, + verbose=False): + for sequence, endpoints, fid in sequences: + # clear cookie cache + client.grant.clear() + try: + client.http.cookiejar.clear() + except AttributeError: + pass + + err = run_sequence(client, sequence, trace, interaction, verbose) + + if err: + print "%s - FAIL" % fid + print + if not verbose: + print trace + else: + print "%s - OK" % fid + + trace.clear() diff --git a/src/idp_test/check.py b/src/idp_test/check.py new file mode 100644 index 0000000..e8678ef --- /dev/null +++ b/src/idp_test/check.py @@ -0,0 +1,118 @@ +import inspect +import sys + +__author__ = 'rolandh' + +INFORMATION = 0 +OK = 1 +WARNING = 2 +ERROR = 3 +CRITICAL = 4 +INTERACTION = 5 + +STATUSCODE = ["INFORMATION", "OK", "WARNING", "ERROR", "CRITICAL", + "INTERACTION"] + +class Check(): + """ General test + """ + id = "check" + msg = "OK" + + def __init__(self, **kwargs): + self._status = OK + self._message = "" + self.content = None + self.url = "" + self._kwargs = kwargs + + def _func(self, environ): + return {} + + def __call__(self, environ=None, output=None): + _stat = self.response(**self._func(environ)) + output.append(_stat) + return _stat + + def response(self, **kwargs): + try: + name = " ".join([s.strip() for s in self.__doc__.strip().split("\n")]) + except AttributeError: + name = "" + + res = { + "id": self.id, + "status": self._status, + "name": name + } + + if self._message: + res["message"] = self._message + + if kwargs: + res.update(kwargs) + + return res + +class ExpectedError(Check): + pass + +class CriticalError(Check): + status = CRITICAL + +class Error(Check): + status = ERROR + +class Other(CriticalError): + """ Other error """ + msg = "Other error" + +class CheckHTTPResponse(CriticalError): + """ + Checks that the HTTP response status is within the 200 or 300 range + """ + id = "check-http-response" + msg = "IdP error" + + def _func(self, environ): + _response = environ["response"] + _content = environ["content"] + + res = {} + if _response.status_code >= 400 : + self._status = self.status + self._message = self.msg +# if CONT_JSON in _response.headers["content-type"]: +# try: +# err = ErrorResponse().deserialize(_content, "json") +# self._message = err.to_json() +# except Exception: +# res["content"] = _content +# else: +# res["content"] = _content + res["url"] = environ["url"] + res["http_status"] = _response.status_code + else: + # might still be an error message + try: +# err = ErrorResponse().deserialize(_content, "json") +# err.verify() +# self._message = err.to_json() + self._status = self.status + except Exception: + pass + + res["url"] = environ["url"] + + return res + +def factory(id): + for name, obj in inspect.getmembers(sys.modules[__name__]): + if inspect.isclass(obj): + try: + if obj.id == id: + return obj + except AttributeError: + pass + + return None diff --git a/src/idp_test/operations.py b/src/idp_test/operations.py new file mode 100644 index 0000000..6cbf975 --- /dev/null +++ b/src/idp_test/operations.py @@ -0,0 +1,95 @@ +from check import CheckHTTPResponse + +__author__ = 'rolandh' + +class Request(): + request = "" + method = "" + lax = False + _request_args= {} + kw_args = {} + tests = {"post": [CheckHTTPResponse], "pre":[]} + + def __init__(self, cconf=None): + self.cconf = cconf + self.request_args = self._request_args.copy() + + #noinspection PyUnusedLocal + def __call__(self, environ, trace, location, response, content, features): + _client = environ["client"] + try: + kwargs = self.kw_args.copy() + except KeyError: + kwargs = {} + + func = getattr(_client, "do_%s" % self.request) + + ht_add = None + + if "authn_method" in kwargs: + h_arg = _client.init_authentication_method(cis, **kwargs) + else: + h_arg = None + + url, body, ht_args, cis = _client.uri_and_body(request, cis, + method=self.method, + request_args=_req) + + environ["cis"].append(cis) + if h_arg: + ht_args.update(h_arg) + if ht_add: + ht_args.update({"headers": ht_add}) + + if trace: + try: + oro = unpack(cis["request"])[1] + trace.request("OpenID Request Object: %s" % oro) + except KeyError: + pass + trace.request("URL: %s" % url) + trace.request("BODY: %s" % body) + try: + trace.request("HEADERS: %s" % ht_args["headers"]) + except KeyError: + pass + + response = _client.http_request(url, method=self.method, data=body, + **ht_args) + + if trace: + trace.reply("RESPONSE: %s" % response) + trace.reply("CONTENT: %s" % response.text) + if response.status_code in [301, 302]: + trace.reply("LOCATION: %s" % response.headers["location"]) + trace.reply("COOKIES: %s" % response.cookies) + # try: + # trace.reply("HeaderCookies: %s" % response.headers["set-cookie"]) + # except KeyError: + # pass + + return url, response, response.text + + def update(self, dic): + _tmp = {"request": self.request_args.copy(), "kw": self.kw_args} + for key, val in self.rec_update(_tmp, dic).items(): + setattr(self, "%s_args" % key, val) + + def rec_update(self, dic0, dic1): + res = {} + for key, val in dic0.items(): + if key not in dic1: + res[key] = val + else: + if isinstance(val, dict): + res[key] = self.rec_update(val, dic1[key]) + else: + res[key] = dic1[key] + + for key, val in dic1.items(): + if key in dic0: + continue + else: + res[key] = val + + return res diff --git a/src/idp_test/tmp.py b/src/idp_test/tmp.py new file mode 100644 index 0000000..bec82e8 --- /dev/null +++ b/src/idp_test/tmp.py @@ -0,0 +1,226 @@ +from oic.oauth2 import rndstr +from saml2 import config, NAMEID_FORMAT_EMAILADDRESS +from saml2 import samlp +from saml2 import BINDING_HTTP_POST +from saml2 import VERSION + +from saml2.client import Saml2Client +from saml2.time_util import instant + +__author__ = 'rolandh' + +try: + from xmlsec_location import xmlsec_path +except ImportError: + xmlsec_path = '/opt/local/bin/xmlsec1' + +cnf_dict = { + "entityid" : "urn:mace:example.com:saml:roland:sp", + "name" : "urn:mace:example.com:saml:roland:sp", + "description": "Test SP", + "service": { + "sp": { + "endpoints":{ + "assertion_consumer_service": ["http://lingon.catalogix.se:8087/"], + }, + "required_attributes": ["surName", "givenName", "mail"], + "optional_attributes": ["title"], + "idp": ["urn:mace:example.com:saml:roland:idp"], + } + }, + "key_file" : "test.key", + "cert_file" : "test.pem", + "ca_certs": "cacerts.txt", + "xmlsec_binary" : xmlsec_path, + "metadata": { + "local": ["idp.xml"], + }, + "subject_data": "subject_data.db", + "accepted_time_diff": 60, + "attribute_map_dir" : "attributemaps", +} + + +conf = config.SPConfig() +conf.load(cnf_dict) +client = Saml2Client(conf) + +binding= BINDING_HTTP_POST +query_id = rndstr() +service_url = "https://example.com" + +authn_request = { + #===== AuthRequest ===== + "subject":{ + "base_id":{ + "name_qualifier":None, + "sp_name_qualifier":None, + "text":None, + "extension_elements":None, + "extension_attributes":None, + }, + "name_id":{ + "name_qualifier":None, + "sp_name_qualifier":None, + "format":None, + "sp_provided_id": None, + "text":None, + "extension_elements":None, + "extension_attributes":None, + }, + "encrypted_id":{ + "encrypted_data":None, + "encrypted_key":None, + "text":None, + "extension_elements":None, + "extension_attributes":None, + }, + "subject_confirmation":[{ + "base_id":{ + "name_qualifier":None, + "sp_name_qualifier":None, + "text":None, + "extension_elements":None, + "extension_attributes":None, + }, + "name_id":{ + "name_qualifier":None, + "sp_name_qualifier":None, + "format":None, + "sp_provided_id": None, + "text":None, + "extension_elements":None, + "extension_attributes":None, + }, + "encrypted_id":{ + "encrypted_data":None, + "encrypted_key":None, + "text":None, + "extension_elements":None, + "extension_attributes":None, + }, + "subject_confirmation_data":{ + "not_before":None, + "not_on_or_after":None, + "recipient":None, + "in_response_to":None, + "address":None, + "text":None, + "extension_elements":None, + "extension_attributes":None, + }, + "text":None, + "extension_elements":None, + "extension_attributes":None, + }], + "text":None, + "extension_elements":None, + "extension_attributes":None, + }, + #NameIDPolicy + "name_id_policy":{ + "format":NAMEID_FORMAT_EMAILADDRESS, + # NAMEID_FORMAT_EMAILADDRESS = ( + # "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress") + # NAMEID_FORMAT_UNSPECIFIED = ( + # "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified") + # NAMEID_FORMAT_ENCRYPTED = ( + # "urn:oasis:names:tc:SAML:2.0:nameid-format:encrypted") + # NAMEID_FORMAT_PERSISTENT = ( + # "urn:oasis:names:tc:SAML:2.0:nameid-format:persistent") + # NAMEID_FORMAT_TRANSIENT = ( + # "urn:oasis:names:tc:SAML:2.0:nameid-format:transient") + # NAMEID_FORMAT_ENTITY = ( + # "urn:oasis:names:tc:SAML:2.0:nameid-format:entity") + + "sp_name_qualifier":None, + "allow_create":None, + #text=None, + #extension_elements=None, + #extension_attributes=None, + }, + #saml.Conditions + "conditions":{ + #Condition + "condition":[{}], + #AudienceRestriction + "audience_restriction":[{}], + #OneTimeUse + "one_time_use":[{}], + #ProxyRestriction + "proxy_restriction":[{}], + #not_before=None, + #not_on_or_after=None, + #text=None, + #extension_elements=None, + #extension_attributes=None, + }, + #RequestedAuthnContext + "requested_authn_context":{ + #saml.AuthnContextClassRef + "authn_context_class_ref":None, + #saml.AuthnContextDeclRef + "authn_context_decl_ref":None, + #AuthnContextComparisonType_ + "comparison":None, + #text=None, + #extension_elements=None, + #extension_attributes=None, + }, + #Scoping + "scoping":{ + #IDPList + "idp_list":{ + #IDPEntry + "idp_entry":{ + "provider_id":None, + "name":None, + "loc":None, + #text=None, + #extension_elements=None, + #extension_attributes=None, + }, + #GetComplete + "get_complete":{}, + #text=None, + #extension_elements=None, + #extension_attributes=None, + }, + #RequesterID + "requester_id":{}, + #proxy_count=None, + #text=None, + #extension_elements=None, + #extension_attributes=None, + }, + "force_authn":None, + "is_passive":None, + "protocol_binding":None, + "assertion_consumer_service_index":None, + "assertion_consumer_service_url":None, + "attribute_consuming_service_index":None, + "provider_name":None, + #saml.Issuer + "issuer":{}, + #ds.Signature + "signature":{}, + #Extensions + "extensions":{}, + "id":None, + "version":None, + "issue_instant":None, + "destination":None, + "consent":None, + #text=None, + #extension_elements=None, + #extension_attributes=None, + +} + +request = samlp.AuthnRequest( + id= query_id, + version= VERSION, + issue_instant= instant(), + assertion_consumer_service_url= service_url, + protocol_binding= binding +)