Fix python3 syntax errors

Retains python2.7 compatibility for all files. Fixes only syntax errors,
tests still fail on python3 for various reasons.
This commit is contained in:
Clint Byrum 2015-05-15 10:52:33 -07:00
parent 4ef44631df
commit 99911f6c4c
80 changed files with 489 additions and 484 deletions

View File

@ -1065,5 +1065,5 @@ if __name__ == '__main__':
PORT = CONFIG.PORT
SRV = make_server(HOST, PORT, application)
print "IdP listening on %s:%s" % (HOST, PORT)
print("IdP listening on %s:%s" % (HOST, PORT))
SRV.serve_forever()

View File

@ -1019,7 +1019,7 @@ if __name__ == '__main__':
PORT = CONFIG.PORT
SRV = make_server(HOST, PORT, application)
print "IdP listening on %s:%s" % (HOST, PORT)
print("IdP listening on %s:%s" % (HOST, PORT))
SRV.serve_forever()
else:
_rot = args.mako_root

View File

@ -269,10 +269,10 @@ class SSO(Service):
try:
resp_args = IDP.response_args(_authn_req)
_resp = None
except UnknownPrincipal, excp:
except UnknownPrincipal as excp:
_resp = IDP.create_error_response(_authn_req.id,
self.destination, excp)
except UnsupportedBinding, excp:
except UnsupportedBinding as excp:
_resp = IDP.create_error_response(_authn_req.id,
self.destination, excp)
@ -281,11 +281,11 @@ class SSO(Service):
def do(self, query, binding_in, relay_state=""):
try:
resp_args, _resp = self.verify_request(query, binding_in)
except UnknownPrincipal, excp:
except UnknownPrincipal as excp:
logger.error("UnknownPrincipal: %s" % (excp,))
resp = ServiceError("UnknownPrincipal: %s" % (excp,))
return resp(self.environ, self.start_response)
except UnsupportedBinding, excp:
except UnsupportedBinding as excp:
logger.error("UnsupportedBinding: %s" % (excp,))
resp = ServiceError("UnsupportedBinding: %s" % (excp,))
return resp(self.environ, self.start_response)
@ -305,7 +305,7 @@ class SSO(Service):
identity, userid=self.user,
authn=AUTHN_BROKER[self.environ["idp.authn_ref"]], sign_assertion=sign_assertion,
sign_response=False, **resp_args)
except Exception, excp:
except Exception as excp:
logging.error(exception_trace(excp))
resp = ServiceError("Exception: %s" % (excp,))
return resp(self.environ, self.start_response)
@ -548,7 +548,7 @@ class SLO(Service):
_, body = request.split("\n")
logger.debug("req: '%s'" % body)
req_info = IDP.parse_logout_request(body, binding)
except Exception, exc:
except Exception as exc:
logger.error("Bad request: %s" % exc)
resp = BadRequest("%s" % exc)
return resp(self.environ, self.start_response)
@ -565,7 +565,7 @@ class SLO(Service):
# remove the authentication
try:
IDP.session_db.remove_authn_statements(msg.name_id)
except KeyError, exc:
except KeyError as exc:
logger.error("ServiceError: %s" % exc)
resp = ServiceError("%s" % exc)
return resp(self.environ, self.start_response)
@ -574,7 +574,7 @@ class SLO(Service):
try:
hinfo = IDP.apply_binding(binding, "%s" % resp, "", relay_state)
except Exception, exc:
except Exception as exc:
logger.error("ServiceError: %s" % exc)
resp = ServiceError("%s" % exc)
return resp(self.environ, self.start_response)
@ -986,7 +986,7 @@ if __name__ == '__main__':
PORT = 8088
SRV = make_server(HOST, PORT, application)
print "IdP listening on %s:%s" % (HOST, PORT)
print("IdP listening on %s:%s" % (HOST, PORT))
SRV.serve_forever()
else:
_rot = args.mako_root

View File

@ -293,5 +293,5 @@ if __name__ == '__main__':
from wsgiref.simple_server import make_server
srv = make_server(HOST, PORT, app_with_auth)
print "SP listening on %s:%s" % (HOST, PORT)
print("SP listening on %s:%s" % (HOST, PORT))
srv.serve_forever()

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python
from __future__ import print_function
import logging
import re
import argparse
@ -353,18 +354,18 @@ class ACS(Service):
try:
self.response = self.sp.parse_authn_request_response(
response, binding, self.outstanding_queries, self.cache.outstanding_certs)
except UnknownPrincipal, excp:
except UnknownPrincipal as excp:
logger.error("UnknownPrincipal: %s" % (excp,))
resp = ServiceError("UnknownPrincipal: %s" % (excp,))
return resp(self.environ, self.start_response)
except UnsupportedBinding, excp:
except UnsupportedBinding as excp:
logger.error("UnsupportedBinding: %s" % (excp,))
resp = ServiceError("UnsupportedBinding: %s" % (excp,))
return resp(self.environ, self.start_response)
except VerificationError, err:
except VerificationError as err:
resp = ServiceError("Verification error: %s" % (err,))
return resp(self.environ, self.start_response)
except Exception, err:
except Exception as err:
resp = ServiceError("Other error: %s" % (err,))
return resp(self.environ, self.start_response)
@ -580,7 +581,7 @@ class SSO(object):
if cert is not None:
self.cache.outstanding_certs[_sid] = cert
except Exception, exc:
except Exception as exc:
logger.exception(exc)
resp = ServiceError(
"Failed to construct the AuthnRequest: %s" % exc)
@ -784,14 +785,14 @@ def application(environ, start_response):
if re.match(".*static/.*", path):
return handle_static(environ, start_response, path)
return not_found(environ, start_response)
except StatusError, err:
except StatusError as err:
logging.error("StatusError: %s" % err)
resp = BadRequest("%s" % err)
return resp(environ, start_response)
except Exception, err:
except Exception as err:
#_err = exception_trace("RUN", err)
#logging.error(exception_trace("RUN", _err))
print >> sys.stderr, err
print(err, file=sys.stderr)
resp = ServiceError("%s" % err)
return resp(environ, start_response)
@ -850,7 +851,7 @@ if __name__ == '__main__':
SERVER_KEY, CERT_CHAIN)
_https = " using SSL/TLS"
logger.info("Server starting")
print "SP listening on %s:%s%s" % (HOST, PORT, _https)
print("SP listening on %s:%s%s" % (HOST, PORT, _https))
try:
SRV.start()
except KeyboardInterrupt:

View File

@ -7,4 +7,4 @@ __author__ = 'rhoerbe'
import json, sys
jdata = json.load(sys.stdin)
for k in jdata:
print k["id"]
print(k["id"])

View File

@ -7,4 +7,4 @@ __author__ = 'rhoerbe'
import json, sys
jdata = json.load(sys.stdin)
for k in jdata:
print k["id"]
print(k["id"])

View File

@ -5,6 +5,6 @@ from subprocess import call
for line in fileinput.input():
cmd = "./run_oper.sh " + line.rstrip()
print "executing " + cmd
print("executing " + cmd)
call(cmd, shell=True)

View File

@ -1,3 +1,4 @@
from __future__ import print_function
from importlib import import_module
import json
import os
@ -250,7 +251,7 @@ class SAML2client(object):
"""
"""
print >> sys.stderr, 80 * ":"
print(80 * ":", file=sys.stderr)
hndlr2.setFormatter(formatter_2)
memhndlr.setTarget(hndlr2)
memhndlr.flush()
@ -288,8 +289,8 @@ class SAML2client(object):
try:
self.setup()
except (AttributeError, ToOld), err:
print >> sys.stdout, "Configuration Error: %s" % err
except (AttributeError, ToOld) as err:
print("Configuration Error: %s" % err, file=sys.stderr)
self.client = Saml2Client(self.sp_config)
conv = None
@ -323,17 +324,17 @@ class SAML2client(object):
self.test_log = conv.test_output
tsum = self.test_summation(self.args.oper)
err = None
except CheckError, err:
except CheckError as err:
self.test_log = conv.test_output
tsum = self.test_summation(self.args.oper)
except FatalError, err:
except FatalError as err:
if conv:
self.test_log = conv.test_output
self.test_log.append(exception_trace("RUN", err))
else:
self.test_log = exception_trace("RUN", err)
tsum = self.test_summation(self.args.oper)
except Exception, err:
except Exception as err:
if conv:
self.test_log = conv.test_output
self.test_log.append(exception_trace("RUN", err))
@ -344,7 +345,7 @@ class SAML2client(object):
if pp:
pp.pprint(tsum)
else:
print >> sys.stdout, json.dumps(tsum)
print(json.dumps(tsum), file=sys.stdout)
if tsum["status"] > 1 or self.args.debug or err:
self.output_log(memoryhandler, streamhandler)
@ -391,21 +392,21 @@ class SAML2client(object):
lista.append(item)
print json.dumps(lista)
print(json.dumps(lista))
def _get_operation(self, operation):
return self.operations.OPERATIONS[operation]
def make_meta(self):
self.sp_configure(True)
print entity_descriptor(self.sp_config)
print(entity_descriptor(self.sp_config))
def list_conf_id(self):
sys.path.insert(0, ".")
mod = import_module("config")
_res = dict([(key, cnf["description"]) for key, cnf in
mod.CONFIG.items()])
print json.dumps(_res)
print(json.dumps(_res))
def verify_metadata(self):
self.json_config = self.json_config_file()
@ -421,4 +422,4 @@ class SAML2client(object):
chk = CheckSaml2IntMetaData()
output = []
res = chk(env, output)
print >> sys.stdout, res
print(res, file=sys.stdout)

View File

@ -241,7 +241,7 @@ class Conversation(tool.Conversation):
except KeyError:
pass
logger.info("SAML Response: %s" % _resp)
except FatalError, ferr:
except FatalError as ferr:
if _resp:
logger.info("Faulty response: %s" % _resp)
logger.error("Exception %s" % ferr)
@ -250,7 +250,7 @@ class Conversation(tool.Conversation):
return False
except CheckError:
raise
except Exception, err:
except Exception as err:
if _resp:
logger.info("Faulty response: %s" % _resp)
logger.error("Exception %s" % err)

View File

