diff --git a/neutron_lbaas/common/tls_utils/cert_parser.py b/neutron_lbaas/common/tls_utils/cert_parser.py index 71d565355..d5451b369 100644 --- a/neutron_lbaas/common/tls_utils/cert_parser.py +++ b/neutron_lbaas/common/tls_utils/cert_parser.py @@ -15,16 +15,18 @@ from cryptography.hazmat import backends from cryptography.hazmat.primitives import serialization -import neutron_lbaas.common.exceptions as exceptions -from OpenSSL import crypto -from OpenSSL import SSL -import pyasn1.codec.der.decoder as decoder -import pyasn1_modules.rfc2459 as rfc2459 +from cryptography import x509 +from neutron.i18n import _LE +from oslo_log import log as logging import six +import neutron_lbaas.common.exceptions as exceptions + X509_BEG = "-----BEGIN CERTIFICATE-----" X509_END = "-----END CERTIFICATE-----" +LOG = logging.getLogger(__name__) + def validate_cert(certificate, private_key=None, private_key_passphrase=None, intermediates=None): @@ -40,32 +42,32 @@ def validate_cert(certificate, private_key=None, :param intermediates: PEM encoded intermediate certificates :returns: boolean """ - x509 = _get_x509_from_pem_bytes(certificate) + + cert = _get_x509_from_pem_bytes(certificate) if intermediates: for x509Pem in _split_x509s(intermediates): _get_x509_from_pem_bytes(x509Pem) if private_key: pkey = _read_privatekey(private_key, passphrase=private_key_passphrase) - ctx = SSL.Context(SSL.TLSv1_METHOD) - ctx.use_certificate(x509) - try: - ctx.use_privatekey(pkey) - ctx.check_privatekey() - except Exception: + pknum = pkey.public_key().public_numbers() + certnum = cert.public_key().public_numbers() + if pknum != certnum: raise exceptions.MisMatchedKey return True def _read_privatekey(privatekey_pem, passphrase=None): - def cb(*args): - if passphrase: - if six.PY2: - return passphrase.encode("utf-8") - elif six.PY3: - return six.b(passphrase) - else: - raise exceptions.NeedsPassphrase - return crypto.load_privatekey(crypto.FILETYPE_PEM, privatekey_pem, cb) + if passphrase: + if six.PY2: + passphrase = passphrase.encode("utf-8") + elif six.PY3: + passphrase = six.b(passphrase) + + try: + return serialization.load_pem_private_key(privatekey_pem, passphrase, + backends.default_backend()) + except Exception: + raise exceptions.NeedsPassphrase def _split_x509s(x509Str): @@ -128,8 +130,7 @@ def dump_private_key(private_key, private_key_passphrase=None): def get_host_names(certificate): - """ - Extract the host names from the Pem encoded X509 certificate + """Extract the host names from the Pem encoded X509 certificate :param certificate: A PEM encoded certificate :returns: A dictionary containing the following keys: @@ -138,27 +139,29 @@ def get_host_names(certificate): 'dns_names' is a list of dNSNames (possibly empty) from the SubjectAltNames of the certificate. """ + try: + certificate = certificate.encode('ascii') - x509 = _get_x509_from_pem_bytes(certificate) - hostNames = {} - if hasattr(x509.get_subject(), 'CN'): - hostNames['cn'] = x509.get_subject().CN - hostNames['dns_names'] = [] - num_exts = x509.get_extension_count() - for i in range(0, num_exts): - ext = x509.get_extension(i) - short_name = ext.get_short_name() - if short_name == six.b('subjectAltName'): - data = ext.get_data() - general_names_container = decoder.decode( - data, asn1Spec=rfc2459.GeneralNames()) - for general_names in general_names_container[0]: - currName = general_names.getName() - if currName == 'dNSName': - octets = general_names.getComponent().asOctets() - decoded = octets.decode("utf-8") - hostNames['dns_names'].append(decoded) - return hostNames + cert = _get_x509_from_pem_bytes(certificate) + cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0] + host_names = { + 'cn': cn.value.lower(), + 'dns_names': [] + } + try: + ext = cert.extensions.get_extension_for_oid( + x509.OID_SUBJECT_ALTERNATIVE_NAME + ) + host_names['dns_names'] = ext.value.get_values_for_type( + x509.DNSName) + except x509.ExtensionNotFound: + LOG.debug("%s extension not found", + x509.OID_SUBJECT_ALTERNATIVE_NAME) + + return host_names + except Exception: + LOG.exception(_LE("Unreadable certificate.")) + raise exceptions.UnreadableCert def _get_x509_from_pem_bytes(certificate_pem): @@ -166,11 +169,11 @@ def _get_x509_from_pem_bytes(certificate_pem): Parse X509 data from a PEM encoded certificate :param certificate_pem: Certificate in PEM format - :returns: pyOpenSSL high-level x509 data from the PEM string + :returns: crypto high-level x509 data from the PEM string """ try: - x509 = crypto.load_certificate(crypto.FILETYPE_PEM, - certificate_pem) + x509cert = x509.load_pem_x509_certificate(certificate_pem, + backends.default_backend()) except Exception: raise exceptions.UnreadableCert - return x509 + return x509cert diff --git a/neutron_lbaas/tests/unit/common/tls_utils/test_cert_parser.py b/neutron_lbaas/tests/unit/common/tls_utils/test_cert_parser.py index 663df8549..8f301d660 100644 --- a/neutron_lbaas/tests/unit/common/tls_utils/test_cert_parser.py +++ b/neutron_lbaas/tests/unit/common/tls_utils/test_cert_parser.py @@ -13,49 +13,49 @@ # License for the specific language governing permissions and limitations # under the License. +from cryptography import x509 import neutron_lbaas.common.exceptions as exceptions import neutron_lbaas.common.tls_utils.cert_parser as cert_parser from neutron_lbaas.tests import base ALT_EXT_CRT = """-----BEGIN CERTIFICATE----- -MIIGxDCCBaygAwIBAgIGAUp0fCElMA0GCSqGSIb3DQEBDQUAMIGLMQswCQYDVQQG -EwJVUzEOMAwGA1UECAwFVGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4wHAYD -VQQKDBVPcGVuU3RhY2sgRXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24gTGJh -YXMxHjAcBgNVBAMMFXd3dy5DTkZyb21TdWJqZWN0Lm9yZzAeFw0xNDExMjIwMDEx -MzlaFw0yMjEyMjEwMDExMzlaMIGLMQswCQYDVQQGEwJVUzEOMAwGA1UECAwFVGV4 -YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4wHAYDVQQKDBVPcGVuU3RhY2sgRXhw -ZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24gTGJhYXMxHjAcBgNVBAMMFXd3dy5D -TkZyb21TdWJqZWN0Lm9yZzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB -ALL1nmbDPUDps84i1sM3rhHrc+Dlu0N/wKQWKZFeiWUtF/pot19V3o0yXDpsg7W5 -RkLMTFkZEcnQpyGdpAGjTjzmNXMZw99EzxsmrR3l6hUEISifVbvEuftYZT6jPxM5 -ML6WAjFNaBEZPWtZi8CgX5xdjdrDNndwyHob49n7Nc/h1kVqqBqMILabTqC6yEcx -S/B+DugVuuYbEdYYYElQUMfM+mUdULrSqIVl2n5AvvSFjWzWzfgPyp4QKn+f7HVR -T62bh/XjQ88n1tMYNAEqixRZTPgqY1LFl9VJVgRp9fdL6ttMurOR3C0STJ5qCdKB -L7LrpbY4u8dEragRC6YAyI8CAwEAAaOCAyowggMmMAwGA1UdEwEB/wQCMAAwDgYD -VR0PAQH/BAQDAgO4MIIDBAYDVR0RBIIC+zCCAveCGXd3dy5ob3N0RnJvbV9kTlNO -YW1lMS5jb22CGXd3dy5ob3N0RnJvbV9kTlNOYW1lMi5jb22CGXd3dy5ob3N0RnJv -bV9kTlNOYW1lMy5jb22BEW5vb25lQG5vd2hlcmUub3JnpIGPMIGMMQswCQYDVQQG -EwJVUzEOMAwGA1UECAwFVGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4wHAYD -VQQKDBVPcGVuU3RhY2sgRXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24gTGJh -YXMxHzAdBgNVBAMMFnd3dy5jbkZyb21BbHROYW1lMS5vcmekgY8wgYwxCzAJBgNV -BAYTAlVTMQ4wDAYDVQQIDAVUZXhhczEUMBIGA1UEBwwLU2FuIEFudG9uaW8xHjAc -BgNVBAoMFU9wZW5TdGFjayBFeHBlcmltZW50czEWMBQGA1UECwwNTmV1dHJvbiBM -YmFhczEfMB0GA1UEAwwWd3d3LmNuRnJvbUFsdE5hbWUyLm9yZ6SBjzCBjDELMAkG -A1UEBhMCVVMxDjAMBgNVBAgMBVRleGFzMRQwEgYDVQQHDAtTYW4gQW50b25pbzEe -MBwGA1UECgwVT3BlblN0YWNrIEV4cGVyaW1lbnRzMRYwFAYDVQQLDA1OZXV0cm9u -IExiYWFzMR8wHQYDVQQDDBZ3d3cuY25Gcm9tQWx0TmFtZTMub3JnpIGPMIGMMQsw -CQYDVQQGEwJVUzEOMAwGA1UECAwFVGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlv -MR4wHAYDVQQKDBVPcGVuU3RhY2sgRXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRy -b24gTGJhYXMxHzAdBgNVBAMMFnd3dy5jbkZyb21BbHROYW1lNC5vcmeHBAoBAgOH -EAEjRWeJq83v97PVkeaixICGFmh0dHA6Ly93d3cuZXhhbXBsZS5jb22CGXd3dy5o -b3N0RnJvbV9kTlNOYW1lNC5jb20wDQYJKoZIhvcNAQENBQADggEBAICUCDMhDf0f -cvX5mVnq4Q3+SM/nl03Gse6J0JdpFivS4hl+uZs0TAFYpfEPpAa7KKxD229kbCiQ -kyxf8fzADSl77RQbL6Lxa8K/c66mVNiuVvTHV4r/nDNcRYN9fGArw/Ho7VX+HVQ6 -UW1t/uvXeyg695t7kzZmvg0ChD5kS848d2rXu2MhwHwXA8rbuK6gxVY97fbzBNlj -aiPJUAb8lqZMShd+3yVCgMmV0J20u2b5pSdO+LHQ7NfVqURk2pcHD8slfHzXT58q -YB90v0pSVP6mzHGyLxETZZz0nhaH9EjOyFkQI84ORT8Kmd5Y04gSI0LTKKF1eMNE -TyNC+MtsRdA= +MIIGqjCCBZKgAwIBAgIJAIApBg8slSSiMA0GCSqGSIb3DQEBBQUAMIGLMQswCQYD +VQQGEwJVUzEOMAwGA1UECAwFVGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4w +HAYDVQQKDBVPcGVuU3RhY2sgRXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24g +TGJhYXMxHjAcBgNVBAMMFXd3dy5DTkZyb21TdWJqZWN0Lm9yZzAeFw0xNTA1MjEy +MDMzMjNaFw0yNTA1MTgyMDMzMjNaMIGLMQswCQYDVQQGEwJVUzEOMAwGA1UECAwF +VGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4wHAYDVQQKDBVPcGVuU3RhY2sg +RXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24gTGJhYXMxHjAcBgNVBAMMFXd3 +dy5DTkZyb21TdWJqZWN0Lm9yZzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC +ggEBALL1nmbDPUDps84i1sM3rhHrc+Dlu0N/wKQWKZFeiWUtF/pot19V3o0yXDps +g7W5RkLMTFkZEcnQpyGdpAGjTjzmNXMZw99EzxsmrR3l6hUEISifVbvEuftYZT6j +PxM5ML6WAjFNaBEZPWtZi8CgX5xdjdrDNndwyHob49n7Nc/h1kVqqBqMILabTqC6 +yEcxS/B+DugVuuYbEdYYYElQUMfM+mUdULrSqIVl2n5AvvSFjWzWzfgPyp4QKn+f +7HVRT62bh/XjQ88n1tMYNAEqixRZTPgqY1LFl9VJVgRp9fdL6ttMurOR3C0STJ5q +CdKBL7LrpbY4u8dEragRC6YAyI8CAwEAAaOCAw0wggMJMAkGA1UdEwQCMAAwCwYD +VR0PBAQDAgXgMIIC7QYDVR0RBIIC5DCCAuCCGHd3dy5ob3N0RnJvbUROU05hbWUx +LmNvbYIYd3d3Lmhvc3RGcm9tRE5TTmFtZTIuY29tghh3d3cuaG9zdEZyb21ETlNO +YW1lMy5jb22CGHd3dy5ob3N0RnJvbUROU05hbWU0LmNvbYcECgECA4cQASNFZ4mr +ze/3s9WR5qLEgIYWaHR0cDovL3d3dy5leGFtcGxlLmNvbaSBjzCBjDELMAkGA1UE +BhMCVVMxDjAMBgNVBAgMBVRleGFzMRQwEgYDVQQHDAtTYW4gQW50b25pbzEeMBwG +A1UECgwVT3BlblN0YWNrIEV4cGVyaW1lbnRzMRYwFAYDVQQLDA1OZXV0cm9uIExi +YWFzMR8wHQYDVQQDDBZ3d3cuY25Gcm9tQWx0TmFtZTEub3JnpIGPMIGMMQswCQYD +VQQGEwJVUzEOMAwGA1UECAwFVGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4w +HAYDVQQKDBVPcGVuU3RhY2sgRXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24g +TGJhYXMxHzAdBgNVBAMMFnd3dy5jbkZyb21BbHROYW1lMi5vcmekgY8wgYwxCzAJ +BgNVBAYTAlVTMQ4wDAYDVQQIDAVUZXhhczEUMBIGA1UEBwwLU2FuIEFudG9uaW8x +HjAcBgNVBAoMFU9wZW5TdGFjayBFeHBlcmltZW50czEWMBQGA1UECwwNTmV1dHJv +biBMYmFhczEfMB0GA1UEAwwWd3d3LmNuRnJvbUFsdE5hbWUzLm9yZ6SBjzCBjDEL +MAkGA1UEBhMCVVMxDjAMBgNVBAgMBVRleGFzMRQwEgYDVQQHDAtTYW4gQW50b25p +bzEeMBwGA1UECgwVT3BlblN0YWNrIEV4cGVyaW1lbnRzMRYwFAYDVQQLDA1OZXV0 +cm9uIExiYWFzMR8wHQYDVQQDDBZ3d3cuY25Gcm9tQWx0TmFtZTQub3JnMA0GCSqG +SIb3DQEBBQUAA4IBAQCS6iDn6R3C+qJLZibaqrBSkM9yu5kwRsQ6lQ+DODvVYGWq +eGkkh5o2c6WbJlH44yF280+HvnJcuISD7epPHJN0vUM9+WMtXfEli9avFHgu2JxP +3P0ixK2kaJnqKQkSEdnA/v/eWP1Cd2v6rbKCIo9d2gSP0cnpdtlX9Zk3SzEh0V7s +RjSdfZoAvz0aAnpDHlTerLcz5T2aiRae2wSt/RLA3qDO1Ji05tWvQBmKuepxS6A1 +tL4Drm+OCXJwTrE7ClTMCwcrZnLl4tI+Z+X3DV92WQB8ldST/QFjz1hgs/4zrADA +elu2c/X7MR4ObOjhDfaVGQ8kMhYf5hx69qyNDsGi -----END CERTIFICATE----- """ @@ -237,11 +237,11 @@ def _get_rsa_numbers(private_key, private_key_passphrase=None): class TestTLSParseUtils(base.BaseTestCase): def test_alt_subject_name_parses(self): hosts = cert_parser.get_host_names(ALT_EXT_CRT) - self.assertEqual('www.CNFromSubject.org', hosts['cn']) - self.assertEqual('www.hostFrom_dNSName1.com', hosts['dns_names'][0]) - self.assertEqual('www.hostFrom_dNSName2.com', hosts['dns_names'][1]) - self.assertEqual('www.hostFrom_dNSName3.com', hosts['dns_names'][2]) - self.assertEqual('www.hostFrom_dNSName4.com', hosts['dns_names'][3]) + self.assertEqual('www.cnfromsubject.org', hosts['cn']) + self.assertEqual('www.hostfromdnsname1.com', hosts['dns_names'][0]) + self.assertEqual('www.hostfromdnsname2.com', hosts['dns_names'][1]) + self.assertEqual('www.hostfromdnsname3.com', hosts['dns_names'][2]) + self.assertEqual('www.hostfromdnsname4.com', hosts['dns_names'][3]) def test_x509_parses(self): self.assertRaises(exceptions.UnreadableCert, @@ -260,19 +260,17 @@ class TestTLSParseUtils(base.BaseTestCase): self.assertRaises(exceptions.NeedsPassphrase, cert_parser._read_privatekey, ENCRYPTED_PKCS8_CRT_KEY) - epkey = cert_parser._read_privatekey( + cert_parser._read_privatekey( ENCRYPTED_PKCS8_CRT_KEY, passphrase=ENCRYPTED_PKCS8_CRT_KEY_PASSPHRASE) - self.assertTrue(epkey.check()) def test_read_private_key_unicode(self): self.assertRaises(exceptions.NeedsPassphrase, cert_parser._read_privatekey, ENCRYPTED_PKCS8_CRT_KEY) - epkey = cert_parser._read_privatekey( + cert_parser._read_privatekey( ENCRYPTED_PKCS8_CRT_KEY, passphrase=u'{0}'.format(ENCRYPTED_PKCS8_CRT_KEY_PASSPHRASE)) - self.assertTrue(epkey.check()) def test_dump_private_key(self): self.assertRaises(exceptions.NeedsPassphrase, @@ -303,4 +301,6 @@ class TestTLSParseUtils(base.BaseTestCase): imds.append(cert_parser._get_x509_from_pem_bytes(x509Pem)) for i in range(0, len(imds)): - self.assertEqual(EXPECTED_IMD_SUBJS[i], imds[i].get_subject().CN) + self.assertEqual(EXPECTED_IMD_SUBJS[i], + imds[i].subject.get_attributes_for_oid( + x509.OID_COMMON_NAME)[0].value)