Volume manage/unmanage support

Implements: blueprint add-export-import-volumes

This change adds two new API extensions:

volume_unmanage.py:
Adds an "os-unmanage" action on an existing volume, which causes a
delete operation to flow through the stack with a flag indicating that
a different method ("unmanage") should be called on the driver instead
of delete_volume. A default, empty implementation of unmanage is
provided.
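
A minimal sketch of invoking the action, assuming a generic HTTP client
(requests) and placeholder endpoint, token and IDs:

# Hypothetical client-side sketch of the os-unmanage volume action.
import json
import requests

def unmanage_volume(endpoint, token, project_id, volume_id):
    url = '%s/v2/%s/volumes/%s/action' % (endpoint, project_id, volume_id)
    body = {'os-unmanage': None}  # the action takes no parameters
    resp = requests.post(url,
                         data=json.dumps(body),
                         headers={'Content-Type': 'application/json',
                                  'X-Auth-Token': token})
    # 202 Accepted: the volume record is removed asynchronously while the
    # backend storage object is left in place.
    return resp.status_code == 202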

volume_manage.py:
Adds a new "os-volume-manage" API. A POST to this URI is very similar to
a volume creation, except that image, snapshot and source-volume
references cannot be specified. Instead, the following must be specified:

host: Cinder host on which the existing storage resides
ref: Driver-specific reference to the existing storage object

name, description, volume_type, metadata and availability_zone are
supported as per a normal volume creation.
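
For example (an illustrative sketch only; the host, volume-type and
metadata values are made up, and the format of 'ref' is driver-specific --
the LVM reference implementation expects {'lv_name': <LV name>}):

# Illustrative os-volume-manage request body; POST it (JSON-encoded) to
# /v2/{project_id}/os-volume-manage.
manage_body = {
    'volume': {
        'host': 'cinder-host-1',              # registered cinder-volume host
        'ref': {'lv_name': 'existing_lv'},    # driver-specific reference
        # Optional elements, as for a normal volume creation:
        'name': 'imported-volume',
        'description': 'LV taken under Cinder management',
        'volume_type': 'some-volume-type',
        'metadata': {'origin': 'manage_existing'},
        'availability_zone': 'nova',
    }
}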

To support re-use between volume_manage and the regular volume-creation
path, add_visible_admin_metadata has been factored out into the cinder
utils.py module.

The rest of the changes follow directly from the host/ref specification
moving through the api, rpcapi, manager and flow (API and Manager)
layers. Managing an existing volume causes the manage_existing_get_size()
and manage_existing() methods to be called on the driver, and a reference
LVM implementation is provided. brick/local_dev/lvm.py now includes a
method to rename an LV.
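
A minimal, hypothetical driver sketch of the new hooks, assuming
placeholder _find_object()/_rename_object() helpers; only the method
names and the exception contract come from this change:

import math

from cinder import exception
from cinder.volume import driver


class ExampleDriver(driver.VolumeDriver):
    """Hypothetical driver showing the manage/unmanage hooks."""

    def manage_existing_get_size(self, volume, existing_ref):
        # Validate the driver-specific reference and report the size (in GB,
        # rounded up) so that quota can be reserved before anything is renamed.
        obj = self._find_object(existing_ref)
        if obj is None:
            raise exception.ManageExistingInvalidReference(
                existing_ref=existing_ref,
                reason='Specified storage object does not exist.')
        return int(math.ceil(float(obj['size_gb'])))

    def manage_existing(self, volume, existing_ref):
        # Associate the backend object with the new Cinder volume, e.g. by
        # renaming it to the name Cinder expects.
        self._rename_object(existing_ref, volume['name'])

    def unmanage(self, volume):
        # Nothing to clean up here; the backend object is simply left behind.
        pass

    def _find_object(self, existing_ref):
        # Placeholder: look up the backend object named by existing_ref and
        # return e.g. {'size_gb': '17.0'}, or None if it does not exist.
        raise NotImplementedError()

    def _rename_object(self, existing_ref, new_name):
        # Placeholder: rename/tag the backend object so that later driver
        # calls (delete, attach, ...) can find it under the Cinder name.
        raise NotImplementedError()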

Change-Id: Ifc5255b2fd277c0f60d25fc82a777e405b861320
Author: Geraint North, 2014-02-13 09:17:27 -05:00 (committed by Avishay Traeger)
Parent: ee371dfc56
Commit: e6a3206523
22 changed files with 1152 additions and 67 deletions

View File

@ -0,0 +1,165 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from webob import exc
from cinder.api import extensions
from cinder.api.openstack import wsgi
from cinder.api.v2.views import volumes as volume_views
from cinder.api.v2 import volumes
from cinder import exception
from cinder.openstack.common import log as logging
from cinder.openstack.common import uuidutils
from cinder import utils
from cinder import volume as cinder_volume
from cinder.volume import volume_types
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
authorize = extensions.extension_authorizer('volume', 'volume_manage')
class VolumeManageController(wsgi.Controller):
"""The /os-volume-manage controller for the OpenStack API."""
_view_builder_class = volume_views.ViewBuilder
def __init__(self, *args, **kwargs):
super(VolumeManageController, self).__init__(*args, **kwargs)
self.volume_api = cinder_volume.API()
@wsgi.response(202)
@wsgi.serializers(xml=volumes.VolumeTemplate)
@wsgi.deserializers(xml=volumes.CreateDeserializer)
def create(self, req, body):
"""Instruct Cinder to manage a storage object.
Manages an existing backend storage object (e.g. a Linux logical
volume or a SAN disk) by creating the Cinder objects required to manage
it, and possibly renaming the backend storage object
(driver dependent)
From an API perspective, this operation behaves very much like a
volume creation operation, except that properties such as image,
snapshot and volume references don't make sense, because we are taking
an existing storage object into Cinder management.
Required HTTP Body:
{
'volume':
{
'host': <Cinder host on which the existing storage resides>,
'ref': <Driver-specific reference to the existing storage object>,
}
}
See the appropriate Cinder drivers' implementations of the
manage_existing method to find out the accepted format of 'ref'.
This API call will return with an error if any of the above elements
are missing from the request, or if the 'host' element refers to a
cinder host that is not registered.
The volume will later enter the error state if it is discovered that
'ref' is bad.
Optional elements to 'volume' are:
name A name for the new volume.
description A description for the new volume.
volume_type ID or name of a volume type to associate with
the new Cinder volume. Does not necessarily
guarantee that the managed volume will have the
properties described in the volume_type. The
driver may choose to fail if it identifies that
the specified volume_type is not compatible with
the backend storage object.
metadata Key/value pairs to be associated with the new
volume.
availability_zone The availability zone to associate with the new
volume.
"""
context = req.environ['cinder.context']
authorize(context)
if not self.is_valid_body(body, 'volume'):
msg = _("Missing required element '%s' in request body") % 'volume'
raise exc.HTTPBadRequest(explanation=msg)
volume = body['volume']
# Check that the required keys are present, return an error if they
# are not.
required_keys = set(['ref', 'host'])
missing_keys = list(required_keys - set(volume.keys()))
if missing_keys:
msg = _("The following elements are required: %s") % \
', '.join(missing_keys)
raise exc.HTTPBadRequest(explanation=msg)
LOG.debug('Manage volume request body: %s', body)
kwargs = {}
req_volume_type = volume.get('volume_type', None)
if req_volume_type:
try:
if not uuidutils.is_uuid_like(req_volume_type):
kwargs['volume_type'] = \
volume_types.get_volume_type_by_name(
context, req_volume_type)
else:
kwargs['volume_type'] = volume_types.get_volume_type(
context, req_volume_type)
except exception.VolumeTypeNotFound:
msg = _("Volume type not found.")
raise exc.HTTPNotFound(explanation=msg)
else:
kwargs['volume_type'] = {}
kwargs['name'] = volume.get('name', None)
kwargs['description'] = volume.get('description', None)
kwargs['metadata'] = volume.get('metadata', None)
kwargs['availability_zone'] = volume.get('availability_zone', None)
try:
new_volume = self.volume_api.manage_existing(context,
volume['host'],
volume['ref'],
**kwargs)
except exception.ServiceNotFound:
msg = _("Service not found.")
raise exc.HTTPNotFound(explanation=msg)
new_volume = dict(new_volume.iteritems())
utils.add_visible_admin_metadata(context, new_volume, self.volume_api)
return self._view_builder.detail(req, new_volume)
class Volume_manage(extensions.ExtensionDescriptor):
"""Allows existing backend storage to be 'managed' by Cinder."""
name = 'VolumeManage'
alias = 'os-volume-manage'
namespace = ('http://docs.openstack.org/volume/ext/'
'os-volume-manage/api/v1')
updated = '2014-02-10T00:00:00+00:00'
def get_resources(self):
controller = VolumeManageController()
res = extensions.ResourceExtension(Volume_manage.alias,
controller)
return [res]

View File

@ -0,0 +1,78 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from webob import exc
from cinder.api import extensions
from cinder.api.openstack import wsgi
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import volume
LOG = logging.getLogger(__name__)
authorize = extensions.extension_authorizer('volume', 'volume_unmanage')
class VolumeUnmanageController(wsgi.Controller):
def __init__(self, *args, **kwargs):
super(VolumeUnmanageController, self).__init__(*args, **kwargs)
self.volume_api = volume.API()
@wsgi.response(202)
@wsgi.action('os-unmanage')
def unmanage(self, req, id, body):
"""Stop managing a volume.
This action is very much like a delete, except that a different
method (unmanage) is called on the Cinder driver. This has the effect
of removing the volume from Cinder management without actually
removing the backend storage object associated with it.
There are no required parameters.
A Not Found error is returned if the specified volume does not exist.
A Bad Request error is returned if the specified volume is still
attached to an instance.
"""
context = req.environ['cinder.context']
authorize(context)
LOG.audit(_("Unmanage volume with id: %s"), id, context=context)
try:
vol = self.volume_api.get(context, id)
self.volume_api.delete(context, vol, unmanage_only=True)
except exception.NotFound:
msg = _("Volume could not be found")
raise exc.HTTPNotFound(explanation=msg)
except exception.VolumeAttached:
msg = _("Volume cannot be deleted while in attached state")
raise exc.HTTPBadRequest(explanation=msg)
return webob.Response(status_int=202)
class Volume_unmanage(extensions.ExtensionDescriptor):
"""Enable volume unmanage operation."""
name = "VolumeUnmanage"
alias = "os-volume-unmanage"
namespace = "http://docs.openstack.org/volume/ext/volume-unmanage/api/v1.1"
updated = "2012-05-31T00:00:00+00:00"
def get_controller_extensions(self):
controller = VolumeUnmanageController()
extension = extensions.ControllerExtension(self, 'volumes', controller)
return [extension]

View File

@ -153,59 +153,11 @@ class VolumeController(wsgi.Controller):
_view_builder_class = volume_views.ViewBuilder
_visible_admin_metadata_keys = ['readonly', 'attached_mode']
def __init__(self, ext_mgr):
self.volume_api = cinder_volume.API()
self.ext_mgr = ext_mgr
super(VolumeController, self).__init__()
def _add_visible_admin_metadata(self, context, volume):
if context is None:
return
visible_admin_meta = {}
if context.is_admin:
volume_tmp = volume
else:
try:
volume_tmp = self.volume_api.get(context.elevated(),
volume['id'])
except Exception:
return
if volume_tmp.get('volume_admin_metadata'):
for item in volume_tmp['volume_admin_metadata']:
if item['key'] in self._visible_admin_metadata_keys:
visible_admin_meta[item['key']] = item['value']
# avoid circular ref when volume is a Volume instance
elif (volume_tmp.get('admin_metadata') and
isinstance(volume_tmp.get('admin_metadata'), dict)):
for key in self._visible_admin_metadata_keys:
if key in volume_tmp['admin_metadata'].keys():
visible_admin_meta[key] = volume_tmp['admin_metadata'][key]
if not visible_admin_meta:
return
# NOTE(zhiyan): update visible administration metadata to
# volume metadata, administration metadata will rewrite existing key.
if volume.get('volume_metadata'):
orig_meta = list(volume.get('volume_metadata'))
for item in orig_meta:
if item['key'] in visible_admin_meta.keys():
item['value'] = visible_admin_meta.pop(item['key'])
for key, value in visible_admin_meta.iteritems():
orig_meta.append({'key': key, 'value': value})
volume['volume_metadata'] = orig_meta
# avoid circular ref when vol is a Volume instance
elif (volume.get('metadata') and
isinstance(volume.get('metadata'), dict)):
volume['metadata'].update(visible_admin_meta)
else:
volume['metadata'] = visible_admin_meta
@wsgi.serializers(xml=VolumeTemplate)
def show(self, req, id):
"""Return data about the given volume."""
@ -218,7 +170,7 @@ class VolumeController(wsgi.Controller):
msg = _("Volume could not be found")
raise exc.HTTPNotFound(explanation=msg)
self._add_visible_admin_metadata(context, vol)
utils.add_visible_admin_metadata(context, vol, self.volume_api)
return self._view_builder.detail(req, vol)
@ -279,7 +231,7 @@ class VolumeController(wsgi.Controller):
volumes = [dict(vol.iteritems()) for vol in volumes]
for volume in volumes:
self._add_visible_admin_metadata(context, volume)
utils.add_visible_admin_metadata(context, volume, self.volume_api)
limited_list = common.limited(volumes, req)
@ -398,7 +350,7 @@ class VolumeController(wsgi.Controller):
# a dict to avoid an error.
new_volume = dict(new_volume.iteritems())
self._add_visible_admin_metadata(context, new_volume)
utils.add_visible_admin_metadata(context, new_volume, self.volume_api)
retval = self._view_builder.detail(req, new_volume)
@ -455,7 +407,7 @@ class VolumeController(wsgi.Controller):
volume.update(update_dict)
self._add_visible_admin_metadata(context, volume)
utils.add_visible_admin_metadata(context, volume, self.volume_api)
volume_utils.notify_about_volume_usage(context, volume,
'update.end')

View File

@ -667,3 +667,17 @@ class LVM(executor.Executor):
def vg_mirror_size(self, mirror_count):
return (self.vg_free_space / (mirror_count + 1))
def rename_volume(self, lv_name, new_name):
"""Change the name of an existing volume."""
try:
self._execute('lvrename', self.vg_name, lv_name, new_name,
root_helper=self._root_helper,
run_as_root=True)
except putils.ProcessExecutionError as err:
LOG.exception(_('Error renaming logical volume'))
LOG.error(_('Cmd :%s') % err.cmd)
LOG.error(_('StdOut :%s') % err.stdout)
LOG.error(_('StdErr :%s') % err.stderr)
raise

View File

@ -562,6 +562,16 @@ class KeyManagerError(CinderException):
msg_fmt = _("key manager error: %(reason)s")
class ManageExistingInvalidReference(CinderException):
message = _("Manage existing volume failed due to invalid backend "
"reference %(existing_ref)s: %(reason)s")
class ManageExistingVolumeTypeMismatch(CinderException):
message = _("Manage existing volume failed due to volume type mismatch: "
"%(reason)s")
# Driver specific exceptions
# Coraid
class CoraidException(VolumeDriverException):

View File

@ -50,7 +50,7 @@ LOG = logging.getLogger(__name__)
class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes."""
RPC_API_VERSION = '1.4'
RPC_API_VERSION = '1.5'
def __init__(self, scheduler_driver=None, service_name=None,
*args, **kwargs):
@ -185,6 +185,30 @@ class SchedulerManager(manager.Manager):
new_type['id'], tgt_host,
migration_policy, reservations)
def manage_existing(self, context, topic, volume_id,
request_spec, filter_properties=None):
"""Ensure that the host exists and can accept the volume."""
def _manage_existing_set_error(self, context, ex, request_spec):
volume_state = {'volume_state': {'status': 'error'}}
self._set_volume_state_and_notify('manage_existing', volume_state,
context, ex, request_spec)
volume_ref = db.volume_get(context, volume_id)
try:
tgt_host = self.driver.host_passes_filters(context,
volume_ref['host'],
request_spec,
filter_properties)
except exception.NoValidHost as ex:
_manage_existing_set_error(self, context, ex, request_spec)
except Exception as ex:
with excutils.save_and_reraise_exception():
_manage_existing_set_error(self, context, ex, request_spec)
else:
volume_rpcapi.VolumeAPI().manage_existing(context, volume_ref,
request_spec.get('ref'))
def _set_volume_state_and_notify(self, method, updates, context, ex,
request_spec, msg=None):
# TODO(harlowja): move into a task that just does this later.

View File

@ -36,6 +36,7 @@ class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
to create_volume()
1.3 - Add migrate_volume_to_host() method
1.4 - Add retype method
1.5 - Add manage_existing method
'''
RPC_API_VERSION = '1.0'
@ -84,6 +85,17 @@ class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
filter_properties=filter_properties),
version='1.4')
def manage_existing(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None):
request_spec_p = jsonutils.to_primitive(request_spec)
return self.cast(ctxt, self.make_msg(
'manage_existing',
topic=topic,
volume_id=volume_id,
request_spec=request_spec_p,
filter_properties=filter_properties),
version='1.5')
def update_service_capabilities(self, ctxt,
service_name, host,
capabilities):

View File

@ -0,0 +1,217 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import webob
from cinder import context
from cinder import exception
from cinder.openstack.common import jsonutils
from cinder import test
from cinder.tests.api import fakes
def app():
# no auth, just let environ['cinder.context'] pass through
api = fakes.router.APIRouter()
mapper = fakes.urlmap.URLMap()
mapper['/v2'] = api
return mapper
def db_service_get_by_host_and_topic(context, host, topic):
"""Replacement for db.service_get_by_host_and_topic.
We stub the db.service_get_by_host_and_topic method to return something
for a specific host, and raise an exception for anything else. We don't
use the returned data (the code under test just uses the call to check
for the existence of a host, so the content returned doesn't matter).
"""
if host == 'host_ok':
return {}
raise exception.ServiceNotFound(service_id=host)
# Some of the tests check that volume types are correctly validated during a
# volume manage operation. This data structure represents an existing volume
# type.
fake_vt = {'id': 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa',
'name': 'good_fakevt'}
def vt_get_volume_type_by_name(context, name):
"""Replacement for cinder.volume.volume_types.get_volume_type_by_name.
Overrides cinder.volume.volume_types.get_volume_type_by_name to return
the volume type based on inspection of our fake structure, rather than
going to the Cinder DB.
"""
if name == fake_vt['name']:
return fake_vt
raise exception.VolumeTypeNotFoundByName(volume_type_name=name)
def vt_get_volume_type(context, vt_id):
"""Replacement for cinder.volume.volume_types.get_volume_type.
Overrides cinder.volume.volume_types.get_volume_type to return the
volume type based on inspection of our fake structure, rather than going
to the Cinder DB.
"""
if vt_id == fake_vt['id']:
return fake_vt
raise exception.VolumeTypeNotFound(volume_type_id=vt_id)
def api_manage(*args, **kwargs):
"""Replacement for cinder.volume.api.API.manage_existing.
Overrides cinder.volume.api.API.manage_existing to return some fake volume
data structure, rather than initiating a real volume managing.
Note that we don't try to replicate any passed-in information (e.g. name,
volume type) in the returned structure.
"""
vol = {
'status': 'creating',
'display_name': 'fake_name',
'availability_zone': 'nova',
'tenant_id': 'fake',
'created_at': 'DONTCARE',
'id': 'ffffffff-0000-ffff-0000-ffffffffffff',
'volume_type': None,
'snapshot_id': None,
'user_id': 'fake',
'launched_at': 'DONTCARE',
'size': 0,
'attach_status': 'detached',
'volume_type_id': None}
return vol
@mock.patch('cinder.db.service_get_by_host_and_topic',
db_service_get_by_host_and_topic)
@mock.patch('cinder.volume.volume_types.get_volume_type_by_name',
vt_get_volume_type_by_name)
@mock.patch('cinder.volume.volume_types.get_volume_type',
vt_get_volume_type)
class VolumeManageTest(test.TestCase):
"""Test cases for cinder/api/contrib/volume_manage.py
The API extension adds a POST /os-volume-manage API that is passed a cinder
host name, and a driver-specific reference parameter. If everything
is passed correctly, then the cinder.volume.api.API.manage_existing method
is invoked to manage an existing storage object on the host.
In this set of test cases, we are ensuring that the code correctly parses
the request structure and raises the correct exceptions when things are not
right, and calls down into cinder.volume.api.API.manage_existing with the
correct arguments.
"""
def setUp(self):
super(VolumeManageTest, self).setUp()
def _get_resp(self, body):
"""Helper to execute an os-volume-manage API call."""
req = webob.Request.blank('/v2/fake/os-volume-manage')
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.environ['cinder.context'] = context.RequestContext('admin',
'fake',
True)
req.body = jsonutils.dumps(body)
res = req.get_response(app())
return res
@mock.patch('cinder.volume.api.API.manage_existing', wraps=api_manage)
def test_manage_volume_ok(self, mock_api_manage):
"""Test successful manage volume execution.
Tests for correct operation when valid arguments are passed in the
request body. We ensure that cinder.volume.api.API.manage_existing got
called with the correct arguments, and that we return the correct HTTP
code to the caller.
"""
body = {'volume': {'host': 'host_ok',
'ref': 'fake_ref'}}
res = self._get_resp(body)
self.assertEqual(res.status_int, 202, res)
# Check that the manage API was called with the correct arguments.
self.assertEqual(mock_api_manage.call_count, 1)
args = mock_api_manage.call_args[0]
self.assertEqual(args[1], body['volume']['host'])
self.assertEqual(args[2], body['volume']['ref'])
def test_manage_volume_missing_host(self):
"""Test correct failure when host is not specified."""
body = {'volume': {'ref': 'fake_ref'}}
res = self._get_resp(body)
self.assertEqual(res.status_int, 400)
def test_manage_volume_missing_ref(self):
"""Test correct failure when the ref is not specified."""
body = {'volume': {'host': 'host_ok'}}
res = self._get_resp(body)
self.assertEqual(res.status_int, 400)
pass
@mock.patch('cinder.volume.api.API.manage_existing', api_manage)
def test_manage_volume_volume_type_by_uuid(self):
"""Tests for correct operation when a volume type is specified by ID.
We wrap cinder.volume.api.API.manage_existing so that managing is not
actually attempted.
"""
body = {'volume': {'host': 'host_ok',
'ref': 'fake_ref',
'volume_type':
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'}}
res = self._get_resp(body)
self.assertEqual(res.status_int, 202, res)
pass
@mock.patch('cinder.volume.api.API.manage_existing', api_manage)
def test_manage_volume_volume_type_by_name(self):
"""Tests for correct operation when a volume type is specified by name.
We wrap cinder.volume.api.API.manage_existing so that managing is not
actually attempted.
"""
body = {'volume': {'host': 'host_ok',
'ref': 'fake_ref',
'volume_type': 'good_fakevt'}}
res = self._get_resp(body)
self.assertEqual(res.status_int, 202, res)
pass
def test_manage_volume_bad_volume_type_by_uuid(self):
"""Test failure on nonexistent volume type specified by ID."""
body = {'volume': {'host': 'host_ok',
'ref': 'fake_ref',
'volume_type':
'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'}}
res = self._get_resp(body)
self.assertEqual(res.status_int, 404, res)
pass
def test_manage_volume_bad_volume_type_by_name(self):
"""Test failure on nonexistent volume type specified by name."""
body = {'volume': {'host': 'host_ok',
'ref': 'fake_ref',
'volume_type': 'bad_fakevt'}}
res = self._get_resp(body)
self.assertEqual(res.status_int, 404, res)
pass

View File

@ -0,0 +1,159 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import webob
from cinder import context
from cinder import exception
from cinder.openstack.common import jsonutils
from cinder import test
from cinder.tests.api import fakes
# This list of fake volumes is used by our tests. Each is configured in a
# slightly different way, and includes only the properties that are required
# for these particular tests to function correctly.
snapshot_vol_id = 'ffffffff-0000-ffff-0000-fffffffffffd'
detached_vol_id = 'ffffffff-0000-ffff-0000-fffffffffffe'
attached_vol_id = 'ffffffff-0000-ffff-0000-ffffffffffff'
bad_vol_id = 'ffffffff-0000-ffff-0000-fffffffffff0'
vols = {snapshot_vol_id: {'id': snapshot_vol_id,
'status': 'available',
'attach_status': 'detached',
'host': 'fake_host',
'project_id': 'fake_project',
'migration_status': None,
'encryption_key_id': None},
detached_vol_id: {'id': detached_vol_id,
'status': 'available',
'attach_status': 'detached',
'host': 'fake_host',
'project_id': 'fake_project',
'migration_status': None,
'encryption_key_id': None},
attached_vol_id: {'id': attached_vol_id,
'status': 'available',
'attach_status': 'attached',
'host': 'fake_host',
'project_id': 'fake_project',
'migration_status': None,
'encryption_key_id': None}
}
def app():
# no auth, just let environ['cinder.context'] pass through
api = fakes.router.APIRouter()
mapper = fakes.urlmap.URLMap()
mapper['/v2'] = api
return mapper
def api_get(self, context, volume_id):
"""Replacement for cinder.volume.api.API.get.
We stub the cinder.volume.api.API.get method to check for the existence
of volume_id in our list of fake volumes and raise an exception if the
specified volume ID is not in our list.
"""
vol = vols.get(volume_id, None)
if not vol:
raise exception.NotFound
return vol
def db_snapshot_get_all_for_volume(context, volume_id):
"""Replacement for cinder.db.snapshot_get_all_for_volume.
We stub the cinder.db.snapshot_get_all_for_volume method because when we
go to unmanage a volume, the code checks for snapshots and won't unmanage
volumes with snapshots. For these tests, only the snapshot_vol_id reports
any snapshots. The delete code just checks for array length, doesn't
inspect the contents.
"""
if volume_id == snapshot_vol_id:
return ['fake_snapshot']
return []
@mock.patch('cinder.volume.api.API.get', api_get)
@mock.patch('cinder.db.snapshot_get_all_for_volume',
db_snapshot_get_all_for_volume)
class VolumeUnmanageTest(test.TestCase):
"""Test cases for cinder/api/contrib/volume_unmanage.py
The API extension adds an action to volumes, "os-unmanage", which will
effectively issue a delete operation on the volume, but with a flag set
that means that a different method will be invoked on the driver, so that
the volume is not actually deleted in the storage backend.
In this set of test cases, we are ensuring that the code correctly parses
the request structure and raises the correct exceptions when things are not
right, and calls down into cinder.volume.api.API.delete with the correct
arguments.
"""
def setUp(self):
super(VolumeUnmanageTest, self).setUp()
def _get_resp(self, volume_id):
"""Helper to build an os-unmanage req for the specified volume_id."""
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume_id)
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.environ['cinder.context'] = context.RequestContext('admin',
'fake',
True)
body = {'os-unmanage': ''}
req.body = jsonutils.dumps(body)
res = req.get_response(app())
return res
@mock.patch('cinder.db.volume_update')
@mock.patch('cinder.volume.rpcapi.VolumeAPI.delete_volume')
def test_unmanage_volume_ok(self, mock_rpcapi, mock_db):
"""Return success for valid and unattached volume."""
res = self._get_resp(detached_vol_id)
# volume_update is (context, id, new_data)
self.assertEqual(mock_db.call_count, 1)
self.assertEqual(len(mock_db.call_args[0]), 3, mock_db.call_args)
self.assertEqual(mock_db.call_args[0][1], detached_vol_id)
# delete_volume is (context, volume, unmanage_only)
self.assertEqual(mock_rpcapi.call_count, 1)
self.assertEqual(len(mock_rpcapi.call_args[0]), 3)
self.assertEqual(mock_rpcapi.call_args[0][2], True)
self.assertEqual(res.status_int, 202, res)
def test_unmanage_volume_bad_volume_id(self):
"""Return 404 if the volume does not exist."""
res = self._get_resp(bad_vol_id)
self.assertEqual(res.status_int, 404, res)
def test_unmanage_volume_attached_(self):
"""Return 400 if the volume exists but is attached."""
res = self._get_resp(attached_vol_id)
self.assertEqual(res.status_int, 400, res)
def test_unmanage_volume_with_snapshots(self):
"""Return 400 if the volume exists but has snapshots."""
res = self._get_resp(snapshot_vol_id)
self.assertEqual(res.status_int, 400, res)

View File

@ -32,9 +32,9 @@ from cinder import test
from cinder.tests.api import fakes
from cinder.tests.api.v2 import stubs
from cinder.tests.image import fake as fake_image
from cinder import utils
from cinder.volume import api as volume_api
CONF = cfg.CONF
NS = '{http://docs.openstack.org/api/openstack-volume/2.0/content}'
@ -1134,8 +1134,9 @@ class VolumeApiTest(test.TestCase):
volume = dict(volume_admin_metadata=admin_metadata,
volume_metadata=metadata)
admin_ctx = context.get_admin_context()
self.controller._add_visible_admin_metadata(admin_ctx,
volume)
utils.add_visible_admin_metadata(admin_ctx, volume,
self.controller.volume_api)
self.assertEqual(volume['volume_metadata'],
[{"key": "key", "value": "value"},
{"key": "readonly", "value": "visible"},
@ -1148,8 +1149,8 @@ class VolumeApiTest(test.TestCase):
volume = dict(admin_metadata=admin_metadata,
metadata=metadata)
admin_ctx = context.get_admin_context()
self.controller._add_visible_admin_metadata(admin_ctx,
volume)
utils.add_visible_admin_metadata(admin_ctx, volume,
self.controller.volume_api)
self.assertEqual(volume['metadata'],
{'key': 'value',
'attached_mode': 'visible',

View File

@ -70,3 +70,6 @@ class FakeBrickLVM(object):
def activate_lv(self, lv, is_snapshot=False):
pass
def rename_volume(self, lv_name, new_name):
pass

View File

@ -57,6 +57,8 @@
"volume_extension:quotas:show": [],
"volume_extension:quotas:update": [],
"volume_extension:quota_classes": [],
"volume_extension:volume_manage": [["rule:admin_api"]],
"volume_extension:volume_unmanage": [["rule:admin_api"]],
"limits_extension:used_limits": [],

View File

@ -109,3 +109,14 @@ class SchedulerRpcAPITestCase(test.TestCase):
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='1.4')
@mock.patch('cinder.openstack.common.rpc.cast')
def test_manage_existing(self, _mock_rpc_method):
self._test_scheduler_api('manage_existing',
rpc_method='cast',
_mock_method=_mock_rpc_method,
topic='topic',
volume_id='volume_id',
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='1.5')

View File

@ -2867,6 +2867,80 @@ class LVMISCSIVolumeDriverTestCase(DriverTestCase):
self.assertEqual(moved, True)
self.assertIsNone(model_update)
@staticmethod
def _get_manage_existing_lvs(name):
"""Helper method used by the manage_existing tests below."""
lvs = [{'name': 'fake_lv', 'size': '1.75'},
{'name': 'fake_lv_bad_size', 'size': 'Not a float'}]
for lv in lvs:
if lv['name'] == name:
return lv
def _setup_stubs_for_manage_existing(self):
"""Helper to set up common stubs for the manage_existing tests."""
self.volume.driver.vg = FakeBrickLVM('cinder-volumes',
False,
None,
'default')
self.stubs.Set(self.volume.driver.vg, 'get_volume',
self._get_manage_existing_lvs)
def test_lvm_manage_existing(self):
"""Good pass on managing an LVM volume.
This test case ensures that, when a logical volume with the
specified name exists, and the size is as expected, no error is
returned from driver.manage_existing, and that the rename_volume
function is called in the Brick LVM code with the correct arguments.
"""
self._setup_stubs_for_manage_existing()
ref = {'lv_name': 'fake_lv'}
vol = {'name': 'test', 'id': 1, 'size': 0}
def _rename_volume(old_name, new_name):
self.assertEqual(old_name, ref['lv_name'])
self.assertEqual(new_name, vol['name'])
self.stubs.Set(self.volume.driver.vg, 'rename_volume',
_rename_volume)
size = self.volume.driver.manage_existing_get_size(vol, ref)
self.assertEqual(size, 2)
model_update = self.volume.driver.manage_existing(vol, ref)
self.assertIsNone(model_update)
def test_lvm_manage_existing_bad_size(self):
"""Make sure correct exception on bad size returned from LVM.
This test case ensures that the correct exception is raised when
the information returned for the existing LVs is not in the format
that the manage_existing code expects.
"""
self._setup_stubs_for_manage_existing()
ref = {'lv_name': 'fake_lv_bad_size'}
vol = {'name': 'test', 'id': 1, 'size': 2}
self.assertRaises(exception.VolumeBackendAPIException,
self.volume.driver.manage_existing_get_size,
vol, ref)
def test_lvm_manage_existing_bad_ref(self):
"""Error case where specified LV doesn't exist.
This test case ensures that the correct exception is raised when
the caller attempts to manage a volume that does not exist.
"""
self._setup_stubs_for_manage_existing()
ref = {'lv_name': 'fake_nonexistent_lv'}
vol = {'name': 'test', 'id': 1, 'size': 0, 'status': 'available'}
self.assertRaises(exception.ManageExistingInvalidReference,
self.volume.driver.manage_existing_get_size,
vol, ref)
class LVMVolumeDriverTestCase(DriverTestCase):
"""Test case for VolumeDriver"""

View File

@ -154,7 +154,9 @@ class VolumeRpcAPITestCase(test.TestCase):
def test_delete_volume(self):
self._test_volume_api('delete_volume',
rpc_method='cast',
volume=self.fake_volume)
volume=self.fake_volume,
unmanage_only=False,
version='1.15')
def test_create_snapshot(self):
self._test_volume_api('create_snapshot',
@ -268,3 +270,10 @@ class VolumeRpcAPITestCase(test.TestCase):
migration_policy='never',
reservations=None,
version='1.12')
def test_manage_existing(self):
self._test_volume_api('manage_existing',
rpc_method='cast',
volume=self.fake_volume,
ref={'lv_name': 'foo'},
version='1.15')

View File

@ -791,3 +791,57 @@ def check_string_length(value, name, min_length=0, max_length=None):
msg = _("%(name)s has more than %(max_length)s "
"characters.") % {'name': name, 'max_length': max_length}
raise exception.InvalidInput(message=msg)
_visible_admin_metadata_keys = ['readonly', 'attached_mode']
def add_visible_admin_metadata(context, volume, volume_api):
"""Add user-visible admin metadata to regular metadata.
Extracts the admin metadata keys that are to be made visible to
non-administrators, and adds them to the regular metadata structure for the
passed-in volume.
"""
if context is None:
return
visible_admin_meta = {}
if context.is_admin:
volume_tmp = volume
else:
try:
volume_tmp = volume_api.get(context.elevated(), volume['id'])
except Exception:
return
if volume_tmp.get('volume_admin_metadata'):
for item in volume_tmp['volume_admin_metadata']:
if item['key'] in _visible_admin_metadata_keys:
visible_admin_meta[item['key']] = item['value']
# avoid circular ref when volume is a Volume instance
elif (volume_tmp.get('admin_metadata') and
isinstance(volume_tmp.get('admin_metadata'), dict)):
for key in _visible_admin_metadata_keys:
if key in volume_tmp['admin_metadata'].keys():
visible_admin_meta[key] = volume_tmp['admin_metadata'][key]
if not visible_admin_meta:
return
# NOTE(zhiyan): update visible administration metadata to
# volume metadata, administration metadata will rewrite existing key.
if volume.get('volume_metadata'):
orig_meta = list(volume.get('volume_metadata'))
for item in orig_meta:
if item['key'] in visible_admin_meta.keys():
item['value'] = visible_admin_meta.pop(item['key'])
for key, value in visible_admin_meta.iteritems():
orig_meta.append({'key': key, 'value': value})
volume['volume_metadata'] = orig_meta
# avoid circular ref when vol is a Volume instance
elif (volume.get('metadata') and
isinstance(volume.get('metadata'), dict)):
volume['metadata'].update(visible_admin_meta)
else:
volume['metadata'] = visible_admin_meta

View File

@ -191,7 +191,7 @@ class API(base.Base):
return volume
@wrap_check_policy
def delete(self, context, volume, force=False):
def delete(self, context, volume, force=False, unmanage_only=False):
if context.is_admin and context.project_id != volume['project_id']:
project_id = volume['project_id']
else:
@ -254,7 +254,7 @@ class API(base.Base):
self.db.volume_update(context, volume_id, {'status': 'deleting',
'terminated_at': now})
self.volume_rpcapi.delete_volume(context, volume)
self.volume_rpcapi.delete_volume(context, volume, unmanage_only)
@wrap_check_policy
def update(self, context, volume, fields):
@ -981,6 +981,47 @@ class API(base.Base):
request_spec=request_spec,
filter_properties={})
def manage_existing(self, context, host, ref, name=None, description=None,
volume_type=None, metadata=None,
availability_zone=None):
if availability_zone is None:
elevated = context.elevated()
try:
service = self.db.service_get_by_host_and_topic(
elevated, host, CONF.volume_topic)
except exception.ServiceNotFound:
with excutils.save_and_reraise_exception():
LOG.error(_('Unable to find service for given host.'))
availability_zone = service.get('availability_zone')
volume_type_id = volume_type['id'] if volume_type else None
volume_properties = {
'size': 0,
'user_id': context.user_id,
'project_id': context.project_id,
'status': 'creating',
'attach_status': 'detached',
# Rename these to the internal name.
'display_description': description,
'display_name': name,
'host': host,
'availability_zone': availability_zone,
'volume_type_id': volume_type_id,
'metadata': metadata
}
# Call the scheduler to ensure that the host exists and that it can
# accept the volume
volume = self.db.volume_create(context, volume_properties)
request_spec = {'volume_properties': volume,
'volume_type': volume_type,
'volume_id': volume['id'],
'ref': ref}
self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic,
volume['id'],
request_spec=request_spec)
return volume
class HostAPI(base.Base):
def __init__(self):

View File

@ -515,6 +515,57 @@ class VolumeDriver(object):
"""Accept the transfer of a volume for a new user/project."""
pass
def manage_existing(self, volume, existing_ref):
"""Brings an existing backend storage object under Cinder management.
existing_ref is passed straight through from the API request's
manage_existing_ref value, and it is up to the driver how this should
be interpreted. It should be sufficient to identify a storage object
that the driver should somehow associate with the newly-created cinder
volume structure.
There are two ways to do this:
1. Rename the backend storage object so that it matches
volume['name'], which is how drivers traditionally map between a
cinder volume and the associated backend storage object.
2. Place some metadata on the volume, or somewhere in the backend, that
allows other driver requests (e.g. delete, clone, attach, detach...)
to locate the backend storage object when required.
If the existing_ref doesn't make sense, or doesn't refer to an existing
backend storage object, raise a ManageExistingInvalidReference
exception.
The volume may have a volume_type, and the driver can inspect that and
compare against the properties of the referenced backend storage
object. If they are incompatible, raise a
ManageExistingVolumeTypeMismatch, specifying a reason for the failure.
"""
msg = _("Manage existing volume not implemented.")
raise NotImplementedError(msg)
def manage_existing_get_size(self, volume, existing_ref):
"""Return size of volume to be managed by manage_existing.
When calculating the size, round up to the next GB.
"""
msg = _("Manage existing volume not implemented.")
raise NotImplementedError(msg)
def unmanage(self, volume):
"""Removes the specified volume from Cinder management.
Does not delete the underlying backend storage object.
For most drivers, this will not need to do anything. However, some
drivers might use this call as an opportunity to clean up any
Cinder-specific configuration that they have associated with the
backend storage object.
"""
pass
class ISCSIDriver(VolumeDriver):
"""Executes commands relating to ISCSI volumes.

View File

@ -18,6 +18,7 @@ Driver for Linux servers running LVM.
"""
import math
import os
import socket
@ -386,6 +387,61 @@ class LVMVolumeDriver(driver.VolumeDriver):
self.vg.extend_volume(volume['name'],
self._sizestr(new_size))
def manage_existing(self, volume, existing_ref):
"""Manages an existing LV.
Renames the LV to match the expected name for the volume.
Error checking done by manage_existing_get_size is not repeated.
"""
lv_name = existing_ref['lv_name']
lv = self.vg.get_volume(lv_name)
# Attempt to rename the LV to match the OpenStack internal name.
try:
self.vg.rename_volume(lv_name, volume['name'])
except processutils.ProcessExecutionError as exc:
exception_message = (_("Failed to rename logical volume %(name)s, "
"error message was: %(err_msg)s")
% {'name': lv_name,
'err_msg': exc.stderr})
raise exception.VolumeBackendAPIException(
data=exception_message)
def manage_existing_get_size(self, volume, existing_ref):
"""Return size of an existing LV for manage_existing.
existing_ref is a dictionary of the form:
{'lv_name': <name of LV>}
"""
# Check that the reference is valid
if 'lv_name' not in existing_ref:
reason = _('Reference must contain lv_name element.')
raise exception.ManageExistingInvalidReference(
existing_ref=existing_ref, reason=reason)
lv_name = existing_ref['lv_name']
lv = self.vg.get_volume(lv_name)
# Raise an exception if we didn't find a suitable LV.
if not lv:
kwargs = {'existing_ref': lv_name,
'reason': 'Specified logical volume does not exist.'}
raise exception.ManageExistingInvalidReference(**kwargs)
# LV size is returned in gigabytes. Attempt to parse size as a float
# and round up to the next integer.
try:
lv_size = int(math.ceil(float(lv['size'])))
except ValueError:
exception_message = (_("Failed to manage existing volume "
"%(name)s, because reported size %(size)s "
"was not a floating-point number.")
% {'name': lv_name,
'size': lv['size']})
raise exception.VolumeBackendAPIException(
data=exception_message)
return lv_size
class LVMISCSIDriver(LVMVolumeDriver, driver.ISCSIDriver):
"""Executes commands relating to ISCSI volumes.

View File

@ -0,0 +1,114 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import taskflow.engines
from taskflow.patterns import linear_flow
from cinder import exception
from cinder import flow_utils
from cinder.openstack.common import log as logging
from cinder.volume.flows.api import create_volume as create_api
from cinder.volume.flows.manager import create_volume as create_mgr
LOG = logging.getLogger(__name__)
ACTION = 'volume:manage_existing'
class PrepareForQuotaReservationTask(flow_utils.CinderTask):
"""Gets the volume size from the driver."""
default_provides = set(['size', 'volume_type_id', 'volume_properties',
'volume_spec'])
def __init__(self, db, driver):
super(PrepareForQuotaReservationTask, self).__init__(addons=[ACTION])
self.db = db
self.driver = driver
def execute(self, context, volume_ref, manage_existing_ref):
volume_id = volume_ref['id']
if not self.driver.initialized:
driver_name = self.driver.__class__.__name__
LOG.error(_("Unable to manage existing volume. "
"Volume driver %s not initialized.") % driver_name)
self.db.volume_update(context, volume_id, dict(status='error'))
raise exception.DriverNotInitialized()
size = self.driver.manage_existing_get_size(volume_ref,
manage_existing_ref)
return {'size': size,
'volume_type_id': volume_ref['volume_type_id'],
'volume_properties': volume_ref,
'volume_spec': {'status': volume_ref['status'],
'volume_name': volume_ref['name'],
'volume_id': volume_ref['id']}}
class ManageExistingTask(flow_utils.CinderTask):
"""Brings an existing volume under Cinder management."""
default_provides = set(['volume'])
def __init__(self, db, driver):
super(ManageExistingTask, self).__init__(addons=[ACTION])
self.db = db
self.driver = driver
def execute(self, context, volume_ref, manage_existing_ref, size):
model_update = self.driver.manage_existing(volume_ref,
manage_existing_ref)
if not model_update:
model_update = {}
model_update.update({'size': size})
try:
volume_ref = self.db.volume_update(context, volume_ref['id'],
model_update)
except exception.CinderException:
LOG.exception(_("Failed updating model of volume %(volume_id)s"
" with creation provided model %(model)s") %
{'volume_id': volume_ref['id'],
'model': model_update})
raise
return {'volume': volume_ref}
def get_flow(context, db, driver, host, volume_id, ref):
"""Constructs and returns the manager entrypoint flow."""
flow_name = ACTION.replace(":", "_") + "_manager"
volume_flow = linear_flow.Flow(flow_name)
# This injects the initial starting flow values into the workflow so that
# the dependency order of the tasks provides/requires can be correctly
# determined.
create_what = {
'context': context,
'volume_id': volume_id,
'manage_existing_ref': ref
}
volume_flow.add(create_mgr.ExtractVolumeRefTask(db, host),
create_mgr.NotifyVolumeActionTask(db,
"manage_existing.start"),
PrepareForQuotaReservationTask(db, driver),
create_api.QuotaReserveTask(),
ManageExistingTask(db, driver),
create_api.QuotaCommitTask(),
create_mgr.CreateVolumeOnFinishTask(db, "create.end"))
# Now load (but do not run) the flow using the provided initial data.
return taskflow.engines.load(volume_flow, store=create_what)

View File

@ -55,6 +55,7 @@ from cinder import quota
from cinder import utils
from cinder.volume.configuration import Configuration
from cinder.volume.flows.manager import create_volume
from cinder.volume.flows.manager import manage_existing
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils
from cinder.volume import volume_types
@ -163,7 +164,7 @@ def locked_snapshot_operation(f):
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
RPC_API_VERSION = '1.14'
RPC_API_VERSION = '1.15'
def __init__(self, volume_driver=None, service_name=None,
*args, **kwargs):
@ -353,7 +354,7 @@ class VolumeManager(manager.SchedulerDependentManager):
return volume_ref['id']
@locked_volume_operation
def delete_volume(self, context, volume_id):
def delete_volume(self, context, volume_id, unmanage_only=False):
"""Deletes and unexports volume."""
context = context.elevated()
volume_ref = self.db.volume_get(context, volume_id)
@ -381,7 +382,10 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.debug(_("volume %s: removing export"), volume_ref['id'])
self.driver.remove_export(context, volume_ref)
LOG.debug(_("volume %s: deleting"), volume_ref['id'])
self.driver.delete_volume(volume_ref)
if unmanage_only:
self.driver.unmanage(volume_ref)
else:
self.driver.delete_volume(volume_ref)
except exception.VolumeIsBusy:
LOG.error(_("Cannot delete volume %s: volume is busy"),
volume_ref['id'])
@ -1253,6 +1257,28 @@ class VolumeManager(manager.SchedulerDependentManager):
QUOTAS.commit(context, new_reservations, project_id=project_id)
self.publish_service_capabilities(context)
def manage_existing(self, ctxt, volume_id, ref=None):
LOG.debug('manage_existing: managing %s' % ref)
try:
flow_engine = manage_existing.get_flow(
ctxt,
self.db,
self.driver,
self.host,
volume_id,
ref)
except Exception:
LOG.exception(_("Failed to create manage_existing flow."))
raise exception.CinderException(
_("Failed to create manage existing flow."))
flow_engine.run()
# Fetch created volume from storage
volume_ref = flow_engine.storage.fetch('volume')
# Update volume stats
self.stats['allocated_capacity_gb'] += volume_ref['size']
return volume_ref['id']
def _add_or_delete_fc_connection(self, conn_info, zone_op):
"""Add or delete connection control to fibre channel network.

View File

@ -49,6 +49,7 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
1.12 - Adds retype.
1.13 - Adds create_export.
1.14 - Adds reservation parameter to extend_volume().
1.15 - Adds manage_existing and unmanage_only flag to delete_volume.
'''
BASE_RPC_API_VERSION = '1.0'
@ -79,11 +80,13 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
host),
version='1.4')
def delete_volume(self, ctxt, volume):
def delete_volume(self, ctxt, volume, unmanage_only=False):
self.cast(ctxt,
self.make_msg('delete_volume',
volume_id=volume['id']),
topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
volume_id=volume['id'],
unmanage_only=unmanage_only),
topic=rpc.queue_get_for(ctxt, self.topic, volume['host']),
version='1.15')
def create_snapshot(self, ctxt, volume, snapshot):
self.cast(ctxt, self.make_msg('create_snapshot',
@ -206,3 +209,12 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
self.topic,
volume['host']),
version='1.13')
def manage_existing(self, ctxt, volume, ref):
return self.cast(ctxt, self.make_msg('manage_existing',
volume_id=volume['id'],
ref=ref),
topic=rpc.queue_get_for(ctxt,
self.topic,
volume['host']),
version='1.15')