Reuse API v2 for v1 calls

The v2 API code can be reused for the v1 API with just minor
tweaks to the returned JSON and a couple changes in the HTTP
response code.

Change-Id: I852a383bf8041220490abde74dfe0a65376b924e
Closes-bug: #1627921
This commit is contained in:
Sean McGinnis 2017-03-02 03:59:05 +00:00 committed by Sean McGinnis
parent c50e4b2e6a
commit d6980a5575
10 changed files with 597 additions and 2026 deletions

View File

@ -22,11 +22,11 @@ WSGI middleware for OpenStack Volume API.
from cinder.api import extensions
import cinder.api.openstack
from cinder.api.v1 import snapshots
from cinder.api.v1 import types
from cinder.api.v1 import volume_metadata
from cinder.api.v1 import volumes
from cinder.api.v2 import limits
from cinder.api.v2 import snapshot_metadata
from cinder.api.v2 import types
from cinder.api.v2 import volume_metadata
from cinder.api import versions

View File

@ -15,188 +15,89 @@
"""The volumes snapshots api."""
from oslo_log import log as logging
from oslo_utils import strutils
import webob
from webob import exc
from cinder.api import common
from cinder.api.openstack import wsgi
from cinder import exception
from cinder.i18n import _, _LI
from cinder import utils
from cinder import volume
from cinder.api.v2 import snapshots as snapshots_v2
LOG = logging.getLogger(__name__)
def _snapshot_v2_to_v1(snapv2_result):
"""Transform a v2 snapshot dict to v1."""
snapshots = snapv2_result.get('snapshots')
if snapshots is None:
snapshots = [snapv2_result['snapshot']]
for snapv1 in snapshots:
# The updated_at property was added in v2
snapv1.pop('updated_at', None)
# Name and description were renamed
snapv1['display_name'] = snapv1.pop('name', '')
snapv1['display_description'] = snapv1.pop('description', '')
return snapv2_result
def _translate_snapshot_detail_view(snapshot):
"""Maps keys for snapshots details view."""
def _update_search_opts(req):
"""Update the requested search options.
d = _translate_snapshot_summary_view(snapshot)
# NOTE(gagupta): No additional data / lookups at the moment
return d
This is a little silly, as ``display_name`` needs to be switched
to just ``name``, which internally to v2 gets switched to be
``display_name``. Oh well.
"""
if 'display_name' in req.GET:
req.GET['name'] = req.GET.pop('display_name')
return req
def _translate_snapshot_summary_view(snapshot):
"""Maps keys for snapshots summary view."""
d = {}
d['id'] = snapshot['id']
d['created_at'] = snapshot['created_at']
d['display_name'] = snapshot['display_name']
d['display_description'] = snapshot['display_description']
d['volume_id'] = snapshot['volume_id']
d['status'] = snapshot['status']
d['size'] = snapshot['volume_size']
if snapshot.get('metadata') and isinstance(snapshot.get('metadata'),
dict):
d['metadata'] = snapshot['metadata']
else:
d['metadata'] = {}
return d
class SnapshotsController(wsgi.Controller):
class SnapshotsController(snapshots_v2.SnapshotsController):
"""The Snapshots API controller for the OpenStack API."""
def __init__(self, ext_mgr=None):
self.volume_api = volume.API()
self.ext_mgr = ext_mgr
super(SnapshotsController, self).__init__()
def show(self, req, id):
"""Return data about the given snapshot."""
context = req.environ['cinder.context']
# Not found exception will be handled at the wsgi level
snapshot = self.volume_api.get_snapshot(context, id)
req.cache_db_snapshot(snapshot)
return {'snapshot': _translate_snapshot_detail_view(snapshot)}
def delete(self, req, id):
"""Delete a snapshot."""
context = req.environ['cinder.context']
LOG.info(_LI("Delete snapshot with id: %s"), id)
# Not found exception will be handled at the wsgi level
snapshot = self.volume_api.get_snapshot(context, id)
self.volume_api.delete_snapshot(context, snapshot)
return webob.Response(status_int=202)
result = super(SnapshotsController, self).show(req, id)
return _snapshot_v2_to_v1(result)
def index(self, req):
"""Returns a summary list of snapshots."""
return self._items(req, entity_maker=_translate_snapshot_summary_view)
return _snapshot_v2_to_v1(
super(SnapshotsController, self).index(
_update_search_opts(req)))
def detail(self, req):
"""Returns a detailed list of snapshots."""
return self._items(req, entity_maker=_translate_snapshot_detail_view)
def _items(self, req, entity_maker):
"""Returns a list of snapshots, transformed through entity_maker."""
context = req.environ['cinder.context']
# pop out limit and offset , they are not search_opts
search_opts = req.GET.copy()
search_opts.pop('limit', None)
search_opts.pop('offset', None)
# filter out invalid option
allowed_search_options = ('status', 'volume_id', 'display_name')
utils.remove_invalid_filter_options(context, search_opts,
allowed_search_options)
snapshots = self.volume_api.get_all_snapshots(context,
search_opts=search_opts)
limited_list = common.limited(snapshots.objects, req)
req.cache_db_snapshots(limited_list)
res = [entity_maker(snapshot) for snapshot in limited_list]
return {'snapshots': res}
return _snapshot_v2_to_v1(
super(SnapshotsController, self).detail(
_update_search_opts(req)))
@wsgi.response(200)
def create(self, req, body):
"""Creates a new snapshot."""
kwargs = {}
context = req.environ['cinder.context']
if not self.is_valid_body(body, 'snapshot'):
if (body is None or not body.get('snapshot') or
not isinstance(body['snapshot'], dict)):
raise exc.HTTPUnprocessableEntity()
snapshot = body['snapshot']
kwargs['metadata'] = snapshot.get('metadata', None)
if 'display_name' in body['snapshot']:
body['snapshot']['name'] = body['snapshot'].pop('display_name')
try:
volume_id = snapshot['volume_id']
except KeyError:
msg = _("'volume_id' must be specified")
raise exc.HTTPBadRequest(explanation=msg)
if 'display_description' in body['snapshot']:
body['snapshot']['description'] = body['snapshot'].pop(
'display_description')
# Not found exception will be handled at the wsgi level
volume = self.volume_api.get(context, volume_id)
if 'metadata' not in body['snapshot']:
body['snapshot']['metadata'] = {}
force = snapshot.get('force', False)
msg = _LI("Create snapshot from volume %s")
LOG.info(msg, volume_id)
if not strutils.is_valid_boolstr(force):
msg = _("Invalid value '%s' for force. ") % force
raise exception.InvalidParameterValue(err=msg)
if strutils.bool_from_string(force):
new_snapshot = self.volume_api.create_snapshot_force(
context,
volume,
snapshot.get('display_name'),
snapshot.get('display_description'),
**kwargs)
else:
new_snapshot = self.volume_api.create_snapshot(
context,
volume,
snapshot.get('display_name'),
snapshot.get('display_description'),
**kwargs)
req.cache_db_snapshot(new_snapshot)
retval = _translate_snapshot_detail_view(new_snapshot)
return {'snapshot': retval}
return _snapshot_v2_to_v1(
super(SnapshotsController, self).create(req, body))
def update(self, req, id, body):
"""Update a snapshot."""
context = req.environ['cinder.context']
if not body:
try:
return _snapshot_v2_to_v1(
super(SnapshotsController, self).update(req, id, body))
except exc.HTTPBadRequest:
raise exc.HTTPUnprocessableEntity()
if 'snapshot' not in body:
raise exc.HTTPUnprocessableEntity()
snapshot = body['snapshot']
update_dict = {}
valid_update_keys = (
'display_name',
'display_description',
)
for key in valid_update_keys:
if key in snapshot:
update_dict[key] = snapshot[key]
# Not found exception will be handled at the wsgi level
snapshot = self.volume_api.get_snapshot(context, id)
self.volume_api.update_snapshot(context, snapshot, update_dict)
snapshot.update(update_dict)
req.cache_db_snapshot(snapshot)
return {'snapshot': _translate_snapshot_detail_view(snapshot)}
def create_resource(ext_mgr):
return wsgi.Resource(SnapshotsController(ext_mgr))

View File

@ -1,48 +0,0 @@
# Copyright (c) 2011 Zadara Storage Inc.
# Copyright (c) 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""The volume type & volume types extra specs extension."""
from cinder.api.openstack import wsgi
from cinder.api.views import types as views_types
from cinder.volume import volume_types
class VolumeTypesController(wsgi.Controller):
"""The volume types API controller for the OpenStack API."""
_view_builder_class = views_types.ViewBuilder
def index(self, req):
"""Returns the list of volume types."""
context = req.environ['cinder.context']
vol_types = volume_types.get_all_types(context)
vol_types = list(vol_types.values())
req.cache_resource(vol_types, name='types')
return self._view_builder.index(req, vol_types)
def show(self, req, id):
"""Return a single volume type item."""
context = req.environ['cinder.context']
# Not found exception will be handled at the wsgi level
vol_type = volume_types.get_volume_type(context, id)
req.cache_resource(vol_type, name='types')
return self._view_builder.show(req, vol_type)
def create_resource():
return wsgi.Resource(VolumeTypesController())

View File

@ -1,145 +0,0 @@
# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from webob import exc
from cinder.api.openstack import wsgi
from cinder import exception
from cinder.i18n import _
from cinder import volume
class Controller(wsgi.Controller):
"""The volume metadata API controller for the OpenStack API."""
def __init__(self):
self.volume_api = volume.API()
super(Controller, self).__init__()
def _get_metadata(self, context, volume_id):
# Not found exception will be handled at the wsgi level
volume = self.volume_api.get(context, volume_id)
meta = self.volume_api.get_volume_metadata(context, volume)
return meta
def index(self, req, volume_id):
"""Returns the list of metadata for a given volume."""
context = req.environ['cinder.context']
return {'metadata': self._get_metadata(context, volume_id)}
def create(self, req, volume_id, body):
try:
metadata = body['metadata']
except (KeyError, TypeError):
msg = _("Malformed request body")
raise exc.HTTPBadRequest(explanation=msg)
context = req.environ['cinder.context']
new_metadata = self._update_volume_metadata(context,
volume_id,
metadata,
delete=False)
return {'metadata': new_metadata}
def update(self, req, volume_id, id, body):
try:
meta_item = body['meta']
except (TypeError, KeyError):
expl = _('Malformed request body')
raise exc.HTTPBadRequest(explanation=expl)
if id not in meta_item:
expl = _('Request body and URI mismatch')
raise exc.HTTPBadRequest(explanation=expl)
if len(meta_item) > 1:
expl = _('Request body contains too many items')
raise exc.HTTPBadRequest(explanation=expl)
context = req.environ['cinder.context']
self._update_volume_metadata(context,
volume_id,
meta_item,
delete=False)
return {'meta': meta_item}
def update_all(self, req, volume_id, body):
try:
metadata = body['metadata']
except (TypeError, KeyError):
expl = _('Malformed request body')
raise exc.HTTPBadRequest(explanation=expl)
context = req.environ['cinder.context']
new_metadata = self._update_volume_metadata(context,
volume_id,
metadata,
delete=True)
return {'metadata': new_metadata}
def _update_volume_metadata(self, context,
volume_id, metadata,
delete=False):
try:
volume = self.volume_api.get(context, volume_id)
return self.volume_api.update_volume_metadata(context,
volume,
metadata,
delete)
# Not found exception will be handled at the wsgi level
except (ValueError, AttributeError):
msg = _("Malformed request body")
raise exc.HTTPBadRequest(explanation=msg)
except exception.InvalidVolumeMetadata as error:
raise exc.HTTPBadRequest(explanation=error.msg)
except exception.InvalidVolumeMetadataSize as error:
raise exc.HTTPRequestEntityTooLarge(explanation=error.msg)
def show(self, req, volume_id, id):
"""Return a single metadata item."""
context = req.environ['cinder.context']
data = self._get_metadata(context, volume_id)
try:
return {'meta': {id: data[id]}}
except KeyError:
raise exception.VolumeMetadataNotFound(volume_id=volume_id,
metadata_key=id)
def delete(self, req, volume_id, id):
"""Deletes an existing metadata."""
context = req.environ['cinder.context']
metadata = self._get_metadata(context, volume_id)
if id not in metadata:
raise exception.VolumeMetadataNotFound(volume_id=volume_id,
metadata_key=id)
# Not found exception will be handled at the wsgi level
volume = self.volume_api.get(context, volume_id)
self.volume_api.delete_volume_metadata(context, volume, id)
return webob.Response(status_int=200)
def create_resource():
return wsgi.Resource(Controller())

