Merge "Add file format detection to format_inspector"

This commit is contained in:
Zuul 2024-07-04 09:46:24 +00:00 committed by Gerrit Code Review
commit 0d8e79b713
2 changed files with 42 additions and 66 deletions

View File

@ -883,20 +883,52 @@ class InfoWrapper(object):
self._source.close()
# Registry of the formats this module can inspect, keyed by disk_format name.
# Each value is the inspector class itself (not an instance); callers
# instantiate it when they need a fresh inspector.
ALL_FORMATS = {
    'raw': FileInspector,
    'qcow2': QcowInspector,
    'vhd': VHDInspector,
    'vhdx': VHDXInspector,
    'vmdk': VMDKInspector,
    'vdi': VDIInspector,
    'qed': QEDInspector,
}
def get_inspector(format_name):
    """Returns a FormatInspector class based on the given name.

    :param format_name: The name of the disk_format (raw, qcow2, etc).
    :returns: A FormatInspector or None if unsupported.
    """
    # ALL_FORMATS is the single registry of supported formats; look the name
    # up there rather than keeping a second copy of the mapping in here.
    return ALL_FORMATS.get(format_name)
def detect_file_format(filename):
    """Attempts to detect the format of a file.

    This runs through a file one time, running all the known inspectors in
    parallel. It stops reading the file once one of them matches or all of
    them are sure they don't match.

    :param filename: Path of the file to inspect; it is opened in binary
                     mode.
    :returns: The inspector instance for the first format other than 'raw'
              that reports a complete match, otherwise the 'raw' inspector.
    """
    inspectors = {name: cls() for name, cls in ALL_FORMATS.items()}
    with open(filename, 'rb') as f:
        for chunk in chunked_reader(f):
            # Iterate over a snapshot so that rejected formats can be
            # dropped from the dict while looping.
            for fmt, inspector in list(inspectors.items()):
                try:
                    inspector.eat_chunk(chunk)
                except ImageFormatError:
                    # No match, so stop considering this format
                    inspectors.pop(fmt)
                    continue
                if (inspector.format_match and inspector.complete and
                        fmt != 'raw'):
                    # First complete match (other than raw) wins
                    return inspector
            if all(i.complete for i in inspectors.values()):
                # If all the inspectors are sure they are not a match, avoid
                # reading to the end of the file to settle on 'raw'.
                break
    return inspectors['raw']

View File

@ -313,62 +313,6 @@ class TestFormatInspectors(test_utils.BaseTestCase):
data[0x4F] = 0x80
self.assertTrue(inspector.has_unknown_features)
def test_vmdk_safety_checks(self):
    descriptor = format_inspector.CaptureRegion(0, 0)
    vmdk = format_inspector.VMDKInspector()
    vmdk.new_region('descriptor', descriptor)

    def load(payload):
        # Install payload as the captured descriptor contents.
        descriptor.data = payload
        descriptor.length = len(descriptor.data)

    # A legit VMDK descriptor with comments, blank lines, an extent, some
    # ddb content, and some header values passes the safety check.
    good_lines = [
        '# This is a comment',
        '',
        ' ',
        'createType=monolithicSparse',
        'RW 1234 SPARSE "foo.vmdk"',
        'ddb.adapterType = "MFM',
        '# EOF',
    ]
    load('\n'.join(good_lines).encode('ascii'))
    self.assertTrue(vmdk.safety_check())

    # Each of these lines contains something the descriptor parser does not
    # understand and must be reported as an invalid descriptor.
    rejected_lines = (
        '#\U0001F4A9',
        'header Name=foo',
        'foo bar',
        'WR 123 SPARSE "foo.vmdk"',
    )
    for rejected in rejected_lines:
        # UTF-8 is used purely so that the non-ASCII sample can be encoded
        # at all and then trip the decode check.
        load(rejected.encode('utf-8'))
        with self.assertRaisesRegex(format_inspector.ImageFormatError,
                                    'Invalid VMDK descriptor'):
            vmdk.safety_check()

    # An extent whose name contains slashes fails the safety check.
    load(b'RW 123 SPARSE "/etc/shadow"')
    self.assertFalse(vmdk.safety_check())

    # A descriptor without any extents fails the safety check.
    load(b'# Nothing')
    self.assertFalse(vmdk.safety_check())
def test_vmdk_reject_footer(self):
    # Pack a header whose final field is GD_AT_END and check that
    # post_process() rejects it with an error mentioning 'footer'.
    header_fields = (b'KDMV', 3, 0, 0, 0, 0, 1, 0, 0,
                     format_inspector.VMDKInspector.GD_AT_END)
    packed = struct.pack('<4sIIQQQQIQQ', *header_fields)
    vmdk = format_inspector.VMDKInspector()
    vmdk.region('header').data = packed
    vmdk.region('header').length = len(packed)
    with self.assertRaisesRegex(format_inspector.ImageFormatError,
                                'footer'):
        vmdk.post_process()
def test_vdi(self):
self._test_format('vdi')