@ -399,7 +399,7 @@ def make_vals(val, klass, klass_inst=None, prop=None, part=False,
"""
cinst = None
#print "make_vals(%s, %s)" % (val, klass)
#print("make_vals(%s, %s)" % (val, klass))
if isinstance(val, dict):
cinst = klass().loadd(val, base64encode=base64encode)
@ -719,7 +719,7 @@ class SamlBase(ExtensionContainer):
:return: The instance
"""
#print "set_text: %s" % (val,)
#print("set_text: %s" % (val,))
if isinstance(val, bool):
if val:
setattr(self, "text", "true")
@ -751,7 +751,7 @@ class SamlBase(ExtensionContainer):
"""
for prop, _typ, _req in self.c_attributes.values():
#print "# %s" % (prop)
#print("# %s" % (prop))
if prop in ava:
if isinstance(ava[prop], bool):
setattr(self, prop, "%s" % ava[prop])
@ -764,9 +764,9 @@ class SamlBase(ExtensionContainer):
self.set_text(ava["text"], base64encode)
for prop, klassdef in self.c_children.values():
#print "## %s, %s" % (prop, klassdef)
#print("## %s, %s" % (prop, klassdef))
if prop in ava:
#print "### %s" % ava[prop]
#print("### %s" % ava[prop])
# means there can be a list of values
if isinstance(klassdef, list):
make_vals(ava[prop], klassdef[0], self, prop,

View File

@ -205,9 +205,9 @@ def list_to_local(acs, attrlist, allow_unknown_attributes=False):
def from_local(acs, ava, name_format):
for aconv in acs:
#print ac.format, name_format
#print(ac.format, name_format)
if aconv.name_format == name_format:
#print "Found a name_form converter"
#print("Found a name_form converter")
return aconv.to_(ava)
return None
@ -221,9 +221,9 @@ def from_local_name(acs, attr, name_format):
:return: An Attribute instance
"""
for aconv in acs:
#print ac.format, name_format
#print(ac.format, name_format)
if aconv.name_format == name_format:
#print "Found a name_form converter"
#print("Found a name_form converter")
return aconv.to_format(attr)
return attr
@ -244,7 +244,7 @@ def to_local_name(acs, attr):
def get_local_name(acs, attr, name_format):
for aconv in acs:
#print ac.format, name_format
#print(ac.format, name_format)
if aconv.name_format == name_format:
return aconv._fro[attr]

View File

@ -352,7 +352,7 @@ class OpenSSLWrapper(object):
crypto.verify(ca_cert, signature, cert_certificate,
cert_algorithm)
return True, "Signed certificate is valid and correctly signed by CA certificate."
except crypto.Error, e:
except crypto.Error as e:
return False, "Certificate is incorrectly signed."
except Exception, e:
except Exception as e:
return False, "Certificate is not valid for an unknown reason."

View File

@ -213,7 +213,7 @@ class Client(Entity):
# url I started off with.
pass
else:
print response.error
print(response.error)
raise SAMLError(
"Error POSTing package to SP: %s" % response.error)
@ -290,7 +290,7 @@ class Client(Entity):
except (soap.XmlParseError, AssertionError, KeyError):
pass
#print "RESP",response, self.http.response
#print("RESP",response, self.http.response)
if response.status_code != 404:
raise SAMLError("Error performing operation: %s" % (

View File

@ -942,7 +942,7 @@ class Entity(HTTPBase):
try:
response = response_cls(self.sec, **kwargs)
except Exception, exc:
except Exception as exc:
logger.info("%s" % exc)
raise
@ -953,13 +953,13 @@ class Entity(HTTPBase):
try:
response = response.loads(xmlstr, False, origxml=origxml)
except SigverError, err:
except SigverError as err:
logger.error("Signature Error: %s" % err)
raise
except UnsolicitedResponse:
logger.error("Unsolicited response")
raise
except Exception, err:
except Exception as err:
if "not well-formed" in "%s" % err:
logger.error("Not well-formed XML")
raise

View File

@ -77,10 +77,10 @@ def _since_epoch(cdate):
try:
t = time.strptime(cdate, "%d %b %Y %H:%M:%S %Z") # e.g. 18 Apr 2014 12:30:51 GMT
except ValueError:
raise Exception, 'ValueError: Date "{0}" does not match any of '.format(cdate) + \
raise (Exception, 'ValueError: Date "{0}" does not match any of '.format(cdate) + \
'"%d-%b-%Y %H:%M:%S %Z", ' + \
'"%d-%b-%y %H:%M:%S %Z", ' + \
'"%d %b %Y %H:%M:%S %Z".'
'"%d %b %Y %H:%M:%S %Z".')
#return int(time.mktime(t))
return calendar.timegm(t)
@ -130,7 +130,7 @@ class HTTPBase(object):
for _, a in list(self.cookiejar._cookies.items()):
for _, b in a.items():
for cookie in list(b.values()):
# print cookie
# print(cookie)
if cookie.expires and cookie.expires <= now:
continue
if not re.search("%s$" % cookie.domain, _domain):
@ -235,7 +235,7 @@ class HTTPBase(object):
pass
r = requests.request(method, url, **_kwargs)
logger.debug("Response status: %s" % r.status_code)
except requests.ConnectionError, exc:
except requests.ConnectionError as exc:
raise ConnectionError("%s" % exc)
try:
@ -346,7 +346,7 @@ class HTTPBase(object):
args = self.use_soap(request, destination, headers, sign)
args["headers"] = dict(args["headers"])
response = self.send(**args)
except Exception, exc:
except Exception as exc:
logger.info("HTTPClient exception: %s" % (exc,))
raise

View File

@ -1,3 +1,4 @@
from __future__ import print_function
from dircache import listdir
import logging
import os
@ -360,9 +361,8 @@ class InMemoryMetaData(MetaData):
# have I seen this entity_id before ? If so if log: ignore it
if entity_descr.entity_id in self.entity:
print >> sys.stderr, \
"Duplicated Entity descriptor (entity id: '%s')" % \
entity_descr.entity_id
print("Duplicated Entity descriptor (entity id: '%s')" %
entity_descr.entity_id, file=sys.stderr)
return
_ent = to_dict(entity_descr, self.onts)
@ -404,7 +404,7 @@ class InMemoryMetaData(MetaData):
else:
try:
valid_instance(self.entities_descr)
except NotValid, exc:
except NotValid as exc:
logger.error(exc.args[0])
return
@ -589,7 +589,7 @@ class MetaDataLoader(MetaDataFile):
module, attr = func[:i], func[i + 1:]
try:
mod = import_module(module)
except Exception, e:
except Exception as e:
raise RuntimeError(
'Cannot find metadata provider function %s: "%s"' % (func, e))
@ -1156,7 +1156,7 @@ class MetadataStore(object):
for ent_id, ent_desc in _md.items():
if descriptor in ent_desc:
if ent_id in res:
#print "duplicated entity_id: %s" % res
#print("duplicated entity_id: %s" % res)
pass
else:
res.append(ent_id)

View File

@ -208,11 +208,11 @@ def parse_soap_enveloped_saml(text, body_class, header_class=None):
envelope = ElementTree.fromstring(text)
assert envelope.tag == '{%s}Envelope' % NAMESPACE
#print len(envelope)
#print(len(envelope))
body = None
header = {}
for part in envelope:
#print ">",part.tag
#print(">",part.tag)
if part.tag == '{%s}Body' % NAMESPACE:
for sub in part:
try:
@ -223,11 +223,11 @@ def parse_soap_enveloped_saml(text, body_class, header_class=None):
elif part.tag == '{%s}Header' % NAMESPACE:
if not header_class:
raise Exception("Header where I didn't expect one")
#print "--- HEADER ---"
#print("--- HEADER ---")
for sub in part:
#print ">>",sub.tag
#print(">>",sub.tag)
for klass in header_class:
#print "?{%s}%s" % (klass.c_namespace,klass.c_tag)
#print("?{%s}%s" % (klass.c_namespace,klass.c_tag))
if sub.tag == "{%s}%s" % (klass.c_namespace, klass.c_tag):
header[sub.tag] = \
saml2.create_class_from_element_tree(klass, sub)

View File

@ -50,7 +50,7 @@ class Request(object):
only_valid_cert=only_valid_cert)
except TypeError:
raise
except Exception, excp:
except Exception as excp:
logger.info("EXCEPTION: %s", excp)
if not self.message:
@ -62,7 +62,7 @@ class Request(object):
try:
valid_instance(self.message)
except NotValid, exc:
except NotValid as exc:
logger.error("Not valid request: %s" % exc.args[0])
raise
@ -74,8 +74,8 @@ class Request(object):
self.timeslack).timetuple()
lower = time_util.shift_time(time_util.time_a_while_ago(days=1),
- self.timeslack).timetuple()
# print "issue_instant: %s" % self.message.issue_instant
# print "%s < x < %s" % (lower, upper)
# print("issue_instant: %s" % self.message.issue_instant)
# print("%s < x < %s" % (lower, upper))
issued_at = time_util.str_to_time(self.message.issue_instant)
return issued_at > lower and issued_at < upper

View File

@ -205,7 +205,7 @@ def for_me(conditions, myself):
if audience.text.strip() == myself:
return True
else:
#print "Not for me: %s != %s" % (audience.text.strip(), myself)
#print("Not for me: %s != %s" % (audience.text.strip(), myself))
pass
return False
@ -328,7 +328,7 @@ class StatusResponse(object):
logger.exception("EXCEPTION: %s", excp)
raise
#print "<", self.response
#print("<", self.response)
return self._postamble()
@ -360,8 +360,8 @@ class StatusResponse(object):
self.timeslack).timetuple()
lower = time_util.shift_time(time_util.time_a_while_ago(days=1),
-self.timeslack).timetuple()
# print "issue_instant: %s" % self.response.issue_instant
# print "%s < x < %s" % (lower, upper)
# print("issue_instant: %s" % self.response.issue_instant)
# print("%s < x < %s" % (lower, upper))
issued_at = str_to_time(self.response.issue_instant)
return lower < issued_at < upper
@ -1108,7 +1108,7 @@ class AssertionIDResponse(object):
logger.exception("EXCEPTION: %s", excp)
raise
#print "<", self.response
#print("<", self.response)
return self._postamble()

View File

@ -356,7 +356,7 @@ class SAML2Plugin(object):
relay_state=came_from)
logger.debug("ht_args: %s" % ht_args)
except Exception, exc:
except Exception as exc:
logger.exception(exc)
raise Exception(
"Failed to construct the AuthnRequest: %s" % exc)
@ -403,12 +403,12 @@ class SAML2Plugin(object):
post["SAMLResponse"][0], binding, self.outstanding_queries,
self.outstanding_certs)
except Exception, excp:
except Exception as excp:
logger.exception("Exception: %s" % (excp,))
raise
session_info = authresp.session_info()
except TypeError, excp:
except TypeError as excp:
logger.exception("Exception: %s" % (excp,))
return None
@ -526,17 +526,17 @@ class SAML2Plugin(object):
session_info = self._eval_authn_response(
environ, post,
binding=binding)
except Exception, err:
except Exception as err:
environ["s2repoze.saml_error"] = err
return {}
except TypeError, exc:
except TypeError as exc:
# might be a ECP (=SOAP) response
body = environ.get('s2repoze.body', None)
if body:
# might be a ECP response
try:
session_info = self.do_ecp_response(body, environ)
except Exception, err:
except Exception as err:
environ["post.fieldstorage"] = post
environ["s2repoze.saml_error"] = err
return {}

View File

@ -424,7 +424,7 @@ def dynamic_importer(name, class_name=None):
try:
fp, pathname, description = imp.find_module(name)
except ImportError:
print "unable to locate module: " + name
print("unable to locate module: " + name)
return None, None
try:

View File

@ -100,7 +100,7 @@ def _decode_attribute_value(typ, text):
def _verify_value_type(typ, val):
#print "verify value type: %s, %s" % (typ, val)
#print("verify value type: %s, %s" % (typ, val))
if typ == XSD + "string":
try:
return str(val)
@ -253,7 +253,7 @@ class AttributeValueBase(SamlBase):
for attribute, value in tree.attrib.iteritems():
self._convert_element_attribute_to_member(attribute, value)
if tree.text:
#print "set_text:", tree.text
#print("set_text:", tree.text)
# clear type
#self.clear_type()
self.set_text(tree.text)

View File

@ -119,7 +119,7 @@ class Server(Entity):
elif isinstance(dbspec, basestring):
idb = shelve.open(dbspec, writeback=True)
else: # database spec is a a 2-tuple (type, address)
#print >> sys.stderr, "DBSPEC: %s" % (dbspec,)
#print(>> sys.stderr, "DBSPEC: %s" % (dbspec,))
(typ, addr) = dbspec
if typ == "shelve":
idb = shelve.open(addr, writeback=True)
@ -288,7 +288,7 @@ class Server(Entity):
policy = Policy()
try:
ast.apply_policy(sp_entity_id, policy, self.metadata)
except MissingValue, exc:
except MissingValue as exc:
if not best_effort:
return self.create_error_response(in_response_to, consumer_url,
exc, sign_response)
@ -565,7 +565,7 @@ class Server(Entity):
name_id_policy)
logger.debug("construct_nameid: %s => %s" % (userid,
name_id))
except IOError, exc:
except IOError as exc:
response = self.create_error_response(in_response_to,
destination,
sp_entity_id,
@ -608,7 +608,7 @@ class Server(Entity):
encrypted_advice_attributes=encrypted_advice_attributes,
encrypt_cert=encrypt_cert)
except MissingValue, exc:
except MissingValue as exc:
return self.create_error_response(in_response_to, destination,
sp_entity_id, exc, name_id)

View File

