From 477b1342747ddeb3ab9a586b33c06f1e51f37df8 Mon Sep 17 00:00:00 2001
From: Pete Zaitcev <zaitcev@kotori.zaitcev.us>
Date: Wed, 20 Jul 2022 12:15:21 -0500
Subject: [PATCH] Restore into sparse volumes

We take the approach to detect zero data when restoring and not
writing it out to the volume. This works exactly as expected
when restoring into Linux LVM volumes and into Ceph RBD volumes,
provided they are freshly created.

Closes-bug: #2007615
Change-Id: I54b81a568a01af44e3f74bcac55e823cdae9bfbf
---
 cinder/backup/api.py                          |   4 +-
 cinder/backup/chunkeddriver.py                |  38 ++++-
 cinder/backup/driver.py                       |   2 +-
 cinder/backup/drivers/ceph.py                 |  38 +++--
 cinder/backup/manager.py                      |  24 ++-
 cinder/backup/rpcapi.py                       |  14 +-
 cinder/tests/unit/api/contrib/test_backups.py |   2 +-
 .../unit/backup/drivers/test_backup_ceph.py   |   2 +-
 .../unit/backup/drivers/test_backup_google.py |   7 +-
 .../unit/backup/drivers/test_backup_nfs.py    |  13 +-
 .../unit/backup/drivers/test_backup_posix.py  | 150 ++++++++++++++++++
 .../unit/backup/drivers/test_backup_s3.py     |  11 +-
 .../unit/backup/drivers/test_backup_swift.py  |   9 +-
 cinder/tests/unit/backup/fake_service.py      |   2 +-
 cinder/tests/unit/backup/test_backup.py       |  41 ++---
 .../tests/unit/backup/test_backup_messages.py |  10 +-
 .../tests/unit/backup/test_chunkeddriver.py   |   2 +-
 cinder/tests/unit/backup/test_rpcapi.py       |  16 +-
 cinder/volume/volume_utils.py                 |   5 +
 .../notes/backup-sparse-f396b35bfe17332e.yaml |   7 +
 20 files changed, 314 insertions(+), 83 deletions(-)
 create mode 100644 releasenotes/notes/backup-sparse-f396b35bfe17332e.yaml

diff --git a/cinder/backup/api.py b/cinder/backup/api.py
index fb3155cdf50..c53ac42c566 100644
--- a/cinder/backup/api.py
+++ b/cinder/backup/api.py
@@ -404,6 +404,7 @@ class API(base.Base):
                      "backup %(backup_id)s.",
                      {'size': size, 'backup_id': backup_id})
             volume = self.volume_api.create(context, size, name, description)
+            volume_is_new = True
             volume_id = volume['id']
 
             while True:
@@ -419,6 +420,7 @@ class API(base.Base):
                 raise exception.InvalidVolume(reason=msg)
         else:
             volume = self.volume_api.get(context, volume_id)
+            volume_is_new = False
 
         if volume['status'] != "available":
             msg = _('Volume to be restored to must be available')
@@ -447,7 +449,7 @@ class API(base.Base):
                                                    'restoring-backup'})
 
         self.backup_rpcapi.restore_backup(context, backup.host, backup,
-                                          volume_id)
+                                          volume_id, volume_is_new)
 
         d = {'backup_id': backup_id,
              'volume_id': volume_id,
diff --git a/cinder/backup/chunkeddriver.py b/cinder/backup/chunkeddriver.py
index 73e4f2eea69..6d28fc39984 100644
--- a/cinder/backup/chunkeddriver.py
+++ b/cinder/backup/chunkeddriver.py
@@ -66,6 +66,26 @@ CONF = cfg.CONF
 CONF.register_opts(backup_opts)
 
 
+def _write_nonzero(volume_file, volume_offset, content):
+    """Write non-zero parts of `content` into `volume_file`."""
+    chunk_length = 1024 * 1024
+    for chunk_offset in range(0, len(content), chunk_length):
+        chunk_end = chunk_offset + chunk_length
+        chunk = content[chunk_offset:chunk_end]
+        # The len(chunk) may be smaller than chunk_length. It's okay.
+        if not volume_utils.is_all_zero(chunk):
+            volume_file.seek(volume_offset + chunk_offset)
+            volume_file.write(chunk)
+
+
+def _write_volume(volume_is_new, volume_file, volume_offset, content):
+    if volume_is_new:
+        _write_nonzero(volume_file, volume_offset, content)
+    else:
+        volume_file.seek(volume_offset)
+        volume_file.write(content)
+
+
 # Object writer and reader returned by inheriting classes must not have any
 # logging calls, as well as the compression libraries, as eventlet has a bug
 # (https://github.com/eventlet/eventlet/issues/432) that would result in
@@ -666,7 +686,7 @@ class ChunkedBackupDriver(driver.BackupDriver, metaclass=abc.ABCMeta):
         self._finalize_backup(backup, container, object_meta, object_sha256)
 
     def _restore_v1(self, backup, volume_id, metadata, volume_file,
-                    requested_backup):
+                    volume_is_new, requested_backup):
         """Restore a v1 volume backup.
 
         Raises BackupRestoreCancel on any requested_backup status change, we
@@ -717,16 +737,17 @@ class ChunkedBackupDriver(driver.BackupDriver, metaclass=abc.ABCMeta):
                 body = reader.read()
             compression_algorithm = metadata_object[object_name]['compression']
             decompressor = self._get_compressor(compression_algorithm)
-            volume_file.seek(obj['offset'])
             if decompressor is not None:
                 LOG.debug('decompressing data using %s algorithm',
                           compression_algorithm)
                 decompressed = decompressor.decompress(body)
                 body = None  # Allow Python to free it
-                volume_file.write(decompressed)
+                _write_volume(volume_is_new,
+                              volume_file, obj['offset'], decompressed)
                 decompressed = None  # Allow Python to free it
             else:
-                volume_file.write(body)
+                _write_volume(volume_is_new,
+                              volume_file, obj['offset'], body)
                 body = None  # Allow Python to free it
 
             # force flush every write to avoid long blocking write on close
@@ -748,7 +769,7 @@ class ChunkedBackupDriver(driver.BackupDriver, metaclass=abc.ABCMeta):
         LOG.debug('v1 volume backup restore of %s finished.',
                   backup_id)
 
-    def restore(self, backup, volume_id, volume_file):
+    def restore(self, backup, volume_id, volume_file, volume_is_new):
         """Restore the given volume backup from backup repository.
 
         Raises BackupRestoreCancel on any backup status change.
