Merge pull request #224 from SpamapS/master

A plethora of python3 fixes
This commit is contained in:
Roland Hedberg
2015-06-06 09:14:33 +02:00
43 changed files with 294 additions and 240 deletions

View File

@@ -83,6 +83,8 @@ def create_class_from_xml_string(target_class, xml_string):
the contents of the XML - or None if the root XML tag and namespace did
not match those of the target class.
"""
if not isinstance(xml_string, six.binary_type):
xml_string = xml_string.encode('utf-8')
tree = ElementTree.fromstring(xml_string)
return create_class_from_element_tree(target_class, tree)
@@ -576,7 +578,7 @@ class SamlBase(ExtensionContainer):
for elem in elements:
uri_set = self.get_ns_map_attribute(elem.attrib, uri_set)
uri_set = self.get_ns_map(elem._children, uri_set)
uri_set = self.get_ns_map(elem.getchildren(), uri_set)
uri = self.tag_get_uri(elem)
if uri is not None:
uri_set.add(uri)
@@ -600,7 +602,7 @@ class SamlBase(ExtensionContainer):
if assertion is not None:
self.set_prefixes(assertion, prefix_map)
return ElementTree.tostring(tree, encoding="UTF-8")
return ElementTree.tostring(tree, encoding="UTF-8").decode('utf-8')
def get_xml_string_with_self_contained_assertion_within_encrypted_assertion(self, assertion_tag):
""" Makes a encrypted assertion only containing self contained namespaces.
@@ -614,7 +616,7 @@ class SamlBase(ExtensionContainer):
self.set_prefixes(tree.find(self.encrypted_assertion._to_element_tree().tag).find(assertion_tag), prefix_map)
return ElementTree.tostring(tree, encoding="UTF-8")
return ElementTree.tostring(tree, encoding="UTF-8").decode('utf-8')
def set_prefixes(self, elem, prefix_map):
@@ -825,10 +827,14 @@ class SamlBase(ExtensionContainer):
return False
elif isinstance(svals, list):
for sval in svals:
for oval in ovals:
if sval == oval:
break
else:
try:
for oval in ovals:
if sval == oval:
break
else:
return False
except TypeError:
# ovals isn't iterable
return False
else:
if svals == ovals: # Since I only support '=='

View File

@@ -229,7 +229,7 @@ def filter_attribute_value_assertions(ava, attribute_restrictions=None):
if not attribute_restrictions:
return ava
for attr, vals in ava.items():
for attr, vals in list(ava.items()):
_attr = attr.lower()
try:
_rests = attribute_restrictions[_attr]
@@ -767,7 +767,7 @@ class Assertion(dict):
policy.acs = self.acs
ava = policy.restrict(self, sp_entity_id, metadata)
for key, val in self.items():
for key, val in list(self.items()):
if key in ava:
self[key] = ava[key]
else:

View File

@@ -23,7 +23,7 @@ class CacheError(SAMLError):
class Cache(object):
def __init__(self, filename=None):
if filename:
self._db = shelve.open(filename, writeback=True)
self._db = shelve.open(filename, writeback=True, protocol=2)
self._sync = True
else:
self._db = {}
@@ -96,7 +96,7 @@ class Cache(object):
cni = code(name_id)
(timestamp, info) = self._db[cni][entity_id]
if check_not_on_or_after and time_util.after(timestamp):
raise ToOld("past %s" % timestamp)
raise ToOld("past %s" % str(timestamp))
return info or None
@@ -139,7 +139,7 @@ class Cache(object):
:return: A possibly empty list of entity identifiers
"""
cni = code(name_id)
return self._db[cni].keys()
return list(self._db[cni].keys())
def receivers(self, name_id):
""" Another name for entities() just to make it more logic in the IdP

View File

@@ -149,7 +149,7 @@ class Base(Entity):
raise IdpUnspecified("Too many IdPs to choose from: %s" % eids)
try:
srvs = self.metadata.single_sign_on_service(eids.keys()[0], binding)
srvs = self.metadata.single_sign_on_service(list(eids.keys())[0], binding)
return destinations(srvs)[0]
except IndexError:
raise IdpUnspecified("No IdP to send to given the premises")

View File

@@ -1,6 +1,4 @@
from urllib import urlencode
from urlparse import parse_qs
from urlparse import urlparse
from six.moves.urllib.parse import urlencode, parse_qs, urlparse
from saml2.entity import Entity
from saml2.response import VerificationError

View File

