diff --git a/cinder/image/format_inspector.py b/cinder/image/format_inspector.py
new file mode 100644
index 00000000000..ede09005ea0
--- /dev/null
+++ b/cinder/image/format_inspector.py
@@ -0,0 +1,938 @@
+# Copyright 2020 Red Hat, Inc
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+This is a python implementation of virtual disk format inspection routines
+gathered from various public specification documents, as well as qemu disk
+driver code. It attempts to store and parse the minimum amount of data
+required, and in a streaming-friendly manner to collect metadata about
+complex-format images.
+"""
+
+import struct
+
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+def chunked_reader(fileobj, chunk_size=512):
+    while True:
+        chunk = fileobj.read(chunk_size)
+        if not chunk:
+            break
+        yield chunk
+
+
+class CaptureRegion(object):
+    """Represents a region of a file we want to capture.
+
+    A region of a file we want to capture requires a byte offset into
+    the file and a length. This is expected to be used by a data
+    processing loop, calling capture() with the most recently-read
+    chunk. This class handles the task of grabbing the desired region
+    of data across potentially multiple fractional and unaligned reads.
+
+    :param offset: Byte offset into the file starting the region
+    :param length: The length of the region
+    """
+    def __init__(self, offset, length):
+        self.offset = offset
+        self.length = length
+        self.data = b''
+
+    @property
+    def complete(self):
+        """Returns True when we have captured the desired data."""
+        return self.length == len(self.data)
+
+    def capture(self, chunk, current_position):
+        """Process a chunk of data.
+
+        This should be called for each chunk in the read loop, at least
+        until complete returns True.
+
+        :param chunk: A chunk of bytes in the file
+        :param current_position: The position of the file processed by the
+                                 read loop so far. Note that this will be
+                                 the position in the file *after* the chunk
+                                 being presented.
+        """
+        read_start = current_position - len(chunk)
+        if (read_start <= self.offset <= current_position or
+                self.offset <= read_start <= (self.offset + self.length)):
+            if read_start < self.offset:
+                lead_gap = self.offset - read_start
+            else:
+                lead_gap = 0
+            self.data += chunk[lead_gap:]
+            self.data = self.data[:self.length]
+
+
+class ImageFormatError(Exception):
+    """An unrecoverable image format error that aborts the process."""
+    pass
+
+
+class TraceDisabled(object):
+    """A logger-like thing that swallows tracing when we do not want it."""
+    def debug(self, *a, **k):
+        pass
+
+    info = debug
+    warning = debug
+    error = debug
+
+
+class FileInspector(object):
+    """A stream-based disk image inspector.
+
+    This base class works on raw images and is subclassed for more
+    complex types. It is to be presented with the file to be examined
+    one chunk at a time, during read processing and will only store
+    as much data as necessary to determine required attributes of
+    the file.
+    """
+
+    def __init__(self, tracing=False):
+        self._total_count = 0
+
+        # NOTE(danms): The logging in here is extremely verbose for a reason,
+        # but should never really be enabled at that level at runtime. To
+        # retain all that work and assist in future debug, we have a separate
+        # debug flag that can be passed from a manual tool to turn it on.
+        if tracing:
+            self._log = logging.getLogger(str(self))
+        else:
+            self._log = TraceDisabled()
+        self._capture_regions = {}
+
+    def _capture(self, chunk, only=None):
+        for name, region in self._capture_regions.items():
+            if only and name not in only:
+                continue
+            if not region.complete:
+                region.capture(chunk, self._total_count)
+
+    def eat_chunk(self, chunk):
+        """Call this to present chunks of the file to the inspector."""
+        pre_regions = set(self._capture_regions.keys())
+
+        # Increment our position-in-file counter
+        self._total_count += len(chunk)
+
+        # Run through the regions we know of to see if they want this
+        # data
+        self._capture(chunk)
+
+        # Let the format do some post-read processing of the stream
+        self.post_process()
+
+        # Check to see if the post-read processing added new regions
+        # which may require the current chunk.
+        new_regions = set(self._capture_regions.keys()) - pre_regions
+        if new_regions:
+            self._capture(chunk, only=new_regions)
+
+    def post_process(self):
+        """Post-read hook to process what has been read so far.
+
+        This will be called after each chunk is read and potentially captured
+        by the defined regions. If any regions are defined by this call,
+        those regions will be presented with the current chunk in case it
+        is within one of the new regions.
+        """
+        pass
+
+    def region(self, name):
+        """Get a CaptureRegion by name."""
+        return self._capture_regions[name]
+
+    def new_region(self, name, region):
+        """Add a new CaptureRegion by name."""
+        if self.has_region(name):
+            # This is a bug, we tried to add the same region twice
+            raise ImageFormatError('Inspector re-added region %s' % name)
+        self._capture_regions[name] = region
+
+    def has_region(self, name):
+        """Returns True if named region has been defined."""
+        return name in self._capture_regions
+
+    @property
+    def format_match(self):
+        """Returns True if the file appears to be the expected format."""
+        return True
+
+    @property
+    def virtual_size(self):
+        """Returns the virtual size of the disk image, or zero if unknown."""
+        return self._total_count
+
+    @property
+    def actual_size(self):
+        """Returns the total size of the file.
+
+        This is usually smaller than virtual_size. NOTE: this will only be
+        accurate if the entire file is read and processed.
+        """
+        return self._total_count
+
+    @property
+    def complete(self):
+        """Returns True if we have all the information needed."""
+        return all(r.complete for r in self._capture_regions.values())
+
+    def __str__(self):
+        """The string name of this file format."""
+        return 'raw'
+
+    @property
+    def context_info(self):
+        """Return info on amount of data held in memory for auditing.
+
+        This is a dict of region:sizeinbytes items that the inspector
+        uses to examine the file.
+        """
+        return {name: len(region.data) for name, region in
+                self._capture_regions.items()}
+
+    @classmethod
+    def from_file(cls, filename):
+        """Read as much of a file as necessary to complete inspection.
+
+        NOTE: Because we only read as much of the file as necessary, the
+        actual_size property will not reflect the size of the file, but the
+        amount of data we read before we satisfied the inspector.
+
+        Raises ImageFormatError if we cannot parse the file.
+        """
+        inspector = cls()
+        with open(filename, 'rb') as f:
+            for chunk in chunked_reader(f):
+                inspector.eat_chunk(chunk)
+                if inspector.complete:
+                    # No need to eat any more data
+                    break
+        if not inspector.complete or not inspector.format_match:
+            raise ImageFormatError('File is not in requested format')
+        return inspector
+
+    def safety_check(self):
+        """Perform some checks to determine if this file is safe.
+
+        Returns True if safe, False otherwise. It may raise ImageFormatError
+        if safety cannot be guaranteed because of parsing or other errors.
+        """
+        return True
+
+
+# The qcow2 format consists of a big-endian 72-byte header, of which
+# only a small portion has information we care about:
+#
+# Dec   Hex   Name
+#   0  0x00   Magic 4-bytes 'QFI\xfb'
+#   4  0x04   Version (uint32_t, should always be 2 for modern files)
+#  . . .
+#   8  0x08   Backing file offset (uint64_t)
+#  24  0x18   Size in bytes (unint64_t)
+#  . . .
+#  72  0x48   Incompatible features bitfield (6 bytes)
+#
+# https://gitlab.com/qemu-project/qemu/-/blob/master/docs/interop/qcow2.txt
+class QcowInspector(FileInspector):
+    """QEMU QCOW2 Format
+
+    This should only require about 32 bytes of the beginning of the file
+    to determine the virtual size, and 104 bytes to perform the safety check.
+    """
+
+    BF_OFFSET = 0x08
+    BF_OFFSET_LEN = 8
+    I_FEATURES = 0x48
+    I_FEATURES_LEN = 8
+    I_FEATURES_DATAFILE_BIT = 3
+    I_FEATURES_MAX_BIT = 4
+
+    def __init__(self, *a, **k):
+        super(QcowInspector, self).__init__(*a, **k)
+        self.new_region('header', CaptureRegion(0, 512))
+
+    def _qcow_header_data(self):
+        magic, version, bf_offset, bf_sz, cluster_bits, size = (
+            struct.unpack('>4sIQIIQ', self.region('header').data[:32]))
+        return magic, size
+
+    @property
+    def has_header(self):
+        return self.region('header').complete
+
+    @property
+    def virtual_size(self):
+        if not self.region('header').complete:
+            return 0
+        if not self.format_match:
+            return 0
+        magic, size = self._qcow_header_data()
+        return size
+
+    @property
+    def format_match(self):
+        if not self.region('header').complete:
+            return False
+        magic, size = self._qcow_header_data()
+        return magic == b'QFI\xFB'
+
+    @property
+    def has_backing_file(self):
+        if not self.region('header').complete:
+            return None
+        if not self.format_match:
+            return False
+        bf_offset_bytes = self.region('header').data[
+            self.BF_OFFSET:self.BF_OFFSET + self.BF_OFFSET_LEN]
+        # nonzero means "has a backing file"
+        bf_offset, = struct.unpack('>Q', bf_offset_bytes)
+        return bf_offset != 0
+
+    @property
+    def has_unknown_features(self):
+        if not self.region('header').complete:
+            return None
+        if not self.format_match:
+            return False
+        i_features = self.region('header').data[
+            self.I_FEATURES:self.I_FEATURES + self.I_FEATURES_LEN]
+
+        # This is the maximum byte number we should expect any bits to be set
+        max_byte = self.I_FEATURES_MAX_BIT // 8
+
+        # The flag bytes are in big-endian ordering, so if we process
+        # them in index-order, they're reversed
+        for i, byte_num in enumerate(reversed(range(self.I_FEATURES_LEN))):
+            if byte_num == max_byte:
+                # If we're in the max-allowed byte, allow any bits less than
+                # the maximum-known feature flag bit to be set
+                allow_mask = ((1 << self.I_FEATURES_MAX_BIT) - 1)
+            elif byte_num > max_byte:
+                # If we're above the byte with the maximum known feature flag
+                # bit, then we expect all zeroes
+                allow_mask = 0x0
+            else:
+                # Any earlier-than-the-maximum byte can have any of the flag
+                # bits set
+                allow_mask = 0xFF
+
+            if i_features[i] & ~allow_mask:
+                LOG.warning('Found unknown feature bit in byte %i: %s/%s',
+                            byte_num, bin(i_features[byte_num] & ~allow_mask),
+                            bin(allow_mask))
+                return True
+
+        return False
+
+    @property
+    def has_data_file(self):
+        if not self.region('header').complete:
+            return None
+        if not self.format_match:
+            return False
+        i_features = self.region('header').data[
+            self.I_FEATURES:self.I_FEATURES + self.I_FEATURES_LEN]
+
+        # First byte of bitfield, which is i_features[7]
+        byte = self.I_FEATURES_LEN - 1 - self.I_FEATURES_DATAFILE_BIT // 8
+        # Third bit of bitfield, which is 0x04
+        bit = 1 << (self.I_FEATURES_DATAFILE_BIT - 1 % 8)
+        return bool(i_features[byte] & bit)
+
+    def __str__(self):
+        return 'qcow2'
+
+    def safety_check(self):
+        return (not self.has_backing_file and
+                not self.has_data_file and
+                not self.has_unknown_features)
+
+    def safety_check_allow_backing_file(self):
+        return (not self.has_data_file and
+                not self.has_unknown_features)
+
+
+class QEDInspector(FileInspector):
+    def __init__(self, tracing=False):
+        super().__init__(tracing)
+        self.new_region('header', CaptureRegion(0, 512))
+
+    @property
+    def format_match(self):
+        if not self.region('header').complete:
+            return False
+        return self.region('header').data.startswith(b'QED\x00')
+
+    def safety_check(self):
+        # QED format is not supported by anyone, but we want to detect it
+        # and mark it as just always unsafe.
+        return False
+
+
+# The VHD (or VPC as QEMU calls it) format consists of a big-endian
+# 512-byte "footer" at the beginning of the file with various
+# information, most of which does not matter to us:
+#
+# Dec   Hex   Name
+#   0  0x00   Magic string (8-bytes, always 'conectix')
+#  40  0x28   Disk size (uint64_t)
+#
+# https://github.com/qemu/qemu/blob/master/block/vpc.c
+class VHDInspector(FileInspector):
+    """Connectix/MS VPC VHD Format
+
+    This should only require about 512 bytes of the beginning of the file
+    to determine the virtual size.
+    """
+    def __init__(self, *a, **k):
+        super(VHDInspector, self).__init__(*a, **k)
+        self.new_region('header', CaptureRegion(0, 512))
+
+    @property
+    def format_match(self):
+        return self.region('header').data.startswith(b'conectix')
+
+    @property
+    def virtual_size(self):
+        if not self.region('header').complete:
+            return 0
+
+        if not self.format_match:
+            return 0
+
+        return struct.unpack('>Q', self.region('header').data[40:48])[0]
+
+    def __str__(self):
+        return 'vhd'
+
+
+# The VHDX format consists of a complex dynamic little-endian
+# structure with multiple regions of metadata and data, linked by
+# offsets with in the file (and within regions), identified by MSFT
+# GUID strings. The header is a 320KiB structure, only a few pieces of
+# which we actually need to capture and interpret:
+#
+#     Dec    Hex  Name
+#      0 0x00000  Identity (Technically 9-bytes, padded to 64KiB, the first
+#                 8 bytes of which are 'vhdxfile')
+# 196608 0x30000  The Region table (64KiB of a 32-byte header, followed
+#                 by up to 2047 36-byte region table entry structures)
+#
+# The region table header includes two items we need to read and parse,
+# which are:
+#
+# 196608 0x30000  4-byte signature ('regi')
+# 196616 0x30008  Entry count (uint32-t)
+#
+# The region table entries follow the region table header immediately
+# and are identified by a 16-byte GUID, and provide an offset of the
+# start of that region. We care about the "metadata region", identified
+# by the METAREGION class variable. The region table entry is (offsets
+# from the beginning of the entry, since it could be in multiple places):
+#
+#      0 0x00000 16-byte MSFT GUID
+#     16 0x00010 Offset of the actual metadata region (uint64_t)
+#
+# When we find the METAREGION table entry, we need to grab that offset
+# and start examining the region structure at that point. That
+# consists of a metadata table of structures, which point to places in
+# the data in an unstructured space that follows. The header is
+# (offsets relative to the region start):
+#
+#      0 0x00000 8-byte signature ('metadata')
+#      . . .
+#     16 0x00010 2-byte entry count (up to 2047 entries max)
+#
+# This header is followed by the specified number of metadata entry
+# structures, identified by GUID:
+#
+#      0 0x00000 16-byte MSFT GUID
+#     16 0x00010 4-byte offset (uint32_t, relative to the beginning of
+#                the metadata region)
+#
+# We need to find the "Virtual Disk Size" metadata item, identified by
+# the GUID in the VIRTUAL_DISK_SIZE class variable, grab the offset,
+# add it to the offset of the metadata region, and examine that 8-byte
+# chunk of data that follows.
+#
+# The "Virtual Disk Size" is a naked uint64_t which contains the size
+# of the virtual disk, and is our ultimate target here.
+#
+# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-vhdx/83e061f8-f6e2-4de1-91bd-5d518a43d477
+class VHDXInspector(FileInspector):
+    """MS VHDX Format
+
+    This requires some complex parsing of the stream. The first 256KiB
+    of the image is stored to get the header and region information,
+    and then we capture the first metadata region to read those
+    records, find the location of the virtual size data and parse
+    it. This needs to store the metadata table entries up until the
+    VDS record, which may consist of up to 2047 32-byte entries at
+    max.  Finally, it must store a chunk of data at the offset of the
+    actual VDS uint64.
+
+    """
+    METAREGION = '8B7CA206-4790-4B9A-B8FE-575F050F886E'
+    VIRTUAL_DISK_SIZE = '2FA54224-CD1B-4876-B211-5DBED83BF4B8'
+    VHDX_METADATA_TABLE_MAX_SIZE = 32 * 2048  # From qemu
+
+    def __init__(self, *a, **k):
+        super(VHDXInspector, self).__init__(*a, **k)
+        self.new_region('ident', CaptureRegion(0, 32))
+        self.new_region('header', CaptureRegion(192 * 1024, 64 * 1024))
+
+    def post_process(self):
+        # After reading a chunk, we may have the following conditions:
+        #
+        # 1. We may have just completed the header region, and if so,
+        #    we need to immediately read and calculate the location of
+        #    the metadata region, as it may be starting in the same
+        #    read we just did.
+        # 2. We may have just completed the metadata region, and if so,
+        #    we need to immediately calculate the location of the
+        #    "virtual disk size" record, as it may be starting in the
+        #    same read we just did.
+        if self.region('header').complete and not self.has_region('metadata'):
+            region = self._find_meta_region()
+            if region:
+                self.new_region('metadata', region)
+        elif self.has_region('metadata') and not self.has_region('vds'):
+            region = self._find_meta_entry(self.VIRTUAL_DISK_SIZE)
+            if region:
+                self.new_region('vds', region)
+
+    @property
+    def format_match(self):
+        return self.region('ident').data.startswith(b'vhdxfile')
+
+    @staticmethod
+    def _guid(buf):
+        """Format a MSFT GUID from the 16-byte input buffer."""
+        guid_format = '<IHHBBBBBBBB'
+        return '%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X' % (
+            struct.unpack(guid_format, buf))
+
+    def _find_meta_region(self):
+        # The region table entries start after a 16-byte table header
+        region_entry_first = 16
+
+        # Parse the region table header to find the number of regions
+        regi, cksum, count, reserved = struct.unpack(
+            '<IIII', self.region('header').data[:16])
+        if regi != 0x69676572:
+            raise ImageFormatError('Region signature not found at %x' % (
+                self.region('header').offset))
+
+        if count >= 2048:
+            raise ImageFormatError('Region count is %i (limit 2047)' % count)
+
+        # Process the regions until we find the metadata one; grab the
+        # offset and return
+        self._log.debug('Region entry first is %x', region_entry_first)
+        self._log.debug('Region entries %i', count)
+        meta_offset = 0
+        for i in range(0, count):
+            entry_start = region_entry_first + (i * 32)
+            entry_end = entry_start + 32
+            entry = self.region('header').data[entry_start:entry_end]
+            self._log.debug('Entry offset is %x', entry_start)
+
+            # GUID is the first 16 bytes
+            guid = self._guid(entry[:16])
+            if guid == self.METAREGION:
+                # This entry is the metadata region entry
+                meta_offset, meta_len, meta_req = struct.unpack(
+                    '<QII', entry[16:])
+                self._log.debug('Meta entry %i specifies offset: %x',
+                                i, meta_offset)
+                # NOTE(danms): The meta_len in the region descriptor is the
+                # entire size of the metadata table and data. This can be
+                # very large, so we should only capture the size required
+                # for the maximum length of the table, which is one 32-byte
+                # table header, plus up to 2047 32-byte entries.
+                meta_len = 2048 * 32
+                return CaptureRegion(meta_offset, meta_len)
+
+        self._log.warning('Did not find metadata region')
+        return None
+
+    def _find_meta_entry(self, desired_guid):
+        meta_buffer = self.region('metadata').data
+        if len(meta_buffer) < 32:
+            # Not enough data yet for full header
+            return None
+
+        # Make sure we found the metadata region by checking the signature
+        sig, reserved, count = struct.unpack('<8sHH', meta_buffer[:12])
+        if sig != b'metadata':
+            raise ImageFormatError(
+                'Invalid signature for metadata region: %r' % sig)
+
+        entries_size = 32 + (count * 32)
+        if len(meta_buffer) < entries_size:
+            # Not enough data yet for all metadata entries. This is not
+            # strictly necessary as we could process whatever we have until
+            # we find the V-D-S one, but there are only 2047 32-byte
+            # entries max (~64k).
+            return None
+
+        if count >= 2048:
+            raise ImageFormatError(
+                'Metadata item count is %i (limit 2047)' % count)
+
+        for i in range(0, count):
+            entry_offset = 32 + (i * 32)
+            guid = self._guid(meta_buffer[entry_offset:entry_offset + 16])
+            if guid == desired_guid:
+                # Found the item we are looking for by id.
+                # Stop our region from capturing
+                item_offset, item_length, _reserved = struct.unpack(
+                    '<III',
+                    meta_buffer[entry_offset + 16:entry_offset + 28])
+                item_length = min(item_length,
+                                  self.VHDX_METADATA_TABLE_MAX_SIZE)
+                self.region('metadata').length = len(meta_buffer)
+                self._log.debug('Found entry at offset %x', item_offset)
+                # Metadata item offset is from the beginning of the metadata
+                # region, not the file.
+                return CaptureRegion(
+                    self.region('metadata').offset + item_offset,
+                    item_length)
+
+        self._log.warning('Did not find guid %s', desired_guid)
+        return None
+
+    @property
+    def virtual_size(self):
+        # Until we have found the offset and have enough metadata buffered
+        # to read it, return "unknown"
+        if not self.has_region('vds') or not self.region('vds').complete:
+            return 0
+
+        size, = struct.unpack('<Q', self.region('vds').data)
+        return size
+
+    def __str__(self):
+        return 'vhdx'
+
+
+# The VMDK format comes in a large number of variations, but the
+# single-file 'monolithicSparse' version 4 one is mostly what we care
+# about. It contains a 512-byte little-endian header, followed by a
+# variable-length "descriptor" region of text. The header looks like:
+#
+#   Dec  Hex  Name
+#     0 0x00  4-byte magic string 'KDMV'
+#     4 0x04  Version (uint32_t)
+#     8 0x08  Flags (uint32_t, unused by us)
+#    16 0x10  Number of 512 byte sectors in the disk (uint64_t)
+#    24 0x18  Granularity (uint64_t, unused by us)
+#    32 0x20  Descriptor offset in 512-byte sectors (uint64_t)
+#    40 0x28  Descriptor size in 512-byte sectors (uint64_t)
+#
+# After we have the header, we need to find the descriptor region,
+# which starts at the sector identified in the "descriptor offset"
+# field, and is "descriptor size" 512-byte sectors long. Once we have
+# that region, we need to parse it as text, looking for the
+# createType=XXX line that specifies the mechanism by which the data
+# extents are stored in this file. We only support the
+# "monolithicSparse" format, so we just need to confirm that this file
+# contains that specifier.
+#
+# https://www.vmware.com/app/vmdk/?src=vmdk
+class VMDKInspector(FileInspector):
+    """vmware VMDK format (monolithicSparse and streamOptimized variants only)
+
+    This needs to store the 512 byte header and the descriptor region
+    which should be just after that. The descriptor region is some
+    variable number of 512 byte sectors, but is just text defining the
+    layout of the disk.
+    """
+
+    # The beginning and max size of the descriptor is also hardcoded in Qemu
+    # at 0x200 and 1MB - 1
+    DESC_OFFSET = 0x200
+    DESC_MAX_SIZE = (1 << 20) - 1
+    GD_AT_END = 0xffffffffffffffff
+
+    def __init__(self, *a, **k):
+        super(VMDKInspector, self).__init__(*a, **k)
+        self.new_region('header', CaptureRegion(0, 512))
+
+    def post_process(self):
+        # If we have just completed the header region, we need to calculate
+        # the location and length of the descriptor, which should immediately
+        # follow and may have been partially-read in this read.
+        if not self.region('header').complete:
+            return
+
+        (sig, ver, _flags, _sectors, _grain, desc_sec, desc_num,
+         _numGTEsperGT, _rgdOffset, gdOffset) = struct.unpack(
+            '<4sIIQQQQIQQ', self.region('header').data[:64])
+
+        if sig != b'KDMV':
+            raise ImageFormatError('Signature KDMV not found: %r' % sig)
+
+        if ver not in (1, 2, 3):
+            raise ImageFormatError('Unsupported format version %i' % ver)
+
+        if gdOffset == self.GD_AT_END:
+            # This means we have a footer, which takes precedence over the
+            # header, which we cannot support since we stream.
+            raise ImageFormatError('Unsupported VMDK footer')
+
+        # Since we parse both desc_sec and desc_num (the location of the
+        # VMDK's descriptor, expressed in 512 bytes sectors) we enforce a
+        # check on the bounds to create a reasonable CaptureRegion. This
+        # is similar to how it's done in qemu.
+        desc_offset = desc_sec * 512
+        desc_size = min(desc_num * 512, self.DESC_MAX_SIZE)
+        if desc_offset != self.DESC_OFFSET:
+            raise ImageFormatError("Wrong descriptor location")
+
+        if not self.has_region('descriptor'):
+            self.new_region('descriptor', CaptureRegion(
+                desc_offset, desc_size))
+
+    @property
+    def format_match(self):
+        return self.region('header').data.startswith(b'KDMV')
+
+    @property
+    def virtual_size(self):
+        if not self.has_region('descriptor'):
+            # Not enough data yet
+            return 0
+
+        descriptor_rgn = self.region('descriptor')
+        if not descriptor_rgn.complete:
+            # Not enough data yet
+            return 0
+
+        descriptor = descriptor_rgn.data
+        type_idx = descriptor.index(b'createType="') + len(b'createType="')
+        type_end = descriptor.find(b'"', type_idx)
+        # Make sure we don't grab and log a huge chunk of data in a
+        # maliciously-formatted descriptor region
+        if type_end - type_idx < 64:
+            vmdktype = descriptor[type_idx:type_end]
+        else:
+            vmdktype = b'formatnotfound'
+        if vmdktype not in (b'monolithicSparse', b'streamOptimized'):
+            LOG.warning('Unsupported VMDK format %s', vmdktype)
+            return 0
+
+        # If we have the descriptor, we definitely have the header
+        _sig, _ver, _flags, sectors, _grain, _desc_sec, _desc_num = (
+            struct.unpack('<IIIQQQQ', self.region('header').data[:44]))
+
+        return sectors * 512
+
+    def safety_check(self):
+        if (not self.has_region('descriptor') or
+                not self.region('descriptor').complete):
+            return False
+
+        try:
+            # Descriptor is padded to 512 bytes
+            desc_data = self.region('descriptor').data.rstrip(b'\x00')
+            # Descriptor is actually case-insensitive ASCII text
+            desc_text = desc_data.decode('ascii').lower()
+        except UnicodeDecodeError:
+            LOG.error('VMDK descriptor failed to decode as ASCII')
+            raise ImageFormatError('Invalid VMDK descriptor data')
+
+        extent_access = ('rw', 'rdonly', 'noaccess')
+        header_fields = []
+        extents = []
+        ddb = []
+
+        # NOTE(danms): Cautiously parse the VMDK descriptor. Each line must
+        # be something we understand, otherwise we refuse it.
+        for line in [x.strip() for x in desc_text.split('\n')]:
+            if line.startswith('#') or not line:
+                # Blank or comment lines are ignored
+                continue
+            elif line.startswith('ddb'):
+                # DDB lines are allowed (but not used by us)
+                ddb.append(line)
+            elif '=' in line and ' ' not in line.split('=')[0]:
+                # Header fields are a single word followed by an '=' and some
+                # value
+                header_fields.append(line)
+            elif line.split(' ')[0] in extent_access:
+                # Extent lines start with one of the three access modes
+                extents.append(line)
+            else:
+                # Anything else results in a rejection
+                LOG.error('Unsupported line %r in VMDK descriptor', line)
+                raise ImageFormatError('Invalid VMDK descriptor data')
+
+        # Check all the extent lines for concerning content
+        for extent_line in extents:
+            if '/' in extent_line:
+                LOG.error('Extent line %r contains unsafe characters',
+                          extent_line)
+                return False
+
+        if not extents:
+            LOG.error('VMDK file specified no extents')
+            return False
+
+        return True
+
+    def __str__(self):
+        return 'vmdk'
+
+
+# The VirtualBox VDI format consists of a 512-byte little-endian
+# header, some of which we care about:
+#
+#  Dec   Hex  Name
+#   64  0x40  4-byte Magic (0xbeda107f)
+#   . . .
+#  368 0x170  Size in bytes (uint64_t)
+#
+# https://github.com/qemu/qemu/blob/master/block/vdi.c
+class VDIInspector(FileInspector):
+    """VirtualBox VDI format
+
+    This only needs to store the first 512 bytes of the image.
+    """
+    def __init__(self, *a, **k):
+        super(VDIInspector, self).__init__(*a, **k)
+        self.new_region('header', CaptureRegion(0, 512))
+
+    @property
+    def format_match(self):
+        if not self.region('header').complete:
+            return False
+
+        signature, = struct.unpack('<I', self.region('header').data[0x40:0x44])
+        return signature == 0xbeda107f
+
+    @property
+    def virtual_size(self):
+        if not self.region('header').complete:
+            return 0
+        if not self.format_match:
+            return 0
+
+        size, = struct.unpack('<Q', self.region('header').data[0x170:0x178])
+        return size
+
+    def __str__(self):
+        return 'vdi'
+
+
+class InfoWrapper(object):
+    """A file-like object that wraps another and updates a format inspector.
+
+    This passes chunks to the format inspector while reading. If the inspector
+    fails, it logs the error and stops calling it, but continues proxying data
+    from the source to its user.
+    """
+    def __init__(self, source, fmt):
+        self._source = source
+        self._format = fmt
+        self._error = False
+
+    def __iter__(self):
+        return self
+
+    def _process_chunk(self, chunk):
+        if not self._error:
+            try:
+                self._format.eat_chunk(chunk)
+            except Exception as e:
+                # Absolutely do not allow the format inspector to break
+                # our streaming of the image. If we failed, just stop
+                # trying, log and keep going.
+                LOG.error('Format inspector failed, aborting: %s', e)
+                self._error = True
+
+    def __next__(self):
+        try:
+            chunk = next(self._source)
+        except StopIteration:
+            raise
+        self._process_chunk(chunk)
+        return chunk
+
+    def read(self, size):
+        chunk = self._source.read(size)
+        self._process_chunk(chunk)
+        return chunk
+
+    def close(self):
+        if hasattr(self._source, 'close'):
+            self._source.close()
+
+
+ALL_FORMATS = {
+    'raw': FileInspector,
+    'qcow2': QcowInspector,
+    'vhd': VHDInspector,
+    'vhdx': VHDXInspector,
+    'vmdk': VMDKInspector,
+    'vdi': VDIInspector,
+    'qed': QEDInspector,
+}
+
+
+def get_inspector(format_name):
+    """Returns a FormatInspector class based on the given name.
+
+    :param format_name: The name of the disk_format (raw, qcow2, etc).
+    :returns: A FormatInspector or None if unsupported.
+    """
+    return ALL_FORMATS.get(format_name)
+
+
+def detect_file_format(filename):
+    """Attempts to detect the format of a file.
+
+    This runs through a file one time, running all the known inspectors in
+    parallel. It stops reading the file once one of them matches or all of
+    them are sure they don't match.
+
+    Returns the FileInspector that matched, if any. None if 'raw'.
+    """
+    inspectors = {k: v() for k, v in ALL_FORMATS.items()}
+    with open(filename, 'rb') as f:
+        for chunk in chunked_reader(f):
+            for format, inspector in list(inspectors.items()):
+                try:
+                    inspector.eat_chunk(chunk)
+                except ImageFormatError:
+                    # No match, so stop considering this format
+                    inspectors.pop(format)
+                    continue
+                if (inspector.format_match and inspector.complete and
+                        format != 'raw'):
+                    # First complete match (other than raw) wins
+                    return inspector
+            if all(i.complete for i in inspectors.values()):
+                # If all the inspectors are sure they are not a match, avoid
+                # reading to the end of the file to settle on 'raw'.
+                break
+    return inspectors['raw']
diff --git a/cinder/image/image_utils.py b/cinder/image/image_utils.py
index 4828a0d91df..30dd17a0555 100644
--- a/cinder/image/image_utils.py
+++ b/cinder/image/image_utils.py
@@ -52,6 +52,7 @@ from cinder import exception
 from cinder.i18n import _
 from cinder.image import accelerator
 from cinder.image import glance
+import cinder.privsep.format_inspector
 from cinder import utils
 from cinder.volume import throttling
 from cinder.volume import volume_utils
@@ -160,11 +161,26 @@ def from_qemu_img_disk_format(disk_format: str) -> str:
     return QEMU_IMG_FORMAT_MAP_INV.get(disk_format, disk_format)
 
 
-def qemu_img_info(path: str,
-                  run_as_root: bool = True,
-                  force_share: bool = False) -> imageutils.QemuImgInfo:
+def qemu_img_info(
+        path: str,
+        run_as_root: bool = True,
+        force_share: bool = False,
+        allow_qcow2_backing_file: bool = False) -> imageutils.QemuImgInfo:
     """Return an object containing the parsed output from qemu-img info."""
-    cmd = ['env', 'LC_ALL=C', 'qemu-img', 'info', '--output=json']
+
+    format_name = cinder.privsep.format_inspector.get_format_if_safe(
+        path=path,
+        allow_qcow2_backing_file=allow_qcow2_backing_file)
+    if format_name is None:
+        LOG.warning('Image/Volume %s failed safety check', path)
+        # NOTE(danms): This is the same exception as would be raised
+        # by qemu_img_info() if the disk format was unreadable or
+        # otherwise unsuitable.
+        raise exception.Invalid(
+            reason=_('Image/Volume failed safety check'))
+
+    cmd = ['env', 'LC_ALL=C', 'qemu-img', 'info',
+           '-f', format_name, '--output=json']
     if force_share:
         if qemu_img_supports_force_share():
             cmd.append('--force-share')
@@ -180,8 +196,32 @@ def qemu_img_info(path: str,
                               prlimit=QEMU_IMG_LIMITS)
     info = imageutils.QemuImgInfo(out, format='json')
 
+    # FIXME: figure out a more elegant way to do this
+    if info.file_format == 'raw':
+        # The format_inspector will detect a luks image as 'raw', and then when
+        # we call qemu-img info -f raw above, we don't get any of the luks
+        # format-specific info (some of which is used in the create_volume
+        # flow).  So we need to check if this is really a luks container.
+        # (We didn't have to do this in the past because we called
+        # qemu-img info without -f.)
+        cmd = ['env', 'LC_ALL=C', 'qemu-img', 'info',
+               '-f', 'luks', '--output=json']
+        if force_share:
+            cmd.append('--force-share')
+        cmd.append(path)
+        if os.name == 'nt':
+            cmd = cmd[2:]
+        try:
+            out, _err = utils.execute(*cmd, run_as_root=run_as_root,
+                                      prlimit=QEMU_IMG_LIMITS)
+            info = imageutils.QemuImgInfo(out, format='json')
+        except processutils.ProcessExecutionError:
+            # we'll just use the info object we already got earlier
+            pass
+
     # From Cinder's point of view, any 'luks' formatted images
-    # should be treated as 'raw'.
+    # should be treated as 'raw'.  (This changes the file_format, but
+    # not any of the format-specific information.)
     if info.file_format == 'luks':
         info.file_format = 'raw'
 
@@ -687,6 +727,35 @@ def get_qemu_data(image_id: str,
     return data
 
 
+def check_qcow2_image(image_id: str, data: imageutils.QemuImgInfo) -> None:
+    """Check some rules about qcow2 images.
+
+    Does not check for a backing_file, because cinder has some legitimate
+    use cases for qcow2 backing files.
+
+    Makes sure the image:
+
+    - does not have a data_file
+
+    :param image_id: the image id
+    :param data: an imageutils.QemuImgInfo object
+    :raises ImageUnacceptable: when the image fails the check
+    """
+    try:
+        data_file = data.format_specific['data'].get('data-file')
+    except (KeyError, TypeError):
+        LOG.debug('Unexpected response from qemu-img info when processing '
+                  'image %s: missing format-specific info for a qcow2 image',
+                  image_id)
+        msg = _('Cannot determine format-specific information')
+        raise exception.ImageUnacceptable(image_id=image_id, reason=msg)
+    if data_file:
+        LOG.warning("Refusing to process qcow2 file with data-file '%s'",
+                    data_file)
+        msg = _('A qcow2 format image is not allowed to have a data file')
+        raise exception.ImageUnacceptable(image_id=image_id, reason=msg)
+
+
 def check_vmdk_image(image_id: str, data: imageutils.QemuImgInfo) -> None:
     """Check some rules about VMDK images.
 
