So the configuration file can be somewhere else than from where the script is run.

This commit is contained in:
Roland Hedberg
2013-01-23 14:13:48 +01:00
parent 7721d21629
commit c0535602c3
4 changed files with 117 additions and 20 deletions

View File

@@ -111,6 +111,8 @@ class SAML2client(object):
help="List all the test flows as a JSON object")
self._parser.add_argument("-c", dest="spconfig", default="config_file",
help="Configuration file for the SP")
self._parser.add_argument("-P", dest="configpath", default=".",
help="Path to the configuration file for the SP")
self._parser.add_argument("oper", nargs="?", help="Which test to run")
self.interactions = None
@@ -124,7 +126,7 @@ class SAML2client(object):
return json.loads(open(self.args.json_config_file).read())
def sp_configure(self, metadata_construction=False):
sys.path.insert(0, ".")
sys.path.insert(0, self.args.configpath)
mod = import_module(self.args.spconfig)
self.sp_config = SPConfig().load(mod.CONFIG, metadata_construction)
@@ -154,7 +156,7 @@ class SAML2client(object):
self.entity_id = md.entity.keys()[0]
else:
raise Exception("Don't know which entity to talk to")
def test_summation(self, id):
status = 0
for item in self.test_log:

View File

@@ -175,6 +175,18 @@ def do_sequence(config, oper, httpc, trace, interaction, entity_id,
"rp": cookielib.CookieJar(),
"service": cookielib.CookieJar()}
try:
for test in oper["tests"]["pre"]:
chk = test()
stat = chk(environ, test_output)
try:
check_severity(stat, trace)
except FatalError:
environ["FatalError"] = True
raise
except KeyError:
pass
environ["FatalError"] = False
for op in oper["sequence"]:
output = do_query(client, op(), httpc, trace, interaction, entity_id,
@@ -182,6 +194,19 @@ def do_sequence(config, oper, httpc, trace, interaction, entity_id,
test_output.extend(output)
if environ["FatalError"]:
break
try:
for test in oper["tests"]["post"]:
chk = test()
stat = chk(environ, test_output)
try:
check_severity(stat, trace)
except FatalError:
environ["FatalError"] = True
raise
except KeyError:
pass
return test_output, "%s" % trace
@@ -203,6 +228,7 @@ def do_query(client, oper, httpc, trace, interaction, entity_id, environ, cjar,
oper.setup(environ)
query = oper.request
args = oper.args
environ["oper.args"] = oper.args
args["entity_id"] = entity_id
test_output = []
@@ -231,21 +257,32 @@ def do_query(client, oper, httpc, trace, interaction, entity_id, environ, cjar,
use_artifact = getattr(oper, "use_artifact", False)
qfunc = getattr(client, "create_%s" % query)
# remove args the create function can't handle
fargs = inspect.getargspec(qfunc).args
for arg in qargs.keys():
if arg not in fargs:
del qargs[arg]
if "message" not in oper.args:
qfunc = getattr(client, "create_%s" % query)
# remove args the create function can't handle
fargs = inspect.getargspec(qfunc).args
if oper._class:
fargs.extend([p for p,c,r in oper._class.c_attributes.values()])
fargs.extend([p for p,c in oper._class.c_children.values()])
for arg in qargs.keys():
if arg not in fargs:
del qargs[arg]
for srv in srvs:
loc = srv["location"]
qargs["destination"] = loc
environ["destination"] = loc
req = qfunc(**qargs)
try:
req = oper.args["message"]
except KeyError:
req = qfunc(**qargs)
req = oper.modify_message(req)
environ["request"] = req
_req_str = "%s" % req
if use_artifact:
saml_art = client.use_artifact(_req_str, args["entity_id"])
trace.info("SAML Artifact: %s" % saml_art)

View File

@@ -403,6 +403,32 @@ class VerifySuccessStatus(Check):
return {}
class VerifyNameIDPolicyUsage(Check):
"""
Verify the nameID in the response is according to the provided
NameIDPolicy
"""
id = "verify-name-id-policy-usage"
def _func(self, environ):
response = environ["response"][-1].response
nip = environ["oper.args"]["name_id_policy"]
for assertion in response.assertion:
nid = assertion.subject.name_id
if nip.format:
try:
assert nid.format == nip.format
except AssertionError:
self._message = "Wrong NameID Format"
self._status = WARNING
if nip.sp_name_qualifier:
try:
assert nid.sp_name_qualifier == nip.sp_name_qualifier
except AssertionError:
self._message = "Wrong SPNameQualifier"
self._status = WARNING
return {}
def factory(id):
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj):

View File

@@ -1,19 +1,23 @@
from saml2 import BINDING_HTTP_REDIRECT, BINDING_URI
from saml2 import BINDING_HTTP_REDIRECT, BINDING_URI, samlp
from saml2 import BINDING_SOAP
from saml2 import BINDING_HTTP_POST
from saml2.saml import NAMEID_FORMAT_PERSISTENT
#from idp_test.check import CheckSubjectNameIDFormat
from idp_test.check import CheckSaml2IntMetaData
from idp_test.check import VerifyNameIDPolicyUsage
from idp_test.check import CheckSaml2IntAttributes
from idp_test.check import CheckLogoutSupport
from idp_test.check import VerifyLogout
from idp_test.check import VerifyContent
from idp_test.check import VerifySuccessStatus
from saml2.samlp import NameIDPolicy
__author__ = 'rolandh'
class Request(object):
_args = {}
_class = None
tests = {"post":[VerifyContent], "pre":[]}
def __init__(self):
@@ -22,13 +26,17 @@ class Request(object):
def setup(self, environ):
pass
class Saml2IntRequest(Request):
tests = {"pre": [CheckSaml2IntMetaData],
"post": [CheckSaml2IntAttributes, VerifyContent
# CheckSubjectNameIDFormat,
]}
def modify_message(self, message):
return message
class AuthnRequest(Saml2IntRequest):
#class Saml2IntRequest(Request):
# tests = {"pre": [],
# "post": [CheckSaml2IntAttributes, VerifyContent
# # CheckSubjectNameIDFormat,
# ]}
class AuthnRequest(Request):
_class = samlp.AuthnRequest
request = "authn_request"
_args = {"binding": BINDING_HTTP_REDIRECT,
"nameid_format": NAMEID_FORMAT_PERSISTENT,
@@ -45,16 +53,15 @@ class AuthnRequest_using_Artifact(AuthnRequest):
AuthnRequest.__init__(self)
self.use_artifact = True
class LogOutRequest(Saml2IntRequest):
class LogOutRequest(Request):
request = "logout_request"
_args = {"binding": BINDING_SOAP,
# "sign": True
}
def __init__(self):
Saml2IntRequest.__init__(self)
Request.__init__(self)
self.tests["pre"].append(CheckLogoutSupport)
self.tests["post"].remove(CheckSaml2IntAttributes)
self.tests["post"].append(VerifyLogout)
def setup(self, environ):
@@ -99,6 +106,18 @@ class NameIDMappeingRequest(Request):
assertion = resp.assertion[0]
self.args["subject"] = assertion.subject
class AuthnRequest_NameIDPolicy1(AuthnRequest):
request = "authn_request"
_args = {"binding": BINDING_HTTP_REDIRECT,
"name_id_policy": NameIDPolicy(format=NAMEID_FORMAT_PERSISTENT,
sp_name_qualifier="Group1",
allow_create="true"),
"allow_create": True}
def __init__(self):
AuthnRequest.__init__(self)
self.tests["post"].append(VerifyNameIDPolicyUsage)
# -----------------------------------------------------------------------------
OPERATIONS = {
@@ -106,16 +125,21 @@ OPERATIONS = {
"name": 'Absolute basic SAML2 AuthnRequest',
"descr": ('AuthnRequest using HTTP-redirect'),
"sequence": [AuthnRequest],
"tests": {"pre": [CheckSaml2IntMetaData],
"post": [CheckSaml2IntAttributes]}
},
'basic-authn-post': {
"name": 'Basic SAML2 AuthnRequest using HTTP POST',
"descr": ('AuthnRequest using HTTP-POST'),
"sequence": [AuthnRequestPost],
"tests": {"pre": [CheckSaml2IntMetaData],
"post": [CheckSaml2IntAttributes]}
},
'log-in-out': {
"name": 'Absolute basic SAML2 log in and out',
"descr": ('AuthnRequest using HTTP-redirect followed by a logout'),
"sequence": [AuthnRequest, LogOutRequest],
"tests": {"pre": [CheckSaml2IntMetaData], "post": []}
},
# 'authn-artifact':{
# "name": "SAML2 AuthnRequest using an artifact",
@@ -126,10 +150,18 @@ OPERATIONS = {
"name": 'AuthnRequest and then an AuthnQuery',
"descr": ('AuthnRequest followed by an AuthnQuery'),
"sequence": [AuthnRequest, AuthnQuery],
"tests": {"pre": [CheckSaml2IntMetaData], "post": []}
},
'authn-assertion_id_request': {
"name": 'AuthnRequest and then an AssertionIDRequest',
"descr": ('AuthnRequest followed by an AssertionIDRequest'),
"sequence": [AuthnRequest, AssertionIDRequest],
}
"tests": {"pre": [CheckSaml2IntMetaData], "post": []}
},
'authn-with-name_id_policy': {
"name": 'SAML2 AuthnRequest with specific NameIDPolicy',
"descr": ('AuthnRequest with specific NameIDPolicy'),
"sequence": [AuthnRequest_NameIDPolicy1],
"tests": {"pre": [CheckSaml2IntMetaData], "post": []}
},
}