Copy encryptors from Nova to os-brick

Currently, when creating an encrypted volume from an image, Cinder
writes raw data to the encrypted volume. When Cinder uploads an
encrypted volume to an image, it writes encrypted data to the image.
As a result, Nova cannot use these images or volumes.
To fix above problem, cinder needs to add encryptor attach/detach
layers.
As both Nova and Cinder needs to use the module, the fix is to
move it to os-brick.
It copies encryptors from Nova to os-brick, and keep all interfaces
unchanged except initialization.

Change-Id: I8044183ad02110c8b2468e20327d822c0437c772
Implements: blueprint improve-encrypted-volume
Related-bug: #1482464
Related-bug: #1465656
This commit is contained in:
LisaLi 2015-11-19 08:55:36 +00:00 committed by lisali
parent 6d71d55caf
commit 05827810ca
12 changed files with 937 additions and 0 deletions

View File

@ -0,0 +1,99 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from os_brick.encryptors import nop
from os_brick.i18n import _LE, _LW
from oslo_log import log as logging
from oslo_utils import importutils
from oslo_utils import strutils
LOG = logging.getLogger(__name__)
def get_volume_encryptor(root_helper,
connection_info,
keymgr,
execute=None,
*args, **kwargs):
"""Creates a VolumeEncryptor used to encrypt the specified volume.
:param: the connection information used to attach the volume
:returns VolumeEncryptor: the VolumeEncryptor for the volume
"""
encryptor = nop.NoOpEncryptor(root_helper=root_helper,
connection_info=connection_info,
keymgr=keymgr,
execute=execute,
*args, **kwargs)
location = kwargs.get('control_location', None)
if location and location.lower() == 'front-end': # case insensitive
provider = kwargs.get('provider')
if provider == 'LuksEncryptor' or 'LuksEncryptor' in provider:
provider = 'os_brick.encryptors.luks.LuksEncryptor'
elif (provider == 'CryptsetupEncryptor' or
'CryptsetupEncryptor' in provider):
provider = \
'os_brick.encryptors.cryptsetup.CryptsetupEncryptor'
elif (provider == 'NoOpEncryptor' or 'NoOpEncryptor' in provider):
provider = 'os_brick.encryptors.nop.NoOpEncryptor'
try:
encryptor = importutils.import_object(
provider,
root_helper,
connection_info,
keymgr,
execute,
**kwargs)
except Exception as e:
LOG.error(_LE("Error instantiating %(provider)s: %(exception)s"),
{'provider': provider, 'exception': e})
raise
msg = ("Using volume encryptor '%(encryptor)s' for connection: "
"%(connection_info)s" %
{'encryptor': encryptor, 'connection_info': connection_info})
LOG.debug(strutils.mask_password(msg))
return encryptor
def get_encryption_metadata(context, volume_api, volume_id, connection_info):
metadata = {}
if ('data' in connection_info and
connection_info['data'].get('encrypted', False)):
try:
metadata = volume_api.get_volume_encryption_metadata(context,
volume_id)
if not metadata:
LOG.warning(_LW(
'Volume %s should be encrypted but there is no '
'encryption metadata.'), volume_id)
except Exception as e:
LOG.error(_LE("Failed to retrieve encryption metadata for "
"volume %(volume_id)s: %(exception)s"),
{'volume_id': volume_id, 'exception': e})
raise
if metadata:
msg = ("Using volume encryption metadata '%(metadata)s' for "
"connection: %(connection_info)s" %
{'metadata': metadata, 'connection_info': connection_info})
LOG.debug(strutils.mask_password(msg))
return metadata

View File

