Language correction.

Deal with case where people want to JSON serialize session information.
Carry over more parameters in create_attribute_response.
This commit is contained in:
Roland Hedberg 2016-02-11 11:08:04 +01:00
parent 1220e8580c
commit 0515de9fa8
7 changed files with 56 additions and 44 deletions

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python
import shelve
import six
from saml2.ident import code, decode
from saml2 import time_util, SAMLError
import logging
@ -98,6 +99,8 @@ class Cache(object):
if check_not_on_or_after and time_util.after(timestamp):
raise ToOld("past %s" % str(timestamp))
if 'name_id' in info and isinstance(info['name_id'], six.string_types):
info['name_id'] = decode(info['name_id'])
return info or None
def set(self, name_id, entity_id, info, not_on_or_after=0):

View File

@ -29,8 +29,8 @@ class Unknown(SAMLError):
def code(item):
"""
Turn a NameID class instance into a quoted string of comma separated
attribute,value pairs. The attribute name is replaced with a digits.
Depends on knowledge on the specific order of the attributes for that
attribute,value pairs. The attribute names are replaced with digits.
Depends on knowledge on the specific order of the attributes for the
class that is used.
:param item: The class instance

View File

@ -1,6 +1,7 @@
import logging
from saml2.cache import Cache
import six
from saml2.cache import Cache
from saml2.ident import code
logger = logging.getLogger(__name__)
@ -20,6 +21,8 @@ class Population(object):
this function will overwrite that information"""
name_id = session_info["name_id"]
# make friendly to (JSON) serialization
session_info['name_id'] = code(name_id)
issuer = session_info["issuer"]
del session_info["issuer"]
self.cache.set(name_id, issuer, session_info,

View File

@ -480,7 +480,7 @@ class Server(Entity):
pass
to_sign = []
args = {}
if identity:
_issuer = self._issuer(issuer)
ast = Assertion(identity)
@ -505,12 +505,16 @@ class Server(Entity):
digest_alg=digest_alg)
# Just the assertion or the response and the assertion ?
to_sign = [(class_name(assertion), assertion.id)]
kwargs['sign_assertion'] = True
args["assertion"] = assertion
kwargs["assertion"] = assertion
if sp_entity_id:
kwargs['sp_entity_id'] = sp_entity_id
return self._response(in_response_to, destination, status, issuer,
sign_response, to_sign, sign_alg=sign_alg,
digest_alg=digest_alg, **args)
digest_alg=digest_alg, **kwargs)
# ------------------------------------------------------------------------

View File

