Merge "tests: Merge v2, v3 snapshot API tests"

This commit is contained in:
Zuul
2025-10-31 21:01:12 +00:00
committed by Gerrit Code Review
2 changed files with 715 additions and 751 deletions

View File

@@ -1,748 +0,0 @@
# Copyright 2011 Denali Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from http import HTTPStatus
from unittest import mock
from urllib import parse as urllib
from zoneinfo import ZoneInfo
import ddt
from oslo_config import cfg
import webob
from cinder.api import common
from cinder.api.v2 import snapshots
from cinder import context
from cinder import db
from cinder import exception
from cinder import objects
from cinder.objects import fields
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder.tests.unit.api import fakes
from cinder.tests.unit.api.v2 import fakes as v2_fakes
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit import test
from cinder.tests.unit import utils
from cinder import volume
CONF = cfg.CONF
UUID = '00000000-0000-0000-0000-000000000001'
INVALID_UUID = '00000000-0000-0000-0000-000000000002'
def _get_default_snapshot_param():
return {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'volume_size': 100,
'created_at': None,
'updated_at': None,
'user_id': 'bcb7746c7a41472d88a1ffac89ba6a9b',
'project_id': '7ffe17a15c724e2aa79fc839540aec15',
'display_name': 'Default name',
'display_description': 'Default description',
'deleted': None,
'volume': {'availability_zone': 'test_zone'}
}
def fake_snapshot_delete(self, context, snapshot):
if snapshot['id'] != UUID:
raise exception.SnapshotNotFound(snapshot['id'])
def fake_snapshot_get(self, context, snapshot_id):
if snapshot_id != UUID:
raise exception.SnapshotNotFound(snapshot_id)
param = _get_default_snapshot_param()
return param
def fake_snapshot_get_all(self, context, search_opts=None):
param = _get_default_snapshot_param()
return [param]
@ddt.ddt
class SnapshotApiTest(test.TestCase):
def setUp(self):
super(SnapshotApiTest, self).setUp()
self.mock_object(scheduler_rpcapi.SchedulerAPI, 'create_snapshot')
self.controller = snapshots.SnapshotsController()
self.ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
def test_snapshot_create(self):
volume = utils.create_volume(self.ctx, volume_type_id=None)
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
snapshot = {
"volume_id": volume.id,
"force": False,
"name": snapshot_name,
"description": snapshot_description
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v3/snapshots')
resp_dict = self.controller.create(req, body=body)
self.assertIn('snapshot', resp_dict)
self.assertEqual(snapshot_name, resp_dict['snapshot']['name'])
self.assertEqual(snapshot_description,
resp_dict['snapshot']['description'])
self.assertIn('updated_at', resp_dict['snapshot'])
db.volume_destroy(self.ctx, volume.id)
def test_snapshot_create_with_null_validate(self):
volume = utils.create_volume(self.ctx, volume_type_id=None)
snapshot = {
"volume_id": volume.id,
"force": False,
"name": None,
"description": None
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v3/snapshots')
resp_dict = self.controller.create(req, body=body)
self.assertIn('snapshot', resp_dict)
self.assertIsNone(resp_dict['snapshot']['name'])
self.assertIsNone(resp_dict['snapshot']['description'])
db.volume_destroy(self.ctx, volume.id)
@ddt.data(True, 'y', 'true', 'yes', '1', 'on')
def test_snapshot_create_force(self, force_param):
volume = utils.create_volume(self.ctx, status='in-use',
volume_type_id=None)
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
snapshot = {
"volume_id": volume.id,
"force": force_param,
"name": snapshot_name,
"description": snapshot_description
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v3/snapshots')
resp_dict = self.controller.create(req, body=body)
self.assertIn('snapshot', resp_dict)
self.assertEqual(snapshot_name,
resp_dict['snapshot']['name'])
self.assertEqual(snapshot_description,
resp_dict['snapshot']['description'])
self.assertIn('updated_at', resp_dict['snapshot'])
db.volume_destroy(self.ctx, volume.id)
@ddt.data(False, 'n', 'false', 'No', '0', 'off')
def test_snapshot_create_force_failure(self, force_param):
volume = utils.create_volume(self.ctx, status='in-use',
volume_type_id=None)
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
snapshot = {
"volume_id": volume.id,
"force": force_param,
"name": snapshot_name,
"description": snapshot_description
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v3/snapshots')
self.assertRaises(exception.InvalidVolume,
self.controller.create,
req,
body=body)
db.volume_destroy(self.ctx, volume.id)
@ddt.data("**&&^^%%$$##@@", '-1', 2, '01', 'falSE', 0, 'trUE', 1,
"1 ")
def test_snapshot_create_invalid_force_param(self, force_param):
volume = utils.create_volume(self.ctx, status='available',
volume_type_id=None)
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
snapshot = {
"volume_id": volume.id,
"force": force_param,
"name": snapshot_name,
"description": snapshot_description
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v3/snapshots')
self.assertRaises(exception.ValidationError,
self.controller.create,
req,
body=body)
db.volume_destroy(self.ctx, volume.id)
def test_snapshot_create_without_volume_id(self):
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
body = {
"snapshot": {
"force": True,
"name": snapshot_name,
"description": snapshot_description
}
}
req = fakes.HTTPRequest.blank('/v3/snapshots')
self.assertRaises(exception.ValidationError,
self.controller.create, req, body=body)
@ddt.data({"snapshot": {"description": " sample description",
"name": " test"}},
{"snapshot": {"description": "sample description ",
"name": "test "}},
{"snapshot": {"description": " sample description ",
"name": " test name "}})
def test_snapshot_create_with_leading_trailing_spaces(self, body):
volume = utils.create_volume(self.ctx, volume_type_id=None)
body['snapshot']['volume_id'] = volume.id
req = fakes.HTTPRequest.blank('/v3/snapshots')
resp_dict = self.controller.create(req, body=body)
self.assertEqual(body['snapshot']['display_name'].strip(),
resp_dict['snapshot']['name'])
self.assertEqual(body['snapshot']['description'].strip(),
resp_dict['snapshot']['description'])
db.volume_destroy(self.ctx, volume.id)
@mock.patch.object(volume.api.API, "update_snapshot",
side_effect=v2_fakes.fake_snapshot_update)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.db.volume_get')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_snapshot_update(
self, snapshot_get_by_id, volume_get,
snapshot_metadata_get, update_snapshot):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'created_at': "2014-01-01 00:00:00",
'volume_size': 100,
'display_name': 'Default name',
'display_description': 'Default description',
'expected_attrs': ['metadata'],
}
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get.return_value = fake_volume_obj
updates = {
"name": "Updated Test Name",
}
body = {"snapshot": updates}
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
req.environ['cinder.context'] = self.ctx
res_dict = self.controller.update(req, UUID, body=body)
expected = {
'snapshot': {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'size': 100,
'created_at': datetime.datetime(2014, 1, 1, 0, 0, 0,
tzinfo=ZoneInfo('UTC')),
'updated_at': None,
'name': u'Updated Test Name',
'description': u'Default description',
'metadata': {},
}
}
self.assertEqual(expected, res_dict)
self.assertEqual(2, len(self.notifier.notifications))
@mock.patch.object(volume.api.API, "update_snapshot",
side_effect=v2_fakes.fake_snapshot_update)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.db.volume_get')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_snapshot_update_with_null_validate(
self, snapshot_get_by_id, volume_get,
snapshot_metadata_get, update_snapshot):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'created_at': "2014-01-01 00:00:00",
'volume_size': 100,
'name': 'Default name',
'description': 'Default description',
'expected_attrs': ['metadata'],
}
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get.return_value = fake_volume_obj
updates = {
"name": None,
"description": None,
}
body = {"snapshot": updates}
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
req.environ['cinder.context'] = self.ctx
res_dict = self.controller.update(req, UUID, body=body)
self.assertEqual(fields.SnapshotStatus.AVAILABLE,
res_dict['snapshot']['status'])
self.assertIsNone(res_dict['snapshot']['name'])
self.assertIsNone(res_dict['snapshot']['description'])
def test_snapshot_update_missing_body(self):
body = {}
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
self.assertRaises(exception.ValidationError,
self.controller.update, req, UUID, body=body)
def test_snapshot_update_invalid_body(self):
body = {'name': 'missing top level snapshot key'}
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
self.assertRaises(exception.ValidationError,
self.controller.update, req, UUID, body=body)
def test_snapshot_update_not_found(self):
self.mock_object(volume.api.API, "get_snapshot", fake_snapshot_get)
updates = {
"name": "Updated Test Name",
}
body = {"snapshot": updates}
req = fakes.HTTPRequest.blank('/v3/snapshots/not-the-uuid')
self.assertRaises(exception.SnapshotNotFound, self.controller.update,
req, 'not-the-uuid', body=body)
@mock.patch.object(volume.api.API, "update_snapshot",
side_effect=v2_fakes.fake_snapshot_update)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.db.volume_get')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_snapshot_update_with_leading_trailing_spaces(
self, snapshot_get_by_id, volume_get,
snapshot_metadata_get, update_snapshot):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'created_at': "2018-01-14 00:00:00",
'volume_size': 100,
'display_name': 'Default name',
'display_description': 'Default description',
'expected_attrs': ['metadata'],
}
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get.return_value = fake_volume_obj
updates = {
"name": " test ",
"description": " test "
}
body = {"snapshot": updates}
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
req.environ['cinder.context'] = self.ctx
res_dict = self.controller.update(req, UUID, body=body)
expected = {
'snapshot': {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'size': 100,
'created_at': datetime.datetime(2018, 1, 14, 0, 0, 0,
tzinfo=ZoneInfo('UTC')),
'updated_at': None,
'name': u'test',
'description': u'test',
'metadata': {},
}
}
self.assertEqual(expected, res_dict)
self.assertEqual(2, len(self.notifier.notifications))
@mock.patch.object(volume.api.API, "delete_snapshot",
side_effect=v2_fakes.fake_snapshot_update)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.objects.Volume.get_by_id')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_snapshot_delete(self, snapshot_get_by_id, volume_get_by_id,
snapshot_metadata_get, delete_snapshot):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'volume_size': 100,
'display_name': 'Default name',
'display_description': 'Default description',
'expected_attrs': ['metadata'],
}
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get_by_id.return_value = fake_volume_obj
snapshot_id = UUID
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
req.environ['cinder.context'] = self.ctx
resp = self.controller.delete(req, snapshot_id)
self.assertEqual(HTTPStatus.ACCEPTED, resp.status_int)
def test_snapshot_delete_invalid_id(self):
self.mock_object(volume.api.API, "delete_snapshot",
fake_snapshot_delete)
snapshot_id = INVALID_UUID
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
self.assertRaises(exception.SnapshotNotFound, self.controller.delete,
req, snapshot_id)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.objects.Volume.get_by_id')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_snapshot_show(self, snapshot_get_by_id, volume_get_by_id,
snapshot_metadata_get):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'volume_size': 100,
'display_name': 'Default name',
'display_description': 'Default description',
'expected_attrs': ['metadata'],
}
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get_by_id.return_value = fake_volume_obj
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
req.environ['cinder.context'] = self.ctx
resp_dict = self.controller.show(req, UUID)
self.assertIn('snapshot', resp_dict)
self.assertEqual(UUID, resp_dict['snapshot']['id'])
self.assertIn('updated_at', resp_dict['snapshot'])
def test_snapshot_show_invalid_id(self):
snapshot_id = INVALID_UUID
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
self.assertRaises(exception.SnapshotNotFound,
self.controller.show, req, snapshot_id)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.objects.Volume.get_by_id')
@mock.patch('cinder.objects.Snapshot.get_by_id')
@mock.patch('cinder.volume.api.API.get_all_snapshots')
def test_snapshot_detail(self, get_all_snapshots, snapshot_get_by_id,
volume_get_by_id, snapshot_metadata_get):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'volume_size': 100,
'display_name': 'Default name',
'display_description': 'Default description',
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.PROJECT_ID, fake.USER_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get_by_id.return_value = fake_volume_obj
snapshots = objects.SnapshotList(objects=[snapshot_obj])
get_all_snapshots.return_value = snapshots
req = fakes.HTTPRequest.blank('/v3/snapshots/detail')
resp_dict = self.controller.detail(req)
self.assertIn('snapshots', resp_dict)
resp_snapshots = resp_dict['snapshots']
self.assertEqual(1, len(resp_snapshots))
self.assertIn('updated_at', resp_snapshots[0])
resp_snapshot = resp_snapshots.pop()
self.assertEqual(UUID, resp_snapshot['id'])
@mock.patch.object(db, 'snapshot_get_all_by_project',
v2_fakes.fake_snapshot_get_all_by_project)
@mock.patch.object(db, 'snapshot_get_all',
v2_fakes.fake_snapshot_get_all)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_admin_list_snapshots_limited_to_project(self,
snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v3/%s/snapshots' % fake.PROJECT_ID,
use_admin_context=True)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_list_snapshots_with_limit_and_offset(self,
snapshot_metadata_get):
def list_snapshots_with_limit_and_offset(snaps, is_admin):
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?limit=1'
'&offset=1' % fake.PROJECT_ID,
use_admin_context=is_admin)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
self.assertEqual(snaps[1].id, res['snapshots'][0]['id'])
self.assertIn('updated_at', res['snapshots'][0])
# Test that we get an empty list with an offset greater than the
# number of items
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=1&offset=3')
self.assertEqual({'snapshots': []}, self.controller.index(req))
volume, snaps = self._create_db_snapshots(3)
# admin case
list_snapshots_with_limit_and_offset(snaps, is_admin=True)
# non-admin case
list_snapshots_with_limit_and_offset(snaps, is_admin=False)
@mock.patch.object(db, 'snapshot_get_all_by_project')
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_list_snapshots_with_wrong_limit_and_offset(self,
mock_metadata_get,
mock_snapshot_get_all):
"""Test list with negative and non numeric limit and offset."""
mock_snapshot_get_all.return_value = []
# Negative limit
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=-1&offset=1')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index,
req)
# Non numeric limit
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=a&offset=1')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index,
req)
# Negative offset
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=1&offset=-1')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index,
req)
# Non numeric offset
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=1&offset=a')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index,
req)
# Test that we get an exception HTTPBadRequest(400) with an offset
# greater than the maximum offset value.
url = '/v3/snapshots?limit=1&offset=323245324356534235'
req = fakes.HTTPRequest.blank(url)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index, req)
def _assert_list_next(self, expected_query=None, project=fake.PROJECT_ID,
**kwargs):
"""Check a page of snapshots list."""
# Since we are accessing v2 api directly we don't need to specify
# v2 in the request path, if we did, we'd get /v3/v2 links back
request_path = '/v3/%s/snapshots' % project
expected_path = request_path
# Construct the query if there are kwargs
if kwargs:
request_str = request_path + '?' + urllib.urlencode(kwargs)
else:
request_str = request_path
# Make the request
req = fakes.HTTPRequest.blank(request_str)
res = self.controller.index(req)
# We only expect to have a next link if there is an actual expected
# query.
if expected_query:
# We must have the links
self.assertIn('snapshots_links', res)
links = res['snapshots_links']
# Must be a list of links, even if we only get 1 back
self.assertIsInstance(links, list)
next_link = links[0]
# rel entry must be next
self.assertIn('rel', next_link)
self.assertIn('next', next_link['rel'])
# href entry must have the right path
self.assertIn('href', next_link)
href_parts = urllib.urlparse(next_link['href'])
self.assertEqual(expected_path, href_parts.path)
# And the query from the next link must match what we were
# expecting
params = urllib.parse_qs(href_parts.query)
self.assertDictEqual(expected_query, params)
# Make sure we don't have links if we were not expecting them
else:
self.assertNotIn('snapshots_links', res)
def _create_db_snapshots(self, num_snaps):
volume = utils.create_volume(self.ctx, volume_type_id=None)
snaps = [utils.create_snapshot(self.ctx,
volume.id,
display_name='snap' + str(i))
for i in range(num_snaps)]
self.addCleanup(db.volume_destroy, self.ctx, volume.id)
for snap in snaps:
self.addCleanup(db.snapshot_destroy, self.ctx, snap.id)
snaps.reverse()
return volume, snaps
def test_list_snapshots_next_link_default_limit(self):
"""Test that snapshot list pagination is limited by osapi_max_limit."""
volume, snaps = self._create_db_snapshots(3)
# NOTE(geguileo): Since cinder.api.common.limited has already been
# imported his argument max_limit already has a default value of 1000
# so it doesn't matter that we change it to 2. That's why we need to
# mock it and send it current value. We still need to set the default
# value because other sections of the code use it, for example
# _get_collection_links
CONF.set_default('osapi_max_limit', 2)
def get_pagination_params(params, max_limit=CONF.osapi_max_limit,
original_call=common.get_pagination_params):
return original_call(params, max_limit)
def _get_limit_param(params, max_limit=CONF.osapi_max_limit,
original_call=common._get_limit_param):
return original_call(params, max_limit)
with mock.patch.object(common, 'get_pagination_params',
get_pagination_params), \
mock.patch.object(common, '_get_limit_param',
_get_limit_param):
# The link from the first page should link to the second
self._assert_list_next({'marker': [snaps[1].id]})
# Second page should have no next link
self._assert_list_next(marker=snaps[1].id)
def test_list_snapshots_next_link_with_limit(self):
"""Test snapshot list pagination with specific limit."""
volume, snaps = self._create_db_snapshots(2)
# The link from the first page should link to the second
self._assert_list_next({'limit': ['1'], 'marker': [snaps[0].id]},
limit=1)
# Even though there are no more elements, we should get a next element
# per specification.
expected = {'limit': ['1'], 'marker': [snaps[1].id]}
self._assert_list_next(expected, limit=1, marker=snaps[0].id)
# When we go beyond the number of elements there should be no more
# next links
self._assert_list_next(limit=1, marker=snaps[1].id)
@mock.patch.object(db, 'snapshot_get_all_by_project',
v2_fakes.fake_snapshot_get_all_by_project)
@mock.patch.object(db, 'snapshot_get_all',
v2_fakes.fake_snapshot_get_all)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_admin_list_snapshots_all_tenants(self, snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?all_tenants=1' %
fake.PROJECT_ID,
use_admin_context=True)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(3, len(res['snapshots']))
@mock.patch.object(db, 'snapshot_get_all')
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_admin_list_snapshots_by_tenant_id(self, snapshot_metadata_get,
snapshot_get_all):
def get_all(context, filters=None, marker=None, limit=None,
sort_keys=None, sort_dirs=None, offset=None):
if 'project_id' in filters and 'tenant1' in filters['project_id']:
return [v2_fakes.fake_snapshot(fake.VOLUME_ID,
tenant_id='tenant1')]
else:
return []
snapshot_get_all.side_effect = get_all
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?all_tenants=1'
'&project_id=tenant1' % fake.PROJECT_ID,
use_admin_context=True)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
@mock.patch.object(db, 'snapshot_get_all_by_project',
v2_fakes.fake_snapshot_get_all_by_project)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_all_tenants_non_admin_gets_all_tenants(self,
snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?all_tenants=1' %
fake.PROJECT_ID)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
@mock.patch.object(db, 'snapshot_get_all_by_project',
v2_fakes.fake_snapshot_get_all_by_project)
@mock.patch.object(db, 'snapshot_get_all',
v2_fakes.fake_snapshot_get_all)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_non_admin_get_by_project(self, snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v3/%s/snapshots' % fake.PROJECT_ID)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
def _create_snapshot_bad_body(self, body):
req = fakes.HTTPRequest.blank('/v3/%s/snapshots' % fake.PROJECT_ID)
req.method = 'POST'
self.assertRaises(exception.ValidationError,
self.controller.create, req, body=body)
def test_create_no_body(self):
self._create_snapshot_bad_body(body=None)
def test_create_missing_snapshot(self):
body = {'foo': {'a': 'b'}}
self._create_snapshot_bad_body(body=body)
def test_create_malformed_entity(self):
body = {'snapshot': 'string'}
self._create_snapshot_bad_body(body=body)

View File

@@ -13,17 +13,25 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from http import HTTPStatus
from unittest import mock
import urllib.parse
from zoneinfo import ZoneInfo
import ddt
from oslo_config import cfg
from oslo_utils import strutils
import webob
from webob import exc
from cinder.api import common
from cinder.api import microversions as mv
from cinder.api.v3 import snapshots
from cinder import context
from cinder import db
from cinder import exception
from cinder import objects
from cinder.objects import fields
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder.tests.unit.api import fakes
@@ -35,11 +43,13 @@ from cinder.tests.unit import test
from cinder.tests.unit import utils as test_utils
from cinder import volume
CONF = cfg.CONF
UUID = '00000000-0000-0000-0000-000000000001'
INVALID_UUID = '00000000-0000-0000-0000-000000000002'
def fake_get(self, context, *args, **kwargs):
def fake_volume_get(self, context, *args, **kwargs):
vol = {'id': fake.VOLUME_ID,
'size': 100,
'name': 'fake',
@@ -54,6 +64,41 @@ def fake_get(self, context, *args, **kwargs):
return fake_volume.fake_volume_obj(context, **vol)
def _get_default_snapshot_param():
return {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'volume_size': 100,
'created_at': None,
'updated_at': None,
'user_id': 'bcb7746c7a41472d88a1ffac89ba6a9b',
'project_id': '7ffe17a15c724e2aa79fc839540aec15',
'display_name': 'Default name',
'display_description': 'Default description',
'deleted': None,
'volume': {'availability_zone': 'test_zone'}
}
def fake_snapshot_delete(self, context, snapshot):
if snapshot['id'] != UUID:
raise exception.SnapshotNotFound(snapshot['id'])
def fake_snapshot_get(self, context, snapshot_id):
if snapshot_id != UUID:
raise exception.SnapshotNotFound(snapshot_id)
param = _get_default_snapshot_param()
return param
def fake_snapshot_get_all(self, context, search_opts=None):
param = _get_default_snapshot_param()
return [param]
def create_snapshot_query_with_metadata(metadata_query_string,
api_microversion):
"""Helper to create metadata querystring with microversion"""
@@ -68,8 +113,8 @@ def create_snapshot_query_with_metadata(metadata_query_string,
@ddt.ddt
class SnapshotApiTest(test.TestCase):
def setUp(self):
super(SnapshotApiTest, self).setUp()
self.mock_object(volume.api.API, 'get', fake_get)
super().setUp()
self.mock_object(volume.api.API, 'get', fake_volume_get)
self.mock_object(db.sqlalchemy.api, 'volume_type_get',
v2_fakes.fake_volume_type_get)
self.patch('cinder.quota.QUOTAS.reserve')
@@ -455,3 +500,670 @@ class SnapshotApiTest(test.TestCase):
self.controller.create(req, body=body)
# ... but also shouldn't allow an in-use snapshot
self.assertNotIn('allow_in_use', mock_create.call_args_list[0][1])
@ddt.ddt
class SnapshotApiTestNoMicroversion(test.TestCase):
def setUp(self):
super().setUp()
self.mock_object(scheduler_rpcapi.SchedulerAPI, 'create_snapshot')
self.controller = snapshots.SnapshotsController()
self.ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
def test_snapshot_create(self):
volume = test_utils.create_volume(self.ctx, volume_type_id=None)
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
snapshot = {
"volume_id": volume.id,
"force": False,
"name": snapshot_name,
"description": snapshot_description
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v3/snapshots')
resp_dict = self.controller.create(req, body=body)
self.assertIn('snapshot', resp_dict)
self.assertEqual(snapshot_name, resp_dict['snapshot']['name'])
self.assertEqual(snapshot_description,
resp_dict['snapshot']['description'])
self.assertIn('updated_at', resp_dict['snapshot'])
db.volume_destroy(self.ctx, volume.id)
def test_snapshot_create_with_null_validate(self):
volume = test_utils.create_volume(self.ctx, volume_type_id=None)
snapshot = {
"volume_id": volume.id,
"force": False,
"name": None,
"description": None
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v3/snapshots')
resp_dict = self.controller.create(req, body=body)
self.assertIn('snapshot', resp_dict)
self.assertIsNone(resp_dict['snapshot']['name'])
self.assertIsNone(resp_dict['snapshot']['description'])
db.volume_destroy(self.ctx, volume.id)
@ddt.data(True, 'y', 'true', 'yes', '1', 'on')
def test_snapshot_create_force(self, force_param):
volume = test_utils.create_volume(
self.ctx, status='in-use', volume_type_id=None)
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
snapshot = {
"volume_id": volume.id,
"force": force_param,
"name": snapshot_name,
"description": snapshot_description
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v3/snapshots')
resp_dict = self.controller.create(req, body=body)
self.assertIn('snapshot', resp_dict)
self.assertEqual(snapshot_name,
resp_dict['snapshot']['name'])
self.assertEqual(snapshot_description,
resp_dict['snapshot']['description'])
self.assertIn('updated_at', resp_dict['snapshot'])
db.volume_destroy(self.ctx, volume.id)
@ddt.data(False, 'n', 'false', 'No', '0', 'off')
def test_snapshot_create_force_failure(self, force_param):
volume = test_utils.create_volume(
self.ctx, status='in-use', volume_type_id=None)
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
snapshot = {
"volume_id": volume.id,
"force": force_param,
"name": snapshot_name,
"description": snapshot_description
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v3/snapshots')
self.assertRaises(exception.InvalidVolume,
self.controller.create,
req,
body=body)
db.volume_destroy(self.ctx, volume.id)
@ddt.data("**&&^^%%$$##@@", '-1', 2, '01', 'falSE', 0, 'trUE', 1,
"1 ")
def test_snapshot_create_invalid_force_param(self, force_param):
volume = test_utils.create_volume(
self.ctx, status='available', volume_type_id=None)
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
snapshot = {
"volume_id": volume.id,
"force": force_param,
"name": snapshot_name,
"description": snapshot_description
}
body = dict(snapshot=snapshot)
req = fakes.HTTPRequest.blank('/v3/snapshots')
self.assertRaises(exception.ValidationError,
self.controller.create,
req,
body=body)
db.volume_destroy(self.ctx, volume.id)
def test_snapshot_create_without_volume_id(self):
snapshot_name = 'Snapshot Test Name'
snapshot_description = 'Snapshot Test Desc'
body = {
"snapshot": {
"force": True,
"name": snapshot_name,
"description": snapshot_description
}
}
req = fakes.HTTPRequest.blank('/v3/snapshots')
self.assertRaises(exception.ValidationError,
self.controller.create, req, body=body)
@ddt.data({"snapshot": {"description": " sample description",
"name": " test"}},
{"snapshot": {"description": "sample description ",
"name": "test "}},
{"snapshot": {"description": " sample description ",
"name": " test name "}})
def test_snapshot_create_with_leading_trailing_spaces(self, body):
volume = test_utils.create_volume(self.ctx, volume_type_id=None)
body['snapshot']['volume_id'] = volume.id
req = fakes.HTTPRequest.blank('/v3/snapshots')
resp_dict = self.controller.create(req, body=body)
self.assertEqual(body['snapshot']['display_name'].strip(),
resp_dict['snapshot']['name'])
self.assertEqual(body['snapshot']['description'].strip(),
resp_dict['snapshot']['description'])
db.volume_destroy(self.ctx, volume.id)
@mock.patch.object(volume.api.API, "update_snapshot",
side_effect=v2_fakes.fake_snapshot_update)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.db.volume_get')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_snapshot_update(
self, snapshot_get_by_id, volume_get,
snapshot_metadata_get, update_snapshot):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'created_at': "2014-01-01 00:00:00",
'volume_size': 100,
'display_name': 'Default name',
'display_description': 'Default description',
'expected_attrs': ['metadata'],
}
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get.return_value = fake_volume_obj
updates = {
"name": "Updated Test Name",
}
body = {"snapshot": updates}
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
req.environ['cinder.context'] = self.ctx
res_dict = self.controller.update(req, UUID, body=body)
expected = {
'snapshot': {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'size': 100,
'created_at': datetime.datetime(
2014, 1, 1, 0, 0, 0, tzinfo=ZoneInfo('UTC'),
),
'updated_at': None,
'name': u'Updated Test Name',
'description': u'Default description',
'metadata': {},
}
}
self.assertEqual(expected, res_dict)
self.assertEqual(2, len(self.notifier.notifications))
@mock.patch.object(volume.api.API, "update_snapshot",
side_effect=v2_fakes.fake_snapshot_update)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.db.volume_get')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_snapshot_update_with_null_validate(
self, snapshot_get_by_id, volume_get,
snapshot_metadata_get, update_snapshot):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'created_at': "2014-01-01 00:00:00",
'volume_size': 100,
'name': 'Default name',
'description': 'Default description',
'expected_attrs': ['metadata'],
}
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get.return_value = fake_volume_obj
updates = {
"name": None,
"description": None,
}
body = {"snapshot": updates}
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
req.environ['cinder.context'] = self.ctx
res_dict = self.controller.update(req, UUID, body=body)
self.assertEqual(fields.SnapshotStatus.AVAILABLE,
res_dict['snapshot']['status'])
self.assertIsNone(res_dict['snapshot']['name'])
self.assertIsNone(res_dict['snapshot']['description'])
def test_snapshot_update_missing_body(self):
body = {}
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
self.assertRaises(exception.ValidationError,
self.controller.update, req, UUID, body=body)
def test_snapshot_update_invalid_body(self):
body = {'name': 'missing top level snapshot key'}
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
self.assertRaises(exception.ValidationError,
self.controller.update, req, UUID, body=body)
def test_snapshot_update_not_found(self):
self.mock_object(volume.api.API, "get_snapshot", fake_snapshot_get)
updates = {
"name": "Updated Test Name",
}
body = {"snapshot": updates}
req = fakes.HTTPRequest.blank('/v3/snapshots/not-the-uuid')
self.assertRaises(exception.SnapshotNotFound, self.controller.update,
req, 'not-the-uuid', body=body)
@mock.patch.object(volume.api.API, "update_snapshot",
side_effect=v2_fakes.fake_snapshot_update)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.db.volume_get')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_snapshot_update_with_leading_trailing_spaces(
self, snapshot_get_by_id, volume_get,
snapshot_metadata_get, update_snapshot):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'created_at': "2018-01-14 00:00:00",
'volume_size': 100,
'display_name': 'Default name',
'display_description': 'Default description',
'expected_attrs': ['metadata'],
}
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get.return_value = fake_volume_obj
updates = {
"name": " test ",
"description": " test "
}
body = {"snapshot": updates}
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
req.environ['cinder.context'] = self.ctx
res_dict = self.controller.update(req, UUID, body=body)
expected = {
'snapshot': {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'size': 100,
'created_at': datetime.datetime(2018, 1, 14, 0, 0, 0,
tzinfo=ZoneInfo('UTC')),
'updated_at': None,
'name': u'test',
'description': u'test',
'metadata': {},
}
}
self.assertEqual(expected, res_dict)
self.assertEqual(2, len(self.notifier.notifications))
@mock.patch.object(volume.api.API, "delete_snapshot",
side_effect=v2_fakes.fake_snapshot_update)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.objects.Volume.get_by_id')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_snapshot_delete(self, snapshot_get_by_id, volume_get_by_id,
snapshot_metadata_get, delete_snapshot):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'volume_size': 100,
'display_name': 'Default name',
'display_description': 'Default description',
'expected_attrs': ['metadata'],
}
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get_by_id.return_value = fake_volume_obj
snapshot_id = UUID
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
req.environ['cinder.context'] = self.ctx
resp = self.controller.delete(req, snapshot_id)
self.assertEqual(HTTPStatus.ACCEPTED, resp.status_int)
def test_snapshot_delete_invalid_id(self):
self.mock_object(volume.api.API, "delete_snapshot",
fake_snapshot_delete)
snapshot_id = INVALID_UUID
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
self.assertRaises(exception.SnapshotNotFound, self.controller.delete,
req, snapshot_id)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.objects.Volume.get_by_id')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_snapshot_show(self, snapshot_get_by_id, volume_get_by_id,
snapshot_metadata_get):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'volume_size': 100,
'display_name': 'Default name',
'display_description': 'Default description',
'expected_attrs': ['metadata'],
}
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get_by_id.return_value = fake_volume_obj
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
req.environ['cinder.context'] = self.ctx
resp_dict = self.controller.show(req, UUID)
self.assertIn('snapshot', resp_dict)
self.assertEqual(UUID, resp_dict['snapshot']['id'])
self.assertIn('updated_at', resp_dict['snapshot'])
def test_snapshot_show_invalid_id(self):
snapshot_id = INVALID_UUID
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
self.assertRaises(exception.SnapshotNotFound,
self.controller.show, req, snapshot_id)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
@mock.patch('cinder.objects.Volume.get_by_id')
@mock.patch('cinder.objects.Snapshot.get_by_id')
@mock.patch('cinder.volume.api.API.get_all_snapshots')
def test_snapshot_detail(self, get_all_snapshots, snapshot_get_by_id,
volume_get_by_id, snapshot_metadata_get):
snapshot = {
'id': UUID,
'volume_id': fake.VOLUME_ID,
'status': fields.SnapshotStatus.AVAILABLE,
'volume_size': 100,
'display_name': 'Default name',
'display_description': 'Default description',
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.PROJECT_ID, fake.USER_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get_by_id.return_value = fake_volume_obj
snapshots = objects.SnapshotList(objects=[snapshot_obj])
get_all_snapshots.return_value = snapshots
req = fakes.HTTPRequest.blank('/v3/snapshots/detail')
resp_dict = self.controller.detail(req)
self.assertIn('snapshots', resp_dict)
resp_snapshots = resp_dict['snapshots']
self.assertEqual(1, len(resp_snapshots))
self.assertIn('updated_at', resp_snapshots[0])
resp_snapshot = resp_snapshots.pop()
self.assertEqual(UUID, resp_snapshot['id'])
@mock.patch.object(db, 'snapshot_get_all_by_project',
v2_fakes.fake_snapshot_get_all_by_project)
@mock.patch.object(db, 'snapshot_get_all',
v2_fakes.fake_snapshot_get_all)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_admin_list_snapshots_limited_to_project(self,
snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v3/%s/snapshots' % fake.PROJECT_ID,
use_admin_context=True)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_list_snapshots_with_limit_and_offset(self,
snapshot_metadata_get):
def list_snapshots_with_limit_and_offset(snaps, is_admin):
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?limit=1'
'&offset=1' % fake.PROJECT_ID,
use_admin_context=is_admin)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
self.assertEqual(snaps[1].id, res['snapshots'][0]['id'])
self.assertIn('updated_at', res['snapshots'][0])
# Test that we get an empty list with an offset greater than the
# number of items
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=1&offset=3')
self.assertEqual({'snapshots': []}, self.controller.index(req))
volume, snaps = self._create_db_snapshots(3)
# admin case
list_snapshots_with_limit_and_offset(snaps, is_admin=True)
# non-admin case
list_snapshots_with_limit_and_offset(snaps, is_admin=False)
@mock.patch.object(db, 'snapshot_get_all_by_project')
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_list_snapshots_with_wrong_limit_and_offset(self,
mock_metadata_get,
mock_snapshot_get_all):
"""Test list with negative and non numeric limit and offset."""
mock_snapshot_get_all.return_value = []
# Negative limit
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=-1&offset=1')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index,
req)
# Non numeric limit
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=a&offset=1')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index,
req)
# Negative offset
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=1&offset=-1')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index,
req)
# Non numeric offset
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=1&offset=a')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index,
req)
# Test that we get an exception HTTPBadRequest(400) with an offset
# greater than the maximum offset value.
url = '/v3/snapshots?limit=1&offset=323245324356534235'
req = fakes.HTTPRequest.blank(url)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index, req)
def _assert_list_next(self, expected_query=None, project=fake.PROJECT_ID,
**kwargs):
"""Check a page of snapshots list."""
# Since we are accessing v2 api directly we don't need to specify
# v2 in the request path, if we did, we'd get /v3/v2 links back
request_path = '/v3/%s/snapshots' % project
expected_path = request_path
# Construct the query if there are kwargs
if kwargs:
request_str = request_path + '?' + urllib.parse.urlencode(kwargs)
else:
request_str = request_path
# Make the request
req = fakes.HTTPRequest.blank(request_str)
res = self.controller.index(req)
# We only expect to have a next link if there is an actual expected
# query.
if expected_query:
# We must have the links
self.assertIn('snapshots_links', res)
links = res['snapshots_links']
# Must be a list of links, even if we only get 1 back
self.assertIsInstance(links, list)
next_link = links[0]
# rel entry must be next
self.assertIn('rel', next_link)
self.assertIn('next', next_link['rel'])
# href entry must have the right path
self.assertIn('href', next_link)
href_parts = urllib.parse.urlparse(next_link['href'])
self.assertEqual(expected_path, href_parts.path)
# And the query from the next link must match what we were
# expecting
params = urllib.parse.parse_qs(href_parts.query)
self.assertDictEqual(expected_query, params)
# Make sure we don't have links if we were not expecting them
else:
self.assertNotIn('snapshots_links', res)
def _create_db_snapshots(self, num_snaps):
volume = test_utils.create_volume(self.ctx, volume_type_id=None)
snaps = [
test_utils.create_snapshot(
self.ctx, volume.id, display_name='snap' + str(i)
) for i in range(num_snaps)
]
self.addCleanup(db.volume_destroy, self.ctx, volume.id)
for snap in snaps:
self.addCleanup(db.snapshot_destroy, self.ctx, snap.id)
snaps.reverse()
return volume, snaps
def test_list_snapshots_next_link_default_limit(self):
"""Test that snapshot list pagination is limited by osapi_max_limit."""
volume, snaps = self._create_db_snapshots(3)
# NOTE(geguileo): Since cinder.api.common.limited has already been
# imported his argument max_limit already has a default value of 1000
# so it doesn't matter that we change it to 2. That's why we need to
# mock it and send it current value. We still need to set the default
# value because other sections of the code use it, for example
# _get_collection_links
CONF.set_default('osapi_max_limit', 2)
def get_pagination_params(params, max_limit=CONF.osapi_max_limit,
original_call=common.get_pagination_params):
return original_call(params, max_limit)
def _get_limit_param(params, max_limit=CONF.osapi_max_limit,
original_call=common._get_limit_param):
return original_call(params, max_limit)
with mock.patch.object(common, 'get_pagination_params',
get_pagination_params), \
mock.patch.object(common, '_get_limit_param',
_get_limit_param):
# The link from the first page should link to the second
self._assert_list_next({'marker': [snaps[1].id]})
# Second page should have no next link
self._assert_list_next(marker=snaps[1].id)
def test_list_snapshots_next_link_with_limit(self):
"""Test snapshot list pagination with specific limit."""
volume, snaps = self._create_db_snapshots(2)
# The link from the first page should link to the second
self._assert_list_next({'limit': ['1'], 'marker': [snaps[0].id]},
limit=1)
# Even though there are no more elements, we should get a next element
# per specification.
expected = {'limit': ['1'], 'marker': [snaps[1].id]}
self._assert_list_next(expected, limit=1, marker=snaps[0].id)
# When we go beyond the number of elements there should be no more
# next links
self._assert_list_next(limit=1, marker=snaps[1].id)
@mock.patch.object(db, 'snapshot_get_all_by_project',
v2_fakes.fake_snapshot_get_all_by_project)
@mock.patch.object(db, 'snapshot_get_all',
v2_fakes.fake_snapshot_get_all)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_admin_list_snapshots_all_tenants(self, snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?all_tenants=1' %
fake.PROJECT_ID,
use_admin_context=True)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(3, len(res['snapshots']))
@mock.patch.object(db, 'snapshot_get_all')
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_admin_list_snapshots_by_tenant_id(self, snapshot_metadata_get,
snapshot_get_all):
def get_all(context, filters=None, marker=None, limit=None,
sort_keys=None, sort_dirs=None, offset=None):
if 'project_id' in filters and 'tenant1' in filters['project_id']:
return [v2_fakes.fake_snapshot(fake.VOLUME_ID,
tenant_id='tenant1')]
else:
return []
snapshot_get_all.side_effect = get_all
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?all_tenants=1'
'&project_id=tenant1' % fake.PROJECT_ID,
use_admin_context=True)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
@mock.patch.object(db, 'snapshot_get_all_by_project',
v2_fakes.fake_snapshot_get_all_by_project)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_all_tenants_non_admin_gets_all_tenants(self,
snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?all_tenants=1' %
fake.PROJECT_ID)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
@mock.patch.object(db, 'snapshot_get_all_by_project',
v2_fakes.fake_snapshot_get_all_by_project)
@mock.patch.object(db, 'snapshot_get_all',
v2_fakes.fake_snapshot_get_all)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
def test_non_admin_get_by_project(self, snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v3/%s/snapshots' % fake.PROJECT_ID)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
def _create_snapshot_bad_body(self, body):
req = fakes.HTTPRequest.blank('/v3/%s/snapshots' % fake.PROJECT_ID)
req.method = 'POST'
self.assertRaises(exception.ValidationError,
self.controller.create, req, body=body)
def test_create_no_body(self):
self._create_snapshot_bad_body(body=None)
def test_create_missing_snapshot(self):
body = {'foo': {'a': 'b'}}
self._create_snapshot_bad_body(body=body)
def test_create_malformed_entity(self):
body = {'snapshot': 'string'}
self._create_snapshot_bad_body(body=body)