Allow more direct modifications of nested items.
This commit is contained in:
@@ -68,3 +68,31 @@ def add_path(tdict, path):
|
||||
t[path[-2]] = path[-1]
|
||||
|
||||
return tdict
|
||||
|
||||
|
||||
def is_set(tdict, path):
|
||||
"""
|
||||
|
||||
:param tdict: a dictionary representing a argument tree
|
||||
:param path: a path list
|
||||
:return: True/False if the value is set
|
||||
"""
|
||||
t = tdict
|
||||
for step in path:
|
||||
try:
|
||||
t = t[step]
|
||||
except KeyError:
|
||||
return False
|
||||
|
||||
if t is not None:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def get_attr(tdict, path):
|
||||
t = tdict
|
||||
for step in path:
|
||||
t = t[step]
|
||||
|
||||
return t
|
||||
|
@@ -558,7 +558,7 @@ class Base(Entity):
|
||||
# ======== response handling ===========
|
||||
|
||||
def parse_authn_request_response(self, xmlstr, binding, outstanding=None,
|
||||
outstanding_certs=None):
|
||||
outstanding_certs=None, conv_info=None):
|
||||
""" Deal with an AuthnResponse
|
||||
|
||||
:param xmlstr: The reply as a xml string
|
||||
@@ -566,11 +566,8 @@ class Base(Entity):
|
||||
:param outstanding: A dictionary with session IDs as keys and
|
||||
the original web request from the user before redirection
|
||||
as values.
|
||||
:param only_identity_in_encrypted_assertion: Must exist an assertion
|
||||
that is not encrypted that contains all
|
||||
other information like
|
||||
subject and
|
||||
authentication statement.
|
||||
:param outstanding_certs:
|
||||
:param conv_info: Information about the conversation.
|
||||
:return: An response.AuthnResponse or None
|
||||
"""
|
||||
|
||||
@@ -592,6 +589,7 @@ class Base(Entity):
|
||||
"attribute_converters": self.config.attribute_converters,
|
||||
"allow_unknown_attributes":
|
||||
self.config.allow_unknown_attributes,
|
||||
'conv_info': conv_info
|
||||
}
|
||||
try:
|
||||
resp = self._parse_response(xmlstr, AuthnResponse,
|
||||
|
@@ -220,7 +220,7 @@ def for_me(conditions, myself):
|
||||
|
||||
def authn_response(conf, return_addrs, outstanding_queries=None, timeslack=0,
|
||||
asynchop=True, allow_unsolicited=False,
|
||||
want_assertions_signed=False):
|
||||
want_assertions_signed=False, conv_info=None):
|
||||
sec = security_context(conf)
|
||||
if not timeslack:
|
||||
try:
|
||||
@@ -231,12 +231,13 @@ def authn_response(conf, return_addrs, outstanding_queries=None, timeslack=0,
|
||||
return AuthnResponse(sec, conf.attribute_converters, conf.entityid,
|
||||
return_addrs, outstanding_queries, timeslack,
|
||||
asynchop=asynchop, allow_unsolicited=allow_unsolicited,
|
||||
want_assertions_signed=want_assertions_signed)
|
||||
want_assertions_signed=want_assertions_signed,
|
||||
conv_info=conv_info)
|
||||
|
||||
|
||||
# comes in over SOAP so synchronous
|
||||
def attribute_response(conf, return_addrs, timeslack=0, asynchop=False,
|
||||
test=False):
|
||||
test=False, conv_info=None):
|
||||
sec = security_context(conf)
|
||||
if not timeslack:
|
||||
try:
|
||||
@@ -246,14 +247,14 @@ def attribute_response(conf, return_addrs, timeslack=0, asynchop=False,
|
||||
|
||||
return AttributeResponse(sec, conf.attribute_converters, conf.entityid,
|
||||
return_addrs, timeslack, asynchop=asynchop,
|
||||
test=test)
|
||||
test=test, conv_info=conv_info)
|
||||
|
||||
|
||||
class StatusResponse(object):
|
||||
msgtype = "status_response"
|
||||
|
||||
def __init__(self, sec_context, return_addrs=None, timeslack=0,
|
||||
request_id=0, asynchop=True):
|
||||
request_id=0, asynchop=True, conv_info=None):
|
||||
self.sec = sec_context
|
||||
self.return_addrs = return_addrs
|
||||
|
||||
@@ -272,6 +273,7 @@ class StatusResponse(object):
|
||||
self.not_signed = False
|
||||
self.asynchop = asynchop
|
||||
self.do_not_verify = False
|
||||
self.conv_info = conv_info or {}
|
||||
|
||||
def _clear(self):
|
||||
self.xmlstr = ""
|
||||
@@ -429,9 +431,9 @@ class LogoutResponse(StatusResponse):
|
||||
msgtype = "logout_response"
|
||||
|
||||
def __init__(self, sec_context, return_addrs=None, timeslack=0,
|
||||
asynchop=True):
|
||||
asynchop=True, conv_info=None):
|
||||
StatusResponse.__init__(self, sec_context, return_addrs, timeslack,
|
||||
asynchop=asynchop)
|
||||
asynchop=asynchop, conv_info=conv_info)
|
||||
self.signature_check = self.sec.correctly_signed_logout_response
|
||||
|
||||
|
||||
@@ -439,9 +441,9 @@ class NameIDMappingResponse(StatusResponse):
|
||||
msgtype = "name_id_mapping_response"
|
||||
|
||||
def __init__(self, sec_context, return_addrs=None, timeslack=0,
|
||||
request_id=0, asynchop=True):
|
||||
request_id=0, asynchop=True, conv_info=None):
|
||||
StatusResponse.__init__(self, sec_context, return_addrs, timeslack,
|
||||
request_id, asynchop)
|
||||
request_id, asynchop, conv_info=conv_info)
|
||||
self.signature_check = self.sec \
|
||||
.correctly_signed_name_id_mapping_response
|
||||
|
||||
@@ -450,9 +452,9 @@ class ManageNameIDResponse(StatusResponse):
|
||||
msgtype = "manage_name_id_response"
|
||||
|
||||
def __init__(self, sec_context, return_addrs=None, timeslack=0,
|
||||
request_id=0, asynchop=True):
|
||||
request_id=0, asynchop=True, conv_info=None):
|
||||
StatusResponse.__init__(self, sec_context, return_addrs, timeslack,
|
||||
request_id, asynchop)
|
||||
request_id, asynchop, conv_info=conv_info)
|
||||
self.signature_check = self.sec.correctly_signed_manage_name_id_response
|
||||
|
||||
|
||||
@@ -469,10 +471,10 @@ class AuthnResponse(StatusResponse):
|
||||
timeslack=0, asynchop=True, allow_unsolicited=False,
|
||||
test=False, allow_unknown_attributes=False,
|
||||
want_assertions_signed=False, want_response_signed=False,
|
||||
**kwargs):
|
||||
conv_info=None, **kwargs):
|
||||
|
||||
StatusResponse.__init__(self, sec_context, return_addrs, timeslack,
|
||||
asynchop=asynchop)
|
||||
asynchop=asynchop, conv_info=conv_info)
|
||||
self.entity_id = entity_id
|
||||
self.attribute_converters = attribute_converters
|
||||
if outstanding_queries:
|
||||
@@ -721,6 +723,10 @@ class AuthnResponse(StatusResponse):
|
||||
assert self.assertion.subject
|
||||
subject = self.assertion.subject
|
||||
subjconf = []
|
||||
|
||||
if not self.verify_attesting_entity(subject.subject_confirmation):
|
||||
raise VerificationError("No valid attesting address")
|
||||
|
||||
for subject_confirmation in subject.subject_confirmation:
|
||||
_data = subject_confirmation.subject_confirmation_data
|
||||
|
||||
@@ -736,6 +742,10 @@ class AuthnResponse(StatusResponse):
|
||||
raise ValueError("Unknown subject confirmation method: %s" % (
|
||||
subject_confirmation.method,))
|
||||
|
||||
_recip = _data.recipient
|
||||
if not _recip or not self.verify_recipient(_recip):
|
||||
raise VerificationError("No valid recipient")
|
||||
|
||||
subjconf.append(subject_confirmation)
|
||||
|
||||
if not subjconf:
|
||||
@@ -933,7 +943,7 @@ class AuthnResponse(StatusResponse):
|
||||
decr_text_old = None
|
||||
while (self.find_encrypt_data(
|
||||
resp) or self.find_encrypt_data_assertion_list(
|
||||
_enc_assertions)) and \
|
||||
_enc_assertions)) and \
|
||||
decr_text_old != decr_text:
|
||||
decr_text_old = decr_text
|
||||
decr_text = self.sec.decrypt_keys(decr_text, keys)
|
||||
@@ -984,9 +994,11 @@ class AuthnResponse(StatusResponse):
|
||||
return True
|
||||
|
||||
def verify(self, keys=None):
|
||||
""" Verify that the assertion is syntactically correct and
|
||||
the signature is correct if present.
|
||||
:param key_file: If not the default key file should be used this is it.
|
||||
""" Verify that the assertion is syntactically correct and the
|
||||
signature is correct if present.
|
||||
|
||||
:param keys: If not the default key file should be used then use one
|
||||
of these.
|
||||
"""
|
||||
|
||||
try:
|
||||
@@ -1069,21 +1081,54 @@ class AuthnResponse(StatusResponse):
|
||||
return "%s" % self.xmlstr.decode("utf-8")
|
||||
return "%s" % self.xmlstr
|
||||
|
||||
def verify_attesting_entity(self, address):
|
||||
def verify_recipient(self, recipient):
|
||||
"""
|
||||
Assumes one assertion. At least one address specification has to be
|
||||
correct.
|
||||
Verify that I'm the recipient of the assertion
|
||||
|
||||
:param recipient: A URI specifying the entity or location to which an
|
||||
attesting entity can present the assertion.
|
||||
:return: True/False
|
||||
"""
|
||||
if not self.conv_info:
|
||||
return True
|
||||
|
||||
:param address: IP address of attesting entity
|
||||
_info = self.conv_info
|
||||
|
||||
try:
|
||||
if recipient == _info['entity_id']:
|
||||
return True
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
try:
|
||||
if recipient in self.return_addrs:
|
||||
return True
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
return False
|
||||
|
||||
def verify_attesting_entity(self, subject_confirmation):
|
||||
"""
|
||||
At least one address specification has to be correct.
|
||||
|
||||
:param subject_confirmation: A SubbjectConfirmation instance
|
||||
:return: True/False
|
||||
"""
|
||||
|
||||
try:
|
||||
address = self.conv_info['remote_addr']
|
||||
except KeyError:
|
||||
address = '0.0.0.0'
|
||||
|
||||
correct = 0
|
||||
for subject_conf in self.assertion.subject.subject_confirmation:
|
||||
for subject_conf in subject_confirmation:
|
||||
if subject_conf.subject_confirmation_data is None:
|
||||
correct += 1 # In reality undefined
|
||||
elif subject_conf.subject_confirmation_data.address:
|
||||
if subject_conf.subject_confirmation_data.address == address:
|
||||
if address == '0.0.0.0': # accept anything
|
||||
correct += 1
|
||||
elif subject_conf.subject_confirmation_data.address == address:
|
||||
correct += 1
|
||||
else:
|
||||
correct += 1
|
||||
@@ -1098,10 +1143,12 @@ class AuthnQueryResponse(AuthnResponse):
|
||||
msgtype = "authn_query_response"
|
||||
|
||||
def __init__(self, sec_context, attribute_converters, entity_id,
|
||||
return_addrs=None, timeslack=0, asynchop=False, test=False):
|
||||
return_addrs=None, timeslack=0, asynchop=False, test=False,
|
||||
conv_info=None):
|
||||
AuthnResponse.__init__(self, sec_context, attribute_converters,
|
||||
entity_id, return_addrs, timeslack=timeslack,
|
||||
asynchop=asynchop, test=test)
|
||||
asynchop=asynchop, test=test,
|
||||
conv_info=conv_info)
|
||||
self.entity_id = entity_id
|
||||
self.attribute_converters = attribute_converters
|
||||
self.assertion = None
|
||||
@@ -1115,10 +1162,12 @@ class AttributeResponse(AuthnResponse):
|
||||
msgtype = "attribute_response"
|
||||
|
||||
def __init__(self, sec_context, attribute_converters, entity_id,
|
||||
return_addrs=None, timeslack=0, asynchop=False, test=False):
|
||||
return_addrs=None, timeslack=0, asynchop=False, test=False,
|
||||
conv_info=None):
|
||||
AuthnResponse.__init__(self, sec_context, attribute_converters,
|
||||
entity_id, return_addrs, timeslack=timeslack,
|
||||
asynchop=asynchop, test=test)
|
||||
asynchop=asynchop, test=test,
|
||||
conv_info=conv_info)
|
||||
self.entity_id = entity_id
|
||||
self.attribute_converters = attribute_converters
|
||||
self.assertion = None
|
||||
@@ -1131,10 +1180,11 @@ class AuthzResponse(AuthnResponse):
|
||||
msgtype = "authz_decision_response"
|
||||
|
||||
def __init__(self, sec_context, attribute_converters, entity_id,
|
||||
return_addrs=None, timeslack=0, asynchop=False):
|
||||
return_addrs=None, timeslack=0, asynchop=False,
|
||||
conv_info=None):
|
||||
AuthnResponse.__init__(self, sec_context, attribute_converters,
|
||||
entity_id, return_addrs, timeslack=timeslack,
|
||||
asynchop=asynchop)
|
||||
asynchop=asynchop, conv_info=conv_info)
|
||||
self.entity_id = entity_id
|
||||
self.attribute_converters = attribute_converters
|
||||
self.assertion = None
|
||||
@@ -1145,10 +1195,12 @@ class ArtifactResponse(AuthnResponse):
|
||||
msgtype = "artifact_response"
|
||||
|
||||
def __init__(self, sec_context, attribute_converters, entity_id,
|
||||
return_addrs=None, timeslack=0, asynchop=False, test=False):
|
||||
return_addrs=None, timeslack=0, asynchop=False, test=False,
|
||||
conv_info=None):
|
||||
AuthnResponse.__init__(self, sec_context, attribute_converters,
|
||||
entity_id, return_addrs, timeslack=timeslack,
|
||||
asynchop=asynchop, test=test)
|
||||
asynchop=asynchop, test=test,
|
||||
conv_info=conv_info)
|
||||
self.entity_id = entity_id
|
||||
self.attribute_converters = attribute_converters
|
||||
self.assertion = None
|
||||
@@ -1158,7 +1210,7 @@ class ArtifactResponse(AuthnResponse):
|
||||
def response_factory(xmlstr, conf, return_addrs=None, outstanding_queries=None,
|
||||
timeslack=0, decode=True, request_id=0, origxml=None,
|
||||
asynchop=True, allow_unsolicited=False,
|
||||
want_assertions_signed=False):
|
||||
want_assertions_signed=False, conv_info=None):
|
||||
sec_context = security_context(conf)
|
||||
if not timeslack:
|
||||
try:
|
||||
@@ -1171,23 +1223,23 @@ def response_factory(xmlstr, conf, return_addrs=None, outstanding_queries=None,
|
||||
extension_schema = conf.extension_schema
|
||||
|
||||
response = StatusResponse(sec_context, return_addrs, timeslack, request_id,
|
||||
asynchop)
|
||||
asynchop, conv_info=conv_info)
|
||||
try:
|
||||
response.loads(xmlstr, decode, origxml)
|
||||
if response.response.assertion or response.response.encrypted_assertion:
|
||||
authnresp = AuthnResponse(sec_context, attribute_converters,
|
||||
entity_id, return_addrs,
|
||||
outstanding_queries, timeslack, asynchop,
|
||||
allow_unsolicited,
|
||||
extension_schema=extension_schema,
|
||||
want_assertions_signed=want_assertions_signed)
|
||||
authnresp = AuthnResponse(
|
||||
sec_context, attribute_converters, entity_id, return_addrs,
|
||||
outstanding_queries, timeslack, asynchop, allow_unsolicited,
|
||||
extension_schema=extension_schema,
|
||||
want_assertions_signed=want_assertions_signed,
|
||||
conv_info=conv_info)
|
||||
authnresp.update(response)
|
||||
return authnresp
|
||||
except TypeError:
|
||||
response.signature_check = sec_context.correctly_signed_logout_response
|
||||
response.loads(xmlstr, decode, origxml)
|
||||
logoutresp = LogoutResponse(sec_context, return_addrs, timeslack,
|
||||
asynchop=asynchop)
|
||||
asynchop=asynchop, conv_info=conv_info)
|
||||
logoutresp.update(response)
|
||||
return logoutresp
|
||||
|
||||
|
@@ -18,7 +18,7 @@ from saml2 import saml
|
||||
from saml2 import element_to_extension_element
|
||||
from saml2 import class_name
|
||||
from saml2 import BINDING_HTTP_REDIRECT
|
||||
from saml2.argtree import add_path
|
||||
from saml2.argtree import add_path, is_set
|
||||
|
||||
from saml2.entity import Entity
|
||||
from saml2.eptid import Eptid
|
||||
@@ -289,6 +289,41 @@ class Server(Entity):
|
||||
return self._parse_request(xml_string, NameIDMappingRequest,
|
||||
"name_id_mapping_service", binding)
|
||||
|
||||
@staticmethod
|
||||
def update_farg(in_response_to, consumer_url, farg=None):
|
||||
if not farg:
|
||||
farg = add_path(
|
||||
{},
|
||||
['assertion', 'subject', 'subject_confirmation', 'method',
|
||||
saml.SCM_BEARER])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'in_response_to', in_response_to])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'recipient', consumer_url])
|
||||
else:
|
||||
if not is_set(farg,
|
||||
['assertion', 'subject', 'subject_confirmation',
|
||||
'method']):
|
||||
add_path(farg,
|
||||
['assertion', 'subject', 'subject_confirmation',
|
||||
'method', saml.SCM_BEARER])
|
||||
if not is_set(farg,
|
||||
['assertion', 'subject', 'subject_confirmation',
|
||||
'subject_confirmation_data', 'in_response_to']):
|
||||
add_path(farg,
|
||||
['assertion', 'subject', 'subject_confirmation',
|
||||
'subject_confirmation_data', 'in_response_to',
|
||||
in_response_to])
|
||||
if not is_set(farg, ['assertion', 'subject', 'subject_confirmation',
|
||||
'subject_confirmation_data', 'recipient']):
|
||||
add_path(farg,
|
||||
['assertion', 'subject', 'subject_confirmation',
|
||||
'subject_confirmation_data', 'recipient',
|
||||
consumer_url])
|
||||
return farg
|
||||
|
||||
def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url,
|
||||
name_id, policy, _issuer, authn_statement, identity,
|
||||
best_effort, sign_response, farg=None, **kwargs):
|
||||
@@ -323,17 +358,7 @@ class Server(Entity):
|
||||
return self.create_error_response(in_response_to, consumer_url,
|
||||
exc, sign_response)
|
||||
|
||||
if not farg:
|
||||
farg = add_path(
|
||||
{},
|
||||
['assertion', 'subject', 'subject_confirmation', 'method',
|
||||
saml.SCM_BEARER])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'in_response_to', in_response_to])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'recipient', consumer_url])
|
||||
farg = self.update_farg(in_response_to, consumer_url, farg)
|
||||
|
||||
if authn: # expected to be a dictionary
|
||||
# Would like to use dict comprehension but ...
|
||||
@@ -369,7 +394,7 @@ class Server(Entity):
|
||||
encrypt_assertion_self_contained=False,
|
||||
encrypted_advice_attributes=False,
|
||||
pefim=False, sign_alg=None, digest_alg=None,
|
||||
assertion_args=None):
|
||||
farg=None):
|
||||
""" Create a response. A layer of indirection.
|
||||
|
||||
:param in_response_to: The session identifier of the request
|
||||
@@ -401,11 +426,11 @@ class Server(Entity):
|
||||
:param sign_assertion: True if assertions should be signed.
|
||||
:param pefim: True if a response according to the PEFIM profile
|
||||
should be created.
|
||||
:param assertion_args: Argument to pass on to the assertion constructor
|
||||
:param farg: Argument to pass on to the assertion constructor
|
||||
:return: A response instance
|
||||
"""
|
||||
|
||||
if assertion_args is None:
|
||||
if farg is None:
|
||||
assertion_args = {}
|
||||
|
||||
args = {}
|
||||
@@ -421,23 +446,16 @@ class Server(Entity):
|
||||
# tmp_authn_statement = authn_statement
|
||||
# authn_statement = None
|
||||
|
||||
try:
|
||||
ass_in_response_to = assertion_args['in_response_to']
|
||||
except KeyError:
|
||||
ass_in_response_to = in_response_to
|
||||
else:
|
||||
del assertion_args['in_response_to']
|
||||
|
||||
if pefim:
|
||||
encrypted_advice_attributes = True
|
||||
encrypt_assertion_self_contained = True
|
||||
assertion_attributes = self.setup_assertion(
|
||||
None, sp_entity_id, None, None, None, policy, None, None,
|
||||
identity, best_effort, sign_response, farg=assertion_args)
|
||||
identity, best_effort, sign_response, farg=farg)
|
||||
assertion = self.setup_assertion(
|
||||
authn, sp_entity_id, ass_in_response_to, consumer_url, name_id,
|
||||
authn, sp_entity_id, in_response_to, consumer_url, name_id,
|
||||
policy, _issuer, authn_statement, [], True, sign_response,
|
||||
farg=assertion_args)
|
||||
farg=farg)
|
||||
assertion.advice = saml.Advice()
|
||||
|
||||
# assertion.advice.assertion_id_ref.append(saml.AssertionIDRef())
|
||||
@@ -445,9 +463,9 @@ class Server(Entity):
|
||||
assertion.advice.assertion.append(assertion_attributes)
|
||||
else:
|
||||
assertion = self.setup_assertion(
|
||||
authn, sp_entity_id, ass_in_response_to, consumer_url, name_id,
|
||||
authn, sp_entity_id, in_response_to, consumer_url, name_id,
|
||||
policy, _issuer, authn_statement, identity, True,
|
||||
sign_response, farg=assertion_args)
|
||||
sign_response, farg=farg)
|
||||
|
||||
to_sign = []
|
||||
if not encrypt_assertion:
|
||||
@@ -514,18 +532,7 @@ class Server(Entity):
|
||||
to_sign = []
|
||||
|
||||
if identity:
|
||||
if not farg:
|
||||
farg = add_path(
|
||||
{},
|
||||
['assertion', 'subject', 'subject_confirmation', 'method',
|
||||
saml.SCM_BEARER])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'in_response_to',
|
||||
in_response_to])
|
||||
add_path(
|
||||
farg['assertion']['subject']['subject_confirmation'],
|
||||
['subject_confirmation_data', 'recipient', destination])
|
||||
farg = self.update_farg(in_response_to, destination, farg=farg)
|
||||
|
||||
_issuer = self._issuer(issuer)
|
||||
ast = Assertion(identity)
|
||||
@@ -656,7 +663,7 @@ class Server(Entity):
|
||||
else:
|
||||
args['name_id'] = kwargs['name_id']
|
||||
|
||||
for param in ['status', 'assertion_args']:
|
||||
for param in ['status', 'farg']:
|
||||
try:
|
||||
args[param] = kwargs[param]
|
||||
except KeyError:
|
||||
@@ -714,7 +721,8 @@ class Server(Entity):
|
||||
encrypt_cert_advice=encrypt_cert_advice,
|
||||
encrypt_cert_assertion=encrypt_cert_assertion,
|
||||
encrypt_assertion=encrypt_assertion,
|
||||
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
|
||||
encrypt_assertion_self_contained
|
||||
=encrypt_assertion_self_contained,
|
||||
encrypted_advice_attributes=encrypted_advice_attributes,
|
||||
pefim=pefim, **kwargs)
|
||||
except IOError as exc:
|
||||
|
@@ -1,7 +1,7 @@
|
||||
from saml2 import saml
|
||||
from saml2.saml import Subject
|
||||
from saml2.samlp import Response
|
||||
from saml2.argtree import set_arg, add_path
|
||||
from saml2.argtree import set_arg, add_path, is_set
|
||||
from saml2.argtree import find_paths
|
||||
|
||||
__author__ = 'roland'
|
||||
@@ -32,13 +32,22 @@ def test_set_arg():
|
||||
def test_multi():
|
||||
t = {}
|
||||
t = add_path(t, ['subject_confirmation','method',saml.SCM_BEARER])
|
||||
x = add_path(
|
||||
t['subject_confirmation'],
|
||||
['subject_confirmation_data','in_response_to','1234'])
|
||||
add_path(t['subject_confirmation'],
|
||||
['subject_confirmation_data','in_response_to','1234'])
|
||||
|
||||
print(t)
|
||||
assert t == {
|
||||
'subject_confirmation': {
|
||||
'subject_confirmation_data': {'in_response_to': '1234'},
|
||||
'method': 'urn:oasis:names:tc:SAML:2.0:cm:bearer'}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def test_is_set():
|
||||
t = {}
|
||||
t = add_path(t, ['subject_confirmation','method',saml.SCM_BEARER])
|
||||
add_path(t['subject_confirmation'],
|
||||
['subject_confirmation_data','in_response_to','1234'])
|
||||
|
||||
assert is_set(t, ['subject_confirmation','method'])
|
||||
assert is_set(t, ['subject_confirmation', 'subject_confirmation_data',
|
||||
'receiver']) is False
|
||||
|
@@ -2,8 +2,9 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import datetime
|
||||
import re
|
||||
from six.moves.urllib.parse import quote_plus
|
||||
#from future.backports.urllib.parse import quote_plus
|
||||
|
||||
from future.backports.urllib.parse import quote_plus
|
||||
|
||||
from saml2.config import Config
|
||||
from saml2.mdstore import MetadataStore
|
||||
from saml2.mdstore import MetaDataMDX
|
||||
@@ -447,6 +448,7 @@ def test_get_certs_from_metadata_without_keydescriptor():
|
||||
|
||||
assert len(certs) == 0
|
||||
|
||||
|
||||
def test_metadata_extension_algsupport():
|
||||
mds = MetadataStore(ATTRCONV, None)
|
||||
mds.imp(METADATACONF["12"])
|
||||
|
@@ -1,4 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
import pytest
|
||||
|
||||
__author__ = 'rolandh'
|
||||
|
||||
@@ -11,6 +12,7 @@ from pytest import raises
|
||||
SESSION_INFO_PATTERN = {"ava":{}, "came from":"", "not_on_or_after":0,
|
||||
"issuer":"", "session_id":-1}
|
||||
|
||||
@pytest.mark.mongo
|
||||
class TestMongoDBCache():
|
||||
def setup_class(self):
|
||||
try:
|
||||
|
@@ -1,5 +1,6 @@
|
||||
from contextlib import closing
|
||||
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
|
||||
import pytest
|
||||
from saml2 import BINDING_HTTP_POST
|
||||
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
|
||||
from saml2.client import Saml2Client
|
||||
@@ -19,6 +20,7 @@ def _eq(l1, l2):
|
||||
return set(l1) == set(l2)
|
||||
|
||||
|
||||
@pytest.mark.mongo
|
||||
def test_flow():
|
||||
sp = Saml2Client(config_file="servera_conf")
|
||||
try:
|
||||
@@ -63,6 +65,7 @@ def test_flow():
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.mongo
|
||||
def test_eptid_mongo_db():
|
||||
try:
|
||||
edb = EptidMDB("secret", "idp")
|
||||
|
Reference in New Issue
Block a user