tests: Merge v2, v3 snapshot API tests
We no longer support the v2 API. Given the v3 controller is the v2 controller but with the addition of microversions and some extra routes *and* the v2 tests obviously don't set microversions or test any of those additional routes, we can simply point the v2 controller tests at v3 controller and things will just work (TM). We do just this, moving the test to the v3 test file in the process. Change-Id: If90d2029fec5a5f5fce95a2ff7cbad2bdc71e495 Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
@@ -1,748 +0,0 @@
|
||||
# Copyright 2011 Denali Systems, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
from http import HTTPStatus
|
||||
from unittest import mock
|
||||
from urllib import parse as urllib
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
import ddt
|
||||
from oslo_config import cfg
|
||||
import webob
|
||||
|
||||
from cinder.api import common
|
||||
from cinder.api.v2 import snapshots
|
||||
from cinder import context
|
||||
from cinder import db
|
||||
from cinder import exception
|
||||
from cinder import objects
|
||||
from cinder.objects import fields
|
||||
from cinder.scheduler import rpcapi as scheduler_rpcapi
|
||||
from cinder.tests.unit.api import fakes
|
||||
from cinder.tests.unit.api.v2 import fakes as v2_fakes
|
||||
from cinder.tests.unit import fake_constants as fake
|
||||
from cinder.tests.unit import fake_snapshot
|
||||
from cinder.tests.unit import fake_volume
|
||||
from cinder.tests.unit import test
|
||||
from cinder.tests.unit import utils
|
||||
from cinder import volume
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
UUID = '00000000-0000-0000-0000-000000000001'
|
||||
INVALID_UUID = '00000000-0000-0000-0000-000000000002'
|
||||
|
||||
|
||||
def _get_default_snapshot_param():
|
||||
return {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'volume_size': 100,
|
||||
'created_at': None,
|
||||
'updated_at': None,
|
||||
'user_id': 'bcb7746c7a41472d88a1ffac89ba6a9b',
|
||||
'project_id': '7ffe17a15c724e2aa79fc839540aec15',
|
||||
'display_name': 'Default name',
|
||||
'display_description': 'Default description',
|
||||
'deleted': None,
|
||||
'volume': {'availability_zone': 'test_zone'}
|
||||
}
|
||||
|
||||
|
||||
def fake_snapshot_delete(self, context, snapshot):
|
||||
if snapshot['id'] != UUID:
|
||||
raise exception.SnapshotNotFound(snapshot['id'])
|
||||
|
||||
|
||||
def fake_snapshot_get(self, context, snapshot_id):
|
||||
if snapshot_id != UUID:
|
||||
raise exception.SnapshotNotFound(snapshot_id)
|
||||
|
||||
param = _get_default_snapshot_param()
|
||||
return param
|
||||
|
||||
|
||||
def fake_snapshot_get_all(self, context, search_opts=None):
|
||||
param = _get_default_snapshot_param()
|
||||
return [param]
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class SnapshotApiTest(test.TestCase):
|
||||
def setUp(self):
|
||||
super(SnapshotApiTest, self).setUp()
|
||||
self.mock_object(scheduler_rpcapi.SchedulerAPI, 'create_snapshot')
|
||||
self.controller = snapshots.SnapshotsController()
|
||||
self.ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
|
||||
|
||||
def test_snapshot_create(self):
|
||||
volume = utils.create_volume(self.ctx, volume_type_id=None)
|
||||
snapshot_name = 'Snapshot Test Name'
|
||||
snapshot_description = 'Snapshot Test Desc'
|
||||
snapshot = {
|
||||
"volume_id": volume.id,
|
||||
"force": False,
|
||||
"name": snapshot_name,
|
||||
"description": snapshot_description
|
||||
}
|
||||
|
||||
body = dict(snapshot=snapshot)
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
resp_dict = self.controller.create(req, body=body)
|
||||
|
||||
self.assertIn('snapshot', resp_dict)
|
||||
self.assertEqual(snapshot_name, resp_dict['snapshot']['name'])
|
||||
self.assertEqual(snapshot_description,
|
||||
resp_dict['snapshot']['description'])
|
||||
self.assertIn('updated_at', resp_dict['snapshot'])
|
||||
db.volume_destroy(self.ctx, volume.id)
|
||||
|
||||
def test_snapshot_create_with_null_validate(self):
|
||||
|
||||
volume = utils.create_volume(self.ctx, volume_type_id=None)
|
||||
snapshot = {
|
||||
"volume_id": volume.id,
|
||||
"force": False,
|
||||
"name": None,
|
||||
"description": None
|
||||
}
|
||||
|
||||
body = dict(snapshot=snapshot)
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
resp_dict = self.controller.create(req, body=body)
|
||||
|
||||
self.assertIn('snapshot', resp_dict)
|
||||
self.assertIsNone(resp_dict['snapshot']['name'])
|
||||
self.assertIsNone(resp_dict['snapshot']['description'])
|
||||
db.volume_destroy(self.ctx, volume.id)
|
||||
|
||||
@ddt.data(True, 'y', 'true', 'yes', '1', 'on')
|
||||
def test_snapshot_create_force(self, force_param):
|
||||
volume = utils.create_volume(self.ctx, status='in-use',
|
||||
volume_type_id=None)
|
||||
snapshot_name = 'Snapshot Test Name'
|
||||
snapshot_description = 'Snapshot Test Desc'
|
||||
snapshot = {
|
||||
"volume_id": volume.id,
|
||||
"force": force_param,
|
||||
"name": snapshot_name,
|
||||
"description": snapshot_description
|
||||
}
|
||||
body = dict(snapshot=snapshot)
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
resp_dict = self.controller.create(req, body=body)
|
||||
|
||||
self.assertIn('snapshot', resp_dict)
|
||||
self.assertEqual(snapshot_name,
|
||||
resp_dict['snapshot']['name'])
|
||||
self.assertEqual(snapshot_description,
|
||||
resp_dict['snapshot']['description'])
|
||||
self.assertIn('updated_at', resp_dict['snapshot'])
|
||||
|
||||
db.volume_destroy(self.ctx, volume.id)
|
||||
|
||||
@ddt.data(False, 'n', 'false', 'No', '0', 'off')
|
||||
def test_snapshot_create_force_failure(self, force_param):
|
||||
volume = utils.create_volume(self.ctx, status='in-use',
|
||||
volume_type_id=None)
|
||||
snapshot_name = 'Snapshot Test Name'
|
||||
snapshot_description = 'Snapshot Test Desc'
|
||||
snapshot = {
|
||||
"volume_id": volume.id,
|
||||
"force": force_param,
|
||||
"name": snapshot_name,
|
||||
"description": snapshot_description
|
||||
}
|
||||
body = dict(snapshot=snapshot)
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
self.assertRaises(exception.InvalidVolume,
|
||||
self.controller.create,
|
||||
req,
|
||||
body=body)
|
||||
|
||||
db.volume_destroy(self.ctx, volume.id)
|
||||
|
||||
@ddt.data("**&&^^%%$$##@@", '-1', 2, '01', 'falSE', 0, 'trUE', 1,
|
||||
"1 ")
|
||||
def test_snapshot_create_invalid_force_param(self, force_param):
|
||||
volume = utils.create_volume(self.ctx, status='available',
|
||||
volume_type_id=None)
|
||||
snapshot_name = 'Snapshot Test Name'
|
||||
snapshot_description = 'Snapshot Test Desc'
|
||||
|
||||
snapshot = {
|
||||
"volume_id": volume.id,
|
||||
"force": force_param,
|
||||
"name": snapshot_name,
|
||||
"description": snapshot_description
|
||||
}
|
||||
body = dict(snapshot=snapshot)
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
self.assertRaises(exception.ValidationError,
|
||||
self.controller.create,
|
||||
req,
|
||||
body=body)
|
||||
|
||||
db.volume_destroy(self.ctx, volume.id)
|
||||
|
||||
def test_snapshot_create_without_volume_id(self):
|
||||
snapshot_name = 'Snapshot Test Name'
|
||||
snapshot_description = 'Snapshot Test Desc'
|
||||
body = {
|
||||
"snapshot": {
|
||||
"force": True,
|
||||
"name": snapshot_name,
|
||||
"description": snapshot_description
|
||||
}
|
||||
}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
self.assertRaises(exception.ValidationError,
|
||||
self.controller.create, req, body=body)
|
||||
|
||||
@ddt.data({"snapshot": {"description": " sample description",
|
||||
"name": " test"}},
|
||||
{"snapshot": {"description": "sample description ",
|
||||
"name": "test "}},
|
||||
{"snapshot": {"description": " sample description ",
|
||||
"name": " test name "}})
|
||||
def test_snapshot_create_with_leading_trailing_spaces(self, body):
|
||||
volume = utils.create_volume(self.ctx, volume_type_id=None)
|
||||
body['snapshot']['volume_id'] = volume.id
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
resp_dict = self.controller.create(req, body=body)
|
||||
|
||||
self.assertEqual(body['snapshot']['display_name'].strip(),
|
||||
resp_dict['snapshot']['name'])
|
||||
self.assertEqual(body['snapshot']['description'].strip(),
|
||||
resp_dict['snapshot']['description'])
|
||||
db.volume_destroy(self.ctx, volume.id)
|
||||
|
||||
@mock.patch.object(volume.api.API, "update_snapshot",
|
||||
side_effect=v2_fakes.fake_snapshot_update)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
@mock.patch('cinder.db.volume_get')
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
def test_snapshot_update(
|
||||
self, snapshot_get_by_id, volume_get,
|
||||
snapshot_metadata_get, update_snapshot):
|
||||
snapshot = {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'created_at': "2014-01-01 00:00:00",
|
||||
'volume_size': 100,
|
||||
'display_name': 'Default name',
|
||||
'display_description': 'Default description',
|
||||
'expected_attrs': ['metadata'],
|
||||
}
|
||||
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
|
||||
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
|
||||
snapshot_get_by_id.return_value = snapshot_obj
|
||||
volume_get.return_value = fake_volume_obj
|
||||
|
||||
updates = {
|
||||
"name": "Updated Test Name",
|
||||
}
|
||||
body = {"snapshot": updates}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
|
||||
req.environ['cinder.context'] = self.ctx
|
||||
res_dict = self.controller.update(req, UUID, body=body)
|
||||
expected = {
|
||||
'snapshot': {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'size': 100,
|
||||
'created_at': datetime.datetime(2014, 1, 1, 0, 0, 0,
|
||||
tzinfo=ZoneInfo('UTC')),
|
||||
'updated_at': None,
|
||||
'name': u'Updated Test Name',
|
||||
'description': u'Default description',
|
||||
'metadata': {},
|
||||
}
|
||||
}
|
||||
self.assertEqual(expected, res_dict)
|
||||
self.assertEqual(2, len(self.notifier.notifications))
|
||||
|
||||
@mock.patch.object(volume.api.API, "update_snapshot",
|
||||
side_effect=v2_fakes.fake_snapshot_update)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
@mock.patch('cinder.db.volume_get')
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
def test_snapshot_update_with_null_validate(
|
||||
self, snapshot_get_by_id, volume_get,
|
||||
snapshot_metadata_get, update_snapshot):
|
||||
snapshot = {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'created_at': "2014-01-01 00:00:00",
|
||||
'volume_size': 100,
|
||||
'name': 'Default name',
|
||||
'description': 'Default description',
|
||||
'expected_attrs': ['metadata'],
|
||||
}
|
||||
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
|
||||
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
|
||||
snapshot_get_by_id.return_value = snapshot_obj
|
||||
volume_get.return_value = fake_volume_obj
|
||||
|
||||
updates = {
|
||||
"name": None,
|
||||
"description": None,
|
||||
}
|
||||
body = {"snapshot": updates}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
|
||||
req.environ['cinder.context'] = self.ctx
|
||||
res_dict = self.controller.update(req, UUID, body=body)
|
||||
|
||||
self.assertEqual(fields.SnapshotStatus.AVAILABLE,
|
||||
res_dict['snapshot']['status'])
|
||||
self.assertIsNone(res_dict['snapshot']['name'])
|
||||
self.assertIsNone(res_dict['snapshot']['description'])
|
||||
|
||||
def test_snapshot_update_missing_body(self):
|
||||
body = {}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
|
||||
self.assertRaises(exception.ValidationError,
|
||||
self.controller.update, req, UUID, body=body)
|
||||
|
||||
def test_snapshot_update_invalid_body(self):
|
||||
body = {'name': 'missing top level snapshot key'}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
|
||||
self.assertRaises(exception.ValidationError,
|
||||
self.controller.update, req, UUID, body=body)
|
||||
|
||||
def test_snapshot_update_not_found(self):
|
||||
self.mock_object(volume.api.API, "get_snapshot", fake_snapshot_get)
|
||||
updates = {
|
||||
"name": "Updated Test Name",
|
||||
}
|
||||
body = {"snapshot": updates}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/not-the-uuid')
|
||||
self.assertRaises(exception.SnapshotNotFound, self.controller.update,
|
||||
req, 'not-the-uuid', body=body)
|
||||
|
||||
@mock.patch.object(volume.api.API, "update_snapshot",
|
||||
side_effect=v2_fakes.fake_snapshot_update)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
@mock.patch('cinder.db.volume_get')
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
def test_snapshot_update_with_leading_trailing_spaces(
|
||||
self, snapshot_get_by_id, volume_get,
|
||||
snapshot_metadata_get, update_snapshot):
|
||||
snapshot = {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'created_at': "2018-01-14 00:00:00",
|
||||
'volume_size': 100,
|
||||
'display_name': 'Default name',
|
||||
'display_description': 'Default description',
|
||||
'expected_attrs': ['metadata'],
|
||||
}
|
||||
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
|
||||
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
|
||||
snapshot_get_by_id.return_value = snapshot_obj
|
||||
volume_get.return_value = fake_volume_obj
|
||||
|
||||
updates = {
|
||||
"name": " test ",
|
||||
"description": " test "
|
||||
}
|
||||
body = {"snapshot": updates}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
|
||||
req.environ['cinder.context'] = self.ctx
|
||||
res_dict = self.controller.update(req, UUID, body=body)
|
||||
expected = {
|
||||
'snapshot': {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'size': 100,
|
||||
'created_at': datetime.datetime(2018, 1, 14, 0, 0, 0,
|
||||
tzinfo=ZoneInfo('UTC')),
|
||||
'updated_at': None,
|
||||
'name': u'test',
|
||||
'description': u'test',
|
||||
'metadata': {},
|
||||
}
|
||||
}
|
||||
self.assertEqual(expected, res_dict)
|
||||
self.assertEqual(2, len(self.notifier.notifications))
|
||||
|
||||
@mock.patch.object(volume.api.API, "delete_snapshot",
|
||||
side_effect=v2_fakes.fake_snapshot_update)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
@mock.patch('cinder.objects.Volume.get_by_id')
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
def test_snapshot_delete(self, snapshot_get_by_id, volume_get_by_id,
|
||||
snapshot_metadata_get, delete_snapshot):
|
||||
snapshot = {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'volume_size': 100,
|
||||
'display_name': 'Default name',
|
||||
'display_description': 'Default description',
|
||||
'expected_attrs': ['metadata'],
|
||||
}
|
||||
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
|
||||
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
|
||||
snapshot_get_by_id.return_value = snapshot_obj
|
||||
volume_get_by_id.return_value = fake_volume_obj
|
||||
|
||||
snapshot_id = UUID
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
|
||||
req.environ['cinder.context'] = self.ctx
|
||||
resp = self.controller.delete(req, snapshot_id)
|
||||
self.assertEqual(HTTPStatus.ACCEPTED, resp.status_int)
|
||||
|
||||
def test_snapshot_delete_invalid_id(self):
|
||||
self.mock_object(volume.api.API, "delete_snapshot",
|
||||
fake_snapshot_delete)
|
||||
snapshot_id = INVALID_UUID
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
|
||||
self.assertRaises(exception.SnapshotNotFound, self.controller.delete,
|
||||
req, snapshot_id)
|
||||
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
@mock.patch('cinder.objects.Volume.get_by_id')
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
def test_snapshot_show(self, snapshot_get_by_id, volume_get_by_id,
|
||||
snapshot_metadata_get):
|
||||
snapshot = {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'volume_size': 100,
|
||||
'display_name': 'Default name',
|
||||
'display_description': 'Default description',
|
||||
'expected_attrs': ['metadata'],
|
||||
}
|
||||
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
|
||||
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
|
||||
snapshot_get_by_id.return_value = snapshot_obj
|
||||
volume_get_by_id.return_value = fake_volume_obj
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
|
||||
req.environ['cinder.context'] = self.ctx
|
||||
resp_dict = self.controller.show(req, UUID)
|
||||
|
||||
self.assertIn('snapshot', resp_dict)
|
||||
self.assertEqual(UUID, resp_dict['snapshot']['id'])
|
||||
self.assertIn('updated_at', resp_dict['snapshot'])
|
||||
|
||||
def test_snapshot_show_invalid_id(self):
|
||||
snapshot_id = INVALID_UUID
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
|
||||
self.assertRaises(exception.SnapshotNotFound,
|
||||
self.controller.show, req, snapshot_id)
|
||||
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
@mock.patch('cinder.objects.Volume.get_by_id')
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
@mock.patch('cinder.volume.api.API.get_all_snapshots')
|
||||
def test_snapshot_detail(self, get_all_snapshots, snapshot_get_by_id,
|
||||
volume_get_by_id, snapshot_metadata_get):
|
||||
snapshot = {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'volume_size': 100,
|
||||
'display_name': 'Default name',
|
||||
'display_description': 'Default description',
|
||||
'expected_attrs': ['metadata']
|
||||
}
|
||||
ctx = context.RequestContext(fake.PROJECT_ID, fake.USER_ID, True)
|
||||
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
|
||||
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
|
||||
snapshot_get_by_id.return_value = snapshot_obj
|
||||
volume_get_by_id.return_value = fake_volume_obj
|
||||
snapshots = objects.SnapshotList(objects=[snapshot_obj])
|
||||
get_all_snapshots.return_value = snapshots
|
||||
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/detail')
|
||||
resp_dict = self.controller.detail(req)
|
||||
|
||||
self.assertIn('snapshots', resp_dict)
|
||||
resp_snapshots = resp_dict['snapshots']
|
||||
self.assertEqual(1, len(resp_snapshots))
|
||||
self.assertIn('updated_at', resp_snapshots[0])
|
||||
|
||||
resp_snapshot = resp_snapshots.pop()
|
||||
self.assertEqual(UUID, resp_snapshot['id'])
|
||||
|
||||
@mock.patch.object(db, 'snapshot_get_all_by_project',
|
||||
v2_fakes.fake_snapshot_get_all_by_project)
|
||||
@mock.patch.object(db, 'snapshot_get_all',
|
||||
v2_fakes.fake_snapshot_get_all)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_admin_list_snapshots_limited_to_project(self,
|
||||
snapshot_metadata_get):
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots' % fake.PROJECT_ID,
|
||||
use_admin_context=True)
|
||||
res = self.controller.index(req)
|
||||
|
||||
self.assertIn('snapshots', res)
|
||||
self.assertEqual(1, len(res['snapshots']))
|
||||
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_list_snapshots_with_limit_and_offset(self,
|
||||
snapshot_metadata_get):
|
||||
def list_snapshots_with_limit_and_offset(snaps, is_admin):
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?limit=1'
|
||||
'&offset=1' % fake.PROJECT_ID,
|
||||
use_admin_context=is_admin)
|
||||
res = self.controller.index(req)
|
||||
|
||||
self.assertIn('snapshots', res)
|
||||
self.assertEqual(1, len(res['snapshots']))
|
||||
self.assertEqual(snaps[1].id, res['snapshots'][0]['id'])
|
||||
self.assertIn('updated_at', res['snapshots'][0])
|
||||
|
||||
# Test that we get an empty list with an offset greater than the
|
||||
# number of items
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=1&offset=3')
|
||||
self.assertEqual({'snapshots': []}, self.controller.index(req))
|
||||
|
||||
volume, snaps = self._create_db_snapshots(3)
|
||||
# admin case
|
||||
list_snapshots_with_limit_and_offset(snaps, is_admin=True)
|
||||
# non-admin case
|
||||
list_snapshots_with_limit_and_offset(snaps, is_admin=False)
|
||||
|
||||
@mock.patch.object(db, 'snapshot_get_all_by_project')
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_list_snapshots_with_wrong_limit_and_offset(self,
|
||||
mock_metadata_get,
|
||||
mock_snapshot_get_all):
|
||||
"""Test list with negative and non numeric limit and offset."""
|
||||
mock_snapshot_get_all.return_value = []
|
||||
|
||||
# Negative limit
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=-1&offset=1')
|
||||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
self.controller.index,
|
||||
req)
|
||||
|
||||
# Non numeric limit
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=a&offset=1')
|
||||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
self.controller.index,
|
||||
req)
|
||||
|
||||
# Negative offset
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=1&offset=-1')
|
||||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
self.controller.index,
|
||||
req)
|
||||
|
||||
# Non numeric offset
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=1&offset=a')
|
||||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
self.controller.index,
|
||||
req)
|
||||
|
||||
# Test that we get an exception HTTPBadRequest(400) with an offset
|
||||
# greater than the maximum offset value.
|
||||
url = '/v3/snapshots?limit=1&offset=323245324356534235'
|
||||
req = fakes.HTTPRequest.blank(url)
|
||||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
self.controller.index, req)
|
||||
|
||||
def _assert_list_next(self, expected_query=None, project=fake.PROJECT_ID,
|
||||
**kwargs):
|
||||
"""Check a page of snapshots list."""
|
||||
# Since we are accessing v2 api directly we don't need to specify
|
||||
# v2 in the request path, if we did, we'd get /v3/v2 links back
|
||||
request_path = '/v3/%s/snapshots' % project
|
||||
expected_path = request_path
|
||||
|
||||
# Construct the query if there are kwargs
|
||||
if kwargs:
|
||||
request_str = request_path + '?' + urllib.urlencode(kwargs)
|
||||
else:
|
||||
request_str = request_path
|
||||
|
||||
# Make the request
|
||||
req = fakes.HTTPRequest.blank(request_str)
|
||||
res = self.controller.index(req)
|
||||
|
||||
# We only expect to have a next link if there is an actual expected
|
||||
# query.
|
||||
if expected_query:
|
||||
# We must have the links
|
||||
self.assertIn('snapshots_links', res)
|
||||
links = res['snapshots_links']
|
||||
|
||||
# Must be a list of links, even if we only get 1 back
|
||||
self.assertIsInstance(links, list)
|
||||
next_link = links[0]
|
||||
|
||||
# rel entry must be next
|
||||
self.assertIn('rel', next_link)
|
||||
self.assertIn('next', next_link['rel'])
|
||||
|
||||
# href entry must have the right path
|
||||
self.assertIn('href', next_link)
|
||||
href_parts = urllib.urlparse(next_link['href'])
|
||||
self.assertEqual(expected_path, href_parts.path)
|
||||
|
||||
# And the query from the next link must match what we were
|
||||
# expecting
|
||||
params = urllib.parse_qs(href_parts.query)
|
||||
self.assertDictEqual(expected_query, params)
|
||||
|
||||
# Make sure we don't have links if we were not expecting them
|
||||
else:
|
||||
self.assertNotIn('snapshots_links', res)
|
||||
|
||||
def _create_db_snapshots(self, num_snaps):
|
||||
volume = utils.create_volume(self.ctx, volume_type_id=None)
|
||||
snaps = [utils.create_snapshot(self.ctx,
|
||||
volume.id,
|
||||
display_name='snap' + str(i))
|
||||
for i in range(num_snaps)]
|
||||
|
||||
self.addCleanup(db.volume_destroy, self.ctx, volume.id)
|
||||
for snap in snaps:
|
||||
self.addCleanup(db.snapshot_destroy, self.ctx, snap.id)
|
||||
|
||||
snaps.reverse()
|
||||
return volume, snaps
|
||||
|
||||
def test_list_snapshots_next_link_default_limit(self):
|
||||
"""Test that snapshot list pagination is limited by osapi_max_limit."""
|
||||
volume, snaps = self._create_db_snapshots(3)
|
||||
|
||||
# NOTE(geguileo): Since cinder.api.common.limited has already been
|
||||
# imported his argument max_limit already has a default value of 1000
|
||||
# so it doesn't matter that we change it to 2. That's why we need to
|
||||
# mock it and send it current value. We still need to set the default
|
||||
# value because other sections of the code use it, for example
|
||||
# _get_collection_links
|
||||
CONF.set_default('osapi_max_limit', 2)
|
||||
|
||||
def get_pagination_params(params, max_limit=CONF.osapi_max_limit,
|
||||
original_call=common.get_pagination_params):
|
||||
return original_call(params, max_limit)
|
||||
|
||||
def _get_limit_param(params, max_limit=CONF.osapi_max_limit,
|
||||
original_call=common._get_limit_param):
|
||||
return original_call(params, max_limit)
|
||||
|
||||
with mock.patch.object(common, 'get_pagination_params',
|
||||
get_pagination_params), \
|
||||
mock.patch.object(common, '_get_limit_param',
|
||||
_get_limit_param):
|
||||
# The link from the first page should link to the second
|
||||
self._assert_list_next({'marker': [snaps[1].id]})
|
||||
|
||||
# Second page should have no next link
|
||||
self._assert_list_next(marker=snaps[1].id)
|
||||
|
||||
def test_list_snapshots_next_link_with_limit(self):
|
||||
"""Test snapshot list pagination with specific limit."""
|
||||
volume, snaps = self._create_db_snapshots(2)
|
||||
|
||||
# The link from the first page should link to the second
|
||||
self._assert_list_next({'limit': ['1'], 'marker': [snaps[0].id]},
|
||||
limit=1)
|
||||
|
||||
# Even though there are no more elements, we should get a next element
|
||||
# per specification.
|
||||
expected = {'limit': ['1'], 'marker': [snaps[1].id]}
|
||||
self._assert_list_next(expected, limit=1, marker=snaps[0].id)
|
||||
|
||||
# When we go beyond the number of elements there should be no more
|
||||
# next links
|
||||
self._assert_list_next(limit=1, marker=snaps[1].id)
|
||||
|
||||
@mock.patch.object(db, 'snapshot_get_all_by_project',
|
||||
v2_fakes.fake_snapshot_get_all_by_project)
|
||||
@mock.patch.object(db, 'snapshot_get_all',
|
||||
v2_fakes.fake_snapshot_get_all)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_admin_list_snapshots_all_tenants(self, snapshot_metadata_get):
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?all_tenants=1' %
|
||||
fake.PROJECT_ID,
|
||||
use_admin_context=True)
|
||||
res = self.controller.index(req)
|
||||
self.assertIn('snapshots', res)
|
||||
self.assertEqual(3, len(res['snapshots']))
|
||||
|
||||
@mock.patch.object(db, 'snapshot_get_all')
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_admin_list_snapshots_by_tenant_id(self, snapshot_metadata_get,
|
||||
snapshot_get_all):
|
||||
def get_all(context, filters=None, marker=None, limit=None,
|
||||
sort_keys=None, sort_dirs=None, offset=None):
|
||||
if 'project_id' in filters and 'tenant1' in filters['project_id']:
|
||||
return [v2_fakes.fake_snapshot(fake.VOLUME_ID,
|
||||
tenant_id='tenant1')]
|
||||
else:
|
||||
return []
|
||||
|
||||
snapshot_get_all.side_effect = get_all
|
||||
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?all_tenants=1'
|
||||
'&project_id=tenant1' % fake.PROJECT_ID,
|
||||
use_admin_context=True)
|
||||
res = self.controller.index(req)
|
||||
self.assertIn('snapshots', res)
|
||||
self.assertEqual(1, len(res['snapshots']))
|
||||
|
||||
@mock.patch.object(db, 'snapshot_get_all_by_project',
|
||||
v2_fakes.fake_snapshot_get_all_by_project)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_all_tenants_non_admin_gets_all_tenants(self,
|
||||
snapshot_metadata_get):
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?all_tenants=1' %
|
||||
fake.PROJECT_ID)
|
||||
res = self.controller.index(req)
|
||||
self.assertIn('snapshots', res)
|
||||
self.assertEqual(1, len(res['snapshots']))
|
||||
|
||||
@mock.patch.object(db, 'snapshot_get_all_by_project',
|
||||
v2_fakes.fake_snapshot_get_all_by_project)
|
||||
@mock.patch.object(db, 'snapshot_get_all',
|
||||
v2_fakes.fake_snapshot_get_all)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_non_admin_get_by_project(self, snapshot_metadata_get):
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots' % fake.PROJECT_ID)
|
||||
res = self.controller.index(req)
|
||||
self.assertIn('snapshots', res)
|
||||
self.assertEqual(1, len(res['snapshots']))
|
||||
|
||||
def _create_snapshot_bad_body(self, body):
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots' % fake.PROJECT_ID)
|
||||
req.method = 'POST'
|
||||
|
||||
self.assertRaises(exception.ValidationError,
|
||||
self.controller.create, req, body=body)
|
||||
|
||||
def test_create_no_body(self):
|
||||
self._create_snapshot_bad_body(body=None)
|
||||
|
||||
def test_create_missing_snapshot(self):
|
||||
body = {'foo': {'a': 'b'}}
|
||||
self._create_snapshot_bad_body(body=body)
|
||||
|
||||
def test_create_malformed_entity(self):
|
||||
body = {'snapshot': 'string'}
|
||||
self._create_snapshot_bad_body(body=body)
|
||||
@@ -13,17 +13,25 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
from http import HTTPStatus
|
||||
from unittest import mock
|
||||
import urllib.parse
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
import ddt
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import strutils
|
||||
import webob
|
||||
from webob import exc
|
||||
|
||||
from cinder.api import common
|
||||
from cinder.api import microversions as mv
|
||||
from cinder.api.v3 import snapshots
|
||||
from cinder import context
|
||||
from cinder import db
|
||||
from cinder import exception
|
||||
from cinder import objects
|
||||
from cinder.objects import fields
|
||||
from cinder.scheduler import rpcapi as scheduler_rpcapi
|
||||
from cinder.tests.unit.api import fakes
|
||||
@@ -35,11 +43,13 @@ from cinder.tests.unit import test
|
||||
from cinder.tests.unit import utils as test_utils
|
||||
from cinder import volume
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
UUID = '00000000-0000-0000-0000-000000000001'
|
||||
INVALID_UUID = '00000000-0000-0000-0000-000000000002'
|
||||
|
||||
|
||||
def fake_get(self, context, *args, **kwargs):
|
||||
def fake_volume_get(self, context, *args, **kwargs):
|
||||
vol = {'id': fake.VOLUME_ID,
|
||||
'size': 100,
|
||||
'name': 'fake',
|
||||
@@ -54,6 +64,41 @@ def fake_get(self, context, *args, **kwargs):
|
||||
return fake_volume.fake_volume_obj(context, **vol)
|
||||
|
||||
|
||||
def _get_default_snapshot_param():
|
||||
return {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'volume_size': 100,
|
||||
'created_at': None,
|
||||
'updated_at': None,
|
||||
'user_id': 'bcb7746c7a41472d88a1ffac89ba6a9b',
|
||||
'project_id': '7ffe17a15c724e2aa79fc839540aec15',
|
||||
'display_name': 'Default name',
|
||||
'display_description': 'Default description',
|
||||
'deleted': None,
|
||||
'volume': {'availability_zone': 'test_zone'}
|
||||
}
|
||||
|
||||
|
||||
def fake_snapshot_delete(self, context, snapshot):
|
||||
if snapshot['id'] != UUID:
|
||||
raise exception.SnapshotNotFound(snapshot['id'])
|
||||
|
||||
|
||||
def fake_snapshot_get(self, context, snapshot_id):
|
||||
if snapshot_id != UUID:
|
||||
raise exception.SnapshotNotFound(snapshot_id)
|
||||
|
||||
param = _get_default_snapshot_param()
|
||||
return param
|
||||
|
||||
|
||||
def fake_snapshot_get_all(self, context, search_opts=None):
|
||||
param = _get_default_snapshot_param()
|
||||
return [param]
|
||||
|
||||
|
||||
def create_snapshot_query_with_metadata(metadata_query_string,
|
||||
api_microversion):
|
||||
"""Helper to create metadata querystring with microversion"""
|
||||
@@ -68,8 +113,8 @@ def create_snapshot_query_with_metadata(metadata_query_string,
|
||||
@ddt.ddt
|
||||
class SnapshotApiTest(test.TestCase):
|
||||
def setUp(self):
|
||||
super(SnapshotApiTest, self).setUp()
|
||||
self.mock_object(volume.api.API, 'get', fake_get)
|
||||
super().setUp()
|
||||
self.mock_object(volume.api.API, 'get', fake_volume_get)
|
||||
self.mock_object(db.sqlalchemy.api, 'volume_type_get',
|
||||
v2_fakes.fake_volume_type_get)
|
||||
self.patch('cinder.quota.QUOTAS.reserve')
|
||||
@@ -455,3 +500,670 @@ class SnapshotApiTest(test.TestCase):
|
||||
self.controller.create(req, body=body)
|
||||
# ... but also shouldn't allow an in-use snapshot
|
||||
self.assertNotIn('allow_in_use', mock_create.call_args_list[0][1])
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class SnapshotApiTestNoMicroversion(test.TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.mock_object(scheduler_rpcapi.SchedulerAPI, 'create_snapshot')
|
||||
self.controller = snapshots.SnapshotsController()
|
||||
self.ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
|
||||
|
||||
def test_snapshot_create(self):
|
||||
volume = test_utils.create_volume(self.ctx, volume_type_id=None)
|
||||
snapshot_name = 'Snapshot Test Name'
|
||||
snapshot_description = 'Snapshot Test Desc'
|
||||
snapshot = {
|
||||
"volume_id": volume.id,
|
||||
"force": False,
|
||||
"name": snapshot_name,
|
||||
"description": snapshot_description
|
||||
}
|
||||
|
||||
body = dict(snapshot=snapshot)
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
resp_dict = self.controller.create(req, body=body)
|
||||
|
||||
self.assertIn('snapshot', resp_dict)
|
||||
self.assertEqual(snapshot_name, resp_dict['snapshot']['name'])
|
||||
self.assertEqual(snapshot_description,
|
||||
resp_dict['snapshot']['description'])
|
||||
self.assertIn('updated_at', resp_dict['snapshot'])
|
||||
db.volume_destroy(self.ctx, volume.id)
|
||||
|
||||
def test_snapshot_create_with_null_validate(self):
|
||||
volume = test_utils.create_volume(self.ctx, volume_type_id=None)
|
||||
snapshot = {
|
||||
"volume_id": volume.id,
|
||||
"force": False,
|
||||
"name": None,
|
||||
"description": None
|
||||
}
|
||||
|
||||
body = dict(snapshot=snapshot)
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
resp_dict = self.controller.create(req, body=body)
|
||||
|
||||
self.assertIn('snapshot', resp_dict)
|
||||
self.assertIsNone(resp_dict['snapshot']['name'])
|
||||
self.assertIsNone(resp_dict['snapshot']['description'])
|
||||
db.volume_destroy(self.ctx, volume.id)
|
||||
|
||||
@ddt.data(True, 'y', 'true', 'yes', '1', 'on')
|
||||
def test_snapshot_create_force(self, force_param):
|
||||
volume = test_utils.create_volume(
|
||||
self.ctx, status='in-use', volume_type_id=None)
|
||||
snapshot_name = 'Snapshot Test Name'
|
||||
snapshot_description = 'Snapshot Test Desc'
|
||||
snapshot = {
|
||||
"volume_id": volume.id,
|
||||
"force": force_param,
|
||||
"name": snapshot_name,
|
||||
"description": snapshot_description
|
||||
}
|
||||
body = dict(snapshot=snapshot)
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
resp_dict = self.controller.create(req, body=body)
|
||||
|
||||
self.assertIn('snapshot', resp_dict)
|
||||
self.assertEqual(snapshot_name,
|
||||
resp_dict['snapshot']['name'])
|
||||
self.assertEqual(snapshot_description,
|
||||
resp_dict['snapshot']['description'])
|
||||
self.assertIn('updated_at', resp_dict['snapshot'])
|
||||
|
||||
db.volume_destroy(self.ctx, volume.id)
|
||||
|
||||
@ddt.data(False, 'n', 'false', 'No', '0', 'off')
|
||||
def test_snapshot_create_force_failure(self, force_param):
|
||||
volume = test_utils.create_volume(
|
||||
self.ctx, status='in-use', volume_type_id=None)
|
||||
snapshot_name = 'Snapshot Test Name'
|
||||
snapshot_description = 'Snapshot Test Desc'
|
||||
snapshot = {
|
||||
"volume_id": volume.id,
|
||||
"force": force_param,
|
||||
"name": snapshot_name,
|
||||
"description": snapshot_description
|
||||
}
|
||||
body = dict(snapshot=snapshot)
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
self.assertRaises(exception.InvalidVolume,
|
||||
self.controller.create,
|
||||
req,
|
||||
body=body)
|
||||
|
||||
db.volume_destroy(self.ctx, volume.id)
|
||||
|
||||
@ddt.data("**&&^^%%$$##@@", '-1', 2, '01', 'falSE', 0, 'trUE', 1,
|
||||
"1 ")
|
||||
def test_snapshot_create_invalid_force_param(self, force_param):
|
||||
volume = test_utils.create_volume(
|
||||
self.ctx, status='available', volume_type_id=None)
|
||||
snapshot_name = 'Snapshot Test Name'
|
||||
snapshot_description = 'Snapshot Test Desc'
|
||||
|
||||
snapshot = {
|
||||
"volume_id": volume.id,
|
||||
"force": force_param,
|
||||
"name": snapshot_name,
|
||||
"description": snapshot_description
|
||||
}
|
||||
body = dict(snapshot=snapshot)
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
self.assertRaises(exception.ValidationError,
|
||||
self.controller.create,
|
||||
req,
|
||||
body=body)
|
||||
|
||||
db.volume_destroy(self.ctx, volume.id)
|
||||
|
||||
def test_snapshot_create_without_volume_id(self):
|
||||
snapshot_name = 'Snapshot Test Name'
|
||||
snapshot_description = 'Snapshot Test Desc'
|
||||
body = {
|
||||
"snapshot": {
|
||||
"force": True,
|
||||
"name": snapshot_name,
|
||||
"description": snapshot_description
|
||||
}
|
||||
}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
self.assertRaises(exception.ValidationError,
|
||||
self.controller.create, req, body=body)
|
||||
|
||||
@ddt.data({"snapshot": {"description": " sample description",
|
||||
"name": " test"}},
|
||||
{"snapshot": {"description": "sample description ",
|
||||
"name": "test "}},
|
||||
{"snapshot": {"description": " sample description ",
|
||||
"name": " test name "}})
|
||||
def test_snapshot_create_with_leading_trailing_spaces(self, body):
|
||||
volume = test_utils.create_volume(self.ctx, volume_type_id=None)
|
||||
body['snapshot']['volume_id'] = volume.id
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots')
|
||||
resp_dict = self.controller.create(req, body=body)
|
||||
|
||||
self.assertEqual(body['snapshot']['display_name'].strip(),
|
||||
resp_dict['snapshot']['name'])
|
||||
self.assertEqual(body['snapshot']['description'].strip(),
|
||||
resp_dict['snapshot']['description'])
|
||||
db.volume_destroy(self.ctx, volume.id)
|
||||
|
||||
@mock.patch.object(volume.api.API, "update_snapshot",
|
||||
side_effect=v2_fakes.fake_snapshot_update)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
@mock.patch('cinder.db.volume_get')
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
def test_snapshot_update(
|
||||
self, snapshot_get_by_id, volume_get,
|
||||
snapshot_metadata_get, update_snapshot):
|
||||
snapshot = {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'created_at': "2014-01-01 00:00:00",
|
||||
'volume_size': 100,
|
||||
'display_name': 'Default name',
|
||||
'display_description': 'Default description',
|
||||
'expected_attrs': ['metadata'],
|
||||
}
|
||||
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
|
||||
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
|
||||
snapshot_get_by_id.return_value = snapshot_obj
|
||||
volume_get.return_value = fake_volume_obj
|
||||
|
||||
updates = {
|
||||
"name": "Updated Test Name",
|
||||
}
|
||||
body = {"snapshot": updates}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
|
||||
req.environ['cinder.context'] = self.ctx
|
||||
res_dict = self.controller.update(req, UUID, body=body)
|
||||
expected = {
|
||||
'snapshot': {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'size': 100,
|
||||
'created_at': datetime.datetime(
|
||||
2014, 1, 1, 0, 0, 0, tzinfo=ZoneInfo('UTC'),
|
||||
),
|
||||
'updated_at': None,
|
||||
'name': u'Updated Test Name',
|
||||
'description': u'Default description',
|
||||
'metadata': {},
|
||||
}
|
||||
}
|
||||
self.assertEqual(expected, res_dict)
|
||||
self.assertEqual(2, len(self.notifier.notifications))
|
||||
|
||||
@mock.patch.object(volume.api.API, "update_snapshot",
|
||||
side_effect=v2_fakes.fake_snapshot_update)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
@mock.patch('cinder.db.volume_get')
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
def test_snapshot_update_with_null_validate(
|
||||
self, snapshot_get_by_id, volume_get,
|
||||
snapshot_metadata_get, update_snapshot):
|
||||
snapshot = {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'created_at': "2014-01-01 00:00:00",
|
||||
'volume_size': 100,
|
||||
'name': 'Default name',
|
||||
'description': 'Default description',
|
||||
'expected_attrs': ['metadata'],
|
||||
}
|
||||
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
|
||||
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
|
||||
snapshot_get_by_id.return_value = snapshot_obj
|
||||
volume_get.return_value = fake_volume_obj
|
||||
|
||||
updates = {
|
||||
"name": None,
|
||||
"description": None,
|
||||
}
|
||||
body = {"snapshot": updates}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
|
||||
req.environ['cinder.context'] = self.ctx
|
||||
res_dict = self.controller.update(req, UUID, body=body)
|
||||
|
||||
self.assertEqual(fields.SnapshotStatus.AVAILABLE,
|
||||
res_dict['snapshot']['status'])
|
||||
self.assertIsNone(res_dict['snapshot']['name'])
|
||||
self.assertIsNone(res_dict['snapshot']['description'])
|
||||
|
||||
def test_snapshot_update_missing_body(self):
|
||||
body = {}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
|
||||
self.assertRaises(exception.ValidationError,
|
||||
self.controller.update, req, UUID, body=body)
|
||||
|
||||
def test_snapshot_update_invalid_body(self):
|
||||
body = {'name': 'missing top level snapshot key'}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
|
||||
self.assertRaises(exception.ValidationError,
|
||||
self.controller.update, req, UUID, body=body)
|
||||
|
||||
def test_snapshot_update_not_found(self):
|
||||
self.mock_object(volume.api.API, "get_snapshot", fake_snapshot_get)
|
||||
updates = {
|
||||
"name": "Updated Test Name",
|
||||
}
|
||||
body = {"snapshot": updates}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/not-the-uuid')
|
||||
self.assertRaises(exception.SnapshotNotFound, self.controller.update,
|
||||
req, 'not-the-uuid', body=body)
|
||||
|
||||
@mock.patch.object(volume.api.API, "update_snapshot",
|
||||
side_effect=v2_fakes.fake_snapshot_update)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
@mock.patch('cinder.db.volume_get')
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
def test_snapshot_update_with_leading_trailing_spaces(
|
||||
self, snapshot_get_by_id, volume_get,
|
||||
snapshot_metadata_get, update_snapshot):
|
||||
snapshot = {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'created_at': "2018-01-14 00:00:00",
|
||||
'volume_size': 100,
|
||||
'display_name': 'Default name',
|
||||
'display_description': 'Default description',
|
||||
'expected_attrs': ['metadata'],
|
||||
}
|
||||
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
|
||||
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
|
||||
snapshot_get_by_id.return_value = snapshot_obj
|
||||
volume_get.return_value = fake_volume_obj
|
||||
|
||||
updates = {
|
||||
"name": " test ",
|
||||
"description": " test "
|
||||
}
|
||||
body = {"snapshot": updates}
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
|
||||
req.environ['cinder.context'] = self.ctx
|
||||
res_dict = self.controller.update(req, UUID, body=body)
|
||||
expected = {
|
||||
'snapshot': {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'size': 100,
|
||||
'created_at': datetime.datetime(2018, 1, 14, 0, 0, 0,
|
||||
tzinfo=ZoneInfo('UTC')),
|
||||
'updated_at': None,
|
||||
'name': u'test',
|
||||
'description': u'test',
|
||||
'metadata': {},
|
||||
}
|
||||
}
|
||||
self.assertEqual(expected, res_dict)
|
||||
self.assertEqual(2, len(self.notifier.notifications))
|
||||
|
||||
@mock.patch.object(volume.api.API, "delete_snapshot",
|
||||
side_effect=v2_fakes.fake_snapshot_update)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
@mock.patch('cinder.objects.Volume.get_by_id')
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
def test_snapshot_delete(self, snapshot_get_by_id, volume_get_by_id,
|
||||
snapshot_metadata_get, delete_snapshot):
|
||||
snapshot = {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'volume_size': 100,
|
||||
'display_name': 'Default name',
|
||||
'display_description': 'Default description',
|
||||
'expected_attrs': ['metadata'],
|
||||
}
|
||||
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
|
||||
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
|
||||
snapshot_get_by_id.return_value = snapshot_obj
|
||||
volume_get_by_id.return_value = fake_volume_obj
|
||||
|
||||
snapshot_id = UUID
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
|
||||
req.environ['cinder.context'] = self.ctx
|
||||
resp = self.controller.delete(req, snapshot_id)
|
||||
self.assertEqual(HTTPStatus.ACCEPTED, resp.status_int)
|
||||
|
||||
def test_snapshot_delete_invalid_id(self):
|
||||
self.mock_object(volume.api.API, "delete_snapshot",
|
||||
fake_snapshot_delete)
|
||||
snapshot_id = INVALID_UUID
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
|
||||
self.assertRaises(exception.SnapshotNotFound, self.controller.delete,
|
||||
req, snapshot_id)
|
||||
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
@mock.patch('cinder.objects.Volume.get_by_id')
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
def test_snapshot_show(self, snapshot_get_by_id, volume_get_by_id,
|
||||
snapshot_metadata_get):
|
||||
snapshot = {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'volume_size': 100,
|
||||
'display_name': 'Default name',
|
||||
'display_description': 'Default description',
|
||||
'expected_attrs': ['metadata'],
|
||||
}
|
||||
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
|
||||
fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
|
||||
snapshot_get_by_id.return_value = snapshot_obj
|
||||
volume_get_by_id.return_value = fake_volume_obj
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
|
||||
req.environ['cinder.context'] = self.ctx
|
||||
resp_dict = self.controller.show(req, UUID)
|
||||
|
||||
self.assertIn('snapshot', resp_dict)
|
||||
self.assertEqual(UUID, resp_dict['snapshot']['id'])
|
||||
self.assertIn('updated_at', resp_dict['snapshot'])
|
||||
|
||||
def test_snapshot_show_invalid_id(self):
|
||||
snapshot_id = INVALID_UUID
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
|
||||
self.assertRaises(exception.SnapshotNotFound,
|
||||
self.controller.show, req, snapshot_id)
|
||||
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
@mock.patch('cinder.objects.Volume.get_by_id')
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
@mock.patch('cinder.volume.api.API.get_all_snapshots')
|
||||
def test_snapshot_detail(self, get_all_snapshots, snapshot_get_by_id,
|
||||
volume_get_by_id, snapshot_metadata_get):
|
||||
snapshot = {
|
||||
'id': UUID,
|
||||
'volume_id': fake.VOLUME_ID,
|
||||
'status': fields.SnapshotStatus.AVAILABLE,
|
||||
'volume_size': 100,
|
||||
'display_name': 'Default name',
|
||||
'display_description': 'Default description',
|
||||
'expected_attrs': ['metadata']
|
||||
}
|
||||
ctx = context.RequestContext(fake.PROJECT_ID, fake.USER_ID, True)
|
||||
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
|
||||
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
|
||||
snapshot_get_by_id.return_value = snapshot_obj
|
||||
volume_get_by_id.return_value = fake_volume_obj
|
||||
snapshots = objects.SnapshotList(objects=[snapshot_obj])
|
||||
get_all_snapshots.return_value = snapshots
|
||||
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots/detail')
|
||||
resp_dict = self.controller.detail(req)
|
||||
|
||||
self.assertIn('snapshots', resp_dict)
|
||||
resp_snapshots = resp_dict['snapshots']
|
||||
self.assertEqual(1, len(resp_snapshots))
|
||||
self.assertIn('updated_at', resp_snapshots[0])
|
||||
|
||||
resp_snapshot = resp_snapshots.pop()
|
||||
self.assertEqual(UUID, resp_snapshot['id'])
|
||||
|
||||
@mock.patch.object(db, 'snapshot_get_all_by_project',
|
||||
v2_fakes.fake_snapshot_get_all_by_project)
|
||||
@mock.patch.object(db, 'snapshot_get_all',
|
||||
v2_fakes.fake_snapshot_get_all)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_admin_list_snapshots_limited_to_project(self,
|
||||
snapshot_metadata_get):
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots' % fake.PROJECT_ID,
|
||||
use_admin_context=True)
|
||||
res = self.controller.index(req)
|
||||
|
||||
self.assertIn('snapshots', res)
|
||||
self.assertEqual(1, len(res['snapshots']))
|
||||
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_list_snapshots_with_limit_and_offset(self,
|
||||
snapshot_metadata_get):
|
||||
def list_snapshots_with_limit_and_offset(snaps, is_admin):
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?limit=1'
|
||||
'&offset=1' % fake.PROJECT_ID,
|
||||
use_admin_context=is_admin)
|
||||
res = self.controller.index(req)
|
||||
|
||||
self.assertIn('snapshots', res)
|
||||
self.assertEqual(1, len(res['snapshots']))
|
||||
self.assertEqual(snaps[1].id, res['snapshots'][0]['id'])
|
||||
self.assertIn('updated_at', res['snapshots'][0])
|
||||
|
||||
# Test that we get an empty list with an offset greater than the
|
||||
# number of items
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=1&offset=3')
|
||||
self.assertEqual({'snapshots': []}, self.controller.index(req))
|
||||
|
||||
volume, snaps = self._create_db_snapshots(3)
|
||||
# admin case
|
||||
list_snapshots_with_limit_and_offset(snaps, is_admin=True)
|
||||
# non-admin case
|
||||
list_snapshots_with_limit_and_offset(snaps, is_admin=False)
|
||||
|
||||
@mock.patch.object(db, 'snapshot_get_all_by_project')
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_list_snapshots_with_wrong_limit_and_offset(self,
|
||||
mock_metadata_get,
|
||||
mock_snapshot_get_all):
|
||||
"""Test list with negative and non numeric limit and offset."""
|
||||
mock_snapshot_get_all.return_value = []
|
||||
|
||||
# Negative limit
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=-1&offset=1')
|
||||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
self.controller.index,
|
||||
req)
|
||||
|
||||
# Non numeric limit
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=a&offset=1')
|
||||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
self.controller.index,
|
||||
req)
|
||||
|
||||
# Negative offset
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=1&offset=-1')
|
||||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
self.controller.index,
|
||||
req)
|
||||
|
||||
# Non numeric offset
|
||||
req = fakes.HTTPRequest.blank('/v3/snapshots?limit=1&offset=a')
|
||||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
self.controller.index,
|
||||
req)
|
||||
|
||||
# Test that we get an exception HTTPBadRequest(400) with an offset
|
||||
# greater than the maximum offset value.
|
||||
url = '/v3/snapshots?limit=1&offset=323245324356534235'
|
||||
req = fakes.HTTPRequest.blank(url)
|
||||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
self.controller.index, req)
|
||||
|
||||
def _assert_list_next(self, expected_query=None, project=fake.PROJECT_ID,
|
||||
**kwargs):
|
||||
"""Check a page of snapshots list."""
|
||||
# Since we are accessing v2 api directly we don't need to specify
|
||||
# v2 in the request path, if we did, we'd get /v3/v2 links back
|
||||
request_path = '/v3/%s/snapshots' % project
|
||||
expected_path = request_path
|
||||
|
||||
# Construct the query if there are kwargs
|
||||
if kwargs:
|
||||
request_str = request_path + '?' + urllib.parse.urlencode(kwargs)
|
||||
else:
|
||||
request_str = request_path
|
||||
|
||||
# Make the request
|
||||
req = fakes.HTTPRequest.blank(request_str)
|
||||
res = self.controller.index(req)
|
||||
|
||||
# We only expect to have a next link if there is an actual expected
|
||||
# query.
|
||||
if expected_query:
|
||||
# We must have the links
|
||||
self.assertIn('snapshots_links', res)
|
||||
links = res['snapshots_links']
|
||||
|
||||
# Must be a list of links, even if we only get 1 back
|
||||
self.assertIsInstance(links, list)
|
||||
next_link = links[0]
|
||||
|
||||
# rel entry must be next
|
||||
self.assertIn('rel', next_link)
|
||||
self.assertIn('next', next_link['rel'])
|
||||
|
||||
# href entry must have the right path
|
||||
self.assertIn('href', next_link)
|
||||
href_parts = urllib.parse.urlparse(next_link['href'])
|
||||
self.assertEqual(expected_path, href_parts.path)
|
||||
|
||||
# And the query from the next link must match what we were
|
||||
# expecting
|
||||
params = urllib.parse.parse_qs(href_parts.query)
|
||||
self.assertDictEqual(expected_query, params)
|
||||
|
||||
# Make sure we don't have links if we were not expecting them
|
||||
else:
|
||||
self.assertNotIn('snapshots_links', res)
|
||||
|
||||
def _create_db_snapshots(self, num_snaps):
|
||||
volume = test_utils.create_volume(self.ctx, volume_type_id=None)
|
||||
snaps = [
|
||||
test_utils.create_snapshot(
|
||||
self.ctx, volume.id, display_name='snap' + str(i)
|
||||
) for i in range(num_snaps)
|
||||
]
|
||||
|
||||
self.addCleanup(db.volume_destroy, self.ctx, volume.id)
|
||||
for snap in snaps:
|
||||
self.addCleanup(db.snapshot_destroy, self.ctx, snap.id)
|
||||
|
||||
snaps.reverse()
|
||||
return volume, snaps
|
||||
|
||||
def test_list_snapshots_next_link_default_limit(self):
|
||||
"""Test that snapshot list pagination is limited by osapi_max_limit."""
|
||||
volume, snaps = self._create_db_snapshots(3)
|
||||
|
||||
# NOTE(geguileo): Since cinder.api.common.limited has already been
|
||||
# imported his argument max_limit already has a default value of 1000
|
||||
# so it doesn't matter that we change it to 2. That's why we need to
|
||||
# mock it and send it current value. We still need to set the default
|
||||
# value because other sections of the code use it, for example
|
||||
# _get_collection_links
|
||||
CONF.set_default('osapi_max_limit', 2)
|
||||
|
||||
def get_pagination_params(params, max_limit=CONF.osapi_max_limit,
|
||||
original_call=common.get_pagination_params):
|
||||
return original_call(params, max_limit)
|
||||
|
||||
def _get_limit_param(params, max_limit=CONF.osapi_max_limit,
|
||||
original_call=common._get_limit_param):
|
||||
return original_call(params, max_limit)
|
||||
|
||||
with mock.patch.object(common, 'get_pagination_params',
|
||||
get_pagination_params), \
|
||||
mock.patch.object(common, '_get_limit_param',
|
||||
_get_limit_param):
|
||||
# The link from the first page should link to the second
|
||||
self._assert_list_next({'marker': [snaps[1].id]})
|
||||
|
||||
# Second page should have no next link
|
||||
self._assert_list_next(marker=snaps[1].id)
|
||||
|
||||
def test_list_snapshots_next_link_with_limit(self):
|
||||
"""Test snapshot list pagination with specific limit."""
|
||||
volume, snaps = self._create_db_snapshots(2)
|
||||
|
||||
# The link from the first page should link to the second
|
||||
self._assert_list_next({'limit': ['1'], 'marker': [snaps[0].id]},
|
||||
limit=1)
|
||||
|
||||
# Even though there are no more elements, we should get a next element
|
||||
# per specification.
|
||||
expected = {'limit': ['1'], 'marker': [snaps[1].id]}
|
||||
self._assert_list_next(expected, limit=1, marker=snaps[0].id)
|
||||
|
||||
# When we go beyond the number of elements there should be no more
|
||||
# next links
|
||||
self._assert_list_next(limit=1, marker=snaps[1].id)
|
||||
|
||||
@mock.patch.object(db, 'snapshot_get_all_by_project',
|
||||
v2_fakes.fake_snapshot_get_all_by_project)
|
||||
@mock.patch.object(db, 'snapshot_get_all',
|
||||
v2_fakes.fake_snapshot_get_all)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_admin_list_snapshots_all_tenants(self, snapshot_metadata_get):
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?all_tenants=1' %
|
||||
fake.PROJECT_ID,
|
||||
use_admin_context=True)
|
||||
res = self.controller.index(req)
|
||||
self.assertIn('snapshots', res)
|
||||
self.assertEqual(3, len(res['snapshots']))
|
||||
|
||||
@mock.patch.object(db, 'snapshot_get_all')
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_admin_list_snapshots_by_tenant_id(self, snapshot_metadata_get,
|
||||
snapshot_get_all):
|
||||
def get_all(context, filters=None, marker=None, limit=None,
|
||||
sort_keys=None, sort_dirs=None, offset=None):
|
||||
if 'project_id' in filters and 'tenant1' in filters['project_id']:
|
||||
return [v2_fakes.fake_snapshot(fake.VOLUME_ID,
|
||||
tenant_id='tenant1')]
|
||||
else:
|
||||
return []
|
||||
|
||||
snapshot_get_all.side_effect = get_all
|
||||
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?all_tenants=1'
|
||||
'&project_id=tenant1' % fake.PROJECT_ID,
|
||||
use_admin_context=True)
|
||||
res = self.controller.index(req)
|
||||
self.assertIn('snapshots', res)
|
||||
self.assertEqual(1, len(res['snapshots']))
|
||||
|
||||
@mock.patch.object(db, 'snapshot_get_all_by_project',
|
||||
v2_fakes.fake_snapshot_get_all_by_project)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_all_tenants_non_admin_gets_all_tenants(self,
|
||||
snapshot_metadata_get):
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots?all_tenants=1' %
|
||||
fake.PROJECT_ID)
|
||||
res = self.controller.index(req)
|
||||
self.assertIn('snapshots', res)
|
||||
self.assertEqual(1, len(res['snapshots']))
|
||||
|
||||
@mock.patch.object(db, 'snapshot_get_all_by_project',
|
||||
v2_fakes.fake_snapshot_get_all_by_project)
|
||||
@mock.patch.object(db, 'snapshot_get_all',
|
||||
v2_fakes.fake_snapshot_get_all)
|
||||
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
||||
def test_non_admin_get_by_project(self, snapshot_metadata_get):
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots' % fake.PROJECT_ID)
|
||||
res = self.controller.index(req)
|
||||
self.assertIn('snapshots', res)
|
||||
self.assertEqual(1, len(res['snapshots']))
|
||||
|
||||
def _create_snapshot_bad_body(self, body):
|
||||
req = fakes.HTTPRequest.blank('/v3/%s/snapshots' % fake.PROJECT_ID)
|
||||
req.method = 'POST'
|
||||
|
||||
self.assertRaises(exception.ValidationError,
|
||||
self.controller.create, req, body=body)
|
||||
|
||||
def test_create_no_body(self):
|
||||
self._create_snapshot_bad_body(body=None)
|
||||
|
||||
def test_create_missing_snapshot(self):
|
||||
body = {'foo': {'a': 'b'}}
|
||||
self._create_snapshot_bad_body(body=body)
|
||||
|
||||
def test_create_malformed_entity(self):
|
||||
body = {'snapshot': 'string'}
|
||||
self._create_snapshot_bad_body(body=body)
|
||||
|
||||
Reference in New Issue
Block a user