From eed2a8aeb7a2e1ba47ff5642a346a97e85b6b0ff Mon Sep 17 00:00:00 2001 From: Abhishek Kekane <akekane@redhat.com> Date: Mon, 8 Jul 2024 19:28:38 +0000 Subject: [PATCH] Add iso file format inspector This change excludes image conversion if source image format is ISO. This change includes unit tests for the ISO format inspector using mkisofs to generate the iso files. A test for stashing qcow content in the system_area of an iso file is also included. This change modifies format_inspector.detect_file_format to evaluate all inspectors until they are complete and raise an InvalidDiskInfo exception if multiple formats match. Related-Bug: #2059809 Change-Id: Id706480e31687d8ade6f7199b600aff3ad7c68f7 (cherry picked from commit d8de63a5008b811f2e22cc5cdda59bd2cc05b207) --- .../async_/flows/plugins/image_conversion.py | 29 ++-- glance/common/format_inspector.py | 100 ++++++++++++- .../flows/plugins/test_image_conversion.py | 101 ++++++++++++- .../unit/common/test_format_inspector.py | 137 +++++++++++++++++- glance/tests/unit/test_store_image.py | 7 +- glance/tests/unit/utils.py | 2 +- 6 files changed, 345 insertions(+), 31 deletions(-) diff --git a/glance/async_/flows/plugins/image_conversion.py b/glance/async_/flows/plugins/image_conversion.py index fedbec117a..a2fed6ea02 100644 --- a/glance/async_/flows/plugins/image_conversion.py +++ b/glance/async_/flows/plugins/image_conversion.py @@ -90,16 +90,7 @@ class _ConvertImage(task.Task): dest_path = "%(path)s.%(target)s" % {'path': self.src_path, 'target': target_format} self.dest_path = dest_path - source_format = action.image_disk_format - inspector_cls = format_inspector.get_inspector(source_format) - if not inspector_cls: - # We cannot convert from disk_format types that qemu-img doesn't - # support (like iso, ploop, etc). The ones it supports overlaps - # with the ones we have inspectors for, so reject conversion for - # any format we don't have an inspector for. - raise RuntimeError( - 'Unable to convert from format %s' % source_format) # Use our own cautious inspector module (if we have one for this # format) to make sure a file is the format the submitter claimed @@ -107,7 +98,7 @@ class _ConvertImage(task.Task): # qemu-img on it. # See https://bugs.launchpad.net/nova/+bug/2059809 for details. try: - inspector = inspector_cls.from_file(self.src_path) + inspector = format_inspector.detect_file_format(self.src_path) if not inspector.safety_check(): LOG.error('Image failed %s safety check; aborting conversion', source_format) @@ -122,6 +113,24 @@ class _ConvertImage(task.Task): LOG.exception('Unknown error inspecting image format: %s', e) raise RuntimeError('Unable to inspect image') + if str(inspector) == 'iso': + if source_format == 'iso': + # NOTE(abhishekk): Excluding conversion and preserving image + # disk_format as `iso` only + LOG.debug("Avoiding conversion of an image %s having" + " `iso` disk format.", self.image_id) + return file_path + + # NOTE(abhishekk): Raising error as image detected as ISO but + # claimed as different format + LOG.error('Image claimed to be %s format but format ' + 'inspection found: ISO', source_format) + raise RuntimeError("Image has disallowed configuration") + elif str(inspector) != source_format: + LOG.error('Image claimed to be %s format failed format ' + 'inspection', source_format) + raise RuntimeError('Image format mismatch') + try: stdout, stderr = putils.trycmd("qemu-img", "info", "-f", source_format, diff --git a/glance/common/format_inspector.py b/glance/common/format_inspector.py index 65502d8892..c083448ac1 100755 --- a/glance/common/format_inspector.py +++ b/glance/common/format_inspector.py @@ -24,6 +24,7 @@ complex-format images. import struct from oslo_log import log as logging +from oslo_utils import units LOG = logging.getLogger(__name__) @@ -839,6 +840,84 @@ class VDIInspector(FileInspector): return 'vdi' +class ISOInspector(FileInspector): + """ISO 9660 and UDF format + we need to check the first 32KB + descriptor size + to look for the ISO 9660 or UDF signature. + http://wiki.osdev.org/ISO_9660 + http://wiki.osdev.org/UDF + mkisofs --help | grep udf + The Universal Disc Format or UDF is the filesystem used on DVDs and + Blu-Ray discs.UDF is an extension of ISO 9660 and shares the same + header structure and initial layout. + Like the CDFS(ISO 9660) file system, + the UDF file system uses a 2048 byte sector size, + and it designates that the first 16 sectors can be used by the OS + to store proprietary data or boot logic. + That means we need to check the first 32KB + descriptor size + to look for the ISO 9660 or UDF signature. + both formats have an extent based layout, so we can't determine + ahead of time where the descriptor will be located. + fortunately, the ISO 9660 and UDF formats have a Primary Volume Descriptor + located at the beginning of the image, which contains the volume size. + """ + def __init__(self, *a, **k): + super(ISOInspector, self).__init__(*a, **k) + self.new_region('system_area', CaptureRegion(0, 32 * units.Ki)) + self.new_region('header', CaptureRegion(32 * units.Ki, 2 * units.Ki)) + + @property + def format_match(self): + if not self.complete: + return False + signature = self.region('header').data[1:6] + assert len(signature) == 5 + # These signatures are for ISO9660, UDF and UDF. + return signature in (b'CD001', b'NSR02', b'NSR03') + + @property + def virtual_size(self): + if not self.complete: + return 0 + if not self.format_match: + return 0 + # the header size is 2KB or 1 sector + # the first header field is the descriptor type which is 1 byte + # the second field is the standard identifier which is 5 bytes + # the third field is the version which is 1 byte + # the rest of the header contains type specific data is 2041 bytes + # see http://wiki.osdev.org/ISO_9660#The_Primary_Volume_Descriptor + # we need to check that the descriptor type is 1 + # to ensure that this is a primary volume descriptor + descriptor_type = self.region('header').data[0] + if descriptor_type != 1: + return 0 + # The size in bytes of a logical block is stored at offset 128 + # and is 2 bytes long encoded in both little and big endian + # int16_LSB-MSB so the field is 4 bytes long + logical_block_size_data = self.region('header').data[128:132] + assert len(logical_block_size_data) == 4 + # given the encoding we only need to read half the field so we + # can use the first 2 bytes which are the little endian part + # this is normally 2048 or 2KB but we need to check as it can be + # different according to the ISO 9660 standard. + logical_block_size, = struct.unpack('<H', logical_block_size_data[:2]) + # The volume space size is the total number of logical blocks + # and is stored at offset 80 and is 8 bytes long + # as with the logical block size the field is encoded in both + # little and big endian as an int32_LSB-MSB + volume_space_size_data = self.region('header').data[80:88] + assert len(volume_space_size_data) == 8 + # given the encoding we only need to read half the field so we + # can use the first 4 bytes which are the little endian part + volume_space_size, = struct.unpack('<L', volume_space_size_data[:4]) + # the virtual size is the volume space size * logical block size + return volume_space_size * logical_block_size + + def __str__(self): + return 'iso' + + class InfoWrapper(object): """A file-like object that wraps another and updates a format inspector. @@ -891,6 +970,7 @@ ALL_FORMATS = { 'vmdk': VMDKInspector, 'vdi': VDIInspector, 'qed': QEDInspector, + 'iso': ISOInspector, } @@ -906,14 +986,16 @@ def get_inspector(format_name): def detect_file_format(filename): """Attempts to detect the format of a file. - This runs through a file one time, running all the known inspectors in - parallel. It stops reading the file once one of them matches or all of + parallel. It stops reading the file once all of them matches or all of them are sure they don't match. - Returns the FileInspector that matched, if any. None if 'raw'. + :param filename: The path to the file to inspect. + :returns: A FormatInspector instance matching the file. + :raises: ImageFormatError if multiple formats are detected. """ inspectors = {k: v() for k, v in ALL_FORMATS.items()} + detections = [] with open(filename, 'rb') as f: for chunk in chunked_reader(f): for format, inspector in list(inspectors.items()): @@ -925,10 +1007,16 @@ def detect_file_format(filename): continue if (inspector.format_match and inspector.complete and format != 'raw'): - # First complete match (other than raw) wins - return inspector + # record all match (other than raw) + detections.append(inspector) + inspectors.pop(format) if all(i.complete for i in inspectors.values()): # If all the inspectors are sure they are not a match, avoid # reading to the end of the file to settle on 'raw'. break - return inspectors['raw'] + + if len(detections) > 1: + all_formats = [str(inspector) for inspector in detections] + raise ImageFormatError( + 'Multiple formats detected: %s' % ', '.join(all_formats)) + return inspectors['raw'] if not detections else detections[0] diff --git a/glance/tests/unit/async_/flows/plugins/test_image_conversion.py b/glance/tests/unit/async_/flows/plugins/test_image_conversion.py index 28a60a4334..861a36ed07 100644 --- a/glance/tests/unit/async_/flows/plugins/test_image_conversion.py +++ b/glance/tests/unit/async_/flows/plugins/test_image_conversion.py @@ -96,10 +96,10 @@ class TestConvertImageTask(test_utils.BaseTestCase): self.image_id, self.task.task_id) - self.inspector_mock = mock.MagicMock() + self.detect_file_format_mock = mock.MagicMock() self.useFixture(fixtures.MockPatch('glance.common.format_inspector.' - 'get_inspector', - self.inspector_mock)) + 'detect_file_format', + self.detect_file_format_mock)) @mock.patch.object(os, 'stat') @mock.patch.object(os, 'remove') @@ -124,6 +124,9 @@ class TestConvertImageTask(test_utils.BaseTestCase): with mock.patch.object(json, 'loads') as jloads_mock: jloads_mock.return_value = {'format': 'raw', 'virtual-size': 456} + inspector = self.detect_file_format_mock.return_value + inspector.__str__.return_value = 'raw' + inspector.safety_check.return_value = True image_convert.execute('file:///test/path.raw') # NOTE(hemanthm): Asserting that the source format is passed @@ -138,6 +141,51 @@ class TestConvertImageTask(test_utils.BaseTestCase): self.assertEqual(456, image.virtual_size) self.assertEqual(123, image.size) + @mock.patch('os.remove') + @mock.patch('os.stat') + @mock.patch('oslo_concurrency.processutils.trycmd') + @mock.patch('glance.async_.flows.plugins.image_conversion.LOG') + def test_image_convert_excluded_for_iso(self, mock_log, mock_run, + mock_stat, mock_remove, fmt='iso'): + mock_stat.return_value = mock.MagicMock(stsize=123) + stdout = json.dumps({'format': 'raw'}) + mock_run.return_value = (stdout, '') + image_convert = image_conversion._ConvertImage(self.context, + self.task.task_id, + self.task_type, + self.wrapper, + self.stores) + self.task_repo.get.return_value = self.task + image = mock.MagicMock(image_id=self.image_id, virtual_size=None, + extra_properties={ + 'os_glance_import_task': self.task.task_id}, + disk_format=fmt) + + self.img_repo.get.return_value = image + inspector = self.detect_file_format_mock.return_value + inspector.__str__.return_value = 'iso' + inspector.safety_check.return_value = True + + image_convert.execute('file:///test/path.iso') + self.assertEqual(fmt, image.disk_format) + mock_log.debug.assert_called_once_with( + "Avoiding conversion of an image %s having " + "`iso` disk format.", self.image_id) + + def test_image_convert_iso_lie_raw(self): + e = self.assertRaises( + RuntimeError, self.test_image_convert_excluded_for_iso, + fmt='raw') + self.assertEqual('Image has disallowed configuration', + str(e)) + + def test_image_convert_iso_lie_qcow2(self): + e = self.assertRaises( + RuntimeError, self.test_image_convert_excluded_for_iso, + fmt='qcow2') + self.assertEqual('Image has disallowed configuration', + str(e)) + def _setup_image_convert_info_fail(self, disk_format='qcow2'): image_convert = image_conversion._ConvertImage(self.context, self.task.task_id, @@ -156,6 +204,9 @@ class TestConvertImageTask(test_utils.BaseTestCase): def test_image_convert_fails_inspection(self): convert = self._setup_image_convert_info_fail() with mock.patch.object(processutils, 'execute') as exc_mock: + inspector = self.detect_file_format_mock.return_value + inspector.__str__.return_value = 'qcow2' + inspector.safety_check.return_value = True exc_mock.side_effect = OSError('fail') self.assertRaises(OSError, convert.execute, 'file:///test/path.raw') @@ -174,6 +225,9 @@ class TestConvertImageTask(test_utils.BaseTestCase): convert = self._setup_image_convert_info_fail() with mock.patch.object(processutils, 'execute') as exc_mock: exc_mock.return_value = '', 'some error' + inspector = self.detect_file_format_mock.return_value + inspector.__str__.return_value = 'qcow2' + inspector.safety_check.return_value = True self.assertRaises(RuntimeError, convert.execute, 'file:///test/path.raw') exc_mock.assert_called_once_with( @@ -193,6 +247,9 @@ class TestConvertImageTask(test_utils.BaseTestCase): convert = self._setup_image_convert_info_fail() with mock.patch.object(processutils, 'execute') as exc_mock: + inspector = self.detect_file_format_mock.return_value + inspector.__str__.return_value = 'qcow2' + inspector.safety_check.return_value = True exc_mock.return_value = json.dumps(data), '' e = self.assertRaises(RuntimeError, convert.execute, 'file:///test/path.qcow') @@ -210,6 +267,9 @@ class TestConvertImageTask(test_utils.BaseTestCase): convert = self._setup_image_convert_info_fail() with mock.patch.object(processutils, 'execute') as exc_mock: exc_mock.return_value = json.dumps(data), '' + inspector = self.detect_file_format_mock.return_value + inspector.__str__.return_value = 'qcow2' + inspector.safety_check.return_value = True e = self.assertRaises(RuntimeError, convert.execute, 'file:///test/path.qcow') self.assertEqual('QCOW images with data-file set are not allowed', @@ -217,14 +277,13 @@ class TestConvertImageTask(test_utils.BaseTestCase): def test_image_convert_no_inspector_match(self): convert = self._setup_image_convert_info_fail() - self.inspector_mock.return_value = None self.assertRaisesRegex(RuntimeError, - 'Unable to convert from format', + 'Image format mismatch', convert.execute, 'file:///test/path.hpfs') def test_image_convert_fails_inspection_safety_check(self): convert = self._setup_image_convert_info_fail() - inspector = self.inspector_mock.return_value.from_file.return_value + inspector = self.detect_file_format_mock.return_value inspector.safety_check.return_value = False self.assertRaisesRegex(RuntimeError, 'Image has disallowed configuration', @@ -232,7 +291,7 @@ class TestConvertImageTask(test_utils.BaseTestCase): def test_image_convert_fails_inspection_format_check(self): convert = self._setup_image_convert_info_fail() - self.inspector_mock.return_value.from_file.side_effect = ( + self.detect_file_format_mock.side_effect = ( format_inspector.ImageFormatError()) self.assertRaisesRegex(RuntimeError, 'Image format detection failed', @@ -240,7 +299,7 @@ class TestConvertImageTask(test_utils.BaseTestCase): def test_image_convert_fails_inspection_error(self): convert = self._setup_image_convert_info_fail() - self.inspector_mock.return_value.from_file.side_effect = ValueError + self.detect_file_format_mock.side_effect = ValueError self.assertRaisesRegex(RuntimeError, 'Unable to inspect image', convert.execute, 'file:///test/path.qcow') @@ -254,6 +313,9 @@ class TestConvertImageTask(test_utils.BaseTestCase): convert = self._setup_image_convert_info_fail(disk_format='vmdk') with mock.patch.object(processutils, 'execute') as exc_mock: + inspector = self.detect_file_format_mock.return_value + inspector.__str__.return_value = 'vmdk' + inspector.safety_check.return_value = True exc_mock.return_value = json.dumps(data), '' convert.execute('file:///test/path.vmdk') @@ -283,6 +345,9 @@ class TestConvertImageTask(test_utils.BaseTestCase): def test_image_convert_fails(self): convert = self._setup_image_convert_info_fail(disk_format='raw') with mock.patch.object(processutils, 'execute') as exc_mock: + inspector = self.detect_file_format_mock.return_value + inspector.__str__.return_value = 'raw' + inspector.safety_check.return_value = True exc_mock.side_effect = [('{"format":"raw"}', ''), OSError('convert_fail')] self.assertRaises(OSError, @@ -306,6 +371,9 @@ class TestConvertImageTask(test_utils.BaseTestCase): with mock.patch.object(processutils, 'execute') as exc_mock: exc_mock.side_effect = [('{"format":"raw"}', ''), ('', 'some error')] + inspector = self.detect_file_format_mock.return_value + inspector.__str__.return_value = 'raw' + inspector.safety_check.return_value = True self.assertRaises(RuntimeError, convert.execute, 'file:///test/path.raw') exc_mock.assert_has_calls( @@ -326,6 +394,9 @@ class TestConvertImageTask(test_utils.BaseTestCase): convert = self._setup_image_convert_info_fail() with mock.patch.object(processutils, 'execute') as exc_mock: exc_mock.return_value = ('{}', '') + inspector = self.detect_file_format_mock.return_value + inspector.__str__.return_value = 'qcow2' + inspector.safety_check.return_value = True exc = self.assertRaises(RuntimeError, convert.execute, 'file:///test/path.raw') self.assertIn('Image metadata disagrees about format', str(exc)) @@ -340,11 +411,25 @@ class TestConvertImageTask(test_utils.BaseTestCase): # Make sure we did not update the image self.img_repo.save.assert_not_called() + def test_image_convert_source_format_inspection_not_match(self): + convert = self._setup_image_convert_info_fail(disk_format="raw") + with mock.patch.object(processutils, 'execute') as exc_mock: + exc_mock.return_value = ( + '{"format": "raw", "virtual-size": 123}', '') + inspector = self.detect_file_format_mock.return_value + inspector.__str__.return_value = 'qcow2' + exc = self.assertRaises(RuntimeError, + convert.execute, 'file:///test/path.raw') + self.assertIn('Image format mismatch', str(exc)) + def test_image_convert_same_format_does_nothing(self): convert = self._setup_image_convert_info_fail() with mock.patch.object(processutils, 'execute') as exc_mock: exc_mock.return_value = ( '{"format": "qcow2", "virtual-size": 123}', '') + inspector = self.detect_file_format_mock.return_value + inspector.__str__.return_value = 'qcow2' + inspector.safety_check.return_value = True convert.execute('file:///test/path.qcow') # Make sure we only called qemu-img for inspection, not conversion exc_mock.assert_called_once_with( diff --git a/glance/tests/unit/common/test_format_inspector.py b/glance/tests/unit/common/test_format_inspector.py index 308d1ce121..421c561004 100644 --- a/glance/tests/unit/common/test_format_inspector.py +++ b/glance/tests/unit/common/test_format_inspector.py @@ -27,6 +27,9 @@ from glance.common import format_inspector from glance.tests import utils as test_utils +TEST_IMAGE_PREFIX = 'glance-unittest-formatinspector-' + + def get_size_from_qemu_img(filename): output = subprocess.check_output('qemu-img info "%s"' % filename, shell=True) @@ -51,8 +54,96 @@ class TestFormatInspectors(test_utils.BaseTestCase): except Exception: pass + def _create_iso(self, image_size, subformat='9660'): + """Create an ISO file of the given size. + :param image_size: The size of the image to create in bytes + :param subformat: The subformat to use, if any + """ + # these tests depend on mkisofs + # being installed and in the path, + # if it is not installed, skip + try: + subprocess.check_output('mkisofs --version', shell=True) + except Exception: + self.skipTest('mkisofs not installed') + size = image_size // units.Mi + base_cmd = "mkisofs" + if subformat == 'udf': + # depending on the distribution mkisofs may not support udf + # and may be provided by genisoimage instead. As a result we + # need to check if the command supports udf via help + # instead of checking the installed version. + # mkisofs --help outputs to stderr so we need to + # redirect it to stdout to use grep. + try: + subprocess.check_output( + 'mkisofs --help 2>&1 | grep udf', shell=True) + except Exception: + self.skipTest('mkisofs does not support udf format') + base_cmd += " -udf" + prefix = TEST_IMAGE_PREFIX + prefix += '-%s-' % subformat + fn = tempfile.mktemp(prefix=prefix, suffix='.iso') + self._created_files.append(fn) + subprocess.check_output( + 'dd if=/dev/zero of=%s bs=1M count=%i' % (fn, size), + shell=True) + subprocess.check_output( + '%s -V "TEST" -o %s %s' % (base_cmd, fn, fn), + shell=True) + return fn + + def _generate_bad_iso(self): + # we want to emulate a malicious user who uploads a an + # ISO file has a qcow2 header in the system area + # of the ISO file + # we will create a qcow2 image and an ISO file + # and then copy the qcow2 header to the ISO file + # e.g. + # mkisofs -o orig.iso /etc/resolv.conf + # qemu-img create orig.qcow2 -f qcow2 64M + # dd if=orig.qcow2 of=outcome bs=32K count=1 + # dd if=orig.iso of=outcome bs=32K skip=1 seek=1 + qcow = self._create_img('qcow2', 10 * units.Mi) + iso = self._create_iso(64 * units.Mi, subformat='9660') + # first ensure the files are valid + iso_fmt = self._test_format_at_block_size('iso', iso, 4 * units.Ki) + self.assertTrue(iso_fmt.format_match) + qcow_fmt = self._test_format_at_block_size('qcow2', qcow, 4 * units.Ki) + self.assertTrue(qcow_fmt.format_match) + # now copy the qcow2 header to an ISO file + prefix = TEST_IMAGE_PREFIX + prefix += '-bad-' + fn = tempfile.mktemp(prefix=prefix, suffix='.iso') + self._created_files.append(fn) + subprocess.check_output( + 'dd if=%s of=%s bs=32K count=1' % (qcow, fn), + shell=True) + subprocess.check_output( + 'dd if=%s of=%s bs=32K skip=1 seek=1' % (iso, fn), + shell=True) + return qcow, iso, fn + def _create_img(self, fmt, size, subformat=None, options=None, backing_file=None): + """Create an image file of the given format and size. + :param fmt: The format to create + :param size: The size of the image to create in bytes + :param subformat: The subformat to use, if any + :param options: A dictionary of options to pass to the format + :param backing_file: The backing file to use, if any + """ + + if fmt == 'iso': + return self._create_iso(size, subformat) + # these tests depend on qemu-img + # being installed and in the path, + # if it is not installed, skip + try: + subprocess.check_output('qemu-img --version', shell=True) + except Exception: + self.skipTest('qemu-img not installed') + if fmt == 'vhd': # QEMU calls the vhd format vpc fmt = 'vpc' @@ -60,7 +151,7 @@ class TestFormatInspectors(test_utils.BaseTestCase): if options is None: options = {} opt = '' - prefix = 'glance-unittest-formatinspector-' + prefix = TEST_IMAGE_PREFIX if subformat: options['subformat'] = subformat @@ -90,7 +181,8 @@ class TestFormatInspectors(test_utils.BaseTestCase): # Matches qemu-img default, see `qemu-img convert -O vmdk -o help` subformat = 'monolithicSparse' - prefix = 'glance-unittest-formatinspector-%s-' % subformat + prefix = TEST_IMAGE_PREFIX + prefix += '-%s-' % subformat fn = tempfile.mktemp(prefix=prefix, suffix='.vmdk') self._created_files.append(fn) raw = tempfile.mktemp(prefix=prefix, suffix='.raw') @@ -126,6 +218,12 @@ class TestFormatInspectors(test_utils.BaseTestCase): def _test_format_at_image_size(self, format_name, image_size, subformat=None): + """Test the format inspector for the given format at the + given image size. + :param format_name: The format to test + :param image_size: The size of the image to create in bytes + :param subformat: The subformat to use, if any + """ img = self._create_img(format_name, image_size, subformat=subformat) # Some formats have internal alignment restrictions making this not @@ -134,7 +232,15 @@ class TestFormatInspectors(test_utils.BaseTestCase): # Read the format in various sizes, some of which will read whole # sections in a single read, others will be completely unaligned, etc. - for block_size in (64 * units.Ki, 512, 17, 1 * units.Mi): + block_sizes = [64 * units.Ki, 1 * units.Mi] + # ISO images have a 32KB system area at the beginning of the image + # as a result reading that in 17 or 512 byte blocks takes too long, + # causing the test to fail. The 64KiB block size is enough to read + # the system area and header in a single read. the 1MiB block size + # adds very little time to the test so we include it. + if format_name != 'iso': + block_sizes.extend([17, 512]) + for block_size in block_sizes: fmt = self._test_format_at_block_size(format_name, img, block_size) self.assertTrue(fmt.format_match, 'Failed to match %s at size %i block %i' % ( @@ -158,6 +264,31 @@ class TestFormatInspectors(test_utils.BaseTestCase): def test_qcow2(self): self._test_format('qcow2') + def test_iso_9660(self): + self._test_format('iso', subformat='9660') + + def test_udf(self): + self._test_format('iso', subformat='udf') + + def test_bad_iso_qcow2(self): + _, _, fn = self._generate_bad_iso() + iso_check = self._test_format_at_block_size('iso', fn, 4 * units.Ki) + qcow_check = self._test_format_at_block_size('qcow2', fn, 4 * units.Ki) + # this system area of the ISO file is not considered part of the format + # the qcow2 header is in the system area of the ISO file + # so the ISO file is still valid + self.assertTrue(iso_check.format_match) + # the qcow2 header is in the system area of the ISO file + # but that will be parsed by the qcow2 format inspector + # and it will match + self.assertTrue(qcow_check.format_match) + # if we call format_inspector.detect_file_format it should detect + # and raise an exception because both match internally. + e = self.assertRaises( + format_inspector.ImageFormatError, + format_inspector.detect_file_format, fn) + self.assertIn('Multiple formats detected', str(e)) + def test_vhd(self): self._test_format('vhd') diff --git a/glance/tests/unit/test_store_image.py b/glance/tests/unit/test_store_image.py index 01b1fc4ff9..bfae0b90c6 100644 --- a/glance/tests/unit/test_store_image.py +++ b/glance/tests/unit/test_store_image.py @@ -44,7 +44,7 @@ class ImageRepoStub(object): class ImageStub(object): def __init__(self, image_id, status=None, locations=None, - visibility=None, extra_properties=None): + visibility=None, extra_properties=None, virtual_size=0): self.image_id = image_id self.status = status self.locations = locations or [] @@ -56,7 +56,7 @@ class ImageStub(object): self.checksum = None self.disk_format = 'raw' self.container_format = 'bare' - self.virtual_size = 0 + self.virtual_size = virtual_size def delete(self): self.status = 'deleted' @@ -112,7 +112,8 @@ class TestStoreMultiBackends(utils.BaseTestCase): 'img_signature': 'VALID' } image_stub = ImageStub(UUID2, status='queued', locations=[], - extra_properties=extra_properties) + extra_properties=extra_properties, + virtual_size=4) image_stub.disk_format = 'iso' image = glance.location.ImageProxy(image_stub, context, self.store_api, self.store_utils) diff --git a/glance/tests/unit/utils.py b/glance/tests/unit/utils.py index 16640a9c1c..b72032aab3 100644 --- a/glance/tests/unit/utils.py +++ b/glance/tests/unit/utils.py @@ -265,7 +265,7 @@ class FakeStoreAPI(object): # 'data' is a string wrapped in a LimitingReader|CooperativeReader # pipeline, so peek under the hood of those objects to get at the # string itself. - size = len(data.data.fd) + size = len(data.data.fd._source) if (current_store_size + size) > store_max_size: raise exception.StorageFull() if context.user_id == USER2: