Fix how backups handle encryption key IDs

As described in the launchpad bug [1], backup operations must take care
to ensure encryption key ID resources aren't lost, and that restored
volumes always have a unique encryption key ID.

[1] https://bugs.launchpad.net/cinder/+bug/1745180

This patch adds an 'encryption_key_id' column to the backups table. Now,
when a backup is created and the source volume's encryption key is
cloned, the cloned key ID is stored in the table. This makes it possible
to delete the cloned key ID when the backup is deleted. The code that
clones the volume's encryption key has been relocated from the common
backup driver layer to the backup manager. The backup manager now has
full responsibility for managing encryption key IDs.

When restoring a backup of an encrypted volume, the backup manager now
does this:
1) If the restored volume's encryption key ID has changed, delete the
   key ID it had prior to the restore operation. This ensures no key IDs
   are leaked.
2) If the 'encryption_key_id' field in the backup table is empty, glean
   the backup's cloned key ID from the backup's "volume base metadata."
   This helps populate the 'encryption_key_id' column for backup table
   entries created prior to when the column existed.
3) Re-clone the backup's key ID to ensure the restored volume's key ID
   is always unique.

Closes-Bug: #1745180
Change-Id: I6cadcbf839d146b2fd57d7019f73dce303f9e10b
This commit is contained in:
Alan Bishop
2018-01-24 22:07:48 +00:00
parent 6bbab2ff57
commit bec756e040
11 changed files with 381 additions and 33 deletions

View File

