Add get_passphrase_from_secret utility function
This function outsources Cinder's encryption passphrase handling based on Key Manager secrets. It extends Cinder's existing implementation to handle secret objects of type 'passphrase' additionally, for which the binascii.hexlify conversion is now skipped similar to Nova's implementation of handling qcow2+LUKS images. Original Cinder behavior for 'symmetric' secret objects is retained. Provides the basis for image encryption standardization as per https://specs.openstack.org/openstack/cinder-specs/specs/2024.2/LUKS-image-encryption.html Co-Authored-By: Josephine Seifert <josephine.seifert@cloudandheat.com> Change-Id: I8836fe3c2af5b61ba33a7fe3c8eb5a7f4961c515
This commit is contained in:
@@ -17,6 +17,7 @@
|
||||
import abc
|
||||
|
||||
from os_brick import executor
|
||||
from os_brick import utils
|
||||
|
||||
|
||||
class VolumeEncryptor(executor.Executor, metaclass=abc.ABCMeta):
|
||||
@@ -47,6 +48,10 @@ class VolumeEncryptor(executor.Executor, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
return self._key_manager.get(context, self.encryption_key_id)
|
||||
|
||||
def _get_encryption_key_as_passphrase(self, context):
|
||||
key = self._get_key(context)
|
||||
return utils.get_passphrase_from_secret(key)
|
||||
|
||||
@abc.abstractmethod
|
||||
def attach_volume(self, context, **kwargs):
|
||||
"""Hook called immediately prior to attaching a volume to an instance.
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import binascii
|
||||
import os
|
||||
|
||||
from oslo_concurrency import processutils
|
||||
@@ -97,10 +96,6 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
|
||||
return False
|
||||
return True
|
||||
|
||||
def _get_passphrase(self, key):
|
||||
"""Convert raw key to string."""
|
||||
return binascii.hexlify(key).decode('utf-8')
|
||||
|
||||
def _open_volume(self, passphrase, **kwargs):
|
||||
"""Open the LUKS partition on the volume using passphrase.
|
||||
|
||||
@@ -143,8 +138,7 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
|
||||
"any existing volumes using this encryptor to the 'luks' "
|
||||
"LuksEncryptor or 'luks2' Luks2Encryptor encryptors as soon as "
|
||||
"possible.")
|
||||
key = self._get_key(context).get_encoded()
|
||||
passphrase = self._get_passphrase(key)
|
||||
passphrase = self._get_encryption_key_as_passphrase(context)
|
||||
|
||||
self._open_volume(passphrase, **kwargs)
|
||||
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import binascii
|
||||
import os
|
||||
|
||||
from oslo_concurrency import processutils
|
||||
@@ -160,10 +159,6 @@ class LuksEncryptor(base.VolumeEncryptor):
|
||||
root_helper=self._root_helper,
|
||||
attempts=3)
|
||||
|
||||
def _get_passphrase(self, key):
|
||||
"""Convert raw key to string."""
|
||||
return binascii.hexlify(key).decode('utf-8')
|
||||
|
||||
def _open_volume(self, passphrase, **kwargs):
|
||||
"""Opens the LUKS partition on the volume using passphrase.
|
||||
|
||||
@@ -184,8 +179,7 @@ class LuksEncryptor(base.VolumeEncryptor):
|
||||
original symbolic link to refer to the device mounted by dm-crypt.
|
||||
"""
|
||||
|
||||
key = self._get_key(context).get_encoded()
|
||||
passphrase = self._get_passphrase(key)
|
||||
passphrase = self._get_encryption_key_as_passphrase(context)
|
||||
|
||||
try:
|
||||
self._open_volume(passphrase, **kwargs)
|
||||
@@ -229,8 +223,7 @@ class LuksEncryptor(base.VolumeEncryptor):
|
||||
"""Extend an encrypted volume and return the decrypted volume size."""
|
||||
symlink = self.symlink_path
|
||||
LOG.debug('Resizing mapping %s to match underlying device', symlink)
|
||||
key = self._get_key(context).get_encoded()
|
||||
passphrase = self._get_passphrase(key)
|
||||
passphrase = self._get_encryption_key_as_passphrase(context)
|
||||
self._execute('cryptsetup', 'resize', symlink,
|
||||
process_input=passphrase,
|
||||
run_as_root=True, check_exit_code=True,
|
||||
|
||||
@@ -13,14 +13,41 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import binascii
|
||||
from unittest import mock
|
||||
|
||||
from castellan.common import objects as castellan_objects
|
||||
from castellan.tests.unit.key_manager import fake
|
||||
|
||||
from os_brick import encryptors
|
||||
from os_brick.tests import base
|
||||
|
||||
|
||||
def fake__get_key_symmetric(passphrase):
|
||||
raw = bytes(binascii.unhexlify(passphrase))
|
||||
symmetric_key = castellan_objects.symmetric_key.SymmetricKey(
|
||||
'AES', len(raw) * 8, raw)
|
||||
return symmetric_key
|
||||
|
||||
|
||||
def fake__get_key_passphrase(passphrase):
|
||||
raw = passphrase.encode('utf-8')
|
||||
passphrase_key = castellan_objects.passphrase.Passphrase(raw)
|
||||
return passphrase_key
|
||||
|
||||
|
||||
class BaseVolumeEncryptor(encryptors.base.VolumeEncryptor):
|
||||
|
||||
def attach_volume(self, context, **kwargs):
|
||||
pass
|
||||
|
||||
def detach_volume(self, **kwargs):
|
||||
pass
|
||||
|
||||
def extend_volume(self, context, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class VolumeEncryptorTestCase(base.TestCase):
|
||||
def _create(self):
|
||||
pass
|
||||
@@ -85,6 +112,45 @@ class BaseEncryptorTestCase(VolumeEncryptorTestCase):
|
||||
self._test_get_encryptor('nova.volume.encryptors.nop.NoopEncryptor',
|
||||
encryptors.nop.NoOpEncryptor)
|
||||
|
||||
@mock.patch('os_brick.encryptors.base.VolumeEncryptor._get_key')
|
||||
def test__get_encryption_key_as_passphrase_hexlify(self, mock_key):
|
||||
"""Test passphrase retrieval for secret type 'symmetric'.
|
||||
|
||||
This should hexlify the secret in _get_encryption_key_as_passphrase.
|
||||
"""
|
||||
base_enc = BaseVolumeEncryptor(
|
||||
root_helper=self.root_helper,
|
||||
connection_info=self.connection_info,
|
||||
keymgr=self.keymgr
|
||||
)
|
||||
fake_key_plain = 'passphrase-in-clear-text'
|
||||
fake_key_hexlified = binascii.hexlify(fake_key_plain.encode('utf-8'))
|
||||
|
||||
mock_key.return_value = fake__get_key_symmetric(fake_key_hexlified)
|
||||
passphrase = base_enc._get_encryption_key_as_passphrase(
|
||||
mock.sentinel.context)
|
||||
mock_key.assert_called_once_with(mock.sentinel.context)
|
||||
self.assertEqual(passphrase, fake_key_hexlified.decode('utf-8'))
|
||||
|
||||
@mock.patch('os_brick.encryptors.base.VolumeEncryptor._get_key')
|
||||
def test__get_encryption_key_as_passphrase(self, mock_key):
|
||||
"""Test passphrase retrieval for secret type 'passphrase'.
|
||||
|
||||
This should skip the hexlify step in _get_encryption_key_as_passphrase.
|
||||
"""
|
||||
base_enc = BaseVolumeEncryptor(
|
||||
root_helper=self.root_helper,
|
||||
connection_info=self.connection_info,
|
||||
keymgr=self.keymgr
|
||||
)
|
||||
fake_key_plain = 'passphrase-in-clear-text'
|
||||
|
||||
mock_key.return_value = fake__get_key_passphrase(fake_key_plain)
|
||||
passphrase = base_enc._get_encryption_key_as_passphrase(
|
||||
mock.sentinel.context)
|
||||
mock_key.assert_called_once_with(mock.sentinel.context)
|
||||
self.assertEqual(passphrase, fake_key_plain)
|
||||
|
||||
def test_get_error_encryptors(self):
|
||||
encryption = {'control_location': 'front-end',
|
||||
'provider': 'ErrorEncryptor'}
|
||||
|
||||
@@ -254,17 +254,13 @@ class LuksEncryptorTestCase(test_base.VolumeEncryptorTestCase):
|
||||
|
||||
@mock.patch('os_brick.utils.get_device_size')
|
||||
@mock.patch.object(luks.LuksEncryptor, '_execute')
|
||||
@mock.patch.object(luks.LuksEncryptor, '_get_passphrase')
|
||||
@mock.patch.object(luks.LuksEncryptor, '_get_key')
|
||||
def test_extend_volume(self, mock_key, mock_pass, mock_exec, mock_size):
|
||||
@mock.patch.object(luks.LuksEncryptor, '_get_encryption_key_as_passphrase')
|
||||
def test_extend_volume(self, mock_pass, mock_exec, mock_size):
|
||||
encryptor = self.encryptor
|
||||
res = encryptor.extend_volume(mock.sentinel.context)
|
||||
self.assertEqual(mock_size.return_value, res)
|
||||
|
||||
mock_key.assert_called_once_with(mock.sentinel.context)
|
||||
mock_key.return_value.get_encoded.assert_called_once_with()
|
||||
key = mock_key.return_value.get_encoded.return_value
|
||||
mock_pass.assert_called_once_with(key)
|
||||
mock_pass.assert_called_once_with(mock.sentinel.context)
|
||||
mock_exec.assert_called_once_with(
|
||||
'cryptsetup', 'resize', encryptor.dev_path,
|
||||
process_input=mock_pass.return_value, run_as_root=True,
|
||||
|
||||
@@ -18,6 +18,7 @@ import io
|
||||
import time
|
||||
from unittest import mock
|
||||
|
||||
from castellan.common import objects as castellan_objects
|
||||
import ddt
|
||||
|
||||
from os_brick import exception
|
||||
@@ -108,6 +109,35 @@ class TestUtils(base.TestCase):
|
||||
'dd', 'if=/dev/fake', 'of=/dev/null', 'count=1',
|
||||
run_as_root=True, root_helper=mock_execute._root_helper)
|
||||
|
||||
@mock.patch('binascii.hexlify')
|
||||
@ddt.data(
|
||||
castellan_objects.passphrase.Passphrase(b'test-passphrase'),
|
||||
castellan_objects.symmetric_key.SymmetricKey('AES',
|
||||
mock.sentinel.bitlength,
|
||||
mock.sentinel.key),
|
||||
castellan_objects.opaque_data.OpaqueData(mock.sentinel.key),
|
||||
castellan_objects.private_key.PrivateKey('RSA',
|
||||
mock.sentinel.bitlength,
|
||||
mock.sentinel.key),
|
||||
castellan_objects.public_key.PublicKey('RSA',
|
||||
mock.sentinel.bitlength,
|
||||
mock.sentinel.key),
|
||||
castellan_objects.x_509.X509(mock.sentinel.key)
|
||||
)
|
||||
def test_get_passphrase_from_secret(self, secret, mock_hexlify):
|
||||
"""Test proper passphrase processing of different secret types."""
|
||||
if secret.managed_type() == 'passphrase':
|
||||
passphrase = utils.get_passphrase_from_secret(secret)
|
||||
mock_hexlify.assert_not_called()
|
||||
self.assertEqual('test-passphrase', passphrase)
|
||||
else:
|
||||
hexlified_bytes = mock.MagicMock()
|
||||
hexlified_bytes.decode.return_value = mock.sentinel.passphrase
|
||||
mock_hexlify.return_value = hexlified_bytes
|
||||
passphrase = utils.get_passphrase_from_secret(secret)
|
||||
mock_hexlify.assert_called_once_with(mock.sentinel.key)
|
||||
self.assertEqual(mock.sentinel.passphrase, passphrase)
|
||||
|
||||
|
||||
class TestRetryDecorator(base.TestCase):
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import binascii
|
||||
import functools
|
||||
import inspect
|
||||
import logging as py_logging
|
||||
@@ -456,6 +457,41 @@ def check_valid_device(executor: executor.Executor, path: str) -> bool:
|
||||
return info is not None
|
||||
|
||||
|
||||
def get_passphrase_from_secret(key) -> str:
|
||||
"""Convert encryption key retrieved from the Key Manager into a passphrase.
|
||||
|
||||
If the secret type is 'passphrase', assume that the key is already in
|
||||
a suitable string format and simply return it.
|
||||
In any other case, assume a binary key that needs to be converted into
|
||||
an ASCII representation using binascii.hexlify().
|
||||
|
||||
Cinder uses 'symmetric' in conjunction with binascii.hexlify() to
|
||||
handle encryption keys for its own volumes and resulting volume images.
|
||||
Nova uses the 'passphrase' type instead for its qcow2+LUKS images which
|
||||
are directly passed to LUKS as passphrase input. User-defined Glance
|
||||
images may reference secrets of any type (defaulting to 'opaque') which
|
||||
we optimistically assume to represent binary keys too (unless their
|
||||
type is 'passphrase' explicitly).
|
||||
|
||||
:param key: Key Manager Secret containing the encryption key
|
||||
:type key: castellan.common.objects.managed_object.ManagedObject
|
||||
:return: passphrase
|
||||
:rtype: str
|
||||
"""
|
||||
if key.managed_type() == 'passphrase':
|
||||
LOG.debug(
|
||||
"os_brick.utils.get_passphrase_from_secret: the secret is of type "
|
||||
"passphrase and will be used without conversion"
|
||||
)
|
||||
return key.get_encoded().decode('utf-8')
|
||||
else:
|
||||
LOG.debug(
|
||||
"os_brick.utils.get_passphrase_from_secret: the secret is not of "
|
||||
"type passphrase and will be converted using hex representation"
|
||||
)
|
||||
return binascii.hexlify(key.get_encoded()).decode('utf-8')
|
||||
|
||||
|
||||
class Anything(object):
|
||||
"""Object equal to everything."""
|
||||
def __eq__(self, other):
|
||||
|
||||
Reference in New Issue
Block a user