6b11d276d1
Many changes to the Cinder REST API require changes to the consumers of the API. For example, If we need to add a required parameter to a method that is called by Nova, we'd need both the Nova calling code and the cinderclient that Nova uses to change. But newer Cinder versions with the change must work with older Nova versions, and there is no mechanism for this at the moment. Adding microversions will solve this problem. With microversions, the highest supported version will be negotiated by a field in the HTTP header that is sent to the Cinder API. In the case where the field 'versions' is not sent (i.e. clients and scripts that pre-date this change), then the lowest supported version would be used. In order to ensure that the API consumer is explicitly asking for a microversioned API, a new endpoint v3 is added, which is identical to API version v2. This means that our new Cinder API v3 would be the default, and consumers of the API that wished to use a newer version could do so by using that endpoint and a microversion in the HTTP header. New tests for microversioned API features on endpoint /v3 should be added to cinder/tests/unit/api/v3/ directory. Existing functionality will be tested via the .../v2/ unit tests. DocImpact APIImpact Implements: https://blueprints.launchpad.net/cinder/+spec/cinder-api-microversions Change-Id: I48cdbbc900c2805e59ee9aebc3b1c64aed3212ae
632 lines
25 KiB
Python
632 lines
25 KiB
Python
# Copyright 2011 Denali Systems, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from lxml import etree
|
|
import mock
|
|
from oslo_config import cfg
|
|
from oslo_utils import timeutils
|
|
from six.moves.urllib import parse as urllib
|
|
import webob
|
|
|
|
from cinder.api import common
|
|
from cinder.api.v2 import snapshots
|
|
from cinder import context
|
|
from cinder import db
|
|
from cinder import exception
|
|
from cinder import objects
|
|
from cinder import test
|
|
from cinder.tests.unit.api import fakes
|
|
from cinder.tests.unit.api.v2 import stubs
|
|
from cinder.tests.unit import fake_snapshot
|
|
from cinder.tests.unit import fake_volume
|
|
from cinder.tests.unit import utils
|
|
from cinder import volume
|
|
|
|
|
|
CONF = cfg.CONF
|
|
|
|
UUID = '00000000-0000-0000-0000-000000000001'
|
|
INVALID_UUID = '00000000-0000-0000-0000-000000000002'
|
|
|
|
|
|
def _get_default_snapshot_param():
|
|
return {
|
|
'id': UUID,
|
|
'volume_id': 12,
|
|
'status': 'available',
|
|
'volume_size': 100,
|
|
'created_at': None,
|
|
'updated_at': None,
|
|
'user_id': 'bcb7746c7a41472d88a1ffac89ba6a9b',
|
|
'project_id': '7ffe17a15c724e2aa79fc839540aec15',
|
|
'display_name': 'Default name',
|
|
'display_description': 'Default description',
|
|
'deleted': None,
|
|
'volume': {'availability_zone': 'test_zone'}
|
|
}
|
|
|
|
|
|
def stub_snapshot_delete(self, context, snapshot):
|
|
if snapshot['id'] != UUID:
|
|
raise exception.SnapshotNotFound(snapshot['id'])
|
|
|
|
|
|
def stub_snapshot_get(self, context, snapshot_id):
|
|
if snapshot_id != UUID:
|
|
raise exception.SnapshotNotFound(snapshot_id)
|
|
|
|
param = _get_default_snapshot_param()
|
|
return param
|
|
|
|
|
|
def stub_snapshot_get_all(self, context, search_opts=None):
|
|
param = _get_default_snapshot_param()
|
|
return [param]
|
|
|
|
|
|
class SnapshotApiTest(test.TestCase):
|
|
def setUp(self):
|
|
super(SnapshotApiTest, self).setUp()
|
|
self.controller = snapshots.SnapshotsController()
|
|
|
|
self.stubs.Set(db, 'snapshot_get_all_by_project',
|
|
stubs.stub_snapshot_get_all_by_project)
|
|
self.stubs.Set(db, 'snapshot_get_all',
|
|
stubs.stub_snapshot_get_all)
|
|
|
|
self.ctx = context.RequestContext('admin', 'fakeproject', True)
|
|
|
|
@mock.patch(
|
|
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
|
|
def test_snapshot_create(self, mock_validate):
|
|
volume = utils.create_volume(self.ctx)
|
|
snapshot_name = 'Snapshot Test Name'
|
|
snapshot_description = 'Snapshot Test Desc'
|
|
snapshot = {
|
|
"volume_id": volume.id,
|
|
"force": False,
|
|
"name": snapshot_name,
|
|
"description": snapshot_description
|
|
}
|
|
|
|
body = dict(snapshot=snapshot)
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots')
|
|
resp_dict = self.controller.create(req, body)
|
|
|
|
self.assertIn('snapshot', resp_dict)
|
|
self.assertEqual(snapshot_name, resp_dict['snapshot']['name'])
|
|
self.assertEqual(snapshot_description,
|
|
resp_dict['snapshot']['description'])
|
|
self.assertTrue(mock_validate.called)
|
|
self.assertIn('updated_at', resp_dict['snapshot'])
|
|
db.volume_destroy(self.ctx, volume.id)
|
|
|
|
def test_snapshot_create_force(self):
|
|
volume = utils.create_volume(self.ctx, status='in-use')
|
|
snapshot_name = 'Snapshot Test Name'
|
|
snapshot_description = 'Snapshot Test Desc'
|
|
snapshot = {
|
|
"volume_id": volume.id,
|
|
"force": True,
|
|
"name": snapshot_name,
|
|
"description": snapshot_description
|
|
}
|
|
body = dict(snapshot=snapshot)
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots')
|
|
resp_dict = self.controller.create(req, body)
|
|
|
|
self.assertIn('snapshot', resp_dict)
|
|
self.assertEqual(snapshot_name,
|
|
resp_dict['snapshot']['name'])
|
|
self.assertEqual(snapshot_description,
|
|
resp_dict['snapshot']['description'])
|
|
self.assertIn('updated_at', resp_dict['snapshot'])
|
|
|
|
snapshot = {
|
|
"volume_id": volume.id,
|
|
"force": "**&&^^%%$$##@@",
|
|
"name": "Snapshot Test Name",
|
|
"description": "Snapshot Test Desc"
|
|
}
|
|
body = dict(snapshot=snapshot)
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots')
|
|
self.assertRaises(exception.InvalidParameterValue,
|
|
self.controller.create,
|
|
req,
|
|
body)
|
|
|
|
db.volume_destroy(self.ctx, volume.id)
|
|
|
|
def test_snapshot_create_without_volume_id(self):
|
|
snapshot_name = 'Snapshot Test Name'
|
|
snapshot_description = 'Snapshot Test Desc'
|
|
body = {
|
|
"snapshot": {
|
|
"force": True,
|
|
"name": snapshot_name,
|
|
"description": snapshot_description
|
|
}
|
|
}
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots')
|
|
self.assertRaises(webob.exc.HTTPBadRequest,
|
|
self.controller.create, req, body)
|
|
|
|
@mock.patch.object(volume.api.API, "update_snapshot",
|
|
side_effect=stubs.stub_snapshot_update)
|
|
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
|
@mock.patch('cinder.objects.Volume.get_by_id')
|
|
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
|
@mock.patch(
|
|
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
|
|
def test_snapshot_update(
|
|
self, mock_validate, snapshot_get_by_id, volume_get_by_id,
|
|
snapshot_metadata_get, update_snapshot):
|
|
snapshot = {
|
|
'id': UUID,
|
|
'volume_id': 1,
|
|
'status': 'available',
|
|
'volume_size': 100,
|
|
'display_name': 'Default name',
|
|
'display_description': 'Default description',
|
|
'expected_attrs': ['metadata'],
|
|
}
|
|
ctx = context.RequestContext('admin', 'fake', True)
|
|
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
|
|
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
|
|
snapshot_get_by_id.return_value = snapshot_obj
|
|
volume_get_by_id.return_value = fake_volume_obj
|
|
|
|
updates = {
|
|
"name": "Updated Test Name",
|
|
}
|
|
body = {"snapshot": updates}
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots/%s' % UUID)
|
|
res_dict = self.controller.update(req, UUID, body)
|
|
expected = {
|
|
'snapshot': {
|
|
'id': UUID,
|
|
'volume_id': '1',
|
|
'status': u'available',
|
|
'size': 100,
|
|
'created_at': None,
|
|
'updated_at': None,
|
|
'name': u'Updated Test Name',
|
|
'description': u'Default description',
|
|
'metadata': {},
|
|
}
|
|
}
|
|
self.assertEqual(expected, res_dict)
|
|
self.assertTrue(mock_validate.called)
|
|
self.assertEqual(2, len(self.notifier.notifications))
|
|
|
|
def test_snapshot_update_missing_body(self):
|
|
body = {}
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots/%s' % UUID)
|
|
self.assertRaises(webob.exc.HTTPBadRequest,
|
|
self.controller.update, req, UUID, body)
|
|
|
|
def test_snapshot_update_invalid_body(self):
|
|
body = {'name': 'missing top level snapshot key'}
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots/%s' % UUID)
|
|
self.assertRaises(webob.exc.HTTPBadRequest,
|
|
self.controller.update, req, UUID, body)
|
|
|
|
def test_snapshot_update_not_found(self):
|
|
self.stubs.Set(volume.api.API, "get_snapshot", stub_snapshot_get)
|
|
updates = {
|
|
"name": "Updated Test Name",
|
|
}
|
|
body = {"snapshot": updates}
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots/not-the-uuid')
|
|
self.assertRaises(webob.exc.HTTPNotFound, self.controller.update, req,
|
|
'not-the-uuid', body)
|
|
|
|
@mock.patch.object(volume.api.API, "delete_snapshot",
|
|
side_effect=stubs.stub_snapshot_update)
|
|
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
|
@mock.patch('cinder.objects.Volume.get_by_id')
|
|
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
|
def test_snapshot_delete(self, snapshot_get_by_id, volume_get_by_id,
|
|
snapshot_metadata_get, delete_snapshot):
|
|
snapshot = {
|
|
'id': UUID,
|
|
'volume_id': 1,
|
|
'status': 'available',
|
|
'volume_size': 100,
|
|
'display_name': 'Default name',
|
|
'display_description': 'Default description',
|
|
'expected_attrs': ['metadata'],
|
|
}
|
|
ctx = context.RequestContext('admin', 'fake', True)
|
|
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
|
|
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
|
|
snapshot_get_by_id.return_value = snapshot_obj
|
|
volume_get_by_id.return_value = fake_volume_obj
|
|
|
|
snapshot_id = UUID
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots/%s' % snapshot_id)
|
|
resp = self.controller.delete(req, snapshot_id)
|
|
self.assertEqual(202, resp.status_int)
|
|
|
|
def test_snapshot_delete_invalid_id(self):
|
|
self.stubs.Set(volume.api.API, "delete_snapshot", stub_snapshot_delete)
|
|
snapshot_id = INVALID_UUID
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots/%s' % snapshot_id)
|
|
self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
|
|
req, snapshot_id)
|
|
|
|
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
|
@mock.patch('cinder.objects.Volume.get_by_id')
|
|
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
|
def test_snapshot_show(self, snapshot_get_by_id, volume_get_by_id,
|
|
snapshot_metadata_get):
|
|
snapshot = {
|
|
'id': UUID,
|
|
'volume_id': 1,
|
|
'status': 'available',
|
|
'volume_size': 100,
|
|
'display_name': 'Default name',
|
|
'display_description': 'Default description',
|
|
'expected_attrs': ['metadata'],
|
|
}
|
|
ctx = context.RequestContext('admin', 'fake', True)
|
|
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
|
|
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
|
|
snapshot_get_by_id.return_value = snapshot_obj
|
|
volume_get_by_id.return_value = fake_volume_obj
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots/%s' % UUID)
|
|
resp_dict = self.controller.show(req, UUID)
|
|
|
|
self.assertIn('snapshot', resp_dict)
|
|
self.assertEqual(UUID, resp_dict['snapshot']['id'])
|
|
self.assertIn('updated_at', resp_dict['snapshot'])
|
|
|
|
def test_snapshot_show_invalid_id(self):
|
|
snapshot_id = INVALID_UUID
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots/%s' % snapshot_id)
|
|
self.assertRaises(webob.exc.HTTPNotFound,
|
|
self.controller.show, req, snapshot_id)
|
|
|
|
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
|
@mock.patch('cinder.objects.Volume.get_by_id')
|
|
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
|
@mock.patch('cinder.volume.api.API.get_all_snapshots')
|
|
def test_snapshot_detail(self, get_all_snapshots, snapshot_get_by_id,
|
|
volume_get_by_id, snapshot_metadata_get):
|
|
snapshot = {
|
|
'id': UUID,
|
|
'volume_id': 1,
|
|
'status': 'available',
|
|
'volume_size': 100,
|
|
'display_name': 'Default name',
|
|
'display_description': 'Default description',
|
|
'expected_attrs': ['metadata']
|
|
}
|
|
ctx = context.RequestContext('admin', 'fake', True)
|
|
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
|
|
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
|
|
snapshot_get_by_id.return_value = snapshot_obj
|
|
volume_get_by_id.return_value = fake_volume_obj
|
|
snapshots = objects.SnapshotList(objects=[snapshot_obj])
|
|
get_all_snapshots.return_value = snapshots
|
|
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots/detail')
|
|
resp_dict = self.controller.detail(req)
|
|
|
|
self.assertIn('snapshots', resp_dict)
|
|
resp_snapshots = resp_dict['snapshots']
|
|
self.assertEqual(1, len(resp_snapshots))
|
|
self.assertIn('updated_at', resp_snapshots[0])
|
|
|
|
resp_snapshot = resp_snapshots.pop()
|
|
self.assertEqual(UUID, resp_snapshot['id'])
|
|
|
|
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
|
def test_admin_list_snapshots_limited_to_project(self,
|
|
snapshot_metadata_get):
|
|
req = fakes.HTTPRequest.blank('/v2/fake/snapshots',
|
|
use_admin_context=True)
|
|
res = self.controller.index(req)
|
|
|
|
self.assertIn('snapshots', res)
|
|
self.assertEqual(1, len(res['snapshots']))
|
|
|
|
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
|
def test_list_snapshots_with_limit_and_offset(self,
|
|
snapshot_metadata_get):
|
|
def list_snapshots_with_limit_and_offset(snaps, is_admin):
|
|
req = fakes.HTTPRequest.blank('/v2/fake/snapshots?limit=1\
|
|
&offset=1',
|
|
use_admin_context=is_admin)
|
|
res = self.controller.index(req)
|
|
|
|
self.assertIn('snapshots', res)
|
|
self.assertEqual(1, len(res['snapshots']))
|
|
self.assertEqual(snaps[1].id, res['snapshots'][0]['id'])
|
|
self.assertIn('updated_at', res['snapshots'][0])
|
|
|
|
# Test that we get an empty list with an offset greater than the
|
|
# number of items
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots?limit=1&offset=3')
|
|
self.assertEqual({'snapshots': []}, self.controller.index(req))
|
|
|
|
self.stubs.UnsetAll()
|
|
volume, snaps = self._create_db_snapshots(3)
|
|
# admin case
|
|
list_snapshots_with_limit_and_offset(snaps, is_admin=True)
|
|
# non-admin case
|
|
list_snapshots_with_limit_and_offset(snaps, is_admin=False)
|
|
|
|
@mock.patch.object(db, 'snapshot_get_all_by_project')
|
|
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
|
def test_list_snpashots_with_wrong_limit_and_offset(self,
|
|
mock_metadata_get,
|
|
mock_snapshot_get_all):
|
|
"""Test list with negative and non numeric limit and offset."""
|
|
mock_snapshot_get_all.return_value = []
|
|
|
|
# Negative limit
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots?limit=-1&offset=1')
|
|
self.assertRaises(webob.exc.HTTPBadRequest,
|
|
self.controller.index,
|
|
req)
|
|
|
|
# Non numeric limit
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots?limit=a&offset=1')
|
|
self.assertRaises(webob.exc.HTTPBadRequest,
|
|
self.controller.index,
|
|
req)
|
|
|
|
# Negative offset
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots?limit=1&offset=-1')
|
|
self.assertRaises(webob.exc.HTTPBadRequest,
|
|
self.controller.index,
|
|
req)
|
|
|
|
# Non numeric offset
|
|
req = fakes.HTTPRequest.blank('/v2/snapshots?limit=1&offset=a')
|
|
self.assertRaises(webob.exc.HTTPBadRequest,
|
|
self.controller.index,
|
|
req)
|
|
|
|
def _assert_list_next(self, expected_query=None, project='fakeproject',
|
|
**kwargs):
|
|
"""Check a page of snapshots list."""
|
|
# Since we are accessing v2 api directly we don't need to specify
|
|
# v2 in the request path, if we did, we'd get /v2/v2 links back
|
|
request_path = '/v2/%s/snapshots' % project
|
|
expected_path = request_path
|
|
|
|
# Construct the query if there are kwargs
|
|
if kwargs:
|
|
request_str = request_path + '?' + urllib.urlencode(kwargs)
|
|
else:
|
|
request_str = request_path
|
|
|
|
# Make the request
|
|
req = fakes.HTTPRequest.blank(request_str)
|
|
res = self.controller.index(req)
|
|
|
|
# We only expect to have a next link if there is an actual expected
|
|
# query.
|
|
if expected_query:
|
|
# We must have the links
|
|
self.assertIn('snapshots_links', res)
|
|
links = res['snapshots_links']
|
|
|
|
# Must be a list of links, even if we only get 1 back
|
|
self.assertTrue(list, type(links))
|
|
next_link = links[0]
|
|
|
|
# rel entry must be next
|
|
self.assertIn('rel', next_link)
|
|
self.assertIn('next', next_link['rel'])
|
|
|
|
# href entry must have the right path
|
|
self.assertIn('href', next_link)
|
|
href_parts = urllib.urlparse(next_link['href'])
|
|
self.assertEqual(expected_path, href_parts.path)
|
|
|
|
# And the query from the next link must match what we were
|
|
# expecting
|
|
params = urllib.parse_qs(href_parts.query)
|
|
self.assertDictEqual(expected_query, params)
|
|
|
|
# Make sure we don't have links if we were not expecting them
|
|
else:
|
|
self.assertNotIn('snapshots_links', res)
|
|
|
|
def _create_db_snapshots(self, num_snaps):
|
|
volume = utils.create_volume(self.ctx)
|
|
snaps = [utils.create_snapshot(self.ctx,
|
|
volume.id,
|
|
display_name='snap' + str(i))
|
|
for i in range(num_snaps)]
|
|
|
|
self.addCleanup(db.volume_destroy, self.ctx, volume.id)
|
|
for snap in snaps:
|
|
self.addCleanup(db.snapshot_destroy, self.ctx, snap.id)
|
|
|
|
snaps.reverse()
|
|
return volume, snaps
|
|
|
|
def test_list_snapshots_next_link_default_limit(self):
|
|
"""Test that snapshot list pagination is limited by osapi_max_limit."""
|
|
self.stubs.UnsetAll()
|
|
volume, snaps = self._create_db_snapshots(3)
|
|
|
|
# NOTE(geguileo): Since cinder.api.common.limited has already been
|
|
# imported his argument max_limit already has a default value of 1000
|
|
# so it doesn't matter that we change it to 2. That's why we need to
|
|
# mock it and send it current value. We still need to set the default
|
|
# value because other sections of the code use it, for example
|
|
# _get_collection_links
|
|
CONF.set_default('osapi_max_limit', 2)
|
|
|
|
def get_pagination_params(params, max_limit=CONF.osapi_max_limit,
|
|
original_call=common.get_pagination_params):
|
|
return original_call(params, max_limit)
|
|
|
|
def _get_limit_param(params, max_limit=CONF.osapi_max_limit,
|
|
original_call=common._get_limit_param):
|
|
return original_call(params, max_limit)
|
|
|
|
with mock.patch.object(common, 'get_pagination_params',
|
|
get_pagination_params), \
|
|
mock.patch.object(common, '_get_limit_param',
|
|
_get_limit_param):
|
|
# The link from the first page should link to the second
|
|
self._assert_list_next({'marker': [snaps[1].id]})
|
|
|
|
# Second page should have no next link
|
|
self._assert_list_next(marker=snaps[1].id)
|
|
|
|
def test_list_snapshots_next_link_with_limit(self):
|
|
"""Test snapshot list pagination with specific limit."""
|
|
self.stubs.UnsetAll()
|
|
volume, snaps = self._create_db_snapshots(2)
|
|
|
|
# The link from the first page should link to the second
|
|
self._assert_list_next({'limit': ['1'], 'marker': [snaps[0].id]},
|
|
limit=1)
|
|
|
|
# Even though there are no more elements, we should get a next element
|
|
# per specification.
|
|
expected = {'limit': ['1'], 'marker': [snaps[1].id]}
|
|
self._assert_list_next(expected, limit=1, marker=snaps[0].id)
|
|
|
|
# When we go beyond the number of elements there should be no more
|
|
# next links
|
|
self._assert_list_next(limit=1, marker=snaps[1].id)
|
|
|
|
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
|
def test_admin_list_snapshots_all_tenants(self, snapshot_metadata_get):
|
|
req = fakes.HTTPRequest.blank('/v2/fake/snapshots?all_tenants=1',
|
|
use_admin_context=True)
|
|
res = self.controller.index(req)
|
|
self.assertIn('snapshots', res)
|
|
self.assertEqual(3, len(res['snapshots']))
|
|
|
|
@mock.patch.object(db, 'snapshot_get_all')
|
|
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
|
def test_admin_list_snapshots_by_tenant_id(self, snapshot_metadata_get,
|
|
snapshot_get_all):
|
|
def get_all(context, filters=None, marker=None, limit=None,
|
|
sort_keys=None, sort_dirs=None, offset=None):
|
|
if 'project_id' in filters and 'tenant1' in filters['project_id']:
|
|
return [stubs.stub_snapshot(1, tenant_id='tenant1')]
|
|
else:
|
|
return []
|
|
|
|
snapshot_get_all.side_effect = get_all
|
|
|
|
req = fakes.HTTPRequest.blank('/v2/fake/snapshots?all_tenants=1'
|
|
'&project_id=tenant1',
|
|
use_admin_context=True)
|
|
res = self.controller.index(req)
|
|
self.assertIn('snapshots', res)
|
|
self.assertEqual(1, len(res['snapshots']))
|
|
|
|
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
|
def test_all_tenants_non_admin_gets_all_tenants(self,
|
|
snapshot_metadata_get):
|
|
req = fakes.HTTPRequest.blank('/v2/fake/snapshots?all_tenants=1')
|
|
res = self.controller.index(req)
|
|
self.assertIn('snapshots', res)
|
|
self.assertEqual(1, len(res['snapshots']))
|
|
|
|
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
|
|
def test_non_admin_get_by_project(self, snapshot_metadata_get):
|
|
req = fakes.HTTPRequest.blank('/v2/fake/snapshots')
|
|
res = self.controller.index(req)
|
|
self.assertIn('snapshots', res)
|
|
self.assertEqual(1, len(res['snapshots']))
|
|
|
|
def _create_snapshot_bad_body(self, body):
|
|
req = fakes.HTTPRequest.blank('/v2/fake/snapshots')
|
|
req.method = 'POST'
|
|
|
|
self.assertRaises(webob.exc.HTTPBadRequest,
|
|
self.controller.create, req, body)
|
|
|
|
def test_create_no_body(self):
|
|
self._create_snapshot_bad_body(body=None)
|
|
|
|
def test_create_missing_snapshot(self):
|
|
body = {'foo': {'a': 'b'}}
|
|
self._create_snapshot_bad_body(body=body)
|
|
|
|
def test_create_malformed_entity(self):
|
|
body = {'snapshot': 'string'}
|
|
self._create_snapshot_bad_body(body=body)
|
|
|
|
|
|
class SnapshotSerializerTest(test.TestCase):
|
|
def _verify_snapshot(self, snap, tree):
|
|
self.assertEqual('snapshot', tree.tag)
|
|
|
|
for attr in ('id', 'status', 'size', 'created_at',
|
|
'name', 'description', 'volume_id'):
|
|
self.assertEqual(str(snap[attr]), tree.get(attr))
|
|
|
|
def test_snapshot_show_create_serializer(self):
|
|
serializer = snapshots.SnapshotTemplate()
|
|
raw_snapshot = dict(
|
|
id='snap_id',
|
|
status='snap_status',
|
|
size=1024,
|
|
created_at=timeutils.utcnow(),
|
|
name='snap_name',
|
|
description='snap_desc',
|
|
display_description='snap_desc',
|
|
volume_id='vol_id',
|
|
)
|
|
text = serializer.serialize(dict(snapshot=raw_snapshot))
|
|
|
|
tree = etree.fromstring(text)
|
|
|
|
self._verify_snapshot(raw_snapshot, tree)
|
|
|
|
def test_snapshot_index_detail_serializer(self):
|
|
serializer = snapshots.SnapshotsTemplate()
|
|
raw_snapshots = [
|
|
dict(
|
|
id='snap1_id',
|
|
status='snap1_status',
|
|
size=1024,
|
|
created_at=timeutils.utcnow(),
|
|
name='snap1_name',
|
|
description='snap1_desc',
|
|
volume_id='vol1_id',
|
|
),
|
|
dict(
|
|
id='snap2_id',
|
|
status='snap2_status',
|
|
size=1024,
|
|
created_at=timeutils.utcnow(),
|
|
name='snap2_name',
|
|
description='snap2_desc',
|
|
volume_id='vol2_id',
|
|
)
|
|
]
|
|
text = serializer.serialize(dict(snapshots=raw_snapshots))
|
|
|
|
tree = etree.fromstring(text)
|
|
|
|
self.assertEqual('snapshots', tree.tag)
|
|
self.assertEqual(len(raw_snapshots), len(tree))
|
|
for idx, child in enumerate(tree):
|
|
self._verify_snapshot(raw_snapshots[idx], child)
|