Add check_encryption_provider to volume utils

Change-Id: Id6df9534ca2c653d0e61cceb49dc54f2025159ba
This commit is contained in:
Eric Harney 2018-08-29 15:20:07 -04:00
parent e8fe5aaf6a
commit 10bf2e63e2
2 changed files with 80 additions and 0 deletions

View File

@ -1165,3 +1165,47 @@ class VolumeUtilsTestCase(test.TestCase):
else:
fake_driver.copy_image_to_volume.assert_called_once_with(
ctxt, volume, fake_image_service, image_id)
@ddt.data({'cipher': 'aes-xts-plain64',
'provider': 'luks'},
{'cipher': 'aes-xts-plain64',
'provider': 'nova.volume.encryptors.luks.LuksEncryptor'})
def test_check_encryption_provider(self, encryption_metadata):
ctxt = context.get_admin_context()
type_ref = volume_types.create(ctxt, "type1")
encryption = db.volume_type_encryption_create(
ctxt, type_ref['id'], encryption_metadata)
with mock.patch(
'cinder.db.sqlalchemy.api.volume_encryption_metadata_get',
return_value=encryption):
volume_data = {'id': fake.VOLUME_ID,
'volume_type_id': type_ref['id']}
ctxt = context.get_admin_context()
volume = fake_volume.fake_volume_obj(ctxt, **volume_data)
ret = volume_utils.check_encryption_provider(
db,
volume,
mock.sentinel.context)
self.assertEqual('aes-xts-plain64', ret['cipher'])
def test_check_encryption_provider_invalid(self):
encryption_metadata = {'cipher': 'aes-xts-plain64',
'provider': 'invalid'}
ctxt = context.get_admin_context()
type_ref = volume_types.create(ctxt, "type1")
encryption = db.volume_type_encryption_create(
ctxt, type_ref['id'], encryption_metadata)
with mock.patch(
'cinder.db.sqlalchemy.api.volume_encryption_metadata_get',
return_value=encryption):
volume_data = {'id': fake.VOLUME_ID,
'volume_type_id': type_ref['id']}
ctxt = context.get_admin_context()
volume = fake_volume.fake_volume_obj(ctxt, **volume_data)
self.assertRaises(exception.VolumeDriverException,
volume_utils.check_encryption_provider,
db,
volume,
mock.sentinel.context)

View File

@ -20,8 +20,10 @@ import functools
import json
import math
import operator
import os
from os import urandom
import re
import tempfile
import time
import uuid
@ -31,6 +33,7 @@ from castellan import key_manager as castellan_key_manager
import eventlet
from eventlet import tpool
from keystoneauth1 import loading as ks_loading
from os_brick import encryptors
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
@ -1173,3 +1176,36 @@ def copy_image_to_volume(driver, context, volume, image_meta, image_location,
" to volume %(volume_id)s successfully.",
{'image_id': image_id, 'volume_id': volume.id,
'image_location': image_location})
def _image_conversion_dir():
tmpdir = (CONF.image_conversion_dir or
tempfile.gettempdir())
# ensure temporary directory exists
if not os.path.exists(tmpdir):
os.makedirs(tmpdir)
return tmpdir
def check_encryption_provider(db, volume, context):
"""Check that this is a LUKS encryption provider.
:returns: encryption dict
"""
encryption = db.volume_encryption_metadata_get(context, volume.id)
provider = encryption['provider']
if provider in encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP:
provider = encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP[provider]
if provider != encryptors.LUKS:
message = _("Provider %s not supported.") % provider
raise exception.VolumeDriverException(message=message)
if 'cipher' not in encryption or 'key_size' not in encryption:
msg = _('encryption spec must contain "cipher" and '
'"key_size"')
raise exception.VolumeDriverException(message=msg)
return encryption