Merge "NFS encrypted volume support"

This commit is contained in:
Zuul
2020-06-26 17:40:30 +00:00
committed by Gerrit Code Review
7 changed files with 650 additions and 122 deletions

View File

@@ -18,6 +18,7 @@ import errno
import os
from unittest import mock
import castellan
import ddt
from oslo_utils import imageutils
from oslo_utils import units
@@ -28,6 +29,7 @@ from cinder.image import image_utils
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.keymgr import fake as fake_keymgr
from cinder.tests.unit import test
from cinder.volume import configuration as conf
from cinder.volume.drivers import nfs
@@ -35,6 +37,11 @@ from cinder.volume.drivers import remotefs
from cinder.volume import volume_utils
class KeyObject(object):
def get_encoded(arg):
return "asdf".encode('utf-8')
class RemoteFsDriverTestCase(test.TestCase):
TEST_FILE_NAME = 'test.txt'
TEST_EXPORT = 'nas-host1:/export'
@@ -374,6 +381,57 @@ Format specific information:
corrupt: false
"""
QEMU_IMG_INFO_OUT5 = """image: volume-%(volid)s.%(snapid)s
file format: qcow2
virtual size: %(size_gb)sG (%(size_b)s bytes)
disk size: 196K
encrypted: yes
cluster_size: 65536
backing file: volume-%(volid)s
backing file format: raw
Format specific information:
compat: 1.1
lazy refcounts: false
refcount bits: 16
encrypt:
ivgen alg: plain64
hash alg: sha256
cipher alg: aes-256
uuid: 386f8626-33f0-4683-a517-78ddfe385e33
format: luks
cipher mode: xts
slots:
[0]:
active: true
iters: 1892498
key offset: 4096
stripes: 4000
[1]:
active: false
key offset: 262144
[2]:
active: false
key offset: 520192
[3]:
active: false
key offset: 778240
[4]:
active: false
key offset: 1036288
[5]:
active: false
key offset: 1294336
[6]:
active: false
key offset: 1552384
[7]:
active: false
key offset: 1810432
payload offset: 2068480
master key iters: 459347
corrupt: false
"""
@ddt.ddt
class NfsDriverTestCase(test.TestCase):
@@ -691,6 +749,17 @@ class NfsDriverTestCase(test.TestCase):
provider_location=loc,
size=size)
def _simple_encrypted_volume(self, size=10):
loc = self.TEST_NFS_EXPORT1
info_dic = {'name': u'volume-0000000a',
'id': '55555555-222f-4b32-b585-9991b3bf0a99',
'size': size,
'encryption_key_id': fake.ENCRYPTION_KEY_ID}
return fake_volume.fake_volume_obj(self.context,
provider_location=loc,
**info_dic)
def test_get_provisioned_capacity(self):
self._set_driver()
drv = self._driver
@@ -1124,14 +1193,15 @@ class NfsDriverTestCase(test.TestCase):
"""Case where the mount works the first time."""
self._set_driver()
self.mock_object(self._driver._remotefsclient, 'mount')
self.mock_object(self._driver._remotefsclient, 'mount', autospec=True)
drv = self._driver
drv.configuration.nfs_mount_attempts = 3
drv.shares = {self.TEST_NFS_EXPORT1: ''}
drv._ensure_share_mounted(self.TEST_NFS_EXPORT1)
drv._remotefsclient.mount.called_once()
drv._remotefsclient.mount.assert_called_once_with(
self.TEST_NFS_EXPORT1, [])
@mock.patch('time.sleep')
def test_ensure_share_mounted_exception(self, _mock_sleep):
@@ -1169,16 +1239,48 @@ class NfsDriverTestCase(test.TestCase):
self.assertEqual(min_num_attempts,
drv._remotefsclient.mount.call_count)
@ddt.data([NFS_CONFIG1, QEMU_IMG_INFO_OUT3],
[NFS_CONFIG2, QEMU_IMG_INFO_OUT4],
[NFS_CONFIG3, QEMU_IMG_INFO_OUT3],
[NFS_CONFIG4, QEMU_IMG_INFO_OUT4])
@mock.patch('tempfile.NamedTemporaryFile')
@ddt.data([NFS_CONFIG1, QEMU_IMG_INFO_OUT3, False],
[NFS_CONFIG2, QEMU_IMG_INFO_OUT4, False],
[NFS_CONFIG3, QEMU_IMG_INFO_OUT3, False],
[NFS_CONFIG4, QEMU_IMG_INFO_OUT4, False],
[NFS_CONFIG4, QEMU_IMG_INFO_OUT5, True])
@ddt.unpack
def test_copy_volume_from_snapshot(self, nfs_conf, qemu_img_info):
def test_copy_volume_from_snapshot(self, nfs_conf, qemu_img_info,
encryption, mock_temp_file):
class DictObj(object):
# convert a dict to object w/ attributes
def __init__(self, d):
self.__dict__ = d
self._set_driver(extra_confs=nfs_conf)
drv = self._driver
dest_volume = self._simple_volume()
src_volume = self._simple_volume()
src_encryption_key_id = None
dest_encryption_key_id = None
if encryption:
mock_temp_file.return_value.__enter__.side_effect = [
DictObj({'name': '/tmp/imgfile'}),
DictObj({'name': '/tmp/passfile'})]
dest_volume = self._simple_encrypted_volume()
src_volume = self._simple_encrypted_volume()
key_mgr = fake_keymgr.fake_api()
self.mock_object(castellan.key_manager, 'API',
return_value=key_mgr)
key_id = key_mgr.store(self.context, KeyObject())
src_volume.encryption_key_id = key_id
dest_volume.encryption_key_id = key_id
src_encryption_key_id = src_volume.encryption_key_id
dest_encryption_key_id = dest_volume.encryption_key_id
else:
dest_volume = self._simple_volume()
src_volume = self._simple_volume()
fake_snap = fake_snapshot.fake_snapshot_obj(self.context)
fake_snap.volume = src_volume
@@ -1209,16 +1311,25 @@ class NfsDriverTestCase(test.TestCase):
mock_permission = self.mock_object(drv, '_set_rw_permissions_for_all')
drv._copy_volume_from_snapshot(fake_snap, dest_volume, size)
drv._copy_volume_from_snapshot(fake_snap, dest_volume, size,
src_encryption_key_id,
dest_encryption_key_id)
mock_read_info_file.assert_called_once_with(info_path)
mock_img_info.assert_called_once_with(snap_path,
force_share=True,
run_as_root=True)
used_qcow = nfs_conf['nfs_qcow2_volumes']
mock_convert_image.assert_called_once_with(
src_vol_path, dest_vol_path, 'qcow2' if used_qcow else 'raw',
run_as_root=True)
if encryption:
mock_convert_image.assert_called_once_with(
src_vol_path, dest_vol_path, 'luks',
passphrase_file='/tmp/passfile',
run_as_root=True,
src_passphrase_file='/tmp/imgfile')
else:
mock_convert_image.assert_called_once_with(
src_vol_path, dest_vol_path, 'qcow2' if used_qcow else 'raw',
run_as_root=True)
mock_permission.assert_called_once_with(dest_vol_path)
@ddt.data([NFS_CONFIG1, QEMU_IMG_INFO_OUT3],

View File

@@ -19,20 +19,28 @@ import re
import sys
from unittest import mock
import castellan
import ddt
from cinder import context
from cinder import exception
from cinder.image import image_utils
from cinder.objects import fields
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.keymgr import fake as fake_keymgr
from cinder.tests.unit import test
from cinder import utils
from cinder.volume.drivers import remotefs
from cinder.volume import volume_utils
class KeyObject(object):
def get_encoded(arg):
return "asdf".encode('utf-8')
@ddt.ddt
class RemoteFsSnapDriverTestCase(test.TestCase):
@@ -56,6 +64,20 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
self._fake_snapshot.id)
self._fake_snapshot.volume = self._fake_volume
# Encrypted volume and snapshot
self.volume_c = fake_volume.fake_volume_obj(
self.context,
**{'name': u'volume-0000000a',
'id': '55555555-222f-4b32-b585-9991b3bf0a99',
'size': 12,
'encryption_key_id': fake.ENCRYPTION_KEY_ID})
self._fake_snap_c = fake_snapshot.fake_snapshot_obj(self.context)
self._fake_snap_c.volume = self.volume_c
self.volume_c_path = os.path.join(self._FAKE_MNT_POINT,
self._fake_snap_c.name)
self._fake_snap_c_path = (self.volume_c_path + '.' +
self._fake_snap_c.id)
@ddt.data({'current_state': 'in-use',
'acceptable_states': ['available', 'in-use']},
{'current_state': 'in-use',
@@ -75,19 +97,43 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
def _test_delete_snapshot(self, volume_in_use=False,
stale_snapshot=False,
is_active_image=True,
is_tmp_snap=False):
is_tmp_snap=False,
encryption=False):
# If the snapshot is not the active image, it is guaranteed that
# another snapshot exists having it as backing file.
fake_snapshot_name = os.path.basename(self._fake_snapshot_path)
fake_info = {'active': fake_snapshot_name,
self._fake_snapshot.id: fake_snapshot_name}
fake_upper_snap_id = 'fake_upper_snap_id'
if encryption:
fake_snapshot_name = os.path.basename(self._fake_snap_c_path)
fake_info = {'active': fake_snapshot_name,
self._fake_snap_c.id: fake_snapshot_name}
expected_info = fake_info
fake_upper_snap_path = (
self.volume_c_path + '-snapshot' + fake_upper_snap_id)
snapshot = self._fake_snap_c
snapshot_path = self._fake_snap_c_path
volume_name = self.volume_c.name
else:
fake_snapshot_name = os.path.basename(self._fake_snapshot_path)
fake_info = {'active': fake_snapshot_name,
self._fake_snapshot.id: fake_snapshot_name}
expected_info = fake_info
fake_upper_snap_path = (
self._fake_volume_path + '-snapshot' + fake_upper_snap_id)
snapshot = self._fake_snapshot
snapshot_path = self._fake_snapshot_path
volume_name = self._fake_volume.name
fake_snap_img_info = mock.Mock()
fake_base_img_info = mock.Mock()
if stale_snapshot:
fake_snap_img_info.backing_file = None
else:
fake_snap_img_info.backing_file = self._fake_volume.name
fake_snap_img_info.backing_file = volume_name
fake_snap_img_info.file_format = 'qcow2'
fake_base_img_info.backing_file = None
fake_base_img_info.file_format = 'raw'
@@ -107,61 +153,53 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
self._driver._delete_stale_snapshot = mock.Mock()
self._driver._delete_snapshot_online = mock.Mock()
expected_info = {
'active': fake_snapshot_name,
self._fake_snapshot.id: fake_snapshot_name
}
exp_acceptable_states = ['available', 'in-use', 'backing-up',
'deleting', 'downloading']
if volume_in_use:
self._fake_snapshot.volume.status = 'backing-up'
self._fake_snapshot.volume.attach_status = 'attached'
snapshot.volume.status = 'backing-up'
snapshot.volume.attach_status = 'attached'
self._driver._read_info_file.return_value = fake_info
self._driver._delete_snapshot(self._fake_snapshot)
self._driver._delete_snapshot(snapshot)
self._driver._validate_state.assert_called_once_with(
self._fake_snapshot.volume.status,
snapshot.volume.status,
exp_acceptable_states)
if stale_snapshot:
self._driver._delete_stale_snapshot.assert_called_once_with(
self._fake_snapshot)
snapshot)
else:
expected_online_delete_info = {
'active_file': fake_snapshot_name,
'snapshot_file': fake_snapshot_name,
'base_file': self._fake_volume.name,
'base_file': volume_name,
'base_id': None,
'new_base_file': None
}
self._driver._delete_snapshot_online.assert_called_once_with(
self.context, self._fake_snapshot,
self.context, snapshot,
expected_online_delete_info)
elif is_active_image:
self._driver._read_info_file.return_value = fake_info
self._driver._delete_snapshot(self._fake_snapshot)
self._driver._delete_snapshot(snapshot)
self._driver._img_commit.assert_called_once_with(
self._fake_snapshot_path)
self.assertNotIn(self._fake_snapshot.id, fake_info)
snapshot_path)
self.assertNotIn(snapshot.id, fake_info)
self._driver._write_info_file.assert_called_once_with(
mock.sentinel.fake_info_path, fake_info)
else:
fake_upper_snap_id = 'fake_upper_snap_id'
fake_upper_snap_path = (
self._fake_volume_path + '-snapshot' + fake_upper_snap_id)
fake_upper_snap_name = os.path.basename(fake_upper_snap_path)
fake_backing_chain = [
{'filename': fake_upper_snap_name,
'backing-filename': fake_snapshot_name},
{'filename': fake_snapshot_name,
'backing-filename': self._fake_volume.name},
{'filename': self._fake_volume.name,
'backing-filename': volume_name},
{'filename': volume_name,
'backing-filename': None}]
fake_info[fake_upper_snap_id] = fake_upper_snap_name
@@ -169,42 +207,62 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
fake_info['active'] = fake_upper_snap_name
expected_info = copy.deepcopy(fake_info)
del expected_info[self._fake_snapshot.id]
del expected_info[snapshot.id]
self._driver._read_info_file.return_value = fake_info
self._driver._get_backing_chain_for_path = mock.Mock(
return_value=fake_backing_chain)
self._driver._delete_snapshot(self._fake_snapshot)
self._driver._delete_snapshot(snapshot)
self._driver._img_commit.assert_called_once_with(
self._fake_snapshot_path)
snapshot_path)
self._driver._rebase_img.assert_called_once_with(
fake_upper_snap_path, self._fake_volume.name,
fake_upper_snap_path, volume_name,
fake_base_img_info.file_format)
self._driver._write_info_file.assert_called_once_with(
mock.sentinel.fake_info_path, expected_info)
def test_delete_snapshot_when_active_file(self):
self._test_delete_snapshot()
@ddt.data({'encryption': True}, {'encryption': False})
def test_delete_snapshot_when_active_file(self, encryption):
self._test_delete_snapshot(encryption=encryption)
def test_delete_snapshot_in_use(self):
self._test_delete_snapshot(volume_in_use=True)
def test_delete_snapshot_in_use_stale_snapshot(self):
@ddt.data({'encryption': True}, {'encryption': False})
def test_delete_snapshot_in_use(self, encryption):
self._test_delete_snapshot(volume_in_use=True,
stale_snapshot=True)
encryption=encryption)
def test_delete_snapshot_with_one_upper_file(self):
self._test_delete_snapshot(is_active_image=False)
@ddt.data({'encryption': True}, {'encryption': False})
def test_delete_snapshot_in_use_stale_snapshot(self,
encryption):
self._test_delete_snapshot(volume_in_use=True,
stale_snapshot=True,
encryption=encryption)
@ddt.data({'encryption': True}, {'encryption': False})
def test_delete_snapshot_with_one_upper_file(self,
encryption):
self._test_delete_snapshot(is_active_image=False,
encryption=encryption)
@ddt.data({'encryption': True}, {'encryption': False})
def test_delete_stale_snapshot(self, encryption):
if encryption:
fake_snapshot_name = os.path.basename(self._fake_snap_c_path)
volume_name = self.volume_c.name
snapshot = self._fake_snap_c
snapshot_path = self._fake_snap_c_path
else:
fake_snapshot_name = os.path.basename(self._fake_snapshot_path)
volume_name = self._fake_volume.name
snapshot = self._fake_snapshot
snapshot_path = self._fake_snapshot_path
def test_delete_stale_snapshot(self):
fake_snapshot_name = os.path.basename(self._fake_snapshot_path)
fake_snap_info = {
'active': self._fake_volume.name,
self._fake_snapshot.id: fake_snapshot_name
'active': volume_name,
snapshot.id: fake_snapshot_name
}
expected_info = {'active': self._fake_volume.name}
expected_info = {'active': volume_name}
self._driver._local_path_volume_info = mock.Mock(
return_value=mock.sentinel.fake_info_path)
@@ -214,9 +272,9 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
return_value=self._FAKE_MNT_POINT)
self._driver._write_info_file = mock.Mock()
self._driver._delete_stale_snapshot(self._fake_snapshot)
self._driver._delete_stale_snapshot(snapshot)
self._driver._delete.assert_called_once_with(self._fake_snapshot_path)
self._driver._delete.assert_called_once_with(snapshot_path)
self._driver._write_info_file.assert_called_once_with(
mock.sentinel.fake_info_path, expected_info)
@@ -256,9 +314,20 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
mock.call(*command3, run_as_root=True)]
self._driver._execute.assert_has_calls(calls)
def _test_create_snapshot(self, volume_in_use=False, tmp_snap=False):
def _test_create_snapshot(self, volume_in_use=False, tmp_snap=False,
encryption=False):
fake_snapshot_info = {}
fake_snapshot_file_name = os.path.basename(self._fake_snapshot_path)
if encryption:
fake_snapshot_file_name = os.path.basename(self._fake_snap_c_path)
volume_name = self.volume_c.name
snapshot = self._fake_snap_c
snapshot_path = self._fake_snap_c_path
else:
fake_snapshot_file_name = os.path.basename(
self._fake_snapshot_path)
volume_name = self._fake_volume.name
snapshot = self._fake_snapshot
snapshot_path = self._fake_snapshot_path
self._driver._local_path_volume_info = mock.Mock(
return_value=mock.sentinel.fake_info_path)
@@ -268,14 +337,14 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
self._driver._create_snapshot_online = mock.Mock()
self._driver._write_info_file = mock.Mock()
self._driver.get_active_image_from_info = mock.Mock(
return_value=self._fake_volume.name)
return_value=volume_name)
self._driver._get_new_snap_path = mock.Mock(
return_value=self._fake_snapshot_path)
return_value=snapshot_path)
self._driver._validate_state = mock.Mock()
expected_snapshot_info = {
'active': fake_snapshot_file_name,
self._fake_snapshot.id: fake_snapshot_file_name
snapshot.id: fake_snapshot_file_name
}
exp_acceptable_states = ['available', 'in-use', 'backing-up']
if tmp_snap:
@@ -285,31 +354,34 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
self._fake_snapshot.display_name = display_name
if volume_in_use:
self._fake_snapshot.volume.status = 'backing-up'
self._fake_snapshot.volume.attach_status = 'attached'
snapshot.volume.status = 'backing-up'
snapshot.volume.attach_status = 'attached'
expected_method_called = '_create_snapshot_online'
else:
self._fake_snapshot.volume.status = 'available'
snapshot.volume.status = 'available'
expected_method_called = '_do_create_snapshot'
self._driver._create_snapshot(self._fake_snapshot)
self._driver._create_snapshot(snapshot)
self._driver._validate_state.assert_called_once_with(
self._fake_snapshot.volume.status,
snapshot.volume.status,
exp_acceptable_states)
fake_method = getattr(self._driver, expected_method_called)
fake_method.assert_called_with(
self._fake_snapshot, self._fake_volume.name,
self._fake_snapshot_path)
snapshot, volume_name,
snapshot_path)
self._driver._write_info_file.assert_called_with(
mock.sentinel.fake_info_path,
expected_snapshot_info)
def test_create_snapshot_volume_available(self):
self._test_create_snapshot()
@ddt.data({'encryption': True}, {'encryption': False})
def test_create_snapshot_volume_available(self, encryption):
self._test_create_snapshot(encryption=encryption)
def test_create_snapshot_volume_in_use(self):
self._test_create_snapshot(volume_in_use=True)
@ddt.data({'encryption': True}, {'encryption': False})
def test_create_snapshot_volume_in_use(self, encryption):
self._test_create_snapshot(volume_in_use=True,
encryption=encryption)
def test_create_snapshot_invalid_volume(self):
self._fake_snapshot.volume.status = 'error'
@@ -624,14 +696,15 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
src_vref = fake_volume.fake_volume_obj(
self.context,
id=src_vref_id,
name='volume-%s' % src_vref_id)
name='volume-%s' % src_vref_id,
obj_context=self.context)
src_vref.context = self.context
mock_snapshots_exist.return_value = snapshots_exist
drv._always_use_temp_snap_when_cloning = force_temp_snap
vol_attrs = ['provider_location', 'size', 'id', 'name', 'status',
'volume_type', 'metadata']
'volume_type', 'metadata', 'obj_context']
Volume = collections.namedtuple('Volume', vol_attrs)
volume_ref = Volume(id=volume.id,
@@ -640,7 +713,8 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
provider_location=volume.provider_location,
status=volume.status,
size=volume.size,
volume_type=volume.volume_type,)
volume_type=volume.volume_type,
obj_context=self.context,)
snap_args_creation = {
'volume_id': src_vref.id,
@@ -679,7 +753,8 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
mock_create_snapshot.assert_called_once_with(
mock_obj_snap.return_value)
mock_copy_volume_from_snapshot.assert_called_once_with(
mock_obj_snap.return_value, volume_ref, volume['size'])
mock_obj_snap.return_value, volume_ref, volume['size'],
src_encryption_key_id=None, new_encryption_key_id=None)
mock_delete_snapshot.called_once_with(snap_args_deletion)
else:
self.assertFalse(mock_create_snapshot.called)
@@ -693,6 +768,47 @@ class RemoteFsSnapDriverTestCase(test.TestCase):
[mock.call(src_vref), mock.call(volume_ref)])
mock_extend_volume.assert_called_once_with(volume_ref, volume.size)
@mock.patch('tempfile.NamedTemporaryFile')
@mock.patch('cinder.volume.volume_utils.check_encryption_provider',
return_value={'encryption_key_id': fake.ENCRYPTION_KEY_ID})
def test_create_encrypted_volume(self,
mock_check_enc_prov,
mock_temp_file):
class DictObj(object):
# convert a dict to object w/ attributes
def __init__(self, d):
self.__dict__ = d
drv = self._driver
mock_temp_file.return_value.__enter__.side_effect = [
DictObj({'name': '/imgfile'}),
DictObj({'name': '/passfile'})]
key_mgr = fake_keymgr.fake_api()
self.mock_object(castellan.key_manager, 'API', return_value=key_mgr)
key_id = key_mgr.store(self.context, KeyObject())
self.volume_c.encryption_key_id = key_id
enc_info = {'encryption_key_id': key_id,
'cipher': 'aes-xts-essiv',
'key_size': 256}
remotefs_path = 'cinder.volume.drivers.remotefs.open'
with mock.patch('cinder.volume.volume_utils.check_encryption_provider',
return_value=enc_info), \
mock.patch(remotefs_path) as mock_open, \
mock.patch.object(drv, '_execute') as mock_exec:
drv._create_encrypted_volume_file("/passfile",
self.volume_c.size,
enc_info,
self.context)
mock_open.assert_called_with('/imgfile', 'w')
mock_exec.assert_called()
@mock.patch('shutil.copyfile')
@mock.patch.object(remotefs.RemoteFSSnapDriver, '_set_rw_permissions')
def test_copy_volume_image(self, mock_set_perm, mock_copyfile):