@@ -757,13 +778,15 @@ class ChunkedBackupDriver(driver.BackupDriver, metaclass=abc.ABCMeta):
         container = backup['container']
         object_prefix = backup['service_metadata']
         LOG.debug('starting restore of backup %(object_prefix)s '
-                  'container: %(container)s, to volume %(volume_id)s, '
+                  'container: %(container)s, '
+                  'to %(new)s volume %(volume_id)s, '
                   'backup: %(backup_id)s.',
                   {
                       'object_prefix': object_prefix,
                       'container': container,
                       'volume_id': volume_id,
                       'backup_id': backup_id,
+                      'new': 'new' if volume_is_new else 'existing',
                   })
         metadata = self._read_metadata(backup)
         metadata_version = metadata['version']
@@ -794,7 +817,8 @@ class ChunkedBackupDriver(driver.BackupDriver, metaclass=abc.ABCMeta):
             backup1 = backup_list[index]
             index = index - 1
             metadata = self._read_metadata(backup1)
-            restore_func(backup1, volume_id, metadata, volume_file, backup)
+            restore_func(backup1, volume_id, metadata, volume_file,
+                         volume_is_new, backup)
 
             volume_meta = metadata.get('volume_meta', None)
             try:
diff --git a/cinder/backup/driver.py b/cinder/backup/driver.py
index f4dc24d6913..32bbe0bf6fe 100644
--- a/cinder/backup/driver.py
+++ b/cinder/backup/driver.py
@@ -374,7 +374,7 @@ class BackupDriver(base.Base, metaclass=abc.ABCMeta):
         return
 
     @abc.abstractmethod
-    def restore(self, backup, volume_id, volume_file):
+    def restore(self, backup, volume_id, volume_file, volume_is_new):
         """Restore a saved backup.
 
         Some I/O operations may block greenthreads, so in order to prevent
diff --git a/cinder/backup/drivers/ceph.py b/cinder/backup/drivers/ceph.py
index 0e329bd880b..1cb5727c242 100644
--- a/cinder/backup/drivers/ceph.py
+++ b/cinder/backup/drivers/ceph.py
@@ -64,6 +64,7 @@ from cinder import interface
 from cinder import objects
 from cinder import utils
 import cinder.volume.drivers.rbd as rbd_driver
+from cinder.volume import volume_utils
 
 try:
     import rados
@@ -413,7 +414,8 @@ class CephBackupDriver(driver.BackupDriver):
                        src_name: str,
                        dest: linuxrbd.RBDVolumeIOWrapper,
                        dest_name: str,
-                       length: int) -> None:
+                       length: int,
+                       discard_zeros: bool = False) -> None:
         """Transfer data between files (Python IO objects)."""
         LOG.debug("Transferring data between '%(src)s' and '%(dest)s'",
                   {'src': src_name, 'dest': dest_name})
@@ -434,13 +436,17 @@ class CephBackupDriver(driver.BackupDriver):
 
                 return
 
-            dest.write(data)
-            dest.flush()
+            if (discard_zeros and volume_utils.is_all_zero(data)):
+                action = "Discarded"
+            else:
+                dest.write(data)
+                dest.flush()
+                action = "Transferred"
             delta = (time.time() - before)
             rate = (self.chunk_size / delta) / 1024
-            LOG.debug("Transferred chunk %(chunk)s of %(chunks)s "
-                      "(%(rate)dK/s)",
-                      {'chunk': chunk + 1,
+            LOG.debug("%(action)s chunk %(chunk)s of %(chunks)s (%(rate)dK/s)",
+                      {'action': action,
+                       'chunk': chunk + 1,
                        'chunks': chunks,
                        'rate': rate})
 
@@ -1075,6 +1081,7 @@ class CephBackupDriver(driver.BackupDriver):
                       dest_file,
                       dest_name: str,
                       length: int,
+                      volume_is_new: bool,
                       src_snap=None) -> None:
         """Restore volume using full copy i.e. all extents.
 
@@ -1103,7 +1110,8 @@ class CephBackupDriver(driver.BackupDriver):
                                                      self._ceph_backup_conf)
                 rbd_fd = linuxrbd.RBDVolumeIOWrapper(rbd_meta)
                 self._transfer_data(eventlet.tpool.Proxy(rbd_fd), backup_name,
-                                    dest_file, dest_name, length)
+                                    dest_file, dest_name, length,
+                                    discard_zeros=volume_is_new)
             finally:
                 src_rbd.close()
 
