Rekey volume on clone

When cloning an encrypted volume, change the
encryption key used on the destination volume.

This is currently implemented for iSCSI/FC
drivers only.

Change-Id: Id797af4f8ff001ec3d55cb4eda19988a314b700d
This commit is contained in:
Eric Harney 2019-07-09 16:30:09 -04:00
parent b1de0652ca
commit 330f1ae453
7 changed files with 269 additions and 1 deletions

View File

@ -1095,3 +1095,7 @@ class ServiceUserTokenNoAuth(CinderException):
message = _("The [service_user] send_service_user_token option was "
"requested, but no service auth could be loaded. Please check "
"the [service_user] configuration section.")
class RekeyNotSupported(CinderException):
message = _("Rekey not supported.")

View File

@ -29,6 +29,7 @@ CGSNAPSHOT3_ID = '5f392156-fc03-492a-9cb8-e46a7eedaf33'
CONSISTENCY_GROUP_ID = 'f18abf73-79ee-4f2b-8d4f-1c044148f117'
CONSISTENCY_GROUP2_ID = '8afc8952-9dce-4228-9f8a-706c5cb5fc82'
ENCRYPTION_KEY_ID = 'e8387001-745d-45d0-9e4e-0473815ef09a'
ENCRYPTION_KEY2_ID = 'fa0dc8ce-79a4-4162-846f-c731b99f3113'
IMAGE_ID = 'e79161cd-5f9d-4007-8823-81a807a64332'
INSTANCE_ID = 'fa617131-cdbc-45dc-afff-f21f17ae054e'
IN_USE_ID = '8ee42073-4ac2-4099-8c7a-d416630e6aee'

View File

@ -56,6 +56,7 @@ from cinder.volume import manager as vol_manager
from cinder.volume import rpcapi as volume_rpcapi
import cinder.volume.targets.tgt
from cinder.volume import volume_types
import os_brick.initiator.connectors.iscsi
QUOTAS = quota.QUOTAS
@ -1577,6 +1578,115 @@ class VolumeTestCase(base.BaseVolumeTestCase):
db.volume_destroy(self.context, src_vol_id)
db.volume_destroy(self.context, dst_vol['id'])
@ddt.data({'connector_class':
os_brick.initiator.connectors.iscsi.ISCSIConnector,
'rekey_supported': True,
'already_encrypted': 'yes'},
{'connector_class':
os_brick.initiator.connectors.iscsi.ISCSIConnector,
'rekey_supported': True,
'already_encrypted': 'no'},
{'connector_class':
os_brick.initiator.connectors.rbd.RBDConnector,
'rekey_supported': False,
'already_encrypted': 'no'})
@ddt.unpack
@mock.patch('cinder.volume.volume_utils.delete_encryption_key')
@mock.patch('cinder.volume.flows.manager.create_volume.'
'CreateVolumeFromSpecTask._setup_encryption_keys')
@mock.patch('cinder.db.sqlalchemy.api.volume_encryption_metadata_get')
@mock.patch('cinder.image.image_utils.qemu_img_info')
@mock.patch('cinder.volume.driver.VolumeDriver._detach_volume')
@mock.patch('cinder.volume.driver.VolumeDriver._attach_volume')
@mock.patch('cinder.utils.brick_get_connector_properties')
@mock.patch('cinder.utils.execute')
def test_create_volume_from_volume_with_enc(
self, mock_execute, mock_brick_gcp, mock_at, mock_det,
mock_qemu_img_info, mock_enc_metadata_get, mock_setup_enc_keys,
mock_del_enc_key, connector_class=None, rekey_supported=None,
already_encrypted=None):
# create source volume
mock_execute.return_value = ('', '')
mock_enc_metadata_get.return_value = {'cipher': 'aes-xts-plain64',
'key_size': 256,
'provider': 'luks'}
mock_setup_enc_keys.return_value = (
'qwert',
'asdfg',
fake.ENCRYPTION_KEY2_ID)
params = {'status': 'creating',
'size': 1,
'host': CONF.host,
'encryption_key_id': fake.ENCRYPTION_KEY_ID}
src_vol = tests_utils.create_volume(self.context, **params)
src_vol_id = src_vol['id']
self.volume.create_volume(self.context, src_vol)
db.volume_update(self.context,
src_vol['id'],
{'encryption_key_id': fake.ENCRYPTION_KEY_ID})
# create volume from source volume
params['encryption_key_id'] = fake.ENCRYPTION_KEY2_ID
attach_info = {
'connector': connector_class(None),
'device': {'path': '/some/device/thing'}}
mock_at.return_value = (attach_info, src_vol)
img_info = imageutils.QemuImgInfo()
if already_encrypted:
# defaults to None when not encrypted
img_info.encrypted = 'yes'
img_info.file_format = 'raw'
mock_qemu_img_info.return_value = img_info
dst_vol = tests_utils.create_volume(self.context,
source_volid=src_vol_id,
**params)
self.volume.create_volume(self.context, dst_vol)
# ensure that status of volume is 'available'
vol = db.volume_get(self.context, dst_vol['id'])
self.assertEqual('available', vol['status'])
# cleanup resource
db.volume_destroy(self.context, src_vol_id)
db.volume_destroy(self.context, dst_vol['id'])
mock_del_enc_key.assert_not_called()
if rekey_supported:
mock_setup_enc_keys.assert_called_once_with(
mock.ANY,
src_vol,
{'key_size': 256,
'provider': 'luks',
'cipher': 'aes-xts-plain64'}
)
if already_encrypted:
mock_execute.assert_called_once_with(
'cryptsetup', 'luksChangeKey',
'/some/device/thing',
log_errors=processutils.LOG_ALL_ERRORS,
process_input='qwert\nasdfg\n',
run_as_root=True)
else:
mock_execute.assert_called_once_with(
'cryptsetup', '--batch-mode', 'luksFormat',
'--type', 'luks1',
'--cipher', 'aes-xts-plain64', '--key-size', '256',
'--key-file=-', '/some/device/thing',
process_input='asdfg',
run_as_root=True)
else:
mock_setup_enc_keys.assert_not_called()
mock_execute.assert_not_called()
mock_at.assert_called()
mock_det.assert_called()
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_create_volume_from_snapshot_with_encryption(self):
"""Test volume can be created from a snapshot of an encrypted volume"""