@ -7,50 +7,52 @@ from saml2.cache import Cache
from saml2.time_util import in_a_while, str_to_time
from saml2.ident import code
SESSION_INFO_PATTERN = {"ava":{}, "came from":"", "not_on_or_after":0,
"issuer":"", "session_id":-1}
SESSION_INFO_PATTERN = {"ava": {}, "came from": "", "not_on_or_after": 0,
"issuer": "", "session_id": -1}
def _eq(l1,l2):
def _eq(l1, l2):
return set(l1) == set(l2)
def nid_eq(l1, l2):
return _eq([code(c) for c in l1], [code(c) for c in l2])
nid = [
NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, text="1234"),
NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, text="9876"),
NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, text="1000")]
class TestClass:
def setup_class(self):
self.cache = Cache()
def test_set(self):
not_on_or_after = str_to_time(in_a_while(days=1))
session_info = SESSION_INFO_PATTERN.copy()
session_info["ava"] = {"givenName":["Derek"]}
session_info["ava"] = {"givenName": ["Derek"]}
self.cache.set(nid[0], "abcd", session_info, not_on_or_after)
(ava, inactive) = self.cache.get_identity(nid[0])
assert inactive == []
assert list(ava.keys()) == ["givenName"]
assert ava["givenName"] == ["Derek"]
def test_add_ava_info(self):
def test_add_ava_info(self):
not_on_or_after = str_to_time(in_a_while(days=1))
session_info = SESSION_INFO_PATTERN.copy()
session_info["ava"] = {"surName":["Jeter"]}
session_info["ava"] = {"surName": ["Jeter"]}
self.cache.set(nid[0], "bcde", session_info, not_on_or_after)
(ava, inactive) = self.cache.get_identity(nid[0])
assert inactive == []
assert _eq(ava.keys(), ["givenName","surName"])
assert _eq(ava.keys(), ["givenName", "surName"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
def test_from_one_target_source(self):
def test_from_one_target_source(self):
session_info = self.cache.get(nid[0], "bcde")
ava = session_info["ava"]
assert _eq(ava.keys(), ["surName"])
@ -59,66 +61,65 @@ class TestClass:
ava = session_info["ava"]
assert _eq(ava.keys(), ["givenName"])
assert ava["givenName"] == ["Derek"]
def test_entities(self):
assert _eq(self.cache.entities(nid[0]), ["abcd", "bcde"])
py.test.raises(Exception, "self.cache.entities('6666')")
def test_remove_info(self):
self.cache.reset(nid[0], "bcde")
assert self.cache.active(nid[0], "bcde") == False
assert self.cache.active(nid[0], "abcd")
(ava, inactive) = self.cache.get_identity(nid[0])
assert inactive == ['bcde']
assert _eq(ava.keys(), ["givenName"])
assert ava["givenName"] == ["Derek"]
def test_active(self):
assert self.cache.active(nid[0], "bcde") == False
assert self.cache.active(nid[0], "abcd")
def test_subjects(self):
assert nid_eq(self.cache.subjects(), [nid[0]])
def test_second_subject(self):
not_on_or_after = str_to_time(in_a_while(days=1))
session_info = SESSION_INFO_PATTERN.copy()
session_info["ava"] = {"givenName":["Ichiro"],
"surName":["Suzuki"]}
session_info["ava"] = {"givenName": ["Ichiro"],
"surName": ["Suzuki"]}
self.cache.set(nid[1], "abcd", session_info,
not_on_or_after)
not_on_or_after)
(ava, inactive) = self.cache.get_identity(nid[1])
assert inactive == []
assert _eq(ava.keys(), ["givenName","surName"])
assert _eq(ava.keys(), ["givenName", "surName"])
assert ava["givenName"] == ["Ichiro"]
assert ava["surName"] == ["Suzuki"]
assert nid_eq(self.cache.subjects(), [nid[0], nid[1]])
def test_receivers(self):
assert _eq(self.cache.receivers(nid[1]), ["abcd"])
not_on_or_after = str_to_time(in_a_while(days=1))
session_info = SESSION_INFO_PATTERN.copy()
session_info["ava"] = {"givenName":["Ichiro"],
"surName":["Suzuki"]}
session_info["ava"] = {"givenName": ["Ichiro"],
"surName": ["Suzuki"]}
self.cache.set(nid[1], "bcde", session_info,
not_on_or_after)
not_on_or_after)
assert _eq(self.cache.receivers(nid[1]), ["abcd", "bcde"])
assert nid_eq(self.cache.subjects(), nid[0:2])
def test_timeout(self):
not_on_or_after = str_to_time(in_a_while(seconds=1))
session_info = SESSION_INFO_PATTERN.copy()
session_info["ava"] = {"givenName":["Alex"],
"surName":["Rodriguez"]}
session_info["ava"] = {"givenName": ["Alex"],
"surName": ["Rodriguez"]}
self.cache.set(nid[2], "bcde", session_info,
not_on_or_after)
not_on_or_after)
time.sleep(2)
(ava, inactive) = self.cache.get_identity(nid[2])
assert inactive == ["bcde"]
assert ava == {}

View File

@ -10,8 +10,9 @@ IDP_OTHER = "urn:mace:example.com:saml:other:idp"
nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
text="123456")
nida = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
text="abcdef")
text="abcdef")
cnid = code(nid)
cnida = code(nida)
@ -57,7 +58,7 @@ class TestPopulationMemoryBased():
info = self.population.get_info_from(nid, IDP_ONE)
assert sorted(list(info.keys())) == sorted(["not_on_or_after",
"name_id", "ava"])
assert info["name_id"] == nid
assert info["name_id"] == nid
assert info["ava"] == {'mail': 'anders.andersson@example.com',
'givenName': 'Anders',
'surName': 'Andersson'}

View File

@ -1204,7 +1204,7 @@ class TestServer2():
print(aa_policy.__dict__)
response = self.server.create_attribute_response(
IDENTITY.copy(), "aaa", "http://example.com/sp/",
"urn:mace:example.com:sp:1")
"http://www.example.com/roland/sp")
assert response is not None
assert response.destination == "http://example.com/sp/"