Add support for transferring encrypted volumes

A new microversion 3.70 adds the ability to transfer a volume's
encryption key when transferring a volume to another project.

When the volume transfer is initiated, the volume's encryption
secret is essentially transferred to the cinder service.
- The cinder service creates a new encryption_key_id that contains
  a copy of the volume's encryption secret.
- The volume (and its snapshots) is updated with the new
  encryption_key_id (the one owned by the cinder service).
- The volume's original encryption_key_id (owned by the volume's
  owner) is deleted.

When the transfer is accepted, the secret is transferred to the
user accepting the transfer.
- A new encryption_key_id is generated on behalf of the new user
  that contains a copy of the volume's encryption secret.
- The volume (and its snapshots) is updated with the new
  encryption_key_id (the one owned by the user).
- The intermediate encryption_key_id owned by the cinder service
  is deleted.

When a transfer is cancelled (deleted), the same process is used
to transfer ownship back to the user that cancelled the transfer.

Implements: blueprint transfer-encrypted-volume
Change-Id: I459f06504e90025c9c0b539981d3d56a2a9394c7
changes/49/851449/7
Alan Bishop 5 months ago
parent 4e7d338b9a
commit d59e41fb3c

@ -21,8 +21,8 @@
],
"min_version": "3.0",
"status": "CURRENT",
"updated": "2022-03-30T00:00:00Z",
"version": "3.69"
"updated": "2022-08-31T00:00:00Z",
"version": "3.70"
}
]
}

@ -21,8 +21,8 @@
],
"min_version": "3.0",
"status": "CURRENT",
"updated": "2022-03-30T00:00:00Z",
"version": "3.69"
"updated": "2022-08-31T00:00:00Z",
"version": "3.70"
}
]
}