@ -232,7 +232,7 @@ def _make_vals(val, klass, seccont, klass_inst=None, prop=None, part=False,
"""
cinst = None
#print "make_vals(%s, %s)" % (val, klass)
#print("make_vals(%s, %s)" % (val, klass))
if isinstance(val, dict):
cinst = _instance(klass, val, seccont, base64encode=base64encode,
@ -261,7 +261,7 @@ def _instance(klass, ava, seccont, base64encode=False, elements_to_sign=None):
instance = klass()
for prop in instance.c_attributes.values():
#print "# %s" % (prop)
#print("# %s" % (prop))
if prop in ava:
if isinstance(ava[prop], bool):
setattr(instance, prop, "%s" % ava[prop])
@ -274,9 +274,9 @@ def _instance(klass, ava, seccont, base64encode=False, elements_to_sign=None):
instance.set_text(ava["text"], base64encode)
for prop, klassdef in instance.c_children.values():
#print "## %s, %s" % (prop, klassdef)
#print("## %s, %s" % (prop, klassdef))
if prop in ava:
#print "### %s" % ava[prop]
#print("### %s" % ava[prop])
if isinstance(klassdef, list):
# means there can be a list of values
_make_vals(ava[prop], klassdef[0], seccont, instance, prop,
@ -316,9 +316,9 @@ def signed_instance_factory(instance, seccont, elements_to_sign=None):
signed_xml = seccont.sign_statement(
signed_xml, node_name=node_name, node_id=nodeid)
#print "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
#print "%s" % signed_xml
#print "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
#print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
#print("%s" % signed_xml)
#print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
return signed_xml
else:
return instance
@ -406,7 +406,7 @@ def cert_from_key_info(key_info, ignore_age=False):
"""
res = []
for x509_data in key_info.x509_data:
#print "X509Data",x509_data
#print("X509Data",x509_data)
x509_certificate = x509_data.x509_certificate
cert = x509_certificate.text.strip()
cert = "\n".join(split_len("".join([s.strip() for s in
@ -886,15 +886,15 @@ class CryptoBackendXmlSec1(CryptoBackend):
if self.__DEBUG:
try:
print " ".join(com_list)
print( " ".join(com_list))
except TypeError:
print "cert_type", cert_type
print "cert_file", cert_file
print "node_name", node_name
print "fil", fil
print("cert_type", cert_type)
print("cert_file", cert_file)
print("node_name", node_name)
print("fil", fil)
raise
print "%s: %s" % (cert_file, os.access(cert_file, os.F_OK))
print "%s: %s" % (fil, os.access(fil, os.F_OK))
print("%s: %s" % (cert_file, os.access(cert_file, os.F_OK)))
print("%s: %s" % (fil, os.access(fil, os.F_OK)))
(_stdout, stderr, _output) = self._run_xmlsec(com_list, [fil],
exception=SignatureError)
@ -930,7 +930,7 @@ class CryptoBackendXmlSec1(CryptoBackend):
try:
if validate_output:
parse_xmlsec_output(p_err)
except XmlsecError, exc:
except XmlsecError as exc:
logger.error(LOG_LINE_2 % (p_out, p_err, exc))
raise
@ -1339,7 +1339,7 @@ class SecurityContext(object):
def _check_signature(self, decoded_xml, item, node_name=NODE_NAME,
origdoc=None, id_attr="", must=False,
only_valid_cert=False):
#print item
#print(item)
try:
issuer = item.issuer.text.strip()
except AttributeError:
@ -1374,7 +1374,7 @@ class SecurityContext(object):
if not certs:
raise MissingKey("%s" % issuer)
#print certs
#print(certs)
verified = False
last_pem_file = None
@ -1402,13 +1402,13 @@ class SecurityContext(object):
node_id=item.id, id_attr=id_attr):
verified = True
break
except XmlsecError, exc:
except XmlsecError as exc:
logger.error("check_sig: %s" % exc)
pass
except SignatureError, exc:
except SignatureError as exc:
logger.error("check_sig: %s" % exc)
pass
except Exception, exc:
except Exception as exc:
logger.error("check_sig: %s" % exc)
raise
@ -1612,7 +1612,7 @@ class SecurityContext(object):
# try:
# self._check_signature(decoded_xml, assertion,
# class_name(assertion), origdoc)
# except Exception, exc:
# except Exception as exc:
# logger.error("correctly_signed_response: %s" % exc)
# raise
@ -1863,4 +1863,4 @@ if __name__ == '__main__':
args = parser.parse_args()
if args.listsigalgs:
print '\n'.join([key for key, value in SIGNER_ALGS.items()])
print('\n'.join([key for key, value in SIGNER_ALGS.items()]))

View File

@ -184,7 +184,7 @@ def class_instances_from_soap_enveloped_saml_thingies(text, modules):
"""
try:
envelope = ElementTree.fromstring(text)
except Exception, exc:
except Exception as exc:
raise XmlParseError("%s" % exc)
assert envelope.tag == '{%s}Envelope' % soapenv.NAMESPACE
@ -210,7 +210,7 @@ def open_soap_envelope(text):
"""
try:
envelope = ElementTree.fromstring(text)
except Exception, exc:
except Exception as exc:
raise XmlParseError("%s" % exc)
assert envelope.tag == '{%s}Envelope' % soapenv.NAMESPACE

View File

@ -5,6 +5,7 @@
Implements some usefull functions when dealing with validity of
different types of information.
"""
from __future__ import print_function
import calendar
import re
@ -72,7 +73,7 @@ def parse_duration(duration):
dlen = len(duration)
for code, typ in D_FORMAT:
#print duration[index:], code
#print(duration[index:], code)
if duration[index] == '-':
raise Exception("Negation not allowed on individual items")
if code == "T":
@ -241,8 +242,8 @@ def str_to_time(timestr, format=TIME_FORMAT):
except ValueError: # assume it's a format problem
try:
elem = TIME_FORMAT_WITH_FRAGMENT.match(timestr)
except Exception, exc:
print >> sys.stderr, "Exception: %s on %s" % (exc, timestr)
except Exception as exc:
print("Exception: %s on %s" % (exc, timestr), file=sys.stderr)
raise
then = time.strptime(elem.groups()[0] + "Z", TIME_FORMAT)

View File

@ -337,10 +337,10 @@ def valid(typ, value):
def _valid_instance(instance, val):
try:
val.verify()
except NotValid, exc:
except NotValid as exc:
raise NotValid("Class '%s' instance: %s" % (
instance.__class__.__name__, exc.args[0]))
except OutsideCardinality, exc:
except OutsideCardinality as exc:
raise NotValid(
"Class '%s' instance cardinality error: %s" % (
instance.__class__.__name__, exc.args[0]))
@ -361,7 +361,7 @@ def valid_instance(instance):
try:
validate_value_type(instance.text.strip(),
instclass.c_value_type)
except NotValid, exc:
except NotValid as exc:
raise NotValid("Class '%s' instance: %s" % (class_name,
exc.args[0]))
@ -382,7 +382,7 @@ def valid_instance(instance):
validate_value_type(value, spec)
else:
valid(typ, value)
except (NotValid, ValueError), exc:
except (NotValid, ValueError) as exc:
txt = ERROR_TEXT % (value, name, exc.args[0])
raise NotValid("Class '%s' instance: %s" % (class_name, txt))

View File

@ -144,7 +144,7 @@ class Conversation(object):
try:
logger.info("GET %s" % url)
_response = self.client.send(url, "GET")
except Exception, err:
except Exception as err:
raise FatalError("%s" % err)
content = _response.text
@ -224,7 +224,7 @@ class Conversation(object):
raise OperationError()
except (FatalError, InteractionNeeded, OperationError):
raise
except Exception, err:
except Exception as err:
self.err_check("exception", err, False)
self.last_response = _response
@ -300,7 +300,7 @@ class Conversation(object):
break
except (FatalError, OperationError):
raise
except Exception, err:
except Exception as err:
#self.err_check("exception", err)
raise

View File

@ -1,3 +1,4 @@
from __future__ import print_function
import json
import pprint
import argparse
@ -123,7 +124,7 @@ class Client(object):
self.idp_config.key_file = os.path.join(self.args.keysdir, "non_md_key.pem")
for f in [self.idp_config.cert_file, self.idp_config.key_file]:
if not os.path.isfile(f):
print "File not found: %s" % os.path.abspath(f)
print("File not found: %s" % os.path.abspath(f))
raise
self.idp = Server(config=self.idp_config)
@ -153,7 +154,7 @@ class Client(object):
"""
"""
print >> sys.stderr, 80 * ":"
print(80 * ":", file=sys.stderr)
hndlr2.setFormatter(formatter_2)
memhndlr.setTarget(hndlr2)
memhndlr.flush()
@ -187,10 +188,11 @@ class Client(object):
try:
oper = self.tests.OPERATIONS[self.args.oper]
except ValueError:
print >> sys.stderr, "Undefined testcase " + self.args.oper
print("Undefined testcase " + self.args.oper,
file=sys.stderr)
return
else:
print >> sys.stderr, "Undefined testcase " + self.args.oper
print("Undefined testcase " + self.args.oper, file=sys.stderr)
return
if self.args.pretty:
@ -210,17 +212,17 @@ class Client(object):
self.test_log = conv.test_output
tsum = self.test_summation(self.args.oper)
err = None
except CheckError, err:
except CheckError as err:
self.test_log = conv.test_output
tsum = self.test_summation(self.args.oper)
except FatalError, err:
except FatalError as err:
if conv:
self.test_log = conv.test_output
self.test_log.append(exception_trace("RUN", err))
else:
self.test_log = exception_trace("RUN", err)
tsum = self.test_summation(self.args.oper)
except Exception, err:
except Exception as err:
if conv:
conv.test_output.append({"status": CRITICAL,
"name": "test driver error",
@ -237,7 +239,7 @@ class Client(object):
if pp:
pp.pprint(tsum)
else:
print >> sys.stdout, json.dumps(tsum)
print(json.dumps(tsum), file=sys.stdout)
if tsum["status"] > 1 or self.args.debug or err:
self.output_log(memoryhandler, streamhandler)
@ -290,7 +292,7 @@ class Client(object):
for key, val in self.operations.OPERATIONS.items():
res.append({"id": key, "name": val["name"]})
print json.dumps(res)
print(json.dumps(res))
def verify_metadata(self):
pass

View File

@ -264,7 +264,7 @@ class Conversation():
else:
try:
_response = self.instance.send(url, "GET")
except Exception, err:
except Exception as err:
raise FatalError("%s" % err)
self._log_response(_response)
@ -468,7 +468,7 @@ class Conversation():
else:
try:
_response = self.instance.send(url, "GET")
except Exception, err:
except Exception as err:
raise FatalError("%s" % err)
self._log_response(_response)
@ -527,7 +527,7 @@ class Conversation():
break
except (FatalError, InteractionNeeded):
raise
except Exception, err:
except Exception as err:
self.err_check("exception", err, False)
self.last_response = _response

View File

@ -150,10 +150,10 @@ class TestPKCS11():
})
)
self.configured = True
except Exception, ex:
print "-" * 64
except Exception as ex:
print("-" * 64)
traceback.print_exc()
print "-" * 64
print("-" * 64)
logging.warning("PKCS11 tests disabled: unable to initialize test token: %s" % ex)
raise
@ -176,7 +176,7 @@ class TestPKCS11():
env = {}
if self.softhsm_conf is not None:
env['SOFTHSM_CONF'] = self.softhsm_conf
#print "env SOFTHSM_CONF=%s " % softhsm_conf +" ".join(args)
#print("env SOFTHSM_CONF=%s " % softhsm_conf +" ".join(args))
logging.debug("Environment {!r}".format(env))
logging.debug("Executing {!r}".format(args))
args = ['ls']
@ -197,24 +197,24 @@ class TestPKCS11():
os.environ['SOFTHSM_CONF'] = self.softhsm_conf
ass = self._assertion
print ass
print(ass)
sign_ass = self.sec.sign_assertion("%s" % ass, node_id=ass.id)
#print sign_ass
#print(sign_ass)
sass = saml.assertion_from_string(sign_ass)
#print sass
#print(sass)
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
'version', 'signature', 'id'])
assert sass.version == "2.0"
assert sass.id == "11111"
assert time_util.str_to_time(sass.issue_instant)
print "Crypto version : %s" % (self.sec.crypto.version())
print("Crypto version : %s" % (self.sec.crypto.version()))
item = self.sec.check_signature(sass, class_name(sass), sign_ass)
assert isinstance(item, saml.Assertion)
print "Test PASSED"
print("Test PASSED")
def test_xmlsec_cryptobackend():

View File

@ -99,4 +99,4 @@ info = {
}
}
print json.dumps(info)
print(json.dumps(info))

View File

@ -64,4 +64,4 @@ info = {
"name_format": NAME_FORMAT_URI
}
print json.dumps(info)
print(json.dumps(info))

View File

@ -84,9 +84,9 @@ def test_org_1():
"url": [("http://example.com","en")],
}
org = metadata.do_organization_info(desc)
print org
print(org)
assert isinstance(org, md.Organization)
print org.keyswv()
print(org.keyswv())
assert _eq(org.keyswv(), ['organization_name',
'organization_display_name','organization_url'])
assert len(org.organization_name) == 3
@ -100,7 +100,7 @@ def test_org_2():
"url": ("http://example.com","en"),
}
org = metadata.do_organization_info(desc)
print org
print(org)
assert _eq(org.keyswv(), ['organization_name',
'organization_display_name','organization_url'])
assert len(org.organization_name) == 3
@ -129,7 +129,7 @@ def test_contact_0():
assert _eq(contact_person[0].keyswv(), ['given_name', 'sur_name',
'contact_type', 'telephone_number',
"email_address"])
print contact_person[0]
print(contact_person[0])
person = contact_person[0]
assert person.contact_type == "technical"
assert isinstance(person.given_name, md.GivenName)
@ -145,7 +145,7 @@ def test_contact_0():
def test_do_endpoints():
eps = metadata.do_endpoints(SP["service"]["sp"]["endpoints"],
metadata.ENDPOINTS["sp"])
print eps
print(eps)
assert _eq(eps.keys(), ["assertion_consumer_service",
"single_logout_service"])
@ -168,7 +168,7 @@ def test_required_attributes():
SP["service"]["sp"]["required_attributes"],
attrconverters, is_required="true")
assert len(ras) == len(SP["service"]["sp"]["required_attributes"])
print ras[0]
print(ras[0])
assert ras[0].name == 'urn:oid:2.5.4.4'
assert ras[0].name_format == NAME_FORMAT_URI
assert ras[0].is_required == "true"
@ -179,7 +179,7 @@ def test_optional_attributes():
SP["service"]["sp"]["optional_attributes"],
attrconverters)
assert len(ras) == len(SP["service"]["sp"]["optional_attributes"])
print ras[0]
print(ras[0])
assert ras[0].name == 'urn:oid:2.5.4.12'
assert ras[0].name_format == NAME_FORMAT_URI
assert ras[0].is_required == "false"
@ -200,7 +200,7 @@ def test_do_sp_sso_descriptor():
assert spsso.want_assertions_signed == "true"
assert len (spsso.attribute_consuming_service) == 1
acs = spsso.attribute_consuming_service[0]
print acs.keyswv()
print(acs.keyswv())
assert _eq(acs.keyswv(), ['requested_attribute', 'service_name',
'service_description', 'index'])
assert acs.service_name[0].text == SP["name"]
@ -218,7 +218,7 @@ def test_do_sp_sso_descriptor_2():
spsso = metadata.do_spsso_descriptor(conf)
assert isinstance(spsso, md.SPSSODescriptor)
print spsso.keyswv()
print(spsso.keyswv())
assert _eq(spsso.keyswv(), ['authn_requests_signed',
'attribute_consuming_service',
'single_logout_service',
@ -229,11 +229,11 @@ def test_do_sp_sso_descriptor_2():
exts = spsso.extensions.extension_elements
assert len(exts) == 1
print exts
print(exts)
idpd = saml2.extension_element_to_element(exts[0],
idpdisc.ELEMENT_FROM_STRING,
namespace=idpdisc.NAMESPACE)
print idpd
print(idpd)
assert idpd.location == "http://example.com/sp/ds"
assert idpd.index == "0"
assert idpd.binding == "urn:oasis:names:tc:SAML:profiles:SSO:idp-discovery-protocol"
@ -241,13 +241,13 @@ def test_do_sp_sso_descriptor_2():
def test_entity_description():
#confd = eval(open("../tests/server.config").read())
confd = SPConfig().load_file("server_conf")
print confd.attribute_converters
print(confd.attribute_converters)
entd = metadata.entity_descriptor(confd)
assert entd is not None
print entd.keyswv()
print(entd.keyswv())
assert _eq(entd.keyswv(), ['valid_until', 'entity_id', 'contact_person',
'spsso_descriptor', 'organization'])
print entd
print(entd)
assert entd.entity_id == "urn:mace:example.com:saml:roland:sp"
def test_do_idp_sso_descriptor():
@ -261,7 +261,7 @@ def test_do_idp_sso_descriptor():
"extensions"])
exts = idpsso.extensions.extension_elements
assert len(exts) == 2
print exts
print(exts)
inst = saml2.extension_element_to_element(exts[0],
shibmd.ELEMENT_FROM_STRING,
namespace=shibmd.NAMESPACE)

View File

@ -72,4 +72,4 @@ info = {
},
}
print json.dumps(info)
print(json.dumps(info))

View File

@ -86,7 +86,7 @@ class TestSPKIData:
def testUsingTestData(self):
"""Test for spki_data_from_string() using test data"""
new_spki_data = ds.spki_data_from_string(ds_data.TEST_SPKI_DATA)
print new_spki_data
print(new_spki_data)
assert new_spki_data.spki_sexp[0].text.strip() == "spki sexp"
assert new_spki_data.spki_sexp[1].text.strip() == "spki sexp2"
@ -148,7 +148,7 @@ class TestX509Data:
def testAccessors(self):
"""Test for X509Data accessors"""
st = ds.x509_issuer_serial_from_string(ds_data.TEST_X509_ISSUER_SERIAL)
print st
print(st)
self.x509_data.x509_issuer_serial= st
self.x509_data.x509_ski = ds.X509SKI(text="x509 ski")
self.x509_data.x509_subject_name = ds.X509SubjectName(
@ -158,8 +158,8 @@ class TestX509Data:
self.x509_data.x509_crl = ds.X509CRL(text="x509 crl")
new_x509_data = ds.x509_data_from_string(self.x509_data.to_string())
print new_x509_data.keyswv()
print new_x509_data.__dict__.keys()
print(new_x509_data.keyswv())
print(new_x509_data.__dict__.keys())
assert new_x509_data.x509_issuer_serial
assert isinstance(new_x509_data.x509_issuer_serial, ds.X509IssuerSerial)
assert new_x509_data.x509_ski.text.strip() == "x509 ski"

View File

@ -45,7 +45,7 @@ data2 = """<?xml version='1.0' encoding='UTF-8'?>
def test_2():
ed = xenc.encrypted_data_from_string(data2)
assert ed
print ed
print(ed)
assert ed.type == "http://www.w3.org/2001/04/xmlenc#Element"
assert ed.encryption_method is not None
em = ed.encryption_method
@ -76,7 +76,7 @@ data3 = """<?xml version='1.0' encoding='UTF-8'?>
def test_3():
ed = xenc.encrypted_data_from_string(data3)
assert ed
print ed
print(ed)
assert ed.encryption_method != None
em = ed.encryption_method
assert em.algorithm == 'http://www.w3.org/2001/04/xmlenc#aes128-cbc'
@ -126,7 +126,7 @@ data4 = """<?xml version='1.0' encoding='UTF-8'?>
def test_4():
ek = xenc.encrypted_key_from_string(data4)
assert ek
print ek
print(ek)
assert ek.encryption_method != None
em = ek.encryption_method
assert em.algorithm == 'http://www.w3.org/2001/04/xmlenc#rsa-1_5'
@ -159,8 +159,8 @@ data5 = """<CipherReference URI="http://www.example.com/CipherValues.xml"
def test_5():
cr = xenc.cipher_reference_from_string(data5)
assert cr
print cr
print cr.keyswv()
print(cr)
print(cr.keyswv())
trs = cr.transforms
assert len(trs.transform) == 2
tr = trs.transform[0]
@ -189,7 +189,7 @@ data6 = """<ReferenceList xmlns="http://www.w3.org/2001/04/xmlenc#">
def test_6():
rl = xenc.reference_list_from_string(data6)
assert rl
print rl
print(rl)
assert len(rl.data_reference) == 1
dr = rl.data_reference[0]
assert dr.uri == "#invoice34"

View File

@ -40,7 +40,7 @@ class TestExtensionElement:
ee.loadd(ava)
del ava["tag"]
print ava
print(ava)
ee = saml2.ExtensionElement("")
raises(KeyError, "ee.loadd(ava)")
@ -144,7 +144,7 @@ class TestExtensionContainer:
}]
ees = [saml2.ExtensionElement("").loadd(a) for a in avas]
print ees
print(ees)
ec = saml2.ExtensionContainer(extension_elements=ees)
esl = ec.find_extensions(tag="tag2")
assert len(esl) == 1
@ -195,7 +195,7 @@ class TestSAMLBase:
}
foo = saml2.make_vals(ava, Issuer, part=True)
print foo
print(foo)
assert foo.format == NAMEID_FORMAT_EMAILADDRESS
assert foo.sp_name_qualifier == "loa"
assert foo.text == "free text"
@ -204,7 +204,7 @@ class TestSAMLBase:
ava = "free text"
foo = saml2.make_vals(ava, Issuer, part=True)
print foo
print(foo)
assert foo.keyswv() == ["text"]
assert foo.text == "free text"
@ -224,8 +224,8 @@ class TestSAMLBase:
txt = foo.to_string()
nsstr = foo.to_string({"saml": saml.NAMESPACE})
assert nsstr != txt
print txt
print nsstr
print(txt)
print(nsstr)
assert "saml:AttributeValue" in nsstr
assert "saml:AttributeValue" not in txt
@ -677,28 +677,28 @@ class TestAttribute:
def test_basic_str(self):
attribute = saml.attribute_from_string(BASIC_STR_AV)
print attribute
print(attribute)
assert attribute.attribute_value[0].text.strip() == "By-Tor"
def test_basic_int(self):
attribute = saml.attribute_from_string(BASIC_INT_AV)
print attribute
print(attribute)
assert attribute.attribute_value[0].text == "23"
def test_basic_base64(self):
attribute = saml.attribute_from_string(BASIC_BASE64_AV)
print attribute
print(attribute)
assert attribute.attribute_value[0].text == "VU5JTkVUVA=="
assert attribute.attribute_value[0].get_type() == "xs:base64Binary"
def test_basic_boolean_true(self):
attribute = saml.attribute_from_string(BASIC_BOOLEAN_TRUE_AV)
print attribute
print(attribute)
assert attribute.attribute_value[0].text.lower() == "true"
def test_basic_boolean_false(self):
attribute = saml.attribute_from_string(BASIC_BOOLEAN_FALSE_AV)
print attribute
print(attribute)
assert attribute.attribute_value[0].text.lower() == "false"
@ -1090,7 +1090,7 @@ class TestEvidence:
self.evidence.assertion.append(saml.Assertion())
self.evidence.encrypted_assertion.append(saml.EncryptedAssertion())
new_evidence = saml.evidence_from_string(self.evidence.to_string())
print new_evidence
print(new_evidence)
assert self.evidence.to_string() == new_evidence.to_string()
assert isinstance(new_evidence.assertion_id_ref[0],
saml.AssertionIDRef)

View File

@ -170,7 +170,7 @@ def test_ee_1():
ee = saml2.extension_element_from_string(
"""<?xml version='1.0' encoding='UTF-8'?><foo>bar</foo>""")
assert ee != None
print ee.__dict__
print(ee.__dict__)
assert ee.attributes == {}
assert ee.tag == "foo"
assert ee.namespace == None
@ -182,7 +182,7 @@ def test_ee_2():
ee = saml2.extension_element_from_string(
"""<?xml version='1.0' encoding='UTF-8'?><foo id="xyz">bar</foo>""")
assert ee != None
print ee.__dict__
print(ee.__dict__)
assert ee.attributes == {"id": "xyz"}
assert ee.tag == "foo"
assert ee.namespace == None
@ -196,7 +196,7 @@ def test_ee_3():
<foo xmlns="urn:mace:example.com:saml:ns"
id="xyz">bar</foo>""")
assert ee != None
print ee.__dict__
print(ee.__dict__)
assert ee.attributes == {"id": "xyz"}
assert ee.tag == "foo"
assert ee.namespace == "urn:mace:example.com:saml:ns"
@ -210,7 +210,7 @@ def test_ee_4():
<foo xmlns="urn:mace:example.com:saml:ns">
<id>xyz</id><bar>tre</bar></foo>""")
assert ee != None
print ee.__dict__
print(ee.__dict__)
assert ee.attributes == {}
assert ee.tag == "foo"
assert ee.namespace == "urn:mace:example.com:saml:ns"
@ -221,7 +221,7 @@ def test_ee_4():
ids = ee.find_children("id", "urn:mace:example.com:saml:ns")
assert ids != []
cid = ids[0]
print cid.__dict__
print(cid.__dict__)
assert cid.attributes == {}
assert cid.tag == "id"
assert cid.namespace == "urn:mace:example.com:saml:ns"
@ -241,7 +241,7 @@ def test_ee_5():
ee.children.append(ce)
assert ee != None
print ee.__dict__
print(ee.__dict__)
assert ee.attributes == {}
assert ee.tag == "foo"
assert ee.namespace == "urn:mace:example.com:saml:ns"
@ -249,7 +249,7 @@ def test_ee_5():
assert ee.text.strip() == "bar"
c = ee.children[0]
print c.__dict__
print(c.__dict__)
child = ee.find_children(namespace="urn:mace:example.com:saml:cu")
assert len(child) == 1
@ -259,7 +259,7 @@ def test_ee_5():
assert len(child) == 1
child = ee.find_children("edugain", "urn:mace:example.com:saml:cu")
assert len(child) == 0
print ee.to_string()
print(ee.to_string())
def test_ee_6():
@ -277,7 +277,7 @@ def test_ee_6():
pee = saml2._extension_element_from_element_tree(et)
assert pee != None
print pee.__dict__
print(pee.__dict__)
assert pee.attributes == {}
assert pee.tag == "foo"
assert pee.namespace == "urn:mace:example.com:saml:ns"
@ -285,7 +285,7 @@ def test_ee_6():
assert pee.text.strip() == "bar"
c = pee.children[0]
print c.__dict__
print(c.__dict__)
child = pee.find_children(namespace="urn:mace:example.com:saml:cu")
assert len(child) == 1
@ -295,7 +295,7 @@ def test_ee_6():
assert len(child) == 1
child = pee.find_children("edugain", "urn:mace:example.com:saml:cu")
assert len(child) == 0
print pee.to_string()
print(pee.to_string())
NAMEID_WITH_ATTRIBUTE_EXTENSION = """<?xml version="1.0" encoding="utf-8"?>
@ -312,7 +312,7 @@ NAMEID_WITH_ATTRIBUTE_EXTENSION = """<?xml version="1.0" encoding="utf-8"?>
def test_nameid_with_extension():
kl = create_class_from_xml_string(NameID, NAMEID_WITH_ATTRIBUTE_EXTENSION)
assert kl != None
print kl.__dict__
print(kl.__dict__)
assert kl.format == "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress"
assert kl.sp_provided_id == "sp provided id"
assert kl.text.strip() == "roland@example.com"
@ -346,7 +346,7 @@ def test_subject_confirmation_with_extension():
kl = create_class_from_xml_string(SubjectConfirmation,
SUBJECT_CONFIRMATION_WITH_MEMBER_EXTENSION)
assert kl != None
print kl.__dict__
print(kl.__dict__)
assert kl.extension_attributes == {}
assert kl.method == "urn:oasis:names:tc:SAML:2.0:cm:bearer"
name_id = kl.name_id
@ -376,8 +376,8 @@ def test_to_fro_string_1():
txt = kl.to_string()
cpy = create_class_from_xml_string(SubjectConfirmation, txt)
print kl.__dict__
print cpy.__dict__
print(kl.__dict__)
print(cpy.__dict__)
assert kl.text.strip() == cpy.text.strip()
assert _eq(kl.keyswv(), cpy.keyswv())
@ -405,7 +405,7 @@ def test_make_vals_list_of_strs():
def test_attribute_element_to_extension_element():
attr = create_class_from_xml_string(Attribute, saml2_data.TEST_ATTRIBUTE)
ee = saml2.element_to_extension_element(attr)
print ee.__dict__
print(ee.__dict__)
assert ee.tag == "Attribute"
assert ee.namespace == 'urn:oasis:names:tc:SAML:2.0:assertion'
assert _eq(ee.attributes.keys(), ['FriendlyName', 'Name', 'NameFormat'])
@ -436,7 +436,7 @@ def test_ee_7():
</ExternalEntityAttributeAuthority>
""")
print ee.__dict__
print(ee.__dict__)
assert len(ee.children) == 2
for child in ee.children:
assert child.namespace == "urn:oasis:names:tc:SAML:metadata:dynamicsaml"
@ -479,7 +479,7 @@ def test_extension_element_loadd():
}
ee = saml2.ExtensionElement(ava["tag"]).loadd(ava)
print ee.__dict__
print(ee.__dict__)
assert len(ee.children) == 2
for child in ee.children:
assert child.namespace == "urn:oasis:names:tc:SAML:metadata:dynamicsaml"
@ -529,7 +529,7 @@ def test_extensions_loadd():
extension = saml2.SamlBase()
extension.loadd(ava)
print extension.__dict__
print(extension.__dict__)
assert len(extension.extension_elements) == 1
ee = extension.extension_elements[0]
assert len(ee.children) == 2

View File

@ -82,7 +82,7 @@ class TestStatusCode:
self.status_code.value = samlp.STATUS_RESPONDER
self.status_code.status_code = samlp.StatusCode(
value=samlp.STATUS_REQUEST_DENIED)
print self.status_code.__dict__
print(self.status_code.__dict__)
new_status_code = samlp.status_code_from_string(self.status_code.to_string())
assert new_status_code.value == samlp.STATUS_RESPONDER
assert new_status_code.status_code.value == \

View File

@ -33,7 +33,7 @@ class TestEndpointType:
self.endpoint.binding = saml2.BINDING_HTTP_POST
self.endpoint.location = "http://www.example.com/endpoint"
self.endpoint.response_location = "http://www.example.com/response"
print self.endpoint.__class__.c_attributes.items()
print(self.endpoint.__class__.c_attributes.items())
new_endpoint = md.endpoint_type__from_string(self.endpoint.to_string())
assert new_endpoint.binding == saml2.BINDING_HTTP_POST
assert new_endpoint.location == "http://www.example.com/endpoint"
@ -111,7 +111,7 @@ class TestOrganizationName:
"""Test for organization_name_from_string() using test data."""
new_organization_name = md.organization_name_from_string(
md_data.TEST_ORGANIZATION_NAME)
print new_organization_name.keyswv()
print(new_organization_name.keyswv())
assert new_organization_name.lang == "se"
assert new_organization_name.text.strip() == "Catalogix"
@ -147,7 +147,7 @@ class TestOrganizationURL:
"""Test for OrganizationURL accessors"""
self.organization_url.lang = "ja"
self.organization_url.text = "http://www.example.com/"
print self.organization_url.to_string()
print(self.organization_url.to_string())
new_organization_url = md.organization_url_from_string(
self.organization_url.to_string())
assert new_organization_url.lang == "ja"
@ -953,10 +953,10 @@ class TestSPSSODescriptor:
self.sp_sso_descriptor.attribute_consuming_service.append(
md.AttributeConsumingService())
print self.sp_sso_descriptor
print(self.sp_sso_descriptor)
new_sp_sso_descriptor = md.spsso_descriptor_from_string(
self.sp_sso_descriptor.to_string())
print new_sp_sso_descriptor
print(new_sp_sso_descriptor)
assert new_sp_sso_descriptor.id == "ID"
assert new_sp_sso_descriptor.valid_until == "2008-09-14T01:05:02Z"
assert new_sp_sso_descriptor.cache_duration == "10:00:00:00"
@ -999,13 +999,13 @@ class TestSPSSODescriptor:
assert new_sp_sso_descriptor.error_url == "http://www.example.com/errorURL"
assert isinstance(new_sp_sso_descriptor.signature, ds.Signature)
assert isinstance(new_sp_sso_descriptor.extensions, md.Extensions)
print new_sp_sso_descriptor.extensions.__dict__
print(new_sp_sso_descriptor.extensions.__dict__)
assert len(new_sp_sso_descriptor.extensions.extension_elements) == 2
for eelem in new_sp_sso_descriptor.extensions.extension_elements:
print "EE",eelem.__dict__
print("EE",eelem.__dict__)
dp = extension_element_to_element(eelem, idpdisc.ELEMENT_FROM_STRING,
idpdisc.NAMESPACE)
print "DP",dp.c_tag, dp.c_namespace,dp.__dict__
print("DP",dp.c_tag, dp.c_namespace,dp.__dict__)
assert isinstance(dp, idpdisc.DiscoveryResponse)
assert isinstance(new_sp_sso_descriptor.key_descriptor[0],
md.KeyDescriptor)

View File

@ -33,13 +33,13 @@ def _eq(l1, l2):
def _oeq(l1, l2):
if len(l1) != len(l2):
print "Different number of items"
print("Different number of items")
return False
for item in l1:
if item not in l2:
print "%s not in l2" % (item,)
print("%s not in l2" % (item,))
for ite in l2:
print "\t%s" % (ite,)
print("\t%s" % (ite,))
return False
return True
@ -67,7 +67,7 @@ def test_error_status():
samlp.STATUS_RESPONDER)
status_text = "%s" % status
print status_text
print(status_text)
assert status_text == ERROR_STATUS
@ -75,14 +75,14 @@ def test_status_from_exception():
e = utils.UnknownPrincipal("Error resolving principal")
stat = utils.error_status_factory(e)
status_text = "%s" % stat
print status_text
print(status_text)
assert status_text == ERROR_STATUS
def test_attribute_sn():
attr = utils.do_attributes({"surName": ("Jeter", "")})
assert len(attr) == 1
print attr
print(attr)
inst = attr[0]
assert inst.name == "surName"
assert len(inst.attribute_value) == 1
@ -95,7 +95,7 @@ def test_attribute_age():
assert len(attr) == 1
inst = attr[0]
print inst
print(inst)
assert inst.name == "age"
assert len(inst.attribute_value) == 1
av = inst.attribute_value[0]
@ -108,7 +108,7 @@ def test_attribute_onoff():
assert len(attr) == 1
inst = attr[0]
print inst
print(inst)
assert inst.name == "onoff"
assert len(inst.attribute_value) == 1
av = inst.attribute_value[0]
@ -122,7 +122,7 @@ def test_attribute_base64():
assert len(attr) == 1
inst = attr[0]
print inst
print(inst)
assert inst.name == "name"
assert len(inst.attribute_value) == 1
av = inst.attribute_value[0]
@ -133,7 +133,7 @@ def test_attribute_base64():
def test_attribute_statement():
statement = do_attribute_statement({"surName": ("Jeter", ""),
"givenName": ("Derek", "")})
print statement
print(statement)
assert statement.keyswv() == ["attribute"]
assert len(statement.attribute) == 2
attr0 = statement.attribute[0]
@ -304,7 +304,7 @@ def test_parse_attribute_map():
assert _eq(forward.keys(), backward.values())
assert _eq(forward.values(), backward.keys())
print forward.keys()
print(forward.keys())
assert _oeq(forward.keys(), [
('urn:oid:1.3.6.1.4.1.5923.1.1.1.7', NAME_FORMAT_URI),
('urn:oid:0.9.2342.19200300.100.1.1', NAME_FORMAT_URI),

View File

@ -28,13 +28,13 @@ class TestAC():
self.acs = attribute_converter.ac_factory(full_path("attributemaps"))
def test_setup(self):
print self.acs
print(self.acs)
assert len(self.acs) == 3
assert _eq([a.name_format for a in self.acs],[BASIC_NF, URI_NF, SAML1] )
def test_ava_fro_1(self):
ats = saml.attribute_statement_from_string(STATEMENT1)
#print ats
#print(ats)
ava = None
for ac in self.acs:
@ -45,7 +45,7 @@ class TestAC():
# break if we have something
if ava:
break
print ava.keys()
print(ava.keys())
assert _eq(ava.keys(), ['givenName', 'displayName', 'uid',
'eduPersonNickname', 'street',
'eduPersonScopedAffiliation',
@ -56,12 +56,12 @@ class TestAC():
def test_ava_fro_2(self):
ats = saml.attribute_statement_from_string(STATEMENT2)
#print ats
#print(ats)
ava = {}
for ac in self.acs:
ava.update(ac.fro(ats))
print ava.keys()
print(ava.keys())
assert _eq(ava.keys(), ['eduPersonEntitlement', 'eduPersonAffiliation',
'uid', 'mail', 'givenName', 'sn'])
@ -146,7 +146,7 @@ class TestAC():
#
# result = attribute_converter.ava_fro(self.acs, attr)
#
# print result
# print(result)
# assert result == {'givenName': [], 'sn': [], 'title': []}
def test_to_local_name_from_basic(self):
@ -197,7 +197,7 @@ def test_noop_attribute_conversion():
aconv = AttributeConverterNOOP(URI_NF)
res = aconv.to_(ava)
print res
print(res)
assert len(res) == 2
for attr in res:
assert len(attr.attribute_value) == 1

View File

@ -240,7 +240,7 @@ def test_filter_attribute_value_assertions_0(AVA):
ava = filter_attribute_value_assertions(AVA[3].copy(),
p.get_attribute_restrictions(""))
print ava
print(ava)
assert ava.keys() == ["surName"]
assert ava["surName"] == ["Hedberg"]
@ -258,7 +258,7 @@ def test_filter_attribute_value_assertions_1(AVA):
ava = filter_attribute_value_assertions(AVA[0].copy(),
p.get_attribute_restrictions(""))
print ava
print(ava)
assert _eq(ava.keys(), ["givenName", "surName"])
assert ava["surName"] == ["Jeter"]
assert ava["givenName"] == ["Derek"]
@ -266,7 +266,7 @@ def test_filter_attribute_value_assertions_1(AVA):
ava = filter_attribute_value_assertions(AVA[1].copy(),
p.get_attribute_restrictions(""))
print ava
print(ava)
assert _eq(ava.keys(), ["surName"])
assert ava["surName"] == ["Howard"]
@ -283,20 +283,20 @@ def test_filter_attribute_value_assertions_2(AVA):
ava = filter_attribute_value_assertions(AVA[0].copy(),
p.get_attribute_restrictions(""))
print ava
print(ava)
assert _eq(ava.keys(), [])
ava = filter_attribute_value_assertions(AVA[1].copy(),
p.get_attribute_restrictions(""))
print ava
print(ava)
assert _eq(ava.keys(), ["givenName"])
assert ava["givenName"] == ["Ryan"]
ava = filter_attribute_value_assertions(AVA[3].copy(),
p.get_attribute_restrictions(""))
print ava
print(ava)
assert _eq(ava.keys(), ["givenName"])
assert ava["givenName"] == ["Roland"]
@ -307,8 +307,8 @@ def test_filter_attribute_value_assertions_2(AVA):
def test_assertion_1(AVA):
ava = Assertion(AVA[0])
print ava
print ava.__dict__
print(ava)
print(ava.__dict__)
policy = Policy({
"default": {
@ -320,7 +320,7 @@ def test_assertion_1(AVA):
ava = ava.apply_policy("", policy)
print ava
print(ava)
assert _eq(ava.keys(), [])
ava = Assertion(AVA[1].copy())
@ -530,7 +530,7 @@ def test_filter_values_req_opt_4():
"eduPersonAffiliation": ["staff"], "uid": ["rohe0002"]}
ava = assertion.filter_on_demands(ava, rava, oava)
print ava
print(ava)
assert _eq(ava.keys(), ['givenName', 'sn'])
assert ava == {'givenName': ['Roland'], 'sn': ['Hedberg']}
@ -772,7 +772,7 @@ def test_assertion_with_noop_attribute_conv():
policy, issuer=issuer, authn_decl=ACD,
authn_auth="authn_authn")
print msg
print(msg)
for attr in msg.attribute_statement[0].attribute:
assert attr.name_format == NAME_FORMAT_URI
assert len(attr.attribute_value) == 1
@ -820,7 +820,7 @@ def test_assertion_with_zero_attributes():
policy, issuer=issuer, authn_decl=ACD,
authn_auth="authn_authn")
print msg
print(msg)
assert msg.attribute_statement == []
@ -842,7 +842,7 @@ def test_assertion_with_authn_instant():
authn_auth="authn_authn",
authn_instant=1234567890)
print msg
print(msg)
assert msg.authn_statement[0].authn_instant == "2009-02-13T23:31:30Z"

View File

@ -43,7 +43,7 @@ def test_construct_contact():
"__class__": _class(md.EmailAddress)}],
}, ONTS)
print c
print(c)
assert c.given_name.text == "Roland"
assert c.sur_name.text == "Hedberg"
assert c.email_address[0].text == "roland@catalogix.se"

View File

@ -148,10 +148,10 @@ def test_incommon_1():
mds.imp(METADATACONF["2"])
print mds.entities()
print(mds.entities())
assert mds.entities() > 1700
idps = mds.with_descriptor("idpsso")
print idps.keys()
print(idps.keys())
assert len(idps) > 300 # ~ 18%
try:
_ = mds.single_sign_on_service('urn:mace:incommon:uiuc.edu')
@ -160,7 +160,7 @@ def test_incommon_1():
idpsso = mds.single_sign_on_service('urn:mace:incommon:alaska.edu')
assert len(idpsso) == 1
print idpsso
print(idpsso)
assert destinations(idpsso) == [
'https://idp.alaska.edu/idp/profile/SAML2/Redirect/SSO']
@ -176,7 +176,7 @@ def test_incommon_1():
# Look for attribute authorities
aas = mds.with_descriptor("attribute_authority")
print aas.keys()
print(aas.keys())
assert len(aas) == 180
@ -216,24 +216,24 @@ def test_switch_1():
mds.imp(METADATACONF["5"])
assert len(mds.keys()) > 160
idps = mds.with_descriptor("idpsso")
print idps.keys()
print(idps.keys())
idpsso = mds.single_sign_on_service(
'https://aai-demo-idp.switch.ch/idp/shibboleth')
assert len(idpsso) == 1
print idpsso
print(idpsso)
assert destinations(idpsso) == [
'https://aai-demo-idp.switch.ch/idp/profile/SAML2/Redirect/SSO']
assert len(idps) > 30
aas = mds.with_descriptor("attribute_authority")
print aas.keys()
print(aas.keys())
aad = aas['https://aai-demo-idp.switch.ch/idp/shibboleth']
print aad.keys()
print(aad.keys())
assert len(aad["attribute_authority_descriptor"]) == 1
assert len(aad["idpsso_descriptor"]) == 1
sps = mds.with_descriptor("spsso")
dual = [eid for eid, ent in idps.items() if eid in sps]
print len(dual)
print(len(dual))
assert len(dual) == 0
@ -243,7 +243,7 @@ def test_metadata_file():
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["8"])
print len(mds.keys())
print(len(mds.keys()))
assert len(mds.keys()) == 560
@ -280,7 +280,7 @@ def test_load_local_dir():
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["9"])
print mds
print(mds)
assert len(mds) == 3 # Three sources
assert len(mds.keys()) == 4 # number of idps
@ -291,7 +291,7 @@ def test_load_extern_incommon():
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["10"])
print mds
print(mds)
assert mds
assert len(mds.keys())

View File

@ -134,10 +134,10 @@ def test_incommon_1():
mds.imp(METADATACONF["2"])
print mds.entities()
print(mds.entities())
assert mds.entities() > 1700
idps = mds.with_descriptor("idpsso")
print idps.keys()
print(idps.keys())
assert len(idps) > 300 # ~ 18%
try:
_ = mds.single_sign_on_service('urn:mace:incommon:uiuc.edu')
@ -146,7 +146,7 @@ def test_incommon_1():
idpsso = mds.single_sign_on_service('urn:mace:incommon:alaska.edu')
assert len(idpsso) == 1
print idpsso
print(idpsso)
assert destinations(idpsso) == [
'https://idp.alaska.edu/idp/profile/SAML2/Redirect/SSO']
@ -162,7 +162,7 @@ def test_incommon_1():
# Look for attribute authorities
aas = mds.with_descriptor("attribute_authority")
print aas.keys()
print(aas.keys())
assert len(aas) == 180
@ -202,24 +202,24 @@ def test_switch_1():
mds.imp(METADATACONF["5"])
assert len(mds.keys()) > 160
idps = mds.with_descriptor("idpsso")
print idps.keys()
print(idps.keys())
idpsso = mds.single_sign_on_service(
'https://aai-demo-idp.switch.ch/idp/shibboleth')
assert len(idpsso) == 1
print idpsso
print(idpsso)
assert destinations(idpsso) == [
'https://aai-demo-idp.switch.ch/idp/profile/SAML2/Redirect/SSO']
assert len(idps) > 30
aas = mds.with_descriptor("attribute_authority")
print aas.keys()
print(aas.keys())
aad = aas['https://aai-demo-idp.switch.ch/idp/shibboleth']
print aad.keys()
print(aad.keys())
assert len(aad["attribute_authority_descriptor"]) == 1
assert len(aad["idpsso_descriptor"]) == 1
sps = mds.with_descriptor("spsso")
dual = [eid for eid, ent in idps.items() if eid in sps]
print len(dual)
print(len(dual))
assert len(dual) == 0
@ -229,7 +229,7 @@ def test_metadata_file():
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["8"])
print len(mds.keys())
print(len(mds.keys()))
assert len(mds.keys()) == 560
@ -265,7 +265,7 @@ def test_load_local_dir():
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["9"])
print mds
print(mds)
assert len(mds) == 3 # Three sources
assert len(mds.keys()) == 4 # number of idps
@ -276,7 +276,7 @@ def test_load_external():
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["10"])
print mds
print(mds)
assert len(mds) == 1 # One source
assert len(mds.keys()) > 1 # number of idps

View File

@ -172,7 +172,7 @@ def _eq(l1, l2):
def test_1():
c = SPConfig().load(sp1)
c.context = "sp"
print c
print(c)
assert c._sp_endpoints
assert c._sp_name
assert c._sp_idp
@ -193,7 +193,7 @@ def test_2():
c = SPConfig().load(sp2)
c.context = "sp"
print c
print(c)
assert c._sp_endpoints
assert c.getattr("endpoints", "sp")
assert c._sp_idp
@ -235,7 +235,7 @@ def test_idp_1():
c = IdPConfig().load(IDP1)
c.context = "idp"
print c
print(c)
assert c.endpoint("single_sign_on_service")[0] == 'http://localhost:8088/'
attribute_restrictions = c.getattr("policy",
@ -247,7 +247,7 @@ def test_idp_2():
c = IdPConfig().load(IDP2)
c.context = "idp"
print c
print(c)
assert c.endpoint("single_logout_service",
BINDING_SOAP) == []
assert c.endpoint("single_logout_service",
@ -293,7 +293,7 @@ def test_conf_syslog():
root_logger.level = logging.NOTSET
root_logger.handlers = []
print c.logger
print(c.logger)
c.setup_logger()
assert root_logger.level != logging.NOTSET
@ -302,7 +302,7 @@ def test_conf_syslog():
assert isinstance(root_logger.handlers[0],
logging.handlers.SysLogHandler)
handler = root_logger.handlers[0]
print handler.__dict__
print(handler.__dict__)
assert handler.facility == "local3"
assert handler.address == ('localhost', 514)
if sys.version >= (2, 7):

View File

@ -114,12 +114,12 @@ class TestIdentifier():
})
name_id_policy = samlp.name_id_policy_from_string(NAME_ID_POLICY_1)
print name_id_policy
print(name_id_policy)
nameid = self.id.construct_nameid("foobar", policy,
'http://vo.example.org/biomed',
name_id_policy)
print nameid
print(nameid)
assert _eq(nameid.keyswv(), ['text', 'sp_name_qualifier', 'format',
'name_qualifier'])
assert nameid.sp_name_qualifier == 'http://vo.example.org/biomed'
@ -155,7 +155,7 @@ class TestIdentifier():
sp_id = "urn:mace:umu.se:sp"
nameid = self.id.persistent_nameid("abcd0001", sp_id)
remote_id = nameid.text.strip()
print remote_id
print(remote_id)
local = self.id.find_local_id(nameid)
assert local == "abcd0001"
@ -167,7 +167,7 @@ class TestIdentifier():
sp_id = "urn:mace:umu.se:sp"
nameid = self.id.transient_nameid("abcd0001", sp_id)
remote_id = nameid.text.strip()
print remote_id
print(remote_id)
local = self.id.find_local_id(nameid)
assert local == "abcd0001"

View File

@ -30,7 +30,7 @@ class TestMongoDBCache():
info = self.cache.get("1234", "abcd")
#{u'issuer': u'', u'came from': u'', u'ava': {u'givenName': [u'Derek']}, u'session_id': -1, u'not_on_or_after': 0}
ava = info["ava"]
print ava
print(ava)
assert ava.keys() == ["givenName"]
assert ava["givenName"] == ["Derek"]
@ -53,7 +53,7 @@ class TestMongoDBCache():
self.cache.delete("1234")
info = self.cache.get("1234", "abcd")
print info
print(info)
assert info == {}
def test_subjects(self):
@ -75,7 +75,7 @@ class TestMongoDBCache():
self.cache.set("1234", "xyzv", session_info, not_on_or_after)
(ident, _) = self.cache.get_identity("1234")
print ident
print(ident)
assert len(ident.keys()) == 2
assert "givenName" in ident.keys()
assert "mail" in ident.keys()
@ -87,5 +87,5 @@ class TestMongoDBCache():
self.cache.delete("1234")
info = self.cache.get("1234", "xyzv")
print info
print(info)
assert info == {}

View File

@ -160,7 +160,7 @@ def test_idp_policy_filter():
policy = idp.config.getattr("policy", "idp")
ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", idp.metadata)
print ava
print(ava)
assert ava.keys() == ["eduPersonTargetedID"] # because no entity category
if __name__ == "__main__":

View File

@ -70,7 +70,7 @@ def test_cert_from_instance_1():
assertion = response.assertion[0]
certs = sigver.cert_from_instance(assertion)
assert len(certs) == 1
print certs[0]
print(certs[0])
assert certs[0] == CERT1
@ -82,7 +82,7 @@ def test_cert_from_instance_ssp():
assert len(certs) == 1
assert certs[0] == CERT_SSP
der = base64.b64decode(certs[0])
print str(decoder.decode(der)).replace('.', "\n.")
print(str(decoder.decode(der)).replace('.', "\n."))
assert decoder.decode(der)
@ -145,18 +145,18 @@ class TestSecurity():
def test_sign_assertion(self):
ass = self._assertion
print ass
print(ass)
sign_ass = self.sec.sign_assertion("%s" % ass, node_id=ass.id)
#print sign_ass
#print(sign_ass)
sass = saml.assertion_from_string(sign_ass)
#print sass
#print(sass)
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
'version', 'signature', 'id'])
assert sass.version == "2.0"
assert sass.id == "11111"
assert time_util.str_to_time(sass.issue_instant)
print "Crypto version : %s" % (self.sec.crypto.version())
print("Crypto version : %s" % (self.sec.crypto.version()))
item = self.sec.check_signature(sass, class_name(sass), sign_ass)
@ -176,7 +176,7 @@ class TestSecurity():
assert sass.id == "11111"
assert time_util.str_to_time(sass.issue_instant)
print "Crypto version : %s" % (self.sec.crypto.version())
print("Crypto version : %s" % (self.sec.crypto.version()))
item = self.sec.check_signature(sass, class_name(sass),
sign_ass, must=True)
@ -234,11 +234,11 @@ class TestSecurity():
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
assert s_response is not None
print s_response
print(s_response)
response = response_from_string(s_response)
sass = response.assertion[0]
print sass
print(sass)
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
'version', 'signature', 'id'])
assert sass.version == "2.0"
@ -301,11 +301,11 @@ class TestSecurity():
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
print s_response
print(s_response)
res = self.sec.verify_signature("%s" % s_response,
node_name=class_name(samlp.Response()))
print res
print(res)
assert res
def test_sign_verify_with_cert_from_instance(self):
@ -353,7 +353,7 @@ class TestSecurity():
to_sign = [(class_name(assertion), assertion.id)]
s_assertion = sigver.signed_instance_factory(assertion, self.sec,
to_sign)
print s_assertion
print(s_assertion)
ass = assertion_from_string(s_assertion)
ci = "".join(sigver.cert_from_instance(ass)[0].split())
assert ci == self.sec.my_cert
@ -470,7 +470,7 @@ def test_xbox():
if _txt:
assertions.append(ass)
print assertions
print(assertions)
def test_xmlsec_err():

View File

@ -127,7 +127,7 @@ class TestResponse:
resp.parse_assertion()
si = resp.session_info()
assert si
print si["ava"]
print(si["ava"])
if __name__ == "__main__":

View File

@ -26,7 +26,7 @@ AUTHN = {
def test_pre_enc():
tmpl = pre_encryption_part()
print tmpl
print(tmpl)
assert "%s" % tmpl == TMPL
@ -41,7 +41,7 @@ def test_reshuffle_response():
resp2 = pre_encrypt_assertion(resp_)
print resp2
print(resp2)
assert resp2.encrypted_assertion.extension_elements
@ -75,7 +75,7 @@ def test_enc1():
(_stdout, _stderr, output) = crypto._run_xmlsec(
com_list, [tmpl], exception=EncryptError, validate_output=False)
print output
print(output)
assert _stderr == ""
assert _stdout == ""
@ -94,7 +94,7 @@ def test_enc2():
enc_resp = crypto.encrypt_assertion(resp_, full_path("pubkey.pem"),
pre_encryption_part())
print enc_resp
print(enc_resp)
assert enc_resp
if __name__ == "__main__":

View File

@ -63,13 +63,13 @@ class TestAuthnResponse:
def test_verify_1(self):
xml_response = "%s" % (self._resp_,)
print xml_response
print(xml_response)
self.ar.outstanding_queries = {"id12": "http://localhost:8088/sso"}
self.ar.timeslack = 10000
self.ar.loads(xml_response, decode=False)
self.ar.verify()
print self.ar.__dict__
print(self.ar.__dict__)
assert self.ar.came_from == 'http://localhost:8088/sso'
assert self.ar.session_id() == "id12"
assert self.ar.ava["givenName"] == IDENTITY["givenName"]
@ -78,14 +78,14 @@ class TestAuthnResponse:
def test_verify_signed_1(self):
xml_response = self._sign_resp_
print xml_response
print(xml_response)
self.ar.outstanding_queries = {"id12": "http://localhost:8088/sso"}
self.ar.timeslack = 10000
self.ar.loads(xml_response, decode=False)
self.ar.verify()
print self.ar.__dict__
print(self.ar.__dict__)
assert self.ar.came_from == 'http://localhost:8088/sso'
assert self.ar.session_id() == "id12"
assert self.ar.ava["sn"] == IDENTITY["surName"]
@ -103,7 +103,7 @@ class TestAuthnResponse:
self.ar.loads(xml_response, decode=False)
self.ar.verify()
print self.ar.__dict__
print(self.ar.__dict__)
assert self.ar.came_from == 'http://localhost:8088/foo'
assert self.ar.session_id() == ID
assert self.ar.name_id
@ -117,7 +117,7 @@ class TestAuthnResponse:
self.ar.loads(xml_response, decode=False)
self.ar.verify()
print self.ar.assertion
print(self.ar.assertion)
assert len(self.ar.assertion.authn_statement) == 1
authn_info = self.ar.authn_info()
assert len(authn_info) == 1

View File

@ -160,7 +160,7 @@ class TestServer1():
issuer=self.server._issuer(),
)
print response.keyswv()
print(response.keyswv())
assert _eq(response.keyswv(), ['destination', 'assertion', 'status',
'in_response_to', 'issue_instant',
'version', 'issuer', 'id'])
@ -170,7 +170,7 @@ class TestServer1():
assert response.in_response_to == "_012345"
#
status = response.status
print status
print(status)
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_parse_faulty_request(self):
@ -182,7 +182,7 @@ class TestServer1():
htargs = self.client.apply_binding(
binding, "%s" % authn_request, "http://www.example.com", "abcd")
_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
print _dict
print(_dict)
raises(OtherError, self.server.parse_authn_request,
_dict["SAMLRequest"][0], binding)
@ -194,17 +194,17 @@ class TestServer1():
htargs = self.client.apply_binding(binding, "%s" % authn_request,
"http://www.example.com", "abcd")
_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
print _dict
print(_dict)
try:
self.server.parse_authn_request(_dict["SAMLRequest"][0], binding)
status = None
except OtherError, oe:
print oe.args
except OtherError as oe:
print(oe.args)
status = s_utils.error_status_factory(oe)
assert status
print status
print(status)
assert _eq(status.keyswv(), ["status_code", "status_message"])
assert status.status_message.text == 'Not destined for me!'
status_code = status.status_code
@ -216,16 +216,16 @@ class TestServer1():
req_id, authn_request = self.client.create_authn_request(
message_id="id1", destination="http://localhost:8088/sso")
print authn_request
print(authn_request)
binding = BINDING_HTTP_REDIRECT
htargs = self.client.apply_binding(binding, "%s" % authn_request,
"http://www.example.com", "abcd")
_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
print _dict
print(_dict)
req = self.server.parse_authn_request(_dict["SAMLRequest"][0], binding)
# returns a dictionary
print req
print(req)
resp_args = self.server.response_args(req.message, [BINDING_HTTP_POST])
assert resp_args["destination"] == "http://lingon.catalogix.se:8087/"
assert resp_args["in_response_to"] == "id1"
@ -253,7 +253,7 @@ class TestServer1():
authn=AUTHN
)
print resp.keyswv()
print(resp.keyswv())
assert _eq(resp.keyswv(), ['status', 'destination', 'assertion',
'in_response_to', 'issue_instant',
'version', 'id', 'issuer'])
@ -264,12 +264,12 @@ class TestServer1():
assert resp.assertion
assert resp.assertion
assertion = resp.assertion
print assertion
print(assertion)
assert assertion.authn_statement
assert assertion.conditions
assert assertion.attribute_statement
attribute_statement = assertion.attribute_statement
print attribute_statement
print(attribute_statement)
assert len(attribute_statement[0].attribute) == 4
# Pick out one attribute
attr = None
@ -286,8 +286,8 @@ class TestServer1():
assert assertion.subject.name_id
assert assertion.subject.subject_confirmation
confirmation = assertion.subject.subject_confirmation[0]
print confirmation.keyswv()
print confirmation.subject_confirmation_data
print(confirmation.keyswv())
print(confirmation.subject_confirmation_data)
assert confirmation.subject_confirmation_data.in_response_to == "id12"
def test_sso_response_without_identity(self):
@ -302,7 +302,7 @@ class TestServer1():
best_effort=True
)
print resp.keyswv()
print(resp.keyswv())
assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to',
'issue_instant', 'version', 'id', 'issuer',
'assertion'])
@ -327,7 +327,7 @@ class TestServer1():
best_effort=True
)
print resp.keyswv()
print(resp.keyswv())
assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to',
'issue_instant', 'version', 'id', 'issuer',
'assertion'])
@ -339,13 +339,13 @@ class TestServer1():
resp = self.server.create_error_response(
"id12", "http://localhost:8087/", exc)
print resp.keyswv()
print(resp.keyswv())
assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to',
'issue_instant', 'version', 'id', 'issuer'])
assert resp.destination == "http://localhost:8087/"
assert resp.in_response_to == "id12"
assert resp.status
print resp.status
print(resp.status)
assert resp.status.status_code.value == samlp.STATUS_RESPONDER
assert resp.status.status_code.status_code.value == \
samlp.STATUS_REQUEST_UNSUPPORTED
@ -370,11 +370,11 @@ class TestServer1():
"foba0001@example.com", authn=AUTHN)
response = samlp.response_from_string(resp_str)
print response.keyswv()
print(response.keyswv())
assert _eq(response.keyswv(), ['status', 'destination', 'assertion',
'in_response_to', 'issue_instant',
'version', 'issuer', 'id'])
print response.assertion[0].keyswv()
print(response.assertion[0].keyswv())
assert len(response.assertion) == 1
assert _eq(response.assertion[0].keyswv(), ['attribute_statement',
'issue_instant', 'version',
@ -384,7 +384,7 @@ class TestServer1():
assertion = response.assertion[0]
assert len(assertion.attribute_statement) == 1
astate = assertion.attribute_statement[0]
print astate
print(astate)
assert len(astate.attribute) == 4
def test_signed_response(self):
@ -402,7 +402,7 @@ class TestServer1():
sign_assertion=True
)
print signed_resp
print(signed_resp)
assert signed_resp
sresponse = response_from_string(signed_resp)
@ -832,7 +832,7 @@ class TestServer2():
def test_do_attribute_reponse(self):
aa_policy = self.server.config.getattr("policy", "idp")
print aa_policy.__dict__
print(aa_policy.__dict__)
response = self.server.create_attribute_response(
IDENTITY.copy(), "aaa", "http://example.com/sp/",
"urn:mace:example.com:sp:1")
@ -881,7 +881,7 @@ class TestServerLogout():
def test_1(self):
with closing(Server("idp_slo_redirect_conf")) as server:
req_id, request = _logout_request("sp_slo_redirect_conf")
print request
print(request)
bindings = [BINDING_HTTP_REDIRECT]
response = server.create_logout_response(request, bindings)
binding, destination = server.pick_binding("single_logout_service",

View File

@ -85,10 +85,10 @@ def _leq(l1, l2):
# client = Saml2Client({})
# (ava, name_id, real_uri) = \
# client.do_response(response, "xenosmilus.umdc.umu.se")
# print 40*"="
# print ava
# print 40*","
# print name_id
# print(40*"=")
# print(ava)
# print(40*",")
# print(name_id)
# assert False
REQ1 = {"1.2.14": """<?xml version='1.0' encoding='UTF-8'?>
@ -150,7 +150,7 @@ class TestClient:
attrq = samlp.attribute_query_from_string(reqstr)
print attrq.keyswv()
print(attrq.keyswv())
assert _leq(attrq.keyswv(), ['destination', 'subject', 'issue_instant',
'version', 'id', 'issuer'])
@ -179,7 +179,7 @@ class TestClient:
format=saml.NAMEID_FORMAT_PERSISTENT,
message_id="id1")
print req.to_string()
print(req.to_string())
assert req.destination == "https://idp.example.com/idp/"
assert req.id == "id1"
assert req.version == "2.0"
@ -229,7 +229,7 @@ class TestClient:
"http://www.example.com/sso", message_id="id1")[1]
ar = samlp.authn_request_from_string(ar_str)
print ar
print(ar)
assert ar.assertion_consumer_service_url == ("http://lingon.catalogix"
".se:8087/")
assert ar.destination == "http://www.example.com/sso"
@ -252,7 +252,7 @@ class TestClient:
message_id="666")[1]
ar = samlp.authn_request_from_string(ar_str)
print ar
print(ar)
assert ar.id == "666"
assert ar.assertion_consumer_service_url == "http://lingon.catalogix" \
".se:8087/"
@ -267,7 +267,7 @@ class TestClient:
assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek"
def test_sign_auth_request_0(self):
#print self.client.config
#print(self.client.config)
req_id, areq = self.client.create_authn_request(
"http://www.example.com/sso", sign=True, message_id="id1")
@ -279,11 +279,11 @@ class TestClient:
assert ar.signature
assert ar.signature.signature_value
signed_info = ar.signature.signed_info
#print signed_info
#print(signed_info)
assert len(signed_info.reference) == 1
assert signed_info.reference[0].uri == "#id1"
assert signed_info.reference[0].digest_value
print "------------------------------------------------"
print("------------------------------------------------")
try:
assert self.client.sec.correctly_signed_authn_request(
ar_str, self.client.config.xmlsec_binary,
@ -322,7 +322,7 @@ class TestClient:
assert authn_response.response.assertion[0].issuer.text == IDP
session_info = authn_response.session_info()
print session_info
print(session_info)
assert session_info["ava"] == {'mail': ['derek@nyy.mlb.com'],
'givenName': ['Derek'],
'sn': ['Jeter'],
@ -335,7 +335,7 @@ class TestClient:
# One person in the cache
assert len(self.client.users.subjects()) == 1
subject_id = self.client.users.subjects()[0]
print "||||", self.client.users.get_info_from(subject_id, IDP)
print("||||", self.client.users.get_info_from(subject_id, IDP))
# The information I have about the subject comes from one source
assert self.client.users.issuers_of_info(subject_id) == [IDP]
@ -364,19 +364,19 @@ class TestClient:
issuers = [self.client.users.issuers_of_info(s) for s in
self.client.users.subjects()]
# The information I have about the subjects comes from the same source
print issuers
print(issuers)
assert issuers == [[IDP], [IDP]]
def test_init_values(self):
entityid = self.client.config.entityid
print entityid
print(entityid)
assert entityid == "urn:mace:example.com:saml:roland:sp"
print self.client.metadata.with_descriptor("idpsso")
print(self.client.metadata.with_descriptor("idpsso"))
location = self.client._sso_location()
print location
print(location)
assert location == 'http://localhost:8088/sso'
my_name = self.client._my_name()
print my_name
print(my_name)
assert my_name == "urn:mace:example.com:saml:roland:sp"
def test_sign_then_encrypt_assertion(self):
@ -440,7 +440,7 @@ class TestClient:
seresp.assertion = resp_ass
seresp.encrypted_assertion = None
#print _sresp
#print(_sresp)
assert seresp.assertion
@ -600,7 +600,7 @@ class TestClient:
res = self.server.parse_authn_request(qs["SAMLRequest"][0],
BINDING_HTTP_REDIRECT)
print res
print(res)
# Below can only be done with dummy Server
IDP = "urn:mace:example.com:saml:roland:idp"
@ -679,7 +679,7 @@ class TestClientWithDummy():
entity_ids = self.client.users.issuers_of_info(nid)
assert entity_ids == ["urn:mace:example.com:saml:roland:idp"]
resp = self.client.global_logout(nid, "Tired", in_a_while(minutes=5))
print resp
print(resp)
assert resp
assert len(resp) == 1
assert resp.keys() == entity_ids
@ -709,7 +709,7 @@ class TestClientWithDummy():
'application/x-www-form-urlencoded')]
response = self.client.send(**http_args)
print response.text
print(response.text)
_dic = unpack_form(response.text[3], "SAMLResponse")
resp = self.client.parse_authn_request_response(_dic["SAMLResponse"],
BINDING_HTTP_POST,
@ -744,7 +744,7 @@ class TestClientWithDummy():
'application/x-www-form-urlencoded')]
response = self.client.send(**http_args)
print response.text
print(response.text)
_dic = unpack_form(response.text[3], "SAMLResponse")
resp = self.client.parse_authn_request_response(_dic["SAMLResponse"],
BINDING_HTTP_POST,

View File

@ -39,18 +39,18 @@ class TestVirtualOrg():
def test_mta(self):
aas = self.vo.members_to_ask(nid)
print aas
print(aas)
assert len(aas) == 1
assert 'urn:mace:example.com:saml:aa' in aas
def test_unknown_subject(self):
aas = self.vo.members_to_ask(nid0)
print aas
print(aas)
assert len(aas) == 2
def test_id(self):
cid = self.vo.get_common_identifier(nid)
print cid
print(cid)
assert cid == "deje0001"
def test_id_unknown(self):
@ -68,18 +68,18 @@ class TestVirtualOrg_2():
def test_mta(self):
aas = self.sp.vorg.members_to_ask(nid)
print aas
print(aas)
assert len(aas) == 1
assert 'urn:mace:example.com:saml:aa' in aas
def test_unknown_subject(self):
aas = self.sp.vorg.members_to_ask(nid0)
print aas
print(aas)
assert len(aas) == 2
def test_id(self):
cid = self.sp.vorg.get_common_identifier(nid)
print cid
print(cid)
assert cid == "deje0001"
def test_id_unknown(self):

View File

@ -122,7 +122,7 @@ def test_complete_flow():
ht_args = client.use_soap(idp_response, cargs["rc_url"],
[cargs["relay_state"]])
print ht_args
print(ht_args)
# ------------ @SP -----------------------------
@ -139,7 +139,7 @@ def test_complete_flow():
resp = sp.parse_authn_request_response(respdict["body"], None, {sid: "/"})
print resp.response
print(resp.response)
assert resp.response.destination == "http://lingon.catalogix.se:8087/paos"
assert resp.response.status.status_code.value == STATUS_SUCCESS

View File

@ -85,7 +85,7 @@ def test_create_artifact_resolve():
msg_id, msg = idp.create_artifact_resolve(b64art, destination, sid())
print msg
print(msg)
args = idp.use_soap(msg, destination, None, False)
@ -93,7 +93,7 @@ def test_create_artifact_resolve():
ar = sp.parse_artifact_resolve(args["data"])
print ar
print(ar)
assert ar.artifact.text == b64art
@ -177,7 +177,7 @@ def test_artifact_flow():
authn=AUTHN,
**resp_args)
print response
print(response)
# with the response in hand create an artifact
@ -201,7 +201,7 @@ def test_artifact_flow():
# Got an artifact want to replace it with the real message
msg_id, msg = sp.create_artifact_resolve(artifact3, destination, sid())
print msg
print(msg)
hinfo = sp.use_soap(msg, destination, None, False)
@ -211,7 +211,7 @@ def test_artifact_flow():
ar = idp.parse_artifact_resolve(msg)
print ar
print(ar)
assert ar.artifact.text == artifact3

