Add functionality to generate new certificates for each authn reqeust and response.

This commit is contained in:
Hans Hörberg
2014-02-14 09:17:05 +01:00
parent a608d92abf
commit fbb8c100c3
16 changed files with 935 additions and 85 deletions

View File

@@ -297,10 +297,13 @@ class SSO(Service):
if REPOZE_ID_EQUIVALENT:
identity[REPOZE_ID_EQUIVALENT] = self.user
try:
sign_assertion = IDP.config.getattr("sign_assertion", "idp")
if sign_assertion is None:
sign_assertion = False
_resp = IDP.create_authn_response(
identity, userid=self.user,
authn=AUTHN_BROKER[self.environ["idp.authn_ref"]],
**resp_args)
authn=AUTHN_BROKER[self.environ["idp.authn_ref"]], sign_assertion=sign_assertion,
sign_response=False, **resp_args)
except Exception, excp:
logging.error(exception_trace(excp))
resp = ServiceError("Exception: %s" % (excp,))
@@ -322,6 +325,7 @@ class SSO(Service):
def redirect(self):
""" This is the HTTP-redirect endpoint """
logger.info("--- In SSO Redirect ---")
_info = self.unpack_redirect()

View File

@@ -0,0 +1,2 @@
<?xml version='1.0' encoding='UTF-8'?>
<ns0:EntityDescriptor xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:ns1="urn:oasis:names:tc:SAML:metadata:attribute" xmlns:ns2="urn:oasis:names:tc:SAML:2.0:assertion" xmlns:ns4="urn:oasis:names:tc:SAML:profiles:SSO:idp-discovery-protocol" xmlns:ns5="http://www.w3.org/2000/09/xmldsig#" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" entityID="http://localhost:8087/LocalTestSPHans.xml"><ns0:Extensions><ns1:EntityAttributes><ns2:Attribute Name="http://macedir.org/entity-category"><ns2:AttributeValue xsi:type="xs:string">http://www.geant.net/uri/dataprotection-code-of-conduct/v1</ns2:AttributeValue><ns2:AttributeValue xsi:type="xs:string">http://www.swamid.se/category/research-and-education</ns2:AttributeValue><ns2:AttributeValue xsi:type="xs:string">http://www.swamid.se/category/hei-service</ns2:AttributeValue><ns2:AttributeValue xsi:type="xs:string">http://www.swamid.se/category/sfs-1993-1153</ns2:AttributeValue><ns2:AttributeValue xsi:type="xs:string">http://www.swamid.se/category/nren-service</ns2:AttributeValue><ns2:AttributeValue xsi:type="xs:string">http://www.swamid.se/category/eu-adequate-protection</ns2:AttributeValue></ns2:Attribute></ns1:EntityAttributes></ns0:Extensions><ns0:SPSSODescriptor AuthnRequestsSigned="false" WantAssertionsSigned="true" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:Extensions><ns4:DiscoveryResponse Binding="urn:oasis:names:tc:SAML:profiles:SSO:idp-discovery-protocol" Location="http://localhost:8087/disco" index="1" /></ns0:Extensions><ns0:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://localhost:8087/slo" /><ns0:AssertionConsumerService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://localhost:8087" index="1" /></ns0:SPSSODescriptor><ns0:Organization><ns0:OrganizationName xml:lang="en">Lokal test SP Hans</ns0:OrganizationName><ns0:OrganizationDisplayName xml:lang="se">Lokal test SP Hans</ns0:OrganizationDisplayName><ns0:OrganizationDisplayName xml:lang="en">Lokal test SP Hans</ns0:OrganizationDisplayName><ns0:OrganizationURL xml:lang="en">http://130.239.200.146:8087</ns0:OrganizationURL></ns0:Organization></ns0:EntityDescriptor>

View File

@@ -0,0 +1,97 @@
from saml2 import BINDING_HTTP_REDIRECT
from saml2.extension.idpdisc import BINDING_DISCO
from saml2.saml import NAME_FORMAT_URI
from saml2.sigver import get_xmlsec_binary, CertHandlerExtra
from saml2.entity_category.edugain import COC
from saml2.entity_category.swamid import RESEARCH_AND_EDUCATION
from saml2.entity_category.swamid import HEI
from saml2.entity_category.swamid import SFS_1993_1153
from saml2.entity_category.swamid import NREN
from saml2.entity_category.swamid import EU
#BASE= "http://130.239.200.146:8087"
BASE= "http://localhost:8087"
#BASE= "http://lingon.catalogix.se:8087"
class SpCertHandlerExtraClass(CertHandlerExtra):
def use_generate_cert_func(self):
return True
def generate_cert(self, generate_cert_info, ca_cert_string, ca_key_string):
print "Hello"
return (ca_cert_string, ca_key_string)
def use_validate_cert_func(self):
return False
def validate_cert(self, cert_str, ca_cert_string, ca_key_string):
pass
CONFIG = {
"entityid": "%s/LocalTestSPHans.xml" % BASE,
"description": "Lokal test SP Hans",
"entity_category": [COC, RESEARCH_AND_EDUCATION, HEI, SFS_1993_1153, NREN, EU],
"only_use_keys_in_metadata": False,
"cert_handler_extra_class": None,#MyCertGeneration(),
"generate_cert_info": {
"cn": "localhost",
"country_code": "se",
"state": "ac",
"city": "Umea",
"organization": "ITS Umea University",
"organization_unit": "DIRG"
},
"tmp_key_file": "pki/tmp_mykey.pem",
"tmp_cert_file": "pki/tmp_mycert.pem",
"validate_certificate": True,
"service": {
"sp": {
"authn_requests_signed": "true", #Will sign the request!
"want_assertions_signed": "true", #Demands that the assertion is signed.
"name": "LocalTestSPHans",
"endpoints": {
"assertion_consumer_service": [BASE],
"single_logout_service": [(BASE + "/slo",
BINDING_HTTP_REDIRECT)],
"discovery_response": [
("%s/disco" % BASE, BINDING_DISCO)
]
},
"required_attributes": ["surname", "givenname",
"edupersonaffiliation"],
"optional_attributes": ["title"],
}
},
"debug": 1,
"key_file": "pki/localhost.ca.key",
"cert_file": "pki/localhost.ca.crt",
"attribute_map_dir": "./attributemaps",
"metadata": {
"local": ["../idp2/idp_nocert.xml"]
# #"remote": [{"url": "http://130.239.201.5/role/idp.xml", "cert": None}],
},
#"metadata": {"local": ["/Users/haho0032/Develop/svn/trunk/pyOpSamlProxy/idp_nocert.xml"]},
# -- below used by make_metadata --
"organization": {
"name": "Lokal test SP Hans",
"display_name": [("Lokal test SP Hans", "se"), ("Lokal test SP Hans", "en")],
"url": "http://130.239.200.146:8087",
},
"contact_person": [
],
"xmlsec_binary": '/usr/local/bin/xmlsec1',
"name_form": NAME_FORMAT_URI,
"logger": {
"rotating": {
"filename": "sp.log",
"maxBytes": 100000,
"backupCount": 5,
},
"loglevel": "debug",
}
}

