Test order is somewhat important

This commit is contained in:
Roland Hedberg
2010-03-12 15:33:07 +01:00
parent 75a4000856
commit 1d0bd12fed
5 changed files with 1616 additions and 0 deletions

75
tests/test_40_sigver.py Normal file
View File

@@ -0,0 +1,75 @@
#!/usr/bin/env python
from saml2 import sigver
from saml2 import utils
from saml2 import time_util
from saml2 import saml
import xmldsig as ds
from py.test import raises
SIGNED = "saml_signed.xml"
UNSIGNED = "saml_unsigned.xml"
FALSE_SIGNED = "saml_false_signed.xml"
#PUB_KEY = "test.pem"
PRIV_KEY = "test.key"
def _eq(l1,l2):
return set(l1) == set(l2)
def test_verify_1(xmlsec):
xml_response = open(SIGNED).read()
response = sigver.correctly_signed_response(xml_response, xmlsec)
assert response
def test_non_verify_1(xmlsec):
""" unsigned is OK if not good """
xml_response = open(UNSIGNED).read()
response = sigver.correctly_signed_response(xml_response, xmlsec)
assert response
def test_non_verify_2(xmlsec):
xml_response = open(FALSE_SIGNED).read()
raises(sigver.SignatureError,sigver.correctly_signed_response,
xml_response, xmlsec)
SIGNED_VALUE= """Y88SEXrU3emeoaTgEqUKYAvDtWiLpPMx1sClw0GJV98O6A5QRvB14vNs8xnXNFFZ
XVjksKECcqmf10k/2C3oJfaEOaM4w0DgVLXeuJU08irXfdHcoe1g3276F1If1Kh7
63F7ihzh2ZeWV9OOO8tXofR9GCLIpPECbK+3/D4eEDY="""
DIGEST_VALUE = "9cQ0c72QfbQr1KkH9MCwL5Wm1EQ="
def test_sign(xmlsec):
ass = utils.make_instance(saml.Assertion, {
"version": "2.0",
"id": "11111",
"issue_instant": "2009-10-30T13:20:28Z",
"signature": sigver.pre_signature_part("11111"),
"attribute_statement": {
"attribute": [{
"friendly_name": "surName",
"attribute_value": "Foo",
},
{
"friendly_name": "givenName",
"attribute_value": "Bar",
}
]
}
})
print ass
sign_ass = sigver.sign_assertion_using_xmlsec("%s" % ass, xmlsec,
key_file=PRIV_KEY)
sass = saml.assertion_from_string(sign_ass)
print sass
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
'version', 'signature', 'id'])
assert sass.version == "2.0"
assert sass.id == "11111"
assert time_util.str_to_time(sass.issue_instant)
sig = sass.signature
assert sig.signature_value.text == SIGNED_VALUE
assert len(sig.signed_info.reference) == 1
assert len(sig.signed_info.reference[0].digest_value) == 1
assert sig.signed_info.reference[0].digest_value[0].text == DIGEST_VALUE

655
tests/test_41_xmldsig.py Normal file
View File