@ -0,0 +1,65 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from os_brick import executor
from oslo_log import log as logging
import six
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class VolumeEncryptor(executor.Executor):
"""Base class to support encrypted volumes.
A VolumeEncryptor provides hooks for attaching and detaching volumes, which
are called immediately prior to attaching the volume to an instance and
immediately following detaching the volume from an instance. This class
performs no actions for either hook.
"""
def __init__(self, root_helper,
connection_info,
keymgr,
execute=None,
*args, **kwargs):
super(VolumeEncryptor, self).__init__(root_helper,
execute=execute,
*args, **kwargs)
self._key_manager = keymgr
self.encryption_key_id = kwargs.get('encryption_key_id')
def _get_key(self, context):
"""Retrieves the encryption key for the specified volume.
:param: the connection information used to attach the volume
"""
return self._key_manager.get(context, self.encryption_key_id)
@abc.abstractmethod
def attach_volume(self, context, **kwargs):
"""Hook called immediately prior to attaching a volume to an instance.
"""
pass
@abc.abstractmethod
def detach_volume(self, **kwargs):
"""Hook called immediately after detaching a volume from an instance.
"""
pass

View File

@ -0,0 +1,124 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import os
from os_brick.encryptors import base
from os_brick import exception
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class CryptsetupEncryptor(base.VolumeEncryptor):
"""A VolumeEncryptor based on dm-crypt.
This VolumeEncryptor uses dm-crypt to encrypt the specified volume.
"""
def __init__(self, root_helper,
connection_info,
keymgr,
execute=None,
*args, **kwargs):
super(CryptsetupEncryptor, self).__init__(
root_helper=root_helper,
connection_info=connection_info,
keymgr=keymgr,
execute=execute,
*args, **kwargs)
# Fail if no device_path was set when connecting the volume, e.g. in
# the case of libvirt network volume drivers.
data = connection_info['data']
if not data.get('device_path'):
volume_id = data.get('volume_id') or connection_info.get('serial')
raise exception.VolumeEncryptionNotSupported(
volume_id=volume_id,
volume_type=connection_info['driver_volume_type'])
# the device's path as given to libvirt -- e.g., /dev/disk/by-path/...
self.symlink_path = connection_info['data']['device_path']
# a unique name for the volume -- e.g., the iSCSI participant name
self.dev_name = self.symlink_path.split('/')[-1]
# the device's actual path on the compute host -- e.g., /dev/sd_
self.dev_path = os.path.realpath(self.symlink_path)
def _get_passphrase(self, key):
"""Convert raw key to string."""
return binascii.hexlify(key).decode('utf-8')
def _open_volume(self, passphrase, **kwargs):
"""Opens the LUKS partition on the volume using the specified
passphrase.
:param passphrase: the passphrase used to access the volume
"""
LOG.debug("opening encrypted volume %s", self.dev_path)
# NOTE(joel-coffman): cryptsetup will strip trailing newlines from
# input specified on stdin unless --key-file=- is specified.
cmd = ["cryptsetup", "create", "--key-file=-"]
cipher = kwargs.get("cipher", None)
if cipher is not None:
cmd.extend(["--cipher", cipher])
key_size = kwargs.get("key_size", None)
if key_size is not None:
cmd.extend(["--key-size", key_size])
cmd.extend([self.dev_name, self.dev_path])
self._execute(*cmd, process_input=passphrase,
check_exit_code=True, run_as_root=True,
root_helper=self._root_helper)
def attach_volume(self, context, **kwargs):
"""Shadows the device and passes an unencrypted version to the
instance.
Transparent disk encryption is achieved by mounting the volume via
dm-crypt and passing the resulting device to the instance. The
instance is unaware of the underlying encryption due to modifying the
original symbolic link to refer to the device mounted by dm-crypt.
"""
key = self._get_key(context).get_encoded()
passphrase = self._get_passphrase(key)
self._open_volume(passphrase, **kwargs)
# modify the original symbolic link to refer to the decrypted device
self._execute('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self._root_helper,
run_as_root=True, check_exit_code=True)
def _close_volume(self, **kwargs):
"""Closes the device (effectively removes the dm-crypt mapping)."""
LOG.debug("closing encrypted volume %s", self.dev_path)
# cryptsetup returns 4 when attempting to destroy a non-active
# dm-crypt device. We are going to ignore this error code to make
# nova deleting that instance successfully.
self._execute('cryptsetup', 'remove', self.dev_name,
run_as_root=True, check_exit_code=True,
root_helper=self._root_helper)
def detach_volume(self, **kwargs):
"""Removes the dm-crypt mapping for the device."""
self._close_volume(**kwargs)