View File

@@ -0,0 +1,42 @@
[plugin:auth_tkt]
# identification
use = repoze.who.plugins.auth_tkt:make_plugin
secret = kasamark
cookie_name = pysaml2
secure = False
include_ip = True
timeout = 3600
reissue_time = 3000
# IDENTIFIER
# @param :
# - rememberer_name : name of the plugin for remembering (delegate)
[plugin:saml2auth]
use = s2repoze.plugins.sp:make_plugin
saml_conf = sp_conf
remember_name = auth_tkt
sid_store = outstanding
idp_query_param = IdPEntityId
discovery = http://130.239.201.5/role/idp.ds
[general]
request_classifier = s2repoze.plugins.challenge_decider:my_request_classifier
challenge_decider = repoze.who.classifiers:default_challenge_decider
remote_user_key = REMOTE_USER
[identifiers]
# plugin_name;classifier_name:.. or just plugin_name (good for any)
plugins =
saml2auth
auth_tkt
[authenticators]
# plugin_name;classifier_name.. or just plugin_name (good for any)
plugins = saml2auth
[challengers]
# plugin_name;classifier_name:.. or just plugin_name (good for any)
plugins = saml2auth
[mdproviders]
plugins = saml2auth

View File

@@ -1,34 +1,2 @@
<?xml version='1.0' encoding='UTF-8'?>
<ns0:EntityDescriptor xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:ns1="http://www.w3.org/2000/09/xmldsig#" entityID="http://localhost:8087/sp.xml"><ns0:SPSSODescriptor AuthnRequestsSigned="false" WantAssertionsSigned="true" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor use="encryption"><ns1:KeyInfo><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
YTEYMBYGA1UEChMPVW1lYSBVbml2ZXJzaXR5MRAwDgYDVQQLEwdJVCBVbml0MRAw
DgYDVQQDEwdUZXN0IFNQMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDkJWP7
bwOxtH+E15VTaulNzVQ/0cSbM5G7abqeqSNSs0l0veHr6/ROgW96ZeQ57fzVy2MC
FiQRw2fzBs0n7leEmDJyVVtBTavYlhAVXDNa3stgvh43qCfLx+clUlOvtnsoMiiR
mo7qf0BoPKTj7c0uLKpDpEbAHQT4OF1HRYVxMwIDAQABo4G/MIG8MB0GA1UdDgQW
BBQ7RgbMJFDGRBu9o3tDQDuSoBy7JjCBjAYDVR0jBIGEMIGBgBQ7RgbMJFDGRBu9
o3tDQDuSoBy7JqFepFwwWjELMAkGA1UEBhMCU0UxDTALBgNVBAcTBFVtZWExGDAW
BgNVBAoTD1VtZWEgVW5pdmVyc2l0eTEQMA4GA1UECxMHSVQgVW5pdDEQMA4GA1UE
AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
+vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:KeyDescriptor use="signing"><ns1:KeyInfo><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
YTEYMBYGA1UEChMPVW1lYSBVbml2ZXJzaXR5MRAwDgYDVQQLEwdJVCBVbml0MRAw
DgYDVQQDEwdUZXN0IFNQMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDkJWP7
bwOxtH+E15VTaulNzVQ/0cSbM5G7abqeqSNSs0l0veHr6/ROgW96ZeQ57fzVy2MC
FiQRw2fzBs0n7leEmDJyVVtBTavYlhAVXDNa3stgvh43qCfLx+clUlOvtnsoMiiR
mo7qf0BoPKTj7c0uLKpDpEbAHQT4OF1HRYVxMwIDAQABo4G/MIG8MB0GA1UdDgQW
BBQ7RgbMJFDGRBu9o3tDQDuSoBy7JjCBjAYDVR0jBIGEMIGBgBQ7RgbMJFDGRBu9
o3tDQDuSoBy7JqFepFwwWjELMAkGA1UEBhMCU0UxDTALBgNVBAcTBFVtZWExGDAW
BgNVBAoTD1VtZWEgVW5pdmVyc2l0eTEQMA4GA1UECxMHSVQgVW5pdDEQMA4GA1UE
AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
+vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://localhost:8087/slo" /><ns0:AssertionConsumerService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://localhost:8087" index="1" /></ns0:SPSSODescriptor><ns0:Organization><ns0:OrganizationName xml:lang="en">Exempel AB</ns0:OrganizationName><ns0:OrganizationDisplayName xml:lang="se">Exempel AB</ns0:OrganizationDisplayName><ns0:OrganizationDisplayName xml:lang="en">Example Co.</ns0:OrganizationDisplayName><ns0:OrganizationURL xml:lang="en">http://www.example.com/roland</ns0:OrganizationURL></ns0:Organization><ns0:ContactPerson contactType="technical"><ns0:GivenName>John</ns0:GivenName><ns0:SurName>Smith</ns0:SurName><ns0:EmailAddress>john.smith@example.com</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor>
<ns0:EntityDescriptor xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:ns1="urn:oasis:names:tc:SAML:metadata:attribute" xmlns:ns2="urn:oasis:names:tc:SAML:2.0:assertion" xmlns:ns4="urn:oasis:names:tc:SAML:profiles:SSO:idp-discovery-protocol" xmlns:ns5="http://www.w3.org/2000/09/xmldsig#" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" entityID="http://localhost:8087/LocalTestSPHans.xml"><ns0:Extensions><ns1:EntityAttributes><ns2:Attribute Name="http://macedir.org/entity-category"><ns2:AttributeValue xsi:type="xs:string">http://www.geant.net/uri/dataprotection-code-of-conduct/v1</ns2:AttributeValue><ns2:AttributeValue xsi:type="xs:string">http://www.swamid.se/category/research-and-education</ns2:AttributeValue><ns2:AttributeValue xsi:type="xs:string">http://www.swamid.se/category/hei-service</ns2:AttributeValue><ns2:AttributeValue xsi:type="xs:string">http://www.swamid.se/category/sfs-1993-1153</ns2:AttributeValue><ns2:AttributeValue xsi:type="xs:string">http://www.swamid.se/category/nren-service</ns2:AttributeValue><ns2:AttributeValue xsi:type="xs:string">http://www.swamid.se/category/eu-adequate-protection</ns2:AttributeValue></ns2:Attribute></ns1:EntityAttributes></ns0:Extensions><ns0:SPSSODescriptor AuthnRequestsSigned="false" WantAssertionsSigned="true" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:Extensions><ns4:DiscoveryResponse Binding="urn:oasis:names:tc:SAML:profiles:SSO:idp-discovery-protocol" Location="http://localhost:8087/disco" index="1" /></ns0:Extensions><ns0:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://localhost:8087/slo" /><ns0:AssertionConsumerService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://localhost:8087" index="1" /></ns0:SPSSODescriptor><ns0:Organization><ns0:OrganizationName xml:lang="en">Lokal test SP Hans</ns0:OrganizationName><ns0:OrganizationDisplayName xml:lang="se">Lokal test SP Hans</ns0:OrganizationDisplayName><ns0:OrganizationDisplayName xml:lang="en">Lokal test SP Hans</ns0:OrganizationDisplayName><ns0:OrganizationURL xml:lang="en">http://130.239.200.146:8087</ns0:OrganizationURL></ns0:Organization></ns0:EntityDescriptor>

