Merge "Implement project personas for volume actions"

This commit is contained in:
Zuul 2021-09-15 13:19:15 +00:00 committed by Gerrit Code Review
commit ca4bfddba6
5 changed files with 934 additions and 139 deletions

View File

@ -40,37 +40,100 @@ ROLL_DETACHING_POLICY = "volume_extension:volume_actions:roll_detaching"
TERMINATE_POLICY = "volume_extension:volume_actions:terminate_connection"
INITIALIZE_POLICY = "volume_extension:volume_actions:initialize_connection"
deprecated_extend_policy = base.CinderDeprecatedRule(
name=EXTEND_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_extend_attached_policy = base.CinderDeprecatedRule(
name=EXTEND_ATTACHED_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_revert_policy = base.CinderDeprecatedRule(
name=REVERT_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_retype_policy = base.CinderDeprecatedRule(
name=RETYPE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_update_only_policy = base.CinderDeprecatedRule(
name=UPDATE_READONLY_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_upload_image_policy = base.CinderDeprecatedRule(
name=UPLOAD_IMAGE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_initialize_policy = base.CinderDeprecatedRule(
name=INITIALIZE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_terminate_policy = base.CinderDeprecatedRule(
name=TERMINATE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_roll_detaching_policy = base.CinderDeprecatedRule(
name=ROLL_DETACHING_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_reserve_policy = base.CinderDeprecatedRule(
name=RESERVE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_unreserve_policy = base.CinderDeprecatedRule(
name=UNRESERVE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_begin_detaching_policy = base.CinderDeprecatedRule(
name=BEGIN_DETACHING_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_attach_policy = base.CinderDeprecatedRule(
name=ATTACH_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_detach_policy = base.CinderDeprecatedRule(
name=DETACH_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
volume_action_policies = [
policy.DocumentedRuleDefault(
name=EXTEND_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Extend a volume.",
operations=[
{
'method': 'POST',
'path': '/volumes/{volume_id}/action (os-extend)'
}
]),
],
deprecated_rule=deprecated_extend_policy,
),
policy.DocumentedRuleDefault(
name=EXTEND_ATTACHED_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Extend a attached volume.",
operations=[
{
'method': 'POST',
'path': '/volumes/{volume_id}/action (os-extend)'
}
]),
],
deprecated_rule=deprecated_extend_attached_policy,
),
policy.DocumentedRuleDefault(
name=REVERT_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Revert a volume to a snapshot.",
operations=[
{
'method': 'POST',
'path': '/volumes/{volume_id}/action (revert)'
}
]),
],
deprecated_rule=deprecated_revert_policy,
),
policy.DocumentedRuleDefault(
name=RESET_STATUS,
check_str=base.RULE_ADMIN_API,
@ -80,27 +143,32 @@ volume_action_policies = [
'method': 'POST',
'path': '/volumes/{volume_id}/action (os-reset_status)'
}
]),
],
),
policy.DocumentedRuleDefault(
name=RETYPE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Retype a volume.",
operations=[
{
'method': 'POST',
'path': '/volumes/{volume_id}/action (os-retype)'
}
]),
],
deprecated_rule=deprecated_retype_policy,
),
policy.DocumentedRuleDefault(
name=UPDATE_READONLY_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Update a volume's readonly flag.",
operations=[
{
'method': 'POST',
'path': '/volumes/{volume_id}/action (os-update_readonly_flag)'
}
]),
],
deprecated_rule=deprecated_update_only_policy,
),
policy.DocumentedRuleDefault(
name=FORCE_DELETE_POLICY,
check_str=base.RULE_ADMIN_API,
@ -110,7 +178,8 @@ volume_action_policies = [
'method': 'POST',
'path': '/volumes/{volume_id}/action (os-force_delete)'
}
]),
],
),
policy.DocumentedRuleDefault(
name=UPLOAD_PUBLIC_POLICY,
check_str=base.RULE_ADMIN_API,
@ -120,17 +189,20 @@ volume_action_policies = [
'method': 'POST',
'path': '/volumes/{volume_id}/action (os-volume_upload_image)'
}
]),
],
),
policy.DocumentedRuleDefault(
name=UPLOAD_IMAGE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Upload a volume to image.",
operations=[
{
'method': 'POST',
'path': '/volumes/{volume_id}/action (os-volume_upload_image)'
}
]),
],
deprecated_rule=deprecated_upload_image_policy,
),
policy.DocumentedRuleDefault(
name=FORCE_DETACH_POLICY,
check_str=base.RULE_ADMIN_API,
@ -140,7 +212,8 @@ volume_action_policies = [
'method': 'POST',
'path': '/volumes/{volume_id}/action (os-force_detach)'
}
]),
],
),
policy.DocumentedRuleDefault(
name=MIGRATE_POLICY,
check_str=base.RULE_ADMIN_API,
@ -150,7 +223,8 @@ volume_action_policies = [
'method': 'POST',
'path': '/volumes/{volume_id}/action (os-migrate_volume)'
}
]),
],
),
policy.DocumentedRuleDefault(
name=MIGRATE_COMPLETE_POLICY,
check_str=base.RULE_ADMIN_API,
@ -159,79 +233,96 @@ volume_action_policies = [
'method': 'POST',
'path':
'/volumes/{volume_id}/action (os-migrate_volume_completion)'}
]),
],
),
policy.DocumentedRuleDefault(
name=INITIALIZE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Initialize volume attachment.",
operations=[{
'method': 'POST',
'path':
'/volumes/{volume_id}/action (os-initialize_connection)'}
]),
],
deprecated_rule=deprecated_initialize_policy,
),
policy.DocumentedRuleDefault(
name=TERMINATE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Terminate volume attachment.",
operations=[{
'method': 'POST',
'path':
'/volumes/{volume_id}/action (os-terminate_connection)'}
]),
],
deprecated_rule=deprecated_terminate_policy,
),
policy.DocumentedRuleDefault(
name=ROLL_DETACHING_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Roll back volume status to 'in-use'.",
operations=[{
'method': 'POST',
'path':
'/volumes/{volume_id}/action (os-roll_detaching)'}
]),
],
deprecated_rule=deprecated_roll_detaching_policy,
),
policy.DocumentedRuleDefault(
name=RESERVE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Mark volume as reserved.",
operations=[{
'method': 'POST',
'path':
'/volumes/{volume_id}/action (os-reserve)'}
]),
],
deprecated_rule=deprecated_reserve_policy,
),
policy.DocumentedRuleDefault(
name=UNRESERVE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Unmark volume as reserved.",
operations=[{
'method': 'POST',
'path':
'/volumes/{volume_id}/action (os-unreserve)'}
]),
],
deprecated_rule=deprecated_unreserve_policy,
),
policy.DocumentedRuleDefault(
name=BEGIN_DETACHING_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Begin detach volumes.",
operations=[{
'method': 'POST',
'path':
'/volumes/{volume_id}/action (os-begin_detaching)'}
]),
],
deprecated_rule=deprecated_begin_detaching_policy,
),
policy.DocumentedRuleDefault(
name=ATTACH_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Add attachment metadata.",
operations=[{
'method': 'POST',
'path':
'/volumes/{volume_id}/action (os-attach)'}
]),
],
deprecated_rule=deprecated_attach_policy,
),
policy.DocumentedRuleDefault(
name=DETACH_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Clear attachment metadata.",
operations=[{
'method': 'POST',
'path':
'/volumes/{volume_id}/action (os-detach)'}
]),
],
deprecated_rule=deprecated_detach_policy,
),
]

