Updated tests to follow the changed way of handling authentication context.

This commit is contained in:
Roland Hedberg
2013-04-28 16:59:48 +02:00
parent ee42c42b13
commit 872a266333
14 changed files with 198 additions and 123 deletions

View File

@@ -541,8 +541,7 @@ class Server(Entity):
in_response_to, destination, in_response_to, destination,
sp_entity_id, name_id_policy=None, sp_entity_id, name_id_policy=None,
userid=None, name_id=None, authn=None, userid=None, name_id=None, authn=None,
authn_decl=None, issuer=None, issuer=None, sign_response=False,
sign_response=False,
sign_assertion=False, **kwargs): sign_assertion=False, **kwargs):
# ---------------------------------------- # ----------------------------------------
@@ -560,7 +559,7 @@ class Server(Entity):
response = self.create_authn_response(identity, in_response_to, response = self.create_authn_response(identity, in_response_to,
destination, sp_entity_id, destination, sp_entity_id,
name_id_policy, userid, name_id, name_id_policy, userid, name_id,
authn, authn_decl, issuer, authn, issuer,
sign_response, sign_assertion) sign_response, sign_assertion)
body = soapenv.Body() body = soapenv.Body()
body.extension_elements = [element_to_extension_element(response)] body.extension_elements = [element_to_extension_element(response)]

View File

@@ -1,5 +1,5 @@
from urlparse import parse_qs from urlparse import parse_qs
from saml2.saml import AUTHN_PASSWORD from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.samlp import attribute_query_from_string, logout_request_from_string from saml2.samlp import attribute_query_from_string, logout_request_from_string
from saml2 import BINDING_HTTP_REDIRECT, pack from saml2 import BINDING_HTTP_REDIRECT, pack
from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_POST
@@ -16,6 +16,13 @@ TYP = {
"POST": [BINDING_HTTP_POST, BINDING_SOAP] "POST": [BINDING_HTTP_POST, BINDING_SOAP]
} }
AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
}
def unpack_form(_str, ver="SAMLRequest"): def unpack_form(_str, ver="SAMLRequest"):
SR_STR = "name=\"%s\" value=\"" % ver SR_STR = "name=\"%s\" value=\"" % ver
RS_STR = 'name="RelayState" value="' RS_STR = 'name="RelayState" value="'
@@ -32,7 +39,8 @@ def unpack_form(_str, ver="SAMLRequest"):
rs = _str[k:l] rs = _str[k:l]
return {ver:sr, "RelayState":rs} return {ver: sr, "RelayState": rs}
class DummyResponse(object): class DummyResponse(object):
def __init__(self, code, data, headers=None): def __init__(self, code, data, headers=None):
@@ -40,6 +48,7 @@ class DummyResponse(object):
self.text = data self.text = data
self.headers = headers or [] self.headers = headers or []
class FakeIDP(Server): class FakeIDP(Server):
def __init__(self, config_file=""): def __init__(self, config_file=""):
Server.__init__(self, config_file) Server.__init__(self, config_file)
@@ -106,14 +115,13 @@ class FakeIDP(Server):
except Exception: except Exception:
raise raise
identity = { "surName":"Hedberg", "givenName": "Roland", identity = {"surName": "Hedberg", "givenName": "Roland",
"title": "supertramp", "mail": "roland@example.com"} "title": "supertramp", "mail": "roland@example.com"}
userid = "Pavill" userid = "Pavill"
authn_resp = self.create_authn_response(identity, authn_resp = self.create_authn_response(identity,
userid=userid, userid=userid,
authn=(AUTHN_PASSWORD, authn=AUTHN,
"http://www.example.com/login"),
**resp_args) **resp_args)
response = "%s" % authn_resp response = "%s" % authn_resp
@@ -131,12 +139,13 @@ class FakeIDP(Server):
aquery = attribute_query_from_string(_str) aquery = attribute_query_from_string(_str)
extra = {"eduPersonAffiliation": "faculty"} extra = {"eduPersonAffiliation": "faculty"}
userid = "Pavill" #userid = "Pavill"
name_id = aquery.subject.name_id name_id = aquery.subject.name_id
attr_resp = self.create_attribute_response(extra, aquery.id, attr_resp = self.create_attribute_response(extra, aquery.id,
None, None,
sp_entity_id=aquery.issuer.text, sp_entity_id=aquery.issuer
.text,
name_id=name_id, name_id=name_id,
attributes=aquery.attribute) attributes=aquery.attribute)
@@ -144,11 +153,11 @@ class FakeIDP(Server):
# SOAP packing # SOAP packing
#headers = {"content-type": "application/soap+xml"} #headers = {"content-type": "application/soap+xml"}
soap_message = make_soap_enveloped_saml_thingy(attr_resp) soap_message = make_soap_enveloped_saml_thingy(attr_resp)
# if self.sign and self.sec: # if self.sign and self.sec:
# _signed = self.sec.sign_statement_using_xmlsec(soap_message, # _signed = self.sec.sign_statement_using_xmlsec(soap_message,
# class_name(attr_resp), # class_name(attr_resp),
# nodeid=attr_resp.id) # nodeid=attr_resp.id)
# soap_message = _signed # soap_message = _signed
response = "%s" % soap_message response = "%s" % soap_message
else: # Just POST else: # Just POST
response = "%s" % attr_resp response = "%s" % attr_resp