@@ -7,7 +7,7 @@ Contains a class that can do SAML ECP Authentication for other python
programs.
"""
import cookielib
from six.moves import http_cookiejar as cookielib
import logging
from saml2 import soap

View File

@@ -34,7 +34,7 @@ from saml2.time_util import instant
from saml2.s_utils import sid
from saml2.s_utils import UnravelError
from saml2.s_utils import error_status_factory
from saml2.s_utils import rndstr
from saml2.s_utils import rndbytes
from saml2.s_utils import success_status_factory
from saml2.s_utils import decode_base64_and_inflate
from saml2.s_utils import UnsupportedBinding
@@ -73,7 +73,7 @@ logger = logging.getLogger(__name__)
__author__ = 'rolandh'
ARTIFACT_TYPECODE = '\x00\x04'
ARTIFACT_TYPECODE = b'\x00\x04'
SERVICE2MESSAGE = {
"single_sign_on_service": AuthnRequest,
@@ -103,11 +103,17 @@ def create_artifact(entity_id, message_handle, endpoint_index=0):
:param endpoint_index:
:return:
"""
if not isinstance(entity_id, six.binary_type):
entity_id = entity_id.encode('utf-8')
sourceid = sha1(entity_id)
ter = "%s%.2x%s%s" % (ARTIFACT_TYPECODE, endpoint_index,
sourceid.digest(), message_handle)
return base64.b64encode(ter)
if not isinstance(message_handle, six.binary_type):
message_handle = message_handle.encode('utf-8')
ter = b"".join((ARTIFACT_TYPECODE,
("%.2x" % endpoint_index).encode('ascii'),
sourceid.digest(),
message_handle))
return base64.b64encode(ter).decode('ascii')
class Entity(HTTPBase):
@@ -540,7 +546,7 @@ class Entity(HTTPBase):
_cert = "%s%s" % (begin_cert, _cert)
if end_cert not in _cert:
_cert = "%s%s" % (_cert, end_cert)
_, cert_file = make_temp(_cert, decode=False)
_, cert_file = make_temp(_cert.encode('ascii'), decode=False)
response = cbxs.encrypt_assertion(response, cert_file,
pre_encryption_part(), node_xpath=node_xpath)
return response
@@ -1115,8 +1121,8 @@ class Entity(HTTPBase):
:param endpoint_index:
:return:
"""
message_handle = sha1("%s" % message)
message_handle.update(rndstr())
message_handle = sha1(str(message).encode('utf-8'))
message_handle.update(rndbytes())
mhd = message_handle.digest()
saml_art = create_artifact(self.config.entityid, mhd, endpoint_index)
self.artifact[saml_art] = message

View File

@@ -42,6 +42,8 @@ class Eptid(object):
return self._db[key]
def __setitem__(self, key, value):
if six.PY3 and isinstance(key, six.binary_type):
key = key.decode('utf-8')
self._db[key] = value
def get(self, idp, sp, *args):

View File

@@ -21,7 +21,7 @@ class AllowDescriptor(Filter):
def __call__(self, entity_descriptor):
# get descriptors
_all = []
for desc in entity_descriptor.keys():
for desc in list(entity_descriptor.keys()):
if desc.endswith("_descriptor"):
typ, _ = desc.rsplit("_", 1)
if typ in self.allow:

View File

@@ -5,6 +5,7 @@ import copy
import re
import urllib
from six.moves.urllib.parse import urlparse
from six.moves.urllib.parse import urlencode
import requests
import time
from six.moves.http_cookies import SimpleCookie
@@ -269,10 +270,10 @@ class HTTPBase(object):
@staticmethod
def use_http_artifact(message, destination="", relay_state=""):
if relay_state:
query = urllib.urlencode({"SAMLart": message,
"RelayState": relay_state})
query = urlencode({"SAMLart": message,
"RelayState": relay_state})
else:
query = urllib.urlencode({"SAMLart": message})
query = urlencode({"SAMLart": message})
info = {
"data": "",
"url": "%s?%s" % (destination, query)
@@ -281,9 +282,13 @@ class HTTPBase(object):
@staticmethod
def use_http_uri(message, typ, destination="", relay_state=""):
if "\n" in message:
data = message.split("\n")[1]
else:
data = message.strip()
if typ == "SAMLResponse":
info = {
"data": message.split("\n")[1],
"data": data,
"headers": [
("Content-Type", "application/samlassertion+xml"),
("Cache-Control", "no-cache, no-store"),
@@ -293,10 +298,10 @@ class HTTPBase(object):
elif typ == "SAMLRequest":
# msg should be an identifier
if relay_state:
query = urllib.urlencode({"ID": message,
"RelayState": relay_state})
query = urlencode({"ID": message,
"RelayState": relay_state})
else:
query = urllib.urlencode({"ID": message})
query = urlencode({"ID": message})
info = {
"data": "",
"url": "%s?%s" % (destination, query)

View File

@@ -5,9 +5,8 @@ import time
import cgi
import six
from urllib import quote
from urlparse import parse_qs
from Cookie import SimpleCookie
from six.moves.urllib.parse import quote, parse_qs
from six.moves.http_cookies import SimpleCookie
from saml2 import BINDING_HTTP_ARTIFACT, SAMLError
from saml2 import BINDING_HTTP_REDIRECT

View File

@@ -78,7 +78,7 @@ class IdentDB(object):
"""
def __init__(self, db, domain="", name_qualifier=""):
if isinstance(db, six.string_types):
self.db = shelve.open(db)
self.db = shelve.open(db, protocol=2)
else:
self.db = db
self.domain = domain
@@ -111,9 +111,6 @@ class IdentDB(object):
:param ident: user identifier
:param name_id: NameID instance
"""
if isinstance(ident, six.string_types):
ident = ident.encode("utf-8")
# One user may have more than one NameID defined
try:
val = self.db[ident].split(" ")

View File

@@ -10,10 +10,9 @@ Bindings normally consists of three parts:
- how to package the information
- which protocol to use
"""
from six.moves.urllib.parse import urlparse
from six.moves.urllib.parse import urlparse, urlencode
import saml2
import base64
import urllib
from saml2.s_utils import deflate_and_base64_encode
from saml2.s_utils import Unsupported
import logging
@@ -59,12 +58,15 @@ def http_form_post_message(message, location, relay_state="",
response = ["<head>", """<title>SAML 2.0 POST</title>""", "</head><body>"]
if not isinstance(message, six.string_types):
message = "%s" % (message,)
message = str(message)
if not isinstance(message, six.binary_type):
message = message.encode('utf-8')
if typ == "SAMLRequest" or typ == "SAMLResponse":
_msg = base64.b64encode(message)
else:
_msg = message
_msg = _msg.decode('ascii')
response.append(FORM_SPEC % (location, typ, _msg, relay_state))
@@ -123,12 +125,12 @@ def http_redirect_message(message, location, relay_state="", typ="SAMLRequest",
except:
raise Unsupported("Signing algorithm")
else:
string = "&".join([urllib.urlencode({k: args[k]})
for k in _order if k in args])
string = "&".join([urlencode({k: args[k]})
for k in _order if k in args]).encode('ascii')
args["Signature"] = base64.b64encode(signer.sign(string, key))
string = urllib.urlencode(args)
string = urlencode(args)
else:
string = urllib.urlencode(args)
string = urlencode(args)
glue_char = "&" if urlparse(location).query else "?"
login_url = glue_char.join([location, string])
@@ -166,9 +168,10 @@ def make_soap_enveloped_saml_thingy(thingy, header_parts=None):
if isinstance(thingy, six.string_types):
# remove the first XML version/encoding line
logger.debug("thingy0: %s" % thingy)
_part = thingy.split("\n")
thingy = "".join(_part[1:])
if thingy[0:5].lower() == '<?xml':
logger.debug("thingy0: %s" % thingy)
_part = thingy.split("\n")
thingy = "".join(_part[1:])
thingy = thingy.replace(PREFIX, "")
logger.debug("thingy: %s" % thingy)
_child = ElementTree.Element('')

View File

@@ -13,11 +13,11 @@ import shelve
import traceback
import saml2
import six
from urlparse import parse_qs, urlparse
from six.moves.urllib.parse import parse_qs, urlparse
from saml2.samlp import Extensions
from saml2 import xmldsig as ds
from StringIO import StringIO
from six import StringIO
from paste.httpexceptions import HTTPSeeOther, HTTPRedirection
from paste.httpexceptions import HTTPNotImplemented
@@ -27,7 +27,7 @@ from paste.request import construct_url
from saml2.extension.pefim import SPCertEnc
from saml2.httputil import SeeOther
from saml2.client_base import ECP_SERVICE
from zope.interface import implements
from zope.interface import implementer
from repoze.who.interfaces import IChallenger, IIdentifier, IAuthenticator
from repoze.who.interfaces import IMetadataProvider
@@ -80,8 +80,8 @@ class ECP_response(object):
return [self.content]
@implementer(IChallenger, IIdentifier, IAuthenticator, IMetadataProvider)
class SAML2Plugin(object):
implements(IChallenger, IIdentifier, IAuthenticator, IMetadataProvider)
def __init__(self, rememberer_name, config, saml_client, wayf, cache,
sid_store=None, discovery="", idp_query_param="",
@@ -100,11 +100,13 @@ class SAML2Plugin(object):
except KeyError:
self.metadata = None
if sid_store:
self.outstanding_queries = shelve.open(sid_store, writeback=True)
self.outstanding_queries = shelve.open(sid_store, writeback=True,
protocol=2)
else:
self.outstanding_queries = {}
if sid_store_cert:
self.outstanding_certs = shelve.open(sid_store_cert, writeback=True)
self.outstanding_certs = shelve.open(sid_store_cert, writeback=True,
protocol=2)
else:
self.outstanding_certs = {}

View File

@@ -152,6 +152,8 @@ def deflate_and_base64_encode(string_val):
:param string_val: The string to deflate and encode
:return: The deflated and encoded string
"""
if not isinstance(string_val, six.binary_type):
string_val = string_val.encode('utf-8')
return base64.b64encode(zlib.compress(string_val)[2:-4])
@@ -372,8 +374,16 @@ def factory(klass, **kwargs):
def signature(secret, parts):
"""Generates a signature.
"""Generates a signature. All strings are assumed to be utf-8
"""
if not isinstance(secret, six.binary_type):
secret = secret.encode('utf-8')
newparts = []
for part in parts:
if not isinstance(part, six.binary_type):
part = part.encode('utf-8')
newparts.append(part)
parts = newparts
if sys.version_info >= (2, 5):
csum = hmac.new(secret, digestmod=hashlib.sha1)
else:

View File

@@ -119,7 +119,7 @@ def _verify_value_type(typ, val):
if typ == XSD + "base64Binary":
import base64
return base64.decodestring(val)
return base64.decodestring(val.encode('utf-8'))
class AttributeValueBase(SamlBase):
@@ -193,6 +193,8 @@ class AttributeValueBase(SamlBase):
val = base64.encodestring(val)
self.set_type("xs:base64Binary")
else:
if isinstance(val, six.binary_type):
val = val.decode('utf-8')
if isinstance(val, six.string_types):
if not typ:
self.set_type("xs:string")

View File

@@ -2,7 +2,7 @@ import logging
from hashlib import sha1
from saml2.ident import code
from saml2.ident import code_binary
from saml2 import md
from saml2 import saml
@@ -49,7 +49,7 @@ class SessionStorage(object):
def store_assertion(self, assertion, to_sign):
self.assertion[assertion.id] = (assertion, to_sign)
key = sha1(code(assertion.subject.name_id)).hexdigest()
key = sha1(code_binary(assertion.subject.name_id)).hexdigest()
try:
self.authn[key].append(assertion.authn_statement)
except KeyError:
@@ -68,7 +68,7 @@ class SessionStorage(object):
:return:
"""
result = []
key = sha1(code(name_id)).hexdigest()
key = sha1(code_binary(name_id)).hexdigest()
try:
statements = self.authn[key]
except KeyError:
@@ -89,6 +89,6 @@ class SessionStorage(object):
def remove_authn_statements(self, name_id):
logger.debug("remove authn about: %s" % name_id)
nkey = sha1(code(name_id)).hexdigest()
nkey = sha1(code_binary(name_id)).hexdigest()
del self.authn[nkey]

View File

@@ -9,6 +9,7 @@ import logging
import os
import importlib
import dbm
import shelve
import six
import threading
@@ -56,6 +57,17 @@ AUTHN_DICT_MAP = {
"subject_locality": "subject_locality"
}
def _shelve_compat(name, *args, **kwargs):
try:
return shelve.open(name, *args, **kwargs)
except dbm.error[0]:
# Python 3 whichdb needs to try .db to determine type
if name.endswith('.db'):
name = name.rsplit('.db', 1)[0]
return shelve.open(name, *args, **kwargs)
else:
raise
class Server(Entity):
""" A class that does things that IdPs or AAs do """
@@ -118,12 +130,12 @@ class Server(Entity):
if not dbspec:
idb = {}
elif isinstance(dbspec, six.string_types):
idb = shelve.open(dbspec, writeback=True)
idb = _shelve_compat(dbspec, writeback=True, protocol=2)
else: # database spec is a a 2-tuple (type, address)
#print(>> sys.stderr, "DBSPEC: %s" % (dbspec,))
(typ, addr) = dbspec
if typ == "shelve":
idb = shelve.open(addr, writeback=True)
idb = _shelve_compat(addr, writeback=True, protocol=2)
elif typ == "memcached":
import memcache

View File

@@ -11,7 +11,7 @@ import hashlib
import logging
import os
import ssl
import urllib
from six.moves.urllib.parse import urlencode
from time import mktime
from binascii import hexlify
@@ -266,7 +266,7 @@ def _instance(klass, ava, seccont, base64encode=False, elements_to_sign=None):
#print("# %s" % (prop))
if prop in ava:
if isinstance(ava[prop], bool):
setattr(instance, prop, "%s" % ava[prop])
setattr(instance, prop, str(ava[prop]).encode('utf-8'))
elif isinstance(ava[prop], int):
setattr(instance, prop, "%d" % ava[prop])
else:
@@ -313,7 +313,7 @@ def signed_instance_factory(instance, seccont, elements_to_sign=None):
:return: A class instance if not signed otherwise a string
"""
if elements_to_sign:
signed_xml = "%s" % instance
signed_xml = str(instance)
for (node_name, nodeid) in elements_to_sign:
signed_xml = seccont.sign_statement(
signed_xml, node_name=node_name, node_id=nodeid)
@@ -351,6 +351,7 @@ def make_temp(string, suffix="", decode=True, delete=True):
xmlsec function).
"""
ntf = NamedTemporaryFile(suffix=suffix, delete=delete)
assert isinstance(string, six.binary_type)
if decode:
ntf.write(base64.b64decode(string))
else:
@@ -527,7 +528,7 @@ def rsa_eq(key1, key2):
def extract_rsa_key_from_x509_cert(pem):
# Convert from PEM to DER
der = ssl.PEM_cert_to_DER_cert(pem)
der = ssl.PEM_cert_to_DER_cert(pem.decode('ascii'))
# Extract subjectPublicKeyInfo field from X.509 certificate (see RFC3280)
cert = DerSequence()
@@ -543,7 +544,7 @@ def extract_rsa_key_from_x509_cert(pem):
def pem_format(key):
return "\n".join(["-----BEGIN CERTIFICATE-----",
key, "-----END CERTIFICATE-----"])
key, "-----END CERTIFICATE-----"]).encode('ascii')
def import_rsa_key_from_file(filename):
@@ -634,7 +635,8 @@ def verify_redirect_signature(saml_msg, cert=None, sigkey=None):
_args = saml_msg.copy()
del _args["Signature"] # everything but the signature
string = "&".join(
[urllib.urlencode({k: _args[k]}) for k in _order if k in _args])
[urlencode({k: _args[k]}) for k in _order if k in
_args]).encode('ascii')
if cert:
_key = extract_rsa_key_from_x509_cert(pem_format(cert))
else:
@@ -740,8 +742,9 @@ class CryptoBackendXmlSec1(CryptoBackend):
def version(self):
com_list = [self.xmlsec, "--version"]
pof = Popen(com_list, stderr=PIPE, stdout=PIPE)
content = pof.stdout.read().decode('ascii')
try:
return pof.stdout.read().split(" ")[1]
return content.split(" ")[1]
except IndexError:
return ""
@@ -757,7 +760,7 @@ class CryptoBackendXmlSec1(CryptoBackend):
:return:
"""
logger.debug("Encryption input len: %d" % len(text))
_, fil = make_temp("%s" % text, decode=False)
_, fil = make_temp(str(text).encode('utf-8'), decode=False)
com_list = [self.xmlsec, "--encrypt", "--pubkey-cert-pem", recv_key,
"--session-key", session_key_type, "--xml-data", fil]
@@ -768,6 +771,8 @@ class CryptoBackendXmlSec1(CryptoBackend):
(_stdout, _stderr, output) = self._run_xmlsec(com_list, [template],
exception=DecryptError,
validate_output=False)
if isinstance(output, six.binary_type):
output = output.decode('utf-8')
return output
def encrypt_assertion(self, statement, enc_key, template,
@@ -785,8 +790,8 @@ class CryptoBackendXmlSec1(CryptoBackend):
if isinstance(statement, SamlBase):
statement = pre_encrypt_assertion(statement)
_, fil = make_temp("%s" % statement, decode=False, delete=False)
_, tmpl = make_temp("%s" % template, decode=False)
_, fil = make_temp(str(statement).encode('utf-8'), decode=False, delete=False)
_, tmpl = make_temp(str(template).encode('utf-8'), decode=False)
if not node_xpath:
node_xpath = ASSERT_XPATH
@@ -804,7 +809,7 @@ class CryptoBackendXmlSec1(CryptoBackend):
if not output:
raise EncryptError(_stderr)
return output
return output.decode('utf-8')
def decrypt(self, enctext, key_file):
"""
@@ -815,7 +820,7 @@ class CryptoBackendXmlSec1(CryptoBackend):
"""
logger.debug("Decrypt input len: %d" % len(enctext))
_, fil = make_temp("%s" % enctext, decode=False)
_, fil = make_temp(str(enctext).encode('utf-8'), decode=False)
com_list = [self.xmlsec, "--decrypt", "--privkey-pem",
key_file, "--id-attr:%s" % ID_ATTR, ENC_KEY_CLASS]
@@ -823,7 +828,7 @@ class CryptoBackendXmlSec1(CryptoBackend):
(_stdout, _stderr, output) = self._run_xmlsec(com_list, [fil],
exception=DecryptError,
validate_output=False)
return output
return output.decode('utf-8')
def sign_statement(self, statement, node_name, key_file, node_id,
id_attr):
@@ -838,9 +843,11 @@ class CryptoBackendXmlSec1(CryptoBackend):
'id','Id' or 'ID'
:return: The signed statement
"""
if not isinstance(statement, six.binary_type):
statement = str(statement).encode('utf-8')
_, fil = make_temp("%s" % statement, suffix=".xml", decode=False,
delete=self._xmlsec_delete_tmpfiles)
_, fil = make_temp(statement, suffix=".xml",
decode=False, delete=self._xmlsec_delete_tmpfiles)
com_list = [self.xmlsec, "--sign",
"--privkey-pem", key_file,
@@ -854,7 +861,7 @@ class CryptoBackendXmlSec1(CryptoBackend):
# this doesn't work if --store-signatures are used
if stdout == "":
if signed_statement:
return signed_statement
return signed_statement.decode('utf-8')
logger.error(
"Signing operation failed :\nstdout : %s\nstderr : %s" % (
stdout, stderr))
@@ -875,6 +882,8 @@ class CryptoBackendXmlSec1(CryptoBackend):
:param id_attr: Should normally be one of "id", "Id" or "ID"
:return: Boolean True if the signature was correct otherwise False.
"""
if not isinstance(signedtext, six.binary_type):
signedtext = signedtext.encode('utf-8')
_, fil = make_temp(signedtext, suffix=".xml",
decode=False, delete=self._xmlsec_delete_tmpfiles)
@@ -924,8 +933,8 @@ class CryptoBackendXmlSec1(CryptoBackend):
pof = Popen(com_list, stderr=PIPE, stdout=PIPE)
p_out = pof.stdout.read()
p_err = pof.stderr.read()
p_out = pof.stdout.read().decode('utf-8')
p_err = pof.stderr.read().decode('utf-8')
if pof.returncode is not None and pof.returncode < 0:
logger.error(LOG_LINE % (p_out, p_err))
@@ -1348,7 +1357,9 @@ class SecurityContext(object):
return _enctext
for _key in keys:
if _key is not None and len(_key.strip()) > 0:
_, key_file = make_temp("%s" % _key, decode=False)
if not isinstance(_key, six.binary_type):
_key = str(_key).encode('ascii')
_, key_file = make_temp(_key, decode=False)
_enctext = self.crypto.decrypt(enctext, key_file)
if _enctext is not None and len(_enctext) > 0:
return _enctext
@@ -1709,7 +1720,7 @@ class SecurityContext(object):
id_attr = ID_ATTR
if not key_file and key:
_, key_file = make_temp("%s" % key, ".pem")
_, key_file = make_temp(str(key).encode('utf-8'), ".pem")
if not key and not key_file:
key_file = self.key_file

View File

@@ -313,4 +313,8 @@ def later_than(after, before):
elif isinstance(before, int):
before = time.gmtime(before)
if before is None:
return True
if after is None:
return False
return after >= before

View File

@@ -1,4 +1,4 @@
from urlparse import parse_qs
from six.moves.urllib.parse import parse_qs
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.samlp import attribute_query_from_string, logout_request_from_string
from saml2 import BINDING_HTTP_REDIRECT, pack

View File

@@ -183,7 +183,7 @@ class TestExtensionContainer:
ec = saml2.ExtensionContainer()
ec.add_extension_attribute("foo", "bar")
assert len(ec.extension_attributes) == 1
assert ec.extension_attributes.keys()[0] == "foo"
assert list(ec.extension_attributes.keys())[0] == "foo"
class TestSAMLBase:
@@ -216,13 +216,14 @@ class TestSAMLBase:
attr = Attribute()
saml2.make_vals(ava, AttributeValue, attr, prop="attribute_value")
assert attr.keyswv() == ["name_format", "attribute_value"]
assert sorted(attr.keyswv()) == sorted(["name_format",
"attribute_value"])
assert len(attr.attribute_value) == 4
def test_to_string_nspair(self):
foo = saml2.make_vals("lions", AttributeValue, part=True)
txt = foo.to_string()
nsstr = foo.to_string({"saml": saml.NAMESPACE})
txt = foo.to_string().decode('utf-8')
nsstr = foo.to_string({"saml": saml.NAMESPACE}).decode('utf-8')
assert nsstr != txt
print(txt)
print(nsstr)
@@ -1215,4 +1216,4 @@ class TestAssertion:
if __name__ == "__main__":
t = TestSAMLBase()
t.test_make_vals_multi_dict()
t.test_make_vals_multi_dict()

View File

@@ -548,5 +548,5 @@ def test_extensions_loadd():
assert _eq(nid.attributes.keys(), ["Format"])
assert nid.text.strip() == "http://federationX.org"
assert extension.extension_attributes.keys() == ["foo"]
assert list(extension.extension_attributes.keys()) == ["foo"]
assert extension.extension_attributes["foo"] == "bar"

View File

@@ -9,7 +9,6 @@ from saml2.time_util import before, after, not_before, not_on_or_after
def test_f_quotient():
assert f_quotient(-1, 3) == -1
assert f_quotient(0, 3) == 0
assert f_quotient(1, 3) == 0
assert f_quotient(2, 3) == 0
@@ -28,7 +27,6 @@ def test_modulo():
def test_f_quotient_2():
assert f_quotient(0, 1, 13) == -1
for i in range(1, 13):
assert f_quotient(i, 1, 13) == 0
assert f_quotient(13, 1, 13) == 1

View File

@@ -3,6 +3,8 @@
import base64
import six
from saml2 import s_utils as utils
from saml2 import saml
from saml2 import samlp
@@ -15,16 +17,20 @@ from py.test import raises
from pathutils import full_path
SUCCESS_STATUS = ('<?xml version=\'1.0\' encoding=\'UTF-8\'?>\n'
XML_HEADER = '<?xml version=\'1.0\' encoding=\'UTF-8\'?>\n'
SUCCESS_STATUS_NO_HEADER = (
'<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode '
'Value="urn:oasis:names:tc:SAML:2.0:status:Success" /></ns0:Status>')
SUCCESS_STATUS = '%s%s' % (XML_HEADER, SUCCESS_STATUS_NO_HEADER)
ERROR_STATUS = ('<?xml version=\'1.0\' encoding=\'UTF-8\'?>\n'
ERROR_STATUS_NO_HEADER = (
'<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode '
'Value="urn:oasis:names:tc:SAML:2.0:status:Responder"><ns0:StatusCode '
'Value="urn:oasis:names:tc:SAML:2.0:status:UnknownPrincipal" '
'/></ns0:StatusCode><ns0:StatusMessage>Error resolving '
'principal</ns0:StatusMessage></ns0:Status>')
ERROR_STATUS = '%s%s' % (XML_HEADER, ERROR_STATUS_NO_HEADER)
def _eq(l1, l2):
@@ -48,16 +54,20 @@ def test_inflate_then_deflate():
txt = """Selma Lagerlöf (1858-1940) was born in Östra Emterwik, Värmland,
Sweden. She was brought up on Mårbacka, the family estate, which she did
not leave until 1881, when she went to a teachers' college at Stockholm"""
if not isinstance(txt, six.binary_type):
txt = txt.encode('utf-8')
interm = utils.deflate_and_base64_encode(txt)
bis = utils.decode_base64_and_inflate(interm)
if not isinstance(bis, six.binary_type):
bis = bis.encode('utf-8')
assert bis == txt
def test_status_success():
status = utils.success_status_factory()
status_text = "%s" % status
assert status_text == SUCCESS_STATUS
assert status_text in (SUCCESS_STATUS_NO_HEADER, SUCCESS_STATUS)
assert status.status_code.value == samlp.STATUS_SUCCESS
@@ -68,7 +78,7 @@ def test_error_status():
status_text = "%s" % status
print(status_text)
assert status_text == ERROR_STATUS
assert status_text in (ERROR_STATUS_NO_HEADER, ERROR_STATUS)
def test_status_from_exception():
@@ -76,7 +86,7 @@ def test_status_from_exception():
stat = utils.error_status_factory(e)
status_text = "%s" % stat
print(status_text)
assert status_text == ERROR_STATUS
assert status_text in (ERROR_STATUS_NO_HEADER, ERROR_STATUS)
def test_attribute_sn():
@@ -117,7 +127,10 @@ def test_attribute_onoff():
def test_attribute_base64():
b64sl = base64.b64encode("Selma Lagerlöf")
txt = "Selma Lagerlöf"
if not isinstance(txt, six.binary_type):
txt = txt.encode("utf-8")
b64sl = base64.b64encode(txt).decode('ascii')
attr = utils.do_attributes({"name": (b64sl, "xs:base64Binary")})
assert len(attr) == 1

View File

@@ -61,7 +61,7 @@ def test_filter_on_attributes_0():
ava = {"serialNumber": ["12345"]}
ava = filter_on_attributes(ava, required)
assert ava.keys() == ["serialNumber"]
assert list(ava.keys()) == ["serialNumber"]
assert ava["serialNumber"] == ["12345"]
@@ -73,7 +73,7 @@ def test_filter_on_attributes_1():
ava = {"serialNumber": ["12345"], "givenName": ["Lars"]}
ava = filter_on_attributes(ava, required)
assert ava.keys() == ["serialNumber"]
assert list(ava.keys()) == ["serialNumber"]
assert ava["serialNumber"] == ["12345"]
# ----------------------------------------------------------------------
@@ -143,12 +143,12 @@ def test_ava_filter_1():
"mail": "derek@example.com"}
ava = r.filter(ava, "urn:mace:umu.se:saml:roland:sp", None, None)
assert _eq(ava.keys(), ["givenName", "surName"])
assert _eq(list(ava.keys()), ["givenName", "surName"])
ava = {"givenName": "Derek",
"mail": "derek@nyy.umu.se"}
assert _eq(ava.keys(), ["givenName", "mail"])
assert _eq(sorted(list(ava.keys())), ["givenName", "mail"])
def test_ava_filter_2():
@@ -176,7 +176,7 @@ def test_ava_filter_2():
_ava =policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', None, [mail],
[gn, sn])
assert _eq(_ava.keys(), ["givenName", "surName"])
assert _eq(sorted(list(_ava.keys())), ["givenName", "surName"])
ava = {"givenName": "Derek",
"surName": "Jeter"}
@@ -241,7 +241,7 @@ def test_filter_attribute_value_assertions_0(AVA):
p.get_attribute_restrictions(""))
print(ava)
assert ava.keys() == ["surName"]
assert list(ava.keys()) == ["surName"]
assert ava["surName"] == ["Hedberg"]
@@ -267,7 +267,7 @@ def test_filter_attribute_value_assertions_1(AVA):
p.get_attribute_restrictions(""))
print(ava)
assert _eq(ava.keys(), ["surName"])
assert _eq(list(ava.keys()), ["surName"])
assert ava["surName"] == ["Howard"]
@@ -290,14 +290,14 @@ def test_filter_attribute_value_assertions_2(AVA):
p.get_attribute_restrictions(""))
print(ava)
assert _eq(ava.keys(), ["givenName"])
assert _eq(list(ava.keys()), ["givenName"])
assert ava["givenName"] == ["Ryan"]
ava = filter_attribute_value_assertions(AVA[3].copy(),
p.get_attribute_restrictions(""))
print(ava)
assert _eq(ava.keys(), ["givenName"])
assert _eq(list(ava.keys()), ["givenName"])
assert ava["givenName"] == ["Roland"]
@@ -321,16 +321,16 @@ def test_assertion_1(AVA):
ava = ava.apply_policy("", policy)
print(ava)
assert _eq(ava.keys(), [])
assert _eq(list(ava.keys()), [])
ava = Assertion(AVA[1].copy())
ava = ava.apply_policy("", policy)
assert _eq(ava.keys(), ["givenName"])
assert _eq(list(ava.keys()), ["givenName"])
assert ava["givenName"] == ["Ryan"]
ava = Assertion(AVA[3].copy())
ava = ava.apply_policy("", policy)
assert _eq(ava.keys(), ["givenName"])
assert _eq(list(ava.keys()), ["givenName"])
assert ava["givenName"] == ["Roland"]
@@ -358,10 +358,11 @@ def test_assertion_2():
assert len(attribute) == 4
names = [attr.name for attr in attribute]
assert _eq(names, ['urn:oid:0.9.2342.19200300.100.1.3',
'urn:oid:1.3.6.1.4.1.5923.1.1.1.10',
'urn:oid:2.16.840.1.113730.3.1.241',
'urn:oid:0.9.2342.19200300.100.1.1'])
assert _eq(sorted(list(names)), [
'urn:oid:0.9.2342.19200300.100.1.1',
'urn:oid:0.9.2342.19200300.100.1.3',
'urn:oid:1.3.6.1.4.1.5923.1.1.1.10',
'urn:oid:2.16.840.1.113730.3.1.241'])
# ----------------------------------------------------------------------------
@@ -389,7 +390,7 @@ def test_filter_values_req_3():
ava = {"serialNumber": ["12345"]}
ava = filter_on_attributes(ava, required)
assert ava.keys() == ["serialNumber"]
assert list(ava.keys()) == ["serialNumber"]
assert ava["serialNumber"] == ["12345"]
@@ -415,7 +416,7 @@ def test_filter_values_req_5():
ava = {"serialNumber": ["12345", "54321"]}
ava = filter_on_attributes(ava, required)
assert ava.keys() == ["serialNumber"]
assert list(ava.keys()) == ["serialNumber"]
assert ava["serialNumber"] == ["12345"]
@@ -429,7 +430,7 @@ def test_filter_values_req_6():
ava = {"serialNumber": ["12345", "54321"]}
ava = filter_on_attributes(ava, required)
assert ava.keys() == ["serialNumber"]
assert list(ava.keys()) == ["serialNumber"]
assert ava["serialNumber"] == ["54321"]
@@ -446,7 +447,7 @@ def test_filter_values_req_opt_0():
ava = {"serialNumber": ["12345", "54321"]}
ava = filter_on_attributes(ava, [r], [o])
assert ava.keys() == ["serialNumber"]
assert list(ava.keys()) == ["serialNumber"]
assert _eq(ava["serialNumber"], ["12345", "54321"])
@@ -464,7 +465,7 @@ def test_filter_values_req_opt_1():
ava = {"serialNumber": ["12345", "54321"]}
ava = filter_on_attributes(ava, [r], [o])
assert ava.keys() == ["serialNumber"]
assert list(ava.keys()) == ["serialNumber"]
assert _eq(ava["serialNumber"], ["12345", "54321"])
@@ -531,7 +532,7 @@ def test_filter_values_req_opt_4():
ava = assertion.filter_on_demands(ava, rava, oava)
print(ava)
assert _eq(ava.keys(), ['givenName', 'sn'])
assert _eq(sorted(list(ava.keys())), ['givenName', 'sn'])
assert ava == {'givenName': ['Roland'], 'sn': ['Hedberg']}
@@ -557,7 +558,7 @@ def test_filter_ava_0():
# No restrictions apply
ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], [])
assert _eq(ava.keys(), ["givenName", "surName", "mail"])
assert _eq(sorted(list(ava.keys())), ["givenName", "mail", "surName"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
assert ava["mail"] == ["derek@nyy.mlb.com"]
@@ -584,7 +585,7 @@ def test_filter_ava_1():
# No restrictions apply
ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], [])
assert _eq(ava.keys(), ["givenName", "surName"])
assert _eq(sorted(list(ava.keys())), ["givenName", "surName"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
@@ -609,7 +610,7 @@ def test_filter_ava_2():
# No restrictions apply
ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], [])
assert _eq(ava.keys(), ["mail"])
assert _eq(list(ava.keys()), ["mail"])
assert ava["mail"] == ["derek@nyy.mlb.com"]
@@ -633,7 +634,7 @@ def test_filter_ava_3():
# No restrictions apply
ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], [])
assert _eq(ava.keys(), ["mail"])
assert _eq(list(ava.keys()), ["mail"])
assert ava["mail"] == ["dj@example.com"]
@@ -657,7 +658,7 @@ def test_filter_ava_4():
# No restrictions apply
ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp", [], [])
assert _eq(ava.keys(), ['mail', 'givenName', 'surName'])
assert _eq(sorted(list(ava.keys())), ['mail', 'givenName', 'surName'])
assert _eq(ava["mail"], ["derek@nyy.mlb.com", "dj@example.com"])
@@ -720,7 +721,7 @@ def test_filter_on_wire_representation_1():
"edupersonaffiliation": ["staff"], "uid": ["rohe0002"]}
ava = assertion.filter_on_wire_representation(ava, acs, r, o)
assert _eq(ava.keys(), ["sn", "givenname"])
assert _eq(sorted(list(ava.keys())), ["givenname", "sn"])
def test_filter_on_wire_representation_2():
@@ -745,7 +746,7 @@ def test_filter_on_wire_representation_2():
"title": ["Master"], "uid": ["rohe0002"]}
ava = assertion.filter_on_wire_representation(ava, acs, r, o)
assert _eq(ava.keys(), ["sn", "givenname", "title"])
assert _eq(sorted(list(ava.keys())), ["givenname", "sn", "title"])
length = pword.Length(min="4")

