Still prelim work

This commit is contained in:
Roland Hedberg
2012-11-16 08:55:49 +01:00
commit 96abdd254b
8 changed files with 1162 additions and 0 deletions

1
script/__init__.py Normal file
View File

@@ -0,0 +1 @@
__author__ = 'rolandh'

40
setup.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/python
#
# Copyright (C) 2013 Umea Universitet, Sweden
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from setuptools import setup
__author__ = 'rohe0002'
setup(
name="oic",
version="0.3.0",
description="SAML2 test tool",
author = "Roland Hedberg",
author_email = "roland.hedberg@adm.umu.se",
license="Apache 2.0",
packages=["idp_test"],
package_dir = {"": "src"},
classifiers = ["Development Status :: 4 - Beta",
"License :: OSI Approved :: Apache Software License",
"Topic :: Software Development :: Libraries :: Python Modules"],
install_requires = ["pysaml2",
"mechanize",
"argparse",
"beautifulsoup4"],
zip_safe=False,
)

1
src/__init__.py Normal file
View File

@@ -0,0 +1 @@
__author__ = 'rolandh'

289
src/idp_test/__init__.py Normal file
View File

@@ -0,0 +1,289 @@
import json
import argparse
import sys
__author__ = 'rolandh'
import traceback
def exception_trace(tag, exc, log=None):
message = traceback.format_exception(*sys.exc_info())
if log:
log.error("[%s] ExcList: %s" % (tag, "".join(message),))
log.error("[%s] Exception: %s" % (tag, exc))
else:
print >> sys.stderr, "[%s] ExcList: %s" % (tag, "".join(message),)
print >> sys.stderr, "[%s] Exception: %s" % (tag, exc)
class SAML2(object):
client_args = ["client_id", "redirect_uris", "password"]
def __init__(self, operations_mod, client_class, msgfactory):
self.operations_mod = operations_mod
self.client_class = client_class
self.client = None
#self.trace = Trace()
self.msgfactory = msgfactory
self._parser = argparse.ArgumentParser()
self._parser.add_argument('-d', dest='debug', action='store_true',
help="Print debug information")
self._parser.add_argument('-v', dest='verbose', action='store_true',
help="Print runtime information")
self._parser.add_argument('-C', dest="ca_certs",
help="CA certs to use to verify HTTPS server certificates, if HTTPS is used and no server CA certs are defined then no cert verification is done")
self._parser.add_argument('-J', dest="json_config_file",
help="Script configuration")
self._parser.add_argument("-l", dest="list", action="store_true",
help="List all the test flows as a JSON object")
self._parser.add_argument("-H", dest="host", default="example.com",
help="Which host the script is running on, used to construct the key export URL")
self._parser.add_argument("flow", nargs="?", help="Which test flow to run")
self.args = None
self.pinfo = None
self.sequences = []
self.function_args = {}
self.signing_key = None
self.encryption_key = None
self.test_log = []
self.environ = {}
self._pop = None
def parse_args(self):
self.json_config= self.json_config_file()
try:
self.features = self.json_config["features"]
except KeyError:
self.features = {}
self.pinfo = self.provider_info()
self.client_conf(self.client_args)
def json_config_file(self):
if self.args.json_config_file == "-":
return json.loads(sys.stdin.read())
else:
return json.loads(open(self.args.json_config_file).read())
def test_summation(self, id):
status = 0
for item in self.test_log:
if item["status"] > status:
status = item["status"]
if status == 0:
status = 1
sum = {
"id": id,
"status": status,
"tests": self.test_log
}
if status == 5:
sum["url"] = self.test_log[-1]["url"]
sum["htmlbody"] = self.test_log[-1]["message"]
return sum
def run(self):
self.args = self._parser.parse_args()
if self.args.list:
return self.operations()
else:
if not self.args.flow:
raise Exception("Missing flow specification")
self.args.flow = self.args.flow.strip("'")
self.args.flow = self.args.flow.strip('"')
flow_spec = self.operations_mod.FLOWS[self.args.flow]
try:
block = flow_spec["block"]
except KeyError:
block = {}
self.parse_args()
_seq = self.make_sequence()
interact = self.get_interactions()
try:
self.do_features(interact, _seq, block)
except Exception,exc:
exception_trace("do_features", exc)
_output = {"status": 4,
"tests": [{"status": 4,
"message":"Couldn't run testflow: %s" % exc,
"id": "verify_features",
"name": "Make sure you don't do things you shouldn't"}]}
#print >> sys.stdout, json.dumps(_output)
return
tests = self.get_test()
self.client.state = "STATE0"
self.environ.update({"provider_info": self.pinfo,
"client": self.client})
try:
except_exception = flow_spec["except_exception"]
except KeyError:
except_exception = False
try:
if self.args.verbose:
print >> sys.stderr, "Set up done, running sequence"
testres, trace = run_sequence(self.client, _seq, self.trace,
interact, self.msgfactory,
self.environ, tests,
self.json_config["features"],
self.args.verbose, self.cconf,
except_exception)
self.test_log.extend(testres)
sum = self.test_summation(self.args.flow)
print >>sys.stdout, json.dumps(sum)
if sum["status"] > 1 or self.args.debug:
print >>sys.stderr, trace
except Exception, err:
#print >> sys.stderr, self.trace
print err
exception_trace("RUN", err)
#if self._pop is not None:
# self._pop.terminate()
if "keyprovider" in self.environ and self.environ["keyprovider"]:
# os.kill(self.environ["keyprovider"].pid, signal.SIGTERM)
self.environ["keyprovider"].terminate()
def operations(self):
lista = []
for key,val in self.operations_mod.FLOWS.items():
item = {"id": key,
"name": val["name"],}
try:
_desc = val["descr"]
if isinstance(_desc, basestring):
item["descr"] = _desc
else:
item["descr"] = "\n".join(_desc)
except KeyError:
pass
for key in ["depends", "endpoints"]:
try:
item[key] = val[key]
except KeyError:
pass
lista.append(item)
print json.dumps(lista)
def provider_info(self):
# Should provide a Metadata class
res = {}
_jc = self.json_config["provider"]
# Backward compatible
if "endpoints" in _jc:
try:
for endp, url in _jc["endpoints"].items():
res[endp] = url
except KeyError:
pass
for key in ProviderConfigurationResponse.c_param.keys():
try:
res[key] = _jc[key]
except KeyError:
pass
return res
def do_features(self, *args):
pass
def export(self):
pass
def client_conf(self, cprop):
if self.args.ca_certs:
self.client = self.client_class(ca_certs=self.args.ca_certs)
else:
try:
self.client = self.client_class(
ca_certs=self.json_config["ca_certs"])
except (KeyError, TypeError):
self.client = self.client_class()
#self.client.http_request = self.client.http.crequest
# set the endpoints in the Client from the provider information
# If they are statically configured, if dynamic it happens elsewhere
for key, val in self.pinfo.items():
if key.endswith("_endpoint"):
setattr(self.client, key, val)
# Client configuration
self.cconf = self.json_config["client"]
# replace pattern with real value
_h = self.args.host
self.cconf["redirect_uris"] = [p % _h for p in self.cconf["redirect_uris"]]
try:
self.client.client_prefs = self.cconf["preferences"]
except KeyError:
pass
# set necessary information in the Client
for prop in cprop:
try:
setattr(self.client, prop, self.cconf[prop])
except KeyError:
pass
def make_sequence(self):
# Whatever is specified on the command line takes precedences
if self.args.flow:
sequence = flow2sequence(self.operations_mod, self.args.flow)
elif self.json_config and "flow" in self.json_config:
sequence = flow2sequence(self.operations_mod,
self.json_config["flow"])
else:
sequence = None
return sequence
def get_interactions(self):
interactions = []
if self.json_config:
try:
interactions = self.json_config["interaction"]
except KeyError:
pass
if self.args.interactions:
_int = self.args.interactions.replace("\'", '"')
if interactions:
interactions.update(json.loads(_int))
else:
interactions = json.loads(_int)
return interactions
def get_test(self):
if self.args.flow:
flow = self.operations_mod.FLOWS[self.args.flow]
elif self.json_config and "flow" in self.json_config:
flow = self.operations_mod.FLOWS[self.json_config["flow"]]
else:
flow = None
try:
return flow["tests"]
except KeyError:
return []