@@ -1279,7 +1287,8 @@ class CephBackupDriver(driver.BackupDriver):
     def _restore_volume(self,
                         backup: 'objects.Backup',
                         volume: 'objects.Volume',
-                        volume_file: linuxrbd.RBDVolumeIOWrapper) -> None:
+                        volume_file: linuxrbd.RBDVolumeIOWrapper,
+                        volume_is_new: bool) -> None:
         """Restore volume from backup using diff transfer if possible.
 
         Attempts a differential restore and reverts to full copy if diff fails.
@@ -1313,7 +1322,7 @@ class CephBackupDriver(driver.BackupDriver):
             # Otherwise full copy
             LOG.debug("Running full restore.")
             self._full_restore(backup, volume_file, volume.name,
-                               length, src_snap=restore_point)
+                               length, volume_is_new, src_snap=restore_point)
 
     def _restore_metadata(self,
                           backup: 'objects.Backup',
@@ -1340,18 +1349,21 @@ class CephBackupDriver(driver.BackupDriver):
     def restore(self,
                 backup: 'objects.Backup',
                 volume_id: str,
-                volume_file: linuxrbd.RBDVolumeIOWrapper) -> None:
+                volume_file: linuxrbd.RBDVolumeIOWrapper,
+                volume_is_new: bool) -> None:
         """Restore volume from backup in Ceph object store.
 
         If volume metadata is available this will also be restored.
         """
         target_volume = self.db.volume_get(self.context, volume_id)
         LOG.debug('Starting restore from Ceph backup=%(src)s to '
-                  'volume=%(dest)s',
-                  {'src': backup.id, 'dest': target_volume.name})
+                  'volume=%(dest)s new=%(new)s',
+                  {'src': backup.id, 'dest': target_volume.name,
+                   'new': volume_is_new})
 
         try:
-            self._restore_volume(backup, target_volume, volume_file)
+            self._restore_volume(backup, target_volume, volume_file,
+                                 volume_is_new)
 
             # Be tolerant of IO implementations that do not support fileno()
             try:
diff --git a/cinder/backup/manager.py b/cinder/backup/manager.py
index a978b8d7ab7..0ac16102e5f 100644
--- a/cinder/backup/manager.py
+++ b/cinder/backup/manager.py
@@ -607,8 +607,15 @@ class BackupManager(manager.SchedulerDependentManager):
         return False
 
     @utils.limit_operations
-    def restore_backup(self, context, backup, volume_id):
-        """Restore volume backups from configured backup service."""
+    def restore_backup(self, context, backup, volume_id, volume_is_new):
+        """Restore volume backups from configured backup service.
+
+        :param context: RequestContext for the restore operation
+        :param backup: Backup that we're restoring
+        :param volume_id: The ID of the volume into which we're restoring
+        :param volume_is_new: The volume does not have stale data, so
+                              sparse backups can be restored as such.
+        """
         context.message_resource_id = backup.id
         context.message_resource_type = message_field.Resource.VOLUME_BACKUP
         context.message_action = message_field.Action.BACKUP_RESTORE
@@ -683,7 +690,7 @@ class BackupManager(manager.SchedulerDependentManager):
 
         canceled = False
         try:
-            self._run_restore(context, backup, volume)
+            self._run_restore(context, backup, volume, volume_is_new)
         except exception.BackupRestoreCancel:
             canceled = True
         except Exception:
@@ -725,7 +732,7 @@ class BackupManager(manager.SchedulerDependentManager):
                   'volume_id': volume_id})
         self._notify_about_backup_usage(context, backup, "restore.end")
 
-    def _run_restore(self, context, backup, volume):
+    def _run_restore(self, context, backup, volume, volume_is_new):
         message_created = False
         orig_key_id = volume.encryption_key_id
         backup_service = self.service(context)
@@ -754,16 +761,19 @@ class BackupManager(manager.SchedulerDependentManager):
                 if secure_enabled:
                     with open(device_path, open_mode) as device_file:
                         backup_service.restore(backup, volume.id,
-                                               tpool.Proxy(device_file))
+                                               tpool.Proxy(device_file),
+                                               volume_is_new)
                 else:
                     with utils.temporary_chown(device_path):
                         with open(device_path, open_mode) as device_file:
                             backup_service.restore(backup, volume.id,
-                                                   tpool.Proxy(device_file))
+                                                   tpool.Proxy(device_file),
+                                                   volume_is_new)
             # device_path is already file-like so no need to open it
             else:
                 backup_service.restore(backup, volume.id,
-                                       tpool.Proxy(device_path))
+                                       tpool.Proxy(device_path),
+                                       volume_is_new)
         except exception.BackupRestoreCancel:
             raise
         except Exception:
diff --git a/cinder/backup/rpcapi.py b/cinder/backup/rpcapi.py
index 06a3104585d..cee45c48774 100644
--- a/cinder/backup/rpcapi.py
+++ b/cinder/backup/rpcapi.py
@@ -48,9 +48,10 @@ class BackupAPI(rpc.RPCAPI):
         2.1 - Adds set_log_levels and get_log_levels
         2.2 - Adds publish_service_capabilities
         2.3 - Adds continue_backup call
+        2.4 - Add the volume_is_new flag to the restore_backup method
     """
 
-    RPC_API_VERSION = '2.3'
+    RPC_API_VERSION = '2.4'
     RPC_DEFAULT_VERSION = '2.0'
     TOPIC = constants.BACKUP_TOPIC
     BINARY = 'cinder-backup'
@@ -66,11 +67,16 @@ class BackupAPI(rpc.RPCAPI):
         cctxt.cast(ctxt, 'continue_backup', backup=backup,
                    backup_device=backup_device)
 
-    def restore_backup(self, ctxt, backup_host, backup, volume_id):
+    def restore_backup(self, ctxt, backup_host, backup, volume_id,
+                       volume_is_new):
         LOG.debug("restore_backup in rpcapi backup_id %s", backup.id)
         cctxt = self._get_cctxt(server=backup_host)
-        cctxt.cast(ctxt, 'restore_backup', backup=backup,
-                   volume_id=volume_id)
+        if self.client.can_send_version('2.4'):
+            cctxt.cast(ctxt, 'restore_backup', backup=backup,
+                       volume_id=volume_id, volume_is_new=volume_is_new)
+        else:
+            cctxt.cast(ctxt, 'restore_backup', backup=backup,
+                       volume_id=volume_id)
 
     def delete_backup(self, ctxt, backup):
         LOG.debug("delete_backup rpcapi backup_id %s", backup.id)
diff --git a/cinder/tests/unit/api/contrib/test_backups.py b/cinder/tests/unit/api/contrib/test_backups.py
index 7189cba0ca4..7437427290d 100644
--- a/cinder/tests/unit/api/contrib/test_backups.py
+++ b/cinder/tests/unit/api/contrib/test_backups.py
@@ -2006,7 +2006,7 @@ class BackupsAPITestCase(test.TestCase):
         self.assertEqual(volume.id, res_dict['restore']['volume_id'])
         self.assertEqual(volume_name, res_dict['restore']['volume_name'])
         mock_restore_backup.assert_called_once_with(mock.ANY, 'testhost',
-                                                    mock.ANY, volume.id)
+                                                    mock.ANY, volume.id, False)
         # Manually check if restore_backup was called with appropriate backup.
         self.assertEqual(backup.id, mock_restore_backup.call_args[0][2].id)
 
