Use the same naming format everywhere.

New entity method allowed for refactoring.
This commit is contained in:
Roland Hedberg
2013-01-13 18:14:27 +01:00
parent cb40903ef1
commit fdc0555c41
6 changed files with 78 additions and 26 deletions

View File

@@ -23,7 +23,7 @@ logger = logging.getLogger(__name__)
REQ2SRV = {
# IDP
"authn_request": "single_sign_on_service",
"nameid_mapping_request": "name_id_mapping_service",
"name_id_mapping_request": "name_id_mapping_service",
# AuthnAuthority
"authn_query": "authn_query_service",
# AttributeAuthority
@@ -34,7 +34,7 @@ REQ2SRV = {
"assertion_id_request": "assertion_id_request_service",
# IDP + SP
"logout_request": "single_logout_service",
"manage_nameid_query": "manage_name_id_service",
"manage_name_id_query": "manage_name_id_service",
"artifact_query": "artifact_resolution_service",
# SP
"assertion_response": "assertion_consumer_service",

View File

@@ -36,19 +36,19 @@ class Request(object):
self.not_on_or_after = 0
def _loads(self, xmldata, binding):
if binding == BINDING_HTTP_REDIRECT:
logger.debug("Expected to decode and inflate xml data")
decoded_xml = s_utils.decode_base64_and_inflate(xmldata)
elif binding == BINDING_HTTP_POST:
decoded_xml = base64.b64decode(xmldata)
else:
decoded_xml = xmldata
# if binding == BINDING_HTTP_REDIRECT:
# logger.debug("Expected to decode and inflate xml data")
# decoded_xml = s_utils.decode_base64_and_inflate(xmldata)
# elif binding == BINDING_HTTP_POST:
# decoded_xml = base64.b64decode(xmldata)
# else:
# decoded_xml = xmldata
# own copy
self.xmlstr = decoded_xml[:]
self.xmlstr = xmldata[:]
logger.info("xmlstr: %s" % (self.xmlstr,))
try:
self.message = self.signature_check(decoded_xml)
self.message = self.signature_check(xmldata)
except TypeError:
raise
except Exception, excp:
@@ -56,7 +56,7 @@ class Request(object):
if not self.message:
logger.error("Response was not correctly signed")
logger.info(decoded_xml)
logger.info(xmldata)
raise IncorrectlySigned()
logger.info("request: %s" % (self.message,))
@@ -178,5 +178,5 @@ class NameIDMappingRequest(Request):
timeslack=0):
Request.__init__(self, sec_context, receiver_addrs,
attribute_converters, timeslack)
self.signature_check = self.sec.correctly_signed_nameid_mapping_request
self.signature_check = self.sec.correctly_signed_name_id_mapping_request

View File

@@ -23,7 +23,8 @@ import logging
import shelve
import sys
import memcache
from saml2.samlp import AuthzDecisionQuery
from saml2.soap import parse_soap_enveloped_saml_name_id_mapping_request
from saml2.samlp import AuthzDecisionQuery, NameIDMappingResponse
from saml2.samlp import AssertionIDRequest
from saml2.samlp import AuthnQuery
from saml2.entity import Entity
@@ -332,7 +333,7 @@ class Server(Entity):
"authn_query_service", binding,
"authn_query")
def parse_nameid_mapping_request(self, xml_string, binding):
def parse_name_id_mapping_request(self, xml_string, binding):
""" Parse a nameid mapping request
:param xml_string: The NameIDMappingRequest as an XML string
@@ -341,7 +342,7 @@ class Server(Entity):
return self._parse_request(xml_string, NameIDMappingRequest,
"manage_name_id_service", binding,
"nameid_mapping_request")
"name_id_mapping_request")
# ------------------------------------------------------------------------
@@ -594,3 +595,33 @@ class Server(Entity):
return self._response(in_response_to, "", status, issuer,
sign_response, to_sign, **args)
def create_name_id_mapping_response(self, name_id=None, encrypted_id=None,
in_response_to=None,
issuer=None, sign_response=False,
status=None):
"""
protocol for mapping a principal's name identifier into a
different name identifier for the same principal.
Done over soap.
:param name_id:
:param encrypted_id:
:param in_response_to:
:param issuer:
:param sign_response:
:param status:
:return:
"""
# Done over SOAP
ms_args = self.message_args()
_resp = NameIDMappingResponse(name_id, encrypted_id,
in_response_to=in_response_to, **ms_args)
if sign_response:
return self.sign(_resp)
else:
logger.info("Message: %s" % _resp)
return _resp

View File

@@ -781,10 +781,16 @@ class SecurityContext(object):
"authz_decision_query", must,
origdoc)
def correctly_signed_nameid_mapping_request(self, decoded_xml, must=False,
def correctly_signed_name_id_mapping_request(self, decoded_xml, must=False,
origdoc=None):
return self.correctly_signed_message(decoded_xml,
"name id_mapping_request",
"name_id_mapping_request",
must, origdoc)
def correctly_signed_name_id_mapping_response(self, decoded_xml, must=False,
origdoc=None):
return self.correctly_signed_message(decoded_xml,
"name_id_mapping_response",
must, origdoc)
def correctly_signed_response(self, decoded_xml, must=False, origdoc=None):

View File

@@ -64,11 +64,11 @@ def parse_soap_enveloped_saml_artifact_response(text):
expected_tag = '{%s}ArtifactResponse' % SAMLP_NAMESPACE
return parse_soap_enveloped_saml_thingy(text, [expected_tag])
def parse_soap_enveloped_saml_nameid_mapping_request(text):
def parse_soap_enveloped_saml_name_id_mapping_request(text):
expected_tag = '{%s}NameIDMappingRequest' % SAMLP_NAMESPACE
return parse_soap_enveloped_saml_thingy(text, [expected_tag])
def parse_soap_enveloped_saml_nameid_mapping_response(text):
def parse_soap_enveloped_saml_name_id_mapping_response(text):
expected_tag = '{%s}NameIDMappingResponse' % SAMLP_NAMESPACE
return parse_soap_enveloped_saml_thingy(text, [expected_tag])

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import base64
from urlparse import parse_qs
from saml2.saml import AUTHN_PASSWORD
from saml2.samlp import response_from_string
@@ -151,17 +152,27 @@ class TestServer1():
destination = "http://www.example.com",
id = "id1")
intermed = s_utils.deflate_and_base64_encode("%s" % authn_request)
# should raise an error because faulty spentityid
raises(OtherError, self.server.parse_authn_request, intermed)
binding = BINDING_HTTP_REDIRECT
htargs = self.client.apply_binding(binding, "%s" % authn_request,
"http://www.example.com", "abcd")
_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
print _dict
raises(OtherError, self.server.parse_authn_request,
_dict["SAMLRequest"][0], binding)
def test_parse_faulty_request_to_err_status(self):
authn_request = self.client.create_authn_request(
destination = "http://www.example.com")
intermed = s_utils.deflate_and_base64_encode("%s" % authn_request)
binding = BINDING_HTTP_REDIRECT
htargs = self.client.apply_binding(binding, "%s" % authn_request,
"http://www.example.com", "abcd")
_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
print _dict
try:
self.server.parse_authn_request(intermed)
self.server.parse_authn_request(_dict["SAMLRequest"][0], binding)
status = None
except OtherError, oe:
print oe.args
@@ -182,9 +193,13 @@ class TestServer1():
destination = "http://localhost:8088/sso")
print authn_request
intermed = s_utils.deflate_and_base64_encode("%s" % authn_request)
binding = BINDING_HTTP_REDIRECT
htargs = self.client.apply_binding(binding, "%s" % authn_request,
"http://www.example.com", "abcd")
_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
print _dict
req = self.server.parse_authn_request(intermed)
req = self.server.parse_authn_request(_dict["SAMLRequest"][0], binding)
# returns a dictionary
print req
resp_args = self.server.response_args(req.message, [BINDING_HTTP_POST])