To increase testability. Name harmonization with other modules. Dictionary instead of cgi.FieldStorage in basic modules

This commit is contained in:
Roland Hedberg
2010-04-08 15:43:26 +02:00
parent 2072d844ad
commit 7551eacd66

View File

@@ -49,7 +49,15 @@ def construct_came_from(environ):
return came_from
# FormPluginBase defines the methods remember and forget
def cgi_fieldStorage_to_dict( fieldStorage ):
"""Get a plain dictionary, rather than the '.value' system used by the
cgi module."""
params = {}
for key in fieldStorage.keys():
params[ key ] = fieldStorage[ key ].value
return params
class SAML2Plugin(FormPluginBase):
implements(IChallenger, IIdentifier, IAuthenticator, IMetadataProvider)
@@ -81,7 +89,7 @@ class SAML2Plugin(FormPluginBase):
self.metadata = self.conf["metadata"]
except KeyError:
self.metadata = None
self.outstanding_authn = {}
self.outstanding_queries = {}
self.iam = os.uname()[1]
if cache:
@@ -164,7 +172,7 @@ class SAML2Plugin(FormPluginBase):
vorg=vorg)
# remember the request
self.outstanding_authn[sid] = came_from
self.outstanding_queries[sid] = came_from
if self.debug:
self.log and self.log.info('sc returned: %s' % (result,))
@@ -205,16 +213,55 @@ class SAML2Plugin(FormPluginBase):
identity["password"] = ""
identity['repoze.who.userid'] = name_id
identity["user"] = session_info["ava"]
if self.debug:
self.log and self.log.info("Identity: %s" % identity)
if self.debug and self.log:
self.log.info("Identity: %s" % identity)
return identity
def _eval_authn_response(self, environ, post):
""" """
self.log and self.log.info("Got AuthN response, checking..")
scl = Saml2Client(environ, self.conf, self.debug)
print "Outstanding: %s" % (self.outstanding_queries,)
try:
# Evaluate the response, returns a AuthnResponse instance
try:
ar = scl.response(post, self.conf["entityid"],
self.outstanding_queries, self.log)
except Exception, excp:
self.log and self.log.error("Exception: %s" % (excp,))
raise
session_info = ar.session_info()
# Cache it
name_id = self._cache_session(session_info)
if self.debug:
self.log and self.log.info("stored %s with key %s" % (
session_info, name_id))
except TypeError, excp:
self.log and self.log.error("Exception: %s" % (excp,))
return None
if session_info["came_from"]:
if self.debug:
self.log and self.log.info(
"came_from << %s" % session_info["came_from"])
try:
path, query = session_info["came_from"].split('?')
environ["PATH_INFO"] = path
environ["QUERY_STRING"] = query
except ValueError:
environ["PATH_INFO"] = session_info["came_from"]
return session_info
#### IIdentifier ####
def identify(self, environ):
self.log = environ.get('repoze.who.logger','')
self.log.info("ENVIRON: %s" % environ)
self.log.info("self: %s" % (self.__dict__,))
if self.log:
self.log.info("ENVIRON: %s" % environ)
self.log.info("self: %s" % (self.__dict__,))
uri = environ.get('REQUEST_URI', construct_url(environ))
if self.debug:
@@ -242,43 +289,15 @@ class SAML2Plugin(FormPluginBase):
# check for SAML2 authN response
#if self.debug:
self.log.debug("Got AuthN response, checking..")
scl = Saml2Client(environ, self.conf, self.debug)
try:
# Evaluate the response, returns a AuthnResponse instance
try:
ar = scl.response(post, self.conf["entityid"],
self.outstanding_authn,
self.log)
except Exception, excp:
self.log.error("Exception: %s" % (excp,))
raise
session_info = ar.session_info()
# Cache it
name_id = self._cache_session(session_info)
if self.debug:
self.log and self.log.info("stored %s with key %s" % (
session_info, name_id))
except TypeError, excp:
self.log.error("Exception: %s" % (excp,))
return None
if session_info["came_from"]:
if self.debug:
self.log and self.log.info(
"came_from << %s" % session_info["came_from"])
try:
path, query = session_info["came_from"].split('?')
environ["PATH_INFO"] = path
environ["QUERY_STRING"] = query
except ValueError:
environ["PATH_INFO"] = session_info["came_from"]
environ["s2repoze.sessioninfo"] = session_info
session_info = _eval_authn_response(self, environ,
cgi_fieldStorage_to_dict(post))
if session_info:
environ["s2repoze.sessioninfo"] = session_info
# contruct and return the identity
return self._construct_identity(name_id, session_info)
# contruct and return the identity
return self._construct_identity(name_id, session_info)
else:
return None
def _vo_members_to_ask(self, subject_id):
# Find the member of the Virtual Organization that I haven't