@@ -777,6 +846,8 @@ def check_image_format(source: str,
 
     if data.file_format == 'vmdk':
         check_vmdk_image(image_id, data)
+    if data.file_format == 'qcow2':
+        check_qcow2_image(image_id, data)
 
 
 def fetch_verify_image(context: context.RequestContext,
@@ -819,6 +890,11 @@ def fetch_verify_image(context: context.RequestContext,
             if fmt == 'vmdk':
                 check_vmdk_image(image_id, data)
 
+            # Bug #2059809: a qcow2 can have a data file that's similar
+            # to a backing file and is also unacceptable
+            if fmt == 'qcow2':
+                check_qcow2_image(image_id, data)
+
 
 def fetch_to_vhd(context: context.RequestContext,
                  image_service: glance.GlanceImageService,
diff --git a/cinder/privsep/format_inspector.py b/cinder/privsep/format_inspector.py
new file mode 100644
index 00000000000..b89f5009492
--- /dev/null
+++ b/cinder/privsep/format_inspector.py
@@ -0,0 +1,38 @@
+# Copyright 2024 Red Hat, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Helpers for the image format_inspector.
+"""
+
+
+from cinder.image import format_inspector
+import cinder.privsep
+
+
+@cinder.privsep.sys_admin_pctxt.entrypoint
+def get_format_if_safe(path, allow_qcow2_backing_file):
+    """Returns a str format name if the format is safe, otherwise None"""
+    return _get_format_if_safe(path, allow_qcow2_backing_file)
+
+
+def _get_format_if_safe(path, allow_qcow2_backing_file):
+    """Returns a str format name if the format is safe, otherwise None"""
+    inspector = format_inspector.detect_file_format(path)
+    format_name = str(inspector)
+    safe = inspector.safety_check()
+    if not safe and format_name == 'qcow2' and allow_qcow2_backing_file:
+        safe = inspector.safety_check_allow_backing_file()
+    if safe:
+        return format_name
diff --git a/cinder/tests/unit/image/test_format_inspector.py b/cinder/tests/unit/image/test_format_inspector.py
new file mode 100644
index 00000000000..2687461b4a1
--- /dev/null
+++ b/cinder/tests/unit/image/test_format_inspector.py
@@ -0,0 +1,515 @@
+# Copyright 2020 Red Hat, Inc
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import io
+import os
+import re
+import struct
+import subprocess
+import tempfile
+from unittest import mock
+
+from oslo_utils import units
+
+from cinder.image import format_inspector
+from cinder.tests.unit import test
+
+
+def get_size_from_qemu_img(filename):
+    output = subprocess.check_output('qemu-img info "%s"' % filename,
+                                     shell=True)
+    for line in output.split(b'\n'):
+        m = re.search(b'^virtual size: .* .([0-9]+) bytes', line.strip())
+        if m:
+            return int(m.group(1))
+
+    raise Exception('Could not find virtual size with qemu-img')
+
+
+class TestFormatInspectors(test.TestCase):
+    def setUp(self):
+        super(TestFormatInspectors, self).setUp()
+        self._created_files = []
+
+    def tearDown(self):
+        super(TestFormatInspectors, self).tearDown()
+        for fn in self._created_files:
+            try:
+                os.remove(fn)
+            except Exception:
+                pass
+
+    def _create_img(self, fmt, size, subformat=None, options=None,
+                    backing_file=None):
+        if fmt == 'vhd':
+            # QEMU calls the vhd format vpc
+            fmt = 'vpc'
+
+        if options is None:
+            options = {}
+        opt = ''
+        prefix = 'glance-unittest-formatinspector-'
+
+        if subformat:
+            options['subformat'] = subformat
+            prefix += subformat + '-'
+
+        if options:
+            opt += '-o ' + ','.join('%s=%s' % (k, v)
+                                    for k, v in options.items())
+
+        if backing_file is not None:
+            opt += ' -b %s -F raw' % backing_file
+
+        fn = tempfile.mktemp(prefix=prefix,
+                             suffix='.%s' % fmt)
+        self._created_files.append(fn)
+        subprocess.check_output(
+            'qemu-img create -f %s %s %s %i' % (fmt, opt, fn, size),
+            shell=True)
+        return fn
+
+    def _create_allocated_vmdk(self, size_mb, subformat=None):
+        # We need a "big" VMDK file to exercise some parts of the code of the
+        # format_inspector. A way to create one is to first create an empty
+        # file, and then to convert it with the -S 0 option.
+
+        if subformat is None:
+            # Matches qemu-img default, see `qemu-img convert -O vmdk -o help`
+            subformat = 'monolithicSparse'
+
+        prefix = 'glance-unittest-formatinspector-%s-' % subformat
+        fn = tempfile.mktemp(prefix=prefix, suffix='.vmdk')
+        self._created_files.append(fn)
+        raw = tempfile.mktemp(prefix=prefix, suffix='.raw')
+        self._created_files.append(raw)
+
+        # Create a file with pseudo-random data, otherwise it will get
+        # compressed in the streamOptimized format
+        subprocess.check_output(
+            'dd if=/dev/urandom of=%s bs=1M count=%i' % (raw, size_mb),
+            shell=True)
+
+        # Convert it to VMDK
+        subprocess.check_output(
+            'qemu-img convert -f raw -O vmdk -o subformat=%s -S 0 %s %s' % (
+                subformat, raw, fn),
+            shell=True)
+        return fn
+
+    def _test_format_at_block_size(self, format_name, img, block_size):
+        fmt = format_inspector.get_inspector(format_name)()
+        self.assertIsNotNone(fmt,
+                             'Did not get format inspector for %s' % (
+                                 format_name))
+        wrapper = format_inspector.InfoWrapper(open(img, 'rb'), fmt)
+
+        while True:
+            chunk = wrapper.read(block_size)
+            if not chunk:
+                break
+
+        wrapper.close()
+        return fmt
+
+    def _test_format_at_image_size(self, format_name, image_size,
+                                   subformat=None):
+        img = self._create_img(format_name, image_size, subformat=subformat)
+
+        # Some formats have internal alignment restrictions making this not
+        # always exactly like image_size, so get the real value for comparison
+        virtual_size = get_size_from_qemu_img(img)
+
+        # Read the format in various sizes, some of which will read whole
+        # sections in a single read, others will be completely unaligned, etc.
+        for block_size in (64 * units.Ki, 512, 17, 1 * units.Mi):
+            fmt = self._test_format_at_block_size(format_name, img, block_size)
+            self.assertTrue(fmt.format_match,
+                            'Failed to match %s at size %i block %i' % (
+                                format_name, image_size, block_size))
+            self.assertEqual(virtual_size, fmt.virtual_size,
+                             ('Failed to calculate size for %s at size %i '
+                              'block %i') % (format_name, image_size,
+                                             block_size))
+            memory = sum(fmt.context_info.values())
+            self.assertLess(memory, 512 * units.Ki,
+                            'Format used more than 512KiB of memory: %s' % (
+                                fmt.context_info))
+
+    def _test_format(self, format_name, subformat=None):
+        # Try a few different image sizes, including some odd and very small
+        # sizes
+        for image_size in (512, 513, 2057, 7):
+            self._test_format_at_image_size(format_name, image_size * units.Mi,
+                                            subformat=subformat)
+
+    def test_qcow2(self):
+        self._test_format('qcow2')
+
+    def test_vhd(self):
+        self._test_format('vhd')
+
+    def test_vhdx(self):
+        self._test_format('vhdx')
+
+    def test_vmdk(self):
+        self._test_format('vmdk')
+
+    def test_vmdk_stream_optimized(self):
+        self._test_format('vmdk', 'streamOptimized')
+
+    def test_from_file_reads_minimum(self):
+        img = self._create_img('qcow2', 10 * units.Mi)
+        file_size = os.stat(img).st_size
+        fmt = format_inspector.QcowInspector.from_file(img)
+        # We know everything we need from the first 512 bytes of a QCOW image,
+        # so make sure that we did not read the whole thing when we inspect
+        # a local file.
+        self.assertLess(fmt.actual_size, file_size)
+
+    def test_qed_always_unsafe(self):
+        img = self._create_img('qed', 10 * units.Mi)
+        fmt = format_inspector.get_inspector('qed').from_file(img)
+        self.assertTrue(fmt.format_match)
+        self.assertFalse(fmt.safety_check())
+
+    def _test_vmdk_bad_descriptor_offset(self, subformat=None):
+        format_name = 'vmdk'
+        image_size = 10 * units.Mi
+        descriptorOffsetAddr = 0x1c
+        BAD_ADDRESS = 0x400
+        img = self._create_img(format_name, image_size, subformat=subformat)
+
+        # Corrupt the header
+        fd = open(img, 'r+b')
+        fd.seek(descriptorOffsetAddr)
+        fd.write(struct.pack('<Q', BAD_ADDRESS // 512))
+        fd.close()
+
+        # Read the format in various sizes, some of which will read whole
+        # sections in a single read, others will be completely unaligned, etc.
+        for block_size in (64 * units.Ki, 512, 17, 1 * units.Mi):
+            fmt = self._test_format_at_block_size(format_name, img, block_size)
+            self.assertTrue(fmt.format_match,
+                            'Failed to match %s at size %i block %i' % (
+                                format_name, image_size, block_size))
+            self.assertEqual(0, fmt.virtual_size,
+                             ('Calculated a virtual size for a corrupt %s at '
+                              'size %i block %i') % (format_name, image_size,
+                                                     block_size))
+
+    def test_vmdk_bad_descriptor_offset(self):
+        self._test_vmdk_bad_descriptor_offset()
+
+    def test_vmdk_bad_descriptor_offset_stream_optimized(self):
+        self._test_vmdk_bad_descriptor_offset(subformat='streamOptimized')
+
+    def _test_vmdk_bad_descriptor_mem_limit(self, subformat=None):
+        format_name = 'vmdk'
+        image_size = 5 * units.Mi
+        virtual_size = 5 * units.Mi
+        descriptorOffsetAddr = 0x1c
+        descriptorSizeAddr = descriptorOffsetAddr + 8
+        twoMBInSectors = (2 << 20) // 512
+        # We need a big VMDK because otherwise we will not have enough data to
+        # fill-up the CaptureRegion.
+        img = self._create_allocated_vmdk(image_size // units.Mi,
+                                          subformat=subformat)
+
+        # Corrupt the end of descriptor address so it "ends" at 2MB
+        fd = open(img, 'r+b')
+        fd.seek(descriptorSizeAddr)
+        fd.write(struct.pack('<Q', twoMBInSectors))
+        fd.close()
+
+        # Read the format in various sizes, some of which will read whole
+        # sections in a single read, others will be completely unaligned, etc.
+        for block_size in (64 * units.Ki, 512, 17, 1 * units.Mi):
+            fmt = self._test_format_at_block_size(format_name, img, block_size)
+            self.assertTrue(fmt.format_match,
+                            'Failed to match %s at size %i block %i' % (
+                                format_name, image_size, block_size))
+            self.assertEqual(virtual_size, fmt.virtual_size,
+                             ('Failed to calculate size for %s at size %i '
+                              'block %i') % (format_name, image_size,
+                                             block_size))
+            memory = sum(fmt.context_info.values())
+            self.assertLess(memory, 1.5 * units.Mi,
+                            'Format used more than 1.5MiB of memory: %s' % (
+                                fmt.context_info))
+
+    def test_vmdk_bad_descriptor_mem_limit(self):
+        self._test_vmdk_bad_descriptor_mem_limit()
+
+    def test_vmdk_bad_descriptor_mem_limit_stream_optimized(self):
+        self._test_vmdk_bad_descriptor_mem_limit(subformat='streamOptimized')
+
+    def test_qcow2_safety_checks(self):
+        # Create backing and data-file names (and initialize the backing file)
+        backing_fn = tempfile.mktemp(prefix='backing')
+        self._created_files.append(backing_fn)
+        with open(backing_fn, 'w') as f:
+            f.write('foobar')
+        data_fn = tempfile.mktemp(prefix='data')
+        self._created_files.append(data_fn)
+
+        # A qcow with no backing or data file is safe
+        fn = self._create_img('qcow2', 5 * units.Mi, None)
+        inspector = format_inspector.QcowInspector.from_file(fn)
+        self.assertTrue(inspector.safety_check())
+        self.assertTrue(inspector.safety_check_allow_backing_file())
+
+        # A backing file makes it unsafe
+        fn = self._create_img('qcow2', 5 * units.Mi, None,
+                              backing_file=backing_fn)
+        inspector = format_inspector.QcowInspector.from_file(fn)
+        self.assertFalse(inspector.safety_check())
+        # ... unless we explictly allow a backing file
+        self.assertTrue(inspector.safety_check_allow_backing_file())
+
+        # A data-file makes it unsafe
+        fn = self._create_img('qcow2', 5 * units.Mi,
+                              options={'data_file': data_fn,
+                                       'data_file_raw': 'on'})
+        inspector = format_inspector.QcowInspector.from_file(fn)
+        self.assertFalse(inspector.safety_check())
+        # ... and it remains unsafe even if we allow a backing file
+        self.assertFalse(inspector.safety_check_allow_backing_file())
+
+        # Trying to load a non-QCOW file is an error
+        self.assertRaises(format_inspector.ImageFormatError,
+                          format_inspector.QcowInspector.from_file,
+                          backing_fn)
+
+    def test_qcow2_feature_flag_checks(self):
+        data = bytearray(512)
+        data[0:4] = b'QFI\xFB'
+        inspector = format_inspector.QcowInspector()
+        inspector.region('header').data = data
+
+        # All zeros, no feature flags - all good
+        self.assertFalse(inspector.has_unknown_features)
+
+        # A feature flag set in the first byte (highest-order) is not
+        # something we know about, so fail.
+        data[0x48] = 0x01
+        self.assertTrue(inspector.has_unknown_features)
+
+        # The first bit in the last byte (lowest-order) is known (the dirty
+        # bit) so that should pass
+        data[0x48] = 0x00
+        data[0x4F] = 0x01
+        self.assertFalse(inspector.has_unknown_features)
+
+        # Currently (as of 2024), the high-order feature flag bit in the low-
+        # order byte is not assigned, so make sure we reject it.
+        data[0x4F] = 0x80
+        self.assertTrue(inspector.has_unknown_features)
+
+    def test_vdi(self):
+        self._test_format('vdi')
+
+    def _test_format_with_invalid_data(self, format_name):
+        fmt = format_inspector.get_inspector(format_name)()
+        wrapper = format_inspector.InfoWrapper(open(__file__, 'rb'), fmt)
+        while True:
+            chunk = wrapper.read(32)
+            if not chunk:
+                break
+
+        wrapper.close()
+        self.assertFalse(fmt.format_match)
+        self.assertEqual(0, fmt.virtual_size)
+        memory = sum(fmt.context_info.values())
+        self.assertLess(memory, 512 * units.Ki,
+                        'Format used more than 512KiB of memory: %s' % (
+                            fmt.context_info))
+
+    def test_qcow2_invalid(self):
+        self._test_format_with_invalid_data('qcow2')
+
+    def test_vhd_invalid(self):
+        self._test_format_with_invalid_data('vhd')
+
+    def test_vhdx_invalid(self):
+        self._test_format_with_invalid_data('vhdx')
+
+    def test_vmdk_invalid(self):
+        self._test_format_with_invalid_data('vmdk')
+
+    def test_vdi_invalid(self):
+        self._test_format_with_invalid_data('vdi')
+
+    def test_vmdk_invalid_type(self):
+        fmt = format_inspector.get_inspector('vmdk')()
+        wrapper = format_inspector.InfoWrapper(open(__file__, 'rb'), fmt)
+        while True:
+            chunk = wrapper.read(32)
+            if not chunk:
+                break
+
+        wrapper.close()
+
+        fake_rgn = mock.MagicMock()
+        fake_rgn.complete = True
+        fake_rgn.data = b'foocreateType="someunknownformat"bar'
+
+        with mock.patch.object(fmt, 'has_region', return_value=True):
+            with mock.patch.object(fmt, 'region', return_value=fake_rgn):
+                self.assertEqual(0, fmt.virtual_size)
+
+
+class TestFormatInspectorInfra(test.TestCase):
+    def _test_capture_region_bs(self, bs):
+        data = b''.join(chr(x).encode() for x in range(ord('A'), ord('z')))
+
+        regions = [
+            format_inspector.CaptureRegion(3, 9),
+            format_inspector.CaptureRegion(0, 256),
+            format_inspector.CaptureRegion(32, 8),
+        ]
+
+        for region in regions:
+            # None of them should be complete yet
+            self.assertFalse(region.complete)
+
+        pos = 0
+        for i in range(0, len(data), bs):
+            chunk = data[i:i + bs]
+            pos += len(chunk)
+            for region in regions:
+                region.capture(chunk, pos)
+
+        self.assertEqual(data[3:12], regions[0].data)
+        self.assertEqual(data[0:256], regions[1].data)
+        self.assertEqual(data[32:40], regions[2].data)
+
+        # The small regions should be complete
+        self.assertTrue(regions[0].complete)
+        self.assertTrue(regions[2].complete)
+
+        # This region extended past the available data, so not complete
+        self.assertFalse(regions[1].complete)
+
+    def test_capture_region(self):
+        for block_size in (1, 3, 7, 13, 32, 64):
+            self._test_capture_region_bs(block_size)
+
+    def _get_wrapper(self, data):
+        source = io.BytesIO(data)
+        fake_fmt = mock.create_autospec(format_inspector.get_inspector('raw'))
+        return format_inspector.InfoWrapper(source, fake_fmt)
+
+    def test_info_wrapper_file_like(self):
+        data = b''.join(chr(x).encode() for x in range(ord('A'), ord('z')))
+        wrapper = self._get_wrapper(data)
+
+        read_data = b''
+        while True:
+            chunk = wrapper.read(8)
+            if not chunk:
+                break
+            read_data += chunk
+
+        self.assertEqual(data, read_data)
+
+    def test_info_wrapper_iter_like(self):
+        data = b''.join(chr(x).encode() for x in range(ord('A'), ord('z')))
+        wrapper = self._get_wrapper(data)
+
+        read_data = b''
+        for chunk in wrapper:
+            read_data += chunk
+
+        self.assertEqual(data, read_data)
+
+    def test_info_wrapper_file_like_eats_error(self):
+        wrapper = self._get_wrapper(b'123456')
+        wrapper._format.eat_chunk.side_effect = Exception('fail')
+
+        data = b''
+        while True:
+            chunk = wrapper.read(3)
+            if not chunk:
+                break
+            data += chunk
+
+        # Make sure we got all the data despite the error
+        self.assertEqual(b'123456', data)
+
+        # Make sure we only called this once and never again after
+        # the error was raised
+        wrapper._format.eat_chunk.assert_called_once_with(b'123')
+
+    def test_info_wrapper_iter_like_eats_error(self):
+        fake_fmt = mock.create_autospec(format_inspector.get_inspector('raw'))
+        wrapper = format_inspector.InfoWrapper(iter([b'123', b'456']),
+                                               fake_fmt)
+        fake_fmt.eat_chunk.side_effect = Exception('fail')
+
+        data = b''
+        for chunk in wrapper:
+            data += chunk
+
+        # Make sure we got all the data despite the error
+        self.assertEqual(b'123456', data)
+
+        # Make sure we only called this once and never again after
+        # the error was raised
+        fake_fmt.eat_chunk.assert_called_once_with(b'123')
+
+    def test_get_inspector(self):
+        self.assertEqual(format_inspector.QcowInspector,
+                         format_inspector.get_inspector('qcow2'))
+        self.assertIsNone(format_inspector.get_inspector('foo'))
+
+
+class TestFormatInspectorsTargeted(test.TestCase):
+    def _make_vhd_meta(self, guid_raw, item_length):
+        # Meta region header, padded to 32 bytes
+        data = struct.pack('<8sHH', b'metadata', 0, 1)
+        data += b'0' * 20
+
+        # Metadata table entry, 16-byte GUID, 12-byte information,
+        # padded to 32-bytes
+        data += guid_raw
+        data += struct.pack('<III', 256, item_length, 0)
+        data += b'0' * 6
+
+        return data
+
+    def test_vhd_table_over_limit(self):
+        ins = format_inspector.VHDXInspector()
+        meta = format_inspector.CaptureRegion(0, 0)
+        desired = b'012345678ABCDEF0'
+        # This is a poorly-crafted image that specifies a larger table size
+        # than is allowed
+        meta.data = self._make_vhd_meta(desired, 33 * 2048)
+        ins.new_region('metadata', meta)
+        new_region = ins._find_meta_entry(ins._guid(desired))
+        # Make sure we clamp to our limit of 32 * 2048
+        self.assertEqual(
+            format_inspector.VHDXInspector.VHDX_METADATA_TABLE_MAX_SIZE,
+            new_region.length)
+
+    def test_vhd_table_under_limit(self):
+        ins = format_inspector.VHDXInspector()
+        meta = format_inspector.CaptureRegion(0, 0)
+        desired = b'012345678ABCDEF0'
+        meta.data = self._make_vhd_meta(desired, 16 * 2048)
+        ins.new_region('metadata', meta)
+        new_region = ins._find_meta_entry(ins._guid(desired))
+        # Table size was under the limit, make sure we get it back
+        self.assertEqual(16 * 2048, new_region.length)
diff --git a/cinder/tests/unit/privsep/test_format_inspector.py b/cinder/tests/unit/privsep/test_format_inspector.py
new file mode 100644
index 00000000000..ca5ae604c02
--- /dev/null
+++ b/cinder/tests/unit/privsep/test_format_inspector.py
@@ -0,0 +1,110 @@
+# Copyright 2024 Red Hat, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+from unittest import mock
+
+from cinder.privsep import format_inspector as pfi
+from cinder.tests.unit import test
+
+
+class TestFormatInspectorHelper(test.TestCase):
+    @mock.patch('cinder.image.format_inspector.detect_file_format')
+    def test_get_format_if_safe__happy_path(self, mock_detect):
+        mock_inspector = mock.MagicMock()
+        mock_inspector.__str__.return_value = 'mock_fmt'
+        mock_safety = mock_inspector.safety_check
+        mock_safety.return_value = True
+        mock_backing = mock_inspector.safety_check_allow_backing_file
+        mock_detect.return_value = mock_inspector
+
+        test_path = mock.sentinel.path
+
+        fmt_name = pfi._get_format_if_safe(path=test_path,
+                                           allow_qcow2_backing_file=False)
+        self.assertEqual(fmt_name, 'mock_fmt')
+        mock_safety.assert_called_once_with()
+        mock_backing.assert_not_called()
+
+    @mock.patch('cinder.image.format_inspector.detect_file_format')
+    def test_get_format_if_safe__allow_backing(self, mock_detect):
+        mock_inspector = mock.MagicMock()
+        mock_inspector.__str__.return_value = 'qcow2'
+        mock_safety = mock_inspector.safety_check
+        mock_safety.return_value = False
+        mock_backing = mock_inspector.safety_check_allow_backing_file
+        mock_backing.return_value = True
+        mock_detect.return_value = mock_inspector
+
+        test_path = mock.sentinel.path
+
+        fmt_name = pfi._get_format_if_safe(path=test_path,
+                                           allow_qcow2_backing_file=True)
+        self.assertEqual(fmt_name, 'qcow2')
+        mock_safety.assert_called_once_with()
+        mock_backing.assert_called_once_with()
+
+    @mock.patch('cinder.image.format_inspector.detect_file_format')
+    def test_get_format_if_safe__backing_fail(self, mock_detect):
+        """backing flag should only work for qcow2"""
+        mock_inspector = mock.MagicMock()
+        mock_inspector.__str__.return_value = 'mock_fmt'
+        mock_safety = mock_inspector.safety_check
+        mock_safety.return_value = False
+        mock_backing = mock_inspector.safety_check_allow_backing_file
+        mock_detect.return_value = mock_inspector
+
+        test_path = mock.sentinel.path
+
+        fmt_name = pfi._get_format_if_safe(path=test_path,
+                                           allow_qcow2_backing_file=True)
+        self.assertIsNone(fmt_name)
+        mock_safety.assert_called_once_with()
+        mock_backing.assert_not_called()
+
+    @mock.patch('cinder.image.format_inspector.detect_file_format')
+    def test_get_format_if_safe__allow_backing_but_other_problem(
+            self, mock_detect):
+        mock_inspector = mock.MagicMock()
+        mock_inspector.__str__.return_value = 'qcow2'
+        mock_safety = mock_inspector.safety_check
+        mock_safety.return_value = False
+        mock_backing = mock_inspector.safety_check_allow_backing_file
+        mock_backing.return_value = False
+        mock_detect.return_value = mock_inspector
+
+        test_path = mock.sentinel.path
+
+        fmt_name = pfi._get_format_if_safe(path=test_path,
+                                           allow_qcow2_backing_file=True)
+        self.assertIsNone(fmt_name)
+        mock_safety.assert_called_once_with()
+        mock_backing.assert_called_once_with()
+
+    @mock.patch('cinder.image.format_inspector.detect_file_format')
+    def test_get_format_if_safe__unsafe(self, mock_detect):
+        mock_inspector = mock.MagicMock()
+        mock_inspector.__str__.return_value = 'mock_fmt'
+        mock_safety = mock_inspector.safety_check
+        mock_safety.return_value = False
+        mock_backing = mock_inspector.safety_check_allow_backing_file
+        mock_detect.return_value = mock_inspector
+
+        test_path = mock.sentinel.path
+
+        fmt_name = pfi._get_format_if_safe(path=test_path,
+                                           allow_qcow2_backing_file=False)
+        self.assertIsNone(fmt_name)
+        mock_safety.assert_called_once_with()
+        mock_backing.assert_not_called()
diff --git a/cinder/tests/unit/test_image_utils.py b/cinder/tests/unit/test_image_utils.py
index b5b8167a40c..0c75629edde 100644
--- a/cinder/tests/unit/test_image_utils.py
+++ b/cinder/tests/unit/test_image_utils.py
@@ -30,55 +30,186 @@ from cinder.volume import throttling
 
 
 class TestQemuImgInfo(test.TestCase):
+    @mock.patch('cinder.privsep.format_inspector.get_format_if_safe')
     @mock.patch('os.name', new='posix')
     @mock.patch('oslo_utils.imageutils.QemuImgInfo')
     @mock.patch('cinder.utils.execute')
-    def test_qemu_img_info(self, mock_exec, mock_info):
+    def test_qemu_img_info(self, mock_exec, mock_info, mock_detect):
         mock_out = mock.sentinel.out
         mock_err = mock.sentinel.err
         test_path = mock.sentinel.path
         mock_exec.return_value = (mock_out, mock_err)
 
+        mock_detect.return_value = 'mock_fmt'
+
         output = image_utils.qemu_img_info(test_path)
-        mock_exec.assert_called_once_with('env', 'LC_ALL=C', 'qemu-img',
-                                          'info', '--output=json', test_path,
-                                          run_as_root=True,
-                                          prlimit=image_utils.QEMU_IMG_LIMITS)
+        mock_exec.assert_called_once_with(
+            'env', 'LC_ALL=C', 'qemu-img', 'info', '-f', 'mock_fmt',
+            '--output=json', test_path, run_as_root=True,
+            prlimit=image_utils.QEMU_IMG_LIMITS)
         self.assertEqual(mock_info.return_value, output)
+        mock_detect.assert_called_once_with(path=test_path,
+                                            allow_qcow2_backing_file=False)
 
+    @mock.patch('cinder.privsep.format_inspector.get_format_if_safe')
     @mock.patch('os.name', new='posix')
     @mock.patch('oslo_utils.imageutils.QemuImgInfo')
     @mock.patch('cinder.utils.execute')
-    def test_qemu_img_info_not_root(self, mock_exec, mock_info):
+    def test_qemu_img_info_qcow2_backing_ok(
+            self, mock_exec, mock_info, mock_detect):
         mock_out = mock.sentinel.out
         mock_err = mock.sentinel.err
         test_path = mock.sentinel.path
         mock_exec.return_value = (mock_out, mock_err)
 
+        mock_detect.return_value = 'qcow2'
+
+        output = image_utils.qemu_img_info(
+            test_path, allow_qcow2_backing_file=True)
+        mock_exec.assert_called_once_with(
+            'env', 'LC_ALL=C', 'qemu-img', 'info', '-f', 'qcow2',
+            '--output=json', test_path, run_as_root=True,
+            prlimit=image_utils.QEMU_IMG_LIMITS)
+        self.assertEqual(mock_info.return_value, output)
+        mock_detect.assert_called_once_with(path=test_path,
+                                            allow_qcow2_backing_file=True)
+
+    @mock.patch('cinder.privsep.format_inspector.get_format_if_safe')
+    @mock.patch('os.name', new='posix')
+    @mock.patch('oslo_utils.imageutils.QemuImgInfo')
+    @mock.patch('cinder.utils.execute')
+    def test_qemu_img_info_raw_not_luks(self, mock_exec, mock_info,
+                                        mock_detect):
+        """To determine if a raw image is luks, we call qemu-img twice."""
+        mock_out = mock.sentinel.out
+        mock_err = mock.sentinel.err
+        test_path = mock.sentinel.path
+        mock_exec.side_effect = [(mock_out, mock_err),
+                                 # it's not luks, so raise an error
+                                 processutils.ProcessExecutionError]
+
+        mock_detect.return_value = 'raw'
+
+        mock_data = mock.Mock()
+        mock_data.file_format = 'raw'
+        mock_info.return_value = mock_data
+
+        first = mock.call(
+            'env', 'LC_ALL=C', 'qemu-img', 'info', '-f', 'raw',
+            '--output=json', test_path, run_as_root=True,
+            prlimit=image_utils.QEMU_IMG_LIMITS)
+        second = mock.call(
+            'env', 'LC_ALL=C', 'qemu-img', 'info', '-f', 'luks',
+            '--output=json', test_path, run_as_root=True,
+            prlimit=image_utils.QEMU_IMG_LIMITS)
+
+        output = image_utils.qemu_img_info(test_path)
+        mock_exec.assert_has_calls([first, second])
+        mock_info.assert_called_once()
+        self.assertEqual(mock_info.return_value, output)
+        mock_detect.assert_called_once_with(path=test_path,
+                                            allow_qcow2_backing_file=False)
+
+    @mock.patch('cinder.privsep.format_inspector.get_format_if_safe')
+    @mock.patch('os.name', new='posix')
+    @mock.patch('oslo_utils.imageutils.QemuImgInfo')
+    @mock.patch('cinder.utils.execute')
+    def test_qemu_img_info_luks(self, mock_exec, mock_info, mock_detect):
+        # the format_inspector will identify the image as raw, but
+        # we will ask qemu-img for a second opinion, and it say luks
+        mock_out = mock.sentinel.out
+        mock_err = mock.sentinel.err
+        test_path = mock.sentinel.path
+        mock_exec.return_value = (mock_out, mock_err)
+
+        mock_detect.return_value = 'raw'
+
+        mock_data1 = mock.Mock(name='first_time')
+        mock_data1.file_format = 'raw'
+        mock_data2 = mock.Mock(name='second_time')
+        mock_data2.file_format = 'luks'
+        mock_info.side_effect = [mock_data1, mock_data2]
+
+        first = mock.call(
+            'env', 'LC_ALL=C', 'qemu-img', 'info', '-f', 'raw',
+            '--output=json', test_path, run_as_root=True,
+            prlimit=image_utils.QEMU_IMG_LIMITS)
+        second = mock.call(
+            'env', 'LC_ALL=C', 'qemu-img', 'info', '-f', 'luks',
+            '--output=json', test_path, run_as_root=True,
+            prlimit=image_utils.QEMU_IMG_LIMITS)
+
+        output = image_utils.qemu_img_info(test_path)
+        mock_exec.assert_has_calls([first, second])
+        self.assertEqual(2, mock_info.call_count)
+        self.assertEqual(mock_data2, output)
+        mock_detect.assert_called_once_with(path=test_path,
+                                            allow_qcow2_backing_file=False)
+
+    @mock.patch('cinder.privsep.format_inspector.get_format_if_safe')
+    @mock.patch('os.name', new='posix')
+    @mock.patch('oslo_utils.imageutils.QemuImgInfo')
+    @mock.patch('cinder.utils.execute')
+    def test_qemu_img_info_not_root(self, mock_exec, mock_info, mock_detect):
+        mock_out = mock.sentinel.out
+        mock_err = mock.sentinel.err
+        test_path = mock.sentinel.path
+        mock_exec.return_value = (mock_out, mock_err)
+
+        mock_detect.return_value = 'mock_fmt'
+
         output = image_utils.qemu_img_info(test_path,
                                            force_share=False,
                                            run_as_root=False)
-        mock_exec.assert_called_once_with('env', 'LC_ALL=C', 'qemu-img',
-                                          'info', '--output=json', test_path,
-                                          run_as_root=False,
-                                          prlimit=image_utils.QEMU_IMG_LIMITS)
+        mock_exec.assert_called_once_with(
+            'env', 'LC_ALL=C', 'qemu-img', 'info', '-f', 'mock_fmt',
+            '--output=json', test_path, run_as_root=False,
+            prlimit=image_utils.QEMU_IMG_LIMITS)
         self.assertEqual(mock_info.return_value, output)
+        mock_detect.assert_called_once_with(path=test_path,
+                                            allow_qcow2_backing_file=False)
 
+    @mock.patch('cinder.privsep.format_inspector.get_format_if_safe')
     @mock.patch('cinder.image.image_utils.os')
     @mock.patch('oslo_utils.imageutils.QemuImgInfo')
     @mock.patch('cinder.utils.execute')
-    def test_qemu_img_info_on_nt(self, mock_exec, mock_info, mock_os):
+    def test_qemu_img_info_on_nt(self, mock_exec, mock_info, mock_os,
+                                 mock_detect):
         mock_out = mock.sentinel.out
         mock_err = mock.sentinel.err
         test_path = mock.sentinel.path
         mock_exec.return_value = (mock_out, mock_err)
         mock_os.name = 'nt'
 
+        mock_detect.return_value = 'mock_fmt'
+
         output = image_utils.qemu_img_info(test_path)
-        mock_exec.assert_called_once_with('qemu-img', 'info', '--output=json',
-                                          test_path, run_as_root=True,
-                                          prlimit=image_utils.QEMU_IMG_LIMITS)
+        mock_exec.assert_called_once_with(
+            'qemu-img', 'info', '-f', 'mock_fmt', '--output=json',
+            test_path, run_as_root=True, prlimit=image_utils.QEMU_IMG_LIMITS)
         self.assertEqual(mock_info.return_value, output)
+        mock_detect.assert_called_once_with(path=test_path,
+                                            allow_qcow2_backing_file=False)
+
+    @mock.patch('cinder.privsep.format_inspector.get_format_if_safe')
+    @mock.patch('os.name', new='posix')
+    @mock.patch('cinder.utils.execute')
+    def test_qemu_img_info_malicious(self, mock_exec, mock_detect):
+        mock_out = mock.sentinel.out
+        mock_err = mock.sentinel.err
+        test_path = mock.sentinel.path
+        mock_exec.return_value = (mock_out, mock_err)
+
+        mock_detect.return_value = None
+
+        self.assertRaises(exception.Invalid,
+                          image_utils.qemu_img_info,
+                          test_path,
+                          force_share=False,
+                          run_as_root=False)
+        mock_exec.assert_not_called()
+        mock_detect.assert_called_once_with(path=test_path,
+                                            allow_qcow2_backing_file=False)
 
     @mock.patch('cinder.utils.execute')
     def test_get_qemu_img_version(self, mock_exec):
@@ -176,6 +307,7 @@ class TestConvertImage(test.TestCase):
         out_format = mock.sentinel.out_format
         mock_info.return_value.file_format = 'qcow2'
         mock_info.return_value.virtual_size = 1048576
+        mock_info.return_value.format_specific = {'data': {}}
         throttle = throttling.Throttle(prefix=['cgcmd'])
 
         with mock.patch('cinder.volume.volume_utils.check_for_odirect_support',
@@ -2158,6 +2290,68 @@ class TestImageUtils(test.TestCase):
                           256)
 
 
+@ddt.ddt(testNameFormat=ddt.TestNameFormat.INDEX_ONLY)
+class TestQcow2ImageChecks(test.TestCase):
+    def setUp(self):
+        super(TestQcow2ImageChecks, self).setUp()
+        # Test data from:
+        # $ qemu-img create -f qcow2 fake.qcow2 1M
+        # $ qemu-img info -f qcow2 fake.qcow2 --output=json
+        qemu_img_info = '''
+        {
+            "virtual-size": 1048576,
+            "filename": "fake.qcow2",
+            "cluster-size": 65536,
+            "format": "qcow2",
+            "actual-size": 200704,
+            "format-specific": {
+                "type": "qcow2",
+                "data": {
+                    "compat": "1.1",
+                    "compression-type": "zlib",
+                    "lazy-refcounts": false,
+                    "refcount-bits": 16,
+                    "corrupt": false,
+                    "extended-l2": false
+                }
+            },
+            "dirty-flag": false
+        }'''
+        self.qdata = imageutils.QemuImgInfo(qemu_img_info, format='json')
+
+    def test_check_qcow2_image_no_problem(self):
+        image_utils.check_qcow2_image(fake.IMAGE_ID, self.qdata)
+
+    def test_check_qcow2_image_with_datafile(self):
+        self.qdata.format_specific['data']['data-file'] = '/not/good'
+        e = self.assertRaises(exception.ImageUnacceptable,
+                              image_utils.check_qcow2_image,
+                              fake.IMAGE_ID,
+                              self.qdata)
+        self.assertIn('not allowed to have a data file', str(e))
+
+    def test_check_qcow2_image_with_backing_file(self):
+        # qcow2 backing file is done as a separate check because
+        # cinder has legitimate uses for a qcow2 with backing file
+        self.qdata.backing_file = '/this/is/ok'
+        image_utils.check_qcow2_image(fake.IMAGE_ID, self.qdata)
+
+    def test_check_qcow2_image_no_barf_bad_data(self):
+        # should never happen, but you never know ...
+        del self.qdata.format_specific['data']
+        e = self.assertRaises(exception.ImageUnacceptable,
+                              image_utils.check_qcow2_image,
+                              fake.IMAGE_ID,
+                              self.qdata)
+        self.assertIn('Cannot determine format-specific', str(e))
+        self.qdata.format_specific = None
+        e = self.assertRaises(exception.ImageUnacceptable,
+                              image_utils.check_qcow2_image,
+                              fake.IMAGE_ID,
+                              self.qdata)
+        self.assertIn('Cannot determine format-specific', str(e))
+
+
 @ddt.ddt(testNameFormat=ddt.TestNameFormat.INDEX_ONLY)
 class TestVmdkImageChecks(test.TestCase):
     def setUp(self):
@@ -2215,7 +2409,7 @@ class TestVmdkImageChecks(test.TestCase):
     def test_check_vmdk_image_handles_missing_info(self):
         expected = 'Unable to determine VMDK createType'
         # remove create-type
-        del(self.qdata_data['create-type'])
+        del (self.qdata_data['create-type'])
         iue = self.assertRaises(exception.ImageUnacceptable,
                                 image_utils.check_vmdk_image,
                                 fake.IMAGE_ID,
@@ -2223,7 +2417,7 @@ class TestVmdkImageChecks(test.TestCase):
         self.assertIn(expected, str(iue))
 
         # remove entire data section
-        del(self.qdata_data)
+        del (self.qdata_data)
         iue = self.assertRaises(exception.ImageUnacceptable,
                                 image_utils.check_vmdk_image,
                                 fake.IMAGE_ID,
@@ -2391,9 +2585,11 @@ class TestImageFormatCheck(test.TestCase):
         }'''
         self.qdata = imageutils.QemuImgInfo(qemu_img_info, format='json')
 
+    @mock.patch('cinder.image.image_utils.check_qcow2_image')
     @mock.patch('cinder.image.image_utils.check_vmdk_image')
     @mock.patch('cinder.image.image_utils.qemu_img_info')
-    def test_check_image_format_defaults(self, mock_info, mock_vmdk):
+    def test_check_image_format_defaults(self, mock_info, mock_vmdk,
+                                         mock_qcow2):
         """Doesn't blow up when only the mandatory arg is passed."""
         src = mock.sentinel.src
         mock_info.return_value = self.qdata
@@ -2413,6 +2609,12 @@ class TestImageFormatCheck(test.TestCase):
         image_utils.check_image_format(src)
         mock_vmdk.assert_called_with(expected_image_id, self.qdata)
 
+        # Bug #2059809: a qcow2 should trigger an additional check
+        mock_info.reset_mock()
+        self.qdata.file_format = 'qcow2'
+        image_utils.check_image_format(src)
+        mock_qcow2.assert_called_with(expected_image_id, self.qdata)
+
     @mock.patch('cinder.image.image_utils.qemu_img_info')
     def test_check_image_format_uses_passed_data(self, mock_info):
         src = mock.sentinel.src
diff --git a/cinder/tests/unit/volume/drivers/test_nfs.py b/cinder/tests/unit/volume/drivers/test_nfs.py
index b1fa90cd124..34c5fac3994 100644
--- a/cinder/tests/unit/volume/drivers/test_nfs.py
+++ b/cinder/tests/unit/volume/drivers/test_nfs.py
@@ -1361,7 +1361,8 @@ class NfsDriverTestCase(test.TestCase):
         mock_read_info_file.assert_called_once_with(info_path)
         mock_img_info.assert_called_once_with(snap_path,
                                               force_share=True,
-                                              run_as_root=True)
+                                              run_as_root=True,
+                                              allow_qcow2_backing_file=True)
         used_qcow = nfs_conf['nfs_qcow2_volumes']
         if encryption:
             mock_convert_image.assert_called_once_with(
@@ -1491,7 +1492,8 @@ class NfsDriverTestCase(test.TestCase):
 
         mock_img_utils.assert_called_once_with(vol_path,
                                                force_share=True,
-                                               run_as_root=True)
+                                               run_as_root=True,
+                                               allow_qcow2_backing_file=True)
         self.assertEqual('nfs', conn_info['driver_volume_type'])
         self.assertEqual(volume.name, conn_info['data']['name'])
         self.assertEqual(self.TEST_MNT_POINT_BASE,
@@ -1506,16 +1508,37 @@ class NfsDriverTestCase(test.TestCase):
         qemu_img_output = """{
     "filename": "%s",
     "format": "iso",
-    "virtual-size": 1073741824,
+    "virtual-size": 10737418240,
     "actual-size": 173000
 }""" % volume['name']
         mock_img_info.return_value = imageutils.QemuImgInfo(qemu_img_output,
                                                             format='json')
 
-        self.assertRaises(exception.InvalidVolume,
-                          drv.initialize_connection,
-                          volume,
-                          None)
+        self.assertRaisesRegex(exception.InvalidVolume,
+                               "must be a valid raw or qcow2 image",
+                               drv.initialize_connection,
+                               volume,
+                               None)
+
+    @mock.patch.object(image_utils, 'qemu_img_info')
+    def test_initialize_connection_raise_on_wrong_size(self, mock_img_info):
+        self._set_driver()
+        drv = self._driver
+        volume = self._simple_volume()
+
+        qemu_img_output = """{
+    "filename": "%s",
+    "format": "qcow2",
+    "virtual-size": 999999999999999,
+    "actual-size": 173000
+}""" % volume['name']
+        mock_img_info.return_value = imageutils.QemuImgInfo(qemu_img_output,
+                                                            format='json')
+        self.assertRaisesRegex(exception.InvalidVolume,
+                               "virtual_size does not match",
+                               drv.initialize_connection,
+                               volume,
+                               None)
 
     def test_create_snapshot(self):
         self._set_driver()
diff --git a/cinder/tests/unit/volume/drivers/test_quobyte.py b/cinder/tests/unit/volume/drivers/test_quobyte.py
index b4e41f04e75..d130f18444e 100644
--- a/cinder/tests/unit/volume/drivers/test_quobyte.py
+++ b/cinder/tests/unit/volume/drivers/test_quobyte.py
@@ -273,7 +273,8 @@ class QuobyteDriverTestCase(test.TestCase):
 
         mock_qemu_img_info.assert_called_with(mock.sentinel.image_path,
                                               force_share=True,
-                                              run_as_root=True)
+                                              run_as_root=True,
+                                              allow_qcow2_backing_file=True)
 
     @ddt.data(['/other_random_path', '/mnt'],
               ['/other_basedir/' + TEST_MNT_HASH + '/volume-' + VOLUME_UUID,
@@ -961,10 +962,11 @@ class QuobyteDriverTestCase(test.TestCase):
         else:
             drv.extend_volume(volume, 3)
 
-            image_utils.qemu_img_info.assert_called_once_with(volume_path,
-                                                              force_share=True,
-                                                              run_as_root=False
-                                                              )
+            image_utils.qemu_img_info.assert_called_once_with(
+                volume_path,
+                force_share=True,
+                run_as_root=False,
+                allow_qcow2_backing_file=True)
             image_utils.resize_image.assert_called_once_with(volume_path, 3)
 
     def test_copy_volume_from_snapshot(self):
@@ -1008,9 +1010,11 @@ class QuobyteDriverTestCase(test.TestCase):
         drv._copy_volume_from_snapshot(snapshot, dest_volume, size)
 
         drv._read_info_file.assert_called_once_with(info_path)
-        image_utils.qemu_img_info.assert_called_once_with(snap_path,
-                                                          force_share=True,
-                                                          run_as_root=False)
+        image_utils.qemu_img_info.assert_called_once_with(
+            snap_path,
+            force_share=True,
+            run_as_root=False,
+            allow_qcow2_backing_file=True)
         (mock_convert.
          assert_called_once_with(src_vol_path,
                                  dest_vol_path,
@@ -1066,9 +1070,11 @@ class QuobyteDriverTestCase(test.TestCase):
         drv._copy_volume_from_snapshot(snapshot, dest_volume, size)
 
         drv._read_info_file.assert_called_once_with(info_path)
-        image_utils.qemu_img_info.assert_called_once_with(snap_path,
-                                                          force_share=True,
-                                                          run_as_root=False)
+        image_utils.qemu_img_info.assert_called_once_with(
+            snap_path,
+            force_share=True,
+            run_as_root=False,
+            allow_qcow2_backing_file=True)
         self.assertFalse(mock_convert.called,
                          ("_convert_image was called but should not have been")
                          )
@@ -1130,9 +1136,11 @@ class QuobyteDriverTestCase(test.TestCase):
         drv._read_info_file.assert_called_once_with(info_path)
         os_ac_mock.assert_called_once_with(
             drv._local_volume_from_snap_cache_path(snapshot), os.F_OK)
-        image_utils.qemu_img_info.assert_called_once_with(snap_path,
-                                                          force_share=True,
-                                                          run_as_root=False)
+        image_utils.qemu_img_info.assert_called_once_with(
+            snap_path,
+            force_share=True,
+            run_as_root=False,
+            allow_qcow2_backing_file=True)
         (mock_convert.
          assert_called_once_with(
              src_vol_path,
@@ -1193,9 +1201,11 @@ class QuobyteDriverTestCase(test.TestCase):
         drv._copy_volume_from_snapshot(snapshot, dest_volume, size)
 
         drv._read_info_file.assert_called_once_with(info_path)
-        image_utils.qemu_img_info.assert_called_once_with(snap_path,
-                                                          force_share=True,
-                                                          run_as_root=False)
+        image_utils.qemu_img_info.assert_called_once_with(
+            snap_path,
+            force_share=True,
+            run_as_root=False,
+            allow_qcow2_backing_file=True)
         (mock_convert.
          assert_called_once_with(
              src_vol_path,
@@ -1264,9 +1274,11 @@ class QuobyteDriverTestCase(test.TestCase):
         conn_info = drv.initialize_connection(volume, None)
 
         drv.get_active_image_from_info.assert_called_once_with(volume)
-        image_utils.qemu_img_info.assert_called_once_with(vol_path,
-                                                          force_share=True,
-                                                          run_as_root=False)
+        image_utils.qemu_img_info.assert_called_once_with(
+            vol_path,
+            force_share=True,
+            run_as_root=False,
+            allow_qcow2_backing_file=True)
 
         self.assertEqual('raw', conn_info['data']['format'])
         self.assertEqual('quobyte', conn_info['driver_volume_type'])
@@ -1315,9 +1327,11 @@ class QuobyteDriverTestCase(test.TestCase):
 
             mock_get_active_image_from_info.assert_called_once_with(volume)
             mock_local_volume_dir.assert_called_once_with(volume)
-            mock_qemu_img_info.assert_called_once_with(volume_path,
-                                                       force_share=True,
-                                                       run_as_root=False)
+            mock_qemu_img_info.assert_called_once_with(
+                volume_path,
+                force_share=True,
+                run_as_root=False,
+                allow_qcow2_backing_file=True)
             mock_upload_volume.assert_called_once_with(
                 mock.ANY, mock.ANY, mock.ANY, upload_path, run_as_root=False,
                 store_id=None, base_image_ref=None, compress=True,
@@ -1368,9 +1382,11 @@ class QuobyteDriverTestCase(test.TestCase):
 
             mock_get_active_image_from_info.assert_called_once_with(volume)
             mock_local_volume_dir.assert_called_with(volume)
-            mock_qemu_img_info.assert_called_once_with(volume_path,
-                                                       force_share=True,
-                                                       run_as_root=False)
+            mock_qemu_img_info.assert_called_once_with(
+                volume_path,
+                force_share=True,
+                run_as_root=False,
+                allow_qcow2_backing_file=True)
             mock_convert_image.assert_called_once_with(
                 volume_path, upload_path, 'raw', run_as_root=False)
             mock_upload_volume.assert_called_once_with(
@@ -1425,9 +1441,11 @@ class QuobyteDriverTestCase(test.TestCase):
 
             mock_get_active_image_from_info.assert_called_once_with(volume)
             mock_local_volume_dir.assert_called_with(volume)
-            mock_qemu_img_info.assert_called_once_with(volume_path,
-                                                       force_share=True,
-                                                       run_as_root=False)
+            mock_qemu_img_info.assert_called_once_with(
+                volume_path,
+                force_share=True,
+                run_as_root=False,
+                allow_qcow2_backing_file=True)
             mock_convert_image.assert_called_once_with(
                 volume_path, upload_path, 'raw', run_as_root=False)
             mock_upload_volume.assert_called_once_with(
diff --git a/cinder/tests/unit/volume/drivers/test_remotefs.py b/cinder/tests/unit/volume/drivers/test_remotefs.py
index cbb2ca1e600..72b948779e2 100644
--- a/cinder/tests/unit/volume/drivers/test_remotefs.py
+++ b/cinder/tests/unit/volume/drivers/test_remotefs.py
@@ -533,7 +533,8 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
 
         mock_qemu_img_info.assert_called_with(mock.sentinel.image_path,
                                               force_share=False,
-                                              run_as_root=True)
+                                              run_as_root=True,
+                                              allow_qcow2_backing_file=True)
 
     @ddt.data([None, '/fake_basedir'],
               ['/fake_basedir/cb2016/fake_vol_name', '/fake_basedir'],
diff --git a/cinder/volume/drivers/nfs.py b/cinder/volume/drivers/nfs.py
index 2ef4cb42e8d..425f87b9182 100644
--- a/cinder/volume/drivers/nfs.py
+++ b/cinder/volume/drivers/nfs.py
@@ -160,6 +160,16 @@ class NfsDriver(remotefs.RemoteFSSnapDriverDistributed):
             msg = _('nfs volume must be a valid raw or qcow2 image.')
             raise exception.InvalidVolume(reason=msg)
 
+        # Test if the size is accurate or if something tried to modify it
+        if info.virtual_size != volume.size * units.Gi:
+            LOG.error('The volume virtual_size does not match the size in '
+                      'cinder, aborting as we suspect an exploit. '
+                      'Virtual Size is %(vsize)s and real size is %(size)s',
+                      {'vsize': info.virtual_size, 'size': volume.size})
+            msg = _('The volume virtual_size does not match the size in '
+                    'cinder, aborting as we suspect an exploit.')
+            raise exception.InvalidVolume(reason=msg)
+
         conn_info['data']['format'] = info.file_format
         LOG.debug('NfsDriver: conn_info: %s', conn_info)
         return conn_info
diff --git a/cinder/volume/drivers/remotefs.py b/cinder/volume/drivers/remotefs.py
index d026d40b6dc..7c97418c297 100644
--- a/cinder/volume/drivers/remotefs.py
+++ b/cinder/volume/drivers/remotefs.py
@@ -867,7 +867,8 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
 
         info = image_utils.qemu_img_info(path,
                                          force_share=force_share,
-                                         run_as_root=run_as_root)
+                                         run_as_root=run_as_root,
+                                         allow_qcow2_backing_file=True)
         if info.image:
             info.image = os.path.basename(info.image)
         if info.backing_file:
diff --git a/releasenotes/notes/bug-2059809-disallow-qcow2-datafile-abc4e6d8be766710.yaml b/releasenotes/notes/bug-2059809-disallow-qcow2-datafile-abc4e6d8be766710.yaml
new file mode 100644
index 00000000000..9814595ba1e
--- /dev/null
+++ b/releasenotes/notes/bug-2059809-disallow-qcow2-datafile-abc4e6d8be766710.yaml
@@ -0,0 +1,19 @@
+---
+security:
+  - |
+    Images in the qcow2 format with an external data file are now
+    rejected with an ``ImageUnacceptable`` error because such images
+    could be used in an exploit to expose host information.  Given
+    that qcow2 external data files were never supported by Cinder,
+    this change should have no impact on users.  See `Bug #2059809
+    <https://bugs.launchpad.net/cinder/+bug/2059809>`_ for details.
+fixes:
+  - |
+    `Bug #2059809 <https://bugs.launchpad.net/cinder/+bug/2059809>`_:
+    Fixed issue where a qcow2 format image with an external data file
+    could expose host information.  Such an image is now rejected with
+    an ``ImageUnacceptable`` error if it is used to create a volume.
+    Given that  qcow2 external data files were never supported by
+    Cinder, the only use for such an image previously was to attempt
+    to steal host information, and hence this change should have no
+    impact on users.