Merge branch 'master' of github.com:rohe/pysaml2

This commit is contained in:
Roland Hedberg
2015-04-30 12:21:10 +02:00
10 changed files with 167 additions and 27 deletions

View File

@@ -12,6 +12,7 @@
# serve to show the default.
import sys, os
import alabaster
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
@@ -22,6 +23,7 @@ import sys, os
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.coverage']
# Add any paths that contain templates here, relative to this directory.
@@ -91,6 +93,7 @@ pygments_style = 'sphinx'
# The theme to use for HTML and HTML Help pages. Major themes that come with
# Sphinx are currently 'default' and 'sphinxdoc'.
html_theme_path = [alabaster.get_path()]
html_theme = 'alabaster'
on_rtd = os.environ.get('READTHEDOCS', None) == 'True'

View File

@@ -336,6 +336,8 @@ class SSO(Service):
_resp = IDP.create_authn_response(
identity, userid=self.user,
encrypt_cert=encrypt_cert,
encrypt_assertion_self_contained=True,
encrypted_advice_attributes=True,
**resp_args)
except Exception as excp:
logging.error(exception_trace(excp))
@@ -400,9 +402,9 @@ class SSO(Service):
return resp(self.environ, self.start_response)
if self.user:
saml_msg["req_info"] = self.req_info
if _req.force_authn is not None and \
_req.force_authn.lower() == 'true':
saml_msg["req_info"] = self.req_info
key = self._store_request(saml_msg)
return self.not_authn(key, _req.requested_authn_context)
else:
@@ -1014,6 +1016,7 @@ def application(environ, start_response):
if isinstance(callback, tuple):
cls = callback[0](environ, start_response, user)
func = getattr(cls, callback[1])
return func()
return callback(environ, start_response, user)

View File

