Add Noop Certificate Manager
So far, Octavia noop drivers were using real certificate managers, which have validated the cerifitates for every certificate required operation, sometimes without any need. Octavia should have a Noop Certificate Manager for faster testing purposes. This patch adds it. Closes-Bug: #2034711 Change-Id: I700c65fb17bad28b2b922e03d9c94c4716de9cbe (cherry picked from commit 7310986de9bf68ed86de90de0501a1bc46945526)
This commit is contained in:
parent
74499a036b
commit
ec36152a03
106
octavia/certificates/manager/noop.py
Normal file
106
octavia/certificates/manager/noop.py
Normal file
@ -0,0 +1,106 @@
|
||||
# Copyright (c) 2023 Red Hat
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import uuid
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from octavia.certificates.common import cert
|
||||
from octavia.certificates.common import local
|
||||
from octavia.certificates.manager import cert_mgr
|
||||
from octavia.common.tls_utils import cert_parser
|
||||
from octavia.tests.common import sample_certs
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NoopCertManager(cert_mgr.CertManager):
|
||||
"""Cert manager implementation for no-op operations
|
||||
|
||||
"""
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._local_cert = None
|
||||
|
||||
@property
|
||||
def local_cert(self):
|
||||
if self._local_cert is None:
|
||||
self._local_cert = self.store_cert(
|
||||
None,
|
||||
sample_certs.X509_CERT,
|
||||
sample_certs.X509_CERT_KEY_ENCRYPTED,
|
||||
sample_certs.X509_IMDS,
|
||||
private_key_passphrase=sample_certs.X509_CERT_KEY_PASSPHRASE)
|
||||
return self._local_cert
|
||||
|
||||
def store_cert(self, context, certificate, private_key, intermediates=None,
|
||||
private_key_passphrase=None, **kwargs) -> cert.Cert:
|
||||
"""Stores (i.e., registers) a cert with the cert manager.
|
||||
|
||||
This method stores the specified cert to the filesystem and returns
|
||||
a UUID that can be used to retrieve it.
|
||||
|
||||
:param context: Ignored in this implementation
|
||||
:param certificate: PEM encoded TLS certificate
|
||||
:param private_key: private key for the supplied certificate
|
||||
:param intermediates: ordered and concatenated intermediate certs
|
||||
:param private_key_passphrase: optional passphrase for the supplied key
|
||||
|
||||
:returns: the UUID of the stored cert
|
||||
:raises CertificateStorageException: if certificate storage fails
|
||||
"""
|
||||
cert_ref = str(uuid.uuid4())
|
||||
if isinstance(certificate, bytes):
|
||||
certificate = certificate.decode('utf-8')
|
||||
if isinstance(private_key, bytes):
|
||||
private_key = private_key.decode('utf-8')
|
||||
|
||||
LOG.debug('Driver %s no-op, store_cert certificate %s, cert_ref %s',
|
||||
self.__class__.__name__, certificate, cert_ref)
|
||||
|
||||
cert_data = {'certificate': certificate, 'private_key': private_key}
|
||||
if intermediates:
|
||||
if isinstance(intermediates, bytes):
|
||||
intermediates = intermediates.decode('utf-8')
|
||||
cert_data['intermediates'] = list(
|
||||
cert_parser.get_intermediates_pems(intermediates))
|
||||
if private_key_passphrase:
|
||||
if isinstance(private_key_passphrase, bytes):
|
||||
private_key_passphrase = private_key_passphrase.decode('utf-8')
|
||||
cert_data['private_key_passphrase'] = private_key_passphrase
|
||||
|
||||
return local.LocalCert(**cert_data)
|
||||
|
||||
def get_cert(self, context, cert_ref, check_only=True, **kwargs) -> (
|
||||
cert.Cert):
|
||||
LOG.debug('Driver %s no-op, get_cert with cert_ref %s',
|
||||
self.__class__.__name__, cert_ref)
|
||||
return self.local_cert
|
||||
|
||||
def delete_cert(self, context, cert_ref, resource_ref, service_name=None):
|
||||
LOG.debug('Driver %s no-op, delete_cert with cert_ref %s',
|
||||
self.__class__.__name__, cert_ref)
|
||||
|
||||
def set_acls(self, context, cert_ref):
|
||||
LOG.debug('Driver %s no-op, set_acls with cert_ref %s',
|
||||
self.__class__.__name__, cert_ref)
|
||||
|
||||
def unset_acls(self, context, cert_ref):
|
||||
LOG.debug('Driver %s no-op, unset_acls with cert_ref %s',
|
||||
self.__class__.__name__, cert_ref)
|
||||
|
||||
def get_secret(self, context, secret_ref) -> cert.Cert:
|
||||
LOG.debug('Driver %s no-op, get_secret with secret_ref %s',
|
||||
self.__class__.__name__, secret_ref)
|
||||
return self.local_cert
|
53
octavia/tests/unit/certificates/manager/test_noop.py
Normal file
53
octavia/tests/unit/certificates/manager/test_noop.py
Normal file
@ -0,0 +1,53 @@
|
||||
# Copyright 2023 Red Hat
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from octavia.certificates.common import cert
|
||||
from octavia.certificates.manager import noop as noop_cert_mgr
|
||||
from octavia.tests.common import sample_certs
|
||||
import octavia.tests.unit.base as base
|
||||
|
||||
|
||||
class TestNoopManager(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.manager = noop_cert_mgr.NoopCertManager()
|
||||
|
||||
def test_store_cert(self):
|
||||
certificate = self.manager.store_cert(
|
||||
None,
|
||||
sample_certs.X509_CERT,
|
||||
sample_certs.X509_CERT_KEY_ENCRYPTED,
|
||||
sample_certs.X509_IMDS,
|
||||
private_key_passphrase=sample_certs.X509_CERT_KEY_PASSPHRASE)
|
||||
self.assertIsNotNone(certificate)
|
||||
self.assertIsInstance(certificate, cert.Cert)
|
||||
|
||||
def test_get_cert(self):
|
||||
cert_ref = uuidutils.generate_uuid()
|
||||
certificate = self.manager.get_cert(
|
||||
context=None,
|
||||
cert_ref=cert_ref)
|
||||
self.assertIsNotNone(certificate)
|
||||
self.assertIsInstance(certificate, cert.Cert)
|
||||
|
||||
def test_get_secret(self):
|
||||
secret_ref = uuidutils.generate_uuid()
|
||||
secret = self.manager.get_secret(
|
||||
context=None,
|
||||
secret_ref=secret_ref)
|
||||
self.assertIsNotNone(secret)
|
||||
self.assertIsInstance(secret, cert.Cert)
|
@ -0,0 +1,4 @@
|
||||
---
|
||||
other:
|
||||
- |
|
||||
Noop certificate manager was added. Now any Octavia certificate operations using noop drivers will be faster (as they won't be validated).
|
@ -93,6 +93,7 @@ octavia.cert_manager =
|
||||
local_cert_manager = octavia.certificates.manager.local:LocalCertManager
|
||||
barbican_cert_manager = octavia.certificates.manager.barbican:BarbicanCertManager
|
||||
castellan_cert_manager = octavia.certificates.manager.castellan_mgr:CastellanCertManager
|
||||
noop_cert_manager = octavia.certificates.manager.noop:NoopCertManager
|
||||
octavia.barbican_auth =
|
||||
barbican_acl_auth = octavia.certificates.common.auth.barbican_acl:BarbicanACLAuth
|
||||
octavia.plugins =
|
||||
|
Loading…
x
Reference in New Issue
Block a user