@@ -0,0 +1,655 @@
#!/usr/bin/python
#
# Copyright (C) 2007 SIOS Technology, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for xmldsig"""
__author__ = 'tmatsuo@example.com (Takashi MATSUO)'
import unittest
try:
from xml.etree import ElementTree
except ImportError:
from elementtree import ElementTree
import ds_data
import xmldsig as ds
class TestObject:
def setup_class(self):
self.object = ds.Object()
def testAccessors(self):
"""Test for Object accessors"""
self.object.identifier = "object_id"
self.object.mime_type = "test/plain; charset=UTF-8"
self.object.encoding = ds.ENCODING_BASE64
new_object = ds.object_from_string(self.object.to_string())
assert new_object.identifier == "object_id"
assert new_object.mime_type == "test/plain; charset=UTF-8"
assert new_object.encoding == ds.ENCODING_BASE64
def testUsingTestData(self):
"""Test for object_from_string() using test data"""
new_object = ds.object_from_string(ds_data.TEST_OBJECT)
assert new_object.identifier == "object_id"
assert new_object.encoding == ds.ENCODING_BASE64
assert new_object.text.strip() == \
"V2VkIEp1biAgNCAxMjoxMTowMyBFRFQgMjAwMwo"
class TestMgmtData:
def setup_class(self):
self.mgmt_data = ds.MgmtData()
def testAccessors(self):
"""Test for MgmtData accessors"""
self.mgmt_data.text = "mgmt data"
new_mgmt_data = ds.mgmt_data_from_string(self.mgmt_data.to_string())
assert new_mgmt_data.text.strip() == "mgmt data"
def testUsingTestData(self):
"""Test for mgmt_data_from_string() using test data"""
new_mgmt_data = ds.mgmt_data_from_string(ds_data.TEST_MGMT_DATA)
assert new_mgmt_data.text.strip() == "mgmt data"
class TestSPKISexp:
def setup_class(self):
self.spki_sexp = ds.SPKISexp()
def testAccessors(self):
"""Test for SPKISexp accessors"""
self.spki_sexp.text = "spki sexp"
new_spki_sexp = ds.spki_sexp_from_string(self.spki_sexp.to_string())
assert new_spki_sexp.text.strip() == "spki sexp"
def testUsingTestData(self):
"""Test for spki_sexp_from_string() using test data"""
new_spki_sexp = ds.spki_sexp_from_string(ds_data.TEST_SPKI_SEXP)
assert new_spki_sexp.text.strip() == "spki sexp"
class TestSPKIData:
def setup_class(self):
self.spki_data = ds.SPKIData()
def testAccessors(self):
"""Test for SPKIData accessors"""
self.spki_data.spki_sexp.append(
ds.spki_sexp_from_string(ds_data.TEST_SPKI_SEXP))
new_spki_data = ds.spki_data_from_string(self.spki_data.to_string())
assert new_spki_data.spki_sexp[0].text.strip() == "spki sexp"
def testUsingTestData(self):
"""Test for spki_data_from_string() using test data"""
new_spki_data = ds.spki_data_from_string(ds_data.TEST_SPKI_DATA)
assert new_spki_data.spki_sexp[0].text.strip() == "spki sexp"
assert new_spki_data.spki_sexp[1].text.strip() == "spki sexp2"
class TestPGPData:
def setup_class(self):
self.pgp_data = ds.PGPData()
def testAccessors(self):
"""Test for PGPData accessors"""
self.pgp_data.pgp_key_id = ds.PGPKeyID(text="pgp key id")
self.pgp_data.pgp_key_packet = ds.PGPKeyPacket(text="pgp key packet")
new_pgp_data = ds.pgp_data_from_string(self.pgp_data.to_string())
assert isinstance(new_pgp_data.pgp_key_id, ds.PGPKeyID)
assert isinstance(new_pgp_data.pgp_key_packet, ds.PGPKeyPacket)
assert new_pgp_data.pgp_key_id.text.strip() == "pgp key id"
assert new_pgp_data.pgp_key_packet.text.strip() == "pgp key packet"
def testUsingTestData(self):
"""Test for pgp_data_from_string() using test data"""
new_pgp_data = ds.pgp_data_from_string(ds_data.TEST_PGP_DATA)
assert isinstance(new_pgp_data.pgp_key_id, ds.PGPKeyID)
assert isinstance(new_pgp_data.pgp_key_packet, ds.PGPKeyPacket)
assert new_pgp_data.pgp_key_id.text.strip() == "pgp key id"
assert new_pgp_data.pgp_key_packet.text.strip() == "pgp key packet"
class TestX509IssuerSerial:
def setup_class(self):
self.x509_issuer_serial = ds.X509IssuerSerial()
def testAccessors(self):
"""Test for X509IssuerSerial accessors"""
self.x509_issuer_serial.x509_issuer_name = ds.X509IssuerName(
text="issuer name")
self.x509_issuer_serial.x509_issuer_number = ds.X509IssuerNumber(text="1")
new_x509_issuer_serial = ds.x509_issuer_serial_from_string(
self.x509_issuer_serial.to_string())
assert new_x509_issuer_serial.x509_issuer_name.text.strip() == \
"issuer name"
assert new_x509_issuer_serial.x509_issuer_number.text.strip() == "1"
def testUsingTestData(self):
"""Test for x509_issuer_serial_from_string() using test data"""
new_x509_issuer_serial = ds.x509_issuer_serial_from_string(
ds_data.TEST_X509_ISSUER_SERIAL)
assert new_x509_issuer_serial.x509_issuer_name.text.strip() == \
"issuer name"
assert new_x509_issuer_serial.x509_issuer_number.text.strip() == "1"
class TestX509Data:
def setup_class(self):
self.x509_data = ds.X509Data()
def testAccessors(self):
"""Test for X509Data accessors"""
self.x509_data.x509_issuer_serial.append(ds.x509_issuer_serial_from_string(
ds_data.TEST_X509_ISSUER_SERIAL))
self.x509_data.x509_ski.append(ds.X509SKI(text="x509 ski"))
self.x509_data.x509_subject_name.append(ds.X509SubjectName(
text="x509 subject name"))
self.x509_data.x509_certificate.append(ds.X509Certificate(
text="x509 certificate"))
self.x509_data.x509_crl.append(ds.X509CRL(text="x509 crl"))
new_x509_data = ds.x509_data_from_string(self.x509_data.to_string())
assert isinstance(new_x509_data.x509_issuer_serial[0],
ds.X509IssuerSerial)
assert new_x509_data.x509_ski[0].text.strip() == "x509 ski"
assert isinstance(new_x509_data.x509_ski[0], ds.X509SKI)
assert new_x509_data.x509_subject_name[0].text.strip() == \
"x509 subject name"
assert isinstance(new_x509_data.x509_subject_name[0],
ds.X509SubjectName)
assert new_x509_data.x509_certificate[0].text.strip() == \
"x509 certificate"
assert isinstance(new_x509_data.x509_certificate[0],
ds.X509Certificate)
assert new_x509_data.x509_crl[0].text.strip() == "x509 crl"
assert isinstance(new_x509_data.x509_crl[0],ds.X509CRL)
def testUsingTestData(self):
"""Test for x509_data_from_string() using test data"""
new_x509_data = ds.x509_data_from_string(ds_data.TEST_X509_DATA)
assert isinstance(new_x509_data.x509_issuer_serial[0],
ds.X509IssuerSerial)
assert new_x509_data.x509_ski[0].text.strip() == "x509 ski"
assert isinstance(new_x509_data.x509_ski[0], ds.X509SKI)
assert new_x509_data.x509_subject_name[0].text.strip() == \
"x509 subject name"
assert isinstance(new_x509_data.x509_subject_name[0],
ds.X509SubjectName)
assert new_x509_data.x509_certificate[0].text.strip() == \
"x509 certificate"
assert isinstance(new_x509_data.x509_certificate[0],
ds.X509Certificate)
assert new_x509_data.x509_crl[0].text.strip() == "x509 crl"
assert isinstance(new_x509_data.x509_crl[0],ds.X509CRL)
class TestTransform:
def setup_class(self):
self.transform = ds.Transform()
def testAccessors(self):
"""Test for Transform accessors"""
self.transform.xpath.append(ds.XPath(text="xpath"))
self.transform.algorithm = ds.TRANSFORM_ENVELOPED
new_transform = ds.transform_from_string(self.transform.to_string())
assert isinstance(new_transform.xpath[0], ds.XPath)
assert new_transform.xpath[0].text.strip() == "xpath"
assert new_transform.algorithm == ds.TRANSFORM_ENVELOPED
def testUsingTestData(self):
"""Test for transform_from_string() using test data"""
new_transform = ds.transform_from_string(ds_data.TEST_TRANSFORM)
assert isinstance(new_transform.xpath[0], ds.XPath)
assert new_transform.xpath[0].text.strip() == "xpath"
assert new_transform.algorithm == ds.TRANSFORM_ENVELOPED
class TestTransforms:
def setup_class(self):
self.transforms = ds.Transforms()
def testAccessors(self):
"""Test for Transforms accessors"""
self.transforms.transform.append(
ds.transform_from_string(ds_data.TEST_TRANSFORM))
self.transforms.transform.append(
ds.transform_from_string(ds_data.TEST_TRANSFORM))
new_transforms = ds.transforms_from_string(self.transforms.to_string())
assert isinstance(new_transforms.transform[0], ds.Transform)
assert isinstance(new_transforms.transform[1], ds.Transform)
assert new_transforms.transform[0].algorithm == \
ds.TRANSFORM_ENVELOPED
assert new_transforms.transform[1].algorithm == \
ds.TRANSFORM_ENVELOPED
assert new_transforms.transform[0].xpath[0].text.strip() == "xpath"
assert new_transforms.transform[1].xpath[0].text.strip() == "xpath"
def testUsingTestData(self):
"""Test for transform_from_string() using test data"""
new_transforms = ds.transforms_from_string(ds_data.TEST_TRANSFORMS)
assert isinstance(new_transforms.transform[0], ds.Transform)
assert isinstance(new_transforms.transform[1], ds.Transform)
assert new_transforms.transform[0].algorithm == \
ds.TRANSFORM_ENVELOPED
assert new_transforms.transform[1].algorithm == \
ds.TRANSFORM_ENVELOPED
assert new_transforms.transform[0].xpath[0].text.strip() == "xpath"
assert new_transforms.transform[1].xpath[0].text.strip() == "xpath"
class TestRetrievalMethod:
def setup_class(self):
self.retrieval_method = ds.RetrievalMethod()
def testAccessors(self):
"""Test for RetrievalMethod accessors"""
self.retrieval_method.uri = "http://www.example.com/URI"
self.retrieval_method.type = "http://www.example.com/Type"
self.retrieval_method.transforms.append(ds.transforms_from_string(
ds_data.TEST_TRANSFORMS))
new_retrieval_method = ds.retrieval_method_from_string(
self.retrieval_method.to_string())
assert new_retrieval_method.uri == "http://www.example.com/URI"
assert new_retrieval_method.type == "http://www.example.com/Type"
assert isinstance(new_retrieval_method.transforms[0], ds.Transforms)
def testUsingTestData(self):
"""Test for retrieval_method_from_string() using test data"""
new_retrieval_method = ds.retrieval_method_from_string(
ds_data.TEST_RETRIEVAL_METHOD)
assert new_retrieval_method.uri == "http://www.example.com/URI"
assert new_retrieval_method.type == "http://www.example.com/Type"
assert isinstance(new_retrieval_method.transforms[0], ds.Transforms)
class TestRSAKeyValue:
def setup_class(self):
self.rsa_key_value = ds.RSAKeyValue()
def testAccessors(self):
"""Test for RSAKeyValue accessors"""
self.rsa_key_value.modulus = ds.Modulus(text="modulus")
self.rsa_key_value.exponent = ds.Exponent(text="exponent")
new_rsa_key_value = ds.rsa_key_value_from_string(self.rsa_key_value.to_string())
assert isinstance(new_rsa_key_value.modulus, ds.Modulus)
assert isinstance(new_rsa_key_value.exponent, ds.Exponent)
assert new_rsa_key_value.modulus.text.strip() == "modulus"
assert new_rsa_key_value.exponent.text.strip() == "exponent"
def testUsingTestData(self):
"""Test for rsa_key_value_from_string() using test data"""
new_rsa_key_value = ds.rsa_key_value_from_string(
ds_data.TEST_RSA_KEY_VALUE)
assert isinstance(new_rsa_key_value.modulus, ds.Modulus)
assert isinstance(new_rsa_key_value.exponent, ds.Exponent)
assert new_rsa_key_value.modulus.text.strip() == "modulus"
assert new_rsa_key_value.exponent.text.strip() == "exponent"
class TestDSAKeyValue:
def setup_class(self):
self.dsa_key_value = ds.DSAKeyValue()
def testAccessors(self):
"""Test for DSAKeyValue accessors"""
self.dsa_key_value.p = ds.DsP(text="p")
self.dsa_key_value.q = ds.DsQ(text="q")
self.dsa_key_value.g = ds.DsG(text="g")
self.dsa_key_value.y = ds.DsY(text="y")
self.dsa_key_value.j = ds.DsJ(text="j")
self.dsa_key_value.seed = ds.Seed(text="seed")
self.dsa_key_value.pgen_counter = ds.PgenCounter(text="pgen counter")
new_dsa_key_value = ds.dsa_key_value_from_string(self.dsa_key_value.to_string())
assert isinstance(new_dsa_key_value.p, ds.DsP)
assert isinstance(new_dsa_key_value.q, ds.DsQ)
assert isinstance(new_dsa_key_value.g, ds.DsG)
assert isinstance(new_dsa_key_value.y, ds.DsY)
assert isinstance(new_dsa_key_value.j, ds.DsJ)
assert isinstance(new_dsa_key_value.seed, ds.Seed)
assert isinstance(new_dsa_key_value.pgen_counter, ds.PgenCounter)
assert new_dsa_key_value.p.text.strip() == "p"
assert new_dsa_key_value.q.text.strip() == "q"
assert new_dsa_key_value.g.text.strip() == "g"
assert new_dsa_key_value.y.text.strip() == "y"
assert new_dsa_key_value.j.text.strip() == "j"
assert new_dsa_key_value.seed.text.strip() == "seed"
assert new_dsa_key_value.pgen_counter.text.strip() == "pgen counter"
def testUsingTestData(self):
"""Test for dsa_key_value_from_string() using test data"""
new_dsa_key_value = ds.dsa_key_value_from_string(
ds_data.TEST_DSA_KEY_VALUE)
assert isinstance(new_dsa_key_value.p, ds.DsP)
assert isinstance(new_dsa_key_value.q, ds.DsQ)
assert isinstance(new_dsa_key_value.g, ds.DsG)
assert isinstance(new_dsa_key_value.y, ds.DsY)
assert isinstance(new_dsa_key_value.j, ds.DsJ)
assert isinstance(new_dsa_key_value.seed, ds.Seed)
assert isinstance(new_dsa_key_value.pgen_counter, ds.PgenCounter)
assert new_dsa_key_value.p.text.strip() == "p"
assert new_dsa_key_value.q.text.strip() == "q"
assert new_dsa_key_value.g.text.strip() == "g"
assert new_dsa_key_value.y.text.strip() == "y"
assert new_dsa_key_value.j.text.strip() == "j"
assert new_dsa_key_value.seed.text.strip() == "seed"
assert new_dsa_key_value.pgen_counter.text.strip() == "pgen counter"
class TestKeyValue:
def setup_class(self):
self.key_value = ds.KeyValue()
def testAccessors(self):
"""Test for KeyValue accessors"""
self.key_value.dsa_key_value = ds.dsa_key_value_from_string(
ds_data.TEST_DSA_KEY_VALUE)
new_key_value = ds.key_value_from_string(self.key_value.to_string())
assert isinstance(new_key_value.dsa_key_value, ds.DSAKeyValue)
self.key_value.dsa_key_value = None
self.key_value.rsa_key_value = ds.rsa_key_value_from_string(
ds_data.TEST_RSA_KEY_VALUE)
new_key_value = ds.key_value_from_string(self.key_value.to_string())
assert isinstance(new_key_value.rsa_key_value, ds.RSAKeyValue)
def testUsingTestData(self):
"""Test for key_value_from_string() using test data"""
new_key_value = ds.key_value_from_string(ds_data.TEST_KEY_VALUE1)
assert isinstance(new_key_value.dsa_key_value, ds.DSAKeyValue)
self.key_value.dsa_key_value = None
self.key_value.rsa_key_value = ds.rsa_key_value_from_string(
ds_data.TEST_RSA_KEY_VALUE)
new_key_value = ds.key_value_from_string(ds_data.TEST_KEY_VALUE2)
assert isinstance(new_key_value.rsa_key_value, ds.RSAKeyValue)
class TestKeyName:
def setup_class(self):
self.key_name = ds.KeyName()
def testAccessors(self):
"""Test for KeyName accessors"""
self.key_name.text = "key name"
new_key_name = ds.key_name_from_string(self.key_name.to_string())
assert new_key_name.text.strip() == "key name"
def testUsingTestData(self):
"""Test for key_name_from_string() using test data"""
new_key_name = ds.key_name_from_string(ds_data.TEST_KEY_NAME)
assert new_key_name.text.strip() == "key name"
class TestKeyInfo:
def setup_class(self):
self.key_info = ds.KeyInfo()
def testAccessors(self):
"""Test for KeyInfo accessors"""
self.key_info.key_name.append(
ds.key_name_from_string(ds_data.TEST_KEY_NAME))
self.key_info.key_value.append(
ds.key_value_from_string(ds_data.TEST_KEY_VALUE1))
self.key_info.retrieval_method.append(
ds.retrieval_method_from_string(ds_data.TEST_RETRIEVAL_METHOD))
self.key_info.x509_data.append(
ds.x509_data_from_string(ds_data.TEST_X509_DATA))
self.key_info.pgp_data.append(
ds.pgp_data_from_string(ds_data.TEST_PGP_DATA))
self.key_info.spki_data.append(
ds.spki_data_from_string(ds_data.TEST_SPKI_DATA))
self.key_info.mgmt_data.append(
ds.mgmt_data_from_string(ds_data.TEST_MGMT_DATA))
self.key_info.identifier = "id"
new_key_info = ds.key_info_from_string(self.key_info.to_string())
assert isinstance(new_key_info.key_name[0], ds.KeyName)
assert isinstance(new_key_info.key_value[0], ds.KeyValue)
assert isinstance(new_key_info.retrieval_method[0],
ds.RetrievalMethod)
assert isinstance(new_key_info.x509_data[0], ds.X509Data)
assert isinstance(new_key_info.pgp_data[0], ds.PGPData)
assert isinstance(new_key_info.spki_data[0], ds.SPKIData)
assert isinstance(new_key_info.mgmt_data[0], ds.MgmtData)
assert new_key_info.identifier == "id"
def testUsingTestData(self):
"""Test for key_info_from_string() using test data"""
new_key_info = ds.key_info_from_string(ds_data.TEST_KEY_INFO)
assert isinstance(new_key_info.key_name[0], ds.KeyName)
assert isinstance(new_key_info.key_value[0], ds.KeyValue)
assert isinstance(new_key_info.retrieval_method[0],
ds.RetrievalMethod)
assert isinstance(new_key_info.x509_data[0], ds.X509Data)
assert isinstance(new_key_info.pgp_data[0], ds.PGPData)
assert isinstance(new_key_info.spki_data[0], ds.SPKIData)
assert isinstance(new_key_info.mgmt_data[0], ds.MgmtData)
assert new_key_info.identifier == "id"
class TestDigestValue:
def setup_class(self):
self.digest_value = ds.DigestValue()
def testAccessors(self):
"""Test for DigestValue accessors"""
self.digest_value.text = "digest value"
new_digest_value = ds.digest_value_from_string(self.digest_value.to_string())
assert new_digest_value.text.strip() == "digest value"
def testUsingTestData(self):
"""Test for digest_value_from_string() using test data"""
new_digest_value = ds.digest_value_from_string(ds_data.TEST_DIGEST_VALUE)
assert new_digest_value.text.strip() == "digest value"
class TestDigestMethod:
def setup_class(self):
self.digest_method = ds.DigestMethod()
def testAccessors(self):
"""Test for DigestMethod accessors"""
self.digest_method.algorithm = ds.DIGEST_SHA1
new_digest_method = ds.digest_method_from_string(
self.digest_method.to_string())
assert new_digest_method.algorithm == ds.DIGEST_SHA1
def testUsingTestData(self):
"""Test for digest_method_from_string() using test data"""
new_digest_method = ds.digest_method_from_string(
ds_data.TEST_DIGEST_METHOD)
assert new_digest_method.algorithm == ds.DIGEST_SHA1
class TestReference:
def setup_class(self):
self.reference = ds.Reference()
def testAccessors(self):
"""Test for Reference accessors"""
self.reference.transforms.append(ds.transforms_from_string(
ds_data.TEST_TRANSFORMS))
self.reference.digest_method.append(ds.digest_method_from_string(
ds_data.TEST_DIGEST_METHOD))
self.reference.digest_value.append(ds.digest_value_from_string(
ds_data.TEST_DIGEST_VALUE))
self.reference.identifier = "id"
self.reference.uri = "http://www.example.com/URI"
self.reference.type = "http://www.example.com/Type"
new_reference = ds.reference_from_string(self.reference.to_string())
assert isinstance(new_reference.transforms[0], ds.Transforms)
assert isinstance(new_reference.digest_method[0], ds.DigestMethod)
assert isinstance(new_reference.digest_value[0], ds.DigestValue)
assert new_reference.identifier == "id"
assert new_reference.uri == "http://www.example.com/URI"
assert new_reference.type == "http://www.example.com/Type"
def testUsingTestData(self):
"""Test for reference_from_string() using test data"""
new_reference = ds.reference_from_string(ds_data.TEST_REFERENCE)
assert isinstance(new_reference.transforms[0], ds.Transforms)
assert isinstance(new_reference.digest_method[0], ds.DigestMethod)
assert isinstance(new_reference.digest_value[0], ds.DigestValue)
assert new_reference.identifier == "id"
assert new_reference.uri == "http://www.example.com/URI"
assert new_reference.type == "http://www.example.com/Type"
class TestSignatureMethod:
def setup_class(self):
self.signature_method = ds.SignatureMethod()
def testAccessors(self):
"""Test for SignatureMethod accessors"""
self.signature_method.algorithm = ds.SIG_RSA_SHA1
self.signature_method.hmac_output_length = ds.HMACOutputLength(text="8")
new_signature_method = ds.signature_method_from_string(
self.signature_method.to_string())
assert isinstance(new_signature_method.hmac_output_length,
ds.HMACOutputLength)
assert new_signature_method.hmac_output_length.text.strip() == "8"
assert new_signature_method.algorithm == ds.SIG_RSA_SHA1
def testUsingTestData(self):
"""Test for signature_method_from_string() using test data"""
new_signature_method = ds.signature_method_from_string(
ds_data.TEST_SIGNATURE_METHOD)
assert isinstance(new_signature_method.hmac_output_length,
ds.HMACOutputLength)
assert new_signature_method.hmac_output_length.text.strip() == "8"
assert new_signature_method.algorithm == ds.SIG_RSA_SHA1
class TestCanonicalizationMethod:
def setup_class(self):
self.canonicalization_method = ds.CanonicalizationMethod()
def testAccessors(self):
"""Test for CanonicalizationMethod accessors"""
self.canonicalization_method.algorithm = ds.C14N_WITH_C
new_canonicalization_method = ds.canonicalization_method_from_string(
self.canonicalization_method.to_string())
assert new_canonicalization_method.algorithm == ds.C14N_WITH_C
def testUsingTestData(self):
"""Test for canonicalization_method_from_string() using test data"""
new_canonicalization_method = ds.canonicalization_method_from_string(
ds_data.TEST_CANONICALIZATION_METHOD)
assert new_canonicalization_method.algorithm == ds.C14N_WITH_C
class TestSignedInfo:
def setup_class(self):
self.si = ds.SignedInfo()
def testAccessors(self):
"""Test for SignedInfo accessors"""
self.si.identifier = "id"
self.si.canonicalization_method = ds.canonicalization_method_from_string(
ds_data.TEST_CANONICALIZATION_METHOD)
self.si.signature_method = ds.signature_method_from_string(
ds_data.TEST_SIGNATURE_METHOD)
self.si.reference.append(ds.reference_from_string(
ds_data.TEST_REFERENCE))
new_si = ds.signed_info_from_string(self.si.to_string())
assert new_si.identifier == "id"
assert isinstance(new_si.canonicalization_method,
ds.CanonicalizationMethod)
assert isinstance(new_si.signature_method, ds.SignatureMethod)
assert isinstance(new_si.reference[0], ds.Reference)
def testUsingTestData(self):
"""Test for signed_info_from_string() using test data"""
new_si = ds.signed_info_from_string(ds_data.TEST_SIGNED_INFO)
assert new_si.identifier == "id"
assert isinstance(new_si.canonicalization_method,
ds.CanonicalizationMethod)
assert isinstance(new_si.signature_method, ds.SignatureMethod)
assert isinstance(new_si.reference[0], ds.Reference)
class TestSignatureValue:
def setup_class(self):
self.signature_value = ds.SignatureValue()
def testAccessors(self):
"""Test for SignatureValue accessors"""
self.signature_value.identifier = "id"
self.signature_value.text = "signature value"
new_signature_value = ds.signature_value_from_string(
self.signature_value.to_string())
assert new_signature_value.identifier == "id"
assert new_signature_value.text.strip() == "signature value"
def testUsingTestData(self):
"""Test for signature_value_from_string() using test data"""
new_signature_value = ds.signature_value_from_string(
ds_data.TEST_SIGNATURE_VALUE)
assert new_signature_value.identifier == "id"
assert new_signature_value.text.strip() == "signature value"
class TestSignature:
def setup_class(self):
self.signature = ds.Signature()
def testAccessors(self):
"""Test for Signature accessors"""
self.signature.identifier = "id"
self.signature.signed_info = ds.signed_info_from_string(
ds_data.TEST_SIGNED_INFO)
self.signature.signature_value = ds.signature_value_from_string(
ds_data.TEST_SIGNATURE_VALUE)
self.signature.key_info = ds.key_info_from_string(ds_data.TEST_KEY_INFO)
self.signature.object.append(ds.object_from_string(ds_data.TEST_OBJECT))
new_signature = ds.signature_from_string(self.signature.to_string())
assert new_signature.identifier == "id"
assert isinstance(new_signature.signed_info, ds.SignedInfo)
assert isinstance(new_signature.signature_value, ds.SignatureValue)
assert isinstance(new_signature.key_info, ds.KeyInfo)
assert isinstance(new_signature.object[0], ds.Object)
def testUsingTestData(self):
"""Test for signature_value_from_string() using test data"""
new_signature = ds.signature_from_string(ds_data.TEST_SIGNATURE)
assert new_signature.identifier == "id"
assert isinstance(new_signature.signed_info, ds.SignedInfo)
assert isinstance(new_signature.signature_value, ds.SignatureValue)
assert isinstance(new_signature.key_info, ds.KeyInfo)
assert isinstance(new_signature.object[0], ds.Object)
if __name__ == '__main__':
unittest.main()