diff --git a/cinder/tests/unit/backup/drivers/test_backup_ceph.py b/cinder/tests/unit/backup/drivers/test_backup_ceph.py
index 813c6fea2be..e01ca9c67b5 100644
--- a/cinder/tests/unit/backup/drivers/test_backup_ceph.py
+++ b/cinder/tests/unit/backup/drivers/test_backup_ceph.py
@@ -916,7 +916,7 @@ class BackupCephTestCase(test.TestCase):
                     self.volume_file.seek(0)
 
                     self.service.restore(self.alt_backup, self.volume_id,
-                                         test_file)
+                                         test_file, False)
 
                     checksum = hashlib.sha256()
                     test_file.seek(0)
diff --git a/cinder/tests/unit/backup/drivers/test_backup_google.py b/cinder/tests/unit/backup/drivers/test_backup_google.py
index b9efb8b432a..8788702a9e2 100644
--- a/cinder/tests/unit/backup/drivers/test_backup_google.py
+++ b/cinder/tests/unit/backup/drivers/test_backup_google.py
@@ -504,7 +504,7 @@ class GoogleBackupDriverTestCase(test.TestCase):
         service = google_dr.GoogleBackupDriver(self.ctxt)
 
         with tempfile.NamedTemporaryFile() as volume_file:
-            service.restore(backup, volume_id, volume_file)
+            service.restore(backup, volume_id, volume_file, False)
 
     @gcs_client
     def test_restore_fail(self):
@@ -517,7 +517,7 @@ class GoogleBackupDriverTestCase(test.TestCase):
         with tempfile.NamedTemporaryFile() as volume_file:
             self.assertRaises(google_dr.GCSConnectionFailure,
                               service.restore,
-                              backup, volume_id, volume_file)
+                              backup, volume_id, volume_file, False)
 
     @gcs_client2
     def test_restore_delta(self):
@@ -548,8 +548,7 @@ class GoogleBackupDriverTestCase(test.TestCase):
         service2.backup(deltabackup, self.volume_file, True)
 
         with tempfile.NamedTemporaryFile() as restored_file:
-            service2.restore(deltabackup, volume_id,
-                             restored_file)
+            service2.restore(deltabackup, volume_id, restored_file, False)
             self.assertTrue(filecmp.cmp(self.volume_file.name,
                             restored_file.name))
 
diff --git a/cinder/tests/unit/backup/drivers/test_backup_nfs.py b/cinder/tests/unit/backup/drivers/test_backup_nfs.py
index f033f71684a..2ada9293cbd 100644
--- a/cinder/tests/unit/backup/drivers/test_backup_nfs.py
+++ b/cinder/tests/unit/backup/drivers/test_backup_nfs.py
@@ -698,7 +698,7 @@ class BackupNFSTestCase(test.TestCase):
             backup = objects.Backup.get_by_id(self.ctxt, fake.BACKUP_ID)
             backup.status = objects.fields.BackupStatus.RESTORING
             backup.save()
-            service.restore(backup, volume_id, restored_file)
+            service.restore(backup, volume_id, restored_file, False)
             self.assertTrue(filecmp.cmp(self.volume_file.name,
                             restored_file.name))
 
@@ -721,7 +721,7 @@ class BackupNFSTestCase(test.TestCase):
             backup = objects.Backup.get_by_id(self.ctxt, fake.BACKUP_ID)
             backup.status = objects.fields.BackupStatus.RESTORING
             backup.save()
-            service.restore(backup, volume_id, restored_file)
+            service.restore(backup, volume_id, restored_file, False)
             self.assertTrue(filecmp.cmp(self.volume_file.name,
                             restored_file.name))
 
@@ -747,7 +747,7 @@ class BackupNFSTestCase(test.TestCase):
 
         with tempfile.NamedTemporaryFile() as restored_file:
             backup = objects.Backup.get_by_id(self.ctxt, fake.BACKUP_ID)
-            service.restore(backup, volume_id, restored_file)
+            service.restore(backup, volume_id, restored_file, False)
             self.assertTrue(filecmp.cmp(self.volume_file.name,
                             restored_file.name))
 
@@ -773,7 +773,7 @@ class BackupNFSTestCase(test.TestCase):
 
         with tempfile.NamedTemporaryFile() as restored_file:
             backup = objects.Backup.get_by_id(self.ctxt, fake.BACKUP_ID)
-            service.restore(backup, volume_id, restored_file)
+            service.restore(backup, volume_id, restored_file, False)
             self.assertTrue(filecmp.cmp(self.volume_file.name,
                             restored_file.name))
 
@@ -842,7 +842,7 @@ class BackupNFSTestCase(test.TestCase):
 
             self.assertRaises(exception.BackupRestoreCancel,
                               service.restore, backup, volume_id,
-                              restored_file)
+                              restored_file, False)
 
     def test_restore_delta(self):
         volume_id = fake.VOLUME_ID
@@ -890,8 +890,7 @@ class BackupNFSTestCase(test.TestCase):
 
         with tempfile.NamedTemporaryFile() as restored_file:
             backup = objects.Backup.get_by_id(self.ctxt, fake.BACKUP2_ID)
-            service.restore(backup, volume_id,
-                            restored_file)
+            service.restore(backup, volume_id, restored_file, False)
             self.assertTrue(filecmp.cmp(self.volume_file.name,
                             restored_file.name))
 
diff --git a/cinder/tests/unit/backup/drivers/test_backup_posix.py b/cinder/tests/unit/backup/drivers/test_backup_posix.py
index 794c1e5552a..1f098feffba 100644
--- a/cinder/tests/unit/backup/drivers/test_backup_posix.py
+++ b/cinder/tests/unit/backup/drivers/test_backup_posix.py
@@ -16,14 +16,19 @@
 
 import builtins
 import os
+import shutil
+import tempfile
 from unittest import mock
+import uuid
 
 from cinder.backup.drivers import posix
+from cinder.common import config
 from cinder import context
 from cinder import objects
 from cinder.tests.unit import fake_constants as fake
 from cinder.tests.unit import test
 
+CONF = config.CONF
 
 FAKE_FILE_SIZE = 52428800
 FAKE_SHA_BLOCK_SIZE_BYTES = 1024
@@ -177,3 +182,148 @@ class PosixBackupDriverTestCase(test.TestCase):
                                                timestamp,
                                                backup.id)
         self.assertEqual(expected, res)