392
src/idp_test/base.py Normal file
View File

@@ -0,0 +1,392 @@
#!/usr/bin/env python
from check import ExpectedError
from check import factory
__author__ = 'rohe0002'
import time
import cookielib
from bs4 import BeautifulSoup
class FatalError(Exception):
pass
class Trace(object):
def __init__(self):
self.trace = []
self.start = time.time()
def request(self, msg):
delta = time.time() - self.start
self.trace.append("%f --> %s" % (delta, msg))
def reply(self, msg):
delta = time.time() - self.start
self.trace.append("%f <-- %s" % (delta, msg))
def info(self, msg):
delta = time.time() - self.start
self.trace.append("%f %s" % (delta, msg))
def error(self, msg):
delta = time.time() - self.start
self.trace.append("%f [ERROR] %s" % (delta, msg))
def warning(self, msg):
delta = time.time() - self.start
self.trace.append("%f [WARNING] %s" % (delta, msg))
def __str__(self):
return "\n". join([t.encode("utf-8") for t in self.trace])
def clear(self):
self.trace = []
def __getitem__(self, item):
return self.trace[item]
def next(self):
for line in self.trace:
yield line
def flow2sequence(operations, item):
flow = operations.FLOWS[item]
return [operations.PHASES[phase] for phase in flow["sequence"]]
def endpoint(client, base):
for _endp in client._endpoints:
if getattr(client, _endp) == base:
return True
return False
def check_severity(stat):
if stat["status"] >= 4:
raise FatalError
def pick_interaction(interactions, _base="", content="", req=None):
unic = content
if content:
_bs = BeautifulSoup(content)
else:
_bs = None
for interaction in interactions:
_match = 0
for attr, val in interaction["matches"].items():
if attr == "url":
if val == _base:
_match += 1
elif attr == "title":
if _bs is None:
break
if _bs.title is None:
break
if val in _bs.title.contents:
_match += 1
elif attr == "content":
if unic and val in unic:
_match += 1
elif attr == "class":
if req and val == req:
_match += 1
if _match == len(interaction["matches"]):
return interaction
raise KeyError("No interaction matched")
ORDER = ["url", "response", "content"]
def run_sequence(client, sequence, trace, interaction, msgfactory,
environ=None, tests=None, features=None, verbose=False,
cconf=None, except_exception=None):
item = []
response = None
content = None
url = ""
test_output = []
_keystore = client.keystore
features = features or {}
cjar = {"browser": cookielib.CookieJar(),
"rp": cookielib.CookieJar(),
"service": cookielib.CookieJar()}
environ["sequence"] = sequence
environ["cis"] = []
environ["trace"] = trace
environ["responses"] = []
try:
for creq in sequence:
req = creq()
cfunc = getattr(client, "create_%s" % req.request)
if trace:
trace.info(70*"=")
try:
_pretests = req.tests["pre"]
for test in _pretests:
chk = test()
stat = chk(environ, test_output)
check_severity(stat)
except KeyError:
pass
try:
response = cfunc(**req.args)
try:
for test in req.tests["post"]:
if isinstance(test, tuple):
test, kwargs = test
else:
kwargs = {}
chk = test(**kwargs)
stat = chk(environ, test_output)
check_severity(stat)
if isinstance(chk, ExpectedError):
item.append(stat["temp"])
del stat["temp"]
url = None
break
except KeyError:
pass
except FatalError:
raise
except Exception, err:
environ["exception"] = err
chk = factory("exception")()
chk(environ, test_output)
raise FatalError()
if not response:
continue
if response.status_code >= 400:
done = True
elif url:
done = False
else:
done = True
while not done:
while response.status_code in [302, 301, 303]:
url = response.headers["location"]
trace.reply("REDIRECT TO: %s" % url)
# If back to me
for_me = False
for redirect_uri in client.redirect_uris:
if url.startswith(redirect_uri):
# Back at the RP
environ["client"].cookiejar = cjar["rp"]
for_me=True
if for_me:
done = True
break
else:
try:
part = do_request(client, url, "GET", trace=trace)
except Exception, err:
raise FatalError("%s" % err)
environ.update(dict(zip(ORDER, part)))
(url, response, content) = part
check = factory("check-http-response")()
stat = check(environ, test_output)
check_severity(stat)
if done:
break
_base = url.split("?")[0]
try:
_spec = pick_interaction(interaction, _base, content)
except KeyError:
if creq.method == "POST":
break
elif not req.request in ["AuthorizationRequest",
"OpenIDRequest"]:
break
else:
try:
_check = getattr(req, "interaction_check")
except AttributeError:
_check = None
if _check:
chk = factory("interaction-check")()
chk(environ, test_output)
raise FatalError()
else:
chk = factory("interaction-needed")()
chk(environ, test_output)
raise FatalError()
if len(_spec) > 2:
trace.info(">> %s <<" % _spec["page-type"])
if _spec["page-type"] == "login":
environ["login"] = content
_op = Operation(_spec["control"])
try:
part = _op(environ, trace, url, response, content, features)
environ.update(dict(zip(ORDER, part)))
(url, response, content) = part
check = factory("check-http-response")()
stat = check(environ, test_output)
check_severity(stat)
except FatalError:
raise
except Exception, err:
environ["exception"] = err
chk = factory("exception")()
chk(environ, test_output)
raise FatalError
# if done:
# break
info = None
qresp = None
resp_type = resp.type
if response:
try:
ctype = response.headers["content-type"]
if ctype == "application/jwt":
resp_type = "jwt"
except (AttributeError, TypeError):
pass
if response.status_code >= 400:
pass
elif not url:
if isinstance(content, Message):
qresp = content
elif response.status_code == 200:
info = content
elif resp.where == "url" or response.status_code == 302:
try:
info = response.headers["location"]
resp_type = "urlencoded"
except KeyError:
try:
_check = getattr(req, "interaction_check", None)
except AttributeError:
_check = None
if _check:
chk = factory("interaction-check")()
chk(environ, test_output)
raise FatalError()
else:
chk = factory("missing-redirect")()
stat = chk(environ, test_output)
check_severity(stat)
else:
check = factory("check_content_type_header")()
stat = check(environ, test_output)
check_severity(stat)
info = content
if info and resp.response:
if isinstance(resp.response, basestring):
response = msgfactory(resp.response)
else:
response = resp.response
chk = factory("response-parse")()
environ["response_type"] = response.__name__
environ["responses"].append((response, info))
try:
qresp = client.parse_response(response, info, resp_type,
client.state,
keystore=_keystore,
client_id=client.client_id,
scope="openid")
if trace and qresp:
trace.info("[%s]: %s" % (qresp.type(),
qresp.to_dict()))
item.append(qresp)
environ["response_message"] = qresp
err = None
except Exception, err:
environ["exception"] = "%s" % err
qresp = None
if err and except_exception:
if isinstance(err, except_exception):
trace.info("Got expected exception: %s [%s]" % (err,
err.__class__.__name__))
else:
raise
else:
stat = chk(environ, test_output)
check_severity(stat)
if qresp:
try:
for test in resp.tests["post"]:
if isinstance(test, tuple):
test, kwargs = test
else:
kwargs = {}
chk = test(**kwargs)
stat = chk(environ, test_output)
check_severity(stat)
except KeyError:
pass
resp(environ, qresp)
if tests is not None:
environ["item"] = item
for test, args in tests:
if isinstance(test, basestring):
chk = factory(test)(**args)
else:
chk = test(**args)
try:
check_severity(chk(environ, test_output))
except Exception, err:
raise FatalError("%s" % err)
except FatalError:
pass
except Exception, err:
environ["exception"] = err
chk = factory("exception")()
chk(environ, test_output)
return test_output, "%s" % trace
def run_sequences(client, sequences, trace, interaction,
verbose=False):
for sequence, endpoints, fid in sequences:
# clear cookie cache
client.grant.clear()
try:
client.http.cookiejar.clear()
except AttributeError:
pass
err = run_sequence(client, sequence, trace, interaction, verbose)
if err:
print "%s - FAIL" % fid
print
if not verbose:
print trace
else:
print "%s - OK" % fid
trace.clear()