143
os_brick/encryptors/luks.py Normal file
View File

@ -0,0 +1,143 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from os_brick.encryptors import cryptsetup
from os_brick.i18n import _LI
from os_brick.i18n import _LW
from os_brick.privileged import rootwrap as priv_rootwrap
from oslo_concurrency import processutils as putils
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
def is_luks(root_helper, device, execute=None):
"""Checks if the specified device uses LUKS for encryption.
:param device: the device to check
:returns: true if the specified device uses LUKS; false otherwise
"""
try:
# check to see if the device uses LUKS: exit status is 0
# if the device is a LUKS partition and non-zero if not
if execute is None:
execute = priv_rootwrap.execute
execute('cryptsetup', 'isLuks', '--verbose', device,
run_as_root=True, root_helper=root_helper,
check_exit_code=True)
return True
except putils.ProcessExecutionError as e:
LOG.warning(_LW("isLuks exited abnormally (status %(exit_code)s): "
"%(stderr)s"),
{"exit_code": e.exit_code, "stderr": e.stderr})
return False
class LuksEncryptor(cryptsetup.CryptsetupEncryptor):
"""A VolumeEncryptor based on LUKS.
This VolumeEncryptor uses dm-crypt to encrypt the specified volume.
"""
def __init__(self, root_helper,
connection_info,
keymgr,
execute=None,
*args, **kwargs):
super(LuksEncryptor, self).__init__(
root_helper=root_helper,
connection_info=connection_info,
keymgr=keymgr,
execute=execute,
*args, **kwargs)
def _format_volume(self, passphrase, **kwargs):
"""Creates a LUKS header on the volume.
:param passphrase: the passphrase used to access the volume
"""
LOG.debug("formatting encrypted volume %s", self.dev_path)
# NOTE(joel-coffman): cryptsetup will strip trailing newlines from
# input specified on stdin unless --key-file=- is specified.
cmd = ["cryptsetup", "--batch-mode", "luksFormat", "--key-file=-"]
cipher = kwargs.get("cipher", None)
if cipher is not None:
cmd.extend(["--cipher", cipher])
key_size = kwargs.get("key_size", None)
if key_size is not None:
cmd.extend(["--key-size", key_size])
cmd.extend([self.dev_path])
self._execute(*cmd, process_input=passphrase,
check_exit_code=True, run_as_root=True,
root_helper=self._root_helper,
attempts=3)
def _open_volume(self, passphrase, **kwargs):
"""Opens the LUKS partition on the volume using the specified
passphrase.
:param passphrase: the passphrase used to access the volume
"""
LOG.debug("opening encrypted volume %s", self.dev_path)
self._execute('cryptsetup', 'luksOpen', '--key-file=-',
self.dev_path, self.dev_name, process_input=passphrase,
run_as_root=True, check_exit_code=True,
root_helper=self._root_helper)
def attach_volume(self, context, **kwargs):
"""Shadows the device and passes an unencrypted version to the
instance.
Transparent disk encryption is achieved by mounting the volume via
dm-crypt and passing the resulting device to the instance. The
instance is unaware of the underlying encryption due to modifying the
original symbolic link to refer to the device mounted by dm-crypt.
"""
key = self._get_key(context).get_encoded()
passphrase = self._get_passphrase(key)
try:
self._open_volume(passphrase, **kwargs)
except putils.ProcessExecutionError as e:
if e.exit_code == 1 and not is_luks(self._root_helper,
self.dev_path,
execute=self._execute):
# the device has never been formatted; format it and try again
LOG.info(_LI("%s is not a valid LUKS device;"
" formatting device for first use"),
self.dev_path)
self._format_volume(passphrase, **kwargs)
self._open_volume(passphrase, **kwargs)
else:
raise
# modify the original symbolic link to refer to the decrypted device
self._execute('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self._root_helper,
run_as_root=True, check_exit_code=True)
def _close_volume(self, **kwargs):
"""Closes the device (effectively removes the dm-crypt mapping)."""
LOG.debug("closing encrypted volume %s", self.dev_path)
self._execute('cryptsetup', 'luksClose', self.dev_name,
run_as_root=True, check_exit_code=True,
root_helper=self._root_helper,
attempts=3)

