PEP-8
This commit is contained in:
@@ -80,6 +80,7 @@ def _expiration(timeout, tformat="%a, %d-%b-%Y %H:%M:%S GMT"):
|
||||
def dict2list_of_tuples(d):
|
||||
return [(k, v) for k, v in d.items()]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
@@ -96,7 +97,7 @@ class Service(object):
|
||||
return dict([(k, v[0]) for k, v in parse_qs(_qs).items()])
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def unpack_post(self):
|
||||
_dict = parse_qs(get_post(self.environ))
|
||||
logger.debug("unpack_post:: %s" % _dict)
|
||||
@@ -104,14 +105,14 @@ class Service(object):
|
||||
return dict([(k, v[0]) for k, v in _dict.items()])
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def unpack_soap(self):
|
||||
try:
|
||||
query = get_post(self.environ)
|
||||
return {"SAMLRequest": query, "RelayState": ""}
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def unpack_either(self):
|
||||
if self.environ["REQUEST_METHOD"] == "GET":
|
||||
_dict = self.unpack_redirect()
|
||||
@@ -124,7 +125,7 @@ class Service(object):
|
||||
|
||||
def operation(self, saml_msg, binding):
|
||||
logger.debug("_operation: %s" % saml_msg)
|
||||
if not saml_msg or not 'SAMLRequest' in saml_msg:
|
||||
if not (saml_msg and 'SAMLRequest' in saml_msg):
|
||||
resp = BadRequest('Error parsing request or no request')
|
||||
return resp(self.environ, self.start_response)
|
||||
else:
|
||||
@@ -136,7 +137,8 @@ class Service(object):
|
||||
encrypt_cert=_encrypt_cert)
|
||||
except KeyError:
|
||||
# Can live with no relay state # TODO or can we, for inacademia?
|
||||
return self.do(saml_msg["SAMLRequest"], binding, saml_msg["RelayState"])
|
||||
return self.do(saml_msg["SAMLRequest"], binding,
|
||||
saml_msg["RelayState"])
|
||||
|
||||
def artifact_operation(self, saml_msg):
|
||||
if not saml_msg:
|
||||
@@ -302,7 +304,7 @@ class SSO(Service):
|
||||
|
||||
if not _resp:
|
||||
identity = USERS[self.user].copy()
|
||||
#identity["eduPersonTargetedID"] = get_eptid(IDP, query, session)
|
||||
# identity["eduPersonTargetedID"] = get_eptid(IDP, query, session)
|
||||
logger.info("Identity: %s" % (identity,))
|
||||
|
||||
if REPOZE_ID_EQUIVALENT:
|
||||
@@ -367,7 +369,8 @@ class SSO(Service):
|
||||
|
||||
_req = self.req_info.message
|
||||
|
||||
if "SigAlg" in saml_msg and "Signature" in saml_msg: # Signed request
|
||||
if "SigAlg" in saml_msg and "Signature" in saml_msg: # Signed
|
||||
# request
|
||||
issuer = _req.issuer.text
|
||||
_certs = IDP.metadata.certs(issuer, "any", "signing")
|
||||
verified_ok = False
|
||||
@@ -381,7 +384,7 @@ class SSO(Service):
|
||||
|
||||
if self.user:
|
||||
if _req.force_authn is not None and \
|
||||
_req.force_authn.lower() == 'true':
|
||||
_req.force_authn.lower() == 'true':
|
||||
saml_msg["req_info"] = self.req_info
|
||||
key = self._store_request(saml_msg)
|
||||
return self.not_authn(key, _req.requested_authn_context)
|
||||
@@ -425,7 +428,7 @@ class SSO(Service):
|
||||
return self.operation(saml_msg, BINDING_HTTP_POST)
|
||||
|
||||
# def artifact(self):
|
||||
# # Can be either by HTTP_Redirect or HTTP_POST
|
||||
# # Can be either by HTTP_Redirect or HTTP_POST
|
||||
# _req = self._store_request(self.unpack_either())
|
||||
# if isinstance(_req, basestring):
|
||||
# return self.not_authn(_req)
|
||||
@@ -448,7 +451,7 @@ class SSO(Service):
|
||||
self.user = user
|
||||
self.environ[
|
||||
"idp.authn"] = AUTHN_BROKER.get_authn_by_accr(
|
||||
PASSWORD)
|
||||
PASSWORD)
|
||||
except ValueError:
|
||||
resp = Unauthorized()
|
||||
else:
|
||||
@@ -465,6 +468,7 @@ class SSO(Service):
|
||||
self.op_type = "ecp"
|
||||
return self.operation(_dict, BINDING_SOAP)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# === Authentication ====
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -490,11 +494,11 @@ def do_authentication(environ, start_response, authn_context, key,
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
PASSWD = {
|
||||
"daev0001": "qwerty",
|
||||
"haho0032": "qwerty",
|
||||
"roland": "dianakra",
|
||||
"babs": "howes",
|
||||
"upper": "crust"}
|
||||
"daev0001": "qwerty",
|
||||
"haho0032": "qwerty",
|
||||
"roland": "dianakra",
|
||||
"babs": "howes",
|
||||
"upper": "crust"}
|
||||
|
||||
|
||||
def username_password_authn(environ, start_response, reference, key,
|
||||
@@ -568,7 +572,7 @@ def not_found(environ, start_response):
|
||||
# === Single log out ===
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
#def _subject_sp_info(req_info):
|
||||
# def _subject_sp_info(req_info):
|
||||
# # look for the subject
|
||||
# subject = req_info.subject_id()
|
||||
# subject = subject.text.strip()
|
||||
@@ -615,13 +619,13 @@ class SLO(Service):
|
||||
response = True
|
||||
|
||||
try:
|
||||
hinfo = IDP.apply_binding(binding, "%s" % resp, destination, relay_state,
|
||||
response=response)
|
||||
hinfo = IDP.apply_binding(binding, "%s" % resp, destination,
|
||||
relay_state, response=response)
|
||||
except Exception as exc:
|
||||
logger.error("ServiceError: %s" % exc)
|
||||
resp = ServiceError("%s" % exc)
|
||||
return resp(self.environ, self.start_response)
|
||||
|
||||
|
||||
#_tlh = dict2list_of_tuples(hinfo["headers"])
|
||||
delco = delete_cookie(self.environ, "idpauthn")
|
||||
if delco:
|
||||
@@ -639,35 +643,36 @@ class SLO(Service):
|
||||
else:
|
||||
resp = Response(hinfo["data"], headers=hinfo["headers"])
|
||||
return resp(self.environ, self.start_response)
|
||||
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# Manage Name ID service
|
||||
# ----------------------------------------------------------------------------
|
||||
|
||||
|
||||
class NMI(Service):
|
||||
|
||||
def do(self, query, binding, relay_state="", encrypt_cert=None):
|
||||
logger.info("--- Manage Name ID Service ---")
|
||||
req = IDP.parse_manage_name_id_request(query, binding)
|
||||
request = req.message
|
||||
|
||||
|
||||
# Do the necessary stuff
|
||||
name_id = IDP.ident.handle_manage_name_id_request(
|
||||
request.name_id, request.new_id, request.new_encrypted_id,
|
||||
request.terminate)
|
||||
|
||||
|
||||
logger.debug("New NameID: %s" % name_id)
|
||||
|
||||
|
||||
_resp = IDP.create_manage_name_id_response(request)
|
||||
|
||||
|
||||
# It's using SOAP binding
|
||||
hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % _resp, "",
|
||||
relay_state, response=True)
|
||||
|
||||
|
||||
resp = Response(hinfo["data"], headers=hinfo["headers"])
|
||||
return resp(self.environ, self.start_response)
|
||||
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# === Assertion ID request ===
|
||||
# ----------------------------------------------------------------------------
|
||||
@@ -683,9 +688,9 @@ class AIDR(Service):
|
||||
except Unknown:
|
||||
resp = NotFound(aid)
|
||||
return resp(self.environ, self.start_response)
|
||||
|
||||
|
||||
hinfo = IDP.apply_binding(BINDING_URI, "%s" % assertion, response=True)
|
||||
|
||||
|
||||
logger.debug("HINFO: %s" % hinfo)
|
||||
resp = Response(hinfo["data"], headers=hinfo["headers"])
|
||||
return resp(self.environ, self.start_response)
|
||||
@@ -715,6 +720,7 @@ class ARS(Service):
|
||||
resp = Response(hinfo["data"], headers=hinfo["headers"])
|
||||
return resp(self.environ, self.start_response)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# === Authn query service ===
|
||||
# ----------------------------------------------------------------------------
|
||||
@@ -769,6 +775,7 @@ class ATTR(Service):
|
||||
resp = Response(hinfo["data"], headers=hinfo["headers"])
|
||||
return resp(self.environ, self.start_response)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# Name ID Mapping service
|
||||
# When an entity that shares an identifier for a principal with an identity
|
||||
@@ -792,17 +799,17 @@ class NIM(Service):
|
||||
except PolicyError:
|
||||
resp = BadRequest("Unknown entity")
|
||||
return resp(self.environ, self.start_response)
|
||||
|
||||
|
||||
info = IDP.response_args(request)
|
||||
_resp = IDP.create_name_id_mapping_response(name_id, **info)
|
||||
|
||||
|
||||
# Only SOAP
|
||||
hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % _resp, "", "",
|
||||
response=True)
|
||||
|
||||
|
||||
resp = Response(hinfo["data"], headers=hinfo["headers"])
|
||||
return resp(self.environ, self.start_response)
|
||||
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# Cookie handling
|
||||
@@ -897,10 +904,10 @@ def metadata(environ, start_response):
|
||||
try:
|
||||
path = args.path
|
||||
if path is None or len(path) == 0:
|
||||
path = os.path.dirname(os.path.abspath( __file__ ))
|
||||
path = os.path.dirname(os.path.abspath(__file__))
|
||||
if path[-1] != "/":
|
||||
path += "/"
|
||||
metadata = create_metadata_string(path+args.config, IDP.config,
|
||||
metadata = create_metadata_string(path + args.config, IDP.config,
|
||||
args.valid, args.cert, args.keyfile,
|
||||
args.id, args.name, args.sign)
|
||||
start_response('200 OK', [('Content-Type', "text/xml")])
|
||||
@@ -961,7 +968,6 @@ def application(environ, start_response):
|
||||
except KeyError:
|
||||
user = None
|
||||
|
||||
|
||||
url_patterns = AUTHN_URLS
|
||||
if not user:
|
||||
logger.info("-- No USER --")
|
||||
@@ -995,7 +1001,8 @@ if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('-p', dest='path', help='Path to configuration file.')
|
||||
parser.add_argument('-v', dest='valid',
|
||||
help="How long, in days, the metadata is valid from the time of creation")
|
||||
help="How long, in days, the metadata is valid from "
|
||||
"the time of creation")
|
||||
parser.add_argument('-c', dest='cert', help='certificate')
|
||||
parser.add_argument('-i', dest='id',
|
||||
help="The ID of the entities descriptor")
|
||||
|
||||
Reference in New Issue
Block a user