@@ -2,13 +2,14 @@
import logging
import re
import argparse
from saml2.extension.pefim import SPCertEnc
import service_conf
from Cookie import SimpleCookie
from urlparse import parse_qs
import sys
from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_HTTP_REDIRECT, element_to_extension_element
from saml2 import BINDING_SOAP
from saml2 import time_util
from saml2 import ecp
@@ -33,6 +34,8 @@ from saml2.s_utils import UnsupportedBinding
from saml2.s_utils import sid
from saml2.s_utils import rndstr
#from srtest import exception_trace
from saml2.md import Extensions
import xmldsig as ds
logger = logging.getLogger("")
hdlr = logging.FileHandler('spx.log')
@@ -152,6 +155,7 @@ class Cache(object):
self.uid2user = {}
self.cookie_name = "spauthn"
self.outstanding_queries = {}
self.outstanding_certs = {}
self.relay_state = {}
self.user = {}
self.result = {}
@@ -348,7 +352,7 @@ class ACS(Service):
try:
self.response = self.sp.parse_authn_request_response(
response, binding, self.outstanding_queries)
response, binding, self.outstanding_queries, self.cache.outstanding_certs)
except UnknownPrincipal, excp:
logger.error("UnknownPrincipal: %s" % (excp,))
resp = ServiceError("UnknownPrincipal: %s" % (excp,))
@@ -551,13 +555,31 @@ class SSO(object):
"assertion_consumer_service"]
# just pick one
endp, return_binding = acs[0]
extensions = None
cert = None
if _cli.config.generate_cert_func is not None:
cert_str, req_key_str = _cli.config.generate_cert_func()
cert = {
"cert": cert_str,
"key": req_key_str
}
spcertenc = SPCertEnc(x509_data=ds.X509Data(
x509_certificate=ds.X509Certificate(text=cert_str)))
extensions = Extensions(extension_elements=[
element_to_extension_element(spcertenc)])
req_id, req = _cli.create_authn_request(destination,
binding=return_binding)
binding=return_binding, extensions=extensions)
_rstate = rndstr()
self.cache.relay_state[_rstate] = came_from
ht_args = _cli.apply_binding(_binding, "%s" % req, destination,
relay_state=_rstate)
_sid = req_id
if cert is not None:
self.cache.outstanding_certs[_sid] = cert
except Exception, exc:
logger.exception(exc)
resp = ServiceError(

View File

@@ -27,7 +27,7 @@ install_requires = [
'paste',
'zope.interface',
'repoze.who',
'pycrypto >= 2.2', # 'Crypto'
'pycrypto >= 2.5', # 'Crypto'
'pytz',
'pyOpenSSL',
'python-dateutil',
@@ -37,7 +37,7 @@ install_requires = [
tests_require = [
'mongodict',
'pyasn1',
'pymongo',
'pymongo==3.0.1',
'python-memcached == 1.51',
'pytest',
'mako',

View File

@@ -87,7 +87,7 @@ class AESCipher(object):
return cmsg
def decrypt(self, msg, iv=None, padding="PKCS#7", b64dec=True):
def decrypt(self, msg, iv=None, alg="aes_128_cbc", padding="PKCS#7", b64dec=True):
"""
:param key: The encryption key
:param iv: init vector
@@ -102,7 +102,7 @@ class AESCipher(object):
_iv = data[:AES.block_size]
if iv:
assert iv == _iv
cipher, iv = self.build_cipher(iv)
cipher, iv = self.build_cipher(iv, alg=alg)
res = cipher.decrypt(data)[AES.block_size:]
if padding in ["PKCS#5", "PKCS#7"]:
res = res[:-ord(res[-1])]

View File

@@ -64,21 +64,73 @@ class Saml2Client(Base):
:return: session id and AuthnRequest info
"""
destination = self._sso_location(entityid, binding)
reqid, negotiated_binding, info = self.prepare_for_negotiated_authenticate(
entityid=entityid,
relay_state=relay_state,
binding=binding,
vorg=vorg,
nameid_format=nameid_format,
scoping=scoping,
consent=consent,
extensions=extensions,
sign=sign,
response_binding=response_binding,
**kwargs)
reqid, req = self.create_authn_request(destination, vorg, scoping,
response_binding, nameid_format,
consent=consent,
extensions=extensions, sign=sign,
**kwargs)
_req_str = "%s" % req
logger.info("AuthNReq: %s" % _req_str)
info = self.apply_binding(binding, _req_str, destination, relay_state)
assert negotiated_binding == binding
return reqid, info
def prepare_for_negotiated_authenticate(self, entityid=None, relay_state="",
binding=None, vorg="",
nameid_format=None,
scoping=None, consent=None, extensions=None,
sign=None,
response_binding=saml2.BINDING_HTTP_POST,
**kwargs):
""" Makes all necessary preparations for an authentication request that negotiates
which binding to use for authentication.
:param entityid: The entity ID of the IdP to send the request to
:param relay_state: To where the user should be returned after
successfull log in.
:param binding: Which binding to use for sending the request
:param vorg: The entity_id of the virtual organization I'm a member of
:param scoping: For which IdPs this query are aimed.
:param consent: Whether the principal have given her consent
:param extensions: Possible extensions
:param sign: Whether the request should be signed or not.
:param response_binding: Which binding to use for receiving the response
:param kwargs: Extra key word arguments
:return: session id and AuthnRequest info
"""
expected_binding = binding
for binding in [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST]:
if expected_binding and binding != expected_binding:
continue
destination = self._sso_location(entityid, binding)
logger.info("destination to provider: %s" % destination)
reqid, request = self.create_authn_request(
destination, vorg, scoping, response_binding, nameid_format,
consent=consent,
extensions=extensions, sign=sign,
**kwargs)
_req_str = str(request)
logger.info("AuthNReq: %s" % _req_str)
http_info = self.apply_binding(binding, _req_str, destination,
relay_state)
return reqid, binding, http_info
else:
raise SignOnError("No supported bindings available for authentication")
def global_logout(self, name_id, reason="", expire=None, sign=None):
""" More or less a layer of indirection :-/
Bootstrapping the whole thing by finding all the IdPs that should

View File

@@ -71,6 +71,10 @@ class VerifyError(SAMLError):
pass
class SignOnError(SAMLError):
pass
class LogoutError(SAMLError):
pass

View File

@@ -1,9 +1,9 @@
#!/usr/bin/env python
import logging
from pymongo.mongo_client import MongoClient
__author__ = 'rolandh'
from pymongo import Connection
#import cjson
import time
from datetime import datetime
@@ -18,9 +18,9 @@ logger = logging.getLogger(__name__)
class Cache(object):
def __init__(self, server=None, debug=0, db=None):
if server:
connection = Connection(server)
connection = MongoClient(server)
else:
connection = Connection()
connection = MongoClient()
if db:
self._db = connection[db]

View File

@@ -1820,10 +1820,11 @@ def pre_encrypt_assertion(response):
assertion = response.assertion
response.assertion = None
response.encrypted_assertion = EncryptedAssertion()
if isinstance(assertion, list):
response.encrypted_assertion.add_extension_elements(assertion)
else:
response.encrypted_assertion.add_extension_element(assertion)
if assertion is not None:
if isinstance(assertion, list):
response.encrypted_assertion.add_extension_elements(assertion)
else:
response.encrypted_assertion.add_extension_element(assertion)
# txt = "%s" % response
# _ass = "%s" % assertion
# _ass = rm_xmltag(_ass)

View File

@@ -635,6 +635,26 @@ class TestClientWithDummy():
resp_args = self.server.response_args(req.message, [response_binding])
assert resp_args["binding"] == response_binding
def test_do_negotiated_authn(self):
binding = BINDING_HTTP_REDIRECT
response_binding = BINDING_HTTP_POST
sid, auth_binding, http_args = self.client.prepare_for_negotiated_authenticate(
IDP, "http://www.example.com/relay_state",
binding=binding, response_binding=response_binding)
assert binding == auth_binding
assert isinstance(sid, basestring)
assert len(http_args) == 4
assert http_args["headers"][0][0] == "Location"
assert http_args["data"] == []
redirect_url = http_args["headers"][0][1]
_, _, _, _, qs, _ = urlparse.urlparse(redirect_url)
qs_dict = urlparse.parse_qs(qs)
req = self.server.parse_authn_request(qs_dict["SAMLRequest"][0],
binding)
resp_args = self.server.response_args(req.message, [response_binding])
assert resp_args["binding"] == response_binding
def test_do_attribute_query(self):
response = self.client.do_attribute_query(
IDP, "_e7b68a04488f715cda642fbdd90099f5",
@@ -699,6 +719,41 @@ class TestClientWithDummy():
'http://www.example.com/login'
assert ac.authn_context_class_ref.text == INTERNETPROTOCOLPASSWORD
def test_negotiated_post_sso(self):
binding = BINDING_HTTP_POST
response_binding = BINDING_HTTP_POST
sid, auth_binding, http_args = self.client.prepare_for_negotiated_authenticate(
"urn:mace:example.com:saml:roland:idp", relay_state="really",
binding=binding, response_binding=response_binding)
_dic = unpack_form(http_args["data"][3])
assert binding == auth_binding
req = self.server.parse_authn_request(_dic["SAMLRequest"], binding)
resp_args = self.server.response_args(req.message, [response_binding])
assert resp_args["binding"] == response_binding
# Normally a response would now be sent back to the users web client
# Here I fake what the client will do
# create the form post
http_args["data"] = urllib.urlencode(_dic)
http_args["method"] = "POST"
http_args["dummy"] = _dic["SAMLRequest"]
http_args["headers"] = [('Content-type',
'application/x-www-form-urlencoded')]
response = self.client.send(**http_args)
print response.text
_dic = unpack_form(response.text[3], "SAMLResponse")
resp = self.client.parse_authn_request_response(_dic["SAMLResponse"],
BINDING_HTTP_POST,
{sid: "/"})
ac = resp.assertion.authn_statement[0].authn_context
assert ac.authenticating_authority[0].text == \
'http://www.example.com/login'
assert ac.authn_context_class_ref.text == INTERNETPROTOCOLPASSWORD
# if __name__ == "__main__":
# tc = TestClient()
@@ -708,4 +763,4 @@ class TestClientWithDummy():
if __name__ == "__main__":
tc = TestClient()
tc.setup_class()
tc.test_sign_then_encrypt_assertion_advice()
tc.test_sign_then_encrypt_assertion_advice()