Merge remote-tracking branch 'upstream/master'

# Conflicts:
#	setup.py
#	src/saml2/server.py
This commit is contained in:
Hans Hörberg
2015-11-19 10:48:35 +01:00
25 changed files with 500 additions and 326 deletions

View File

@@ -3,8 +3,8 @@ language: python
sudo: false
env:
- TOX_ENV=py27
- TOX_ENV=py34
- TOXENV=py27
- TOXENV=py34
addons:
apt:
@@ -14,5 +14,8 @@ addons:
services:
- mongodb
install:
- pip install -U tox
script:
- ./setup.py test
- tox

View File

@@ -143,16 +143,19 @@ class Service(object):
return resp(self.environ, self.start_response)
else:
kwargs = {}
try:
_encrypt_cert = encrypt_cert_from_item(
kwargs['encrypt_cert'] = encrypt_cert_from_item(
saml_msg["req_info"].message)
return self.do(saml_msg["SAMLRequest"], binding,
saml_msg["RelayState"],
encrypt_cert=_encrypt_cert, **kwargs)
except KeyError:
# Can live with no relay state
return self.do(saml_msg["SAMLRequest"], binding,
saml_msg["RelayState"], **kwargs)
pass
try:
kwargs['relay_state'] = saml_msg['RelayState']
except KeyError:
pass
return self.do(saml_msg["SAMLRequest"], binding, **kwargs)
def artifact_operation(self, saml_msg):
if not saml_msg:

View File

@@ -68,7 +68,7 @@ USERS = {
"ou": "IT",
"initials": "P",
#"schacHomeOrganization": "example.com",
"email": "roland@example.com",
"mail": "roland@example.com",
"displayName": "P. Roland Hedberg",
"labeledURL": "http://www.example.com/rohe My homepage",
"norEduPersonNIN": "SE197001012222"

View File

@@ -38,6 +38,7 @@ from saml2.httputil import NotImplemented
from saml2.response import StatusError
from saml2.response import VerificationError
from saml2.s_utils import UnknownPrincipal
from saml2.s_utils import decode_base64_and_inflate
from saml2.s_utils import UnsupportedBinding
from saml2.s_utils import sid
from saml2.s_utils import rndstr
@@ -634,8 +635,18 @@ class SLO(Service):
self.sp = sp
self.cache = cache
def do(self, response, binding, relay_state="", mtype="response"):
req_info = self.sp.parse_logout_request_response(response, binding)
def do(self, message, binding, relay_state="", mtype="response"):
try:
txt = decode_base64_and_inflate(message)
is_logout_request = 'LogoutRequest' in txt.split('>', 1)[0]
except: # TODO: parse the XML correctly
is_logout_request = False
if is_logout_request:
self.sp.parse_logout_request(message, binding)
else:
self.sp.parse_logout_request_response(message, binding)
return finish_logout(self.environ, self.start_response)
# ----------------------------------------------------------------------------

View File

@@ -6,21 +6,6 @@ import sys
from setuptools import setup
from setuptools.command.test import test as TestCommand
class PyTest(TestCommand):
def finalize_options(self):
TestCommand.finalize_options(self)
self.test_args = []
self.test_suite = True
def run_tests(self):
#import here, cause outside the eggs aren't loaded
import pytest
errno = pytest.main(self.test_args)
sys.exit(errno)
install_requires = [
# core dependencies
'decorator',
@@ -35,18 +20,6 @@ install_requires = [
'six'
]
tests_require = [
'mongodict',
'pyasn1',
'pymongo==3.0.1',
'python-memcached >= 1.51',
'pytest',
'mako',
'webob',
'mock'
#'pytest-coverage',
]
version = ''
with open('src/saml2/__init__.py', 'r') as fd:
version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
@@ -79,13 +52,6 @@ setup(
scripts=["tools/parse_xsd2.py", "tools/make_metadata.py",
"tools/mdexport.py", "tools/merge_metadata.py"],
tests_require=tests_require,
extras_require={
'testing': tests_require,
},
install_requires=install_requires,
zip_safe=False,
test_suite='tests',
cmdclass={'test': PyTest},
)

View File

@@ -979,7 +979,7 @@ def extension_elements_to_elements(extension_elements, schemas):
if isinstance(schemas, list):
pass
elif isinstance(schemas, dict):
schemas = schemas.values()
schemas = list(schemas.values())
else:
return res

View File

@@ -425,11 +425,19 @@ class AttributeConverter(object):
:return: An Attribute instance
"""
try:
_attr = self._to[attr]
except KeyError:
try:
_attr = self._to[attr.lower()]
except:
_attr = ''
if _attr:
return factory(saml.Attribute,
name=self._to[attr],
name=_attr,
name_format=self.name_format,
friendly_name=attr)
except KeyError:
else:
return factory(saml.Attribute, name=attr)
def from_format(self, attr):

View File

@@ -56,6 +56,7 @@ class Saml2Client(Base):
successfull log in.
:param binding: Which binding to use for sending the request
:param vorg: The entity_id of the virtual organization I'm a member of
:param nameid_format:
:param scoping: For which IdPs this query are aimed.
:param consent: Whether the principal have given her consent
:param extensions: Possible extensions
@@ -95,6 +96,7 @@ class Saml2Client(Base):
successfull log in.
:param binding: Which binding to use for sending the request
:param vorg: The entity_id of the virtual organization I'm a member of
:param nameid_format:
:param scoping: For which IdPs this query are aimed.
:param consent: Whether the principal have given her consent
:param extensions: Possible extensions

View File

@@ -155,6 +155,9 @@ class Base(Entity):
except IndexError:
raise IdpUnspecified("No IdP to send to given the premises")
def sso_location(self, entityid=None, binding=BINDING_HTTP_REDIRECT):
return self._sso_location(entityid, binding)
def _my_name(self):
return self.config.name

View File

@@ -215,10 +215,16 @@ class Entity(HTTPBase):
if binding == BINDING_HTTP_POST:
logger.info("HTTP POST")
# if self.entity_type == 'sp':
# info = self.use_http_post(msg_str, destination, relay_state,
# typ)
# info["url"] = destination
# info["method"] = "POST"
# else:
info = self.use_http_form_post(msg_str, destination,
relay_state, typ)
info["url"] = destination
info["method"] = "GET"
info["method"] = "POST"
elif binding == BINDING_HTTP_REDIRECT:
logger.info("HTTP REDIRECT")
info = self.use_http_get(msg_str, destination, relay_state, typ,

View File

@@ -1,10 +1,14 @@
__author__ = 'rolandh'
COC = "http://www.geant.net/uri/dataprotection-code-of-conduct/v1"
COCO = COC
RELEASE = {
"": ["eduPersonTargetedID"],
COC: ["eduPersonPrincipalName", "eduPersonScopedAffiliation", "mail",
"displayName", "schacHomeOrganization"]
# COC: ["eduPersonPrincipalName", "eduPersonScopedAffiliation", "mail",
# "displayName", "schacHomeOrganization"],
COCO: ["eduPersonPrincipalName", "eduPersonScopedAffiliation",
'eduPersonAffiliation', "mail", "displayName", 'cn',
"schacHomeOrganization", 'schacHomeOrganizationType']
}

View File

@@ -8,4 +8,3 @@ RELEASE = {
"eduPersonScopedAffiliation", "mail",
"givenName", "sn", "displayName"]
}

View File

@@ -11,6 +11,7 @@ from six.moves.http_cookies import SimpleCookie
from saml2.time_util import utc_now
from saml2 import class_name, SAMLError
from saml2.pack import http_form_post_message
from saml2.pack import http_post_message
from saml2.pack import make_soap_enveloped_saml_thingy
from saml2.pack import http_redirect_message
@@ -248,6 +249,23 @@ class HTTPBase(object):
return r
@staticmethod
def use_http_post(message, destination, relay_state,
typ="SAMLRequest"):
"""
Return a urlencoded message that should be POSTed to the recipient.
:param message: The response
:param destination: Where the response should be sent
:param relay_state: The relay_state received in the request
:param typ: Whether a Request, Response or Artifact
:return: dictionary
"""
if not isinstance(message, six.string_types):
message = "%s" % (message,)
return http_post_message(message, relay_state, typ)
@staticmethod
def use_http_form_post(message, destination, relay_state,
typ="SAMLRequest"):

View File

@@ -17,7 +17,6 @@ from saml2 import time_util
__author__ = 'rohe0002'
logger = logging.getLogger(__name__)
@@ -73,8 +72,8 @@ class Created(Response):
class Redirect(Response):
_template = '<html>\n<head><title>Redirecting to %s</title></head>\n' \
'<body>\nYou are being redirected to <a href="%s">%s</a>\n' \
'</body>\n</html>'
'<body>\nYou are being redirected to <a href="%s">%s</a>\n' \
'</body>\n</html>'
_status = '302 Found'
def __call__(self, environ, start_response, **kwargs):
@@ -86,8 +85,8 @@ class Redirect(Response):
class SeeOther(Response):
_template = '<html>\n<head><title>Redirecting to %s</title></head>\n' \
'<body>\nYou are being redirected to <a href="%s">%s</a>\n' \
'</body>\n</html>'
'<body>\nYou are being redirected to <a href="%s">%s</a>\n' \
'</body>\n</html>'
_status = '303 See Other'
def __call__(self, environ, start_response, **kwargs):
@@ -156,6 +155,7 @@ class HttpParameters():
except KeyError:
pass
def extract(environ, empty=False, err=False):
"""Extracts strings in form data and returns a dict.
@@ -266,7 +266,7 @@ def unpack_artifact(environ):
def unpack_any(environ):
if environ['REQUEST_METHOD'].upper() == 'GET':
# Could be either redirect or artifact
# Could be either redirect or artifact
_dict = unpack_redirect(environ)
if "ID" in _dict:
binding = BINDING_URI
@@ -307,7 +307,7 @@ def cookie_signature(seed, *parts):
return sha1.hexdigest()
def make_cookie(name, load, seed, expire=0, domain="", path="",
def make_cookie(name, load, seed, expire=0, domain="", path="",
timestamp=""):
"""
Create and return a cookie

View File

@@ -1,19 +1,20 @@
from __future__ import print_function
import hashlib
import logging
import os
import sys
import json
import six
import requests
import six
from hashlib import sha1
from os.path import isfile, join
from saml2.httpbase import HTTPBase
from saml2.extension.idpdisc import BINDING_DISCO
from saml2.extension.idpdisc import DiscoveryResponse
from saml2.md import EntitiesDescriptor
from saml2.mdie import to_dict
from saml2 import md
from saml2 import samlp
from saml2 import SAMLError
@@ -60,13 +61,27 @@ REQ2SRV = {
"discovery_service_request": "discovery_response"
}
ENTITYATTRIBUTES = "urn:oasis:names:tc:SAML:metadata:attribute&EntityAttributes"
ENTITY_CATEGORY = "http://macedir.org/entity-category"
ENTITY_CATEGORY_SUPPORT = "http://macedir.org/entity-category-support"
# ---------------------------------------------------
def load_extensions():
from saml2 import extension
import pkgutil
package = extension
prefix = package.__name__ + "."
ext_map = {}
for importer, modname, ispkg in pkgutil.iter_modules(package.__path__,
prefix):
module = __import__(modname, fromlist="dummy")
ext_map[module.NAMESPACE] = module
return ext_map
def destinations(srvs):
return [s["location"] for s in srvs]
@@ -121,6 +136,7 @@ class MetaData(object):
self.metadata = metadata
self.entity = None
self.cert = None
self.to_old = []
def items(self):
'''
@@ -313,7 +329,43 @@ class MetaData(object):
'''
Returns certificates for the given Entity
'''
raise NotImplementedError
ent = self[entity_id]
def extract_certs(srvs):
res = []
for srv in srvs:
if "key_descriptor" in srv:
for key in srv["key_descriptor"]:
if "use" in key and key["use"] == use:
for dat in key["key_info"]["x509_data"]:
cert = repack_cert(
dat["x509_certificate"]["text"])
if cert not in res:
res.append(cert)
elif not "use" in key:
for dat in key["key_info"]["x509_data"]:
cert = repack_cert(
dat["x509_certificate"]["text"])
if cert not in res:
res.append(cert)
return res
if descriptor == "any":
res = []
for descr in ["spsso", "idpsso", "role", "authn_authority",
"attribute_authority", "pdp"]:
try:
srvs = ent["%s_descriptor" % descr]
except KeyError:
continue
res.extend(extract_certs(srvs))
else:
srvs = ent["%s_descriptor" % descriptor]
res = extract_certs(srvs)
return res
class InMemoryMetaData(MetaData):
@@ -360,7 +412,8 @@ class InMemoryMetaData(MetaData):
try:
if not valid(entity_descr.valid_until):
logger.error("Entity descriptor (entity id:%s) to old",
entity_descr.entity_id)
entity_descr.entity_id)
self.to_old.append(entity_descr.entity_id)
return
except AttributeError:
pass
@@ -423,7 +476,8 @@ class InMemoryMetaData(MetaData):
try:
if not valid(self.entities_descr.valid_until):
raise ToOld(
"Metadata not valid anymore, it's only valid until %s" % (
"Metadata not valid anymore, it's only valid "
"until %s" % (
self.entities_descr.valid_until,))
except AttributeError:
pass
@@ -508,45 +562,6 @@ class InMemoryMetaData(MetaData):
return res
def certs(self, entity_id, descriptor, use="signing"):
ent = self.__getitem__(entity_id)
if descriptor == "any":
res = []
for descr in ["spsso", "idpsso", "role", "authn_authority",
"attribute_authority", "pdp"]:
try:
srvs = ent["%s_descriptor" % descr]
except KeyError:
continue
for srv in srvs:
for key in srv["key_descriptor"]:
if "use" in key and key["use"] == use:
for dat in key["key_info"]["x509_data"]:
cert = repack_cert(
dat["x509_certificate"]["text"])
if cert not in res:
res.append(cert)
elif not "use" in key:
for dat in key["key_info"]["x509_data"]:
cert = repack_cert(
dat["x509_certificate"]["text"])
if cert not in res:
res.append(cert)
else:
srvs = ent["%s_descriptor" % descriptor]
res = []
for srv in srvs:
for key in srv["key_descriptor"]:
if "use" in key and key["use"] == use:
for dat in key["key_info"]["x509_data"]:
res.append(dat["x509_certificate"]["text"])
elif not "use" in key:
for dat in key["key_info"]["x509_data"]:
res.append(dat["x509_certificate"]["text"])
return res
def signed(self):
if self.entities_descr and self.entities_descr.signature:
return True
@@ -564,8 +579,8 @@ class InMemoryMetaData(MetaData):
return True
node_name = self.node_name \
or "%s:%s" % (md.EntitiesDescriptor.c_namespace,
md.EntitiesDescriptor.c_tag)
or "%s:%s" % (md.EntitiesDescriptor.c_namespace,
md.EntitiesDescriptor.c_tag)
if self.security.verify_signature(
txt, node_name=node_name, cert_file=self.cert):
@@ -581,6 +596,7 @@ class MetaDataFile(InMemoryMetaData):
Handles Metadata file on the same machine. The format of the file is
the SAML Metadata format.
"""
def __init__(self, onts, attrc, filename=None, cert=None, **kwargs):
super(MetaDataFile, self).__init__(onts, attrc, **kwargs)
if not filename:
@@ -601,6 +617,7 @@ class MetaDataLoader(MetaDataFile):
Handles Metadata file loaded by a passed in function.
The format of the file is the SAML Metadata format.
"""
def __init__(self, onts, attrc, loader_callable, cert=None,
security=None, **kwargs):
super(MetaDataLoader, self).__init__(onts, attrc, **kwargs)
@@ -686,6 +703,7 @@ class MetaDataMD(InMemoryMetaData):
Handles locally stored metadata, the file format is the text representation
of the Python representation of the metadata.
"""
def __init__(self, onts, attrc, filename, **kwargs):
super(MetaDataMD, self).__init__(onts, attrc, **kwargs)
self.filename = filename
@@ -701,27 +719,32 @@ SAML_METADATA_CONTENT_TYPE = 'application/samlmetadata+xml'
class MetaDataMDX(InMemoryMetaData):
""" Uses the md protocol to fetch entity information
"""
def __init__(self, entity_transform, onts, attrc, url, security, cert,
http, **kwargs):
@staticmethod
def sha1_entity_transform(entity_id):
return "{{sha1}}{}".format(
hashlib.sha1(entity_id.encode("utf-8")).hexdigest())
def __init__(self, url, entity_transform=None):
"""
:params entity_transform: function transforming (e.g. base64 or sha1
:params url: mdx service url
:params entity_transform: function transforming (e.g. base64,
sha1 hash or URL quote
hash) the entity id. It is applied to the entity id before it is
concatenated with the request URL sent to the MDX server.
:params onts:
:params attrc:
:params url:
:params security: SecurityContext()
:params cert:
:params http:
concatenated with the request URL sent to the MDX server. Defaults to
sha1 transformation.
"""
super(MetaDataMDX, self).__init__(onts, attrc, **kwargs)
super(MetaDataMDX, self).__init__(None, None)
self.url = url
self.security = security
self.cert = cert
self.http = http
self.entity_transform = entity_transform
if entity_transform:
self.entity_transform = entity_transform
else:
self.entity_transform = MetaDataMDX.sha1_entity_transform
def load(self):
# Do nothing
pass
def __getitem__(self, item):
@@ -729,13 +752,9 @@ class MetaDataMDX(InMemoryMetaData):
return self.entity[item]
except KeyError:
mdx_url = "%s/entities/%s" % (self.url, self.entity_transform(item))
response = self.http.send(
mdx_url, headers={'Accept': SAML_METADATA_CONTENT_TYPE})
response = requests.get(mdx_url, headers={
'Accept': SAML_METADATA_CONTENT_TYPE})
if response.status_code == 200:
node_name = self.node_name \
or "%s:%s" % (md.EntitiesDescriptor.c_namespace,
md.EntitiesDescriptor.c_tag)
_txt = response.text.encode("utf-8")
if self.parse_and_check_signature(_txt):
@@ -744,8 +763,14 @@ class MetaDataMDX(InMemoryMetaData):
logger.info("Response status: %s", response.status_code)
raise KeyError
def single_sign_on_service(self, entity_id, binding=None, typ="idpsso"):
if binding is None:
binding = BINDING_HTTP_REDIRECT
return self.service(entity_id, "idpsso_descriptor",
"single_sign_on_service", binding)
class MetadataStore(object):
class MetadataStore(MetaData):
def __init__(self, onts, attrc, config, ca_certs=None,
check_validity=True,
disable_ssl_certificate_validation=False,
@@ -770,6 +795,7 @@ class MetadataStore(object):
self.metadata = {}
self.check_validity = check_validity
self.filter = filter
self.to_old = {}
def load(self, typ, *args, **kwargs):
if self.filter:
@@ -861,12 +887,15 @@ class MetadataStore(object):
for key in item['metadata']:
# Separately handle MetaDataFile and directory
if MDloader == MetaDataFile and os.path.isdir(key[0]):
files = [f for f in os.listdir(key[0]) if isfile(join(key[0], f))]
files = [f for f in os.listdir(key[0]) if
isfile(join(key[0], f))]
for fil in files:
_fil = join(key[0], fil)
_md = MetaDataFile(self.onts, self.attrc, _fil)
_md.load()
self.metadata[_fil] = _md
if _md.to_old:
self.to_old[_fil] = _md.to_old
return
if len(key) == 2:
@@ -875,11 +904,13 @@ class MetadataStore(object):
_md = MDloader(self.onts, self.attrc, key[0], **kwargs)
_md.load()
self.metadata[key[0]] = _md
if _md.to_old:
self.to_old[key[0]] = _md.to_old
def service(self, entity_id, typ, service, binding=None):
known_entity = False
logger.debug("service(%s, %s, %s, %s)", entity_id, typ, service,
binding)
binding)
for key, _md in self.metadata.items():
srvs = _md.service(entity_id, typ, service, binding)
if srvs:
@@ -1049,45 +1080,6 @@ class MetadataStore(object):
return name(_md[entity_id], langpref)
return None
def certs(self, entity_id, descriptor, use="signing"):
ent = self.__getitem__(entity_id)
if descriptor == "any":
res = []
for descr in ["spsso", "idpsso", "role", "authn_authority",
"attribute_authority", "pdp"]:
try:
srvs = ent["%s_descriptor" % descr]
except KeyError:
continue
for srv in srvs:
for key in srv["key_descriptor"]:
if "use" in key and key["use"] == use:
for dat in key["key_info"]["x509_data"]:
cert = repack_cert(
dat["x509_certificate"]["text"])
if cert not in res:
res.append(cert)
elif not "use" in key:
for dat in key["key_info"]["x509_data"]:
cert = repack_cert(
dat["x509_certificate"]["text"])
if cert not in res:
res.append(cert)
else:
srvs = ent["%s_descriptor" % descriptor]
res = []
for srv in srvs:
for key in srv["key_descriptor"]:
if "use" in key and key["use"] == use:
for dat in key["key_info"]["x509_data"]:
res.append(dat["x509_certificate"]["text"])
elif not "use" in key:
for dat in key["key_info"]["x509_data"]:
res.append(dat["x509_certificate"]["text"])
return res
def vo_members(self, entity_id):
ad = self.__getitem__(entity_id)["affiliation_descriptor"]
return [m["text"] for m in ad["affiliate_member"]]
@@ -1179,7 +1171,7 @@ class MetadataStore(object):
for ent_id, ent_desc in _md.items():
if descriptor in ent_desc:
if ent_id in res:
#print("duplicated entity_id: %s" % res)
# print("duplicated entity_id: %s" % res)
pass
else:
res.append(ent_id)
@@ -1214,4 +1206,3 @@ class MetadataStore(object):
return "%s" % res
elif format == "md":
return json.dumps(self.items(), indent=2)

View File

@@ -79,6 +79,32 @@ def http_form_post_message(message, location, relay_state="",
return {"headers": [("Content-type", "text/html")], "data": response}
def http_post_message(message, relay_state="", typ="SAMLRequest", **kwargs):
"""
:param message: The message
:param relay_state: for preserving and conveying state information
:return: A tuple containing header information and a HTML message.
"""
if not isinstance(message, six.string_types):
message = str(message)
if not isinstance(message, six.binary_type):
message = message.encode('utf-8')
if typ == "SAMLRequest" or typ == "SAMLResponse":
_msg = base64.b64encode(message)
else:
_msg = message
_msg = _msg.decode('ascii')
part = {typ: _msg}
if relay_state:
part["RelayState"] = relay_state
return {"headers": [("Content-type", 'application/x-www-form-urlencoded')],
"data": urlencode(part)}
def http_redirect_message(message, location, relay_state="", typ="SAMLRequest",
sigalg=None, key=None, **kwargs):
"""The HTTP Redirect binding defines a mechanism by which SAML protocol

View File

@@ -58,6 +58,7 @@ from saml2.validate import NotValid
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
@@ -160,9 +161,11 @@ class StatusUnknownPrincipal(StatusError):
class StatusUnsupportedBinding(StatusError):
pass
class StatusResponder(StatusError):
pass
STATUSCODE2EXCEPTION = {
STATUS_VERSION_MISMATCH: StatusVersionMismatch,
STATUS_AUTHN_FAILED: StatusAuthnFailed,
@@ -186,6 +189,8 @@ STATUSCODE2EXCEPTION = {
STATUS_UNSUPPORTED_BINDING: StatusUnsupportedBinding,
STATUS_RESPONDER: StatusResponder,
}
# ---------------------------------------------------------------------------
@@ -206,7 +211,8 @@ def for_me(conditions, myself):
if audience.text.strip() == myself:
return True
else:
#print("Not for me: %s != %s" % (audience.text.strip(), myself))
# print("Not for me: %s != %s" % (audience.text.strip(),
# myself))
pass
return False
@@ -336,7 +342,7 @@ class StatusResponse(object):
logger.exception("EXCEPTION: %s", excp)
raise
#print("<", self.response)
# print("<", self.response)
return self._postamble()
@@ -377,7 +383,7 @@ class StatusResponse(object):
if self.request_id and self.in_response_to and \
self.in_response_to != self.request_id:
logger.error("Not the id I expected: %s != %s",
self.in_response_to, self.request_id)
self.in_response_to, self.request_id)
return None
try:
@@ -391,9 +397,9 @@ class StatusResponse(object):
if self.asynchop:
if self.response.destination and \
self.response.destination not in self.return_addrs:
self.response.destination not in self.return_addrs:
logger.error("%s not in %s", self.response.destination,
self.return_addrs)
self.return_addrs)
return None
assert self.issue_instant_ok()
@@ -436,7 +442,7 @@ class NameIDMappingResponse(StatusResponse):
request_id=0, asynchop=True):
StatusResponse.__init__(self, sec_context, return_addrs, timeslack,
request_id, asynchop)
self.signature_check = self.sec\
self.signature_check = self.sec \
.correctly_signed_name_id_mapping_response
@@ -506,7 +512,7 @@ class AuthnResponse(StatusResponse):
if self.asynchop:
if self.in_response_to in self.outstanding_queries:
self.came_from = self.outstanding_queries[self.in_response_to]
#del self.outstanding_queries[self.in_response_to]
# del self.outstanding_queries[self.in_response_to]
try:
if not self.check_subject_confirmation_in_response_to(
self.in_response_to):
@@ -632,12 +638,12 @@ class AuthnResponse(StatusResponse):
def read_attribute_statement(self, attr_statem):
logger.debug("Attribute Statement: %s", attr_statem)
for aconv in self.attribute_converters:
logger.debug("Converts name format: %s", aconv.name_format)
# for aconv in self.attribute_converters:
# logger.debug("Converts name format: %s", aconv.name_format)
self.decrypt_attributes(attr_statem)
return to_local(self.attribute_converters, attr_statem,
self.allow_unknown_attributes)
self.allow_unknown_attributes)
def get_identity(self):
""" The assertion can contain zero or one attributeStatements
@@ -650,7 +656,8 @@ class AuthnResponse(StatusResponse):
for tmp_assertion in _assertion.advice.assertion:
if tmp_assertion.attribute_statement:
assert len(tmp_assertion.attribute_statement) == 1
ava.update(self.read_attribute_statement(tmp_assertion.attribute_statement[0]))
ava.update(self.read_attribute_statement(
tmp_assertion.attribute_statement[0]))
if _assertion.attribute_statement:
assert len(_assertion.attribute_statement) == 1
_attr_statem = _assertion.attribute_statement[0]
@@ -681,7 +688,7 @@ class AuthnResponse(StatusResponse):
if data.in_response_to in self.outstanding_queries:
self.came_from = self.outstanding_queries[
data.in_response_to]
#del self.outstanding_queries[data.in_response_to]
# del self.outstanding_queries[data.in_response_to]
elif self.allow_unsolicited:
pass
else:
@@ -690,7 +697,7 @@ class AuthnResponse(StatusResponse):
# recognize
logger.debug("in response to: '%s'", data.in_response_to)
logger.info("outstanding queries: %s",
self.outstanding_queries.keys())
self.outstanding_queries.keys())
raise Exception(
"Combination of session id and requestURI I don't "
"recall")
@@ -768,7 +775,8 @@ class AuthnResponse(StatusResponse):
logger.debug("signed")
if not verified and self.do_not_verify is False:
try:
self.sec.check_signature(assertion, class_name(assertion),self.xmlstr)
self.sec.check_signature(assertion, class_name(assertion),
self.xmlstr)
except Exception as exc:
logger.error("correctly_signed_response: %s", exc)
raise
@@ -778,10 +786,10 @@ class AuthnResponse(StatusResponse):
logger.debug("assertion keys: %s", assertion.keyswv())
logger.debug("outstanding_queries: %s", self.outstanding_queries)
#if self.context == "AuthnReq" or self.context == "AttrQuery":
# if self.context == "AuthnReq" or self.context == "AttrQuery":
if self.context == "AuthnReq":
self.authn_statement_ok()
# elif self.context == "AttrQuery":
# elif self.context == "AttrQuery":
# self.authn_statement_ok(True)
if not self.condition_ok():
@@ -789,7 +797,7 @@ class AuthnResponse(StatusResponse):
logger.debug("--- Getting Identity ---")
#if self.context == "AuthnReq" or self.context == "AttrQuery":
# if self.context == "AuthnReq" or self.context == "AttrQuery":
# self.ava = self.get_identity()
# logger.debug("--- AVA: %s", self.ava)
@@ -805,13 +813,17 @@ class AuthnResponse(StatusResponse):
logger.exception("get subject")
raise
def decrypt_assertions(self, encrypted_assertions, decr_txt, issuer=None, verified=False):
""" Moves the decrypted assertion from the encrypted assertion to a list.
def decrypt_assertions(self, encrypted_assertions, decr_txt, issuer=None,
verified=False):
""" Moves the decrypted assertion from the encrypted assertion to a
list.
:param encrypted_assertions: A list of encrypted assertions.
:param decr_txt: The string representation containing the decrypted data. Used when verifying signatures.
:param decr_txt: The string representation containing the decrypted
data. Used when verifying signatures.
:param issuer: The issuer of the response.
:param verified: If True do not verify signatures, otherwise verify the signature if it exists.
:param verified: If True do not verify signatures, otherwise verify
the signature if it exists.
:return: A list of decrypted assertions.
"""
res = []
@@ -824,7 +836,8 @@ class AuthnResponse(StatusResponse):
if not self.sec.check_signature(
assertion, origdoc=decr_txt,
node_name=class_name(assertion), issuer=issuer):
logger.error("Failed to verify signature on '%s'", assertion)
logger.error("Failed to verify signature on '%s'",
assertion)
raise SignatureError()
res.append(assertion)
return res
@@ -836,11 +849,12 @@ class AuthnResponse(StatusResponse):
:return: True encrypted data exists otherwise false.
"""
for _assertion in enc_assertions:
if _assertion.encrypted_data is not None:
return True
if _assertion.encrypted_data is not None:
return True
def find_encrypt_data_assertion_list(self, _assertions):
""" Verifies if a list of assertions contains encrypted data in the advice element.
""" Verifies if a list of assertions contains encrypted data in the
advice element.
:param _assertions: A list of assertions.
:return: True encrypted data exists otherwise false.
@@ -848,12 +862,14 @@ class AuthnResponse(StatusResponse):
for _assertion in _assertions:
if _assertion.advice:
if _assertion.advice.encrypted_assertion:
res = self.find_encrypt_data_assertion(_assertion.advice.encrypted_assertion)
res = self.find_encrypt_data_assertion(
_assertion.advice.encrypted_assertion)
if res:
return True
def find_encrypt_data(self, resp):
""" Verifies if a saml response contains encrypted assertions with encrypted data.
""" Verifies if a saml response contains encrypted assertions with
encrypted data.
:param resp: A saml response.
:return: True encrypted data exists otherwise false.
@@ -867,7 +883,8 @@ class AuthnResponse(StatusResponse):
for tmp_assertion in resp.assertion:
if tmp_assertion.advice:
if tmp_assertion.advice.encrypted_assertion:
res = self.find_encrypt_data_assertion(tmp_assertion.advice.encrypted_assertion)
res = self.find_encrypt_data_assertion(
tmp_assertion.advice.encrypted_assertion)
if res:
return True
return False
@@ -875,7 +892,8 @@ class AuthnResponse(StatusResponse):
def parse_assertion(self, keys=None):
""" Parse the assertions for a saml response.
:param keys: A string representing a RSA key or a list of strings containing RSA keys.
:param keys: A string representing a RSA key or a list of strings
containing RSA keys.
:return: True if the assertions are parsed otherwise False.
"""
if self.context == "AuthnQuery":
@@ -884,12 +902,13 @@ class AuthnResponse(StatusResponse):
else: # This is a saml2int limitation
try:
assert len(self.response.assertion) == 1 or \
len(self.response.encrypted_assertion) == 1
len(self.response.encrypted_assertion) == 1
except AssertionError:
raise Exception("No assertion part")
has_encrypted_assertions = self.find_encrypt_data(self.response) #self.response.encrypted_assertion
#if not has_encrypted_assertions and self.response.assertion:
has_encrypted_assertions = self.find_encrypt_data(self.response) #
# self.response.encrypted_assertion
# if not has_encrypted_assertions and self.response.assertion:
# for tmp_assertion in self.response.assertion:
# if tmp_assertion.advice:
# if tmp_assertion.advice.encrypted_assertion:
@@ -912,15 +931,20 @@ class AuthnResponse(StatusResponse):
decr_text_old = decr_text
decr_text = self.sec.decrypt_keys(decr_text, keys)
resp = samlp.response_from_string(decr_text)
_enc_assertions = self.decrypt_assertions(resp.encrypted_assertion, decr_text)
_enc_assertions = self.decrypt_assertions(resp.encrypted_assertion,
decr_text)
decr_text_old = None
while (self.find_encrypt_data(resp) or self.find_encrypt_data_assertion_list(_enc_assertions)) and \
while (self.find_encrypt_data(
resp) or self.find_encrypt_data_assertion_list(
_enc_assertions)) and \
decr_text_old != decr_text:
decr_text_old = decr_text
decr_text = self.sec.decrypt_keys(decr_text, keys)
resp = samlp.response_from_string(decr_text)
_enc_assertions = self.decrypt_assertions(resp.encrypted_assertion, decr_text, verified=True)
#_enc_assertions = self.decrypt_assertions(resp.encrypted_assertion, decr_text, verified=True)
_enc_assertions = self.decrypt_assertions(
resp.encrypted_assertion, decr_text, verified=True)
# _enc_assertions = self.decrypt_assertions(
# resp.encrypted_assertion, decr_text, verified=True)
all_assertions = _enc_assertions
if resp.assertion:
all_assertions = all_assertions + resp.assertion
@@ -928,9 +952,10 @@ class AuthnResponse(StatusResponse):
for tmp_ass in all_assertions:
if tmp_ass.advice and tmp_ass.advice.encrypted_assertion:
advice_res = self.decrypt_assertions(tmp_ass.advice.encrypted_assertion,
decr_text,
tmp_ass.issuer)
advice_res = self.decrypt_assertions(
tmp_ass.advice.encrypted_assertion,
decr_text,
tmp_ass.issuer)
if tmp_ass.advice.assertion:
tmp_ass.advice.assertion.extend(advice_res)
else:
@@ -1211,7 +1236,7 @@ class AssertionIDResponse(object):
logger.exception("EXCEPTION: %s", excp)
raise
#print("<", self.response)
# print("<", self.response)
return self._postamble()
@@ -1233,4 +1258,3 @@ class AssertionIDResponse(object):
logger.debug("response: %s", self.response)
return self

View File

@@ -57,6 +57,7 @@ AUTHN_DICT_MAP = {
"subject_locality": "subject_locality"
}
def _shelve_compat(name, *args, **kwargs):
try:
return shelve.open(name, *args, **kwargs)
@@ -132,7 +133,7 @@ class Server(Entity):
elif isinstance(dbspec, six.string_types):
idb = _shelve_compat(dbspec, writeback=True, protocol=2)
else: # database spec is a a 2-tuple (type, address)
#print(>> sys.stderr, "DBSPEC: %s" % (dbspec,))
# print(>> sys.stderr, "DBSPEC: %s" % (dbspec,))
(typ, addr) = dbspec
if typ == "shelve":
idb = _shelve_compat(addr, writeback=True, protocol=2)
@@ -289,8 +290,10 @@ class Server(Entity):
# ------------------------------------------------------------------------
def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url, name_id, policy, _issuer,
authn_statement, identity, best_effort, sign_response, add_subject=True):
def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url,
name_id, policy, _issuer,
authn_statement, identity, best_effort, sign_response,
add_subject=True):
ast = Assertion(identity)
ast.acs = self.config.getattr("attribute_converters", "idp")
if policy is None:
@@ -305,25 +308,29 @@ class Server(Entity):
if authn: # expected to be a dictionary
# Would like to use dict comprehension but ...
authn_args = dict([
(AUTHN_DICT_MAP[k], v) for k, v in authn.items()
if k in AUTHN_DICT_MAP])
(AUTHN_DICT_MAP[k], v) for k, v in
authn.items()
if k in AUTHN_DICT_MAP])
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.config.attribute_converters,
policy, issuer=_issuer, add_subject=add_subject,
policy, issuer=_issuer,
add_subject=add_subject,
**authn_args)
elif authn_statement: # Got a complete AuthnStatement
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.config.attribute_converters,
policy, issuer=_issuer,
authn_statem=authn_statement, add_subject=add_subject)
authn_statem=authn_statement,
add_subject=add_subject)
else:
assertion = ast.construct(sp_entity_id, in_response_to,
consumer_url, name_id,
self.config.attribute_converters,
policy, issuer=_issuer, add_subject=add_subject)
policy, issuer=_issuer,
add_subject=add_subject)
return assertion
def _authn_response(self, in_response_to, consumer_url,
@@ -332,8 +339,8 @@ class Server(Entity):
sign_assertion=False, sign_response=False,
best_effort=False, encrypt_assertion=False,
encrypt_cert_advice=None, encrypt_cert_assertion=None, authn_statement=None,
encrypt_assertion_self_contained=False, encrypted_advice_attributes=False, pefim=False,
sign_alg=None, digest_alg=None):
encrypt_assertion_self_contained=False, encrypted_advice_attributes=False,
pefim=False, sign_alg=None, digest_alg=None):
""" Create a response. A layer of indirection.
:param in_response_to: The session identifier of the request
@@ -352,23 +359,27 @@ class Server(Entity):
:param best_effort: Even if not the SPs demands can be met send a
response.
:param encrypt_assertion: True if assertions should be encrypted.
:param encrypt_assertion_self_contained: True if all encrypted assertions should have alla namespaces
:param encrypt_assertion_self_contained: True if all encrypted
assertions should have alla namespaces
selfcontained.
:param encrypted_advice_attributes: True if assertions in the advice element should be encrypted.
:param encrypt_cert_advice: Certificate to be used for encryption of assertions in the advice element.
:param encrypt_cert_assertion: Certificate to be used for encryption of assertions.
:param encrypted_advice_attributes: True if assertions in the advice
element should be encrypted.
:param encrypt_cert_advice: Certificate to be used for encryption of
assertions in the advice element.
:param encrypt_cert_assertion: Certificate to be used for encryption
of assertions.
:param authn_statement: Authentication statement.
:param sign_assertion: True if assertions should be signed.
:param pefim: True if a response according to the PEFIM profile should be created.
:param pefim: True if a response according to the PEFIM profile
should be created.
:return: A response instance
"""
to_sign = []
args = {}
#if identity:
# if identity:
_issuer = self._issuer(issuer)
#if encrypt_assertion and show_nameid:
# if encrypt_assertion and show_nameid:
# tmp_name_id = name_id
# name_id = None
# name_id = None
@@ -380,19 +391,27 @@ class Server(Entity):
if pefim:
encrypted_advice_attributes = True
encrypt_assertion_self_contained = True
assertion_attributes = self.setup_assertion(None, sp_entity_id, None, None, None, policy,
None, None, identity, best_effort, sign_response, False)
assertion = self.setup_assertion(authn, sp_entity_id, in_response_to, consumer_url,
name_id, policy, _issuer, authn_statement, [], True,
assertion_attributes = self.setup_assertion(None, sp_entity_id,
None, None, None,
policy,
None, None, identity,
best_effort,
sign_response, False)
assertion = self.setup_assertion(authn, sp_entity_id,
in_response_to, consumer_url,
name_id, policy, _issuer,
authn_statement, [], True,
sign_response)
assertion.advice = saml.Advice()
#assertion.advice.assertion_id_ref.append(saml.AssertionIDRef())
#assertion.advice.assertion_uri_ref.append(saml.AssertionURIRef())
# assertion.advice.assertion_id_ref.append(saml.AssertionIDRef())
# assertion.advice.assertion_uri_ref.append(saml.AssertionURIRef())
assertion.advice.assertion.append(assertion_attributes)
else:
assertion = self.setup_assertion(authn, sp_entity_id, in_response_to, consumer_url,
name_id, policy, _issuer, authn_statement, identity, True,
assertion = self.setup_assertion(authn, sp_entity_id,
in_response_to, consumer_url,
name_id, policy, _issuer,
authn_statement, identity, True,
sign_response)
to_sign = []
@@ -402,27 +421,13 @@ class Server(Entity):
sign_alg=sign_alg, digest_alg=digest_alg)
to_sign.append((class_name(assertion), assertion.id))
#if not encrypted_advice_attributes:
# if sign_assertion:
# if assertion.advice and assertion.advice.assertion:
# for tmp_assertion in assertion.advice.assertion:
# tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1)
# to_sign.append((class_name(tmp_assertion), tmp_assertion.id))
# Store which assertion that has been sent to which SP about which
# subject.
# self.cache.set(assertion.subject.name_id.text,
# sp_entity_id, {"ava": identity, "authn": authn},
# assertion.conditions.not_on_or_after)
args["assertion"] = assertion
if (self.support_AssertionIDRequest() or self.support_AuthnQuery()):
self.session_db.store_assertion(assertion, to_sign)
return self._response(in_response_to, consumer_url, status, issuer,
sign_response, to_sign,sp_entity_id=sp_entity_id,
sign_response, to_sign, sp_entity_id=sp_entity_id,
encrypt_assertion=encrypt_assertion,
encrypt_cert_advice=encrypt_cert_advice,
encrypt_cert_assertion=encrypt_cert_assertion,
@@ -434,7 +439,7 @@ class Server(Entity):
# ------------------------------------------------------------------------
#noinspection PyUnusedLocal
# noinspection PyUnusedLocal
def create_attribute_response(self, identity, in_response_to, destination,
sp_entity_id, userid="", name_id=None,
status=None, issuer=None,
@@ -504,7 +509,9 @@ class Server(Entity):
sp_entity_id, name_id_policy=None, userid=None,
name_id=None, authn=None, issuer=None,
sign_response=None, sign_assertion=None,
encrypt_cert_advice=None, encrypt_cert_assertion=None, encrypt_assertion=None,
encrypt_cert_advice=None,
encrypt_cert_assertion=None,
encrypt_assertion=None,
encrypt_assertion_self_contained=True,
encrypted_advice_attributes=False, pefim=False, sign_alg=None, digest_alg=None,
**kwargs):
@@ -524,13 +531,18 @@ class Server(Entity):
:param sign_assertion: Whether the assertion should be signed or not.
:param sign_response: Whether the response should be signed or not.
:param encrypt_assertion: True if assertions should be encrypted.
:param encrypt_assertion_self_contained: True if all encrypted assertions should have alla namespaces
:param encrypt_assertion_self_contained: True if all encrypted
assertions should have alla namespaces
selfcontained.
:param encrypted_advice_attributes: True if assertions in the advice element should be encrypted.
:param encrypt_cert_advice: Certificate to be used for encryption of assertions in the advice element.
:param encrypt_cert_assertion: Certificate to be used for encryption of assertions.
:param encrypted_advice_attributes: True if assertions in the advice
element should be encrypted.
:param encrypt_cert_advice: Certificate to be used for encryption of
assertions in the advice element.
:param encrypt_cert_assertion: Certificate to be used for encryption
of assertions.
:param sign_assertion: True if assertions should be signed.
:param pefim: True if a response according to the PEFIM profile should be created.
:param pefim: True if a response according to the PEFIM profile
should be created.
:return: A response instance
"""
@@ -559,34 +571,43 @@ class Server(Entity):
if encrypt_assertion is None:
encrypt_assertion = False
if encrypt_assertion_self_contained is None:
encrypt_assertion_self_contained = self.config.getattr("encrypt_assertion_self_contained", "idp")
encrypt_assertion_self_contained = self.config.getattr(
"encrypt_assertion_self_contained", "idp")
if encrypt_assertion_self_contained is None:
encrypt_assertion_self_contained = True
if encrypted_advice_attributes is None:
encrypted_advice_attributes = self.config.getattr("encrypted_advice_attributes", "idp")
encrypted_advice_attributes = self.config.getattr(
"encrypted_advice_attributes", "idp")
if encrypted_advice_attributes is None:
encrypted_advice_attributes = False
if encrypted_advice_attributes or pefim:
verify_encrypt_cert = self.config.getattr("verify_encrypt_cert_advice", "idp")
verify_encrypt_cert = self.config.getattr(
"verify_encrypt_cert_advice", "idp")
if verify_encrypt_cert is not None:
if encrypt_cert_advice is None:
raise CertificateError("No SPCertEncType certificate for encryption contained in authentication "
"request.")
raise CertificateError(
"No SPCertEncType certificate for encryption "
"contained in authentication "
"request.")
if not verify_encrypt_cert(encrypt_cert_advice):
raise CertificateError("Invalid certificate for encryption!")
raise CertificateError(
"Invalid certificate for encryption!")
if encrypt_assertion:
verify_encrypt_cert = self.config.getattr("verify_encrypt_cert_assertion", "idp")
verify_encrypt_cert = self.config.getattr(
"verify_encrypt_cert_assertion", "idp")
if verify_encrypt_cert is not None:
if encrypt_cert_assertion is None:
raise CertificateError("No SPCertEncType certificate for encryption contained in authentication "
"request.")
raise CertificateError(
"No SPCertEncType certificate for encryption "
"contained in authentication "
"request.")
if not verify_encrypt_cert(encrypt_cert_assertion):
raise CertificateError("Invalid certificate for encryption!")
raise CertificateError(
"Invalid certificate for encryption!")
if not name_id:
try:
@@ -628,13 +649,17 @@ class Server(Entity):
try:
_authn = authn
if (sign_assertion or sign_response) and self.sec.cert_handler.generate_cert():
if (
sign_assertion or sign_response) and \
self.sec.cert_handler.generate_cert():
with self.lock:
self.sec.cert_handler.update_cert(True)
return self._authn_response(in_response_to, # in_response_to
return self._authn_response(in_response_to,
# in_response_to
destination, # consumer_url
sp_entity_id, # sp_entity_id
identity, # identity as dictionary
identity,
# identity as dictionary
name_id,
authn=_authn,
issuer=issuer,
@@ -686,8 +711,8 @@ class Server(Entity):
authn_decl=authn_decl)
#noinspection PyUnusedLocal
def create_assertion_id_request_response(self, assertion_id, sign=False, sign_alg=None, digest_alg=None,
**kwargs):
def create_assertion_id_request_response(self, assertion_id, sign=False, sign_alg=None,
digest_alg=None, **kwargs):
"""
:param assertion_id:
@@ -709,7 +734,7 @@ class Server(Entity):
else:
return assertion
#noinspection PyUnusedLocal
# noinspection PyUnusedLocal
def create_name_id_mapping_response(self, name_id=None, encrypted_id=None,
in_response_to=None,
issuer=None, sign_response=False,

View File

@@ -301,6 +301,10 @@ not_on_or_after = before
valid = before
def utc_time_sans_frac():
return int("%d" % time.mktime(time.gmtime()))
def later_than(after, before):
""" True if then is later or equal to that """
if isinstance(after, six.string_types):

View File

@@ -36,7 +36,6 @@ CONFIG = {
# }
}
},
"subject_data": full_path("subject_data_3.db"),
#"domain": "umu.se",
#"name_qualifier": ""
},

View File

@@ -45,7 +45,6 @@ CONFIG = {
# }
}
},
"subject_data": full_path("subject_data_2.db"),
#"domain": "umu.se",
#"name_qualifier": ""
},