+
+
+class PosixBackupTestWithData(test.TestCase):
+
+    def _create_volume_db_entry(self, display_name='test_volume',
+                                display_description='this is a test volume',
+                                status='backing-up',
+                                previous_status='available',
+                                size=1,
+                                host='testhost',
+                                encryption_key_id=None,
+                                project_id=None):
+        """Create a volume entry in the DB.
+
+        Return the entry ID
+        """
+        vol = {}
+        vol['size'] = size
+        vol['host'] = host
+        vol['user_id'] = fake.USER_ID
+        vol['project_id'] = project_id or fake.PROJECT_ID
+        vol['status'] = status
+        vol['display_name'] = display_name
+        vol['display_description'] = display_description
+        vol['attach_status'] = objects.fields.VolumeAttachStatus.DETACHED
+        vol['availability_zone'] = '1'
+        vol['previous_status'] = previous_status
+        vol['encryption_key_id'] = encryption_key_id
+        vol['volume_type_id'] = fake.VOLUME_TYPE_ID
+        volume = objects.Volume(context=self.ctxt, **vol)
+        volume.create()
+        return volume.id
+
+    def _create_backup_db_entry(self, volume_id=str(uuid.uuid4()),
+                                restore_volume_id=None,
+                                display_name='test_backup',
+                                display_description='this is a test backup',
+                                container='volumebackups',
+                                status=objects.fields.BackupStatus.CREATING,
+                                size=1,
+                                object_count=0,
+                                project_id=str(uuid.uuid4()),
+                                service=None,
+                                temp_volume_id=None,
+                                temp_snapshot_id=None,
+                                snapshot_id=None,
+                                metadata=None,
+                                parent_id=None,
+                                encryption_key_id=None):
+        """Create a backup entry in the DB.
+
+        Return the entry ID
+        """
+        kwargs = {}
+        kwargs['volume_id'] = volume_id
+        kwargs['restore_volume_id'] = restore_volume_id
+        kwargs['user_id'] = str(uuid.uuid4())
+        kwargs['project_id'] = project_id
+        kwargs['host'] = 'testhost'
+        kwargs['availability_zone'] = '1'
+        kwargs['display_name'] = display_name
+        kwargs['display_description'] = display_description
+        kwargs['container'] = container
+        kwargs['status'] = status
+        kwargs['fail_reason'] = ''
+        kwargs['service'] = service or CONF.backup_driver
+        kwargs['snapshot_id'] = snapshot_id
+        kwargs['parent_id'] = parent_id
+        kwargs['size'] = size
+        kwargs['object_count'] = object_count
+        kwargs['temp_volume_id'] = temp_volume_id
+        kwargs['temp_snapshot_id'] = temp_snapshot_id
+        kwargs['metadata'] = metadata or {}
+        kwargs['encryption_key_id'] = encryption_key_id
+        backup = objects.Backup(context=self.ctxt, **kwargs)
+        backup.create()
+        return backup
+
+    def setUp(self):
+        super(PosixBackupTestWithData, self).setUp()
+
+        self.tempdir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tempdir)
+
+        backup_path = os.path.join(self.tempdir, "backup-dir")
+        os.mkdir(backup_path)
+
+        self.ctxt = context.get_admin_context()
+
+        self.override_config('backup_file_size',
+                             FAKE_FILE_SIZE)
+        self.override_config('backup_sha_block_size_bytes',
+                             FAKE_SHA_BLOCK_SIZE_BYTES)
+        self.override_config('backup_enable_progress_timer',
+                             FAKE_BACKUP_ENABLE_PROGRESS_TIMER)
+        self.override_config('backup_posix_path', backup_path)
+        self.mock_object(posix, 'LOG')
+
+        self.driver = posix.PosixBackupDriver(self.ctxt)
+
+        mock_volume_filename = "restore-volume"
+        self.vol_path = os.path.join(self.tempdir, mock_volume_filename)
+
+    def test_restore_backup_with_sparseness(self):
+        """Test a sparse backup restoration."""
+
+        vol_size = 1
+        vol_id = self._create_volume_db_entry(status='restoring-backup',
+                                              size=vol_size)
+
+        chunk_size = 1024 * 1024
+
+        obj_data = b'01234567890123456789'
+
+        backup = self._create_backup_db_entry(
+            volume_id=vol_id,
+            status=objects.fields.BackupStatus.RESTORING)
+
+        with tempfile.NamedTemporaryFile() as volume_file:
+
+            # First, we create a fake volume with a hole. Although we know that
+            # the driver only detects zeroes, we create a real file with a hole
+            # as a way to future-proof this a little. Also, it's easier.
+            # Miraclously, tmpfs supports files with actual holes.
+            volume_file.seek(3 * chunk_size)
+            volume_file.write(obj_data)
+
+            # And then, we immediately run a backup on the fake volume.
+            # We don't attempt to re-create the backup volume by hand.
+            volume_file.seek(0)
+            self.driver.backup(backup, volume_file)
+
+        # Next, we restore, excercising the code under test.
+        with open(self.vol_path, 'wb') as volume_file:
+            self.driver.restore(backup, vol_id, volume_file, True)
+
+        # Finally, we examine the fake volume into which we restored.
+        with open(self.vol_path, 'rb') as volume_file:
+            volume_file.seek(3 * chunk_size)
+            question_data = volume_file.read(len(obj_data))
+
+        self.assertEqual(obj_data, question_data)
+
+        statb = os.stat(self.vol_path)
+        self.assertLess(statb.st_blocks * 512, (3 * chunk_size + 512) / 512)
diff --git a/cinder/tests/unit/backup/drivers/test_backup_s3.py b/cinder/tests/unit/backup/drivers/test_backup_s3.py
index 04d097badd9..bf4c95e207d 100644
--- a/cinder/tests/unit/backup/drivers/test_backup_s3.py
+++ b/cinder/tests/unit/backup/drivers/test_backup_s3.py
@@ -528,7 +528,7 @@ class BackupS3TestCase(test.TestCase):
         service.backup(backup, self.volume_file)
 
         with tempfile.NamedTemporaryFile() as volume_file:
-            service.restore(backup, volume_id, volume_file)
+            service.restore(backup, volume_id, volume_file, False)
 
     @mock_s3
     def test_restore_delta(self):