118
src/idp_test/check.py Normal file
View File

@@ -0,0 +1,118 @@
import inspect
import sys
__author__ = 'rolandh'
INFORMATION = 0
OK = 1
WARNING = 2
ERROR = 3
CRITICAL = 4
INTERACTION = 5
STATUSCODE = ["INFORMATION", "OK", "WARNING", "ERROR", "CRITICAL",
"INTERACTION"]
class Check():
""" General test
"""
id = "check"
msg = "OK"
def __init__(self, **kwargs):
self._status = OK
self._message = ""
self.content = None
self.url = ""
self._kwargs = kwargs
def _func(self, environ):
return {}
def __call__(self, environ=None, output=None):
_stat = self.response(**self._func(environ))
output.append(_stat)
return _stat
def response(self, **kwargs):
try:
name = " ".join([s.strip() for s in self.__doc__.strip().split("\n")])
except AttributeError:
name = ""
res = {
"id": self.id,
"status": self._status,
"name": name
}
if self._message:
res["message"] = self._message
if kwargs:
res.update(kwargs)
return res
class ExpectedError(Check):
pass
class CriticalError(Check):
status = CRITICAL
class Error(Check):
status = ERROR
class Other(CriticalError):
""" Other error """
msg = "Other error"
class CheckHTTPResponse(CriticalError):
"""
Checks that the HTTP response status is within the 200 or 300 range
"""
id = "check-http-response"
msg = "IdP error"
def _func(self, environ):
_response = environ["response"]
_content = environ["content"]
res = {}
if _response.status_code >= 400 :
self._status = self.status
self._message = self.msg
# if CONT_JSON in _response.headers["content-type"]:
# try:
# err = ErrorResponse().deserialize(_content, "json")
# self._message = err.to_json()
# except Exception:
# res["content"] = _content
# else:
# res["content"] = _content
res["url"] = environ["url"]
res["http_status"] = _response.status_code
else:
# might still be an error message
try:
# err = ErrorResponse().deserialize(_content, "json")
# err.verify()
# self._message = err.to_json()
self._status = self.status
except Exception:
pass
res["url"] = environ["url"]
return res
def factory(id):
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj):
try:
if obj.id == id:
return obj
except AttributeError:
pass
return None

