Add iso file format inspector

This change includes unit tests for the ISO
format inspector using mkisofs to generate
the iso files.

A test for stashing qcow content in the system_area
of an iso file is also included.

This change modifies format_inspector.detect_file_format
to evaluate all inspectors until they are complete and
raise an InvalidDiskInfo exception if multiple formats
match.

Related-Bug: #2059809
Change-Id: I7e12718fb3e1f77eb8d1cfcb9fa64e8ddeb9e712
(cherry picked from commit b1cc39848e)
This commit is contained in:
Sean Mooney 2024-07-04 20:09:31 +01:00
parent 3a6d9a038f
commit eeda7c333c
4 changed files with 229 additions and 17 deletions

View File

@ -24,6 +24,7 @@ complex-format images.
import struct
from oslo_log import log as logging
from oslo_utils import units
LOG = logging.getLogger(__name__)
@ -843,6 +844,93 @@ class VDIInspector(FileInspector):
return 'vdi'
class ISOInspector(FileInspector):
"""ISO 9660 and UDF format
we need to check the first 32KB + descriptor size
to look for the ISO 9660 or UDF signature.
http://wiki.osdev.org/ISO_9660
http://wiki.osdev.org/UDF
mkisofs --help | grep udf
The Universal Disc Format or UDF is the filesystem used on DVDs and
Blu-Ray discs.UDF is an extension of ISO 9660 and shares the same
header structure and initial layout.
Like the CDFS(ISO 9660) file system,
the UDF file system uses a 2048 byte sector size,
and it designates that the first 16 sectors can be used by the OS
to store proprietary data or boot logic.
That means we need to check the first 32KB + descriptor size
to look for the ISO 9660 or UDF signature.
both formats have an extent based layout, so we can't determine
ahead of time where the descriptor will be located.
fortunately, the ISO 9660 and UDF formats have a Primary Volume Descriptor
located at the beginning of the image, which contains the volume size.
"""
def __init__(self, *a, **k):
super(ISOInspector, self).__init__(*a, **k)
self.new_region('system_area', CaptureRegion(0, 32 * units.Ki))
self.new_region('header', CaptureRegion(32 * units.Ki, 2 * units.Ki))
@property
def format_match(self):
if not self.complete:
return False
signature = self.region('header').data[1:6]
assert len(signature) == 5
return signature in (b'CD001', b'NSR02', b'NSR03')
@property
def virtual_size(self):
if not self.complete:
return 0
if not self.format_match:
return 0
# the header size is 2KB or 1 sector
# the first header field is the descriptor type which is 1 byte
# the second field is the standard identifier which is 5 bytes
# the third field is the version which is 1 byte
# the rest of the header contains type specific data is 2041 bytes
# see http://wiki.osdev.org/ISO_9660#The_Primary_Volume_Descriptor
# we need to check that the descriptor type is 1
# to ensure that this is a primary volume descriptor
descriptor_type = self.region('header').data[0]
if descriptor_type != 1:
return 0
# The size in bytes of a logical block is stored at offset 128
# and is 2 bytes long encoded in both little and big endian
# int16_LSB-MSB so the field is 4 bytes long
logical_block_size_data = self.region('header').data[128:132]
assert len(logical_block_size_data) == 4
# given the encoding we only need to read half the field so we
# can use the first 2 bytes which are the little endian part
# this is normally 2048 or 2KB but we need to check as it can be
# different according to the ISO 9660 standard.
logical_block_size, = struct.unpack('<H', logical_block_size_data[:2])
# The volume space size is the total number of logical blocks
# and is stored at offset 80 and is 8 bytes long
# as with the logical block size the field is encoded in both
# little and big endian as an int32_LSB-MSB
volume_space_size_data = self.region('header').data[80:88]
assert len(volume_space_size_data) == 8
# given the encoding we only need to read half the field so we
# can use the first 4 bytes which are the little endian part
volume_space_size, = struct.unpack('<L', volume_space_size_data[:4])
# the virtual size is the volume space size * logical block size
return volume_space_size * logical_block_size
def __str__(self):
return 'iso'
class InfoWrapper(object):
"""A file-like object that wraps another and updates a format inspector.
@ -896,6 +984,7 @@ ALL_FORMATS = {
'vmdk': VMDKInspector,
'vdi': VDIInspector,
'qed': QEDInspector,
'iso': ISOInspector,
}
@ -913,12 +1002,15 @@ def detect_file_format(filename):
"""Attempts to detect the format of a file.
This runs through a file one time, running all the known inspectors in
parallel. It stops reading the file once one of them matches or all of
parallel. It stops reading the file once all of them matches or all of
them are sure they don't match.
Returns the FileInspector that matched, if any. None if 'raw'.
:param filename: The path to the file to inspect.
:returns: A FormatInspector instance matching the file.
:raises: ImageFormatError if multiple formats are detected.
"""
inspectors = {k: v() for k, v in ALL_FORMATS.items()}
detections = []
with open(filename, 'rb') as f:
for chunk in chunked_reader(f):
for format, inspector in list(inspectors.items()):
@ -930,10 +1022,17 @@ def detect_file_format(filename):
continue
if (inspector.format_match and inspector.complete and
format != 'raw'):
# First complete match (other than raw) wins
return inspector
# record all match (other than raw)
detections.append(inspector)
inspectors.pop(format)
if all(i.complete for i in inspectors.values()):
# If all the inspectors are sure they are not a match, avoid
# reading to the end of the file to settle on 'raw'.
break
return inspectors['raw']
if len(detections) > 1:
all_formats = [str(inspector) for inspector in detections]
raise ImageFormatError(
'Multiple formats detected: %s' % ', '.join(all_formats))
return inspectors['raw'] if not detections else detections[0]

View File

@ -54,7 +54,13 @@ class TestFormatInspectors(test.NoDBTestCase):
except Exception:
pass
def _create_iso(self, image_size, subformat='iso-9660'):
def _create_iso(self, image_size, subformat='9660'):
"""Create an ISO file of the given size.
:param image_size: The size of the image to create in bytes
:param subformat: The subformat to use, if any
"""
# these tests depend on mkisofs
# being installed and in the path,
# if it is not installed, skip
@ -86,12 +92,22 @@ class TestFormatInspectors(test.NoDBTestCase):
'dd if=/dev/zero of=%s bs=1M count=%i' % (fn, size),
shell=True)
subprocess.check_output(
'%s -o %s -V "TEST" -J -r %s' % (base_cmd, fn, fn),
'%s -V "TEST" -o %s %s' % (base_cmd, fn, fn),
shell=True)
return fn
def _create_img(self, fmt, size, subformat=None, options=None,
backing_file=None):
def _create_img(
self, fmt, size, subformat=None, options=None,
backing_file=None):
"""Create an image file of the given format and size.
:param fmt: The format to create
:param size: The size of the image to create in bytes
:param subformat: The subformat to use, if any
:param options: A dictionary of options to pass to the format
:param backing_file: The backing file to use, if any
"""
if fmt == 'iso':
return self._create_iso(size, subformat)
@ -177,6 +193,13 @@ class TestFormatInspectors(test.NoDBTestCase):
def _test_format_at_image_size(self, format_name, image_size,
subformat=None):
"""Test the format inspector for the given format at the
given image size.
:param format_name: The format to test
:param image_size: The size of the image to create in bytes
:param subformat: The subformat to use, if any
"""
img = self._create_img(format_name, image_size, subformat=subformat)
# Some formats have internal alignment restrictions making this not
@ -185,7 +208,15 @@ class TestFormatInspectors(test.NoDBTestCase):
# Read the format in various sizes, some of which will read whole
# sections in a single read, others will be completely unaligned, etc.
for block_size in (64 * units.Ki, 512, 17, 1 * units.Mi):
block_sizes = [64 * units.Ki, 1 * units.Mi]
# ISO images have a 32KB system area at the beginning of the image
# as a result reading that in 17 or 512 byte blocks takes too long,
# causing the test to fail. The 64KiB block size is enough to read
# the system area and header in a single read. the 1MiB block size
# adds very little time to the test so we include it.
if format_name != 'iso':
block_sizes.extend([17, 512])
for block_size in block_sizes:
fmt = self._test_format_at_block_size(format_name, img, block_size)
self.assertTrue(fmt.format_match,
'Failed to match %s at size %i block %i' % (
@ -210,14 +241,63 @@ class TestFormatInspectors(test.NoDBTestCase):
self._test_format('qcow2')
def test_iso_9660(self):
# reproduce iso-9660 format regression
self.assertRaises(
TypeError, self._test_format, 'iso', subformat='iso-9660')
self._test_format('iso', subformat='9660')
def test_udf(self):
# reproduce udf format regression
self.assertRaises(
TypeError, self._test_format, 'iso', subformat='udf')
def test_iso_udf(self):
self._test_format('iso', subformat='udf')
def _generate_bad_iso(self):
# we want to emulate a malicious user who uploads a an
# ISO file has a qcow2 header in the system area
# of the ISO file
# we will create a qcow2 image and an ISO file
# and then copy the qcow2 header to the ISO file
# e.g.
# mkisofs -o orig.iso /etc/resolv.conf
# qemu-img create orig.qcow2 -f qcow2 64M
# dd if=orig.qcow2 of=outcome bs=32K count=1
# dd if=orig.iso of=outcome bs=32K skip=1 seek=1
qcow = self._create_img('qcow2', 10 * units.Mi)
iso = self._create_iso(64 * units.Mi, subformat='9660')
# first ensure the files are valid
iso_fmt = self._test_format_at_block_size('iso', iso, 4 * units.Ki)
self.assertTrue(iso_fmt.format_match)
qcow_fmt = self._test_format_at_block_size('qcow2', qcow, 4 * units.Ki)
self.assertTrue(qcow_fmt.format_match)
# now copy the qcow2 header to an ISO file
prefix = TEST_IMAGE_PREFIX
prefix += '-bad-'
fn = tempfile.mktemp(prefix=prefix, suffix='.iso')
self._created_files.append(fn)
subprocess.check_output(
'dd if=%s of=%s bs=32K count=1' % (qcow, fn),
shell=True)
subprocess.check_output(
'dd if=%s of=%s bs=32K skip=1 seek=1' % (iso, fn),
shell=True)
return qcow, iso, fn
def test_bad_iso_qcow2(self):
_, _, fn = self._generate_bad_iso()
iso_check = self._test_format_at_block_size('iso', fn, 4 * units.Ki)
qcow_check = self._test_format_at_block_size('qcow2', fn, 4 * units.Ki)
# this system area of the ISO file is not considered part of the format
# the qcow2 header is in the system area of the ISO file
# so the ISO file is still valid
self.assertTrue(iso_check.format_match)
# the qcow2 header is in the system area of the ISO file
# but that will be parsed by the qcow2 format inspector
# and it will match
self.assertTrue(qcow_check.format_match)
# if we call format_inspector.detect_file_format it should detect
# and raise an exception because both match internally.
e = self.assertRaises(
format_inspector.ImageFormatError,
format_inspector.detect_file_format, fn)
self.assertIn('Multiple formats detected', str(e))
def test_vhd(self):
self._test_format('vhd')

View File

@ -235,6 +235,34 @@ class QemuTestCase(test.NoDBTestCase):
images.fetch_to_raw, None, 'foo', 'anypath')
self.assertIn('Invalid VMDK create-type specified', str(e))
@mock.patch('os.rename')
@mock.patch.object(images, 'IMAGE_API')
@mock.patch('nova.image.format_inspector.get_inspector')
@mock.patch.object(images, 'fetch')
@mock.patch('nova.privsep.qemu.unprivileged_qemu_img_info')
def test_fetch_iso_is_raw(self, mock_info, mock_fetch, mock_gi,
mock_glance, mock_rename):
mock_glance.get.return_value = {'disk_format': 'iso'}
inspector = mock_gi.return_value.from_file.return_value
inspector.safety_check.return_value = True
# qemu-img does not have a parser for iso so it is treated as raw
info = {
"virtual-size": 356352,
"filename": "foo.iso",
"format": "raw",
"actual-size": 356352,
"dirty-flag": False
}
mock_info.return_value = jsonutils.dumps(info)
with mock.patch('os.path.exists', return_value=True):
images.fetch_to_raw(None, 'foo', 'anypath')
# Make sure we called info with -f raw for an iso, since qemu-img does
# not support iso
mock_info.assert_called_once_with('anypath.part', format='raw')
# Make sure that since we considered this to be a raw file, we did the
# just-rename-don't-convert path
mock_rename.assert_called_once_with('anypath.part', 'anypath')
@mock.patch.object(images, 'IMAGE_API')
@mock.patch('nova.image.format_inspector.get_inspector')
@mock.patch.object(images, 'qemu_img_info')

View File

@ -174,6 +174,11 @@ def do_image_deep_inspection(img, image_href, path):
raise exception.ImageUnacceptable(
image_id=image_href,
reason=_('Image not in a supported format'))
if disk_format == 'iso':
# ISO image passed safety check; qemu will treat this as raw from here
disk_format = 'raw'
return disk_format