Allow ignoring specific gpt safety checks

This allows the operator to configure specific GPT safety checks to be
non-fatal. This could be necessary if an operator finds that legit,
but non-spec-compliant images are already present in the system and
needs time to fix them. See this nova bug for an example of a widely-
distributed image that violates the GPT spec for example:

https://bugs.launchpad.net/nova/+bug/2091114

For maximum compatibility, we ignore the mbr check by default for the
first release with this check.

Related to blueprint glance-as-defender

Change-Id: Ibf9f5eb4e2378ddd77ebe61170b8e921ada792d7
This commit is contained in:
Dan Smith
2025-01-14 06:31:00 -08:00
parent 1040997b33
commit 046c738c4e
5 changed files with 126 additions and 22 deletions
+35 -20
View File
@@ -81,6 +81,37 @@ class _ConvertImage(task.Task):
with self.action_wrapper as action:
return self._execute(action, file_path, **kwargs)
def _inspect_path(self, path):
"""Return a FormatInspector for path.
This encapsulates the act of inspecting a file, running safety checks,
and raising/logging if anything goes wrong. If nothing fails, return
the FileInspector for the file.
"""
# Use our own cautious inspector module (if we have one for this
# format) to make sure a file is the format the submitter claimed
# it is and that it passes some basic safety checks _before_ we run
# qemu-img on it.
# See https://bugs.launchpad.net/nova/+bug/2059809 for details.
try:
inspector = format_inspector.detect_file_format(path)
inspector.safety_check()
except format_inspector.SafetyCheckFailed as e:
nonfatal = set(CONF.image_format.gpt_safety_checks_nonfatal)
fatal = e.failures.keys() - nonfatal
if inspector.NAME == 'gpt' and not fatal:
LOG.warning('Non-fatal %s', e)
else:
LOG.error('%s %s', str(inspector), e)
raise RuntimeError('Image has disallowed configuration')
except format_inspector.ImageFormatError as e:
LOG.error('Image failed format inspection: %s', e)
raise RuntimeError('Image format detection failed')
except Exception as e:
LOG.exception('Unknown error inspecting image format: %s', e)
raise RuntimeError('Unable to inspect image')
return inspector
def _execute(self, action, file_path, **kwargs):
target_format = CONF.image_conversion.output_format
@@ -91,26 +122,7 @@ class _ConvertImage(task.Task):
'target': target_format}
self.dest_path = dest_path
source_format = action.image_disk_format
# Use our own cautious inspector module (if we have one for this
# format) to make sure a file is the format the submitter claimed
# it is and that it passes some basic safety checks _before_ we run
# qemu-img on it.
# See https://bugs.launchpad.net/nova/+bug/2059809 for details.
try:
inspector = format_inspector.detect_file_format(self.src_path)
inspector.safety_check()
except format_inspector.SafetyCheckFailed:
LOG.error('Image failed %s safety check; aborting conversion',
source_format)
raise RuntimeError('Image has disallowed configuration')
except format_inspector.ImageFormatError as e:
LOG.error('Image claimed to be %s format failed format '
'inspection: %s', source_format, e)
raise RuntimeError('Image format detection failed')
except Exception as e:
LOG.exception('Unknown error inspecting image format: %s', e)
raise RuntimeError('Unable to inspect image')
inspector = self._inspect_path(self.src_path)
detected_format = str(inspector)
if detected_format == 'gpt':
@@ -213,6 +225,9 @@ class _ConvertImage(task.Task):
if stderr:
raise RuntimeError(stderr)
dest_inspector = self._inspect_path(dest_path)
# FIXME(danms): Assert that this is the expected format
LOG.info('Post-conversion image detected as %s', str(dest_inspector))
action.set_image_attribute(disk_format=target_format,
container_format='bare')
new_size = os.stat(dest_path).st_size
+16
View File
@@ -120,6 +120,22 @@ image_format_opts = [
'this feature improves security and consistency by '
'ensuring that images claiming to be a given format '
'have content matching that format.')),
cfg.ListOpt('gpt_safety_checks_nonfatal',
default=['mbr'],
help=_('List of safety check names to consider non-fatal when '
'validating disk_format=gpt images (or images detected '
'as gpt). Since MBR is very loosely-defined and GPT '
'(which is strictly defined) depends on the MBR '
'structure, it is easy to craft a GPT disk image that '
'works, but is not valid according to the spec. While '
'not recommended, it is possible to disable MBR or '
'GPT-specific safety checks if necessary for '
'compatibility. This can include "mbr" and/or "gptmbr" '
'to avoid failing format validation if those checks '
'fail. Use this with caution as it makes glance relax '
'its definition of what a valid GPT disk image looks '
'like and prevents strict adherence '
'to the GPT spec.')),
]
task_opts = [
cfg.IntOpt('task_time_to_live',
+9 -2
View File
@@ -644,8 +644,15 @@ class ImageProxy(glance.domain.proxy.Image):
'unable to calculate virtual size',
self.image.disk_format)
except format_inspector.SafetyCheckFailed as e:
LOG.warning('Image %s %s', format, e)
raise exception.InvalidImageData('Image failed safety checks')
nonfatal = set(CONF.image_format.gpt_safety_checks_nonfatal)
fatal = e.failures.keys() - nonfatal
if inspector.NAME == 'gpt' and not fatal:
# All the failures were non-fatal (per config) so log them and
# continue
LOG.warning('Non-fatal %s', e)
else:
LOG.warning('Image %s %s', format, e)
raise exception.InvalidImageData('Image failed safety checks')
except Exception as e:
LOG.error(_LE('Unable to determine stream format because: %s'), e)
# FIXME(danms): Do this based on config
@@ -435,6 +435,56 @@ class TestConvertImageTask(test_utils.BaseTestCase):
image = self.img_repo.get.return_value
self.assertEqual(123, image.virtual_size)
@mock.patch.object(os, 'stat')
@mock.patch.object(os, 'remove', new=mock.MagicMock())
def _test_image_convert_inspect_target_format(
self, mock_stat,
out_format='raw', src_format='qcow2', dst_format='raw',
dest_safe_fails=None):
self.config(output_format=out_format,
group='image_conversion')
convert = self._setup_image_convert_info_fail()
mock_stat.return_value.st_size = 123
with mock.patch.object(processutils, 'execute') as exc_mock:
exc_mock.return_value = (
'{"format": "%s", "virtual-size": 123}' % src_format, '')
pre_inspector = mock.MagicMock()
post_inspector = mock.MagicMock()
self.detect_file_format_mock.side_effect = [
pre_inspector, post_inspector]
pre_inspector.__str__.return_value = pre_inspector.NAME = (
src_format)
post_inspector.__str__.return_value = post_inspector.NAME = (
dst_format)
if dest_safe_fails:
post_inspector.safety_check.side_effect = (
format_inspector.SafetyCheckFailed(
{r: 'fail' for r in dest_safe_fails}))
convert.execute('file:///test/path.%s' % src_format)
def test_image_convert_ignores_configured_gpt_safety(self):
# Conversion succeeds, finds the proper format, but fails safety check
self.config(gpt_safety_checks_nonfatal=['mbr'],
group='image_format')
# GPT that fails only mbr should be allowed to proceed
self._test_image_convert_inspect_target_format(
out_format='raw', dst_format='gpt', dest_safe_fails=['mbr'])
# GPT that fails something not configured as non-fatal is rejected
self.assertRaisesRegex(
RuntimeError, 'disallowed configuration',
self._test_image_convert_inspect_target_format,
out_format='raw', dst_format='gpt', dest_safe_fails=['mbr',
'pmbr'])
# An non-GPT format that fails something configured as non-fatal for
# GPT-only is rejected
self.assertRaisesRegex(
RuntimeError, 'disallowed configuration',
self._test_image_convert_inspect_target_format,
out_format='vmdk', dst_format='vmdk', dest_safe_fails=['mbr'])
def _set_image_conversion(self, mock_os_remove, stores=[]):
mock_os_remove.return_value = None
wrapper = mock.MagicMock()
+16
View File
@@ -377,6 +377,7 @@ class TestStoreImage(utils.BaseTestCase):
image.set_data, iter((data,)), 1024)
def test_image_set_data_inspector_format_fails_safety_check(self):
self.config(gpt_safety_checks_nonfatal=[], group='image_format')
context = glance.context.RequestContext(user=USER1)
image_stub = ImageStub(UUID2, status='queued', locations=[])
image_stub.disk_format = 'gpt'
@@ -394,6 +395,21 @@ class TestStoreImage(utils.BaseTestCase):
'safety checks',
image.set_data, iter((data,)), 1024)
def test_image_set_data_gpt_safety_ignored(self):
self.config(gpt_safety_checks_nonfatal=['mbr'], group='image_format')
context = glance.context.RequestContext(user=USER1)
image_stub = ImageStub(UUID2, status='queued', locations=[])
image_stub.disk_format = 'gpt'
# We are going to pass an iterable data source, so use the
# FakeStoreAPIReader that actually reads from that data
data = self._get_stub_gpt()
# Zero out the partition table to make this unsafe
data[446:446 + 16] = b'\x00' * 16
store_api = unit_test_utils.FakeStoreAPIReader(max_size=2048)
image = glance.location.ImageProxy(image_stub, context,
store_api, self.store_utils)
image.set_data(iter((data,)), 1024)
def test_image_set_data_inspector_no_match_disabled(self):
self.config(require_image_format_match=False, group='image_format')
self.test_image_set_data_inspector_no_match()