Swap pycrypto* for pyca/cryptography
pyOpenSSL is already a dependency and pyOpenSSL uses cryptography. This also reduces the complexity of the code significantly in several places (and removes the need to directly manipulate asn1). A future PR could remove pyOpenSSL entirely as all the cert behavior is supported directly by cryptography.
This commit is contained in:
		
				
					committed by
					
						
						Ian Cordasco
					
				
			
			
				
	
			
			
			
						parent
						
							08815f9322
						
					
				
				
					commit
					afdf5b4a8c
				
			
							
								
								
									
										2
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								setup.py
									
									
									
									
									
								
							@@ -14,7 +14,7 @@ install_requires = [
 | 
			
		||||
    'paste',
 | 
			
		||||
    'zope.interface',
 | 
			
		||||
    'repoze.who',
 | 
			
		||||
    'pycryptodomex',
 | 
			
		||||
    'cryptography',
 | 
			
		||||
    'pytz',
 | 
			
		||||
    'pyOpenSSL',
 | 
			
		||||
    'python-dateutil',
 | 
			
		||||
 
 | 
			
		||||
@@ -8,7 +8,11 @@ import six
 | 
			
		||||
from OpenSSL import crypto
 | 
			
		||||
from os.path import join
 | 
			
		||||
from os import remove
 | 
			
		||||
from Cryptodome.Util import asn1
 | 
			
		||||
 | 
			
		||||
from cryptography.hazmat.backends import default_backend
 | 
			
		||||
from cryptography.x509 import load_pem_x509_certificate
 | 
			
		||||
 | 
			
		||||
backend = default_backend()
 | 
			
		||||
 | 
			
		||||
class WrongInput(Exception):
 | 
			
		||||
    pass
 | 
			
		||||
@@ -194,9 +198,8 @@ class OpenSSLWrapper(object):
 | 
			
		||||
        f.close()
 | 
			
		||||
 | 
			
		||||
    def read_str_from_file(self, file, type="pem"):
 | 
			
		||||
        f = open(file, 'rt')
 | 
			
		||||
        str_data = f.read()
 | 
			
		||||
        f.close()
 | 
			
		||||
        with open(file, 'rb') as f:
 | 
			
		||||
            str_data = f.read()
 | 
			
		||||
 | 
			
		||||
        if type == "pem":
 | 
			
		||||
            return str_data
 | 
			
		||||
@@ -336,31 +339,13 @@ class OpenSSLWrapper(object):
 | 
			
		||||
            cert_algorithm = cert.get_signature_algorithm()
 | 
			
		||||
            if six.PY3:
 | 
			
		||||
                cert_algorithm = cert_algorithm.decode('ascii')
 | 
			
		||||
                cert_str = cert_str.encode('ascii')
 | 
			
		||||
 | 
			
		||||
            cert_asn1 = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
 | 
			
		||||
 | 
			
		||||
            der_seq = asn1.DerSequence()
 | 
			
		||||
            der_seq.decode(cert_asn1)
 | 
			
		||||
 | 
			
		||||
            cert_certificate = der_seq[0]
 | 
			
		||||
            #cert_signature_algorithm=der_seq[1]
 | 
			
		||||
            cert_signature = der_seq[2]
 | 
			
		||||
 | 
			
		||||
            cert_signature_decoded = asn1.DerObject()
 | 
			
		||||
            cert_signature_decoded.decode(cert_signature)
 | 
			
		||||
 | 
			
		||||
            signature_payload = cert_signature_decoded.payload
 | 
			
		||||
 | 
			
		||||
            sig_pay0 = signature_payload[0]
 | 
			
		||||
            if ((isinstance(sig_pay0, int) and sig_pay0 != 0) or
 | 
			
		||||
                (isinstance(sig_pay0, str) and sig_pay0 != '\x00')):
 | 
			
		||||
                return (False,
 | 
			
		||||
                       "The certificate should not contain any unused bits.")
 | 
			
		||||
 | 
			
		||||
            signature = signature_payload[1:]
 | 
			
		||||
            cert_crypto = load_pem_x509_certificate(cert_str, backend)
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                crypto.verify(ca_cert, signature, cert_certificate,
 | 
			
		||||
                crypto.verify(ca_cert, cert_crypto.signature,
 | 
			
		||||
                              cert_crypto.tbs_certificate_bytes,
 | 
			
		||||
                              cert_algorithm)
 | 
			
		||||
                return True, "Signed certificate is valid and correctly signed by CA certificate."
 | 
			
		||||
            except crypto.Error as e:
 | 
			
		||||
 
 | 
			
		||||
@@ -7,9 +7,6 @@ import six
 | 
			
		||||
from binascii import hexlify
 | 
			
		||||
from hashlib import sha1
 | 
			
		||||
 | 
			
		||||
# from Crypto.PublicKey import RSA
 | 
			
		||||
from Cryptodome.PublicKey import RSA
 | 
			
		||||
 | 
			
		||||
from saml2.metadata import ENDPOINTS
 | 
			
		||||
from saml2.profile import paos, ecp
 | 
			
		||||
from saml2.soap import parse_soap_enveloped_saml_artifact_resolve
 | 
			
		||||
 
 | 
			
		||||
@@ -19,25 +19,13 @@ from binascii import hexlify
 | 
			
		||||
 | 
			
		||||
from future.backports.urllib.parse import urlencode
 | 
			
		||||
 | 
			
		||||
# from Crypto.PublicKey.RSA import importKey
 | 
			
		||||
# from Crypto.Signature import PKCS1_v1_5
 | 
			
		||||
# from Crypto.Util.asn1 import DerSequence
 | 
			
		||||
# from Crypto.PublicKey import RSA
 | 
			
		||||
# from Crypto.Hash import SHA
 | 
			
		||||
# from Crypto.Hash import SHA224
 | 
			
		||||
# from Crypto.Hash import SHA256
 | 
			
		||||
# from Crypto.Hash import SHA384
 | 
			
		||||
# from Crypto.Hash import SHA512
 | 
			
		||||
 | 
			
		||||
from Cryptodome.PublicKey.RSA import importKey
 | 
			
		||||
from Cryptodome.Signature import PKCS1_v1_5
 | 
			
		||||
from Cryptodome.Util.asn1 import DerSequence
 | 
			
		||||
from Cryptodome.PublicKey import RSA
 | 
			
		||||
from Cryptodome.Hash import SHA
 | 
			
		||||
from Cryptodome.Hash import SHA224
 | 
			
		||||
from Cryptodome.Hash import SHA256
 | 
			
		||||
from Cryptodome.Hash import SHA384
 | 
			
		||||
from Cryptodome.Hash import SHA512
 | 
			
		||||
from cryptography.exceptions import InvalidSignature
 | 
			
		||||
from cryptography.hazmat.backends import default_backend
 | 
			
		||||
from cryptography.hazmat.primitives import hashes
 | 
			
		||||
from cryptography.hazmat.primitives.asymmetric import rsa
 | 
			
		||||
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
 | 
			
		||||
from cryptography.hazmat.primitives.serialization import load_pem_private_key
 | 
			
		||||
from cryptography.x509 import load_pem_x509_certificate
 | 
			
		||||
 | 
			
		||||
from tempfile import NamedTemporaryFile
 | 
			
		||||
from subprocess import Popen
 | 
			
		||||
@@ -87,6 +75,8 @@ XMLTAG = "<?xml version='1.0'?>"
 | 
			
		||||
PREFIX1 = "<?xml version='1.0' encoding='UTF-8'?>"
 | 
			
		||||
PREFIX2 = '<?xml version="1.0" encoding="UTF-8"?>'
 | 
			
		||||
 | 
			
		||||
backend = default_backend()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SigverError(SAMLError):
 | 
			
		||||
    pass
 | 
			
		||||
@@ -406,18 +396,10 @@ def active_cert(key):
 | 
			
		||||
    """
 | 
			
		||||
    try:
 | 
			
		||||
        cert_str = pem_format(key)
 | 
			
		||||
        try:
 | 
			
		||||
            certificate = importKey(cert_str)
 | 
			
		||||
            not_before = to_time(str(certificate.get_not_before()))
 | 
			
		||||
            not_after = to_time(str(certificate.get_not_after()))
 | 
			
		||||
            assert not_before < utc_now()
 | 
			
		||||
            assert not_after > utc_now()
 | 
			
		||||
            return True
 | 
			
		||||
        except:
 | 
			
		||||
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
 | 
			
		||||
            assert cert.has_expired() == 0
 | 
			
		||||
            assert not OpenSSLWrapper().certificate_not_valid_yet(cert)
 | 
			
		||||
            return True
 | 
			
		||||
        cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
 | 
			
		||||
        assert cert.has_expired() == 0
 | 
			
		||||
        assert not OpenSSLWrapper().certificate_not_valid_yet(cert)
 | 
			
		||||
        return True
 | 
			
		||||
    except AssertionError:
 | 
			
		||||
        return False
 | 
			
		||||
    except AttributeError:
 | 
			
		||||
@@ -555,19 +537,8 @@ def rsa_eq(key1, key2):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def extract_rsa_key_from_x509_cert(pem):
 | 
			
		||||
    # Convert from PEM to DER
 | 
			
		||||
    der = ssl.PEM_cert_to_DER_cert(pem.decode('ascii'))
 | 
			
		||||
 | 
			
		||||
    # Extract subjectPublicKeyInfo field from X.509 certificate (see RFC3280)
 | 
			
		||||
    cert = DerSequence()
 | 
			
		||||
    cert.decode(der)
 | 
			
		||||
    tbsCertificate = DerSequence()
 | 
			
		||||
    tbsCertificate.decode(cert[0])
 | 
			
		||||
    subjectPublicKeyInfo = tbsCertificate[6]
 | 
			
		||||
 | 
			
		||||
    # Initialize RSA key
 | 
			
		||||
    rsa_key = RSA.importKey(subjectPublicKeyInfo)
 | 
			
		||||
    return rsa_key
 | 
			
		||||
    cert = load_pem_x509_certificate(pem, backend)
 | 
			
		||||
    return cert.public_key()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def pem_format(key):
 | 
			
		||||
@@ -576,7 +547,7 @@ def pem_format(key):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def import_rsa_key_from_file(filename):
 | 
			
		||||
    return RSA.importKey(read_file(filename, 'r'))
 | 
			
		||||
    return load_pem_private_key(read_file(filename, 'rb'), None, backend)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def parse_xmlsec_output(output):
 | 
			
		||||
@@ -622,25 +593,28 @@ class RSASigner(Signer):
 | 
			
		||||
        if key is None:
 | 
			
		||||
            key = self.key
 | 
			
		||||
 | 
			
		||||
        h = self.digest.new(msg)
 | 
			
		||||
        signer = PKCS1_v1_5.new(key)
 | 
			
		||||
        return signer.sign(h)
 | 
			
		||||
        return key.sign(msg, PKCS1v15(), self.digest)
 | 
			
		||||
 | 
			
		||||
    def verify(self, msg, sig, key=None):
 | 
			
		||||
        if key is None:
 | 
			
		||||
            key = self.key
 | 
			
		||||
 | 
			
		||||
        h = self.digest.new(msg)
 | 
			
		||||
        verifier = PKCS1_v1_5.new(key)
 | 
			
		||||
        return verifier.verify(h, sig)
 | 
			
		||||
        try:
 | 
			
		||||
            if isinstance(key, rsa.RSAPrivateKey):
 | 
			
		||||
                key = key.public_key()
 | 
			
		||||
 | 
			
		||||
            key.verify(sig, msg, PKCS1v15(), self.digest)
 | 
			
		||||
            return True
 | 
			
		||||
        except InvalidSignature:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
SIGNER_ALGS = {
 | 
			
		||||
    SIG_RSA_SHA1: RSASigner(SHA),
 | 
			
		||||
    SIG_RSA_SHA224: RSASigner(SHA224),
 | 
			
		||||
    SIG_RSA_SHA256: RSASigner(SHA256),
 | 
			
		||||
    SIG_RSA_SHA384: RSASigner(SHA384),
 | 
			
		||||
    SIG_RSA_SHA512: RSASigner(SHA512),
 | 
			
		||||
    SIG_RSA_SHA1: RSASigner(hashes.SHA1()),
 | 
			
		||||
    SIG_RSA_SHA224: RSASigner(hashes.SHA224()),
 | 
			
		||||
    SIG_RSA_SHA256: RSASigner(hashes.SHA256()),
 | 
			
		||||
    SIG_RSA_SHA384: RSASigner(hashes.SHA384()),
 | 
			
		||||
    SIG_RSA_SHA512: RSASigner(hashes.SHA512()),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
REQ_ORDER = ["SAMLRequest", "RelayState", "SigAlg"]
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user