Fixed handling of signed and then encrypted response assertions. At the same time added support for dealing with any combination of encrypted/non-encrypted assertions.

This commit is contained in:
Roland Hedberg
2014-04-14 14:19:19 +02:00
parent fca906e9b0
commit a6ef115ff7
11 changed files with 633 additions and 380 deletions

View File

@@ -543,6 +543,103 @@ class EntityCategories(object):
pass
def _authn_context_class_ref(authn_class, authn_auth=None):
"""
Construct the authn context with a authn context class reference
:param authn_class: The authn context class reference
:param authn_auth: Authenticating Authority
:return: An AuthnContext instance
"""
cntx_class = factory(saml.AuthnContextClassRef, text=authn_class)
if authn_auth:
return factory(saml.AuthnContext,
authn_context_class_ref=cntx_class,
authenticating_authority=factory(
saml.AuthenticatingAuthority, text=authn_auth))
else:
return factory(saml.AuthnContext,
authn_context_class_ref=cntx_class)
def _authn_context_decl(decl, authn_auth=None):
"""
Construct the authn context with a authn context declaration
:param decl: The authn context declaration
:param authn_auth: Authenticating Authority
:return: An AuthnContext instance
"""
return factory(saml.AuthnContext,
authn_context_decl=decl,
authenticating_authority=factory(
saml.AuthenticatingAuthority, text=authn_auth))
def _authn_context_decl_ref(decl_ref, authn_auth=None):
"""
Construct the authn context with a authn context declaration reference
:param decl_ref: The authn context declaration reference
:param authn_auth: Authenticating Authority
:return: An AuthnContext instance
"""
return factory(saml.AuthnContext,
authn_context_decl_ref=decl_ref,
authenticating_authority=factory(
saml.AuthenticatingAuthority, text=authn_auth))
def authn_statement(authn_class=None, authn_auth=None,
authn_decl=None, authn_decl_ref=None, authn_instant="",
subject_locality=""):
"""
Construct the AuthnStatement
:param authn_class: Authentication Context Class reference
:param authn_auth: Authenticating Authority
:param authn_decl: Authentication Context Declaration
:param authn_decl_ref: Authentication Context Declaration reference
:param authn_instant: When the Authentication was performed.
Assumed to be seconds since the Epoch.
:param subject_locality: Specifies the DNS domain name and IP address
for the system from which the assertion subject was apparently
authenticated.
:return: An AuthnContext instance
"""
if authn_instant:
_instant = instant(time_stamp=authn_instant)
else:
_instant = instant()
if authn_class:
res = factory(
saml.AuthnStatement,
authn_instant=_instant,
session_index=sid(),
authn_context=_authn_context_class_ref(
authn_class, authn_auth))
elif authn_decl:
res = factory(
saml.AuthnStatement,
authn_instant=_instant,
session_index=sid(),
authn_context=_authn_context_decl(authn_decl, authn_auth))
elif authn_decl_ref:
res = factory(
saml.AuthnStatement,
authn_instant=_instant,
session_index=sid(),
authn_context=_authn_context_decl_ref(authn_decl_ref,
authn_auth))
else:
res = factory(
saml.AuthnStatement,
authn_instant=_instant,
session_index=sid())
if subject_locality:
res.subject_locality = saml.SubjectLocality(text=subject_locality)
return res
class Assertion(dict):
""" Handles assertions about subjects """
@@ -550,101 +647,6 @@ class Assertion(dict):
dict.__init__(self, dic)
self.acs = []
@staticmethod
def _authn_context_decl(decl, authn_auth=None):
"""
Construct the authn context with a authn context declaration
:param decl: The authn context declaration
:param authn_auth: Authenticating Authority
:return: An AuthnContext instance
"""
return factory(saml.AuthnContext,
authn_context_decl=decl,
authenticating_authority=factory(
saml.AuthenticatingAuthority, text=authn_auth))
def _authn_context_decl_ref(self, decl_ref, authn_auth=None):
"""
Construct the authn context with a authn context declaration reference
:param decl_ref: The authn context declaration reference
:param authn_auth: Authenticating Authority
:return: An AuthnContext instance
"""
return factory(saml.AuthnContext,
authn_context_decl_ref=decl_ref,
authenticating_authority=factory(
saml.AuthenticatingAuthority, text=authn_auth))
@staticmethod
def _authn_context_class_ref(authn_class, authn_auth=None):
"""
Construct the authn context with a authn context class reference
:param authn_class: The authn context class reference
:param authn_auth: Authenticating Authority
:return: An AuthnContext instance
"""
cntx_class = factory(saml.AuthnContextClassRef, text=authn_class)
if authn_auth:
return factory(saml.AuthnContext,
authn_context_class_ref=cntx_class,
authenticating_authority=factory(
saml.AuthenticatingAuthority, text=authn_auth))
else:
return factory(saml.AuthnContext,
authn_context_class_ref=cntx_class)
def _authn_statement(self, authn_class=None, authn_auth=None,
authn_decl=None, authn_decl_ref=None, authn_instant="",
subject_locality=""):
"""
Construct the AuthnStatement
:param authn_class: Authentication Context Class reference
:param authn_auth: Authenticating Authority
:param authn_decl: Authentication Context Declaration
:param authn_decl_ref: Authentication Context Declaration reference
:param authn_instant: When the Authentication was performed.
Assumed to be seconds since the Epoch.
:param subject_locality: Specifies the DNS domain name and IP address
for the system from which the assertion subject was apparently
authenticated.
:return: An AuthnContext instance
"""
if authn_instant:
_instant = instant(time_stamp=authn_instant)
else:
_instant = instant()
if authn_class:
res = factory(
saml.AuthnStatement,
authn_instant=_instant,
session_index=sid(),
authn_context=self._authn_context_class_ref(
authn_class, authn_auth))
elif authn_decl:
res = factory(
saml.AuthnStatement,
authn_instant=_instant,
session_index=sid(),
authn_context=self._authn_context_decl(authn_decl, authn_auth))
elif authn_decl_ref:
res = factory(
saml.AuthnStatement,
authn_instant=_instant,
session_index=sid(),
authn_context=self._authn_context_decl_ref(authn_decl_ref,
authn_auth))
else:
res = factory(
saml.AuthnStatement,
authn_instant=_instant,
session_index=sid())
if subject_locality:
res.subject_locality = saml.SubjectLocality(text=subject_locality)
return res
def construct(self, sp_entity_id, in_response_to, consumer_url,
name_id, attrconvs, policy, issuer, authn_class=None,
authn_auth=None, authn_decl=None, encrypt=None,
@@ -695,10 +697,10 @@ class Assertion(dict):
conds = policy.conditions(sp_entity_id)
if authn_auth or authn_class or authn_decl or authn_decl_ref:
_authn_statement = self._authn_statement(authn_class, authn_auth,
authn_decl, authn_decl_ref,
authn_instant,
subject_locality)
_authn_statement = authn_statement(authn_class, authn_auth,
authn_decl, authn_decl_ref,
authn_instant,
subject_locality)
else:
_authn_statement = None

View File

@@ -852,14 +852,6 @@ class Entity(HTTPBase):
xmlstr = self.unravel(xmlstr, binding, response_cls.msgtype)
origxml = xmlstr
if outstanding_certs is not None:
_response = samlp.any_response_from_string(xmlstr)
if len(_response.encrypted_assertion) > 0:
_, cert_file = make_temp(
"%s" % outstanding_certs[
_response.in_response_to]["key"], decode=False)
cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
xmlstr = cbxs.decrypt(xmlstr, cert_file)
if not xmlstr: # Not a valid reponse
return None
@@ -878,18 +870,14 @@ class Entity(HTTPBase):
logger.debug("XMLSTR: %s" % xmlstr)
if hasattr(response.response, 'encrypted_assertion'):
for encrypted_assertion in response.response.encrypted_assertion:
if encrypted_assertion.extension_elements is not None:
assertion_list = extension_elements_to_elements(
encrypted_assertion.extension_elements, [saml])
for assertion in assertion_list:
_assertion = saml.assertion_from_string(
str(assertion))
response.response.assertion.append(_assertion)
if response:
response = response.verify()
if outstanding_certs is not None:
_, key_file = make_temp(
"%s" % outstanding_certs[
response.in_response_to]["key"], decode=False)
else:
key_file = ""
response = response.verify(key_file)
if not response:
return None

View File

@@ -42,8 +42,8 @@ import xmldsig as ds
import xmlenc as xenc
from saml2 import samlp
from saml2 import class_name
from saml2 import saml
from saml2 import extension_element_to_element
from saml2 import extension_elements_to_elements
from saml2 import SAMLError
from saml2 import time_util
@@ -387,7 +387,7 @@ class StatusResponse(object):
if self.asynchop:
if self.response.destination and \
self.response.destination not in self.return_addrs:
self.response.destination not in self.return_addrs:
logger.error("%s not in %s" % (self.response.destination,
self.return_addrs))
return None
@@ -399,7 +399,7 @@ class StatusResponse(object):
def loads(self, xmldata, decode=True, origxml=None):
return self._loads(xmldata, decode, origxml)
def verify(self):
def verify(self, key_file=""):
try:
return self._verify()
except AssertionError:
@@ -473,6 +473,7 @@ class AuthnResponse(StatusResponse):
self.came_from = ""
self.ava = None
self.assertion = None
self.assertions = []
self.session_not_on_or_after = 0
self.allow_unsolicited = allow_unsolicited
self.require_signature = want_assertions_signed
@@ -739,8 +740,13 @@ class AuthnResponse(StatusResponse):
return self.name_id
def _assertion(self, assertion):
self.assertion = assertion
"""
Check the assertion
:param assertion:
:return: True/False depending on if the assertion is sane or not
"""
self.assertion = assertion
logger.debug("assertion context: %s" % (self.context,))
logger.debug("assertion keys: %s" % (assertion.keyswv()))
logger.debug("outstanding_queries: %s" % (self.outstanding_queries,))
@@ -773,54 +779,61 @@ class AuthnResponse(StatusResponse):
logger.exception("get subject")
raise
def _encrypted_assertion(self, xmlstr):
if xmlstr.encrypted_data:
assertion_str = self.sec.decrypt(xmlstr.encrypted_data.to_string())
if not assertion_str:
raise DecryptionFailed()
assertion = saml.assertion_from_string(assertion_str)
else:
decrypt_xml = self.sec.decrypt(xmlstr)
def decrypt_assertions(self, encrypted_assertions, key_file=""):
res = []
for encrypted_assertion in encrypted_assertions:
if encrypted_assertion.extension_elements:
assertions = extension_elements_to_elements(
encrypted_assertion.extension_elements, [saml, samlp])
for assertion in assertions:
if assertion.signature:
if not self.sec.verify_signature(
"%s" % assertion, key_file,
node_name=class_name(assertion)):
logger.error(
"Failed to verify signature on '%s'" % assertion)
raise SignatureError()
res.append(assertion)
return res
logger.debug("Decryption successfull")
self.response = samlp.response_from_string(decrypt_xml)
logger.debug("Parsed decrypted assertion successfull")
enc = self.response.encrypted_assertion[0].extension_elements[0]
assertion = extension_element_to_element(
enc, saml.ELEMENT_FROM_STRING, namespace=saml.NAMESPACE)
logger.debug("Decrypted Assertion: %s" % assertion)
return self._assertion(assertion)
def parse_assertion(self):
def parse_assertion(self, key_file=""):
if self.context == "AuthnQuery":
# can contain one or more assertions
pass
else: # This is a saml2int limitation
try:
assert len(self.response.assertion) == 1 or \
len(self.response.encrypted_assertion) == 1
len(self.response.encrypted_assertion) == 1
except AssertionError:
raise Exception("No assertion part")
if self.response.encrypted_assertion:
logger.debug("***Encrypted assertion/-s***")
decr_text = self.sec.decrypt(self.xmlstr)
resp = samlp.response_from_string(decr_text)
res = self.decrypt_assertions(resp.encrypted_assertion, key_file)
if self.response.assertion:
self.response.assertion.extend(res)
else:
self.response.assertion = res
self.response.encrypted_assertion = []
if self.response.assertion:
logger.debug("***Unencrypted response***")
logger.debug("***Unencrypted assertion***")
for assertion in self.response.assertion:
if not self._assertion(assertion):
return False
return True
else:
logger.debug("***Encrypted response***")
for assertion in self.response.encrypted_assertion:
if not self._encrypted_assertion(assertion):
return False
return True
else:
self.assertions.append(assertion)
self.assertion = self.assertions[0]
def verify(self):
return True
def verify(self, key_file):
""" Verify that the assertion is syntactically correct and
the signature is correct if present."""
the signature is correct if present.
:param key_file: If not the default key file should be used this is it.
"""
try:
self._verify()
@@ -830,7 +843,7 @@ class AuthnResponse(StatusResponse):
if not isinstance(self.response, samlp.Response):
return self
if self.parse_assertion():
if self.parse_assertion(key_file):
return self
else:
logger.error("Could not parse the assertion")
@@ -1056,7 +1069,7 @@ class AssertionIDResponse(object):
return self._postamble()
def verify(self):
def verify(self, key_file=""):
try:
valid_instance(self.response)
except NotValid, exc:

View File

@@ -35,12 +35,13 @@ from Crypto.PublicKey import RSA
from saml2.cert import OpenSSLWrapper
from saml2.extension import pefim
from saml2.saml import EncryptedAssertion
from saml2.samlp import Response, response_from_string
from saml2.samlp import Response
import xmldsig as ds
import xmlenc as enc
from saml2 import samlp, SAMLError, extension_elements_to_elements
from saml2 import samlp
from saml2 import SAMLError
from saml2 import extension_elements_to_elements
from saml2 import class_name
from saml2 import saml
from saml2 import ExtensionElement
@@ -74,7 +75,8 @@ RSA_SHA1 = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
RSA_1_5 = "http://www.w3.org/2001/04/xmlenc#rsa-1_5"
TRIPLE_DES_CBC = "http://www.w3.org/2001/04/xmlenc#tripledes-cbc"
XMLTAG = "<?xml version='1.0'?>"
PREFIX = "<?xml version='1.0' encoding='UTF-8'?>"
PREFIX1 = "<?xml version='1.0' encoding='UTF-8'?>"
PREFIX2 = '<?xml version="1.0" encoding="UTF-8"?>'
class SigverError(SAMLError):
@@ -125,8 +127,12 @@ def rm_xmltag(statement):
statement = statement[len(XMLTAG):]
if statement[0] == '\n':
statement = statement[1:]
elif statement.startswith(PREFIX):
statement = statement[len(PREFIX):]
elif statement.startswith(PREFIX1):
statement = statement[len(PREFIX1):]
if statement[0] == '\n':
statement = statement[1:]
elif statement.startswith(PREFIX2):
statement = statement[len(PREFIX2):]
if statement[0] == '\n':
statement = statement[1:]
@@ -693,7 +699,7 @@ class CryptoBackend():
def encrypt(self, text, recv_key, template, key_type):
raise NotImplementedError()
def encrypt_assertion(self, statement, recv_key, key_type):
def encrypt_assertion(self, statement, recv_key, key_type, xpath=""):
raise NotImplementedError()
def decrypt(self, enctext, key_file):
@@ -733,12 +739,25 @@ class CryptoBackendXmlSec1(CryptoBackend):
except IndexError:
return ""
def encrypt(self, text, recv_key, template, key_type):
def encrypt(self, text, recv_key, template, session_key_type, xpath=""):
"""
:param text: The text to be compiled
:param recv_key: Filename of a file where the key resides
:param template: Filename of a file with the pre-encryption part
:param session_key_type: Type and size of a new session key
"des-192" generates a new 192 bits DES key for DES3 encryption
:param xpath: What should be encrypted
:return:
"""
logger.debug("Encryption input len: %d" % len(text))
_, fil = make_temp("%s" % text, decode=False)
com_list = [self.xmlsec, "--encrypt", "--pubkey-cert-pem", recv_key,
"--session-key", key_type, "--xml-data", fil]
"--session-key", session_key_type, "--xml-data", fil]
if xpath:
com_list.extend(['--node-xpath', xpath])
(_stdout, _stderr, output) = self._run_xmlsec(com_list, [template],
exception=DecryptError,
@@ -767,7 +786,8 @@ class CryptoBackendXmlSec1(CryptoBackend):
"--session-key", key_type, "--xml-data", fil,
"--node-xpath", ASSERT_XPATH]
(_stdout, _stderr, output) = self._run_xmlsec(com_list, [tmpl], exception=EncryptError, validate_output=False)
(_stdout, _stderr, output) = self._run_xmlsec(
com_list, [tmpl], exception=EncryptError, validate_output=False)
os.unlink(fil)
if not output:
@@ -1206,7 +1226,7 @@ class SecurityContext(object):
:param text: Text to encrypt
:param recv_key: A file containing the receivers public key
:param template: A file containing the XML document template
:param template: A file containing the XMLSEC template
:param key_type: The type of session key to use
:result: An encrypted XML text
"""
@@ -1262,8 +1282,7 @@ class SecurityContext(object):
return self.crypto.validate_signature(signedtext, cert_file=cert_file,
cert_type=cert_type,
node_name=node_name,
node_id=node_id, id_attr=id_attr,
)
node_id=node_id, id_attr=id_attr)
def _check_signature(self, decoded_xml, item, node_name=NODE_NAME,
origdoc=None, id_attr="", must=False,
@@ -1735,7 +1754,10 @@ def pre_encrypt_assertion(response):
assertion = response.assertion
response.assertion = None
response.encrypted_assertion = EncryptedAssertion()
response.encrypted_assertion.add_extension_element(assertion)
if isinstance(assertion, list):
response.encrypted_assertion.add_extension_elements(assertion)
else:
response.encrypted_assertion.add_extension_element(assertion)
# txt = "%s" % response
# _ass = "%s" % assertion
# _ass = rm_xmltag(_ass)

View File

@@ -151,7 +151,7 @@ def test_audience():
assert aud_restr.audience.text == "urn:foo:bar"
def test_conditions():
conditions = utils.factory( saml.Conditions,
conditions = utils.factory(saml.Conditions,
not_before="2009-10-30T07:58:10.852Z",
not_on_or_after="2009-10-30T08:03:10.852Z",
audience_restriction=[utils.factory(saml.AudienceRestriction,

View File

@@ -1,11 +1,12 @@
#!/usr/bin/env python
import base64
from saml2.sigver import pre_encryption_part, make_temp
from saml2.mdstore import MetadataStore
from saml2.saml import assertion_from_string
from saml2.saml import assertion_from_string, EncryptedAssertion
from saml2.samlp import response_from_string
from saml2 import sigver
from saml2 import sigver, extension_elements_to_elements
from saml2 import class_name
from saml2 import time_util
from saml2 import saml, samlp
@@ -25,9 +26,10 @@ PUB_KEY = full_path("test.pem")
PRIV_KEY = full_path("test.key")
def _eq(l1,l2):
def _eq(l1, l2):
return set(l1) == set(l2)
CERT1 = """MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF
@@ -60,7 +62,7 @@ wKe3ACFXBvqGQN0IbcH49hu0FKhYFM/GPDJcIHFBsiyMBXChpye9vBaTNEBCtU3K
jjyG0hRT2mAQ9h+bkPmOvlEo/aH0xR68Z9hw4PF13w=="""
from pyasn1.codec.der import decoder
def test_cert_from_instance_1():
xml_response = open(SIGNED).read()
@@ -104,7 +106,6 @@ class FakeConfig():
class TestSecurity():
def setup_class(self):
# This would be one way to initialize the security context :
#
@@ -126,8 +127,8 @@ class TestSecurity():
issue_instant="2009-10-30T13:20:28Z",
signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1),
attribute_statement=do_attribute_statement({
("", "", "surName"): ("Foo", ""),
("", "", "givenName"): ("Bar", ""),
("", "", "surName"): ("Foo", ""),
("", "", "givenName"): ("Bar", ""),
})
)
@@ -144,7 +145,7 @@ class TestSecurity():
def test_non_verify_2(self):
xml_response = open(FALSE_SIGNED).read()
raises(sigver.SignatureError,self.sec.correctly_signed_response,
raises(sigver.SignatureError, self.sec.correctly_signed_response,
xml_response)
def test_sign_assertion(self):
@@ -154,8 +155,8 @@ class TestSecurity():
#print sign_ass
sass = saml.assertion_from_string(sign_ass)
#print sass
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
'version', 'signature', 'id'])
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
'version', 'signature', 'id'])
assert sass.version == "2.0"
assert sass.id == "11111"
assert time_util.str_to_time(sass.issue_instant)
@@ -227,19 +228,21 @@ class TestSecurity():
def test_sign_response(self):
response = factory(samlp.Response,
assertion=self._assertion,
id="22222",
signature=sigver.pre_signature_part("22222", self.sec.my_cert))
assertion=self._assertion,
id="22222",
signature=sigver.pre_signature_part("22222",
self.sec
.my_cert))
to_sign = [(class_name(self._assertion), self._assertion.id),
(class_name(response), response.id)]
(class_name(response), response.id)]
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
assert s_response is not None
print s_response
response = response_from_string(s_response)
sass = response.assertion[0]
print sass
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
'version', 'signature', 'id'])
@@ -252,23 +255,27 @@ class TestSecurity():
assert item.id == "22222"
def test_sign_response_2(self):
assertion2 = factory( saml.Assertion,
version= "2.0",
id= "11122",
issue_instant= "2009-10-30T13:20:28Z",
signature= sigver.pre_signature_part("11122", self.sec.my_cert),
attribute_statement=do_attribute_statement({
("","","surName"): ("Fox",""),
("","","givenName") :("Bear",""),
})
)
assertion2 = factory(saml.Assertion,
version="2.0",
id="11122",
issue_instant="2009-10-30T13:20:28Z",
signature=sigver.pre_signature_part("11122",
self.sec
.my_cert),
attribute_statement=do_attribute_statement({
("", "", "surName"): ("Fox", ""),
("", "", "givenName"): ("Bear", ""),
})
)
response = factory(samlp.Response,
assertion=assertion2,
id="22233",
signature=sigver.pre_signature_part("22233", self.sec.my_cert))
assertion=assertion2,
id="22233",
signature=sigver.pre_signature_part("22233",
self.sec
.my_cert))
to_sign = [(class_name(assertion2), assertion2.id),
(class_name(response), response.id)]
(class_name(response), response.id)]
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
@@ -277,7 +284,7 @@ class TestSecurity():
sass = response2.assertion[0]
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
'version', 'signature', 'id'])
'version', 'signature', 'id'])
assert sass.version == "2.0"
assert sass.id == "11122"
@@ -285,105 +292,115 @@ class TestSecurity():
s_response)
assert isinstance(item, samlp.Response)
def test_sign_verify(self):
def test_sign_verify(self):
response = factory(samlp.Response,
assertion=self._assertion,
id="22233",
signature=sigver.pre_signature_part("22233", self.sec.my_cert))
assertion=self._assertion,
id="22233",
signature=sigver.pre_signature_part("22233",
self.sec
.my_cert))
to_sign = [(class_name(self._assertion), self._assertion.id),
(class_name(response), response.id)]
(class_name(response), response.id)]
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
print s_response
res = self.sec.verify_signature("%s" % s_response,
node_name=class_name(samlp.Response()))
res = self.sec.verify_signature("%s" % s_response,
node_name=class_name(samlp.Response()))
print res
print res
assert res
def test_sign_verify_with_cert_from_instance(self):
response = factory(samlp.Response,
assertion=self._assertion,
id="22222",
signature=sigver.pre_signature_part("22222", self.sec.my_cert))
assertion=self._assertion,
id="22222",
signature=sigver.pre_signature_part("22222",
self.sec
.my_cert))
to_sign = [(class_name(self._assertion), self._assertion.id),
(class_name(response), response.id)]
(class_name(response), response.id)]
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
response2 = response_from_string(s_response)
ci = "".join(sigver.cert_from_instance(response2)[0].split())
assert ci == self.sec.my_cert
res = self.sec.verify_signature("%s" % s_response,
node_name=class_name(samlp.Response()))
res = self.sec.verify_signature("%s" % s_response,
node_name=class_name(samlp.Response()))
assert res
res = self.sec._check_signature(s_response, response2,
class_name(response2), s_response)
assert res == response2
def test_sign_verify_assertion_with_cert_from_instance(self):
assertion = factory( saml.Assertion,
version= "2.0",
id= "11100",
issue_instant= "2009-10-30T13:20:28Z",
signature= sigver.pre_signature_part("11100", self.sec.my_cert),
attribute_statement=do_attribute_statement({
("","","surName"): ("Fox",""),
("","","givenName") :("Bear",""),
})
)
assertion = factory(saml.Assertion,
version="2.0",
id="11100",
issue_instant="2009-10-30T13:20:28Z",
signature=sigver.pre_signature_part("11100",
self.sec
.my_cert),
attribute_statement=do_attribute_statement({
("", "", "surName"): ("Fox", ""),
("", "", "givenName"): ("Bear", ""),
})
)
to_sign = [(class_name(assertion), assertion.id)]
s_assertion = sigver.signed_instance_factory(assertion, self.sec, to_sign)
s_assertion = sigver.signed_instance_factory(assertion, self.sec,
to_sign)
print s_assertion
ass = assertion_from_string(s_assertion)
ci = "".join(sigver.cert_from_instance(ass)[0].split())
assert ci == self.sec.my_cert
res = self.sec.verify_signature("%s" % s_assertion,
node_name=class_name(ass))
assert res
assert res
res = self.sec._check_signature(s_assertion, ass, class_name(ass))
assert res
def test_exception_sign_verify_with_cert_from_instance(self):
assertion = factory( saml.Assertion,
version= "2.0",
id= "11100",
issue_instant= "2009-10-30T13:20:28Z",
#signature= sigver.pre_signature_part("11100", self.sec.my_cert),
attribute_statement=do_attribute_statement({
("","","surName"): ("Foo",""),
("","","givenName") :("Bar",""),
})
)
assertion = factory(saml.Assertion,
version="2.0",
id="11100",
issue_instant="2009-10-30T13:20:28Z",
#signature= sigver.pre_signature_part("11100",
# self.sec.my_cert),
attribute_statement=do_attribute_statement({
("", "", "surName"): ("Foo", ""),
("", "", "givenName"): ("Bar", ""),
})
)
response = factory(samlp.Response,
assertion=assertion,
id="22222",
signature=sigver.pre_signature_part("22222", self.sec.my_cert))
assertion=assertion,
id="22222",
signature=sigver.pre_signature_part("22222",
self.sec
.my_cert))
to_sign = [(class_name(response), response.id)]
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
response2 = response_from_string(s_response)
# Change something that should make everything fail
response2.id = "23456"
raises(sigver.SignatureError, self.sec._check_signature,
s_response, response2, class_name(response2))
s_response, response2, class_name(response2))
class TestSecurityMetadata():
@@ -397,36 +414,71 @@ class TestSecurityMetadata():
conf.only_use_keys_in_metadata = False
self.sec = sigver.security_context(conf)
self._assertion = factory( saml.Assertion,
version="2.0",
id="11111",
issue_instant="2009-10-30T13:20:28Z",
signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1),
attribute_statement=do_attribute_statement({
("","","surName"): ("Foo",""),
("","","givenName") :("Bar",""),
})
assertion = factory(
saml.Assertion, version="2.0", id="11111",
issue_instant="2009-10-30T13:20:28Z",
signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1),
attribute_statement=do_attribute_statement(
{("", "", "surName"): ("Foo", ""),
("", "", "givenName"): ("Bar", ""), })
)
def test_sign_assertion(self):
ass = self._assertion
print ass
sign_ass = self.sec.sign_assertion("%s" % ass, node_id=ass.id)
#print sign_ass
sass = saml.assertion_from_string(sign_ass)
#print sass
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
'version', 'signature', 'id'])
assert sass.version == "2.0"
assert sass.id == "11111"
assert time_util.str_to_time(sass.issue_instant)
print "Crypto version : %s" % (self.sec.crypto.version())
def test_xbox():
conf = config.SPConfig()
conf.load_file("server_conf")
md = MetadataStore([saml, samlp], None, conf)
md.load("local", full_path("idp_example.xml"))
item = self.sec.check_signature(sass, class_name(sass), sign_ass)
conf.metadata = md
conf.only_use_keys_in_metadata = False
sec = sigver.security_context(conf)
assertion = factory(
saml.Assertion, version="2.0", id="11111",
issue_instant="2009-10-30T13:20:28Z",
signature=sigver.pre_signature_part("11111", sec.my_cert, 1),
attribute_statement=do_attribute_statement(
{("", "", "surName"): ("Foo", ""),
("", "", "givenName"): ("Bar", ""), })
)
sigass = sec.sign_statement(assertion, class_name(assertion),
key_file="pki/mykey.pem", node_id=assertion.id)
_ass0 = saml.assertion_from_string(sigass)
encrypted_assertion = EncryptedAssertion()
encrypted_assertion.add_extension_element(_ass0)
_, pre = make_temp("%s" % pre_encryption_part(), decode=False)
enctext = sec.crypto.encrypt(
"%s" % encrypted_assertion, conf.cert_file, pre, "des-192",
'/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]')
decr_text = sec.decrypt(enctext)
_seass = saml.encrypted_assertion_from_string(decr_text)
assertions = []
assers = extension_elements_to_elements(_seass.extension_elements,
[saml, samlp])
sign_cert_file = "pki/mycert.pem"
for ass in assers:
_ass = "%s" % ass
#_ass = _ass.replace('xsi:nil="true" ', '')
#assert sigass == _ass
_txt = sec.verify_signature(_ass, sign_cert_file,
node_name=class_name(assertion))
if _txt:
assertions.append(ass)
print assertions
assert isinstance(item, saml.Assertion)
if __name__ == "__main__":
t = TestSecurity()
t.setup_class()
#t = TestSecurity()
#t.setup_class()
#t.test_sign_then_encrypt_assertion()
test_xbox()