View File

@@ -1,19 +1,64 @@
from saml2 import BINDING_HTTP_REDIRECT
from saml2.extension.idpdisc import BINDING_DISCO
from saml2.saml import NAME_FORMAT_URI
from saml2.sigver import get_xmlsec_binary, CertHandlerExtra
from saml2.entity_category.edugain import COC
from saml2.entity_category.swamid import RESEARCH_AND_EDUCATION
from saml2.entity_category.swamid import HEI
from saml2.entity_category.swamid import SFS_1993_1153
from saml2.entity_category.swamid import NREN
from saml2.entity_category.swamid import EU
#BASE= "http://130.239.200.146:8087"
BASE= "http://localhost:8087"
#BASE= "http://lingon.catalogix.se:8087"
class SpCertHandlerExtraClass(CertHandlerExtra):
def use_generate_cert_func(self):
return True
def generate_cert(self, generate_cert_info, ca_cert_string, ca_key_string):
print "Hello"
return (ca_cert_string, ca_key_string)
def use_validate_cert_func(self):
return False
def validate_cert(self, cert_str, ca_cert_string, ca_key_string):
pass
CONFIG = {
"entityid": "%s/sp.xml" % BASE,
"description": "My SP",
"entityid": "%s/LocalTestSPHans.xml" % BASE,
"description": "Lokal test SP Hans",
"entity_category": [COC, RESEARCH_AND_EDUCATION, HEI, SFS_1993_1153, NREN, EU],
"only_use_keys_in_metadata": False,
"cert_handler_extra_class": None,#MyCertGeneration(),
"generate_cert_info": {
"cn": "localhost",
"country_code": "se",
"state": "ac",
"city": "Umea",
"organization": "ITS Umea University",
"organization_unit": "DIRG"
},
"tmp_key_file": "pki/tmp_mykey.pem",
"tmp_cert_file": "pki/tmp_mycert.pem",
"validate_certificate": True,
"service": {
"sp": {
"name": "Rolands SP",
"authn_requests_signed": "true", #Will sign the request!
"want_assertions_signed": "true", #Demands that the assertion is signed.
"name": "LocalTestSPHans",
"endpoints": {
"assertion_consumer_service": [BASE],
"single_logout_service": [(BASE + "/slo",
BINDING_HTTP_REDIRECT)],
"discovery_response": [
("%s/disco" % BASE, BINDING_DISCO)
]
},
"required_attributes": ["surname", "givenname",
"edupersonaffiliation"],
@@ -21,24 +66,25 @@ CONFIG = {
}
},
"debug": 1,
"key_file": "pki/mykey.pem",
"cert_file": "pki/mycert.pem",
"key_file": "pki/localhost.ca.key",
"cert_file": "pki/localhost.ca.crt",
"attribute_map_dir": "./attributemaps",
"metadata": {"local": ["../idp2/idp.xml"]},
"metadata": {
#"local": ["../idp2/idp_nocert.xml"]
"local": ["/Users/haho0032/Develop/svn/trunk/pyOpSamlProxy/idp_nocert.xml"]
# #"remote": [{"url": "http://130.239.201.5/role/idp.xml", "cert": None}],
},
#"metadata": {"local": ["/Users/haho0032/Develop/svn/trunk/pyOpSamlProxy/idp_nocert.xml"]},
# -- below used by make_metadata --
"organization": {
"name": "Exempel AB",
"display_name": [("Exempel AB", "se"), ("Example Co.", "en")],
"url": "http://www.example.com/roland",
"name": "Lokal test SP Hans",
"display_name": [("Lokal test SP Hans", "se"), ("Lokal test SP Hans", "en")],
"url": "http://130.239.200.146:8087",
},
"contact_person": [{
"given_name":"John",
"sur_name": "Smith",
"email_address": ["john.smith@example.com"],
"contact_type": "technical",
},
"contact_person": [
],
#"xmlsec_binary":"/opt/local/bin/xmlsec1",
"xmlsec_binary": '/usr/local/bin/xmlsec1',
"name_form": NAME_FORMAT_URI,
"logger": {
"rotating": {
@@ -49,3 +95,4 @@ CONFIG = {
"loglevel": "debug",
}
}

View File

@@ -17,6 +17,7 @@ saml_conf = sp_conf
remember_name = auth_tkt
sid_store = outstanding
idp_query_param = IdPEntityId
discovery = http://130.239.201.5/role/idp.ds
[general]
request_classifier = s2repoze.plugins.challenge_decider:my_request_classifier

View File

@@ -23,7 +23,9 @@ import logging
import sys
import platform
import shelve
import threading
import traceback
import saml2
from urlparse import parse_qs, urlparse
from StringIO import StringIO
@@ -133,7 +135,6 @@ class SAML2Plugin(object):
self.discosrv = discovery
self.idp_query_param = idp_query_param
self.logout_endpoints = [urlparse(ep)[2] for ep in config.endpoint("single_logout_service")]
try:
self.metadata = self.conf.metadata
except KeyError:
@@ -360,11 +361,18 @@ class SAML2Plugin(object):
logger.debug("srvs: %s" % srvs)
dest = srvs[0]["location"]
logger.debug("destination: %s" % dest)
req = _cli.create_authn_request(dest, vorg=vorg_name)
ht_args = _cli.apply_binding(_binding, "%s" % req,
destination=dest,
relay_state=came_from)
_sid = req.id
if _cli.authn_requests_signed:
_sid = saml2.s_utils.sid(_cli.seed)
msg_str = _cli.create_authn_request(dest, vorg=vorg_name, sign=_cli.authn_requests_signed,
message_id=_sid)
else:
req = _cli.create_authn_request(dest, vorg=vorg_name, sign=False)
msg_str = "%s" % req
_sid = req.id
ht_args = _cli.apply_binding(_binding, msg_str, destination=dest, relay_state=came_from)
logger.debug("ht_args: %s" % ht_args)
except Exception, exc:
logger.exception(exc)

303
src/saml2/cert.py Normal file
View File

@@ -0,0 +1,303 @@
import base64
import traceback
from M2Crypto.util import passphrase_callback
import datetime
import dateutil.parser
import pytz
__author__ = 'haho0032'
from OpenSSL import crypto
from os.path import exists, join
from os import remove
from Crypto.Util import asn1
class WrongInput(Exception):
pass
class CertificateError(Exception):
pass
class PayloadError(Exception):
pass
class OpenSSLWrapper(object):
def __init__(self):
pass
def create_certificate(self, cert_info, request=False, valid_from=0, valid_to=315360000, sn=1, key_length=1024,
hash_alg="sha256", write_to_file=False, cert_dir="", cipher_passphrase = None):
"""
Can create certificate requests, to be signed later by another certificate with the method
create_cert_signed_certificate. If request is True.
Can also create self signed root certificates if request is False. This is default behaviour.
:param cert_info: Contains information about the certificate.
Is a dictionary that must contain the keys:
cn = Common name. This part must match the host being authenticated
country_code = Two letter description of the country.
state = State
city = City
organization = Organization, can be a company name.
organization_unit = A unit at the organization, can be a department.
Example:
cert_info_ca = {
"cn": "company.com",
"country_code": "se",
"state": "AC",
"city": "Dorotea",
"organization": "Company",
"organization_unit": "Sales"
}
:param request: True if this is a request for certificate, that should be signed.
False if this is a self signed certificate, root certificate.
:param valid_from: When the certificate starts to be valid. Amount of seconds from when the
certificate is generated.
:param valid_to: How long the certificate will be valid from when it is generated.
The value is in seconds. Default is 315360000 seconds, a.k.a 10 years.
:param sn: Serial number for the certificate. Default is 1.
:param key_length: Length of the key to be generated. Defaults to 1024.
:param hash_alg: Hash algorithm to use for the key. Default is sha256.
:param write_to_file: True if you want to write the certificate to a file. The method will then return
a tuple with path to certificate file and path to key file.
False if you want to get the result as strings. The method will then return a tuple
with the certificate string and the key as string.
WILL OVERWRITE ALL EXISTING FILES WITHOUT ASKING!
:param cert_dir: Where to save the files if write_to_file is true.
:param cipher_passphrase A dictionary with cipher and passphrase. Example:
{"cipher": "blowfish", "passphrase": "qwerty"}
:return: string representation of certificate, string representation of private key
if write_to_file parameter is False otherwise
path to certificate file, path to private key file
"""
cn = cert_info["cn"]
c_f = None
k_f = None
if write_to_file:
cert_file = "%s.crt" % cn
key_file = "%s.key" % cn
try:
remove(cert_file)
except:
pass
try:
remove(key_file)
except:
pass
c_f = join(cert_dir, cert_file)
k_f = join(cert_dir, key_file)
# create a key pair
k = crypto.PKey()
k.generate_key(crypto.TYPE_RSA, key_length)
# create a self-signed cert
cert = crypto.X509()
if request:
cert = crypto.X509Req()
if (len(cert_info["country_code"]) != 2):
raise WrongInput("Country code must be two letters!")
cert.get_subject().C = cert_info["country_code"]
cert.get_subject().ST = cert_info["state"]
cert.get_subject().L = cert_info["city"]
cert.get_subject().O = cert_info["organization"]
cert.get_subject().OU = cert_info["organization_unit"]
cert.get_subject().CN = cn
if not request:
cert.set_serial_number(sn)
cert.gmtime_adj_notBefore(valid_from) #Valid before present time
cert.gmtime_adj_notAfter(valid_to) #3 650 days
cert.set_issuer(cert.get_subject())
cert.set_pubkey(k)
cert.sign(k, hash_alg)
filesCreated = False
try:
if request:
tmp_cert = crypto.dump_certificate_request(crypto.FILETYPE_PEM, cert)
else:
tmp_cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
tmp_key = None
if cipher_passphrase is not None:
tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k, cipher_passphrase["cipher"],
cipher_passphrase["passphrase"])
else:
tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k)
if write_to_file:
fc = open(c_f, "wt")
fk = open(k_f, "wt")
if request:
fc.write(tmp_cert)
else:
fc.write(tmp_cert)
fk.write(tmp_key)
filesCreated = True
try:
fc.close()
except:
pass
try:
fk.close()
except:
pass
return c_f, k_f
return tmp_cert, tmp_key
except Exception as ex:
raise CertificateError("Certificate cannot be generated.", ex)
def write_str_to_file(self, file, str_data):
f = open(file, "wt")
f.write(str_data)
f.close()
def read_str_from_file(self, file, type="pem"):
f = open(file)
str_data = f.read()
f.close()
if type == "pem":
return str_data
if type in ["der", "cer", "crt"]:
return base64.b64encode(str(str_data))
def create_cert_signed_certificate(self, sign_cert_str, sign_key_str, request_cert_str, hash_alg="sha256",
valid_from=0, valid_to=315360000, sn=1, passphrase=None):
"""
Will sign a certificate request with a give certificate.
:param sign_cert_str: This certificate will be used to sign with. Must be a string representation of
the certificate. If you only have a file use the method read_str_from_file to
get a string representation.
:param sign_key_str: This is the key for the ca_cert_str represented as a string.
If you only have a file use the method read_str_from_file to get a string
representation.
:param request_cert_str: This is the prepared certificate to be signed. Must be a string representation of
the requested certificate. If you only have a file use the method read_str_from_file
to get a string representation.
:param hash_alg: Hash algorithm to use for the key. Default is sha256.
:param valid_from: When the certificate starts to be valid. Amount of seconds from when the
certificate is generated.
:param valid_to: How long the certificate will be valid from when it is generated.
The value is in seconds. Default is 315360000 seconds, a.k.a 10 years.
:param sn: Serial number for the certificate. Default is 1.
:param passphrase: Password for the private key in sign_key_str.
:return: String representation of the signed certificate.
"""
ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, sign_cert_str)
ca_key = None
if passphrase is not None:
ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str, passphrase)
else:
ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str)
req_cert = crypto.load_certificate_request(crypto.FILETYPE_PEM, request_cert_str)
cert = crypto.X509()
cert.set_subject(req_cert.get_subject())
cert.set_serial_number(sn)
cert.gmtime_adj_notBefore(valid_from)
cert.gmtime_adj_notAfter(valid_to)
cert.set_issuer(ca_cert.get_subject())
cert.set_pubkey(req_cert.get_pubkey())
cert.sign(ca_key, hash_alg)
return crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
def verify_chain(self, cert_chain_str_list, cert_str):
"""
:param cert_chain_str_list: Must be a list of certificate strings, where the first certificate to be validate
is in the beginning and the root certificate is last.
:param cert_str: The certificate to be validated.
:return:
"""
for tmp_cert_str in cert_chain_str_list:
valid, message = self.verify(tmp_cert_str, cert_str)
if not valid:
return False, message
else:
cert_str = tmp_cert_str
return True, "Signed certificate is valid and correctly signed by CA certificate."
def certificate_not_valid_yet(self, cert):
starts_to_be_valid = dateutil.parser.parse(cert.get_notBefore())
now = pytz.UTC.localize(datetime.datetime.utcnow())
if starts_to_be_valid < now:
return False
return True
def verify(self, signing_cert_str, cert_str):
"""
Verifies if a certificate is valid and signed by a given certificate.
:param signing_cert_str: This certificate will be used to verify the signature. Must be a string representation
of the certificate. If you only have a file use the method read_str_from_file to
get a string representation.
:param cert_str: This certificate will be verified if it is correct. Must be a string representation
of the certificate. If you only have a file use the method read_str_from_file to
get a string representation.
:return: Valid, Message
Valid = True if the certificate is valid, otherwise false.
Message = Why the validation failed.
"""
try:
ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, signing_cert_str)
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
if self.certificate_not_valid_yet(ca_cert):
return False, "CA certificate is not valid yet."
if ca_cert.has_expired() == 1:
return False, "CA certificate is expired."
if cert.has_expired() == 1:
return False, "The signed certificate is expired."
if self.certificate_not_valid_yet(cert):
return False, "The signed certificate is not valid yet."
if ca_cert.get_subject().CN == cert.get_subject().CN:
return False, "CN may not be equal for CA certificate and the signed certificate."
cert_algorithm = cert.get_signature_algorithm()
cert_asn1 = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
der_seq = asn1.DerSequence()
der_seq.decode(cert_asn1)
cert_certificate=der_seq[0]
#cert_signature_algorithm=der_seq[1]
cert_signature=der_seq[2]
cert_signature_decoded = asn1.DerObject()
cert_signature_decoded.decode(cert_signature)
signature_payload = cert_signature_decoded.payload
if signature_payload[0] != '\x00':
return False, "The certificate should not contain any unused bits."
signature = signature_payload[1:]
try:
crypto.verify(ca_cert, signature, cert_certificate, cert_algorithm)
return True, "Signed certificate is valid and correctly signed by CA certificate."
except crypto.Error, e:
return False, "Certificate is incorrectly signed."
except Exception, e:
return False, "Certificate is not valid for an unknown reason."