@ -177,6 +177,8 @@ SUPPORT_REIMAGE_VOLUME = '3.68'
SHARED_TARGETS_TRISTATE = '3.69'
TRANSFER_ENCRYPTED_VOLUME = '3.70'
def get_mv_header(version):
"""Gets a formatted HTTP microversion header.

@ -155,14 +155,15 @@ REST_API_VERSION_HISTORY = """
* 3.67 - API URLs no longer need to include a project_id parameter.
* 3.68 - Support re-image volume
* 3.69 - Allow null value for shared_targets
* 3.70 - Support encrypted volume transfers
"""
# The minimum and maximum versions of the API supported
# The default api version request is defined to be the
# minimum version of the API supported.
_MIN_API_VERSION = "3.0"
_MAX_API_VERSION = "3.69"
UPDATED = "2022-04-20T00:00:00Z"
_MAX_API_VERSION = "3.70"
UPDATED = "2022-08-31T00:00:00Z"
# NOTE(cyeoh): min and max versions declared as functions so we can

@ -528,3 +528,8 @@ following meanings:
manual scans.
- ``false``: Never do locking.
- ``null``: Forced locking regardless of the iSCSI initiator.
3.70
----
Add the ability to transfer encrypted volumes and their snapshots.

@ -96,11 +96,15 @@ class VolumeTransferController(volume_transfer_v2.VolumeTransferController):
no_snapshots = strutils.bool_from_string(transfer.get('no_snapshots',
False))
req_version = req.api_version_request
allow_encrypted = req_version.matches(mv.TRANSFER_ENCRYPTED_VOLUME)
LOG.info("Creating transfer of volume %s", volume_id)
try:
new_transfer = self.transfer_api.create(context, volume_id, name,
no_snapshots=no_snapshots)
new_transfer = self.transfer_api.create(
context, volume_id, name,
no_snapshots=no_snapshots, allow_encrypted=allow_encrypted)
# Not found exception will be handled at the wsgi level
except exception.Invalid as error:
raise exc.HTTPBadRequest(explanation=error.msg)

@ -0,0 +1,107 @@
# Copyright 2022 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from castellan.common.credentials import keystone_password
from castellan.common import exception as castellan_exception
from castellan import key_manager as castellan_key_manager
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from cinder import context
from cinder import objects
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class KeyTransfer(object):
def __init__(self, conf: cfg.ConfigOpts):
self.conf = conf
self._service_context = keystone_password.KeystonePassword(
password=conf.keystone_authtoken.password,
auth_url=conf.keystone_authtoken.auth_url,
username=conf.keystone_authtoken.username,
user_domain_name=conf.keystone_authtoken.user_domain_name,
project_name=conf.keystone_authtoken.project_name,
project_domain_name=conf.keystone_authtoken.project_domain_name)
@property
def service_context(self):
"""Returns the cinder service's context."""
return self._service_context
def transfer_key(self,
volume: objects.volume.Volume,
src_context: context.RequestContext,
dst_context: context.RequestContext) -> None:
"""Transfer the key from the src_context to the dst_context."""
key_manager = castellan_key_manager.API(self.conf)
old_encryption_key_id = volume.encryption_key_id
secret = key_manager.get(src_context, old_encryption_key_id)
try:
new_encryption_key_id = key_manager.store(dst_context, secret)
except castellan_exception.KeyManagerError:
with excutils.save_and_reraise_exception():
LOG.error("Failed to transfer the encryption key. This is "
"likely because the cinder service lacks the "
"privilege to create secrets.")
volume.encryption_key_id = new_encryption_key_id
volume.save()
snapshots = objects.snapshot.SnapshotList.get_all_for_volume(
context.get_admin_context(),
volume.id)
for snapshot in snapshots:
snapshot.encryption_key_id = new_encryption_key_id
snapshot.save()
key_manager.delete(src_context, old_encryption_key_id)
def transfer_create(context: context.RequestContext,
volume: objects.volume.Volume,
conf: cfg.ConfigOpts = CONF) -> None:
"""Transfer the key from the owner to the cinder service."""
LOG.info("Initiating transfer of encryption key for volume %s", volume.id)
key_transfer = KeyTransfer(conf)
key_transfer.transfer_key(volume,
src_context=context,
dst_context=key_transfer.service_context)
def transfer_accept(context: context.RequestContext,
volume: objects.volume.Volume,
conf: cfg.ConfigOpts = CONF) -> None:
"""Transfer the key from the cinder service to the recipient."""
LOG.info("Accepting transfer of encryption key for volume %s", volume.id)
key_transfer = KeyTransfer(conf)
key_transfer.transfer_key(volume,
src_context=key_transfer.service_context,
dst_context=context)
def transfer_delete(context: context.RequestContext,
volume: objects.volume.Volume,
conf: cfg.ConfigOpts = CONF) -> None:
"""Transfer the key from the cinder service back to the owner."""
LOG.info("Cancelling transfer of encryption key for volume %s", volume.id)
key_transfer = KeyTransfer(conf)
key_transfer.transfer_key(volume,
src_context=key_transfer.service_context,
dst_context=context)

