Classes for security context and dealing with AuthnResponse

This commit is contained in:
Roland Hedberg
2010-03-28 16:52:23 +02:00
parent a0ba1568ff
commit 66f57e821b
8 changed files with 342 additions and 565 deletions

View File

@@ -31,12 +31,12 @@ from saml2.utils import do_attributes, args2dict
from saml2 import samlp, saml, extension_element_to_element
from saml2 import VERSION, class_name, make_instance
from saml2.sigver import correctly_signed_response, decrypt
from saml2.sigver import pre_signature_part, sign_assertion_using_xmlsec
from saml2.sigver import sign_statement_using_xmlsec
from saml2.sigver import pre_signature_part
from saml2.sigver import security_context
from saml2.soap import SOAPClient
from saml2.attribute_converter import to_local
from saml2.authnresponse import authn_response
DEFAULT_BINDING = saml2.BINDING_HTTP_REDIRECT
@@ -48,24 +48,6 @@ FORM_SPEC = """<form method="post" action="%s">
LAX = False
def _use_on_or_after(condition, slack):
now = time.mktime(time.gmtime())
not_on_or_after = time.mktime(str_to_time(condition.not_on_or_after))
if not_on_or_after < now + slack:
# To old ignore
raise Exception("To old can't use it")
return not_on_or_after
def _use_before(condition, slack):
not_before = time.mktime(str_to_time(condition.not_before))
now = time.mktime(time.gmtime())
if not_before > now + slack:
# Can't use it yet
raise Exception("Can't use it yet %s <= %s" % (not_before, now))
return True
class Saml2Client(object):
""" The basic pySAML2 service provider class """
@@ -79,7 +61,8 @@ class Saml2Client(object):
self.config = config
if "metadata" in config:
self.metadata = config["metadata"]
self.sc = security_context(config)
def _init_request(self, request, destination):
#request.id = sid()
request.version = VERSION
@@ -129,9 +112,10 @@ class Saml2Client(object):
if post.has_key("SAMLResponse"):
saml_response = post['SAMLResponse'].value
if saml_response:
return self.verify_response(saml_response, requestor,
outstanding, log,
context="AuthNReq")
ar = authn_response(self.conf, requestor, outstanding, log)
ar.loads(saml_response)
return ar.verify()
return None
def authn_request(self, query_id, destination, service_url, spentityid,
@@ -184,9 +168,8 @@ class Saml2Client(object):
request = make_instance(samlp.AuthnRequest, prel)
if sign:
return sign_statement_using_xmlsec("%s" % request, class_name(request),
self.config["xmlsec_binary"],
key_file=self.config["key_file"])
return self.sc.sign_statement_using_xmlsec("%s" % request,
class_name(request))
#return samlp.authn_request_from_string(sreq)
else:
return "%s" % request
@@ -195,8 +178,7 @@ class Saml2Client(object):
my_name="", relay_state="",
binding=saml2.BINDING_HTTP_REDIRECT, log=None,
vorg="", scoping=None):
""" Either verifies an authentication Response or if none is present
send an authentication request.
""" Sends an authentication request.
:param spentityid: The SP EntityID
:param binding: How the authentication request should be sent to the
@@ -253,211 +235,6 @@ class Saml2Client(object):
raise Exception("Unkown binding type: %s" % binding)
return (session_id, response)
def verify_response(self, xml_response, requestor, outstanding=None,
log=None, decode=True, context="", lax=False):
""" Verify a response
:param xml_response: The response as a XML string
:param requestor: The hostname of the machine
:param outstanding: A collection of outstanding authentication requests
:param log: Where logging information should be sent
:param decode: There for testing purposes
:param lax: Accept things you normally shouldn't
:return: A 2-tuple consisting of an identity description and the
real relay-state
"""
if not outstanding:
outstanding = {}
if decode:
decoded_xml = base64.b64decode(xml_response)
else:
decoded_xml = xml_response
# own copy
xmlstr = decoded_xml[:]
log and log.info("verify correct signature")
response = correctly_signed_response(decoded_xml,
self.config["xmlsec_binary"], log=log)
if not response:
if log:
log.error("Response was not correctly signed")
log.info(decoded_xml)
return None
else:
log and log.error("Response was correctly signed or nor signed")
log and log.info("response: %s" % (response,))
try:
session_info = self.do_response(response,
requestor,
outstanding=outstanding,
xmlstr=xmlstr,
log=log, context=context,
lax=lax)
session_info["issuer"] = response.issuer.text
session_info["session_id"] = response.in_response_to
except AttributeError, exc:
if log:
log.error("AttributeError: %s" % (exc,))
else:
print >> sys.stderr, "AttributeError: %s" % (exc,)
return None
except Exception, exc:
if log:
log.error("Exception: %s" % (exc,))
else:
print >> sys.stderr, "Exception: %s" % (exc,)
return None
session_info["ava"]["__userid"] = session_info["name_id"]
return session_info
def _verify_condition(self, assertion, requestor, log, lax=False,
slack=0):
# The Identity Provider MUST include a <saml:Conditions> element
#print "Conditions",assertion.conditions
assert assertion.conditions
condition = assertion.conditions
log and log.info("condition: %s" % condition)
try:
slack = self.config["accept_time_diff"]
except KeyError:
slack = 0
try:
not_on_or_after = _use_on_or_after(condition, slack)
_use_before(condition, slack)
except Exception:
if not lax:
raise
else:
not_on_or_after = 0
if not for_me(condition, requestor):
if not LAX and not lax:
raise Exception("Not for me!!!")
return not_on_or_after
def _websso(self, assertion, _outstanding, _requestor, _log=None):
# the assertion MUST contain one AuthNStatement
assert len(assertion.authn_statement) == 1
# authn_statement = assertion.authn_statement[0]
# check authn_statement.session_index
def _assertion(self, assertion, outstanding, requestor, log, context,
lax):
if log:
log.info("assertion context: %s" % (context,))
log.info("assertion keys: %s" % (assertion.keyswv()))
log.info("outstanding: %s" % (outstanding))
if context == "AuthNReq":
self._websso(assertion, outstanding, requestor, log)
# The Identity Provider MUST include a <saml:Conditions> element
#print "Conditions",assertion.conditions
assert assertion.conditions
log and log.info("verify_condition")
not_on_or_after = self._verify_condition(assertion, requestor, log,
lax)
# The assertion can contain zero or one attributeStatements
if not assertion.attribute_statement:
ava = {}
else:
assert len(assertion.attribute_statement) == 1
ava = to_local(self.config.attribute_converters(),
assertion.attribute_statement[0])
log and log.info("AVA: %s" % (ava,))
# The assertion must contain a Subject
assert assertion.subject
subject = assertion.subject
for subject_confirmation in subject.subject_confirmation:
data = subject_confirmation.subject_confirmation_data
if data.in_response_to in outstanding:
came_from = outstanding[data.in_response_to]
del outstanding[data.in_response_to]
elif LAX:
came_from = ""
else:
print data.in_response_to, outstanding.keys()
raise Exception(
"Combination of session id and requestURI I don't recall")
# The subject must contain a name_id
assert subject.name_id
name_id = subject.name_id.text.strip()
return {"ava":ava, "name_id":name_id, "came_from":came_from,
"not_on_or_after":not_on_or_after}
def _encrypted_assertion(self, xmlstr, outstanding, requestor,
log=None, context=""):
log and log.debug("Decrypt message")
decrypt_xml = decrypt(xmlstr, self.config["key_file"],
self.config["xmlsec_binary"], log=log)
log and log.debug("Decryption successfull")
response = samlp.response_from_string(decrypt_xml)
log and log.debug("Parsed decrypted assertion successfull")
enc = response.encrypted_assertion[0].extension_elements[0]
assertion = extension_element_to_element(
enc,
saml.ELEMENT_FROM_STRING,
namespace=saml.NAMESPACE)
log and log.info("Decrypted Assertion: %s" % assertion)
return self._assertion(assertion, outstanding, requestor, log,
context)
def do_response(self, response, requestor, outstanding=None,
xmlstr="", log=None, context="", lax=False):
"""
Parse a response, verify that it is a response for me and
expected by me and that it is correct.
:param response: The response as a structure
:param requestor: The host (me) that asked for a AuthN response
:param outstanding: A dictionary with session ids as keys and request
URIs as values.
:param lax: Accept things you normally shouldn't
:result: A 2-tuple with attribute value assertions as a dictionary
as one part and the NameID as the other.
"""
if not outstanding:
outstanding = {}
if response.status:
status = response.status
if status.status_code.value != samlp.STATUS_SUCCESS:
log and log.info("Not successfull operation: %s" % status)
raise Exception(
"Not successfull according to: %s" % \
status.status_code.value)
# MUST contain *one* assertion
try:
assert len(response.assertion) == 1 or \
len(response.encrypted_assertion) == 1
except AssertionError:
raise Exception("No assertion part")
if response.assertion:
log and log.info("***Unencrypted response***")
return self._assertion(response.assertion[0], outstanding,
requestor, log, context, lax)
else:
log and log.info("***Encrypted response***")
return self._encrypted_assertion(xmlstr, outstanding,
requestor, log, context)
def create_attribute_query(self, session_id, subject_id, issuer,
destination, attribute=None, sp_name_qualifier=None,
@@ -504,9 +281,7 @@ class Saml2Client(object):
request = make_instance(samlp.AttributeQuery, prequery)
if sign:
signed_req = sign_assertion_using_xmlsec("%s" % request,
self.config["xmlsec_binary"],
key_file=self.config["key_file"])
signed_req = self.sc.sign_assertion_using_xmlsec("%s" % request)
return samlp.attribute_query_from_string(signed_req)
else:
@@ -596,12 +371,6 @@ class Saml2Client(object):
# ----------------------------------------------------------------------
def for_me(condition, myself ):
for restriction in condition.audience_restriction:
audience = restriction.audience
if audience.text.strip() == myself:
return True
ROW = """<tr><td>%s</td><td>%s</td></tr>"""
def _print_statement(statem):
@@ -637,4 +406,3 @@ def _print_statements(states):
def print_response(resp):
print _print_statement(resp)
print resp.to_string()

View File

@@ -112,6 +112,9 @@ class Config(dict):
return self
def xmlsec(self):
return self["xmlsec_binary"]
def services(self):
return self["service"].keys()

View File

@@ -372,6 +372,7 @@ class SubjectConfirmation(SamlBase):
extension_elements=None, extension_attributes=None):
"""Constructor for SubjectConfirmation
:param method: Subject confirmation method
:param base_id: Method attribute
:param name_id: NameID element
:param subject_confirmation_data: SubjectConfirmationData element

