Add snapshot instances admin APIs

Add new API entry points for share snapshot instances:
- share-snapshot-instance-list
- share-snapshot-instance-show
- share-snapshot-instance-reset-status

APIImpact
DocImpact

Implements: blueprint snapshot-instances
Change-Id: Ica1e81012f19926e0f1ba9cd6d8eecc5fbbf40b5
This commit is contained in:
zhongjun 2015-10-14 16:40:32 +08:00
parent ee0e68e156
commit 8a487bf95d
14 changed files with 719 additions and 4 deletions

View File

@ -61,6 +61,11 @@
"share_snapshot:force_delete": "rule:admin_api",
"share_snapshot:reset_status": "rule:admin_api",
"share_snapshot_instance:detail": "rule:admin_api",
"share_snapshot_instance:index": "rule:admin_api",
"share_snapshot_instance:show": "rule:admin_api",
"share_snapshot_instance:reset_status": "rule:admin_api",
"share_type:index": "rule:default",
"share_type:show": "rule:default",
"share_type:default": "rule:default",

View File

@ -69,13 +69,16 @@ REST_API_VERSION_HISTORY = """
* 2.17 - Added project_id and user_id fields to the JSON response of
snapshot show/create/manage API.
* 2.18 - Add gateway to the JSON response of share network show API.
* 2.19 - Share snapshot instances admin APIs
(list/show/detail/reset-status).
"""
# The minimum and maximum versions of the API supported
# The default api version request is defined to be the
# the minimum version of the API supported.
_MIN_API_VERSION = "2.0"
_MAX_API_VERSION = "2.18"
_MAX_API_VERSION = "2.19"
DEFAULT_API_VERSION = _MIN_API_VERSION

View File

@ -118,3 +118,7 @@ user documentation.
2.18
----
Add gateway in share network show API.
2.19
----
Add admin APIs(list/show/detail/reset-status) of snapshot instances.

View File

@ -41,6 +41,7 @@ from manila.api.v2 import share_export_locations
from manila.api.v2 import share_instance_export_locations
from manila.api.v2 import share_instances
from manila.api.v2 import share_replicas
from manila.api.v2 import share_snapshot_instances
from manila.api.v2 import share_snapshots
from manila.api.v2 import share_types
from manila.api.v2 import shares
@ -205,6 +206,13 @@ class APIRouter(manila.api.openstack.APIRouter):
action="manage",
conditions={"method": ["POST"]})
self.resources['snapshot_instances'] = (
share_snapshot_instances.create_resource())
mapper.resource("snapshot-instance", "snapshot-instances",
controller=self.resources['snapshot_instances'],
collection={'detail': 'GET'},
member={'action': 'POST'})
self.resources["share_metadata"] = share_metadata.create_resource()
share_metadata_controller = self.resources["share_metadata"]

View File

@ -0,0 +1,86 @@
# Copyright 2016 Huawei Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from webob import exc
from manila.api.openstack import wsgi
from manila.api.views import share_snapshot_instances as instance_view
from manila import db
from manila import exception
from manila.i18n import _
from manila import share
class ShareSnapshotInstancesController(wsgi.Controller,
wsgi.AdminActionsMixin):
"""The share snapshot instances API controller for the OpenStack API."""
resource_name = 'share_snapshot_instance'
_view_builder_class = instance_view.ViewBuilder
def __init__(self):
self.share_api = share.API()
super(self.__class__, self).__init__()
@wsgi.Controller.api_version('2.19')
@wsgi.Controller.authorize
def show(self, req, id):
context = req.environ['manila.context']
try:
snapshot_instance = db.share_snapshot_instance_get(
context, id)
except exception.ShareSnapshotInstanceNotFound:
msg = (_("Snapshot instance %s not found.") % id)
raise exc.HTTPNotFound(explanation=msg)
return self._view_builder.detail(req, snapshot_instance)
@wsgi.Controller.api_version('2.19')
@wsgi.Controller.authorize
def index(self, req):
"""Return a summary list of snapshot instances."""
return self._get_instances(req)
@wsgi.Controller.api_version('2.19')
@wsgi.Controller.authorize
def detail(self, req):
"""Returns a detailed list of snapshot instances."""
return self._get_instances(req, is_detail=True)
def _get_instances(self, req, is_detail=False):
"""Returns list of snapshot instances."""
context = req.environ['manila.context']
snapshot_id = req.params.get('snapshot_id')
instances = db.share_snapshot_instance_get_all_with_filters(
context, {'snapshot_ids': snapshot_id})
if is_detail:
instances = self._view_builder.detail_list(req, instances)
else:
instances = self._view_builder.summary_list(req, instances)
return instances
@wsgi.Controller.api_version('2.19')
@wsgi.action('reset_status')
def reset_status(self, req, id, body):
"""Reset the 'status' attribute in the database."""
return self._reset_status(req, id, body)
def _update(self, *args, **kwargs):
db.share_snapshot_instance_update(*args, **kwargs)
def create_resource():
return wsgi.Resource(ShareSnapshotInstancesController())