View File

@ -55,7 +55,7 @@ def test_basic():
_id, aq = sp.create_authn_query(subject, destination, authn_context)
print aq
print(aq)
assert isinstance(aq, AuthnQuery)
@ -104,7 +104,7 @@ def test_flow():
aq_id, aq = sp.create_authn_query(subject, destination, authn_context)
print aq
print(aq)
assert isinstance(aq, AuthnQuery)
binding = BINDING_SOAP
@ -123,7 +123,7 @@ def test_flow():
p_res = idp.create_authn_query_response(msg.subject, msg.session_index,
msg.requested_authn_context)
print p_res
print(p_res)
hinfo = idp.apply_binding(binding, "%s" % p_res, "", "state2",
response=True)
@ -134,7 +134,7 @@ def test_flow():
final = sp.parse_authn_query_response(xmlstr, binding)
print final
print(final)
assert final.response.id == p_res.id

View File

@ -24,7 +24,7 @@ def test_base_request():
mid, nmr = sp.create_name_id_mapping_request(policy, nameid, destination)
print nmr
print(nmr)
assert isinstance(nmr, NameIDMappingRequest)
@ -44,7 +44,7 @@ def test_request_response():
mid, nmr = sp.create_name_id_mapping_request(policy, nameid, destination)
print nmr
print(nmr)
args = sp.use_soap(nmr, destination)
@ -58,7 +58,7 @@ def test_request_response():
idp_response = idp.create_name_id_mapping_response(
name_id, in_response_to=in_response_to)
print idp_response
print(idp_response)
ht_args = sp.use_soap(idp_response)
@ -66,7 +66,7 @@ def test_request_response():
_resp = sp.parse_name_id_mapping_request_response(ht_args["data"], binding)
print _resp.response
print(_resp.response)
r_name_id = _resp.response.name_id