@@ -555,8 +555,7 @@ class BackupS3TestCase(test.TestCase):
         service2.backup(deltabackup, self.volume_file, True)
 
         with tempfile.NamedTemporaryFile() as restored_file:
-            service2.restore(deltabackup, volume_id,
-                             restored_file)
+            service2.restore(deltabackup, volume_id, restored_file, False)
             self.assertTrue(filecmp.cmp(self.volume_file.name,
                             restored_file.name))
 
@@ -571,7 +570,7 @@ class BackupS3TestCase(test.TestCase):
         with tempfile.NamedTemporaryFile() as volume_file:
             self.assertRaises(s3_dr.S3ClientError,
                               service.restore,
-                              backup, volume_id, volume_file)
+                              backup, volume_id, volume_file, False)
 
     @s3_client
     def test_restore_faili2(self):
@@ -584,7 +583,7 @@ class BackupS3TestCase(test.TestCase):
         with tempfile.NamedTemporaryFile() as volume_file:
             self.assertRaises(s3_dr.S3ConnectionFailure,
                               service.restore,
-                              backup, volume_id, volume_file)
+                              backup, volume_id, volume_file, False)
 
     @mock_s3
     def test_backup_md5_validation(self):
@@ -618,4 +617,4 @@ class BackupS3TestCase(test.TestCase):
         service.backup(backup, self.volume_file)
 
         with tempfile.NamedTemporaryFile() as volume_file:
-            service.restore(backup, volume_id, volume_file)
+            service.restore(backup, volume_id, volume_file, False)
diff --git a/cinder/tests/unit/backup/drivers/test_backup_swift.py b/cinder/tests/unit/backup/drivers/test_backup_swift.py
index 2eadfae2388..750e7ba1385 100644
--- a/cinder/tests/unit/backup/drivers/test_backup_swift.py
+++ b/cinder/tests/unit/backup/drivers/test_backup_swift.py
@@ -799,7 +799,7 @@ class BackupSwiftTestCase(test.TestCase):
             backup = objects.Backup.get_by_id(self.ctxt, fake.BACKUP_ID)
             backup.status = objects.fields.BackupStatus.RESTORING
             backup.save()
-            service.restore(backup, volume_id, volume_file)
+            service.restore(backup, volume_id, volume_file, False)
 
     def test_restore_delta(self):
         volume_id = '04d83506-bcf7-4ff5-9c65-00000051bd2e'
@@ -849,8 +849,7 @@ class BackupSwiftTestCase(test.TestCase):
             backup = objects.Backup.get_by_id(self.ctxt, fake.BACKUP2_ID)
             backup.status = objects.fields.BackupStatus.RESTORING
             backup.save()
-            service.restore(backup, volume_id,
-                            restored_file)
+            service.restore(backup, volume_id, restored_file, False)
             self.assertTrue(filecmp.cmp(self.volume_file.name,
                             restored_file.name))
 
@@ -865,7 +864,7 @@ class BackupSwiftTestCase(test.TestCase):
             backup = objects.Backup.get_by_id(self.ctxt, fake.BACKUP_ID)
             self.assertRaises(exception.SwiftConnectionFailed,
                               service.restore,
-                              backup, volume_id, volume_file)
+                              backup, volume_id, volume_file, False)
 
     def test_restore_unsupported_version(self):
         volume_id = '390db8c1-32d3-42ca-82c9-00000010c703'
@@ -878,7 +877,7 @@ class BackupSwiftTestCase(test.TestCase):
             backup = objects.Backup.get_by_id(self.ctxt, fake.BACKUP_ID)
             self.assertRaises(exception.InvalidBackup,
                               service.restore,
-                              backup, volume_id, volume_file)
+                              backup, volume_id, volume_file, False)
 
     def test_delete(self):
         volume_id = '9ab256c8-3175-4ad8-baa1-0000007f9d31'
diff --git a/cinder/tests/unit/backup/fake_service.py b/cinder/tests/unit/backup/fake_service.py
index fa6537403c6..bcc40a1ffb2 100644
--- a/cinder/tests/unit/backup/fake_service.py
+++ b/cinder/tests/unit/backup/fake_service.py
@@ -23,7 +23,7 @@ class FakeBackupService(driver.BackupDriver):
     def backup(self, backup, volume_file):
         pass
 
-    def restore(self, backup, volume_id, volume_file):
+    def restore(self, backup, volume_id, volume_file, volume_is_new):
         pass
 
     def delete_backup(self, backup):
diff --git a/cinder/tests/unit/backup/test_backup.py b/cinder/tests/unit/backup/test_backup.py
index d63271e73f0..4e0c7b91dd1 100644
--- a/cinder/tests/unit/backup/test_backup.py
+++ b/cinder/tests/unit/backup/test_backup.py
@@ -1139,7 +1139,8 @@ class BackupTestCase(BaseBackupTest):
                           self.backup_mgr.restore_backup,
                           self.ctxt,
                           backup,
-                          vol_id)
+                          vol_id,
+                          False)
         backup = db.backup_get(self.ctxt, backup.id)
         vol = db.volume_get(self.ctxt, vol_id)
         self.assertEqual('error_restoring', vol['status'])
@@ -1159,7 +1160,8 @@ class BackupTestCase(BaseBackupTest):
                           self.backup_mgr.restore_backup,
                           self.ctxt,
                           backup,
-                          vol_id)
+                          vol_id,
+                          False)
         vol = db.volume_get(self.ctxt, vol_id)
         self.assertEqual('error', vol['status'])
         backup = db.backup_get(self.ctxt, backup.id)
@@ -1180,7 +1182,8 @@ class BackupTestCase(BaseBackupTest):
                           self.backup_mgr.restore_backup,
                           self.ctxt,
                           backup,
-                          vol_id)
+                          vol_id,
+                          False)
         vol = db.volume_get(self.ctxt, vol_id)
         self.assertEqual('error_restoring', vol['status'])
         backup = db.backup_get(self.ctxt, backup.id)
@@ -1200,7 +1203,7 @@ class BackupTestCase(BaseBackupTest):
         mock_run_restore.side_effect = exception.BackupRestoreCancel(
             vol_id=vol_id, back_id=backup.id)
         # We shouldn't raise an exception on the call, it's OK to cancel
