libvirt: Add parsing of firmware metadata files

Recent versions of libvirt provide the ability to auto-configure
firmware, using firmware metadata files provided by QEMU [1] to do so.
Unfortunately this feature is missing knobs to allow us to select e.g.
whether secure boot is enabled or what keys are enrolled [2]. Until
libvirt gains the ability to do this for us, we need to do it ourselves.

Add the ability to parse the firmware metadata files, providing a better
way to identify the location of firmware and NVRAM template files along
with the capabilities of same. This is pretty trivial thanks to the good
documentation that exists for same.

[1] https://github.com/qemu/qemu/blob/v5.2.0/docs/interop/firmware.json
[2] https://bugzilla.redhat.com/show_bug.cgi?id=1906500

Change-Id: If08cbe2d5f2b301ac92a183f15fb87a18836bff5
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane 2021-03-05 20:08:04 +00:00
parent 1862971faa
commit faad45b632
2 changed files with 272 additions and 0 deletions

View File

@ -20,6 +20,7 @@ import eventlet
from eventlet import greenthread
from eventlet import tpool
import mock
from oslo_serialization import jsonutils
from oslo_utils.fixture import uuidsentinel as uuids
from oslo_utils import uuidutils
import testtools
@ -1209,6 +1210,21 @@ cg /cgroup/memory cg opt1,opt2 0 0
def test_is_cpu_control_policy_capable_ioerror(self, mock_open):
self.assertFalse(self.host.is_cpu_control_policy_capable())
def test_get_canonical_machine_type(self):
# this test relies on configuration from the FakeLibvirtFixture
machine_type = self.host.get_canonical_machine_type('x86_64', 'pc')
self.assertEqual('pc-i440fx-2.11', machine_type)
machine_type = self.host.get_canonical_machine_type(
'x86_64', 'pc-i440fx-2.11')
self.assertEqual('pc-i440fx-2.11', machine_type)
self.assertRaises(
exception.InternalError,
self.host.get_canonical_machine_type,
'x86_64', 'pc-foo-1.2')
@mock.patch('nova.virt.libvirt.host.libvirt.Connection.getCapabilities')
def test_has_hyperthreading__true(self, mock_cap):
mock_cap.return_value = """
@ -1442,6 +1458,86 @@ cg /cgroup/memory cg opt1,opt2 0 0
"""
self.assertTrue(self.host.supports_secure_boot)
@mock.patch.object(host.Host, 'loaders', new_callable=mock.PropertyMock)
@mock.patch.object(host.Host, 'get_canonical_machine_type')
def test_get_loader(self, mock_get_mtype, mock_loaders):
loaders = [
{
'description': 'Sample descriptor',
'interface-types': ['uefi'],
'mapping': {
'device': 'flash',
'executable': {
'filename': '/usr/share/edk2/ovmf/OVMF_CODE.fd',
'format': 'raw',
},
'nvram-template': {
'filename': '/usr/share/edk2/ovmf/OVMF_VARS.fd',
'format': 'raw',
},
},
'targets': [
{
'architecture': 'x86_64',
'machines': ['pc-q35-*'], # exclude pc-i440fx-*
},
],
'features': ['acpi-s3', 'amd-sev', 'verbose-dynamic'],
'tags': [],
},
]
def fake_get_mtype(arch, machine):
return {
'x86_64': {
'pc': 'pc-i440fx-5.1',
'q35': 'pc-q35-5.1',
},
'aarch64': {
'virt': 'virt-5.1',
},
}[arch][machine]
mock_get_mtype.side_effect = fake_get_mtype
mock_loaders.return_value = loaders
# this should pass because we're not reporting the secure-boot feature
# which is what we don't want
loader = self.host.get_loader('x86_64', 'q35', has_secure_boot=False)
self.assertIsNotNone(loader)
# while it should fail here since we want it now
self.assertRaises(
exception.UEFINotSupported,
self.host.get_loader,
'x86_64', 'q35', has_secure_boot=True)
# it should also fail for an unsupported architecture
self.assertRaises(
exception.UEFINotSupported,
self.host.get_loader,
'aarch64', 'virt', has_secure_boot=False)
# or an unsupported machine type
self.assertRaises(
exception.UEFINotSupported,
self.host.get_loader,
'x86_64', 'pc', has_secure_boot=False)
# add the secure-boot feature flag
loaders[0]['features'].append('secure-boot')
# this should pass because we're reporting the secure-boot feature
# which is what we want
loader = self.host.get_loader('x86_64', 'q35', has_secure_boot=True)
self.assertIsNotNone(loader)
# while it should fail here since we don't want it now
self.assertRaises(
exception.UEFINotSupported,
self.host.get_loader,
'x86_64', 'q35', has_secure_boot=False)
vc = fakelibvirt.virConnect
@ -1566,3 +1662,51 @@ class LibvirtTpoolProxyTestCase(test.NoDBTestCase):
for domain in domains:
self.assertIsInstance(domain, tpool.Proxy)
self.assertIn(domain.UUIDString(), (uuids.vm1, uuids.vm2))
class LoadersTestCase(test.NoDBTestCase):
def test_loaders(self):
loader = {
'description': 'Sample descriptor',
'interface-types': ['uefi'],
'mapping': {
'device': 'flash',
'executable': {
'filename': '/usr/share/edk2/ovmf/OVMF_CODE.fd',
'format': 'raw',
},
'nvram-template': {
'filename': '/usr/share/edk2/ovmf/OVMF_VARS.fd',
'format': 'raw',
},
},
'targets': [
{
'architecture': 'x86_64',
'machines': ['pc-i440fx-*', 'pc-q35-*'],
},
],
'features': ['acpi-s3', 'amd-sev', 'verbose-dynamic'],
'tags': [],
}
m = mock.mock_open(read_data=jsonutils.dumps(loader).encode('utf-8'))
with test.nested(
mock.patch.object(
os.path, 'exists',
side_effect=lambda path: path == '/usr/share/qemu/firmware'),
mock.patch('glob.glob', return_value=['10_fake.json']),
mock.patch('builtins.open', m, create=True),
) as (mock_exists, mock_glob, mock_open):
loaders = host._get_loaders()
self.assertEqual(loaders, [loader])
mock_exists.assert_has_calls([
mock.call('/usr/share/qemu/firmware'),
mock.call('/etc/qemu/firmware'),
])
mock_glob.assert_called_once_with(
'/usr/share/qemu/firmware/*.json')
mock_open.assert_called_once_with('10_fake.json')

