Rewrote conversion from AttributeStatement to dictionary. Can now handle undefined name format as well as different name format for different attributes in the Statement.
This commit is contained in:
@@ -21,7 +21,7 @@ from importlib import import_module
|
||||
|
||||
from saml2.s_utils import factory, do_ava
|
||||
from saml2 import saml, extension_elements_to_elements, SAMLError
|
||||
from saml2.saml import NAME_FORMAT_URI
|
||||
from saml2.saml import NAME_FORMAT_URI, NAME_FORMAT_UNSPECIFIED
|
||||
|
||||
|
||||
class UnknownNameFormat(SAMLError):
|
||||
@@ -121,14 +121,22 @@ def to_local(acs, statement):
|
||||
"""
|
||||
if not acs:
|
||||
acs = [AttributeConverter()]
|
||||
acsd = {"": acs}
|
||||
else:
|
||||
acsd = dict([(a.name_format, a) for a in acs])
|
||||
|
||||
ava = []
|
||||
for aconv in acs:
|
||||
ava = {}
|
||||
for attr in statement.attribute:
|
||||
try:
|
||||
ava = aconv.fro(statement)
|
||||
break
|
||||
except UnknownNameFormat:
|
||||
pass
|
||||
key, val = acsd[attr.name_format].ava_from(attr)
|
||||
except KeyError:
|
||||
key, val = acs[0].lcd_ava_from(attr)
|
||||
|
||||
try:
|
||||
ava[key].extend(val)
|
||||
except KeyError:
|
||||
ava[key] = val
|
||||
|
||||
return ava
|
||||
|
||||
|
||||
@@ -233,10 +241,39 @@ class AttributeConverter(object):
|
||||
if self._fro is None or self._to is None:
|
||||
self.adjust()
|
||||
|
||||
def lcd_ava_from(self, attribute):
|
||||
"""
|
||||
In nothing else works, this should
|
||||
|
||||
:param attribute:
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
name = attribute.friendly_name.strip()
|
||||
except AttributeError:
|
||||
name = attribute.name.strip()
|
||||
|
||||
values = []
|
||||
for value in attribute.attribute_value:
|
||||
if not value.text:
|
||||
values.append('')
|
||||
else:
|
||||
values.append(value.text.strip())
|
||||
|
||||
return name, values
|
||||
|
||||
def fail_safe_fro(self, statement):
|
||||
""" In case there is not formats defined """
|
||||
""" In case there is not formats defined or if the name format is
|
||||
undefined
|
||||
|
||||
:param statement: AttributeStatement instance
|
||||
:return: A dictionary with names and values
|
||||
"""
|
||||
result = {}
|
||||
for attribute in statement.attribute:
|
||||
if attribute.name_format and \
|
||||
attribute.name_format != NAME_FORMAT_UNSPECIFIED:
|
||||
continue
|
||||
try:
|
||||
name = attribute.friendly_name.strip()
|
||||
except AttributeError:
|
||||
@@ -281,8 +318,8 @@ class AttributeConverter(object):
|
||||
return attr, val
|
||||
|
||||
def fro(self, statement):
|
||||
""" Get the attributes and the attribute values
|
||||
|
||||
""" Get the attributes and the attribute values.
|
||||
|
||||
:param statement: The AttributeStatement.
|
||||
:return: A dictionary containing attributes and values
|
||||
"""
|
||||
@@ -294,15 +331,12 @@ class AttributeConverter(object):
|
||||
for attribute in statement.attribute:
|
||||
if attribute.name_format and self.name_format and \
|
||||
attribute.name_format != self.name_format:
|
||||
raise UnknownNameFormat
|
||||
continue
|
||||
|
||||
(key, val) = self.ava_from(attribute)
|
||||
result[key] = val
|
||||
|
||||
if not result:
|
||||
return self.fail_safe_fro(statement)
|
||||
else:
|
||||
return result
|
||||
return result
|
||||
|
||||
def to_format(self, attr):
|
||||
""" Creates an Attribute instance with name, name_format and
|
||||
|
||||
@@ -173,3 +173,13 @@ STATEMENT3 = """<?xml version='1.0' encoding='UTF-8'?>
|
||||
<ns0:AttributeValue>Roland</ns0:AttributeValue>
|
||||
</ns0:Attribute>
|
||||
</ns0:AttributeStatement>"""
|
||||
|
||||
STATEMENT4 = """<?xml version='1.0' encoding='UTF-8'?>
|
||||
<ns0:AttributeStatement xmlns:ns0="urn:oasis:names:tc:SAML:2.0:assertion" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||
<ns0:Attribute Name="user_id" NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:unspecified">
|
||||
<ns0:AttributeValue xsi:type="xs:string">bob</ns0:AttributeValue>
|
||||
</ns0:Attribute>
|
||||
<ns0:Attribute Name="NameID" NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:unspecified">
|
||||
<ns0:AttributeValue xsi:type="xs:string">bobsnameagain</ns0:AttributeValue>
|
||||
</ns0:Attribute>
|
||||
</ns0:AttributeStatement>"""
|
||||
|
||||
@@ -5,7 +5,7 @@ from saml2 import attribute_converter, saml
|
||||
from attribute_statement_data import *
|
||||
|
||||
from pathutils import full_path
|
||||
from saml2.attribute_converter import AttributeConverterNOOP
|
||||
from saml2.attribute_converter import AttributeConverterNOOP, to_local
|
||||
|
||||
|
||||
def _eq(l1,l2):
|
||||
@@ -15,10 +15,12 @@ BASIC_NF = 'urn:oasis:names:tc:SAML:2.0:attrname-format:basic'
|
||||
URI_NF = 'urn:oasis:names:tc:SAML:2.0:attrname-format:uri'
|
||||
SAML1 = 'urn:mace:shibboleth:1.0:attributeNamespace:uri'
|
||||
|
||||
|
||||
def test_default():
|
||||
acs = attribute_converter.ac_factory()
|
||||
assert acs
|
||||
|
||||
|
||||
class TestAC():
|
||||
def setup_class(self):
|
||||
self.acs = attribute_converter.ac_factory(full_path("attributemaps"))
|
||||
@@ -51,13 +53,10 @@ class TestAC():
|
||||
def test_ava_fro_2(self):
|
||||
ats = saml.attribute_statement_from_string(STATEMENT2)
|
||||
#print ats
|
||||
ava = None
|
||||
ava = {}
|
||||
for ac in self.acs:
|
||||
try:
|
||||
ava = ac.fro(ats)
|
||||
break
|
||||
except attribute_converter.UnknownNameFormat:
|
||||
pass
|
||||
ava.update(ac.fro(ats))
|
||||
|
||||
print ava.keys()
|
||||
assert _eq(ava.keys(),['uid', 'swissedupersonuniqueid',
|
||||
'swissedupersonhomeorganizationtype',
|
||||
@@ -167,7 +166,12 @@ class TestAC():
|
||||
oava = basic_ac.fro(attr_state)
|
||||
|
||||
assert _eq(ava.keys(), oava.keys())
|
||||
|
||||
|
||||
def test_unspecified_name_format(self):
|
||||
ats = saml.attribute_statement_from_string(STATEMENT4)
|
||||
ava = to_local(self.acs, ats)
|
||||
assert ava == {'user_id': ['bob'], 'NameID': ['bobsnameagain']}
|
||||
|
||||
|
||||
def test_noop_attribute_conversion():
|
||||
ava = {"urn:oid:2.5.4.4": "Roland", "urn:oid:2.5.4.42": "Hedberg" }
|
||||
@@ -187,4 +191,7 @@ def test_noop_attribute_conversion():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_noop_attribute_conversion()
|
||||
t = TestAC()
|
||||
t.setup_class()
|
||||
t.test_ava_fro_2()
|
||||
#test_noop_attribute_conversion()
|
||||
@@ -430,6 +430,6 @@ class TestClientWithDummy():
|
||||
# tc.test_response()
|
||||
|
||||
if __name__ == "__main__":
|
||||
tc = TestClientWithDummy()
|
||||
tc = TestClient()
|
||||
tc.setup_class()
|
||||
tc.test_post_sso()
|
||||
tc.test_sign_auth_request_0()
|
||||
|
||||
Reference in New Issue
Block a user