libvirt: Add emulated TPM support to Nova

Previous patches added support for parsing the vTPM-related flavor extra
specs and image metadata properties, the necessary integrations with the
Castellan key manager API etc. This change adds the ability to enable
support in the libvirt driver and create guests with vTPM functionality
enabled. Cold migration and resize are not yet supported. These will be
addressed in follow-on changes.

Functional tests are included. These require expansion of the
fakelibvirt stubs to implement basic secret management

Part of blueprint add-emulated-virtual-tpm

[1] https://review.opendev.org/686804

Change-Id: I1ff51f608b85dbb621814e70079ecfdd3d1a1d22
Co-Authored-By: Eric Fried <openstack@fried.cc>
Co-Authored-By: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Chris Friesen 2019-02-27 22:56:49 -06:00 committed by Stephen Finucane
parent ecff78e05d
commit e0ca2652ed
8 changed files with 882 additions and 29 deletions

View File

@ -1427,6 +1427,15 @@ For example::
]
libvirt_vtpm_opts = [
cfg.BoolOpt('swtpm_enabled',
default=False,
help="""
Enable emulated TPM (Trusted Platform Module) in guests.
"""),
]
ALL_OPTS = list(itertools.chain(
libvirt_general_opts,
libvirt_imagebackend_opts,
@ -1446,6 +1455,7 @@ ALL_OPTS = list(itertools.chain(
libvirt_virtio_queue_sizes,
libvirt_volume_nvmeof_opts,
libvirt_pmem_opts,
libvirt_vtpm_opts,
))

View File

@ -98,9 +98,18 @@ class ServersTestBase(integrated_helpers._IntegratedTestBase):
return self.start_service('scheduler')
def _get_connection(self, host_info, pci_info=None,
def _get_connection(
self, host_info=None, pci_info=None,
libvirt_version=fakelibvirt.FAKE_LIBVIRT_VERSION,
mdev_info=None, hostname=None):
qemu_version=fakelibvirt.FAKE_QEMU_VERSION,
mdev_info=None, hostname=None,
):
if not host_info:
host_info = fakelibvirt.HostInfo(
cpu_nodes=2, cpu_sockets=1, cpu_cores=2, cpu_threads=2,
kB_mem=16 * 1024 ** 2, # 16 GB
)
# sanity check
self.assertGreater(16, host_info.cpus,
"Host.get_online_cpus is only accounting for 16 CPUs but you're "
@ -109,7 +118,7 @@ class ServersTestBase(integrated_helpers._IntegratedTestBase):
fake_connection = fakelibvirt.Connection(
'qemu:///system',
version=libvirt_version,
hv_version=fakelibvirt.FAKE_QEMU_VERSION,
hv_version=qemu_version,
host_info=host_info,
pci_info=pci_info,
mdev_info=mdev_info,

View File

@ -0,0 +1,377 @@
# Copyright (C) 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import shutil
from castellan.common.objects import passphrase
from castellan.key_manager import key_manager
import fixtures
from oslo_log import log as logging
from oslo_utils import uuidutils
from oslo_utils import versionutils
import nova.conf
from nova import context as nova_context
from nova import crypto
from nova import exception
from nova import objects
from nova.tests.functional.api import client
from nova.tests.functional.libvirt import base
from nova.virt.libvirt import driver
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
FAKE_LIBVIRT_VERSION = versionutils.convert_version_to_int(
driver.MIN_LIBVIRT_VTPM)
FAKE_QEMU_VERSION = versionutils.convert_version_to_int(driver.MIN_QEMU_VTPM)
class FakeKeyManager(key_manager.KeyManager):
"""A fake key manager.
This key manager implementation supports a minimum subset of methods
specified by the key manager interface that are required for vTPM. Side
effects (e.g., raising exceptions) for each method are handled as specified
by the key manager interface.
"""
def __init__(self, configuration):
super().__init__(configuration)
#: A mapping of UUIDs to passphrases.
self._passphrases = {}
def create_key(self, context, algorithm, length, **kwargs):
"""Creates a symmetric key.
This is not implemented as it's unnecessary here.
"""
raise NotImplementedError(
"FakeKeyManager does not support symmetric keys"
)
def create_key_pair(self, context, **kwargs):
"""Creates an asymmetric keypair.
This is not implemented as it's unnecessary here.
"""
raise NotImplementedError(
"FakeKeyManager does not support asymmetric keys"
)
def store(self, context, managed_object, **kwargs):
"""Stores (i.e., registers) a passphrase with the key manager."""
if context is None:
raise exception.Forbidden()
if not isinstance(managed_object, passphrase.Passphrase):
raise exception.KeyManagerError(
reason='cannot store anything except passphrases')
uuid = uuidutils.generate_uuid()
managed_object._id = uuid # set the id to simulate persistence
self._passphrases[uuid] = managed_object
return uuid
def get(self, context, managed_object_id):
"""Retrieves the key identified by the specified id.
This implementation returns the key that is associated with the
specified UUID. A Forbidden exception is raised if the specified
context is None; a KeyError is raised if the UUID is invalid.
"""
if context is None:
raise exception.Forbidden()
if managed_object_id not in self._passphrases:
raise KeyError('cannot retrieve non-existent secret')
return self._passphrases[managed_object_id]
def delete(self, context, managed_object_id):
"""Represents deleting the key.
Simply delete the key from our list of keys.
"""
if context is None:
raise exception.Forbidden()
if managed_object_id not in self._passphrases:
raise exception.KeyManagerError(
reason="cannot delete non-existent secret")
del self._passphrases[managed_object_id]
class VTPMServersTest(base.ServersTestBase):
def setUp(self):
# enable vTPM and use our own fake key service
self.flags(swtpm_enabled=True, group='libvirt')
self.flags(
backend='nova.tests.functional.libvirt.test_vtpm.FakeKeyManager',
group='key_manager')
super().setUp()
original_which = shutil.which
def which(cmd, *args, **kwargs):
if cmd == 'swtpm':
return True
return original_which(cmd, *args, **kwargs)
self.useFixture(fixtures.MonkeyPatch('shutil.which', which))
self.key_mgr = crypto._get_key_manager()
# TODO(stephenfin): This should be moved to the base class
def start_compute(self, hostname='compute1'):
libvirt_version = versionutils.convert_version_to_int(
driver.MIN_LIBVIRT_VTPM)
qemu_version = versionutils.convert_version_to_int(
driver.MIN_QEMU_VTPM)
fake_connection = self._get_connection(
libvirt_version=libvirt_version, qemu_version=qemu_version,
hostname=hostname)
# This is fun. Firstly we need to do a global'ish mock so we can
# actually start the service.
with mock.patch(
'nova.virt.libvirt.host.Host.get_connection',
return_value=fake_connection,
):
compute = self.start_service('compute', host=hostname)
# Once that's done, we need to tweak the compute "service" to
# make sure it returns unique objects. We do this inside the
# mock context to avoid a small window between the end of the
# context and the tweaking where get_connection would revert to
# being an autospec mock.
compute.driver._host.get_connection = lambda: fake_connection
return compute
def _create_server_with_vtpm(self):
extra_specs = {'hw:tpm_model': 'tpm-tis', 'hw:tpm_version': '1.2'}
flavor_id = self._create_flavor(extra_spec=extra_specs)
server = self._create_server(flavor_id=flavor_id)
return server
def _create_server_without_vtpm(self):
# use the default flavor (i.e. one without vTPM extra specs)
return self._create_server()
def assertInstanceHasSecret(self, server):
ctx = nova_context.get_admin_context()
instance = objects.Instance.get_by_uuid(ctx, server['id'])
self.assertIn('vtpm_secret_uuid', instance.system_metadata)
self.assertEqual(1, len(self.key_mgr._passphrases))
self.assertIn(
instance.system_metadata['vtpm_secret_uuid'],
self.key_mgr._passphrases)
def assertInstanceHasNoSecret(self, server):
ctx = nova_context.get_admin_context()
instance = objects.Instance.get_by_uuid(ctx, server['id'])
self.assertNotIn('vtpm_secret_uuid', instance.system_metadata)
self.assertEqual(0, len(self.key_mgr._passphrases))
def test_create_server(self):
self.compute = self.start_compute()
# ensure we are reporting the correct traits
root_rp_uuid = self._get_provider_uuid_by_name(self.compute.host)
traits = self._get_provider_traits(root_rp_uuid)
for trait in ('COMPUTE_SECURITY_TPM_1_2', 'COMPUTE_SECURITY_TPM_2_0'):
self.assertIn(trait, traits)
# create a server with vTPM
server = self._create_server_with_vtpm()
# ensure our instance's system_metadata field and key manager inventory
# is correct
self.assertInstanceHasSecret(server)
# now delete the server
self._delete_server(server)
# ensure we deleted the key now that we no longer need it
self.assertEqual(0, len(self.key_mgr._passphrases))
def test_suspend_resume_server(self):
self.compute = self.start_compute()
# create a server with vTPM
server = self._create_server_with_vtpm()
self.addCleanup(self._delete_server, server)
# suspend the server
server = self._suspend_server(server)
# ensure our instance's system_metadata field and key manager inventory
# is correct
self.assertInstanceHasSecret(server)
# resume the server
server = self._resume_server(server)
# ensure our instance's system_metadata field and key manager inventory
# is still correct
self.assertInstanceHasSecret(server)
def test_soft_reboot_server(self):
self.compute = self.start_compute()
# create a server with vTPM
server = self._create_server_with_vtpm()
# soft reboot the server
server = self._reboot_server(server, hard=False)
self.addCleanup(self._delete_server, server)
# ensure our instance's system_metadata field and key manager inventory
# is still correct
self.assertInstanceHasSecret(server)
def test_hard_reboot_server(self):
self.compute = self.start_compute()
# create a server with vTPM
server = self._create_server_with_vtpm()
self.addCleanup(self._delete_server, server)
# hard reboot the server
server = self._reboot_server(server, hard=True)
# ensure our instance's system_metadata field and key manager inventory
# is still correct
self.assertInstanceHasSecret(server)
def test_resize_server__no_vtpm_to_vtpm(self):
self.computes = {}
for host in ('test_compute0', 'test_compute1'):
self.computes[host] = self.start_compute(host)
# create a server without vTPM
server = self._create_server_without_vtpm()
self.addCleanup(self._delete_server, server)
# ensure our instance's system_metadata field and key manager inventory
# is correct
self.assertInstanceHasNoSecret(server)
# create a flavor with vTPM
extra_specs = {'hw:tpm_model': 'tpm-tis', 'hw:tpm_version': '1.2'}
flavor_id = self._create_flavor(extra_spec=extra_specs)
# TODO(stephenfin): The mock of 'migrate_disk_and_power_off' should
# probably be less...dumb
with mock.patch(
'nova.virt.libvirt.driver.LibvirtDriver'
'.migrate_disk_and_power_off', return_value='{}',
):
# resize the server to a new flavor *with* vTPM
self.assertRaises(
client.OpenStackApiException,
self._resize_server, server, flavor_id=flavor_id)
def test_resize_server__vtpm_to_no_vtpm(self):
self.computes = {}
for host in ('test_compute0', 'test_compute1'):
self.computes[host] = self.start_compute(host)
# create a server with vTPM
server = self._create_server_with_vtpm()
self.addCleanup(self._delete_server, server)
# ensure our instance's system_metadata field is correct
self.assertInstanceHasSecret(server)
# create a flavor without vTPM
flavor_id = self._create_flavor()
# TODO(stephenfin): The mock of 'migrate_disk_and_power_off' should
# probably be less...dumb
with mock.patch(
'nova.virt.libvirt.driver.LibvirtDriver'
'.migrate_disk_and_power_off', return_value='{}',
):
# resize the server to a new flavor *without* vTPM
# TODO(stephenfin): Add support for this operation
self.assertRaises(
client.OpenStackApiException,
self._resize_server, server, flavor_id=flavor_id)
def test_migrate_server(self):
self.computes = {}
for host in ('test_compute0', 'test_compute1'):
self.computes[host] = self.start_compute(host)
# create a server with vTPM
server = self._create_server_with_vtpm()
self.addCleanup(self._delete_server, server)
# ensure our instance's system_metadata field is correct
self.assertInstanceHasSecret(server)
# TODO(stephenfin): The mock of 'migrate_disk_and_power_off' should
# probably be less...dumb
with mock.patch(
'nova.virt.libvirt.driver.LibvirtDriver'
'.migrate_disk_and_power_off', return_value='{}',
):
# cold migrate the server
# TODO(stephenfin): Add support for this operation
self.assertRaises(
client.OpenStackApiException,
self._migrate_server, server)
def test_live_migrate_server(self):
self.computes = {}
for host in ('test_compute0', 'test_compute1'):
self.computes[host] = self.start_compute(host)
# create a server with vTPM
server = self._create_server_with_vtpm()
self.addCleanup(self._delete_server, server)
# ensure our instance's system_metadata field is correct
self.assertInstanceHasSecret(server)
# live migrate the server
self.assertRaises(
client.OpenStackApiException,
self._live_migrate_server, server)
def test_shelve_server(self):
self.computes = {}
for host in ('test_compute0', 'test_compute1'):
self.computes[host] = self.start_compute(host)
# create a server with vTPM
server = self._create_server_with_vtpm()
self.addCleanup(self._delete_server, server)
# ensure our instance's system_metadata field is correct
self.assertInstanceHasSecret(server)
# attempt to shelve the server
self.assertRaises(
client.OpenStackApiException,
self._shelve_server, server)

View File

@ -127,10 +127,12 @@ VIR_DOMAIN_START_PAUSED = 1
# hardcoding the numerical values)
VIR_FROM_QEMU = 100
VIR_FROM_DOMAIN = 200
VIR_FROM_SECRET = 300
VIR_FROM_NWFILTER = 330
VIR_FROM_REMOTE = 340
VIR_FROM_RPC = 345
VIR_FROM_NODEDEV = 666
VIR_ERR_INVALID_ARG = 8
VIR_ERR_NO_SUPPORT = 3
VIR_ERR_XML_ERROR = 27
@ -144,6 +146,7 @@ VIR_ERR_SYSTEM_ERROR = 900
VIR_ERR_INTERNAL_ERROR = 950
VIR_ERR_CONFIG_UNSUPPORTED = 951
VIR_ERR_NO_NODE_DEVICE = 667
VIR_ERR_INVALID_SECRET = 65
VIR_ERR_NO_SECRET = 66
VIR_ERR_AGENT_UNRESPONSIVE = 86
VIR_ERR_ARGUMENT_UNSUPPORTED = 74
@ -180,7 +183,7 @@ VIR_SECRET_USAGE_TYPE_ISCSI = 3
# Libvirt version to match MIN_LIBVIRT_VERSION in driver.py
FAKE_LIBVIRT_VERSION = 4000000
# Libvirt version to match MIN_QEMU_VERSION in driver.py
# QEMU version to match MIN_QEMU_VERSION in driver.py
FAKE_QEMU_VERSION = 2011000
PCI_VEND_ID = '8086'
@ -1350,11 +1353,42 @@ class DomainSnapshot(object):
class Secret(object):
"""A stub Secret class. Not currently returned by any test, but required to
exist for introspection.
"""
def __init__(self, connection, xml):
self._connection = connection
self._xml = xml
self._parse_xml(xml)
self._value = None
def _parse_xml(self, xml):
tree = etree.fromstring(xml)
self._uuid = tree.find('./uuid').text
self._private = tree.get('private') == 'yes'
def setValue(self, value, flags=0):
self._value = value
return 0
def value(self, flags=0):
if self._value is None:
raise make_libvirtError(
libvirtError,
"secret '%s' does not have a value" % self._uuid,
error_code=VIR_ERR_NO_SECRET,
error_domain=VIR_FROM_SECRET)
pass
if self._private:
raise make_libvirtError(
libvirtError,
'secret is private',
error_code=VIR_ERR_INVALID_SECRET,
error_domain=VIR_FROM_SECRET)
return self._value
def undefine(self):
self._connection._remove_secret(self)
class Connection(object):
def __init__(self, uri=None, readonly=False, version=FAKE_LIBVIRT_VERSION,
@ -1389,6 +1423,7 @@ class Connection(object):
self._id_counter = 1 # libvirt reserves 0 for the hypervisor.
self._nwfilters = {}
self._nodedevs = {}
self._secrets = {}
self._event_callbacks = {}
self.fakeLibVersion = version
self.fakeVersion = hv_version
@ -1411,6 +1446,12 @@ class Connection(object):
def _remove_nodedev(self, nodedev):
del self._nodedevs[nodedev._name]
def _add_secret(self, secret):
self._secrets[secret._uuid] = secret
def _remove_secret(self, secret):
del self._secrets[secret._uuid]
def _mark_running(self, dom):
self._running_vms[self._id_counter] = dom
self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STARTED, 0)
@ -1730,7 +1771,9 @@ class Connection(object):
pass
def secretDefineXML(self, xml):
pass
secret = Secret(self, xml)
self._add_secret(secret)
return secret
def listAllDevices(self, flags):
# Note this is incomplete as we do not filter

View File

@ -1088,6 +1088,15 @@ class LibvirtConnTestCase(test.NoDBTestCase,
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
self.assertTrue(drvr.capabilities['supports_image_type_ploop'])
def test_driver_capabilities_vtpm(self):
self.flags(swtpm_enabled=True, group='libvirt')
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
self.assertTrue(
drvr.capabilities['supports_vtpm'],
"Driver capabilities for 'supports_vtpm' is invalid when "
"'swtpm_enabled=True'"
)
def test_driver_raises_on_non_linux_platform(self):
with utils.temporary_mutation(sys, platform='darwin'):
self.assertRaises(
@ -1172,6 +1181,8 @@ class LibvirtConnTestCase(test.NoDBTestCase,
'COMPUTE_STORAGE_BUS_VIRTIO': True,
'COMPUTE_GRAPHICS_MODEL_VGA': True,
'COMPUTE_NET_VIF_MODEL_VIRTIO': True,
'COMPUTE_SECURITY_TPM_1_2': False,
'COMPUTE_SECURITY_TPM_2_0': False,
}
static_traits = drvr.static_traits
@ -1213,7 +1224,11 @@ class LibvirtConnTestCase(test.NoDBTestCase,
mock_vif_traits.return_value = {'COMPUTE_NET_VIF_MODEL_VIRTIO': True}
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
expected = {'COMPUTE_NET_VIF_MODEL_VIRTIO': True}
expected = {
'COMPUTE_NET_VIF_MODEL_VIRTIO': True,
'COMPUTE_SECURITY_TPM_1_2': False,
'COMPUTE_SECURITY_TPM_2_0': False,
}
static_traits = drvr.static_traits
@ -1505,6 +1520,73 @@ class LibvirtConnTestCase(test.NoDBTestCase,
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
drvr.init_host("dummyhost")
def test__check_vtpm_support_non_qemu(self):
"""Test checking for vTPM support when we're not using QEMU or KVM."""
self.flags(swtpm_enabled=True, virt_type='lxc', group='libvirt')
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
exc = self.assertRaises(exception.InvalidConfiguration,
drvr.init_host, 'dummyhost')
self.assertIn("vTPM support requires '[libvirt] virt_type' of 'qemu' "
"or 'kvm'; found lxc.", six.text_type(exc))
@mock.patch.object(host.Host, 'has_min_version')
def test__check_vtpm_support_old_qemu(self, mock_version):
"""Test checking for vTPM support when our QEMU or libvirt version is
too old.
"""
self.flags(swtpm_enabled=True, virt_type='kvm', group='libvirt')
def fake_has_min_version(lv_ver=None, hv_ver=None, hv_type=None):
if lv_ver and hv_ver:
return lv_ver < (5, 6, 0) and hv_ver < (2, 11, 0)
return True
mock_version.side_effect = fake_has_min_version
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
exc = self.assertRaises(exception.InvalidConfiguration,
drvr.init_host, 'dummyhost')
self.assertIn("vTPM support requires QEMU version", six.text_type(exc))
@mock.patch.object(host.Host, 'has_min_version', return_value=True)
@mock.patch('shutil.which')
def test__check_vtpm_support_missing_exe(self, mock_which, mock_version):
"""Test checking for vTPM support when the swtpm binaries are
missing.
"""
self.flags(swtpm_enabled=True, virt_type='kvm', group='libvirt')
mock_which.return_value = False
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
exc = self.assertRaises(exception.InvalidConfiguration,
drvr.init_host, "dummyhost")
self.assertIn(
"vTPM support is configured but the 'swtpm' and 'swtpm_setup' "
"binaries could not be found on PATH.",
str(exc),
)
mock_which.assert_has_calls(
[mock.call('swtpm_setup'), mock.call('swtpm')],
)
@mock.patch.object(host.Host, 'has_min_version')
@mock.patch('shutil.which')
def test__check_vtpm_support(self, mock_which, mock_version):
"""Test checking for vTPM support when everything is configured
correctly.
"""
self.flags(swtpm_enabled=True, virt_type='kvm', group='libvirt')
mock_version.return_value = True
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
drvr.init_host('dummyhost')
mock_which.assert_has_calls(
[mock.call('swtpm_setup'), mock.call().__bool__()],
)
mock_version.assert_called_with(lv_ver=(5, 6, 0), hv_ver=(2, 11, 0))
@mock.patch.object(libvirt_driver.LOG, 'warning')
def test_check_cpu_set_configuration__no_configuration(self, mock_log):
"""Test that configuring no CPU option results no errors or logs.
@ -6544,6 +6626,49 @@ class LibvirtConnTestCase(test.NoDBTestCase,
self.assertEqual(cfg.devices[9].type, "unix")
self.assertEqual(cfg.devices[9].target_name, "org.qemu.guest_agent.0")
def test_get_guest_config_with_vtpm(self):
self.flags(virt_type='kvm', group='libvirt')
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
instance = objects.Instance(**self.test_instance)
instance.system_metadata['vtpm_secret_uuid'] = uuids.vtpm
image_meta = objects.ImageMeta.from_dict({
'disk_format': 'raw',
'properties': {
'hw_tpm_version': '2.0',
'hw_tpm_model': 'tpm-crb',
},
})
disk_info = blockinfo.get_disk_info(
CONF.libvirt.virt_type, instance, image_meta)
cfg = drvr._get_guest_config(instance, [], image_meta, disk_info)
self.assertEqual(len(cfg.devices), 9)
self.assertIsInstance(
cfg.devices[0], vconfig.LibvirtConfigGuestDisk)
self.assertIsInstance(
cfg.devices[1], vconfig.LibvirtConfigGuestDisk)
self.assertIsInstance(
cfg.devices[2], vconfig.LibvirtConfigGuestSerial)
self.assertIsInstance(
cfg.devices[3], vconfig.LibvirtConfigGuestInput)
self.assertIsInstance(
cfg.devices[4], vconfig.LibvirtConfigGuestGraphics)
self.assertIsInstance(
cfg.devices[5], vconfig.LibvirtConfigGuestVideo)
self.assertIsInstance(
cfg.devices[6], vconfig.LibvirtConfigGuestRng)
self.assertIsInstance(
cfg.devices[7], vconfig.LibvirtConfigGuestVTPM)
self.assertIsInstance(
cfg.devices[8], vconfig.LibvirtConfigMemoryBalloon)
self.assertEqual(cfg.devices[3].type, 'tablet')
self.assertEqual(cfg.devices[4].type, 'vnc')
self.assertEqual(cfg.devices[7].version, '2.0')
self.assertEqual(cfg.devices[7].model, 'tpm-crb')
self.assertEqual(cfg.devices[7].secret_uuid, uuids.vtpm)
def test_get_guest_config_with_video_driver_vram(self):
self.flags(enabled=False, group='vnc')
self.flags(virt_type='kvm', group='libvirt')
@ -14863,7 +14988,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
# We should have created the root and ephemeral disks
self.assertEqual(['disk', 'disk.local'], disks_created)
def test_start_lxc_from_volume(self):
def test_spawn_lxc_from_volume(self):
self.flags(virt_type="lxc",
group='libvirt')
@ -14997,6 +15122,39 @@ class LibvirtConnTestCase(test.NoDBTestCase,
return_value=mock_connection):
drvr.spawn(self.context, instance, image_meta, [], None, {})
@mock.patch('nova.crypto.ensure_vtpm_secret')
@mock.patch.object(hardware, 'get_vtpm_constraint')
@mock.patch(
'nova.virt.libvirt.driver.LibvirtDriver._create_guest_with_network')
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver.get_info')
@mock.patch(
'nova.virt.libvirt.driver.LibvirtDriver._get_guest_xml',
new=mock.Mock())
def test_spawn_with_vtpm(
self, mock_get_info, mock_create_guest, mock_get_vtpm,
mock_ensure_vtpm,
):
"""Ensure spawning with vTPM requested results in pre-config of
instance.
"""
self.flags(swtpm_enabled=True, group='libvirt')
self.useFixture(fake_imagebackend.ImageBackendFixture())
mock_get_info.return_value = hardware.InstanceInfo(
state=power_state.RUNNING)
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
instance_ref = self.test_instance
instance_ref['image_ref'] = 'my_fake_image'
instance = objects.Instance(**instance_ref)
instance.system_metadata = {}
image_meta = objects.ImageMeta.from_dict(self.test_image_meta)
drvr.spawn(self.context, instance, image_meta, [], None, {})
mock_get_vtpm.assert_called_once_with(instance.flavor, image_meta)
mock_ensure_vtpm.assert_called_once_with(self.context, instance)
def _test_create_image_plain(self, os_type='', filename='', mkfs=False):
gotFiles = []
@ -18613,9 +18771,10 @@ class LibvirtConnTestCase(test.NoDBTestCase,
dom_mock.ID.assert_called_once_with()
mock_get_domain.assert_called_once_with(instance)
@mock.patch.object(hardware, 'get_vtpm_constraint')
@mock.patch.object(libvirt_guest.Guest, 'create')
def test_create_guest__with_callback(
self, mock_guest_create,
self, mock_guest_create, mock_get_vtpm,
):
"""Check that callback function is called if provided."""
instance = objects.Instance(**self.test_instance)
@ -18623,31 +18782,136 @@ class LibvirtConnTestCase(test.NoDBTestCase,
callback = mock.Mock()
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
drvr.supports_vtpm = False
drvr._create_guest(
self.context, xml, instance, post_xml_callback=callback)
mock_get_vtpm.assert_not_called()
mock_guest_create.assert_called_once_with(xml, drvr._host)
mock_guest_create.return_value.launch.assert_called_once_with(
pause=False)
callback.assert_called_once()
@mock.patch.object(hardware, 'get_vtpm_constraint')
@mock.patch.object(libvirt_guest.Guest, 'create')
def test_create_guest__no_launch(self, mock_guest_create):
def test_create_guest__no_launch(self, mock_guest_create, mock_get_vtpm):
"""Check that guest is not started unless requested."""
instance = objects.Instance(**self.test_instance)
xml = '<xml>'
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
drvr.supports_vtpm = False
drvr._create_guest(
self.context, xml, instance, power_on=False, pause=False)
mock_get_vtpm.assert_not_called()
mock_guest_create.assert_called_once_with(xml, drvr._host)
mock_guest_create.return_value.launch.assert_not_called()
@mock.patch('nova.crypto.ensure_vtpm_secret')
@mock.patch(
'nova.objects.Instance.image_meta',
new_callable=mock.PropertyMock)
@mock.patch.object(hardware, 'get_vtpm_constraint')
@mock.patch.object(libvirt_guest.Guest, 'create')
def test_create_guest__with_vtpm_support_but_no_request(
self, mock_guest_create, mock_get_vtpm, mock_image_meta, mock_secret,
):
"""Check that vTPM is not created unless requested by the guest."""
self.flags(swtpm_enabled=True, group='libvirt')
instance = objects.Instance(**self.test_instance)
instance = objects.Instance(**self.test_instance)
image_meta = objects.ImageMeta.from_dict({})
xml = '<xml>'
mock_image_meta.return_value = image_meta
mock_get_vtpm.return_value = None
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
drvr._create_guest(
self.context, xml, instance)
# we should have queried the instance's flavor, image for vTPM stuff...
mock_get_vtpm.assert_called_once_with(instance.flavor, image_meta)
mock_guest_create.assert_called_once_with(xml, drvr._host)
mock_guest_create.return_value.launch.assert_called_once()
# ...but we should not have created the secret because it wasn't needed
mock_secret.assert_not_called()
@mock.patch('nova.virt.libvirt.host.Host')
@mock.patch('nova.crypto.ensure_vtpm_secret')
@mock.patch(
'nova.objects.Instance.image_meta',
new_callable=mock.PropertyMock)
@mock.patch.object(hardware, 'get_vtpm_constraint')
@mock.patch.object(libvirt_guest.Guest, 'create')
def test_create_guest__with_vtpm(
self, mock_guest_create, mock_get_vtpm, mock_image_meta, mock_secret,
mock_host,
):
"""Check that vTPM secret is created and cleaned up again after."""
self.flags(swtpm_enabled=True, group='libvirt')
instance = objects.Instance(**self.test_instance)
image_meta = objects.ImageMeta.from_dict({})
xml = '<xml>'
mock_image_meta.return_value = image_meta
mock_secret.return_value = (uuids.fake_secret, 'passphrase')
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
drvr._create_guest(self.context, xml, instance)
# we should have queried the instance's flavor, image for vTPM stuff
mock_get_vtpm.assert_called_once_with(instance.flavor, image_meta)
mock_guest_create.assert_called_once_with(xml, drvr._host)
mock_guest_create.return_value.launch.assert_called_once()
# we should also have created the secret...
drvr._host.create_secret.assert_called_once_with(
'vtpm', instance.uuid, password='passphrase',
uuid=uuids.fake_secret)
# ...and undefined it after
drvr._host.create_secret.return_value.undefine.assert_called_once()
@mock.patch('nova.virt.libvirt.host.Host')
@mock.patch('nova.crypto.ensure_vtpm_secret')
@mock.patch(
'nova.objects.Instance.image_meta',
new_callable=mock.PropertyMock)
@mock.patch.object(hardware, 'get_vtpm_constraint')
@mock.patch.object(libvirt_guest.Guest, 'create')
def test_create_guest__with_vtpm_error(
self, mock_guest_create, mock_get_vtpm, mock_image_meta, mock_secret,
mock_host,
):
"""Check that vTPM secret is always cleaned up even if there's an
error.
"""
self.flags(swtpm_enabled=True, group='libvirt')
instance = objects.Instance(**self.test_instance)
image_meta = objects.ImageMeta.from_dict({})
xml = '<xml>'
mock_guest_create.side_effect = ValueError('foo')
mock_image_meta.return_value = image_meta
mock_secret.return_value = (uuids.fake_secret, 'passphrase')
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
self.assertRaises(
ValueError, drvr._create_guest, self.context, xml, instance)
# we should have queried the instance's flavor, image for vTPM stuff
mock_get_vtpm.assert_called_once_with(instance.flavor, image_meta)
mock_guest_create.assert_called_once_with(xml, drvr._host)
# we should also have created the secret...
drvr._host.create_secret.assert_called_once_with(
'vtpm', instance.uuid, password='passphrase',
uuid=uuids.fake_secret)
# ...and undefined it after, despite the error
drvr._host.create_secret.return_value.undefine.assert_called_once()
@mock.patch('nova.virt.disk.api.clean_lxc_namespace')
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver.get_info')
@mock.patch('nova.virt.disk.api.setup_container')
@ -19567,6 +19831,32 @@ class LibvirtConnTestCase(test.NoDBTestCase,
drvr.cleanup, 'ctxt', fake_inst, 'netinfo')
unplug.assert_called_once_with(fake_inst, 'netinfo', True)
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver._undefine_domain')
@mock.patch('nova.crypto.delete_vtpm_secret')
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver.delete_instance_files')
@mock.patch('nova.virt.driver.block_device_info_get_mapping')
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver._unplug_vifs')
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver._get_vpmems',
new=mock.Mock(return_value=None))
def test_cleanup_pass(
self, mock_unplug, mock_get_mapping, mock_delete_files,
mock_delete_vtpm, mock_undefine,
):
"""Test with default parameters."""
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI())
fake_inst = objects.Instance(**self.test_instance)
mock_get_mapping.return_value = []
mock_delete_files.return_value = True
with mock.patch.object(fake_inst, 'save'):
drvr.cleanup('ctxt', fake_inst, 'netinfo')
mock_unplug.assert_called_once_with(fake_inst, 'netinfo', True)
mock_get_mapping.assert_called_once_with(None)
mock_delete_files.assert_called_once_with(fake_inst)
mock_delete_vtpm.assert_called_once_with('ctxt', fake_inst)
mock_undefine.assert_called_once_with(fake_inst)
@mock.patch.object(libvirt_driver.LibvirtDriver, 'delete_instance_files',
return_value=True)
@mock.patch.object(objects.Instance, 'save')
@ -21117,6 +21407,12 @@ class TestUpdateProviderTree(test.NoDBTestCase):
for trait in ['HW_CPU_X86_AVX512F', 'HW_CPU_X86_BMI']:
self.assertIn(trait, self.pt.data(self.cn_rp['uuid']).traits)
def test_update_provider_tree_with_tpm_traits(self):
self.flags(swtpm_enabled=True, group='libvirt')
self._test_update_provider_tree()
for trait in ('COMPUTE_SECURITY_TPM_2_0', 'COMPUTE_SECURITY_TPM_1_2'):
self.assertIn(trait, self.pt.data(self.cn_rp['uuid']).traits)
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver.'
'_get_mediated_device_information')
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver.'

View File

@ -231,7 +231,6 @@ MIN_QEMU_VERSION = (2, 11, 0)
NEXT_MIN_LIBVIRT_VERSION = (5, 0, 0)
NEXT_MIN_QEMU_VERSION = (4, 0, 0)
# Virtuozzo driver support
MIN_VIRTUOZZO_VERSION = (7, 0, 0)
@ -280,6 +279,10 @@ MIN_QEMU_BLOCKDEV = (4, 2, 0)
MIN_LIBVIRT_VIR_ERR_DEVICE_MISSING = (4, 1, 0)
# Virtual TPM (vTPM) support
MIN_LIBVIRT_VTPM = (5, 6, 0)
MIN_QEMU_VTPM = (2, 11, 0)
class LibvirtDriver(driver.ComputeDriver):
def __init__(self, virtapi, read_only=False):
@ -326,7 +329,7 @@ class LibvirtDriver(driver.ComputeDriver):
"supports_pcpus": True,
"supports_accelerators": True,
"supports_bfv_rescue": True,
"supports_vtpm": False,
"supports_vtpm": CONF.libvirt.swtpm_enabled,
}
super(LibvirtDriver, self).__init__(virtapi)
@ -723,6 +726,8 @@ class LibvirtDriver(driver.ComputeDriver):
self._check_cpu_compatibility()
self._check_vtpm_support()
def _check_cpu_compatibility(self):
mode = CONF.libvirt.cpu_mode
models = CONF.libvirt.cpu_models
@ -772,6 +777,43 @@ class LibvirtDriver(driver.ComputeDriver):
'flag': flag, 'e': e})
raise exception.InvalidCPUInfo(msg)
def _check_vtpm_support(self) -> None:
# TODO(efried): A key manager must be configured to create/retrieve
# secrets. Is there a way to check that one is set up correctly?
# CONF.key_manager.backend is optional :(
if not CONF.libvirt.swtpm_enabled:
return
if CONF.libvirt.virt_type not in ('qemu', 'kvm'):
msg = _(
"vTPM support requires '[libvirt] virt_type' of 'qemu' or "
"'kvm'; found %s.")
raise exception.InvalidConfiguration(msg % CONF.libvirt.virt_type)
if not self._host.has_min_version(
lv_ver=MIN_LIBVIRT_VTPM, hv_ver=MIN_QEMU_VTPM,
):
msg = _(
'vTPM support requires QEMU version %(qemu)s or greater and '
'Libvirt version %(libvirt)s or greater.')
raise exception.InvalidConfiguration(msg % {
'qemu': libvirt_utils.version_to_string(MIN_QEMU_VTPM),
'libvirt': libvirt_utils.version_to_string(MIN_LIBVIRT_VTPM),
})
# These executables need to be installed for libvirt to make use of
# emulated TPM.
# NOTE(stephenfin): This checks using the PATH of the user running
# nova-compute rather than the libvirtd service, meaning it's an
# imperfect check but the best we can do
if not any(shutil.which(cmd) for cmd in ('swtpm_setup', 'swtpm')):
msg = _(
"vTPM support is configured but the 'swtpm' and "
"'swtpm_setup' binaries could not be found on PATH.")
raise exception.InvalidConfiguration(msg)
LOG.debug('Enabling emulated TPM support')
@staticmethod
def _is_existing_mdev(uuid):
# FIXME(sbauza): Some kernel can have a uevent race meaning that the
@ -1443,6 +1485,9 @@ class LibvirtDriver(driver.ComputeDriver):
instance.cleaned = True
instance.save()
if cleanup_instance_disks:
crypto.delete_vtpm_secret(context, instance)
self._undefine_domain(instance)
def _cleanup_vpmems(self, vpmems):
@ -3269,6 +3314,8 @@ class LibvirtDriver(driver.ComputeDriver):
# on which vif type we're using and we are working with a stale network
# info cache here, so won't rely on waiting for neutron plug events.
# vifs_already_plugged=True means "do not wait for neutron plug events"
# NOTE(efried): The instance should already have a vtpm_secret_uuid
# registered if appropriate.
self._create_guest_with_network(
context, xml, instance, network_info, block_device_info,
vifs_already_plugged=True)
@ -3418,6 +3465,8 @@ class LibvirtDriver(driver.ComputeDriver):
"""resume the specified instance."""
xml = self._get_existing_domain_xml(instance, network_info,
block_device_info)
# NOTE(efried): The instance should already have a vtpm_secret_uuid
# registered if appropriate.
guest = self._create_guest_with_network(
context, xml, instance, network_info, block_device_info,
vifs_already_plugged=True)
@ -3562,6 +3611,8 @@ class LibvirtDriver(driver.ComputeDriver):
self._create_image(context, instance, disk_info['mapping'],
injection_info=injection_info, suffix='.rescue',
disk_images=rescue_images)
# NOTE(efried): The instance should already have a vtpm_secret_uuid
# registered if appropriate.
xml = self._get_guest_xml(context, instance, network_info, disk_info,
image_meta, rescue=rescue_images,
mdevs=mdevs,
@ -3579,6 +3630,7 @@ class LibvirtDriver(driver.ComputeDriver):
"""Reboot the VM which is being rescued back into primary images."""
instance_dir = libvirt_utils.get_instance_path(instance)
unrescue_xml_path = os.path.join(instance_dir, 'unrescue.xml')
# The xml should already contain the secret_uuid if relevant.
xml = libvirt_utils.load_file(unrescue_xml_path)
self._destroy(instance)
@ -3625,6 +3677,18 @@ class LibvirtDriver(driver.ComputeDriver):
# Does the guest need to be assigned some vGPU mediated devices ?
mdevs = self._allocate_mdevs(allocations)
# If the guest needs a vTPM, _get_guest_xml needs its secret to exist
# and its uuid to be registered in the instance prior to _get_guest_xml
if CONF.libvirt.swtpm_enabled and hardware.get_vtpm_constraint(
instance.flavor, image_meta
):
if not instance.system_metadata.get('vtpm_secret_uuid'):
# Create the secret via the key manager service so that we have
# it to hand when generating the XML. This is slightly wasteful
# as we'll perform a redundant key manager API call later when
# we create the domain but the alternative is an ugly mess
crypto.ensure_vtpm_secret(context, instance)
xml = self._get_guest_xml(context, instance, network_info,
disk_info, image_meta,
block_device_info=block_device_info,
@ -5446,6 +5510,27 @@ class LibvirtDriver(driver.ComputeDriver):
virtio_controller.type = 'virtio-serial'
guest.add_device(virtio_controller)
def _add_vtpm_device(
self,
guest: libvirt_guest.Guest,
flavor: 'objects.Flavor',
instance: 'objects.Instance',
image_meta: 'objects.ImageMeta',
):
"""Add a vTPM device to the guest, if requested."""
# Enable virtual tpm support if required in the flavor or image.
vtpm_config = hardware.get_vtpm_constraint(flavor, image_meta)
if not vtpm_config:
return
vtpm_secret_uuid = instance.system_metadata.get('vtpm_secret_uuid')
if not vtpm_secret_uuid:
raise exception.Invalid(
'Refusing to create an emulated TPM with no secret!')
vtpm = vconfig.LibvirtConfigGuestVTPM(vtpm_config, vtpm_secret_uuid)
guest.add_device(vtpm)
def _set_qemu_guest_agent(self, guest, flavor, instance, image_meta):
# Enable qga only if the 'hw_qemu_guest_agent' is equal to yes
if image_meta.properties.get('hw_qemu_guest_agent', False):
@ -6045,10 +6130,11 @@ class LibvirtDriver(driver.ComputeDriver):
if caps.host.cpu.arch == fields.Architecture.AARCH64:
self._guest_add_usb_host_keyboard(guest)
# Qemu guest agent only support 'qemu' and 'kvm' hypervisor
# Some features are only supported 'qemu' and 'kvm' hypervisor
if virt_type in ('qemu', 'kvm'):
self._set_qemu_guest_agent(guest, flavor, instance, image_meta)
self._add_rng_device(guest, flavor, image_meta)
self._add_vtpm_device(guest, flavor, instance, image_meta)
if self._guest_needs_pcie(guest, caps):
self._guest_add_pcie_root_ports(guest)
@ -6493,6 +6579,18 @@ class LibvirtDriver(driver.ComputeDriver):
:returns guest.Guest: Created guest.
"""
libvirt_secret = None
# determine whether vTPM is in use and, if so, create the secret
if CONF.libvirt.swtpm_enabled and hardware.get_vtpm_constraint(
instance.flavor, instance.image_meta,
):
secret_uuid, passphrase = crypto.ensure_vtpm_secret(
context, instance)
libvirt_secret = self._host.create_secret(
'vtpm', instance.uuid, password=passphrase,
uuid=secret_uuid)
try:
guest = libvirt_guest.Guest.create(xml, self._host)
if post_xml_callback is not None:
post_xml_callback()
@ -6501,6 +6599,9 @@ class LibvirtDriver(driver.ComputeDriver):
guest.launch(pause=pause)
return guest
finally:
if libvirt_secret is not None:
libvirt_secret.undefine()
def _neutron_failed_callback(self, event_name, instance):
LOG.error('Neutron Reported failure on event '
@ -7795,6 +7896,7 @@ class LibvirtDriver(driver.ComputeDriver):
traits.update(self._get_storage_bus_traits())
traits.update(self._get_video_model_traits())
traits.update(self._get_vif_model_traits())
traits.update(self._get_tpm_traits())
_, invalid_traits = ot.check_traits(traits)
for invalid_trait in invalid_traits:
@ -10906,6 +11008,13 @@ class LibvirtDriver(driver.ComputeDriver):
nova.privsep.fs.FS_FORMAT_EXT4,
nova.privsep.fs.FS_FORMAT_XFS]
def _get_tpm_traits(self) -> ty.Dict[str, bool]:
# Assert or deassert TPM support traits
return {
ot.COMPUTE_SECURITY_TPM_2_0: CONF.libvirt.swtpm_enabled,
ot.COMPUTE_SECURITY_TPM_1_2: CONF.libvirt.swtpm_enabled,
}
def _get_vif_model_traits(self) -> ty.Dict[str, bool]:
"""Get vif model traits based on the currently enabled virt_type.

View File

@ -1048,8 +1048,8 @@ class Host(object):
def delete_secret(self, usage_type, usage_id):
"""Delete a secret.
usage_type: one of 'iscsi', 'ceph', 'rbd' or 'volume'
usage_id: name of resource in secret
:param usage_type: one of 'iscsi', 'ceph', 'rbd', 'volume' or 'vtpm'
:param usage_id: name of resource in secret
"""
secret = self.find_secret(usage_type, usage_id)
if secret is not None:

View File

@ -0,0 +1,9 @@
---
features:
- |
Nova now supports adding an emulated virtual `Trusted Platform Module`__ to
libvirt guests with a ``virt_type`` of ``kvm`` or ``qemu``. See the
documentation__ for details.
.. __: https://en.wikipedia.org/wiki/Trusted_Platform_Module
.. __: https://docs.openstack.org/nova/latest/admin/emulated-tpm.rst