Remove OpenSSL from cert_parser

Utilize cryptography package instead of OpenSSL in the cert_parser

Change-Id: I66aa62d7f1f9ce1898bbb8593e4aedd1a1163137
Closes-Bug: #1532344
This commit is contained in:
ptoohill1 2016-01-08 16:33:19 -06:00
parent 578a750181
commit bb34d71c77
2 changed files with 97 additions and 94 deletions

View File

@ -15,16 +15,18 @@
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import serialization
import neutron_lbaas.common.exceptions as exceptions
from OpenSSL import crypto
from OpenSSL import SSL
import pyasn1.codec.der.decoder as decoder
import pyasn1_modules.rfc2459 as rfc2459
from cryptography import x509
from neutron.i18n import _LE
from oslo_log import log as logging
import six
import neutron_lbaas.common.exceptions as exceptions
X509_BEG = "-----BEGIN CERTIFICATE-----"
X509_END = "-----END CERTIFICATE-----"
LOG = logging.getLogger(__name__)
def validate_cert(certificate, private_key=None,
private_key_passphrase=None, intermediates=None):
@ -40,32 +42,32 @@ def validate_cert(certificate, private_key=None,
:param intermediates: PEM encoded intermediate certificates
:returns: boolean
"""
x509 = _get_x509_from_pem_bytes(certificate)
cert = _get_x509_from_pem_bytes(certificate)
if intermediates:
for x509Pem in _split_x509s(intermediates):
_get_x509_from_pem_bytes(x509Pem)
if private_key:
pkey = _read_privatekey(private_key, passphrase=private_key_passphrase)
ctx = SSL.Context(SSL.TLSv1_METHOD)
ctx.use_certificate(x509)
try:
ctx.use_privatekey(pkey)
ctx.check_privatekey()
except Exception:
pknum = pkey.public_key().public_numbers()
certnum = cert.public_key().public_numbers()
if pknum != certnum:
raise exceptions.MisMatchedKey
return True
def _read_privatekey(privatekey_pem, passphrase=None):
def cb(*args):
if passphrase:
if six.PY2:
return passphrase.encode("utf-8")
elif six.PY3:
return six.b(passphrase)
else:
raise exceptions.NeedsPassphrase
return crypto.load_privatekey(crypto.FILETYPE_PEM, privatekey_pem, cb)
if passphrase:
if six.PY2:
passphrase = passphrase.encode("utf-8")
elif six.PY3:
passphrase = six.b(passphrase)
try:
return serialization.load_pem_private_key(privatekey_pem, passphrase,
backends.default_backend())
except Exception:
raise exceptions.NeedsPassphrase
def _split_x509s(x509Str):
@ -128,8 +130,7 @@ def dump_private_key(private_key, private_key_passphrase=None):
def get_host_names(certificate):
"""
Extract the host names from the Pem encoded X509 certificate
"""Extract the host names from the Pem encoded X509 certificate
:param certificate: A PEM encoded certificate
:returns: A dictionary containing the following keys:
@ -138,27 +139,29 @@ def get_host_names(certificate):
'dns_names' is a list of dNSNames (possibly empty) from
the SubjectAltNames of the certificate.
"""
try:
certificate = certificate.encode('ascii')
x509 = _get_x509_from_pem_bytes(certificate)
hostNames = {}
if hasattr(x509.get_subject(), 'CN'):
hostNames['cn'] = x509.get_subject().CN
hostNames['dns_names'] = []
num_exts = x509.get_extension_count()
for i in range(0, num_exts):
ext = x509.get_extension(i)
short_name = ext.get_short_name()
if short_name == six.b('subjectAltName'):
data = ext.get_data()
general_names_container = decoder.decode(
data, asn1Spec=rfc2459.GeneralNames())
for general_names in general_names_container[0]:
currName = general_names.getName()
if currName == 'dNSName':
octets = general_names.getComponent().asOctets()
decoded = octets.decode("utf-8")
hostNames['dns_names'].append(decoded)
return hostNames
cert = _get_x509_from_pem_bytes(certificate)
cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0]
host_names = {
'cn': cn.value.lower(),
'dns_names': []
}
try:
ext = cert.extensions.get_extension_for_oid(
x509.OID_SUBJECT_ALTERNATIVE_NAME
)
host_names['dns_names'] = ext.value.get_values_for_type(
x509.DNSName)
except x509.ExtensionNotFound:
LOG.debug("%s extension not found",
x509.OID_SUBJECT_ALTERNATIVE_NAME)
return host_names
except Exception:
LOG.exception(_LE("Unreadable certificate."))
raise exceptions.UnreadableCert
def _get_x509_from_pem_bytes(certificate_pem):
@ -166,11 +169,11 @@ def _get_x509_from_pem_bytes(certificate_pem):
Parse X509 data from a PEM encoded certificate
:param certificate_pem: Certificate in PEM format
:returns: pyOpenSSL high-level x509 data from the PEM string
:returns: crypto high-level x509 data from the PEM string
"""
try:
x509 = crypto.load_certificate(crypto.FILETYPE_PEM,
certificate_pem)
x509cert = x509.load_pem_x509_certificate(certificate_pem,
backends.default_backend())
except Exception:
raise exceptions.UnreadableCert
return x509
return x509cert

View File

@ -13,49 +13,49 @@
# License for the specific language governing permissions and limitations
# under the License.
from cryptography import x509
import neutron_lbaas.common.exceptions as exceptions
import neutron_lbaas.common.tls_utils.cert_parser as cert_parser
from neutron_lbaas.tests import base
ALT_EXT_CRT = """-----BEGIN CERTIFICATE-----
MIIGxDCCBaygAwIBAgIGAUp0fCElMA0GCSqGSIb3DQEBDQUAMIGLMQswCQYDVQQG
EwJVUzEOMAwGA1UECAwFVGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4wHAYD
VQQKDBVPcGVuU3RhY2sgRXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24gTGJh
YXMxHjAcBgNVBAMMFXd3dy5DTkZyb21TdWJqZWN0Lm9yZzAeFw0xNDExMjIwMDEx
MzlaFw0yMjEyMjEwMDExMzlaMIGLMQswCQYDVQQGEwJVUzEOMAwGA1UECAwFVGV4
YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4wHAYDVQQKDBVPcGVuU3RhY2sgRXhw
ZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24gTGJhYXMxHjAcBgNVBAMMFXd3dy5D
TkZyb21TdWJqZWN0Lm9yZzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
ALL1nmbDPUDps84i1sM3rhHrc+Dlu0N/wKQWKZFeiWUtF/pot19V3o0yXDpsg7W5
RkLMTFkZEcnQpyGdpAGjTjzmNXMZw99EzxsmrR3l6hUEISifVbvEuftYZT6jPxM5
ML6WAjFNaBEZPWtZi8CgX5xdjdrDNndwyHob49n7Nc/h1kVqqBqMILabTqC6yEcx
S/B+DugVuuYbEdYYYElQUMfM+mUdULrSqIVl2n5AvvSFjWzWzfgPyp4QKn+f7HVR
T62bh/XjQ88n1tMYNAEqixRZTPgqY1LFl9VJVgRp9fdL6ttMurOR3C0STJ5qCdKB
L7LrpbY4u8dEragRC6YAyI8CAwEAAaOCAyowggMmMAwGA1UdEwEB/wQCMAAwDgYD
VR0PAQH/BAQDAgO4MIIDBAYDVR0RBIIC+zCCAveCGXd3dy5ob3N0RnJvbV9kTlNO
YW1lMS5jb22CGXd3dy5ob3N0RnJvbV9kTlNOYW1lMi5jb22CGXd3dy5ob3N0RnJv
bV9kTlNOYW1lMy5jb22BEW5vb25lQG5vd2hlcmUub3JnpIGPMIGMMQswCQYDVQQG
EwJVUzEOMAwGA1UECAwFVGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4wHAYD
VQQKDBVPcGVuU3RhY2sgRXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24gTGJh
YXMxHzAdBgNVBAMMFnd3dy5jbkZyb21BbHROYW1lMS5vcmekgY8wgYwxCzAJBgNV
BAYTAlVTMQ4wDAYDVQQIDAVUZXhhczEUMBIGA1UEBwwLU2FuIEFudG9uaW8xHjAc
BgNVBAoMFU9wZW5TdGFjayBFeHBlcmltZW50czEWMBQGA1UECwwNTmV1dHJvbiBM
YmFhczEfMB0GA1UEAwwWd3d3LmNuRnJvbUFsdE5hbWUyLm9yZ6SBjzCBjDELMAkG
A1UEBhMCVVMxDjAMBgNVBAgMBVRleGFzMRQwEgYDVQQHDAtTYW4gQW50b25pbzEe
MBwGA1UECgwVT3BlblN0YWNrIEV4cGVyaW1lbnRzMRYwFAYDVQQLDA1OZXV0cm9u
IExiYWFzMR8wHQYDVQQDDBZ3d3cuY25Gcm9tQWx0TmFtZTMub3JnpIGPMIGMMQsw
CQYDVQQGEwJVUzEOMAwGA1UECAwFVGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlv
MR4wHAYDVQQKDBVPcGVuU3RhY2sgRXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRy
b24gTGJhYXMxHzAdBgNVBAMMFnd3dy5jbkZyb21BbHROYW1lNC5vcmeHBAoBAgOH
EAEjRWeJq83v97PVkeaixICGFmh0dHA6Ly93d3cuZXhhbXBsZS5jb22CGXd3dy5o
b3N0RnJvbV9kTlNOYW1lNC5jb20wDQYJKoZIhvcNAQENBQADggEBAICUCDMhDf0f
cvX5mVnq4Q3+SM/nl03Gse6J0JdpFivS4hl+uZs0TAFYpfEPpAa7KKxD229kbCiQ
kyxf8fzADSl77RQbL6Lxa8K/c66mVNiuVvTHV4r/nDNcRYN9fGArw/Ho7VX+HVQ6
UW1t/uvXeyg695t7kzZmvg0ChD5kS848d2rXu2MhwHwXA8rbuK6gxVY97fbzBNlj
aiPJUAb8lqZMShd+3yVCgMmV0J20u2b5pSdO+LHQ7NfVqURk2pcHD8slfHzXT58q
YB90v0pSVP6mzHGyLxETZZz0nhaH9EjOyFkQI84ORT8Kmd5Y04gSI0LTKKF1eMNE
TyNC+MtsRdA=
MIIGqjCCBZKgAwIBAgIJAIApBg8slSSiMA0GCSqGSIb3DQEBBQUAMIGLMQswCQYD
VQQGEwJVUzEOMAwGA1UECAwFVGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4w
HAYDVQQKDBVPcGVuU3RhY2sgRXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24g
TGJhYXMxHjAcBgNVBAMMFXd3dy5DTkZyb21TdWJqZWN0Lm9yZzAeFw0xNTA1MjEy
MDMzMjNaFw0yNTA1MTgyMDMzMjNaMIGLMQswCQYDVQQGEwJVUzEOMAwGA1UECAwF
VGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4wHAYDVQQKDBVPcGVuU3RhY2sg
RXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24gTGJhYXMxHjAcBgNVBAMMFXd3
dy5DTkZyb21TdWJqZWN0Lm9yZzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
ggEBALL1nmbDPUDps84i1sM3rhHrc+Dlu0N/wKQWKZFeiWUtF/pot19V3o0yXDps
g7W5RkLMTFkZEcnQpyGdpAGjTjzmNXMZw99EzxsmrR3l6hUEISifVbvEuftYZT6j
PxM5ML6WAjFNaBEZPWtZi8CgX5xdjdrDNndwyHob49n7Nc/h1kVqqBqMILabTqC6
yEcxS/B+DugVuuYbEdYYYElQUMfM+mUdULrSqIVl2n5AvvSFjWzWzfgPyp4QKn+f
7HVRT62bh/XjQ88n1tMYNAEqixRZTPgqY1LFl9VJVgRp9fdL6ttMurOR3C0STJ5q
CdKBL7LrpbY4u8dEragRC6YAyI8CAwEAAaOCAw0wggMJMAkGA1UdEwQCMAAwCwYD
VR0PBAQDAgXgMIIC7QYDVR0RBIIC5DCCAuCCGHd3dy5ob3N0RnJvbUROU05hbWUx
LmNvbYIYd3d3Lmhvc3RGcm9tRE5TTmFtZTIuY29tghh3d3cuaG9zdEZyb21ETlNO
YW1lMy5jb22CGHd3dy5ob3N0RnJvbUROU05hbWU0LmNvbYcECgECA4cQASNFZ4mr
ze/3s9WR5qLEgIYWaHR0cDovL3d3dy5leGFtcGxlLmNvbaSBjzCBjDELMAkGA1UE
BhMCVVMxDjAMBgNVBAgMBVRleGFzMRQwEgYDVQQHDAtTYW4gQW50b25pbzEeMBwG
A1UECgwVT3BlblN0YWNrIEV4cGVyaW1lbnRzMRYwFAYDVQQLDA1OZXV0cm9uIExi
YWFzMR8wHQYDVQQDDBZ3d3cuY25Gcm9tQWx0TmFtZTEub3JnpIGPMIGMMQswCQYD
VQQGEwJVUzEOMAwGA1UECAwFVGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4w
HAYDVQQKDBVPcGVuU3RhY2sgRXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24g
TGJhYXMxHzAdBgNVBAMMFnd3dy5jbkZyb21BbHROYW1lMi5vcmekgY8wgYwxCzAJ
BgNVBAYTAlVTMQ4wDAYDVQQIDAVUZXhhczEUMBIGA1UEBwwLU2FuIEFudG9uaW8x
HjAcBgNVBAoMFU9wZW5TdGFjayBFeHBlcmltZW50czEWMBQGA1UECwwNTmV1dHJv
biBMYmFhczEfMB0GA1UEAwwWd3d3LmNuRnJvbUFsdE5hbWUzLm9yZ6SBjzCBjDEL
MAkGA1UEBhMCVVMxDjAMBgNVBAgMBVRleGFzMRQwEgYDVQQHDAtTYW4gQW50b25p
bzEeMBwGA1UECgwVT3BlblN0YWNrIEV4cGVyaW1lbnRzMRYwFAYDVQQLDA1OZXV0
cm9uIExiYWFzMR8wHQYDVQQDDBZ3d3cuY25Gcm9tQWx0TmFtZTQub3JnMA0GCSqG
SIb3DQEBBQUAA4IBAQCS6iDn6R3C+qJLZibaqrBSkM9yu5kwRsQ6lQ+DODvVYGWq
eGkkh5o2c6WbJlH44yF280+HvnJcuISD7epPHJN0vUM9+WMtXfEli9avFHgu2JxP
3P0ixK2kaJnqKQkSEdnA/v/eWP1Cd2v6rbKCIo9d2gSP0cnpdtlX9Zk3SzEh0V7s
RjSdfZoAvz0aAnpDHlTerLcz5T2aiRae2wSt/RLA3qDO1Ji05tWvQBmKuepxS6A1
tL4Drm+OCXJwTrE7ClTMCwcrZnLl4tI+Z+X3DV92WQB8ldST/QFjz1hgs/4zrADA
elu2c/X7MR4ObOjhDfaVGQ8kMhYf5hx69qyNDsGi
-----END CERTIFICATE-----
"""
@ -237,11 +237,11 @@ def _get_rsa_numbers(private_key, private_key_passphrase=None):
class TestTLSParseUtils(base.BaseTestCase):
def test_alt_subject_name_parses(self):
hosts = cert_parser.get_host_names(ALT_EXT_CRT)
self.assertEqual('www.CNFromSubject.org', hosts['cn'])
self.assertEqual('www.hostFrom_dNSName1.com', hosts['dns_names'][0])
self.assertEqual('www.hostFrom_dNSName2.com', hosts['dns_names'][1])
self.assertEqual('www.hostFrom_dNSName3.com', hosts['dns_names'][2])
self.assertEqual('www.hostFrom_dNSName4.com', hosts['dns_names'][3])
self.assertEqual('www.cnfromsubject.org', hosts['cn'])
self.assertEqual('www.hostfromdnsname1.com', hosts['dns_names'][0])
self.assertEqual('www.hostfromdnsname2.com', hosts['dns_names'][1])
self.assertEqual('www.hostfromdnsname3.com', hosts['dns_names'][2])
self.assertEqual('www.hostfromdnsname4.com', hosts['dns_names'][3])
def test_x509_parses(self):
self.assertRaises(exceptions.UnreadableCert,
@ -260,19 +260,17 @@ class TestTLSParseUtils(base.BaseTestCase):
self.assertRaises(exceptions.NeedsPassphrase,
cert_parser._read_privatekey,
ENCRYPTED_PKCS8_CRT_KEY)
epkey = cert_parser._read_privatekey(
cert_parser._read_privatekey(
ENCRYPTED_PKCS8_CRT_KEY,
passphrase=ENCRYPTED_PKCS8_CRT_KEY_PASSPHRASE)
self.assertTrue(epkey.check())
def test_read_private_key_unicode(self):
self.assertRaises(exceptions.NeedsPassphrase,
cert_parser._read_privatekey,
ENCRYPTED_PKCS8_CRT_KEY)
epkey = cert_parser._read_privatekey(
cert_parser._read_privatekey(
ENCRYPTED_PKCS8_CRT_KEY,
passphrase=u'{0}'.format(ENCRYPTED_PKCS8_CRT_KEY_PASSPHRASE))
self.assertTrue(epkey.check())
def test_dump_private_key(self):
self.assertRaises(exceptions.NeedsPassphrase,
@ -303,4 +301,6 @@ class TestTLSParseUtils(base.BaseTestCase):
imds.append(cert_parser._get_x509_from_pem_bytes(x509Pem))
for i in range(0, len(imds)):
self.assertEqual(EXPECTED_IMD_SUBJS[i], imds[i].get_subject().CN)
self.assertEqual(EXPECTED_IMD_SUBJS[i],
imds[i].subject.get_attributes_for_oid(
x509.OID_COMMON_NAME)[0].value)