View File

@@ -18,6 +18,7 @@
"""Contains classes and functions that a SAML2.0 Service Provider (SP) may use
to conclude its tasks.
"""
import threading
from urllib import urlencode
from urlparse import urlparse
@@ -110,7 +111,7 @@ class Base(Entity):
Entity.__init__(self, "sp", config, config_file, virtual_organization)
self.users = Population(identity_cache)
self.lock = threading.Lock()
# for server state storage
if state_cache is None:
self.state = {} # in memory storage
@@ -298,11 +299,19 @@ class Base(Entity):
except KeyError:
pass
if sign and self.sec.cert_handler.generate_cert():
with self.lock:
self.sec.cert_handler.update_cert(True)
return self._message(AuthnRequest, destination, message_id, consent,
extensions, sign, sign_prepare,
protocol_binding=binding,
scoping=scoping, **args)
return self._message(AuthnRequest, destination, message_id, consent,
extensions, sign, sign_prepare,
protocol_binding=binding,
scoping=scoping, **args)
def create_attribute_query(self, destination, name_id=None,
attribute=None, message_id=0, consent=None,
extensions=None, sign=False, sign_prepare=False,

View File

@@ -62,7 +62,12 @@ COMMON_ARGS = [
"session_storage",
"entity_category",
"xmlsec_path",
"extension_schemas"
"extension_schemas",
"cert_handler_extra_class",
"generate_cert_info",
"tmp_cert_file",
"tmp_key_file",
"validate_certificate"
]
SP_ARGS = [
@@ -84,6 +89,7 @@ SP_ARGS = [
]
AA_IDP_ARGS = [
"sign_assertion",
"want_authn_requests_signed",
"provided_attributes",
"subject_data",
@@ -199,6 +205,11 @@ class Config(object):
self.scope = ""
self.allow_unknown_attributes = False
self.extension_schema = {}
self.cert_handler_extra_class = None
self.generate_cert_info = None
self.tmp_cert_file = None
self.tmp_key_file = None
self.validate_certificate = None
def setattr(self, context, attr, val):
if context == "":

View File

@@ -542,7 +542,8 @@ class Entity(HTTPBase):
origdoc = xmlstr
xmlstr = self.unravel(xmlstr, binding, request_cls.msgtype)
_request = _request.loads(xmlstr, binding, origdoc=origdoc)
must = self.config.getattr("want_authn_requests_signed", "idp")
_request = _request.loads(xmlstr, binding, origdoc=origdoc, must=must)
_log_debug("Loaded request")

View File

@@ -36,12 +36,12 @@ class Request(object):
self.message = None
self.not_on_or_after = 0
def _loads(self, xmldata, binding=None, origdoc=None):
def _loads(self, xmldata, binding=None, origdoc=None, must=None):
# own copy
self.xmlstr = xmldata[:]
logger.info("xmlstr: %s" % (self.xmlstr,))
try:
self.message = self.signature_check(xmldata, origdoc=origdoc)
self.message = self.signature_check(xmldata, origdoc=origdoc, must=must)
except TypeError:
raise
except Exception, excp:
@@ -84,8 +84,8 @@ class Request(object):
assert self.issue_instant_ok()
return self
def loads(self, xmldata, binding, origdoc=None):
return self._loads(xmldata, binding, origdoc)
def loads(self, xmldata, binding, origdoc=None, must=None):
return self._loads(xmldata, binding, origdoc, must)
def verify(self):
try:

View File

@@ -22,6 +22,7 @@ import logging
import os
import shelve
import threading
from saml2.eptid import EptidShelve, Eptid
from saml2.sdb import SessionStorage
@@ -71,6 +72,7 @@ class Server(Entity):
self.symkey = symkey
self.seed = rndstr()
self.iv = os.urandom(16)
self.lock = threading.Lock()
def support_AssertionIDRequest(self):
return True
@@ -309,7 +311,7 @@ class Server(Entity):
self.config.attribute_converters,
policy, issuer=_issuer)
if sign_assertion:
if sign_assertion is not None and sign_assertion:
assertion.signature = pre_signature_part(assertion.id,
self.sec.my_cert, 1)
# Just the assertion or the response and the assertion ?
@@ -419,7 +421,6 @@ class Server(Entity):
:param sign_response: Whether the response should be signed or not.
:return: A response instance
"""
policy = self.config.getattr("policy", "idp")
if not name_id:
@@ -455,16 +456,30 @@ class Server(Entity):
try:
_authn = authn
return self._authn_response(in_response_to, # in_response_to
destination, # consumer_url
sp_entity_id, # sp_entity_id
identity, # identity as dictionary
name_id,
authn=_authn,
issuer=issuer,
policy=policy,
sign_assertion=sign_assertion,
sign_response=sign_response)
if (sign_assertion or sign_response) and self.sec.cert_handler.generate_cert():
with self.lock:
self.sec.cert_handler.update_cert(True)
return self._authn_response(in_response_to, # in_response_to
destination, # consumer_url
sp_entity_id, # sp_entity_id
identity, # identity as dictionary
name_id,
authn=_authn,
issuer=issuer,
policy=policy,
sign_assertion=sign_assertion,
sign_response=sign_response)
else:
return self._authn_response(in_response_to, # in_response_to
destination, # consumer_url
sp_entity_id, # sp_entity_id
identity, # identity as dictionary
name_id,
authn=_authn,
issuer=issuer,
policy=policy,
sign_assertion=sign_assertion,
sign_response=sign_response)
except MissingValue, exc:
return self.create_error_response(in_response_to, destination,

View File

@@ -32,6 +32,7 @@ from Crypto.PublicKey.RSA import importKey
from Crypto.Signature import PKCS1_v1_5
from Crypto.Util.asn1 import DerSequence
from Crypto.PublicKey import RSA
from saml2.cert import OpenSSLWrapper
from saml2.samlp import Response
import xmldsig as ds
@@ -549,6 +550,7 @@ class RSASigner(Signer):
verifier = PKCS1_v1_5.new(key)
return verifier.verify(h, sig)
SIGNER_ALGS = {
RSA_SHA1: RSASigner(SHA),
"http://www.w3.org/2001/04/xmldsig-more#rsa-sha256": RSASigner(SHA256),
@@ -921,18 +923,118 @@ def security_context(conf, debug=None):
return SecurityContext(crypto, conf.key_file,
cert_file=conf.cert_file, metadata=metadata,
debug=debug, only_use_keys_in_metadata=_only_md)
debug=debug, only_use_keys_in_metadata=_only_md,
cert_handler_extra_class=conf.cert_handler_extra_class,
generate_cert_info=conf.generate_cert_info, tmp_cert_file=conf.tmp_cert_file,
tmp_key_file=conf.tmp_key_file, validate_certificate=conf.validate_certificate)
class CertHandlerExtra(object):
def __init__(self):
pass
def use_generate_cert_func(self):
raise Exception("use_generate_cert_func function must be implemented")
def generate_cert(self, generate_cert_info, root_cert_string, root_key_string):
raise Exception("generate_cert function must be implemented")
#Excepts to return (cert_string, key_string)
def use_validate_cert_func(self):
raise Exception("use_validate_cert_func function must be implemented")
def validate_cert(self, cert_str, root_cert_string, root_key_string):
raise Exception("validate_cert function must be implemented")
#Excepts to return True/False
class CertHandler(object):
def __init__(self, security_context, cert_file=None, cert_type="pem", key_file=None, key_type="pem",
generate_cert_info=None, cert_handler_extra_class=None, tmp_cert_file=None, tmp_key_file=None,
verify_cert=False):
"""
Initiates the class for handling certificates. Enables the certificates to either be a single certificate
as base functionality or makes it possible to generate a new certificate for each call to the function.
:param key_file:
:param key_type:
:param cert_file:
:param cert_type:
:param generate_cert:
:param cert_handler_extra_class:
"""
self._verify_cert = False
self._generate_cert = False
if cert_type == "pem" and key_type == "pem":
self._verify_cert = verify_cert is True
self._security_context = security_context
self._osw = OpenSSLWrapper()
if key_file is not None:
self._key_str = self._osw.read_str_from_file(key_file, key_type)
else:
self._key_str = ""
if cert_file is not None:
self._cert_str = self._osw.read_str_from_file(cert_file, cert_type)
else:
self._cert_str = ""
self._tmp_cert_str = self._cert_str
self._tmp_key_str = self._key_str
self._tmp_cert_file = tmp_cert_file
self._tmp_key_file = tmp_key_file
self._cert_info = None
self._generate_cert_func_active = False
if generate_cert_info is not None and len(self._cert_str) > 0 and len(self._key_str) > 0 \
and tmp_key_file is not None and tmp_cert_file is not None:
self._generate_cert = True
self._cert_info = generate_cert_info
self._cert_handler_extra_class = cert_handler_extra_class
def verify_cert(self, cert_file):
if self._verify_cert:
cert_str = self._osw.read_str_from_file(cert_file, "pem")
if self._cert_handler_extra_class is not None and self._cert_handler_extra_class.use_validate_cert_func():
self._cert_handler_extra_class.validate_cert(cert_str, self._cert_str, self._key_str)
else:
valid, mess = self._osw.verify(self._cert_str, cert_str)
logger.info("CertHandler.verify_cert: %s" % mess)
return valid
return True
def generate_cert(self):
return self._generate_cert
def update_cert(self, active=False):
if self._generate_cert and active:
if self._cert_handler_extra_class is not None and self._cert_handler_extra_class.use_generate_cert_func():
(self._tmp_cert_str, self._tmp_key_str) = \
self._cert_handler_extra_class.generate_cert(self._cert_info, self._cert_str, self._key_str)
else:
self._tmp_cert_str, self._tmp_key_str = self._osw.create_certificate(self._cert_info, request=True)
self._tmp_cert_str = self._osw.create_cert_signed_certificate(self._cert_str, self._key_str,
self._tmp_cert_str)
valid, mess = self._osw.verify(self._cert_str, self._tmp_cert_str)
self._osw.write_str_to_file(self._tmp_cert_file, self._tmp_cert_str)
self._osw.write_str_to_file(self._tmp_key_file, self._tmp_key_str)
self._security_context.key_file = self._tmp_key_file
self._security_context.cert_file = self._tmp_cert_file
self._security_context.key_type = "pem"
self._security_context.cert_type = "pem"
self._security_context.my_cert = read_cert_from_file(self._security_context.cert_file,
self._security_context.cert_type)
# How to get a rsa pub key fingerprint from a certificate
# openssl x509 -inform pem -noout -in server.crt -pubkey > publickey.pem
# openssl rsa -inform pem -noout -in publickey.pem -pubin -modulus
class SecurityContext(object):
my_cert = None
def __init__(self, crypto, key_file="", key_type="pem",
cert_file="", cert_type="pem", metadata=None,
debug=False, template="", encrypt_key_type="des-192",
only_use_keys_in_metadata=False):
only_use_keys_in_metadata=False, cert_handler_extra_class=None, generate_cert_info=None,
tmp_cert_file=None, tmp_key_file=None, validate_certificate=None):
self.crypto = crypto
assert (isinstance(self.crypto, CryptoBackend))
@@ -944,8 +1046,14 @@ class SecurityContext(object):
# Your public key
self.cert_file = cert_file
self.cert_type = cert_type
self.my_cert = read_cert_from_file(cert_file, cert_type)
self.cert_handler = CertHandler(self, cert_file, cert_type, key_file, key_type, generate_cert_info,
cert_handler_extra_class, tmp_cert_file, tmp_key_file, validate_certificate)
self.cert_handler.update_cert(True)
self.metadata = metadata
self.only_use_keys_in_metadata = only_use_keys_in_metadata
self.debug = debug
@@ -1053,14 +1161,23 @@ class SecurityContext(object):
#print certs
verified = False
last_pem_file = None
for _, pem_file in certs:
try:
last_pem_file = pem_file
if origdoc is not None:
if self.verify_signature(origdoc, pem_file,
node_name=node_name,
node_id=item.id, id_attr=id_attr):
verified = True
break
try:
if self.verify_signature(origdoc, pem_file,
node_name=node_name,
node_id=item.id, id_attr=id_attr):
verified = True
break
except Exception:
if self.verify_signature(decoded_xml, pem_file,
node_name=node_name,
node_id=item.id, id_attr=id_attr):
verified = True
break
else:
if self.verify_signature(decoded_xml, pem_file,
node_name=node_name,
@@ -1079,6 +1196,10 @@ class SecurityContext(object):
if not verified:
raise SignatureError("Failed to verify signature")
else:
if not self.cert_handler.verify_cert(last_pem_file):
raise CertificateError("Invalid certificate!")
return item
@@ -1222,7 +1343,7 @@ class SecurityContext(object):
origdoc)
if isinstance(response, Response) and (response.assertion or
response.encrypted_assertion):
response.encrypted_assertion):
# Try to find the signing cert in the assertion
for assertion in (response.assertion or response.encrypted_assertion):
if response.encrypted_assertion:

View File

@@ -0,0 +1,221 @@
from os import remove
import time
__author__ = 'haho0032'
import unittest
from saml2.cert import OpenSSLWrapper
class TestGenerateCertificates(unittest.TestCase):
def test_validate_with_root_cert(self):
cert_info_ca = {
"cn": "qwerty",
"country_code": "qw",
"state": "qwerty",
"city": "qwerty",
"organization": "qwerty",
"organization_unit": "qwerty"
}
cert_info = {
"cn": "asdfgh",
"country_code": "as",
"state": "asdfgh",
"city": "asdfgh",
"organization": "asdfgh",
"organization_unit": "asdfg"
}
osw = OpenSSLWrapper()
ca_cert, ca_key = osw.create_certificate(cert_info_ca, request=False, write_to_file=True,
cert_dir="/Users/haho0032/Develop/openSSL/pki")
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
ca_cert_str = osw.read_str_from_file(ca_cert)
ca_key_str = osw.read_str_from_file(ca_key)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertTrue(valid)
false_ca_cert, false_ca_key = osw.create_certificate(cert_info_ca, request=False, write_to_file=False)
false_req_cert_str_1, false_req_key_str_1 = osw.create_certificate(cert_info_ca, request=True)
false_cert_str_1 = osw.create_cert_signed_certificate(false_ca_cert, false_ca_key, false_req_cert_str_1)
false_req_cert_str_2, false_req_key_str_2 = osw.create_certificate(cert_info, request=True)
false_cert_str_2 = osw.create_cert_signed_certificate(false_ca_cert, false_ca_key, false_req_cert_str_2)
valid, mess = osw.verify(false_ca_cert, cert_str)
self.assertFalse(valid)
valid, mess = osw.verify(false_ca_cert, false_cert_str_1)
self.assertFalse(valid)
valid, mess = osw.verify(ca_cert_str, false_cert_str_2)
self.assertFalse(valid)
if 'z' in cert_str:
false_cert_str = cert_str.replace('z', 'x')
valid, mess = osw.verify(ca_cert_str, false_cert_str)
self.assertFalse(valid)
remove(ca_cert)
remove(ca_key)
def test_validate_cert_chains(self):
cert_info_ca = {
"cn": "qwerty",
"country_code": "qw",
"state": "qwerty",
"city": "qwerty",
"organization": "qwerty",
"organization_unit": "qwerty"
}
cert_intermediate_1_info = {
"cn": "intermediate_1",
"country_code": "as",
"state": "asdfgh",
"city": "asdfgh",
"organization": "asdfgh",
"organization_unit": "asdfg"
}
cert_intermediate_2_info = {
"cn": "intermediate_2",
"country_code": "as",
"state": "asdfgh",
"city": "asdfgh",
"organization": "asdfgh",
"organization_unit": "asdfg"
}
cert_client_cert_info = {
"cn": "intermediate_1",
"country_code": "as",
"state": "asdfgh",
"city": "asdfgh",
"organization": "asdfgh",
"organization_unit": "asdfg"
}
osw = OpenSSLWrapper()
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
req_cert_str, intermediate_1_key_str = osw.create_certificate(cert_intermediate_1_info, request=True)
intermediate_cert_1_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
req_cert_str, intermediate_2_key_str = osw.create_certificate(cert_intermediate_2_info, request=True)
intermediate_cert_2_str = osw.create_cert_signed_certificate(intermediate_cert_1_str, intermediate_1_key_str,
req_cert_str)
req_cert_str, client_key_str = osw.create_certificate(cert_client_cert_info, request=True)
client_cert_str = osw.create_cert_signed_certificate(intermediate_cert_2_str, intermediate_2_key_str,
req_cert_str)
cert_chain = [intermediate_cert_2_str, intermediate_cert_1_str, ca_cert_str]
valid, mess = osw.verify_chain(cert_chain, client_cert_str)
self.assertTrue(valid)
def test_validate_passphrase(self):
cert_info_ca = {
"cn": "qwerty",
"country_code": "qw",
"state": "qwerty",
"city": "qwerty",
"organization": "qwerty",
"organization_unit": "qwerty"
}
cert_info = {
"cn": "intermediate_1",
"country_code": "as",
"state": "asdfgh",
"city": "asdfgh",
"organization": "asdfgh",
"organization_unit": "asdfg"
}
osw = OpenSSLWrapper()
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False,
cipher_passphrase=
{"cipher": "blowfish", "passphrase": "qwerty"})
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str,
passphrase="qwerty")
valid = False
try:
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str,
passphrase="qwertyqwerty")
except Exception:
valid = True
self.assertTrue(valid)
def test_validate_expire(self):
cert_info_ca = {
"cn": "qwerty",
"country_code": "qw",
"state": "qwerty",
"city": "qwerty",
"organization": "qwerty",
"organization_unit": "qwerty"
}
cert_info = {
"cn": "intermediate_1",
"country_code": "as",
"state": "asdfgh",
"city": "asdfgh",
"organization": "asdfgh",
"organization_unit": "asdfg"
}
osw = OpenSSLWrapper()
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
valid, mess = osw.verify(ca_cert_str, cert_str)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False, valid_from=1000, valid_to=100000)
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertFalse(valid)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str, valid_from=1000,
valid_to=100000)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertFalse(valid)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False, valid_from=0, valid_to=1)
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
time.sleep(2)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertFalse(valid)
ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str, valid_from=0, valid_to=1)
time.sleep(2)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertFalse(valid)