View File

@ -0,0 +1,47 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from os_brick.encryptors import base
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class NoOpEncryptor(base.VolumeEncryptor):
"""A VolumeEncryptor that does nothing.
This class exists solely to wrap regular (i.e., unencrypted) volumes so
that they do not require special handling with respect to an encrypted
volume. This implementation performs no action when a volume is attached
or detached.
"""
def __init__(self, root_helper,
connection_info,
keymgr,
execute=None,
*args, **kwargs):
super(NoOpEncryptor, self).__init__(
root_helper=root_helper,
connection_info=connection_info,
keymgr=keymgr,
execute=execute,
*args, **kwargs)
def attach_volume(self, context):
pass
def detach_volume(self):
pass

View File

@ -145,3 +145,8 @@ class VolumeDriverException(BrickException):
class InvalidIOHandleObject(BrickException):
message = _('IO handle of %(protocol)s has wrong object '
'type %(actual_type)s.')
class VolumeEncryptionNotSupported(Invalid):
message = _("Volume encryption is not supported for %(volume_type)s "
"volume %(volume_id)s.")

View File

View File

@ -0,0 +1,121 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import array
from castellan.tests.unit.key_manager import fake
import codecs
import mock
from os_brick import encryptors
from os_brick.tests import base
class VolumeEncryptorTestCase(base.TestCase):
def _create(self, root_helper, connection_info, keymgr, execute):
pass
def setUp(self):
super(VolumeEncryptorTestCase, self).setUp()
self.cmds = []
self.connection_info = {
"data": {
"device_path": "/dev/disk/by-path/"
"ip-192.0.2.0:3260-iscsi-iqn.2010-10.org.openstack"
":volume-fake_uuid-lun-1",
},
}
self.mock_execute = (
mock.patch("os_brick.privileged.rootwrap.execute").start())
self.addCleanup(self.mock_execute.stop)
_hex = codecs.getdecoder("hex_codec")('0' * 32)[0]
self.encryption_key = array.array('B', _hex).tolist()
self.root_helper = None
self.encryptor = self._create(root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=fake.fake_api(),
execute=self.mock_execute)
def test_get_encryptors(self):
root_helper = None
encryption = {'control_location': 'front-end',
'provider': 'LuksEncryptor'}
encryptor = encryptors.get_volume_encryptor(
root_helper=root_helper,
connection_info=self.connection_info,
keymgr=fake.fake_api(),
execute=self.mock_execute,
**encryption)
self.assertIsInstance(encryptor,
encryptors.luks.LuksEncryptor,
"encryptor is not an instance of LuksEncryptor")
encryption = {'control_location': 'front-end',
'provider': 'CryptsetupEncryptor'}
encryptor = encryptors.get_volume_encryptor(
root_helper=root_helper,
connection_info=self.connection_info,
keymgr=fake.fake_api(),
execute=self.mock_execute,
**encryption)
self.assertIsInstance(encryptor,
encryptors.cryptsetup.CryptsetupEncryptor,
"encryptor is not an instance of"
"CryptsetupEncryptor")
encryption = {'control_location': 'front-end',
'provider': 'NoOpEncryptor'}
encryptor = encryptors.get_volume_encryptor(
root_helper=root_helper,
connection_info=self.connection_info,
keymgr=fake.fake_api(),
execute=self.mock_execute,
**encryption)
self.assertIsInstance(encryptor,
encryptors.nop.NoOpEncryptor,
"encryptor is not an instance of NoOpEncryptor")
def test_get_error_encryptos(self):
encryption = {'control_location': 'front-end',
'provider': 'ErrorEncryptor'}
self.assertRaises(ValueError,
encryptors.get_volume_encryptor,
root_helper=None,
connection_info=self.connection_info,
keymgr=fake.fake_api(),
execute=self.mock_execute,
**encryption)
@mock.patch('os_brick.encryptors.LOG')
def test_error_log(self, log):
encryption = {'control_location': 'front-end',
'provider': 'TestEncryptor'}
provider = 'TestEncryptor'
try:
encryptors.get_volume_encryptor(
root_helper=None,
connection_info=self.connection_info,
keymgr=fake.fake_api(),
execute=self.mock_execute,
**encryption)
except Exception as e:
log.error.assert_called_once_with("Error instantiating "
"%(provider)s: "
"%(exception)s",
{'provider': provider,
'exception': e})