View File

@ -15,316 +15,128 @@
"""The volumes api."""
import ast
from oslo_log import log as logging
from oslo_utils import uuidutils
import webob
from webob import exc
from cinder.api import common
from cinder.api.openstack import wsgi
from cinder.i18n import _, _LI
from cinder import objects
from cinder.objects import fields
from cinder import utils
from cinder import volume as cinder_volume
from cinder.volume import utils as volume_utils
from cinder.api.v2 import volumes as volumes_v2
LOG = logging.getLogger(__name__)
def _translate_attachment_detail_view(_context, vol):
"""Maps keys for attachment details view."""
d = _translate_attachment_summary_view(_context, vol)
# No additional data / lookups at the moment
return d
def _translate_attachment_summary_view(_context, vol):
"""Maps keys for attachment summary view."""
def _attachment_v2_to_v1(vol):
"""Converts v2 attachment details to v1 format."""
d = []
attachments = vol.volume_attachment
attachments = vol.pop('attachments', [])
for attachment in attachments:
if (attachment.get('attach_status') ==
fields.VolumeAttachStatus.ATTACHED):
a = {'id': attachment.get('volume_id'),
'attachment_id': attachment.get('id'),
'volume_id': attachment.get('volume_id'),
'server_id': attachment.get('instance_uuid'),
'host_name': attachment.get('attached_host'),
'device': attachment.get('mountpoint'),
}
d.append(a)
a = {'id': attachment.get('id'),
'attachment_id': attachment.get('attachment_id'),
'volume_id': attachment.get('volume_id'),
'server_id': attachment.get('server_id'),
'host_name': attachment.get('host_name'),
'device': attachment.get('device'),
}
d.append(a)
return d
def _translate_volume_detail_view(context, vol, image_id=None):
"""Maps keys for volumes details view."""
def _volume_v2_to_v1(volv2_results, image_id=None):
"""Converts v2 volume details to v1 format."""
volumes = volv2_results.get('volumes')
if volumes is None:
volumes = [volv2_results['volume']]
d = _translate_volume_summary_view(context, vol, image_id)
for vol in volumes:
# Need to form the string true/false explicitly here to
# maintain our API contract
if vol.get('multiattach'):
vol['multiattach'] = 'true'
else:
vol['multiattach'] = 'false'
# No additional data / lookups at the moment
if not vol.get('image_id') and image_id:
vol['image_id'] = image_id
return d
vol['attachments'] = _attachment_v2_to_v1(vol)
if not vol.get('metadata'):
vol['metadata'] = {}
# Convert the name changes
vol['display_name'] = vol.pop('name')
vol['display_description'] = vol.pop('description', '')
# Remove the properties not present for v1
vol.pop('consistencygroup_id', None)
vol.pop('encryption_key_id', None)
vol.pop('links', None)
vol.pop('migration_status', None)
vol.pop('replication_status', None)
vol.pop('updated_at', None)
vol.pop('user_id', None)
LOG.debug("vol=%s", vol)
return volv2_results
def _translate_volume_summary_view(context, vol, image_id=None):
"""Maps keys for volumes summary view."""
d = {}
d['id'] = vol['id']
d['status'] = vol['status']
d['size'] = vol['size']
d['availability_zone'] = vol['availability_zone']
d['created_at'] = vol['created_at']
# Need to form the string true/false explicitly here to
# maintain our API contract
if vol['bootable']:
d['bootable'] = 'true'
else:
d['bootable'] = 'false'
if vol['multiattach']:
d['multiattach'] = 'true'
else:
d['multiattach'] = 'false'
d['attachments'] = []
if vol['attach_status'] == fields.VolumeAttachStatus.ATTACHED:
d['attachments'] = _translate_attachment_detail_view(context, vol)
d['display_name'] = vol['display_name']
d['display_description'] = vol['display_description']
if vol['volume_type_id'] and vol.get('volume_type'):
d['volume_type'] = vol['volume_type']['name']
else:
d['volume_type'] = vol['volume_type_id']
d['snapshot_id'] = vol['snapshot_id']
d['source_volid'] = vol['source_volid']
d['encrypted'] = vol['encryption_key_id'] is not None
if image_id:
d['image_id'] = image_id
LOG.info(_LI("vol=%s"), vol)
if vol.metadata:
d['metadata'] = vol.metadata
else:
d['metadata'] = {}
return d
class VolumeController(wsgi.Controller):
class VolumeController(volumes_v2.VolumeController):
"""The Volumes API controller for the OpenStack API."""
def __init__(self, ext_mgr):
self.volume_api = cinder_volume.API()
self.ext_mgr = ext_mgr
super(VolumeController, self).__init__()
def show(self, req, id):
"""Return data about the given volume."""
context = req.environ['cinder.context']
# Not found exception will be handled at the wsgi level
vol = self.volume_api.get(context, id, viewable_admin_meta=True)
req.cache_db_volume(vol)
utils.add_visible_admin_metadata(vol)
return {'volume': _translate_volume_detail_view(context, vol)}
def delete(self, req, id):
"""Delete a volume."""
context = req.environ['cinder.context']
LOG.info(_LI("Delete volume with id: %s"), id)
# Not found exception will be handled at the wsgi level
volume = self.volume_api.get(context, id)
self.volume_api.delete(context, volume)
return webob.Response(status_int=202)
return _volume_v2_to_v1(super(VolumeController, self).show(
req, id))
def index(self, req):
"""Returns a summary list of volumes."""
return self._items(req, entity_maker=_translate_volume_summary_view)
# The v1 info was much more detailed than the v2 non-detailed result
return _volume_v2_to_v1(
super(VolumeController, self).detail(req))
def detail(self, req):
"""Returns a detailed list of volumes."""
return self._items(req, entity_maker=_translate_volume_detail_view)
def _items(self, req, entity_maker):
"""Returns a list of volumes, transformed through entity_maker."""
# pop out limit and offset , they are not search_opts
search_opts = req.GET.copy()
search_opts.pop('limit', None)
search_opts.pop('offset', None)
for k, v in search_opts.items():
try:
search_opts[k] = ast.literal_eval(v)
except (ValueError, SyntaxError):
LOG.debug('Could not evaluate value %s, assuming string', v)
context = req.environ['cinder.context']
utils.remove_invalid_filter_options(context,
search_opts,
self._get_volume_search_options())
volumes = self.volume_api.get_all(context, marker=None, limit=None,
sort_keys=['created_at'],
sort_dirs=['desc'],
filters=search_opts,
viewable_admin_meta=True)
for volume in volumes:
utils.add_visible_admin_metadata(volume)
limited_list = common.limited(volumes.objects, req)
req.cache_db_volumes(limited_list)
res = [entity_maker(context, vol) for vol in limited_list]
return {'volumes': res}
def _image_uuid_from_href(self, image_href):
# If the image href was generated by nova api, strip image_href
# down to an id.
try:
image_uuid = image_href.split('/').pop()
except (TypeError, AttributeError):
msg = _("Invalid imageRef provided.")
raise exc.HTTPBadRequest(explanation=msg)
if not uuidutils.is_uuid_like(image_uuid):
msg = _("Invalid imageRef provided.")
raise exc.HTTPBadRequest(explanation=msg)
return image_uuid
return _volume_v2_to_v1(
super(VolumeController, self).detail(req))
@wsgi.response(200)
def create(self, req, body):
"""Creates a new volume."""
if not self.is_valid_body(body, 'volume'):
if (body is None or not body.get('volume') or
not isinstance(body['volume'], dict)):
raise exc.HTTPUnprocessableEntity()
LOG.debug('Create volume request body: %s', body)
context = req.environ['cinder.context']
volume = body['volume']
image_id = None
if body.get('volume'):
image_id = body['volume'].get('imageRef')
kwargs = {}
req_volume_type = volume.get('volume_type', None)
if req_volume_type:
# Not found exception will be handled at the wsgi level
kwargs['volume_type'] = (
objects.VolumeType.get_by_name_or_id(context, req_volume_type))
kwargs['metadata'] = volume.get('metadata', None)
snapshot_id = volume.get('snapshot_id')
if snapshot_id is not None:
if not uuidutils.is_uuid_like(snapshot_id):
msg = _("Snapshot ID must be in UUID form.")
raise exc.HTTPBadRequest(explanation=msg)
# Not found exception will be handled at the wsgi level
kwargs['snapshot'] = self.volume_api.get_snapshot(context,
snapshot_id)
else:
kwargs['snapshot'] = None
source_volid = volume.get('source_volid')
if source_volid is not None:
# Not found exception will be handled at the wsgi level
kwargs['source_volume'] = self.volume_api.get_volume(context,
source_volid)
else:
kwargs['source_volume'] = None
size = volume.get('size', None)
if size is None and kwargs['snapshot'] is not None:
size = kwargs['snapshot']['volume_size']
elif size is None and kwargs['source_volume'] is not None:
size = kwargs['source_volume']['size']
LOG.info(_LI("Create volume of %s GB"), size)
multiattach = volume.get('multiattach', False)
kwargs['multiattach'] = multiattach
image_href = None
image_uuid = None
if self.ext_mgr.is_loaded('os-image-create'):
# NOTE(jdg): misleading name "imageRef" as it's an image-id
image_href = volume.get('imageRef')
if image_href is not None:
image_uuid = self._image_uuid_from_href(image_href)
kwargs['image_id'] = image_uuid
kwargs['availability_zone'] = volume.get('availability_zone', None)
new_volume = self.volume_api.create(context,
size,
volume.get('display_name'),
volume.get('display_description'),
**kwargs)
retval = _translate_volume_detail_view(context, new_volume, image_uuid)
return {'volume': retval}
def _get_volume_search_options(self):
"""Return volume search options allowed by non-admin."""
return ('display_name', 'status', 'metadata')
try:
return _volume_v2_to_v1(
super(VolumeController, self).create(req, body),
image_id=image_id)
except exc.HTTPBadRequest as e:
# Image failures are the only ones that actually used
# HTTPBadRequest
error_msg = '%s' % e
if 'Invalid image' in error_msg:
raise
raise exc.HTTPUnprocessableEntity()
def update(self, req, id, body):
"""Update a volume."""
context = req.environ['cinder.context']
if not body:
if (body is None or not body.get('volume') or
not isinstance(body['volume'], dict)):
raise exc.HTTPUnprocessableEntity()
if 'volume' not in body:
try:
return _volume_v2_to_v1(super(VolumeController, self).update(
req, id, body))
except exc.HTTPBadRequest:
raise exc.HTTPUnprocessableEntity()
volume = body['volume']
update_dict = {}
valid_update_keys = (
'display_name',
'display_description',
'metadata',
)
for key in valid_update_keys:
if key in volume:
update_dict[key] = volume[key]
# Not found exception will be handled at the wsgi level
volume = self.volume_api.get(context, id, viewable_admin_meta=True)
volume_utils.notify_about_volume_usage(context, volume,
'update.start')
self.volume_api.update(context, volume, update_dict)
volume.update(update_dict)
utils.add_visible_admin_metadata(volume)
volume_utils.notify_about_volume_usage(context, volume,
'update.end')
return {'volume': _translate_volume_detail_view(context, volume)}
def create_resource(ext_mgr):
return wsgi.Resource(VolumeController(ext_mgr))

View File

