Add ability to modify volume type

This patch adds the volume retype operation, which allows a user to
change a given volume's type.

When retyping, the scheduler checks whether the volume's current host can
accept the new type (i.e., whether the host passes the scheduler filters
when evaluated against the new type). If the current host is suitable, its
volume manager is called, which in turn asks the driver to change the
volume's type.

There are two cases where a retype operation may require migrating the
volume:
1. The volume's current host cannot accept the new type
2. The volume's driver cannot perform the retype operation

In case of a migration, a volume with the new type is created, and the
data is migrated to it.

Volume migrations resulting from a retype can be controlled by passing a
migration policy (see the example request below), which is one of:
1. 'never' (default): never migrate; the retype fails if migration is
   required
2. 'on-demand': migrate when necessary

This version causes retype operations to fail if the current and new
volume types have different:
1. QoS settings that are enforced by the front-end, for in-use volumes.
2. Encryption settings.

Subsequent patches can address these cases.
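For reference, a retype is requested via the new 'os-retype' volume action
added below. A minimal sketch of building such a request, mirroring the unit
tests in this patch (the project, volume ID, and type name are placeholder
values):

import webob

from cinder.openstack.common import jsonutils

# Placeholder project 'fake', volume '1', and new type named 'gold'.
req = webob.Request.blank('/v2/fake/volumes/1/action')
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.body = jsonutils.dumps({'os-retype': {'new_type': 'gold',
                                          'migration_policy': 'on-demand'}})
# A successful cast returns 202 Accepted; the retype itself completes
# asynchronously via the scheduler and volume manager.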

DocImpact

Change-Id: I2dc99b4fa64d611d2bb936fc3890ca334e08bb55
Implements: blueprint volume-retype
Avishay Traeger 2013-09-03 16:06:27 +03:00
parent 24d7e24413
commit 3fd7857a36
26 changed files with 1073 additions and 62 deletions

@@ -314,6 +314,21 @@ class VolumeActionsController(wsgi.Controller):
self.volume_api.update_readonly_flag(context, volume, readonly_flag)
return webob.Response(status_int=202)
@wsgi.action('os-retype')
def _retype(self, req, id, body):
"""Change type of existing volume."""
context = req.environ['cinder.context']
volume = self.volume_api.get(context, id)
try:
new_type = body['os-retype']['new_type']
except KeyError:
msg = _("New volume type must be specified.")
raise webob.exc.HTTPBadRequest(explanation=msg)
policy = body['os-retype'].get('migration_policy')
self.volume_api.retype(context, volume, new_type, policy)
return webob.Response(status_int=202)
class Volume_actions(extensions.ExtensionDescriptor):
"""Enable volume actions

@@ -561,6 +561,10 @@ class KeyManagerError(CinderException):
msg_fmt = _("key manager error: %(reason)s")
class VolumeRetypeFailed(CinderException):
message = _("Volume retype failed: %(reason)s")
# Driver specific exceptions
# Coraid
class CoraidException(VolumeDriverException):

cinder/quota_utils.py (new file, 64 lines)
@@ -0,0 +1,64 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import quota
LOG = logging.getLogger(__name__)
QUOTAS = quota.QUOTAS
def get_volume_type_reservation(ctxt, volume, type_id):
# Reserve quotas for the given volume type
try:
reserve_opts = {'volumes': 1, 'gigabytes': volume['size']}
QUOTAS.add_volume_type_opts(ctxt,
reserve_opts,
type_id)
reservations = QUOTAS.reserve(ctxt, **reserve_opts)
except exception.OverQuota as e:
overs = e.kwargs['overs']
usages = e.kwargs['usages']
quotas = e.kwargs['quotas']
def _consumed(name):
return (usages[name]['reserved'] + usages[name]['in_use'])
for over in overs:
if 'gigabytes' in over:
s_size = volume['size']
d_quota = quotas[over]
d_consumed = _consumed(over)
msg = _("Quota exceeded for %(s_pid)s, tried to create "
"%(s_size)sG volume - (%(d_consumed)dG of "
"%(d_quota)dG already consumed)")
LOG.warn(msg % {'s_pid': ctxt.project_id,
's_size': s_size,
'd_consumed': d_consumed,
'd_quota': d_quota})
raise exception.VolumeSizeExceedsAvailableQuota(
requested=s_size, quota=d_quota, consumed=d_consumed)
elif 'volumes' in over:
msg = _("Quota exceeded for %(s_pid)s, tried to create "
"volume (%(d_consumed)d volumes "
"already consumed)")
LOG.warn(msg % {'s_pid': ctxt.project_id,
'd_consumed': _consumed(over)})
raise exception.VolumeLimitExceeded(
allowed=quotas[over])
return reservations
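This helper only reserves; the returned reservations must later be committed
or rolled back by the caller. A sketch of the intended pattern under that
assumption (perform_retype is a hypothetical stand-in for the
scheduler/manager chain this patch wires up; ctxt, volume, and new_type_id
are placeholders):

from cinder import quota
from cinder import quota_utils

QUOTAS = quota.QUOTAS

# Reserve against the new type up front so quota problems surface early.
reservations = quota_utils.get_volume_type_reservation(ctxt, volume,
                                                       new_type_id)
try:
    perform_retype(ctxt, volume, new_type_id, reservations)  # hypothetical
except Exception:
    QUOTAS.rollback(ctxt, reservations)  # undo the reservation on failure
    raise
else:
    QUOTAS.commit(ctxt, reservations)  # charge it once the type has changed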

@@ -54,6 +54,9 @@ class ChanceScheduler(driver.Scheduler):
return self._filter_hosts(request_spec, hosts, **kwargs)
def _choose_host_from_list(self, hosts):
return hosts[int(random.random() * len(hosts))]
def _schedule(self, context, topic, request_spec, **kwargs):
"""Picks a host that is up at random."""
hosts = self._get_weighted_candidates(context, topic,
@@ -61,7 +64,7 @@ class ChanceScheduler(driver.Scheduler):
if not hosts:
msg = _("Could not find another host")
raise exception.NoValidHost(reason=msg)
return hosts[int(random.random() * len(hosts))]
return self._choose_host_from_list(hosts)
def schedule_create_volume(self, context, request_spec, filter_properties):
"""Picks a host that is up at random."""
@@ -98,3 +101,48 @@ class ChanceScheduler(driver.Scheduler):
msg = (_('cannot place volume %(id)s on %(host)s')
% {'id': request_spec['volume_id'], 'host': host})
raise exception.NoValidHost(reason=msg)
def find_retype_host(self, context, request_spec, filter_properties,
migration_policy='never'):
"""Find a host that can accept the volume with its new type."""
current_host = request_spec['volume_properties']['host']
# The volume already exists on this host, and so we shouldn't check if
# it can accept the volume again.
filter_properties['vol_exists_on'] = current_host
weighed_hosts = self._get_weighted_candidates(
context,
CONF.volume_topic,
request_spec,
filter_properties=filter_properties)
if not weighed_hosts:
msg = (_('No valid hosts for volume %(id)s with type %(type)s')
% {'id': request_spec['volume_id'],
'type': request_spec['volume_type']})
raise exception.NoValidHost(reason=msg)
target_host = None
for weighed_host in weighed_hosts:
if weighed_host == current_host:
target_host = current_host
if migration_policy == 'never' and target_host is None:
msg = (_('Current host not valid for volume %(id)s with type '
'%(type)s, migration not allowed')
% {'id': request_spec['volume_id'],
'type': request_spec['volume_type']})
raise exception.NoValidHost(reason=msg)
if not target_host:
target_host = self._choose_host_from_list(weighed_hosts)
elevated = context.elevated()
host_states = self.host_manager.get_all_host_states(elevated)
for host_state in host_states:
if host_state.host == target_host:
return (host_state, migration_policy)
# NOTE(avishay): We should never get here, but raise just in case
msg = (_('No host_state for selected host %s') % target_host)
raise exception.NoValidHost(reason=msg)

@@ -77,6 +77,11 @@ class Scheduler(object):
"""Check if the specified host passes the filters."""
raise NotImplementedError(_("Must implement host_passes_filters"))
def find_retype_host(self, context, request_spec, filter_properties={},
migration_policy='never'):
"""Find a host that can accept the volume with its new type."""
raise NotImplementedError(_("Must implement find_retype_host"))
def schedule(self, context, topic, method, *_args, **_kwargs):
"""Must override schedule method for scheduler to work."""
raise NotImplementedError(_("Must implement a fallback schedule"))

@@ -99,6 +99,38 @@ class FilterScheduler(driver.Scheduler):
% {'id': request_spec['volume_id'], 'host': host})
raise exception.NoValidHost(reason=msg)
def find_retype_host(self, context, request_spec, filter_properties={},
migration_policy='never'):
"""Find a host that can accept the volume with its new type."""
current_host = request_spec['volume_properties']['host']
# The volume already exists on this host, and so we shouldn't check if
# it can accept the volume again in the CapacityFilter.
filter_properties['vol_exists_on'] = current_host
weighed_hosts = self._get_weighted_candidates(context, request_spec,
filter_properties)
if not weighed_hosts:
msg = (_('No valid hosts for volume %(id)s with type %(type)s')
% {'id': request_spec['volume_id'],
'type': request_spec['volume_type']})
raise exception.NoValidHost(reason=msg)
for weighed_host in weighed_hosts:
host_state = weighed_host.obj
if host_state.host == current_host:
return host_state
if migration_policy == 'never':
msg = (_('Current host not valid for volume %(id)s with type '
'%(type)s, migration not allowed')
% {'id': request_spec['volume_id'],
'type': request_spec['volume_type']})
raise exception.NoValidHost(reason=msg)
top_host = self._choose_top_host(weighed_hosts, request_spec)
return top_host.obj
def _post_select_populate_filter_properties(self, filter_properties,
host_state):
"""Add additional information to the filter properties after a host has
@@ -236,8 +268,12 @@ class FilterScheduler(driver.Scheduler):
filter_properties)
if not weighed_hosts:
return None
best_host = weighed_hosts[0]
LOG.debug(_("Choosing %s") % best_host)
return self._choose_top_host(weighed_hosts, request_spec)
def _choose_top_host(self, weighed_hosts, request_spec):
top_host = weighed_hosts[0]
host_state = top_host.obj
LOG.debug(_("Choosing %s") % host_state.host)
volume_properties = request_spec['volume_properties']
best_host.obj.consume_from_volume(volume_properties)
return best_host
host_state.consume_from_volume(volume_properties)
return top_host

@@ -30,6 +30,12 @@ class CapacityFilter(filters.BaseHostFilter):
def host_passes(self, host_state, filter_properties):
"""Return True if host has sufficient capacity."""
# If the volume already exists on this host, don't fail it for
# insufficient capacity (e.g., if we are retyping)
if host_state.host == filter_properties.get('vol_exists_on'):
return True
volume_size = filter_properties.get('size')
if host_state.free_capacity_gb is None:

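To illustrate the bypass with hypothetical numbers (matching the scheduler
tests later in this patch): a host whose free space merely equals the size
of the volume being retyped still passes, since that volume's data is
already counted in the host's usage.

class FakeHostState(object):
    # Hypothetical stand-in for the scheduler's host state: host4 reports
    # 200GB free and already holds the 200GB volume being retyped.
    host = 'host4'
    free_capacity_gb = 200

filter_properties = {'size': 200, 'vol_exists_on': 'host4'}
# CapacityFilter().host_passes(FakeHostState(), filter_properties) returns
# True immediately, without double-counting the volume's 200GB.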
@@ -29,6 +29,7 @@ from cinder.openstack.common import excutils
from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
from cinder.openstack.common.notifier import api as notifier
from cinder import quota
from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
@@ -41,13 +42,15 @@ scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
CONF = cfg.CONF
CONF.register_opt(scheduler_driver_opt)
QUOTAS = quota.QUOTAS
LOG = logging.getLogger(__name__)
class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes."""
RPC_API_VERSION = '1.3'
RPC_API_VERSION = '1.4'
def __init__(self, scheduler_driver=None, service_name=None,
*args, **kwargs):
@@ -100,37 +103,95 @@ class SchedulerManager(manager.Manager):
def request_service_capabilities(self, context):
volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
def _migrate_volume_set_error(self, context, ex, request_spec):
volume_state = {'volume_state': {'migration_status': None}}
self._set_volume_state_and_notify('migrate_volume_to_host',
volume_state,
context, ex, request_spec)
def migrate_volume_to_host(self, context, topic, volume_id, host,
force_host_copy, request_spec,
filter_properties=None):
"""Ensure that the host exists and can accept the volume."""
def _migrate_volume_set_error(self, context, ex, request_spec):
volume_state = {'volume_state': {'migration_status': None}}
self._set_volume_state_and_notify('migrate_volume_to_host',
volume_state,
context, ex, request_spec)
try:
tgt_host = self.driver.host_passes_filters(context, host,
request_spec,
filter_properties)
except exception.NoValidHost as ex:
self._migrate_volume_set_error(context, ex, request_spec)
_migrate_volume_set_error(self, context, ex, request_spec)
except Exception as ex:
with excutils.save_and_reraise_exception():
self._migrate_volume_set_error(context, ex, request_spec)
_migrate_volume_set_error(self, context, ex, request_spec)
else:
volume_ref = db.volume_get(context, volume_id)
volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,
tgt_host,
force_host_copy)
def _set_volume_state_and_notify(self, method, updates, context, ex,
request_spec):
# TODO(harlowja): move into a task that just does this later.
def retype(self, context, topic, volume_id,
request_spec, filter_properties=None):
"""Schedule the modification of a volume's type.
LOG.error(_("Failed to schedule_%(method)s: %(ex)s") %
{'method': method, 'ex': ex})
:param context: the request context
:param topic: the topic listened on
:param volume_id: the ID of the volume to retype
:param request_spec: parameters for this retype request
:param filter_properties: parameters to filter by
"""
def _retype_volume_set_error(self, context, ex, request_spec,
volume_ref, msg, reservations):
if reservations:
QUOTAS.rollback(context, reservations)
if (volume_ref['instance_uuid'] is None and
volume_ref['attached_host'] is None):
orig_status = 'available'
else:
orig_status = 'in-use'
volume_state = {'volume_state': {'status': orig_status}}
self._set_volume_state_and_notify('retype', volume_state,
context, ex, request_spec, msg)
volume_ref = db.volume_get(context, volume_id)
reservations = request_spec.get('quota_reservations')
new_type = request_spec.get('volume_type')
if new_type is None:
msg = _('New volume type not specified in request_spec.')
ex = exception.ParameterNotFound(param='volume_type')
_retype_volume_set_error(self, context, ex, request_spec,
volume_ref, msg, reservations)
# Default migration policy is 'never'
migration_policy = request_spec.get('migration_policy')
if not migration_policy:
migration_policy = 'never'
try:
tgt_host = self.driver.find_retype_host(context, request_spec,
filter_properties,
migration_policy)
except exception.NoValidHost as ex:
msg = (_("Could not find a host for volume %(volume_id)s with "
"type %(type_id)s.") %
{'type_id': new_type['id'], 'volume_id': volume_id})
_retype_volume_set_error(self, context, ex, request_spec,
volume_ref, msg, reservations)
except Exception as ex:
with excutils.save_and_reraise_exception():
_retype_volume_set_error(self, context, ex, request_spec,
volume_ref, None, reservations)
else:
volume_rpcapi.VolumeAPI().retype(context, volume_ref,
new_type['id'], tgt_host,
migration_policy, reservations)
def _set_volume_state_and_notify(self, method, updates, context, ex,
request_spec, msg=None):
# TODO(harlowja): move into a task that just does this later.
if not msg:
msg = (_("Failed to schedule_%(method)s: %(ex)s") %
{'method': method, 'ex': ex})
LOG.error(msg)
volume_state = updates['volume_state']
properties = request_spec.get('volume_properties', {})

@@ -35,6 +35,7 @@ class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
1.2 - Add request_spec, filter_properties arguments
to create_volume()
1.3 - Add migrate_volume_to_host() method
1.4 - Add retype method
'''
RPC_API_VERSION = '1.0'
@@ -72,6 +73,17 @@ class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
filter_properties=filter_properties),
version='1.3')
def retype(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None):
request_spec_p = jsonutils.to_primitive(request_spec)
return self.cast(ctxt, self.make_msg(
'retype',
topic=topic,
volume_id=volume_id,
request_spec=request_spec_p,
filter_properties=filter_properties),
version='1.4')
def update_service_capabilities(self, ctxt,
service_name, host,
capabilities):

@@ -24,7 +24,6 @@ from oslo.config import cfg
from cinder import db
from cinder import exception
from cinder.scheduler import chance
from cinder.scheduler import driver
from cinder import utils
@@ -40,7 +39,7 @@ CONF.register_opts(simple_scheduler_opts)
class SimpleScheduler(chance.ChanceScheduler):
"""Implements Naive Scheduler that tries to find least loaded host."""
def schedule_create_volume(self, context, request_spec, filter_properties):
def _get_weighted_candidates(self, context, topic, request_spec, **kwargs):
"""Picks a host that is up and has the fewest volumes."""
elevated = context.elevated()
@@ -50,39 +49,35 @@ class SimpleScheduler(chance.ChanceScheduler):
volume_properties = request_spec.get('volume_properties')
volume_size = volume_properties.get('size')
availability_zone = volume_properties.get('availability_zone')
filter_properties = kwargs.get('filter_properties', {})
zone, host = None, None
if availability_zone:
zone, _x, host = availability_zone.partition(':')
if host and context.is_admin:
topic = CONF.volume_topic
service = db.service_get_by_args(elevated, host, topic)
if not utils.service_is_up(service):
raise exception.WillNotSchedule(host=host)
updated_volume = driver.volume_update_db(context, volume_id, host)
self.volume_rpcapi.create_volume(context, updated_volume, host,
request_spec, filter_properties,
snapshot_id=snapshot_id,
image_id=image_id)
return None
return [host]
candidates = []
results = db.service_get_all_volume_sorted(elevated)
if zone:
results = [(s, gigs) for (s, gigs) in results
if s['availability_zone'] == zone]
for result in results:
(service, volume_gigabytes) = result
if volume_gigabytes + volume_size > CONF.max_gigabytes:
msg = _("Not enough allocatable volume gigabytes remaining")
raise exception.NoValidHost(reason=msg)
no_skip = service['host'] != filter_properties.get('vol_exists_on')
if no_skip and volume_gigabytes + volume_size > CONF.max_gigabytes:
continue
if utils.service_is_up(service) and not service['disabled']:
updated_volume = driver.volume_update_db(context, volume_id,
service['host'])
self.volume_rpcapi.create_volume(context, updated_volume,
service['host'], request_spec,
filter_properties,
snapshot_id=snapshot_id,
image_id=image_id)
return None
msg = _("Is the appropriate service running?")
raise exception.NoValidHost(reason=msg)
candidates.append(service['host'])
if candidates:
return candidates
else:
msg = _("No service with adequate space or no service running")
raise exception.NoValidHost(reason=msg)
def _choose_host_from_list(self, hosts):
return hosts[0]

@@ -30,14 +30,6 @@ from cinder import volume
from cinder.volume import api as volume_api
def fake_volume_api(*args, **kwargs):
return True
def fake_volume_get(*args, **kwargs):
return {'id': 'fake', 'host': 'fake'}
class VolumeActionsTest(test.TestCase):
_actions = ('os-detach', 'os-reserve', 'os-unreserve')
@@ -46,12 +38,30 @@ class VolumeActionsTest(test.TestCase):
def setUp(self):
super(VolumeActionsTest, self).setUp()
self.stubs.Set(volume.API, 'get', fake_volume_api)
self.UUID = uuid.uuid4()
for _method in self._methods:
self.stubs.Set(volume.API, _method, fake_volume_api)
self.api_patchers = {}
for _meth in self._methods:
self.api_patchers[_meth] = mock.patch('cinder.volume.API.' + _meth)
self.api_patchers[_meth].start()
self.api_patchers[_meth].return_value = True
self.stubs.Set(volume.API, 'get', fake_volume_get)
vol = {'id': 'fake', 'host': 'fake', 'status': 'available', 'size': 1,
'migration_status': None, 'volume_type_id': 'fake'}
self.get_patcher = mock.patch('cinder.volume.API.get')
self.mock_volume_get = self.get_patcher.start()
self.mock_volume_get.return_value = vol
self.update_patcher = mock.patch('cinder.volume.API.update')
self.mock_volume_update = self.update_patcher.start()
self.mock_volume_update.return_value = vol
self.flags(rpc_backend='cinder.openstack.common.rpc.impl_fake')
def tearDown(self):
for patcher in self.api_patchers:
self.api_patchers[patcher].stop()
self.update_patcher.stop()
self.get_patcher.stop()
super(VolumeActionsTest, self).tearDown()
def test_simple_api_actions(self):
app = fakes.wsgi_app()
@@ -282,6 +292,124 @@ class VolumeActionsTest(test.TestCase):
make_update_readonly_flag_test(self, None, 400)
class VolumeRetypeActionsTest(VolumeActionsTest):
def setUp(self):
def get_vol_type(*args, **kwargs):
d1 = {'id': 'fake', 'qos_specs_id': 'fakeqid1', 'extra_specs': {}}
d2 = {'id': 'foo', 'qos_specs_id': 'fakeqid2', 'extra_specs': {}}
return d1 if d1['id'] == args[1] else d2
self.retype_patchers = {}
self.retype_mocks = {}
paths = ['cinder.volume.volume_types.get_volume_type',
'cinder.volume.volume_types.get_volume_type_by_name',
'cinder.volume.qos_specs.get_qos_specs',
'cinder.quota.QUOTAS.add_volume_type_opts',
'cinder.quota.QUOTAS.reserve']
for path in paths:
name = path.split('.')[-1]
self.retype_patchers[name] = mock.patch(path)
self.retype_mocks[name] = self.retype_patchers[name].start()
self.retype_mocks['get_volume_type'].side_effect = get_vol_type
self.retype_mocks['get_volume_type_by_name'].side_effect = get_vol_type
self.retype_mocks['add_volume_type_opts'].return_value = None
self.retype_mocks['reserve'].return_value = None
super(VolumeRetypeActionsTest, self).setUp()
def tearDown(self):
for name, patcher in self.retype_patchers.iteritems():
patcher.stop()
super(VolumeRetypeActionsTest, self).tearDown()
def _retype_volume_exec(self, expected_status, new_type='foo'):
req = webob.Request.blank('/v2/fake/volumes/1/action')
req.method = 'POST'
req.headers['content-type'] = 'application/json'
retype_body = {'new_type': new_type, 'migration_policy': 'never'}
req.body = jsonutils.dumps({'os-retype': retype_body})
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, expected_status)
@mock.patch('cinder.volume.qos_specs.get_qos_specs')
def test_retype_volume_success(self, _mock_get_qspecs):
# Test that the retype API works for both available and in-use
self._retype_volume_exec(202)
self.mock_volume_get.return_value['status'] = 'in-use'
specs = {'qos_specs': {'id': 'fakeqid1', 'consumer': 'back-end'}}
_mock_get_qspecs.return_value = specs
self._retype_volume_exec(202)
def test_retype_volume_no_body(self):
# Request with no body should fail
req = webob.Request.blank('/v2/fake/volumes/1/action')
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.body = jsonutils.dumps({'os-retype': None})
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 400)
def test_retype_volume_bad_policy(self):
# Request with invalid migration policy should fail
req = webob.Request.blank('/v2/fake/volumes/1/action')
req.method = 'POST'
req.headers['content-type'] = 'application/json'
retype_body = {'new_type': 'foo', 'migration_policy': 'invalid'}
req.body = jsonutils.dumps({'os-retype': retype_body})
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 400)
def test_retype_volume_bad_status(self):
# Should fail if volume does not have proper status
self.mock_volume_get.return_value['status'] = 'error'
self._retype_volume_exec(400)
def test_retype_type_no_exist(self):
# Should fail if new type does not exist
exc = exception.VolumeTypeNotFound('exc')
self.retype_mocks['get_volume_type'].side_effect = exc
self._retype_volume_exec(404)
def test_retype_same_type(self):
# Should fail if new type and old type are the same
self._retype_volume_exec(400, new_type='fake')
def test_retype_over_quota(self):
# Should fail if going over quota for new type
exc = exception.OverQuota(overs=['gigabytes'],
quotas={'gigabytes': 20},
usages={'gigabytes': {'reserved': 5,
'in_use': 15}})
self.retype_mocks['reserve'].side_effect = exc
self._retype_volume_exec(413)
@mock.patch('cinder.volume.qos_specs.get_qos_specs')
def _retype_volume_diff_qos(self, vol_status, consumer, expected_status,
_mock_get_qspecs):
def fake_get_qos(ctxt, qos_id):
d1 = {'qos_specs': {'id': 'fakeqid1', 'consumer': consumer}}
d2 = {'qos_specs': {'id': 'fakeqid2', 'consumer': consumer}}
return d1 if d1['qos_specs']['id'] == qos_id else d2
self.mock_volume_get.return_value['status'] = vol_status
_mock_get_qspecs.side_effect = fake_get_qos
self._retype_volume_exec(expected_status)
def test_retype_volume_diff_qos_fe_in_use(self):
# should fail if changing qos enforced by front-end for in-use volumes
self._retype_volume_diff_qos('in-use', 'front-end', 400)
def test_retype_volume_diff_qos_fe_available(self):
# should NOT fail if changing qos enforced by FE for available volumes
self._retype_volume_diff_qos('available', 'front-end', 202)
def test_retype_volume_diff_qos_be(self):
# should NOT fail if changing qos enforced by back-end
self._retype_volume_diff_qos('available', 'back-end', 202)
self._retype_volume_diff_qos('in-use', 'back-end', 202)
def stub_volume_get(self, context, volume_id):
volume = stubs.stub_volume(volume_id)
if volume_id == 5:

@@ -33,6 +33,7 @@
"volume:migrate_volume": [["rule:admin_api"]],
"volume:migrate_volume_completion": [["rule:admin_api"]],
"volume:update_readonly_flag": [],
"volume:retype": [],
"volume_extension:volume_admin_actions:reset_status": [["rule:admin_api"]],
"volume_extension:snapshot_admin_actions:reset_status": [["rule:admin_api"]],

@@ -36,21 +36,25 @@ class FakeHostManager(host_manager.HostManager):
'free_capacity_gb': 1024,
'allocated_capacity_gb': 0,
'reserved_percentage': 10,
'volume_backend_name': 'lvm1',
'timestamp': None},
'host2': {'total_capacity_gb': 2048,
'free_capacity_gb': 300,
'allocated_capacity_gb': 1748,
'reserved_percentage': 10,
'volume_backend_name': 'lvm2',
'timestamp': None},
'host3': {'total_capacity_gb': 512,
'free_capacity_gb': 256,
'allocated_capacity_gb': 256,
'reserved_percentage': 0,
'volume_backend_name': 'lvm3',
'timestamp': None},
'host4': {'total_capacity_gb': 2048,
'free_capacity_gb': 200,
'allocated_capacity_gb': 1848,
'reserved_percentage': 5,
'volume_backend_name': 'lvm4',
'timestamp': None},
}

@@ -228,3 +228,75 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
sched.host_passes_filters,
ctx, 'host1', request_spec, {})
self.assertTrue(_mock_service_get_topic.called)
@mock.patch('cinder.db.service_get_all_by_topic')
def test_retype_policy_never_migrate_pass(self, _mock_service_get_topic):
# Retype should pass if current host passes filters and
# policy=never. host4 doesn't have enough space to hold an additional
# 200GB, but it is already the host of this volume and should not be
# counted twice.
sched, ctx = self._host_passes_filters_setup(
_mock_service_get_topic)
extra_specs = {'volume_backend_name': 'lvm4'}
request_spec = {'volume_id': 1,
'volume_type': {'name': 'LVM_iSCSI',
'extra_specs': extra_specs},
'volume_properties': {'project_id': 1,
'size': 200,
'host': 'host4'}}
host_state = sched.find_retype_host(ctx, request_spec,
filter_properties={},
migration_policy='never')
self.assertEqual(host_state.host, 'host4')
@mock.patch('cinder.db.service_get_all_by_topic')
def test_retype_policy_never_migrate_fail(self, _mock_service_get_topic):
# Retype should fail if current host doesn't pass filters and
# policy=never.
sched, ctx = self._host_passes_filters_setup(
_mock_service_get_topic)
extra_specs = {'volume_backend_name': 'lvm1'}
request_spec = {'volume_id': 1,
'volume_type': {'name': 'LVM_iSCSI',
'extra_specs': extra_specs},
'volume_properties': {'project_id': 1,
'size': 200,
'host': 'host4'}}
self.assertRaises(exception.NoValidHost, sched.find_retype_host, ctx,
request_spec, filter_properties={},
migration_policy='never')
@mock.patch('cinder.db.service_get_all_by_topic')
def test_retype_policy_demand_migrate_pass(self, _mock_service_get_topic):
# Retype should pass if current host fails filters but another host
# is suitable when policy=on-demand.
sched, ctx = self._host_passes_filters_setup(
_mock_service_get_topic)
extra_specs = {'volume_backend_name': 'lvm1'}
request_spec = {'volume_id': 1,
'volume_type': {'name': 'LVM_iSCSI',
'extra_specs': extra_specs},
'volume_properties': {'project_id': 1,
'size': 200,
'host': 'host4'}}
host_state = sched.find_retype_host(ctx, request_spec,
filter_properties={},
migration_policy='on-demand')
self.assertEqual(host_state.host, 'host1')
@mock.patch('cinder.db.service_get_all_by_topic')
def test_retype_policy_demand_migrate_fail(self, _mock_service_get_topic):
# Retype should fail if current host doesn't pass filters and
# no other suitable candidates exist even if policy=on-demand.
sched, ctx = self._host_passes_filters_setup(
_mock_service_get_topic)
extra_specs = {'volume_backend_name': 'lvm1'}
request_spec = {'volume_id': 1,
'volume_type': {'name': 'LVM_iSCSI',
'extra_specs': extra_specs},
'volume_properties': {'project_id': 1,
'size': 2048,
'host': 'host4'}}
self.assertRaises(exception.NoValidHost, sched.find_retype_host, ctx,
request_spec, filter_properties={},
migration_policy='on-demand')

@@ -98,3 +98,14 @@ class SchedulerRpcAPITestCase(test.TestCase):
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='1.3')
@mock.patch('cinder.openstack.common.rpc.cast')
def test_retype(self, _mock_rpc_method):
self._test_scheduler_api('retype',
rpc_method='cast',
_mock_method=_mock_rpc_method,
topic='topic',
volume_id='volume_id',
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='1.4')

@@ -155,6 +155,39 @@ class SchedulerManagerTestCase(test.TestCase):
self.assertEqual(CONF.scheduler_default_weighers,
['AllocatedCapacityWeigher'])
@mock.patch('cinder.db.volume_update')
@mock.patch('cinder.db.volume_get')
def test_retype_volume_exception_returns_volume_state(self, _mock_vol_get,
_mock_vol_update):
# Test NoValidHost exception behavior for retype.
# Puts the volume in original state and eats the exception.
fake_volume_id = 1
topic = 'fake_topic'
volume_id = fake_volume_id
request_spec = {'volume_id': fake_volume_id, 'volume_type': {'id': 3},
'migration_policy': 'on-demand'}
vol_info = {'id': fake_volume_id, 'status': 'in-use',
'instance_uuid': 'foo', 'attached_host': None}
_mock_vol_get.return_value = vol_info
_mock_vol_update.return_value = {'status': 'in-use'}
_mock_find_retype_host = mock.Mock(
side_effect=exception.NoValidHost(reason=""))
orig_retype = self.manager.driver.find_retype_host
self.manager.driver.find_retype_host = _mock_find_retype_host
self.manager.retype(self.context, topic, volume_id,
request_spec=request_spec,
filter_properties={})
_mock_vol_get.assert_called_once_with(self.context, fake_volume_id)
_mock_find_retype_host.assert_called_once_with(self.context,
request_spec, {},
'on-demand')
_mock_vol_update.assert_called_once_with(self.context, fake_volume_id,
{'status': 'in-use'})
self.manager.driver.find_retype_host = orig_retype
class SchedulerTestCase(test.TestCase):
"""Test case for base scheduler driver class."""

@@ -61,6 +61,7 @@ from cinder.volume import driver
from cinder.volume.drivers import lvm
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volutils
from cinder.volume import volume_types
QUOTAS = quota.QUOTAS
@@ -2002,7 +2003,8 @@ class VolumeTestCase(BaseVolumeTestCase):
"""Test volume migration done by driver."""
# stub out driver and rpc functions
self.stubs.Set(self.volume.driver, 'migrate_volume',
lambda x, y, z: (True, {'user_id': 'foo'}))
lambda x, y, z, new_type_id=None: (True,
{'user_id': 'foo'}))
volume = tests_utils.create_volume(self.context, size=0,
host=CONF.host,
@@ -2045,6 +2047,92 @@ class VolumeTestCase(BaseVolumeTestCase):
self.assertEqual(volume['host'], 'newhost')
self.assertIsNone(volume['migration_status'])
def _retype_volume_exec(self, driver, snap=False, policy='on-demand',
migrate_exc=False, exc=None, diff_equal=False):
elevated = context.get_admin_context()
project_id = self.context.project_id
db.volume_type_create(elevated, {'name': 'old', 'extra_specs': {}})
old_vol_type = db.volume_type_get_by_name(elevated, 'old')
db.volume_type_create(elevated, {'name': 'new', 'extra_specs': {}})
vol_type = db.volume_type_get_by_name(elevated, 'new')
db.quota_create(elevated, project_id, 'volumes_new', 10)
volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host, status='retyping',
volume_type_id=old_vol_type['id'])
if snap:
self._create_snapshot(volume['id'], size=volume['size'])
host_obj = {'host': 'newhost', 'capabilities': {}}
reserve_opts = {'volumes': 1, 'gigabytes': volume['size']}
QUOTAS.add_volume_type_opts(self.context,
reserve_opts,
vol_type['id'])
reservations = QUOTAS.reserve(self.context,
project_id=project_id,
**reserve_opts)
with mock.patch.object(self.volume.driver, 'retype') as _retype:
with mock.patch.object(volume_types, 'volume_types_diff') as _diff:
with mock.patch.object(self.volume, 'migrate_volume') as _mig:
_retype.return_value = driver
_diff.return_value = ({}, diff_equal)
if migrate_exc:
_mig.side_effect = KeyError
else:
_mig.return_value = True
if not exc:
self.volume.retype(self.context, volume['id'],
vol_type['id'], host_obj,
migration_policy=policy,
reservations=reservations)
else:
self.assertRaises(exc, self.volume.retype,
self.context, volume['id'],
vol_type['id'], host_obj,
migration_policy=policy,
reservations=reservations)
# get volume/quota properties
volume = db.volume_get(elevated, volume['id'])
try:
usage = db.quota_usage_get(elevated, project_id, 'volumes_new')
volumes_in_use = usage.in_use
except exception.QuotaUsageNotFound:
volumes_in_use = 0
# check properties
if not exc:
self.assertEqual(volume['volume_type_id'], vol_type['id'])
self.assertEqual(volume['status'], 'available')
self.assertEqual(volumes_in_use, 1)
else:
self.assertEqual(volume['volume_type_id'], old_vol_type['id'])
self.assertEqual(volume['status'], 'available')
self.assertEqual(volumes_in_use, 0)
def test_retype_volume_driver_success(self):
self._retype_volume_exec(True)
def test_retype_volume_migration_bad_policy(self):
# Test a volume retype that requires migration but is not allowed
self._retype_volume_exec(False, policy='never',
exc=exception.VolumeMigrationFailed)
def test_retype_volume_migration_with_snaps(self):
self._retype_volume_exec(False, snap=True, exc=exception.InvalidVolume)
def test_retype_volume_migration_failed(self):
self._retype_volume_exec(False, migrate_exc=True, exc=KeyError)
def test_retype_volume_migration_success(self):
self._retype_volume_exec(False, migrate_exc=False, exc=None)
def test_retype_volume_migration_equal_types(self):
self._retype_volume_exec(False, diff_equal=True)
def test_update_volume_readonly_flag(self):
"""Test volume readonly flag can be updated at API level."""
# create a volume and assign to host

@@ -251,3 +251,18 @@ class VolumeRpcAPITestCase(test.TestCase):
new_volume=self.fake_volume,
error=False,
version='1.10')
def test_retype(self):
class FakeHost(object):
def __init__(self):
self.host = 'host'
self.capabilities = {}
dest_host = FakeHost()
self._test_volume_api('retype',
rpc_method='cast',
volume=self.fake_volume,
new_type_id='fake',
dest_host=dest_host,
migration_policy='never',
reservations=None,
version='1.12')

@@ -237,3 +237,68 @@ class VolumeTypeTestCase(test.TestCase):
'k3': 'v3'}}}
res = volume_types.get_volume_type_qos_specs(type_ref['id'])
self.assertDictMatch(expected, res)
def test_volume_types_diff(self):
# type_ref1 and type_ref2 have the same extra_specs, while type_ref3's differ
keyvals1 = {"key1": "val1", "key2": "val2"}
keyvals2 = {"key1": "val0", "key2": "val2"}
type_ref1 = volume_types.create(self.ctxt, "type1", keyvals1)
type_ref2 = volume_types.create(self.ctxt, "type2", keyvals1)
type_ref3 = volume_types.create(self.ctxt, "type3", keyvals2)
# Check equality with only extra_specs
diff, same = volume_types.volume_types_diff(self.ctxt, type_ref1['id'],
type_ref2['id'])
self.assertEqual(same, True)
self.assertEqual(diff['extra_specs']['key1'], ('val1', 'val1'))
diff, same = volume_types.volume_types_diff(self.ctxt, type_ref1['id'],
type_ref3['id'])
self.assertEqual(same, False)
self.assertEqual(diff['extra_specs']['key1'], ('val1', 'val0'))
# qos_ref1 and qos_ref2 have the same specs, while qos_ref3's differ
qos_keyvals1 = {'k1': 'v1', 'k2': 'v2', 'k3': 'v3'}
qos_keyvals2 = {'k1': 'v0', 'k2': 'v2', 'k3': 'v3'}
qos_ref1 = qos_specs.create(self.ctxt, 'qos-specs-1', qos_keyvals1)
qos_ref2 = qos_specs.create(self.ctxt, 'qos-specs-2', qos_keyvals1)
qos_ref3 = qos_specs.create(self.ctxt, 'qos-specs-3', qos_keyvals2)
# Check equality with qos specs too
qos_specs.associate_qos_with_type(self.ctxt, qos_ref1['id'],
type_ref1['id'])
qos_specs.associate_qos_with_type(self.ctxt, qos_ref2['id'],
type_ref2['id'])
diff, same = volume_types.volume_types_diff(self.ctxt, type_ref1['id'],
type_ref2['id'])
self.assertEqual(same, True)
self.assertEqual(diff['extra_specs']['key1'], ('val1', 'val1'))
self.assertEqual(diff['qos_specs']['k1'], ('v1', 'v1'))
qos_specs.disassociate_qos_specs(self.ctxt, qos_ref2['id'],
type_ref2['id'])
qos_specs.associate_qos_with_type(self.ctxt, qos_ref3['id'],
type_ref2['id'])
diff, same = volume_types.volume_types_diff(self.ctxt, type_ref1['id'],
type_ref2['id'])
self.assertEqual(same, False)
self.assertEqual(diff['extra_specs']['key1'], ('val1', 'val1'))
self.assertEqual(diff['qos_specs']['k1'], ('v1', 'v0'))
qos_specs.disassociate_qos_specs(self.ctxt, qos_ref3['id'],
type_ref2['id'])
qos_specs.associate_qos_with_type(self.ctxt, qos_ref2['id'],
type_ref2['id'])
# And add encryption for good measure
enc_keyvals1 = {'cipher': 'c1', 'key_size': 256, 'provider': 'p1',
'control_location': 'front-end'}
enc_keyvals2 = {'cipher': 'c1', 'key_size': 128, 'provider': 'p1',
'control_location': 'front-end'}
db.volume_type_encryption_update_or_create(self.ctxt, type_ref1['id'],
enc_keyvals1)
db.volume_type_encryption_update_or_create(self.ctxt, type_ref2['id'],
enc_keyvals2)
diff, same = volume_types.volume_types_diff(self.ctxt, type_ref1['id'],
type_ref2['id'])
self.assertEqual(same, False)
self.assertEqual(diff['extra_specs']['key1'], ('val1', 'val1'))
self.assertEqual(diff['qos_specs']['k1'], ('v1', 'v1'))
self.assertEqual(diff['encryption']['key_size'], (256, 128))

@@ -30,6 +30,7 @@ def create_volume(ctxt,
migration_status=None,
size=1,
availability_zone='fake_az',
volume_type_id=None,
**kwargs):
"""Create a volume object in the DB."""
vol = {}
@@ -43,6 +44,8 @@ def create_volume(ctxt,
vol['display_description'] = display_description
vol['attach_status'] = 'detached'
vol['availability_zone'] = availability_zone
if volume_type_id:
vol['volume_type_id'] = volume_type_id
for key in kwargs:
vol[key] = kwargs[key]
return db.volume_create(ctxt, vol)

@@ -32,11 +32,14 @@ from cinder import keymgr
from cinder.openstack.common import excutils
from cinder.openstack.common import log as logging
from cinder.openstack.common import timeutils
from cinder.openstack.common import uuidutils
import cinder.policy
from cinder import quota
from cinder import quota_utils
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import utils
from cinder.volume.flows import create_volume
from cinder.volume import qos_specs
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils
from cinder.volume import volume_types
@@ -852,6 +855,96 @@ class API(base.Base):
self.update_volume_admin_metadata(context.elevated(), volume,
{'readonly': str(flag)})
@wrap_check_policy
def retype(self, context, volume, new_type, migration_policy=None):
"""Attempt to modify the type associated with an existing volume."""
if volume['status'] not in ['available', 'in-use']:
msg = _('Unable to update type due to incorrect status '
'on volume: %s') % volume['id']
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
if volume['migration_status'] is not None:
msg = (_("Volume %s is already part of an active migration.")
% volume['id'])
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
if migration_policy and migration_policy not in ['on-demand', 'never']:
msg = _('migration_policy must be \'on-demand\' or \'never\', '
        'passed: %s') % str(migration_policy)
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
# Support specifying volume type by ID or name
try:
if uuidutils.is_uuid_like(new_type):
vol_type = volume_types.get_volume_type(context, new_type)
else:
vol_type = volume_types.get_volume_type_by_name(context,
new_type)
except exception.InvalidVolumeType:
msg = _('Invalid volume_type passed: %s') % str(new_type)
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
vol_type_id = vol_type['id']
vol_type_qos_id = vol_type['qos_specs_id']
old_vol_type = None
old_vol_type_id = volume['volume_type_id']
old_vol_type_qos_id = None
# Error if the original and new type are the same
if volume['volume_type_id'] == vol_type_id:
msg = _('New volume_type same as original: %s') % str(new_type)
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
if volume['volume_type_id']:
old_vol_type = volume_types.get_volume_type(
context, old_vol_type_id)
old_vol_type_qos_id = old_vol_type['qos_specs_id']
# We don't support changing encryption requirements yet
old_enc = volume_types.get_volume_type_encryption(context,
old_vol_type_id)
new_enc = volume_types.get_volume_type_encryption(context,
vol_type_id)
if old_enc != new_enc:
msg = _('Retype cannot change encryption requirements')
raise exception.InvalidInput(reason=msg)
# We don't support changing QoS at the front-end yet for in-use volumes
# TODO(avishay): Call Nova to change QoS setting (libvirt has support
# - virDomainSetBlockIoTune() - Nova does not have support yet).
if (volume['status'] != 'available' and
old_vol_type_qos_id != vol_type_qos_id):
for qos_id in [old_vol_type_qos_id, vol_type_qos_id]:
if qos_id:
specs = qos_specs.get_qos_specs(context.elevated(), qos_id)
if specs['qos_specs']['consumer'] != 'back-end':
msg = _('Retype cannot change front-end qos specs for '
'in-use volumes')
raise exception.InvalidInput(reason=msg)
self.update(context, volume, {'status': 'retyping'})
# We check the quota here so that we can report any issues as early as
# possible, but we won't commit the reservation until the type actually
# changes. We pass the reservations onward in case we need to roll back.
reservations = quota_utils.get_volume_type_reservation(context, volume,
vol_type_id)
request_spec = {'volume_properties': volume,
'volume_id': volume['id'],
'volume_type': vol_type,
'migration_policy': migration_policy,
'quota_reservations': reservations}
self.scheduler_rpcapi.retype(context, CONF.volume_topic, volume['id'],
request_spec=request_spec,
filter_properties={})
class HostAPI(base.Base):
def __init__(self):

@@ -479,9 +479,30 @@ class VolumeDriver(object):
Returns a boolean indicating whether the migration occurred, as well as
model_update.
:param ctxt: Context
:param volume: A dictionary describing the volume to migrate
:param host: A dictionary describing the host to migrate to, where
host['host'] is its name, and host['capabilities'] is a
dictionary of its reported capabilities.
"""
return (False, None)
def retype(self, context, volume, new_type, diff, host):
"""Convert the volume to be of the new type.
Returns a boolean indicating whether the retype occurred.
:param ctxt: Context
:param volume: A dictionary describing the volume to migrate
:param new_type: A dictionary describing the volume type to convert to
:param diff: A dictionary with the difference between the two types
:param host: A dictionary describing the host to migrate to, where
host['host'] is its name, and host['capabilities'] is a
dictionary of its reported capabilities.
"""
return False
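A back-end that can convert a volume in place would override this hook. A
hypothetical sketch (the 'example:tier' extra-spec key is invented for
illustration); returning False for anything the driver cannot handle makes
the manager fall back to the generic migration path:

class ExampleDriver(VolumeDriver):
    """Hypothetical driver that retypes in place for one known spec."""

    def retype(self, context, volume, new_type, diff, host):
        # 'diff' maps each key to an (old_value, new_value) pair.
        changed = [k for k, (old, new)
                   in diff.get('extra_specs', {}).items() if old != new]
        if changed != ['example:tier']:
            return False  # unknown change: let the manager migrate instead
        # ... adjust the backing storage tier for 'volume' here ...
        return True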
class ISCSIDriver(VolumeDriver):
"""Executes commands relating to ISCSI volumes.

@@ -180,7 +180,7 @@ def locked_snapshot_operation(f):
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
RPC_API_VERSION = '1.11'
RPC_API_VERSION = '1.12'
def __init__(self, volume_driver=None, service_name=None,
*args, **kwargs):
@@ -773,7 +773,7 @@ class VolumeManager(manager.SchedulerDependentManager):
volume_ref = self.db.volume_get(context.elevated(), volume_id)
self.driver.accept_transfer(context, volume_ref, new_user, new_project)
def _migrate_volume_generic(self, ctxt, volume, host):
def _migrate_volume_generic(self, ctxt, volume, host, new_type_id):
rpcapi = volume_rpcapi.VolumeAPI()
# Create new volume on remote host
@@ -785,6 +785,8 @@ class VolumeManager(manager.SchedulerDependentManager):
# We don't copy volume_type because the db sets that according to
# volume_type_id, which we do copy
del new_vol_values['volume_type']
if new_type_id:
new_vol_values['volume_type_id'] = new_type_id
new_vol_values['host'] = host['host']
new_vol_values['status'] = 'creating'
new_vol_values['migration_status'] = 'target:%s' % volume['id']
@@ -813,7 +815,8 @@ class VolumeManager(manager.SchedulerDependentManager):
# Copy the source volume to the destination volume
try:
if volume['status'] == 'available':
if (volume['instance_uuid'] is None and
volume['attached_host'] is None):
self.driver.copy_volume_data(ctxt, volume, new_volume,
remote='dest')
# The above call is synchronous so we complete the migration
@@ -837,6 +840,14 @@ class VolumeManager(manager.SchedulerDependentManager):
rpcapi.delete_volume(ctxt, new_volume)
new_volume['migration_status'] = None
def _get_original_status(self, volume):
if (volume['instance_uuid'] is None and
volume['attached_host'] is None):
return 'available'
else:
return 'in-use'
@utils.require_driver_initialized
def migrate_volume_completion(self, ctxt, volume_id, new_volume_id,
error=False):
msg = _("migrate_volume_completion: completing migration for "
@@ -846,6 +857,10 @@ class VolumeManager(manager.SchedulerDependentManager):
new_volume = self.db.volume_get(ctxt, new_volume_id)
rpcapi = volume_rpcapi.VolumeAPI()
status_update = None
if volume['status'] == 'retyping':
status_update = {'status': self._get_original_status(volume)}
if error:
msg = _("migrate_volume_completion is cleaning up an error "
"for volume %(vol1)s (temporary volume %(vol2)s")
@@ -853,7 +868,10 @@ class VolumeManager(manager.SchedulerDependentManager):
'vol2': new_volume['id']})
new_volume['migration_status'] = None
rpcapi.delete_volume(ctxt, new_volume)
self.db.volume_update(ctxt, volume_id, {'migration_status': None})
updates = {'migration_status': None}
if status_update:
updates.update(status_update)
self.db.volume_update(ctxt, volume_id, updates)
return volume_id
self.db.volume_update(ctxt, volume_id,
@@ -868,19 +886,27 @@ class VolumeManager(manager.SchedulerDependentManager):
self.db.finish_volume_migration(ctxt, volume_id, new_volume_id)
self.db.volume_destroy(ctxt, new_volume_id)
self.db.volume_update(ctxt, volume_id, {'migration_status': None})
updates = {'migration_status': None}
if status_update:
updates.update(status_update)
self.db.volume_update(ctxt, volume_id, updates)
return volume['id']
@utils.require_driver_initialized
def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False):
def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False,
new_type_id=None):
"""Migrate the volume to the specified host (called on source host)."""
volume_ref = self.db.volume_get(ctxt, volume_id)
model_update = None
moved = False
status_update = None
if volume_ref['status'] == 'retyping':
status_update = {'status': self._get_original_status(volume_ref)}
self.db.volume_update(ctxt, volume_ref['id'],
{'migration_status': 'migrating'})
if not force_host_copy:
if not force_host_copy and new_type_id is None:
try:
LOG.debug(_("volume %s: calling driver migrate_volume"),
volume_ref['id'])
@@ -890,6 +916,8 @@ class VolumeManager(manager.SchedulerDependentManager):
if moved:
updates = {'host': host['host'],
'migration_status': None}
if status_update:
updates.update(status_update)
if model_update:
updates.update(model_update)
volume_ref = self.db.volume_update(ctxt,
@@ -898,16 +926,21 @@ class VolumeManager(manager.SchedulerDependentManager):
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': None}
if status_update:
updates.update(status_update)
model_update = self.driver.create_export(ctxt, volume_ref)
if model_update:
updates.update(model_update)
self.db.volume_update(ctxt, volume_ref['id'], updates)
if not moved:
try:
self._migrate_volume_generic(ctxt, volume_ref, host)
self._migrate_volume_generic(ctxt, volume_ref, host,
new_type_id)
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': None}
if status_update:
updates.update(status_update)
model_update = self.driver.create_export(ctxt, volume_ref)
if model_update:
updates.update(model_update)
@@ -1013,3 +1046,101 @@ class VolumeManager(manager.SchedulerDependentManager):
self._notify_about_volume_usage(
context, volume, "resize.end",
extra_usage_info={'size': int(new_size)})
@utils.require_driver_initialized
def retype(self, ctxt, volume_id, new_type_id, host,
migration_policy='never', reservations=None):
def _retype_error(context, volume_id, old_reservations,
new_reservations, status_update):
try:
self.db.volume_update(context, volume_id, status_update)
finally:
QUOTAS.rollback(context, old_reservations)
QUOTAS.rollback(context, new_reservations)
context = ctxt.elevated()
volume_ref = self.db.volume_get(ctxt, volume_id)
status_update = {'status': self._get_original_status(volume_ref)}
if context.project_id != volume_ref['project_id']:
project_id = volume_ref['project_id']
else:
project_id = context.project_id
# Get old reservations
try:
reserve_opts = {'volumes': -1, 'gigabytes': -volume_ref['size']}
QUOTAS.add_volume_type_opts(context,
reserve_opts,
volume_ref.get('volume_type_id'))
old_reservations = QUOTAS.reserve(context,
project_id=project_id,
**reserve_opts)
except Exception:
old_reservations = None
self.db.volume_update(context, volume_id, status_update)
LOG.exception(_("Failed to update usages while retyping volume."))
raise exception.CinderException(_("Failed to get old volume type"
" quota reservations"))
# We already got the new reservations
new_reservations = reservations
# If volume types have the same contents, no need to do anything
retyped = False
diff, all_equal = volume_types.volume_types_diff(
context, volume_ref.get('volume_type_id'), new_type_id)
if all_equal:
retyped = True
# Call driver to try and change the type
if not retyped:
try:
new_type = volume_types.get_volume_type(context, new_type_id)
retyped = self.driver.retype(context, volume_ref, new_type,
diff, host)
if retyped:
LOG.info(_("Volume %s: retyped succesfully"), volume_id)
except Exception:
retyped = False
LOG.info(_("Volume %s: driver error when trying to retype, "
"falling back to generic mechanism."),
volume_ref['id'])
# We could not change the type, so we need to migrate the volume, where
# the destination volume will be of the new type
if not retyped:
if migration_policy == 'never':
_retype_error(context, volume_id, old_reservations,
new_reservations, status_update)
msg = _("Retype requires migration but is not allowed.")
raise exception.VolumeMigrationFailed(reason=msg)
snaps = self.db.snapshot_get_all_for_volume(context,
volume_ref['id'])
if snaps:
_retype_error(context, volume_id, old_reservations,
new_reservations, status_update)
msg = _("Volume must not have snapshots.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
self.db.volume_update(context, volume_ref['id'],
{'migration_status': 'starting'})
try:
self.migrate_volume(context, volume_id, host,
new_type_id=new_type_id)
except Exception:
with excutils.save_and_reraise_exception():
_retype_error(context, volume_id, old_reservations,
new_reservations, status_update)
self.db.volume_update(context, volume_id,
{'volume_type_id': new_type_id,
'status': status_update['status']})
if old_reservations:
QUOTAS.commit(context, old_reservations, project_id=project_id)
if new_reservations:
QUOTAS.commit(context, new_reservations, project_id=project_id)
self.publish_service_capabilities(context)

@@ -46,6 +46,7 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
1.10 - Add migrate_volume_completion, remove rename_volume.
1.11 - Adds mode parameter to attach_volume()
to support volume read-only attaching.
1.12 - Adds retype.
'''
BASE_RPC_API_VERSION = '1.0'
@@ -180,3 +181,17 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
topic=rpc.queue_get_for(ctxt, self.topic,
volume['host']),
version='1.10')
def retype(self, ctxt, volume, new_type_id, dest_host,
migration_policy='never', reservations=None):
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
self.cast(ctxt,
self.make_msg('retype',
volume_id=volume['id'],
new_type_id=new_type_id,
host=host_p,
migration_policy=migration_policy,
reservations=reservations),
topic=rpc.queue_get_for(ctxt, self.topic, volume['host']),
version='1.12')

@@ -168,8 +168,92 @@ def is_encrypted(context, volume_type_id):
return encryption is not None
def get_volume_type_encryption(context, volume_type_id):
if volume_type_id is None:
return None
encryption = db.volume_type_encryption_get(context, volume_type_id)
return encryption
def get_volume_type_qos_specs(volume_type_id):
ctxt = context.get_admin_context()
res = db.volume_type_qos_specs_get(ctxt,
volume_type_id)
return res
def volume_types_diff(context, vol_type_id1, vol_type_id2):
"""Returns a 'diff' of two volume types and whether they are equal.
Returns a tuple of (diff, equal), where 'equal' is a boolean indicating
whether the two types are equivalent, and 'diff' is a dictionary with the
following format:
{'extra_specs': {'key1': (value_in_1st_vol_type, value_in_2nd_vol_type),
'key2': (value_in_1st_vol_type, value_in_2nd_vol_type),
...}
'qos_specs': {'key1': (value_in_1st_vol_type, value_in_2nd_vol_type),
'key2': (value_in_1st_vol_type, value_in_2nd_vol_type),
...}
'encryption': {'cipher': (value_in_1st_vol_type, value_in_2nd_vol_type),
'key_size': (value_in_1st_vol_type, value_in_2nd_vol_type),
...}
"""
def _fix_qos_specs(qos_specs):
if qos_specs:
qos_specs.pop('id', None)
qos_specs.pop('name', None)
qos_specs.update(qos_specs.pop('specs', {}))
def _fix_encryption_specs(encryption):
    if encryption:
        encryption = dict(encryption)
        for param in ['volume_type_id', 'created_at', 'updated_at',
                      'deleted_at']:
            encryption.pop(param, None)
    return encryption
def _dict_diff(dict1, dict2):
res = {}
equal = True
if dict1 is None:
dict1 = {}
if dict2 is None:
dict2 = {}
for k, v in dict1.iteritems():
res[k] = (v, dict2.get(k))
if k not in dict2 or res[k][0] != res[k][1]:
equal = False
for k, v in dict2.iteritems():
res[k] = (dict1.get(k), v)
if k not in dict1 or res[k][0] != res[k][1]:
equal = False
return (res, equal)
all_equal = True
diff = {}
vol_type1 = get_volume_type(context, vol_type_id1)
vol_type2 = get_volume_type(context, vol_type_id2)
extra_specs1 = vol_type1.get('extra_specs')
extra_specs2 = vol_type2.get('extra_specs')
diff['extra_specs'], equal = _dict_diff(extra_specs1, extra_specs2)
if not equal:
all_equal = False
qos_specs1 = get_volume_type_qos_specs(vol_type_id1).get('qos_specs')
_fix_qos_specs(qos_specs1)
qos_specs2 = get_volume_type_qos_specs(vol_type_id2).get('qos_specs')
_fix_qos_specs(qos_specs2)
diff['qos_specs'], equal = _dict_diff(qos_specs1, qos_specs2)
if not equal:
all_equal = False
encryption1 = _fix_encryption_specs(
    get_volume_type_encryption(context, vol_type_id1))
encryption2 = _fix_encryption_specs(
    get_volume_type_encryption(context, vol_type_id2))
diff['encryption'], equal = _dict_diff(encryption1, encryption2)
if not equal:
all_equal = False
return (diff, all_equal)
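A short sketch of consuming the result (the two type IDs are placeholders);
the (old_value, new_value) pair-per-key shape matches the unit tests added
in this patch, e.g. diff['extra_specs']['key1'] == ('val1', 'val0'):

diff, equal = volume_types_diff(context, old_type_id, new_type_id)
if not equal:
    # Keep only the keys whose values actually changed.
    changed = dict((k, v) for k, v in diff['extra_specs'].items()
                   if v[0] != v[1])
    # e.g. {'key1': ('val1', 'val0')} when only key1 differs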

@@ -15,6 +15,7 @@
"volume:get_all_snapshots": [],
"volume:extend": [],
"volume:update_readonly_flag": [],
"volume:retype": [],
"volume_extension:types_manage": [["rule:admin_api"]],
"volume_extension:types_extra_specs": [["rule:admin_api"]],