Extend format_inspector for QCOW safety

This adds two properties to the QcowInspector that makes it able to
indicate whether the file specifies a backing_file or data_file in the
header. Both conditions are considered unsafe for our usage. To
ease checking of this condition, a classmethod is added that takes
a local filename and digests just enough of the file to assert that
both conditions are false.

Note for yoga backport:With older qemu installed one of the qemu-img
create commands fails, let's skip it from unmaintained/yoga and
below that.

Change-Id: Iaf86b525397d41bd116999cabe0954a0a7efac65
Related-Bug: #2059809
(cherry picked from commit ae536bb394793c9a7a219cb498e03d5c81dbbbb7)
(cherry picked from commit 2eba54e0821106097dfeceb424e53943fd090483)
(cherry picked from commit 89dbbc838d606f461087e1494d19ddbcf9db0a38)
(cherry picked from commit 4860024286256b028fabc5ed50274934c3dfdd8a)
(cherry picked from commit f32d5b8ad865113d499a36f7507a085f583514f9)
This commit is contained in:
Dan Smith 2024-04-16 10:29:10 -07:00 committed by elajkat
parent f7f7249b34
commit a28d96bb8c
2 changed files with 209 additions and 5 deletions
glance

@ -28,6 +28,14 @@ from oslo_log import log as logging
LOG = logging.getLogger(__name__)
def chunked_reader(fileobj, chunk_size=512):
while True:
chunk = fileobj.read(chunk_size)
if not chunk:
break
yield chunk
class CaptureRegion(object):
"""Represents a region of a file we want to capture.
@ -176,10 +184,16 @@ class FileInspector(object):
@property
def actual_size(self):
"""Returns the total size of the file, usually smaller than
virtual_size.
virtual_size. NOTE: this will only be accurate if the entire
file is read and processed.
"""
return self._total_count
@property
def complete(self):
"""Returns True if we have all the information needed."""
return all(r.complete for r in self._capture_regions.values())
def __str__(self):
"""The string name of this file format."""
return 'raw'
@ -194,6 +208,35 @@ class FileInspector(object):
return {name: len(region.data) for name, region in
self._capture_regions.items()}
@classmethod
def from_file(cls, filename):
"""Read as much of a file as necessary to complete inspection.
NOTE: Because we only read as much of the file as necessary, the
actual_size property will not reflect the size of the file, but the
amount of data we read before we satisfied the inspector.
Raises ImageFormatError if we cannot parse the file.
"""
inspector = cls()
with open(filename, 'rb') as f:
for chunk in chunked_reader(f):
inspector.eat_chunk(chunk)
if inspector.complete:
# No need to eat any more data
break
if not inspector.complete or not inspector.format_match:
raise ImageFormatError('File is not in requested format')
return inspector
def safety_check(self):
"""Perform some checks to determine if this file is safe.
Returns True if safe, False otherwise. It may raise ImageFormatError
if safety cannot be guaranteed because of parsing or other errors.
"""
return True
# The qcow2 format consists of a big-endian 72-byte header, of which
# only a small portion has information we care about:
@ -202,15 +245,26 @@ class FileInspector(object):
# 0 0x00 Magic 4-bytes 'QFI\xfb'
# 4 0x04 Version (uint32_t, should always be 2 for modern files)
# . . .
# 8 0x08 Backing file offset (uint64_t)
# 24 0x18 Size in bytes (unint64_t)
# . . .
# 72 0x48 Incompatible features bitfield (6 bytes)
#
# https://people.gnome.org/~markmc/qcow-image-format.html
# https://gitlab.com/qemu-project/qemu/-/blob/master/docs/interop/qcow2.txt
class QcowInspector(FileInspector):
"""QEMU QCOW2 Format
This should only require about 32 bytes of the beginning of the file
to determine the virtual size.
to determine the virtual size, and 104 bytes to perform the safety check.
"""
BF_OFFSET = 0x08
BF_OFFSET_LEN = 8
I_FEATURES = 0x48
I_FEATURES_LEN = 8
I_FEATURES_DATAFILE_BIT = 3
I_FEATURES_MAX_BIT = 4
def __init__(self, *a, **k):
super(QcowInspector, self).__init__(*a, **k)
self.new_region('header', CaptureRegion(0, 512))
@ -220,6 +274,10 @@ class QcowInspector(FileInspector):
struct.unpack('>4sIQIIQ', self.region('header').data[:32]))
return magic, size
@property
def has_header(self):
return self.region('header').complete
@property
def virtual_size(self):
if not self.region('header').complete:
@ -236,9 +294,77 @@ class QcowInspector(FileInspector):
magic, size = self._qcow_header_data()
return magic == b'QFI\xFB'
@property
def has_backing_file(self):
if not self.region('header').complete:
return None
if not self.format_match:
return False
bf_offset_bytes = self.region('header').data[
self.BF_OFFSET:self.BF_OFFSET + self.BF_OFFSET_LEN]
# nonzero means "has a backing file"
bf_offset, = struct.unpack('>Q', bf_offset_bytes)
return bf_offset != 0
@property
def has_unknown_features(self):
if not self.region('header').complete:
return None
if not self.format_match:
return False
i_features = self.region('header').data[
self.I_FEATURES:self.I_FEATURES + self.I_FEATURES_LEN]
# This is the maximum byte number we should expect any bits to be set
max_byte = self.I_FEATURES_MAX_BIT // 8
# The flag bytes are in big-endian ordering, so if we process
# them in index-order, they're reversed
for i, byte_num in enumerate(reversed(range(self.I_FEATURES_LEN))):
if byte_num == max_byte:
# If we're in the max-allowed byte, allow any bits less than
# the maximum-known feature flag bit to be set
allow_mask = ((1 << self.I_FEATURES_MAX_BIT) - 1)
elif byte_num > max_byte:
# If we're above the byte with the maximum known feature flag
# bit, then we expect all zeroes
allow_mask = 0x0
else:
# Any earlier-than-the-maximum byte can have any of the flag
# bits set
allow_mask = 0xFF
if i_features[i] & ~allow_mask:
LOG.warning('Found unknown feature bit in byte %i: %s/%s',
byte_num, bin(i_features[byte_num] & ~allow_mask),
bin(allow_mask))
return True
return False
@property
def has_data_file(self):
if not self.region('header').complete:
return None
if not self.format_match:
return False
i_features = self.region('header').data[
self.I_FEATURES:self.I_FEATURES + self.I_FEATURES_LEN]
# First byte of bitfield, which is i_features[7]
byte = self.I_FEATURES_LEN - 1 - self.I_FEATURES_DATAFILE_BIT // 8
# Third bit of bitfield, which is 0x04
bit = 1 << (self.I_FEATURES_DATAFILE_BIT - 1 % 8)
return bool(i_features[byte] & bit)
def __str__(self):
return 'qcow2'
def safety_check(self):
return (not self.has_backing_file and
not self.has_data_file and
not self.has_unknown_features)
# The VHD (or VPC as QEMU calls it) format consists of a big-endian
# 512-byte "footer" at the beginning of the file with various

@ -51,18 +51,28 @@ class TestFormatInspectors(test_utils.BaseTestCase):
except Exception:
pass
def _create_img(self, fmt, size, subformat=None):
def _create_img(self, fmt, size, subformat=None, options=None,
backing_file=None):
if fmt == 'vhd':
# QEMU calls the vhd format vpc
fmt = 'vpc'
if options is None:
options = {}
opt = ''
prefix = 'glance-unittest-formatinspector-'
if subformat:
opt = ' -o subformat=%s' % subformat
options['subformat'] = subformat
prefix += subformat + '-'
if options:
opt += '-o ' + ','.join('%s=%s' % (k, v)
for k, v in options.items())
if backing_file is not None:
opt += ' -b %s -F raw' % backing_file
fn = tempfile.mktemp(prefix=prefix,
suffix='.%s' % fmt)
self._created_files.append(fn)
@ -160,6 +170,15 @@ class TestFormatInspectors(test_utils.BaseTestCase):
def test_vmdk_stream_optimized(self):
self._test_format('vmdk', 'streamOptimized')
def test_from_file_reads_minimum(self):
img = self._create_img('qcow2', 10 * units.Mi)
file_size = os.stat(img).st_size
fmt = format_inspector.QcowInspector.from_file(img)
# We know everything we need from the first 512 bytes of a QCOW image,
# so make sure that we did not read the whole thing when we inspect
# a local file.
self.assertLess(fmt.actual_size, file_size)
def _test_vmdk_bad_descriptor_offset(self, subformat=None):
format_name = 'vmdk'
image_size = 10 * units.Mi
@ -231,6 +250,65 @@ class TestFormatInspectors(test_utils.BaseTestCase):
def test_vmdk_bad_descriptor_mem_limit_stream_optimized(self):
self._test_vmdk_bad_descriptor_mem_limit(subformat='streamOptimized')
def test_qcow2_safety_checks(self):
# Create backing and data-file names (and initialize the backing file)
backing_fn = tempfile.mktemp(prefix='backing')
self._created_files.append(backing_fn)
with open(backing_fn, 'w') as f:
f.write('foobar')
data_fn = tempfile.mktemp(prefix='data')
self._created_files.append(data_fn)
# A qcow with no backing or data file is safe
fn = self._create_img('qcow2', 5 * units.Mi, None)
inspector = format_inspector.QcowInspector.from_file(fn)
self.assertTrue(inspector.safety_check())
# A backing file makes it unsafe
fn = self._create_img('qcow2', 5 * units.Mi, None,
backing_file=backing_fn)
inspector = format_inspector.QcowInspector.from_file(fn)
self.assertFalse(inspector.safety_check())
# Note(lajoskatona): This image create fails on bionic due to
# old qemu-img utilities, let's skip this only test from yoga
# A data-file makes it unsafe
# fn = self._create_img('qcow2', 5 * units.Mi,
# options={'data_file': data_fn,
# 'data_file_raw': 'on'})
# inspector = format_inspector.QcowInspector.from_file(fn)
# self.assertFalse(inspector.safety_check())
# Trying to load a non-QCOW file is an error
self.assertRaises(format_inspector.ImageFormatError,
format_inspector.QcowInspector.from_file,
backing_fn)
def test_qcow2_feature_flag_checks(self):
data = bytearray(512)
data[0:4] = b'QFI\xFB'
inspector = format_inspector.QcowInspector()
inspector.region('header').data = data
# All zeros, no feature flags - all good
self.assertFalse(inspector.has_unknown_features)
# A feature flag set in the first byte (highest-order) is not
# something we know about, so fail.
data[0x48] = 0x01
self.assertTrue(inspector.has_unknown_features)
# The first bit in the last byte (lowest-order) is known (the dirty
# bit) so that should pass
data[0x48] = 0x00
data[0x4F] = 0x01
self.assertFalse(inspector.has_unknown_features)
# Currently (as of 2024), the high-order feature flag bit in the low-
# order byte is not assigned, so make sure we reject it.
data[0x4F] = 0x80
self.assertTrue(inspector.has_unknown_features)
def test_vdi(self):
self._test_format('vdi')