os-brick/os_brick/tests/encryptors/test_luks.py

256 lines
12 KiB
Python

# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import mock
from castellan.common.objects import symmetric_key as key
from os_brick.encryptors import luks
from os_brick.tests.encryptors import test_cryptsetup
from oslo_concurrency import processutils as putils
class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
def _create(self):
return luks.LuksEncryptor(root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=self.keymgr)
@mock.patch('os_brick.executor.Executor._execute')
def test_is_luks(self, mock_execute):
luks.is_luks(self.root_helper, self.dev_path, execute=mock_execute)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
run_as_root=True, root_helper=self.root_helper,
check_exit_code=True),
], any_order=False)
@mock.patch('os_brick.executor.Executor._execute')
@mock.patch('os_brick.encryptors.luks.LOG')
def test_is_luks_with_error(self, mock_log, mock_execute):
error_msg = "Device %s is not a valid LUKS device." % self.dev_path
mock_execute.side_effect = putils.ProcessExecutionError(
exit_code=1, stderr=error_msg)
luks.is_luks(self.root_helper, self.dev_path, execute=mock_execute)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
run_as_root=True, root_helper=self.root_helper,
check_exit_code=True),
])
self.assertEqual(1, mock_log.warning.call_count) # warning logged
@mock.patch('os_brick.executor.Executor._execute')
def test__format_volume(self, mock_execute):
self.encryptor._format_volume("passphrase")
mock_execute.assert_has_calls([
mock.call('cryptsetup', '--batch-mode', 'luksFormat',
'--key-file=-', self.dev_path,
process_input='passphrase',
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True, attempts=3),
])
@mock.patch('os_brick.executor.Executor._execute')
def test__open_volume(self, mock_execute):
self.encryptor._open_volume("passphrase")
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input='passphrase',
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
])
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume(self, mock_execute):
fake_key = '0c84146034e747639b698368807286df'
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = (
test_cryptsetup.fake__get_key(None, fake_key))
self.encryptor.attach_volume(None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
])
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume_not_formatted(self, mock_execute):
fake_key = 'bc37c5eccebe403f9cc2d0dd20dac2bc'
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = (
test_cryptsetup.fake__get_key(None, fake_key))
mock_execute.side_effect = [
putils.ProcessExecutionError(exit_code=1), # luksOpen
putils.ProcessExecutionError(exit_code=1), # isLuks
mock.DEFAULT, # luksFormat
mock.DEFAULT, # luksOpen
mock.DEFAULT, # ln
]
self.encryptor.attach_volume(None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', '--batch-mode', 'luksFormat',
'--key-file=-', self.dev_path, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True, attempts=3),
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
], any_order=False)
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume_fail(self, mock_execute):
fake_key = 'ea6c2e1b8f7f4f84ae3560116d659ba2'
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = (
test_cryptsetup.fake__get_key(None, fake_key))
mock_execute.side_effect = [
putils.ProcessExecutionError(exit_code=1), # luksOpen
mock.DEFAULT, # isLuks
]
self.assertRaises(putils.ProcessExecutionError,
self.encryptor.attach_volume, None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
], any_order=False)
@mock.patch('os_brick.executor.Executor._execute')
def test__close_volume(self, mock_execute):
self.encryptor.detach_volume()
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksClose', self.dev_name,
root_helper=self.root_helper,
attempts=3, run_as_root=True, check_exit_code=[0, 4]),
])
@mock.patch('os_brick.executor.Executor._execute')
def test_detach_volume(self, mock_execute):
self.encryptor.detach_volume()
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksClose', self.dev_name,
root_helper=self.root_helper,
attempts=3, run_as_root=True, check_exit_code=[0, 4]),
])
def test_get_mangled_passphrase(self):
# Confirm that a mangled passphrase is provided as per bug#1633518
unmangled_raw_key = bytes(binascii.unhexlify('0725230b'))
symmetric_key = key.SymmetricKey('AES', len(unmangled_raw_key) * 8,
unmangled_raw_key)
unmangled_encoded_key = symmetric_key.get_encoded()
self.assertEqual(self.encryptor._get_mangled_passphrase(
unmangled_encoded_key), '72523b')
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume_unmangle_passphrase(self, mock_execute):
fake_key = '0725230b'
fake_key_mangled = '72523b'
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = \
test_cryptsetup.fake__get_key(None, fake_key)
mock_execute.side_effect = [
putils.ProcessExecutionError(exit_code=2), # luksOpen
mock.DEFAULT, # luksOpen
mock.DEFAULT, # luksClose
mock.DEFAULT, # luksAddKey
mock.DEFAULT, # luksOpen
mock.DEFAULT, # luksClose
mock.DEFAULT, # luksRemoveKey
mock.DEFAULT, # luksOpen
mock.DEFAULT, # ln
]
self.encryptor.attach_volume(None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key_mangled,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('cryptsetup', 'luksClose', self.dev_name,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=[0, 4], attempts=3),
mock.call('cryptsetup', 'luksAddKey', self.dev_path,
'--force-password',
process_input=''.join([fake_key_mangled,
'\n', fake_key,
'\n', fake_key]),
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('cryptsetup', 'luksClose', self.dev_name,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=[0, 4], attempts=3),
mock.call('cryptsetup', 'luksRemoveKey', self.dev_path,
process_input=fake_key_mangled,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
], any_order=False)
self.assertEqual(9, mock_execute.call_count)