Merge "Use cryptography to load PKCS12 certificates"

This commit is contained in:
Zuul 2023-12-19 15:03:19 +00:00 committed by Gerrit Code Review
commit 240b0066a1
5 changed files with 44 additions and 35 deletions

View File

@ -18,7 +18,7 @@ Common classes for pkcs12 based certificate handling
"""
from cryptography.hazmat.primitives import serialization
from OpenSSL import crypto
from cryptography.hazmat.primitives.serialization import pkcs12
from octavia.certificates.common import cert
from octavia.common import exceptions
@ -28,21 +28,21 @@ class PKCS12Cert(cert.Cert):
"""Representation of a Cert for local storage."""
def __init__(self, certbag):
try:
p12 = crypto.load_pkcs12(certbag)
except crypto.Error as e:
p12 = pkcs12.load_pkcs12(certbag, None)
except (TypeError, ValueError) as e:
raise exceptions.UnreadablePKCS12(error=str(e))
self.certificate = p12.get_certificate()
self.intermediates = p12.get_ca_certificates()
self.private_key = p12.get_privatekey()
self.certificate = p12.cert
self.intermediates = p12.additional_certs
self.private_key = p12.key
def get_certificate(self):
return self.certificate.to_cryptography().public_bytes(
return self.certificate.certificate.public_bytes(
encoding=serialization.Encoding.PEM).strip()
def get_intermediates(self):
if self.intermediates:
int_data = [
ic.to_cryptography().public_bytes(
ic.certificate.public_bytes(
encoding=serialization.Encoding.PEM).strip()
for ic in self.intermediates
]
@ -50,7 +50,7 @@ class PKCS12Cert(cert.Cert):
return None
def get_private_key(self):
return self.private_key.to_cryptography_key().private_bytes(
return self.private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()).strip()

View File

@ -17,8 +17,9 @@
"""
Cert manager implementation for Barbican using a single PKCS12 secret
"""
from OpenSSL import crypto
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import pkcs12 as c_pkcs12
from cryptography import x509
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import encodeutils
@ -64,25 +65,29 @@ class BarbicanCertManager(cert_mgr.CertManager):
connection = self.auth.get_barbican_client(context.project_id)
LOG.info("Storing certificate secret '%s' in Barbican.", name)
p12 = crypto.PKCS12()
p12.set_friendlyname(encodeutils.to_utf8(name))
x509_cert = crypto.load_certificate(crypto.FILETYPE_PEM, certificate)
p12.set_certificate(x509_cert)
x509_pk = crypto.load_privatekey(crypto.FILETYPE_PEM, private_key)
p12.set_privatekey(x509_pk)
if intermediates:
cert_ints = list(cert_parser.get_intermediates_pems(intermediates))
x509_ints = [
crypto.load_certificate(crypto.FILETYPE_PEM, ci)
for ci in cert_ints]
p12.set_ca_certificates(x509_ints)
if private_key_passphrase:
raise exceptions.CertificateStorageException(
"Passphrase protected PKCS12 certificates are not supported.")
x509_cert = x509.load_pem_x509_certificate(certificate)
x509_pk = serialization.load_pem_private_key(private_key, None)
cas = None
if intermediates:
cert_ints = list(cert_parser.get_intermediates_pems(intermediates))
cas = [
x509.load_pem_x509_certificate(ci)
for ci in cert_ints]
try:
certificate_secret = connection.secrets.create(
payload=p12.export(),
payload=c_pkcs12.serialize_key_and_certificates(
name=encodeutils.safe_encode(name),
key=x509_pk,
cert=x509_cert,
cas=cas,
encryption_algorithm=serialization.NoEncryption()
),
expiration=expiration,
name=name
)

View File

@ -18,7 +18,8 @@ Cert manager implementation for Castellan
"""
from castellan.common.objects import opaque_data
from castellan import key_manager
from OpenSSL import crypto
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import pkcs12 as c_pkcs12
from oslo_config import cfg
from oslo_log import log as logging
@ -41,16 +42,20 @@ class CastellanCertManager(cert_mgr.CertManager):
def store_cert(self, context, certificate, private_key, intermediates=None,
private_key_passphrase=None, expiration=None,
name="PKCS12 Certificate Bundle"):
p12 = crypto.PKCS12()
p12.set_certificate(certificate)
p12.set_privatekey(private_key)
if intermediates:
p12.set_ca_certificates(intermediates)
if private_key_passphrase:
raise exceptions.CertificateStorageException(
"Passphrases protected PKCS12 certificates are not supported.")
p12_data = opaque_data.OpaqueData(p12.export(), name=name)
p12_data = opaque_data.OpaqueData(
c_pkcs12.serialize_key_and_certificates(
name=None,
key=private_key,
cert=certificate,
cas=intermediates,
encryption_algorithm=serialization.NoEncryption()
),
name=name
)
self.manager.store(context, p12_data)
def get_cert(self, context, cert_ref, resource_ref=None, check_only=False,

View File

@ -15,7 +15,6 @@ from unittest import mock
import uuid
from barbicanclient.v1 import secrets
from OpenSSL import crypto
import octavia.certificates.common.barbican as barbican_common
import octavia.certificates.common.cert as cert
@ -138,10 +137,11 @@ class TestBarbicanManager(base.TestCase):
sorted(data.get_intermediates()))
self.assertIsNone(data.get_private_key_passphrase())
@mock.patch('OpenSSL.crypto.load_pkcs12')
@mock.patch('cryptography.hazmat.primitives.serialization.pkcs12.'
'load_pkcs12')
def test_get_cert_bad_pkcs12(self, mock_load_pkcs12):
mock_load_pkcs12.side_effect = [crypto.Error]
mock_load_pkcs12.side_effect = [ValueError]
# Mock out the client
self.bc.secrets.get.return_value = self.secret_pkcs12

View File

@ -38,7 +38,6 @@ python-barbicanclient>=4.5.2 # Apache-2.0
python-glanceclient>=2.8.0 # Apache-2.0
python-novaclient>=9.1.0 # Apache-2.0
python-cinderclient>=3.3.0 # Apache-2.0
pyOpenSSL>=19.1.0 # Apache-2.0
WSME>=0.8.0 # MIT
Jinja2>=2.10 # BSD License (3 clause)
taskflow>=4.4.0 # Apache-2.0