View File

@@ -3,6 +3,7 @@
from saml2 import saml from saml2 import saml
from saml2 import config from saml2 import config
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.server import Server from saml2.server import Server
from saml2.response import response_factory from saml2.response import response_factory
@@ -15,24 +16,37 @@ from pytest import raises
XML_RESPONSE_FILE = "saml_signed.xml" XML_RESPONSE_FILE = "saml_signed.xml"
XML_RESPONSE_FILE2 = "saml2_response.xml" XML_RESPONSE_FILE2 = "saml2_response.xml"
def _eq(l1,l2):
def _eq(l1, l2):
return set(l1) == set(l2) return set(l1) == set(l2)
IDENTITY = {"eduPersonAffiliation": ["staff", "member"], IDENTITY = {"eduPersonAffiliation": ["staff", "member"],
"surName": ["Jeter"], "givenName": ["Derek"], "surName": ["Jeter"], "givenName": ["Derek"],
"mail": ["foo@gmail.com"], "mail": ["foo@gmail.com"],
"title": ["shortstop"]} "title": ["shortstop"]}
AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
}
class TestResponse: class TestResponse:
def setup_class(self): def setup_class(self):
server = Server("idp_conf") server = Server("idp_conf")
name_id = server.ident.transient_nameid( name_id = server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp","id12") "urn:mace:example.com:saml:roland:sp", "id12")
self._resp_ = server.create_authn_response(IDENTITY, self._resp_ = server.create_authn_response(IDENTITY,
"id12", # in_response_to "id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url "http://lingon.catalogix.se:8087/",
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
# consumer_url
"urn:mace:example"
".com:saml:roland:sp",
# sp_entity_id
name_id=name_id) name_id=name_id)
self._sign_resp_ = server.create_authn_response( self._sign_resp_ = server.create_authn_response(
@@ -40,7 +54,7 @@ class TestResponse:
"id12", # in_response_to "id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url "http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id "urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id = name_id, name_id=name_id,
sign_assertion=True) sign_assertion=True)
self._resp_authn = server.create_authn_response( self._resp_authn = server.create_authn_response(
@@ -48,9 +62,8 @@ class TestResponse:
"id12", # in_response_to "id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url "http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id "urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id = name_id, name_id=name_id,
authn=(saml.AUTHN_PASSWORD, authn=AUTHN)
"http://www.example.com/login"))
conf = config.SPConfig() conf = config.SPConfig()
conf.load_file("server_conf") conf.load_file("server_conf")
@@ -60,7 +73,8 @@ class TestResponse:
xml_response = ("%s" % (self._resp_,)) xml_response = ("%s" % (self._resp_,))
resp = response_factory(xml_response, self.conf, resp = response_factory(xml_response, self.conf,
return_addr="http://lingon.catalogix.se:8087/", return_addr="http://lingon.catalogix.se:8087/",
outstanding_queries={"id12": "http://localhost:8088/sso"}, outstanding_queries={
"id12": "http://localhost:8088/sso"},
timeslack=10000, decode=False) timeslack=10000, decode=False)
assert isinstance(resp, StatusResponse) assert isinstance(resp, StatusResponse)
@@ -70,7 +84,8 @@ class TestResponse:
xml_response = self._sign_resp_ xml_response = self._sign_resp_
resp = response_factory(xml_response, self.conf, resp = response_factory(xml_response, self.conf,
return_addr="http://lingon.catalogix.se:8087/", return_addr="http://lingon.catalogix.se:8087/",
outstanding_queries={"id12": "http://localhost:8088/sso"}, outstanding_queries={
"id12": "http://localhost:8088/sso"},
timeslack=10000, decode=False) timeslack=10000, decode=False)
assert isinstance(resp, StatusResponse) assert isinstance(resp, StatusResponse)

View File

@@ -36,6 +36,7 @@ example = """<Envelope xmlns="http://schemas.xmlsoap.org/soap/envelope/">
</Envelope> </Envelope>
""" """
def test_parse_soap_envelope(): def test_parse_soap_envelope():
envelope = ElementTree.fromstring(example) envelope = ElementTree.fromstring(example)
assert envelope.tag == '{%s}Envelope' % NAMESPACE assert envelope.tag == '{%s}Envelope' % NAMESPACE
@@ -48,6 +49,7 @@ def test_parse_soap_envelope():
assert saml_part.tag == '{%s}Response' % SAMLP_NAMESPACE assert saml_part.tag == '{%s}Response' % SAMLP_NAMESPACE
# {http://schemas.xmlsoap.org/soap/envelope/}Envelope # {http://schemas.xmlsoap.org/soap/envelope/}Envelope
def test_make_soap_envelope(): def test_make_soap_envelope():
envelope = ElementTree.Element('') envelope = ElementTree.Element('')
envelope.tag = '{%s}Envelope' % NAMESPACE envelope.tag = '{%s}Envelope' % NAMESPACE

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from saml2.saml import AUTHN_PASSWORD from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2 import saml from saml2 import saml
from saml2.server import Server from saml2.server import Server
@@ -12,7 +12,8 @@ from pathutils import dotname, full_path
XML_RESPONSE_FILE = full_path("saml_signed.xml") XML_RESPONSE_FILE = full_path("saml_signed.xml")
XML_RESPONSE_FILE2 = full_path("saml2_response.xml") XML_RESPONSE_FILE2 = full_path("saml2_response.xml")
def _eq(l1,l2):
def _eq(l1, l2):
return set(l1) == set(l2) return set(l1) == set(l2)
IDENTITY = {"eduPersonAffiliation": ["staff", "member"], IDENTITY = {"eduPersonAffiliation": ["staff", "member"],
@@ -20,36 +21,41 @@ IDENTITY = {"eduPersonAffiliation": ["staff", "member"],
"mail": ["foo@gmail.com"], "mail": ["foo@gmail.com"],
"title": ["shortstop"]} "title": ["shortstop"]}
AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
}
class TestAuthnResponse: class TestAuthnResponse:
def setup_class(self): def setup_class(self):
server = Server(dotname("idp_conf")) server = Server(dotname("idp_conf"))
name_id = server.ident.transient_nameid( name_id = server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp","id12") "urn:mace:example.com:saml:roland:sp","id12")
authn = (AUTHN_PASSWORD, "http://www.example.com/login")
self._resp_ = server.create_authn_response( self._resp_ = server.create_authn_response(
IDENTITY, IDENTITY,
"id12", # in_response_to "id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url "http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id "urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id = name_id, name_id=name_id,
authn=authn) authn=AUTHN)
self._sign_resp_ = server.create_authn_response( self._sign_resp_ = server.create_authn_response(
IDENTITY, IDENTITY,
"id12", # in_response_to "id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url "http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id "urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id = name_id, sign_assertion=True, name_id=name_id, sign_assertion=True,
authn=authn) authn=AUTHN)
self._resp_authn = server.create_authn_response( self._resp_authn = server.create_authn_response(
IDENTITY, IDENTITY,
"id12", # in_response_to "id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url "http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id "urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id = name_id, name_id=name_id,
authn=authn) authn=AUTHN)
self.conf = config_factory("sp", dotname("server_conf")) self.conf = config_factory("sp", dotname("server_conf"))
self.conf.only_use_keys_in_metadata = False self.conf.only_use_keys_in_metadata = False
@@ -115,7 +121,7 @@ class TestAuthnResponse:
assert len(self.ar.assertion.authn_statement) == 1 assert len(self.ar.assertion.authn_statement) == 1
authn_info = self.ar.authn_info() authn_info = self.ar.authn_info()
assert len(authn_info) == 1 assert len(authn_info) == 1
assert authn_info[0][0] == saml.AUTHN_PASSWORD assert authn_info[0][0] == INTERNETPROTOCOLPASSWORD
assert authn_info[0][1] == ["http://www.example.com/login"] assert authn_info[0][1] == ["http://www.example.com/login"]
session_info = self.ar.session_info() session_info = self.ar.session_info()
assert session_info["authn_info"] == authn_info assert session_info["authn_info"] == authn_info

View File

@@ -2,7 +2,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import base64 import base64
from urlparse import parse_qs from urlparse import parse_qs
from saml2.saml import AUTHN_PASSWORD, NameID, NAMEID_FORMAT_TRANSIENT from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT
from saml2.samlp import response_from_string from saml2.samlp import response_from_string
from saml2.server import Server from saml2.server import Server
@@ -21,6 +22,11 @@ from py.test import raises
nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
text="123456") text="123456")
AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
}
def _eq(l1, l2): def _eq(l1, l2):
return set(l1) == set(l2) return set(l1) == set(l2)
@@ -192,7 +198,7 @@ class TestServer1():
"http://localhost:8087/", # destination "http://localhost:8087/", # destination
"urn:mace:example.com:saml:roland:sp", # sp_entity_id "urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id, name_id=name_id,
authn=(AUTHN_PASSWORD, "http://www.example.com/login") authn=AUTHN
) )
print resp.keyswv() print resp.keyswv()
@@ -239,7 +245,7 @@ class TestServer1():
"http://localhost:8087/", # consumer_url "http://localhost:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id "urn:mace:example.com:saml:roland:sp", # sp_entity_id
userid="USER1", userid="USER1",
authn=(AUTHN_PASSWORD, "http://www.example.com/login") authn=AUTHN
) )
print resp.keyswv() print resp.keyswv()
@@ -287,8 +293,7 @@ class TestServer1():
resp_str = "%s" % self.server.create_authn_response( resp_str = "%s" % self.server.create_authn_response(
ava, "id1", "http://local:8087/", ava, "id1", "http://local:8087/",
"urn:mace:example.com:saml:roland:sp", npolicy, "urn:mace:example.com:saml:roland:sp", npolicy,
"foba0001@example.com", authn=(AUTHN_PASSWORD, "foba0001@example.com", authn=AUTHN)
"http://www.example.com/login"))
response = samlp.response_from_string(resp_str) response = samlp.response_from_string(resp_str)
print response.keyswv() print response.keyswv()

View File

@@ -4,6 +4,7 @@
import base64 import base64
import urllib import urllib
import urlparse import urlparse
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.response import LogoutResponse from saml2.response import LogoutResponse
from saml2.client import Saml2Client from saml2.client import Saml2Client
@@ -12,7 +13,6 @@ from saml2 import saml, config, class_name
from saml2.config import SPConfig from saml2.config import SPConfig
from saml2.saml import NAMEID_FORMAT_PERSISTENT from saml2.saml import NAMEID_FORMAT_PERSISTENT
from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.saml import AUTHN_PASSWORD
from saml2.saml import NameID from saml2.saml import NameID
from saml2.server import Server from saml2.server import Server
from saml2.time_util import in_a_while from saml2.time_util import in_a_while
@@ -21,6 +21,12 @@ from py.test import raises
from fakeIDP import FakeIDP, unpack_form from fakeIDP import FakeIDP, unpack_form
AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
}
def for_me(condition, me ): def for_me(condition, me ):
for restriction in condition.audience_restriction: for restriction in condition.audience_restriction:
audience = restriction.audience audience = restriction.audience
@@ -59,7 +65,6 @@ REQ1 = { "1.2.14": """<?xml version='1.0' encoding='UTF-8'?>
"1.2.16":"""<?xml version='1.0' encoding='UTF-8'?> "1.2.16":"""<?xml version='1.0' encoding='UTF-8'?>
<ns0:AttributeQuery xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol" xmlns:ns1="urn:oasis:names:tc:SAML:2.0:assertion" Destination="https://idp.example.com/idp/" ID="id1" IssueInstant="%s" Version="2.0"><ns1:Issuer Format="urn:oasis:names:tc:SAML:2.0:nameid-format:entity">urn:mace:example.com:saml:roland:sp</ns1:Issuer><ns1:Subject><ns1:NameID Format="urn:oasis:names:tc:SAML:2.0:nameid-format:persistent">E8042FB4-4D5B-48C3-8E14-8EDD852790DD</ns1:NameID></ns1:Subject></ns0:AttributeQuery>"""} <ns0:AttributeQuery xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol" xmlns:ns1="urn:oasis:names:tc:SAML:2.0:assertion" Destination="https://idp.example.com/idp/" ID="id1" IssueInstant="%s" Version="2.0"><ns1:Issuer Format="urn:oasis:names:tc:SAML:2.0:nameid-format:entity">urn:mace:example.com:saml:roland:sp</ns1:Issuer><ns1:Subject><ns1:NameID Format="urn:oasis:names:tc:SAML:2.0:nameid-format:persistent">E8042FB4-4D5B-48C3-8E14-8EDD852790DD</ns1:NameID></ns1:Subject></ns0:AttributeQuery>"""}
AUTHN = (AUTHN_PASSWORD, "http://www.example.com/login")
nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
text="123456") text="123456")
@@ -223,8 +228,8 @@ class TestClient:
assert signed_info.reference[0].digest_value assert signed_info.reference[0].digest_value
print "------------------------------------------------" print "------------------------------------------------"
try: try:
assert self.client.sec.correctly_signed_authn_request(ar_str, assert self.client.sec.correctly_signed_authn_request(
self.client.config.xmlsec_binary, ar_str, self.client.config.xmlsec_binary,
self.client.config.metadata) self.client.config.metadata)
except Exception: # missing certificate except Exception: # missing certificate
self.client.sec.verify_signature(ar_str, node_name=class_name(ar)) self.client.sec.verify_signature(ar_str, node_name=class_name(ar))
@@ -299,7 +304,8 @@ class TestClient:
# Two persons in the cache # Two persons in the cache
assert len(self.client.users.subjects()) == 2 assert len(self.client.users.subjects()) == 2
issuers = [self.client.users.issuers_of_info(s) for s in self.client.users.subjects()] issuers = [self.client.users.issuers_of_info(s) for s in
self.client.users.subjects()]
# The information I have about the subjects comes from the same source # The information I have about the subjects comes from the same source
print issuers print issuers
assert issuers == [[IDP], [IDP]] assert issuers == [[IDP], [IDP]]
@@ -348,7 +354,8 @@ class TestClientWithDummy():
redirect_url = http_args["headers"][0][1] redirect_url = http_args["headers"][0][1]
_, _, _, _, qs, _ = urlparse.urlparse(redirect_url) _, _, _, _, qs, _ = urlparse.urlparse(redirect_url)
qs_dict = urlparse.parse_qs(qs) qs_dict = urlparse.parse_qs(qs)
req = self.server.parse_authn_request(qs_dict["SAMLRequest"][0], binding) req = self.server.parse_authn_request(qs_dict["SAMLRequest"][0],
binding)
resp_args = self.server.response_args(req.message, [response_binding]) resp_args = self.server.response_args(req.message, [response_binding])
assert resp_args["binding"] == response_binding assert resp_args["binding"] == response_binding
@@ -384,8 +391,8 @@ class TestClientWithDummy():
assert isinstance(response, LogoutResponse) assert isinstance(response, LogoutResponse)
def test_post_sso(self): def test_post_sso(self):
binding=BINDING_HTTP_POST binding = BINDING_HTTP_POST
response_binding=BINDING_HTTP_POST response_binding = BINDING_HTTP_POST
sid, http_args = self.client.prepare_for_authenticate( sid, http_args = self.client.prepare_for_authenticate(
"urn:mace:example.com:saml:roland:idp", relay_state="really", "urn:mace:example.com:saml:roland:idp", relay_state="really",
binding=binding, response_binding=response_binding) binding=binding, response_binding=response_binding)
@@ -414,7 +421,7 @@ class TestClientWithDummy():
ac = resp.assertion.authn_statement[0].authn_context ac = resp.assertion.authn_statement[0].authn_context
assert ac.authenticating_authority[0].text == \ assert ac.authenticating_authority[0].text == \
'http://www.example.com/login' 'http://www.example.com/login'
assert ac.authn_context_class_ref.text == AUTHN_PASSWORD assert ac.authn_context_class_ref.text == INTERNETPROTOCOLPASSWORD
# if __name__ == "__main__": # if __name__ == "__main__":

