testing: Add ephemeral encryption support to fixtures

This adds encryption related methods and attributes to test fixtures to
enable functional testing for ephemeral encryption.

Related to blueprint ephemeral-encryption-libvirt

Change-Id: If65ec55d311ecf7fb3fe745ebbf116a430f60681
This commit is contained in:
melanie witt 2024-02-28 23:39:25 +00:00
parent e91aaaf551
commit 3a1c65a632
4 changed files with 72 additions and 5 deletions

View File

@ -56,6 +56,7 @@ import testtools
from nova.api.openstack import wsgi_app from nova.api.openstack import wsgi_app
from nova.compute import rpcapi as compute_rpcapi from nova.compute import rpcapi as compute_rpcapi
from nova import context from nova import context
import nova.crypto
from nova.db.main import api as db_api from nova.db.main import api as db_api
from nova import exception from nova import exception
from nova import objects from nova import objects
@ -325,6 +326,9 @@ class TestCase(base.BaseTestCase):
# the currently running test to fail. # the currently running test to fail.
self.useFixture(nova_fixtures.GreenThreadPoolShutdownWait()) self.useFixture(nova_fixtures.GreenThreadPoolShutdownWait())
# Reset the global key manager
nova.crypto._KEYMGR = None
def _setup_cells(self): def _setup_cells(self):
"""Setup a normal cellsv2 environment. """Setup a normal cellsv2 environment.

View File