View File

@ -21,14 +21,14 @@ def test_basic():
mid, mreq = sp.create_manage_name_id_request(destination, name_id=nameid,
new_id=newid)
print mreq
print(mreq)
rargs = sp.apply_binding(binding, "%s" % mreq, destination, "")
# --------- @IDP --------------
_req = idp.parse_manage_name_id_request(rargs["data"], binding)
print _req.message
print(_req.message)
assert mid == _req.message.id
@ -45,14 +45,14 @@ def test_flow():
mid, midq = sp.create_manage_name_id_request(destination, name_id=nameid,
new_id=newid)
print midq
print(midq)
rargs = sp.apply_binding(binding, "%s" % midq, destination, "")
# --------- @IDP --------------
_req = idp.parse_manage_name_id_request(rargs["data"], binding)
print _req.message
print(_req.message)
mnir = idp.create_manage_name_id_response(_req.message, [binding])
@ -64,14 +64,14 @@ def test_flow():
respargs = idp.apply_binding(binding, "%s" % mnir, destination, "")
print respargs
print(respargs)
# ---------- @SP ---------------
_response = sp.parse_manage_name_id_request_response(respargs["data"],
binding)
print _response.response
print(_response.response)
assert _response.response.id == mnir.id

View File

@ -107,5 +107,5 @@ def test_basic_flow():
final = sp.parse_assertion_id_request_response(xmlstr, binding)
print final.response
print(final.response)
assert isinstance(final.response, Assertion)

