diff --git a/cinder/tests/unit/volume/test_volume.py b/cinder/tests/unit/volume/test_volume.py index 1fd043b6f01..c5a1de6c815 100644 --- a/cinder/tests/unit/volume/test_volume.py +++ b/cinder/tests/unit/volume/test_volume.py @@ -21,6 +21,7 @@ import io import time from unittest import mock +import castellan from castellan.common import exception as castellan_exception from castellan import key_manager import ddt @@ -88,6 +89,16 @@ def create_snapshot(volume_id, size=1, metadata=None, ctxt=None, return snap +class KeyObject(object): + def get_encoded(arg): + return "asdf".encode('utf-8') + + +class KeyObject2(object): + def get_encoded(arg): + return "qwert".encode('utf-8') + + @ddt.ddt class VolumeTestCase(base.BaseVolumeTestCase): @@ -1766,6 +1777,40 @@ class VolumeTestCase(base.BaseVolumeTestCase): mock_at.assert_called() mock_det.assert_called() + @mock.patch('cinder.db.sqlalchemy.api.volume_encryption_metadata_get') + def test_setup_encryption_keys(self, mock_enc_metadata_get): + key_mgr = fake_keymgr.fake_api() + self.mock_object(castellan.key_manager, 'API', return_value=key_mgr) + key_id = key_mgr.store(self.context, KeyObject()) + key2_id = key_mgr.store(self.context, KeyObject2()) + + params = {'status': 'creating', + 'size': 1, + 'host': CONF.host, + 'encryption_key_id': key_id} + vol = tests_utils.create_volume(self.context, **params) + + self.volume.create_volume(self.context, vol) + db.volume_update(self.context, + vol['id'], + {'encryption_key_id': key_id}) + + mock_enc_metadata_get.return_value = {'cipher': 'aes-xts-plain64', + 'key_size': 256, + 'provider': 'luks'} + ctxt = context.get_admin_context() + + enc_info = {'encryption_key_id': key_id} + with mock.patch('cinder.volume.volume_utils.create_encryption_key', + return_value=key2_id): + r = cinder.volume.flows.manager.create_volume.\ + CreateVolumeFromSpecTask._setup_encryption_keys(ctxt, + vol, + enc_info) + (source_pass, new_pass, new_key_id) = r + self.assertNotEqual(source_pass, new_pass) + self.assertEqual(new_key_id, key2_id) + @mock.patch.object(key_manager, 'API', fake_keymgr.fake_api) def test_create_volume_from_snapshot_with_encryption(self): """Test volume can be created from a snapshot of an encrypted volume""" diff --git a/cinder/volume/flows/manager/create_volume.py b/cinder/volume/flows/manager/create_volume.py index 3b8899d46d4..f8468dbd8e2 100644 --- a/cinder/volume/flows/manager/create_volume.py +++ b/cinder/volume/flows/manager/create_volume.py @@ -496,7 +496,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask): new_key_id = volume_utils.create_encryption_key(context, keymgr, volume.volume_type_id) - new_key = keymgr.get(context, encryption['encryption_key_id']) + new_key = keymgr.get(context, new_key_id) new_pass = binascii.hexlify(new_key.get_encoded()).decode('utf-8') return (source_pass, new_pass, new_key_id) diff --git a/releasenotes/notes/bug-1904440-clone-rekey-fd57a2b5f6224e0f.yaml b/releasenotes/notes/bug-1904440-clone-rekey-fd57a2b5f6224e0f.yaml new file mode 100644 index 00000000000..b2401787491 --- /dev/null +++ b/releasenotes/notes/bug-1904440-clone-rekey-fd57a2b5f6224e0f.yaml @@ -0,0 +1,8 @@ +--- +fixes: + - | + `Bug #1904440 `_: + When an iSCSI/FC encrypted volume was cloned, the rekey operation would + stamp the wrong encryption key on the newly cloned volume. This resulted + in a volume that could not be attached. It does not present a security + problem.