@@ -20,7 +20,7 @@ import os
import uuid
import mock
from os_brick.initiator.connectors import fake
from os_brick.initiator.connectors import fake as fake_connectors
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_utils import importutils
@@ -36,6 +36,7 @@ from cinder import objects
from cinder.objects import fields
from cinder import test
from cinder.tests import fake_driver
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import utils
from cinder.volume import rpcapi as volume_rpcapi
@@ -82,7 +83,8 @@ class BaseBackupTest(test.TestCase):
temp_snapshot_id=None,
snapshot_id=None,
metadata=None,
parent_id=None):
parent_id=None,
encryption_key_id=None):
"""Create a backup entry in the DB.
Return the entry ID
@@ -107,6 +109,7 @@ class BaseBackupTest(test.TestCase):
kwargs['temp_volume_id'] = temp_volume_id
kwargs['temp_snapshot_id'] = temp_snapshot_id
kwargs['metadata'] = metadata or {}
kwargs['encryption_key_id'] = encryption_key_id
backup = objects.Backup(context=self.ctxt, **kwargs)
backup.create()
return backup
@@ -116,7 +119,8 @@ class BaseBackupTest(test.TestCase):
status='backing-up',
previous_status='available',
size=1,
host='testhost'):
host='testhost',
encryption_key_id=None):
"""Create a volume entry in the DB.
Return the entry ID
@@ -132,6 +136,7 @@ class BaseBackupTest(test.TestCase):
vol['attach_status'] = fields.VolumeAttachStatus.DETACHED
vol['availability_zone'] = '1'
vol['previous_status'] = previous_status
vol['encryption_key_id'] = encryption_key_id
volume = objects.Volume(context=self.ctxt, **vol)
volume.create()
return volume.id
@@ -430,7 +435,7 @@ class BackupTestCase(BaseBackupTest):
self.assertEqual('error_restoring', volume.status)
def test_cleanup_one_deleting_backup(self):
"""Test cleanup_one_backup for volume status 'deleting'."""
"""Test cleanup_one_backup for backup status 'deleting'."""
self.override_config('backup_service_inithost_offload', False)
backup = self._create_backup_db_entry(
@@ -443,6 +448,21 @@ class BackupTestCase(BaseBackupTest):
self.ctxt,
backup.id)
def test_cleanup_one_deleting_encrypted_backup(self):
"""Test cleanup of backup status 'deleting' (encrypted)."""
self.override_config('backup_service_inithost_offload', False)
backup = self._create_backup_db_entry(
status=fields.BackupStatus.DELETING,
encryption_key_id=fake.ENCRYPTION_KEY_ID)
self.backup_mgr._cleanup_one_backup(self.ctxt, backup)
backup = db.backup_get(self.ctxt, backup.id)
self.assertIsNotNone(backup)
self.assertEqual(fields.BackupStatus.ERROR_DELETING,
backup.status)
def test_detach_all_attachments_handles_exceptions(self):
"""Test detach_all_attachments with exceptions."""
@@ -636,6 +656,7 @@ class BackupTestCase(BaseBackupTest):
backup = db.backup_get(self.ctxt, backup.id)
self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
self.assertEqual(vol_size, backup['size'])
self.assertIsNone(backup.encryption_key_id)
@mock.patch('cinder.utils.brick_get_connector_properties')
@mock.patch('cinder.volume.rpcapi.VolumeAPI.get_backup_device')
@@ -831,7 +852,7 @@ class BackupTestCase(BaseBackupTest):
attach_info = {
'device': {'path': '/dev/null'},
'conn': {'data': {}},
'connector': fake.FakeConnector(None)}
'connector': fake_connectors.FakeConnector(None)}
mock_terminate_connection_snapshot = self.mock_object(
volume_rpcapi.VolumeAPI,
'terminate_connection_snapshot')
@@ -926,6 +947,58 @@ class BackupTestCase(BaseBackupTest):
self.backup_mgr.create_backup(self.ctxt, backup)
self.assertEqual(2, notify.call_count)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.get_backup_device')
@mock.patch('cinder.volume.utils.clone_encryption_key')
@mock.patch('cinder.utils.brick_get_connector_properties')
def test_create_backup_encrypted_volume(self,
mock_connector_properties,
mock_clone_encryption_key,
mock_get_backup_device):
"""Test backup of encrypted volume.
Test whether the volume's encryption key ID is cloned and
saved in the backup.
"""
vol_id = self._create_volume_db_entry(encryption_key_id=fake.UUID1)
backup = self._create_backup_db_entry(volume_id=vol_id)
self.mock_object(self.backup_mgr, '_detach_device')
mock_attach_device = self.mock_object(self.backup_mgr,
'_attach_device')
mock_attach_device.return_value = {'device': {'path': '/dev/null'}}
mock_clone_encryption_key.return_value = fake.UUID2
self.backup_mgr.create_backup(self.ctxt, backup)
mock_clone_encryption_key.assert_called_once_with(self.ctxt,
mock.ANY,
fake.UUID1)
backup = db.backup_get(self.ctxt, backup.id)
self.assertEqual(fake.UUID2, backup.encryption_key_id)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.get_backup_device')
@mock.patch('cinder.volume.utils.clone_encryption_key')
@mock.patch('cinder.utils.brick_get_connector_properties')
def test_create_backup_encrypted_volume_again(self,
mock_connector_properties,
mock_clone_encryption_key,
mock_get_backup_device):
"""Test backup of encrypted volume.
Test when the backup already has a clone of the volume's encryption
key ID.
"""
vol_id = self._create_volume_db_entry(encryption_key_id=fake.UUID1)
backup = self._create_backup_db_entry(volume_id=vol_id,
encryption_key_id=fake.UUID2)
self.mock_object(self.backup_mgr, '_detach_device')
mock_attach_device = self.mock_object(self.backup_mgr,
'_attach_device')
mock_attach_device.return_value = {'device': {'path': '/dev/null'}}
self.backup_mgr.create_backup(self.ctxt, backup)
mock_clone_encryption_key.assert_not_called()
def test_restore_backup_with_bad_volume_status(self):
"""Test error handling.
@@ -1064,6 +1137,159 @@ class BackupTestCase(BaseBackupTest):
self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
self.assertEqual(2, notify.call_count)
@mock.patch('cinder.volume.utils.clone_encryption_key')
@mock.patch('cinder.volume.utils.delete_encryption_key')
@mock.patch(
'cinder.tests.unit.backup.fake_service.FakeBackupService.restore')
@mock.patch('cinder.utils.brick_get_connector_properties')
def test_restore_backup_encrypted_volume(self,
mock_connector_properties,
mock_backup_driver_restore,
mock_delete_encryption_key,
mock_clone_encryption_key):
"""Test restore of encrypted volume.
Test restoring a volume from its own backup. In this situation,
the volume's encryption key ID shouldn't change.
"""
vol_id = self._create_volume_db_entry(status='restoring-backup',
encryption_key_id=fake.UUID1)
backup = self._create_backup_db_entry(
volume_id=vol_id,
status=fields.BackupStatus.RESTORING,
encryption_key_id=fake.UUID2)
self.mock_object(self.backup_mgr, '_detach_device')
mock_attach_device = self.mock_object(self.backup_mgr,
'_attach_device')
mock_attach_device.return_value = {'device': {'path': '/dev/null'}}
self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
volume = db.volume_get(self.ctxt, vol_id)
self.assertEqual(fake.UUID1, volume.encryption_key_id)
mock_clone_encryption_key.assert_not_called()
mock_delete_encryption_key.assert_not_called()
@mock.patch('cinder.volume.utils.clone_encryption_key')
@mock.patch('cinder.volume.utils.delete_encryption_key')
@mock.patch(
'cinder.tests.unit.backup.fake_service.FakeBackupService.restore')
@mock.patch('cinder.utils.brick_get_connector_properties')
def test_restore_backup_new_encrypted_volume(self,
mock_connector_properties,
mock_backup_driver_restore,
mock_delete_encryption_key,
mock_clone_encryption_key):
"""Test restore of encrypted volume.
Test handling of encryption key IDs when retoring to another
encrypted volume, i.e. a volume whose key ID is different from
the volume originally backed up.
- The volume's prior encryption key ID is deleted.
- The volume is assigned a fresh clone of the backup's encryption
key ID.
"""
vol_id = self._create_volume_db_entry(status='restoring-backup',
encryption_key_id=fake.UUID1)
backup = self._create_backup_db_entry(
volume_id=vol_id,
status=fields.BackupStatus.RESTORING,
encryption_key_id=fake.UUID2)
self.mock_object(self.backup_mgr, '_detach_device')
mock_attach_device = self.mock_object(self.backup_mgr,
'_attach_device')
mock_attach_device.return_value = {'device': {'path': '/dev/null'}}
mock_clone_encryption_key.return_value = fake.UUID3
# Mimic the driver's side effect where it updates the volume's
# metadata. For backups of encrypted volumes, this will essentially
# overwrite the volume's encryption key ID prior to the restore.
def restore_side_effect(backup, volume_id, volume_file):
db.volume_update(self.ctxt,
volume_id,
{'encryption_key_id': fake.UUID4})
mock_backup_driver_restore.side_effect = restore_side_effect
self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
# Volume's original encryption key ID should be deleted
mock_delete_encryption_key.assert_called_once_with(self.ctxt,
mock.ANY,
fake.UUID1)
# Backup's encryption key ID should have been cloned
mock_clone_encryption_key.assert_called_once_with(self.ctxt,
mock.ANY,
fake.UUID2)
# Volume should have the cloned backup key ID
volume = db.volume_get(self.ctxt, vol_id)
self.assertEqual(fake.UUID3, volume.encryption_key_id)
# Backup's key ID should not have changed
backup = db.backup_get(self.ctxt, backup.id)
self.assertEqual(fake.UUID2, backup.encryption_key_id)
@mock.patch('cinder.volume.utils.clone_encryption_key')
@mock.patch('cinder.volume.utils.delete_encryption_key')
@mock.patch(
'cinder.tests.unit.backup.fake_service.FakeBackupService.restore')
@mock.patch('cinder.utils.brick_get_connector_properties')
def test_restore_backup_glean_key_id(self,
mock_connector_properties,
mock_backup_driver_restore,
mock_delete_encryption_key,
mock_clone_encryption_key):
"""Test restore of encrypted volume.
Test restoring a backup that was created prior to when the encryption
key ID is saved in the backup DB. The backup encryption key ID is
gleaned from the restored volume.
"""
vol_id = self._create_volume_db_entry(status='restoring-backup',
encryption_key_id=fake.UUID1)
backup = self._create_backup_db_entry(
volume_id=vol_id,
status=fields.BackupStatus.RESTORING)
self.mock_object(self.backup_mgr, '_detach_device')
mock_attach_device = self.mock_object(self.backup_mgr,
'_attach_device')
mock_attach_device.return_value = {'device': {'path': '/dev/null'}}
mock_clone_encryption_key.return_value = fake.UUID3
# Mimic the driver's side effect where it updates the volume's
# metadata. For backups of encrypted volumes, this will essentially
# overwrite the volume's encryption key ID prior to the restore.
def restore_side_effect(backup, volume_id, volume_file):
db.volume_update(self.ctxt,
volume_id,
{'encryption_key_id': fake.UUID4})
mock_backup_driver_restore.side_effect = restore_side_effect
self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
# Volume's original encryption key ID should be deleted
mock_delete_encryption_key.assert_called_once_with(self.ctxt,
mock.ANY,
fake.UUID1)
# Backup's encryption key ID should have been cloned from
# the value restored from the metadata.
mock_clone_encryption_key.assert_called_once_with(self.ctxt,
mock.ANY,
fake.UUID4)
# Volume should have the cloned backup key ID
volume = db.volume_get(self.ctxt, vol_id)
self.assertEqual(fake.UUID3, volume.encryption_key_id)
# Backup's key ID should have been gleaned from value restored
# from the backup's metadata
backup = db.backup_get(self.ctxt, backup.id)
self.assertEqual(fake.UUID4, backup.encryption_key_id)
def test_delete_backup_with_bad_backup_status(self):
"""Test error handling.
@@ -1144,6 +1370,25 @@ class BackupTestCase(BaseBackupTest):
self.assertGreaterEqual(timeutils.utcnow(), backup.deleted_at)
self.assertEqual(fields.BackupStatus.DELETED, backup.status)
@mock.patch('cinder.volume.utils.delete_encryption_key')
def test_delete_backup_of_encrypted_volume(self,
mock_delete_encryption_key):
"""Test deletion of backup of encrypted volume"""
vol_id = self._create_volume_db_entry(
encryption_key_id=fake.UUID1)
backup = self._create_backup_db_entry(
volume_id=vol_id,
status=fields.BackupStatus.DELETING,
encryption_key_id=fake.UUID2)
self.backup_mgr.delete_backup(self.ctxt, backup)
mock_delete_encryption_key.assert_called_once_with(self.ctxt,
mock.ANY,
fake.UUID2)
ctxt_read_deleted = context.get_admin_context('yes')
backup = db.backup_get(ctxt_read_deleted, backup.id)
self.assertTrue(backup.deleted)
self.assertIsNone(backup.encryption_key_id)
@mock.patch('cinder.volume.utils.notify_about_backup_usage')
def test_delete_backup_with_notify(self, notify):
"""Test normal backup deletion with notifications."""