View File

@@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from saml2 import saml
from saml2 import config
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
@@ -9,7 +8,8 @@ from saml2.server import Server
from saml2.response import response_factory
from saml2.response import StatusResponse
from saml2.response import AuthnResponse
from saml2.sigver import security_context, MissingKey
from saml2.sigver import security_context
from saml2.sigver import MissingKey
from pytest import raises
@@ -26,7 +26,6 @@ IDENTITY = {"eduPersonAffiliation": ["staff", "member"],
"mail": ["foo@gmail.com"],
"title": ["shortstop"]}
AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
@@ -39,29 +38,28 @@ class TestResponse:
name_id = server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12")
self._resp_ = server.create_authn_response(IDENTITY,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/",
# consumer_url
"urn:mace:example"
".com:saml:roland:sp",
# sp_entity_id
name_id=name_id)
self._resp_ = server.create_authn_response(
IDENTITY,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/",
# consumer_url
"urn:mace:example.com:saml:roland:sp",
# sp_entity_id
name_id=name_id)
self._sign_resp_ = server.create_authn_response(
IDENTITY,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
sign_assertion=True)
self._resp_authn = server.create_authn_response(
IDENTITY,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
authn=AUTHN)
@@ -72,7 +70,8 @@ class TestResponse:
def test_1(self):
xml_response = ("%s" % (self._resp_,))
resp = response_factory(xml_response, self.conf,
return_addrs=["http://lingon.catalogix.se:8087/"],
return_addrs=[
"http://lingon.catalogix.se:8087/"],
outstanding_queries={
"id12": "http://localhost:8088/sso"},
timeslack=10000, decode=False)
@@ -83,7 +82,8 @@ class TestResponse:
def test_2(self):
xml_response = self._sign_resp_
resp = response_factory(xml_response, self.conf,
return_addrs=["http://lingon.catalogix.se:8087/"],
return_addrs=[
"http://lingon.catalogix.se:8087/"],
outstanding_queries={
"id12": "http://localhost:8088/sso"},
timeslack=10000, decode=False)
@@ -92,14 +92,15 @@ class TestResponse:
assert isinstance(resp, AuthnResponse)
def test_only_use_keys_in_metadata(self):
conf = config.SPConfig()
conf.load_file("sp_2_conf")
def test_only_use_keys_in_metadata(self):
conf = config.SPConfig()
conf.load_file("sp_2_conf")
sc = security_context(conf)
# should fail
raises(MissingKey,
'sc.correctly_signed_response("%s" % self._sign_resp_)')
sc = security_context(conf)
# should fail
raises(MissingKey,
'sc.correctly_signed_response("%s" % self._sign_resp_)')
if __name__ == "__main__":
t = TestResponse()

