Merge "Extend format_inspector for QCOW safety" into stable/2024.1

This commit is contained in:
Zuul 2024-07-04 20:43:13 +00:00 committed by Gerrit Code Review
commit 7ffc392d61
2 changed files with 207 additions and 5 deletions

View File

@ -28,6 +28,14 @@ from oslo_log import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def chunked_reader(fileobj, chunk_size=512):
while True:
chunk = fileobj.read(chunk_size)
if not chunk:
break
yield chunk
class CaptureRegion(object): class CaptureRegion(object):
"""Represents a region of a file we want to capture. """Represents a region of a file we want to capture.
@ -176,10 +184,16 @@ class FileInspector(object):
@property @property
def actual_size(self): def actual_size(self):
"""Returns the total size of the file, usually smaller than """Returns the total size of the file, usually smaller than
virtual_size. virtual_size. NOTE: this will only be accurate if the entire
file is read and processed.
""" """
return self._total_count return self._total_count
@property
def complete(self):
"""Returns True if we have all the information needed."""
return all(r.complete for r in self._capture_regions.values())
def __str__(self): def __str__(self):
"""The string name of this file format.""" """The string name of this file format."""
return 'raw' return 'raw'
@ -194,6 +208,35 @@ class FileInspector(object):
return {name: len(region.data) for name, region in return {name: len(region.data) for name, region in
self._capture_regions.items()} self._capture_regions.items()}
@classmethod
def from_file(cls, filename):
"""Read as much of a file as necessary to complete inspection.
NOTE: Because we only read as much of the file as necessary, the
actual_size property will not reflect the size of the file, but the
amount of data we read before we satisfied the inspector.
Raises ImageFormatError if we cannot parse the file.
"""
inspector = cls()
with open(filename, 'rb') as f:
for chunk in chunked_reader(f):
inspector.eat_chunk(chunk)
if inspector.complete:
# No need to eat any more data
break
if not inspector.complete or not inspector.format_match:
raise ImageFormatError('File is not in requested format')
return inspector
def safety_check(self):
"""Perform some checks to determine if this file is safe.
Returns True if safe, False otherwise. It may raise ImageFormatError
if safety cannot be guaranteed because of parsing or other errors.
"""
return True
# The qcow2 format consists of a big-endian 72-byte header, of which # The qcow2 format consists of a big-endian 72-byte header, of which
# only a small portion has information we care about: # only a small portion has information we care about:
@ -202,15 +245,26 @@ class FileInspector(object):
# 0 0x00 Magic 4-bytes 'QFI\xfb' # 0 0x00 Magic 4-bytes 'QFI\xfb'
# 4 0x04 Version (uint32_t, should always be 2 for modern files) # 4 0x04 Version (uint32_t, should always be 2 for modern files)
# . . . # . . .
# 8 0x08 Backing file offset (uint64_t)
# 24 0x18 Size in bytes (unint64_t) # 24 0x18 Size in bytes (unint64_t)
# . . .
# 72 0x48 Incompatible features bitfield (6 bytes)
# #
# https://people.gnome.org/~markmc/qcow-image-format.html # https://gitlab.com/qemu-project/qemu/-/blob/master/docs/interop/qcow2.txt
class QcowInspector(FileInspector): class QcowInspector(FileInspector):
"""QEMU QCOW2 Format """QEMU QCOW2 Format
This should only require about 32 bytes of the beginning of the file This should only require about 32 bytes of the beginning of the file
to determine the virtual size. to determine the virtual size, and 104 bytes to perform the safety check.
""" """
BF_OFFSET = 0x08
BF_OFFSET_LEN = 8
I_FEATURES = 0x48
I_FEATURES_LEN = 8
I_FEATURES_DATAFILE_BIT = 3
I_FEATURES_MAX_BIT = 4
def __init__(self, *a, **k): def __init__(self, *a, **k):
super(QcowInspector, self).__init__(*a, **k) super(QcowInspector, self).__init__(*a, **k)
self.new_region('header', CaptureRegion(0, 512)) self.new_region('header', CaptureRegion(0, 512))
@ -220,6 +274,10 @@ class QcowInspector(FileInspector):
struct.unpack('>4sIQIIQ', self.region('header').data[:32])) struct.unpack('>4sIQIIQ', self.region('header').data[:32]))
return magic, size return magic, size
@property
def has_header(self):
return self.region('header').complete
@property @property
def virtual_size(self): def virtual_size(self):
if not self.region('header').complete: if not self.region('header').complete:
@ -236,9 +294,77 @@ class QcowInspector(FileInspector):
magic, size = self._qcow_header_data() magic, size = self._qcow_header_data()
return magic == b'QFI\xFB' return magic == b'QFI\xFB'
@property
def has_backing_file(self):
if not self.region('header').complete:
return None
if not self.format_match:
return False
bf_offset_bytes = self.region('header').data[
self.BF_OFFSET:self.BF_OFFSET + self.BF_OFFSET_LEN]
# nonzero means "has a backing file"
bf_offset, = struct.unpack('>Q', bf_offset_bytes)
return bf_offset != 0
@property
def has_unknown_features(self):
if not self.region('header').complete:
return None
if not self.format_match:
return False
i_features = self.region('header').data[
self.I_FEATURES:self.I_FEATURES + self.I_FEATURES_LEN]
# This is the maximum byte number we should expect any bits to be set
max_byte = self.I_FEATURES_MAX_BIT // 8
# The flag bytes are in big-endian ordering, so if we process
# them in index-order, they're reversed
for i, byte_num in enumerate(reversed(range(self.I_FEATURES_LEN))):
if byte_num == max_byte:
# If we're in the max-allowed byte, allow any bits less than
# the maximum-known feature flag bit to be set
allow_mask = ((1 << self.I_FEATURES_MAX_BIT) - 1)
elif byte_num > max_byte:
# If we're above the byte with the maximum known feature flag
# bit, then we expect all zeroes
allow_mask = 0x0
else:
# Any earlier-than-the-maximum byte can have any of the flag
# bits set
allow_mask = 0xFF
if i_features[i] & ~allow_mask:
LOG.warning('Found unknown feature bit in byte %i: %s/%s',
byte_num, bin(i_features[byte_num] & ~allow_mask),
bin(allow_mask))
return True
return False
@property
def has_data_file(self):
if not self.region('header').complete:
return None
if not self.format_match:
return False
i_features = self.region('header').data[
self.I_FEATURES:self.I_FEATURES + self.I_FEATURES_LEN]
# First byte of bitfield, which is i_features[7]
byte = self.I_FEATURES_LEN - 1 - self.I_FEATURES_DATAFILE_BIT // 8
# Third bit of bitfield, which is 0x04
bit = 1 << (self.I_FEATURES_DATAFILE_BIT - 1 % 8)
return bool(i_features[byte] & bit)
def __str__(self): def __str__(self):
return 'qcow2' return 'qcow2'
def safety_check(self):
return (not self.has_backing_file and
not self.has_data_file and
not self.has_unknown_features)
# The VHD (or VPC as QEMU calls it) format consists of a big-endian # The VHD (or VPC as QEMU calls it) format consists of a big-endian
# 512-byte "footer" at the beginning of the file with various # 512-byte "footer" at the beginning of the file with various

