Fixed so ECP now works both on SP, IdP and Client side. Minor tweaks left.

This commit is contained in:
Roland Hedberg
2013-01-24 10:39:51 +01:00
parent cf2d75b70d
commit b295a359b9
10 changed files with 344 additions and 185 deletions

View File

@@ -18,13 +18,13 @@
"""Contains classes and functions that a SAML2.0 Service Provider (SP) may use
to conclude its tasks.
"""
from saml2.schema import soapenv
from saml2.entity import Entity
from saml2.mdstore import destinations
from saml2.saml import AssertionIDRef
from saml2.profile import paos, ecp
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.samlp import AuthnQuery
from saml2.samlp import AssertionIDRequest
from saml2.samlp import NameIDMappingRequest
from saml2.samlp import AttributeQuery
from saml2.samlp import AuthzDecisionQuery
@@ -32,6 +32,7 @@ from saml2.samlp import AuthnRequest
import saml2
import time
from saml2.soap import make_soap_enveloped_saml_thingy
try:
from urlparse import parse_qs
@@ -42,8 +43,9 @@ except ImportError:
from saml2.s_utils import signature
from saml2.s_utils import do_attributes
from saml2 import samlp, BINDING_SOAP
from saml2 import samlp, BINDING_SOAP, element_to_extension_element
from saml2 import saml
from saml2 import soap
from saml2.population import Population
from saml2.response import AttributeResponse
@@ -71,6 +73,10 @@ FORM_SPEC = """<form method="post" action="%s">
LAX = False
IDPDISC_POLICY = "urn:oasis:names:tc:SAML:profiles:SSO:idp-discovery-protocol:single"
ECP_SERVICE = "urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp"
ACTOR = "http://schemas.xmlsoap.org/soap/actor/next"
MIME_PAOS = "application/vnd.paos+xml"
class IdpUnspecified(Exception):
pass
@@ -219,24 +225,27 @@ class Base(Entity):
:return: <samlp:AuthnRequest> instance
"""
args = {}
try:
service_url = kwargs["assertion_consumer_service_url"]
args["assertion_consumer_service_url"] = kwargs["assertion_consumer_service_url"]
except KeyError:
if service_url_binding is None:
service_url = self.service_url(binding)
else:
service_url = self.service_url(service_url_binding)
args["assertion_consumer_service_url"] = service_url
try:
my_name = kwargs["provider_name"]
args["provider_name"] = kwargs["provider_name"]
except KeyError:
if binding == BINDING_PAOS:
my_name = None
pass
else:
my_name = self._my_name()
args["provider_name"] = self._my_name()
try:
name_id_policy = kwargs["name_id_policy"]
args["name_id_policy"] = kwargs["name_id_policy"]
del kwargs["name_id_policy"]
except:
if allow_create:
allow_create="true"
@@ -257,22 +266,24 @@ class Base(Entity):
name_id_policy.format = saml.NAMEID_FORMAT_PERSISTENT
except KeyError:
pass
args["name_id_policy"] = name_id_policy
if extensions is None:
extensions = []
for key,val in kwargs.items():
if key not in AuthnRequest.c_attributes and \
key not in AuthnRequest.c_children:
# extension elements allowed
extensions.append(saml2.element_to_extension_element(val))
if kwargs:
if extensions is None:
extensions = []
fargs = [p for p,c,r in AuthnRequest.c_attributes.values()]
fargs.extend([p for p,c in AuthnRequest.c_children.values()])
for key,val in kwargs.items():
if key not in fargs:
# extension elements allowed
extensions.append(saml2.element_to_extension_element(val))
else:
args[key] = val
return self._message(AuthnRequest, destination, id, consent,
extensions, sign,
assertion_consumer_service_url=service_url,
protocol_binding=binding,
name_id_policy=name_id_policy,
provider_name=my_name,
scoping=scoping)
scoping=scoping, **args)
def create_attribute_query(self, destination, subject_id,
@@ -550,3 +561,87 @@ class Base(Entity):
"""
return self._parse_response(txt, NameIDMappingResponse, "", binding)
# ------------------- ECP ------------------------------------------------
def create_ecp_authn_request(self, entityid=None, relay_state="", sign=False):
""" Makes an authentication request.
:param entityid: The entity ID of the IdP to send the request to
:param relay_state: A token that can be used by the SP to know
where to continue the conversation with the client
:param sign: Whether the request should be signed or not.
:return: SOAP message with the AuthnRequest
"""
# ----------------------------------------
# <paos:Request>
# ----------------------------------------
my_url = self.service_url(BINDING_PAOS)
# must_understand and act according to the standard
#
paos_request = paos.Request(must_understand="1", actor=ACTOR,
response_consumer_url=my_url,
service = ECP_SERVICE)
# ----------------------------------------
# <ecp:RelayState>
# ----------------------------------------
relay_state = ecp.RelayState(actor=ACTOR, must_understand="1",
text=relay_state)
# ----------------------------------------
# <samlp:AuthnRequest>
# ----------------------------------------
logger.info("entityid: %s, binding: %s" % (entityid, BINDING_SOAP))
# The IDP publishes support for ECP by using the SOAP binding on
# SingleSignOnService
_, location = self.pick_binding("single_sign_on_service",
[BINDING_SOAP], entity_id=entityid)
authn_req = self.create_authn_request(location, binding=BINDING_SOAP,
service_url_binding=BINDING_PAOS)
# ----------------------------------------
# The SOAP envelope
# ----------------------------------------
soap_envelope = make_soap_enveloped_saml_thingy(authn_req,[paos_request,
relay_state])
return authn_req.id, "%s" % soap_envelope
def parse_ecp_authn_response(self, str, outstanding=None):
rdict = soap.class_instances_from_soap_enveloped_saml_thingies(str,
[paos,
ecp,
samlp])
_relay_state = None
for item in rdict["header"]:
if item.c_tag == "RelayState" and\
item.c_namespace == ecp.NAMESPACE:
_relay_state = item
response = self.parse_authn_request_response(rdict["body"],
BINDING_PAOS, outstanding)
return response, _relay_state
def can_handle_ecp_response(self, response):
try:
accept = response.headers["accept"]
except KeyError:
try:
accept = response.headers["Accept"]
except KeyError:
return False
if MIME_PAOS in accept:
return True
else:
return False

View File

@@ -150,7 +150,8 @@ class ConfigurationError(Exception):
class Config(object):
def_context = ""
def __init__(self):
def __init__(self, homedir="."):
self._homedir = homedir
self.entityid = None
self.xmlsec_binary= None
self.debug=False

View File

@@ -37,7 +37,6 @@ from saml2.response import authn_response
logger = logging.getLogger(__name__)
SERVICE = "urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp"
def ecp_capable(headers):
if "application/vnd.paos+xml" in headers["Accept"]:
@@ -48,7 +47,6 @@ def ecp_capable(headers):
return False
ACTOR = "http://schemas.xmlsoap.org/soap/actor/next"
#noinspection PyUnusedLocal
def ecp_auth_request(cls, entityid=None, relay_state="", sign=False):
@@ -68,7 +66,7 @@ def ecp_auth_request(cls, entityid=None, relay_state="", sign=False):
# ----------------------------------------
my_url = cls.service_url(BINDING_PAOS)
# must_understan and actor according to the standard
# must_understand and actor according to the standard
#
paos_request = paos.Request(must_understand="1", actor=ACTOR,
response_consumer_url=my_url,

View File

@@ -16,34 +16,38 @@
# limitations under the License.
"""
Contains a class that can be used handle all the ECP handling for other python
Contains a class that can do SAML ECP Authentication for other python
programs.
"""
import cookielib
import logging
import sys
from saml2 import soap
from saml2 import saml
from saml2 import samlp
from saml2 import BINDING_PAOS
from saml2 import BINDING_SOAP
from saml2 import class_name
from saml2.client_base import MIME_PAOS
from saml2.config import Config
from saml2.entity import Entity
from saml2.httpbase import set_list2dict, dict2set_list
from saml2.profile import paos
from saml2.profile import ecp
from saml2.metadata import MetaData
from saml2.mdstore import MetadataStore
from saml2.s_utils import BadRequest
SERVICE = "urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp"
PAOS_HEADER_INFO = 'ver="%s";"%s"' % (paos.NAMESPACE, SERVICE)
logger = logging.getLogger(__name__)
class Client(object):
class Client(Entity):
def __init__(self, user, passwd, sp="", idp=None, metadata_file=None,
xmlsec_binary=None, verbose=0, ca_certs="",
disable_ssl_certificate_validation=True):
disable_ssl_certificate_validation=True, key_file=None,
cert_file=None):
"""
:param user: user name
:param passwd: user password
@@ -58,6 +62,13 @@ class Client(object):
disable_ssl_certificate_validation is true, SSL cert validation
will not be performed.
"""
config = Config()
config.disable_ssl_certificate_validation = disable_ssl_certificate_validation
config.key_file = key_file
config.cert_file = cert_file
config.ca_certs = ca_certs
Entity.__init__(self, "sp", config)
self._idp = idp
self._sp = sp
self.user = user
@@ -65,10 +76,9 @@ class Client(object):
self._verbose = verbose
if metadata_file:
self._metadata = MetaData()
self._metadata.import_metadata(open(metadata_file).read(),
xmlsec_binary)
self._debug_info("Loaded metadata from '%s'" % metadata_file)
self._metadata = MetadataStore([saml, samlp], None, xmlsec_binary)
self._metadata.load("local", metadata_file)
logger.debug("Loaded metadata from '%s'" % metadata_file)
else:
self._metadata = None
@@ -76,91 +86,55 @@ class Client(object):
self.done_ecp = False
self.cookie_jar = cookielib.LWPCookieJar()
self.http = soap.HTTPClient(self._sp, cookiejar=self.cookie_jar,
ca_certs=ca_certs,
disable_ssl_certificate_validation=disable_ssl_certificate_validation)
def _debug_info(self, text):
logger.debug(text)
if self._verbose:
print >> sys.stderr, text
def find_idp_endpoint(self, idp_entity_id):
if self._idp:
return self._idp
if idp_entity_id and not self._metadata:
raise Exception(
"Can't handle IdP entity ID if I don't have metadata")
if idp_entity_id:
for binding in [BINDING_PAOS, BINDING_SOAP]:
ssos = self._metadata.single_sign_on_services(idp_entity_id,
binding=binding)
if ssos:
self._idp = ssos[0]
logger.debug("IdP endpoint: '%s'" % self._idp)
return self._idp
raise Exception("No suitable endpoint found for entity id '%s'" % (
idp_entity_id,))
else:
raise Exception("No entity ID -> no endpoint")
def phase2(self, authn_request, rc_url, idp_entity_id, headers=None,
idp_endpoint=None, sign=False, sec=""):
sign=False, **kwargs):
"""
Doing the second phase of the ECP conversation
Doing the second phase of the ECP conversation, the conversation
with the IdP happens.
:param authn_request: The AuthenticationRequest
:param rc_url: The assertion consumer service url
:param rc_url: The assertion consumer service url of the SP
:param idp_entity_id: The EntityID of the IdP
:param headers: Possible extra headers
:param idp_endpoint: Where to send it all
:param sign: If the message should be signed
:param sec: security context
:return: The response from the IdP
"""
idp_request = soap.make_soap_enveloped_saml_thingy(authn_request)
if sign:
_signed = sec.sign_statement_using_xmlsec(idp_request,
class_name(authn_request),
nodeid=authn_request.id)
idp_request = _signed
if not idp_endpoint:
idp_endpoint = self.find_idp_endpoint(idp_entity_id)
_, destination = self.pick_binding("single_sign_on_service",
[BINDING_PAOS], "idpsso",
entity_id=idp_entity_id)
if self.user and self.passwd:
self.http.add_credentials(self.user, self.passwd)
ht_args = self.apply_binding(BINDING_PAOS, authn_request, destination,
sign=sign)
self._debug_info("[P2] Sending request: %s" % idp_request)
if headers:
ht_args["headers"].extend(headers)
logger.debug("[P2] Sending request: %s" % ht_args["data"])
# POST the request to the IdP
response = self.http.post(idp_request, headers=headers,
path=idp_endpoint)
response = self.send(destination, **ht_args)
self._debug_info("[P2] Got IdP response: %s" % response)
logger.debug("[P2] Got IdP response: %s" % response)
if response is None or response is False:
if response.status_code != 200:
raise Exception(
"Request to IdP failed (%s): %s" % (self.http.response.status,
self.http.error_description))
"Request to IdP failed (%s): %s" % (response.status_code,
response.error))
# SAMLP response in a SOAP envelope body, ecp response in headers
respdict = soap.class_instances_from_soap_enveloped_saml_thingies(
response, [paos, ecp,samlp])
respdict = self.parse_soap_message(response.text)
if respdict is None:
raise Exception("Unexpected reply from the IdP")
self._debug_info("[P2] IdP response dict: %s" % respdict)
logger.debug("[P2] IdP response dict: %s" % respdict)
idp_response = respdict["body"]
assert idp_response.c_tag == "Response"
self._debug_info("[P2] IdP AUTHN response: %s" % idp_response)
logger.debug("[P2] IdP AUTHN response: %s" % idp_response)
_ecp_response = None
for item in respdict["header"]:
@@ -173,21 +147,17 @@ class Client(object):
error = ("response_consumer_url '%s' does not match" % rc_url,
"assertion_consumer_service_url '%s" % _acs_url)
# Send an error message to the SP
fault_text = soap.soap_fault(error)
_ = self.http.post(fault_text, path=rc_url)
_ = self.send(rc_url, "POST", data=soap.soap_fault(error))
# Raise an exception so the user knows something went wrong
raise Exception(error)
return idp_response
#noinspection PyUnusedLocal
def ecp_conversation(self, respdict, idp_entity_id=None):
""" """
def parse_sp_ecp_response(self, respdict):
if respdict is None:
raise Exception("Unexpected reply from the SP")
self._debug_info("[P1] SP response dict: %s" % respdict)
logger.debug("[P1] SP response dict: %s" % respdict)
# AuthnRequest in the body or not
authn_request = respdict["body"]
@@ -197,89 +167,112 @@ class Client(object):
_relay_state = None
_paos_request = None
for item in respdict["header"]:
if item.c_tag == "RelayState" and\
item.c_namespace == ecp.NAMESPACE:
if item.c_tag == "RelayState" and item.c_namespace == ecp.NAMESPACE:
_relay_state = item
if item.c_tag == "Request" and\
item.c_namespace == paos.NAMESPACE:
if item.c_tag == "Request" and item.c_namespace == paos.NAMESPACE:
_paos_request = item
if _paos_request is None:
raise BadRequest("Missing request")
_rc_url = _paos_request.response_consumer_url
return {"authn_request": authn_request, "rc_url": _rc_url,
"relay_state": _relay_state}
def ecp_conversation(self, respdict, idp_entity_id=None):
"""
:param respdict:
:param idp_entity_id:
:return:
"""
args = self.parse_sp_ecp_response(respdict)
# **********************
# Phase 2 - talk to the IdP
# **********************
idp_response = self.phase2(authn_request, _rc_url, idp_entity_id)
idp_response = self.phase2(idp_entity_id=idp_entity_id, **args)
# **********************************
# Phase 3 - back to the SP
# **********************************
sp_response = soap.make_soap_enveloped_saml_thingy(idp_response,
[_relay_state])
ht_args = self.use_soap(idp_response, args["rc_url"],
[args["relay_state"]])
self._debug_info("[P3] Post to SP: %s" % sp_response)
logger.debug("[P3] Post to SP: %s" % ht_args["data"])
headers = {'Content-Type': 'application/vnd.paos+xml', }
ht_args["headers"].append(('Content-Type', 'application/vnd.paos+xml'))
# POST the package from the IdP to the SP
response = self.http.post(sp_response, headers, _rc_url)
response = self.send(args["rc_url"], "POST", **ht_args)
if not response:
if self.http.response.status == 302:
# ignore where the SP is redirecting us to and go for the
# url I started off with.
pass
else:
print self.http.error_description
raise Exception(
"Error POSTing package to SP: %s" % self.http.response.reason)
if response.status_code == 302:
# ignore where the SP is redirecting us to and go for the
# url I started off with.
pass
else:
print response.error
raise Exception(
"Error POSTing package to SP: %s" % response.error)
self._debug_info("[P3] IdP response: %s" % response)
logger.debug("[P3] SP response: %s" % response.text)
self.done_ecp = True
logger.debug("Done ECP")
return None
def add_paos_headers(self, headers=None):
if headers:
headers = set_list2dict(headers)
headers["PAOS"] = PAOS_HEADER_INFO
if "Accept" in headers:
headers["Accept"] += ";%s" % MIME_PAOS
elif "accept" in headers:
headers["Accept"] = headers["accept"]
headers["Accept"] += ";%s" % MIME_PAOS
del headers["accept"]
headers = dict2set_list(headers)
else:
headers = [
('Accept', 'text/html; %s' % MIME_PAOS),
('PAOS', PAOS_HEADER_INFO)
]
def operation(self, idp_entity_id, op, **opargs):
if "path" not in opargs:
opargs["path"] = self._sp
return headers
def operation(self, url, idp_entity_id, op, **opargs):
"""
This is the method that should be used by someone that wants
to authenticate using SAML ECP
:param url: The page that access is sought for
:param idp_entity_id: The entity ID of the IdP that should be
used for authentication
:param op: Which HTTP operation (GET/POST/PUT/DELETE)
:param opargs: Arguments to the HTTP call
:return: The page
"""
if url not in opargs:
url = self._sp
# ********************************************
# Phase 1 - First conversation with the SP
# ********************************************
# headers needed to indicate to the SP that I'm ECP enabled
if "headers" in opargs and opargs["headers"]:
opargs["headers"]["PAOS"] = PAOS_HEADER_INFO
if "Accept" in opargs["headers"]:
opargs["headers"]["Accept"] += ";application/vnd.paos+xml"
elif "accept" in opargs["headers"]:
opargs["headers"]["Accept"] = opargs["headers"]["accept"]
opargs["headers"]["Accept"] += ";application/vnd.paos+xml"
del opargs["headers"]["accept"]
else:
opargs["headers"] = {
'Accept': 'text/html; application/vnd.paos+xml',
'PAOS': PAOS_HEADER_INFO
}
opargs["headers"] = self.add_paos_headers(opargs["headers"])
# request target from SP
# can remove the PAOS header now
# try:
# del opargs["headers"]["PAOS"]
# except KeyError:
# pass
response = op(**opargs)
self._debug_info("[Op] SP response: %s" % response)
response = self.send(url, op, **opargs)
logger.debug("[Op] SP response: %s" % response)
if not response:
if response.status_code != 200:
raise Exception(
"Request to SP failed: %s" % self.http.error_description)
"Request to SP failed: %s" % response.error)
# The response might be a AuthnRequest instance in a SOAP envelope
# body. If so it's the start of the ECP conversation
@@ -290,35 +283,34 @@ class Client(object):
# if 'holder-of-key' option then one or more <ecp:SubjectConfirmation>
# header blocks may also be present
try:
respdict = soap.class_instances_from_soap_enveloped_saml_thingies(
response,[paos, ecp,samlp])
respdict = self.parse_soap_message(response.text)
self.ecp_conversation(respdict, idp_entity_id)
# should by now be authenticated so this should go smoothly
response = op(**opargs)
response = self.send(url, op, **opargs)
except (soap.XmlParseError, AssertionError, KeyError):
pass
#print "RESP",response, self.http.response
if not response:
if self.http.response.status != 404:
raise Exception("Error performing operation: %s" % (
self.http.error_description,))
if response.status_code != 404:
raise Exception("Error performing operation: %s" % (response.error,))
return response
def delete(self, path=None, idp_entity_id=None):
return self.operation(idp_entity_id, self.http.delete, path=path)
# different HTTP operations
def delete(self, url=None, idp_entity_id=None):
return self.operation(url, idp_entity_id, "DELETE")
def get(self, path=None, idp_entity_id=None, headers=None):
return self.operation(idp_entity_id, self.http.get, path=path,
def get(self, url=None, idp_entity_id=None, headers=None):
return self.operation(url, idp_entity_id, "GET", headers=headers)
def post(self, url=None, data="", idp_entity_id=None, headers=None):
return self.operation(url, idp_entity_id, "POST", data=data,
headers=headers)
def post(self, path=None, data="", idp_entity_id=None, headers=None):
return self.operation(idp_entity_id, self.http.post, data=data,
path=path, headers=headers)
def put(self, path=None, data="", idp_entity_id=None, headers=None):
return self.operation(idp_entity_id, self.http.put, data=data,
path=path, headers=headers)
def put(self, url=None, data="", idp_entity_id=None, headers=None):
return self.operation(url, idp_entity_id, "PUT", data=data,
headers=headers)

View File

@@ -2,9 +2,15 @@ import base64
import logging
from hashlib import sha1
from saml2.metadata import ENDPOINTS
from saml2.profile import paos, ecp
from saml2.soap import parse_soap_enveloped_saml_artifact_resolve
from saml2 import samlp, saml, response, BINDING_URI, BINDING_HTTP_ARTIFACT
from saml2 import samlp
from saml2 import saml
from saml2 import response
from saml2 import BINDING_URI
from saml2 import BINDING_HTTP_ARTIFACT
from saml2 import BINDING_PAOS
from saml2 import request
from saml2 import soap
from saml2 import element_to_extension_element
@@ -106,7 +112,10 @@ class Entity(HTTPBase):
self.vorg = None
self.artifact = {}
self.sourceid = self.metadata.construct_source_id()
if self.metadata:
self.sourceid = self.metadata.construct_source_id()
else:
self.sourceid = {}
def _issuer(self, entityid=None):
""" Return an Issuer instance """
@@ -120,7 +129,7 @@ class Entity(HTTPBase):
format=NAMEID_FORMAT_ENTITY)
def apply_binding(self, binding, msg_str, destination="", relay_state="",
response=False):
response=False, sign=False):
"""
Construct the necessary HTTP arguments dependent on Binding
@@ -148,8 +157,8 @@ class Entity(HTTPBase):
info = self.use_http_get(msg_str, destination, relay_state, typ)
info["url"] = destination
info["method"] = "GET"
elif binding == BINDING_SOAP:
info = self.use_soap(msg_str, destination)
elif binding == BINDING_SOAP or binding == BINDING_PAOS:
info = self.use_soap(msg_str, destination, sign=sign)
elif binding == BINDING_URI:
info = self.use_http_uri(msg_str, typ, destination)
elif binding == BINDING_HTTP_ARTIFACT:
@@ -257,6 +266,17 @@ class Entity(HTTPBase):
return xmlstr
def parse_soap_message(self, text):
"""
:param text: The SOAP message
:return: A dictionary with two keys "body" and "header"
"""
return soap.class_instances_from_soap_enveloped_saml_thingies(text,
[paos,
ecp,
samlp])
# --------------------------------------------------------------------------
def sign(self, msg, mid=None, to_sign=None):
@@ -292,7 +312,9 @@ class Entity(HTTPBase):
if not id:
id = sid(self.seed)
kwargs.update(self.message_args(id))
for key, val in self.message_args(id).items():
if key not in kwargs:
kwargs[key] = val
req = request_cls(**kwargs)

View File

@@ -7,6 +7,7 @@ import urlparse
import requests
import time
from Cookie import SimpleCookie
from saml2.profile import paos
from saml2.time_util import utc_now
from saml2 import class_name
from saml2.pack import http_form_post_message
@@ -67,6 +68,11 @@ def _since_epoch(cdate):
#return int(time.mktime(t))
return calendar.timegm(t)
def set_list2dict(sl):
return dict(sl)
def dict2set_list(dic):
return [(k,v) for k,v in dic.items()]
class HTTPBase(object):
def __init__(self, verify=True, ca_bundle=None, key_file=None,
@@ -82,6 +88,8 @@ class HTTPBase(object):
self.request_args["cert"] = (cert_file, key_file)
self.sec = None
self.user = None
self.passwd = None
def cookies(self, url):
"""
@@ -159,6 +167,9 @@ class HTTPBase(object):
if self.cookiejar:
_kwargs["cookies"] = self.cookies(url)
if self.user and self.passwd:
_kwargs["auth"]= (self.user, self.passwd)
#logger.info("SENT COOKIEs: %s" % (_kwargs["cookies"],))
try:
r = requests.request(method, url, **_kwargs)
@@ -245,7 +256,7 @@ class HTTPBase(object):
return info
def use_soap(self, request, destination="", headers=None, sign=False):
def use_soap(self, request, destination="", soap_headers=None, sign=False):
"""
Construct the necessary information for using SOAP+POST
@@ -255,15 +266,12 @@ class HTTPBase(object):
:param sign:
:return: dictionary
"""
if headers is None:
headers = [("content-type", "application/soap+xml")]
else:
headers.append(("content-type", "application/soap+xml"))
headers = [("content-type", "application/soap+xml")]
soap_message = make_soap_enveloped_saml_thingy(request)
soap_message = make_soap_enveloped_saml_thingy(request, soap_headers)
logger.error("SOAP message: %s" % soap_message)
if sign and self.sec:
_signed = self.sec.sign_statement_using_xmlsec(soap_message,
class_name(request),
@@ -301,3 +309,7 @@ class HTTPBase(object):
raise HTTPError("%d:%s" % (response.status_code, response.error))
else:
return None
def add_credentials(self, user, passwd):
self.user = user
self.passwd = passwd

View File

@@ -139,7 +139,7 @@ class AttributeQuery(Request):
return []
class AuthnRequest(Request):
msgtype = "auth_request"
msgtype = "authn_request"
def __init__(self, sec_context, receiver_addrs, attribute_converters,
timeslack=0):
Request.__init__(self, sec_context, receiver_addrs,

View File

@@ -24,11 +24,12 @@ import shelve
import sys
import memcache
from hashlib import sha1
from saml2.schema import soapenv
from saml2.samlp import NameIDMappingResponse
from saml2.samlp import NameIDMappingResponse, Response
from saml2.entity import Entity
from saml2 import saml
from saml2 import saml, element_to_extension_element
from saml2 import class_name
from saml2 import BINDING_HTTP_REDIRECT
@@ -40,8 +41,6 @@ from saml2.request import AuthzDecisionQuery
from saml2.request import AuthnQuery
from saml2.s_utils import MissingValue, Unknown
from saml2.s_utils import BadRequest
from saml2.s_utils import error_status_factory
from saml2.sigver import pre_signature_part, signed_instance_factory
@@ -51,6 +50,8 @@ from saml2.assertion import restriction_from_attribute_spec
from saml2.assertion import filter_attribute_value_assertions
from saml2.ident import IdentDB
#from saml2.profile import paos
from saml2.profile import ecp
logger = logging.getLogger(__name__)
@@ -448,7 +449,7 @@ class Server(Entity):
:param assertion_id:
:param in_response_to:
:param issuer:
:param sign_response:
:param sign:
:param status:
:return:
"""
@@ -524,3 +525,40 @@ class Server(Entity):
return self._response(in_response_to, "", status, issuer,
sign_response, to_sign=[], **args)
# ---------
def parse_ecp_authn_request(self):
pass
def create_ecp_authn_request_response(self, acs_url, identity,
in_response_to, destination,
sp_entity_id, name_id_policy=None,
userid=None, name_id=None, authn=None,
authn_decl=None, issuer=None,
sign_response=False,
sign_assertion=False):
# ----------------------------------------
# <ecp:Response
# ----------------------------------------
ecp_response = ecp.Response(assertion_consumer_service_url=acs_url)
header = soapenv.Header()
header.extension_elements = [element_to_extension_element(ecp_response)]
# ----------------------------------------
# <samlp:Response
# ----------------------------------------
response = self.create_authn_response(identity, in_response_to,
destination, sp_entity_id,
name_id_policy, userid, name_id,
authn, authn_decl, issuer,
sign_response, sign_assertion)
body = soapenv.Body()
body.extension_elements = [element_to_extension_element(response)]
soap_envelope = soapenv.Envelope(header=header, body=body)
return "%s" % soap_envelope

View File

@@ -65,8 +65,8 @@ def parse_soap_enveloped_saml_logout_request(text):
expected_tag = '{%s}LogoutRequest' % SAMLP_NAMESPACE
return parse_soap_enveloped_saml_thingy(text, [expected_tag])
def parse_soap_enveloped_saml_authentication_request(text):
expected_tag = '{%s}AuthenticationRequest' % SAMLP_NAMESPACE
def parse_soap_enveloped_saml_authn_request(text):
expected_tag = '{%s}AuthnRequest' % SAMLP_NAMESPACE
return parse_soap_enveloped_saml_thingy(text, [expected_tag])
def parse_soap_enveloped_saml_artifact_resolve(text):

View File

@@ -41,7 +41,8 @@ CONFIG = {
"single_sign_on_service" : [
("%s/sso/redirect" % BASE, BINDING_HTTP_REDIRECT),
("%s/sso/post" % BASE, BINDING_HTTP_POST),
("%s/sso/art" % BASE, BINDING_HTTP_ARTIFACT)
("%s/sso/art" % BASE, BINDING_HTTP_ARTIFACT),
("%s/sso/paos" % BASE, BINDING_SOAP)
],
"single_logout_service": [
("%s/slo/soap" % BASE, BINDING_SOAP),