Add file format detection to format_inspector

Change-Id: If0a4251465507be035ffaf9d855299611637cfa9
(cherry picked from commit 79271eaa5c742a1741321198c43807857fb6ed94)
This commit is contained in:
Dan Smith 2024-04-18 08:25:24 -07:00 committed by Abhishek Kekane
parent 83acfc4620
commit 26e5b8551a
2 changed files with 42 additions and 66 deletions

View File

@ -883,20 +883,52 @@ class InfoWrapper(object):
self._source.close() self._source.close()
ALL_FORMATS = {
'raw': FileInspector,
'qcow2': QcowInspector,
'vhd': VHDInspector,
'vhdx': VHDXInspector,
'vmdk': VMDKInspector,
'vdi': VDIInspector,
'qed': QEDInspector,
}
def get_inspector(format_name): def get_inspector(format_name):
"""Returns a FormatInspector class based on the given name. """Returns a FormatInspector class based on the given name.
:param format_name: The name of the disk_format (raw, qcow2, etc). :param format_name: The name of the disk_format (raw, qcow2, etc).
:returns: A FormatInspector or None if unsupported. :returns: A FormatInspector or None if unsupported.
""" """
formats = {
'raw': FileInspector,
'qcow2': QcowInspector,
'vhd': VHDInspector,
'vhdx': VHDXInspector,
'vmdk': VMDKInspector,
'vdi': VDIInspector,
'qed': QEDInspector,
}
return formats.get(format_name) return ALL_FORMATS.get(format_name)
def detect_file_format(filename):
"""Attempts to detect the format of a file.
This runs through a file one time, running all the known inspectors in
parallel. It stops reading the file once one of them matches or all of
them are sure they don't match.
Returns the FileInspector that matched, if any. None if 'raw'.
"""
inspectors = {k: v() for k, v in ALL_FORMATS.items()}
with open(filename, 'rb') as f:
for chunk in chunked_reader(f):
for format, inspector in list(inspectors.items()):
try:
inspector.eat_chunk(chunk)
except ImageFormatError:
# No match, so stop considering this format
inspectors.pop(format)
continue
if (inspector.format_match and inspector.complete and
format != 'raw'):
# First complete match (other than raw) wins
return inspector
if all(i.complete for i in inspectors.values()):
# If all the inspectors are sure they are not a match, avoid
# reading to the end of the file to settle on 'raw'.
break
return inspectors['raw']

View File

@ -313,62 +313,6 @@ class TestFormatInspectors(test_utils.BaseTestCase):
data[0x4F] = 0x80 data[0x4F] = 0x80
self.assertTrue(inspector.has_unknown_features) self.assertTrue(inspector.has_unknown_features)
def test_vmdk_safety_checks(self):
region = format_inspector.CaptureRegion(0, 0)
inspector = format_inspector.VMDKInspector()
inspector.new_region('descriptor', region)
# This should be a legit VMDK descriptor which comments, blank lines,
# an extent, some ddb content, and some header values.
legit_desc = ['# This is a comment',
'',
' ',
'createType=monolithicSparse',
'RW 1234 SPARSE "foo.vmdk"',
'ddb.adapterType = "MFM',
'# EOF']
region.data = ('\n'.join(legit_desc)).encode('ascii')
region.length = len(region.data)
self.assertTrue(inspector.safety_check())
# Any of these lines should trigger an error indicating that there is
# something in the descriptor we don't understand
bad_lines = [
'#\U0001F4A9',
'header Name=foo',
'foo bar',
'WR 123 SPARSE "foo.vmdk"',
]
for bad_line in bad_lines:
# Encode as UTF-8 purely so we can test that anything non-ASCII
# will trigger the decode check
region.data = bad_line.encode('utf-8')
region.length = len(region.data)
self.assertRaisesRegex(format_inspector.ImageFormatError,
'Invalid VMDK descriptor',
inspector.safety_check)
# Extents with slashes in the name fail the safety check
region.data = b'RW 123 SPARSE "/etc/shadow"'
region.length = len(region.data)
self.assertFalse(inspector.safety_check())
# A descriptor that specifies no extents fails the safety check
region.data = b'# Nothing'
region.length = len(region.data)
self.assertFalse(inspector.safety_check())
def test_vmdk_reject_footer(self):
data = struct.pack('<4sIIQQQQIQQ', b'KDMV', 3, 0, 0, 0, 0, 1, 0, 0,
format_inspector.VMDKInspector.GD_AT_END)
inspector = format_inspector.VMDKInspector()
inspector.region('header').data = data
inspector.region('header').length = len(data)
self.assertRaisesRegex(format_inspector.ImageFormatError,
'footer',
inspector.post_process)
def test_vdi(self): def test_vdi(self):
self._test_format('vdi') self._test_format('vdi')