@ -1,208 +0,0 @@
# Copyright 2010 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import iso8601
from cinder import exception as exc
from cinder import objects
from cinder.objects import fields
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_volume
DEFAULT_VOL_TYPE = "vol_type_name"
def stub_volume(id, **kwargs):
volume = {
'id': id,
'user_id': fake.USER_ID,
'project_id': fake.PROJECT_ID,
'host': 'fakehost',
'size': 1,
'availability_zone': 'fakeaz',
'attached_mode': 'rw',
'status': 'fakestatus',
'migration_status': None,
'attach_status': fields.VolumeAttachStatus.ATTACHED,
'bootable': False,
'name': 'vol name',
'display_name': 'displayname',
'display_description': 'displaydesc',
'updated_at': datetime.datetime(1900, 1, 1, 1, 1, 1,
tzinfo=iso8601.iso8601.Utc()),
'created_at': datetime.datetime(1900, 1, 1, 1, 1, 1,
tzinfo=iso8601.iso8601.Utc()),
'snapshot_id': None,
'source_volid': None,
'volume_type_id': fake.VOLUME_TYPE_ID,
'volume_admin_metadata': [{'key': 'attached_mode', 'value': 'rw'},
{'key': 'readonly', 'value': 'False'}],
'volume_type': fake_volume.fake_db_volume_type(name=DEFAULT_VOL_TYPE),
'volume_attachment': [],
'multiattach': False,
'readonly': 'False'}
volume.update(kwargs)
if kwargs.get('volume_glance_metadata', None):
volume['bootable'] = True
if kwargs.get('attach_status') == 'detached':
del volume['volume_admin_metadata'][0]
return volume
def stub_volume_create(self, context, size, name, description, snapshot,
**param):
vol = stub_volume(fake.VOLUME_ID)
vol['size'] = size
vol['display_name'] = name
vol['display_description'] = description
vol['source_volid'] = None
try:
vol['snapshot_id'] = snapshot['id']
except (KeyError, TypeError):
vol['snapshot_id'] = None
vol['availability_zone'] = param.get('availability_zone', 'fakeaz')
return vol
def stub_volume_api_create(self, context, *args, **kwargs):
vol = stub_volume_create(self, context, *args, **kwargs)
return fake_volume.fake_volume_obj(context, **vol)
def stub_volume_create_from_image(self, context, size, name, description,
snapshot, volume_type, metadata,
availability_zone):
vol = stub_volume('1')
vol['status'] = 'creating'
vol['size'] = size
vol['display_name'] = name
vol['display_description'] = description
vol['bootable'] = False
vol['availability_zone'] = 'cinder'
return vol
def stub_volume_update(self, context, *args, **param):
pass
def stub_volume_delete(self, context, *args, **param):
pass
def stub_volume_get(self, context, volume_id, viewable_admin_meta=False):
if volume_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.VolumeNotFound(volume_id)
if viewable_admin_meta:
return stub_volume(volume_id)
else:
volume = stub_volume(volume_id)
del volume['volume_admin_metadata']
return volume
def stub_volume_get_db(context, volume_id):
if context.is_admin:
return stub_volume(volume_id)
else:
volume = stub_volume(volume_id)
del volume['volume_admin_metadata']
return volume
def stub_volume_api_get(self, context, volume_id, viewable_admin_meta=False):
vol = stub_volume(volume_id)
return fake_volume.fake_volume_obj(context, **vol)
def stub_volume_api_get_all_by_project(self, context, marker, limit,
sort_keys=None, sort_dirs=None,
filters=None,
viewable_admin_meta=False,
offset=None):
vol = stub_volume_get(self, context, fake.VOLUME_ID,
viewable_admin_meta=viewable_admin_meta)
vol_obj = fake_volume.fake_volume_obj(context, **vol)
return objects.VolumeList(objects=[vol_obj])
def stub_volume_get_all(context, search_opts=None, marker=None, limit=None,
sort_keys=None, sort_dirs=None, filters=None,
viewable_admin_meta=False, offset=None):
return [stub_volume(fake.VOLUME_ID, project_id=fake.PROJECT_ID),
stub_volume(fake.VOLUME2_ID, project_id=fake.PROJECT2_ID),
stub_volume(fake.VOLUME3_ID, project_id=fake.PROJECT3_ID)]
def stub_volume_get_all_by_project(self, context, marker, limit,
sort_keys=None, sort_dirs=None,
filters=None,
viewable_admin_meta=False, offset=None):
return [stub_volume_get(self, context, fake.VOLUME_ID,
viewable_admin_meta=True)]
def stub_snapshot(id, **kwargs):
snapshot = {'id': id,
'volume_id': fake.SNAPSHOT_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'volume_size': 100,
'created_at': None,
'display_name': 'Default name',
'display_description': 'Default description',
'project_id': fake.PROJECT_ID,
'snapshot_metadata': []}
snapshot.update(kwargs)
return snapshot
def stub_snapshot_get_all(context, filters=None, marker=None, limit=None,
sort_keys=None, sort_dirs=None, offset=None):
return [stub_snapshot(fake.SNAPSHOT_ID, project_id=fake.PROJECT_ID),
stub_snapshot(fake.SNAPSHOT2_ID, project_id=fake.PROJECT2_ID),
stub_snapshot(fake.SNAPSHOT3_ID, project_id=fake.PROJECT3_ID)]
def stub_snapshot_get_all_by_project(context, project_id, filters=None,
marker=None, limit=None, sort_keys=None,
sort_dirs=None, offset=None):
return [stub_snapshot(fake.VOLUME_ID)]
def stub_snapshot_update(self, context, *args, **param):
pass
def stub_service_get_all(context, **filters):
return [{'availability_zone': "zone1:host1", "disabled": 0}]
def stub_volume_type_get(context, id, *args, **kwargs):
return {'id': id,
'name': 'vol_type_name',
'description': 'A fake volume type',
'is_public': True,
'projects': [],
'extra_specs': {},
'created_at': None,
'deleted_at': None,
'updated_at': None,
'qos_specs_id': fake.QOS_SPEC_ID,
'deleted': False}

View File

