All tests works now

This commit is contained in:
Roland Hedberg
2012-11-14 13:36:08 +01:00
parent cc8e91e84d
commit 74cf8659e1
25 changed files with 838 additions and 837 deletions

View File

@@ -18,7 +18,7 @@ Sure you can send a AuthenticationRequest to an IdentityProvider or a
AttributeQuery to an AttributeAuthority but in order to get what they
return you have to sit behind a Web server. Well that is not really true since
the AttributeQuery would be over SOAP and you would get the result over the
conenction you have to the AttributeAuthority.
connection you have to the AttributeAuthority.
But anyway, you may get my point. This is middleware stuff !

View File

@@ -67,7 +67,7 @@ def filter_on_attributes(ava, required=None, optional=None):
:param ava: An attribute value assertion as a dictionary
:param required: list of RequestedAttribute instances defined to be
required
:param optional: list of RequestedAttribute instances defined to be
:param optional: list of RequestedAttribute instances defined to be
optional
:return: The modified attribute value assertion
"""
@@ -79,13 +79,15 @@ def filter_on_attributes(ava, required=None, optional=None):
for attr in required:
if attr.friendly_name in ava:
values = [av.text for av in attr.attribute_value]
res[attr.friendly_name] = _filter_values(ava[attr.friendly_name], values, True)
res[attr.friendly_name] = _filter_values(ava[attr.friendly_name],
values, True)
elif attr.name in ava:
values = [av.text for av in attr.attribute_value]
res[attr.name] = _filter_values(ava[attr.name], values, True)
else:
_name = attr.friendly_name or attr.name
print >> sys.stderr, ava.keys()
raise MissingValue("Required attribute missing: '%s'" % (attr.friendly_name,))
raise MissingValue("Required attribute missing: '%s'" % (_name,))
if optional is None:
optional = []
@@ -94,9 +96,11 @@ def filter_on_attributes(ava, required=None, optional=None):
if attr.friendly_name in ava:
values = [av.text for av in attr.attribute_value]
try:
res[attr.friendly_name].extend(_filter_values(ava[attr.friendly_name], values))
res[attr.friendly_name].extend(_filter_values(ava[attr.friendly_name],
values))
except KeyError:
res[attr.friendly_name] = _filter_values(ava[attr.friendly_name], values)
res[attr.friendly_name] = _filter_values(ava[attr.friendly_name],
values)
elif attr.name in ava:
values = [av.text for av in attr.attribute_value]
try:
@@ -379,8 +383,7 @@ class Policy(object):
If the requirements can't be met an exception is raised.
"""
if metadata:
(required, optional) = metadata.attribute_consumer(sp_entity_id)
#(required, optional) = metadata.wants(sp_entity_id)
(required, optional) = metadata.attribute_requirement(sp_entity_id)
else:
required = optional = None

View File

