encryptors: Mock os_brick.executor correctly
Previously the encryptor test classes would mock the privileged rootwrap executor directly in the setUp method of each class. This will potentially race and is not required when mocking out execute calls within os_brick. This change also introduces a BaseEncryptorTestCase test class to avoid duplicate runs of the various get_encryptors test methods. Change-Id: I170fbcc07672c9c77b613c1eb84bcefbcd42ce77
This commit is contained in:
parent
2a092e28f9
commit
6f1bb596dc
@ -23,12 +23,11 @@ from os_brick.tests import base
|
|||||||
|
|
||||||
|
|
||||||
class VolumeEncryptorTestCase(base.TestCase):
|
class VolumeEncryptorTestCase(base.TestCase):
|
||||||
def _create(self, root_helper, connection_info, keymgr, execute):
|
def _create(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(VolumeEncryptorTestCase, self).setUp()
|
super(VolumeEncryptorTestCase, self).setUp()
|
||||||
self.cmds = []
|
|
||||||
self.connection_info = {
|
self.connection_info = {
|
||||||
"data": {
|
"data": {
|
||||||
"device_path": "/dev/disk/by-path/"
|
"device_path": "/dev/disk/by-path/"
|
||||||
@ -36,36 +35,23 @@ class VolumeEncryptorTestCase(base.TestCase):
|
|||||||
":volume-fake_uuid-lun-1",
|
":volume-fake_uuid-lun-1",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
patcher = mock.patch("os_brick.privileged.rootwrap.execute")
|
|
||||||
self.mock_execute = patcher.start()
|
|
||||||
self.addCleanup(patcher.stop)
|
|
||||||
_hex = codecs.getdecoder("hex_codec")('0' * 32)[0]
|
_hex = codecs.getdecoder("hex_codec")('0' * 32)[0]
|
||||||
self.encryption_key = array.array('B', _hex).tolist()
|
self.encryption_key = array.array('B', _hex).tolist()
|
||||||
self.root_helper = None
|
self.root_helper = None
|
||||||
self.encryptor = self._create(root_helper=self.root_helper,
|
self.keymgr = fake.fake_api()
|
||||||
connection_info=self.connection_info,
|
self.encryptor = self._create()
|
||||||
keymgr=fake.fake_api(),
|
|
||||||
execute=self.mock_execute)
|
|
||||||
|
|
||||||
def assert_exec_has_calls(self, expected_calls, any_order=False):
|
|
||||||
"""Check that the root exec mock has calls, excluding child calls."""
|
class BaseEncryptorTestCase(VolumeEncryptorTestCase):
|
||||||
if any_order:
|
|
||||||
self.assertSetEqual(set(expected_calls),
|
|
||||||
set(self.mock_execute.call_args_list))
|
|
||||||
else:
|
|
||||||
self.assertListEqual(expected_calls,
|
|
||||||
self.mock_execute.call_args_list)
|
|
||||||
|
|
||||||
def test_get_encryptors(self):
|
def test_get_encryptors(self):
|
||||||
root_helper = None
|
|
||||||
|
|
||||||
encryption = {'control_location': 'front-end',
|
encryption = {'control_location': 'front-end',
|
||||||
'provider': 'LuksEncryptor'}
|
'provider': 'LuksEncryptor'}
|
||||||
encryptor = encryptors.get_volume_encryptor(
|
encryptor = encryptors.get_volume_encryptor(
|
||||||
root_helper=root_helper,
|
root_helper=self.root_helper,
|
||||||
connection_info=self.connection_info,
|
connection_info=self.connection_info,
|
||||||
keymgr=fake.fake_api(),
|
keymgr=self.keymgr,
|
||||||
execute=self.mock_execute,
|
|
||||||
**encryption)
|
**encryption)
|
||||||
|
|
||||||
self.assertIsInstance(encryptor,
|
self.assertIsInstance(encryptor,
|
||||||
@ -75,10 +61,9 @@ class VolumeEncryptorTestCase(base.TestCase):
|
|||||||
encryption = {'control_location': 'front-end',
|
encryption = {'control_location': 'front-end',
|
||||||
'provider': 'CryptsetupEncryptor'}
|
'provider': 'CryptsetupEncryptor'}
|
||||||
encryptor = encryptors.get_volume_encryptor(
|
encryptor = encryptors.get_volume_encryptor(
|
||||||
root_helper=root_helper,
|
root_helper=self.root_helper,
|
||||||
connection_info=self.connection_info,
|
connection_info=self.connection_info,
|
||||||
keymgr=fake.fake_api(),
|
keymgr=self.keymgr,
|
||||||
execute=self.mock_execute,
|
|
||||||
**encryption)
|
**encryption)
|
||||||
|
|
||||||
self.assertIsInstance(encryptor,
|
self.assertIsInstance(encryptor,
|
||||||
@ -89,25 +74,23 @@ class VolumeEncryptorTestCase(base.TestCase):
|
|||||||
encryption = {'control_location': 'front-end',
|
encryption = {'control_location': 'front-end',
|
||||||
'provider': 'NoOpEncryptor'}
|
'provider': 'NoOpEncryptor'}
|
||||||
encryptor = encryptors.get_volume_encryptor(
|
encryptor = encryptors.get_volume_encryptor(
|
||||||
root_helper=root_helper,
|
root_helper=self.root_helper,
|
||||||
connection_info=self.connection_info,
|
connection_info=self.connection_info,
|
||||||
keymgr=fake.fake_api(),
|
keymgr=self.keymgr,
|
||||||
execute=self.mock_execute,
|
|
||||||
**encryption)
|
**encryption)
|
||||||
|
|
||||||
self.assertIsInstance(encryptor,
|
self.assertIsInstance(encryptor,
|
||||||
encryptors.nop.NoOpEncryptor,
|
encryptors.nop.NoOpEncryptor,
|
||||||
"encryptor is not an instance of NoOpEncryptor")
|
"encryptor is not an instance of NoOpEncryptor")
|
||||||
|
|
||||||
def test_get_error_encryptos(self):
|
def test_get_error_encryptors(self):
|
||||||
encryption = {'control_location': 'front-end',
|
encryption = {'control_location': 'front-end',
|
||||||
'provider': 'ErrorEncryptor'}
|
'provider': 'ErrorEncryptor'}
|
||||||
self.assertRaises(ValueError,
|
self.assertRaises(ValueError,
|
||||||
encryptors.get_volume_encryptor,
|
encryptors.get_volume_encryptor,
|
||||||
root_helper=None,
|
root_helper=self.root_helper,
|
||||||
connection_info=self.connection_info,
|
connection_info=self.connection_info,
|
||||||
keymgr=fake.fake_api(),
|
keymgr=self.keymgr,
|
||||||
execute=self.mock_execute,
|
|
||||||
**encryption)
|
**encryption)
|
||||||
|
|
||||||
@mock.patch('os_brick.encryptors.LOG')
|
@mock.patch('os_brick.encryptors.LOG')
|
||||||
@ -117,10 +100,9 @@ class VolumeEncryptorTestCase(base.TestCase):
|
|||||||
provider = 'TestEncryptor'
|
provider = 'TestEncryptor'
|
||||||
try:
|
try:
|
||||||
encryptors.get_volume_encryptor(
|
encryptors.get_volume_encryptor(
|
||||||
root_helper=None,
|
root_helper=self.root_helper,
|
||||||
connection_info=self.connection_info,
|
connection_info=self.connection_info,
|
||||||
keymgr=fake.fake_api(),
|
keymgr=self.keymgr,
|
||||||
execute=self.mock_execute,
|
|
||||||
**encryption)
|
**encryption)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.error.assert_called_once_with("Error instantiating "
|
log.error.assert_called_once_with("Error instantiating "
|
||||||
|
@ -33,16 +33,14 @@ def fake__get_key(context):
|
|||||||
return symmetric_key
|
return symmetric_key
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('os_brick.executor.encodeutils.safe_decode',
|
|
||||||
lambda x, errors=None: x)
|
|
||||||
class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
|
class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
|
||||||
|
|
||||||
@mock.patch('os.path.exists', return_value=False)
|
@mock.patch('os.path.exists', return_value=False)
|
||||||
def _create(self, mock_exists, root_helper,
|
def _create(self, mock_exists):
|
||||||
connection_info, keymgr, execute):
|
return cryptsetup.CryptsetupEncryptor(
|
||||||
return cryptsetup.CryptsetupEncryptor(root_helper=root_helper,
|
root_helper=self.root_helper,
|
||||||
connection_info=connection_info,
|
connection_info=self.connection_info,
|
||||||
keymgr=keymgr,
|
keymgr=self.keymgr)
|
||||||
execute=execute)
|
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(CryptsetupEncryptorTestCase, self).setUp()
|
super(CryptsetupEncryptorTestCase, self).setUp()
|
||||||
@ -52,10 +50,11 @@ class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
|
|||||||
|
|
||||||
self.symlink_path = self.dev_path
|
self.symlink_path = self.dev_path
|
||||||
|
|
||||||
def test__open_volume(self):
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
|
def test__open_volume(self, mock_execute):
|
||||||
self.encryptor._open_volume("passphrase")
|
self.encryptor._open_volume("passphrase")
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', 'create', '--key-file=-', self.dev_name,
|
mock.call('cryptsetup', 'create', '--key-file=-', self.dev_name,
|
||||||
self.dev_path, process_input='passphrase',
|
self.dev_path, process_input='passphrase',
|
||||||
run_as_root=True,
|
run_as_root=True,
|
||||||
@ -63,13 +62,14 @@ class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
|
|||||||
check_exit_code=True),
|
check_exit_code=True),
|
||||||
])
|
])
|
||||||
|
|
||||||
def test_attach_volume(self):
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
|
def test_attach_volume(self, mock_execute):
|
||||||
self.encryptor._get_key = mock.MagicMock()
|
self.encryptor._get_key = mock.MagicMock()
|
||||||
self.encryptor._get_key.return_value = fake__get_key(None)
|
self.encryptor._get_key.return_value = fake__get_key(None)
|
||||||
|
|
||||||
self.encryptor.attach_volume(None)
|
self.encryptor.attach_volume(None)
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', 'create', '--key-file=-', self.dev_name,
|
mock.call('cryptsetup', 'create', '--key-file=-', self.dev_name,
|
||||||
self.dev_path, process_input='0' * 32,
|
self.dev_path, process_input='0' * 32,
|
||||||
root_helper=self.root_helper,
|
root_helper=self.root_helper,
|
||||||
@ -80,19 +80,21 @@ class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
|
|||||||
run_as_root=True, check_exit_code=True),
|
run_as_root=True, check_exit_code=True),
|
||||||
])
|
])
|
||||||
|
|
||||||
def test__close_volume(self):
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
|
def test__close_volume(self, mock_execute):
|
||||||
self.encryptor.detach_volume()
|
self.encryptor.detach_volume()
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', 'remove', self.dev_name,
|
mock.call('cryptsetup', 'remove', self.dev_name,
|
||||||
root_helper=self.root_helper,
|
root_helper=self.root_helper,
|
||||||
run_as_root=True, check_exit_code=True),
|
run_as_root=True, check_exit_code=True),
|
||||||
])
|
])
|
||||||
|
|
||||||
def test_detach_volume(self):
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
|
def test_detach_volume(self, mock_execute):
|
||||||
self.encryptor.detach_volume()
|
self.encryptor.detach_volume()
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', 'remove', self.dev_name,
|
mock.call('cryptsetup', 'remove', self.dev_name,
|
||||||
root_helper=self.root_helper,
|
root_helper=self.root_helper,
|
||||||
run_as_root=True, check_exit_code=True),
|
run_as_root=True, check_exit_code=True),
|
||||||
@ -111,26 +113,27 @@ class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
|
|||||||
keymgr=fake.fake_api())
|
keymgr=fake.fake_api())
|
||||||
self.assertIn(type, six.text_type(exc))
|
self.assertIn(type, six.text_type(exc))
|
||||||
|
|
||||||
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
@mock.patch('os.path.exists', return_value=True)
|
@mock.patch('os.path.exists', return_value=True)
|
||||||
def test_init_volume_encryption_with_old_name(self,
|
def test_init_volume_encryption_with_old_name(self, mock_exists,
|
||||||
mock_exists):
|
mock_execute):
|
||||||
# If an old name crypt device exists, dev_path should be the old name.
|
# If an old name crypt device exists, dev_path should be the old name.
|
||||||
old_dev_name = self.dev_path.split('/')[-1]
|
old_dev_name = self.dev_path.split('/')[-1]
|
||||||
encryptor = cryptsetup.CryptsetupEncryptor(
|
encryptor = cryptsetup.CryptsetupEncryptor(
|
||||||
root_helper=self.root_helper,
|
root_helper=self.root_helper,
|
||||||
connection_info=self.connection_info,
|
connection_info=self.connection_info,
|
||||||
keymgr=fake.fake_api(),
|
keymgr=self.keymgr)
|
||||||
execute=self.mock_execute)
|
|
||||||
self.assertFalse(encryptor.dev_name.startswith('crypt-'))
|
self.assertFalse(encryptor.dev_name.startswith('crypt-'))
|
||||||
self.assertEqual(old_dev_name, encryptor.dev_name)
|
self.assertEqual(old_dev_name, encryptor.dev_name)
|
||||||
self.assertEqual(self.dev_path, encryptor.dev_path)
|
self.assertEqual(self.dev_path, encryptor.dev_path)
|
||||||
self.assertEqual(self.symlink_path, encryptor.symlink_path)
|
self.assertEqual(self.symlink_path, encryptor.symlink_path)
|
||||||
mock_exists.assert_called_once_with('/dev/mapper/%s' % old_dev_name)
|
mock_exists.assert_called_once_with('/dev/mapper/%s' % old_dev_name)
|
||||||
self.mock_execute.assert_called_once_with(
|
mock_execute.assert_called_once_with(
|
||||||
'cryptsetup', 'status', old_dev_name, run_as_root=True)
|
'cryptsetup', 'status', old_dev_name, run_as_root=True)
|
||||||
|
|
||||||
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
@mock.patch('os.path.exists', side_effect=[False, True])
|
@mock.patch('os.path.exists', side_effect=[False, True])
|
||||||
def test_init_volume_encryption_with_wwn(self, mock_exists):
|
def test_init_volume_encryption_with_wwn(self, mock_exists, mock_execute):
|
||||||
# If an wwn name crypt device exists, dev_path should be based on wwn.
|
# If an wwn name crypt device exists, dev_path should be based on wwn.
|
||||||
old_dev_name = self.dev_path.split('/')[-1]
|
old_dev_name = self.dev_path.split('/')[-1]
|
||||||
wwn = 'fake_wwn'
|
wwn = 'fake_wwn'
|
||||||
@ -140,7 +143,7 @@ class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
|
|||||||
root_helper=self.root_helper,
|
root_helper=self.root_helper,
|
||||||
connection_info=connection_info,
|
connection_info=connection_info,
|
||||||
keymgr=fake.fake_api(),
|
keymgr=fake.fake_api(),
|
||||||
execute=self.mock_execute)
|
execute=mock_execute)
|
||||||
self.assertFalse(encryptor.dev_name.startswith('crypt-'))
|
self.assertFalse(encryptor.dev_name.startswith('crypt-'))
|
||||||
self.assertEqual(wwn, encryptor.dev_name)
|
self.assertEqual(wwn, encryptor.dev_name)
|
||||||
self.assertEqual(self.dev_path, encryptor.dev_path)
|
self.assertEqual(self.dev_path, encryptor.dev_path)
|
||||||
@ -148,5 +151,5 @@ class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
|
|||||||
mock_exists.assert_has_calls([
|
mock_exists.assert_has_calls([
|
||||||
mock.call('/dev/mapper/%s' % old_dev_name),
|
mock.call('/dev/mapper/%s' % old_dev_name),
|
||||||
mock.call('/dev/mapper/%s' % wwn)])
|
mock.call('/dev/mapper/%s' % wwn)])
|
||||||
self.mock_execute.assert_called_once_with(
|
mock_execute.assert_called_once_with(
|
||||||
'cryptsetup', 'status', wwn, run_as_root=True)
|
'cryptsetup', 'status', wwn, run_as_root=True)
|
||||||
|
@ -21,34 +21,32 @@ from os_brick.tests.encryptors import test_cryptsetup
|
|||||||
from oslo_concurrency import processutils as putils
|
from oslo_concurrency import processutils as putils
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('os_brick.executor.encodeutils.safe_decode',
|
|
||||||
lambda x, errors=None: x)
|
|
||||||
class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
|
class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
|
||||||
def _create(self, root_helper, connection_info, keymgr, execute):
|
def _create(self):
|
||||||
return luks.LuksEncryptor(root_helper=root_helper,
|
return luks.LuksEncryptor(root_helper=self.root_helper,
|
||||||
connection_info=connection_info,
|
connection_info=self.connection_info,
|
||||||
keymgr=keymgr,
|
keymgr=self.keymgr)
|
||||||
execute=execute)
|
|
||||||
|
|
||||||
def test_is_luks(self):
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
luks.is_luks(self.root_helper, self.dev_path)
|
def test_is_luks(self, mock_execute):
|
||||||
|
luks.is_luks(self.root_helper, self.dev_path, execute=mock_execute)
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
|
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
|
||||||
run_as_root=True, root_helper=self.root_helper,
|
run_as_root=True, root_helper=self.root_helper,
|
||||||
check_exit_code=True),
|
check_exit_code=True),
|
||||||
], any_order=False)
|
], any_order=False)
|
||||||
|
|
||||||
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
@mock.patch('os_brick.encryptors.luks.LOG')
|
@mock.patch('os_brick.encryptors.luks.LOG')
|
||||||
def test_is_luks_with_error(self, mock_log):
|
def test_is_luks_with_error(self, mock_log, mock_execute):
|
||||||
error_msg = "Device %s is not a valid LUKS device." % self.dev_path
|
error_msg = "Device %s is not a valid LUKS device." % self.dev_path
|
||||||
self.mock_execute.side_effect = \
|
mock_execute.side_effect = putils.ProcessExecutionError(
|
||||||
putils.ProcessExecutionError(exit_code=1,
|
exit_code=1, stderr=error_msg)
|
||||||
stderr=error_msg)
|
|
||||||
|
|
||||||
luks.is_luks(self.root_helper, self.dev_path)
|
luks.is_luks(self.root_helper, self.dev_path, execute=mock_execute)
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
|
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
|
||||||
run_as_root=True, root_helper=self.root_helper,
|
run_as_root=True, root_helper=self.root_helper,
|
||||||
check_exit_code=True),
|
check_exit_code=True),
|
||||||
@ -56,19 +54,11 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
|
|||||||
|
|
||||||
self.assertEqual(1, mock_log.warning.call_count) # warning logged
|
self.assertEqual(1, mock_log.warning.call_count) # warning logged
|
||||||
|
|
||||||
def test_is_luks_with_execute(self):
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
mock_execute = mock.Mock()
|
def test__format_volume(self, mock_execute):
|
||||||
luks.is_luks(self.root_helper, self.dev_path, execute=mock_execute)
|
|
||||||
self.assertListEqual(
|
|
||||||
[mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
|
|
||||||
run_as_root=True, root_helper=self.root_helper,
|
|
||||||
check_exit_code=True)],
|
|
||||||
mock_execute.call_args_list)
|
|
||||||
|
|
||||||
def test__format_volume(self):
|
|
||||||
self.encryptor._format_volume("passphrase")
|
self.encryptor._format_volume("passphrase")
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', '--batch-mode', 'luksFormat',
|
mock.call('cryptsetup', '--batch-mode', 'luksFormat',
|
||||||
'--key-file=-', self.dev_path,
|
'--key-file=-', self.dev_path,
|
||||||
process_input='passphrase',
|
process_input='passphrase',
|
||||||
@ -76,24 +66,26 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
|
|||||||
run_as_root=True, check_exit_code=True, attempts=3),
|
run_as_root=True, check_exit_code=True, attempts=3),
|
||||||
])
|
])
|
||||||
|
|
||||||
def test__open_volume(self):
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
|
def test__open_volume(self, mock_execute):
|
||||||
self.encryptor._open_volume("passphrase")
|
self.encryptor._open_volume("passphrase")
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
|
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
|
||||||
self.dev_name, process_input='passphrase',
|
self.dev_name, process_input='passphrase',
|
||||||
root_helper=self.root_helper,
|
root_helper=self.root_helper,
|
||||||
run_as_root=True, check_exit_code=True),
|
run_as_root=True, check_exit_code=True),
|
||||||
])
|
])
|
||||||
|
|
||||||
def test_attach_volume(self):
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
|
def test_attach_volume(self, mock_execute):
|
||||||
self.encryptor._get_key = mock.MagicMock()
|
self.encryptor._get_key = mock.MagicMock()
|
||||||
self.encryptor._get_key.return_value = (
|
self.encryptor._get_key.return_value = (
|
||||||
test_cryptsetup.fake__get_key(None))
|
test_cryptsetup.fake__get_key(None))
|
||||||
|
|
||||||
self.encryptor.attach_volume(None)
|
self.encryptor.attach_volume(None)
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
|
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
|
||||||
self.dev_name, process_input='0' * 32,
|
self.dev_name, process_input='0' * 32,
|
||||||
root_helper=self.root_helper,
|
root_helper=self.root_helper,
|
||||||
@ -104,12 +96,13 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
|
|||||||
run_as_root=True, check_exit_code=True),
|
run_as_root=True, check_exit_code=True),
|
||||||
])
|
])
|
||||||
|
|
||||||
def test_attach_volume_not_formatted(self):
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
|
def test_attach_volume_not_formatted(self, mock_execute):
|
||||||
self.encryptor._get_key = mock.MagicMock()
|
self.encryptor._get_key = mock.MagicMock()
|
||||||
self.encryptor._get_key.return_value = (
|
self.encryptor._get_key.return_value = (
|
||||||
test_cryptsetup.fake__get_key(None))
|
test_cryptsetup.fake__get_key(None))
|
||||||
|
|
||||||
self.mock_execute.side_effect = [
|
mock_execute.side_effect = [
|
||||||
putils.ProcessExecutionError(exit_code=1), # luksOpen
|
putils.ProcessExecutionError(exit_code=1), # luksOpen
|
||||||
putils.ProcessExecutionError(exit_code=1), # isLuks
|
putils.ProcessExecutionError(exit_code=1), # isLuks
|
||||||
mock.DEFAULT, # luksFormat
|
mock.DEFAULT, # luksFormat
|
||||||
@ -119,7 +112,7 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
|
|||||||
|
|
||||||
self.encryptor.attach_volume(None)
|
self.encryptor.attach_volume(None)
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
|
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
|
||||||
self.dev_name, process_input='0' * 32,
|
self.dev_name, process_input='0' * 32,
|
||||||
root_helper=self.root_helper,
|
root_helper=self.root_helper,
|
||||||
@ -141,12 +134,13 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
|
|||||||
run_as_root=True, check_exit_code=True),
|
run_as_root=True, check_exit_code=True),
|
||||||
], any_order=False)
|
], any_order=False)
|
||||||
|
|
||||||
def test_attach_volume_fail(self):
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
|
def test_attach_volume_fail(self, mock_execute):
|
||||||
self.encryptor._get_key = mock.MagicMock()
|
self.encryptor._get_key = mock.MagicMock()
|
||||||
self.encryptor._get_key.return_value = (
|
self.encryptor._get_key.return_value = (
|
||||||
test_cryptsetup.fake__get_key(None))
|
test_cryptsetup.fake__get_key(None))
|
||||||
|
|
||||||
self.mock_execute.side_effect = [
|
mock_execute.side_effect = [
|
||||||
putils.ProcessExecutionError(exit_code=1), # luksOpen
|
putils.ProcessExecutionError(exit_code=1), # luksOpen
|
||||||
mock.DEFAULT, # isLuks
|
mock.DEFAULT, # isLuks
|
||||||
]
|
]
|
||||||
@ -154,7 +148,7 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
|
|||||||
self.assertRaises(putils.ProcessExecutionError,
|
self.assertRaises(putils.ProcessExecutionError,
|
||||||
self.encryptor.attach_volume, None)
|
self.encryptor.attach_volume, None)
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
|
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
|
||||||
self.dev_name, process_input='0' * 32,
|
self.dev_name, process_input='0' * 32,
|
||||||
root_helper=self.root_helper,
|
root_helper=self.root_helper,
|
||||||
@ -164,19 +158,21 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
|
|||||||
run_as_root=True, check_exit_code=True),
|
run_as_root=True, check_exit_code=True),
|
||||||
], any_order=False)
|
], any_order=False)
|
||||||
|
|
||||||
def test__close_volume(self):
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
|
def test__close_volume(self, mock_execute):
|
||||||
self.encryptor.detach_volume()
|
self.encryptor.detach_volume()
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', 'luksClose', self.dev_name,
|
mock.call('cryptsetup', 'luksClose', self.dev_name,
|
||||||
root_helper=self.root_helper,
|
root_helper=self.root_helper,
|
||||||
attempts=3, run_as_root=True, check_exit_code=True),
|
attempts=3, run_as_root=True, check_exit_code=True),
|
||||||
])
|
])
|
||||||
|
|
||||||
def test_detach_volume(self):
|
@mock.patch('os_brick.executor.Executor._execute')
|
||||||
|
def test_detach_volume(self, mock_execute):
|
||||||
self.encryptor.detach_volume()
|
self.encryptor.detach_volume()
|
||||||
|
|
||||||
self.assert_exec_has_calls([
|
mock_execute.assert_has_calls([
|
||||||
mock.call('cryptsetup', 'luksClose', self.dev_name,
|
mock.call('cryptsetup', 'luksClose', self.dev_name,
|
||||||
root_helper=self.root_helper,
|
root_helper=self.root_helper,
|
||||||
attempts=3, run_as_root=True, check_exit_code=True),
|
attempts=3, run_as_root=True, check_exit_code=True),
|
||||||
|
@ -18,11 +18,10 @@ from os_brick.tests.encryptors import test_base
|
|||||||
|
|
||||||
|
|
||||||
class NoOpEncryptorTestCase(test_base.VolumeEncryptorTestCase):
|
class NoOpEncryptorTestCase(test_base.VolumeEncryptorTestCase):
|
||||||
def _create(self, root_helper, connection_info, keymgr, execute):
|
def _create(self):
|
||||||
return nop.NoOpEncryptor(root_helper=root_helper,
|
return nop.NoOpEncryptor(root_helper=self.root_helper,
|
||||||
connection_info=connection_info,
|
connection_info=self.connection_info,
|
||||||
keymgr=keymgr,
|
keymgr=self.keymgr)
|
||||||
execute=execute)
|
|
||||||
|
|
||||||
def test_attach_volume(self):
|
def test_attach_volume(self):
|
||||||
self.encryptor.attach_volume(None)
|
self.encryptor.attach_volume(None)
|
||||||
|
Loading…
Reference in New Issue
Block a user