Merge "V3 jsonschema validation: volume_type_encryption"
This commit is contained in:
commit
b7097ad60b
@ -20,18 +20,16 @@ import webob
|
|||||||
|
|
||||||
from cinder.api import extensions
|
from cinder.api import extensions
|
||||||
from cinder.api.openstack import wsgi
|
from cinder.api.openstack import wsgi
|
||||||
|
from cinder.api.schemas import volume_type_encryption
|
||||||
|
from cinder.api import validation
|
||||||
from cinder import db
|
from cinder import db
|
||||||
from cinder import exception
|
from cinder import exception
|
||||||
from cinder.i18n import _
|
from cinder.i18n import _
|
||||||
from cinder.policies import volume_type as policy
|
from cinder.policies import volume_type as policy
|
||||||
from cinder import rpc
|
from cinder import rpc
|
||||||
from cinder import utils
|
|
||||||
from cinder.volume import volume_types
|
from cinder.volume import volume_types
|
||||||
|
|
||||||
|
|
||||||
CONTROL_LOCATION = ['front-end', 'back-end']
|
|
||||||
|
|
||||||
|
|
||||||
class VolumeTypeEncryptionController(wsgi.Controller):
|
class VolumeTypeEncryptionController(wsgi.Controller):
|
||||||
"""The volume type encryption API controller for the OpenStack API."""
|
"""The volume type encryption API controller for the OpenStack API."""
|
||||||
|
|
||||||
@ -48,28 +46,6 @@ class VolumeTypeEncryptionController(wsgi.Controller):
|
|||||||
# Not found exception will be handled at the wsgi level
|
# Not found exception will be handled at the wsgi level
|
||||||
volume_types.get_volume_type(context, type_id)
|
volume_types.get_volume_type(context, type_id)
|
||||||
|
|
||||||
def _check_encryption_input(self, encryption, create=True):
|
|
||||||
if encryption.get('key_size') is not None:
|
|
||||||
encryption['key_size'] = utils.validate_integer(
|
|
||||||
encryption['key_size'], 'key_size',
|
|
||||||
min_value=0, max_value=db.MAX_INT)
|
|
||||||
|
|
||||||
if create:
|
|
||||||
msg = None
|
|
||||||
if 'provider' not in encryption.keys():
|
|
||||||
msg = _('provider must be defined')
|
|
||||||
elif 'control_location' not in encryption.keys():
|
|
||||||
msg = _('control_location must be defined')
|
|
||||||
|
|
||||||
if msg is not None:
|
|
||||||
raise exception.InvalidInput(reason=msg)
|
|
||||||
|
|
||||||
# Check control location
|
|
||||||
if 'control_location' in encryption.keys():
|
|
||||||
if encryption['control_location'] not in CONTROL_LOCATION:
|
|
||||||
msg = _("Valid control location are: %s") % CONTROL_LOCATION
|
|
||||||
raise exception.InvalidInput(reason=msg)
|
|
||||||
|
|
||||||
def _encrypted_type_in_use(self, context, volume_type_id):
|
def _encrypted_type_in_use(self, context, volume_type_id):
|
||||||
volume_list = db.volume_type_encryption_volume_get(context,
|
volume_list = db.volume_type_encryption_volume_get(context,
|
||||||
volume_type_id)
|
volume_type_id)
|
||||||
@ -87,17 +63,20 @@ class VolumeTypeEncryptionController(wsgi.Controller):
|
|||||||
self._check_type(context, type_id)
|
self._check_type(context, type_id)
|
||||||
return self._get_volume_type_encryption(context, type_id)
|
return self._get_volume_type_encryption(context, type_id)
|
||||||
|
|
||||||
def create(self, req, type_id, body=None):
|
@validation.schema(volume_type_encryption.create)
|
||||||
|
def create(self, req, type_id, body):
|
||||||
"""Create encryption specs for an existing volume type."""
|
"""Create encryption specs for an existing volume type."""
|
||||||
context = req.environ['cinder.context']
|
context = req.environ['cinder.context']
|
||||||
context.authorize(policy.ENCRYPTION_POLICY)
|
context.authorize(policy.ENCRYPTION_POLICY)
|
||||||
|
|
||||||
|
key_size = body['encryption'].get('key_size')
|
||||||
|
if key_size is not None:
|
||||||
|
body['encryption']['key_size'] = int(key_size)
|
||||||
|
|
||||||
if self._encrypted_type_in_use(context, type_id):
|
if self._encrypted_type_in_use(context, type_id):
|
||||||
expl = _('Cannot create encryption specs. Volume type in use.')
|
expl = _('Cannot create encryption specs. Volume type in use.')
|
||||||
raise webob.exc.HTTPBadRequest(explanation=expl)
|
raise webob.exc.HTTPBadRequest(explanation=expl)
|
||||||
|
|
||||||
self.assert_valid_body(body, 'encryption')
|
|
||||||
|
|
||||||
self._check_type(context, type_id)
|
self._check_type(context, type_id)
|
||||||
|
|
||||||
encryption_specs = self._get_volume_type_encryption(context, type_id)
|
encryption_specs = self._get_volume_type_encryption(context, type_id)
|
||||||
@ -106,24 +85,21 @@ class VolumeTypeEncryptionController(wsgi.Controller):
|
|||||||
|
|
||||||
encryption_specs = body['encryption']
|
encryption_specs = body['encryption']
|
||||||
|
|
||||||
self._check_encryption_input(encryption_specs)
|
|
||||||
|
|
||||||
db.volume_type_encryption_create(context, type_id, encryption_specs)
|
db.volume_type_encryption_create(context, type_id, encryption_specs)
|
||||||
notifier_info = dict(type_id=type_id, specs=encryption_specs)
|
notifier_info = dict(type_id=type_id, specs=encryption_specs)
|
||||||
notifier = rpc.get_notifier('volumeTypeEncryption')
|
notifier = rpc.get_notifier('volumeTypeEncryption')
|
||||||
notifier.info(context, 'volume_type_encryption.create', notifier_info)
|
notifier.info(context, 'volume_type_encryption.create', notifier_info)
|
||||||
return body
|
return body
|
||||||
|
|
||||||
def update(self, req, type_id, id, body=None):
|
@validation.schema(volume_type_encryption.update)
|
||||||
|
def update(self, req, type_id, id, body):
|
||||||
"""Update encryption specs for a given volume type."""
|
"""Update encryption specs for a given volume type."""
|
||||||
context = req.environ['cinder.context']
|
context = req.environ['cinder.context']
|
||||||
context.authorize(policy.ENCRYPTION_POLICY)
|
context.authorize(policy.ENCRYPTION_POLICY)
|
||||||
|
|
||||||
self.assert_valid_body(body, 'encryption')
|
key_size = body['encryption'].get('key_size')
|
||||||
|
if key_size is not None:
|
||||||
if len(body) > 1:
|
body['encryption']['key_size'] = int(key_size)
|
||||||
expl = _('Request body contains too many items.')
|
|
||||||
raise webob.exc.HTTPBadRequest(explanation=expl)
|
|
||||||
|
|
||||||
self._check_type(context, type_id)
|
self._check_type(context, type_id)
|
||||||
|
|
||||||
@ -132,7 +108,6 @@ class VolumeTypeEncryptionController(wsgi.Controller):
|
|||||||
raise webob.exc.HTTPBadRequest(explanation=expl)
|
raise webob.exc.HTTPBadRequest(explanation=expl)
|
||||||
|
|
||||||
encryption_specs = body['encryption']
|
encryption_specs = body['encryption']
|
||||||
self._check_encryption_input(encryption_specs, create=False)
|
|
||||||
|
|
||||||
db.volume_type_encryption_update(context, type_id, encryption_specs)
|
db.volume_type_encryption_update(context, type_id, encryption_specs)
|
||||||
notifier_info = dict(type_id=type_id, id=id)
|
notifier_info = dict(type_id=type_id, id=id)
|
||||||
|
47
cinder/api/schemas/volume_type_encryption.py
Normal file
47
cinder/api/schemas/volume_type_encryption.py
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
# Copyright (C) 2018 NTT DATA
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
Schema for V3 volume type encryption API.
|
||||||
|
|
||||||
|
"""
|
||||||
|
import copy
|
||||||
|
|
||||||
|
from cinder.api.validation import parameter_types
|
||||||
|
|
||||||
|
create = {
|
||||||
|
'type': 'object',
|
||||||
|
'properties': {
|
||||||
|
'encryption': {
|
||||||
|
'type': 'object',
|
||||||
|
'properties': {
|
||||||
|
'key_size': parameter_types.key_size,
|
||||||
|
'provider': {'type': 'string', 'minLength': 0,
|
||||||
|
'maxLength': 255},
|
||||||
|
'control_location': {'enum': ['front-end', 'back-end']},
|
||||||
|
'cipher': {'type': ['string', 'null'],
|
||||||
|
'minLength': 0, 'maxLength': 255},
|
||||||
|
},
|
||||||
|
'required': ['provider', 'control_location'],
|
||||||
|
'additionalProperties': False,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
'required': ['encryption'],
|
||||||
|
'additionalProperties': False,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
update = copy.deepcopy(create)
|
||||||
|
update['properties']['encryption']['required'] = []
|
@ -256,3 +256,9 @@ binary = {
|
|||||||
'type': 'string',
|
'type': 'string',
|
||||||
'enum': [binary for binary in constants.LOG_BINARIES + ('', '*')]
|
'enum': [binary for binary in constants.LOG_BINARIES + ('', '*')]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
key_size = {'type': ['string', 'integer', 'null'],
|
||||||
|
'minimum': 0,
|
||||||
|
'maximum': constants.DB_MAX_INT,
|
||||||
|
'format': 'key_size'}
|
||||||
|
@ -23,6 +23,7 @@ import re
|
|||||||
|
|
||||||
import jsonschema
|
import jsonschema
|
||||||
from jsonschema import exceptions as jsonschema_exc
|
from jsonschema import exceptions as jsonschema_exc
|
||||||
|
from oslo_utils import strutils
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
from oslo_utils import uuidutils
|
from oslo_utils import uuidutils
|
||||||
import six
|
import six
|
||||||
@ -372,6 +373,15 @@ def _validate_backup_status(param_value):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
@jsonschema.FormatChecker.cls_checks('key_size')
|
||||||
|
def _validate_key_size(param_value):
|
||||||
|
if param_value is not None:
|
||||||
|
if not strutils.is_int_like(param_value):
|
||||||
|
raise exception.InvalidInput(reason=(
|
||||||
|
_('key_size must be an integer.')))
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
class FormatChecker(jsonschema.FormatChecker):
|
class FormatChecker(jsonschema.FormatChecker):
|
||||||
"""A FormatChecker can output the message from cause exception
|
"""A FormatChecker can output the message from cause exception
|
||||||
|
|
||||||
|
@ -13,8 +13,6 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import mock
|
|
||||||
|
|
||||||
from oslo_serialization import jsonutils
|
from oslo_serialization import jsonutils
|
||||||
from six.moves import http_client
|
from six.moves import http_client
|
||||||
import webob
|
import webob
|
||||||
@ -24,7 +22,6 @@ from cinder import db
|
|||||||
from cinder import test
|
from cinder import test
|
||||||
from cinder.tests.unit.api import fakes
|
from cinder.tests.unit.api import fakes
|
||||||
from cinder.tests.unit import fake_constants as fake
|
from cinder.tests.unit import fake_constants as fake
|
||||||
from cinder import utils
|
|
||||||
|
|
||||||
|
|
||||||
def return_volume_type_encryption(context, volume_type_id):
|
def return_volume_type_encryption(context, volume_type_id):
|
||||||
@ -36,8 +33,7 @@ def fake_volume_type_encryption():
|
|||||||
'cipher': 'fake_cipher',
|
'cipher': 'fake_cipher',
|
||||||
'control_location': 'front-end',
|
'control_location': 'front-end',
|
||||||
'key_size': 256,
|
'key_size': 256,
|
||||||
'provider': 'fake_provider',
|
'provider': 'fake_provider'
|
||||||
'volume_type_id': fake.VOLUME_TYPE_ID
|
|
||||||
}
|
}
|
||||||
return values
|
return values
|
||||||
|
|
||||||
@ -162,8 +158,8 @@ class VolumeTypeEncryptionTest(test.TestCase):
|
|||||||
body = {"encryption": {'cipher': cipher,
|
body = {"encryption": {'cipher': cipher,
|
||||||
'control_location': control_location,
|
'control_location': control_location,
|
||||||
'key_size': key_size,
|
'key_size': key_size,
|
||||||
'provider': provider,
|
'provider': provider
|
||||||
'volume_type_id': volume_type['id']}}
|
}}
|
||||||
|
|
||||||
self.assertEqual(0, len(self.notifier.notifications))
|
self.assertEqual(0, len(self.notifier.notifications))
|
||||||
res = self._get_response(volume_type)
|
res = self._get_response(volume_type)
|
||||||
@ -189,8 +185,6 @@ class VolumeTypeEncryptionTest(test.TestCase):
|
|||||||
res_dict['encryption']['control_location'])
|
res_dict['encryption']['control_location'])
|
||||||
self.assertEqual(key_size, res_dict['encryption']['key_size'])
|
self.assertEqual(key_size, res_dict['encryption']['key_size'])
|
||||||
self.assertEqual(provider, res_dict['encryption']['provider'])
|
self.assertEqual(provider, res_dict['encryption']['provider'])
|
||||||
self.assertEqual(volume_type['id'],
|
|
||||||
res_dict['encryption']['volume_type_id'])
|
|
||||||
|
|
||||||
# check database
|
# check database
|
||||||
encryption = db.volume_type_encryption_get(context.get_admin_context(),
|
encryption = db.volume_type_encryption_get(context.get_admin_context(),
|
||||||
@ -199,17 +193,9 @@ class VolumeTypeEncryptionTest(test.TestCase):
|
|||||||
self.assertEqual(cipher, encryption['cipher'])
|
self.assertEqual(cipher, encryption['cipher'])
|
||||||
self.assertEqual(key_size, encryption['key_size'])
|
self.assertEqual(key_size, encryption['key_size'])
|
||||||
self.assertEqual(provider, encryption['provider'])
|
self.assertEqual(provider, encryption['provider'])
|
||||||
self.assertEqual(volume_type['id'], encryption['volume_type_id'])
|
|
||||||
|
|
||||||
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
|
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
|
||||||
|
|
||||||
def test_create_json(self):
|
|
||||||
with mock.patch.object(utils,
|
|
||||||
'validate_integer') as mock_validate_integer:
|
|
||||||
mock_validate_integer.return_value = 128
|
|
||||||
self._create('fake_cipher', 'front-end', 128, 'fake_encryptor')
|
|
||||||
self.assertTrue(mock_validate_integer.called)
|
|
||||||
|
|
||||||
def test_create_invalid_volume_type(self):
|
def test_create_invalid_volume_type(self):
|
||||||
volume_type = self._default_volume_type
|
volume_type = self._default_volume_type
|
||||||
body = {"encryption": fake_volume_type_encryption()}
|
body = {"encryption": fake_volume_type_encryption()}
|
||||||
@ -270,8 +256,8 @@ class VolumeTypeEncryptionTest(test.TestCase):
|
|||||||
body = {"encryption": {'cipher': 'cipher',
|
body = {"encryption": {'cipher': 'cipher',
|
||||||
'key_size': 128,
|
'key_size': 128,
|
||||||
'control_location': 'front-end',
|
'control_location': 'front-end',
|
||||||
'provider': 'fake_provider',
|
'provider': 'fake_provider'
|
||||||
'volume_type_id': volume_type['id']}}
|
}}
|
||||||
|
|
||||||
# Try to create encryption specs for a volume type
|
# Try to create encryption specs for a volume type
|
||||||
# with a volume.
|
# with a volume.
|
||||||
@ -291,8 +277,7 @@ class VolumeTypeEncryptionTest(test.TestCase):
|
|||||||
db.volume_destroy(context.get_admin_context(), fake.VOLUME_ID)
|
db.volume_destroy(context.get_admin_context(), fake.VOLUME_ID)
|
||||||
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
|
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
|
||||||
|
|
||||||
def _encryption_create_bad_body(self, body,
|
def _encryption_create_bad_body(self, body):
|
||||||
msg='Create body is not valid.'):
|
|
||||||
|
|
||||||
volume_type = self._default_volume_type
|
volume_type = self._default_volume_type
|
||||||
db.volume_type_create(context.get_admin_context(), volume_type)
|
db.volume_type_create(context.get_admin_context(), volume_type)
|
||||||
@ -302,31 +287,40 @@ class VolumeTypeEncryptionTest(test.TestCase):
|
|||||||
|
|
||||||
res_dict = jsonutils.loads(res.body)
|
res_dict = jsonutils.loads(res.body)
|
||||||
|
|
||||||
expected = {
|
self.assertEqual(http_client.BAD_REQUEST,
|
||||||
'badRequest': {
|
res_dict['badRequest']['code'])
|
||||||
'code': http_client.BAD_REQUEST,
|
|
||||||
'message': (msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.assertEqual(expected, res_dict)
|
|
||||||
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
|
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
|
||||||
|
|
||||||
def test_create_no_body(self):
|
def test_create_no_body(self):
|
||||||
msg = "Missing required element 'encryption' in request body."
|
self._encryption_create_bad_body(body=None)
|
||||||
self._encryption_create_bad_body(body=None, msg=msg)
|
|
||||||
|
|
||||||
def test_create_malformed_entity(self):
|
def test_create_malformed_entity(self):
|
||||||
body = {'encryption': 'string'}
|
body = {'encryption': 'string'}
|
||||||
msg = "Missing required element 'encryption' in request body."
|
self._encryption_create_bad_body(body=body)
|
||||||
self._encryption_create_bad_body(body=body, msg=msg)
|
|
||||||
|
|
||||||
def test_create_negative_key_size(self):
|
def test_create_negative_key_size(self):
|
||||||
body = {"encryption": {'cipher': 'cipher',
|
body = {"encryption": {'cipher': 'cipher',
|
||||||
'key_size': -128,
|
'key_size': -128,
|
||||||
'provider': 'fake_provider',
|
'provider': 'fake_provider',
|
||||||
'volume_type_id': fake.VOLUME_TYPE_ID}}
|
'control_location': 'front-end'
|
||||||
msg = 'key_size must be >= 0'
|
}}
|
||||||
self._encryption_create_bad_body(body=body, msg=msg)
|
self._encryption_create_bad_body(body=body)
|
||||||
|
|
||||||
|
def test_create_with_minimum_key_size(self):
|
||||||
|
body = {"encryption": {'cipher': 'cipher',
|
||||||
|
'key_size': '-1',
|
||||||
|
'provider': 'fake_provider',
|
||||||
|
'control_location': 'front-end'
|
||||||
|
}}
|
||||||
|
self._encryption_create_bad_body(body=body)
|
||||||
|
|
||||||
|
def test_create_with_maximum_key_size(self):
|
||||||
|
body = {"encryption": {'cipher': 'cipher',
|
||||||
|
'key_size': '12345678788',
|
||||||
|
'provider': 'fake_provider',
|
||||||
|
'control_location': 'front-end'
|
||||||
|
}}
|
||||||
|
self._encryption_create_bad_body(body=body)
|
||||||
|
|
||||||
def test_create_none_key_size(self):
|
def test_create_none_key_size(self):
|
||||||
self._create('fake_cipher', 'front-end', None, 'fake_encryptor')
|
self._create('fake_cipher', 'front-end', None, 'fake_encryptor')
|
||||||
@ -334,17 +328,17 @@ class VolumeTypeEncryptionTest(test.TestCase):
|
|||||||
def test_create_invalid_control_location(self):
|
def test_create_invalid_control_location(self):
|
||||||
body = {"encryption": {'cipher': 'cipher',
|
body = {"encryption": {'cipher': 'cipher',
|
||||||
'control_location': 'fake_control',
|
'control_location': 'fake_control',
|
||||||
'provider': 'fake_provider',
|
'provider': 'fake_provider'
|
||||||
'volume_type_id': fake.VOLUME_TYPE_ID}}
|
}}
|
||||||
msg = ("Invalid input received: Valid control location are: "
|
self._encryption_create_bad_body(body=body)
|
||||||
"['front-end', 'back-end']")
|
|
||||||
self._encryption_create_bad_body(body=body, msg=msg)
|
|
||||||
|
|
||||||
def test_create_no_provider(self):
|
def test_create_no_provider(self):
|
||||||
body = {"encryption": {'cipher': 'cipher',
|
body = {"encryption": {'cipher': 'cipher'}}
|
||||||
'volume_type_id': fake.VOLUME_TYPE_ID}}
|
self._encryption_create_bad_body(body=body)
|
||||||
msg = ("Invalid input received: provider must be defined")
|
|
||||||
self._encryption_create_bad_body(body=body, msg=msg)
|
def test_create_no_control_location(self):
|
||||||
|
body = {"encryption": {'provider': 'fake_provider'}}
|
||||||
|
self._encryption_create_bad_body(body=body)
|
||||||
|
|
||||||
def test_delete(self):
|
def test_delete(self):
|
||||||
volume_type = self._default_volume_type
|
volume_type = self._default_volume_type
|
||||||
@ -359,8 +353,7 @@ class VolumeTypeEncryptionTest(test.TestCase):
|
|||||||
body = {"encryption": {'cipher': 'cipher',
|
body = {"encryption": {'cipher': 'cipher',
|
||||||
'key_size': 128,
|
'key_size': 128,
|
||||||
'control_location': 'front-end',
|
'control_location': 'front-end',
|
||||||
'provider': 'fake_provider',
|
'provider': 'fake_provider'}}
|
||||||
'volume_type_id': volume_type['id']}}
|
|
||||||
|
|
||||||
# Create, and test that get returns something
|
# Create, and test that get returns something
|
||||||
res = self._get_response(volume_type, req_method='POST',
|
res = self._get_response(volume_type, req_method='POST',
|
||||||
@ -398,8 +391,8 @@ class VolumeTypeEncryptionTest(test.TestCase):
|
|||||||
body = {"encryption": {'cipher': 'cipher',
|
body = {"encryption": {'cipher': 'cipher',
|
||||||
'key_size': 128,
|
'key_size': 128,
|
||||||
'control_location': 'front-end',
|
'control_location': 'front-end',
|
||||||
'provider': 'fake_provider',
|
'provider': 'fake_provider'
|
||||||
'volume_type_id': volume_type['id']}}
|
}}
|
||||||
|
|
||||||
# Create encryption with volume type, and test with GET
|
# Create encryption with volume type, and test with GET
|
||||||
res = self._get_response(volume_type, req_method='POST',
|
res = self._get_response(volume_type, req_method='POST',
|
||||||
@ -486,17 +479,14 @@ class VolumeTypeEncryptionTest(test.TestCase):
|
|||||||
self.assertEqual(expected, jsonutils.loads(res.body))
|
self.assertEqual(expected, jsonutils.loads(res.body))
|
||||||
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
|
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
|
||||||
|
|
||||||
@mock.patch('cinder.utils.validate_integer')
|
def test_update_item(self):
|
||||||
def test_update_item(self, mock_validate_integer):
|
|
||||||
mock_validate_integer.return_value = 512
|
|
||||||
volume_type = self._default_volume_type
|
volume_type = self._default_volume_type
|
||||||
|
|
||||||
# Create Encryption Specs
|
# Create Encryption Specs
|
||||||
create_body = {"encryption": {'cipher': 'cipher',
|
create_body = {"encryption": {'cipher': 'cipher',
|
||||||
'control_location': 'front-end',
|
'control_location': 'front-end',
|
||||||
'key_size': 128,
|
'key_size': 128,
|
||||||
'provider': 'fake_provider',
|
'provider': 'fake_provider'}}
|
||||||
'volume_type_id': volume_type['id']}}
|
|
||||||
self._create_type_and_encryption(volume_type, create_body)
|
self._create_type_and_encryption(volume_type, create_body)
|
||||||
|
|
||||||
# Update Encryption Specs
|
# Update Encryption Specs
|
||||||
@ -521,11 +511,10 @@ class VolumeTypeEncryptionTest(test.TestCase):
|
|||||||
# Confirm Encryption Specs
|
# Confirm Encryption Specs
|
||||||
self.assertEqual(512, res_dict['key_size'])
|
self.assertEqual(512, res_dict['key_size'])
|
||||||
self.assertEqual('fake_provider2', res_dict['provider'])
|
self.assertEqual('fake_provider2', res_dict['provider'])
|
||||||
self.assertTrue(mock_validate_integer.called)
|
|
||||||
|
|
||||||
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
|
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
|
||||||
|
|
||||||
def _encryption_update_bad_body(self, update_body, msg):
|
def _encryption_update_bad_body(self, update_body):
|
||||||
|
|
||||||
# Create Volume Type and Encryption
|
# Create Volume Type and Encryption
|
||||||
volume_type = self._default_volume_type
|
volume_type = self._default_volume_type
|
||||||
@ -539,36 +528,26 @@ class VolumeTypeEncryptionTest(test.TestCase):
|
|||||||
fake.ENCRYPTION_KEY_ID)
|
fake.ENCRYPTION_KEY_ID)
|
||||||
res_dict = jsonutils.loads(res.body)
|
res_dict = jsonutils.loads(res.body)
|
||||||
|
|
||||||
expected = {
|
|
||||||
'badRequest': {
|
|
||||||
'code': http_client.BAD_REQUEST,
|
|
||||||
'message': (msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Confirm Failure
|
# Confirm Failure
|
||||||
self.assertEqual(expected, res_dict)
|
self.assertEqual(http_client.BAD_REQUEST,
|
||||||
|
res_dict['badRequest']['code'])
|
||||||
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
|
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
|
||||||
|
|
||||||
def test_update_too_many_items(self):
|
def test_update_too_many_items(self):
|
||||||
update_body = {"encryption": {'key_size': 512},
|
update_body = {"encryption": {'key_size': 512},
|
||||||
"encryption2": {'key_size': 256}}
|
"encryption2": {'key_size': 256}}
|
||||||
msg = 'Request body contains too many items.'
|
self._encryption_update_bad_body(update_body)
|
||||||
self._encryption_update_bad_body(update_body, msg)
|
|
||||||
|
|
||||||
def test_update_key_size_non_integer(self):
|
def test_update_key_size_non_integer(self):
|
||||||
update_body = {"encryption": {'key_size': 'abc'}}
|
update_body = {"encryption": {'key_size': 'abc'}}
|
||||||
msg = 'key_size must be an integer'
|
self._encryption_update_bad_body(update_body)
|
||||||
self._encryption_update_bad_body(update_body, msg)
|
|
||||||
|
|
||||||
def test_update_item_invalid_body(self):
|
def test_update_item_invalid_body(self):
|
||||||
update_body = {"key_size": "value1"}
|
update_body = {"key_size": "value1"}
|
||||||
msg = "Missing required element 'encryption' in request body."
|
self._encryption_update_bad_body(update_body)
|
||||||
self._encryption_update_bad_body(update_body, msg)
|
|
||||||
|
|
||||||
def _encryption_empty_update(self, update_body):
|
def _encryption_empty_update(self, update_body):
|
||||||
msg = "Missing required element 'encryption' in request body."
|
self._encryption_update_bad_body(update_body)
|
||||||
self._encryption_update_bad_body(update_body, msg)
|
|
||||||
|
|
||||||
def test_update_no_body(self):
|
def test_update_no_body(self):
|
||||||
self._encryption_empty_update(update_body=None)
|
self._encryption_empty_update(update_body=None)
|
||||||
|
Loading…
Reference in New Issue
Block a user