PEP-8 stuff
This commit is contained in:
@@ -541,7 +541,8 @@ class Assertion(dict):
|
|||||||
def __init__(self, dic=None):
|
def __init__(self, dic=None):
|
||||||
dict.__init__(self, dic)
|
dict.__init__(self, dic)
|
||||||
|
|
||||||
def _authn_context_decl(self, decl, authn_auth=None):
|
@staticmethod
|
||||||
|
def _authn_context_decl(decl, authn_auth=None):
|
||||||
"""
|
"""
|
||||||
Construct the authn context with a authn context declaration
|
Construct the authn context with a authn context declaration
|
||||||
:param decl: The authn context declaration
|
:param decl: The authn context declaration
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ from os.path import join
|
|||||||
from os import remove
|
from os import remove
|
||||||
from Crypto.Util import asn1
|
from Crypto.Util import asn1
|
||||||
|
|
||||||
|
|
||||||
class WrongInput(Exception):
|
class WrongInput(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -23,55 +22,82 @@ class PayloadError(Exception):
|
|||||||
|
|
||||||
|
|
||||||
class OpenSSLWrapper(object):
|
class OpenSSLWrapper(object):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def create_certificate(self, cert_info, request=False, valid_from=0, valid_to=315360000, sn=1, key_length=1024,
|
def create_certificate(self, cert_info, request=False, valid_from=0,
|
||||||
hash_alg="sha256", write_to_file=False, cert_dir="", cipher_passphrase = None):
|
valid_to=315360000, sn=1, key_length=1024,
|
||||||
|
hash_alg="sha256", write_to_file=False, cert_dir="",
|
||||||
|
cipher_passphrase=None):
|
||||||
"""
|
"""
|
||||||
Can create certificate requests, to be signed later by another certificate with the method
|
Can create certificate requests, to be signed later by another
|
||||||
|
certificate with the method
|
||||||
create_cert_signed_certificate. If request is True.
|
create_cert_signed_certificate. If request is True.
|
||||||
|
|
||||||
Can also create self signed root certificates if request is False. This is default behaviour.
|
Can also create self signed root certificates if request is False.
|
||||||
|
This is default behaviour.
|
||||||
|
|
||||||
:param cert_info: Contains information about the certificate.
|
:param cert_info: Contains information about the certificate.
|
||||||
Is a dictionary that must contain the keys:
|
Is a dictionary that must contain the keys:
|
||||||
cn = Common name. This part must match the host being authenticated
|
cn = Common name. This part
|
||||||
country_code = Two letter description of the country.
|
must match the host being authenticated
|
||||||
|
country_code = Two letter description
|
||||||
|
of the country.
|
||||||
state = State
|
state = State
|
||||||
city = City
|
city = City
|
||||||
organization = Organization, can be a company name.
|
organization = Organization, can be a
|
||||||
organization_unit = A unit at the organization, can be a department.
|
company name.
|
||||||
|
organization_unit = A unit at the
|
||||||
|
organization, can be a department.
|
||||||
Example:
|
Example:
|
||||||
cert_info_ca = {
|
cert_info_ca = {
|
||||||
"cn": "company.com",
|
"cn": "company.com",
|
||||||
"country_code": "se",
|
"country_code": "se",
|
||||||
"state": "AC",
|
"state": "AC",
|
||||||
"city": "Dorotea",
|
"city": "Dorotea",
|
||||||
"organization": "Company",
|
"organization":
|
||||||
"organization_unit": "Sales"
|
"Company",
|
||||||
|
"organization_unit":
|
||||||
|
"Sales"
|
||||||
}
|
}
|
||||||
:param request: True if this is a request for certificate, that should be signed.
|
:param request: True if this is a request for certificate,
|
||||||
False if this is a self signed certificate, root certificate.
|
that should be signed.
|
||||||
:param valid_from: When the certificate starts to be valid. Amount of seconds from when the
|
False if this is a self signed certificate,
|
||||||
|
root certificate.
|
||||||
|
:param valid_from: When the certificate starts to be valid.
|
||||||
|
Amount of seconds from when the
|
||||||
certificate is generated.
|
certificate is generated.
|
||||||
:param valid_to: How long the certificate will be valid from when it is generated.
|
:param valid_to: How long the certificate will be valid from
|
||||||
The value is in seconds. Default is 315360000 seconds, a.k.a 10 years.
|
when it is generated.
|
||||||
:param sn: Serial number for the certificate. Default is 1.
|
The value is in seconds. Default is
|
||||||
:param key_length: Length of the key to be generated. Defaults to 1024.
|
315360000 seconds, a.k.a 10 years.
|
||||||
:param hash_alg: Hash algorithm to use for the key. Default is sha256.
|
:param sn: Serial number for the certificate. Default
|
||||||
:param write_to_file: True if you want to write the certificate to a file. The method will then return
|
is 1.
|
||||||
a tuple with path to certificate file and path to key file.
|
:param key_length: Length of the key to be generated. Defaults
|
||||||
False if you want to get the result as strings. The method will then return a tuple
|
to 1024.
|
||||||
with the certificate string and the key as string.
|
:param hash_alg: Hash algorithm to use for the key. Default
|
||||||
WILL OVERWRITE ALL EXISTING FILES WITHOUT ASKING!
|
is sha256.
|
||||||
:param cert_dir: Where to save the files if write_to_file is true.
|
:param write_to_file: True if you want to write the certificate
|
||||||
:param cipher_passphrase A dictionary with cipher and passphrase. Example:
|
to a file. The method will then return
|
||||||
{"cipher": "blowfish", "passphrase": "qwerty"}
|
a tuple with path to certificate file and
|
||||||
:return: string representation of certificate, string representation of private key
|
path to key file.
|
||||||
|
False if you want to get the result as
|
||||||
|
strings. The method will then return a tuple
|
||||||
|
with the certificate string and the key as
|
||||||
|
string.
|
||||||
|
WILL OVERWRITE ALL EXISTING FILES WITHOUT
|
||||||
|
ASKING!
|
||||||
|
:param cert_dir: Where to save the files if write_to_file is
|
||||||
|
true.
|
||||||
|
:param cipher_passphrase A dictionary with cipher and passphrase.
|
||||||
|
Example::
|
||||||
|
{"cipher": "blowfish", "passphrase": "qwerty"}
|
||||||
|
|
||||||
|
:return: string representation of certificate,
|
||||||
|
string representation of private key
|
||||||
if write_to_file parameter is False otherwise
|
if write_to_file parameter is False otherwise
|
||||||
path to certificate file, path to private key file
|
path to certificate file, path to private
|
||||||
|
key file
|
||||||
"""
|
"""
|
||||||
cn = cert_info["cn"]
|
cn = cert_info["cn"]
|
||||||
|
|
||||||
@@ -97,7 +123,7 @@ class OpenSSLWrapper(object):
|
|||||||
k = crypto.PKey()
|
k = crypto.PKey()
|
||||||
k.generate_key(crypto.TYPE_RSA, key_length)
|
k.generate_key(crypto.TYPE_RSA, key_length)
|
||||||
|
|
||||||
# create a self-signed cert
|
# create a self-signed cert
|
||||||
cert = crypto.X509()
|
cert = crypto.X509()
|
||||||
|
|
||||||
if request:
|
if request:
|
||||||
@@ -113,8 +139,8 @@ class OpenSSLWrapper(object):
|
|||||||
cert.get_subject().CN = cn
|
cert.get_subject().CN = cn
|
||||||
if not request:
|
if not request:
|
||||||
cert.set_serial_number(sn)
|
cert.set_serial_number(sn)
|
||||||
cert.gmtime_adj_notBefore(valid_from) #Valid before present time
|
cert.gmtime_adj_notBefore(valid_from) #Valid before present time
|
||||||
cert.gmtime_adj_notAfter(valid_to) #3 650 days
|
cert.gmtime_adj_notAfter(valid_to) #3 650 days
|
||||||
cert.set_issuer(cert.get_subject())
|
cert.set_issuer(cert.get_subject())
|
||||||
cert.set_pubkey(k)
|
cert.set_pubkey(k)
|
||||||
cert.sign(k, hash_alg)
|
cert.sign(k, hash_alg)
|
||||||
@@ -122,13 +148,16 @@ class OpenSSLWrapper(object):
|
|||||||
filesCreated = False
|
filesCreated = False
|
||||||
try:
|
try:
|
||||||
if request:
|
if request:
|
||||||
tmp_cert = crypto.dump_certificate_request(crypto.FILETYPE_PEM, cert)
|
tmp_cert = crypto.dump_certificate_request(crypto.FILETYPE_PEM,
|
||||||
|
cert)
|
||||||
else:
|
else:
|
||||||
tmp_cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
|
tmp_cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
|
||||||
tmp_key = None
|
tmp_key = None
|
||||||
if cipher_passphrase is not None:
|
if cipher_passphrase is not None:
|
||||||
tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k, cipher_passphrase["cipher"],
|
tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k,
|
||||||
cipher_passphrase["passphrase"])
|
cipher_passphrase["cipher"],
|
||||||
|
cipher_passphrase[
|
||||||
|
"passphrase"])
|
||||||
else:
|
else:
|
||||||
tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k)
|
tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k)
|
||||||
if write_to_file:
|
if write_to_file:
|
||||||
@@ -172,36 +201,52 @@ class OpenSSLWrapper(object):
|
|||||||
return base64.b64encode(str(str_data))
|
return base64.b64encode(str(str_data))
|
||||||
|
|
||||||
|
|
||||||
def create_cert_signed_certificate(self, sign_cert_str, sign_key_str, request_cert_str, hash_alg="sha256",
|
def create_cert_signed_certificate(self, sign_cert_str, sign_key_str,
|
||||||
valid_from=0, valid_to=315360000, sn=1, passphrase=None):
|
request_cert_str, hash_alg="sha256",
|
||||||
|
valid_from=0, valid_to=315360000, sn=1,
|
||||||
|
passphrase=None):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Will sign a certificate request with a give certificate.
|
Will sign a certificate request with a give certificate.
|
||||||
:param sign_cert_str: This certificate will be used to sign with. Must be a string representation of
|
:param sign_cert_str: This certificate will be used to sign with.
|
||||||
the certificate. If you only have a file use the method read_str_from_file to
|
Must be a string representation of
|
||||||
|
the certificate. If you only have a file
|
||||||
|
use the method read_str_from_file to
|
||||||
get a string representation.
|
get a string representation.
|
||||||
:param sign_key_str: This is the key for the ca_cert_str represented as a string.
|
:param sign_key_str: This is the key for the ca_cert_str
|
||||||
If you only have a file use the method read_str_from_file to get a string
|
represented as a string.
|
||||||
|
If you only have a file use the method
|
||||||
|
read_str_from_file to get a string
|
||||||
representation.
|
representation.
|
||||||
:param request_cert_str: This is the prepared certificate to be signed. Must be a string representation of
|
:param request_cert_str: This is the prepared certificate to be
|
||||||
the requested certificate. If you only have a file use the method read_str_from_file
|
signed. Must be a string representation of
|
||||||
|
the requested certificate. If you only have
|
||||||
|
a file use the method read_str_from_file
|
||||||
to get a string representation.
|
to get a string representation.
|
||||||
:param hash_alg: Hash algorithm to use for the key. Default is sha256.
|
:param hash_alg: Hash algorithm to use for the key. Default
|
||||||
:param valid_from: When the certificate starts to be valid. Amount of seconds from when the
|
is sha256.
|
||||||
|
:param valid_from: When the certificate starts to be valid.
|
||||||
|
Amount of seconds from when the
|
||||||
certificate is generated.
|
certificate is generated.
|
||||||
:param valid_to: How long the certificate will be valid from when it is generated.
|
:param valid_to: How long the certificate will be valid from
|
||||||
The value is in seconds. Default is 315360000 seconds, a.k.a 10 years.
|
when it is generated.
|
||||||
:param sn: Serial number for the certificate. Default is 1.
|
The value is in seconds. Default is
|
||||||
|
315360000 seconds, a.k.a 10 years.
|
||||||
|
:param sn: Serial number for the certificate. Default
|
||||||
|
is 1.
|
||||||
:param passphrase: Password for the private key in sign_key_str.
|
:param passphrase: Password for the private key in sign_key_str.
|
||||||
:return: String representation of the signed certificate.
|
:return: String representation of the signed
|
||||||
|
certificate.
|
||||||
"""
|
"""
|
||||||
ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, sign_cert_str)
|
ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, sign_cert_str)
|
||||||
ca_key = None
|
ca_key = None
|
||||||
if passphrase is not None:
|
if passphrase is not None:
|
||||||
ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str, passphrase)
|
ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str,
|
||||||
|
passphrase)
|
||||||
else:
|
else:
|
||||||
ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str)
|
ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str)
|
||||||
req_cert = crypto.load_certificate_request(crypto.FILETYPE_PEM, request_cert_str)
|
req_cert = crypto.load_certificate_request(crypto.FILETYPE_PEM,
|
||||||
|
request_cert_str)
|
||||||
|
|
||||||
cert = crypto.X509()
|
cert = crypto.X509()
|
||||||
cert.set_subject(req_cert.get_subject())
|
cert.set_subject(req_cert.get_subject())
|
||||||
@@ -217,7 +262,8 @@ class OpenSSLWrapper(object):
|
|||||||
def verify_chain(self, cert_chain_str_list, cert_str):
|
def verify_chain(self, cert_chain_str_list, cert_str):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
:param cert_chain_str_list: Must be a list of certificate strings, where the first certificate to be validate
|
:param cert_chain_str_list: Must be a list of certificate strings,
|
||||||
|
where the first certificate to be validate
|
||||||
is in the beginning and the root certificate is last.
|
is in the beginning and the root certificate is last.
|
||||||
:param cert_str: The certificate to be validated.
|
:param cert_str: The certificate to be validated.
|
||||||
:return:
|
:return:
|
||||||
@@ -229,7 +275,8 @@ class OpenSSLWrapper(object):
|
|||||||
else:
|
else:
|
||||||
cert_str = tmp_cert_str
|
cert_str = tmp_cert_str
|
||||||
return (True,
|
return (True,
|
||||||
"Signed certificate is valid and correctly signed by CA certificate.")
|
"Signed certificate is valid and correctly signed by CA "
|
||||||
|
"certificate.")
|
||||||
|
|
||||||
def certificate_not_valid_yet(self, cert):
|
def certificate_not_valid_yet(self, cert):
|
||||||
starts_to_be_valid = dateutil.parser.parse(cert.get_notBefore())
|
starts_to_be_valid = dateutil.parser.parse(cert.get_notBefore())
|
||||||
@@ -243,18 +290,24 @@ class OpenSSLWrapper(object):
|
|||||||
"""
|
"""
|
||||||
Verifies if a certificate is valid and signed by a given certificate.
|
Verifies if a certificate is valid and signed by a given certificate.
|
||||||
|
|
||||||
:param signing_cert_str: This certificate will be used to verify the signature. Must be a string representation
|
:param signing_cert_str: This certificate will be used to verify the
|
||||||
of the certificate. If you only have a file use the method read_str_from_file to
|
signature. Must be a string representation
|
||||||
|
of the certificate. If you only have a file
|
||||||
|
use the method read_str_from_file to
|
||||||
get a string representation.
|
get a string representation.
|
||||||
:param cert_str: This certificate will be verified if it is correct. Must be a string representation
|
:param cert_str: This certificate will be verified if it is
|
||||||
of the certificate. If you only have a file use the method read_str_from_file to
|
correct. Must be a string representation
|
||||||
|
of the certificate. If you only have a file
|
||||||
|
use the method read_str_from_file to
|
||||||
get a string representation.
|
get a string representation.
|
||||||
:return: Valid, Message
|
:return: Valid, Message
|
||||||
Valid = True if the certificate is valid, otherwise false.
|
Valid = True if the certificate is valid,
|
||||||
|
otherwise false.
|
||||||
Message = Why the validation failed.
|
Message = Why the validation failed.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, signing_cert_str)
|
ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM,
|
||||||
|
signing_cert_str)
|
||||||
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
|
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
|
||||||
|
|
||||||
if self.certificate_not_valid_yet(ca_cert):
|
if self.certificate_not_valid_yet(ca_cert):
|
||||||
@@ -270,7 +323,8 @@ class OpenSSLWrapper(object):
|
|||||||
return False, "The signed certificate is not valid yet."
|
return False, "The signed certificate is not valid yet."
|
||||||
|
|
||||||
if ca_cert.get_subject().CN == cert.get_subject().CN:
|
if ca_cert.get_subject().CN == cert.get_subject().CN:
|
||||||
return False, "CN may not be equal for CA certificate and the signed certificate."
|
return False, ("CN may not be equal for CA certificate and the "
|
||||||
|
"signed certificate.")
|
||||||
|
|
||||||
cert_algorithm = cert.get_signature_algorithm()
|
cert_algorithm = cert.get_signature_algorithm()
|
||||||
|
|
||||||
@@ -279,9 +333,9 @@ class OpenSSLWrapper(object):
|
|||||||
der_seq = asn1.DerSequence()
|
der_seq = asn1.DerSequence()
|
||||||
der_seq.decode(cert_asn1)
|
der_seq.decode(cert_asn1)
|
||||||
|
|
||||||
cert_certificate=der_seq[0]
|
cert_certificate = der_seq[0]
|
||||||
#cert_signature_algorithm=der_seq[1]
|
#cert_signature_algorithm=der_seq[1]
|
||||||
cert_signature=der_seq[2]
|
cert_signature = der_seq[2]
|
||||||
|
|
||||||
cert_signature_decoded = asn1.DerObject()
|
cert_signature_decoded = asn1.DerObject()
|
||||||
cert_signature_decoded.decode(cert_signature)
|
cert_signature_decoded.decode(cert_signature)
|
||||||
@@ -289,12 +343,14 @@ class OpenSSLWrapper(object):
|
|||||||
signature_payload = cert_signature_decoded.payload
|
signature_payload = cert_signature_decoded.payload
|
||||||
|
|
||||||
if signature_payload[0] != '\x00':
|
if signature_payload[0] != '\x00':
|
||||||
return False, "The certificate should not contain any unused bits."
|
return (False,
|
||||||
|
"The certificate should not contain any unused bits.")
|
||||||
|
|
||||||
signature = signature_payload[1:]
|
signature = signature_payload[1:]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
crypto.verify(ca_cert, signature, cert_certificate, cert_algorithm)
|
crypto.verify(ca_cert, signature, cert_certificate,
|
||||||
|
cert_algorithm)
|
||||||
return True, "Signed certificate is valid and correctly signed by CA certificate."
|
return True, "Signed certificate is valid and correctly signed by CA certificate."
|
||||||
except crypto.Error, e:
|
except crypto.Error, e:
|
||||||
return False, "Certificate is incorrectly signed."
|
return False, "Certificate is incorrectly signed."
|
||||||
|
|||||||
@@ -521,7 +521,6 @@ class Server(Entity):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
_authn = authn
|
_authn = authn
|
||||||
response = None
|
|
||||||
if (sign_assertion or sign_response) and self.sec.cert_handler.generate_cert():
|
if (sign_assertion or sign_response) and self.sec.cert_handler.generate_cert():
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self.sec.cert_handler.update_cert(True)
|
self.sec.cert_handler.update_cert(True)
|
||||||
@@ -536,7 +535,8 @@ class Server(Entity):
|
|||||||
sign_assertion=sign_assertion,
|
sign_assertion=sign_assertion,
|
||||||
sign_response=sign_response,
|
sign_response=sign_response,
|
||||||
best_effort=best_effort,
|
best_effort=best_effort,
|
||||||
encrypt_assertion=encrypt_assertion, encrypt_cert=encrypt_cert)
|
encrypt_assertion=encrypt_assertion,
|
||||||
|
encrypt_cert=encrypt_cert)
|
||||||
return self._authn_response(in_response_to, # in_response_to
|
return self._authn_response(in_response_to, # in_response_to
|
||||||
destination, # consumer_url
|
destination, # consumer_url
|
||||||
sp_entity_id, # sp_entity_id
|
sp_entity_id, # sp_entity_id
|
||||||
@@ -548,7 +548,8 @@ class Server(Entity):
|
|||||||
sign_assertion=sign_assertion,
|
sign_assertion=sign_assertion,
|
||||||
sign_response=sign_response,
|
sign_response=sign_response,
|
||||||
best_effort=best_effort,
|
best_effort=best_effort,
|
||||||
encrypt_assertion=encrypt_assertion, encrypt_cert=encrypt_cert)
|
encrypt_assertion=encrypt_assertion,
|
||||||
|
encrypt_cert=encrypt_cert)
|
||||||
|
|
||||||
except MissingValue, exc:
|
except MissingValue, exc:
|
||||||
return self.create_error_response(in_response_to, destination,
|
return self.create_error_response(in_response_to, destination,
|
||||||
|
|||||||
@@ -1011,6 +1011,7 @@ def security_context(conf, debug=None):
|
|||||||
tmp_key_file=conf.tmp_key_file,
|
tmp_key_file=conf.tmp_key_file,
|
||||||
validate_certificate=conf.validate_certificate)
|
validate_certificate=conf.validate_certificate)
|
||||||
|
|
||||||
|
|
||||||
def encrypt_cert_from_item(item):
|
def encrypt_cert_from_item(item):
|
||||||
_encrypt_cert = None
|
_encrypt_cert = None
|
||||||
try:
|
try:
|
||||||
@@ -1031,6 +1032,7 @@ def encrypt_cert_from_item(item):
|
|||||||
return None
|
return None
|
||||||
return _encrypt_cert
|
return _encrypt_cert
|
||||||
|
|
||||||
|
|
||||||
class CertHandlerExtra(object):
|
class CertHandlerExtra(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
pass
|
pass
|
||||||
@@ -1488,7 +1490,8 @@ class SecurityContext(object):
|
|||||||
return self.correctly_signed_message(decoded_xml, "assertion", must,
|
return self.correctly_signed_message(decoded_xml, "assertion", must,
|
||||||
origdoc, only_valid_cert)
|
origdoc, only_valid_cert)
|
||||||
|
|
||||||
def correctly_signed_response(self, decoded_xml, must=False, origdoc=None,only_valid_cert=False,
|
def correctly_signed_response(self, decoded_xml, must=False, origdoc=None,
|
||||||
|
only_valid_cert=False,
|
||||||
require_response_signature=False, **kwargs):
|
require_response_signature=False, **kwargs):
|
||||||
""" Check if a instance is correctly signed, if we have metadata for
|
""" Check if a instance is correctly signed, if we have metadata for
|
||||||
the IdP that sent the info use that, if not use the key that are in
|
the IdP that sent the info use that, if not use the key that are in
|
||||||
|
|||||||
Reference in New Issue
Block a user