-        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
+        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id, False)
         vol = objects.Volume.get_by_id(self.ctxt, vol_id)
         self.assertEqual('error', vol.status)
         backup.refresh()
@@ -1217,7 +1220,7 @@ class BackupTestCase(BaseBackupTest):
         mock_run_restore = self.mock_object(
             self.backup_mgr,
             '_run_restore')
-        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
+        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id, False)
         vol = objects.Volume.get_by_id(self.ctxt, vol_id)
         self.assertEqual(fields.VolumeStatus.AVAILABLE, vol.status)
         self.assertIsNotNone(vol.launched_at)
@@ -1238,7 +1241,7 @@ class BackupTestCase(BaseBackupTest):
         mock_run_restore.side_effect = exception.BackupRestoreCancel(
             vol_id=vol_id, back_id=backup.id)
         # We shouldn't raise an exception on the call, it's OK to cancel
-        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
+        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id, False)
         vol = objects.Volume.get_by_id(self.ctxt, vol_id)
         self.assertEqual(fields.VolumeStatus.ERROR, vol.status)
         backup.refresh()
@@ -1262,7 +1265,8 @@ class BackupTestCase(BaseBackupTest):
                           self.backup_mgr.restore_backup,
                           self.ctxt,
                           backup,
-                          vol_id)
+                          vol_id,
+                          False)
         vol = db.volume_get(self.ctxt, vol_id)
         self.assertEqual('error', vol['status'])
         backup = db.backup_get(self.ctxt, backup.id)
@@ -1299,7 +1303,7 @@ class BackupTestCase(BaseBackupTest):
         mock_attach_device.return_value = attach_info
 
         with mock.patch('os.name', os_name):
-            self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
+            self.backup_mgr.restore_backup(self.ctxt, backup, vol_id, False)
 
         mock_open.assert_called_once_with('/dev/null', exp_open_mode)
         mock_temporary_chown.assert_called_once_with('/dev/null')
@@ -1359,14 +1363,15 @@ class BackupTestCase(BaseBackupTest):
         mock_attach_device.return_value = attach_info
 
         with mock.patch('os.name', os_name):
-            self.backup_mgr.restore_backup(self.ctxt, backup, new_vol_id)
+            self.backup_mgr.restore_backup(self.ctxt, backup, new_vol_id,
+                                           False)
 
         backup.status = "restoring"
         db.backup_update(self.ctxt, backup.id, {"status": "restoring"})
         vol.status = 'available'
         vol.obj_reset_changes()
         with mock.patch('os.name', os_name):
-            self.backup_mgr.restore_backup(self.ctxt, backup, vol2_id)
+            self.backup_mgr.restore_backup(self.ctxt, backup, vol2_id, False)
 
         vol2.refresh()
         old_src_backup_id = vol2.metadata["src_backup_id"]
@@ -1376,7 +1381,7 @@ class BackupTestCase(BaseBackupTest):
         vol2.obj_reset_changes()
 
         with mock.patch('os.name', os_name):
-            self.backup_mgr.restore_backup(self.ctxt, backup2, vol2_id)
+            self.backup_mgr.restore_backup(self.ctxt, backup2, vol2_id, False)
 
         vol2.status = 'available'
         vol2.obj_reset_changes()
@@ -1399,7 +1404,7 @@ class BackupTestCase(BaseBackupTest):
             status=fields.BackupStatus.RESTORING, volume_id=vol_id)
         self.backup_mgr._run_restore = mock.Mock()
 
-        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
+        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id, False)
         self.assertEqual(2, notify.call_count)
 
     @mock.patch('cinder.volume.volume_utils.clone_encryption_key')
@@ -1429,7 +1434,7 @@ class BackupTestCase(BaseBackupTest):
                                               '_attach_device')
         mock_attach_device.return_value = {'device': {'path': '/dev/null'}}
 
-        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
+        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id, False)
         volume = db.volume_get(self.ctxt, vol_id)
         self.assertEqual(fake.UUID1, volume.encryption_key_id)
         mock_clone_encryption_key.assert_not_called()
@@ -1470,13 +1475,13 @@ class BackupTestCase(BaseBackupTest):
         # Mimic the driver's side effect where it updates the volume's
         # metadata. For backups of encrypted volumes, this will essentially
         # overwrite the volume's encryption key ID prior to the restore.
-        def restore_side_effect(backup, volume_id, volume_file):
+        def restore_side_effect(backup, volume_id, volume_file, volume_is_new):
             db.volume_update(self.ctxt,
                              volume_id,
                              {'encryption_key_id': fake.UUID4})
         mock_backup_driver_restore.side_effect = restore_side_effect
 
-        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
+        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id, False)
 
         # Volume's original encryption key ID should be deleted
         mock_delete_encryption_key.assert_called_once_with(self.ctxt,
@@ -1527,13 +1532,13 @@ class BackupTestCase(BaseBackupTest):
         # Mimic the driver's side effect where it updates the volume's
         # metadata. For backups of encrypted volumes, this will essentially
         # overwrite the volume's encryption key ID prior to the restore.
-        def restore_side_effect(backup, volume_id, volume_file):
+        def restore_side_effect(backup, volume_id, volume_file, volume_is_new):
             db.volume_update(self.ctxt,
                              volume_id,
                              {'encryption_key_id': fake.UUID4})
         mock_backup_driver_restore.side_effect = restore_side_effect
 
-        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
+        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id, False)
 
         # Volume's original encryption key ID should be deleted
         mock_delete_encryption_key.assert_called_once_with(self.ctxt,
@@ -1931,7 +1936,7 @@ class BackupTestCase(BaseBackupTest):
         backup = self._create_backup_db_entry(
             volume_id=vol_id, status=fields.BackupStatus.RESTORING)
 
-        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
+        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id, False)
 
         self.assertEqual(1, mock_sem.__enter__.call_count)
         self.assertEqual(1, mock_restore.call_count)