View File

@@ -2,7 +2,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import base64 import base64
from saml2.saml import NAMEID_FORMAT_TRANSIENT, AUTHN_PASSWORD from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.samlp import NameIDPolicy from saml2.samlp import NameIDPolicy
from s2repoze.plugins.sp import make_plugin from s2repoze.plugins.sp import make_plugin
from saml2.server import Server from saml2.server import Server
@@ -35,7 +36,11 @@ ENV1 = {'SERVER_SOFTWARE': 'CherryPy/3.1.2 WSGI Server',
trans_name_policy = NameIDPolicy(format=NAMEID_FORMAT_TRANSIENT, trans_name_policy = NameIDPolicy(format=NAMEID_FORMAT_TRANSIENT,
allow_create="true") allow_create="true")
AUTHN = (AUTHN_PASSWORD, "http://www.example.com/login") AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
}
class TestSP(): class TestSP():
def setup_class(self): def setup_class(self):

View File

@@ -1,4 +1,5 @@
from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT from saml2.saml import NameID
from saml2.saml import NAMEID_FORMAT_TRANSIENT
__author__ = 'rolandh' __author__ = 'rolandh'

View File

@@ -1,4 +1,4 @@
from saml2.saml import AUTHN_PASSWORD from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.httpbase import set_list2dict from saml2.httpbase import set_list2dict
from saml2.profile.ecp import RelayState from saml2.profile.ecp import RelayState
from saml2.profile.paos import Request from saml2.profile.paos import Request
@@ -19,6 +19,11 @@ from pathutils import dotname, full_path, xmlsec_path
__author__ = 'rolandh' __author__ = 'rolandh'
AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
}
def _eq(l1, l2): def _eq(l1, l2):
if len(l1) == len(l2): if len(l1) == len(l2):
return set(l1) == set(l2) return set(l1) == set(l2)
@@ -54,7 +59,7 @@ def test_complete_flow():
assert sp.can_handle_ecp_response(response) assert sp.can_handle_ecp_response(response)
id, message = sp.create_ecp_authn_request(IDP_ENTITY_ID, relay_state="XYZ") sid, message = sp.create_ecp_authn_request(IDP_ENTITY_ID, relay_state="XYZ")
# ------------ @Client ----------------------------- # ------------ @Client -----------------------------
@@ -86,17 +91,15 @@ def test_complete_flow():
[BINDING_PAOS], [BINDING_PAOS],
entity_id=sp_entity_id) entity_id=sp_entity_id)
resp = idp.create_ecp_authn_request_response(destination, resp = idp.create_ecp_authn_request_response(
{ destination, {"eduPersonEntitlement": "Short stop",
"eduPersonEntitlement": "Short stop",
"surName": "Jeter", "surName": "Jeter",
"givenName": "Derek", "givenName": "Derek",
"mail": "derek.jeter@nyy.mlb.com", "mail": "derek.jeter@nyy.mlb.com",
"title": "The man" "title": "The man"
}, },
req.message.id, destination, sp_entity_id, req.message.id, destination, sp_entity_id,
name_id=name_id, authn=(AUTHN_PASSWORD, name_id=name_id, authn=AUTHN)
"http://www.example.com/login"))
# ------------ @Client ----------------------------- # ------------ @Client -----------------------------
# The client got the response from the IDP repackage and send it to the SP # The client got the response from the IDP repackage and send it to the SP
@@ -134,7 +137,7 @@ def test_complete_flow():
# parse the response # parse the response
resp = sp.parse_authn_request_response(respdict["body"], None, {id: "/"}) resp = sp.parse_authn_request_response(respdict["body"], None, {sid: "/"})
print resp.response print resp.response

