diff --git a/example/idp2/idp.py b/example/idp2/idp.py
index 374d96a..2ab9374 100755
--- a/example/idp2/idp.py
+++ b/example/idp2/idp.py
@@ -297,10 +297,13 @@ class SSO(Service):
if REPOZE_ID_EQUIVALENT:
identity[REPOZE_ID_EQUIVALENT] = self.user
try:
+ sign_assertion = IDP.config.getattr("sign_assertion", "idp")
+ if sign_assertion is None:
+ sign_assertion = False
_resp = IDP.create_authn_response(
identity, userid=self.user,
- authn=AUTHN_BROKER[self.environ["idp.authn_ref"]],
- **resp_args)
+ authn=AUTHN_BROKER[self.environ["idp.authn_ref"]], sign_assertion=sign_assertion,
+ sign_response=False, **resp_args)
except Exception, excp:
logging.error(exception_trace(excp))
resp = ServiceError("Exception: %s" % (excp,))
@@ -322,6 +325,7 @@ class SSO(Service):
def redirect(self):
""" This is the HTTP-redirect endpoint """
+
logger.info("--- In SSO Redirect ---")
_info = self.unpack_redirect()
diff --git a/example/sp/nocert_sp_conf/sp.xml b/example/sp/nocert_sp_conf/sp.xml
new file mode 100644
index 0000000..71bdb9d
--- /dev/null
+++ b/example/sp/nocert_sp_conf/sp.xml
@@ -0,0 +1,2 @@
+
+http://www.geant.net/uri/dataprotection-code-of-conduct/v1http://www.swamid.se/category/research-and-educationhttp://www.swamid.se/category/hei-servicehttp://www.swamid.se/category/sfs-1993-1153http://www.swamid.se/category/nren-servicehttp://www.swamid.se/category/eu-adequate-protectionLokal test SP HansLokal test SP HansLokal test SP Hanshttp://130.239.200.146:8087
diff --git a/example/sp/nocert_sp_conf/sp_conf.py b/example/sp/nocert_sp_conf/sp_conf.py
new file mode 100644
index 0000000..15ccd67
--- /dev/null
+++ b/example/sp/nocert_sp_conf/sp_conf.py
@@ -0,0 +1,97 @@
+from saml2 import BINDING_HTTP_REDIRECT
+from saml2.extension.idpdisc import BINDING_DISCO
+from saml2.saml import NAME_FORMAT_URI
+from saml2.sigver import get_xmlsec_binary, CertHandlerExtra
+from saml2.entity_category.edugain import COC
+from saml2.entity_category.swamid import RESEARCH_AND_EDUCATION
+from saml2.entity_category.swamid import HEI
+from saml2.entity_category.swamid import SFS_1993_1153
+from saml2.entity_category.swamid import NREN
+from saml2.entity_category.swamid import EU
+
+
+#BASE= "http://130.239.200.146:8087"
+BASE= "http://localhost:8087"
+#BASE= "http://lingon.catalogix.se:8087"
+
+
+class SpCertHandlerExtraClass(CertHandlerExtra):
+
+ def use_generate_cert_func(self):
+ return True
+
+ def generate_cert(self, generate_cert_info, ca_cert_string, ca_key_string):
+ print "Hello"
+ return (ca_cert_string, ca_key_string)
+
+ def use_validate_cert_func(self):
+ return False
+
+ def validate_cert(self, cert_str, ca_cert_string, ca_key_string):
+ pass
+
+CONFIG = {
+ "entityid": "%s/LocalTestSPHans.xml" % BASE,
+ "description": "Lokal test SP Hans",
+ "entity_category": [COC, RESEARCH_AND_EDUCATION, HEI, SFS_1993_1153, NREN, EU],
+ "only_use_keys_in_metadata": False,
+ "cert_handler_extra_class": None,#MyCertGeneration(),
+ "generate_cert_info": {
+ "cn": "localhost",
+ "country_code": "se",
+ "state": "ac",
+ "city": "Umea",
+ "organization": "ITS Umea University",
+ "organization_unit": "DIRG"
+ },
+ "tmp_key_file": "pki/tmp_mykey.pem",
+ "tmp_cert_file": "pki/tmp_mycert.pem",
+ "validate_certificate": True,
+ "service": {
+ "sp": {
+ "authn_requests_signed": "true", #Will sign the request!
+ "want_assertions_signed": "true", #Demands that the assertion is signed.
+ "name": "LocalTestSPHans",
+ "endpoints": {
+ "assertion_consumer_service": [BASE],
+ "single_logout_service": [(BASE + "/slo",
+ BINDING_HTTP_REDIRECT)],
+ "discovery_response": [
+ ("%s/disco" % BASE, BINDING_DISCO)
+ ]
+ },
+ "required_attributes": ["surname", "givenname",
+ "edupersonaffiliation"],
+ "optional_attributes": ["title"],
+ }
+ },
+ "debug": 1,
+ "key_file": "pki/localhost.ca.key",
+ "cert_file": "pki/localhost.ca.crt",
+ "attribute_map_dir": "./attributemaps",
+ "metadata": {
+ "local": ["../idp2/idp_nocert.xml"]
+ # #"remote": [{"url": "http://130.239.201.5/role/idp.xml", "cert": None}],
+ },
+ #"metadata": {"local": ["/Users/haho0032/Develop/svn/trunk/pyOpSamlProxy/idp_nocert.xml"]},
+
+ # -- below used by make_metadata --
+ "organization": {
+ "name": "Lokal test SP Hans",
+ "display_name": [("Lokal test SP Hans", "se"), ("Lokal test SP Hans", "en")],
+ "url": "http://130.239.200.146:8087",
+ },
+ "contact_person": [
+ ],
+ "xmlsec_binary": '/usr/local/bin/xmlsec1',
+ "name_form": NAME_FORMAT_URI,
+ "logger": {
+ "rotating": {
+ "filename": "sp.log",
+ "maxBytes": 100000,
+ "backupCount": 5,
+ },
+ "loglevel": "debug",
+ }
+}
+
diff --git a/example/sp/nocert_sp_conf/who.ini b/example/sp/nocert_sp_conf/who.ini
new file mode 100644
index 0000000..1ed329f
--- /dev/null
+++ b/example/sp/nocert_sp_conf/who.ini
@@ -0,0 +1,42 @@
+[plugin:auth_tkt]
+# identification
+use = repoze.who.plugins.auth_tkt:make_plugin
+secret = kasamark
+cookie_name = pysaml2
+secure = False
+include_ip = True
+timeout = 3600
+reissue_time = 3000
+
+# IDENTIFIER
+# @param :
+# - rememberer_name : name of the plugin for remembering (delegate)
+[plugin:saml2auth]
+use = s2repoze.plugins.sp:make_plugin
+saml_conf = sp_conf
+remember_name = auth_tkt
+sid_store = outstanding
+idp_query_param = IdPEntityId
+discovery = http://130.239.201.5/role/idp.ds
+
+[general]
+request_classifier = s2repoze.plugins.challenge_decider:my_request_classifier
+challenge_decider = repoze.who.classifiers:default_challenge_decider
+remote_user_key = REMOTE_USER
+
+[identifiers]
+# plugin_name;classifier_name:.. or just plugin_name (good for any)
+plugins =
+ saml2auth
+ auth_tkt
+
+[authenticators]
+# plugin_name;classifier_name.. or just plugin_name (good for any)
+plugins = saml2auth
+
+[challengers]
+# plugin_name;classifier_name:.. or just plugin_name (good for any)
+plugins = saml2auth
+
+[mdproviders]
+plugins = saml2auth
diff --git a/example/sp/sp.xml b/example/sp/sp.xml
index 9fbb178..71bdb9d 100644
--- a/example/sp/sp.xml
+++ b/example/sp/sp.xml
@@ -1,34 +1,2 @@
-MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
-BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
-EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
-MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
-YTEYMBYGA1UEChMPVW1lYSBVbml2ZXJzaXR5MRAwDgYDVQQLEwdJVCBVbml0MRAw
-DgYDVQQDEwdUZXN0IFNQMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDkJWP7
-bwOxtH+E15VTaulNzVQ/0cSbM5G7abqeqSNSs0l0veHr6/ROgW96ZeQ57fzVy2MC
-FiQRw2fzBs0n7leEmDJyVVtBTavYlhAVXDNa3stgvh43qCfLx+clUlOvtnsoMiiR
-mo7qf0BoPKTj7c0uLKpDpEbAHQT4OF1HRYVxMwIDAQABo4G/MIG8MB0GA1UdDgQW
-BBQ7RgbMJFDGRBu9o3tDQDuSoBy7JjCBjAYDVR0jBIGEMIGBgBQ7RgbMJFDGRBu9
-o3tDQDuSoBy7JqFepFwwWjELMAkGA1UEBhMCU0UxDTALBgNVBAcTBFVtZWExGDAW
-BgNVBAoTD1VtZWEgVW5pdmVyc2l0eTEQMA4GA1UECxMHSVQgVW5pdDEQMA4GA1UE
-AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
-BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
-zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
-+vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
-MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
-BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
-EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
-MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
-YTEYMBYGA1UEChMPVW1lYSBVbml2ZXJzaXR5MRAwDgYDVQQLEwdJVCBVbml0MRAw
-DgYDVQQDEwdUZXN0IFNQMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDkJWP7
-bwOxtH+E15VTaulNzVQ/0cSbM5G7abqeqSNSs0l0veHr6/ROgW96ZeQ57fzVy2MC
-FiQRw2fzBs0n7leEmDJyVVtBTavYlhAVXDNa3stgvh43qCfLx+clUlOvtnsoMiiR
-mo7qf0BoPKTj7c0uLKpDpEbAHQT4OF1HRYVxMwIDAQABo4G/MIG8MB0GA1UdDgQW
-BBQ7RgbMJFDGRBu9o3tDQDuSoBy7JjCBjAYDVR0jBIGEMIGBgBQ7RgbMJFDGRBu9
-o3tDQDuSoBy7JqFepFwwWjELMAkGA1UEBhMCU0UxDTALBgNVBAcTBFVtZWExGDAW
-BgNVBAoTD1VtZWEgVW5pdmVyc2l0eTEQMA4GA1UECxMHSVQgVW5pdDEQMA4GA1UE
-AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
-BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
-zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
-+vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
-Exempel ABExempel ABExample Co.http://www.example.com/rolandJohnSmithjohn.smith@example.com
+http://www.geant.net/uri/dataprotection-code-of-conduct/v1http://www.swamid.se/category/research-and-educationhttp://www.swamid.se/category/hei-servicehttp://www.swamid.se/category/sfs-1993-1153http://www.swamid.se/category/nren-servicehttp://www.swamid.se/category/eu-adequate-protectionLokal test SP HansLokal test SP HansLokal test SP Hanshttp://130.239.200.146:8087
diff --git a/example/sp/sp_conf.py b/example/sp/sp_conf.py
index d324427..a9e2fe3 100644
--- a/example/sp/sp_conf.py
+++ b/example/sp/sp_conf.py
@@ -1,19 +1,64 @@
from saml2 import BINDING_HTTP_REDIRECT
+from saml2.extension.idpdisc import BINDING_DISCO
from saml2.saml import NAME_FORMAT_URI
+from saml2.sigver import get_xmlsec_binary, CertHandlerExtra
+from saml2.entity_category.edugain import COC
+from saml2.entity_category.swamid import RESEARCH_AND_EDUCATION
+from saml2.entity_category.swamid import HEI
+from saml2.entity_category.swamid import SFS_1993_1153
+from saml2.entity_category.swamid import NREN
+from saml2.entity_category.swamid import EU
+
+#BASE= "http://130.239.200.146:8087"
BASE= "http://localhost:8087"
#BASE= "http://lingon.catalogix.se:8087"
+
+class SpCertHandlerExtraClass(CertHandlerExtra):
+
+ def use_generate_cert_func(self):
+ return True
+
+ def generate_cert(self, generate_cert_info, ca_cert_string, ca_key_string):
+ print "Hello"
+ return (ca_cert_string, ca_key_string)
+
+ def use_validate_cert_func(self):
+ return False
+
+ def validate_cert(self, cert_str, ca_cert_string, ca_key_string):
+ pass
+
CONFIG = {
- "entityid": "%s/sp.xml" % BASE,
- "description": "My SP",
+ "entityid": "%s/LocalTestSPHans.xml" % BASE,
+ "description": "Lokal test SP Hans",
+ "entity_category": [COC, RESEARCH_AND_EDUCATION, HEI, SFS_1993_1153, NREN, EU],
+ "only_use_keys_in_metadata": False,
+ "cert_handler_extra_class": None,#MyCertGeneration(),
+ "generate_cert_info": {
+ "cn": "localhost",
+ "country_code": "se",
+ "state": "ac",
+ "city": "Umea",
+ "organization": "ITS Umea University",
+ "organization_unit": "DIRG"
+ },
+ "tmp_key_file": "pki/tmp_mykey.pem",
+ "tmp_cert_file": "pki/tmp_mycert.pem",
+ "validate_certificate": True,
"service": {
"sp": {
- "name": "Rolands SP",
+ "authn_requests_signed": "true", #Will sign the request!
+ "want_assertions_signed": "true", #Demands that the assertion is signed.
+ "name": "LocalTestSPHans",
"endpoints": {
"assertion_consumer_service": [BASE],
"single_logout_service": [(BASE + "/slo",
BINDING_HTTP_REDIRECT)],
+ "discovery_response": [
+ ("%s/disco" % BASE, BINDING_DISCO)
+ ]
},
"required_attributes": ["surname", "givenname",
"edupersonaffiliation"],
@@ -21,24 +66,25 @@ CONFIG = {
}
},
"debug": 1,
- "key_file": "pki/mykey.pem",
- "cert_file": "pki/mycert.pem",
+ "key_file": "pki/localhost.ca.key",
+ "cert_file": "pki/localhost.ca.crt",
"attribute_map_dir": "./attributemaps",
- "metadata": {"local": ["../idp2/idp.xml"]},
+ "metadata": {
+ #"local": ["../idp2/idp_nocert.xml"]
+ "local": ["/Users/haho0032/Develop/svn/trunk/pyOpSamlProxy/idp_nocert.xml"]
+ # #"remote": [{"url": "http://130.239.201.5/role/idp.xml", "cert": None}],
+ },
+ #"metadata": {"local": ["/Users/haho0032/Develop/svn/trunk/pyOpSamlProxy/idp_nocert.xml"]},
+
# -- below used by make_metadata --
"organization": {
- "name": "Exempel AB",
- "display_name": [("Exempel AB", "se"), ("Example Co.", "en")],
- "url": "http://www.example.com/roland",
+ "name": "Lokal test SP Hans",
+ "display_name": [("Lokal test SP Hans", "se"), ("Lokal test SP Hans", "en")],
+ "url": "http://130.239.200.146:8087",
},
- "contact_person": [{
- "given_name":"John",
- "sur_name": "Smith",
- "email_address": ["john.smith@example.com"],
- "contact_type": "technical",
- },
+ "contact_person": [
],
- #"xmlsec_binary":"/opt/local/bin/xmlsec1",
+ "xmlsec_binary": '/usr/local/bin/xmlsec1',
"name_form": NAME_FORMAT_URI,
"logger": {
"rotating": {
@@ -49,3 +95,4 @@ CONFIG = {
"loglevel": "debug",
}
}
+
diff --git a/example/sp/who.ini b/example/sp/who.ini
index ae65a67..1ed329f 100644
--- a/example/sp/who.ini
+++ b/example/sp/who.ini
@@ -17,6 +17,7 @@ saml_conf = sp_conf
remember_name = auth_tkt
sid_store = outstanding
idp_query_param = IdPEntityId
+discovery = http://130.239.201.5/role/idp.ds
[general]
request_classifier = s2repoze.plugins.challenge_decider:my_request_classifier
diff --git a/src/s2repoze/plugins/sp.py b/src/s2repoze/plugins/sp.py
index 3152628..68a26a2 100644
--- a/src/s2repoze/plugins/sp.py
+++ b/src/s2repoze/plugins/sp.py
@@ -23,7 +23,9 @@ import logging
import sys
import platform
import shelve
+import threading
import traceback
+import saml2
from urlparse import parse_qs, urlparse
from StringIO import StringIO
@@ -133,7 +135,6 @@ class SAML2Plugin(object):
self.discosrv = discovery
self.idp_query_param = idp_query_param
self.logout_endpoints = [urlparse(ep)[2] for ep in config.endpoint("single_logout_service")]
-
try:
self.metadata = self.conf.metadata
except KeyError:
@@ -360,11 +361,18 @@ class SAML2Plugin(object):
logger.debug("srvs: %s" % srvs)
dest = srvs[0]["location"]
logger.debug("destination: %s" % dest)
- req = _cli.create_authn_request(dest, vorg=vorg_name)
- ht_args = _cli.apply_binding(_binding, "%s" % req,
- destination=dest,
- relay_state=came_from)
- _sid = req.id
+
+ if _cli.authn_requests_signed:
+ _sid = saml2.s_utils.sid(_cli.seed)
+ msg_str = _cli.create_authn_request(dest, vorg=vorg_name, sign=_cli.authn_requests_signed,
+ message_id=_sid)
+ else:
+ req = _cli.create_authn_request(dest, vorg=vorg_name, sign=False)
+ msg_str = "%s" % req
+ _sid = req.id
+
+ ht_args = _cli.apply_binding(_binding, msg_str, destination=dest, relay_state=came_from)
+
logger.debug("ht_args: %s" % ht_args)
except Exception, exc:
logger.exception(exc)
diff --git a/src/saml2/cert.py b/src/saml2/cert.py
new file mode 100644
index 0000000..d561986
--- /dev/null
+++ b/src/saml2/cert.py
@@ -0,0 +1,303 @@
+import base64
+import traceback
+from M2Crypto.util import passphrase_callback
+import datetime
+import dateutil.parser
+import pytz
+
+__author__ = 'haho0032'
+from OpenSSL import crypto
+from os.path import exists, join
+from os import remove
+from Crypto.Util import asn1
+
+
+class WrongInput(Exception):
+ pass
+
+
+class CertificateError(Exception):
+ pass
+
+
+class PayloadError(Exception):
+ pass
+
+
+class OpenSSLWrapper(object):
+
+ def __init__(self):
+ pass
+
+ def create_certificate(self, cert_info, request=False, valid_from=0, valid_to=315360000, sn=1, key_length=1024,
+ hash_alg="sha256", write_to_file=False, cert_dir="", cipher_passphrase = None):
+ """
+ Can create certificate requests, to be signed later by another certificate with the method
+ create_cert_signed_certificate. If request is True.
+
+ Can also create self signed root certificates if request is False. This is default behaviour.
+
+ :param cert_info: Contains information about the certificate.
+ Is a dictionary that must contain the keys:
+ cn = Common name. This part must match the host being authenticated
+ country_code = Two letter description of the country.
+ state = State
+ city = City
+ organization = Organization, can be a company name.
+ organization_unit = A unit at the organization, can be a department.
+ Example:
+ cert_info_ca = {
+ "cn": "company.com",
+ "country_code": "se",
+ "state": "AC",
+ "city": "Dorotea",
+ "organization": "Company",
+ "organization_unit": "Sales"
+ }
+ :param request: True if this is a request for certificate, that should be signed.
+ False if this is a self signed certificate, root certificate.
+ :param valid_from: When the certificate starts to be valid. Amount of seconds from when the
+ certificate is generated.
+ :param valid_to: How long the certificate will be valid from when it is generated.
+ The value is in seconds. Default is 315360000 seconds, a.k.a 10 years.
+ :param sn: Serial number for the certificate. Default is 1.
+ :param key_length: Length of the key to be generated. Defaults to 1024.
+ :param hash_alg: Hash algorithm to use for the key. Default is sha256.
+ :param write_to_file: True if you want to write the certificate to a file. The method will then return
+ a tuple with path to certificate file and path to key file.
+ False if you want to get the result as strings. The method will then return a tuple
+ with the certificate string and the key as string.
+ WILL OVERWRITE ALL EXISTING FILES WITHOUT ASKING!
+ :param cert_dir: Where to save the files if write_to_file is true.
+ :param cipher_passphrase A dictionary with cipher and passphrase. Example:
+ {"cipher": "blowfish", "passphrase": "qwerty"}
+ :return: string representation of certificate, string representation of private key
+ if write_to_file parameter is False otherwise
+ path to certificate file, path to private key file
+ """
+ cn = cert_info["cn"]
+
+ c_f = None
+ k_f = None
+
+ if write_to_file:
+ cert_file = "%s.crt" % cn
+ key_file = "%s.key" % cn
+ try:
+ remove(cert_file)
+ except:
+ pass
+ try:
+ remove(key_file)
+ except:
+ pass
+ c_f = join(cert_dir, cert_file)
+ k_f = join(cert_dir, key_file)
+
+
+ # create a key pair
+ k = crypto.PKey()
+ k.generate_key(crypto.TYPE_RSA, key_length)
+
+ # create a self-signed cert
+ cert = crypto.X509()
+
+ if request:
+ cert = crypto.X509Req()
+
+ if (len(cert_info["country_code"]) != 2):
+ raise WrongInput("Country code must be two letters!")
+ cert.get_subject().C = cert_info["country_code"]
+ cert.get_subject().ST = cert_info["state"]
+ cert.get_subject().L = cert_info["city"]
+ cert.get_subject().O = cert_info["organization"]
+ cert.get_subject().OU = cert_info["organization_unit"]
+ cert.get_subject().CN = cn
+ if not request:
+ cert.set_serial_number(sn)
+ cert.gmtime_adj_notBefore(valid_from) #Valid before present time
+ cert.gmtime_adj_notAfter(valid_to) #3 650 days
+ cert.set_issuer(cert.get_subject())
+ cert.set_pubkey(k)
+ cert.sign(k, hash_alg)
+
+ filesCreated = False
+ try:
+ if request:
+ tmp_cert = crypto.dump_certificate_request(crypto.FILETYPE_PEM, cert)
+ else:
+ tmp_cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
+ tmp_key = None
+ if cipher_passphrase is not None:
+ tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k, cipher_passphrase["cipher"],
+ cipher_passphrase["passphrase"])
+ else:
+ tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k)
+ if write_to_file:
+ fc = open(c_f, "wt")
+ fk = open(k_f, "wt")
+
+ if request:
+ fc.write(tmp_cert)
+ else:
+ fc.write(tmp_cert)
+ fk.write(tmp_key)
+ filesCreated = True
+ try:
+ fc.close()
+ except:
+ pass
+
+ try:
+ fk.close()
+ except:
+ pass
+ return c_f, k_f
+ return tmp_cert, tmp_key
+ except Exception as ex:
+ raise CertificateError("Certificate cannot be generated.", ex)
+
+ def write_str_to_file(self, file, str_data):
+ f = open(file, "wt")
+ f.write(str_data)
+ f.close()
+
+ def read_str_from_file(self, file, type="pem"):
+ f = open(file)
+ str_data = f.read()
+ f.close()
+
+ if type == "pem":
+ return str_data
+
+ if type in ["der", "cer", "crt"]:
+ return base64.b64encode(str(str_data))
+
+
+ def create_cert_signed_certificate(self, sign_cert_str, sign_key_str, request_cert_str, hash_alg="sha256",
+ valid_from=0, valid_to=315360000, sn=1, passphrase=None):
+
+ """
+ Will sign a certificate request with a give certificate.
+ :param sign_cert_str: This certificate will be used to sign with. Must be a string representation of
+ the certificate. If you only have a file use the method read_str_from_file to
+ get a string representation.
+ :param sign_key_str: This is the key for the ca_cert_str represented as a string.
+ If you only have a file use the method read_str_from_file to get a string
+ representation.
+ :param request_cert_str: This is the prepared certificate to be signed. Must be a string representation of
+ the requested certificate. If you only have a file use the method read_str_from_file
+ to get a string representation.
+ :param hash_alg: Hash algorithm to use for the key. Default is sha256.
+ :param valid_from: When the certificate starts to be valid. Amount of seconds from when the
+ certificate is generated.
+ :param valid_to: How long the certificate will be valid from when it is generated.
+ The value is in seconds. Default is 315360000 seconds, a.k.a 10 years.
+ :param sn: Serial number for the certificate. Default is 1.
+ :param passphrase: Password for the private key in sign_key_str.
+ :return: String representation of the signed certificate.
+ """
+ ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, sign_cert_str)
+ ca_key = None
+ if passphrase is not None:
+ ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str, passphrase)
+ else:
+ ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str)
+ req_cert = crypto.load_certificate_request(crypto.FILETYPE_PEM, request_cert_str)
+
+ cert = crypto.X509()
+ cert.set_subject(req_cert.get_subject())
+ cert.set_serial_number(sn)
+ cert.gmtime_adj_notBefore(valid_from)
+ cert.gmtime_adj_notAfter(valid_to)
+ cert.set_issuer(ca_cert.get_subject())
+ cert.set_pubkey(req_cert.get_pubkey())
+ cert.sign(ca_key, hash_alg)
+
+ return crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
+
+ def verify_chain(self, cert_chain_str_list, cert_str):
+ """
+
+ :param cert_chain_str_list: Must be a list of certificate strings, where the first certificate to be validate
+ is in the beginning and the root certificate is last.
+ :param cert_str: The certificate to be validated.
+ :return:
+ """
+ for tmp_cert_str in cert_chain_str_list:
+ valid, message = self.verify(tmp_cert_str, cert_str)
+ if not valid:
+ return False, message
+ else:
+ cert_str = tmp_cert_str
+ return True, "Signed certificate is valid and correctly signed by CA certificate."
+
+ def certificate_not_valid_yet(self, cert):
+ starts_to_be_valid = dateutil.parser.parse(cert.get_notBefore())
+ now = pytz.UTC.localize(datetime.datetime.utcnow())
+ if starts_to_be_valid < now:
+ return False
+ return True
+
+
+ def verify(self, signing_cert_str, cert_str):
+ """
+ Verifies if a certificate is valid and signed by a given certificate.
+
+ :param signing_cert_str: This certificate will be used to verify the signature. Must be a string representation
+ of the certificate. If you only have a file use the method read_str_from_file to
+ get a string representation.
+ :param cert_str: This certificate will be verified if it is correct. Must be a string representation
+ of the certificate. If you only have a file use the method read_str_from_file to
+ get a string representation.
+ :return: Valid, Message
+ Valid = True if the certificate is valid, otherwise false.
+ Message = Why the validation failed.
+ """
+ try:
+ ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, signing_cert_str)
+ cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
+
+ if self.certificate_not_valid_yet(ca_cert):
+ return False, "CA certificate is not valid yet."
+
+ if ca_cert.has_expired() == 1:
+ return False, "CA certificate is expired."
+
+ if cert.has_expired() == 1:
+ return False, "The signed certificate is expired."
+
+ if self.certificate_not_valid_yet(cert):
+ return False, "The signed certificate is not valid yet."
+
+ if ca_cert.get_subject().CN == cert.get_subject().CN:
+ return False, "CN may not be equal for CA certificate and the signed certificate."
+
+ cert_algorithm = cert.get_signature_algorithm()
+
+ cert_asn1 = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
+
+ der_seq = asn1.DerSequence()
+ der_seq.decode(cert_asn1)
+
+ cert_certificate=der_seq[0]
+ #cert_signature_algorithm=der_seq[1]
+ cert_signature=der_seq[2]
+
+ cert_signature_decoded = asn1.DerObject()
+ cert_signature_decoded.decode(cert_signature)
+
+ signature_payload = cert_signature_decoded.payload
+
+ if signature_payload[0] != '\x00':
+ return False, "The certificate should not contain any unused bits."
+
+ signature = signature_payload[1:]
+
+ try:
+ crypto.verify(ca_cert, signature, cert_certificate, cert_algorithm)
+ return True, "Signed certificate is valid and correctly signed by CA certificate."
+ except crypto.Error, e:
+ return False, "Certificate is incorrectly signed."
+ except Exception, e:
+ return False, "Certificate is not valid for an unknown reason."
\ No newline at end of file
diff --git a/src/saml2/client_base.py b/src/saml2/client_base.py
index aabd26e..9e70933 100644
--- a/src/saml2/client_base.py
+++ b/src/saml2/client_base.py
@@ -18,6 +18,7 @@
"""Contains classes and functions that a SAML2.0 Service Provider (SP) may use
to conclude its tasks.
"""
+import threading
from urllib import urlencode
from urlparse import urlparse
@@ -110,7 +111,7 @@ class Base(Entity):
Entity.__init__(self, "sp", config, config_file, virtual_organization)
self.users = Population(identity_cache)
-
+ self.lock = threading.Lock()
# for server state storage
if state_cache is None:
self.state = {} # in memory storage
@@ -298,11 +299,19 @@ class Base(Entity):
except KeyError:
pass
+ if sign and self.sec.cert_handler.generate_cert():
+ with self.lock:
+ self.sec.cert_handler.update_cert(True)
+ return self._message(AuthnRequest, destination, message_id, consent,
+ extensions, sign, sign_prepare,
+ protocol_binding=binding,
+ scoping=scoping, **args)
return self._message(AuthnRequest, destination, message_id, consent,
extensions, sign, sign_prepare,
protocol_binding=binding,
scoping=scoping, **args)
+
def create_attribute_query(self, destination, name_id=None,
attribute=None, message_id=0, consent=None,
extensions=None, sign=False, sign_prepare=False,
diff --git a/src/saml2/config.py b/src/saml2/config.py
index 22ec13c..a597ea5 100644
--- a/src/saml2/config.py
+++ b/src/saml2/config.py
@@ -62,7 +62,12 @@ COMMON_ARGS = [
"session_storage",
"entity_category",
"xmlsec_path",
- "extension_schemas"
+ "extension_schemas",
+ "cert_handler_extra_class",
+ "generate_cert_info",
+ "tmp_cert_file",
+ "tmp_key_file",
+ "validate_certificate"
]
SP_ARGS = [
@@ -84,6 +89,7 @@ SP_ARGS = [
]
AA_IDP_ARGS = [
+ "sign_assertion",
"want_authn_requests_signed",
"provided_attributes",
"subject_data",
@@ -199,6 +205,11 @@ class Config(object):
self.scope = ""
self.allow_unknown_attributes = False
self.extension_schema = {}
+ self.cert_handler_extra_class = None
+ self.generate_cert_info = None
+ self.tmp_cert_file = None
+ self.tmp_key_file = None
+ self.validate_certificate = None
def setattr(self, context, attr, val):
if context == "":
diff --git a/src/saml2/entity.py b/src/saml2/entity.py
index 8e519a6..4da39ef 100644
--- a/src/saml2/entity.py
+++ b/src/saml2/entity.py
@@ -542,7 +542,8 @@ class Entity(HTTPBase):
origdoc = xmlstr
xmlstr = self.unravel(xmlstr, binding, request_cls.msgtype)
- _request = _request.loads(xmlstr, binding, origdoc=origdoc)
+ must = self.config.getattr("want_authn_requests_signed", "idp")
+ _request = _request.loads(xmlstr, binding, origdoc=origdoc, must=must)
_log_debug("Loaded request")
diff --git a/src/saml2/request.py b/src/saml2/request.py
index c680434..c6a65cd 100644
--- a/src/saml2/request.py
+++ b/src/saml2/request.py
@@ -36,12 +36,12 @@ class Request(object):
self.message = None
self.not_on_or_after = 0
- def _loads(self, xmldata, binding=None, origdoc=None):
+ def _loads(self, xmldata, binding=None, origdoc=None, must=None):
# own copy
self.xmlstr = xmldata[:]
logger.info("xmlstr: %s" % (self.xmlstr,))
try:
- self.message = self.signature_check(xmldata, origdoc=origdoc)
+ self.message = self.signature_check(xmldata, origdoc=origdoc, must=must)
except TypeError:
raise
except Exception, excp:
@@ -84,8 +84,8 @@ class Request(object):
assert self.issue_instant_ok()
return self
- def loads(self, xmldata, binding, origdoc=None):
- return self._loads(xmldata, binding, origdoc)
+ def loads(self, xmldata, binding, origdoc=None, must=None):
+ return self._loads(xmldata, binding, origdoc, must)
def verify(self):
try:
diff --git a/src/saml2/server.py b/src/saml2/server.py
index 97dad77..e81c957 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -22,6 +22,7 @@ import logging
import os
import shelve
+import threading
from saml2.eptid import EptidShelve, Eptid
from saml2.sdb import SessionStorage
@@ -71,6 +72,7 @@ class Server(Entity):
self.symkey = symkey
self.seed = rndstr()
self.iv = os.urandom(16)
+ self.lock = threading.Lock()
def support_AssertionIDRequest(self):
return True
@@ -309,7 +311,7 @@ class Server(Entity):
self.config.attribute_converters,
policy, issuer=_issuer)
- if sign_assertion:
+ if sign_assertion is not None and sign_assertion:
assertion.signature = pre_signature_part(assertion.id,
self.sec.my_cert, 1)
# Just the assertion or the response and the assertion ?
@@ -419,7 +421,6 @@ class Server(Entity):
:param sign_response: Whether the response should be signed or not.
:return: A response instance
"""
-
policy = self.config.getattr("policy", "idp")
if not name_id:
@@ -455,16 +456,30 @@ class Server(Entity):
try:
_authn = authn
- return self._authn_response(in_response_to, # in_response_to
- destination, # consumer_url
- sp_entity_id, # sp_entity_id
- identity, # identity as dictionary
- name_id,
- authn=_authn,
- issuer=issuer,
- policy=policy,
- sign_assertion=sign_assertion,
- sign_response=sign_response)
+ if (sign_assertion or sign_response) and self.sec.cert_handler.generate_cert():
+ with self.lock:
+ self.sec.cert_handler.update_cert(True)
+ return self._authn_response(in_response_to, # in_response_to
+ destination, # consumer_url
+ sp_entity_id, # sp_entity_id
+ identity, # identity as dictionary
+ name_id,
+ authn=_authn,
+ issuer=issuer,
+ policy=policy,
+ sign_assertion=sign_assertion,
+ sign_response=sign_response)
+ else:
+ return self._authn_response(in_response_to, # in_response_to
+ destination, # consumer_url
+ sp_entity_id, # sp_entity_id
+ identity, # identity as dictionary
+ name_id,
+ authn=_authn,
+ issuer=issuer,
+ policy=policy,
+ sign_assertion=sign_assertion,
+ sign_response=sign_response)
except MissingValue, exc:
return self.create_error_response(in_response_to, destination,
diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py
index e9f24f6..36ba542 100644
--- a/src/saml2/sigver.py
+++ b/src/saml2/sigver.py
@@ -32,6 +32,7 @@ from Crypto.PublicKey.RSA import importKey
from Crypto.Signature import PKCS1_v1_5
from Crypto.Util.asn1 import DerSequence
from Crypto.PublicKey import RSA
+from saml2.cert import OpenSSLWrapper
from saml2.samlp import Response
import xmldsig as ds
@@ -549,6 +550,7 @@ class RSASigner(Signer):
verifier = PKCS1_v1_5.new(key)
return verifier.verify(h, sig)
+
SIGNER_ALGS = {
RSA_SHA1: RSASigner(SHA),
"http://www.w3.org/2001/04/xmldsig-more#rsa-sha256": RSASigner(SHA256),
@@ -921,18 +923,118 @@ def security_context(conf, debug=None):
return SecurityContext(crypto, conf.key_file,
cert_file=conf.cert_file, metadata=metadata,
- debug=debug, only_use_keys_in_metadata=_only_md)
+ debug=debug, only_use_keys_in_metadata=_only_md,
+ cert_handler_extra_class=conf.cert_handler_extra_class,
+ generate_cert_info=conf.generate_cert_info, tmp_cert_file=conf.tmp_cert_file,
+ tmp_key_file=conf.tmp_key_file, validate_certificate=conf.validate_certificate)
+
+
+class CertHandlerExtra(object):
+ def __init__(self):
+ pass
+
+ def use_generate_cert_func(self):
+ raise Exception("use_generate_cert_func function must be implemented")
+
+ def generate_cert(self, generate_cert_info, root_cert_string, root_key_string):
+ raise Exception("generate_cert function must be implemented")
+ #Excepts to return (cert_string, key_string)
+
+ def use_validate_cert_func(self):
+ raise Exception("use_validate_cert_func function must be implemented")
+
+ def validate_cert(self, cert_str, root_cert_string, root_key_string):
+ raise Exception("validate_cert function must be implemented")
+ #Excepts to return True/False
+
+
+class CertHandler(object):
+ def __init__(self, security_context, cert_file=None, cert_type="pem", key_file=None, key_type="pem",
+ generate_cert_info=None, cert_handler_extra_class=None, tmp_cert_file=None, tmp_key_file=None,
+ verify_cert=False):
+ """
+ Initiates the class for handling certificates. Enables the certificates to either be a single certificate
+ as base functionality or makes it possible to generate a new certificate for each call to the function.
+ :param key_file:
+ :param key_type:
+ :param cert_file:
+ :param cert_type:
+ :param generate_cert:
+ :param cert_handler_extra_class:
+ """
+ self._verify_cert = False
+ self._generate_cert = False
+ if cert_type == "pem" and key_type == "pem":
+ self._verify_cert = verify_cert is True
+ self._security_context = security_context
+ self._osw = OpenSSLWrapper()
+ if key_file is not None:
+ self._key_str = self._osw.read_str_from_file(key_file, key_type)
+ else:
+ self._key_str = ""
+ if cert_file is not None:
+ self._cert_str = self._osw.read_str_from_file(cert_file, cert_type)
+ else:
+ self._cert_str = ""
+
+ self._tmp_cert_str = self._cert_str
+ self._tmp_key_str = self._key_str
+ self._tmp_cert_file = tmp_cert_file
+ self._tmp_key_file = tmp_key_file
+
+ self._cert_info = None
+ self._generate_cert_func_active = False
+ if generate_cert_info is not None and len(self._cert_str) > 0 and len(self._key_str) > 0 \
+ and tmp_key_file is not None and tmp_cert_file is not None:
+ self._generate_cert = True
+ self._cert_info = generate_cert_info
+ self._cert_handler_extra_class = cert_handler_extra_class
+
+ def verify_cert(self, cert_file):
+ if self._verify_cert:
+ cert_str = self._osw.read_str_from_file(cert_file, "pem")
+ if self._cert_handler_extra_class is not None and self._cert_handler_extra_class.use_validate_cert_func():
+ self._cert_handler_extra_class.validate_cert(cert_str, self._cert_str, self._key_str)
+ else:
+ valid, mess = self._osw.verify(self._cert_str, cert_str)
+ logger.info("CertHandler.verify_cert: %s" % mess)
+ return valid
+ return True
+
+ def generate_cert(self):
+ return self._generate_cert
+
+ def update_cert(self, active=False):
+ if self._generate_cert and active:
+ if self._cert_handler_extra_class is not None and self._cert_handler_extra_class.use_generate_cert_func():
+ (self._tmp_cert_str, self._tmp_key_str) = \
+ self._cert_handler_extra_class.generate_cert(self._cert_info, self._cert_str, self._key_str)
+ else:
+ self._tmp_cert_str, self._tmp_key_str = self._osw.create_certificate(self._cert_info, request=True)
+ self._tmp_cert_str = self._osw.create_cert_signed_certificate(self._cert_str, self._key_str,
+ self._tmp_cert_str)
+ valid, mess = self._osw.verify(self._cert_str, self._tmp_cert_str)
+ self._osw.write_str_to_file(self._tmp_cert_file, self._tmp_cert_str)
+ self._osw.write_str_to_file(self._tmp_key_file, self._tmp_key_str)
+ self._security_context.key_file = self._tmp_key_file
+ self._security_context.cert_file = self._tmp_cert_file
+ self._security_context.key_type = "pem"
+ self._security_context.cert_type = "pem"
+ self._security_context.my_cert = read_cert_from_file(self._security_context.cert_file,
+ self._security_context.cert_type)
# How to get a rsa pub key fingerprint from a certificate
# openssl x509 -inform pem -noout -in server.crt -pubkey > publickey.pem
# openssl rsa -inform pem -noout -in publickey.pem -pubin -modulus
-
class SecurityContext(object):
+ my_cert = None
+
def __init__(self, crypto, key_file="", key_type="pem",
cert_file="", cert_type="pem", metadata=None,
debug=False, template="", encrypt_key_type="des-192",
- only_use_keys_in_metadata=False):
+ only_use_keys_in_metadata=False, cert_handler_extra_class=None, generate_cert_info=None,
+ tmp_cert_file=None, tmp_key_file=None, validate_certificate=None):
self.crypto = crypto
assert (isinstance(self.crypto, CryptoBackend))
@@ -944,8 +1046,14 @@ class SecurityContext(object):
# Your public key
self.cert_file = cert_file
self.cert_type = cert_type
+
self.my_cert = read_cert_from_file(cert_file, cert_type)
+ self.cert_handler = CertHandler(self, cert_file, cert_type, key_file, key_type, generate_cert_info,
+ cert_handler_extra_class, tmp_cert_file, tmp_key_file, validate_certificate)
+
+ self.cert_handler.update_cert(True)
+
self.metadata = metadata
self.only_use_keys_in_metadata = only_use_keys_in_metadata
self.debug = debug
@@ -1053,14 +1161,23 @@ class SecurityContext(object):
#print certs
verified = False
+ last_pem_file = None
for _, pem_file in certs:
try:
+ last_pem_file = pem_file
if origdoc is not None:
- if self.verify_signature(origdoc, pem_file,
- node_name=node_name,
- node_id=item.id, id_attr=id_attr):
- verified = True
- break
+ try:
+ if self.verify_signature(origdoc, pem_file,
+ node_name=node_name,
+ node_id=item.id, id_attr=id_attr):
+ verified = True
+ break
+ except Exception:
+ if self.verify_signature(decoded_xml, pem_file,
+ node_name=node_name,
+ node_id=item.id, id_attr=id_attr):
+ verified = True
+ break
else:
if self.verify_signature(decoded_xml, pem_file,
node_name=node_name,
@@ -1079,6 +1196,10 @@ class SecurityContext(object):
if not verified:
raise SignatureError("Failed to verify signature")
+ else:
+ if not self.cert_handler.verify_cert(last_pem_file):
+ raise CertificateError("Invalid certificate!")
+
return item
@@ -1222,7 +1343,7 @@ class SecurityContext(object):
origdoc)
if isinstance(response, Response) and (response.assertion or
- response.encrypted_assertion):
+ response.encrypted_assertion):
# Try to find the signing cert in the assertion
for assertion in (response.assertion or response.encrypted_assertion):
if response.encrypted_assertion:
diff --git a/tests/test_81_certificates.py b/tests/test_81_certificates.py
new file mode 100644
index 0000000..d587430
--- /dev/null
+++ b/tests/test_81_certificates.py
@@ -0,0 +1,221 @@
+from os import remove
+import time
+
+__author__ = 'haho0032'
+import unittest
+from saml2.cert import OpenSSLWrapper
+
+
+class TestGenerateCertificates(unittest.TestCase):
+
+ def test_validate_with_root_cert(self):
+
+ cert_info_ca = {
+ "cn": "qwerty",
+ "country_code": "qw",
+ "state": "qwerty",
+ "city": "qwerty",
+ "organization": "qwerty",
+ "organization_unit": "qwerty"
+ }
+
+ cert_info = {
+ "cn": "asdfgh",
+ "country_code": "as",
+ "state": "asdfgh",
+ "city": "asdfgh",
+ "organization": "asdfgh",
+ "organization_unit": "asdfg"
+ }
+
+ osw = OpenSSLWrapper()
+
+ ca_cert, ca_key = osw.create_certificate(cert_info_ca, request=False, write_to_file=True,
+ cert_dir="/Users/haho0032/Develop/openSSL/pki")
+
+ req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
+
+ ca_cert_str = osw.read_str_from_file(ca_cert)
+ ca_key_str = osw.read_str_from_file(ca_key)
+
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
+
+ valid, mess = osw.verify(ca_cert_str, cert_str)
+ self.assertTrue(valid)
+
+ false_ca_cert, false_ca_key = osw.create_certificate(cert_info_ca, request=False, write_to_file=False)
+ false_req_cert_str_1, false_req_key_str_1 = osw.create_certificate(cert_info_ca, request=True)
+ false_cert_str_1 = osw.create_cert_signed_certificate(false_ca_cert, false_ca_key, false_req_cert_str_1)
+ false_req_cert_str_2, false_req_key_str_2 = osw.create_certificate(cert_info, request=True)
+ false_cert_str_2 = osw.create_cert_signed_certificate(false_ca_cert, false_ca_key, false_req_cert_str_2)
+
+ valid, mess = osw.verify(false_ca_cert, cert_str)
+ self.assertFalse(valid)
+ valid, mess = osw.verify(false_ca_cert, false_cert_str_1)
+ self.assertFalse(valid)
+ valid, mess = osw.verify(ca_cert_str, false_cert_str_2)
+ self.assertFalse(valid)
+
+ if 'z' in cert_str:
+ false_cert_str = cert_str.replace('z', 'x')
+ valid, mess = osw.verify(ca_cert_str, false_cert_str)
+ self.assertFalse(valid)
+
+ remove(ca_cert)
+ remove(ca_key)
+
+ def test_validate_cert_chains(self):
+
+ cert_info_ca = {
+ "cn": "qwerty",
+ "country_code": "qw",
+ "state": "qwerty",
+ "city": "qwerty",
+ "organization": "qwerty",
+ "organization_unit": "qwerty"
+ }
+
+ cert_intermediate_1_info = {
+ "cn": "intermediate_1",
+ "country_code": "as",
+ "state": "asdfgh",
+ "city": "asdfgh",
+ "organization": "asdfgh",
+ "organization_unit": "asdfg"
+ }
+
+ cert_intermediate_2_info = {
+ "cn": "intermediate_2",
+ "country_code": "as",
+ "state": "asdfgh",
+ "city": "asdfgh",
+ "organization": "asdfgh",
+ "organization_unit": "asdfg"
+ }
+
+ cert_client_cert_info = {
+ "cn": "intermediate_1",
+ "country_code": "as",
+ "state": "asdfgh",
+ "city": "asdfgh",
+ "organization": "asdfgh",
+ "organization_unit": "asdfg"
+ }
+
+ osw = OpenSSLWrapper()
+
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
+
+ req_cert_str, intermediate_1_key_str = osw.create_certificate(cert_intermediate_1_info, request=True)
+ intermediate_cert_1_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
+
+ req_cert_str, intermediate_2_key_str = osw.create_certificate(cert_intermediate_2_info, request=True)
+ intermediate_cert_2_str = osw.create_cert_signed_certificate(intermediate_cert_1_str, intermediate_1_key_str,
+ req_cert_str)
+
+ req_cert_str, client_key_str = osw.create_certificate(cert_client_cert_info, request=True)
+ client_cert_str = osw.create_cert_signed_certificate(intermediate_cert_2_str, intermediate_2_key_str,
+ req_cert_str)
+
+ cert_chain = [intermediate_cert_2_str, intermediate_cert_1_str, ca_cert_str]
+
+ valid, mess = osw.verify_chain(cert_chain, client_cert_str)
+ self.assertTrue(valid)
+
+
+ def test_validate_passphrase(self):
+
+ cert_info_ca = {
+ "cn": "qwerty",
+ "country_code": "qw",
+ "state": "qwerty",
+ "city": "qwerty",
+ "organization": "qwerty",
+ "organization_unit": "qwerty"
+ }
+
+ cert_info = {
+ "cn": "intermediate_1",
+ "country_code": "as",
+ "state": "asdfgh",
+ "city": "asdfgh",
+ "organization": "asdfgh",
+ "organization_unit": "asdfg"
+ }
+
+
+ osw = OpenSSLWrapper()
+
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False,
+ cipher_passphrase=
+ {"cipher": "blowfish", "passphrase": "qwerty"})
+
+ req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str,
+ passphrase="qwerty")
+
+ valid = False
+ try:
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str,
+ passphrase="qwertyqwerty")
+ except Exception:
+ valid = True
+
+ self.assertTrue(valid)
+
+ def test_validate_expire(self):
+
+ cert_info_ca = {
+ "cn": "qwerty",
+ "country_code": "qw",
+ "state": "qwerty",
+ "city": "qwerty",
+ "organization": "qwerty",
+ "organization_unit": "qwerty"
+ }
+
+ cert_info = {
+ "cn": "intermediate_1",
+ "country_code": "as",
+ "state": "asdfgh",
+ "city": "asdfgh",
+ "organization": "asdfgh",
+ "organization_unit": "asdfg"
+ }
+
+
+ osw = OpenSSLWrapper()
+
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
+
+ req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
+
+ valid, mess = osw.verify(ca_cert_str, cert_str)
+
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False, valid_from=1000, valid_to=100000)
+ req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
+ valid, mess = osw.verify(ca_cert_str, cert_str)
+ self.assertFalse(valid)
+
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
+ req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str, valid_from=1000,
+ valid_to=100000)
+ valid, mess = osw.verify(ca_cert_str, cert_str)
+ self.assertFalse(valid)
+
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False, valid_from=0, valid_to=1)
+ req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
+ time.sleep(2)
+ valid, mess = osw.verify(ca_cert_str, cert_str)
+ self.assertFalse(valid)
+
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
+ req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str, valid_from=0, valid_to=1)
+ time.sleep(2)
+ valid, mess = osw.verify(ca_cert_str, cert_str)
+ self.assertFalse(valid)
\ No newline at end of file