View File

@ -0,0 +1,63 @@
# Copyright 2016 Huawei Inc.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from manila.api import common
class ViewBuilder(common.ViewBuilder):
"""Model the server API response as a python dictionary."""
_collection_name = 'snapshot_instances'
def summary_list(self, request, instances):
"""Summary view of a list of share snapshot instances."""
return self._list_view(self.summary, request, instances)
def detail_list(self, request, instances):
"""Detailed view of a list of share snapshot instances."""
return self._list_view(self.detail, request, instances)
def summary(self, request, instance):
"""Generic, non-detailed view of a share snapshot instance."""
instance_dict = {
'id': instance.get('id'),
'snapshot_id': instance.get('snapshot_id'),
'status': instance.get('status'),
}
return {'snapshot_instance': instance_dict}
def detail(self, request, instance):
"""Detailed view of a single share snapshot instance."""
instance_dict = {
'id': instance.get('id'),
'snapshot_id': instance.get('snapshot_id'),
'created_at': instance.get('created_at'),
'updated_at': instance.get('updated_at'),
'status': instance.get('status'),
'share_id': instance.get('share_instance').get('share_id'),
'share_instance_id': instance.get('share_instance_id'),
'progress': instance.get('progress'),
'provider_location': instance.get('provider_location'),
}
return {'snapshot_instance': instance_dict}
def _list_view(self, func, request, instances):
"""Provide a view for a list of share snapshot instances."""
instances_list = [func(request, instance)['snapshot_instance']
for instance in instances]
instances_dict = {self._collection_name: instances_list}
return instances_dict

View File