View File

@ -1032,6 +1032,7 @@ class VolumeApiTest(test.TestCase):
req.headers = mv.get_mv_header(mv.VOLUME_REVERT)
req.api_version_request = mv.get_api_version(
mv.VOLUME_REVERT)
req.environ['cinder.context'] = self.ctxt
# update volume's status failed
mock_update.side_effect = [False, True]

View File

@ -14,20 +14,706 @@
from http import HTTPStatus
from unittest import mock
import ddt
from cinder.api.contrib import admin_actions
from cinder.api.contrib import volume_actions
from cinder.api import extensions
from cinder.api import microversions as mv
from cinder.api.v3 import volumes
from cinder import exception
from cinder.objects import fields
from cinder.policies import volume_actions as policy
from cinder.policies import volumes as volume_policy
from cinder.tests.unit.api import fakes as fake_api
from cinder.tests.unit import fake_constants
from cinder.tests.unit.policies import base
from cinder.tests.unit.policies import test_base
from cinder.tests.unit import utils as test_utils
from cinder.volume import api as volume_api
from cinder.volume import manager as volume_manager
@ddt.ddt
class VolumeActionsPolicyTest(base.BasePolicyTest):
authorized_users = [
'legacy_admin',
'legacy_owner',
'system_admin',
'project_admin',
'project_member',
'project_reader',
'project_foo',
]
unauthorized_users = [
'system_member',
'system_reader',
'system_foo',
'other_project_member',
'other_project_reader',
]
authorized_admins = [
'legacy_admin',
'system_admin',
'project_admin',
]
unauthorized_admins = [
'legacy_owner',
'system_member',
'system_reader',
'system_foo',
'project_member',
'project_reader',
'project_foo',
'other_project_member',
'other_project_reader',
]
# Basic policy test is without enforcing scope (which cinder doesn't
# yet support) and deprecated rules enabled.
def setUp(self, enforce_scope=False, enforce_new_defaults=False,
*args, **kwargs):
super().setUp(enforce_scope, enforce_new_defaults, *args, **kwargs)
self.ext_mgr = extensions.ExtensionManager()
self.controller = volume_actions.VolumeActionsController(self.ext_mgr)
self.admin_controller = admin_actions.VolumeAdminController(
self.ext_mgr)
self.volume_controller = volumes.VolumeController(self.ext_mgr)
self.manager = volume_manager.VolumeManager()
self.manager.driver = mock.MagicMock()
self.manager.driver.initialize_connection = mock.MagicMock()
self.manager.driver.initialize_connection.side_effect = (
self._initialize_connection)
self.api_path = '/v3/%s/volumes' % (self.project_id)
self.api_version = mv.BASE_VERSION
def _initialize_connection(self, volume, connector):
return {'data': connector}
def _create_volume(self, attached=False, **kwargs):
vol_type = test_utils.create_volume_type(self.project_admin_context,
name='fake_vol_type',
testcase_instance=self)
volume = test_utils.create_volume(self.project_member_context,
volume_type_id=vol_type.id,
testcase_instance=self, **kwargs)
if attached:
volume = test_utils.attach_volume(self.project_member_context,
volume.id,
fake_constants.INSTANCE_ID,
'fake_host',
'fake_mountpoint')
return volume
@ddt.data(*base.all_users)
def test_extend_policy(self, user_id):
volume = self._create_volume()
rule_name = policy.EXTEND_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-extend": {
"new_size": 3
}
}
# DB validations will throw VolumeNotFound for some contexts
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._extend, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_extend_attached_policy(self, user_id):
volume = self._create_volume(attached=True)
rule_name = policy.EXTEND_ATTACHED_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=mv.VOLUME_EXTEND_INUSE)
req.method = 'POST'
body = {
"os-extend": {
"new_size": 3
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._extend, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_revert_policy(self, user_id):
volume = self._create_volume()
snap = test_utils.create_snapshot(
self.project_member_context,
volume.id,
status=fields.SnapshotStatus.AVAILABLE,
testcase_instance=self)
rule_name = policy.REVERT_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=mv.VOLUME_REVERT)
req.method = 'POST'
body = {
"revert": {
"snapshot_id": snap.id
}
}
# Relax the volume:GET_POLICY in order to get past that check.
self.policy.set_rules({volume_policy.GET_POLICY: ""},
overwrite=False)
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.volume_controller.revert, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_reset_policy(self, user_id):
volume = self._create_volume(attached=True)
rule_name = policy.RESET_STATUS
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-reset_status": {
"status": "available",
"attach_status": "detached",
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.admin_controller._reset_status, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_retype_policy(self, user_id):
volume = self._create_volume()
test_utils.create_volume_type(self.project_admin_context,
name='another_vol_type',
testcase_instance=self)
rule_name = policy.RETYPE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-retype": {
"new_type": "another_vol_type",
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._retype, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_update_readonly_policy(self, user_id):
volume = self._create_volume()
rule_name = policy.UPDATE_READONLY_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-update_readonly_flag": {
"readonly": True
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller._volume_readonly_update, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_force_delete_policy(self, user_id):
volume = self._create_volume()
rule_name = policy.FORCE_DELETE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-force_delete": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.admin_controller._force_delete, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.detach_volume')
@mock.patch('cinder.volume.rpcapi.VolumeAPI.terminate_connection')
def test_force_detach_policy(self, user_id,
mock_terminate_connection,
mock_detach_volume):
# Redirect the RPC calls directly to the volume manager.
# The volume manager needs the volume.id, not the volume.
def detach_volume(ctxt, volume, connector, force=False):
return self.manager.detach_volume(ctxt, volume.id,
attachment_id=None,
volume=None)
def terminate_connection(ctxt, volume, connector, force=False):
return self.manager.terminate_connection(ctxt, volume.id,
connector, force)
mock_detach_volume.side_effect = detach_volume
mock_terminate_connection.side_effect = terminate_connection
volume = self._create_volume(attached=True)
rule_name = policy.FORCE_DETACH_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-force_detach": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.admin_controller._force_detach, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.copy_volume_to_image')
@mock.patch('cinder.image.glance.GlanceImageService.create')
def test_upload_image_policy(self, user_id,
mock_image_create,
mock_copy_volume_to_image):
# Redirect the RPC calls directly to the volume manager.
# The volume manager needs the volume.id, not the volume.
def copy_volume_to_image(ctxt, volume, image_meta):
return self.manager.copy_volume_to_image(ctxt, volume.id,
image_meta)
mock_copy_volume_to_image.side_effect = copy_volume_to_image
volume = self._create_volume(status='available')
rule_name = policy.UPLOAD_IMAGE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-volume_upload_image": {
"image_name": "test",
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller._volume_upload_image, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.copy_volume_to_image')
@mock.patch('cinder.image.glance.GlanceImageService.create')
def test_upload_public_policy(self, user_id,
mock_image_create,
mock_copy_volume_to_image):
# Redirect the RPC calls directly to the volume manager.
# The volume manager needs the volume.id, not the volume.
def copy_volume_to_image(ctxt, volume, image_meta):
return self.manager.copy_volume_to_image(ctxt, volume.id,
image_meta)
mock_copy_volume_to_image.side_effect = copy_volume_to_image
volume = self._create_volume(status='available')
rule_name = policy.UPLOAD_PUBLIC_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=mv.UPLOAD_IMAGE_PARAMS)
req.method = 'POST'
body = {
"os-volume_upload_image": {
"image_name": "test",
"visibility": "public",
}
}
# Relax the UPLOAD_IMAGE_POLICY in order to get past that check.
self.policy.set_rules({policy.UPLOAD_IMAGE_POLICY: ""},
overwrite=False)
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.controller._volume_upload_image, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.objects.Service.get_by_id')
def test_migrate_policy(self, user_id, mock_get_service_by_id):
volume = self._create_volume()
rule_name = policy.MIGRATE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-migrate_volume": {
"host": "node1@lvm"
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.admin_controller._migrate_volume, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_migrate_complete_policy(self, user_id):
volume = self._create_volume()
# Can't use self._create_volume() because it would fail when
# trying to create the volume type a second time.
new_volume = test_utils.create_volume(self.project_member_context,
testcase_instance=self)
rule_name = policy.MIGRATE_COMPLETE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-migrate_volume_completion": {
"new_volume": new_volume.id
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(
user_id, self.authorized_admins, self.unauthorized_admins,
unauthorized_exceptions, rule_name,
self.admin_controller._migrate_volume_completion, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.attach_volume')
def test_attach_policy(self, user_id, mock_attach_volume):
def attach_volume(context, volume, instance_uuid, host_name,
mountpoint, mode):
return self.manager.attach_volume(context, volume.id,
instance_uuid, host_name,
mountpoint, mode)
mock_attach_volume.side_effect = attach_volume
volume = self._create_volume(status='available')
rule_name = policy.ATTACH_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-attach": {
"instance_uuid": fake_constants.INSTANCE_ID,
"mountpoint": "/dev/vdc"
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._attach, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.detach_volume')
@mock.patch('cinder.volume.rpcapi.VolumeAPI.terminate_connection')
def test_detach_policy(self, user_id,
mock_terminate_connection,
mock_detach_volume):
# Redirect the RPC calls directly to the volume manager.
# The volume manager needs the volume.id, not the volume.
def detach_volume(ctxt, volume, connector, force=False):
return self.manager.detach_volume(ctxt, volume.id,
attachment_id=None,
volume=None)
def terminate_connection(ctxt, volume, connector, force=False):
return self.manager.terminate_connection(ctxt, volume.id,
connector, force)
mock_detach_volume.side_effect = detach_volume
mock_terminate_connection.side_effect = terminate_connection
volume = self._create_volume(attached=True)
rule_name = policy.DETACH_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-detach": {
"attachment_id": volume.volume_attachment[0].id
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._detach, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_begin_detaching_policy(self, user_id):
volume = self._create_volume(status='in-use', attach_status='attached')
rule_name = policy.BEGIN_DETACHING_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-begin_detaching": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._begin_detaching,
req, id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_reserve_policy(self, user_id):
volume = self._create_volume(status='available')
rule_name = policy.RESERVE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-reserve": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._reserve, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_unreserve_policy(self, user_id):
volume = self._create_volume(status='reserved')
rule_name = policy.UNRESERVE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-unreserve": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._unreserve, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_roll_detaching_policy(self, user_id):
volume = self._create_volume(status='detaching')
rule_name = policy.ROLL_DETACHING_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-roll_detaching": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._roll_detaching,
req, id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.initialize_connection')
def test_initialize_policy(self, user_id, mock_initialize_connection):
def initialize_connection(*args):
return self.manager.initialize_connection(*args)
mock_initialize_connection.side_effect = initialize_connection
volume = self._create_volume()
rule_name = policy.INITIALIZE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-initialize_connection": {
"connector": {
"platform": "x86_64",
"host": "node2",
"do_local_attach": False,
"ip": "192.168.13.101",
"os_type": "linux2",
"multipath": False,
"initiator": "iqn.1994-05.com.redhat:d16cbb5d31e5"
}
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller._initialize_connection,
req, id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.terminate_connection')
def test_terminate_policy(self, user_id, mock_terminate_connection):
def terminate_connection(ctxt, volume, connector, force=False):
return self.manager.terminate_connection(ctxt, volume.id,
connector, force=False)
mock_terminate_connection.side_effect = terminate_connection
volume = self._create_volume()
rule_name = policy.TERMINATE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-terminate_connection": {
"connector": {
"platform": "x86_64",
"host": "node2",
"do_local_attach": False,
"ip": "192.168.13.101",
"os_type": "linux2",
"multipath": False,
"initiator": "iqn.1994-05.com.redhat:d16cbb5d31e5"
}
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller._terminate_connection,
req, id=volume.id, body=body)
class VolumeActionsPolicySecureRbacTest(VolumeActionsPolicyTest):
authorized_users = [
'legacy_admin',
'system_admin',
'project_admin',
'project_member',
]
unauthorized_users = [
'legacy_owner',
'system_member',
'system_foo',
'project_reader',
'project_foo',
'other_project_member',
'other_project_reader',
]
def setUp(self, *args, **kwargs):
# Test secure RBAC by disabling deprecated policy rules (scope
# is still not enabled).
super().setUp(enforce_scope=False, enforce_new_defaults=True,
*args, **kwargs)
# TODO(yikun): The below policy test cases should be added:
# * REVERT_POLICY
# * RESET_STATUS
# * FORCE_DETACH_POLICY
# * UPLOAD_PUBLIC_POLICY
# * UPLOAD_IMAGE_POLICY
# * MIGRATE_POLICY
# * MIGRATE_COMPLETE_POLICY
class VolumeProtectionTests(test_base.CinderPolicyTests):
def test_admin_can_extend_volume(self):
admin_context = self.admin_context

