privsep: Move luks utils to privsep

Our first target for privsep'ification. With privsep in charge of
everything, there's no longer any reason to provide a rootwrap helper or
any way (or reason) to allow a custom executor. As such, the
'root_helper' and 'execute' arguments of the three in-tree encryptors
are deprecated for removal. This makes things somewhat more complex than
they would otherwise be but it's the only obvious way to get to a world
where we don't force users to specify useless parameters.

Change-Id: I8bac9f9a9a84d72fafff38cb3e92e0b23cc85a8f
Signed-off-by: Stephen Finucane <sfinucan@redhat.com>
This commit is contained in:
Stephen Finucane 2021-05-14 11:12:48 +01:00 committed by Stephen Finucane
parent e6ad5ac7d6
commit 3369d168f4
15 changed files with 1349 additions and 591 deletions

View File

@ -20,6 +20,9 @@ os_brick/initiator/linuxfc.py
os_brick/initiator/linuxrbd.py
os_brick/initiator/utils.py
os_brick/local_dev/lvm.py
os_brick/privileged/cryptsetup.py
os_brick/privileged/luks.py
os_brick/privileged/nvmeof.py
os_brick/privileged/rbd.py
os_brick/remotefs/remotefs.py
os_brick/utils.py

View File

@ -17,6 +17,7 @@ from __future__ import annotations
from typing import Any
import debtcollector
from oslo_log import log as logging
from oslo_utils import importutils
from oslo_utils import strutils
@ -33,7 +34,7 @@ PLAIN = "plain"
FORMAT_TO_FRONTEND_ENCRYPTOR_MAP = {
LUKS: 'os_brick.encryptors.luks.LuksEncryptor',
LUKS2: 'os_brick.encryptors.luks.Luks2Encryptor',
PLAIN: 'os_brick.encryptors.cryptsetup.CryptsetupEncryptor'
PLAIN: 'os_brick.encryptors.cryptsetup.CryptsetupEncryptor',
}
LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP = {
@ -48,22 +49,46 @@ LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP = {
"NoOpEncryptor": None,
}
_NO_ARG_SENTINEL = object()
def get_volume_encryptor(root_helper: str,
connection_info: dict[str, Any],
keymgr,
execute=None,
*args, **kwargs) -> base.VolumeEncryptor:
def get_volume_encryptor(
root_helper=_NO_ARG_SENTINEL,
connection_info=_NO_ARG_SENTINEL,
keymgr=_NO_ARG_SENTINEL,
execute=_NO_ARG_SENTINEL,
*args,
**kwargs,
) -> base.VolumeEncryptor:
"""Creates a VolumeEncryptor used to encrypt the specified volume.
:param: the connection information used to attach the volume
:returns VolumeEncryptor: the VolumeEncryptor for the volume
"""
encryptor = nop.NoOpEncryptor(root_helper=root_helper,
connection_info=connection_info,
keymgr=keymgr,
execute=execute,
*args, **kwargs)
if root_helper != _NO_ARG_SENTINEL:
debtcollector.deprecate(
"The 'root_helper' argument is no longer used and will be "
"removed in a future release; remove this argument."
)
if connection_info == _NO_ARG_SENTINEL:
raise TypeError('missing connection_info')
if keymgr == _NO_ARG_SENTINEL:
raise TypeError('missing keymgr')
if execute != _NO_ARG_SENTINEL:
debtcollector.deprecate(
"The 'execute' argument is no longer used and will be "
"removed in a future release; remove this argument."
)
encryptor = nop.NoOpEncryptor(
connection_info=connection_info,
keymgr=keymgr,
*args,
**kwargs,
)
location = kwargs.get('control_location', None)
if location and location.lower() == 'front-end': # case insensitive
@ -73,10 +98,13 @@ def get_volume_encryptor(root_helper: str,
# ERROR if provider is not a key in SUPPORTED_ENCRYPTION_PROVIDERS.
# Until then continue to allow both the class name and path to be used.
if provider in LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP:
LOG.warning("Use of the in tree encryptor class %(provider)s"
" by directly referencing the implementation class"
" will be blocked in the Queens release of"
" os-brick.", {'provider': provider})
LOG.warning(
"Use of the in tree encryptor class %(provider)s"
" by directly referencing the implementation class"
" will be blocked in the Queens release of"
" os-brick.",
{'provider': provider},
)
provider = LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP[provider]
if provider in FORMAT_TO_FRONTEND_ENCRYPTOR_MAP:
@ -84,26 +112,32 @@ def get_volume_encryptor(root_helper: str,
elif provider is None:
provider = "os_brick.encryptors.nop.NoOpEncryptor"
else:
LOG.warning("Use of the out of tree encryptor class "
"%(provider)s will be blocked with the Queens "
"release of os-brick.", {'provider': provider})
LOG.warning(
"Use of the out of tree encryptor class "
"%(provider)s will be blocked with the Queens "
"release of os-brick.",
{'provider': provider},
)
try:
encryptor = importutils.import_object(
provider,
root_helper,
connection_info,
keymgr,
execute,
**kwargs)
connection_info=connection_info,
keymgr=keymgr,
**kwargs,
)
except Exception as e:
LOG.error("Error instantiating %(provider)s: %(exception)s",
{'provider': provider, 'exception': e})
LOG.error(
"Error instantiating %(provider)s: %(exception)s",
{'provider': provider, 'exception': e},
)
raise
msg = ("Using volume encryptor '%(encryptor)s' for connection: "
"%(connection_info)s" %
{'encryptor': encryptor, 'connection_info': connection_info})
msg = (
"Using volume encryptor '%(encryptor)s' for connection: "
"%(connection_info)s"
% {'encryptor': encryptor, 'connection_info': connection_info}
)
LOG.debug(strutils.mask_password(msg))
return encryptor
@ -114,24 +148,33 @@ def get_encryption_metadata(context,
volume_id: str,
connection_info: dict[str, Any]) -> dict[str, Any]:
metadata = {}
if ('data' in connection_info and
connection_info['data'].get('encrypted', False)):
if 'data' in connection_info and connection_info['data'].get(
'encrypted', False
):
try:
metadata = volume_api.get_volume_encryption_metadata(context,
volume_id)
metadata = volume_api.get_volume_encryption_metadata(
context, volume_id
)
if not metadata:
LOG.warning('Volume %s should be encrypted but there is no '
'encryption metadata.', volume_id)
LOG.warning(
'Volume %s should be encrypted but there is no '
'encryption metadata.',
volume_id,
)
except Exception as e:
LOG.error("Failed to retrieve encryption metadata for "
"volume %(volume_id)s: %(exception)s",
{'volume_id': volume_id, 'exception': e})
LOG.error(
"Failed to retrieve encryption metadata for "
"volume %(volume_id)s: %(exception)s",
{'volume_id': volume_id, 'exception': e},
)
raise
if metadata:
msg = ("Using volume encryption metadata '%(metadata)s' for "
"connection: %(connection_info)s" %
{'metadata': metadata, 'connection_info': connection_info})
msg = (
"Using volume encryption metadata '%(metadata)s' for "
"connection: %(connection_info)s"
% {'metadata': metadata, 'connection_info': connection_info}
)
LOG.debug(strutils.mask_password(msg))
return metadata

View File

@ -13,13 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
from os_brick import executor
class VolumeEncryptor(executor.Executor, metaclass=abc.ABCMeta):
class VolumeEncryptor(metaclass=abc.ABCMeta):
"""Base class to support encrypted volumes.
A VolumeEncryptor provides hooks for attaching and detaching volumes, which
@ -28,14 +25,7 @@ class VolumeEncryptor(executor.Executor, metaclass=abc.ABCMeta):
performs no actions for either hook.
"""
def __init__(self, root_helper,
connection_info,
keymgr,
execute=None,
*args, **kwargs):
super(VolumeEncryptor, self).__init__(root_helper,
execute=execute,
*args, **kwargs)
def __init__(self, connection_info, keymgr, *args, **kwargs):
self._key_manager = keymgr
self.encryption_key_id = kwargs.get('encryption_key_id')
@ -49,16 +39,12 @@ class VolumeEncryptor(executor.Executor, metaclass=abc.ABCMeta):
@abc.abstractmethod
def attach_volume(self, context, **kwargs):
"""Hook called immediately prior to attaching a volume to an instance.
"""
"""Hook called immediately before attaching a volume to an instance"""
pass
@abc.abstractmethod
def detach_volume(self, **kwargs):
"""Hook called immediately after detaching a volume from an instance.
"""
"""Hook called immediately after detaching a volume from an instance"""
pass
@abc.abstractmethod

View File

@ -16,33 +16,58 @@
import binascii
import os
from oslo_concurrency import processutils
import debtcollector
from oslo_log import log as logging
from oslo_log import versionutils
from os_brick.encryptors import base
from os_brick import exception
import os_brick.privileged.cryptsetup
LOG = logging.getLogger(__name__)
_NO_ARG_SENTINEL = object()
# TODO(stephenfin): Remove this entirely in AA or later
class CryptsetupEncryptor(base.VolumeEncryptor):
"""A VolumeEncryptor based on dm-crypt.
This VolumeEncryptor uses dm-crypt to encrypt the specified volume.
"""
def __init__(self, root_helper,
connection_info,
keymgr,
execute=None,
*args, **kwargs):
super(CryptsetupEncryptor, self).__init__(
root_helper=root_helper,
def __init__(
self,
root_helper=_NO_ARG_SENTINEL,
connection_info=_NO_ARG_SENTINEL,
keymgr=_NO_ARG_SENTINEL,
execute=_NO_ARG_SENTINEL,
*args,
**kwargs,
):
if root_helper != _NO_ARG_SENTINEL:
debtcollector.deprecate(
"The 'root_helper' argument is no longer used and will be "
"removed in a future release; remove this argument."
)
if connection_info == _NO_ARG_SENTINEL:
raise TypeError('missing connection_info')
if keymgr == _NO_ARG_SENTINEL:
raise TypeError('missing keymgr')
if execute != _NO_ARG_SENTINEL:
debtcollector.deprecate(
"The 'execute' argument is no longer used and will be "
"removed in a future release; remove this argument."
)
super().__init__(
connection_info=connection_info,
keymgr=keymgr,
execute=execute,
*args, **kwargs)
*args,
**kwargs,
)
# Fail if no device_path was set when connecting the volume, e.g. in
# the case of libvirt network volume drivers.
@ -51,7 +76,8 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
volume_id = data.get('volume_id') or connection_info.get('serial')
raise exception.VolumeEncryptionNotSupported(
volume_id=volume_id,
volume_type=connection_info['driver_volume_type'])
volume_type=connection_info['driver_volume_type'],
)
# the device's path as given to libvirt -- e.g., /dev/disk/by-path/...
self.symlink_path = connection_info['data']['device_path']
@ -66,12 +92,12 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
# old name when the encrypted volume already exists.
old_dev_name = os.path.basename(self.symlink_path)
wwn = data.get('multipath_id')
if self._is_crypt_device_available(old_dev_name):
if os_brick.privileged.cryptsetup.is_available(old_dev_name):
self.dev_name = old_dev_name
LOG.debug("Using old encrypted volume name: %s", self.dev_name)
elif wwn and wwn != old_dev_name:
# FibreChannel device could be named '/dev/mapper/<WWN>'.
if self._is_crypt_device_available(wwn):
if os_brick.privileged.cryptsetup.is_available(wwn):
self.dev_name = wwn
LOG.debug("Using encrypted volume name from wwn: %s",
self.dev_name)
@ -79,24 +105,6 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
# the device's actual path on the compute host -- e.g., /dev/sd_
self.dev_path = os.path.realpath(self.symlink_path)
def _is_crypt_device_available(self, dev_name):
if not os.path.exists('/dev/mapper/%s' % dev_name):
return False
try:
self._execute('cryptsetup', 'status', dev_name, run_as_root=True)
except processutils.ProcessExecutionError as e:
# If /dev/mapper/<dev_name> is a non-crypt block device (such as a
# normal disk or multipath device), exit_code will be 1. In the
# case, we will omit the warning message.
if e.exit_code != 1:
LOG.warning('cryptsetup status %(dev_name)s exited '
'abnormally (status %(exit_code)s): %(err)s',
{"dev_name": dev_name, "exit_code": e.exit_code,
"err": e.stderr})
return False
return True
def _get_passphrase(self, key):
"""Convert raw key to string."""
return binascii.hexlify(key).decode('utf-8')
@ -108,23 +116,17 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
"""
LOG.debug("opening encrypted volume %s", self.dev_path)
# NOTE(joel-coffman): cryptsetup will strip trailing newlines from
# input specified on stdin unless --key-file=- is specified.
cmd = ["cryptsetup", "create", "--key-file=-"]
# TODO(stephenfin): Make these named kwargs
cipher = kwargs.get('cipher', None)
key_size = kwargs.get('key_size', None)
cipher = kwargs.get("cipher", None)
if cipher is not None:
cmd.extend(["--cipher", cipher])
key_size = kwargs.get("key_size", None)
if key_size is not None:
cmd.extend(["--key-size", key_size])
cmd.extend([self.dev_name, self.dev_path])
self._execute(*cmd, process_input=passphrase,
check_exit_code=True, run_as_root=True,
root_helper=self._root_helper)
os_brick.privileged.cryptsetup.open_volume(
self.dev_path,
self.dev_name,
passphrase,
cipher=cipher,
key_size=key_size,
)
def attach_volume(self, context, **kwargs):
"""Shadow the device and pass an unencrypted version to the instance.
@ -142,29 +144,23 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
"in a future release. Existing users are encouraged to retype "
"any existing volumes using this encryptor to the 'luks' "
"LuksEncryptor or 'luks2' Luks2Encryptor encryptors as soon as "
"possible.")
"possible.",
)
key = self._get_key(context).get_encoded()
passphrase = self._get_passphrase(key)
self._open_volume(passphrase, **kwargs)
# modify the original symbolic link to refer to the decrypted device
self._execute('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self._root_helper,
run_as_root=True, check_exit_code=True)
os_brick.privileged.cryptsetup.replace_device_mapping(
self.dev_name,
self.symlink_path,
)
def _close_volume(self, **kwargs):
"""Closes the device (effectively removes the dm-crypt mapping)."""
LOG.debug("closing encrypted volume %s", self.dev_path)
# NOTE(mdbooth): remove will return 4 (wrong device specified) if
# the device doesn't exist. We assume here that the caller hasn't
# specified the wrong device, and that it doesn't exist because it
# isn't open. We don't fail in this case in order to make this
# operation idempotent.
self._execute('cryptsetup', 'remove', self.dev_name,
run_as_root=True, check_exit_code=[0, 4],
root_helper=self._root_helper)
os_brick.privileged.cryptsetup.close_volume(self.dev_name)
def detach_volume(self, **kwargs):
"""Removes the dm-crypt mapping for the device."""