@ -0,0 +1,263 @@
# Copyright 2016 Huawei Inc.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
import six
from webob import exc
from manila.api.v2 import share_snapshot_instances
from manila.common import constants
from manila import context
from manila import exception
from manila import policy
from manila import test
from manila.tests.api import fakes
from manila.tests import db_utils
from manila.tests import fake_share
CONF = cfg.CONF
@ddt.ddt
class ShareSnapshotInstancesApiTest(test.TestCase):
"""Share snapshot instance Api Test."""
def setUp(self):
super(ShareSnapshotInstancesApiTest, self).setUp()
self.controller = (share_snapshot_instances.
ShareSnapshotInstancesController())
self.resource_name = self.controller.resource_name
self.api_version = '2.19'
self.snapshot_instances_req = fakes.HTTPRequest.blank(
'/snapshot-instances', version=self.api_version)
self.admin_context = context.RequestContext('admin', 'fake', True)
self.member_context = context.RequestContext('fake', 'fake')
self.snapshot_instances_req.environ['manila.context'] = (
self.admin_context)
self.snapshot_instances_req_admin = fakes.HTTPRequest.blank(
'/snapshot-instances', version=self.api_version,
use_admin_context=True)
self.mock_policy_check = self.mock_object(policy, 'check_policy')
def _get_request(self, uri, context=None):
if context is None:
context = self.admin_context
req = fakes.HTTPRequest.blank('/snapshot_instances',
version="2.19")
req.environ['manila.context'] = context
return req
def _get_fake_snapshot_instance(self, summary=False, **values):
snapshot_instance = fake_share.fake_snapshot_instance(
as_primitive=True)
expected_keys = {
'id',
'snapshot_id',
'status',
}
expected_snapshot_instance = {key: snapshot_instance[key] for key
in snapshot_instance if key
in expected_keys}
if not summary:
expected_snapshot_instance['share_id'] = (
snapshot_instance.get('share_instance').get('share_id'))
expected_snapshot_instance.update({
'created_at': snapshot_instance.get('created_at'),
'updated_at': snapshot_instance.get('updated_at'),
'progress': snapshot_instance.get('progress'),
'provider_location': snapshot_instance.get(
'provider_location'),
'share_instance_id': snapshot_instance.get(
'share_instance_id'),
})
return snapshot_instance, expected_snapshot_instance
def _setup_snapshot_instance_data(self, instance=None):
if instance is None:
share_instance = db_utils.create_share_instance(
status=constants.STATUS_AVAILABLE,
share_id='fake_share_id_1')
instance = db_utils.create_snapshot_instance(
'fake_snapshot_id_1',
status=constants.STATUS_AVAILABLE,
share_instance_id=share_instance['id'])
req = fakes.HTTPRequest.blank(
'/v2/fake/snapshot-instances/%s/action' % instance['id'],
version=self.api_version)
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.headers['X-Openstack-Manila-Api-Version'] = self.api_version
return instance, req
def _get_context(self, role):
return getattr(self, '%s_context' % role)
@ddt.data(None, 'FAKE_SNAPSHOT_ID')
def test_list_snapshot_instances_summary(self, snapshot_id):
snapshot_instance, expected_snapshot_instance = (
self._get_fake_snapshot_instance(summary=True))
self.mock_object(share_snapshot_instances.db,
'share_snapshot_instance_get_all_with_filters',
mock.Mock(return_value=[snapshot_instance]))
url = '/snapshot-instances'
if snapshot_id:
url += '?snapshot_id=%s' % snapshot_id
req = fakes.HTTPRequest.blank(url, version=self.api_version)
req_context = req.environ['manila.context']
res_dict = self.controller.index(req)
self.assertEqual([expected_snapshot_instance],
res_dict['snapshot_instances'])
self.mock_policy_check.assert_called_once_with(
req_context, self.resource_name, 'index')
def test_list_snapshot_instances_detail(self):
snapshot_instance, expected_snapshot_instance = (
self._get_fake_snapshot_instance())
self.mock_object(share_snapshot_instances.db,
'share_snapshot_instance_get_all_with_filters',
mock.Mock(return_value=[snapshot_instance]))
res_dict = self.controller.detail(self.snapshot_instances_req)
self.assertEqual([expected_snapshot_instance],
res_dict['snapshot_instances'])
self.mock_policy_check.assert_called_once_with(
self.admin_context, self.resource_name, 'detail')
def test_list_snapshot_instances_detail_invalid_snapshot(self):
self.mock_object(share_snapshot_instances.db,
'share_snapshot_instance_get_all_with_filters',
mock.Mock(return_value=[]))
req = self.snapshot_instances_req
req.GET['snapshot_id'] = 'FAKE_SNAPSHOT_ID'
res_dict = self.controller.detail(req)
self.assertEqual([], res_dict['snapshot_instances'])
self.mock_policy_check.assert_called_once_with(
self.admin_context, self.resource_name, 'detail')
def test_show(self):
snapshot_instance, expected_snapshot_instance = (
self._get_fake_snapshot_instance())
self.mock_object(
share_snapshot_instances.db, 'share_snapshot_instance_get',
mock.Mock(return_value=snapshot_instance))
res_dict = self.controller.show(self.snapshot_instances_req,
snapshot_instance.get('id'))
self.assertEqual(expected_snapshot_instance,
res_dict['snapshot_instance'])
self.mock_policy_check.assert_called_once_with(
self.admin_context, self.resource_name, 'show')
def test_show_snapshot_instance_not_found(self):
mock__view_builder_call = self.mock_object(
share_snapshot_instances.instance_view.ViewBuilder, 'detail')
fake_exception = exception.ShareSnapshotInstanceNotFound(
instance_id='FAKE_SNAPSHOT_INSTANCE_ID')
self.mock_object(share_snapshot_instances.db,
'share_snapshot_instance_get',
mock.Mock(side_effect=fake_exception))
self.assertRaises(exc.HTTPNotFound,
self.controller.show,
self.snapshot_instances_req,
'FAKE_SNAPSHOT_INSTANCE_ID')
self.assertFalse(mock__view_builder_call.called)
@ddt.data('index', 'detail', 'show', 'reset_status')
def test_policy_not_authorized(self, method_name):
method = getattr(self.controller, method_name)
if method_name in ('index', 'detail'):
arguments = {}
else:
arguments = {
'id': 'FAKE_SNAPSHOT_ID',
'body': {'FAKE_KEY': 'FAKE_VAL'},
}
noauthexc = exception.PolicyNotAuthorized(action=six.text_type(method))
with mock.patch.object(
policy, 'check_policy', mock.Mock(side_effect=noauthexc)):
self.assertRaises(
exc.HTTPForbidden, method, self.snapshot_instances_req,
**arguments)
@ddt.data('index', 'show', 'detail', 'reset_status')
def test_upsupported_microversion(self, method_name):
unsupported_microversions = ('1.0', '2.18')
method = getattr(self.controller, method_name)
arguments = {
'id': 'FAKE_SNAPSHOT_ID',
}
if method_name in ('index'):
arguments.clear()
for microversion in unsupported_microversions:
req = fakes.HTTPRequest.blank(
'/snapshot-instances', version=microversion)
self.assertRaises(exception.VersionNotFoundForAPIMethod,
method, req, **arguments)
def _reset_status(self, context, instance, req,
valid_code=202, valid_status=None, body=None):
if body is None:
body = {'reset_status': {'status': constants.STATUS_ERROR}}
req.body = six.b(jsonutils.dumps(body))
req.environ['manila.context'] = context
with mock.patch.object(
policy, 'check_policy', fakes.mock_fake_admin_check):
resp = req.get_response(fakes.app())
# validate response code and model status
self.assertEqual(valid_code, resp.status_int)
if valid_code == 404:
self.assertRaises(exception.ShareSnapshotInstanceNotFound,
(share_snapshot_instances.db.
share_snapshot_instance_get),
context,
instance['id'])
else:
actual_instance = (
share_snapshot_instances.db.share_snapshot_instance_get(
context, instance['id']))
self.assertEqual(valid_status, actual_instance['status'])
@ddt.data(*fakes.fixture_reset_status_with_different_roles)
@ddt.unpack
def test_reset_status_with_different_roles(self, role, valid_code,
valid_status, version):
instance, action_req = self._setup_snapshot_instance_data()
ctxt = self._get_context(role)
self._reset_status(ctxt, instance, action_req,
valid_code=valid_code,
valid_status=valid_status)

