Merge "Specify key algorithm and size for create_key"

This commit is contained in:
Jenkins 2016-08-18 06:49:52 +00:00 committed by Gerrit Code Review
commit 3ad7384913
3 changed files with 78 additions and 8 deletions

View File

@ -52,17 +52,19 @@ class MockKeyManager(key_mgr.KeyManager):
def __init__(self):
self.keys = {}
def _generate_hex_key(self, **kwargs):
key_length = kwargs.get('key_length', 256)
def _generate_hex_key(self, length):
if not length:
length = 256
# hex digit => 4 bits
hex_encoded = utils.generate_password(length=key_length // 4,
hex_encoded = utils.generate_password(length=length // 4,
symbolgroups='0123456789ABCDEF')
return hex_encoded
def _generate_key(self, **kwargs):
_hex = self._generate_hex_key(**kwargs)
_hex = self._generate_hex_key(kwargs.get('key_length'))
key_bytes = array.array('B', binascii.unhexlify(_hex)).tolist()
return key.SymmetricKey('AES', key_bytes)
algorithm = kwargs.get('algorithm', 'AES')
return key.SymmetricKey(algorithm, key_bytes)
def create_key(self, ctxt, **kwargs):
"""Creates a key.

View File

@ -1047,16 +1047,23 @@ class VolumeTestCase(BaseVolumeTestCase):
self.assertEqual(db_vol_type.get('id'), volume['volume_type_id'])
@mock.patch.object(keymgr, 'API', fake_keymgr.fake_api)
def test_create_volume_with_encrypted_volume_type(self):
def test_create_volume_with_encrypted_volume_type_aes(self):
ctxt = context.get_admin_context()
cipher = 'aes-xts-plain64'
key_size = 256
control_location = 'front-end'
db.volume_type_create(ctxt,
{'id': '61298380-0c12-11e3-bfd6-4b48424183be',
'name': 'LUKS'})
db.volume_type_encryption_create(
ctxt,
'61298380-0c12-11e3-bfd6-4b48424183be',
{'control_location': 'front-end', 'provider': ENCRYPTION_PROVIDER})
{'control_location': control_location,
'provider': ENCRYPTION_PROVIDER,
'cipher': cipher,
'key_size': key_size})
volume_api = cinder.volume.api.API()
@ -1067,7 +1074,56 @@ class VolumeTestCase(BaseVolumeTestCase):
'name',
'description',
volume_type=db_vol_type)
key_manager = volume_api.key_manager
key = key_manager.get_key(self.context, volume['encryption_key_id'])
self.assertEqual(key_size, len(key.key) * 8)
self.assertEqual('aes', key.alg)
metadata = db.volume_encryption_metadata_get(self.context, volume.id)
self.assertEqual(db_vol_type.get('id'), volume['volume_type_id'])
self.assertEqual(cipher, metadata.get('cipher'))
self.assertEqual(key_size, metadata.get('key_size'))
self.assertIsNotNone(volume['encryption_key_id'])
@mock.patch.object(keymgr, 'API', fake_keymgr.fake_api)
def test_create_volume_with_encrypted_volume_type_blowfish(self):
ctxt = context.get_admin_context()
cipher = 'blowfish-cbc'
key_size = 32
control_location = 'front-end'
db.volume_type_create(ctxt,
{'id': '61298380-0c12-11e3-bfd6-4b48424183be',
'name': 'LUKS'})
db.volume_type_encryption_create(
ctxt,
'61298380-0c12-11e3-bfd6-4b48424183be',
{'control_location': control_location,
'provider': ENCRYPTION_PROVIDER,
'cipher': cipher,
'key_size': key_size})
volume_api = cinder.volume.api.API()
db_vol_type = db.volume_type_get_by_name(ctxt, 'LUKS')
volume = volume_api.create(self.context,
1,
'name',
'description',
volume_type=db_vol_type)
key_manager = volume_api.key_manager
key = key_manager.get_key(self.context, volume['encryption_key_id'])
self.assertEqual(key_size, len(key.key) * 8)
self.assertEqual('blowfish', key.alg)
metadata = db.volume_encryption_metadata_get(self.context, volume.id)
self.assertEqual(db_vol_type.get('id'), volume['volume_type_id'])
self.assertEqual(cipher, metadata.get('cipher'))
self.assertEqual(key_size, metadata.get('key_size'))
self.assertIsNotNone(volume['encryption_key_id'])
def test_create_volume_with_provider_id(self):

View File

@ -357,7 +357,19 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
encryption_key_id = key_manager.copy_key(context,
encryption_key_id)
else:
encryption_key_id = key_manager.create_key(context)
volume_type_encryption = (
volume_types.get_volume_type_encryption(context,
volume_type_id))
cipher = volume_type_encryption.cipher
length = volume_type_encryption.key_size
# NOTE(kaitlin-farr): dm-crypt expects the cipher in a
# hyphenated format (aes-xts-plain64). The algorithm needs
# to be parsed out to pass to the key manager (aes).
algorithm = cipher.split('-')[0] if cipher else None
encryption_key_id = key_manager.create_key(context,
algorithm=algorithm,
key_length=length)
return encryption_key_id