Files
deb-python-pysaml2/tests/test_server.py
2009-11-29 11:49:16 +01:00

326 lines
14 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from saml2.server import Server
from saml2 import server
from saml2 import samlp, saml, client, utils
from saml2.utils import make_instance, OtherError
from saml2.utils import do_attribute_statement
from py.test import raises
import shelve
import re
def _eq(l1,l2):
return set(l1) == set(l2)
class TestServer():
def setup_class(self):
self.server = Server("tests/idp.config")
def test_issuer(self):
issuer = make_instance( saml.Issuer, self.server.issuer())
assert isinstance(issuer, saml.Issuer)
assert _eq(issuer.keyswv(), ["text","format"])
assert issuer.format == saml.NAMEID_FORMAT_ENTITY
assert issuer.text == self.server.conf["entityid"]
def test_assertion(self):
tmp = utils.kd_assertion(
subject= utils.kd_subject("_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
attribute_statement = utils.kd_attribute_statement(
attribute=[
utils.kd_attribute(attribute_value="Derek",
friendly_name="givenName"),
utils.kd_attribute(attribute_value="Jeter",
friendly_name="surName"),
]),
issuer=self.server.issuer(),
)
assertion = make_instance(saml.Assertion, tmp)
assert _eq(assertion.keyswv(),['attribute_statement', 'issuer', 'id',
'subject', 'issue_instant', 'version'])
assert assertion.version == "2.0"
assert assertion.issuer.text == "urn:mace:example.com:saml:roland:idp"
#
assert len(assertion.attribute_statement) == 1
attribute_statement = assertion.attribute_statement[0]
assert len(attribute_statement.attribute) == 2
attr0 = attribute_statement.attribute[0]
attr1 = attribute_statement.attribute[1]
if attr0.attribute_value[0].text == "Derek":
assert attr0.friendly_name == "givenName"
assert attr1.friendly_name == "surName"
assert attr1.attribute_value[0].text == "Jeter"
else:
assert attr1.friendly_name == "givenName"
assert attr1.attribute_value[0].text == "Derek"
assert attr0.friendly_name == "surName"
assert attr0.attribute_value[0].text == "Jeter"
#
subject = assertion.subject
assert _eq(subject.keyswv(),["text", "name_id"])
assert subject.text == "_aaa"
assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT
def test_response(self):
tmp = utils.kd_response(
in_response_to="_012345",
destination="https:#www.example.com",
status=utils.kd_success_status(),
assertion=utils.kd_assertion(
subject = utils.kd_subject("_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
attribute_statement = utils.kd_attribute_statement([
utils.kd_attribute(attribute_value="Derek",
friendly_name="givenName"),
utils.kd_attribute(attribute_value="Jeter",
friendly_name="surName"),
]),
issuer=self.server.issuer(),
),
issuer=self.server.issuer(),
)
response = make_instance(samlp.Response, tmp)
print response.keyswv()
assert _eq(response.keyswv(),['destination', 'assertion','status',
'in_response_to', 'issue_instant',
'version', 'issuer', 'id'])
assert response.version == "2.0"
assert response.issuer.text == "urn:mace:example.com:saml:roland:idp"
assert response.destination == "https:#www.example.com"
assert response.in_response_to == "_012345"
#
status = response.status
print status
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_parse_faulty_request(self):
sc = client.Saml2Client({},None)
authn_request = sc.authn_request(
query_id = "1",
destination = "http://www.example.com",
service_url = "http://www.example.org",
spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
intermed = utils.deflate_and_base64_encode(authn_request)
# should raise an error because faulty spentityid
raises(OtherError,self.server.parse_authn_request,intermed)
def test_parse_faulty_request_to_err_status(self):
sc = client.Saml2Client({},None)
authn_request = sc.authn_request(
query_id = "1",
destination = "http://www.example.com",
service_url = "http://www.example.org",
spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
intermed = utils.deflate_and_base64_encode(authn_request)
try:
self.server.parse_authn_request(intermed)
status = None
except OtherError, oe:
print oe.args
status = utils.make_instance(samlp.Status,
utils.kd_status_from_exception(oe))
assert status
print status
assert _eq(status.keyswv(), ["status_code", "status_message"])
assert status.status_message.text == (
'ConsumerURL and return destination mismatch')
status_code = status.status_code
assert _eq(status_code.keyswv(), ["status_code","value"])
assert status_code.value == samlp.STATUS_RESPONDER
assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL
def test_parse_ok_request(self):
sc = client.Saml2Client({},None)
authn_request = sc.authn_request(
query_id = "1",
destination = "http://www.example.com",
service_url = "http://localhost:8087/",
spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
print authn_request
intermed = utils.deflate_and_base64_encode(authn_request)
response = self.server.parse_authn_request(intermed)
assert response["consumer_url"] == "http://localhost:8087/"
assert response["id"] == "1"
name_id_policy = response["request"].name_id_policy
assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT
assert response["sp_entityid"] == "urn:mace:example.com:saml:roland:sp"
def test_sso_response(self):
resp = self.server.do_sso_response(
"http://localhost:8087/", # consumer_url
"12", # in_response_to
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{("urn:oid:1.3.6.1.4.1.5923.1.1.1.7",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"eduPersonEntitlement"):"Jeter"}
)
print resp.keyswv()
assert _eq(resp.keyswv(),['status', 'destination', 'assertion',
'in_response_to', 'issue_instant',
'version', 'id', 'issuer'])
assert resp.destination == "http://localhost:8087/"
assert resp.in_response_to == "12"
assert resp.status
assert resp.status.status_code.value == samlp.STATUS_SUCCESS
assert resp.assertion
assert len(resp.assertion) == 1
assertion = resp.assertion[0]
assert len(assertion.authn_statement) == 1
assert assertion.conditions
assert len(assertion.attribute_statement) == 1
assert assertion.subject
assert assertion.subject.name_id
assert len(assertion.subject.subject_confirmation) == 1
confirmation = assertion.subject.subject_confirmation[0]
print confirmation.keyswv()
print confirmation.subject_confirmation_data
assert confirmation.subject_confirmation_data.in_response_to == "12"
def test_persistence_0(self):
pid1 = self.server.persistent_id(
"urn:mace:example.com:saml:roland:sp", "jeter")
pid2 = self.server.persistent_id(
"urn:mace:example.com:saml:roland:sp", "jeter")
print pid1, pid2
assert pid1 == pid2
def test_filter_ava_0(self):
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [], "idp")
assert _eq(ava.keys(), ["givenName", "surName", "mail"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
assert ava["mail"] == ["derek@nyy.mlb.com"]
def test_filter_ava_1(self):
""" No mail address returned """
self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"] = {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"givenName": None,
"surName": None,
}
}
print self.server.conf["service"]["idp"]["assertions"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [], "idp")
assert _eq(ava.keys(), ["givenName", "surName"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
def test_filter_ava_2(self):
""" Only mail returned """
self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"] = {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"mail": None,
}
}
print self.server.conf["service"]["idp"]["assertions"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [], "idp")
assert _eq(ava.keys(), ["mail"])
assert ava["mail"] == ["derek@nyy.mlb.com"]
def test_filter_ava_3(self):
""" Only example.com mail addresses returned """
self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"] = {
"lifetime": {"minutes": 5},
"attribute_restrictions":{
"mail": [re.compile(".*@example\.com$")],
}
}
print self.server.conf["service"]["idp"]["assertions"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com", "dj@example.com"]}
# No restrictions apply
ava = self.server.filter_ava(ava,
"urn:mace:example.com:saml:roland:sp",
[], [], "idp")
assert _eq(ava.keys(), ["mail"])
assert ava["mail"] == ["dj@example.com"]
def test_authn_response_0(self):
# reset
del self.server.conf["service"]["idp"]["assertions"][
"urn:mace:example.com:saml:roland:sp"]
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
resp_str = self.server.authn_response(ava,
"1", "http://local:8087/",
"urn:mace:example.com:saml:roland:sp",
utils.make_instance(samlp.NameIDPolicy,
utils.kd_name_id_policy(
format=saml.NAMEID_FORMAT_TRANSIENT,
allow_create="true")),
"foba0001@example.com")
response = samlp.response_from_string("\n".join(resp_str))
print response.keyswv()
assert _eq(response.keyswv(),['status', 'destination', 'assertion',
'in_response_to', 'issue_instant', 'version',
'issuer', 'id'])
print response.assertion[0].keyswv()
assert len(response.assertion) == 1
assert _eq(response.assertion[0].keyswv(), ['authn_statement',
'attribute_statement', 'subject', 'issue_instant',
'version', 'conditions', 'id'])
assertion = response.assertion[0]
assert len(assertion.attribute_statement) == 1
astate = assertion.attribute_statement[0]
print astate
assert len(astate.attribute) == 3