Merge "Abort volume creation when encryption spec is invalid"

This commit is contained in:
Zuul 2021-07-09 15:00:27 +00:00 committed by Gerrit Code Review
commit 1b5aad85d0
4 changed files with 54 additions and 7 deletions

View File

@ -1035,7 +1035,12 @@ def decode_cipher(cipher_spec: str, key_size: int) -> Dict[str, str]:
kernel source tree. Cinder does not support the [:keycount] or
[:ivopts] options.
"""
cipher_alg, cipher_mode, ivgen_alg = cipher_spec.split('-')
try:
cipher_alg, cipher_mode, ivgen_alg = cipher_spec.split('-')
except ValueError:
raise exception.InvalidVolumeType(
reason="Invalid cipher field in encryption type")
cipher_alg = cipher_alg + '-' + str(key_size)
return {'cipher_alg': cipher_alg,

View File

@ -2163,3 +2163,9 @@ class TestImageUtils(test.TestCase):
'ivgen_alg': 'essiv'}
result = image_utils.decode_cipher('aes-xts-essiv', 256)
self.assertEqual(expected, result)
def test_decode_cipher_invalid(self):
self.assertRaises(exception.InvalidVolumeType,
image_utils.decode_cipher,
'aes',
256)

View File

@ -983,15 +983,15 @@ class VolumeUtilsTestCase(test.TestCase):
def test_create_encryption_key_encrypted(self, create_key,
get_volume_type_encryption,
is_encryption):
enc_key = {'cipher': 'aes-xts-plain64',
'key_size': 256,
'provider': 'p1',
'control_location': 'front-end',
'encryption_id': 'uuid1'}
enc_spec = {'cipher': 'aes-xts-plain64',
'key_size': 256,
'provider': 'p1',
'control_location': 'front-end',
'encryption_id': 'uuid1'}
ctxt = context.get_admin_context()
type_ref1 = volume_types.create(ctxt, "type1")
encryption = db.volume_type_encryption_create(
ctxt, type_ref1['id'], enc_key)
ctxt, type_ref1['id'], enc_spec)
get_volume_type_encryption.return_value = encryption
CONF.set_override(
'backend',
@ -1010,6 +1010,39 @@ class VolumeUtilsTestCase(test.TestCase):
algorithm='aes',
length=256)
@mock.patch('cinder.volume.volume_types.is_encrypted', return_value=True)
@mock.patch('cinder.volume.volume_types.get_volume_type_encryption')
@mock.patch('cinder.keymgr.conf_key_mgr.ConfKeyManager.create_key')
def test_create_encryption_key_invalid_spec(self, create_key,
get_volume_type_encryption,
is_encryption):
enc_spec = {'cipher': None,
'key_size': 256,
'provider': 'p1',
'control_location': 'front-end',
'encryption_id': 'uuid1'}
ctxt = context.get_admin_context()
type_ref1 = volume_types.create(ctxt, "type1")
encryption = db.volume_type_encryption_create(
ctxt, type_ref1['id'], enc_spec)
get_volume_type_encryption.return_value = encryption
CONF.set_override(
'backend',
'cinder.keymgr.conf_key_mgr.ConfKeyManager',
group='key_manager')
km = key_manager.API()
self.assertRaises(exception.Invalid,
volume_utils.create_encryption_key,
ctxt,
km,
fake.VOLUME_TYPE_ID)
is_encryption.assert_called_once_with(ctxt,
fake.VOLUME_TYPE_ID)
get_volume_type_encryption.assert_called_once_with(
ctxt,
fake.VOLUME_TYPE_ID)
create_key.assert_not_called()
@ddt.data('<is> True', '<is> true', '<is> yes')
def test_is_replicated_spec_true(self, enabled):
res = volume_utils.is_replicated_spec({'replication_enabled': enabled})

View File

@ -1010,6 +1010,9 @@ def create_encryption_key(context: context.RequestContext,
cipher = volume_type_encryption.cipher
length = volume_type_encryption.key_size
algorithm = cipher.split('-')[0] if cipher else None
if algorithm is None:
raise exception.InvalidVolumeType(
message="Invalid encryption spec")
try:
encryption_key_id = key_manager.create_key(
context,