View File

@ -41,7 +41,7 @@ def test_construct_deconstruct_request():
returnIDParam="foo",
return_url="https://example.com/saml/sp/disc")
print url
print(url)
ds = DiscoveryServer(config_file=dotname("disco_conf"))
dsr = ds.parse_discovery_service_request(url)

View File

@ -6,13 +6,13 @@ __author__ = 'rolandh'
def test_eptid():
edb = Eptid("secret")
e1 = edb.get("idp_entity_id", "sp_entity_id", "user_id", "some other data")
print e1
print(e1)
assert e1.startswith("idp_entity_id!sp_entity_id!")
e2 = edb.get("idp_entity_id", "sp_entity_id", "user_id", "some other data")
assert e1 == e2
e3 = edb.get("idp_entity_id", "sp_entity_id", "user_2", "some other data")
print e3
print(e3)
assert e1 != e3
e4 = edb.get("idp_entity_id", "sp_entity_id2", "user_id", "some other data")
@ -23,13 +23,13 @@ def test_eptid():
def test_eptid_shelve():
edb = EptidShelve("secret", "eptid.db")
e1 = edb.get("idp_entity_id", "sp_entity_id", "user_id", "some other data")
print e1
print(e1)
assert e1.startswith("idp_entity_id!sp_entity_id!")
e2 = edb.get("idp_entity_id", "sp_entity_id", "user_id", "some other data")
assert e1 == e2
e3 = edb.get("idp_entity_id", "sp_entity_id", "user_2", "some other data")
print e3
print(e3)
assert e1 != e3
e4 = edb.get("idp_entity_id", "sp_entity_id2", "user_id", "some other data")