View File

@@ -0,0 +1,95 @@
from check import CheckHTTPResponse
__author__ = 'rolandh'
class Request():
request = ""
method = ""
lax = False
_request_args= {}
kw_args = {}
tests = {"post": [CheckHTTPResponse], "pre":[]}
def __init__(self, cconf=None):
self.cconf = cconf
self.request_args = self._request_args.copy()
#noinspection PyUnusedLocal
def __call__(self, environ, trace, location, response, content, features):
_client = environ["client"]
try:
kwargs = self.kw_args.copy()
except KeyError:
kwargs = {}
func = getattr(_client, "do_%s" % self.request)
ht_add = None
if "authn_method" in kwargs:
h_arg = _client.init_authentication_method(cis, **kwargs)
else:
h_arg = None
url, body, ht_args, cis = _client.uri_and_body(request, cis,
method=self.method,
request_args=_req)
environ["cis"].append(cis)
if h_arg:
ht_args.update(h_arg)
if ht_add:
ht_args.update({"headers": ht_add})
if trace:
try:
oro = unpack(cis["request"])[1]
trace.request("OpenID Request Object: %s" % oro)
except KeyError:
pass
trace.request("URL: %s" % url)
trace.request("BODY: %s" % body)
try:
trace.request("HEADERS: %s" % ht_args["headers"])
except KeyError:
pass
response = _client.http_request(url, method=self.method, data=body,
**ht_args)
if trace:
trace.reply("RESPONSE: %s" % response)
trace.reply("CONTENT: %s" % response.text)
if response.status_code in [301, 302]:
trace.reply("LOCATION: %s" % response.headers["location"])
trace.reply("COOKIES: %s" % response.cookies)
# try:
# trace.reply("HeaderCookies: %s" % response.headers["set-cookie"])
# except KeyError:
# pass
return url, response, response.text
def update(self, dic):
_tmp = {"request": self.request_args.copy(), "kw": self.kw_args}
for key, val in self.rec_update(_tmp, dic).items():
setattr(self, "%s_args" % key, val)
def rec_update(self, dic0, dic1):
res = {}
for key, val in dic0.items():
if key not in dic1:
res[key] = val
else:
if isinstance(val, dict):
res[key] = self.rec_update(val, dic1[key])
else:
res[key] = dic1[key]
for key, val in dic1.items():
if key in dic0:
continue
else:
res[key] = val
return res