View File

@@ -2,10 +2,10 @@ import base64
from hashlib import sha1 from hashlib import sha1
from urlparse import urlparse from urlparse import urlparse
from urlparse import parse_qs from urlparse import parse_qs
from saml2.saml import AUTHN_PASSWORD
from saml2 import BINDING_HTTP_ARTIFACT from saml2 import BINDING_HTTP_ARTIFACT
from saml2 import BINDING_SOAP from saml2 import BINDING_SOAP
from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_POST
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.client import Saml2Client from saml2.client import Saml2Client
from saml2.entity import create_artifact from saml2.entity import create_artifact
@@ -17,6 +17,13 @@ __author__ = 'rolandh'
TAG1 = "name=\"SAMLRequest\" value=" TAG1 = "name=\"SAMLRequest\" value="
AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
}
def get_msg(hinfo, binding, response=False): def get_msg(hinfo, binding, response=False):
if binding == BINDING_SOAP: if binding == BINDING_SOAP:
msg = hinfo["data"] msg = hinfo["data"]
@@ -43,6 +50,7 @@ def get_msg(hinfo, binding, response=False):
return msg return msg
def test_create_artifact(): def test_create_artifact():
b64art = create_artifact("http://sp.example.com/saml.xml", b64art = create_artifact("http://sp.example.com/saml.xml",
"aabbccddeeffgghhiijj") "aabbccddeeffgghhiijj")
@@ -57,6 +65,7 @@ def test_create_artifact():
SP = 'urn:mace:example.com:saml:roland:sp' SP = 'urn:mace:example.com:saml:roland:sp'
def test_create_artifact_resolve(): def test_create_artifact_resolve():
b64art = create_artifact(SP, "aabbccddeeffgghhiijj", 1) b64art = create_artifact(SP, "aabbccddeeffgghhiijj", 1)
artifact = base64.b64decode(b64art) artifact = base64.b64decode(b64art)
@@ -88,8 +97,9 @@ def test_create_artifact_resolve():
assert ar.artifact.text == b64art assert ar.artifact.text == b64art
def test_artifact_flow(): def test_artifact_flow():
SP = 'urn:mace:example.com:saml:roland:sp' #SP = 'urn:mace:example.com:saml:roland:sp'
sp = Saml2Client(config_file="servera_conf") sp = Saml2Client(config_file="servera_conf")
idp = Server(config_file="idp_all_conf") idp = Server(config_file="idp_all_conf")
@@ -164,8 +174,7 @@ def test_artifact_flow():
"mail": "derek.jeter@nyy.mlb.com", "mail": "derek.jeter@nyy.mlb.com",
"title": "The man"}, "title": "The man"},
name_id=name_id, name_id=name_id,
authn=(AUTHN_PASSWORD, authn=AUTHN,
"http://www.example.com/login"),
**resp_args) **resp_args)
print response print response
@@ -207,7 +216,7 @@ def test_artifact_flow():
assert ar.artifact.text == artifact3 assert ar.artifact.text == artifact3
# The IDP retrieves the response from the database using the artifact as the key # The IDP retrieves the response from the database using the artifact as the key
oreq = idp.artifact[ar.artifact.text] #oreq = idp.artifact[ar.artifact.text]
binding, destination = idp.pick_binding("artifact_resolution_service", binding, destination = idp.pick_binding("artifact_resolution_service",
entity_id=sp.config.entityid) entity_id=sp.config.entityid)