175
tests/test_42_xmlenc.py Normal file
View File

@@ -0,0 +1,175 @@
import saml2
import xmlenc
import xmldsig
data1 = """<?xml version='1.0'?>
<EncryptedData xmlns='http://www.w3.org/2001/04/xmlenc#'
MimeType='text/xml'>
<CipherData>
<CipherValue>A23B45C56</CipherValue>
</CipherData>
</EncryptedData>"""
def test_1():
ed = xmlenc.encrypted_data_from_string(data1)
assert ed
assert ed.mime_type == "text/xml"
assert len(ed.cipher_data) == 1
cd = ed.cipher_data[0]
assert len(cd.cipher_value) == 1
assert cd.cipher_value[0].text == "A23B45C56"
data2 = """<EncryptedData xmlns='http://www.w3.org/2001/04/xmlenc#'
Type='http://www.w3.org/2001/04/xmlenc#Element'>
<EncryptionMethod
Algorithm='http://www.w3.org/2001/04/xmlenc#tripledes-cbc'/>
<ds:KeyInfo xmlns:ds='http://www.w3.org/2000/09/xmldsig#'>
<ds:KeyName>John Smith</ds:KeyName>
</ds:KeyInfo>
<CipherData><CipherValue>DEADBEEF</CipherValue></CipherData>
</EncryptedData>"""
def test_2():
ed = xmlenc.encrypted_data_from_string(data2)
assert ed
print ed
assert ed.typ == "http://www.w3.org/2001/04/xmlenc#Element"
assert len(ed.encryption_method) == 1
em = ed.encryption_method[0]
assert em.algorithm == 'http://www.w3.org/2001/04/xmlenc#tripledes-cbc'
assert len(ed.key_info) == 1
ki = ed.key_info[0]
assert ki.key_name[0].text == "John Smith"
assert len(ed.cipher_data) == 1
cd = ed.cipher_data[0]
assert len(cd.cipher_value) == 1
assert cd.cipher_value[0].text == "DEADBEEF"
data3 = """<EncryptedData Id='ED'
xmlns='http://www.w3.org/2001/04/xmlenc#'>
<EncryptionMethod
Algorithm='http://www.w3.org/2001/04/xmlenc#aes128-cbc'/>
<ds:KeyInfo xmlns:ds='http://www.w3.org/2000/09/xmldsig#'>
<ds:RetrievalMethod URI='#EK'
Type="http://www.w3.org/2001/04/xmlenc#EncryptedKey"/>
<ds:KeyName>Sally Doe</ds:KeyName>
</ds:KeyInfo>
<CipherData><CipherValue>DEADBEEF</CipherValue></CipherData>
</EncryptedData>"""
def test_3():
ed = xmlenc.encrypted_data_from_string(data3)
assert ed
print ed
assert len(ed.encryption_method) == 1
em = ed.encryption_method[0]
assert em.algorithm == 'http://www.w3.org/2001/04/xmlenc#aes128-cbc'
assert len(ed.key_info) == 1
ki = ed.key_info[0]
assert ki.key_name[0].text == "Sally Doe"
assert len(ki.retrieval_method) == 1
rm = ki.retrieval_method[0]
assert rm.uri == "#EK"
assert rm.type == "http://www.w3.org/2001/04/xmlenc#EncryptedKey"
assert len(ed.cipher_data) == 1
cd = ed.cipher_data[0]
assert len(cd.cipher_value) == 1
assert cd.cipher_value[0].text == "DEADBEEF"
data4 = """<EncryptedKey Id='EK' xmlns='http://www.w3.org/2001/04/xmlenc#'>
<EncryptionMethod
Algorithm="http://www.w3.org/2001/04/xmlenc#rsa-1_5"/>
<ds:KeyInfo xmlns:ds='http://www.w3.org/2000/09/xmldsig#'>
<ds:KeyName>John Smith</ds:KeyName>
</ds:KeyInfo>
<CipherData><CipherValue>xyzabc</CipherValue></CipherData>
<ReferenceList>
<DataReference URI='#ED'/>
</ReferenceList>
<CarriedKeyName>Sally Doe</CarriedKeyName>
</EncryptedKey>"""
def test_4():
ek = xmlenc.encrypted_key_from_string(data4)
assert ek
print ek
assert len(ek.encryption_method) == 1
em = ek.encryption_method[0]
assert em.algorithm == 'http://www.w3.org/2001/04/xmlenc#rsa-1_5'
assert len(ek.key_info) == 1
ki = ek.key_info[0]
assert ki.key_name[0].text == "John Smith"
assert len(ek.reference_list) == 1
rl = ek.reference_list[0]
assert len(rl.data_reference)
dr = rl.data_reference[0]
assert dr.uri == "#ED"
assert len(ek.cipher_data) == 1
cd = ek.cipher_data[0]
assert len(cd.cipher_value) == 1
assert cd.cipher_value[0].text == "xyzabc"
data5 = """<CipherReference URI="http://www.example.com/CipherValues.xml"
xmlns="http://www.w3.org/2001/04/xmlenc#">
<Transforms xmlns:ds='http://www.w3.org/2000/09/xmldsig#'>
<ds:Transform
Algorithm="http://www.w3.org/TR/1999/REC-xpath-19991116">
<ds:XPath xmlns:rep="http://www.example.org/repository">
self::text()[parent::rep:CipherValue[@Id="example1"]]
</ds:XPath>
</ds:Transform>
<ds:Transform Algorithm="http://www.w3.org/2000/09/xmldsig#base64"/>
</Transforms>
</CipherReference>"""
def test_5():
cr = xmlenc.cipher_reference_from_string(data5)
assert cr
print cr
assert len(cr.transforms) == 1
trs = cr.transforms[0]
assert len(trs.transform) == 2
tr = trs.transform[0]
assert tr.algorithm in ["http://www.w3.org/TR/1999/REC-xpath-19991116",
"http://www.w3.org/2000/09/xmldsig#base64"]
if tr.algorithm == "http://www.w3.org/2000/09/xmldsig#base64":
pass
elif tr.algorithm == "http://www.w3.org/TR/1999/REC-xpath-19991116":
assert len(tr.xpath) == 1
xp = tr.xpath[0]
assert xp.text.strip() == """self::text()[parent::rep:CipherValue[@Id="example1"]]"""
data6 = """<ReferenceList xmlns="http://www.w3.org/2001/04/xmlenc#">
<DataReference URI="#invoice34">
<ds:Transforms xmlns:ds='http://www.w3.org/2000/09/xmldsig#'>
<ds:Transform Algorithm="http://www.w3.org/TR/1999/REC-xpath-19991116">
<ds:XPath xmlns:xenc="http://www.w3.org/2001/04/xmlenc#">
self::xenc:EncryptedData[@Id="example1"]
</ds:XPath>
</ds:Transform>
</ds:Transforms>
</DataReference>
</ReferenceList>"""
def test_6():
rl = xmlenc.reference_list_from_string(data6)
assert rl
print rl
assert len(rl.data_reference) == 1
dr = rl.data_reference[0]
assert dr.uri == "#invoice34"
assert len(dr.extension_elements) == 1
ee = dr.extension_elements[0]
assert ee.tag == "Transforms"
assert ee.namespace == "http://www.w3.org/2000/09/xmldsig#"
trs = saml2.extension_element_to_element(ee, xmldsig.ELEMENT_FROM_STRING,
namespace=xmldsig.NAMESPACE)
assert trs
assert len(trs.transform) == 1
tr = trs.transform[0]
assert tr.algorithm == "http://www.w3.org/TR/1999/REC-xpath-19991116"
assert len(tr.xpath) == 1
assert tr.xpath[0].text.strip() == """self::xenc:EncryptedData[@Id="example1"]"""