@@ -20,7 +20,7 @@ to conclude its tasks.
"""
import saml2
from saml2.saml import AssertionIDRef
from saml2.saml import AssertionIDRef, NAMEID_FORMAT_PERSISTENT
try:
from urlparse import parse_qs
@@ -51,6 +51,7 @@ class Saml2Client(Base):
def do_authenticate(self, entityid=None, relay_state="",
binding=saml2.BINDING_HTTP_REDIRECT, vorg="",
nameid_format=NAMEID_FORMAT_PERSISTENT,
scoping=None, consent=None, extensions=None, sign=None):
""" Makes an authentication request.
@@ -68,11 +69,10 @@ class Saml2Client(Base):
location = self._sso_location(entityid, binding)
session_id, _req_str = "%s" % self.create_authn_request(location, vorg,
scoping,
consent,
extensions,
sign)
req = self.create_authn_request(location, vorg, scoping, binding,
nameid_format, consent, extensions,
sign)
_req_str = "%s" % req
logger.info("AuthNReq: %s" % _req_str)
@@ -90,7 +90,7 @@ class Saml2Client(Base):
else:
raise Exception("Unknown binding type: %s" % binding)
return session_id, response
return response
def global_logout(self, subject_id, reason="", expire=None, sign=None,
return_to="/"):
@@ -146,8 +146,9 @@ class Saml2Client(Base):
destination = destinations[0]
logger.info("destination to provider: %s" % destination)
request = self.create_logout_request(subject_id, destination,
entity_id, reason, expire)
request = self.create_logout_request(subject_id,
destination, entity_id,
reason, expire)
to_sign = []
#if sign and binding != BINDING_HTTP_REDIRECT:
@@ -311,8 +312,13 @@ class Saml2Client(Base):
def _soap_query_response(self, destination, query_type, **kwargs):
_create_func = getattr(self, "create_%s" % query_type)
_response_func = getattr(self, "%s_response" % query_type)
try:
response_args = kwargs["response_args"]
del kwargs["response_args"]
except KeyError:
response_args = None
id, query = _create_func(destination, **kwargs)
query = _create_func(destination, **kwargs)
response = send_using_soap(query, destination,
self.config.key_file,
@@ -321,8 +327,8 @@ class Saml2Client(Base):
if response:
logger.info("Verifying response")
if "response_args" in kwargs:
response = _response_func(response, **kwargs["response_args"])
if response_args:
response = _response_func(response, **response_args)
else:
response = _response_func(response)
@@ -387,7 +393,7 @@ class Saml2Client(Base):
consent=consent, extensions=extensions,
sign=sign)
def do_attribute_query(self, subject_id, entityid,
def do_attribute_query(self, entityid, subject_id,
attribute=None, sp_name_qualifier=None,
name_qualifier=None, nameid_format=None,
real_id=None, consent=None, extensions=None,
@@ -396,8 +402,8 @@ class Saml2Client(Base):
by default done over SOAP. Other bindings could be used but not
supported right now.
:param subject_id: The identifier of the subject
:param entityid: To whom the query should be sent
:param subject_id: The identifier of the subject
:param attribute: A dictionary of attributes and values that is asked for
:param sp_name_qualifier: The unique identifier of the
service provider or affiliation of providers for whom the

View File

@@ -18,8 +18,7 @@
"""Contains classes and functions that a SAML2.0 Service Provider (SP) may use
to conclude its tasks.
"""
from random import random
from saml2.saml import AssertionIDRef
from saml2.saml import AssertionIDRef, NAMEID_FORMAT_TRANSIENT
from saml2.samlp import AuthnQuery, LogoutRequest, AssertionIDRequest
from saml2.samlp import AttributeQuery
from saml2.samlp import AuthzDecisionQuery
@@ -35,7 +34,7 @@ except ImportError:
from cgi import parse_qs
from saml2.time_util import instant
from saml2.s_utils import signature
from saml2.s_utils import signature, rndstr
from saml2.s_utils import sid
from saml2.s_utils import do_attributes
from saml2.s_utils import decode_base64_and_inflate
@@ -123,23 +122,17 @@ class Base(object):
else:
self.vorg = None
if "allow_unsolicited" in self.config:
self.allow_unsolicited = self.config.allow_unsolicited
else:
self.allow_unsolicited = False
if getattr(self.config, 'authn_requests_signed', 'false') == 'true':
self.authn_requests_signed_default = True
else:
self.authn_requests_signed_default = False
if getattr(self.config, 'logout_requests_signed', 'false') == 'true':
self.logout_requests_signed_default = True
else:
self.logout_requests_signed_default = False
for foo in ["allow_unsolicited", "authn_requests_signed",
"logout_requests_signed"]:
if self.config.getattr("sp", foo) == 'true':
setattr(self, foo, True)
else:
setattr(self, foo, False)
# extra randomness
self.seed = random()
self.seed = rndstr(32)
self.logout_requests_signed_default = True
self.allow_unsolicited = self.config.getattr("allow_unsolicited", "sp")
#
# Private methods
@@ -223,7 +216,7 @@ class Base(object):
return True
def service_url(self, binding=BINDING_HTTP_POST):
_res = self.config.endpoint("assertion_consumer_service", binding)
_res = self.config.endpoint("assertion_consumer_service", binding, "sp")
if _res:
return _res[0]
else:
@@ -266,23 +259,23 @@ class Base(object):
logger.info("REQUEST: %s" % req)
return id, signed_instance_factory(req, self.sec, to_sign)
return signed_instance_factory(req, self.sec, to_sign)
def create_authn_request(self, destination, id=0, vorg="", scoping=None,
def create_authn_request(self, destination, vorg="", scoping=None,
binding=saml2.BINDING_HTTP_POST,
nameid_format=saml.NAMEID_FORMAT_TRANSIENT,
nameid_format=NAMEID_FORMAT_TRANSIENT,
service_url_binding=None,
consent=None, extensions=None, sign=None):
id=0, consent=None, extensions=None, sign=None):
""" Creates an authentication request.
:param destination: Where the request should be sent.
:param id: The identifier for this request
:param vorg: The virtual organization the service belongs to.
:param scoping: The scope of the request
:param binding: The protocol to use for the Response !!
:param nameid_format: Format of the NameID
:param service_url_binding: Where the reply should be sent dependent
on reply binding.
:param id: The identifier for this request
:param consent: Whether the principal have given her consent
:param extensions: Possible extensions
:param sign: Whether the request should be signed or not.
@@ -301,11 +294,12 @@ class Base(object):
my_name = self._my_name()
# Profile stuff, should be configurable
if nameid_format == saml.NAMEID_FORMAT_TRANSIENT:
if nameid_format is None or nameid_format == NAMEID_FORMAT_TRANSIENT:
name_id_policy = samlp.NameIDPolicy(allow_create="true",
format=nameid_format)
format=NAMEID_FORMAT_TRANSIENT)
else:
name_id_policy = samlp.NameIDPolicy(format=nameid_format)
name_id_policy = samlp.NameIDPolicy(allow_create="false",
format=nameid_format)
if vorg:
try:
@@ -317,20 +311,20 @@ class Base(object):
return self._message(AuthnRequest, destination, id, consent,
extensions, sign,
assertion_consumer_service_url=service_url,
protocol_binding= binding,
name_id_policy = name_id_policy,
provider_name = my_name,
scoping = scoping)
protocol_binding=binding,
name_id_policy=name_id_policy,
provider_name=my_name,
scoping=scoping)
def create_attribute_query(self, destination, id, subject_id,
def create_attribute_query(self, destination, subject_id,
attribute=None, sp_name_qualifier=None,
name_qualifier=None, nameid_format=None,
consent=None, extensions=None, sign=False):
id=0, consent=None, extensions=None, sign=False,
**kwargs):
""" Constructs an AttributeQuery
:param destination: To whom the query should be sent
:param id: The identifier of the session
:param subject_id: The identifier of the subject
:param attribute: A dictionary of attributes and values that is
asked for. The key are one of 4 variants:
@@ -344,6 +338,7 @@ class Base(object):
:param name_qualifier: The unique identifier of the identity
provider that generated the identifier.
:param nameid_format: The format of the name ID
:param id: The identifier of the session
:param consent: Whether the principal have given her consent
:param extensions: Possible extensions
:param sign: Whether the query should be signed or not.
@@ -365,13 +360,12 @@ class Base(object):
attribute=attribute)
def create_logout_request(self, destination, id, subject_id,
issuer_entity_id, reason=None, expire=None,
consent=None, extensions=None, sign=False):
def create_logout_request(self, destination, subject_id, issuer_entity_id,
reason=None, expire=None,
id=0, consent=None, extensions=None, sign=False):
""" Constructs a LogoutRequest
:param destination: Destination of the request
:param id: Request identifier
:param subject_id: The identifier of the subject
:param issuer_entity_id: The entity ID of the IdP the request is
target at.
@@ -379,6 +373,7 @@ class Base(object):
form of a URI reference.
:param expire: The time at which the request expires,
after which the recipient may discard the message.
:param id: Request identifier
:param consent: Whether the principal have given her consent
:param extensions: Possible extensions
:param sign: Whether the query should be signed or not.
@@ -419,21 +414,21 @@ class Base(object):
# AssertionIDRequest, SubjectQuery,
# AuthnQuery, AttributeQuery, or AuthzDecisionQuery
def create_authz_decision_query(self, destination, action, id=0,
def create_authz_decision_query(self, destination, action,
evidence=None, resource=None, subject=None,
sign=None, consent=None,
extensions=None):
id=0, consent=None, extensions=None,
sign=None):
""" Creates an authz decision query.
:param destination: The IdP endpoint
:param action: The action you want to perform (has to be at least one)
:param id: Message identifier
:param evidence: Why you should be able to perform the action
:param resource: The resource you want to perform the action on
:param subject: Who wants to do the thing
:param sign: Whether the request should be signed or not.
:param id: Message identifier
:param consent: If the principal gave her consent to this request
:param extensions: Possible request extensions
:param sign: Whether the request should be signed or not.
:return: AuthzDecisionQuery instance
"""
@@ -443,17 +438,20 @@ class Base(object):
def create_authz_decision_query_using_assertion(self, destination, assertion,
action=None, resource=None,
subject=None,
binding=BINDING_HTTP_REDIRECT,
subject=None, id=0,
consent=None,
extensions=None,
sign=False):
""" Makes an authz decision query.
:param destination: The IdP endpoint to send the request to
:param assertion:
:param action:
:param resource:
:param subject:
:param binding: Which binding to use for sending the request
:param assertion: An Assertion instance
:param action: The action you want to perform (has to be at least one)
:param resource: The resource you want to perform the action on
:param subject: Who wants to do the thing
:param id: Message identifier
:param consent: If the principal gave her consent to this request
:param extensions: Possible request extensions
:param sign: Whether the request should be signed or not.
:return: AuthzDecisionQuery instance
"""
@@ -469,24 +467,46 @@ class Base(object):
return self.create_authz_decision_query(destination,
_action,
saml.Evidence(assertion=assertion),
resource, subject, binding,
sign)
resource, subject,
id=id,
consent=consent,
extensions=extensions,
sign=sign)
def create_assertion_id_request(self, assertion_id_refs, destination=None,
id=0, consent=None, extensions=None,
sign=False):
"""
:param assertion_id_refs:
:param destination: The IdP endpoint to send the request to
:param id: Message identifier
:param consent: If the principal gave her consent to this request
:param extensions: Possible request extensions
:param sign: Whether the request should be signed or not.
:return: AssertionIDRequest instance
"""
id_refs = [AssertionIDRef(text=s) for s in assertion_id_refs]
return self._message(AssertionIDRequest, destination, id, consent,
extensions, sign, assertion_id_refs=id_refs )
def create_authn_query(self, subject, destination=None, id=0,
def create_authn_query(self, subject, destination=None,
authn_context=None, session_index="",
consent=None, sign=False,
extensions=None):
id=0, consent=None, extensions=None, sign=False):
"""
:param subject:
:param destination: The IdP endpoint to send the request to
:param authn_context:
:param session_index:
:param id: Message identifier
:param consent: If the principal gave her consent to this request
:param extensions: Possible request extensions
:param sign: Whether the request should be signed or not.
:return:
"""
return self._message(AuthnQuery, destination, id, consent, extensions,
sign, subject=subject, session_index=session_index,
requested_auth_context=authn_context)

View File

@@ -22,7 +22,7 @@ logger = logging.getLogger(__name__)
COMMON_ARGS = ["entityid", "xmlsec_binary", "debug", "key_file", "cert_file",
"secret", "accepted_time_diff", "name", "ca_certs",
"description",
"description", "valid_for",
"organization",
"contact_person",
"name_form",
@@ -97,48 +97,61 @@ class Config(object):
def_context = ""
def __init__(self):
self._attr = {"": {}, "sp": {}, "idp": {}, "aa": {}, "pdp": {}}
self.entityid = None
self.xmlsec_binary= None
self.debug=False
self.key_file=None
self.cert_file=None
self.secret=None
self.accepted_time_diff=None
self.name=None
self.ca_certs=None
self.description=None
self.valid_for=None
self.organization=None
self.contact_person=None
self.name_form=None
self.virtual_organization=None
self.logger=None
self.only_use_keys_in_metadata=None
self.logout_requests_signed=None
self.disable_ssl_certificate_validation=None
self.context = ""
self.attribute_converters=None
self.metadata=None
self.policy=None
self.serves = []
def serves(self):
return [t for t in ["sp", "idp", "aa", "pdp"] if self._attr[t]]
def copy_into(self, typ=""):
if typ == "sp":
copy = SPConfig()
elif typ in ["idp", "aa"]:
copy = IdPConfig()
else:
copy = Config()
copy.context = typ
copy._attr = self._attr.copy()
return copy
def __getattribute__(self, item):
if item == "context":
return object.__getattribute__(self, item)
_context = self.context
if item in ALL:
try:
return self._attr[_context][item]
except KeyError:
if _context:
try:
return self._attr[""][item]
except KeyError:
pass
return None
else:
return object.__getattribute__(self, item)
# def copy_into(self, typ=""):
# if typ == "sp":
# copy = SPConfig()
# elif typ in ["idp", "aa"]:
# copy = IdPConfig()
# else:
# copy = Config()
# copy.context = typ
# copy._attr = self._attr.copy()
# return copy
def setattr(self, context, attr, val):
self._attr[context][attr] = val
if context == "":
setattr(self, attr, val)
else:
setattr(self, "_%s_%s" % (context,attr), val)
def getattr(self, attr, context=None):
if context is None:
context = self.context
if context == "":
return getattr(self, attr, None)
else:
return getattr(self, "_%s_%s" % (context,attr), None)
def load_special(self, cnf, typ, metadata_construction=False):
for arg in SPEC[typ]:
try:
self._attr[typ][arg] = cnf[arg]
self.setattr(typ, arg, cnf[arg])
except KeyError:
pass
@@ -147,9 +160,8 @@ class Config(object):
self.context = self.def_context
def load_complex(self, cnf, typ="", metadata_construction=False):
_attr_typ = self._attr[typ]
try:
_attr_typ["policy"] = Policy(cnf["policy"])
self.setattr(typ, "policy", Policy(cnf["policy"]))
except KeyError:
pass
@@ -162,16 +174,20 @@ class Config(object):
if not acs:
raise Exception(("No attribute converters, ",
"something is wrong!!"))
try:
_attr_typ["attribute_converters"].extend(acs)
except KeyError:
_attr_typ["attribute_converters"] = acs
_acs = self.getattr("attribute_converters", typ)
if _acs:
_acs.extend(acs)
else:
self.setattr(typ, "attribute_converters", acs)
except KeyError:
pass
if not metadata_construction:
try:
_attr_typ["metadata"] = self.load_metadata(cnf["metadata"])
self.setattr(typ, "metadata",
self.load_metadata(cnf["metadata"]))
except KeyError:
pass
@@ -185,7 +201,7 @@ class Config(object):
"""
for arg in COMMON_ARGS:
try:
self._attr[""][arg] = cnf[arg]
setattr(self, arg, cnf[arg])
except KeyError:
pass
@@ -194,19 +210,19 @@ class Config(object):
try:
self.load_special(cnf["service"][typ], typ,
metadata_construction=metadata_construction)
self.serves.append(typ)
except KeyError:
pass
if not metadata_construction:
if "xmlsec_binary" not in self._attr[""]:
self._attr[""]["xmlsec_binary"] = get_xmlsec_binary()
if not self.xmlsec_binary:
self.xmlsec_binary = get_xmlsec_binary()
# verify that xmlsec is where it's supposed to be
if not os.path.exists(self._attr[""]["xmlsec_binary"]):
if not os.path.exists(self.xmlsec_binary):
#if not os.access(, os.F_OK):
raise Exception("xmlsec binary not in '%s' !" % (
self._attr[""]["xmlsec_binary"]))
self.xmlsec_binary))
self.load_complex(cnf, metadata_construction=metadata_construction)
self.context = self.def_context
@@ -258,7 +274,7 @@ class Config(object):
metad.import_external_metadata(spec["url"], cert)
return metad
def endpoint(self, service, binding=None):
def endpoint(self, service, binding=None, context=None):
""" Goes through the list of endpoint specifications for the
given type of service and returnes the first endpoint that matches
the given binding. If no binding is given any endpoint for that
@@ -270,13 +286,15 @@ class Config(object):
"""
spec = []
unspec = []
for endpspec in self.endpoints[service]:
try:
endp, bind = endpspec
if binding is None or bind == binding:
spec.append(endp)
except ValueError:
unspec.append(endpspec)
endps = self.getattr("endpoints")
if endps and service in endps:
for endpspec in endps[service]:
try:
endp, bind = endpspec
if binding is None or bind == binding:
spec.append(endp)
except ValueError:
unspec.append(endpspec)
if spec:
return spec
@@ -327,9 +345,8 @@ class Config(object):
if root_logger.level != logging.NOTSET: # Someone got there before me
return root_logger
try:
_logconf = self._attr[""]["logger"]
except KeyError:
_logconf = self.logger
if _logconf is None:
return root_logger
try:
@@ -340,21 +357,7 @@ class Config(object):
root_logger.addHandler(self.log_handler())
root_logger.info("Logging started")
return root_logger
def keys(self):
keys = []
for dir in ["", "sp", "idp", "aa"]:
keys.extend(self._attr[dir].keys())
return list(set(keys))
def __contains__(self, item):
for dir in ["", "sp", "idp", "aa"]:
if item in self._attr[dir]:
return True
return False
class SPConfig(Config):
def_context = "sp"
@@ -393,16 +396,20 @@ class SPConfig(Config):
"""
res = []
if self.aa is None or entity_id in self.aa:
for aad in self.metadata.attribute_authority(entity_id):
for attrserv in aad.attribute_service:
if attrserv.binding == binding:
res.append(attrserv)
aa_eid = self.getattr("entity_id")
if aa_eid:
if entity_id in aa_eid:
for aad in self.metadata.attribute_authority(entity_id):
for attrserv in aad.attribute_service:
if attrserv.binding == binding:
res.append(attrserv)
else:
return self.metadata.attribute_authority()
return res
def idps(self, langpref=None):
""" Returns a dictionary of usefull IdPs, the keys being the
""" Returns a dictionary of useful IdPs, the keys being the
entity ID of the service and the names of the services as values
:param langpref: The preferred languages of the name, the first match
@@ -411,12 +418,14 @@ class SPConfig(Config):
"""
if langpref is None:
langpref = ["en"]
if self.idp:
eidp = self.getattr("entity_id")
if eidp:
return dict([(e, nd[0]) for (e,
nd) in self.metadata.idps(langpref).items() if e in self.idp])
nd) in self.metadata.idps(langpref).items() if e in eidp])
else:
return self.metadata.idps()
return dict([(e, nd[0]) for (e,
nd) in self.metadata.idps(langpref).items()])
def vo_conf(self, vo_name):
try:
@@ -431,8 +440,9 @@ class SPConfig(Config):
:param ipaddress: The IP address of the user client
:return: IdP entity ID or None
"""
if "ecp" in self._attr["sp"]:
for key, eid in self._attr["sp"]["ecp"].items():
_ecp = self.getattr("ecp")
if _ecp:
for key, eid in _ecp.items():
if re.match(key, ipaddress):
return eid

View File

@@ -32,7 +32,6 @@ from saml2.profile import ecp
from saml2.server import Server
from saml2.schema import soapenv
from saml2.s_utils import sid
from saml2.response import authn_response
@@ -115,9 +114,9 @@ def ecp_auth_request(cls, entityid=None, relay_state="", sign=False):
logger.info("entityid: %s, binding: %s" % (entityid, BINDING_SOAP))
location = cls._sso_location(entityid, binding=BINDING_SOAP)
session_id = sid()
authn_req = cls.authn(location, session_id, binding=BINDING_PAOS,
service_url_binding=BINDING_PAOS)
authn_req = cls.create_authn_request(location,
binding=BINDING_PAOS,
service_url_binding=BINDING_PAOS)
body = soapenv.Body()
body.extension_elements = [element_to_extension_element(authn_req)]
@@ -128,7 +127,7 @@ def ecp_auth_request(cls, entityid=None, relay_state="", sign=False):
soap_envelope = soapenv.Envelope(header=header, body=body)
return session_id, "%s" % soap_envelope
return authn_req.id, "%s" % soap_envelope
def handle_ecp_authn_response(cls, soap_message, outstanding=None):

View File

@@ -51,7 +51,6 @@ from saml2.saml import NAME_FORMAT_URI
from saml2.time_util import in_a_while
from saml2.time_util import valid
from saml2.attribute_converter import from_local_name
from saml2.attribute_converter import ava_fro
from saml2.sigver import pre_signature_part
from saml2.sigver import make_temp, cert_from_key_info, verify_signature
from saml2.sigver import pem_format
@@ -104,7 +103,6 @@ class MetaData(object):
self.http = httplib2.Http(ca_certs=ca_certs,
disable_ssl_certificate_validation=disable_ssl_certificate_validation)
self._import = {}
self._wants = {}
self._keys = {}
self._extension_modules = metadata_extension_modules()
self.post_load_process = post_load_process
@@ -152,7 +150,7 @@ class MetaData(object):
except KeyError:
self._loc_key[ident] = {use: certs}
def _vo_metadata(self, entity_descr, entity, tag):
def _affiliation(self, entity_descr, entity, tag):
"""
Pick out the Affiliation descriptors from an entity
descriptor and store the information in a way which is easily
@@ -169,61 +167,26 @@ class MetaData(object):
if members:
entity[tag] = members
def _sp_metadata(self, entity_descr, entity, tag):
afd._certs = self._certs(afd.key_descriptor, "pem")
self._add_certs(entity_descr.entity_id, afd._certs)
def _spsso(self, dp, entity_descr):
"""
Pick out the SP SSO descriptors from an entity
descriptor and store the information in a way which is easily
accessible.
:param entity_descr: A EntityDescriptor instance
:param dp:
:param entity_descr
:return:
"""
try:
ssd = entity_descr.spsso_descriptor
except AttributeError:
return
ssds = []
required = []
optional = []
#print "..... %s ..... " % entity_descriptor.entity_id
for tssd in ssd:
# Only want to talk to SAML 2.0 entities
if samlp.NAMESPACE not in \
tssd.protocol_support_enumeration.split(" "):
#print "<<<", idp.protocol_support_enumeration
continue
ssds.append(tssd)
certs = self._certs(tssd.key_descriptor, "pem")
self._add_certs(entity_descr.entity_id, certs)
dp = self._role(dp, entity_descr)
self._extensions(tssd)
if dp._certs:
for acs in dp.assertion_consumer_service:
self._add_certs(acs.location, dp._certs)
for acs in tssd.attribute_consuming_service:
for attr in acs.requested_attribute:
#print "==", attr
if attr.is_required == "true":
required.append(attr)
else:
optional.append(attr)
for acs in tssd.assertion_consumer_service:
self._add_certs(acs.location, certs)
return dp
if required or optional:
#print "REQ",required
#print "OPT",optional
self._wants[entity_descr.entity_id] = (ava_fro(self.attrconv,
required),
ava_fro(self.attrconv,
optional))
if ssds:
entity[tag] = ssds
def _idp_metadata(self, entity_descr, entity, tag):
def _idpsso(self, dp, entity_descr):
"""
Pick out the IdP SSO descriptors from an entity
descriptor and store the information in a way which is easily
@@ -231,32 +194,17 @@ class MetaData(object):
:param entity_descr: A EntityDescriptor instance
"""
try:
isd = entity_descr.idpsso_descriptor
except AttributeError:
return
idps = []
for tidp in isd:
if samlp.NAMESPACE not in \
tidp.protocol_support_enumeration.split(" "):
#print "<<<", idp.protocol_support_enumeration
continue
idps.append(tidp)
certs = self._certs(tidp.key_descriptor, "pem")
dp = self._role(dp, entity_descr)
if dp._certs:
for sso in dp.single_sign_on_service:
self._add_certs(sso.location, dp._certs)
self._add_certs(entity_descr.entity_id, certs)
for sso in tidp.single_sign_on_service:
self._add_certs(sso.location, certs)
self._extensions(dp)
self._extensions(tidp)
return dp
if idps:
entity[tag] = idps
def _aad_metadata(self, entity_descr, entity, tag):
def _attribute_authority(self, dp, entity_descr):
"""
Pick out the attribute authority descriptors from an entity
descriptor and store the information in a way which is easily
@@ -264,105 +212,107 @@ class MetaData(object):
:param entity_descr: A EntityDescriptor instance
"""
try:
attr_auth_descr = entity_descr.attribute_authority_descriptor
except AttributeError:
#print "No Attribute AD: %s" % entity_descr.entity_id
return
aads = []
for taad in attr_auth_descr:
# Remove everyone that doesn't talk SAML 2.0
#print "supported protocols", taad.protocol_support_enumeration
if samlp.NAMESPACE not in \
taad.protocol_support_enumeration.split(" "):
continue
# remove the bindings I can't handle
aserv = []
for attr_serv in taad.attribute_service:
#print "binding", attr_serv.binding
if attr_serv.binding == BINDING_SOAP:
aserv.append(attr_serv)
if not aserv:
continue
taad.attribute_service = aserv
self._extensions(taad)
# gather all the certs and place them in temporary files
certs = self._certs(taad.key_descriptor, "pem")
self._add_certs(entity_descr.entity_id, certs)
# remove the bindings I can't handle
aserv = []
for attr_serv in dp.attribute_service:
#print "binding", attr_serv.binding
if attr_serv.binding == BINDING_SOAP:
aserv.append(attr_serv)
for sso in taad.attribute_service:
self._add_certs(sso.location, certs)
aads.append(taad)
if aads:
entity[tag] = aads
def _pdp_metadata(self, entity_descr, entity, tag):
if not aserv:
return None
dp.attribute_service = aserv
dp = self._role(dp, entity_descr)
if dp._certs:
for attr_serv in dp.attribute_service:
self._add_certs(attr_serv.location, dp._certs)
return dp
def _pdp(self, dp, entity_descr):
aserv = []
for authz_serv in dp.authz_service:
#print "binding", attr_serv.binding
if authz_serv.binding == BINDING_SOAP:
aserv.append(authz_serv)
if not aserv:
return None
dp.authz_service = aserv
dp = self._role(dp, entity_descr)
if dp._certs:
for aus in dp.authz_service:
self._add_certs(aus.location, dp._certs)
return dp
def _authn_authority(self, dp, entity_descr):
"""
Pick out the PDP descriptors from an entity
AuthnAuthorityDescriptor
:return:
"""
return self._role(dp, entity_descr)
def _role(self, dp, entity_descr):
"""
RoleDescriptor
:return:
"""
self._extensions(dp)
# gather all the certs and place them in temporary files
dp._certs = self._certs(dp.key_descriptor, "pem")
self._add_certs(entity_descr.entity_id, dp._certs)
return dp
def _roledescriptor(self, entity_descr, entity, tag, descriptor, func):
"""
Pick out a specific descriptor from an entity
descriptor and store the information in a way which is easily
accessible.
*authz_service=None,
assertion_id_request_service=None,
name_id_format=None,
signature=None,
extensions=None,
key_descriptor=None,
organization=None,
contact_person=None,
id=None,
valid_until=None,
cache_duration=None,
*protocol_support_enumeration=None,
error_url=None,
:param entity_descr: A EntityDescriptor instance
:param entity: The whole entity
:param tag: which tag to store the information under
:param descriptor: The descriptor type
:param func: A processing function specific for the descriptor type
"""
try:
pdp_descr = entity_descr.pdp_descriptor
_descr = getattr(entity_descr, descriptor)
except AttributeError:
#print "No Attribute AD: %s" % entity_descr.entity_id
return
pdps = []
for pdp in pdp_descr:
dps = []
if isinstance(_descr, list):
for dp in _descr:
# Remove everyone that doesn't talk SAML 2.0
if samlp.NAMESPACE not in \
dp.protocol_support_enumeration.split(" "):
continue
dp = func(dp, entity_descr)
if dp:
dps.append(dp)
elif _descr:
dp = _descr
# Remove everyone that doesn't talk SAML 2.0
#print "supported protocols", taad.protocol_support_enumeration
if samlp.NAMESPACE not in \
pdp.protocol_support_enumeration.split(" "):
continue
if samlp.NAMESPACE in dp.protocol_support_enumeration.split(" "):
dp = func(dp, entity_descr)
if dp:
dps.append(dp)
# remove the bindings I can't handle
aserv = []
for authz_serv in pdp.authz_service:
#print "binding", attr_serv.binding
if authz_serv.binding == BINDING_SOAP:
aserv.append(authz_serv)
if dps:
entity[tag] = dps
if not aserv:
continue
pdp.authz_service = aserv
self._extensions(pdp)
# gather all the certs and place them in temporary files
certs = self._certs(pdp.key_descriptor, "pem")
self._add_certs(entity_descr.entity_id, certs)
for aus in pdp.authz_service:
self._add_certs(aus.location, certs)
pdps.append(pdp)
if pdps:
entity[tag] = pdps
def clear_from_source(self, source):
""" Remove all the metadata references I have gotten from this source
@@ -419,13 +369,16 @@ class MetaData(object):
entity["valid_until"] = valid_until
elif entity_descr.valid_until:
entity["valid_until"] = entity_descr.valid_until
self._idp_metadata(entity_descr, entity, "idp_sso")
self._sp_metadata(entity_descr, entity, "sp_sso")
self._aad_metadata(entity_descr, entity,
"attribute_authority")
self._vo_metadata(entity_descr, entity, "affiliation")
self._pdp_metadata(entity_descr, entity, "pdp")
# go through the different types of descriptors
for descr in ["idpsso", "attribute_authority", "authn_authority",
"pdp", "role", "spsso"]:
func = getattr(self, "_%s" % descr)
self._roledescriptor(entity_descr, entity, descr,
"%s_descriptor" % descr, func)
self._affiliation(entity_descr, entity, "affiliation")
try:
entity["organization"] = entity_descr.organization
except AttributeError:
@@ -492,7 +445,7 @@ class MetaData(object):
@keep_updated
def idp_services(self, entity_id, typ, binding=None):
""" depreceated """
idps = self.entity[entity_id]["idp_sso"]
idps = self.entity[entity_id]["idpsso"]
loc = {}
for idp in idps: # None or one
@@ -504,7 +457,7 @@ class MetaData(object):
@keep_updated
def sp_services(self, entity_id, typ, binding=None):
""" deprecated """
sps = self.entity[entity_id]["sp_sso"]
sps = self.entity[entity_id]["spsso"]
loc = {}
for sep in sps: # None or one
@@ -527,7 +480,7 @@ class MetaData(object):
loc = []
try:
idps = self.entity[entity_id]["idp_sso"]
idps = self.entity[entity_id]["idpsso"]
except KeyError:
return loc
@@ -554,7 +507,7 @@ class MetaData(object):
loc = []
try:
idps = self.entity[entity_id]["idp_sso"]
idps = self.entity[entity_id]["idpsso"]
except KeyError:
return loc
@@ -590,7 +543,7 @@ class MetaData(object):
loc = []
try:
sss = self.entity[entity_id]["%s_sso" % typ]
sss = self.entity[entity_id]["%ssso" % typ]
except KeyError:
return loc
@@ -663,7 +616,7 @@ class MetaData(object):
@keep_updated
def consumer_url(self, entity_id, binding=BINDING_HTTP_POST, _log=None):
try:
ssos = self.entity[entity_id]["sp_sso"]
ssos = self.entity[entity_id]["spsso"]
except KeyError:
raise
@@ -683,7 +636,7 @@ class MetaData(object):
@keep_updated
def assertion_consumer_services(self, entity_id, binding=BINDING_HTTP_POST):
try:
ssos = self.entity[entity_id]["sp_sso"]
ssos = self.entity[entity_id]["spsso"]
except KeyError:
raise
@@ -728,32 +681,40 @@ class MetaData(object):
return name
def req_opt(self, acs):
req = []
opt = []
for attr in acs.requested_attribute:
if attr.is_required == "true":
req.append(attr)
else:
opt.append(attr)
return req, opt
@keep_updated
def wants(self, entity_id):
#def attribute_consumer(self, entity_id, index=None):
def attribute_requirement(self, entity_id, index=None):
try:
return self._wants[entity_id]
ssos = self.entity[entity_id]["spsso"]
except KeyError:
return [], []
@keep_updated
def attribute_consumer(self, entity_id):
try:
ssos = self.entity[entity_id]["sp_sso"]
except KeyError:
return [], []
required = []
optional = []
# What if there is more than one ? Can't be ?
for acs in ssos[0].attribute_consuming_service:
for attr in acs.requested_attribute:
if attr.is_required == "true":
required.append(attr)
else:
optional.append(attr)
return required, optional
return {}, {}
acss = ssos[0].attribute_consuming_service
if acss is None or acss == []:
return {}, {}
elif len(acss) == 1:
return self.req_opt(acss[0])
else:
if index is None:
for acs in acss:
if acs.default:
return self.req_opt(acs)
# if I get here NO default was found, pick the first ?
return self.req_opt(acss[0])
else:
return self.req_opt(acss[index])
def _orgname(self, org, langs=None):
if not org:
return ""
@@ -792,20 +753,20 @@ class MetaData(object):
langs = ["en"]
for entity_id, edict in self.entity.items():
if "idp_sso" in edict:
if "idpsso" in edict:
#idp_aa_check self._valid(entity_id)
name = None
if "organization" in edict:
name = self._orgname(edict["organization"], langs)
if not name:
name = self._location(edict["idp_sso"])[0]
idps[entity_id] = (name, edict["idp_sso"])
name = self._location(edict["idpsso"])[0]
idps[entity_id] = (name, edict["idpsso"])
return idps
#noinspection PyUnusedLocal
@keep_updated
def ui_info(self, entity_id, service="idp_sso"):
def ui_info(self, entity_id, service="idpsso"):
inst = self.entity[entity_id][service]
def export_discojuice_json(self, lang=None):
@@ -826,7 +787,7 @@ class MetaData(object):
result = []
for entity_id, entity in self.entity.items():
try:
for _sso in entity['idp_sso']:
for _sso in entity['idpsso']:
rdict = {'entityID': entity_id,
'title': self._orgname(entity['organization'], lang)}
@@ -996,12 +957,7 @@ def do_requested_attribute(attributes, acs, is_required="false"):
lista.append(md.RequestedAttribute(**args))
return lista
def do_uiinfo(conf):
try:
_uiinfo = conf.ui_info
except AttributeError:
return None
def do_uiinfo(_uiinfo):
uii = mdui.UIInfo()
for attr in ['display_name', 'description', "information_url",
'privacy_statement_url']:
@@ -1157,13 +1113,14 @@ DEFAULT = {
"want_authn_requests_signed": "false",
}
def do_sp_sso_descriptor(conf, cert=None):
def do_spsso_descriptor(conf, cert=None):
spsso = md.SPSSODescriptor()
spsso.protocol_support_enumeration = samlp.NAMESPACE
if conf.endpoints:
for (endpoint, instlist) in do_endpoints(conf.endpoints,
ENDPOINTS["sp"]).items():
endps = conf.getattr("endpoints", "sp")
if endps:
for (endpoint, instlist) in do_endpoints(endps,
ENDPOINTS["sp"]).items():
setattr(spsso, endpoint, instlist)
if cert:
@@ -1171,7 +1128,7 @@ def do_sp_sso_descriptor(conf, cert=None):
for key in ["want_assertions_signed", "authn_requests_signed"]:
try:
val = getattr(conf, key)
val = conf.getattr(key, "sp")
if val is None:
setattr(spsso, key, DEFAULT[key]) #default ?!
else:
@@ -1181,16 +1138,15 @@ def do_sp_sso_descriptor(conf, cert=None):
setattr(spsso, key, DEFAULTS[key])
requested_attributes = []
if conf.required_attributes:
requested_attributes.extend(do_requested_attribute(
conf.required_attributes,
conf.attribute_converters,
is_required="true"))
acs = conf.attribute_converters
req = conf.getattr("required_attributes", "sp")
if req:
requested_attributes.extend(do_requested_attribute(req, acs,
is_required="true"))
if conf.optional_attributes:
requested_attributes.extend(do_requested_attribute(
conf.optional_attributes,
conf.attribute_converters))
opt=conf.getattr("optional_attributes", "sp")
if opt:
requested_attributes.extend(do_requested_attribute(opt, acs))
if requested_attributes:
spsso.attribute_consuming_service = [md.AttributeConsumingService(
@@ -1211,43 +1167,47 @@ def do_sp_sso_descriptor(conf, cert=None):
except KeyError:
pass
if conf.discovery_response:
dresp = conf.getattr("discovery_response", "sp")
if dresp:
if spsso.extensions is None:
spsso.extensions = md.Extensions()
spsso.extensions.add_extension_element(do_idpdisc(conf.discovery_response))
spsso.extensions.add_extension_element(do_idpdisc(dresp))
return spsso
def do_idp_sso_descriptor(conf, cert=None):
def do_idpsso_descriptor(conf, cert=None):
idpsso = md.IDPSSODescriptor()
idpsso.protocol_support_enumeration = samlp.NAMESPACE
if conf.endpoints:
for (endpoint, instlist) in do_endpoints(conf.endpoints,
ENDPOINTS["idp"]).items():
endps = conf.getattr("endpoints", "idp")
if endps:
for (endpoint, instlist) in do_endpoints(endps,
ENDPOINTS["idp"]).items():
setattr(idpsso, endpoint, instlist)
if conf.scope:
scopes = conf.getattr("scope", "idp")
if scopes:
if idpsso.extensions is None:
idpsso.extensions = md.Extensions()
for scope in conf.scope:
for scope in scopes:
mdscope = shibmd.Scope()
mdscope.text = scope
# unless scope contains '*'/'+'/'?' assume non regexp ?
mdscope.regexp = "false"
idpsso.extensions.add_extension_element(mdscope)
if conf.ui_info:
ui_info = conf.getattr("ui_info", "idp")
if ui_info:
if idpsso.extensions is None:
idpsso.extensions = md.Extensions()
idpsso.extensions.add_extension_element(do_uiinfo(conf))
idpsso.extensions.add_extension_element(do_uiinfo(ui_info))
if cert:
idpsso.key_descriptor = do_key_descriptor(cert)
for key in ["want_authn_requests_signed"]:
try:
val = getattr(conf,key)
val = conf.getattr(key, "idp")
if val is None:
setattr(idpsso, key, DEFAULT["want_authn_requests_signed"])
else:
@@ -1294,41 +1254,31 @@ def do_pdp_descriptor(conf, cert):
return pdp
def entity_descriptor(confd, valid_for):
def entity_descriptor(confd):
mycert = "".join(open(confd.cert_file).readlines()[1:-1])
# if "attribute_map_dir" in confd:
# attrconverters = ac_factory(confd.attribute_map_dir)
# else:
# attrconverters = [AttributeConverter()]
#if "attribute_maps" in confd:
# (forward,backward) = parse_attribute_map(confd["attribute_maps"])
#else:
# backward = {}
entd = md.EntityDescriptor()
entd.entity_id = confd.entityid
if valid_for:
entd.valid_until = in_a_while(hours=valid_for)
if confd.valid_for:
entd.valid_until = in_a_while(hours=int(confd.valid_for))
if confd.organization is not None:
entd.organization = do_organization_info(confd.organization)
if confd.contact_person is not None:
entd.contact_person = do_contact_person_info(confd.contact_person)
serves = confd.serves()
serves = confd.serves
if not serves:
raise Exception(
'No service type ("sp","idp","aa") provided in the configuration')
if "sp" in serves:
confd.context = "sp"
entd.spsso_descriptor = do_sp_sso_descriptor(confd, mycert)
entd.spsso_descriptor = do_spsso_descriptor(confd, mycert)
if "idp" in serves:
confd.context = "idp"
entd.idpsso_descriptor = do_idp_sso_descriptor(confd, mycert)
entd.idpsso_descriptor = do_idpsso_descriptor(confd, mycert)
if "aa" in serves:
confd.context = "aa"
entd.attribute_authority_descriptor = do_aa_descriptor(confd, mycert)
@@ -1366,10 +1316,7 @@ def entities_descriptor(eds, valid_for, name, ident, sign, secc):
entities = md.entities_descriptor_from_string(xmldoc)
return entities
def sign_entity_descriptor(edesc, valid_for, ident, secc):
if valid_for:
edesc.valid_until = in_a_while(hours=valid_for)
def sign_entity_descriptor(edesc, ident, secc):
if not ident:
ident = sid()

View File

@@ -1,5 +1,7 @@
#!/usr/bin/env python
import logging
import random
import string
import time
import base64
@@ -101,10 +103,21 @@ def deflate_and_base64_encode( string_val ):
:return: The deflated and encoded string
"""
return base64.b64encode( zlib.compress( string_val )[2:-4] )
def rndstr(size=16):
"""
Returns a string of random ascii characters or digits
:param size: The length of the string
:return: string
"""
_basech = string.ascii_letters + string.digits
return "".join([random.choice(_basech) for _ in range(size)])
def sid(seed=""):
"""The hash of the server time + seed makes an unique SID for each session.
128-bits long so it fulfills the SAML2 requirements which states 128-160 bits
:param seed: A seed string
:return: The hex version of the digest, prefixed by 'id-' to make it
compliant with the NCName specification

View File

@@ -366,7 +366,8 @@ class AssertionIDRequestType_(RequestAbstractType_):
c_attributes = RequestAbstractType_.c_attributes.copy()
c_child_order = RequestAbstractType_.c_child_order[:]
c_cardinality = RequestAbstractType_.c_cardinality.copy()
c_children['{urn:oasis:names:tc:SAML:2.0:assertion}AssertionIDRef'] = ('assertion_id_ref', [saml.AssertionIDRef])
c_children['{urn:oasis:names:tc:SAML:2.0:assertion}AssertionIDRef'] = (
'assertion_id_ref', [saml.AssertionIDRef])
c_cardinality['assertion_id_ref'] = {"min":1}
c_child_order.extend(['assertion_id_ref'])

View File

@@ -258,7 +258,7 @@ class Server(object):
try:
# subject information is stored in a database
# default database is a shelve database which is OK in some setups
dbspec = self.conf.subject_data
dbspec = self.conf.getattr("subject_data", "idp")
idb = None
if isinstance(dbspec, basestring):
idb = shelve.open(dbspec, writeback=True)
@@ -376,14 +376,15 @@ class Server(object):
return response
def wants(self, sp_entity_id):
""" Returns what attributes the SP requiers and which are optional
def wants(self, sp_entity_id, index=None):
""" Returns what attributes the SP requires and which are optional
if any such demands are registered in the Metadata.
:param sp_entity_id: The entity id of the SP
:param index: which of the attribute consumer services its all about
:return: 2-tuple, list of required and list of optional attributes
"""
return self.metadata.requests(sp_entity_id)
return self.metadata.attribute_requirement(sp_entity_id, index)
def parse_attribute_query(self, xml_string, decode=True):
""" Parse an attribute query
@@ -410,28 +411,20 @@ class Server(object):
# ------------------------------------------------------------------------
def _response(self, in_response_to, consumer_url=None, sp_entity_id=None,
identity=None, name_id=None, status=None, sign=False,
policy=Policy(), authn=None, authn_decl=None, issuer=None):
def _response(self, in_response_to, consumer_url=None, status=None,
issuer=None, sign=False, to_sign=None,
**kwargs):
""" Create a Response that adhers to the ??? profile.
:param in_response_to: The session identifier of the request
:param consumer_url: The URL which should receive the response
:param sp_entity_id: The entity identifier of the SP
:param identity: A dictionary with attributes and values that are
expected to be the bases for the assertion in the response.
:param name_id: The identifier of the subject
:param status: The status of the response
:param sign: Whether the assertion should be signed or not
:param policy: The attribute release policy for this instance
:param authn: A 2-tuple denoting the authn class and the authn
authority
:param authn_decl:
:param issuer: The issuer of the response
:param sign: Whether the response should be signed or not
:param to_sign: What other parts to sign
:param kwargs: Extra key word arguments
:return: A Response instance
"""
to_sign = []
if not status:
status = success_status_factory()
@@ -447,57 +440,25 @@ class Server(object):
if consumer_url:
response.destination = consumer_url
if identity:
ast = Assertion(identity)
for key, val in kwargs.items():
setattr(response, key, val)
if sign:
try:
ast.apply_policy(sp_entity_id, policy, self.metadata)
except MissingValue, exc:
return self.error_response(in_response_to, consumer_url,
sp_entity_id, exc, name_id)
to_sign.append((class_name(response), response.id))
except AttributeError:
to_sign = [(class_name(response), response.id)]
if authn: # expected to be a 2-tuple class+authority
(authn_class, authn_authn) = authn
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.conf.attribute_converters,
policy, issuer=_issuer,
authn_class=authn_class,
authn_auth=authn_authn)
elif authn_decl:
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.conf.attribute_converters,
policy, issuer=_issuer,
authn_decl=authn_decl)
else:
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.conf.attribute_converters,
policy, issuer=_issuer)
if sign:
assertion.signature = pre_signature_part(assertion.id,
self.sec.my_cert, 1)
# Just the assertion or the response and the assertion ?
to_sign = [(class_name(assertion), assertion.id)]
# Store which assertion that has been sent to which SP about which
# subject.
# self.cache.set(assertion.subject.name_id.text,
# sp_entity_id, {"ava": identity, "authn": authn},
# assertion.conditions.not_on_or_after)
response.assertion = assertion
return signed_instance_factory(response, self.sec, to_sign)
# ------------------------------------------------------------------------
def do_response(self, in_response_to, consumer_url,
sp_entity_id, identity=None, name_id=None,
status=None, sign=False, authn=None, authn_decl=None,
issuer=None):
def create_response(self, in_response_to, consumer_url,
sp_entity_id, identity=None, name_id=None,
status=None, authn=None,
authn_decl=None, issuer=None, policy=None,
sign_assertion=False, sign_response=False):
""" Create a response. A layer of indirection.
:param in_response_to: The session identifier of the request
@@ -507,54 +468,92 @@ class Server(object):
expected to be the bases for the assertion in the response.
:param name_id: The identifier of the subject
:param status: The status of the response
:param sign: Whether the assertion should be signed or not
:param authn: A 2-tuple denoting the authn class and the authn
authority.
:param authn_decl:
:param issuer: The issuer of the response
:return: A Response instance.
:param sign_assertion: Whether the assertion should be signed or not
:param sign_response: Whether the response should be signed or not
:return: A response instance
"""
policy = self.conf.policy
to_sign = []
args = {}
if identity:
_issuer = self.issuer(issuer)
ast = Assertion(identity)
if policy is None:
policy = Policy()
try:
ast.apply_policy(sp_entity_id, policy, self.metadata)
except MissingValue, exc:
return self.create_error_response(in_response_to, consumer_url,
exc, sign_response)
return self._response(in_response_to, consumer_url,
sp_entity_id, identity, name_id,
status, sign, policy, authn, authn_decl, issuer)
if authn: # expected to be a 2-tuple class+authority
(authn_class, authn_authn) = authn
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.conf.attribute_converters,
policy, issuer=_issuer,
authn_class=authn_class,
authn_auth=authn_authn)
elif authn_decl:
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.conf.attribute_converters,
policy, issuer=_issuer,
authn_decl=authn_decl)
else:
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.conf.attribute_converters,
policy, issuer=_issuer)
if sign_assertion:
assertion.signature = pre_signature_part(assertion.id,
self.sec.my_cert, 1)
# Just the assertion or the response and the assertion ?
to_sign = [(class_name(assertion), assertion.id)]
# Store which assertion that has been sent to which SP about which
# subject.
# self.cache.set(assertion.subject.name_id.text,
# sp_entity_id, {"ava": identity, "authn": authn},
# assertion.conditions.not_on_or_after)
args["assertion"] = assertion
return self._response(in_response_to, consumer_url, status, issuer,
sign_response, to_sign, **args)
# ------------------------------------------------------------------------
def error_response(self, in_response_to, destination, spid, info,
name_id=None, sign=False, issuer=None):
def create_error_response(self, in_response_to, destination, info,
sign=False, issuer=None):
""" Create a error response.
:param in_response_to: The identifier of the message this is a response
to.
:param destination: The intended recipient of this message
:param spid: The entitiy ID of the SP that will get this.
:param destination: The intended recipient of this message
:param info: Either an Exception instance or a 2-tuple consisting of
error code and descriptive text
:param name_id:
:param sign: Whether the message should be signed or not
:param sign: Whether the response should be signed or not
:param issuer: The issuer of the response
:return: A Response instance
:return: A response instance
"""
status = error_status_factory(info)
return self._response(
in_response_to, # in_response_to
destination, # consumer_url
spid, # sp_entity_id
name_id=name_id,
status=status,
sign=sign,
issuer=issuer
)
return self._response(in_response_to, destination, status, issuer,
sign)
# ------------------------------------------------------------------------
#noinspection PyUnusedLocal
def do_aa_response(self, in_response_to, consumer_url, sp_entity_id,
identity=None, userid="", name_id=None, status=None,
sign=False, _name_id_policy=None, issuer=None):
def create_aa_response(self, in_response_to, consumer_url, sp_entity_id,
identity=None, userid="", name_id=None, status=None,
issuer=None, sign_assertion=False,
sign_response=False):
""" Create an attribute assertion response.
:param in_response_to: The session identifier of the request
@@ -565,24 +564,27 @@ class Server(object):
:param userid: A identifier of the user
:param name_id: The identifier of the subject
:param status: The status of the response
:param sign: Whether the assertion should be signed or not
:param _name_id_policy: Policy for NameID creation.
:param issuer: The issuer of the response
:return: A Response instance.
:param sign_assertion: Whether the assertion should be signed or not
:param sign_response: Whether the whole response should be signed
:return: A response instance
"""
# name_id = self.ident.construct_nameid(self.conf.policy, userid,
# sp_entity_id, identity)
return self._response(in_response_to, consumer_url,
sp_entity_id, identity, name_id,
status, sign, policy=self.conf.policy, issuer=issuer)
return self.create_response(in_response_to, consumer_url, sp_entity_id,
identity, name_id, status,
issuer=issuer,
policy=self.conf.getattr("policy", "aa"),
sign_assertion=sign_assertion,
sign_response=sign_response)
# ------------------------------------------------------------------------
def authn_response(self, identity, in_response_to, destination,
sp_entity_id, name_id_policy, userid, sign=False,
authn=None, sign_response=False, authn_decl=None,
issuer=None, instance=False):
def create_authn_response(self, identity, in_response_to, destination,
sp_entity_id, name_id_policy, userid,
authn=None, authn_decl=None, issuer=None,
sign_response=False, sign_assertion=False):
""" Constructs an AuthenticationResponse
:param identity: Information about an user
@@ -590,73 +592,50 @@ class Server(object):
this response is an answer to.
:param destination: Where the response should be sent
:param sp_entity_id: The entity identifier of the Service Provider
:param name_id_policy: ...
:param name_id_policy: How the NameID should be constructed
:param userid: The subject identifier
:param sign: Whether the assertion should be signed or not. This is
different from signing the response as such.
:param authn: Information about the authentication
:param sign_response: The response can be signed separately from the
assertions.
:param authn_decl:
:param issuer: Issuer of the response
:param instance: Whether to return the instance or a string
representation
:return: A XML string representing an authentication response
:param sign_assertion: Whether the assertion should be signed or not.
:param sign_response: Whether the response should be signed or not.
:return: A response instance
"""
name_id = None
try:
nid_formats = []
for _sp in self.metadata.entity[sp_entity_id]["sp_sso"]:
for _sp in self.metadata.entity[sp_entity_id]["spsso"]:
nid_formats.extend([n.text for n in _sp.name_id_format])
policy = self.conf.policy
policy = self.conf.getattr("policy", "idp")
name_id = self.ident.construct_nameid(policy, userid, sp_entity_id,
identity, name_id_policy,
nid_formats)
except IOError, exc:
response = self.error_response(in_response_to, destination,
sp_entity_id, exc, name_id)
response = self.create_error_response(in_response_to, destination,
sp_entity_id, exc, name_id)
return ("%s" % response).split("\n")
try:
response = self.do_response(
in_response_to, # in_response_to
destination, # consumer_url
sp_entity_id, # sp_entity_id
identity, # identity as dictionary
name_id,
sign=sign, # If the assertion should be signed
authn=authn, # Information about the
# authentication
authn_decl=authn_decl,
issuer=issuer
)
return self.create_response(in_response_to, # in_response_to
destination, # consumer_url
sp_entity_id, # sp_entity_id
identity, # identity as dictionary
name_id,
authn=authn, # Information about the
# authentication
authn_decl=authn_decl,
issuer=issuer,
policy=policy,
sign_assertion=sign_assertion,
sign_response=sign_response)
except MissingValue, exc:
response = self.error_response(in_response_to, destination,
sp_entity_id, exc, name_id)
return self.create_error_response(in_response_to, destination,
sp_entity_id, exc, name_id)
if sign_response:
try:
response.signature = pre_signature_part(response.id,
self.sec.my_cert, 2)
return self.sec.sign_statement_using_xmlsec(response,
class_name(response),
nodeid=response.id)
except Exception, exc:
response = self.error_response(in_response_to, destination,
sp_entity_id, exc, name_id)
if instance:
return response
else:
return ("%s" % response).split("\n")
else:
if instance:
return response
else:
return ("%s" % response).split("\n")
def parse_logout_request(self, text, binding=BINDING_SOAP):
"""Parse a Logout Request
@@ -669,9 +648,9 @@ class Server(object):
"""
try:
slo = self.conf.endpoint("single_logout_service", binding)
slo = self.conf.endpoint("single_logout_service", binding, "idp")
except IndexError:
logger.info("enpoints: %s" % (self.conf.endpoints,))
logger.info("enpoints: %s" % self.conf.getattr("endpoints", "idp"))
logger.info("binding wanted: %s" % (binding,))
raise
@@ -703,8 +682,8 @@ class Server(object):
return req
def logout_response(self, request, bindings, status=None, sign=False,
issuer=None):
def create_logout_response(self, request, bindings, status=None,
sign=False, issuer=None):
""" Create a LogoutResponse. What is returned depends on which binding
is used.
@@ -794,7 +773,7 @@ class Server(object):
attribute - which attributes that the requestor wants back
query - the whole query
"""
receiver_addresses = self.conf.endpoint("attribute_service")
receiver_addresses = self.conf.endpoint("attribute_service", "idp")
attribute_query = AttributeQuery( self.sec, receiver_addresses)
attribute_query = attribute_query.loads(xml_string)

View File

@@ -1006,7 +1006,7 @@ def logoutresponse_factory(sign=False, encrypt=False, **kwargs):
def response_factory(sign=False, encrypt=False, **kwargs):
response = samlp.Response(id=sid(), version=VERSION,
issue_instant=instant())
issue_instant=instant())
if sign:
response.signature = pre_signature_part(kwargs["id"])

View File

@@ -7,6 +7,7 @@ try:
except ImportError:
xmlsec_path = '/opt/local/bin/xmlsec1'
BASE = "http://localhost:8088"
CONFIG = {
"entityid" : "urn:mace:example.com:saml:roland:idp",
@@ -15,10 +16,10 @@ CONFIG = {
"idp": {
"endpoints" : {
"single_sign_on_service" : [
("http://localhost:8088/sso", BINDING_HTTP_REDIRECT)],
("%s/sso" % BASE, BINDING_HTTP_REDIRECT)],
"single_logout_service": [
("http://localhost:8088/slo", BINDING_SOAP),
("http://localhost:8088/slop",BINDING_HTTP_POST)]
("%s/slo" % BASE, BINDING_SOAP),
("%s/slop" % BASE,BINDING_HTTP_POST)]
},
"policy": {
"default": {
@@ -43,7 +44,7 @@ CONFIG = {
"cert_file" : "test.pem",
"xmlsec_binary" : xmlsec_path,
"metadata": {
"local": ["metadata.xml", "vo_metadata.xml"],
"local": ["metadata_sp_1.xml", "vo_metadata.xml"],
},
"attribute_map_dir" : "attributemaps",
"organization": {

View File

@@ -53,7 +53,7 @@ CONFIG = {
"debug" : 1,
"key_file" : "test.key",
"cert_file" : "test.pem",
#"xmlsec_binary" : xmlsec_path,
"xmlsec_binary" : xmlsec_path,
"metadata": {
"local": ["metadata.xml", "vo_metadata.xml"],
},

View File

@@ -21,7 +21,7 @@ CONFIG = {
"debug" : 1,
"key_file" : "test.key",
"cert_file" : "test.pem",
#"xmlsec_binary" : xmlsec_path,
"xmlsec_binary" : xmlsec_path,
"metadata": {
"local": ["idp_aa.xml", "vo_metadata.xml"],
},

View File

@@ -34,6 +34,7 @@ CONFIG={
"subject_data": "subject_data.db",
"accepted_time_diff": 60,
"attribute_map_dir" : "attributemaps",
"valid_for": 6,
"organization": {
"name": ("AB Exempel", "se"),
"display_name": ("AB Exempel", "se"),

View File

@@ -20,7 +20,7 @@ CONFIG = {
"debug" : 1,
"key_file" : "test.key",
"cert_file" : "test.pem",
#"xmlsec_binary" : xmlsec_path,
"xmlsec_binary" : xmlsec_path,
"metadata": {
"local": ["idp.xml", "vo_metadata.xml"],
},

View File

@@ -8,7 +8,7 @@ from saml2 import BINDING_SOAP
from saml2 import md, saml, samlp
from saml2 import time_util
from saml2.saml import NAMEID_FORMAT_TRANSIENT, NAME_FORMAT_URI
from saml2.attribute_converter import ac_factory
from saml2.attribute_converter import ac_factory, to_local_name
#from py.test import raises
@@ -48,38 +48,41 @@ def test_swami_1():
md.import_metadata(_read_file(SWAMI_METADATA),"-")
print len(md.entity)
assert len(md.entity)
idps = dict([(id,ent["idp_sso"]) for id,ent in md.entity.items() \
if "idp_sso" in ent])
idps = dict([(id,ent["idpsso"]) for id,ent in md.entity.items() \
if "idpsso" in ent])
print idps
assert idps.keys()
idp_sso = md.single_sign_on_services(
idpsso = md.single_sign_on_services(
'https://idp.umu.se/saml2/idp/metadata.php')
assert md.name('https://idp.umu.se/saml2/idp/metadata.php') == (
u'Ume\xe5 University (SAML2)')
assert len(idp_sso) == 1
assert idp_sso == ['https://idp.umu.se/saml2/idp/SSOService.php']
assert len(idpsso) == 1
assert idpsso == ['https://idp.umu.se/saml2/idp/SSOService.php']
print md._loc_key['https://idp.umu.se/saml2/idp/SSOService.php']
ssocerts = md.certs('https://idp.umu.se/saml2/idp/SSOService.php', "signing")
print ssocerts
assert len(ssocerts) == 1
print md._wants.keys()
assert _eq(md._wants.keys(),['https://sp.swamid.se/shibboleth',
'https://connect8.sunet.se/shibboleth',
'https://beta.lobber.se/shibboleth',
'https://connect.uninett.no/shibboleth',
'https://www.diva-portal.org/shibboleth',
'https://connect.sunet.se/shibboleth',
'https://crowd.nordu.net/shibboleth'])
print md.wants('https://www.diva-portal.org/shibboleth')
assert _eq(md.wants('https://www.diva-portal.org/shibboleth')[1].keys(),
sps = dict([(id,ent["spsso"]) for id,ent in md.entity.items()\
if "spsso" in ent])
acs_sp = []
for nam, desc in sps.items():
if desc[0].attribute_consuming_service:
acs_sp.append(nam)
#print md.wants('https://www.diva-portal.org/shibboleth')
wants = md.attribute_requirement('https://connect8.sunet.se/shibboleth')
lnamn = [to_local_name(md.attrconv, attr) for attr in wants[1]]
assert _eq(lnamn,
['mail', 'givenName', 'eduPersonPrincipalName', 'sn',
'eduPersonScopedAffiliation'])
assert md.wants('https://connect.sunet.se/shibboleth')[0] == {}
assert _eq(md.wants('https://connect.sunet.se/shibboleth')[1].keys(),
['mail', 'givenName', 'eduPersonPrincipalName', 'sn',
'eduPersonScopedAffiliation'])
wants = md.attribute_requirement('https://beta.lobber.se/shibboleth')
assert wants[0] == []
lnamn = [to_local_name(md.attrconv, attr) for attr in wants[1]]
assert _eq(lnamn,
['eduPersonScopedAffiliation', 'eduPersonEntitlement',
'eduPersonPrincipalName', 'sn', 'mail', 'givenName'])
def test_incommon_1():
md = metadata.MetaData(attrconv=ATTRCONV)
@@ -87,23 +90,39 @@ def test_incommon_1():
print len(md.entity)
assert len(md.entity) == 442
idps = dict([
(id,ent["idp_sso"]) for id,ent in md.entity.items() if "idp_sso" in ent])
(id,ent["idpsso"]) for id,ent in md.entity.items() if "idpsso" in ent])
print idps.keys()
assert len(idps) == 53 # !!!!???? < 10%
assert md.single_sign_on_services('urn:mace:incommon:uiuc.edu') == []
idp_sso = md.single_sign_on_services('urn:mace:incommon:alaska.edu')
assert len(idp_sso) == 1
print idp_sso
print md.wants
assert idp_sso == ['https://idp.alaska.edu/idp/profile/SAML2/Redirect/SSO']
idpsso = md.single_sign_on_services('urn:mace:incommon:alaska.edu')
assert len(idpsso) == 1
print idpsso
assert idpsso == ['https://idp.alaska.edu/idp/profile/SAML2/Redirect/SSO']
sps = dict([(id,ent["spsso"]) for id,ent in md.entity.items()\
if "spsso" in ent])
acs_sp = []
for nam, desc in sps.items():
if desc[0].attribute_consuming_service:
acs_sp.append(nam)
assert len(acs_sp) == 0
# Look for attribute authorities
aas = dict([(id,ent["attribute_authority"]) for id,ent in md.entity.items()\
if "attribute_authority" in ent])
print aas.keys()
assert len(aas) == 53
def test_example():
md = metadata.MetaData(attrconv=ATTRCONV)
md.import_metadata(_read_file(EXAMPLE_METADATA), "-")
print len(md.entity)
assert len(md.entity) == 1
idps = dict([(id,ent["idp_sso"]) for id,ent in md.entity.items() \
if "idp_sso" in ent])
idps = dict([(id,ent["idpsso"]) for id,ent in md.entity.items() \
if "idpsso" in ent])
assert idps.keys() == [
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php']
print md._loc_key['http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php']
@@ -119,14 +138,14 @@ def test_switch_1():
md.import_metadata(_read_file(SWITCH_METADATA), "-")
print len(md.entity)
assert len(md.entity) == 90
idps = dict([(id,ent["idp_sso"]) for id,ent in md.entity.items() \
if "idp_sso" in ent])
idps = dict([(id,ent["idpsso"]) for id,ent in md.entity.items() \
if "idpsso" in ent])
print idps.keys()
idp_sso = md.single_sign_on_services(
idpsso = md.single_sign_on_services(
'https://aai-demo-idp.switch.ch/idp/shibboleth')
assert len(idp_sso) == 1
print idp_sso
assert idp_sso == [
assert len(idpsso) == 1
print idpsso
assert idpsso == [
'https://aai-demo-idp.switch.ch/idp/profile/SAML2/Redirect/SSO']
assert len(idps) == 16
aas = dict([(id,ent["attribute_authority"]) for id,ent in md.entity.items() \
@@ -138,7 +157,7 @@ def test_switch_1():
assert len(aad.attribute_service) == 1
assert len(aad.name_id_format) == 2
dual = dict([(id,ent) for id,ent in md.entity.items() \
if "idp_sso" in ent and "sp_sso" in ent])
if "idpsso" in ent and "spsso" in ent])
print len(dual)
assert len(dual) == 0
@@ -150,25 +169,18 @@ def test_sp_metadata():
assert len(md.entity) == 1
assert md.entity.keys() == ['urn:mace:umu.se:saml:roland:sp']
assert _eq(md.entity['urn:mace:umu.se:saml:roland:sp'].keys(), [
'valid_until',"organization","sp_sso",
'valid_until',"organization","spsso",
'contact_person'])
print md.entity['urn:mace:umu.se:saml:roland:sp']["sp_sso"][0].keyswv()
(req,opt) = md.attribute_consumer('urn:mace:umu.se:saml:roland:sp')
print md.entity['urn:mace:umu.se:saml:roland:sp']["spsso"][0].keyswv()
(req,opt) = md.attribute_requirement('urn:mace:umu.se:saml:roland:sp')
print req
assert len(req) == 3
assert len(opt) == 1
assert opt[0].name == 'urn:oid:2.5.4.12'
assert opt[0].friendly_name == 'title'
assert _eq([n.name for n in req],['urn:oid:2.5.4.4', 'urn:oid:2.5.4.42',
'urn:oid:0.9.2342.19200300.100.1.3'])
assert _eq([n.name for n in req],['urn:oid:2.5.4.4', 'urn:oid:2.5.4.42',
'urn:oid:0.9.2342.19200300.100.1.3'])
assert _eq([n.friendly_name for n in req],['surName', 'givenName', 'mail'])
print md.wants
assert md._wants.keys() == ['urn:mace:umu.se:saml:roland:sp']
assert _eq(md.wants('urn:mace:umu.se:saml:roland:sp')[0].keys(),
["mail", "givenName", "sn"])
assert _eq(md.wants('urn:mace:umu.se:saml:roland:sp')[1].keys(),
["title"])
KALMAR2_URL = "https://kalmar2.org/simplesaml/module.php/aggregator/?id=kalmarcentral2&set=saml2"
KALMAR2_CERT = "kalmar2.pem"
@@ -180,7 +192,7 @@ KALMAR2_CERT = "kalmar2.pem"
# print len(md.entity)
# assert len(md.entity) > 20
# idps = dict([
# (id,ent["idp_sso"]) for id,ent in md.entity.items() if "idp_sso" in ent])
# (id,ent["idpsso"]) for id,ent in md.entity.items() if "idpsso" in ent])
# print idps.keys()
# assert len(idps) > 1
# assert "https://idp.umu.se/saml2/idp/metadata.php" in idps

View File

@@ -163,15 +163,15 @@ def test_1():
c = SPConfig().load(sp1)
c.context = "sp"
print c
assert c.endpoints
assert c.name
assert c.idp
assert c._sp_endpoints
assert c._sp_name
assert c._sp_idp
md = c.metadata
assert isinstance(md, MetaData)
assert len(c.idp) == 1
assert c.idp.keys() == ["urn:mace:example.com:saml:roland:idp"]
assert c.idp.values() == [{'single_sign_on_service':
assert len(c._sp_idp) == 1
assert c._sp_idp.keys() == ["urn:mace:example.com:saml:roland:idp"]
assert c._sp_idp.values() == [{'single_sign_on_service':
{'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect':
'http://localhost:8088/sso/'}}]
@@ -182,15 +182,16 @@ def test_2():
c.context = "sp"
print c
assert c.endpoints
assert c.idp
assert c.optional_attributes
assert c._sp_endpoints
assert c.getattr("endpoints", "sp")
assert c._sp_idp
assert c._sp_optional_attributes
assert c.name
assert c.required_attributes
assert c._sp_required_attributes
assert len(c.idp) == 1
assert c.idp.keys() == [""]
assert c.idp.values() == ["https://example.com/saml2/idp/SSOService.php"]
assert len(c._sp_idp) == 1
assert c._sp_idp.keys() == [""]
assert c._sp_idp.values() == ["https://example.com/saml2/idp/SSOService.php"]
assert c.only_use_keys_in_metadata is None
def test_minimum():
@@ -222,7 +223,7 @@ def test_idp_1():
print c
assert c.endpoint("single_sign_on_service")[0] == 'http://localhost:8088/'
attribute_restrictions = c.policy.get_attribute_restriction("")
attribute_restrictions = c.getattr("policy","idp").get_attribute_restriction("")
assert attribute_restrictions["eduPersonAffiliation"][0].match("staff")
def test_idp_2():
@@ -235,7 +236,7 @@ def test_idp_2():
assert c.endpoint("single_logout_service",
BINDING_HTTP_REDIRECT) == ["http://localhost:8088/"]
attribute_restrictions = c.policy.get_attribute_restriction("")
attribute_restrictions = c.getattr("policy","idp").get_attribute_restriction("")
assert attribute_restrictions["eduPersonAffiliation"][0].match("staff")
def test_wayf():
@@ -313,15 +314,12 @@ def test_sp():
def test_dual():
cnf = Config().load_file("idp_sp_conf")
assert cnf.serves() == ["sp", "idp"]
spcnf = cnf.copy_into("sp")
assert isinstance(spcnf, SPConfig)
assert spcnf.context == "sp"
idpcnf = cnf.copy_into("idp")
assert isinstance(idpcnf, IdPConfig)
assert idpcnf.context == "idp"
spe = cnf.getattr("endpoints", "sp")
idpe = cnf.getattr("endpoints", "idp")
assert spe
assert idpe
assert spe != idpe
def test_ecp():
cnf = SPConfig()

View File

@@ -19,7 +19,11 @@ XML_RESPONSE_FILE2 = "saml2_response.xml"
def _eq(l1,l2):
return set(l1) == set(l2)
IDENTITY = {"eduPersonAffiliation": ["staff", "member"],
"surName": ["Jeter"], "givenName": ["Derek"],
"mail": ["foo@gmail.com"]}
class TestResponse:
def setup_class(self):
server = Server("idp_conf")
@@ -27,28 +31,28 @@ class TestResponse:
"urn:mace:example.com:saml:roland:sp",
"id12")
self._resp_ = server.do_response(
self._resp_ = server.create_response(
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{"eduPersonEntitlement":"Jeter"},
IDENTITY,
name_id = name_id
)
self._sign_resp_ = server.do_response(
self._sign_resp_ = server.create_response(
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{"eduPersonEntitlement":"Jeter"},
IDENTITY,
name_id = name_id,
sign=True
sign_assertion=True
)
self._resp_authn = server.do_response(
self._resp_authn = server.create_response(
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{"eduPersonEntitlement":"Jeter"},
IDENTITY,
name_id = name_id,
authn=(saml.AUTHN_PASSWORD, "http://www.example.com/login")
)

View File

@@ -1,51 +1,48 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from saml2 import samlp, BINDING_HTTP_POST
from saml2 import saml, config, class_name, make_instance
from saml2 import saml
from saml2.server import Server
from saml2.response import authn_response, StatusResponse
from saml2.response import authn_response
from saml2.config import config_factory
XML_RESPONSE_FILE = "saml_signed.xml"
XML_RESPONSE_FILE2 = "saml2_response.xml"
import os
def _eq(l1,l2):
return set(l1) == set(l2)
IDENTITY = {"eduPersonAffiliation": ["staff", "member"],
"surName": ["Jeter"], "givenName": ["Derek"],
"mail": ["foo@gmail.com"]}
class TestAuthnResponse:
def setup_class(self):
server = Server("idp_conf")
name_id = server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp","id12")
self._resp_ = server.do_response(
policy = server.conf.getattr("policy", "idp")
self._resp_ = server.create_response(
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{"eduPersonEntitlement":"Jeter"},
name_id = name_id
)
IDENTITY, name_id = name_id, policy=policy)
self._sign_resp_ = server.do_response(
self._sign_resp_ = server.create_response(
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{"eduPersonEntitlement":"Jeter"},
name_id = name_id,
sign=True
)
IDENTITY,
name_id = name_id, sign_assertion=True, policy=policy)
self._resp_authn = server.do_response(
self._resp_authn = server.create_response(
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{"eduPersonEntitlement":"Jeter"},
IDENTITY,
name_id = name_id,
authn=(saml.AUTHN_PASSWORD, "http://www.example.com/login")
)
authn=(saml.AUTHN_PASSWORD, "http://www.example.com/login"),
policy=policy)
self.conf = config_factory("sp", "server_conf")
self.ar = authn_response(self.conf, "http://lingon.catalogix.se:8087/")
@@ -60,7 +57,7 @@ class TestAuthnResponse:
print self.ar.__dict__
assert self.ar.came_from == 'http://localhost:8088/sso'
assert self.ar.session_id() == "id12"
assert self.ar.ava == {'eduPersonEntitlement': ['Jeter'] }
assert self.ar.ava == IDENTITY
assert self.ar.name_id
assert self.ar.issuer() == 'urn:mace:example.com:saml:roland:idp'
@@ -76,7 +73,7 @@ class TestAuthnResponse:
print self.ar.__dict__
assert self.ar.came_from == 'http://localhost:8088/sso'
assert self.ar.session_id() == "id12"
assert self.ar.ava == {'eduPersonEntitlement': ['Jeter'] }
assert self.ar.ava == IDENTITY
assert self.ar.issuer() == 'urn:mace:example.com:saml:roland:idp'
assert self.ar.name_id

View File

@@ -139,26 +139,17 @@ class TestServer1():
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_parse_faulty_request(self):
authn_request = self.client.authn_request(
query_id = "id1",
destination = "http://www.example.com",
service_url = "http://www.example.org",
spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
authn_request = self.client.create_authn_request(
destination = "http://www.example.com",
id = "id1")
intermed = s_utils.deflate_and_base64_encode("%s" % authn_request)
# should raise an error because faulty spentityid
raises(OtherError, self.server.parse_authn_request, intermed)
def test_parse_faulty_request_to_err_status(self):
authn_request = self.client.authn_request(
query_id = "id1",
destination = "http://www.example.com",
service_url = "http://www.example.org",
spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
authn_request = self.client.create_authn_request(
destination = "http://www.example.com")
intermed = s_utils.deflate_and_base64_encode("%s" % authn_request)
try:
@@ -178,20 +169,17 @@ class TestServer1():
assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL
def test_parse_ok_request(self):
authn_request = self.client.authn_request(
query_id = "id1",
destination = "http://localhost:8088/sso",
service_url = "http://localhost:8087/",
spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
authn_request = self.client.create_authn_request(
id = "id1",
destination = "http://localhost:8088/sso")
print authn_request
intermed = s_utils.deflate_and_base64_encode("%s" % authn_request)
response = self.server.parse_authn_request(intermed)
# returns a dictionary
print response
assert response["consumer_url"] == "http://localhost:8087/"
assert response["consumer_url"] == "http://lingon.catalogix.se:8087/"
assert response["id"] == "id1"
name_id_policy = response["request"].name_id_policy
assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
@@ -202,12 +190,16 @@ class TestServer1():
name_id = self.server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp",
"id12")
resp = self.server.do_response(
resp = self.server.create_response(
"id12", # in_response_to
"http://localhost:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{ "eduPersonEntitlement": "Short stop"}, # identity
name_id
{"eduPersonEntitlement": "Short stop",
"surName": "Jeter",
"givenName": "Derek",
"mail": "derek.jeter@nyy.mlb.com"},
name_id,
policy= self.server.conf.getattr("policy")
)
print resp.keyswv()
@@ -227,7 +219,7 @@ class TestServer1():
assert assertion.attribute_statement
attribute_statement = assertion.attribute_statement
print attribute_statement
assert len(attribute_statement.attribute) == 1
assert len(attribute_statement.attribute) == 4
attribute = attribute_statement.attribute[0]
assert len(attribute.attribute_value) == 1
assert attribute.friendly_name == "eduPersonEntitlement"
@@ -245,7 +237,7 @@ class TestServer1():
assert confirmation.subject_confirmation_data.in_response_to == "id12"
def test_sso_response_without_identity(self):
resp = self.server.do_response(
resp = self.server.create_response(
"id12", # in_response_to
"http://localhost:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
@@ -263,8 +255,9 @@ class TestServer1():
def test_sso_failure_response(self):
exc = s_utils.MissingValue("eduPersonAffiliation missing")
resp = self.server.error_response("id12", "http://localhost:8087/",
"urn:mace:example.com:saml:roland:sp", exc )
resp = self.server.create_error_response("id12",
"http://localhost:8087/",
exc )
print resp.keyswv()
assert _eq(resp.keyswv(),['status', 'destination', 'in_response_to',
@@ -291,14 +284,15 @@ class TestServer1():
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
resp_str = self.server.authn_response(ava,
"id1", "http://local:8087/",
"urn:mace:example.com:saml:roland:sp",
samlp.NameIDPolicy(format=saml.NAMEID_FORMAT_TRANSIENT,
allow_create="true"),
"foba0001@example.com")
npolicy = samlp.NameIDPolicy(format=saml.NAMEID_FORMAT_TRANSIENT,
allow_create="true")
resp_str = "%s" % self.server.create_authn_response(
ava, "id1", "http://local:8087/",
"urn:mace:example.com:saml:roland:sp",
npolicy,
"foba0001@example.com")
response = samlp.response_from_string("\n".join(resp_str))
response = samlp.response_from_string(resp_str)
print response.keyswv()
assert _eq(response.keyswv(),['status', 'destination', 'assertion',
'in_response_to', 'issue_instant', 'version',
@@ -318,14 +312,16 @@ class TestServer1():
name_id = self.server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp",
"id12")
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
signed_resp = self.server.do_response(
signed_resp = self.server.create_response(
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
{"eduPersonEntitlement":"Jeter"},
ava,
name_id = name_id,
sign=True
sign_assertion=True
)
print "%s" % signed_resp
@@ -352,11 +348,11 @@ class TestServer1():
}
self.client.users.add_information_about_person(sinfo)
logout_request = self.client.construct_logout_request(
subject_id="foba0001",
destination = "http://localhost:8088/slop",
issuer_entity_id = "urn:mace:example.com:saml:roland:idp",
reason = "I'm tired of this")
logout_request = self.client.create_logout_request(
destination = "http://localhost:8088/slop",
subject_id="foba0001",
issuer_entity_id = "urn:mace:example.com:saml:roland:idp",
reason = "I'm tired of this")
intermed = s_utils.deflate_and_base64_encode("%s" % (logout_request,))
@@ -379,10 +375,11 @@ class TestServer1():
sp = client.Saml2Client(config_file="server_conf")
sp.users.add_information_about_person(sinfo)
logout_request = sp.construct_logout_request(subject_id = "foba0001",
destination = "http://localhost:8088/slo",
issuer_entity_id = "urn:mace:example.com:saml:roland:idp",
reason = "I'm tired of this")
logout_request = sp.create_logout_request(
subject_id = "foba0001",
destination = "http://localhost:8088/slo",
issuer_entity_id = "urn:mace:example.com:saml:roland:idp",
reason = "I'm tired of this")
_ = s_utils.deflate_and_base64_encode("%s" % (logout_request,))
@@ -402,10 +399,12 @@ class TestServer2():
self.server = Server("restrictive_idp_conf")
def test_do_aa_reponse(self):
aa_policy = self.server.conf.policy
aa_policy = self.server.conf.getattr("policy", "idp")
print aa_policy.__dict__
response = self.server.do_aa_response("aaa", "http://example.com/sp/",
"urn:mace:example.com:sp:1", IDENTITY.copy())
response = self.server.create_aa_response("aaa",
"http://example.com/sp/",
"urn:mace:example.com:sp:1",
IDENTITY.copy())
assert response is not None
assert response.destination == "http://example.com/sp/"
@@ -439,7 +438,7 @@ def _logout_request(conf_file):
}
sp.users.add_information_about_person(sinfo)
return sp.construct_logout_request(
return sp.create_logout_request(
subject_id = "foba0001",
destination = "http://localhost:8088/slo",
issuer_entity_id = "urn:mace:example.com:saml:roland:idp",
@@ -452,7 +451,8 @@ class TestServerLogout():
request = _logout_request("sp_slo_redirect_conf")
print request
bindings = [BINDING_HTTP_REDIRECT]
(resp, headers, message) = server.logout_response(request, bindings)
(resp, headers, message) = server.create_logout_response(request,
bindings)
assert resp == '302 Found'
assert len(headers) == 1
assert headers[0][0] == "Location"

View File

@@ -6,9 +6,12 @@ import urllib
from urlparse import urlparse, parse_qs
from saml2.client import Saml2Client, LogoutError
from saml2 import samlp, BINDING_HTTP_POST
from saml2 import samlp, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT
from saml2 import BINDING_SOAP
from saml2 import saml, config, class_name
from saml2.discovery import discovery_service_request_url
from saml2.discovery import discovery_service_response
from saml2.saml import NAMEID_FORMAT_PERSISTENT
from saml2.server import Server
from saml2.s_utils import decode_base64_and_inflate
from saml2.time_util import in_a_while
@@ -62,10 +65,11 @@ class TestClient:
self.client = Saml2Client(conf)
def test_create_attribute_query1(self):
req = self.client.create_attribute_query("id1",
"E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
"https://idp.example.com/idp/",
nameid_format=saml.NAMEID_FORMAT_PERSISTENT)
req = self.client.create_attribute_query(
"https://idp.example.com/idp/",
"E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
nameid_format=saml.NAMEID_FORMAT_PERSISTENT,
id="id1")
reqstr = "%s" % req.to_string()
assert req.destination == "https://idp.example.com/idp/"
@@ -93,9 +97,9 @@ class TestClient:
assert attrq.subject.name_id.text == name_id.text
def test_create_attribute_query2(self):
req = self.client.create_attribute_query("id1",
"E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
req = self.client.create_attribute_query(
"https://idp.example.com/idp/",
"E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
attribute={
("urn:oid:2.5.4.42",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
@@ -106,7 +110,8 @@ class TestClient:
("urn:oid:1.2.840.113549.1.9.1",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri"):None,
},
nameid_format=saml.NAMEID_FORMAT_PERSISTENT)
nameid_format=saml.NAMEID_FORMAT_PERSISTENT,
id="id1")
print req.to_string()
assert req.destination == "https://idp.example.com/idp/"
@@ -133,13 +138,14 @@ class TestClient:
if getattr(attribute,"friendly_name"):
assert False
seen.append("email")
assert set(seen) == set(["givenName", "surname", "email"])
assert set(seen) == {"givenName", "surname", "email"}
def test_create_attribute_query_3(self):
req = self.client.create_attribute_query("id1",
"_e7b68a04488f715cda642fbdd90099f5",
req = self.client.create_attribute_query(
"https://aai-demo-idp.switch.ch/idp/shibboleth",
nameid_format=saml.NAMEID_FORMAT_TRANSIENT )
"_e7b68a04488f715cda642fbdd90099f5",
nameid_format=saml.NAMEID_FORMAT_TRANSIENT,
id="id1")
assert isinstance(req, samlp.AttributeQuery)
assert req.destination == "https://aai-demo-idp.switch.ch/idp/shibboleth"
@@ -152,13 +158,13 @@ class TestClient:
assert nameid.text == "_e7b68a04488f715cda642fbdd90099f5"
def test_attribute_query(self):
req = self.client.attribute_query(
"_e7b68a04488f715cda642fbdd90099f5",
"https://aai-demo-idp.switch.ch/idp/shibboleth",
resp = self.client.do_attribute_query(
"urn:mace:example.com:saml:roland:idp",
"_e7b68a04488f715cda642fbdd90099f5",
nameid_format=saml.NAMEID_FORMAT_TRANSIENT)
# since no one is answering on the other end
assert req is None
assert resp is None
# def test_idp_entry(self):
# idp_entry = self.client.idp_entry(name="Umeå Universitet",
@@ -179,19 +185,17 @@ class TestClient:
# assert idp_entry.loc == ['http://localhost:8088/sso']
def test_create_auth_request_0(self):
ar_str = "%s" % self.client.authn_request("id1",
ar_str = "%s" % self.client.create_authn_request(
"http://www.example.com/sso",
"http://www.example.org/service",
"urn:mace:example.org:saml:sp",
"My Name")
id="id1")
ar = samlp.authn_request_from_string(ar_str)
print ar
assert ar.assertion_consumer_service_url == "http://www.example.org/service"
assert ar.assertion_consumer_service_url == "http://lingon.catalogix.se:8087/"
assert ar.destination == "http://www.example.com/sso"
assert ar.protocol_binding == BINDING_HTTP_POST
assert ar.version == "2.0"
assert ar.provider_name == "My Name"
assert ar.issuer.text == "urn:mace:example.org:saml:sp"
assert ar.provider_name == "urn:mace:example.com:saml:roland:sp"
assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp"
nid_policy = ar.name_id_policy
assert nid_policy.allow_create == "true"
assert nid_policy.format == saml.NAMEID_FORMAT_TRANSIENT
@@ -200,36 +204,34 @@ class TestClient:
assert self.client.config.virtual_organization.keys() == [
"urn:mace:example.com:it:tek"]
ar_str = "%s" % self.client.authn_request("666",
ar_str = "%s" % self.client.create_authn_request(
"http://www.example.com/sso",
"http://www.example.org/service",
"urn:mace:example.org:saml:sp",
"My Name",
vorg="urn:mace:example.com:it:tek")
"urn:mace:example.com:it:tek", # vo
nameid_format=NAMEID_FORMAT_PERSISTENT,
id="666")
ar = samlp.authn_request_from_string(ar_str)
print ar
assert ar.id == "666"
assert ar.assertion_consumer_service_url == "http://www.example.org/service"
assert ar.assertion_consumer_service_url == "http://lingon.catalogix.se:8087/"
assert ar.destination == "http://www.example.com/sso"
assert ar.protocol_binding == BINDING_HTTP_POST
assert ar.version == "2.0"
assert ar.provider_name == "My Name"
assert ar.issuer.text == "urn:mace:example.org:saml:sp"
assert ar.provider_name == "urn:mace:example.com:saml:roland:sp"
assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp"
nid_policy = ar.name_id_policy
assert nid_policy.allow_create == "true"
assert nid_policy.allow_create == "false"
assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT
assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek"
def test_sign_auth_request_0(self):
#print self.client.config
ar_str = "%s" % self.client.authn_request("id1",
ar_str = "%s" % self.client.create_authn_request(
"http://www.example.com/sso",
"http://www.example.org/service",
"urn:mace:example.org:saml:sp",
"My Name", sign=True)
sign=True,
id="id1")
ar = samlp.authn_request_from_string(ar_str)
assert ar
@@ -251,17 +253,20 @@ class TestClient:
def test_response(self):
IDP = "urn:mace:example.com:saml:roland:idp"
ava = { "givenName": ["Derek"], "surname": ["Jeter"],
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
resp_str = "\n".join(self.server.authn_response(
identity=ava,
in_response_to="id1",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
name_id_policy=samlp.NameIDPolicy(
format=saml.NAMEID_FORMAT_PERSISTENT),
userid="foba0001@example.com"))
nameid_policy=samlp.NameIDPolicy(allow_create="false",
format=saml.NAMEID_FORMAT_PERSISTENT)
resp = self.server.create_authn_response(identity=ava,
in_response_to="id1",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
name_id_policy=nameid_policy,
userid="foba0001@example.com")
resp_str = "%s" % resp
resp_str = base64.encodestring(resp_str)
@@ -274,7 +279,9 @@ class TestClient:
session_info = authn_response.session_info()
print session_info
assert session_info["ava"] == {'mail': ['derek@nyy.mlb.com'], 'givenName': ['Derek'], 'sn': ['Jeter']}
assert session_info["ava"] == {'mail': ['derek@nyy.mlb.com'],
'givenName': ['Derek'],
'surName': ['Jeter']}
assert session_info["issuer"] == IDP
assert session_info["came_from"] == "http://foo.example.com/service"
response = samlp.response_from_string(authn_response.xmlstr)
@@ -289,17 +296,16 @@ class TestClient:
# --- authenticate another person
ava = { "givenName": ["Alfonson"], "surname": ["Soriano"],
ava = { "givenName": ["Alfonson"], "surName": ["Soriano"],
"mail": ["alfonson@chc.mlb.com"]}
resp_str = "\n".join(self.server.authn_response(
identity=ava,
in_response_to="id2",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
name_id_policy=samlp.NameIDPolicy(
format=saml.NAMEID_FORMAT_PERSISTENT),
userid="also0001@example.com"))
resp_str = "%s" % self.server.create_authn_response(
identity=ava,
in_response_to="id2",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
name_id_policy=nameid_policy,
userid="also0001@example.com")
resp_str = base64.encodestring(resp_str)
@@ -317,7 +323,6 @@ class TestClient:
entityid = self.client.config.entityid
print entityid
assert entityid == "urn:mace:example.com:saml:roland:sp"
print self.client.config.idp
print self.client.config.metadata.idps()
print self.client.config.idps()
location = self.client._sso_location()
@@ -332,10 +337,9 @@ class TestClient:
def test_authenticate(self):
print self.client.config.idps()
(sid, response) = self.client.authenticate(
response = self.client.do_authenticate(
"urn:mace:example.com:saml:roland:idp",
"http://www.example.com/relay_state")
assert sid is not None
assert response[0] == "Location"
o = urlparse(response[1])
qdict = parse_qs(o.query)
@@ -343,13 +347,11 @@ class TestClient:
saml_request = decode_base64_and_inflate(qdict["SAMLRequest"][0])
print saml_request
authnreq = samlp.authn_request_from_string(saml_request)
assert authnreq.id == sid
def test_authenticate_no_args(self):
(sid, request) = self.client.authenticate(relay_state="http://www.example.com/relay_state")
assert sid is not None
assert request[0] == "Location"
o = urlparse(request[1])
response = self.client.do_authenticate(relay_state="http://www.example.com/relay_state")
assert response[0] == "Location"
o = urlparse(response[1])
qdict = parse_qs(o.query)
assert _leq(qdict.keys(), ['SAMLRequest', 'RelayState'])
saml_request = decode_base64_and_inflate(qdict["SAMLRequest"][0])
@@ -357,14 +359,13 @@ class TestClient:
print saml_request
authnreq = samlp.authn_request_from_string(saml_request)
print authnreq.keyswv()
assert authnreq.id == sid
assert authnreq.destination == "http://localhost:8088/sso"
assert authnreq.assertion_consumer_service_url == "http://lingon.catalogix.se:8087/"
assert authnreq.provider_name == "urn:mace:example.com:saml:roland:sp"
assert authnreq.protocol_binding == BINDING_HTTP_POST
assert authnreq.protocol_binding == BINDING_HTTP_REDIRECT
name_id_policy = authnreq.name_id_policy
assert name_id_policy.allow_create == "true"
assert name_id_policy.format == "urn:oasis:names:tc:SAML:2.0:nameid-format:transient"
assert name_id_policy.allow_create == "false"
assert name_id_policy.format == NAMEID_FORMAT_PERSISTENT
issuer = authnreq.issuer
assert issuer.text == "urn:mace:example.com:saml:roland:sp"
@@ -386,7 +387,8 @@ class TestClient:
self.client.users.add_information_about_person(session_info)
entity_ids = self.client.users.issuers_of_info("123456")
assert entity_ids == ["urn:mace:example.com:saml:roland:idp"]
resp = self.client.global_logout("123456", "Tired", in_a_while(minutes=5))
resp = self.client.global_logout("123456", "Tired",
in_a_while(minutes=5))
print resp
assert resp
assert resp[0] # a session_id
@@ -401,7 +403,7 @@ class TestClient:
assert session_info["reason"] == "Tired"
assert session_info["operation"] == "SLO"
assert session_info["entity_ids"] == entity_ids
assert session_info["sign"] == False
assert session_info["sign"] == True
def test_logout_2(self):
""" one IdP/AA with BINDING_SOAP, can't actually send something"""
@@ -480,7 +482,7 @@ class TestClient:
assert state_info["reason"] == "Tired"
assert state_info["operation"] == "SLO"
assert state_info["entity_ids"] == entity_ids
assert state_info["sign"] == False
assert state_info["sign"] == True
def test_authz_decision_query(self):
conf = config.SPConfig()
@@ -503,7 +505,7 @@ class TestClient:
conf.attribute_converters,
policy, issuer=client._issuer())
adq = client.authz_decision_query_using_assertion("entity_id",
adq = client.create_authz_decision_query_using_assertion("entity_id",
assertion,
"read",
"http://example.com/text")
@@ -517,11 +519,14 @@ class TestClient:
def test_request_to_discovery_service(self):
disc_url = "http://example.com/saml2/idp/disc"
url = self.client.discovery_service_request_url(disc_url)
url = discovery_service_request_url("urn:mace:example.com:saml:roland:sp",
disc_url)
print url
assert url == "http://example.com/saml2/idp/disc?entityID=urn%3Amace%3Aexample.com%3Asaml%3Aroland%3Asp"
url = self.client.discovery_service_request_url(disc_url,
url = discovery_service_request_url(
self.client.config.entityid,
disc_url,
return_url= "http://example.org/saml2/sp/ds")
print url
@@ -532,15 +537,15 @@ class TestClient:
params = urllib.urlencode(pdir)
redirect_url = "http://example.com/saml2/sp/disc?%s" % params
entity_id = self.client.discovery_service_response(url=redirect_url)
entity_id = discovery_service_response(url=redirect_url)
assert entity_id == "http://example.org/saml2/idp/sso"
pdir = {"idpID": "http://example.org/saml2/idp/sso"}
params = urllib.urlencode(pdir)
redirect_url = "http://example.com/saml2/sp/disc?%s" % params
entity_id = self.client.discovery_service_response(url=redirect_url,
returnIDParam="idpID")
entity_id = discovery_service_response(url=redirect_url,
returnIDParam="idpID")
assert entity_id == "http://example.org/saml2/idp/sso"
@@ -559,17 +564,17 @@ class TestClient:
IDP = "urn:mace:example.com:saml:roland:idp"
ava = { "givenName": ["Derek"], "surname": ["Jeter"],
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
resp_str = "\n".join(self.server.authn_response(
identity=ava,
in_response_to="id1",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
name_id_policy=samlp.NameIDPolicy(
format=saml.NAMEID_FORMAT_PERSISTENT),
userid="foba0001@example.com"))
resp_str = "%s" % self.server.create_authn_response(
identity=ava,
in_response_to="id1",
destination="http://lingon.catalogix.se:8087/",
sp_entity_id="urn:mace:example.com:saml:roland:sp",
name_id_policy=samlp.NameIDPolicy(
format=saml.NAMEID_FORMAT_PERSISTENT),
userid="foba0001@example.com")
resp_str = base64.encodestring(resp_str)
@@ -582,7 +587,9 @@ class TestClient:
session_info = authn_response.session_info()
print session_info
assert session_info["ava"] == {'mail': ['derek@nyy.mlb.com'], 'givenName': ['Derek'], 'sn': ['Jeter']}
assert session_info["ava"] == {'mail': ['derek@nyy.mlb.com'],
'givenName': ['Derek'],
'surName': ['Jeter']}
assert session_info["issuer"] == IDP
assert session_info["came_from"] == ""
response = samlp.response_from_string(authn_response.xmlstr)

View File

@@ -2,6 +2,8 @@
# -*- coding: utf-8 -*-
import base64
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.samlp import NameIDPolicy
from s2repoze.plugins.sp import make_plugin
from saml2.server import Server
from saml2 import make_instance, samlp, saml
@@ -30,7 +32,9 @@ ENV1 = {'SERVER_SOFTWARE': 'CherryPy/3.1.2 WSGI Server',
'wsgi.multiprocess': False,
'HTTP_ACCEPT_LANGUAGE': 'en-us',
'HTTP_ACCEPT_ENCODING': 'gzip, deflate'}
trans_name_policy = NameIDPolicy(format=NAMEID_FORMAT_TRANSIENT,
allow_create="true")
class TestSP():
def setup_class(self):
self.sp = make_plugin("rem", saml_conf="server_conf")
@@ -42,15 +46,14 @@ class TestSP():
def test_identify(self):
# Create a SAMLResponse
ava = { "givenName": ["Derek"], "surname": ["Jeter"],
ava = { "givenName": ["Derek"], "surName": ["Jeter"],
"mail": ["derek@nyy.mlb.com"]}
resp_str = "\n".join(self.server.authn_response(ava,
"id1", "http://lingon.catalogix.se:8087/",
"urn:mace:example.com:saml:roland:sp",
samlp.NameIDPolicy(format=saml.NAMEID_FORMAT_TRANSIENT,
allow_create="true"),
"foba0001@example.com"))
resp_str = "%s" % self.server.create_authn_response(ava, "id1",
"http://lingon.catalogix.se:8087/",
"urn:mace:example.com:saml:roland:sp",
trans_name_policy,
"foba0001@example.com")
resp_str = base64.encodestring(resp_str)
self.sp.outstanding_queries = {"id1":"http://www.example.com/service"}
@@ -60,4 +63,4 @@ class TestSP():
assert session_info["came_from"] == 'http://www.example.com/service'
assert session_info["ava"] == {'givenName': ['Derek'],
'mail': ['derek@nyy.mlb.com'],
'sn': ['Jeter']}
'surName': ['Jeter']}

View File

@@ -186,7 +186,7 @@ def test_optional_attributes():
def test_do_sp_sso_descriptor():
conf = SPConfig().load(SP, metadata_construction=True)
spsso = metadata.do_sp_sso_descriptor(conf)
spsso = metadata.do_spsso_descriptor(conf)
assert isinstance(spsso, md.SPSSODescriptor)
assert _eq(spsso.keyswv(), ['authn_requests_signed',
@@ -215,7 +215,7 @@ def test_do_sp_sso_descriptor_2():
SP["service"]["sp"]["discovery_response"] = "http://example.com/sp/ds"
conf = SPConfig().load(SP, metadata_construction=True)
spsso = metadata.do_sp_sso_descriptor(conf)
spsso = metadata.do_spsso_descriptor(conf)
assert isinstance(spsso, md.SPSSODescriptor)
print spsso.keyswv()
@@ -242,7 +242,7 @@ def test_entity_description():
#confd = eval(open("../tests/server.config").read())
confd = SPConfig().load_file("server_conf")
print confd.attribute_converters
entd = metadata.entity_descriptor(confd, 1)
entd = metadata.entity_descriptor(confd)
assert entd is not None
print entd.keyswv()
assert _eq(entd.keyswv(), ['valid_until', 'entity_id', 'contact_person',
@@ -252,7 +252,7 @@ def test_entity_description():
def test_do_idp_sso_descriptor():
conf = IdPConfig().load(IDP, metadata_construction=True)
idpsso = metadata.do_idp_sso_descriptor(conf)
idpsso = metadata.do_idpsso_descriptor(conf)
assert isinstance(idpsso, md.IDPSSODescriptor)
assert _eq(idpsso.keyswv(), ['protocol_support_enumeration',

View File

@@ -108,7 +108,7 @@ def main(args):
if fil.endswith(".py"):
fil = fil[:-3]
cnf = Config().load_file(fil, metadata_construction=True)
eds.append(entity_descriptor(cnf, valid_for))
eds.append(entity_descriptor(cnf))
secc = SecurityContext(xmlsec, keyfile, cert_file=pubkeyfile)
if entitiesid:
@@ -118,7 +118,7 @@ def main(args):
else:
for eid in eds:
if sign:
desc = sign_entity_descriptor(eid, valid_for, id, secc)
desc = sign_entity_descriptor(eid, id, secc)
else:
desc = eid
valid_instance(desc)