View File

@@ -3,11 +3,12 @@ from saml2 import BINDING_SOAP, BINDING_HTTP_POST
__author__ = 'rolandh' __author__ = 'rolandh'
from saml2.samlp import RequestedAuthnContext, AuthnRequest, NameIDPolicy from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.authn_context import requested_authn_context
from saml2.samlp import AuthnRequest
from saml2.samlp import NameIDPolicy
from saml2.samlp import AuthnQuery from saml2.samlp import AuthnQuery
from saml2.client import Saml2Client from saml2.client import Saml2Client
from saml2.saml import AUTHN_PASSWORD
from saml2.saml import AuthnContextClassRef
from saml2.saml import Subject from saml2.saml import Subject
from saml2.saml import NameID from saml2.saml import NameID
from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.saml import NAMEID_FORMAT_TRANSIENT
@@ -16,6 +17,12 @@ from saml2.server import Server
TAG1 = "name=\"SAMLRequest\" value=" TAG1 = "name=\"SAMLRequest\" value="
AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
}
def get_msg(hinfo, binding): def get_msg(hinfo, binding):
if binding == BINDING_SOAP: if binding == BINDING_SOAP:
xmlstr = hinfo["data"] xmlstr = hinfo["data"]
@@ -41,9 +48,7 @@ def test_basic():
srvs = sp.metadata.authn_query_service(idp.config.entityid) srvs = sp.metadata.authn_query_service(idp.config.entityid)
destination = srvs[0]["location"] destination = srvs[0]["location"]
authn_context = [RequestedAuthnContext( authn_context = requested_authn_context(INTERNETPROTOCOLPASSWORD)
authn_context_class_ref=AuthnContextClassRef(
text=AUTHN_PASSWORD))]
subject = Subject(text="abc", subject = Subject(text="abc",
name_id=NameID(format=NAMEID_FORMAT_TRANSIENT)) name_id=NameID(format=NAMEID_FORMAT_TRANSIENT))
@@ -80,8 +85,7 @@ def test_flow():
destination, destination,
sp.config.entityid, sp.config.entityid,
name_id=name_id, name_id=name_id,
authn=(AUTHN_PASSWORD, authn=AUTHN)
"http://www.example.com/login"))
hinfo = idp.apply_binding(binding, "%s" % resp, destination, relay_state) hinfo = idp.apply_binding(binding, "%s" % resp, destination, relay_state)
@@ -94,9 +98,7 @@ def test_flow():
binding, destination = sp.pick_binding("authn_query_service", binding, destination = sp.pick_binding("authn_query_service",
entity_id=idp.config.entityid) entity_id=idp.config.entityid)
authn_context = [RequestedAuthnContext( authn_context = requested_authn_context(INTERNETPROTOCOLPASSWORD)
authn_context_class_ref=AuthnContextClassRef(
text=AUTHN_PASSWORD))]
subject = aresp.assertion.subject subject = aresp.assertion.subject

