Update retype API to use versionedobjects

This patch updates the retype API to use volume
versionedobjects.  The changes are backward compatible
with older RPC clients and only cover the core Cinder
code; changes to the drivers are left to each driver
maintainer.

Note that this patch DOES NOT switch to object dot
notation everywhere, since doing so would bloat the
patch.  That conversion will be done in subsequent
patches.

Change-Id: Ie79abf085349b496930fb75a76f299e65587ba6d
Partial-Implements: blueprint cinder-objects
Author: Thang Pham, 2015-09-26 19:09:39 -07:00
parent dabc7dedbb
commit 8ae1483370
9 changed files with 184 additions and 99 deletions
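
The compatibility scheme is the same on both RPC boundaries touched by the patch (scheduler and volume): the client only attaches the new `volume` argument when `can_send_version()` reports that the server understands the newer RPC version, while the server falls back to looking the volume up by `volume_id` when an older client did not send the object. The sketch below illustrates that pattern in isolation; `FakeRpcClient`, `Volume`, and the helper functions are hypothetical stand-ins, not code from this patch (the real code uses oslo.messaging's `RPCClient.can_send_version()` and `objects.Volume.get_by_id()`).

# Minimal, self-contained sketch of the version-negotiation pattern used in
# this patch.  FakeRpcClient and Volume are hypothetical stand-ins; the real
# code uses oslo.messaging's RPCClient and cinder.objects.Volume.

def _as_tuple(version):
    # Compare versions numerically, so that '1.10' > '1.4'.
    return tuple(int(part) for part in version.split('.'))


class Volume(object):
    """Stand-in for a versioned volume object."""
    def __init__(self, volume_id):
        self.id = volume_id


class FakeRpcClient(object):
    """Stand-in for an RPC client capped at the server's known version."""
    def __init__(self, server_version):
        self.server_version = _as_tuple(server_version)

    def can_send_version(self, version):
        return self.server_version >= _as_tuple(version)

    def cast(self, method, **kwargs):
        print('cast %s(%s)' % (method, ', '.join(sorted(kwargs))))


def client_retype(client, volume):
    """Client side: only send the object if the server can accept it."""
    msg_args = {'volume_id': volume.id}
    if client.can_send_version('1.10'):
        version = '1.10'
        msg_args['volume'] = volume
    else:
        version = '1.4'
    client.cast('retype', **msg_args)   # real code: client.prepare(version=...)
    return version


def server_retype(volume_id, volume=None):
    """Server side: tolerate older clients that only sent volume_id."""
    if volume is None:
        # Mimic the old behavior and look the volume up by its id.
        volume = Volume(volume_id)      # real code: objects.Volume.get_by_id()
    return volume


if __name__ == '__main__':
    vol = Volume('fake-id')
    assert client_retype(FakeRpcClient('1.10'), vol) == '1.10'  # sends object
    assert client_retype(FakeRpcClient('1.4'), vol) == '1.4'    # id only
    assert server_retype('fake-id').id == 'fake-id'             # old-client path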


@@ -56,7 +56,7 @@ LOG = logging.getLogger(__name__)
class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes."""
RPC_API_VERSION = '1.9'
RPC_API_VERSION = '1.10'
target = messaging.Target(version=RPC_API_VERSION)
@@ -182,7 +182,7 @@ class SchedulerManager(manager.Manager):
force_host_copy)
def retype(self, context, topic, volume_id,
request_spec, filter_properties=None):
request_spec, filter_properties=None, volume=None):
"""Schedule the modification of a volume's type.
:param context: the request context
@@ -190,10 +190,17 @@ class SchedulerManager(manager.Manager):
:param volume_id: the ID of the volume to retype
:param request_spec: parameters for this retype request
:param filter_properties: parameters to filter by
:param volume: the volume object to retype
"""
self._wait_for_scheduler()
# FIXME(thangp): Remove this in v2.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the
# volume by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
def _retype_volume_set_error(self, context, ex, request_spec,
volume_ref, msg, reservations):
if reservations:
@@ -204,14 +211,13 @@ class SchedulerManager(manager.Manager):
self._set_volume_state_and_notify('retype', volume_state,
context, ex, request_spec, msg)
volume_ref = db.volume_get(context, volume_id)
reservations = request_spec.get('quota_reservations')
new_type = request_spec.get('volume_type')
if new_type is None:
msg = _('New volume type not specified in request_spec.')
ex = exception.ParameterNotFound(param='volume_type')
_retype_volume_set_error(self, context, ex, request_spec,
volume_ref, msg, reservations)
volume, msg, reservations)
# Default migration policy is 'never'
migration_policy = request_spec.get('migration_policy')
@@ -225,15 +231,15 @@ class SchedulerManager(manager.Manager):
except exception.NoValidHost as ex:
msg = (_("Could not find a host for volume %(volume_id)s with "
"type %(type_id)s.") %
{'type_id': new_type['id'], 'volume_id': volume_id})
{'type_id': new_type['id'], 'volume_id': volume.id})
_retype_volume_set_error(self, context, ex, request_spec,
volume_ref, msg, reservations)
volume, msg, reservations)
except Exception as ex:
with excutils.save_and_reraise_exception():
_retype_volume_set_error(self, context, ex, request_spec,
volume_ref, None, reservations)
volume, None, reservations)
else:
volume_rpcapi.VolumeAPI().retype(context, volume_ref,
volume_rpcapi.VolumeAPI().retype(context, volume,
new_type['id'], tgt_host,
migration_policy, reservations)


@@ -43,6 +43,7 @@ class SchedulerAPI(object):
1.7 - Add get_active_pools method
1.8 - Add sending object over RPC in create_consistencygroup method
1.9 - Adds support for sending objects over RPC in create_volume()
1.10 - Adds support for sending objects over RPC in retype()
"""
RPC_API_VERSION = '1.0'
@@ -107,15 +108,20 @@ class SchedulerAPI(object):
filter_properties=filter_properties)
def retype(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None):
request_spec=None, filter_properties=None, volume=None):
cctxt = self.client.prepare(version='1.4')
request_spec_p = jsonutils.to_primitive(request_spec)
return cctxt.cast(ctxt, 'retype',
topic=topic,
volume_id=volume_id,
request_spec=request_spec_p,
filter_properties=filter_properties)
msg_args = {'topic': topic, 'volume_id': volume_id,
'request_spec': request_spec_p,
'filter_properties': filter_properties}
if self.client.can_send_version('1.10'):
version = '1.10'
msg_args['volume'] = volume
else:
version = '1.4'
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'retype', **msg_args)
def manage_existing(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None):


@@ -128,14 +128,31 @@ class SchedulerRpcAPITestCase(test.TestCase):
filter_properties='filter_properties',
version='1.3')
def test_retype(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
def test_retype(self, can_send_version):
self._test_scheduler_api('retype',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='1.10')
can_send_version.assert_called_with('1.10')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_retype_old(self, can_send_version):
self._test_scheduler_api('retype',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='1.4')
can_send_version.assert_called_with('1.10')
def test_manage_existing(self):
self._test_scheduler_api('manage_existing',


@@ -225,37 +225,37 @@ class SchedulerManagerTestCase(test.TestCase):
request_spec, {})
@mock.patch('cinder.db.volume_update')
@mock.patch('cinder.db.volume_get')
def test_retype_volume_exception_returns_volume_state(self, _mock_vol_get,
_mock_vol_update):
@mock.patch('cinder.db.volume_attachment_get_used_by_volume_id')
def test_retype_volume_exception_returns_volume_state(
self, _mock_vol_attachment_get, _mock_vol_update):
# Test NoValidHost exception behavior for retype.
# Puts the volume in original state and eats the exception.
volume = tests_utils.create_volume(self.context,
status='retyping',
previous_status='in-use')
instance_uuid = '12345678-1234-5678-1234-567812345678'
volume = tests_utils.attach_volume(self.context, volume['id'],
instance_uuid, None, '/dev/fake')
fake_volume_id = volume.id
volume_attach = tests_utils.attach_volume(self.context, volume.id,
instance_uuid, None,
'/dev/fake')
_mock_vol_attachment_get.return_value = [volume_attach]
topic = 'fake_topic'
request_spec = {'volume_id': fake_volume_id, 'volume_type': {'id': 3},
request_spec = {'volume_id': volume.id, 'volume_type': {'id': 3},
'migration_policy': 'on-demand'}
_mock_vol_get.return_value = volume
_mock_vol_update.return_value = {'status': 'in-use'}
_mock_find_retype_host = mock.Mock(
side_effect=exception.NoValidHost(reason=""))
orig_retype = self.manager.driver.find_retype_host
self.manager.driver.find_retype_host = _mock_find_retype_host
self.manager.retype(self.context, topic, fake_volume_id,
self.manager.retype(self.context, topic, volume.id,
request_spec=request_spec,
filter_properties={})
filter_properties={},
volume=volume)
_mock_vol_get.assert_called_once_with(self.context, fake_volume_id)
_mock_find_retype_host.assert_called_once_with(self.context,
request_spec, {},
'on-demand')
_mock_vol_update.assert_called_once_with(self.context, fake_volume_id,
_mock_vol_update.assert_called_once_with(self.context, volume.id,
{'status': 'in-use'})
self.manager.driver.find_retype_host = orig_retype


@@ -683,6 +683,17 @@ class VolumeTestCase(BaseVolumeTestCase):
False,
FAKE_METADATA_TYPE.fake_type)
@mock.patch('cinder.db.volume_update')
def test_update_with_ovo(self, volume_update):
"""Test update volume using oslo_versionedobject."""
volume = tests_utils.create_volume(self.context, **self.volume_params)
volume_api = cinder.volume.api.API()
updates = {'display_name': 'foobbar'}
volume_api.update(self.context, volume, updates)
volume_update.assert_called_once_with(self.context, volume.id,
updates)
self.assertEqual('foobbar', volume.display_name)
def test_delete_volume_metadata_with_metatype(self):
"""Test delete volume metadata with different metadata type."""
test_meta1 = {'fake_key1': 'fake_value1', 'fake_key2': 'fake_value2'}
@@ -3908,7 +3919,6 @@ class VolumeTestCase(BaseVolumeTestCase):
status='creating', host=CONF.host)
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'in-use'
volume['host'] = 'fakehost'
volume_api = cinder.volume.api.API()
@@ -4753,15 +4763,16 @@ class VolumeMigrationTestCase(VolumeTestCase):
host=CONF.host, status='retyping',
volume_type_id=old_vol_type['id'],
replication_status=rep_status)
volume['previous_status'] = 'available'
volume.previous_status = 'available'
volume.save()
if snap:
self._create_snapshot(volume['id'], size=volume['size'])
self._create_snapshot(volume.id, size=volume.size)
if driver or diff_equal:
host_obj = {'host': CONF.host, 'capabilities': {}}
else:
host_obj = {'host': 'newhost', 'capabilities': {}}
reserve_opts = {'volumes': 1, 'gigabytes': volume['size']}
reserve_opts = {'volumes': 1, 'gigabytes': volume.size}
QUOTAS.add_volume_type_opts(self.context,
reserve_opts,
vol_type['id'])
@@ -4782,20 +4793,21 @@ class VolumeMigrationTestCase(VolumeTestCase):
_mig.return_value = True
if not exc:
self.volume.retype(self.context, volume['id'],
self.volume.retype(self.context, volume.id,
vol_type['id'], host_obj,
migration_policy=policy,
reservations=reservations)
reservations=reservations,
volume=volume)
else:
self.assertRaises(exc, self.volume.retype,
self.context, volume['id'],
self.context, volume.id,
vol_type['id'], host_obj,
migration_policy=policy,
reservations=reservations)
get_volume.assert_called_once_with(self.context, volume['id'])
reservations=reservations,
volume=volume)
# get volume/quota properties
volume = db.volume_get(elevated, volume['id'])
volume = objects.Volume.get_by_id(elevated, volume.id)
try:
usage = db.quota_usage_get(elevated, project_id, 'volumes_new')
volumes_in_use = usage.in_use
@@ -4804,19 +4816,19 @@ class VolumeMigrationTestCase(VolumeTestCase):
# check properties
if driver or diff_equal:
self.assertEqual(vol_type['id'], volume['volume_type_id'])
self.assertEqual('available', volume['status'])
self.assertEqual(CONF.host, volume['host'])
self.assertEqual(vol_type['id'], volume.volume_type_id)
self.assertEqual('available', volume.status)
self.assertEqual(CONF.host, volume.host)
self.assertEqual(1, volumes_in_use)
elif not exc:
self.assertEqual(old_vol_type['id'], volume['volume_type_id'])
self.assertEqual('retyping', volume['status'])
self.assertEqual(CONF.host, volume['host'])
self.assertEqual(old_vol_type['id'], volume.volume_type_id)
self.assertEqual('retyping', volume.status)
self.assertEqual(CONF.host, volume.host)
self.assertEqual(1, volumes_in_use)
else:
self.assertEqual(old_vol_type['id'], volume['volume_type_id'])
self.assertEqual('available', volume['status'])
self.assertEqual(CONF.host, volume['host'])
self.assertEqual(old_vol_type['id'], volume.volume_type_id)
self.assertEqual('available', volume.status)
self.assertEqual(CONF.host, volume.host)
self.assertEqual(0, volumes_in_use)
def test_retype_volume_driver_success(self):


@@ -399,7 +399,9 @@ class VolumeRpcAPITestCase(test.TestCase):
error=False,
version='1.10')
def test_retype(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
def test_retype(self, can_send_version):
class FakeHost(object):
def __init__(self):
self.host = 'host'
@@ -407,12 +409,31 @@ class VolumeRpcAPITestCase(test.TestCase):
dest_host = FakeHost()
self._test_volume_api('retype',
rpc_method='cast',
volume=self.fake_volume,
volume=self.fake_volume_obj,
new_type_id='fake',
dest_host=dest_host,
migration_policy='never',
reservations=None,
version='1.34')
can_send_version.assert_called_once_with('1.34')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_retype_old(self, can_send_version):
class FakeHost(object):
def __init__(self):
self.host = 'host'
self.capabilities = {}
dest_host = FakeHost()
self._test_volume_api('retype',
rpc_method='cast',
volume=self.fake_volume_obj,
new_type_id='fake',
dest_host=dest_host,
migration_policy='never',
reservations=None,
version='1.12')
can_send_version.assert_called_once_with('1.34')
def test_manage_existing(self):
self._test_volume_api('manage_existing',


@@ -456,8 +456,16 @@ class API(base.Base):
msg = _("The volume cannot be updated during maintenance.")
raise exception.InvalidVolume(reason=msg)
vref = self.db.volume_update(context, volume['id'], fields)
LOG.info(_LI("Volume updated successfully."), resource=vref)
# NOTE(thangp): Update is called by various APIs, some of which are
# not yet using oslo_versionedobjects. We need to handle the case
# where volume is either a dict or a oslo_versionedobject.
if isinstance(volume, objects_base.CinderObject):
volume.update(fields)
volume.save()
LOG.info(_LI("Volume updated successfully."), resource=volume)
else:
vref = self.db.volume_update(context, volume['id'], fields)
LOG.info(_LI("Volume updated successfully."), resource=vref)
def get(self, context, volume_id, viewable_admin_meta=False):
volume = objects.Volume.get_by_id(context, volume_id)
@@ -1436,18 +1444,18 @@ class API(base.Base):
@wrap_check_policy
def retype(self, context, volume, new_type, migration_policy=None):
"""Attempt to modify the type associated with an existing volume."""
if volume['status'] not in ['available', 'in-use']:
if volume.status not in ['available', 'in-use']:
msg = _('Unable to update type due to incorrect status: '
'%(vol_status)s on volume: %(vol_id)s. Volume status '
'must be available or '
'in-use.') % {'vol_status': volume['status'],
'vol_id': volume['id']}
'in-use.') % {'vol_status': volume.status,
'vol_id': volume.id}
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
if self._is_volume_migrating(volume):
msg = (_("Volume %s is already part of an active migration.")
% volume['id'])
% volume.id)
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
@@ -1457,8 +1465,7 @@ class API(base.Base):
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
cg_id = volume.get('consistencygroup_id', None)
if cg_id:
if volume.consistencygroup_id:
msg = _("Volume must not be part of a consistency group.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
@@ -1480,16 +1487,16 @@ class API(base.Base):
vol_type_qos_id = vol_type['qos_specs_id']
old_vol_type = None
old_vol_type_id = volume['volume_type_id']
old_vol_type_id = volume.volume_type_id
old_vol_type_qos_id = None
# Error if the original and new type are the same
if volume['volume_type_id'] == vol_type_id:
if volume.volume_type_id == vol_type_id:
msg = _('New volume_type same as original: %s.') % new_type
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
if volume['volume_type_id']:
if volume.volume_type_id:
old_vol_type = volume_types.get_volume_type(
context, old_vol_type_id)
old_vol_type_qos_id = old_vol_type['qos_specs_id']
@@ -1506,14 +1513,14 @@ class API(base.Base):
# We don't support changing QoS at the front-end yet for in-use volumes
# TODO(avishay): Call Nova to change QoS setting (libvirt has support
# - virDomainSetBlockIoTune() - Nova does not have support yet).
if (volume['status'] != 'available' and
if (volume.status != 'available' and
old_vol_type_qos_id != vol_type_qos_id):
for qos_id in [old_vol_type_qos_id, vol_type_qos_id]:
if qos_id:
specs = qos_specs.get_qos_specs(context.elevated(), qos_id)
if specs['consumer'] != 'back-end':
msg = _('Retype cannot change front-end qos specs for '
'in-use volume: %s.') % volume['id']
'in-use volume: %s.') % volume.id
raise exception.InvalidInput(reason=msg)
# We're checking here in so that we can report any quota issues as
@@ -1523,17 +1530,17 @@ class API(base.Base):
vol_type_id)
self.update(context, volume, {'status': 'retyping',
'previous_status': volume['status']})
'previous_status': volume.status})
request_spec = {'volume_properties': volume,
'volume_id': volume['id'],
'volume_id': volume.id,
'volume_type': vol_type,
'migration_policy': migration_policy,
'quota_reservations': reservations}
self.scheduler_rpcapi.retype(context, CONF.volume_topic, volume['id'],
self.scheduler_rpcapi.retype(context, CONF.volume_topic, volume.id,
request_spec=request_spec,
filter_properties={})
filter_properties={}, volume=volume)
LOG.info(_LI("Retype volume request issued successfully."),
resource=volume)


@@ -190,7 +190,7 @@ def locked_snapshot_operation(f):
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
RPC_API_VERSION = '1.33'
RPC_API_VERSION = '1.34'
target = messaging.Target(version=RPC_API_VERSION)
@@ -2039,22 +2039,28 @@ class VolumeManager(manager.SchedulerDependentManager):
resource=volume)
def retype(self, ctxt, volume_id, new_type_id, host,
migration_policy='never', reservations=None):
migration_policy='never', reservations=None, volume=None):
def _retype_error(context, volume_id, old_reservations,
def _retype_error(context, volume, old_reservations,
new_reservations, status_update):
try:
self.db.volume_update(context, volume_id, status_update)
volume.update(status_update)
volume.save()
finally:
QUOTAS.rollback(context, old_reservations)
QUOTAS.rollback(context, new_reservations)
context = ctxt.elevated()
volume_ref = self.db.volume_get(ctxt, volume_id)
status_update = {'status': volume_ref['previous_status']}
if context.project_id != volume_ref['project_id']:
project_id = volume_ref['project_id']
# FIXME(thangp): Remove this in v2.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
status_update = {'status': volume.previous_status}
if context.project_id != volume.project_id:
project_id = volume.project_id
else:
project_id = context.project_id
@@ -2069,19 +2075,21 @@ class VolumeManager(manager.SchedulerDependentManager):
# set the volume status to error. Should that be done
# here? Setting the volume back to its original status
# for now.
self.db.volume_update(context, volume_id, status_update)
volume.update(status_update)
volume.save()
# Get old reservations
try:
reserve_opts = {'volumes': -1, 'gigabytes': -volume_ref['size']}
reserve_opts = {'volumes': -1, 'gigabytes': -volume.size}
QUOTAS.add_volume_type_opts(context,
reserve_opts,
volume_ref.get('volume_type_id'))
volume.volume_type_id)
old_reservations = QUOTAS.reserve(context,
project_id=project_id,
**reserve_opts)
except Exception:
self.db.volume_update(context, volume_id, status_update)
volume.update(status_update)
volume.save()
LOG.exception(_LE("Failed to update usages "
"while retyping volume."))
raise exception.CinderException(_("Failed to get old volume type"
@@ -2093,7 +2101,7 @@ class VolumeManager(manager.SchedulerDependentManager):
# If volume types have the same contents, no need to do anything
retyped = False
diff, all_equal = volume_types.volume_types_diff(
context, volume_ref.get('volume_type_id'), new_type_id)
context, volume.volume_type_id, new_type_id)
if all_equal:
retyped = True
@@ -2113,7 +2121,7 @@ class VolumeManager(manager.SchedulerDependentManager):
try:
new_type = volume_types.get_volume_type(context, new_type_id)
ret = self.driver.retype(context,
volume_ref,
volume,
new_type,
diff,
host)
@@ -2125,49 +2133,49 @@ class VolumeManager(manager.SchedulerDependentManager):
retyped = ret
if retyped:
LOG.info(_LI("Volume %s: retyped successfully"), volume_id)
LOG.info(_LI("Volume %s: retyped successfully"), volume.id)
except Exception:
retyped = False
LOG.exception(_LE("Volume %s: driver error when trying to "
"retype, falling back to generic "
"mechanism."), volume_ref['id'])
"mechanism."), volume.id)
# We could not change the type, so we need to migrate the volume, where
# the destination volume will be of the new type
if not retyped:
if migration_policy == 'never':
_retype_error(context, volume_id, old_reservations,
_retype_error(context, volume, old_reservations,
new_reservations, status_update)
msg = _("Retype requires migration but is not allowed.")
raise exception.VolumeMigrationFailed(reason=msg)
snaps = objects.SnapshotList.get_all_for_volume(context,
volume_ref['id'])
volume.id)
if snaps:
_retype_error(context, volume_id, old_reservations,
_retype_error(context, volume, old_reservations,
new_reservations, status_update)
msg = _("Volume must not have snapshots.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# Don't allow volume with replicas to be migrated
rep_status = volume_ref['replication_status']
rep_status = volume.replication_status
if rep_status is not None and rep_status != 'disabled':
_retype_error(context, volume_id, old_reservations,
_retype_error(context, volume, old_reservations,
new_reservations, status_update)
msg = _("Volume must not be replicated.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
self.db.volume_update(context, volume_ref['id'],
{'migration_status': 'starting'})
volume.migration_status = 'starting'
volume.save()
try:
self.migrate_volume(context, volume_id, host,
self.migrate_volume(context, volume.id, host,
new_type_id=new_type_id)
except Exception:
with excutils.save_and_reraise_exception():
_retype_error(context, volume_id, old_reservations,
_retype_error(context, volume, old_reservations,
new_reservations, status_update)
else:
model_update = {'volume_type_id': new_type_id,
@@ -2175,7 +2183,8 @@ class VolumeManager(manager.SchedulerDependentManager):
'status': status_update['status']}
if retype_model_update:
model_update.update(retype_model_update)
self.db.volume_update(context, volume_id, model_update)
volume.update(model_update)
volume.save()
if old_reservations:
QUOTAS.commit(context, old_reservations, project_id=project_id)
@@ -2183,7 +2192,7 @@ class VolumeManager(manager.SchedulerDependentManager):
QUOTAS.commit(context, new_reservations, project_id=project_id)
self.publish_service_capabilities(context)
LOG.info(_LI("Retype volume completed successfully."),
resource=volume_ref)
resource=volume)
def manage_existing(self, ctxt, volume_id, ref=None):
try:


@@ -81,6 +81,7 @@ class VolumeAPI(object):
args. Forwarding CGSnapshot object instead of CGSnapshot_id.
1.32 - Adds support for sending objects over RPC in create_volume().
1.33 - Adds support for sending objects over RPC in delete_volume().
1.34 - Adds support for sending objects over RPC in retype().
"""
BASE_RPC_API_VERSION = '1.0'
@@ -253,14 +254,20 @@ class VolumeAPI(object):
def retype(self, ctxt, volume, new_type_id, dest_host,
migration_policy='never', reservations=None):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.12')
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
cctxt.cast(ctxt, 'retype', volume_id=volume['id'],
new_type_id=new_type_id, host=host_p,
migration_policy=migration_policy,
reservations=reservations)
msg_args = {'volume_id': volume.id, 'new_type_id': new_type_id,
'host': host_p, 'migration_policy': migration_policy,
'reservations': reservations}
if self.client.can_send_version('1.34'):
version = '1.34'
msg_args['volume'] = volume
else:
version = '1.12'
new_host = utils.extract_host(volume.host)
cctxt = self.client.prepare(server=new_host, version=version)
cctxt.cast(ctxt, 'retype', **msg_args)
def manage_existing(self, ctxt, volume, ref):
new_host = utils.extract_host(volume['host'])