libvirt: Configure and teardown ephemeral encryption secrets

This adds configuration of the default ephemeral encryption format and
sets default encryption attributes in the driver block device mapping
when needed. This includes generation of a secret passphrase when one
has not been provided.

Co-Authored-By: melanie witt <melwittt@gmail.com>

Related to blueprint ephemeral-encryption-libvirt

Change-Id: I052441076c677c0fe76a8d9421af70b0ffa1d400
This commit is contained in:
Lee Yarwood 2022-01-27 15:08:16 +00:00 committed by melanie witt
parent 7fa1859576
commit 177c184e40
6 changed files with 835 additions and 27 deletions

View File

@ -48,6 +48,17 @@ Encryption key length in bits.
The bit length of the encryption key to be used to encrypt ephemeral storage.
In XTS mode only half of the bits are used for encryption key.
"""),
cfg.StrOpt(
'default_format',
default='luks',
choices=('luks',),
help="""
Default ephemeral encryption format.
Only 'luks' is supported at this time.
Note that this does not apply to LVM ephemeral storage encryption.
"""),
]

View File

@ -35,6 +35,7 @@ from cryptography.hazmat.primitives import serialization
from cryptography import x509
from oslo_concurrency import processutils
from oslo_log import log as logging
from oslo_serialization import base64 as oslo_base64
from oslo_utils.secretutils import md5
import paramiko
@ -44,6 +45,7 @@ from nova import exception
from nova.i18n import _
from nova import objects
from nova import utils
from nova.virt import block_device as driver_block_device
LOG = logging.getLogger(__name__)
@ -54,6 +56,8 @@ _KEYMGR = None
_VTPM_SECRET_BYTE_LENGTH = 384
_EPHEMERAL_ENCRYPTION_SECRET_BYTE_LENGTH = 64
def _get_key_manager():
global _KEYMGR
@ -252,3 +256,54 @@ def delete_vtpm_secret(
del instance.system_metadata['vtpm_secret_uuid']
instance.save()
def create_encryption_secret(
context: nova_context.RequestContext,
instance: 'objects.Instance',
driver_bdm: 'driver_block_device.DriverBlockDevice',
for_detail: ty.Optional[str] = None,
):
# Use oslo.serialization to encode some random data as passphrase
secret = oslo_base64.encode_as_text(
os.urandom(_EPHEMERAL_ENCRYPTION_SECRET_BYTE_LENGTH))
if for_detail is None:
for_detail = f"instance {instance.uuid} BDM {driver_bdm['uuid']}"
secret_name = f'Ephemeral encryption secret for {for_detail}'
cmo = passphrase.Passphrase(secret, name=secret_name)
key_mgr = _get_key_manager()
secret_uuid = key_mgr.store(context, cmo)
LOG.debug(
f'Created "{secret_name}" with UUID {secret_uuid}',
instance=instance
)
return secret_uuid, secret
def get_encryption_secret(
context: nova_context.RequestContext,
secret_uuid: str,
) -> ty.Optional[str]:
key_mgr = _get_key_manager()
try:
key = key_mgr.get(context, secret_uuid)
LOG.debug(f"Retrieved secret with UUID {secret_uuid}")
return key.get_encoded()
except castellan_exception.ManagedObjectNotFoundError:
LOG.debug(f"Encryption secret with UUID {secret_uuid} was not found.")
return None
def delete_encryption_secret(
context: nova_context.RequestContext,
instance_uuid: str,
secret_uuid: str,
):
key_mgr = _get_key_manager()
try:
key_mgr.delete(context, secret_uuid)
LOG.debug(f"Deleted secret with UUID {secret_uuid}",
instance_uuid=instance_uuid)
except castellan_exception.ManagedObjectNotFoundError:
LOG.debug(f"Encryption secret with UUID {secret_uuid} already deleted "
"or never existed.", instance_uuid=instance_uuid)

View File

@ -2546,3 +2546,13 @@ class NotSupportedComputeForEvacuateV295(NotSupported):
"instance on destination. To evacuate before upgrades are "
"complete please use an older microversion. Required version "
"for compute %(expected), current version %(currently)s")
class EphemeralEncryptionSecretNotFound(Invalid):
msg_fmt = _(
'Encryption secret %(secret_uuid)s was not found in the key manager')
class EphemeralEncryptionCleanupFailed(NovaException):
msg_fmt = _("Failed to clean up ephemeral encryption secrets: "
"%(error)s")

View File

@ -314,36 +314,74 @@ class VTPMTest(test.NoDBTestCase):
self.ctxt, uuids.vtpm,
)
class EncryptionSecretTest(test.NoDBTestCase):
def setUp(self):
super().setUp()
self.ctxt = nova_context.get_admin_context()
@mock.patch('oslo_serialization.base64.encode_as_text')
@mock.patch('castellan.common.objects.passphrase.Passphrase')
@mock.patch.object(crypto, '_get_key_manager')
def test_delete_vtpm_secret_error(self, mock_get_manager):
"""Check behavior when we fail to retrieve the secret via castellan.
def _test_create_encryption_secret(
self, mock_get_manager, mock_pass, mock_text, for_detail=None):
We should carry on and delete the reference from the instance.
"""
instance = objects.Instance()
instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm}
mock_get_manager.return_value.delete.side_effect = (
castellan_exception.ManagedObjectNotFoundError(uuid=uuids.vtpm))
instance = objects.Instance(uuid=uuids.instance)
driver_bdm = {'uuid': uuids.driver_bdm}
passphrase = mock.Mock()
mock_pass.return_value = passphrase
if for_detail is None:
for_detail = f'instance {instance.uuid} BDM {driver_bdm["uuid"]}'
secret_name = f'Ephemeral encryption secret for {for_detail}'
with mock.patch.object(instance, 'save') as mock_save:
crypto.delete_vtpm_secret(self.ctxt, instance)
secret_uuid, secret = crypto.create_encryption_secret(
self.ctxt, instance, driver_bdm, for_detail=for_detail)
self.assertNotIn('vtpm_secret_uuid', instance.system_metadata)
mock_save.assert_called_once()
mock_get_manager.assert_called_once()
self.assertEqual(
mock_get_manager.return_value.store.return_value, secret_uuid)
self.assertEqual(secret, mock_text.return_value)
mock_pass.assert_called_once_with(
mock_text.return_value, name=secret_name)
mock_get_manager.return_value.store.assert_called_once_with(
self.ctxt, passphrase)
def test_create_encryption_secret(self):
self._test_create_encryption_secret()
def test_create_encryption_secret_for_image(self):
self._test_create_encryption_secret(for_detail='image fake-image')
@mock.patch.object(crypto, '_get_key_manager')
def test_get_encryption_secret(self, mock_get_manager):
passphrase = FakePassphrase()
mock_get_manager.return_value.get.return_value = passphrase
secret = crypto.get_encryption_secret(self.ctxt, uuids.secret)
self.assertEqual(passphrase.get_encoded(), secret)
mock_get_manager.return_value.get.assert_called_once_with(
self.ctxt, uuids.secret)
@mock.patch.object(crypto, '_get_key_manager')
def test_get_encryption_secret_not_found(self, mock_get_manager):
mock_get_manager.return_value.get.side_effect = (
castellan_exception.ManagedObjectNotFoundError())
secret = crypto.get_encryption_secret(self.ctxt, uuids.secret)
self.assertIsNone(secret)
mock_get_manager.return_value.get.assert_called_once_with(
self.ctxt, uuids.secret)
@mock.patch.object(crypto, '_get_key_manager')
def test_delete_encryption_secret(self, mock_get_manager):
instance = objects.Instance(uuid=uuids.instance)
crypto.delete_encryption_secret(self.ctxt, instance.uuid, uuids.secret)
mock_get_manager.return_value.delete.assert_called_once_with(
self.ctxt, uuids.vtpm,
)
self.ctxt, uuids.secret)
@mock.patch.object(crypto, '_get_key_manager')
def test_delete_vtpm_secret_no_secret(self, mock_get_manager):
"""Check behavior when instance has no associated vTPM secret.
This should be effectively a no-op.
"""
instance = objects.Instance()
instance.system_metadata = {}
crypto.delete_vtpm_secret(self.ctxt, instance)
mock_get_manager.assert_not_called()
def test_delete_encryption_secret_not_found(self, mock_get_manager):
mock_get_manager.return_value.delete.side_effect = (
castellan_exception.ManagedObjectNotFoundError())
instance = objects.Instance(uuid=uuids.instance)
crypto.delete_encryption_secret(self.ctxt, instance.uuid, uuids.secret)
mock_get_manager.return_value.delete.assert_called_once_with(
self.ctxt, uuids.secret)

View File

@ -709,6 +709,7 @@ def _create_test_instance():
'resources': None,
'migration_context': None,
'info_cache': None,
'cleaned': False,
}
@ -30417,3 +30418,519 @@ class AsyncDeviceEventsHandlerTestCase(test.NoDBTestCase):
# the third client timed out
self.assertIsNone(received_event3)
self.assert_handler_clean()
@ddt.ddt
class EphemeralEncryptionTestCase(test.NoDBTestCase):
def setUp(self):
super().setUp()
self.useFixture(nova_fixtures.LibvirtFixture())
self.useFixture(nova_fixtures.LibvirtImageBackendFixture())
self.context = context.get_admin_context()
self.drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
self.drvr._host = mock.Mock()
self.instance = objects.Instance(**_create_test_instance())
# Avoid a lazy load on orphaned Instance object.
self.instance.root_device_name = '/dev/vda'
self.img_bdm = block_device_obj.BlockDeviceMapping(
id=1, uuid=uuids.image, image_id=uuids.image_id,
device_type='disk', disk_bus='virtio', no_device=False,
device_name='/dev/vda', volume_size=1, source_type='image',
destination_type='local', guest_format=None, encrypted=True,
encryption_format=None, encryption_options=None,
encryption_secret_uuid=None)
self.eph_bdm = block_device_obj.BlockDeviceMapping(
id=2, uuid=uuids.ephemeral, device_type='disk', disk_bus='virtio',
no_device=False, device_name='/dev/vdb', volume_size=1,
source_type='blank', destination_type='local', guest_format=None,
encrypted=True, encryption_format=None, encryption_options=None,
encryption_secret_uuid=None)
self.swap_bdm = block_device_obj.BlockDeviceMapping(
id=3, uuid=uuids.swap, device_type='disk', disk_bus='virtio',
no_device=False, device_name='/dev/vdc', volume_size=1,
source_type='blank', destination_type='local', guest_format='swap',
encrypted=True, encryption_format=None, encryption_options=None,
encryption_secret_uuid=None)
# Mock things we need to assert.
self.mock_save = self.useFixture(fixtures.MockPatch(
'nova.virt.block_device.DriverBlockDevice.save')).mock
self.mock_create_secret = self.useFixture(fixtures.MockPatch(
'nova.crypto.create_encryption_secret')).mock
self.mock_get_secret = self.useFixture(fixtures.MockPatch(
'nova.crypto.get_encryption_secret')).mock
self.mock_delete_secret = self.useFixture(fixtures.MockPatch(
'nova.crypto.delete_encryption_secret')).mock
# Mock things we don't need to assert.
self.useFixture(fixtures.MockPatchObject(
libvirt_driver.LibvirtDriver, 'get_info',
return_value = hardware.InstanceInfo(state=power_state.RUNNING)))
self.useFixture(fixtures.MockPatchObject(
libvirt_driver.LibvirtDriver,
'_register_undefined_instance_details'))
self.useFixture(fixtures.MockPatchObject(
libvirt_driver.LibvirtDriver, '_create_guest_with_network'))
self.useFixture(fixtures.MockPatchObject(
libvirt_driver.LibvirtDriver, '_get_guest_xml'))
def _test_spawn_with_ephemeral_encryption(self, encryption_format=None):
# Test that encryption defaults are set during spawn() if not
# specified.
for bdm in (self.img_bdm, self.eph_bdm, self.swap_bdm):
bdm.encryption_format = encryption_format
expected_format = (
encryption_format or
CONF.ephemeral_storage_encryption.default_format)
self.mock_create_secret.side_effect = [
(uuids.secret1, mock.sentinel.secret1),
(uuids.secret2, mock.sentinel.secret2),
(uuids.secret3, mock.sentinel.secret3),
]
# Simulate finding one already existing libvirt secret. We should
# delete it before re-creating it.
self.drvr._host.find_secret.side_effect = [
None, mock.sentinel.libvirt_secret, None]
block_device_info = driver.get_block_device_info(
self.instance, [self.img_bdm, self.eph_bdm, self.swap_bdm])
image_meta = objects.ImageMeta.from_dict({})
self.drvr.spawn(
self.context, self.instance, image_meta, [], None, {},
block_device_info=block_device_info)
# image - Assert that the format and secret reflect the expected
# values. Initially encryption_format and encryption_secret_uuid were
# None.
driver_bdm = block_device_info['image'][0]
self.assertEqual(expected_format, driver_bdm['encryption_format'])
self.assertEqual(uuids.secret1, driver_bdm['encryption_secret_uuid'])
keymgr_call1 = mock.call(self.context, self.instance, driver_bdm)
libvirt_call1 = mock.call(
'volume', f"{self.instance.uuid}_{driver_bdm['uuid']}",
password=mock.sentinel.secret1, uuid=uuids.secret1)
# ephemerals - Assert that the format and secret reflect the expected
# values. Initially encryption_format and encryption_secret_uuid were
# None.
driver_bdm = block_device_info['ephemerals'][0]
self.assertEqual(expected_format, driver_bdm['encryption_format'])
self.assertEqual(uuids.secret2, driver_bdm['encryption_secret_uuid'])
keymgr_call2 = mock.call(self.context, self.instance, driver_bdm)
libvirt_call2 = mock.call(
'volume', f"{self.instance.uuid}_{driver_bdm['uuid']}",
password=mock.sentinel.secret2, uuid=uuids.secret2)
# swap - Assert that the format and secret reflect the expected
# values. Initially encryption_format and encryption_secret_uuid were
# None.
driver_bdm = block_device_info['swap']
self.assertEqual(expected_format, driver_bdm['encryption_format'])
self.assertEqual(uuids.secret3, driver_bdm['encryption_secret_uuid'])
keymgr_call3 = mock.call(self.context, self.instance, driver_bdm)
libvirt_call3 = mock.call(
'volume', f"{self.instance.uuid}_{driver_bdm['uuid']}",
password=mock.sentinel.secret3, uuid=uuids.secret3)
# Assert that we generated key manager and libvirt secrets.
self.assertEqual(
[keymgr_call1, keymgr_call2, keymgr_call3],
self.mock_create_secret.mock_calls)
self.assertEqual(
[libvirt_call1, libvirt_call2, libvirt_call3],
self.drvr._host.create_secret.mock_calls)
# And we did not retrieve an existing secret.
self.mock_get_secret.assert_not_called()
# Assert that updates were saved to the database.
self.assertEqual(3, self.mock_save.call_count)
# Assert that no key manager secrets were deleted.
self.mock_delete_secret.assert_not_called()
# Assert that one already existing libvirt secret was deleted.
self.drvr._host.delete_secret.assert_called_once_with(
'volume', f'{self.instance.uuid}_{self.eph_bdm.uuid}')
def test_spawn_with_ephemeral_encryption_defaults(self):
# Test that encryption defaults are set during spawn() if not
# specified.
self._test_spawn_with_ephemeral_encryption()
def test_spawn_with_ephemeral_encryption_non_default(self):
# Test that specified encryption attributes are used during spawn().
self._test_spawn_with_ephemeral_encryption(encryption_format='plain')
def test_spawn_with_ephemeral_encryption_secret_not_found(self):
# Test that we fail if any existing key manager secret is not found.
self.mock_get_secret.side_effect = [
mock.sentinel.secret1, mock.sentinel.secret2, None]
# Mock that there are currently no libvirt secrets but there will be
# after they are created.
self.drvr._host.find_secret.side_effect = [
None, None, mock.sentinel.secret1, mock.sentinel.secret2]
# If we're trying to retrieve secrets from the key manager it's because
# the BDMs have existing secret UUIDs.
self.img_bdm.encryption_secret_uuid = uuids.secret1
self.eph_bdm.encryption_secret_uuid = uuids.secret2
self.swap_bdm.encryption_secret_uuid = uuids.secret3
block_device_info = driver.get_block_device_info(
self.instance, [self.img_bdm, self.eph_bdm, self.swap_bdm])
image_meta = objects.ImageMeta.from_dict({})
self.assertRaises(
exception.EphemeralEncryptionSecretNotFound, self.drvr.spawn,
self.context, self.instance, image_meta, [], None, {},
block_device_info=block_device_info)
# Assert that we didn't generate any key manager secrets.
self.mock_create_secret.assert_not_called()
# We should have attempted to retrieve 3 secrets.
call1 = mock.call(self.context, uuids.secret1)
call2 = mock.call(self.context, uuids.secret2)
call3 = mock.call(self.context, uuids.secret3)
self.assertEqual(
[call1, call2, call3], self.mock_get_secret.mock_calls)
# Assert that we generated only two libvirt secrets since we failed
# retrieving the last one. Also assert the contents of
# block_device_info now contain the expected values. Initially
# encryption_format was None and encryption_secret_uuid was
# uuids.secretN. So they should have the same initial values as before
# the fail.
driver_bdm = block_device_info['image'][0]
self.assertIsNone(driver_bdm['encryption_format'])
self.assertEqual(uuids.secret1, driver_bdm['encryption_secret_uuid'])
create_call1 = mock.call(
'volume', f"{self.instance.uuid}_{driver_bdm['uuid']}",
password=mock.sentinel.secret1, uuid=uuids.secret1)
delete_call1 = mock.call(
'volume', f"{self.instance.uuid}_{driver_bdm['uuid']}")
driver_bdm = block_device_info['ephemerals'][0]
self.assertIsNone(driver_bdm['encryption_format'])
self.assertEqual(uuids.secret2, driver_bdm['encryption_secret_uuid'])
create_call2 = mock.call(
'volume', f"{self.instance.uuid}_{driver_bdm['uuid']}",
password=mock.sentinel.secret2, uuid=uuids.secret2)
delete_call2 = mock.call(
'volume', f"{self.instance.uuid}_{driver_bdm['uuid']}")
# Assert that we created and deleted two libvirt secrets.
self.assertEqual(
[create_call1, create_call2],
self.drvr._host.create_secret.mock_calls)
self.assertEqual(
[delete_call1, delete_call2],
self.drvr._host.delete_secret.mock_calls)
# Assert that updates were saved to the database. There should be two
# from before the third secret was not found and three during cleanup
# for setting the driver BDMs attributes to their original values.
self.assertEqual(5, self.mock_save.call_count)
# Assert that no key manager secrets were deleted.
self.mock_delete_secret.assert_not_called()
def test_spawn_with_ephemeral_encryption_secret_create_fail(self):
# Test that we clean up created secrets if one fails.
self.mock_create_secret.side_effect = [
(uuids.secret1, mock.sentinel.secret1),
(uuids.secret2, mock.sentinel.secret2),
test.TestingException('oops!'),
]
# Mock that there are currently no libvirt secrets but there will be
# after they are created.
self.drvr._host.find_secret.side_effect = [
None, None, mock.sentinel.secret1, mock.sentinel.secret2]
block_device_info = driver.get_block_device_info(
self.instance, [self.img_bdm, self.eph_bdm, self.swap_bdm])
image_meta = objects.ImageMeta.from_dict({})
self.assertRaises(
test.TestingException, self.drvr.spawn, self.context,
self.instance, image_meta, [], None, {},
block_device_info=block_device_info)
# Assert that we attempted to generate three key manager secrets and
# only two libvirt secrets since we failed creating the last key
# manager secret. The contents of block_device_info should now contain
# the expected values, they should have the same initial values as
# before the fail.
driver_bdm = block_device_info['image'][0]
self.assertIsNone(driver_bdm['encryption_format'])
self.assertIsNone(driver_bdm['encryption_secret_uuid'])
keymgr_call1 = mock.call(self.context, self.instance, driver_bdm)
libvirt_call1 = mock.call(
'volume', f"{self.instance.uuid}_{driver_bdm['uuid']}",
password=mock.sentinel.secret1, uuid=uuids.secret1)
driver_bdm = block_device_info['ephemerals'][0]
self.assertIsNone(driver_bdm['encryption_format'])
self.assertIsNone(driver_bdm['encryption_secret_uuid'])
keymgr_call2 = mock.call(self.context, self.instance, driver_bdm)
libvirt_call2 = mock.call(
'volume', f"{self.instance.uuid}_{driver_bdm['uuid']}",
password=mock.sentinel.secret2, uuid=uuids.secret2)
driver_bdm = block_device_info['swap']
self.assertIsNone(driver_bdm['encryption_format'])
self.assertIsNone(driver_bdm['encryption_secret_uuid'])
keymgr_call3 = mock.call(self.context, self.instance, driver_bdm)
self.assertEqual(
[keymgr_call1, keymgr_call2, keymgr_call3],
self.mock_create_secret.mock_calls)
self.assertEqual(
[libvirt_call1, libvirt_call2],
self.drvr._host.create_secret.mock_calls)
# Assert that updates were saved to the database. There should be two
# from before the third secret failed to be created and three after the
# created secrets were cleaned up and deleted.
self.assertEqual(5, self.mock_save.call_count)
# Two key manager secrets that were created should have been deleted
# after the fail.
self.assertEqual(2, self.mock_delete_secret.call_count)
call1 = mock.call(self.context, self.instance.uuid, uuids.secret1)
call2 = mock.call(self.context, self.instance.uuid, uuids.secret2)
self.assertEqual([call1, call2], self.mock_delete_secret.mock_calls)
# Two libvirt secrets that were created by the first two successful key
# manager secret retrievals should have been deleted after the fail.
driver_bdm = block_device_info['image'][0]
call1 = mock.call(
'volume', f"{self.instance.uuid}_{driver_bdm['uuid']}")
driver_bdm = block_device_info['ephemerals'][0]
call2 = mock.call(
'volume', f"{self.instance.uuid}_{driver_bdm['uuid']}")
self.assertEqual(
[call1, call2], self.drvr._host.delete_secret.mock_calls)
@mock.patch('nova.virt.libvirt.driver.LOG.exception')
def test_spawn_with_ephemeral_encryption_cleanup_fails(self, mock_log_exc):
# Mock that two key manager secret creates succeed and the third fails.
self.mock_create_secret.side_effect = [
(uuids.secret1, mock.sentinel.secret1),
(uuids.secret2, mock.sentinel.secret2),
test.TestingException('oops!'),
]
# Mock that there are currently no libvirt secrets but there will be
# after they are created.
self.drvr._host.find_secret.side_effect = [
None, None, mock.sentinel.secret1, mock.sentinel.secret2]
# Mock that the first key manager key delete during cleanup fails and
# the second succeeds.
self.mock_delete_secret.side_effect = [
test.TestingException('key manager fail'), None]
# Mock that the first libvirt secret delete during cleanup succeeds and
# the second fails.
self.drvr._host.delete_secret.side_effect = [
None, test.TestingException('libvirt fail')]
block_device_info = driver.get_block_device_info(
self.instance, [self.img_bdm, self.eph_bdm, self.swap_bdm])
image_meta = objects.ImageMeta.from_dict({})
self.assertRaises(
test.TestingException, self.drvr.spawn, self.context,
self.instance, image_meta, [], None, {},
block_device_info=block_device_info)
call1 = mock.call(
f'Failed to delete encryption secret {uuids.secret1} '
'in the key manager', instance=self.instance)
call2 = mock.call(
'Failed to delete libvirt secret '
f'{self.instance.uuid}_{self.eph_bdm.uuid}',
instance=self.instance)
self.assertEqual([call1, call2], mock_log_exc.mock_calls)
def test_spawn_with_ephemeral_encryption_mix_with_without_secrets(self):
# Test that we fail fast if we detect a mix of driver BDMs that have
# encryption_secret_uuid set and not set (None). It is not a valid
# state.
img_bdm = {'encrypted': True, 'encryption_secret_uuid': uuids.secret1}
eph_bdm = {'encrypted': True, 'encryption_secret_uuid': None}
swap_bdm = {'encrypted': True, 'encryption_secret_uuid': uuids.secret2}
block_device_info = {
'image': [img_bdm],
'ephemerals': [eph_bdm],
'swap': swap_bdm,
}
image_meta = objects.ImageMeta.from_dict({})
# Call spawn() with encrypted ephemeral block device.
ex = self.assertRaises(
exception.InvalidBDM, self.drvr.spawn, self.context, self.instance,
image_meta, [], None, {}, block_device_info=block_device_info)
self.assertIn(
'Found a mix of encrypted BDMs with and without existing '
'encryption secrets', ex.message)
self.mock_create_secret.assert_not_called()
self.mock_get_secret.assert_not_called()
self.mock_delete_secret.assert_not_called()
self.drvr._host.create_secret.assert_not_called()
self.drvr._host.find_secret.assert_not_called()
self.drvr._host.delete_secret.assert_not_called()
@mock.patch('nova.objects.instance.Instance.save', new=mock.Mock())
def _test_cleanup_with_ephemeral_encryption(
self, has_key_mgr_secret=True, has_libvirt_secret=True,
destroy_disks=True
):
mock_domain = mock.Mock(fakelibvirt.virDomain)
mock_domain.ID.return_value = 123
self.drvr._host._get_domain.return_value = mock_domain
self.drvr.delete_instance_files = mock.Mock(return_value=None)
self.drvr.get_info = mock.Mock(return_value=hardware.InstanceInfo(
state=power_state.SHUTDOWN, internal_id=-1))
if not has_libvirt_secret:
self.drvr._host.find_secret.return_value = None
# Create a DriverBlockDevice list from a BlockDeviceMapping object.
encryption_secret_uuid = uuids.secret if has_key_mgr_secret else None
bdm = block_device_obj.BlockDeviceMapping(
id=1, uuid=uuids.ephemeral, device_type='disk', disk_bus='virtio',
no_device=False, device_name='/dev/vdb', volume_size=1,
source_type='blank', destination_type='local', guest_format=None,
encrypted=True, encryption_format='plain',
encryption_options=None,
encryption_secret_uuid=encryption_secret_uuid,
)
ephemerals = [driver_block_device.DriverEphemeralBlockDevice(bdm)]
block_device_info = {'ephemerals': ephemerals}
self.instance.cleaned = True
# Call cleanup() with encrypted ephemeral block device.
self.drvr.cleanup(
self.context, self.instance, [],
block_device_info=block_device_info, destroy_disks=destroy_disks)
# Assert that we did not delete the key manager secret.
self.mock_delete_secret.assert_not_called()
# Assert that we deleted the libvirt secret.
if has_libvirt_secret and destroy_disks:
secret_usage = f'{self.instance.uuid}_{uuids.ephemeral}'
self.drvr._host.delete_secret.assert_called_once_with(
'volume', secret_usage)
else:
self.drvr._host.delete_secret.assert_not_called()
def test_cleanup_with_ephemeral_encryption(self):
self._test_cleanup_with_ephemeral_encryption()
def test_cleanup_with_ephemeral_encryption_no_key_mgr_secret(self):
self._test_cleanup_with_ephemeral_encryption(has_key_mgr_secret=False)
def test_cleanup_with_ephemeral_encryption_no_libvirt_secret(self):
self._test_cleanup_with_ephemeral_encryption(has_libvirt_secret=False)
def test_cleanup_with_ephemeral_encryption_no_destroy_disks(self):
self._test_cleanup_with_ephemeral_encryption(destroy_disks=False)
def test__cleanup_delete_secret_fails(self):
# Test exception handling when libvirt secret deletion fails during
# cleanup.
error = fakelibvirt.make_libvirtError(
fakelibvirt.libvirtError, msg='error',
error_code=fakelibvirt.VIR_ERR_INTERNAL_ERROR)
self.drvr._host.find_secret.return_value = mock.sentinel.secret
# Secret delete for img_bdm and eph_bdm fail but swap_bdm succeeds.
self.drvr._host.delete_secret.side_effect = [error, error, None]
block_device_info = driver.get_block_device_info(
self.instance, [self.img_bdm, self.eph_bdm, self.swap_bdm])
exp = self.assertRaises(
exception.EphemeralEncryptionCleanupFailed,
self.drvr._cleanup_ephemeral_encryption_secrets,
self.context, self.instance, block_device_info)
expected_msg = (
'Failed to clean up ephemeral encryption secrets: '
f'Failed to delete libvirt secret '
f'{self.instance.uuid}_{self.img_bdm.uuid}: error\n'
f'Failed to delete libvirt secret '
f'{self.instance.uuid}_{self.eph_bdm.uuid}: error')
self.assertEqual(expected_msg, str(exp))
@mock.patch.object(
libvirt_driver.LibvirtDriver, '_cleanup_lvm', new=mock.Mock())
@mock.patch.object(
libvirt_driver.LibvirtDriver, '_cleanup_rbd', new=mock.Mock())
@mock.patch.object(
libvirt_driver.LibvirtDriver, '_cleanup_ephemeral_encryption_secrets')
@ddt.data('raw', 'flat', 'qcow2', 'lvm', 'rbd', 'ploop', 'default')
def test__cleanup_with_ephemeral_encryption_no_cleanup_instance_dir(
self, images_type, mock_cleanup_secrets):
bdm_dict = {
'source_type': 'image',
'destination_type': 'local',
'encrypted': True,
'encryption_format': 'luks',
'encryption_options': None,
'encryption_secret_uuid': None,
}
bdm = fake_block_device.fake_bdm_object(self.context, bdm_dict)
bdi = {'image': [driver_block_device.DriverImageBlockDevice(bdm)]}
# Pass clean_instance_dir=False + clean_instance_disks=True
self.flags(images_type=images_type, group='libvirt')
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
drvr._cleanup(
self.context, self.instance, [], block_device_info=bdi,
cleanup_instance_dir=False, cleanup_instance_disks=True)
if images_type not in ('lvm', 'rbd'):
# We should not have cleaned up encryption secrets because the
# disks were not deleted.
mock_cleanup_secrets.assert_not_called()
else:
# For 'lvm' and 'rbd' we should have cleaned up the secrets
# because cleanup for their disks is not related to the
# instance directory.
mock_cleanup_secrets.assert_called_once_with(
self.context, self.instance, bdi)
mock_cleanup_secrets.reset_mock()
@mock.patch('nova.objects.instance.Instance.save', new=mock.Mock())
@mock.patch.object(libvirt_driver.LibvirtDriver, 'delete_instance_files')
@mock.patch.object(
libvirt_driver.LibvirtDriver, '_cleanup_ephemeral_encryption_secrets')
def test__cleanup_with_ephemeral_encryption_cleanup_instance_dir_failed(
self, mock_cleanup_secrets, mock_delete_files):
# Simulate a failure to delete the instance files.
mock_delete_files.return_value = False
bdm_dict = {
'source_type': 'image',
'destination_type': 'local',
'encrypted': True,
'encryption_format': 'luks',
'encryption_options': None,
'encryption_secret_uuid': None,
}
bdm = fake_block_device.fake_bdm_object(self.context, bdm_dict)
bdi = {'image': [driver_block_device.DriverImageBlockDevice(bdm)]}
# Pass clean_instance_dir=True + clean_instance_disks=True
self.drvr._cleanup(
self.context, self.instance, [], block_device_info=bdi,
cleanup_instance_dir=True, cleanup_instance_disks=True)
# We should not have cleaned up encryption secrets because the disks
# were not deleted.
mock_cleanup_secrets.assert_not_called()

View File

@ -1662,7 +1662,10 @@ class LibvirtDriver(driver.ComputeDriver):
:param block_device_info: optional instance block device information
:param destroy_vifs: if plugged vifs should be unplugged
:param cleanup_instance_dir: If the instance dir should be removed
:param cleanup_instance_disks: If the instance disks should be removed
:param cleanup_instance_disks: If the instance disks should be removed.
Also removes ephemeral encryption secrets, if present.
:param destroy_secrets: If the cinder volume encryption secrets should
be deleted.
"""
# zero the data on backend pmem device
vpmems = self._get_vpmems(instance)
@ -1728,9 +1731,47 @@ class LibvirtDriver(driver.ComputeDriver):
if cleanup_instance_disks:
crypto.delete_vtpm_secret(context, instance)
# Make sure that the instance directory files were successfully
# deleted before destroying the encryption secrets in the case of
# image backends that are not 'lvm' or 'rbd'. We don't want to
# leave any chance that we delete the secrets if the disks have not
# been deleted.
if CONF.libvirt.images_type in ('lvm', 'rbd') or instance.cleaned:
self._cleanup_ephemeral_encryption_secrets(
context, instance, block_device_info)
self._undefine_domain(instance)
def _cleanup_ephemeral_encryption_secrets(
self, context, instance, block_device_info
):
exception_msgs = []
encrypted_bdms = driver.block_device_info_get_encrypted_disks(
block_device_info)
for driver_bdm in encrypted_bdms:
# NOTE(melwitt): We intentionally only delete libvirt secrets here
# and not secrets in the key manager service (example: barbican).
# Libvirt secrets are local to a compute host and are routinely
# deleted during instance move operations. If we're only moving, we
# don't want to delete the secret in the key manager service. The
# secret in the key manager service should only be deleted when the
# instance is deleted.
secret_usage = f"{instance.uuid}_{driver_bdm['uuid']}"
if self._host.find_secret('volume', secret_usage):
try:
self._host.delete_secret('volume', secret_usage)
except libvirt.libvirtError as e:
msg = (
f'Failed to delete libvirt secret {secret_usage}: ' +
str(e))
LOG.exception(msg, instance=instance)
exception_msgs.append(msg)
if exception_msgs:
msg = '\n'.join(exception_msgs)
raise exception.EphemeralEncryptionCleanupFailed(error=msg)
def cleanup_lingering_instance_resources(self, instance):
# zero the data on backend pmem device, if fails
# it will raise an exception
@ -4424,9 +4465,145 @@ class LibvirtDriver(driver.ComputeDriver):
def poll_rebooting_instances(self, timeout, instances):
pass
@staticmethod
def _get_or_create_encryption_secret(context, instance, driver_bdm):
created = False
secret_uuid = driver_bdm.get('encryption_secret_uuid')
if secret_uuid is None:
# Create a passphrase and stash it in the key manager
secret_uuid, secret = crypto.create_encryption_secret(
context, instance, driver_bdm)
# Stash the UUID of said secret in our driver BDM
driver_bdm['encryption_secret_uuid'] = secret_uuid
created = True
else:
# NOTE(melwitt): In general, we avoid reusing secrets but
# we need to reuse them in the case of shelve/unshelve and
# rebuild. The use case is if an admin user
# shelves/unshelves or rebuilds an instance owned by a
# non-admin user. If we don't reuse the non-admin user's
# secret and instead create a new secret, the new secret
# will be owned by the admin user and will prevent the
# non-admin user from accessing the new secret for their
# instance. There is no way in the barbican API to create a
# secret with a different user/project than the caller, so
# we have to just reuse the secret.
secret = crypto.get_encryption_secret(context, secret_uuid)
if secret is None:
# If we get here, because we know this BDM is supposed
# to have an existing secret, we also know all of the
# other BDMs have existing secrets too. Because we
# didn't create any secrets, we don't need to clean up
# any secrets.
msg = (
f'Failed to find encryption secret {secret_uuid} '
f'in the key manager for driver BDM '
f"{driver_bdm['uuid']}")
raise exception.EphemeralEncryptionSecretNotFound(msg)
return secret_uuid, secret, created
def _add_ephemeral_encryption_driver_bdm_attrs(
self,
context: nova_context.RequestContext,
instance: 'objects.Instance',
block_device_info: ty.Dict[str, ty.Any],
) -> ty.Optional[ty.Dict[str, ty.Any]]:
"""Add ephemeral encryption attributes to driver BDMs before use."""
encrypted_bdms = driver.block_device_info_get_encrypted_disks(
block_device_info)
# Either all of the driver_bdm's should have existing encryption
# secrets (unshelve, rebuild) or none of them should. There should
# never be a mix of both. If there is, something is wrong.
if encrypted_bdms:
bdms_without_secrets = [
driver_bdm for driver_bdm in encrypted_bdms
if not driver_bdm.get('encryption_secret_uuid')]
bdms_with_secrets = [
driver_bdm for driver_bdm in encrypted_bdms
if driver_bdm.get('encryption_secret_uuid')]
if bdms_without_secrets and bdms_with_secrets:
msg = (
f'Found a mix of encrypted BDMs with and without existing '
f'encryption secrets: {encrypted_bdms}')
raise exception.InvalidBDM(msg)
try:
orig_encrypted_bdms = []
created_keymgr_secrets = []
created_libvirt_secrets = []
for driver_bdm in encrypted_bdms:
orig_encrypted_bdms.append(deepcopy(driver_bdm))
# NOTE(lyarwood): Users can request that their ephemeral
# storage be encrypted without providing an encryption format
# to use. If one isn't provided use the host default here and
# record it in the driver BDM.
if driver_bdm.get('encryption_format') is None:
driver_bdm['encryption_format'] = (
CONF.ephemeral_storage_encryption.default_format)
secret_uuid, secret, created = (
self._get_or_create_encryption_secret(
context, instance, driver_bdm))
if created:
created_keymgr_secrets.append(secret_uuid)
# Ensure this is all saved back down in the database via the
# o.vo BlockDeviceMapping object
driver_bdm.save()
# Stash the passphrase itself in a libvirt secret using the
# same UUID as the key manager secret for easy retrieval later
secret_usage = f"{instance.uuid}_{driver_bdm['uuid']}"
# Be extra defensive here and delete any existing libvirt
# secret to ensure we are creating the secret we retrieved or
# created in the key manager just now.
if self._host.find_secret('volume', secret_usage):
self._host.delete_secret('volume', secret_usage)
self._host.create_secret(
'volume', secret_usage, password=secret, uuid=secret_uuid)
created_libvirt_secrets.append(secret_usage)
except Exception:
for secret_uuid in created_keymgr_secrets:
try:
crypto.delete_encryption_secret(
context, instance.uuid, secret_uuid)
except Exception:
LOG.exception(
f'Failed to delete encryption secret '
f'{secret_uuid} in the key manager', instance=instance)
for i, orig_driver_bdm in enumerate(orig_encrypted_bdms):
driver_bdm = encrypted_bdms[i]
for key in ('encryption_format', 'encryption_secret_uuid'):
driver_bdm[key] = orig_driver_bdm[key]
driver_bdm.save()
for secret_usage in created_libvirt_secrets:
try:
if self._host.find_secret('volume', secret_usage):
self._host.delete_secret('volume', secret_usage)
except Exception:
LOG.exception(
f'Failed to delete libvirt secret {secret_usage}',
instance=instance)
raise
return block_device_info
def spawn(self, context, instance, image_meta, injected_files,
admin_password, allocations, network_info=None,
block_device_info=None, power_on=True, accel_info=None):
# NOTE(lyarwood): Before we generate disk_info we need to ensure the
# driver_bdms are populated with any missing encryption attributes such
# as the format to use, associated options and encryption secret uuid.
# This avoids having to pass block_device_info and the driver bdms down
# into the imagebackend later when creating or building the config for
# the disks.
block_device_info = self._add_ephemeral_encryption_driver_bdm_attrs(
context, instance, block_device_info)
disk_info = blockinfo.get_disk_info(CONF.libvirt.virt_type,
instance,
image_meta,