@ -27,12 +27,14 @@ from cinder.api import microversions as mv
from cinder.api.v3 import volume_transfer as volume_transfer_v3
from cinder import context
from cinder import db
from cinder import exception
from cinder.objects import fields
from cinder import quota
from cinder.tests.unit.api import fakes
from cinder.tests.unit.api.v2 import fakes as v2_fakes
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import test
from cinder.tests.unit import utils as test_utils
import cinder.transfer
@ -358,3 +360,145 @@ class VolumeTransferAPITestCase357(VolumeTransferAPITestCase):
microversion = mv.TRANSFER_WITH_HISTORY
DETAIL_LEN = 9
expect_transfer_history = True
@ddt.ddt
class VolumeTransferEncryptedAPITestCase(test.TestCase):
# NOTE:
# - The TRANSFER_ENCRYPTED_VOLUME microversion is only relevant when
# creating a volume transfer. The microversion specified when accepting
# or deleting a transfer is not relevant.
# - The tests take advantage of the fact that a project_id is no longer
# required in API URLs.
def setUp(self):
super(VolumeTransferEncryptedAPITestCase, self).setUp()
self.volume_transfer_api = cinder.transfer.API()
self.controller = volume_transfer_v3.VolumeTransferController()
self.user_ctxt = context.RequestContext(
fake.USER_ID, fake.PROJECT_ID, auth_token=True)
self.admin_ctxt = context.get_admin_context()
def _create_volume(self, encryption_key_id):
vol_type = test_utils.create_volume_type(self.admin_ctxt,
name='fake_vol_type',
testcase_instance=self)
volume = test_utils.create_volume(self.user_ctxt,
volume_type_id=vol_type.id,
testcase_instance=self,
encryption_key_id=encryption_key_id)
return volume
@mock.patch('cinder.keymgr.transfer.transfer_create')
def _create_transfer(self, volume_id, mock_key_transfer_create):
transfer = self.volume_transfer_api.create(self.admin_ctxt,
volume_id,
display_name='test',
allow_encrypted=True)
return transfer
@ddt.data(None, fake.ENCRYPTION_KEY_ID)
@mock.patch('cinder.keymgr.transfer.transfer_create')
def test_create_transfer(self,
encryption_key_id,
mock_key_transfer_create):
volume = self._create_volume(encryption_key_id)
body = {"transfer": {"name": "transfer1",
"volume_id": volume.id}}
req = webob.Request.blank('/v3/volume-transfers')
req.method = 'POST'
req.headers = mv.get_mv_header(mv.TRANSFER_ENCRYPTED_VOLUME)
req.headers['Content-Type'] = 'application/json'
req.body = jsonutils.dump_as_bytes(body)
res = req.get_response(fakes.wsgi_app(
fake_auth_context=self.user_ctxt))
self.assertEqual(HTTPStatus.ACCEPTED, res.status_int)
call_count = 0 if encryption_key_id is None else 1
self.assertEqual(mock_key_transfer_create.call_count, call_count)
def test_create_transfer_encrypted_volume_not_supported(self):
volume = self._create_volume(fake.ENCRYPTION_KEY_ID)
body = {"transfer": {"name": "transfer1",
"volume_id": volume.id}}
req = webob.Request.blank('/v3/volume-transfers')
req.method = 'POST'
req.headers = mv.get_mv_header(
mv.get_prior_version(mv.TRANSFER_ENCRYPTED_VOLUME))
req.headers['Content-Type'] = 'application/json'
req.body = jsonutils.dump_as_bytes(body)
res = req.get_response(fakes.wsgi_app(
fake_auth_context=self.user_ctxt))
res_dict = jsonutils.loads(res.body)
self.assertEqual(HTTPStatus.BAD_REQUEST, res.status_int)
self.assertEqual(('Invalid volume: '
'transferring encrypted volume is not supported'),
res_dict['badRequest']['message'])
@mock.patch('cinder.keymgr.transfer.transfer_create',
side_effect=exception.KeyManagerError('whoops!'))
def test_create_transfer_key_transfer_failed(self,
mock_key_transfer_create):
volume = self._create_volume(fake.ENCRYPTION_KEY_ID)
body = {"transfer": {"name": "transfer1",
"volume_id": volume.id}}
req = webob.Request.blank('/v3/volume-transfers')
req.method = 'POST'
req.headers = mv.get_mv_header(mv.TRANSFER_ENCRYPTED_VOLUME)
req.headers['Content-Type'] = 'application/json'
req.body = jsonutils.dump_as_bytes(body)
res = req.get_response(fakes.wsgi_app(
fake_auth_context=self.user_ctxt))
self.assertEqual(HTTPStatus.INTERNAL_SERVER_ERROR, res.status_int)
@ddt.data(None, fake.ENCRYPTION_KEY_ID)
@mock.patch('cinder.keymgr.transfer.transfer_accept')
@mock.patch('cinder.volume.api.API.accept_transfer')
def test_accept_transfer(self,
encryption_key_id,
mock_volume_accept_transfer,
mock_key_transfer_accept):
volume = self._create_volume(encryption_key_id)
transfer = self._create_transfer(volume.id)
body = {"accept": {"auth_key": transfer['auth_key']}}
req = webob.Request.blank('/v3/volume-transfers/%s/accept' % (
transfer['id']))
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = jsonutils.dump_as_bytes(body)
res = req.get_response(fakes.wsgi_app(
fake_auth_context=self.user_ctxt))
self.assertEqual(HTTPStatus.ACCEPTED, res.status_int)
call_count = 0 if encryption_key_id is None else 1
self.assertEqual(mock_key_transfer_accept.call_count, call_count)
@ddt.data(None, fake.ENCRYPTION_KEY_ID)
@mock.patch('cinder.keymgr.transfer.transfer_delete')
def test_delete_transfer(self,
encryption_key_id,
mock_key_transfer_delete):
volume = self._create_volume(encryption_key_id)
transfer = self._create_transfer(volume.id)
req = webob.Request.blank('/v3/volume-transfers/%s' % (
transfer['id']))
req.method = 'DELETE'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(fakes.wsgi_app(
fake_auth_context=self.user_ctxt))
self.assertEqual(HTTPStatus.ACCEPTED, res.status_int)
call_count = 0 if encryption_key_id is None else 1
self.assertEqual(mock_key_transfer_delete.call_count, call_count)