View File

@ -0,0 +1,111 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import mock
import six
from castellan.common.objects import symmetric_key as key
from castellan.tests.unit.key_manager import fake
from os_brick.encryptors import cryptsetup
from os_brick import exception
from os_brick.tests.encryptors import test_base
def fake__get_key(context):
raw = bytes(binascii.unhexlify('0' * 32))
symmetric_key = key.SymmetricKey('AES', len(raw) * 8, raw)
return symmetric_key
class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
def _create(self, root_helper, connection_info, keymgr, execute):
return cryptsetup.CryptsetupEncryptor(root_helper=root_helper,
connection_info=connection_info,
keymgr=keymgr,
execute=execute)
def setUp(self):
super(CryptsetupEncryptorTestCase, self).setUp()
self.dev_path = self.connection_info['data']['device_path']
self.dev_name = self.dev_path.split('/')[-1]
self.symlink_path = self.dev_path
def test__open_volume(self):
self.encryptor._open_volume("passphrase")
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', 'create', '--key-file=-', self.dev_name,
self.dev_path, process_input='passphrase',
run_as_root=True,
root_helper=self.root_helper,
check_exit_code=True),
])
self.assertEqual(1, self.mock_execute.call_count)
def test_attach_volume(self):
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = fake__get_key(None)
self.encryptor.attach_volume(None)
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', 'create', '--key-file=-', self.dev_name,
self.dev_path, process_input='0' * 32,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
])
self.assertEqual(2, self.mock_execute.call_count)
def test__close_volume(self):
self.encryptor.detach_volume()
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', 'remove', self.dev_name,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
])
self.assertEqual(1, self.mock_execute.call_count)
def test_detach_volume(self):
self.encryptor.detach_volume()
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', 'remove', self.dev_name,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
])
self.assertEqual(1, self.mock_execute.call_count)
def test_init_volume_encryption_not_supported(self):
# Tests that creating a CryptsetupEncryptor fails if there is no
# device_path key.
type = 'unencryptable'
data = dict(volume_id='a194699b-aa07-4433-a945-a5d23802043e')
connection_info = dict(driver_volume_type=type, data=data)
exc = self.assertRaises(exception.VolumeEncryptionNotSupported,
cryptsetup.CryptsetupEncryptor,
root_helper=self.root_helper,
connection_info=connection_info,
keymgr=fake.fake_api())
self.assertIn(type, six.text_type(exc))

View File

