diff --git a/cinder/backup/driver.py b/cinder/backup/driver.py index ad08a9f7ab9..10598300181 100644 --- a/cinder/backup/driver.py +++ b/cinder/backup/driver.py @@ -26,6 +26,7 @@ import six from cinder.db import base from cinder import exception from cinder.i18n import _ +from cinder.volume import utils as vol_utils service_opts = [ cfg.IntOpt('backup_metadata_version', default=2, @@ -97,10 +98,9 @@ class BackupMetadataAPI(base.Base): continue # Copy the encryption key UUID for backup if key is 'encryption_key_id' and value is not None: - value = self._key_manager.store( - self.context, - self._key_manager.get(self.context, value) - ) + value = vol_utils.clone_encryption_key(self.context, + self._key_manager, + value) LOG.debug("Copying encryption key UUID for backup.") container[type_tag][key] = value diff --git a/cinder/tests/unit/volume/flows/test_create_volume_flow.py b/cinder/tests/unit/volume/flows/test_create_volume_flow.py index f92038f204e..9c4815608cc 100644 --- a/cinder/tests/unit/volume/flows/test_create_volume_flow.py +++ b/cinder/tests/unit/volume/flows/test_create_volume_flow.py @@ -463,7 +463,7 @@ class CreateVolumeFlowTestCase(test.TestCase): group_snapshot=None, backup=None) - mock_is_encrypted.assert_called_once_with(self.ctxt, 1) + mock_is_encrypted.assert_called_with(self.ctxt, 1) mock_get_volume_type_encryption.assert_called_once_with(self.ctxt, 1) @mock.patch('cinder.volume.volume_types.is_encrypted') diff --git a/cinder/tests/unit/volume/test_volume.py b/cinder/tests/unit/volume/test_volume.py index 2e108e7e525..d2206930b45 100644 --- a/cinder/tests/unit/volume/test_volume.py +++ b/cinder/tests/unit/volume/test_volume.py @@ -20,6 +20,7 @@ import ddt import time import uuid +from castellan.common import exception as castellan_exception from castellan import key_manager import enum import eventlet @@ -780,6 +781,40 @@ class VolumeTestCase(base.BaseVolumeTestCase): self.assertEqual("error_deleting", volume.status) volume.destroy() + @mock.patch.object(key_manager, 'API', fake_keymgr.fake_api) + def test_delete_encrypted_volume_key_not_found(self): + cipher = 'aes-xts-plain64' + key_size = 256 + db.volume_type_create(self.context, + {'id': fake.VOLUME_TYPE_ID, 'name': 'LUKS'}) + db.volume_type_encryption_create( + self.context, fake.VOLUME_TYPE_ID, + {'control_location': 'front-end', 'provider': ENCRYPTION_PROVIDER, + 'cipher': cipher, 'key_size': key_size}) + + db_vol_type = db.volume_type_get_by_name(self.context, 'LUKS') + + volume = self.volume_api.create(self.context, + 1, + 'name', + 'description', + volume_type=db_vol_type) + + volume_id = volume['id'] + volume['host'] = 'fake_host' + volume['status'] = 'available' + db.volume_update(self.context, volume_id, {'status': 'available'}) + + with mock.patch.object( + self.volume_api.key_manager, + 'delete', + side_effect=castellan_exception.ManagedObjectNotFoundError): + self.volume_api.delete(self.context, volume) + + volume = objects.Volume.get_by_id(self.context, volume_id) + self.assertEqual("deleting", volume.status) + volume.destroy() + def test_delete_busy_volume(self): """Test volume survives deletion if driver reports it as busy.""" volume = tests_utils.create_volume(self.context, **self.volume_params) diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 423402421f9..f0db4e73ff0 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -481,7 +481,9 @@ class API(base.Base): encryption_key_id = volume.get('encryption_key_id', None) if encryption_key_id is not None: try: - self.key_manager.delete(context, encryption_key_id) + volume_utils.delete_encryption_key(context, + self.key_manager, + encryption_key_id) except Exception as e: volume.update({'status': 'error_deleting'}) volume.save() diff --git a/cinder/volume/flows/api/create_volume.py b/cinder/volume/flows/api/create_volume.py index 6d026078c6a..9df196151de 100644 --- a/cinder/volume/flows/api/create_volume.py +++ b/cinder/volume/flows/api/create_volume.py @@ -11,7 +11,6 @@ # under the License. -from castellan.common import exception as castellan_exc from oslo_config import cfg from oslo_log import log as logging from oslo_utils import units @@ -391,30 +390,15 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): # Clone the existing key and associate a separate -- but # identical -- key with each volume. if encryption_key_id is not None: - encryption_key_id = key_manager.store( - context, key_manager.get(context, encryption_key_id)) + encryption_key_id = vol_utils.clone_encryption_key( + context, + key_manager, + encryption_key_id) else: - volume_type_encryption = ( - volume_types.get_volume_type_encryption(context, - volume_type_id)) - cipher = volume_type_encryption.cipher - length = volume_type_encryption.key_size - - # NOTE(kaitlin-farr): dm-crypt expects the cipher in a - # hyphenated format (aes-xts-plain64). The algorithm needs - # to be parsed out to pass to the key manager (aes). - algorithm = cipher.split('-')[0] if cipher else None - try: - encryption_key_id = key_manager.create_key( - context, - algorithm=algorithm, - length=length) - except castellan_exc.KeyManagerError: - # The messaging back to the client here is - # purposefully terse, so we don't leak any sensitive - # details. - LOG.exception("Key manager error") - raise exception.Invalid(message="Key manager error") + encryption_key_id = vol_utils.create_encryption_key( + context, + key_manager, + volume_type_id) return encryption_key_id diff --git a/cinder/volume/utils.py b/cinder/volume/utils.py index b45a9c43a8a..8fe83068c9a 100644 --- a/cinder/volume/utils.py +++ b/cinder/volume/utils.py @@ -25,6 +25,7 @@ import re import time import uuid +from castellan.common import exception as castellan_exception import eventlet from eventlet import tpool from oslo_concurrency import processutils @@ -892,13 +893,37 @@ def create_encryption_key(context, key_manager, volume_type_id): cipher = volume_type_encryption.cipher length = volume_type_encryption.key_size algorithm = cipher.split('-')[0] if cipher else None - encryption_key_id = key_manager.create_key( - context, - algorithm=algorithm, - length=length) + try: + encryption_key_id = key_manager.create_key( + context, + algorithm=algorithm, + length=length) + except castellan_exception.KeyManagerError: + # The messaging back to the client here is + # purposefully terse, so we don't leak any sensitive + # details. + LOG.exception("Key manager error") + raise exception.Invalid(message="Key manager error") + return encryption_key_id +def delete_encryption_key(context, key_manager, encryption_key_id): + try: + key_manager.delete(context, encryption_key_id) + except castellan_exception.ManagedObjectNotFoundError: + pass + + +def clone_encryption_key(context, key_manager, encryption_key_id): + clone_key_id = None + if encryption_key_id is not None: + clone_key_id = key_manager.store( + context, + key_manager.get(context, encryption_key_id)) + return clone_key_id + + def is_replicated_str(str): spec = (str or '').split() return (len(spec) == 2 and