diff --git a/cinder/tests/unit/backup/test_backup_messages.py b/cinder/tests/unit/backup/test_backup_messages.py
index 136f4bfd014..3ed0e9ad232 100644
--- a/cinder/tests/unit/backup/test_backup_messages.py
+++ b/cinder/tests/unit/backup/test_backup_messages.py
@@ -379,7 +379,7 @@ class BackupUserMessagesTest(test.TestCase):
 
         self.assertRaises(
             exception.InvalidVolume, manager.restore_backup,
-            fake_context, fake_backup, fake.VOLUME_ID)
+            fake_context, fake_backup, fake.VOLUME_ID, False)
         mock_msg_create.assert_called_once_with(
             fake_context,
             action=message_field.Action.BACKUP_RESTORE,
@@ -409,7 +409,7 @@ class BackupUserMessagesTest(test.TestCase):
 
         self.assertRaises(
             exception.InvalidBackup, manager.restore_backup,
-            fake_context, fake_backup, fake.VOLUME_ID)
+            fake_context, fake_backup, fake.VOLUME_ID, False)
         self.assertEqual(message_field.Action.BACKUP_RESTORE,
                          fake_context.message_action)
         self.assertEqual(message_field.Resource.VOLUME_BACKUP,
@@ -455,7 +455,7 @@ class BackupUserMessagesTest(test.TestCase):
 
         self.assertRaises(
             exception.InvalidBackup, manager.restore_backup,
-            fake_context, fake_backup, fake.VOLUME_ID)
+            fake_context, fake_backup, fake.VOLUME_ID, False)
         self.assertEqual(message_field.Action.BACKUP_RESTORE,
                          fake_context.message_action)
         self.assertEqual(message_field.Resource.VOLUME_BACKUP,
@@ -505,7 +505,7 @@ class BackupUserMessagesTest(test.TestCase):
 
         self.assertRaises(
             exception.InvalidBackup, manager.restore_backup,
-            fake_context, fake_backup, fake.VOLUME_ID)
+            fake_context, fake_backup, fake.VOLUME_ID, False)
         self.assertEqual(message_field.Action.BACKUP_RESTORE,
                          fake_context.message_action)
         self.assertEqual(message_field.Resource.VOLUME_BACKUP,
@@ -555,7 +555,7 @@ class BackupUserMessagesTest(test.TestCase):
 
         self.assertRaises(
             exception.InvalidBackup, manager.restore_backup,
-            fake_context, fake_backup, fake.VOLUME_ID)
+            fake_context, fake_backup, fake.VOLUME_ID, False)
         self.assertEqual(message_field.Action.BACKUP_RESTORE,
                          fake_context.message_action)
         self.assertEqual(message_field.Resource.VOLUME_BACKUP,
diff --git a/cinder/tests/unit/backup/test_chunkeddriver.py b/cinder/tests/unit/backup/test_chunkeddriver.py
index 403deb7b5b0..3bf1ae2b6d2 100644
--- a/cinder/tests/unit/backup/test_chunkeddriver.py
+++ b/cinder/tests/unit/backup/test_chunkeddriver.py
@@ -465,7 +465,7 @@ class ChunkedDriverTestCase(test.TestCase):
             self.volume, parent_id=self.backup.id)
 
         with mock.patch.object(self.driver, 'put_metadata') as mock_put:
-            self.driver.restore(backup, self.volume, volume_file)
+            self.driver.restore(backup, self.volume, volume_file, False)
             self.assertEqual(2, mock_put.call_count)
 
         restore_test.assert_called()
diff --git a/cinder/tests/unit/backup/test_rpcapi.py b/cinder/tests/unit/backup/test_rpcapi.py
index 144b3cd278a..b92fdfe3ca2 100644
--- a/cinder/tests/unit/backup/test_rpcapi.py
+++ b/cinder/tests/unit/backup/test_rpcapi.py
@@ -31,6 +31,9 @@ class BackupRPCAPITestCase(test.RPCAPITestCase):
         self.rpcapi = backup_rpcapi.BackupAPI
         self.fake_backup_obj = fake_backup.fake_backup_obj(self.context)
 
+        self.can_send_version_mock = self.patch(
+            'oslo_messaging.RPCClient.can_send_version', return_value=True)
+
     def test_create_backup(self):
         self._test_rpc_api('create_backup',
                            rpc_method='cast',
@@ -43,7 +46,18 @@ class BackupRPCAPITestCase(test.RPCAPITestCase):
                            server='fake_volume_host',
                            backup_host='fake_volume_host',
                            backup=self.fake_backup_obj,
-                           volume_id=fake.VOLUME_ID)
+                           volume_id=fake.VOLUME_ID,
+                           volume_is_new=True)
+
+        with mock.patch('cinder.rpc.LAST_RPC_VERSIONS',
+                        {'cinder-backup': '2.0'}):
+            self._test_rpc_api('restore_backup',
+                               rpc_method='cast',
+                               server='fake_volume_host',
+                               backup_host='fake_volume_host',
+                               backup=self.fake_backup_obj,
+                               volume_id=fake.VOLUME_ID,
+                               volume_is_new=False)
 
     def test_delete_backup(self):
         self._test_rpc_api('delete_backup',
diff --git a/cinder/volume/volume_utils.py b/cinder/volume/volume_utils.py
index 611e12f7bfb..831b137b9a1 100644
--- a/cinder/volume/volume_utils.py
+++ b/cinder/volume/volume_utils.py
@@ -1634,3 +1634,8 @@ def log_unsupported_driver_warning(driver):
                      'version': driver.get_version()},
                     resource={'type': 'driver',
                               'id': driver.__class__.__name__})
+
+
+def is_all_zero(chunk: bytes) -> bool:
+    """Return true if the chunk of bytes is all zeroes."""
+    return chunk == bytes(len(chunk))
diff --git a/releasenotes/notes/backup-sparse-f396b35bfe17332e.yaml b/releasenotes/notes/backup-sparse-f396b35bfe17332e.yaml
new file mode 100644
index 00000000000..a9731cb6228
--- /dev/null
+++ b/releasenotes/notes/backup-sparse-f396b35bfe17332e.yaml
@@ -0,0 +1,7 @@
+---
+fixes:
+  - |
+    `Bug #2007615 <https://bugs.launchpad.net/cinder/+bug/2007615>`_:
+    the restore operation of the Cinder backup service now restores into
+    sparse volumes, if possible. So, operators no longer need more space
+    than used previously when they restore from a disaster.