View File

@@ -1,8 +1,9 @@
from urlparse import parse_qs from urlparse import parse_qs
from urlparse import urlparse from urlparse import urlparse
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.samlp import AuthnRequest from saml2.samlp import AuthnRequest
from saml2.samlp import NameIDPolicy from saml2.samlp import NameIDPolicy
from saml2.saml import AUTHN_PASSWORD, Assertion from saml2.saml import Assertion
from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_URI from saml2 import BINDING_URI
@@ -14,6 +15,13 @@ __author__ = 'rolandh'
TAG1 = "name=\"SAMLRequest\" value=" TAG1 = "name=\"SAMLRequest\" value="
AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
}
def get_msg(hinfo, binding, response=False): def get_msg(hinfo, binding, response=False):
if binding == BINDING_SOAP: if binding == BINDING_SOAP:
msg = hinfo["data"] msg = hinfo["data"]
@@ -35,6 +43,7 @@ def get_msg(hinfo, binding, response=False):
return msg return msg
def test_basic_flow(): def test_basic_flow():
sp = Saml2Client(config_file="servera_conf") sp = Saml2Client(config_file="servera_conf")
idp = Server(config_file="idp_all_conf") idp = Server(config_file="idp_all_conf")
@@ -43,9 +52,9 @@ def test_basic_flow():
relay_state = "FOO" relay_state = "FOO"
# -- dummy request --- # -- dummy request ---
orig_req = AuthnRequest(issuer=sp._issuer(), orig_req = AuthnRequest(
name_id_policy=NameIDPolicy(allow_create="true", issuer=sp._issuer(), name_id_policy=NameIDPolicy(
format=NAMEID_FORMAT_TRANSIENT)) allow_create="true", format=NAMEID_FORMAT_TRANSIENT))
# == Create an AuthnRequest response # == Create an AuthnRequest response
@@ -62,8 +71,7 @@ def test_basic_flow():
destination, destination,
sp.config.entityid, sp.config.entityid,
name_id=name_id, name_id=name_id,
authn=(AUTHN_PASSWORD, authn=AUTHN)
"http://www.example.com/login"))
hinfo = idp.apply_binding(binding, "%s" % resp, destination, relay_state) hinfo = idp.apply_binding(binding, "%s" % resp, destination, relay_state)
@@ -72,7 +80,7 @@ def test_basic_flow():
xmlstr = get_msg(hinfo, binding) xmlstr = get_msg(hinfo, binding)
aresp = sp.parse_authn_request_response(xmlstr, binding, aresp = sp.parse_authn_request_response(xmlstr, binding,
{resp.in_response_to :"/"}) {resp.in_response_to: "/"})
# == Look for assertion X # == Look for assertion X

View File

@@ -1,6 +1,5 @@
from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_POST
from saml2.mdstore import MetadataStore from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.saml import AUTHN_PASSWORD
from saml2.client import Saml2Client from saml2.client import Saml2Client
from saml2.server import Server from saml2.server import Server
from saml2.mongo_store import EptidMDB from saml2.mongo_store import EptidMDB
@@ -8,6 +7,12 @@ from saml2.mongo_store import EptidMDB
__author__ = 'rolandh' __author__ = 'rolandh'
AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
}
def _eq(l1, l2): def _eq(l1, l2):
return set(l1) == set(l2) return set(l1) == set(l2)
@@ -34,8 +39,7 @@ def test_flow():
"mail": "derek.jeter@nyy.mlb.com", "mail": "derek.jeter@nyy.mlb.com",
"title": "The man"}, "title": "The man"},
userid="jeter", userid="jeter",
authn=(AUTHN_PASSWORD, authn=AUTHN,
"http://www.example.com/login"),
**rinfo) **rinfo)
# What's stored away is the assertion # What's stored away is the assertion