Allow setting namespace prefixes.
This commit is contained in:
parent
3bc41c2d71
commit
7b025c619f
@ -541,6 +541,23 @@ class SamlBase(ExtensionContainer):
|
||||
self._add_members_to_element_tree(new_tree)
|
||||
return new_tree
|
||||
|
||||
def register_prefix(self, nspair):
|
||||
"""
|
||||
Register with ElementTree a set of namespaces
|
||||
|
||||
:param nspair: A dictionary of prefixes and uris to use when
|
||||
constructing the text representation.
|
||||
:return:
|
||||
"""
|
||||
for prefix, uri in nspair.items():
|
||||
try:
|
||||
ElementTree.register_namespace(prefix, uri)
|
||||
except AttributeError:
|
||||
# Backwards compatibility with ET < 1.3
|
||||
ElementTree._namespace_map[uri] = prefix
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
def to_string(self, nspair=None):
|
||||
"""Converts the Saml object to a string containing XML.
|
||||
|
||||
@ -552,14 +569,7 @@ class SamlBase(ExtensionContainer):
|
||||
nspair = self.c_ns_prefix
|
||||
|
||||
if nspair:
|
||||
for prefix, uri in nspair.items():
|
||||
try:
|
||||
ElementTree.register_namespace(prefix, uri)
|
||||
except AttributeError:
|
||||
# Backwards compatibility with ET < 1.3
|
||||
ElementTree._namespace_map[uri] = prefix
|
||||
except ValueError:
|
||||
pass
|
||||
self.register_prefix(nspair)
|
||||
|
||||
return ElementTree.tostring(self._to_element_tree(), encoding="UTF-8")
|
||||
|
||||
|
@ -342,7 +342,7 @@ class Saml2Client(Base):
|
||||
attribute=None, sp_name_qualifier=None,
|
||||
name_qualifier=None, nameid_format=None,
|
||||
real_id=None, consent=None, extensions=None,
|
||||
sign=False, binding=BINDING_SOAP):
|
||||
sign=False, binding=BINDING_SOAP, nsprefix=None):
|
||||
""" Does a attribute request to an attribute authority, this is
|
||||
by default done over SOAP.
|
||||
|
||||
@ -359,6 +359,8 @@ class Saml2Client(Base):
|
||||
:param real_id: The identifier which is the key to this entity in the
|
||||
identity database
|
||||
:param binding: Which binding to use
|
||||
:param nsprefix: Namespace prefixes preferred before those automatically
|
||||
produced.
|
||||
:return: The attributes returned if BINDING_SOAP was used.
|
||||
HTTP args if BINDING_HTT_POST was used.
|
||||
"""
|
||||
@ -393,7 +395,7 @@ class Saml2Client(Base):
|
||||
mid = sid()
|
||||
query = self.create_attribute_query(destination, subject_id,
|
||||
attribute, mid, consent,
|
||||
extensions, sign)
|
||||
extensions, sign, nsprefix)
|
||||
self.state[query.id] = {"entity_id": entityid,
|
||||
"operation": "AttributeQuery",
|
||||
"subject_id": subject_id,
|
||||
|
@ -306,6 +306,11 @@ class Base(Entity):
|
||||
pass
|
||||
args["name_id_policy"] = name_id_policy
|
||||
|
||||
try:
|
||||
nsprefix = kwargs["nsprefix"]
|
||||
except KeyError:
|
||||
nsprefix = None
|
||||
|
||||
if kwargs:
|
||||
_args, extensions = self._filter_args(AuthnRequest(), extensions,
|
||||
**kwargs)
|
||||
@ -328,11 +333,11 @@ class Base(Entity):
|
||||
return self._message(AuthnRequest, destination, message_id,
|
||||
consent, extensions, sign, sign_prepare,
|
||||
protocol_binding=binding,
|
||||
scoping=scoping, **args)
|
||||
scoping=scoping, nsprefix=nsprefix, **args)
|
||||
return self._message(AuthnRequest, destination, message_id, consent,
|
||||
extensions, sign, sign_prepare,
|
||||
protocol_binding=binding,
|
||||
scoping=scoping, **args)
|
||||
scoping=scoping, nsprefix=nsprefix, **args)
|
||||
|
||||
def create_attribute_query(self, destination, name_id=None,
|
||||
attribute=None, message_id=0, consent=None,
|
||||
@ -386,9 +391,14 @@ class Base(Entity):
|
||||
if attribute:
|
||||
attribute = do_attributes(attribute)
|
||||
|
||||
try:
|
||||
nsprefix = kwargs["nsprefix"]
|
||||
except KeyError:
|
||||
nsprefix = None
|
||||
|
||||
return self._message(AttributeQuery, destination, message_id, consent,
|
||||
extensions, sign, sign_prepare, subject=subject,
|
||||
attribute=attribute)
|
||||
attribute=attribute, nsprefix=nsprefix)
|
||||
|
||||
# MUST use SOAP for
|
||||
# AssertionIDRequest, SubjectQuery,
|
||||
@ -422,7 +432,7 @@ class Base(Entity):
|
||||
subject=None, message_id=0,
|
||||
consent=None,
|
||||
extensions=None,
|
||||
sign=False):
|
||||
sign=False, nsprefix=None):
|
||||
""" Makes an authz decision query based on a previously received
|
||||
Assertion.
|
||||
|
||||
@ -449,7 +459,7 @@ class Base(Entity):
|
||||
return self.create_authz_decision_query(
|
||||
destination, _action, saml.Evidence(assertion=assertion),
|
||||
resource, subject, message_id=message_id, consent=consent,
|
||||
extensions=extensions, sign=sign)
|
||||
extensions=extensions, sign=sign, nsprefix=nsprefix)
|
||||
|
||||
@staticmethod
|
||||
def create_assertion_id_request(assertion_id_refs, **kwargs):
|
||||
@ -466,7 +476,7 @@ class Base(Entity):
|
||||
|
||||
def create_authn_query(self, subject, destination=None, authn_context=None,
|
||||
session_index="", message_id=0, consent=None,
|
||||
extensions=None, sign=False):
|
||||
extensions=None, sign=False, nsprefix=None):
|
||||
"""
|
||||
|
||||
:param subject: The subject its all about as a <Subject> instance
|
||||
@ -479,15 +489,18 @@ class Base(Entity):
|
||||
:param sign: Whether the request should be signed or not.
|
||||
:return:
|
||||
"""
|
||||
return self._message(AuthnQuery, destination, message_id, consent, extensions,
|
||||
sign, subject=subject, session_index=session_index,
|
||||
requested_authn_context=authn_context)
|
||||
return self._message(AuthnQuery, destination, message_id, consent,
|
||||
extensions, sign, subject=subject,
|
||||
session_index=session_index,
|
||||
requested_authn_context=authn_context,
|
||||
nsprefix=nsprefix)
|
||||
|
||||
def create_name_id_mapping_request(self, name_id_policy,
|
||||
name_id=None, base_id=None,
|
||||
encrypted_id=None, destination=None,
|
||||
message_id=0, consent=None, extensions=None,
|
||||
sign=False):
|
||||
message_id=0, consent=None,
|
||||
extensions=None, sign=False,
|
||||
nsprefix=None):
|
||||
"""
|
||||
|
||||
:param name_id_policy:
|
||||
@ -508,16 +521,18 @@ class Base(Entity):
|
||||
if name_id:
|
||||
return self._message(NameIDMappingRequest, destination, message_id,
|
||||
consent, extensions, sign,
|
||||
name_id_policy=name_id_policy, name_id=name_id)
|
||||
name_id_policy=name_id_policy, name_id=name_id,
|
||||
nsprefix=nsprefix)
|
||||
elif base_id:
|
||||
return self._message(NameIDMappingRequest, destination, message_id,
|
||||
consent, extensions, sign,
|
||||
name_id_policy=name_id_policy, base_id=base_id)
|
||||
name_id_policy=name_id_policy, base_id=base_id,
|
||||
nsprefix=nsprefix)
|
||||
else:
|
||||
return self._message(NameIDMappingRequest, destination, message_id,
|
||||
consent, extensions, sign,
|
||||
name_id_policy=name_id_policy,
|
||||
encrypted_id=encrypted_id)
|
||||
encrypted_id=encrypted_id, nsprefix=nsprefix)
|
||||
|
||||
# ======== response handling ===========
|
||||
|
||||
|
@ -421,7 +421,7 @@ class Entity(HTTPBase):
|
||||
|
||||
def _message(self, request_cls, destination=None, message_id=0,
|
||||
consent=None, extensions=None, sign=False, sign_prepare=False,
|
||||
**kwargs):
|
||||
nsprefix=None, **kwargs):
|
||||
"""
|
||||
Some parameters appear in all requests so simplify by doing
|
||||
it in one place
|
||||
@ -456,6 +456,9 @@ class Entity(HTTPBase):
|
||||
if extensions:
|
||||
req.extensions = extensions
|
||||
|
||||
if nsprefix:
|
||||
req.register_prefix(nsprefix)
|
||||
|
||||
if sign:
|
||||
return reqid, self.sign(req, sign_prepare=sign_prepare)
|
||||
else:
|
||||
|
@ -240,30 +240,31 @@ def test_metadata_file():
|
||||
assert len(mds.keys()) == 560
|
||||
|
||||
|
||||
def test_mdx_service():
|
||||
sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
|
||||
http = HTTPBase(verify=False, ca_bundle=None)
|
||||
|
||||
mdx = MetaDataMDX(quote_plus, ONTS.values(), ATTRCONV,
|
||||
"http://pyff-test.nordu.net",
|
||||
sec_config, None, http)
|
||||
foo = mdx.service("https://idp.umu.se/saml2/idp/metadata.php",
|
||||
"idpsso_descriptor", "single_sign_on_service")
|
||||
|
||||
assert len(foo) == 1
|
||||
assert foo.keys()[0] == BINDING_HTTP_REDIRECT
|
||||
|
||||
|
||||
def test_mdx_certs():
|
||||
sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
|
||||
http = HTTPBase(verify=False, ca_bundle=None)
|
||||
|
||||
mdx = MetaDataMDX(quote_plus, ONTS.values(), ATTRCONV,
|
||||
"http://pyff-test.nordu.net",
|
||||
sec_config, None, http)
|
||||
foo = mdx.certs("https://idp.umu.se/saml2/idp/metadata.php", "idpsso")
|
||||
|
||||
assert len(foo) == 1
|
||||
# pyff-test not available
|
||||
# def test_mdx_service():
|
||||
# sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
|
||||
# http = HTTPBase(verify=False, ca_bundle=None)
|
||||
#
|
||||
# mdx = MetaDataMDX(quote_plus, ONTS.values(), ATTRCONV,
|
||||
# "http://pyff-test.nordu.net",
|
||||
# sec_config, None, http)
|
||||
# foo = mdx.service("https://idp.umu.se/saml2/idp/metadata.php",
|
||||
# "idpsso_descriptor", "single_sign_on_service")
|
||||
#
|
||||
# assert len(foo) == 1
|
||||
# assert foo.keys()[0] == BINDING_HTTP_REDIRECT
|
||||
#
|
||||
#
|
||||
# def test_mdx_certs():
|
||||
# sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
|
||||
# http = HTTPBase(verify=False, ca_bundle=None)
|
||||
#
|
||||
# mdx = MetaDataMDX(quote_plus, ONTS.values(), ATTRCONV,
|
||||
# "http://pyff-test.nordu.net",
|
||||
# sec_config, None, http)
|
||||
# foo = mdx.certs("https://idp.umu.se/saml2/idp/metadata.php", "idpsso")
|
||||
#
|
||||
# assert len(foo) == 1
|
||||
|
||||
|
||||
def test_load_local_dir():
|
||||
|
@ -230,30 +230,30 @@ def test_metadata_file():
|
||||
assert len(mds.keys()) == 560
|
||||
|
||||
|
||||
def test_mdx_service():
|
||||
sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
|
||||
http = HTTPBase(verify=False, ca_bundle=None)
|
||||
|
||||
mdx = MetaDataMDX(quote_plus, ONTS.values(), ATTRCONV,
|
||||
"http://pyff-test.nordu.net",
|
||||
sec_config, None, http)
|
||||
foo = mdx.service("https://idp.umu.se/saml2/idp/metadata.php",
|
||||
"idpsso_descriptor", "single_sign_on_service")
|
||||
|
||||
assert len(foo) == 1
|
||||
assert foo.keys()[0] == BINDING_HTTP_REDIRECT
|
||||
|
||||
|
||||
def test_mdx_certs():
|
||||
sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
|
||||
http = HTTPBase(verify=False, ca_bundle=None)
|
||||
|
||||
mdx = MetaDataMDX(quote_plus, ONTS.values(), ATTRCONV,
|
||||
"http://pyff-test.nordu.net",
|
||||
sec_config, None, http)
|
||||
foo = mdx.certs("https://idp.umu.se/saml2/idp/metadata.php", "idpsso")
|
||||
|
||||
assert len(foo) == 1
|
||||
# def test_mdx_service():
|
||||
# sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
|
||||
# http = HTTPBase(verify=False, ca_bundle=None)
|
||||
#
|
||||
# mdx = MetaDataMDX(quote_plus, ONTS.values(), ATTRCONV,
|
||||
# "http://pyff-test.nordu.net",
|
||||
# sec_config, None, http)
|
||||
# foo = mdx.service("https://idp.umu.se/saml2/idp/metadata.php",
|
||||
# "idpsso_descriptor", "single_sign_on_service")
|
||||
#
|
||||
# assert len(foo) == 1
|
||||
# assert foo.keys()[0] == BINDING_HTTP_REDIRECT
|
||||
#
|
||||
#
|
||||
# def test_mdx_certs():
|
||||
# sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
|
||||
# http = HTTPBase(verify=False, ca_bundle=None)
|
||||
#
|
||||
# mdx = MetaDataMDX(quote_plus, ONTS.values(), ATTRCONV,
|
||||
# "http://pyff-test.nordu.net",
|
||||
# sec_config, None, http)
|
||||
# foo = mdx.certs("https://idp.umu.se/saml2/idp/metadata.php", "idpsso")
|
||||
#
|
||||
# assert len(foo) == 1
|
||||
|
||||
|
||||
def test_load_local_dir():
|
||||
|
@ -473,7 +473,7 @@ class TestClient:
|
||||
|
||||
response = sigver.response_factory(
|
||||
in_response_to="_012345",
|
||||
destination="https://www.example.com",
|
||||
destination="http://lingon.catalogix.se:8087/",
|
||||
status=s_utils.success_status_factory(),
|
||||
issuer=self.server._issuer(),
|
||||
encrypted_assertion=EncryptedAssertion()
|
||||
@ -616,7 +616,7 @@ class TestClientWithDummy():
|
||||
{sid: "/"})
|
||||
ac = resp.assertion.authn_statement[0].authn_context
|
||||
assert ac.authenticating_authority[0].text == \
|
||||
'http://www.example.com/login'
|
||||
'http://www.example.com/login'
|
||||
assert ac.authn_context_class_ref.text == INTERNETPROTOCOLPASSWORD
|
||||
|
||||
|
||||
@ -628,4 +628,4 @@ class TestClientWithDummy():
|
||||
if __name__ == "__main__":
|
||||
tc = TestClient()
|
||||
tc.setup_class()
|
||||
tc.test_signed_redirect()
|
||||
tc.test_sign_then_encrypt_assertion2()
|
45
tests/test_88_nsprefix.py
Normal file
45
tests/test_88_nsprefix.py
Normal file
@ -0,0 +1,45 @@
|
||||
from saml2.saml import NAMEID_FORMAT_TRANSIENT
|
||||
from saml2.client import Saml2Client
|
||||
from saml2 import config, BINDING_HTTP_POST
|
||||
from saml2 import saml
|
||||
from saml2 import samlp
|
||||
|
||||
__author__ = 'roland'
|
||||
|
||||
|
||||
def test_nsprefix():
|
||||
status_message = samlp.StatusMessage()
|
||||
status_message.text = "OK"
|
||||
|
||||
txt = "%s" % status_message
|
||||
|
||||
assert "ns0:StatusMessage" in txt
|
||||
|
||||
status_message.register_prefix({"saml2": saml.NAMESPACE,
|
||||
"saml2p": samlp.NAMESPACE})
|
||||
|
||||
txt = "%s" % status_message
|
||||
|
||||
assert "saml2p:StatusMessage" in txt
|
||||
|
||||
|
||||
def test_nsprefix2():
|
||||
conf = config.SPConfig()
|
||||
conf.load_file("servera_conf")
|
||||
client = Saml2Client(conf)
|
||||
|
||||
selected_idp = "urn:mace:example.com:saml:roland:idp"
|
||||
|
||||
destination = client._sso_location(selected_idp, BINDING_HTTP_POST)
|
||||
|
||||
reqid, req = client.create_authn_request(
|
||||
destination, nameid_format=NAMEID_FORMAT_TRANSIENT,
|
||||
nsprefix={"saml2": saml.NAMESPACE, "saml2p": samlp.NAMESPACE})
|
||||
|
||||
txt = "%s" % req
|
||||
|
||||
assert "saml2p:AuthnRequest" in txt
|
||||
assert "saml2:Issuer" in txt
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_nsprefix2()
|
Loading…
Reference in New Issue
Block a user