@ -0,0 +1,178 @@
# Copyright 2022 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for encryption key transfer."""
from unittest import mock
from castellan.common.credentials import keystone_password
from oslo_config import cfg
from cinder.common import constants
from cinder import context
from cinder.keymgr import conf_key_mgr
from cinder.keymgr import transfer
from cinder import objects
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import test
from cinder.tests.unit import utils as test_utils
CONF = cfg.CONF
ENCRYPTION_SECRET = 'the_secret'
CINDER_USERNAME = 'cinder'
CINDER_PASSWORD = 'key_transfer_test'
class KeyTransferTestCase(test.TestCase):
OLD_ENCRYPTION_KEY_ID = fake.ENCRYPTION_KEY_ID
NEW_ENCRYPTION_KEY_ID = fake.ENCRYPTION_KEY2_ID
key_manager_class = ('castellan.key_manager.barbican_key_manager.'
'BarbicanKeyManager')
def setUp(self):
super(KeyTransferTestCase, self).setUp()
self.conf = CONF
self.conf.set_override('backend',
self.key_manager_class,
group='key_manager')
self.conf.set_override('username',
CINDER_USERNAME,
group='keystone_authtoken')
self.conf.set_override('password',
CINDER_PASSWORD,
group='keystone_authtoken')
self.context = context.RequestContext(fake.USER_ID, fake.PROJECT_ID)
def _create_volume_and_snapshots(self):
volume = test_utils.create_volume(
self.context,
testcase_instance=self,
encryption_key_id=self.OLD_ENCRYPTION_KEY_ID)
_ = test_utils.create_snapshot(
self.context,
volume.id,
display_name='snap_1',
testcase_instance=self,
encryption_key_id=self.OLD_ENCRYPTION_KEY_ID)
_ = test_utils.create_snapshot(
self.context,
volume.id,
display_name='snap_2',
testcase_instance=self,
encryption_key_id=self.OLD_ENCRYPTION_KEY_ID)
return volume
def _verify_service_context(self, mocked_call):
service_context = mocked_call.call_args.args[0]
self.assertIsInstance(service_context,
keystone_password.KeystonePassword)
self.assertEqual(service_context.username, CINDER_USERNAME)
self.assertEqual(service_context.password, CINDER_PASSWORD)
def _verify_encryption_key_id(self, volume_id, encryption_key_id):
volume = objects.Volume.get_by_id(self.context, volume_id)
self.assertEqual(volume.encryption_key_id, encryption_key_id)
snapshots = objects.snapshot.SnapshotList.get_all_for_volume(
self.context, volume.id)
self.assertEqual(len(snapshots), 2)
for snapshot in snapshots:
self.assertEqual(snapshot.encryption_key_id, encryption_key_id)
def _test_transfer_from_user_to_cinder(self, transfer_fn):
volume = self._create_volume_and_snapshots()
with mock.patch(
self.key_manager_class + '.get',
return_value=ENCRYPTION_SECRET) as mock_key_get, \
mock.patch(
self.key_manager_class + '.store',
return_value=self.NEW_ENCRYPTION_KEY_ID) as mock_key_store, \
mock.patch(
self.key_manager_class + '.delete') as mock_key_delete:
transfer_fn(self.context, volume)
# Verify the user's context was used to fetch and delete the
# volume's current key ID.
mock_key_get.assert_called_once_with(
self.context, self.OLD_ENCRYPTION_KEY_ID)
mock_key_delete.assert_called_once_with(
self.context, self.OLD_ENCRYPTION_KEY_ID)
# Verify the cinder service created the new key ID.
mock_key_store.assert_called_once_with(
mock.ANY, ENCRYPTION_SECRET)
self._verify_service_context(mock_key_store)
# Verify the volume (and its snaps) reference the new key ID.
self._verify_encryption_key_id(volume.id, self.NEW_ENCRYPTION_KEY_ID)
def _test_transfer_from_cinder_to_user(self, transfer_fn):
volume = self._create_volume_and_snapshots()
with mock.patch(
self.key_manager_class + '.get',
return_value=ENCRYPTION_SECRET) as mock_key_get, \
mock.patch(
self.key_manager_class + '.store',
return_value=self.NEW_ENCRYPTION_KEY_ID) as mock_key_store, \
mock.patch(
self.key_manager_class + '.delete') as mock_key_delete:
transfer_fn(self.context, volume)
# Verify the cinder service was used to fetch and delete the
# volume's current key ID.
mock_key_get.assert_called_once_with(
mock.ANY, self.OLD_ENCRYPTION_KEY_ID)
self._verify_service_context(mock_key_get)
mock_key_delete.assert_called_once_with(
mock.ANY, self.OLD_ENCRYPTION_KEY_ID)
self._verify_service_context(mock_key_delete)
# Verify the user's context created the new key ID.
mock_key_store.assert_called_once_with(
self.context, ENCRYPTION_SECRET)
# Verify the volume (and its snaps) reference the new key ID.
self._verify_encryption_key_id(volume.id, self.NEW_ENCRYPTION_KEY_ID)
def test_transfer_create(self):
self._test_transfer_from_user_to_cinder(transfer.transfer_create)
def test_transfer_accept(self):
self._test_transfer_from_cinder_to_user(transfer.transfer_accept)
def test_transfer_delete(self):
self._test_transfer_from_cinder_to_user(transfer.transfer_delete)
class KeyTransferFixedKeyTestCase(KeyTransferTestCase):
OLD_ENCRYPTION_KEY_ID = constants.FIXED_KEY_ID
NEW_ENCRYPTION_KEY_ID = constants.FIXED_KEY_ID
key_manager_class = 'cinder.keymgr.conf_key_mgr.ConfKeyManager'
def setUp(self):
super(KeyTransferFixedKeyTestCase, self).setUp()
self.conf.register_opts(conf_key_mgr.key_mgr_opts, group='key_manager')
self.conf.set_override('fixed_key',
'df393fca58657e6dc76a6fea31c3e7e0',
group='key_manager')

