Merge "Add file format detection to format_inspector" into stable/2024.1
This commit is contained in:
commit
77da27e2a2
@ -883,13 +883,7 @@ class InfoWrapper(object):
|
||||
self._source.close()
|
||||
|
||||
|
||||
def get_inspector(format_name):
|
||||
"""Returns a FormatInspector class based on the given name.
|
||||
|
||||
:param format_name: The name of the disk_format (raw, qcow2, etc).
|
||||
:returns: A FormatInspector or None if unsupported.
|
||||
"""
|
||||
formats = {
|
||||
ALL_FORMATS = {
|
||||
'raw': FileInspector,
|
||||
'qcow2': QcowInspector,
|
||||
'vhd': VHDInspector,
|
||||
@ -897,6 +891,44 @@ def get_inspector(format_name):
|
||||
'vmdk': VMDKInspector,
|
||||
'vdi': VDIInspector,
|
||||
'qed': QEDInspector,
|
||||
}
|
||||
}
|
||||
|
||||
return formats.get(format_name)
|
||||
|
||||
def get_inspector(format_name):
|
||||
"""Returns a FormatInspector class based on the given name.
|
||||
|
||||
:param format_name: The name of the disk_format (raw, qcow2, etc).
|
||||
:returns: A FormatInspector or None if unsupported.
|
||||
"""
|
||||
|
||||
return ALL_FORMATS.get(format_name)
|
||||
|
||||
|
||||
def detect_file_format(filename):
|
||||
"""Attempts to detect the format of a file.
|
||||
|
||||
This runs through a file one time, running all the known inspectors in
|
||||
parallel. It stops reading the file once one of them matches or all of
|
||||
them are sure they don't match.
|
||||
|
||||
Returns the FileInspector that matched, if any. None if 'raw'.
|
||||
"""
|
||||
inspectors = {k: v() for k, v in ALL_FORMATS.items()}
|
||||
with open(filename, 'rb') as f:
|
||||
for chunk in chunked_reader(f):
|
||||
for format, inspector in list(inspectors.items()):
|
||||
try:
|
||||
inspector.eat_chunk(chunk)
|
||||
except ImageFormatError:
|
||||
# No match, so stop considering this format
|
||||
inspectors.pop(format)
|
||||
continue
|
||||
if (inspector.format_match and inspector.complete and
|
||||
format != 'raw'):
|
||||
# First complete match (other than raw) wins
|
||||
return inspector
|
||||
if all(i.complete for i in inspectors.values()):
|
||||
# If all the inspectors are sure they are not a match, avoid
|
||||
# reading to the end of the file to settle on 'raw'.
|
||||
break
|
||||
return inspectors['raw']
|
||||
|
@ -313,62 +313,6 @@ class TestFormatInspectors(test_utils.BaseTestCase):
|
||||
data[0x4F] = 0x80
|
||||
self.assertTrue(inspector.has_unknown_features)
|
||||
|
||||
def test_vmdk_safety_checks(self):
|
||||
region = format_inspector.CaptureRegion(0, 0)
|
||||
inspector = format_inspector.VMDKInspector()
|
||||
inspector.new_region('descriptor', region)
|
||||
|
||||
# This should be a legit VMDK descriptor which comments, blank lines,
|
||||
# an extent, some ddb content, and some header values.
|
||||
legit_desc = ['# This is a comment',
|
||||
'',
|
||||
' ',
|
||||
'createType=monolithicSparse',
|
||||
'RW 1234 SPARSE "foo.vmdk"',
|
||||
'ddb.adapterType = "MFM',
|
||||
'# EOF']
|
||||
region.data = ('\n'.join(legit_desc)).encode('ascii')
|
||||
region.length = len(region.data)
|
||||
self.assertTrue(inspector.safety_check())
|
||||
|
||||
# Any of these lines should trigger an error indicating that there is
|
||||
# something in the descriptor we don't understand
|
||||
bad_lines = [
|
||||
'#\U0001F4A9',
|
||||
'header Name=foo',
|
||||
'foo bar',
|
||||
'WR 123 SPARSE "foo.vmdk"',
|
||||
]
|
||||
|
||||
for bad_line in bad_lines:
|
||||
# Encode as UTF-8 purely so we can test that anything non-ASCII
|
||||
# will trigger the decode check
|
||||
region.data = bad_line.encode('utf-8')
|
||||
region.length = len(region.data)
|
||||
self.assertRaisesRegex(format_inspector.ImageFormatError,
|
||||
'Invalid VMDK descriptor',
|
||||
inspector.safety_check)
|
||||
|
||||
# Extents with slashes in the name fail the safety check
|
||||
region.data = b'RW 123 SPARSE "/etc/shadow"'
|
||||
region.length = len(region.data)
|
||||
self.assertFalse(inspector.safety_check())
|
||||
|
||||
# A descriptor that specifies no extents fails the safety check
|
||||
region.data = b'# Nothing'
|
||||
region.length = len(region.data)
|
||||
self.assertFalse(inspector.safety_check())
|
||||
|
||||
def test_vmdk_reject_footer(self):
|
||||
data = struct.pack('<4sIIQQQQIQQ', b'KDMV', 3, 0, 0, 0, 0, 1, 0, 0,
|
||||
format_inspector.VMDKInspector.GD_AT_END)
|
||||
inspector = format_inspector.VMDKInspector()
|
||||
inspector.region('header').data = data
|
||||
inspector.region('header').length = len(data)
|
||||
self.assertRaisesRegex(format_inspector.ImageFormatError,
|
||||
'footer',
|
||||
inspector.post_process)
|
||||
|
||||
def test_vdi(self):
|
||||
self._test_format('vdi')
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user