View File

@ -28,6 +28,8 @@ the other libvirt related classes
"""
from collections import defaultdict
import fnmatch
import glob
import inspect
import operator
import os
@ -41,6 +43,7 @@ from eventlet import greenthread
from eventlet import patcher
from eventlet import tpool
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import units
@ -81,6 +84,36 @@ HV_DRIVER_QEMU = "QEMU"
SEV_KERNEL_PARAM_FILE = '/sys/module/kvm_amd/parameters/sev'
# These are taken from the spec
# https://github.com/qemu/qemu/blob/v5.2.0/docs/interop/firmware.json
QEMU_FIRMWARE_DESCRIPTOR_PATHS = [
'/usr/share/qemu/firmware',
'/etc/qemu/firmware',
# we intentionally ignore '$XDG_CONFIG_HOME/qemu/firmware'
]
def _get_loaders():
if not any(
os.path.exists(path) for path in QEMU_FIRMWARE_DESCRIPTOR_PATHS
):
msg = _("Failed to locate firmware descriptor files")
raise exception.InternalError(msg)
_loaders = []
for path in QEMU_FIRMWARE_DESCRIPTOR_PATHS:
if not os.path.exists(path):
continue
for spec_path in sorted(glob.glob(f'{path}/*.json')):
with open(spec_path) as fh:
spec = jsonutils.load(fh)
_loaders.append(spec)
return _loaders
class Host(object):
@ -118,6 +151,8 @@ class Host(object):
self._libvirt_proxy_classes = self._get_libvirt_proxy_classes(libvirt)
self._libvirt_proxy = self._wrap_libvirt_proxy(libvirt)
self._loaders: ty.Optional[ty.List[dict]] = None
# A number of features are conditional on support in the hardware,
# kernel, QEMU, and/or libvirt. These are determined on demand and
# memoized by various properties below
@ -1247,6 +1282,34 @@ class Host(object):
except IOError:
return False
def get_canonical_machine_type(self, arch, machine) -> str:
"""Resolve a machine type to its canonical representation.
Libvirt supports machine type aliases. On an x86 host the 'pc' machine
type is an alias for e.g. 'pc-1440fx-5.1'. Resolve the provided machine
type to its canonical representation so that it can be used for other
operations.
:param arch: The guest arch.
:param machine: The guest machine type.
:returns: The canonical machine type.
:raises: exception.InternalError if the machine type cannot be resolved
to its canonical representation.
"""
for guest in self.get_capabilities().guests:
if guest.arch != arch:
continue
for domain in guest.domains:
if machine in guest.domains[domain].machines:
return machine
if machine in guest.domains[domain].aliases:
return guest.domains[domain].aliases[machine]['canonical']
msg = _('Invalid machine type: %s')
raise exception.InternalError(msg % machine)
@property
def has_hyperthreading(self) -> bool:
"""Determine if host CPU has SMT, a.k.a. HyperThreading.
@ -1376,3 +1439,68 @@ class Host(object):
LOG.debug("No AMD SEV support detected for any (arch, machine_type)")
return self._supports_amd_sev
@property
def loaders(self) -> ty.List[dict]:
"""Retrieve details of loader configuration for the host.
Inspect the firmware metadata files provided by QEMU [1] to retrieve
information about the firmware supported by this host. Note that most
distros only publish this information for UEFI loaders currently.
This should be removed when libvirt correctly supports switching
between loaders with or without secure boot enabled [2].
[1] https://github.com/qemu/qemu/blob/v5.2.0/docs/interop/firmware.json
[2] https://bugzilla.redhat.com/show_bug.cgi?id=1906500
:returns: An ordered list of loader configuration dictionaries.
"""
if self._loaders is not None:
return self._loaders
self._loaders = _get_loaders()
return self._loaders
def get_loader(
self,
arch: str,
machine: str,
has_secure_boot: bool,
) -> ty.Tuple[str, str]:
"""Get loader for the specified architecture and machine type.
:returns: A tuple of the bootloader executable path and the NVRAM
template path.
"""
machine = self.get_canonical_machine_type(arch, machine)
for loader in self.loaders:
for target in loader['targets']:
if arch != target['architecture']:
continue
for machine_glob in target['machines']:
# the 'machines' attribute supports glob patterns (e.g.
# 'pc-q35-*') so we need to resolve these
if fnmatch.fnmatch(machine, machine_glob):
break
else:
continue
# if we've got this far, we have a match on the target
break
else:
continue
# if we request secure boot then we should get it and vice versa
if has_secure_boot != ('secure-boot' in loader['features']):
continue
return (
loader['mapping']['executable']['filename'],
loader['mapping']['nvram-template']['filename'],
)
raise exception.UEFINotSupported()