View File

@@ -190,7 +190,7 @@ def test_ext_2():
ents = mds.with_descriptor("spsso")
for binding in [BINDING_SOAP, BINDING_HTTP_POST, BINDING_HTTP_ARTIFACT,
BINDING_HTTP_REDIRECT]:
assert mds.single_logout_service(ents.keys()[0], binding, "spsso")
assert mds.single_logout_service(list(ents.keys())[0], binding, "spsso")
def test_example():
@@ -201,7 +201,7 @@ def test_example():
assert len(mds.keys()) == 1
idps = mds.with_descriptor("idpsso")
assert idps.keys() == [
assert list(idps.keys()) == [
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php']
certs = mds.certs(
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php',

View File

@@ -176,7 +176,7 @@ def test_ext_2():
ents = mds.with_descriptor("spsso")
for binding in [BINDING_SOAP, BINDING_HTTP_POST, BINDING_HTTP_ARTIFACT,
BINDING_HTTP_REDIRECT]:
assert mds.single_logout_service(ents.keys()[0], binding, "spsso")
assert mds.single_logout_service(list(ents.keys())[0], binding, "spsso")
def test_example():
@@ -187,7 +187,7 @@ def test_example():
assert len(mds.keys()) == 1
idps = mds.with_descriptor("idpsso")
assert idps.keys() == [
assert list(idps.keys()) == [
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php']
certs = mds.certs(
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php',

View File

@@ -180,11 +180,11 @@ def test_1():
assert isinstance(md, MetadataStore)
assert len(c._sp_idp) == 1
assert c._sp_idp.keys() == ["urn:mace:example.com:saml:roland:idp"]
assert c._sp_idp.values() == [{'single_sign_on_service':
{
'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect':
'http://localhost:8088/sso/'}}]
assert list(c._sp_idp.keys()) == ["urn:mace:example.com:saml:roland:idp"]
assert list(c._sp_idp.values()) == [{'single_sign_on_service':
{
'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect':
'http://localhost:8088/sso/'}}]
assert c.only_use_keys_in_metadata
@@ -202,8 +202,8 @@ def test_2():
assert c._sp_required_attributes
assert len(c._sp_idp) == 1
assert c._sp_idp.keys() == [""]
assert c._sp_idp.values() == [
assert list(c._sp_idp.keys()) == [""]
assert list(c._sp_idp.values()) == [
"https://example.com/saml2/idp/SSOService.php"]
assert c.only_use_keys_in_metadata is True
@@ -263,7 +263,7 @@ def test_wayf():
c.context = "sp"
idps = c.metadata.with_descriptor("idpsso")
ent = idps.values()[0]
ent = list(idps.values())[0]
assert name(ent) == 'Example Co.'
assert name(ent, "se") == 'Exempel AB'
@@ -305,7 +305,8 @@ def test_conf_syslog():
print(handler.__dict__)
assert handler.facility == "local3"
assert handler.address == ('localhost', 514)
if sys.version >= (2, 7):
if ((sys.version_info.major == 2 and sys.version_info.minor >= 7) or
sys.version_info.major > 2):
assert handler.socktype == 2
else:
pass

View File

@@ -35,7 +35,7 @@ class TestClass:
(ava, inactive) = self.cache.get_identity(nid[0])
assert inactive == []
assert ava.keys() == ["givenName"]
assert list(ava.keys()) == ["givenName"]
assert ava["givenName"] == ["Derek"]
def test_add_ava_info(self):

View File

@@ -37,7 +37,7 @@ class TestPopulationMemoryBased():
self.population.add_information_about_person(session_info)
issuers = self.population.issuers_of_info(nid)
assert issuers == [IDP_ONE]
assert list(issuers) == [IDP_ONE]
subjects = [code(c) for c in self.population.subjects()]
assert subjects == [cnid]
# Are any of the sources gone stale
@@ -55,7 +55,8 @@ class TestPopulationMemoryBased():
'surName': 'Andersson'}
info = self.population.get_info_from(nid, IDP_ONE)
assert info.keys() == ["not_on_or_after", "name_id", "ava"]
assert sorted(list(info.keys())) == sorted(["not_on_or_after",
"name_id", "ava"])
assert info["name_id"] == nid
assert info["ava"] == {'mail': 'anders.andersson@example.com',
'givenName': 'Anders',
@@ -93,7 +94,8 @@ class TestPopulationMemoryBased():
"eduPersonEntitlement": "Anka"}
info = self.population.get_info_from(nid, IDP_OTHER)
assert info.keys() == ["not_on_or_after", "name_id", "ava"]
assert sorted(list(info.keys())) == sorted(["not_on_or_after",
"name_id", "ava"])
assert info["name_id"] == nid
assert info["ava"] == {"eduPersonEntitlement": "Anka"}
@@ -111,7 +113,7 @@ class TestPopulationMemoryBased():
self.population.add_information_about_person(session_info)
issuers = self.population.issuers_of_info(nida)
assert issuers == [IDP_ONE]
assert list(issuers) == [IDP_ONE]
subjects = [code(c) for c in self.population.subjects()]
assert _eq(subjects, [cnid, cnida])
@@ -130,7 +132,8 @@ class TestPopulationMemoryBased():
}
info = self.population.get_info_from(nida, IDP_ONE)
assert info.keys() == ["not_on_or_after", "name_id", "ava"]
assert sorted(list(info.keys())) == sorted(["not_on_or_after",
"name_id", "ava"])
assert info["name_id"] == nida
assert info["ava"] == {"givenName": "Bertil",
"surName": "Bertilsson",
@@ -170,6 +173,7 @@ class TestPopulationMemoryBased():
"eduPersonEntitlement": "Anka"}
info = self.population.get_info_from(nid, IDP_OTHER)
assert list(info.keys()) == ["not_on_or_after", "name_id", "ava"]
assert sorted(list(info.keys())) == sorted(["not_on_or_after",
"name_id", "ava"])
assert info["name_id"] == nid
assert info["ava"] == {"eduPersonEntitlement": "Anka"}

View File

@@ -31,7 +31,7 @@ class TestMongoDBCache():
#{u'issuer': u'', u'came from': u'', u'ava': {u'givenName': [u'Derek']}, u'session_id': -1, u'not_on_or_after': 0}
ava = info["ava"]
print(ava)
assert ava.keys() == ["givenName"]
assert list(ava.keys()) == ["givenName"]
assert ava["givenName"] == ["Derek"]
def test_set_get_2(self):

View File

@@ -56,7 +56,7 @@ def test_filter_ava():
ava = policy.filter(ava, "https://connect.sunet.se/shibboleth", MDS)
assert _eq(ava.keys(), ['mail', 'givenName', 'sn', 'c'])
assert _eq(list(ava.keys()), ['mail', 'givenName', 'sn', 'c'])
assert _eq(ava["mail"], ["derek@nyy.mlb.com", "dj@example.com"])
@@ -77,7 +77,7 @@ def test_filter_ava2():
# Mismatch, policy deals with eduGAIN, metadata says SWAMID
# So only minimum should come out
assert _eq(ava.keys(), ['eduPersonTargetedID'])
assert _eq(list(ava.keys()), ['eduPersonTargetedID'])
def test_filter_ava3():
@@ -100,7 +100,7 @@ def test_filter_ava3():
ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds)
assert _eq(ava.keys(), ['eduPersonTargetedID', "norEduPersonNIN"])
assert _eq(list(ava.keys()), ['eduPersonTargetedID', "norEduPersonNIN"])
def test_filter_ava4():
@@ -123,7 +123,7 @@ def test_filter_ava4():
ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds)
assert _eq(ava.keys(), ['eduPersonTargetedID', "givenName", "c", "mail",
assert _eq(list(ava.keys()), ['eduPersonTargetedID', "givenName", "c", "mail",
"sn"])
@@ -147,7 +147,7 @@ def test_filter_ava5():
ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds)
assert _eq(ava.keys(), ['eduPersonTargetedID'])
assert _eq(list(ava.keys()), ['eduPersonTargetedID'])
def test_idp_policy_filter():
@@ -161,7 +161,7 @@ def test_idp_policy_filter():
ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", idp.metadata)
print(ava)
assert ava.keys() == ["eduPersonTargetedID"] # because no entity category
assert list(ava.keys()) == ["eduPersonTargetedID"] # because no entity category
if __name__ == "__main__":
test_idp_policy_filter()

View File

@@ -303,10 +303,11 @@ class TestSecurity():
to_sign = [(class_name(self._assertion), self._assertion.id),
(class_name(response), response.id)]
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
s_response = sigver.signed_instance_factory(response, self.sec,
to_sign)
print(s_response)
res = self.sec.verify_signature("%s" % s_response,
res = self.sec.verify_signature(s_response,
node_name=class_name(samlp.Response()))
print(res)
@@ -331,7 +332,7 @@ class TestSecurity():
assert ci == self.sec.my_cert
res = self.sec.verify_signature("%s" % s_response,
res = self.sec.verify_signature(s_response,
node_name=class_name(samlp.Response()))
assert res
@@ -362,7 +363,7 @@ class TestSecurity():
ci = "".join(sigver.cert_from_instance(ass)[0].split())
assert ci == self.sec.my_cert
res = self.sec.verify_signature("%s" % s_assertion,
res = self.sec.verify_signature(s_assertion,
node_name=class_name(ass))
assert res
@@ -451,9 +452,9 @@ def test_xbox():
encrypted_assertion = EncryptedAssertion()
encrypted_assertion.add_extension_element(_ass0)
_, pre = make_temp("%s" % pre_encryption_part(), decode=False)
_, pre = make_temp(str(pre_encryption_part()).encode('utf-8'), decode=False)
enctext = sec.crypto.encrypt(
"%s" % encrypted_assertion, conf.cert_file, pre, "des-192",
str(encrypted_assertion), conf.cert_file, pre, "des-192",
'/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]')

View File

@@ -9,8 +9,8 @@ from pathutils import full_path
__author__ = 'roland'
TMPL = """<?xml version='1.0' encoding='UTF-8'?>
<ns0:EncryptedData xmlns:ns0="http://www.w3.org/2001/04/xmlenc#" xmlns:ns1="http://www.w3.org/2000/09/xmldsig#" Id="ED" Type="http://www.w3.org/2001/04/xmlenc#Element"><ns0:EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#tripledes-cbc" /><ns1:KeyInfo><ns0:EncryptedKey Id="EK"><ns0:EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#rsa-1_5" /><ns1:KeyInfo><ns1:KeyName>my-rsa-key</ns1:KeyName></ns1:KeyInfo><ns0:CipherData><ns0:CipherValue /></ns0:CipherData></ns0:EncryptedKey></ns1:KeyInfo><ns0:CipherData><ns0:CipherValue /></ns0:CipherData></ns0:EncryptedData>"""
TMPL_NO_HEADER = """<ns0:EncryptedData xmlns:ns0="http://www.w3.org/2001/04/xmlenc#" xmlns:ns1="http://www.w3.org/2000/09/xmldsig#" Id="ED" Type="http://www.w3.org/2001/04/xmlenc#Element"><ns0:EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#tripledes-cbc" /><ns1:KeyInfo><ns0:EncryptedKey Id="EK"><ns0:EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#rsa-1_5" /><ns1:KeyInfo><ns1:KeyName>my-rsa-key</ns1:KeyName></ns1:KeyInfo><ns0:CipherData><ns0:CipherValue /></ns0:CipherData></ns0:EncryptedKey></ns1:KeyInfo><ns0:CipherData><ns0:CipherValue /></ns0:CipherData></ns0:EncryptedData>"""
TMPL = "<?xml version='1.0' encoding='UTF-8'?>\n%s" % TMPL_NO_HEADER
IDENTITY = {"eduPersonAffiliation": ["staff", "member"],
"surName": ["Jeter"], "givenName": ["Derek"],
@@ -27,7 +27,7 @@ AUTHN = {
def test_pre_enc():
tmpl = pre_encryption_part()
print(tmpl)
assert "%s" % tmpl == TMPL
assert "%s" % tmpl in (TMPL_NO_HEADER, TMPL)
def test_reshuffle_response():

View File

@@ -4,7 +4,7 @@ import base64
import copy
import os
from contextlib import closing
from urlparse import parse_qs
from six.moves.urllib.parse import parse_qs
import uuid
from saml2.cert import OpenSSLWrapper
@@ -541,7 +541,6 @@ class TestServer1():
encrypt_assertion=False,
encrypt_assertion_self_contained=True,
pefim=True,
#encrypted_advice_attributes=True,
encrypt_cert_advice=cert_str,
)
@@ -562,7 +561,7 @@ class TestServer1():
assert valid
_, key_file = make_temp("%s" % cert_key_str, decode=False)
_, key_file = make_temp(str(cert_key_str).encode('ascii'), decode=False)
decr_text = self.server.sec.decrypt(signed_resp, key_file)
@@ -654,7 +653,7 @@ class TestServer1():
id_attr="")
assert valid
_, key_file = make_temp("%s" % cert_key_str, decode=False)
_, key_file = make_temp(str(cert_key_str).encode('ascii'), decode=False)
decr_text = self.server.sec.decrypt(signed_resp, key_file)
@@ -689,7 +688,6 @@ class TestServer1():
sign_assertion=True,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
encrypt_cert_advice=cert_str,
)
@@ -717,7 +715,7 @@ class TestServer1():
assert valid
_, key_file = make_temp("%s" % cert_key_str, decode=False)
_, key_file = make_temp(cert_key_str, decode=False)
decr_text = self.server.sec.decrypt(decr_text, key_file)
@@ -750,7 +748,6 @@ class TestServer1():
sign_assertion=False,
encrypt_assertion=False,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
encrypt_cert_advice=cert_str_advice,
)
@@ -761,7 +758,7 @@ class TestServer1():
assert sresponse.signature is None
_, key_file = make_temp("%s" % cert_key_str_advice, decode=False)
_, key_file = make_temp(cert_key_str_advice, decode=False)
decr_text = self.server.sec.decrypt(_resp, key_file)
@@ -783,7 +780,6 @@ class TestServer1():
sign_assertion=False,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
encrypt_cert_advice=cert_str_advice,
)
@@ -794,7 +790,7 @@ class TestServer1():
decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"])
_, key_file = make_temp("%s" % cert_key_str_advice, decode=False)
_, key_file = make_temp(cert_key_str_advice, decode=False)
decr_text_2 = self.server.sec.decrypt(decr_text_1, key_file)
@@ -825,7 +821,7 @@ class TestServer1():
assert sresponse.signature is None
_, key_file = make_temp("%s" % cert_key_str_assertion, decode=False)
_, key_file = make_temp(cert_key_str_assertion, decode=False)
decr_text = self.server.sec.decrypt(_resp, key_file)
@@ -876,7 +872,6 @@ class TestServer1():
sign_assertion=False,
encrypt_assertion=False,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True
)
@@ -909,7 +904,6 @@ class TestServer1():
sign_assertion=False,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
encrypt_cert_advice=cert_str_advice,
encrypt_cert_assertion=cert_str_assertion
@@ -919,11 +913,11 @@ class TestServer1():
assert sresponse.signature is None
_, key_file = make_temp("%s" % cert_key_str_assertion, decode=False)
_, key_file = make_temp(cert_key_str_assertion, decode=False)
decr_text_1 = _server.sec.decrypt(_resp, key_file)
_, key_file = make_temp("%s" % cert_key_str_advice, decode=False)
_, key_file = make_temp(cert_key_str_advice, decode=False)
decr_text_2 = _server.sec.decrypt(decr_text_1, key_file)
@@ -944,7 +938,6 @@ class TestServer1():
sign_assertion=False,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True
)
@@ -974,7 +967,6 @@ class TestServer1():
sign_assertion=False,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
encrypt_cert_advice="whatever",
encrypt_cert_assertion="whatever"
@@ -996,7 +988,6 @@ class TestServer1():
sign_assertion=False,
encrypt_assertion=False,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
encrypt_cert_advice="whatever",
)
@@ -1039,7 +1030,6 @@ class TestServer1():
sign_assertion=False,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
encrypt_cert_advice="whatever",
encrypt_cert_assertion="whatever"
@@ -1061,7 +1051,6 @@ class TestServer1():
sign_assertion=False,
encrypt_assertion=False,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
encrypt_cert_advice="whatever",
)
@@ -1104,7 +1093,6 @@ class TestServer1():
sign_assertion=False,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
)
@@ -1120,7 +1108,6 @@ class TestServer1():
sign_assertion=False,
encrypt_assertion=False,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True
)
@@ -1160,7 +1147,7 @@ class TestServer1():
issuer_entity_id="urn:mace:example.com:saml:roland:idp",
reason="I'm tired of this")
intermed = base64.b64encode("%s" % logout_request)
intermed = base64.b64encode(str(logout_request).encode('utf-8'))
#saml_soap = make_soap_enveloped_saml_thingy(logout_request)
request = self.server.parse_logout_request(intermed, BINDING_HTTP_POST)