View File

@@ -2,18 +2,25 @@
# -*- coding: utf-8 -*-
import base64
from urlparse import parse_qs
from saml2.sigver import pre_encryption_part
from saml2.assertion import Policy
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT
from saml2.samlp import response_from_string
from saml2.server import Server
from saml2 import samlp, saml, client, config
from saml2 import samlp
from saml2 import saml
from saml2 import client
from saml2 import config
from saml2 import class_name
from saml2 import extension_elements_to_elements
from saml2 import s_utils
from saml2 import sigver
from saml2 import time_util
from saml2.s_utils import OtherError
from saml2.s_utils import do_attribute_statement, factory
from saml2.s_utils import do_attribute_statement
from saml2.s_utils import factory
from saml2.soap import make_soap_enveloped_saml_thingy
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_REDIRECT
@@ -182,7 +189,8 @@ class TestServer1():
name_id_policy = resp_args["name_id_policy"]
assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT
assert resp_args["sp_entity_id"] == "urn:mace:example.com:saml:roland:sp"
assert resp_args[
"sp_entity_id"] == "urn:mace:example.com:saml:roland:sp"
def test_sso_response_with_identity(self):
name_id = self.server.ident.transient_nameid(
@@ -195,8 +203,8 @@ class TestServer1():
"mail": "derek.jeter@nyy.mlb.com",
"title": "The man"
},
"id12", # in_response_to
"http://localhost:8087/", # destination
"id12", # in_response_to
"http://localhost:8087/", # destination
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
authn=AUTHN
@@ -227,7 +235,8 @@ class TestServer1():
break
assert len(attr.attribute_value) == 1
assert attr.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7"
assert attr.name_format == "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
assert attr.name_format == "urn:oasis:names:tc:SAML:2" \
".0:attrname-format:uri"
value = attr.attribute_value[0]
assert value.text.strip() == "Short stop"
assert value.get_type() == "xs:string"
@@ -242,13 +251,13 @@ class TestServer1():
def test_sso_response_without_identity(self):
resp = self.server.create_authn_response(
{},
"id12", # in_response_to
"http://localhost:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
userid="USER1",
authn=AUTHN,
release_policy=Policy(),
best_effort=True
"id12", # in_response_to
"http://localhost:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
userid="USER1",
authn=AUTHN,
release_policy=Policy(),
best_effort=True
)
print resp.keyswv()
@@ -268,12 +277,12 @@ class TestServer1():
resp = self.server.create_authn_response(
{},
"id12", # in_response_to
"http://localhost:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
userid="USER1",
authn=_authn,
best_effort=True
"id12", # in_response_to
"http://localhost:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
userid="USER1",
authn=_authn,
best_effort=True
)
print resp.keyswv()
@@ -297,11 +306,11 @@ class TestServer1():
print resp.status
assert resp.status.status_code.value == samlp.STATUS_RESPONDER
assert resp.status.status_code.status_code.value == \
samlp.STATUS_REQUEST_UNSUPPORTED
samlp.STATUS_REQUEST_UNSUPPORTED
assert resp.status.status_message.text == \
"eduPersonAffiliation missing"
"eduPersonAffiliation missing"
assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp"
assert not resp.assertion
assert not resp.assertion
def test_authn_response_0(self):
self.server = Server("idp_conf")
@@ -346,8 +355,8 @@ class TestServer1():
signed_resp = self.server.create_authn_response(
ava,
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"id12", # in_response_to
"http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
sign_assertion=True
@@ -480,7 +489,6 @@ def _logout_request(conf_file):
class TestServerLogout():
def test_1(self):
server = Server("idp_slo_redirect_conf")
req_id, request = _logout_request("sp_slo_redirect_conf")
@@ -502,4 +510,3 @@ class TestServerLogout():
if __name__ == "__main__":
ts = TestServer1()
ts.setup_class()
ts.test_sso_response_specific_instant()

View File

@@ -4,21 +4,32 @@
import base64
import urllib
import urlparse
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.response import LogoutResponse
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_REDIRECT
from saml2 import config
from saml2 import class_name
from saml2 import extension_elements_to_elements
from saml2 import saml
from saml2 import samlp
from saml2 import sigver
from saml2 import s_utils
from saml2.assertion import Assertion
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.client import Saml2Client
from saml2 import samlp, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT
from saml2 import saml, config, class_name
from saml2.config import SPConfig
from saml2.response import LogoutResponse
from saml2.saml import NAMEID_FORMAT_PERSISTENT
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.saml import NameID
from saml2.server import Server
from saml2.sigver import pre_encryption_part
from saml2.s_utils import do_attribute_statement
from saml2.s_utils import factory
from saml2.time_util import in_a_while
from py.test import raises
from fakeIDP import FakeIDP, unpack_form
from fakeIDP import FakeIDP
from fakeIDP import unpack_form
AUTHN = {
@@ -341,8 +352,123 @@ class TestClient:
print my_name
assert my_name == "urn:mace:example.com:saml:roland:sp"
# Below can only be done with dummy Server
def test_sign_then_encrypt_assertion(self):
# Begin with the IdPs side
_sec = self.server.sec
assertion = s_utils.assertion_factory(
subject=factory(saml.Subject, text="_aaa",
name_id=factory(
saml.NameID,
format=saml.NAMEID_FORMAT_TRANSIENT)),
attribute_statement=do_attribute_statement(
{
("", "", "surName"): ("Jeter", ""),
("", "", "givenName"): ("Derek", ""),
}
),
issuer=self.server._issuer(),
)
assertion.signature = sigver.pre_signature_part(
assertion.id, _sec.my_cert, 1)
sigass = _sec.sign_statement(assertion, class_name(assertion),
key_file="pki/mykey.pem",
node_id=assertion.id)
# Create an Assertion instance from the signed assertion
_ass = saml.assertion_from_string(sigass)
response = sigver.response_factory(
in_response_to="_012345",
destination="https:#www.example.com",
status=s_utils.success_status_factory(),
issuer=self.server._issuer(),
assertion=_ass
)
enctext = _sec.crypto.encrypt_assertion(response, _sec.cert_file,
pre_encryption_part())
seresp = samlp.response_from_string(enctext)
# Now over to the client side
_csec = self.client.sec
if seresp.encrypted_assertion:
decr_text = _csec.decrypt(enctext)
seresp = samlp.response_from_string(decr_text)
resp_ass = []
sign_cert_file = "pki/mycert.pem"
for enc_ass in seresp.encrypted_assertion:
assers = extension_elements_to_elements(
enc_ass.extension_elements, [saml, samlp])
for ass in assers:
if ass.signature:
if not _csec.verify_signature("%s" % ass,
sign_cert_file,
node_name=class_name(ass)):
continue
resp_ass.append(ass)
seresp.assertion = resp_ass
seresp.encrypted_assertion = None
#print _sresp
assert seresp.assertion
def test_sign_then_encrypt_assertion2(self):
# Begin with the IdPs side
_sec = self.server.sec
nameid_policy = samlp.NameIDPolicy(allow_create="false",
format=saml.NAMEID_FORMAT_PERSISTENT)
asser = Assertion({"givenName": "Derek", "surName": "Jeter"})
assertion = asser.construct(
self.client.config.entityid, "_012345",
"http://lingon.catalogix.se:8087/",
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
policy=self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
authn_auth="http://www.example.com/login")
assertion.signature = sigver.pre_signature_part(
assertion.id, _sec.my_cert, 1)
sigass = _sec.sign_statement(assertion, class_name(assertion),
#key_file="pki/mykey.pem",
key_file="test.key",
node_id=assertion.id)
# Create an Assertion instance from the signed assertion
_ass = saml.assertion_from_string(sigass)
response = sigver.response_factory(
in_response_to="_012345",
destination="https://www.example.com",
status=s_utils.success_status_factory(),
issuer=self.server._issuer(),
assertion=_ass
)
enctext = _sec.crypto.encrypt_assertion(response, _sec.cert_file,
pre_encryption_part())
#seresp = samlp.response_from_string(enctext)
resp_str = base64.encodestring(enctext)
# Now over to the client side
resp = self.client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
{"_012345": "http://foo.example.com/service"})
#assert resp.encrypted_assertion == []
assert resp.assertion
assert resp.ava == {'givenName': ['Derek'], 'sn': ['Jeter']}
# Below can only be done with dummy Server
IDP = "urn:mace:example.com:saml:roland:idp"
@@ -448,4 +574,4 @@ class TestClientWithDummy():
if __name__ == "__main__":
tc = TestClient()
tc.setup_class()
tc.test_sign_auth_request_0()
tc.test_sign_then_encrypt_assertion2()

View File

@@ -73,6 +73,7 @@ class TestSP():
'sn': ['Jeter'],
'title': ['The man']}
if __name__ == "__main__":
_sp = TestSP()
_sp.setup_class()