@ -19,7 +19,6 @@ import ddt
from cinder.api.v3 import volume_transfer
from cinder import context
from cinder import exception
from cinder.objects import volume as volume_obj
from cinder.policies import volume_transfer as vol_transfer_policies
from cinder.tests.unit.api import fakes as fake_api
from cinder.tests.unit.policies import base
@ -98,19 +97,15 @@ class VolumeTransferPolicyTest(base.BasePolicyTest):
testcase_instance=self)
return volume
@mock.patch.object(volume_obj.Volume, 'get_by_id')
def _create_volume_transfer(self, mock_get_vol, volume=None):
def _create_volume_transfer(self, volume=None):
if not volume:
volume = self._create_volume()
mock_get_vol.return_value = volume
return self.volume_transfer_api.create(context.get_admin_context(),
volume.id, 'test-transfer')
@ddt.data(*base.all_users)
@mock.patch.object(volume_obj.Volume, 'get_by_id')
def test_create_volume_transfer_policy(self, user_id, mock_get_vol):
def test_create_volume_transfer_policy(self, user_id):
volume = self._create_volume()
mock_get_vol.return_value = volume
rule_name = vol_transfer_policies.CREATE_POLICY
url = self.api_path
req = fake_api.HTTPRequest.blank(url)
@ -126,8 +121,7 @@ class VolumeTransferPolicyTest(base.BasePolicyTest):
body=body)
@ddt.data(*base.all_users)
@mock.patch.object(volume_obj.Volume, 'get_by_id')
def test_get_volume_transfer_policy(self, user_id, mock_get_vol):
def test_get_volume_transfer_policy(self, user_id):
vol_transfer = self._create_volume_transfer()
rule_name = vol_transfer_policies.GET_POLICY
url = '%s/%s' % (self.api_path, vol_transfer['id'])
@ -176,10 +170,8 @@ class VolumeTransferPolicyTest(base.BasePolicyTest):
self.assertEqual(transfer_count, len(transfers))
@ddt.data(*base.all_users)
@mock.patch.object(volume_obj.Volume, 'get_by_id')
@mock.patch.object(volume_utils, 'notify_about_volume_usage')
def test_delete_volume_transfer_policy(self, user_id, mock_get_vol,
mock_notify):
def test_delete_volume_transfer_policy(self, user_id, mock_notify):
vol_transfer = self._create_volume_transfer()
rule_name = vol_transfer_policies.DELETE_POLICY
url = '%s/%s' % (self.api_path, vol_transfer['id'])
@ -196,13 +188,10 @@ class VolumeTransferPolicyTest(base.BasePolicyTest):
@ddt.data(*base.all_users)
@mock.patch('cinder.transfer.api.QUOTAS')
@mock.patch.object(volume_obj.Volume, 'get_by_id')
@mock.patch.object(volume_utils, 'notify_about_volume_usage')
def test_accept_volume_transfer_policy(self, user_id, mock_notify,
mock_get_vol, mock_quotas):
volume = self._create_volume()
vol_transfer = self._create_volume_transfer(volume=volume)
mock_get_vol.return_value = volume
mock_quotas):
vol_transfer = self._create_volume_transfer()
rule_name = vol_transfer_policies.ACCEPT_POLICY
url = '%s/%s/accept' % (self.api_path, vol_transfer['id'])
req = fake_api.HTTPRequest.blank(url)

