Merge "Add VMDK safety check" into stable/2024.1

This commit is contained in:
Zuul 2024-07-04 20:43:15 +00:00 committed by Gerrit Code Review
commit 8cde4adb90
2 changed files with 118 additions and 2 deletions

View File

@ -650,6 +650,7 @@ class VMDKInspector(FileInspector):
# at 0x200 and 1MB - 1
DESC_OFFSET = 0x200
DESC_MAX_SIZE = (1 << 20) - 1
GD_AT_END = 0xffffffffffffffff
def __init__(self, *a, **k):
super(VMDKInspector, self).__init__(*a, **k)
@ -662,8 +663,9 @@ class VMDKInspector(FileInspector):
if not self.region('header').complete:
return
sig, ver, _flags, _sectors, _grain, desc_sec, desc_num = struct.unpack(
'<4sIIQQQQ', self.region('header').data[:44])
(sig, ver, _flags, _sectors, _grain, desc_sec, desc_num,
_numGTEsperGT, _rgdOffset, gdOffset) = struct.unpack(
'<4sIIQQQQIQQ', self.region('header').data[:64])
if sig != b'KDMV':
raise ImageFormatError('Signature KDMV not found: %r' % sig)
@ -671,6 +673,11 @@ class VMDKInspector(FileInspector):
if ver not in (1, 2, 3):
raise ImageFormatError('Unsupported format version %i' % ver)
if gdOffset == self.GD_AT_END:
# This means we have a footer, which takes precedence over the
# header, which we cannot support since we stream.
raise ImageFormatError('Unsupported VMDK footer')
# Since we parse both desc_sec and desc_num (the location of the
# VMDK's descriptor, expressed in 512 bytes sectors) we enforce a
# check on the bounds to create a reasonable CaptureRegion. This
@ -718,6 +725,59 @@ class VMDKInspector(FileInspector):
return sectors * 512
def safety_check(self):
if (not self.has_region('descriptor') or
not self.region('descriptor').complete):
return False
try:
# Descriptor is padded to 512 bytes
desc_data = self.region('descriptor').data.rstrip(b'\x00')
# Descriptor is actually case-insensitive ASCII text
desc_text = desc_data.decode('ascii').lower()
except UnicodeDecodeError:
LOG.error('VMDK descriptor failed to decode as ASCII')
raise ImageFormatError('Invalid VMDK descriptor data')
extent_access = ('rw', 'rdonly', 'noaccess')
header_fields = []
extents = []
ddb = []
# NOTE(danms): Cautiously parse the VMDK descriptor. Each line must
# be something we understand, otherwise we refuse it.
for line in [x.strip() for x in desc_text.split('\n')]:
if line.startswith('#') or not line:
# Blank or comment lines are ignored
continue
elif line.startswith('ddb'):
# DDB lines are allowed (but not used by us)
ddb.append(line)
elif '=' in line and ' ' not in line.split('=')[0]:
# Header fields are a single word followed by an '=' and some
# value
header_fields.append(line)
elif line.split(' ')[0] in extent_access:
# Extent lines start with one of the three access modes
extents.append(line)
else:
# Anything else results in a rejection
LOG.error('Unsupported line %r in VMDK descriptor', line)
raise ImageFormatError('Invalid VMDK descriptor data')
# Check all the extent lines for concerning content
for extent_line in extents:
if '/' in extent_line:
LOG.error('Extent line %r contains unsafe characters',
extent_line)
return False
if not extents:
LOG.error('VMDK file specified no extents')
return False
return True
def __str__(self):
return 'vmdk'

View File

@ -307,6 +307,62 @@ class TestFormatInspectors(test_utils.BaseTestCase):
data[0x4F] = 0x80
self.assertTrue(inspector.has_unknown_features)
def test_vmdk_safety_checks(self):
region = format_inspector.CaptureRegion(0, 0)
inspector = format_inspector.VMDKInspector()
inspector.new_region('descriptor', region)
# This should be a legit VMDK descriptor which comments, blank lines,
# an extent, some ddb content, and some header values.
legit_desc = ['# This is a comment',
'',
' ',
'createType=monolithicSparse',
'RW 1234 SPARSE "foo.vmdk"',
'ddb.adapterType = "MFM',
'# EOF']
region.data = ('\n'.join(legit_desc)).encode('ascii')
region.length = len(region.data)
self.assertTrue(inspector.safety_check())
# Any of these lines should trigger an error indicating that there is
# something in the descriptor we don't understand
bad_lines = [
'#\U0001F4A9',
'header Name=foo',
'foo bar',
'WR 123 SPARSE "foo.vmdk"',
]
for bad_line in bad_lines:
# Encode as UTF-8 purely so we can test that anything non-ASCII
# will trigger the decode check
region.data = bad_line.encode('utf-8')
region.length = len(region.data)
self.assertRaisesRegex(format_inspector.ImageFormatError,
'Invalid VMDK descriptor',
inspector.safety_check)
# Extents with slashes in the name fail the safety check
region.data = b'RW 123 SPARSE "/etc/shadow"'
region.length = len(region.data)
self.assertFalse(inspector.safety_check())
# A descriptor that specifies no extents fails the safety check
region.data = b'# Nothing'
region.length = len(region.data)
self.assertFalse(inspector.safety_check())
def test_vmdk_reject_footer(self):
data = struct.pack('<4sIIQQQQIQQ', b'KDMV', 3, 0, 0, 0, 0, 1, 0, 0,
format_inspector.VMDKInspector.GD_AT_END)
inspector = format_inspector.VMDKInspector()
inspector.region('header').data = data
inspector.region('header').length = len(data)
self.assertRaisesRegex(format_inspector.ImageFormatError,
'footer',
inspector.post_process)
def test_vdi(self):
self._test_format('vdi')