View File

@@ -8,7 +8,6 @@ from saml2.cert import OpenSSLWrapper
class TestGenerateCertificates(unittest.TestCase):
def test_validate_with_root_cert(self):
cert_info_ca = {
@@ -33,23 +32,35 @@ class TestGenerateCertificates(unittest.TestCase):
ca_cert, ca_key = osw.create_certificate(cert_info_ca, request=False,
write_to_file=True,
cert_dir=os.path.dirname(os.path.abspath(__file__)) + "/pki")
cert_dir=os.path.dirname(
os.path.abspath(
__file__)) + "/pki")
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
req_cert_str, req_key_str = osw.create_certificate(cert_info,
request=True)
ca_cert_str = osw.read_str_from_file(ca_cert)
ca_key_str = osw.read_str_from_file(ca_key)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
req_cert_str)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertTrue(valid)
false_ca_cert, false_ca_key = osw.create_certificate(cert_info_ca, request=False, write_to_file=False)
false_req_cert_str_1, false_req_key_str_1 = osw.create_certificate(cert_info_ca, request=True)
false_cert_str_1 = osw.create_cert_signed_certificate(false_ca_cert, false_ca_key, false_req_cert_str_1)
false_req_cert_str_2, false_req_key_str_2 = osw.create_certificate(cert_info, request=True)
false_cert_str_2 = osw.create_cert_signed_certificate(false_ca_cert, false_ca_key, false_req_cert_str_2)
false_ca_cert, false_ca_key = osw.create_certificate(cert_info_ca,
request=False,
write_to_file=False)
false_req_cert_str_1, false_req_key_str_1 = osw.create_certificate(
cert_info_ca, request=True)
false_cert_str_1 = osw.create_cert_signed_certificate(false_ca_cert,
false_ca_key,
false_req_cert_str_1)
false_req_cert_str_2, false_req_key_str_2 = osw.create_certificate(
cert_info, request=True)
false_cert_str_2 = osw.create_cert_signed_certificate(false_ca_cert,
false_ca_key,
false_req_cert_str_2)
valid, mess = osw.verify(false_ca_cert, cert_str)
self.assertFalse(valid)
@@ -106,20 +117,28 @@ class TestGenerateCertificates(unittest.TestCase):
osw = OpenSSLWrapper()
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca,
request=False)
req_cert_str, intermediate_1_key_str = osw.create_certificate(cert_intermediate_1_info, request=True)
intermediate_cert_1_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
req_cert_str, intermediate_1_key_str = osw.create_certificate(
cert_intermediate_1_info, request=True)
intermediate_cert_1_str = osw.create_cert_signed_certificate(
ca_cert_str, ca_key_str, req_cert_str)
req_cert_str, intermediate_2_key_str = osw.create_certificate(cert_intermediate_2_info, request=True)
intermediate_cert_2_str = osw.create_cert_signed_certificate(intermediate_cert_1_str, intermediate_1_key_str,
req_cert_str)
req_cert_str, intermediate_2_key_str = osw.create_certificate(
cert_intermediate_2_info, request=True)
intermediate_cert_2_str = osw.create_cert_signed_certificate(
intermediate_cert_1_str, intermediate_1_key_str,
req_cert_str)
req_cert_str, client_key_str = osw.create_certificate(cert_client_cert_info, request=True)
client_cert_str = osw.create_cert_signed_certificate(intermediate_cert_2_str, intermediate_2_key_str,
req_cert_str)
req_cert_str, client_key_str = osw.create_certificate(
cert_client_cert_info, request=True)
client_cert_str = osw.create_cert_signed_certificate(
intermediate_cert_2_str, intermediate_2_key_str,
req_cert_str)
cert_chain = [intermediate_cert_2_str, intermediate_cert_1_str, ca_cert_str]
cert_chain = [intermediate_cert_2_str, intermediate_cert_1_str,
ca_cert_str]
valid, mess = osw.verify_chain(cert_chain, client_cert_str)
self.assertTrue(valid)
@@ -145,21 +164,23 @@ class TestGenerateCertificates(unittest.TestCase):
"organization_unit": "asdfg"
}
osw = OpenSSLWrapper()
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False,
cipher_passphrase=
{"cipher": "blowfish", "passphrase": "qwerty"})
ca_cert_str, ca_key_str = osw.create_certificate(
cert_info_ca, request=False,
cipher_passphrase={"cipher": "blowfish", "passphrase": "qwerty"})
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str,
passphrase="qwerty")
req_cert_str, req_key_str = osw.create_certificate(cert_info,
request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
req_cert_str,
passphrase="qwerty")
valid = False
try:
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str,
passphrase="qwertyqwerty")
cert_str = osw.create_cert_signed_certificate(
ca_cert_str, ca_key_str, req_cert_str,
passphrase="qwertyqwerty")
except Exception:
valid = True
@@ -185,39 +206,59 @@ class TestGenerateCertificates(unittest.TestCase):
"organization_unit": "asdfg"
}
osw = OpenSSLWrapper()
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca,
request=False)
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
req_cert_str, req_key_str = osw.create_certificate(cert_info,
request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
req_cert_str)
valid, mess = osw.verify(ca_cert_str, cert_str)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False, valid_from=1000, valid_to=100000)
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca,
request=False,
valid_from=1000,
valid_to=100000)
req_cert_str, req_key_str = osw.create_certificate(cert_info,
request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
req_cert_str)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertFalse(valid)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str, valid_from=1000,
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca,
request=False)
req_cert_str, req_key_str = osw.create_certificate(cert_info,
request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
req_cert_str,
valid_from=1000,
valid_to=100000)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertFalse(valid)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False, valid_from=0, valid_to=1)
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca,
request=False,
valid_from=0,
valid_to=1)
req_cert_str, req_key_str = osw.create_certificate(cert_info,
request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
req_cert_str)
time.sleep(2)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertFalse(valid)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str, valid_from=0, valid_to=1)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca,
request=False)
req_cert_str, req_key_str = osw.create_certificate(cert_info,
request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
req_cert_str,
valid_from=0, valid_to=1)
time.sleep(2)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertFalse(valid)