@ -0,0 +1,190 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from os_brick.encryptors import luks
from os_brick.tests.encryptors import test_cryptsetup
from oslo_concurrency import processutils as putils
class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
def _create(self, root_helper, connection_info, keymgr, execute):
return luks.LuksEncryptor(root_helper=root_helper,
connection_info=connection_info,
keymgr=keymgr,
execute=execute)
def test_is_luks(self):
luks.is_luks(self.root_helper, self.dev_path)
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
run_as_root=True, root_helper=self.root_helper,
check_exit_code=True),
], any_order=False)
self.assertEqual(1, self.mock_execute.call_count)
@mock.patch('os_brick.encryptors.luks.LOG')
def test_is_luks_with_error(self, mock_log):
error_msg = "Device %s is not a valid LUKS device." % self.dev_path
self.mock_execute.side_effect = \
putils.ProcessExecutionError(exit_code=1,
stderr=error_msg)
luks.is_luks(self.root_helper, self.dev_path)
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
run_as_root=True, root_helper=self.root_helper,
check_exit_code=True),
])
self.assertEqual(1, self.mock_execute.call_count)
self.assertEqual(1, mock_log.warning.call_count) # warning logged
def test_is_luks_with_execute(self):
mock_execute = mock.Mock()
luks.is_luks(self.root_helper, self.dev_path, execute=mock_execute)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
run_as_root=True, root_helper=self.root_helper,
check_exit_code=True),
])
def test__format_volume(self):
self.encryptor._format_volume("passphrase")
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', '--batch-mode', 'luksFormat',
'--key-file=-', self.dev_path,
process_input='passphrase',
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True, attempts=3),
])
self.assertEqual(1, self.mock_execute.call_count)
def test__open_volume(self):
self.encryptor._open_volume("passphrase")
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input='passphrase',
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
])
self.assertEqual(1, self.mock_execute.call_count)
def test_attach_volume(self):
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = (
test_cryptsetup.fake__get_key(None))
self.encryptor.attach_volume(None)
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input='0' * 32,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
])
self.assertEqual(2, self.mock_execute.call_count)
def test_attach_volume_not_formatted(self):
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = (
test_cryptsetup.fake__get_key(None))
self.mock_execute.side_effect = [
putils.ProcessExecutionError(exit_code=1), # luksOpen
putils.ProcessExecutionError(exit_code=1), # isLuks
mock.DEFAULT, # luksFormat
mock.DEFAULT, # luksOpen
mock.DEFAULT, # ln
]
self.encryptor.attach_volume(None)
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input='0' * 32,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', '--batch-mode', 'luksFormat',
'--key-file=-', self.dev_path, process_input='0' * 32,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True, attempts=3),
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input='0' * 32,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
], any_order=False)
self.assertEqual(5, self.mock_execute.call_count)
def test_attach_volume_fail(self):
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = (
test_cryptsetup.fake__get_key(None))
self.mock_execute.side_effect = [
putils.ProcessExecutionError(exit_code=1), # luksOpen
mock.DEFAULT, # isLuks
]
self.assertRaises(putils.ProcessExecutionError,
self.encryptor.attach_volume, None)
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input='0' * 32,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
], any_order=False)
self.assertEqual(2, self.mock_execute.call_count)
def test__close_volume(self):
self.encryptor.detach_volume()
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksClose', self.dev_name,
root_helper=self.root_helper,
attempts=3, run_as_root=True, check_exit_code=True),
])
self.assertEqual(1, self.mock_execute.call_count)
def test_detach_volume(self):
self.encryptor.detach_volume()
self.mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksClose', self.dev_name,
root_helper=self.root_helper,
attempts=3, run_as_root=True, check_exit_code=True),
])
self.assertEqual(1, self.mock_execute.call_count)

View File

@ -0,0 +1,31 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from os_brick.encryptors import nop
from os_brick.tests.encryptors import test_base
class NoOpEncryptorTestCase(test_base.VolumeEncryptorTestCase):
def _create(self, root_helper, connection_info, keymgr, execute):
return nop.NoOpEncryptor(root_helper=root_helper,
connection_info=connection_info,
keymgr=keymgr,
execute=execute)
def test_attach_volume(self):
self.encryptor.attach_volume(None)
def test_detach_volume(self):
self.encryptor.detach_volume()

View File

@ -15,3 +15,4 @@ oslo.utils>=3.5.0 # Apache-2.0
requests!=2.9.0,>=2.8.1 # Apache-2.0
retrying!=1.3.0,>=1.2.3 # Apache-2.0
six>=1.9.0 # MIT
castellan>=0.4.0 # Apache-2.0