cinder/cinder/tests/unit/policies/test_volume_actions.py

1197 lines
49 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from http import HTTPStatus
from unittest import mock
import ddt
from cinder.api.contrib import admin_actions
from cinder.api.contrib import volume_actions
from cinder.api import extensions
from cinder.api import microversions as mv
from cinder.api.v3 import volumes
from cinder import exception
from cinder.objects import fields
from cinder.policies import volume_actions as policy
from cinder.policies import volumes as volume_policy
from cinder.tests.unit.api import fakes as fake_api
from cinder.tests.unit import fake_constants
from cinder.tests.unit.policies import base
from cinder.tests.unit.policies import test_base
from cinder.tests.unit import utils as test_utils
from cinder.volume import api as volume_api
from cinder.volume import manager as volume_manager
@ddt.ddt
class VolumeActionsPolicyTest(base.BasePolicyTest):
authorized_users = [
'legacy_admin',
'legacy_owner',
'system_admin',
'project_admin',
'project_member',
'project_reader',
'project_foo',
]
unauthorized_users = [
'system_member',
'system_reader',
'system_foo',
'other_project_member',
'other_project_reader',
]
authorized_admins = [
'legacy_admin',
'system_admin',
'project_admin',
]
unauthorized_admins = [
'legacy_owner',
'system_member',
'system_reader',
'system_foo',
'project_member',
'project_reader',
'project_foo',
'other_project_member',
'other_project_reader',
]
# Basic policy test is without enforcing scope (which cinder doesn't
# yet support) and deprecated rules enabled.
def setUp(self, enforce_scope=False, enforce_new_defaults=False,
*args, **kwargs):
super().setUp(enforce_scope, enforce_new_defaults, *args, **kwargs)
self.ext_mgr = extensions.ExtensionManager()
self.controller = volume_actions.VolumeActionsController(self.ext_mgr)
self.admin_controller = admin_actions.VolumeAdminController(
self.ext_mgr)
self.volume_controller = volumes.VolumeController(self.ext_mgr)
self.manager = volume_manager.VolumeManager()
self.manager.driver = mock.MagicMock()
self.manager.driver.initialize_connection = mock.MagicMock()
self.manager.driver.initialize_connection.side_effect = (
self._initialize_connection)
self.api_path = '/v3/%s/volumes' % (self.project_id)
self.api_version = mv.BASE_VERSION
self.mock_is_service = self.patch(
'cinder.volume.api.API.is_service_request',
return_value=True)
def _initialize_connection(self, volume, connector):
return {'data': connector}
def _create_volume(self, attached=False, **kwargs):
vol_type = test_utils.create_volume_type(self.project_admin_context,
name='fake_vol_type',
testcase_instance=self)
volume = test_utils.create_volume(self.project_member_context,
volume_type_id=vol_type.id,
testcase_instance=self, **kwargs)
if attached:
volume = test_utils.attach_volume(self.project_member_context,
volume.id,
fake_constants.INSTANCE_ID,
'fake_host',
'fake_mountpoint')
return volume
@ddt.data(*base.all_users)
def test_extend_policy(self, user_id):
volume = self._create_volume()
rule_name = policy.EXTEND_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-extend": {
"new_size": 3
}
}
# DB validations will throw VolumeNotFound for some contexts
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._extend, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_extend_attached_policy(self, user_id):
volume = self._create_volume(attached=True)
rule_name = policy.EXTEND_ATTACHED_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=mv.VOLUME_EXTEND_INUSE)
req.method = 'POST'
body = {
"os-extend": {
"new_size": 3
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._extend, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_revert_policy(self, user_id):
volume = self._create_volume()
snap = test_utils.create_snapshot(
self.project_member_context,
volume.id,
status=fields.SnapshotStatus.AVAILABLE,
testcase_instance=self)
rule_name = policy.REVERT_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=mv.VOLUME_REVERT)
req.method = 'POST'
body = {
"revert": {
"snapshot_id": snap.id
}
}
# Relax the volume:GET_POLICY in order to get past that check.
self.policy.set_rules({volume_policy.GET_POLICY: ""},
overwrite=False)
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.volume_controller.revert, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_reset_policy(self, user_id):
volume = self._create_volume(attached=True)
rule_name = policy.RESET_STATUS
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-reset_status": {
"status": "available",
"attach_status": "detached",
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.admin_controller._reset_status, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_retype_policy(self, user_id):
volume = self._create_volume()
test_utils.create_volume_type(self.project_admin_context,
name='another_vol_type',
testcase_instance=self)
rule_name = policy.RETYPE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-retype": {
"new_type": "another_vol_type",
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._retype, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_update_readonly_policy(self, user_id):
volume = self._create_volume()
rule_name = policy.UPDATE_READONLY_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-update_readonly_flag": {
"readonly": True
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller._volume_readonly_update, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_force_delete_policy(self, user_id):
volume = self._create_volume()
rule_name = policy.FORCE_DELETE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-force_delete": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.admin_controller._force_delete, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.detach_volume')
@mock.patch('cinder.volume.rpcapi.VolumeAPI.terminate_connection')
def test_force_detach_policy(self, user_id,
mock_terminate_connection,
mock_detach_volume):
# Redirect the RPC calls directly to the volume manager.
# The volume manager needs the volume.id, not the volume.
def detach_volume(ctxt, volume, connector, force=False):
return self.manager.detach_volume(ctxt, volume.id,
attachment_id=None,
volume=None)
def terminate_connection(ctxt, volume, connector, force=False):
return self.manager.terminate_connection(ctxt, volume.id,
connector, force)
mock_detach_volume.side_effect = detach_volume
mock_terminate_connection.side_effect = terminate_connection
volume = self._create_volume(attached=True)
rule_name = policy.FORCE_DETACH_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-force_detach": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.admin_controller._force_detach, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.copy_volume_to_image')
@mock.patch('cinder.image.glance.GlanceImageService.create')
def test_upload_image_policy(self, user_id,
mock_image_create,
mock_copy_volume_to_image):
# Redirect the RPC calls directly to the volume manager.
# The volume manager needs the volume.id, not the volume.
def copy_volume_to_image(ctxt, volume, image_meta):
return self.manager.copy_volume_to_image(ctxt, volume.id,
image_meta)
mock_copy_volume_to_image.side_effect = copy_volume_to_image
volume = self._create_volume(status='available')
rule_name = policy.UPLOAD_IMAGE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-volume_upload_image": {
"image_name": "test",
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller._volume_upload_image, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.copy_volume_to_image')
@mock.patch('cinder.image.glance.GlanceImageService.create')
def test_upload_public_policy(self, user_id,
mock_image_create,
mock_copy_volume_to_image):
# Redirect the RPC calls directly to the volume manager.
# The volume manager needs the volume.id, not the volume.
def copy_volume_to_image(ctxt, volume, image_meta):
return self.manager.copy_volume_to_image(ctxt, volume.id,
image_meta)
mock_copy_volume_to_image.side_effect = copy_volume_to_image
volume = self._create_volume(status='available')
rule_name = policy.UPLOAD_PUBLIC_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=mv.UPLOAD_IMAGE_PARAMS)
req.method = 'POST'
body = {
"os-volume_upload_image": {
"image_name": "test",
"visibility": "public",
}
}
# Relax the UPLOAD_IMAGE_POLICY in order to get past that check.
self.policy.set_rules({policy.UPLOAD_IMAGE_POLICY: ""},
overwrite=False)
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.controller._volume_upload_image, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.objects.Service.get_by_id')
def test_migrate_policy(self, user_id, mock_get_service_by_id):
volume = self._create_volume()
rule_name = policy.MIGRATE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-migrate_volume": {
"host": "node1@lvm"
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.admin_controller._migrate_volume, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_migrate_complete_policy(self, user_id):
volume = self._create_volume()
# Can't use self._create_volume() because it would fail when
# trying to create the volume type a second time.
new_volume = test_utils.create_volume(self.project_member_context,
testcase_instance=self)
rule_name = policy.MIGRATE_COMPLETE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-migrate_volume_completion": {
"new_volume": new_volume.id
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(
user_id, self.authorized_admins, self.unauthorized_admins,
unauthorized_exceptions, rule_name,
self.admin_controller._migrate_volume_completion, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.attach_volume')
def test_attach_policy(self, user_id, mock_attach_volume):
def attach_volume(context, volume, instance_uuid, host_name,
mountpoint, mode):
return self.manager.attach_volume(context, volume.id,
instance_uuid, host_name,
mountpoint, mode)
mock_attach_volume.side_effect = attach_volume
volume = self._create_volume(status='available')
rule_name = policy.ATTACH_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-attach": {
"instance_uuid": fake_constants.INSTANCE_ID,
"mountpoint": "/dev/vdc"
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._attach, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.detach_volume')
@mock.patch('cinder.volume.rpcapi.VolumeAPI.terminate_connection')
def test_detach_policy(self, user_id,
mock_terminate_connection,
mock_detach_volume):
# Redirect the RPC calls directly to the volume manager.
# The volume manager needs the volume.id, not the volume.
def detach_volume(ctxt, volume, connector, force=False):
return self.manager.detach_volume(ctxt, volume.id,
attachment_id=None,
volume=None)
def terminate_connection(ctxt, volume, connector, force=False):
return self.manager.terminate_connection(ctxt, volume.id,
connector, force)
mock_detach_volume.side_effect = detach_volume
mock_terminate_connection.side_effect = terminate_connection
volume = self._create_volume(attached=True)
rule_name = policy.DETACH_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-detach": {
"attachment_id": volume.volume_attachment[0].id
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._detach, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_begin_detaching_policy(self, user_id):
volume = self._create_volume(status='in-use', attach_status='attached')
rule_name = policy.BEGIN_DETACHING_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-begin_detaching": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._begin_detaching,
req, id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_reserve_policy(self, user_id):
volume = self._create_volume(status='available')
rule_name = policy.RESERVE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-reserve": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._reserve, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_unreserve_policy(self, user_id):
volume = self._create_volume(status='reserved')
rule_name = policy.UNRESERVE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-unreserve": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._unreserve, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_roll_detaching_policy(self, user_id):
volume = self._create_volume(status='detaching')
rule_name = policy.ROLL_DETACHING_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-roll_detaching": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._roll_detaching,
req, id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.initialize_connection')
def test_initialize_policy(self, user_id, mock_initialize_connection):
def initialize_connection(*args):
return self.manager.initialize_connection(*args)
mock_initialize_connection.side_effect = initialize_connection
volume = self._create_volume()
rule_name = policy.INITIALIZE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-initialize_connection": {
"connector": {
"platform": "x86_64",
"host": "node2",
"do_local_attach": False,
"ip": "192.168.13.101",
"os_type": "linux2",
"multipath": False,
"initiator": "iqn.1994-05.com.redhat:d16cbb5d31e5"
}
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller._initialize_connection,
req, id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.terminate_connection')
def test_terminate_policy(self, user_id, mock_terminate_connection):
def terminate_connection(ctxt, volume, connector, force=False):
return self.manager.terminate_connection(ctxt, volume.id,
connector, force=False)
mock_terminate_connection.side_effect = terminate_connection
volume = self._create_volume()
rule_name = policy.TERMINATE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-terminate_connection": {
"connector": {
"platform": "x86_64",
"host": "node2",
"do_local_attach": False,
"ip": "192.168.13.101",
"os_type": "linux2",
"multipath": False,
"initiator": "iqn.1994-05.com.redhat:d16cbb5d31e5"
}
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller._terminate_connection,
req, id=volume.id, body=body)
class VolumeActionsPolicySecureRbacTest(VolumeActionsPolicyTest):
authorized_users = [
'legacy_admin',
'system_admin',
'project_admin',
'project_member',
]
unauthorized_users = [
'legacy_owner',
'system_member',
'system_foo',
'project_reader',
'project_foo',
'other_project_member',
'other_project_reader',
]
def setUp(self, *args, **kwargs):
# Test secure RBAC by disabling deprecated policy rules (scope
# is still not enabled).
super().setUp(enforce_scope=False, enforce_new_defaults=True,
*args, **kwargs)
class VolumeProtectionTests(test_base.CinderPolicyTests):
def test_admin_can_extend_volume(self):
admin_context = self.admin_context
volume = self._create_fake_volume(admin_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': admin_context.project_id, 'volume_id': volume.id
}
body = {"os-extend": {"new_size": "2"}}
response = self._get_request_response(admin_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
def test_owner_can_extend_volume(self):
user_context = self.user_context
volume = self._create_fake_volume(user_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': user_context.project_id, 'volume_id': volume.id
}
body = {"os-extend": {"new_size": "2"}}
response = self._get_request_response(user_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
@mock.patch.object(volume_api.API, 'get')
def test_owner_cannot_extend_volume_for_others(self, mock_volume):
user_context = self.user_context
non_owner_context = self.other_user_context
volume = self._create_fake_volume(user_context)
mock_volume.return_value = volume
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': non_owner_context.project_id, 'volume_id': volume.id
}
body = {"os-extend": {"new_size": "2"}}
response = self._get_request_response(non_owner_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
def test_admin_can_extend_attached_volume(self):
admin_context = self.admin_context
volume = self._create_fake_volume(admin_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': admin_context.project_id, 'volume_id': volume.id
}
body = {"os-extend": {"new_size": "2"}}
response = self._get_request_response(
admin_context, path, 'POST', body=body,
microversion=mv.VOLUME_EXTEND_INUSE)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
def test_owner_can_extend_attached_volume(self):
user_context = self.user_context
volume = self._create_fake_volume(user_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': user_context.project_id, 'volume_id': volume.id
}
body = {"os-extend": {"new_size": "2"}}
response = self._get_request_response(
user_context, path, 'POST', body=body,
microversion=mv.VOLUME_EXTEND_INUSE)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
@mock.patch.object(volume_api.API, 'get')
def test_owner_cannot_extend_attached_volume_for_others(self, mock_volume):
user_context = self.user_context
non_owner_context = self.other_user_context
volume = self._create_fake_volume(user_context)
mock_volume.return_value = volume
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': non_owner_context.project_id, 'volume_id': volume.id
}
body = {"os-extend": {"new_size": "2"}}
response = self._get_request_response(
non_owner_context, path, 'POST', body=body,
microversion=mv.VOLUME_EXTEND_INUSE)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
def test_admin_can_retype_volume(self):
admin_context = self.admin_context
volume = self._create_fake_volume(admin_context)
vol_type = self._create_fake_type(admin_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': admin_context.project_id, 'volume_id': volume.id
}
body = {"os-retype": {"new_type": "%s" % vol_type.name,
"migration_policy": "never"}}
response = self._get_request_response(
admin_context, path, 'POST', body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
def test_owner_can_retype_volume(self):
user_context = self.user_context
volume = self._create_fake_volume(user_context)
vol_type = self._create_fake_type(user_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': user_context.project_id, 'volume_id': volume.id
}
body = {"os-retype": {"new_type": "%s" % vol_type.name,
"migration_policy": "never"}}
response = self._get_request_response(
user_context, path, 'POST', body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
@mock.patch.object(volume_api.API, 'get')
def test_owner_cannot_retype_volume_for_others(self, mock_volume):
user_context = self.user_context
non_owner_context = self.other_user_context
volume = self._create_fake_volume(user_context)
mock_volume.return_value = volume
vol_type = self._create_fake_type(user_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': non_owner_context.project_id, 'volume_id': volume.id
}
body = {"os-retype": {"new_type": "%s" % vol_type.name,
"migration_policy": "never"}}
response = self._get_request_response(
non_owner_context, path, 'POST', body=body)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
def test_admin_can_update_readonly(self):
admin_context = self.admin_context
volume = self._create_fake_volume(
admin_context, admin_metadata={"readonly": "False"})
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': admin_context.project_id, 'volume_id': volume.id
}
body = {"os-update_readonly_flag": {"readonly": "True"}}
response = self._get_request_response(
admin_context, path, 'POST', body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
def test_owner_can_update_readonly(self):
user_context = self.user_context
volume = self._create_fake_volume(
user_context, admin_metadata={"readonly": "False"})
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': user_context.project_id, 'volume_id': volume.id
}
body = {"os-update_readonly_flag": {"readonly": "True"}}
response = self._get_request_response(
user_context, path, 'POST', body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
@mock.patch.object(volume_api.API, 'get')
def test_owner_cannot_update_readonly_for_others(self, mock_volume):
user_context = self.user_context
non_owner_context = self.other_user_context
volume = self._create_fake_volume(
user_context, admin_metadata={"readonly": "False"})
mock_volume.return_value = volume
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': non_owner_context.project_id, 'volume_id': volume.id
}
body = {"os-update_readonly_flag": {"readonly": "True"}}
response = self._get_request_response(
non_owner_context, path, 'POST', body=body)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
@mock.patch.object(volume_api.API, 'get_volume')
def test_admin_can_force_delete_volumes(self, mock_volume):
# Make sure administrators are authorized to force delete volumes
admin_context = self.admin_context
volume = self._create_fake_volume(admin_context)
mock_volume.return_value = volume
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': admin_context.project_id, 'volume_id': volume.id
}
body = {"os-force_delete": {}}
response = self._get_request_response(admin_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
@mock.patch.object(volume_api.API, 'get_volume')
def test_nonadmin_cannot_force_delete_volumes(self, mock_volume):
# Make sure volumes only can be force deleted by admin
user_context = self.user_context
volume = self._create_fake_volume(user_context)
mock_volume.return_value = volume
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': user_context.project_id, 'volume_id': volume.id
}
body = {"os-force_delete": {}}
response = self._get_request_response(user_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
@mock.patch.object(volume_api.volume_rpcapi.VolumeAPI, 'attach_volume')
@mock.patch.object(volume_api.volume_rpcapi.VolumeAPI, 'detach_volume')
def test_admin_can_attach_detach_volume(self, mock_detach, mock_attach):
admin_context = self.admin_context
volume = self._create_fake_volume(admin_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': admin_context.project_id, 'volume_id': volume.id
}
body = {"os-attach": {"instance_uuid": fake_constants.UUID1,
"mountpoint": "/dev/vdc"}}
response = self._get_request_response(admin_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
body = {"os-detach": {}}
# Detach for user call succeeds because the volume has no attachments
response = self._get_request_response(admin_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
@mock.patch.object(volume_api.volume_rpcapi.VolumeAPI, 'attach_volume')
@mock.patch.object(volume_api.volume_rpcapi.VolumeAPI, 'detach_volume')
def test_owner_can_attach_detach_volume(self, mock_detach, mock_attach):
user_context = self.user_context
volume = self._create_fake_volume(user_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': user_context.project_id, 'volume_id': volume.id
}
body = {"os-attach": {"instance_uuid": fake_constants.UUID1,
"mountpoint": "/dev/vdc"}}
response = self._get_request_response(user_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
# Succeeds for a user call because there are no attachments
body = {"os-detach": {}}
response = self._get_request_response(user_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
@mock.patch.object(volume_api.volume_rpcapi.VolumeAPI, 'attach_volume')
@mock.patch.object(volume_api.volume_rpcapi.VolumeAPI, 'detach_volume')
@mock.patch.object(volume_api.API, 'get')
def test_owner_cannot_attach_detach_volume_for_others(self, mock_volume,
mock_detach,
mock_attach):
user_context = self.user_context
non_owner_context = self.other_user_context
volume = self._create_fake_volume(user_context)
mock_volume.return_value = volume
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': non_owner_context.project_id, 'volume_id': volume.id
}
body = {"os-attach": {"instance_uuid": fake_constants.UUID1,
"mountpoint": "/dev/vdc"}}
response = self._get_request_response(non_owner_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
body = {"os-detach": {}}
response = self._get_request_response(non_owner_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
def test_admin_can_reserve_unreserve_volume(self):
admin_context = self.admin_context
volume = self._create_fake_volume(admin_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': admin_context.project_id, 'volume_id': volume.id
}
body = {"os-reserve": {}}
response = self._get_request_response(admin_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
body = {"os-unreserve": {}}
response = self._get_request_response(admin_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
def test_owner_can_reserve_unreserve_volume(self):
user_context = self.user_context
volume = self._create_fake_volume(user_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': user_context.project_id, 'volume_id': volume.id
}
body = {"os-reserve": {}}
response = self._get_request_response(user_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
body = {"os-unreserve": {}}
response = self._get_request_response(user_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
@mock.patch.object(volume_api.API, 'get')
def test_owner_cannot_reserve_unreserve_volume_for_others(self,
mock_volume):
user_context = self.user_context
non_owner_context = self.other_user_context
volume = self._create_fake_volume(user_context)
mock_volume.return_value = volume
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': non_owner_context.project_id, 'volume_id': volume.id
}
body = {"os-attach": {"instance_uuid": fake_constants.UUID1,
"mountpoint": "/dev/vdc"}}
response = self._get_request_response(non_owner_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
body = {"os-detach": {}}
response = self._get_request_response(non_owner_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
@mock.patch.object(volume_api.volume_rpcapi.VolumeAPI,
'initialize_connection')
@mock.patch.object(volume_api.volume_rpcapi.VolumeAPI,
'terminate_connection')
def test_admin_can_initialize_terminate_conn(self, mock_t, mock_i):
admin_context = self.admin_context
admin_context.service_roles = ['service']
volume = self._create_fake_volume(admin_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': admin_context.project_id, 'volume_id': volume.id
}
body = {"os-initialize_connection": {'connector': {}}}
response = self._get_request_response(admin_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.OK, response.status_int)
body = {"os-terminate_connection": {'connector': {}}}
response = self._get_request_response(admin_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
@mock.patch.object(volume_api.volume_rpcapi.VolumeAPI,
'initialize_connection')
@mock.patch.object(volume_api.volume_rpcapi.VolumeAPI,
'terminate_connection')
def test_owner_can_initialize_terminate_conn(self, mock_t, mock_i):
user_context = self.user_context
user_context.service_roles = ['service']
volume = self._create_fake_volume(user_context)
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': user_context.project_id, 'volume_id': volume.id
}
body = {"os-initialize_connection": {'connector': {}}}
response = self._get_request_response(user_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.OK, response.status_int)
body = {"os-terminate_connection": {'connector': {}}}
response = self._get_request_response(user_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
@mock.patch.object(volume_api.volume_rpcapi.VolumeAPI,
'initialize_connection')
@mock.patch.object(volume_api.volume_rpcapi.VolumeAPI,
'terminate_connection')
@mock.patch.object(volume_api.API, 'get')
def test_owner_cannot_initialize_terminate_conn_for_others(self,
mock_volume,
mock_t,
mock_i):
user_context = self.user_context
non_owner_context = self.other_user_context
volume = self._create_fake_volume(user_context)
mock_volume.return_value = volume
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': non_owner_context.project_id, 'volume_id': volume.id
}
body = {"os-initialize_connection": {'connector': {}}}
response = self._get_request_response(non_owner_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
body = {"os-terminate_connection": {'connector': {}}}
response = self._get_request_response(non_owner_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
def test_admin_can_begin_roll_detaching(self):
admin_context = self.admin_context
volume = self._create_fake_volume(admin_context, status='in-use',
attach_status='attached')
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': admin_context.project_id, 'volume_id': volume.id
}
body = {"os-begin_detaching": {}}
response = self._get_request_response(admin_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
body = {"os-roll_detaching": {}}
response = self._get_request_response(admin_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
def test_owner_can_begin_roll_detaching(self):
user_context = self.user_context
volume = self._create_fake_volume(user_context, status='in-use',
attach_status='attached')
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': user_context.project_id, 'volume_id': volume.id
}
body = {"os-begin_detaching": {}}
response = self._get_request_response(user_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
body = {"os-roll_detaching": {}}
response = self._get_request_response(user_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
@mock.patch.object(volume_api.API, 'get')
def test_owner_cannot_begin_roll_detaching_for_others(self, mock_volume):
user_context = self.user_context
non_owner_context = self.other_user_context
volume = self._create_fake_volume(user_context, status='in-use',
attach_status='attached')
mock_volume.return_value = volume
path = '/v3/%(project_id)s/volumes/%(volume_id)s/action' % {
'project_id': non_owner_context.project_id, 'volume_id': volume.id
}
body = {"os-begin_detaching": {}}
response = self._get_request_response(non_owner_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
body = {"os-roll_detaching": {}}
response = self._get_request_response(non_owner_context, path, 'POST',
body=body)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)