Merge "encryptors: Workaround mangled passphrases"

This commit is contained in:
Jenkins 2016-11-30 11:49:44 +00:00 committed by Gerrit Code Review
commit b567298c96
5 changed files with 196 additions and 21 deletions

View File

@ -14,12 +14,13 @@
# under the License.
import array
import binascii
import os
from os_brick.encryptors import base
from os_brick import exception
from os_brick.i18n import _LW
from os_brick.i18n import _LW, _LI
from oslo_concurrency import processutils
from oslo_log import log as logging
@ -127,6 +128,17 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
check_exit_code=True, run_as_root=True,
root_helper=self._root_helper)
def _get_mangled_passphrase(self, key):
"""Convert the raw key into a list of unsigned int's and then a string
"""
# NOTE(lyarwood): This replicates the methods used prior to Newton to
# first encode the passphrase as a list of unsigned int's before
# decoding back into a string. This method strips any leading 0's
# of the resulting hex digit pairs, resulting in a different
# passphrase being returned.
encoded_key = array.array('B', key).tolist()
return ''.join(hex(x).replace('0x', '') for x in encoded_key)
def attach_volume(self, context, **kwargs):
"""Shadows the device and passes an unencrypted version to the
instance.
@ -139,7 +151,16 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
key = self._get_key(context).get_encoded()
passphrase = self._get_passphrase(key)
self._open_volume(passphrase, **kwargs)
try:
self._open_volume(passphrase, **kwargs)
except processutils.ProcessExecutionError as e:
if e.exit_code == 2:
# NOTE(lyarwood): Workaround bug#1633518 by attempting to use
# a mangled passphrase to open the device..
LOG.info(_LI("Unable to open %s with the current passphrase, "
"attempting to use a mangled passphrase to open "
"the volume."), self.dev_path)
self._open_volume(self._get_mangled_passphrase(key), **kwargs)
# modify the original symbolic link to refer to the decrypted device
self._execute('ln', '--symbolic', '--force',

View File

@ -100,6 +100,44 @@ class LuksEncryptor(cryptsetup.CryptsetupEncryptor):
run_as_root=True, check_exit_code=True,
root_helper=self._root_helper)
def _unmangle_volume(self, key, passphrase, **kwargs):
"""Workaround for bug#1633518
First identify if a mangled passphrase is used and if found then
replace with the correct unmangled version of the passphrase.
"""
mangled_passphrase = self._get_mangled_passphrase(key)
self._open_volume(mangled_passphrase, **kwargs)
self._close_volume(**kwargs)
LOG.debug("%s correctly opened with a mangled passphrase, replacing "
"this with the original passphrase", self.dev_path)
# NOTE(lyarwood): Now that we are sure that the mangled passphrase is
# used attempt to add the correct passphrase before removing the
# mangled version from the volume.
# luksAddKey currently prompts for the following input :
# Enter any existing passphrase:
# Enter new passphrase for key slot:
# Verify passphrase:
self._execute('cryptsetup', 'luksAddKey', self.dev_path,
process_input=''.join([mangled_passphrase, '\n',
passphrase, '\n', passphrase]),
run_as_root=True, check_exit_code=True,
root_helper=self._root_helper)
# Verify that we can open the volume with the current passphrase
# before removing the mangled passphrase.
self._open_volume(passphrase, **kwargs)
self._close_volume(**kwargs)
# luksRemoveKey only prompts for the key to remove.
self._execute('cryptsetup', 'luksRemoveKey', self.dev_path,
process_input=mangled_passphrase,
run_as_root=True, check_exit_code=True,
root_helper=self._root_helper)
LOG.debug("%s mangled passphrase successfully replaced", self.dev_path)
def attach_volume(self, context, **kwargs):
"""Shadows the device and passes an unencrypted version to the
instance.
@ -125,6 +163,16 @@ class LuksEncryptor(cryptsetup.CryptsetupEncryptor):
self.dev_path)
self._format_volume(passphrase, **kwargs)
self._open_volume(passphrase, **kwargs)
elif e.exit_code == 2:
# NOTE(lyarwood): Workaround bug#1633518 by replacing any
# mangled passphrases that are found on the volume.
# TODO(lyarwood): Remove workaround during R.
LOG.warning(_LW("%s is not usable with the current "
"passphrase, attempting to use a mangled "
"passphrase to open the volume."),
self.dev_path)
self._unmangle_volume(key, passphrase, **kwargs)
self._open_volume(passphrase, **kwargs)
else:
raise

View File

@ -13,9 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import array
from castellan.tests.unit.key_manager import fake
import codecs
import mock
from os_brick import encryptors
@ -35,8 +33,6 @@ class VolumeEncryptorTestCase(base.TestCase):
":volume-fake_uuid-lun-1",
},
}
_hex = codecs.getdecoder("hex_codec")('0' * 32)[0]
self.encryption_key = array.array('B', _hex).tolist()
self.root_helper = None
self.keymgr = fake.fake_api()
self.encryptor = self._create()

View File