View File

@@ -31,7 +31,7 @@ from saml2.utils import OtherError, do_attribute_statement
from saml2.utils import VersionMismatch, UnknownPrincipal, UnsupportedBinding
from saml2.utils import status_from_exception_factory
from saml2.sigver import correctly_signed_authn_request
from saml2.sigver import security_context
from saml2.sigver import pre_signature_part
from saml2.time_util import instant, in_a_while
from saml2.config import Config
@@ -42,21 +42,26 @@ class UnknownVO(Exception):
pass
class Identifier(object):
""" A class that handles identifiers of objects """
def __init__(self, dbname, entityid, voconf=None, debug=0, log=None):
self.map = shelve.open(dbname,writeback=True)
self.entityid = entityid
self.voconf = voconf
self.debug = debug
self.log = log
def persistent(self, entity_id, subject_id):
""" Keeps the link between a permanent identifier and a
temporary/pseudo-temporary identifier for a subject
The store supports look-up both ways: from a permanent local
identifier to a identifier used talking to a SP and from an
identifier given back by an SP to the local permanent.
:param entity_id: SP entity ID or VO entity ID
:param subject_id: The local identifier of the subject
:param subject_id: The local permanent identifier of the subject
:return: An arbitrary identifier for the subject unique to the
entity_id
service/group of services/VO with a given entity_id
"""
if self.debug:
self.log and self.log.debug("Id map keys: %s" % self.map.keys())
@@ -104,12 +109,30 @@ class Identifier(object):
sp_name_qualifier=sp_name_qualifier)
def persistent_nameid(self, sp_name_qualifier, userid):
""" Get or create a persistent identifier for this object to be used
when communicating with servers using a specific SPNameQualifier
:param sp_name_qualifier: An identifier for a 'context'
:param userid: The local permanent identifier of the object
:return: A persistent random identifier.
"""
subj_id = self.persistent(sp_name_qualifier, userid)
return args2dict(subj_id, format=saml.NAMEID_FORMAT_PERSISTENT,
sp_name_qualifier=sp_name_qualifier)
def construct_nameid(self, local_policy, userid, sp_entity_id,
identity=None, name_id_policy=None):
""" Returns a name_id for the object. How the name_id is
constructed depends on the context.
:param local_policy: The policy the server is configured to follow
:param user: The local permanent identifier of the object
:param sp_entity_id: The 'user' of the name_id
:param identity: Attribute/value pairs describing the object
:param name_id_policy: The policy the server on the other side wants
us to follow.
:return: NameID instance precursor
"""
if name_id_policy and name_id_policy.sp_name_qualifier:
return self._get_vo_identifier(name_id_policy.sp_name_qualifier,
userid, identity)
@@ -121,6 +144,7 @@ class Identifier(object):
return self.temporary_nameid()
def temporary_nameid(self):
""" Returns a random one-time identifier """
return args2dict(sid(), format=saml.NAMEID_FORMAT_TRANSIENT)
@@ -137,12 +161,14 @@ class Server(object):
self.conf = config
self.metadata = self.conf["metadata"]
self.sc = security_context(self.conf, log)
if cache:
self.cache = Cache(cache)
else:
self.cache = Cache()
def load_config(self, config_file):
self.conf = Config()
self.conf.load_file(config_file)
if "subject_data" in self.conf:
@@ -153,8 +179,9 @@ class Server(object):
self.id = None
def issuer(self):
""" Return an Issuer precursor """
return args2dict( self.conf["entityid"],
format=saml.NAMEID_FORMAT_ENTITY)
format=saml.NAMEID_FORMAT_ENTITY)
def parse_authn_request(self, enc_request):
@@ -164,15 +191,14 @@ class Server(object):
:return: A dictionary with keys:
consumer_url - as gotten from the SPs entity_id and the metadata
id - the id of the request
name_id_policy - how to chose the subjects identifier
sp_entityid - the entity id of the SP
sp_entity_id - the entity id of the SP
request - The verified request
"""
response = {}
request_xml = decode_base64_and_inflate(enc_request)
try:
request = correctly_signed_authn_request(request_xml,
self.conf["xmlsec_binary"], log=self.log)
request = self.sc.correctly_signed_authn_request(request_xml)
if self.log and self.debug:
self.log.error("Request was correctly signed")
except Exception:
@@ -187,18 +213,20 @@ class Server(object):
if request.version != VERSION:
raise VersionMismatch(
"can't work with version %s" % request.version)
sp_entityid = request.issuer.text
sp_entity_id = request.issuer.text
# used to find return address in metadata
try:
consumer_url = self.metadata.consumer_url(sp_entityid)
consumer_url = self.metadata.consumer_url(sp_entity_id)
except KeyError:
self.log and self.log.info(
"entities: %s" % self.metadata.entity.keys())
raise UnknownPrincipal(sp_entityid)
raise UnknownPrincipal(sp_entity_id)
if not consumer_url: # what to do ?
raise UnsupportedBinding(sp_entityid)
raise UnsupportedBinding(sp_entity_id)
response["sp_entityid"] = sp_entityid
response["sp_entity_id"] = sp_entity_id
if consumer_url != return_destination:
# serious error on someones behalf
@@ -214,17 +242,30 @@ class Server(object):
return response
def wants(self, sp_entity_id):
"""
""" Returns what attributes this SP requiers and which are optional
if any such demands are registered in the Metadata.
:param sp_entity_id: The entity id of the SP
:return: 2-tuple, list of required and list of optional attributes
"""
return self.metadata.requests(sp_entity_id)
def parse_attribute_query(self, xml_string):
""" Parse an attribute query
:param xml_string: The Attribute Query as an XML string
:return: 3-Tuple containing:
subject - identifier of the subject
attribute - which attributes that the requestor wants back
query - the whole query
"""
query = samlp.attribute_query_from_string(xml_string)
# Check that it's
assert query.version == VERSION
self.log and self.log.info(
"%s ?= %s" % (query.destination, self.conf.aa_url))
# Is it for me ?
assert query.destination == self.conf.aa_url
# verify signature
@@ -235,10 +276,7 @@ class Server(object):
else:
attribute = None
return (subject, attribute, query)
def find_subject(self, subject, attribute=None):
pass
# ------------------------------------------------------------------------
def _response(self, consumer_url, in_response_to, sp_entity_id,

View File

@@ -19,7 +19,7 @@
Based on the use of xmlsec1 binaries and not the python xmlsec module.
"""
from saml2 import samlp
from saml2 import samlp, class_name, saml
import xmldsig as ds
from tempfile import NamedTemporaryFile
from subprocess import Popen, PIPE
@@ -44,33 +44,6 @@ _TEST_ = True
class SignatureError(Exception):
pass
def decrypt( enctext, key_file, xmlsec_binary, log=None):
""" Decrypting an encrypted text by the use of a private key.
:param input: The encrypted text as a string
:param key_file: The name of the key file
:param xmlsec_binary: Where on the computer the xmlsec binary is.
:param log: A reference to a logging instance.
:return: The decrypted text
"""
log and log.info("input len: %d" % len(enctext))
_, fil = make_temp("%s" % enctext, decode=False)
ntf = NamedTemporaryFile()
log and log.info("xmlsec binary: %s" % xmlsec_binary)
com_list = [xmlsec_binary, "--decrypt",
"--privkey-pem", key_file,
"--output", ntf.name,
"--id-attr:%s" % ID_ATTR,
ENC_NODE_NAME, fil]
log and log.info("Decrypt command: %s" % " ".join(com_list))
result = Popen(com_list, stderr=PIPE).communicate()
log and log.info("Decrypt result: %s" % (result,))
ntf.seek(0)
return ntf.read()
def create_id():
""" Create a string of 40 random characters from the set [a-p],
@@ -145,14 +118,9 @@ def _parse_xmlsec_output(output):
elif line == "FALSE":
return False
return False
def verify_signature_assertion(xmlsec_binary, enctext, cert_file):
return verify_signature(xmlsec_binary, enctext, cert_file,
"der",
"urn:oasis:names:tc:SAML:2.0:assertion:Assertion")
def verify_signature(xmlsec_binary, enctext, cert_file,
cert_type="der", node_name=NODE_NAME):
def verify_signature(enctext, xmlsec_binary, cert_file=None, cert_type="",
node_name=NODE_NAME, debug=False):
""" Verifies the signature of a XML document.
:param xmlsec_binary: The xmlsec1 binaries to be used
@@ -160,6 +128,7 @@ def verify_signature(xmlsec_binary, enctext, cert_file,
:param der_file: The public key that was used to sign the document
:return: Boolean True if the signature was correct otherwise False.
"""
_, fil = make_temp("%s" % enctext, decode=False)
com_list = [xmlsec_binary, "--verify",
@@ -167,7 +136,7 @@ def verify_signature(xmlsec_binary, enctext, cert_file,
"--id-attr:%s" % ID_ATTR,
node_name, fil]
if _TEST_:
if debug:
try:
print " ".join(com_list)
except TypeError:
@@ -182,7 +151,7 @@ def verify_signature(xmlsec_binary, enctext, cert_file,
output = Popen(com_list, stderr=PIPE).communicate()[1]
verified = _parse_xmlsec_output(output)
if _TEST_:
if debug:
print output
print os.stat(cert_file)
print "Verify result: '%s'" % (verified,)
@@ -191,176 +160,229 @@ def verify_signature(xmlsec_binary, enctext, cert_file,
return verified
def correctly_signed_authn_request(decoded_xml, xmlsec_binary=XMLSEC_BINARY,
metadata=None, log=None, must=False):
""" Check if a request is correctly signed, if we have metadata for
the SP that sent the info use that, if not use the key that are in
the message if any.
:param decode_xml: The SAML message as a XML string
:param xmlsec_binary: Where the xmlsec1 binary can be found on this
system.
:param metadata: Metadata information
:return: None if the signature can not be verified otherwise
request as a samlp.Request instance
"""
request = samlp.authn_request_from_string(decoded_xml)
# ---------------------------------------------------------------------------
if not request.signature:
if must:
raise SignatureError("Missing must signature")
else:
return request
def security_context(conf, log=None):
if not conf:
return None
issuer = request.issuer.text.strip()
if metadata:
certs = metadata.certs(issuer)
else:
certs = []
try:
debug = conf["debug"]
except KeyError:
debug = 0
if not certs:
certs = [make_temp("%s" % cert, ".der") \
for cert in cert_from_instance(request)]
if not certs:
raise SignatureError("Missing signing certificate")
return SecurityContext(conf.xmlsec(), conf["key_file"], "pem",
conf["cert_file"], "pem", conf["metadata"],
log=log, debug=debug)
verified = False
for _, der_file in certs:
if verify_signature(xmlsec_binary, decoded_xml, der_file):
verified = True
break
if not verified:
raise SignatureError("Failed to verify signature")
class SecurityContext(object):
def __init__(self, xmlsec_binary, key_file="", key_type= "", cert_file="",
cert_type="", metadata=None, log=None, debug=False):
self.xmlsec = xmlsec_binary
self.key_file = key_file
self.cert_file = cert_file
self.cert_type = cert_type
self.metadata = metadata
self.log = log
self.debug = debug
return request
if self.debug and not self.log:
self.debug = 0
def correctly_signed(self, xml, must=False):
self.log and self.log.info("verify correct signature")
return self.correctly_signed_response(xml, must)
def decrypt(self, enctext):
""" Decrypting an encrypted text by the use of a private key.
:param enctext: The encrypted text as a string
:return: The decrypted text
"""
self.log and self.log.info("input len: %d" % len(enctext))
_, fil = make_temp("%s" % enctext, decode=False)
ntf = NamedTemporaryFile()
com_list = [self.xmlsec, "--decrypt",
"--privkey-pem", key_file,
"--output", ntf.name,
"--id-attr:%s" % ID_ATTR,
ENC_NODE_NAME, fil]
if self.debug:
self.log.debug("Decrypt command: %s" % " ".join(com_list))
result = Popen(com_list, stderr=PIPE).communicate()
if self.debug:
self.log.debug("Decrypt result: %s" % (result,))
ntf.seek(0)
return ntf.read()
def correctly_signed_response(decoded_xml,
xmlsec_binary=XMLSEC_BINARY, metadata=None, log=None, must=False):
""" Check if a instance is correctly signed, if we have metadata for
the IdP that sent the info use that, if not use the key that are in
the message if any.
:param decode_xml: The SAML message as a XML string
:param xmlsec_binary: Where the xmlsec1 binary can be found on this
system.
:param metadata: Metadata information
:return: None if the signature can not be verified otherwise an instance
"""
response = samlp.response_from_string(decoded_xml)
def verify_signature(self, enctext, cert_file=None, cert_type="pem",
node_name=NODE_NAME):
""" Verifies the signature of a XML document.
:param enctext: The XML document as a string
:param der_file: The public key that was used to sign the document
:return: Boolean True if the signature was correct otherwise False.
"""
if not cert_file:
cert_file = self.cert_file
cert_type = self.cert_type
return verify_signature(enctext, self.xmlsec, cert_file, cert_type,
node_name, True)
def correctly_signed_authn_request(self, decoded_xml, must=False):
""" Check if a request is correctly signed, if we have metadata for
the SP that sent the info use that, if not use the key that are in
the message if any.
:param decode_xml: The SAML message as a XML string
:param must: Whether there must be a signature
:return: None if the signature can not be verified otherwise
request as a samlp.Request instance
"""
request = samlp.authn_request_from_string(decoded_xml)
if not xmlsec_binary:
xmlsec_binary = XMLSEC_BINARY
# Try to find the signing cert in the assertion
for assertion in response.assertion:
if not assertion.signature:
if _TEST_:
log and log.info("unsigned")
if not request.signature:
if must:
raise SignatureError("Signature missing")
continue
else:
if _TEST_:
log and log.info("signed")
issuer = assertion.issuer.text.strip()
if _TEST_:
print "issuer: %s" % issuer
if metadata:
certs = metadata.certs(issuer)
raise SignatureError("Missing must signature")
else:
return request
issuer = request.issuer.text.strip()
if self.metadata:
certs = self.metadata.certs(issuer)
else:
certs = []
if _TEST_:
print "metadata certs: %s" % certs
if not certs:
certs = [make_temp("%s" % cert, ".der") \
for cert in cert_from_instance(assertion)]
for cert in cert_from_instance(request)]
if not certs:
raise SignatureError("Missing certificate")
raise SignatureError("Missing signing certificate")
verified = False
for _, der_file in certs:
if verify_signature(xmlsec_binary, decoded_xml, der_file):
if verify_signature(self.xmlsec, decoded_xml, der_file):
verified = True
break
if not verified:
raise SignatureError("Could not verify")
raise SignatureError("Failed to verify signature")
return response
return request
#----------------------------------------------------------------------------
# SIGNATURE PART
#----------------------------------------------------------------------------
def correctly_signed_response(self, decoded_xml, must=False):
""" Check if a instance is correctly signed, if we have metadata for
the IdP that sent the info use that, if not use the key that are in
the message if any.
def sign_statement_using_xmlsec(statement, xtype, xmlsec_binary, key=None,
:param decode_xml: The SAML message as a XML string
:param must: Whether there must be a signature
:return: None if the signature can not be verified otherwise an instance
"""
response = samlp.response_from_string(decoded_xml)
# Try to find the signing cert in the assertion
for assertion in response.assertion:
if not assertion.signature:
if self.debug:
self.log.debug("unsigned")
if must:
raise SignatureError("Signature missing")
continue
else:
if self.debug:
self.log.debug("signed")
issuer = assertion.issuer.text.strip()
if self.debug:
self.log.debug("issuer: %s" % issuer)
if self.metadata:
certs = self.metadata.certs(issuer)
else:
certs = []
if self.debug:
self.log.debug("metadata certs: %s" % certs)
if not certs:
certs = [make_temp("%s" % cert, ".der") \
for cert in cert_from_instance(assertion)]
if not certs:
raise SignatureError("Missing certificate")
verified = False
for _, der_file in certs:
if self.verify_signature(decoded_xml, der_file, "der"):
verified = True
break
if not verified:
raise SignatureError("Could not verify")
return response
#----------------------------------------------------------------------------
# SIGNATURE PART
#----------------------------------------------------------------------------
def sign_statement_using_xmlsec(self, statement, class_name, key=None,
key_file=None):
"""Sign a SAML statement using xmlsec.
:param statement: The statement to be signed
:param key: The key to be used for the signing, either this or
:param key_File: The file where the key can be found
:param xmlsec_binary: The xmlsec1 binaries used to do the signing.
:return: The signed statement
"""
"""Sign a SAML statement using xmlsec.
_, fil = make_temp("%s" % statement, decode=False)
if key:
_, key_file = make_temp("%s" % key, ".pem")
ntf = NamedTemporaryFile()
com_list = [xmlsec_binary, "--sign",
"--output", ntf.name,
"--privkey-pem", key_file,
"--id-attr:%s" % ID_ATTR,
xtype,
fil]
#print " ".join(com_list)
if Popen(com_list, stdout=PIPE).communicate()[0] == "":
ntf.seek(0)
return ntf.read()
else:
raise Exception("Signing failed")
def sign_assertion_using_xmlsec(statement, xmlsec_binary, key=None,
key_file=None):
"""Sign a SAML statement using xmlsec.
:param statement: The statement to be signed
:param key: The key to be used for the signing, either this or
:param key_File: The file where the key can be found
:param xmlsec_binary: The xmlsec1 binaries used to do the signing.
:return: The signed statement
"""
:param statement: The statement to be signed
:param key: The key to be used for the signing, either this or
:param key_File: The file where the key can be found
:return: The signed statement
"""
_, fil = make_temp("%s" % statement, decode=False)
if not key and not key_file:
key_file = self.key_file
_, fil = make_temp("%s" % statement, decode=False)
if key:
_, key_file = make_temp("%s" % key, ".pem")
ntf = NamedTemporaryFile()
com_list = [xmlsec_binary, "--sign",
"--output", ntf.name,
"--privkey-pem", key_file,
"--id-attr:%s" % ID_ATTR,
"urn:oasis:names:tc:SAML:2.0:assertion:Assertion",
fil]
if key:
_, key_file = make_temp("%s" % key, ".pem")
ntf = NamedTemporaryFile()
com_list = [self.xmlsec, "--sign",
"--output", ntf.name,
"--privkey-pem", key_file,
"--id-attr:%s" % ID_ATTR,
class_name,
fil]
#print " ".join(com_list)
if Popen(com_list, stdout=PIPE).communicate()[0] == "":
ntf.seek(0)
return ntf.read()
else:
raise Exception("Signing failed")
if Popen(com_list, stdout=PIPE).communicate()[0] == "":
ntf.seek(0)
return ntf.read()
else:
raise Exception("Signing failed")
def sign_assertion_using_xmlsec(self, statement, key=None, key_file=None):
"""Sign a SAML assertion using xmlsec.
:param statement: The statement to be signed
:param key: The key to be used for the signing, either this or
:param key_File: The file where the key can be found
:return: The signed statement
"""
return self.sign_statement_using_xmlsec( statement,
class_name(saml.Assertion()), key=None, key_file=None)
# ===========================================================================
PRE_SIGNATURE = {
"signed_info": {

View File

@@ -1,5 +1,7 @@
#!/usr/bin/env python
import os
from saml2 import sigver, make_instance
from saml2 import utils
from saml2 import time_util
@@ -16,60 +18,71 @@ PRIV_KEY = "test.key"
def _eq(l1,l2):
return set(l1) == set(l2)
def test_verify_1(xmlsec):
xml_response = open(SIGNED).read()
response = sigver.correctly_signed_response(xml_response, xmlsec)
assert response
def test_non_verify_1(xmlsec):
""" unsigned is OK if not good """
xml_response = open(UNSIGNED).read()
response = sigver.correctly_signed_response(xml_response, xmlsec)
assert response
def test_non_verify_2(xmlsec):
xml_response = open(FALSE_SIGNED).read()
raises(sigver.SignatureError,sigver.correctly_signed_response,
xml_response, xmlsec)
SIGNED_VALUE= """AS1kHHtA4eTOU2XLTWhLMSJQ6V+TSDymRoTF78CqjrYURNLk9wjdPjAReNn9eykv
ryFiHNk0p9wMBknha5pH8aeCI/LmcVhLa5xteGZrtE/Udh5vv8z4kRQX51Uz/5x8
ToiobGw83MEW6A0dRUn0O20NBMMTaFZZPXye7RvVlHY="""
DIGEST_VALUE = "WFRXmImfoO3M6JOLE6BGGpU9Ud0="
def test_sign(xmlsec):
ass = make_instance(saml.Assertion, {
"version": "2.0",
"id": "11111",
"issue_instant": "2009-10-30T13:20:28Z",
"signature": sigver.pre_signature_part("11111"),
"attribute_statement": {
"attribute": [{
"friendly_name": "surName",
"attribute_value": "Foo",
},
{
"friendly_name": "givenName",
"attribute_value": "Bar",
}
]
}
})
def get_xmlsec():
for path in os.environ["PATH"].split(":"):
fil = os.path.join(path, "xmlsec1")
if os.access(fil,os.X_OK):
return fil
raise Exception("Can't find xmlsec1")
class TestSecurity():
def setup_class(self):
self.sec = sigver.SecurityContext(get_xmlsec(), PRIV_KEY, "pem")
def test_verify_1(self):
xml_response = open(SIGNED).read()
response = self.sec.correctly_signed_response(xml_response)
assert response
def test_non_verify_1(self):
""" unsigned is OK if not good """
xml_response = open(UNSIGNED).read()
response = self.sec.correctly_signed_response(xml_response)
assert response
def test_non_verify_2(self):
xml_response = open(FALSE_SIGNED).read()
raises(sigver.SignatureError,self.sec.correctly_signed_response,
xml_response)
def test_sign(self):
ass = make_instance(saml.Assertion, {
"version": "2.0",
"id": "11111",
"issue_instant": "2009-10-30T13:20:28Z",
"signature": sigver.pre_signature_part("11111"),
"attribute_statement": {
"attribute": [{
"friendly_name": "surName",
"attribute_value": "Foo",
},
{
"friendly_name": "givenName",
"attribute_value": "Bar",
}
]
}
})
sign_ass = self.sec.sign_assertion_using_xmlsec("%s" % ass)
sass = saml.assertion_from_string(sign_ass)
print sass
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
'version', 'signature', 'id'])
assert sass.version == "2.0"
assert sass.id == "11111"
assert time_util.str_to_time(sass.issue_instant)
sig = sass.signature
assert sig.signature_value.text == SIGNED_VALUE
assert len(sig.signed_info.reference) == 1
assert len(sig.signed_info.reference[0].digest_value) == 1
assert sig.signed_info.reference[0].digest_value[0].text == DIGEST_VALUE
print ass
sign_ass = sigver.sign_assertion_using_xmlsec("%s" % ass, xmlsec,
key_file=PRIV_KEY)
sass = saml.assertion_from_string(sign_ass)
print sass
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
'version', 'signature', 'id'])
assert sass.version == "2.0"
assert sass.id == "11111"
assert time_util.str_to_time(sass.issue_instant)
sig = sass.signature
assert sig.signature_value.text == SIGNED_VALUE
assert len(sig.signed_info.reference) == 1
assert len(sig.signed_info.reference[0].digest_value) == 1
assert sig.signed_info.reference[0].digest_value[0].text == DIGEST_VALUE

View File

@@ -157,13 +157,13 @@ class TestServer1():
print authn_request
intermed = utils.deflate_and_base64_encode(authn_request)
response = self.server.parse_authn_request(intermed)
print response
assert response["consumer_url"] == "http://localhost:8087/"
assert response["id"] == "1"
name_id_policy = response["request"].name_id_policy
assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT
assert response["sp_entityid"] == "urn:mace:example.com:saml:roland:sp"
assert response["sp_entity_id"] == "urn:mace:example.com:saml:roland:sp"
def test_sso_response_with_identity(self):
name_id = self.server.id.temporary_nameid()

View File

@@ -4,9 +4,8 @@
from saml2.client import Saml2Client
from saml2 import samlp, client, BINDING_HTTP_POST
from saml2 import saml, utils, config, class_name, make_instance
from saml2.sigver import correctly_signed_authn_request, verify_signature
from saml2.sigver import correctly_signed_response
from saml2.server import Server
#from saml2.sigver import correctly_signed_authn_request, verify_signature
#from saml2.server import Server
XML_RESPONSE_FILE = "saml_signed.xml"
XML_RESPONSE_FILE2 = "saml2_response.xml"
@@ -48,17 +47,6 @@ REQ1 = """<?xml version='1.0' encoding='UTF-8'?>
class TestClient:
def setup_class(self):
server = Server("idp.config")
name_id = server.id.temporary_nameid()
self._resp_ = server.do_response(
"http://lingon.catalogix.se:8087/", # consumer_url
"12", # in_response_to
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{"eduPersonEntitlement":"Jeter"},
name_id = name_id
)
conf = config.Config()
try:
conf.load_file("tests/server.config")
@@ -66,58 +54,6 @@ class TestClient:
conf.load_file("server.config")
self.client = Saml2Client({},conf)
def test_verify_1(self):
xml_response = ("%s" % (self._resp_,)).split("\n")[1]
outstanding = {"12": "http://localhost:8088/sso"}
session_info = self.client.verify_response(xml_response,
"urn:mace:example.com:saml:roland:sp",
outstanding=outstanding,
decode=False)
del session_info["name_id"]
del session_info["not_on_or_after"]
del session_info["ava"]["__userid"]
print session_info
assert session_info == {'session_id': '12',
'came_from': 'http://localhost:8088/sso',
'ava': {'eduPersonEntitlement': ['Jeter'] },
'issuer': 'urn:mace:example.com:saml:roland:idp'}
def test_parse_1(self):
xml_response = ("%s" % (self._resp_,)).split("\n")[1]
response = correctly_signed_response(xml_response,
self.client.config["xmlsec_binary"])
outstanding = {"12": "http://localhost:8088/sso"}
session_info = self.client.do_response(response,
"urn:mace:example.com:saml:roland:sp",
outstanding=outstanding)
assert session_info["ava"]["eduPersonEntitlement"] == ["Jeter"]
def test_parse_2(self):
xml_response = open(XML_RESPONSE_FILE2).read()
response = samlp.response_from_string(xml_response)
outstanding = {"_ae0216740b5baa4b13c79ffdb2baa82572788fd9a3":
"http://localhost:8088/sso"}
session_info = self.client.do_response(response,
"xenosmilus.umdc.umu.se",
outstanding=outstanding,
lax=True)
assert session_info["ava"] == {'uid': ['andreas'],
'mobile': ['+4741107700'],
'edupersonnickname': ['erlang'],
'o': ['Feide RnD'],
'edupersonentitlement': [
'urn:mace:feide.no:entitlement:test'],
'edupersonaffiliation': ['employee'],
'eduPersonPrincipalName': ['andreas@rnd.feide.no'],
'sn': ['Solberg'],
'mail': ['andreas@uninett.no'],
'ou': ['Guests'],
'cn': ['Andreas Solberg']}
assert session_info["name_id"] == "_242f88493449e639aab95dd9b92b1d04234ab84fd8"
assert session_info.keys() == ['came_from', 'name_id', 'ava',
'not_on_or_after']
def test_create_attribute_query1(self):
req = self.client.create_attribute_query("1",
"E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
@@ -294,8 +230,4 @@ class TestClient:
self.client.config["xmlsec_binary"],
self.client.config["metadata"])
except Exception: # missing certificate
verify_signature(self.client.config["xmlsec_binary"],
ar_str,
self.client.config["cert_file"],
cert_type="pem",
node_name=class_name(ar))
self.client.sc.verify_signature(ar_str, node_name=class_name(ar))