Metadata is represented as dictionaries instead of class instances.
Added checks.
This commit is contained in:
@@ -1,11 +1,11 @@
|
||||
import inspect
|
||||
import sys
|
||||
import traceback
|
||||
from saml2.md import EntitiesDescriptor
|
||||
from saml2.saml import NAMEID_FORMAT_TRANSIENT, NAMEID_FORMAT_PERSISTENT
|
||||
from saml2.saml import NAME_FORMAT_URI
|
||||
from saml2.sigver import cert_from_key_info
|
||||
from saml2.sigver import key_from_key_value
|
||||
from saml2.samlp import STATUS_SUCCESS
|
||||
from saml2.sigver import cert_from_key_info_dict
|
||||
from saml2.sigver import key_from_key_value_dict
|
||||
|
||||
__author__ = 'rolandh'
|
||||
|
||||
@@ -85,6 +85,18 @@ class WrapException(CriticalError):
|
||||
self._message = traceback.format_exception(*sys.exc_info())
|
||||
return {}
|
||||
|
||||
class InteractionNeeded(CriticalError):
|
||||
"""
|
||||
A Webpage was displayed for which no known interaction is defined.
|
||||
"""
|
||||
id = "interaction-needed"
|
||||
msg = "Unexpected page"
|
||||
|
||||
def _func(self, environ=None):
|
||||
self._status = self.status
|
||||
self._message = None
|
||||
return {"url": environ["url"]}
|
||||
|
||||
class CheckHTTPResponse(CriticalError):
|
||||
"""
|
||||
Checks that the HTTP response status is within the 200 or 300 range
|
||||
@@ -93,7 +105,7 @@ class CheckHTTPResponse(CriticalError):
|
||||
msg = "IdP error"
|
||||
|
||||
def _func(self, environ):
|
||||
_response = environ["response"]
|
||||
_response = environ["response"][-1]
|
||||
|
||||
res = {}
|
||||
if _response.status_code >= 400 :
|
||||
@@ -115,31 +127,32 @@ class CheckSaml2IntMetaData(Check):
|
||||
def verify_key_info(self, ki):
|
||||
# key_info
|
||||
# one or more key_value and/or x509_data.X509Certificate
|
||||
try:
|
||||
assert ki.key_value or ki.x509_data
|
||||
except AssertionError:
|
||||
|
||||
xkeys = cert_from_key_info_dict(ki)
|
||||
vkeys = key_from_key_value_dict(ki)
|
||||
|
||||
if xkeys or vkeys:
|
||||
pass
|
||||
else:
|
||||
self._message = "Missing KeyValue or X509Data.X509Certificate"
|
||||
self._status = CRITICAL
|
||||
return False
|
||||
|
||||
xkeys = cert_from_key_info(ki)
|
||||
vkeys = key_from_key_value(ki)
|
||||
|
||||
if xkeys and vkeys:
|
||||
# verify that it's the same keys
|
||||
# verify that it's the same keys TODO
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
def verify_key_descriptor(self, kd):
|
||||
# key_info
|
||||
if not self.verify_key_info(kd.key_info):
|
||||
if not self.verify_key_info(kd["key_info"]):
|
||||
return False
|
||||
|
||||
# use
|
||||
if kd.use:
|
||||
if "use" in kd:
|
||||
try:
|
||||
assert kd.use in ["encryption", "signing"]
|
||||
assert kd["use"] in ["encryption", "signing"]
|
||||
except AssertionError:
|
||||
self._message = "Unknown use specification: '%s'" % kd.use.text
|
||||
self._status = CRITICAL
|
||||
@@ -148,41 +161,46 @@ class CheckSaml2IntMetaData(Check):
|
||||
return True
|
||||
|
||||
def _func(self, environ):
|
||||
if isinstance(environ["metadata"], EntitiesDescriptor):
|
||||
ed = environ["metadata"].entity_descriptor[0]
|
||||
else:
|
||||
ed = environ["metadata"]
|
||||
|
||||
mds = environ["metadata"].metadata[0]
|
||||
# Should only be one
|
||||
ed = mds.entity.values()[0]
|
||||
res = {}
|
||||
|
||||
assert len(ed.idpsso_descriptor)
|
||||
idpsso = ed.idpsso_descriptor[0]
|
||||
for kd in idpsso.key_descriptor:
|
||||
if self.verify_key_descriptor(kd) == False:
|
||||
assert len(ed["idpsso_descriptor"])
|
||||
idpsso = ed["idpsso_descriptor"][0]
|
||||
for kd in idpsso["key_descriptor"]:
|
||||
if not self.verify_key_descriptor(kd):
|
||||
return res
|
||||
|
||||
# contact person
|
||||
if not idpsso.contact_person:
|
||||
if "contact_person" not in idpsso and "contact_person" not in ed:
|
||||
self._message = "Metadata should contain contact person information"
|
||||
self._status = WARNING
|
||||
return res
|
||||
else:
|
||||
item = {"support": False, "technical": False}
|
||||
for contact in idpsso.contact_person:
|
||||
try:
|
||||
item[contact.contact_type] = True
|
||||
except KeyError:
|
||||
pass
|
||||
if "contact_person" in idpsso:
|
||||
for contact in idpsso["contact_person"]:
|
||||
try:
|
||||
item[contact["contact_type"]] = True
|
||||
except KeyError:
|
||||
pass
|
||||
if "contact_person" in ed:
|
||||
for contact in ed["contact_person"]:
|
||||
try:
|
||||
item[contact["contact_type"]] = True
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
if item["support"] and item["technical"]:
|
||||
if "support" in item and "technical" in item:
|
||||
pass
|
||||
elif not item["support"] and not item["technical"]:
|
||||
elif "support" not in item and "technical" not in item:
|
||||
self._message = "Missing technical and support contact information"
|
||||
self._status = WARNING
|
||||
elif item["support"]:
|
||||
elif "technical" not in item:
|
||||
self._message = "Missing technical contact information"
|
||||
self._status = WARNING
|
||||
elif item["technical"]:
|
||||
elif "support" not in item:
|
||||
self._message = "Missing support contact information"
|
||||
self._status = WARNING
|
||||
|
||||
@@ -190,16 +208,16 @@ class CheckSaml2IntMetaData(Check):
|
||||
return res
|
||||
|
||||
# NameID format
|
||||
if not idpsso.nameid_format:
|
||||
if "name_id_format" not in idpsso:
|
||||
self._message = "Metadata should specify NameID format support"
|
||||
self._status = WARNING
|
||||
return res
|
||||
else:
|
||||
# should support Transient
|
||||
item = {NAMEID_FORMAT_TRANSIENT:False}
|
||||
for format in idpsso.nameid_format:
|
||||
for format in idpsso["name_id_format"]:
|
||||
try:
|
||||
item[format] = True
|
||||
item[format["text"]] = True
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
@@ -220,7 +238,7 @@ class CheckSaml2IntAttributes(Check):
|
||||
msg = "Attribute error"
|
||||
|
||||
def _func(self, environ):
|
||||
response = environ["response"]
|
||||
response = environ["response"][-1]
|
||||
try:
|
||||
opaque_identifier = environ["opaque_identifier"]
|
||||
except KeyError:
|
||||
@@ -261,7 +279,7 @@ class CheckSaml2IntAttributes(Check):
|
||||
try:
|
||||
assert assertion.subject.name_id.format == NAMEID_FORMAT_PERSISTENT
|
||||
except AssertionError:
|
||||
self._message = "NameID format should be TRANSIENT"
|
||||
self._message = "NameID format should be PERSISTENT"
|
||||
self._status = WARNING
|
||||
|
||||
if name_format_not_specified:
|
||||
@@ -286,11 +304,11 @@ class CheckSubjectNameIDFormat(Check):
|
||||
of identifier, subject to any additional constraints due to the content of
|
||||
this element or the policies of the identity provider or principal.
|
||||
"""
|
||||
id = "check-saml2int-attributes"
|
||||
id = "check-saml2int-nameid-format"
|
||||
msg = "Attribute error"
|
||||
|
||||
def _func(self, environ):
|
||||
response = environ["response"]
|
||||
response = environ["response"][-1]
|
||||
request = environ["request"]
|
||||
|
||||
res ={}
|
||||
@@ -314,21 +332,59 @@ class CheckLogoutSupport(Check):
|
||||
msg = "Does not support logout"
|
||||
|
||||
def _func(self, environ):
|
||||
if isinstance(environ["metadata"], EntitiesDescriptor):
|
||||
ed = environ["metadata"].entity_descriptor[0]
|
||||
else:
|
||||
ed = environ["metadata"]
|
||||
mds = environ["metadata"].metadata[0]
|
||||
# Should only be one
|
||||
ed = mds.entity.values()[0]
|
||||
|
||||
assert len(ed.idpsso_descriptor)
|
||||
idpsso = ed.idpsso_descriptor[0]
|
||||
assert len(ed["idpsso_descriptor"])
|
||||
|
||||
idpsso = ed["idpsso_descriptor"][0]
|
||||
try:
|
||||
assert idpsso.single_logout_service
|
||||
assert idpsso["single_logout_service"]
|
||||
except AssertionError:
|
||||
self._message = self.msg
|
||||
self._status = CRITICAL
|
||||
|
||||
return {}
|
||||
|
||||
class VerifyLogout(Check):
|
||||
id = "verify-logout"
|
||||
msg = "Logout failed"
|
||||
|
||||
def _func(self, environ):
|
||||
# Check that the logout response says it was a success
|
||||
resp = environ["response"][-1]
|
||||
status = resp.response.status
|
||||
try:
|
||||
assert status.status_code.value == STATUS_SUCCESS
|
||||
except AssertionError:
|
||||
self._message = self.msg
|
||||
self._status = CRITICAL
|
||||
|
||||
# Check that there are no valid cookies
|
||||
# should only result in a warning
|
||||
httpc = environ["httpc"]
|
||||
try:
|
||||
assert httpc.cookies(environ["destination"]) == {}
|
||||
except AssertionError:
|
||||
self._message = "Remaining cookie ?"
|
||||
self._status = WARNING
|
||||
|
||||
return {}
|
||||
|
||||
class VerifyContent(Check):
|
||||
""" Basic content verification class, does required and max/min checks
|
||||
"""
|
||||
id = "verify-content"
|
||||
|
||||
def _func(self, environ):
|
||||
try:
|
||||
environ["response"][-1].response.verify()
|
||||
except ValueError:
|
||||
self._status = CRITICAL
|
||||
|
||||
return {}
|
||||
|
||||
def factory(id):
|
||||
for name, obj in inspect.getmembers(sys.modules[__name__]):
|
||||
if inspect.isclass(obj):
|
||||
|
||||
Reference in New Issue
Block a user