View File

@ -424,7 +424,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
volume_type_id,
snapshot,
source_volume,
image_meta)
image_meta) # new key id that's been cloned already
if volume_type_id:
volume_type = objects.VolumeType.get_by_name_or_id(

View File

@ -10,13 +10,18 @@
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import traceback
from castellan import key_manager
import os_brick.initiator.connectors
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import fileutils
from oslo_utils import timeutils
import six
import taskflow.engines
from taskflow.patterns import linear_flow
from taskflow.types import failure as ft
@ -55,6 +60,11 @@ IMAGE_ATTRIBUTES = (
'size',
)
REKEY_SUPPORTED_CONNECTORS = (
os_brick.initiator.connectors.iscsi.ISCSIConnector,
os_brick.initiator.connectors.fibre_channel.FibreChannelConnector,
)
class OnFailureRescheduleTask(flow_utils.CinderTask):
"""Triggers a rescheduling request to be sent when reverting occurs.
@ -470,6 +480,137 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
snapshot_id=snapshot_id)
return model_update
@staticmethod
def _setup_encryption_keys(context, volume, encryption):
"""Return encryption keys in passphrase form for a clone operation.
:param context: context
:param volume: volume being cloned
:param encryption: encryption info dict
:returns: tuple (source_pass, new_pass, new_key_id)
"""
keymgr = key_manager.API(CONF)
key = keymgr.get(context, encryption['encryption_key_id'])
source_pass = binascii.hexlify(key.get_encoded()).decode('utf-8')
new_key_id = volume_utils.create_encryption_key(context,
keymgr,
volume.volume_type_id)
new_key = keymgr.get(context, encryption['encryption_key_id'])
new_pass = binascii.hexlify(new_key.get_encoded()).decode('utf-8')
return (source_pass, new_pass, new_key_id)
def _rekey_volume(self, context, volume):
"""Change encryption key on volume.
:returns: model update dict
"""
LOG.debug('rekey volume %s', volume.name)
properties = utils.brick_get_connector_properties(False, False)
LOG.debug("properties: %s", properties)
attach_info = None
model_update = {}
new_key_id = None
try:
attach_info, volume = self.driver._attach_volume(context,
volume,
properties)
if not any(c for c in REKEY_SUPPORTED_CONNECTORS
if isinstance(attach_info['connector'], c)):
LOG.debug('skipping rekey, connector: %s',
attach_info['connector'])
raise exception.RekeyNotSupported()
LOG.debug("attempting attach for rekey, attach_info: %s",
attach_info)
if (isinstance(attach_info['device']['path'], six.string_types)):
image_info = image_utils.qemu_img_info(
attach_info['device']['path'])
else:
# Should not happen, just a safety check
LOG.error('%s appears to not be encrypted',
attach_info['device']['path'])
raise exception.RekeyNotSupported()
encryption = volume_utils.check_encryption_provider(
self.db,
volume,
context)
(source_pass, new_pass, new_key_id) = self._setup_encryption_keys(
context,
volume,
encryption)
if image_info.encrypted == 'yes':
key_str = source_pass + "\n" + new_pass + "\n"
del source_pass
(out, err) = utils.execute(
'cryptsetup',
'luksChangeKey',
attach_info['device']['path'],
run_as_root=True,
process_input=key_str,
log_errors=processutils.LOG_ALL_ERRORS)
del key_str
model_update = {'encryption_key_id': new_key_id}
else:
# volume has not been written to yet, format with luks
del source_pass
if image_info.file_format != 'raw':
# Something has gone wrong if the image is not encrypted
# and is detected as another format.
raise exception.Invalid()
if encryption['provider'] == 'luks':
# Force ambiguous "luks" provider to luks1 for
# compatibility with new versions of cryptsetup.
encryption['provider'] = 'luks1'
(out, err) = utils.execute(
'cryptsetup',
'--batch-mode',
'luksFormat',
'--type', encryption['provider'],
'--cipher', encryption['cipher'],
'--key-size', str(encryption['key_size']),
'--key-file=-',
attach_info['device']['path'],
run_as_root=True,
process_input=new_pass)
del new_pass
model_update = {'encryption_key_id': new_key_id}
except exception.RekeyNotSupported:
pass
except Exception:
with excutils.save_and_reraise_exception():
if new_key_id is not None:
# Remove newly cloned key since it will not be used.
volume_utils.delete_encryption_key(
context,
key_manager.API(CONF),
new_key_id)
finally:
if attach_info:
self.driver._detach_volume(context,
attach_info,
volume,
properties,
force=True)
return model_update
def _create_from_source_volume(self, context, volume, source_volid,
**kwargs):
# NOTE(harlowja): if the source volume has disappeared this will be our
@ -481,6 +622,11 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
srcvol_ref = objects.Volume.get_by_id(context, source_volid)
try:
model_update = self.driver.create_cloned_volume(volume, srcvol_ref)
if model_update is None:
model_update = {}
if volume.encryption_key_id is not None:
rekey_model_update = self._rekey_volume(context, volume)
model_update.update(rekey_model_update)
finally:
self._cleanup_cg_in_volume(volume)
# NOTE(harlowja): Subtasks would be useful here since after this

View File

@ -136,3 +136,5 @@ ploop: CommandFilter, ploop, root
# cinder/volume/drivers/quobyte.py
mount.quobyte: CommandFilter, mount.quobyte, root
umount.quobyte: CommandFilter, umount.quobyte, root
cryptsetup: CommandFilter, cryptsetup, root

View File

@ -0,0 +1,5 @@
---
features:
- |
When an encrypted volume is cloned, a new encryption key is generated for
the new volume. This is currently implemented only for iSCSI/FC backends.