Merge "Support rotating server_certs_key_passphrase key"

This commit is contained in:
Zuul 2025-05-07 11:17:42 +00:00 committed by Gerrit Code Review
commit df8fdf6b44
14 changed files with 113 additions and 42 deletions

View File

@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from cryptography import fernet
from jsonschema import exceptions as js_exceptions
from jsonschema import validate
@ -71,8 +70,7 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
topic=consts.TOPIC_AMPHORA_V2, version="2.0", fanout=False)
self.client = rpc.get_client(self.target)
self.repositories = repositories.Repositories()
key = utils.get_compatible_server_certs_key_passphrase()
self.fernet = fernet.Fernet(key)
self.fernet = utils.get_server_certs_key_passphrases_fernet()
def _validate_pool_algorithm(self, pool):
if pool.lb_algorithm not in AMPHORA_SUPPORTED_LB_ALGORITHMS:

View File

@ -19,6 +19,7 @@ Common classes for local filesystem certificate handling
import os
from oslo_config import cfg
from oslo_config import types as cfg_types
from octavia.certificates.common import cert
@ -37,6 +38,22 @@ TLS_STORAGE_DEFAULT = os.environ.get(
'OS_OCTAVIA_TLS_STORAGE', '/var/lib/octavia/certificates/'
)
class FernetKeyOpt:
regex_pattern = r'^[A-Za-z0-9\-_=]{32}$'
def __init__(self, value: str):
string_type = cfg_types.String(
choices=None, regex=self.regex_pattern)
self.value = string_type(value)
def __repr__(self):
return self.value.__repr__()
def __str__(self):
return self.value.__str__()
certgen_opts = [
cfg.StrOpt('ca_certificate',
default=TLS_CERT_DEFAULT,
@ -51,15 +68,17 @@ certgen_opts = [
help='Passphrase for the Private Key. Defaults'
' to env[OS_OCTAVIA_CA_KEY_PASS] or None.',
secret=True),
cfg.StrOpt('server_certs_key_passphrase',
default=TLS_PASS_AMPS_DEFAULT,
help='Passphrase for encrypting Amphora Certificates and '
'Private Keys. Must be 32, base64(url) compatible, '
'characters long. Defaults to env[TLS_PASS_AMPS_DEFAULT] '
'or insecure-key-do-not-use-this-key',
regex=r'^[A-Za-z0-9\-_=]{32}$',
required=True,
secret=True),
cfg.ListOpt('server_certs_key_passphrase',
default=[TLS_PASS_AMPS_DEFAULT],
item_type=FernetKeyOpt,
help='List of passphrase for encrypting Amphora Certificates '
'and Private Keys, first in list is used for encryption while '
'all other keys is used to decrypt previously encrypted data. '
'Each key must be 32, base64(url) compatible, characters long.'
' Defaults to env[TLS_PASS_AMPS_DEFAULT] or '
'a list with default key insecure-key-do-not-use-this-key',
required=True,
secret=True),
cfg.StrOpt('signing_digest',
default=TLS_DIGEST_DEFAULT,
help='Certificate signing digest. Defaults'
@ -80,6 +99,7 @@ certmgr_opts = [
class LocalCert(cert.Cert):
"""Representation of a Cert for local storage."""
def __init__(self, certificate, private_key, intermediates=None,
private_key_passphrase=None):
self.certificate = certificate

View File

@ -25,6 +25,7 @@ import re
import socket
import typing
from cryptography import fernet
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
@ -131,11 +132,24 @@ def get_compatible_value(value):
return value
def get_compatible_server_certs_key_passphrase():
key = CONF.certificates.server_certs_key_passphrase
if isinstance(key, str):
key = key.encode('utf-8')
return base64.urlsafe_b64encode(key)
def _get_compatible_server_certs_key_passphrases():
key_opts = CONF.certificates.server_certs_key_passphrase
keys = []
for key_opt in key_opts:
key = str(key_opt)
if isinstance(key, str):
key = key.encode('utf-8')
keys.append(
base64.urlsafe_b64encode(key))
return keys
def get_server_certs_key_passphrases_fernet() -> fernet.MultiFernet:
"""Get a cryptography.MultiFernet with loaded keys."""
keys = [
fernet.Fernet(x) for x in
_get_compatible_server_certs_key_passphrases()]
return fernet.MultiFernet(keys)
def subnet_ip_availability(nw_ip_avail, subnet_id, req_num_ips):
@ -178,6 +192,7 @@ class exception_logger:
any occurred
"""
def __init__(self, logger=None):
self.logger = logger

View File

@ -17,7 +17,6 @@ import copy
from typing import List
from typing import Optional
from cryptography import fernet
from oslo_config import cfg
from oslo_log import log as logging
from stevedore import driver as stevedore_driver
@ -458,8 +457,7 @@ class AmphoraCertUpload(BaseAmphoraTask):
def execute(self, amphora, server_pem):
"""Execute cert_update_amphora routine."""
LOG.debug("Upload cert in amphora REST driver")
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
session = db_apis.get_session()
with session.begin():
db_amp = self.amphora_repo.get(session,

View File

@ -13,7 +13,6 @@
# under the License.
#
from cryptography import fernet
from oslo_config import cfg
from stevedore import driver as stevedore_driver
from taskflow import task
@ -45,8 +44,7 @@ class GenerateServerPEMTask(BaseCertTask):
cert = self.cert_generator.generate_cert_key_pair(
cn=amphora_id,
validity=CONF.certificates.cert_validity_time)
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
# storing in db requires conversion bytes to string
# (required for python3)

View File

@ -15,7 +15,6 @@
import time
from cryptography import fernet
from oslo_config import cfg
from oslo_log import log as logging
from stevedore import driver as stevedore_driver
@ -186,8 +185,7 @@ class CertComputeCreate(ComputeCreate):
encoding='utf-8') as client_ca:
ca = client_ca.read()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
config_drive_files = {
'/etc/octavia/certs/server.pem': fer.decrypt(
server_pem.encode("utf-8")).decode("utf-8"),

View File

@ -13,7 +13,6 @@
# under the License.
#
from cryptography import fernet
from octavia_lib.common import constants as lib_consts
from oslo_config import cfg
from oslo_db import exception as odb_exceptions
@ -1047,8 +1046,7 @@ class UpdateAmphoraDBCertExpiration(BaseDatabaseTask):
LOG.debug("Update DB cert expiry date of amphora id: %s", amphora_id)
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
cert_expiration = cert_parser.get_cert_expiration(
fer.decrypt(server_pem.encode("utf-8")))
LOG.debug("Certificate expiration date is %s ", cert_expiration)

View File

@ -888,7 +888,7 @@ class TestAmphoraDriver(base.TestRpc):
self.amp_driver.validate_availability_zone,
'bogus')
@mock.patch('cryptography.fernet.Fernet')
@mock.patch('cryptography.fernet.MultiFernet')
def test_encrypt_listener_dict(self, mock_fernet):
mock_fern = mock.MagicMock()
mock_fernet.return_value = mock_fern

View File

@ -13,7 +13,9 @@
# under the License.
from unittest import mock
from cryptography import fernet
from octavia_lib.common import constants as lib_consts
from oslo_config import cfg
from oslo_utils import uuidutils
from octavia.common import constants
@ -178,3 +180,42 @@ class TestConfig(base.TestCase):
result = utils.map_protocol_to_nftable_protocol(
{constants.PROTOCOL: lib_consts.PROTOCOL_PROMETHEUS})
self.assertEqual({constants.PROTOCOL: lib_consts.PROTOCOL_TCP}, result)
def test_rotate_server_certs_key_passphrase(self):
"""Test rotate server_certs_key_passphrase."""
# Use one key (default) and encrypt/decrypt it
cfg.CONF.set_override(
'server_certs_key_passphrase',
['insecure-key-do-not-use-this-key'],
group='certificates')
fer = utils.get_server_certs_key_passphrases_fernet()
data1 = 'some data one'
enc1 = fer.encrypt(data1.encode('utf-8'))
self.assertEqual(
data1, fer.decrypt(enc1).decode('utf-8'))
# Use two keys, first key is new and used for encrypting
# and default key can still be used for decryption
cfg.CONF.set_override(
'server_certs_key_passphrase',
['insecure-key-do-not-use-this-ke2',
'insecure-key-do-not-use-this-key'],
group='certificates')
fer = utils.get_server_certs_key_passphrases_fernet()
data2 = 'some other data'
enc2 = fer.encrypt(data2.encode('utf-8'))
self.assertEqual(
data2, fer.decrypt(enc2).decode('utf-8'))
self.assertEqual(
data1, fer.decrypt(enc1).decode('utf-8'))
# Remove first key and we should only be able to
# decrypt the newest data
cfg.CONF.set_override(
'server_certs_key_passphrase',
['insecure-key-do-not-use-this-ke2'],
group='certificates')
fer = utils.get_server_certs_key_passphrases_fernet()
self.assertEqual(
data2, fer.decrypt(enc2).decode('utf-8'))
self.assertRaises(fernet.InvalidToken, fer.decrypt, enc1)

View File

@ -14,7 +14,6 @@
#
from unittest import mock
from cryptography import fernet
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
@ -840,9 +839,8 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_listener_repo_update,
mock_amphora_repo_get,
mock_amphora_repo_update):
key = utils.get_compatible_server_certs_key_passphrase()
mock_amphora_repo_get.return_value = _db_amphora_mock
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
pem_file_mock = fer.encrypt(
utils.get_compatible_value('test-pem-file')).decode('utf-8')
amphora_cert_upload_mock = amphora_driver_tasks.AmphoraCertUpload()

View File

@ -14,7 +14,6 @@
#
from unittest import mock
from cryptography import fernet
from oslo_config import cfg
from octavia.certificates.common import local
@ -29,8 +28,7 @@ class TestCertTasks(base.TestCase):
@mock.patch('stevedore.driver.DriverManager.driver')
def test_execute(self, mock_driver):
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
dummy_cert = local.LocalCert(
utils.get_compatible_value('test_cert'),
utils.get_compatible_value('test_key'))

View File

@ -14,7 +14,6 @@
#
from unittest import mock
from cryptography import fernet
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
@ -375,8 +374,7 @@ class TestComputeTasks(base.TestCase):
def test_compute_create_cert(self, mock_driver, mock_ud_conf,
mock_conf, mock_jinja, mock_log_cfg):
createcompute = compute_tasks.CertComputeCreate()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
mock_log_cfg.return_value = 'FAKE CFG'
mock_driver.build.return_value = COMPUTE_ID

View File

@ -16,7 +16,6 @@ import copy
import random
from unittest import mock
from cryptography import fernet
from oslo_db import exception as odb_exceptions
from oslo_utils import uuidutils
from sqlalchemy.orm import exc
@ -1201,8 +1200,7 @@ class TestDatabaseTasks(base.TestCase):
mock_amphora_repo_delete):
update_amp_cert = database_tasks.UpdateAmphoraDBCertExpiration()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
_pem_mock = fer.encrypt(
utils.get_compatible_value('test_cert')
).decode('utf-8')

View File

@ -0,0 +1,13 @@
---
features:
- |
Added support for multiple Fernet keys in the ``[certificates]/server_certs_key_passphrase``
configuration option by changing it to a ListOpt. The first key is used for
encryption and other keys is used for decryption adding support for rotating
the passphrase.
upgrade:
- |
The ``[certificates]/server_certs_key_passphrase`` configuration option is
now a ListOpt so multiple keys can be specified, the first key is used for
encryption and other keys is used for decryption adding support for rotating
the passphrase.