226
src/idp_test/tmp.py Normal file
View File

@@ -0,0 +1,226 @@
from oic.oauth2 import rndstr
from saml2 import config, NAMEID_FORMAT_EMAILADDRESS
from saml2 import samlp
from saml2 import BINDING_HTTP_POST
from saml2 import VERSION
from saml2.client import Saml2Client
from saml2.time_util import instant
__author__ = 'rolandh'
try:
from xmlsec_location import xmlsec_path
except ImportError:
xmlsec_path = '/opt/local/bin/xmlsec1'
cnf_dict = {
"entityid" : "urn:mace:example.com:saml:roland:sp",
"name" : "urn:mace:example.com:saml:roland:sp",
"description": "Test SP",
"service": {
"sp": {
"endpoints":{
"assertion_consumer_service": ["http://lingon.catalogix.se:8087/"],
},
"required_attributes": ["surName", "givenName", "mail"],
"optional_attributes": ["title"],
"idp": ["urn:mace:example.com:saml:roland:idp"],
}
},
"key_file" : "test.key",
"cert_file" : "test.pem",
"ca_certs": "cacerts.txt",
"xmlsec_binary" : xmlsec_path,
"metadata": {
"local": ["idp.xml"],
},
"subject_data": "subject_data.db",
"accepted_time_diff": 60,
"attribute_map_dir" : "attributemaps",
}
conf = config.SPConfig()
conf.load(cnf_dict)
client = Saml2Client(conf)
binding= BINDING_HTTP_POST
query_id = rndstr()
service_url = "https://example.com"
authn_request = {
#===== AuthRequest =====
"subject":{
"base_id":{
"name_qualifier":None,
"sp_name_qualifier":None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"name_id":{
"name_qualifier":None,
"sp_name_qualifier":None,
"format":None,
"sp_provided_id": None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"encrypted_id":{
"encrypted_data":None,
"encrypted_key":None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"subject_confirmation":[{
"base_id":{
"name_qualifier":None,
"sp_name_qualifier":None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"name_id":{
"name_qualifier":None,
"sp_name_qualifier":None,
"format":None,
"sp_provided_id": None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"encrypted_id":{
"encrypted_data":None,
"encrypted_key":None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"subject_confirmation_data":{
"not_before":None,
"not_on_or_after":None,
"recipient":None,
"in_response_to":None,
"address":None,
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
"text":None,
"extension_elements":None,
"extension_attributes":None,
}],
"text":None,
"extension_elements":None,
"extension_attributes":None,
},
#NameIDPolicy
"name_id_policy":{
"format":NAMEID_FORMAT_EMAILADDRESS,
# NAMEID_FORMAT_EMAILADDRESS = (
# "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress")
# NAMEID_FORMAT_UNSPECIFIED = (
# "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified")
# NAMEID_FORMAT_ENCRYPTED = (
# "urn:oasis:names:tc:SAML:2.0:nameid-format:encrypted")
# NAMEID_FORMAT_PERSISTENT = (
# "urn:oasis:names:tc:SAML:2.0:nameid-format:persistent")
# NAMEID_FORMAT_TRANSIENT = (
# "urn:oasis:names:tc:SAML:2.0:nameid-format:transient")
# NAMEID_FORMAT_ENTITY = (
# "urn:oasis:names:tc:SAML:2.0:nameid-format:entity")
"sp_name_qualifier":None,
"allow_create":None,
#text=None,
#extension_elements=None,
#extension_attributes=None,
},
#saml.Conditions
"conditions":{
#Condition
"condition":[{}],
#AudienceRestriction
"audience_restriction":[{}],
#OneTimeUse
"one_time_use":[{}],
#ProxyRestriction
"proxy_restriction":[{}],
#not_before=None,
#not_on_or_after=None,
#text=None,
#extension_elements=None,
#extension_attributes=None,
},
#RequestedAuthnContext
"requested_authn_context":{
#saml.AuthnContextClassRef
"authn_context_class_ref":None,
#saml.AuthnContextDeclRef
"authn_context_decl_ref":None,
#AuthnContextComparisonType_
"comparison":None,
#text=None,
#extension_elements=None,
#extension_attributes=None,
},
#Scoping
"scoping":{
#IDPList
"idp_list":{
#IDPEntry
"idp_entry":{
"provider_id":None,
"name":None,
"loc":None,
#text=None,
#extension_elements=None,
#extension_attributes=None,
},
#GetComplete
"get_complete":{},
#text=None,
#extension_elements=None,
#extension_attributes=None,
},
#RequesterID
"requester_id":{},
#proxy_count=None,
#text=None,
#extension_elements=None,
#extension_attributes=None,
},
"force_authn":None,
"is_passive":None,
"protocol_binding":None,
"assertion_consumer_service_index":None,
"assertion_consumer_service_url":None,
"attribute_consuming_service_index":None,
"provider_name":None,
#saml.Issuer
"issuer":{},
#ds.Signature
"signature":{},
#Extensions
"extensions":{},
"id":None,
"version":None,
"issue_instant":None,
"destination":None,
"consent":None,
#text=None,
#extension_elements=None,
#extension_attributes=None,
}
request = samlp.AuthnRequest(
id= query_id,
version= VERSION,
issue_instant= instant(),
assertion_consumer_service_url= service_url,
protocol_binding= binding
)