Merge "Add get_passphrase_from_secret utility function"

This commit is contained in:
Zuul 2024-08-22 21:07:18 +00:00 committed by Gerrit Code Review
commit c9b91057e7
7 changed files with 143 additions and 23 deletions

@ -17,6 +17,7 @@
import abc
from os_brick import executor
from os_brick import utils
class VolumeEncryptor(executor.Executor, metaclass=abc.ABCMeta):
@ -47,6 +48,10 @@ class VolumeEncryptor(executor.Executor, metaclass=abc.ABCMeta):
"""
return self._key_manager.get(context, self.encryption_key_id)
def _get_encryption_key_as_passphrase(self, context):
key = self._get_key(context)
return utils.get_passphrase_from_secret(key)
@abc.abstractmethod
def attach_volume(self, context, **kwargs):
"""Hook called immediately prior to attaching a volume to an instance.

@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import os
from oslo_concurrency import processutils
@ -97,10 +96,6 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
return False
return True
def _get_passphrase(self, key):
"""Convert raw key to string."""
return binascii.hexlify(key).decode('utf-8')
def _open_volume(self, passphrase, **kwargs):
"""Open the LUKS partition on the volume using passphrase.
@ -143,8 +138,7 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
"any existing volumes using this encryptor to the 'luks' "
"LuksEncryptor or 'luks2' Luks2Encryptor encryptors as soon as "
"possible.")
key = self._get_key(context).get_encoded()
passphrase = self._get_passphrase(key)
passphrase = self._get_encryption_key_as_passphrase(context)
self._open_volume(passphrase, **kwargs)

@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import os
from oslo_concurrency import processutils
@ -160,10 +159,6 @@ class LuksEncryptor(base.VolumeEncryptor):
root_helper=self._root_helper,
attempts=3)
def _get_passphrase(self, key):
"""Convert raw key to string."""
return binascii.hexlify(key).decode('utf-8')
def _open_volume(self, passphrase, **kwargs):
"""Opens the LUKS partition on the volume using passphrase.
@ -184,8 +179,7 @@ class LuksEncryptor(base.VolumeEncryptor):
original symbolic link to refer to the device mounted by dm-crypt.
"""
key = self._get_key(context).get_encoded()
passphrase = self._get_passphrase(key)
passphrase = self._get_encryption_key_as_passphrase(context)
try:
self._open_volume(passphrase, **kwargs)
@ -229,8 +223,7 @@ class LuksEncryptor(base.VolumeEncryptor):
"""Extend an encrypted volume and return the decrypted volume size."""
symlink = self.symlink_path
LOG.debug('Resizing mapping %s to match underlying device', symlink)
key = self._get_key(context).get_encoded()
passphrase = self._get_passphrase(key)
passphrase = self._get_encryption_key_as_passphrase(context)
self._execute('cryptsetup', 'resize', symlink,
process_input=passphrase,
run_as_root=True, check_exit_code=True,

@ -13,14 +13,41 @@
# License for the specific language governing permissions and limitations
# under the License.
import binascii
from unittest import mock
from castellan.common import objects as castellan_objects
from castellan.tests.unit.key_manager import fake
from os_brick import encryptors
from os_brick.tests import base
def fake__get_key_symmetric(passphrase):
raw = bytes(binascii.unhexlify(passphrase))
symmetric_key = castellan_objects.symmetric_key.SymmetricKey(
'AES', len(raw) * 8, raw)
return symmetric_key
def fake__get_key_passphrase(passphrase):
raw = passphrase.encode('utf-8')
passphrase_key = castellan_objects.passphrase.Passphrase(raw)
return passphrase_key
class BaseVolumeEncryptor(encryptors.base.VolumeEncryptor):
def attach_volume(self, context, **kwargs):
pass
def detach_volume(self, **kwargs):
pass
def extend_volume(self, context, **kwargs):
pass
class VolumeEncryptorTestCase(base.TestCase):
def _create(self):
pass
@ -85,6 +112,45 @@ class BaseEncryptorTestCase(VolumeEncryptorTestCase):
self._test_get_encryptor('nova.volume.encryptors.nop.NoopEncryptor',
encryptors.nop.NoOpEncryptor)
@mock.patch('os_brick.encryptors.base.VolumeEncryptor._get_key')
def test__get_encryption_key_as_passphrase_hexlify(self, mock_key):
"""Test passphrase retrieval for secret type 'symmetric'.
This should hexlify the secret in _get_encryption_key_as_passphrase.
"""
base_enc = BaseVolumeEncryptor(
root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr
)
fake_key_plain = 'passphrase-in-clear-text'
fake_key_hexlified = binascii.hexlify(fake_key_plain.encode('utf-8'))
mock_key.return_value = fake__get_key_symmetric(fake_key_hexlified)
passphrase = base_enc._get_encryption_key_as_passphrase(
mock.sentinel.context)
mock_key.assert_called_once_with(mock.sentinel.context)
self.assertEqual(passphrase, fake_key_hexlified.decode('utf-8'))
@mock.patch('os_brick.encryptors.base.VolumeEncryptor._get_key')
def test__get_encryption_key_as_passphrase(self, mock_key):
"""Test passphrase retrieval for secret type 'passphrase'.
This should skip the hexlify step in _get_encryption_key_as_passphrase.
"""
base_enc = BaseVolumeEncryptor(
root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr
)
fake_key_plain = 'passphrase-in-clear-text'
mock_key.return_value = fake__get_key_passphrase(fake_key_plain)
passphrase = base_enc._get_encryption_key_as_passphrase(
mock.sentinel.context)
mock_key.assert_called_once_with(mock.sentinel.context)
self.assertEqual(passphrase, fake_key_plain)
def test_get_error_encryptors(self):
encryption = {'control_location': 'front-end',
'provider': 'ErrorEncryptor'}

@ -254,17 +254,13 @@ class LuksEncryptorTestCase(test_base.VolumeEncryptorTestCase):
@mock.patch('os_brick.utils.get_device_size')
@mock.patch.object(luks.LuksEncryptor, '_execute')
@mock.patch.object(luks.LuksEncryptor, '_get_passphrase')
@mock.patch.object(luks.LuksEncryptor, '_get_key')
def test_extend_volume(self, mock_key, mock_pass, mock_exec, mock_size):
@mock.patch.object(luks.LuksEncryptor, '_get_encryption_key_as_passphrase')
def test_extend_volume(self, mock_pass, mock_exec, mock_size):
encryptor = self.encryptor
res = encryptor.extend_volume(mock.sentinel.context)
self.assertEqual(mock_size.return_value, res)
mock_key.assert_called_once_with(mock.sentinel.context)
mock_key.return_value.get_encoded.assert_called_once_with()
key = mock_key.return_value.get_encoded.return_value
mock_pass.assert_called_once_with(key)
mock_pass.assert_called_once_with(mock.sentinel.context)
mock_exec.assert_called_once_with(
'cryptsetup', 'resize', encryptor.dev_path,
process_input=mock_pass.return_value, run_as_root=True,

@ -18,6 +18,7 @@ import io
import time
from unittest import mock
from castellan.common import objects as castellan_objects
import ddt
from os_brick import exception
@ -108,6 +109,35 @@ class TestUtils(base.TestCase):
'dd', 'if=/dev/fake', 'of=/dev/null', 'count=1',
run_as_root=True, root_helper=mock_execute._root_helper)
@mock.patch('binascii.hexlify')
@ddt.data(
castellan_objects.passphrase.Passphrase(b'test-passphrase'),
castellan_objects.symmetric_key.SymmetricKey('AES',
mock.sentinel.bitlength,
mock.sentinel.key),
castellan_objects.opaque_data.OpaqueData(mock.sentinel.key),
castellan_objects.private_key.PrivateKey('RSA',
mock.sentinel.bitlength,
mock.sentinel.key),
castellan_objects.public_key.PublicKey('RSA',
mock.sentinel.bitlength,
mock.sentinel.key),
castellan_objects.x_509.X509(mock.sentinel.key)
)
def test_get_passphrase_from_secret(self, secret, mock_hexlify):
"""Test proper passphrase processing of different secret types."""
if secret.managed_type() == 'passphrase':
passphrase = utils.get_passphrase_from_secret(secret)
mock_hexlify.assert_not_called()
self.assertEqual('test-passphrase', passphrase)
else:
hexlified_bytes = mock.MagicMock()
hexlified_bytes.decode.return_value = mock.sentinel.passphrase
mock_hexlify.return_value = hexlified_bytes
passphrase = utils.get_passphrase_from_secret(secret)
mock_hexlify.assert_called_once_with(mock.sentinel.key)
self.assertEqual(mock.sentinel.passphrase, passphrase)
class TestRetryDecorator(base.TestCase):

@ -14,6 +14,7 @@
from __future__ import annotations
import binascii
import functools
import inspect
import logging as py_logging
@ -456,6 +457,41 @@ def check_valid_device(executor: executor.Executor, path: str) -> bool:
return info is not None
def get_passphrase_from_secret(key) -> str:
"""Convert encryption key retrieved from the Key Manager into a passphrase.
If the secret type is 'passphrase', assume that the key is already in
a suitable string format and simply return it.
In any other case, assume a binary key that needs to be converted into
an ASCII representation using binascii.hexlify().
Cinder uses 'symmetric' in conjunction with binascii.hexlify() to
handle encryption keys for its own volumes and resulting volume images.
Nova uses the 'passphrase' type instead for its qcow2+LUKS images which
are directly passed to LUKS as passphrase input. User-defined Glance
images may reference secrets of any type (defaulting to 'opaque') which
we optimistically assume to represent binary keys too (unless their
type is 'passphrase' explicitly).
:param key: Key Manager Secret containing the encryption key
:type key: castellan.common.objects.managed_object.ManagedObject
:return: passphrase
:rtype: str
"""
if key.managed_type() == 'passphrase':
LOG.debug(
"os_brick.utils.get_passphrase_from_secret: the secret is of type "
"passphrase and will be used without conversion"
)
return key.get_encoded().decode('utf-8')
else:
LOG.debug(
"os_brick.utils.get_passphrase_from_secret: the secret is not of "
"type passphrase and will be converted using hex representation"
)
return binascii.hexlify(key.get_encoded()).decode('utf-8')
class Anything(object):
"""Object equal to everything."""
def __eq__(self, other):