413
tests/test_50_server.py Normal file
View File

@@ -0,0 +1,413 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from saml2.server import Server
from saml2 import server
from saml2 import samlp, saml, client, utils
from saml2.utils import make_instance, OtherError
from saml2.utils import do_attribute_statement
from py.test import raises
import shelve
import re
def _eq(l1,l2):
return set(l1) == set(l2)
class TestServer1():
def setup_class(self):
try:
self.server = Server("idp.config")
except IOError, e:
self.server = Server("tests/idp.config")
def test_issuer(self):
issuer = make_instance( saml.Issuer, self.server.issuer())
assert isinstance(issuer, saml.Issuer)
assert _eq(issuer.keyswv(), ["text","format"])
assert issuer.format == saml.NAMEID_FORMAT_ENTITY
assert issuer.text == self.server.conf["entityid"]
def test_assertion(self):
tmp = utils.assertion_factory(
subject= utils.args2dict("_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
attribute_statement = utils.args2dict(
attribute=[
utils.args2dict(attribute_value="Derek",
friendly_name="givenName"),
utils.args2dict(attribute_value="Jeter",
friendly_name="surName"),
]),
issuer=self.server.issuer(),
)
assertion = make_instance(saml.Assertion, tmp)
assert _eq(assertion.keyswv(),['attribute_statement', 'issuer', 'id',
'subject', 'issue_instant', 'version'])
assert assertion.version == "2.0"
assert assertion.issuer.text == "urn:mace:example.com:saml:roland:idp"
#
assert len(assertion.attribute_statement) == 1
attribute_statement = assertion.attribute_statement[0]
assert len(attribute_statement.attribute) == 2
attr0 = attribute_statement.attribute[0]
attr1 = attribute_statement.attribute[1]
if attr0.attribute_value[0].text == "Derek":
assert attr0.friendly_name == "givenName"
assert attr1.friendly_name == "surName"
assert attr1.attribute_value[0].text == "Jeter"
else:
assert attr1.friendly_name == "givenName"
assert attr1.attribute_value[0].text == "Derek"
assert attr0.friendly_name == "surName"
assert attr0.attribute_value[0].text == "Jeter"
#
subject = assertion.subject
assert _eq(subject.keyswv(),["text", "name_id"])
assert subject.text == "_aaa"
assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT
def test_response(self):
tmp = utils.response_factory(
in_response_to="_012345",
destination="https:#www.example.com",
status=utils.success_status_factory(),
assertion=utils.assertion_factory(
subject = utils.args2dict("_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
attribute_statement = utils.args2dict([
utils.args2dict(attribute_value="Derek",
friendly_name="givenName"),
utils.args2dict(attribute_value="Jeter",
friendly_name="surName"),
]),
issuer=self.server.issuer(),
),
issuer=self.server.issuer(),
)
response = make_instance(samlp.Response, tmp)
print response.keyswv()
assert _eq(response.keyswv(),['destination', 'assertion','status',
'in_response_to', 'issue_instant',
'version', 'issuer', 'id'])
assert response.version == "2.0"
assert response.issuer.text == "urn:mace:example.com:saml:roland:idp"
assert response.destination == "https:#www.example.com"
assert response.in_response_to == "_012345"
#
status = response.status
print status
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_parse_faulty_request(self):
sc = client.Saml2Client({},None)
authn_request = sc.authn_request(
query_id = "1",
destination = "http://www.example.com",
service_url = "http://www.example.org",
spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
intermed = utils.deflate_and_base64_encode(authn_request)
# should raise an error because faulty spentityid
raises(OtherError,self.server.parse_authn_request,intermed)
def test_parse_faulty_request_to_err_status(self):
sc = client.Saml2Client({},None)
authn_request = sc.authn_request(
query_id = "1",
destination = "http://www.example.com",
service_url = "http://www.example.org",
spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
intermed = utils.deflate_and_base64_encode(authn_request)
try:
self.server.parse_authn_request(intermed)
status = None
except OtherError, oe:
print oe.args
status = utils.make_instance(samlp.Status,
utils.status_from_exception_factory(oe))
assert status
print status
assert _eq(status.keyswv(), ["status_code", "status_message"])
assert status.status_message.text == (
'ConsumerURL and return destination mismatch')
status_code = status.status_code
assert _eq(status_code.keyswv(), ["status_code","value"])
assert status_code.value == samlp.STATUS_RESPONDER
assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL
def test_parse_ok_request(self):
sc = client.Saml2Client({},None)
authn_request = sc.authn_request(
query_id = "1",
destination = "http://www.example.com",
service_url = "http://localhost:8087/",
spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
print authn_request
intermed = utils.deflate_and_base64_encode(authn_request)
response = self.server.parse_authn_request(intermed)
assert response["consumer_url"] == "http://localhost:8087/"
assert response["id"] == "1"
name_id_policy = response["request"].name_id_policy
assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT
assert response["sp_entityid"] == "urn:mace:example.com:saml:roland:sp"
def test_sso_response_with_identity(self):
resp = self.server.do_response(
"http://localhost:8087/", # consumer_url
"12", # in_response_to
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{ "eduPersonEntitlement": "Bat"}
)
print resp.keyswv()
assert _eq(resp.keyswv(),['status', 'destination', 'assertion',
'in_response_to', 'issue_instant',
'version', 'id', 'issuer'])
assert resp.destination == "http://localhost:8087/"
assert resp.in_response_to == "12"
assert resp.status
assert resp.status.status_code.value == samlp.STATUS_SUCCESS
assert resp.assertion
assert len(resp.assertion) == 1
assertion = resp.assertion[0]
assert len(assertion.authn_statement) == 1
assert assertion.conditions
assert len(assertion.attribute_statement) == 1
assert assertion.subject
assert assertion.subject.name_id
assert len(assertion.subject.subject_confirmation) == 1
confirmation = assertion.subject.subject_confirmation[0]
print confirmation.keyswv()
print confirmation.subject_confirmation_data
assert confirmation.subject_confirmation_data.in_response_to == "12"
def test_sso_response_without_identity(self):
resp = self.server.do_response(
"http://localhost:8087/", # consumer_url
"12", # in_response_to
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
)
print resp.keyswv()
assert _eq(resp.keyswv(),['status', 'destination', 'in_response_to',
'issue_instant', 'version', 'id', 'issuer'])
assert resp.destination == "http://localhost:8087/"
assert resp.in_response_to == "12"
assert resp.status
assert resp.status.status_code.value == samlp.STATUS_SUCCESS
assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp"
assert not resp.assertion
def test_sso_failure_response(self):
exc = utils.MissingValue("eduPersonAffiliation missing")
resp = self.server.error_response( "http://localhost:8087/", "12",
"urn:mace:example.com:saml:roland:sp", exc )
print resp.keyswv()
assert _eq(resp.keyswv(),['status', 'destination', 'in_response_to',
'issue_instant', 'version', 'id', 'issuer'])
assert resp.destination == "http://localhost:8087/"
assert resp.in_response_to == "12"
assert resp.status
print resp.status
assert resp.status.status_code.value == samlp.STATUS_RESPONDER
assert resp.status.status_code.status_code.value == \
samlp.STATUS_REQUEST_UNSUPPORTED
assert resp.status.status_message.text == \
"eduPersonAffiliation missing"
assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp"
assert not resp.assertion
def test_persistence_0(self):
pid1 = self.server.persistent_id(
"urn:mace:example.com:saml:roland:sp", "jeter")
pid2 = self.server.persistent_id(
"urn:mace:example.com:saml:roland:sp", "jeter")
print pid1, pid2
assert pid1 == pid2
def test_filter_ava_0(self):
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [])
assert _eq(ava.keys(), ["givenName", "surName", "mail"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
assert ava["mail"] == ["derek@nyy.mlb.com"]
def test_filter_ava_1(self):
""" No mail address returned """
self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"] = {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"givenName": None,
"surName": None,
}
}
print self.server.conf["service"]["idp"]["assertions"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [])
assert _eq(ava.keys(), ["givenName", "surName"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
def test_filter_ava_2(self):
""" Only mail returned """
self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"] = {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"mail": None,
}
}
print self.server.conf["service"]["idp"]["assertions"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [])
assert _eq(ava.keys(), ["mail"])
assert ava["mail"] == ["derek@nyy.mlb.com"]
def test_filter_ava_3(self):
""" Only example.com mail addresses returned """
self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"] = {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"mail": [re.compile(".*@example\.com$")],
}
}
print self.server.conf["service"]["idp"]["assertions"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com", "dj@example.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [])
assert _eq(ava.keys(), ["mail"])
assert ava["mail"] == ["dj@example.com"]
def test_authn_response_0(self):
# reset
del self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
resp_str = self.server.authn_response(ava,
"1", "http://local:8087/",
"urn:mace:example.com:saml:roland:sp",
utils.make_instance(samlp.NameIDPolicy,
utils.args2dict(
format=saml.NAMEID_FORMAT_TRANSIENT,
allow_create="true")),
"foba0001@example.com")
response = samlp.response_from_string("\n".join(resp_str))
print response.keyswv()
assert _eq(response.keyswv(),['status', 'destination', 'assertion',
'in_response_to', 'issue_instant', 'version',
'issuer', 'id'])
print response.assertion[0].keyswv()
assert len(response.assertion) == 1
assert _eq(response.assertion[0].keyswv(), ['authn_statement',
'attribute_statement', 'subject', 'issue_instant',
'version', 'conditions', 'id'])
assertion = response.assertion[0]
assert len(assertion.attribute_statement) == 1
astate = assertion.attribute_statement[0]
print astate
assert len(astate.attribute) == 3
IDENTITY = {"eduPersonAffiliation": ["staff", "member"],
"surName": ["Jeter"], "givenName": ["Derek"],
"mail": ["foo@gmail.com"]}
class TestServer2():
def setup_class(self):
try:
self.server = Server("restrictive_idp.config")
except IOError, e:
self.server = Server("tests/restrictive_idp.config")
def test_0(self):
ident = self.server.restrict_ava(IDENTITY.copy(),
"urn:mace:example.com:saml:roland:sp")
assert len(ident) == 3
assert ident == {'eduPersonAffiliation': ['staff'],
'givenName': ['Derek'], 'surName': ['Jeter']}
print self.server.conf.keys()
attr = utils.ava_to_attributes(ident, self.server.conf["am_backward"])
assert len(attr) == 3
assert {'attribute_value': [{'text': 'staff'}],
'friendly_name': 'eduPersonAffiliation',
'name': 'urn:oid:1.3.6.1.4.1.5923.1.1.1.1',
'name_format': 'urn:oasis:names:tc:SAML:2.0:attrname-format:uri'} in attr
def test_do_aa_reponse(self):
response = self.server.do_aa_response( "http://example.com/sp/", "aaa",
"urn:mace:example.com:sp:1", IDENTITY.copy(),
issuer = self.server.conf["entityid"])
assert response != None
assert response.destination == "http://example.com/sp/"
assert response.in_response_to == "aaa"
assert response.version == "2.0"
assert response.issuer.text == "urn:mace:example.com:saml:roland:idpr"
assert response.status.status_code.value == samlp.STATUS_SUCCESS
assert len(response.assertion) == 1
assertion = response.assertion[0]
assert assertion.version == "2.0"
subject = assertion.subject
assert subject.name_id.format == saml.NAMEID_FORMAT_TRANSIENT
assert len(subject.subject_confirmation) == 1
subject_confirmation = subject.subject_confirmation[0]
assert subject_confirmation.subject_confirmation_data.in_response_to == "aaa"

298
tests/test_51_client.py Normal file
View File

@@ -0,0 +1,298 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from saml2.client import Saml2Client
from saml2 import samlp, client, BINDING_HTTP_POST
from saml2 import saml, utils, config, class_name
from saml2.sigver import correctly_signed_authn_request, verify_signature
from saml2.sigver import correctly_signed_response
from saml2.server import Server
XML_RESPONSE_FILE = "saml_signed.xml"
XML_RESPONSE_FILE2 = "saml2_response.xml"
import os
def for_me(condition, me ):
for restriction in condition.audience_restriction:
audience = restriction.audience
if audience.text.strip() == me:
return True
def ava(attribute_statement):
result = {}
for attribute in attribute_statement.attribute:
# Check name_format ??
name = attribute.name.strip()
result[name] = []
for value in attribute.attribute_value:
result[name].append(value.text.strip())
return result
# def test_parse_3():
# xml_response = open(XML_RESPONSE_FILE3).read()
# response = samlp.response_from_string(xml_response)
# client = Saml2Client({})
# (ava, name_id, real_uri) = \
# client.do_response(response, "xenosmilus.umdc.umu.se")
# print 40*"="
# print ava
# print 40*","
# print name_id
# assert False
REQ1 = """<?xml version='1.0' encoding='UTF-8'?>
<ns0:AttributeQuery Destination="https://idp.example.com/idp/" ID="1" IssueInstant="%s" Version="2.0" xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns1:Issuer xmlns:ns1="urn:oasis:names:tc:SAML:2.0:assertion">http://vo.example.com/sp1</ns1:Issuer><ns1:Subject xmlns:ns1="urn:oasis:names:tc:SAML:2.0:assertion"><ns1:NameID Format="urn:oasis:names:tc:SAML:2.0:nameid-format:persistent">E8042FB4-4D5B-48C3-8E14-8EDD852790DD</ns1:NameID></ns1:Subject></ns0:AttributeQuery>"""
class TestClient:
def setup_class(self):
server = Server("idp.config")
self._resp_ = server.do_response(
"http://lingon.catalogix.se:8087/", # consumer_url
"12", # in_response_to
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{"eduPersonEntitlement":"Jeter"}
)
conf = config.Config()
try:
conf.load_file("tests/server.config")
except IOError:
conf.load_file("server.config")
self.client = Saml2Client({},conf)
def test_verify_1(self):
xml_response = ("%s" % (self._resp_,)).split("\n")[1]
outstanding = {"12": "http://localhost:8088/sso"}
session_info = self.client.verify_response(xml_response,
"urn:mace:example.com:saml:roland:sp",
outstanding=outstanding,
decode=False)
del session_info["name_id"]
del session_info["not_on_or_after"]
del session_info["ava"]["__userid"]
print session_info
assert session_info == {'session_id': '12',
'came_from': 'http://localhost:8088/sso',
'ava': {'eduPersonEntitlement': ['Jeter'] },
'issuer': 'urn:mace:example.com:saml:roland:idp'}
def test_parse_1(self):
xml_response = ("%s" % (self._resp_,)).split("\n")[1]
response = correctly_signed_response(xml_response,
self.client.config["xmlsec_binary"])
outstanding = {"12": "http://localhost:8088/sso"}
session_info = self.client.do_response(response,
"urn:mace:example.com:saml:roland:sp",
outstanding=outstanding)
assert session_info["ava"]["eduPersonEntitlement"] == ["Jeter"]
def test_parse_2(self):
xml_response = open(XML_RESPONSE_FILE2).read()
response = samlp.response_from_string(xml_response)
outstanding = {"_ae0216740b5baa4b13c79ffdb2baa82572788fd9a3":
"http://localhost:8088/sso"}
session_info = self.client.do_response(response,
"xenosmilus.umdc.umu.se",
outstanding=outstanding,
lax=True)
assert session_info["ava"] == {'uid': ['andreas'],
'mobile': ['+4741107700'],
'edupersonnickname': ['erlang'],
'o': ['Feide RnD'],
'edupersonentitlement': [
'urn:mace:feide.no:entitlement:test'],
'edupersonaffiliation': ['employee'],
'eduPersonPrincipalName': ['andreas@rnd.feide.no'],
'sn': ['Solberg'],
'mail': ['andreas@uninett.no'],
'ou': ['Guests'],
'cn': ['Andreas Solberg']}
assert session_info["name_id"] == "_242f88493449e639aab95dd9b92b1d04234ab84fd8"
assert session_info.keys() == ['came_from', 'name_id', 'ava',
'not_on_or_after']
def test_create_attribute_query1(self):
req = self.client.create_attribute_query("1",
"E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
"http://vo.example.com/sp1",
"https://idp.example.com/idp/",
nameformat=saml.NAMEID_FORMAT_PERSISTENT)
str = "%s" % req.to_string()
print str
assert str == REQ1 % req.issue_instant
assert req.destination == "https://idp.example.com/idp/"
assert req.id == "1"
assert req.version == "2.0"
subject = req.subject
name_id = subject.name_id
assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT
assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD"
issuer = req.issuer
assert issuer.text == "http://vo.example.com/sp1"
def test_create_attribute_query2(self):
req = self.client.create_attribute_query("1",
"E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
"http://vo.example.com/sp1",
"https://idp.example.com/idp/",
attribute={
("urn:oid:2.5.4.42",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"givenName"):None,
("urn:oid:2.5.4.4",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"surname"):None,
("urn:oid:1.2.840.113549.1.9.1",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri"):None,
},
nameformat=saml.NAMEID_FORMAT_PERSISTENT)
print req.to_string()
assert req.destination == "https://idp.example.com/idp/"
assert req.id == "1"
assert req.version == "2.0"
subject = req.subject
name_id = subject.name_id
assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT
assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD"
assert len(req.attribute) == 3
# one is givenName
seen = []
for attribute in req.attribute:
if attribute.name == "urn:oid:2.5.4.42":
assert attribute.name_format == saml.NAME_FORMAT_URI
assert attribute.friendly_name == "givenName"
seen.append("givenName")
elif attribute.name == "urn:oid:2.5.4.4":
assert attribute.name_format == saml.NAME_FORMAT_URI
assert attribute.friendly_name == "surname"
seen.append("surname")
elif attribute.name == "urn:oid:1.2.840.113549.1.9.1":
assert attribute.name_format == saml.NAME_FORMAT_URI
if getattr(attribute,"friendly_name"):
assert False
seen.append("email")
assert set(seen) == set(["givenName","surname","email"])
def test_create_attribute_query_3(self):
req = self.client.create_attribute_query("1",
"_e7b68a04488f715cda642fbdd90099f5",
"urn:mace:umu.se:saml/rolandsp",
"https://aai-demo-idp.switch.ch/idp/shibboleth",
nameformat=saml.NAMEID_FORMAT_TRANSIENT )
assert isinstance(req, samlp.AttributeQuery)
assert req.destination == "https://aai-demo-idp.switch.ch/idp/shibboleth"
assert req.id == "1"
assert req.version == "2.0"
assert req.issue_instant
assert req.issuer.text == "urn:mace:umu.se:saml/rolandsp"
nameid = req.subject.name_id
assert nameid.format == saml.NAMEID_FORMAT_TRANSIENT
assert nameid.text == "_e7b68a04488f715cda642fbdd90099f5"
def test_attribute_query(self):
req = self.client.attribute_query(
"_e7b68a04488f715cda642fbdd90099f5",
"urn:mace:umu.se:saml/rolandsp",
"https://aai-demo-idp.switch.ch/idp/shibboleth",
format=saml.NAMEID_FORMAT_TRANSIENT)
# since no one is answering on the other end
assert req == None
def test_idp_entry(self):
idp_entry = utils.make_instance( samlp.IDPEntry,
self.client.idp_entry(name="Umeå Universitet",
location="https://idp.umu.se/"))
assert idp_entry.name == "Umeå Universitet"
assert idp_entry.loc == "https://idp.umu.se/"
def test_scope(self):
scope = utils.make_instance(samlp.Scoping, self.client.scoping(
[self.client.idp_entry(name="Umeå Universitet",
location="https://idp.umu.se/")]))
assert scope.idp_list
assert len(scope.idp_list.idp_entry) == 1
idp_entry = scope.idp_list.idp_entry[0]
assert idp_entry.name == "Umeå Universitet"
assert idp_entry.loc == "https://idp.umu.se/"
def test_create_auth_request_0(self):
ar_str = self.client.authn_request("1",
"http://www.example.com/sso",
"http://www.example.org/service",
"urn:mace:example.org:saml:sp",
"My Name")
ar = samlp.authn_request_from_string(ar_str)
print ar
assert ar.assertion_consumer_service_url == "http://www.example.org/service"
assert ar.destination == "http://www.example.com/sso"
assert ar.protocol_binding == BINDING_HTTP_POST
assert ar.version == "2.0"
assert ar.provider_name == "My Name"
assert ar.issuer.text == "urn:mace:example.org:saml:sp"
nid_policy = ar.name_id_policy
assert nid_policy.allow_create == "true"
assert nid_policy.format == saml.NAMEID_FORMAT_TRANSIENT
def test_create_auth_request_vo(self):
assert self.client.config["virtual_organization"].keys() == [
"urn:mace:example.com:it:tek"]
ar_str = self.client.authn_request("1",
"http://www.example.com/sso",
"http://www.example.org/service",
"urn:mace:example.org:saml:sp",
"My Name",
vorg="urn:mace:example.com:it:tek")
ar = samlp.authn_request_from_string(ar_str)
print ar
assert ar.assertion_consumer_service_url == "http://www.example.org/service"
assert ar.destination == "http://www.example.com/sso"
assert ar.protocol_binding == BINDING_HTTP_POST
assert ar.version == "2.0"
assert ar.provider_name == "My Name"
assert ar.issuer.text == "urn:mace:example.org:saml:sp"
nid_policy = ar.name_id_policy
assert nid_policy.allow_create == "true"
assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT
assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek"
def test_sign_auth_request_0(self):
#print self.client.config
ar_str = self.client.authn_request("1",
"http://www.example.com/sso",
"http://www.example.org/service",
"urn:mace:example.org:saml:sp",
"My Name", sign=True)
ar = samlp.authn_request_from_string(ar_str)
assert ar
assert ar.signature
assert ar.signature.signature_value
signed_info = ar.signature.signed_info
#print signed_info
assert len(signed_info.reference) == 1
assert signed_info.reference[0].uri == "#1"
assert signed_info.reference[0].digest_value
print "------------------------------------------------"
try:
assert correctly_signed_authn_request(ar_str,
self.client.config["xmlsec_binary"],
self.client.config["metadata"])
except Exception: # missing certificate
verify_signature(self.client.config["xmlsec_binary"],
ar_str,
self.client.config["cert_file"],
cert_type="pem",
node_name=class_name(ar))