View File

@ -71,7 +71,7 @@ def test_eptid_mongo_db():
else:
e1 = edb.get("idp_entity_id", "sp_entity_id", "user_id",
"some other data")
print e1
print(e1)
assert e1.startswith("idp_entity_id!sp_entity_id!")
e2 = edb.get("idp_entity_id", "sp_entity_id", "user_id",
"some other data")
@ -79,7 +79,7 @@ def test_eptid_mongo_db():
e3 = edb.get("idp_entity_id", "sp_entity_id", "user_2",
"some other data")
print e3
print(e3)
assert e1 != e3
e4 = edb.get("idp_entity_id", "sp_entity_id2", "user_id",

View File

@ -35,7 +35,7 @@ req_id, req = client.create_authn_request(
extensions=extensions)
print req
print(req)
# Get a certificate from an authn request

View File

@ -8,7 +8,7 @@ fil = "sp_mdext_conf.py"
cnf = Config().load_file(fil, metadata_construction=True)
ed = entity_descriptor(cnf)
print ed
print(ed)
assert ed.spsso_descriptor.extensions
assert len(ed.spsso_descriptor.extensions.extension_elements) == 3

View File

@ -69,7 +69,7 @@ if args.id:
desc, xmldoc = entities_descriptor(eds, valid_for, args.name, args.id,
args.sign, secc)
valid_instance(desc)
print desc.to_string(nspair)
print(desc.to_string(nspair))
else:
for eid in eds:
if args.sign:
@ -81,4 +81,4 @@ else:
valid_instance(eid)
xmldoc = metadata_tostring_fix(eid, nspair, xmldoc)
print xmldoc
print(xmldoc)

View File

@ -73,4 +73,4 @@ if metad is not None:
f.write(txt)
f.close()
else:
print txt
print(txt)

View File

@ -65,7 +65,7 @@ def main():
if metad:
metad.load()
print metad.dumps()
print(metad.dumps())
if __name__ == '__main__':

View File

@ -35,7 +35,7 @@ for i in range(1, 10):
_ = mdmd.keys()
print time.time() - start
print(time.time() - start)
start = time.time()
for i in range(1, 10):
@ -44,4 +44,4 @@ for i in range(1, 10):
mdf.load()
_ = mdf.keys()
print time.time() - start
print(time.time() - start)

View File

@ -98,6 +98,6 @@ for line in open(args.conf).readlines():
mds.metadata[spec[1]] = metad
print mds.dumps(args.output)
print(mds.dumps(args.output))

View File

@ -835,9 +835,9 @@ def _spec(elem):
def _do_from_string(name):
print
print "def %s_from_string(xml_string):" % pyify(name)
print "%sreturn saml2.create_class_from_xml_string(%s, xml_string)" % (
INDENT, name)
print("def %s_from_string(xml_string):" % pyify(name))
print("%sreturn saml2.create_class_from_xml_string(%s, xml_string)" % (
INDENT, name))
def _namespace_and_tag(obj, param, top):
try:
@ -898,7 +898,7 @@ class Attribute(Simple):
# default, fixed, use, type
if DEBUG:
print "#ATTR", self.__dict__
print("#ATTR", self.__dict__)
external = False
name = ""
@ -971,7 +971,7 @@ class Attribute(Simple):
pass
if DEBUG:
print "#--ATTR py_attr:%s" % (objekt,)
print("#--ATTR py_attr:%s" % (objekt,))
return objekt
@ -1077,8 +1077,8 @@ class Complex(object):
return self._own, self._inherited
if DEBUG:
print self.__dict__
print "#-- %d parts" % len(self.parts)
print(self.__dict__)
print("#-- %d parts" % len(self.parts))
self._extend(top, sup, argv, parent)
@ -1094,7 +1094,7 @@ class Complex(object):
# string = "== %s (%s)" % (self.name,self.__class__)
# except AttributeError:
# string = "== (%s)" % (self.__class__,)
# print string
# print(string)
for part in self.parts:
if isinstance(part, Element):
res.append(name_or_ref(part, top))
@ -1168,8 +1168,8 @@ class Element(Complex):
try:
argv_copy = sd_copy(argv)
return [self.repr(top, sup, argv_copy, parent=parent)], []
except AttributeError, exc:
print "#!!!!", exc
except AttributeError as exc:
print("#!!!!", exc)
return [], []
def elements(self, top):
@ -1197,9 +1197,8 @@ class Element(Complex):
myname = ""
if DEBUG:
print "#Element.repr '%s' (child=%s) [%s]" % (myname,
child,
self._generated)
print("#Element.repr '%s' (child=%s) [%s]" %
(myname, child, self._generated))
self.py_class = objekt = PyElement(myname, root=top)
min_max(self, objekt, argv)
@ -1214,9 +1213,9 @@ class Element(Complex):
objekt.ref = superkl
else:
objekt.ref = (namespace, superkl)
except AttributeError, exc:
except AttributeError as exc:
if DEBUG:
print "#===>", exc
print("#===>", exc)
typ = self.type
@ -1257,7 +1256,7 @@ class Element(Complex):
objekt.scoped = True
else:
if DEBUG:
print "$", self
print("$", self)
raise
if parent:
@ -1310,7 +1309,7 @@ class Sequence(Complex):
argv_copy[key] = val
if DEBUG:
print "#Sequence: %s" % argv
print("#Sequence: %s" % argv)
return Complex.collect(self, top, sup, argv_copy, parent)
@ -1336,7 +1335,7 @@ class Extension(Complex):
return self._own, self._inherited
if DEBUG:
print "#!!!", self.__dict__
print("#!!!", self.__dict__)
try:
base = self.base
@ -1346,14 +1345,14 @@ class Extension(Complex):
cti = get_type_def(tag, top.parts)
if not cti.py_class:
cti.repr(top, sup)
#print "#EXT..",ct._collection
#print("#EXT..",ct._collection)
self._inherited = cti.py_class.properties[0][:]
self._inherited.extend(cti.py_class.properties[1])
elif self.xmlns_map[namespace] == XMLSCHEMA:
base = tag
else:
iattr = _import_attrs(top.modul[namespace], tag, top)
#print "#EXT..-", ia
#print("#EXT..-", ia)
self._inherited = iattr
except (AttributeError, ValueError):
base = None
@ -1373,7 +1372,7 @@ class Choice(Complex):
argv_copy["minOccurs"] = 0
if DEBUG:
print "#Choice: %s" % argv
print("#Choice: %s" % argv)
return Complex.collect(self, top, sup, argv_copy, parent=parent)
class Restriction(Complex):
@ -1411,11 +1410,11 @@ class ComplexType(Complex):
else:
new_sup = "%s.%s" % (namespace, name)
#print "#Superior: %s" % new_sup
#print("#Superior: %s" % new_sup)
if new_sup:
sup = new_sup
else:
#print "#>>", self.parts[0].__class__
#print("#>>", self.parts[0].__class__)
pass
try:
@ -1473,8 +1472,8 @@ class Group(Complex):
"Reference to group in other XSD file, not supported")
except KeyError:
raise Exception("Missing namespace definition")
except AttributeError, exc:
print "#!!!!", exc
except AttributeError as exc:
print("#!!!!", exc)
return [], []
def repr(self, top=None, sup=None, argv=None, _child=True, parent=""):
@ -1615,7 +1614,7 @@ def sort_elements(els):
if pres != val:
diff = True
#print els
#print(els)
partres = []
for key, val in els.items():
if not val:
@ -1645,24 +1644,24 @@ def output(elem, target_namespace, eldict, ignore=None):
if prep:
done = 1
if isinstance(prep, basestring):
print prep
print(prep)
else:
for item in prep:
print item
print
print
print(item)
print()
print()
if text:
done = 1
elem.done = True
print text
print
print(text)
print()
return done
def intro():
print """#!/usr/bin/env python
print("""#!/usr/bin/env python
#
# Generated %s by parse_xsd.py version %s.
@ -1670,7 +1669,7 @@ def intro():
import saml2
from saml2 import SamlBase
""" % (time.ctime(), __version__)
""" % (time.ctime(), __version__))
#NAMESPACE = 'http://www.w3.org/2000/09/xmldsig#'
@ -1759,7 +1758,7 @@ class Schema(Complex):
udict[elem] = elem.undefined(eldict)
keys = [k.name for k in udict.keys()]
print "#", keys
print("#", keys)
res = (None, [])
if not udict:
return res
@ -1834,19 +1833,19 @@ class Schema(Complex):
return undone
def _element_from_string(self):
print "ELEMENT_FROM_STRING = {"
print("ELEMENT_FROM_STRING = {")
for elem in self.elems:
if isinstance(elem, PyAttribute) or isinstance(elem, PyGroup):
continue
if elem.abstract:
continue
print "%s%s.c_tag: %s_from_string," % (INDENT, elem.class_name,
pyify(elem.class_name))
print "}"
print
print("%s%s.c_tag: %s_from_string," % (INDENT, elem.class_name,
pyify(elem.class_name)))
print("}")
print()
def _element_by_tag(self):
print "ELEMENT_BY_TAG = {"
print("ELEMENT_BY_TAG = {")
listed = []
for elem in self.elems:
if isinstance(elem, PyAttribute) or isinstance(elem, PyGroup):
@ -1854,16 +1853,16 @@ class Schema(Complex):
if elem.abstract:
continue
lcen = elem.name
print "%s'%s': %s," % (INDENT, lcen, elem.class_name)
print("%s'%s': %s," % (INDENT, lcen, elem.class_name))
listed.append(lcen)
for elem in self.elems:
if isinstance(elem, PyAttribute) or isinstance(elem, PyGroup):
continue
lcen = elem.name
if elem.abstract and lcen not in listed:
print "%s'%s': %s," % (INDENT, lcen, elem.class_name)
print("%s'%s': %s," % (INDENT, lcen, elem.class_name))
listed.append(lcen)
print "}"
print("}")
print
def out(self):
@ -1884,26 +1883,26 @@ class Schema(Complex):
for elem in self.elems:
eldict[elem.name] = elem
#print eldict.keys()
#print(eldict.keys())
intro()
for modul in self.add:
print "from %s import *" % modul
print("from %s import *" % modul)
for _namespace, (mod, namn) in self.impo.items():
if namn:
print "import %s as %s" % (mod, namn)
print
print "NAMESPACE = '%s'" % self.target_namespace
print("import %s as %s" % (mod, namn))
print( )
print("NAMESPACE = '%s'" % self.target_namespace)
print
for defs in self.defs:
print defs
print(defs)
print
exceptions = []
block = []
while self._do(eldict):
print "#.................."
print("#..................")
(objekt, tups) = self.adjust(eldict, block)
if not objekt:
break
@ -1917,29 +1916,30 @@ class Schema(Complex):
block = block_items(objekt, block, eldict)
if exceptions:
print "#", 70*'+'
print("#", 70*'+')
for line in exceptions:
print line
print "#", 70*'+'
print(line)
print("#", 70*'+')
print
for attrgrp in self.attrgrp:
print "AG_%s = [" % attrgrp.name
print("AG_%s = [" % attrgrp.name)
for prop in attrgrp.properties[0]:
if isinstance(prop.type, PyObj):
print "%s('%s', %s_, %s)," % (INDENT, prop.name,
prop.type.name, prop.required)
print("%s('%s', %s_, %s)," % (INDENT, prop.name,
prop.type.name,
prop.required))
else:
print "%s('%s', '%s', %s)," % (INDENT, prop.name,
prop.type, prop.required)
print "]"
print
print("%s('%s', '%s', %s)," % (INDENT, prop.name,
prop.type, prop.required))
print("]")
print()
self._element_from_string()
self._element_by_tag()
print
print "def factory(tag, **kwargs):"
print " return ELEMENT_BY_TAG[tag](**kwargs)"
print("def factory(tag, **kwargs):")
print(" return ELEMENT_BY_TAG[tag](**kwargs)")
print
@ -1991,7 +1991,7 @@ def evaluate(typ, elem):
try:
return ELEMENTFUNCTION[typ](elem)
except KeyError:
print "Unknown type", typ
print("Unknown type", typ)
NS_MAP = "xmlns_map"
@ -2015,14 +2015,14 @@ def parse_nsmap(fil):
return ElementTree.ElementTree(root)
def usage():
print "Usage: parse_xsd [-i <module:as>] xsd.file > module.py"
print("Usage: parse_xsd [-i <module:as>] xsd.file > module.py")
def recursive_find_module(name, path=None):
parts = name.split(".")
mod_a = None
for part in parts:
#print "$$", part, path
#print("$$", part, path)
try:
(fil, pathname, desc) = imp.find_module(part, path)
except ImportError:
@ -2144,9 +2144,9 @@ def main(argv):
try:
opts, args = getopt.getopt(argv, "a:d:hi:I:s:",
["add=", "help", "import=", "defs="])
except getopt.GetoptError, err:
except getopt.GetoptError as err:
# print help information and exit:
print str(err) # will print something like "option -a not recognized"
print(str(err)) # will print something like "option -a not recognized"
usage()
sys.exit(2)
@ -2177,12 +2177,12 @@ def main(argv):
assert False, "unhandled option"
if not args:
print "No XSD-file specified"
print("No XSD-file specified")
usage()
sys.exit(2)
schema = read_schema(args[0], add, defs, impo, modul, ignore, sdir)
#print schema.__dict__
#print(schema.__dict__)
schema.out()
if __name__ == "__main__":

View File

@ -58,7 +58,7 @@ class AMap(object):
try:
assert self.mod.MAP["to"][val] == key
except KeyError: # missing value
print "# Added %s=%s" % (self.mod.MAP["to"][val], key)
print("# Added %s=%s" % (self.mod.MAP["to"][val], key))
self.mod.MAP["to"][val] = key
except AssertionError:
raise Exception("Mismatch key:%s '%s' != '%s'" % (
@ -66,7 +66,7 @@ class AMap(object):
for val in self.mod.MAP["to"].values():
if val not in self.mod.MAP["fro"]:
print "# Missing URN '%s'" % val
print("# Missing URN '%s'" % val)
def do_fro(self):
txt = ["%s'fro': {" % self.indent]
@ -133,7 +133,7 @@ if __name__ == "__main__":
continue
elif fname.endswith(".pyc"):
continue
print 10 * "=" + fname + 10 * "="
print(10 * "=" + fname + 10 * "=")
amap = AMap(_name, fname, 4 * " ")
f = open(fname, "w")
f.write("%s" % amap)

View File

@ -86,7 +86,7 @@ if metad:
except:
raise
else:
print "OK"
print("OK")