View File

@@ -4,8 +4,7 @@
import base64
import uuid
import six
import urllib
import urlparse
from six.moves.urllib.parse import parse_qs, urlencode, urlparse
from saml2.cert import OpenSSLWrapper
from saml2.xmldsig import SIG_RSA_SHA256
from saml2 import BINDING_HTTP_POST
@@ -76,9 +75,13 @@ def add_subelement(xmldoc, node_name, subelem):
while xmldoc[c] == " ":
spaces += " "
c += 1
# Sometimes we get an xml header, sometimes we don't.
subelem_str = str(subelem)
if subelem_str[0:5].lower() == '<?xml':
subelem_str = subelem_str.split("\n", 1)[1]
xmldoc = xmldoc.replace(
"<%s:%s%s/>" % (tag, node_name, spaces),
"<%s:%s%s>%s</%s:%s>" % (tag, node_name, spaces, subelem, tag,
"<%s:%s%s>%s</%s:%s>" % (tag, node_name, spaces, subelem_str, tag,
node_name))
return xmldoc
@@ -161,7 +164,7 @@ class TestClient:
"E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
format=saml.NAMEID_FORMAT_PERSISTENT,
message_id="id1")
reqstr = "%s" % req.to_string()
reqstr = "%s" % req.to_string().decode('utf-8')
assert req.destination == "https://idp.example.com/idp/"
assert req.id == "id1"
@@ -267,7 +270,7 @@ class TestClient:
assert nid_policy.format == saml.NAMEID_FORMAT_TRANSIENT
def test_create_auth_request_vo(self):
assert self.client.config.vorg.keys() == [
assert list(self.client.config.vorg.keys()) == [
"urn:mace:example.com:it:tek"]
ar_str = "%s" % self.client.create_authn_request(
@@ -336,7 +339,7 @@ class TestClient:
resp_str = "%s" % resp
resp_str = base64.encodestring(resp_str)
resp_str = base64.encodestring(resp_str.encode('utf-8'))
authn_response = self.client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
@@ -378,7 +381,7 @@ class TestClient:
userid="also0001@example.com",
authn=AUTHN)
resp_str = base64.encodestring(resp_str)
resp_str = base64.encodestring(resp_str.encode('utf-8'))
self.client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
@@ -415,7 +418,6 @@ class TestClient:
in_response_to="id1",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
#name_id_policy=nameid_policy,
name_id=self.name_id,
userid="foba0001@example.com",
authn=AUTHN,
@@ -423,14 +425,13 @@ class TestClient:
sign_assertion=True,
encrypt_assertion=False,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
encrypt_cert_advice=cert_str
)
resp_str = "%s" % resp
resp_str = base64.encodestring(resp_str)
resp_str = base64.encodestring(resp_str.encode('utf-8'))
authn_response = _client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
@@ -453,7 +454,6 @@ class TestClient:
in_response_to="id1",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
#name_id_policy=nameid_policy,
name_id=self.name_id,
userid="foba0001@example.com",
authn=AUTHN,
@@ -461,13 +461,12 @@ class TestClient:
sign_assertion=True,
encrypt_assertion=False,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
)
resp_str = "%s" % resp
resp_str = base64.encodestring(resp_str)
resp_str = base64.encodestring(resp_str.encode('utf-8'))
authn_response = _client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
@@ -490,7 +489,6 @@ class TestClient:
in_response_to="id1",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
#name_id_policy=nameid_policy,
name_id=self.name_id,
userid="foba0001@example.com",
authn=AUTHN,
@@ -498,13 +496,12 @@ class TestClient:
sign_assertion=True,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
)
resp_str = "%s" % resp
resp_str = base64.encodestring(resp_str)
resp_str = base64.encodestring(resp_str.encode('utf-8'))
authn_response = _client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
@@ -535,7 +532,6 @@ class TestClient:
in_response_to="id1",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
#name_id_policy=nameid_policy,
name_id=self.name_id,
userid="foba0001@example.com",
authn=AUTHN,
@@ -543,14 +539,13 @@ class TestClient:
sign_assertion=True,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
encrypt_cert_assertion=cert_str
)
resp_str = "%s" % resp
resp_str = base64.encodestring(resp_str)
resp_str = base64.encodestring(resp_str.encode('utf-8'))
authn_response = _client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
@@ -589,7 +584,6 @@ class TestClient:
in_response_to="id1",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
#name_id_policy=nameid_policy,
name_id=self.name_id,
userid="foba0001@example.com",
authn=AUTHN,
@@ -597,7 +591,6 @@ class TestClient:
sign_assertion=True,
encrypt_assertion=True,
encrypt_assertion_self_contained=True,
#encrypted_advice_attributes=True,
pefim=True,
encrypt_cert_assertion=cert_assertion_str,
encrypt_cert_advice=cert_advice_str
@@ -605,7 +598,7 @@ class TestClient:
resp_str = "%s" % resp
resp_str = base64.encodestring(resp_str)
resp_str = base64.encodestring(resp_str.encode('utf-8'))
authn_response = _client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
@@ -628,7 +621,6 @@ class TestClient:
in_response_to="id1",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
#name_id_policy=nameid_policy,
name_id=self.name_id,
userid="foba0001@example.com",
authn=AUTHN,
@@ -641,7 +633,7 @@ class TestClient:
resp_str = "%s" % resp
resp_str = base64.encodestring(resp_str)
resp_str = base64.encodestring(resp_str.encode('utf-8'))
authn_response = _client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
@@ -672,7 +664,6 @@ class TestClient:
in_response_to="id1",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
#name_id_policy=nameid_policy,
name_id=self.name_id,
userid="foba0001@example.com",
authn=AUTHN,
@@ -685,7 +676,7 @@ class TestClient:
resp_str = "%s" % resp
resp_str = base64.encodestring(resp_str)
resp_str = base64.encodestring(resp_str.encode('utf-8'))
authn_response = _client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
@@ -842,7 +833,7 @@ class TestClient:
#seresp = samlp.response_from_string(enctext)
resp_str = base64.encodestring(enctext)
resp_str = base64.encodestring(enctext.encode('utf-8'))
# Now over to the client side
resp = self.client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
@@ -917,7 +908,7 @@ class TestClient:
#seresp = samlp.response_from_string(enctext)
resp_str = base64.encodestring(enctext)
resp_str = base64.encodestring(enctext.encode('utf-8'))
# Now over to the client side
resp = self.client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
@@ -1151,7 +1142,7 @@ class TestClient:
#seresp = samlp.response_from_string(enctext)
resp_str = base64.encodestring("%s" % response)
resp_str = base64.encodestring(str(response).encode('utf-8'))
# Now over to the client side
resp = self.client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
@@ -1177,7 +1168,7 @@ class TestClient:
relay_state="relay2", sigalg=SIG_RSA_SHA256, key=key)
loc = info["headers"][0][1]
qs = urlparse.parse_qs(loc[1:])
qs = parse_qs(loc[1:])
assert _leq(qs.keys(),
['SigAlg', 'SAMLRequest', 'RelayState', 'Signature'])
@@ -1214,8 +1205,8 @@ class TestClientWithDummy():
assert http_args["headers"][0][0] == "Location"
assert http_args["data"] == []
redirect_url = http_args["headers"][0][1]
_, _, _, _, qs, _ = urlparse.urlparse(redirect_url)
qs_dict = urlparse.parse_qs(qs)
_, _, _, _, qs, _ = urlparse(redirect_url)
qs_dict = parse_qs(qs)
req = self.server.parse_authn_request(qs_dict["SAMLRequest"][0],
binding)
resp_args = self.server.response_args(req.message, [response_binding])
@@ -1234,8 +1225,8 @@ class TestClientWithDummy():
assert http_args["headers"][0][0] == "Location"
assert http_args["data"] == []
redirect_url = http_args["headers"][0][1]
_, _, _, _, qs, _ = urlparse.urlparse(redirect_url)
qs_dict = urlparse.parse_qs(qs)
_, _, _, _, qs, _ = urlparse(redirect_url)
qs_dict = parse_qs(qs)
req = self.server.parse_authn_request(qs_dict["SAMLRequest"][0],
binding)
resp_args = self.server.response_args(req.message, [response_binding])
@@ -1268,7 +1259,7 @@ class TestClientWithDummy():
print(resp)
assert resp
assert len(resp) == 1
assert resp.keys() == entity_ids
assert list(resp.keys()) == entity_ids
response = resp[entity_ids[0]]
assert isinstance(response, LogoutResponse)
@@ -1288,7 +1279,7 @@ class TestClientWithDummy():
# Here I fake what the client will do
# create the form post
http_args["data"] = urllib.urlencode(_dic)
http_args["data"] = urlencode(_dic)
http_args["method"] = "POST"
http_args["dummy"] = _dic["SAMLRequest"]
http_args["headers"] = [('Content-type',
@@ -1323,7 +1314,7 @@ class TestClientWithDummy():
# Here I fake what the client will do
# create the form post
http_args["data"] = urllib.urlencode(_dic)
http_args["data"] = urlencode(_dic)
http_args["method"] = "POST"
http_args["dummy"] = _dic["SAMLRequest"]
http_args["headers"] = [('Content-type',

View File

@@ -64,7 +64,7 @@ class TestSP():
"urn:mace:example.com:saml:roland:sp", trans_name_policy,
"foba0001@example.com", authn=AUTHN)
resp_str = base64.encodestring(resp_str)
resp_str = base64.encodestring(resp_str.encode('utf-8'))
self.sp.outstanding_queries = {"id1": "http://www.example.com/service"}
session_info = self.sp._eval_authn_response(
{}, {"SAMLResponse": [resp_str]})

View File

@@ -33,7 +33,7 @@ class TestVirtualOrg():
conf.load_file("server_conf")
self.sp = Saml2Client(conf)
vo_name = conf.vorg.keys()[0]
vo_name = list(conf.vorg.keys())[0]
self.vo = conf.vorg[vo_name]
add_derek_info(self.sp)
@@ -62,7 +62,7 @@ class TestVirtualOrg_2():
def setup_class(self):
conf = config.SPConfig()
conf.load_file("server_conf")
vo_name = conf.vorg.keys()[0]
vo_name = list(conf.vorg.keys())[0]
self.sp = Saml2Client(conf, virtual_organization=vo_name)
add_derek_info(self.sp)

View File

@@ -1,8 +1,8 @@
import base64
from contextlib import closing
from hashlib import sha1
from urlparse import urlparse
from urlparse import parse_qs
from six.moves.urllib.parse import urlparse
from six.moves.urllib.parse import parse_qs
from saml2 import BINDING_HTTP_ARTIFACT
from saml2 import BINDING_SOAP
from saml2 import BINDING_HTTP_POST
@@ -54,14 +54,14 @@ def get_msg(hinfo, binding, response=False):
def test_create_artifact():
b64art = create_artifact("http://sp.example.com/saml.xml",
"aabbccddeeffgghhiijj")
b"aabbccddeeffgghhiijj")
art = base64.b64decode(b64art)
art = base64.b64decode(b64art.encode('ascii'))
assert art[:2] == '\x00\x04'
assert art[:2] == ARTIFACT_TYPECODE
assert int(art[2:4]) == 0
s = sha1("http://sp.example.com/saml.xml")
s = sha1(b"http://sp.example.com/saml.xml")
assert art[4:24] == s.digest()
SP = 'urn:mace:example.com:saml:roland:sp'
@@ -74,7 +74,7 @@ def test_create_artifact_resolve():
#assert artifact[:2] == '\x00\x04'
#assert int(artifact[2:4]) == 0
#
s = sha1(SP)
s = sha1(SP.encode('ascii'))
assert artifact[4:24] == s.digest()
with closing(Server(config_file="idp_all_conf")) as idp:
@@ -116,7 +116,7 @@ def test_artifact_flow():
[BINDING_HTTP_ARTIFACT],
entity_id=idp.config.entityid)
hinfo = sp.apply_binding(binding, "%s" % artifact, destination, relay_state)
hinfo = sp.apply_binding(binding, artifact, destination, relay_state)
# ========== @IDP ============

View File

@@ -1,5 +1,5 @@
from contextlib import closing
from urlparse import urlparse, parse_qs
from six.moves.urllib.parse import urlparse, parse_qs
from saml2 import BINDING_SOAP, BINDING_HTTP_POST
__author__ = 'rolandh'

View File

@@ -1,6 +1,6 @@
from contextlib import closing
from urlparse import parse_qs
from urlparse import urlparse
from six.moves.urllib.parse import parse_qs
from six.moves.urllib.parse import urlparse
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.samlp import AuthnRequest
from saml2.samlp import NameIDPolicy

View File

@@ -7,7 +7,7 @@ from saml2.server import Server
from saml2 import BINDING_HTTP_REDIRECT
from saml2.client import Saml2Client
from saml2.config import SPConfig
from urlparse import parse_qs
from six.moves.urllib.parse import parse_qs
from pathutils import dotname
@@ -54,4 +54,4 @@ def test():
if __name__ == "__main__":
test()
test()