@ -18,17 +18,18 @@ import binascii
import copy
import mock
import six
import uuid
from castellan.common.objects import symmetric_key as key
from castellan.tests.unit.key_manager import fake
from os_brick.encryptors import cryptsetup
from os_brick import exception
from os_brick.tests.encryptors import test_base
from oslo_concurrency import processutils as putils
def fake__get_key(context):
raw = bytes(binascii.unhexlify('0' * 32))
def fake__get_key(context, passphrase):
raw = bytes(binascii.unhexlify(passphrase))
symmetric_key = key.SymmetricKey('AES', len(raw) * 8, raw)
return symmetric_key
@ -38,8 +39,8 @@ class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
@mock.patch('os.path.exists', return_value=False)
def _create(self, mock_exists):
return cryptsetup.CryptsetupEncryptor(
root_helper=self.root_helper,
connection_info=self.connection_info,
root_helper=self.root_helper,
keymgr=self.keymgr)
def setUp(self):
@ -64,14 +65,15 @@ class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume(self, mock_execute):
fake_key = uuid.uuid4().hex
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = fake__get_key(None)
self.encryptor._get_key.return_value = fake__get_key(None, fake_key)
self.encryptor.attach_volume(None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'create', '--key-file=-', self.dev_name,
self.dev_path, process_input='0' * 32,
self.dev_path, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
@ -153,3 +155,34 @@ class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
mock.call('/dev/mapper/%s' % wwn)])
mock_execute.assert_called_once_with(
'cryptsetup', 'status', wwn, run_as_root=True)
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume_unmangle_passphrase(self, mock_execute):
fake_key = '0725230b'
fake_key_mangled = '72523b'
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = fake__get_key(None, fake_key)
mock_execute.side_effect = [
putils.ProcessExecutionError(exit_code=2), # luksOpen
mock.DEFAULT,
mock.DEFAULT,
]
self.encryptor.attach_volume(None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'create', '--key-file=-', self.dev_name,
self.dev_path, process_input=fake_key,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('cryptsetup', 'create', '--key-file=-', self.dev_name,
self.dev_path, process_input=fake_key_mangled,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
])
self.assertEqual(3, mock_execute.call_count)

View File

@ -13,9 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import mock
import uuid
from castellan.common.objects import symmetric_key as key
from os_brick.encryptors import luks
from os_brick.tests.encryptors import test_cryptsetup
from oslo_concurrency import processutils as putils
@ -79,15 +82,16 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume(self, mock_execute):
fake_key = uuid.uuid4().hex
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = (
test_cryptsetup.fake__get_key(None))
test_cryptsetup.fake__get_key(None, fake_key))
self.encryptor.attach_volume(None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input='0' * 32,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
@ -98,9 +102,10 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume_not_formatted(self, mock_execute):
fake_key = uuid.uuid4().hex
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = (
test_cryptsetup.fake__get_key(None))
test_cryptsetup.fake__get_key(None, fake_key))
mock_execute.side_effect = [
putils.ProcessExecutionError(exit_code=1), # luksOpen
@ -114,18 +119,18 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input='0' * 32,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', '--batch-mode', 'luksFormat',
'--key-file=-', self.dev_path, process_input='0' * 32,
'--key-file=-', self.dev_path, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True, attempts=3),
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input='0' * 32,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
@ -136,9 +141,10 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume_fail(self, mock_execute):
fake_key = uuid.uuid4().hex
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = (
test_cryptsetup.fake__get_key(None))
test_cryptsetup.fake__get_key(None, fake_key))
mock_execute.side_effect = [
putils.ProcessExecutionError(exit_code=1), # luksOpen
@ -150,7 +156,7 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input='0' * 32,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
@ -177,3 +183,74 @@ class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
root_helper=self.root_helper,
attempts=3, run_as_root=True, check_exit_code=True),
])
def test_get_mangled_passphrase(self):
# Confirm that a mangled passphrase is provided as per bug#1633518
unmangled_raw_key = bytes(binascii.unhexlify('0725230b'))
symmetric_key = key.SymmetricKey('AES', len(unmangled_raw_key) * 8,
unmangled_raw_key)
unmangled_encoded_key = symmetric_key.get_encoded()
self.assertEqual(self.encryptor._get_mangled_passphrase(
unmangled_encoded_key), '72523b')
@mock.patch('os_brick.executor.Executor._execute')
def test_attach_volume_unmangle_passphrase(self, mock_execute):
fake_key = '0725230b'
fake_key_mangled = '72523b'
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = \
test_cryptsetup.fake__get_key(None, fake_key)
mock_execute.side_effect = [
putils.ProcessExecutionError(exit_code=2), # luksOpen
mock.DEFAULT, # luksOpen
mock.DEFAULT, # luksClose
mock.DEFAULT, # luksAddKey
mock.DEFAULT, # luksOpen
mock.DEFAULT, # luksClose
mock.DEFAULT, # luksRemoveKey
mock.DEFAULT, # luksOpen
mock.DEFAULT, # ln
]
self.encryptor.attach_volume(None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key_mangled,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('cryptsetup', 'luksClose', self.dev_name,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True, attempts=3),
mock.call('cryptsetup', 'luksAddKey', self.dev_path,
process_input=''.join([fake_key_mangled,
'\n', fake_key,
'\n', fake_key]),
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('cryptsetup', 'luksClose', self.dev_name,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True, attempts=3),
mock.call('cryptsetup', 'luksRemoveKey', self.dev_path,
process_input=fake_key_mangled,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input=fake_key,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
mock.call('ln', '--symbolic', '--force',
'/dev/mapper/%s' % self.dev_name, self.symlink_path,
root_helper=self.root_helper, run_as_root=True,
check_exit_code=True),
], any_order=False)
self.assertEqual(9, mock_execute.call_count)