@ -31,6 +31,7 @@ import six
from cinder.db import base
from cinder import exception
from cinder.i18n import _
from cinder.keymgr import transfer as key_transfer
from cinder import objects
from cinder.policies import volume_transfer as policy
from cinder import quota
@ -76,6 +77,8 @@ class API(base.Base):
"transfer.delete.start")
if volume_ref['status'] != 'awaiting-transfer':
LOG.error("Volume in unexpected state")
if volume_ref.encryption_key_id is not None:
key_transfer.transfer_delete(context, volume_ref, conf=CONF)
self.db.transfer_destroy(context, transfer_id)
volume_utils.notify_about_volume_usage(context, volume_ref,
"transfer.delete.end")
@ -126,16 +129,23 @@ class API(base.Base):
auth_key = auth_key.encode('utf-8')
return hmac.new(salt, auth_key, hashlib.sha1).hexdigest()
def create(self, context, volume_id, display_name, no_snapshots=False):
def create(self, context, volume_id, display_name, no_snapshots=False,
allow_encrypted=False):
"""Creates an entry in the transfers table."""
LOG.info("Generating transfer record for volume %s", volume_id)
volume_ref = objects.Volume.get_by_id(context, volume_id)
context.authorize(policy.CREATE_POLICY, target_obj=volume_ref)
if volume_ref['status'] != "available":
raise exception.InvalidVolume(reason=_("status must be available"))
if volume_ref['encryption_key_id'] is not None:
raise exception.InvalidVolume(
reason=_("transferring encrypted volume is not supported"))
if volume_ref.encryption_key_id is not None:
if not allow_encrypted:
raise exception.InvalidVolume(
reason=_("transferring encrypted volume is not supported"))
if no_snapshots:
raise exception.InvalidVolume(
reason=_("transferring an encrypted volume without its "
"snapshots is not supported"))
if not no_snapshots:
snapshots = self.db.snapshot_get_all_for_volume(context, volume_id)
@ -144,10 +154,6 @@ class API(base.Base):
msg = _("snapshot: %s status must be "
"available") % snapshot['id']
raise exception.InvalidSnapshot(reason=msg)
if snapshot.get('encryption_key_id'):
msg = _("snapshot: %s encrypted snapshots cannot be "
"transferred") % snapshot['id']
raise exception.InvalidSnapshot(reason=msg)
volume_utils.notify_about_volume_usage(context, volume_ref,
"transfer.create.start")
@ -170,6 +176,15 @@ class API(base.Base):
except Exception:
LOG.error("Failed to create transfer record for %s", volume_id)
raise
if volume_ref.encryption_key_id is not None:
try:
key_transfer.transfer_create(context, volume_ref, conf=CONF)
except Exception:
LOG.error("Failed to transfer keys for %s", volume_id)
self.db.transfer_destroy(context, transfer.id)
raise
volume_utils.notify_about_volume_usage(context, volume_ref,
"transfer.create.end")
return {'id': transfer['id'],
@ -284,6 +299,8 @@ class API(base.Base):
volume_utils.notify_about_volume_usage(context, vol_ref,
"transfer.accept.start")
encryption_key_transferred = False
try:
# Transfer ownership of the volume now, must use an elevated
# context.
@ -292,6 +309,10 @@ class API(base.Base):
context.user_id,
context.project_id,
transfer['no_snapshots'])
if vol_ref.encryption_key_id is not None:
key_transfer.transfer_accept(context, vol_ref, conf=CONF)
encryption_key_transferred = True
self.db.transfer_accept(context.elevated(),
transfer_id,
context.user_id,
@ -306,6 +327,11 @@ class API(base.Base):
QUOTAS.commit(context, snap_donor_res, project_id=donor_id)
LOG.info("Volume %s has been transferred.", volume_id)
except Exception:
# If an exception occurs after the encryption key was transferred
# then we need to transfer the key *back* to the service project.
# This is done by making another key transfer request.
if encryption_key_transferred:
key_transfer.transfer_create(context, vol_ref, conf=CONF)
with excutils.save_and_reraise_exception():
QUOTAS.rollback(context, reservations)
if snap_res:

@ -506,6 +506,9 @@ Starting with microversion 3.55 and later, Cinder supports the ability to
transfer volume without snapshots. If users don't want to transfer snapshots,
they need to specify the new optional argument `--no-snapshots`.
Starting with microversion 3.70 and later, Cinder supports the ability to
transfer encrypted volumes. Snapshots must be transferred with the volume.
.. note::
The procedure for volume transfer is intended for projects (both the

@ -11,6 +11,7 @@ cinder/i18n.py
cinder/image/cache.py
cinder/image/glance.py
cinder/image/image_utils.py
cinder/keymgr/transfer.py
cinder/exception.py
cinder/manager.py
cinder/objects/backup.py

@ -0,0 +1,8 @@
---
features:
- |
Starting with API microversion 3.70, encrypted volumes can be transferred
to a user in a different project. Prior to microversion 3.70, the transfer
is blocked due to the inability to transfer ownership of the volume's
encryption key. With microverson 3.70, ownership of the encryption key is
transferred when the volume is transferred.
Loading…
Cancel
Save