@ -15,8 +15,11 @@
import ddt
import mock
from oslo_config import cfg
from six.moves.urllib import parse as urllib
import webob
from cinder.api import common
from cinder.api.v1 import snapshots
from cinder import context
from cinder import db
@ -25,52 +28,51 @@ from cinder import objects
from cinder.objects import fields
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit.api.v1 import stubs
from cinder.tests.unit.api.v2 import fakes as v2_fakes
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit import utils
from cinder import volume
UUID = fake.SNAPSHOT_ID
INVALID_UUID = fake.WILL_NOT_BE_FOUND_ID
CONF = cfg.CONF
UUID = '00000000-0000-0000-0000-000000000003'
INVALID_UUID = '00000000-0000-0000-0000-000000000004'
def _get_default_snapshot_param():
return {'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'volume_size': 100,
'created_at': None,
'display_name': 'Default name',
'display_description': 'Default description', }
return {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'volume_size': 100,
'created_at': None,
'updated_at': None,
'user_id': 'bcb7746c7a41472d88a1ffac89ba6a9b',
'project_id': '7ffe17a15c724e2aa79fc839540aec15',
'display_name': 'Default name',
'display_description': 'Default description',
'deleted': None,
'volume': {'availability_zone': 'test_zone'}
}
def stub_snapshot_create(self, context,
volume_id, name,
description, metadata):
snapshot = _get_default_snapshot_param()
snapshot['volume_id'] = volume_id
snapshot['display_name'] = name
snapshot['display_description'] = description
snapshot['metadata'] = metadata
return snapshot
def stub_snapshot_delete(self, context, snapshot):
def fake_snapshot_delete(self, context, snapshot):
if snapshot['id'] != UUID:
raise exception.NotFound
raise exception.SnapshotNotFound(snapshot['id'])
def stub_snapshot_get(self, context, snapshot_id):
def fake_snapshot_get(self, context, snapshot_id):
if snapshot_id != UUID:
raise exception.NotFound
raise exception.SnapshotNotFound(snapshot_id)
param = _get_default_snapshot_param()
return param
def stub_snapshot_get_all(self, context, search_opts=None):
def fake_snapshot_get_all(self, context, search_opts=None):
param = _get_default_snapshot_param()
return [param]
@ -80,59 +82,68 @@ class SnapshotApiTest(test.TestCase):
def setUp(self):
super(SnapshotApiTest, self).setUp()
self.controller = snapshots.SnapshotsController()
self.ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
self.stubs.Set(db, 'snapshot_get_all_by_project',
stubs.stub_snapshot_get_all_by_project)
self.stubs.Set(db, 'snapshot_get_all',
stubs.stub_snapshot_get_all)
@mock.patch(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_snapshot_create(self, mock_validate):
volume = utils.create_volume(self.ctx)
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
snapshot = {
"volume_id": volume.id,
"force": False,
"name": snapshot_name,
"description": snapshot_description
}
def test_snapshot_create(self):
self.stubs.Set(volume.api.API, "create_snapshot", stub_snapshot_create)
self.stubs.Set(volume.api.API, 'get', stubs.stub_volume_get)
snapshot = {"volume_id": fake.VOLUME_ID,
"force": False,
"display_name": "Snapshot Test Name",
"display_description": "Snapshot Test Desc"}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v1/snapshots')
resp_dict = self.controller.create(req, body)
self.assertIn('snapshot', resp_dict)
self.assertEqual(snapshot['display_name'],
resp_dict['snapshot']['display_name'])
self.assertEqual(snapshot['display_description'],
self.assertEqual(snapshot_name, resp_dict['snapshot']['display_name'])
self.assertEqual(snapshot_description,
resp_dict['snapshot']['display_description'])
self.assertTrue(mock_validate.called)
self.assertNotIn('updated_at', resp_dict['snapshot'])
db.volume_destroy(self.ctx, volume.id)
@ddt.data(True, 'y', 'true', 'trUE', 'yes', '1', 1)
@ddt.data(True, 'y', 'true', 'trUE', 'yes', '1', 'on', 1, "1 ")
def test_snapshot_create_force(self, force_param):
self.stubs.Set(volume.api.API,
"create_snapshot_force",
stub_snapshot_create)
self.mock_object(volume.api.API, 'get', stubs.stub_volume_api_get)
snapshot = {"volume_id": fake.VOLUME_ID,
"force": force_param,
"display_name": "Snapshot Test Name",
"display_description": "Snapshot Test Desc"}
volume = utils.create_volume(self.ctx, status='in-use')
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
snapshot = {
"volume_id": volume.id,
"force": force_param,
"name": snapshot_name,
"description": snapshot_description
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v1/snapshots')
resp_dict = self.controller.create(req, body)
self.assertIn('snapshot', resp_dict)
self.assertEqual(snapshot['display_name'],
self.assertEqual(snapshot_name,
resp_dict['snapshot']['display_name'])
self.assertEqual(snapshot['display_description'],
self.assertEqual(snapshot_description,
resp_dict['snapshot']['display_description'])
self.assertNotIn('updated_at', resp_dict['snapshot'])
@ddt.data(False, 'n', 'false', 'falSE', 'No', '0', 0)
db.volume_destroy(self.ctx, volume.id)
@ddt.data(False, 'n', 'false', 'falSE', 'No', '0', 'off', 0)
def test_snapshot_create_force_failure(self, force_param):
self.stubs.Set(volume.api.API,
"create_snapshot_force",
stub_snapshot_create)
self.mock_object(volume.api.API, 'get', stubs.stub_volume_api_get)
snapshot = {"volume_id": fake.VOLUME_ID,
"force": force_param,
"display_name": "Snapshot Test Name",
"display_description": "Snapshot Test Desc"}
volume = utils.create_volume(self.ctx, status='in-use')
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
snapshot = {
"volume_id": volume.id,
"force": force_param,
"name": snapshot_name,
"description": snapshot_description
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v1/snapshots')
self.assertRaises(exception.InvalidVolume,
@ -140,16 +151,20 @@ class SnapshotApiTest(test.TestCase):
req,
body)
@ddt.data("**&&^^%%$$##@@", '-1', 2, '01', 'on', 'off', "1 ")
db.volume_destroy(self.ctx, volume.id)
@ddt.data("**&&^^%%$$##@@", '-1', 2, '01')
def test_snapshot_create_invalid_force_param(self, force_param):
self.stubs.Set(volume.api.API,
"create_snapshot_force",
stub_snapshot_create)
self.mock_object(volume.api.API, 'get', stubs.stub_volume_api_get)
snapshot = {"volume_id": fake.SNAPSHOT_ID,
"force": "**&&^^%%$$##@@",
"display_name": "Snapshot Test Name",
"display_description": "Snapshot Test Desc"}
volume = utils.create_volume(self.ctx, status='in-use')
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
snapshot = {
"volume_id": volume.id,
"force": force_param,
"name": snapshot_name,
"description": snapshot_description
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v1/snapshots')
self.assertRaises(exception.InvalidParameterValue,
@ -157,6 +172,8 @@ class SnapshotApiTest(test.TestCase):
req,
body)
db.volume_destroy(self.ctx, volume.id)
def test_snapshot_create_without_volume_id(self):
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
@ -172,12 +189,15 @@ class SnapshotApiTest(test.TestCase):
self.controller.create, req, body)
@mock.patch.object(volume.api.API, "update_snapshot",
side_effect=stubs.stub_snapshot_update)
side_effect=v2_fakes.fake_snapshot_update)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.objects.Volume.get_by_id')
@mock.patch('cinder.db.volume_get')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_snapshot_update(self, snapshot_get_by_id, volume_get_by_id,
snapshot_metadata_get, update_snapshot):
@mock.patch(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_snapshot_update(
self, mock_validate, snapshot_get_by_id, volume_get_by_id,
snapshot_metadata_get, update_snapshot):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
@ -187,26 +207,30 @@ class SnapshotApiTest(test.TestCase):
'display_description': 'Default description',
'expected_attrs': ['metadata'],
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
ctx = context.RequestContext(fake.PROJECT_ID, fake.USER_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get_by_id.return_value = fake_volume_obj
updates = {"display_name": "Updated Test Name", }
updates = {
"display_name": "Updated Test Name",
}
body = {"snapshot": updates}
req = fakes.HTTPRequest.blank('/v1/snapshots/%s' % UUID)
res_dict = self.controller.update(req, UUID, body)
expected = {'snapshot': {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'size': 100,
'created_at': None,
'display_name': u'Updated Test Name',
'display_description': u'Default description',
'metadata': {},
}}
expected = {
'snapshot': {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'size': 100,
'created_at': None,
'display_name': u'Updated Test Name',
'display_description': u'Default description',
'metadata': {},
}
}
self.assertEqual(expected, res_dict)
def test_snapshot_update_missing_body(self):
@ -216,23 +240,23 @@ class SnapshotApiTest(test.TestCase):
self.controller.update, req, UUID, body)
def test_snapshot_update_invalid_body(self):
body = {'display_name': 'missing top level snapshot key'}
body = {'name': 'missing top level snapshot key'}
req = fakes.HTTPRequest.blank('/v1/snapshots/%s' % UUID)
self.assertRaises(webob.exc.HTTPUnprocessableEntity,
self.controller.update, req, UUID, body)
def test_snapshot_update_not_found(self):
self.stubs.Set(volume.api.API, "get_snapshot", stub_snapshot_get)
self.mock_object(volume.api.API, "get_snapshot", fake_snapshot_get)
updates = {
"display_name": "Updated Test Name",
}
body = {"snapshot": updates}
req = fakes.HTTPRequest.blank('/v1/snapshots/not-the-uuid')
self.assertRaises(exception.NotFound, self.controller.update, req,
'not-the-uuid', body)
self.assertRaises(exception.SnapshotNotFound, self.controller.update,
req, 'not-the-uuid', body)
@mock.patch.object(volume.api.API, "delete_snapshot",
side_effect=stubs.stub_snapshot_update)
side_effect=v2_fakes.fake_snapshot_update)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.objects.Volume.get_by_id')
@mock.patch('cinder.objects.Snapshot.get_by_id')
@ -247,7 +271,7 @@ class SnapshotApiTest(test.TestCase):
'display_description': 'Default description',
'expected_attrs': ['metadata'],
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
ctx = context.RequestContext(fake.PROJECT_ID, fake.USER_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
snapshot_get_by_id.return_value = snapshot_obj
@ -259,13 +283,12 @@ class SnapshotApiTest(test.TestCase):
self.assertEqual(202, resp.status_int)
def test_snapshot_delete_invalid_id(self):
self.stubs.Set(volume.api.API, "delete_snapshot", stub_snapshot_delete)
self.mock_object(volume.api.API, "delete_snapshot",
fake_snapshot_delete)
snapshot_id = INVALID_UUID
req = fakes.HTTPRequest.blank('/v1/snapshots/%s' % snapshot_id)
self.assertRaises(exception.SnapshotNotFound,
self.controller.delete,
req,
snapshot_id)
self.assertRaises(exception.SnapshotNotFound, self.controller.delete,
req, snapshot_id)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.objects.Volume.get_by_id')
@ -281,25 +304,23 @@ class SnapshotApiTest(test.TestCase):
'display_description': 'Default description',
'expected_attrs': ['metadata'],
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
ctx = context.RequestContext(fake.PROJECT_ID, fake.USER_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get_by_id.return_value = fake_volume_obj
req = fakes.HTTPRequest.blank('/v1/snapshots/%s' % UUID)
resp_dict = self.controller.show(req, UUID)
self.assertIn('snapshot', resp_dict)
self.assertEqual(UUID, resp_dict['snapshot']['id'])
self.assertNotIn('updated_at', resp_dict['snapshot'])
def test_snapshot_show_invalid_id(self):
snapshot_id = INVALID_UUID
req = fakes.HTTPRequest.blank('/v1/snapshots/%s' % snapshot_id)
self.assertRaises(exception.SnapshotNotFound,
self.controller.show,
req,
snapshot_id)
self.controller.show, req, snapshot_id)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.objects.Volume.get_by_id')
@ -316,7 +337,7 @@ class SnapshotApiTest(test.TestCase):
'display_description': 'Default description',
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
ctx = context.RequestContext(fake.PROJECT_ID, fake.USER_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
snapshot_get_by_id.return_value = snapshot_obj
@ -330,10 +351,15 @@ class SnapshotApiTest(test.TestCase):
self.assertIn('snapshots', resp_dict)
resp_snapshots = resp_dict['snapshots']
self.assertEqual(1, len(resp_snapshots))
self.assertNotIn('updated_at', resp_snapshots[0])
resp_snapshot = resp_snapshots.pop()
self.assertEqual(UUID, resp_snapshot['id'])
@mock.patch.object(db, 'snapshot_get_all_by_project',
v2_fakes.fake_snapshot_get_all_by_project)
@mock.patch.object(db, 'snapshot_get_all',
v2_fakes.fake_snapshot_get_all)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_admin_list_snapshots_limited_to_project(self,
snapshot_metadata_get):
@ -347,55 +373,224 @@ class SnapshotApiTest(test.TestCase):
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_list_snapshots_with_limit_and_offset(self,
snapshot_metadata_get):
def list_snapshots_with_limit_and_offset(is_admin):
def stub_snapshot_get_all_by_project(context, project_id,
filters=None, marker=None,
limit=None, sort_keys=None,
sort_dirs=None, offset=None):
return [
stubs.stub_snapshot(fake.SNAPSHOT_ID,
display_name='backup1'),
stubs.stub_snapshot(fake.SNAPSHOT2_ID,
display_name='backup2'),
stubs.stub_snapshot(fake.SNAPSHOT3_ID,
display_name='backup3'),
]
self.stubs.Set(db, 'snapshot_get_all_by_project',
stub_snapshot_get_all_by_project)
req = fakes.HTTPRequest.blank('/v1/%s/snapshots?limit=1\
&offset=1' % fake.PROJECT_ID,
def list_snapshots_with_limit_and_offset(snaps, is_admin):
req = fakes.HTTPRequest.blank('/v1/%s/snapshots?limit=1'
'&offset=1' % fake.PROJECT_ID,
use_admin_context=is_admin)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
self.assertEqual(fake.SNAPSHOT2_ID, res['snapshots'][0]['id'])
self.assertEqual(snaps[1].id, res['snapshots'][0]['id'])
self.assertNotIn('updated_at', res['snapshots'][0])
# Test that we get an empty list with an offset greater than the
# number of items
req = fakes.HTTPRequest.blank('/v1/snapshots?limit=1&offset=3')
self.assertEqual({'snapshots': []}, self.controller.index(req))
volume, snaps = self._create_db_snapshots(3)
# admin case
list_snapshots_with_limit_and_offset(is_admin=True)
# non_admin case
list_snapshots_with_limit_and_offset(is_admin=False)
list_snapshots_with_limit_and_offset(snaps, is_admin=True)
# non-admin case
list_snapshots_with_limit_and_offset(snaps, is_admin=False)
@mock.patch.object(db, 'snapshot_get_all_by_project')
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_list_snpashots_with_wrong_limit_and_offset(self,
mock_metadata_get,
mock_snapshot_get_all):
"""Test list with negative and non numeric limit and offset."""
mock_snapshot_get_all.return_value = []
# Negative limit
req = fakes.HTTPRequest.blank('/v1/snapshots?limit=-1&offset=1')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index,
req)
# Non numeric limit
req = fakes.HTTPRequest.blank('/v1/snapshots?limit=a&offset=1')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index,
req)
# Negative offset
req = fakes.HTTPRequest.blank('/v1/snapshots?limit=1&offset=-1')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index,
req)
# Non numeric offset
req = fakes.HTTPRequest.blank('/v1/snapshots?limit=1&offset=a')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index,
req)
# Test that we get an exception HTTPBadRequest(400) with an offset
# greater than the maximum offset value.
url = '/v1/snapshots?limit=1&offset=323245324356534235'
req = fakes.HTTPRequest.blank(url)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index, req)
def _assert_list_next(self, expected_query=None, project=fake.PROJECT_ID,
**kwargs):
"""Check a page of snapshots list."""
# Since we are accessing v2 api directly we don't need to specify
# v2 in the request path, if we did, we'd get /v2/v2 links back
request_path = '/v2/%s/snapshots' % project
expected_path = request_path
# Construct the query if there are kwargs
if kwargs:
request_str = request_path + '?' + urllib.urlencode(kwargs)
else:
request_str = request_path
# Make the request
req = fakes.HTTPRequest.blank(request_str)
res = self.controller.index(req)
# We only expect to have a next link if there is an actual expected
# query.
if expected_query:
# We must have the links
self.assertIn('snapshots_links', res)
links = res['snapshots_links']
# Must be a list of links, even if we only get 1 back
self.assertTrue(list, type(links))
next_link = links[0]
# rel entry must be next
self.assertIn('rel', next_link)
self.assertIn('next', next_link['rel'])
# href entry must have the right path
self.assertIn('href', next_link)
href_parts = urllib.urlparse(next_link['href'])
self.assertEqual(expected_path, href_parts.path)
# And the query from the next link must match what we were
# expecting
params = urllib.parse_qs(href_parts.query)
self.assertDictEqual(expected_query, params)
# Make sure we don't have links if we were not expecting them
else:
self.assertNotIn('snapshots_links', res)
def _create_db_snapshots(self, num_snaps):
volume = utils.create_volume(self.ctx)
snaps = [utils.create_snapshot(self.ctx,
volume.id,
display_name='snap' + str(i))
for i in range(num_snaps)]
self.addCleanup(db.volume_destroy, self.ctx, volume.id)
for snap in snaps:
self.addCleanup(db.snapshot_destroy, self.ctx, snap.id)
snaps.reverse()
return volume, snaps
def test_list_snapshots_next_link_default_limit(self):
"""Test that snapshot list pagination is limited by osapi_max_limit."""
volume, snaps = self._create_db_snapshots(3)
# NOTE(geguileo): Since cinder.api.common.limited has already been
# imported his argument max_limit already has a default value of 1000
# so it doesn't matter that we change it to 2. That's why we need to
# mock it and send it current value. We still need to set the default
# value because other sections of the code use it, for example
# _get_collection_links
CONF.set_default('osapi_max_limit', 2)
def get_pagination_params(params, max_limit=CONF.osapi_max_limit,
original_call=common.get_pagination_params):
return original_call(params, max_limit)
def _get_limit_param(params, max_limit=CONF.osapi_max_limit,
original_call=common._get_limit_param):
return original_call(params, max_limit)
with mock.patch.object(common, 'get_pagination_params',
get_pagination_params), \
mock.patch.object(common, '_get_limit_param',
_get_limit_param):
# The link from the first page should link to the second
self._assert_list_next({'marker': [snaps[1].id]})
# Second page should have no next link
self._assert_list_next(marker=snaps[1].id)
def test_list_snapshots_next_link_with_limit(self):
"""Test snapshot list pagination with specific limit."""
volume, snaps = self._create_db_snapshots(2)
# The link from the first page should link to the second
self._assert_list_next({'limit': ['1'], 'marker': [snaps[0].id]},
limit=1)
# Even though there are no more elements, we should get a next element
# per specification.
expected = {'limit': ['1'], 'marker': [snaps[1].id]}
self._assert_list_next(expected, limit=1, marker=snaps[0].id)
# When we go beyond the number of elements there should be no more
# next links
self._assert_list_next(limit=1, marker=snaps[1].id)
@mock.patch.object(db, 'snapshot_get_all_by_project',
v2_fakes.fake_snapshot_get_all_by_project)
@mock.patch.object(db, 'snapshot_get_all',
v2_fakes.fake_snapshot_get_all)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_admin_list_snapshots_all_tenants(self, snapshot_metadata_get):
req = fakes.HTTPRequest.blank(
'/v1/%s/snapshots?all_tenants=1' % fake.PROJECT_ID,
use_admin_context=True)
req = fakes.HTTPRequest.blank('/v1/%s/snapshots?all_tenants=1' %
fake.PROJECT_ID,
use_admin_context=True)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(3, len(res['snapshots']))
@mock.patch.object(db, 'snapshot_get_all')
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_all_tenants_non_admin_gets_all_tenants(self,
snapshot_metadata_get):
req = fakes.HTTPRequest.blank(
'/v1/%s/snapshots?all_tenants=1' % fake.PROJECT_ID)
def test_admin_list_snapshots_by_tenant_id(self, snapshot_metadata_get,
snapshot_get_all):
def get_all(context, filters=None, marker=None, limit=None,
sort_keys=None, sort_dirs=None, offset=None):
if 'project_id' in filters and 'tenant1' in filters['project_id']:
return [v2_fakes.fake_snapshot(fake.VOLUME_ID,
tenant_id='tenant1')]
else:
return []
snapshot_get_all.side_effect = get_all
req = fakes.HTTPRequest.blank('/v1/%s/snapshots?all_tenants=1'
'&project_id=tenant1' % fake.PROJECT_ID,
use_admin_context=True)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
@mock.patch.object(db, 'snapshot_get_all_by_project',
v2_fakes.fake_snapshot_get_all_by_project)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_all_tenants_non_admin_gets_all_tenants(self,
snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v1/%s/snapshots?all_tenants=1' %
fake.PROJECT_ID)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
@mock.patch.object(db, 'snapshot_get_all_by_project',
v2_fakes.fake_snapshot_get_all_by_project)
@mock.patch.object(db, 'snapshot_get_all',
v2_fakes.fake_snapshot_get_all)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_non_admin_get_by_project(self, snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v1/%s/snapshots' % fake.PROJECT_ID)
@ -403,16 +598,7 @@ class SnapshotApiTest(test.TestCase):
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
class SnapshotsUnprocessableEntityTestCase(test.TestCase):
"""Tests of places we throw 422 Unprocessable Entity."""
def setUp(self):
super(SnapshotsUnprocessableEntityTestCase, self).setUp()
self.controller = snapshots.SnapshotsController()
def _unprocessable_snapshot_create(self, body):
def _create_snapshot_bad_body(self, body):
req = fakes.HTTPRequest.blank('/v1/%s/snapshots' % fake.PROJECT_ID)
req.method = 'POST'
@ -420,12 +606,12 @@ class SnapshotsUnprocessableEntityTestCase(test.TestCase):
self.controller.create, req, body)
def test_create_no_body(self):
self._unprocessable_snapshot_create(body=None)
self._create_snapshot_bad_body(body=None)
def test_create_missing_snapshot(self):
body = {'foo': {'a': 'b'}}
self._unprocessable_snapshot_create(body=body)
self._create_snapshot_bad_body(body=body)
def test_create_malformed_entity(self):
body = {'snapshot': 'string'}
self._unprocessable_snapshot_create(body=body)
self._create_snapshot_bad_body(body=body)

View File

@ -1,166 +0,0 @@
# Copyright 2011 OpenStack Foundation
# aLL Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from oslo_utils import timeutils
from cinder.api.v1 import types
from cinder.api.views import types as views_types
from cinder import exception
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit import fake_constants as fake
from cinder.volume import volume_types
def stub_volume_type(id):
specs = {"key1": "value1",
"key2": "value2",
"key3": "value3",
"key4": "value4",
"key5": "value5"}
return dict(id=id, name='vol_type_%s' % id, extra_specs=specs)
def return_volume_types_get_all_types(context, search_opts=None):
d = {}
for vtype in [fake.VOLUME_TYPE_ID, fake.VOLUME_TYPE2_ID,
fake.VOLUME_TYPE3_ID]:
vtype_name = 'vol_type_%s' % vtype
d[vtype_name] = stub_volume_type(vtype)
return d
def return_empty_volume_types_get_all_types(context, search_opts=None):
return {}
def return_volume_types_get_volume_type(context, id):
if id == fake.WILL_NOT_BE_FOUND_ID:
raise exception.VolumeTypeNotFound(volume_type_id=id)
return stub_volume_type(id)
class VolumeTypesApiTest(test.TestCase):
def setUp(self):
super(VolumeTypesApiTest, self).setUp()
self.controller = types.VolumeTypesController()
def test_volume_types_index(self):
self.stubs.Set(volume_types, 'get_all_types',
return_volume_types_get_all_types)
req = fakes.HTTPRequest.blank('/v1/%s/types' % fake.PROJECT_ID)
res_dict = self.controller.index(req)
self.assertEqual(3, len(res_dict['volume_types']))
expected_names = ['vol_type_%s' % fake.VOLUME_TYPE_ID,
'vol_type_%s' % fake.VOLUME_TYPE2_ID,
'vol_type_%s' % fake.VOLUME_TYPE3_ID]
actual_names = map(lambda e: e['name'], res_dict['volume_types'])
self.assertEqual(set(expected_names), set(actual_names))
for entry in res_dict['volume_types']:
self.assertEqual('value1', entry['extra_specs']['key1'])
def test_volume_types_index_no_data(self):
self.stubs.Set(volume_types, 'get_all_types',
return_empty_volume_types_get_all_types)
req = fakes.HTTPRequest.blank('/v1/%s/types' % fake.PROJECT_ID)
res_dict = self.controller.index(req)
self.assertEqual(0, len(res_dict['volume_types']))
def test_volume_types_show(self):
self.stubs.Set(volume_types, 'get_volume_type',
return_volume_types_get_volume_type)
type_id = fake.VOLUME_TYPE_ID
req = fakes.HTTPRequest.blank('/v1/%s/types/' % fake.PROJECT_ID
+ type_id)
res_dict = self.controller.show(req, type_id)
self.assertEqual(1, len(res_dict))
self.assertEqual(type_id, res_dict['volume_type']['id'])
vol_type_name = 'vol_type_' + type_id
self.assertEqual(vol_type_name, res_dict['volume_type']['name'])
def test_volume_types_show_not_found(self):
self.stubs.Set(volume_types, 'get_volume_type',
return_volume_types_get_volume_type)
req = fakes.HTTPRequest.blank('/v1/%s/types/%s' %
(fake.PROJECT_ID,
fake.WILL_NOT_BE_FOUND_ID))
self.assertRaises(exception.VolumeTypeNotFound, self.controller.show,
req, fake.WILL_NOT_BE_FOUND_ID)
def test_view_builder_show(self):
view_builder = views_types.ViewBuilder()
now = timeutils.utcnow().isoformat()
raw_volume_type = dict(name='new_type',
deleted=False,
created_at=now,
updated_at=now,
extra_specs={},
deleted_at=None,
description=None,
id=fake.VOLUME_TYPE_ID)
request = fakes.HTTPRequest.blank("/v1")
output = view_builder.show(request, raw_volume_type)
self.assertIn('volume_type', output)
expected_volume_type = dict(name='new_type',
extra_specs={},
description=None,
is_public=None,
id=fake.VOLUME_TYPE_ID)
self.assertDictEqual(expected_volume_type, output['volume_type'])
def test_view_builder_list(self):
view_builder = views_types.ViewBuilder()
now = timeutils.utcnow().isoformat()
raw_volume_types = []
volume_type_ids = []
for i in range(0, 10):
volume_type_id = str(uuid.uuid4())
volume_type_ids.append(volume_type_id)
raw_volume_types.append(dict(name='new_type',
deleted=False,
created_at=now,
updated_at=now,
extra_specs={},
deleted_at=None,
description=None,
id=volume_type_id))
request = fakes.HTTPRequest.blank("/v1")
output = view_builder.index(request, raw_volume_types)
self.assertIn('volume_types', output)
for i in range(0, 10):
expected_volume_type = dict(name='new_type',
extra_specs={},
id=volume_type_ids[i],
is_public=None,
description=None)
self.assertDictEqual(expected_volume_type,
output['volume_types'][i])

View File

@ -1,661 +0,0 @@
# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
import webob
from cinder.api import extensions
from cinder.api.v1 import volume_metadata
from cinder.api.v1 import volumes
import cinder.db
from cinder import exception as exc
from cinder import objects
from cinder.objects import fields
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit.api.v1 import stubs
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_volume
from cinder import volume
CONF = cfg.CONF
def return_create_volume_metadata_max(context, volume_id, metadata, delete):
return stub_max_volume_metadata(volume_id)
def return_create_volume_metadata(context, volume_id, metadata, delete,
meta_type):
return stub_volume_metadata(volume_id)
def return_new_volume_metadata(context, volume_id, metadata,
delete, meta_type):
return stub_new_volume_metadata(volume_id)
def return_create_volume_metadata_insensitive(context, volume_id,
metadata, delete,
meta_type):
return stub_volume_metadata_insensitive(volume_id)
def return_volume_metadata(context, volume_id):
return stub_volume_metadata(volume_id)
def return_empty_volume_metadata(context, volume_id):
if volume_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.VolumeNotFound(volume_id)
return {}
def return_empty_container_metadata(context, volume_id, metadata,
delete, meta_type):
if volume_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.VolumeNotFound(volume_id)
return {}
def delete_volume_metadata(context, volume_id, key, meta_type):
if volume_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.VolumeNotFound(volume_id)
pass
def stub_volume_metadata(volume_id):
if volume_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.VolumeNotFound(volume_id)
metadata = {
"key1": "value1",
"key2": "value2",
"key3": "value3",
}
return metadata
def stub_new_volume_metadata(volume_id):
if volume_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.VolumeNotFound(volume_id)
metadata = {
'key10': 'value10',
'key99': 'value99',
'KEY20': 'value20',
}
return metadata
def stub_volume_metadata_insensitive(volume_id):
if volume_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.VolumeNotFound(volume_id)
metadata = {
"key1": "value1",
"key2": "value2",
"key3": "value3",
"KEY4": "value4",
}
return metadata
def stub_max_volume_metadata(volume_id):
if volume_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.VolumeNotFound(volume_id)
metadata = {"metadata": {}}
for num in range(CONF.quota_metadata_items):
metadata['metadata']['key%i' % num] = "blah"
return metadata
def get_volume(self, context, volume_id, *args, **kwargs):
if volume_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.VolumeNotFound('bogus test message')
vol = {'id': volume_id,
'size': 100,
'name': 'fake',
'host': 'fake-host',
'status': 'available',
'encryption_key_id': None,
'volume_type_id': None,
'migration_status': None,
'availability_zone': 'zone1:host1',
'attach_status': fields.VolumeAttachStatus.DETACHED}
return fake_volume.fake_volume_obj(context, **vol)
class volumeMetaDataTest(test.TestCase):
def setUp(self):
super(volumeMetaDataTest, self).setUp()
self.volume_api = cinder.volume.api.API()
self.stubs.Set(volume.api.API, 'get', stubs.stub_volume_get)
self.stubs.Set(cinder.db, 'volume_metadata_get',
return_volume_metadata)
self.patch(
'cinder.db.service_get_all', autospec=True,
return_value=stubs.stub_service_get_all(None))
self.ext_mgr = extensions.ExtensionManager()
self.ext_mgr.extensions = {}
self.volume_controller = volumes.VolumeController(self.ext_mgr)
self.controller = volume_metadata.Controller()
self.url = '/v1/%s/volumes/%s/metadata' % (
fake.PROJECT_ID, fake.VOLUME_ID)
vol = {"size": 100,
"display_name": "Volume Test Name",
"display_description": "Volume Test Desc",
"availability_zone": "zone1:host1",
"metadata": {}}
body = {"volume": vol}
req = fakes.HTTPRequest.blank('/v1/volumes')
self.volume_controller.create(req, body)
def test_index(self):
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, fake.VOLUME_ID)
expected = {
'metadata': {
'key1': 'value1',
'key2': 'value2',
'key3': 'value3',
},
}
self.assertEqual(expected, res_dict)
def test_index_nonexistent_volume(self):
req = fakes.HTTPRequest.blank(self.url)
self.assertRaises(exc.VolumeNotFound,
self.controller.index,
req, fake.WILL_NOT_BE_FOUND_ID)
def test_index_no_metadata(self):
self.stubs.Set(cinder.db, 'volume_metadata_get',
return_empty_volume_metadata)
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, fake.VOLUME_ID)
expected = {'metadata': {}}
self.assertEqual(expected, res_dict)
def test_show(self):
req = fakes.HTTPRequest.blank(self.url + '/key2')
res_dict = self.controller.show(req, fake.VOLUME_ID, 'key2')
expected = {'meta': {'key2': 'value2'}}
self.assertEqual(expected, res_dict)
def test_show_nonexistent_volume(self):
req = fakes.HTTPRequest.blank(self.url + '/key2')
self.assertRaises(exc.VolumeNotFound,
self.controller.show, req,
fake.WILL_NOT_BE_FOUND_ID, 'key2')
def test_show_meta_not_found(self):
self.stubs.Set(cinder.db, 'volume_metadata_get',
return_empty_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key6')
self.assertRaises(exc.VolumeMetadataNotFound,
self.controller.show, req, fake.VOLUME_ID, 'key6')
@mock.patch.object(cinder.db, 'volume_metadata_delete')
@mock.patch.object(cinder.db, 'volume_metadata_get')
def test_delete(self, metadata_get, metadata_delete):
fake_volume = objects.Volume(id=fake.VOLUME_ID, status='available')
fake_context = mock.Mock()
metadata_get.side_effect = return_volume_metadata
metadata_delete.side_effect = delete_volume_metadata
req = fakes.HTTPRequest.blank(self.url + '/key2')
req.method = 'DELETE'
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
res = self.controller.delete(req, fake.VOLUME_ID, 'key2')
self.assertEqual(200, res.status_int)
get_volume.assert_called_with(fake_context, fake.VOLUME_ID)
@mock.patch.object(cinder.db, 'volume_metadata_get')
def test_delete_nonexistent_volume(self, metadata_get):
fake_context = mock.Mock()
metadata_get.side_effect = return_volume_metadata
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'DELETE'
req.environ['cinder.context'] = fake_context
self.assertRaises(exc.VolumeNotFound, self.controller.delete,
req, fake.WILL_NOT_BE_FOUND_ID, 'key1')
def test_delete_meta_not_found(self):
self.stubs.Set(cinder.db, 'volume_metadata_get',
return_empty_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key6')
req.method = 'DELETE'
self.assertRaises(exc.VolumeMetadataNotFound,
self.controller.delete, req, fake.VOLUME_ID, 'key6')
@mock.patch.object(cinder.db, 'volume_metadata_update')
@mock.patch.object(cinder.db, 'volume_metadata_get')
def test_create(self, metadata_get, metadata_update):
fake_volume = {'id': fake.VOLUME_ID, 'status': 'available'}
fake_context = mock.Mock()
metadata_get.side_effect = return_empty_volume_metadata
metadata_update.side_effect = return_create_volume_metadata
req = fakes.HTTPRequest.blank('/v1/volume_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key1": "value1",
"key2": "value2",
"key3": "value3", }}
req.body = jsonutils.dump_as_bytes(body)
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
res_dict = self.controller.create(req, fake.VOLUME_ID, body)
self.assertEqual(body, res_dict)
@mock.patch.object(cinder.db, 'volume_metadata_update')
@mock.patch.object(cinder.db, 'volume_metadata_get')
def test_create_with_keys_in_uppercase_and_lowercase(self, metadata_get,
metadata_update):
# if the keys in uppercase_and_lowercase, should return the one
# which server added
fake_volume = {'id': fake.VOLUME_ID, 'status': 'available'}
fake_context = mock.Mock()
metadata_get.side_effect = return_empty_volume_metadata
metadata_update.side_effect = return_create_volume_metadata_insensitive
req = fakes.HTTPRequest.blank('/v1/volume_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key1": "value1",
"KEY1": "value1",
"key2": "value2",
"KEY2": "value2",
"key3": "value3",
"KEY4": "value4"}}
expected = {"metadata": {"key1": "value1",
"key2": "value2",
"key3": "value3",
"KEY4": "value4"}}
req.body = jsonutils.dump_as_bytes(body)
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
res_dict = self.controller.create(req, fake.VOLUME_ID, body)
self.assertEqual(expected, res_dict)
def test_create_empty_body(self):
self.stubs.Set(cinder.db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'POST'
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, fake.VOLUME_ID, None)
def test_create_item_empty_key(self):
self.stubs.Set(cinder.db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"": "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, fake.VOLUME_ID, body)
def test_create_item_key_too_long(self):
self.stubs.Set(cinder.db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {("a" * 260): "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create,
req, fake.VOLUME_ID, body)
def test_create_nonexistent_volume(self):
self.stubs.Set(cinder.db, 'volume_metadata_get',
return_volume_metadata)
self.stubs.Set(cinder.db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank('/v1/volume_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key9": "value9"}}
req.body = jsonutils.dump_as_bytes(body)
self.assertRaises(exc.VolumeNotFound,
self.controller.create, req,
fake.WILL_NOT_BE_FOUND_ID, body)
@mock.patch.object(cinder.db, 'volume_metadata_update')
def test_update_all(self, metadata_update):
fake_volume = {'id': fake.VOLUME_ID, 'status': 'available'}
fake_context = mock.Mock()
metadata_update.side_effect = return_new_volume_metadata
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {
'metadata': {
'key10': 'value10',
'key99': 'value99',
'KEY20': 'value20',
},
}
req.body = jsonutils.dump_as_bytes(expected)
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
res_dict = self.controller.update_all(req, fake.VOLUME_ID,
expected)
self.assertEqual(expected, res_dict)
get_volume.assert_called_once_with(fake_context, fake.VOLUME_ID)
@mock.patch.object(cinder.db, 'volume_metadata_update')
@mock.patch.object(cinder.db, 'volume_metadata_get')
def test_update_all_with_keys_in_uppercase_and_lowercase(self,
metadata_get,
metadata_update):
fake_volume = {'id': fake.VOLUME_ID, 'status': 'available'}
fake_context = mock.Mock()
metadata_get.side_effect = return_create_volume_metadata
metadata_update.side_effect = return_new_volume_metadata
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
body = {
'metadata': {
'key10': 'value10',
'KEY10': 'value10',
'key99': 'value99',
'KEY20': 'value20',
},
}
expected = {
'metadata': {
'key10': 'value10',
'key99': 'value99',
'KEY20': 'value20',
},
}
req.body = jsonutils.dump_as_bytes(expected)
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
res_dict = self.controller.update_all(req, fake.VOLUME_ID, body)
self.assertEqual(expected, res_dict)
get_volume.assert_called_once_with(fake_context, fake.VOLUME_ID)
@mock.patch.object(cinder.db, 'volume_metadata_update')
def test_update_all_empty_container(self, metadata_update):
fake_volume = {'id': fake.VOLUME_ID, 'status': 'available'}
fake_context = mock.Mock()
metadata_update.side_effect = return_empty_container_metadata
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'metadata': {}}
req.body = jsonutils.dump_as_bytes(expected)
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
res_dict = self.controller.update_all(req, fake.VOLUME_ID,
expected)
self.assertEqual(expected, res_dict)
get_volume.assert_called_once_with(fake_context, fake.VOLUME_ID)
@mock.patch.object(cinder.db, 'volume_metadata_update')
def test_update_item_value_too_long(self, metadata_update):
fake_volume = {'id': fake.VOLUME_ID, 'status': 'available'}
fake_context = mock.Mock()
metadata_update.side_effect = return_create_volume_metadata
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": ("a" * 260)}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.update,
req, fake.VOLUME_ID, "key1", body)
self.assertFalse(metadata_update.called)
get_volume.assert_called_once_with(fake_context, fake.VOLUME_ID)
def test_update_all_malformed_container(self):
self.stubs.Set(cinder.db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'meta': {}}
req.body = jsonutils.dump_as_bytes(expected)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update_all, req, fake.VOLUME_ID,
expected)
@mock.patch.object(cinder.db, 'volume_metadata_update')
def test_update_all_malformed_data(self, metadata_update):
fake_volume = {'id': fake.VOLUME_ID, 'status': 'available'}
fake_context = mock.Mock()
metadata_update.side_effect = return_create_volume_metadata
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'metadata': ['asdf']}
req.body = jsonutils.dump_as_bytes(expected)
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update_all, req, fake.VOLUME_ID,
expected)
def test_update_all_nonexistent_volume(self):
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
body = {'metadata': {'key10': 'value10'}}
req.body = jsonutils.dump_as_bytes(body)
self.assertRaises(exc.VolumeNotFound,
self.controller.update_all, req,
fake.WILL_NOT_BE_FOUND_ID, body)
@mock.patch.object(cinder.db, 'volume_metadata_update')
def test_update_item(self, metadata_update):
fake_volume = {'id': fake.VOLUME_ID, 'status': 'available'}
fake_context = mock.Mock()
metadata_update.side_effect = return_create_volume_metadata
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
res_dict = self.controller.update(req, fake.VOLUME_ID, 'key1',
body)
expected = {'meta': {'key1': 'value1'}}
self.assertEqual(expected, res_dict)
get_volume.assert_called_once_with(fake_context, fake.VOLUME_ID)
def test_update_item_nonexistent_volume(self):
req = fakes.HTTPRequest.blank(
'/v1.1/%s/volumes/%s/metadata/key1' % (
fake.PROJECT_ID, fake.WILL_NOT_BE_FOUND_ID))
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(exc.VolumeNotFound,
self.controller.update, req,
fake.WILL_NOT_BE_FOUND_ID, 'key1',
body)
def test_update_item_empty_body(self):
self.stubs.Set(cinder.db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, fake.VOLUME_ID, 'key1',
None)
@mock.patch.object(cinder.db, 'volume_metadata_update')
def test_update_item_empty_key(self, metadata_update):
fake_volume = {'id': fake.VOLUME_ID, 'status': 'available'}
fake_context = mock.Mock()
metadata_update.side_effect = return_create_volume_metadata
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"": "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, fake.VOLUME_ID,
'', body)
self.assertFalse(metadata_update.called)
get_volume.assert_called_once_with(fake_context, fake.VOLUME_ID)
@mock.patch.object(cinder.db, 'volume_metadata_update')
def test_update_item_key_too_long(self, metadata_update):
fake_volume = {'id': fake.VOLUME_ID, 'status': 'available'}
fake_context = mock.Mock()
metadata_update.side_effect = return_create_volume_metadata
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {("a" * 260): "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.update,
req, fake.VOLUME_ID, ("a" * 260), body)
self.assertFalse(metadata_update.called)
get_volume.assert_called_once_with(fake_context, fake.VOLUME_ID)
def test_update_item_too_many_keys(self):
self.stubs.Set(cinder.db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1", "key2": "value2"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, fake.VOLUME_ID, 'key1',
body)
def test_update_item_body_uri_mismatch(self):
self.stubs.Set(cinder.db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/bad')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, fake.VOLUME_ID, 'bad',
body)
@mock.patch.object(cinder.db, 'volume_metadata_update')
def test_invalid_metadata_items_on_create(self, metadata_update):
fake_volume = {'id': fake.VOLUME_ID, 'status': 'available'}
fake_context = mock.Mock()
metadata_update.side_effect = return_create_volume_metadata
req = fakes.HTTPRequest.blank(self.url)
req.method = 'POST'
req.headers["content-type"] = "application/json"
# test for long key
data = {"metadata": {"a" * 260: "value1"}}
req.body = jsonutils.dump_as_bytes(data)
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req,
fake.VOLUME_ID, data)
# test for long value
data = {"metadata": {"key": "v" * 260}}
req.body = jsonutils.dump_as_bytes(data)
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req,
fake.VOLUME_ID, data)
# test for empty key.
data = {"metadata": {"": "value1"}}
req.body = jsonutils.dump_as_bytes(data)
req.environ['cinder.context'] = fake_context
with mock.patch.object(self.controller.volume_api,
'get') as get_volume:
get_volume.return_value = fake_volume
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req,
fake.VOLUME_ID, data)