View File

@@ -2,14 +2,15 @@
# -*- coding: utf-8 -*-
import datetime
import re
from six.moves.urllib.parse import quote_plus
#from six.moves.urllib.parse import quote_plus
from future.backports.urllib.parse import quote_plus
from saml2.config import Config
from saml2.httpbase import HTTPBase
from saml2.mdstore import MetadataStore, MetaDataMDX
from saml2.mdstore import MetadataStore
from saml2.mdstore import MetaDataMDX
from saml2.mdstore import SAML_METADATA_CONTENT_TYPE
from saml2.mdstore import destinations
from saml2.mdstore import load_extensions
from saml2.mdstore import name
from saml2 import md
from saml2 import sigver
from saml2 import BINDING_SOAP
@@ -20,21 +21,32 @@ from saml2 import saml
from saml2 import config
from saml2.attribute_converter import ac_factory
from saml2.attribute_converter import d_to_local_name
from saml2.extension import mdui
from saml2.extension import idpdisc
from saml2.extension import dri
from saml2.extension import mdattr
from saml2.extension import ui
from saml2.s_utils import UnknownPrincipal
from saml2 import xmldsig
from saml2 import xmlenc
from pathutils import full_path
import responses
sec_config = config.Config()
# sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
TEST_CERT = """MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy
3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN
efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G
A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs
iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw
mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6
h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5
U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6
mrPzGzk3ECbupFnqyREH3+ZPSdk="""
TEST_METADATA_STRING = """
<EntitiesDescriptor
xmlns="urn:oasis:names:tc:SAML:2.0:metadata"
@@ -51,21 +63,8 @@ TEST_METADATA_STRING = """
<ds:KeyInfo>
<ds:X509Data>
<ds:X509Certificate>
MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy
3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN
efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G
A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs
iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw
mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6
h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5
U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6
mrPzGzk3ECbupFnqyREH3+ZPSdk=</ds:X509Certificate>
{cert_data}
</ds:X509Certificate>
</ds:X509Data>
</ds:KeyInfo>
</KeyDescriptor>
@@ -85,20 +84,17 @@ TEST_METADATA_STRING = """
</ContactPerson>
</EntityDescriptor>
</EntitiesDescriptor>
"""
""".format(cert_data=TEST_CERT)
ONTS = {
saml.NAMESPACE: saml,
mdui.NAMESPACE: mdui,
mdattr.NAMESPACE: mdattr,
dri.NAMESPACE: dri,
ui.NAMESPACE: ui,
idpdisc.NAMESPACE: idpdisc,
md.NAMESPACE: md,
xmldsig.NAMESPACE: xmldsig,
xmlenc.NAMESPACE: xmlenc
}
ONTS.update(load_extensions())
ATTRCONV = ac_factory(full_path("attributemaps"))
METADATACONF = {
@@ -149,7 +145,11 @@ METADATACONF = {
}],
"11": [{
"class": "saml2.mdstore.InMemoryMetaData",
"metadata": [(TEST_METADATA_STRING, )]
"metadata": [(TEST_METADATA_STRING,)]
}],
"12": [{
"class": "saml2.mdstore.MetaDataFile",
"metadata": [(full_path("uu.xml"),)],
}],
}
@@ -304,6 +304,36 @@ def test_metadata_file():
assert len(mds.keys()) == 560
@responses.activate
def test_mdx_service():
entity_id = "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"
url = "http://mdx.example.com/entities/{}".format(
quote_plus(MetaDataMDX.sha1_entity_transform(entity_id)))
responses.add(responses.GET, url, body=TEST_METADATA_STRING, status=200,
content_type=SAML_METADATA_CONTENT_TYPE)
mdx = MetaDataMDX("http://mdx.example.com")
sso_loc = mdx.service(entity_id, "idpsso_descriptor", "single_sign_on_service")
assert sso_loc[BINDING_HTTP_REDIRECT][0]["location"] == "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"
certs = mdx.certs(entity_id, "idpsso")
assert len(certs) == 1
@responses.activate
def test_mdx_single_sign_on_service():
entity_id = "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"
url = "http://mdx.example.com/entities/{}".format(
quote_plus(MetaDataMDX.sha1_entity_transform(entity_id)))
responses.add(responses.GET, url, body=TEST_METADATA_STRING, status=200,
content_type=SAML_METADATA_CONTENT_TYPE)
mdx = MetaDataMDX("http://mdx.example.com")
sso_loc = mdx.single_sign_on_service(entity_id, BINDING_HTTP_REDIRECT)
assert sso_loc[0]["location"] == "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"
# pyff-test not available
# def test_mdx_service():
# sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
@@ -372,7 +402,7 @@ def test_load_string():
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["11"])
#print(mds)
# print(mds)
assert len(mds.keys()) == 1
idps = mds.with_descriptor("idpsso")
@@ -384,5 +414,58 @@ def test_load_string():
assert len(certs) == 1
def test_get_certs_from_metadata():
mds = MetadataStore(ONTS.values(), ATTRCONV, None)
mds.imp(METADATACONF["11"])
certs1 = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "any")
certs2 = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "idpsso")
assert certs1[0] == certs2[0] == TEST_CERT
def test_get_certs_from_metadata_without_keydescriptor():
mds = MetadataStore(ONTS.values(), ATTRCONV, None)
mds.imp([{
"class": "saml2.mdstore.InMemoryMetaData",
"metadata": [("""
<EntitiesDescriptor
xmlns="urn:oasis:names:tc:SAML:2.0:metadata"
xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata"
xmlns:shibmeta="urn:mace:shibboleth:metadata:1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ds="http://www.w3.org/2000/09/xmldsig#"
Name="urn:mace:example.com:test-1.0">
<EntityDescriptor
entityID="http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"
xml:base="swamid-1.0/idp.umu.se-saml2.xml">
<IDPSSODescriptor protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<NameIDFormat>urn:oasis:names:tc:SAML:2.0:nameid-format:transient</NameIDFormat>
<SingleSignOnService
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
Location="http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"/>
</IDPSSODescriptor>
<Organization>
<OrganizationName xml:lang="en">Catalogix</OrganizationName>
<OrganizationDisplayName xml:lang="en">Catalogix</OrganizationDisplayName>
<OrganizationURL xml:lang="en">http://www.catalogix.se</OrganizationURL>
</Organization>
<ContactPerson contactType="technical">
<SurName>Hedberg</SurName>
<EmailAddress>datordrift@catalogix.se</EmailAddress>
</ContactPerson>
</EntityDescriptor>
</EntitiesDescriptor>""",)]
}])
certs = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "idpsso")
assert len(certs) == 0
def test_metadata_extension_algsupport():
mds = MetadataStore(list(ONTS.values()), ATTRCONV, None)
mds.imp(METADATACONF["12"])
mdf = mds.metadata[full_path("uu.xml")]
_txt = mdf.dumps()
assert mds
if __name__ == "__main__":
test_load_local()
test_metadata_extension_algsupport()

View File

@@ -0,0 +1,2 @@
pymongo==3.0.1
responses==0.5.0

View File

@@ -67,9 +67,10 @@ secc = security_context(conf)
if args.id:
desc, xmldoc = entities_descriptor(eds, valid_for, args.name, args.id,
args.sign, secc)
args.sign, secc)
valid_instance(desc)
print(desc.to_string(nspair))
xmldoc = metadata_tostring_fix(desc, nspair, xmldoc)
print(xmldoc.decode("utf-8"))
else:
for eid in eds:
if args.sign:

View File

@@ -2,9 +2,6 @@
envlist = py27,py34
[testenv]
basepython =
py27: python2.7
py34: python3.4
deps =
pytest
deps = pytest
-rtests/test_requirements.txt
commands = py.test tests/