Removed unnecessary functions

This commit is contained in:
Roland Hedberg
2010-03-12 15:06:14 +01:00
parent 3e4b3f1e92
commit 631b2a8935
6 changed files with 98 additions and 161 deletions

View File

@@ -26,7 +26,7 @@ import time
import sys
from saml2.time_util import str_to_time, instant
from saml2.utils import sid, deflate_and_base64_encode, make_instance
from saml2.utils import name_id_factory, subject_factory, do_attributes
from saml2.utils import do_attributes, args2dict
from saml2 import samlp, saml, extension_element_to_element
from saml2 import VERSION, class_name
@@ -477,8 +477,8 @@ class Saml2Client(object):
"""
subject = subject_factory(
name_id = name_id_factory(subject_id, format=nameformat,
subject = args2dict(
name_id = args2dict(subject_id, format=nameformat,
sp_name_qualifier=sp_name_qualifier,
name_qualifier=name_qualifier),
)

View File

@@ -124,8 +124,12 @@ class Config(dict):
"attribute_maps"])
config["am_forward"] = forward
config["am_backward"] = backward
else:
config["am_forward"] = None
config["am_backward"] = None
if "sp" in config["service"]:
print config["service"]["sp"]
if "metadata" in config:
self.sp_check(config["service"]["sp"], config["metadata"])
else:

View File

@@ -23,14 +23,10 @@ import shelve
from saml2 import saml, samlp, VERSION
from saml2.utils import issuer_factory, conditions_factory
from saml2.utils import audience_restriction_factory
from saml2.utils import sid, decode_base64_and_inflate, make_instance
from saml2.utils import audience_factory, name_id_factory, assertion_factory
from saml2.utils import subject_factory, subject_confirmation_factory
from saml2.utils import response_factory, do_ava_statement
from saml2.utils import authn_statement_factory, MissingValue
from saml2.utils import subject_confirmation_data_factory, success_status_factory
from saml2.utils import MissingValue, args2dict
from saml2.utils import success_status_factory, assertion_factory
from saml2.utils import filter_attribute_value_assertions
from saml2.utils import OtherError, do_attribute_statement
from saml2.utils import VersionMismatch, UnknownPrincipal, UnsupportedBinding
@@ -70,7 +66,7 @@ class Server(object):
self.id_map = None
def issuer(self):
return issuer_factory( self.conf["entityid"],
return args2dict( self.conf["entityid"],
format=saml.NAMEID_FORMAT_ENTITY)
def persistent_id(self, entity_id, subject_id):
@@ -261,15 +257,15 @@ class Server(object):
return self.filter_ava(identity, spid, required, optional)
def _conditions(self, sp_entity_id):
return conditions_factory(
return args2dict(
not_before=instant(),
# How long might depend on who's getting it
not_on_or_after=self._not_on_or_after(sp_entity_id),
audience_restriction=audience_restriction_factory(
audience=audience_factory(sp_entity_id)))
audience_restriction=args2dict(
audience=args2dict(sp_entity_id)))
def _authn_statement(self):
return authn_statement_factory(authn_instant = instant(),
return args2dict(authn_instant = instant(),
session_index = sid()),
def do_response(self, consumer_url, in_response_to,
@@ -304,8 +300,7 @@ class Server(object):
# temporary identifier or ??
if not name_id:
name_id = name_id_factory(sid(),
format=saml.NAMEID_FORMAT_TRANSIENT)
name_id = args2dict(sid(),format=saml.NAMEID_FORMAT_TRANSIENT)
# start using now and for a hour
conds = self._conditions(sp_entity_id)
@@ -314,13 +309,12 @@ class Server(object):
attribute_statement = attr_statement,
authn_statement = self._authn_statement(),
conditions = conds,
subject=subject_factory(
subject=args2dict(
name_id=name_id,
method=saml.SUBJECT_CONFIRMATION_METHOD_BEARER,
subject_confirmation=subject_confirmation_factory(
subject_confirmation=args2dict(
subject_confirmation_data = \
subject_confirmation_data_factory(
in_response_to=in_response_to))),
args2dict(in_response_to=in_response_to))),
),
# Store which assertion that has been sent to which SP about which
@@ -359,16 +353,16 @@ class Server(object):
# temporary identifier or ??
if not name_id:
name_id = name_id_factory(sid(), format=saml.NAMEID_FORMAT_TRANSIENT)
name_id = args2dict(sid(), format=saml.NAMEID_FORMAT_TRANSIENT)
conds = self._conditions(sp_entity_id)
assertion = assertion_factory(
subject = subject_factory(
subject = args2dict(
name_id = name_id,
method = saml.SUBJECT_CONFIRMATION_METHOD_BEARER,
subject_confirmation = subject_confirmation_factory(
subject_confirmation = args2dict(
subject_confirmation_data = \
subject_confirmation_data_factory(
args2dict(
in_response_to = in_response_to,
not_on_or_after = self._not_on_or_after(sp_entity_id),
address = ip_address,
@@ -424,7 +418,7 @@ class Server(object):
userid)
self.log.info("=> %s" % subj_id)
name_id = name_id_factory(subj_id,
name_id = args2dict(subj_id,
format=saml.NAMEID_FORMAT_PERSISTENT,
sp_name_qualifier=name_id_policy.sp_name_qualifier)

View File

@@ -319,6 +319,12 @@ def _properties(klass):
props.extend(klass.c_attributes.values())
return props
def args2dict(text=None, **kwargs):
spec = kwargs.copy()
if text:
spec["text"] = text
return spec
def _klassdict(klass, text=None, **kwargs):
""" Does not remove attributes with no values """
spec = {}
@@ -333,77 +339,25 @@ def _klassdict(klass, text=None, **kwargs):
return spec
def status_from_exception_factory(exception):
return _klassdict(samlp.Status,
status_code=_klassdict(samlp.StatusCode,
msg = exception.args[0]
return args2dict(
status_message=msg,
status_code=args2dict(
value=samlp.STATUS_RESPONDER,
status_code=_klassdict(samlp.StatusCode,
value=EXCEPTION2STATUS[exception.__class__])
status_code=args2dict(value=EXCEPTION2STATUS[exception.__class__])
),
status_message=exception.args[0],
)
def name_id_factory(text="", **kwargs):
return _klassdict(saml.NameID, text, **kwargs)
def status_message_factory(text="", **kwargs):
return _klassdict(samlp.StatusMessage, text, **kwargs)
def status_code_factory(text="", **kwargs):
return _klassdict(samlp.StatusCode, text, **kwargs)
def status_factory(text="", **kwargs):
return _klassdict(samlp.Status, text, **kwargs)
def success_status_factory():
return status_factory(status_code=status_code_factory(
value=samlp.STATUS_SUCCESS))
def audience_factory(text="", **kwargs):
return _klassdict(saml.Audience, text, **kwargs)
def audience_restriction_factory(text="", **kwargs):
return _klassdict(saml.AudienceRestriction, text, **kwargs)
def conditions_factory(text="", **kwargs):
return _klassdict(saml.Conditions, text, **kwargs)
def attribute_factory(text="", **kwargs):
return _klassdict(saml.Attribute, text, **kwargs)
def attribute_value_factory(text="", **kwargs):
return _klassdict(saml.AttributeValue, text, **kwargs)
def attribute_statement_factory(text="", **kwargs):
return _klassdict(saml.AttributeStatement, text, **kwargs)
def subject_confirmation_data_factory(text="", **kwargs):
return _klassdict(saml.SubjectConfirmationData, text, **kwargs)
def subject_confirmation_factory(text="", **kwargs):
return _klassdict(saml.SubjectConfirmation, text, **kwargs)
def subject_factory(text="", **kwargs):
return _klassdict(saml.Subject, text, **kwargs)
def authn_context_class_ref_factory(text="", **kwargs):
return _klassdict(saml.AuthnContextClassRef, text, **kwargs)
def authn_context_factory(text="", **kwargs):
return _klassdict(saml.AuthnContext, text, **kwargs)
def authn_statement_factory(text="", **kwargs):
return _klassdict(saml.AuthnStatement, text, **kwargs)
def name_id_policy_factory(text="", **kwargs):
return _klassdict(samlp.NameIDPolicy, text, **kwargs)
def success_status_factory():
return args2dict(status_code=args2dict(value=samlp.STATUS_SUCCESS))
def assertion_factory(text="", **kwargs):
kwargs.update({
"version": VERSION,
"id" : sid(),
"issue_instant" : instant(),
})
return _klassdict(saml.Assertion, text, **kwargs)
return args2dict(text, **kwargs)
def response_factory(signature=False, encrypt=False, **kwargs):
kwargs.update({
@@ -415,13 +369,13 @@ def response_factory(signature=False, encrypt=False, **kwargs):
kwargs["signature"] = sigver.pre_signature_part(kwargs["id"])
if encrypt:
pass
return _klassdict(samlp.Response, **kwargs)
return args2dict(**kwargs)
def _attrval(val):
if isinstance(val, basestring):
attrval = [attribute_value_factory(val)]
attrval = [args2dict(val)]
elif isinstance(val, list):
attrval = [attribute_value_factory(v) for v in val]
attrval = [args2dict(v) for v in val]
elif val == None:
attrval = None
else:
@@ -444,7 +398,7 @@ def ava_to_attributes(ava, bmap=None):
(dic["name"], dic["name_format"]) = bmap[friendly_name]
except KeyError:
pass
attrs.append(attribute_factory(**dic))
attrs.append(args2dict(**dic))
return attrs
def do_ava_statement(identity, bmap):
@@ -452,8 +406,7 @@ def do_ava_statement(identity, bmap):
:param identity: A dictionary with fiendly names as keys
:return:
"""
return attribute_statement_factory(
attribute=ava_to_attributes(identity, bmap))
return args2dict(attribute=ava_to_attributes(identity, bmap))
def do_attributes(identity):
attrs = []
@@ -480,7 +433,7 @@ def do_attributes(identity):
dic["name_format"] = nformat
if friendly:
dic["friendly_name"] = friendly
attrs.append(attribute_factory(**dic))
attrs.append(args2dict(**dic))
return attrs
def do_attribute_statement(identity):
@@ -488,8 +441,5 @@ def do_attribute_statement(identity):
:param identity: A dictionary with fiendly names as keys
:return:
"""
return attribute_statement_factory(attribute=do_attributes(identity))
def issuer_factory(text, **kwargs):
return _klassdict(saml.Issuer, text, **kwargs)
return args2dict(attribute=do_attributes(identity))

View File

@@ -31,13 +31,13 @@ class TestServer1():
def test_assertion(self):
tmp = utils.assertion_factory(
subject= utils.subject_factory("_aaa",
subject= utils.args2dict("_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
attribute_statement = utils.attribute_statement_factory(
attribute_statement = utils.args2dict(
attribute=[
utils.attribute_factory(attribute_value="Derek",
utils.args2dict(attribute_value="Derek",
friendly_name="givenName"),
utils.attribute_factory(attribute_value="Jeter",
utils.args2dict(attribute_value="Jeter",
friendly_name="surName"),
]),
issuer=self.server.issuer(),
@@ -75,12 +75,12 @@ class TestServer1():
destination="https:#www.example.com",
status=utils.success_status_factory(),
assertion=utils.assertion_factory(
subject = utils.subject_factory("_aaa",
subject = utils.args2dict("_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
attribute_statement = utils.attribute_statement_factory([
utils.attribute_factory(attribute_value="Derek",
attribute_statement = utils.args2dict([
utils.args2dict(attribute_value="Derek",
friendly_name="givenName"),
utils.attribute_factory(attribute_value="Jeter",
utils.args2dict(attribute_value="Jeter",
friendly_name="surName"),
]),
issuer=self.server.issuer(),
@@ -341,7 +341,7 @@ class TestServer1():
"1", "http://local:8087/",
"urn:mace:example.com:saml:roland:sp",
utils.make_instance(samlp.NameIDPolicy,
utils.name_id_policy_factory(
utils.args2dict(
format=saml.NAMEID_FORMAT_TRANSIENT,
allow_create="true")),
"foba0001@example.com")

View File

@@ -44,9 +44,8 @@ def test_inflate_then_deflate():
assert bis == str
def test_status_success():
stat = utils.status_factory(
status_code=utils.status_code_factory(
value=samlp.STATUS_SUCCESS))
stat = utils.args2dict(
status_code=utils.args2dict(value=samlp.STATUS_SUCCESS))
status = make_instance( samlp.Status, stat)
status_text = "%s" % status
assert status_text == SUCCESS_STATUS
@@ -60,12 +59,11 @@ def test_success_status():
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_error_status():
stat = utils.status_factory(
status_message=utils.status_message_factory(
"Error resolving principal"),
status_code=utils.status_code_factory(
stat = utils.args2dict(
status_message=utils.args2dict("Error resolving principal"),
status_code=utils.args2dict(
value=samlp.STATUS_RESPONDER,
status_code=utils.status_code_factory(
status_code=utils.args2dict(
value=samlp.STATUS_UNKNOWN_PRINCIPAL)))
status_text = "%s" % make_instance( samlp.Status, stat )
@@ -75,8 +73,9 @@ def test_error_status():
def test_status_from_exception():
e = utils.UnknownPrincipal("Error resolving principal")
stat = utils.status_from_exception_factory(e)
print stat
status_text = "%s" % make_instance( samlp.Status, stat )
print status_text
assert status_text == ERROR_STATUS
def test_make_vals_str():
@@ -144,18 +143,18 @@ def test_attribute_statement():
def test_audience():
aud_restr = make_instance( saml.AudienceRestriction,
utils.audience_restriction_factory(
audience=utils.audience_factory("urn:foo:bar")))
utils.args2dict(
audience=utils.args2dict("urn:foo:bar")))
assert aud_restr.keyswv() == ["audience"]
assert aud_restr.audience.text == "urn:foo:bar"
def test_conditions():
conds_dict = utils.conditions_factory(
conds_dict = utils.args2dict(
not_before="2009-10-30T07:58:10.852Z",
not_on_or_after="2009-10-30T08:03:10.852Z",
audience_restriction=utils.audience_restriction_factory(
audience=utils.audience_factory("urn:foo:bar")))
audience_restriction=utils.args2dict(
audience=utils.args2dict("urn:foo:bar")))
conditions = make_instance(saml.Conditions, conds_dict)
assert _eq(conditions.keyswv(), ["not_before", "not_on_or_after",
@@ -167,7 +166,7 @@ def test_conditions():
def test_value_1():
#FriendlyName="givenName" Name="urn:oid:2.5.4.42"
# NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
adict = utils.attribute_factory(name="urn:oid:2.5.4.42",
adict = utils.args2dict(name="urn:oid:2.5.4.42",
name_format=NAME_FORMAT_URI)
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["name","name_format"])
@@ -175,7 +174,7 @@ def test_value_1():
assert attribute.name_format == saml.NAME_FORMAT_URI
def test_value_2():
adict = utils.attribute_factory(name="urn:oid:2.5.4.42",
adict = utils.args2dict(name="urn:oid:2.5.4.42",
name_format=NAME_FORMAT_URI,
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
@@ -185,7 +184,7 @@ def test_value_2():
assert attribute.friendly_name == "givenName"
def test_value_3():
adict = utils.attribute_factory(attribute_value="Derek",
adict = utils.args2dict(attribute_value="Derek",
name="urn:oid:2.5.4.42",
name_format=NAME_FORMAT_URI,
friendly_name="givenName")
@@ -199,7 +198,7 @@ def test_value_3():
assert attribute.attribute_value[0].text == "Derek"
def test_value_4():
adict = utils.attribute_factory(attribute_value="Derek",
adict = utils.args2dict(attribute_value="Derek",
friendly_name="givenName")
attribute = make_instance(saml.Attribute, adict)
assert _eq(attribute.keyswv(),["friendly_name", "attribute_value"])
@@ -261,7 +260,7 @@ def test_do_attribute_statement_multi():
assert attribute.friendly_name == "eduPersonEntitlement"
def test_subject():
adict = utils.subject_factory("_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT)
adict = utils.args2dict("_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT)
subject = make_instance(saml.Subject, adict)
assert _eq(subject.keyswv(),["text", "name_id"])
assert subject.text == "_aaa"
@@ -589,50 +588,40 @@ def test_nameformat_email():
assert utils.valid_email("a@b.se")
assert utils.valid_email("john@doe@johndoe.com") == False
def test_name_id_factory():
n = utils.name_id_factory("foo", name_qualifier="urn:mace:example.com:nq")
def test_args2dict():
n = utils.args2dict("foo", name_qualifier="urn:mace:example.com:nq")
assert _eq(n.keys(), ["text","name_qualifier"])
assert n["text"] == "foo"
assert n["name_qualifier"] == "urn:mace:example.com:nq"
def test_audience_factory():
a = utils.audience_factory("http://example.com/sp", foo="bar")
assert _eq(a.keys(), ["text"])
assert a["text"] == "http://example.com/sp"
def test_attribute_value_factory():
a = utils.attribute_value_factory("member@example.com")
assert _eq(a.keys(), ["text"])
assert a["text"] == "member@example.com"
def test_attribute_factory():
a = utils.attribute_factory(friendly_name="eduPersonScopedAffiliation",
def test_attribute():
a = utils.args2dict(friendly_name="eduPersonScopedAffiliation",
name="urn:oid:1.3.6.1.4.1.5923.1.1.1.9",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri")
assert _eq(a.keys(), ["friendly_name","name", "name_format"])
a = utils.attribute_factory(friendly_name="eduPersonScopedAffiliation",
a = utils.args2dict(friendly_name="eduPersonScopedAffiliation",
name="urn:oid:1.3.6.1.4.1.5923.1.1.1.9",
name_format="urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
attribute_value=utils.attribute_value_factory("member@example.com"))
attribute_value=utils.args2dict("member@example.com"))
assert _eq(a.keys(), ["friendly_name","name", "name_format",
"attribute_value"])
def test_attribute_statement_factory():
a = utils.attribute_statement_factory(
def test_attribute_statement():
a = utils.args2dict(
attribute=[
utils.attribute_factory(attribute_value="Derek",
utils.args2dict(attribute_value="Derek",
friendly_name="givenName"),
utils.attribute_factory(attribute_value="Jeter",
utils.args2dict(attribute_value="Jeter",
friendly_name="surName"),
])
assert a.keys() == ["attribute"]
assert len(a["attribute"]) == 2
def test_subject_confirmation_data_factory():
s = utils.subject_confirmation_data_factory(
def test_subject_confirmation_data():
s = utils.args2dict(
in_response_to="_12345678",
not_before="2010-02-11T07:30:00Z",
not_on_or_after="2010-02-11T07:35:00Z",
@@ -642,12 +631,12 @@ def test_subject_confirmation_data_factory():
assert _eq(s.keys(),["in_response_to","not_before","not_on_or_after",
"recipient", "address"])
def test_subject_confirmation_factory():
s = utils.subject_confirmation_factory(
def test_subject_confirmation():
s = utils.args2dict(
method="urn:oasis:names:tc:SAML:2.0:profiles:SSO:browser",
base_id="1234",
name_id="abcd",
subject_confirmation_data=utils.subject_confirmation_data_factory(
subject_confirmation_data=utils.args2dict(
in_response_to="_1234567890",
recipient="http://example.com/sp/"))
@@ -656,24 +645,24 @@ def test_subject_confirmation_factory():
assert s["method"] == "urn:oasis:names:tc:SAML:2.0:profiles:SSO:browser"
def test_authn_context_class_ref_factory():
a = utils.authn_context_class_ref_factory(
def test_authn_context_class_ref():
a = utils.args2dict(
"urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified")
assert a.keys() == ["text"]
assert a["text"] == "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified"
def test_authn_context_factory():
accr = utils.authn_context_class_ref_factory(
def test_authn_context():
accr = utils.args2dict(
"urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified")
a = utils.authn_context_factory(authn_context_class_ref=accr)
a = utils.args2dict(authn_context_class_ref=accr)
assert a.keys() == ["authn_context_class_ref"]
def test_authn_statement_factory():
accr = utils.authn_context_class_ref_factory(
def test_authn_statement():
accr = utils.args2dict(
"urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified")
ac = utils.authn_context_factory(authn_context_class_ref=accr)
a = utils.authn_statement_factory(
ac = utils.args2dict(authn_context_class_ref=accr)
a = utils.args2dict(
authn_instant="2010-03-10T12:33:00Z",
session_index="_12345",
session_not_on_or_after="2010-03-11T12:00:00Z",