View File

@ -55,14 +55,6 @@
# DELETE /types
"volume_extension:types_manage": ""
# Revert a volume to a snapshot.
# POST /volumes/{volume_id}/action (revert)
"volume:revert_to_snapshot": ""
# Upload a volume to image.
# POST /volumes/{volume_id}/action (os-volume_upload_image)
"volume_extension:volume_actions:upload_image": ""
# List type extra specs.
# GET /types/{type_id}/extra_specs
"volume_extension:types_extra_specs:index": ""

View File

@ -43,26 +43,31 @@ class VolumeTransferTestCase(test.TestCase):
project_id=fake.PROJECT_ID)
self.updated_at = timeutils.utcnow()
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage')
def test_transfer_volume_create_delete(self, mock_notify):
def test_transfer_volume_create_delete(self):
tx_api = transfer_api.API()
volume = utils.create_volume(self.ctxt, updated_at=self.updated_at)
response = tx_api.create(self.ctxt, volume.id, 'Description')
with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
response = tx_api.create(self.ctxt, volume.id, 'Description')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
volume = objects.Volume.get_by_id(self.ctxt, volume.id)
self.assertEqual('awaiting-transfer', volume['status'],
'Unexpected state')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
tx_api.delete(self.ctxt, response['id'])
with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
tx_api.delete(self.ctxt, response['id'])
calls = [mock.call(self.ctxt, mock.ANY, "transfer.delete.start"),
mock.call(self.ctxt, mock.ANY, "transfer.delete.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
volume = objects.Volume.get_by_id(self.ctxt, volume.id)
self.assertEqual('available', volume['status'], 'Unexpected state')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.delete.start"),
mock.call(self.ctxt, mock.ANY, "transfer.delete.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(4, mock_notify.call_count)
def test_transfer_invalid_volume(self):
tx_api = transfer_api.API()
@ -84,8 +89,7 @@ class VolumeTransferTestCase(test.TestCase):
tx_api.create,
self.ctxt, volume.id, 'Description')
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage')
def test_transfer_accept_invalid_authkey(self, mock_notify):
def test_transfer_accept_invalid_authkey(self):
svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop)
tx_api = transfer_api.API()
@ -103,39 +107,42 @@ class VolumeTransferTestCase(test.TestCase):
tx_api.accept,
self.ctxt, transfer['id'], 'wrong')
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage')
def test_transfer_accept_invalid_volume(self, mock_notify):
def test_transfer_accept_invalid_volume(self):
svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop)
tx_api = transfer_api.API()
volume = utils.create_volume(self.ctxt, updated_at=self.updated_at,
volume_type_id=self.vt['id'])
transfer = tx_api.create(self.ctxt, volume.id, 'Description')
with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
transfer = tx_api.create(self.ctxt, volume.id, 'Description')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
volume = objects.Volume.get_by_id(self.ctxt, volume.id)
self.assertEqual('awaiting-transfer', volume['status'],
'Unexpected state')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
volume.status = 'wrong'
volume.save()
self.assertRaises(exception.InvalidVolume,
tx_api.accept,
self.ctxt, transfer['id'], transfer['auth_key'])
with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
self.assertRaises(exception.InvalidVolume,
tx_api.accept,
self.ctxt, transfer['id'], transfer['auth_key'])
# Because the InvalidVolume exception is raised in tx_api, so
# there is only transfer.accept.start called and missing
# transfer.accept.end.
calls = [mock.call(self.ctxt, mock.ANY, "transfer.accept.start")]
mock_notify.assert_has_calls(calls)
self.assertEqual(1, mock_notify.call_count)
volume.status = 'awaiting-transfer'
volume.save()
# Because the InvalidVolume exception is raised in tx_api, so there is
# only transfer.accept.start called and missing transfer.accept.end.
calls = [mock.call(self.ctxt, mock.ANY, "transfer.accept.start")]
mock_notify.assert_has_calls(calls)
self.assertEqual(3, mock_notify.call_count)
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage')
def test_transfer_accept_volume_in_consistencygroup(self, mock_notify):
def test_transfer_accept_volume_in_consistencygroup(self):
svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop)
tx_api = transfer_api.API()
@ -153,8 +160,7 @@ class VolumeTransferTestCase(test.TestCase):
@mock.patch.object(QUOTAS, "limit_check")
@mock.patch.object(QUOTAS, "reserve")
@mock.patch.object(QUOTAS, "add_volume_type_opts")
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage')
def test_transfer_accept(self, mock_notify, mock_quota_voltype,
def test_transfer_accept(self, mock_quota_voltype,
mock_quota_reserve, mock_quota_limit):
svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop)
@ -162,13 +168,26 @@ class VolumeTransferTestCase(test.TestCase):
volume = utils.create_volume(self.ctxt,
volume_type_id=fake.VOLUME_TYPE_ID,
updated_at=self.updated_at)
transfer = tx_api.create(self.ctxt, volume.id, 'Description')
with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
transfer = tx_api.create(self.ctxt, volume.id, 'Description')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
self.ctxt.user_id = fake.USER2_ID
self.ctxt.project_id = fake.PROJECT2_ID
response = tx_api.accept(self.ctxt,
transfer['id'],
transfer['auth_key'])
with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
response = tx_api.accept(self.ctxt,
transfer['id'],
transfer['auth_key'])
calls = [mock.call(self.ctxt, mock.ANY, "transfer.accept.start"),
mock.call(self.ctxt, mock.ANY, "transfer.accept.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
volume = objects.Volume.get_by_id(self.ctxt, volume.id)
self.assertEqual(fake.PROJECT2_ID, volume.project_id)
self.assertEqual(fake.USER2_ID, volume.user_id)
@ -178,13 +197,6 @@ class VolumeTransferTestCase(test.TestCase):
self.assertEqual(response['id'], transfer['id'],
'Unexpected transfer id in response.')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.accept.start"),
mock.call(self.ctxt, mock.ANY, "transfer.accept.end")]
mock_notify.assert_has_calls(calls)
# The notify_about_volume_usage is called twice at create(),
# and twice at accept().
self.assertEqual(4, mock_notify.call_count)
# Check QUOTAS reservation calls
# QUOTAS.add_volume_type_opts
reserve_opt = {'volumes': 1, 'gigabytes': 1}
@ -207,8 +219,7 @@ class VolumeTransferTestCase(test.TestCase):
@mock.patch.object(QUOTAS, "reserve")
@mock.patch.object(QUOTAS, "add_volume_type_opts")
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage')
def test_transfer_accept_over_quota(self, mock_notify, mock_quota_voltype,
def test_transfer_accept_over_quota(self, mock_quota_voltype,
mock_quota_reserve):
svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop)
@ -216,7 +227,11 @@ class VolumeTransferTestCase(test.TestCase):
volume = utils.create_volume(self.ctxt,
volume_type_id=fake.VOLUME_TYPE_ID,
updated_at=self.updated_at)
transfer = tx_api.create(self.ctxt, volume.id, 'Description')
with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
transfer = tx_api.create(self.ctxt, volume.id, 'Description')
self.assertEqual(2, mock_notify.call_count)
fake_overs = ['volumes_lvmdriver-3']
fake_quotas = {'gigabytes_lvmdriver-3': 1,
'volumes_lvmdriver-3': 10}
@ -230,19 +245,19 @@ class VolumeTransferTestCase(test.TestCase):
self.ctxt.user_id = fake.USER2_ID
self.ctxt.project_id = fake.PROJECT2_ID
self.assertRaises(exception.VolumeLimitExceeded,
tx_api.accept,
self.ctxt,
transfer['id'],
transfer['auth_key'])
# notification of transfer.accept is sent only after quota check
# passes
self.assertEqual(2, mock_notify.call_count)
with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
self.assertRaises(exception.VolumeLimitExceeded,
tx_api.accept,
self.ctxt,
transfer['id'],
transfer['auth_key'])
# notification of transfer.accept is sent only after quota check
# passes
self.assertEqual(0, mock_notify.call_count)
@mock.patch.object(QUOTAS, "limit_check")
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage')
def test_transfer_accept_over_quota_check_limit(self, mock_notify,
mock_quota_limit):
def test_transfer_accept_over_quota_check_limit(self, mock_quota_limit):
svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop)
tx_api = transfer_api.API()
@ -261,14 +276,16 @@ class VolumeTransferTestCase(test.TestCase):
self.ctxt.user_id = fake.USER2_ID
self.ctxt.project_id = fake.PROJECT2_ID
self.assertRaises(exception.VolumeSizeExceedsLimit,
tx_api.accept,
self.ctxt,
transfer['id'],
transfer['auth_key'])
# notification of transfer.accept is sent only after quota check
# passes
self.assertEqual(2, mock_notify.call_count)
with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
self.assertRaises(exception.VolumeSizeExceedsLimit,
tx_api.accept,
self.ctxt,
transfer['id'],
transfer['auth_key'])
# notification of transfer.accept is sent only after quota check
# passes
self.assertEqual(0, mock_notify.call_count)
def test_transfer_get(self):
tx_api = transfer_api.API()
@ -323,20 +340,22 @@ class VolumeTransferTestCase(test.TestCase):
sort_keys=mock.sentinel.sort_keys)
self.assertEqual(get_all_by_project.return_value, res)
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage')
def test_delete_transfer_with_deleted_volume(self, mock_notify):
def test_delete_transfer_with_deleted_volume(self):
# create a volume
volume = utils.create_volume(self.ctxt, updated_at=self.updated_at)
# create a transfer
tx_api = transfer_api.API()
transfer = tx_api.create(self.ctxt, volume['id'], 'Description')
t = tx_api.get(self.ctxt, transfer['id'])
with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
transfer = tx_api.create(self.ctxt, volume['id'], 'Description')
t = tx_api.get(self.ctxt, transfer['id'])
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
self.assertEqual(t['id'], transfer['id'], 'Unexpected transfer id')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
# force delete volume
volume.destroy()
# Make sure transfer has been deleted.
@ -345,8 +364,7 @@ class VolumeTransferTestCase(test.TestCase):
self.ctxt,
transfer['id'])
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage')
def test_transfer_accept_with_snapshots(self, mock_notify):
def test_transfer_accept_with_snapshots(self):
svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop)
tx_api = transfer_api.API()
@ -356,7 +374,14 @@ class VolumeTransferTestCase(test.TestCase):
utils.create_volume_type(self.ctxt.elevated(),
id=fake.VOLUME_TYPE_ID, name="test_type")
utils.create_snapshot(self.ctxt, volume.id, status='available')
transfer = tx_api.create(self.ctxt, volume.id, 'Description')
with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
transfer = tx_api.create(self.ctxt, volume.id, 'Description')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
# The notify_about_volume_usage is called twice at create().
self.assertEqual(2, mock_notify.call_count)
# Get volume and snapshot quota before accept
self.ctxt.user_id = fake.USER2_ID
@ -366,18 +391,19 @@ class VolumeTransferTestCase(test.TestCase):
self.assertEqual(0, usages.get('volumes', {}).get('in_use', 0))
self.assertEqual(0, usages.get('snapshots', {}).get('in_use', 0))
tx_api.accept(self.ctxt, transfer['id'], transfer['auth_key'])
with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
tx_api.accept(self.ctxt, transfer['id'], transfer['auth_key'])
calls = [mock.call(self.ctxt, mock.ANY, "transfer.accept.start"),
mock.call(self.ctxt, mock.ANY, "transfer.accept.end")]
mock_notify.assert_has_calls(calls)
# The notify_about_volume_usage is called twice at accept().
self.assertEqual(2, mock_notify.call_count)
volume = objects.Volume.get_by_id(self.ctxt, volume.id)
self.assertEqual(fake.PROJECT2_ID, volume.project_id)
self.assertEqual(fake.USER2_ID, volume.user_id)
calls = [mock.call(self.ctxt, mock.ANY, "transfer.accept.start"),
mock.call(self.ctxt, mock.ANY, "transfer.accept.end")]
mock_notify.assert_has_calls(calls)
# The notify_about_volume_usage is called twice at create(),
# and twice at accept().
self.assertEqual(4, mock_notify.call_count)
# Get volume and snapshot quota after accept
self.ctxt.user_id = fake.USER2_ID
self.ctxt.project_id = fake.PROJECT2_ID
@ -386,8 +412,7 @@ class VolumeTransferTestCase(test.TestCase):
self.assertEqual(1, usages.get('volumes', {}).get('in_use', 0))
self.assertEqual(1, usages.get('snapshots', {}).get('in_use', 0))
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage')
def test_transfer_accept_with_snapshots_invalid(self, mock_notify):
def test_transfer_accept_with_snapshots_invalid(self):
svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop)
tx_api = transfer_api.API()