View File

@ -16,37 +16,49 @@
import binascii
import os
import debtcollector
from oslo_concurrency import processutils
from oslo_log import log as logging
from os_brick.encryptors import base
from os_brick import exception
from os_brick.privileged import rootwrap as priv_rootwrap
import os_brick.privileged.luks
from os_brick import utils
LOG = logging.getLogger(__name__)
_NO_ARG_SENTINEL = object()
def is_luks(root_helper, device, execute=None):
# TODO(stephenfin): Remove root_helper, execute in AA or later
# NOTE(stephenfin): This is part of our public API
def is_luks(
root_helper=_NO_ARG_SENTINEL,
device=_NO_ARG_SENTINEL,
execute=_NO_ARG_SENTINEL,
):
"""Checks if the specified device uses LUKS for encryption.
:param root_helper: **DEPRECATED**
:param device: the device to check
:param execute: **DEPRECATED**
:returns: true if the specified device uses LUKS; false otherwise
"""
try:
# check to see if the device uses LUKS: exit status is 0
# if the device is a LUKS partition and non-zero if not
if execute is None:
execute = priv_rootwrap.execute
execute('cryptsetup', 'isLuks', '--verbose', device,
run_as_root=True, root_helper=root_helper,
check_exit_code=True)
return True
except processutils.ProcessExecutionError as e:
LOG.warning("isLuks exited abnormally (status %(exit_code)s): "
"%(stderr)s",
{"exit_code": e.exit_code, "stderr": e.stderr})
return False
if root_helper != _NO_ARG_SENTINEL:
debtcollector.deprecate(
"The 'root_helper' argument is no longer used and will be "
"removed in a future release; remove this argument."
)
if device == _NO_ARG_SENTINEL:
raise TypeError('missing keymgr')
if execute != _NO_ARG_SENTINEL:
debtcollector.deprecate(
"The 'execute' argument is no longer used and will be "
"removed in a future release; remove this argument."
)
return os_brick.privileged.luks.is_luks(device)
class LuksEncryptor(base.VolumeEncryptor):
@ -54,17 +66,41 @@ class LuksEncryptor(base.VolumeEncryptor):
This VolumeEncryptor uses dm-crypt to encrypt the specified volume.
"""
def __init__(self, root_helper,
connection_info,
keymgr,
execute=None,
*args, **kwargs):
super(LuksEncryptor, self).__init__(
root_helper=root_helper,
# TODO(stephenfin): Remove root_helper, execute in AA or later
def __init__(
self,
root_helper=_NO_ARG_SENTINEL,
connection_info=_NO_ARG_SENTINEL,
keymgr=_NO_ARG_SENTINEL,
execute=_NO_ARG_SENTINEL,
*args,
**kwargs,
):
if root_helper != _NO_ARG_SENTINEL:
debtcollector.deprecate(
"The 'root_helper' argument is no longer used and will be "
"removed in a future release; remove this argument."
)
if connection_info == _NO_ARG_SENTINEL:
raise TypeError('missing connection_info')
if keymgr == _NO_ARG_SENTINEL:
raise TypeError('missing keymgr')
if execute != _NO_ARG_SENTINEL:
debtcollector.deprecate(
"The 'execute' argument is no longer used and will be "
"removed in a future release; remove this argument."
)
super().__init__(
connection_info=connection_info,
keymgr=keymgr,
execute=execute,
*args, **kwargs)
*args,
**kwargs,
)
# Fail if no device_path was set when connecting the volume, e.g. in
# the case of libvirt network volume drivers.
@ -73,7 +109,8 @@ class LuksEncryptor(base.VolumeEncryptor):
volume_id = data.get('volume_id') or connection_info.get('serial')
raise exception.VolumeEncryptionNotSupported(
volume_id=volume_id,
volume_type=connection_info['driver_volume_type'])
volume_type=connection_info['driver_volume_type'],
)
# the device's path as given to libvirt -- e.g., /dev/disk/by-path/...
self.symlink_path = connection_info['data']['device_path']
@ -88,37 +125,20 @@ class LuksEncryptor(base.VolumeEncryptor):
# old name when the encrypted volume already exists.
old_dev_name = os.path.basename(self.symlink_path)
wwn = data.get('multipath_id')
if self._is_crypt_device_available(old_dev_name):
if os_brick.privileged.luks.is_available(old_dev_name):
self.dev_name = old_dev_name
LOG.debug("Using old encrypted volume name: %s", self.dev_name)
elif wwn and wwn != old_dev_name:
# FibreChannel device could be named '/dev/mapper/<WWN>'.
if self._is_crypt_device_available(wwn):
if os_brick.privileged.luks.is_available(wwn):
self.dev_name = wwn
LOG.debug(
"Using encrypted volume name from wwn: %s", self.dev_name)
"Using encrypted volume name from wwn: %s", self.dev_name
)
# the device's actual path on the compute host -- e.g., /dev/sd_
self.dev_path = os.path.realpath(self.symlink_path)
def _is_crypt_device_available(self, dev_name):
if not os.path.exists('/dev/mapper/%s' % dev_name):
return False
try:
self._execute('cryptsetup', 'status', dev_name, run_as_root=True)
except processutils.ProcessExecutionError as e:
# If /dev/mapper/<dev_name> is a non-crypt block device (such as a
# normal disk or multipath device), exit_code will be 1. In the
# case, we will omit the warning message.
if e.exit_code != 1:
LOG.warning('cryptsetup status %(dev_name)s exited '
'abnormally (status %(exit_code)s): %(err)s',
{"dev_name": dev_name, "exit_code": e.exit_code,
"err": e.stderr})
return False
return True
def _format_volume(self, passphrase, **kwargs):
"""Creates a LUKS v1 header on the volume.
@ -140,25 +160,13 @@ class LuksEncryptor(base.VolumeEncryptor):
"""
LOG.debug("formatting encrypted volume %s", self.dev_path)
# NOTE(joel-coffman): cryptsetup will strip trailing newlines from
# input specified on stdin unless --key-file=- is specified.
cmd = ["cryptsetup", "--batch-mode", "luksFormat", "--type", version,
"--key-file=-"]
# TODO(stephenfin): Make these named kwargs
cipher = kwargs.get("cipher", None)
if cipher is not None:
cmd.extend(["--cipher", cipher])
key_size = kwargs.get("key_size", None)
if key_size is not None:
cmd.extend(["--key-size", key_size])
cmd.extend([self.dev_path])
self._execute(*cmd, process_input=passphrase,
check_exit_code=True, run_as_root=True,
root_helper=self._root_helper,
attempts=3)
os_brick.privileged.luks.format_volume(
self.dev_path, passphrase, version, cipher, key_size
)
def _get_passphrase(self, key):
"""Convert raw key to string."""
@ -170,10 +178,9 @@ class LuksEncryptor(base.VolumeEncryptor):
:param passphrase: the passphrase used to access the volume
"""
LOG.debug("opening encrypted volume %s", self.dev_path)
self._execute('cryptsetup', 'luksOpen', '--key-file=-',
self.dev_path, self.dev_name, process_input=passphrase,
run_as_root=True, check_exit_code=True,
root_helper=self._root_helper)
os_brick.privileged.luks.open_volume(
self.dev_path, self.dev_name, passphrase
)
def attach_volume(self, context, **kwargs):
"""Shadow the device and pass an unencrypted version to the instance.
@ -190,36 +197,29 @@ class LuksEncryptor(base.VolumeEncryptor):
try:
self._open_volume(passphrase, **kwargs)
except processutils.ProcessExecutionError as e:
if e.exit_code == 1 and not is_luks(self._root_helper,
self.dev_path,
execute=self._execute):
if e.exit_code == 1 and not os_brick.privileged.luks.is_luks(
self.dev_path
):
# the device has never been formatted; format it and try again
LOG.info("%s is not a valid LUKS device;"
" formatting device for first use",
self.dev_path)
LOG.info(
"%s is not a valid LUKS device;"
" formatting device for first use",
self.dev_path,
)
self._format_volume(passphrase, **kwargs)
self._open_volume(passphrase, **kwargs)
else:
raise
# modify the original symbolic link to refer to the decrypted device
self._execute('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self._root_helper,
run_as_root=True, check_exit_code=True)
os_brick.privileged.luks.replace_device_mapping(
self.dev_name, self.symlink_path
)
def _close_volume(self, **kwargs):
"""Closes the device (effectively removes the dm-crypt mapping)."""
LOG.debug("closing encrypted volume %s", self.dev_path)
# NOTE(mdbooth): luksClose will return 4 (wrong device specified) if
# the device doesn't exist. We assume here that the caller hasn't
# specified the wrong device, and that it doesn't exist because it
# isn't open. We don't fail in this case in order to make this
# operation idempotent.
self._execute('cryptsetup', 'luksClose', self.dev_name,
run_as_root=True, check_exit_code=[0, 4],
root_helper=self._root_helper,
attempts=3)
os_brick.privileged.luks.close_volume(self.dev_name)
def detach_volume(self, **kwargs):
"""Removes the dm-crypt mapping for the device."""
@ -227,14 +227,13 @@ class LuksEncryptor(base.VolumeEncryptor):
def extend_volume(self, context, **kwargs):
"""Extend an encrypted volume and return the decrypted volume size."""
symlink = self.symlink_path
LOG.debug('Resizing mapping %s to match underlying device', symlink)
key = self._get_key(context).get_encoded()
passphrase = self._get_passphrase(key)
self._execute('cryptsetup', 'resize', symlink,
process_input=passphrase,
run_as_root=True, check_exit_code=True,
root_helper=self._root_helper)
symlink = self.symlink_path
LOG.debug('Resizing mapping %s to match underlying device', symlink)
os_brick.privileged.luks.extend_volume(symlink, passphrase)
res = utils.get_device_size(self, symlink)
LOG.debug('New size of mapping is %s', res)
return res
@ -245,17 +244,38 @@ class Luks2Encryptor(LuksEncryptor):
This VolumeEncryptor uses dm-crypt to encrypt the specified volume.
"""
def __init__(self, root_helper,
connection_info,
keymgr,
execute=None,
*args, **kwargs):
super(Luks2Encryptor, self).__init__(
root_helper=root_helper,
connection_info=connection_info,
keymgr=keymgr,
execute=execute,
*args, **kwargs) # type: ignore
# TODO(stephenfin): Remove root_helper, execute in AA or later
def __init__(
self,
root_helper=_NO_ARG_SENTINEL,
connection_info=_NO_ARG_SENTINEL,
keymgr=_NO_ARG_SENTINEL,
execute=_NO_ARG_SENTINEL,
*args,
**kwargs,
):
if root_helper != _NO_ARG_SENTINEL:
debtcollector.deprecate(
"The 'root_helper' argument is no longer used and will be "
"removed in a future release; remove this argument."
)
if connection_info == _NO_ARG_SENTINEL:
raise TypeError('missing connection_info')
if keymgr == _NO_ARG_SENTINEL:
raise TypeError('missing keymgr')
if execute != _NO_ARG_SENTINEL:
debtcollector.deprecate(
"The 'execute' argument is no longer used and will be "
"removed in a future release; remove this argument."
)
super().__init__(
connection_info=connection_info, keymgr=keymgr, *args, **kwargs
) # type: ignore
def _format_volume(self, passphrase, **kwargs):
"""Creates a LUKS v2 header on the volume.

View File

@ -13,8 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import debtcollector
from os_brick.encryptors import base
_NO_ARG_SENTINEL = object()
class NoOpEncryptor(base.VolumeEncryptor):
"""A VolumeEncryptor that does nothing.
@ -24,17 +28,41 @@ class NoOpEncryptor(base.VolumeEncryptor):
volume. This implementation performs no action when a volume is attached
or detached.
"""
def __init__(self, root_helper,
connection_info,
keymgr,
execute=None,
*args, **kwargs):
super(NoOpEncryptor, self).__init__(
root_helper=root_helper,
# TODO(stephenfin): Remove root_helper, execute in AA or later
def __init__(
self,
root_helper=_NO_ARG_SENTINEL,
connection_info=_NO_ARG_SENTINEL,
keymgr=_NO_ARG_SENTINEL,
execute=_NO_ARG_SENTINEL,
*args,
**kwargs,
):
if root_helper != _NO_ARG_SENTINEL:
debtcollector.deprecate(
"The 'root_helper' argument is no longer used and will be "
"removed in a future release; remove this argument."
)
if connection_info == _NO_ARG_SENTINEL:
raise TypeError('missing connection_info')
if keymgr == _NO_ARG_SENTINEL:
raise TypeError('missing keymgr')
if execute != _NO_ARG_SENTINEL:
debtcollector.deprecate(
"The 'execute' argument is no longer used and will be "
"removed in a future release; remove this argument."
)
super().__init__(
connection_info=connection_info,
keymgr=keymgr,
execute=execute,
*args, **kwargs)
*args,
**kwargs,
)
def attach_volume(self, context, **kwargs):
pass

View File

@ -0,0 +1,124 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import typing as ty
from oslo_concurrency import processutils
from oslo_log import log as logging
import os_brick.privileged
LOG = logging.getLogger(__name__)
@os_brick.privileged.default.entrypoint
def is_available(device_name: str) -> bool:
"""Checks if the specified crypt device is available.
:param device_name: the name of the device to check
:returns: true if the specified device uses LUKS, else false
"""
if not os.path.exists(f'/dev/mapper/{device_name}'):
return False
cmd = ['cryptsetup', 'status', device_name]
try:
processutils.execute(*cmd)
except processutils.ProcessExecutionError as e:
# If /dev/mapper/<dev_name> is a non-crypt block device (such as a
# normal disk or multipath device), exit_code will be 1. In the
# case, we will omit the warning message.
if e.exit_code != 1:
LOG.warning(
'cryptsetup status %(device_name)s exited '
'abnormally (status %(exit_code)s): %(error)s',
{
"device_name": device_name,
"exit_code": e.exit_code,
"error": e.stderr,
},
)
return False
return True
@os_brick.privileged.default.entrypoint
def replace_device_mapping(
device_name: str,
decrypted_device_path: str,
) -> None:
"""Modify a symbolic link to a device to refer to the decrypted device.
:param device_name: the name of the encrypted device
:param decrypted_device_path: the path to the decrypted device
:returns: None
"""
cmd = [
'ln',
'--symbolic',
'--force',
f'/dev/mapper/{device_name}',
decrypted_device_path,
]
processutils.execute(*cmd)
@os_brick.privileged.default.entrypoint
def open_volume(
device_path: str,
device_name: str,
passphrase: str,
*,
cipher: ty.Optional[str] = None,
key_size: ty.Optional[str] = None,
) -> None:
"""Opens a LUKS partition on a volume using passphrase.
:param device_path: the path to the device to open
:param device_name: the name to use for the mapping
:param passphrase: the passphrase used to access the volume
:param cipher: the cipher to use to access the volume (optional)
:param cipher: the key size to use to access the volume (optional)
:returns: None
"""
# NOTE(joel-coffman): cryptsetup will strip trailing newlines from
# input specified on stdin unless --key-file=- is specified.
cmd = ['cryptsetup', 'create', '--key-file=-']
if cipher:
cmd += ['--cipher', cipher]
if key_size:
cmd += ['--key-size', key_size]
cmd += [device_name, device_path]
processutils.execute(*cmd, process_input=passphrase)
@os_brick.privileged.default.entrypoint
def close_volume(device_name: str) -> None:
"""Close a device.
This effectively means removing the dm-crypt mapping.
:param device_name: the name of the device to close
:returns: None
"""
cmd = ['cryptsetup', 'remove', device_name]
# NOTE(mdbooth): luksClose will return 4 (wrong device specified) if
# the device doesn't exist. We assume here that the caller hasn't
# specified the wrong device, and that it doesn't exist because it
# isn't open. We don't fail in this case in order to make this
# operation idempotent.
processutils.execute(*cmd, check_exit_code=[0, 4], attempts=3)

204
os_brick/privileged/luks.py Normal file
View File

@ -0,0 +1,204 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import typing as ty
from oslo_concurrency import processutils
from oslo_log import log as logging
import os_brick.privileged
LOG = logging.getLogger(__name__)
@os_brick.privileged.default.entrypoint
def is_luks(device_name: str) -> bool:
"""Checks if the specified device uses LUKS for encryption.
:param device_name: the name of the device to check
:returns: true if the specified device uses LUKS, else false
"""
cmd = ['cryptsetup', 'isLuks', '--verbose', device_name]
try:
processutils.execute(*cmd, check_exit_code=True)
except processutils.ProcessExecutionError as e:
LOG.warning(
"isLuks exited abnormally (status %(exit_code)s): %(stderr)s",
{"exit_code": e.exit_code, "stderr": e.stderr},
)
return False
return True
@os_brick.privileged.default.entrypoint
def is_available(device_name: str) -> bool:
"""Check if the specified LUKS device is available.
:param device_name: the name of the device to check
:returns: true if the specified device is available, else false
"""
if not os.path.exists(f'/dev/mapper/{device_name}'):
return False
cmd = ['cryptsetup', 'status', device_name]
try:
processutils.execute(*cmd)
except processutils.ProcessExecutionError as e:
# If /dev/mapper/<dev_name> is a non-crypt block device (such as a
# normal disk or multipath device), exit_code will be 1. In the
# case, we will omit the warning message.
if e.exit_code != 1:
LOG.warning(
'cryptsetup status %(device_name)s exited '
'abnormally (status %(exit_code)s): %(err)s',
{
'dev_name': device_name,
'exit_code': e.exit_code,
'err': e.stderr,
},
)
return False
return True
@os_brick.privileged.default.entrypoint
def replace_device_mapping(
device_name: str,
decrypted_device_path: str,
) -> None:
"""Modify a symbolic link to a device to refer to the decrypted device.
:param device_name: the name of the encrypted device
:param decrypted_device_path: the path to the decrypted device
:returns: None
"""
cmd = [
'ln',
'--symbolic',
'--force',
f'/dev/mapper/{device_name}',
decrypted_device_path,
]
processutils.execute(*cmd, check_exit_code=True)
@os_brick.privileged.default.entrypoint
def open_volume(
device_path: str,
device_name: str,
passphrase: str,
) -> None:
"""Opens a LUKS partition on a volume using passphrase.
:param device_path: the path to the device to open
:param device_name: the name to use for the mapping
:param passphrase: the passphrase used to access the volume
:returns: None
"""
# NOTE(joel-coffman): cryptsetup will strip trailing newlines from
# input specified on stdin unless --key-file=- is specified.
cmd = [
'cryptsetup',
'luksOpen',
'--key-file=-',
device_path,
device_name,
]
processutils.execute(*cmd, process_input=passphrase, check_exit_code=True)
@os_brick.privileged.default.entrypoint
def close_volume(device_name: str) -> None:
"""Close a device.
This effectively means removing the dm-crypt mapping.
:param device_name: the name of the device to close
:returns: None
"""
cmd = ['cryptsetup', 'luksClose', device_name]
# NOTE(mdbooth): luksClose will return 4 (wrong device specified) if
# the device doesn't exist. We assume here that the caller hasn't
# specified the wrong device, and that it doesn't exist because it
# isn't open. We don't fail in this case in order to make this
# operation idempotent.
processutils.execute(*cmd, check_exit_code=[0, 4], attempts=3)
@os_brick.privileged.default.entrypoint
def extend_volume(
device_path: str,
passphrase: str,
):
"""Extends an encrypted volume.
:param device_path: the path to the device to extend
:param passphrase: the passphrase used to access the volume
:return: None
"""
cmd = ['cryptsetup', 'resize', device_path]
processutils.execute(*cmd, process_input=passphrase, check_exit_code=True)
@os_brick.privileged.default.entrypoint
def format_volume(
device_path: str,
passphrase: str,
version: str,
cipher: ty.Optional[str] = None,
key_size: ty.Optional[str] = None,
):
"""Creates a LUKS header of a given version or type on the volume.
:param device_path: the path to the device to format
:param passphrase: the passphrase used to access the volume
:param version: the LUKS version or type to use: one of `luks`,
`luks1`, or `luks2`. Be aware that `luks` gives you the default LUKS
format preferred by the particular cryptsetup being used (depends on
version and compile time parameters), which could be either LUKS1 or
LUKS2, so it's better to be specific about what you want here
:param cipher: the cipher specification string. The default will depend on
the particular cryptsetup being used (depends on version and compile
time parameters)
:param key_size: the key size in bits. The argument has to be a multiple of
8. The possible key-sizes are limited by the cipher and mode used. The
default will depend on the particular cryptsetup being used (depends on
version and compile time parameters)
:returns: None
"""
# NOTE(joel-coffman): cryptsetup will strip trailing newlines from
# input specified on stdin unless --key-file=- is specified.
cmd = [
'cryptsetup',
'--batch-mode',
'luksFormat',
'--type',
version,
'--key-file=-',
]
if cipher is not None:
cmd.extend(['--cipher', cipher])
if key_size is not None:
cmd.extend(['--key-size', key_size])
cmd.extend([device_path])
processutils.execute(
*cmd,
process_input=passphrase,
check_exit_code=True,
attempts=3,
)

View File

@ -26,7 +26,7 @@ class VolumeEncryptorTestCase(base.TestCase):
pass
def setUp(self):
super(VolumeEncryptorTestCase, self).setUp()
super().setUp()
self.connection_info = {
"data": {
"device_path": "/dev/disk/by-path/"
@ -34,146 +34,177 @@ class VolumeEncryptorTestCase(base.TestCase):
":volume-fake_uuid-lun-1",
},
}
self.root_helper = None
self.keymgr = fake.fake_api()
self.encryptor = self._create()
class BaseEncryptorTestCase(VolumeEncryptorTestCase):
def _test_get_encryptor(self, provider, expected_provider_class):
encryption = {'control_location': 'front-end',
'provider': provider}
encryption = {
'control_location': 'front-end',
'provider': provider,
}
encryptor = encryptors.get_volume_encryptor(
root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr,
**encryption)
**encryption,
)
self.assertIsInstance(encryptor, expected_provider_class)
@mock.patch('os_brick.privileged.cryptsetup.is_available', new=mock.Mock())
@mock.patch('os_brick.privileged.luks.is_available', new=mock.Mock())
def test_get_encryptors(self):
self._test_get_encryptor('luks',
encryptors.luks.LuksEncryptor)
self._test_get_encryptor('luks', encryptors.luks.LuksEncryptor)
# TODO(lyarwood): Remove the following in Pike
self._test_get_encryptor('LuksEncryptor',
encryptors.luks.LuksEncryptor)
self._test_get_encryptor('os_brick.encryptors.luks.LuksEncryptor',
encryptors.luks.LuksEncryptor)
self._test_get_encryptor('nova.volume.encryptors.luks.LuksEncryptor',
encryptors.luks.LuksEncryptor)
self._test_get_encryptor(
'LuksEncryptor', encryptors.luks.LuksEncryptor
)
self._test_get_encryptor(
'os_brick.encryptors.luks.LuksEncryptor',
encryptors.luks.LuksEncryptor,
)
self._test_get_encryptor(
'nova.volume.encryptors.luks.LuksEncryptor',
encryptors.luks.LuksEncryptor,
)
self._test_get_encryptor('plain',
encryptors.cryptsetup.CryptsetupEncryptor)
self._test_get_encryptor(
'plain', encryptors.cryptsetup.CryptsetupEncryptor
)
# TODO(lyarwood): Remove the following in Pike
self._test_get_encryptor('CryptsetupEncryptor',
encryptors.cryptsetup.CryptsetupEncryptor)
self._test_get_encryptor(
'CryptsetupEncryptor', encryptors.cryptsetup.CryptsetupEncryptor
)
self._test_get_encryptor(
'os_brick.encryptors.cryptsetup.CryptsetupEncryptor',
encryptors.cryptsetup.CryptsetupEncryptor)
encryptors.cryptsetup.CryptsetupEncryptor,
)
self._test_get_encryptor(
'nova.volume.encryptors.cryptsetup.CryptsetupEncryptor',
encryptors.cryptsetup.CryptsetupEncryptor)
encryptors.cryptsetup.CryptsetupEncryptor,
)
self._test_get_encryptor(None,
encryptors.nop.NoOpEncryptor)
self._test_get_encryptor(None, encryptors.nop.NoOpEncryptor)
# TODO(lyarwood): Remove the following in Pike
self._test_get_encryptor('NoOpEncryptor',
encryptors.nop.NoOpEncryptor)
self._test_get_encryptor('os_brick.encryptors.nop.NoOpEncryptor',
encryptors.nop.NoOpEncryptor)
self._test_get_encryptor('nova.volume.encryptors.nop.NoopEncryptor',
encryptors.nop.NoOpEncryptor)
self._test_get_encryptor('NoOpEncryptor', encryptors.nop.NoOpEncryptor)
self._test_get_encryptor(
'os_brick.encryptors.nop.NoOpEncryptor',
encryptors.nop.NoOpEncryptor,
)
self._test_get_encryptor(
'nova.volume.encryptors.nop.NoopEncryptor',
encryptors.nop.NoOpEncryptor,
)
def test_get_error_encryptors(self):
encryption = {'control_location': 'front-end',
'provider': 'ErrorEncryptor'}
self.assertRaises(ValueError,
encryptors.get_volume_encryptor,
root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr,
**encryption)
encryption = {
'control_location': 'front-end',
'provider': 'ErrorEncryptor',
}
self.assertRaises(
ValueError,
encryptors.get_volume_encryptor,
connection_info=self.connection_info,
keymgr=self.keymgr,
**encryption,
)
@mock.patch('os_brick.encryptors.LOG')
def test_error_log(self, log):
encryption = {'control_location': 'front-end',
'provider': 'TestEncryptor'}
def test_error_log(self, mock_log):
encryption = {
'control_location': 'front-end',
'provider': 'TestEncryptor',
}
provider = 'TestEncryptor'
try:
encryptors.get_volume_encryptor(
root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr,
**encryption)
**encryption,
)
except Exception as e:
log.error.assert_called_once_with("Error instantiating "
"%(provider)s: "
"%(exception)s",
{'provider': provider,
'exception': e})
mock_log.error.assert_called_once_with(
"Error instantiating %(provider)s: %(exception)s",
{'provider': provider, 'exception': e},
)
@mock.patch('os_brick.encryptors.LOG')
def test_get_missing_out_of_tree_encryptor_log(self, log):
def test_get_missing_out_of_tree_encryptor_log(self, mock_log):
provider = 'TestEncryptor'
encryption = {'control_location': 'front-end',
'provider': provider}
encryption = {
'control_location': 'front-end',
'provider': provider,
}
try:
encryptors.get_volume_encryptor(
root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr,
**encryption)
**encryption,
)
except Exception as e:
log.error.assert_called_once_with("Error instantiating "
"%(provider)s: "
"%(exception)s",
{'provider': provider,
'exception': e})
log.warning.assert_called_once_with("Use of the out of tree "
"encryptor class %(provider)s "
"will be blocked with the "
"Queens release of os-brick.",
{'provider': provider})
mock_log.error.assert_called_once_with(
"Error instantiating %(provider)s: %(exception)s",
{'provider': provider, 'exception': e},
)
mock_log.warning.assert_called_once_with(
"Use of the out of tree encryptor class %(provider)s "
"will be blocked with the Queens release of os-brick.",
{'provider': provider},
)
@mock.patch('os_brick.privileged.luks.is_available', new=mock.Mock())
@mock.patch('os_brick.encryptors.LOG')
def test_get_direct_encryptor_log(self, log):
encryption = {'control_location': 'front-end',
'provider': 'LuksEncryptor'}
def test_get_direct_encryptor_log(self, mock_log):
encryption = {
'control_location': 'front-end',
'provider': 'LuksEncryptor',
}
encryptors.get_volume_encryptor(
root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr,
**encryption)
**encryption,
)
encryption = {'control_location': 'front-end',
'provider': 'os_brick.encryptors.luks.LuksEncryptor'}
encryption = {
'control_location': 'front-end',
'provider': 'os_brick.encryptors.luks.LuksEncryptor',
}
encryptors.get_volume_encryptor(
root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr,
**encryption)
**encryption,
)
encryption = {'control_location': 'front-end',
'provider': 'nova.volume.encryptors.luks.LuksEncryptor'}
encryption = {
'control_location': 'front-end',
'provider': 'nova.volume.encryptors.luks.LuksEncryptor',
}
encryptors.get_volume_encryptor(
root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr,
**encryption)
**encryption,
)
log.warning.assert_has_calls([
mock.call("Use of the in tree encryptor class %(provider)s by "
"directly referencing the implementation class will be "
"blocked in the Queens release of os-brick.",
{'provider': 'LuksEncryptor'}),
mock.call("Use of the in tree encryptor class %(provider)s by "
"directly referencing the implementation class will be "
"blocked in the Queens release of os-brick.",
{'provider':
'os_brick.encryptors.luks.LuksEncryptor'}),
mock.call("Use of the in tree encryptor class %(provider)s by "
"directly referencing the implementation class will be "
"blocked in the Queens release of os-brick.",
{'provider':
'nova.volume.encryptors.luks.LuksEncryptor'})])
mock_log.warning.assert_has_calls(
[
mock.call(
"Use of the in tree encryptor class %(provider)s by "
"directly referencing the implementation class will be "
"blocked in the Queens release of os-brick.",
{'provider': 'LuksEncryptor'},
),
mock.call(
"Use of the in tree encryptor class %(provider)s by "
"directly referencing the implementation class will be "
"blocked in the Queens release of os-brick.",
{'provider': 'os_brick.encryptors.luks.LuksEncryptor'},
),
mock.call(
"Use of the in tree encryptor class %(provider)s by "
"directly referencing the implementation class will be "
"blocked in the Queens release of os-brick.",
{'provider': 'nova.volume.encryptors.luks.LuksEncryptor'},
),
]
)

View File

@ -33,72 +33,66 @@ def fake__get_key(context, passphrase):
class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
@mock.patch('os.path.exists', return_value=False)
def _create(self, mock_exists):
@mock.patch(
'os_brick.privileged.cryptsetup.is_available',
return_value=False,
)
def _create(self, mock_is_available):
return cryptsetup.CryptsetupEncryptor(
connection_info=self.connection_info,
root_helper=self.root_helper,
keymgr=self.keymgr)
keymgr=self.keymgr,
)
def setUp(self):
super(CryptsetupEncryptorTestCase, self).setUp()
super().setUp()
self.dev_path = self.connection_info['data']['device_path']
self.dev_name = 'crypt-%s' % self.dev_path.split('/')[-1]
self.symlink_path = self.dev_path
@mock.patch('os_brick.executor.Executor._execute')
def test__open_volume(self, mock_execute):
@mock.patch('os_brick.privileged.cryptsetup.open_volume')
def test__open_volume(self, mock_open_volume):
self.encryptor._open_volume("passphrase")
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'create', '--key-file=-', self.dev_name,
self.dev_path, process_input='passphrase',
run_as_root=True,
root_helper=self.root_helper,
check_exit_code=True),
])
mock_open_volume.assert_called_once_with(
self.dev_path,
self.dev_name,
'passphrase',
cipher=None,
key_size=None,
)
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume(self, mock_execute):
@mock.patch('os_brick.privileged.cryptsetup.replace_device_mapping')
@mock.patch.object(cryptsetup.CryptsetupEncryptor, '_open_volume')
def test_attach_volume(
self,
mock_open_volume,
mock_replace_device_mapping,
):
fake_key = 'e8b76872e3b04c18b3b6656bbf6f5089'
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = fake__get_key(None, fake_key)
self.encryptor.attach_volume(None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'create', '--key-file=-', self.dev_name,
self.dev_path, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
])
mock_open_volume.assert_called_once_with(fake_key)
mock_replace_device_mapping.assert_called_once_with(
self.dev_name,
self.symlink_path,
)
@mock.patch('os_brick.executor.Executor._execute')
def test__close_volume(self, mock_execute):
@mock.patch('os_brick.privileged.cryptsetup.close_volume')
def test__close_volume(self, mock_close_volume):
self.encryptor.detach_volume()
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'remove', self.dev_name,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=[0, 4]),
])
mock_close_volume.assert_called_once_with(self.dev_name)
@mock.patch('os_brick.executor.Executor._execute')
def test_detach_volume(self, mock_execute):
@mock.patch.object(cryptsetup.CryptsetupEncryptor, '_close_volume')
def test_detach_volume(self, mock_close_volume):
self.encryptor.detach_volume()
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'remove', self.dev_name,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=[0, 4]),
])
mock_close_volume.assert_called_once_with()
def test_init_volume_encryption_not_supported(self):
# Tests that creating a CryptsetupEncryptor fails if there is no
@ -106,53 +100,56 @@ class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
type = 'unencryptable'
data = dict(volume_id='a194699b-aa07-4433-a945-a5d23802043e')
connection_info = dict(driver_volume_type=type, data=data)
exc = self.assertRaises(exception.VolumeEncryptionNotSupported,
cryptsetup.CryptsetupEncryptor,
root_helper=self.root_helper,
connection_info=connection_info,
keymgr=fake.fake_api())
exc = self.assertRaises(
exception.VolumeEncryptionNotSupported,
cryptsetup.CryptsetupEncryptor,
connection_info=connection_info,
keymgr=fake.fake_api(),
)
self.assertIn(type, str(exc))
@mock.patch('os_brick.executor.Executor._execute')
@mock.patch('os.path.exists', return_value=True)
def test_init_volume_encryption_with_old_name(self, mock_exists,
mock_execute):
@mock.patch(
'os_brick.privileged.cryptsetup.is_available',
return_value=True,
)
def test_init_volume_encryption_with_old_name(self, mock_is_available):
# If an old name crypt device exists, dev_path should be the old name.
old_dev_name = self.dev_path.split('/')[-1]
encryptor = cryptsetup.CryptsetupEncryptor(
root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr)
keymgr=self.keymgr,
)
self.assertFalse(encryptor.dev_name.startswith('crypt-'))
self.assertEqual(old_dev_name, encryptor.dev_name)
self.assertEqual(self.dev_path, encryptor.dev_path)
self.assertEqual(self.symlink_path, encryptor.symlink_path)
mock_exists.assert_called_once_with('/dev/mapper/%s' % old_dev_name)
mock_execute.assert_called_once_with(
'cryptsetup', 'status', old_dev_name, run_as_root=True)
mock_is_available.assert_called_once_with(old_dev_name)
@mock.patch('os_brick.executor.Executor._execute')
@mock.patch('os.path.exists', side_effect=[False, True])
def test_init_volume_encryption_with_wwn(self, mock_exists, mock_execute):
@mock.patch(
'os_brick.privileged.cryptsetup.is_available',
side_effect=[False, True],
)
def test_init_volume_encryption_with_wwn(self, mock_is_available):
# If an wwn name crypt device exists, dev_path should be based on wwn.
old_dev_name = self.dev_path.split('/')[-1]
wwn = 'fake_wwn'
connection_info = copy.deepcopy(self.connection_info)
connection_info['data']['multipath_id'] = wwn
encryptor = cryptsetup.CryptsetupEncryptor(
root_helper=self.root_helper,
connection_info=connection_info,
keymgr=fake.fake_api(),
execute=mock_execute)
)
self.assertFalse(encryptor.dev_name.startswith('crypt-'))
self.assertEqual(wwn, encryptor.dev_name)
self.assertEqual(self.dev_path, encryptor.dev_path)
self.assertEqual(self.symlink_path, encryptor.symlink_path)
mock_exists.assert_has_calls([
mock.call('/dev/mapper/%s' % old_dev_name),
mock.call('/dev/mapper/%s' % wwn)])
mock_execute.assert_called_once_with(
'cryptsetup', 'status', wwn, run_as_root=True)
mock_is_available.assert_has_calls(
[mock.call(old_dev_name), mock.call(wwn)]
)
def test_extend_volume(self):
self.assertRaises(NotImplementedError,

View File

@ -34,7 +34,15 @@ def fake__get_key(context, passphrase):
class LuksEncryptorTestCase(test_base.VolumeEncryptorTestCase):
version = 'luks1'
def setUp(self):
p = mock.patch(
'os_brick.privileged.luks.is_available', return_value=False
)
self.mock_is_available = p.start()
self.addCleanup(p.stop)
super().setUp()
self.dev_path = self.connection_info['data']['device_path']
@ -43,161 +51,135 @@ class LuksEncryptorTestCase(test_base.VolumeEncryptorTestCase):
self.symlink_path = self.dev_path
def _create(self):
return luks.LuksEncryptor(root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr)
return luks.LuksEncryptor(
connection_info=self.connection_info, keymgr=self.keymgr
)
@mock.patch('os_brick.executor.Executor._execute')
def test_is_luks(self, mock_execute):
luks.is_luks(self.root_helper, self.dev_path, execute=mock_execute)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
run_as_root=True, root_helper=self.root_helper,
check_exit_code=True),
], any_order=False)
@mock.patch('os_brick.executor.Executor._execute')
@mock.patch('os_brick.encryptors.luks.LOG')
def test_is_luks_with_error(self, mock_log, mock_execute):
error_msg = "Device %s is not a valid LUKS device." % self.dev_path
mock_execute.side_effect = putils.ProcessExecutionError(
exit_code=1, stderr=error_msg)
luks.is_luks(self.root_helper, self.dev_path, execute=mock_execute)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
run_as_root=True, root_helper=self.root_helper,
check_exit_code=True),
])
self.assertEqual(1, mock_log.warning.call_count) # warning logged
@mock.patch('os_brick.executor.Executor._execute')
def test__format_volume(self, mock_execute):
@mock.patch('os_brick.privileged.luks.format_volume')
def test__format_volume(self, mock_format_volume):
self.encryptor._format_volume("passphrase")
mock_execute.assert_has_calls([
mock.call('cryptsetup', '--batch-mode', 'luksFormat',
'--type', 'luks1', '--key-file=-', self.dev_path,
process_input='passphrase',
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True, attempts=3),
])
mock_format_volume.assert_called_once_with(
self.dev_path, 'passphrase', self.version, None, None
)
@mock.patch('os_brick.executor.Executor._execute')
def test__open_volume(self, mock_execute):
@mock.patch('os_brick.privileged.luks.open_volume')
def test__open_volume(self, mock_open_volume):
self.encryptor._open_volume("passphrase")
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input='passphrase',
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
])
mock_open_volume.assert_called_once_with(
self.dev_path, self.dev_name, 'passphrase'
)
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume(self, mock_execute):
@mock.patch('os_brick.privileged.luks.replace_device_mapping')
@mock.patch('os_brick.privileged.luks.open_volume')
def test_attach_volume(self, mock_open, mock_update_mapping):
fake_key = '0c84146034e747639b698368807286df'
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = fake__get_key(None, fake_key)
self.encryptor.attach_volume(None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
])
mock_open(self.dev_path, self.dev_name, fake_key)
mock_update_mapping(self.dev_name, self.symlink_path)
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume_not_formatted(self, mock_execute):
@mock.patch('os_brick.privileged.luks.replace_device_mapping')
@mock.patch('os_brick.privileged.luks.format_volume')
@mock.patch('os_brick.privileged.luks.is_luks')
@mock.patch('os_brick.privileged.luks.open_volume')
def test_attach_volume_not_formatted(
self,
mock_open,
mock_is_luks,
mock_format,
mock_update_mapping,
):
fake_key = 'bc37c5eccebe403f9cc2d0dd20dac2bc'
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = fake__get_key(None, fake_key)
mock_execute.side_effect = [
putils.ProcessExecutionError(exit_code=1), # luksOpen
putils.ProcessExecutionError(exit_code=1), # isLuks
mock.DEFAULT, # luksFormat
mock.DEFAULT, # luksOpen
mock.DEFAULT, # ln
# fail the first attempt to force formatting
mock_open.side_effect = [
putils.ProcessExecutionError(exit_code=1),
mock.DEFAULT,
]
mock_is_luks.return_value = False
self.encryptor.attach_volume(None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', '--batch-mode', 'luksFormat',
'--type', 'luks1', '--key-file=-', self.dev_path,
process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True, attempts=3),
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
], any_order=False)
mock_open.assert_has_calls(
[
mock.call(self.dev_path, self.dev_name, fake_key),
mock.call(self.dev_path, self.dev_name, fake_key),
]
)
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume_fail(self, mock_execute):
mock_is_luks.assert_called_once_with(self.dev_path)
mock_format.assert_called_once_with(
self.dev_path, fake_key, self.version, None, None
)
mock_update_mapping(self.dev_name, self.symlink_path)
@mock.patch('os_brick.privileged.luks.is_luks')
@mock.patch('os_brick.privileged.luks.open_volume')
def test_attach_volume_fail(self, mock_open, mock_is_luks):
fake_key = 'ea6c2e1b8f7f4f84ae3560116d659ba2'
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = fake__get_key(None, fake_key)
mock_execute.side_effect = [
putils.ProcessExecutionError(exit_code=1), # luksOpen
mock.DEFAULT, # isLuks
# fail the first attempt to force formatting
mock_open.side_effect = [
putils.ProcessExecutionError(exit_code=1),
mock.DEFAULT,
]
mock_is_luks.return_value = True
self.assertRaises(putils.ProcessExecutionError,
self.encryptor.attach_volume, None)
self.assertRaises(
putils.ProcessExecutionError, self.encryptor.attach_volume, None
)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
], any_order=False)
@mock.patch('os_brick.executor.Executor._execute')
def test__close_volume(self, mock_execute):
@mock.patch('os_brick.privileged.luks.close_volume')
def test__close_volume(self, mock_close_volume):
self.encryptor.detach_volume()
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksClose', self.dev_name,
root_helper=self.root_helper,
attempts=3, run_as_root=True, check_exit_code=[0, 4]),
])
mock_close_volume.assert_called_once_with(self.dev_name)
@mock.patch('os_brick.executor.Executor._execute')
def test_detach_volume(self, mock_execute):
@mock.patch('os_brick.encryptors.luks.LuksEncryptor._close_volume')
def test_detach_volume(self, mock_close_volume):
self.encryptor.detach_volume()
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksClose', self.dev_name,
root_helper=self.root_helper,
attempts=3, run_as_root=True, check_exit_code=[0, 4]),
])
mock_close_volume.assert_called_once_with()
def test_init_deprecated_options(self):
with self.assertWarnsRegex(
DeprecationWarning,
"The 'root_helper' argument is no longer used",
):
luks.LuksEncryptor(
'sudo',
connection_info=self.connection_info,
keymgr=self.keymgr,
)
with self.assertWarnsRegex(
DeprecationWarning,
"The 'root_helper' argument is no longer used",
):
luks.LuksEncryptor(
root_helper='sudo',
connection_info=self.connection_info,
keymgr=self.keymgr,
)
with self.assertWarnsRegex(
DeprecationWarning,
"The 'execute' argument is no longer used",
):
luks.LuksEncryptor(
connection_info=self.connection_info,
keymgr=self.keymgr,
execute='foo',
) # would actually be a function but who cares
def test_init_volume_encryption_not_supported(self):
# Tests that creating a CryptsetupEncryptor fails if there is no
@ -205,58 +187,53 @@ class LuksEncryptorTestCase(test_base.VolumeEncryptorTestCase):
type = 'unencryptable'
data = dict(volume_id='a194699b-aa07-4433-a945-a5d23802043e')
connection_info = dict(driver_volume_type=type, data=data)
exc = self.assertRaises(exception.VolumeEncryptionNotSupported,
luks.LuksEncryptor,
root_helper=self.root_helper,
connection_info=connection_info,
keymgr=fake.fake_api())
exc = self.assertRaises(
exception.VolumeEncryptionNotSupported,
luks.LuksEncryptor,
connection_info=connection_info,
keymgr=fake.fake_api(),
)
self.assertIn(type, str(exc))
@mock.patch('os_brick.executor.Executor._execute')
@mock.patch('os.path.exists', return_value=True)
def test_init_volume_encryption_with_old_name(self, mock_exists,
mock_execute):
def test_init_volume_encryption_with_old_name(self):
self.mock_is_available.return_value = True
# If an old name crypt device exists, dev_path should be the old name.
old_dev_name = self.dev_path.split('/')[-1]
encryptor = luks.LuksEncryptor(
root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr)
connection_info=self.connection_info, keymgr=self.keymgr
)
self.assertFalse(encryptor.dev_name.startswith('crypt-'))
self.assertEqual(old_dev_name, encryptor.dev_name)
self.assertEqual(self.dev_path, encryptor.dev_path)
self.assertEqual(self.symlink_path, encryptor.symlink_path)
mock_exists.assert_called_once_with('/dev/mapper/%s' % old_dev_name)
mock_execute.assert_called_once_with(
'cryptsetup', 'status', old_dev_name, run_as_root=True)
@mock.patch('os_brick.executor.Executor._execute')
@mock.patch('os.path.exists', side_effect=[False, True])
def test_init_volume_encryption_with_wwn(self, mock_exists, mock_execute):
def test_init_volume_encryption_with_wwn(self):
self.mock_is_available.side_effect = [False, True]
# If an wwn name crypt device exists, dev_path should be based on wwn.
old_dev_name = self.dev_path.split('/')[-1]
wwn = 'fake_wwn'
connection_info = copy.deepcopy(self.connection_info)
connection_info['data']['multipath_id'] = wwn
encryptor = luks.LuksEncryptor(
root_helper=self.root_helper,
connection_info=connection_info,
keymgr=fake.fake_api())
connection_info=connection_info, keymgr=fake.fake_api()
)
self.assertFalse(encryptor.dev_name.startswith('crypt-'))
self.assertEqual(wwn, encryptor.dev_name)
self.assertEqual(self.dev_path, encryptor.dev_path)
self.assertEqual(self.symlink_path, encryptor.symlink_path)
mock_exists.assert_has_calls([
mock.call('/dev/mapper/%s' % old_dev_name),
mock.call('/dev/mapper/%s' % wwn)])
mock_execute.assert_called_once_with(
'cryptsetup', 'status', wwn, run_as_root=True)
self.mock_is_available.assert_has_calls(
[
mock.call(old_dev_name),
mock.call(wwn),
]
)
@mock.patch('os_brick.utils.get_device_size')
@mock.patch.object(luks.LuksEncryptor, '_execute')
@mock.patch('os_brick.privileged.luks.extend_volume')
@mock.patch.object(luks.LuksEncryptor, '_get_passphrase')
@mock.patch.object(luks.LuksEncryptor, '_get_key')
def test_extend_volume(self, mock_key, mock_pass, mock_exec, mock_size):
def test_extend_volume(self, mock_key, mock_pass, mock_extend, mock_size):
encryptor = self.encryptor
res = encryptor.extend_volume(mock.sentinel.context)
self.assertEqual(mock_size.return_value, res)
@ -265,66 +242,48 @@ class LuksEncryptorTestCase(test_base.VolumeEncryptorTestCase):
mock_key.return_value.get_encoded.assert_called_once_with()
key = mock_key.return_value.get_encoded.return_value
mock_pass.assert_called_once_with(key)
mock_exec.assert_called_once_with(
'cryptsetup', 'resize', encryptor.dev_path,
process_input=mock_pass.return_value, run_as_root=True,
check_exit_code=True, root_helper=encryptor._root_helper)
mock_extend.assert_called_once_with(
encryptor.dev_path, mock_pass.return_value,
)
mock_size.assert_called_once_with(encryptor, encryptor.dev_path)
class Luks2EncryptorTestCase(LuksEncryptorTestCase):
version = 'luks2'
def _create(self):
return luks.Luks2Encryptor(root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr)
return luks.Luks2Encryptor(
connection_info=self.connection_info, keymgr=self.keymgr
)
@mock.patch('os_brick.executor.Executor._execute')
def test__format_volume(self, mock_execute):
self.encryptor._format_volume("passphrase")
def test_init_deprecated_options(self):
with self.assertWarnsRegex(
DeprecationWarning,
"The 'root_helper' argument is no longer used",
):
luks.Luks2Encryptor(
'sudo',
connection_info=self.connection_info,
keymgr=self.keymgr,
)
mock_execute.assert_has_calls([
mock.call('cryptsetup', '--batch-mode', 'luksFormat',
'--type', 'luks2', '--key-file=-', self.dev_path,
process_input='passphrase',
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True, attempts=3),
])
with self.assertWarnsRegex(
DeprecationWarning,
"The 'root_helper' argument is no longer used",
):
luks.Luks2Encryptor(
root_helper='sudo',
connection_info=self.connection_info,
keymgr=self.keymgr,
)
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume_not_formatted(self, mock_execute):
fake_key = 'bc37c5eccebe403f9cc2d0dd20dac2bc'
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = fake__get_key(None, fake_key)
mock_execute.side_effect = [
putils.ProcessExecutionError(exit_code=1), # luksOpen
putils.ProcessExecutionError(exit_code=1), # isLuks
mock.DEFAULT, # luksFormat
mock.DEFAULT, # luksOpen
mock.DEFAULT, # ln
]
self.encryptor.attach_volume(None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', '--batch-mode', 'luksFormat',
'--type', 'luks2', '--key-file=-', self.dev_path,
process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True, attempts=3),
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
], any_order=False)
with self.assertWarnsRegex(
DeprecationWarning,
"The 'execute' argument is no longer used",
):
luks.Luks2Encryptor(
connection_info=self.connection_info,
keymgr=self.keymgr,
execute='foo',
) # would actually be a function but who cares

View File

@ -19,9 +19,40 @@ from os_brick.tests.encryptors import test_base
class NoOpEncryptorTestCase(test_base.VolumeEncryptorTestCase):
def _create(self):
return nop.NoOpEncryptor(root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr)
return nop.NoOpEncryptor(
connection_info=self.connection_info, keymgr=self.keymgr
)
def test_init_deprecated_options(self):
with self.assertWarnsRegex(
DeprecationWarning,
"The 'root_helper' argument is no longer used",
):
nop.NoOpEncryptor(
'sudo',
connection_info=self.connection_info,
keymgr=self.keymgr,
)
with self.assertWarnsRegex(
DeprecationWarning,
"The 'root_helper' argument is no longer used",
):
nop.NoOpEncryptor(
root_helper='sudo',
connection_info=self.connection_info,
keymgr=self.keymgr,
)
with self.assertWarnsRegex(
DeprecationWarning,
"The 'execute' argument is no longer used",
):
nop.NoOpEncryptor(
connection_info=self.connection_info,
keymgr=self.keymgr,
execute='foo',
) # would actually be a function but who cares
def test_attach_volume(self):
test_args = {

View File

@ -0,0 +1,131 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_concurrency import processutils
import os_brick.privileged as privsep_brick
import os_brick.privileged.cryptsetup
from os_brick.tests import base
class PrivCryptsetupTestCase(base.TestCase):
def setUp(self):
super().setUp()
# Disable privsep server/client mode
privsep_brick.default.set_client_mode(False)
self.addCleanup(privsep_brick.default.set_client_mode, True)
@mock.patch(
'os_brick.privileged.cryptsetup.os.path.exists',
new=mock.Mock(return_value=True),
)
@mock.patch('oslo_concurrency.processutils.execute')
def test_is_available(self, mock_execute):
result = os_brick.privileged.cryptsetup.is_available('crypt-test')
self.assertTrue(result)
mock_execute.assert_called_once_with(
'cryptsetup', 'status', 'crypt-test'
)
@mock.patch(
'os_brick.privileged.cryptsetup.os.path.exists',
new=mock.Mock(return_value=False),
)
@mock.patch('oslo_concurrency.processutils.execute')
def test_is_available__missing_disk(self, mock_execute):
result = os_brick.privileged.cryptsetup.is_available('crypt-test')
self.assertFalse(result)
mock_execute.assert_not_called()
@mock.patch(
'os_brick.privileged.cryptsetup.os.path.exists',
new=mock.Mock(return_value=True),
)
@mock.patch('os_brick.privileged.cryptsetup.LOG')
@mock.patch('oslo_concurrency.processutils.execute')
def test_is_available__with_error_disk(self, mock_execute, mock_log):
mock_execute.side_effect = processutils.ProcessExecutionError(
exit_code=2, stderr='foo'
)
result = os_brick.privileged.cryptsetup.is_available('crypt-test')
self.assertFalse(result)
mock_execute.assert_called_once_with(
'cryptsetup', 'status', 'crypt-test'
)
mock_log.warning.assert_called_once()
@mock.patch(
'os_brick.privileged.cryptsetup.os.path.exists',
new=mock.Mock(return_value=True),
)
@mock.patch('os_brick.privileged.cryptsetup.LOG')
@mock.patch('oslo_concurrency.processutils.execute')
def test_is_available__with_non_crypt_disk(self, mock_execute, mock_log):
mock_execute.side_effect = processutils.ProcessExecutionError(
exit_code=1, stderr='foo'
)
result = os_brick.privileged.cryptsetup.is_available('crypt-test')
self.assertFalse(result)
mock_execute.assert_called_once_with(
'cryptsetup', 'status', 'crypt-test'
)
mock_log.warning.assert_not_called()
@mock.patch('oslo_concurrency.processutils.execute')
def test_replace_device_mapping(self, mock_execute):
os_brick.privileged.cryptsetup.replace_device_mapping(
'crypt-test', '/dev/disk/by-path/'
)
mock_execute.assert_called_once_with(
'ln',
'--symbolic',
'--force',
'/dev/mapper/crypt-test',
'/dev/disk/by-path/',
),
@mock.patch('oslo_concurrency.processutils.execute')
def test_open_volume(self, mock_execute):
os_brick.privileged.cryptsetup.open_volume(
'/dev/disk/by-path/', 'crypt-test', 'passphrase'
)
mock_execute.assert_called_once_with(
'cryptsetup',
'create',
'--key-file=-',
'crypt-test',
'/dev/disk/by-path/',
process_input='passphrase',
)
@mock.patch('oslo_concurrency.processutils.execute')
def test_close_volume(self, mock_execute):
os_brick.privileged.cryptsetup.close_volume('crypt-test')
mock_execute.assert_called_once_with(
'cryptsetup',
'remove',
'crypt-test',
check_exit_code=[0, 4],
attempts=3,
)

View File

@ -0,0 +1,191 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_concurrency import processutils
import os_brick.privileged as privsep_brick
import os_brick.privileged.luks
from os_brick.tests import base
class PrivLUKSTestCase(base.TestCase):
def setUp(self):
super().setUp()
# Disable privsep server/client mode
privsep_brick.default.set_client_mode(False)
self.addCleanup(privsep_brick.default.set_client_mode, True)
@mock.patch('oslo_concurrency.processutils.execute')
def test_is_luks(self, mock_execute):
os_brick.privileged.luks.is_luks('crypt-test')
mock_execute.assert_called_with(
'cryptsetup',
'isLuks',
'--verbose',
'crypt-test',
check_exit_code=True,
)
@mock.patch('os_brick.privileged.luks.LOG')
@mock.patch('oslo_concurrency.processutils.execute')
def test_is_luks__with_error(self, mock_execute, mock_log):
mock_execute.side_effect = processutils.ProcessExecutionError(
exit_code=1, stderr='foo'
)
os_brick.privileged.luks.is_luks('crypt-test')
mock_execute.assert_called_with(
'cryptsetup',
'isLuks',
'--verbose',
'crypt-test',
check_exit_code=True,
)
mock_log.warning.assert_called_once()
@mock.patch(
'os_brick.privileged.luks.os.path.exists',
new=mock.Mock(return_value=True),
)
@mock.patch('oslo_concurrency.processutils.execute')
def test_is_available(self, mock_execute):
self.assertTrue(os_brick.privileged.luks.is_available('crypt-test'))
mock_execute.assert_called_once_with(
'cryptsetup', 'status', 'crypt-test'
)
@mock.patch(
'os_brick.privileged.luks.os.path.exists',
new=mock.Mock(return_value=False),
)
@mock.patch('oslo_concurrency.processutils.execute')
def test_is_available__missing_disk(self, mock_execute):
self.assertFalse(os_brick.privileged.luks.is_available('crypt-test'))
mock_execute.assert_not_called()
@mock.patch(
'os_brick.privileged.luks.os.path.exists',
new=mock.Mock(return_value=True),
)
@mock.patch('os_brick.privileged.luks.LOG')
@mock.patch('oslo_concurrency.processutils.execute')
def test_is_available__with_error_disk(self, mock_execute, mock_log):
mock_execute.side_effect = processutils.ProcessExecutionError(
exit_code=2, stderr='foo'
)
self.assertFalse(os_brick.privileged.luks.is_available('crypt-test'))
mock_execute.assert_called_once_with(
'cryptsetup', 'status', 'crypt-test'
)
mock_log.warning.assert_called_once()
@mock.patch(
'os_brick.privileged.luks.os.path.exists',
new=mock.Mock(return_value=True),
)
@mock.patch('os_brick.privileged.luks.LOG')
@mock.patch('oslo_concurrency.processutils.execute')
def test_is_available__with_non_crypt_disk(self, mock_execute, mock_log):
mock_execute.side_effect = processutils.ProcessExecutionError(
exit_code=1, stderr='foo'
)
self.assertFalse(os_brick.privileged.luks.is_available('crypt-test'))
mock_execute.assert_called_once_with(
'cryptsetup', 'status', 'crypt-test'
)
mock_log.warning.assert_not_called()
@mock.patch('oslo_concurrency.processutils.execute')
def test_replace_device_mapping(self, mock_execute):
os_brick.privileged.luks.replace_device_mapping(
'crypt-test', '/dev/disk/by-path/'
)
mock_execute.assert_called_once_with(
'ln',
'--symbolic',
'--force',
'/dev/mapper/crypt-test',
'/dev/disk/by-path/',
check_exit_code=True,
),
@mock.patch('oslo_concurrency.processutils.execute')
def test_open_volume(self, mock_execute):
os_brick.privileged.luks.open_volume(
'/dev/disk/by-path/', 'crypt-test', 'passphrase'
)
mock_execute.assert_called_once_with(
'cryptsetup',
'luksOpen',
'--key-file=-',
'/dev/disk/by-path/',
'crypt-test',
process_input='passphrase',
check_exit_code=True,
)
@mock.patch('oslo_concurrency.processutils.execute')
def test_close_volume(self, mock_execute):
os_brick.privileged.luks.close_volume('crypt-test')
mock_execute.assert_called_once_with(
'cryptsetup',
'luksClose',
'crypt-test',
check_exit_code=[0, 4],
attempts=3,
)
@mock.patch('oslo_concurrency.processutils.execute')
def test_extend_volume(self, mock_execute):
os_brick.privileged.luks.extend_volume(
'/dev/disk/by-path/', 'passphrase'
)
mock_execute.assert_called_once_with(
'cryptsetup',
'resize',
'/dev/disk/by-path/',
process_input='passphrase',
check_exit_code=True,
)
@mock.patch('oslo_concurrency.processutils.execute')
def test_format_volume(self, mock_execute):
os_brick.privileged.luks.format_volume(
'/dev/disk/by-path/', 'passphrase', 'luks1'
)
mock_execute.assert_called_once_with(
'cryptsetup',
'--batch-mode',
'luksFormat',
'--type',
'luks1',
'--key-file=-',
'/dev/disk/by-path/',
process_input='passphrase',
check_exit_code=True,
attempts=3,
)

View File

@ -0,0 +1,14 @@
---
features:
- |
All supported encryptors, namely ``NoOpEncryptor``, ``LuksEncryptor``,
``Luks2Encryptor``, and the deprecated ``CryptsetupEncryptor``, have been
switched to privsep. This should have no impact for most users as nova and
cinder already use privsep elsewhere.
upgrade:
- |
The ``NoOpEncryptor``, ``LuksEncryptor``, ``Luks2Encryptor``, and
deprecated ``CryptsetupEncryptor`` encryptors no longer require
``root_helper`` or ``execute`` arguments. These arguments have been
deprecated for removal in a future release. Callers can simply drop these
arguments.