Don't destroy existing backup by mistake on import

We're supposed to fail an import request for an already existing
backup, but the current code confuses an existing backup record with
one we created for the import, and during quota rollback, will
destroy either one.  Simplify the logic so that we check whether
there's an already existing backup first and bail without even
bothering to make an unnecessary reservation, hence leaving us nothing
to roll back or delete by mistake.

Also revise and add tests.

Closes-bug: #1965847
Change-Id: I3c0e365f5dc3c32975343f538c6029f02ac7c2b5
(cherry picked from commit 6386cbb0a0)
(cherry picked from commit 753c022463)
This commit is contained in:
Brian Rosmaita 2022-04-26 18:57:36 -04:00
parent bfcef4982a
commit 332aa33404
4 changed files with 111 additions and 23 deletions

View File

@ -499,6 +499,23 @@ class API(base.Base):
msg = _('Provided backup record is missing size attribute')
raise exception.InvalidInput(reason=msg)
# Try to get the backup with that ID in all projects even among
# deleted entries (we reuse soft-deleted backups).
try:
backup = objects.BackupImport.get_by_id(
context.elevated(read_deleted='yes'),
backup_record['id'],
project_only=False)
# If record exists and it's not deleted we cannot proceed
# with the import
if backup.status != fields.BackupStatus.DELETED:
msg = _('Backup already exists in database.')
raise exception.InvalidBackup(reason=msg)
except exception.BackupNotFound:
pass
# Check that we're under limit by reserving quota
try:
reserve_opts = {'backups': 1,
'backup_gigabytes': backup_record['size']}
@ -520,30 +537,16 @@ class API(base.Base):
}
try:
try:
# Try to get the backup with that ID in all projects even among
# deleted entries.
backup = objects.BackupImport.get_by_id(
context.elevated(read_deleted='yes'),
backup_record['id'],
project_only=False)
# If record exists and it's not deleted we cannot proceed
# with the import
if backup.status != fields.BackupStatus.DELETED:
msg = _('Backup already exists in database.')
raise exception.InvalidBackup(reason=msg)
# Otherwise we'll "revive" delete backup record
if backup:
# "revive" the soft-deleted backup record retrieved earlier
backup.update(kwargs)
backup.save()
QUOTAS.commit(context, reservations)
except exception.BackupNotFound:
# If record doesn't exist create it with the specific ID
else:
# create a new backup with the specified ID
backup = objects.BackupImport(context=context,
id=backup_record['id'], **kwargs)
backup.create()
QUOTAS.commit(context, reservations)
QUOTAS.commit(context, reservations)
except Exception:
with excutils.save_and_reraise_exception():
try:

View File

@ -2240,9 +2240,14 @@ class BackupsAPITestCase(test.TestCase):
res_dict['badRequest']['code'])
self.assertEqual('Invalid backup: Backup already exists in database.',
res_dict['badRequest']['message'])
mock_reserve.assert_called_with(
ctx, backups=1, backup_gigabytes=1)
mock_rollback.assert_called_with(ctx, "fake_reservation")
# Bug #1965847: already existing backup should not be deleted
self.assertNotEqual(fields.BackupStatus.DELETED,
self._get_backup_attrib(backup.id, 'status'))
mock_reserve.assert_not_called()
mock_rollback.assert_not_called()
mock_commit.assert_not_called()
backup.destroy()
@mock.patch.object(quota.QUOTAS, 'commit')

View File

@ -2086,13 +2086,86 @@ class BackupAPITestCase(BaseBackupTest):
'user_id': backup.user_id,
'volume_id': backup.volume_id,
'size': 1}
mock_reserve.return_value = 'fake_reservation'
self.assertRaises(exception.InvalidBackup,
self.api._get_import_backup,
self.ctxt, 'fake_backup_url')
# make sure Bug #1965847 has been fixed
backup = db.backup_get(self.ctxt, backup.id)
self.assertNotEqual(fields.BackupStatus.DELETED, backup.status)
# the fix for Bug #1965847 changed the workflow in the method
# under test, so we check that none of this stuff happens any more
mock_reserve.assert_not_called()
mock_rollback.assert_not_called()
mock_commit.assert_not_called()
@mock.patch.object(objects.Backup, 'decode_record')
@mock.patch.object(quota.QUOTAS, 'commit')
@mock.patch.object(quota.QUOTAS, 'rollback')
@mock.patch.object(quota.QUOTAS, 'reserve')
def test__get_import_backup_reuse_backup(
self, mock_reserve, mock_rollback, mock_commit, mock_decode):
backup = self._create_backup_db_entry(
size=1,
status=fields.BackupStatus.DELETED)
mock_decode.return_value = {'id': backup.id,
'project_id': backup.project_id,
'user_id': backup.user_id,
'volume_id': backup.volume_id,
'size': 1}
mock_reserve.return_value = 'fake_reservation'
self.ctxt.user_id = 'fake_user'
self.ctxt.project_id = 'fake_project'
# check pre-conditions
self.assertNotEqual(self.ctxt.user_id, backup.user_id)
self.assertNotEqual(self.ctxt.project_id, backup.project_id)
self.assertEqual(fields.BackupStatus.DELETED, backup.status)
self.api._get_import_backup(self.ctxt, 'fake_backup_url')
# check post-conditions
backup = db.backup_get(self.ctxt, backup.id)
self.assertEqual(self.ctxt.user_id, backup.user_id)
self.assertEqual(self.ctxt.project_id, backup.project_id)
self.assertNotEqual(fields.BackupStatus.DELETED, backup.status)
mock_reserve.assert_called_with(
self.ctxt, backups=1, backup_gigabytes=1)
mock_commit.assert_called_with(self.ctxt, 'fake_reservation')
mock_rollback.assert_not_called()
@mock.patch.object(objects.BackupImport, '__init__')
@mock.patch.object(objects.BackupImport, 'get_by_id')
@mock.patch.object(objects.Backup, 'decode_record')
@mock.patch.object(quota.QUOTAS, 'commit')
@mock.patch.object(quota.QUOTAS, 'rollback')
@mock.patch.object(quota.QUOTAS, 'reserve')
def test__get_import_backup_rollback_situation(
self, mock_reserve, mock_rollback, mock_commit, mock_decode,
mock_get_by_id, mock_imp_init):
mock_decode.return_value = {'id': fake.BACKUP_ID,
'project_id': fake.PROJECT_ID,
'user_id': fake.USER_ID,
'volume_id': fake.VOLUME_ID,
'size': 1}
# we won't find a backup, so we'll need to create one
mock_get_by_id.side_effect = exception.BackupNotFound(
backup_id=fake.BACKUP_ID)
# we should make a reservation ...
mock_reserve.return_value = 'fake_reservation'
# ... but will fail to create and will have to roll back
mock_imp_init.side_effect = FakeBackupException,
self.assertRaises(FakeBackupException,
self.api._get_import_backup,
self.ctxt, 'fake_backup_url')
mock_reserve.assert_called_with(
self.ctxt, backups=1, backup_gigabytes=1)
mock_commit.assert_not_called()
mock_rollback.assert_called_with(self.ctxt, "fake_reservation")
@mock.patch('cinder.objects.BackupList.get_all_by_volume')

View File

@ -0,0 +1,7 @@
---
fixes:
- |
`Bug #1965847 <https://bugs.launchpad.net/cinder/+bug/1965847>`_: Fixed
issue where importing a backup record for a backup_id that currently
existed had the unfortunate side effect of deleting the existing backup
record.