View File

@ -13,12 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import iso8601
import ddt
import mock
from oslo_config import cfg
from six.moves import range
import webob
from cinder.api import extensions
@ -29,15 +31,13 @@ from cinder import exception as exc
from cinder.objects import fields
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit.api.v1 import stubs
from cinder.tests.unit.api.v2 import fakes as v2_fakes
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_volume
from cinder.tests.unit.image import fake as fake_image
from cinder.tests.unit import utils
from cinder.volume import api as volume_api
NS = '{http://docs.openstack.org/api/openstack-block-storage/1.0/content}'
CONF = cfg.CONF
@ -49,17 +49,16 @@ class VolumeApiTest(test.TestCase):
self.ext_mgr.extensions = {}
fake_image.mock_image_service(self)
self.controller = volumes.VolumeController(self.ext_mgr)
self.stubs.Set(db, 'volume_get_all', stubs.stub_volume_get_all)
self.patch(
'cinder.db.service_get_all', autospec=True,
return_value=stubs.stub_service_get_all(None))
self.stubs.Set(volume_api.API, 'delete', stubs.stub_volume_delete)
self.maxDiff = None
self.ctxt = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
def test_volume_create(self):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
self.mock_object(volume_api.API, 'get', v2_fakes.fake_volume_get)
self.mock_object(volume_api.API, "create",
v2_fakes.fake_volume_api_create)
self.mock_object(db.sqlalchemy.api, '_volume_type_get_full',
v2_fakes.fake_volume_type_get)
vol = {"size": 100,
"display_name": "Volume Test Name",
"display_description": "Volume Test Desc",
@ -86,12 +85,17 @@ class VolumeApiTest(test.TestCase):
'encrypted': False}}
self.assertEqual(expected, res_dict)
def test_volume_create_with_type(self):
vol_type = CONF.default_volume_type
db.volume_type_create(context.get_admin_context(),
dict(name=vol_type, extra_specs={}))
db_vol_type = db.volume_type_get_by_name(context.get_admin_context(),
vol_type)
@mock.patch.object(db, 'service_get_all',
return_value=v2_fakes.fake_service_get_all_by_topic(
None, None),
autospec=True)
def test_volume_create_with_type(self, mock_service_get):
vol_type = db.volume_type_create(
context.get_admin_context(),
dict(name=CONF.default_volume_type, extra_specs={})
)
db_vol_type = db.volume_type_get(context.get_admin_context(),
vol_type.id)
vol = {"size": 100,
"display_name": "Volume Test Name",
@ -103,6 +107,7 @@ class VolumeApiTest(test.TestCase):
# Raise 404 when type name isn't valid
self.assertRaises(exc.VolumeTypeNotFoundByName,
self.controller.create, req, body)
# Use correct volume type name
vol.update(dict(volume_type=CONF.default_volume_type))
body.update(dict(volume=vol))
@ -145,9 +150,10 @@ class VolumeApiTest(test.TestCase):
req, body)
def test_volume_create_with_image_id(self):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
self.mock_object(volume_api.API, "create",
v2_fakes.fake_volume_api_create)
self.mock_object(db.sqlalchemy.api, '_volume_type_get_full',
v2_fakes.fake_volume_type_get)
self.ext_mgr.extensions = {'os-image-create': 'fake'}
test_id = "c905cedb-7281-47e4-8a62-f26bc5fc4c77"
@ -180,8 +186,9 @@ class VolumeApiTest(test.TestCase):
self.assertEqual(expected, res_dict)
def test_volume_create_with_image_id_is_integer(self):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)
self.mock_object(volume_api.API, "create", v2_fakes.fake_volume_create)
self.ext_mgr.extensions = {'os-image-create': 'fake'}
vol = {"size": '1',
"display_name": "Volume Test Name",
"display_description": "Volume Test Desc",
@ -195,7 +202,10 @@ class VolumeApiTest(test.TestCase):
body)
def test_volume_create_with_image_id_not_uuid_format(self):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)
self.mock_object(volume_api.API, "create", v2_fakes.fake_volume_create)
self.mock_object(fake_image._FakeImageService,
"detail",
v2_fakes.fake_image_service_detail)
self.ext_mgr.extensions = {'os-image-create': 'fake'}
vol = {"size": '1',
"display_name": "Volume Test Name",
@ -210,7 +220,10 @@ class VolumeApiTest(test.TestCase):
body)
def test_volume_create_with_image_id_with_empty_string(self):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)
self.mock_object(volume_api.API, "create", v2_fakes.fake_volume_create)
self.mock_object(fake_image._FakeImageService,
"detail",
v2_fakes.fake_image_service_detail)
self.ext_mgr.extensions = {'os-image-create': 'fake'}
vol = {"size": 1,
"display_name": "Volume Test Name",
@ -224,16 +237,15 @@ class VolumeApiTest(test.TestCase):
req,
body)
@mock.patch.object(db, 'volume_admin_metadata_get',
return_value={'attached_mode': 'rw',
'readonly': 'False'})
@mock.patch.object(db.sqlalchemy.api, '_volume_type_get_full',
side_effect=stubs.stub_volume_type_get)
@mock.patch.object(volume_api.API, 'get',
side_effect=stubs.stub_volume_api_get, autospec=True)
@mock.patch.object(volume_api.API, 'update',
side_effect=stubs.stub_volume_update, autospec=True)
def test_volume_update(self, *args):
def test_volume_update(self):
self.mock_object(volume_api.API, 'get', v2_fakes.fake_volume_api_get)
self.mock_object(volume_api.API, "update", v2_fakes.fake_volume_update)
self.mock_object(db.sqlalchemy.api, '_volume_type_get_full',
v2_fakes.fake_volume_type_get)
self.mock_object(db, 'volume_admin_metadata_get',
return_value={'attached_mode': 'rw',
'readonly': 'False'})
updates = {
"display_name": "Updated Test Name",
}
@ -262,19 +274,14 @@ class VolumeApiTest(test.TestCase):
self.assertEqual(expected, res_dict)
self.assertEqual(2, len(self.notifier.notifications))
@mock.patch.object(db, 'volume_admin_metadata_get',
return_value={"qos_max_iops": 2000,
"readonly": "False",
"attached_mode": "rw"})
@mock.patch.object(db.sqlalchemy.api, '_volume_type_get_full',
side_effect=stubs.stub_volume_type_get)
@mock.patch.object(volume_api.API, 'get',
side_effect=stubs.stub_volume_api_get, autospec=True)
@mock.patch.object(volume_api.API, 'update',
side_effect=stubs.stub_volume_update, autospec=True)
def test_volume_update_metadata(self, *args):
def test_volume_update_metadata(self):
self.mock_object(volume_api.API, 'get', v2_fakes.fake_volume_api_get)
self.mock_object(volume_api.API, "update", v2_fakes.fake_volume_update)
self.mock_object(db.sqlalchemy.api, '_volume_type_get_full',
v2_fakes.fake_volume_type_get)
updates = {
"metadata": {"qos_max_iops": 2000}
"metadata": {"qos_max_iops": '2000'}
}
body = {"volume": updates}
req = fakes.HTTPRequest.blank('/v1/volumes/%s' % fake.VOLUME_ID)
@ -304,14 +311,9 @@ class VolumeApiTest(test.TestCase):
self.assertEqual(2, len(self.notifier.notifications))
def test_volume_update_with_admin_metadata(self):
def stubs_volume_admin_metadata_get(context, volume_id):
return {'key': 'value',
'readonly': 'True'}
self.stubs.Set(db, 'volume_admin_metadata_get',
stubs_volume_admin_metadata_get)
self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)
self.mock_object(volume_api.API, "update", v2_fakes.fake_volume_update)
volume = stubs.stub_volume(fake.VOLUME_ID)
volume = v2_fakes.create_fake_volume(fake.VOLUME_ID)
del volume['name']
del volume['volume_type']
del volume['volume_type_id']
@ -380,12 +382,13 @@ class VolumeApiTest(test.TestCase):
req, fake.VOLUME_ID, body)
def test_update_not_found(self):
self.mock_object(volume_api.API, "get",
v2_fakes.fake_volume_get_notfound)
updates = {
"display_name": "Updated Test Name",
"name": "Updated Test Name",
}
body = {"volume": updates}
body = {"volume": updates}
req = fakes.HTTPRequest.blank(
'/v1/volumes/%s' % fake.WILL_NOT_BE_FOUND_ID)
self.assertRaises(exc.VolumeNotFound,
@ -393,16 +396,10 @@ class VolumeApiTest(test.TestCase):
req, fake.WILL_NOT_BE_FOUND_ID, body)
def test_volume_list(self):
def stubs_volume_admin_metadata_get(context, volume_id):
return {'attached_mode': 'rw',
'readonly': 'False'}
self.stubs.Set(db, 'volume_admin_metadata_get',
stubs_volume_admin_metadata_get)
self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
self.stubs.Set(volume_api.API, 'get_all',
stubs.stub_volume_api_get_all_by_project)
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
self.mock_object(volume_api.API, 'get_all',
v2_fakes.fake_volume_api_get_all_by_project)
self.mock_object(db.sqlalchemy.api, '_volume_type_get_full',
v2_fakes.fake_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes')
res_dict = self.controller.index(req)
@ -429,7 +426,7 @@ class VolumeApiTest(test.TestCase):
self.assertEqual(1, len(req.cached_resource()))
def test_volume_list_with_admin_metadata(self):
volume = stubs.stub_volume(fake.VOLUME_ID)
volume = v2_fakes.create_fake_volume(fake.VOLUME_ID)
del volume['name']
del volume['volume_type']
del volume['volume_type_id']
@ -475,17 +472,14 @@ class VolumeApiTest(test.TestCase):
'size': 1}]}
self.assertEqual(expected, res_dict)
@mock.patch.object(db, 'volume_admin_metadata_get',
return_value={'attached_mode': 'rw',
'readonly': 'False'})
def test_volume_list_detail(self, *args):
self.stubs.Set(volume_api.API, 'get_all',
stubs.stub_volume_api_get_all_by_project)
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
def test_volume_list_detail(self):
self.mock_object(volume_api.API, 'get_all',
v2_fakes.fake_volume_api_get_all_by_project)
self.mock_object(db.sqlalchemy.api, '_volume_type_get_full',
v2_fakes.fake_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/detail')
res_dict = self.controller.index(req)
res_dict = self.controller.detail(req)
expected = {'volumes': [{'status': 'fakestatus',
'display_description': 'displaydesc',
'availability_zone': 'fakeaz',
@ -509,7 +503,7 @@ class VolumeApiTest(test.TestCase):
self.assertEqual(1, len(req.cached_resource()))
def test_volume_list_detail_with_admin_metadata(self):
volume = stubs.stub_volume(fake.VOLUME_ID)
volume = v2_fakes.create_fake_volume(fake.VOLUME_ID)
del volume['name']
del volume['volume_type']
del volume['volume_type_id']
@ -555,14 +549,11 @@ class VolumeApiTest(test.TestCase):
'size': 1}]}
self.assertEqual(expected, res_dict)
@mock.patch.object(db, 'volume_admin_metadata_get',
return_value={'attached_mode': 'rw',
'readonly': 'False'})
@mock.patch.object(volume_api.API, 'get',
side_effect=stubs.stub_volume_api_get, autospec=True)
@mock.patch.object(db.sqlalchemy.api, '_volume_type_get_full',
side_effect=stubs.stub_volume_type_get, autospec=True)
def test_volume_show(self, *args):
def test_volume_show(self):
self.mock_object(volume_api.API, 'get', v2_fakes.fake_volume_api_get)
self.mock_object(db.sqlalchemy.api, '_volume_type_get_full',
v2_fakes.fake_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/%s' % fake.VOLUME_ID)
res_dict = self.controller.show(req, fake.VOLUME_ID)
expected = {'volume': {'status': 'fakestatus',
@ -588,15 +579,22 @@ class VolumeApiTest(test.TestCase):
self.assertIsNotNone(req.cached_resource_by_id(fake.VOLUME_ID))
def test_volume_show_no_attachments(self):
def stub_volume_get(self, context, volume_id, **kwargs):
vol = stubs.stub_volume(
def fake_volume_get(self, context, volume_id, **kwargs):
vol = v2_fakes.create_fake_volume(
volume_id,
attach_status = fields.VolumeAttachStatus.DETACHED)
attach_status=fields.VolumeAttachStatus.DETACHED)
return fake_volume.fake_volume_obj(context, **vol)
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
def fake_volume_admin_metadata_get(context, volume_id, **kwargs):
return v2_fakes.fake_volume_admin_metadata_get(
context, volume_id,
attach_status=fields.VolumeAttachStatus.DETACHED)
self.mock_object(volume_api.API, 'get', fake_volume_get)
self.mock_object(db, 'volume_admin_metadata_get',
fake_volume_admin_metadata_get)
self.mock_object(db.sqlalchemy.api, '_volume_type_get_full',
v2_fakes.fake_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/%s' % fake.VOLUME_ID)
res_dict = self.controller.show(req, fake.VOLUME_ID)
@ -617,41 +615,12 @@ class VolumeApiTest(test.TestCase):
1900, 1, 1, 1, 1, 1,
tzinfo=iso8601.iso8601.Utc()),
'size': 1}}
self.assertEqual(expected, res_dict)
def test_volume_show_bootable(self):
def stub_volume_get(self, context, volume_id, **kwargs):
vol = (stubs.stub_volume(volume_id,
volume_glance_metadata=dict(foo='bar')))
return fake_volume.fake_volume_obj(context, **vol)
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/%s' % fake.VOLUME_ID)
res_dict = self.controller.show(req, fake.VOLUME_ID)
expected = {'volume': {'status': 'fakestatus',
'display_description': 'displaydesc',
'availability_zone': 'fakeaz',
'display_name': 'displayname',
'encrypted': False,
'attachments': [],
'multiattach': 'false',
'bootable': 'true',
'volume_type': 'vol_type_name',
'snapshot_id': None,
'source_volid': None,
'metadata': {'attached_mode': 'rw',
'readonly': 'False'},
'id': fake.VOLUME_ID,
'created_at': datetime.datetime(
1900, 1, 1, 1, 1, 1,
tzinfo=iso8601.iso8601.Utc()),
'size': 1}}
self.assertEqual(expected, res_dict)
def test_volume_show_no_volume(self):
self.mock_object(volume_api.API, "get",
v2_fakes.fake_volume_get_notfound)
req = fakes.HTTPRequest.blank(
'/v1/volumes/%s' % fake.WILL_NOT_BE_FOUND_ID)
@ -659,34 +628,28 @@ class VolumeApiTest(test.TestCase):
self.controller.show,
req,
fake.WILL_NOT_BE_FOUND_ID)
# Finally test that we did not cache anything
# Finally test that nothing was cached
self.assertIsNone(req.cached_resource_by_id(fake.WILL_NOT_BE_FOUND_ID))
def _create_db_volumes(self, num_volumes):
volumes = [utils.create_volume(self.ctxt, display_name='vol%s' % i)
for i in range(num_volumes)]
for vol in volumes:
self.addCleanup(db.volume_destroy, self.ctxt, vol.id)
volumes.reverse()
return volumes
def test_volume_detail_limit_offset(self):
created_volumes = self._create_db_volumes(2)
def volume_detail_limit_offset(is_admin):
def stub_volume_get_all_by_project(context, project_id, marker,
limit, sort_keys=None,
sort_dirs=None, filters=None,
viewable_admin_meta=False,
offset=None):
return [
stubs.stub_volume(fake.VOLUME_ID, display_name='vol1'),
stubs.stub_volume(fake.VOLUME2_ID, display_name='vol2'),
]
self.stubs.Set(db, 'volume_get_all_by_project',
stub_volume_get_all_by_project)
self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/detail?limit=2\
&offset=1',
req = fakes.HTTPRequest.blank('/v1/volumes/detail?limit=2'
'&offset=1',
use_admin_context=is_admin)
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
self.assertEqual(1, len(volumes))
self.assertEqual(fake.VOLUME2_ID, volumes[0]['id'])
self.assertEqual(created_volumes[1].id, volumes[0]['id'])
# admin case
volume_detail_limit_offset(is_admin=True)
@ -694,7 +657,7 @@ class VolumeApiTest(test.TestCase):
volume_detail_limit_offset(is_admin=False)
def test_volume_show_with_admin_metadata(self):
volume = stubs.stub_volume(fake.VOLUME_ID)
volume = v2_fakes.create_fake_volume(fake.VOLUME_ID)
del volume['name']
del volume['volume_type']
del volume['volume_type_id']
@ -741,48 +704,49 @@ class VolumeApiTest(test.TestCase):
self.assertEqual(expected, res_dict)
def test_volume_show_with_encrypted_volume(self):
def stub_volume_get(self, context, volume_id, **kwargs):
vol = stubs.stub_volume(volume_id, encryption_key_id=fake.KEY_ID)
def fake_volume_get(self, context, volume_id, **kwargs):
vol = v2_fakes.create_fake_volume(volume_id,
encryption_key_id=fake.KEY_ID)
return fake_volume.fake_volume_obj(context, **vol)
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
self.mock_object(volume_api.API, 'get', fake_volume_get)
self.mock_object(db.sqlalchemy.api, '_volume_type_get_full',
v2_fakes.fake_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/%s' % fake.VOLUME_ID)
res_dict = self.controller.show(req, fake.VOLUME_ID)
self.assertTrue(res_dict['volume']['encrypted'])
def test_volume_show_with_unencrypted_volume(self):
self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
self.mock_object(volume_api.API, 'get', v2_fakes.fake_volume_api_get)
self.mock_object(db.sqlalchemy.api, '_volume_type_get_full',
v2_fakes.fake_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/%s' % fake.VOLUME_ID)
res_dict = self.controller.show(req, fake.VOLUME_ID)
self.assertEqual(False, res_dict['volume']['encrypted'])
@mock.patch.object(volume_api.API, 'delete', v2_fakes.fake_volume_delete)
@mock.patch.object(volume_api.API, 'get', v2_fakes.fake_volume_get)
def test_volume_delete(self):
self.stubs.Set(db.sqlalchemy.api, 'volume_get',
stubs.stub_volume_get_db)
req = fakes.HTTPRequest.blank('/v1/volumes/%s' % fake.VOLUME_ID)
resp = self.controller.delete(req, fake.VOLUME_ID)
self.assertEqual(202, resp.status_int)
def test_volume_delete_no_volume(self):
self.mock_object(volume_api.API, "get",
v2_fakes.fake_volume_get_notfound)
req = fakes.HTTPRequest.blank(
'/v1/volumes/%s' % fake.WILL_NOT_BE_FOUND_ID)
self.assertRaises(exc.VolumeNotFound,
self.controller.delete,
req,
fake.WILL_NOT_BE_FOUND_ID)
req, fake.WILL_NOT_BE_FOUND_ID)
def test_admin_list_volumes_limited_to_project(self):
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
self.mock_object(db, 'volume_get_all_by_project',
v2_fakes.fake_volume_get_all_by_project)
req = fakes.HTTPRequest.blank('/v1/%s/volumes' % fake.PROJECT_ID,
use_admin_context=True)
res = self.controller.index(req)
@ -790,9 +754,10 @@ class VolumeApiTest(test.TestCase):
self.assertIn('volumes', res)
self.assertEqual(1, len(res['volumes']))
@mock.patch.object(db, 'volume_get_all', v2_fakes.fake_volume_get_all)
@mock.patch.object(db, 'volume_get_all_by_project',
v2_fakes.fake_volume_get_all_by_project)
def test_admin_list_volumes_all_tenants(self):
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank(
'/v1/%s/volumes?all_tenants=1' % fake.PROJECT_ID,
use_admin_context=True)
@ -800,91 +765,26 @@ class VolumeApiTest(test.TestCase):
self.assertIn('volumes', res)
self.assertEqual(3, len(res['volumes']))
@mock.patch.object(db, 'volume_get_all', v2_fakes.fake_volume_get_all)
@mock.patch.object(db, 'volume_get_all_by_project',
v2_fakes.fake_volume_get_all_by_project)
@mock.patch.object(volume_api.API, 'get', v2_fakes.fake_volume_get)
def test_all_tenants_non_admin_gets_all_tenants(self):
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank(
'/v1/%s/volumes?all_tenants=1' % fake.PROJECT_ID)
res = self.controller.index(req)
self.assertIn('volumes', res)
self.assertEqual(1, len(res['volumes']))
@mock.patch.object(db, 'volume_get_all_by_project',
v2_fakes.fake_volume_get_all_by_project)
@mock.patch.object(volume_api.API, 'get', v2_fakes.fake_volume_get)
def test_non_admin_get_by_project(self):
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/%s/volumes' % fake.PROJECT_ID)
res = self.controller.index(req)
self.assertIn('volumes', res)
self.assertEqual(1, len(res['volumes']))
@mock.patch('cinder.volume.api.API.get_all')
def test_get_volumes_filter_with_string(self, get_all):
req = mock.MagicMock()
req.GET.copy.return_value = {'display_name': 'Volume-573108026'}
context = mock.Mock()
req.environ = {'cinder.context': context}
self.controller._items(req, mock.Mock)
get_all.assert_called_once_with(
context, sort_dirs=['desc'], viewable_admin_meta=True,
sort_keys=['created_at'], limit=None,
filters={'display_name': 'Volume-573108026'}, marker=None)
@mock.patch('cinder.volume.api.API.get_all')
def test_get_volumes_filter_with_list(self, get_all):
req = mock.MagicMock()
req.GET.copy.return_value = {'id': "['1', '2', '3']"}
context = mock.Mock()
req.environ = {'cinder.context': context}
self.controller._items(req, mock.Mock)
get_all.assert_called_once_with(
context, sort_dirs=['desc'], viewable_admin_meta=True,
sort_keys=['created_at'], limit=None,
filters={'id': ['1', '2', '3']}, marker=None)
@mock.patch('cinder.volume.api.API.get_all')
def test_get_volumes_filter_with_expression(self, get_all):
req = mock.MagicMock()
req.GET.copy.return_value = {'id': "d+"}
context = mock.Mock()
req.environ = {'cinder.context': context}
self.controller._items(req, mock.Mock)
get_all.assert_called_once_with(
context, sort_dirs=['desc'], viewable_admin_meta=True,
sort_keys=['created_at'], limit=None, filters={'id': 'd+'},
marker=None)
@ddt.data({'s': 'ea895e29-8485-4930-bbb8-c5616a309c0e'},
['ea895e29-8485-4930-bbb8-c5616a309c0e'],
42)
def test_volume_creation_fails_with_invalid_snapshot_type(self, value):
snapshot_id = value
vol = {"size": 1,
"snapshot_id": snapshot_id}
body = {"volume": vol}
req = fakes.HTTPRequest.blank('/v1/volumes')
# Raise 400 when snapshot has not uuid type.
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
req, body)
class VolumesUnprocessableEntityTestCase(test.TestCase):
"""Tests of places we throw 422 Unprocessable Entity from."""
def setUp(self):
super(VolumesUnprocessableEntityTestCase, self).setUp()
self.ext_mgr = extensions.ExtensionManager()
self.ext_mgr.extensions = {}
self.controller = volumes.VolumeController(self.ext_mgr)
def _unprocessable_volume_create(self, body):
req = fakes.HTTPRequest.blank('/v1/%s/volumes' % fake.PROJECT_ID)
req.method = 'POST'