View File

@ -140,7 +140,7 @@ def fake_snapshot(create_instance=False, **kwargs):
return db_fakes.FakeModel(snapshot)
def fake_snapshot_instance(base_snapshot=None, **kwargs):
def fake_snapshot_instance(base_snapshot=None, as_primitive=False, **kwargs):
if base_snapshot is None:
base_snapshot = fake_snapshot()
snapshot_instance = {
@ -151,6 +151,7 @@ def fake_snapshot_instance(base_snapshot=None, **kwargs):
'provider_location': 'i_live_here_actually',
'share_name': 'fakename',
'share_id': 'fakeshareinstanceid',
'share_instance': {'share_id': 'fakeshareid', },
'share_instance_id': 'fakeshareinstanceid',
'deleted': False,
'updated_at': datetime.datetime(2016, 3, 21, 0, 5, 58),
@ -159,7 +160,10 @@ def fake_snapshot_instance(base_snapshot=None, **kwargs):
'share': fake_share(),
}
snapshot_instance.update(kwargs)
return db_fakes.FakeModel(snapshot_instance)
if as_primitive:
return snapshot_instance
else:
return db_fakes.FakeModel(snapshot_instance)
def expected_snapshot(version=None, id='fake_snapshot_id', **kwargs):

View File

@ -34,7 +34,7 @@ ShareGroup = [
help="The minimum api microversion is configured to be the "
"value of the minimum microversion supported by Manila."),
cfg.StrOpt("max_api_microversion",
default="2.18",
default="2.19",
help="The maximum api microversion is configured to be the "
"value of the latest microversion supported by Manila."),
cfg.StrOpt("region",

View File

@ -550,6 +550,68 @@ class SharesV2Client(shares_client.SharesClient):
self.expected_success(202, resp.status)
return body
###############
def get_snapshot_instance(self, instance_id, version=LATEST_MICROVERSION):
resp, body = self.get("snapshot-instances/%s" % instance_id,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_snapshot_instances(self, detail=False, snapshot_id=None,
version=LATEST_MICROVERSION):
"""Get list of share snapshot instances."""
uri = "snapshot-instances%s" % ('/detail' if detail else '')
if snapshot_id is not None:
uri += '?snapshot_id=%s' % snapshot_id
resp, body = self.get(uri, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def reset_snapshot_instance_status(self, instance_id,
status=constants.STATUS_AVAILABLE,
version=LATEST_MICROVERSION):
"""Reset the status."""
uri = 'snapshot-instances/%s/action' % instance_id
post_body = {
'reset_status': {
'status': status
}
}
body = json.dumps(post_body)
resp, body = self.post(uri, body, extra_headers=True, version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def wait_for_snapshot_instance_status(self, instance_id, expected_status):
"""Waits for a snapshot instance status to reach a given status."""
body = self.get_snapshot_instance(instance_id)
instance_status = body['status']
start = int(time.time())
while instance_status != expected_status:
time.sleep(self.build_interval)
body = self.get_snapshot_instance(instance_id)
instance_status = body['status']
if instance_status == expected_status:
return
if 'error' in instance_status:
raise share_exceptions.SnapshotInstanceBuildErrorException(
id=instance_id)
if int(time.time()) - start >= self.build_timeout:
message = ('The status of snapshot instance %(id)s failed to '
'reach %(expected_status)s status within the '
'required time (%(time)ss). Current '
'status: %(current_status)s.' %
{
'expected_status': expected_status,
'time': self.build_timeout,
'id': instance_id,
'current_status': instance_status,
})
raise exceptions.TimeoutException(message)
###############
def _get_access_action_name(self, version, action):

View File

@ -37,6 +37,11 @@ class SnapshotBuildErrorException(exceptions.TempestException):
message = "Snapshot %(snapshot_id)s failed to build and is in ERROR status"
class SnapshotInstanceBuildErrorException(exceptions.TempestException):
message = ("Snapshot instance %(id)s failed to build and is in "
"ERROR status.")
class CGSnapshotBuildErrorException(exceptions.TempestException):
message = ("CGSnapshot %(cgsnapshot_id)s failed to build and is in ERROR "
"status")

View File

@ -0,0 +1,121 @@
# Copyright 2016 Huawei
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from tempest import config
from tempest import test
import testtools
from manila_tempest_tests.tests.api import base
CONF = config.CONF
@testtools.skipUnless(CONF.share.run_snapshot_tests,
'Snapshot tests are disabled.')
@base.skip_if_microversion_lt("2.19")
@ddt.ddt
class ShareSnapshotInstancesTest(base.BaseSharesAdminTest):
@classmethod
def resource_setup(cls):
super(ShareSnapshotInstancesTest, cls).resource_setup()
cls.share = cls.create_share()
snap = cls.create_snapshot_wait_for_active(cls.share["id"])
cls.snapshot = cls.shares_v2_client.get_snapshot(snap['id'])
@ddt.data(True, False)
@test.attr(type=[base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND])
def test_list_snapshot_instances_by_snapshot(self, detail):
"""Test that we get only the 1 snapshot instance from snapshot."""
snapshot_instances = self.shares_v2_client.list_snapshot_instances(
detail=detail, snapshot_id=self.snapshot['id'])
expected_keys = ['id', 'snapshot_id', 'status']
if detail:
extra_detail_keys = ['provider_location', 'share_id',
'share_instance_id', 'created_at',
'updated_at', 'progress']
expected_keys.extend(extra_detail_keys)
si_num = len(snapshot_instances)
self.assertEqual(1, si_num,
'Incorrect amount of snapshot instances found; '
'expected 1, found %s.' % si_num)
si = snapshot_instances[0]
self.assertEqual(self.snapshot['id'], si['snapshot_id'],
'Snapshot instance %s has incorrect snapshot id;'
' expected %s, got %s.' % (si['id'],
self.snapshot['id'],
si['snapshot_id']))
if detail:
self.assertEqual(self.snapshot['share_id'], si['share_id'])
for key in si:
self.assertIn(key, expected_keys)
self.assertEqual(len(expected_keys), len(si))
@test.attr(type=[base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND])
def test_list_snapshot_instances(self):
"""Test that we get at least the snapshot instance."""
snapshot_instances = self.shares_v2_client.list_snapshot_instances()
snapshot_ids = [si['snapshot_id'] for si in snapshot_instances]
msg = ('Snapshot instance for snapshot %s was not found.' %
self.snapshot['id'])
self.assertIn(self.snapshot['id'], snapshot_ids, msg)
@test.attr(type=[base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND])
def test_get_snapshot_instance(self):
instances = self.shares_v2_client.list_snapshot_instances(
snapshot_id=self.snapshot['id'])
instance_detail = self.shares_v2_client.get_snapshot_instance(
instance_id=instances[0]['id'])
expected_keys = (
'id', 'created_at', 'updated_at', 'progress', 'provider_location',
'share_id', 'share_instance_id', 'snapshot_id', 'status',
)
for key in instance_detail:
self.assertIn(key, expected_keys)
self.assertEqual(len(expected_keys), len(instance_detail))
self.assertEqual(self.snapshot['id'], instance_detail['snapshot_id'])
self.assertEqual(self.snapshot['share_id'],
instance_detail['share_id'])
self.assertEqual(self.snapshot['provider_location'],
instance_detail['provider_location'])
@test.attr(type=[base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND])
def test_reset_snapshot_instance_status_and_delete(self):
"""Test resetting a snapshot instance's status attribute."""
snapshot = self.create_snapshot_wait_for_active(self.share["id"])
snapshot_instances = self.shares_v2_client.list_snapshot_instances(
snapshot_id=snapshot['id'])
sii = snapshot_instances[0]['id']
for status in ("error", "available"):
self.shares_v2_client.reset_snapshot_instance_status(
sii, status=status)
self.shares_v2_client.wait_for_snapshot_instance_status(
sii, expected_status=status)
self.shares_v2_client.delete_snapshot(snapshot['id'])
self.shares_v2_client.wait_for_resource_deletion(
snapshot_id=snapshot['id'])

View File

@ -0,0 +1,88 @@
# Copyright 2016 Huawei
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import config
from tempest.lib import exceptions as lib_exc
from tempest import test
import testtools
from manila_tempest_tests.tests.api import base
CONF = config.CONF
@testtools.skipUnless(CONF.share.run_snapshot_tests,
'Snapshot tests are disabled.')
@base.skip_if_microversion_lt("2.19")
class SnapshotInstancesNegativeTest(base.BaseSharesMixedTest):
@classmethod
def resource_setup(cls):
super(SnapshotInstancesNegativeTest, cls).resource_setup()
cls.admin_client = cls.admin_shares_v2_client
cls.member_client = cls.shares_v2_client
cls.share = cls.create_share(client=cls.admin_client)
cls.snapshot = cls.create_snapshot_wait_for_active(
cls.share["id"], client=cls.admin_client)
@test.attr(type=[base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND])
def test_list_snapshot_instances_with_snapshot_by_non_admin(self):
self.assertRaises(
lib_exc.Forbidden,
self.member_client.list_snapshot_instances,
snapshot_id=self.snapshot['id'])
@test.attr(type=[base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND])
def test_get_snapshot_instance_by_non_admin(self):
instances = self.admin_client.list_snapshot_instances(
snapshot_id=self.snapshot['id'])
self.assertRaises(
lib_exc.Forbidden,
self.member_client.get_snapshot_instance,
instance_id=instances[0]['id'])
@test.attr(type=[base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND])
def test_reset_snapshot_instance_status_by_non_admin(self):
instances = self.admin_client.list_snapshot_instances(
snapshot_id=self.snapshot['id'])
self.assertRaises(
lib_exc.Forbidden,
self.member_client.reset_snapshot_instance_status,
instances[0]['id'],
'error')
@testtools.skipUnless(CONF.share.run_snapshot_tests,
'Snapshot tests are disabled.')
@base.skip_if_microversion_lt("2.19")
class SnapshotInstancesNegativeNoResourceTest(base.BaseSharesMixedTest):
@classmethod
def resource_setup(cls):
super(SnapshotInstancesNegativeNoResourceTest, cls).resource_setup()
cls.admin_client = cls.admin_shares_v2_client
cls.member_client = cls.shares_v2_client
@test.attr(type=[base.TAG_NEGATIVE, base.TAG_API])
def test_get_snapshot_instance_with_non_existent_instance(self):
self.assertRaises(lib_exc.NotFound,
self.admin_client.get_snapshot_instance,
instance_id="nonexistent_instance")
@test.attr(type=[base.TAG_NEGATIVE, base.TAG_API])
def test_list_snapshot_instances_by_non_admin(self):
self.assertRaises(
lib_exc.Forbidden,
self.member_client.list_snapshot_instances)

View File

@ -0,0 +1,3 @@
---
features:
- Add list, show, and reset-status admin APIs for snapshot instances.