View File

@ -51,18 +51,28 @@ class TestFormatInspectors(test_utils.BaseTestCase):
except Exception: except Exception:
pass pass
def _create_img(self, fmt, size, subformat=None): def _create_img(self, fmt, size, subformat=None, options=None,
backing_file=None):
if fmt == 'vhd': if fmt == 'vhd':
# QEMU calls the vhd format vpc # QEMU calls the vhd format vpc
fmt = 'vpc' fmt = 'vpc'
if options is None:
options = {}
opt = '' opt = ''
prefix = 'glance-unittest-formatinspector-' prefix = 'glance-unittest-formatinspector-'
if subformat: if subformat:
opt = ' -o subformat=%s' % subformat options['subformat'] = subformat
prefix += subformat + '-' prefix += subformat + '-'
if options:
opt += '-o ' + ','.join('%s=%s' % (k, v)
for k, v in options.items())
if backing_file is not None:
opt += ' -b %s -F raw' % backing_file
fn = tempfile.mktemp(prefix=prefix, fn = tempfile.mktemp(prefix=prefix,
suffix='.%s' % fmt) suffix='.%s' % fmt)
self._created_files.append(fn) self._created_files.append(fn)
@ -160,6 +170,15 @@ class TestFormatInspectors(test_utils.BaseTestCase):
def test_vmdk_stream_optimized(self): def test_vmdk_stream_optimized(self):
self._test_format('vmdk', 'streamOptimized') self._test_format('vmdk', 'streamOptimized')
def test_from_file_reads_minimum(self):
img = self._create_img('qcow2', 10 * units.Mi)
file_size = os.stat(img).st_size
fmt = format_inspector.QcowInspector.from_file(img)
# We know everything we need from the first 512 bytes of a QCOW image,
# so make sure that we did not read the whole thing when we inspect
# a local file.
self.assertLess(fmt.actual_size, file_size)
def _test_vmdk_bad_descriptor_offset(self, subformat=None): def _test_vmdk_bad_descriptor_offset(self, subformat=None):
format_name = 'vmdk' format_name = 'vmdk'
image_size = 10 * units.Mi image_size = 10 * units.Mi
@ -231,6 +250,63 @@ class TestFormatInspectors(test_utils.BaseTestCase):
def test_vmdk_bad_descriptor_mem_limit_stream_optimized(self): def test_vmdk_bad_descriptor_mem_limit_stream_optimized(self):
self._test_vmdk_bad_descriptor_mem_limit(subformat='streamOptimized') self._test_vmdk_bad_descriptor_mem_limit(subformat='streamOptimized')
def test_qcow2_safety_checks(self):
# Create backing and data-file names (and initialize the backing file)
backing_fn = tempfile.mktemp(prefix='backing')
self._created_files.append(backing_fn)
with open(backing_fn, 'w') as f:
f.write('foobar')
data_fn = tempfile.mktemp(prefix='data')
self._created_files.append(data_fn)
# A qcow with no backing or data file is safe
fn = self._create_img('qcow2', 5 * units.Mi, None)
inspector = format_inspector.QcowInspector.from_file(fn)
self.assertTrue(inspector.safety_check())
# A backing file makes it unsafe
fn = self._create_img('qcow2', 5 * units.Mi, None,
backing_file=backing_fn)
inspector = format_inspector.QcowInspector.from_file(fn)
self.assertFalse(inspector.safety_check())
# A data-file makes it unsafe
fn = self._create_img('qcow2', 5 * units.Mi,
options={'data_file': data_fn,
'data_file_raw': 'on'})
inspector = format_inspector.QcowInspector.from_file(fn)
self.assertFalse(inspector.safety_check())
# Trying to load a non-QCOW file is an error
self.assertRaises(format_inspector.ImageFormatError,
format_inspector.QcowInspector.from_file,
backing_fn)
def test_qcow2_feature_flag_checks(self):
data = bytearray(512)
data[0:4] = b'QFI\xFB'
inspector = format_inspector.QcowInspector()
inspector.region('header').data = data
# All zeros, no feature flags - all good
self.assertFalse(inspector.has_unknown_features)
# A feature flag set in the first byte (highest-order) is not
# something we know about, so fail.
data[0x48] = 0x01
self.assertTrue(inspector.has_unknown_features)
# The first bit in the last byte (lowest-order) is known (the dirty
# bit) so that should pass
data[0x48] = 0x00
data[0x4F] = 0x01
self.assertFalse(inspector.has_unknown_features)
# Currently (as of 2024), the high-order feature flag bit in the low-
# order byte is not assigned, so make sure we reject it.
data[0x4F] = 0x80
self.assertTrue(inspector.has_unknown_features)
def test_vdi(self): def test_vdi(self):
self._test_format('vdi') self._test_format('vdi')