@ -890,6 +890,13 @@ def _parse_disk_info(element):
if not disk_info['source']: if not disk_info['source']:
disk_info['source'] = source.get('path') disk_info['source'] = source.get('path')
encryption = element.find('./source/encryption')
if encryption is not None and len(encryption):
disk_info['encryption_format'] = encryption.get('format')
secret = encryption.find('./secret')
if secret is not None:
disk_info['encryption_secret'] = secret.get('uuid')
target = element.find('./target') target = element.find('./target')
if target is not None: if target is not None:
disk_info['target_dev'] = target.get('dev') disk_info['target_dev'] = target.get('dev')
@ -1416,12 +1423,23 @@ class Domain(object):
else: else:
source_attr = 'dev' source_attr = 'dev'
disks += '''<disk type='%(type)s' device='%(device)s'> strformat = """
<disk type='%(type)s' device='%(device)s'>
<driver name='%(driver_name)s' type='%(driver_type)s'/> <driver name='%(driver_name)s' type='%(driver_type)s'/>
<source %(source_attr)s='%(source)s'/> <source %(source_attr)s='%(source)s'"""
if 'encryption_format' not in disk:
strformat += '/>'
else:
strformat += """>
<encryption format='%(encryption_format)s'>
<secret type='passphrase' uuid='%(encryption_secret)s'/>
</encryption>
</source>"""
strformat += """
<target dev='%(target_dev)s' bus='%(target_bus)s'/> <target dev='%(target_dev)s' bus='%(target_bus)s'/>
<address type='drive' controller='0' bus='0' unit='0'/> <address type='drive' controller='0' bus='0' unit='0'/>
</disk>''' % dict(source_attr=source_attr, **disk) </disk>"""
disks += strformat % dict(source_attr=source_attr, **disk)
nics = '' nics = ''
for func, nic in enumerate(self._def['devices']['nics']): for func, nic in enumerate(self._def['devices']['nics']):
if func > 7: if func > 7:
@ -1700,6 +1718,11 @@ class Secret(object):
tree = etree.fromstring(xml) tree = etree.fromstring(xml)
self._uuid = tree.find('./uuid').text self._uuid = tree.find('./uuid').text
self._private = tree.get('private') == 'yes' self._private = tree.get('private') == 'yes'
self._usage_id = None
usage = tree.find('./usage')
if usage is not None:
if usage.get('type') == 'volume':
self._usage_id = usage.find('volume').text
def setValue(self, value, flags=0): def setValue(self, value, flags=0):
self._value = value self._value = value
@ -1726,6 +1749,14 @@ class Secret(object):
def undefine(self): def undefine(self):
self._connection._remove_secret(self) self._connection._remove_secret(self)
def UUIDString(self):
if self._uuid is not None:
return self._uuid
def usageID(self):
if self._usage_id is not None:
return self._usage_id
class Connection(object): class Connection(object):
def __init__( def __init__(
@ -2128,8 +2159,15 @@ class Connection(object):
<feature policy='require' name='aes'/> <feature policy='require' name='aes'/>
</cpu>""" </cpu>"""
def listAllSecrets(self, flags):
return [secret for secret in self._secrets.values()]
def secretLookupByUsage(self, usage_type_obj, usage_id): def secretLookupByUsage(self, usage_type_obj, usage_id):
pass for secret in self._secrets.values():
# Ignore usage_type_obj because we don't have a way to map libvrt
# usage type constants to strings.
if secret._usage_id == usage_id:
return secret
def secretDefineXML(self, xml): def secretDefineXML(self, xml):
secret = Secret(self, xml) secret = Secret(self, xml)

View File

@ -20,12 +20,16 @@ from unittest import mock
import fixtures import fixtures
import nova.conf
from nova.virt.libvirt import config from nova.virt.libvirt import config
from nova.virt.libvirt import driver from nova.virt.libvirt import driver
from nova.virt.libvirt import imagebackend from nova.virt.libvirt import imagebackend
from nova.virt.libvirt import utils as libvirt_utils from nova.virt.libvirt import utils as libvirt_utils
CONF = nova.conf.CONF
class LibvirtImageBackendFixture(fixtures.Fixture): class LibvirtImageBackendFixture(fixtures.Fixture):
def __init__(self, got_files=None, imported_files=None, exists=None): def __init__(self, got_files=None, imported_files=None, exists=None):
@ -206,6 +210,9 @@ class LibvirtImageBackendFixture(fixtures.Fixture):
setattr( setattr(
image_init, 'is_file_in_instance_path', is_file_in_instance_path) image_init, 'is_file_in_instance_path', is_file_in_instance_path)
image_init.SUPPORTS_LUKS = (
backend_self.BACKEND[CONF.libvirt.images_type].SUPPORTS_LUKS)
return image_init return image_init
def _fake_cache(self, fetch_func, filename, size=None, *args, **kwargs): def _fake_cache(self, fetch_func, filename, size=None, *args, **kwargs):
@ -228,6 +235,8 @@ class LibvirtImageBackendFixture(fixtures.Fixture):
): ):
# For tests in test_virt_drivers which expect libvirt_info to be # For tests in test_virt_drivers which expect libvirt_info to be
# functional # functional
# This is where the guest disk XML is first generated and is what tests
# will see when LibvirtFixture Domain XML are read and written.
info = config.LibvirtConfigGuestDisk() info = config.LibvirtConfigGuestDisk()
info.source_type = 'file' info.source_type = 'file'
info.source_device = mock_disk.disk_info_mapping['type'] info.source_device = mock_disk.disk_info_mapping['type']
@ -238,4 +247,15 @@ class LibvirtImageBackendFixture(fixtures.Fixture):
info.source_path = mock_disk.path info.source_path = mock_disk.path
if boot_order: if boot_order:
info.boot_order = boot_order info.boot_order = boot_order
if mock_disk.disk_info_mapping.get('encrypted'):
info.ephemeral_encryption = (
config.LibvirtConfigGuestDiskEncryption())
info.ephemeral_encryption.secret = (
config.LibvirtConfigGuestDiskEncryptionSecret())
info.ephemeral_encryption.secret.type = 'passphrase'
info.ephemeral_encryption.secret.uuid = (
mock_disk.disk_info_mapping['encryption_secret_uuid'])
info.ephemeral_encryption.format = (
mock_disk.disk_info_mapping['encryption_format'])
return info return info

View File

@ -1318,10 +1318,15 @@ class _IntegratedTestBase(test.TestCase, PlacementInstanceHelperMixin):
#: do real authentication. #: do real authentication.
STUB_KEYSTONE = True STUB_KEYSTONE = True
#: Whether to treat RPC casts as calls. This shouldn't really default to
#: True but a significant number of existing tests may be relying on it.
CAST_AS_CALL = True
def setUp(self): def setUp(self):
super(_IntegratedTestBase, self).setUp() super(_IntegratedTestBase, self).setUp()
self.useFixture(nova_fixtures.CastAsCallFixture(self)) if self.CAST_AS_CALL:
self.useFixture(nova_fixtures.CastAsCallFixture(self))
self.placement = self.useFixture(func_fixtures.PlacementFixture()).api self.placement = self.useFixture(func_fixtures.PlacementFixture()).api
self.neutron = self.useFixture(nova_fixtures.NeutronFixture(self)) self.neutron = self.useFixture(nova_fixtures.NeutronFixture(self))